Browse Source

Merge branch 'develop' of https://github.com/ethereum/cpp-ethereum into bc_rf

cl-refactor
arkpar 10 years ago
parent
commit
99f95f1667
  1. 1
      eth/main.cpp
  2. 9
      libdevcore/Common.cpp
  3. 16
      libdevcore/Common.h
  4. 6
      libdevcore/CommonData.h
  5. 1
      libethash-cl/ethash_cl_miner.cpp
  6. 12
      libethereum/BlockChain.cpp
  7. 61
      libethereum/BlockQueue.cpp
  8. 5
      libethereum/BlockQueue.h
  9. 3
      libethereum/Client.cpp
  10. 11
      libp2p/Common.h
  11. 14
      libp2p/Host.cpp
  12. 12
      libp2p/RLPXFrameCoder.cpp
  13. 14
      libp2p/RLPXFrameCoder.h
  14. 286
      libp2p/Session.cpp
  15. 8
      libp2p/Session.h
  16. 10
      test/libp2p/net.cpp

1
eth/main.cpp

@ -827,6 +827,7 @@ int main(int argc, char** argv)
cout << i << " more imported at " << (round(i * 10 / d) / 10) << " blocks/s. " << imported << " imported in " << e << " seconds at " << (round(imported * 10 / e) / 10) << " blocks/s (#" << web3.ethereum()->number() << ")" << endl; cout << i << " more imported at " << (round(i * 10 / d) / 10) << " blocks/s. " << imported << " imported in " << e << " seconds at " << (round(imported * 10 / e) / 10) << " blocks/s (#" << web3.ethereum()->number() << ")" << endl;
last = (unsigned)e; last = (unsigned)e;
lastImported = imported; lastImported = imported;
// cout << web3.ethereum()->blockQueueStatus() << endl;
} }
} }

9
libdevcore/Common.cpp

@ -32,10 +32,13 @@ char const* Version = "0.9.27";
const u256 UndefinedU256 = ~(u256)0; const u256 UndefinedU256 = ~(u256)0;
void HasInvariants::checkInvariants() const void InvariantChecker::checkInvariants() const
{ {
if (!invariants()) if (!m_this->invariants())
BOOST_THROW_EXCEPTION(FailedInvariant()); {
cwarn << "Invariant failed in" << m_function << "at" << m_file << ":" << m_line;
::boost::exception_detail::throw_exception_(FailedInvariant(), m_function, m_file, m_line);
}
} }
struct TimerChannel: public LogChannel { static const char* name(); static const int verbosity = 0; }; struct TimerChannel: public LogChannel { static const char* name(); static const int verbosity = 0; };

16
libdevcore/Common.h

@ -163,10 +163,6 @@ private:
class HasInvariants class HasInvariants
{ {
public: public:
/// Check invariants are met, throw if not.
void checkInvariants() const;
protected:
/// Reimplement to specify the invariants. /// Reimplement to specify the invariants.
virtual bool invariants() const = 0; virtual bool invariants() const = 0;
}; };
@ -175,16 +171,22 @@ protected:
class InvariantChecker class InvariantChecker
{ {
public: public:
InvariantChecker(HasInvariants* _this): m_this(_this) { m_this->checkInvariants(); } InvariantChecker(HasInvariants* _this, char const* _fn, char const* _file, int _line): m_this(_this), m_function(_fn), m_file(_file), m_line(_line) { checkInvariants(); }
~InvariantChecker() { m_this->checkInvariants(); } ~InvariantChecker() { checkInvariants(); }
private: private:
/// Check invariants are met, throw if not.
void checkInvariants() const;
HasInvariants const* m_this; HasInvariants const* m_this;
char const* m_function;
char const* m_file;
int m_line;
}; };
/// Scope guard for invariant check in a class derived from HasInvariants. /// Scope guard for invariant check in a class derived from HasInvariants.
#if ETH_DEBUG #if ETH_DEBUG
#define DEV_INVARIANT_CHECK { ::dev::InvariantChecker __dev_invariantCheck(this); } #define DEV_INVARIANT_CHECK { ::dev::InvariantChecker __dev_invariantCheck(this, BOOST_THROW_EXCEPTION_CURRENT_FUNCTION, __FILE__, __LINE__); }
#else #else
#define DEV_INVARIANT_CHECK (void)0; #define DEV_INVARIANT_CHECK (void)0;
#endif #endif

