From fe0f71536f6b420bedde8a7402a31e6cad544cc1 Mon Sep 17 00:00:00 2001 From: subtly Date: Fri, 19 Jun 2015 23:42:16 -0400 Subject: [PATCH] rlpx frame reader fixes. rename writer::drain to mux. --- libp2p/RLPXFrameWriter.cpp | 2 +- libp2p/RLPXFrameWriter.h | 86 +++++++++++++++++++++----------------- libp2p/RLPXPacket.h | 15 ++++--- libp2p/RLPXSocketIO.cpp | 2 +- test/libp2p/rlpx.cpp | 6 +-- 5 files changed, 63 insertions(+), 48 deletions(-) diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp index e3b656fb5..c8c1e8ceb 100644 --- a/libp2p/RLPXFrameWriter.cpp +++ b/libp2p/RLPXFrameWriter.cpp @@ -32,7 +32,7 @@ void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPri qs.q.push_back(move(RLPXPacket(m_protocolType, _packetType, _payload))); } -size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) +size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) { static const size_t c_blockSize = h128::size; static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h index 60f711add..957bb09a2 100644 --- a/libp2p/RLPXFrameWriter.h +++ b/libp2p/RLPXFrameWriter.h @@ -33,61 +33,71 @@ namespace dev namespace p2p { +/** + * RLPFrameReader + * Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally + * buffers incomplete packets which are pieced into multiple frames (has sequence). + * @todo drop frame and reset incomplete if + * @todo percolate sequenceid to p2p protocol + * @todo informative exception + */ class RLPXFrameReader { RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} - std::vector assemble(bytes& _in) + /// Processes a single frame returning complete packets. + std::vector demux(bytes& _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0) { - bytesConstRef buffer(&_in); std::vector ret; - if (!_in.size()) + if (!_frame.size() || _frame.size() > _totalSize) return ret; - + + if (_sequence && m_incomplete.count(_seq)) + { + uint32_t& remaining = m_incomplete.at(_seq).second; + if (!_totalSize && _frame.size() <= remaining) + { + RLPXPacket& p = m_incomplete.at(_seq).first; + if (_frame.size() > remaining) + return ret; + else if(p.streamIn(&_frame)) + { + ret.push_back(std::move(p)); + m_incomplete.erase(_seq); + } + else + remaining -= _frame.size(); + return ret; + } + else + m_incomplete.erase(_seq); + } + + bytesConstRef buffer(&_frame); while (!buffer.empty()) { auto type = RLPXPacket::nextRLP(buffer); + if (type.empty()) + break; buffer = buffer.cropped(type.size()); - auto packet = RLPXPacket::nextRLP(buffer); + // consume entire buffer if packet has sequence + auto packet = _sequence ? buffer : RLPXPacket::nextRLP(buffer); buffer = buffer.cropped(packet.size()); RLPXPacket p(m_protocolType, type); - p.streamIn(packet); - ret.push_back(std::move(p)); - } - return ret; - } - - std::vector assemble(bytes& _in, uint16_t _seq, uint32_t _totalSize) - { - // TODO: cleanup sequencing api (throw exception(s) when bad things happen) - - bytesConstRef buffer(&_in); - if (!m_incomplete.count(_seq) && _totalSize > 0) - { - // new packet - RLPXPacket p(m_protocolType, buffer); - if (p.isValid()) - return std::vector{std::move(p)}; - m_incomplete[_seq] = std::move(p); - } - else - { - RLPXPacket& p = m_incomplete[_seq]; - p.streamIn(buffer); + if (!packet.empty()) + p.streamIn(packet); + if (p.isValid()) - { - std::vector ret{std::move(p)}; - m_incomplete.erase(_seq); - return ret; - } + ret.push_back(std::move(p)); + else if (_sequence) + m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - _frame.size()))); } - - return std::vector(); + return ret; } protected: uint16_t m_protocolType; - std::map m_incomplete; ///< Incomplete packets which span multiple frames. + std::map> m_incomplete; ///< Incomplete packets and bytes remaining. }; class RLPXFrameWriter @@ -112,11 +122,11 @@ public: size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } - /// Contents of _payload will be moved. Thread-safe. + /// Contents of _payload will be moved. Adds packet to queue, to be muxed into frames by mux when network buffer is ready for writing. Thread-safe. void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. - size_t drain(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); + size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); private: uint16_t const m_protocolType; // Protocol Type diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h index 42941d02f..1421c74f7 100644 --- a/libp2p/RLPXPacket.h +++ b/libp2p/RLPXPacket.h @@ -40,18 +40,22 @@ public: static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); auto s = r.actualSize(); if (s >= _b.size()) return _b.cropped(s); } catch(...) {} return bytesConstRef(); } /// Construct complete packet. RLPStream data is moved. - RLPXPacket(unsigned _capId, unsigned _type, RLPStream&& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } + RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } /// Construct packet with type and initial bytes; type is determined by RLP of 1st byte and packet may be incomplete. RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(getType(_in)) { assert(_in.size()); if (_in.size() > 1) { m_data.resize(_in.size() - 1); _in.cropped(1).copyTo(&m_data); } } + RLPXPacket(RLPXPacket const& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {} + + RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(std::move(_p.m_data)) {} + unsigned type() const { return m_type; } bytes const& data() const { return m_data; } - size_t size() const { return m_data.size(); } - - bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(byestConstRef(m_data).cropped(offset)); return isValid(); } + size_t size() const { try { return RLP(m_data, RLP::LaisezFaire).actualSize(); } catch(...) { return 0; } } + + bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } - bool isValid() { if (m_type > 0x7f) return false; try { if (RLP(m_data).actualSize() == m_data.size()) return true; } catch (...) {} return false; } + bool isValid() { if (m_type > 0x7f) return false; try { return RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; } protected: @@ -59,6 +63,7 @@ protected: unsigned m_cap; unsigned m_type; + uint16_t m_sequence = 0; bytes m_data; }; diff --git a/libp2p/RLPXSocketIO.cpp b/libp2p/RLPXSocketIO.cpp index ccd30409d..deb47a728 100644 --- a/libp2p/RLPXSocketIO.cpp +++ b/libp2p/RLPXSocketIO.cpp @@ -74,7 +74,7 @@ void RLPXSocketIO::doWrite() size_t protFrameSize = capacity / active; if (protFrameSize >= MinFrameSize) for (auto& w: m_writers) - dequed += w.drain(m_coder, protFrameSize, m_toSend); + dequed += w.mux(m_coder, protFrameSize, m_toSend); if (dequed) write(dequed); diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index e7cd1b657..4e974861b 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -484,13 +484,13 @@ BOOST_AUTO_TEST_CASE(readerWriter) vector out; for (unsigned i = 1; i < drains; i++) { - auto n = w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out); + auto n = w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out); BOOST_REQUIRE_EQUAL(0, n); BOOST_REQUIRE_EQUAL(out.size(), i); } - BOOST_REQUIRE_EQUAL(1, w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out)); + BOOST_REQUIRE_EQUAL(1, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out)); BOOST_REQUIRE_EQUAL(out.size(), drains); - BOOST_REQUIRE_EQUAL(0, w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out)); + BOOST_REQUIRE_EQUAL(0, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out)); BOOST_REQUIRE_EQUAL(out.size(), drains); // we should now have a bunch of ciphertext in out