diff --git a/ethminer/CMakeLists.txt b/ethminer/CMakeLists.txt index df828bc47..90889ae12 100644 --- a/ethminer/CMakeLists.txt +++ b/ethminer/CMakeLists.txt @@ -9,6 +9,9 @@ if (JSONRPC) include_directories(BEFORE ${JSONCPP_INCLUDE_DIRS}) include_directories(${JSON_RPC_CPP_INCLUDE_DIRS}) endif() +if (ETHASHCL) + include_directories(${OpenCL_INCLUDE_DIRS}) +endif () set(EXECUTABLE ethminer) @@ -40,4 +43,3 @@ if (APPLE) else() eth_install_executable(${EXECUTABLE}) endif() - diff --git a/exp/CMakeLists.txt b/exp/CMakeLists.txt index 823425598..e6360317f 100644 --- a/exp/CMakeLists.txt +++ b/exp/CMakeLists.txt @@ -6,6 +6,9 @@ aux_source_directory(. SRC_LIST) include_directories(BEFORE ${JSONCPP_INCLUDE_DIRS}) include_directories(BEFORE ..) include_directories(${DB_INCLUDE_DIRS}) +if (ETHASHCL) + include_directories(${OpenCL_INCLUDE_DIRS}) +endif () set(EXECUTABLE exp) @@ -27,11 +30,6 @@ target_link_libraries(${EXECUTABLE} p2p) if (ETHASHCL) target_link_libraries(${EXECUTABLE} ethash-cl) target_link_libraries(${EXECUTABLE} ethash) - target_link_libraries(${EXECUTABLE} OpenCL) + target_link_libraries(${EXECUTABLE} ${OpenCL_LIBRARIES}) endif() -install( TARGETS ${EXECUTABLE} DESTINATION bin) - - - - - +install( TARGETS ${EXECUTABLE} DESTINATION bin) diff --git a/libdevcore/RLP.cpp b/libdevcore/RLP.cpp index 7e419e42b..fc49796a5 100644 --- a/libdevcore/RLP.cpp +++ b/libdevcore/RLP.cpp @@ -173,8 +173,14 @@ size_t RLP::length() const if (lengthSize > sizeof(ret)) // We did not check, but would most probably not fit in our memory. BOOST_THROW_EXCEPTION(UndersizeRLP()); + // No leading zeroes. + if (!m_data[1]) + BOOST_THROW_EXCEPTION(BadRLP()); for (unsigned i = 0; i < lengthSize; ++i) ret = (ret << 8) | m_data[i + 1]; + // Must be greater than the limit. + if (ret < c_rlpListStart - c_rlpDataImmLenStart - c_rlpMaxLengthBytes) + BOOST_THROW_EXCEPTION(BadRLP()); } else if (n <= c_rlpListIndLenZero) return n - c_rlpListStart; @@ -189,8 +195,12 @@ size_t RLP::length() const if (lengthSize > sizeof(ret)) // We did not check, but would most probably not fit in our memory. BOOST_THROW_EXCEPTION(UndersizeRLP()); + if (!m_data[1]) + BOOST_THROW_EXCEPTION(BadRLP()); for (unsigned i = 0; i < lengthSize; ++i) ret = (ret << 8) | m_data[i + 1]; + if (ret < 0x100 - c_rlpListStart - c_rlpMaxLengthBytes) + BOOST_THROW_EXCEPTION(BadRLP()); } // We have to be able to add payloadOffset to length without overflow. // This rejects roughly 4GB-sized RLPs on some platforms. @@ -203,7 +213,7 @@ size_t RLP::items() const { if (isList()) { - bytesConstRef d = payload().cropped(0, length()); + bytesConstRef d = payload(); size_t i = 0; for (; d.size(); ++i) d = d.cropped(sizeAsEncoded(d)); diff --git a/libdevcore/RLP.h b/libdevcore/RLP.h index 0a4d1b88b..a94c8ba55 100644 --- a/libdevcore/RLP.h +++ b/libdevcore/RLP.h @@ -232,25 +232,22 @@ public: template std::pair toPair() const { + if (itemCountStrict() != 2) + BOOST_THROW_EXCEPTION(BadCast()); std::pair ret; - if (isList()) - { - ret.first = (T)(*this)[0]; - ret.second = (U)(*this)[1]; - } + ret.first = (T)(*this)[0]; + ret.second = (U)(*this)[1]; return ret; } template std::array toArray() const { - if (itemCount() != N || !isList()) + if (itemCountStrict() != N) BOOST_THROW_EXCEPTION(BadCast()); std::array ret; for (size_t i = 0; i < N; ++i) - { ret[i] = (T)operator[](i); - } return ret; } @@ -281,7 +278,9 @@ public: template _N toHash(int _flags = Strict) const { requireGood(); - if (!isData() || (length() > _N::size && (_flags & FailIfTooBig)) || (length() < _N::size && (_flags & FailIfTooSmall))) + auto p = payload(); + auto l = p.size(); + if (!isData() || (l > _N::size && (_flags & FailIfTooBig)) || (l < _N::size && (_flags & FailIfTooSmall))) { if (_flags & ThrowOnFail) BOOST_THROW_EXCEPTION(BadCast()); @@ -290,8 +289,8 @@ public: } _N ret; - size_t s = std::min(_N::size, length()); - memcpy(ret.data() + _N::size - s, payload().data(), s); + size_t s = std::min(_N::size, l); + memcpy(ret.data() + _N::size - s, p.data(), s); return ret; } diff --git a/libp2p/RLPXFrameCoder.cpp b/libp2p/RLPXFrameCoder.cpp index 193c45511..83dc62409 100644 --- a/libp2p/RLPXFrameCoder.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -23,42 +23,46 @@ #include #include "RLPxHandshake.h" +#include "RLPXPacket.h" using namespace std; using namespace dev; using namespace dev::p2p; using namespace CryptoPP; -RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header) +RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header): + length((_header[0] * 256 + _header[1]) * 256 + _header[2]), + padding((16 - (length % 16)) % 16), + data(_header.cropped(3).toBytes()), + header(RLP(data, RLP::ThrowOnFail | RLP::FailIfTooSmall)), + protocolId(header[0].toInt()), + multiFrame(header.itemCount() > 1), + sequenceId(multiFrame ? header[1].toInt() : 0), + totalLength(header.itemCount() == 3 ? header[2].toInt() : 0) +{} + +RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) { - length = (_header[0] * 256 + _header[1]) * 256 + _header[2]; - padding = ((16 - (length % 16)) % 16); - RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall); - auto itemCount = header.itemCount(); - protocolId = header[0].toInt(); - hasSequence = itemCount > 1; - sequenceId = hasSequence ? header[1].toInt() : 0; - totalLength = itemCount == 3 ? header[2].toInt() : 0; + setup(_init.m_originated, _init.m_remoteEphemeral, _init.m_remoteNonce, _init.m_ecdhe, _init.m_nonce, &_init.m_ackCipher, &_init.m_authCipher); } -RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) +RLPXFrameCoder::RLPXFrameCoder(bool _originated, h512 const& _remoteEphemeral, h256 const& _remoteNonce, crypto::ECDHE const& _ecdhe, h256 const& _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher) { - // we need: - // originated? - // Secret == output of ecdhe agreement - // authCipher - // ackCipher + setup(_originated, _remoteEphemeral, _remoteNonce, _ecdhe, _nonce, _ackCipher, _authCipher); +} +void RLPXFrameCoder::setup(bool _originated, h512 const& _remoteEphemeral, h256 const& _remoteNonce, crypto::ECDHE const& _ecdhe, h256 const& _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher) +{ bytes keyMaterialBytes(64); bytesRef keyMaterial(&keyMaterialBytes); // shared-secret = sha3(ecdhe-shared-secret || sha3(nonce || initiator-nonce)) Secret ephemeralShared; - _init.m_ecdhe.agree(_init.m_remoteEphemeral, ephemeralShared); + _ecdhe.agree(_remoteEphemeral, ephemeralShared); ephemeralShared.ref().copyTo(keyMaterial.cropped(0, h256::size)); h512 nonceMaterial; - h256 const& leftNonce = _init.m_originated ? _init.m_remoteNonce : _init.m_nonce; - h256 const& rightNonce = _init.m_originated ? _init.m_nonce : _init.m_remoteNonce; + h256 const& leftNonce = _originated ? _remoteNonce : _nonce; + h256 const& rightNonce = _originated ? _nonce : _remoteNonce; leftNonce.ref().copyTo(nonceMaterial.ref().cropped(0, h256::size)); rightNonce.ref().copyTo(nonceMaterial.ref().cropped(h256::size, h256::size)); auto outRef(keyMaterial.cropped(h256::size, h256::size)); @@ -88,54 +92,81 @@ RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) // Recipient egress-mac: sha3(mac-secret^initiator-nonce || auth-sent-ack) // ingress-mac: sha3(mac-secret^recipient-nonce || auth-recvd-init) - (*(h256*)outRef.data() ^ _init.m_remoteNonce).ref().copyTo(keyMaterial); - bytes const& egressCipher = _init.m_originated ? _init.m_authCipher : _init.m_ackCipher; + (*(h256*)outRef.data() ^ _remoteNonce).ref().copyTo(keyMaterial); + bytesConstRef egressCipher = _originated ? _authCipher : _ackCipher; keyMaterialBytes.resize(h256::size + egressCipher.size()); keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size()); - bytesConstRef(&egressCipher).copyTo(keyMaterial.cropped(h256::size, egressCipher.size())); + egressCipher.copyTo(keyMaterial.cropped(h256::size, egressCipher.size())); m_egressMac.Update(keyMaterial.data(), keyMaterial.size()); // recover mac-secret by re-xoring remoteNonce - (*(h256*)keyMaterial.data() ^ _init.m_remoteNonce ^ _init.m_nonce).ref().copyTo(keyMaterial); - bytes const& ingressCipher = _init.m_originated ? _init.m_ackCipher : _init.m_authCipher; + (*(h256*)keyMaterial.data() ^ _remoteNonce ^ _nonce).ref().copyTo(keyMaterial); + bytesConstRef ingressCipher = _originated ? _ackCipher : _authCipher; keyMaterialBytes.resize(h256::size + ingressCipher.size()); keyMaterial.retarget(keyMaterialBytes.data(), keyMaterialBytes.size()); - bytesConstRef(&ingressCipher).copyTo(keyMaterial.cropped(h256::size, ingressCipher.size())); + ingressCipher.copyTo(keyMaterial.cropped(h256::size, ingressCipher.size())); m_ingressMac.Update(keyMaterial.data(), keyMaterial.size()); } -void RLPXFrameCoder::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) +void RLPXFrameCoder::writeFrame(uint16_t _protocolType, bytesConstRef _payload, bytes& o_bytes) { - // _packet = type || rlpList() + RLPStream header; + uint32_t len = (uint32_t)_payload.size(); + header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); + header.appendList(1) << _protocolType; + writeFrame(header, _payload, o_bytes); +} +void RLPXFrameCoder::writeFrame(uint16_t _protocolType, uint16_t _seqId, bytesConstRef _payload, bytes& o_bytes) +{ RLPStream header; - uint32_t len = (uint32_t)_packet.size(); + uint32_t len = (uint32_t)_payload.size(); header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); - // zeroHeader: []byte{0xC2, 0x80, 0x80}. Should be rlpList(protocolType,seqId,totalPacketSize). - header.appendRaw(bytes({0xc2,0x80,0x80})); - - // TODO: SECURITY check that header is <= 16 bytes + header.appendList(2) << _protocolType << _seqId; + writeFrame(header, _payload, o_bytes); +} +void RLPXFrameCoder::writeFrame(uint16_t _protocolType, uint16_t _seqId, uint32_t _totalSize, bytesConstRef _payload, bytes& o_bytes) +{ + RLPStream header; + uint32_t len = (uint32_t)_payload.size(); + header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); + header.appendList(3) << _protocolType << _seqId << _totalSize; + writeFrame(header, _payload, o_bytes); +} + +void RLPXFrameCoder::writeFrame(RLPStream const& _header, bytesConstRef _payload, bytes& o_bytes) +{ + // TODO: SECURITY check header values && header <= 16 bytes bytes headerWithMac(h256::size); - bytesConstRef(&header.out()).copyTo(bytesRef(&headerWithMac)); + bytesConstRef(&_header.out()).copyTo(bytesRef(&headerWithMac)); m_frameEnc.ProcessData(headerWithMac.data(), headerWithMac.data(), 16); updateEgressMACWithHeader(bytesConstRef(&headerWithMac).cropped(0, 16)); egressDigest().ref().copyTo(bytesRef(&headerWithMac).cropped(h128::size,h128::size)); - auto padding = (16 - (_packet.size() % 16)) % 16; + auto padding = (16 - (_payload.size() % 16)) % 16; o_bytes.swap(headerWithMac); - o_bytes.resize(32 + _packet.size() + padding + h128::size); - bytesRef packetRef(o_bytes.data() + 32, _packet.size()); - m_frameEnc.ProcessData(packetRef.data(), _packet.data(), _packet.size()); - bytesRef paddingRef(o_bytes.data() + 32 + _packet.size(), padding); + o_bytes.resize(32 + _payload.size() + padding + h128::size); + bytesRef packetRef(o_bytes.data() + 32, _payload.size()); + m_frameEnc.ProcessData(packetRef.data(), _payload.data(), _payload.size()); + bytesRef paddingRef(o_bytes.data() + 32 + _payload.size(), padding); if (padding) m_frameEnc.ProcessData(paddingRef.data(), paddingRef.data(), padding); - bytesRef packetWithPaddingRef(o_bytes.data() + 32, _packet.size() + padding); + bytesRef packetWithPaddingRef(o_bytes.data() + 32, _payload.size() + padding); updateEgressMACWithFrame(packetWithPaddingRef); - bytesRef macRef(o_bytes.data() + 32 + _packet.size() + padding, h128::size); + bytesRef macRef(o_bytes.data() + 32 + _payload.size() + padding, h128::size); egressDigest().ref().copyTo(macRef); } +void RLPXFrameCoder::writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes) +{ + RLPStream header; + uint32_t len = (uint32_t)_packet.size(); + header.appendRaw(bytes({byte((len >> 16) & 0xff), byte((len >> 8) & 0xff), byte(len & 0xff)})); + header.appendRaw(bytes({0xc2,0x80,0x80})); + writeFrame(header, _packet, o_bytes); +} + bool RLPXFrameCoder::authAndDecryptHeader(bytesRef io) { asserts(io.size() == h256::size); diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index 3964326ff..19c648cd5 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -33,18 +33,28 @@ namespace dev namespace p2p { +struct RLPXFrameDecryptFailed: virtual dev::Exception {}; + +/** + * @brief Encapsulation of Frame + * @todo coder integration; padding derived from coder + */ struct RLPXFrameInfo { RLPXFrameInfo() = default; /// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]] RLPXFrameInfo(bytesConstRef _frameHeader); - uint32_t length = 0; ///< Max: 2**24 - uint8_t padding = 0; + + uint32_t const length; ///< Size of frame (excludes padding). Max: 2**24 + uint8_t const padding; ///< Length of padding which follows @length. - uint16_t protocolId = 0; - bool hasSequence = false; - uint16_t sequenceId = 0; - uint32_t totalLength = 0; + bytes const data; ///< Bytes of Header. + RLP const header; ///< Header RLP. + + uint16_t const protocolId; ///< Protocol ID as negotiated by handshake. + bool const multiFrame; ///< If this frame is part of a sequence + uint16_t const sequenceId; ///< Sequence ID of frame + uint32_t const totalLength; ///< Set to total length of packet in first frame of multiframe packet }; class RLPXHandshake; @@ -52,8 +62,12 @@ class RLPXHandshake; /** * @brief Encoder/decoder transport for RLPx connection established by RLPXHandshake. * + * @todo rename to RLPXTranscoder + * @todo Remove 'Frame' nomenclature and expect caller to provide RLPXFrame + * @todo Remove handshake as friend, remove handshake-based constructor + * * Thread Safety - * Distinct Objects: Safe. + * Distinct Objects: Unsafe. * Shared objects: Unsafe. */ class RLPXFrameCoder @@ -61,12 +75,27 @@ class RLPXFrameCoder friend class RLPXFrameIOMux; friend class Session; public: - /// Constructor. - /// Requires instance of RLPXHandshake which has completed first two phases of handshake. + /// Construct; requires instance of RLPXHandshake which has encrypted ECDH key exchange (first two phases of handshake). RLPXFrameCoder(RLPXHandshake const& _init); + + /// Construct with external key material. + RLPXFrameCoder(bool _originated, h512 const& _remoteEphemeral, h256 const& _remoteNonce, crypto::ECDHE const& _ephemeral, h256 const& _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); + ~RLPXFrameCoder() {} - /// Encrypt _packet as RLPx frame. + /// Establish shared secrets and setup AES and MAC states. + void setup(bool _originated, h512 const& _remoteEphemeral, h256 const& _remoteNonce, crypto::ECDHE const& _ephemeral, h256 const& _nonce, bytesConstRef _ackCipher, bytesConstRef _authCipher); + + /// Write single-frame payload of packet(s). + void writeFrame(uint16_t _protocolType, bytesConstRef _payload, bytes& o_bytes); + + /// Write continuation frame of segmented payload. + void writeFrame(uint16_t _protocolType, uint16_t _seqId, bytesConstRef _payload, bytes& o_bytes); + + /// Write first frame of segmented or sequence-tagged payload. + void writeFrame(uint16_t _protocolType, uint16_t _seqId, uint32_t _totalSize, bytesConstRef _payload, bytes& o_bytes); + + /// Legacy. Encrypt _packet as ill-defined legacy RLPx frame. void writeSingleFramePacket(bytesConstRef _packet, bytes& o_bytes); /// Authenticate and decrypt header in-place. @@ -82,6 +111,8 @@ public: h128 ingressDigest(); protected: + void writeFrame(RLPStream const& _header, bytesConstRef _payload, bytes& o_bytes); + /// Update state of egress MAC with frame header. void updateEgressMACWithHeader(bytesConstRef _headerCipher); diff --git a/libp2p/RLPXFrameReader.cpp b/libp2p/RLPXFrameReader.cpp new file mode 100644 index 000000000..68a329ad6 --- /dev/null +++ b/libp2p/RLPXFrameReader.cpp @@ -0,0 +1,90 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXFrameReader.cpp + * @author Alex Leverington + * @date 2015 + */ + +#include "RLPXFrameReader.h" + +using namespace std; +using namespace dev; +using namespace dev::p2p; + +std::vector RLPXFrameReader::demux(RLPXFrameCoder& _coder, RLPXFrameInfo const& _info, bytesRef _frame) +{ + if (!_coder.authAndDecryptFrame(_frame)) + BOOST_THROW_EXCEPTION(RLPXFrameDecryptFailed()); + + std::vector ret; + if (_frame.empty()) + // drop: bad frame (empty) + return ret; + if (_info.multiFrame && _info.totalLength && _frame.size() > _info.totalLength) + // drop: bad frame (too large) + return ret; + + // trim mac + bytesConstRef buffer = _frame.cropped(0, _frame.size() - (h128::size + _info.padding)); + // continue populating multiframe packets + if (_info.multiFrame && m_incomplete.count(_info.sequenceId)) + { + uint32_t& remaining = m_incomplete.at(_info.sequenceId).second; + if (!_info.totalLength && buffer.size() > 0 && buffer.size() <= remaining) + { + remaining -= buffer.size(); + + RLPXPacket& p = m_incomplete.at(_info.sequenceId).first; + if(p.append(buffer) && !remaining) + ret.push_back(std::move(p)); + if (!remaining) + m_incomplete.erase(_info.sequenceId); + + if (!ret.empty() && remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + else if (ret.empty() && !remaining) + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + + return ret; + } + else + m_incomplete.erase(_info.sequenceId); + } + + while (!buffer.empty()) + { + auto type = nextRLP(buffer); + if (type.empty()) + break; + buffer = buffer.cropped(type.size()); + // consume entire buffer if packet has sequence + auto packet = _info.multiFrame ? buffer : nextRLP(buffer); + buffer = buffer.cropped(packet.size()); + RLPXPacket p(m_protocolType, type); + if (!packet.empty()) + p.append(packet); + + uint32_t remaining = _info.totalLength - type.size() - packet.size(); + if (p.isValid()) + ret.push_back(std::move(p)); + else if (_info.multiFrame && remaining) + m_incomplete.insert(std::make_pair(_info.sequenceId, std::make_pair(std::move(p), remaining))); + else + BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); + } + return ret; +} diff --git a/libp2p/RLPXFrameReader.h b/libp2p/RLPXFrameReader.h new file mode 100644 index 000000000..20143de24 --- /dev/null +++ b/libp2p/RLPXFrameReader.h @@ -0,0 +1,55 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXFrameReader.h + * @author Alex Leverington + * @date 2015 + */ + + +#pragma once + +#include +#include "RLPXFrameCoder.h" +#include "RLPXPacket.h" +namespace ba = boost::asio; +namespace bi = boost::asio::ip; + +namespace dev +{ +namespace p2p +{ + +/** + * RLPFrameReader + * Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally + * buffers incomplete packets which are pieced into multiple frames (has sequence). + */ +class RLPXFrameReader +{ +public: + RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} + + /// Processes a single frame returning complete packets. + std::vector demux(RLPXFrameCoder& _coder, RLPXFrameInfo const& _info, bytesRef _frame); + +protected: + uint16_t m_protocolType; + std::map> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining. +}; + +} +} \ No newline at end of file diff --git a/libp2p/RLPXFrameWriter.cpp b/libp2p/RLPXFrameWriter.cpp new file mode 100644 index 000000000..82f81567a --- /dev/null +++ b/libp2p/RLPXFrameWriter.cpp @@ -0,0 +1,161 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXFrameWriter.cpp + * @author Alex Leverington + * @date 2015 + */ + +#include "RLPXFrameWriter.h" + +using namespace std; +using namespace dev; +using namespace dev::p2p; + +const uint16_t RLPXFrameWriter::EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC +const uint16_t RLPXFrameWriter::MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC + +void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority) +{ + if (!_p.isValid()) + return; + WriterState& qs = _priority ? m_q.first : m_q.second; + DEV_GUARDED(qs.x) + qs.q.push_back(move(_p)); +} + +void RLPXFrameWriter::enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority) +{ + enque(RLPXPacket(m_protocolType, (RLPStream() << _packetType), _payload), _priority); +} + +size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector& o_toWrite) +{ + static const size_t c_blockSize = h128::size; + static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC + if (_size < c_overhead + c_blockSize) + return 0; + + size_t ret = 0; + size_t frameLen = _size / 16 * 16; + bytes payload(0); + bool swapQueues = false; + while (frameLen >= c_overhead + c_blockSize) + { + bool highPending; + bool lowPending; + DEV_GUARDED(m_q.first.x) + highPending = !!m_q.first.q.size(); + DEV_GUARDED(m_q.second.x) + lowPending = !!m_q.second.q.size(); + + if (!highPending && !lowPending) + return ret; + + // first run when !swapQueues, high > low, otherwise low > high + bool high = highPending && !swapQueues ? true : !lowPending; + WriterState &qs = high ? m_q.first : m_q.second; + size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; + size_t offset = 0; + size_t length = 0; + while (frameAllot >= c_blockSize) + { + // Invariants: + // !qs.writing && payload.empty() initial entry + // !qs.writing && !payload.empty() continuation (multiple packets per frame) + // qs.writing && payload.empty() initial entry, continuation (multiple frames per packet) + // qs.writing && !payload.empty() INVALID + + // write packet-type + if (qs.writing == nullptr) + { + { + Guard l(qs.x); + if (!qs.q.empty()) + qs.writing = &qs.q[0]; + else + break; + } + + // break here if we can't write-out packet-type + // or payload is packed and next packet won't fit (implicit) + qs.multiFrame = qs.writing->size() > frameAllot; + assert(qs.writing->type().size()); + if (qs.writing->type().size() > frameAllot || (qs.multiFrame && !payload.empty())) + { + qs.writing = nullptr; + qs.remaining = 0; + qs.multiFrame = false; + break; + } + else if (qs.multiFrame) + qs.sequence = ++m_sequenceId; + + frameAllot -= qs.writing->type().size(); + payload += qs.writing->type(); + + qs.remaining = qs.writing->data().size(); + } + + // write payload w/remaining allotment + assert(qs.multiFrame || (!qs.multiFrame && frameAllot >= qs.remaining)); + if (frameAllot && qs.remaining) + { + offset = qs.writing->data().size() - qs.remaining; + length = qs.remaining <= frameAllot ? qs.remaining : frameAllot; + bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes(); + qs.remaining -= length; + frameAllot -= portion.size(); + payload += portion; + } + + assert((!qs.remaining && (offset > 0 || !qs.multiFrame)) || (qs.remaining && qs.multiFrame)); + if (!qs.remaining) + { + qs.writing = nullptr; + DEV_GUARDED(qs.x) + qs.q.pop_front(); + ret++; + } + // qs.writing is left alone for first frame of multi-frame packet + if (qs.multiFrame) + break; + } + + if (!payload.empty()) + { + if (qs.multiFrame) + if (offset == 0) + // 1st frame of segmented packet writes total-size of packet + _coder.writeFrame(m_protocolType, qs.sequence, qs.writing->size(), &payload, payload); + else + _coder.writeFrame(m_protocolType, qs.sequence, &payload, payload); + else + _coder.writeFrame(m_protocolType, &payload, payload); + assert((int)frameLen - payload.size() >= 0); + frameLen -= payload.size(); + o_toWrite.push_back(payload); + payload.resize(0); + + if (!qs.remaining && qs.multiFrame) + qs.multiFrame = false; + } + else if (swapQueues) + break; + swapQueues = true; + } + return ret; +} diff --git a/libp2p/RLPXFrameWriter.h b/libp2p/RLPXFrameWriter.h new file mode 100644 index 000000000..e2600a147 --- /dev/null +++ b/libp2p/RLPXFrameWriter.h @@ -0,0 +1,88 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXperimental.h + * @author Alex Leverington + * @date 2015 + */ + + +#pragma once + +#include +#include "RLPXFrameCoder.h" +#include "RLPXPacket.h" +namespace ba = boost::asio; +namespace bi = boost::asio::ip; + +namespace dev +{ +namespace p2p +{ + +/** + * @brief Multiplex packets into encrypted RLPX frames. + * @todo throw when enqueued packet is invalid + * @todo use RLPXFrameInfo + */ +class RLPXFrameWriter +{ + /** + * @brief Queue and state for Writer + * Properties are used independently; + * only valid packets should be added to q + * @todo implement as class + */ + struct WriterState + { + std::deque q; + mutable Mutex x; + + RLPXPacket* writing = nullptr; + size_t remaining = 0; + bool multiFrame = false; + uint16_t sequence = -1; + }; + +public: + enum PacketPriority { PriorityLow = 0, PriorityHigh }; + static const uint16_t EmptyFrameLength; + static const uint16_t MinFrameDequeLength; + + RLPXFrameWriter(uint16_t _protocolType): m_protocolType(_protocolType) {} + RLPXFrameWriter(RLPXFrameWriter const& _s): m_protocolType(_s.m_protocolType) {} + + /// Returns total number of queued packets. Thread-safe. + size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } + + /// Moves @_payload output to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe. + void enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); + + /// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe. + size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector& o_toWrite); + +protected: + /// Moves @_p to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe. + void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow); + +private: + uint16_t const m_protocolType; // Protocol Type + std::pair m_q; // High, Low frame queues + uint16_t m_sequenceId = 0; // Sequence ID +}; + +} +} diff --git a/libp2p/RLPXPacket.cpp b/libp2p/RLPXPacket.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/libp2p/RLPXPacket.h b/libp2p/RLPXPacket.h new file mode 100644 index 000000000..27a4f2853 --- /dev/null +++ b/libp2p/RLPXPacket.h @@ -0,0 +1,69 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXPacket.h + * @author Alex Leverington + * @date 2015 + */ + +#pragma once + +#include +#include "Common.h" + +namespace dev +{ +namespace p2p +{ + +struct RLPXInvalidPacket: virtual dev::Exception {}; + +static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); } + +/** + * RLPX Packet + */ +class RLPXPacket +{ +public: + /// Construct packet. RLPStream data is invalidated. + RLPXPacket(uint8_t _capId, RLPStream& _type, RLPStream& _data): m_cap(_capId), m_type(_type.out()), m_data(_data.out()) {} + + /// Construct packet from single bytestream. RLPStream data is invalidated. + RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(nextRLP(_in).toBytes()) { if (_in.size() > m_type.size()) { m_data.resize(_in.size() - m_type.size()); _in.cropped(m_type.size()).copyTo(&m_data); } } + + RLPXPacket(RLPXPacket const& _p) = delete; + RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {} + + bytes const& type() const { return m_type; } + + bytes const& data() const { return m_data; } + + size_t size() const { try { return RLP(m_type).actualSize() + RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } } + + /// Appends byte data and returns if packet is valid. + bool append(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } + + virtual bool isValid() const noexcept { try { return !(m_type.empty() && m_data.empty()) && RLP(m_type).actualSize() == m_type.size() && RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; } + +protected: + uint8_t m_cap; + bytes m_type; + bytes m_data; +}; + +} +} \ No newline at end of file diff --git a/libp2p/RLPXSocketIO.cpp b/libp2p/RLPXSocketIO.cpp new file mode 100644 index 000000000..deb47a728 --- /dev/null +++ b/libp2p/RLPXSocketIO.cpp @@ -0,0 +1,112 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXSocketIO.cpp + * @author Alex Leverington + * @date 2015 + */ + +#include "RLPXSocketIO.h" + +#include +using namespace std; +using namespace dev; +using namespace dev::p2p; + +uint32_t const RLPXSocketIO::MinFrameSize = h128::size * 3; // header + block + mac +uint32_t const RLPXSocketIO::MaxPacketSize = 1 << 24; +uint16_t const RLPXSocketIO::DefaultInitialCapacity = 8 << 8; + +RLPXSocketIO::RLPXSocketIO(unsigned _protCount, RLPXFrameCoder& _coder, bi::tcp::socket& _socket, bool _flowControl, size_t _initialCapacity): + m_flowControl(_flowControl), + m_coder(_coder), + m_socket(_socket), + m_writers(move(writers(_protCount))), + m_egressCapacity(m_flowControl ? _initialCapacity : MaxPacketSize * m_writers.size()) +{} + +vector RLPXSocketIO::writers(unsigned _capacity) +{ + vector ret; + for (unsigned i = 0; i < _capacity; i++) + ret.push_back(RLPXFrameWriter(i)); + return ret; +} + +void RLPXSocketIO::send(unsigned _protocolType, unsigned _type, RLPStream& _payload) +{ + if (!m_socket.is_open()) + return; // TCPSocketNotOpen + m_writers.at(_protocolType).enque(_type, _payload); + bool wasEmtpy = false; + DEV_GUARDED(x_queued) + wasEmtpy = (++m_queued == 1); + if (wasEmtpy) + doWrite(); +} + +void RLPXSocketIO::doWrite() +{ + m_toSend.clear(); + + size_t capacity; + DEV_GUARDED(x_queued) + capacity = min(m_egressCapacity, MaxPacketSize); + + size_t active = 0; + for (auto const& w: m_writers) + if (w.size()) + active += 1; + size_t dequed = 0; + size_t protFrameSize = capacity / active; + if (protFrameSize >= MinFrameSize) + for (auto& w: m_writers) + dequed += w.mux(m_coder, protFrameSize, m_toSend); + + if (dequed) + write(dequed); + else + deferWrite(); +} + +void RLPXSocketIO::deferWrite() +{ + auto self(shared_from_this()); + m_congestion.reset(new ba::deadline_timer(m_socket.get_io_service())); + m_congestion->expires_from_now(boost::posix_time::milliseconds(50)); + m_congestion->async_wait([=](boost::system::error_code const& _ec) { m_congestion.reset(); if (!_ec) doWrite(); }); +} + +void RLPXSocketIO::write(size_t _dequed) +{ + auto self(shared_from_this()); + ba::async_write(m_socket, ba::buffer(m_toSend), [this, self, _dequed](boost::system::error_code ec, size_t written) + { + if (ec) + return; // TCPSocketWriteError + + bool reschedule = false; + DEV_GUARDED(x_queued) + { + if (m_flowControl) + m_egressCapacity -= written; + m_queued -= _dequed; + reschedule = m_queued > 0; + } + if (reschedule) + doWrite(); + }); +} \ No newline at end of file diff --git a/libp2p/RLPXSocketIO.h b/libp2p/RLPXSocketIO.h new file mode 100644 index 000000000..47dd5aa9c --- /dev/null +++ b/libp2p/RLPXSocketIO.h @@ -0,0 +1,71 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file RLPXSocketIO.h + * @author Alex Leverington + * @date 2015 + */ + +#pragma once + +#include "RLPXFrameWriter.h" +namespace ba = boost::asio; +namespace bi = boost::asio::ip; + +namespace dev +{ +namespace p2p +{ + +class RLPXSocketIO: public std::enable_shared_from_this +{ +public: + static uint32_t const MinFrameSize; + static uint32_t const MaxPacketSize; + static uint16_t const DefaultInitialCapacity; + + RLPXSocketIO(unsigned _protCount, RLPXFrameCoder& _coder, bi::tcp::socket& _socket, bool _flowControl = true, size_t _initialCapacity = DefaultInitialCapacity); + + void send(unsigned _protocolType, unsigned _type, RLPStream& _payload); + + void doWrite(); + + bool congested() const { return !!m_congestion; } + +private: + static std::vector writers(unsigned _capacity); + + void deferWrite(); + + void write(size_t _dequed); + + bool const m_flowControl; ///< True if flow control is enabled. + + RLPXFrameCoder& m_coder; ///< Encoder/decoder of frame payloads. + bi::tcp::socket& m_socket; + + std::vector m_toSend; ///< Reusable byte buffer for pending socket writes. + + std::vector m_writers; ///< Write queues for each protocol. TODO: map to bytes (of capability) + std::unique_ptr m_congestion; ///< Scheduled when writes are deferred due to congestion. + + Mutex x_queued; + unsigned m_queued = 0; ///< Track total queued packets to ensure single write loop + uint32_t m_egressCapacity; +}; + +} +} \ No newline at end of file diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 9a981786d..0d90ea6ea 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -375,10 +375,16 @@ void Session::doRead() return; } - RLPXFrameInfo header; + + uint16_t hProtocolId; + uint32_t hLength; + uint8_t hPadding; try { - header = RLPXFrameInfo(bytesConstRef(m_data.data(), length)); + RLPXFrameInfo header(bytesConstRef(m_data.data(), length)); + hProtocolId = header.protocolId; + hLength = header.length; + hPadding = header.padding; } catch (std::exception const& _e) { @@ -388,8 +394,8 @@ void Session::doRead() } /// read padded frame and mac - auto tlen = header.length + header.padding + h128::size; - ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, header, tlen](boost::system::error_code ec, std::size_t length) + auto tlen = hLength + hPadding + h128::size; + ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, hLength, hProtocolId, tlen](boost::system::error_code ec, std::size_t length) { ThreadContext tc(info().id.abridged()); ThreadContext tc2(info().clientVersion); @@ -402,7 +408,7 @@ void Session::doRead() return; } - bytesConstRef frame(m_data.data(), header.length); + bytesConstRef frame(m_data.data(), hLength); if (!checkPacket(frame)) { cerr << "Received " << frame.size() << ": " << toHex(frame) << endl; @@ -414,7 +420,7 @@ void Session::doRead() { auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt(); RLP r(frame.cropped(1)); - if (!readPacket(header.protocolId, packetType, r)) + if (!readPacket(hProtocolId, packetType, r)) clog(NetWarn) << "Couldn't interpret packet." << RLP(r); } doRead(); diff --git a/test/libp2p/rlpx.cpp b/test/libp2p/rlpx.cpp index 8251e94e2..82c781d7d 100644 --- a/test/libp2p/rlpx.cpp +++ b/test/libp2p/rlpx.cpp @@ -30,10 +30,13 @@ #include #include #include +#include +#include using namespace std; using namespace dev; using namespace dev::crypto; +using namespace dev::p2p; using namespace CryptoPP; BOOST_AUTO_TEST_SUITE(rlpx) @@ -450,5 +453,190 @@ BOOST_AUTO_TEST_CASE(ecies_interop_test_primitives) BOOST_REQUIRE(plainTest3 == expectedPlain3); } +BOOST_AUTO_TEST_CASE(segmentedPacketFlush) +{ + ECDHE localEph; + h256 localNonce = Nonce::get(); + ECDHE remoteEph; + h256 remoteNonce = Nonce::get(); + bytes ackCipher{0}; + bytes authCipher{1}; + RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); + + /// Test writing a 64byte RLPStream and drain with frame size that + /// forces packet to be pieced into 4 frames. + /// (Minimum frame size has room for 16 bytes of payload) + + // 64-byte payload minus 3 bytes for packet-type and RLP overhead. + // Note: mux() is called with RLPXFrameWriter::MinFrameDequeLength + // which is equal to 64byte, however, after overhead this means + // there are only 16 bytes of payload which will be dequed. + auto dequeLen = 16; + bytes stuff = sha3("A").asBytes(); + bytes payload; + payload += stuff; + payload += stuff; + payload.resize(payload.size() - 3 /* packet-type, rlp-overhead */); + BOOST_REQUIRE_EQUAL(61, payload.size()); + + auto drains = (payload.size() + 3) / dequeLen; + BOOST_REQUIRE_EQUAL(4, drains); + + RLPXFrameWriter w(0); + RLPStream rlpPayload(RLPStream() << payload); + uint8_t packetType = 0; + bytes packetTypeRLP = (RLPStream() << packetType).out(); + w.enque(packetType, rlpPayload); + vector encframes; + for (unsigned i = 1; i < drains; i++) + { + auto n = w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes); + BOOST_REQUIRE_EQUAL(0, n); + BOOST_REQUIRE_EQUAL(encframes.size(), i); + } + BOOST_REQUIRE_EQUAL(1, w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes)); + BOOST_REQUIRE_EQUAL(encframes.size(), drains); + BOOST_REQUIRE_EQUAL(0, w.mux(encoder, RLPXFrameWriter::MinFrameDequeLength, encframes)); + BOOST_REQUIRE_EQUAL(encframes.size(), drains); + + // we should now have a bunch of ciphertext in encframes + BOOST_REQUIRE(encframes.size() == drains); + for (auto const& c: encframes) + { + BOOST_REQUIRE_EQUAL(c.size(), RLPXFrameWriter::MinFrameDequeLength); + } + + // read and assemble dequed encframes + RLPXFrameCoder decoder(false, localEph.pubkey(), localNonce, remoteEph, remoteNonce, &ackCipher, &authCipher); + vector packets; + RLPXFrameReader r(0); + for (size_t i = 0; i < encframes.size(); i++) + { + bytesRef frameWithHeader(encframes[i].data(), encframes[i].size()); + bytesRef header = frameWithHeader.cropped(0, h256::size); + bool decryptedHeader = decoder.authAndDecryptHeader(header); + BOOST_REQUIRE(decryptedHeader); + bytesRef frame = frameWithHeader.cropped(h256::size); + RLPXFrameInfo f(header); + for (RLPXPacket& p: r.demux(decoder, f, frame)) + packets.push_back(move(p)); + } + BOOST_REQUIRE_EQUAL(packets.size(), 1); + BOOST_REQUIRE_EQUAL(packets.front().size(), packetTypeRLP.size() + rlpPayload.out().size()); + BOOST_REQUIRE_EQUAL(sha3(RLP(packets.front().data()).payload()), sha3(payload)); + BOOST_REQUIRE_EQUAL(sha3(packets.front().type()), sha3(packetTypeRLP)); +} + +BOOST_AUTO_TEST_CASE(coalescedPacketsPadded) +{ + ECDHE localEph; + h256 localNonce = Nonce::get(); + ECDHE remoteEph; + h256 remoteNonce = Nonce::get(); + bytes ackCipher{0}; + bytes authCipher{1}; + RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); + + /// Test writing four 32 byte RLPStream packets such that + /// a single 1KB frame will incldue all four packets. + auto dequeLen = 1024; // sufficient enough for all packets + bytes stuff = sha3("A").asBytes(); + vector packetsOut; + for (unsigned i = 0; i < 4; i++) + packetsOut.push_back(stuff); + + RLPXFrameWriter w(0); + uint8_t packetType = 127; + bytes packetTypeRLP((RLPStream() << packetType).out()); + for (auto const& p: packetsOut) + w.enque(packetType, (RLPStream() << p)); + + vector encframes; + BOOST_REQUIRE_EQUAL(4, w.mux(encoder, dequeLen, encframes)); + BOOST_REQUIRE_EQUAL(0, w.mux(encoder, dequeLen, encframes)); + BOOST_REQUIRE_EQUAL(1, encframes.size()); + auto expectedFrameSize = RLPXFrameWriter::EmptyFrameLength + packetsOut.size() * (/*packet-type*/ 1 + h256::size + /*rlp-prefix*/ 1); + expectedFrameSize += ((16 - (expectedFrameSize % 16)) % 16); + BOOST_REQUIRE_EQUAL(expectedFrameSize, encframes[0].size()); + + // read and assemble dequed encframes + RLPXFrameCoder decoder(false, localEph.pubkey(), localNonce, remoteEph, remoteNonce, &ackCipher, &authCipher); + vector packets; + RLPXFrameReader r(0); + bytesRef frameWithHeader(encframes[0].data(), encframes[0].size()); + bytesRef header = frameWithHeader.cropped(0, h256::size); + bool decryptedHeader = decoder.authAndDecryptHeader(header); + BOOST_REQUIRE(decryptedHeader); + bytesRef frame = frameWithHeader.cropped(h256::size); + RLPXFrameInfo f(header); + BOOST_REQUIRE_EQUAL(f.multiFrame, false); + for (RLPXPacket& p: r.demux(decoder, f, frame)) + packets.push_back(move(p)); + + RLPStream rlpPayload; + rlpPayload << stuff; + BOOST_REQUIRE_EQUAL(packets.size(), 4); + while (!packets.empty()) + { + BOOST_REQUIRE_EQUAL(packets.back().size(), packetTypeRLP.size() + rlpPayload.out().size()); + BOOST_REQUIRE_EQUAL(sha3(RLP(packets.back().data()).payload()), sha3(stuff)); + BOOST_REQUIRE_EQUAL(sha3(packets.back().type()), sha3(packetTypeRLP)); + packets.pop_back(); + } +} + +BOOST_AUTO_TEST_CASE(singleFramePacketFlush) +{ + ECDHE localEph; + h256 localNonce = Nonce::get(); + ECDHE remoteEph; + h256 remoteNonce = Nonce::get(); + bytes ackCipher{0}; + bytes authCipher{1}; + RLPXFrameCoder encoder(true, remoteEph.pubkey(), remoteNonce, localEph, localNonce, &ackCipher, &authCipher); + + /// Test writing four 32 byte RLPStream packets such that + /// a single 1KB frame will incldue all four packets. + bytes stuff = sha3("A").asBytes(); + RLPXFrameWriter w(0); + uint8_t packetType = 127; + bytes packetTypeRLP((RLPStream() << packetType).out()); + w.enque(packetType, (RLPStream() << stuff)); + + vector encframes; + auto dequeLen = RLPXFrameWriter::EmptyFrameLength + 34; + dequeLen += ((16 - (dequeLen % 16)) % 16); + BOOST_REQUIRE_EQUAL(1, w.mux(encoder, dequeLen, encframes)); + BOOST_REQUIRE_EQUAL(0, w.mux(encoder, dequeLen, encframes)); + BOOST_REQUIRE_EQUAL(1, encframes.size()); + BOOST_REQUIRE_EQUAL(dequeLen, encframes[0].size()); + + // read and assemble dequed encframes + RLPXFrameCoder decoder(false, localEph.pubkey(), localNonce, remoteEph, remoteNonce, &ackCipher, &authCipher); + vector packets; + RLPXFrameReader r(0); + bytesRef frameWithHeader(encframes[0].data(), encframes[0].size()); + bytesRef header = frameWithHeader.cropped(0, h256::size); + bool decryptedHeader = decoder.authAndDecryptHeader(header); + BOOST_REQUIRE(decryptedHeader); + bytesRef frame = frameWithHeader.cropped(h256::size); + RLPXFrameInfo f(header); + BOOST_REQUIRE_EQUAL(f.multiFrame, false); + for (RLPXPacket& p: r.demux(decoder, f, frame)) + packets.push_back(move(p)); + + RLPStream rlpPayload; + rlpPayload << stuff; + BOOST_REQUIRE_EQUAL(packets.size(), 1); + BOOST_REQUIRE_EQUAL(packets.back().size(), packetTypeRLP.size() + rlpPayload.out().size()); + BOOST_REQUIRE_EQUAL(sha3(RLP(packets.back().data()).payload()), sha3(stuff)); + BOOST_REQUIRE_EQUAL(sha3(packets.back().type()), sha3(packetTypeRLP)); +} + +BOOST_AUTO_TEST_CASE(manyProtocols) +{ + +} + BOOST_AUTO_TEST_SUITE_END()