6
libdevcore/CommonData.h

@ -334,4 +334,10 @@ std::vector<T> keysOf(std::unordered_map<T, U> const& _m)
return ret; return ret;
} }
template <class T, class V>
bool contains(T const& _t, V const& _v)
{
return std::end(_t) != std::find(std::begin(_t), std::end(_t), _v);
}
} }

1
libethash-cl/ethash_cl_miner.cpp

@ -497,6 +497,7 @@ void ethash_cl_miner::search(uint8_t const* header, uint64_t target, search_hook
pending.pop(); pending.pop();
} }
(void)_msPerBatch;
/* chrono::high_resolution_clock::duration d = chrono::high_resolution_clock::now() - t; /* chrono::high_resolution_clock::duration d = chrono::high_resolution_clock::now() - t;
if (d > chrono::milliseconds(_msPerBatch * 10 / 9)) if (d > chrono::milliseconds(_msPerBatch * 10 / 9))
{ {

12
libethereum/BlockChain.cpp

@ -24,6 +24,7 @@
#if ETH_PROFILING_GPERF #if ETH_PROFILING_GPERF
#include <gperftools/profiler.h> #include <gperftools/profiler.h>
#endif #endif
#include <boost/timer.hpp> #include <boost/timer.hpp>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <test/JsonSpiritHeaders.h> #include <test/JsonSpiritHeaders.h>
@ -291,15 +292,6 @@ void BlockChain::rebuild(std::string const& _path, std::function<void(unsigned,
boost::filesystem::remove_all(path + "/details.old"); boost::filesystem::remove_all(path + "/details.old");
} }
template <class T, class V>
bool contains(T const& _t, V const& _v)
{
for (auto const& i: _t)
if (i == _v)
return true;
return false;
}
LastHashes BlockChain::lastHashes(unsigned _n) const LastHashes BlockChain::lastHashes(unsigned _n) const
{ {
Guard l(x_lastLastHashes); Guard l(x_lastLastHashes);
@ -333,7 +325,7 @@ tuple<ImportRoute, bool, unsigned> BlockChain::sync(BlockQueue& _bq, OverlayDB c
{ {
// Nonce & uncle nonces already verified in verification thread at this point. // Nonce & uncle nonces already verified in verification thread at this point.
ImportRoute r; ImportRoute r;
DEV_TIMED_ABOVE("Block import", 500) DEV_TIMED_ABOVE("Block import " + toString(block.verified.info.number), 500)
r = import(block.verified, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles); r = import(block.verified, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles);
fresh += r.liveBlocks; fresh += r.liveBlocks;
dead += r.deadBlocks; dead += r.deadBlocks;

61
libethereum/BlockQueue.cpp

@ -115,15 +115,11 @@ void BlockQueue::verifierBody()
catch (...) catch (...)
{ {
// bad block. // bad block.
{ // has to be this order as that's how invariants() assumes.
// has to be this order as that's how invariants() assumes. WriteGuard l2(m_lock);
WriteGuard l2(m_lock);
unique_lock<Mutex> l(m_verification);
m_readySet.erase(work.hash);
m_knownBad.insert(work.hash);
}
unique_lock<Mutex> l(m_verification); unique_lock<Mutex> l(m_verification);
m_readySet.erase(work.hash);
m_knownBad.insert(work.hash);
for (auto it = m_verifying.begin(); it != m_verifying.end(); ++it) for (auto it = m_verifying.begin(); it != m_verifying.end(); ++it)
if (it->verified.info.mixHash == work.hash) if (it->verified.info.mixHash == work.hash)
{ {
@ -132,6 +128,7 @@ void BlockQueue::verifierBody()
} }
cwarn << "BlockQueue missing our job: was there a GM?"; cwarn << "BlockQueue missing our job: was there a GM?";
OK1:; OK1:;
drainVerified_WITH_BOTH_LOCKS();
continue; continue;
} }
@ -150,17 +147,8 @@ void BlockQueue::verifierBody()
} }
else else
m_verified.emplace_back(move(res)); m_verified.emplace_back(move(res));
while (m_verifying.size() && !m_verifying.front().blockData.empty())
{ drainVerified_WITH_BOTH_LOCKS();
if (m_knownBad.count(m_verifying.front().verified.info.parentHash))
{
m_readySet.erase(m_verifying.front().verified.info.hash());
m_knownBad.insert(res.verified.info.hash());
}
else
m_verified.emplace_back(move(m_verifying.front()));
m_verifying.pop_front();
}
ready = true; ready = true;
} }
else else
@ -180,8 +168,24 @@ void BlockQueue::verifierBody()
} }
} }
void BlockQueue::drainVerified_WITH_BOTH_LOCKS()
{
while (!m_verifying.empty() && !m_verifying.front().blockData.empty())
{
if (m_knownBad.count(m_verifying.front().verified.info.parentHash))
{
m_readySet.erase(m_verifying.front().verified.info.hash());
m_knownBad.insert(m_verifying.front().verified.info.hash());
}
else
m_verified.emplace_back(move(m_verifying.front()));
m_verifying.pop_front();
}
}
ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, bool _isOurs) ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, bool _isOurs)
{ {
cdebug << std::this_thread::get_id();
// Check if we already know this block. // Check if we already know this block.
h256 h = BlockInfo::headerHash(_block); h256 h = BlockInfo::headerHash(_block);
@ -243,7 +247,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
if (m_knownBad.count(bi.parentHash)) if (m_knownBad.count(bi.parentHash))
{ {
m_knownBad.insert(bi.hash()); m_knownBad.insert(bi.hash());
updateBad(bi.hash()); updateBad_WITH_LOCK(bi.hash());
// bad parent; this is bad too, note it as such // bad parent; this is bad too, note it as such
return ImportResult::BadChain; return ImportResult::BadChain;
} }
@ -278,12 +282,12 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
} }
} }
void BlockQueue::updateBad(h256 const& _bad) void BlockQueue::updateBad_WITH_LOCK(h256 const& _bad)
{ {
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
DEV_GUARDED(m_verification) DEV_GUARDED(m_verification)
{ {
collectUnknownBad(_bad); collectUnknownBad_WITH_BOTH_LOCKS(_bad);
bool moreBad = true; bool moreBad = true;
while (moreBad) while (moreBad)
{ {
@ -295,7 +299,7 @@ void BlockQueue::updateBad(h256 const& _bad)
{ {
m_knownBad.insert(b.verified.info.hash()); m_knownBad.insert(b.verified.info.hash());
m_readySet.erase(b.verified.info.hash()); m_readySet.erase(b.verified.info.hash());
collectUnknownBad(b.verified.info.hash()); collectUnknownBad_WITH_BOTH_LOCKS(b.verified.info.hash());
moreBad = true; moreBad = true;
} }
else else
@ -308,7 +312,7 @@ void BlockQueue::updateBad(h256 const& _bad)
{ {
m_knownBad.insert(b.hash); m_knownBad.insert(b.hash);
m_readySet.erase(b.hash); m_readySet.erase(b.hash);
collectUnknownBad(b.hash); collectUnknownBad_WITH_BOTH_LOCKS(b.hash);
moreBad = true; moreBad = true;
} }
else else
@ -322,18 +326,18 @@ void BlockQueue::updateBad(h256 const& _bad)
h256 const& h = b.blockData.size() != 0 ? b.verified.info.hash() : b.verified.info.mixHash; h256 const& h = b.blockData.size() != 0 ? b.verified.info.hash() : b.verified.info.mixHash;
m_knownBad.insert(h); m_knownBad.insert(h);
m_readySet.erase(h); m_readySet.erase(h);
collectUnknownBad(h); collectUnknownBad_WITH_BOTH_LOCKS(h);
moreBad = true; moreBad = true;
} }
else else
m_verifying.push_back(std::move(b)); m_verifying.push_back(std::move(b));
} }
} }
DEV_INVARIANT_CHECK;
} }
void BlockQueue::collectUnknownBad(h256 const& _bad) void BlockQueue::collectUnknownBad_WITH_BOTH_LOCKS(h256 const& _bad)
{ {
DEV_INVARIANT_CHECK;
list<h256> badQueue(1, _bad); list<h256> badQueue(1, _bad);
while (!badQueue.empty()) while (!badQueue.empty())
{ {
@ -350,7 +354,6 @@ void BlockQueue::collectUnknownBad(h256 const& _bad)
} }
m_unknown.erase(r.first, r.second); m_unknown.erase(r.first, r.second);
} }
} }
bool BlockQueue::doneDrain(h256s const& _bad) bool BlockQueue::doneDrain(h256s const& _bad)
@ -365,7 +368,7 @@ bool BlockQueue::doneDrain(h256s const& _bad)
// at least one of them was bad. // at least one of them was bad.
m_knownBad += _bad; m_knownBad += _bad;
for (h256 const& b : _bad) for (h256 const& b : _bad)
updateBad(b); updateBad_WITH_LOCK(b);
} }
return !m_readySet.empty(); return !m_readySet.empty();
} }

5
libethereum/BlockQueue.h

@ -134,8 +134,9 @@ private:
bool invariants() const override; bool invariants() const override;
void verifierBody(); void verifierBody();
void collectUnknownBad(h256 const& _bad); void collectUnknownBad_WITH_BOTH_LOCKS(h256 const& _bad);
void updateBad(h256 const& _bad); void updateBad_WITH_LOCK(h256 const& _bad);
void drainVerified_WITH_BOTH_LOCKS();
mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown. mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown.
h256Hash m_drainingSet; ///< All blocks being imported. h256Hash m_drainingSet; ///< All blocks being imported.

3
libethereum/Client.cpp

@ -89,13 +89,14 @@ void VersionChecker::setOk()
ImportResult Client::queueBlock(bytes const& _block, bool _isSafe) ImportResult Client::queueBlock(bytes const& _block, bool _isSafe)
{ {
if (m_bq.status().verified + m_bq.status().verifying + m_bq.status().unverified > 30000) if (m_bq.status().verified + m_bq.status().verifying + m_bq.status().unverified > 10000)
this_thread::sleep_for(std::chrono::milliseconds(500)); this_thread::sleep_for(std::chrono::milliseconds(500));
return m_bq.import(&_block, bc(), _isSafe); return m_bq.import(&_block, bc(), _isSafe);
} }
tuple<ImportRoute, bool, unsigned> Client::syncQueue(unsigned _max) tuple<ImportRoute, bool, unsigned> Client::syncQueue(unsigned _max)
{ {
stopWorking();
return m_bc.sync(m_bq, m_stateDB, _max); return m_bc.sync(m_bq, m_stateDB, _max);
} }

11
libp2p/Common.h

@ -149,14 +149,15 @@ using CapDescs = std::vector<CapDesc>;
*/ */
struct PeerSessionInfo struct PeerSessionInfo
{ {
NodeId id; NodeId const id;
std::string clientVersion; std::string const clientVersion;
std::string host; std::string const host;
unsigned short port; unsigned short const port;
std::chrono::steady_clock::duration lastPing; std::chrono::steady_clock::duration lastPing;
std::set<CapDesc> caps; std::set<CapDesc> const caps;
unsigned socketId; unsigned socketId;
std::map<std::string, std::string> notes; std::map<std::string, std::string> notes;
unsigned const protocolVersion;
}; };
using PeerSessionInfos = std::vector<PeerSessionInfo>; using PeerSessionInfos = std::vector<PeerSessionInfo>;

