Browse Source

Topics of interest advertising made asynchronous

cl-refactor
Vlad Gluhovsky 10 years ago
parent
commit
784a786a82
  1. 17
      libwhisper/WhisperHost.cpp
  2. 11
      libwhisper/WhisperHost.h
  3. 26
      libwhisper/WhisperPeer.cpp
  4. 12
      libwhisper/WhisperPeer.h
  5. 4
      test/libwhisper/whisperTopic.cpp

17
libwhisper/WhisperHost.cpp

@ -87,8 +87,10 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
void WhisperHost::advertiseTopicsOfInterest() void WhisperHost::advertiseTopicsOfInterest()
{ {
FixedHash<TopicBloomFilterSize> b = bloom();
for (auto i: peerSessions()) for (auto i: peerSessions())
i.first->cap<WhisperPeer>().get()->advertiseTopicsOfInterest(); i.first->cap<WhisperPeer>().get()->sendTopicsOfInterest(b);
} }
void WhisperHost::noteChanged(h256 _messageHash, h256 _filter) void WhisperHost::noteChanged(h256 _messageHash, h256 _filter)
@ -111,16 +113,15 @@ unsigned WhisperHost::installWatchOnId(h256 _h)
unsigned WhisperHost::installWatch(shh::Topics const& _t) unsigned WhisperHost::installWatch(shh::Topics const& _t)
{ {
Guard l(m_filterLock);
InstalledFilter f(_t); InstalledFilter f(_t);
h256 h = f.filter.sha3(); h256 h = f.filter.sha3();
Guard l(m_filterLock);
if (!m_filters.count(h)) if (!m_filters.count(h))
m_filters.insert(make_pair(h, f)); m_filters.insert(make_pair(h, f));
m_bloom.addRaw(f.filter.exportBloom()); m_bloom.addRaw(f.filter.exportBloom());
advertiseTopicsOfInterest(); noteAdvertiseTopicsOfInterest();
return installWatchOnId(h); return installWatchOnId(h);
} }
@ -165,7 +166,7 @@ void WhisperHost::uninstallWatch(unsigned _i)
m_filters.erase(fit); m_filters.erase(fit);
m_bloom.removeRaw(fit->second.filter.exportBloom()); m_bloom.removeRaw(fit->second.filter.exportBloom());
advertiseTopicsOfInterest(); noteAdvertiseTopicsOfInterest();
} }
} }
@ -185,3 +186,9 @@ void WhisperHost::cleanup()
for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it)) for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it))
m_messages.erase(it->second); m_messages.erase(it->second);
} }
void WhisperHost::noteAdvertiseTopicsOfInterest()
{
for (auto i: peerSessions())
i.first->cap<WhisperPeer>().get()->noteAdvertiseTopicsOfInterest();
}

11
libwhisper/WhisperHost.h

@ -51,9 +51,9 @@ public:
WhisperHost(); WhisperHost();
virtual ~WhisperHost(); virtual ~WhisperHost();
unsigned protocolVersion() const { return 2; } unsigned protocolVersion() const { return 2; }
void cleanup(); void cleanup(); ///< remove old messages
std::map<h256, Envelope> all() const { ReadGuard l(x_messages); return m_messages; } std::map<h256, Envelope> all() const { dev::ReadGuard l(x_messages); return m_messages; }
FixedHash<TopicBloomFilterSize> const& bloom() const { return m_bloom; } FixedHash<TopicBloomFilterSize> bloom() const { dev::Guard l(m_filterLock); return m_bloom; }
virtual void inject(Envelope const& _e, WhisperPeer* _from = nullptr) override; virtual void inject(Envelope const& _e, WhisperPeer* _from = nullptr) override;
virtual Topics const& fullTopics(unsigned _id) const override { try { return m_filters.at(m_watches.at(_id).id).full; } catch (...) { return EmptyTopics; } } virtual Topics const& fullTopics(unsigned _id) const override { try { return m_filters.at(m_watches.at(_id).id).full; } catch (...) { return EmptyTopics; } }
@ -62,11 +62,12 @@ public:
virtual void uninstallWatch(unsigned _watchId) override; virtual void uninstallWatch(unsigned _watchId) override;
virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } } virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } }
virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; }
virtual h256s watchMessages(unsigned _watchId) override; /// returns IDs of messages, which match specific watch criteria virtual h256s watchMessages(unsigned _watchId) override; ///< returns IDs of messages, which match specific watch criteria
virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } } virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } }
protected: protected:
virtual void doWork() override; virtual void doWork() override;
void noteAdvertiseTopicsOfInterest();
private: private:
virtual void onStarting() override { startWorking(); } virtual void onStarting() override { startWorking(); }
@ -74,7 +75,7 @@ private:
void streamMessage(h256 _m, RLPStream& _s) const; void streamMessage(h256 _m, RLPStream& _s) const;
void noteChanged(h256 _messageHash, h256 _filter); void noteChanged(h256 _messageHash, h256 _filter);
void advertiseTopicsOfInterest(); void advertiseTopicsOfInterest(); ///< send our bloom filter to remote peers
mutable dev::SharedMutex x_messages; mutable dev::SharedMutex x_messages;
std::map<h256, Envelope> m_messages; std::map<h256, Envelope> m_messages;

26
libwhisper/WhisperPeer.cpp

