Browse Source

Message save/load functionality upgraded

cl-refactor
Vlad Gluhovsky 10 years ago
parent
commit
5e0fd432ca
  1. 19
      libwhisper/Message.h
  2. 58
      libwhisper/WhisperDB.cpp
  3. 6
      libwhisper/WhisperDB.h
  4. 49
      libwhisper/WhisperHost.cpp
  5. 9
      test/libp2p/peer.cpp

19
libwhisper/Message.h

@ -64,10 +64,11 @@ public:
Envelope() {}
Envelope(RLP const& _m);
operator bool() const { return !!m_expiry; }
void streamRLP(RLPStream& _s, IncludeNonce _withNonce = WithNonce) const { _s.appendList(_withNonce ? 5 : 4) << m_expiry << m_ttl << m_topic << m_data; if (_withNonce) _s << m_nonce; }
h256 sha3(IncludeNonce _withNonce = WithNonce) const { RLPStream s; streamRLP(s, _withNonce); return dev::sha3(s.out()); }
Message open(Topics const& _t, Secret const& _s = Secret()) const;
unsigned workProved() const;
void proveWork(unsigned _ms);
unsigned sent() const { return m_expiry - m_ttl; }
unsigned expiry() const { return m_expiry; }
@ -75,12 +76,12 @@ public:
AbridgedTopics const& topic() const { return m_topic; }
bytes const& data() const { return m_data; }
Message open(Topics const& _t, Secret const& _s = Secret()) const;
unsigned workProved() const;
void proveWork(unsigned _ms);
bool matchesBloomFilter(TopicBloomFilterHash const& f) const;
void setStoreForever() { m_storeForever = true; }
bool isStoreForever() const { return m_storeForever; }
bool isExpired() const { return !m_storeForever && m_expiry <= (unsigned)time(0); }
void setWatched() { m_watched = true; }
bool isWatched() const { return m_watched; }
private:
Envelope(unsigned _exp, unsigned _ttl, AbridgedTopics const& _topic): m_expiry(_exp), m_ttl(_ttl), m_topic(_topic) {}
@ -91,6 +92,10 @@ private:
AbridgedTopics m_topic;
bytes m_data;
/// Metainformation
bool m_storeForever = false;
bool m_watched = false;
};
enum /*Message Flags*/

58
libwhisper/WhisperDB.cpp

@ -84,7 +84,6 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
op.fill_cache = false;
op.verify_checksums = true;
vector<string> wasted;
unsigned now = (unsigned)time(0);
unique_ptr<leveldb::Iterator> it(m_db->NewIterator(op));
for (it->SeekToFirst(); it->Valid(); it->Next())
@ -96,16 +95,18 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
try
{
RLP rlp((byte const*)v.data(), v.size());
Envelope e(rlp);
Envelope e(rlp[1]);
h256 h2 = e.sha3();
h256 h1;
readMetaInfo(rlp[0], e);
if (k.size() == h256::size)
h1 = h256((byte const*)k.data(), h256::ConstructFromPointer);
if (h1 != h2)
cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex();
else if (e.expiry() > now)
else if (!e.isExpired())
{
o_dst[h1] = e;
useless = false;
@ -134,3 +135,54 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
}
}
void WhisperDB::save(h256 const& _key, Envelope const& _e)
{
try
{
RLPStream meta;
streamMetaInfo(meta, _e);
RLPStream msg;
_e.streamRLP(msg);
RLPStream res(2);
res.appendRaw(meta.out());
res.appendRaw(msg.out());
bytes b;
res.swapOut(b);
insert(_key, b);
}
catch(RLPException const& ex)
{
cwarn << "RLPException in WhisperDB::save():" << ex.what();
}
catch(FailedInsertInLevelDB const& ex)
{
cwarn << "Exception in WhisperDB::save() - failed to insert:" << ex.what();
}
}
void WhisperDB::streamMetaInfo(RLPStream& _rlp, Envelope const& _e)
{
uint32_t x = 0;
if (_e.isStoreForever())
x |= StoreForeverFlag;
if (_e.isWatched())
x |= WatchedFlag;
_rlp.append(x);
}
void WhisperDB::readMetaInfo(RLP const& _rlp, Envelope& _e)
{
unsigned x = (uint32_t)_rlp;
if (x & StoreForeverFlag)
_e.setStoreForever();
if (x & WatchedFlag)
_e.setWatched();
}

6
libwhisper/WhisperDB.h

