Browse Source

More network fixes.

cl-refactor
Gav Wood 10 years ago
parent
commit
01eecf58b6
  1. 34
      alethzero/Main.ui
  2. 26
      alethzero/MainWin.cpp
  3. 2
      libdevcrypto/MemoryDB.h
  4. 2
      libdevcrypto/TrieDB.h
  5. 5
      libp2p/Common.cpp
  6. 20
      libp2p/Common.h
  7. 47
      libp2p/Host.cpp
  8. 23
      libp2p/Host.h
  9. 85
      libp2p/Session.cpp
  10. 9
      libp2p/Session.h
  11. 5
      libwebthree/WebThree.h

34
alethzero/Main.ui

@ -1480,6 +1480,40 @@ font-size: 14pt</string>
</layout>
</widget>
</widget>
<widget class="QDockWidget" name="dockWidget_13">
<property name="features">
<set>QDockWidget::DockWidgetFeatureMask</set>
</property>
<property name="windowTitle">
<string>Nodes</string>
</property>
<attribute name="dockWidgetArea">
<number>2</number>
</attribute>
<widget class="QWidget" name="dockWidgetContents_13">
<layout class="QHBoxLayout" name="horizontalLayout_10">
<property name="leftMargin">
<number>0</number>
</property>
<property name="topMargin">
<number>0</number>
</property>
<property name="rightMargin">
<number>0</number>
</property>
<property name="bottomMargin">
<number>0</number>
</property>
<item>
<widget class="QListWidget" name="nodes">
<property name="frameShape">
<enum>QFrame::NoFrame</enum>
</property>
</widget>
</item>
</layout>
</widget>
</widget>
<action name="quit">
<property name="text">
<string>&amp;Quit</string>

26
alethzero/MainWin.cpp

@ -750,11 +750,33 @@ void Main::refreshBalances()
void Main::refreshNetwork()
{
auto ps = web3()->peers();
ui->peerCount->setText(QString::fromStdString(toString(ps.size())) + " peer(s)");
ui->peers->clear();
for (PeerInfo const& i: ps)
ui->peers->addItem(QString("[%7] %3 ms - %1:%2 - %4 %5 %6").arg(i.host.c_str()).arg(i.port).arg(chrono::duration_cast<chrono::milliseconds>(i.lastPing).count()).arg(i.clientVersion.c_str()).arg(QString::fromStdString(toString(i.caps))).arg(QString::fromStdString(toString(i.notes))).arg(i.socket));
ui->peers->addItem(QString("[%8 %7] %3 ms - %1:%2 - %4 %5 %6")
.arg(QString::fromStdString(i.host))
.arg(i.port)
.arg(chrono::duration_cast<chrono::milliseconds>(i.lastPing).count())
.arg(QString::fromStdString(i.clientVersion))
.arg(QString::fromStdString(toString(i.caps)))
.arg(QString::fromStdString(toString(i.notes)))
.arg(i.socket)
.arg(QString::fromStdString(i.id.abridged())));
auto ns = web3()->nodes();
ui->nodes->clear();
for (p2p::Node const& i: ns)
ui->nodes->addItem(QString("[%1%3] %2 - ( =%5s | /%4s | %6 | %7x ) - *%8 $%9")
.arg(QString::fromStdString(i.id.abridged()))
.arg(QString::fromStdString(toString(i.address)))
.arg(i.id == web3()->id() ? " self" : i.isOffline() ? "" : " PEER")
.arg(i.secondsSinceLastAttempted())
.arg(i.secondsSinceLastConnected())
.arg(QString::fromStdString(reasonOf(i.lastDisconnect)))
.arg(i.failedAttempts)
.arg(i.rating)
.arg((int)i.idOrigin)
);
}
void Main::refreshAll()

2
libdevcrypto/MemoryDB.h

