Browse Source

Connectivity and nodetable callbacks. Disable stale code.

cl-refactor
subtly 10 years ago
parent
commit
75f231419a
  1. 4
      alethzero/MainWin.cpp
  2. 20
      libp2p/Host.cpp
  3. 6
      libp2p/Host.h
  4. 14
      libp2p/NodeTable.cpp
  5. 20
      libp2p/NodeTable.h
  6. 44
      libp2p/Session.cpp
  7. 7
      libp2p/Session.h
  8. 23
      test/peer.cpp

4
alethzero/MainWin.cpp

@ -930,7 +930,7 @@ void Main::refreshNetwork()
.arg(QString::fromStdString(i.id.abridged()))); .arg(QString::fromStdString(i.id.abridged())));
auto ns = web3()->nodes(); auto ns = web3()->nodes();
for (p2p::Node const& i: ns) for (p2p::PeerInfo const& i: ns)
if (!i.dead) if (!i.dead)
ui->nodes->insertItem(clients.count(i.id) ? 0 : ui->nodes->count(), QString("[%1 %3] %2 - ( =%5s | /%4s%6 ) - *%7 $%8") ui->nodes->insertItem(clients.count(i.id) ? 0 : ui->nodes->count(), QString("[%1 %3] %2 - ( =%5s | /%4s%6 ) - *%7 $%8")
.arg(QString::fromStdString(i.id.abridged())) .arg(QString::fromStdString(i.id.abridged()))
@ -940,7 +940,7 @@ void Main::refreshNetwork()
.arg(i.secondsSinceLastConnected()) .arg(i.secondsSinceLastConnected())
.arg(i.isOffline() ? " | " + QString::fromStdString(reasonOf(i.lastDisconnect)) + " | " + QString::number(i.failedAttempts) + "x" : "") .arg(i.isOffline() ? " | " + QString::fromStdString(reasonOf(i.lastDisconnect)) + " | " + QString::number(i.failedAttempts) + "x" : "")
.arg(i.rating) .arg(i.rating)
.arg((int)i.idOrigin) .arg(0 /* (int)i.idOrigin */)
); );
} }
} }

20
libp2p/Host.cpp

@ -177,6 +177,8 @@ void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e)
{ {
if (_e == NodeEntryAdded) if (_e == NodeEntryAdded)
{ {
clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
auto n = (*m_nodeTable)[_n]; auto n = (*m_nodeTable)[_n];
if (n) if (n)
{ {
@ -196,6 +198,8 @@ void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e)
} }
else if (_e == NodeEntryRemoved) else if (_e == NodeEntryRemoved)
{ {
clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryRemoved " << _n;
RecursiveGuard l(x_sessions); RecursiveGuard l(x_sessions);
m_peers.erase(_n); m_peers.erase(_n);
} }
@ -294,7 +298,17 @@ void Host::runAcceptor()
{ {
try try
{ {
doHandshake(s); // RecursiveGuard l(x_sessions);
// auto p = m_peers[_n];
// if (!p)
// {
// m_peers[_n] = make_shared<PeerInfo>();
// p = m_peers[_n];
// p->id = _n;
// }
// p->address = n.endpoint.tcp;
doHandshake(s, NodeId());
success = true; success = true;
} }
catch (Exception const& _e) catch (Exception const& _e)
@ -329,6 +343,7 @@ void Host::doHandshake(bi::tcp::socket* _socket, NodeId _nodeId)
clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint(); clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint();
} catch (...){} } catch (...){}
//
auto p = std::make_shared<Session>(this, std::move(*_socket), m_peers[_nodeId]); auto p = std::make_shared<Session>(this, std::move(*_socket), m_peers[_nodeId]);
p->start(); p->start();
} }
@ -538,6 +553,8 @@ void Host::run(boost::system::error_code const&)
return; return;
} }
m_nodeTable->processEvents();
for (auto p: m_sessions) for (auto p: m_sessions)
if (auto pp = p.second.lock()) if (auto pp = p.second.lock())
pp->serviceNodesRequest(); pp->serviceNodesRequest();
@ -585,6 +602,7 @@ void Host::startedWorking()
m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort)); m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort));
else else
m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303)); m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303));
m_nodeTable->setEventHandler(new HostNodeTableHandler(*this));
} }
clog(NetNote) << "p2p.started id:" << id().abridged(); clog(NetNote) << "p2p.started id:" << id().abridged();

6
libp2p/Host.h

