Browse Source

blockchain sync refactoring

cl-refactor
arkpar 10 years ago
parent
commit
10cfa35c09
  1. 2
      libethcore/Common.cpp
  2. 2
      libethereum/BlockChain.cpp
  3. 1
      libethereum/CanonBlockChain.h
  4. 29
      libethereum/Client.cpp
  5. 4
      libethereum/CommonNet.h
  6. 49
      libethereum/DownloadMan.cpp
  7. 123
      libethereum/DownloadMan.h
  8. 561
      libethereum/EthereumHost.cpp
  9. 30
      libethereum/EthereumHost.h
  10. 462
      libethereum/EthereumPeer.cpp
  11. 18
      libethereum/EthereumPeer.h
  12. 1
      libp2p/Capability.h
  13. 1
      libp2p/Host.h
  14. 7
      libp2p/HostCapability.cpp
  15. 1
      libp2p/HostCapability.h
  16. 2
      libp2p/Session.h

2
libethcore/Common.cpp

@ -35,7 +35,7 @@ namespace dev
namespace eth namespace eth
{ {
const unsigned c_protocolVersion = 60; const unsigned c_protocolVersion = 61;
#if ETH_FATDB #if ETH_FATDB
const unsigned c_minorProtocolVersion = 3; const unsigned c_minorProtocolVersion = 3;
const unsigned c_databaseBaseVersion = 9; const unsigned c_databaseBaseVersion = 9;

2
libethereum/BlockChain.cpp

@ -317,7 +317,7 @@ tuple<h256s, h256s, bool> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _st
{ {
try try
{ {
// Nonce & uncle nonces already verified thread at this point. // Nonce & uncle nonces already verified in verification thread at this point.
ImportRoute r; ImportRoute r;
DEV_TIMED_ABOVE(Block import, 500) DEV_TIMED_ABOVE(Block import, 500)
r = import(block.first, block.second, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles); r = import(block.first, block.second, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles);

1
libethereum/CanonBlockChain.h

@ -34,7 +34,6 @@
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include "BlockDetails.h" #include "BlockDetails.h"
#include "Account.h" #include "Account.h"
#include "BlockQueue.h"
#include "BlockChain.h" #include "BlockChain.h"
namespace ldb = leveldb; namespace ldb = leveldb;

29
libethereum/Client.cpp

@ -164,28 +164,8 @@ const char* ClientDetail::name() { return EthTeal "⧫" EthCoal " ●"; }
#endif #endif
Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId): Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId):
Worker("eth", 0), Client(_extNet, make_shared<TrivialGasPricer>(), _dbPath, _forceAction, _networkId)
m_vc(_dbPath),
m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }),
m_gp(new TrivialGasPricer),
m_stateDB(State::openDB(_dbPath, max(m_vc.action(), _forceAction))),
m_preMine(m_stateDB, BaseState::CanonGenesis),
m_postMine(m_stateDB)
{ {
m_lastGetWork = std::chrono::system_clock::now() - chrono::seconds(30);
m_tqReady = m_tq.onReady([=](){ this->onTransactionQueueReady(); }); // TODO: should read m_tq->onReady(thisThread, syncTransactionQueue);
m_bqReady = m_bq.onReady([=](){ this->onBlockQueueReady(); }); // TODO: should read m_bq->onReady(thisThread, syncBlockQueue);
m_farm.onSolutionFound([=](ProofOfWork::Solution const& s){ return this->submitWork(s); });
m_gp->update(m_bc);
m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId));
if (_dbPath.size())
Defaults::setDBPath(_dbPath);
m_vc.setOk();
doWork();
startWorking(); startWorking();
} }
@ -195,7 +175,7 @@ Client::Client(p2p::Host* _extNet, std::shared_ptr<GasPricer> _gp, std::string c
m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }), m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }),
m_gp(_gp), m_gp(_gp),
m_stateDB(State::openDB(_dbPath, max(m_vc.action(), _forceAction))), m_stateDB(State::openDB(_dbPath, max(m_vc.action(), _forceAction))),
m_preMine(m_stateDB), m_preMine(m_stateDB, BaseState::CanonGenesis),
m_postMine(m_stateDB) m_postMine(m_stateDB)
{ {
m_lastGetWork = std::chrono::system_clock::now() - chrono::seconds(30); m_lastGetWork = std::chrono::system_clock::now() - chrono::seconds(30);
@ -205,7 +185,10 @@ Client::Client(p2p::Host* _extNet, std::shared_ptr<GasPricer> _gp, std::string c
m_gp->update(m_bc); m_gp->update(m_bc);
m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId));
auto host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId));
m_host = host;
_extNet->addCapability(host, EthereumHost::staticName(), EthereumHost::staticVersion() - 1);
if (_dbPath.size()) if (_dbPath.size())
Defaults::setDBPath(_dbPath); Defaults::setDBPath(_dbPath);

4
libethereum/CommonNet.h

@ -38,12 +38,12 @@ namespace eth
#if ETH_DEBUG #if ETH_DEBUG
static const unsigned c_maxHashes = 2048; ///< Maximum number of hashes BlockHashes will ever send. static const unsigned c_maxHashes = 2048; ///< Maximum number of hashes BlockHashes will ever send.
static const unsigned c_maxHashesAsk = 2048; ///< Maximum number of hashes GetBlockHashes will ever ask for. static const unsigned c_maxHashesAsk = 2048; ///< Maximum number of hashes GetBlockHashes will ever ask for.
static const unsigned c_maxBlocks = 128; ///< Maximum number of blocks Blocks will ever send. static const unsigned c_maxBlocks = 128; ///< Maximum number of blocks Blocks will ever send.
static const unsigned c_maxBlocksAsk = 128; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain). static const unsigned c_maxBlocksAsk = 128; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
#else #else
static const unsigned c_maxHashes = 2048; ///< Maximum number of hashes BlockHashes will ever send. static const unsigned c_maxHashes = 2048; ///< Maximum number of hashes BlockHashes will ever send.
static const unsigned c_maxHashesAsk = 2048; ///< Maximum number of hashes GetBlockHashes will ever ask for. static const unsigned c_maxHashesAsk = 2048; ///< Maximum number of hashes GetBlockHashes will ever ask for.
static const unsigned c_maxBlocks = 128; ///< Maximum number of blocks Blocks will ever send. static const unsigned c_maxBlocks = 128; ///< Maximum number of blocks Blocks will ever send.
static const unsigned c_maxBlocksAsk = 128; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain). static const unsigned c_maxBlocksAsk = 128; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
#endif #endif

