Browse Source

Merge pull request #2551 from gluk256/_save_topix

Watch serialization
cl-refactor
Gav Wood 10 years ago
parent
commit
09c4832452
  1. 3
      libp2p/Common.h
  2. 49
      libwhisper/WhisperDB.cpp
  3. 32
      libwhisper/WhisperDB.h
  4. 90
      libwhisper/WhisperHost.cpp
  5. 8
      libwhisper/WhisperHost.h
  6. 14
      test/libp2p/peer.cpp
  7. 103
      test/libwhisper/whisperDB.cpp
  8. 39
      test/libwhisper/whisperTopic.cpp

3
libp2p/Common.h

@ -239,11 +239,12 @@ public:
~DeadlineOps() { stop(); } ~DeadlineOps() { stop(); }
void schedule(unsigned _msInFuture, std::function<void(boost::system::error_code const&)> const& _f) { if (m_stopped) return; DEV_GUARDED(x_timers) m_timers.emplace_back(m_io, _msInFuture, _f); } void schedule(unsigned _msInFuture, std::function<void(boost::system::error_code const&)> const& _f) { if (m_stopped) return; DEV_GUARDED(x_timers) m_timers.emplace_back(m_io, _msInFuture, _f); }
void stop() { m_stopped = true; DEV_GUARDED(x_timers) m_timers.clear(); } void stop() { m_stopped = true; DEV_GUARDED(x_timers) m_timers.clear(); }
protected: protected:
void reap(); void reap();
private: private:
ba::io_service& m_io; ba::io_service& m_io;
unsigned m_reapIntervalMs; unsigned m_reapIntervalMs;

49
libwhisper/WhisperDB.cpp

@ -22,22 +22,25 @@
#include "WhisperDB.h" #include "WhisperDB.h"
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <libdevcore/FileSystem.h> #include <libdevcore/FileSystem.h>
#include "WhisperHost.h"
using namespace std; using namespace std;
using namespace dev; using namespace dev;
using namespace dev::shh; using namespace dev::shh;
namespace fs = boost::filesystem; namespace fs = boost::filesystem;
WhisperDB::WhisperDB() WhisperDB::WhisperDB(string const& _type)
{ {
m_readOptions.verify_checksums = true; m_readOptions.verify_checksums = true;
string path = dev::getDataDir("shh"); string path = dev::getDataDir("shh");
fs::create_directories(path); fs::create_directories(path);
fs::permissions(path, fs::owner_all); fs::permissions(path, fs::owner_all);
path.append("\\").append(_type);
leveldb::Options op; leveldb::Options op;
op.create_if_missing = true; op.create_if_missing = true;
op.max_open_files = 256; op.max_open_files = 256;
leveldb::DB* p = nullptr; leveldb::DB* p = nullptr;
leveldb::Status status = leveldb::DB::Open(op, path + "/messages", &p); leveldb::Status status = leveldb::DB::Open(op, path, &p);
m_db.reset(p); m_db.reset(p);
if (!status.ok()) if (!status.ok())
BOOST_THROW_EXCEPTION(FailedToOpenLevelDB(status.ToString())); BOOST_THROW_EXCEPTION(FailedToOpenLevelDB(status.ToString()));
@ -79,7 +82,7 @@ void WhisperDB::kill(dev::h256 const& _key)
BOOST_THROW_EXCEPTION(FailedDeleteInLevelDB(status.ToString())); BOOST_THROW_EXCEPTION(FailedDeleteInLevelDB(status.ToString()));
} }
void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst) void WhisperMessagesDB::loadAllMessages(std::map<h256, Envelope>& o_dst)
{ {
leveldb::ReadOptions op; leveldb::ReadOptions op;
op.fill_cache = false; op.fill_cache = false;
@ -135,7 +138,7 @@ void WhisperDB::loadAll(std::map<h256, Envelope>& o_dst)
} }
} }
void WhisperDB::save(h256 const& _key, Envelope const& _e) void WhisperMessagesDB::saveSingleMessage(h256 const& _key, Envelope const& _e)
{ {
try try
{ {
@ -154,3 +157,41 @@ void WhisperDB::save(h256 const& _key, Envelope const& _e)
cwarn << boost::diagnostic_information(ex); cwarn << boost::diagnostic_information(ex);
} }
} }
vector<unsigned> WhisperFiltersDB::restoreTopicsFromDB(WhisperHost* _host, h256 const& _id)
{
vector<unsigned> ret;
string raw = lookup(_id);
if (!raw.empty())
{
RLP rlp(raw);
auto sz = rlp.itemCountStrict();
for (unsigned i = 0; i < sz; ++i)
{
RLP r = rlp[i];
bytesConstRef ref(r.toBytesConstRef());
Topics topics;
unsigned num = ref.size() / h256::size;
for (unsigned j = 0; j < num; ++j)
{
h256 topic(ref.data() + j * h256::size, h256::ConstructFromPointerType());
topics.push_back(topic);
}
unsigned w = _host->installWatch(topics);
ret.push_back(w);
}
}
return ret;
}
void WhisperFiltersDB::saveTopicsToDB(WhisperHost const& _host, h256 const& _id)
{
bytes b;
RLPStream rlp;
_host.exportFilters(rlp);
rlp.swapOut(b);
insert(_id, b);
}

