Browse Source

Merge pull request #2348 from gluk256/_rating

Rating system for outgoing messages introduced
cl-refactor
Gav Wood 10 years ago
parent
commit
35dd7d919c
  1. 2
      libp2p/Session.cpp
  2. 28
      libwhisper/BloomFilter.cpp
  3. 6
      libwhisper/BloomFilter.h
  4. 2
      libwhisper/Common.cpp
  5. 7
      libwhisper/Common.h
  6. 9
      libwhisper/Message.cpp
  7. 2
      libwhisper/Message.h
  8. 24
      libwhisper/WhisperHost.cpp
  9. 2
      libwhisper/WhisperHost.h
  10. 41
      libwhisper/WhisperPeer.cpp
  11. 4
      libwhisper/WhisperPeer.h
  12. 8
      test/libwhisper/bloomFilter.cpp

2
libp2p/Session.cpp

@ -382,7 +382,7 @@ void Session::doRead()
} }
catch (std::exception const& _e) catch (std::exception const& _e)
{ {
clog(NetWarn) << "Exception decoding frame header RLP:" << bytesConstRef(m_data.data(), h128::size).cropped(3); clog(NetWarn) << "Exception decoding frame header RLP:" << _e.what() << bytesConstRef(m_data.data(), h128::size).cropped(3);
drop(BadProtocol); drop(BadProtocol);
return; return;
} }

28
libwhisper/BloomFilter.cpp

@ -1,28 +0,0 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file BloomFilter.cpp
* @author Vladislav Gluhovsky <vlad@ethdev.com>
* @date June 2015
*/
#include "BloomFilter.h"
using namespace std;
using namespace dev;
using namespace dev::shh;

6
libwhisper/BloomFilter.h

@ -43,13 +43,11 @@ public:
void removeRaw(FixedHash<N> const& _h); void removeRaw(FixedHash<N> const& _h);
bool containsRaw(FixedHash<N> const& _h) const { return this->contains(_h); } bool containsRaw(FixedHash<N> const& _h) const { return this->contains(_h); }
enum { BitsPerBloom = 3 };
private: private:
void init() { for (unsigned i = 0; i < CounterSize; ++i) m_refCounter[i] = 0; } void init() { for (unsigned i = 0; i < CounterSize; ++i) m_refCounter[i] = 0; }
static bool isBitSet(FixedHash<N> const& _h, unsigned _index); static bool isBitSet(FixedHash<N> const& _h, unsigned _index);
enum { CounterSize = 8 * TopicBloomFilterBase::size }; static const unsigned CounterSize = N * 8;
std::array<uint16_t, CounterSize> m_refCounter; std::array<uint16_t, CounterSize> m_refCounter;
}; };
@ -91,7 +89,7 @@ bool TopicBloomFilterBase<N>::isBitSet(FixedHash<N> const& _h, unsigned _index)
return (_h[iByte] & c_powerOfTwoBitMmask[iBit]) != 0; return (_h[iByte] & c_powerOfTwoBitMmask[iBit]) != 0;
} }
using TopicBloomFilter = TopicBloomFilterBase<c_topicBloomFilterSize>; using TopicBloomFilter = TopicBloomFilterBase<TopicBloomFilterSize>;
} }
} }

2
libwhisper/Common.cpp

@ -100,7 +100,7 @@ TopicBloomFilterHash TopicFilter::exportBloom() const
TopicBloomFilterHash ret; TopicBloomFilterHash ret;
for (TopicMask const& t: m_topicMasks) for (TopicMask const& t: m_topicMasks)
for (auto const& i: t) for (auto const& i: t)
ret |= i.first.template bloomPart<TopicBloomFilter::BitsPerBloom, c_topicBloomFilterSize>(); ret |= i.first.template bloomPart<BitsPerBloom, TopicBloomFilterSize>();
return ret; return ret;
} }

7
libwhisper/Common.h

@ -58,8 +58,9 @@ enum WhisperPacket
PacketCount PacketCount
}; };
static const int c_topicBloomFilterSize = 64; static const unsigned TopicBloomFilterSize = 64;
static const int c_whisperProtocolVersion = 3; static const unsigned BitsPerBloom = 3;
static const unsigned WhisperProtocolVersion = 3;
using AbridgedTopic = FixedHash<4>; using AbridgedTopic = FixedHash<4>;
using Topic = h256; using Topic = h256;
@ -67,7 +68,7 @@ using Topic = h256;
using AbridgedTopics = std::vector<AbridgedTopic>; using AbridgedTopics = std::vector<AbridgedTopic>;
using Topics = h256s; using Topics = h256s;
using TopicBloomFilterHash = FixedHash<c_topicBloomFilterSize>; using TopicBloomFilterHash = FixedHash<TopicBloomFilterSize>;
AbridgedTopic abridge(Topic const& _topic); AbridgedTopic abridge(Topic const& _topic);
AbridgedTopics abridge(Topics const& _topics); AbridgedTopics abridge(Topics const& _topics);

9
libwhisper/Message.cpp

