Browse Source

Prep for cleanup, subprotocol packet queues, and framing. Disown rlpxsocket from handshake and rename FrameIO to FrameCoder.

cl-refactor
subtly 10 years ago
parent
commit
057311778c
  1. 6
      libp2p/Host.cpp
  2. 5
      libp2p/Host.h
  3. 30
      libp2p/RLPXFrameCoder.cpp
  4. 49
      libp2p/RLPXFrameCoder.h
  5. 0
      libp2p/RLPXSocket.cpp
  6. 56
      libp2p/RLPXSocket.h
  7. 6
      libp2p/RLPxHandshake.cpp
  8. 9
      libp2p/RLPxHandshake.h
  9. 35
      libp2p/Session.cpp
  10. 11
      libp2p/Session.h

6
libp2p/Host.cpp

@ -172,7 +172,7 @@ void Host::doneWorking()
m_sessions.clear();
}
void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint)
void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder* _io, std::shared_ptr<RLPXSocket> const& _s)
{
// session maybe ingress or egress so m_peers and node table entries may not exist
shared_ptr<Peer> p;
@ -192,7 +192,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
}
if (p->isOffline())
p->m_lastConnected = std::chrono::system_clock::now();
p->endpoint.address = _endpoint.address();
p->endpoint.address = _s->remoteEndpoint().address();
auto protocolVersion = _rlp[0].toInt<unsigned>();
auto clientVersion = _rlp[1].toString();
@ -211,7 +211,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort;
// create session so disconnects are managed
auto ps = make_shared<Session>(this, _io, p, PeerSessionInfo({_id, clientVersion, _endpoint.address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>()}));
auto ps = make_shared<Session>(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>()}));
if (protocolVersion < dev::p2p::c_protocolVersion - 1)
{
ps->disconnect(IncompatibleProtocol);

5
libp2p/Host.h

@ -40,7 +40,8 @@
#include "HostCapability.h"
#include "Network.h"
#include "Peer.h"
#include "RLPxFrameIO.h"
#include "RLPXSocket.h"
#include "RLPXFrameCoder.h"
#include "Common.h"
namespace ba = boost::asio;
namespace bi = ba::ip;
@ -158,7 +159,7 @@ public:
NodeId id() const { return m_alias.pub(); }
/// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error.
void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint);
void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameCoder* _io, std::shared_ptr<RLPXSocket> const& _s);
protected:
void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e);

30
libp2p/RLPxFrameIO.cpp → libp2p/RLPXFrameCoder.cpp

