Browse Source

Merge pull request #2263 from gluk256/_topic_advert

topic advertisement
cl-refactor
Gav Wood 10 years ago
parent
commit
b9663288d8
  1. 1
      libdevcore/Common.h
  2. 2
      libp2p/Session.cpp
  3. 6
      libwhisper/Common.cpp
  4. 6
      libwhisper/Common.h
  5. 1
      libwhisper/Interface.h
  6. 55
      libwhisper/WhisperHost.cpp
  7. 21
      libwhisper/WhisperHost.h
  8. 26
      libwhisper/WhisperPeer.cpp
  9. 21
      libwhisper/WhisperPeer.h
  10. 27
      test/libwhisper/bloomFilter.cpp
  11. 73
      test/libwhisper/whisperTopic.cpp

1
libdevcore/Common.h

@ -45,6 +45,7 @@
#pragma warning(push) #pragma warning(push)
#pragma GCC diagnostic push #pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter" #pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-fpermissive"
#include <boost/version.hpp> #include <boost/version.hpp>
#if (BOOST_VERSION == 105800) #if (BOOST_VERSION == 105800)
#include "boost_multiprecision_number_compare_bug_workaround.hpp" #include "boost_multiprecision_number_compare_bug_workaround.hpp"

2
libp2p/Session.cpp

@ -268,7 +268,7 @@ bool Session::interpret(PacketType _t, RLP const& _r)
} }
catch (std::exception const& _e) catch (std::exception const& _e)
{ {
clog(NetWarn) << "Peer causing an exception:" << _e.what() << _r; clog(NetWarn) << "Exception caught in p2p::Session::interpret(): " << _e.what() << ". PacketType: " << _t << ". RLP: " << _r;
disconnect(BadProtocol); disconnect(BadProtocol);
return true; return true;
} }

6
libwhisper/Common.cpp

@ -95,12 +95,12 @@ TopicFilter::TopicFilter(RLP const& _r)
} }
} }
FixedHash<TopicBloomFilter::size> TopicFilter::exportBloom() const FixedHash<TopicBloomFilterSize> TopicFilter::exportBloom() const
{ {
FixedHash<TopicBloomFilter::size> ret; FixedHash<TopicBloomFilterSize> ret;
for (TopicMask const& t: m_topicMasks) for (TopicMask const& t: m_topicMasks)
for (auto const& i: t) for (auto const& i: t)
ret |= i.first.template bloomPart<TopicBloomFilter::BitsPerBloom, TopicBloomFilter::size>(); ret |= i.first.template bloomPart<TopicBloomFilter::BitsPerBloom, TopicBloomFilterSize>();
return ret; return ret;
} }

6
libwhisper/Common.h

@ -54,12 +54,12 @@ enum WhisperPacket
{ {
StatusPacket = 0, StatusPacket = 0,
MessagesPacket, MessagesPacket,
AddFilterPacket, TopicFilterPacket,
RemoveFilterPacket,
PacketCount PacketCount
}; };
enum { TopicBloomFilterSize = 8 }; enum { TopicBloomFilterSize = 64 };
enum { WhisperProtocolVersion = 3 };
using AbridgedTopic = FixedHash<4>; using AbridgedTopic = FixedHash<4>;
using Topic = h256; using Topic = h256;

1
libwhisper/Interface.h

@ -67,7 +67,6 @@ public:
virtual Topics const& fullTopics(unsigned _id) const = 0; virtual Topics const& fullTopics(unsigned _id) const = 0;
virtual unsigned installWatch(Topics const& _filter) = 0; virtual unsigned installWatch(Topics const& _filter) = 0;
virtual unsigned installWatchOnId(h256 _filterId) = 0;
virtual void uninstallWatch(unsigned _watchId) = 0; virtual void uninstallWatch(unsigned _watchId) = 0;
virtual h256s peekWatch(unsigned _watchId) const = 0; virtual h256s peekWatch(unsigned _watchId) const = 0;
virtual h256s checkWatch(unsigned _watchId) = 0; virtual h256s checkWatch(unsigned _watchId) = 0;

55
libwhisper/WhisperHost.cpp