14
libp2p/Host.cpp

@ -254,7 +254,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder*
clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort; clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort;
// create session so disconnects are managed // create session so disconnects are managed
auto ps = make_shared<Session>(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>()})); auto ps = make_shared<Session>(this, _io, _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>(), protocolVersion}));
if (protocolVersion < dev::p2p::c_protocolVersion - 1) if (protocolVersion < dev::p2p::c_protocolVersion - 1)
{ {
ps->disconnect(IncompatibleProtocol); ps->disconnect(IncompatibleProtocol);
@ -725,8 +725,16 @@ void Host::startedWorking()
void Host::doWork() void Host::doWork()
{ {
if (m_run) try
m_ioService.run(); {
if (m_run)
m_ioService.run();
}
catch (std::exception const& _e)
{
clog(NetP2PWarn) << "Exception in Network Thread:" << _e.what();
clog(NetP2PWarn) << "Network Restart is Recommended.";
}
} }
void Host::keepAlivePeers() void Host::keepAlivePeers()

12
libp2p/RLPXFrameCoder.cpp

@ -29,6 +29,18 @@ using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
using namespace CryptoPP; using namespace CryptoPP;
RLPXFrameInfo::RLPXFrameInfo(bytesConstRef _header)
{
length = (_header[0] * 256 + _header[1]) * 256 + _header[2];
padding = ((16 - (length % 16)) % 16);
RLP header(_header.cropped(3), RLP::ThrowOnFail | RLP::FailIfTooSmall);
auto itemCount = header.itemCount();
protocolId = header[0].toInt<uint16_t>();
hasSequence = itemCount > 1;
sequenceId = hasSequence ? header[1].toInt<uint16_t>() : 0;
totalLength = itemCount == 3 ? header[2].toInt<uint32_t>() : 0;
}
RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init) RLPXFrameCoder::RLPXFrameCoder(RLPXHandshake const& _init)
{ {
// we need: // we need:

14
libp2p/RLPXFrameCoder.h

@ -33,6 +33,20 @@ namespace dev
namespace p2p namespace p2p
{ {
struct RLPXFrameInfo
{
RLPXFrameInfo() = default;
/// Constructor. frame-size || protocol-type, [sequence-id[, total-packet-size]]
RLPXFrameInfo(bytesConstRef _frameHeader);
uint32_t length = 0; ///< Max: 2**24
uint8_t padding = 0;
uint16_t protocolId = 0;
bool hasSequence = false;
uint16_t sequenceId = 0;
uint32_t totalLength = 0;
};
class RLPXHandshake; class RLPXHandshake;
/** /**

286
libp2p/Session.cpp

@ -27,7 +27,6 @@
#include <libdevcore/CommonIO.h> #include <libdevcore/CommonIO.h>
#include <libdevcore/StructuredLogger.h> #include <libdevcore/StructuredLogger.h>
#include <libethcore/Exceptions.h> #include <libethcore/Exceptions.h>
#include "RLPxHandshake.h"
#include "Host.h" #include "Host.h"
#include "Capability.h" #include "Capability.h"
using namespace std; using namespace std;
@ -157,114 +156,25 @@ void Session::serviceNodesRequest()
addNote("peers", "done"); addNote("peers", "done");
} }
bool Session::interpret(PacketType _t, RLP const& _r) bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
{ {
m_lastReceived = chrono::steady_clock::now(); m_lastReceived = chrono::steady_clock::now();
clog(NetRight) << _t << _r; clog(NetRight) << _t << _r;
try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught. try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
{ {
switch (_t) // v4 frame headers are useless, offset packet type used
{ // v5 protocol type is in header, packet type not offset
case DisconnectPacket: if (_capId == 0 && _t < UserPacket)
{ return interpret(_t, _r);
string reason = "Unspecified"; if (m_info.protocolVersion >= 5)
auto r = (DisconnectReason)_r[0].toInt<int>(); for (auto const& i: m_capabilities)
if (!_r[0].isInt()) if (_capId == (uint16_t)i.first.second)
drop(BadProtocol); return i.second->m_enabled ? i.second->interpret(_t, _r) : true;
else if (m_info.protocolVersion <= 4)
{
reason = reasonOf(r);
clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
drop(DisconnectRequested);
}
break;
}
case PingPacket:
{
clog(NetTriviaSummary) << "Ping";
RLPStream s;
sealAndSend(prep(s, PongPacket));
break;
}
case PongPacket:
{
DEV_GUARDED(x_info)
m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
break;
}
case GetPeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "GetPeers";
m_theyRequestedNodes = true;
serviceNodesRequest();
break;
case PeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
break;
clog(NetTriviaSummary) << "Peers (" << dec << (_r.itemCount() - 1) << " entries)";
m_weRequestedNodes = false;
for (unsigned i = 0; i < _r.itemCount(); ++i)
{
bi::address peerAddress;
if (_r[i][0].size() == 16)
peerAddress = bi::address_v6(_r[i][0].toHash<FixedHash<16>>().asArray());
else if (_r[i][0].size() == 4)
peerAddress = bi::address_v4(_r[i][0].toHash<FixedHash<4>>().asArray());
else
{
cwarn << "Received bad peer packet:" << _r;
disconnect(BadProtocol);
return true;
}
auto ep = bi::tcp::endpoint(peerAddress, _r[i][1].toInt<short>());
NodeId id = _r[i][2].toHash<NodeId>();
clog(NetAllDetail) << "Checking: " << ep << "(" << id << ")";
if (!isPublicAddress(peerAddress))
goto CONTINUE; // Private address. Ignore.
if (!id)
goto LAMEPEER; // Null identity. Ignore.
if (m_server->id() == id)
goto LAMEPEER; // Just our info - we already have that.
if (id == this->id())
goto LAMEPEER; // Just their info - we already have that.
if (!ep.port())
goto LAMEPEER; // Zero port? Don't think so.
if (ep.port() >= /*49152*/32768)
goto LAMEPEER; // Private port according to IANA.
// OK passed all our checks. Assume it's good.
addRating(1000);
m_server->addNode(id, NodeIPEndpoint(ep.address(), ep.port(), ep.port()));
clog(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")";
CONTINUE:;
LAMEPEER:;
}
break;
default:
for (auto const& i: m_capabilities) for (auto const& i: m_capabilities)
if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount()) if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
{ return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;
if (i.second->m_enabled) return false;
return i.second->interpret(_t - i.second->m_idOffset, _r);
else
return true;
}
return false;
}
} }
catch (std::exception const& _e) catch (std::exception const& _e)
{ {
@ -275,6 +185,47 @@ bool Session::interpret(PacketType _t, RLP const& _r)
return true; return true;
} }
bool Session::interpret(PacketType _t, RLP const& _r)
{
switch (_t)
{
case DisconnectPacket:
{
string reason = "Unspecified";
auto r = (DisconnectReason)_r[0].toInt<int>();
if (!_r[0].isInt())
drop(BadProtocol);
else
{
reason = reasonOf(r);
clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
drop(DisconnectRequested);
}
break;
}
case PingPacket:
{
clog(NetTriviaSummary) << "Ping";
RLPStream s;
sealAndSend(prep(s, PongPacket));
break;
}
case PongPacket:
DEV_GUARDED(x_info)
{
m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
}
break;
case GetPeersPacket:
case PeersPacket:
break;
default:
return false;
}
return true;
}
void Session::ping() void Session::ping()
{ {
RLPStream s; RLPStream s;
@ -296,12 +247,9 @@ void Session::sealAndSend(RLPStream& _s)
bool Session::checkPacket(bytesConstRef _msg) bool Session::checkPacket(bytesConstRef _msg)
{ {
if (_msg.size() < 2) if (_msg[0] > 0x7f || _msg.size() < 2)
return false; return false;
if (_msg[0] > 0x7f) if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size())
return false;
RLP r(_msg.cropped(1));
if (r.actualSize() + 1 != _msg.size())
return false; return false;
return true; return true;
} }
@ -419,82 +367,78 @@ void Session::doRead()
{ {
ThreadContext tc(info().id.abridged()); ThreadContext tc(info().id.abridged());
ThreadContext tc2(info().clientVersion); ThreadContext tc2(info().clientVersion);
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) if (!checkRead(h256::size, ec, length))
return;
else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
{ {
clog(NetWarn) << "Error reading: " << ec.message(); clog(NetWarn) << "header decrypt failed";
drop(TCPError); drop(BadProtocol); // todo: better error
return;
} }
else if (ec && length == 0)
RLPXFrameInfo header;
try
{
header = RLPXFrameInfo(bytesConstRef(m_data.data(), length));
}
catch (std::exception const& _e)
{
clog(NetWarn) << "Exception decoding frame header RLP:" << bytesConstRef(m_data.data(), h128::size).cropped(3);
drop(BadProtocol);
return; return;
else }
/// read padded frame and mac
auto tlen = header.length + header.padding + h128::size;
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, header, tlen](boost::system::error_code ec, std::size_t length)
{ {
/// authenticate and decrypt header ThreadContext tc(info().id.abridged());
bytesRef header(m_data.data(), h256::size); ThreadContext tc2(info().clientVersion);
if (!m_io->authAndDecryptHeader(header)) if (!checkRead(tlen, ec, length))
return;
else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
{ {
clog(NetWarn) << "header decrypt failed"; clog(NetWarn) << "frame decrypt failed";
drop(BadProtocol); // todo: better error drop(BadProtocol); // todo: better error
return; return;
} }
/// check frame size bytesConstRef frame(m_data.data(), header.length);
uint32_t frameSize = (m_data[0] * 256 + m_data[1]) * 256 + m_data[2]; if (!checkPacket(frame))
if (frameSize >= (uint32_t)1 << 24)
{ {
clog(NetWarn) << "frame size too large"; cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
drop(BadProtocol); clog(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return; return;
} }
else
/// rlp of header has protocol-type, sequence-id[, total-packet-size]
bytes headerRLP(13);
bytesConstRef(m_data.data(), h128::size).cropped(3).copyTo(&headerRLP);
/// read padded frame and mac
auto tlen = frameSize + ((16 - (frameSize % 16)) % 16) + h128::size;
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, headerRLP, frameSize, tlen](boost::system::error_code ec, std::size_t length)
{ {
ThreadContext tc(info().id.abridged()); auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
ThreadContext tc2(info().clientVersion); RLP r(frame.cropped(1));
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) if (!readPacket(header.protocolId, packetType, r))
{ clog(NetWarn) << "Couldn't interpret packet." << RLP(r);
clog(NetWarn) << "Error reading: " << ec.message(); }
drop(TCPError); doRead();
} });
else if (ec && length < tlen)
{
clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << ec.message();
repMan().noteRude(*this);
drop(TCPError);
return;
}
else
{
if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
{
clog(NetWarn) << "frame decrypt failed";
drop(BadProtocol); // todo: better error
return;
}
bytesConstRef frame(m_data.data(), frameSize);
if (!checkPacket(frame))
{
cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
clog(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return;
}
else
{
auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
RLP r(frame.cropped(1));
if (!interpret(packetType, r))
clog(NetWarn) << "Couldn't interpret packet." << RLP(r);
}
doRead();
}
});
}
}); });
} }
bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length)
{
if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof)
{
clog(NetConnect) << "Error reading: " << _ec.message();
drop(TCPError);
return false;
}
else if (_ec && _length < _expected)
{
clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << _ec.message();
repMan().noteRude(*this);
drop(TCPError);
return false;
}
// If this fails then there's an unhandled asio error
assert(_expected == _length);
return true;
}

