Browse Source

Merge pull request #2514 from arkpar/bc

Blockchain sync minor fixes
cl-refactor
Gav Wood 10 years ago
parent
commit
9dd9d3169f
  1. 47
      libethereum/BlockChainSync.cpp
  2. 1
      libethereum/BlockChainSync.h
  3. 25
      libethereum/EthereumHost.cpp
  4. 2
      libethereum/EthereumHost.h
  5. 4
      libethereum/EthereumPeer.cpp

47
libethereum/BlockChainSync.cpp

@ -300,7 +300,7 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP con
u256 totalDifficulty = _r[1].toInt<u256>(); u256 totalDifficulty = _r[1].toInt<u256>();
if (totalDifficulty > _peer->m_totalDifficulty) if (totalDifficulty > _peer->m_totalDifficulty)
{ {
clog(NetMessageDetail) << "Received block with no known parent. Resyncing..."; clog(NetMessageDetail) << "Received block with no known parent. Peer needs syncing...";
resetSyncFor(_peer, h, totalDifficulty); resetSyncFor(_peer, h, totalDifficulty);
} }
break; break;
@ -648,12 +648,10 @@ void PV60Sync::noteDoneBlocks(std::shared_ptr<EthereumPeer> _peer, bool _clemenc
clog(NetNote) << "Chain download failed. Aborted while incomplete."; clog(NetNote) << "Chain download failed. Aborted while incomplete.";
else else
{ {
// Done our chain-get. // This can happen when the leading peer aborts and the one that is selected instead does not have all the blocks.
clog(NetWarn) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; // Just stop syncing to this peer. Sync will restart if there are no more peers to sync with.
clog(NetWarn) << downloadMan().remaining(); clog(NetNote) << "Peer does not have required blocks";
clog(NetWarn) << "WOULD BAN."; resetNeedsSyncing(_peer);
// m_banned.insert(_peer->session()->id()); // We know who you are!
// _peer->disable("Peer sent hashes but was unable to provide the blocks.");
} }
resetSync(); resetSync();
downloadMan().reset(); downloadMan().reset();
@ -747,7 +745,7 @@ void PV60Sync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const&
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer))) if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer)))
{ {
clog(NetMessageSummary) << "Ignoring since we're already downloading."; clog(NetMessageDetail) << "Ignoring new hashes since we're already downloading.";
return; return;
} }
clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help."; clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help.";
@ -838,7 +836,7 @@ void PV60Sync::onPeerAborting()
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet. // Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
if (m_syncer.expired() && m_state != SyncState::Idle) if (m_syncer.expired() && m_state != SyncState::Idle)
{ {
clog(NetWarn) << "Syncing peer disconnected, restarting sync"; clog(NetNote) << "Syncing peer disconnected";
m_syncer.reset(); m_syncer.reset();
abortSync(); abortSync();
} }
@ -969,11 +967,17 @@ void PV61Sync::completeSubchain(std::shared_ptr<EthereumPeer> _peer, unsigned _n
{ {
//Done chain-get //Done chain-get
m_syncingNeededBlocks.clear(); m_syncingNeededBlocks.clear();
// Add hashes to download skipping onces that are already downloaded
for (auto h = m_completeChainMap.rbegin(); h != m_completeChainMap.rend(); ++h) for (auto h = m_completeChainMap.rbegin(); h != m_completeChainMap.rend(); ++h)
m_syncingNeededBlocks.insert(m_syncingNeededBlocks.end(), h->second.hashes.begin(), h->second.hashes.end()); if (!host().chain().isKnown(h->second.hashes.front()) && !host().chain().isKnown(h->second.hashes.back()))
m_completeChainMap.clear(); {
m_knownHashes.clear(); if (host().bq().blockStatus(h->second.hashes.front()) == QueueStatus::Unknown || host().bq().blockStatus(h->second.hashes.back()) == QueueStatus::Unknown)
m_syncingBlockNumber = 0; m_syncingNeededBlocks.insert(m_syncingNeededBlocks.end(), h->second.hashes.begin(), h->second.hashes.end());
else
for (h256 const& hash: h->second.hashes)
if (!host().chain().isKnown(hash) && host().bq().blockStatus(hash) == QueueStatus::Unknown)
m_syncingNeededBlocks.insert(m_syncingNeededBlocks.end(), hash);
}
transition(syncer, SyncState::Blocks); transition(syncer, SyncState::Blocks);
} }
else else
@ -1007,6 +1011,7 @@ void PV61Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _h
{ {
// End of hash chain, add last chunk to download // End of hash chain, add last chunk to download
m_readyChainMap.insert(make_pair(m_syncingBlockNumber, SubChain{ h256s{ _peer->m_latestHash }, _peer->m_latestHash })); m_readyChainMap.insert(make_pair(m_syncingBlockNumber, SubChain{ h256s{ _peer->m_latestHash }, _peer->m_latestHash }));
m_knownHashes.insert(_peer->m_latestHash);
m_hashScanComplete = true; m_hashScanComplete = true;
_peer->m_syncHashNumber = 0; _peer->m_syncHashNumber = 0;
requestSubchain(_peer); requestSubchain(_peer);
@ -1032,7 +1037,13 @@ void PV61Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _h
if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber) if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber)
{ {
// Got new subchain marker // Got new subchain marker
assert(_hashes.size() == 1); if (_hashes.size() != 1)
{
clog(NetWarn) << "Peer sent too many hashes";
_peer->disable("Too many hashes");
restartSync();
return;
}
m_knownHashes.insert(_hashes[0]); m_knownHashes.insert(_hashes[0]);
m_readyChainMap.insert(make_pair(m_syncingBlockNumber, SubChain{ h256s{ _hashes[0] }, _hashes[0] })); m_readyChainMap.insert(make_pair(m_syncingBlockNumber, SubChain{ h256s{ _hashes[0] }, _hashes[0] }));
if ((m_readyChainMap.size() + m_downloadingChainMap.size() + m_completeChainMap.size()) * c_hashSubchainSize > _peer->m_expectedHashes) if ((m_readyChainMap.size() + m_downloadingChainMap.size() + m_completeChainMap.size()) * c_hashSubchainSize > _peer->m_expectedHashes)
@ -1186,6 +1197,14 @@ bool PV61Sync::isPV61Syncing() const
return m_syncingBlockNumber != 0; return m_syncingBlockNumber != 0;
} }
void PV61Sync::completeSync()
{
m_completeChainMap.clear();
m_knownHashes.clear();
m_syncingBlockNumber = 0;
PV60Sync::completeSync();
}
bool PV61Sync::invariants() const bool PV61Sync::invariants() const
{ {
if (m_state == SyncState::Hashes) if (m_state == SyncState::Hashes)

1
libethereum/BlockChainSync.h

@ -292,6 +292,7 @@ public:
protected: protected:
void restartSync() override; void restartSync() override;
void completeSync() override;
void requestSubchain(std::shared_ptr<EthereumPeer> _peer) override; void requestSubchain(std::shared_ptr<EthereumPeer> _peer) override;
void syncHashes(std::shared_ptr<EthereumPeer> _peer) override; void syncHashes(std::shared_ptr<EthereumPeer> _peer) override;
void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) override; void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) override;

25
libethereum/EthereumHost.cpp

@ -84,7 +84,7 @@ bool EthereumHost::ensureInitialised()
void EthereumHost::reset() void EthereumHost::reset()
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
if (m_sync) if (m_sync)
m_sync->abortSync(); m_sync->abortSync();
m_sync.reset(); m_sync.reset();
@ -118,7 +118,7 @@ void EthereumHost::doWork()
if (m_syncStart) if (m_syncStart)
{ {
DEV_GUARDED(x_sync) DEV_RECURSIVE_GUARDED(x_sync)
if (!m_sync) if (!m_sync)
{ {
time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now()); time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
@ -288,46 +288,41 @@ BlockChainSync* EthereumHost::sync()
void EthereumHost::onPeerStatus(std::shared_ptr<EthereumPeer> _peer) void EthereumHost::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
if (sync()) if (sync())
sync()->onPeerStatus(_peer); sync()->onPeerStatus(_peer);
} }
void EthereumHost::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) void EthereumHost::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
if (sync()) if (sync())
sync()->onPeerHashes(_peer, _hashes); sync()->onPeerHashes(_peer, _hashes);
} }
void EthereumHost::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r) void EthereumHost::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
if (sync()) if (sync())
sync()->onPeerBlocks(_peer, _r); sync()->onPeerBlocks(_peer, _r);
} }
void EthereumHost::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) void EthereumHost::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
if (sync()) if (sync())
sync()->onPeerNewHashes(_peer, _hashes); sync()->onPeerNewHashes(_peer, _hashes);
} }
void EthereumHost::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r) void EthereumHost::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
if (sync()) if (sync())
sync()->onPeerNewBlock(_peer, _r); sync()->onPeerNewBlock(_peer, _r);
} }
void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r) void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{ {
if (_peer->isCriticalSyncing())
{
clog(EthereumHostTrace) << "Ignoring transaction from peer we are syncing with";
return;
}
unsigned itemCount = _r.itemCount(); unsigned itemCount = _r.itemCount();
clog(EthereumHostTrace) << "Transactions (" << dec << itemCount << "entries)"; clog(EthereumHostTrace) << "Transactions (" << dec << itemCount << "entries)";
m_tq.enqueue(_r, _peer->session()->id()); m_tq.enqueue(_r, _peer->session()->id());
@ -335,7 +330,7 @@ void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP c
void EthereumHost::onPeerAborting() void EthereumHost::onPeerAborting()
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
try try
{ {
if (m_sync) if (m_sync)
@ -349,7 +344,7 @@ void EthereumHost::onPeerAborting()
bool EthereumHost::isSyncing() const bool EthereumHost::isSyncing() const
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
if (!m_sync) if (!m_sync)
return false; return false;
return m_sync->isSyncing(); return m_sync->isSyncing();
@ -357,7 +352,7 @@ bool EthereumHost::isSyncing() const
SyncStatus EthereumHost::status() const SyncStatus EthereumHost::status() const
{ {
Guard l(x_sync); RecursiveGuard l(x_sync);
if (!m_sync) if (!m_sync)
return SyncStatus(); return SyncStatus();
return m_sync->status(); return m_sync->status();

2
libethereum/EthereumHost.h

@ -134,7 +134,7 @@ private:
bool m_newTransactions = false; bool m_newTransactions = false;
bool m_newBlocks = false; bool m_newBlocks = false;
mutable Mutex x_sync; mutable RecursiveMutex x_sync;
mutable Mutex x_transactions; mutable Mutex x_transactions;
DownloadMan m_man; DownloadMan m_man;
std::unique_ptr<BlockChainSync> m_sync; std::unique_ptr<BlockChainSync> m_sync;

4
libethereum/EthereumPeer.cpp

@ -145,7 +145,7 @@ void EthereumPeer::requestHashes(u256 _number, unsigned _count)
setAsking(Asking::Hashes); setAsking(Asking::Hashes);
RLPStream s; RLPStream s;
prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << _count; prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << _count;
clog(NetMessageDetail) << "Requesting block hashes for numbers " << m_syncHashNumber << "-" << m_syncHashNumber + c_maxHashesAsk - 1; clog(NetMessageDetail) << "Requesting block hashes for numbers " << m_syncHashNumber << "-" << m_syncHashNumber + _count - 1;
sealAndSend(s); sealAndSend(s);
} }
@ -298,7 +298,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
if (m_asking != Asking::Hashes) if (m_asking != Asking::Hashes)
{ {
clog(NetWarn) << "Peer giving us hashes when we didn't ask for them."; clog(NetAllDetail) << "Peer giving us hashes when we didn't ask for them.";
break; break;
} }
setIdle(); setIdle();

Loading…
Cancel
Save