@ -14,16 +14,14 @@
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file RLPXFrameIO.cpp
/** @file RLPXFrameCoder.cpp
* @author Alex Leverington <nessence@gmail.com>
* @date 2015
*/
#include "RLPxFrameIO.h"
#include "RLPXFrameCoder.h"
#include <libdevcore/Assertions.h>
#include "Host.h"
#include "Session.h"
#include "Peer.h"
#include "RLPxHandshake.h"
using namespace std;
@ -31,7 +29,7 @@ using namespace dev;
using namespace dev::p2p;
using namespace CryptoPP;
RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.m_socket)
RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init)
{
// we need:
// originated?
@ -94,7 +92,7 @@ RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.m_socket)
m_ingressMac.Update(keyMaterial.data(), keyMaterial.size());
}
void RLPXFrameIO::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes)
void RLPXFrameCoder::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes)
{
// _packet = type || rlpList()
@ -126,7 +124,7 @@ void RLPXFrameIO::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes)
egressDigest().ref().copyTo(macRef);
}
bool RLPXFrameIO::authAndDecryptHeader(bytesRef io)
bool RLPXFrameCoder::authAndDecryptHeader(bytesRef io)
{
asserts(io.size() == h256::size);
updateIngressMACWithHeader(io);
@ -138,7 +136,7 @@ bool RLPXFrameIO::authAndDecryptHeader(bytesRef io)
return true;
}
bool RLPXFrameIO::authAndDecryptFrame(bytesRef io)
bool RLPXFrameCoder::authAndDecryptFrame(bytesRef io)
{
bytesRef cipherText(io.cropped(0, io.size() - h128::size));
updateIngressMACWithFrame(cipherText);
@ -149,7 +147,7 @@ bool RLPXFrameIO::authAndDecryptFrame(bytesRef io)
return true;
}
h128 RLPXFrameIO::egressDigest()
h128 RLPXFrameCoder::egressDigest()
{
SHA3_256 h(m_egressMac);
h128 digest;
@ -157,7 +155,7 @@ h128 RLPXFrameIO::egressDigest()
return digest;
}
h128 RLPXFrameIO::ingressDigest()
h128 RLPXFrameCoder::ingressDigest()
{
SHA3_256 h(m_ingressMac);
h128 digest;
@ -165,29 +163,29 @@ h128 RLPXFrameIO::ingressDigest()
return digest;
}
void RLPXFrameIO::updateEgressMACWithHeader(bytesConstRef _headerCipher)
void RLPXFrameCoder::updateEgressMACWithHeader(bytesConstRef _headerCipher)
{
updateMAC(m_egressMac, _headerCipher.cropped(0, 16));
}
void RLPXFrameIO::updateEgressMACWithFrame(bytesConstRef _cipher)
void RLPXFrameCoder::updateEgressMACWithFrame(bytesConstRef _cipher)
{
m_egressMac.Update(_cipher.data(), _cipher.size());
updateMAC(m_egressMac);
}
void RLPXFrameIO::updateIngressMACWithHeader(bytesConstRef _headerCipher)
void RLPXFrameCoder::updateIngressMACWithHeader(bytesConstRef _headerCipher)
{
updateMAC(m_ingressMac, _headerCipher.cropped(0, 16));
}
void RLPXFrameIO::updateIngressMACWithFrame(bytesConstRef _cipher)
void RLPXFrameCoder::updateIngressMACWithFrame(bytesConstRef _cipher)
{
m_ingressMac.Update(_cipher.data(), _cipher.size());
updateMAC(m_ingressMac);
}
void RLPXFrameIO::updateMAC(SHA3_256& _mac, bytesConstRef _seed)
void RLPXFrameCoder::updateMAC(SHA3_256& _mac, bytesConstRef _seed)
{
if (_seed.size() && _seed.size() != h128::size)
asserts(false);

49
libp2p/RLPxFrameIO.h → libp2p/RLPXFrameCoder.h

@ -14,7 +14,7 @@
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file RLPXFrameIO.h
/** @file RLPXFrameCoder.h
* @author Alex Leverington <nessence@gmail.com>
* @date 2015
*/
@ -23,13 +23,10 @@
#pragma once
#include <memory>
#include <libdevcrypto/Common.h>
#include <libdevcore/Guards.h>
#include <libdevcrypto/ECDHE.h>
#include <libdevcrypto/CryptoPP.h>
#include <libdevcore/Guards.h>
#include "Common.h"
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
namespace dev
{
@ -39,45 +36,21 @@ namespace p2p
class RLPXHandshake;
/**
* @brief Encoder/decoder transport for RLPx connections established by RLPXHandshake.
* Managed (via shared_ptr) socket for use by RLPXHandshake and RLPXFrameIO.
*
* Thread Safety
* Distinct Objects: Safe.
* Shared objects: Unsafe.
* * an instance method must not be called concurrently
* * a writeSingleFramePacket can be called concurrent to authAndDecryptHeader OR authAndDecryptFrame
*/
class RLPXSocket: public std::enable_shared_from_this<RLPXSocket>
{
public:
RLPXSocket(bi::tcp::socket* _socket): m_socket(std::move(*_socket)) {}
~RLPXSocket() { close(); }
bool isConnected() const { return m_socket.is_open(); }
void close() { try { boost::system::error_code ec; m_socket.shutdown(bi::tcp::socket::shutdown_both, ec); if (m_socket.is_open()) m_socket.close(); } catch (...){} }
bi::tcp::endpoint remoteEndpoint() { try { return m_socket.remote_endpoint(); } catch (...){ return bi::tcp::endpoint(); } }
bi::tcp::socket& ref() { return m_socket; }
protected:
bi::tcp::socket m_socket;
};
/**
* @brief Encoder/decoder transport for RLPx connections established by RLPXHandshake.
* @brief Encoder/decoder transport for RLPx connection established by RLPXHandshake.
*
* Thread Safety
* Distinct Objects: Safe.
* Shared objects: Unsafe.
*/
class RLPXFrameIO
class RLPXFrameCoder
{
friend class RLPXFrameIOMux;
friend class Session;
public:
/// Constructor.
/// Requires instance of RLPXHandshake which has completed first two phases of handshake.
RLPXFrameIO(RLPXHandshake const& _init);
~RLPXFrameIO() {}
RLPXFrameCoder(RLPXHandshake const& _init);
~RLPXFrameCoder() {}
/// Encrypt _packet as RLPx frame.
void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes);
@ -93,7 +66,7 @@ public:
/// Return first 16 bytes of current digest from ingress mac.
h128 ingressDigest();
protected:
/// Update state of egress MAC with frame header.
void updateEgressMACWithHeader(bytesConstRef _headerCipher);
@ -106,9 +79,7 @@ protected:
/// Update state of ingress MAC with frame.
void updateIngressMACWithFrame(bytesConstRef _cipher);
bi::tcp::socket& socket() { return m_socket->ref(); }
private:
/// Update state of _mac.
void updateMAC(CryptoPP::SHA3_256& _mac, bytesConstRef _seed = bytesConstRef());
@ -125,8 +96,6 @@ private:
CryptoPP::SHA3_256 m_egressMac; ///< State of MAC for egress ciphertext.
CryptoPP::SHA3_256 m_ingressMac; ///< State of MAC for ingress ciphertext.
std::shared_ptr<RLPXSocket> m_socket;
};
}

