Browse Source

Level DB support for WHisper completed

cl-refactor
Vlad Gluhovsky 10 years ago
parent
commit
5d32f81820
  1. 58
      libwhisper/WhisperDB.cpp
  2. 8
      libwhisper/WhisperHost.cpp
  3. 10
      libwhisper/WhisperHost.h
  4. 27
      test/libwhisper/whisperDB.cpp
  5. 30
      test/libwhisper/whisperTopic.cpp

58
libwhisper/WhisperDB.cpp

@ -29,6 +29,7 @@ using namespace dev::shh;
WhisperDB::WhisperDB()
{
m_readOptions.verify_checksums = true;
string path = dev::getDataDir("shh");
boost::filesystem::create_directories(path);
leveldb::Options op;
@ -82,41 +83,54 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
leveldb::ReadOptions op;
op.fill_cache = false;
op.verify_checksums = true;
vector<leveldb::Slice> wasted;
vector<string> wasted;
unsigned now = (unsigned)time(0);
leveldb::Iterator* it = m_db->NewIterator(op);
unique_ptr<leveldb::Iterator> it(m_db->NewIterator(op));
for (it->SeekToFirst(); it->Valid(); it->Next())
{
leveldb::Slice const k = it->key();
leveldb::Slice const v = it->value();
bool useless = true;
bool useless = false;
RLP rlp((byte const*)v.data(), v.size());
Envelope e(rlp);
h256 h2 = e.sha3();
h256 h1;
if (k.size() == h256::size)
h1 = h256((byte const*)k.data(), h256::ConstructFromPointer);
if (h1 != h2)
try
{
useless = true;
cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex();
RLP rlp((byte const*)v.data(), v.size());
Envelope e(rlp);
h256 h2 = e.sha3();
h256 h1;
if (k.size() == h256::size)
h1 = h256((byte const*)k.data(), h256::ConstructFromPointer);
if (h1 != h2)
cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex();
else if (e.expiry() > now)
{
o_dst[h1] = e;
useless = false;
}
}
catch(RLPException const& ex)
{
cwarn << "RLPException in WhisperDB::loadAll():" << ex.what();
}
catch(Exception const& ex)
{
cwarn << "Exception in WhisperDB::loadAll():" << ex.what();
}
else if (e.expiry() <= now)
useless = true;
if (useless)
wasted.push_back(k);
else
o_dst[h1] = e;
wasted.push_back(k.ToString());
}
delete it;
cdebug << "WhisperDB::loadAll(): loaded " << o_dst.size() << ", deleted " << wasted.size() << "messages";
leveldb::WriteOptions woptions;
for (auto k: wasted)
m_db->Delete(woptions, k);
{
leveldb::Status status = m_db->Delete(m_writeOptions, k);
if (!status.ok())
cwarn << "Failed to delete an entry from Level DB:" << k;
}
}

8
libwhisper/WhisperHost.cpp

@ -30,7 +30,7 @@ using namespace dev;
using namespace dev::p2p;
using namespace dev::shh;
WhisperHost::WhisperHost(): Worker("shh")
WhisperHost::WhisperHost(bool _useDB): Worker("shh"), m_useDB(_useDB)
{
loadMessagesFromBD();
}
@ -206,6 +206,9 @@ void WhisperHost::noteAdvertiseTopicsOfInterest()
void WhisperHost::saveMessagesToBD()
{
if (!m_useDB)
return;
try
{
WhisperDB db;
@ -247,6 +250,9 @@ void WhisperHost::saveMessagesToBD()
void WhisperHost::loadMessagesFromBD()
{
if (!m_useDB)
return;
try
{
map<h256, Envelope> m;

10
libwhisper/WhisperHost.h

@ -48,11 +48,10 @@ class WhisperHost: public HostCapability<WhisperPeer>, public Interface, public
friend class WhisperPeer;
public:
WhisperHost();
WhisperHost(bool _useDB = true);
virtual ~WhisperHost();
unsigned protocolVersion() const { return WhisperProtocolVersion; }
/// remove old messages
void cleanup();
void cleanup(); ///< remove old messages
std::map<h256, Envelope> all() const { dev::ReadGuard l(x_messages); return m_messages; }
TopicBloomFilterHash bloom() const { dev::Guard l(m_filterLock); return m_bloom; }
@ -62,8 +61,7 @@ public:
virtual void uninstallWatch(unsigned _watchId) override;
virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } }
virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; }
/// returns IDs of messages, which match specific watch criteria
virtual h256s watchMessages(unsigned _watchId) override;
virtual h256s watchMessages(unsigned _watchId) override; ///< returns IDs of messages, which match specific watch criteria
virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } }
protected:
@ -85,6 +83,8 @@ private:
std::map<h256, InstalledFilter> m_filters;
std::map<unsigned, ClientWatch> m_watches;
TopicBloomFilter m_bloom;
bool m_useDB; ///< needed for tests and other special cases
};
}

