diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index f47db3c2b..c5e209d70 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -172,7 +172,7 @@ void Host::doneWorking() m_sessions.clear(); } -void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint) +void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder* _io, std::shared_ptr const& _s) { // session maybe ingress or egress so m_peers and node table entries may not exist shared_ptr p; @@ -192,7 +192,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io } if (p->isOffline()) p->m_lastConnected = std::chrono::system_clock::now(); - p->endpoint.address = _endpoint.address(); + p->endpoint.address = _s->remoteEndpoint().address(); auto protocolVersion = _rlp[0].toInt(); auto clientVersion = _rlp[1].toString(); @@ -211,7 +211,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort; // create session so disconnects are managed - auto ps = make_shared(this, _io, p, PeerSessionInfo({_id, clientVersion, _endpoint.address().to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet(), 0, map()})); + auto ps = make_shared(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet(), 0, map()})); if (protocolVersion < dev::p2p::c_protocolVersion - 1) { ps->disconnect(IncompatibleProtocol); diff --git a/libp2p/Host.h b/libp2p/Host.h index 3c7ce257a..95b77d5c9 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -40,7 +40,8 @@ #include "HostCapability.h" #include "Network.h" #include "Peer.h" -#include "RLPxFrameIO.h" +#include "RLPXSocket.h" +#include "RLPXFrameCoder.h" #include "Common.h" namespace ba = boost::asio; namespace bi = ba::ip; @@ -158,7 +159,7 @@ public: NodeId id() const { return m_alias.pub(); } /// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error. - void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameIO* _io, bi::tcp::endpoint _endpoint); + void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameCoder* _io, std::shared_ptr const& _s); protected: void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e); diff --git a/libp2p/RLPxFrameIO.cpp b/libp2p/RLPXFrameCoder.cpp similarity index 89% rename from libp2p/RLPxFrameIO.cpp rename to libp2p/RLPXFrameCoder.cpp index 4fa8557ba..c4bb46814 100644 --- a/libp2p/RLPxFrameIO.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -14,16 +14,14 @@ You should have received a copy of the GNU General Public License along with cpp-ethereum. If not, see . */ -/** @file RLPXFrameIO.cpp +/** @file RLPXFrameCoder.cpp * @author Alex Leverington * @date 2015 */ -#include "RLPxFrameIO.h" +#include "RLPXFrameCoder.h" + #include -#include "Host.h" -#include "Session.h" -#include "Peer.h" #include "RLPxHandshake.h" using namespace std; @@ -31,7 +29,7 @@ using namespace dev; using namespace dev::p2p; using namespace CryptoPP; -RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.m_socket) +RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) { // we need: // originated? @@ -94,7 +92,7 @@ RLPXFrameIO::RLPXFrameIO(RLPXHandshake const& _init): m_socket(_init.m_socket) m_ingressMac.Update(keyMaterial.data(), keyMaterial.size()); } -void RLPXFrameIO::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) +void RLPXFrameCoder::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) { // _packet = type || rlpList() @@ -126,7 +124,7 @@ void RLPXFrameIO::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) egressDigest().ref().copyTo(macRef); } -bool RLPXFrameIO::authAndDecryptHeader(bytesRef io) +bool RLPXFrameCoder::authAndDecryptHeader(bytesRef io) { asserts(io.size() == h256::size); updateIngressMACWithHeader(io); @@ -138,7 +136,7 @@ bool RLPXFrameIO::authAndDecryptHeader(bytesRef io) return true; } -bool RLPXFrameIO::authAndDecryptFrame(bytesRef io) +bool RLPXFrameCoder::authAndDecryptFrame(bytesRef io) { bytesRef cipherText(io.cropped(0, io.size() - h128::size)); updateIngressMACWithFrame(cipherText); @@ -149,7 +147,7 @@ bool RLPXFrameIO::authAndDecryptFrame(bytesRef io) return true; } -h128 RLPXFrameIO::egressDigest() +h128 RLPXFrameCoder::egressDigest() { SHA3_256 h(m_egressMac); h128 digest; @@ -157,7 +155,7 @@ h128 RLPXFrameIO::egressDigest() return digest; } -h128 RLPXFrameIO::ingressDigest() +h128 RLPXFrameCoder::ingressDigest() { SHA3_256 h(m_ingressMac); h128 digest; @@ -165,29 +163,29 @@ h128 RLPXFrameIO::ingressDigest() return digest; } -void RLPXFrameIO::updateEgressMACWithHeader(bytesConstRef _headerCipher) +void RLPXFrameCoder::updateEgressMACWithHeader(bytesConstRef _headerCipher) { updateMAC(m_egressMac, _headerCipher.cropped(0, 16)); } -void RLPXFrameIO::updateEgressMACWithFrame(bytesConstRef _cipher) +void RLPXFrameCoder::updateEgressMACWithFrame(bytesConstRef _cipher) { m_egressMac.Update(_cipher.data(), _cipher.size()); updateMAC(m_egressMac); } -void RLPXFrameIO::updateIngressMACWithHeader(bytesConstRef _headerCipher) +void RLPXFrameCoder::updateIngressMACWithHeader(bytesConstRef _headerCipher) { updateMAC(m_ingressMac, _headerCipher.cropped(0, 16)); } -void RLPXFrameIO::updateIngressMACWithFrame(bytesConstRef _cipher) +void RLPXFrameCoder::updateIngressMACWithFrame(bytesConstRef _cipher) { m_ingressMac.Update(_cipher.data(), _cipher.size()); updateMAC(m_ingressMac); } -void RLPXFrameIO::updateMAC(SHA3_256& _mac, bytesConstRef _seed) +void RLPXFrameCoder::updateMAC(SHA3_256& _mac, bytesConstRef _seed) { if (_seed.size() && _seed.size() != h128::size) asserts(false); diff --git a/libp2p/RLPxFrameIO.h b/libp2p/RLPXFrameCoder.h similarity index 67% rename from libp2p/RLPxFrameIO.h rename to libp2p/RLPXFrameCoder.h index 0f0504e48..7c5eedbff 100644 --- a/libp2p/RLPxFrameIO.h +++ b/libp2p/RLPXFrameCoder.h @@ -14,7 +14,7 @@ You should have received a copy of the GNU General Public License along with cpp-ethereum. If not, see . */ -/** @file RLPXFrameIO.h +/** @file RLPXFrameCoder.h * @author Alex Leverington * @date 2015 */ @@ -23,13 +23,10 @@ #pragma once #include -#include +#include #include #include -#include #include "Common.h" -namespace ba = boost::asio; -namespace bi = boost::asio::ip; namespace dev { @@ -39,45 +36,21 @@ namespace p2p class RLPXHandshake; /** - * @brief Encoder/decoder transport for RLPx connections established by RLPXHandshake. - * Managed (via shared_ptr) socket for use by RLPXHandshake and RLPXFrameIO. - * - * Thread Safety - * Distinct Objects: Safe. - * Shared objects: Unsafe. - * * an instance method must not be called concurrently - * * a writeSingleFramePacket can be called concurrent to authAndDecryptHeader OR authAndDecryptFrame - */ -class RLPXSocket: public std::enable_shared_from_this -{ -public: - RLPXSocket(bi::tcp::socket* _socket): m_socket(std::move(*_socket)) {} - ~RLPXSocket() { close(); } - - bool isConnected() const { return m_socket.is_open(); } - void close() { try { boost::system::error_code ec; m_socket.shutdown(bi::tcp::socket::shutdown_both, ec); if (m_socket.is_open()) m_socket.close(); } catch (...){} } - bi::tcp::endpoint remoteEndpoint() { try { return m_socket.remote_endpoint(); } catch (...){ return bi::tcp::endpoint(); } } - bi::tcp::socket& ref() { return m_socket; } - -protected: - bi::tcp::socket m_socket; -}; - -/** - * @brief Encoder/decoder transport for RLPx connections established by RLPXHandshake. + * @brief Encoder/decoder transport for RLPx connection established by RLPXHandshake. * * Thread Safety * Distinct Objects: Safe. * Shared objects: Unsafe. */ -class RLPXFrameIO +class RLPXFrameCoder { + friend class RLPXFrameIOMux; friend class Session; public: /// Constructor. /// Requires instance of RLPXHandshake which has completed first two phases of handshake. - RLPXFrameIO(RLPXHandshake const& _init); - ~RLPXFrameIO() {} + RLPXFrameCoder(RLPXHandshake const& _init); + ~RLPXFrameCoder() {} /// Encrypt _packet as RLPx frame. void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes); @@ -93,7 +66,7 @@ public: /// Return first 16 bytes of current digest from ingress mac. h128 ingressDigest(); - + protected: /// Update state of egress MAC with frame header. void updateEgressMACWithHeader(bytesConstRef _headerCipher); @@ -106,9 +79,7 @@ protected: /// Update state of ingress MAC with frame. void updateIngressMACWithFrame(bytesConstRef _cipher); - - bi::tcp::socket& socket() { return m_socket->ref(); } - + private: /// Update state of _mac. void updateMAC(CryptoPP::SHA3_256& _mac, bytesConstRef _seed = bytesConstRef()); @@ -125,8 +96,6 @@ private: CryptoPP::SHA3_256 m_egressMac; ///< State of MAC for egress ciphertext. CryptoPP::SHA3_256 m_ingressMac; ///< State of MAC for ingress ciphertext. - - std::shared_ptr m_socket; }; } diff --git a/libp2p/RLPXSocket.cpp b/libp2p/RLPXSocket.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/libp2p/RLPXSocket.h b/libp2p/RLPXSocket.h new file mode 100644 index 000000000..389418c76 --- /dev/null +++ b/libp2p/RLPXSocket.h @@ -0,0 +1,56 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXSocket.h + * @author Alex Leverington + * @date 2015 + */ + +#pragma once + +#include "Common.h" + +namespace dev +{ +namespace p2p +{ + +/** + * @brief Shared pointer wrapper for ASIO TCP socket. + * + * Thread Safety + * Distinct Objects: Safe. + * Shared objects: Unsafe. + * * an instance method must not be called concurrently + */ +class RLPXSocket: public std::enable_shared_from_this +{ +public: + /// Constructor. Dereferences and takes ownership of _socket. + RLPXSocket(bi::tcp::socket* _socket): m_socket(std::move(*_socket)) {} + ~RLPXSocket() { close(); } + + bool isConnected() const { return m_socket.is_open(); } + void close() { try { boost::system::error_code ec; m_socket.shutdown(bi::tcp::socket::shutdown_both, ec); if (m_socket.is_open()) m_socket.close(); } catch (...){} } + bi::tcp::endpoint remoteEndpoint() { try { return m_socket.remote_endpoint(); } catch (...){ return bi::tcp::endpoint(); } } + bi::tcp::socket& ref() { return m_socket; } + +protected: + bi::tcp::socket m_socket; +}; + +} +} \ No newline at end of file diff --git a/libp2p/RLPxHandshake.cpp b/libp2p/RLPxHandshake.cpp index 8caf6e4f3..47604eedc 100644 --- a/libp2p/RLPxHandshake.cpp +++ b/libp2p/RLPxHandshake.cpp @@ -179,7 +179,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech) /// This pointer will be freed if there is an error otherwise /// it will be passed to Host which will take ownership. - m_io = new RLPXFrameIO(*this); + m_io = new RLPXFrameCoder(*this); // old packet format // 5 arguments, HelloPacket @@ -200,7 +200,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech) } else if (m_nextState == ReadHello) { - // Authenticate and decrypt initial hello frame with initial RLPXFrameIO + // Authenticate and decrypt initial hello frame with initial RLPXFrameCoder // and request m_host to start session. m_nextState = StartSession; @@ -269,7 +269,7 @@ void RLPXHandshake::transition(boost::system::error_code _ech) try { RLP rlp(frame.cropped(1), RLP::ThrowOnFail | RLP::FailIfTooSmall); - m_host->startPeerSession(m_remote, rlp, m_io, m_socket->remoteEndpoint()); + m_host->startPeerSession(m_remote, rlp, m_io, m_socket); } catch (std::exception const& _e) { diff --git a/libp2p/RLPxHandshake.h b/libp2p/RLPxHandshake.h index 47f6afb57..6ed0819c6 100644 --- a/libp2p/RLPxHandshake.h +++ b/libp2p/RLPxHandshake.h @@ -25,7 +25,8 @@ #include #include #include -#include "RLPxFrameIO.h" +#include "RLPXSocket.h" +#include "RLPXFrameCoder.h" #include "Common.h" namespace ba = boost::asio; namespace bi = boost::asio::ip; @@ -36,7 +37,7 @@ namespace p2p { /** - * @brief Setup inbound or outbound connection for communication over RLPXFrameIO. + * @brief Setup inbound or outbound connection for communication over RLPXFrameCoder. * RLPx Spec: https://github.com/ethereum/devp2p/blob/master/rlpx.md#encrypted-handshake * * @todo Implement StartSession transition via lambda which is passed to constructor. @@ -47,7 +48,7 @@ namespace p2p */ class RLPXHandshake: public std::enable_shared_from_this { - friend class RLPXFrameIO; + friend class RLPXFrameCoder; /// Sequential states of handshake enum State @@ -122,7 +123,7 @@ protected: /// Used to read and write RLPx encrypted frames for last step of handshake authentication. /// Passed onto Host which will take ownership. - RLPXFrameIO* m_io = nullptr; + RLPXFrameCoder* m_io = nullptr; std::shared_ptr m_socket; ///< Socket. boost::asio::deadline_timer m_idleTimer; ///< Timer which enforces c_timeout. diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 462fea7b1..260ce10ac 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -27,23 +27,24 @@ #include #include #include +#include "RLPxHandshake.h" #include "Host.h" #include "Capability.h" using namespace std; using namespace dev; using namespace dev::p2p; -Session::Session(Host* _s, RLPXFrameIO* _io, std::shared_ptr const& _n, PeerSessionInfo _info): - m_server(_s), +Session::Session(Host* _h, RLPXFrameCoder* _io, std::shared_ptr const& _s, std::shared_ptr const& _n, PeerSessionInfo _info): + m_server(_h), m_io(_io), - m_socket(m_io->socket()), + m_socket(_s), m_peer(_n), m_info(_info), m_ping(chrono::steady_clock::time_point::max()) { m_peer->m_lastDisconnect = NoDisconnect; m_lastReceived = m_connect = chrono::steady_clock::now(); - m_info.socketId = _io->socket().native_handle(); + m_info.socketId = m_socket->ref().native_handle(); } Session::~Session() @@ -59,11 +60,12 @@ Session::~Session() try { - if (m_socket.is_open()) + bi::tcp::socket& socket = m_socket->ref(); + if (socket.is_open()) { boost::system::error_code ec; - m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); - m_socket.close(); + socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + socket.close(); } } catch (...){} @@ -303,7 +305,7 @@ void Session::send(bytes&& _msg) if (!checkPacket(msg)) clog(NetWarn) << "INVALID PACKET CONSTRUCTED!"; - if (!m_socket.is_open()) + if (!m_socket->ref().is_open()) return; bool doWrite = false; @@ -326,7 +328,7 @@ void Session::write() out = &m_writeQueue[0]; } auto self(shared_from_this()); - ba::async_write(m_socket, ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/) + ba::async_write(m_socket->ref(), ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/) { ThreadContext tc(info().id.abridged()); ThreadContext tc2(info().clientVersion); @@ -352,13 +354,14 @@ void Session::drop(DisconnectReason _reason) { if (m_dropped) return; - if (m_socket.is_open()) + bi::tcp::socket& socket = m_socket->ref(); + if (socket.is_open()) try { - clog(NetConnect) << "Closing " << m_socket.remote_endpoint() << "(" << reasonOf(_reason) << ")"; + clog(NetConnect) << "Closing " << socket.remote_endpoint() << "(" << reasonOf(_reason) << ")"; boost::system::error_code ec; - m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); - m_socket.close(); + socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + socket.close(); } catch (...) {} @@ -379,7 +382,7 @@ void Session::disconnect(DisconnectReason _reason) m_peer->endpoint, // TODO: may not be 100% accurate m_server->peerCount() ); - if (m_socket.is_open()) + if (m_socket->ref().is_open()) { RLPStream s; prep(s, DisconnectPacket, 1) << (int)_reason; @@ -401,7 +404,7 @@ void Session::doRead() return; auto self(shared_from_this()); - ba::async_read(m_socket, boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length) + ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length) { ThreadContext tc(info().id.abridged()); ThreadContext tc2(info().clientVersion); @@ -438,7 +441,7 @@ void Session::doRead() /// read padded frame and mac auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size; - ba::async_read(m_socket, boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length) + ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length) { ThreadContext tc(info().id.abridged()); ThreadContext tc2(info().clientVersion); diff --git a/libp2p/Session.h b/libp2p/Session.h index bcbf8022b..08006f890 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -33,7 +33,8 @@ #include #include #include -#include "RLPxHandshake.h" +#include "RLPXFrameCoder.h" +#include "RLPXSocket.h" #include "Common.h" namespace dev @@ -54,7 +55,7 @@ class Session: public std::enable_shared_from_this friend class HostCapabilityFace; public: - Session(Host* _server, RLPXFrameIO* _io, std::shared_ptr const& _n, PeerSessionInfo _info); + Session(Host* _server, RLPXFrameCoder* _io, std::shared_ptr const& _s, std::shared_ptr const& _n, PeerSessionInfo _info); virtual ~Session(); void start(); @@ -62,7 +63,7 @@ public: void ping(); - bool isConnected() const { return m_socket.is_open(); } + bool isConnected() const { return m_socket->ref().is_open(); } NodeId id() const; unsigned socketId() const { return m_info.socketId; } @@ -105,8 +106,8 @@ private: Host* m_server; ///< The host that owns us. Never null. - RLPXFrameIO* m_io; ///< Transport over which packets are sent. - bi::tcp::socket& m_socket; ///< Socket for the peer's connection. + RLPXFrameCoder* m_io; ///< Transport over which packets are sent. + std::shared_ptr m_socket; ///< Socket of peer's connection. Mutex x_writeQueue; ///< Mutex for the write queue. std::deque m_writeQueue; ///< The write queue. std::array m_data; ///< Buffer for ingress packet data.