Browse Source

Merge remote-tracking branch 'upstream/develop' into updateTestPath

cl-refactor
CJentzsch 10 years ago
parent
commit
ce75f125f1
  1. 35
      libethereum/BlockChainSync.cpp
  2. 77
      libethereum/BlockChainSync.h

35
libethereum/BlockChainSync.cpp

@ -52,6 +52,7 @@ BlockChainSync::BlockChainSync(EthereumHost& _host):
BlockChainSync::~BlockChainSync() BlockChainSync::~BlockChainSync()
{ {
RecursiveGuard l(x_sync);
abortSync(); abortSync();
} }
@ -67,8 +68,10 @@ DownloadMan& BlockChainSync::downloadMan()
void BlockChainSync::abortSync() void BlockChainSync::abortSync()
{ {
DEV_INVARIANT_CHECK;
host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; }); host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; });
downloadMan().resetToChain(h256s()); downloadMan().resetToChain(h256s());
DEV_INVARIANT_CHECK;
} }
void BlockChainSync::onPeerStatus(EthereumPeer* _peer) void BlockChainSync::onPeerStatus(EthereumPeer* _peer)
@ -87,14 +90,14 @@ void BlockChainSync::onPeerStatus(EthereumPeer* _peer)
_peer->disable("Peer banned for previous bad behaviour."); _peer->disable("Peer banned for previous bad behaviour.");
else else
{ {
unsigned estimatedHashes = estimateHashes(); unsigned hashes = estimatedHashes();
_peer->m_expectedHashes = estimatedHashes; _peer->m_expectedHashes = hashes;
onNewPeer(_peer); onNewPeer(_peer);
} }
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
unsigned BlockChainSync::estimateHashes() const unsigned BlockChainSync::estimatedHashes() const
{ {
BlockInfo block = host().chain().info(); BlockInfo block = host().chain().info();
time_t lastBlockTime = (block.hash() == host().chain().genesisHash()) ? 1428192000 : (time_t)block.timestamp; time_t lastBlockTime = (block.hash() == host().chain().genesisHash()) ? 1428192000 : (time_t)block.timestamp;
@ -113,7 +116,6 @@ void BlockChainSync::requestBlocks(EthereumPeer* _peer)
if (host().bq().knownFull()) if (host().bq().knownFull())
{ {
clog(NetAllDetail) << "Waiting for block queue before downloading blocks"; clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
m_lastActiveState = m_state;
pauseSync(); pauseSync();
_peer->setIdle(); _peer->setIdle();
return; return;
@ -233,10 +235,8 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{ {
if (downloadMan().isComplete()) if (downloadMan().isComplete())
completeSync(); completeSync();
else if (!got)
requestBlocks(_peer);
else else
peerDoneBlocks(_peer); requestBlocks(_peer); // Some of the blocks might have been downloaded by helping peers, proceed anyway
} }
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
@ -636,7 +636,9 @@ void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
// m_banned.insert(_peer->session()->id()); // We know who you are! // m_banned.insert(_peer->session()->id()); // We know who you are!
// _peer->disable("Peer sent hashes but was unable to provide the blocks."); // _peer->disable("Peer sent hashes but was unable to provide the blocks.");
} }
resetSync();
downloadMan().reset(); downloadMan().reset();
transition(_peer, SyncState::Idle);
} }
_peer->m_sub.doneFetch(); _peer->m_sub.doneFetch();
} }
@ -648,7 +650,7 @@ void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
_peer->setIdle(); _peer->setIdle();
if (!isSyncing(_peer)) if (!isSyncing(_peer))
{ {
clog(NetMessageSummary) << "Ignoring hashes synce not syncing"; clog(NetMessageSummary) << "Ignoring hashes since not syncing";
return; return;
} }
if (_hashes.size() == 0) if (_hashes.size() == 0)
@ -705,6 +707,7 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
clog(NetMessageSummary) << "Ignoring since we're already downloading."; clog(NetMessageSummary) << "Ignoring since we're already downloading.";
return; return;
} }
clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help.";
unsigned knowns = 0; unsigned knowns = 0;
unsigned unknowns = 0; unsigned unknowns = 0;
for (auto const& h: _hashes) for (auto const& h: _hashes)
@ -741,6 +744,7 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
void PV60Sync::abortSync(EthereumPeer* _peer) void PV60Sync::abortSync(EthereumPeer* _peer)
{ {
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
if (isSyncing(_peer)) if (isSyncing(_peer))
{ {
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; }); host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
@ -751,6 +755,8 @@ void PV60Sync::abortSync(EthereumPeer* _peer)
void PV60Sync::onPeerAborting(EthereumPeer* _peer) void PV60Sync::onPeerAborting(EthereumPeer* _peer)
{ {
RecursiveGuard l(x_sync);
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
abortSync(_peer); abortSync(_peer);
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
@ -767,6 +773,10 @@ bool PV60Sync::invariants() const
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; });
if (!hashes) if (!hashes)
return false; return false;
if (!m_syncingLatestHash)
return false;
if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash))
return false;
} }
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{ {
@ -777,5 +787,14 @@ bool PV60Sync::invariants() const
if (downloadMan().isComplete()) if (downloadMan().isComplete())
return false; return false;
} }
if (m_state == SyncState::Idle)
{
bool busy = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; });
if (busy)
return false;
}
if (m_state == SyncState::Waiting && !host().bq().isActive())
return false;
return true; return true;
} }