32
libwhisper/WhisperDB.h

@ -31,30 +31,46 @@ namespace dev
namespace shh namespace shh
{ {
struct WrongTypeLevelDB: virtual Exception {};
struct FailedToOpenLevelDB: virtual Exception { FailedToOpenLevelDB(std::string const& _message): Exception(_message) {} }; struct FailedToOpenLevelDB: virtual Exception { FailedToOpenLevelDB(std::string const& _message): Exception(_message) {} };
struct FailedInsertInLevelDB: virtual Exception { FailedInsertInLevelDB(std::string const& _message): Exception(_message) {} }; struct FailedInsertInLevelDB: virtual Exception { FailedInsertInLevelDB(std::string const& _message): Exception(_message) {} };
struct FailedLookupInLevelDB: virtual Exception { FailedLookupInLevelDB(std::string const& _message): Exception(_message) {} }; struct FailedLookupInLevelDB: virtual Exception { FailedLookupInLevelDB(std::string const& _message): Exception(_message) {} };
struct FailedDeleteInLevelDB: virtual Exception { FailedDeleteInLevelDB(std::string const& _message): Exception(_message) {} }; struct FailedDeleteInLevelDB: virtual Exception { FailedDeleteInLevelDB(std::string const& _message): Exception(_message) {} };
class WhisperHost;
class WhisperDB class WhisperDB
{ {
public: public:
WhisperDB(); WhisperDB(std::string const& _type);
~WhisperDB() {} virtual ~WhisperDB() {}
std::string lookup(dev::h256 const& _key) const; std::string lookup(dev::h256 const& _key) const;
void insert(dev::h256 const& _key, std::string const& _value); void insert(dev::h256 const& _key, std::string const& _value);
void insert(dev::h256 const& _key, bytes const& _value); void insert(dev::h256 const& _key, bytes const& _value);
void kill(dev::h256 const& _key); void kill(dev::h256 const& _key);
void loadAll(std::map<h256, Envelope>& o_dst);
void save(dev::h256 const& _key, Envelope const& _e);
private: protected:
leveldb::ReadOptions m_readOptions; leveldb::ReadOptions m_readOptions;
leveldb::WriteOptions m_writeOptions; leveldb::WriteOptions m_writeOptions;
std::unique_ptr<leveldb::DB> m_db; std::unique_ptr<leveldb::DB> m_db;
};
enum MetaInformation { StoreForeverFlag = 1, WatchedFlag = 2 }; class WhisperMessagesDB: public WhisperDB
{
public:
WhisperMessagesDB(): WhisperDB("messages") {}
virtual ~WhisperMessagesDB() {}
void loadAllMessages(std::map<h256, Envelope>& o_dst);
void saveSingleMessage(dev::h256 const& _key, Envelope const& _e);
};
class WhisperFiltersDB: public WhisperDB
{
public:
WhisperFiltersDB(): WhisperDB("filters") {}
virtual ~WhisperFiltersDB() {}
std::vector<unsigned> restoreTopicsFromDB(WhisperHost* _host, h256 const& _id);
void saveTopicsToDB(WhisperHost const& _host, h256 const& _id);
}; };
} }

