@ -38,7 +38,8 @@ using namespace dev;
using namespace dev : : eth ;
using namespace p2p ;
unsigned const c_chainReorgSize = 30000 ;
unsigned const c_chainReorgSize = 30000 ; /// Added to estimated hashes to account for potential chain reorganiation
unsigned const c_hashSubchainSize = 8192 ; /// PV61 subchain size
BlockChainSync : : BlockChainSync ( EthereumHost & _host ) :
m_host ( _host )
@ -75,15 +76,18 @@ void BlockChainSync::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{
RecursiveGuard l ( x_sync ) ;
DEV_INVARIANT_CHECK ;
std : : shared_ptr < Session > session = _peer - > session ( ) ;
if ( ! session )
return ; // Expired
if ( _peer - > m_genesisHash ! = host ( ) . chain ( ) . genesisHash ( ) )
_peer - > disable ( " Invalid genesis hash " ) ;
else if ( _peer - > m_protocolVersion ! = host ( ) . protocolVersion ( ) & & _peer - > m_protocolVersion ! = EthereumHost : : c_oldProtocolVersion )
_peer - > disable ( " Invalid protocol version. " ) ;
else if ( _peer - > m_networkId ! = host ( ) . networkId ( ) )
_peer - > disable ( " Invalid network identifier. " ) ;
else if ( _peer - > session ( ) - > info ( ) . clientVersion . find ( " /v0.7.0/ " ) ! = string : : npos )
else if ( session - > info ( ) . clientVersion . find ( " /v0.7.0/ " ) ! = string : : npos )
_peer - > disable ( " Blacklisted client version. " ) ;
else if ( host ( ) . isBanned ( _peer - > session ( ) - > id ( ) ) )
else if ( host ( ) . isBanned ( session - > id ( ) ) )
_peer - > disable ( " Peer banned for previous bad behaviour. " ) ;
else
{
@ -91,7 +95,6 @@ void BlockChainSync::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
_peer - > m_expectedHashes = hashes ;
onNewPeer ( _peer ) ;
}
DEV_INVARIANT_CHECK ;
}
unsigned BlockChainSync : : estimatedHashes ( ) const
@ -114,7 +117,6 @@ void BlockChainSync::requestBlocks(std::shared_ptr<EthereumPeer> _peer)
{
clog ( NetAllDetail ) < < " Waiting for block queue before downloading blocks " ;
pauseSync ( ) ;
_peer - > setIdle ( ) ;
return ;
}
_peer - > requestBlocks ( ) ;
@ -137,13 +139,14 @@ void BlockChainSync::logNewBlock(h256 const& _h)
void BlockChainSync : : onPeerBlocks ( std : : shared_ptr < EthereumPeer > _peer , RLP const & _r )
{
RecursiveGuard l ( x_sync ) ;
DEV_INVARIANT_CHECK ;
unsigned itemCount = _r . itemCount ( ) ;
clog ( NetMessageSummary ) < < " Blocks ( " < < dec < < itemCount < < " entries) " < < ( itemCount ? " " : " : NoMoreBlocks " ) ;
_peer - > setIdle ( ) ;
if ( m_state ! = SyncState : : Blocks & & m_state ! = SyncState : : NewBlocks & & m_state ! = SyncState : : Waiting )
clog ( NetWarn ) < < " Unexpected Blocks received! " ;
{
clog ( NetMessageSummary ) < < " Ignoring unexpected blocks " ;
return ;
}
if ( m_state = = SyncState : : Waiting )
{
clog ( NetAllDetail ) < < " Ignored blocks while waiting " ;
@ -184,6 +187,7 @@ void BlockChainSync::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const
case ImportResult : : BadChain :
logNewBlock ( h ) ;
_peer - > disable ( " Malformed block received. " ) ;
restartSync ( ) ;
return ;
case ImportResult : : FutureTimeKnown :
@ -282,7 +286,7 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP con
case ImportResult : : FutureTimeUnknown :
case ImportResult : : UnknownParent :
logNewBlock ( h ) ;
clog ( NetMessageSummary ) < < " Received block with no known parent. Resyncing... " ;
clog ( NetMessageDetail ) < < " Received block with no known parent. Resyncing... " ;
resetSyncFor ( _peer , h , _r [ 1 ] . toInt < u256 > ( ) ) ;
break ;
default : ;
@ -291,7 +295,6 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP con
DEV_GUARDED ( _peer - > x_knownBlocks )
_peer - > m_knownBlocks . insert ( h ) ;
}
DEV_INVARIANT_CHECK ;
}
PV60Sync : : PV60Sync ( EthereumHost & _host ) :
@ -305,6 +308,7 @@ SyncStatus PV60Sync::status() const
RecursiveGuard l ( x_sync ) ;
SyncStatus res ;
res . state = m_state ;
res . protocolVersion = EthereumHost : : c_oldProtocolVersion ;
if ( m_state = = SyncState : : Hashes )
{
res . hashesTotal = m_estimatedHashes ;
@ -344,26 +348,30 @@ void PV60Sync::restartSync()
{
resetSync ( ) ;
host ( ) . bq ( ) . clear ( ) ;
if ( isSyncing ( ) )
transition ( m_syncer . lock ( ) , SyncState : : Idle ) ;
std : : shared_ptr < EthereumPeer > syncer = m_syncer . lock ( ) ;
if ( syncer )
transition ( syncer , SyncState : : Idle ) ;
}
void PV60Sync : : completeSync ( )
{
if ( isSyncing ( ) )
transition ( m_syncer . lock ( ) , SyncState : : Idle ) ;
std : : shared_ptr < EthereumPeer > syncer = m_syncer . lock ( ) ;
if ( syncer )
transition ( syncer , SyncState : : Idle ) ;
}
void PV60Sync : : pauseSync ( )
{
if ( isSyncing ( ) )
setState ( m_syncer . lock ( ) , SyncState : : Waiting , true ) ;
std : : shared_ptr < EthereumPeer > syncer = m_syncer . lock ( ) ;
if ( syncer )
transition ( syncer , SyncState : : Waiting , true ) ;
}
void PV60Sync : : continueSync ( )
{
if ( isSyncing ( ) )
transition ( m_syncer . lock ( ) , SyncState : : Blocks ) ;
std : : shared_ptr < EthereumPeer > syncer = m_syncer . lock ( ) ;
if ( syncer )
transition ( syncer , SyncState : : Blocks ) ;
}
void PV60Sync : : onNewPeer ( std : : shared_ptr < EthereumPeer > _peer )
@ -381,26 +389,10 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
RLPStream s ;
if ( _s = = SyncState : : Hashes )
{
if ( m_state = = SyncState : : Idle )
{
if ( isSyncing ( _peer ) )
clog ( NetWarn ) < < " Bad state: not asking for Hashes, yet syncing! " ;
m_syncingLatestHash = _peer - > m_latestHash ;
m_syncingTotalDifficulty = _peer - > m_totalDifficulty ;
setState ( _peer , _s , true ) ;
_peer - > requestHashes ( m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash ) ;
DEV_INVARIANT_CHECK ;
return ;
}
else if ( m_state = = SyncState : : Hashes )
if ( m_state = = SyncState : : Idle | | m_state = = SyncState : : Hashes )
{
if ( ! isSyncing ( _peer ) )
clog ( NetWarn ) < < " Bad state: asking for Hashes yet not syncing! " ;
setState ( _peer , _s , true ) ;
_peer - > requestHashes ( m_syncingLastReceivedHash ) ;
DEV_INVARIANT_CHECK ;
m_estimatedHashes = _peer - > m_expectedHashes - c_chainReorgSize ;
syncHashes ( _peer ) ;
return ;
}
}
@ -462,7 +454,6 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
}
else if ( _s = = SyncState : : Idle )
{
host ( ) . foreachPeer ( [ this ] ( std : : shared_ptr < EthereumPeer > _p ) { _p - > setIdle ( ) ; return true ; } ) ;
if ( m_state = = SyncState : : Blocks | | m_state = = SyncState : : NewBlocks )
{
clog ( NetMessageDetail ) < < " Finishing blocks fetch... " ;
@ -473,7 +464,6 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
_peer - > m_sub . doneFetch ( ) ;
_peer - > setIdle ( ) ;
setState ( _peer , SyncState : : Idle , false ) ;
}
else if ( m_state = = SyncState : : Hashes )
@ -502,7 +492,9 @@ void PV60Sync::setNeedsSyncing(std::shared_ptr<EthereumPeer> _peer, h256 const&
if ( _peer - > m_latestHash )
noteNeedsSyncing ( _peer ) ;
_peer - > session ( ) - > addNote ( " sync " , string ( isSyncing ( _peer ) ? " ongoing " : " holding " ) + ( needsSyncing ( _peer ) ? " & needed " : " " ) ) ;
shared_ptr < Session > session = _peer - > session ( ) ;
if ( session )
session - > addNote ( " sync " , string ( isSyncing ( _peer ) ? " ongoing " : " holding " ) + ( needsSyncing ( _peer ) ? " & needed " : " " ) ) ;
}
bool PV60Sync : : needsSyncing ( std : : shared_ptr < EthereumPeer > _peer ) const
@ -562,7 +554,6 @@ void PV60Sync::attemptSync(std::shared_ptr<EthereumPeer> _peer)
else
{
clog ( NetAllDetail ) < < " Yes. Their chain is better. " ;
m_estimatedHashes = _peer - > m_expectedHashes - c_chainReorgSize ;
transition ( _peer , SyncState : : Hashes ) ;
}
}
@ -573,7 +564,9 @@ void PV60Sync::noteNeedsSyncing(std::shared_ptr<EthereumPeer> _peer)
if ( isSyncing ( ) )
{
clog ( NetAllDetail ) < < " Sync in progress: Just set to help out. " ;
if ( m_state = = SyncState : : Blocks )
if ( m_state = = SyncState : : Hashes & & _peer - > m_asking = = Asking : : Nothing )
requestSubchain ( _peer ) ;
else if ( m_state = = SyncState : : Blocks )
requestBlocks ( _peer ) ;
}
else
@ -649,20 +642,45 @@ void PV60Sync::noteDoneBlocks(std::shared_ptr<EthereumPeer> _peer, bool _clemenc
}
resetSync ( ) ;
downloadMan ( ) . reset ( ) ;
}
_peer - > m_sub . doneFetch ( ) ;
}
void PV60Sync : : syncHashes ( std : : shared_ptr < EthereumPeer > _peer )
{
if ( m_state = = SyncState : : Idle )
{
if ( isSyncing ( _peer ) )
clog ( NetWarn ) < < " Bad state: not asking for Hashes, yet syncing! " ;
m_syncingLatestHash = _peer - > m_latestHash ;
m_syncingTotalDifficulty = _peer - > m_totalDifficulty ;
setState ( _peer , SyncState : : Hashes , true ) ;
_peer - > requestHashes ( m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash ) ;
}
else if ( m_state = = SyncState : : Hashes )
{
if ( ! isSyncing ( _peer ) )
clog ( NetWarn ) < < " Bad state: asking for Hashes yet not syncing! " ;
setState ( _peer , SyncState : : Hashes , true ) ;
_peer - > requestHashes ( m_syncingLastReceivedHash ) ;
}
}
void PV60Sync : : onPeerHashes ( std : : shared_ptr < EthereumPeer > _peer , h256s const & _hashes )
{
RecursiveGuard l ( x_sync ) ;
DEV_INVARIANT_CHECK ;
_peer - > setIdle ( ) ;
if ( ! isSyncing ( _peer ) )
{
clog ( NetMessageSummary ) < < " Ignoring hashes since not syncing " ;
return ;
}
if ( _peer - > m_syncHash ! = ( m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash ) )
{
clog ( NetMessageSummary ) < < " Ignoring unexpected hashes " ;
return ;
}
if ( _hashes . size ( ) = = 0 )
{
transition ( _peer , SyncState : : Blocks ) ;
@ -684,7 +702,8 @@ void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _h
else if ( status = = QueueStatus : : Bad )
{
cwarn < < " block hash bad! " < < h < < " . Bailing... " ;
transition ( _peer , SyncState : : Idle ) ;
_peer - > disable ( " Bad blocks " ) ;
restartSync ( ) ;
return ;
}
else if ( status = = QueueStatus : : Unknown )
@ -711,7 +730,6 @@ void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _h
void PV60Sync : : onPeerNewHashes ( std : : shared_ptr < EthereumPeer > _peer , h256s const & _hashes )
{
RecursiveGuard l ( x_sync ) ;
DEV_INVARIANT_CHECK ;
if ( isSyncing ( ) & & ( m_state ! = SyncState : : NewBlocks | | isSyncing ( _peer ) ) )
{
clog ( NetMessageSummary ) < < " Ignoring since we're already downloading. " ;
@ -769,8 +787,33 @@ void PV60Sync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const&
void PV60Sync : : abortSync ( )
{
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
host ( ) . foreachPeer ( [ this ] ( std : : shared_ptr < EthereumPeer > _p ) { _p - > setIdle ( ) ; return true ; } ) ;
setState ( std : : shared_ptr < EthereumPeer > ( ) , SyncState : : Idle , false , true ) ;
bool continueSync = false ;
if ( m_state = = SyncState : : Blocks )
{
// Main syncer aborted, try to find a replacement
host ( ) . foreachPeer ( [ & ] ( std : : shared_ptr < EthereumPeer > _p )
{
if ( _p - > m_asking = = Asking : : Blocks )
{
setState ( _p , SyncState : : Blocks , true , true ) ; // will kick off other peers to help if available.
continueSync = true ;
return false ;
}
if ( _p - > m_asking = = Asking : : Nothing & & shouldGrabBlocks ( _p ) )
{
transition ( _p , SyncState : : Blocks ) ;
clog ( NetMessageDetail ) < < " New sync peer selected " ;
continueSync = true ;
return false ;
}
return true ;
} ) ;
}
if ( ! continueSync )
{
// Just set to idle. Hashchain is keept, Sync will be continued if there are more peers to sync with
setState ( std : : shared_ptr < EthereumPeer > ( ) , SyncState : : Idle , false , true ) ;
}
DEV_INVARIANT_CHECK ;
}
@ -778,37 +821,366 @@ void PV60Sync::onPeerAborting()
{
RecursiveGuard l ( x_sync ) ;
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
if ( m_syncer . expired ( ) )
if ( m_syncer . expired ( ) & & m_state ! = SyncState : : Idle )
{
clog ( NetWarn ) < < " Syncing peer disconnected, restarting sync " ;
m_syncer . reset ( ) ;
abortSync ( ) ;
}
DEV_INVARIANT_CHECK ;
}
bool PV60Sync : : invariants ( ) const
{
if ( m_state = = SyncState : : Idle & & isSyncing ( ) )
return false ;
BOOST_THROW_EXCEPTION ( FailedInvariant ( ) < < errinfo_comment ( " Idle while peer syncing " ) ) ;
if ( m_state ! = SyncState : : Idle & & ! isSyncing ( ) )
return false ;
BOOST_THROW_EXCEPTION ( FailedInvariant ( ) < < errinfo_comment ( " Active while peer not syncing " ) ) ;
if ( m_state = = SyncState : : Hashes )
{
if ( ! m_syncingLatestHash )
return false ;
BOOST_THROW_EXCEPTION ( FailedInvariant ( ) < < errinfo_comment ( " m_syncingLatestHash is not set while downloading hashes " ) ) ;
if ( m_syncingNeededBlocks . empty ( ) ! = ( ! m_syncingLastReceivedHash ) )
return false ;
BOOST_THROW_EXCEPTION ( FailedInvariant ( ) < < errinfo_comment ( " Received hashes but the hashes list is empty (or the other way around) " ) ) ;
}
if ( m_state = = SyncState : : Blocks | | m_state = = SyncState : : NewBlocks )
{
if ( downloadMan ( ) . isComplete ( ) )
return false ;
BOOST_THROW_EXCEPTION ( FailedInvariant ( ) < < errinfo_comment ( " Block download complete but the state is still Blocks " ) ) ;
}
if ( m_state = = SyncState : : Waiting & & ! host ( ) . bq ( ) . isActive ( ) )
BOOST_THROW_EXCEPTION ( FailedInvariant ( ) < < errinfo_comment ( " Waiting while block queue is idle " ) ) ;
return true ;
}
PV61Sync : : PV61Sync ( EthereumHost & _host ) :
PV60Sync ( _host )
{
}
void PV61Sync : : syncHashes ( std : : shared_ptr < EthereumPeer > _peer )
{
if ( _peer - > m_protocolVersion ! = host ( ) . protocolVersion ( ) )
{
m_readyChainMap . clear ( ) ;
m_completeChainMap . clear ( ) ;
m_downloadingChainMap . clear ( ) ;
m_syncingBlockNumber = 0 ;
m_chainSyncPeers . clear ( ) ;
m_knownHashes . clear ( ) ;
m_hashScanComplete = false ;
PV60Sync : : syncHashes ( _peer ) ;
return ;
}
if ( m_state = = SyncState : : Idle )
{
bool busy = false ;
host ( ) . foreachPeer ( [ & ] ( std : : shared_ptr < EthereumPeer > _p ) { if ( _p - > m_asking ! = Asking : : Nothing & & _p - > m_asking ! = Asking : : State ) busy = true ; return ! busy ; } ) ;
if ( busy )
return false ;
if ( isSyncing ( _peer ) )
clog ( NetWarn ) < < " Bad state: not asking for Hashes, yet syncing! " ;
if ( m_syncingBlockNumber = = 0 )
m_syncingBlockNumber = host ( ) . chain ( ) . number ( ) + c_hashSubchainSize ;
m_syncingTotalDifficulty = _peer - > m_totalDifficulty ;
setState ( _peer , SyncState : : Hashes , true ) ;
_peer - > requestHashes ( m_syncingBlockNumber , 1 ) ;
}
if ( m_state = = SyncState : : Waiting & & ! host ( ) . bq ( ) . isActive ( ) )
return false ;
else if ( m_state = = SyncState : : Hashes )
{
if ( ! isSyncing ( _peer ) )
clog ( NetWarn ) < < " Bad state: asking for Hashes yet not syncing! " ;
m_syncingBlockNumber + = c_hashSubchainSize ;
setState ( _peer , SyncState : : Hashes , true ) ;
_peer - > requestHashes ( m_syncingBlockNumber , 1 ) ;
}
}
void PV61Sync : : requestSubchain ( std : : shared_ptr < EthereumPeer > _peer )
{
auto syncPeer = m_chainSyncPeers . find ( _peer ) ;
if ( syncPeer ! = m_chainSyncPeers . end ( ) )
{
// Already downoading, request next batch
h256s & d = m_downloadingChainMap . at ( syncPeer - > second ) ;
_peer - > requestHashes ( d . back ( ) ) ;
}
else if ( needsSyncing ( _peer ) )
{
if ( ! m_readyChainMap . empty ( ) )
{
clog ( NetAllDetail ) < < " Helping with hashchin download " ;
h256s & d = m_readyChainMap . begin ( ) - > second ;
_peer - > requestHashes ( d . back ( ) ) ;
m_downloadingChainMap [ m_readyChainMap . begin ( ) - > first ] = move ( d ) ;
m_chainSyncPeers [ _peer ] = m_readyChainMap . begin ( ) - > first ;
m_readyChainMap . erase ( m_readyChainMap . begin ( ) ) ;
}
else if ( ! m_downloadingChainMap . empty ( ) & & m_hashScanComplete )
{
// Lead syncer is done, just grab whatever we can
h256s & d = m_downloadingChainMap . begin ( ) - > second ;
_peer - > requestHashes ( d . back ( ) ) ;
}
}
}
void PV61Sync : : requestSubchains ( )
{
host ( ) . foreachPeer ( [ this ] ( std : : shared_ptr < EthereumPeer > _p )
{
if ( _p - > m_asking = = Asking : : Nothing )
requestSubchain ( _p ) ;
return true ;
} ) ;
}
void PV61Sync : : completeSubchain ( std : : shared_ptr < EthereumPeer > _peer , unsigned _n )
{
m_completeChainMap [ _n ] = move ( m_downloadingChainMap . at ( _n ) ) ;
m_downloadingChainMap . erase ( _n ) ;
for ( auto s = m_chainSyncPeers . begin ( ) ; s ! = m_chainSyncPeers . end ( ) ; + + s )
if ( s - > second = = _n ) //TODO: optimize this
{
m_chainSyncPeers . erase ( s ) ;
break ;
}
_peer - > m_syncHashNumber = 0 ;
auto syncer = m_syncer . lock ( ) ;
if ( ! syncer )
{
restartSync ( ) ;
return ;
}
if ( m_readyChainMap . empty ( ) & & m_downloadingChainMap . empty ( ) & & m_hashScanComplete )
{
//Done chain-get
m_syncingNeededBlocks . clear ( ) ;
for ( auto h = m_completeChainMap . rbegin ( ) ; h ! = m_completeChainMap . rend ( ) ; + + h )
m_syncingNeededBlocks . insert ( m_syncingNeededBlocks . end ( ) , h - > second . begin ( ) , h - > second . end ( ) ) ;
m_completeChainMap . clear ( ) ;
m_knownHashes . clear ( ) ;
m_syncingBlockNumber = 0 ;
transition ( syncer , SyncState : : Blocks ) ;
}
else
requestSubchain ( _peer ) ;
}
void PV61Sync : : restartSync ( )
{
m_completeChainMap . clear ( ) ;
m_readyChainMap . clear ( ) ;
m_downloadingChainMap . clear ( ) ;
m_chainSyncPeers . clear ( ) ;
m_syncingBlockNumber = 0 ;
m_knownHashes . clear ( ) ;
m_hashScanComplete = false ;
PV60Sync : : restartSync ( ) ;
}
void PV61Sync : : onPeerHashes ( std : : shared_ptr < EthereumPeer > _peer , h256s const & _hashes )
{
RecursiveGuard l ( x_sync ) ;
if ( m_syncingBlockNumber = = 0 | | ( _peer = = m_syncer . lock ( ) & & _peer - > m_protocolVersion ! = host ( ) . protocolVersion ( ) ) )
{
// Syncing in pv60 mode
PV60Sync : : onPeerHashes ( _peer , _hashes ) ;
return ;
}
if ( _hashes . size ( ) = = 0 )
{
if ( isSyncing ( _peer ) & & _peer - > m_syncHashNumber = = m_syncingBlockNumber )
{
// End of hash chain, add last chunk to download
m_readyChainMap . insert ( make_pair ( m_syncingBlockNumber , h256s { _peer - > m_latestHash } ) ) ;
m_hashScanComplete = true ;
_peer - > m_syncHashNumber = 0 ;
requestSubchain ( _peer ) ;
}
else
{
auto syncPeer = m_chainSyncPeers . find ( _peer ) ;
if ( syncPeer = = m_chainSyncPeers . end ( ) )
clog ( NetMessageDetail ) < < " Hashes response from unexpected peer " ;
else
{
// Peer does not have request hashes, move back from downloading to ready
unsigned number = syncPeer - > second ;
m_chainSyncPeers . erase ( _peer ) ;
m_readyChainMap [ number ] = move ( m_downloadingChainMap . at ( number ) ) ;
m_downloadingChainMap . erase ( number ) ;
resetNeedsSyncing ( _peer ) ;
requestSubchains ( ) ;
}
}
return ;
}
if ( isSyncing ( _peer ) & & _peer - > m_syncHashNumber = = m_syncingBlockNumber )
{
// Got new subchain marker
assert ( _hashes . size ( ) = = 1 ) ;
m_knownHashes . insert ( _hashes [ 0 ] ) ;
m_readyChainMap . insert ( make_pair ( m_syncingBlockNumber , h256s { _hashes [ 0 ] } ) ) ;
if ( ( m_readyChainMap . size ( ) + m_downloadingChainMap . size ( ) + m_completeChainMap . size ( ) ) * c_hashSubchainSize > _peer - > m_expectedHashes )
{
_peer - > disable ( " Too many hashes from lead peer " ) ;
restartSync ( ) ;
return ;
}
transition ( _peer , SyncState : : Hashes ) ;
requestSubchains ( ) ;
}
else
{
auto syncPeer = m_chainSyncPeers . find ( _peer ) ;
unsigned number = 0 ;
if ( syncPeer = = m_chainSyncPeers . end ( ) )
{
//check downlading peers
for ( auto const & downloader : m_downloadingChainMap )
if ( downloader . second . back ( ) = = _peer - > m_syncHash )
{
number = downloader . first ;
break ;
}
}
else
number = syncPeer - > second ;
if ( number = = 0 )
{
clog ( NetAllDetail ) < < " Hashes response from unexpected/expired peer " ;
return ;
}
auto downloadingPeer = m_downloadingChainMap . find ( number ) ;
if ( downloadingPeer = = m_downloadingChainMap . end ( ) | | downloadingPeer - > second . back ( ) ! = _peer - > m_syncHash )
{
// Too late, other peer has already downloaded our hashes
m_chainSyncPeers . erase ( _peer ) ;
requestSubchain ( _peer ) ;
return ;
}
h256s & hashes = downloadingPeer - > second ;
unsigned knowns = 0 ;
unsigned unknowns = 0 ;
for ( unsigned i = 0 ; i < _hashes . size ( ) ; + + i )
{
auto h = _hashes [ i ] ;
auto status = host ( ) . bq ( ) . blockStatus ( h ) ;
if ( status = = QueueStatus : : Importing | | status = = QueueStatus : : Ready | | host ( ) . chain ( ) . isKnown ( h ) | | ! ! m_knownHashes . count ( h ) )
{
clog ( NetMessageSummary ) < < " Subchain download complete " ;
m_chainSyncPeers . erase ( _peer ) ;
completeSubchain ( _peer , number ) ;
return ;
}
else if ( status = = QueueStatus : : Bad )
{
cwarn < < " block hash bad! " < < h < < " . Bailing... " ;
_peer - > disable ( " Bad hashes " ) ;
if ( isSyncing ( _peer ) )
restartSync ( ) ;
else
{
//try with other peer
m_readyChainMap [ number ] = move ( m_downloadingChainMap . at ( number ) ) ;
m_downloadingChainMap . erase ( number ) ;
m_chainSyncPeers . erase ( _peer ) ;
}
return ;
}
else if ( status = = QueueStatus : : Unknown )
{
unknowns + + ;
hashes . push_back ( h ) ;
}
else
knowns + + ;
}
clog ( NetMessageSummary ) < < knowns < < " knowns, " < < unknowns < < " unknowns; now at " < < hashes . back ( ) ;
if ( hashes . size ( ) > c_hashSubchainSize )
{
_peer - > disable ( " Too many subchain hashes " ) ;
restartSync ( ) ;
return ;
}
requestSubchain ( _peer ) ;
}
DEV_INVARIANT_CHECK ;
}
void PV61Sync : : onPeerAborting ( )
{
RecursiveGuard l ( x_sync ) ;
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
for ( auto s = m_chainSyncPeers . begin ( ) ; s ! = m_chainSyncPeers . end ( ) ; )
{
if ( s - > first . expired ( ) )
{
unsigned number = s - > second ;
m_readyChainMap [ number ] = move ( m_downloadingChainMap . at ( number ) ) ;
m_downloadingChainMap . erase ( number ) ;
m_chainSyncPeers . erase ( s + + ) ;
}
else
+ + s ;
}
if ( m_syncer . expired ( ) )
{
if ( m_state = = SyncState : : Hashes )
{
// Main syncer aborted, other peers are probably still downloading hashes, just set one of them as syncer
host ( ) . foreachPeer ( [ & ] ( std : : shared_ptr < EthereumPeer > _p )
{
if ( _p - > m_asking ! = Asking : : Hashes )
return true ;
setState ( _p , SyncState : : Hashes , true , true ) ;
return false ;
} ) ;
}
if ( m_syncer . expired ( ) )
PV60Sync : : onPeerAborting ( ) ;
}
else if ( isPV61Syncing ( ) & & m_state = = SyncState : : Hashes )
requestSubchains ( ) ;
DEV_INVARIANT_CHECK ;
}
SyncStatus PV61Sync : : status ( ) const
{
RecursiveGuard l ( x_sync ) ;
SyncStatus res = PV60Sync : : status ( ) ;
if ( m_state = = SyncState : : Hashes & & isPV61Syncing ( ) )
{
res . protocolVersion = 61 ;
res . hashesReceived = 0 ;
for ( auto const & d : m_readyChainMap )
res . hashesReceived + = d . second . size ( ) ;
for ( auto const & d : m_downloadingChainMap )
res . hashesReceived + = d . second . size ( ) ;
for ( auto const & d : m_completeChainMap )
res . hashesReceived + = d . second . size ( ) ;
}
return res ;
}
bool PV61Sync : : isPV61Syncing ( ) const
{
return m_syncingBlockNumber ! = 0 ;
}
bool PV61Sync : : invariants ( ) const
{
if ( m_state = = SyncState : : Hashes )
{
if ( isPV61Syncing ( ) & & ! m_syncingBlockNumber )
BOOST_THROW_EXCEPTION ( FailedInvariant ( ) < < errinfo_comment ( " Syncing in PV61 with no block number set " ) ) ;
}
else if ( ! PV60Sync : : invariants ( ) )
return false ;
return true ;
}