Browse Source

classify handshake and begin pulling rlpx into session

cl-refactor
subtly 10 years ago
parent
commit
23a64667e3
  1. 4
      libp2p/Host.cpp
  2. 6
      libp2p/Host.h
  3. 195
      libp2p/RLPxHandshake.cpp
  4. 41
      libp2p/RLPxHandshake.h
  5. 6
      libp2p/Session.cpp
  6. 6
      libp2p/Session.h

4
libp2p/Host.cpp

@ -157,7 +157,7 @@ unsigned Host::protocolVersion() const
return 4; return 4;
} }
bool Host::startPeerSession(Public const& _id, RLP const& _rlp, bi::tcp::socket *_socket) bool Host::startPeerSession(Public const& _id, RLP const& _rlp, bi::tcp::socket *_socket, RLPXFrameIO* _io)
{ {
/// Get or create Peer /// Get or create Peer
shared_ptr<Peer> p; shared_ptr<Peer> p;
@ -187,7 +187,7 @@ bool Host::startPeerSession(Public const& _id, RLP const& _rlp, bi::tcp::socket
clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id.abridged() << showbase << capslog.str() << dec << listenPort; clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id.abridged() << showbase << capslog.str() << dec << listenPort;
// create session so disconnects are managed // create session so disconnects are managed
auto ps = make_shared<Session>(this, move(*_socket), p, PeerSessionInfo({_id, clientVersion, _socket->remote_endpoint().address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[3].toSet<CapDesc>(), 0, map<string, string>()})); auto ps = make_shared<Session>(this, move(*_io), p, PeerSessionInfo({_id, clientVersion, _socket->remote_endpoint().address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[3].toSet<CapDesc>(), 0, map<string, string>()}));
if (protocolVersion != this->protocolVersion()) if (protocolVersion != this->protocolVersion())
{ {
ps->disconnect(IncompatibleProtocol); ps->disconnect(IncompatibleProtocol);

6
libp2p/Host.h

@ -40,6 +40,7 @@
#include "HostCapability.h" #include "HostCapability.h"
#include "Network.h" #include "Network.h"
#include "Peer.h" #include "Peer.h"
#include "RLPxHandshake.h"
#include "Common.h" #include "Common.h"
namespace ba = boost::asio; namespace ba = boost::asio;
namespace bi = ba::ip; namespace bi = ba::ip;
@ -69,8 +70,7 @@ private:
* @brief The Host class * @brief The Host class
* Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe. * Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe.
* *
* @todo handshake: gracefully disconnect peer if peer already connected * @todo cleanup startPeerSession
* @todo abstract socket -> IPConnection
* @todo determinePublic: ipv6, udp * @todo determinePublic: ipv6, udp
* @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port * @todo handle conflict if addNode/requireNode called and Node already exists w/conflicting tcp or udp port
* @todo per-session keepalive/ping instead of broadcast; set ping-timeout via median-latency * @todo per-session keepalive/ping instead of broadcast; set ping-timeout via median-latency
@ -151,7 +151,7 @@ public:
NodeId id() const { return m_alias.pub(); } NodeId id() const { return m_alias.pub(); }
/// Validates and starts peer session, taking ownership of _socket. Disconnects and returns false upon error. /// Validates and starts peer session, taking ownership of _socket. Disconnects and returns false upon error.
bool startPeerSession(Public const& _id, RLP const& _hello, bi::tcp::socket *_socket); bool startPeerSession(Public const& _id, RLP const& _hello, bi::tcp::socket* _socket, RLPXFrameIO* _io);
protected: protected:
void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e); void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e);

195
libp2p/RLPxHandshake.cpp

@ -28,7 +28,7 @@ using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
using namespace CryptoPP; using namespace CryptoPP;
RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.socket) RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.m_socket)
{ {
// we need: // we need:
// originated? // originated?
@ -41,18 +41,18 @@ RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.socket)
// shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || initiator-nonce)) // shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || initiator-nonce))
Secret ephemeralShared; Secret ephemeralShared;
_init.ecdhe.agree(_init.remoteEphemeral, ephemeralShared); _init.m_ecdhe.agree(_init.m_remoteEphemeral, ephemeralShared);
ephemeralShared.ref().copyTo(keyMaterial.cropped(0, h256::size)); ephemeralShared.ref().copyTo(keyMaterial.cropped(0, h256::size));
h512 nonceMaterial; h512 nonceMaterial;
h256 const& leftNonce = _init.originated ? _init.remoteNonce : _init.nonce; h256 const& leftNonce = _init.m_originated ? _init.m_remoteNonce : _init.m_nonce;
h256 const& rightNonce = _init.originated ? _init.nonce : _init.remoteNonce; h256 const& rightNonce = _init.m_originated ? _init.m_nonce : _init.m_remoteNonce;
leftNonce.ref().copyTo(nonceMaterial.ref().cropped(0, h256::size)); leftNonce.ref().copyTo(nonceMaterial.ref().cropped(0, h256::size));
rightNonce.ref().copyTo(nonceMaterial.ref().cropped(h256::size, h256::size)); rightNonce.ref().copyTo(nonceMaterial.ref().cropped(h256::size, h256::size));
auto outRef(keyMaterial.cropped(h256::size, h256::size)); auto outRef(keyMaterial.cropped(h256::size, h256::size));
sha3(nonceMaterial.ref(), outRef); // output h(nonces) sha3(nonceMaterial.ref(), outRef); // output h(nonces)
sha3(keyMaterial, outRef); // output shared-secret sha3(keyMaterial, outRef); // output shared-secret
// token: sha3(outRef, bytesRef(&token)); -> Host (to be saved to disk) // token: sha3(outRef, bytesRef(&token)); -> m_host (to be saved to disk)
// aes-secret = sha3(ecdhe-shared-secret || shared-secret) // aes-secret = sha3(ecdhe-shared-secret || shared-secret)
sha3(keyMaterial, outRef); // output aes-secret sha3(keyMaterial, outRef); // output aes-secret
@ -68,16 +68,16 @@ RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.socket)
// Recipient egress-mac: sha3(mac-secret^initiator-nonce || auth-sent-ack) // Recipient egress-mac: sha3(mac-secret^initiator-nonce || auth-sent-ack)
// ingress-mac: sha3(mac-secret^recipient-nonce || auth-recvd-init) // ingress-mac: sha3(mac-secret^recipient-nonce || auth-recvd-init)
(*(h256*)outRef.data() ^ _init.remoteNonce).ref().copyTo(keyMaterial); (*(h256*)outRef.data() ^ _init.m_remoteNonce).ref().copyTo(keyMaterial);
bytes const& egressCipher = _init.originated ? _init.authCipher : _init.ackCipher; bytes const& egressCipher = _init.m_originated ? _init.m_authCipher : _init.m_ackCipher;
keyMaterialBytes.resize(h256::size + egressCipher.size()); keyMaterialBytes.resize(h256::size + egressCipher.size());
keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size()); keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size());
bytesConstRef(&egressCipher).copyTo(keyMaterial.cropped(h256::size, egressCipher.size())); bytesConstRef(&egressCipher).copyTo(keyMaterial.cropped(h256::size, egressCipher.size()));
m_egressMac.Update(keyMaterial.data(), keyMaterial.size()); m_egressMac.Update(keyMaterial.data(), keyMaterial.size());
// recover mac-secret by re-xoring remoteNonce // recover mac-secret by re-xoring remoteNonce
(*(h256*)keyMaterial.data() ^ _init.remoteNonce ^ _init.nonce).ref().copyTo(keyMaterial); (*(h256*)keyMaterial.data() ^ _init.m_remoteNonce ^ _init.m_nonce).ref().copyTo(keyMaterial);
bytes const& ingressCipher = _init.originated ? _init.ackCipher : _init.authCipher; bytes const& ingressCipher = _init.m_originated ? _init.m_ackCipher : _init.m_authCipher;
keyMaterialBytes.resize(h256::size + ingressCipher.size()); keyMaterialBytes.resize(h256::size + ingressCipher.size());
keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size()); keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size());
bytesConstRef(&ingressCipher).copyTo(keyMaterial.cropped(h256::size, ingressCipher.size())); bytesConstRef(&ingressCipher).copyTo(keyMaterial.cropped(h256::size, ingressCipher.size()));
@ -204,28 +204,27 @@ void RLPXFrameIO::updateMAC(SHA3_256& _mac, h128 const& _seed)
_mac.Update(encDigest.data(), h128::size); _mac.Update(encDigest.data(), h128::size);
} }
void RLPXHandshake::writeAuth() void RLPXHandshake::writeAuth()
{ {
clog(NetConnect) << "p2p.connect.egress sending auth to " << socket->remote_endpoint(); clog(NetConnect) << "p2p.connect.egress sending auth to " << m_socket->remote_endpoint();
auth.resize(Signature::size + h256::size + Public::size + h256::size + 1); m_auth.resize(Signature::size + h256::size + Public::size + h256::size + 1);
bytesRef sig(&auth[0], Signature::size); bytesRef sig(&m_auth[0], Signature::size);
bytesRef hepubk(&auth[Signature::size], h256::size); bytesRef hepubk(&m_auth[Signature::size], h256::size);
bytesRef pubk(&auth[Signature::size + h256::size], Public::size); bytesRef pubk(&m_auth[Signature::size + h256::size], Public::size);
bytesRef nonce(&auth[Signature::size + h256::size + Public::size], h256::size); bytesRef nonce(&m_auth[Signature::size + h256::size + Public::size], h256::size);
// E(remote-pubk, S(ecdhe-random, ecdh-shared-secret^nonce) || H(ecdhe-random-pubk) || pubk || nonce || 0x0) // E(remote-pubk, S(ecdhe-random, ecdh-shared-secret^nonce) || H(ecdhe-random-pubk) || pubk || nonce || 0x0)
Secret staticShared; Secret staticShared;
crypto::ecdh::agree(host->m_alias.sec(), remote, staticShared); crypto::ecdh::agree(m_host->m_alias.sec(), m_remote, staticShared);
sign(ecdhe.seckey(), staticShared ^ this->nonce).ref().copyTo(sig); sign(m_ecdhe.seckey(), staticShared ^ m_nonce).ref().copyTo(sig);
sha3(ecdhe.pubkey().ref(), hepubk); sha3(m_ecdhe.pubkey().ref(), hepubk);
host->m_alias.pub().ref().copyTo(pubk); m_host->m_alias.pub().ref().copyTo(pubk);
this->nonce.ref().copyTo(nonce); m_nonce.ref().copyTo(nonce);
auth[auth.size() - 1] = 0x0; m_auth[m_auth.size() - 1] = 0x0;
encryptECIES(remote, &auth, authCipher); encryptECIES(m_remote, &m_auth, m_authCipher);
auto self(shared_from_this()); auto self(shared_from_this());
ba::async_write(*socket, ba::buffer(authCipher), [this, self](boost::system::error_code ec, std::size_t) ba::async_write(*m_socket, ba::buffer(m_authCipher), [this, self](boost::system::error_code ec, std::size_t)
{ {
transition(ec); transition(ec);
}); });
@ -233,17 +232,17 @@ void RLPXHandshake::writeAuth()
void RLPXHandshake::writeAck() void RLPXHandshake::writeAck()
{ {
clog(NetConnect) << "p2p.connect.ingress sending ack to " << socket->remote_endpoint(); clog(NetConnect) << "p2p.connect.ingress sending ack to " << m_socket->remote_endpoint();
ack.resize(Public::size + h256::size + 1); m_ack.resize(Public::size + h256::size + 1);
bytesRef epubk(&ack[0], Public::size); bytesRef epubk(&m_ack[0], Public::size);
bytesRef nonce(&ack[Public::size], h256::size); bytesRef nonce(&m_ack[Public::size], h256::size);
ecdhe.pubkey().ref().copyTo(epubk); m_ecdhe.pubkey().ref().copyTo(epubk);
this->nonce.ref().copyTo(nonce); m_nonce.ref().copyTo(nonce);
ack[ack.size() - 1] = 0x0; m_ack[m_ack.size() - 1] = 0x0;
encryptECIES(remote, &ack, ackCipher); encryptECIES(m_remote, &m_ack, m_ackCipher);
auto self(shared_from_this()); auto self(shared_from_this());
ba::async_write(*socket, ba::buffer(ackCipher), [this, self](boost::system::error_code ec, std::size_t) ba::async_write(*m_socket, ba::buffer(m_ackCipher), [this, self](boost::system::error_code ec, std::size_t)
{ {
transition(ec); transition(ec);
}); });
@ -251,32 +250,32 @@ void RLPXHandshake::writeAck()
void RLPXHandshake::readAuth() void RLPXHandshake::readAuth()
{ {
clog(NetConnect) << "p2p.connect.ingress recving auth from " << socket->remote_endpoint(); clog(NetConnect) << "p2p.connect.ingress recving auth from " << m_socket->remote_endpoint();
authCipher.resize(307); m_authCipher.resize(307);
auto self(shared_from_this()); auto self(shared_from_this());
ba::async_read(*socket, ba::buffer(authCipher, 307), [this, self](boost::system::error_code ec, std::size_t) ba::async_read(*m_socket, ba::buffer(m_authCipher, 307), [this, self](boost::system::error_code ec, std::size_t)
{ {
if (ec) if (ec)
transition(ec); transition(ec);
else if (decryptECIES(host->m_alias.sec(), bytesConstRef(&authCipher), auth)) else if (decryptECIES(m_host->m_alias.sec(), bytesConstRef(&m_authCipher), m_auth))
{ {
bytesConstRef sig(&auth[0], Signature::size); bytesConstRef sig(&m_auth[0], Signature::size);
bytesConstRef hepubk(&auth[Signature::size], h256::size); bytesConstRef hepubk(&m_auth[Signature::size], h256::size);
bytesConstRef pubk(&auth[Signature::size + h256::size], Public::size); bytesConstRef pubk(&m_auth[Signature::size + h256::size], Public::size);
bytesConstRef nonce(&auth[Signature::size + h256::size + Public::size], h256::size); bytesConstRef nonce(&m_auth[Signature::size + h256::size + Public::size], h256::size);
pubk.copyTo(remote.ref()); pubk.copyTo(m_remote.ref());
nonce.copyTo(remoteNonce.ref()); nonce.copyTo(m_remoteNonce.ref());
Secret sharedSecret; Secret sharedSecret;
crypto::ecdh::agree(host->m_alias.sec(), remote, sharedSecret); crypto::ecdh::agree(m_host->m_alias.sec(), m_remote, sharedSecret);
remoteEphemeral = recover(*(Signature*)sig.data(), sharedSecret ^ remoteNonce); m_remoteEphemeral = recover(*(Signature*)sig.data(), sharedSecret ^ m_remoteNonce);
assert(sha3(remoteEphemeral) == *(h256*)hepubk.data()); assert(sha3(m_remoteEphemeral) == *(h256*)hepubk.data());
transition(); transition();
} }
else else
{ {
clog(NetWarn) << "p2p.connect.egress recving auth decrypt failed for" << socket->remote_endpoint(); clog(NetWarn) << "p2p.connect.egress recving auth decrypt failed for" << m_socket->remote_endpoint();
nextState = Error; m_nextState = Error;
transition(); transition();
} }
}); });
@ -284,23 +283,23 @@ void RLPXHandshake::readAuth()
void RLPXHandshake::readAck() void RLPXHandshake::readAck()
{ {
clog(NetConnect) << "p2p.connect.egress recving ack from " << socket->remote_endpoint(); clog(NetConnect) << "p2p.connect.egress recving ack from " << m_socket->remote_endpoint();
ackCipher.resize(210); m_ackCipher.resize(210);
auto self(shared_from_this()); auto self(shared_from_this());
ba::async_read(*socket, ba::buffer(ackCipher, 210), [this, self](boost::system::error_code ec, std::size_t) ba::async_read(*m_socket, ba::buffer(m_ackCipher, 210), [this, self](boost::system::error_code ec, std::size_t)
{ {
if (ec) if (ec)
transition(ec); transition(ec);
else if (decryptECIES(host->m_alias.sec(), bytesConstRef(&ackCipher), ack)) else if (decryptECIES(m_host->m_alias.sec(), bytesConstRef(&m_ackCipher), m_ack))
{ {
bytesConstRef(&ack).cropped(0, Public::size).copyTo(remoteEphemeral.ref()); bytesConstRef(&m_ack).cropped(0, Public::size).copyTo(m_remoteEphemeral.ref());
bytesConstRef(&ack).cropped(Public::size, h256::size).copyTo(remoteNonce.ref()); bytesConstRef(&m_ack).cropped(Public::size, h256::size).copyTo(m_remoteNonce.ref());
transition(); transition();
} }
else else
{ {
clog(NetWarn) << "p2p.connect.egress recving ack decrypt failed for " << socket->remote_endpoint(); clog(NetWarn) << "p2p.connect.egress recving ack decrypt failed for " << m_socket->remote_endpoint();
nextState = Error; m_nextState = Error;
transition(); transition();
} }
}); });
@ -308,95 +307,95 @@ void RLPXHandshake::readAck()
void RLPXHandshake::error() void RLPXHandshake::error()
{ {
clog(NetConnect) << "Disconnecting " << socket->remote_endpoint() << " (Handshake Failed)"; clog(NetConnect) << "Disconnecting " << m_socket->remote_endpoint() << " (Handshake Failed)";
boost::system::error_code ec; boost::system::error_code ec;
socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
if (socket->is_open()) if (m_socket->is_open())
socket->close(); m_socket->close();
} }
void RLPXHandshake::transition(boost::system::error_code _ech) void RLPXHandshake::transition(boost::system::error_code _ech)
{ {
if (_ech || nextState == Error) if (_ech || m_nextState == Error)
return error(); return error();
auto self(shared_from_this()); auto self(shared_from_this());
if (nextState == New) if (m_nextState == New)
{ {
nextState = AckAuth; m_nextState = AckAuth;
if (originated) if (m_originated)
writeAuth(); writeAuth();
else else
readAuth(); readAuth();
} }
else if (nextState == AckAuth) else if (m_nextState == AckAuth)
{ {
nextState = WriteHello; m_nextState = WriteHello;
if (originated) if (m_originated)
readAck(); readAck();
else else
writeAck(); writeAck();
} }
else if (nextState == WriteHello) else if (m_nextState == WriteHello)
{ {
nextState = ReadHello; m_nextState = ReadHello;
if (originated) if (m_originated)
clog(NetConnect) << "p2p.connect.egress sending capabilities handshake"; clog(NetConnect) << "p2p.connect.egress sending capabilities handshake";
else else
clog(NetConnect) << "p2p.connect.ingress sending capabilities handshake"; clog(NetConnect) << "p2p.connect.ingress sending capabilities handshake";
io.reset(new RLPXFrameIO(*this)); m_io.reset(new RLPXFrameIO(*this));
// old packet format // old packet format
// 5 arguments, HelloPacket // 5 arguments, HelloPacket
RLPStream s; RLPStream s;
s.appendList(5 + 1).append((unsigned)0) s.appendList(5 + 1).append((unsigned)0)
<< host->protocolVersion() << m_host->protocolVersion()
<< host->m_clientVersion << m_host->m_clientVersion
<< host->caps() << m_host->caps()
<< host->m_tcpPublic.port() << m_host->m_tcpPublic.port()
<< host->id(); << m_host->id();
bytes packet; bytes packet;
s.swapOut(packet); s.swapOut(packet);
io->writeSingleFramePacket(&packet, handshakeOutBuffer); m_io->writeSingleFramePacket(&packet, m_handshakeOutBuffer);
ba::async_write(*socket, ba::buffer(handshakeOutBuffer), [this, self](boost::system::error_code ec, std::size_t) ba::async_write(*m_socket, ba::buffer(m_handshakeOutBuffer), [this, self](boost::system::error_code ec, std::size_t)
{ {
transition(ec); transition(ec);
}); });
} }
else if (nextState == ReadHello) else if (m_nextState == ReadHello)
{ {
// Authenticate and decrypt initial hello frame with initial RLPXFrameIO // Authenticate and decrypt initial hello frame with initial RLPXFrameIO
// and request host to start session. // and request m_host to start session.
nextState = StartSession; m_nextState = StartSession;
// read frame header // read frame header
handshakeInBuffer.resize(h256::size); m_handshakeInBuffer.resize(h256::size);
ba::async_read(*socket, boost::asio::buffer(handshakeInBuffer, h256::size), [this,self](boost::system::error_code ec, std::size_t length) ba::async_read(*m_socket, boost::asio::buffer(m_handshakeInBuffer, h256::size), [this,self](boost::system::error_code ec, std::size_t length)
{ {
if (ec) if (ec)
transition(ec); transition(ec);
else else
{ {
/// authenticate and decrypt header /// authenticate and decrypt header
if (!io->authAndDecryptHeader(*(h256*)handshakeInBuffer.data())) if (!m_io->authAndDecryptHeader(*(h256*)m_handshakeInBuffer.data()))
{ {
nextState = Error; m_nextState = Error;
transition(); transition();
return; return;
} }
clog(NetNote) << (originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "recvd hello header"; clog(NetNote) << (m_originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "recvd hello header";
/// check frame size /// check frame size
bytes& header = handshakeInBuffer; bytes& header = m_handshakeInBuffer;
uint32_t frameSize = (uint32_t)(header[2]) | (uint32_t)(header[1])<<8 | (uint32_t)(header[0])<<16; uint32_t frameSize = (uint32_t)(header[2]) | (uint32_t)(header[1])<<8 | (uint32_t)(header[0])<<16;
if (frameSize > 1024) if (frameSize > 1024)
{ {
// all future frames: 16777216 // all future frames: 16777216
clog(NetWarn) << (originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame is too large"; clog(NetWarn) << (m_originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame is too large";
nextState = Error; m_nextState = Error;
transition(); transition();
return; return;
} }
@ -406,33 +405,33 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
bytesConstRef(&header).cropped(3).copyTo(&headerRLP); bytesConstRef(&header).cropped(3).copyTo(&headerRLP);
/// read padded frame and mac /// read padded frame and mac
handshakeInBuffer.resize(frameSize + ((16 - (frameSize % 16)) % 16) + h128::size); m_handshakeInBuffer.resize(frameSize + ((16 - (frameSize % 16)) % 16) + h128::size);
ba::async_read(*socket, boost::asio::buffer(handshakeInBuffer, handshakeInBuffer.size()), [this, self, headerRLP](boost::system::error_code ec, std::size_t length) ba::async_read(*m_socket, boost::asio::buffer(m_handshakeInBuffer, m_handshakeInBuffer.size()), [this, self, headerRLP](boost::system::error_code ec, std::size_t length)
{ {
if (ec) if (ec)
transition(ec); transition(ec);
else else
{ {
if (!io->authAndDecryptFrame(bytesRef(&handshakeInBuffer))) if (!m_io->authAndDecryptFrame(bytesRef(&m_handshakeInBuffer)))
{ {
clog(NetWarn) << (originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame: decrypt failed"; clog(NetWarn) << (m_originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame: decrypt failed";
nextState = Error; m_nextState = Error;
transition(); transition();
return; return;
} }
RLP rlp(handshakeInBuffer); RLP rlp(m_handshakeInBuffer);
auto packetType = (PacketType)rlp[0].toInt<unsigned>(); auto packetType = (PacketType)rlp[0].toInt<unsigned>();
if (packetType != 0) if (packetType != 0)
{ {
clog(NetWarn) << (originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame: invalid packet type"; clog(NetWarn) << (m_originated ? "p2p.connect.egress" : "p2p.connect.ingress") << "hello frame: invalid packet type";
nextState = Error; m_nextState = Error;
transition(); transition();
return; return;
} }
// todo: memory management of RLPFrameIO // todo: memory management of RLPFrameIO
host->startPeerSession(remote, rlp, socket); m_host->startPeerSession(m_remote, rlp, m_socket, m_io.get());
} }
}); });
} }

41
libp2p/RLPxHandshake.h

@ -35,10 +35,12 @@ namespace dev
namespace p2p namespace p2p
{ {
class Session;
class RLPXHandshake; class RLPXHandshake;
class RLPXFrameIO class RLPXFrameIO
{ {
friend class Session;
public: public:
RLPXFrameIO(RLPXHandshake const& _init); RLPXFrameIO(RLPXHandshake const& _init);
@ -74,7 +76,6 @@ private:
bi::tcp::socket* m_socket; bi::tcp::socket* m_socket;
}; };
// TODO: change properties to m_
class RLPXHandshake: public std::enable_shared_from_this<RLPXHandshake> class RLPXHandshake: public std::enable_shared_from_this<RLPXHandshake>
{ {
public: public:
@ -91,12 +92,12 @@ public:
}; };
/// Handshake for ingress connection. Takes ownership of socket. /// Handshake for ingress connection. Takes ownership of socket.
RLPXHandshake(Host* _host, bi::tcp::socket* _socket): host(_host), socket(std::move(_socket)), originated(false) { crypto::Nonce::get().ref().copyTo(nonce.ref()); } RLPXHandshake(Host* _host, bi::tcp::socket* _socket): m_host(_host), m_socket(std::move(_socket)), m_originated(false) { crypto::Nonce::get().ref().copyTo(m_nonce.ref()); }
/// Handshake for egress connection to _remote. Takes ownership of socket. /// Handshake for egress connection to _remote. Takes ownership of socket.
RLPXHandshake(Host* _host, bi::tcp::socket* _socket, NodeId _remote): host(_host), remote(_remote), socket(std::move(_socket)), originated(true) { crypto::Nonce::get().ref().copyTo(nonce.ref()); } RLPXHandshake(Host* _host, bi::tcp::socket* _socket, NodeId _remote): m_host(_host), m_remote(_remote), m_socket(std::move(_socket)), m_originated(true) { crypto::Nonce::get().ref().copyTo(m_nonce.ref()); }
~RLPXHandshake() { delete socket; } ~RLPXHandshake() { delete m_socket; }
void start() { transition(); } void start() { transition(); }
@ -111,32 +112,32 @@ protected:
void transition(boost::system::error_code _ech = boost::system::error_code()); void transition(boost::system::error_code _ech = boost::system::error_code());
/// Current state of handshake. /// Current state of handshake.
State nextState = New; State m_nextState = New;
Host* host; Host* m_host;
/// Node id of remote host for socket. /// Node id of remote host for socket.
NodeId remote; NodeId m_remote;
bi::tcp::socket* socket; bi::tcp::socket* m_socket;
bool originated = false; bool m_originated = false;
/// Buffers for encoded and decoded handshake phases /// Buffers for encoded and decoded handshake phases
bytes auth; bytes m_auth;
bytes authCipher; bytes m_authCipher;
bytes ack; bytes m_ack;
bytes ackCipher; bytes m_ackCipher;
bytes handshakeOutBuffer; bytes m_handshakeOutBuffer;
bytes handshakeInBuffer; bytes m_handshakeInBuffer;
crypto::ECDHE ecdhe; crypto::ECDHE m_ecdhe;
h256 nonce; h256 m_nonce;
Public remoteEphemeral; Public m_remoteEphemeral;
h256 remoteNonce; h256 m_remoteNonce;
/// Frame IO is used to read frame for last step of handshake authentication. /// Frame IO is used to read frame for last step of handshake authentication.
std::unique_ptr<RLPXFrameIO> io; std::unique_ptr<RLPXFrameIO> m_io;
}; };
} }

6
libp2p/Session.cpp

@ -37,9 +37,11 @@ using namespace dev::p2p;
#endif #endif
#define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " #define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info): Session::Session(Host* _s, RLPXFrameIO _io, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info):
m_server(_s), m_server(_s),
m_socket(move(_socket)), #warning fixme
m_socket(move(*_io.m_socket)),
m_io(move(_io)),
m_peer(_n), m_peer(_n),
m_info(_info), m_info(_info),
m_ping(chrono::steady_clock::time_point::max()) m_ping(chrono::steady_clock::time_point::max())

6
libp2p/Session.h

@ -32,6 +32,7 @@
#include <libdevcore/RLP.h> #include <libdevcore/RLP.h>
#include <libdevcore/RangeMask.h> #include <libdevcore/RangeMask.h>
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include "RLPxHandshake.h"
#include "Common.h" #include "Common.h"
namespace dev namespace dev
@ -52,7 +53,7 @@ class Session: public std::enable_shared_from_this<Session>
friend class HostCapabilityFace; friend class HostCapabilityFace;
public: public:
Session(Host* _server, bi::tcp::socket _socket, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info); Session(Host* _server, RLPXFrameIO _io, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info);
virtual ~Session(); virtual ~Session();
void start(); void start();
@ -63,7 +64,7 @@ public:
bool isConnected() const { return m_socket.is_open(); } bool isConnected() const { return m_socket.is_open(); }
NodeId id() const; NodeId id() const;
unsigned socketId() const { return m_socket.native_handle(); } unsigned socketId() const { return m_info.socket; }
template <class PeerCap> template <class PeerCap>
std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } }
@ -104,6 +105,7 @@ private:
Host* m_server; ///< The host that owns us. Never null. Host* m_server; ///< The host that owns us. Never null.
mutable bi::tcp::socket m_socket; ///< Socket for the peer's connection. Mutable to ask for native_handle(). mutable bi::tcp::socket m_socket; ///< Socket for the peer's connection. Mutable to ask for native_handle().
RLPXFrameIO m_io; ///< Transport over which packets are sent.
Mutex x_writeQueue; ///< Mutex for the write queue. Mutex x_writeQueue; ///< Mutex for the write queue.
std::deque<bytes> m_writeQueue; ///< The write queue. std::deque<bytes> m_writeQueue; ///< The write queue.
std::array<byte, 65536> m_data; ///< Buffer for ingress packet data. std::array<byte, 65536> m_data; ///< Buffer for ingress packet data.

Loading…
Cancel
Save