90
libwhisper/WhisperHost.cpp

@ -30,7 +30,7 @@ using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
using namespace dev::shh; using namespace dev::shh;
WhisperHost::WhisperHost(bool _useDB): Worker("shh"), m_useDB(_useDB) WhisperHost::WhisperHost(bool _storeMessagesInDB): Worker("shh"), m_storeMessagesInDB(_storeMessagesInDB)
{ {
loadMessagesFromBD(); loadMessagesFromBD();
} }
@ -135,6 +135,31 @@ unsigned WhisperHost::installWatch(shh::Topics const& _t)
return ret; return ret;
} }
void WhisperHost::uninstallWatch(unsigned _i)
{
cwatshh << "XXX" << _i;
DEV_GUARDED(m_filterLock)
{
auto it = m_watches.find(_i);
if (it == m_watches.end())
return;
auto id = it->second.id;
m_watches.erase(it);
auto fit = m_filters.find(id);
if (fit != m_filters.end())
{
m_bloom.removeRaw(fit->second.filter.exportBloom());
if (!--fit->second.refCount)
m_filters.erase(fit);
}
}
noteAdvertiseTopicsOfInterest();
}
h256s WhisperHost::watchMessages(unsigned _watchId) h256s WhisperHost::watchMessages(unsigned _watchId)
{ {
h256s ret; h256s ret;
@ -156,29 +181,22 @@ h256s WhisperHost::watchMessages(unsigned _watchId)
return ret; return ret;
} }
void WhisperHost::uninstallWatch(unsigned _i) h256s WhisperHost::checkWatch(unsigned _watchId)
{ {
cwatshh << "XXX" << _i; h256s ret;
cleanup();
DEV_GUARDED(m_filterLock) dev::Guard l(m_filterLock);
try
{
ret = m_watches.at(_watchId).changes;
m_watches.at(_watchId).changes.clear();
}
catch (...)
{ {
auto it = m_watches.find(_i);
if (it == m_watches.end())
return;
auto id = it->second.id;
m_watches.erase(it);
auto fit = m_filters.find(id);
if (fit != m_filters.end())
{
m_bloom.removeRaw(fit->second.filter.exportBloom());
if (!--fit->second.refCount)
m_filters.erase(fit);
}
} }
noteAdvertiseTopicsOfInterest(); return ret;
} }
void WhisperHost::doWork() void WhisperHost::doWork()
@ -218,18 +236,18 @@ bool WhisperHost::isWatched(Envelope const& _e) const
void WhisperHost::saveMessagesToBD() void WhisperHost::saveMessagesToBD()
{ {
if (!m_useDB) if (!m_storeMessagesInDB)
return; return;
try try
{ {
WhisperDB db; WhisperMessagesDB db;
ReadGuard g(x_messages); ReadGuard g(x_messages);
unsigned now = (unsigned)time(0); unsigned now = (unsigned)time(0);
for (auto const& m: m_messages) for (auto const& m: m_messages)
if (m.second.expiry() > now) if (m.second.expiry() > now)
if (isWatched(m.second)) if (isWatched(m.second))
db.save(m.first, m.second); db.saveSingleMessage(m.first, m.second);
} }
catch(FailedToOpenLevelDB const& ex) catch(FailedToOpenLevelDB const& ex)
{ {
@ -247,14 +265,14 @@ void WhisperHost::saveMessagesToBD()
void WhisperHost::loadMessagesFromBD() void WhisperHost::loadMessagesFromBD()
{ {
if (!m_useDB) if (!m_storeMessagesInDB)
return; return;
try try
{ {
map<h256, Envelope> m; map<h256, Envelope> m;
WhisperDB db; WhisperMessagesDB db;
db.loadAll(m); db.loadAllMessages(m);
WriteGuard g(x_messages); WriteGuard g(x_messages);
m_messages.swap(m); m_messages.swap(m);
for (auto const& msg: m) for (auto const& msg: m)
@ -269,3 +287,25 @@ void WhisperHost::loadMessagesFromBD()
cwarn << "Unknown Exception in WhisperHost::loadMessagesFromBD()"; cwarn << "Unknown Exception in WhisperHost::loadMessagesFromBD()";
} }
} }
void WhisperHost::exportFilters(RLPStream& o_dst) const
{
DEV_GUARDED(m_filterLock)
{
o_dst.appendList(m_filters.size());
for (auto const& x: m_filters)
{
Topics const& topics = x.second.full;
unsigned const RawDataSize = topics.size() * h256::size;
unique_ptr<byte> p(new byte[RawDataSize]);
unsigned i = 0;
for (auto const& t: topics)
memcpy(p.get() + h256::size * i++, t.data(), h256::size);
bytesConstRef ref(p.get(), RawDataSize);
o_dst.append(ref);
}
}
}

8
libwhisper/WhisperHost.h

@ -48,7 +48,7 @@ class WhisperHost: public HostCapability<WhisperPeer>, public Interface, public
friend class WhisperPeer; friend class WhisperPeer;
public: public:
WhisperHost(bool _useDB = false); WhisperHost(bool _storeMessagesInDB = false);
virtual ~WhisperHost(); virtual ~WhisperHost();
unsigned protocolVersion() const { return WhisperProtocolVersion; } unsigned protocolVersion() const { return WhisperProtocolVersion; }
void cleanup(); ///< remove old messages void cleanup(); ///< remove old messages
@ -60,10 +60,12 @@ public:
virtual unsigned installWatch(Topics const& _filter) override; virtual unsigned installWatch(Topics const& _filter) override;
virtual void uninstallWatch(unsigned _watchId) override; virtual void uninstallWatch(unsigned _watchId) override;
virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } } virtual h256s peekWatch(unsigned _watchId) const override { dev::Guard l(m_filterLock); try { return m_watches.at(_watchId).changes; } catch (...) { return h256s(); } }
virtual h256s checkWatch(unsigned _watchId) override { cleanup(); dev::Guard l(m_filterLock); h256s ret; try { ret = m_watches.at(_watchId).changes; m_watches.at(_watchId).changes.clear(); } catch (...) {} return ret; } virtual h256s checkWatch(unsigned _watchId) override;
virtual h256s watchMessages(unsigned _watchId) override; ///< returns IDs of messages, which match specific watch criteria virtual h256s watchMessages(unsigned _watchId) override; ///< returns IDs of messages, which match specific watch criteria
virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } } virtual Envelope envelope(h256 _m) const override { try { dev::ReadGuard l(x_messages); return m_messages.at(_m); } catch (...) { return Envelope(); } }
void exportFilters(dev::RLPStream& o_dst) const;
protected: protected:
virtual void doWork() override; virtual void doWork() override;
void noteAdvertiseTopicsOfInterest(); void noteAdvertiseTopicsOfInterest();
@ -85,7 +87,7 @@ private:
std::map<unsigned, ClientWatch> m_watches; std::map<unsigned, ClientWatch> m_watches;
TopicBloomFilter m_bloom; TopicBloomFilter m_bloom;
bool m_useDB; ///< needed for tests and other special cases bool m_storeMessagesInDB; ///< needed for tests and other special cases
}; };
} }

