Browse Source

Merge pull request #2118 from arkpar/bc2

Hash downloading improvements
cl-refactor
Gav Wood 10 years ago
parent
commit
3a1c323e4d
  1. 4
      alethzero/MainWin.cpp
  2. 8
      libdevcore/RangeMask.h
  3. 6
      libethereum/Client.cpp
  4. 2
      libethereum/Client.h
  5. 7
      libethereum/CommonNet.h
  6. 7
      libethereum/DownloadMan.cpp
  7. 8
      libethereum/DownloadMan.h
  8. 96
      libethereum/EthereumHost.cpp
  9. 11
      libethereum/EthereumHost.h
  10. 13
      libethereum/EthereumPeer.cpp
  11. 5
      libethereum/EthereumPeer.h

4
alethzero/MainWin.cpp

@ -1240,7 +1240,9 @@ void Main::refreshBlockCount()
{
auto d = ethereum()->blockChain().details();
BlockQueueStatus b = ethereum()->blockQueueStatus();
ui->chainStatus->setText(QString("%3 ready %4 verifying %5 unverified %6 future %7 unknown %8 bad %1 #%2").arg(m_privateChain.size() ? "[" + m_privateChain + "] " : "testnet").arg(d.number).arg(b.verified).arg(b.verifying).arg(b.unverified).arg(b.future).arg(b.unknown).arg(b.bad));
HashChainStatus h = ethereum()->hashChainStatus();
ui->chainStatus->setText(QString("%9/%10%11 hashes %3 ready %4 verifying %5 unverified %6 future %7 unknown %8 bad %1 #%2")
.arg(m_privateChain.size() ? "[" + m_privateChain + "] " : "testnet").arg(d.number).arg(b.verified).arg(b.verifying).arg(b.unverified).arg(b.future).arg(b.unknown).arg(b.bad).arg(h.received).arg(h.estimated ? "~" : "").arg(h.total));
}
void Main::on_turboMining_triggered()

8
libdevcore/RangeMask.h

@ -219,6 +219,14 @@ public:
return uit == m_ranges.end() ? m_all.second : uit->first;
}
size_t size() const
{
size_t c = 0;
for (auto const& r: this->m_ranges)
c += r.second - r.first;
return c;
}
private:
UnsignedRange m_all;
std::map<T, T> m_ranges;

6
libethereum/Client.cpp

@ -877,3 +877,9 @@ void Client::flushTransactions()
{
doWork();
}
HashChainStatus Client::hashChainStatus() const
{
auto h = m_host.lock();
return h ? h->status() : HashChainStatus { 0, 0, false };
}

2
libethereum/Client.h

@ -156,6 +156,8 @@ public:
CanonBlockChain const& blockChain() const { return m_bc; }
/// Get some information on the block queue.
BlockQueueStatus blockQueueStatus() const { return m_bq.status(); }
/// Get some information on the block queue.
HashChainStatus hashChainStatus() const;
// Mining stuff:

7
libethereum/CommonNet.h

@ -84,5 +84,12 @@ enum class Syncing
Done
};
struct HashChainStatus
{
unsigned total;
unsigned received;
bool estimated;
};
}
}

7
libethereum/DownloadMan.cpp

@ -80,7 +80,6 @@ HashDownloadSub::HashDownloadSub(HashDownloadMan& _man): m_man(&_man)
{
WriteGuard l(m_man->x_subs);
m_asked = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
m_attempted = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
m_man->m_subs.insert(this);
}
@ -98,7 +97,6 @@ void HashDownloadSub::resetFetch()
Guard l(m_fetch);
m_remaining = 0;
m_asked = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
m_attempted = RangeMask<unsigned>(m_man->m_chainStart, m_man->m_chainStart + m_man->m_chainCount);
}
unsigned HashDownloadSub::nextFetch(unsigned _n)
@ -110,10 +108,9 @@ unsigned HashDownloadSub::nextFetch(unsigned _n)
if (!m_man || m_man->chainEmpty())
return 0;
m_asked = (~(m_man->taken() + m_attempted)).lowest(_n);
m_asked = (~(m_man->taken())).lowest(_n);
if (m_asked.empty())
m_asked = (~(m_man->taken(true) + m_attempted)).lowest(_n);
m_attempted += m_asked;
m_asked = (~(m_man->taken(true))).lowest(_n);
return *m_asked.begin();
}

