Browse Source

Fixup and document frame writer. Add test for coalesced packets.

cl-refactor
subtly 10 years ago
parent
commit
bd2d99f169
  1. 10
      libp2p/RLPXFrameCoder.cpp
  2. 65
      libp2p/RLPXFrameWriter.cpp
  3. 27
      libp2p/RLPXFrameWriter.h
  4. 52
      test/libp2p/rlpx.cpp

10
libp2p/RLPXFrameCoder.cpp

@ -56,7 +56,7 @@ void RLPXFrameCoder::setup(bool _originated, h512 _remoteEphemeral, h256 _remote
{ {
bytes keyMaterialBytes(64); bytes keyMaterialBytes(64);
bytesRef keyMaterial(&keyMaterialBytes); bytesRef keyMaterial(&keyMaterialBytes);
// shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || initiator-nonce)) // shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || initiator-nonce))
Secret ephemeralShared; Secret ephemeralShared;
_ecdhe.agree(_remoteEphemeral, ephemeralShared); _ecdhe.agree(_remoteEphemeral, ephemeralShared);
@ -81,13 +81,13 @@ void RLPXFrameCoder::setup(bool _originated, h512 _remoteEphemeral, h256 _remote
h128 iv; h128 iv;
m_frameEnc.SetKeyWithIV(m_frameEncKey, h256::size, iv.data()); m_frameEnc.SetKeyWithIV(m_frameEncKey, h256::size, iv.data());
m_frameDec.SetKeyWithIV(m_frameDecKey, h256::size, iv.data()); m_frameDec.SetKeyWithIV(m_frameDecKey, h256::size, iv.data());
// mac-secret = sha3(ecdhe-shared-secret || aes-secret) // mac-secret = sha3(ecdhe-shared-secret || aes-secret)
sha3(keyMaterial, outRef); // output mac-secret sha3(keyMaterial, outRef); // output mac-secret
m_macEncKey.resize(h256::size); m_macEncKey.resize(h256::size);
memcpy(m_macEncKey.data(), outRef.data(), h256::size); memcpy(m_macEncKey.data(), outRef.data(), h256::size);
m_macEnc.SetKey(m_macEncKey, h256::size); m_macEnc.SetKey(m_macEncKey, h256::size);
// Initiator egress-mac: sha3(mac-secret^recipient-nonce || auth-sent-init) // Initiator egress-mac: sha3(mac-secret^recipient-nonce || auth-sent-init)
// ingress-mac: sha3(mac-secret^initiator-nonce || auth-recvd-ack) // ingress-mac: sha3(mac-secret^initiator-nonce || auth-recvd-ack)
// Recipient egress-mac: sha3(mac-secret^initiator-nonce || auth-sent-ack) // Recipient egress-mac: sha3(mac-secret^initiator-nonce || auth-sent-ack)
@ -99,7 +99,7 @@ void RLPXFrameCoder::setup(bool _originated, h512 _remoteEphemeral, h256 _remote
keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size()); keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size());
egressCipher.copyTo(keyMaterial.cropped(h256::size, egressCipher.size())); egressCipher.copyTo(keyMaterial.cropped(h256::size, egressCipher.size()));
m_egressMac.Update(keyMaterial.data(), keyMaterial.size()); m_egressMac.Update(keyMaterial.data(), keyMaterial.size());
// recover mac-secret by re-xoring remoteNonce // recover mac-secret by re-xoring remoteNonce
(*(h256*)keyMaterial.data() ^ _remoteNonce ^ _nonce).ref().copyTo(keyMaterial); (*(h256*)keyMaterial.data() ^ _remoteNonce ^ _nonce).ref().copyTo(keyMaterial);
bytesConstRef ingressCipher = _originated ? _ackCipher : _authCipher; bytesConstRef ingressCipher = _originated ? _ackCipher : _authCipher;
@ -144,7 +144,7 @@ void RLPXFrameCoder::writeFrame(RLPStream const& _header, bytesConstRef _payload
m_frameEnc.ProcessData(headerWithMac.data(), headerWithMac.data(), 16); m_frameEnc.ProcessData(headerWithMac.data(), headerWithMac.data(), 16);
updateEgressMACWithHeader(bytesConstRef(&headerWithMac).cropped(0, 16)); updateEgressMACWithHeader(bytesConstRef(&headerWithMac).cropped(0, 16));
egressDigest().ref().copyTo(bytesRef(&headerWithMac).cropped(h128::size,h128::size)); egressDigest().ref().copyTo(bytesRef(&headerWithMac).cropped(h128::size,h128::size));
auto padding = (16 - (_payload.size() % 16)) % 16; auto padding = (16 - (_payload.size() % 16)) % 16;
o_bytes.swap(headerWithMac); o_bytes.swap(headerWithMac);
o_bytes.resize(32 + _payload.size() + padding + h128::size); o_bytes.resize(32 + _payload.size() + padding + h128::size);

65
libp2p/RLPXFrameWriter.cpp

@ -32,12 +32,12 @@ void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority)
{ {
if (!_p.isValid()) if (!_p.isValid())
return; return;
QueueState& qs = _priority ? m_q.first : m_q.second; WriterState& qs = _priority ? m_q.first : m_q.second;
DEV_GUARDED(qs.x) DEV_GUARDED(qs.x)
qs.q.push_back(move(_p)); qs.q.push_back(move(_p));
} }
void RLPXFrameWriter::enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority) void RLPXFrameWriter::enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority)
{ {
enque(RLPXPacket(m_protocolType, (RLPStream() << _packetType), _payload), _priority); enque(RLPXPacket(m_protocolType, (RLPStream() << _packetType), _payload), _priority);
} }
@ -63,32 +63,45 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes
lowPending = !!m_q.second.q.size(); lowPending = !!m_q.second.q.size();
if (!highPending && !lowPending) if (!highPending && !lowPending)
return 0; return ret;
// first run when !swapQueues, high > low, otherwise low > high // first run when !swapQueues, high > low, otherwise low > high
bool high = highPending && !swapQueues ? true : lowPending ? false : true; bool high = highPending && !swapQueues ? true : lowPending ? false : true;
QueueState &qs = high ? m_q.first : m_q.second; WriterState &qs = high ? m_q.first : m_q.second;
size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead;
size_t offset = 0; size_t offset = 0;
size_t length = 0; size_t length = 0;
while (frameAllot >= c_blockSize) while (frameAllot >= c_blockSize)
{ {
// Invariants:
// !qs.writing && payload.empty() initial entry
// !qs.writing && !payload.empty() continuation (multiple packets per frame)
// qs.writing && payload.empty() initial entry, continuation (multiple frames per packet)
// qs.writing && !payload.empty() INVALID
// write packet-type
if (qs.writing == nullptr) if (qs.writing == nullptr)
{ {
DEV_GUARDED(qs.x) {
qs.writing = &qs.q[0]; Guard l(qs.x);
qs.sequenced = qs.writing->size() > frameAllot; if (!qs.q.empty())
qs.writing = &qs.q[0];
else
break;
}
// break here if we can't write-out packet-type // break here if we can't write-out packet-type
// or payload is packed and next packet won't fit (implicit) // or payload is packed and next packet won't fit (implicit)
if (qs.writing->type().size() > frameAllot || (qs.sequenced && !payload.empty())) qs.multiFrame = qs.writing->size() > frameAllot;
assert(qs.writing->type().size());
if (qs.writing->type().size() > frameAllot || (qs.multiFrame && !payload.empty()))
{ {
qs.writing = nullptr; qs.writing = nullptr;
qs.remaining = 0; qs.remaining = 0;
qs.sequenced = false; qs.multiFrame = false;
break; break;
} }
else if (qs.sequenced) else if (qs.multiFrame)
qs.sequence = ++m_sequenceId; qs.sequence = ++m_sequenceId;
frameAllot -= qs.writing->type().size(); frameAllot -= qs.writing->type().size();
@ -96,7 +109,9 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes
qs.remaining = qs.writing->data().size(); qs.remaining = qs.writing->data().size();
} }
assert(qs.sequenced || (!qs.sequenced && frameAllot >= qs.remaining));
// write payload w/remaining allotment
assert(qs.multiFrame || (!qs.multiFrame && frameAllot >= qs.remaining));
if (frameAllot && qs.remaining) if (frameAllot && qs.remaining)
{ {
offset = qs.writing->data().size() - qs.remaining; offset = qs.writing->data().size() - qs.remaining;
@ -106,16 +121,25 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes
frameAllot -= portion.size(); frameAllot -= portion.size();
payload += portion; payload += portion;
} }
if (!qs.remaining && ret++)
assert((!qs.remaining && (offset > 0 || !qs.multiFrame)) || (qs.remaining && qs.multiFrame));
if (!qs.remaining)
{
qs.writing = nullptr; qs.writing = nullptr;
if (qs.sequenced) DEV_GUARDED(qs.x)
qs.q.pop_front();
ret++;
}
// qs.writing is left alone for first frame of multi-frame packet
if (qs.multiFrame)
break; break;
} }
if (payload.size()) if (!payload.empty())
{ {
if (qs.sequenced) if (qs.multiFrame)
if (offset == 0) if (offset == 0)
// 1st frame of segmented packet writes total-size of packet
_coder.writeFrame(m_protocolType, qs.sequence, qs.writing->size(), &payload, payload); _coder.writeFrame(m_protocolType, qs.sequence, qs.writing->size(), &payload, payload);
else else
_coder.writeFrame(m_protocolType, qs.sequence, &payload, payload); _coder.writeFrame(m_protocolType, qs.sequence, &payload, payload);
@ -126,13 +150,8 @@ size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes
o_toWrite.push_back(payload); o_toWrite.push_back(payload);
payload.resize(0); payload.resize(0);
if (!qs.remaining) if (!qs.remaining && qs.multiFrame)
{ qs.multiFrame = false;
qs.writing = nullptr;
qs.sequenced = false;
DEV_GUARDED(qs.x)
qs.q.pop_front();
}
} }
else if (swapQueues) else if (swapQueues)
break; break;

27
libp2p/RLPXFrameWriter.h

@ -34,22 +34,27 @@ namespace p2p
{ {
/** /**
* RLPXFrameWriter * @brief Multiplex packets into encrypted RLPX frames.
* Multiplex packets into encrypted RLPX frames.
* @todo throw when enqueued packet is invalid * @todo throw when enqueued packet is invalid
* @todo use RLPXFrameInfo * @todo use RLPXFrameInfo
* @todo flag to disable multiple packets per frame
*/ */
class RLPXFrameWriter class RLPXFrameWriter
{ {
struct QueueState /**
* @brief Queue and state for Writer
* Properties are used independently;
* only valid packets should be added to q
* @todo implement as class
*/
struct WriterState
{ {
std::deque<RLPXPacket> q; std::deque<RLPXPacket> q;
mutable Mutex x;
RLPXPacket* writing = nullptr; RLPXPacket* writing = nullptr;
size_t remaining = 0; size_t remaining = 0;
bool sequenced = false; bool multiFrame = false;
uint16_t sequence; uint16_t sequence;
mutable Mutex x;
}; };
public: public:
@ -60,11 +65,13 @@ public:
RLPXFrameWriter(uint16_t _protocolType): m_protocolType(_protocolType) {} RLPXFrameWriter(uint16_t _protocolType): m_protocolType(_protocolType) {}
RLPXFrameWriter(RLPXFrameWriter const& _s): m_protocolType(_s.m_protocolType) {} RLPXFrameWriter(RLPXFrameWriter const& _s): m_protocolType(_s.m_protocolType) {}
/// Returns total number of queued packets. Thread-safe.
size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; }
/// Adds @_payload to queue (moves @_payload), to be muxed into frames by mux when network buffer is ready for writing. Thread-safe. /// Moves @_payload output to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe.
void enque(unsigned _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); void enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow);
/// Moves @_p to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe.
void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow); void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow);
/// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe.
@ -72,7 +79,7 @@ public:
private: private:
uint16_t const m_protocolType; // Protocol Type uint16_t const m_protocolType; // Protocol Type
std::pair<QueueState, QueueState> m_q; // High, Low frame queues std::pair<WriterState, WriterState> m_q; // High, Low frame queues
uint16_t m_sequenceId = 0; // Sequence ID uint16_t m_sequenceId = 0; // Sequence ID
}; };