49
libethereum/DownloadMan.cpp

@ -75,3 +75,52 @@ bool DownloadSub::noteBlock(h256 _hash)
m_remaining.erase(_hash); m_remaining.erase(_hash);
return ret; return ret;
} }
HashDownloadSub::HashDownloadSub(HashDownloadMan& _man): m_man(&_man)
{
WriteGuard l(m_man->x_subs);
m_asked = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
m_attempted = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
m_man->m_subs.insert(this);
}
HashDownloadSub::~HashDownloadSub()
{
if (m_man)
{
WriteGuard l(m_man->x_subs);
m_man->m_subs.erase(this);
}
}
void HashDownloadSub::resetFetch()
{
Guard l(m_fetch);
m_remaining = 0;
m_asked = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
m_attempted = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
}
unsigned HashDownloadSub::nextFetch(unsigned _n)
{
Guard l(m_fetch);
m_asked = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
if (!m_man || m_man->chainEmpty())
return 0;
m_asked = (~(m_man->taken() + m_attempted)).lowest(_n);
if (m_asked.empty())
m_asked = (~(m_man->taken(true) + m_attempted)).lowest(_n);
m_attempted += m_asked;
return *m_asked.begin();
}
void HashDownloadSub::noteHash(unsigned _index, unsigned _size)
{
Guard l(m_fetch);
if (m_man)
for(unsigned i = _index; i < _index + _size; ++i)
m_man->m_got += i;
}

123
libethereum/DownloadMan.h

@ -88,6 +88,13 @@ public:
i->m_man = nullptr; i->m_man = nullptr;
} }
void appendToChain(h256s const& _hashes)
{
WriteGuard l(m_lock);
m_chain.insert(m_chain.end(), _hashes.cbegin(), _hashes.cend());
m_blocksGot = RangeMask<unsigned>(0, m_chain.size());
}
void resetToChain(h256s const& _chain) void resetToChain(h256s const& _chain)
{ {
{ {
@ -158,6 +165,122 @@ private:
std::unordered_set<DownloadSub*> m_subs; std::unordered_set<DownloadSub*> m_subs;
}; };
class HashDownloadMan;
class HashDownloadSub
{
friend class HashDownloadMan;
public:
HashDownloadSub(HashDownloadMan& _man);
~HashDownloadSub();
/// Finished last fetch - grab the next hash index to download
unsigned nextFetch(unsigned _n);
/// Note that we've received a particular hash range.
void noteHash(unsigned _index, unsigned count);
/// Nothing doing here.
void doneFetch() { resetFetch(); }
bool askedContains(unsigned _i) const { Guard l(m_fetch); return m_asked.contains(_i); }
RangeMask<unsigned> const& asked() const { return m_asked; }
RangeMask<unsigned> const& attemped() const { return m_attempted; }
private:
void resetFetch(); // Called by DownloadMan when we need to reset the download.
HashDownloadMan* m_man = nullptr;
mutable Mutex m_fetch;
unsigned m_remaining;
RangeMask<unsigned> m_asked;
RangeMask<unsigned> m_attempted;
};
class HashDownloadMan
{
friend class HashDownloadSub;
public:
~HashDownloadMan()
{
for (auto i: m_subs)
i->m_man = nullptr;
}
void resetToRange(unsigned _start, unsigned _count)
{
{
ReadGuard l(x_subs);
for (auto i: m_subs)
i->resetFetch();
}
WriteGuard l(m_lock);
m_chainStart = _start;
m_chainCount = _count;
m_got += RangeMask<unsigned>(_start, _start + _count);
{
ReadGuard l(x_subs);
for (auto i: m_subs)
i->resetFetch();
}
}
void reset(unsigned _start)
{
WriteGuard l(m_lock);
m_chainStart = _start;
m_chainCount = 0;
m_got = RangeMask<unsigned>(_start, _start);
{
ReadGuard l(x_subs);
for (auto i: m_subs)
i->resetFetch();
}
}
RangeMask<unsigned> taken(bool _desperate = false) const
{
ReadGuard l(m_lock);
auto ret = m_got;
if (!_desperate)
{
ReadGuard l(x_subs);
for (auto i: m_subs)
ret += i->m_asked;
}
return ret;
}
bool isComplete() const
{
ReadGuard l(m_lock);
return m_got.full();
}\
size_t chainSize() const { ReadGuard l(m_lock); return m_chainCount; }
size_t chainEmpty() const { ReadGuard l(m_lock); return m_chainCount == 0; }
void foreachSub(std::function<void(HashDownloadSub const&)> const& _f) const { ReadGuard l(x_subs); for(auto i: m_subs) _f(*i); }
unsigned subCount() const { ReadGuard l(x_subs); return m_subs.size(); }
RangeMask<unsigned> hashesGot() const { ReadGuard l(m_lock); return m_got; }
private:
mutable SharedMutex m_lock;
unsigned m_chainStart = 0;
unsigned m_chainCount = 0;
RangeMask<unsigned> m_got;
mutable SharedMutex x_subs;
std::unordered_set<HashDownloadSub*> m_subs;
};
} }
} }

561
libethereum/EthereumHost.cpp

