Browse Source

Merge branch 'netFix' into rlpx

cl-refactor
subtly 10 years ago
parent
commit
f57212421d
  1. 37
      libethereum/BlockChainSync.cpp
  2. 81
      libethereum/BlockChainSync.h
  3. 11
      libp2p/Common.h
  4. 14
      libp2p/Host.cpp
  5. 12
      libp2p/RLPXFrameCoder.cpp
  6. 14
      libp2p/RLPXFrameCoder.h
  7. 337
      libp2p/Session.cpp
  8. 10
      libp2p/Session.h
  9. 10
      test/libp2p/net.cpp

37
libethereum/BlockChainSync.cpp

@ -52,6 +52,7 @@ BlockChainSync::BlockChainSync(EthereumHost& _host):
BlockChainSync::~BlockChainSync()
{
RecursiveGuard l(x_sync);
abortSync();
}
@ -67,8 +68,10 @@ DownloadMan& BlockChainSync::downloadMan()
void BlockChainSync::abortSync()
{
DEV_INVARIANT_CHECK;
host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; });
downloadMan().resetToChain(h256s());
DEV_INVARIANT_CHECK;
}
void BlockChainSync::onPeerStatus(EthereumPeer* _peer)
@ -87,14 +90,14 @@ void BlockChainSync::onPeerStatus(EthereumPeer* _peer)
_peer->disable("Peer banned for previous bad behaviour.");
else
{
unsigned estimatedHashes = estimateHashes();
_peer->m_expectedHashes = estimatedHashes;
unsigned hashes = estimatedHashes();
_peer->m_expectedHashes = hashes;
onNewPeer(_peer);
}
DEV_INVARIANT_CHECK;
}
unsigned BlockChainSync::estimateHashes() const
unsigned BlockChainSync::estimatedHashes() const
{
BlockInfo block = host().chain().info();
time_t lastBlockTime = (block.hash() == host().chain().genesisHash()) ? 1428192000 : (time_t)block.timestamp;
@ -113,7 +116,6 @@ void BlockChainSync::requestBlocks(EthereumPeer* _peer)
if (host().bq().knownFull())
{
clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
m_lastActiveState = m_state;
pauseSync();
_peer->setIdle();
return;
@ -233,10 +235,8 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
if (downloadMan().isComplete())
completeSync();
else if (!got)
requestBlocks(_peer);
else
peerDoneBlocks(_peer);
requestBlocks(_peer); // Some of the blocks might have been downloaded by helping peers, proceed anyway
}
DEV_INVARIANT_CHECK;
}
@ -508,7 +508,7 @@ bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const
{
auto td = _peer->m_totalDifficulty;
auto lh = _peer->m_latestHash;
auto ctd = host().chain().details().totalDifficulty;
auto ctd = host().chain().details().totalDifficulty;
if (m_syncingNeededBlocks.empty())
return false;
@ -636,7 +636,9 @@ void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
// m_banned.insert(_peer->session()->id()); // We know who you are!
// _peer->disable("Peer sent hashes but was unable to provide the blocks.");
}
resetSync();
downloadMan().reset();
transition(_peer, SyncState::Idle);
}
_peer->m_sub.doneFetch();
}
@ -648,7 +650,7 @@ void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
_peer->setIdle();
if (!isSyncing(_peer))
{
clog(NetMessageSummary) << "Ignoring hashes synce not syncing";
clog(NetMessageSummary) << "Ignoring hashes since not syncing";
return;
}
if (_hashes.size() == 0)
@ -705,6 +707,7 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
clog(NetMessageSummary) << "Ignoring since we're already downloading.";
return;
}
clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help.";
unsigned knowns = 0;
unsigned unknowns = 0;
for (auto const& h: _hashes)
@ -741,6 +744,7 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
void PV60Sync::abortSync(EthereumPeer* _peer)
{
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
if (isSyncing(_peer))
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
@ -751,6 +755,8 @@ void PV60Sync::abortSync(EthereumPeer* _peer)
void PV60Sync::onPeerAborting(EthereumPeer* _peer)
{
RecursiveGuard l(x_sync);
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
abortSync(_peer);
DEV_INVARIANT_CHECK;
}
@ -767,6 +773,10 @@ bool PV60Sync::invariants() const
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; });
if (!hashes)
return false;
if (!m_syncingLatestHash)
return false;
if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash))
return false;
}
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
@ -777,5 +787,14 @@ bool PV60Sync::invariants() const
if (downloadMan().isComplete())
return false;
}
if (m_state == SyncState::Idle)
{
bool busy = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; });
if (busy)
return false;
}
if (m_state == SyncState::Waiting && !host().bq().isActive())
return false;
return true;
}

