Browse Source

Prep for #2179. Abstract parsing of frame header from Session into struct. Catch unhandled exceptions thrown by ASIO.

cl-refactor
subtly 10 years ago
parent
commit
e82f8db2de
  1. 11
      libp2p/Common.h
  2. 14
      libp2p/Host.cpp
  3. 12
      libp2p/RLPXFrameCoder.cpp
  4. 14
      libp2p/RLPXFrameCoder.h
  5. 337
      libp2p/Session.cpp
  6. 8
      libp2p/Session.h
  7. 10
      test/libp2p/net.cpp

11
libp2p/Common.h

@ -149,14 +149,15 @@ using CapDescs = std::vector<CapDesc>;
*/ */
struct PeerSessionInfo struct PeerSessionInfo
{ {
NodeId id; NodeId const id;
std::string clientVersion; std::string const clientVersion;
std::string host; std::string const host;
unsigned short port; unsigned short const port;
std::chrono::steady_clock::duration lastPing; std::chrono::steady_clock::duration lastPing;
std::set<CapDesc> caps; std::set<CapDesc> const caps;
unsigned socketId; unsigned socketId;
std::map<std::string, std::string> notes; std::map<std::string, std::string> notes;
unsigned const protocolVersion;
}; };
using PeerSessionInfos = std::vector<PeerSessionInfo>; using PeerSessionInfos = std::vector<PeerSessionInfo>;

14
libp2p/Host.cpp