@ -37,6 +37,8 @@ using namespace dev;
using namespace dev::eth; using namespace dev::eth;
using namespace p2p; using namespace p2p;
const unsigned c_prevProtocolVersion = 60;
EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId):
HostCapability<EthereumPeer>(), HostCapability<EthereumPeer>(),
Worker ("ethsync"), Worker ("ethsync"),
@ -46,12 +48,12 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu
m_networkId (_networkId) m_networkId (_networkId)
{ {
m_latestBlockSent = _ch.currentHash(); m_latestBlockSent = _ch.currentHash();
m_hashMan.reset(m_chain.number() + 1);
} }
EthereumHost::~EthereumHost() EthereumHost::~EthereumHost()
{ {
for (auto i: peerSessions()) forEachPeer([](EthereumPeer* _p) { _p->abortSync(); });
i.first->cap<EthereumPeer>().get()->abortSync();
} }
bool EthereumHost::ensureInitialised() bool EthereumHost::ensureInitialised()
@ -71,86 +73,22 @@ bool EthereumHost::ensureInitialised()
void EthereumHost::noteNeedsSyncing(EthereumPeer* _who) void EthereumHost::noteNeedsSyncing(EthereumPeer* _who)
{ {
// if already downloading hash-chain, ignore. if (_who->m_asking == Asking::Nothing)
if (isSyncing()) continueSync(_who);
{
clog(NetAllDetail) << "Sync in progress: Just set to help out.";
if (m_syncer->m_asking == Asking::Blocks)
_who->transition(Asking::Blocks);
}
else
// otherwise check to see if we should be downloading...
_who->attemptSync();
}
void EthereumHost::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
{
if (_syncer)
clog(NetAllDetail) << "Changing syncer to" << _syncer->session()->socketId();
else
clog(NetAllDetail) << "Clearing syncer.";
m_syncer = _syncer;
if (isSyncing())
{
if (_needHelp && _syncer->m_asking == Asking::Blocks)
for (auto j: peerSessions())
{
clog(NetNote) << "Getting help with downloading blocks";
auto e = j.first->cap<EthereumPeer>().get();
if (e != _syncer && e->m_asking == Asking::Nothing)
e->transition(Asking::Blocks);
}
}
else
{
// start grabbing next hash chain if there is one.
for (auto j: peerSessions())
{
j.first->cap<EthereumPeer>()->attemptSync();
if (isSyncing())
return;
}
clog(NetNote) << "No more peers to sync with.";
}
}
void EthereumHost::noteDoneBlocks(EthereumPeer* _who, bool _clemency)
{
if (m_man.isComplete())
{
// Done our chain-get.
clog(NetNote) << "Chain download complete.";
// 1/100th for each useful block hash.
_who->addRating(m_man.chainSize() / 100);
m_man.reset();
}
else if (_who->isSyncing())
{
if (_clemency)
clog(NetNote) << "Chain download failed. Aborted while incomplete.";
else
{
// Done our chain-get.
clog(NetWarn) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished.";
clog(NetWarn) << m_man.remaining();
clog(NetWarn) << "WOULD BAN.";
// m_banned.insert(_who->session()->id()); // We know who you are!
// _who->disable("Peer sent hashes but was unable to provide the blocks.");
}
m_man.reset();
}
} }
void EthereumHost::reset() void EthereumHost::reset()
{ {
if (m_syncer) forEachPeer([](EthereumPeer* _p) { _p->abortSync(); });
m_syncer->abortSync();
m_man.resetToChain(h256s()); m_man.resetToChain(h256s());
m_hashMan.reset(m_chain.number() + 1);
m_needSyncBlocks = true;
m_needSyncHashes = true;
m_syncingLatestHash = h256();
m_syncingTotalDifficulty = 0;
m_latestBlockSent = h256(); m_latestBlockSent = h256();
m_transactionsSent.clear(); m_transactionsSent.clear();
m_v60Hashes.clear();
} }
void EthereumHost::doWork() void EthereumHost::doWork()
@ -172,9 +110,7 @@ void EthereumHost::doWork()
} }
} }
for (auto p: peerSessions()) forEachPeer([](EthereumPeer* _p) { _p->tick(); });
if (shared_ptr<EthereumPeer> const& ep = p.first->cap<EthereumPeer>())
ep->tick();
// return netChange; // return netChange;
// TODO: Figure out what to do with netChange. // TODO: Figure out what to do with netChange.
@ -194,43 +130,60 @@ void EthereumHost::maintainTransactions()
} }
for (auto const& t: ts) for (auto const& t: ts)
m_transactionsSent.insert(t.first); m_transactionsSent.insert(t.first);
for (auto p: peerSessions()) forEachPeer([&](shared_ptr<EthereumPeer> _p)
if (auto ep = p.first->cap<EthereumPeer>()) {
bytes b;
unsigned n = 0;
for (auto const& h: peerTransactions[_p])
{ {
bytes b; _p->m_knownTransactions.insert(h);
unsigned n = 0; b += ts[h].rlp();
for (auto const& h: peerTransactions[ep]) ++n;
{ }
ep->m_knownTransactions.insert(h);
b += ts[h].rlp();
++n;
}
ep->clearKnownTransactions(); _p->clearKnownTransactions();
if (n || ep->m_requireTransactions) if (n || _p->m_requireTransactions)
{ {
RLPStream ts; RLPStream ts;
ep->prep(ts, TransactionsPacket, n).appendRaw(b, n); _p->prep(ts, TransactionsPacket, n).appendRaw(b, n);
ep->sealAndSend(ts); _p->sealAndSend(ts);
}
ep->m_requireTransactions = false;
} }
_p->m_requireTransactions = false;
});
}
void EthereumHost::forEachPeer(std::function<void(EthereumPeer*)> const& _f)
{
forEachPeer([&](std::shared_ptr<EthereumPeer> _p)
{
if (_p)
_f(_p.get());
});
}
void EthereumHost::forEachPeer(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f)
{
for (auto s: peerSessions())
_f(s.first->cap<EthereumPeer>());
for (auto s: peerSessions(protocolVersion() - 1)) //TODO:
_f(s.first->cap<EthereumPeer>(protocolVersion() - 1));
} }
pair<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>> EthereumHost::randomSelection(unsigned _percent, std::function<bool(EthereumPeer*)> const& _allow) pair<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>> EthereumHost::randomSelection(unsigned _percent, std::function<bool(EthereumPeer*)> const& _allow)
{ {
pair<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>> ret; pair<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>> ret;
ret.second.reserve(peerSessions().size()); vector<shared_ptr<EthereumPeer>> peers;
for (auto const& j: peerSessions()) forEachPeer([&](shared_ptr<EthereumPeer> _p)
{ {
auto pp = j.first->cap<EthereumPeer>(); if (_p && _allow(_p.get()))
if (_allow(pp.get())) ret.second.push_back(_p);
ret.second.push_back(pp); });
}
ret.second.reserve((peerSessions().size() * _percent + 99) / 100); size_t size = (ret.second.size() * _percent + 99) / 100;
for (unsigned i = (peerSessions().size() * _percent + 99) / 100; i-- && ret.second.size();) ret.second.reserve(size);
for (unsigned i = size; i-- && ret.second.size();)
{ {
unsigned n = rand() % ret.second.size(); unsigned n = rand() % ret.second.size();
ret.first.push_back(std::move(ret.second[n])); ret.first.push_back(std::move(ret.second[n]));
@ -279,3 +232,403 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash)
m_latestBlockSent = _currentHash; m_latestBlockSent = _currentHash;
} }
} }
void EthereumHost::onPeerState(EthereumPeer* _peer)
{
if (!_peer->enabled())
{
clog(NetNote) << "Ignoring status from disabled peer";
return;
}
if (_peer->m_genesisHash != m_chain.genesisHash())
_peer->disable("Invalid genesis hash");
else if (_peer->m_protocolVersion != protocolVersion())// && _peer->m_protocolVersion != c_prevProtocolVersion)
_peer->disable("Invalid protocol version.");
else if (_peer->m_networkId != networkId())
_peer->disable("Invalid network identifier.");
else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos)
_peer->disable("Blacklisted client version.");
else if (isBanned(_peer->session()->id()))
_peer->disable("Peer banned for previous bad behaviour.");
else
{
_peer->m_expectedHashes = 500000; //TODO:
if (m_hashMan.chainSize() < _peer->m_expectedHashes)
m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes);
continueSync(_peer);
}
}
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
{
unsigned knowns = 0;
unsigned unknowns = 0;
h256s neededBlocks;
for (unsigned i = 0; i < _hashes.size(); ++i)
{
_peer->addRating(1);
auto h = _hashes[i];
auto status = m_bq.blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h))
{
clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download...";
m_v60Hashes += neededBlocks;
onPeerDoneHashes(_peer, false);
return;
}
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
_peer->setIdle();
return;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
neededBlocks.push_back(h);
}
else
knowns++;
m_syncingLatestHash = h;
}
m_v60Hashes += neededBlocks;
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash;
if (_complete)
{
m_needSyncBlocks = true;
continueSync(_peer);
}
else if (m_hashes.size() > _peer->m_expectedHashes)
{
_peer->disable("Too many hashes");
m_hashes.clear();
m_syncingLatestHash = h256();
continueSync(); ///Try with some other peer, keep the chain
}
else
continueSync(_peer); /// Grab next hashes
}
void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s const& _hashes)
{
unsigned knowns = 0;
unsigned unknowns = 0;
h256s neededBlocks;
for (unsigned i = 0; i < _hashes.size(); ++i)
{
_peer->addRating(1);
auto h = _hashes[i];
auto status = m_bq.blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h))
{
clog(NetWarn) << "block hash alrady known:" << h;
}
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
_peer->setIdle();
return;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
neededBlocks.push_back(h);
}
else
knowns++;
m_syncingLatestHash = h;
}
m_man.appendToChain(neededBlocks);
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash;
if (m_hashMan.isComplete())
{
// Done our chain-get.
m_needSyncHashes = false;
clog(NetNote) << "Hashes download complete.";
// 1/100th for each useful block hash.
_peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers?
m_hashMan.reset(m_chain.number() + 1);
continueSync();
}
else
continueSync(_peer);
}
void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _new)
{
m_needSyncHashes = false;
if (_peer->m_protocolVersion == protocolVersion() || _new)
{
continueSync(_peer);
}
else
{
m_man.resetToChain(m_v60Hashes);
continueSync();
}
}
void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
if (!_peer->enabled())
{
clog(NetNote) << "Ignoring blocks from disabled peer";
return;
}
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
if (itemCount == 0)
{
// Got to this peer's latest block - just give up.
_peer->setIdle();
return;
}
unsigned success = 0;
unsigned future = 0;
unsigned unknown = 0;
unsigned got = 0;
unsigned repeated = 0;
for (unsigned i = 0; i < itemCount; ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
if (_peer->m_sub.noteBlock(h))
{
_peer->addRating(10);
switch (m_bq.import(_r[i].data(), m_chain))
{
case ImportResult::Success:
success++;
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::FutureTime:
future++;
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
got++;
break;
case ImportResult::UnknownParent:
unknown++;
break;
default:;
}
}
else
{
_peer->addRating(0); // -1?
repeated++;
}
}
clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received.";
if (m_man.isComplete() && !m_needSyncHashes)
{
// Done our chain-get.
m_needSyncBlocks = false;
clog(NetNote) << "Chain download complete.";
// 1/100th for each useful block hash.
_peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers?
m_man.reset();
}
continueSync(_peer);
}
void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
{
Guard l(x_sync);
if (_peer->m_asking != Asking::Nothing)
{
clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading.";
return;
}
clog(NetNote) << "New block hash discovered: syncing without help.";
onPeerHashes(_peer, _hashes, true);
}
void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{
Guard l(x_sync);
if (_peer->m_asking != Asking::Nothing)
{
clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading.";
return;
}
auto h = BlockInfo::headerHash(_r[0].data());
clog(NetMessageSummary) << "NewBlock: " << h;
if (_r.itemCount() != 2)
_peer->disable("NewBlock without 2 data fields.");
else
{
bool sync = false;
switch (m_bq.import(_r[0].data(), m_chain))
{
case ImportResult::Success:
_peer->addRating(100);
break;
case ImportResult::FutureTime:
//TODO: Rating dependent on how far in future it is.
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::UnknownParent:
if (h)
{
u256 difficulty = _r[1].toInt<u256>();
if (m_syncingTotalDifficulty < difficulty)
{
clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
_peer->m_latestHash = h;
_peer->m_totalDifficulty = difficulty;
m_needSyncHashes = true;
m_needSyncBlocks = true;
m_syncingLatestHash = _peer->m_latestHash;
sync = true;
}
}
break;
default:;
}
DEV_GUARDED(_peer->x_knownBlocks)
_peer->m_knownBlocks.insert(h);
if (sync)
continueSync(_peer);
}
}
void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
{
unsigned itemCount = _r.itemCount();
clog(NetAllDetail) << "Transactions (" << dec << itemCount << "entries)";
Guard l(_peer->x_knownTransactions);
for (unsigned i = 0; i < itemCount; ++i)
{
auto h = sha3(_r[i].data());
_peer->m_knownTransactions.insert(h);
ImportResult ir = m_tq.import(_r[i].data());
switch (ir)
{
case ImportResult::Malformed:
_peer->addRating(-100);
break;
case ImportResult::AlreadyKnown:
// if we already had the transaction, then don't bother sending it on.
m_transactionsSent.insert(h);
_peer->addRating(0);
break;
case ImportResult::Success:
_peer->addRating(100);
break;
default:;
}
}
}
void EthereumHost::continueSync()
{
forEachPeer([&](EthereumPeer* _p)
{
clog(NetNote) << "Getting help with downloading hashes and blocks";
if (_p->m_asking == Asking::Nothing)
continueSync(_p);
});
}
void EthereumHost::continueSync(EthereumPeer* _peer)
{
bool otherPeerSync = false;
bool thisPeerSync = false;
if (m_needSyncHashes && peerShouldGrabChain(_peer))
{
forEachPeer([&](EthereumPeer* _p)
{
if (_p->m_asking == Asking::Hashes && _p->m_protocolVersion != protocolVersion())
{
// Already have a peer downloading hash chain with old protocol, do nothing
if (_p == _peer)
thisPeerSync = true;
else
otherPeerSync = true;
}
});
if (otherPeerSync)
{
_peer->setIdle();
return;
}
if (_peer->m_protocolVersion == protocolVersion())
_peer->requestHashes();
else
{
// Restart/continue sync in single peer mode
if (!m_syncingLatestHash)
{
m_syncingLatestHash =_peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
}
_peer->requestHashes(m_syncingLatestHash);
}
}
else if (m_needSyncBlocks && peerShouldGrabBlocks(_peer)) // Check if this peer can help with downloading blocks
_peer->requestBlocks();
else
_peer->setIdle();
}
bool EthereumHost::peerShouldGrabBlocks(EthereumPeer* _peer) const
{
auto td = _peer->m_totalDifficulty;
auto lh = m_syncingLatestHash;
auto ctd = m_chain.details().totalDifficulty;
clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd;
if (td < ctd || (td == ctd && m_chain.currentHash() == lh))
return false;
return true;
}
bool EthereumHost::peerShouldGrabChain(EthereumPeer* _peer) const
{
h256 c = m_chain.currentHash();
unsigned n = m_chain.number();
u256 td = m_chain.details().totalDifficulty;
clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty;
if (td >= _peer->m_totalDifficulty)
{
clog(NetAllDetail) << "No. Our chain is better.";
return false;
}
else
{
clog(NetAllDetail) << "Yes. Their chain is better.";
return true;
}
}

