diff --git a/libethereum/BlockChainSync.cpp b/libethereum/BlockChainSync.cpp index d7fd2c5ea..c3ea67e22 100644 --- a/libethereum/BlockChainSync.cpp +++ b/libethereum/BlockChainSync.cpp @@ -52,6 +52,7 @@ BlockChainSync::BlockChainSync(EthereumHost& _host): BlockChainSync::~BlockChainSync() { + RecursiveGuard l(x_sync); abortSync(); } @@ -67,8 +68,10 @@ DownloadMan& BlockChainSync::downloadMan() void BlockChainSync::abortSync() { + DEV_INVARIANT_CHECK; host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; }); downloadMan().resetToChain(h256s()); + DEV_INVARIANT_CHECK; } void BlockChainSync::onPeerStatus(EthereumPeer* _peer) @@ -87,14 +90,14 @@ void BlockChainSync::onPeerStatus(EthereumPeer* _peer) _peer->disable("Peer banned for previous bad behaviour."); else { - unsigned estimatedHashes = estimateHashes(); - _peer->m_expectedHashes = estimatedHashes; + unsigned hashes = estimatedHashes(); + _peer->m_expectedHashes = hashes; onNewPeer(_peer); } DEV_INVARIANT_CHECK; } -unsigned BlockChainSync::estimateHashes() const +unsigned BlockChainSync::estimatedHashes() const { BlockInfo block = host().chain().info(); time_t lastBlockTime = (block.hash() == host().chain().genesisHash()) ? 1428192000 : (time_t)block.timestamp; @@ -113,7 +116,6 @@ void BlockChainSync::requestBlocks(EthereumPeer* _peer) if (host().bq().knownFull()) { clog(NetAllDetail) << "Waiting for block queue before downloading blocks"; - m_lastActiveState = m_state; pauseSync(); _peer->setIdle(); return; @@ -233,10 +235,8 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) { if (downloadMan().isComplete()) completeSync(); - else if (!got) - requestBlocks(_peer); else - peerDoneBlocks(_peer); + requestBlocks(_peer); // Some of the blocks might have been downloaded by helping peers, proceed anyway } DEV_INVARIANT_CHECK; } @@ -508,7 +508,7 @@ bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const { auto td = _peer->m_totalDifficulty; auto lh = _peer->m_latestHash; - auto ctd = host().chain().details().totalDifficulty; + auto ctd = host().chain().details().totalDifficulty; if (m_syncingNeededBlocks.empty()) return false; @@ -636,7 +636,9 @@ void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency) // m_banned.insert(_peer->session()->id()); // We know who you are! // _peer->disable("Peer sent hashes but was unable to provide the blocks."); } + resetSync(); downloadMan().reset(); + transition(_peer, SyncState::Idle); } _peer->m_sub.doneFetch(); } @@ -648,7 +650,7 @@ void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) _peer->setIdle(); if (!isSyncing(_peer)) { - clog(NetMessageSummary) << "Ignoring hashes synce not syncing"; + clog(NetMessageSummary) << "Ignoring hashes since not syncing"; return; } if (_hashes.size() == 0) @@ -705,6 +707,7 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) clog(NetMessageSummary) << "Ignoring since we're already downloading."; return; } + clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help."; unsigned knowns = 0; unsigned unknowns = 0; for (auto const& h: _hashes) @@ -741,6 +744,7 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) void PV60Sync::abortSync(EthereumPeer* _peer) { + // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. if (isSyncing(_peer)) { host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; }); @@ -751,6 +755,8 @@ void PV60Sync::abortSync(EthereumPeer* _peer) void PV60Sync::onPeerAborting(EthereumPeer* _peer) { + RecursiveGuard l(x_sync); + // Can't check invariants here since the peers is already removed from the list and the state is not updated yet. abortSync(_peer); DEV_INVARIANT_CHECK; } @@ -767,6 +773,10 @@ bool PV60Sync::invariants() const host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); if (!hashes) return false; + if (!m_syncingLatestHash) + return false; + if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash)) + return false; } if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) { @@ -777,5 +787,14 @@ bool PV60Sync::invariants() const if (downloadMan().isComplete()) return false; } + if (m_state == SyncState::Idle) + { + bool busy = false; + host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; }); + if (busy) + return false; + } + if (m_state == SyncState::Waiting && !host().bq().isActive()) + return false; return true; } diff --git a/libethereum/BlockChainSync.h b/libethereum/BlockChainSync.h index 7042c5a48..b946f557e 100644 --- a/libethereum/BlockChainSync.h +++ b/libethereum/BlockChainSync.h @@ -109,17 +109,16 @@ protected: EthereumHost const& host() const { return m_host; } /// Estimates max number of hashes peers can give us. - unsigned estimateHashes() const; + unsigned estimatedHashes() const; /// Request blocks from peer if needed void requestBlocks(EthereumPeer* _peer); protected: - Handler m_bqRoomAvailable; + Handler m_bqRoomAvailable; ///< Triggered once block queue mutable RecursiveMutex x_sync; - SyncState m_state = SyncState::Idle; ///< Current sync state - SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode - unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. + SyncState m_state = SyncState::Idle; ///< Current sync state + unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. private: static char const* const s_stateNames[static_cast(SyncState::Size)]; @@ -133,6 +132,70 @@ private: * @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete * Syncs to peers and keeps up to date */ + +/** + * Transitions: + * + * Idle->Hashes + * Triggered when: + * * A new peer appears that we can sync to + * * Transtition to Idle, there are peers we can sync to + * Effects: + * * Set chain sync (m_syncingTotalDifficulty, m_syncingLatestHash, m_syncer) + * * Requests hashes from m_syncer + * + * Hashes->Idle + * Triggered when: + * * Received too many hashes + * * Received 0 total hashes from m_syncer + * * m_syncer aborts + * Effects: + * In case of too many hashes sync is reset + * + * Hashes->Blocks + * Triggered when: + * * Received known hash from m_syncer + * * Received 0 hashes from m_syncer and m_syncingTotalBlocks not empty + * Effects: + * * Set up download manager, clear m_syncingTotalBlocks. Set all peers to help with downloading if they can + * + * Blocks->Idle + * Triggered when: + * * m_syncer aborts + * * m_syncer does not have required block + * * All blocks downloaded + * * Block qeueue is full with unknown blocks + * Effects: + * * Download manager is reset + * + * Blocks->Waiting + * Triggered when: + * * Block queue is full with known blocks + * Effects: + * * Stop requesting blocks from peers + * + * Waiting->Blocks + * Triggered when: + * * Block queue has space for new blocks + * Effects: + * * Continue requesting blocks from peers + * + * Idle->NewBlocks + * Triggered when: + * * New block hashes arrive + * Effects: + * * Set up download manager, clear m_syncingTotalBlocks. Download blocks from a single peer. If downloaded blocks have unknown parents, set the peer to sync + * + * NewBlocks->Idle + * Triggered when: + * * m_syncer aborts + * * m_syncer does not have required block + * * All new blocks downloaded + * * Block qeueue is full with unknown blocks + * Effects: + * * Download manager is reset + * + */ class PV60Sync: public BlockChainSync { public: @@ -153,6 +216,7 @@ public: /// @returns Sync status SyncStatus status() const override; +protected: void onNewPeer(EthereumPeer* _peer) override; void continueSync() override; void peerDoneBlocks(EthereumPeer* _peer) override; @@ -205,9 +269,10 @@ private: h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer. h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer. - h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. - u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. - EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr + h256 m_syncingLatestHash; ///< Latest block's hash of the peer we are syncing to, as of the current sync. + u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync. + // TODO: switch to weak_ptr + EthereumPeer* m_syncer = nullptr; ///< Peer we are currently syncing with }; } } diff --git a/libp2p/Common.h b/libp2p/Common.h index 4a1b64b70..445ba0cca 100644 --- a/libp2p/Common.h +++ b/libp2p/Common.h @@ -149,14 +149,15 @@ using CapDescs = std::vector; */ struct PeerSessionInfo { - NodeId id; - std::string clientVersion; - std::string host; - unsigned short port; + NodeId const id; + std::string const clientVersion; + std::string const host; + unsigned short const port; std::chrono::steady_clock::duration lastPing; - std::set caps; + std::set const caps; unsigned socketId; std::map notes; + unsigned const protocolVersion; }; using PeerSessionInfos = std::vector; diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index feb116c4a..c9c2743ee 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -254,7 +254,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder* clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort; // create session so disconnects are managed - auto ps = make_shared(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet(), 0, map()})); + auto ps = make_shared(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet(), 0, map(), protocolVersion})); if (protocolVersion < dev::p2p::c_protocolVersion - 1) { ps->disconnect(IncompatibleProtocol); @@ -724,8 +724,16 @@ void Host::startedWorking() void Host::doWork() { - if (m_run) - m_ioService.run(); + try + { + if (m_run) + m_ioService.run(); + } + catch (std::exception const& _e) + { + clog(NetP2PWarn) << "Exception in Network Thread:" << _e.what(); + clog(NetP2PWarn) << "Network Restart is Recommended."; + } } void Host::keepAlivePeers() diff --git a/libp2p/RLPXFrameCoder.cpp b/libp2p/RLPXFrameCoder.cpp index dbbe1ddee..c53c5f508 100644 --- a/libp2p/RLPXFrameCoder.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -30,6 +30,18 @@ using namespace dev; using namespace dev::p2p; using namespace CryptoPP; +RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header) +{ + length = (_header[0] * 256 + _header[1]) * 256 + _header[2]; + padding = ((16 - (length % 16)) % 16); + RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall); + auto itemCount = header.itemCount(); + protocolId = header[0].toInt(); + hasSequence = itemCount > 1; + sequenceId = hasSequence ? header[1].toInt() : 0; + totalLength = itemCount == 3 ? header[2].toInt() : 0; +} + RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) { setup(_init.m_originated, _init.m_remoteEphemeral, _init.m_remoteNonce, _init.m_ecdhe, _init.m_nonce, &_init.m_ackCipher, &_init.m_authCipher); diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index 9706da477..f4c13758b 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -32,7 +32,21 @@ namespace dev { namespace p2p { + +struct RLPXFrameInfo +{ + RLPXFrameInfo() = default; + /// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]] + RLPXFrameInfo(bytesConstRef _frameHeader); + uint32_t length = 0; ///< Max: 2**24 + uint8_t padding = 0; + uint16_t protocolId = 0; + bool hasSequence = false; + uint16_t sequenceId = 0; + uint32_t totalLength = 0; +}; + class RLPXHandshake; /** diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index c3f0f2e35..f24377fb2 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -27,7 +27,6 @@ #include #include #include -#include "RLPxHandshake.h" #include "Host.h" #include "Capability.h" using namespace std; @@ -156,111 +155,25 @@ void Session::serviceNodesRequest() addNote("peers", "done"); } -bool Session::interpret(PacketType _t, RLP const& _r) +bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r) { m_lastReceived = chrono::steady_clock::now(); - clog(NetRight) << _t << _r; try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught. { - switch (_t) - { - case DisconnectPacket: - { - string reason = "Unspecified"; - auto r = (DisconnectReason)_r[0].toInt(); - if (!_r[0].isInt()) - drop(BadProtocol); - else - { - reason = reasonOf(r); - clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")"; - drop(DisconnectRequested); - } - break; - } - case PingPacket: - { - clog(NetTriviaSummary) << "Ping"; - RLPStream s; - sealAndSend(prep(s, PongPacket)); - break; - } - case PongPacket: - m_info.lastPing = std::chrono::steady_clock::now() - m_ping; - clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast(m_info.lastPing).count() << " ms"; - break; - case GetPeersPacket: - // Disabled for interop testing. - // GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in. - break; - - clog(NetTriviaSummary) << "GetPeers"; - m_theyRequestedNodes = true; - serviceNodesRequest(); - break; - case PeersPacket: - // Disabled for interop testing. - // GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in. - break; - - clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)"; - m_weRequestedNodes = false; - for (unsigned i = 0; i < _r.itemCount(); ++i) - { - bi::address peerAddress; - if (_r[i][0].size() == 16) - peerAddress = bi::address_v6(_r[i][0].toHash>().asArray()); - else if (_r[i][0].size() == 4) - peerAddress = bi::address_v4(_r[i][0].toHash>().asArray()); - else - { - cwarn << "Received bad peer packet:" << _r; - disconnect(BadProtocol); - return true; - } - auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt()); - NodeId id = _r[i][2].toHash(); - - clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")"; - - if (!isPublicAddress(peerAddress)) - goto CONTINUE; // Private address. Ignore. - - if (!id) - goto LAMEPEER; // Null identity. Ignore. - - if (m_server->id() == id) - goto LAMEPEER; // Just our info - we already have that. - - if (id == this->id()) - goto LAMEPEER; // Just their info - we already have that. - - if (!ep.port()) - goto LAMEPEER; // Zero port? Don't think so. - - if (ep.port() >= /*49152*/32768) - goto LAMEPEER; // Private port according to IANA. - - // OK passed all our checks. Assume it's good. - addRating(1000); - m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port())); - clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")"; - CONTINUE:; - LAMEPEER:; - } - break; - default: + // v4 frame headers are useless, offset packet type used + // v5 protocol type is in header, packet type not offset + if (_capId == 0 && _t < UserPacket) + return interpret(_t, _r); + if (m_info.protocolVersion >= 5) + for (auto const& i: m_capabilities) + if (_capId == (uint16_t)i.first.second) + return i.second->m_enabled ? i.second->interpret(_t, _r) : true; + if (m_info.protocolVersion <= 4) for (auto const& i: m_capabilities) if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount()) - { - if (i.second->m_enabled) - return i.second->interpret(_t - i.second->m_idOffset, _r); - else - return true; - } - return false; - } + return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true; + return false; } catch (std::exception const& _e) { @@ -271,6 +184,101 @@ bool Session::interpret(PacketType _t, RLP const& _r) return true; } +bool Session::interpret(PacketType _t, RLP const& _r) +{ + switch (_t) + { + case DisconnectPacket: + { + string reason = "Unspecified"; + auto r = (DisconnectReason)_r[0].toInt(); + if (!_r[0].isInt()) + drop(BadProtocol); + else + { + reason = reasonOf(r); + clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")"; + drop(DisconnectRequested); + } + break; + } + case PingPacket: + { + clog(NetTriviaSummary) << "Ping"; + RLPStream s; + sealAndSend(prep(s, PongPacket)); + break; + } + case PongPacket: + m_info.lastPing = std::chrono::steady_clock::now() - m_ping; + clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast(m_info.lastPing).count() << " ms"; + break; + case GetPeersPacket: + // Disabled for interop testing. + // GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in. + break; + + clog(NetTriviaSummary) << "GetPeers"; + m_theyRequestedNodes = true; + serviceNodesRequest(); + break; + case PeersPacket: + // Disabled for interop testing. + // GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in. + break; + + clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)"; + m_weRequestedNodes = false; + for (unsigned i = 0; i < _r.itemCount(); ++i) + { + bi::address peerAddress; + if (_r[i][0].size() == 16) + peerAddress = bi::address_v6(_r[i][0].toHash>().asArray()); + else if (_r[i][0].size() == 4) + peerAddress = bi::address_v4(_r[i][0].toHash>().asArray()); + else + { + cwarn << "Received bad peer packet:" << _r; + disconnect(BadProtocol); + return true; + } + auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt()); + NodeId id = _r[i][2].toHash(); + + clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")"; + + if (!isPublicAddress(peerAddress)) + goto CONTINUE; // Private address. Ignore. + + if (!id) + goto LAMEPEER; // Null identity. Ignore. + + if (m_server->id() == id) + goto LAMEPEER; // Just our info - we already have that. + + if (id == this->id()) + goto LAMEPEER; // Just their info - we already have that. + + if (!ep.port()) + goto LAMEPEER; // Zero port? Don't think so. + + if (ep.port() >= /*49152*/32768) + goto LAMEPEER; // Private port according to IANA. + + // OK passed all our checks. Assume it's good. + addRating(1000); + m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port())); + clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")"; + CONTINUE:; + LAMEPEER:; + } + break; + default: + return false; + } + return true; +} + void Session::ping() { RLPStream s; @@ -292,12 +300,9 @@ void Session::sealAndSend(RLPStream& _s) bool Session::checkPacket(bytesConstRef _msg) { - if (_msg.size() < 2) + if (_msg[0] > 0x7f || _msg.size() < 2) return false; - if (_msg[0] > 0x7f) - return false; - RLP r(_msg.cropped(1)); - if (r.actualSize() + 1 != _msg.size()) + if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size()) return false; return true; } @@ -413,82 +418,78 @@ void Session::doRead() { ThreadContext tc(info().id.abridged()); ThreadContext tc2(info().clientVersion); - if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) + if (!checkRead(h256::size, ec, length)) + return; + else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length))) { - clog(NetWarn) << "Error reading: " << ec.message(); - drop(TCPError); + clog(NetWarn) << "header decrypt failed"; + drop(BadProtocol); // todo: better error + return; + } + + RLPXFrameInfo header; + try + { + header = RLPXFrameInfo(bytesConstRef(m_data.data(), length)); } - else if (ec && length == 0) + catch (std::exception const& _e) + { + clog(NetWarn) << "Exception decoding frame header RLP:" << bytesConstRef(m_data.data(), h128::size).cropped(3); + drop(BadProtocol); return; - else + } + + /// read padded frame and mac + auto tlen = header.length + header.padding + h128::size; + ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, header, tlen](boost::system::error_code ec, std::size_t length) { - /// authenticate and decrypt header - bytesRef header(m_data.data(), h256::size); - if (!m_io->authAndDecryptHeader(header)) + ThreadContext tc(info().id.abridged()); + ThreadContext tc2(info().clientVersion); + if (!checkRead(tlen, ec, length)) + return; + else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen))) { - clog(NetWarn) << "header decrypt failed"; + clog(NetWarn) << "frame decrypt failed"; drop(BadProtocol); // todo: better error return; } - /// check frame size - uint32_t frameSize = (m_data[0] * 256 + m_data[1]) * 256 + m_data[2]; - if (frameSize >= (uint32_t)1 << 24) + bytesConstRef frame(m_data.data(), header.length); + if (!checkPacket(frame)) { - clog(NetWarn) << "frame size too large"; - drop(BadProtocol); + cerr << "Received " << frame.size() << ": " << toHex(frame) << endl; + clog(NetWarn) << "INVALID MESSAGE RECEIVED"; + disconnect(BadProtocol); return; } - - /// rlp of header has protocol-type, sequence-id[, total-packet-size] - bytes headerRLP(13); - bytesConstRef(m_data.data(), h128::size).cropped(3).copyTo(&headerRLP); - - /// read padded frame and mac - auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size; - ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length) + else { - ThreadContext tc(info().id.abridged()); - ThreadContext tc2(info().clientVersion); - if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) - { - clog(NetWarn) << "Error reading: " << ec.message(); - drop(TCPError); - } - else if (ec && length < tlen) - { - clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << ec.message(); - repMan().noteRude(*this); - drop(TCPError); - return; - } - else - { - if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen))) - { - clog(NetWarn) << "frame decrypt failed"; - drop(BadProtocol); // todo: better error - return; - } - - bytesConstRef frame(m_data.data(), frameSize); - if (!checkPacket(frame)) - { - cerr << "Received " << frame.size() << ": " << toHex(frame) << endl; - clog(NetWarn) << "INVALID MESSAGE RECEIVED"; - disconnect(BadProtocol); - return; - } - else - { - auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt(); - RLP r(frame.cropped(1)); - if (!interpret(packetType, r)) - clog(NetWarn) << "Couldn't interpret packet." << RLP(r); - } - doRead(); - } - }); - } + auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt(); + RLP r(frame.cropped(1)); + if (!readPacket(header.protocolId, packetType, r)) + clog(NetWarn) << "Couldn't interpret packet." << RLP(r); + } + doRead(); + }); }); } + +bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length) +{ + if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof) + { + clog(NetConnect) << "Error reading: " << _ec.message(); + drop(TCPError); + return false; + } + else if (_ec && _length < _expected) + { + clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << _ec.message(); + repMan().noteRude(*this); + drop(TCPError); + return false; + } + // If this fails then there's an unhandled asio error + assert(_expected == _length); + return true; +} diff --git a/libp2p/Session.h b/libp2p/Session.h index 6b45fe381..ba46c5a16 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -96,13 +96,19 @@ private: /// Perform a read on the socket. void doRead(); + + /// Check error code after reading and drop peer if error code. + bool checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length); /// Perform a single round of the write operation. This could end up calling itself asynchronously. void write(); - /// Interpret an incoming message. - bool interpret(PacketType _t, RLP const& _r); + /// Deliver RLPX packet to Session or Capability for interpretation. + bool readPacket(uint16_t _capId, PacketType _t, RLP const& _r); + /// Interpret an incoming Session packet. + bool interpret(PacketType _t, RLP const& _r); + /// @returns true iff the _msg forms a valid message for sending or receiving on the network. static bool checkPacket(bytesConstRef _msg); diff --git a/test/libp2p/net.cpp b/test/libp2p/net.cpp index 8c652b479..1e3e2e15c 100644 --- a/test/libp2p/net.cpp +++ b/test/libp2p/net.cpp @@ -314,23 +314,23 @@ BOOST_AUTO_TEST_CASE(kademlia) node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for node.setup(); node.populate(); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; node.populateAll(); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; auto nodes = node.nodeTable->nodes(); nodes.sort(); node.nodeTable->reset(); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; node.populate(1); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; node.nodeTable->discover(); this_thread::sleep_for(chrono::milliseconds(2000)); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8);