diff --git a/libdevcore/Common.h b/libdevcore/Common.h index f1d35bbc7..4ed4e4365 100644 --- a/libdevcore/Common.h +++ b/libdevcore/Common.h @@ -45,6 +45,7 @@ #pragma warning(push) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-fpermissive" #include #if (BOOST_VERSION == 105800) #include "boost_multiprecision_number_compare_bug_workaround.hpp" diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 95e7be598..3d0bd0b6a 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -268,7 +268,7 @@ bool Session::interpret(PacketType _t, RLP const& _r) } catch (std::exception const& _e) { - clog(NetWarn) << "Peer causing an exception:" << _e.what() << _r; + clog(NetWarn) << "Exception caught in p2p::Session::interpret(): " << _e.what() << ". PacketType: " << _t << ". RLP: " << _r; disconnect(BadProtocol); return true; } diff --git a/libwhisper/Common.cpp b/libwhisper/Common.cpp index 32acbdd8e..79c87b96f 100644 --- a/libwhisper/Common.cpp +++ b/libwhisper/Common.cpp @@ -95,12 +95,12 @@ TopicFilter::TopicFilter(RLP const& _r) } } -FixedHash TopicFilter::exportBloom() const +FixedHash TopicFilter::exportBloom() const { - FixedHash ret; + FixedHash ret; for (TopicMask const& t: m_topicMasks) for (auto const& i: t) - ret |= i.first.template bloomPart(); + ret |= i.first.template bloomPart(); return ret; } diff --git a/libwhisper/Common.h b/libwhisper/Common.h index 71435603e..deb13d111 100644 --- a/libwhisper/Common.h +++ b/libwhisper/Common.h @@ -54,12 +54,12 @@ enum WhisperPacket { StatusPacket = 0, MessagesPacket, - AddFilterPacket, - RemoveFilterPacket, + TopicFilterPacket, PacketCount }; -enum { TopicBloomFilterSize = 8 }; +enum { TopicBloomFilterSize = 64 }; +enum { WhisperProtocolVersion = 3 }; using AbridgedTopic = FixedHash<4>; using Topic = h256; diff --git a/libwhisper/Interface.h b/libwhisper/Interface.h index ff16c7e53..f53cb17a7 100644 --- a/libwhisper/Interface.h +++ b/libwhisper/Interface.h @@ -67,7 +67,6 @@ public: virtual Topics const& fullTopics(unsigned _id) const = 0; virtual unsigned installWatch(Topics const& _filter) = 0; - virtual unsigned installWatchOnId(h256 _filterId) = 0; virtual void uninstallWatch(unsigned _watchId) = 0; virtual h256s peekWatch(unsigned _watchId) const = 0; virtual h256s checkWatch(unsigned _watchId) = 0; diff --git a/libwhisper/WhisperHost.cpp b/libwhisper/WhisperHost.cpp index d6759df6f..1a47a56d1 100644 --- a/libwhisper/WhisperHost.cpp +++ b/libwhisper/WhisperHost.cpp @@ -66,12 +66,13 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) m_expiryQueue.insert(make_pair(_m.expiry(), h)); } -// if (_p) + DEV_GUARDED(m_filterLock) { - Guard l(m_filterLock); for (auto const& f: m_filters) if (f.second.filter.matches(_m)) - noteChanged(h, f.first); + for (auto& i: m_watches) + if (i.second.id == f.first) + i.second.changes.push_back(h); } // TODO p2p: capability-based rating @@ -85,36 +86,26 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) } } -void WhisperHost::noteChanged(h256 _messageHash, h256 _filter) -{ - for (auto& i: m_watches) - if (i.second.id == _filter) - { - cwatshh << "!!!" << i.first << i.second.id; - i.second.changes.push_back(_messageHash); - } -} - -unsigned WhisperHost::installWatchOnId(h256 _h) -{ - auto ret = m_watches.size() ? m_watches.rbegin()->first + 1 : 0; - m_watches[ret] = ClientWatch(_h); - cwatshh << "+++" << ret << _h; - return ret; -} - unsigned WhisperHost::installWatch(shh::Topics const& _t) { - Guard l(m_filterLock); - InstalledFilter f(_t); h256 h = f.filter.sha3(); + unsigned ret = 0; - if (!m_filters.count(h)) - m_filters.insert(make_pair(h, f)); + DEV_GUARDED(m_filterLock) + { + if (!m_filters.count(h)) + m_filters.insert(make_pair(h, f)); + + ret = m_watches.size() ? m_watches.rbegin()->first + 1 : 0; + m_watches[ret] = ClientWatch(h); + cwatshh << "+++" << ret << h; - m_bloom.addRaw(f.filter.exportBloom()); - return installWatchOnId(h); + m_bloom.addRaw(f.filter.exportBloom()); + } + + noteAdvertiseTopicsOfInterest(); + return ret; } h256s WhisperHost::watchMessages(unsigned _watchId) @@ -142,21 +133,25 @@ void WhisperHost::uninstallWatch(unsigned _i) { cwatshh << "XXX" << _i; - Guard l(m_filterLock); + DEV_GUARDED(m_filterLock) + { + auto it = m_watches.find(_i); + if (it == m_watches.end()) + return; + auto id = it->second.id; + m_watches.erase(it); - auto it = m_watches.find(_i); - if (it == m_watches.end()) - return; - auto id = it->second.id; - m_watches.erase(it); + auto fit = m_filters.find(id); + if (fit != m_filters.end()) + { + if (!--fit->second.refCount) + m_filters.erase(fit); - auto fit = m_filters.find(id); - if (fit != m_filters.end()) - { - m_bloom.removeRaw(fit->second.filter.exportBloom()); - if (!--fit->second.refCount) - m_filters.erase(fit); + m_bloom.removeRaw(fit->second.filter.exportBloom()); + } } + + noteAdvertiseTopicsOfInterest(); } void WhisperHost::doWork() @@ -175,3 +170,9 @@ void WhisperHost::cleanup() for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it)) m_messages.erase(it->second); } + +void WhisperHost::noteAdvertiseTopicsOfInterest() +{ + for (auto i: peerSessions()) + i.first->cap().get()->noteAdvertiseTopicsOfInterest(); +} diff --git a/libwhisper/WhisperHost.h b/libwhisper/WhisperHost.h index 46c41f3a2..1a43eda3a 100644 --- a/libwhisper/WhisperHost.h +++ b/libwhisper/WhisperHost.h @@ -50,36 +50,31 @@ class WhisperHost: public HostCapability, public Interface, public public: WhisperHost(); virtual ~WhisperHost(); - - unsigned protocolVersion() const { return 2; } + unsigned protocolVersion() const { return WhisperProtocolVersion; } + /// remove old messages + void cleanup(); + std::map all() const { dev::ReadGuard l(x_messages); return m_messages; } + FixedHash bloom() const { dev::Guard l(m_filterLock); return m_bloom; } virtual void inject(Envelope const& _e, WhisperPeer* _from = nullptr) override; - virtual Topics const& fullTopics(unsigned _id) const override { try { return m_filters.at(m_watches.at(_id).id).full; } catch (...) { return EmptyTopics; } } virtual unsigned installWatch(Topics const& _filter) override; - virtual unsigned installWatchOnId(h256 _filterId) override; virtual void uninstallWatch(unsigned _watchId) override; virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } } virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } - virtual h256s watchMessages(unsigned _watchId) override; /// returns IDs of messages, which match specific watch criteria - + /// returns IDs of messages, which match specific watch criteria + virtual h256s watchMessages(unsigned _watchId) override; virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } } - std::map all() const { ReadGuard l(x_messages); return m_messages; } - - void cleanup(); - protected: virtual void doWork() override; + void noteAdvertiseTopicsOfInterest(); private: virtual void onStarting() override { startWorking(); } virtual void onStopping() override { stopWorking(); } - void streamMessage(h256 _m, RLPStream& _s) const; - void noteChanged(h256 _messageHash, h256 _filter); - mutable dev::SharedMutex x_messages; std::map m_messages; std::multimap m_expiryQueue; diff --git a/libwhisper/WhisperPeer.cpp b/libwhisper/WhisperPeer.cpp index 0be59592d..665364f49 100644 --- a/libwhisper/WhisperPeer.cpp +++ b/libwhisper/WhisperPeer.cpp @@ -19,11 +19,10 @@ * @date 2014 */ -#include "WhisperPeer.h" - #include #include #include "WhisperHost.h" + using namespace std; using namespace dev; using namespace dev::p2p; @@ -33,6 +32,7 @@ WhisperPeer::WhisperPeer(std::shared_ptr _s, HostCapabilityFace* _h, un { RLPStream s; sealAndSend(prep(s, StatusPacket, 1) << version()); + noteAdvertiseTopicsOfInterest(); } WhisperPeer::~WhisperPeer() @@ -62,6 +62,8 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) if (session()->id() < host()->host()->id()) sendMessages(); + + noteAdvertiseTopicsOfInterest(); break; } case MessagesPacket: @@ -70,6 +72,11 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) host()->inject(Envelope(i), this); break; } + case TopicFilterPacket: + { + setBloom((FixedHash)_r[0]); + break; + } default: return false; } @@ -78,6 +85,9 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) void WhisperPeer::sendMessages() { + if (m_advertiseTopicsOfInterest) + sendTopicsOfInterest(host()->bloom()); + RLPStream amalg; unsigned msgCount = 0; { @@ -104,3 +114,15 @@ void WhisperPeer::noteNewMessage(h256 _h, Envelope const& _m) Guard l(x_unseen); m_unseen.insert(make_pair(rating(_m), _h)); } + +void WhisperPeer::sendTopicsOfInterest(FixedHash const& _bloom) +{ + DEV_GUARDED(x_advertiseTopicsOfInterest) + m_advertiseTopicsOfInterest = false; + + RLPStream s; + prep(s, TopicFilterPacket, 1); + s << _bloom; + sealAndSend(s); +} + diff --git a/libwhisper/WhisperPeer.h b/libwhisper/WhisperPeer.h index 5cb124568..1be2df97e 100644 --- a/libwhisper/WhisperPeer.h +++ b/libwhisper/WhisperPeer.h @@ -44,8 +44,6 @@ using p2p::HostCapability; using p2p::Capability; using p2p::CapDesc; -/** - */ class WhisperPeer: public Capability { friend class WhisperHost; @@ -53,25 +51,30 @@ class WhisperPeer: public Capability public: WhisperPeer(std::shared_ptr _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap); virtual ~WhisperPeer(); - + WhisperHost* host() const; static std::string name() { return "shh"; } - static u256 version() { return 2; } + static u256 version() { return WhisperProtocolVersion; } static unsigned messageCount() { return PacketCount; } - - WhisperHost* host() const; + FixedHash bloom() const { dev::Guard g(x_bloom); return m_bloom; } + void sendTopicsOfInterest(FixedHash const& _bloom); ///< sends our bloom filter to remote peer + void noteAdvertiseTopicsOfInterest() { dev::Guard g(x_advertiseTopicsOfInterest); m_advertiseTopicsOfInterest = true; } private: virtual bool interpret(unsigned _id, RLP const&) override; - void sendMessages(); - unsigned rating(Envelope const&) const { return 0; } // TODO void noteNewMessage(h256 _h, Envelope const& _m); + void setBloom(FixedHash const& _b) { dev::Guard g(x_bloom); m_bloom = _b; } mutable dev::Mutex x_unseen; std::multimap m_unseen; ///< Rated according to what they want. - std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now(); + + mutable dev::Mutex x_bloom; + FixedHash m_bloom; ///< Peer's topics of interest + + mutable dev::Mutex x_advertiseTopicsOfInterest; + bool m_advertiseTopicsOfInterest; }; } diff --git a/test/libwhisper/bloomFilter.cpp b/test/libwhisper/bloomFilter.cpp index e49473bdc..3e71ca305 100644 --- a/test/libwhisper/bloomFilter.cpp +++ b/test/libwhisper/bloomFilter.cpp @@ -28,8 +28,7 @@ using namespace dev; using namespace dev::shh; using TopicBloomFilterShort = TopicBloomFilterBase<4>; -using TopicBloomFilterLong = TopicBloomFilterBase<8>; -using TopicBloomFilterTest = TopicBloomFilterLong; +using TopicBloomFilterTest = TopicBloomFilterBase; void testAddNonExisting(TopicBloomFilterShort& _f, AbridgedTopic const& _h) { @@ -59,7 +58,7 @@ void testRemoveExistingBloom(TopicBloomFilterShort& _f, AbridgedTopic const& _h) BOOST_REQUIRE(!_f.containsBloom(_h)); } -int calculateExpected(TopicBloomFilterTest const& f, int const n) +double calculateExpected(TopicBloomFilterTest const& f, int n) { int const m = f.size * 8; // number of bits in the bloom int const k = f.BitsPerBloom; // number of hash functions (e.g. bits set to 1 in every bloom) @@ -77,10 +76,10 @@ int calculateExpected(TopicBloomFilterTest const& f, int const n) for (int i = 0; i < k; ++i) kBitsSet *= single; - return static_cast(kBitsSet * 100 + 0.5); // in percents, rounded up + return kBitsSet; } -void testFalsePositiveRate(TopicBloomFilterTest const& f, int const inserted, Topic& x) +double testFalsePositiveRate(TopicBloomFilterTest const& f, int inserted, Topic& x) { int const c_sampleSize = 1000; int falsePositive = 0; @@ -93,12 +92,14 @@ void testFalsePositiveRate(TopicBloomFilterTest const& f, int const inserted, To ++falsePositive; } - falsePositive /= (c_sampleSize / 100); // in percents - int expected = calculateExpected(f, inserted); - int allowed = expected + (expected / 5); // allow deviations ~20% + double res = double(falsePositive) / double(c_sampleSize); - //cnote << "Inserted: " << inserted << ", False Positive Rate: " << falsePositive << ", Expected: " << expected; - BOOST_REQUIRE(falsePositive <= allowed); + double expected = calculateExpected(f, inserted); + double allowed = expected * 1.2 + 0.05; // allow deviations ~25% + + //cnote << "Inserted: " << inserted << ", False Positive Rate: " << res << ", Expected: " << expected; + BOOST_REQUIRE(res <= allowed); + return expected; } BOOST_AUTO_TEST_SUITE(bloomFilter) @@ -111,11 +112,13 @@ BOOST_AUTO_TEST_CASE(falsePositiveRate) TopicBloomFilterTest f; Topic x(0xC0DEFEED); // deterministic pseudorandom value - for (int i = 1; i < 21; ++i) + double expectedRate = 0; + + for (int i = 1; i < 50 && isless(expectedRate, 0.5); ++i) { x = sha3(x); f.addBloom(AbridgedTopic(x)); - testFalsePositiveRate(f, i, x); + expectedRate = testFalsePositiveRate(f, i, x); } } diff --git a/test/libwhisper/whisperTopic.cpp b/test/libwhisper/whisperTopic.cpp index f09705ccb..a152f756e 100644 --- a/test/libwhisper/whisperTopic.cpp +++ b/test/libwhisper/whisperTopic.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -301,4 +302,76 @@ BOOST_AUTO_TEST_CASE(asyncforwarding) BOOST_REQUIRE_EQUAL(result, 1); } +BOOST_AUTO_TEST_CASE(topicAdvertising) +{ + if (test::Options::get().nonetwork) + return; + + cnote << "Testing Topic Advertising..."; + VerbosityHolder setTemporaryLevel(2); + + Host host1("first", NetworkPreferences("127.0.0.1", 30303, false)); + host1.setIdealPeerCount(1); + auto whost1 = host1.registerCapability(new WhisperHost()); + host1.start(); + while (!host1.haveNetwork()) + this_thread::sleep_for(chrono::milliseconds(10)); + + Host host2("second", NetworkPreferences("127.0.0.1", 30305, false)); + host2.setIdealPeerCount(1); + auto whost2 = host2.registerCapability(new WhisperHost()); + whost2->installWatch(BuildTopicMask("test2")); + + host2.start(); + while (!host2.haveNetwork()) + this_thread::sleep_for(chrono::milliseconds(10)); + + host1.addNode(host2.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); + while (!host1.haveNetwork()) + this_thread::sleep_for(chrono::milliseconds(10)); + + while (!host1.peerCount()) + this_thread::sleep_for(chrono::milliseconds(10)); + + while (!host2.peerCount()) + this_thread::sleep_for(chrono::milliseconds(10)); + + std::vector, std::shared_ptr>> sessions; + + for (int i = 0; i < 600; ++i) + { + sessions = whost1->peerSessions(); + if (!sessions.empty() && sessions.back().first->cap()->bloom()) + break; + else + this_thread::sleep_for(chrono::milliseconds(10)); + } + + BOOST_REQUIRE(sessions.size()); + FixedHash bf1 = sessions.back().first->cap()->bloom(); + FixedHash bf2 = whost2->bloom(); + BOOST_REQUIRE_EQUAL(bf1, bf2); + BOOST_REQUIRE(bf1); + BOOST_REQUIRE(!whost1->bloom()); + + whost1->installWatch(BuildTopicMask("test1")); + + for (int i = 0; i < 600; ++i) + { + sessions = whost2->peerSessions(); + if (!sessions.empty() && sessions.back().first->cap()->bloom()) + break; + else + this_thread::sleep_for(chrono::milliseconds(10)); + } + + BOOST_REQUIRE(sessions.size()); + BOOST_REQUIRE_EQUAL(sessions.back().second->id, host1.id()); + + bf2 = sessions.back().first->cap()->bloom(); + bf1 = whost1->bloom(); + BOOST_REQUIRE_EQUAL(bf1, bf2); + BOOST_REQUIRE(bf1); +} + BOOST_AUTO_TEST_SUITE_END()