Browse Source

separate packet type. add coding.

cl-refactor
subtly 10 years ago
parent
commit
df96fcd03b
  1. 28
      libethereum/EthereumPeer.cpp
  2. 2
      libp2p/Capability.cpp
  3. 7
      libp2p/Host.cpp
  4. 65
      libp2p/RLPxFrameIO.cpp
  5. 7
      libp2p/RLPxFrameIO.h
  6. 10
      libp2p/RLPxHandshake.cpp
  7. 2
      libp2p/RLPxHandshake.h
  8. 136
      libp2p/Session.cpp
  9. 11
      libp2p/Session.h
  10. 2
      libwhisper/WhisperPeer.cpp
  11. 15
      test/peer.cpp

28
libethereum/EthereumPeer.cpp

@ -297,13 +297,13 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
{ {
case StatusPacket: case StatusPacket:
{ {
m_protocolVersion = _r[1].toInt<unsigned>(); m_protocolVersion = _r[0].toInt<unsigned>();
m_networkId = _r[2].toInt<u256>(); m_networkId = _r[1].toInt<u256>();
// a bit dirty as we're misusing these to communicate the values to transition, but harmless. // a bit dirty as we're misusing these to communicate the values to transition, but harmless.
m_totalDifficulty = _r[3].toInt<u256>(); m_totalDifficulty = _r[2].toInt<u256>();
m_latestHash = _r[4].toHash<h256>(); m_latestHash = _r[3].toHash<h256>();
auto genesisHash = _r[5].toHash<h256>(); auto genesisHash = _r[4].toHash<h256>();
clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged(); clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged();
@ -327,7 +327,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)"; clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)";
addRating(_r.itemCount() - 1); addRating(_r.itemCount() - 1);
Guard l(x_knownTransactions); Guard l(x_knownTransactions);
for (unsigned i = 1; i < _r.itemCount(); ++i) for (unsigned i = 0; i < _r.itemCount(); ++i)
{ {
auto h = sha3(_r[i].data()); auto h = sha3(_r[i].data());
m_knownTransactions.insert(h); m_knownTransactions.insert(h);
@ -339,8 +339,8 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
} }
case GetBlockHashesPacket: case GetBlockHashesPacket:
{ {
h256 later = _r[1].toHash<h256>(); h256 later = _r[0].toHash<h256>();
unsigned limit = _r[2].toInt<unsigned>(); unsigned limit = _r[1].toInt<unsigned>();
clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later.abridged() << ")"; clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later.abridged() << ")";
unsigned c = min<unsigned>(host()->m_chain.number(later), limit); unsigned c = min<unsigned>(host()->m_chain.number(later), limit);
@ -367,7 +367,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
transition(Asking::Blocks); transition(Asking::Blocks);
return true; return true;
} }
for (unsigned i = 1; i < _r.itemCount(); ++i) for (unsigned i = 0; i < _r.itemCount(); ++i)
{ {
auto h = _r[i].toHash<h256>(); auto h = _r[i].toHash<h256>();
if (host()->m_chain.isKnown(h)) if (host()->m_chain.isKnown(h))
@ -388,7 +388,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
// return the requested blocks. // return the requested blocks.
bytes rlp; bytes rlp;
unsigned n = 0; unsigned n = 0;
for (unsigned i = 1; i < _r.itemCount() && i <= c_maxBlocks; ++i) for (unsigned i = 0; i < _r.itemCount() && i <= c_maxBlocks; ++i)
{ {
auto b = host()->m_chain.block(_r[i].toHash<h256>()); auto b = host()->m_chain.block(_r[i].toHash<h256>());
if (b.size()) if (b.size())
@ -422,7 +422,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
unsigned got = 0; unsigned got = 0;
unsigned repeated = 0; unsigned repeated = 0;
for (unsigned i = 1; i < _r.itemCount(); ++i) for (unsigned i = 0; i < _r.itemCount(); ++i)
{ {
auto h = BlockInfo::headerHash(_r[i].data()); auto h = BlockInfo::headerHash(_r[i].data());
if (m_sub.noteBlock(h)) if (m_sub.noteBlock(h))
@ -467,14 +467,14 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
} }
case NewBlockPacket: case NewBlockPacket:
{ {
auto h = BlockInfo::headerHash(_r[1].data()); auto h = BlockInfo::headerHash(_r[0].data());
clogS(NetMessageSummary) << "NewBlock: " << h.abridged(); clogS(NetMessageSummary) << "NewBlock: " << h.abridged();
if (_r.itemCount() != 3) if (_r.itemCount() != 3)
disable("NewBlock without 2 data fields."); disable("NewBlock without 2 data fields.");
else else
{ {
switch (host()->m_bq.import(_r[1].data(), host()->m_chain)) switch (host()->m_bq.import(_r[0].data(), host()->m_chain))
{ {
case ImportResult::Success: case ImportResult::Success:
addRating(100); addRating(100);
@ -493,7 +493,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
case ImportResult::UnknownParent: case ImportResult::UnknownParent:
clogS(NetMessageSummary) << "Received block with no known parent. Resyncing..."; clogS(NetMessageSummary) << "Received block with no known parent. Resyncing...";
setNeedsSyncing(h, _r[2].toInt<u256>()); setNeedsSyncing(h, _r[1].toInt<u256>());
break; break;
} }
Guard l(x_knownBlocks); Guard l(x_knownBlocks);

2
libp2p/Capability.cpp

@ -45,7 +45,7 @@ void Capability::disable(std::string const& _problem)
RLPStream& Capability::prep(RLPStream& _s, unsigned _id, unsigned _args) RLPStream& Capability::prep(RLPStream& _s, unsigned _id, unsigned _args)
{ {
return Session::prep(_s).appendList(_args + 1).append(_id + m_idOffset); return _s.appendRaw(bytes(1, _id + m_idOffset)).appendList(_args);
} }
void Capability::sealAndSend(RLPStream& _s) void Capability::sealAndSend(RLPStream& _s)

7
libp2p/Host.cpp

@ -187,7 +187,7 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id.abridged() << showbase << capslog.str() << dec << listenPort; clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id.abridged() << showbase << capslog.str() << dec << listenPort;
// create session so disconnects are managed // create session so disconnects are managed
auto ps = make_shared<Session>(this, move(*_io), p, PeerSessionInfo({_id, clientVersion, _endpoint.address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[3].toSet<CapDesc>(), 0, map<string, string>()})); auto ps = make_shared<Session>(this, _io, p, PeerSessionInfo({_id, clientVersion, _endpoint.address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[3].toSet<CapDesc>(), 0, map<string, string>()}));
if (protocolVersion != this->protocolVersion()) if (protocolVersion != this->protocolVersion())
{ {
ps->disconnect(IncompatibleProtocol); ps->disconnect(IncompatibleProtocol);
@ -273,11 +273,6 @@ void Host::onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e)
void Host::seal(bytes& _b) void Host::seal(bytes& _b)
{ {
uint32_t len = (uint32_t)_b.size() - 4;
_b[0] = (len >> 24) & 0xff;
_b[1] = (len >> 16) & 0xff;
_b[2] = (len >> 8) & 0xff;
_b[3] = len & 0xff;
} }
void Host::determinePublic(string const& _publicAddress, bool _upnp) void Host::determinePublic(string const& _publicAddress, bool _upnp)

65
libp2p/RLPxFrameIO.cpp

@ -57,12 +57,16 @@ RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.m_socket)
// aes-secret = sha3(ecdhe-shared-secret || shared-secret) // aes-secret = sha3(ecdhe-shared-secret || shared-secret)
sha3(keyMaterial, outRef); // output aes-secret sha3(keyMaterial, outRef); // output aes-secret
m_frameEnc.SetKeyWithIV(outRef.data(), h128::size, h128().data()); SecByteBlock aesSecretEnc(outRef.data(), h128::size);
m_frameDec.SetKeyWithIV(outRef.data(), h128::size, h128().data()); SecByteBlock aesSecretDec(outRef.data(), h128::size);
SecByteBlock emptyIV(h128::size);
m_frameEnc.SetKeyWithIV(aesSecretEnc, h128::size, emptyIV);
m_frameDec.SetKeyWithIV(aesSecretDec, h128::size, emptyIV);
// mac-secret = sha3(ecdhe-shared-secret || aes-secret) // mac-secret = sha3(ecdhe-shared-secret || aes-secret)
sha3(keyMaterial, outRef); // output mac-secret sha3(keyMaterial, outRef); // output mac-secret
m_macEnc.SetKey(outRef.data(), h128::size); SecByteBlock macSecret(outRef.data(), h128::size);
m_macEnc.SetKey(macSecret, h128::size);
// Initiator egress-mac: sha3(mac-secret^recipient-nonce || auth-sent-init) // Initiator egress-mac: sha3(mac-secret^recipient-nonce || auth-sent-init)
// ingress-mac: sha3(mac-secret^initiator-nonce || auth-recvd-ack) // ingress-mac: sha3(mac-secret^initiator-nonce || auth-recvd-ack)
@ -91,16 +95,17 @@ void RLPXFrameIO::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes)
// current/old packet format: prep(_s).appendList(_args + 1).append((unsigned)_id); // current/old packet format: prep(_s).appendList(_args + 1).append((unsigned)_id);
RLPStream header; RLPStream header;
header.appendRaw(bytes({byte(_packet.size() >> 16), byte(_packet.size() >> 8), byte(_packet.size())})); uint32_t len = (uint32_t)_packet.size();
header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)}));
// zeroHeader: []byte{0xC2, 0x80, 0x80}. Should be rlpList(protocolType,seqId,totalPacketSize). // zeroHeader: []byte{0xC2, 0x80, 0x80}. Should be rlpList(protocolType,seqId,totalPacketSize).
header.appendRaw(bytes({0xc2,0x80,0x80})); header.appendRaw(bytes({0xc2,0x80,0x80}));
// TODO: SECURITY check that header is <= 16 bytes // TODO: SECURITY check that header is <= 16 bytes
bytes headerWithMac; bytes headerWithMac(32);
header.swapOut(headerWithMac); bytes headerBytes(16);
headerWithMac.resize(32); bytesConstRef(&header.out()).copyTo(&headerBytes);
m_frameEnc.ProcessData(headerWithMac.data(), headerWithMac.data(), 16); m_frameEnc.ProcessData(headerWithMac.data(), headerBytes.data(), 16);
updateEgressMACWithHeader(bytesConstRef(&headerWithMac).cropped(0, 16)); updateEgressMACWithHeader(bytesConstRef(&headerWithMac).cropped(0, 16));
egressDigest().ref().copyTo(bytesRef(&headerWithMac).cropped(h128::size,h128::size)); egressDigest().ref().copyTo(bytesRef(&headerWithMac).cropped(h128::size,h128::size));
@ -116,17 +121,16 @@ void RLPXFrameIO::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes)
updateEgressMACWithEndOfFrame(packetWithPaddingRef); updateEgressMACWithEndOfFrame(packetWithPaddingRef);
bytesRef macRef(o_bytes.data() + 32 + _packet.size() + padding, h128::size); bytesRef macRef(o_bytes.data() + 32 + _packet.size() + padding, h128::size);
egressDigest().ref().copyTo(macRef); egressDigest().ref().copyTo(macRef);
clog(NetConnect) << "SENT FRAME " << _packet.size() << *(h128*)macRef.data();
clog(NetConnect) << "FRAME TAIL " << *(h128*)(o_bytes.data() + 32 + _packet.size() + padding);
} }
bool RLPXFrameIO::authAndDecryptHeader(h256& io) bool RLPXFrameIO::authAndDecryptHeader(bytesRef io)
{ {
updateIngressMACWithHeader(io.ref()); asserts(io.size() == h256::size);
bytesConstRef macRef = io.ref().cropped(h128::size, h128::size); updateIngressMACWithHeader(io);
bytesConstRef macRef = io.cropped(h128::size, h128::size);
if (*(h128*)macRef.data() != ingressDigest()) if (*(h128*)macRef.data() != ingressDigest())
return false; return false;
m_frameDec.ProcessData(io.data(), io.data(), 16); m_frameDec.ProcessData(io.data(), io.data(), h128::size);
return true; return true;
} }
@ -159,7 +163,7 @@ h128 RLPXFrameIO::ingressDigest()
void RLPXFrameIO::updateEgressMACWithHeader(bytesConstRef _headerCipher) void RLPXFrameIO::updateEgressMACWithHeader(bytesConstRef _headerCipher)
{ {
updateMAC(m_egressMac, *(h128*)_headerCipher.data()); updateMAC(m_egressMac, _headerCipher.cropped(0, 16));
} }
void RLPXFrameIO::updateEgressMACWithEndOfFrame(bytesConstRef _cipher) void RLPXFrameIO::updateEgressMACWithEndOfFrame(bytesConstRef _cipher)
@ -170,13 +174,12 @@ void RLPXFrameIO::updateEgressMACWithEndOfFrame(bytesConstRef _cipher)
SHA3_256 prev(m_egressMac); SHA3_256 prev(m_egressMac);
h128 digest; h128 digest;
prev.TruncatedFinal(digest.data(), h128::size); prev.TruncatedFinal(digest.data(), h128::size);
clog(NetConnect) << "EGRESS FRAMEMAC " << _cipher.size() << digest;
} }
} }
void RLPXFrameIO::updateIngressMACWithHeader(bytesConstRef _headerCipher) void RLPXFrameIO::updateIngressMACWithHeader(bytesConstRef _headerCipher)
{ {
updateMAC(m_ingressMac, *(h128*)_headerCipher.data()); updateMAC(m_ingressMac, _headerCipher.cropped(0, 16));
} }
void RLPXFrameIO::updateIngressMACWithEndOfFrame(bytesConstRef _cipher) void RLPXFrameIO::updateIngressMACWithEndOfFrame(bytesConstRef _cipher)
@ -187,20 +190,28 @@ void RLPXFrameIO::updateIngressMACWithEndOfFrame(bytesConstRef _cipher)
SHA3_256 prev(m_ingressMac); SHA3_256 prev(m_ingressMac);
h128 digest; h128 digest;
prev.TruncatedFinal(digest.data(), h128::size); prev.TruncatedFinal(digest.data(), h128::size);
clog(NetConnect) << "INGRESS FRAMEMAC " << _cipher.size() << digest;
} }
} }
void RLPXFrameIO::updateMAC(SHA3_256& _mac, h128 const& _seed) void RLPXFrameIO::updateMAC(SHA3_256& _mac, bytesConstRef _seed)
{ {
if (_seed.size() && _seed.size() != h128::size)
asserts(false);
SHA3_256 prevDigest(_mac); SHA3_256 prevDigest(_mac);
h128 prevDigestOut; h128 encDigest(h128::size);
prevDigest.TruncatedFinal(prevDigestOut.data(), h128::size); prevDigest.TruncatedFinal(encDigest.data(), h128::size);
h128 prevDigestOut = encDigest;
h128 encDigest;
m_macEnc.ProcessData(encDigest.data(), prevDigestOut.data(), h128::size); {
encDigest ^= (!!_seed ? _seed : prevDigestOut); Guard l(x_macEnc);
m_macEnc.ProcessData(encDigest.data(), encDigest.data(), 16);
}
if (_seed.size())
encDigest ^= *(h128*)_seed.data();
else
encDigest ^= *(h128*)prevDigestOut.data();
// update mac for final digest // update mac for final digest
_mac.Update(encDigest.data(), h128::size); _mac.Update(encDigest.data(), h128::size);
} }

