diff --git a/libqethereum/QEthereum.cpp b/libqethereum/QEthereum.cpp index a71aca2b4..e77c9d632 100644 --- a/libqethereum/QEthereum.cpp +++ b/libqethereum/QEthereum.cpp @@ -595,7 +595,7 @@ static shh::Envelope toSealed(QString _json, shh::Message const& _m, Secret _fro for (auto i: f["topic"].toArray()) bt.shift(asBytes(padded(i.toString(), 32))); } - return _m.seal(_from, bt, workToProve, ttl); + return _m.seal(_from, bt, ttl, workToProve); } void QWhisper::doPost(QString _json) diff --git a/libwhisper/WhisperHost.cpp b/libwhisper/WhisperHost.cpp index 25c1a06bf..26ca3d93b 100644 --- a/libwhisper/WhisperHost.cpp +++ b/libwhisper/WhisperHost.cpp @@ -53,6 +53,9 @@ void WhisperHost::streamMessage(h256 _m, RLPStream& _s) const void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) { + if (_m.expiry() <= time(0)) + return; + auto h = _m.sha3(); { UpgradableGuard l(x_messages); @@ -60,6 +63,7 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) return; UpgradeGuard ll(l); m_messages[h] = _m; + m_expiryQueue[_m.expiry()] = h; } // if (_p) @@ -109,6 +113,7 @@ unsigned WhisperHost::installWatch(shh::TopicFilter const& _f) h256s WhisperHost::watchMessages(unsigned _watchId) { + cleanup(); h256s ret; auto wit = m_watches.find(_watchId); if (wit == m_watches.end()) @@ -145,3 +150,13 @@ void WhisperHost::uninstallWatch(unsigned _i) if (!--fit->second.refCount) m_filters.erase(fit); } + +void WhisperHost::cleanup() +{ + // remove old messages. + // should be called every now and again. + auto now = time(0); + WriteGuard l(x_messages); + for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it)) + m_messages.erase(it->second); +} diff --git a/libwhisper/WhisperHost.h b/libwhisper/WhisperHost.h index 9ad859967..2eb4856ee 100644 --- a/libwhisper/WhisperHost.h +++ b/libwhisper/WhisperHost.h @@ -55,11 +55,13 @@ public: virtual unsigned installWatchOnId(h256 _filterId) override; virtual void uninstallWatch(unsigned _watchId) override; virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } } - virtual h256s checkWatch(unsigned _watchId) override { dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } + virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } virtual h256s watchMessages(unsigned _watchId) override; virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } } + void cleanup(); + private: void streamMessage(h256 _m, RLPStream& _s) const; @@ -67,6 +69,7 @@ private: mutable dev::SharedMutex x_messages; std::map m_messages; + std::map m_expiryQueue; mutable dev::Mutex m_filterLock; std::map m_filters;