Browse Source

Better drop off semantics.

cl-refactor
Gav Wood 10 years ago
parent
commit
7920f04f30
  1. 8
      libp2p/Capability.cpp
  2. 2
      libp2p/Capability.h
  3. 31
      libp2p/Host.cpp
  4. 132
      libp2p/Session.cpp
  5. 50
      libp2p/Session.h

8
libp2p/Capability.cpp

@ -53,14 +53,14 @@ void Capability::sealAndSend(RLPStream& _s)
m_session->sealAndSend(_s);
}
void Capability::sendDestroy(bytes& _msg)
void Capability::send(bytesConstRef _msg)
{
m_session->sendDestroy(_msg);
m_session->send(_msg);
}
void Capability::send(bytesConstRef _msg)
void Capability::send(bytes&& _msg)
{
m_session->send(_msg);
m_session->send(move(_msg));
}
void Capability::addRating(unsigned _r)

2
libp2p/Capability.h

@ -52,7 +52,7 @@ protected:
RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0);
void sealAndSend(RLPStream& _s);
void sendDestroy(bytes& _msg);
void send(bytes&& _msg);
void send(bytesConstRef _msg);
void addRating(unsigned _r);

31
libp2p/Host.cpp

@ -497,6 +497,8 @@ void Host::connect(bi::tcp::endpoint const& _ep)
void Node::connect(Host* _h)
{
clog(NetConnect) << "Attempting connection to node" << id.abridged() << "@" << address << "from" << _h->id().abridged();
lastAttempted = std::chrono::system_clock::now();
failedAttempts++;
_h->m_ready -= index;
bi::tcp::socket* s = new bi::tcp::socket(_h->m_ioService);
s->async_connect(address, [=](boost::system::error_code const& ec)
@ -504,14 +506,11 @@ void Node::connect(Host* _h)
if (ec)
{
clog(NetConnect) << "Connection refused to node" << id.abridged() << "@" << address << "(" << ec.message() << ")";
failedAttempts++;
lastAttempted = std::chrono::system_clock::now();
_h->m_ready += index;
}
else
{
clog(NetConnect) << "Connected to" << id.abridged() << "@" << address;
failedAttempts = 0;
lastConnected = std::chrono::system_clock::now();
auto p = make_shared<Session>(_h, std::move(*s), _h->node(id), true); // true because we don't care about ids matched for now. Once we have permenant IDs this will matter a lot more and we can institute a safer mechanism.
p->start();
@ -534,14 +533,24 @@ bool Host::havePeer(NodeId _id) const
return !!m_peers.count(_id);
}
unsigned cumulativeFallback(unsigned _failed)
unsigned cumulativeFallback(unsigned _failed, DisconnectReason _r)
{
if (_failed < 5)
return _failed * 5;
else if (_failed < 15)
return 25 + (_failed - 5) * 10;
else
return 25 + 100 + (_failed - 15) * 20;
switch (_r)
{
case BadProtocol:
return 30 * (_failed + 1);
case UselessPeer:
case TooManyPeers:
case ClientQuit:
return 15 * (_failed + 1);
default:
if (_failed < 5)
return _failed * 5;
else if (_failed < 15)
return 25 + (_failed - 5) * 10;
else
return 25 + 100 + (_failed - 15) * 20;
}
}
void Host::growPeers()
@ -555,7 +564,7 @@ void Host::growPeers()
toTry -= m_private;
set<Node> ns;
for (auto i: toTry)
if (chrono::system_clock::now() > m_nodes[m_nodesList[i]]->lastAttempted + chrono::seconds(cumulativeFallback(m_nodes[m_nodesList[i]]->failedAttempts)))
if (chrono::system_clock::now() > m_nodes[m_nodesList[i]]->lastAttempted + chrono::seconds(cumulativeFallback(m_nodes[m_nodesList[i]]->failedAttempts, m_nodes[m_nodesList[i]]->lastDisconnect)))
ns.insert(*m_nodes[m_nodesList[i]]);
if (ns.size())

132
libp2p/Session.cpp

@ -42,7 +42,6 @@ Session::Session(Host* _s, bi::tcp::socket _socket, bi::tcp::endpoint const& _ma
m_node(nullptr),
m_manualEndpoint(_manual)
{
m_disconnect = std::chrono::steady_clock::time_point::max();
m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({NodeId(), "?", m_manualEndpoint.address().to_string(), m_manualEndpoint.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
@ -55,15 +54,15 @@ Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Node> const&
m_manualEndpoint(_n->address),
m_force(_force)
{
m_disconnect = std::chrono::steady_clock::time_point::max();
m_connect = std::chrono::steady_clock::now();
m_info = PeerInfo({m_node->id, "?", _n->address.address().to_string(), _n->address.port(), std::chrono::steady_clock::duration(0), CapDescSet(), 0, map<string, string>()});
}
Session::~Session()
{
if (id() && (!m_node || (!isPermanentProblem(m_node->lastDisconnect) && !m_node->dead)))
m_server->m_ready += m_node->index;
if (m_node)
if (id() && !isPermanentProblem(m_node->lastDisconnect) && !m_node->dead)
m_server->m_ready += m_node->index;
// Read-chain finished for one reason or another.
for (auto& i: m_capabilities)
@ -176,9 +175,6 @@ bool Session::interpret(RLP const& _r)
{
case HelloPacket:
{
if (m_node)
m_node->lastDisconnect = NoDisconnect;
m_protocolVersion = _r[1].toInt<unsigned>();
auto clientVersion = _r[2].toString();
auto caps = _r[3].toVector<CapDesc>();
@ -198,7 +194,7 @@ bool Session::interpret(RLP const& _r)
// Already connected.
clogS(NetWarn) << "Connected to ourself under a false pretext. We were told this peer was id" << m_info.id.abridged();
disconnect(LocalIdentity);
return false;
return true;
}
if (m_node && m_node->id != id)
@ -210,14 +206,14 @@ bool Session::interpret(RLP const& _r)
{
clogS(NetWarn) << "Connected to node, but their ID has changed since last time. This could indicate a MitM attack. Disconnecting.";
disconnect(UnexpectedIdentity);
return false;
return true;
}
if (m_server->havePeer(id))
{
m_node->dead = true;
disconnect(DuplicatePeer);
return false;
return true;
}
}
@ -226,13 +222,13 @@ bool Session::interpret(RLP const& _r)
// Already connected.
clogS(NetWarn) << "Already connected to a peer with id" << id.abridged();
disconnect(DuplicatePeer);
return false;
return true;
}
if (!id)
{
disconnect(NullIdentity);
return false;
return true;
}
m_node = m_server->noteNode(id, bi::tcp::endpoint(m_socket.remote_endpoint().address(), listenPort), Origin::Self, false, !m_node || m_node->id == id ? NodeId() : m_node->id);
@ -242,7 +238,7 @@ bool Session::interpret(RLP const& _r)
if (m_protocolVersion != m_server->protocolVersion())
{
disconnect(IncompatibleProtocol);
return false;
return true;
}
m_info = PeerInfo({id, clientVersion, m_socket.remote_endpoint().address().to_string(), listenPort, std::chrono::steady_clock::duration(), _r[3].toSet<CapDesc>(), (unsigned)m_socket.native_handle(), map<string, string>() });
@ -252,16 +248,13 @@ bool Session::interpret(RLP const& _r)
case DisconnectPacket:
{
string reason = "Unspecified";
auto r = (DisconnectReason)_r[1].toInt<int>();
if (_r[1].isInt())
reason = reasonOf((DisconnectReason)_r[1].toInt<int>());
reason = reasonOf(r);
clogS(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
if (m_socket.is_open())
clogS(NetNote) << "Closing " << m_socket.remote_endpoint();
else
clogS(NetNote) << "Remote closed.";
m_socket.close();
return false;
drop(DisconnectRequested);
return true;
}
case PingPacket:
{
@ -294,7 +287,7 @@ bool Session::interpret(RLP const& _r)
else
{
disconnect(BadProtocol);
return false;
return true;
}
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
NodeId id = _r[i][2].toHash<NodeId>();
@ -356,10 +349,11 @@ bool Session::interpret(RLP const& _r)
}
}
}
catch (...)
catch (std::exception const& _e)
{
clogS(NetWarn) << "Peer causing an exception:" << _e.what();
disconnect(BadProtocol);
return false;
return true;
}
return true;
}
@ -386,7 +380,7 @@ void Session::sealAndSend(RLPStream& _s)
bytes b;
_s.swapOut(b);
m_server->seal(b);
sendDestroy(b);
send(move(b));
}
bool Session::checkPacket(bytesConstRef _msg)
@ -404,42 +398,26 @@ bool Session::checkPacket(bytesConstRef _msg)
return true;
}
void Session::sendDestroy(bytes& _msg)
void Session::send(bytesConstRef _msg)
{
clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(8));
if (!checkPacket(bytesConstRef(&_msg)))
{
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
}
bytes buffer = bytes(std::move(_msg));
writeImpl(buffer);
send(_msg.toBytes());
}
void Session::send(bytesConstRef _msg)
void Session::send(bytes&& _msg)
{
clogS(NetLeft) << RLP(_msg.cropped(8));
if (!checkPacket(_msg))
{
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
}
clogS(NetLeft) << RLP(bytesConstRef(&_msg).cropped(8));
bytes buffer = bytes(_msg.toBytes());
writeImpl(buffer);
}
if (!checkPacket(bytesConstRef(&_msg)))
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
void Session::writeImpl(bytes& _buffer)
{
// cerr << (void*)this << " writeImpl" << endl;
if (!m_socket.is_open())
return;
bool doWrite = false;
{
lock_guard<mutex> l(m_writeLock);
m_writeQueue.push_back(_buffer);
Guard l(x_writeQueue);
m_writeQueue.push_back(_msg);
doWrite = (m_writeQueue.size() == 1);
}
@ -453,18 +431,16 @@ void Session::write()
auto self(shared_from_this());
ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/)
{
// cerr << (void*)this << " write.callback" << endl;
// must check queue, as write callback can occur following dropped()
if (ec)
{
clogS(NetWarn) << "Error sending: " << ec.message();
dropped();
drop(TCPError);
return;
}
else
{
lock_guard<mutex> l(m_writeLock);
Guard l(x_writeQueue);
m_writeQueue.pop_front();
if (m_writeQueue.empty())
return;
@ -473,37 +449,43 @@ void Session::write()
});
}
void Session::dropped()
void Session::drop(DisconnectReason _reason)
{
// cerr << (void*)this << " dropped" << endl;
if (m_dropped)
return;
cerr << (void*)this << " dropped" << endl;
if (m_socket.is_open())
try
{
clogS(NetConnect) << "Closing " << m_socket.remote_endpoint();
clogS(NetConnect) << "Closing " << m_socket.remote_endpoint() << "(" << reasonOf(_reason) << ")";
m_socket.close();
}
catch (...) {}
}
void Session::disconnect(DisconnectReason _reason)
{
clogS(NetConnect) << "Disconnecting (reason:" << reasonOf((DisconnectReason)_reason) << ")";
if (m_node)
{
if (_reason != m_node->lastDisconnect || _reason == NoDisconnect || _reason == ClientQuit || _reason == DisconnectRequested)
m_node->failedAttempts = 0;
m_node->lastDisconnect = _reason;
if (_reason == BadProtocol)
{
m_node->rating /= 2;
m_node->score /= 2;
}
}
m_dropped = true;
}
void Session::disconnect(DisconnectReason _reason)
{
clogS(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")";
if (m_socket.is_open())
{
if (m_disconnect == chrono::steady_clock::time_point::max())
{
RLPStream s;
prep(s, DisconnectPacket, 1) << _reason;
sealAndSend(s);
m_disconnect = chrono::steady_clock::now();
}
else
dropped();
RLPStream s;
prep(s, DisconnectPacket, 1) << (int)_reason;
sealAndSend(s);
}
drop(_reason);
}
void Session::start()
@ -523,7 +505,7 @@ void Session::start()
void Session::doRead()
{
// ignore packets received while waiting to disconnect
if (chrono::steady_clock::now() - m_disconnect > chrono::seconds(0))
if (m_dropped)
return;
auto self(shared_from_this());
@ -534,7 +516,7 @@ void Session::doRead()
{
// got here with length of 1241...
clogS(NetWarn) << "Error reading: " << ec.message();
dropped();
drop(TCPError);
}
else if (ec && length == 0)
{
@ -575,8 +557,8 @@ void Session::doRead()
RLP r(data.cropped(8));
if (!interpret(r))
{
// error
dropped();
// error - bad protocol
disconnect(BadProtocol);
return;
}
}
@ -589,12 +571,12 @@ void Session::doRead()
catch (Exception const& _e)
{
clogS(NetWarn) << "ERROR: " << diagnostic_information(_e);
dropped();
drop(BadProtocol);
}
catch (std::exception const& _e)
{
clogS(NetWarn) << "ERROR: " << _e.what();
dropped();
drop(BadProtocol);
}
}
});