@ -31,11 +31,8 @@ using namespace dev::shh;
WhisperPeer::WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i) WhisperPeer::WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i)
{ {
RLPStream s; RLPStream s;
//sealAndSend(prep(s, StatusPacket, 1) << version()); sealAndSend(prep(s, StatusPacket, 1) << version());
prep(s, StatusPacket, 2); noteAdvertiseTopicsOfInterest();
s << version();
s << host()->bloom();
sealAndSend(s);
} }
WhisperPeer::~WhisperPeer() WhisperPeer::~WhisperPeer()
@ -59,23 +56,19 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
if (protocolVersion != version()) if (protocolVersion != version())
disable("Invalid protocol version."); disable("Invalid protocol version.");
else
{
if (_r.itemCount() > 1) // for backwards compatibility
m_bloom = (FixedHash<TopicBloomFilterSize>)_r[1];
advertiseTopicsOfInterest();
}
for (auto const& m: host()->all()) for (auto const& m: host()->all())
m_unseen.insert(make_pair(0, m.first)); m_unseen.insert(make_pair(0, m.first));
if (session()->id() < host()->host()->id()) if (session()->id() < host()->host()->id())
sendMessages(); sendMessages();
noteAdvertiseTopicsOfInterest();
break; break;
} }
case TopicFilterPacket: case TopicFilterPacket:
{ {
m_bloom = (FixedHash<TopicBloomFilterSize>)_r[0]; setBloom((FixedHash<TopicBloomFilterSize>)_r[0]);
break; break;
} }
case MessagesPacket: case MessagesPacket:
@ -92,6 +85,9 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
void WhisperPeer::sendMessages() void WhisperPeer::sendMessages()
{ {
if (m_advertiseTopicsOfInterest)
sendTopicsOfInterest(host()->bloom());
RLPStream amalg; RLPStream amalg;
unsigned msgCount = 0; unsigned msgCount = 0;
{ {
@ -119,10 +115,12 @@ void WhisperPeer::noteNewMessage(h256 _h, Envelope const& _m)
m_unseen.insert(make_pair(rating(_m), _h)); m_unseen.insert(make_pair(rating(_m), _h));
} }
void WhisperPeer::advertiseTopicsOfInterest() void WhisperPeer::sendTopicsOfInterest(FixedHash<TopicBloomFilterSize> const& _bloom)
{ {
RLPStream s; RLPStream s;
prep(s, TopicFilterPacket, 1); prep(s, TopicFilterPacket, 1);
s << host()->bloom(); s << _bloom;
sealAndSend(s); sealAndSend(s);
m_advertiseTopicsOfInterest = false;
} }

12
libwhisper/WhisperPeer.h

@ -53,21 +53,27 @@ public:
virtual ~WhisperPeer(); virtual ~WhisperPeer();
WhisperHost* host() const; WhisperHost* host() const;
static std::string name() { return "shh"; } static std::string name() { return "shh"; }
static u256 version() { return 2; } static u256 version() { return 3; }
static unsigned messageCount() { return PacketCount; } static unsigned messageCount() { return PacketCount; }
FixedHash<TopicBloomFilterSize> const& bloom() const { return m_bloom; } FixedHash<TopicBloomFilterSize> bloom() const { dev::Guard g(x_bloom); return m_bloom; }
void advertiseTopicsOfInterest(); ///< sends our bloom filter to remote peer void sendTopicsOfInterest(FixedHash<TopicBloomFilterSize> const& _bloom); ///< sends our bloom filter to remote peer
void noteAdvertiseTopicsOfInterest() { m_advertiseTopicsOfInterest = true; }
private: private:
virtual bool interpret(unsigned _id, RLP const&) override; virtual bool interpret(unsigned _id, RLP const&) override;
void sendMessages(); void sendMessages();
unsigned rating(Envelope const&) const { return 0; } // TODO unsigned rating(Envelope const&) const { return 0; } // TODO
void noteNewMessage(h256 _h, Envelope const& _m); void noteNewMessage(h256 _h, Envelope const& _m);
void setBloom(FixedHash<TopicBloomFilterSize> const& _b) { dev::Guard g(x_bloom); m_bloom = _b; }
mutable dev::Mutex x_unseen; mutable dev::Mutex x_unseen;
std::multimap<unsigned, h256> m_unseen; ///< Rated according to what they want. std::multimap<unsigned, h256> m_unseen; ///< Rated according to what they want.
std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now(); std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now();
mutable dev::Mutex x_bloom;
FixedHash<TopicBloomFilterSize> m_bloom; ///< Peer's topics of interest FixedHash<TopicBloomFilterSize> m_bloom; ///< Peer's topics of interest
bool m_advertiseTopicsOfInterest;
}; };
} }

4
test/libwhisper/whisperTopic.cpp

@ -338,7 +338,7 @@ BOOST_AUTO_TEST_CASE(topicAdvertising)
std::vector<std::pair<std::shared_ptr<Session>, std::shared_ptr<Peer>>> sessions; std::vector<std::pair<std::shared_ptr<Session>, std::shared_ptr<Peer>>> sessions;
for (int i = 0; i < 300; ++i) for (int i = 0; i < 600; ++i)
{ {
sessions = whost1->peerSessions(); sessions = whost1->peerSessions();
if (!sessions.empty() && sessions.back().first->cap<WhisperPeer>()->bloom()) if (!sessions.empty() && sessions.back().first->cap<WhisperPeer>()->bloom())
@ -356,7 +356,7 @@ BOOST_AUTO_TEST_CASE(topicAdvertising)
whost1->installWatch(BuildTopicMask("test1")); whost1->installWatch(BuildTopicMask("test1"));
for (int i = 0; i < 300; ++i) for (int i = 0; i < 600; ++i)
{ {
sessions = whost2->peerSessions(); sessions = whost2->peerSessions();
if (!sessions.empty() && sessions.back().first->cap<WhisperPeer>()->bloom()) if (!sessions.empty() && sessions.back().first->cap<WhisperPeer>()->bloom())

Loading…
Cancel
Save