14
test/libp2p/peer.cpp

@ -44,8 +44,8 @@ BOOST_AUTO_TEST_CASE(host)
return; return;
VerbosityHolder setTemporaryLevel(10); VerbosityHolder setTemporaryLevel(10);
NetworkPreferences host1prefs("127.0.0.1", 30311, false); NetworkPreferences host1prefs("127.0.0.1", 30321, false);
NetworkPreferences host2prefs("127.0.0.1", 30312, false); NetworkPreferences host2prefs("127.0.0.1", 30322, false);
Host host1("Test", host1prefs); Host host1("Test", host1prefs);
Host host2("Test", host2prefs); Host host2("Test", host2prefs);
host1.start(); host1.start();
@ -97,7 +97,7 @@ BOOST_AUTO_TEST_CASE(saveNodes)
for (unsigned i = 0; i < c_nodes; ++i) for (unsigned i = 0; i < c_nodes; ++i)
{ {
Host* h = new Host("Test", NetworkPreferences("127.0.0.1", 30300 + i, false)); Host* h = new Host("Test", NetworkPreferences("127.0.0.1", 30325 + i, false));
h->setIdealPeerCount(10); h->setIdealPeerCount(10);
// starting host is required so listenport is available // starting host is required so listenport is available
h->start(); h->start();
@ -155,8 +155,8 @@ BOOST_AUTO_TEST_CASE(requirePeer)
VerbosityHolder reduceVerbosity(10); VerbosityHolder reduceVerbosity(10);
const char* const localhost = "127.0.0.1"; const char* const localhost = "127.0.0.1";
NetworkPreferences prefs1(localhost, 30301, false); NetworkPreferences prefs1(localhost, 30323, false);
NetworkPreferences prefs2(localhost, 30302, false); NetworkPreferences prefs2(localhost, 30324, false);
Host host1("Test", prefs1); Host host1("Test", prefs1);
host1.start(); host1.start();
@ -229,9 +229,9 @@ BOOST_AUTO_TEST_SUITE_END()
int peerTest(int argc, char** argv) int peerTest(int argc, char** argv)
{ {
Public remoteAlias; Public remoteAlias;
short listenPort = 30303; short listenPort = 30304;
string remoteHost; string remoteHost;
short remotePort = 30303; short remotePort = 30304;
for (int i = 1; i < argc; ++i) for (int i = 1; i < argc; ++i)
{ {

103
test/libwhisper/whisperDB.cpp

@ -28,6 +28,7 @@ along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
using namespace std; using namespace std;
using namespace dev; using namespace dev;
using namespace dev::shh; using namespace dev::shh;
using namespace dev::p2p;
struct P2PFixture struct P2PFixture
{ {
@ -47,7 +48,7 @@ BOOST_AUTO_TEST_CASE(basic)
string const text2 = "ipsum"; string const text2 = "ipsum";
h256 h1(0xBEEF); h256 h1(0xBEEF);
h256 h2(0xC0FFEE); h256 h2(0xC0FFEE);
WhisperDB db; WhisperMessagesDB db;
db.kill(h1); db.kill(h1);
db.kill(h2); db.kill(h2);
@ -94,7 +95,7 @@ BOOST_AUTO_TEST_CASE(persistence)
h256 const h2(0xBADD00DE); h256 const h2(0xBADD00DE);
{ {
WhisperDB db; WhisperMessagesDB db;
db.kill(h1); db.kill(h1);
db.kill(h2); db.kill(h2);
s = db.lookup(h1); s = db.lookup(h1);
@ -109,7 +110,7 @@ BOOST_AUTO_TEST_CASE(persistence)
this_thread::sleep_for(chrono::milliseconds(20)); this_thread::sleep_for(chrono::milliseconds(20));
{ {
WhisperDB db; WhisperMessagesDB db;
db.insert(h1, text1); db.insert(h1, text1);
db.insert(h2, text2); db.insert(h2, text2);
} }
@ -117,7 +118,7 @@ BOOST_AUTO_TEST_CASE(persistence)
this_thread::sleep_for(chrono::milliseconds(20)); this_thread::sleep_for(chrono::milliseconds(20));
{ {
WhisperDB db; WhisperMessagesDB db;
s = db.lookup(h2); s = db.lookup(h2);
BOOST_REQUIRE(!s.compare(text2)); BOOST_REQUIRE(!s.compare(text2));
s = db.lookup(h1); s = db.lookup(h1);
@ -169,7 +170,7 @@ BOOST_AUTO_TEST_CASE(messages)
} }
} }
WhisperDB db; WhisperMessagesDB db;
unsigned x = 0; unsigned x = 0;
for (auto i: m1) for (auto i: m1)
@ -190,7 +191,7 @@ BOOST_AUTO_TEST_CASE(corruptedData)
h256 x = h256::random(); h256 x = h256::random();
{ {
WhisperDB db; WhisperMessagesDB db;
db.insert(x, "this is a test input, representing corrupt data"); db.insert(x, "this is a test input, representing corrupt data");
} }
@ -202,10 +203,98 @@ BOOST_AUTO_TEST_CASE(corruptedData)
} }
{ {
WhisperDB db; WhisperMessagesDB db;
string s = db.lookup(x); string s = db.lookup(x);
BOOST_REQUIRE(s.empty()); BOOST_REQUIRE(s.empty());
} }
} }
BOOST_AUTO_TEST_CASE(filters)
{
cnote << "Testing filters saving...";
VerbosityHolder setTemporaryLevel(2);
h256 persistID(0xC0FFEE);
{
WhisperFiltersDB db;
p2p::Host h("Test");
auto wh = h.registerCapability(new WhisperHost());
wh->installWatch(BuildTopic("t1"));
wh->installWatch(BuildTopic("t2"));
db.saveTopicsToDB(*wh, persistID);
}
uint16_t port1 = 30308;
unsigned const step = 10;
bool host1Ready = false;
bool sent = false;
unsigned result = 0;
unsigned messageCount = 0;
Host host1("Test", NetworkPreferences("127.0.0.1", port1, false));
host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost());
host1.start();
WhisperFiltersDB db;
auto watches = db.restoreTopicsFromDB(whost1.get(), persistID);
auto zero = db.restoreTopicsFromDB(whost1.get(), ++persistID);
BOOST_REQUIRE(!watches.empty());
BOOST_REQUIRE(zero.empty());
std::thread listener([&]()
{
setThreadName("other");
host1Ready = true;
for (unsigned i = 0; i < 16000 && !sent; i += step)
this_thread::sleep_for(chrono::milliseconds(step));
for (unsigned j = 0; j < 200 && messageCount < 2; ++j)
{
for (unsigned id: watches)
for (auto const& e: whost1->checkWatch(id))
{
Message msg = whost1->envelope(e).open(whost1->fullTopics(id));
unsigned x = RLP(msg.payload()).toInt<unsigned>();
cnote << "New message:" << x;
result += x;
++messageCount;
}
this_thread::sleep_for(chrono::milliseconds(50));
}
});
Host host2("Test", NetworkPreferences("127.0.0.1", 30309, false));
host2.setIdealPeerCount(1);
auto whost2 = host2.registerCapability(new WhisperHost());
host2.start();
for (unsigned i = 0; i < 3000 && !host1.haveNetwork(); i += step)
this_thread::sleep_for(chrono::milliseconds(step));
host2.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), port1, port1));
for (unsigned i = 0; i < 3000 && !host1Ready; i += step)
this_thread::sleep_for(chrono::milliseconds(step));
for (unsigned i = 0; i < 3000 && (!host1.peerCount() || !host2.peerCount()); i += step)
this_thread::sleep_for(chrono::milliseconds(step));
unsigned ttl = 777000;
whost2->post(RLPStream().append(8).out(), BuildTopic("t8"), ttl);
whost2->post(RLPStream().append(4).out(), BuildTopic("t4"), ttl);
whost2->post(RLPStream().append(1).out(), BuildTopic("t1"), ttl);
whost2->post(RLPStream().append(2).out(), BuildTopic("t2"), ttl);
whost2->post(RLPStream().append(16).out(), BuildTopic("t16"), ttl);
sent = true;
for (unsigned i = 0; i < 3000 && messageCount < 2; i += step)
this_thread::sleep_for(chrono::milliseconds(step));
listener.join();
BOOST_REQUIRE_EQUAL(messageCount, 2);
BOOST_REQUIRE_EQUAL(result, 3);
}
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

