Browse Source

message serialization

cl-refactor
Vlad Gluhovsky 9 years ago
parent
commit
6eb214666e
  1. 51
      libwhisper/WhisperDB.cpp
  2. 3
      libwhisper/WhisperDB.h
  3. 66
      libwhisper/WhisperHost.cpp
  4. 2
      libwhisper/WhisperHost.h

51
libwhisper/WhisperDB.cpp

@ -60,6 +60,15 @@ void WhisperDB::insert(dev::h256 const& _key, string const& _value)
BOOST_THROW_EXCEPTION(FailedInsertInLevelDB(status.ToString()));
}
void WhisperDB::insert(dev::h256 const& _key, bytes const& _value)
{
leveldb::Slice k((char const*)_key.data(), _key.size);
leveldb::Slice v((char const*)_value.data(), _value.size());
leveldb::Status status = m_db->Put(m_writeOptions, k, v);
if (!status.ok())
BOOST_THROW_EXCEPTION(FailedInsertInLevelDB(status.ToString()));
}
void WhisperDB::kill(dev::h256 const& _key)
{
leveldb::Slice slice((char const*)_key.data(), _key.size);
@ -67,3 +76,45 @@ void WhisperDB::kill(dev::h256 const& _key)
if (!status.ok())
BOOST_THROW_EXCEPTION(FailedDeleteInLevelDB(status.ToString()));
}
void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
{
leveldb::ReadOptions op;
op.fill_cache = false;
op.verify_checksums = true;
vector<leveldb::Slice> wasted;
unsigned now = (unsigned)time(0);
leveldb::Iterator* it = m_db->NewIterator(op);
for (it->SeekToFirst(); it->Valid(); it->Next())
{
leveldb::Slice const k = it->key();
leveldb::Slice const v = it->value();
bool useless = false;
RLP rlp((byte const*)v.data(), v.size());
Envelope e(rlp);
h256 h2 = e.sha3();
h256 h1;
if (k.size() == h256::size)
h1 = h256((byte const*)k.data(), h256::ConstructFromPointer);
if (h1 != h2)
{
useless = true;
cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex();
}
else if (e.expiry() <= now)
useless = true;
if (useless)
wasted.push_back(k);
else
o_dst[h1] = e;
}
leveldb::WriteOptions woptions;
for (auto k: wasted)
m_db->Delete(woptions, k);
}

3
libwhisper/WhisperDB.h

@ -24,6 +24,7 @@
#include <libdevcore/db.h>
#include <libdevcore/FixedHash.h>
#include "Common.h"
#include "Message.h"
namespace dev
{
@ -43,7 +44,9 @@ class WhisperDB
std::string lookup(dev::h256 const& _key) const;
void insert(dev::h256 const& _key, std::string const& _value);
void insert(dev::h256 const& _key, bytes const& _value);
void kill(dev::h256 const& _key);
void loadAll(std::map<h256, Envelope>& o_dst);
private:
leveldb::ReadOptions m_readOptions;

66
libwhisper/WhisperHost.cpp

@ -20,10 +20,11 @@
*/
#include "WhisperHost.h"
#include <libdevcore/CommonIO.h>
#include <libdevcore/Log.h>
#include <libp2p/All.h>
#include "WhisperDB.h"
using namespace std;
using namespace dev;
using namespace dev::p2p;
@ -31,10 +32,12 @@ using namespace dev::shh;
WhisperHost::WhisperHost(): Worker("shh")
{
loadMessagesFromBD();
}
WhisperHost::~WhisperHost()
{
saveMessagesToBD();
}
void WhisperHost::streamMessage(h256 _m, RLPStream& _s) const
@ -200,3 +203,64 @@ void WhisperHost::noteAdvertiseTopicsOfInterest()
for (auto i: peerSessions())
i.first->cap<WhisperPeer>().get()->noteAdvertiseTopicsOfInterest();
}
void WhisperHost::saveMessagesToBD()
{
try
{
WhisperDB db;
ReadGuard g(x_messages);
for (auto const& m: m_messages)
{
RLPStream rlp;
m.second.streamRLP(rlp);
bytes b;
rlp.swapOut(b);
db.insert(m.first, b);
}
}
catch(FailedToOpenLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to open DB:" << ex.what();
}
catch(FailedInsertInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to insert:" << ex.what();
}
catch(FailedLookupInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed lookup:" << ex.what();
}
catch(FailedDeleteInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to delete:" << ex.what();
}
catch(Exception const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD():" << ex.what();
}
catch(...)
{
cwarn << "Unknown Exception in WhisperHost::saveMessagesToBD()";
}
}
void WhisperHost::loadMessagesFromBD()
{
try
{
map<h256, Envelope> m;
WhisperDB db;
db.loadAll(m);
WriteGuard g(x_messages);
m_messages.swap(m);
}
catch(Exception const& ex)
{
cwarn << "Exception in WhisperHost::loadMessagesFromBD():" << ex.what();
}
catch(...)
{
cwarn << "Unknown Exception in WhisperHost::loadMessagesFromBD()";
}
}

2
libwhisper/WhisperHost.h

@ -74,6 +74,8 @@ private:
virtual void onStarting() override { startWorking(); }
virtual void onStopping() override { stopWorking(); }
void streamMessage(h256 _m, RLPStream& _s) const;
void saveMessagesToBD();
void loadMessagesFromBD();
mutable dev::SharedMutex x_messages;
std::map<h256, Envelope> m_messages;

Loading…
Cancel
Save