Browse Source

rlpx frame reader fixes. rename writer::drain to mux.

cl-refactor
subtly 10 years ago
parent
commit
fe0f71536f
  1. 2
      libp2p/RLPXFrameWriter.cpp
  2. 86
      libp2p/RLPXFrameWriter.h
  3. 15
      libp2p/RLPXPacket.h
  4. 2
      libp2p/RLPXSocketIO.cpp
  5. 6
      test/libp2p/rlpx.cpp

2
libp2p/RLPXFrameWriter.cpp

@ -32,7 +32,7 @@ void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPri
qs.q.push_back(move(RLPXPacket(m_protocolType, _packetType, _payload))); qs.q.push_back(move(RLPXPacket(m_protocolType, _packetType, _payload)));
} }
size_t RLPXFrameWriter::drain(RLPXFrameCoder& _coder, unsigned _size, vector<bytes>& o_toWrite) size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes>& o_toWrite)
{ {
static const size_t c_blockSize = h128::size; static const size_t c_blockSize = h128::size;
static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC

86
libp2p/RLPXFrameWriter.h

@ -33,61 +33,71 @@ namespace dev
namespace p2p namespace p2p
{ {
/**
* RLPFrameReader
* Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally
* buffers incomplete packets which are pieced into multiple frames (has sequence).
* @todo drop frame and reset incomplete if
* @todo percolate sequenceid to p2p protocol
* @todo informative exception
*/
class RLPXFrameReader class RLPXFrameReader
{ {
RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {}
std::vector<RLPXPacket> assemble(bytes& _in) /// Processes a single frame returning complete packets.
std::vector<RLPXPacket> demux(bytes& _frame, bool _sequence = false, uint16_t _seq = 0, uint32_t _totalSize = 0)
{ {
bytesConstRef buffer(&_in);
std::vector<RLPXPacket> ret; std::vector<RLPXPacket> ret;
if (!_in.size()) if (!_frame.size() || _frame.size() > _totalSize)
return ret; return ret;
if (_sequence && m_incomplete.count(_seq))
{
uint32_t& remaining = m_incomplete.at(_seq).second;
if (!_totalSize && _frame.size() <= remaining)
{
RLPXPacket& p = m_incomplete.at(_seq).first;
if (_frame.size() > remaining)
return ret;
else if(p.streamIn(&_frame))
{
ret.push_back(std::move(p));
m_incomplete.erase(_seq);
}
else
remaining -= _frame.size();
return ret;
}
else
m_incomplete.erase(_seq);
}
bytesConstRef buffer(&_frame);
while (!buffer.empty()) while (!buffer.empty())
{ {
auto type = RLPXPacket::nextRLP(buffer); auto type = RLPXPacket::nextRLP(buffer);
if (type.empty())
break;
buffer = buffer.cropped(type.size()); buffer = buffer.cropped(type.size());
auto packet = RLPXPacket::nextRLP(buffer); // consume entire buffer if packet has sequence
auto packet = _sequence ? buffer : RLPXPacket::nextRLP(buffer);
buffer = buffer.cropped(packet.size()); buffer = buffer.cropped(packet.size());
RLPXPacket p(m_protocolType, type); RLPXPacket p(m_protocolType, type);
p.streamIn(packet); if (!packet.empty())
ret.push_back(std::move(p)); p.streamIn(packet);
}
return ret;
}
std::vector<RLPXPacket> assemble(bytes& _in, uint16_t _seq, uint32_t _totalSize)
{
// TODO: cleanup sequencing api (throw exception(s) when bad things happen)
bytesConstRef buffer(&_in);
if (!m_incomplete.count(_seq) && _totalSize > 0)
{
// new packet
RLPXPacket p(m_protocolType, buffer);
if (p.isValid())
return std::vector<RLPXPacket>{std::move(p)};
m_incomplete[_seq] = std::move(p);
}
else
{
RLPXPacket& p = m_incomplete[_seq];
p.streamIn(buffer);
if (p.isValid()) if (p.isValid())
{ ret.push_back(std::move(p));
std::vector<RLPXPacket> ret{std::move(p)}; else if (_sequence)
m_incomplete.erase(_seq); m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - _frame.size())));
return ret;
}
} }
return ret;
return std::vector<RLPXPacket>();
} }
protected: protected:
uint16_t m_protocolType; uint16_t m_protocolType;
std::map<uint16_t, RLPXPacket> m_incomplete; ///< Incomplete packets which span multiple frames. std::map<uint16_t, std::pair<RLPXPacket, uint32_t>> m_incomplete; ///< Incomplete packets and bytes remaining.
}; };
class RLPXFrameWriter class RLPXFrameWriter
@ -112,11 +122,11 @@ public:
size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; }
/// Contents of _payload will be moved. Thread-safe. /// Contents of _payload will be moved. Adds packet to queue, to be muxed into frames by mux when network buffer is ready for writing. Thread-safe.
void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow);
/// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe.
size_t drain(RLPXFrameCoder& _coder, unsigned _size, std::vector<bytes>& o_toWrite); size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector<bytes>& o_toWrite);
private: private:
uint16_t const m_protocolType; // Protocol Type uint16_t const m_protocolType; // Protocol Type