0
libp2p/RLPXSocket.cpp

56
libp2p/RLPXSocket.h

@ -0,0 +1,56 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file RLPXSocket.h
* @author Alex Leverington <nessence@gmail.com>
* @date 2015
*/
#pragma once
#include "Common.h"
namespace dev
{
namespace p2p
{
/**
* @brief Shared pointer wrapper for ASIO TCP socket.
*
* Thread Safety
* Distinct Objects: Safe.
* Shared objects: Unsafe.
* * an instance method must not be called concurrently
*/
class RLPXSocket: public std::enable_shared_from_this<RLPXSocket>
{
public:
/// Constructor. Dereferences and takes ownership of _socket.
RLPXSocket(bi::tcp::socket* _socket): m_socket(std::move(*_socket)) {}
~RLPXSocket() { close(); }
bool isConnected() const { return m_socket.is_open(); }
void close() { try { boost::system::error_code ec; m_socket.shutdown(bi::tcp::socket::shutdown_both, ec); if (m_socket.is_open()) m_socket.close(); } catch (...){} }
bi::tcp::endpoint remoteEndpoint() { try { return m_socket.remote_endpoint(); } catch (...){ return bi::tcp::endpoint(); } }
bi::tcp::socket& ref() { return m_socket; }
protected:
bi::tcp::socket m_socket;
};
}
}

6
libp2p/RLPxHandshake.cpp

