From 73677835068d07e445b97e9b2ed9620041c3e14f Mon Sep 17 00:00:00 2001 From: subtly Date: Sun, 14 Jun 2015 02:46:48 -0400 Subject: [PATCH 01/22] Initial round-robin packet dequeing. --- libp2p/RLPXFrameCoder.cpp | 60 +++++++++++----- libp2p/RLPXFrameCoder.h | 11 +++ libp2p/RLPXFrameWriter.cpp | 136 +++++++++++++++++++++++++++++++++++++ libp2p/RLPXFrameWriter.h | 69 +++++++++++++++++++ libp2p/RLPXPacket.cpp | 0 libp2p/RLPXPacket.h | 49 +++++++++++++ libp2p/RLPXSocketIO.cpp | 112 ++++++++++++++++++++++++++++++ libp2p/RLPXSocketIO.h | 71 +++++++++++++++++++ 8 files changed, 492 insertions(+), 16 deletions(-) create mode 100644 libp2p/RLPXFrameWriter.cpp create mode 100644 libp2p/RLPXFrameWriter.h create mode 100644 libp2p/RLPXPacket.cpp create mode 100644 libp2p/RLPXPacket.h create mode 100644 libp2p/RLPXSocketIO.cpp create mode 100644 libp2p/RLPXSocketIO.h diff --git a/libp2p/RLPXFrameCoder.cpp b/libp2p/RLPXFrameCoder.cpp index c4bb46814..be718673e 100644 --- a/libp2p/RLPXFrameCoder.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -23,6 +23,7 @@ #include #include "RLPxHandshake.h" +#include "RLPXPacket.h" using namespace std; using namespace dev; @@ -92,38 +93,65 @@ RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) m_ingressMac.Update(keyMaterial.data(), keyMaterial.size()); } -void RLPXFrameCoder::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) +void RLPXFrameCoder::writeFrame(uint16_t _protocolType, bytesConstRef _payload, bytes& o_bytes) { - // _packet = type || rlpList() + RLPStream header; + uint32_t len = (uint32_t)_payload.size(); + header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); + header.appendList(1) << _protocolType; + writeFrame(header, _payload, o_bytes); +} +void RLPXFrameCoder::writeFrame(uint16_t _protocolType, uint16_t _seqId, bytesConstRef _payload, bytes& o_bytes) +{ RLPStream header; - uint32_t len = (uint32_t)_packet.size(); + uint32_t len = (uint32_t)_payload.size(); header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); - // zeroHeader: []byte{0xC2, 0x80, 0x80}. Should be rlpList(protocolType,seqId,totalPacketSize). - header.appendRaw(bytes({0xc2,0x80,0x80})); - - // TODO: SECURITY check that header is <= 16 bytes + header.appendList(2) << _protocolType << _seqId; + writeFrame(header, _payload, o_bytes); +} +void RLPXFrameCoder::writeFrame(uint16_t _protocolType, uint16_t _seqId, uint32_t _totalSize, bytesConstRef _payload, bytes& o_bytes) +{ + RLPStream header; + uint32_t len = (uint32_t)_payload.size(); + header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); + header.appendList(3) << _protocolType << _seqId << _totalSize; + writeFrame(header, _payload, o_bytes); +} + +void RLPXFrameCoder::writeFrame(RLPStream const& _header, bytesConstRef _payload, bytes& o_bytes) +{ + // TODO: SECURITY check header values && header <= 16 bytes bytes headerWithMac(h256::size); - bytesConstRef(&header.out()).copyTo(bytesRef(&headerWithMac)); + bytesConstRef(&_header.out()).copyTo(bytesRef(&headerWithMac)); m_frameEnc.ProcessData(headerWithMac.data(), headerWithMac.data(), 16); updateEgressMACWithHeader(bytesConstRef(&headerWithMac).cropped(0, 16)); egressDigest().ref().copyTo(bytesRef(&headerWithMac).cropped(h128::size,h128::size)); - - auto padding = (16 - (_packet.size() % 16)) % 16; + + auto padding = (16 - (_payload.size() % 16)) % 16; o_bytes.swap(headerWithMac); - o_bytes.resize(32 + _packet.size() + padding + h128::size); - bytesRef packetRef(o_bytes.data() + 32, _packet.size()); - m_frameEnc.ProcessData(packetRef.data(), _packet.data(), _packet.size()); - bytesRef paddingRef(o_bytes.data() + 32 + _packet.size(), padding); + o_bytes.resize(32 + _payload.size() + padding + h128::size); + bytesRef packetRef(o_bytes.data() + 32, _payload.size()); + m_frameEnc.ProcessData(packetRef.data(), _payload.data(), _payload.size()); + bytesRef paddingRef(o_bytes.data() + 32 + _payload.size(), padding); if (padding) m_frameEnc.ProcessData(paddingRef.data(), paddingRef.data(), padding); - bytesRef packetWithPaddingRef(o_bytes.data() + 32, _packet.size() + padding); + bytesRef packetWithPaddingRef(o_bytes.data() + 32, _payload.size() + padding); updateEgressMACWithFrame(packetWithPaddingRef); - bytesRef macRef(o_bytes.data() + 32 + _packet.size() + padding, h128::size); + bytesRef macRef(o_bytes.data() + 32 + _payload.size() + padding, h128::size); egressDigest().ref().copyTo(macRef); } +void RLPXFrameCoder::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) +{ + RLPStream header; + uint32_t len = (uint32_t)_packet.size(); + header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); + header.appendRaw(bytes({0xc2,0x80,0x80})); + writeFrame(header, _packet, o_bytes); +} + bool RLPXFrameCoder::authAndDecryptHeader(bytesRef io) { asserts(io.size() == h256::size); diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index 7c5eedbff..eeabb4f15 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -52,6 +52,15 @@ public: RLPXFrameCoder(RLPXHandshake const& _init); ~RLPXFrameCoder() {} + /// Write single-frame payload of packet(s). + void writeFrame(uint16_t _protocolType, bytesConstRef _payload, bytes& o_bytes); + + /// Write continuation frame of segmented payload. + void writeFrame(uint16_t _protocolType, uint16_t _seqId, bytesConstRef _payload, bytes& o_bytes); + + /// Write first frame of segmented payload. + void writeFrame(uint16_t _protocolType, uint16_t _seqId, uint32_t _totalSize, bytesConstRef _payload, bytes& o_bytes); + /// Encrypt _packet as RLPx frame. void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes); @@ -68,6 +77,8 @@ public: h128 ingressDigest(); protected: + void writeFrame(RLPStream const& _header, bytesConstRef _payload, bytes& o_bytes); + /// Update state of egress MAC with frame header. void updateEgressMACWithHeader(bytesConstRef _headerCipher); diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp new file mode 100644 index 000000000..468cdcf43 --- /dev/null +++ b/libp2p/RLPXFrameWriter.cpp @@ -0,0 +1,136 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXFrameWriter.cpp + * @author Alex Leverington + * @date 2015 + */ + +#include "RLPXFrameWriter.h" + +using namespace std; +using namespace dev; +using namespace dev::p2p; + +void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) +{ + QueueState& qs = _priority ? m_q.first : m_q.second; + DEV_GUARDED(qs.x) + qs.q.push_back(move(RLPXPacket(m_protocolType, _packetType, _payload))); +} + +size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) +{ + static const size_t c_blockSize = h128::size; + static const size_t c_overhead = c_blockSize * 2; + if (_size < c_overhead + c_blockSize) + return 0; + + size_t ret = 0; + size_t frameLen = _size; + bytes payload(0); + bool swapQueues = false; + while (frameLen >= c_overhead + c_blockSize) + { + bool highPending; + bool lowPending; + DEV_GUARDED(m_q.first.x) + highPending = !!m_q.first.q.size(); + DEV_GUARDED(m_q.second.x) + lowPending = !!m_q.second.q.size(); + + if (!highPending && !lowPending) + return 0; + + // first run when !swapQueues, high > low, otherwise low > high + bool high = highPending && !swapQueues ? true : lowPending ? false : true; + QueueState &qs = high ? m_q.first : m_q.second; + size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; + bool sequenced = false; + size_t offset; + while (frameAllot >= c_blockSize) + { + if (qs.writing == nullptr) + { + DEV_GUARDED(qs.x) + qs.writing = &qs.q[0]; + qs.remaining = qs.writing->size(); + bytes packetType = rlp(qs.writing->type()); + qs.sequenced = qs.remaining + packetType.size() > frameAllot; + + // stop here if we can't write-out packet-type + // or payload already packed and packet won't fit + if (packetType.size() > frameAllot || (qs.sequenced && payload.size())) + { + qs.writing = nullptr; + qs.remaining = 0; + qs.sequenced = false; + break; + } + else if (qs.sequenced) + { + sequenced = true; + qs.sequence = ++m_sequenceId; + } + + frameAllot -= packetType.size(); + payload += packetType; + } + assert(qs.sequenced || (!qs.sequenced && frameAllot >= qs.remaining)); + if (frameAllot && qs.remaining) + { + offset = qs.writing->size() - qs.remaining; + auto length = qs.remaining <= frameAllot ? qs.remaining : frameAllot; + bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes(); + qs.remaining -= length; + frameAllot -= portion.size(); + payload += portion; + } + if (!qs.remaining) + qs.writing = nullptr; + if (qs.sequenced) + break; + } + + if (payload.size()) + { + if (qs.sequenced) + if (!offset) + _coder.writeFrame(m_protocolType, qs.sequence, qs.writing->size(), &payload, payload); + else + _coder.writeFrame(m_protocolType, qs.sequence, &payload, payload); + else + _coder.writeFrame(m_protocolType, &payload, payload); + frameLen -= payload.size(); + o_toWrite.push_back(move(payload)); + payload.resize(0); + + if (!qs.remaining) + { + qs.writing = nullptr; + qs.remaining = 0; + qs.sequenced = false; + DEV_GUARDED(qs.x) + qs.q.pop_front(); + ret++; + } + } + else if (swapQueues) + break; + swapQueues = true; + } + return ret; +} diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h new file mode 100644 index 000000000..2515cce16 --- /dev/null +++ b/libp2p/RLPXFrameWriter.h @@ -0,0 +1,69 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXperimental.h + * @author Alex Leverington + * @date 2015 + */ + + +#pragma once + +#include +#include "RLPXFrameCoder.h" +#include "RLPXPacket.h" +namespace ba = boost::asio; +namespace bi = boost::asio::ip; + +namespace dev +{ +namespace p2p +{ + +class RLPXFrameWriter +{ + struct QueueState + { + std::deque q; + RLPXPacket* writing = nullptr; + size_t remaining = 0; + bool sequenced = false; + uint16_t sequence; + mutable Mutex x; + }; + +public: + enum PacketPriority { PriorityLow = 0, PriorityHigh }; + + RLPXFrameWriter(uint16_t _protocolType): m_protocolType(_protocolType) {} + RLPXFrameWriter(RLPXFrameWriter const& _s): m_protocolType(_s.m_protocolType) {} + + size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } + + /// Thread-safe. + void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); + + /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. + size_t drain(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); + +private: + uint16_t const m_protocolType; // Protocol Type + std::pair m_q; // High, Low frame queues + uint16_t m_sequenceId = 0; // Sequence ID +}; + +} +} \ No newline at end of file diff --git a/libp2p/RLPXPacket.cpp b/libp2p/RLPXPacket.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h new file mode 100644 index 000000000..cf29e7c97 --- /dev/null +++ b/libp2p/RLPXPacket.h @@ -0,0 +1,49 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXPacket.h + * @author Alex Leverington + * @date 2015 + */ + +#pragma once + +#include "Common.h" + +namespace dev +{ +namespace p2p +{ + +struct RLPXPacketNullPacket: virtual dev::Exception {}; + +class RLPXPacket +{ +public: + RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXPacketNullPacket()); } + + unsigned type() const { return m_type; } + bytes const& data() const { return m_data; } + size_t size() const { return m_data.size(); } + +protected: + unsigned m_cap; + unsigned m_type; + bytes m_data; +}; + +} +} \ No newline at end of file diff --git a/libp2p/RLPXSocketIO.cpp b/libp2p/RLPXSocketIO.cpp new file mode 100644 index 000000000..391a03a6d --- /dev/null +++ b/libp2p/RLPXSocketIO.cpp @@ -0,0 +1,112 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXSocketIO.cpp + * @author Alex Leverington + * @date 2015 + */ + +#include "RLPXSocketIO.h" + +#include +using namespace std; +using namespace dev; +using namespace dev::p2p; + +uint32_t const RLPXSocketIO::MinFrameSize = /*header*/h128::size + /*type*/1 + /*mac*/h128::size; +uint32_t const RLPXSocketIO::MaxPacketSize = 1 << 24; +uint16_t const RLPXSocketIO::DefaultInitialCapacity = 8 << 8; + +RLPXSocketIO::RLPXSocketIO(unsigned _protCount, RLPXFrameCoder& _coder, bi::tcp::socket& _socket, bool _flowControl, size_t _initialCapacity): + m_flowControl(_flowControl), + m_coder(_coder), + m_socket(_socket), + m_writers(move(writers(_protCount))), + m_egressCapacity(m_flowControl ? _initialCapacity : MaxPacketSize * m_writers.size()) +{} + +vector RLPXSocketIO::writers(unsigned _capacity) +{ + vector ret; + for (unsigned i = 0; i < _capacity; i++) + ret.push_back(RLPXFrameWriter(i)); + return ret; +} + +void RLPXSocketIO::send(unsigned _protocolType, unsigned _type, RLPStream& _payload) +{ + if (!m_socket.is_open()) + return; // TCPSocketNotOpen + m_writers.at(_protocolType).enque(_type, _payload); + bool wasEmtpy = false; + DEV_GUARDED(x_queued) + wasEmtpy = (++m_queued == 1); + if (wasEmtpy) + doWrite(); +} + +void RLPXSocketIO::doWrite() +{ + m_toSend.clear(); + + size_t capacity; + DEV_GUARDED(x_queued) + capacity = min(m_egressCapacity, MaxPacketSize); + + size_t active = 0; + for (auto const& w: m_writers) + if (w.size()) + active += 1; + size_t dequed = 0; + size_t protFrameSize = capacity / active; + if (protFrameSize >= MinFrameSize * active) + for (auto& w: m_writers) + dequed += w.drain(m_coder, protFrameSize, m_toSend); + + if (dequed) + write(dequed); + else + deferWrite(); +} + +void RLPXSocketIO::deferWrite() +{ + auto self(shared_from_this()); + m_congestion.reset(new ba::deadline_timer(m_socket.get_io_service())); + m_congestion->expires_from_now(boost::posix_time::milliseconds(50)); + m_congestion->async_wait([=](boost::system::error_code const& _ec) { m_congestion.reset(); if (!_ec) doWrite(); }); +} + +void RLPXSocketIO::write(size_t _dequed) +{ + auto self(shared_from_this()); + ba::async_write(m_socket, ba::buffer(m_toSend), [this, self, _dequed](boost::system::error_code ec, size_t written) + { + if (ec) + return; // TCPSocketWriteError + + bool reschedule = false; + DEV_GUARDED(x_queued) + { + if (m_flowControl) + m_egressCapacity -= written; + m_queued -= _dequed; + reschedule = m_queued > 0; + } + if (reschedule) + doWrite(); + }); +} \ No newline at end of file diff --git a/libp2p/RLPXSocketIO.h b/libp2p/RLPXSocketIO.h new file mode 100644 index 000000000..47dd5aa9c --- /dev/null +++ b/libp2p/RLPXSocketIO.h @@ -0,0 +1,71 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXSocketIO.h + * @author Alex Leverington + * @date 2015 + */ + +#pragma once + +#include "RLPXFrameWriter.h" +namespace ba = boost::asio; +namespace bi = boost::asio::ip; + +namespace dev +{ +namespace p2p +{ + +class RLPXSocketIO: public std::enable_shared_from_this +{ +public: + static uint32_t const MinFrameSize; + static uint32_t const MaxPacketSize; + static uint16_t const DefaultInitialCapacity; + + RLPXSocketIO(unsigned _protCount, RLPXFrameCoder& _coder, bi::tcp::socket& _socket, bool _flowControl = true, size_t _initialCapacity = DefaultInitialCapacity); + + void send(unsigned _protocolType, unsigned _type, RLPStream& _payload); + + void doWrite(); + + bool congested() const { return !!m_congestion; } + +private: + static std::vector writers(unsigned _capacity); + + void deferWrite(); + + void write(size_t _dequed); + + bool const m_flowControl; ///< True if flow control is enabled. + + RLPXFrameCoder& m_coder; ///< Encoder/decoder of frame payloads. + bi::tcp::socket& m_socket; + + std::vector m_toSend; ///< Reusable byte buffer for pending socket writes. + + std::vector m_writers; ///< Write queues for each protocol. TODO: map to bytes (of capability) + std::unique_ptr m_congestion; ///< Scheduled when writes are deferred due to congestion. + + Mutex x_queued; + unsigned m_queued = 0; ///< Track total queued packets to ensure single write loop + uint32_t m_egressCapacity; +}; + +} +} \ No newline at end of file From 3cc6f40dcb7eba60ced226d5ef5f62e557d01a5a Mon Sep 17 00:00:00 2001 From: subtly Date: Sun, 14 Jun 2015 03:19:39 -0400 Subject: [PATCH 02/22] corrections --- libp2p/RLPXFrameWriter.cpp | 1 - libp2p/RLPXSocketIO.cpp | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 468cdcf43..7de761edf 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -121,7 +121,6 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector= MinFrameSize * active) + if (protFrameSize >= MinFrameSize) for (auto& w: m_writers) dequed += w.drain(m_coder, protFrameSize, m_toSend); From 7d4e89a20c49ebef0b5e89010bd1eac3841621fb Mon Sep 17 00:00:00 2001 From: subtly Date: Sun, 14 Jun 2015 04:11:25 -0400 Subject: [PATCH 03/22] no-unused --- libp2p/RLPXFrameWriter.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 7de761edf..70f545024 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -59,7 +59,6 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; - bool sequenced = false; size_t offset; while (frameAllot >= c_blockSize) { @@ -81,10 +80,7 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector Date: Mon, 15 Jun 2015 07:29:02 -0400 Subject: [PATCH 04/22] -Werror=maybe-uninitialized --- libp2p/RLPXFrameWriter.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 70f545024..7f4a964e5 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -59,7 +59,7 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; - size_t offset; + size_t offset = 0; while (frameAllot >= c_blockSize) { if (qs.writing == nullptr) @@ -104,7 +104,7 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vectorsize(), &payload, payload); else _coder.writeFrame(m_protocolType, qs.sequence, &payload, payload); From 2999d3e3e5b3b6316036f1bc26fa4e2c7705c913 Mon Sep 17 00:00:00 2001 From: subtly Date: Wed, 17 Jun 2015 21:28:27 -0400 Subject: [PATCH 05/22] Add test for frame writer. Cleanup abstraction in preparation for reader. Initial reader code. --- libp2p/RLPXFrameCoder.cpp | 40 ++++++++++++++----------- libp2p/RLPXFrameCoder.h | 10 +++++-- libp2p/RLPXFrameWriter.cpp | 9 +++--- libp2p/RLPXFrameWriter.h | 61 +++++++++++++++++++++++++++++++++++++- libp2p/RLPXPacket.h | 23 ++++++++++++-- test/libp2p/rlpx.cpp | 52 ++++++++++++++++++++++++++++++++ 6 files changed, 167 insertions(+), 28 deletions(-) diff --git a/libp2p/RLPXFrameCoder.cpp b/libp2p/RLPXFrameCoder.cpp index be718673e..dbbe1ddee 100644 --- a/libp2p/RLPXFrameCoder.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -32,22 +32,26 @@ using namespace CryptoPP; RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) { - // we need: - // originated? - // Secret == output of ecdhe agreement - // authCipher - // ackCipher + setup(_init.m_originated, _init.m_remoteEphemeral, _init.m_remoteNonce, _init.m_ecdhe, _init.m_nonce, &_init.m_ackCipher, &_init.m_authCipher); +} + +RLPXFrameCoder::RLPXFrameCoder(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ecdhe, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher) +{ + setup(_originated, _remoteEphemeral, _remoteNonce, _ecdhe, _nonce, _ackCipher, _authCipher); +} +void RLPXFrameCoder::setup(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ecdhe, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher) +{ bytes keyMaterialBytes(64); bytesRef keyMaterial(&keyMaterialBytes); - + // shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || initiator-nonce)) Secret ephemeralShared; - _init.m_ecdhe.agree(_init.m_remoteEphemeral, ephemeralShared); + _ecdhe.agree(_remoteEphemeral, ephemeralShared); ephemeralShared.ref().copyTo(keyMaterial.cropped(0, h256::size)); h512 nonceMaterial; - h256 const& leftNonce = _init.m_originated ? _init.m_remoteNonce : _init.m_nonce; - h256 const& rightNonce = _init.m_originated ? _init.m_nonce : _init.m_remoteNonce; + h256 const& leftNonce = _originated ? _remoteNonce : _nonce; + h256 const& rightNonce = _originated ? _nonce : _remoteNonce; leftNonce.ref().copyTo(nonceMaterial.ref().cropped(0, h256::size)); rightNonce.ref().copyTo(nonceMaterial.ref().cropped(h256::size, h256::size)); auto outRef(keyMaterial.cropped(h256::size, h256::size)); @@ -65,31 +69,31 @@ RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) h128 iv; m_frameEnc.SetKeyWithIV(m_frameEncKey, h256::size, iv.data()); m_frameDec.SetKeyWithIV(m_frameDecKey, h256::size, iv.data()); - + // mac-secret = sha3(ecdhe-shared-secret || aes-secret) sha3(keyMaterial, outRef); // output mac-secret m_macEncKey.resize(h256::size); memcpy(m_macEncKey.data(), outRef.data(), h256::size); m_macEnc.SetKey(m_macEncKey, h256::size); - + // Initiator egress-mac: sha3(mac-secret^recipient-nonce || auth-sent-init) // ingress-mac: sha3(mac-secret^initiator-nonce || auth-recvd-ack) // Recipient egress-mac: sha3(mac-secret^initiator-nonce || auth-sent-ack) // ingress-mac: sha3(mac-secret^recipient-nonce || auth-recvd-init) - (*(h256*)outRef.data() ^ _init.m_remoteNonce).ref().copyTo(keyMaterial); - bytes const& egressCipher = _init.m_originated ? _init.m_authCipher : _init.m_ackCipher; + (*(h256*)outRef.data() ^ _remoteNonce).ref().copyTo(keyMaterial); + bytesConstRef egressCipher = _originated ? _authCipher : _ackCipher; keyMaterialBytes.resize(h256::size + egressCipher.size()); keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size()); - bytesConstRef(&egressCipher).copyTo(keyMaterial.cropped(h256::size, egressCipher.size())); + egressCipher.copyTo(keyMaterial.cropped(h256::size, egressCipher.size())); m_egressMac.Update(keyMaterial.data(), keyMaterial.size()); - + // recover mac-secret by re-xoring remoteNonce - (*(h256*)keyMaterial.data() ^ _init.m_remoteNonce ^ _init.m_nonce).ref().copyTo(keyMaterial); - bytes const& ingressCipher = _init.m_originated ? _init.m_ackCipher : _init.m_authCipher; + (*(h256*)keyMaterial.data() ^ _remoteNonce ^ _nonce).ref().copyTo(keyMaterial); + bytesConstRef ingressCipher = _originated ? _ackCipher : _authCipher; keyMaterialBytes.resize(h256::size + ingressCipher.size()); keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size()); - bytesConstRef(&ingressCipher).copyTo(keyMaterial.cropped(h256::size, ingressCipher.size())); + ingressCipher.copyTo(keyMaterial.cropped(h256::size, ingressCipher.size())); m_ingressMac.Update(keyMaterial.data(), keyMaterial.size()); } diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index eeabb4f15..9706da477 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -47,11 +47,17 @@ class RLPXFrameCoder friend class RLPXFrameIOMux; friend class Session; public: - /// Constructor. - /// Requires instance of RLPXHandshake which has completed first two phases of handshake. + /// Construct; requires instance of RLPXHandshake which has encrypted ECDH key exchange (first two phases of handshake). RLPXFrameCoder(RLPXHandshake const& _init); + + /// Construct with external key material. + RLPXFrameCoder(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ephemeral, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); + ~RLPXFrameCoder() {} + /// Establish shared secrets and setup AES and MAC states. Used by both constructors. + void setup(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ephemeral, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); + /// Write single-frame payload of packet(s). void writeFrame(uint16_t _protocolType, bytesConstRef _payload, bytes& o_bytes); diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 7f4a964e5..e3b656fb5 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -35,7 +35,7 @@ void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPri size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) { static const size_t c_blockSize = h128::size; - static const size_t c_overhead = c_blockSize * 2; + static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC if (_size < c_overhead + c_blockSize) return 0; @@ -60,6 +60,7 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; size_t offset = 0; + size_t length = 0; while (frameAllot >= c_blockSize) { if (qs.writing == nullptr) @@ -89,13 +90,13 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vectorsize() - qs.remaining; - auto length = qs.remaining <= frameAllot ? qs.remaining : frameAllot; + length = qs.remaining <= frameAllot ? qs.remaining : frameAllot; bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes(); qs.remaining -= length; frameAllot -= portion.size(); payload += portion; } - if (!qs.remaining) + if (!qs.remaining && ret++) qs.writing = nullptr; if (qs.sequenced) break; @@ -110,6 +111,7 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector= 0); frameLen -= payload.size(); o_toWrite.push_back(move(payload)); payload.resize(0); @@ -120,7 +122,6 @@ size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector assemble(bytes& _in) + { + bytesConstRef buffer(&_in); + std::vector ret; + if (!_in.size()) + return ret; + + while (!buffer.empty()) + { + auto type = RLPXPacket::nextRLP(buffer); + buffer = buffer.cropped(type.size()); + auto packet = RLPXPacket::nextRLP(buffer); + buffer = buffer.cropped(packet.size()); + RLPXPacket p(m_protocolType, type); + p.streamIn(packet); + ret.push_back(std::move(p)); + } + return ret; + } + + std::vector assemble(bytes& _in, uint16_t _seq, uint32_t _totalSize) + { + // TODO: cleanup sequencing api (throw exception(s) when bad things happen) + + bytesConstRef buffer(&_in); + if (!m_incomplete.count(_seq) && _totalSize > 0) + { + // new packet + RLPXPacket p(m_protocolType, buffer); + if (p.isValid()) + return std::vector{std::move(p)}; + m_incomplete[_seq] = std::move(p); + } + else + { + RLPXPacket& p = m_incomplete[_seq]; + p.streamIn(buffer); + if (p.isValid()) + { + std::vector ret{std::move(p)}; + m_incomplete.erase(_seq); + return ret; + } + } + + return std::vector(); + } + +protected: + uint16_t m_protocolType; + std::map m_incomplete; ///< Incomplete packets which span multiple frames. +}; + class RLPXFrameWriter { struct QueueState @@ -47,13 +104,15 @@ class RLPXFrameWriter public: enum PacketPriority { PriorityLow = 0, PriorityHigh }; + static const uint16_t EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC + static const uint16_t MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC RLPXFrameWriter(uint16_t _protocolType): m_protocolType(_protocolType) {} RLPXFrameWriter(RLPXFrameWriter const& _s): m_protocolType(_s.m_protocolType) {} size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } - /// Thread-safe. + /// Contents of _payload will be moved. Thread-safe. void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index cf29e7c97..42941d02f 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -28,18 +28,35 @@ namespace dev namespace p2p { -struct RLPXPacketNullPacket: virtual dev::Exception {}; - +struct RLPXNullPacket: virtual dev::Exception {}; +struct RLPXInvalidPacket: virtual dev::Exception {}; + +/** + * RLPX Packet + */ class RLPXPacket { public: - RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXPacketNullPacket()); } + static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); auto s = r.actualSize(); if (s >= _b.size()) return _b.cropped(s); } catch(...) {} return bytesConstRef(); } + + /// Construct complete packet. RLPStream data is moved. + RLPXPacket(unsigned _capId, unsigned _type, RLPStream&& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } + + /// Construct packet with type and initial bytes; type is determined by RLP of 1st byte and packet may be incomplete. + RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(getType(_in)) { assert(_in.size()); if (_in.size() > 1) { m_data.resize(_in.size() - 1); _in.cropped(1).copyTo(&m_data); } } unsigned type() const { return m_type; } bytes const& data() const { return m_data; } size_t size() const { return m_data.size(); } + bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(byestConstRef(m_data).cropped(offset)); return isValid(); } + + bool isValid() { if (m_type > 0x7f) return false; try { if (RLP(m_data).actualSize() == m_data.size()) return true; } catch (...) {} return false; } + + protected: + unsigned getType(bytesConstRef _rlp) { return RLP(_rlp.cropped(1)).toInt(); } + unsigned m_cap; unsigned m_type; bytes m_data; diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index 620ddd952..e7cd1b657 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -31,10 +31,12 @@ #include #include #include +#include using namespace std; using namespace dev; using namespace dev::crypto; +using namespace dev::p2p; using namespace CryptoPP; BOOST_AUTO_TEST_SUITE(rlpx) @@ -451,5 +453,55 @@ BOOST_AUTO_TEST_CASE(ecies_interop_test_primitives) BOOST_REQUIRE(plainTest3 == expectedPlain3); } +BOOST_AUTO_TEST_CASE(readerWriter) +{ + ECDHE localEph; + h256 localNonce = Nonce::get(); + ECDHE remoteEph; + h256 remoteNonce = Nonce::get(); + bytes ackCipher{0}; + bytes authCipher{1}; + RLPXFrameCoder coder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); + + /// Test writing a 64byte packet and drain with minimum frame size that + /// forces packet to be pieced into 4 frames. + /// (Minimum frame size has room for 16 bytes of payload) + + // 64-byte payload minus 3 bytes for packet-type and RLP overhead. + auto dequeLen = 16; + bytes stuff = sha3("A").asBytes(); + bytes payload; + payload += stuff; + payload += stuff; + payload.resize(payload.size() - 3 /* packet-type, rlp-overhead */); + BOOST_REQUIRE_EQUAL(61, payload.size()); + + auto drains = (payload.size() + 3) / dequeLen; + BOOST_REQUIRE_EQUAL(4, drains); + + RLPXFrameWriter w(0); + w.enque(0, (RLPStream() << payload)); + vector out; + for (unsigned i = 1; i < drains; i++) + { + auto n = w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out); + BOOST_REQUIRE_EQUAL(0, n); + BOOST_REQUIRE_EQUAL(out.size(), i); + } + BOOST_REQUIRE_EQUAL(1, w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out)); + BOOST_REQUIRE_EQUAL(out.size(), drains); + BOOST_REQUIRE_EQUAL(0, w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out)); + BOOST_REQUIRE_EQUAL(out.size(), drains); + + // we should now have a bunch of ciphertext in out + BOOST_REQUIRE(out.size() == drains); + for (auto const& c: out) + BOOST_REQUIRE(c.size() == RLPXFrameWriter::MinFrameDequeLength); + + // read and assemble dequed frames + + +} + BOOST_AUTO_TEST_SUITE_END() From fe0f71536f6b420bedde8a7402a31e6cad544cc1 Mon Sep 17 00:00:00 2001 From: subtly Date: Fri, 19 Jun 2015 23:42:16 -0400 Subject: [PATCH 06/22] rlpx frame reader fixes. rename writer::drain to mux. --- libp2p/RLPXFrameWriter.cpp | 2 +- libp2p/RLPXFrameWriter.h | 86 +++++++++++++++++++++----------------- libp2p/RLPXPacket.h | 15 ++++--- libp2p/RLPXSocketIO.cpp | 2 +- test/libp2p/rlpx.cpp | 6 +-- 5 files changed, 63 insertions(+), 48 deletions(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index e3b656fb5..c8c1e8ceb 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -32,7 +32,7 @@ void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPri qs.q.push_back(move(RLPXPacket(m_protocolType, _packetType, _payload))); } -size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) +size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) { static const size_t c_blockSize = h128::size; static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index 60f711add..957bb09a2 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -33,61 +33,71 @@ namespace dev namespace p2p { +/** + * RLPFrameReader + * Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally + * buffers incomplete packets which are pieced into multiple frames (has sequence). + * @todo drop frame and reset incomplete if + * @todo percolate sequenceid to p2p protocol + * @todo informative exception + */ class RLPXFrameReader { RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} - std::vector assemble(bytes& _in) + /// Processes a single frame returning complete packets. + std::vector demux(bytes& _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) { - bytesConstRef buffer(&_in); std::vector ret; - if (!_in.size()) + if (!_frame.size() || _frame.size() > _totalSize) return ret; - + + if (_sequence && m_incomplete.count(_seq)) + { + uint32_t& remaining = m_incomplete.at(_seq).second; + if (!_totalSize && _frame.size() <= remaining) + { + RLPXPacket& p = m_incomplete.at(_seq).first; + if (_frame.size() > remaining) + return ret; + else if(p.streamIn(&_frame)) + { + ret.push_back(std::move(p)); + m_incomplete.erase(_seq); + } + else + remaining -= _frame.size(); + return ret; + } + else + m_incomplete.erase(_seq); + } + + bytesConstRef buffer(&_frame); while (!buffer.empty()) { auto type = RLPXPacket::nextRLP(buffer); + if (type.empty()) + break; buffer = buffer.cropped(type.size()); - auto packet = RLPXPacket::nextRLP(buffer); + // consume entire buffer if packet has sequence + auto packet = _sequence ? buffer : RLPXPacket::nextRLP(buffer); buffer = buffer.cropped(packet.size()); RLPXPacket p(m_protocolType, type); - p.streamIn(packet); - ret.push_back(std::move(p)); - } - return ret; - } - - std::vector assemble(bytes& _in, uint16_t _seq, uint32_t _totalSize) - { - // TODO: cleanup sequencing api (throw exception(s) when bad things happen) - - bytesConstRef buffer(&_in); - if (!m_incomplete.count(_seq) && _totalSize > 0) - { - // new packet - RLPXPacket p(m_protocolType, buffer); - if (p.isValid()) - return std::vector{std::move(p)}; - m_incomplete[_seq] = std::move(p); - } - else - { - RLPXPacket& p = m_incomplete[_seq]; - p.streamIn(buffer); + if (!packet.empty()) + p.streamIn(packet); + if (p.isValid()) - { - std::vector ret{std::move(p)}; - m_incomplete.erase(_seq); - return ret; - } + ret.push_back(std::move(p)); + else if (_sequence) + m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - _frame.size()))); } - - return std::vector(); + return ret; } protected: uint16_t m_protocolType; - std::map m_incomplete; ///< Incomplete packets which span multiple frames. + std::map> m_incomplete; ///< Incomplete packets and bytes remaining. }; class RLPXFrameWriter @@ -112,11 +122,11 @@ public: size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } - /// Contents of _payload will be moved. Thread-safe. + /// Contents of _payload will be moved. Adds packet to queue, to be muxed into frames by mux when network buffer is ready for writing. Thread-safe. void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. - size_t drain(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); + size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); private: uint16_t const m_protocolType; // Protocol Type diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index 42941d02f..1421c74f7 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -40,18 +40,22 @@ public: static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); auto s = r.actualSize(); if (s >= _b.size()) return _b.cropped(s); } catch(...) {} return bytesConstRef(); } /// Construct complete packet. RLPStream data is moved. - RLPXPacket(unsigned _capId, unsigned _type, RLPStream&& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } + RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } /// Construct packet with type and initial bytes; type is determined by RLP of 1st byte and packet may be incomplete. RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(getType(_in)) { assert(_in.size()); if (_in.size() > 1) { m_data.resize(_in.size() - 1); _in.cropped(1).copyTo(&m_data); } } + RLPXPacket(RLPXPacket const& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {} + + RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(std::move(_p.m_data)) {} + unsigned type() const { return m_type; } bytes const& data() const { return m_data; } - size_t size() const { return m_data.size(); } - - bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(byestConstRef(m_data).cropped(offset)); return isValid(); } + size_t size() const { try { return RLP(m_data, RLP::LaisezFaire).actualSize(); } catch(...) { return 0; } } + + bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } - bool isValid() { if (m_type > 0x7f) return false; try { if (RLP(m_data).actualSize() == m_data.size()) return true; } catch (...) {} return false; } + bool isValid() { if (m_type > 0x7f) return false; try { return RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; } protected: @@ -59,6 +63,7 @@ protected: unsigned m_cap; unsigned m_type; + uint16_t m_sequence = 0; bytes m_data; }; diff --git a/libp2p/RLPXSocketIO.cpp b/libp2p/RLPXSocketIO.cpp index ccd30409d..deb47a728 100644 --- a/libp2p/RLPXSocketIO.cpp +++ b/libp2p/RLPXSocketIO.cpp @@ -74,7 +74,7 @@ void RLPXSocketIO::doWrite() size_t protFrameSize = capacity / active; if (protFrameSize >= MinFrameSize) for (auto& w: m_writers) - dequed += w.drain(m_coder, protFrameSize, m_toSend); + dequed += w.mux(m_coder, protFrameSize, m_toSend); if (dequed) write(dequed); diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index e7cd1b657..4e974861b 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -484,13 +484,13 @@ BOOST_AUTO_TEST_CASE(readerWriter) vector out; for (unsigned i = 1; i < drains; i++) { - auto n = w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out); + auto n = w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out); BOOST_REQUIRE_EQUAL(0, n); BOOST_REQUIRE_EQUAL(out.size(), i); } - BOOST_REQUIRE_EQUAL(1, w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out)); + BOOST_REQUIRE_EQUAL(1, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out)); BOOST_REQUIRE_EQUAL(out.size(), drains); - BOOST_REQUIRE_EQUAL(0, w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out)); + BOOST_REQUIRE_EQUAL(0, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out)); BOOST_REQUIRE_EQUAL(out.size(), drains); // we should now have a bunch of ciphertext in out From 1a4f170b0a23688848ed3518b2a4faa83e1f2855 Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 20 Jun 2015 03:09:03 -0400 Subject: [PATCH 07/22] Add coder to reader and info struct for frame header. --- libp2p/RLPXFrameWriter.cpp | 12 ++++++++++++ libp2p/RLPXFrameWriter.h | 27 ++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index c8c1e8ceb..a657afd70 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -25,6 +25,18 @@ using namespace std; using namespace dev; using namespace dev::p2p; +RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header) +{ + length = (_header[0] * 256 + _header[1]) * 256 + _header[2]; + padding = ((16 - (length % 16)) % 16); + RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall); + auto itemCount = header.itemCount(); + protocolId = header[0].toInt(); + hasSequence = itemCount > 1; + sequenceId = hasSequence ? header[1].toInt() : 0; + totalLength = itemCount == 3 ? header[2].toInt() : 0; +} + void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) { QueueState& qs = _priority ? m_q.first : m_q.second; diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index 957bb09a2..384bf0503 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -33,6 +33,22 @@ namespace dev namespace p2p { +struct RLPXFrameDecrytFailed: virtual dev::Exception {}; + +struct RLPXFrameInfo +{ + RLPXFrameInfo() = default; + /// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]] + RLPXFrameInfo(bytesConstRef _frameHeader); + uint32_t length = 0; ///< Max: 2**24 + uint8_t padding = 0; + + uint16_t protocolId = 0; + bool hasSequence = false; + uint16_t sequenceId = 0; + uint32_t totalLength = 0; +}; + /** * RLPFrameReader * Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally @@ -43,11 +59,15 @@ namespace p2p */ class RLPXFrameReader { +public: RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} /// Processes a single frame returning complete packets. - std::vector demux(bytes& _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) + std::vector demux(RLPXFrameCoder& _coder, bytes& _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) { + if (!_coder.authAndDecryptFrame(&_frame)) + BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); + std::vector ret; if (!_frame.size() || _frame.size() > _totalSize) return ret; @@ -100,6 +120,11 @@ protected: std::map> m_incomplete; ///< Incomplete packets and bytes remaining. }; +/** + * RLPXFrameWriter + * Multiplex and encrypted packets into RLPX frames. + * @todo flag to disable multiple packets per frame + */ class RLPXFrameWriter { struct QueueState From 300a35a8e280c1dcd00c1f807f0ef6d44b1745fd Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 20 Jun 2015 03:23:20 -0400 Subject: [PATCH 08/22] prep, moving frame info to coder class. --- libp2p/RLPXFrameWriter.cpp | 12 ------------ libp2p/RLPXFrameWriter.h | 14 -------------- 2 files changed, 26 deletions(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index a657afd70..c8c1e8ceb 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -25,18 +25,6 @@ using namespace std; using namespace dev; using namespace dev::p2p; -RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header) -{ - length = (_header[0] * 256 + _header[1]) * 256 + _header[2]; - padding = ((16 - (length % 16)) % 16); - RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall); - auto itemCount = header.itemCount(); - protocolId = header[0].toInt(); - hasSequence = itemCount > 1; - sequenceId = hasSequence ? header[1].toInt() : 0; - totalLength = itemCount == 3 ? header[2].toInt() : 0; -} - void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) { QueueState& qs = _priority ? m_q.first : m_q.second; diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index 384bf0503..3bbcb7a16 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -34,20 +34,6 @@ namespace p2p { struct RLPXFrameDecrytFailed: virtual dev::Exception {}; - -struct RLPXFrameInfo -{ - RLPXFrameInfo() = default; - /// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]] - RLPXFrameInfo(bytesConstRef _frameHeader); - uint32_t length = 0; ///< Max: 2**24 - uint8_t padding = 0; - - uint16_t protocolId = 0; - bool hasSequence = false; - uint16_t sequenceId = 0; - uint32_t totalLength = 0; -}; /** * RLPFrameReader From 5d725d8a6f553c3812eed1a2a1fa34404d798d1b Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 20 Jun 2015 03:24:11 -0400 Subject: [PATCH 09/22] rename drain to mux --- test/libp2p/rlpx.cpp | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index 4e974861b..b1095f1bb 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -481,25 +481,28 @@ BOOST_AUTO_TEST_CASE(readerWriter) RLPXFrameWriter w(0); w.enque(0, (RLPStream() << payload)); - vector out; + vector encframes; for (unsigned i = 1; i < drains; i++) { - auto n = w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out); + auto n = w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes); BOOST_REQUIRE_EQUAL(0, n); - BOOST_REQUIRE_EQUAL(out.size(), i); + BOOST_REQUIRE_EQUAL(encframes.size(), i); } - BOOST_REQUIRE_EQUAL(1, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out)); - BOOST_REQUIRE_EQUAL(out.size(), drains); - BOOST_REQUIRE_EQUAL(0, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out)); - BOOST_REQUIRE_EQUAL(out.size(), drains); - - // we should now have a bunch of ciphertext in out - BOOST_REQUIRE(out.size() == drains); - for (auto const& c: out) + BOOST_REQUIRE_EQUAL(1, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes)); + BOOST_REQUIRE_EQUAL(encframes.size(), drains); + BOOST_REQUIRE_EQUAL(0, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes)); + BOOST_REQUIRE_EQUAL(encframes.size(), drains); + + // we should now have a bunch of ciphertext in encframes + BOOST_REQUIRE(encframes.size() == drains); + for (auto const& c: encframes) BOOST_REQUIRE(c.size() == RLPXFrameWriter::MinFrameDequeLength); - // read and assemble dequed frames - + // read and assemble dequed encframes + vector packets; + RLPXFrameReader r(0); +// for (auto const& b: encframes) +// packets.push_back(r.demux()); } From f5ead2e93a70af27791591f39b559a921302f96d Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 20 Jun 2015 07:35:04 -0400 Subject: [PATCH 10/22] Windows compile complaint. --- test/libp2p/rlpx.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index b1095f1bb..f2c602554 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -496,7 +496,9 @@ BOOST_AUTO_TEST_CASE(readerWriter) // we should now have a bunch of ciphertext in encframes BOOST_REQUIRE(encframes.size() == drains); for (auto const& c: encframes) + { BOOST_REQUIRE(c.size() == RLPXFrameWriter::MinFrameDequeLength); + } // read and assemble dequed encframes vector packets; From 5360ca1548b51b4a368608063b6b5e05b20b50b7 Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 20 Jun 2015 08:34:53 -0400 Subject: [PATCH 11/22] Initial reader test. Something broken with encframes vector of bytes. --- libp2p/RLPXFrameWriter.cpp | 5 ++++- libp2p/RLPXFrameWriter.h | 12 ++++++------ test/libp2p/rlpx.cpp | 21 +++++++++++++++++---- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index c8c1e8ceb..2c00256e4 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -25,6 +25,9 @@ using namespace std; using namespace dev; using namespace dev::p2p; +const uint16_t RLPXFrameWriter::EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC +const uint16_t RLPXFrameWriter::MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC + void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) { QueueState& qs = _priority ? m_q.first : m_q.second; @@ -113,7 +116,7 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector= 0); frameLen -= payload.size(); - o_toWrite.push_back(move(payload)); + o_toWrite.push_back(payload); payload.resize(0); if (!qs.remaining) diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index 3bbcb7a16..e4a4a1953 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -49,9 +49,9 @@ public: RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} /// Processes a single frame returning complete packets. - std::vector demux(RLPXFrameCoder& _coder, bytes& _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) + std::vector demux(RLPXFrameCoder& _coder, bytesRef _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) { - if (!_coder.authAndDecryptFrame(&_frame)) + if (!_coder.authAndDecryptFrame(_frame)) BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); std::vector ret; @@ -66,7 +66,7 @@ public: RLPXPacket& p = m_incomplete.at(_seq).first; if (_frame.size() > remaining) return ret; - else if(p.streamIn(&_frame)) + else if(p.streamIn(_frame)) { ret.push_back(std::move(p)); m_incomplete.erase(_seq); @@ -79,7 +79,7 @@ public: m_incomplete.erase(_seq); } - bytesConstRef buffer(&_frame); + bytesConstRef buffer(_frame); while (!buffer.empty()) { auto type = RLPXPacket::nextRLP(buffer); @@ -125,8 +125,8 @@ class RLPXFrameWriter public: enum PacketPriority { PriorityLow = 0, PriorityHigh }; - static const uint16_t EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC - static const uint16_t MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC + static const uint16_t EmptyFrameLength; + static const uint16_t MinFrameDequeLength; RLPXFrameWriter(uint16_t _protocolType): m_protocolType(_protocolType) {} RLPXFrameWriter(RLPXFrameWriter const& _s): m_protocolType(_s.m_protocolType) {} diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index f2c602554..da582b988 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -497,15 +497,28 @@ BOOST_AUTO_TEST_CASE(readerWriter) BOOST_REQUIRE(encframes.size() == drains); for (auto const& c: encframes) { - BOOST_REQUIRE(c.size() == RLPXFrameWriter::MinFrameDequeLength); + BOOST_REQUIRE_EQUAL(c.size(), RLPXFrameWriter::MinFrameDequeLength); } // read and assemble dequed encframes vector packets; RLPXFrameReader r(0); -// for (auto const& b: encframes) -// packets.push_back(r.demux()); - + for (auto i = 0; i < encframes.size(); i++) + { + auto size = encframes[i].size(); + auto p = encframes[i].data(); + bytesRef frameWithHeader(encframes[i].data(), encframes[i].size()); + bytesRef h = frameWithHeader.cropped(0, 16); + bool decryptedHeader = coder.authAndDecryptHeader(h); + BOOST_REQUIRE(decryptedHeader); + bytesRef frame = frameWithHeader.cropped(16); + auto packets = r.demux(coder, frame); + if (packets.size()) + packets += move(packets); + } + BOOST_REQUIRE_EQUAL(packets.size(), 1); + BOOST_REQUIRE_EQUAL(packets.front().size(), payload.size()); + BOOST_REQUIRE_EQUAL(sha3(packets.front().data()), sha3(payload)); } BOOST_AUTO_TEST_SUITE_END() From c145d79c4f50fd65020056524683773c7ad5bb7e Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 27 Jun 2015 07:00:43 -0400 Subject: [PATCH 12/22] Update/fix test for builds. --- test/libp2p/rlpx.cpp | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index da582b988..837bc06bb 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -503,18 +503,19 @@ BOOST_AUTO_TEST_CASE(readerWriter) // read and assemble dequed encframes vector packets; RLPXFrameReader r(0); - for (auto i = 0; i < encframes.size(); i++) + for (size_t i = 0; i < encframes.size(); i++) { - auto size = encframes[i].size(); - auto p = encframes[i].data(); - bytesRef frameWithHeader(encframes[i].data(), encframes[i].size()); - bytesRef h = frameWithHeader.cropped(0, 16); - bool decryptedHeader = coder.authAndDecryptHeader(h); - BOOST_REQUIRE(decryptedHeader); - bytesRef frame = frameWithHeader.cropped(16); - auto packets = r.demux(coder, frame); - if (packets.size()) - packets += move(packets); + // This fails )-: +// auto size = encframes[i].size(); +// auto p = encframes[i].data(); +// bytesRef frameWithHeader(encframes[i].data(), encframes[i].size()); +// bytesRef h = frameWithHeader.cropped(0, 16); +// bool decryptedHeader = coder.authAndDecryptHeader(h); +// BOOST_REQUIRE(decryptedHeader); +// bytesRef frame = frameWithHeader.cropped(16); +// auto packets = r.demux(coder, frame); +// if (packets.size()) +// packets += move(packets); } BOOST_REQUIRE_EQUAL(packets.size(), 1); BOOST_REQUIRE_EQUAL(packets.front().size(), payload.size()); From bcca52c443cec3d9dff214066b4fcf892d07900b Mon Sep 17 00:00:00 2001 From: subtly Date: Sun, 5 Jul 2015 00:44:44 -0700 Subject: [PATCH 13/22] mux/demux working test --- libp2p/RLPXFrameWriter.h | 34 ++++++++++++++++++++-------------- libp2p/RLPXPacket.h | 5 +++-- test/libp2p/rlpx.cpp | 38 ++++++++++++++++++-------------------- 3 files changed, 41 insertions(+), 36 deletions(-) diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index e4a4a1953..a0e2107fb 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -55,31 +55,36 @@ public: BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); std::vector ret; - if (!_frame.size() || _frame.size() > _totalSize) + if (!_sequence && (!_frame.size() || _frame.size() > _totalSize)) return ret; - + + // trim mac + bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size); + // continue populating incomplete packets if (_sequence && m_incomplete.count(_seq)) { uint32_t& remaining = m_incomplete.at(_seq).second; - if (!_totalSize && _frame.size() <= remaining) + if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining) { + remaining -= buffer.size(); + RLPXPacket& p = m_incomplete.at(_seq).first; - if (_frame.size() > remaining) - return ret; - else if(p.streamIn(_frame)) - { + if(p.streamIn(buffer) && !remaining) ret.push_back(std::move(p)); + if (!remaining) m_incomplete.erase(_seq); - } - else - remaining -= _frame.size(); + + if (!ret.empty() && remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + else if (ret.empty() && !remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + return ret; } else m_incomplete.erase(_seq); } - - bytesConstRef buffer(_frame); + while (!buffer.empty()) { auto type = RLPXPacket::nextRLP(buffer); @@ -96,14 +101,15 @@ public: if (p.isValid()) ret.push_back(std::move(p)); else if (_sequence) - m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - _frame.size()))); + // ugh: need to make total-size inclusive of packet-type :/ + m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - packet.size()))); } return ret; } protected: uint16_t m_protocolType; - std::map> m_incomplete; ///< Incomplete packets and bytes remaining. + std::map> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining. }; /** diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index 1421c74f7..e72220439 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -21,6 +21,7 @@ #pragma once +#include #include "Common.h" namespace dev @@ -37,7 +38,7 @@ struct RLPXInvalidPacket: virtual dev::Exception {}; class RLPXPacket { public: - static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); auto s = r.actualSize(); if (s >= _b.size()) return _b.cropped(s); } catch(...) {} return bytesConstRef(); } + static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); } /// Construct complete packet. RLPStream data is moved. RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } @@ -59,7 +60,7 @@ public: protected: - unsigned getType(bytesConstRef _rlp) { return RLP(_rlp.cropped(1)).toInt(); } + unsigned getType(bytesConstRef _rlp) { return RLP(_rlp.cropped(0, 1)).toInt(); } unsigned m_cap; unsigned m_type; diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index 837bc06bb..6c835f38b 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -461,7 +461,7 @@ BOOST_AUTO_TEST_CASE(readerWriter) h256 remoteNonce = Nonce::get(); bytes ackCipher{0}; bytes authCipher{1}; - RLPXFrameCoder coder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); + RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); /// Test writing a 64byte packet and drain with minimum frame size that /// forces packet to be pieced into 4 frames. @@ -480,46 +480,44 @@ BOOST_AUTO_TEST_CASE(readerWriter) BOOST_REQUIRE_EQUAL(4, drains); RLPXFrameWriter w(0); - w.enque(0, (RLPStream() << payload)); + RLPStream rlpPayload(RLPStream() << payload); + w.enque(0, rlpPayload); vector encframes; for (unsigned i = 1; i < drains; i++) { - auto n = w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes); + auto n = w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes); BOOST_REQUIRE_EQUAL(0, n); BOOST_REQUIRE_EQUAL(encframes.size(), i); } - BOOST_REQUIRE_EQUAL(1, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes)); + BOOST_REQUIRE_EQUAL(1, w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes)); BOOST_REQUIRE_EQUAL(encframes.size(), drains); - BOOST_REQUIRE_EQUAL(0, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes)); + BOOST_REQUIRE_EQUAL(0, w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes)); BOOST_REQUIRE_EQUAL(encframes.size(), drains); // we should now have a bunch of ciphertext in encframes BOOST_REQUIRE(encframes.size() == drains); for (auto const& c: encframes) - { BOOST_REQUIRE_EQUAL(c.size(), RLPXFrameWriter::MinFrameDequeLength); - } // read and assemble dequed encframes + RLPXFrameCoder decoder(false, localEph.pubkey(), localNonce, remoteEph, remoteNonce, &ackCipher, &authCipher); vector packets; RLPXFrameReader r(0); for (size_t i = 0; i < encframes.size(); i++) { - // This fails )-: -// auto size = encframes[i].size(); -// auto p = encframes[i].data(); -// bytesRef frameWithHeader(encframes[i].data(), encframes[i].size()); -// bytesRef h = frameWithHeader.cropped(0, 16); -// bool decryptedHeader = coder.authAndDecryptHeader(h); -// BOOST_REQUIRE(decryptedHeader); -// bytesRef frame = frameWithHeader.cropped(16); -// auto packets = r.demux(coder, frame); -// if (packets.size()) -// packets += move(packets); + bytesRef frameWithHeader(encframes[i].data(), encframes[i].size()); + bytesRef header = frameWithHeader.cropped(0, h256::size); + bool decryptedHeader = decoder.authAndDecryptHeader(header); + BOOST_REQUIRE(decryptedHeader); + bytesRef frame = frameWithHeader.cropped(h256::size); + RLPXFrameInfo f(header); + auto p = f.hasSequence ? r.demux(decoder, frame, true, f.sequenceId, f.totalLength) : r.demux(decoder, frame); + if (p.size()) + packets += move(p); } BOOST_REQUIRE_EQUAL(packets.size(), 1); - BOOST_REQUIRE_EQUAL(packets.front().size(), payload.size()); - BOOST_REQUIRE_EQUAL(sha3(packets.front().data()), sha3(payload)); + BOOST_REQUIRE_EQUAL(packets.front().size(), rlpPayload.out().size()); + BOOST_REQUIRE_EQUAL(sha3(RLP(packets.front().data()).payload()), sha3(payload)); } BOOST_AUTO_TEST_SUITE_END() From 52228e5ad6233f315b47264ae644ef4ab6e75a4c Mon Sep 17 00:00:00 2001 From: subtly Date: Sun, 5 Jul 2015 00:53:24 -0700 Subject: [PATCH 14/22] enum renamed --- libp2p/RLPXPacket.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index e72220439..7f6818770 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -52,7 +52,7 @@ public: unsigned type() const { return m_type; } bytes const& data() const { return m_data; } - size_t size() const { try { return RLP(m_data, RLP::LaisezFaire).actualSize(); } catch(...) { return 0; } } + size_t size() const { try { return RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } } bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } From 639b4fb13c1d787217e18c5aa3810de8659d361e Mon Sep 17 00:00:00 2001 From: subtly Date: Sun, 5 Jul 2015 18:56:06 -0700 Subject: [PATCH 15/22] Cleanup. Interface for enqueuing packets. Fix: Include packet type in total size. --- libp2p/RLPXFrameCoder.h | 30 ++++++---- libp2p/RLPXFrameReader.h | 112 +++++++++++++++++++++++++++++++++++++ libp2p/RLPXFrameWriter.cpp | 31 ++++++---- libp2p/RLPXFrameWriter.h | 87 ++-------------------------- libp2p/RLPXPacket.h | 37 ++++++------ test/libp2p/rlpx.cpp | 8 ++- 6 files changed, 180 insertions(+), 125 deletions(-) create mode 100644 libp2p/RLPXFrameReader.h diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index f4c13758b..4d06fd529 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -33,18 +33,24 @@ namespace dev namespace p2p { +struct RLPXFrameDecrytFailed: virtual dev::Exception {}; + +/** + * @brief Encapsulation of Frame + * @todo coder integration; padding derived from coder + */ struct RLPXFrameInfo { RLPXFrameInfo() = default; /// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]] RLPXFrameInfo(bytesConstRef _frameHeader); - uint32_t length = 0; ///< Max: 2**24 - uint8_t padding = 0; + uint32_t length = 0; ///< Size of frame (excludes padding). Max: 2**24 + uint8_t padding = 0; ///< Length of padding which follows @length. - uint16_t protocolId = 0; - bool hasSequence = false; - uint16_t sequenceId = 0; - uint32_t totalLength = 0; + uint16_t protocolId = 0; ///< Protocol ID as negotiated by handshake. + bool hasSequence = false; ///< If this frame is part of a sequence + uint16_t sequenceId = 0; ///< Sequence ID of frame + uint32_t totalLength = 0; ///< Set to }; class RLPXHandshake; @@ -52,8 +58,12 @@ class RLPXHandshake; /** * @brief Encoder/decoder transport for RLPx connection established by RLPXHandshake. * + * @todo rename to RLPXTranscoder + * @todo Remove 'Frame' nomenclature and expect caller to provide RLPXFrame + * @todo Remove handshake as friend, remove handshake-based constructor + * * Thread Safety - * Distinct Objects: Safe. + * Distinct Objects: Unsafe. * Shared objects: Unsafe. */ class RLPXFrameCoder @@ -69,7 +79,7 @@ public: ~RLPXFrameCoder() {} - /// Establish shared secrets and setup AES and MAC states. Used by both constructors. + /// Establish shared secrets and setup AES and MAC states. void setup(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ephemeral, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); /// Write single-frame payload of packet(s). @@ -78,10 +88,10 @@ public: /// Write continuation frame of segmented payload. void writeFrame(uint16_t _protocolType, uint16_t _seqId, bytesConstRef _payload, bytes& o_bytes); - /// Write first frame of segmented payload. + /// Write first frame of segmented or sequence-tagged payload. void writeFrame(uint16_t _protocolType, uint16_t _seqId, uint32_t _totalSize, bytesConstRef _payload, bytes& o_bytes); - /// Encrypt _packet as RLPx frame. + /// Legacy. Encrypt _packet as ill-defined legacy RLPx frame. void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes); /// Authenticate and decrypt header in-place. diff --git a/libp2p/RLPXFrameReader.h b/libp2p/RLPXFrameReader.h new file mode 100644 index 000000000..40f59c960 --- /dev/null +++ b/libp2p/RLPXFrameReader.h @@ -0,0 +1,112 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXFrameReader.h + * @author Alex Leverington + * @date 2015 + */ + + +#pragma once + +#include +#include "RLPXFrameCoder.h" +#include "RLPXPacket.h" +namespace ba = boost::asio; +namespace bi = boost::asio::ip; + +namespace dev +{ +namespace p2p +{ + +/** + * RLPFrameReader + * Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally + * buffers incomplete packets which are pieced into multiple frames (has sequence). + */ +class RLPXFrameReader +{ +public: + RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} + + /// Processes a single frame returning complete packets. + std::vector demux(RLPXFrameCoder& _coder, bytesRef _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) + { + if (!_coder.authAndDecryptFrame(_frame)) + BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); + + std::vector ret; + if (!_sequence && (!_frame.size() || _frame.size() > _totalSize)) + return ret; + + // trim mac + bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size); + // continue populating incomplete packets + if (_sequence && m_incomplete.count(_seq)) + { + uint32_t& remaining = m_incomplete.at(_seq).second; + if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining) + { + remaining -= buffer.size(); + + RLPXPacket& p = m_incomplete.at(_seq).first; + if(p.append(buffer) && !remaining) + ret.push_back(std::move(p)); + if (!remaining) + m_incomplete.erase(_seq); + + if (!ret.empty() && remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + else if (ret.empty() && !remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + + return ret; + } + else + m_incomplete.erase(_seq); + } + + while (!buffer.empty()) + { + auto type = nextRLP(buffer); + if (type.empty()) + break; + buffer = buffer.cropped(type.size()); + // consume entire buffer if packet has sequence + auto packet = _sequence ? buffer : nextRLP(buffer); + buffer = buffer.cropped(packet.size()); + RLPXPacket p(m_protocolType, type); + if (!packet.empty()) + p.append(packet); + + uint32_t remaining = _totalSize - type.size() - packet.size(); + if (p.isValid()) + ret.push_back(std::move(p)); + else if (_sequence && remaining) + m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), remaining))); + // else drop invalid packet + } + return ret; + } + +protected: + uint16_t m_protocolType; + std::map> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining. +}; + +} +} \ No newline at end of file diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 2c00256e4..8ff12b444 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -28,11 +28,18 @@ using namespace dev::p2p; const uint16_t RLPXFrameWriter::EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC const uint16_t RLPXFrameWriter::MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC -void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) +void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority) { + if (!_p.isValid()) + return; QueueState& qs = _priority ? m_q.first : m_q.second; DEV_GUARDED(qs.x) - qs.q.push_back(move(RLPXPacket(m_protocolType, _packetType, _payload))); + qs.q.push_back(move(_p)); +} + +void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) +{ + enque(RLPXPacket(m_protocolType, (RLPStream() << _packetType), _payload), _priority); } size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) @@ -70,13 +77,11 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vectorsize(); - bytes packetType = rlp(qs.writing->type()); - qs.sequenced = qs.remaining + packetType.size() > frameAllot; + qs.sequenced = qs.writing->size() > frameAllot; - // stop here if we can't write-out packet-type - // or payload already packed and packet won't fit - if (packetType.size() > frameAllot || (qs.sequenced && payload.size())) + // break here if we can't write-out packet-type + // or payload is packed and next packet won't fit (implicit) + if (qs.writing->type().size() > frameAllot || (qs.sequenced && !payload.empty())) { qs.writing = nullptr; qs.remaining = 0; @@ -85,14 +90,16 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vectortype().size(); + payload += qs.writing->type(); + + qs.remaining = qs.writing->data().size(); } assert(qs.sequenced || (!qs.sequenced && frameAllot >= qs.remaining)); if (frameAllot && qs.remaining) { - offset = qs.writing->size() - qs.remaining; + offset = qs.writing->data().size() - qs.remaining; length = qs.remaining <= frameAllot ? qs.remaining : frameAllot; bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes(); qs.remaining -= length; diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index a0e2107fb..96a342de9 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -33,88 +33,11 @@ namespace dev namespace p2p { -struct RLPXFrameDecrytFailed: virtual dev::Exception {}; - -/** - * RLPFrameReader - * Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally - * buffers incomplete packets which are pieced into multiple frames (has sequence). - * @todo drop frame and reset incomplete if - * @todo percolate sequenceid to p2p protocol - * @todo informative exception - */ -class RLPXFrameReader -{ -public: - RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} - - /// Processes a single frame returning complete packets. - std::vector demux(RLPXFrameCoder& _coder, bytesRef _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) - { - if (!_coder.authAndDecryptFrame(_frame)) - BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); - - std::vector ret; - if (!_sequence && (!_frame.size() || _frame.size() > _totalSize)) - return ret; - - // trim mac - bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size); - // continue populating incomplete packets - if (_sequence && m_incomplete.count(_seq)) - { - uint32_t& remaining = m_incomplete.at(_seq).second; - if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining) - { - remaining -= buffer.size(); - - RLPXPacket& p = m_incomplete.at(_seq).first; - if(p.streamIn(buffer) && !remaining) - ret.push_back(std::move(p)); - if (!remaining) - m_incomplete.erase(_seq); - - if (!ret.empty() && remaining) - BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); - else if (ret.empty() && !remaining) - BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); - - return ret; - } - else - m_incomplete.erase(_seq); - } - - while (!buffer.empty()) - { - auto type = RLPXPacket::nextRLP(buffer); - if (type.empty()) - break; - buffer = buffer.cropped(type.size()); - // consume entire buffer if packet has sequence - auto packet = _sequence ? buffer : RLPXPacket::nextRLP(buffer); - buffer = buffer.cropped(packet.size()); - RLPXPacket p(m_protocolType, type); - if (!packet.empty()) - p.streamIn(packet); - - if (p.isValid()) - ret.push_back(std::move(p)); - else if (_sequence) - // ugh: need to make total-size inclusive of packet-type :/ - m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - packet.size()))); - } - return ret; - } - -protected: - uint16_t m_protocolType; - std::map> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining. -}; - /** * RLPXFrameWriter - * Multiplex and encrypted packets into RLPX frames. + * Multiplex packets into encrypted RLPX frames. + * @todo throw when enqueued packet is invalid + * @todo use RLPXFrameInfo * @todo flag to disable multiple packets per frame */ class RLPXFrameWriter @@ -139,9 +62,11 @@ public: size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } - /// Contents of _payload will be moved. Adds packet to queue, to be muxed into frames by mux when network buffer is ready for writing. Thread-safe. + /// Adds @_payload to queue (moves @_payload), to be muxed into frames by mux when network buffer is ready for writing. Thread-safe. void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); + void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow); + /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index 7f6818770..f5d431be4 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -29,42 +29,39 @@ namespace dev namespace p2p { -struct RLPXNullPacket: virtual dev::Exception {}; struct RLPXInvalidPacket: virtual dev::Exception {}; +static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); } + /** * RLPX Packet */ class RLPXPacket { public: - static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); } - - /// Construct complete packet. RLPStream data is moved. - RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } + /// Construct packet. RLPStream data is invalidated. + RLPXPacket(uint8_t _capId, RLPStream& _type, RLPStream& _data): m_cap(_capId), m_type(std::move(_type.out())), m_data(std::move(_data.out())) {} - /// Construct packet with type and initial bytes; type is determined by RLP of 1st byte and packet may be incomplete. - RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(getType(_in)) { assert(_in.size()); if (_in.size() > 1) { m_data.resize(_in.size() - 1); _in.cropped(1).copyTo(&m_data); } } + /// Construct packet from single bytestream. RLPStream data is invalidated. + RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(nextRLP(_in).toBytes()) { if (_in.size() > m_type.size()) { m_data.resize(_in.size() - m_type.size()); _in.cropped(m_type.size()).copyTo(&m_data); } } RLPXPacket(RLPXPacket const& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {} - - RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(std::move(_p.m_data)) {} - - unsigned type() const { return m_type; } + RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(std::move(_p.m_type)), m_data(std::move(_p.m_data)) {} + + bytes const& type() const { return m_type; } + bytes const& data() const { return m_data; } - size_t size() const { try { return RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } } + + size_t size() const { try { return RLP(m_type).actualSize() + RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } } - bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } + /// Appends byte data and returns if packet is valid. + bool append(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } - bool isValid() { if (m_type > 0x7f) return false; try { return RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; } + virtual bool isValid() const noexcept { try { return !(m_type.empty() && m_data.empty()) && RLP(m_type).actualSize() == m_type.size() && RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; } - protected: - unsigned getType(bytesConstRef _rlp) { return RLP(_rlp.cropped(0, 1)).toInt(); } - - unsigned m_cap; - unsigned m_type; - uint16_t m_sequence = 0; + uint8_t m_cap; + bytes m_type; bytes m_data; }; diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index a2d4d78f4..af377a5a5 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -32,6 +32,7 @@ #include #include #include +#include using namespace std; using namespace dev; @@ -481,7 +482,9 @@ BOOST_AUTO_TEST_CASE(readerWriter) RLPXFrameWriter w(0); RLPStream rlpPayload(RLPStream() << payload); - w.enque(0, rlpPayload); + uint8_t packetType = 0; + bytes packetTypeRLP = (RLPStream() << packetType).out(); + w.enque(packetType, rlpPayload); vector encframes; for (unsigned i = 1; i < drains; i++) { @@ -516,8 +519,9 @@ BOOST_AUTO_TEST_CASE(readerWriter) packets += move(p); } BOOST_REQUIRE_EQUAL(packets.size(), 1); - BOOST_REQUIRE_EQUAL(packets.front().size(), rlpPayload.out().size()); + BOOST_REQUIRE_EQUAL(packets.front().size(), packetTypeRLP.size() + rlpPayload.out().size()); BOOST_REQUIRE_EQUAL(sha3(RLP(packets.front().data()).payload()), sha3(payload)); + BOOST_REQUIRE_EQUAL(sha3(packets.front().type()), sha3(packetTypeRLP)); } BOOST_AUTO_TEST_SUITE_END() From 78367f8352406c8a023921963b9cf834dc0c6789 Mon Sep 17 00:00:00 2001 From: subtly Date: Sun, 5 Jul 2015 18:56:59 -0700 Subject: [PATCH 16/22] windows compile fix --- test/libp2p/rlpx.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index af377a5a5..79c635487 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -500,7 +500,9 @@ BOOST_AUTO_TEST_CASE(readerWriter) // we should now have a bunch of ciphertext in encframes BOOST_REQUIRE(encframes.size() == drains); for (auto const& c: encframes) + { BOOST_REQUIRE_EQUAL(c.size(), RLPXFrameWriter::MinFrameDequeLength); + } // read and assemble dequed encframes RLPXFrameCoder decoder(false, localEph.pubkey(), localNonce, remoteEph, remoteNonce, &ackCipher, &authCipher); From bd2d99f169753ed8b546b1022ce724ed7ffc1f5f Mon Sep 17 00:00:00 2001 From: subtly Date: Mon, 6 Jul 2015 16:52:43 -0700 Subject: [PATCH 17/22] Fixup and document frame writer. Add test for coalesced packets. --- libp2p/RLPXFrameCoder.cpp | 10 +++--- libp2p/RLPXFrameWriter.cpp | 65 ++++++++++++++++++++++++-------------- libp2p/RLPXFrameWriter.h | 27 ++++++++++------ test/libp2p/rlpx.cpp | 52 ++++++++++++++++++++++++++++-- 4 files changed, 114 insertions(+), 40 deletions(-) diff --git a/libp2p/RLPXFrameCoder.cpp b/libp2p/RLPXFrameCoder.cpp index c53c5f508..2019b5c02 100644 --- a/libp2p/RLPXFrameCoder.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -56,7 +56,7 @@ void RLPXFrameCoder::setup(bool _originated, h512 _remoteEphemeral, h256 _remote { bytes keyMaterialBytes(64); bytesRef keyMaterial(&keyMaterialBytes); - + // shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || initiator-nonce)) Secret ephemeralShared; _ecdhe.agree(_remoteEphemeral, ephemeralShared); @@ -81,13 +81,13 @@ void RLPXFrameCoder::setup(bool _originated, h512 _remoteEphemeral, h256 _remote h128 iv; m_frameEnc.SetKeyWithIV(m_frameEncKey, h256::size, iv.data()); m_frameDec.SetKeyWithIV(m_frameDecKey, h256::size, iv.data()); - + // mac-secret = sha3(ecdhe-shared-secret || aes-secret) sha3(keyMaterial, outRef); // output mac-secret m_macEncKey.resize(h256::size); memcpy(m_macEncKey.data(), outRef.data(), h256::size); m_macEnc.SetKey(m_macEncKey, h256::size); - + // Initiator egress-mac: sha3(mac-secret^recipient-nonce || auth-sent-init) // ingress-mac: sha3(mac-secret^initiator-nonce || auth-recvd-ack) // Recipient egress-mac: sha3(mac-secret^initiator-nonce || auth-sent-ack) @@ -99,7 +99,7 @@ void RLPXFrameCoder::setup(bool _originated, h512 _remoteEphemeral, h256 _remote keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size()); egressCipher.copyTo(keyMaterial.cropped(h256::size, egressCipher.size())); m_egressMac.Update(keyMaterial.data(), keyMaterial.size()); - + // recover mac-secret by re-xoring remoteNonce (*(h256*)keyMaterial.data() ^ _remoteNonce ^ _nonce).ref().copyTo(keyMaterial); bytesConstRef ingressCipher = _originated ? _ackCipher : _authCipher; @@ -144,7 +144,7 @@ void RLPXFrameCoder::writeFrame(RLPStream const& _header, bytesConstRef _payload m_frameEnc.ProcessData(headerWithMac.data(), headerWithMac.data(), 16); updateEgressMACWithHeader(bytesConstRef(&headerWithMac).cropped(0, 16)); egressDigest().ref().copyTo(bytesRef(&headerWithMac).cropped(h128::size,h128::size)); - + auto padding = (16 - (_payload.size() % 16)) % 16; o_bytes.swap(headerWithMac); o_bytes.resize(32 + _payload.size() + padding + h128::size); diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 8ff12b444..63d9c8330 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -32,12 +32,12 @@ void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority) { if (!_p.isValid()) return; - QueueState& qs = _priority ? m_q.first : m_q.second; + WriterState& qs = _priority ? m_q.first : m_q.second; DEV_GUARDED(qs.x) qs.q.push_back(move(_p)); } -void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) +void RLPXFrameWriter::enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority) { enque(RLPXPacket(m_protocolType, (RLPStream() << _packetType), _payload), _priority); } @@ -63,32 +63,45 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector low, otherwise low > high bool high = highPending && !swapQueues ? true : lowPending ? false : true; - QueueState &qs = high ? m_q.first : m_q.second; + WriterState &qs = high ? m_q.first : m_q.second; size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; size_t offset = 0; size_t length = 0; while (frameAllot >= c_blockSize) { + // Invariants: + // !qs.writing && payload.empty() initial entry + // !qs.writing && !payload.empty() continuation (multiple packets per frame) + // qs.writing && payload.empty() initial entry, continuation (multiple frames per packet) + // qs.writing && !payload.empty() INVALID + + // write packet-type if (qs.writing == nullptr) { - DEV_GUARDED(qs.x) - qs.writing = &qs.q[0]; - qs.sequenced = qs.writing->size() > frameAllot; + { + Guard l(qs.x); + if (!qs.q.empty()) + qs.writing = &qs.q[0]; + else + break; + } // break here if we can't write-out packet-type // or payload is packed and next packet won't fit (implicit) - if (qs.writing->type().size() > frameAllot || (qs.sequenced && !payload.empty())) + qs.multiFrame = qs.writing->size() > frameAllot; + assert(qs.writing->type().size()); + if (qs.writing->type().size() > frameAllot || (qs.multiFrame && !payload.empty())) { qs.writing = nullptr; qs.remaining = 0; - qs.sequenced = false; + qs.multiFrame = false; break; } - else if (qs.sequenced) + else if (qs.multiFrame) qs.sequence = ++m_sequenceId; frameAllot -= qs.writing->type().size(); @@ -96,7 +109,9 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vectordata().size(); } - assert(qs.sequenced || (!qs.sequenced && frameAllot >= qs.remaining)); + + // write payload w/remaining allotment + assert(qs.multiFrame || (!qs.multiFrame && frameAllot >= qs.remaining)); if (frameAllot && qs.remaining) { offset = qs.writing->data().size() - qs.remaining; @@ -106,16 +121,25 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector 0 || !qs.multiFrame)) || (qs.remaining && qs.multiFrame)); + if (!qs.remaining) + { qs.writing = nullptr; - if (qs.sequenced) + DEV_GUARDED(qs.x) + qs.q.pop_front(); + ret++; + } + // qs.writing is left alone for first frame of multi-frame packet + if (qs.multiFrame) break; } - if (payload.size()) + if (!payload.empty()) { - if (qs.sequenced) + if (qs.multiFrame) if (offset == 0) + // 1st frame of segmented packet writes total-size of packet _coder.writeFrame(m_protocolType, qs.sequence, qs.writing->size(), &payload, payload); else _coder.writeFrame(m_protocolType, qs.sequence, &payload, payload); @@ -126,13 +150,8 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector q; + mutable Mutex x; + RLPXPacket* writing = nullptr; size_t remaining = 0; - bool sequenced = false; + bool multiFrame = false; uint16_t sequence; - mutable Mutex x; }; public: @@ -60,11 +65,13 @@ public: RLPXFrameWriter(uint16_t _protocolType): m_protocolType(_protocolType) {} RLPXFrameWriter(RLPXFrameWriter const& _s): m_protocolType(_s.m_protocolType) {} + /// Returns total number of queued packets. Thread-safe. size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } - /// Adds @_payload to queue (moves @_payload), to be muxed into frames by mux when network buffer is ready for writing. Thread-safe. - void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); - + /// Moves @_payload output to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe. + void enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); + + /// Moves @_p to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe. void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow); /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. @@ -72,7 +79,7 @@ public: private: uint16_t const m_protocolType; // Protocol Type - std::pair m_q; // High, Low frame queues + std::pair m_q; // High, Low frame queues uint16_t m_sequenceId = 0; // Sequence ID }; diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index 79c635487..9ac35b344 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -454,7 +454,7 @@ BOOST_AUTO_TEST_CASE(ecies_interop_test_primitives) BOOST_REQUIRE(plainTest3 == expectedPlain3); } -BOOST_AUTO_TEST_CASE(readerWriter) +BOOST_AUTO_TEST_CASE(segmentedPacket) { ECDHE localEph; h256 localNonce = Nonce::get(); @@ -464,11 +464,14 @@ BOOST_AUTO_TEST_CASE(readerWriter) bytes authCipher{1}; RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); - /// Test writing a 64byte packet and drain with minimum frame size that + /// Test writing a 64byte RLPStream and drain with frame size that /// forces packet to be pieced into 4 frames. /// (Minimum frame size has room for 16 bytes of payload) // 64-byte payload minus 3 bytes for packet-type and RLP overhead. + // Note: mux() is called with RLPXFrameWriter::MinFrameDequeLength + // which is equal to 64byte, however, after overhead this means + // there are only 16 bytes of payload which will be dequed. auto dequeLen = 16; bytes stuff = sha3("A").asBytes(); bytes payload; @@ -526,5 +529,50 @@ BOOST_AUTO_TEST_CASE(readerWriter) BOOST_REQUIRE_EQUAL(sha3(packets.front().type()), sha3(packetTypeRLP)); } +BOOST_AUTO_TEST_CASE(coalescedPackets) +{ + ECDHE localEph; + h256 localNonce = Nonce::get(); + ECDHE remoteEph; + h256 remoteNonce = Nonce::get(); + bytes ackCipher{0}; + bytes authCipher{1}; + RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); + + /// Test writing four 32 byte RLPStream packets such that + /// a single 1KB frame will incldue all four packets. + auto dequeLen = 1024; // sufficient enough for all packets + bytes initialStuff = sha3("A").asBytes(); + vector packets; + for (unsigned i = 0; i < 4; i++) + packets.push_back(sha3(initialStuff)); + + RLPXFrameWriter w(0); + uint8_t packetType = 127; + for (auto const& p: packets) + w.enque(packetType, (RLPStream() << p)); + + vector encframes; + BOOST_REQUIRE_EQUAL(4, w.mux(encoder, dequeLen, encframes)); + BOOST_REQUIRE_EQUAL(0, w.mux(encoder, dequeLen, encframes)); + BOOST_REQUIRE_EQUAL(1, encframes.size()); + auto expectedFrameSize = RLPXFrameWriter::EmptyFrameLength + packets.size() * (/*packet-type*/ 1 + h256::size + /*rlp-prefix*/ 1); + expectedFrameSize += ((16 - (expectedFrameSize % 16)) % 16); + BOOST_REQUIRE_EQUAL(expectedFrameSize, encframes[0].size()); +} + +BOOST_AUTO_TEST_CASE(singleFramePacket) +{ +} + +BOOST_AUTO_TEST_CASE(manyProtocols) +{ + +} + +BOOST_AUTO_TEST_CASE(allOfSingleSegmentedCoalescedWithManyProtocols) +{ +} + BOOST_AUTO_TEST_SUITE_END() From 99a97e05f048070ca91dc43be9a5726e7635910d Mon Sep 17 00:00:00 2001 From: subtly Date: Mon, 6 Jul 2015 19:26:10 -0700 Subject: [PATCH 18/22] Cleanup. More fixes. More tests. --- libp2p/RLPXFrameCoder.cpp | 4 +- libp2p/RLPXFrameCoder.h | 4 +- libp2p/RLPXFrameReader.cpp | 90 +++++++++++++++++++++++++++++++++++++ libp2p/RLPXFrameReader.h | 59 +----------------------- libp2p/RLPXFrameWriter.cpp | 2 +- libp2p/RLPXFrameWriter.h | 7 +-- test/libp2p/rlpx.cpp | 92 ++++++++++++++++++++++++++++++++------ 7 files changed, 179 insertions(+), 79 deletions(-) create mode 100644 libp2p/RLPXFrameReader.cpp diff --git a/libp2p/RLPXFrameCoder.cpp b/libp2p/RLPXFrameCoder.cpp index 2019b5c02..dd17cbebc 100644 --- a/libp2p/RLPXFrameCoder.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -37,8 +37,8 @@ RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header) RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall); auto itemCount = header.itemCount(); protocolId = header[0].toInt(); - hasSequence = itemCount > 1; - sequenceId = hasSequence ? header[1].toInt() : 0; + multiFrame = itemCount > 1; + sequenceId = multiFrame ? header[1].toInt() : 0; totalLength = itemCount == 3 ? header[2].toInt() : 0; } diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index 4d06fd529..9266da459 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -48,9 +48,9 @@ struct RLPXFrameInfo uint8_t padding = 0; ///< Length of padding which follows @length. uint16_t protocolId = 0; ///< Protocol ID as negotiated by handshake. - bool hasSequence = false; ///< If this frame is part of a sequence + bool multiFrame = false; ///< If this frame is part of a sequence uint16_t sequenceId = 0; ///< Sequence ID of frame - uint32_t totalLength = 0; ///< Set to + uint32_t totalLength = 0; ///< Set to total length of packet in first frame of multiframe packet }; class RLPXHandshake; diff --git a/libp2p/RLPXFrameReader.cpp b/libp2p/RLPXFrameReader.cpp new file mode 100644 index 000000000..b37ffa796 --- /dev/null +++ b/libp2p/RLPXFrameReader.cpp @@ -0,0 +1,90 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXFrameReader.cpp + * @author Alex Leverington + * @date 2015 + */ + +#include "RLPXFrameReader.h" + +using namespace std; +using namespace dev; +using namespace dev::p2p; + +std::vector RLPXFrameReader::demux(RLPXFrameCoder& _coder, RLPXFrameInfo const& _info, bytesRef _frame) +{ + if (!_coder.authAndDecryptFrame(_frame)) + BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); + + std::vector ret; + if (_frame.empty()) + // drop: bad frame (empty) + return ret; + if (_info.multiFrame && _info.totalLength && _frame.size() > _info.totalLength) + // drop: bad frame (too large) + return ret; + + // trim mac + bytesConstRef buffer = _frame.cropped(0, _frame.size() - (h128::size + _info.padding)); + // continue populating multiframe packets + if (_info.multiFrame && m_incomplete.count(_info.sequenceId)) + { + uint32_t& remaining = m_incomplete.at(_info.sequenceId).second; + if (!_info.totalLength && buffer.size() > 0 && buffer.size() <= remaining) + { + remaining -= buffer.size(); + + RLPXPacket& p = m_incomplete.at(_info.sequenceId).first; + if(p.append(buffer) && !remaining) + ret.push_back(std::move(p)); + if (!remaining) + m_incomplete.erase(_info.sequenceId); + + if (!ret.empty() && remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + else if (ret.empty() && !remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + + return ret; + } + else + m_incomplete.erase(_info.sequenceId); + } + + while (!buffer.empty()) + { + auto type = nextRLP(buffer); + if (type.empty()) + break; + buffer = buffer.cropped(type.size()); + // consume entire buffer if packet has sequence + auto packet = _info.multiFrame ? buffer : nextRLP(buffer); + buffer = buffer.cropped(packet.size()); + RLPXPacket p(m_protocolType, type); + if (!packet.empty()) + p.append(packet); + + uint32_t remaining = _info.totalLength - type.size() - packet.size(); + if (p.isValid()) + ret.push_back(std::move(p)); + else if (_info.multiFrame && remaining) + m_incomplete.insert(std::make_pair(_info.sequenceId, std::make_pair(std::move(p), remaining))); + else + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + } + return ret; +} diff --git a/libp2p/RLPXFrameReader.h b/libp2p/RLPXFrameReader.h index 40f59c960..20143de24 100644 --- a/libp2p/RLPXFrameReader.h +++ b/libp2p/RLPXFrameReader.h @@ -44,64 +44,7 @@ public: RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} /// Processes a single frame returning complete packets. - std::vector demux(RLPXFrameCoder& _coder, bytesRef _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) - { - if (!_coder.authAndDecryptFrame(_frame)) - BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); - - std::vector ret; - if (!_sequence && (!_frame.size() || _frame.size() > _totalSize)) - return ret; - - // trim mac - bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size); - // continue populating incomplete packets - if (_sequence && m_incomplete.count(_seq)) - { - uint32_t& remaining = m_incomplete.at(_seq).second; - if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining) - { - remaining -= buffer.size(); - - RLPXPacket& p = m_incomplete.at(_seq).first; - if(p.append(buffer) && !remaining) - ret.push_back(std::move(p)); - if (!remaining) - m_incomplete.erase(_seq); - - if (!ret.empty() && remaining) - BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); - else if (ret.empty() && !remaining) - BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); - - return ret; - } - else - m_incomplete.erase(_seq); - } - - while (!buffer.empty()) - { - auto type = nextRLP(buffer); - if (type.empty()) - break; - buffer = buffer.cropped(type.size()); - // consume entire buffer if packet has sequence - auto packet = _sequence ? buffer : nextRLP(buffer); - buffer = buffer.cropped(packet.size()); - RLPXPacket p(m_protocolType, type); - if (!packet.empty()) - p.append(packet); - - uint32_t remaining = _totalSize - type.size() - packet.size(); - if (p.isValid()) - ret.push_back(std::move(p)); - else if (_sequence && remaining) - m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), remaining))); - // else drop invalid packet - } - return ret; - } + std::vector demux(RLPXFrameCoder& _coder, RLPXFrameInfo const& _info, bytesRef _frame); protected: uint16_t m_protocolType; diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 63d9c8330..69f624d3f 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -50,7 +50,7 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector= c_overhead + c_blockSize) diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index 14a3239cb..36d902a11 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -71,12 +71,13 @@ public: /// Moves @_payload output to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe. void enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); - /// Moves @_p to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe. - void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow); - /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); +protected: + /// Moves @_p to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe. + void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow); + private: uint16_t const m_protocolType; // Protocol Type std::pair m_q; // High, Low frame queues diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index 9ac35b344..114ec4a3b 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -454,7 +454,7 @@ BOOST_AUTO_TEST_CASE(ecies_interop_test_primitives) BOOST_REQUIRE(plainTest3 == expectedPlain3); } -BOOST_AUTO_TEST_CASE(segmentedPacket) +BOOST_AUTO_TEST_CASE(segmentedPacketFlush) { ECDHE localEph; h256 localNonce = Nonce::get(); @@ -519,7 +519,7 @@ BOOST_AUTO_TEST_CASE(segmentedPacket) BOOST_REQUIRE(decryptedHeader); bytesRef frame = frameWithHeader.cropped(h256::size); RLPXFrameInfo f(header); - auto p = f.hasSequence ? r.demux(decoder, frame, true, f.sequenceId, f.totalLength) : r.demux(decoder, frame); + auto p = r.demux(decoder, f, frame); if (p.size()) packets += move(p); } @@ -529,7 +529,7 @@ BOOST_AUTO_TEST_CASE(segmentedPacket) BOOST_REQUIRE_EQUAL(sha3(packets.front().type()), sha3(packetTypeRLP)); } -BOOST_AUTO_TEST_CASE(coalescedPackets) +BOOST_AUTO_TEST_CASE(coalescedPacketsPadded) { ECDHE localEph; h256 localNonce = Nonce::get(); @@ -542,27 +542,97 @@ BOOST_AUTO_TEST_CASE(coalescedPackets) /// Test writing four 32 byte RLPStream packets such that /// a single 1KB frame will incldue all four packets. auto dequeLen = 1024; // sufficient enough for all packets - bytes initialStuff = sha3("A").asBytes(); - vector packets; + bytes stuff = sha3("A").asBytes(); + vector packetsOut; for (unsigned i = 0; i < 4; i++) - packets.push_back(sha3(initialStuff)); + packetsOut.push_back(stuff); RLPXFrameWriter w(0); uint8_t packetType = 127; - for (auto const& p: packets) + bytes packetTypeRLP((RLPStream() << packetType).out()); + for (auto const& p: packetsOut) w.enque(packetType, (RLPStream() << p)); vector encframes; BOOST_REQUIRE_EQUAL(4, w.mux(encoder, dequeLen, encframes)); BOOST_REQUIRE_EQUAL(0, w.mux(encoder, dequeLen, encframes)); BOOST_REQUIRE_EQUAL(1, encframes.size()); - auto expectedFrameSize = RLPXFrameWriter::EmptyFrameLength + packets.size() * (/*packet-type*/ 1 + h256::size + /*rlp-prefix*/ 1); + auto expectedFrameSize = RLPXFrameWriter::EmptyFrameLength + packetsOut.size() * (/*packet-type*/ 1 + h256::size + /*rlp-prefix*/ 1); expectedFrameSize += ((16 - (expectedFrameSize % 16)) % 16); BOOST_REQUIRE_EQUAL(expectedFrameSize, encframes[0].size()); + + // read and assemble dequed encframes + RLPXFrameCoder decoder(false, localEph.pubkey(), localNonce, remoteEph, remoteNonce, &ackCipher, &authCipher); + vector packets; + RLPXFrameReader r(0); + bytesRef frameWithHeader(encframes[0].data(), encframes[0].size()); + bytesRef header = frameWithHeader.cropped(0, h256::size); + bool decryptedHeader = decoder.authAndDecryptHeader(header); + BOOST_REQUIRE(decryptedHeader); + bytesRef frame = frameWithHeader.cropped(h256::size); + RLPXFrameInfo f(header); + BOOST_REQUIRE_EQUAL(f.multiFrame, false); + auto p = r.demux(decoder, f, frame); + packets += move(p); + + RLPStream rlpPayload; + rlpPayload << stuff; + BOOST_REQUIRE_EQUAL(packets.size(), 4); + while (!packets.empty()) + { + BOOST_REQUIRE_EQUAL(packets.back().size(), packetTypeRLP.size() + rlpPayload.out().size()); + BOOST_REQUIRE_EQUAL(sha3(RLP(packets.back().data()).payload()), sha3(stuff)); + BOOST_REQUIRE_EQUAL(sha3(packets.back().type()), sha3(packetTypeRLP)); + packets.pop_back(); + } } -BOOST_AUTO_TEST_CASE(singleFramePacket) +BOOST_AUTO_TEST_CASE(singleFramePacketFlush) { + ECDHE localEph; + h256 localNonce = Nonce::get(); + ECDHE remoteEph; + h256 remoteNonce = Nonce::get(); + bytes ackCipher{0}; + bytes authCipher{1}; + RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); + + /// Test writing four 32 byte RLPStream packets such that + /// a single 1KB frame will incldue all four packets. + bytes stuff = sha3("A").asBytes(); + RLPXFrameWriter w(0); + uint8_t packetType = 127; + bytes packetTypeRLP((RLPStream() << packetType).out()); + w.enque(packetType, (RLPStream() << stuff)); + + vector encframes; + auto dequeLen = RLPXFrameWriter::EmptyFrameLength + 34; + dequeLen += ((16 - (dequeLen % 16)) % 16); + BOOST_REQUIRE_EQUAL(1, w.mux(encoder, dequeLen, encframes)); + BOOST_REQUIRE_EQUAL(0, w.mux(encoder, dequeLen, encframes)); + BOOST_REQUIRE_EQUAL(1, encframes.size()); + BOOST_REQUIRE_EQUAL(dequeLen, encframes[0].size()); + + // read and assemble dequed encframes + RLPXFrameCoder decoder(false, localEph.pubkey(), localNonce, remoteEph, remoteNonce, &ackCipher, &authCipher); + vector packets; + RLPXFrameReader r(0); + bytesRef frameWithHeader(encframes[0].data(), encframes[0].size()); + bytesRef header = frameWithHeader.cropped(0, h256::size); + bool decryptedHeader = decoder.authAndDecryptHeader(header); + BOOST_REQUIRE(decryptedHeader); + bytesRef frame = frameWithHeader.cropped(h256::size); + RLPXFrameInfo f(header); + BOOST_REQUIRE_EQUAL(f.multiFrame, false); + auto p = r.demux(decoder, f, frame); + packets += move(p); + + RLPStream rlpPayload; + rlpPayload << stuff; + BOOST_REQUIRE_EQUAL(packets.size(), 1); + BOOST_REQUIRE_EQUAL(packets.back().size(), packetTypeRLP.size() + rlpPayload.out().size()); + BOOST_REQUIRE_EQUAL(sha3(RLP(packets.back().data()).payload()), sha3(stuff)); + BOOST_REQUIRE_EQUAL(sha3(packets.back().type()), sha3(packetTypeRLP)); } BOOST_AUTO_TEST_CASE(manyProtocols) @@ -570,9 +640,5 @@ BOOST_AUTO_TEST_CASE(manyProtocols) } -BOOST_AUTO_TEST_CASE(allOfSingleSegmentedCoalescedWithManyProtocols) -{ -} - BOOST_AUTO_TEST_SUITE_END() From 4f02a4fee8ba30fb1bbf131736e010f0562dd0c8 Mon Sep 17 00:00:00 2001 From: subtly Date: Fri, 10 Jul 2015 18:05:02 -0700 Subject: [PATCH 19/22] Const frame info. Typo. Const arguments. --- libp2p/RLPXFrameCoder.cpp | 25 ++++++++++++------------- libp2p/RLPXFrameCoder.h | 22 +++++++++++++--------- libp2p/RLPXFrameReader.cpp | 2 +- libp2p/Session.cpp | 18 ++++++++++++------ 4 files changed, 38 insertions(+), 29 deletions(-) diff --git a/libp2p/RLPXFrameCoder.cpp b/libp2p/RLPXFrameCoder.cpp index dd17cbebc..83dc62409 100644 --- a/libp2p/RLPXFrameCoder.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -30,29 +30,28 @@ using namespace dev; using namespace dev::p2p; using namespace CryptoPP; -RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header) -{ - length = (_header[0] * 256 + _header[1]) * 256 + _header[2]; - padding = ((16 - (length % 16)) % 16); - RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall); - auto itemCount = header.itemCount(); - protocolId = header[0].toInt(); - multiFrame = itemCount > 1; - sequenceId = multiFrame ? header[1].toInt() : 0; - totalLength = itemCount == 3 ? header[2].toInt() : 0; -} +RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header): + length((_header[0] * 256 + _header[1]) * 256 + _header[2]), + padding((16 - (length % 16)) % 16), + data(_header.cropped(3).toBytes()), + header(RLP(data, RLP::ThrowOnFail | RLP::FailIfTooSmall)), + protocolId(header[0].toInt()), + multiFrame(header.itemCount() > 1), + sequenceId(multiFrame ? header[1].toInt() : 0), + totalLength(header.itemCount() == 3 ? header[2].toInt() : 0) +{} RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) { setup(_init.m_originated, _init.m_remoteEphemeral, _init.m_remoteNonce, _init.m_ecdhe, _init.m_nonce, &_init.m_ackCipher, &_init.m_authCipher); } -RLPXFrameCoder::RLPXFrameCoder(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ecdhe, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher) +RLPXFrameCoder::RLPXFrameCoder(bool _originated, h512 const& _remoteEphemeral, h256 const& _remoteNonce, crypto::ECDHE const& _ecdhe, h256 const& _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher) { setup(_originated, _remoteEphemeral, _remoteNonce, _ecdhe, _nonce, _ackCipher, _authCipher); } -void RLPXFrameCoder::setup(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ecdhe, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher) +void RLPXFrameCoder::setup(bool _originated, h512 const& _remoteEphemeral, h256 const& _remoteNonce, crypto::ECDHE const& _ecdhe, h256 const& _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher) { bytes keyMaterialBytes(64); bytesRef keyMaterial(&keyMaterialBytes); diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index 9266da459..19c648cd5 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -33,7 +33,7 @@ namespace dev namespace p2p { -struct RLPXFrameDecrytFailed: virtual dev::Exception {}; +struct RLPXFrameDecryptFailed: virtual dev::Exception {}; /** * @brief Encapsulation of Frame @@ -44,13 +44,17 @@ struct RLPXFrameInfo RLPXFrameInfo() = default; /// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]] RLPXFrameInfo(bytesConstRef _frameHeader); - uint32_t length = 0; ///< Size of frame (excludes padding). Max: 2**24 - uint8_t padding = 0; ///< Length of padding which follows @length. + + uint32_t const length; ///< Size of frame (excludes padding). Max: 2**24 + uint8_t const padding; ///< Length of padding which follows @length. + + bytes const data; ///< Bytes of Header. + RLP const header; ///< Header RLP. - uint16_t protocolId = 0; ///< Protocol ID as negotiated by handshake. - bool multiFrame = false; ///< If this frame is part of a sequence - uint16_t sequenceId = 0; ///< Sequence ID of frame - uint32_t totalLength = 0; ///< Set to total length of packet in first frame of multiframe packet + uint16_t const protocolId; ///< Protocol ID as negotiated by handshake. + bool const multiFrame; ///< If this frame is part of a sequence + uint16_t const sequenceId; ///< Sequence ID of frame + uint32_t const totalLength; ///< Set to total length of packet in first frame of multiframe packet }; class RLPXHandshake; @@ -75,12 +79,12 @@ public: RLPXFrameCoder(RLPXHandshake const& _init); /// Construct with external key material. - RLPXFrameCoder(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ephemeral, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); + RLPXFrameCoder(bool _originated, h512 const& _remoteEphemeral, h256 const& _remoteNonce, crypto::ECDHE const& _ephemeral, h256 const& _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); ~RLPXFrameCoder() {} /// Establish shared secrets and setup AES and MAC states. - void setup(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ephemeral, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); + void setup(bool _originated, h512 const& _remoteEphemeral, h256 const& _remoteNonce, crypto::ECDHE const& _ephemeral, h256 const& _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); /// Write single-frame payload of packet(s). void writeFrame(uint16_t _protocolType, bytesConstRef _payload, bytes& o_bytes); diff --git a/libp2p/RLPXFrameReader.cpp b/libp2p/RLPXFrameReader.cpp index b37ffa796..68a329ad6 100644 --- a/libp2p/RLPXFrameReader.cpp +++ b/libp2p/RLPXFrameReader.cpp @@ -28,7 +28,7 @@ using namespace dev::p2p; std::vector RLPXFrameReader::demux(RLPXFrameCoder& _coder, RLPXFrameInfo const& _info, bytesRef _frame) { if (!_coder.authAndDecryptFrame(_frame)) - BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); + BOOST_THROW_EXCEPTION(RLPXFrameDecryptFailed()); std::vector ret; if (_frame.empty()) diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 9a981786d..0d90ea6ea 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -375,10 +375,16 @@ void Session::doRead() return; } - RLPXFrameInfo header; + + uint16_t hProtocolId; + uint32_t hLength; + uint8_t hPadding; try { - header = RLPXFrameInfo(bytesConstRef(m_data.data(), length)); + RLPXFrameInfo header(bytesConstRef(m_data.data(), length)); + hProtocolId = header.protocolId; + hLength = header.length; + hPadding = header.padding; } catch (std::exception const& _e) { @@ -388,8 +394,8 @@ void Session::doRead() } /// read padded frame and mac - auto tlen = header.length + header.padding + h128::size; - ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, header, tlen](boost::system::error_code ec, std::size_t length) + auto tlen = hLength + hPadding + h128::size; + ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, hLength, hProtocolId, tlen](boost::system::error_code ec, std::size_t length) { ThreadContext tc(info().id.abridged()); ThreadContext tc2(info().clientVersion); @@ -402,7 +408,7 @@ void Session::doRead() return; } - bytesConstRef frame(m_data.data(), header.length); + bytesConstRef frame(m_data.data(), hLength); if (!checkPacket(frame)) { cerr << "Received " << frame.size() << ": " << toHex(frame) << endl; @@ -414,7 +420,7 @@ void Session::doRead() { auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt(); RLP r(frame.cropped(1)); - if (!readPacket(header.protocolId, packetType, r)) + if (!readPacket(hProtocolId, packetType, r)) clog(NetWarn) << "Couldn't interpret packet." << RLP(r); } doRead(); From c2fff083274739cc5b1f1dd8e01bfdcb98602967 Mon Sep 17 00:00:00 2001 From: subtly Date: Sun, 12 Jul 2015 17:47:02 -0700 Subject: [PATCH 20/22] Update RLPXFrameWriter.h --- libp2p/RLPXFrameWriter.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index 36d902a11..e2600a147 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -54,7 +54,7 @@ class RLPXFrameWriter RLPXPacket* writing = nullptr; size_t remaining = 0; bool multiFrame = false; - uint16_t sequence; + uint16_t sequence = -1; }; public: @@ -85,4 +85,4 @@ private: }; } -} \ No newline at end of file +} From 20ca0ff24744b522533a3f1f22b639c50ecd3cdb Mon Sep 17 00:00:00 2001 From: subtly Date: Mon, 13 Jul 2015 10:55:00 -0700 Subject: [PATCH 21/22] Remove packet copy constructor. Simplify bool use. --- libp2p/RLPXFrameWriter.cpp | 2 +- libp2p/RLPXPacket.h | 2 +- test/libp2p/rlpx.cpp | 13 ++++++------- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index 69f624d3f..82f81567a 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -66,7 +66,7 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector low, otherwise low > high - bool high = highPending && !swapQueues ? true : lowPending ? false : true; + bool high = highPending && !swapQueues ? true : !lowPending; WriterState &qs = high ? m_q.first : m_q.second; size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; size_t offset = 0; diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index f5d431be4..1499b855d 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -45,7 +45,7 @@ public: /// Construct packet from single bytestream. RLPStream data is invalidated. RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(nextRLP(_in).toBytes()) { if (_in.size() > m_type.size()) { m_data.resize(_in.size() - m_type.size()); _in.cropped(m_type.size()).copyTo(&m_data); } } - RLPXPacket(RLPXPacket const& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {} + RLPXPacket(RLPXPacket const& _p) = delete; RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(std::move(_p.m_type)), m_data(std::move(_p.m_data)) {} bytes const& type() const { return m_type; } diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index 392ea2aff..82c781d7d 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -518,9 +518,8 @@ BOOST_AUTO_TEST_CASE(segmentedPacketFlush) BOOST_REQUIRE(decryptedHeader); bytesRef frame = frameWithHeader.cropped(h256::size); RLPXFrameInfo f(header); - auto p = r.demux(decoder, f, frame); - if (p.size()) - packets += move(p); + for (RLPXPacket& p: r.demux(decoder, f, frame)) + packets.push_back(move(p)); } BOOST_REQUIRE_EQUAL(packets.size(), 1); BOOST_REQUIRE_EQUAL(packets.front().size(), packetTypeRLP.size() + rlpPayload.out().size()); @@ -571,8 +570,8 @@ BOOST_AUTO_TEST_CASE(coalescedPacketsPadded) bytesRef frame = frameWithHeader.cropped(h256::size); RLPXFrameInfo f(header); BOOST_REQUIRE_EQUAL(f.multiFrame, false); - auto p = r.demux(decoder, f, frame); - packets += move(p); + for (RLPXPacket& p: r.demux(decoder, f, frame)) + packets.push_back(move(p)); RLPStream rlpPayload; rlpPayload << stuff; @@ -623,8 +622,8 @@ BOOST_AUTO_TEST_CASE(singleFramePacketFlush) bytesRef frame = frameWithHeader.cropped(h256::size); RLPXFrameInfo f(header); BOOST_REQUIRE_EQUAL(f.multiFrame, false); - auto p = r.demux(decoder, f, frame); - packets += move(p); + for (RLPXPacket& p: r.demux(decoder, f, frame)) + packets.push_back(move(p)); RLPStream rlpPayload; rlpPayload << stuff; From fd6a96058a0b6ee115df6e13ccb301b104a3c26e Mon Sep 17 00:00:00 2001 From: subtly Date: Mon, 13 Jul 2015 11:02:31 -0700 Subject: [PATCH 22/22] Remove unused move. --- libp2p/RLPXPacket.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index 1499b855d..27a4f2853 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -40,13 +40,13 @@ class RLPXPacket { public: /// Construct packet. RLPStream data is invalidated. - RLPXPacket(uint8_t _capId, RLPStream& _type, RLPStream& _data): m_cap(_capId), m_type(std::move(_type.out())), m_data(std::move(_data.out())) {} + RLPXPacket(uint8_t _capId, RLPStream& _type, RLPStream& _data): m_cap(_capId), m_type(_type.out()), m_data(_data.out()) {} /// Construct packet from single bytestream. RLPStream data is invalidated. RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(nextRLP(_in).toBytes()) { if (_in.size() > m_type.size()) { m_data.resize(_in.size() - m_type.size()); _in.cropped(m_type.size()).copyTo(&m_data); } } RLPXPacket(RLPXPacket const& _p) = delete; - RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(std::move(_p.m_type)), m_data(std::move(_p.m_data)) {} + RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {} bytes const& type() const { return m_type; }