30
libethereum/EthereumHost.h

@ -57,7 +57,6 @@ class BlockQueue;
class EthereumHost: public p2p::HostCapability<EthereumPeer>, Worker class EthereumHost: public p2p::HostCapability<EthereumPeer>, Worker
{ {
friend class EthereumPeer; friend class EthereumPeer;
public: public:
/// Start server, but don't listen. /// Start server, but don't listen.
EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId); EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId);
@ -72,22 +71,23 @@ public:
void reset(); void reset();
DownloadMan const& downloadMan() const { return m_man; } DownloadMan const& downloadMan() const { return m_man; }
bool isSyncing() const { return !!m_syncer; } bool isSyncing() const { return m_needSyncBlocks || m_needSyncHashes; }
bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); } bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); }
void noteNewTransactions() { m_newTransactions = true; } void noteNewTransactions() { m_newTransactions = true; }
void noteNewBlocks() { m_newBlocks = true; } void noteNewBlocks() { m_newBlocks = true; }
void onPeerState(EthereumPeer* _peer);
private: private:
std::pair<std::vector<std::shared_ptr<EthereumPeer>>, std::vector<std::shared_ptr<EthereumPeer>>> randomSelection(unsigned _percent = 25, std::function<bool(EthereumPeer*)> const& _allow = [](EthereumPeer const*){ return true; }); std::pair<std::vector<std::shared_ptr<EthereumPeer>>, std::vector<std::shared_ptr<EthereumPeer>>> randomSelection(unsigned _percent = 25, std::function<bool(EthereumPeer*)> const& _allow = [](EthereumPeer const*){ return true; });
void forEachPeer(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f);
void forEachPeer(std::function<void(EthereumPeer*)> const& _f);
/// Session is tell us that we may need (re-)syncing with the peer. /// Session is tell us that we may need (re-)syncing with the peer.
void noteNeedsSyncing(EthereumPeer* _who); void noteNeedsSyncing(EthereumPeer* _who);
/// Called when the peer can no longer provide us with any needed blocks.
void noteDoneBlocks(EthereumPeer* _who, bool _clemency);
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void doWork(); void doWork();
@ -109,6 +109,14 @@ private:
virtual void onStopping() { stopWorking(); } virtual void onStopping() { stopWorking(); }
void changeSyncer(EthereumPeer* _ignore, bool _needHelp = true); void changeSyncer(EthereumPeer* _ignore, bool _needHelp = true);
void continueSync();
void continueSync(EthereumPeer* _peer);
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r);
void onPeerDoneHashes(EthereumPeer* _peer, bool _new);
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes);
void onPeerHashes(EthereumPeer* _peer, unsigned _index, h256s const& _hashes);
bool peerShouldGrabBlocks(EthereumPeer* _peer) const;
bool peerShouldGrabChain(EthereumPeer* _peer) const;
BlockChain const& m_chain; BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
@ -116,9 +124,10 @@ private:
u256 m_networkId; u256 m_networkId;
EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr std::weak_ptr<EthereumPeer> m_hashSyncer;
DownloadMan m_man; DownloadMan m_man;
HashDownloadMan m_hashMan;
h256 m_latestBlockSent; h256 m_latestBlockSent;
h256Hash m_transactionsSent; h256Hash m_transactionsSent;
@ -127,6 +136,15 @@ private:
bool m_newTransactions = false; bool m_newTransactions = false;
bool m_newBlocks = false; bool m_newBlocks = false;
unsigned m_maxKnownNumber = 0;
u256 m_maxKnownDifficulty;
bool m_needSyncHashes = true;
bool m_needSyncBlocks = true;
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
h256s m_v60Hashes;
}; };
} }

