Browse Source

Merge pull request #2511 from gluk256/_whisperDB_

Level DB support for Whisper
cl-refactor
Gav Wood 10 years ago
parent
commit
202d8570ea
  1. 11
      libwhisper/Message.h
  2. 21
      libwhisper/WhisperDB.cpp
  3. 3
      libwhisper/WhisperDB.h
  4. 41
      libwhisper/WhisperHost.cpp
  5. 1
      libwhisper/WhisperHost.h
  6. 9
      test/libp2p/peer.cpp
  7. 2
      test/libwhisper/whisperDB.cpp

11
libwhisper/Message.h

@ -64,10 +64,11 @@ public:
Envelope() {}
Envelope(RLP const& _m);
operator bool() const { return !!m_expiry; }
void streamRLP(RLPStream& _s, IncludeNonce _withNonce = WithNonce) const { _s.appendList(_withNonce ? 5 : 4) << m_expiry << m_ttl << m_topic << m_data; if (_withNonce) _s << m_nonce; }
h256 sha3(IncludeNonce _withNonce = WithNonce) const { RLPStream s; streamRLP(s, _withNonce); return dev::sha3(s.out()); }
Message open(Topics const& _t, Secret const& _s = Secret()) const;
unsigned workProved() const;
void proveWork(unsigned _ms);
unsigned sent() const { return m_expiry - m_ttl; }
unsigned expiry() const { return m_expiry; }
@ -75,12 +76,8 @@ public:
AbridgedTopics const& topic() const { return m_topic; }
bytes const& data() const { return m_data; }
Message open(Topics const& _t, Secret const& _s = Secret()) const;
unsigned workProved() const;
void proveWork(unsigned _ms);
bool matchesBloomFilter(TopicBloomFilterHash const& f) const;
bool isExpired() const { return m_expiry <= (unsigned)time(0); }
private:
Envelope(unsigned _exp, unsigned _ttl, AbridgedTopics const& _topic): m_expiry(_exp), m_ttl(_ttl), m_topic(_topic) {}

21
libwhisper/WhisperDB.cpp

@ -85,8 +85,8 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
op.fill_cache = false;
op.verify_checksums = true;
vector<string> wasted;
unsigned now = (unsigned)time(0);
unique_ptr<leveldb::Iterator> it(m_db->NewIterator(op));
unsigned const now = (unsigned)time(0);
for (it->SeekToFirst(); it->Valid(); it->Next())
{
@ -135,3 +135,22 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
}
}
void WhisperDB::save(h256 const& _key, Envelope const& _e)
{
try
{
RLPStream rlp;
_e.streamRLP(rlp);
bytes b;
rlp.swapOut(b);
insert(_key, b);
}
catch(RLPException const& ex)
{
cwarn << boost::diagnostic_information(ex);
}
catch(FailedInsertInLevelDB const& ex)
{
cwarn << boost::diagnostic_information(ex);
}
}

3
libwhisper/WhisperDB.h

@ -47,11 +47,14 @@ class WhisperDB
void insert(dev::h256 const& _key, bytes const& _value);
void kill(dev::h256 const& _key);
void loadAll(std::map<h256, Envelope>& o_dst);
void save(dev::h256 const& _key, Envelope const& _e);
private:
leveldb::ReadOptions m_readOptions;
leveldb::WriteOptions m_writeOptions;
std::unique_ptr<leveldb::DB> m_db;
enum MetaInformation { StoreForeverFlag = 1, WatchedFlag = 2 };
};
}

41
libwhisper/WhisperHost.cpp