@ -179,7 +179,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
/// This pointer will be freed if there is an error otherwise
/// it will be passed to Host which will take ownership.
m_io = new RLPXFrameIO(*this);
m_io = new RLPXFrameCoder(*this);
// old packet format
// 5 arguments, HelloPacket
@ -200,7 +200,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
}
else if (m_nextState == ReadHello)
{
// Authenticate and decrypt initial hello frame with initial RLPXFrameIO
// Authenticate and decrypt initial hello frame with initial RLPXFrameCoder
// and request m_host to start session.
m_nextState = StartSession;
@ -269,7 +269,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech)
try
{
RLP rlp(frame.cropped(1), RLP::ThrowOnFail | RLP::FailIfTooSmall);
m_host->startPeerSession(m_remote, rlp, m_io, m_socket->remoteEndpoint());
m_host->startPeerSession(m_remote, rlp, m_io, m_socket);
}
catch (std::exception const& _e)
{

9
libp2p/RLPxHandshake.h

@ -25,7 +25,8 @@
#include <memory>
#include <libdevcrypto/Common.h>
#include <libdevcrypto/ECDHE.h>
#include "RLPxFrameIO.h"
#include "RLPXSocket.h"
#include "RLPXFrameCoder.h"
#include "Common.h"
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
@ -36,7 +37,7 @@ namespace p2p
{
/**
* @brief Setup inbound or outbound connection for communication over RLPXFrameIO.
* @brief Setup inbound or outbound connection for communication over RLPXFrameCoder.
* RLPx Spec: https://github.com/ethereum/devp2p/blob/master/rlpx.md#encrypted-handshake
*
* @todo Implement StartSession transition via lambda which is passed to constructor.
@ -47,7 +48,7 @@ namespace p2p
*/
class RLPXHandshake: public std::enable_shared_from_this<RLPXHandshake>
{
friend class RLPXFrameIO;
friend class RLPXFrameCoder;
/// Sequential states of handshake
enum State
@ -122,7 +123,7 @@ protected:
/// Used to read and write RLPx encrypted frames for last step of handshake authentication.
/// Passed onto Host which will take ownership.
RLPXFrameIO* m_io = nullptr;
RLPXFrameCoder* m_io = nullptr;
std::shared_ptr<RLPXSocket> m_socket; ///< Socket.
boost::asio::deadline_timer m_idleTimer; ///< Timer which enforces c_timeout.

35
libp2p/Session.cpp

@ -27,23 +27,24 @@
#include <libdevcore/CommonIO.h>
#include <libdevcore/StructuredLogger.h>
#include <libethcore/Exceptions.h>
#include "RLPxHandshake.h"
#include "Host.h"
#include "Capability.h"
using namespace std;
using namespace dev;
using namespace dev::p2p;
Session::Session(Host* _s, RLPXFrameIO* _io, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info):
m_server(_s),
Session::Session(Host* _h, RLPXFrameCoder* _io, std::shared_ptr<RLPXSocket> const& _s, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info):
m_server(_h),
m_io(_io),
m_socket(m_io->socket()),
m_socket(_s),
m_peer(_n),
m_info(_info),
m_ping(chrono::steady_clock::time_point::max())
{
m_peer->m_lastDisconnect = NoDisconnect;
m_lastReceived = m_connect = chrono::steady_clock::now();
m_info.socketId = _io->socket().native_handle();
m_info.socketId = m_socket->ref().native_handle();
}
Session::~Session()
@ -59,11 +60,12 @@ Session::~Session()
try
{
if (m_socket.is_open())
bi::tcp::socket& socket = m_socket->ref();
if (socket.is_open())
{
boost::system::error_code ec;
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
m_socket.close();
socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket.close();
}
}
catch (...){}
@ -303,7 +305,7 @@ void Session::send(bytes&& _msg)
if (!checkPacket(msg))
clog(NetWarn) << "INVALID PACKET CONSTRUCTED!";
if (!m_socket.is_open())
if (!m_socket->ref().is_open())
return;
bool doWrite = false;
@ -326,7 +328,7 @@ void Session::write()
out = &m_writeQueue[0];
}
auto self(shared_from_this());
ba::async_write(m_socket, ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/)
ba::async_write(m_socket->ref(), ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/)
{
ThreadContext tc(info().id.abridged());
ThreadContext tc2(info().clientVersion);
@ -352,13 +354,14 @@ void Session::drop(DisconnectReason _reason)
{
if (m_dropped)
return;
if (m_socket.is_open())
bi::tcp::socket& socket = m_socket->ref();
if (socket.is_open())
try
{
clog(NetConnect) << "Closing " << m_socket.remote_endpoint() << "(" << reasonOf(_reason) << ")";
clog(NetConnect) << "Closing " << socket.remote_endpoint() << "(" << reasonOf(_reason) << ")";
boost::system::error_code ec;
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
m_socket.close();
socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket.close();
}
catch (...) {}
@ -379,7 +382,7 @@ void Session::disconnect(DisconnectReason _reason)
m_peer->endpoint, // TODO: may not be 100% accurate
m_server->peerCount()
);
if (m_socket.is_open())
if (m_socket->ref().is_open())
{
RLPStream s;
prep(s, DisconnectPacket, 1) << (int)_reason;
@ -401,7 +404,7 @@ void Session::doRead()
return;
auto self(shared_from_this());
ba::async_read(m_socket, boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length)
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length)
{
ThreadContext tc(info().id.abridged());
ThreadContext tc2(info().clientVersion);
@ -438,7 +441,7 @@ void Session::doRead()
/// read padded frame and mac
auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size;
ba::async_read(m_socket, boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length)
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length)
{
ThreadContext tc(info().id.abridged());
ThreadContext tc2(info().clientVersion);

11
libp2p/Session.h

@ -33,7 +33,8 @@
#include <libdevcore/RLP.h>
#include <libdevcore/RangeMask.h>
#include <libdevcore/Guards.h>
#include "RLPxHandshake.h"
#include "RLPXFrameCoder.h"
#include "RLPXSocket.h"
#include "Common.h"
namespace dev
@ -54,7 +55,7 @@ class Session: public std::enable_shared_from_this<Session>
friend class HostCapabilityFace;
public:
Session(Host* _server, RLPXFrameIO* _io, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info);
Session(Host* _server, RLPXFrameCoder* _io, std::shared_ptr<RLPXSocket> const& _s, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info);
virtual ~Session();
void start();
@ -62,7 +63,7 @@ public:
void ping();
bool isConnected() const { return m_socket.is_open(); }
bool isConnected() const { return m_socket->ref().is_open(); }
NodeId id() const;
unsigned socketId() const { return m_info.socketId; }
@ -105,8 +106,8 @@ private:
Host* m_server; ///< The host that owns us. Never null.
RLPXFrameIO* m_io; ///< Transport over which packets are sent.
bi::tcp::socket& m_socket; ///< Socket for the peer's connection.
RLPXFrameCoder* m_io; ///< Transport over which packets are sent.
std::shared_ptr<RLPXSocket> m_socket; ///< Socket of peer's connection.
Mutex x_writeQueue; ///< Mutex for the write queue.
std::deque<bytes> m_writeQueue; ///< The write queue.
std::array<byte, 16777216> m_data; ///< Buffer for ingress packet data.

Loading…
Cancel
Save