462
libethereum/EthereumPeer.cpp

@ -36,9 +36,10 @@ using namespace p2p;
EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i): EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i):
Capability(_s, _h, _i), Capability(_s, _h, _i),
m_sub(host()->m_man) m_sub(host()->m_man),
m_hashSub(host()->m_hashMan)
{ {
transition(Asking::State); requestState();
} }
EthereumPeer::~EthereumPeer() EthereumPeer::~EthereumPeer()
@ -50,7 +51,7 @@ EthereumPeer::~EthereumPeer()
void EthereumPeer::abortSync() void EthereumPeer::abortSync()
{ {
if (isSyncing()) if (isSyncing())
transition(Asking::Nothing, true); setIdle();
} }
EthereumHost* EthereumPeer::host() const EthereumHost* EthereumPeer::host() const
@ -74,141 +75,87 @@ string toString(Asking _a)
return "?"; return "?";
} }
void EthereumPeer::transition(Asking _a, bool _force, bool _needHelp)
{
clog(NetMessageSummary) << "Transition!" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : "");
if (m_asking == Asking::State && _a != Asking::State)
m_requireTransactions = true;
RLPStream s; void EthereumPeer::setIdle()
{
if (_a == Asking::State) if (m_asking == Asking::Blocks)
{
if (m_asking == Asking::Nothing)
{
setAsking(Asking::State, false);
prep(s, StatusPacket, 5)
<< host()->protocolVersion()
<< host()->networkId()
<< host()->m_chain.details().totalDifficulty
<< host()->m_chain.currentHash()
<< host()->m_chain.genesisHash();
sealAndSend(s);
return;
}
}
else if (_a == Asking::Hashes)
{ {
if (m_asking == Asking::State || m_asking == Asking::Nothing) clog(NetNote) << "Finishing blocks fetch...";
{ // NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
if (isSyncing()) m_sub.doneFetch();
clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!"; m_hashSub.doneFetch();
m_syncingLatestHash = m_latestHash;
m_syncingTotalDifficulty = m_totalDifficulty;
resetNeedsSyncing();
setAsking(_a, true); setAsking(Asking::Nothing);
prep(s, GetBlockHashesPacket, 2) << m_syncingLatestHash << c_maxHashesAsk;
m_syncingNeededBlocks = h256s(1, m_syncingLatestHash);
sealAndSend(s);
return;
}
else if (m_asking == Asking::Hashes)
{
if (!isSyncing())
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setAsking(_a, true);
prep(s, GetBlockHashesPacket, 2) << m_syncingLastReceivedHash << c_maxHashesAsk;
sealAndSend(s);
return;
}
} }
else if (_a == Asking::Blocks) else if (m_asking == Asking::Hashes)
{ {
if (m_asking == Asking::Hashes) clog(NetNote) << "Finishing hashes fetch...";
{
if (!isSyncing())
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
if (shouldGrabBlocks())
{
clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash << ", was" << host()->m_latestBlockSent << "]";
host()->m_man.resetToChain(m_syncingNeededBlocks); setAsking(Asking::Nothing);
// host()->m_latestBlockSent = m_syncingLatestHash;
}
else
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
m_syncingLatestHash = h256();
setAsking(Asking::Nothing, false);
return;
}
}
// run through into...
if (m_asking == Asking::Nothing || m_asking == Asking::Hashes || m_asking == Asking::Blocks)
{
// Looks like it's the best yet for total difficulty. Set to download.
setAsking(Asking::Blocks, isSyncing(), _needHelp); // will kick off other peers to help if available.
auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
if (blocks.size())
{
prep(s, GetBlocksPacket, blocks.size());
for (auto const& i: blocks)
s << i;
sealAndSend(s);
}
else
transition(Asking::Nothing);
return;
}
} }
else if (_a == Asking::Nothing) else if (m_asking == Asking::State)
{ {
if (m_asking == Asking::Blocks) setAsking(Asking::Nothing);
{ }
clog(NetNote) << "Finishing blocks fetch..."; }
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry. void EthereumPeer::requestState()
if (isSyncing()) {
host()->noteDoneBlocks(this, _force); if (m_asking != Asking::Nothing)
clog(NetWarn) << "Bad state: requesting state should be the first action";
setAsking(Asking::State);
RLPStream s;
prep(s, StatusPacket, 5)
<< host()->protocolVersion() - 1
<< host()->networkId()
<< host()->m_chain.details().totalDifficulty
<< host()->m_chain.currentHash()
<< host()->m_chain.genesisHash();
sealAndSend(s);
}
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary. void EthereumPeer::requestHashes()
m_sub.doneFetch(); {
assert(m_asking != Asking::Blocks);
m_syncHashNumber = m_hashSub.nextFetch(c_maxBlocksAsk);
setAsking(Asking::Hashes);
RLPStream s;
prep(s, GetBlockHashesPacket, 2) << m_syncHashNumber << c_maxHashesAsk;
sealAndSend(s);
}
setAsking(Asking::Nothing, false); void EthereumPeer::requestHashes(h256 const& _lastHash)
} {
else if (m_asking == Asking::Hashes) assert(m_asking != Asking::Blocks);
{ setAsking(Asking::Hashes);
clog(NetNote) << "Finishing hashes fetch..."; RLPStream s;
prep(s, GetBlockHashesPacket, 2) << _lastHash << c_maxHashesAsk;
sealAndSend(s);
}
setAsking(Asking::Nothing, false); void EthereumPeer::requestBlocks()
} {
else if (m_asking == Asking::State) // Looks like it's the best yet for total difficulty. Set to download.
{ setAsking(Asking::Blocks); // will kick off other peers to help if available.
setAsking(Asking::Nothing, false); auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
// Just got the state - should check to see if we can be of help downloading the chain if any. if (blocks.size())
// Otherwise, should put ourselves up for sync. {
setNeedsSyncing(m_latestHash, m_totalDifficulty); RLPStream s;
} prep(s, GetBlocksPacket, blocks.size());
// Otherwise it's fine. We don't care if it's Nothing->Nothing. for (auto const& i: blocks)
return; s << i;
sealAndSend(s);
} }
else
clog(NetWarn) << "Invalid state transition:" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : ""); setIdle();
return;
} }
void EthereumPeer::setAsking(Asking _a, bool _isSyncing, bool _needHelp) void EthereumPeer::setAsking(Asking _a)
{ {
bool changedAsking = (m_asking != _a);
m_asking = _a; m_asking = _a;
if (_isSyncing != (host()->m_syncer == this) || (_isSyncing && changedAsking)) if (!isSyncing())
host()->changeSyncer(_isSyncing ? this : nullptr, _needHelp);
if (!_isSyncing)
{ {
m_syncingLatestHash = h256(); m_syncingLatestHash = h256();
m_syncingTotalDifficulty = 0; m_syncingTotalDifficulty = 0;
@ -241,57 +188,7 @@ void EthereumPeer::tick()
bool EthereumPeer::isSyncing() const bool EthereumPeer::isSyncing() const
{ {
return host()->m_syncer == this; return m_asking != Asking::Nothing;
}
bool EthereumPeer::shouldGrabBlocks() const
{
auto td = m_syncingTotalDifficulty;
auto lh = m_syncingLatestHash;
auto ctd = host()->m_chain.details().totalDifficulty;
if (m_syncingNeededBlocks.empty())
return false;
clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back();
if (td < ctd || (td == ctd && host()->m_chain.currentHash() == lh))
return false;
return true;
}
void EthereumPeer::attemptSync()
{
if (m_asking != Asking::Nothing)
{
clog(NetAllDetail) << "Can't synced with this peer - outstanding asks.";
return;
}
// if already done this, then ignore.
if (!needsSyncing())
{
clog(NetAllDetail) << "Already synced with this peer.";
return;
}
h256 c = host()->m_chain.currentHash();
unsigned n = host()->m_chain.number();
u256 td = host()->m_chain.details().totalDifficulty;
clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << m_totalDifficulty;
if (td >= m_totalDifficulty)
{
clog(NetAllDetail) << "No. Our chain is better.";
resetNeedsSyncing();
transition(Asking::Nothing);
}
else
{
clog(NetAllDetail) << "Yes. Their chain is better.";
transition(Asking::Hashes);
}
} }
bool EthereumPeer::interpret(unsigned _id, RLP const& _r) bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
@ -303,27 +200,16 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
case StatusPacket: case StatusPacket:
{ {
m_protocolVersion = _r[0].toInt<unsigned>(); m_protocolVersion = _r[0].toInt<unsigned>();
if (!!session()->cap<EthereumPeer>(EthereumHost::staticVersion()))
m_protocolVersion = host()->protocolVersion();
m_networkId = _r[1].toInt<u256>(); m_networkId = _r[1].toInt<u256>();
// a bit dirty as we're misusing these to communicate the values to transition, but harmless. // a bit dirty as we're misusing these to communicate the values to transition, but harmless.
m_totalDifficulty = _r[2].toInt<u256>(); m_totalDifficulty = _r[2].toInt<u256>();
m_latestHash = _r[3].toHash<h256>(); m_latestHash = _r[3].toHash<h256>();
auto genesisHash = _r[4].toHash<h256>(); m_genesisHash = _r[4].toHash<h256>();
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash; host()->onPeerState(this);
if (genesisHash != host()->m_chain.genesisHash())
disable("Invalid genesis hash");
else if (m_protocolVersion != host()->protocolVersion())
disable("Invalid protocol version.");
else if (m_networkId != host()->networkId())
disable("Invalid network identifier.");
else if (session()->info().clientVersion.find("/v0.7.0/") != string::npos)
disable("Blacklisted client version.");
else if (host()->isBanned(session()->id()))
disable("Peer banned for previous bad behaviour.");
else
transition(Asking::Nothing);
break; break;
} }
case TransactionsPacket: case TransactionsPacket:
@ -356,19 +242,45 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
} }
case GetBlockHashesPacket: case GetBlockHashesPacket:
{ {
h256 later = _r[0].toHash<h256>(); if (m_protocolVersion == host()->protocolVersion())
unsigned limit = _r[1].toInt<unsigned>(); {
clog(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later << ")"; u256 number256 = _r[0].toInt<u256>();
unsigned number = (unsigned) number256;
unsigned c = min<unsigned>(host()->m_chain.number(later), limit); unsigned limit = _r[1].toInt<unsigned>();
clog(NetMessageSummary) << "GetBlockHashes (" << number << "-" << number + limit << ")";
RLPStream s; RLPStream s;
prep(s, BlockHashesPacket, c); if (number <= host()->m_chain.number())
h256 p = host()->m_chain.details(later).parent; {
for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent) unsigned c = min<unsigned>(host()->m_chain.number() - number + 1, limit);
s << p; prep(s, BlockHashesPacket, c);
sealAndSend(s); for (unsigned n = number; n < number + c; n++)
addRating(0); {
h256 p = host()->m_chain.numberHash(n);
s << p;
}
}
else
prep(s, BlockHashesPacket, 0);
sealAndSend(s);
addRating(0);
}
else
{
// Support V60 protocol
h256 later = _r[0].toHash<h256>();
unsigned limit = _r[1].toInt<unsigned>();
clog(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later << ")";
unsigned c = min<unsigned>(host()->m_chain.number(later), limit);
RLPStream s;
prep(s, BlockHashesPacket, c);
h256 p = host()->m_chain.details(later).parent;
for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent)
s << p;
sealAndSend(s);
addRating(0);
}
break; break;
} }
case BlockHashesPacket: case BlockHashesPacket:
@ -383,40 +295,24 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
} }
if (itemCount == 0) if (itemCount == 0)
{ {
transition(Asking::Blocks); host()->onPeerDoneHashes(this, false);
return true; return true;
} }
unsigned knowns = 0; h256s hashes(itemCount);
unsigned unknowns = 0;
for (unsigned i = 0; i < itemCount; ++i) for (unsigned i = 0; i < itemCount; ++i)
{ {
addRating(1); hashes[i] = _r[i].toHash<h256>();
auto h = _r[i].toHash<h256>(); m_hashSub.noteHash(m_syncHashNumber + i, 1);
auto status = host()->m_bq.blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || host()->m_chain.isKnown(h))
{
clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download...";
transition(Asking::Blocks);
return true;
}
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
transition(Asking::Nothing);
return true;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
m_syncingNeededBlocks.push_back(h);
}
else
knowns++;
m_syncingLastReceivedHash = h;
} }
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLastReceivedHash;
// run through - ask for more. if (m_protocolVersion == host()->protocolVersion())
transition(Asking::Hashes); {
//v61, report hashes ordered by number
host()->onPeerHashes(this, m_syncHashNumber, hashes);
}
else
host()->onPeerHashes(this, hashes);
m_syncHashNumber += itemCount;
break; break;
} }
case GetBlocksPacket: case GetBlocksPacket:
@ -455,74 +351,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
} }
case BlocksPacket: case BlocksPacket:
{ {
unsigned itemCount = _r.itemCount(); host()->onPeerBlocks(this, _r);
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
if (m_asking != Asking::Blocks)
clog(NetWarn) << "Unexpected Blocks received!";
if (itemCount == 0)
{
// Got to this peer's latest block - just give up.
transition(Asking::Nothing);
break;
}
unsigned success = 0;
unsigned future = 0;
unsigned unknown = 0;
unsigned got = 0;
unsigned repeated = 0;
for (unsigned i = 0; i < itemCount; ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
if (m_sub.noteBlock(h))
{
addRating(10);
switch (host()->m_bq.import(_r[i].data(), host()->m_chain))
{
case ImportResult::Success:
success++;
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
disable("Malformed block received.");
return true;
case ImportResult::FutureTime:
future++;
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
got++;
break;
case ImportResult::UnknownParent:
unknown++;
break;
default:;
}
}
else
{
addRating(0); // -1?
repeated++;
}
}
clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received.";
if (m_asking == Asking::Blocks)
{
if (!got)
transition(Asking::Blocks);
else
transition(Asking::Nothing);
}
break; break;
} }
case NewBlockPacket: case NewBlockPacket:
@ -571,39 +400,16 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
clog(NetMessageSummary) << "Ignoring since we're already downloading."; clog(NetMessageSummary) << "Ignoring since we're already downloading.";
else else
{ {
unsigned knowns = 0;
unsigned unknowns = 0;
unsigned itemCount = _r.itemCount(); unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "BlockHashes (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreHashes");
h256s hashes(itemCount);
for (unsigned i = 0; i < itemCount; ++i) for (unsigned i = 0; i < itemCount; ++i)
{ hashes[i] = _r[i].toHash<h256>();
addRating(1);
auto h = _r[i].toHash<h256>(); clog(NetNote) << "Not syncing and new block hash discovered: syncing without help.";
DEV_GUARDED(x_knownBlocks) host()->onPeerHashes(this, hashes);
m_knownBlocks.insert(h); host()->onPeerDoneHashes(this, true);
auto status = host()->m_bq.blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || host()->m_chain.isKnown(h))
knowns++;
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
return true;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
m_syncingNeededBlocks.push_back(h);
}
else
knowns++;
}
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
if (unknowns > 0)
{
clog(NetNote) << "Not syncing and new block hash discovered: syncing without help.";
host()->m_man.resetToChain(m_syncingNeededBlocks);
host()->changeSyncer(this, false);
transition(Asking::Blocks, false, false); // TODO: transaction(Asking::NewBlocks, false)
}
return true; return true;
} }
break; break;