8
libethereum/DownloadMan.h

@ -187,7 +187,6 @@ public:
bool askedContains(unsigned _i) const { Guard l(m_fetch); return m_asked.contains(_i); }
RangeMask<unsigned> const& asked() const { return m_asked; }
RangeMask<unsigned> const& attemped() const { return m_attempted; }
private:
void resetFetch(); // Called by DownloadMan when we need to reset the download.
@ -196,7 +195,6 @@ private:
mutable Mutex m_fetch;
unsigned m_remaining;
RangeMask<unsigned> m_asked;
RangeMask<unsigned> m_attempted;
};
class HashDownloadMan
@ -255,6 +253,11 @@ public:
return m_got.full();
}
unsigned gotCount() const
{
return m_got.size();
}
size_t chainSize() const { ReadGuard l(m_lock); return m_chainCount; }
size_t chainEmpty() const { ReadGuard l(m_lock); return m_chainCount == 0; }
void foreachSub(std::function<void(HashDownloadSub const&)> const& _f) const { ReadGuard l(x_subs); for(auto i: m_subs) _f(*i); }
@ -274,3 +277,4 @@ private:
}
}

96
libethereum/EthereumHost.cpp

@ -237,7 +237,7 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash)
void EthereumHost::onPeerStatus(EthereumPeer* _peer)
{
Guard l(x_sync);
RecursiveGuard l(x_sync);
if (_peer->m_genesisHash != m_chain.genesisHash())
_peer->disable("Invalid genesis hash");
else if (_peer->m_protocolVersion != protocolVersion() && _peer->m_protocolVersion != c_oldProtocolVersion)
@ -252,12 +252,13 @@ void EthereumHost::onPeerStatus(EthereumPeer* _peer)
{
if (_peer->m_protocolVersion != protocolVersion())
estimatePeerHashes(_peer);
else if (_peer->m_latestBlockNumber > m_chain.number())
_peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number() + 1000;
else
_peer->m_expectedHashes = 1000;
if (m_hashMan.chainSize() < _peer->m_expectedHashes)
m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes);
{
if (_peer->m_latestBlockNumber > m_chain.number())
_peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number();
if (m_hashMan.chainSize() < _peer->m_expectedHashes)
m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes);
}
continueSync(_peer);
}
}
@ -267,7 +268,7 @@ void EthereumHost::estimatePeerHashes(EthereumPeer* _peer)
BlockInfo block = m_chain.info();
time_t lastBlockTime = (block.hash() == m_chain.genesisHash()) ? 1428192000 : (time_t)block.timestamp;
time_t now = time(0);
unsigned blockCount = 1000;
unsigned blockCount = 30000;
if (lastBlockTime > now)
clog(NetWarn) << "Clock skew? Latest block is in the future";
else
@ -285,7 +286,7 @@ void EthereumHost::noteRude(p2p::NodeId const& _id, std::string const& _client)
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
{
Guard l(x_sync);
RecursiveGuard l(x_sync);
assert(_peer->m_asking == Asking::Nothing);
onPeerHashes(_peer, _hashes, false);
}
@ -294,13 +295,23 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool
{
if (_hashes.empty())
{
onPeerDoneHashes(_peer, true);
_peer->m_hashSub.doneFetch();
continueSync();
return;
}
bool syncByNumber = _peer->m_syncHashNumber;
if (!syncByNumber && _peer->m_syncHash != m_syncingLatestHash)
{
// Obsolete hashes, discard
continueSync(_peer);
return;
}
unsigned knowns = 0;
unsigned unknowns = 0;
h256s neededBlocks;
bool syncByNumber = !m_syncingLatestHash;
unsigned firstNumber = _peer->m_syncHashNumber - _hashes.size();
for (unsigned i = 0; i < _hashes.size(); ++i)
{
_peer->addRating(1);
@ -330,8 +341,11 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool
}
else
knowns++;
if (!syncByNumber)
m_syncingLatestHash = h;
else
_peer->m_hashSub.noteHash(firstNumber + i, 1);
}
if (syncByNumber)
{
@ -377,13 +391,14 @@ void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain)
{
m_man.resetToChain(m_hashes);
m_hashes.clear();
m_hashMan.reset(m_chain.number() + 1);
}
continueSync();
}
void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
Guard l(x_sync);
RecursiveGuard l(x_sync);
assert(_peer->m_asking == Asking::Nothing);
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
@ -393,6 +408,7 @@ void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
// Got to this peer's latest block - just give up.
clog(NetNote) << "Finishing blocks fetch...";
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
_peer->m_sub.doneFetch();
_peer->setIdle();
return;
}
@ -459,7 +475,7 @@ void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
{
Guard l(x_sync);
RecursiveGuard l(x_sync);
if (isSyncing_UNSAFE())
{
clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading.";
@ -471,7 +487,7 @@ void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{
Guard l(x_sync);
RecursiveGuard l(x_sync);
if (isSyncing_UNSAFE())
{
clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading.";
@ -558,6 +574,17 @@ void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
}
}
void EthereumHost::onPeerAborting(EthereumPeer* _peer)
{
RecursiveGuard l(x_sync);
if (_peer->isSyncing())
{
_peer->setIdle();
_peer->setRude();
continueSync();
}
}
void EthereumHost::continueSync()
{
clog(NetAllDetail) << "Getting help with downloading hashes and blocks";
@ -571,22 +598,37 @@ void EthereumHost::continueSync()
void EthereumHost::continueSync(EthereumPeer* _peer)
{
assert(_peer->m_asking == Asking::Nothing);
bool otherPeerSync = false;
bool otherPeerV60Sync = false;
bool otherPeerV61Sync = false;
if (m_needSyncHashes && peerShouldGrabChain(_peer))
{
foreachPeer([&](EthereumPeer* _p)
{
if (_p != _peer && _p->m_asking == Asking::Hashes && _p->m_protocolVersion != protocolVersion())
otherPeerSync = true; // Already have a peer downloading hash chain with old protocol, do nothing
if (_p != _peer && _p->m_asking == Asking::Hashes)
{
if (_p->m_protocolVersion != protocolVersion())
otherPeerV60Sync = true; // Already have a peer downloading hash chain with old protocol, do nothing
else
otherPeerV61Sync = true; // Already have a peer downloading hash chain with V61+ protocol, join if supported
}
});
if (otherPeerSync)
if (otherPeerV60Sync && !m_hashes.empty())
{
/// Downloading from other peer with v60 protocol, nothing else we can do
_peer->setIdle();
return;
}
if (_peer->m_protocolVersion == protocolVersion() && !m_syncingLatestHash)
if (otherPeerV61Sync && _peer->m_protocolVersion != protocolVersion())
{
/// Downloading from other peer with v61+ protocol which this peer does not support,
_peer->setIdle();
return;
}
if (_peer->m_protocolVersion == protocolVersion() && !m_hashMan.isComplete())
{
m_syncingV61 = true;
_peer->requestHashes(); /// v61+ and not catching up to a particular hash
}
else
{
// Restart/continue sync in single peer mode
@ -595,7 +637,14 @@ void EthereumHost::continueSync(EthereumPeer* _peer)
m_syncingLatestHash =_peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
}
_peer->requestHashes(m_syncingLatestHash);
if (_peer->m_totalDifficulty >= m_syncingTotalDifficulty)
{
_peer->requestHashes(m_syncingLatestHash);
m_syncingV61 = false;
m_estimatedHashes = _peer->m_expectedHashes;
}
else
_peer->setIdle();
}
}
else if (m_needSyncBlocks && peerShouldGrabBlocks(_peer)) // Check if this peer can help with downloading blocks
@ -655,3 +704,12 @@ bool EthereumHost::isSyncing_UNSAFE() const
});
return syncing;
}
HashChainStatus EthereumHost::status()
{
RecursiveGuard l(x_sync);
if (m_syncingV61)
return HashChainStatus { static_cast<unsigned>(m_hashMan.chainSize()), static_cast<unsigned>(m_hashMan.gotCount()), false };
return HashChainStatus { m_estimatedHashes, static_cast<unsigned>(m_hashes.size()), true };
}

