From d254972f405be53da7f945f46a7baaf5ed9c00b1 Mon Sep 17 00:00:00 2001 From: Vlad Gluhovsky Date: Mon, 22 Jun 2015 21:42:08 +0200 Subject: [PATCH] bloom filter exchage protocol --- libwhisper/WhisperHost.h | 10 ++--- libwhisper/WhisperPeer.cpp | 21 +++++---- libwhisper/WhisperPeer.h | 20 +++------ test/libwhisper/whisperTopic.cpp | 74 ++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 30 deletions(-) diff --git a/libwhisper/WhisperHost.h b/libwhisper/WhisperHost.h index 7020e3f63..b5e321dea 100644 --- a/libwhisper/WhisperHost.h +++ b/libwhisper/WhisperHost.h @@ -50,11 +50,12 @@ class WhisperHost: public HostCapability, public Interface, public public: WhisperHost(); virtual ~WhisperHost(); - unsigned protocolVersion() const { return 2; } + void cleanup(); + std::map all() const { ReadGuard l(x_messages); return m_messages; } + FixedHash const& bloom() const { return m_bloom; } virtual void inject(Envelope const& _e, WhisperPeer* _from = nullptr) override; - virtual Topics const& fullTopics(unsigned _id) const override { try { return m_filters.at(m_watches.at(_id).id).full; } catch (...) { return EmptyTopics; } } virtual unsigned installWatch(Topics const& _filter) override; virtual unsigned installWatchOnId(h256 _filterId) override; @@ -62,13 +63,8 @@ public: virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } } virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } virtual h256s watchMessages(unsigned _watchId) override; /// returns IDs of messages, which match specific watch criteria - virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } } - std::map all() const { ReadGuard l(x_messages); return m_messages; } - - void cleanup(); - protected: virtual void doWork() override; diff --git a/libwhisper/WhisperPeer.cpp b/libwhisper/WhisperPeer.cpp index 144de3ec6..826b7842d 100644 --- a/libwhisper/WhisperPeer.cpp +++ b/libwhisper/WhisperPeer.cpp @@ -19,11 +19,10 @@ * @date 2014 */ -#include "WhisperPeer.h" - #include #include #include "WhisperHost.h" + using namespace std; using namespace dev; using namespace dev::p2p; @@ -32,7 +31,10 @@ using namespace dev::shh; WhisperPeer::WhisperPeer(std::shared_ptr _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i) { RLPStream s; - sealAndSend(prep(s, StatusPacket, 1) << version()); + prep(s, StatusPacket, 2); + s << version(); + s << host()->bloom(); + sealAndSend(s); } WhisperPeer::~WhisperPeer() @@ -57,6 +59,9 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) if (protocolVersion != version()) disable("Invalid protocol version."); + if (_r.itemCount() > 1) // for backwards compatibility + m_bloom = (FixedHash)_r[1]; + for (auto const& m: host()->all()) m_unseen.insert(make_pair(0, m.first)); @@ -64,15 +69,15 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) sendMessages(); break; } - case MessagesPacket: + case UpdateTopicFilterPacket: { - for (auto i: _r) - host()->inject(Envelope(i), this); + m_bloom = (FixedHash)_r[0]; break; } - case UpdateTopicFilterPacket: + case MessagesPacket: { - m_bloom = (FixedHash)_r; + for (auto i: _r) + host()->inject(Envelope(i), this); break; } default: diff --git a/libwhisper/WhisperPeer.h b/libwhisper/WhisperPeer.h index f934abb2a..95b4a1952 100644 --- a/libwhisper/WhisperPeer.h +++ b/libwhisper/WhisperPeer.h @@ -44,8 +44,6 @@ using p2p::HostCapability; using p2p::Capability; using p2p::CapDesc; -/** - */ class WhisperPeer: public Capability { friend class WhisperHost; @@ -53,31 +51,23 @@ class WhisperPeer: public Capability public: WhisperPeer(std::shared_ptr _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap); virtual ~WhisperPeer(); - + WhisperHost* host() const; static std::string name() { return "shh"; } static u256 version() { return 2; } static unsigned messageCount() { return PacketCount; } - - WhisperHost* host() const; - - FixedHash const& bloom() const { return m_bloom; } - - /// called by the host, sends our bloom filter to remote peer - void advertizeTopicsOfInterest(FixedHash const& _bloom); + FixedHash const& bloom() const { return m_bloom; } + void advertizeTopicsOfInterest(FixedHash const& _bloom); ///< sends our bloom filter to remote peer + FixedHash const& bloom() { return m_bloom; } private: virtual bool interpret(unsigned _id, RLP const&) override; - void sendMessages(); - unsigned rating(Envelope const&) const { return 0; } // TODO void noteNewMessage(h256 _h, Envelope const& _m); mutable dev::Mutex x_unseen; std::multimap m_unseen; ///< Rated according to what they want. - - std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now(); - + std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now(); FixedHash m_bloom; ///< Peer's topics of interest }; diff --git a/test/libwhisper/whisperTopic.cpp b/test/libwhisper/whisperTopic.cpp index f09705ccb..56b1eea02 100644 --- a/test/libwhisper/whisperTopic.cpp +++ b/test/libwhisper/whisperTopic.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -301,4 +302,77 @@ BOOST_AUTO_TEST_CASE(asyncforwarding) BOOST_REQUIRE_EQUAL(result, 1); } +BOOST_AUTO_TEST_CASE(topicAdvertising) +{ + if (test::Options::get().nonetwork) + return; + + cnote << "Testing Topic Advertising..."; + VerbosityHolder setTemporaryLevel(2); + + Host host1("first", NetworkPreferences("127.0.0.1", 30303, false)); + host1.setIdealPeerCount(1); + auto whost1 = host1.registerCapability(new WhisperHost()); + host1.start(); + while (!host1.haveNetwork()) + this_thread::sleep_for(chrono::milliseconds(10)); + + Host host2("second", NetworkPreferences("127.0.0.1", 30305, false)); + host2.setIdealPeerCount(1); + auto whost2 = host2.registerCapability(new WhisperHost()); + auto watch2 = whost2->installWatch(BuildTopicMask("test2")); + + host2.start(); + while (!host2.haveNetwork()) + this_thread::sleep_for(chrono::milliseconds(10)); + + host1.addNode(host2.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); + while (!host1.haveNetwork()) + this_thread::sleep_for(chrono::milliseconds(10)); + + while (!host1.peerCount()) + this_thread::sleep_for(chrono::milliseconds(10)); + + while (!host2.peerCount()) + this_thread::sleep_for(chrono::milliseconds(10)); + + while (!whost1->peerSessions().size()) + this_thread::sleep_for(chrono::milliseconds(10)); + + for (int i = 0; i < 200; ++i) + { + this_thread::sleep_for(chrono::milliseconds(10)); + auto sessions1 = whost1->peerSessions(); + size_t x = sessions1.size(); + BOOST_REQUIRE(x > 0); + if (whost1->peerSessions()[x-1].first->cap()->bloom()) + break; + } + + BOOST_REQUIRE(whost1->peerSessions().size()); + FixedHash bf1 = whost1->peerSessions().back().first->cap()->bloom(); + FixedHash bf2 = whost2->bloom(); + BOOST_REQUIRE_EQUAL(bf1, bf2); + BOOST_REQUIRE(bf1); + BOOST_REQUIRE(!whost1->bloom()); + + auto watch1 = whost1->installWatch(BuildTopicMask("test1")); + + for (int i = 0; i < 300; ++i) + { + this_thread::sleep_for(chrono::milliseconds(10)); + if (whost2->peerSessions().back().first->cap()->bloom()) + break; + } + + auto sessions2 = whost2->peerSessions(); + BOOST_REQUIRE(sessions2.size()); + BOOST_REQUIRE_EQUAL(sessions2.back().second->id, host1.id()); + + bf2 = sessions2.back().first->cap()->bloom(); + bf1 = whost1->bloom(); + BOOST_REQUIRE_EQUAL(bf1, bf2); + BOOST_REQUIRE(bf1); +} + BOOST_AUTO_TEST_SUITE_END()