From 10cfa35c09b99f5375c4c30955f5b89f8f9ddc44 Mon Sep 17 00:00:00 2001 From: arkpar Date: Mon, 25 May 2015 13:17:42 +0200 Subject: [PATCH 1/2] blockchain sync refactoring --- libethcore/Common.cpp | 2 +- libethereum/BlockChain.cpp | 2 +- libethereum/CanonBlockChain.h | 1 - libethereum/Client.cpp | 29 +- libethereum/CommonNet.h | 4 +- libethereum/DownloadMan.cpp | 49 +++ libethereum/DownloadMan.h | 123 ++++++++ libethereum/EthereumHost.cpp | 561 +++++++++++++++++++++++++++------- libethereum/EthereumHost.h | 30 +- libethereum/EthereumPeer.cpp | 462 ++++++++-------------------- libethereum/EthereumPeer.h | 18 +- libp2p/Capability.h | 1 + libp2p/Host.h | 1 + libp2p/HostCapability.cpp | 7 +- libp2p/HostCapability.h | 1 + libp2p/Session.h | 2 + 16 files changed, 819 insertions(+), 474 deletions(-) diff --git a/libethcore/Common.cpp b/libethcore/Common.cpp index 56db647f3..8eca84873 100644 --- a/libethcore/Common.cpp +++ b/libethcore/Common.cpp @@ -35,7 +35,7 @@ namespace dev namespace eth { -const unsigned c_protocolVersion = 60; +const unsigned c_protocolVersion = 61; #if ETH_FATDB const unsigned c_minorProtocolVersion = 3; const unsigned c_databaseBaseVersion = 9; diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index e23fde6b6..67e42d7c8 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -317,7 +317,7 @@ tuple BlockChain::sync(BlockQueue& _bq, OverlayDB const& _st { try { - // Nonce & uncle nonces already verified thread at this point. + // Nonce & uncle nonces already verified in verification thread at this point. ImportRoute r; DEV_TIMED_ABOVE(Block import, 500) r = import(block.first, block.second, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles); diff --git a/libethereum/CanonBlockChain.h b/libethereum/CanonBlockChain.h index df4ac2d88..d1d47cd14 100644 --- a/libethereum/CanonBlockChain.h +++ b/libethereum/CanonBlockChain.h @@ -34,7 +34,6 @@ #include #include "BlockDetails.h" #include "Account.h" -#include "BlockQueue.h" #include "BlockChain.h" namespace ldb = leveldb; diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index e372e611a..3ca9a3172 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -164,28 +164,8 @@ const char* ClientDetail::name() { return EthTeal "⧫" EthCoal " ●"; } #endif Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId): - Worker("eth", 0), - m_vc(_dbPath), - m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }), - m_gp(new TrivialGasPricer), - m_stateDB(State::openDB(_dbPath, max(m_vc.action(), _forceAction))), - m_preMine(m_stateDB, BaseState::CanonGenesis), - m_postMine(m_stateDB) + Client(_extNet, make_shared(), _dbPath, _forceAction, _networkId) { - m_lastGetWork = std::chrono::system_clock::now() - chrono::seconds(30); - m_tqReady = m_tq.onReady([=](){ this->onTransactionQueueReady(); }); // TODO: should read m_tq->onReady(thisThread, syncTransactionQueue); - m_bqReady = m_bq.onReady([=](){ this->onBlockQueueReady(); }); // TODO: should read m_bq->onReady(thisThread, syncBlockQueue); - m_farm.onSolutionFound([=](ProofOfWork::Solution const& s){ return this->submitWork(s); }); - - m_gp->update(m_bc); - - m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId)); - - if (_dbPath.size()) - Defaults::setDBPath(_dbPath); - m_vc.setOk(); - doWork(); - startWorking(); } @@ -195,7 +175,7 @@ Client::Client(p2p::Host* _extNet, std::shared_ptr _gp, std::string c m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }), m_gp(_gp), m_stateDB(State::openDB(_dbPath, max(m_vc.action(), _forceAction))), - m_preMine(m_stateDB), + m_preMine(m_stateDB, BaseState::CanonGenesis), m_postMine(m_stateDB) { m_lastGetWork = std::chrono::system_clock::now() - chrono::seconds(30); @@ -205,7 +185,10 @@ Client::Client(p2p::Host* _extNet, std::shared_ptr _gp, std::string c m_gp->update(m_bc); - m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId)); + + auto host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId)); + m_host = host; + _extNet->addCapability(host, EthereumHost::staticName(), EthereumHost::staticVersion() - 1); if (_dbPath.size()) Defaults::setDBPath(_dbPath); diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index 0b1469f19..8b810bd10 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -38,12 +38,12 @@ namespace eth #if ETH_DEBUG static const unsigned c_maxHashes = 2048; ///< Maximum number of hashes BlockHashes will ever send. -static const unsigned c_maxHashesAsk = 2048; ///< Maximum number of hashes GetBlockHashes will ever ask for. +static const unsigned c_maxHashesAsk = 2048; ///< Maximum number of hashes GetBlockHashes will ever ask for. static const unsigned c_maxBlocks = 128; ///< Maximum number of blocks Blocks will ever send. static const unsigned c_maxBlocksAsk = 128; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain). #else static const unsigned c_maxHashes = 2048; ///< Maximum number of hashes BlockHashes will ever send. -static const unsigned c_maxHashesAsk = 2048; ///< Maximum number of hashes GetBlockHashes will ever ask for. +static const unsigned c_maxHashesAsk = 2048; ///< Maximum number of hashes GetBlockHashes will ever ask for. static const unsigned c_maxBlocks = 128; ///< Maximum number of blocks Blocks will ever send. static const unsigned c_maxBlocksAsk = 128; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain). #endif diff --git a/libethereum/DownloadMan.cpp b/libethereum/DownloadMan.cpp index 05d0a533e..838e29078 100644 --- a/libethereum/DownloadMan.cpp +++ b/libethereum/DownloadMan.cpp @@ -75,3 +75,52 @@ bool DownloadSub::noteBlock(h256 _hash) m_remaining.erase(_hash); return ret; } + +HashDownloadSub::HashDownloadSub(HashDownloadMan& _man): m_man(&_man) +{ + WriteGuard l(m_man->x_subs); + m_asked = RangeMask(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount); + m_attempted = RangeMask(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount); + m_man->m_subs.insert(this); +} + +HashDownloadSub::~HashDownloadSub() +{ + if (m_man) + { + WriteGuard l(m_man->x_subs); + m_man->m_subs.erase(this); + } +} + +void HashDownloadSub::resetFetch() +{ + Guard l(m_fetch); + m_remaining = 0; + m_asked = RangeMask(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount); + m_attempted = RangeMask(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount); +} + +unsigned HashDownloadSub::nextFetch(unsigned _n) +{ + Guard l(m_fetch); + + m_asked = RangeMask(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount); + + if (!m_man || m_man->chainEmpty()) + return 0; + + m_asked = (~(m_man->taken() + m_attempted)).lowest(_n); + if (m_asked.empty()) + m_asked = (~(m_man->taken(true) + m_attempted)).lowest(_n); + m_attempted += m_asked; + return *m_asked.begin(); +} + +void HashDownloadSub::noteHash(unsigned _index, unsigned _size) +{ + Guard l(m_fetch); + if (m_man) + for(unsigned i = _index; i < _index + _size; ++i) + m_man->m_got += i; +} diff --git a/libethereum/DownloadMan.h b/libethereum/DownloadMan.h index 2b41e660b..4ff83847c 100644 --- a/libethereum/DownloadMan.h +++ b/libethereum/DownloadMan.h @@ -88,6 +88,13 @@ public: i->m_man = nullptr; } + void appendToChain(h256s const& _hashes) + { + WriteGuard l(m_lock); + m_chain.insert(m_chain.end(), _hashes.cbegin(), _hashes.cend()); + m_blocksGot = RangeMask(0, m_chain.size()); + } + void resetToChain(h256s const& _chain) { { @@ -158,6 +165,122 @@ private: std::unordered_set m_subs; }; + +class HashDownloadMan; + +class HashDownloadSub +{ + friend class HashDownloadMan; + +public: + HashDownloadSub(HashDownloadMan& _man); + ~HashDownloadSub(); + + /// Finished last fetch - grab the next hash index to download + unsigned nextFetch(unsigned _n); + + /// Note that we've received a particular hash range. + void noteHash(unsigned _index, unsigned count); + + /// Nothing doing here. + void doneFetch() { resetFetch(); } + + bool askedContains(unsigned _i) const { Guard l(m_fetch); return m_asked.contains(_i); } + RangeMask const& asked() const { return m_asked; } + RangeMask const& attemped() const { return m_attempted; } + +private: + void resetFetch(); // Called by DownloadMan when we need to reset the download. + + HashDownloadMan* m_man = nullptr; + mutable Mutex m_fetch; + unsigned m_remaining; + RangeMask m_asked; + RangeMask m_attempted; +}; + +class HashDownloadMan +{ + friend class HashDownloadSub; + +public: + ~HashDownloadMan() + { + for (auto i: m_subs) + i->m_man = nullptr; + } + + void resetToRange(unsigned _start, unsigned _count) + { + { + ReadGuard l(x_subs); + for (auto i: m_subs) + i->resetFetch(); + } + WriteGuard l(m_lock); + m_chainStart = _start; + m_chainCount = _count; + m_got += RangeMask(_start, _start + _count); + { + ReadGuard l(x_subs); + for (auto i: m_subs) + i->resetFetch(); + } + } + + void reset(unsigned _start) + { + WriteGuard l(m_lock); + m_chainStart = _start; + m_chainCount = 0; + m_got = RangeMask(_start, _start); + + { + ReadGuard l(x_subs); + for (auto i: m_subs) + i->resetFetch(); + } + } + + RangeMask taken(bool _desperate = false) const + { + ReadGuard l(m_lock); + auto ret = m_got; + if (!_desperate) + { + ReadGuard l(x_subs); + for (auto i: m_subs) + ret += i->m_asked; + } + return ret; + } + + bool isComplete() const + { + ReadGuard l(m_lock); + return m_got.full(); + }\ + + size_t chainSize() const { ReadGuard l(m_lock); return m_chainCount; } + size_t chainEmpty() const { ReadGuard l(m_lock); return m_chainCount == 0; } + void foreachSub(std::function const& _f) const { ReadGuard l(x_subs); for(auto i: m_subs) _f(*i); } + unsigned subCount() const { ReadGuard l(x_subs); return m_subs.size(); } + RangeMask hashesGot() const { ReadGuard l(m_lock); return m_got; } + +private: + mutable SharedMutex m_lock; + unsigned m_chainStart = 0; + unsigned m_chainCount = 0; + RangeMask m_got; + + mutable SharedMutex x_subs; + std::unordered_set m_subs; +}; + + + + + } } diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index d62d6716f..af1caf3d4 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -37,6 +37,8 @@ using namespace dev; using namespace dev::eth; using namespace p2p; +const unsigned c_prevProtocolVersion = 60; + EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): HostCapability(), Worker ("ethsync"), @@ -46,12 +48,12 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu m_networkId (_networkId) { m_latestBlockSent = _ch.currentHash(); + m_hashMan.reset(m_chain.number() + 1); } EthereumHost::~EthereumHost() { - for (auto i: peerSessions()) - i.first->cap().get()->abortSync(); + forEachPeer([](EthereumPeer* _p) { _p->abortSync(); }); } bool EthereumHost::ensureInitialised() @@ -71,86 +73,22 @@ bool EthereumHost::ensureInitialised() void EthereumHost::noteNeedsSyncing(EthereumPeer* _who) { - // if already downloading hash-chain, ignore. - if (isSyncing()) - { - clog(NetAllDetail) << "Sync in progress: Just set to help out."; - if (m_syncer->m_asking == Asking::Blocks) - _who->transition(Asking::Blocks); - } - else - // otherwise check to see if we should be downloading... - _who->attemptSync(); -} - -void EthereumHost::changeSyncer(EthereumPeer* _syncer, bool _needHelp) -{ - if (_syncer) - clog(NetAllDetail) << "Changing syncer to" << _syncer->session()->socketId(); - else - clog(NetAllDetail) << "Clearing syncer."; - - m_syncer = _syncer; - if (isSyncing()) - { - if (_needHelp && _syncer->m_asking == Asking::Blocks) - for (auto j: peerSessions()) - { - clog(NetNote) << "Getting help with downloading blocks"; - auto e = j.first->cap().get(); - if (e != _syncer && e->m_asking == Asking::Nothing) - e->transition(Asking::Blocks); - } - } - else - { - // start grabbing next hash chain if there is one. - for (auto j: peerSessions()) - { - j.first->cap()->attemptSync(); - if (isSyncing()) - return; - } - clog(NetNote) << "No more peers to sync with."; - } -} - -void EthereumHost::noteDoneBlocks(EthereumPeer* _who, bool _clemency) -{ - if (m_man.isComplete()) - { - // Done our chain-get. - clog(NetNote) << "Chain download complete."; - // 1/100th for each useful block hash. - _who->addRating(m_man.chainSize() / 100); - m_man.reset(); - } - else if (_who->isSyncing()) - { - if (_clemency) - clog(NetNote) << "Chain download failed. Aborted while incomplete."; - else - { - // Done our chain-get. - clog(NetWarn) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; - clog(NetWarn) << m_man.remaining(); - clog(NetWarn) << "WOULD BAN."; -// m_banned.insert(_who->session()->id()); // We know who you are! -// _who->disable("Peer sent hashes but was unable to provide the blocks."); - } - m_man.reset(); - } + if (_who->m_asking == Asking::Nothing) + continueSync(_who); } void EthereumHost::reset() { - if (m_syncer) - m_syncer->abortSync(); - + forEachPeer([](EthereumPeer* _p) { _p->abortSync(); }); m_man.resetToChain(h256s()); - + m_hashMan.reset(m_chain.number() + 1); + m_needSyncBlocks = true; + m_needSyncHashes = true; + m_syncingLatestHash = h256(); + m_syncingTotalDifficulty = 0; m_latestBlockSent = h256(); m_transactionsSent.clear(); + m_v60Hashes.clear(); } void EthereumHost::doWork() @@ -172,9 +110,7 @@ void EthereumHost::doWork() } } - for (auto p: peerSessions()) - if (shared_ptr const& ep = p.first->cap()) - ep->tick(); + forEachPeer([](EthereumPeer* _p) { _p->tick(); }); // return netChange; // TODO: Figure out what to do with netChange. @@ -194,43 +130,60 @@ void EthereumHost::maintainTransactions() } for (auto const& t: ts) m_transactionsSent.insert(t.first); - for (auto p: peerSessions()) - if (auto ep = p.first->cap()) + forEachPeer([&](shared_ptr _p) + { + bytes b; + unsigned n = 0; + for (auto const& h: peerTransactions[_p]) { - bytes b; - unsigned n = 0; - for (auto const& h: peerTransactions[ep]) - { - ep->m_knownTransactions.insert(h); - b += ts[h].rlp(); - ++n; - } + _p->m_knownTransactions.insert(h); + b += ts[h].rlp(); + ++n; + } - ep->clearKnownTransactions(); + _p->clearKnownTransactions(); - if (n || ep->m_requireTransactions) - { - RLPStream ts; - ep->prep(ts, TransactionsPacket, n).appendRaw(b, n); - ep->sealAndSend(ts); - } - ep->m_requireTransactions = false; + if (n || _p->m_requireTransactions) + { + RLPStream ts; + _p->prep(ts, TransactionsPacket, n).appendRaw(b, n); + _p->sealAndSend(ts); } + _p->m_requireTransactions = false; + }); +} + +void EthereumHost::forEachPeer(std::function const& _f) +{ + forEachPeer([&](std::shared_ptr _p) + { + if (_p) + _f(_p.get()); + }); +} + +void EthereumHost::forEachPeer(std::function)> const& _f) +{ + for (auto s: peerSessions()) + _f(s.first->cap()); + for (auto s: peerSessions(protocolVersion() - 1)) //TODO: + _f(s.first->cap(protocolVersion() - 1)); + } pair>, vector>> EthereumHost::randomSelection(unsigned _percent, std::function const& _allow) { pair>, vector>> ret; - ret.second.reserve(peerSessions().size()); - for (auto const& j: peerSessions()) + vector> peers; + forEachPeer([&](shared_ptr _p) { - auto pp = j.first->cap(); - if (_allow(pp.get())) - ret.second.push_back(pp); - } + if (_p && _allow(_p.get())) + ret.second.push_back(_p); + }); - ret.second.reserve((peerSessions().size() * _percent + 99) / 100); - for (unsigned i = (peerSessions().size() * _percent + 99) / 100; i-- && ret.second.size();) + size_t size = (ret.second.size() * _percent + 99) / 100; + ret.second.reserve(size); + for (unsigned i = size; i-- && ret.second.size();) { unsigned n = rand() % ret.second.size(); ret.first.push_back(std::move(ret.second[n])); @@ -279,3 +232,403 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash) m_latestBlockSent = _currentHash; } } + +void EthereumHost::onPeerState(EthereumPeer* _peer) +{ + if (!_peer->enabled()) + { + clog(NetNote) << "Ignoring status from disabled peer"; + return; + } + if (_peer->m_genesisHash != m_chain.genesisHash()) + _peer->disable("Invalid genesis hash"); + else if (_peer->m_protocolVersion != protocolVersion())// && _peer->m_protocolVersion != c_prevProtocolVersion) + _peer->disable("Invalid protocol version."); + else if (_peer->m_networkId != networkId()) + _peer->disable("Invalid network identifier."); + else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos) + _peer->disable("Blacklisted client version."); + else if (isBanned(_peer->session()->id())) + _peer->disable("Peer banned for previous bad behaviour."); + else + { + + _peer->m_expectedHashes = 500000; //TODO: + if (m_hashMan.chainSize() < _peer->m_expectedHashes) + m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes); + continueSync(_peer); + } +} + +void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) +{ + unsigned knowns = 0; + unsigned unknowns = 0; + h256s neededBlocks; + for (unsigned i = 0; i < _hashes.size(); ++i) + { + _peer->addRating(1); + auto h = _hashes[i]; + auto status = m_bq.blockStatus(h); + if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h)) + { + clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download..."; + m_v60Hashes += neededBlocks; + onPeerDoneHashes(_peer, false); + return; + } + else if (status == QueueStatus::Bad) + { + cwarn << "block hash bad!" << h << ". Bailing..."; + _peer->setIdle(); + return; + } + else if (status == QueueStatus::Unknown) + { + unknowns++; + neededBlocks.push_back(h); + } + else + knowns++; + m_syncingLatestHash = h; + } + m_v60Hashes += neededBlocks; + clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash; + if (_complete) + { + m_needSyncBlocks = true; + continueSync(_peer); + } + else if (m_hashes.size() > _peer->m_expectedHashes) + { + _peer->disable("Too many hashes"); + m_hashes.clear(); + m_syncingLatestHash = h256(); + continueSync(); ///Try with some other peer, keep the chain + } + else + continueSync(_peer); /// Grab next hashes +} + +void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s const& _hashes) +{ + unsigned knowns = 0; + unsigned unknowns = 0; + h256s neededBlocks; + for (unsigned i = 0; i < _hashes.size(); ++i) + { + _peer->addRating(1); + auto h = _hashes[i]; + auto status = m_bq.blockStatus(h); + if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h)) + { + clog(NetWarn) << "block hash alrady known:" << h; + } + else if (status == QueueStatus::Bad) + { + cwarn << "block hash bad!" << h << ". Bailing..."; + _peer->setIdle(); + return; + } + else if (status == QueueStatus::Unknown) + { + unknowns++; + neededBlocks.push_back(h); + } + else + knowns++; + m_syncingLatestHash = h; + } + m_man.appendToChain(neededBlocks); + clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash; + + if (m_hashMan.isComplete()) + { + // Done our chain-get. + m_needSyncHashes = false; + clog(NetNote) << "Hashes download complete."; + // 1/100th for each useful block hash. + _peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers? + m_hashMan.reset(m_chain.number() + 1); + continueSync(); + } + else + continueSync(_peer); +} + +void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _new) +{ + m_needSyncHashes = false; + if (_peer->m_protocolVersion == protocolVersion() || _new) + { + continueSync(_peer); + } + else + { + m_man.resetToChain(m_v60Hashes); + continueSync(); + } +} + +void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) +{ + if (!_peer->enabled()) + { + clog(NetNote) << "Ignoring blocks from disabled peer"; + return; + } + unsigned itemCount = _r.itemCount(); + clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); + + if (itemCount == 0) + { + // Got to this peer's latest block - just give up. + _peer->setIdle(); + return; + } + + unsigned success = 0; + unsigned future = 0; + unsigned unknown = 0; + unsigned got = 0; + unsigned repeated = 0; + + for (unsigned i = 0; i < itemCount; ++i) + { + auto h = BlockInfo::headerHash(_r[i].data()); + if (_peer->m_sub.noteBlock(h)) + { + _peer->addRating(10); + switch (m_bq.import(_r[i].data(), m_chain)) + { + case ImportResult::Success: + success++; + break; + + case ImportResult::Malformed: + case ImportResult::BadChain: + _peer->disable("Malformed block received."); + return; + + case ImportResult::FutureTime: + future++; + break; + + case ImportResult::AlreadyInChain: + case ImportResult::AlreadyKnown: + got++; + break; + + case ImportResult::UnknownParent: + unknown++; + break; + + default:; + } + } + else + { + _peer->addRating(0); // -1? + repeated++; + } + } + + clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received."; + + if (m_man.isComplete() && !m_needSyncHashes) + { + // Done our chain-get. + m_needSyncBlocks = false; + clog(NetNote) << "Chain download complete."; + // 1/100th for each useful block hash. + _peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers? + m_man.reset(); + } + continueSync(_peer); +} + +void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) +{ + Guard l(x_sync); + if (_peer->m_asking != Asking::Nothing) + { + clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading."; + return; + } + clog(NetNote) << "New block hash discovered: syncing without help."; + onPeerHashes(_peer, _hashes, true); +} + +void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) +{ + Guard l(x_sync); + if (_peer->m_asking != Asking::Nothing) + { + clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading."; + return; + } + auto h = BlockInfo::headerHash(_r[0].data()); + clog(NetMessageSummary) << "NewBlock: " << h; + + if (_r.itemCount() != 2) + _peer->disable("NewBlock without 2 data fields."); + else + { + bool sync = false; + switch (m_bq.import(_r[0].data(), m_chain)) + { + case ImportResult::Success: + _peer->addRating(100); + break; + case ImportResult::FutureTime: + //TODO: Rating dependent on how far in future it is. + break; + + case ImportResult::Malformed: + case ImportResult::BadChain: + _peer->disable("Malformed block received."); + return; + + case ImportResult::AlreadyInChain: + case ImportResult::AlreadyKnown: + break; + + case ImportResult::UnknownParent: + if (h) + { + u256 difficulty = _r[1].toInt(); + if (m_syncingTotalDifficulty < difficulty) + { + clog(NetMessageSummary) << "Received block with no known parent. Resyncing..."; + _peer->m_latestHash = h; + _peer->m_totalDifficulty = difficulty; + m_needSyncHashes = true; + m_needSyncBlocks = true; + m_syncingLatestHash = _peer->m_latestHash; + sync = true; + } + } + break; + default:; + } + + DEV_GUARDED(_peer->x_knownBlocks) + _peer->m_knownBlocks.insert(h); + + if (sync) + continueSync(_peer); + } +} + +void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r) +{ + unsigned itemCount = _r.itemCount(); + clog(NetAllDetail) << "Transactions (" << dec << itemCount << "entries)"; + Guard l(_peer->x_knownTransactions); + for (unsigned i = 0; i < itemCount; ++i) + { + auto h = sha3(_r[i].data()); + _peer->m_knownTransactions.insert(h); + ImportResult ir = m_tq.import(_r[i].data()); + switch (ir) + { + case ImportResult::Malformed: + _peer->addRating(-100); + break; + case ImportResult::AlreadyKnown: + // if we already had the transaction, then don't bother sending it on. + m_transactionsSent.insert(h); + _peer->addRating(0); + break; + case ImportResult::Success: + _peer->addRating(100); + break; + default:; + } + } +} + +void EthereumHost::continueSync() +{ + forEachPeer([&](EthereumPeer* _p) + { + clog(NetNote) << "Getting help with downloading hashes and blocks"; + if (_p->m_asking == Asking::Nothing) + continueSync(_p); + }); +} + +void EthereumHost::continueSync(EthereumPeer* _peer) +{ + bool otherPeerSync = false; + bool thisPeerSync = false; + if (m_needSyncHashes && peerShouldGrabChain(_peer)) + { + forEachPeer([&](EthereumPeer* _p) + { + if (_p->m_asking == Asking::Hashes && _p->m_protocolVersion != protocolVersion()) + { + // Already have a peer downloading hash chain with old protocol, do nothing + if (_p == _peer) + thisPeerSync = true; + else + otherPeerSync = true; + + } + }); + if (otherPeerSync) + { + _peer->setIdle(); + return; + } + if (_peer->m_protocolVersion == protocolVersion()) + _peer->requestHashes(); + else + { + // Restart/continue sync in single peer mode + if (!m_syncingLatestHash) + { + m_syncingLatestHash =_peer->m_latestHash; + m_syncingTotalDifficulty = _peer->m_totalDifficulty; + } + _peer->requestHashes(m_syncingLatestHash); + } + } + else if (m_needSyncBlocks && peerShouldGrabBlocks(_peer)) // Check if this peer can help with downloading blocks + _peer->requestBlocks(); + else + _peer->setIdle(); +} + +bool EthereumHost::peerShouldGrabBlocks(EthereumPeer* _peer) const +{ + auto td = _peer->m_totalDifficulty; + auto lh = m_syncingLatestHash; + auto ctd = m_chain.details().totalDifficulty; + + clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd; + + if (td < ctd || (td == ctd && m_chain.currentHash() == lh)) + return false; + + return true; +} + +bool EthereumHost::peerShouldGrabChain(EthereumPeer* _peer) const +{ + h256 c = m_chain.currentHash(); + unsigned n = m_chain.number(); + u256 td = m_chain.details().totalDifficulty; + + clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty; + if (td >= _peer->m_totalDifficulty) + { + clog(NetAllDetail) << "No. Our chain is better."; + return false; + } + else + { + clog(NetAllDetail) << "Yes. Their chain is better."; + return true; + } +} diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 95c7f147a..900e515b8 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -57,7 +57,6 @@ class BlockQueue; class EthereumHost: public p2p::HostCapability, Worker { friend class EthereumPeer; - public: /// Start server, but don't listen. EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId); @@ -72,22 +71,23 @@ public: void reset(); DownloadMan const& downloadMan() const { return m_man; } - bool isSyncing() const { return !!m_syncer; } + bool isSyncing() const { return m_needSyncBlocks || m_needSyncHashes; } bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); } void noteNewTransactions() { m_newTransactions = true; } void noteNewBlocks() { m_newBlocks = true; } + void onPeerState(EthereumPeer* _peer); + private: std::pair>, std::vector>> randomSelection(unsigned _percent = 25, std::function const& _allow = [](EthereumPeer const*){ return true; }); + void forEachPeer(std::function)> const& _f); + void forEachPeer(std::function const& _f); /// Session is tell us that we may need (re-)syncing with the peer. void noteNeedsSyncing(EthereumPeer* _who); - /// Called when the peer can no longer provide us with any needed blocks. - void noteDoneBlocks(EthereumPeer* _who, bool _clemency); - /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. void doWork(); @@ -109,6 +109,14 @@ private: virtual void onStopping() { stopWorking(); } void changeSyncer(EthereumPeer* _ignore, bool _needHelp = true); + void continueSync(); + void continueSync(EthereumPeer* _peer); + void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); + void onPeerDoneHashes(EthereumPeer* _peer, bool _new); + void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); + void onPeerHashes(EthereumPeer* _peer, unsigned _index, h256s const& _hashes); + bool peerShouldGrabBlocks(EthereumPeer* _peer) const; + bool peerShouldGrabChain(EthereumPeer* _peer) const; BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. @@ -116,9 +124,10 @@ private: u256 m_networkId; - EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr + std::weak_ptr m_hashSyncer; DownloadMan m_man; + HashDownloadMan m_hashMan; h256 m_latestBlockSent; h256Hash m_transactionsSent; @@ -127,6 +136,15 @@ private: bool m_newTransactions = false; bool m_newBlocks = false; + + unsigned m_maxKnownNumber = 0; + u256 m_maxKnownDifficulty; + bool m_needSyncHashes = true; + bool m_needSyncBlocks = true; + h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. + u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. + h256s m_v60Hashes; + }; } diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index b3846757c..1813bb956 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -36,9 +36,10 @@ using namespace p2p; EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i): Capability(_s, _h, _i), - m_sub(host()->m_man) + m_sub(host()->m_man), + m_hashSub(host()->m_hashMan) { - transition(Asking::State); + requestState(); } EthereumPeer::~EthereumPeer() @@ -50,7 +51,7 @@ EthereumPeer::~EthereumPeer() void EthereumPeer::abortSync() { if (isSyncing()) - transition(Asking::Nothing, true); + setIdle(); } EthereumHost* EthereumPeer::host() const @@ -74,141 +75,87 @@ string toString(Asking _a) return "?"; } -void EthereumPeer::transition(Asking _a, bool _force, bool _needHelp) -{ - clog(NetMessageSummary) << "Transition!" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : ""); - - if (m_asking == Asking::State && _a != Asking::State) - m_requireTransactions = true; - RLPStream s; - - if (_a == Asking::State) - { - if (m_asking == Asking::Nothing) - { - setAsking(Asking::State, false); - prep(s, StatusPacket, 5) - << host()->protocolVersion() - << host()->networkId() - << host()->m_chain.details().totalDifficulty - << host()->m_chain.currentHash() - << host()->m_chain.genesisHash(); - sealAndSend(s); - return; - } - } - else if (_a == Asking::Hashes) +void EthereumPeer::setIdle() +{ + if (m_asking == Asking::Blocks) { - if (m_asking == Asking::State || m_asking == Asking::Nothing) - { - if (isSyncing()) - clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; - - m_syncingLatestHash = m_latestHash; - m_syncingTotalDifficulty = m_totalDifficulty; - resetNeedsSyncing(); + clog(NetNote) << "Finishing blocks fetch..."; + // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. + m_sub.doneFetch(); + m_hashSub.doneFetch(); - setAsking(_a, true); - prep(s, GetBlockHashesPacket, 2) << m_syncingLatestHash << c_maxHashesAsk; - m_syncingNeededBlocks = h256s(1, m_syncingLatestHash); - sealAndSend(s); - return; - } - else if (m_asking == Asking::Hashes) - { - if (!isSyncing()) - clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; - - setAsking(_a, true); - prep(s, GetBlockHashesPacket, 2) << m_syncingLastReceivedHash << c_maxHashesAsk; - sealAndSend(s); - return; - } + setAsking(Asking::Nothing); } - else if (_a == Asking::Blocks) + else if (m_asking == Asking::Hashes) { - if (m_asking == Asking::Hashes) - { - if (!isSyncing()) - clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!"; - if (shouldGrabBlocks()) - { - clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash << ", was" << host()->m_latestBlockSent << "]"; + clog(NetNote) << "Finishing hashes fetch..."; - host()->m_man.resetToChain(m_syncingNeededBlocks); -// host()->m_latestBlockSent = m_syncingLatestHash; - } - else - { - clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring."; - m_syncingLatestHash = h256(); - setAsking(Asking::Nothing, false); - return; - } - } - // run through into... - if (m_asking == Asking::Nothing || m_asking == Asking::Hashes || m_asking == Asking::Blocks) - { - // Looks like it's the best yet for total difficulty. Set to download. - setAsking(Asking::Blocks, isSyncing(), _needHelp); // will kick off other peers to help if available. - auto blocks = m_sub.nextFetch(c_maxBlocksAsk); - if (blocks.size()) - { - prep(s, GetBlocksPacket, blocks.size()); - for (auto const& i: blocks) - s << i; - sealAndSend(s); - } - else - transition(Asking::Nothing); - return; - } + setAsking(Asking::Nothing); } - else if (_a == Asking::Nothing) + else if (m_asking == Asking::State) { - if (m_asking == Asking::Blocks) - { - clog(NetNote) << "Finishing blocks fetch..."; + setAsking(Asking::Nothing); + } +} - // a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry. - if (isSyncing()) - host()->noteDoneBlocks(this, _force); +void EthereumPeer::requestState() +{ + if (m_asking != Asking::Nothing) + clog(NetWarn) << "Bad state: requesting state should be the first action"; + setAsking(Asking::State); + RLPStream s; + prep(s, StatusPacket, 5) + << host()->protocolVersion() - 1 + << host()->networkId() + << host()->m_chain.details().totalDifficulty + << host()->m_chain.currentHash() + << host()->m_chain.genesisHash(); + sealAndSend(s); +} - // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. - m_sub.doneFetch(); +void EthereumPeer::requestHashes() +{ + assert(m_asking != Asking::Blocks); + m_syncHashNumber = m_hashSub.nextFetch(c_maxBlocksAsk); + setAsking(Asking::Hashes); + RLPStream s; + prep(s, GetBlockHashesPacket, 2) << m_syncHashNumber << c_maxHashesAsk; + sealAndSend(s); +} - setAsking(Asking::Nothing, false); - } - else if (m_asking == Asking::Hashes) - { - clog(NetNote) << "Finishing hashes fetch..."; +void EthereumPeer::requestHashes(h256 const& _lastHash) +{ + assert(m_asking != Asking::Blocks); + setAsking(Asking::Hashes); + RLPStream s; + prep(s, GetBlockHashesPacket, 2) << _lastHash << c_maxHashesAsk; + sealAndSend(s); +} - setAsking(Asking::Nothing, false); - } - else if (m_asking == Asking::State) - { - setAsking(Asking::Nothing, false); - // Just got the state - should check to see if we can be of help downloading the chain if any. - // Otherwise, should put ourselves up for sync. - setNeedsSyncing(m_latestHash, m_totalDifficulty); - } - // Otherwise it's fine. We don't care if it's Nothing->Nothing. - return; +void EthereumPeer::requestBlocks() +{ + // Looks like it's the best yet for total difficulty. Set to download. + setAsking(Asking::Blocks); // will kick off other peers to help if available. + auto blocks = m_sub.nextFetch(c_maxBlocksAsk); + if (blocks.size()) + { + RLPStream s; + prep(s, GetBlocksPacket, blocks.size()); + for (auto const& i: blocks) + s << i; + sealAndSend(s); } - - clog(NetWarn) << "Invalid state transition:" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : ""); + else + setIdle(); + return; } -void EthereumPeer::setAsking(Asking _a, bool _isSyncing, bool _needHelp) +void EthereumPeer::setAsking(Asking _a) { - bool changedAsking = (m_asking != _a); m_asking = _a; - if (_isSyncing != (host()->m_syncer == this) || (_isSyncing && changedAsking)) - host()->changeSyncer(_isSyncing ? this : nullptr, _needHelp); - - if (!_isSyncing) + if (!isSyncing()) { m_syncingLatestHash = h256(); m_syncingTotalDifficulty = 0; @@ -241,57 +188,7 @@ void EthereumPeer::tick() bool EthereumPeer::isSyncing() const { - return host()->m_syncer == this; -} - -bool EthereumPeer::shouldGrabBlocks() const -{ - auto td = m_syncingTotalDifficulty; - auto lh = m_syncingLatestHash; - auto ctd = host()->m_chain.details().totalDifficulty; - - if (m_syncingNeededBlocks.empty()) - return false; - - clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back(); - - if (td < ctd || (td == ctd && host()->m_chain.currentHash() == lh)) - return false; - - return true; -} - -void EthereumPeer::attemptSync() -{ - if (m_asking != Asking::Nothing) - { - clog(NetAllDetail) << "Can't synced with this peer - outstanding asks."; - return; - } - - // if already done this, then ignore. - if (!needsSyncing()) - { - clog(NetAllDetail) << "Already synced with this peer."; - return; - } - - h256 c = host()->m_chain.currentHash(); - unsigned n = host()->m_chain.number(); - u256 td = host()->m_chain.details().totalDifficulty; - - clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << m_totalDifficulty; - if (td >= m_totalDifficulty) - { - clog(NetAllDetail) << "No. Our chain is better."; - resetNeedsSyncing(); - transition(Asking::Nothing); - } - else - { - clog(NetAllDetail) << "Yes. Their chain is better."; - transition(Asking::Hashes); - } + return m_asking != Asking::Nothing; } bool EthereumPeer::interpret(unsigned _id, RLP const& _r) @@ -303,27 +200,16 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) case StatusPacket: { m_protocolVersion = _r[0].toInt(); + if (!!session()->cap(EthereumHost::staticVersion())) + m_protocolVersion = host()->protocolVersion(); m_networkId = _r[1].toInt(); // a bit dirty as we're misusing these to communicate the values to transition, but harmless. m_totalDifficulty = _r[2].toInt(); m_latestHash = _r[3].toHash(); - auto genesisHash = _r[4].toHash(); - - clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash; - - if (genesisHash != host()->m_chain.genesisHash()) - disable("Invalid genesis hash"); - else if (m_protocolVersion != host()->protocolVersion()) - disable("Invalid protocol version."); - else if (m_networkId != host()->networkId()) - disable("Invalid network identifier."); - else if (session()->info().clientVersion.find("/v0.7.0/") != string::npos) - disable("Blacklisted client version."); - else if (host()->isBanned(session()->id())) - disable("Peer banned for previous bad behaviour."); - else - transition(Asking::Nothing); + m_genesisHash = _r[4].toHash(); + clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash; + host()->onPeerState(this); break; } case TransactionsPacket: @@ -356,19 +242,45 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } case GetBlockHashesPacket: { - h256 later = _r[0].toHash(); - unsigned limit = _r[1].toInt(); - clog(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later << ")"; - - unsigned c = min(host()->m_chain.number(later), limit); - - RLPStream s; - prep(s, BlockHashesPacket, c); - h256 p = host()->m_chain.details(later).parent; - for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent) - s << p; - sealAndSend(s); - addRating(0); + if (m_protocolVersion == host()->protocolVersion()) + { + u256 number256 = _r[0].toInt(); + unsigned number = (unsigned) number256; + unsigned limit = _r[1].toInt(); + clog(NetMessageSummary) << "GetBlockHashes (" << number << "-" << number + limit << ")"; + RLPStream s; + if (number <= host()->m_chain.number()) + { + unsigned c = min(host()->m_chain.number() - number + 1, limit); + prep(s, BlockHashesPacket, c); + for (unsigned n = number; n < number + c; n++) + { + h256 p = host()->m_chain.numberHash(n); + s << p; + } + } + else + prep(s, BlockHashesPacket, 0); + sealAndSend(s); + addRating(0); + } + else + { + // Support V60 protocol + h256 later = _r[0].toHash(); + unsigned limit = _r[1].toInt(); + clog(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later << ")"; + + unsigned c = min(host()->m_chain.number(later), limit); + + RLPStream s; + prep(s, BlockHashesPacket, c); + h256 p = host()->m_chain.details(later).parent; + for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent) + s << p; + sealAndSend(s); + addRating(0); + } break; } case BlockHashesPacket: @@ -383,40 +295,24 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } if (itemCount == 0) { - transition(Asking::Blocks); + host()->onPeerDoneHashes(this, false); return true; } - unsigned knowns = 0; - unsigned unknowns = 0; + h256s hashes(itemCount); for (unsigned i = 0; i < itemCount; ++i) { - addRating(1); - auto h = _r[i].toHash(); - auto status = host()->m_bq.blockStatus(h); - if (status == QueueStatus::Importing || status == QueueStatus::Ready || host()->m_chain.isKnown(h)) - { - clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download..."; - transition(Asking::Blocks); - return true; - } - else if (status == QueueStatus::Bad) - { - cwarn << "block hash bad!" << h << ". Bailing..."; - transition(Asking::Nothing); - return true; - } - else if (status == QueueStatus::Unknown) - { - unknowns++; - m_syncingNeededBlocks.push_back(h); - } - else - knowns++; - m_syncingLastReceivedHash = h; + hashes[i] = _r[i].toHash(); + m_hashSub.noteHash(m_syncHashNumber + i, 1); } - clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLastReceivedHash; - // run through - ask for more. - transition(Asking::Hashes); + + if (m_protocolVersion == host()->protocolVersion()) + { + //v61, report hashes ordered by number + host()->onPeerHashes(this, m_syncHashNumber, hashes); + } + else + host()->onPeerHashes(this, hashes); + m_syncHashNumber += itemCount; break; } case GetBlocksPacket: @@ -455,74 +351,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } case BlocksPacket: { - unsigned itemCount = _r.itemCount(); - clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); - - if (m_asking != Asking::Blocks) - clog(NetWarn) << "Unexpected Blocks received!"; - - if (itemCount == 0) - { - // Got to this peer's latest block - just give up. - transition(Asking::Nothing); - break; - } - - unsigned success = 0; - unsigned future = 0; - unsigned unknown = 0; - unsigned got = 0; - unsigned repeated = 0; - - for (unsigned i = 0; i < itemCount; ++i) - { - auto h = BlockInfo::headerHash(_r[i].data()); - if (m_sub.noteBlock(h)) - { - addRating(10); - switch (host()->m_bq.import(_r[i].data(), host()->m_chain)) - { - case ImportResult::Success: - success++; - break; - - case ImportResult::Malformed: - case ImportResult::BadChain: - disable("Malformed block received."); - return true; - - case ImportResult::FutureTime: - future++; - break; - - case ImportResult::AlreadyInChain: - case ImportResult::AlreadyKnown: - got++; - break; - - case ImportResult::UnknownParent: - unknown++; - break; - - default:; - } - } - else - { - addRating(0); // -1? - repeated++; - } - } - - clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received."; - - if (m_asking == Asking::Blocks) - { - if (!got) - transition(Asking::Blocks); - else - transition(Asking::Nothing); - } + host()->onPeerBlocks(this, _r); break; } case NewBlockPacket: @@ -571,39 +400,16 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) clog(NetMessageSummary) << "Ignoring since we're already downloading."; else { - unsigned knowns = 0; - unsigned unknowns = 0; unsigned itemCount = _r.itemCount(); + clog(NetMessageSummary) << "BlockHashes (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreHashes"); + + h256s hashes(itemCount); for (unsigned i = 0; i < itemCount; ++i) - { - addRating(1); - auto h = _r[i].toHash(); - DEV_GUARDED(x_knownBlocks) - m_knownBlocks.insert(h); - auto status = host()->m_bq.blockStatus(h); - if (status == QueueStatus::Importing || status == QueueStatus::Ready || host()->m_chain.isKnown(h)) - knowns++; - else if (status == QueueStatus::Bad) - { - cwarn << "block hash bad!" << h << ". Bailing..."; - return true; - } - else if (status == QueueStatus::Unknown) - { - unknowns++; - m_syncingNeededBlocks.push_back(h); - } - else - knowns++; - } - clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns"; - if (unknowns > 0) - { - clog(NetNote) << "Not syncing and new block hash discovered: syncing without help."; - host()->m_man.resetToChain(m_syncingNeededBlocks); - host()->changeSyncer(this, false); - transition(Asking::Blocks, false, false); // TODO: transaction(Asking::NewBlocks, false) - } + hashes[i] = _r[i].toHash(); + + clog(NetNote) << "Not syncing and new block hash discovered: syncing without help."; + host()->onPeerHashes(this, hashes); + host()->onPeerDoneHashes(this, true); return true; } break; diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index ae2289102..2dd17d8c3 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -70,18 +70,18 @@ public: /// What is the ethereum subprotocol host object. EthereumHost* host() const; + void setIdle(); + void requestState(); + void requestHashes(); + void requestHashes(h256 const& _lastHash); + void requestBlocks(); + private: using p2p::Capability::sealAndSend; /// Interpret an incoming message. virtual bool interpret(unsigned _id, RLP const& _r); - /// Transition state in a particular direction. - void transition(Asking _wantState, bool _force = false, bool _needHelp = true); - - /// Attempt to begin syncing with this peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks. - void attemptSync(); - /// Abort the sync operation. void abortSync(); @@ -89,7 +89,7 @@ private: void clearKnownTransactions() { std::lock_guard l(x_knownTransactions); m_knownTransactions.clear(); } /// Update our asking state. - void setAsking(Asking _g, bool _isSyncing, bool _needHelp = true); + void setAsking(Asking _g); /// Update our syncing requirements state. void setNeedsSyncing(h256 _latestHash, u256 _td); @@ -123,6 +123,7 @@ private: /// These are determined through either a Status message or from NewBlock. h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. + h256 m_genesisHash; ///< Peer's genesis hash /// Once a sync is started on this peer, they are cleared and moved into m_syncing*. /// This is built as we ask for hashes. Once no more hashes are given, we present this to the @@ -131,9 +132,12 @@ private: h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer. h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. + unsigned m_expectedHashes = 0; ///< Estimated Upper bound of hashes to expect from this peer. + unsigned m_syncHashNumber = 0; /// Once we're asking for blocks, this becomes in use. DownloadSub m_sub; + HashDownloadSub m_hashSub; /// Have we received a GetTransactions packet that we haven't yet answered? bool m_requireTransactions = false; diff --git a/libp2p/Capability.h b/libp2p/Capability.h index d09391655..536357b51 100644 --- a/libp2p/Capability.h +++ b/libp2p/Capability.h @@ -44,6 +44,7 @@ public: */ Session* session() const { return m_session; } HostCapabilityFace* hostCapability() const { return m_host; } + bool enabled() { return m_enabled; } protected: virtual bool interpret(unsigned _id, RLP const&) = 0; diff --git a/libp2p/Host.h b/libp2p/Host.h index 78dc50727..3c7ce257a 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -99,6 +99,7 @@ public: /// Register a peer-capability; all new peer connections will have this capability. template std::shared_ptr registerCapability(T* _t) { _t->m_host = this; std::shared_ptr ret(_t); m_capabilities[std::make_pair(T::staticName(), T::staticVersion())] = ret; return ret; } + template void addCapability(std::shared_ptr const & _p, std::string const& _name, u256 const& _version) { m_capabilities[std::make_pair(_name, _version)] = _p; } bool haveCapability(CapDesc const& _name) const { return m_capabilities.count(_name) != 0; } CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } diff --git a/libp2p/HostCapability.cpp b/libp2p/HostCapability.cpp index b2acdcd1b..9502d6b86 100644 --- a/libp2p/HostCapability.cpp +++ b/libp2p/HostCapability.cpp @@ -28,12 +28,17 @@ using namespace dev; using namespace dev::p2p; std::vector,std::shared_ptr>> HostCapabilityFace::peerSessions() const +{ + return peerSessions(version()); +} + +std::vector,std::shared_ptr>> HostCapabilityFace::peerSessions(u256 const& _version) const { RecursiveGuard l(m_host->x_sessions); std::vector,std::shared_ptr>> ret; for (auto const& i: m_host->m_sessions) if (std::shared_ptr s = i.second.lock()) - if (s->m_capabilities.count(capDesc())) + if (s->m_capabilities.count(std::make_pair(name(), _version))) ret.push_back(make_pair(s,s->m_peer)); return ret; } diff --git a/libp2p/HostCapability.h b/libp2p/HostCapability.h index 93086b1c9..a9ce1585b 100644 --- a/libp2p/HostCapability.h +++ b/libp2p/HostCapability.h @@ -46,6 +46,7 @@ public: Host* host() const { return m_host; } std::vector,std::shared_ptr>> peerSessions() const; + std::vector,std::shared_ptr>> peerSessions(u256 const& _version) const; protected: virtual std::string name() const = 0; diff --git a/libp2p/Session.h b/libp2p/Session.h index be8422c82..bcbf8022b 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -69,6 +69,8 @@ public: template std::shared_ptr cap() const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } + template + std::shared_ptr cap(u256 const& _version) const { try { return std::static_pointer_cast(m_capabilities.at(std::make_pair(PeerCap::name(), _version))); } catch (...) { return nullptr; } } static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0); void sealAndSend(RLPStream& _s); From 597f56843b2d9e24b34de9876f06733d07cd4e06 Mon Sep 17 00:00:00 2001 From: arkpar Date: Wed, 27 May 2015 16:22:14 +0200 Subject: [PATCH 2/2] GetBlockHashesByNumber packet --- libethereum/Client.cpp | 3 +- libethereum/CommonNet.h | 1 + libethereum/DownloadMan.cpp | 3 +- libethereum/DownloadMan.h | 12 +- libethereum/EthereumHost.cpp | 148 ++++++++++++-------- libethereum/EthereumHost.h | 51 +++---- libethereum/EthereumPeer.cpp | 264 +++++++++++------------------------ libethereum/EthereumPeer.h | 40 +++--- libp2p/Capability.h | 1 - libp2p/Host.cpp | 6 +- libp2p/HostCapability.h | 4 +- libwhisper/WhisperPeer.cpp | 2 +- libwhisper/WhisperPeer.h | 3 +- test/libp2p/capability.cpp | 2 +- 14 files changed, 230 insertions(+), 310 deletions(-) diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index 3ca9a3172..e7d7a543b 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -185,10 +185,9 @@ Client::Client(p2p::Host* _extNet, std::shared_ptr _gp, std::string c m_gp->update(m_bc); - auto host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId)); m_host = host; - _extNet->addCapability(host, EthereumHost::staticName(), EthereumHost::staticVersion() - 1); + _extNet->addCapability(host, EthereumHost::staticName(), EthereumHost::c_oldProtocolVersion); //TODO: remove this one v61+ protocol is common if (_dbPath.size()) Defaults::setDBPath(_dbPath); diff --git a/libethereum/CommonNet.h b/libethereum/CommonNet.h index 8b810bd10..a2f4a2e7c 100644 --- a/libethereum/CommonNet.h +++ b/libethereum/CommonNet.h @@ -63,6 +63,7 @@ enum GetBlocksPacket, BlocksPacket, NewBlockPacket, + GetBlockHashesByNumberPacket, PacketCount }; diff --git a/libethereum/DownloadMan.cpp b/libethereum/DownloadMan.cpp index 838e29078..3e33f3eb5 100644 --- a/libethereum/DownloadMan.cpp +++ b/libethereum/DownloadMan.cpp @@ -122,5 +122,6 @@ void HashDownloadSub::noteHash(unsigned _index, unsigned _size) Guard l(m_fetch); if (m_man) for(unsigned i = _index; i < _index + _size; ++i) - m_man->m_got += i; + if (i >= m_man->m_got.all().first && i < m_man->m_got.all().second) + m_man->m_got += i; } diff --git a/libethereum/DownloadMan.h b/libethereum/DownloadMan.h index 4ff83847c..3e1a071c9 100644 --- a/libethereum/DownloadMan.h +++ b/libethereum/DownloadMan.h @@ -234,12 +234,6 @@ public: m_chainStart = _start; m_chainCount = 0; m_got = RangeMask(_start, _start); - - { - ReadGuard l(x_subs); - for (auto i: m_subs) - i->resetFetch(); - } } RangeMask taken(bool _desperate = false) const @@ -259,7 +253,7 @@ public: { ReadGuard l(m_lock); return m_got.full(); - }\ + } size_t chainSize() const { ReadGuard l(m_lock); return m_chainCount; } size_t chainEmpty() const { ReadGuard l(m_lock); return m_chainCount == 0; } @@ -277,10 +271,6 @@ private: std::unordered_set m_subs; }; - - - - } } diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index af1caf3d4..cbf96a011 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include "BlockChain.h" #include "TransactionQueue.h" #include "BlockQueue.h" @@ -37,7 +38,7 @@ using namespace dev; using namespace dev::eth; using namespace p2p; -const unsigned c_prevProtocolVersion = 60; +unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): HostCapability(), @@ -71,12 +72,6 @@ bool EthereumHost::ensureInitialised() return false; } -void EthereumHost::noteNeedsSyncing(EthereumPeer* _who) -{ - if (_who->m_asking == Asking::Nothing) - continueSync(_who); -} - void EthereumHost::reset() { forEachPeer([](EthereumPeer* _p) { _p->abortSync(); }); @@ -88,7 +83,7 @@ void EthereumHost::reset() m_syncingTotalDifficulty = 0; m_latestBlockSent = h256(); m_transactionsSent.clear(); - m_v60Hashes.clear(); + m_hashes.clear(); } void EthereumHost::doWork() @@ -130,7 +125,7 @@ void EthereumHost::maintainTransactions() } for (auto const& t: ts) m_transactionsSent.insert(t.first); - forEachPeer([&](shared_ptr _p) + forEachPeerPtr([&](shared_ptr _p) { bytes b; unsigned n = 0; @@ -153,29 +148,27 @@ void EthereumHost::maintainTransactions() }); } -void EthereumHost::forEachPeer(std::function const& _f) +void EthereumHost::forEachPeer(std::function const& _f) const { - forEachPeer([&](std::shared_ptr _p) + forEachPeerPtr([&](std::shared_ptr _p) { if (_p) _f(_p.get()); }); } -void EthereumHost::forEachPeer(std::function)> const& _f) +void EthereumHost::forEachPeerPtr(std::function)> const& _f) const { for (auto s: peerSessions()) _f(s.first->cap()); - for (auto s: peerSessions(protocolVersion() - 1)) //TODO: - _f(s.first->cap(protocolVersion() - 1)); - + for (auto s: peerSessions(c_oldProtocolVersion)) //TODO: remove once v61+ is common + _f(s.first->cap(c_oldProtocolVersion)); } pair>, vector>> EthereumHost::randomSelection(unsigned _percent, std::function const& _allow) { pair>, vector>> ret; - vector> peers; - forEachPeer([&](shared_ptr _p) + forEachPeerPtr([&](shared_ptr _p) { if (_p && _allow(_p.get())) ret.second.push_back(_p); @@ -233,16 +226,12 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash) } } -void EthereumHost::onPeerState(EthereumPeer* _peer) +void EthereumHost::onPeerStatus(EthereumPeer* _peer) { - if (!_peer->enabled()) - { - clog(NetNote) << "Ignoring status from disabled peer"; - return; - } + Guard l(x_sync); if (_peer->m_genesisHash != m_chain.genesisHash()) _peer->disable("Invalid genesis hash"); - else if (_peer->m_protocolVersion != protocolVersion())// && _peer->m_protocolVersion != c_prevProtocolVersion) + else if (_peer->m_protocolVersion != protocolVersion() && _peer->m_protocolVersion != c_oldProtocolVersion) _peer->disable("Invalid protocol version."); else if (_peer->m_networkId != networkId()) _peer->disable("Invalid network identifier."); @@ -252,16 +241,44 @@ void EthereumHost::onPeerState(EthereumPeer* _peer) _peer->disable("Peer banned for previous bad behaviour."); else { - - _peer->m_expectedHashes = 500000; //TODO: + if (_peer->m_protocolVersion != protocolVersion()) + estimatePeerHashes(_peer); + else if (_peer->m_latestBlockNumber > m_chain.number()) + _peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number(); if (m_hashMan.chainSize() < _peer->m_expectedHashes) m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes); continueSync(_peer); } } +void EthereumHost::estimatePeerHashes(EthereumPeer* _peer) +{ + BlockInfo block = m_chain.info(); + time_t lastBlockTime = (block.hash() == m_chain.genesisHash()) ? 1428192000 : (time_t)block.timestamp; + time_t now = time(0); + unsigned blockCount = 1000; + if (lastBlockTime > now) + clog(NetWarn) << "Clock skew? Latest block is in the future"; + else + blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit; + clog(NetAllDetail) << "Estimated hashes: " << blockCount; + _peer->m_expectedHashes = blockCount; +} + void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) { + Guard l(x_sync); + assert(_peer->m_asking == Asking::Nothing); + onPeerHashes(_peer, _hashes, false); +} + +void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete) +{ + if (_hashes.empty()) + { + onPeerDoneHashes(_peer, true); + return; + } unsigned knowns = 0; unsigned unknowns = 0; h256s neededBlocks; @@ -273,8 +290,8 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h)) { clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download..."; - m_v60Hashes += neededBlocks; - onPeerDoneHashes(_peer, false); + m_hashes += neededBlocks; + onPeerDoneHashes(_peer, true); return; } else if (status == QueueStatus::Bad) @@ -292,7 +309,7 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) knowns++; m_syncingLatestHash = h; } - m_v60Hashes += neededBlocks; + m_hashes += neededBlocks; clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash; if (_complete) { @@ -312,6 +329,13 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s const& _hashes) { + Guard l(x_sync); + assert(_peer->m_asking == Asking::Nothing); + if (_hashes.empty()) + { + onPeerDoneHashes(_peer, true); + return; + } unsigned knowns = 0; unsigned unknowns = 0; h256s neededBlocks; @@ -322,11 +346,11 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s auto status = m_bq.blockStatus(h); if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h)) { - clog(NetWarn) << "block hash alrady known:" << h; + clog(NetWarn) << "block hash already known:" << h; } else if (status == QueueStatus::Bad) { - cwarn << "block hash bad!" << h << ". Bailing..."; + clog(NetWarn) << "block hash bad!" << h << ". Bailing..."; _peer->setIdle(); return; } @@ -337,10 +361,9 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s } else knowns++; - m_syncingLatestHash = h; } m_man.appendToChain(neededBlocks); - clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash; + clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns"; if (m_hashMan.isComplete()) { @@ -356,33 +379,30 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s continueSync(_peer); } -void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _new) +void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain) { + assert(_peer->m_asking == Asking::Nothing); m_needSyncHashes = false; - if (_peer->m_protocolVersion == protocolVersion() || _new) - { - continueSync(_peer); - } - else + if (_peer->m_protocolVersion != protocolVersion() || _localChain) { - m_man.resetToChain(m_v60Hashes); - continueSync(); + m_man.resetToChain(m_hashes); + m_hashes.clear(); } + continueSync(); } void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) { - if (!_peer->enabled()) - { - clog(NetNote) << "Ignoring blocks from disabled peer"; - return; - } + Guard l(x_sync); + assert(_peer->m_asking == Asking::Nothing); unsigned itemCount = _r.itemCount(); clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); if (itemCount == 0) { // Got to this peer's latest block - just give up. + clog(NetNote) << "Finishing blocks fetch..."; + // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. _peer->setIdle(); return; } @@ -550,9 +570,9 @@ void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r) void EthereumHost::continueSync() { + clog(NetAllDetail) << "Getting help with downloading hashes and blocks"; forEachPeer([&](EthereumPeer* _p) { - clog(NetNote) << "Getting help with downloading hashes and blocks"; if (_p->m_asking == Asking::Nothing) continueSync(_p); }); @@ -560,29 +580,23 @@ void EthereumHost::continueSync() void EthereumHost::continueSync(EthereumPeer* _peer) { + assert(_peer->m_asking == Asking::Nothing); bool otherPeerSync = false; - bool thisPeerSync = false; if (m_needSyncHashes && peerShouldGrabChain(_peer)) { forEachPeer([&](EthereumPeer* _p) { - if (_p->m_asking == Asking::Hashes && _p->m_protocolVersion != protocolVersion()) - { - // Already have a peer downloading hash chain with old protocol, do nothing - if (_p == _peer) - thisPeerSync = true; - else - otherPeerSync = true; - - } + if (_p != _peer && _p->m_asking == Asking::Hashes && _p->m_protocolVersion != protocolVersion()) + otherPeerSync = true; // Already have a peer downloading hash chain with old protocol, do nothing }); if (otherPeerSync) { + /// Downloading from other peer with v60 protocol, nothing ese we can do _peer->setIdle(); return; } - if (_peer->m_protocolVersion == protocolVersion()) - _peer->requestHashes(); + if (_peer->m_protocolVersion == protocolVersion() && !m_syncingLatestHash) + _peer->requestHashes(); /// v61+ and not catching up to a particular hash else { // Restart/continue sync in single peer mode @@ -606,11 +620,9 @@ bool EthereumHost::peerShouldGrabBlocks(EthereumPeer* _peer) const auto lh = m_syncingLatestHash; auto ctd = m_chain.details().totalDifficulty; - clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd; - + clog(NetAllDetail) << "Should grab blocks? " << td << "vs" << ctd; if (td < ctd || (td == ctd && m_chain.currentHash() == lh)) return false; - return true; } @@ -632,3 +644,15 @@ bool EthereumHost::peerShouldGrabChain(EthereumPeer* _peer) const return true; } } + +bool EthereumHost::isSyncing() const +{ + Guard l(x_sync); + bool syncing = false; + forEachPeer([&](EthereumPeer* _p) + { + if (_p->m_asking != Asking::Nothing) + syncing = true; + }); + return syncing; +} diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 900e515b8..497255034 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -56,7 +56,6 @@ class BlockQueue; */ class EthereumHost: public p2p::HostCapability, Worker { - friend class EthereumPeer; public: /// Start server, but don't listen. EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId); @@ -71,22 +70,30 @@ public: void reset(); DownloadMan const& downloadMan() const { return m_man; } - bool isSyncing() const { return m_needSyncBlocks || m_needSyncHashes; } + bool isSyncing() const; bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); } void noteNewTransactions() { m_newTransactions = true; } void noteNewBlocks() { m_newBlocks = true; } - void onPeerState(EthereumPeer* _peer); + void onPeerStatus(EthereumPeer* _peer); ///< Called by peer to report status + void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks during syn + void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks + void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has new hashes + void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has another sequential block of hashes during sync + void onPeerHashes(EthereumPeer* _peer, unsigned _index, h256s const& _hashes); ///< Called by peer once it has a new ordered block of hashes starting with a particular number + void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); ///< Called by peer when it has new transactions + DownloadMan& downloadMan() { return m_man; } + HashDownloadMan& hashDownloadMan() { return m_hashMan; } + BlockChain const& chain() { return m_chain; } + + static unsigned const c_oldProtocolVersion; private: std::pair>, std::vector>> randomSelection(unsigned _percent = 25, std::function const& _allow = [](EthereumPeer const*){ return true; }); - void forEachPeer(std::function)> const& _f); - void forEachPeer(std::function const& _f); - - /// Session is tell us that we may need (re-)syncing with the peer. - void noteNeedsSyncing(EthereumPeer* _who); + void forEachPeerPtr(std::function)> const& _f) const; + void forEachPeer(std::function const& _f) const; /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. void doWork(); @@ -108,15 +115,13 @@ private: virtual void onStarting() { startWorking(); } virtual void onStopping() { stopWorking(); } - void changeSyncer(EthereumPeer* _ignore, bool _needHelp = true); - void continueSync(); - void continueSync(EthereumPeer* _peer); - void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); - void onPeerDoneHashes(EthereumPeer* _peer, bool _new); - void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); - void onPeerHashes(EthereumPeer* _peer, unsigned _index, h256s const& _hashes); + void continueSync(); /// Find something to do for all peers + void continueSync(EthereumPeer* _peer); /// Find some work to do for a peer + void onPeerDoneHashes(EthereumPeer* _peer, bool _new); /// Called when done downloading hashes from peer + void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete); bool peerShouldGrabBlocks(EthereumPeer* _peer) const; bool peerShouldGrabChain(EthereumPeer* _peer) const; + void estimatePeerHashes(EthereumPeer* _peer); BlockChain const& m_chain; TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. @@ -124,8 +129,6 @@ private: u256 m_networkId; - std::weak_ptr m_hashSyncer; - DownloadMan m_man; HashDownloadMan m_hashMan; @@ -137,14 +140,12 @@ private: bool m_newTransactions = false; bool m_newBlocks = false; - unsigned m_maxKnownNumber = 0; - u256 m_maxKnownDifficulty; - bool m_needSyncHashes = true; - bool m_needSyncBlocks = true; - h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. - u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. - h256s m_v60Hashes; - + mutable Mutex x_sync; + bool m_needSyncHashes = true; ///< Indicates if need to downlad hashes + bool m_needSyncBlocks = true; ///< Indicates if we still need to download some blocks + h256 m_syncingLatestHash; ///< Latest block's hash, as of the current sync. + u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty, as of the current sync. + h256s m_hashes; ///< List of hashes with unknown block numbers. Used for v60 chain downloading and catching up to a particular unknown }; } diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 1813bb956..a1c22e449 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -34,12 +34,14 @@ using namespace dev; using namespace dev::eth; using namespace p2p; -EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i): +EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap): Capability(_s, _h, _i), - m_sub(host()->m_man), - m_hashSub(host()->m_hashMan) + m_sub(host()->downloadMan()), + m_hashSub(host()->hashDownloadMan()), + m_peerCapabilityVersion(_cap.second) { - requestState(); + m_syncHashNumber = host()->chain().number() + 1; + requestStatus(); } EthereumPeer::~EthereumPeer() @@ -78,55 +80,44 @@ string toString(Asking _a) void EthereumPeer::setIdle() { - if (m_asking == Asking::Blocks) - { - clog(NetNote) << "Finishing blocks fetch..."; - // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. - m_sub.doneFetch(); - m_hashSub.doneFetch(); - - setAsking(Asking::Nothing); - } - else if (m_asking == Asking::Hashes) - { - clog(NetNote) << "Finishing hashes fetch..."; - - setAsking(Asking::Nothing); - } - else if (m_asking == Asking::State) - { - setAsking(Asking::Nothing); - } + m_sub.doneFetch(); + m_hashSub.doneFetch(); + setAsking(Asking::Nothing); } -void EthereumPeer::requestState() +void EthereumPeer::requestStatus() { if (m_asking != Asking::Nothing) clog(NetWarn) << "Bad state: requesting state should be the first action"; setAsking(Asking::State); RLPStream s; - prep(s, StatusPacket, 5) - << host()->protocolVersion() - 1 + bool latest = m_peerCapabilityVersion == host()->protocolVersion(); + prep(s, StatusPacket, latest ? 6 : 5) + << (latest ? host()->protocolVersion() : EthereumHost::c_oldProtocolVersion) << host()->networkId() - << host()->m_chain.details().totalDifficulty - << host()->m_chain.currentHash() - << host()->m_chain.genesisHash(); + << host()->chain().details().totalDifficulty + << host()->chain().currentHash() + << host()->chain().genesisHash(); + if (latest) + s << u256(host()->chain().number()); sealAndSend(s); } void EthereumPeer::requestHashes() { - assert(m_asking != Asking::Blocks); - m_syncHashNumber = m_hashSub.nextFetch(c_maxBlocksAsk); + if (m_asking == Asking::Blocks) + return; + m_syncHashNumber = m_hashSub.nextFetch(c_maxHashesAsk); setAsking(Asking::Hashes); RLPStream s; - prep(s, GetBlockHashesPacket, 2) << m_syncHashNumber << c_maxHashesAsk; + prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << c_maxHashesAsk; sealAndSend(s); } void EthereumPeer::requestHashes(h256 const& _lastHash) { - assert(m_asking != Asking::Blocks); + if (m_asking == Asking::Blocks) + return; setAsking(Asking::Hashes); RLPStream s; prep(s, GetBlockHashesPacket, 2) << _lastHash << c_maxHashesAsk; @@ -135,8 +126,7 @@ void EthereumPeer::requestHashes(h256 const& _lastHash) void EthereumPeer::requestBlocks() { - // Looks like it's the best yet for total difficulty. Set to download. - setAsking(Asking::Blocks); // will kick off other peers to help if available. + setAsking(Asking::Blocks); auto blocks = m_sub.nextFetch(c_maxBlocksAsk); if (blocks.size()) { @@ -154,31 +144,12 @@ void EthereumPeer::requestBlocks() void EthereumPeer::setAsking(Asking _a) { m_asking = _a; - - if (!isSyncing()) - { - m_syncingLatestHash = h256(); - m_syncingTotalDifficulty = 0; - m_syncingNeededBlocks.clear(); - } - m_lastAsk = chrono::system_clock::now(); session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?"); session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : "")); } -void EthereumPeer::setNeedsSyncing(h256 _latestHash, u256 _td) -{ - m_latestHash = _latestHash; - m_totalDifficulty = _td; - - if (m_latestHash) - host()->noteNeedsSyncing(this); - - session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : "")); -} - void EthereumPeer::tick() { if (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing) @@ -200,87 +171,62 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) case StatusPacket: { m_protocolVersion = _r[0].toInt(); - if (!!session()->cap(EthereumHost::staticVersion())) - m_protocolVersion = host()->protocolVersion(); m_networkId = _r[1].toInt(); - - // a bit dirty as we're misusing these to communicate the values to transition, but harmless. m_totalDifficulty = _r[2].toInt(); m_latestHash = _r[3].toHash(); m_genesisHash = _r[4].toHash(); - clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash; - host()->onPeerState(this); + if (m_peerCapabilityVersion == host()->protocolVersion()) + { + m_protocolVersion = host()->protocolVersion(); + m_latestBlockNumber = _r[5].toInt(); + } + + clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash; + setAsking(Asking::Nothing); + host()->onPeerStatus(this); break; } case TransactionsPacket: { - unsigned itemCount = _r.itemCount(); - clog(NetAllDetail) << "Transactions (" << dec << itemCount << "entries)"; - Guard l(x_knownTransactions); - for (unsigned i = 0; i < itemCount; ++i) - { - auto h = sha3(_r[i].data()); - m_knownTransactions.insert(h); - ImportResult ir = host()->m_tq.import(_r[i].data()); - switch (ir) - { - case ImportResult::Malformed: - addRating(-100); - break; - case ImportResult::AlreadyKnown: - // if we already had the transaction, then don't bother sending it on. - host()->m_transactionsSent.insert(h); - addRating(0); - break; - case ImportResult::Success: - addRating(100); - break; - default:; - } - } + host()->onPeerTransactions(this, _r); break; } case GetBlockHashesPacket: { - if (m_protocolVersion == host()->protocolVersion()) + h256 later = _r[0].toHash(); + unsigned limit = _r[1].toInt(); + clog(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later << ")"; + unsigned c = min(host()->chain().number(later), limit); + RLPStream s; + prep(s, BlockHashesPacket, c); + h256 p = host()->chain().details(later).parent; + for (unsigned i = 0; i < c && p; ++i, p = host()->chain().details(p).parent) + s << p; + sealAndSend(s); + addRating(0); + break; + } + case GetBlockHashesByNumberPacket: + { + u256 number256 = _r[0].toInt(); + unsigned number = (unsigned) number256; + unsigned limit = _r[1].toInt(); + clog(NetMessageSummary) << "GetBlockHashesByNumber (" << number << "-" << number + limit << ")"; + RLPStream s; + if (number <= host()->chain().number()) { - u256 number256 = _r[0].toInt(); - unsigned number = (unsigned) number256; - unsigned limit = _r[1].toInt(); - clog(NetMessageSummary) << "GetBlockHashes (" << number << "-" << number + limit << ")"; - RLPStream s; - if (number <= host()->m_chain.number()) + unsigned c = min(host()->chain().number() - number + 1, limit); + prep(s, BlockHashesPacket, c); + for (unsigned n = number; n < number + c; n++) { - unsigned c = min(host()->m_chain.number() - number + 1, limit); - prep(s, BlockHashesPacket, c); - for (unsigned n = number; n < number + c; n++) - { - h256 p = host()->m_chain.numberHash(n); - s << p; - } + h256 p = host()->chain().numberHash(n); + s << p; } - else - prep(s, BlockHashesPacket, 0); - sealAndSend(s); - addRating(0); } else - { - // Support V60 protocol - h256 later = _r[0].toHash(); - unsigned limit = _r[1].toInt(); - clog(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later << ")"; - - unsigned c = min(host()->m_chain.number(later), limit); - - RLPStream s; - prep(s, BlockHashesPacket, c); - h256 p = host()->m_chain.details(later).parent; - for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent) - s << p; - sealAndSend(s); - addRating(0); - } + prep(s, BlockHashesPacket, 0); + sealAndSend(s); + addRating(0); break; } case BlockHashesPacket: @@ -290,14 +236,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) if (m_asking != Asking::Hashes) { - cwarn << "Peer giving us hashes when we didn't ask for them."; + clog(NetWarn) << "Peer giving us hashes when we didn't ask for them."; break; } - if (itemCount == 0) - { - host()->onPeerDoneHashes(this, false); - return true; - } + setAsking(Asking::Nothing); h256s hashes(itemCount); for (unsigned i = 0; i < itemCount; ++i) { @@ -306,10 +248,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } if (m_protocolVersion == host()->protocolVersion()) - { - //v61, report hashes ordered by number - host()->onPeerHashes(this, m_syncHashNumber, hashes); - } + host()->onPeerHashes(this, m_syncHashNumber, hashes); // V61+, report hashes by number else host()->onPeerHashes(this, hashes); m_syncHashNumber += itemCount; @@ -332,9 +271,9 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) for (unsigned i = 0; i < min(count, c_maxBlocks); ++i) { auto h = _r[i].toHash(); - if (host()->m_chain.isKnown(h)) + if (host()->chain().isKnown(h)) { - rlp += host()->m_chain.block(_r[i].toHash()); + rlp += host()->chain().block(_r[i].toHash()); ++n; } } @@ -351,67 +290,30 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } case BlocksPacket: { - host()->onPeerBlocks(this, _r); + if (m_asking != Asking::Blocks) + clog(NetWarn) << "Peer giving us blocks when we didn't ask for them."; + else + { + setAsking(Asking::Nothing); + host()->onPeerBlocks(this, _r); + } break; } case NewBlockPacket: { - auto h = BlockInfo::headerHash(_r[0].data()); - clog(NetMessageSummary) << "NewBlock: " << h; - - if (_r.itemCount() != 2) - disable("NewBlock without 2 data fields."); - else - { - switch (host()->m_bq.import(_r[0].data(), host()->m_chain)) - { - case ImportResult::Success: - addRating(100); - break; - case ImportResult::FutureTime: - //TODO: Rating dependent on how far in future it is. - break; - - case ImportResult::Malformed: - case ImportResult::BadChain: - disable("Malformed block received."); - return true; - - case ImportResult::AlreadyInChain: - case ImportResult::AlreadyKnown: - break; - - case ImportResult::UnknownParent: - clog(NetMessageSummary) << "Received block with no known parent. Resyncing..."; - setNeedsSyncing(h, _r[1].toInt()); - break; - default:; - } - - DEV_GUARDED(x_knownBlocks) - m_knownBlocks.insert(h); - } + host()->onPeerNewBlock(this, _r); break; } case NewBlockHashesPacket: { - clog(NetMessageSummary) << "NewBlockHashes"; - if (host()->isSyncing()) - clog(NetMessageSummary) << "Ignoring since we're already downloading."; - else - { - unsigned itemCount = _r.itemCount(); - clog(NetMessageSummary) << "BlockHashes (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreHashes"); + unsigned itemCount = _r.itemCount(); + clog(NetMessageSummary) << "BlockHashes (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreHashes"); - h256s hashes(itemCount); - for (unsigned i = 0; i < itemCount; ++i) - hashes[i] = _r[i].toHash(); + h256s hashes(itemCount); + for (unsigned i = 0; i < itemCount; ++i) + hashes[i] = _r[i].toHash(); - clog(NetNote) << "Not syncing and new block hash discovered: syncing without help."; - host()->onPeerHashes(this, hashes); - host()->onPeerDoneHashes(this, true); - return true; - } + host()->onPeerNewHashes(this, hashes); break; } default: diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index 2dd17d8c3..94dc60501 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -49,11 +49,11 @@ namespace eth */ class EthereumPeer: public p2p::Capability { - friend class EthereumHost; + friend class EthereumHost; //TODO: remove this public: /// Basic constructor. - EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h, unsigned _i); + EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h, unsigned _i, p2p::CapDesc const& _cap); /// Basic destructor. virtual ~EthereumPeer(); @@ -70,10 +70,16 @@ public: /// What is the ethereum subprotocol host object. EthereumHost* host() const; + /// Abort sync and reset fetch void setIdle(); - void requestState(); + + /// Request hashes. Uses hash download manager to get hash number. v61+ protocol version only void requestHashes(); + + /// Request hashes for given parent hash. void requestHashes(h256 const& _lastHash); + + /// Request blocks. Uses block download manager. void requestBlocks(); private: @@ -82,6 +88,9 @@ private: /// Interpret an incoming message. virtual bool interpret(unsigned _id, RLP const& _r); + /// Request status. Called from constructor + void requestStatus(); + /// Abort the sync operation. void abortSync(); @@ -91,24 +100,18 @@ private: /// Update our asking state. void setAsking(Asking _g); - /// Update our syncing requirements state. - void setNeedsSyncing(h256 _latestHash, u256 _td); - void resetNeedsSyncing() { setNeedsSyncing(h256(), 0); } - /// Do we presently need syncing with this peer? bool needsSyncing() const { return !!m_latestHash; } /// Are we presently syncing with this peer? bool isSyncing() const; - /// Check whether the session should bother grabbing the peer's blocks. - bool shouldGrabBlocks() const; - /// Runs period checks to check up on the peer. void tick(); /// Peer's protocol version. unsigned m_protocolVersion; + /// Peer's network id. u256 m_networkId; @@ -117,28 +120,24 @@ private: /// When we asked for it. Allows a time out. std::chrono::system_clock::time_point m_lastAsk; - /// Whether this peer is in the process of syncing or not. Only one peer can be syncing at once. - bool m_isSyncing = false; - /// These are determined through either a Status message or from NewBlock. h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. h256 m_genesisHash; ///< Peer's genesis hash - /// Once a sync is started on this peer, they are cleared and moved into m_syncing*. + u256 m_latestBlockNumber; ///< Number of the latest block this peer has /// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. - h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer. - h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer. - h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. - u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. - unsigned m_expectedHashes = 0; ///< Estimated Upper bound of hashes to expect from this peer. - unsigned m_syncHashNumber = 0; + unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer. + unsigned m_syncHashNumber = 0; ///< Number of latest hash we sync to /// Once we're asking for blocks, this becomes in use. DownloadSub m_sub; + + /// Once we're asking for hashes, this becomes in use. HashDownloadSub m_hashSub; + u256 m_peerCapabilityVersion; ///< Protocol version this peer supports received as capability /// Have we received a GetTransactions packet that we haven't yet answered? bool m_requireTransactions = false; @@ -146,7 +145,6 @@ private: h256Hash m_knownBlocks; ///< Blocks that the peer already knows about (that don't need to be sent to them). Mutex x_knownTransactions; h256Hash m_knownTransactions; ///< Transactions that the peer already knows of. - }; } diff --git a/libp2p/Capability.h b/libp2p/Capability.h index 536357b51..d09391655 100644 --- a/libp2p/Capability.h +++ b/libp2p/Capability.h @@ -44,7 +44,6 @@ public: */ Session* session() const { return m_session; } HostCapabilityFace* hostCapability() const { return m_host; } - bool enabled() { return m_enabled; } protected: virtual bool interpret(unsigned _id, RLP const&) = 0; diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index 998579a90..b6c9efec9 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -202,6 +202,10 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io // clang error (previously: ... << hex << caps ...) // "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments" stringstream capslog; + + if (caps.size() > 1) + caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r){ return any_of(caps.begin(), caps.end(), [&](CapDesc const& _o){ return _r.first == _o.first && _o.second > _r.second; }); }), caps.end()); + for (auto cap: caps) capslog << "(" << cap.first << "," << dec << cap.second << ")"; clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort; @@ -237,7 +241,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io for (auto const& i: caps) if (haveCapability(i)) { - ps->m_capabilities[i] = shared_ptr(m_capabilities[i]->newPeerCapability(ps.get(), o)); + ps->m_capabilities[i] = shared_ptr(m_capabilities[i]->newPeerCapability(ps.get(), o, i)); o += m_capabilities[i]->messageCount(); } ps->start(); diff --git a/libp2p/HostCapability.h b/libp2p/HostCapability.h index a9ce1585b..19b149085 100644 --- a/libp2p/HostCapability.h +++ b/libp2p/HostCapability.h @@ -53,7 +53,7 @@ protected: virtual u256 version() const = 0; CapDesc capDesc() const { return std::make_pair(name(), version()); } virtual unsigned messageCount() const = 0; - virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset) = 0; + virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset, CapDesc const& _cap) = 0; virtual void onStarting() {} virtual void onStopping() {} @@ -77,7 +77,7 @@ protected: virtual std::string name() const { return PeerCap::name(); } virtual u256 version() const { return PeerCap::version(); } virtual unsigned messageCount() const { return PeerCap::messageCount(); } - virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset) { return new PeerCap(_s, this, _idOffset); } + virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset, CapDesc const& _cap) { return new PeerCap(_s, this, _idOffset, _cap); } }; } diff --git a/libwhisper/WhisperPeer.cpp b/libwhisper/WhisperPeer.cpp index 7bcdfe8aa..0b75def28 100644 --- a/libwhisper/WhisperPeer.cpp +++ b/libwhisper/WhisperPeer.cpp @@ -29,7 +29,7 @@ using namespace dev; using namespace dev::p2p; using namespace dev::shh; -WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i): Capability(_s, _h, _i) +WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i) { RLPStream s; sealAndSend(prep(s, StatusPacket, 1) << version()); diff --git a/libwhisper/WhisperPeer.h b/libwhisper/WhisperPeer.h index ab9c8222a..9344da024 100644 --- a/libwhisper/WhisperPeer.h +++ b/libwhisper/WhisperPeer.h @@ -42,6 +42,7 @@ using p2p::Session; using p2p::HostCapabilityFace; using p2p::HostCapability; using p2p::Capability; +using p2p::CapDesc; /** */ @@ -50,7 +51,7 @@ class WhisperPeer: public Capability friend class WhisperHost; public: - WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i); + WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap); virtual ~WhisperPeer(); static std::string name() { return "shh"; } diff --git a/test/libp2p/capability.cpp b/test/libp2p/capability.cpp index 2c158f4d8..0a8542ee0 100644 --- a/test/libp2p/capability.cpp +++ b/test/libp2p/capability.cpp @@ -49,7 +49,7 @@ struct VerbosityHolder class TestCapability: public Capability { public: - TestCapability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset): Capability(_s, _h, _idOffset), m_cntReceivedMessages(0), m_testSum(0) {} + TestCapability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset, CapDesc const&): Capability(_s, _h, _idOffset), m_cntReceivedMessages(0), m_testSum(0) {} virtual ~TestCapability() {} int countReceivedMessages() { return m_cntReceivedMessages; } int testSum() { return m_testSum; }