39
test/libwhisper/whisperTopic.cpp

@ -49,7 +49,8 @@ BOOST_AUTO_TEST_CASE(topic)
cnote << "Testing Whisper..."; cnote << "Testing Whisper...";
VerbosityHolder setTemporaryLevel(0); VerbosityHolder setTemporaryLevel(0);
Host host1("Test", NetworkPreferences("127.0.0.1", 30303, false)); uint16_t port1 = 30311;
Host host1("Test", NetworkPreferences("127.0.0.1", port1, false));
host1.setIdealPeerCount(1); host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost()); auto whost1 = host1.registerCapability(new WhisperHost());
host1.start(); host1.start();
@ -80,14 +81,14 @@ BOOST_AUTO_TEST_CASE(topic)
} }
}); });
Host host2("Test", NetworkPreferences("127.0.0.1", 30300, false)); Host host2("Test", NetworkPreferences("127.0.0.1", 30310, false));
host1.setIdealPeerCount(1); host1.setIdealPeerCount(1);
auto whost2 = host2.registerCapability(new WhisperHost()); auto whost2 = host2.registerCapability(new WhisperHost());
host2.start(); host2.start();
while (!host1.haveNetwork()) while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(5)); this_thread::sleep_for(chrono::milliseconds(5));
host2.addNode(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30303, 30303)); host2.addNode(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), port1, port1));
// wait for nodes to connect // wait for nodes to connect
this_thread::sleep_for(chrono::milliseconds(1000)); this_thread::sleep_for(chrono::milliseconds(1000));
@ -115,7 +116,8 @@ BOOST_AUTO_TEST_CASE(forwarding)
VerbosityHolder setTemporaryLevel(0); VerbosityHolder setTemporaryLevel(0);
// Host must be configured not to share peers. // Host must be configured not to share peers.
Host host1("Listner", NetworkPreferences("127.0.0.1", 30303, false)); uint16_t port1 = 30312;
Host host1("Listner", NetworkPreferences("127.0.0.1", port1, false));
host1.setIdealPeerCount(1); host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost()); auto whost1 = host1.registerCapability(new WhisperHost());
host1.start(); host1.start();
@ -150,7 +152,8 @@ BOOST_AUTO_TEST_CASE(forwarding)
// Host must be configured not to share peers. // Host must be configured not to share peers.
Host host2("Forwarder", NetworkPreferences("127.0.0.1", 30305, false)); uint16_t port2 = 30313;
Host host2("Forwarder", NetworkPreferences("127.0.0.1", port2, false));
host2.setIdealPeerCount(1); host2.setIdealPeerCount(1);
auto whost2 = host2.registerCapability(new WhisperHost()); auto whost2 = host2.registerCapability(new WhisperHost());
host2.start(); host2.start();
@ -167,7 +170,7 @@ BOOST_AUTO_TEST_CASE(forwarding)
this_thread::sleep_for(chrono::milliseconds(50)); this_thread::sleep_for(chrono::milliseconds(50));
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
host2.addNode(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30303, 30303)); host2.addNode(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), port1, port1));
startedForwarder = true; startedForwarder = true;
@ -188,11 +191,11 @@ BOOST_AUTO_TEST_CASE(forwarding)
while (!startedForwarder) while (!startedForwarder)
this_thread::sleep_for(chrono::milliseconds(50)); this_thread::sleep_for(chrono::milliseconds(50));
Host ph("Sender", NetworkPreferences("127.0.0.1", 30300, false)); Host ph("Sender", NetworkPreferences("127.0.0.1", 30314, false));
ph.setIdealPeerCount(1); ph.setIdealPeerCount(1);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost()); shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
ph.start(); ph.start();
ph.addNode(host2.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); ph.addNode(host2.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), port2, port2));
while (!ph.haveNetwork()) while (!ph.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10)); this_thread::sleep_for(chrono::milliseconds(10));
@ -221,7 +224,8 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
bool done = false; bool done = false;
// Host must be configured not to share peers. // Host must be configured not to share peers.
Host host1("Forwarder", NetworkPreferences("127.0.0.1", 30305, false)); uint16_t port1 = 30315;
Host host1("Forwarder", NetworkPreferences("127.0.0.1", port1, false));
host1.setIdealPeerCount(1); host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost()); auto whost1 = host1.registerCapability(new WhisperHost());
host1.start(); host1.start();
@ -250,14 +254,14 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
this_thread::sleep_for(chrono::milliseconds(2)); this_thread::sleep_for(chrono::milliseconds(2));
{ {
Host host2("Sender", NetworkPreferences("127.0.0.1", 30300, false)); Host host2("Sender", NetworkPreferences("127.0.0.1", 30316, false));
host2.setIdealPeerCount(1); host2.setIdealPeerCount(1);
shared_ptr<WhisperHost> whost2 = host2.registerCapability(new WhisperHost()); shared_ptr<WhisperHost> whost2 = host2.registerCapability(new WhisperHost());
host2.start(); host2.start();
while (!host2.haveNetwork()) while (!host2.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(2)); this_thread::sleep_for(chrono::milliseconds(2));
host2.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); host2.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), port1, port1));
while (!host2.peerCount() || !host1.peerCount()) while (!host2.peerCount() || !host1.peerCount())
this_thread::sleep_for(chrono::milliseconds(5)); this_thread::sleep_for(chrono::milliseconds(5));
@ -267,7 +271,7 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
} }
{ {
Host ph("Listener", NetworkPreferences("127.0.0.1", 30300, false)); Host ph("Listener", NetworkPreferences("127.0.0.1", 30317, false));
ph.setIdealPeerCount(1); ph.setIdealPeerCount(1);
shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost()); shared_ptr<WhisperHost> wh = ph.registerCapability(new WhisperHost());
ph.start(); ph.start();
@ -275,7 +279,7 @@ BOOST_AUTO_TEST_CASE(asyncforwarding)
this_thread::sleep_for(chrono::milliseconds(2)); this_thread::sleep_for(chrono::milliseconds(2));
auto w = wh->installWatch(BuildTopicMask("test")); // only interested in odd packets auto w = wh->installWatch(BuildTopicMask("test")); // only interested in odd packets
ph.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); ph.requirePeer(host1.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), port1, port1));
for (int i = 0; i < 200 && !result; ++i) for (int i = 0; i < 200 && !result; ++i)
{ {
@ -303,14 +307,15 @@ BOOST_AUTO_TEST_CASE(topicAdvertising)
cnote << "Testing Topic Advertising..."; cnote << "Testing Topic Advertising...";
VerbosityHolder setTemporaryLevel(2); VerbosityHolder setTemporaryLevel(2);
Host host1("first", NetworkPreferences("127.0.0.1", 30303, false)); Host host1("first", NetworkPreferences("127.0.0.1", 30319, false));
host1.setIdealPeerCount(1); host1.setIdealPeerCount(1);
auto whost1 = host1.registerCapability(new WhisperHost()); auto whost1 = host1.registerCapability(new WhisperHost());
host1.start(); host1.start();
while (!host1.haveNetwork()) while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10)); this_thread::sleep_for(chrono::milliseconds(10));
Host host2("second", NetworkPreferences("127.0.0.1", 30305, false)); uint16_t port2 = 30318;
Host host2("second", NetworkPreferences("127.0.0.1", port2, false));
host2.setIdealPeerCount(1); host2.setIdealPeerCount(1);
auto whost2 = host2.registerCapability(new WhisperHost()); auto whost2 = host2.registerCapability(new WhisperHost());
unsigned w2 = whost2->installWatch(BuildTopicMask("test2")); unsigned w2 = whost2->installWatch(BuildTopicMask("test2"));
@ -319,7 +324,7 @@ BOOST_AUTO_TEST_CASE(topicAdvertising)
while (!host2.haveNetwork()) while (!host2.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10)); this_thread::sleep_for(chrono::milliseconds(10));
host1.addNode(host2.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), 30305, 30305)); host1.addNode(host2.id(), NodeIPEndpoint(bi::address::from_string("127.0.0.1"), port2, port2));
while (!host1.haveNetwork()) while (!host1.haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10)); this_thread::sleep_for(chrono::milliseconds(10));
@ -385,7 +390,7 @@ BOOST_AUTO_TEST_CASE(selfAddressed)
char const* text = "deterministic pseudorandom test"; char const* text = "deterministic pseudorandom test";
BuildTopicMask mask(text); BuildTopicMask mask(text);
Host host("first", NetworkPreferences("127.0.0.1", 30305, false)); Host host("first", NetworkPreferences("127.0.0.1", 30320, false));
auto wh = host.registerCapability(new WhisperHost()); auto wh = host.registerCapability(new WhisperHost());
auto watch = wh->installWatch(BuildTopicMask(text)); auto watch = wh->installWatch(BuildTopicMask(text));

Loading…
Cancel
Save