7
libp2p/RLPxFrameIO.h

@ -26,6 +26,7 @@
#include <libdevcrypto/Common.h> #include <libdevcrypto/Common.h>
#include <libdevcrypto/ECDHE.h> #include <libdevcrypto/ECDHE.h>
#include <libdevcrypto/CryptoPP.h> #include <libdevcrypto/CryptoPP.h>
#include <libdevcore/Guards.h>
#include "Common.h" #include "Common.h"
namespace ba = boost::asio; namespace ba = boost::asio;
namespace bi = boost::asio::ip; namespace bi = boost::asio::ip;
@ -57,11 +58,12 @@ class RLPXFrameIO
friend class Session; friend class Session;
public: public:
RLPXFrameIO(RLPXHandshake const& _init); RLPXFrameIO(RLPXHandshake const& _init);
~RLPXFrameIO() {}
void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes); void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes);
/// Authenticates and decrypts header in-place. /// Authenticates and decrypts header in-place.
bool authAndDecryptHeader(h256& io_cipherWithMac); bool authAndDecryptHeader(bytesRef io_cipherWithMac);
/// Authenticates and decrypts frame in-place. /// Authenticates and decrypts frame in-place.
bool authAndDecryptFrame(bytesRef io_cipherWithMac); bool authAndDecryptFrame(bytesRef io_cipherWithMac);
@ -82,10 +84,11 @@ protected:
bi::tcp::socket& socket() { return m_socket->ref(); } bi::tcp::socket& socket() { return m_socket->ref(); }
private: private:
void updateMAC(CryptoPP::SHA3_256& _mac, h128 const& _seed = h128()); void updateMAC(CryptoPP::SHA3_256& _mac, bytesConstRef _seed = bytesConstRef());
CryptoPP::CTR_Mode<CryptoPP::AES>::Encryption m_frameEnc; CryptoPP::CTR_Mode<CryptoPP::AES>::Encryption m_frameEnc;
CryptoPP::CTR_Mode<CryptoPP::AES>::Encryption m_frameDec; CryptoPP::CTR_Mode<CryptoPP::AES>::Encryption m_frameDec;
Mutex x_macEnc;
CryptoPP::ECB_Mode<CryptoPP::AES>::Encryption m_macEnc; CryptoPP::ECB_Mode<CryptoPP::AES>::Encryption m_macEnc;
CryptoPP::SHA3_256 m_egressMac; CryptoPP::SHA3_256 m_egressMac;
CryptoPP::SHA3_256 m_ingressMac; CryptoPP::SHA3_256 m_ingressMac;

