Browse Source

Cleanup. Interface for enqueuing packets. Fix: Include packet type in total size.

cl-refactor
subtly 10 years ago
parent
commit
639b4fb13c
  1. 30
      libp2p/RLPXFrameCoder.h
  2. 112
      libp2p/RLPXFrameReader.h
  3. 31
      libp2p/RLPXFrameWriter.cpp
  4. 87
      libp2p/RLPXFrameWriter.h
  5. 37
      libp2p/RLPXPacket.h
  6. 8
      test/libp2p/rlpx.cpp

30
libp2p/RLPXFrameCoder.h

@ -33,18 +33,24 @@ namespace dev
namespace p2p
{
struct RLPXFrameDecrytFailed: virtual dev::Exception {};
/**
* @brief Encapsulation of Frame
* @todo coder integration; padding derived from coder
*/
struct RLPXFrameInfo
{
RLPXFrameInfo() = default;
/// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]]
RLPXFrameInfo(bytesConstRef _frameHeader);
uint32_t length = 0; ///< Max: 2**24
uint8_t padding = 0;
uint32_t length = 0; ///< Size of frame (excludes padding). Max: 2**24
uint8_t padding = 0; ///< Length of padding which follows @length.
uint16_t protocolId = 0;
bool hasSequence = false;
uint16_t sequenceId = 0;
uint32_t totalLength = 0;
uint16_t protocolId = 0; ///< Protocol ID as negotiated by handshake.
bool hasSequence = false; ///< If this frame is part of a sequence
uint16_t sequenceId = 0; ///< Sequence ID of frame
uint32_t totalLength = 0; ///< Set to
};
class RLPXHandshake;
@ -52,8 +58,12 @@ class RLPXHandshake;
/**
* @brief Encoder/decoder transport for RLPx connection established by RLPXHandshake.
*
* @todo rename to RLPXTranscoder
* @todo Remove 'Frame' nomenclature and expect caller to provide RLPXFrame
* @todo Remove handshake as friend, remove handshake-based constructor
*
* Thread Safety
* Distinct Objects: Safe.
* Distinct Objects: Unsafe.
* Shared objects: Unsafe.
*/
class RLPXFrameCoder
@ -69,7 +79,7 @@ public:
~RLPXFrameCoder() {}
/// Establish shared secrets and setup AES and MAC states. Used by both constructors.
/// Establish shared secrets and setup AES and MAC states.
void setup(bool _originated, h512 _remoteEphemeral, h256 _remoteNonce, crypto::ECDHE const& _ephemeral, h256 _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher);
/// Write single-frame payload of packet(s).
@ -78,10 +88,10 @@ public:
/// Write continuation frame of segmented payload.
void writeFrame(uint16_t _protocolType, uint16_t _seqId, bytesConstRef _payload, bytes& o_bytes);
/// Write first frame of segmented payload.
/// Write first frame of segmented or sequence-tagged payload.
void writeFrame(uint16_t _protocolType, uint16_t _seqId, uint32_t _totalSize, bytesConstRef _payload, bytes& o_bytes);
/// Encrypt _packet as RLPx frame.
/// Legacy. Encrypt _packet as ill-defined legacy RLPx frame.
void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes);
/// Authenticate and decrypt header in-place.

112
libp2p/RLPXFrameReader.h

@ -0,0 +1,112 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file RLPXFrameReader.h
* @author Alex Leverington <nessence@gmail.com>
* @date 2015
*/
#pragma once
#include <libdevcore/Guards.h>
#include "RLPXFrameCoder.h"
#include "RLPXPacket.h"
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
namespace dev
{
namespace p2p
{
/**
* RLPFrameReader
* Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally
* buffers incomplete packets which are pieced into multiple frames (has sequence).
*/
class RLPXFrameReader
{
public:
RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {}
/// Processes a single frame returning complete packets.
std::vector<RLPXPacket> demux(RLPXFrameCoder& _coder, bytesRef _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0)
{
if (!_coder.authAndDecryptFrame(_frame))
BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed());
std::vector<RLPXPacket> ret;
if (!_sequence && (!_frame.size() || _frame.size() > _totalSize))
return ret;
// trim mac
bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size);
// continue populating incomplete packets
if (_sequence && m_incomplete.count(_seq))
{
uint32_t& remaining = m_incomplete.at(_seq).second;
if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining)
{
remaining -= buffer.size();
RLPXPacket& p = m_incomplete.at(_seq).first;
if(p.append(buffer) && !remaining)
ret.push_back(std::move(p));
if (!remaining)
m_incomplete.erase(_seq);
if (!ret.empty() && remaining)
BOOST_THROW_EXCEPTION(RLPXInvalidPacket());
else if (ret.empty() && !remaining)
BOOST_THROW_EXCEPTION(RLPXInvalidPacket());
return ret;
}
else
m_incomplete.erase(_seq);
}
while (!buffer.empty())
{
auto type = nextRLP(buffer);
if (type.empty())
break;
buffer = buffer.cropped(type.size());
// consume entire buffer if packet has sequence
auto packet = _sequence ? buffer : nextRLP(buffer);
buffer = buffer.cropped(packet.size());
RLPXPacket p(m_protocolType, type);
if (!packet.empty())
p.append(packet);
uint32_t remaining = _totalSize - type.size() - packet.size();
if (p.isValid())
ret.push_back(std::move(p));
else if (_sequence && remaining)
m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), remaining)));
// else drop invalid packet
}
return ret;
}
protected:
uint16_t m_protocolType;
std::map<uint16_t, std::pair<RLPXPacket, uint32_t>> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining.
};
}
}