27
test/libwhisper/whisperDB.cpp

@ -124,6 +124,7 @@ BOOST_AUTO_TEST_CASE(persistence)
BOOST_AUTO_TEST_CASE(messages)
{
cnote << "Testing load/save Whisper messages...";
VerbosityHolder setTemporaryLevel(2);
const unsigned TestSize = 3;
map<h256, Envelope> m1;
map<h256, Envelope> preexisting;
@ -173,4 +174,30 @@ BOOST_AUTO_TEST_CASE(messages)
BOOST_REQUIRE_EQUAL(x, TestSize);
}
BOOST_AUTO_TEST_CASE(corruptedData)
{
cnote << "Testing corrupted data...";
VerbosityHolder setTemporaryLevel(2);
map<h256, Envelope> m;
h256 x = h256::random();
{
WhisperDB db;
db.insert(x, "this is a test input, representing corrupt data");
}
{
p2p::Host h("Test");
auto wh = h.registerCapability(new WhisperHost());
m = wh->all();
BOOST_REQUIRE(m.end() == m.find(x));
}
{
WhisperDB db;
string s = db.lookup(x);
BOOST_REQUIRE(s.empty());
}
}
BOOST_AUTO_TEST_SUITE_END()

30
test/libwhisper/whisperTopic.cpp

@ -216,30 +216,25 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
cnote << "Testing Whisper async forwarding...";
VerbosityHolder setTemporaryLevel(2);
unsigned const TestValue = 8456;
unsigned result = 0;
bool done = false;
// Host must be configured not to share peers.
Host host1("Forwarder", NetworkPreferences("127.0.0.1", 30305, false));
host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost());
auto whost1 = host1.registerCapability(new WhisperHost(false));
host1.start();
while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(2));
auto w = whost1->installWatch(BuildTopicMask("test")); // only interested in odd packets
bool startedForwarder = false;
std::thread forwarder([&]()
{
setThreadName("forwarder");
this_thread::sleep_for(chrono::milliseconds(500));
this_thread::sleep_for(chrono::milliseconds(50));
startedForwarder = true;
/// Only interested in odd packets
auto w = whost1->installWatch(BuildTopicMask("test"));
while (!done)
{
for (auto i: whost1->checkWatch(w))
@ -257,31 +252,30 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
{
Host host2("Sender", NetworkPreferences("127.0.0.1", 30300, false));
host2.setIdealPeerCount(1);
shared_ptr<WhisperHost> whost2 = host2.registerCapability(new WhisperHost());
shared_ptr<WhisperHost> whost2 = host2.registerCapability(new WhisperHost(false));
host2.start();
while (!host2.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(2));
host2.addNode(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305));
while (!host2.peerCount())
host2.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305));
while (!host2.peerCount() || !host1.peerCount())
this_thread::sleep_for(chrono::milliseconds(5));
KeyPair us = KeyPair::create();
whost2->post(us.sec(), RLPStream().append(1).out(), BuildTopic("test"));
whost2->post(us.sec(), RLPStream().append(TestValue).out(), BuildTopic("test"), 777000);
this_thread::sleep_for(chrono::milliseconds(250));
}
{
Host ph("Listener", NetworkPreferences("127.0.0.1", 30300, false));
ph.setIdealPeerCount(1);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost(false));
ph.start();
while (!ph.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(2));
ph.addNode(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305));
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("test"));
auto w = wh->installWatch(BuildTopicMask("test")); // only interested in odd packets
ph.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305));
for (int i = 0; i < 200 && !result; ++i)
{
@ -298,7 +292,7 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
done = true;
forwarder.join();
BOOST_REQUIRE_EQUAL(result, 1);
BOOST_REQUIRE_EQUAL(result, TestValue);
}
BOOST_AUTO_TEST_CASE(topicAdvertising)

Loading…
Cancel
Save