@ -47,11 +47,17 @@ class WhisperDB
void insert(dev::h256 const& _key, bytes const& _value);
void kill(dev::h256 const& _key);
void loadAll(std::map<h256, Envelope>& o_dst);
void save(dev::h256 const& _key, Envelope const& _e);
private:
void streamMetaInfo(RLPStream& _rlp, Envelope const& _e);
void readMetaInfo(RLP const& _rlp, Envelope& _e);
leveldb::ReadOptions m_readOptions;
leveldb::WriteOptions m_writeOptions;
std::unique_ptr<leveldb::DB> m_db;
enum MetaInformation { StoreForeverFlag = 1, WatchedFlag = 2 };
};
}

49
libwhisper/WhisperHost.cpp

@ -59,7 +59,7 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
cnote << this << ": inject: " << _m.expiry() << _m.ttl() << _m.topic() << toHex(_m.data());
if (_m.expiry() <= (unsigned)time(0))
if (_m.isExpired())
return;
auto h = _m.sha3();
@ -69,13 +69,15 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
return;
UpgradeGuard ll(l);
m_messages[h] = _m;
m_expiryQueue.insert(make_pair(_m.expiry(), h));
if (!_m.isStoreForever())
m_expiryQueue.insert(make_pair(_m.expiry(), h));
}
// rating of incoming message from remote host is assessed according to the following criteria:
// 1. installed watch match; 2. bloom filter match; 2. ttl; 3. proof of work
int rating = 0;
bool match_watch = false;
DEV_GUARDED(m_filterLock)
if (_m.matchesBloomFilter(m_bloom))
@ -84,10 +86,11 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
for (auto const& f: m_filters)
if (f.second.filter.matches(_m))
for (auto& i: m_watches)
if (i.second.id == f.first)
if (i.second.id == f.first) // match one of the watches
{
i.second.changes.push_back(h);
rating += 2;
match_watch = true;
}
}
@ -100,6 +103,14 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
rating += _m.workProved();
}
if (match_watch)
{
WriteGuard g(x_messages);
auto j = m_messages.find(h);
if (m_messages.end() != j)
j->second.setWatched();
}
// TODO p2p: capability-based rating
for (auto i: peerSessions())
{
@ -195,7 +206,12 @@ void WhisperHost::cleanup()
unsigned now = (unsigned)time(0);
WriteGuard l(x_messages);
for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it))
m_messages.erase(it->second);
{
auto j = m_messages.find(it->second);
if (j != m_messages.end())
if (!j->second.isStoreForever())
m_messages.erase(it->second);
}
}
void WhisperHost::noteAdvertiseTopicsOfInterest()
@ -213,31 +229,15 @@ void WhisperHost::saveMessagesToBD()
{
WhisperDB db;
ReadGuard g(x_messages);
unsigned now = (unsigned)time(0);
for (auto const& m: m_messages)
{
RLPStream rlp;
m.second.streamRLP(rlp);
bytes b;
rlp.swapOut(b);
db.insert(m.first, b);
}
if (m.second.isStoreForever() || (m.second.expiry() > now && m.second.isWatched()))
db.save(m.first, m.second);
}
catch(FailedToOpenLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to open DB:" << ex.what();
}
catch(FailedInsertInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to insert:" << ex.what();
}
catch(FailedLookupInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed lookup:" << ex.what();
}
catch(FailedDeleteInLevelDB const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD() - failed to delete:" << ex.what();
}
catch(Exception const& ex)
{
cwarn << "Exception in WhisperHost::saveMessagesToBD():" << ex.what();
@ -260,6 +260,9 @@ void WhisperHost::loadMessagesFromBD()
db.loadAll(m);
WriteGuard g(x_messages);
m_messages.swap(m);
for (auto const& msg: m)
if (!msg.second.isStoreForever())
m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first));
}
catch(Exception const& ex)
{

9
test/libp2p/peer.cpp

@ -44,8 +44,8 @@ BOOST_AUTO_TEST_CASE(host)
return;
VerbosityHolder setTemporaryLevel(10);
NetworkPreferences host1prefs("127.0.0.1", 30301, false);
NetworkPreferences host2prefs("127.0.0.1", 30302, false);
NetworkPreferences host1prefs("127.0.0.1", 30311, false);
NetworkPreferences host2prefs("127.0.0.1", 30312, false);
Host host1("Test", host1prefs);
Host host2("Test", host2prefs);
host1.start();
@ -67,9 +67,8 @@ BOOST_AUTO_TEST_CASE(host)
for (int i = 0; i < 3000 && (!host1.peerCount() || !host2.peerCount()); i += step)
this_thread::sleep_for(chrono::milliseconds(step));
//Temporary disabled
//BOOST_REQUIRE_EQUAL(host1.peerCount(), 1);
//BOOST_REQUIRE_EQUAL(host2.peerCount(), 1);
BOOST_REQUIRE_EQUAL(host1.peerCount(), 1);
BOOST_REQUIRE_EQUAL(host2.peerCount(), 1);
}
BOOST_AUTO_TEST_CASE(networkConfig)

Loading…
Cancel
Save