Browse Source

updates and fixes for code review

cl-refactor
subtly 10 years ago
parent
commit
98a2d193c2
  1. 2
      libp2p/Common.h
  2. 62
      libp2p/Host.cpp
  3. 22
      libp2p/Host.h
  4. 28
      libp2p/NodeTable.cpp
  5. 26
      libp2p/NodeTable.h
  6. 4
      libp2p/Session.cpp
  7. 31
      libp2p/UDP.cpp
  8. 16
      test/peer.cpp
  9. 28
      test/whisperTopic.cpp

2
libp2p/Common.h

@ -148,7 +148,7 @@ struct NodeIPEndpoint
bi::udp::endpoint udp;
bi::tcp::endpoint tcp;
operator bool() const { return udp.address().is_unspecified() && tcp.address().is_unspecified(); }
operator bool() const { return !udp.address().is_unspecified() || !tcp.address().is_unspecified(); }
};
struct Node

62
libp2p/Host.cpp

@ -40,7 +40,7 @@ using namespace dev::p2p;
HostNodeTableHandler::HostNodeTableHandler(Host& _host): m_host(_host) {}
void HostNodeTableHandler::processEvent(NodeId _n, NodeTableEventType _e)
void HostNodeTableHandler::processEvent(NodeId const& _n, NodeTableEventType const& _e)
{
m_host.onNodeTableEvent(_n, _e);
}
@ -158,6 +158,8 @@ void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
{
{
RecursiveGuard l(x_sessions);
// TODO: temporary loose-coupling; if m_peers already has peer,
// it is same as _s->m_peer. (fixing next PR)
if (!m_peers.count(_s->m_peer->id))
m_peers[_s->m_peer->id] = _s->m_peer;
m_sessions[_s->m_peer->id] = _s;
@ -172,7 +174,7 @@ void Host::registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps)
}
}
void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e)
void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
{
if (_e == NodeEntryAdded)
@ -320,6 +322,7 @@ void Host::runAcceptor()
}
}
// asio doesn't close socket on error
if (!success && s->is_open())
{
boost::system::error_code ec;
@ -393,25 +396,25 @@ void Host::addNode(NodeId const& _node, std::string const& _addr, unsigned short
addNode(Node(_node, NodeIPEndpoint(bi::udp::endpoint(addr, _udpNodePort), bi::tcp::endpoint(addr, _tcpPeerPort))));
}
void Host::connect(std::shared_ptr<Peer> const& _n)
void Host::connect(std::shared_ptr<Peer> const& _p)
{
if (!m_run)
return;
if (havePeerSession(_n->id))
if (havePeerSession(_p->id))
{
clog(NetWarn) << "Aborted connect. Node already connected.";
return;
}
if (!m_nodeTable->haveNode(_n->id))
if (!m_nodeTable->haveNode(_p->id))
{
clog(NetWarn) << "Aborted connect. Node not in node table.";
return;
}
// prevent concurrently connecting to a node
Peer *nptr = _n.get();
Peer *nptr = _p.get();
{
Guard l(x_pendingNodeConns);
if (m_pendingNodeConns.count(nptr))
@ -419,22 +422,22 @@ void Host::connect(std::shared_ptr<Peer> const& _n)
m_pendingNodeConns.insert(nptr);
}
clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->peerEndpoint() << "from" << id().abridged();
clog(NetConnect) << "Attempting connection to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "from" << id().abridged();
bi::tcp::socket* s = new bi::tcp::socket(m_ioService);
s->async_connect(_n->peerEndpoint(), [=](boost::system::error_code const& ec)
s->async_connect(_p->peerEndpoint(), [=](boost::system::error_code const& ec)
{
if (ec)
{
clog(NetConnect) << "Connection refused to node" << _n->id.abridged() << "@" << _n->peerEndpoint() << "(" << ec.message() << ")";
_n->lastDisconnect = TCPError;
_n->lastAttempted = std::chrono::system_clock::now();
clog(NetConnect) << "Connection refused to node" << _p->id.abridged() << "@" << _p->peerEndpoint() << "(" << ec.message() << ")";
_p->lastDisconnect = TCPError;
_p->lastAttempted = std::chrono::system_clock::now();
}
else
{
clog(NetConnect) << "Connected to" << _n->id.abridged() << "@" << _n->peerEndpoint();
clog(NetConnect) << "Connected to" << _p->id.abridged() << "@" << _p->peerEndpoint();
_n->lastConnected = std::chrono::system_clock::now();
auto ps = make_shared<Session>(this, std::move(*s), _n);
_p->lastConnected = std::chrono::system_clock::now();
auto ps = make_shared<Session>(this, std::move(*s), _p);
ps->start();
}
@ -480,8 +483,8 @@ void Host::run(boost::system::error_code const&)
if (auto pp = p.second.lock())
pp->serviceNodesRequest();
if (chrono::steady_clock::now() - m_lastPing >= chrono::seconds(30)) // ping every 30s.
keepAlivePeers();
disconnectLatePeers();
keepAlivePeers();
auto runcb = [this](boost::system::error_code const& error) { run(error); };
m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));
@ -541,19 +544,29 @@ void Host::doWork()
void Host::keepAlivePeers()
{
if (chrono::steady_clock::now() < m_lastPing + c_keepAliveInterval)
return;
RecursiveGuard l(x_sessions);
for (auto p: m_sessions)
if (auto pp = p.second.lock())
{
if (chrono::steady_clock::now() - pp->m_lastReceived >= chrono::seconds(60))
pp->disconnect(PingTimeout);
else
pp->ping();
}
m_lastPing = chrono::steady_clock::now();
}
void Host::disconnectLatePeers()
{
if (chrono::steady_clock::now() < m_lastPing + c_keepAliveTimeOut)
return;
RecursiveGuard l(x_sessions);
for (auto p: m_sessions)
if (auto pp = p.second.lock())
if (pp->m_lastReceived < m_lastPing + c_keepAliveTimeOut)
pp->disconnect(PingTimeout);
}
bytes Host::saveNodes() const
{
RLPStream nodes;
@ -563,7 +576,9 @@ bytes Host::saveNodes() const
for (auto const& i: m_peers)
{
Peer const& n = *(i.second);
// TODO: PoC-7: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 &&
// TODO: alpha: Figure out why it ever shares these ports.//n.address.port() >= 30300 && n.address.port() <= 30305 &&
// TODO: alpha: if/how to save private addresses
// Only save peers which have connected within 2 days, with properly-advertised port and public IP address
if (chrono::system_clock::now() - n.lastConnected < chrono::seconds(3600 * 48) && n.peerEndpoint().port() > 0 && n.peerEndpoint().port() < /*49152*/32768 && n.id != id() && !isPrivateAddress(n.peerEndpoint().address()))
{
nodes.appendList(10);
@ -571,7 +586,8 @@ bytes Host::saveNodes() const
nodes << n.peerEndpoint().address().to_v4().to_bytes();
else
nodes << n.peerEndpoint().address().to_v6().to_bytes();
nodes << n.peerEndpoint().port() << n.id /* << (int)n.idOrigin */ << 0
// TODO: alpha: replace 0 with trust-state of node
nodes << n.peerEndpoint().port() << n.id << 0
<< chrono::duration_cast<chrono::seconds>(n.lastConnected.time_since_epoch()).count()
<< chrono::duration_cast<chrono::seconds>(n.lastAttempted.time_since_epoch()).count()
<< n.failedAttempts << (unsigned)n.lastDisconnect << n.score << n.rating;

22
libp2p/Host.h

@ -101,7 +101,7 @@ class HostNodeTableHandler: public NodeTableEventHandler
{
friend class Host;
HostNodeTableHandler(Host& _host);
virtual void processEvent(NodeId _n, NodeTableEventType _e);
virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e);
Host& m_host;
};
@ -111,9 +111,11 @@ class HostNodeTableHandler: public NodeTableEventHandler
*
* @todo onNodeTableEvent: move peer-connection logic into ensurePeers
* @todo handshake: gracefully disconnect peer if peer already connected
* @todo abstract socket -> IPConnection
* @todo determinePublic: ipv6, udp
* @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port
* @todo write host identifier to disk w/nodes
* @todo per-session keepalive/ping instead of broadcast; set ping-timeout via median-latency
*/
class Host: public Worker
{
@ -128,6 +130,12 @@ public:
/// Will block on network process events.
virtual ~Host();
/// Interval at which Host::run will call keepAlivePeers to ping peers.
std::chrono::seconds const c_keepAliveInterval = std::chrono::seconds(30);
/// Disconnect timeout after failure to respond to keepAlivePeers ping.
std::chrono::seconds const c_keepAliveTimeOut = std::chrono::seconds(1);
/// Default host for current version of client.
static std::string pocHost();
@ -141,9 +149,8 @@ public:
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); if (m_sessions.count(_id)) return !!m_sessions[_id].lock(); else return false; }
bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? !!m_sessions[_id].lock() : false; }
/// Add node.
void addNode(NodeId const& _node, std::string const& _addr, unsigned short _tcpPort, unsigned short _udpPort);
/// Set ideal number of peers.
@ -187,21 +194,24 @@ public:
void registerPeer(std::shared_ptr<Session> _s, CapDescs const& _caps);
protected:
void onNodeTableEvent(NodeId _n, NodeTableEventType _e);
void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e);
private:
/// Populate m_peerAddresses with available public addresses.
void determinePublic(std::string const& _publicAddress, bool _upnp);
void connect(std::shared_ptr<Peer> const& _n);
void connect(std::shared_ptr<Peer> const& _p);
/// Ping the peers to update the latency information and disconnect peers which have timed out.
void keepAlivePeers();
/// Disconnect peers which didn't respond to keepAlivePeers ping prior to c_keepAliveTimeOut.
void disconnectLatePeers();
/// Called only from startedWorking().
void runAcceptor();
/// Handler for verifying handshake siganture before creating session. _nodeId is passed for outbound connections.
/// Handler for verifying handshake siganture before creating session. _nodeId is passed for outbound connections. If successful, socket is moved to Session via std::move.
void doHandshake(bi::tcp::socket* _socket, NodeId _nodeId = NodeId());
void seal(bytes& _b);

28
libp2p/NodeTable.cpp

@ -55,8 +55,8 @@ NodeTable::~NodeTable()
void NodeTable::processEvents()
{
if (m_nodeEvents)
m_nodeEvents->processEvents();
if (m_nodeEventHandler)
m_nodeEventHandler->processEvents();
}
shared_ptr<NodeEntry> NodeTable::addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp)
@ -80,8 +80,8 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
else
{
clog(NodeTableNote) << "p2p.nodes.add " << _node.id.abridged();
if (m_nodeEvents)
m_nodeEvents->appendEvent(_node.id, NodeEntryAdded);
if (m_nodeEventHandler)
m_nodeEventHandler->appendEvent(_node.id, NodeEntryAdded);
ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
m_nodes[_node.id] = ret;
@ -341,8 +341,8 @@ void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
}
clog(NodeTableNote) << "p2p.nodes.drop " << _n->id.abridged();
if (m_nodeEvents)
m_nodeEvents->appendEvent(_n->id, NodeEntryRemoved);
if (m_nodeEventHandler)
m_nodeEventHandler->appendEvent(_n->id, NodeEntryRemoved);
}
NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n)
@ -367,23 +367,25 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
return;
}
bytesConstRef rlpBytes(hashedBytes.cropped(Signature::size, hashedBytes.size() - Signature::size));
bytesConstRef signedBytes(hashedBytes.cropped(Signature::size, hashedBytes.size() - Signature::size));
// todo: verify sig via known-nodeid and MDC, or, do ping/pong auth if node/endpoint is unknown/untrusted
bytesConstRef sigBytes(_packet.cropped(h256::size, Signature::size));
Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(rlpBytes)));
Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(signedBytes)));
if (!nodeid)
{
clog(NodeTableMessageSummary) << "Invalid Message signature from " << _from.address().to_string() << ":" << _from.port();
return;
}
if (rlpBytes[0] && rlpBytes[0] < 4)
unsigned packetType = signedBytes[0];
if (packetType && packetType < 4)
noteNode(nodeid, _from);
// todo: switch packet-type
RLP rlp(rlpBytes.cropped(1, rlpBytes.size() - 1));
bytesConstRef rlpBytes(_packet.cropped(h256::size + Signature::size + 1));
RLP rlp(rlpBytes);
unsigned itemCount = rlp.itemCount();
try {
switch (itemCount)
@ -404,7 +406,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
if (auto n = (*this)[it->first.first])
addNode(n);
m_evictions.erase(it);
it = m_evictions.erase(it);
}
break;
@ -447,7 +449,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
}
default:
clog(NodeTableMessageSummary) << "Invalid Message, " << std::hex << rlpBytes[0] << ", received from " << _from.address().to_string() << ":" << _from.port();
clog(NodeTableWarn) << "Invalid Message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port();
return;
}
}
@ -456,7 +458,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
clog(NodeTableWarn) << "Exception processing message from " << _from.address().to_string() << ":" << _from.port();
}
}
void NodeTable::doCheckEvictions(boost::system::error_code const& _ec)
{
if (_ec || !m_socketPtr->isOpen())

26
libp2p/NodeTable.h

@ -41,7 +41,7 @@ struct NodeEntry: public Node
NodeEntry(Node _src, Public _pubk, NodeIPEndpoint _gw);
NodeEntry(Node _src, Public _pubk, bi::udp::endpoint _udp);
const unsigned distance; ///< Node's distance (xor of _src as integer).
unsigned const distance; ///< Node's distance (xor of _src as integer).
};
enum NodeTableEventType {
@ -53,20 +53,20 @@ class NodeTableEventHandler
{
friend class NodeTable;
public:
virtual void processEvent(NodeId _n, NodeTableEventType _e) =0;
virtual void processEvent(NodeId const& _n, NodeTableEventType const& _e) =0;
protected:
/// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable.
void processEvents()
{
std::list<std::pair<NodeId,NodeTableEventType>> events;
std::list<std::pair<NodeId, NodeTableEventType>> events;
{
Guard l(x_events);
if (!m_nodeEvents.size())
if (!m_nodeEventHandler.size())
return;
m_nodeEvents.unique();
for (auto const& n: m_nodeEvents) events.push_back(std::make_pair(n,m_events[n]));
m_nodeEvents.clear();
m_nodeEventHandler.unique();
for (auto const& n: m_nodeEventHandler) events.push_back(std::make_pair(n,m_events[n]));
m_nodeEventHandler.clear();
m_events.clear();
}
for (auto const& e: events)
@ -74,11 +74,11 @@ protected:
}
/// Called by NodeTable to append event.
virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEvents.push_back(_n); m_events[_n] = _e; }
virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEventHandler.push_back(_n); m_events[_n] = _e; }
Mutex x_events;
std::list<NodeId> m_nodeEvents;
std::map<NodeId,NodeTableEventType> m_events;
std::list<NodeId> m_nodeEventHandler;
std::map<NodeId, NodeTableEventType> m_events;
};
/**
@ -143,7 +143,7 @@ public:
static unsigned dist(NodeId const& _a, NodeId const& _b) { u512 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; }
/// Set event handler for NodeEntryAdded and NodeEntryRemoved events.
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEvents.reset(_handler); }
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEventHandler.reset(_handler); }
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored.
void processEvents();
@ -160,7 +160,7 @@ public:
std::list<NodeId> nodes() const;
std::list<NodeEntry> state() const;
bool haveNode(NodeId _id) { Guard l(x_nodes); return !!m_nodes[_id]; }
bool haveNode(NodeId _id) { Guard l(x_nodes); return m_nodes.count(_id); }
Node operator[](NodeId _id);
std::shared_ptr<NodeEntry> getNodeEntry(NodeId _id);
@ -212,7 +212,7 @@ protected:
/// Sends FindNeighbor packet. See doFindNode.
void requestNeighbours(NodeEntry const& _node, NodeId _target) const;
std::unique_ptr<NodeTableEventHandler> m_nodeEvents; ///< Event handler for node events.
std::unique_ptr<NodeTableEventHandler> m_nodeEventHandler; ///< Event handler for node events.
Node m_node; ///< This node.
Secret m_secret; ///< This nodes secret key.

4
libp2p/Session.cpp

@ -286,7 +286,7 @@ bool Session::interpret(RLP const& _r)
clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")";
// clogS(NetAllDetail) << "Checking: " << ep << "(" << id.abridged() << ")" << isPrivateAddress(peerAddress) << this->id().abridged() << isPrivateAddress(endpoint().address()) << m_server->m_peers.count(id) << (m_server->m_peers.count(id) ? isPrivateAddress(m_server->m_peers.at(id)->address.address()) : -1);
// ignore if dist(us,item) - dist(us,them) > 1
// todo: draft spec: ignore if dist(us,item) - dist(us,them) > 1
// TODO: isPrivate
if (!m_server->m_netPrefs.localNetworking && isPrivateAddress(peerAddress))
@ -309,7 +309,7 @@ bool Session::interpret(RLP const& _r)
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->addNode(Node(id, NodeIPEndpoint(bi::udp::endpoint(ep.address(), 30303), ep)));
m_server->addNode(Node(id, NodeIPEndpoint(bi::udp::endpoint(ep.address(), ep.port()), ep)));
clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id .abridged()<< ")";
CONTINUE:;
LAMEPEER:;

31
libp2p/UDP.cpp

@ -29,29 +29,26 @@ h256 RLPXDatagramFace::sign(Secret const& _k)
RLPStream rlpxstream;
// rlpxstream.appendRaw(toPublic(_k).asBytes()); // for mdc-based signature
rlpxstream.appendRaw(bytes(1, packetType()));
rlpxstream.appendRaw(bytes(1, packetType())); // prefix by 1 byte for type
streamRLP(rlpxstream);
bytes rlpBytes(rlpxstream.out());
bytes rlpxBytes(rlpxstream.out());
bytesConstRef rlp(&rlpBytes);
h256 hash(dev::sha3(rlp));
Signature sig = dev::sign(_k, hash);
bytesConstRef rlpx(&rlpxBytes);
h256 sighash(dev::sha3(rlpx)); // H(type||data)
Signature sig = dev::sign(_k, sighash); // S(H(type||data))
data.resize(h256::size + Signature::size + rlp.size());
bytesConstRef packetHash(&data[0], h256::size);
bytesConstRef signedPayload(&data[h256::size], Signature::size + rlp.size());
bytesConstRef payloadSig(&data[h256::size], Signature::size);
bytesConstRef payload(&data[h256::size + Signature::size], rlp.size());
data.resize(h256::size + Signature::size + rlpx.size());
bytesConstRef rlpxHash(&data[0], h256::size);
bytesConstRef rlpxSig(&data[h256::size], Signature::size);
bytesConstRef rlpxPayload(&data[h256::size + Signature::size], rlpx.size());
sig.ref().copyTo(payloadSig);
// rlp.cropped(Public::size, rlp.size() - Public::size).copyTo(payload);
rlp.copyTo(payload);
sig.ref().copyTo(rlpxSig);
rlpx.copyTo(rlpxPayload);
// hash.ref().copyTo(packetHash); // for mdc-based signature
dev::sha3(signedPayload).ref().copyTo(packetHash);
bytesConstRef signedRLPx(&data[h256::size], data.size() - h256::size);
dev::sha3(signedRLPx).ref().copyTo(rlpxHash);
return std::move(hash);
return std::move(sighash);
};
Public RLPXDatagramFace::authenticate(bytesConstRef _sig, bytesConstRef _rlp)

16
test/peer.cpp

@ -51,10 +51,11 @@ BOOST_AUTO_TEST_SUITE_END()
int peerTest(int argc, char** argv)
{
Public remoteAlias;
short listenPort = 30303;
string remoteHost;
short remotePort = 30303;
for (int i = 1; i < argc; ++i)
{
string arg = argv[i];
@ -64,21 +65,18 @@ int peerTest(int argc, char** argv)
remoteHost = argv[++i];
else if (arg == "-p" && i + 1 < argc)
remotePort = (short)atoi(argv[++i]);
else if (arg == "-ra" && i + 1 < argc)
remoteAlias = Public(dev::fromHex(argv[++i]));
else
remoteHost = argv[i];
}
Host ph("Test", NetworkPreferences(listenPort));
if (!remoteHost.empty())
ph.addNode(NodeId(), remoteHost, remotePort, remotePort);
if (!remoteHost.empty() && !remoteAlias)
ph.addNode(remoteAlias, remoteHost, remotePort, remotePort);
// for (int i = 0; ; ++i)
// {
// this_thread::sleep_for(chrono::milliseconds(100));
// if (!(i % 10))
// ph.keepAlivePeers();
// }
this_thread::sleep_for(chrono::milliseconds(200));
return 0;
}

28
test/whisperTopic.cpp

@ -34,29 +34,27 @@ BOOST_AUTO_TEST_CASE(topic)
{
cnote << "Testing Whisper...";
auto oldLogVerbosity = g_logVerbosity;
g_logVerbosity = 0;
g_logVerbosity = 5;
Host ph1("Test", NetworkPreferences(30303, "127.0.0.1", true, true));
Host phOther("Test", NetworkPreferences(30303, "127.0.0.1", true, true));
auto whOther = phOther.registerCapability(new WhisperHost());
phOther.start();
bool started = false;
unsigned result = 0;
std::thread listener([&]()
{
setThreadName("other");
auto wh = ph1.registerCapability(new WhisperHost());
ph1.start();
started = true;
/// Only interested in odd packets
auto w = wh->installWatch(BuildTopicMask("odd"));
auto w = whOther->installWatch(BuildTopicMask("odd"));
for (int i = 0, last = 0; i < 200 && last < 81; ++i)
{
for (auto i: wh->checkWatch(w))
for (auto i: whOther->checkWatch(w))
{
Message msg = wh->envelope(i).open();
Message msg = whOther->envelope(i).open();
last = RLP(msg.payload()).toInt<unsigned>();
cnote << "New message from:" << msg.from().abridged() << RLP(msg.payload()).toInt<unsigned>();
result += last;
@ -65,16 +63,16 @@ BOOST_AUTO_TEST_CASE(topic)
}
});
while (!started)
this_thread::sleep_for(chrono::milliseconds(50));
Host ph("Test", NetworkPreferences(30300, "127.0.0.1", true, true));
auto wh = ph.registerCapability(new WhisperHost());
this_thread::sleep_for(chrono::milliseconds(500));
ph.start();
this_thread::sleep_for(chrono::milliseconds(500));
ph.addNode(ph1.id(), "127.0.0.1", 30303, 30303);
ph.addNode(phOther.id(), "127.0.0.1", 30303, 30303);
this_thread::sleep_for(chrono::milliseconds(300));
while (!started)
this_thread::sleep_for(chrono::milliseconds(25));
KeyPair us = KeyPair::create();
for (int i = 0; i < 10; ++i)
{

Loading…
Cancel
Save