Browse Source

mux/demux working test

cl-refactor
subtly 9 years ago
parent
commit
bcca52c443
  1. 34
      libp2p/RLPXFrameWriter.h
  2. 5
      libp2p/RLPXPacket.h
  3. 38
      test/libp2p/rlpx.cpp

34
libp2p/RLPXFrameWriter.h

@ -55,31 +55,36 @@ public:
BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed()); BOOST_THROW_EXCEPTION(RLPXFrameDecrytFailed());
std::vector<RLPXPacket> ret; std::vector<RLPXPacket> ret;
if (!_frame.size() || _frame.size() > _totalSize) if (!_sequence && (!_frame.size() || _frame.size() > _totalSize))
return ret; return ret;
// trim mac
bytesConstRef buffer = _frame.cropped(0, _frame.size() - h128::size);
// continue populating incomplete packets
if (_sequence && m_incomplete.count(_seq)) if (_sequence && m_incomplete.count(_seq))
{ {
uint32_t& remaining = m_incomplete.at(_seq).second; uint32_t& remaining = m_incomplete.at(_seq).second;
if (!_totalSize && _frame.size() <= remaining) if (!_totalSize && buffer.size() > 0 && buffer.size() <= remaining)
{ {
remaining -= buffer.size();
RLPXPacket& p = m_incomplete.at(_seq).first; RLPXPacket& p = m_incomplete.at(_seq).first;
if (_frame.size() > remaining) if(p.streamIn(buffer) && !remaining)
return ret;
else if(p.streamIn(_frame))
{
ret.push_back(std::move(p)); ret.push_back(std::move(p));
if (!remaining)
m_incomplete.erase(_seq); m_incomplete.erase(_seq);
}
else if (!ret.empty() && remaining)
remaining -= _frame.size(); BOOST_THROW_EXCEPTION(RLPXInvalidPacket());
else if (ret.empty() && !remaining)
BOOST_THROW_EXCEPTION(RLPXInvalidPacket());
return ret; return ret;
} }
else else
m_incomplete.erase(_seq); m_incomplete.erase(_seq);
} }
bytesConstRef buffer(_frame);
while (!buffer.empty()) while (!buffer.empty())
{ {
auto type = RLPXPacket::nextRLP(buffer); auto type = RLPXPacket::nextRLP(buffer);
@ -96,14 +101,15 @@ public:
if (p.isValid()) if (p.isValid())
ret.push_back(std::move(p)); ret.push_back(std::move(p));
else if (_sequence) else if (_sequence)
m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - _frame.size()))); // ugh: need to make total-size inclusive of packet-type :/
m_incomplete.insert(std::make_pair(_seq, std::make_pair(std::move(p), _totalSize - packet.size())));
} }
return ret; return ret;
} }
protected: protected:
uint16_t m_protocolType; uint16_t m_protocolType;
std::map<uint16_t, std::pair<RLPXPacket, uint32_t>> m_incomplete; ///< Incomplete packets and bytes remaining. std::map<uint16_t, std::pair<RLPXPacket, uint32_t>> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining.
}; };
/** /**

5
libp2p/RLPXPacket.h

@ -21,6 +21,7 @@
#pragma once #pragma once
#include <algorithm>
#include "Common.h" #include "Common.h"
namespace dev namespace dev
@ -37,7 +38,7 @@ struct RLPXInvalidPacket: virtual dev::Exception {};
class RLPXPacket class RLPXPacket
{ {
public: public:
static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); auto s = r.actualSize(); if (s >= _b.size()) return _b.cropped(s); } catch(...) {} return bytesConstRef(); } static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); }
/// Construct complete packet. RLPStream data is moved. /// Construct complete packet. RLPStream data is moved.
RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); } RLPXPacket(unsigned _capId, unsigned _type, RLPStream& _rlps): m_cap(_capId), m_type(_type), m_data(std::move(_rlps.out())) { if (!_type && !m_data.size()) BOOST_THROW_EXCEPTION(RLPXNullPacket()); }
@ -59,7 +60,7 @@ public:
protected: protected:
unsigned getType(bytesConstRef _rlp) { return RLP(_rlp.cropped(1)).toInt<unsigned>(); } unsigned getType(bytesConstRef _rlp) { return RLP(_rlp.cropped(0, 1)).toInt<unsigned>(); }
unsigned m_cap; unsigned m_cap;
unsigned m_type; unsigned m_type;

38
test/libp2p/rlpx.cpp

@ -461,7 +461,7 @@ BOOST_AUTO_TEST_CASE(readerWriter)
h256 remoteNonce = Nonce::get(); h256 remoteNonce = Nonce::get();
bytes ackCipher{0}; bytes ackCipher{0};
bytes authCipher{1}; bytes authCipher{1};
RLPXFrameCoder coder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher);
/// Test writing a 64byte packet and drain with minimum frame size that /// Test writing a 64byte packet and drain with minimum frame size that
/// forces packet to be pieced into 4 frames. /// forces packet to be pieced into 4 frames.
@ -480,46 +480,44 @@ BOOST_AUTO_TEST_CASE(readerWriter)
BOOST_REQUIRE_EQUAL(4, drains); BOOST_REQUIRE_EQUAL(4, drains);
RLPXFrameWriter w(0); RLPXFrameWriter w(0);
w.enque(0, (RLPStream() << payload)); RLPStream rlpPayload(RLPStream() << payload);
w.enque(0, rlpPayload);
vector<bytes> encframes; vector<bytes> encframes;
for (unsigned i = 1; i < drains; i++) for (unsigned i = 1; i < drains; i++)
{ {
auto n = w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes); auto n = w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes);
BOOST_REQUIRE_EQUAL(0, n); BOOST_REQUIRE_EQUAL(0, n);
BOOST_REQUIRE_EQUAL(encframes.size(), i); BOOST_REQUIRE_EQUAL(encframes.size(), i);
} }
BOOST_REQUIRE_EQUAL(1, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes)); BOOST_REQUIRE_EQUAL(1, w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes));
BOOST_REQUIRE_EQUAL(encframes.size(), drains); BOOST_REQUIRE_EQUAL(encframes.size(), drains);
BOOST_REQUIRE_EQUAL(0, w.mux(coder, RLPXFrameWriter::MinFrameDequeLength, encframes)); BOOST_REQUIRE_EQUAL(0, w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes));
BOOST_REQUIRE_EQUAL(encframes.size(), drains); BOOST_REQUIRE_EQUAL(encframes.size(), drains);
// we should now have a bunch of ciphertext in encframes // we should now have a bunch of ciphertext in encframes
BOOST_REQUIRE(encframes.size() == drains); BOOST_REQUIRE(encframes.size() == drains);
for (auto const& c: encframes) for (auto const& c: encframes)
{
BOOST_REQUIRE_EQUAL(c.size(), RLPXFrameWriter::MinFrameDequeLength); BOOST_REQUIRE_EQUAL(c.size(), RLPXFrameWriter::MinFrameDequeLength);
}
// read and assemble dequed encframes // read and assemble dequed encframes
RLPXFrameCoder decoder(false, localEph.pubkey(), localNonce, remoteEph, remoteNonce, &ackCipher, &authCipher);
vector<RLPXPacket> packets; vector<RLPXPacket> packets;
RLPXFrameReader r(0); RLPXFrameReader r(0);
for (size_t i = 0; i < encframes.size(); i++) for (size_t i = 0; i < encframes.size(); i++)
{ {
// This fails )-: bytesRef frameWithHeader(encframes[i].data(), encframes[i].size());
// auto size = encframes[i].size(); bytesRef header = frameWithHeader.cropped(0, h256::size);
// auto p = encframes[i].data(); bool decryptedHeader = decoder.authAndDecryptHeader(header);
// bytesRef frameWithHeader(encframes[i].data(), encframes[i].size()); BOOST_REQUIRE(decryptedHeader);
// bytesRef h = frameWithHeader.cropped(0, 16); bytesRef frame = frameWithHeader.cropped(h256::size);
// bool decryptedHeader = coder.authAndDecryptHeader(h); RLPXFrameInfo f(header);
// BOOST_REQUIRE(decryptedHeader); auto p = f.hasSequence ? r.demux(decoder, frame, true, f.sequenceId, f.totalLength) : r.demux(decoder, frame);
// bytesRef frame = frameWithHeader.cropped(16); if (p.size())
// auto packets = r.demux(coder, frame); packets += move(p);
// if (packets.size())
// packets += move(packets);
} }
BOOST_REQUIRE_EQUAL(packets.size(), 1); BOOST_REQUIRE_EQUAL(packets.size(), 1);
BOOST_REQUIRE_EQUAL(packets.front().size(), payload.size()); BOOST_REQUIRE_EQUAL(packets.front().size(), rlpPayload.out().size());
BOOST_REQUIRE_EQUAL(sha3(packets.front().data()), sha3(payload)); BOOST_REQUIRE_EQUAL(sha3(RLP(packets.front().data()).payload()), sha3(payload));
} }
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save