18
libethereum/EthereumPeer.h

@ -70,18 +70,18 @@ public:
/// What is the ethereum subprotocol host object. /// What is the ethereum subprotocol host object.
EthereumHost* host() const; EthereumHost* host() const;
void setIdle();
void requestState();
void requestHashes();
void requestHashes(h256 const& _lastHash);
void requestBlocks();
private: private:
using p2p::Capability::sealAndSend; using p2p::Capability::sealAndSend;
/// Interpret an incoming message. /// Interpret an incoming message.
virtual bool interpret(unsigned _id, RLP const& _r); virtual bool interpret(unsigned _id, RLP const& _r);
/// Transition state in a particular direction.
void transition(Asking _wantState, bool _force = false, bool _needHelp = true);
/// Attempt to begin syncing with this peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks.
void attemptSync();
/// Abort the sync operation. /// Abort the sync operation.
void abortSync(); void abortSync();
@ -89,7 +89,7 @@ private:
void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); } void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); }
/// Update our asking state. /// Update our asking state.
void setAsking(Asking _g, bool _isSyncing, bool _needHelp = true); void setAsking(Asking _g);
/// Update our syncing requirements state. /// Update our syncing requirements state.
void setNeedsSyncing(h256 _latestHash, u256 _td); void setNeedsSyncing(h256 _latestHash, u256 _td);
@ -123,6 +123,7 @@ private:
/// These are determined through either a Status message or from NewBlock. /// These are determined through either a Status message or from NewBlock.
h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync. h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync.
u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty.
h256 m_genesisHash; ///< Peer's genesis hash
/// Once a sync is started on this peer, they are cleared and moved into m_syncing*. /// Once a sync is started on this peer, they are cleared and moved into m_syncing*.
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// This is built as we ask for hashes. Once no more hashes are given, we present this to the
@ -131,9 +132,12 @@ private:
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer. h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
unsigned m_expectedHashes = 0; ///< Estimated Upper bound of hashes to expect from this peer.
unsigned m_syncHashNumber = 0;
/// Once we're asking for blocks, this becomes in use. /// Once we're asking for blocks, this becomes in use.
DownloadSub m_sub; DownloadSub m_sub;
HashDownloadSub m_hashSub;
/// Have we received a GetTransactions packet that we haven't yet answered? /// Have we received a GetTransactions packet that we haven't yet answered?
bool m_requireTransactions = false; bool m_requireTransactions = false;