77
libethereum/BlockChainSync.h

@ -109,16 +109,15 @@ protected:
EthereumHost const& host() const { return m_host; } EthereumHost const& host() const { return m_host; }
/// Estimates max number of hashes peers can give us. /// Estimates max number of hashes peers can give us.
unsigned estimateHashes() const; unsigned estimatedHashes() const;
/// Request blocks from peer if needed /// Request blocks from peer if needed
void requestBlocks(EthereumPeer* _peer); void requestBlocks(EthereumPeer* _peer);
protected: protected:
Handler m_bqRoomAvailable; Handler m_bqRoomAvailable; ///< Triggered once block queue
mutable RecursiveMutex x_sync; mutable RecursiveMutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state SyncState m_state = SyncState::Idle; ///< Current sync state
SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
private: private:
@ -133,6 +132,70 @@ private:
* @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete * @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete
* Syncs to peers and keeps up to date * Syncs to peers and keeps up to date
*/ */
/**
* Transitions:
*
* Idle->Hashes
* Triggered when:
* * A new peer appears that we can sync to
* * Transtition to Idle, there are peers we can sync to
* Effects:
* * Set chain sync (m_syncingTotalDifficulty, m_syncingLatestHash, m_syncer)
* * Requests hashes from m_syncer
*
* Hashes->Idle
* Triggered when:
* * Received too many hashes
* * Received 0 total hashes from m_syncer
* * m_syncer aborts
* Effects:
* In case of too many hashes sync is reset
*
* Hashes->Blocks
* Triggered when:
* * Received known hash from m_syncer
* * Received 0 hashes from m_syncer and m_syncingTotalBlocks not empty
* Effects:
* * Set up download manager, clear m_syncingTotalBlocks. Set all peers to help with downloading if they can
*
* Blocks->Idle
* Triggered when:
* * m_syncer aborts
* * m_syncer does not have required block
* * All blocks downloaded
* * Block qeueue is full with unknown blocks
* Effects:
* * Download manager is reset
*
* Blocks->Waiting
* Triggered when:
* * Block queue is full with known blocks
* Effects:
* * Stop requesting blocks from peers
*
* Waiting->Blocks
* Triggered when:
* * Block queue has space for new blocks
* Effects:
* * Continue requesting blocks from peers
*
* Idle->NewBlocks
* Triggered when:
* * New block hashes arrive
* Effects:
* * Set up download manager, clear m_syncingTotalBlocks. Download blocks from a single peer. If downloaded blocks have unknown parents, set the peer to sync
*
* NewBlocks->Idle
* Triggered when:
* * m_syncer aborts
* * m_syncer does not have required block
* * All new blocks downloaded
* * Block qeueue is full with unknown blocks
* Effects:
* * Download manager is reset
*
*/
class PV60Sync: public BlockChainSync class PV60Sync: public BlockChainSync
{ {
public: public:
@ -153,6 +216,7 @@ public:
/// @returns Sync status /// @returns Sync status
SyncStatus status() const override; SyncStatus status() const override;
protected:
void onNewPeer(EthereumPeer* _peer) override; void onNewPeer(EthereumPeer* _peer) override;
void continueSync() override; void continueSync() override;
void peerDoneBlocks(EthereumPeer* _peer) override; void peerDoneBlocks(EthereumPeer* _peer) override;
@ -205,9 +269,10 @@ private:
h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer. h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer. h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync. h256 m_syncingLatestHash; ///< Latest block's hash of the peer we are syncing to, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync. u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync.
EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr // TODO: switch to weak_ptr
EthereumPeer* m_syncer = nullptr; ///< Peer we are currently syncing with
}; };
} }
} }

Loading…
Cancel
Save