15
libp2p/RLPXPacket.h

@ -40,18 +40,22 @@ public:
static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); auto s = r.actualSize(); if (s >= _b.size()) return _b.cropped(s); } catch(...) {} return bytesConstRef(); } static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); auto s = r.actualSize(); if (s >= _b.size()) return _b.cropped(s); } catch(...) {} return bytesConstRef(); }
/// Construct complete packet. RLPStream data is moved. /// Construct complete packet. RLPStream data is moved.
RLPXPacket(unsigned _capId, unsigned _type, RLPStream&& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); }
/// Construct packet with type and initial bytes; type is determined by RLP of 1st byte and packet may be incomplete. /// Construct packet with type and initial bytes; type is determined by RLP of 1st byte and packet may be incomplete.
RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(getType(_in)) { assert(_in.size()); if (_in.size() > 1) { m_data.resize(_in.size() - 1); _in.cropped(1).copyTo(&m_data); } } RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(getType(_in)) { assert(_in.size()); if (_in.size() > 1) { m_data.resize(_in.size() - 1); _in.cropped(1).copyTo(&m_data); } }
RLPXPacket(RLPXPacket const& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {}
RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(std::move(_p.m_data)) {}
unsigned type() const { return m_type; } unsigned type() const { return m_type; }
bytes const& data() const { return m_data; } bytes const& data() const { return m_data; }
size_t size() const { return m_data.size(); } size_t size() const { try { return RLP(m_data, RLP::LaisezFaire).actualSize(); } catch(...) { return 0; } }
bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(byestConstRef(m_data).cropped(offset)); return isValid(); } bool streamIn(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); }
bool isValid() { if (m_type > 0x7f) return false; try { if (RLP(m_data).actualSize() == m_data.size()) return true; } catch (...) {} return false; } bool isValid() { if (m_type > 0x7f) return false; try { return RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; }
protected: protected:
@ -59,6 +63,7 @@ protected:
unsigned m_cap; unsigned m_cap;
unsigned m_type; unsigned m_type;
uint16_t m_sequence = 0;
bytes m_data; bytes m_data;
}; };

2
libp2p/RLPXSocketIO.cpp

@ -74,7 +74,7 @@ void RLPXSocketIO::doWrite()
size_t protFrameSize = capacity / active; size_t protFrameSize = capacity / active;
if (protFrameSize >= MinFrameSize) if (protFrameSize >= MinFrameSize)
for (auto& w: m_writers) for (auto& w: m_writers)
dequed += w.drain(m_coder, protFrameSize, m_toSend); dequed += w.mux(m_coder, protFrameSize, m_toSend);
if (dequed) if (dequed)
write(dequed); write(dequed);

6
test/libp2p/rlpx.cpp

@ -484,13 +484,13 @@ BOOST_AUTO_TEST_CASE(readerWriter)
vector<bytes> out; vector<bytes> out;
for (unsigned i = 1; i < drains; i++) for (unsigned i = 1; i < drains; i++)
{ {
auto n = w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out); auto n = w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out);
BOOST_REQUIRE_EQUAL(0, n); BOOST_REQUIRE_EQUAL(0, n);
BOOST_REQUIRE_EQUAL(out.size(), i); BOOST_REQUIRE_EQUAL(out.size(), i);
} }
BOOST_REQUIRE_EQUAL(1, w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out)); BOOST_REQUIRE_EQUAL(1, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out));
BOOST_REQUIRE_EQUAL(out.size(), drains); BOOST_REQUIRE_EQUAL(out.size(), drains);
BOOST_REQUIRE_EQUAL(0, w.drain(coder, RLPXFrameWriter::MinFrameDequeLength, out)); BOOST_REQUIRE_EQUAL(0, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, out));
BOOST_REQUIRE_EQUAL(out.size(), drains); BOOST_REQUIRE_EQUAL(out.size(), drains);
// we should now have a bunch of ciphertext in out // we should now have a bunch of ciphertext in out

Loading…
Cancel
Save