1
libp2p/Capability.h

@ -44,6 +44,7 @@ public:
*/ */
Session* session() const { return m_session; } Session* session() const { return m_session; }
HostCapabilityFace* hostCapability() const { return m_host; } HostCapabilityFace* hostCapability() const { return m_host; }
bool enabled() { return m_enabled; }
protected: protected:
virtual bool interpret(unsigned _id, RLP const&) = 0; virtual bool interpret(unsigned _id, RLP const&) = 0;

1
libp2p/Host.h

@ -99,6 +99,7 @@ public:
/// Register a peer-capability; all new peer connections will have this capability. /// Register a peer-capability; all new peer connections will have this capability.
template <class T> std::shared_ptr<T> registerCapability(T* _t) { _t->m_host = this; std::shared_ptr<T> ret(_t); m_capabilities[std::make_pair(T::staticName(), T::staticVersion())] = ret; return ret; } template <class T> std::shared_ptr<T> registerCapability(T* _t) { _t->m_host = this; std::shared_ptr<T> ret(_t); m_capabilities[std::make_pair(T::staticName(), T::staticVersion())] = ret; return ret; }
template <class T> void addCapability(std::shared_ptr<T> const & _p, std::string const& _name, u256 const& _version) { m_capabilities[std::make_pair(_name, _version)] = _p; }
bool haveCapability(CapDesc const& _name) const { return m_capabilities.count(_name) != 0; } bool haveCapability(CapDesc const& _name) const { return m_capabilities.count(_name) != 0; }
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; } CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }

7
libp2p/HostCapability.cpp

@ -28,12 +28,17 @@ using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> HostCapabilityFace::peerSessions() const std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> HostCapabilityFace::peerSessions() const
{
return peerSessions(version());
}
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> HostCapabilityFace::peerSessions(u256 const& _version) const
{ {
RecursiveGuard l(m_host->x_sessions); RecursiveGuard l(m_host->x_sessions);
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> ret; std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> ret;
for (auto const& i: m_host->m_sessions) for (auto const& i: m_host->m_sessions)
if (std::shared_ptr<Session> s = i.second.lock()) if (std::shared_ptr<Session> s = i.second.lock())
if (s->m_capabilities.count(capDesc())) if (s->m_capabilities.count(std::make_pair(name(), _version)))
ret.push_back(make_pair(s,s->m_peer)); ret.push_back(make_pair(s,s->m_peer));
return ret; return ret;
} }

1
libp2p/HostCapability.h

@ -46,6 +46,7 @@ public:
Host* host() const { return m_host; } Host* host() const { return m_host; }
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> peerSessions() const; std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> peerSessions() const;
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> peerSessions(u256 const& _version) const;
protected: protected:
virtual std::string name() const = 0; virtual std::string name() const = 0;

2
libp2p/Session.h

@ -69,6 +69,8 @@ public:
template <class PeerCap> template <class PeerCap>
std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } } std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } }
template <class PeerCap>
std::shared_ptr<PeerCap> cap(u256 const& _version) const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), _version))); } catch (...) { return nullptr; } }
static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0); static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0);
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);

Loading…
Cancel
Save