@ -32,7 +32,7 @@ namespace dev
namespace eth
{
struct DBChannel: public LogChannel { static const char* name() { return "TDB"; } static const int verbosity = 12; };
struct DBChannel: public LogChannel { static const char* name() { return "TDB"; } static const int verbosity = 18; };
#define dbdebug clog(DBChannel)

2
libdevcrypto/TrieDB.h

@ -42,7 +42,7 @@ namespace dev
namespace eth
{
struct TrieDBChannel: public LogChannel { static const char* name() { return "-T-"; } static const int verbosity = 6; };
struct TrieDBChannel: public LogChannel { static const char* name() { return "-T-"; } static const int verbosity = 17; };
#define tdebug clog(TrieDBChannel)
struct InvalidTrie: virtual dev::Exception {};

5
libp2p/Common.cpp

@ -64,7 +64,12 @@ std::string p2p::reasonOf(DisconnectReason _r)
case TooManyPeers: return "Peer had too many connections.";
case DuplicatePeer: return "Peer was already connected.";
case IncompatibleProtocol: return "Peer protocol versions are incompatible.";
case NullIdentity: return "Null identity given.";
case ClientQuit: return "Peer is exiting.";
case UnexpectedIdentity: return "Unexpected identity given.";
case LocalIdentity: return "Connected to ourselves.";
case UserReason: return "Subprotocol reason.";
case NoDisconnect: return "(No disconnect has happened.)";
default: return "Unknown reason.";
}
}

20
libp2p/Common.h

@ -87,9 +87,27 @@ enum DisconnectReason
NullIdentity,
ClientQuit,
UnexpectedIdentity,
UserReason = 0x10
LocalIdentity,
UserReason = 0x10,
NoDisconnect = 0xffff
};
inline bool isPermanentProblem(DisconnectReason _r)
{
switch (_r)
{
case DisconnectRequested:
case DuplicatePeer:
case IncompatibleProtocol:
case NullIdentity:
case UnexpectedIdentity:
case LocalIdentity:
return true;
default:
return false;
}
}
/// @returns the string form of the given disconnection reason.
std::string reasonOf(DisconnectReason _r);

47
libp2p/Host.cpp