11
libethereum/EthereumHost.h

@ -70,7 +70,7 @@ public:
void reset();
DownloadMan const& downloadMan() const { return m_man; }
bool isSyncing() const { Guard l(x_sync); return isSyncing_UNSAFE(); }
bool isSyncing() const { RecursiveGuard l(x_sync); return isSyncing_UNSAFE(); }
bool isBanned(p2p::NodeId const& _id) const { return !!m_banned.count(_id); }
void noteRude(p2p::NodeId const& _id, std::string const& _client);
bool isRude(p2p::NodeId const& _id, std::string const& _client) const { return m_rudeClients.count(_client) && m_rudeNodes.count(_id); }
@ -84,10 +84,12 @@ public:
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has new hashes
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has another sequential block of hashes during sync
void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); ///< Called by peer when it has new transactions
void onPeerAborting(EthereumPeer* _peer); ///< Called by peer when it is disconnecting
DownloadMan& downloadMan() { return m_man; }
HashDownloadMan& hashDownloadMan() { return m_hashMan; }
BlockChain const& chain() { return m_chain; }
HashChainStatus status();
static unsigned const c_oldProtocolVersion;
@ -143,13 +145,14 @@ private:
bool m_newTransactions = false;
bool m_newBlocks = false;
mutable Mutex x_sync;
mutable RecursiveMutex x_sync;
bool m_needSyncHashes = true; ///< Indicates if need to downlad hashes
bool m_needSyncBlocks = true; ///< Indicates if we still need to download some blocks
h256 m_syncingLatestHash; ///< Latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty, as of the current sync.
h256s m_hashes; ///< List of hashes with unknown block numbers. Used for v60 chain downloading and catching up to a particular unknown
h256s m_hashes; ///< List of hashes with unknown block numbers. Used for PV60 chain downloading and catching up to a particular unknown
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
bool m_syncingV61 = false; ///< True if recent activity was over pv61+. Used for status reporting only.
std::unordered_set<p2p::NodeId> m_rudeNodes; ///< Nodes that were impolite while syncing. We avoid syncing from these if possible.
std::unordered_set<std::string> m_rudeClients; ///< Nodes that were impolite while syncing. We avoid syncing from these if possible.
};

