From 5d32f8182092284a0ad9a24a2127a9ee9bdae045 Mon Sep 17 00:00:00 2001 From: Vlad Gluhovsky Date: Wed, 15 Jul 2015 17:39:56 +0200 Subject: [PATCH] Level DB support for WHisper completed --- libwhisper/WhisperDB.cpp | 58 ++++++++++++++++++++------------ libwhisper/WhisperHost.cpp | 8 ++++- libwhisper/WhisperHost.h | 10 +++--- test/libwhisper/whisperDB.cpp | 27 +++++++++++++++ test/libwhisper/whisperTopic.cpp | 30 +++++++---------- 5 files changed, 87 insertions(+), 46 deletions(-) diff --git a/libwhisper/WhisperDB.cpp b/libwhisper/WhisperDB.cpp index 43d775a96..dbafc9365 100644 --- a/libwhisper/WhisperDB.cpp +++ b/libwhisper/WhisperDB.cpp @@ -29,6 +29,7 @@ using namespace dev::shh; WhisperDB::WhisperDB() { + m_readOptions.verify_checksums = true; string path = dev::getDataDir("shh"); boost::filesystem::create_directories(path); leveldb::Options op; @@ -82,41 +83,54 @@ void WhisperDB::loadAll(std::map& o_dst) leveldb::ReadOptions op; op.fill_cache = false; op.verify_checksums = true; - vector wasted; + vector wasted; unsigned now = (unsigned)time(0); - leveldb::Iterator* it = m_db->NewIterator(op); + unique_ptr it(m_db->NewIterator(op)); for (it->SeekToFirst(); it->Valid(); it->Next()) { leveldb::Slice const k = it->key(); leveldb::Slice const v = it->value(); + bool useless = true; - bool useless = false; - RLP rlp((byte const*)v.data(), v.size()); - Envelope e(rlp); - h256 h2 = e.sha3(); - h256 h1; - - if (k.size() == h256::size) - h1 = h256((byte const*)k.data(), h256::ConstructFromPointer); - - if (h1 != h2) + try { - useless = true; - cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex(); + RLP rlp((byte const*)v.data(), v.size()); + Envelope e(rlp); + h256 h2 = e.sha3(); + h256 h1; + + if (k.size() == h256::size) + h1 = h256((byte const*)k.data(), h256::ConstructFromPointer); + + if (h1 != h2) + cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex(); + else if (e.expiry() > now) + { + o_dst[h1] = e; + useless = false; + } + } + catch(RLPException const& ex) + { + cwarn << "RLPException in WhisperDB::loadAll():" << ex.what(); + } + catch(Exception const& ex) + { + cwarn << "Exception in WhisperDB::loadAll():" << ex.what(); } - else if (e.expiry() <= now) - useless = true; if (useless) - wasted.push_back(k); - else - o_dst[h1] = e; + wasted.push_back(k.ToString()); } - delete it; + cdebug << "WhisperDB::loadAll(): loaded " << o_dst.size() << ", deleted " << wasted.size() << "messages"; - leveldb::WriteOptions woptions; for (auto k: wasted) - m_db->Delete(woptions, k); + { + leveldb::Status status = m_db->Delete(m_writeOptions, k); + if (!status.ok()) + cwarn << "Failed to delete an entry from Level DB:" << k; + } } + diff --git a/libwhisper/WhisperHost.cpp b/libwhisper/WhisperHost.cpp index 0202fcc24..82c173378 100644 --- a/libwhisper/WhisperHost.cpp +++ b/libwhisper/WhisperHost.cpp @@ -30,7 +30,7 @@ using namespace dev; using namespace dev::p2p; using namespace dev::shh; -WhisperHost::WhisperHost(): Worker("shh") +WhisperHost::WhisperHost(bool _useDB): Worker("shh"), m_useDB(_useDB) { loadMessagesFromBD(); } @@ -206,6 +206,9 @@ void WhisperHost::noteAdvertiseTopicsOfInterest() void WhisperHost::saveMessagesToBD() { + if (!m_useDB) + return; + try { WhisperDB db; @@ -247,6 +250,9 @@ void WhisperHost::saveMessagesToBD() void WhisperHost::loadMessagesFromBD() { + if (!m_useDB) + return; + try { map m; diff --git a/libwhisper/WhisperHost.h b/libwhisper/WhisperHost.h index a2ca28deb..2470ad68a 100644 --- a/libwhisper/WhisperHost.h +++ b/libwhisper/WhisperHost.h @@ -48,11 +48,10 @@ class WhisperHost: public HostCapability, public Interface, public friend class WhisperPeer; public: - WhisperHost(); + WhisperHost(bool _useDB = true); virtual ~WhisperHost(); unsigned protocolVersion() const { return WhisperProtocolVersion; } - /// remove old messages - void cleanup(); + void cleanup(); ///< remove old messages std::map all() const { dev::ReadGuard l(x_messages); return m_messages; } TopicBloomFilterHash bloom() const { dev::Guard l(m_filterLock); return m_bloom; } @@ -62,8 +61,7 @@ public: virtual void uninstallWatch(unsigned _watchId) override; virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } } virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } - /// returns IDs of messages, which match specific watch criteria - virtual h256s watchMessages(unsigned _watchId) override; + virtual h256s watchMessages(unsigned _watchId) override; ///< returns IDs of messages, which match specific watch criteria virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } } protected: @@ -85,6 +83,8 @@ private: std::map m_filters; std::map m_watches; TopicBloomFilter m_bloom; + + bool m_useDB; ///< needed for tests and other special cases }; } diff --git a/test/libwhisper/whisperDB.cpp b/test/libwhisper/whisperDB.cpp index 410420c24..1616dd427 100644 --- a/test/libwhisper/whisperDB.cpp +++ b/test/libwhisper/whisperDB.cpp @@ -124,6 +124,7 @@ BOOST_AUTO_TEST_CASE(persistence) BOOST_AUTO_TEST_CASE(messages) { cnote << "Testing load/save Whisper messages..."; + VerbosityHolder setTemporaryLevel(2); const unsigned TestSize = 3; map m1; map preexisting; @@ -173,4 +174,30 @@ BOOST_AUTO_TEST_CASE(messages) BOOST_REQUIRE_EQUAL(x, TestSize); } +BOOST_AUTO_TEST_CASE(corruptedData) +{ + cnote << "Testing corrupted data..."; + VerbosityHolder setTemporaryLevel(2); + map m; + h256 x = h256::random(); + + { + WhisperDB db; + db.insert(x, "this is a test input, representing corrupt data"); + } + + { + p2p::Host h("Test"); + auto wh = h.registerCapability(new WhisperHost()); + m = wh->all(); + BOOST_REQUIRE(m.end() == m.find(x)); + } + + { + WhisperDB db; + string s = db.lookup(x); + BOOST_REQUIRE(s.empty()); + } +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/test/libwhisper/whisperTopic.cpp b/test/libwhisper/whisperTopic.cpp index 8d0f603bb..ad438f3ba 100644 --- a/test/libwhisper/whisperTopic.cpp +++ b/test/libwhisper/whisperTopic.cpp @@ -216,30 +216,25 @@ BOOST_AUTO_TEST_CASE(asyncforwarding) cnote << "Testing Whisper async forwarding..."; VerbosityHolder setTemporaryLevel(2); - + unsigned const TestValue = 8456; unsigned result = 0; bool done = false; // Host must be configured not to share peers. Host host1("Forwarder", NetworkPreferences("127.0.0.1", 30305, false)); host1.setIdealPeerCount(1); - auto whost1 = host1.registerCapability(new WhisperHost()); + auto whost1 = host1.registerCapability(new WhisperHost(false)); host1.start(); while (!host1.haveNetwork()) this_thread::sleep_for(chrono::milliseconds(2)); + auto w = whost1->installWatch(BuildTopicMask("test")); // only interested in odd packets bool startedForwarder = false; std::thread forwarder([&]() { setThreadName("forwarder"); - - this_thread::sleep_for(chrono::milliseconds(500)); - + this_thread::sleep_for(chrono::milliseconds(50)); startedForwarder = true; - - /// Only interested in odd packets - auto w = whost1->installWatch(BuildTopicMask("test")); - while (!done) { for (auto i: whost1->checkWatch(w)) @@ -257,31 +252,30 @@ BOOST_AUTO_TEST_CASE(asyncforwarding) { Host host2("Sender", NetworkPreferences("127.0.0.1", 30300, false)); host2.setIdealPeerCount(1); - shared_ptr whost2 = host2.registerCapability(new WhisperHost()); + shared_ptr whost2 = host2.registerCapability(new WhisperHost(false)); host2.start(); while (!host2.haveNetwork()) this_thread::sleep_for(chrono::milliseconds(2)); - host2.addNode(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); - while (!host2.peerCount()) + host2.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); + while (!host2.peerCount() || !host1.peerCount()) this_thread::sleep_for(chrono::milliseconds(5)); KeyPair us = KeyPair::create(); - whost2->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test")); + whost2->post(us.sec(), RLPStream().append(TestValue).out(), BuildTopic("test"), 777000); this_thread::sleep_for(chrono::milliseconds(250)); } { Host ph("Listener", NetworkPreferences("127.0.0.1", 30300, false)); ph.setIdealPeerCount(1); - shared_ptr wh = ph.registerCapability(new WhisperHost()); + shared_ptr wh = ph.registerCapability(new WhisperHost(false)); ph.start(); while (!ph.haveNetwork()) this_thread::sleep_for(chrono::milliseconds(2)); - ph.addNode(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); - /// Only interested in odd packets - auto w = wh->installWatch(BuildTopicMask("test")); + auto w = wh->installWatch(BuildTopicMask("test")); // only interested in odd packets + ph.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); for (int i = 0; i < 200 && !result; ++i) { @@ -298,7 +292,7 @@ BOOST_AUTO_TEST_CASE(asyncforwarding) done = true; forwarder.join(); - BOOST_REQUIRE_EQUAL(result, 1); + BOOST_REQUIRE_EQUAL(result, TestValue); } BOOST_AUTO_TEST_CASE(topicAdvertising)