From 5e0fd432ca19a834cf2eca12b07681a47520a90c Mon Sep 17 00:00:00 2001 From: Vlad Gluhovsky Date: Sun, 19 Jul 2015 01:17:13 +0200 Subject: [PATCH 1/3] Message save/load functionality upgraded --- libwhisper/Message.h | 19 ++++++++----- libwhisper/WhisperDB.cpp | 58 ++++++++++++++++++++++++++++++++++++-- libwhisper/WhisperDB.h | 6 ++++ libwhisper/WhisperHost.cpp | 49 +++++++++++++++++--------------- test/libp2p/peer.cpp | 9 +++--- 5 files changed, 103 insertions(+), 38 deletions(-) diff --git a/libwhisper/Message.h b/libwhisper/Message.h index ae61f210a..8953d1200 100644 --- a/libwhisper/Message.h +++ b/libwhisper/Message.h @@ -64,10 +64,11 @@ public: Envelope() {} Envelope(RLP const& _m); - operator bool() const { return !!m_expiry; } - void streamRLP(RLPStream& _s, IncludeNonce _withNonce = WithNonce) const { _s.appendList(_withNonce ? 5 : 4) << m_expiry << m_ttl << m_topic << m_data; if (_withNonce) _s << m_nonce; } h256 sha3(IncludeNonce _withNonce = WithNonce) const { RLPStream s; streamRLP(s, _withNonce); return dev::sha3(s.out()); } + Message open(Topics const& _t, Secret const& _s = Secret()) const; + unsigned workProved() const; + void proveWork(unsigned _ms); unsigned sent() const { return m_expiry - m_ttl; } unsigned expiry() const { return m_expiry; } @@ -75,12 +76,12 @@ public: AbridgedTopics const& topic() const { return m_topic; } bytes const& data() const { return m_data; } - Message open(Topics const& _t, Secret const& _s = Secret()) const; - - unsigned workProved() const; - void proveWork(unsigned _ms); - bool matchesBloomFilter(TopicBloomFilterHash const& f) const; + void setStoreForever() { m_storeForever = true; } + bool isStoreForever() const { return m_storeForever; } + bool isExpired() const { return !m_storeForever && m_expiry <= (unsigned)time(0); } + void setWatched() { m_watched = true; } + bool isWatched() const { return m_watched; } private: Envelope(unsigned _exp, unsigned _ttl, AbridgedTopics const& _topic): m_expiry(_exp), m_ttl(_ttl), m_topic(_topic) {} @@ -91,6 +92,10 @@ private: AbridgedTopics m_topic; bytes m_data; + + /// Metainformation + bool m_storeForever = false; + bool m_watched = false; }; enum /*Message Flags*/ diff --git a/libwhisper/WhisperDB.cpp b/libwhisper/WhisperDB.cpp index 7104723d7..9c4c77f43 100644 --- a/libwhisper/WhisperDB.cpp +++ b/libwhisper/WhisperDB.cpp @@ -84,7 +84,6 @@ void WhisperDB::loadAll(std::map& o_dst) op.fill_cache = false; op.verify_checksums = true; vector wasted; - unsigned now = (unsigned)time(0); unique_ptr it(m_db->NewIterator(op)); for (it->SeekToFirst(); it->Valid(); it->Next()) @@ -96,16 +95,18 @@ void WhisperDB::loadAll(std::map& o_dst) try { RLP rlp((byte const*)v.data(), v.size()); - Envelope e(rlp); + Envelope e(rlp[1]); h256 h2 = e.sha3(); h256 h1; + readMetaInfo(rlp[0], e); + if (k.size() == h256::size) h1 = h256((byte const*)k.data(), h256::ConstructFromPointer); if (h1 != h2) cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex(); - else if (e.expiry() > now) + else if (!e.isExpired()) { o_dst[h1] = e; useless = false; @@ -134,3 +135,54 @@ void WhisperDB::loadAll(std::map& o_dst) } } +void WhisperDB::save(h256 const& _key, Envelope const& _e) +{ + try + { + RLPStream meta; + streamMetaInfo(meta, _e); + + RLPStream msg; + _e.streamRLP(msg); + + RLPStream res(2); + res.appendRaw(meta.out()); + res.appendRaw(msg.out()); + + bytes b; + res.swapOut(b); + insert(_key, b); + } + catch(RLPException const& ex) + { + cwarn << "RLPException in WhisperDB::save():" << ex.what(); + } + catch(FailedInsertInLevelDB const& ex) + { + cwarn << "Exception in WhisperDB::save() - failed to insert:" << ex.what(); + } +} + +void WhisperDB::streamMetaInfo(RLPStream& _rlp, Envelope const& _e) +{ + uint32_t x = 0; + + if (_e.isStoreForever()) + x |= StoreForeverFlag; + + if (_e.isWatched()) + x |= WatchedFlag; + + _rlp.append(x); +} + +void WhisperDB::readMetaInfo(RLP const& _rlp, Envelope& _e) +{ + unsigned x = (uint32_t)_rlp; + + if (x & StoreForeverFlag) + _e.setStoreForever(); + + if (x & WatchedFlag) + _e.setWatched(); +} diff --git a/libwhisper/WhisperDB.h b/libwhisper/WhisperDB.h index 5079dfeb4..1fc20b9ab 100644 --- a/libwhisper/WhisperDB.h +++ b/libwhisper/WhisperDB.h @@ -47,11 +47,17 @@ class WhisperDB void insert(dev::h256 const& _key, bytes const& _value); void kill(dev::h256 const& _key); void loadAll(std::map& o_dst); + void save(dev::h256 const& _key, Envelope const& _e); private: + void streamMetaInfo(RLPStream& _rlp, Envelope const& _e); + void readMetaInfo(RLP const& _rlp, Envelope& _e); + leveldb::ReadOptions m_readOptions; leveldb::WriteOptions m_writeOptions; std::unique_ptr m_db; + + enum MetaInformation { StoreForeverFlag = 1, WatchedFlag = 2 }; }; } diff --git a/libwhisper/WhisperHost.cpp b/libwhisper/WhisperHost.cpp index 82c173378..781e6d73f 100644 --- a/libwhisper/WhisperHost.cpp +++ b/libwhisper/WhisperHost.cpp @@ -59,7 +59,7 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) cnote << this << ": inject: " << _m.expiry() << _m.ttl() << _m.topic() << toHex(_m.data()); - if (_m.expiry() <= (unsigned)time(0)) + if (_m.isExpired()) return; auto h = _m.sha3(); @@ -69,13 +69,15 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) return; UpgradeGuard ll(l); m_messages[h] = _m; - m_expiryQueue.insert(make_pair(_m.expiry(), h)); + if (!_m.isStoreForever()) + m_expiryQueue.insert(make_pair(_m.expiry(), h)); } // rating of incoming message from remote host is assessed according to the following criteria: // 1. installed watch match; 2. bloom filter match; 2. ttl; 3. proof of work int rating = 0; + bool match_watch = false; DEV_GUARDED(m_filterLock) if (_m.matchesBloomFilter(m_bloom)) @@ -84,10 +86,11 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) for (auto const& f: m_filters) if (f.second.filter.matches(_m)) for (auto& i: m_watches) - if (i.second.id == f.first) + if (i.second.id == f.first) // match one of the watches { i.second.changes.push_back(h); rating += 2; + match_watch = true; } } @@ -100,6 +103,14 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) rating += _m.workProved(); } + if (match_watch) + { + WriteGuard g(x_messages); + auto j = m_messages.find(h); + if (m_messages.end() != j) + j->second.setWatched(); + } + // TODO p2p: capability-based rating for (auto i: peerSessions()) { @@ -195,7 +206,12 @@ void WhisperHost::cleanup() unsigned now = (unsigned)time(0); WriteGuard l(x_messages); for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it)) - m_messages.erase(it->second); + { + auto j = m_messages.find(it->second); + if (j != m_messages.end()) + if (!j->second.isStoreForever()) + m_messages.erase(it->second); + } } void WhisperHost::noteAdvertiseTopicsOfInterest() @@ -213,31 +229,15 @@ void WhisperHost::saveMessagesToBD() { WhisperDB db; ReadGuard g(x_messages); + unsigned now = (unsigned)time(0); for (auto const& m: m_messages) - { - RLPStream rlp; - m.second.streamRLP(rlp); - bytes b; - rlp.swapOut(b); - db.insert(m.first, b); - } + if (m.second.isStoreForever() || (m.second.expiry() > now && m.second.isWatched())) + db.save(m.first, m.second); } catch(FailedToOpenLevelDB const& ex) { cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to open DB:" << ex.what(); } - catch(FailedInsertInLevelDB const& ex) - { - cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to insert:" << ex.what(); - } - catch(FailedLookupInLevelDB const& ex) - { - cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed lookup:" << ex.what(); - } - catch(FailedDeleteInLevelDB const& ex) - { - cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to delete:" << ex.what(); - } catch(Exception const& ex) { cwarn << "Exception in WhisperHost::saveMessagesToBD():" << ex.what(); @@ -260,6 +260,9 @@ void WhisperHost::loadMessagesFromBD() db.loadAll(m); WriteGuard g(x_messages); m_messages.swap(m); + for (auto const& msg: m) + if (!msg.second.isStoreForever()) + m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first)); } catch(Exception const& ex) { diff --git a/test/libp2p/peer.cpp b/test/libp2p/peer.cpp index 03cac3c80..acc420c96 100644 --- a/test/libp2p/peer.cpp +++ b/test/libp2p/peer.cpp @@ -44,8 +44,8 @@ BOOST_AUTO_TEST_CASE(host) return; VerbosityHolder setTemporaryLevel(10); - NetworkPreferences host1prefs("127.0.0.1", 30301, false); - NetworkPreferences host2prefs("127.0.0.1", 30302, false); + NetworkPreferences host1prefs("127.0.0.1", 30311, false); + NetworkPreferences host2prefs("127.0.0.1", 30312, false); Host host1("Test", host1prefs); Host host2("Test", host2prefs); host1.start(); @@ -67,9 +67,8 @@ BOOST_AUTO_TEST_CASE(host) for (int i = 0; i < 3000 && (!host1.peerCount() || !host2.peerCount()); i += step) this_thread::sleep_for(chrono::milliseconds(step)); - //Temporary disabled - //BOOST_REQUIRE_EQUAL(host1.peerCount(), 1); - //BOOST_REQUIRE_EQUAL(host2.peerCount(), 1); + BOOST_REQUIRE_EQUAL(host1.peerCount(), 1); + BOOST_REQUIRE_EQUAL(host2.peerCount(), 1); } BOOST_AUTO_TEST_CASE(networkConfig) From 2fa7375720f88d2ba83a3227b3f41f7bc5f3d779 Mon Sep 17 00:00:00 2001 From: Vlad Gluhovsky Date: Sun, 19 Jul 2015 13:00:37 +0200 Subject: [PATCH 2/3] installWatch() call in test --- test/libwhisper/whisperDB.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/libwhisper/whisperDB.cpp b/test/libwhisper/whisperDB.cpp index 0639d9844..6b96410a0 100644 --- a/test/libwhisper/whisperDB.cpp +++ b/test/libwhisper/whisperDB.cpp @@ -141,6 +141,7 @@ BOOST_AUTO_TEST_CASE(messages) auto wh = h.registerCapability(new WhisperHost(true)); preexisting = wh->all(); cnote << preexisting.size() << "preexisting messages in DB"; + wh->installWatch(BuildTopic("test")); for (unsigned i = 0; i < TestSize; ++i) wh->post(us.sec(), RLPStream().append(i).out(), BuildTopic("test"), 0xFFFFF); @@ -152,6 +153,7 @@ BOOST_AUTO_TEST_CASE(messages) p2p::Host h("Test"); auto wh = h.registerCapability(new WhisperHost(true)); map m2 = wh->all(); + wh->installWatch(BuildTopic("test")); BOOST_REQUIRE_EQUAL(m1.size(), m2.size()); BOOST_REQUIRE_EQUAL(m1.size() - preexisting.size(), TestSize); From 184c1b36cbbf6c017878f585c1589cd7f5007eb1 Mon Sep 17 00:00:00 2001 From: Vlad Gluhovsky Date: Mon, 20 Jul 2015 18:14:02 +0200 Subject: [PATCH 3/3] metainformation removed --- libwhisper/Message.h | 10 +------- libwhisper/WhisperDB.cpp | 49 +++++++------------------------------- libwhisper/WhisperDB.h | 3 --- libwhisper/WhisperHost.cpp | 40 ++++++++++++++----------------- libwhisper/WhisperHost.h | 1 + 5 files changed, 28 insertions(+), 75 deletions(-) diff --git a/libwhisper/Message.h b/libwhisper/Message.h index 8953d1200..4d8b34548 100644 --- a/libwhisper/Message.h +++ b/libwhisper/Message.h @@ -77,11 +77,7 @@ public: bytes const& data() const { return m_data; } bool matchesBloomFilter(TopicBloomFilterHash const& f) const; - void setStoreForever() { m_storeForever = true; } - bool isStoreForever() const { return m_storeForever; } - bool isExpired() const { return !m_storeForever && m_expiry <= (unsigned)time(0); } - void setWatched() { m_watched = true; } - bool isWatched() const { return m_watched; } + bool isExpired() const { return m_expiry <= (unsigned)time(0); } private: Envelope(unsigned _exp, unsigned _ttl, AbridgedTopics const& _topic): m_expiry(_exp), m_ttl(_ttl), m_topic(_topic) {} @@ -92,10 +88,6 @@ private: AbridgedTopics m_topic; bytes m_data; - - /// Metainformation - bool m_storeForever = false; - bool m_watched = false; }; enum /*Message Flags*/ diff --git a/libwhisper/WhisperDB.cpp b/libwhisper/WhisperDB.cpp index 9c4c77f43..327ff1521 100644 --- a/libwhisper/WhisperDB.cpp +++ b/libwhisper/WhisperDB.cpp @@ -85,6 +85,7 @@ void WhisperDB::loadAll(std::map& o_dst) op.verify_checksums = true; vector wasted; unique_ptr it(m_db->NewIterator(op)); + unsigned const now = (unsigned)time(0); for (it->SeekToFirst(); it->Valid(); it->Next()) { @@ -95,18 +96,16 @@ void WhisperDB::loadAll(std::map& o_dst) try { RLP rlp((byte const*)v.data(), v.size()); - Envelope e(rlp[1]); + Envelope e(rlp); h256 h2 = e.sha3(); h256 h1; - readMetaInfo(rlp[0], e); - if (k.size() == h256::size) h1 = h256((byte const*)k.data(), h256::ConstructFromPointer); if (h1 != h2) cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex(); - else if (!e.isExpired()) + else if (e.expiry() > now) { o_dst[h1] = e; useless = false; @@ -139,50 +138,18 @@ void WhisperDB::save(h256 const& _key, Envelope const& _e) { try { - RLPStream meta; - streamMetaInfo(meta, _e); - - RLPStream msg; - _e.streamRLP(msg); - - RLPStream res(2); - res.appendRaw(meta.out()); - res.appendRaw(msg.out()); - + RLPStream rlp; + _e.streamRLP(rlp); bytes b; - res.swapOut(b); + rlp.swapOut(b); insert(_key, b); } catch(RLPException const& ex) { - cwarn << "RLPException in WhisperDB::save():" << ex.what(); + cwarn << boost::diagnostic_information(ex); } catch(FailedInsertInLevelDB const& ex) { - cwarn << "Exception in WhisperDB::save() - failed to insert:" << ex.what(); + cwarn << boost::diagnostic_information(ex); } } - -void WhisperDB::streamMetaInfo(RLPStream& _rlp, Envelope const& _e) -{ - uint32_t x = 0; - - if (_e.isStoreForever()) - x |= StoreForeverFlag; - - if (_e.isWatched()) - x |= WatchedFlag; - - _rlp.append(x); -} - -void WhisperDB::readMetaInfo(RLP const& _rlp, Envelope& _e) -{ - unsigned x = (uint32_t)_rlp; - - if (x & StoreForeverFlag) - _e.setStoreForever(); - - if (x & WatchedFlag) - _e.setWatched(); -} diff --git a/libwhisper/WhisperDB.h b/libwhisper/WhisperDB.h index 1fc20b9ab..f56ae1005 100644 --- a/libwhisper/WhisperDB.h +++ b/libwhisper/WhisperDB.h @@ -50,9 +50,6 @@ class WhisperDB void save(dev::h256 const& _key, Envelope const& _e); private: - void streamMetaInfo(RLPStream& _rlp, Envelope const& _e); - void readMetaInfo(RLP const& _rlp, Envelope& _e); - leveldb::ReadOptions m_readOptions; leveldb::WriteOptions m_writeOptions; std::unique_ptr m_db; diff --git a/libwhisper/WhisperHost.cpp b/libwhisper/WhisperHost.cpp index 781e6d73f..1f5d28bcc 100644 --- a/libwhisper/WhisperHost.cpp +++ b/libwhisper/WhisperHost.cpp @@ -69,15 +69,13 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) return; UpgradeGuard ll(l); m_messages[h] = _m; - if (!_m.isStoreForever()) - m_expiryQueue.insert(make_pair(_m.expiry(), h)); + m_expiryQueue.insert(make_pair(_m.expiry(), h)); } // rating of incoming message from remote host is assessed according to the following criteria: // 1. installed watch match; 2. bloom filter match; 2. ttl; 3. proof of work int rating = 0; - bool match_watch = false; DEV_GUARDED(m_filterLock) if (_m.matchesBloomFilter(m_bloom)) @@ -90,7 +88,6 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) { i.second.changes.push_back(h); rating += 2; - match_watch = true; } } @@ -103,14 +100,6 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) rating += _m.workProved(); } - if (match_watch) - { - WriteGuard g(x_messages); - auto j = m_messages.find(h); - if (m_messages.end() != j) - j->second.setWatched(); - } - // TODO p2p: capability-based rating for (auto i: peerSessions()) { @@ -206,12 +195,7 @@ void WhisperHost::cleanup() unsigned now = (unsigned)time(0); WriteGuard l(x_messages); for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it)) - { - auto j = m_messages.find(it->second); - if (j != m_messages.end()) - if (!j->second.isStoreForever()) - m_messages.erase(it->second); - } + m_messages.erase(it->second); } void WhisperHost::noteAdvertiseTopicsOfInterest() @@ -220,6 +204,18 @@ void WhisperHost::noteAdvertiseTopicsOfInterest() i.first->cap().get()->noteAdvertiseTopicsOfInterest(); } +bool WhisperHost::isWatched(Envelope const& _e) const +{ + DEV_GUARDED(m_filterLock) + if (_e.matchesBloomFilter(m_bloom)) + for (auto const& f: m_filters) + if (f.second.filter.matches(_e)) + for (auto const& i: m_watches) + if (i.second.id == f.first) + return true; + return false; +} + void WhisperHost::saveMessagesToBD() { if (!m_useDB) @@ -231,8 +227,9 @@ void WhisperHost::saveMessagesToBD() ReadGuard g(x_messages); unsigned now = (unsigned)time(0); for (auto const& m: m_messages) - if (m.second.isStoreForever() || (m.second.expiry() > now && m.second.isWatched())) - db.save(m.first, m.second); + if (m.second.expiry() > now) + if (isWatched(m.second)) + db.save(m.first, m.second); } catch(FailedToOpenLevelDB const& ex) { @@ -261,8 +258,7 @@ void WhisperHost::loadMessagesFromBD() WriteGuard g(x_messages); m_messages.swap(m); for (auto const& msg: m) - if (!msg.second.isStoreForever()) - m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first)); + m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first)); } catch(Exception const& ex) { diff --git a/libwhisper/WhisperHost.h b/libwhisper/WhisperHost.h index 20b2d6e7a..d9a71b6b2 100644 --- a/libwhisper/WhisperHost.h +++ b/libwhisper/WhisperHost.h @@ -67,6 +67,7 @@ public: protected: virtual void doWork() override; void noteAdvertiseTopicsOfInterest(); + bool isWatched(Envelope const& _e) const; private: virtual void onStarting() override { startWorking(); }