From e82f8db2de4a3fa90c4710dfc40adc6a9114f342 Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 20 Jun 2015 07:24:03 -0400 Subject: [PATCH 01/10] Prep for #2179. Abstract parsing of frame header from Session into struct. Catch unhandled exceptions thrown by ASIO. --- libp2p/Common.h | 11 +- libp2p/Host.cpp | 14 +- libp2p/RLPXFrameCoder.cpp | 12 ++ libp2p/RLPXFrameCoder.h | 14 ++ libp2p/Session.cpp | 337 +++++++++++++++++++------------------- libp2p/Session.h | 10 +- test/libp2p/net.cpp | 10 +- 7 files changed, 225 insertions(+), 183 deletions(-) diff --git a/libp2p/Common.h b/libp2p/Common.h index 4a1b64b70..445ba0cca 100644 --- a/libp2p/Common.h +++ b/libp2p/Common.h @@ -149,14 +149,15 @@ using CapDescs = std::vector; */ struct PeerSessionInfo { - NodeId id; - std::string clientVersion; - std::string host; - unsigned short port; + NodeId const id; + std::string const clientVersion; + std::string const host; + unsigned short const port; std::chrono::steady_clock::duration lastPing; - std::set caps; + std::set const caps; unsigned socketId; std::map notes; + unsigned const protocolVersion; }; using PeerSessionInfos = std::vector; diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index feb116c4a..c9c2743ee 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -254,7 +254,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder* clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort; // create session so disconnects are managed - auto ps = make_shared(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet(), 0, map()})); + auto ps = make_shared(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet(), 0, map(), protocolVersion})); if (protocolVersion < dev::p2p::c_protocolVersion - 1) { ps->disconnect(IncompatibleProtocol); @@ -724,8 +724,16 @@ void Host::startedWorking() void Host::doWork() { - if (m_run) - m_ioService.run(); + try + { + if (m_run) + m_ioService.run(); + } + catch (std::exception const& _e) + { + clog(NetP2PWarn) << "Exception in Network Thread:" << _e.what(); + clog(NetP2PWarn) << "Network Restart is Recommended."; + } } void Host::keepAlivePeers() diff --git a/libp2p/RLPXFrameCoder.cpp b/libp2p/RLPXFrameCoder.cpp index c4bb46814..193c45511 100644 --- a/libp2p/RLPXFrameCoder.cpp +++ b/libp2p/RLPXFrameCoder.cpp @@ -29,6 +29,18 @@ using namespace dev; using namespace dev::p2p; using namespace CryptoPP; +RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header) +{ + length = (_header[0] * 256 + _header[1]) * 256 + _header[2]; + padding = ((16 - (length % 16)) % 16); + RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall); + auto itemCount = header.itemCount(); + protocolId = header[0].toInt(); + hasSequence = itemCount > 1; + sequenceId = hasSequence ? header[1].toInt() : 0; + totalLength = itemCount == 3 ? header[2].toInt() : 0; +} + RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) { // we need: diff --git a/libp2p/RLPXFrameCoder.h b/libp2p/RLPXFrameCoder.h index 7c5eedbff..3964326ff 100644 --- a/libp2p/RLPXFrameCoder.h +++ b/libp2p/RLPXFrameCoder.h @@ -32,7 +32,21 @@ namespace dev { namespace p2p { + +struct RLPXFrameInfo +{ + RLPXFrameInfo() = default; + /// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]] + RLPXFrameInfo(bytesConstRef _frameHeader); + uint32_t length = 0; ///< Max: 2**24 + uint8_t padding = 0; + uint16_t protocolId = 0; + bool hasSequence = false; + uint16_t sequenceId = 0; + uint32_t totalLength = 0; +}; + class RLPXHandshake; /** diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index c3f0f2e35..95fd4a3d6 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -27,7 +27,6 @@ #include #include #include -#include "RLPxHandshake.h" #include "Host.h" #include "Capability.h" using namespace std; @@ -156,111 +155,25 @@ void Session::serviceNodesRequest() addNote("peers", "done"); } -bool Session::interpret(PacketType _t, RLP const& _r) +bool Session::frameReceived(uint16_t _capId, PacketType _t, RLP const& _r) { m_lastReceived = chrono::steady_clock::now(); - clog(NetRight) << _t << _r; try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught. { - switch (_t) - { - case DisconnectPacket: - { - string reason = "Unspecified"; - auto r = (DisconnectReason)_r[0].toInt(); - if (!_r[0].isInt()) - drop(BadProtocol); - else - { - reason = reasonOf(r); - clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")"; - drop(DisconnectRequested); - } - break; - } - case PingPacket: - { - clog(NetTriviaSummary) << "Ping"; - RLPStream s; - sealAndSend(prep(s, PongPacket)); - break; - } - case PongPacket: - m_info.lastPing = std::chrono::steady_clock::now() - m_ping; - clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast(m_info.lastPing).count() << " ms"; - break; - case GetPeersPacket: - // Disabled for interop testing. - // GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in. - break; - - clog(NetTriviaSummary) << "GetPeers"; - m_theyRequestedNodes = true; - serviceNodesRequest(); - break; - case PeersPacket: - // Disabled for interop testing. - // GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in. - break; - - clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)"; - m_weRequestedNodes = false; - for (unsigned i = 0; i < _r.itemCount(); ++i) - { - bi::address peerAddress; - if (_r[i][0].size() == 16) - peerAddress = bi::address_v6(_r[i][0].toHash>().asArray()); - else if (_r[i][0].size() == 4) - peerAddress = bi::address_v4(_r[i][0].toHash>().asArray()); - else - { - cwarn << "Received bad peer packet:" << _r; - disconnect(BadProtocol); - return true; - } - auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt()); - NodeId id = _r[i][2].toHash(); - - clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")"; - - if (!isPublicAddress(peerAddress)) - goto CONTINUE; // Private address. Ignore. - - if (!id) - goto LAMEPEER; // Null identity. Ignore. - - if (m_server->id() == id) - goto LAMEPEER; // Just our info - we already have that. - - if (id == this->id()) - goto LAMEPEER; // Just their info - we already have that. - - if (!ep.port()) - goto LAMEPEER; // Zero port? Don't think so. - - if (ep.port() >= /*49152*/32768) - goto LAMEPEER; // Private port according to IANA. - - // OK passed all our checks. Assume it's good. - addRating(1000); - m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port())); - clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")"; - CONTINUE:; - LAMEPEER:; - } - break; - default: + // v4 frame headers are useless, offset packet type used + // v5 protocol type is in header, packet type not offset + if (_capId == 0 && _t < UserPacket) + return interpret(_t, _r); + if (m_info.protocolVersion >= 5) + for (auto const& i: m_capabilities) + if (_capId == (uint16_t)i.first.second) + return i.second->m_enabled ? i.second->interpret(_t, _r) : true; + if (m_info.protocolVersion <= 4) for (auto const& i: m_capabilities) if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount()) - { - if (i.second->m_enabled) - return i.second->interpret(_t - i.second->m_idOffset, _r); - else - return true; - } - return false; - } + return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true; + return false; } catch (std::exception const& _e) { @@ -271,6 +184,101 @@ bool Session::interpret(PacketType _t, RLP const& _r) return true; } +bool Session::interpret(PacketType _t, RLP const& _r) +{ + switch (_t) + { + case DisconnectPacket: + { + string reason = "Unspecified"; + auto r = (DisconnectReason)_r[0].toInt(); + if (!_r[0].isInt()) + drop(BadProtocol); + else + { + reason = reasonOf(r); + clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")"; + drop(DisconnectRequested); + } + break; + } + case PingPacket: + { + clog(NetTriviaSummary) << "Ping"; + RLPStream s; + sealAndSend(prep(s, PongPacket)); + break; + } + case PongPacket: + m_info.lastPing = std::chrono::steady_clock::now() - m_ping; + clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast(m_info.lastPing).count() << " ms"; + break; + case GetPeersPacket: + // Disabled for interop testing. + // GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in. + break; + + clog(NetTriviaSummary) << "GetPeers"; + m_theyRequestedNodes = true; + serviceNodesRequest(); + break; + case PeersPacket: + // Disabled for interop testing. + // GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in. + break; + + clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)"; + m_weRequestedNodes = false; + for (unsigned i = 0; i < _r.itemCount(); ++i) + { + bi::address peerAddress; + if (_r[i][0].size() == 16) + peerAddress = bi::address_v6(_r[i][0].toHash>().asArray()); + else if (_r[i][0].size() == 4) + peerAddress = bi::address_v4(_r[i][0].toHash>().asArray()); + else + { + cwarn << "Received bad peer packet:" << _r; + disconnect(BadProtocol); + return true; + } + auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt()); + NodeId id = _r[i][2].toHash(); + + clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")"; + + if (!isPublicAddress(peerAddress)) + goto CONTINUE; // Private address. Ignore. + + if (!id) + goto LAMEPEER; // Null identity. Ignore. + + if (m_server->id() == id) + goto LAMEPEER; // Just our info - we already have that. + + if (id == this->id()) + goto LAMEPEER; // Just their info - we already have that. + + if (!ep.port()) + goto LAMEPEER; // Zero port? Don't think so. + + if (ep.port() >= /*49152*/32768) + goto LAMEPEER; // Private port according to IANA. + + // OK passed all our checks. Assume it's good. + addRating(1000); + m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port())); + clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")"; + CONTINUE:; + LAMEPEER:; + } + break; + default: + return false; + } + return true; +} + void Session::ping() { RLPStream s; @@ -292,12 +300,9 @@ void Session::sealAndSend(RLPStream& _s) bool Session::checkPacket(bytesConstRef _msg) { - if (_msg.size() < 2) + if (_msg[0] > 0x7f || _msg.size() < 2) return false; - if (_msg[0] > 0x7f) - return false; - RLP r(_msg.cropped(1)); - if (r.actualSize() + 1 != _msg.size()) + if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size()) return false; return true; } @@ -413,82 +418,78 @@ void Session::doRead() { ThreadContext tc(info().id.abridged()); ThreadContext tc2(info().clientVersion); - if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) + if (!checkRead(h256::size, ec, length)) + return; + else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length))) { - clog(NetWarn) << "Error reading: " << ec.message(); - drop(TCPError); + clog(NetWarn) << "header decrypt failed"; + drop(BadProtocol); // todo: better error + return; + } + + RLPXFrameInfo header; + try + { + header = RLPXFrameInfo(bytesConstRef(m_data.data(), length)); } - else if (ec && length == 0) + catch (std::exception const& _e) + { + clog(NetWarn) << "Exception decoding frame header RLP:" << bytesConstRef(m_data.data(), h128::size).cropped(3); + drop(BadProtocol); return; - else + } + + /// read padded frame and mac + auto tlen = header.length + header.padding + h128::size; + ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, header, tlen](boost::system::error_code ec, std::size_t length) { - /// authenticate and decrypt header - bytesRef header(m_data.data(), h256::size); - if (!m_io->authAndDecryptHeader(header)) + ThreadContext tc(info().id.abridged()); + ThreadContext tc2(info().clientVersion); + if (!checkRead(tlen, ec, length)) + return; + else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen))) { - clog(NetWarn) << "header decrypt failed"; + clog(NetWarn) << "frame decrypt failed"; drop(BadProtocol); // todo: better error return; } - /// check frame size - uint32_t frameSize = (m_data[0] * 256 + m_data[1]) * 256 + m_data[2]; - if (frameSize >= (uint32_t)1 << 24) + bytesConstRef frame(m_data.data(), header.length); + if (!checkPacket(frame)) { - clog(NetWarn) << "frame size too large"; - drop(BadProtocol); + cerr << "Received " << frame.size() << ": " << toHex(frame) << endl; + clog(NetWarn) << "INVALID MESSAGE RECEIVED"; + disconnect(BadProtocol); return; } - - /// rlp of header has protocol-type, sequence-id[, total-packet-size] - bytes headerRLP(13); - bytesConstRef(m_data.data(), h128::size).cropped(3).copyTo(&headerRLP); - - /// read padded frame and mac - auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size; - ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length) + else { - ThreadContext tc(info().id.abridged()); - ThreadContext tc2(info().clientVersion); - if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) - { - clog(NetWarn) << "Error reading: " << ec.message(); - drop(TCPError); - } - else if (ec && length < tlen) - { - clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << ec.message(); - repMan().noteRude(*this); - drop(TCPError); - return; - } - else - { - if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen))) - { - clog(NetWarn) << "frame decrypt failed"; - drop(BadProtocol); // todo: better error - return; - } - - bytesConstRef frame(m_data.data(), frameSize); - if (!checkPacket(frame)) - { - cerr << "Received " << frame.size() << ": " << toHex(frame) << endl; - clog(NetWarn) << "INVALID MESSAGE RECEIVED"; - disconnect(BadProtocol); - return; - } - else - { - auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt(); - RLP r(frame.cropped(1)); - if (!interpret(packetType, r)) - clog(NetWarn) << "Couldn't interpret packet." << RLP(r); - } - doRead(); - } - }); - } + auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt(); + RLP r(frame.cropped(1)); + if (!frameReceived(header.protocolId, packetType, r)) + clog(NetWarn) << "Couldn't interpret packet." << RLP(r); + } + doRead(); + }); }); } + +bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length) +{ + if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof) + { + clog(NetConnect) << "Error reading: " << _ec.message(); + drop(TCPError); + return false; + } + else if (_ec && _length < _expected) + { + clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << _ec.message(); + repMan().noteRude(*this); + drop(TCPError); + return false; + } + // If this fails then there's an unhandled asio error + assert(_expected == _length); + return true; +} diff --git a/libp2p/Session.h b/libp2p/Session.h index 6b45fe381..ec079cd1d 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -96,13 +96,19 @@ private: /// Perform a read on the socket. void doRead(); + + /// Check error code after reading and drop peer if error code. + bool checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length); /// Perform a single round of the write operation. This could end up calling itself asynchronously. void write(); - /// Interpret an incoming message. - bool interpret(PacketType _t, RLP const& _r); + /// Deliver RLPX packet to Session or Capability for interpretation. + bool frameReceived(uint16_t _capId, PacketType _t, RLP const& _r); + /// Interpret an incoming Session packet. + bool interpret(PacketType _t, RLP const& _r); + /// @returns true iff the _msg forms a valid message for sending or receiving on the network. static bool checkPacket(bytesConstRef _msg); diff --git a/test/libp2p/net.cpp b/test/libp2p/net.cpp index 8c652b479..1e3e2e15c 100644 --- a/test/libp2p/net.cpp +++ b/test/libp2p/net.cpp @@ -314,23 +314,23 @@ BOOST_AUTO_TEST_CASE(kademlia) node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for node.setup(); node.populate(); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; node.populateAll(); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; auto nodes = node.nodeTable->nodes(); nodes.sort(); node.nodeTable->reset(); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; node.populate(1); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; node.nodeTable->discover(); this_thread::sleep_for(chrono::milliseconds(2000)); - clog << "NodeTable:\n" << *node.nodeTable.get() << endl; +// clog << "NodeTable:\n" << *node.nodeTable.get() << endl; BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8); From 78467d3ae0af55b3c8c96a9e3a779ae21bc2d0c3 Mon Sep 17 00:00:00 2001 From: subtly Date: Sat, 20 Jun 2015 07:31:20 -0400 Subject: [PATCH 02/10] small rename --- libp2p/Session.cpp | 4 ++-- libp2p/Session.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 95fd4a3d6..f24377fb2 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -155,7 +155,7 @@ void Session::serviceNodesRequest() addNote("peers", "done"); } -bool Session::frameReceived(uint16_t _capId, PacketType _t, RLP const& _r) +bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r) { m_lastReceived = chrono::steady_clock::now(); clog(NetRight) << _t << _r; @@ -466,7 +466,7 @@ void Session::doRead() { auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt(); RLP r(frame.cropped(1)); - if (!frameReceived(header.protocolId, packetType, r)) + if (!readPacket(header.protocolId, packetType, r)) clog(NetWarn) << "Couldn't interpret packet." << RLP(r); } doRead(); diff --git a/libp2p/Session.h b/libp2p/Session.h index ec079cd1d..ba46c5a16 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -104,7 +104,7 @@ private: void write(); /// Deliver RLPX packet to Session or Capability for interpretation. - bool frameReceived(uint16_t _capId, PacketType _t, RLP const& _r); + bool readPacket(uint16_t _capId, PacketType _t, RLP const& _r); /// Interpret an incoming Session packet. bool interpret(PacketType _t, RLP const& _r); From d1b22891b26e3abb0e02499f743fd90c169d172e Mon Sep 17 00:00:00 2001 From: Lefteris Karapetsas Date: Mon, 29 Jun 2015 14:14:35 +0200 Subject: [PATCH 03/10] Remove legacy code in blockchain.cpp --- libethereum/BlockChain.cpp | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index d58a80ec2..da491c774 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -24,6 +24,7 @@ #if ETH_PROFILING_GPERF #include #endif +#include #include #include #include @@ -291,15 +292,6 @@ void BlockChain::rebuild(std::string const& _path, std::function -bool contains(T const& _t, V const& _v) -{ - for (auto const& i: _t) - if (i == _v) - return true; - return false; -} - LastHashes BlockChain::lastHashes(unsigned _n) const { Guard l(x_lastLastHashes); @@ -950,7 +942,7 @@ void BlockChain::checkConsistency() if (p != h256() && p != m_genesisHash) // TODO: for some reason the genesis details with the children get squished. not sure why. { auto dp = details(p); - if (asserts(contains(dp.children, h))) + if (asserts(end(dp.children) != find(begin(dp.children), end(dp.children), h))) { cnote << "Apparently the database is corrupt. Not much we can do at this stage..."; } From b9b193647750d3a2c11f2dbad459e06c99074406 Mon Sep 17 00:00:00 2001 From: Lefteris Karapetsas Date: Tue, 30 Jun 2015 20:36:06 +0200 Subject: [PATCH 04/10] contains() gets moved to CommonData.h - It also now uses std::find() --- libdevcore/CommonData.h | 6 ++++++ libethereum/BlockChain.cpp | 4 ++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/libdevcore/CommonData.h b/libdevcore/CommonData.h index 57360e95a..ed09e60ee 100644 --- a/libdevcore/CommonData.h +++ b/libdevcore/CommonData.h @@ -334,4 +334,10 @@ std::vector keysOf(std::unordered_map const& _m) return ret; } +template +bool contains(T const& _t, V const& _v) +{ + return std::end(_t) != std::find(std::begin(_t), std::end(_t), _v); +} + } diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index da491c774..4d05ea7db 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -24,7 +24,7 @@ #if ETH_PROFILING_GPERF #include #endif -#include + #include #include #include @@ -942,7 +942,7 @@ void BlockChain::checkConsistency() if (p != h256() && p != m_genesisHash) // TODO: for some reason the genesis details with the children get squished. not sure why. { auto dp = details(p); - if (asserts(end(dp.children) != find(begin(dp.children), end(dp.children), h))) + if (asserts(contains(dp.children, h))) { cnote << "Apparently the database is corrupt. Not much we can do at this stage..."; } From a586e2cafe30ca0cd8c1a88dabbe1d01cf5623ad Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 30 Jun 2015 23:04:07 +0200 Subject: [PATCH 05/10] blocks no longer stuck in verifying state --- libethereum/BlockQueue.cpp | 41 ++++++++++++++++++++------------------ libethereum/BlockQueue.h | 1 + 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index 074278d9a..30ae65ed2 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -114,15 +114,11 @@ void BlockQueue::verifierBody() catch (...) { // bad block. - { - // has to be this order as that's how invariants() assumes. - WriteGuard l2(m_lock); - unique_lock l(m_verification); - m_readySet.erase(work.hash); - m_knownBad.insert(work.hash); - } - + // has to be this order as that's how invariants() assumes. + WriteGuard l2(m_lock); unique_lock l(m_verification); + m_readySet.erase(work.hash); + m_knownBad.insert(work.hash); for (auto it = m_verifying.begin(); it != m_verifying.end(); ++it) if (it->verified.info.mixHash == work.hash) { @@ -131,6 +127,7 @@ void BlockQueue::verifierBody() } cwarn << "BlockQueue missing our job: was there a GM?"; OK1:; + drainVerified(); continue; } @@ -149,17 +146,8 @@ void BlockQueue::verifierBody() } else m_verified.emplace_back(move(res)); - while (m_verifying.size() && !m_verifying.front().blockData.empty()) - { - if (m_knownBad.count(m_verifying.front().verified.info.parentHash)) - { - m_readySet.erase(m_verifying.front().verified.info.hash()); - m_knownBad.insert(res.verified.info.hash()); - } - else - m_verified.emplace_back(move(m_verifying.front())); - m_verifying.pop_front(); - } + + drainVerified(); ready = true; } else @@ -179,6 +167,21 @@ void BlockQueue::verifierBody() } } +void BlockQueue::drainVerified() +{ + while (m_verifying.size() && !m_verifying.front().blockData.empty()) + { + if (m_knownBad.count(m_verifying.front().verified.info.parentHash)) + { + m_readySet.erase(m_verifying.front().verified.info.hash()); + m_knownBad.insert(m_verifying.front().verified.info.hash()); + } + else + m_verified.emplace_back(move(m_verifying.front())); + m_verifying.pop_front(); + } +} + ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, bool _isOurs) { // Check if we already know this block. diff --git a/libethereum/BlockQueue.h b/libethereum/BlockQueue.h index 137048ec4..82547cd56 100644 --- a/libethereum/BlockQueue.h +++ b/libethereum/BlockQueue.h @@ -134,6 +134,7 @@ private: bool invariants() const override; void verifierBody(); + void drainVerified(); void collectUnknownBad(h256 const& _bad); void updateBad(h256 const& _bad); From 7a5b4db728135bc7ea8784617159a224a6c4ea3c Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Wed, 1 Jul 2015 09:13:55 +0200 Subject: [PATCH 06/10] Minor renaming in BlockQueue, remove unnecessary invariant check, add additional one. --- libethash-cl/ethash_cl_miner.cpp | 1 + libethereum/BlockQueue.cpp | 19 +++++++++---------- libethereum/BlockQueue.h | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/libethash-cl/ethash_cl_miner.cpp b/libethash-cl/ethash_cl_miner.cpp index ec400f7a7..b4ef12cb5 100644 --- a/libethash-cl/ethash_cl_miner.cpp +++ b/libethash-cl/ethash_cl_miner.cpp @@ -495,6 +495,7 @@ void ethash_cl_miner::search(uint8_t const* header, uint64_t target, search_hook pending.pop(); } + (void)_msPerBatch; /* chrono::high_resolution_clock::duration d = chrono::high_resolution_clock::now() - t; if (d > chrono::milliseconds(_msPerBatch * 10 / 9)) { diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index 9e647c9c1..4eeeac066 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -242,7 +242,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo if (m_knownBad.count(bi.parentHash)) { m_knownBad.insert(bi.hash()); - updateBad(bi.hash()); + updateBad_WITH_LOCK(bi.hash()); // bad parent; this is bad too, note it as such return ImportResult::BadChain; } @@ -277,12 +277,12 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo } } -void BlockQueue::updateBad(h256 const& _bad) +void BlockQueue::updateBad_WITH_LOCK(h256 const& _bad) { DEV_INVARIANT_CHECK; DEV_GUARDED(m_verification) { - collectUnknownBad(_bad); + collectUnknownBad_WITH_BOTH_LOCKS(_bad); bool moreBad = true; while (moreBad) { @@ -294,7 +294,7 @@ void BlockQueue::updateBad(h256 const& _bad) { m_knownBad.insert(b.verified.info.hash()); m_readySet.erase(b.verified.info.hash()); - collectUnknownBad(b.verified.info.hash()); + collectUnknownBad_WITH_BOTH_LOCKS(b.verified.info.hash()); moreBad = true; } else @@ -307,7 +307,7 @@ void BlockQueue::updateBad(h256 const& _bad) { m_knownBad.insert(b.hash); m_readySet.erase(b.hash); - collectUnknownBad(b.hash); + collectUnknownBad_WITH_BOTH_LOCKS(b.hash); moreBad = true; } else @@ -321,18 +321,18 @@ void BlockQueue::updateBad(h256 const& _bad) h256 const& h = b.blockData.size() != 0 ? b.verified.info.hash() : b.verified.info.mixHash; m_knownBad.insert(h); m_readySet.erase(h); - collectUnknownBad(h); + collectUnknownBad_WITH_BOTH_LOCKS(h); moreBad = true; } else m_verifying.push_back(std::move(b)); } } - DEV_INVARIANT_CHECK; } -void BlockQueue::collectUnknownBad(h256 const& _bad) +void BlockQueue::collectUnknownBad_WITH_BOTH_LOCKS(h256 const& _bad) { + DEV_INVARIANT_CHECK; list badQueue(1, _bad); while (!badQueue.empty()) { @@ -349,7 +349,6 @@ void BlockQueue::collectUnknownBad(h256 const& _bad) } m_unknown.erase(r.first, r.second); } - } bool BlockQueue::doneDrain(h256s const& _bad) @@ -364,7 +363,7 @@ bool BlockQueue::doneDrain(h256s const& _bad) // at least one of them was bad. m_knownBad += _bad; for (h256 const& b : _bad) - updateBad(b); + updateBad_WITH_LOCK(b); } return !m_readySet.empty(); } diff --git a/libethereum/BlockQueue.h b/libethereum/BlockQueue.h index 137048ec4..e42b6a3bd 100644 --- a/libethereum/BlockQueue.h +++ b/libethereum/BlockQueue.h @@ -134,8 +134,8 @@ private: bool invariants() const override; void verifierBody(); - void collectUnknownBad(h256 const& _bad); - void updateBad(h256 const& _bad); + void collectUnknownBad_WITH_BOTH_LOCKS(h256 const& _bad); + void updateBad_WITH_LOCK(h256 const& _bad); mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown. h256Hash m_drainingSet; ///< All blocks being imported. From 3c78f0f9519f02a794b562e4873eacad075f134d Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Wed, 1 Jul 2015 09:42:03 +0200 Subject: [PATCH 07/10] Better logging/descriptive exceptions on invariants. --- libdevcore/Common.cpp | 9 ++++++--- libdevcore/Common.h | 16 +++++++++------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/libdevcore/Common.cpp b/libdevcore/Common.cpp index e33936102..5450f9311 100644 --- a/libdevcore/Common.cpp +++ b/libdevcore/Common.cpp @@ -32,10 +32,13 @@ char const* Version = "0.9.27"; const u256 UndefinedU256 = ~(u256)0; -void HasInvariants::checkInvariants() const +void InvariantChecker::checkInvariants() const { - if (!invariants()) - BOOST_THROW_EXCEPTION(FailedInvariant()); + if (!m_this->invariants()) + { + cwarn << "Invariant failed in" << m_function << "at" << m_file << ":" << m_line; + ::boost::exception_detail::throw_exception_(FailedInvariant(), m_function, m_file, m_line); + } } struct TimerChannel: public LogChannel { static const char* name(); static const int verbosity = 0; }; diff --git a/libdevcore/Common.h b/libdevcore/Common.h index f1d35bbc7..a3ab374a6 100644 --- a/libdevcore/Common.h +++ b/libdevcore/Common.h @@ -163,10 +163,6 @@ private: class HasInvariants { public: - /// Check invariants are met, throw if not. - void checkInvariants() const; - -protected: /// Reimplement to specify the invariants. virtual bool invariants() const = 0; }; @@ -175,16 +171,22 @@ protected: class InvariantChecker { public: - InvariantChecker(HasInvariants* _this): m_this(_this) { m_this->checkInvariants(); } - ~InvariantChecker() { m_this->checkInvariants(); } + InvariantChecker(HasInvariants* _this, char const* _fn, char const* _file, int _line): m_this(_this), m_function(_fn), m_file(_file), m_line(_line) { checkInvariants(); } + ~InvariantChecker() { checkInvariants(); } private: + /// Check invariants are met, throw if not. + void checkInvariants() const; + HasInvariants const* m_this; + char const* m_function; + char const* m_file; + int m_line; }; /// Scope guard for invariant check in a class derived from HasInvariants. #if ETH_DEBUG -#define DEV_INVARIANT_CHECK { ::dev::InvariantChecker __dev_invariantCheck(this); } +#define DEV_INVARIANT_CHECK { ::dev::InvariantChecker __dev_invariantCheck(this, BOOST_THROW_EXCEPTION_CURRENT_FUNCTION, __FILE__, __LINE__); } #else #define DEV_INVARIANT_CHECK (void)0; #endif From 7a0f3350b6f13a47640eaff414679bf1f9494187 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Wed, 1 Jul 2015 11:05:45 +0200 Subject: [PATCH 08/10] Fix importing. --- eth/main.cpp | 1 + libethereum/BlockQueue.cpp | 1 + libethereum/Client.cpp | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/eth/main.cpp b/eth/main.cpp index 4361598d4..0ccba6fca 100644 --- a/eth/main.cpp +++ b/eth/main.cpp @@ -827,6 +827,7 @@ int main(int argc, char** argv) cout << i << " more imported at " << (round(i * 10 / d) / 10) << " blocks/s. " << imported << " imported in " << e << " seconds at " << (round(imported * 10 / e) / 10) << " blocks/s (#" << web3.ethereum()->number() << ")" << endl; last = (unsigned)e; lastImported = imported; +// cout << web3.ethereum()->blockQueueStatus() << endl; } } diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index deeae9773..eb2cfbcab 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -181,6 +181,7 @@ void BlockQueue::verifierBody() ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, bool _isOurs) { + cdebug << std::this_thread::get_id(); // Check if we already know this block. h256 h = BlockInfo::headerHash(_block); diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index 2b0a64491..bb7e6d993 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -89,13 +89,14 @@ void VersionChecker::setOk() ImportResult Client::queueBlock(bytes const& _block, bool _isSafe) { - if (m_bq.status().verified + m_bq.status().verifying + m_bq.status().unverified > 30000) + if (m_bq.status().verified + m_bq.status().verifying + m_bq.status().unverified > 500) this_thread::sleep_for(std::chrono::milliseconds(500)); return m_bq.import(&_block, bc(), _isSafe); } tuple Client::syncQueue(unsigned _max) { + stopWorking(); return m_bc.sync(m_bq, m_stateDB, _max); } From 7ba71da2806ac8e9fbf58b65bc6435660a3c4b0b Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 1 Jul 2015 11:47:34 +0200 Subject: [PATCH 09/10] style --- libethereum/BlockQueue.cpp | 8 ++++---- libethereum/BlockQueue.h | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index 30ae65ed2..344ee0609 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -127,7 +127,7 @@ void BlockQueue::verifierBody() } cwarn << "BlockQueue missing our job: was there a GM?"; OK1:; - drainVerified(); + drainVerified_WITH_BOTH_LOCKS(); continue; } @@ -147,7 +147,7 @@ void BlockQueue::verifierBody() else m_verified.emplace_back(move(res)); - drainVerified(); + drainVerified_WITH_BOTH_LOCKS(); ready = true; } else @@ -167,9 +167,9 @@ void BlockQueue::verifierBody() } } -void BlockQueue::drainVerified() +void BlockQueue::drainVerified_WITH_BOTH_LOCKS() { - while (m_verifying.size() && !m_verifying.front().blockData.empty()) + while (!m_verifying.empty() && !m_verifying.front().blockData.empty()) { if (m_knownBad.count(m_verifying.front().verified.info.parentHash)) { diff --git a/libethereum/BlockQueue.h b/libethereum/BlockQueue.h index 82547cd56..18446c613 100644 --- a/libethereum/BlockQueue.h +++ b/libethereum/BlockQueue.h @@ -134,7 +134,7 @@ private: bool invariants() const override; void verifierBody(); - void drainVerified(); + void drainVerified_WITH_BOTH_LOCKS(); void collectUnknownBad(h256 const& _bad); void updateBad(h256 const& _bad); From 91452da11d0828844267e7ce95f99807a08ae2b2 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Wed, 1 Jul 2015 12:02:17 +0200 Subject: [PATCH 10/10] Minor optimisations. --- libethereum/BlockChain.cpp | 2 +- libethereum/Client.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index 4d05ea7db..47725e088 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -325,7 +325,7 @@ tuple BlockChain::sync(BlockQueue& _bq, OverlayDB c { // Nonce & uncle nonces already verified in verification thread at this point. ImportRoute r; - DEV_TIMED_ABOVE("Block import", 500) + DEV_TIMED_ABOVE("Block import " + toString(block.verified.info.number), 500) r = import(block.verified, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles); fresh += r.liveBlocks; dead += r.deadBlocks; diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index bb7e6d993..c7d7c60c4 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -89,7 +89,7 @@ void VersionChecker::setOk() ImportResult Client::queueBlock(bytes const& _block, bool _isSafe) { - if (m_bq.status().verified + m_bq.status().verifying + m_bq.status().unverified > 500) + if (m_bq.status().verified + m_bq.status().verifying + m_bq.status().unverified > 10000) this_thread::sleep_for(std::chrono::milliseconds(500)); return m_bq.import(&_block, bc(), _isSafe); }