@ -66,12 +66,13 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
m_expiryQueue.insert(make_pair(_m.expiry(), h)); m_expiryQueue.insert(make_pair(_m.expiry(), h));
} }
// if (_p) DEV_GUARDED(m_filterLock)
{ {
Guard l(m_filterLock);
for (auto const& f: m_filters) for (auto const& f: m_filters)
if (f.second.filter.matches(_m)) if (f.second.filter.matches(_m))
noteChanged(h, f.first); for (auto& i: m_watches)
if (i.second.id == f.first)
i.second.changes.push_back(h);
} }
// TODO p2p: capability-based rating // TODO p2p: capability-based rating
@ -85,36 +86,26 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
} }
} }
void WhisperHost::noteChanged(h256 _messageHash, h256 _filter)
{
for (auto& i: m_watches)
if (i.second.id == _filter)
{
cwatshh << "!!!" << i.first << i.second.id;
i.second.changes.push_back(_messageHash);
}
}
unsigned WhisperHost::installWatchOnId(h256 _h)
{
auto ret = m_watches.size() ? m_watches.rbegin()->first + 1 : 0;
m_watches[ret] = ClientWatch(_h);
cwatshh << "+++" << ret << _h;
return ret;
}
unsigned WhisperHost::installWatch(shh::Topics const& _t) unsigned WhisperHost::installWatch(shh::Topics const& _t)
{ {
Guard l(m_filterLock);
InstalledFilter f(_t); InstalledFilter f(_t);
h256 h = f.filter.sha3(); h256 h = f.filter.sha3();
unsigned ret = 0;
DEV_GUARDED(m_filterLock)
{
if (!m_filters.count(h)) if (!m_filters.count(h))
m_filters.insert(make_pair(h, f)); m_filters.insert(make_pair(h, f));
ret = m_watches.size() ? m_watches.rbegin()->first + 1 : 0;
m_watches[ret] = ClientWatch(h);
cwatshh << "+++" << ret << h;
m_bloom.addRaw(f.filter.exportBloom()); m_bloom.addRaw(f.filter.exportBloom());
return installWatchOnId(h); }
noteAdvertiseTopicsOfInterest();
return ret;
} }
h256s WhisperHost::watchMessages(unsigned _watchId) h256s WhisperHost::watchMessages(unsigned _watchId)
@ -142,8 +133,8 @@ void WhisperHost::uninstallWatch(unsigned _i)
{ {
cwatshh << "XXX" << _i; cwatshh << "XXX" << _i;
Guard l(m_filterLock); DEV_GUARDED(m_filterLock)
{
auto it = m_watches.find(_i); auto it = m_watches.find(_i);
if (it == m_watches.end()) if (it == m_watches.end())
return; return;
@ -153,10 +144,14 @@ void WhisperHost::uninstallWatch(unsigned _i)
auto fit = m_filters.find(id); auto fit = m_filters.find(id);
if (fit != m_filters.end()) if (fit != m_filters.end())
{ {
m_bloom.removeRaw(fit->second.filter.exportBloom());
if (!--fit->second.refCount) if (!--fit->second.refCount)
m_filters.erase(fit); m_filters.erase(fit);
m_bloom.removeRaw(fit->second.filter.exportBloom());
}
} }
noteAdvertiseTopicsOfInterest();
} }
void WhisperHost::doWork() void WhisperHost::doWork()
@ -175,3 +170,9 @@ void WhisperHost::cleanup()
for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it)) for (auto it = m_expiryQueue.begin(); it != m_expiryQueue.end() && it->first <= now; it = m_expiryQueue.erase(it))
m_messages.erase(it->second); m_messages.erase(it->second);
} }
void WhisperHost::noteAdvertiseTopicsOfInterest()
{
for (auto i: peerSessions())
i.first->cap<WhisperPeer>().get()->noteAdvertiseTopicsOfInterest();
}

21
libwhisper/WhisperHost.h

