Browse Source

fixed a race condition on peer aborting, style

cl-refactor
arkpar 9 years ago
parent
commit
3b2f8f21ea
  1. 41
      libethereum/BlockChainSync.cpp
  2. 21
      libethereum/BlockChainSync.h

41
libethereum/BlockChainSync.cpp

@ -52,6 +52,7 @@ BlockChainSync::BlockChainSync(EthereumHost& _host):
BlockChainSync::~BlockChainSync()
{
RecursiveGuard l(x_sync);
abortSync();
}
@ -67,8 +68,10 @@ DownloadMan& BlockChainSync::downloadMan()
void BlockChainSync::abortSync()
{
DEV_INVARIANT_CHECK;
host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; });
downloadMan().resetToChain(h256s());
DEV_INVARIANT_CHECK;
}
void BlockChainSync::onPeerStatus(EthereumPeer* _peer)
@ -87,14 +90,14 @@ void BlockChainSync::onPeerStatus(EthereumPeer* _peer)
_peer->disable("Peer banned for previous bad behaviour.");
else
{
unsigned estimatedHashes = estimateHashes();
_peer->m_expectedHashes = estimatedHashes;
unsigned hashes = estimatedHashes();
_peer->m_expectedHashes = hashes;
onNewPeer(_peer);
}
DEV_INVARIANT_CHECK;
}
unsigned BlockChainSync::estimateHashes()
unsigned BlockChainSync::estimatedHashes() const
{
BlockInfo block = host().chain().info();
time_t lastBlockTime = (block.hash() == host().chain().genesisHash()) ? 1428192000 : (time_t)block.timestamp;
@ -113,7 +116,6 @@ void BlockChainSync::requestBlocks(EthereumPeer* _peer)
if (host().bq().knownFull())
{
clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
m_lastActiveState = m_state;
pauseSync();
_peer->setIdle();
return;
@ -233,10 +235,8 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
if (downloadMan().isComplete())
completeSync();
else if (!got)
requestBlocks(_peer);
else
peerDoneBlocks(_peer);
requestBlocks(_peer); // Some of the blocks might have been downloaded by helping peers, proceed anyway
}
DEV_INVARIANT_CHECK;
}
@ -478,12 +478,12 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
clog(NetWarn) << "Invalid state transition:" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : "");
}
void PV60Sync::resetSyncFor(EthereumPeer* _peer, h256 _latestHash, u256 _td)
void PV60Sync::resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td)
{
setNeedsSyncing(_peer, _latestHash, _td);
}
void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td)
void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td)
{
_peer->m_latestHash = _latestHash;
_peer->m_totalDifficulty = _td;
@ -636,7 +636,9 @@ void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
// m_banned.insert(_peer->session()->id()); // We know who you are!
// _peer->disable("Peer sent hashes but was unable to provide the blocks.");
}
resetSync();
downloadMan().reset();
transition(_peer, SyncState::Idle);
}
_peer->m_sub.doneFetch();
}
@ -648,7 +650,7 @@ void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
_peer->setIdle();
if (!isSyncing(_peer))
{
clog(NetMessageSummary) << "Ignoring hashes synce not syncing";
clog(NetMessageSummary) << "Ignoring hashes since not syncing";
return;
}
if (_hashes.size() == 0)
@ -705,9 +707,10 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
clog(NetMessageSummary) << "Ignoring since we're already downloading.";
return;
}
clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help.";
unsigned knowns = 0;
unsigned unknowns = 0;
for (auto h: _hashes)
for (auto const& h: _hashes)
{
_peer->addRating(1);
DEV_GUARDED(_peer->x_knownBlocks)
@ -741,6 +744,7 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
void PV60Sync::abortSync(EthereumPeer* _peer)
{
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
if (isSyncing(_peer))
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
@ -751,6 +755,8 @@ void PV60Sync::abortSync(EthereumPeer* _peer)
void PV60Sync::onPeerAborting(EthereumPeer* _peer)
{
RecursiveGuard l(x_sync);
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
abortSync(_peer);
DEV_INVARIANT_CHECK;
}
@ -767,6 +773,10 @@ bool PV60Sync::invariants() const
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; });
if (!hashes)
return false;
if (!m_syncingLatestHash)
return false;
if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash))
return false;
}
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
@ -777,5 +787,14 @@ bool PV60Sync::invariants() const
if (downloadMan().isComplete())
return false;
}
if (m_state == SyncState::Idle)
{
bool busy = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; });
if (busy)
return false;
}
if (m_state == SyncState::Waiting && !host().bq().isActive())
return false;
return true;
}

21
libethereum/BlockChainSync.h

@ -103,29 +103,27 @@ protected:
virtual void pauseSync() = 0;
/// Restart sync for given peer
virtual void resetSyncFor(EthereumPeer* _peer, h256 _latestHash, u256 _td) = 0;
virtual void resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td) = 0;
EthereumHost& host() { return m_host; }
EthereumHost const& host() const { return m_host; }
/// Estimates max number of hashes peers can give us.
unsigned estimateHashes();
unsigned estimatedHashes() const;
/// Request blocks from peer if needed
void requestBlocks(EthereumPeer* _peer);
Handler m_bqRoomAvailable; ///< Triggered once block queue
mutable RecursiveMutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
bool invariants() const override = 0;
EthereumHost& m_host;
HashDownloadMan m_hashMan;
protected:
Handler m_bqRoomAvailable;
mutable RecursiveMutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state
SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
};
@ -153,13 +151,14 @@ public:
/// @returns Sync status
SyncStatus status() const override;
protected:
void onNewPeer(EthereumPeer* _peer) override;
void continueSync() override;
void peerDoneBlocks(EthereumPeer* _peer) override;
void restartSync() override;
void completeSync() override;
void pauseSync() override;
void resetSyncFor(EthereumPeer* _peer, h256 _latestHash, u256 _td) override;
void resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td) override;
private:
/// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer
@ -169,7 +168,7 @@ private:
void resetNeedsSyncing(EthereumPeer* _peer) { setNeedsSyncing(_peer, h256(), 0); }
/// Update peer syncing requirements state.
void setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td);
void setNeedsSyncing(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td);
/// Do we presently need syncing with this peer?
bool needsSyncing(EthereumPeer* _peer) const;

Loading…
Cancel
Save