Browse Source

Various enhancements for P2P networking.

cl-refactor
Gav Wood 11 years ago
parent
commit
b9d5602003
  1. 66
      libethereum/PeerNetwork.cpp

66
libethereum/PeerNetwork.cpp

@ -107,6 +107,23 @@ bi::tcp::endpoint PeerSession::endpoint() const
return bi::tcp::endpoint();
}
static std::string reasonOf(int _r)
{
switch (_r)
{
case DisconnectRequested: return "Disconnect was requested.";
case TCPError: return "Low-level TCP communication error.";
case BadProtocol: return "Data format error.";
case UselessPeer: return "Peer had no use for this node.";
case TooManyPeers: return "Peer had too many connections.";
case DuplicatePeer: return "Peer was already connected.";
case WrongGenesis: return "Disagreement over genesis block.";
case IncompatibleProtocol: return "Peer protocol versions are incompatible.";
case ClientQuit: return "Peer is exiting.";
default: return "Unknown reason.";
}
}
// TODO: BUG! 256 -> work out why things start to break with big packet sizes -> g.t. ~370 blocks.
bool PeerSession::interpret(RLP const& _r)
@ -125,14 +142,17 @@ bool PeerSession::interpret(RLP const& _r)
clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "/" << m_networkId << "]" << asHex(m_id.ref().cropped(0, 4)) << showbase << hex << m_caps << dec << m_listenPort;
if (m_server->m_peers.count(m_id) || !m_id)
if (m_server->m_peers.count(m_id))
if (auto l = m_server->m_peers[m_id].lock())
if (l.get() != this && l->isOpen())
{
// Already connected.
cwarn << "Already have peer id" << m_id << "at" << l->endpoint() << "rather than" << endpoint();
disconnect(DuplicatePeer);
return false;
}
m_server->m_peers[m_id] = shared_from_this();
if (m_protocolVersion != c_protocolVersion || m_networkId != m_reqNetworkId)
if (m_protocolVersion != c_protocolVersion || m_networkId != m_reqNetworkId || !m_id)
{
disconnect(IncompatibleProtocol);
return false;
@ -145,6 +165,8 @@ bool PeerSession::interpret(RLP const& _r)
return false;
}
m_server->m_peers[m_id] = shared_from_this();
// Grab their block chain off them.
{
unsigned count = std::min<unsigned>(c_maxHashes, m_server->m_chain->details(m_server->m_latestBlockSent).number + 1);
@ -167,18 +189,7 @@ bool PeerSession::interpret(RLP const& _r)
{
string reason = "Unspecified";
if (_r[1].isInt())
switch (_r[1].toInt<int>())
{
case DisconnectRequested: reason = "Disconnect was requested."; break;
case TCPError: reason = "Low-level TCP communication error."; break;
case BadProtocol: reason = "Data format error."; break;
case UselessPeer: reason = "Peer had no use for this node."; break;
case TooManyPeers: reason = "Peer had too many connections."; break;
case DuplicatePeer: reason = "Peer was already connected."; break;
case WrongGenesis: reason = "Disagreement over genesis block."; break;
case IncompatibleProtocol: reason = "Peer protocol versions are incompatible."; break;
case ClientQuit: reason = "Peer is exiting."; break;
}
reason = reasonOf(_r[1].toInt<int>());
clogS(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
if (m_socket.is_open())
@ -452,7 +463,10 @@ void PeerSession::sendDestroy(bytes& _msg)
ba::async_write(m_socket, ba::buffer(*buffer), [=](boost::system::error_code ec, std::size_t length)
{
if (ec)
{
cwarn << "Error sending: " << ec.message();
dropped();
}
// cbug << length << " bytes written (EC: " << ec << ")";
});
}
@ -465,7 +479,10 @@ void PeerSession::send(bytesConstRef _msg)
ba::async_write(m_socket, ba::buffer(*buffer), [=](boost::system::error_code ec, std::size_t length)
{
if (ec)
{
cwarn << "Error sending: " << ec.message();
dropped();
}
// cbug << length << " bytes written (EC: " << ec << ")";
});
}
@ -487,6 +504,7 @@ void PeerSession::dropped()
void PeerSession::disconnect(int _reason)
{
clogS(NetNote) << "Disconnecting (reason:" << reasonOf(_reason) << ")";
if (m_socket.is_open())
{
if (m_disconnect == chrono::steady_clock::time_point::max())
@ -498,15 +516,7 @@ void PeerSession::disconnect(int _reason)
m_disconnect = chrono::steady_clock::now();
}
else
{
if (m_socket.is_open())
try {
clogS(NetNote) << "Closing " << m_socket.remote_endpoint();
} catch (...){}
else
clogS(NetNote) << "Remote closed on" << m_socket.native_handle();
m_socket.close();
}
dropped();
}
}
@ -528,7 +538,10 @@ void PeerSession::doRead()
m_socket.async_read_some(boost::asio::buffer(m_data), [this, self](boost::system::error_code ec, std::size_t length)
{
if (ec)
{
cwarn << "Error reading: " << ec.message();
dropped();
}
else
{
try
@ -553,8 +566,11 @@ void PeerSession::doRead()
// cerr << "Received " << len << ": " << asHex(bytesConstRef(m_incoming.data() + 8, len)) << endl;
RLP r(bytesConstRef(m_incoming.data() + 8, len));
if (!interpret(r))
{
// error
break;
dropped();
return;
}
memmove(m_incoming.data(), m_incoming.data() + len + 8, m_incoming.size() - (len + 8));
m_incoming.resize(m_incoming.size() - (len + 8));
}

Loading…
Cancel
Save