@ -66,7 +66,6 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool
m_key(KeyPair::create())
{
populateAddresses();
m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << id().abridged();
if (_start)
start();
@ -112,7 +111,6 @@ void Host::start()
if (!m_public.address().is_unspecified() && (m_nodes.empty() || m_nodes[m_nodesList[0]]->id != id()))
noteNode(id(), m_public, Origin::Perfect, false);
m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << id().abridged();
for (auto const& h: m_capabilities)
@ -349,6 +347,10 @@ shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint const& _a, Origin
{
RecursiveGuard l(x_peers);
cnote << "Node:" << _id.abridged() << _a << (_ready ? "ready" : "used") << _oldId.abridged() << (m_nodes.count(_id) ? "[have]" : "[NEW]");
if (!_a.port())
{
cwarn << "PORT IS INVALID!";
}
unsigned i;
if (!m_nodes.count(_id))
{
@ -367,7 +369,7 @@ shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint const& _a, Origin
m_nodesList.push_back(_id);
m_nodes[_id] = make_shared<Node>();
}
m_nodes[_id]->address = m_public;
m_nodes[_id]->address = _a;
m_nodes[_id]->index = i;
m_nodes[_id]->id = _id;
m_nodes[_id]->idOrigin = _o;
@ -390,6 +392,8 @@ shared_ptr<Node> Host::noteNode(NodeId _id, bi::tcp::endpoint const& _a, Origin
cnote << m_nodes[_id]->index << ":" << m_ready;
m_hadNewNodes = true;
return m_nodes[_id];
}
@ -398,7 +402,8 @@ Nodes Host::potentialPeers(RangeMask<unsigned> const& _known)
RecursiveGuard l(x_peers);
Nodes ret;
for (auto i: m_ready - (m_private + _known))
auto ns = (m_netPrefs.localNetworking ? _known : (m_private + _known)).inverted();
for (auto i: ns)
ret.push_back(*m_nodes[m_nodesList[i]]);
return ret;
}
@ -494,7 +499,7 @@ void Host::connect(bi::tcp::endpoint const& _ep)
void Node::connect(Host* _h)
{
clog(NetConnect) << "Attempting connection to node" << id.abridged() << "@" << address;
clog(NetConnect) << "Attempting connection to node" << id.abridged() << "@" << address << "from" << _h->id().abridged();
_h->m_ready -= index;
bi::tcp::socket* s = new bi::tcp::socket(_h->m_ioService);
s->async_connect(address, [=](boost::system::error_code const& ec)
@ -566,23 +571,16 @@ void Host::growPeers()
else
{
ensureAccepting();
if (chrono::steady_clock::now() > m_lastPeersRequest + chrono::seconds(10))
requestPeers();
requestNodes();
}
}
}
void Host::requestPeers()
void Host::requestNodes()
{
RLPStream s;
bytes b;
Session::prep(s, GetPeersPacket).swapOut(b);
seal(b);
for (auto const& i: m_peers)
if (auto p = i.second.lock())
if (p->isOpen())
p->send(&b);
m_lastPeersRequest = chrono::steady_clock::now();
p->ensureNodesRequested();
}
void Host::prunePeers()
@ -637,6 +635,16 @@ void Host::doWork()
{
growPeers();
prunePeers();
if (m_hadNewNodes)
{
for (auto p: m_peers)
if (auto pp = p.second.lock())
pp->serviceNodesRequest();
m_hadNewNodes = false;
}
m_ioService.poll();
}
@ -667,7 +675,7 @@ bytes Host::saveNodes() const
nodes << n.address.port() << n.id << (int)n.idOrigin
<< std::chrono::duration_cast<std::chrono::seconds>(n.lastConnected.time_since_epoch()).count()
<< std::chrono::duration_cast<std::chrono::seconds>(n.lastAttempted.time_since_epoch()).count()
<< n.failedAttempts << n.lastDisconnect << n.score << n.rating;
<< n.failedAttempts << (unsigned)n.lastDisconnect << n.score << n.rating;
count++;
}
}
@ -686,7 +694,11 @@ void Host::restoreNodes(bytesConstRef _b)
switch (r[0].toInt<int>())
{
case 0:
{
auto oldId = id();
m_key = KeyPair(r[1].toHash<Secret>());
noteNode(id(), m_public, Origin::Perfect, false, oldId);
for (auto i: r[2])
{
bi::tcp::endpoint ep;
@ -702,11 +714,12 @@ void Host::restoreNodes(bytesConstRef _b)
n->lastConnected = chrono::system_clock::time_point(chrono::seconds(i[4].toInt<unsigned>()));
n->lastAttempted = chrono::system_clock::time_point(chrono::seconds(i[5].toInt<unsigned>()));
n->failedAttempts = i[6].toInt<unsigned>();
n->lastDisconnect = (int)i[7].toInt<unsigned>();
n->lastDisconnect = (DisconnectReason)i[7].toInt<unsigned>();
n->score = (int)i[8].toInt<unsigned>();
n->rating = (int)i[9].toInt<unsigned>();
}
}
}
default:;
}
else

23
libp2p/Host.h

