@ -27,24 +27,34 @@
# include <libp2p/Session.h>
# include "BlockChain.h"
# include "EthereumHost.h"
# include "TransactionQueue.h"
# include "BlockQueue.h"
using namespace std ;
using namespace dev ;
using namespace dev : : eth ;
using namespace p2p ;
# if defined(clogS)
# undef clogS
# endif
# define clogS(X) dev::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->socketId() << "] "
EthereumPeer : : EthereumPeer ( Session * _s , HostCapabilityFace * _h ) :
Capability ( _s , _h ) ,
EthereumPeer : : EthereumPeer ( Session * _s , HostCapabilityFace * _h , unsigned _i ) :
Capability ( _s , _h , _i ) ,
m_sub ( host ( ) - > m_man )
{
setGrabbing ( Grabbing : : State ) ;
sendStatus ( ) ;
transition ( Asking : : State ) ;
}
EthereumPeer : : ~ EthereumPeer ( )
{
giveUpOnFetch ( ) ;
abortSync ( ) ;
}
void EthereumPeer : : abortSync ( )
{
if ( isSyncing ( ) )
transition ( Asking : : Nothing , true ) ;
}
EthereumHost * EthereumPeer : : host ( ) const
@ -52,36 +62,207 @@ EthereumHost* EthereumPeer::host() const
return static_cast < EthereumHost * > ( Capability : : hostCapability ( ) ) ;
}
void EthereumPeer : : sendStatus ( )
/*
* Possible asking / syncing states for two peers :
*/
string toString ( Asking _a )
{
switch ( _a )
{
case Asking : : Blocks : return " Blocks " ;
case Asking : : Hashes : return " Hashes " ;
case Asking : : Nothing : return " Nothing " ;
case Asking : : State : return " State " ;
}
return " ? " ;
}
void EthereumPeer : : transition ( Asking _a , bool _force )
{
clogS ( NetMessageSummary ) < < " Transition! " < < : : toString ( _a ) < < " from " < < : : toString ( m_asking ) < < " , " < < ( isSyncing ( ) ? " syncing " : " holding " ) < < ( needsSyncing ( ) ? " & needed " : " " ) ;
RLPStream s ;
prep ( s ) ;
s . appendList ( 6 ) < < StatusPacket
< < host ( ) - > protocolVersion ( )
< < host ( ) - > networkId ( )
< < host ( ) - > m_chain . details ( ) . totalDifficulty
< < host ( ) - > m_chain . currentHash ( )
< < host ( ) - > m_chain . genesisHash ( ) ;
sealAndSend ( s ) ;
if ( _a = = Asking : : State )
{
if ( m_asking = = Asking : : Nothing )
{
setAsking ( Asking : : State , false ) ;
prep ( s , StatusPacket , 5 )
< < host ( ) - > protocolVersion ( )
< < host ( ) - > networkId ( )
< < host ( ) - > m_chain . details ( ) . totalDifficulty
< < host ( ) - > m_chain . currentHash ( )
< < host ( ) - > m_chain . genesisHash ( ) ;
sealAndSend ( s ) ;
return ;
}
}
else if ( _a = = Asking : : Hashes )
{
if ( m_asking = = Asking : : State | | m_asking = = Asking : : Nothing )
{
if ( isSyncing ( ) )
clogS ( NetWarn ) < < " Bad state: not asking for Hashes, yet syncing! " ;
m_syncingLatestHash = m_latestHash ;
m_syncingTotalDifficulty = m_totalDifficulty ;
resetNeedsSyncing ( ) ;
setAsking ( _a , true ) ;
prep ( s , GetBlockHashesPacket , 2 ) < < m_syncingLatestHash < < c_maxHashesAsk ;
m_syncingNeededBlocks = h256s ( 1 , m_syncingLatestHash ) ;
sealAndSend ( s ) ;
return ;
}
else if ( m_asking = = Asking : : Hashes )
{
if ( ! isSyncing ( ) )
clogS ( NetWarn ) < < " Bad state: asking for Hashes yet not syncing! " ;
setAsking ( _a , true ) ;
prep ( s , GetBlockHashesPacket , 2 ) < < m_syncingNeededBlocks . back ( ) < < c_maxHashesAsk ;
sealAndSend ( s ) ;
return ;
}
}
else if ( _a = = Asking : : Blocks )
{
if ( m_asking = = Asking : : Hashes )
{
if ( ! isSyncing ( ) )
clogS ( NetWarn ) < < " Bad state: asking for Hashes yet not syncing! " ;
if ( shouldGrabBlocks ( ) )
{
clog ( NetNote ) < < " Difficulty of hashchain HIGHER. Grabbing " < < m_syncingNeededBlocks . size ( ) < < " blocks [latest now " < < m_syncingLatestHash . abridged ( ) < < " , was " < < host ( ) - > m_latestBlockSent . abridged ( ) < < " ] " ;
host ( ) - > m_man . resetToChain ( m_syncingNeededBlocks ) ;
host ( ) - > m_latestBlockSent = m_syncingLatestHash ;
}
else
{
clog ( NetNote ) < < " Difficulty of hashchain not HIGHER. Ignoring. " ;
m_syncingLatestHash = h256 ( ) ;
setAsking ( Asking : : Nothing , false ) ;
return ;
}
}
// run through into...
if ( m_asking = = Asking : : Nothing | | m_asking = = Asking : : Hashes | | m_asking = = Asking : : Blocks )
{
// Looks like it's the best yet for total difficulty. Set to download.
setAsking ( Asking : : Blocks , true ) ; // will kick off other peers to help if available.
auto blocks = m_sub . nextFetch ( c_maxBlocksAsk ) ;
if ( blocks . size ( ) )
{
prep ( s , GetBlocksPacket , blocks . size ( ) ) ;
for ( auto const & i : blocks )
s < < i ;
sealAndSend ( s ) ;
}
else
transition ( Asking : : Nothing ) ;
return ;
}
}
else if ( _a = = Asking : : Nothing )
{
if ( m_asking = = Asking : : Blocks )
{
clogS ( NetNote ) < < " Finishing blocks fetch... " ;
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if ( isSyncing ( ) )
host ( ) - > noteDoneBlocks ( this , _force ) ;
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
m_sub . doneFetch ( ) ;
setAsking ( Asking : : Nothing , false ) ;
}
else if ( m_asking = = Asking : : Hashes )
{
clogS ( NetNote ) < < " Finishing hashes fetch... " ;
setAsking ( Asking : : Nothing , false ) ;
}
else if ( m_asking = = Asking : : State )
{
setAsking ( Asking : : Nothing , false ) ;
// Just got the state - should check to see if we can be of help downloading the chain if any.
// Otherwise, should put ourselves up for sync.
setNeedsSyncing ( m_latestHash , m_totalDifficulty ) ;
}
// Otherwise it's fine. We don't care if it's Nothing->Nothing.
return ;
}
clogS ( NetWarn ) < < " Invalid state transition: " < < : : toString ( _a ) < < " from " < < : : toString ( m_asking ) < < " , " < < ( isSyncing ( ) ? " syncing " : " holding " ) < < ( needsSyncing ( ) ? " & needed " : " " ) ;
}
void EthereumPeer : : startInitialSync ( )
void EthereumPeer : : setAsking ( Asking _a , bool _isSyncing )
{
// Grab transactions off them.
bool changedAsking = ( m_asking ! = _a ) ;
m_asking = _a ;
if ( _isSyncing ! = ( host ( ) - > m_syncer = = this ) | | ( _isSyncing & & changedAsking ) )
host ( ) - > changeSyncer ( _isSyncing ? this : nullptr ) ;
if ( ! _isSyncing )
{
RLPStream s ;
prep ( s ) . appendList ( 1 ) ;
s < < GetTransactionsPacket ;
sealAndSend ( s ) ;
m_syncingLatestHash = h256 ( ) ;
m_syncingTotalDifficulty = 0 ;
m_syncingNeededBlocks . clear ( ) ;
}
host ( ) - > noteHavePeerState ( this ) ;
session ( ) - > addNote ( " ask " , _a = = Asking : : Nothing ? " nothing " : _a = = Asking : : State ? " state " : _a = = Asking : : Hashes ? " hashes " : _a = = Asking : : Blocks ? " blocks " : " ? " ) ;
session ( ) - > addNote ( " sync " , string ( isSyncing ( ) ? " ongoing " : " holding " ) + ( needsSyncing ( ) ? " & needed " : " " ) ) ;
}
void EthereumPeer : : setNeedsSyncing ( h256 _latestHash , u256 _td )
{
m_latestHash = _latestHash ;
m_totalDifficulty = _td ;
if ( m_latestHash )
host ( ) - > noteNeedsSyncing ( this ) ;
session ( ) - > addNote ( " sync " , string ( isSyncing ( ) ? " ongoing " : " holding " ) + ( needsSyncing ( ) ? " & needed " : " " ) ) ;
}
bool EthereumPeer : : isSyncing ( ) const
{
return host ( ) - > m_syncer = = this ;
}
bool EthereumPeer : : shouldGrabBlocks ( ) const
{
auto td = m_syncingTotalDifficulty ;
auto lh = m_syncingLatestHash ;
auto ctd = host ( ) - > m_chain . details ( ) . totalDifficulty ;
if ( m_syncingNeededBlocks . empty ( ) )
return false ;
clog ( NetNote ) < < " Should grab blocks? " < < td < < " vs " < < ctd < < " ; " < < m_syncingNeededBlocks . size ( ) < < " blocks, ends " < < m_syncingNeededBlocks . back ( ) . abridged ( ) ;
if ( td < ctd | | ( td = = ctd & & host ( ) - > m_chain . currentHash ( ) = = lh ) )
return false ;
return true ;
}
void EthereumPeer : : tryGrabbingHashChain ( )
void EthereumPeer : : attemptSync ( )
{
if ( m_asking ! = Asking : : Nothing )
{
clogS ( NetAllDetail ) < < " Can't synced with this peer - outstanding asks. " ;
return ;
}
// if already done this, then ignore.
if ( m_grabbing ! = Grabbing : : State )
if ( ! needsSyncing ( ) )
{
clogS ( NetAllDetail ) < < " Already synced with this peer. " ;
return ;
@ -95,47 +276,26 @@ void EthereumPeer::tryGrabbingHashChain()
if ( td > = m_totalDifficulty )
{
clogS ( NetAllDetail ) < < " No. Our chain is better. " ;
setGrabbing ( Grabbing : : Nothing ) ;
return ; // All good - we have the better chain.
resetNeedsSyncing ( ) ;
transition ( Asking : : Nothing ) ;
}
// Our chain isn't better - grab theirs.
else
{
clogS ( NetAllDetail ) < < " Yes. Their chain is better. " ;
host ( ) - > updateGrabbing ( Grabbing : : Hashes ) ;
setGrabbing ( Grabbing : : Hashes ) ;
RLPStream s ;
prep ( s ) . appendList ( 3 ) ;
s < < GetBlockHashesPacket < < m_latestHash < < c_maxHashesAsk ;
m_neededBlocks = h256s ( 1 , m_latestHash ) ;
sealAndSend ( s ) ;
transition ( Asking : : Hashes ) ;
}
}
void EthereumPeer : : giveUpOnFetch ( )
bool EthereumPeer : : interpret ( unsigned _id , RLP const & _r )
{
clogS ( NetNote ) < < " Finishing fetch... " ;
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if ( m_grabbing = = Grabbing : : Chain | | m_grabbing = = Grabbing : : ChainHelper )
{
host ( ) - > noteDoneBlocks ( this ) ;
setGrabbing ( Grabbing : : Nothing ) ;
}
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
m_sub . doneFetch ( ) ;
}
bool EthereumPeer : : interpret ( RLP const & _r )
{
switch ( _r [ 0 ] . toInt < unsigned > ( ) )
switch ( _id )
{
case StatusPacket :
{
m_protocolVersion = _r [ 1 ] . toInt < unsigned > ( ) ;
m_networkId = _r [ 2 ] . toInt < u256 > ( ) ;
// a bit dirty as we're misusing these to communicate the values to transition, but harmless.
m_totalDifficulty = _r [ 3 ] . toInt < u256 > ( ) ;
m_latestHash = _r [ 4 ] . toHash < h256 > ( ) ;
auto genesisHash = _r [ 5 ] . toHash < h256 > ( ) ;
@ -148,10 +308,18 @@ bool EthereumPeer::interpret(RLP const& _r)
disable ( " Invalid protocol version. " ) ;
else if ( m_networkId ! = host ( ) - > networkId ( ) )
disable ( " Invalid network identifier. " ) ;
else if ( session ( ) - > info ( ) . clientVersion . find ( " /v0.6.9 / " ) ! = string : : npos )
else if ( session ( ) - > info ( ) . clientVersion . find ( " /v0.7.0 / " ) ! = string : : npos )
disable ( " Blacklisted client version. " ) ;
else if ( host ( ) - > isBanned ( session ( ) - > id ( ) ) )
disable ( " Peer banned for previous bad behaviour. " ) ;
else
startInitialSync ( ) ;
{
// Grab transactions off them.
RLPStream s ;
prep ( s , GetTransactionsPacket ) ;
sealAndSend ( s ) ;
transition ( Asking : : Nothing ) ;
}
break ;
}
case GetTransactionsPacket :
@ -163,13 +331,14 @@ bool EthereumPeer::interpret(RLP const& _r)
{
clogS ( NetMessageSummary ) < < " Transactions ( " < < dec < < ( _r . itemCount ( ) - 1 ) < < " entries) " ;
addRating ( _r . itemCount ( ) - 1 ) ;
lock_guard < recursive_mutex > l ( host ( ) - > m_incomingLock ) ;
Guard l ( x_knownTransactions ) ;
for ( unsigned i = 1 ; i < _r . itemCount ( ) ; + + i )
{
host ( ) - > addIncomingTransaction ( _r [ i ] . data ( ) . toBytes ( ) ) ;
lock_guard < mutex > l ( x_knownTransactions ) ;
m_knownTransactions . insert ( sha3 ( _r [ i ] . data ( ) ) ) ;
auto h = sha3 ( _r [ i ] . data ( ) ) ;
m_knownTransactions . insert ( h ) ;
if ( ! host ( ) - > m_tq . import ( _r [ i ] . data ( ) ) )
// if we already had the transaction, then don't bother sending it on.
host ( ) - > m_transactionsSent . insert ( h ) ;
}
break ;
}
@ -182,7 +351,7 @@ bool EthereumPeer::interpret(RLP const& _r)
unsigned c = min < unsigned > ( host ( ) - > m_chain . number ( later ) , limit ) ;
RLPStream s ;
prep ( s ) . appendList ( 1 + c ) . append ( BlockHashesPacket ) ;
prep ( s , BlockHashesPacket , c ) ;
h256 p = host ( ) - > m_chain . details ( later ) . parent ;
for ( unsigned i = 0 ; i < c & & p ; + + i , p = host ( ) - > m_chain . details ( p ) . parent )
s < < p ;
@ -193,14 +362,14 @@ bool EthereumPeer::interpret(RLP const& _r)
{
clogS ( NetMessageSummary ) < < " BlockHashes ( " < < dec < < ( _r . itemCount ( ) - 1 ) < < " entries) " < < ( _r . itemCount ( ) - 1 ? " " : " : NoMoreHashes " ) ;
if ( m_grabbing ! = Grabb ing : : Hashes )
if ( m_asking ! = Ask ing : : Hashes )
{
cwarn < < " Peer giving us hashes when we didn't ask for them. " ;
break ;
}
if ( _r . itemCount ( ) = = 1 )
{
host ( ) - > noteHaveChain ( thi s) ;
transition ( Asking : : Block s) ;
return true ;
}
for ( unsigned i = 1 ; i < _r . itemCount ( ) ; + + i )
@ -208,17 +377,14 @@ bool EthereumPeer::interpret(RLP const& _r)
auto h = _r [ i ] . toHash < h256 > ( ) ;
if ( host ( ) - > m_chain . isKnown ( h ) )
{
host ( ) - > noteHaveChain ( thi s) ;
transition ( Asking : : Block s) ;
return true ;
}
else
m_neededBlocks . push_back ( h ) ;
m_sy ncingN eededBlocks . push_back ( h ) ;
}
// run through - ask for more.
RLPStream s ;
prep ( s ) . appendList ( 3 ) ;
s < < GetBlockHashesPacket < < m_neededBlocks . back ( ) < < c_maxHashesAsk ;
sealAndSend ( s ) ;
transition ( Asking : : Hashes ) ;
break ;
}
case GetBlocksPacket :
@ -237,57 +403,107 @@ bool EthereumPeer::interpret(RLP const& _r)
}
}
RLPStream s ;
sealAndSend ( prep ( s ) . appendList ( n + 1 ) . append ( BlocksPacket ) . appendRaw ( rlp , n ) ) ;
prep ( s , BlocksPacket , n ) . appendRaw ( rlp , n ) ;
sealAndSend ( s ) ;
break ;
}
case BlocksPacket :
{
clogS ( NetMessageSummary ) < < " Blocks ( " < < dec < < ( _r . itemCount ( ) - 1 ) < < " entries) " < < ( _r . itemCount ( ) - 1 ? " " : " : NoMoreBlocks " ) ;
if ( m_asking ! = Asking : : Blocks )
clogS ( NetWarn ) < < " Unexpected Blocks received! " ;
if ( _r . itemCount ( ) = = 1 )
{
// Couldn't get any from last batch - probably got to this peer's latest block - just give up.
if ( m_grabbing = = Grabbing : : Chain | | m_grabbing = = Grabbing : : ChainHelper )
giveUpOnFetch ( ) ;
// Got to this peer's latest block - just give up.
transition ( Asking : : Nothing ) ;
break ;
}
unsigned used = 0 ;
unsigned success = 0 ;
unsigned future = 0 ;
unsigned unknown = 0 ;
unsigned got = 0 ;
unsigned repeated = 0 ;
for ( unsigned i = 1 ; i < _r . itemCount ( ) ; + + i )
{
auto h = BlockInfo : : headerHash ( _r [ i ] . data ( ) ) ;
m_sub . noteBlock ( h ) ;
if ( host ( ) - > noteBlock ( h , _r [ i ] . data ( ) ) )
used + + ;
Guard l ( x_knownBlocks ) ;
m_knownBlocks . insert ( h ) ;
}
addRating ( used ) ;
unsigned knownParents = 0 ;
unsigned unknownParents = 0 ;
if ( g_logVerbosity > = NetMessageSummary : : verbosity )
{
unsigned ic = _r . itemCount ( ) ;
for ( unsigned i = 1 ; i < ic ; + + i )
if ( m_sub . noteBlock ( h ) )
{
auto h = BlockInfo : : headerHash ( _r [ i ] . data ( ) ) ;
BlockInfo bi ( _r [ i ] . data ( ) ) ;
Guard l ( x_knownBlocks ) ;
if ( ! host ( ) - > m_chain . details ( bi . parentHash ) & & ! m_knownBlocks . count ( bi . parentHash ) )
{
unknownParents + + ;
clogS ( NetAllDetail ) < < " Unknown parent " < < bi . parentHash . abridged ( ) < < " of block " < < h . abridged ( ) ;
}
else
addRating ( 10 ) ;
switch ( host ( ) - > m_bq . import ( _r [ i ] . data ( ) , host ( ) - > m_chain ) )
{
knownParents + + ;
clogS ( NetAllDetail ) < < " Known parent " < < bi . parentHash . abridged ( ) < < " of block " < < h . abridged ( ) ;
case ImportResult : : Success :
success + + ;
break ;
case ImportResult : : Malformed :
disable ( " Malformed block received. " ) ;
return true ;
case ImportResult : : FutureTime :
future + + ;
break ;
case ImportResult : : AlreadyInChain :
case ImportResult : : AlreadyKnown :
got + + ;
break ;
case ImportResult : : UnknownParent :
unknown + + ;
break ;
}
}
else
{
addRating ( 0 ) ; // -1?
repeated + + ;
}
}
clogS ( NetMessageSummary ) < < dec < < success < < " imported OK, " < < unknown < < " with unknown parents, " < < future < < " with future timestamps, " < < got < < " already known, " < < repeated < < " repeats received. " ;
if ( m_asking = = Asking : : Blocks )
transition ( Asking : : Blocks ) ;
break ;
}
case NewBlockPacket :
{
auto h = BlockInfo : : headerHash ( _r [ 1 ] . data ( ) ) ;
clogS ( NetMessageSummary ) < < " NewBlock: " < < h . abridged ( ) ;
if ( _r . itemCount ( ) ! = 3 )
disable ( " NewBlock without 2 data fields. " ) ;
else
{
switch ( host ( ) - > m_bq . import ( _r [ 1 ] . data ( ) , host ( ) - > m_chain ) )
{
case ImportResult : : Success :
addRating ( 100 ) ;
break ;
case ImportResult : : FutureTime :
//TODO: Rating dependent on how far in future it is.
break ;
case ImportResult : : Malformed :
disable ( " Malformed block received. " ) ;
break ;
case ImportResult : : AlreadyInChain :
case ImportResult : : AlreadyKnown :
break ;
case ImportResult : : UnknownParent :
clogS ( NetMessageSummary ) < < " Received block with no known parent. Resyncing... " ;
setNeedsSyncing ( h , _r [ 2 ] . toInt < u256 > ( ) ) ;
break ;
}
Guard l ( x_knownBlocks ) ;
m_knownBlocks . insert ( h ) ;
}
clogS ( NetMessageSummary ) < < dec < < knownParents < < " known parents, " < < unknownParents < < " unknown, " < < used < < " used. " ;
if ( m_grabbing = = Grabbing : : Chain | | m_grabbing = = Grabbing : : ChainHelper )
continueGettingChain ( ) ;
break ;
}
default :
@ -295,42 +511,3 @@ bool EthereumPeer::interpret(RLP const& _r)
}
return true ;
}
void EthereumPeer : : ensureGettingChain ( )
{
if ( m_grabbing = = Grabbing : : ChainHelper )
return ; // Already asked & waiting for some.
// Switch to ChainHelper otherwise, unless we're already the Chain grabber.
if ( m_grabbing ! = Grabbing : : Chain )
setGrabbing ( Grabbing : : ChainHelper ) ;
continueGettingChain ( ) ;
}
void EthereumPeer : : continueGettingChain ( )
{
// If we're getting the hashes already, then we shouldn't be asking for the chain.
if ( m_grabbing = = Grabbing : : Hashes )
return ;
auto blocks = m_sub . nextFetch ( c_maxBlocksAsk ) ;
if ( blocks . size ( ) )
{
RLPStream s ;
prep ( s ) ;
s . appendList ( blocks . size ( ) + 1 ) < < GetBlocksPacket ;
for ( auto const & i : blocks )
s < < i ;
sealAndSend ( s ) ;
}
else
giveUpOnFetch ( ) ;
}
void EthereumPeer : : setGrabbing ( Grabbing _g )
{
m_grabbing = _g ;
session ( ) - > addNote ( " grab " , _g = = Grabbing : : Nothing ? " nothing " : _g = = Grabbing : : State ? " state " : _g = = Grabbing : : Hashes ? " hashes " : _g = = Grabbing : : Chain ? " chain " : _g = = Grabbing : : ChainHelper ? " chainhelper " : " ? " ) ;
}