diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index f4c13758b..4d06fd529 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -33,18 +33,24 @@ namespace dev namespace p2p { +struct RLPXFrameDecrytFailed: virtual dev::Exception {}; + +/** + * @brief Encapsulation of Frame + * @todo coder integration; padding derived from coder + */ struct RLPXFrameInfo { RLPXFrameInfo() = default; /// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]] RLPXFrameInfo(bytesConstRef _frameHeader); - uint32_t length = 0; ///< Max: 2**24 - uint8_t padding = 0; + uint32_t length = 0; ///< Size of frame (excludes padding). Max: 2**24 + uint8_t padding = 0; ///< Length of padding which follows @length. - uint16_t protocolId = 0; - bool hasSequence = false; - uint16_t sequenceId = 0; - uint32_t totalLength = 0; + uint16_t protocolId = 0; ///< Protocol ID as negotiated by handshake. + bool hasSequence = false; ///< If this frame is part of a sequence + uint16_t sequenceId = 0; ///< Sequence ID of frame + uint32_t totalLength = 0; ///< Set to }; class RLPXHandshake; @@ -52,8 +58,12 @@ class RLPXHandshake; /** * @brief Encoder/decoder transport for RLPx connection established by RLPXHandshake. * + * @todo rename to RLPXTranscoder + * @todo Remove 'Frame' nomenclature and expect caller to provide RLPXFrame + * @todo Remove handshake as friend, remove handshake-based constructor + * * Thread Safety - * Distinct Objects: Safe. + * Distinct Objects: Unsafe. * Shared objects: Unsafe. */ class RLPXFrameCoder @@ -69,7 +79,7 @@ public: ~RLPXFrameCoder() {} - /// Establish shared secrets and setup AES and MAC states. Used by both constructors. + /// Establish shared secrets and setup AES and MAC states. void setup(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ephemeral, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); /// Write single-frame payload of packet(s). @@ -78,10 +88,10 @@ public: /// Write continuation frame of segmented payload. void writeFrame(uint16_t _protocolType, uint16_t _seqId, bytesConstRef _payload, bytes& o_bytes); - /// Write first frame of segmented payload. + /// Write first frame of segmented or sequence-tagged payload. void writeFrame(uint16_t _protocolType, uint16_t _seqId, uint32_t _totalSize, bytesConstRef _payload, bytes& o_bytes); - /// Encrypt _packet as RLPx frame. + /// Legacy. Encrypt _packet as ill-defined legacy RLPx frame. void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes); /// Authenticate and decrypt header in-place. diff --git a/libp2p/RLPXFrameReader.h b/libp2p/RLPXFrameReader.h new file mode 100644 index 000000000..40f59c960 --- /dev/null +++ b/libp2p/RLPXFrameReader.h @@ -0,0 +1,112 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXFrameReader.h + * @author Alex Leverington + * @date 2015 + */ + + +#pragma once + +#include +#include "RLPXFrameCoder.h" +#include "RLPXPacket.h" +namespace ba = boost::asio; +namespace bi = boost::asio::ip; + +namespace dev +{ +namespace p2p +{ + +/** + * RLPFrameReader + * Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally + * buffers incomplete packets which are pieced into multiple frames (has sequence). + */ +class RLPXFrameReader +{ +public: + RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} + + /// Processes a single frame returning complete packets. + std::vector demux(RLPXFrameCoder& _coder, bytesRef _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) + { + if (!_coder.authAndDecryptFrame(_frame)) + BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); + + std::vector ret; + if (!_sequence && (!_frame.size() || _frame.size() > _totalSize)) + return ret; + + // trim mac + bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size); + // continue populating incomplete packets + if (_sequence && m_incomplete.count(_seq)) + { + uint32_t& remaining = m_incomplete.at(_seq).second; + if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining) + { + remaining -= buffer.size(); + + RLPXPacket& p = m_incomplete.at(_seq).first; + if(p.append(buffer) && !remaining) + ret.push_back(std::move(p)); + if (!remaining) + m_incomplete.erase(_seq); + + if (!ret.empty() && remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + else if (ret.empty() && !remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + + return ret; + } + else + m_incomplete.erase(_seq); + } + + while (!buffer.empty()) + { + auto type = nextRLP(buffer); + if (type.empty()) + break; + buffer = buffer.cropped(type.size()); + // consume entire buffer if packet has sequence + auto packet = _sequence ? buffer : nextRLP(buffer); + buffer = buffer.cropped(packet.size()); + RLPXPacket p(m_protocolType, type); + if (!packet.empty()) + p.append(packet); + + uint32_t remaining = _totalSize - type.size() - packet.size(); + if (p.isValid()) + ret.push_back(std::move(p)); + else if (_sequence && remaining) + m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), remaining))); + // else drop invalid packet + } + return ret; + } + +protected: + uint16_t m_protocolType; + std::map> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining. +}; + +} +} \ No newline at end of file diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 2c00256e4..8ff12b444 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -28,11 +28,18 @@ using namespace dev::p2p; const uint16_t RLPXFrameWriter::EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC const uint16_t RLPXFrameWriter::MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC -void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) +void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority) { + if (!_p.isValid()) + return; QueueState& qs = _priority ? m_q.first : m_q.second; DEV_GUARDED(qs.x) - qs.q.push_back(move(RLPXPacket(m_protocolType, _packetType, _payload))); + qs.q.push_back(move(_p)); +} + +void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) +{ + enque(RLPXPacket(m_protocolType, (RLPStream() << _packetType), _payload), _priority); } size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) @@ -70,13 +77,11 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vectorsize(); - bytes packetType = rlp(qs.writing->type()); - qs.sequenced = qs.remaining + packetType.size() > frameAllot; + qs.sequenced = qs.writing->size() > frameAllot; - // stop here if we can't write-out packet-type - // or payload already packed and packet won't fit - if (packetType.size() > frameAllot || (qs.sequenced && payload.size())) + // break here if we can't write-out packet-type + // or payload is packed and next packet won't fit (implicit) + if (qs.writing->type().size() > frameAllot || (qs.sequenced && !payload.empty())) { qs.writing = nullptr; qs.remaining = 0; @@ -85,14 +90,16 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vectortype().size(); + payload += qs.writing->type(); + + qs.remaining = qs.writing->data().size(); } assert(qs.sequenced || (!qs.sequenced && frameAllot >= qs.remaining)); if (frameAllot && qs.remaining) { - offset = qs.writing->size() - qs.remaining; + offset = qs.writing->data().size() - qs.remaining; length = qs.remaining <= frameAllot ? qs.remaining : frameAllot; bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes(); qs.remaining -= length; diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index a0e2107fb..96a342de9 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -33,88 +33,11 @@ namespace dev namespace p2p { -struct RLPXFrameDecrytFailed: virtual dev::Exception {}; - -/** - * RLPFrameReader - * Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally - * buffers incomplete packets which are pieced into multiple frames (has sequence). - * @todo drop frame and reset incomplete if - * @todo percolate sequenceid to p2p protocol - * @todo informative exception - */ -class RLPXFrameReader -{ -public: - RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} - - /// Processes a single frame returning complete packets. - std::vector demux(RLPXFrameCoder& _coder, bytesRef _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) - { - if (!_coder.authAndDecryptFrame(_frame)) - BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); - - std::vector ret; - if (!_sequence && (!_frame.size() || _frame.size() > _totalSize)) - return ret; - - // trim mac - bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size); - // continue populating incomplete packets - if (_sequence && m_incomplete.count(_seq)) - { - uint32_t& remaining = m_incomplete.at(_seq).second; - if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining) - { - remaining -= buffer.size(); - - RLPXPacket& p = m_incomplete.at(_seq).first; - if(p.streamIn(buffer) && !remaining) - ret.push_back(std::move(p)); - if (!remaining) - m_incomplete.erase(_seq); - - if (!ret.empty() && remaining) - BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); - else if (ret.empty() && !remaining) - BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); - - return ret; - } - else - m_incomplete.erase(_seq); - } - - while (!buffer.empty()) - { - auto type = RLPXPacket::nextRLP(buffer); - if (type.empty()) - break; - buffer = buffer.cropped(type.size()); - // consume entire buffer if packet has sequence - auto packet = _sequence ? buffer : RLPXPacket::nextRLP(buffer); - buffer = buffer.cropped(packet.size()); - RLPXPacket p(m_protocolType, type); - if (!packet.empty()) - p.streamIn(packet); - - if (p.isValid()) - ret.push_back(std::move(p)); - else if (_sequence) - // ugh: need to make total-size inclusive of packet-type :/ - m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - packet.size()))); - } - return ret; - } - -protected: - uint16_t m_protocolType; - std::map> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining. -}; - /** * RLPXFrameWriter - * Multiplex and encrypted packets into RLPX frames. + * Multiplex packets into encrypted RLPX frames. + * @todo throw when enqueued packet is invalid + * @todo use RLPXFrameInfo * @todo flag to disable multiple packets per frame */ class RLPXFrameWriter @@ -139,9 +62,11 @@ public: size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } - /// Contents of _payload will be moved. Adds packet to queue, to be muxed into frames by mux when network buffer is ready for writing. Thread-safe. + /// Adds @_payload to queue (moves @_payload), to be muxed into frames by mux when network buffer is ready for writing. Thread-safe. void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); + void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow); + /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index 7f6818770..f5d431be4 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -29,42 +29,39 @@ namespace dev namespace p2p { -struct RLPXNullPacket: virtual dev::Exception {}; struct RLPXInvalidPacket: virtual dev::Exception {}; +static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); } + /** * RLPX Packet */ class RLPXPacket { public: - static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); } - - /// Construct complete packet. RLPStream data is moved. - RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } + /// Construct packet. RLPStream data is invalidated. + RLPXPacket(uint8_t _capId, RLPStream& _type, RLPStream& _data): m_cap(_capId), m_type(std::move(_type.out())), m_data(std::move(_data.out())) {} - /// Construct packet with type and initial bytes; type is determined by RLP of 1st byte and packet may be incomplete. - RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(getType(_in)) { assert(_in.size()); if (_in.size() > 1) { m_data.resize(_in.size() - 1); _in.cropped(1).copyTo(&m_data); } } + /// Construct packet from single bytestream. RLPStream data is invalidated. + RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(nextRLP(_in).toBytes()) { if (_in.size() > m_type.size()) { m_data.resize(_in.size() - m_type.size()); _in.cropped(m_type.size()).copyTo(&m_data); } } RLPXPacket(RLPXPacket const& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {} - - RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(std::move(_p.m_data)) {} - - unsigned type() const { return m_type; } + RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(std::move(_p.m_type)), m_data(std::move(_p.m_data)) {} + + bytes const& type() const { return m_type; } + bytes const& data() const { return m_data; } - size_t size() const { try { return RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } } + + size_t size() const { try { return RLP(m_type).actualSize() + RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } } - bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } + /// Appends byte data and returns if packet is valid. + bool append(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } - bool isValid() { if (m_type > 0x7f) return false; try { return RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; } + virtual bool isValid() const noexcept { try { return !(m_type.empty() && m_data.empty()) && RLP(m_type).actualSize() == m_type.size() && RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; } - protected: - unsigned getType(bytesConstRef _rlp) { return RLP(_rlp.cropped(0, 1)).toInt(); } - - unsigned m_cap; - unsigned m_type; - uint16_t m_sequence = 0; + uint8_t m_cap; + bytes m_type; bytes m_data; }; diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index a2d4d78f4..af377a5a5 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -32,6 +32,7 @@ #include #include #include +#include using namespace std; using namespace dev; @@ -481,7 +482,9 @@ BOOST_AUTO_TEST_CASE(readerWriter) RLPXFrameWriter w(0); RLPStream rlpPayload(RLPStream() << payload); - w.enque(0, rlpPayload); + uint8_t packetType = 0; + bytes packetTypeRLP = (RLPStream() << packetType).out(); + w.enque(packetType, rlpPayload); vector encframes; for (unsigned i = 1; i < drains; i++) { @@ -516,8 +519,9 @@ BOOST_AUTO_TEST_CASE(readerWriter) packets += move(p); } BOOST_REQUIRE_EQUAL(packets.size(), 1); - BOOST_REQUIRE_EQUAL(packets.front().size(), rlpPayload.out().size()); + BOOST_REQUIRE_EQUAL(packets.front().size(), packetTypeRLP.size() + rlpPayload.out().size()); BOOST_REQUIRE_EQUAL(sha3(RLP(packets.front().data()).payload()), sha3(payload)); + BOOST_REQUIRE_EQUAL(sha3(packets.front().type()), sha3(packetTypeRLP)); } BOOST_AUTO_TEST_SUITE_END()