Browse Source

metainformation removed

cl-refactor
Vlad Gluhovsky 10 years ago
parent
commit
184c1b36cb
  1. 10
      libwhisper/Message.h
  2. 49
      libwhisper/WhisperDB.cpp
  3. 3
      libwhisper/WhisperDB.h
  4. 40
      libwhisper/WhisperHost.cpp
  5. 1
      libwhisper/WhisperHost.h

10
libwhisper/Message.h

@ -77,11 +77,7 @@ public:
bytes const& data() const { return m_data; }
bool matchesBloomFilter(TopicBloomFilterHash const& f) const;
void setStoreForever() { m_storeForever = true; }
bool isStoreForever() const { return m_storeForever; }
bool isExpired() const { return !m_storeForever && m_expiry <= (unsigned)time(0); }
void setWatched() { m_watched = true; }
bool isWatched() const { return m_watched; }
bool isExpired() const { return m_expiry <= (unsigned)time(0); }
private:
Envelope(unsigned _exp, unsigned _ttl, AbridgedTopics const& _topic): m_expiry(_exp), m_ttl(_ttl), m_topic(_topic) {}
@ -92,10 +88,6 @@ private:
AbridgedTopics m_topic;
bytes m_data;
/// Metainformation
bool m_storeForever = false;
bool m_watched = false;
};
enum /*Message Flags*/

49
libwhisper/WhisperDB.cpp

@ -85,6 +85,7 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
op.verify_checksums = true;
vector<string> wasted;
unique_ptr<leveldb::Iterator> it(m_db->NewIterator(op));
unsigned const now = (unsigned)time(0);
for (it->SeekToFirst(); it->Valid(); it->Next())
{
@ -95,18 +96,16 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
try
{
RLP rlp((byte const*)v.data(), v.size());
Envelope e(rlp[1]);
Envelope e(rlp);
h256 h2 = e.sha3();
h256 h1;
readMetaInfo(rlp[0], e);
if (k.size() == h256::size)
h1 = h256((byte const*)k.data(), h256::ConstructFromPointer);
if (h1 != h2)
cwarn << "Corrupted data in Level DB:" << h1.hex() << "versus" << h2.hex();
else if (!e.isExpired())
else if (e.expiry() > now)
{
o_dst[h1] = e;
useless = false;
@ -139,50 +138,18 @@ void WhisperDB::save(h256 const& _key, Envelope const& _e)
{
try
{
RLPStream meta;
streamMetaInfo(meta, _e);
RLPStream msg;
_e.streamRLP(msg);
RLPStream res(2);
res.appendRaw(meta.out());
res.appendRaw(msg.out());
RLPStream rlp;
_e.streamRLP(rlp);
bytes b;
res.swapOut(b);
rlp.swapOut(b);
insert(_key, b);
}
catch(RLPException const& ex)
{
cwarn << "RLPException in WhisperDB::save():" << ex.what();
cwarn << boost::diagnostic_information(ex);
}
catch(FailedInsertInLevelDB const& ex)
{
cwarn << "Exception in WhisperDB::save() - failed to insert:" << ex.what();
cwarn << boost::diagnostic_information(ex);
}
}
void WhisperDB::streamMetaInfo(RLPStream& _rlp, Envelope const& _e)
{
uint32_t x = 0;
if (_e.isStoreForever())
x |= StoreForeverFlag;
if (_e.isWatched())
x |= WatchedFlag;
_rlp.append(x);
}
void WhisperDB::readMetaInfo(RLP const& _rlp, Envelope& _e)
{
unsigned x = (uint32_t)_rlp;
if (x & StoreForeverFlag)
_e.setStoreForever();
if (x & WatchedFlag)
_e.setWatched();
}

3
libwhisper/WhisperDB.h

@ -50,9 +50,6 @@ class WhisperDB
void save(dev::h256 const& _key, Envelope const& _e);
private:
void streamMetaInfo(RLPStream& _rlp, Envelope const& _e);
void readMetaInfo(RLP const& _rlp, Envelope& _e);
leveldb::ReadOptions m_readOptions;
leveldb::WriteOptions m_writeOptions;
std::unique_ptr<leveldb::DB> m_db;

40
libwhisper/WhisperHost.cpp

@ -69,15 +69,13 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
return;
UpgradeGuard ll(l);
m_messages[h] = _m;
if (!_m.isStoreForever())
m_expiryQueue.insert(make_pair(_m.expiry(), h));
m_expiryQueue.insert(make_pair(_m.expiry(), h));
}
// rating of incoming message from remote host is assessed according to the following criteria:
// 1. installed watch match; 2. bloom filter match; 2. ttl; 3. proof of work
int rating = 0;
bool match_watch = false;
DEV_GUARDED(m_filterLock)
if (_m.matchesBloomFilter(m_bloom))
@ -90,7 +88,6 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
{
i.second.changes.push_back(h);
rating += 2;
match_watch = true;
}
}
@ -103,14 +100,6 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
rating += _m.workProved();
}
if (match_watch)
{
WriteGuard g(x_messages);
auto j = m_messages.find(h);
if (m_messages.end() != j)
j->second.setWatched();
}
// TODO p2p: capability-based rating
for (auto i: peerSessions())
{
@ -206,12 +195,7 @@ void WhisperHost::cleanup()
unsigned now = (unsigned)time(0);
WriteGuard l(x_messages);
for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it))
{
auto j = m_messages.find(it->second);
if (j != m_messages.end())
if (!j->second.isStoreForever())
m_messages.erase(it->second);
}
m_messages.erase(it->second);
}
void WhisperHost::noteAdvertiseTopicsOfInterest()
@ -220,6 +204,18 @@ void WhisperHost::noteAdvertiseTopicsOfInterest()
i.first->cap<WhisperPeer>().get()->noteAdvertiseTopicsOfInterest();
}
bool WhisperHost::isWatched(Envelope const& _e) const
{
DEV_GUARDED(m_filterLock)
if (_e.matchesBloomFilter(m_bloom))
for (auto const& f: m_filters)
if (f.second.filter.matches(_e))
for (auto const& i: m_watches)
if (i.second.id == f.first)
return true;
return false;
}
void WhisperHost::saveMessagesToBD()
{
if (!m_useDB)
@ -231,8 +227,9 @@ void WhisperHost::saveMessagesToBD()
ReadGuard g(x_messages);
unsigned now = (unsigned)time(0);
for (auto const& m: m_messages)
if (m.second.isStoreForever() || (m.second.expiry() > now && m.second.isWatched()))
db.save(m.first, m.second);
if (m.second.expiry() > now)
if (isWatched(m.second))
db.save(m.first, m.second);
}
catch(FailedToOpenLevelDB const& ex)
{
@ -261,8 +258,7 @@ void WhisperHost::loadMessagesFromBD()
WriteGuard g(x_messages);
m_messages.swap(m);
for (auto const& msg: m)
if (!msg.second.isStoreForever())
m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first));
m_expiryQueue.insert(make_pair(msg.second.expiry(), msg.first));
}
catch(Exception const& ex)
{

1
libwhisper/WhisperHost.h

@ -67,6 +67,7 @@ public:
protected:
virtual void doWork() override;
void noteAdvertiseTopicsOfInterest();
bool isWatched(Envelope const& _e) const;
private:
virtual void onStarting() override { startWorking(); }

Loading…
Cancel
Save