diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 95e1aadda..a546c3b22 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -297,13 +297,13 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) { case StatusPacket: { - m_protocolVersion = _r[1].toInt(); - m_networkId = _r[2].toInt(); + m_protocolVersion = _r[0].toInt(); + m_networkId = _r[1].toInt(); // a bit dirty as we're misusing these to communicate the values to transition, but harmless. - m_totalDifficulty = _r[3].toInt(); - m_latestHash = _r[4].toHash(); - auto genesisHash = _r[5].toHash(); + m_totalDifficulty = _r[2].toInt(); + m_latestHash = _r[3].toHash(); + auto genesisHash = _r[4].toHash(); clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged(); @@ -327,7 +327,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)"; addRating(_r.itemCount() - 1); Guard l(x_knownTransactions); - for (unsigned i = 1; i < _r.itemCount(); ++i) + for (unsigned i = 0; i < _r.itemCount(); ++i) { auto h = sha3(_r[i].data()); m_knownTransactions.insert(h); @@ -339,8 +339,8 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } case GetBlockHashesPacket: { - h256 later = _r[1].toHash(); - unsigned limit = _r[2].toInt(); + h256 later = _r[0].toHash(); + unsigned limit = _r[1].toInt(); clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later.abridged() << ")"; unsigned c = min(host()->m_chain.number(later), limit); @@ -367,7 +367,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) transition(Asking::Blocks); return true; } - for (unsigned i = 1; i < _r.itemCount(); ++i) + for (unsigned i = 0; i < _r.itemCount(); ++i) { auto h = _r[i].toHash(); if (host()->m_chain.isKnown(h)) @@ -388,7 +388,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) // return the requested blocks. bytes rlp; unsigned n = 0; - for (unsigned i = 1; i < _r.itemCount() && i <= c_maxBlocks; ++i) + for (unsigned i = 0; i < _r.itemCount() && i <= c_maxBlocks; ++i) { auto b = host()->m_chain.block(_r[i].toHash()); if (b.size()) @@ -422,7 +422,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) unsigned got = 0; unsigned repeated = 0; - for (unsigned i = 1; i < _r.itemCount(); ++i) + for (unsigned i = 0; i < _r.itemCount(); ++i) { auto h = BlockInfo::headerHash(_r[i].data()); if (m_sub.noteBlock(h)) @@ -467,14 +467,14 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } case NewBlockPacket: { - auto h = BlockInfo::headerHash(_r[1].data()); + auto h = BlockInfo::headerHash(_r[0].data()); clogS(NetMessageSummary) << "NewBlock: " << h.abridged(); if (_r.itemCount() != 3) disable("NewBlock without 2 data fields."); else { - switch (host()->m_bq.import(_r[1].data(), host()->m_chain)) + switch (host()->m_bq.import(_r[0].data(), host()->m_chain)) { case ImportResult::Success: addRating(100); @@ -493,7 +493,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) case ImportResult::UnknownParent: clogS(NetMessageSummary) << "Received block with no known parent. Resyncing..."; - setNeedsSyncing(h, _r[2].toInt()); + setNeedsSyncing(h, _r[1].toInt()); break; } Guard l(x_knownBlocks); diff --git a/libp2p/Capability.cpp b/libp2p/Capability.cpp index 60f4fd7f3..dccc130cd 100644 --- a/libp2p/Capability.cpp +++ b/libp2p/Capability.cpp @@ -45,7 +45,7 @@ void Capability::disable(std::string const& _problem) RLPStream& Capability::prep(RLPStream& _s, unsigned _id, unsigned _args) { - return Session::prep(_s).appendList(_args + 1).append(_id + m_idOffset); + return _s.appendRaw(bytes(1, _id + m_idOffset)).appendList(_args); } void Capability::sealAndSend(RLPStream& _s) diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 057453d8f..eb5e8adc1 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -187,7 +187,7 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id.abridged() << showbase << capslog.str() << dec << listenPort; // create session so disconnects are managed - auto ps = make_shared(this, move(*_io), p, PeerSessionInfo({_id, clientVersion, _endpoint.address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[3].toSet(), 0, map()})); + auto ps = make_shared(this, _io, p, PeerSessionInfo({_id, clientVersion, _endpoint.address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[3].toSet(), 0, map()})); if (protocolVersion != this->protocolVersion()) { ps->disconnect(IncompatibleProtocol); @@ -273,11 +273,6 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e) void Host::seal(bytes& _b) { - uint32_t len = (uint32_t)_b.size() - 4; - _b[0] = (len >> 24) & 0xff; - _b[1] = (len >> 16) & 0xff; - _b[2] = (len >> 8) & 0xff; - _b[3] = len & 0xff; } void Host::determinePublic(string const& _publicAddress, bool _upnp) diff --git a/libp2p/RLPxFrameIO.cpp b/libp2p/RLPxFrameIO.cpp index 3ddaf9143..8531759e9 100644 --- a/libp2p/RLPxFrameIO.cpp +++ b/libp2p/RLPxFrameIO.cpp @@ -57,12 +57,16 @@ RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.m_socket) // aes-secret = sha3(ecdhe-shared-secret || shared-secret) sha3(keyMaterial, outRef); // output aes-secret - m_frameEnc.SetKeyWithIV(outRef.data(), h128::size, h128().data()); - m_frameDec.SetKeyWithIV(outRef.data(), h128::size, h128().data()); + SecByteBlock aesSecretEnc(outRef.data(), h128::size); + SecByteBlock aesSecretDec(outRef.data(), h128::size); + SecByteBlock emptyIV(h128::size); + m_frameEnc.SetKeyWithIV(aesSecretEnc, h128::size, emptyIV); + m_frameDec.SetKeyWithIV(aesSecretDec, h128::size, emptyIV); // mac-secret = sha3(ecdhe-shared-secret || aes-secret) sha3(keyMaterial, outRef); // output mac-secret - m_macEnc.SetKey(outRef.data(), h128::size); + SecByteBlock macSecret(outRef.data(), h128::size); + m_macEnc.SetKey(macSecret, h128::size); // Initiator egress-mac: sha3(mac-secret^recipient-nonce || auth-sent-init) // ingress-mac: sha3(mac-secret^initiator-nonce || auth-recvd-ack) @@ -91,16 +95,17 @@ void RLPXFrameIO::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) // current/old packet format: prep(_s).appendList(_args + 1).append((unsigned)_id); RLPStream header; - header.appendRaw(bytes({byte(_packet.size() >> 16), byte(_packet.size() >> 8), byte(_packet.size())})); + uint32_t len = (uint32_t)_packet.size(); + header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); // zeroHeader: []byte{0xC2, 0x80, 0x80}. Should be rlpList(protocolType,seqId,totalPacketSize). header.appendRaw(bytes({0xc2,0x80,0x80})); // TODO: SECURITY check that header is <= 16 bytes - - bytes headerWithMac; - header.swapOut(headerWithMac); - headerWithMac.resize(32); - m_frameEnc.ProcessData(headerWithMac.data(), headerWithMac.data(), 16); + + bytes headerWithMac(32); + bytes headerBytes(16); + bytesConstRef(&header.out()).copyTo(&headerBytes); + m_frameEnc.ProcessData(headerWithMac.data(), headerBytes.data(), 16); updateEgressMACWithHeader(bytesConstRef(&headerWithMac).cropped(0, 16)); egressDigest().ref().copyTo(bytesRef(&headerWithMac).cropped(h128::size,h128::size)); @@ -116,17 +121,16 @@ void RLPXFrameIO::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) updateEgressMACWithEndOfFrame(packetWithPaddingRef); bytesRef macRef(o_bytes.data() + 32 + _packet.size() + padding, h128::size); egressDigest().ref().copyTo(macRef); - clog(NetConnect) << "SENT FRAME " << _packet.size() << *(h128*)macRef.data(); - clog(NetConnect) << "FRAME TAIL " << *(h128*)(o_bytes.data() + 32 + _packet.size() + padding); } -bool RLPXFrameIO::authAndDecryptHeader(h256& io) +bool RLPXFrameIO::authAndDecryptHeader(bytesRef io) { - updateIngressMACWithHeader(io.ref()); - bytesConstRef macRef = io.ref().cropped(h128::size, h128::size); + asserts(io.size() == h256::size); + updateIngressMACWithHeader(io); + bytesConstRef macRef = io.cropped(h128::size, h128::size); if (*(h128*)macRef.data() != ingressDigest()) return false; - m_frameDec.ProcessData(io.data(), io.data(), 16); + m_frameDec.ProcessData(io.data(), io.data(), h128::size); return true; } @@ -159,7 +163,7 @@ h128 RLPXFrameIO::ingressDigest() void RLPXFrameIO::updateEgressMACWithHeader(bytesConstRef _headerCipher) { - updateMAC(m_egressMac, *(h128*)_headerCipher.data()); + updateMAC(m_egressMac, _headerCipher.cropped(0, 16)); } void RLPXFrameIO::updateEgressMACWithEndOfFrame(bytesConstRef _cipher) @@ -170,13 +174,12 @@ void RLPXFrameIO::updateEgressMACWithEndOfFrame(bytesConstRef _cipher) SHA3_256 prev(m_egressMac); h128 digest; prev.TruncatedFinal(digest.data(), h128::size); - clog(NetConnect) << "EGRESS FRAMEMAC " << _cipher.size() << digest; } } void RLPXFrameIO::updateIngressMACWithHeader(bytesConstRef _headerCipher) { - updateMAC(m_ingressMac, *(h128*)_headerCipher.data()); + updateMAC(m_ingressMac, _headerCipher.cropped(0, 16)); } void RLPXFrameIO::updateIngressMACWithEndOfFrame(bytesConstRef _cipher) @@ -187,20 +190,28 @@ void RLPXFrameIO::updateIngressMACWithEndOfFrame(bytesConstRef _cipher) SHA3_256 prev(m_ingressMac); h128 digest; prev.TruncatedFinal(digest.data(), h128::size); - clog(NetConnect) << "INGRESS FRAMEMAC " << _cipher.size() << digest; } } -void RLPXFrameIO::updateMAC(SHA3_256& _mac, h128 const& _seed) +void RLPXFrameIO::updateMAC(SHA3_256& _mac, bytesConstRef _seed) { + if (_seed.size() && _seed.size() != h128::size) + asserts(false); + SHA3_256 prevDigest(_mac); - h128 prevDigestOut; - prevDigest.TruncatedFinal(prevDigestOut.data(), h128::size); - - h128 encDigest; - m_macEnc.ProcessData(encDigest.data(), prevDigestOut.data(), h128::size); - encDigest ^= (!!_seed ? _seed : prevDigestOut); - + h128 encDigest(h128::size); + prevDigest.TruncatedFinal(encDigest.data(), h128::size); + h128 prevDigestOut = encDigest; + + { + Guard l(x_macEnc); + m_macEnc.ProcessData(encDigest.data(), encDigest.data(), 16); + } + if (_seed.size()) + encDigest ^= *(h128*)_seed.data(); + else + encDigest ^= *(h128*)prevDigestOut.data(); + // update mac for final digest _mac.Update(encDigest.data(), h128::size); } diff --git a/libp2p/RLPxFrameIO.h b/libp2p/RLPxFrameIO.h index 07211d4de..a8b2011eb 100644 --- a/libp2p/RLPxFrameIO.h +++ b/libp2p/RLPxFrameIO.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "Common.h" namespace ba = boost::asio; namespace bi = boost::asio::ip; @@ -57,11 +58,12 @@ class RLPXFrameIO friend class Session; public: RLPXFrameIO(RLPXHandshake const& _init); + ~RLPXFrameIO() {} void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes); /// Authenticates and decrypts header in-place. - bool authAndDecryptHeader(h256& io_cipherWithMac); + bool authAndDecryptHeader(bytesRef io_cipherWithMac); /// Authenticates and decrypts frame in-place. bool authAndDecryptFrame(bytesRef io_cipherWithMac); @@ -82,10 +84,11 @@ protected: bi::tcp::socket& socket() { return m_socket->ref(); } private: - void updateMAC(CryptoPP::SHA3_256& _mac, h128 const& _seed = h128()); + void updateMAC(CryptoPP::SHA3_256& _mac, bytesConstRef _seed = bytesConstRef()); CryptoPP::CTR_Mode::Encryption m_frameEnc; CryptoPP::CTR_Mode::Encryption m_frameDec; + Mutex x_macEnc; CryptoPP::ECB_Mode::Encryption m_macEnc; CryptoPP::SHA3_256 m_egressMac; CryptoPP::SHA3_256 m_ingressMac; diff --git a/libp2p/RLPxHandshake.cpp b/libp2p/RLPxHandshake.cpp index bca1afd61..004880c39 100644 --- a/libp2p/RLPxHandshake.cpp +++ b/libp2p/RLPxHandshake.cpp @@ -166,7 +166,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech) else clog(NetConnect) << "p2p.connect.ingress sending capabilities handshake"; - m_io.reset(new RLPXFrameIO(*this)); + m_io = new RLPXFrameIO(*this); // old packet format // 5 arguments, HelloPacket @@ -193,14 +193,14 @@ void RLPXHandshake::transition(boost::system::error_code _ech) // read frame header m_handshakeInBuffer.resize(h256::size); - ba::async_read(m_socket->ref(), boost::asio::buffer(m_handshakeInBuffer, h256::size), [this,self](boost::system::error_code ec, std::size_t length) + ba::async_read(m_socket->ref(), boost::asio::buffer(m_handshakeInBuffer, h256::size), [this, self](boost::system::error_code ec, std::size_t length) { if (ec) transition(ec); else { /// authenticate and decrypt header - if (!m_io->authAndDecryptHeader(*(h256*)m_handshakeInBuffer.data())) + if (!m_io->authAndDecryptHeader(bytesRef(m_handshakeInBuffer.data(), h256::size))) { m_nextState = Error; transition(); @@ -215,7 +215,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech) if (frameSize > 1024) { // all future frames: 16777216 - clog(NetWarn) << (m_originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame is too large"; + clog(NetWarn) << (m_originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame is too large" << frameSize; m_nextState = Error; transition(); return; @@ -251,7 +251,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech) return; } - m_host->startPeerSession(m_remote, rlp, m_io.get(), m_socket->remoteEndpoint()); + m_host->startPeerSession(m_remote, rlp, m_io, m_socket->remoteEndpoint()); } }); } diff --git a/libp2p/RLPxHandshake.h b/libp2p/RLPxHandshake.h index 12abe4379..cd5e7fb83 100644 --- a/libp2p/RLPxHandshake.h +++ b/libp2p/RLPxHandshake.h @@ -94,7 +94,7 @@ protected: h256 m_remoteNonce; /// Frame IO is used to read frame for last step of handshake authentication. - std::unique_ptr m_io; + RLPXFrameIO* m_io; std::shared_ptr m_socket; }; diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index f44fa2d6e..da83c0b5e 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -38,10 +38,10 @@ using namespace dev::p2p; #endif #define clogS(X) dev::LogOutputStream(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " -Session::Session(Host* _s, RLPXFrameIO _io, std::shared_ptr const& _n, PeerSessionInfo _info): +Session::Session(Host* _s, RLPXFrameIO* _io, std::shared_ptr const& _n, PeerSessionInfo _info): m_server(_s), - m_io(move(_io)), - m_socket(m_io.socket()), + m_io(_io), + m_socket(m_io->socket()), m_peer(_n), m_info(_info), m_ping(chrono::steady_clock::time_point::max()) @@ -67,6 +67,7 @@ Session::~Session() } } catch (...){} + delete m_io; } NodeId Session::id() const @@ -144,21 +145,21 @@ void Session::serviceNodesRequest() addNote("peers", "done"); } -bool Session::interpret(RLP const& _r) +bool Session::interpret(PacketType _t, RLP const& _r) { m_lastReceived = chrono::steady_clock::now(); - clogS(NetRight) << _r; + clogS(NetRight) << _t << _r; try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught. { - switch ((PacketType)_r[0].toInt()) + switch (_t) { case DisconnectPacket: { string reason = "Unspecified"; - auto r = (DisconnectReason)_r[1].toInt(); - if (!_r[1].isInt()) + auto r = (DisconnectReason)_r[0].toInt(); + if (!_r[0].isInt()) drop(BadProtocol); else { @@ -197,7 +198,7 @@ bool Session::interpret(RLP const& _r) clogS(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)"; m_weRequestedNodes = false; - for (unsigned i = 1; i < _r.itemCount(); ++i) + for (unsigned i = 0; i < _r.itemCount(); ++i) { bi::address peerAddress; if (_r[i][0].size() == 16) @@ -247,12 +248,11 @@ bool Session::interpret(RLP const& _r) break; default: { - auto id = _r[0].toInt(); for (auto const& i: m_capabilities) - if (id >= i.second->m_idOffset && id - i.second->m_idOffset < i.second->hostCapability()->messageCount()) + if (_t >= i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount()) { if (i.second->m_enabled) - return i.second->interpret(id - i.second->m_idOffset, _r); + return i.second->interpret(_t - i.second->m_idOffset, _r); else return true; } @@ -278,12 +278,7 @@ void Session::ping() RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args) { - return prep(_s).appendList(_args + 1).append((unsigned)_id); -} - -RLPStream& Session::prep(RLPStream& _s) -{ - return _s.appendRaw(bytes(4, 0)); + return _s.appendRaw(bytes(1, _id)).appendList(_args); } void Session::sealAndSend(RLPStream& _s) @@ -296,32 +291,32 @@ void Session::sealAndSend(RLPStream& _s) bool Session::checkPacket(bytesConstRef _msg) { - if (_msg.size() < 5) + if (_msg.size() < 2) return false; - uint32_t len = ((_msg[0] * 256 + _msg[1]) * 256 + _msg[2]) * 256 + _msg[3]; - if (_msg.size() != len + 4) + if (_msg[0] > 0x7f) return false; - RLP r(_msg.cropped(4)); - if (r.actualSize() != len) + RLP r(_msg.cropped(1)); + if (r.actualSize() + 1 != _msg.size()) return false; return true; } -void Session::send(bytesConstRef _msg) -{ - send(_msg.toBytes()); -} +//void Session::send(bytesConstRef _msg) +//{ +// send(_msg.toBytes()); +//} void Session::send(bytes&& _msg) { - clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(4)); + clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(1)); - if (!checkPacket(bytesConstRef(&_msg))) + bytesConstRef msg(&_msg); + if (!checkPacket(msg)) clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!"; if (!m_socket.is_open()) return; - + bool doWrite = false; { Guard l(x_writeQueue); @@ -336,6 +331,7 @@ void Session::send(bytes&& _msg) void Session::write() { const bytes& bytes = m_writeQueue[0]; + m_io->writeSingleFramePacket(&bytes, m_writeQueue[0]); auto self(shared_from_this()); ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/) { @@ -415,7 +411,7 @@ void Session::doRead() return; auto self(shared_from_this()); - m_socket.async_read_some(boost::asio::buffer(m_data), [this,self](boost::system::error_code ec, std::size_t length) + ba::async_read(m_socket, boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length) { if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) { @@ -426,50 +422,66 @@ void Session::doRead() return; else { - try + /// authenticate and decrypt header + bytesRef header(m_data.data(), h256::size); + if (!m_io->authAndDecryptHeader(header)) + { + clog(NetWarn) << "header decrypt failed"; + drop(BadProtocol); // todo: better error + return; + } + + /// check frame size + uint32_t frameSize = (m_data[0] * 256 + m_data[1]) * 256 + m_data[2]; + if (frameSize > 16777216) + { + clog(NetWarn) << "frame size too large"; + drop(BadProtocol); + return; + } + + /// rlp of header has protocol-type, sequence-id[, total-packet-size] + bytes headerRLP(13); + bytesConstRef(m_data.data(), h128::size).cropped(3).copyTo(&headerRLP); + + /// read padded frame and mac + auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size; + ba::async_read(m_socket, boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length) { - m_incoming.resize(m_incoming.size() + length); - memcpy(m_incoming.data() + m_incoming.size() - length, m_data.data(), length); - - // 4 bytes for length header - const uint32_t c_hLen = 4; - while (m_incoming.size() > c_hLen) + if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) + { + clogS(NetWarn) << "Error reading: " << ec.message(); + drop(TCPError); + } + else if (ec && length == 0) + return; + else { - // break if data recvd is less than expected size of packet. - uint32_t len = fromBigEndian(bytesConstRef(m_incoming.data(), c_hLen)); - uint32_t tlen = c_hLen + len; - if (m_incoming.size() < tlen) - break; - bytesConstRef frame(m_incoming.data(), tlen); - bytesConstRef packet = frame.cropped(c_hLen); + if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen))) + { + clog(NetWarn) << "frame decrypt failed"; + drop(BadProtocol); // todo: better error + return; + } + + bytesConstRef frame(m_data.data(), frameSize); if (!checkPacket(frame)) { - cerr << "Received " << packet.size() << ": " << toHex(packet) << endl; + cerr << "Received " << frame.size() << ": " << toHex(frame) << endl; clogS(NetWarn) << "INVALID MESSAGE RECEIVED"; disconnect(BadProtocol); return; } else { - RLP r(packet); - if (!interpret(r)) + auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt(); + RLP r(frame.cropped(1)); + if (!interpret(packetType, r)) clogS(NetWarn) << "Couldn't interpret packet." << RLP(r); } - memmove(m_incoming.data(), m_incoming.data() + tlen, m_incoming.size() - tlen); - m_incoming.resize(m_incoming.size() - tlen); + doRead(); } - doRead(); - } - catch (Exception const& _e) - { - clogS(NetWarn) << "ERROR: " << diagnostic_information(_e); - drop(BadProtocol); - } - catch (std::exception const& _e) - { - clogS(NetWarn) << "ERROR: " << _e.what(); - drop(BadProtocol); - } + }); } }); } diff --git a/libp2p/Session.h b/libp2p/Session.h index 89ef5fd74..77b7de7bb 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -54,7 +54,7 @@ class Session: public std::enable_shared_from_this friend class HostCapabilityFace; public: - Session(Host* _server, RLPXFrameIO _io, std::shared_ptr const& _n, PeerSessionInfo _info); + Session(Host* _server, RLPXFrameIO* _io, std::shared_ptr const& _n, PeerSessionInfo _info); virtual ~Session(); void start(); @@ -71,7 +71,6 @@ public: std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0); - static RLPStream& prep(RLPStream& _s); void sealAndSend(RLPStream& _s); int rating() const; @@ -86,7 +85,7 @@ public: private: void send(bytes&& _msg); - void send(bytesConstRef _msg); +// void send(bytesConstRef _msg); /// Drop the connection for the reason @a _r. void drop(DisconnectReason _r); @@ -98,18 +97,18 @@ private: void write(); /// Interpret an incoming message. - bool interpret(RLP const& _r); + bool interpret(PacketType _t, RLP const& _r); /// @returns true iff the _msg forms a valid message for sending or receiving on the network. static bool checkPacket(bytesConstRef _msg); Host* m_server; ///< The host that owns us. Never null. - RLPXFrameIO m_io; ///< Transport over which packets are sent. + RLPXFrameIO* m_io; ///< Transport over which packets are sent. bi::tcp::socket& m_socket; ///< Socket for the peer's connection. Mutex x_writeQueue; ///< Mutex for the write queue. std::deque m_writeQueue; ///< The write queue. - std::array m_data; ///< Buffer for ingress packet data. + std::array m_data; ///< Buffer for ingress packet data. bytes m_incoming; ///< Read buffer for ingress bytes. unsigned m_protocolVersion = 0; ///< The protocol version of the peer. diff --git a/libwhisper/WhisperPeer.cpp b/libwhisper/WhisperPeer.cpp index 7480a104e..53ea91a9e 100644 --- a/libwhisper/WhisperPeer.cpp +++ b/libwhisper/WhisperPeer.cpp @@ -55,7 +55,7 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) { case StatusPacket: { - auto protocolVersion = _r[1].toInt(); + auto protocolVersion = _r[0].toInt(); clogS(NetMessageSummary) << "Status: " << protocolVersion; diff --git a/test/peer.cpp b/test/peer.cpp index 7f3c19e1e..0fe3fd1ed 100644 --- a/test/peer.cpp +++ b/test/peer.cpp @@ -32,6 +32,9 @@ BOOST_AUTO_TEST_SUITE(p2p) BOOST_AUTO_TEST_CASE(host) { + auto oldLogVerbosity = g_logVerbosity; + g_logVerbosity = 10; + NetworkPreferences host1prefs(30301, "127.0.0.1", true, true); NetworkPreferences host2prefs(30302, "127.0.0.1", true, true); @@ -44,10 +47,14 @@ BOOST_AUTO_TEST_CASE(host) host1.addNode(node2, "127.0.0.1", host2prefs.listenPort, host2prefs.listenPort); - this_thread::sleep_for(chrono::seconds(1)); + this_thread::sleep_for(chrono::seconds(3)); + + auto host1peerCount = host1.peerCount(); + auto host2peerCount = host2.peerCount(); + BOOST_REQUIRE_EQUAL(host1peerCount, 1); + BOOST_REQUIRE_EQUAL(host2peerCount, 1); - BOOST_REQUIRE_EQUAL(host1.peerCount(), 1); - BOOST_REQUIRE_EQUAL(host2.peerCount(), host1.peerCount()); + g_logVerbosity = oldLogVerbosity; } BOOST_AUTO_TEST_CASE(save_nodes) @@ -71,7 +78,7 @@ BOOST_AUTO_TEST_CASE(save_nodes) for (auto const& h: hosts) host2.addNode(h->id(), "127.0.0.1", h->listenPort(), h->listenPort()); - this_thread::sleep_for(chrono::milliseconds(1000)); + this_thread::sleep_for(chrono::milliseconds(2000)); bytes firstHostNetwork(host.saveNetwork()); bytes secondHostNetwork(host.saveNetwork());