@ -181,3 +181,12 @@ void Envelope::proveWork(unsigned _ms)
} }
} }
} }
bool Envelope::matchesBloomFilter(TopicBloomFilterHash const& f) const
{
for (AbridgedTopic t: m_topic)
if (f.contains(t.template bloomPart<BitsPerBloom, TopicBloomFilterSize>()))
return true;
return false;
}

2
libwhisper/Message.h

@ -80,6 +80,8 @@ public:
unsigned workProved() const; unsigned workProved() const;
void proveWork(unsigned _ms); void proveWork(unsigned _ms);
bool matchesBloomFilter(TopicBloomFilterHash const& f) const;
private: private:
Envelope(unsigned _exp, unsigned _ttl, AbridgedTopics const& _topic): m_expiry(_exp), m_ttl(_ttl), m_topic(_topic) {} Envelope(unsigned _exp, unsigned _ttl, AbridgedTopics const& _topic): m_expiry(_exp), m_ttl(_ttl), m_topic(_topic) {}

24
libwhisper/WhisperHost.cpp

@ -51,6 +51,8 @@ void WhisperHost::streamMessage(h256 _m, RLPStream& _s) const
void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p) void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
{ {
// this function processes messages originated both by local host (_p == null), and by remote peers (_p != null)
cnote << this << ": inject: " << _m.expiry() << _m.ttl() << _m.topic() << toHex(_m.data()); cnote << this << ": inject: " << _m.expiry() << _m.ttl() << _m.topic() << toHex(_m.data());
if (_m.expiry() <= (unsigned)time(0)) if (_m.expiry() <= (unsigned)time(0))
@ -66,21 +68,25 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
m_expiryQueue.insert(make_pair(_m.expiry(), h)); m_expiryQueue.insert(make_pair(_m.expiry(), h));
} }
DEV_GUARDED(m_filterLock) int rating = 1; // rating for local host is based upon: 1. installed watch; 2. proof of work
{
for (auto const& f: m_filters) if (_p) // incoming message from remote peer
if (f.second.filter.matches(_m)) DEV_GUARDED(m_filterLock)
for (auto& i: m_watches) for (auto const& f: m_filters)
if (i.second.id == f.first) if (f.second.filter.matches(_m))
i.second.changes.push_back(h); for (auto& i: m_watches)
} if (i.second.id == f.first)
{
i.second.changes.push_back(h);
rating += 10; // subject to review
}
// TODO p2p: capability-based rating // TODO p2p: capability-based rating
for (auto i: peerSessions()) for (auto i: peerSessions())
{ {
auto w = i.first->cap<WhisperPeer>().get(); auto w = i.first->cap<WhisperPeer>().get();
if (w == _p) if (w == _p)
w->addRating(1); w->addRating(rating);
else else
w->noteNewMessage(h, _m); w->noteNewMessage(h, _m);
} }

2
libwhisper/WhisperHost.h

@ -50,7 +50,7 @@ class WhisperHost: public HostCapability<WhisperPeer>, public Interface, public
public: public:
WhisperHost(); WhisperHost();
virtual ~WhisperHost(); virtual ~WhisperHost();
unsigned protocolVersion() const { return c_whisperProtocolVersion; } unsigned protocolVersion() const { return WhisperProtocolVersion; }
/// remove old messages /// remove old messages
void cleanup(); void cleanup();
std::map<h256, Envelope> all() const { dev::ReadGuard l(x_messages); return m_messages; } std::map<h256, Envelope> all() const { dev::ReadGuard l(x_messages); return m_messages; }

41
libwhisper/WhisperPeer.cpp