10
libp2p/RLPxHandshake.cpp

@ -166,7 +166,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
else else
clog(NetConnect) << "p2p.connect.ingress sending capabilities handshake"; clog(NetConnect) << "p2p.connect.ingress sending capabilities handshake";
m_io.reset(new RLPXFrameIO(*this)); m_io = new RLPXFrameIO(*this);
// old packet format // old packet format
// 5 arguments, HelloPacket // 5 arguments, HelloPacket
@ -193,14 +193,14 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
// read frame header // read frame header
m_handshakeInBuffer.resize(h256::size); m_handshakeInBuffer.resize(h256::size);
ba::async_read(m_socket->ref(), boost::asio::buffer(m_handshakeInBuffer, h256::size), [this,self](boost::system::error_code ec, std::size_t length) ba::async_read(m_socket->ref(), boost::asio::buffer(m_handshakeInBuffer, h256::size), [this, self](boost::system::error_code ec, std::size_t length)
{ {
if (ec) if (ec)
transition(ec); transition(ec);
else else
{ {
/// authenticate and decrypt header /// authenticate and decrypt header
if (!m_io->authAndDecryptHeader(*(h256*)m_handshakeInBuffer.data())) if (!m_io->authAndDecryptHeader(bytesRef(m_handshakeInBuffer.data(), h256::size)))
{ {
m_nextState = Error; m_nextState = Error;
transition(); transition();
@ -215,7 +215,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
if (frameSize > 1024) if (frameSize > 1024)
{ {
// all future frames: 16777216 // all future frames: 16777216
clog(NetWarn) << (m_originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame is too large"; clog(NetWarn) << (m_originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame is too large" << frameSize;
m_nextState = Error; m_nextState = Error;
transition(); transition();
return; return;
@ -251,7 +251,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
return; return;
} }
m_host->startPeerSession(m_remote, rlp, m_io.get(), m_socket->remoteEndpoint()); m_host->startPeerSession(m_remote, rlp, m_io, m_socket->remoteEndpoint());
} }
}); });
} }

2
libp2p/RLPxHandshake.h

@ -94,7 +94,7 @@ protected:
h256 m_remoteNonce; h256 m_remoteNonce;
/// Frame IO is used to read frame for last step of handshake authentication. /// Frame IO is used to read frame for last step of handshake authentication.
std::unique_ptr<RLPXFrameIO> m_io; RLPXFrameIO* m_io;
std::shared_ptr<RLPXSocket> m_socket; std::shared_ptr<RLPXSocket> m_socket;
}; };