@ -110,10 +110,10 @@ struct PeerInfo
using Nodes = std::vector<PeerInfo>; using Nodes = std::vector<PeerInfo>;
class Host;
class HostNodeTableHandler: public NodeTableEventHandler class HostNodeTableHandler: public NodeTableEventHandler
{ {
friend class Host;
HostNodeTableHandler(Host& _host); HostNodeTableHandler(Host& _host);
virtual void processEvent(NodeId _n, NodeTableEventType _e); virtual void processEvent(NodeId _n, NodeTableEventType _e);
Host& m_host; Host& m_host;
@ -213,7 +213,7 @@ private:
void runAcceptor(); void runAcceptor();
/// Handler for verifying handshake siganture before creating session. _egressNodeId is passed for outbound connections. /// Handler for verifying handshake siganture before creating session. _egressNodeId is passed for outbound connections.
void doHandshake(bi::tcp::socket* _socket, NodeId _egressNodeId = NodeId()); void doHandshake(bi::tcp::socket* _socket, NodeId _nodeId);
void seal(bytes& _b); void seal(bytes& _b);

14
libp2p/NodeTable.cpp

@ -65,6 +65,10 @@ shared_ptr<NodeEntry> NodeTable::addNode(Node const& _node)
shared_ptr<NodeEntry> ret = m_nodes[_node.id]; shared_ptr<NodeEntry> ret = m_nodes[_node.id];
if (!ret) if (!ret)
{ {
clog(NodeTableNote) << "p2p.nodes.add " << _node.id.abridged();
if (m_nodeEvents)
m_nodeEvents->appendEvent(_node.id, NodeEntryAdded);
ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp)));
m_nodes[_node.id] = ret; m_nodes[_node.id] = ret;
PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port());
@ -317,8 +321,14 @@ void NodeTable::dropNode(shared_ptr<NodeEntry> _n)
Guard l(x_state); Guard l(x_state);
s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; }); s.nodes.remove_if([&_n](weak_ptr<NodeEntry> n) { return n.lock() == _n; });
} }
Guard l(x_nodes); {
m_nodes.erase(_n->id); Guard l(x_nodes);
m_nodes.erase(_n->id);
}
clog(NodeTableNote) << "p2p.nodes.drop " << _n->id.abridged();
if (m_nodeEvents)
m_nodeEvents->appendEvent(_n->id, NodeEntryRemoved);
} }
NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n) NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n)

20
libp2p/NodeTable.h

@ -94,7 +94,20 @@ public:
protected: protected:
/// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable. /// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable.
void processEvents() { std::list<std::pair<NodeId,NodeTableEventType>> events; { Guard l(x_events); if (!m_nodeEvents.size()) return; m_nodeEvents.unique(); for (auto const& n: m_nodeEvents) events.push_back(std::make_pair(n,m_events[n])); m_nodeEvents.empty(); m_events.empty(); } for (auto const& e: events) processEvent(e.first, e.second); } void processEvents() {
std::list<std::pair<NodeId,NodeTableEventType>> events;
{
Guard l(x_events);
if (!m_nodeEvents.size())
return;
m_nodeEvents.unique();
for (auto const& n: m_nodeEvents) events.push_back(std::make_pair(n,m_events[n]));
m_nodeEvents.empty();
m_events.empty();
}
for (auto const& e: events)
processEvent(e.first, e.second);
}
/// Called by NodeTable to append event. /// Called by NodeTable to append event.
virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEvents.push_back(_n); m_events[_n] = _e; } virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEvents.push_back(_n); m_events[_n] = _e; }
@ -173,7 +186,10 @@ public:
void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEvents.reset(_handler); } void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEvents.reset(_handler); }
/// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored. /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored.
void processEvents() { if (m_nodeEvents) m_nodeEvents->processEvents(); } void processEvents() {
if (m_nodeEvents)
m_nodeEvents->processEvents();
}
/// Add node. Node will be pinged if it's not already known. /// Add node. Node will be pinged if it's not already known.
std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint()); std::shared_ptr<NodeEntry> addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint());

44
libp2p/Session.cpp