13
libethereum/EthereumPeer.cpp

@ -54,9 +54,7 @@ EthereumPeer::~EthereumPeer()
void EthereumPeer::abortSync()
{
if (m_asking != Asking::Nothing)
setRude();
setIdle();
host()->onPeerAborting(this);
}
void EthereumPeer::setRude()
@ -115,6 +113,7 @@ void EthereumPeer::requestHashes()
{
assert(m_asking == Asking::Nothing);
m_syncHashNumber = m_hashSub.nextFetch(c_maxHashesAsk);
m_syncHash = h256();
setAsking(Asking::Hashes);
RLPStream s;
prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << c_maxHashesAsk;
@ -129,6 +128,8 @@ void EthereumPeer::requestHashes(h256 const& _lastHash)
RLPStream s;
prep(s, GetBlockHashesPacket, 2) << _lastHash << c_maxHashesAsk;
clog(NetMessageDetail) << "Requesting block hashes staring from " << _lastHash;
m_syncHash = _lastHash;
m_syncHashNumber = 0;
sealAndSend(s);
}
@ -250,12 +251,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
setAsking(Asking::Nothing);
h256s hashes(itemCount);
for (unsigned i = 0; i < itemCount; ++i)
{
hashes[i] = _r[i].toHash<h256>();
m_hashSub.noteHash(m_syncHashNumber + i, 1);
}
m_syncHashNumber += itemCount;
if (m_syncHashNumber > 0)
m_syncHashNumber += itemCount;
host()->onPeerHashes(this, hashes);
break;
}

5
libethereum/EthereumPeer.h

@ -134,8 +134,9 @@ private:
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.
unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer.
unsigned m_syncHashNumber = 0; ///< Number of latest hash we sync to
unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer.
unsigned m_syncHashNumber = 0; ///< Number of latest hash we sync to (PV61+)
h256 m_syncHash; ///< Latest hash we sync to (PV60)
/// Once we're asking for blocks, this becomes in use.
DownloadSub m_sub;

Loading…
Cancel
Save