@ -254,7 +254,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder*
clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort; clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort;
// create session so disconnects are managed // create session so disconnects are managed
auto ps = make_shared<Session>(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>()})); auto ps = make_shared<Session>(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>(), protocolVersion}));
if (protocolVersion < dev::p2p::c_protocolVersion - 1) if (protocolVersion < dev::p2p::c_protocolVersion - 1)
{ {
ps->disconnect(IncompatibleProtocol); ps->disconnect(IncompatibleProtocol);
@ -724,8 +724,16 @@ void Host::startedWorking()
void Host::doWork() void Host::doWork()
{ {
if (m_run) try
m_ioService.run(); {
if (m_run)
m_ioService.run();
}
catch (std::exception const& _e)
{
clog(NetP2PWarn) << "Exception in Network Thread:" << _e.what();
clog(NetP2PWarn) << "Network Restart is Recommended.";
}
} }
void Host::keepAlivePeers() void Host::keepAlivePeers()

12
libp2p/RLPXFrameCoder.cpp

@ -29,6 +29,18 @@ using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
using namespace CryptoPP; using namespace CryptoPP;
RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header)
{
length = (_header[0] * 256 + _header[1]) * 256 + _header[2];
padding = ((16 - (length % 16)) % 16);
RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall);
auto itemCount = header.itemCount();
protocolId = header[0].toInt<uint16_t>();
hasSequence = itemCount > 1;
sequenceId = hasSequence ? header[1].toInt<uint16_t>() : 0;
totalLength = itemCount == 3 ? header[2].toInt<uint32_t>() : 0;
}
RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init)
{ {
// we need: // we need:

14
libp2p/RLPXFrameCoder.h

@ -33,6 +33,20 @@ namespace dev
namespace p2p namespace p2p
{ {
struct RLPXFrameInfo
{
RLPXFrameInfo() = default;
/// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]]
RLPXFrameInfo(bytesConstRef _frameHeader);
uint32_t length = 0; ///< Max: 2**24
uint8_t padding = 0;
uint16_t protocolId = 0;
bool hasSequence = false;
uint16_t sequenceId = 0;
uint32_t totalLength = 0;
};
class RLPXHandshake; class RLPXHandshake;
/** /**

337
libp2p/Session.cpp

@ -27,7 +27,6 @@
#include <libdevcore/CommonIO.h> #include <libdevcore/CommonIO.h>
#include <libdevcore/StructuredLogger.h> #include <libdevcore/StructuredLogger.h>
#include <libethcore/Exceptions.h> #include <libethcore/Exceptions.h>
#include "RLPxHandshake.h"
#include "Host.h" #include "Host.h"
#include "Capability.h" #include "Capability.h"
using namespace std; using namespace std;
@ -156,111 +155,25 @@ void Session::serviceNodesRequest()
addNote("peers", "done"); addNote("peers", "done");
} }
bool Session::interpret(PacketType _t, RLP const& _r) bool Session::frameReceived(uint16_t _capId, PacketType _t, RLP const& _r)
{ {
m_lastReceived = chrono::steady_clock::now(); m_lastReceived = chrono::steady_clock::now();
clog(NetRight) << _t << _r; clog(NetRight) << _t << _r;
try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught. try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
{ {
switch (_t) // v4 frame headers are useless, offset packet type used
{ // v5 protocol type is in header, packet type not offset
case DisconnectPacket: if (_capId == 0 && _t < UserPacket)
{ return interpret(_t, _r);
string reason = "Unspecified"; if (m_info.protocolVersion >= 5)
auto r = (DisconnectReason)_r[0].toInt<int>(); for (auto const& i: m_capabilities)
if (!_r[0].isInt()) if (_capId == (uint16_t)i.first.second)
drop(BadProtocol); return i.second->m_enabled ? i.second->interpret(_t, _r) : true;
else if (m_info.protocolVersion <= 4)
{
reason = reasonOf(r);
clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
drop(DisconnectRequested);
}
break;
}
case PingPacket:
{
clog(NetTriviaSummary) << "Ping";
RLPStream s;
sealAndSend(prep(s, PongPacket));
break;
}
case PongPacket:
m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
break;
case GetPeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "GetPeers";
m_theyRequestedNodes = true;
serviceNodesRequest();
break;
case PeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)";
m_weRequestedNodes = false;
for (unsigned i = 0; i < _r.itemCount(); ++i)
{
bi::address peerAddress;
if (_r[i][0].size() == 16)
peerAddress = bi::address_v6(_r[i][0].toHash<FixedHash<16>>().asArray());
else if (_r[i][0].size() == 4)
peerAddress = bi::address_v4(_r[i][0].toHash<FixedHash<4>>().asArray());
else
{
cwarn << "Received bad peer packet:" << _r;
disconnect(BadProtocol);
return true;
}
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
NodeId id = _r[i][2].toHash<NodeId>();
clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")";
if (!isPublicAddress(peerAddress))
goto CONTINUE; // Private address. Ignore.
if (!id)
goto LAMEPEER; // Null identity. Ignore.
if (m_server->id() == id)
goto LAMEPEER; // Just our info - we already have that.
if (id == this->id())
goto LAMEPEER; // Just their info - we already have that.
if (!ep.port())
goto LAMEPEER; // Zero port? Don't think so.
if (ep.port() >= /*49152*/32768)
goto LAMEPEER; // Private port according to IANA.
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port()));
clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")";
CONTINUE:;
LAMEPEER:;
}
break;
default:
for (auto const& i: m_capabilities) for (auto const& i: m_capabilities)
if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount()) if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
{ return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;
if (i.second->m_enabled) return false;
return i.second->interpret(_t - i.second->m_idOffset, _r);
else
return true;
}
return false;
}
} }
catch (std::exception const& _e) catch (std::exception const& _e)
{ {
@ -271,6 +184,101 @@ bool Session::interpret(PacketType _t, RLP const& _r)
return true; return true;
} }
bool Session::interpret(PacketType _t, RLP const& _r)
{
switch (_t)
{
case DisconnectPacket:
{
string reason = "Unspecified";
auto r = (DisconnectReason)_r[0].toInt<int>();
if (!_r[0].isInt())
drop(BadProtocol);
else
{
reason = reasonOf(r);
clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
drop(DisconnectRequested);
}
break;
}
case PingPacket:
{
clog(NetTriviaSummary) << "Ping";
RLPStream s;
sealAndSend(prep(s, PongPacket));
break;
}
case PongPacket:
m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
break;
case GetPeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "GetPeers";
m_theyRequestedNodes = true;
serviceNodesRequest();
break;
case PeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)";
m_weRequestedNodes = false;
for (unsigned i = 0; i < _r.itemCount(); ++i)
{
bi::address peerAddress;
if (_r[i][0].size() == 16)
peerAddress = bi::address_v6(_r[i][0].toHash<FixedHash<16>>().asArray());
else if (_r[i][0].size() == 4)
peerAddress = bi::address_v4(_r[i][0].toHash<FixedHash<4>>().asArray());
else
{
cwarn << "Received bad peer packet:" << _r;
disconnect(BadProtocol);
return true;
}
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
NodeId id = _r[i][2].toHash<NodeId>();
clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")";
if (!isPublicAddress(peerAddress))
goto CONTINUE; // Private address. Ignore.
if (!id)
goto LAMEPEER; // Null identity. Ignore.
if (m_server->id() == id)
goto LAMEPEER; // Just our info - we already have that.
if (id == this->id())
goto LAMEPEER; // Just their info - we already have that.
if (!ep.port())
goto LAMEPEER; // Zero port? Don't think so.
if (ep.port() >= /*49152*/32768)
goto LAMEPEER; // Private port according to IANA.
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port()));
clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")";
CONTINUE:;
LAMEPEER:;
}
break;
default:
return false;
}
return true;
}
void Session::ping() void Session::ping()
{ {
RLPStream s; RLPStream s;
@ -292,12 +300,9 @@ void Session::sealAndSend(RLPStream& _s)
bool Session::checkPacket(bytesConstRef _msg) bool Session::checkPacket(bytesConstRef _msg)
{ {
if (_msg.size() < 2) if (_msg[0] > 0x7f || _msg.size() < 2)
return false;
if (_msg[0] > 0x7f)
return false; return false;
RLP r(_msg.cropped(1)); if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size())
if (r.actualSize() + 1 != _msg.size())
return false; return false;
return true; return true;
} }
@ -413,82 +418,78 @@ void Session::doRead()
{ {
ThreadContext tc(info().id.abridged()); ThreadContext tc(info().id.abridged());
ThreadContext tc2(info().clientVersion); ThreadContext tc2(info().clientVersion);
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) if (!checkRead(h256::size, ec, length))
return;
else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
{ {
clog(NetWarn) << "Error reading: " << ec.message(); clog(NetWarn) << "header decrypt failed";
drop(TCPError); drop(BadProtocol); // todo: better error
return;
} }
else if (ec && length == 0)
RLPXFrameInfo header;
try
{
header = RLPXFrameInfo(bytesConstRef(m_data.data(), length));
}
catch (std::exception const& _e)
{
clog(NetWarn) << "Exception decoding frame header RLP:" << bytesConstRef(m_data.data(), h128::size).cropped(3);
drop(BadProtocol);
return; return;
else }
/// read padded frame and mac
auto tlen = header.length + header.padding + h128::size;
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, header, tlen](boost::system::error_code ec, std::size_t length)
{ {
/// authenticate and decrypt header ThreadContext tc(info().id.abridged());
bytesRef header(m_data.data(), h256::size); ThreadContext tc2(info().clientVersion);
if (!m_io->authAndDecryptHeader(header)) if (!checkRead(tlen, ec, length))
return;
else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
{ {
clog(NetWarn) << "header decrypt failed"; clog(NetWarn) << "frame decrypt failed";
drop(BadProtocol); // todo: better error drop(BadProtocol); // todo: better error
return; return;
} }
/// check frame size bytesConstRef frame(m_data.data(), header.length);
uint32_t frameSize = (m_data[0] * 256 + m_data[1]) * 256 + m_data[2]; if (!checkPacket(frame))
if (frameSize >= (uint32_t)1 << 24)
{ {
clog(NetWarn) << "frame size too large"; cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
drop(BadProtocol); clog(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return; return;
} }
else
/// rlp of header has protocol-type, sequence-id[, total-packet-size]
bytes headerRLP(13);
bytesConstRef(m_data.data(), h128::size).cropped(3).copyTo(&headerRLP);
/// read padded frame and mac
auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size;
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length)
{ {
ThreadContext tc(info().id.abridged()); auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
ThreadContext tc2(info().clientVersion); RLP r(frame.cropped(1));
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) if (!frameReceived(header.protocolId, packetType, r))
{ clog(NetWarn) << "Couldn't interpret packet." << RLP(r);
clog(NetWarn) << "Error reading: " << ec.message(); }
drop(TCPError); doRead();
} });
else if (ec && length < tlen)
{
clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << ec.message();
repMan().noteRude(*this);
drop(TCPError);
return;
}
else
{
if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
{
clog(NetWarn) << "frame decrypt failed";
drop(BadProtocol); // todo: better error
return;
}
bytesConstRef frame(m_data.data(), frameSize);
if (!checkPacket(frame))
{
cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
clog(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return;
}
else
{
auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
RLP r(frame.cropped(1));
if (!interpret(packetType, r))
clog(NetWarn) << "Couldn't interpret packet." << RLP(r);
}
doRead();
}
});
}
}); });
} }
bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length)
{
if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof)
{
clog(NetConnect) << "Error reading: " << _ec.message();
drop(TCPError);
return false;
}
else if (_ec && _length < _expected)
{
clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << _ec.message();
repMan().noteRude(*this);
drop(TCPError);
return false;
}
// If this fails then there's an unhandled asio error
assert(_expected == _length);
return true;
}

