Browse Source

bloom filter exchage protocol

cl-refactor
Vlad Gluhovsky 10 years ago
parent
commit
d254972f40
  1. 10
      libwhisper/WhisperHost.h
  2. 21
      libwhisper/WhisperPeer.cpp
  3. 20
      libwhisper/WhisperPeer.h
  4. 74
      test/libwhisper/whisperTopic.cpp

10
libwhisper/WhisperHost.h

@ -50,11 +50,12 @@ class WhisperHost: public HostCapability<WhisperPeer>, public Interface, public
public:
WhisperHost();
virtual ~WhisperHost();
unsigned protocolVersion() const { return 2; }
void cleanup();
std::map<h256, Envelope> all() const { ReadGuard l(x_messages); return m_messages; }
FixedHash<TopicBloomFilterSize> const& bloom() const { return m_bloom; }
virtual void inject(Envelope const& _e, WhisperPeer* _from = nullptr) override;
virtual Topics const& fullTopics(unsigned _id) const override { try { return m_filters.at(m_watches.at(_id).id).full; } catch (...) { return EmptyTopics; } }
virtual unsigned installWatch(Topics const& _filter) override;
virtual unsigned installWatchOnId(h256 _filterId) override;
@ -62,13 +63,8 @@ public:
virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } }
virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; }
virtual h256s watchMessages(unsigned _watchId) override; /// returns IDs of messages, which match specific watch criteria
virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } }
std::map<h256, Envelope> all() const { ReadGuard l(x_messages); return m_messages; }
void cleanup();
protected:
virtual void doWork() override;

21
libwhisper/WhisperPeer.cpp

@ -19,11 +19,10 @@
* @date 2014
*/
#include "WhisperPeer.h"
#include <libdevcore/Log.h>
#include <libp2p/All.h>
#include "WhisperHost.h"
using namespace std;
using namespace dev;
using namespace dev::p2p;
@ -32,7 +31,10 @@ using namespace dev::shh;
WhisperPeer::WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i)
{
RLPStream s;
sealAndSend(prep(s, StatusPacket, 1) << version());
prep(s, StatusPacket, 2);
s << version();
s << host()->bloom();
sealAndSend(s);
}
WhisperPeer::~WhisperPeer()
@ -57,6 +59,9 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
if (protocolVersion != version())
disable("Invalid protocol version.");
if (_r.itemCount() > 1) // for backwards compatibility
m_bloom = (FixedHash<TopicBloomFilterSize>)_r[1];
for (auto const& m: host()->all())
m_unseen.insert(make_pair(0, m.first));
@ -64,15 +69,15 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
sendMessages();
break;
}
case MessagesPacket:
case UpdateTopicFilterPacket:
{
for (auto i: _r)
host()->inject(Envelope(i), this);
m_bloom = (FixedHash<TopicBloomFilterSize>)_r[0];
break;
}
case UpdateTopicFilterPacket:
case MessagesPacket:
{
m_bloom = (FixedHash<TopicBloomFilterSize>)_r;
for (auto i: _r)
host()->inject(Envelope(i), this);
break;
}
default:

20
libwhisper/WhisperPeer.h

@ -44,8 +44,6 @@ using p2p::HostCapability;
using p2p::Capability;
using p2p::CapDesc;
/**
*/
class WhisperPeer: public Capability
{
friend class WhisperHost;
@ -53,31 +51,23 @@ class WhisperPeer: public Capability
public:
WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap);
virtual ~WhisperPeer();
WhisperHost* host() const;
static std::string name() { return "shh"; }
static u256 version() { return 2; }
static unsigned messageCount() { return PacketCount; }
WhisperHost* host() const;
FixedHash<TopicBloomFilterSize> const& bloom() const { return m_bloom; }
/// called by the host, sends our bloom filter to remote peer
void advertizeTopicsOfInterest(FixedHash<TopicBloomFilterSize> const& _bloom);
FixedHash<TopicBloomFilterSize> const& bloom() const { return m_bloom; }
void advertizeTopicsOfInterest(FixedHash<TopicBloomFilterSize> const& _bloom); ///< sends our bloom filter to remote peer
FixedHash<TopicBloomFilterSize> const& bloom() { return m_bloom; }
private:
virtual bool interpret(unsigned _id, RLP const&) override;
void sendMessages();
unsigned rating(Envelope const&) const { return 0; } // TODO
void noteNewMessage(h256 _h, Envelope const& _m);
mutable dev::Mutex x_unseen;
std::multimap<unsigned, h256> m_unseen; ///< Rated according to what they want.
std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now();
std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now();
FixedHash<TopicBloomFilterSize> m_bloom; ///< Peer's topics of interest
};

74
test/libwhisper/whisperTopic.cpp

@ -23,6 +23,7 @@
#include <boost/test/unit_test.hpp>
#include <libp2p/Host.h>
#include <libp2p/Session.h>
#include <libwhisper/WhisperPeer.h>
#include <libwhisper/WhisperHost.h>
#include <test/TestHelper.h>
@ -301,4 +302,77 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
BOOST_REQUIRE_EQUAL(result, 1);
}
BOOST_AUTO_TEST_CASE(topicAdvertising)
{
if (test::Options::get().nonetwork)
return;
cnote << "Testing Topic Advertising...";
VerbosityHolder setTemporaryLevel(2);
Host host1("first", NetworkPreferences("127.0.0.1", 30303, false));
host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost());
host1.start();
while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10));
Host host2("second", NetworkPreferences("127.0.0.1", 30305, false));
host2.setIdealPeerCount(1);
auto whost2 = host2.registerCapability(new WhisperHost());
auto watch2 = whost2->installWatch(BuildTopicMask("test2"));
host2.start();
while (!host2.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10));
host1.addNode(host2.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305));
while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10));
while (!host1.peerCount())
this_thread::sleep_for(chrono::milliseconds(10));
while (!host2.peerCount())
this_thread::sleep_for(chrono::milliseconds(10));
while (!whost1->peerSessions().size())
this_thread::sleep_for(chrono::milliseconds(10));
for (int i = 0; i < 200; ++i)
{
this_thread::sleep_for(chrono::milliseconds(10));
auto sessions1 = whost1->peerSessions();
size_t x = sessions1.size();
BOOST_REQUIRE(x > 0);
if (whost1->peerSessions()[x-1].first->cap<WhisperPeer>()->bloom())
break;
}
BOOST_REQUIRE(whost1->peerSessions().size());
FixedHash<TopicBloomFilterSize> bf1 = whost1->peerSessions().back().first->cap<WhisperPeer>()->bloom();
FixedHash<TopicBloomFilterSize> bf2 = whost2->bloom();
BOOST_REQUIRE_EQUAL(bf1, bf2);
BOOST_REQUIRE(bf1);
BOOST_REQUIRE(!whost1->bloom());
auto watch1 = whost1->installWatch(BuildTopicMask("test1"));
for (int i = 0; i < 300; ++i)
{
this_thread::sleep_for(chrono::milliseconds(10));
if (whost2->peerSessions().back().first->cap<WhisperPeer>()->bloom())
break;
}
auto sessions2 = whost2->peerSessions();
BOOST_REQUIRE(sessions2.size());
BOOST_REQUIRE_EQUAL(sessions2.back().second->id, host1.id());
bf2 = sessions2.back().first->cap<WhisperPeer>()->bloom();
bf1 = whost1->bloom();
BOOST_REQUIRE_EQUAL(bf1, bf2);
BOOST_REQUIRE(bf1);
}
BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save