@ -50,36 +50,31 @@ class WhisperHost: public HostCapability<WhisperPeer>, public Interface, public
public: public:
WhisperHost(); WhisperHost();
virtual ~WhisperHost(); virtual ~WhisperHost();
unsigned protocolVersion() const { return WhisperProtocolVersion; }
unsigned protocolVersion() const { return 2; } /// remove old messages
void cleanup();
std::map<h256, Envelope> all() const { dev::ReadGuard l(x_messages); return m_messages; }
FixedHash<TopicBloomFilterSize> bloom() const { dev::Guard l(m_filterLock); return m_bloom; }
virtual void inject(Envelope const& _e, WhisperPeer* _from = nullptr) override; virtual void inject(Envelope const& _e, WhisperPeer* _from = nullptr) override;
virtual Topics const& fullTopics(unsigned _id) const override { try { return m_filters.at(m_watches.at(_id).id).full; } catch (...) { return EmptyTopics; } } virtual Topics const& fullTopics(unsigned _id) const override { try { return m_filters.at(m_watches.at(_id).id).full; } catch (...) { return EmptyTopics; } }
virtual unsigned installWatch(Topics const& _filter) override; virtual unsigned installWatch(Topics const& _filter) override;
virtual unsigned installWatchOnId(h256 _filterId) override;
virtual void uninstallWatch(unsigned _watchId) override; virtual void uninstallWatch(unsigned _watchId) override;
virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } } virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } }
virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; }
virtual h256s watchMessages(unsigned _watchId) override; /// returns IDs of messages, which match specific watch criteria /// returns IDs of messages, which match specific watch criteria
virtual h256s watchMessages(unsigned _watchId) override;
virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } } virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } }
std::map<h256, Envelope> all() const { ReadGuard l(x_messages); return m_messages; }
void cleanup();
protected: protected:
virtual void doWork() override; virtual void doWork() override;
void noteAdvertiseTopicsOfInterest();
private: private:
virtual void onStarting() override { startWorking(); } virtual void onStarting() override { startWorking(); }
virtual void onStopping() override { stopWorking(); } virtual void onStopping() override { stopWorking(); }
void streamMessage(h256 _m, RLPStream& _s) const; void streamMessage(h256 _m, RLPStream& _s) const;
void noteChanged(h256 _messageHash, h256 _filter);
mutable dev::SharedMutex x_messages; mutable dev::SharedMutex x_messages;
std::map<h256, Envelope> m_messages; std::map<h256, Envelope> m_messages;
std::multimap<unsigned, h256> m_expiryQueue; std::multimap<unsigned, h256> m_expiryQueue;

26
libwhisper/WhisperPeer.cpp

@ -19,11 +19,10 @@
* @date 2014 * @date 2014
*/ */
#include "WhisperPeer.h"
#include <libdevcore/Log.h> #include <libdevcore/Log.h>
#include <libp2p/All.h> #include <libp2p/All.h>
#include "WhisperHost.h" #include "WhisperHost.h"
using namespace std; using namespace std;
using namespace dev; using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
@ -33,6 +32,7 @@ WhisperPeer::WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, un
{ {
RLPStream s; RLPStream s;
sealAndSend(prep(s, StatusPacket, 1) << version()); sealAndSend(prep(s, StatusPacket, 1) << version());
noteAdvertiseTopicsOfInterest();
} }
WhisperPeer::~WhisperPeer() WhisperPeer::~WhisperPeer()
@ -62,6 +62,8 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
if (session()->id() < host()->host()->id()) if (session()->id() < host()->host()->id())
sendMessages(); sendMessages();
noteAdvertiseTopicsOfInterest();
break; break;
} }
case MessagesPacket: case MessagesPacket:
@ -70,6 +72,11 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
host()->inject(Envelope(i), this); host()->inject(Envelope(i), this);
break; break;
} }
case TopicFilterPacket:
{
setBloom((FixedHash<TopicBloomFilterSize>)_r[0]);
break;
}
default: default:
return false; return false;
} }
@ -78,6 +85,9 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
void WhisperPeer::sendMessages() void WhisperPeer::sendMessages()
{ {
if (m_advertiseTopicsOfInterest)
sendTopicsOfInterest(host()->bloom());
RLPStream amalg; RLPStream amalg;
unsigned msgCount = 0; unsigned msgCount = 0;
{ {
@ -104,3 +114,15 @@ void WhisperPeer::noteNewMessage(h256 _h, Envelope const& _m)
Guard l(x_unseen); Guard l(x_unseen);
m_unseen.insert(make_pair(rating(_m), _h)); m_unseen.insert(make_pair(rating(_m), _h));
} }
void WhisperPeer::sendTopicsOfInterest(FixedHash<TopicBloomFilterSize> const& _bloom)
{
DEV_GUARDED(x_advertiseTopicsOfInterest)
m_advertiseTopicsOfInterest = false;
RLPStream s;
prep(s, TopicFilterPacket, 1);
s << _bloom;
sealAndSend(s);
}

21
libwhisper/WhisperPeer.h

@ -44,8 +44,6 @@ using p2p::HostCapability;
using p2p::Capability; using p2p::Capability;
using p2p::CapDesc; using p2p::CapDesc;
/**
*/
class WhisperPeer: public Capability class WhisperPeer: public Capability
{ {
friend class WhisperHost; friend class WhisperHost;
@ -53,25 +51,30 @@ class WhisperPeer: public Capability
public: public:
WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap); WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap);
virtual ~WhisperPeer(); virtual ~WhisperPeer();
WhisperHost* host() const;
static std::string name() { return "shh"; } static std::string name() { return "shh"; }
static u256 version() { return 2; } static u256 version() { return WhisperProtocolVersion; }
static unsigned messageCount() { return PacketCount; } static unsigned messageCount() { return PacketCount; }
FixedHash<TopicBloomFilterSize> bloom() const { dev::Guard g(x_bloom); return m_bloom; }
WhisperHost* host() const; void sendTopicsOfInterest(FixedHash<TopicBloomFilterSize> const& _bloom); ///< sends our bloom filter to remote peer
void noteAdvertiseTopicsOfInterest() { dev::Guard g(x_advertiseTopicsOfInterest); m_advertiseTopicsOfInterest = true; }
private: private:
virtual bool interpret(unsigned _id, RLP const&) override; virtual bool interpret(unsigned _id, RLP const&) override;
void sendMessages(); void sendMessages();
unsigned rating(Envelope const&) const { return 0; } // TODO unsigned rating(Envelope const&) const { return 0; } // TODO
void noteNewMessage(h256 _h, Envelope const& _m); void noteNewMessage(h256 _h, Envelope const& _m);
void setBloom(FixedHash<TopicBloomFilterSize> const& _b) { dev::Guard g(x_bloom); m_bloom = _b; }
mutable dev::Mutex x_unseen; mutable dev::Mutex x_unseen;
std::multimap<unsigned, h256> m_unseen; ///< Rated according to what they want. std::multimap<unsigned, h256> m_unseen; ///< Rated according to what they want.
std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now(); std::chrono::system_clock::time_point m_timer = std::chrono::system_clock::now();
mutable dev::Mutex x_bloom;
FixedHash<TopicBloomFilterSize> m_bloom; ///< Peer's topics of interest
mutable dev::Mutex x_advertiseTopicsOfInterest;
bool m_advertiseTopicsOfInterest;
}; };
} }