50
libp2p/Session.h

@ -30,6 +30,7 @@
#include <libdevcore/Common.h>
#include <libdevcore/RLP.h>
#include <libdevcore/RangeMask.h>
#include <libdevcore/Guards.h>
#include "Common.h"
namespace dev
@ -72,7 +73,7 @@ public:
static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0);
static RLPStream& prep(RLPStream& _s);
void sealAndSend(RLPStream& _s);
void sendDestroy(bytes& _msg);
void send(bytes&& _msg);
void send(bytesConstRef _msg);
int rating() const;
@ -86,43 +87,48 @@ public:
void serviceNodesRequest();
private:
void dropped();
/// Drop the connection for the reason @a _r.
void drop(DisconnectReason _r);
/// Perform a read on the socket.
void doRead();
void doWrite(std::size_t length);
/// The
void writeImpl(bytes& _buffer);
/// Perform a single round of the write operation. This could end up calling itself asynchronously.
void write();
/// Interpret an incoming message.
bool interpret(RLP const& _r);
/// @returns true iff the _msg forms a valid message for sending or receiving on the network.
static bool checkPacket(bytesConstRef _msg);
Host* m_server;
Host* m_server; ///< The host that owns us. Never null.
std::mutex m_writeLock;
std::deque<bytes> m_writeQueue;
mutable bi::tcp::socket m_socket; ///< Socket for the peer's connection. Mutable to ask for native_handle().
Mutex x_writeQueue; ///< Mutex for the write queue.
std::deque<bytes> m_writeQueue; ///< The write queue.
std::array<byte, 65536> m_data; ///< Data buffer for the write queue.
bytes m_incoming; ///< The incoming read queue of bytes.
mutable bi::tcp::socket m_socket; ///< Mutable to ask for native_handle().
std::array<byte, 65536> m_data;
PeerInfo m_info;
PeerInfo m_info; ///< Dyanamic information about this peer.
bytes m_incoming;
unsigned m_protocolVersion;
std::shared_ptr<Node> m_node;
bi::tcp::endpoint m_manualEndpoint;
bool m_force = false; /// If true, ignore IDs being different. This could open you up to MitM attacks.
unsigned m_protocolVersion = 0; ///< The protocol version of the peer.
std::shared_ptr<Node> m_node; ///< The Node object. Might be null if we constructed using a bare address/port.
bi::tcp::endpoint m_manualEndpoint; ///< The endpoint as specified by the constructor.
bool m_force = false; ///< If true, ignore IDs being different. This could open you up to MitM attacks.
bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.
bool m_theyRequestedNodes = false;
bool m_weRequestedNodes = false;
bool m_theyRequestedNodes = false; ///< Has the peer requested nodes from us without receiveing an answer from us?
bool m_weRequestedNodes = false; ///< Have we requested nodes from the peer and not received an answer yet?
std::chrono::steady_clock::time_point m_ping;
std::chrono::steady_clock::time_point m_connect;
std::chrono::steady_clock::time_point m_disconnect;
std::chrono::steady_clock::time_point m_ping; ///< Time point of last ping.
std::chrono::steady_clock::time_point m_connect; ///< Time point of connection.
std::map<CapDesc, std::shared_ptr<Capability>> m_capabilities;
std::map<CapDesc, std::shared_ptr<Capability>> m_capabilities; ///< The peer's capability set.
RangeMask<unsigned> m_knownNodes; ///< Nodes we already know about as indices into Host's nodesList. These shouldn't be resent to peer.
bool m_willBeDeleted = false; ///< True if we already posted a deleter on the strand.
};
}

Loading…
Cancel
Save