8
libp2p/Session.h

@ -97,10 +97,16 @@ private:
/// Perform a read on the socket. /// Perform a read on the socket.
void doRead(); void doRead();
/// Check error code after reading and drop peer if error code.
bool checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length);
/// Perform a single round of the write operation. This could end up calling itself asynchronously. /// Perform a single round of the write operation. This could end up calling itself asynchronously.
void write(); void write();
/// Interpret an incoming message. /// Deliver RLPX packet to Session or Capability for interpretation.
bool frameReceived(uint16_t _capId, PacketType _t, RLP const& _r);
/// Interpret an incoming Session packet.
bool interpret(PacketType _t, RLP const& _r); bool interpret(PacketType _t, RLP const& _r);
/// @returns true iff the _msg forms a valid message for sending or receiving on the network. /// @returns true iff the _msg forms a valid message for sending or receiving on the network.

10
test/libp2p/net.cpp

@ -314,23 +314,23 @@ BOOST_AUTO_TEST_CASE(kademlia)
node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for
node.setup(); node.setup();
node.populate(); node.populate();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populateAll(); node.populateAll();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
auto nodes = node.nodeTable->nodes(); auto nodes = node.nodeTable->nodes();
nodes.sort(); nodes.sort();
node.nodeTable->reset(); node.nodeTable->reset();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populate(1); node.populate(1);
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.nodeTable->discover(); node.nodeTable->discover();
this_thread::sleep_for(chrono::milliseconds(2000)); this_thread::sleep_for(chrono::milliseconds(2000));
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8); BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8);

Loading…
Cancel
Save