81
libethereum/BlockChainSync.h

@ -109,17 +109,16 @@ protected:
EthereumHost const& host() const { return m_host; }
/// Estimates max number of hashes peers can give us.
unsigned estimateHashes() const;
unsigned estimatedHashes() const;
/// Request blocks from peer if needed
void requestBlocks(EthereumPeer* _peer);
protected:
Handler m_bqRoomAvailable;
Handler m_bqRoomAvailable; ///< Triggered once block queue
mutable RecursiveMutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state
SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
SyncState m_state = SyncState::Idle; ///< Current sync state
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
@ -133,6 +132,70 @@ private:
* @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete
* Syncs to peers and keeps up to date
*/
/**
* Transitions:
*
* Idle->Hashes
* Triggered when:
* * A new peer appears that we can sync to
* * Transtition to Idle, there are peers we can sync to
* Effects:
* * Set chain sync (m_syncingTotalDifficulty, m_syncingLatestHash, m_syncer)
* * Requests hashes from m_syncer
*
* Hashes->Idle
* Triggered when:
* * Received too many hashes
* * Received 0 total hashes from m_syncer
* * m_syncer aborts
* Effects:
* In case of too many hashes sync is reset
*
* Hashes->Blocks
* Triggered when:
* * Received known hash from m_syncer
* * Received 0 hashes from m_syncer and m_syncingTotalBlocks not empty
* Effects:
* * Set up download manager, clear m_syncingTotalBlocks. Set all peers to help with downloading if they can
*
* Blocks->Idle
* Triggered when:
* * m_syncer aborts
* * m_syncer does not have required block
* * All blocks downloaded
* * Block qeueue is full with unknown blocks
* Effects:
* * Download manager is reset
*
* Blocks->Waiting
* Triggered when:
* * Block queue is full with known blocks
* Effects:
* * Stop requesting blocks from peers
*
* Waiting->Blocks
* Triggered when:
* * Block queue has space for new blocks
* Effects:
* * Continue requesting blocks from peers
*
* Idle->NewBlocks
* Triggered when:
* * New block hashes arrive
* Effects:
* * Set up download manager, clear m_syncingTotalBlocks. Download blocks from a single peer. If downloaded blocks have unknown parents, set the peer to sync
*
* NewBlocks->Idle
* Triggered when:
* * m_syncer aborts
* * m_syncer does not have required block
* * All new blocks downloaded
* * Block qeueue is full with unknown blocks
* Effects:
* * Download manager is reset
*
*/
class PV60Sync: public BlockChainSync
{
public:
@ -153,6 +216,7 @@ public:
/// @returns Sync status
SyncStatus status() const override;
protected:
void onNewPeer(EthereumPeer* _peer) override;
void continueSync() override;
void peerDoneBlocks(EthereumPeer* _peer) override;
@ -205,9 +269,10 @@ private:
h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr
h256 m_syncingLatestHash; ///< Latest block's hash of the peer we are syncing to, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync.
// TODO: switch to weak_ptr
EthereumPeer* m_syncer = nullptr; ///< Peer we are currently syncing with
};
}
}

11
libp2p/Common.h

@ -149,14 +149,15 @@ using CapDescs = std::vector<CapDesc>;
*/
struct PeerSessionInfo
{
NodeId id;
std::string clientVersion;
std::string host;
unsigned short port;
NodeId const id;
std::string const clientVersion;
std::string const host;
unsigned short const port;
std::chrono::steady_clock::duration lastPing;
std::set<CapDesc> caps;
std::set<CapDesc> const caps;
unsigned socketId;
std::map<std::string, std::string> notes;
unsigned const protocolVersion;
};
using PeerSessionInfos = std::vector<PeerSessionInfo>;

