@ -30,6 +30,7 @@
# endif
# include <chrono>
# include <thread>
# include "Exceptions.h"
# include "Common.h"
# include "BlockChain.h"
@ -42,6 +43,8 @@ using namespace eth;
# define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
static const int c_protocolVersion = 3 ;
static const eth : : uint c_maxHashes = 256 ; ///< Maximum number of hashes GetChain will ever send.
static const eth : : uint c_maxBlocks = 128 ; ///< Maximum number of blocks Blocks will ever send. BUG: if this gets too big (e.g. 2048) stuff starts going wrong.
static const eth : : uint c_maxBlocksAsk = 2048 ; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
@ -94,21 +97,29 @@ bool PeerSession::interpret(RLP const& _r)
m_protocolVersion = _r [ 1 ] . toInt < uint > ( ) ;
m_networkId = _r [ 2 ] . toInt < uint > ( ) ;
auto clientVersion = _r [ 3 ] . toString ( ) ;
m_caps = _r . itemCount ( ) > 4 ? _r [ 4 ] . toInt < uint > ( ) : 0x07 ;
m_listenPort = _r . itemCount ( ) > 5 ? _r [ 5 ] . toInt < short > ( ) : 0 ;
m_caps = _r [ 4 ] . toInt < uint > ( ) ;
m_listenPort = _r [ 5 ] . toInt < short > ( ) ;
m_id = _r [ 6 ] . toHash < h512 > ( ) ;
clogS ( NetMessageSummary ) < < " Hello: " < < clientVersion < < " V[ " < < m_protocolVersion < < " / " < < m_networkId < < " ] " < < asHex ( m_id . ref ( ) . cropped ( 0 , 4 ) ) < < showbase < < hex < < m_caps < < dec < < m_listenPort ;
clogS ( NetMessageSummary ) < < " Hello: " < < clientVersion < < " V[ " < < m_protocolVersion < < " / " < < m_networkId < < " ] " < < showbase < < hex < < m_caps < < dec < < m_listenPort ;
if ( m_server - > m_peers . count ( m_id ) | | ! m_id )
{
// Already connected.
disconnect ( DuplicatePeer ) ;
}
m_server - > m_peers [ m_id ] = shared_from_this ( ) ;
if ( m_protocolVersion ! = 1 | | m_networkId ! = m_reqNetworkId )
if ( m_protocolVersion ! = c_protocolVersion | | m_networkId ! = m_reqNetworkId )
{
disconnect ( ) ;
disconnect ( IncompatibleProtocol ) ;
return false ;
}
try
{ m_info = PeerInfo ( { clientVersion , m_socket . remote_endpoint ( ) . address ( ) . to_string ( ) , ( short ) m_socket . remote_endpoint ( ) . port ( ) , std : : chrono : : steady_clock : : duration ( ) } ) ; }
catch ( . . . )
{
disconnect ( ) ;
disconnect ( BadProtocol ) ;
return false ;
}
@ -131,13 +142,30 @@ bool PeerSession::interpret(RLP const& _r)
break ;
}
case DisconnectPacket :
clogS ( NetMessageSummary ) < < " Disconnect " ;
{
string reason = " Unspecified " ;
if ( _r . itemCount ( ) > 1 & & _r [ 1 ] . isInt ( ) )
switch ( _r [ 1 ] . toInt < int > ( ) )
{
case DisconnectRequested : reason = " Disconnect was requested. " ; break ;
case TCPError : reason = " Low-level TCP communication error. " ; break ;
case BadProtocol : reason = " Data format error. " ; break ;
case UselessPeer : reason = " We had no use to peer. " ; break ;
case TooManyPeers : reason = " Peer had too many connections. " ; break ;
case DuplicatePeer : reason = " Peer was already connected. " ; break ;
case WrongGenesis : reason = " Disagreement over genesis block. " ; break ;
case IncompatibleProtocol : reason = " Peer protocol versions are incompatible. " ; break ;
case ClientQuit : reason = " Peer is exiting. " ; break ;
}
clogS ( NetMessageSummary ) < < " Disconnect (reason: " < < reason < < " ) " ;
if ( m_socket . is_open ( ) )
clogS ( NetNote ) < < " Closing " < < m_socket . remote_endpoint ( ) ;
else
clogS ( NetNote ) < < " Remote closed. " ;
m_socket . close ( ) ;
return false ;
}
case PingPacket :
{
// clogS(NetMessageSummary) << "Ping";
@ -152,14 +180,14 @@ bool PeerSession::interpret(RLP const& _r)
case GetPeersPacket :
{
clogS ( NetMessageSummary ) < < " GetPeers " ;
std : : vector < bi : : tcp : : endpoint > peers = m_server - > potentialPeers ( ) ;
auto peers = m_server - > potentialPeers ( ) ;
RLPStream s ;
prep ( s ) . appendList ( peers . size ( ) + 1 ) ;
s < < PeersPacket ;
for ( auto i : peers )
{
clogS ( NetMessageDetail ) < < " Sending peer " < < i ;
s . appendList ( 2 ) < < i . address ( ) . to_v4 ( ) . to_bytes ( ) < < i . port ( ) ;
clogS ( NetMessageDetail ) < < " Sending peer " < < asHex ( i . first . ref ( ) . cropped ( 0 , 4 ) ) < < i . second ;
s . appendList ( 3 ) < < i . second . address ( ) . to_v4 ( ) . to_bytes ( ) < < i . second . port ( ) < < i . first ;
}
sealAndSend ( s ) ;
break ;
@ -169,7 +197,16 @@ bool PeerSession::interpret(RLP const& _r)
for ( unsigned i = 1 ; i < _r . itemCount ( ) ; + + i )
{
auto ep = bi : : tcp : : endpoint ( bi : : address_v4 ( _r [ i ] [ 0 ] . toArray < byte , 4 > ( ) ) , _r [ i ] [ 1 ] . toInt < short > ( ) ) ;
clogS ( NetAllDetail ) < < " Checking: " < < ep ;
Public id ;
if ( _r [ i ] . itemCount ( ) > 2 )
id = _r [ i ] [ 2 ] . toHash < Public > ( ) ;
clogS ( NetAllDetail ) < < " Checking: " < < ep < < " ( " < < asHex ( id . ref ( ) . cropped ( 0 , 4 ) ) < < " ) " ;
// check that it's not us or one we already know:
if ( id & & ( m_server - > m_key . pub ( ) = = id | | m_server - > m_peers . count ( id ) | | m_server - > m_incomingPeers . count ( id ) ) )
goto CONTINUE ;
// check that we're not already connected to addr:
if ( ! ep . port ( ) )
goto CONTINUE ;
@ -177,16 +214,16 @@ bool PeerSession::interpret(RLP const& _r)
if ( ep . address ( ) = = i & & ep . port ( ) = = m_server - > listenPort ( ) )
goto CONTINUE ;
for ( auto i : m_server - > m_peers )
if ( shared_ptr < PeerSession > p = i . lock ( ) )
if ( shared_ptr < PeerSession > p = i . second . lock ( ) )
{
clogS ( NetAllDetail ) < < " ...against " < < p - > endpoint ( ) ;
if ( p - > m_socket . is_open ( ) & & p - > endpoint ( ) = = ep )
goto CONTINUE ;
}
for ( auto i : m_server - > m_incomingPeers )
if ( i = = ep )
if ( i . second = = ep )
goto CONTINUE ;
m_server - > m_incomingPeers . push_back ( ep ) ;
m_server - > m_incomingPeers . insert ( make_pair ( id , ep ) ) ;
clogS ( NetMessageDetail ) < < " New peer: " < < ep ;
CONTINUE : ;
}
@ -313,7 +350,7 @@ bool PeerSession::interpret(RLP const& _r)
if ( noGood = = m_server - > m_chain - > genesisHash ( ) )
{
clogS ( NetWarn ) < < " Discordance over genesis block! Disconnect. " ;
disconnect ( ) ;
disconnect ( WrongGenesis ) ;
}
else
{
@ -410,14 +447,14 @@ void PeerSession::dropped()
} catch ( . . . ) { }
m_socket . close ( ) ;
for ( auto i = m_server - > m_peers . begin ( ) ; i ! = m_server - > m_peers . end ( ) ; + + i )
if ( i - > lock ( ) . get ( ) = = this )
if ( i - > second . lock ( ) . get ( ) = = this )
{
m_server - > m_peers . erase ( i ) ;
break ;
}
}
void PeerSession : : disconnect ( )
void PeerSession : : disconnect ( int _reason )
{
if ( m_socket . is_open ( ) )
{
@ -425,7 +462,7 @@ void PeerSession::disconnect()
{
RLPStream s ;
prep ( s ) ;
s . appendList ( 1 ) < < DisconnectPacket ;
s . appendList ( 1 ) < < DisconnectPacket < < _reason ;
sealAndSend ( s ) ;
m_disconnect = chrono : : steady_clock : : now ( ) ;
}
@ -446,9 +483,7 @@ void PeerSession::start()
{
RLPStream s ;
prep ( s ) ;
s . appendList ( m_server - > m_public . port ( ) ? 6 : 5 ) < < HelloPacket < < ( uint ) 1 < < ( uint ) m_server - > m_requiredNetworkId < < m_server - > m_clientVersion < < ( m_server - > m_mode = = NodeMode : : Full ? 0x07 : m_server - > m_mode = = NodeMode : : PeerServer ? 0x01 : 0 ) ;
if ( m_server - > m_public . port ( ) )
s < < m_server - > m_public . port ( ) ;
s . appendList ( m_server - > m_public . port ( ) ? 6 : 5 ) < < HelloPacket < < ( uint ) c_protocolVersion < < ( uint ) m_server - > m_requiredNetworkId < < m_server - > m_clientVersion < < ( m_server - > m_mode = = NodeMode : : Full ? 0x07 : m_server - > m_mode = = NodeMode : : PeerServer ? 0x01 : 0 ) < < m_server - > m_public . port ( ) < < m_server - > m_key . pub ( ) ;
sealAndSend ( s ) ;
ping ( ) ;
@ -516,31 +551,34 @@ PeerServer::PeerServer(std::string const& _clientVersion, BlockChain const& _ch,
m_chain ( & _ch ) ,
m_acceptor ( m_ioService , bi : : tcp : : endpoint ( bi : : tcp : : v4 ( ) , _port ) ) ,
m_socket ( m_ioService ) ,
m_key ( KeyPair : : create ( ) ) ,
m_requiredNetworkId ( _networkId )
{
populateAddresses ( ) ;
determinePublic ( _publicAddress , _upnp ) ;
ensureAccepting ( ) ;
clog ( NetNote ) < < " Mode: " < < ( _m = = NodeMode : : PeerServer ? " PeerServer " : " Full " ) ;
clog ( NetNote ) < < " Id: " < < asHex ( m_key . address ( ) . ref ( ) . cropped ( 0 , 4 ) ) < < " Mode: " < < ( _m = = NodeMode : : PeerServer ? " PeerServer " : " Full " ) ;
}
PeerServer : : PeerServer ( std : : string const & _clientVersion , uint _networkId ) :
PeerServer : : PeerServer ( std : : string const & _clientVersion , uint _networkId , NodeMode _m ) :
m_clientVersion ( _clientVersion ) ,
m_mode ( _m ) ,
m_listenPort ( - 1 ) ,
m_acceptor ( m_ioService , bi : : tcp : : endpoint ( bi : : tcp : : v4 ( ) , 0 ) ) ,
m_socket ( m_ioService ) ,
m_key ( KeyPair : : create ( ) ) ,
m_requiredNetworkId ( _networkId )
{
// populate addresses.
populateAddresses ( ) ;
clog ( NetNote ) < < " Genesis : " < < m_chain - > genesisHash ( ) ;
clog ( NetNote ) < < " Id: " < < asHex ( m_key . address ( ) . ref ( ) . cropped ( 0 , 4 ) ) < < " Mode : " < < ( m_mode = = NodeMode : : PeerServer ? " PeerServer " : " Full " ) ;
}
PeerServer : : ~ PeerServer ( )
{
for ( auto const & i : m_peers )
if ( auto p = i . lock ( ) )
p - > disconnect ( ) ;
if ( auto p = i . second . lock ( ) )
p - > disconnect ( ClientQuit ) ;
delete m_upnp ;
}
@ -654,17 +692,17 @@ void PeerServer::populateAddresses()
# endif
}
std : : vector < bi : : tcp : : endpoint > PeerServer : : potentialPeers ( )
std : : map < Public , bi : : tcp : : endpoint > PeerServer : : potentialPeers ( )
{
std : : vector < bi : : tcp : : endpoint > ret ;
std : : map < Public , bi : : tcp : : endpoint > ret ;
if ( ! m_public . address ( ) . is_unspecified ( ) )
ret . push_back ( m_public ) ;
ret . insert ( make_pair ( m_key . pub ( ) , m_public ) ) ;
for ( auto i : m_peers )
if ( auto j = i . lock ( ) )
if ( auto j = i . second . lock ( ) )
{
auto ep = j - > endpoint ( ) ;
if ( ep . port ( ) )
ret . push_back ( ep ) ;
if ( ep . port ( ) & & j - > m_id )
ret . insert ( make_pair ( i . first , ep ) ) ;
}
return ret ;
}
@ -684,7 +722,6 @@ void PeerServer::ensureAccepting()
clog ( NetNote ) < < " Accepted connection from " < < m_socket . remote_endpoint ( ) ;
} catch ( . . . ) { }
auto p = std : : make_shared < PeerSession > ( this , std : : move ( m_socket ) , m_requiredNetworkId ) ;
m_peers . push_back ( p ) ;
p - > start ( ) ;
}
catch ( std : : exception const & _e )
@ -712,7 +749,6 @@ void PeerServer::connect(bi::tcp::endpoint const& _ep)
else
{
auto p = make_shared < PeerSession > ( this , std : : move ( * s ) , m_requiredNetworkId ) ;
m_peers . push_back ( p ) ;
clog ( NetNote ) < < " Connected to " < < p - > endpoint ( ) ;
p - > start ( ) ;
}
@ -733,7 +769,7 @@ bool PeerServer::process(BlockChain& _bc)
if ( fullProcess )
for ( auto i = m_peers . begin ( ) ; i ! = m_peers . end ( ) ; )
{
auto p = i - > lock ( ) ;
auto p = i - > second . lock ( ) ;
if ( p & & p - > m_socket . is_open ( ) & &
( p - > m_disconnect = = chrono : : steady_clock : : time_point : : max ( ) | | chrono : : steady_clock : : now ( ) - p - > m_disconnect < chrono : : seconds ( 1 ) ) ) // kill old peers that should be disconnected.
+ + i ;
@ -782,7 +818,7 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
if ( fullProcess )
{
for ( auto j : m_peers )
if ( auto p = j . lock ( ) )
if ( auto p = j . second . lock ( ) )
{
bytes b ;
uint n = 0 ;
@ -798,7 +834,7 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
RLPStream ts ;
PeerSession : : prep ( ts ) ;
ts . appendList ( n + 1 ) < < TransactionsPacket ;
ts . appendRaw ( b ) . swapOut ( b ) ;
ts . appendRaw ( b , n ) . swapOut ( b ) ;
seal ( b ) ;
p - > send ( & b ) ;
}
@ -818,7 +854,7 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
ts . appendRaw ( _bc . block ( _bc . currentHash ( ) ) ) . swapOut ( b ) ;
seal ( b ) ;
for ( auto j : m_peers )
if ( auto p = j . lock ( ) )
if ( auto p = j . second . lock ( ) )
{
if ( ! p - > m_knownBlocks . count ( _bc . currentHash ( ) ) )
p - > send ( & b ) ;
@ -867,7 +903,7 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
( PeerSession : : prep ( s ) . appendList ( 1 ) < < GetPeersPacket ) . swapOut ( b ) ;
seal ( b ) ;
for ( auto const & i : m_peers )
if ( auto p = i . lock ( ) )
if ( auto p = i . second . lock ( ) )
if ( p - > isOpen ( ) )
p - > send ( & b ) ;
m_lastPeersRequest = chrono : : steady_clock : : now ( ) ;
@ -879,8 +915,8 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
break ;
}
connect ( m_incomingPeers . back ( ) ) ;
m_incomingPeers . pop_back ( ) ;
connect ( m_incomingPeers . begin ( ) - > second ) ;
m_incomingPeers . erase ( m_incomingPeers . begin ( ) ) ;
}
}
}
@ -900,7 +936,7 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
shared_ptr < PeerSession > worst ;
unsigned agedPeers = 0 ;
for ( auto i : m_peers )
if ( auto p = i . lock ( ) )
if ( auto p = i . second . lock ( ) )
if ( ( m_mode ! = NodeMode : : PeerServer | | p - > m_caps ! = 0x01 ) & & chrono : : steady_clock : : now ( ) > p - > m_connect + chrono : : milliseconds ( old ) ) // don't throw off new peers; peer-servers should never kick off other peer-servers.
{
+ + agedPeers ;
@ -909,7 +945,7 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
}
if ( ! worst | | agedPeers < = m_idealPeerCount )
break ;
worst - > disconnect ( ) ;
worst - > disconnect ( TooManyPeers ) ;
}
}
@ -919,10 +955,10 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
std : : vector < PeerInfo > PeerServer : : peers ( ) const
{
const_cast < PeerServer * > ( this ) - > pingAll ( ) ;
usleep ( 200000 ) ;
this_thread : : sleep_for ( chrono : : milliseconds ( 200 ) ) ;
std : : vector < PeerInfo > ret ;
for ( auto & i : m_peers )
if ( auto j = i . lock ( ) )
if ( auto j = i . second . lock ( ) )
if ( j - > m_socket . is_open ( ) )
ret . push_back ( j - > m_info ) ;
return ret ;
@ -931,6 +967,6 @@ std::vector<PeerInfo> PeerServer::peers() const
void PeerServer : : pingAll ( )
{
for ( auto & i : m_peers )
if ( auto j = i . lock ( ) )
if ( auto j = i . second . lock ( ) )
j - > ping ( ) ;
}