arkpar
10 years ago
16 changed files with 986 additions and 75 deletions
@ -0,0 +1,90 @@ |
|||
/*
|
|||
This file is part of cpp-ethereum. |
|||
|
|||
cpp-ethereum is free software: you can redistribute it and/or modify |
|||
it under the terms of the GNU General Public License as published by |
|||
the Free Software Foundation, either version 3 of the License, or |
|||
(at your option) any later version. |
|||
|
|||
cpp-ethereum is distributed in the hope that it will be useful, |
|||
but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
GNU General Public License for more details. |
|||
|
|||
You should have received a copy of the GNU General Public License |
|||
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
/** @file RLPXFrameReader.cpp
|
|||
* @author Alex Leverington <nessence@gmail.com> |
|||
* @date 2015 |
|||
*/ |
|||
|
|||
#include "RLPXFrameReader.h" |
|||
|
|||
using namespace std; |
|||
using namespace dev; |
|||
using namespace dev::p2p; |
|||
|
|||
std::vector<RLPXPacket> RLPXFrameReader::demux(RLPXFrameCoder& _coder, RLPXFrameInfo const& _info, bytesRef _frame) |
|||
{ |
|||
if (!_coder.authAndDecryptFrame(_frame)) |
|||
BOOST_THROW_EXCEPTION(RLPXFrameDecryptFailed()); |
|||
|
|||
std::vector<RLPXPacket> ret; |
|||
if (_frame.empty()) |
|||
// drop: bad frame (empty)
|
|||
return ret; |
|||
if (_info.multiFrame && _info.totalLength && _frame.size() > _info.totalLength) |
|||
// drop: bad frame (too large)
|
|||
return ret; |
|||
|
|||
// trim mac
|
|||
bytesConstRef buffer = _frame.cropped(0, _frame.size() - (h128::size + _info.padding)); |
|||
// continue populating multiframe packets
|
|||
if (_info.multiFrame && m_incomplete.count(_info.sequenceId)) |
|||
{ |
|||
uint32_t& remaining = m_incomplete.at(_info.sequenceId).second; |
|||
if (!_info.totalLength && buffer.size() > 0 && buffer.size() <= remaining) |
|||
{ |
|||
remaining -= buffer.size(); |
|||
|
|||
RLPXPacket& p = m_incomplete.at(_info.sequenceId).first; |
|||
if(p.append(buffer) && !remaining) |
|||
ret.push_back(std::move(p)); |
|||
if (!remaining) |
|||
m_incomplete.erase(_info.sequenceId); |
|||
|
|||
if (!ret.empty() && remaining) |
|||
BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); |
|||
else if (ret.empty() && !remaining) |
|||
BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); |
|||
|
|||
return ret; |
|||
} |
|||
else |
|||
m_incomplete.erase(_info.sequenceId); |
|||
} |
|||
|
|||
while (!buffer.empty()) |
|||
{ |
|||
auto type = nextRLP(buffer); |
|||
if (type.empty()) |
|||
break; |
|||
buffer = buffer.cropped(type.size()); |
|||
// consume entire buffer if packet has sequence
|
|||
auto packet = _info.multiFrame ? buffer : nextRLP(buffer); |
|||
buffer = buffer.cropped(packet.size()); |
|||
RLPXPacket p(m_protocolType, type); |
|||
if (!packet.empty()) |
|||
p.append(packet); |
|||
|
|||
uint32_t remaining = _info.totalLength - type.size() - packet.size(); |
|||
if (p.isValid()) |
|||
ret.push_back(std::move(p)); |
|||
else if (_info.multiFrame && remaining) |
|||
m_incomplete.insert(std::make_pair(_info.sequenceId, std::make_pair(std::move(p), remaining))); |
|||
else |
|||
BOOST_THROW_EXCEPTION(RLPXInvalidPacket()); |
|||
} |
|||
return ret; |
|||
} |
@ -0,0 +1,55 @@ |
|||
/*
|
|||
This file is part of cpp-ethereum. |
|||
|
|||
cpp-ethereum is free software: you can redistribute it and/or modify |
|||
it under the terms of the GNU General Public License as published by |
|||
the Free Software Foundation, either version 3 of the License, or |
|||
(at your option) any later version. |
|||
|
|||
cpp-ethereum is distributed in the hope that it will be useful, |
|||
but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
GNU General Public License for more details. |
|||
|
|||
You should have received a copy of the GNU General Public License |
|||
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
/** @file RLPXFrameReader.h
|
|||
* @author Alex Leverington <nessence@gmail.com> |
|||
* @date 2015 |
|||
*/ |
|||
|
|||
|
|||
#pragma once |
|||
|
|||
#include <libdevcore/Guards.h> |
|||
#include "RLPXFrameCoder.h" |
|||
#include "RLPXPacket.h" |
|||
namespace ba = boost::asio; |
|||
namespace bi = boost::asio::ip; |
|||
|
|||
namespace dev |
|||
{ |
|||
namespace p2p |
|||
{ |
|||
|
|||
/**
|
|||
* RLPFrameReader |
|||
* Reads and assembles RLPX frame byte buffers into RLPX packets. Additionally |
|||
* buffers incomplete packets which are pieced into multiple frames (has sequence). |
|||
*/ |
|||
class RLPXFrameReader |
|||
{ |
|||
public: |
|||
RLPXFrameReader(uint16_t _protocolType): m_protocolType(_protocolType) {} |
|||
|
|||
/// Processes a single frame returning complete packets.
|
|||
std::vector<RLPXPacket> demux(RLPXFrameCoder& _coder, RLPXFrameInfo const& _info, bytesRef _frame); |
|||
|
|||
protected: |
|||
uint16_t m_protocolType; |
|||
std::map<uint16_t, std::pair<RLPXPacket, uint32_t>> m_incomplete; ///< Sequence: Incomplete packet and bytes remaining.
|
|||
}; |
|||
|
|||
} |
|||
} |
@ -0,0 +1,161 @@ |
|||
/*
|
|||
This file is part of cpp-ethereum. |
|||
|
|||
cpp-ethereum is free software: you can redistribute it and/or modify |
|||
it under the terms of the GNU General Public License as published by |
|||
the Free Software Foundation, either version 3 of the License, or |
|||
(at your option) any later version. |
|||
|
|||
cpp-ethereum is distributed in the hope that it will be useful, |
|||
but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
GNU General Public License for more details. |
|||
|
|||
You should have received a copy of the GNU General Public License |
|||
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
/** @file RLPXFrameWriter.cpp
|
|||
* @author Alex Leverington <nessence@gmail.com> |
|||
* @date 2015 |
|||
*/ |
|||
|
|||
#include "RLPXFrameWriter.h" |
|||
|
|||
using namespace std; |
|||
using namespace dev; |
|||
using namespace dev::p2p; |
|||
|
|||
const uint16_t RLPXFrameWriter::EmptyFrameLength = h128::size * 3; // header + headerMAC + frameMAC
|
|||
const uint16_t RLPXFrameWriter::MinFrameDequeLength = h128::size * 4; // header + headerMAC + padded-block + frameMAC
|
|||
|
|||
void RLPXFrameWriter::enque(RLPXPacket&& _p, PacketPriority _priority) |
|||
{ |
|||
if (!_p.isValid()) |
|||
return; |
|||
WriterState& qs = _priority ? m_q.first : m_q.second; |
|||
DEV_GUARDED(qs.x) |
|||
qs.q.push_back(move(_p)); |
|||
} |
|||
|
|||
void RLPXFrameWriter::enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority) |
|||
{ |
|||
enque(RLPXPacket(m_protocolType, (RLPStream() << _packetType), _payload), _priority); |
|||
} |
|||
|
|||
size_t RLPXFrameWriter::mux(RLPXFrameCoder& _coder, unsigned _size, vector<bytes>& o_toWrite) |
|||
{ |
|||
static const size_t c_blockSize = h128::size; |
|||
static const size_t c_overhead = c_blockSize * 3; // header + headerMac + frameMAC
|
|||
if (_size < c_overhead + c_blockSize) |
|||
return 0; |
|||
|
|||
size_t ret = 0; |
|||
size_t frameLen = _size / 16 * 16; |
|||
bytes payload(0); |
|||
bool swapQueues = false; |
|||
while (frameLen >= c_overhead + c_blockSize) |
|||
{ |
|||
bool highPending; |
|||
bool lowPending; |
|||
DEV_GUARDED(m_q.first.x) |
|||
highPending = !!m_q.first.q.size(); |
|||
DEV_GUARDED(m_q.second.x) |
|||
lowPending = !!m_q.second.q.size(); |
|||
|
|||
if (!highPending && !lowPending) |
|||
return ret; |
|||
|
|||
// first run when !swapQueues, high > low, otherwise low > high
|
|||
bool high = highPending && !swapQueues ? true : !lowPending; |
|||
WriterState &qs = high ? m_q.first : m_q.second; |
|||
size_t frameAllot = (!swapQueues && highPending && lowPending ? frameLen / 2 - (c_overhead + c_blockSize) > 0 ? frameLen / 2 : frameLen : frameLen) - c_overhead; |
|||
size_t offset = 0; |
|||
size_t length = 0; |
|||
while (frameAllot >= c_blockSize) |
|||
{ |
|||
// Invariants:
|
|||
// !qs.writing && payload.empty() initial entry
|
|||
// !qs.writing && !payload.empty() continuation (multiple packets per frame)
|
|||
// qs.writing && payload.empty() initial entry, continuation (multiple frames per packet)
|
|||
// qs.writing && !payload.empty() INVALID
|
|||
|
|||
// write packet-type
|
|||
if (qs.writing == nullptr) |
|||
{ |
|||
{ |
|||
Guard l(qs.x); |
|||
if (!qs.q.empty()) |
|||
qs.writing = &qs.q[0]; |
|||
else |
|||
break; |
|||
} |
|||
|
|||
// break here if we can't write-out packet-type
|
|||
// or payload is packed and next packet won't fit (implicit)
|
|||
qs.multiFrame = qs.writing->size() > frameAllot; |
|||
assert(qs.writing->type().size()); |
|||
if (qs.writing->type().size() > frameAllot || (qs.multiFrame && !payload.empty())) |
|||
{ |
|||
qs.writing = nullptr; |
|||
qs.remaining = 0; |
|||
qs.multiFrame = false; |
|||
break; |
|||
} |
|||
else if (qs.multiFrame) |
|||
qs.sequence = ++m_sequenceId; |
|||
|
|||
frameAllot -= qs.writing->type().size(); |
|||
payload += qs.writing->type(); |
|||
|
|||
qs.remaining = qs.writing->data().size(); |
|||
} |
|||
|
|||
// write payload w/remaining allotment
|
|||
assert(qs.multiFrame || (!qs.multiFrame && frameAllot >= qs.remaining)); |
|||
if (frameAllot && qs.remaining) |
|||
{ |
|||
offset = qs.writing->data().size() - qs.remaining; |
|||
length = qs.remaining <= frameAllot ? qs.remaining : frameAllot; |
|||
bytes portion = bytesConstRef(&qs.writing->data()).cropped(offset, length).toBytes(); |
|||
qs.remaining -= length; |
|||
frameAllot -= portion.size(); |
|||
payload += portion; |
|||
} |
|||
|
|||
assert((!qs.remaining && (offset > 0 || !qs.multiFrame)) || (qs.remaining && qs.multiFrame)); |
|||
if (!qs.remaining) |
|||
{ |
|||
qs.writing = nullptr; |
|||
DEV_GUARDED(qs.x) |
|||
qs.q.pop_front(); |
|||
ret++; |
|||
} |
|||
// qs.writing is left alone for first frame of multi-frame packet
|
|||
if (qs.multiFrame) |
|||
break; |
|||
} |
|||
|
|||
if (!payload.empty()) |
|||
{ |
|||
if (qs.multiFrame) |
|||
if (offset == 0) |
|||
// 1st frame of segmented packet writes total-size of packet
|
|||
_coder.writeFrame(m_protocolType, qs.sequence, qs.writing->size(), &payload, payload); |
|||
else |
|||
_coder.writeFrame(m_protocolType, qs.sequence, &payload, payload); |
|||
else |
|||
_coder.writeFrame(m_protocolType, &payload, payload); |
|||
assert((int)frameLen - payload.size() >= 0); |
|||
frameLen -= payload.size(); |
|||
o_toWrite.push_back(payload); |
|||
payload.resize(0); |
|||
|
|||
if (!qs.remaining && qs.multiFrame) |
|||
qs.multiFrame = false; |
|||
} |
|||
else if (swapQueues) |
|||
break; |
|||
swapQueues = true; |
|||
} |
|||
return ret; |
|||
} |
@ -0,0 +1,88 @@ |
|||
/*
|
|||
This file is part of cpp-ethereum. |
|||
|
|||
cpp-ethereum is free software: you can redistribute it and/or modify |
|||
it under the terms of the GNU General Public License as published by |
|||
the Free Software Foundation, either version 3 of the License, or |
|||
(at your option) any later version. |
|||
|
|||
cpp-ethereum is distributed in the hope that it will be useful, |
|||
but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
GNU General Public License for more details. |
|||
|
|||
You should have received a copy of the GNU General Public License |
|||
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
/** @file RLPXperimental.h
|
|||
* @author Alex Leverington <nessence@gmail.com> |
|||
* @date 2015 |
|||
*/ |
|||
|
|||
|
|||
#pragma once |
|||
|
|||
#include <libdevcore/Guards.h> |
|||
#include "RLPXFrameCoder.h" |
|||
#include "RLPXPacket.h" |
|||
namespace ba = boost::asio; |
|||
namespace bi = boost::asio::ip; |
|||
|
|||
namespace dev |
|||
{ |
|||
namespace p2p |
|||
{ |
|||
|
|||
/**
|
|||
* @brief Multiplex packets into encrypted RLPX frames. |
|||
* @todo throw when enqueued packet is invalid |
|||
* @todo use RLPXFrameInfo |
|||
*/ |
|||
class RLPXFrameWriter |
|||
{ |
|||
/**
|
|||
* @brief Queue and state for Writer |
|||
* Properties are used independently; |
|||
* only valid packets should be added to q |
|||
* @todo implement as class |
|||
*/ |
|||
struct WriterState |
|||
{ |
|||
std::deque<RLPXPacket> q; |
|||
mutable Mutex x; |
|||
|
|||
RLPXPacket* writing = nullptr; |
|||
size_t remaining = 0; |
|||
bool multiFrame = false; |
|||
uint16_t sequence = -1; |
|||
}; |
|||
|
|||
public: |
|||
enum PacketPriority { PriorityLow = 0, PriorityHigh }; |
|||
static const uint16_t EmptyFrameLength; |
|||
static const uint16_t MinFrameDequeLength; |
|||
|
|||
RLPXFrameWriter(uint16_t _protocolType): m_protocolType(_protocolType) {} |
|||
RLPXFrameWriter(RLPXFrameWriter const& _s): m_protocolType(_s.m_protocolType) {} |
|||
|
|||
/// Returns total number of queued packets. Thread-safe.
|
|||
size_t size() const { size_t l; size_t h; DEV_GUARDED(m_q.first.x) h = m_q.first.q.size(); DEV_GUARDED(m_q.second.x) l = m_q.second.q.size(); return l + h; } |
|||
|
|||
/// Moves @_payload output to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe.
|
|||
void enque(uint8_t _packetType, RLPStream& _payload, PacketPriority _priority = PriorityLow); |
|||
|
|||
/// Returns number of packets framed and outputs frames to o_bytes. Not thread-safe.
|
|||
size_t mux(RLPXFrameCoder& _coder, unsigned _size, std::vector<bytes>& o_toWrite); |
|||
|
|||
protected: |
|||
/// Moves @_p to queue, to be muxed into frames by mux() when network buffer is ready for writing. Thread-safe.
|
|||
void enque(RLPXPacket&& _p, PacketPriority _priority = PriorityLow); |
|||
|
|||
private: |
|||
uint16_t const m_protocolType; // Protocol Type
|
|||
std::pair<WriterState, WriterState> m_q; // High, Low frame queues
|
|||
uint16_t m_sequenceId = 0; // Sequence ID
|
|||
}; |
|||
|
|||
} |
|||
} |
@ -0,0 +1,69 @@ |
|||
/*
|
|||
This file is part of cpp-ethereum. |
|||
|
|||
cpp-ethereum is free software: you can redistribute it and/or modify |
|||
it under the terms of the GNU General Public License as published by |
|||
the Free Software Foundation, either version 3 of the License, or |
|||
(at your option) any later version. |
|||
|
|||
cpp-ethereum is distributed in the hope that it will be useful, |
|||
but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
GNU General Public License for more details. |
|||
|
|||
You should have received a copy of the GNU General Public License |
|||
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
/** @file RLPXPacket.h
|
|||
* @author Alex Leverington <nessence@gmail.com> |
|||
* @date 2015 |
|||
*/ |
|||
|
|||
#pragma once |
|||
|
|||
#include <algorithm> |
|||
#include "Common.h" |
|||
|
|||
namespace dev |
|||
{ |
|||
namespace p2p |
|||
{ |
|||
|
|||
struct RLPXInvalidPacket: virtual dev::Exception {}; |
|||
|
|||
static bytesConstRef nextRLP(bytesConstRef _b) { try { RLP r(_b, RLP::AllowNonCanon); return _b.cropped(0, std::min((size_t)r.actualSize(), _b.size())); } catch(...) {} return bytesConstRef(); } |
|||
|
|||
/**
|
|||
* RLPX Packet |
|||
*/ |
|||
class RLPXPacket |
|||
{ |
|||
public: |
|||
/// Construct packet. RLPStream data is invalidated.
|
|||
RLPXPacket(uint8_t _capId, RLPStream& _type, RLPStream& _data): m_cap(_capId), m_type(_type.out()), m_data(_data.out()) {} |
|||
|
|||
/// Construct packet from single bytestream. RLPStream data is invalidated.
|
|||
RLPXPacket(unsigned _capId, bytesConstRef _in): m_cap(_capId), m_type(nextRLP(_in).toBytes()) { if (_in.size() > m_type.size()) { m_data.resize(_in.size() - m_type.size()); _in.cropped(m_type.size()).copyTo(&m_data); } } |
|||
|
|||
RLPXPacket(RLPXPacket const& _p) = delete; |
|||
RLPXPacket(RLPXPacket&& _p): m_cap(_p.m_cap), m_type(_p.m_type), m_data(_p.m_data) {} |
|||
|
|||
bytes const& type() const { return m_type; } |
|||
|
|||
bytes const& data() const { return m_data; } |
|||
|
|||
size_t size() const { try { return RLP(m_type).actualSize() + RLP(m_data, RLP::LaissezFaire).actualSize(); } catch(...) { return 0; } } |
|||
|
|||
/// Appends byte data and returns if packet is valid.
|
|||
bool append(bytesConstRef _in) { auto offset = m_data.size(); m_data.resize(offset + _in.size()); _in.copyTo(bytesRef(&m_data).cropped(offset)); return isValid(); } |
|||
|
|||
virtual bool isValid() const noexcept { try { return !(m_type.empty() && m_data.empty()) && RLP(m_type).actualSize() == m_type.size() && RLP(m_data).actualSize() == m_data.size(); } catch (...) {} return false; } |
|||
|
|||
protected: |
|||
uint8_t m_cap; |
|||
bytes m_type; |
|||
bytes m_data; |
|||
}; |
|||
|
|||
} |
|||
} |
@ -0,0 +1,112 @@ |
|||
/*
|
|||
This file is part of cpp-ethereum. |
|||
|
|||
cpp-ethereum is free software: you can redistribute it and/or modify |
|||
it under the terms of the GNU General Public License as published by |
|||
the Free Software Foundation, either version 3 of the License, or |
|||
(at your option) any later version. |
|||
|
|||
cpp-ethereum is distributed in the hope that it will be useful, |
|||
but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
GNU General Public License for more details. |
|||
|
|||
You should have received a copy of the GNU General Public License |
|||
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
/** @file RLPXSocketIO.cpp
|
|||
* @author Alex Leverington <nessence@gmail.com> |
|||
* @date 2015 |
|||
*/ |
|||
|
|||
#include "RLPXSocketIO.h" |
|||
|
|||
#include <algorithm> |
|||
using namespace std; |
|||
using namespace dev; |
|||
using namespace dev::p2p; |
|||
|
|||
uint32_t const RLPXSocketIO::MinFrameSize = h128::size * 3; // header + block + mac
|
|||
uint32_t const RLPXSocketIO::MaxPacketSize = 1 << 24; |
|||
uint16_t const RLPXSocketIO::DefaultInitialCapacity = 8 << 8; |
|||
|
|||
RLPXSocketIO::RLPXSocketIO(unsigned _protCount, RLPXFrameCoder& _coder, bi::tcp::socket& _socket, bool _flowControl, size_t _initialCapacity): |
|||
m_flowControl(_flowControl), |
|||
m_coder(_coder), |
|||
m_socket(_socket), |
|||
m_writers(move(writers(_protCount))), |
|||
m_egressCapacity(m_flowControl ? _initialCapacity : MaxPacketSize * m_writers.size()) |
|||
{} |
|||
|
|||
vector<RLPXFrameWriter> RLPXSocketIO::writers(unsigned _capacity) |
|||
{ |
|||
vector<RLPXFrameWriter> ret; |
|||
for (unsigned i = 0; i < _capacity; i++) |
|||
ret.push_back(RLPXFrameWriter(i)); |
|||
return ret; |
|||
} |
|||
|
|||
void RLPXSocketIO::send(unsigned _protocolType, unsigned _type, RLPStream& _payload) |
|||
{ |
|||
if (!m_socket.is_open()) |
|||
return; // TCPSocketNotOpen
|
|||
m_writers.at(_protocolType).enque(_type, _payload); |
|||
bool wasEmtpy = false; |
|||
DEV_GUARDED(x_queued) |
|||
wasEmtpy = (++m_queued == 1); |
|||
if (wasEmtpy) |
|||
doWrite(); |
|||
} |
|||
|
|||
void RLPXSocketIO::doWrite() |
|||
{ |
|||
m_toSend.clear(); |
|||
|
|||
size_t capacity; |
|||
DEV_GUARDED(x_queued) |
|||
capacity = min(m_egressCapacity, MaxPacketSize); |
|||
|
|||
size_t active = 0; |
|||
for (auto const& w: m_writers) |
|||
if (w.size()) |
|||
active += 1; |
|||
size_t dequed = 0; |
|||
size_t protFrameSize = capacity / active; |
|||
if (protFrameSize >= MinFrameSize) |
|||
for (auto& w: m_writers) |
|||
dequed += w.mux(m_coder, protFrameSize, m_toSend); |
|||
|
|||
if (dequed) |
|||
write(dequed); |
|||
else |
|||
deferWrite(); |
|||
} |
|||
|
|||
void RLPXSocketIO::deferWrite() |
|||
{ |
|||
auto self(shared_from_this()); |
|||
m_congestion.reset(new ba::deadline_timer(m_socket.get_io_service())); |
|||
m_congestion->expires_from_now(boost::posix_time::milliseconds(50)); |
|||
m_congestion->async_wait([=](boost::system::error_code const& _ec) { m_congestion.reset(); if (!_ec) doWrite(); }); |
|||
} |
|||
|
|||
void RLPXSocketIO::write(size_t _dequed) |
|||
{ |
|||
auto self(shared_from_this()); |
|||
ba::async_write(m_socket, ba::buffer(m_toSend), [this, self, _dequed](boost::system::error_code ec, size_t written) |
|||
{ |
|||
if (ec) |
|||
return; // TCPSocketWriteError
|
|||
|
|||
bool reschedule = false; |
|||
DEV_GUARDED(x_queued) |
|||
{ |
|||
if (m_flowControl) |
|||
m_egressCapacity -= written; |
|||
m_queued -= _dequed; |
|||
reschedule = m_queued > 0; |
|||
} |
|||
if (reschedule) |
|||
doWrite(); |
|||
}); |
|||
} |
@ -0,0 +1,71 @@ |
|||
/*
|
|||
This file is part of cpp-ethereum. |
|||
|
|||
cpp-ethereum is free software: you can redistribute it and/or modify |
|||
it under the terms of the GNU General Public License as published by |
|||
the Free Software Foundation, either version 3 of the License, or |
|||
(at your option) any later version. |
|||
|
|||
cpp-ethereum is distributed in the hope that it will be useful, |
|||
but WITHOUT ANY WARRANTY; without even the implied warranty of |
|||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|||
GNU General Public License for more details. |
|||
|
|||
You should have received a copy of the GNU General Public License |
|||
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/ |
|||
/** @file RLPXSocketIO.h
|
|||
* @author Alex Leverington <nessence@gmail.com> |
|||
* @date 2015 |
|||
*/ |
|||
|
|||
#pragma once |
|||
|
|||
#include "RLPXFrameWriter.h" |
|||
namespace ba = boost::asio; |
|||
namespace bi = boost::asio::ip; |
|||
|
|||
namespace dev |
|||
{ |
|||
namespace p2p |
|||
{ |
|||
|
|||
class RLPXSocketIO: public std::enable_shared_from_this<RLPXSocketIO> |
|||
{ |
|||
public: |
|||
static uint32_t const MinFrameSize; |
|||
static uint32_t const MaxPacketSize; |
|||
static uint16_t const DefaultInitialCapacity; |
|||
|
|||
RLPXSocketIO(unsigned _protCount, RLPXFrameCoder& _coder, bi::tcp::socket& _socket, bool _flowControl = true, size_t _initialCapacity = DefaultInitialCapacity); |
|||
|
|||
void send(unsigned _protocolType, unsigned _type, RLPStream& _payload); |
|||
|
|||
void doWrite(); |
|||
|
|||
bool congested() const { return !!m_congestion; } |
|||
|
|||
private: |
|||
static std::vector<RLPXFrameWriter> writers(unsigned _capacity); |
|||
|
|||
void deferWrite(); |
|||
|
|||
void write(size_t _dequed); |
|||
|
|||
bool const m_flowControl; ///< True if flow control is enabled.
|
|||
|
|||
RLPXFrameCoder& m_coder; ///< Encoder/decoder of frame payloads.
|
|||
bi::tcp::socket& m_socket; |
|||
|
|||
std::vector<bytes> m_toSend; ///< Reusable byte buffer for pending socket writes.
|
|||
|
|||
std::vector<RLPXFrameWriter> m_writers; ///< Write queues for each protocol. TODO: map to bytes (of capability)
|
|||
std::unique_ptr<ba::deadline_timer> m_congestion; ///< Scheduled when writes are deferred due to congestion.
|
|||
|
|||
Mutex x_queued; |
|||
unsigned m_queued = 0; ///< Track total queued packets to ensure single write loop
|
|||
uint32_t m_egressCapacity; |
|||
}; |
|||
|
|||
} |
|||
} |
Loading…
Reference in new issue