31
libp2p/RLPXFrameWriter.cpp

@ -28,11 +28,18 @@ using namespace dev::p2p;
const uint16_t RLPXFrameWriter::EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC
const uint16_t RLPXFrameWriter::MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC
void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority)
void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority)
{
if (!_p.isValid())
return;
QueueState& qs = _priority ? m_q.first : m_q.second;
DEV_GUARDED(qs.x)
qs.q.push_back(move(RLPXPacket(m_protocolType, _packetType, _payload)));
qs.q.push_back(move(_p));
}
void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority)
{
enque(RLPXPacket(m_protocolType, (RLPStream() << _packetType), _payload), _priority);
}
size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes>& o_toWrite)
@ -70,13 +77,11 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes
{
DEV_GUARDED(qs.x)
qs.writing = &qs.q[0];
qs.remaining = qs.writing->size();
bytes packetType = rlp(qs.writing->type());
qs.sequenced = qs.remaining + packetType.size() > frameAllot;
qs.sequenced = qs.writing->size() > frameAllot;
// stop here if we can't write-out packet-type
// or payload already packed and packet won't fit
if (packetType.size() > frameAllot || (qs.sequenced && payload.size()))
// break here if we can't write-out packet-type
// or payload is packed and next packet won't fit (implicit)
if (qs.writing->type().size() > frameAllot || (qs.sequenced && !payload.empty()))
{
qs.writing = nullptr;
qs.remaining = 0;
@ -85,14 +90,16 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes
}
else if (qs.sequenced)
qs.sequence = ++m_sequenceId;
frameAllot -= packetType.size();
payload += packetType;
frameAllot -= qs.writing->type().size();
payload += qs.writing->type();
qs.remaining = qs.writing->data().size();
}
assert(qs.sequenced || (!qs.sequenced && frameAllot >= qs.remaining));
if (frameAllot && qs.remaining)
{
offset = qs.writing->size() - qs.remaining;
offset = qs.writing->data().size() - qs.remaining;
length = qs.remaining <= frameAllot ? qs.remaining : frameAllot;
bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes();
qs.remaining -= length;

87
libp2p/RLPXFrameWriter.h

@ -33,88 +33,11 @@ namespace dev
namespace p2p
{
struct RLPXFrameDecrytFailed: virtual dev::Exception {};
/**
* RLPFrameReader
* Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally
* buffers incomplete packets which are pieced into multiple frames (has sequence).
* @todo drop frame and reset incomplete if
* @todo percolate sequenceid to p2p protocol
* @todo informative exception
*/
class RLPXFrameReader
{
public:
RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {}
/// Processes a single frame returning complete packets.
std::vector<RLPXPacket> demux(RLPXFrameCoder& _coder, bytesRef _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0)
{
if (!_coder.authAndDecryptFrame(_frame))
BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed());
std::vector<RLPXPacket> ret;
if (!_sequence && (!_frame.size() || _frame.size() > _totalSize))
return ret;
// trim mac
bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size);
// continue populating incomplete packets
if (_sequence && m_incomplete.count(_seq))
{
uint32_t& remaining = m_incomplete.at(_seq).second;
if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining)
{
remaining -= buffer.size();
RLPXPacket& p = m_incomplete.at(_seq).first;
if(p.streamIn(buffer) && !remaining)
ret.push_back(std::move(p));
if (!remaining)
m_incomplete.erase(_seq);
if (!ret.empty() && remaining)
BOOST_THROW_EXCEPTION(RLPXInvalidPacket());
else if (ret.empty() && !remaining)
BOOST_THROW_EXCEPTION(RLPXInvalidPacket());
return ret;
}
else
m_incomplete.erase(_seq);
}
while (!buffer.empty())
{
auto type = RLPXPacket::nextRLP(buffer);
if (type.empty())
break;
buffer = buffer.cropped(type.size());
// consume entire buffer if packet has sequence
auto packet = _sequence ? buffer : RLPXPacket::nextRLP(buffer);
buffer = buffer.cropped(packet.size());
RLPXPacket p(m_protocolType, type);
if (!packet.empty())
p.streamIn(packet);
if (p.isValid())
ret.push_back(std::move(p));
else if (_sequence)
// ugh: need to make total-size inclusive of packet-type :/
m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - packet.size())));
}
return ret;
}
protected:
uint16_t m_protocolType;
std::map<uint16_t, std::pair<RLPXPacket, uint32_t>> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining.
};
/**
* RLPXFrameWriter
* Multiplex and encrypted packets into RLPX frames.
* Multiplex packets into encrypted RLPX frames.
* @todo throw when enqueued packet is invalid
* @todo use RLPXFrameInfo
* @todo flag to disable multiple packets per frame
*/
class RLPXFrameWriter
@ -139,9 +62,11 @@ public:
size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; }
/// Contents of _payload will be moved. Adds packet to queue, to be muxed into frames by mux when network buffer is ready for writing. Thread-safe.
/// Adds @_payload to queue (moves @_payload), to be muxed into frames by mux when network buffer is ready for writing. Thread-safe.
void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow);
void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow);
/// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe.
size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector<bytes>& o_toWrite);