27
test/libwhisper/bloomFilter.cpp

@ -28,8 +28,7 @@ using namespace dev;
using namespace dev::shh; using namespace dev::shh;
using TopicBloomFilterShort = TopicBloomFilterBase<4>; using TopicBloomFilterShort = TopicBloomFilterBase<4>;
using TopicBloomFilterLong = TopicBloomFilterBase<8>; using TopicBloomFilterTest = TopicBloomFilterBase<TopicBloomFilterSize>;
using TopicBloomFilterTest = TopicBloomFilterLong;
void testAddNonExisting(TopicBloomFilterShort& _f, AbridgedTopic const& _h) void testAddNonExisting(TopicBloomFilterShort& _f, AbridgedTopic const& _h)
{ {
@ -59,7 +58,7 @@ void testRemoveExistingBloom(TopicBloomFilterShort& _f, AbridgedTopic const& _h)
BOOST_REQUIRE(!_f.containsBloom(_h)); BOOST_REQUIRE(!_f.containsBloom(_h));
} }
int calculateExpected(TopicBloomFilterTest const& f, int const n) double calculateExpected(TopicBloomFilterTest const& f, int n)
{ {
int const m = f.size * 8; // number of bits in the bloom int const m = f.size * 8; // number of bits in the bloom
int const k = f.BitsPerBloom; // number of hash functions (e.g. bits set to 1 in every bloom) int const k = f.BitsPerBloom; // number of hash functions (e.g. bits set to 1 in every bloom)
@ -77,10 +76,10 @@ int calculateExpected(TopicBloomFilterTest const& f, int const n)
for (int i = 0; i < k; ++i) for (int i = 0; i < k; ++i)
kBitsSet *= single; kBitsSet *= single;
return static_cast<int>(kBitsSet * 100 + 0.5); // in percents, rounded up return kBitsSet;
} }
void testFalsePositiveRate(TopicBloomFilterTest const& f, int const inserted, Topic& x) double testFalsePositiveRate(TopicBloomFilterTest const& f, int inserted, Topic& x)
{ {
int const c_sampleSize = 1000; int const c_sampleSize = 1000;
int falsePositive = 0; int falsePositive = 0;
@ -93,12 +92,14 @@ void testFalsePositiveRate(TopicBloomFilterTest const& f, int const inserted, To
++falsePositive; ++falsePositive;
} }
falsePositive /= (c_sampleSize / 100); // in percents double res = double(falsePositive) / double(c_sampleSize);
int expected = calculateExpected(f, inserted);
int allowed = expected + (expected / 5); // allow deviations ~20%
//cnote << "Inserted: " << inserted << ", False Positive Rate: " << falsePositive << ", Expected: " << expected; double expected = calculateExpected(f, inserted);
BOOST_REQUIRE(falsePositive <= allowed); double allowed = expected * 1.2 + 0.05; // allow deviations ~25%
//cnote << "Inserted: " << inserted << ", False Positive Rate: " << res << ", Expected: " << expected;
BOOST_REQUIRE(res <= allowed);
return expected;
} }
BOOST_AUTO_TEST_SUITE(bloomFilter) BOOST_AUTO_TEST_SUITE(bloomFilter)
@ -111,11 +112,13 @@ BOOST_AUTO_TEST_CASE(falsePositiveRate)
TopicBloomFilterTest f; TopicBloomFilterTest f;
Topic x(0xC0DEFEED); // deterministic pseudorandom value Topic x(0xC0DEFEED); // deterministic pseudorandom value
for (int i = 1; i < 21; ++i) double expectedRate = 0;
for (int i = 1; i < 50 && isless(expectedRate, 0.5); ++i)
{ {
x = sha3(x); x = sha3(x);
f.addBloom(AbridgedTopic(x)); f.addBloom(AbridgedTopic(x));
testFalsePositiveRate(f, i, x); expectedRate = testFalsePositiveRate(f, i, x);
} }
} }