@ -67,16 +67,19 @@ struct Node
std::chrono::system_clock::time_point lastConnected;
std::chrono::system_clock::time_point lastAttempted;
unsigned failedAttempts = 0;
int lastDisconnect = -1; ///< Reason for disconnect that happened last.
DisconnectReason lastDisconnect = NoDisconnect; ///< Reason for disconnect that happened last.
Origin idOrigin = Origin::Unknown; ///< Thirdparty
Origin idOrigin = Origin::Unknown; ///< How did we get to know this node's id?
bool offline() const { return lastDisconnect == -1 || lastAttempted > lastConnected; }
int secondsSinceLastConnected() const { return lastConnected == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastConnected).count(); }
int secondsSinceLastAttempted() const { return lastAttempted == std::chrono::system_clock::time_point() ? -1 : (int)std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - lastAttempted).count(); }
bool isOffline() const { return lastDisconnect == -1 || lastAttempted > lastConnected; }
bool operator<(Node const& _n) const
{
if (offline() != _n.offline())
return offline();
else if (offline())
if (isOffline() != _n.isOffline())
return isOffline();
else if (isOffline())
if (lastAttempted == _n.lastAttempted)
return failedAttempts < _n.failedAttempts;
else
@ -165,6 +168,8 @@ public:
/// Deserialise the data and populate the set of known peers.
void restoreNodes(bytesConstRef _b);
Nodes nodes() const { RecursiveGuard l(x_peers); Nodes ret; for (auto const& i: m_nodes) ret.push_back(*i.second); return ret; }
void setNetworkPreferences(NetworkPreferences const& _p) { stop(); m_netPrefs = _p; start(); }
void start();
@ -194,7 +199,7 @@ private:
std::shared_ptr<Node> noteNode(NodeId _id, bi::tcp::endpoint const& _a, Origin _o, bool _ready, NodeId _oldId = h256());
Nodes potentialPeers(RangeMask<unsigned> const& _known);
void requestPeers();
void requestNodes();
std::string m_clientVersion; ///< Our version string.
@ -211,6 +216,8 @@ private:
bi::tcp::endpoint m_public; ///< Our public listening endpoint.
KeyPair m_key; ///< Our unique ID.
bool m_hadNewNodes = false;
mutable RecursiveMutex x_peers;
/// The nodes to which we are currently connected.
@ -227,8 +234,6 @@ private:
RangeMask<unsigned> m_ready; ///< Indices into m_nodesList over to which nodes we are not currently connected, connecting or otherwise ignoring.
RangeMask<unsigned> m_private; ///< Indices into m_nodesList over to which nodes are private.
std::chrono::steady_clock::time_point m_lastPeersRequest; ///< Last time we asked for some peers - don't want to do this too often. TODO: peers should be pushed, not polled.
unsigned m_idealPeerCount = 5; ///< Ideal number of peers to be connected to.
// Our addresses.

85
libp2p/Session.cpp

@ -62,8 +62,8 @@ Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Node> const&
Session::~Session()
{
if (id())
m_server->noteNode(id(), m_manualEndpoint, Origin::Unknown, true);
if (id() && !isPermanentProblem(m_node->lastDisconnect))
m_server->m_ready += m_node->index;
// Read-chain finished for one reason or another.
for (auto& i: m_capabilities)
@ -125,6 +125,47 @@ template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
return ret;
}
void Session::ensureNodesRequested()
{
if (isOpen() && !m_weRequestedNodes)
{
m_weRequestedNodes = true;
RLPStream s;
sealAndSend(prep(s, GetPeersPacket));
}
}
void Session::serviceNodesRequest()
{
if (!m_theyRequestedNodes)
return;
auto peers = m_server->potentialPeers(m_knownNodes);
if (peers.empty())
{
addNote("peers", "requested");
return;
}
// note this should cost them...
RLPStream s;
prep(s, PeersPacket, min<unsigned>(10, peers.size()));
auto rs = randomSelection(peers, 10);
for (auto const& i: rs)
{
clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.address;
if (i.address.address().is_v4())
s.appendList(3) << bytesConstRef(i.address.address().to_v4().to_bytes().data(), 4) << i.address.port() << i.id;
else// if (i.second.address().is_v6()) - assumed
s.appendList(3) << bytesConstRef(i.address.address().to_v6().to_bytes().data(), 16) << i.address.port() << i.id;
m_knownNodes.extendAll(i.index);
m_knownNodes.unionWith(i.index);
}
sealAndSend(s);
m_theyRequestedNodes = false;
addNote("peers", "done");
}
bool Session::interpret(RLP const& _r)
{
clogS(NetRight) << _r;
@ -136,7 +177,7 @@ bool Session::interpret(RLP const& _r)
case HelloPacket:
{
if (m_node)
m_node->lastDisconnect = -1;
m_node->lastDisconnect = NoDisconnect;
m_protocolVersion = _r[1].toInt<unsigned>();
auto clientVersion = _r[2].toString();
@ -152,6 +193,14 @@ bool Session::interpret(RLP const& _r)
clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "]" << id.abridged() << showbase << capslog.str() << dec << listenPort;
if (m_server->id() == id)
{
// Already connected.
clogS(NetWarn) << "Connected to ourself under a false pretext. We were told this peer was id" << m_info.id.abridged();
disconnect(LocalIdentity);
return false;
}
if (m_server->havePeer(id))
{
// Already connected.
@ -221,27 +270,13 @@ bool Session::interpret(RLP const& _r)
case GetPeersPacket:
{
clogS(NetTriviaSummary) << "GetPeers";
auto peers = m_server->potentialPeers(m_knownNodes);
if (peers.empty())
break;
RLPStream s;
prep(s, PeersPacket, min<unsigned>(10, peers.size()));
auto rs = randomSelection(peers, 10);
for (auto const& i: rs)
{
clogS(NetTriviaDetail) << "Sending peer " << i.id.abridged() << i.address;
if (i.address.address().is_v4())
s.appendList(3) << bytesConstRef(i.address.address().to_v4().to_bytes().data(), 4) << i.address.port() << i.id;
else// if (i.second.address().is_v6()) - assumed
s.appendList(3) << bytesConstRef(i.address.address().to_v6().to_bytes().data(), 16) << i.address.port() << i.id;
m_knownNodes.extendAll(i.index);
m_knownNodes.unionWith(i.index);
}
sealAndSend(s);
m_theyRequestedNodes = true;
serviceNodesRequest();
break;
}
case PeersPacket:
clogS(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)";
m_weRequestedNodes = false;
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
bi::address peerAddress;
@ -329,12 +364,6 @@ void Session::ping()
m_ping = std::chrono::steady_clock::now();
}
void Session::getPeers()
{
RLPStream s;
sealAndSend(prep(s, GetPeersPacket));
}
RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args)
{
return prep(_s).appendList(_args + 1).append((unsigned)_id);
@ -449,7 +478,7 @@ void Session::dropped()
catch (...) {}
}
void Session::disconnect(int _reason)
void Session::disconnect(DisconnectReason _reason)
{
clogS(NetConnect) << "Disconnecting (reason:" << reasonOf((DisconnectReason)_reason) << ")";
@ -481,8 +510,6 @@ void Session::start()
<< m_server->id();
sealAndSend(s);
ping();
getPeers();
doRead();
}

9
libp2p/Session.h

@ -55,7 +55,7 @@ public:
virtual ~Session();
void start();
void disconnect(int _reason);
void disconnect(DisconnectReason _reason);
void ping();
@ -82,6 +82,9 @@ public:
PeerInfo const& info() const { return m_info; }
void ensureNodesRequested();
void serviceNodesRequest();
private:
void dropped();
void doRead();
@ -89,7 +92,6 @@ private:
void writeImpl(bytes& _buffer);
void write();
void getPeers();
bool interpret(RLP const& _r);
/// @returns true iff the _msg forms a valid message for sending or receiving on the network.
@ -110,6 +112,9 @@ private:
bi::tcp::endpoint m_manualEndpoint;
bool m_force = false; /// If true, ignore IDs being different. This could open you up to MitM attacks.
bool m_theyRequestedNodes = false;
bool m_weRequestedNodes = false;
std::chrono::steady_clock::time_point m_ping;
std::chrono::steady_clock::time_point m_connect;
std::chrono::steady_clock::time_point m_disconnect;

5
libwebthree/WebThree.h

@ -108,6 +108,11 @@ public:
void setNetworkPreferences(p2p::NetworkPreferences const& _n) { auto had = haveNetwork(); if (had) stopNetwork(); m_net.setNetworkPreferences(_n); if (had) startNetwork(); }
p2p::NodeId id() const { return m_net.id(); }
/// Gets the nodes.
p2p::Nodes nodes() const { return m_net.nodes(); }
/// Start the network subsystem.
void startNetwork() { m_net.start(); }

Loading…
Cancel
Save