@ -59,7 +59,7 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
cnote << this << ": inject: " << _m.expiry() << _m.ttl() << _m.topic() << toHex(_m.data());
if (_m.expiry() <= (unsigned)time(0))
if (_m.isExpired())
return;
auto h = _m.sha3();
@ -84,7 +84,7 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
for (auto const& f: m_filters)
if (f.second.filter.matches(_m))
for (auto& i: m_watches)
if (i.second.id == f.first)
if (i.second.id == f.first) // match one of the watches
{
i.second.changes.push_back(h);
rating += 2;
@ -204,6 +204,18 @@ void WhisperHost::noteAdvertiseTopicsOfInterest()
i.first->cap<WhisperPeer>().get()->noteAdvertiseTopicsOfInterest();
}
bool WhisperHost::isWatched(Envelope const& _e) const
{
DEV_GUARDED(m_filterLock)
if (_e.matchesBloomFilter(m_bloom))
for (auto const& f: m_filters)
if (f.second.filter.matches(_e))
for (auto const& i: m_watches)
if (i.second.id == f.first)
return true;
return false;
}
void WhisperHost::saveMessagesToBD()
{
if (!m_useDB)
@ -213,31 +225,16 @@ void WhisperHost::saveMessagesToBD()
{
WhisperDB db;
ReadGuard g(x_messages);
unsigned now = (unsigned)time(0);
for (auto const& m: m_messages)
{
RLPStream rlp;
m.second.streamRLP(rlp);
bytes b;
rlp.swapOut(b);
db.insert(m.first, b);
}
if (m.second.expiry() > now)
if (isWatched(m.second))
db.save(m.first, m.second);
}
catch(FailedToOpenLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to open DB:" << ex.what();
}
catch(FailedInsertInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to insert:" << ex.what();
}
catch(FailedLookupInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed lookup:" << ex.what();
}
catch(FailedDeleteInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to delete:" << ex.what();
}
catch(Exception const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD():" << ex.what();
@ -260,6 +257,8 @@ void WhisperHost::loadMessagesFromBD()
db.loadAll(m);
WriteGuard g(x_messages);
m_messages.swap(m);
for (auto const& msg: m)
m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first));
}
catch(Exception const& ex)
{

1
libwhisper/WhisperHost.h

@ -67,6 +67,7 @@ public:
protected:
virtual void doWork() override;
void noteAdvertiseTopicsOfInterest();
bool isWatched(Envelope const& _e) const;
private:
virtual void onStarting() override { startWorking(); }

9
test/libp2p/peer.cpp

@ -44,8 +44,8 @@ BOOST_AUTO_TEST_CASE(host)
return;
VerbosityHolder setTemporaryLevel(10);
NetworkPreferences host1prefs("127.0.0.1", 30301, false);
NetworkPreferences host2prefs("127.0.0.1", 30302, false);
NetworkPreferences host1prefs("127.0.0.1", 30311, false);
NetworkPreferences host2prefs("127.0.0.1", 30312, false);
Host host1("Test", host1prefs);
Host host2("Test", host2prefs);
host1.start();
@ -67,9 +67,8 @@ BOOST_AUTO_TEST_CASE(host)
for (int i = 0; i < 3000 && (!host1.peerCount() || !host2.peerCount()); i += step)
this_thread::sleep_for(chrono::milliseconds(step));
//Temporary disabled
//BOOST_REQUIRE_EQUAL(host1.peerCount(), 1);
//BOOST_REQUIRE_EQUAL(host2.peerCount(), 1);
BOOST_REQUIRE_EQUAL(host1.peerCount(), 1);
BOOST_REQUIRE_EQUAL(host2.peerCount(), 1);
}
BOOST_AUTO_TEST_CASE(networkConfig)

2
test/libwhisper/whisperDB.cpp

@ -141,6 +141,7 @@ BOOST_AUTO_TEST_CASE(messages)
auto wh = h.registerCapability(new WhisperHost(true));
preexisting = wh->all();
cnote << preexisting.size() << "preexisting messages in DB";
wh->installWatch(BuildTopic("test"));
for (unsigned i = 0; i < TestSize; ++i)
wh->post(us.sec(), RLPStream().append(i).out(), BuildTopic("test"), 0xFFFFF);
@ -152,6 +153,7 @@ BOOST_AUTO_TEST_CASE(messages)
p2p::Host h("Test");
auto wh = h.registerCapability(new WhisperHost(true));
map<h256, Envelope> m2 = wh->all();
wh->installWatch(BuildTopic("test"));
BOOST_REQUIRE_EQUAL(m1.size(), m2.size());
BOOST_REQUIRE_EQUAL(m1.size() - preexisting.size(), TestSize);

Loading…
Cancel
Save