73
test/libwhisper/whisperTopic.cpp

@ -23,6 +23,7 @@
#include <boost/test/unit_test.hpp> #include <boost/test/unit_test.hpp>
#include <libp2p/Host.h> #include <libp2p/Host.h>
#include <libp2p/Session.h>
#include <libwhisper/WhisperPeer.h> #include <libwhisper/WhisperPeer.h>
#include <libwhisper/WhisperHost.h> #include <libwhisper/WhisperHost.h>
#include <test/TestHelper.h> #include <test/TestHelper.h>
@ -301,4 +302,76 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
BOOST_REQUIRE_EQUAL(result, 1); BOOST_REQUIRE_EQUAL(result, 1);
} }
BOOST_AUTO_TEST_CASE(topicAdvertising)
{
if (test::Options::get().nonetwork)
return;
cnote << "Testing Topic Advertising...";
VerbosityHolder setTemporaryLevel(2);
Host host1("first", NetworkPreferences("127.0.0.1", 30303, false));
host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost());
host1.start();
while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10));
Host host2("second", NetworkPreferences("127.0.0.1", 30305, false));
host2.setIdealPeerCount(1);
auto whost2 = host2.registerCapability(new WhisperHost());
whost2->installWatch(BuildTopicMask("test2"));
host2.start();
while (!host2.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10));
host1.addNode(host2.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305));
while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10));
while (!host1.peerCount())
this_thread::sleep_for(chrono::milliseconds(10));
while (!host2.peerCount())
this_thread::sleep_for(chrono::milliseconds(10));
std::vector<std::pair<std::shared_ptr<Session>, std::shared_ptr<Peer>>> sessions;
for (int i = 0; i < 600; ++i)
{
sessions = whost1->peerSessions();
if (!sessions.empty() && sessions.back().first->cap<WhisperPeer>()->bloom())
break;
else
this_thread::sleep_for(chrono::milliseconds(10));
}
BOOST_REQUIRE(sessions.size());
FixedHash<TopicBloomFilterSize> bf1 = sessions.back().first->cap<WhisperPeer>()->bloom();
FixedHash<TopicBloomFilterSize> bf2 = whost2->bloom();
BOOST_REQUIRE_EQUAL(bf1, bf2);
BOOST_REQUIRE(bf1);
BOOST_REQUIRE(!whost1->bloom());
whost1->installWatch(BuildTopicMask("test1"));
for (int i = 0; i < 600; ++i)
{
sessions = whost2->peerSessions();
if (!sessions.empty() && sessions.back().first->cap<WhisperPeer>()->bloom())
break;
else
this_thread::sleep_for(chrono::milliseconds(10));
}
BOOST_REQUIRE(sessions.size());
BOOST_REQUIRE_EQUAL(sessions.back().second->id, host1.id());
bf2 = sessions.back().first->cap<WhisperPeer>()->bloom();
bf1 = whost1->bloom();
BOOST_REQUIRE_EQUAL(bf1, bf2);
BOOST_REQUIRE(bf1);
}
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save