37
libp2p/RLPXPacket.h

@ -29,42 +29,39 @@ namespace dev
namespace p2p
{
struct RLPXNullPacket: virtual dev::Exception {};
struct RLPXInvalidPacket: virtual dev::Exception {};
static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); }
/**
* RLPX Packet
*/
class RLPXPacket
{
public:
static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); }
/// Construct complete packet. RLPStream data is moved.
RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); }
/// Construct packet. RLPStream data is invalidated.
RLPXPacket(uint8_t _capId, RLPStream& _type, RLPStream& _data): m_cap(_capId), m_type(std::move(_type.out())), m_data(std::move(_data.out())) {}
/// Construct packet with type and initial bytes; type is determined by RLP of 1st byte and packet may be incomplete.
RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(getType(_in)) { assert(_in.size()); if (_in.size() > 1) { m_data.resize(_in.size() - 1); _in.cropped(1).copyTo(&m_data); } }
/// Construct packet from single bytestream. RLPStream data is invalidated.
RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(nextRLP(_in).toBytes()) { if (_in.size() > m_type.size()) { m_data.resize(_in.size() - m_type.size()); _in.cropped(m_type.size()).copyTo(&m_data); } }
RLPXPacket(RLPXPacket const& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {}
RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(std::move(_p.m_data)) {}
unsigned type() const { return m_type; }
RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(std::move(_p.m_type)), m_data(std::move(_p.m_data)) {}
bytes const& type() const { return m_type; }
bytes const& data() const { return m_data; }
size_t size() const { try { return RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } }
size_t size() const { try { return RLP(m_type).actualSize() + RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } }
bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); }
/// Appends byte data and returns if packet is valid.
bool append(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); }
bool isValid() { if (m_type > 0x7f) return false; try { return RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; }
virtual bool isValid() const noexcept { try { return !(m_type.empty() && m_data.empty()) && RLP(m_type).actualSize() == m_type.size() && RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; }
protected:
unsigned getType(bytesConstRef _rlp) { return RLP(_rlp.cropped(0, 1)).toInt<unsigned>(); }
unsigned m_cap;
unsigned m_type;
uint16_t m_sequence = 0;
uint8_t m_cap;
bytes m_type;
bytes m_data;
};

8
test/libp2p/rlpx.cpp

@ -32,6 +32,7 @@
#include <libdevcrypto/CryptoPP.h>
#include <libp2p/RLPxHandshake.h>
#include <libp2p/RLPXFrameWriter.h>
#include <libp2p/RLPXFrameReader.h>
using namespace std;
using namespace dev;
@ -481,7 +482,9 @@ BOOST_AUTO_TEST_CASE(readerWriter)
RLPXFrameWriter w(0);
RLPStream rlpPayload(RLPStream() << payload);
w.enque(0, rlpPayload);
uint8_t packetType = 0;
bytes packetTypeRLP = (RLPStream() << packetType).out();
w.enque(packetType, rlpPayload);
vector<bytes> encframes;
for (unsigned i = 1; i < drains; i++)
{
@ -516,8 +519,9 @@ BOOST_AUTO_TEST_CASE(readerWriter)
packets += move(p);
}
BOOST_REQUIRE_EQUAL(packets.size(), 1);
BOOST_REQUIRE_EQUAL(packets.front().size(), rlpPayload.out().size());
BOOST_REQUIRE_EQUAL(packets.front().size(), packetTypeRLP.size() + rlpPayload.out().size());
BOOST_REQUIRE_EQUAL(sha3(RLP(packets.front().data()).payload()), sha3(payload));
BOOST_REQUIRE_EQUAL(sha3(packets.front().type()), sha3(packetTypeRLP));
}
BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save