@ -39,9 +39,8 @@ using namespace dev::p2p;
Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<PeerInfo> const& _n): Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<PeerInfo> const& _n):
m_server(_s), m_server(_s),
m_socket(std::move(_socket)), m_socket(std::move(_socket)),
m_info({m_peer->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()}),
m_peer(_n), m_peer(_n),
m_manualEndpoint(_n->address) m_info({NodeId(), "?", m_socket.remote_endpoint().address().to_string(), 0, std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()})
{ {
m_lastReceived = m_connect = std::chrono::steady_clock::now(); m_lastReceived = m_connect = std::chrono::steady_clock::now();
} }
@ -87,21 +86,21 @@ int Session::rating() const
return m_peer->rating; return m_peer->rating;
} }
// TODO: P2P integration: session->? should be unavailable when socket isn't open //// TODO: P2P integration: session->? should be unavailable when socket isn't open
bi::tcp::endpoint Session::endpoint() const //bi::tcp::endpoint Session::endpoint() const
{ //{
if (m_socket.is_open() && m_peer) // if (m_socket.is_open() && m_peer)
try // try
{ // {
return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_peer->address.port()); // return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_peer->address.port());
} // }
catch (...) {} // catch (...) {}
//
if (m_peer) // if (m_peer)
return m_peer->address; // return m_peer->address;
//
return m_manualEndpoint; // return m_manualEndpoint;
} //}
template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n) template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
{ {
@ -190,14 +189,11 @@ bool Session::interpret(RLP const& _r)
if (m_server->id() == id) if (m_server->id() == id)
{ {
// Already connected. // Already connected.
clogS(NetWarn) << "Connected to ourself under a false pretext. We were told this peer was id" << m_info.id.abridged(); clogS(NetWarn) << "Connected to ourself under a false pretext. We were told this peer was id" << id.abridged();
disconnect(LocalIdentity); disconnect(LocalIdentity);
return true; return true;
} }
assert(!!m_peer);
assert(!!m_peer->id);
// TODO: P2P ensure disabled logic is covered // TODO: P2P ensure disabled logic is covered
if (false /* m_server->havePeer(id) */) if (false /* m_server->havePeer(id) */)
{ {
@ -218,7 +214,13 @@ bool Session::interpret(RLP const& _r)
// TODO: P2P Move all node-lifecycle information into Host. Determine best way to handle peer-lifecycle properties vs node lifecycle. // TODO: P2P Move all node-lifecycle information into Host. Determine best way to handle peer-lifecycle properties vs node lifecycle.
// TODO: P2P remove oldid // TODO: P2P remove oldid
// TODO: P2P with encrypted transport the handshake will fail and we won't get here // TODO: P2P with encrypted transport the handshake will fail and we won't get here
// if peer is missing this is incoming connection and we need to tell host about new potential peer
// m_peer = m_server->noteNode(m_peer->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort)); // m_peer = m_server->noteNode(m_peer->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort));
assert(!!m_peer);
assert(!!m_peer->id);
if (m_peer->isOffline()) if (m_peer->isOffline())
m_peer->lastConnected = chrono::system_clock::now(); m_peer->lastConnected = chrono::system_clock::now();
// //

7
libp2p/Session.h

@ -64,8 +64,6 @@ public:
NodeId id() const; NodeId id() const;
unsigned socketId() const { return m_socket.native_handle(); } unsigned socketId() const { return m_socket.native_handle(); }
bi::tcp::endpoint endpoint() const; ///< for other peers to connect to.
template <class PeerCap> template <class PeerCap>
std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } }
@ -109,13 +107,12 @@ private:
std::array<byte, 65536> m_data; ///< Buffer for ingress packet data. std::array<byte, 65536> m_data; ///< Buffer for ingress packet data.
bytes m_incoming; ///< Read buffer for ingress bytes. bytes m_incoming; ///< Read buffer for ingress bytes.
PeerSessionInfo m_info; ///< Dynamic information about this peer.
unsigned m_protocolVersion = 0; ///< The protocol version of the peer. unsigned m_protocolVersion = 0; ///< The protocol version of the peer.
std::shared_ptr<PeerInfo> m_peer; ///< The PeerInfo object. std::shared_ptr<PeerInfo> m_peer; ///< The PeerInfo object.
bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor.
bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in. bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.
PeerSessionInfo m_info; ///< Dynamic information about this peer.
bool m_theyRequestedNodes = false; ///< Has the peer requested nodes from us without receiveing an answer from us? bool m_theyRequestedNodes = false; ///< Has the peer requested nodes from us without receiveing an answer from us?
bool m_weRequestedNodes = false; ///< Have we requested nodes from the peer and not received an answer yet? bool m_weRequestedNodes = false; ///< Have we requested nodes from the peer and not received an answer yet?

23
test/peer.cpp

@ -20,6 +20,7 @@
* Peer Network test functions. * Peer Network test functions.
*/ */
#include <boost/test/unit_test.hpp>
#include <chrono> #include <chrono>
#include <thread> #include <thread>
#include <libp2p/Host.h> #include <libp2p/Host.h>
@ -27,6 +28,28 @@ using namespace std;
using namespace dev; using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
BOOST_AUTO_TEST_SUITE(p2p)
BOOST_AUTO_TEST_CASE(host)
{
NetworkPreferences host1prefs(30301, "127.0.0.1", true, true);
NetworkPreferences host2prefs(30302, "127.0.0.1", true, true);
Host host1("Test", host1prefs);
NodeId node1 = host1.id();
host1.start();
Host host2("Test", host2prefs);
auto node2 = host2.id();
host2.start();
host1.addNode(node2, "127.0.0.1", host2prefs.listenPort, host2prefs.listenPort);
this_thread::sleep_for(chrono::seconds(2));
}
BOOST_AUTO_TEST_SUITE_END()
int peerTest(int argc, char** argv) int peerTest(int argc, char** argv)
{ {
short listenPort = 30303; short listenPort = 30303;

Loading…
Cancel
Save