@ -91,19 +91,17 @@ void WhisperPeer::sendMessages()
if (m_advertiseTopicsOfInterest) if (m_advertiseTopicsOfInterest)
sendTopicsOfInterest(host()->bloom()); sendTopicsOfInterest(host()->bloom());
multimap<unsigned, h256> available;
DEV_GUARDED(x_unseen)
m_unseen.swap(available);
RLPStream amalg; RLPStream amalg;
unsigned msgCount = 0;
{
Guard l(x_unseen);
msgCount = m_unseen.size();
while (m_unseen.size())
{
auto p = *m_unseen.begin();
m_unseen.erase(m_unseen.begin());
host()->streamMessage(p.second, amalg);
}
}
// send the highest rated messages first
for (auto i = available.rbegin(); i != available.rend(); ++i)
host()->streamMessage(i->second, amalg);
unsigned msgCount = available.size();
if (msgCount) if (msgCount)
{ {
RLPStream s; RLPStream s;
@ -114,8 +112,27 @@ void WhisperPeer::sendMessages()
void WhisperPeer::noteNewMessage(h256 _h, Envelope const& _m) void WhisperPeer::noteNewMessage(h256 _h, Envelope const& _m)
{ {
unsigned rate = ratingForPeer(_m);
Guard l(x_unseen); Guard l(x_unseen);
m_unseen.insert(make_pair(rating(_m), _h)); m_unseen.insert(make_pair(rate, _h));
}
unsigned WhisperPeer::ratingForPeer(Envelope const& e) const
{
// we try to estimate, how valuable this nessage will be for the remote peer,
// according to the following criteria:
// 1. bloom filter
// 2. proof of work
static const unsigned BloomFilterMatchReward = 256; // vlad todo: move to common.h
unsigned rating = 0;
DEV_GUARDED(x_bloom)
if (e.matchesBloomFilter(m_bloom))
rating += BloomFilterMatchReward;
rating += e.sha3().firstBitSet();
return rating;
} }
void WhisperPeer::sendTopicsOfInterest(TopicBloomFilterHash const& _bloom) void WhisperPeer::sendTopicsOfInterest(TopicBloomFilterHash const& _bloom)

4
libwhisper/WhisperPeer.h

@ -53,7 +53,7 @@ public:
virtual ~WhisperPeer(); virtual ~WhisperPeer();
WhisperHost* host() const; WhisperHost* host() const;
static std::string name() { return "shh"; } static std::string name() { return "shh"; }
static u256 version() { return c_whisperProtocolVersion; } static u256 version() { return WhisperProtocolVersion; }
static unsigned messageCount() { return PacketCount; } static unsigned messageCount() { return PacketCount; }
TopicBloomFilterHash bloom() const { dev::Guard g(x_bloom); return m_bloom; } TopicBloomFilterHash bloom() const { dev::Guard g(x_bloom); return m_bloom; }
void sendTopicsOfInterest(TopicBloomFilterHash const& _bloom); ///< sends our bloom filter to remote peer void sendTopicsOfInterest(TopicBloomFilterHash const& _bloom); ///< sends our bloom filter to remote peer
@ -62,7 +62,7 @@ public:
private: private:
virtual bool interpret(unsigned _id, RLP const&) override; virtual bool interpret(unsigned _id, RLP const&) override;
void sendMessages(); void sendMessages();
unsigned rating(Envelope const&) const { return 0; } // TODO unsigned ratingForPeer(Envelope const& e) const;
void noteNewMessage(h256 _h, Envelope const& _m); void noteNewMessage(h256 _h, Envelope const& _m);
void setBloom(TopicBloomFilterHash const& _b) { dev::Guard g(x_bloom); m_bloom = _b; } void setBloom(TopicBloomFilterHash const& _b) { dev::Guard g(x_bloom); m_bloom = _b; }

8
test/libwhisper/bloomFilter.cpp

@ -28,7 +28,7 @@ using namespace dev;
using namespace dev::shh; using namespace dev::shh;
using TopicBloomFilterShort = TopicBloomFilterBase<4>; using TopicBloomFilterShort = TopicBloomFilterBase<4>;
using TopicBloomFilterTest = TopicBloomFilterBase<c_topicBloomFilterSize>; using TopicBloomFilterTest = TopicBloomFilterBase<TopicBloomFilterSize>;
void testAddNonExisting(TopicBloomFilterShort& _f, AbridgedTopic const& _h) void testAddNonExisting(TopicBloomFilterShort& _f, AbridgedTopic const& _h)
{ {
@ -61,7 +61,7 @@ void testRemoveExistingBloom(TopicBloomFilterShort& _f, AbridgedTopic const& _h)
double calculateExpected(TopicBloomFilterTest const& f, int n) double calculateExpected(TopicBloomFilterTest const& f, int n)
{ {
int const m = f.size * 8; // number of bits in the bloom int const m = f.size * 8; // number of bits in the bloom
int const k = f.BitsPerBloom; // number of hash functions (e.g. bits set to 1 in every bloom) int const k = BitsPerBloom; // number of hash functions (e.g. bits set to 1 in every bloom)
double singleBitSet = 1.0 / m; // probability of any bit being set after inserting a single bit double singleBitSet = 1.0 / m; // probability of any bit being set after inserting a single bit
double singleBitNotSet = (1.0 - singleBitSet); double singleBitNotSet = (1.0 - singleBitSet);
@ -256,7 +256,7 @@ void updateDistribution(FixedHash<DistributionTestSize> const& _h, array<unsigne
if (_h[i] & c_powerOfTwoBitMmask[j]) if (_h[i] & c_powerOfTwoBitMmask[j])
{ {
_distribution[i * 8 + j]++; _distribution[i * 8 + j]++;
if (++bits >= TopicBloomFilterTest::BitsPerBloom) if (++bits >= BitsPerBloom)
return; return;
} }
} }
@ -274,7 +274,7 @@ BOOST_AUTO_TEST_CASE(distributionRate)
for (unsigned i = 0; i < 22000; ++i) for (unsigned i = 0; i < 22000; ++i)
{ {
x = sha3(x); x = sha3(x);
FixedHash<DistributionTestSize> h = x.template bloomPart<TopicBloomFilterTest::BitsPerBloom, DistributionTestSize>(); FixedHash<DistributionTestSize> h = x.template bloomPart<BitsPerBloom, DistributionTestSize>();
updateDistribution(h, distribution); updateDistribution(h, distribution);
} }

Loading…
Cancel
Save