136
libp2p/Session.cpp

@ -38,10 +38,10 @@ using namespace dev::p2p;
#endif #endif
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " #define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
Session::Session(Host* _s, RLPXFrameIO _io, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info): Session::Session(Host* _s, RLPXFrameIO* _io, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info):
m_server(_s), m_server(_s),
m_io(move(_io)), m_io(_io),
m_socket(m_io.socket()), m_socket(m_io->socket()),
m_peer(_n), m_peer(_n),
m_info(_info), m_info(_info),
m_ping(chrono::steady_clock::time_point::max()) m_ping(chrono::steady_clock::time_point::max())
@ -67,6 +67,7 @@ Session::~Session()
} }
} }
catch (...){} catch (...){}
delete m_io;
} }
NodeId Session::id() const NodeId Session::id() const
@ -144,21 +145,21 @@ void Session::serviceNodesRequest()
addNote("peers", "done"); addNote("peers", "done");
} }
bool Session::interpret(RLP const& _r) bool Session::interpret(PacketType _t, RLP const& _r)
{ {
m_lastReceived = chrono::steady_clock::now(); m_lastReceived = chrono::steady_clock::now();
clogS(NetRight) << _r; clogS(NetRight) << _t << _r;
try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught. try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
{ {
switch ((PacketType)_r[0].toInt<unsigned>()) switch (_t)
{ {
case DisconnectPacket: case DisconnectPacket:
{ {
string reason = "Unspecified"; string reason = "Unspecified";
auto r = (DisconnectReason)_r[1].toInt<int>(); auto r = (DisconnectReason)_r[0].toInt<int>();
if (!_r[1].isInt()) if (!_r[0].isInt())
drop(BadProtocol); drop(BadProtocol);
else else
{ {
@ -197,7 +198,7 @@ bool Session::interpret(RLP const& _r)
clogS(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)"; clogS(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)";
m_weRequestedNodes = false; m_weRequestedNodes = false;
for (unsigned i = 1; i < _r.itemCount(); ++i) for (unsigned i = 0; i < _r.itemCount(); ++i)
{ {
bi::address peerAddress; bi::address peerAddress;
if (_r[i][0].size() == 16) if (_r[i][0].size() == 16)
@ -247,12 +248,11 @@ bool Session::interpret(RLP const& _r)
break; break;
default: default:
{ {
auto id = _r[0].toInt<unsigned>();
for (auto const& i: m_capabilities) for (auto const& i: m_capabilities)
if (id >= i.second->m_idOffset && id - i.second->m_idOffset < i.second->hostCapability()->messageCount()) if (_t >= i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
{ {
if (i.second->m_enabled) if (i.second->m_enabled)
return i.second->interpret(id - i.second->m_idOffset, _r); return i.second->interpret(_t - i.second->m_idOffset, _r);
else else
return true; return true;
} }
@ -278,12 +278,7 @@ void Session::ping()
RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args) RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args)
{ {
return prep(_s).appendList(_args + 1).append((unsigned)_id); return _s.appendRaw(bytes(1, _id)).appendList(_args);
}
RLPStream& Session::prep(RLPStream& _s)
{
return _s.appendRaw(bytes(4, 0));
} }
void Session::sealAndSend(RLPStream& _s) void Session::sealAndSend(RLPStream& _s)
@ -296,32 +291,32 @@ void Session::sealAndSend(RLPStream& _s)
bool Session::checkPacket(bytesConstRef _msg) bool Session::checkPacket(bytesConstRef _msg)
{ {
if (_msg.size() < 5) if (_msg.size() < 2)
return false; return false;
uint32_t len = ((_msg[0] * 256 + _msg[1]) * 256 + _msg[2]) * 256 + _msg[3]; if (_msg[0] > 0x7f)
if (_msg.size() != len + 4)
return false; return false;
RLP r(_msg.cropped(4)); RLP r(_msg.cropped(1));
if (r.actualSize() != len) if (r.actualSize() + 1 != _msg.size())
return false; return false;
return true; return true;
} }
void Session::send(bytesConstRef _msg) //void Session::send(bytesConstRef _msg)
{ //{
send(_msg.toBytes()); // send(_msg.toBytes());
} //}
void Session::send(bytes&& _msg) void Session::send(bytes&& _msg)
{ {
clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(4)); clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(1));
if (!checkPacket(bytesConstRef(&_msg))) bytesConstRef msg(&_msg);
if (!checkPacket(msg))
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!"; clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
if (!m_socket.is_open()) if (!m_socket.is_open())
return; return;
bool doWrite = false; bool doWrite = false;
{ {
Guard l(x_writeQueue); Guard l(x_writeQueue);
@ -336,6 +331,7 @@ void Session::send(bytes&& _msg)
void Session::write() void Session::write()
{ {
const bytes& bytes = m_writeQueue[0]; const bytes& bytes = m_writeQueue[0];
m_io->writeSingleFramePacket(&bytes, m_writeQueue[0]);
auto self(shared_from_this()); auto self(shared_from_this());
ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/) ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/)
{ {
@ -415,7 +411,7 @@ void Session::doRead()
return; return;
auto self(shared_from_this()); auto self(shared_from_this());
m_socket.async_read_some(boost::asio::buffer(m_data), [this,self](boost::system::error_code ec, std::size_t length) ba::async_read(m_socket, boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length)
{ {
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
{ {
@ -426,50 +422,66 @@ void Session::doRead()
return; return;
else else
{ {
try /// authenticate and decrypt header
bytesRef header(m_data.data(), h256::size);
if (!m_io->authAndDecryptHeader(header))
{
clog(NetWarn) << "header decrypt failed";
drop(BadProtocol); // todo: better error
return;
}
/// check frame size
uint32_t frameSize = (m_data[0] * 256 + m_data[1]) * 256 + m_data[2];
if (frameSize > 16777216)
{
clog(NetWarn) << "frame size too large";
drop(BadProtocol);
return;
}
/// rlp of header has protocol-type, sequence-id[, total-packet-size]
bytes headerRLP(13);
bytesConstRef(m_data.data(), h128::size).cropped(3).copyTo(&headerRLP);
/// read padded frame and mac
auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size;
ba::async_read(m_socket, boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length)
{ {
m_incoming.resize(m_incoming.size() + length); if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
memcpy(m_incoming.data() + m_incoming.size() - length, m_data.data(), length); {
clogS(NetWarn) << "Error reading: " << ec.message();
// 4 bytes for length header drop(TCPError);
const uint32_t c_hLen = 4; }
while (m_incoming.size() > c_hLen) else if (ec && length == 0)
return;
else
{ {
// break if data recvd is less than expected size of packet. if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data(), c_hLen)); {
uint32_t tlen = c_hLen + len; clog(NetWarn) << "frame decrypt failed";
if (m_incoming.size() < tlen) drop(BadProtocol); // todo: better error
break; return;
bytesConstRef frame(m_incoming.data(), tlen); }
bytesConstRef packet = frame.cropped(c_hLen);
bytesConstRef frame(m_data.data(), frameSize);
if (!checkPacket(frame)) if (!checkPacket(frame))
{ {
cerr << "Received " << packet.size() << ": " << toHex(packet) << endl; cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
clogS(NetWarn) << "INVALID MESSAGE RECEIVED"; clogS(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol); disconnect(BadProtocol);
return; return;
} }
else else
{ {
RLP r(packet); auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
if (!interpret(r)) RLP r(frame.cropped(1));
if (!interpret(packetType, r))
clogS(NetWarn) << "Couldn't interpret packet." << RLP(r); clogS(NetWarn) << "Couldn't interpret packet." << RLP(r);
} }
memmove(m_incoming.data(), m_incoming.data() + tlen, m_incoming.size() - tlen); doRead();
m_incoming.resize(m_incoming.size() - tlen);
} }
doRead(); });
}
catch (Exception const& _e)
{
clogS(NetWarn) << "ERROR: " << diagnostic_information(_e);
drop(BadProtocol);
}
catch (std::exception const& _e)
{
clogS(NetWarn) << "ERROR: " << _e.what();
drop(BadProtocol);
}
} }
}); });
} }

11
libp2p/Session.h

@ -54,7 +54,7 @@ class Session: public std::enable_shared_from_this<Session>
friend class HostCapabilityFace; friend class HostCapabilityFace;
public: public:
Session(Host* _server, RLPXFrameIO _io, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info); Session(Host* _server, RLPXFrameIO* _io, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info);
virtual ~Session(); virtual ~Session();
void start(); void start();
@ -71,7 +71,6 @@ public:
std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } }
static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0); static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0);
static RLPStream& prep(RLPStream& _s);
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);
int rating() const; int rating() const;
@ -86,7 +85,7 @@ public:
private: private:
void send(bytes&& _msg); void send(bytes&& _msg);
void send(bytesConstRef _msg); // void send(bytesConstRef _msg);
/// Drop the connection for the reason @a _r. /// Drop the connection for the reason @a _r.
void drop(DisconnectReason _r); void drop(DisconnectReason _r);
@ -98,18 +97,18 @@ private:
void write(); void write();
/// Interpret an incoming message. /// Interpret an incoming message.
bool interpret(RLP const& _r); bool interpret(PacketType _t, RLP const& _r);
/// @returns true iff the _msg forms a valid message for sending or receiving on the network. /// @returns true iff the _msg forms a valid message for sending or receiving on the network.
static bool checkPacket(bytesConstRef _msg); static bool checkPacket(bytesConstRef _msg);
Host* m_server; ///< The host that owns us. Never null. Host* m_server; ///< The host that owns us. Never null.
RLPXFrameIO m_io; ///< Transport over which packets are sent. RLPXFrameIO* m_io; ///< Transport over which packets are sent.
bi::tcp::socket& m_socket; ///< Socket for the peer's connection. bi::tcp::socket& m_socket; ///< Socket for the peer's connection.
Mutex x_writeQueue; ///< Mutex for the write queue. Mutex x_writeQueue; ///< Mutex for the write queue.
std::deque<bytes> m_writeQueue; ///< The write queue. std::deque<bytes> m_writeQueue; ///< The write queue.
std::array<byte, 65536> m_data; ///< Buffer for ingress packet data. std::array<byte, 16777216> m_data; ///< Buffer for ingress packet data.
bytes m_incoming; ///< Read buffer for ingress bytes. bytes m_incoming; ///< Read buffer for ingress bytes.
unsigned m_protocolVersion = 0; ///< The protocol version of the peer. unsigned m_protocolVersion = 0; ///< The protocol version of the peer.

2
libwhisper/WhisperPeer.cpp

@ -55,7 +55,7 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
{ {
case StatusPacket: case StatusPacket:
{ {
auto protocolVersion = _r[1].toInt<unsigned>(); auto protocolVersion = _r[0].toInt<unsigned>();
clogS(NetMessageSummary) << "Status: " << protocolVersion; clogS(NetMessageSummary) << "Status: " << protocolVersion;

15
test/peer.cpp

@ -32,6 +32,9 @@ BOOST_AUTO_TEST_SUITE(p2p)
BOOST_AUTO_TEST_CASE(host) BOOST_AUTO_TEST_CASE(host)
{ {
auto oldLogVerbosity = g_logVerbosity;
g_logVerbosity = 10;
NetworkPreferences host1prefs(30301, "127.0.0.1", true, true); NetworkPreferences host1prefs(30301, "127.0.0.1", true, true);
NetworkPreferences host2prefs(30302, "127.0.0.1", true, true); NetworkPreferences host2prefs(30302, "127.0.0.1", true, true);
@ -44,10 +47,14 @@ BOOST_AUTO_TEST_CASE(host)
host1.addNode(node2, "127.0.0.1", host2prefs.listenPort, host2prefs.listenPort); host1.addNode(node2, "127.0.0.1", host2prefs.listenPort, host2prefs.listenPort);
this_thread::sleep_for(chrono::seconds(1)); this_thread::sleep_for(chrono::seconds(3));
auto host1peerCount = host1.peerCount();
auto host2peerCount = host2.peerCount();
BOOST_REQUIRE_EQUAL(host1peerCount, 1);
BOOST_REQUIRE_EQUAL(host2peerCount, 1);
BOOST_REQUIRE_EQUAL(host1.peerCount(), 1); g_logVerbosity = oldLogVerbosity;
BOOST_REQUIRE_EQUAL(host2.peerCount(), host1.peerCount());
} }
BOOST_AUTO_TEST_CASE(save_nodes) BOOST_AUTO_TEST_CASE(save_nodes)
@ -71,7 +78,7 @@ BOOST_AUTO_TEST_CASE(save_nodes)
for (auto const& h: hosts) for (auto const& h: hosts)
host2.addNode(h->id(), "127.0.0.1", h->listenPort(), h->listenPort()); host2.addNode(h->id(), "127.0.0.1", h->listenPort(), h->listenPort());
this_thread::sleep_for(chrono::milliseconds(1000)); this_thread::sleep_for(chrono::milliseconds(2000));
bytes firstHostNetwork(host.saveNetwork()); bytes firstHostNetwork(host.saveNetwork());
bytes secondHostNetwork(host.saveNetwork()); bytes secondHostNetwork(host.saveNetwork());

Loading…
Cancel
Save