8
libp2p/Session.h

@ -97,10 +97,16 @@ private:
/// Perform a read on the socket. /// Perform a read on the socket.
void doRead(); void doRead();
/// Check error code after reading and drop peer if error code.
bool checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length);
/// Perform a single round of the write operation. This could end up calling itself asynchronously. /// Perform a single round of the write operation. This could end up calling itself asynchronously.
void write(); void write();
/// Interpret an incoming message. /// Deliver RLPX packet to Session or Capability for interpretation.
bool readPacket(uint16_t _capId, PacketType _t, RLP const& _r);
/// Interpret an incoming Session packet.
bool interpret(PacketType _t, RLP const& _r); bool interpret(PacketType _t, RLP const& _r);
/// @returns true iff the _msg forms a valid message for sending or receiving on the network. /// @returns true iff the _msg forms a valid message for sending or receiving on the network.

10
test/libp2p/net.cpp

@ -314,23 +314,23 @@ BOOST_AUTO_TEST_CASE(kademlia)
node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for node.nodeTable->discover(); // ideally, joining with empty node table logs warning we can check for
node.setup(); node.setup();
node.populate(); node.populate();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populateAll(); node.populateAll();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
auto nodes = node.nodeTable->nodes(); auto nodes = node.nodeTable->nodes();
nodes.sort(); nodes.sort();
node.nodeTable->reset(); node.nodeTable->reset();
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.populate(1); node.populate(1);
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
node.nodeTable->discover(); node.nodeTable->discover();
this_thread::sleep_for(chrono::milliseconds(2000)); this_thread::sleep_for(chrono::milliseconds(2000));
clog << "NodeTable:\n" << *node.nodeTable.get() << endl; // clog << "NodeTable:\n" << *node.nodeTable.get() << endl;
BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8); BOOST_REQUIRE_EQUAL(node.nodeTable->count(), 8);

Loading…
Cancel
Save