14
libp2p/Host.cpp

@ -254,7 +254,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder*
clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort;
// create session so disconnects are managed
auto ps = make_shared<Session>(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>()}));
auto ps = make_shared<Session>(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>(), protocolVersion}));
if (protocolVersion < dev::p2p::c_protocolVersion - 1)
{
ps->disconnect(IncompatibleProtocol);
@ -724,8 +724,16 @@ void Host::startedWorking()
void Host::doWork()
{
if (m_run)
m_ioService.run();
try
{
if (m_run)
m_ioService.run();
}
catch (std::exception const& _e)
{
clog(NetP2PWarn) << "Exception in Network Thread:" << _e.what();
clog(NetP2PWarn) << "Network Restart is Recommended.";
}
}
void Host::keepAlivePeers()

12
libp2p/RLPXFrameCoder.cpp

@ -30,6 +30,18 @@ using namespace dev;
using namespace dev::p2p;
using namespace CryptoPP;
RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header)
{
length = (_header[0] * 256 + _header[1]) * 256 + _header[2];
padding = ((16 - (length % 16)) % 16);
RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall);
auto itemCount = header.itemCount();
protocolId = header[0].toInt<uint16_t>();
hasSequence = itemCount > 1;
sequenceId = hasSequence ? header[1].toInt<uint16_t>() : 0;
totalLength = itemCount == 3 ? header[2].toInt<uint32_t>() : 0;
}
RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init)
{
setup(_init.m_originated, _init.m_remoteEphemeral, _init.m_remoteNonce, _init.m_ecdhe, _init.m_nonce, &_init.m_ackCipher, &_init.m_authCipher);

14
libp2p/RLPXFrameCoder.h

@ -32,7 +32,21 @@ namespace dev
{
namespace p2p
{
struct RLPXFrameInfo
{
RLPXFrameInfo() = default;
/// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]]
RLPXFrameInfo(bytesConstRef _frameHeader);
uint32_t length = 0; ///< Max: 2**24
uint8_t padding = 0;
uint16_t protocolId = 0;
bool hasSequence = false;
uint16_t sequenceId = 0;
uint32_t totalLength = 0;
};
class RLPXHandshake;
/**

337
libp2p/Session.cpp

@ -27,7 +27,6 @@
#include <libdevcore/CommonIO.h>
#include <libdevcore/StructuredLogger.h>
#include <libethcore/Exceptions.h>
#include "RLPxHandshake.h"
#include "Host.h"
#include "Capability.h"
using namespace std;
@ -156,111 +155,25 @@ void Session::serviceNodesRequest()
addNote("peers", "done");
}
bool Session::interpret(PacketType _t, RLP const& _r)
bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
{
m_lastReceived = chrono::steady_clock::now();
clog(NetRight) << _t << _r;
try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
{
switch (_t)
{
case DisconnectPacket:
{
string reason = "Unspecified";
auto r = (DisconnectReason)_r[0].toInt<int>();
if (!_r[0].isInt())
drop(BadProtocol);
else
{
reason = reasonOf(r);
clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
drop(DisconnectRequested);
}
break;
}
case PingPacket:
{
clog(NetTriviaSummary) << "Ping";
RLPStream s;
sealAndSend(prep(s, PongPacket));
break;
}
case PongPacket:
m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
break;
case GetPeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "GetPeers";
m_theyRequestedNodes = true;
serviceNodesRequest();
break;
case PeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)";
m_weRequestedNodes = false;
for (unsigned i = 0; i < _r.itemCount(); ++i)
{
bi::address peerAddress;
if (_r[i][0].size() == 16)
peerAddress = bi::address_v6(_r[i][0].toHash<FixedHash<16>>().asArray());
else if (_r[i][0].size() == 4)
peerAddress = bi::address_v4(_r[i][0].toHash<FixedHash<4>>().asArray());
else
{
cwarn << "Received bad peer packet:" << _r;
disconnect(BadProtocol);
return true;
}
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
NodeId id = _r[i][2].toHash<NodeId>();
clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")";
if (!isPublicAddress(peerAddress))
goto CONTINUE; // Private address. Ignore.
if (!id)
goto LAMEPEER; // Null identity. Ignore.
if (m_server->id() == id)
goto LAMEPEER; // Just our info - we already have that.
if (id == this->id())
goto LAMEPEER; // Just their info - we already have that.
if (!ep.port())
goto LAMEPEER; // Zero port? Don't think so.
if (ep.port() >= /*49152*/32768)
goto LAMEPEER; // Private port according to IANA.
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port()));
clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")";
CONTINUE:;
LAMEPEER:;
}
break;
default:
// v4 frame headers are useless, offset packet type used
// v5 protocol type is in header, packet type not offset
if (_capId == 0 && _t < UserPacket)
return interpret(_t, _r);
if (m_info.protocolVersion >= 5)
for (auto const& i: m_capabilities)
if (_capId == (uint16_t)i.first.second)
return i.second->m_enabled ? i.second->interpret(_t, _r) : true;
if (m_info.protocolVersion <= 4)
for (auto const& i: m_capabilities)
if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
{
if (i.second->m_enabled)
return i.second->interpret(_t - i.second->m_idOffset, _r);
else
return true;
}
return false;
}
return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;
return false;
}
catch (std::exception const& _e)
{
@ -271,6 +184,101 @@ bool Session::interpret(PacketType _t, RLP const& _r)
return true;
}
bool Session::interpret(PacketType _t, RLP const& _r)
{
switch (_t)
{
case DisconnectPacket:
{
string reason = "Unspecified";
auto r = (DisconnectReason)_r[0].toInt<int>();
if (!_r[0].isInt())
drop(BadProtocol);
else
{
reason = reasonOf(r);
clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
drop(DisconnectRequested);
}
break;
}
case PingPacket:
{
clog(NetTriviaSummary) << "Ping";
RLPStream s;
sealAndSend(prep(s, PongPacket));
break;
}
case PongPacket:
m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
break;
case GetPeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "GetPeers";
m_theyRequestedNodes = true;
serviceNodesRequest();
break;
case PeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)";
m_weRequestedNodes = false;
for (unsigned i = 0; i < _r.itemCount(); ++i)
{
bi::address peerAddress;
if (_r[i][0].size() == 16)
peerAddress = bi::address_v6(_r[i][0].toHash<FixedHash<16>>().asArray());
else if (_r[i][0].size() == 4)
peerAddress = bi::address_v4(_r[i][0].toHash<FixedHash<4>>().asArray());
else
{
cwarn << "Received bad peer packet:" << _r;
disconnect(BadProtocol);
return true;
}
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
NodeId id = _r[i][2].toHash<NodeId>();
clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")";
if (!isPublicAddress(peerAddress))
goto CONTINUE; // Private address. Ignore.
if (!id)
goto LAMEPEER; // Null identity. Ignore.
if (m_server->id() == id)
goto LAMEPEER; // Just our info - we already have that.
if (id == this->id())
goto LAMEPEER; // Just their info - we already have that.
if (!ep.port())
goto LAMEPEER; // Zero port? Don't think so.
if (ep.port() >= /*49152*/32768)
goto LAMEPEER; // Private port according to IANA.
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port()));
clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")";
CONTINUE:;
LAMEPEER:;
}
break;
default:
return false;
}
return true;
}
void Session::ping()
{
RLPStream s;
@ -292,12 +300,9 @@ void Session::sealAndSend(RLPStream& _s)
bool Session::checkPacket(bytesConstRef _msg)
{
if (_msg.size() < 2)
if (_msg[0] > 0x7f || _msg.size() < 2)
return false;
if (_msg[0] > 0x7f)
return false;
RLP r(_msg.cropped(1));
if (r.actualSize() + 1 != _msg.size())
if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size())
return false;
return true;
}
@ -413,82 +418,78 @@ void Session::doRead()
{
ThreadContext tc(info().id.abridged());
ThreadContext tc2(info().clientVersion);
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
if (!checkRead(h256::size, ec, length))
return;
else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
{
clog(NetWarn) << "Error reading: " << ec.message();
drop(TCPError);
clog(NetWarn) << "header decrypt failed";
drop(BadProtocol); // todo: better error
return;
}
RLPXFrameInfo header;
try
{
header = RLPXFrameInfo(bytesConstRef(m_data.data(), length));
}
else if (ec && length == 0)
catch (std::exception const& _e)
{
clog(NetWarn) << "Exception decoding frame header RLP:" << bytesConstRef(m_data.data(), h128::size).cropped(3);
drop(BadProtocol);
return;
else
}
/// read padded frame and mac
auto tlen = header.length + header.padding + h128::size;
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, header, tlen](boost::system::error_code ec, std::size_t length)
{
/// authenticate and decrypt header
bytesRef header(m_data.data(), h256::size);
if (!m_io->authAndDecryptHeader(header))
ThreadContext tc(info().id.abridged());
ThreadContext tc2(info().clientVersion);
if (!checkRead(tlen, ec, length))
return;
else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
{
clog(NetWarn) << "header decrypt failed";
clog(NetWarn) << "frame decrypt failed";
drop(BadProtocol); // todo: better error
return;
}
/// check frame size
uint32_t frameSize = (m_data[0] * 256 + m_data[1]) * 256 + m_data[2];
if (frameSize >= (uint32_t)1 << 24)
bytesConstRef frame(m_data.data(), header.length);
if (!checkPacket(frame))
{
clog(NetWarn) << "frame size too large";
drop(BadProtocol);
cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
clog(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return;
}
/// rlp of header has protocol-type, sequence-id[, total-packet-size]
bytes headerRLP(13);
bytesConstRef(m_data.data(), h128::size).cropped(3).copyTo(&headerRLP);
/// read padded frame and mac
auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size;
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length)
else
{
ThreadContext tc(info().id.abridged());
ThreadContext tc2(info().clientVersion);
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
{
clog(NetWarn) << "Error reading: " << ec.message();
drop(TCPError);
}
else if (ec && length < tlen)
{
clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << ec.message();
repMan().noteRude(*this);
drop(TCPError);
return;
}
else
{
if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
{
clog(NetWarn) << "frame decrypt failed";
drop(BadProtocol); // todo: better error
return;
}
bytesConstRef frame(m_data.data(), frameSize);
if (!checkPacket(frame))
{
cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
clog(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return;
}
else
{
auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
RLP r(frame.cropped(1));
if (!interpret(packetType, r))
clog(NetWarn) << "Couldn't interpret packet." << RLP(r);
}
doRead();
}
});
}
auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
RLP r(frame.cropped(1));
if (!readPacket(header.protocolId, packetType, r))
clog(NetWarn) << "Couldn't interpret packet." << RLP(r);
}
doRead();
});
});
}
bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length)
{
if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof)
{
clog(NetConnect) << "Error reading: " << _ec.message();
drop(TCPError);
return false;
}
else if (_ec && _length < _expected)
{
clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << _ec.message();
repMan().noteRude(*this);
drop(TCPError);
return false;
}
// If this fails then there's an unhandled asio error
assert(_expected == _length);
return true;
}

10
libp2p/Session.h

@ -96,13 +96,19 @@ private:
/// Perform a read on the socket.
void doRead();
/// Check error code after reading and drop peer if error code.
bool checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length);
/// Perform a single round of the write operation. This could end up calling itself asynchronously.
void write();
/// Interpret an incoming message.
bool interpret(PacketType _t, RLP const& _r);
/// Deliver RLPX packet to Session or Capability for interpretation.
bool readPacket(uint16_t _capId, PacketType _t, RLP const& _r);
/// Interpret an incoming Session packet.
bool interpret(PacketType _t, RLP const& _r);
/// @returns true iff the _msg forms a valid message for sending or receiving on the network.
static bool checkPacket(bytesConstRef _msg);

10
test/libp2p/net.cpp

@ -314,23 +314,23 @@ BOOST_AUTO_TEST_CASE(kademlia)
node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for
node.setup();
node.populate();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populateAll();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
auto nodes = node.nodeTable->nodes();
nodes.sort();
node.nodeTable->reset();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populate(1);
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.nodeTable->discover();
this_thread::sleep_for(chrono::milliseconds(2000));
clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
// clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8);

Loading…
Cancel
Save