52
test/libp2p/rlpx.cpp

@ -454,7 +454,7 @@ BOOST_AUTO_TEST_CASE(ecies_interop_test_primitives)
BOOST_REQUIRE(plainTest3 == expectedPlain3); BOOST_REQUIRE(plainTest3 == expectedPlain3);
} }
BOOST_AUTO_TEST_CASE(readerWriter) BOOST_AUTO_TEST_CASE(segmentedPacket)
{ {
ECDHE localEph; ECDHE localEph;
h256 localNonce = Nonce::get(); h256 localNonce = Nonce::get();
@ -464,11 +464,14 @@ BOOST_AUTO_TEST_CASE(readerWriter)
bytes authCipher{1}; bytes authCipher{1};
RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher);
/// Test writing a 64byte packet and drain with minimum frame size that /// Test writing a 64byte RLPStream and drain with frame size that
/// forces packet to be pieced into 4 frames. /// forces packet to be pieced into 4 frames.
/// (Minimum frame size has room for 16 bytes of payload) /// (Minimum frame size has room for 16 bytes of payload)
// 64-byte payload minus 3 bytes for packet-type and RLP overhead. // 64-byte payload minus 3 bytes for packet-type and RLP overhead.
// Note: mux() is called with RLPXFrameWriter::MinFrameDequeLength
// which is equal to 64byte, however, after overhead this means
// there are only 16 bytes of payload which will be dequed.
auto dequeLen = 16; auto dequeLen = 16;
bytes stuff = sha3("A").asBytes(); bytes stuff = sha3("A").asBytes();
bytes payload; bytes payload;
@ -526,5 +529,50 @@ BOOST_AUTO_TEST_CASE(readerWriter)
BOOST_REQUIRE_EQUAL(sha3(packets.front().type()), sha3(packetTypeRLP)); BOOST_REQUIRE_EQUAL(sha3(packets.front().type()), sha3(packetTypeRLP));
} }
BOOST_AUTO_TEST_CASE(coalescedPackets)
{
ECDHE localEph;
h256 localNonce = Nonce::get();
ECDHE remoteEph;
h256 remoteNonce = Nonce::get();
bytes ackCipher{0};
bytes authCipher{1};
RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher);
/// Test writing four 32 byte RLPStream packets such that
/// a single 1KB frame will incldue all four packets.
auto dequeLen = 1024; // sufficient enough for all packets
bytes initialStuff = sha3("A").asBytes();
vector<h256> packets;
for (unsigned i = 0; i < 4; i++)
packets.push_back(sha3(initialStuff));
RLPXFrameWriter w(0);
uint8_t packetType = 127;
for (auto const& p: packets)
w.enque(packetType, (RLPStream() << p));
vector<bytes> encframes;
BOOST_REQUIRE_EQUAL(4, w.mux(encoder, dequeLen, encframes));
BOOST_REQUIRE_EQUAL(0, w.mux(encoder, dequeLen, encframes));
BOOST_REQUIRE_EQUAL(1, encframes.size());
auto expectedFrameSize = RLPXFrameWriter::EmptyFrameLength + packets.size() * (/*packet-type*/ 1 + h256::size + /*rlp-prefix*/ 1);
expectedFrameSize += ((16 - (expectedFrameSize % 16)) % 16);
BOOST_REQUIRE_EQUAL(expectedFrameSize, encframes[0].size());
}
BOOST_AUTO_TEST_CASE(singleFramePacket)
{
}
BOOST_AUTO_TEST_CASE(manyProtocols)
{
}
BOOST_AUTO_TEST_CASE(allOfSingleSegmentedCoalescedWithManyProtocols)
{
}
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save