From 75f231419aad339028c92024ea55bd741b0e63e7 Mon Sep 17 00:00:00 2001 From: subtly Date: Mon, 12 Jan 2015 04:58:52 +0100 Subject: [PATCH] Connectivity and nodetable callbacks. Disable stale code. --- alethzero/MainWin.cpp | 4 ++-- libp2p/Host.cpp | 20 +++++++++++++++++++- libp2p/Host.h | 6 +++--- libp2p/NodeTable.cpp | 14 ++++++++++++-- libp2p/NodeTable.h | 20 ++++++++++++++++++-- libp2p/Session.cpp | 44 ++++++++++++++++++++++--------------------- libp2p/Session.h | 7 ++----- test/peer.cpp | 23 ++++++++++++++++++++++ 8 files changed, 102 insertions(+), 36 deletions(-) diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index f2437c6d2..f4750326d 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -930,7 +930,7 @@ void Main::refreshNetwork() .arg(QString::fromStdString(i.id.abridged()))); auto ns = web3()->nodes(); - for (p2p::Node const& i: ns) + for (p2p::PeerInfo const& i: ns) if (!i.dead) ui->nodes->insertItem(clients.count(i.id) ? 0 : ui->nodes->count(), QString("[%1 %3] %2 - ( =%5s | /%4s%6 ) - *%7 $%8") .arg(QString::fromStdString(i.id.abridged())) @@ -940,7 +940,7 @@ void Main::refreshNetwork() .arg(i.secondsSinceLastConnected()) .arg(i.isOffline() ? " | " + QString::fromStdString(reasonOf(i.lastDisconnect)) + " | " + QString::number(i.failedAttempts) + "x" : "") .arg(i.rating) - .arg((int)i.idOrigin) + .arg(0 /* (int)i.idOrigin */) ); } } diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index dd38ff5bd..61f30b548 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -177,6 +177,8 @@ void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e) { if (_e == NodeEntryAdded) { + clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n; + auto n = (*m_nodeTable)[_n]; if (n) { @@ -196,6 +198,8 @@ void Host::onNodeTableEvent(NodeId _n, NodeTableEventType _e) } else if (_e == NodeEntryRemoved) { + clog(NetNote) << "p2p.host.nodeTable.events.nodeEntryRemoved " << _n; + RecursiveGuard l(x_sessions); m_peers.erase(_n); } @@ -294,7 +298,17 @@ void Host::runAcceptor() { try { - doHandshake(s); +// RecursiveGuard l(x_sessions); +// auto p = m_peers[_n]; +// if (!p) +// { +// m_peers[_n] = make_shared(); +// p = m_peers[_n]; +// p->id = _n; +// } +// p->address = n.endpoint.tcp; + + doHandshake(s, NodeId()); success = true; } catch (Exception const& _e) @@ -329,6 +343,7 @@ void Host::doHandshake(bi::tcp::socket* _socket, NodeId _nodeId) clog(NetConnect) << "Accepting connection for " << _socket->remote_endpoint(); } catch (...){} + // auto p = std::make_shared(this, std::move(*_socket), m_peers[_nodeId]); p->start(); } @@ -538,6 +553,8 @@ void Host::run(boost::system::error_code const&) return; } + m_nodeTable->processEvents(); + for (auto p: m_sessions) if (auto pp = p.second.lock()) pp->serviceNodesRequest(); @@ -585,6 +602,7 @@ void Host::startedWorking() m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort)); else m_nodeTable.reset(new NodeTable(m_ioService, m_key, m_listenPort > 0 ? m_listenPort : 30303)); + m_nodeTable->setEventHandler(new HostNodeTableHandler(*this)); } clog(NetNote) << "p2p.started id:" << id().abridged(); diff --git a/libp2p/Host.h b/libp2p/Host.h index 097b07433..3d386fe8c 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -110,10 +110,10 @@ struct PeerInfo using Nodes = std::vector; - -class Host; + class HostNodeTableHandler: public NodeTableEventHandler { + friend class Host; HostNodeTableHandler(Host& _host); virtual void processEvent(NodeId _n, NodeTableEventType _e); Host& m_host; @@ -213,7 +213,7 @@ private: void runAcceptor(); /// Handler for verifying handshake siganture before creating session. _egressNodeId is passed for outbound connections. - void doHandshake(bi::tcp::socket* _socket, NodeId _egressNodeId = NodeId()); + void doHandshake(bi::tcp::socket* _socket, NodeId _nodeId); void seal(bytes& _b); diff --git a/libp2p/NodeTable.cpp b/libp2p/NodeTable.cpp index b384c42d2..635f91bfb 100644 --- a/libp2p/NodeTable.cpp +++ b/libp2p/NodeTable.cpp @@ -65,6 +65,10 @@ shared_ptr NodeTable::addNode(Node const& _node) shared_ptr ret = m_nodes[_node.id]; if (!ret) { + clog(NodeTableNote) << "p2p.nodes.add " << _node.id.abridged(); + if (m_nodeEvents) + m_nodeEvents->appendEvent(_node.id, NodeEntryAdded); + ret.reset(new NodeEntry(m_node, _node.id, NodeIPEndpoint(_node.endpoint.udp, _node.endpoint.tcp))); m_nodes[_node.id] = ret; PingNode p(_node.endpoint.udp, m_node.endpoint.udp.address().to_string(), m_node.endpoint.udp.port()); @@ -317,8 +321,14 @@ void NodeTable::dropNode(shared_ptr _n) Guard l(x_state); s.nodes.remove_if([&_n](weak_ptr n) { return n.lock() == _n; }); } - Guard l(x_nodes); - m_nodes.erase(_n->id); + { + Guard l(x_nodes); + m_nodes.erase(_n->id); + } + + clog(NodeTableNote) << "p2p.nodes.drop " << _n->id.abridged(); + if (m_nodeEvents) + m_nodeEvents->appendEvent(_n->id, NodeEntryRemoved); } NodeTable::NodeBucket& NodeTable::bucket(NodeEntry const* _n) diff --git a/libp2p/NodeTable.h b/libp2p/NodeTable.h index 7f75bf27e..75042dffc 100644 --- a/libp2p/NodeTable.h +++ b/libp2p/NodeTable.h @@ -94,7 +94,20 @@ public: protected: /// Called by NodeTable on behalf of an implementation (Host) to process new events without blocking nodetable. - void processEvents() { std::list> events; { Guard l(x_events); if (!m_nodeEvents.size()) return; m_nodeEvents.unique(); for (auto const& n: m_nodeEvents) events.push_back(std::make_pair(n,m_events[n])); m_nodeEvents.empty(); m_events.empty(); } for (auto const& e: events) processEvent(e.first, e.second); } + void processEvents() { + std::list> events; + { + Guard l(x_events); + if (!m_nodeEvents.size()) + return; + m_nodeEvents.unique(); + for (auto const& n: m_nodeEvents) events.push_back(std::make_pair(n,m_events[n])); + m_nodeEvents.empty(); + m_events.empty(); + } + for (auto const& e: events) + processEvent(e.first, e.second); + } /// Called by NodeTable to append event. virtual void appendEvent(NodeId _n, NodeTableEventType _e) { Guard l(x_events); m_nodeEvents.push_back(_n); m_events[_n] = _e; } @@ -173,7 +186,10 @@ public: void setEventHandler(NodeTableEventHandler* _handler) { m_nodeEvents.reset(_handler); } /// Called by implementation which provided handler to process NodeEntryAdded/NodeEntryRemoved events. Events are coalesced by type whereby old events are ignored. - void processEvents() { if (m_nodeEvents) m_nodeEvents->processEvents(); } + void processEvents() { + if (m_nodeEvents) + m_nodeEvents->processEvents(); + } /// Add node. Node will be pinged if it's not already known. std::shared_ptr addNode(Public const& _pubk, bi::udp::endpoint const& _udp, bi::tcp::endpoint const& _tcp = bi::tcp::endpoint()); diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 0ef06ab50..80c538b24 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -39,9 +39,8 @@ using namespace dev::p2p; Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr const& _n): m_server(_s), m_socket(std::move(_socket)), - m_info({m_peer->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map()}), m_peer(_n), - m_manualEndpoint(_n->address) + m_info({NodeId(), "?", m_socket.remote_endpoint().address().to_string(), 0, std::chrono::steady_clock::duration(0), CapDescSet(), 0, map()}) { m_lastReceived = m_connect = std::chrono::steady_clock::now(); } @@ -87,21 +86,21 @@ int Session::rating() const return m_peer->rating; } -// TODO: P2P integration: session->? should be unavailable when socket isn't open -bi::tcp::endpoint Session::endpoint() const -{ - if (m_socket.is_open() && m_peer) - try - { - return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_peer->address.port()); - } - catch (...) {} - - if (m_peer) - return m_peer->address; - - return m_manualEndpoint; -} +//// TODO: P2P integration: session->? should be unavailable when socket isn't open +//bi::tcp::endpoint Session::endpoint() const +//{ +// if (m_socket.is_open() && m_peer) +// try +// { +// return bi::tcp::endpoint(m_socket.remote_endpoint().address(), m_peer->address.port()); +// } +// catch (...) {} +// +// if (m_peer) +// return m_peer->address; +// +// return m_manualEndpoint; +//} template vector randomSelection(vector const& _t, unsigned _n) { @@ -190,14 +189,11 @@ bool Session::interpret(RLP const& _r) if (m_server->id() == id) { // Already connected. - clogS(NetWarn) << "Connected to ourself under a false pretext. We were told this peer was id" << m_info.id.abridged(); + clogS(NetWarn) << "Connected to ourself under a false pretext. We were told this peer was id" << id.abridged(); disconnect(LocalIdentity); return true; } - assert(!!m_peer); - assert(!!m_peer->id); - // TODO: P2P ensure disabled logic is covered if (false /* m_server->havePeer(id) */) { @@ -218,7 +214,13 @@ bool Session::interpret(RLP const& _r) // TODO: P2P Move all node-lifecycle information into Host. Determine best way to handle peer-lifecycle properties vs node lifecycle. // TODO: P2P remove oldid // TODO: P2P with encrypted transport the handshake will fail and we won't get here + + // if peer is missing this is incoming connection and we need to tell host about new potential peer + + // m_peer = m_server->noteNode(m_peer->id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort)); + assert(!!m_peer); + assert(!!m_peer->id); if (m_peer->isOffline()) m_peer->lastConnected = chrono::system_clock::now(); // diff --git a/libp2p/Session.h b/libp2p/Session.h index 9c0472a81..55330c93c 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -64,8 +64,6 @@ public: NodeId id() const; unsigned socketId() const { return m_socket.native_handle(); } - bi::tcp::endpoint endpoint() const; ///< for other peers to connect to. - template std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } @@ -109,13 +107,12 @@ private: std::array m_data; ///< Buffer for ingress packet data. bytes m_incoming; ///< Read buffer for ingress bytes. - PeerSessionInfo m_info; ///< Dynamic information about this peer. - unsigned m_protocolVersion = 0; ///< The protocol version of the peer. std::shared_ptr m_peer; ///< The PeerInfo object. - bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor. bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in. + PeerSessionInfo m_info; ///< Dynamic information about this peer. + bool m_theyRequestedNodes = false; ///< Has the peer requested nodes from us without receiveing an answer from us? bool m_weRequestedNodes = false; ///< Have we requested nodes from the peer and not received an answer yet? diff --git a/test/peer.cpp b/test/peer.cpp index 5c11d4cfb..c3a617ce6 100644 --- a/test/peer.cpp +++ b/test/peer.cpp @@ -20,6 +20,7 @@ * Peer Network test functions. */ +#include #include #include #include @@ -27,6 +28,28 @@ using namespace std; using namespace dev; using namespace dev::p2p; +BOOST_AUTO_TEST_SUITE(p2p) + +BOOST_AUTO_TEST_CASE(host) +{ + NetworkPreferences host1prefs(30301, "127.0.0.1", true, true); + NetworkPreferences host2prefs(30302, "127.0.0.1", true, true); + + Host host1("Test", host1prefs); + NodeId node1 = host1.id(); + host1.start(); + + Host host2("Test", host2prefs); + auto node2 = host2.id(); + host2.start(); + + host1.addNode(node2, "127.0.0.1", host2prefs.listenPort, host2prefs.listenPort); + + this_thread::sleep_for(chrono::seconds(2)); +} + +BOOST_AUTO_TEST_SUITE_END() + int peerTest(int argc, char** argv) { short listenPort = 30303;