From 784a786a82c254339f41146fa051e05a53198da9 Mon Sep 17 00:00:00 2001 From: Vlad Gluhovsky Date: Thu, 25 Jun 2015 12:40:19 +0200 Subject: [PATCH] Topics of interest advertising made asynchronous --- libwhisper/WhisperHost.cpp | 17 ++++++++++++----- libwhisper/WhisperHost.h | 11 ++++++----- libwhisper/WhisperPeer.cpp | 26 ++++++++++++-------------- libwhisper/WhisperPeer.h | 12 +++++++++--- test/libwhisper/whisperTopic.cpp | 4 ++-- 5 files changed, 41 insertions(+), 29 deletions(-) diff --git a/libwhisper/WhisperHost.cpp b/libwhisper/WhisperHost.cpp index a6eeed4e3..7d3a0d4e8 100644 --- a/libwhisper/WhisperHost.cpp +++ b/libwhisper/WhisperHost.cpp @@ -87,8 +87,10 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) void WhisperHost::advertiseTopicsOfInterest() { + FixedHash b = bloom(); + for (auto i: peerSessions()) - i.first->cap().get()->advertiseTopicsOfInterest(); + i.first->cap().get()->sendTopicsOfInterest(b); } void WhisperHost::noteChanged(h256 _messageHash, h256 _filter) @@ -111,16 +113,15 @@ unsigned WhisperHost::installWatchOnId(h256 _h) unsigned WhisperHost::installWatch(shh::Topics const& _t) { - Guard l(m_filterLock); - InstalledFilter f(_t); h256 h = f.filter.sha3(); + Guard l(m_filterLock); if (!m_filters.count(h)) m_filters.insert(make_pair(h, f)); m_bloom.addRaw(f.filter.exportBloom()); - advertiseTopicsOfInterest(); + noteAdvertiseTopicsOfInterest(); return installWatchOnId(h); } @@ -165,7 +166,7 @@ void WhisperHost::uninstallWatch(unsigned _i) m_filters.erase(fit); m_bloom.removeRaw(fit->second.filter.exportBloom()); - advertiseTopicsOfInterest(); + noteAdvertiseTopicsOfInterest(); } } @@ -185,3 +186,9 @@ void WhisperHost::cleanup() for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it)) m_messages.erase(it->second); } + +void WhisperHost::noteAdvertiseTopicsOfInterest() +{ + for (auto i: peerSessions()) + i.first->cap().get()->noteAdvertiseTopicsOfInterest(); +} diff --git a/libwhisper/WhisperHost.h b/libwhisper/WhisperHost.h index 851031177..69a1280aa 100644 --- a/libwhisper/WhisperHost.h +++ b/libwhisper/WhisperHost.h @@ -51,9 +51,9 @@ public: WhisperHost(); virtual ~WhisperHost(); unsigned protocolVersion() const { return 2; } - void cleanup(); - std::map all() const { ReadGuard l(x_messages); return m_messages; } - FixedHash const& bloom() const { return m_bloom; } + void cleanup(); ///< remove old messages + std::map all() const { dev::ReadGuard l(x_messages); return m_messages; } + FixedHash bloom() const { dev::Guard l(m_filterLock); return m_bloom; } virtual void inject(Envelope const& _e, WhisperPeer* _from = nullptr) override; virtual Topics const& fullTopics(unsigned _id) const override { try { return m_filters.at(m_watches.at(_id).id).full; } catch (...) { return EmptyTopics; } } @@ -62,11 +62,12 @@ public: virtual void uninstallWatch(unsigned _watchId) override; virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } } virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } - virtual h256s watchMessages(unsigned _watchId) override; /// returns IDs of messages, which match specific watch criteria + virtual h256s watchMessages(unsigned _watchId) override; ///< returns IDs of messages, which match specific watch criteria virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } } protected: virtual void doWork() override; + void noteAdvertiseTopicsOfInterest(); private: virtual void onStarting() override { startWorking(); } @@ -74,7 +75,7 @@ private: void streamMessage(h256 _m, RLPStream& _s) const; void noteChanged(h256 _messageHash, h256 _filter); - void advertiseTopicsOfInterest(); + void advertiseTopicsOfInterest(); ///< send our bloom filter to remote peers mutable dev::SharedMutex x_messages; std::map m_messages; diff --git a/libwhisper/WhisperPeer.cpp b/libwhisper/WhisperPeer.cpp index 6fb03e9a0..ff73732ce 100644 --- a/libwhisper/WhisperPeer.cpp +++ b/libwhisper/WhisperPeer.cpp @@ -31,11 +31,8 @@ using namespace dev::shh; WhisperPeer::WhisperPeer(std::shared_ptr _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i) { RLPStream s; - //sealAndSend(prep(s, StatusPacket, 1) << version()); - prep(s, StatusPacket, 2); - s << version(); - s << host()->bloom(); - sealAndSend(s); + sealAndSend(prep(s, StatusPacket, 1) << version()); + noteAdvertiseTopicsOfInterest(); } WhisperPeer::~WhisperPeer() @@ -59,23 +56,19 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) if (protocolVersion != version()) disable("Invalid protocol version."); - else - { - if (_r.itemCount() > 1) // for backwards compatibility - m_bloom = (FixedHash)_r[1]; - advertiseTopicsOfInterest(); - } for (auto const& m: host()->all()) m_unseen.insert(make_pair(0, m.first)); if (session()->id() < host()->host()->id()) sendMessages(); + + noteAdvertiseTopicsOfInterest(); break; } case TopicFilterPacket: { - m_bloom = (FixedHash)_r[0]; + setBloom((FixedHash)_r[0]); break; } case MessagesPacket: @@ -92,6 +85,9 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) void WhisperPeer::sendMessages() { + if (m_advertiseTopicsOfInterest) + sendTopicsOfInterest(host()->bloom()); + RLPStream amalg; unsigned msgCount = 0; { @@ -119,10 +115,12 @@ void WhisperPeer::noteNewMessage(h256 _h, Envelope const& _m) m_unseen.insert(make_pair(rating(_m), _h)); } -void WhisperPeer::advertiseTopicsOfInterest() +void WhisperPeer::sendTopicsOfInterest(FixedHash const& _bloom) { RLPStream s; prep(s, TopicFilterPacket, 1); - s << host()->bloom(); + s << _bloom; sealAndSend(s); + + m_advertiseTopicsOfInterest = false; } diff --git a/libwhisper/WhisperPeer.h b/libwhisper/WhisperPeer.h index 431349878..9c242dc1f 100644 --- a/libwhisper/WhisperPeer.h +++ b/libwhisper/WhisperPeer.h @@ -53,21 +53,27 @@ public: virtual ~WhisperPeer(); WhisperHost* host() const; static std::string name() { return "shh"; } - static u256 version() { return 2; } + static u256 version() { return 3; } static unsigned messageCount() { return PacketCount; } - FixedHash const& bloom() const { return m_bloom; } - void advertiseTopicsOfInterest(); ///< sends our bloom filter to remote peer + FixedHash bloom() const { dev::Guard g(x_bloom); return m_bloom; } + void sendTopicsOfInterest(FixedHash const& _bloom); ///< sends our bloom filter to remote peer + void noteAdvertiseTopicsOfInterest() { m_advertiseTopicsOfInterest = true; } private: virtual bool interpret(unsigned _id, RLP const&) override; void sendMessages(); unsigned rating(Envelope const&) const { return 0; } // TODO void noteNewMessage(h256 _h, Envelope const& _m); + void setBloom(FixedHash const& _b) { dev::Guard g(x_bloom); m_bloom = _b; } mutable dev::Mutex x_unseen; std::multimap m_unseen; ///< Rated according to what they want. std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now(); + + mutable dev::Mutex x_bloom; FixedHash m_bloom; ///< Peer's topics of interest + + bool m_advertiseTopicsOfInterest; }; } diff --git a/test/libwhisper/whisperTopic.cpp b/test/libwhisper/whisperTopic.cpp index 2940ebd6c..a152f756e 100644 --- a/test/libwhisper/whisperTopic.cpp +++ b/test/libwhisper/whisperTopic.cpp @@ -338,7 +338,7 @@ BOOST_AUTO_TEST_CASE(topicAdvertising) std::vector, std::shared_ptr>> sessions; - for (int i = 0; i < 300; ++i) + for (int i = 0; i < 600; ++i) { sessions = whost1->peerSessions(); if (!sessions.empty() && sessions.back().first->cap()->bloom()) @@ -356,7 +356,7 @@ BOOST_AUTO_TEST_CASE(topicAdvertising) whost1->installWatch(BuildTopicMask("test1")); - for (int i = 0; i < 300; ++i) + for (int i = 0; i < 600; ++i) { sessions = whost2->peerSessions(); if (!sessions.empty() && sessions.back().first->cap()->bloom())