@ -40,12 +40,11 @@ Session::Session(Host* _s, bi::tcp::socket _socket, bi::tcp::endpoint const& _ma
m_server ( _s ) ,
m_socket ( std : : move ( _socket ) ) ,
m_node ( nullptr ) ,
m_manualEndpoint ( _manual )
m_manualEndpoint ( _manual ) // NOTE: the port on this shouldn't be used if it's zero.
{
m_disconnect = std : : chrono : : steady_clock : : time_point : : max ( ) ;
m_connect = std : : chrono : : steady_clock : : now ( ) ;
m_lastReceived = m_connect = std : : chrono : : steady_clock : : now ( ) ;
m_info = PeerInfo ( { NodeId ( ) , " ? " , m_manualEndpoint . address ( ) . to_string ( ) , m_manualEndpoint . port ( ) , std : : chrono : : steady_clock : : duration ( 0 ) , CapDescSet ( ) , 0 , map < string , string > ( ) } ) ;
m_info = PeerInfo ( { NodeId ( ) , " ? " , m_manualEndpoint . address ( ) . to_string ( ) , 0 , std : : chrono : : steady_clock : : duration ( 0 ) , CapDescSet ( ) , 0 , map < string , string > ( ) } ) ;
}
Session : : Session ( Host * _s , bi : : tcp : : socket _socket , std : : shared_ptr < Node > const & _n , bool _force ) :
@ -55,15 +54,15 @@ Session::Session(Host* _s, bi::tcp::socket _socket, std::shared_ptr<Node> const&
m_manualEndpoint ( _n - > address ) ,
m_force ( _force )
{
m_disconnect = std : : chrono : : steady_clock : : time_point : : max ( ) ;
m_connect = std : : chrono : : steady_clock : : now ( ) ;
m_lastReceived = m_connect = std : : chrono : : steady_clock : : now ( ) ;
m_info = PeerInfo ( { m_node - > id , " ? " , _n - > address . address ( ) . to_string ( ) , _n - > address . port ( ) , std : : chrono : : steady_clock : : duration ( 0 ) , CapDescSet ( ) , 0 , map < string , string > ( ) } ) ;
}
Session : : ~ Session ( )
{
if ( id ( ) )
m_server - > noteNode ( id ( ) , m_manualEndpoint , Origin : : Unknown , true ) ;
if ( m_node )
if ( id ( ) & & ! isPermanentProblem ( m_node - > lastDisconnect ) & & ! m_node - > dead )
m_server - > m_ready + = m_node - > index ;
// Read-chain finished for one reason or another.
for ( auto & i : m_capabilities )
@ -125,8 +124,51 @@ template <class T> vector<T> randomSelection(vector<T> const& _t, unsigned _n)
return ret ;
}
void Session : : ensureNodesRequested ( )
{
if ( isOpen ( ) & & ! m_weRequestedNodes )
{
m_weRequestedNodes = true ;
RLPStream s ;
sealAndSend ( prep ( s , GetPeersPacket ) ) ;
}
}
void Session : : serviceNodesRequest ( )
{
if ( ! m_theyRequestedNodes )
return ;
auto peers = m_server - > potentialPeers ( m_knownNodes ) ;
if ( peers . empty ( ) )
{
addNote ( " peers " , " requested " ) ;
return ;
}
// note this should cost them...
RLPStream s ;
prep ( s , PeersPacket , min < unsigned > ( 10 , peers . size ( ) ) ) ;
auto rs = randomSelection ( peers , 10 ) ;
for ( auto const & i : rs )
{
clogS ( NetTriviaDetail ) < < " Sending peer " < < i . id . abridged ( ) < < i . address ;
if ( i . address . address ( ) . is_v4 ( ) )
s . appendList ( 3 ) < < bytesConstRef ( i . address . address ( ) . to_v4 ( ) . to_bytes ( ) . data ( ) , 4 ) < < i . address . port ( ) < < i . id ;
else // if (i.second.address().is_v6()) - assumed
s . appendList ( 3 ) < < bytesConstRef ( i . address . address ( ) . to_v6 ( ) . to_bytes ( ) . data ( ) , 16 ) < < i . address . port ( ) < < i . id ;
m_knownNodes . extendAll ( i . index ) ;
m_knownNodes . unionWith ( i . index ) ;
}
sealAndSend ( s ) ;
m_theyRequestedNodes = false ;
addNote ( " peers " , " done " ) ;
}
bool Session : : interpret ( RLP const & _r )
{
m_lastReceived = chrono : : steady_clock : : now ( ) ;
clogS ( NetRight ) < < _r ;
try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
{
@ -135,9 +177,6 @@ bool Session::interpret(RLP const& _r)
{
case HelloPacket :
{
if ( m_node )
m_node - > lastDisconnect = - 1 ;
m_protocolVersion = _r [ 1 ] . toInt < unsigned > ( ) ;
auto clientVersion = _r [ 2 ] . toString ( ) ;
auto caps = _r [ 3 ] . toVector < CapDesc > ( ) ;
@ -152,12 +191,12 @@ bool Session::interpret(RLP const& _r)
clogS ( NetMessageSummary ) < < " Hello: " < < clientVersion < < " V[ " < < m_protocolVersion < < " ] " < < id . abridged ( ) < < showbase < < capslog . str ( ) < < dec < < listenPort ;
if ( m_server - > havePeer ( id ) )
if ( m_server - > id ( ) = = id )
{
// Already connected.
clogS ( NetWarn ) < < " Already connected to a peer with id" < < id . abridged ( ) ;
disconnect ( DuplicatePeer ) ;
return fals e;
clogS ( NetWarn ) < < " Connected to ourself under a false pretext. We were told this peer was id" < < m_info . id . abridged ( ) ;
disconnect ( LocalIdentity ) ;
return tru e;
}
if ( m_node & & m_node - > id ! = id )
@ -169,14 +208,29 @@ bool Session::interpret(RLP const& _r)
{
clogS ( NetWarn ) < < " Connected to node, but their ID has changed since last time. This could indicate a MitM attack. Disconnecting. " ;
disconnect ( UnexpectedIdentity ) ;
return fals e;
return tru e;
}
if ( m_server - > havePeer ( id ) )
{
m_node - > dead = true ;
disconnect ( DuplicatePeer ) ;
return true ;
}
}
if ( m_server - > havePeer ( id ) )
{
// Already connected.
clogS ( NetWarn ) < < " Already connected to a peer with id " < < id . abridged ( ) ;
disconnect ( DuplicatePeer ) ;
return true ;
}
if ( ! id )
{
disconnect ( NullIdentity ) ;
return false ;
return tru e;
}
m_node = m_server - > noteNode ( id , bi : : tcp : : endpoint ( m_socket . remote_endpoint ( ) . address ( ) , listenPort ) , Origin : : Self , false , ! m_node | | m_node - > id = = id ? NodeId ( ) : m_node - > id ) ;
@ -186,7 +240,7 @@ bool Session::interpret(RLP const& _r)
if ( m_protocolVersion ! = m_server - > protocolVersion ( ) )
{
disconnect ( IncompatibleProtocol ) ;
return fals e;
return tru e;
}
m_info = PeerInfo ( { id , clientVersion , m_socket . remote_endpoint ( ) . address ( ) . to_string ( ) , listenPort , std : : chrono : : steady_clock : : duration ( ) , _r [ 3 ] . toSet < CapDesc > ( ) , ( unsigned ) m_socket . native_handle ( ) , map < string , string > ( ) } ) ;
@ -196,16 +250,16 @@ bool Session::interpret(RLP const& _r)
case DisconnectPacket :
{
string reason = " Unspecified " ;
if ( _r [ 1 ] . isInt ( ) )
reason = reasonOf ( ( DisconnectReason ) _r [ 1 ] . toInt < int > ( ) ) ;
clogS ( NetMessageSummary ) < < " Disconnect (reason: " < < reason < < " ) " ;
if ( m_socket . is_open ( ) )
clogS ( NetNote ) < < " Closing " < < m_socket . remote_endpoint ( ) ;
auto r = ( DisconnectReason ) _r [ 1 ] . toInt < int > ( ) ;
if ( ! _r [ 1 ] . isInt ( ) )
drop ( BadProtocol ) ;
else
clogS ( NetNote ) < < " Remote closed. " ;
m_socket . close ( ) ;
return false ;
{
reason = reasonOf ( r ) ;
clogS ( NetMessageSummary ) < < " Disconnect (reason: " < < reason < < " ) " ;
drop ( DisconnectRequested ) ;
}
break ;
}
case PingPacket :
{
@ -221,27 +275,13 @@ bool Session::interpret(RLP const& _r)
case GetPeersPacket :
{
clogS ( NetTriviaSummary ) < < " GetPeers " ;
auto peers = m_server - > potentialPeers ( m_knownNodes ) ;
if ( peers . empty ( ) )
break ;
RLPStream s ;
prep ( s , PeersPacket , min < unsigned > ( 10 , peers . size ( ) ) ) ;
auto rs = randomSelection ( peers , 10 ) ;
for ( auto const & i : rs )
{
clogS ( NetTriviaDetail ) < < " Sending peer " < < i . id . abridged ( ) < < i . address ;
if ( i . address . address ( ) . is_v4 ( ) )
s . appendList ( 3 ) < < bytesConstRef ( i . address . address ( ) . to_v4 ( ) . to_bytes ( ) . data ( ) , 4 ) < < i . address . port ( ) < < i . id ;
else // if (i.second.address().is_v6()) - assumed
s . appendList ( 3 ) < < bytesConstRef ( i . address . address ( ) . to_v6 ( ) . to_bytes ( ) . data ( ) , 16 ) < < i . address . port ( ) < < i . id ;
m_knownNodes . extendAll ( i . index ) ;
m_knownNodes . unionWith ( i . index ) ;
}
sealAndSend ( s ) ;
m_theyRequestedNodes = true ;
serviceNodesRequest ( ) ;
break ;
}
case PeersPacket :
clogS ( NetTriviaSummary ) < < " Peers ( " < < dec < < ( _r . itemCount ( ) - 1 ) < < " entries) " ;
m_weRequestedNodes = false ;
for ( unsigned i = 1 ; i < _r . itemCount ( ) ; + + i )
{
bi : : address peerAddress ;
@ -252,7 +292,7 @@ bool Session::interpret(RLP const& _r)
else
{
disconnect ( BadProtocol ) ;
return fals e;
return tru e;
}
auto ep = bi : : tcp : : endpoint ( peerAddress , _r [ i ] [ 1 ] . toInt < short > ( ) ) ;
NodeId id = _r [ i ] [ 2 ] . toHash < NodeId > ( ) ;
@ -273,19 +313,29 @@ bool Session::interpret(RLP const& _r)
// check that it's not us or one we already know:
if ( m_server - > m_nodes . count ( id ) )
{
/* MEH. Far from an ideal solution. Leave alone for now.
// Already got this node.
// See if it's any better that ours or not...
// This could be the public address of a known node.
// SECURITY: remove this in beta - it's only for lazy connections and presents an easy attack vector.
if ( m_server - > m_nodes . count ( id ) & & isPrivateAddress ( m_server - > m_nodes . at ( id ) - > address . address ( ) ) )
if ( m_server - > m_nodes . count ( id ) & & isPrivateAddress ( m_server - > m_nodes . at ( id ) - > address . address ( ) ) & & ep . port ( ) ! = 0 )
// Update address if the node if we now have a public IP for it.
m_server - > m_nodes [ id ] - > address = ep ;
*/
goto CONTINUE ;
}
if ( ! ep . port ( ) )
goto CONTINUE ; // Zero port? Don't think so.
if ( ep . port ( ) > = 49152 )
goto CONTINUE ; // Private port according to IANA.
// TODO: PoC-7:
// Technically fine, but ignore for now to avoid peers passing on incoming ports until we can be sure that doesn't happen any more.
// if (ep.port() < 30300 || ep.port() > 30305)
// goto CONTINUE; // Wierd port.
// Avoid our random other addresses that they might end up giving us.
for ( auto i : m_server - > m_addresses )
if ( ep . address ( ) = = i & & ep . port ( ) = = m_server - > listenPort ( ) )
@ -314,10 +364,11 @@ bool Session::interpret(RLP const& _r)
}
}
}
catch ( . . . )
catch ( std : : exception const & _e )
{
clogS ( NetWarn ) < < " Peer causing an exception: " < < _e . what ( ) ;
disconnect ( BadProtocol ) ;
return fals e;
return tru e;
}
return true ;
}
@ -329,12 +380,6 @@ void Session::ping()
m_ping = std : : chrono : : steady_clock : : now ( ) ;
}
void Session : : getPeers ( )
{
RLPStream s ;
sealAndSend ( prep ( s , GetPeersPacket ) ) ;
}
RLPStream & Session : : prep ( RLPStream & _s , PacketType _id , unsigned _args )
{
return prep ( _s ) . appendList ( _args + 1 ) . append ( ( unsigned ) _id ) ;
@ -350,7 +395,7 @@ void Session::sealAndSend(RLPStream& _s)
bytes b ;
_s . swapOut ( b ) ;
m_server - > seal ( b ) ;
sendDestroy ( b ) ;
send ( move ( b ) ) ;
}
bool Session : : checkPacket ( bytesConstRef _msg )
@ -368,42 +413,26 @@ bool Session::checkPacket(bytesConstRef _msg)
return true ;
}
void Session : : sendDestroy ( bytes & _msg )
void Session : : send ( bytesConstRef _msg )
{
clogS ( NetLeft ) < < RLP ( bytesConstRef ( & _msg ) . cropped ( 8 ) ) ;
if ( ! checkPacket ( bytesConstRef ( & _msg ) ) )
{
clogS ( NetWarn ) < < " INVALID PACKET CONSTRUCTED! " ;
}
bytes buffer = bytes ( std : : move ( _msg ) ) ;
writeImpl ( buffer ) ;
send ( _msg . toBytes ( ) ) ;
}
void Session : : send ( bytesConstRef _msg )
void Session : : send ( bytes & & _msg )
{
clogS ( NetLeft ) < < RLP ( _msg . cropped ( 8 ) ) ;
clogS ( NetLeft ) < < RLP ( bytesConstRef ( & _msg ) . cropped ( 8 ) ) ;
if ( ! checkPacket ( _msg ) )
{
if ( ! checkPacket ( bytesConstRef ( & _msg ) ) )
clogS ( NetWarn ) < < " INVALID PACKET CONSTRUCTED! " ;
}
bytes buffer = bytes ( _msg . toBytes ( ) ) ;
writeImpl ( buffer ) ;
}
void Session : : writeImpl ( bytes & _buffer )
{
// cerr << (void*)this << " writeImpl" << endl;
if ( ! m_socket . is_open ( ) )
return ;
bool doWrite = false ;
{
lock_guard < mutex > l ( m_writeLock ) ;
m_writeQueue . push_back ( _buffer ) ;
Guard l ( x_writeQueue ) ;
m_writeQueue . push_back ( _msg ) ;
doWrite = ( m_writeQueue . size ( ) = = 1 ) ;
}
@ -417,18 +446,16 @@ void Session::write()
auto self ( shared_from_this ( ) ) ;
ba : : async_write ( m_socket , ba : : buffer ( bytes ) , [ this , self ] ( boost : : system : : error_code ec , std : : size_t /*length*/ )
{
// cerr << (void*)this << " write.callback" << endl;
// must check queue, as write callback can occur following dropped()
if ( ec )
{
clogS ( NetWarn ) < < " Error sending: " < < ec . message ( ) ;
dropped ( ) ;
drop ( TCPError ) ;
return ;
}
else
{
lock_guard < mutex > l ( m_writeLock ) ;
Guard l ( x_writeQueue ) ;
m_writeQueue . pop_front ( ) ;
if ( m_writeQueue . empty ( ) )
return ;
@ -437,37 +464,43 @@ void Session::write()
} ) ;
}
void Session : : dropped ( )
void Session : : drop ( DisconnectReason _reason )
{
// cerr << (void*)this << " dropped" << endl;
if ( m_dropped )
return ;
cerr < < ( void * ) this < < " dropped " < < endl ;
if ( m_socket . is_open ( ) )
try
{
clogS ( NetConnect ) < < " Closing " < < m_socket . remote_endpoint ( ) ;
clogS ( NetConnect ) < < " Closing " < < m_socket . remote_endpoint ( ) < < " ( " < < reasonOf ( _reason ) < < " ) " ;
m_socket . close ( ) ;
}
catch ( . . . ) { }
}
void Session : : disconnect ( int _reason )
{
clogS ( NetConnect ) < < " Disconnecting (reason: " < < reasonOf ( ( DisconnectReason ) _reason ) < < " ) " ;
if ( m_node )
{
if ( _reason ! = m_node - > lastDisconnect | | _reason = = NoDisconnect | | _reason = = ClientQuit | | _reason = = DisconnectRequested )
m_node - > failedAttempts = 0 ;
m_node - > lastDisconnect = _reason ;
if ( _reason = = BadProtocol )
{
m_node - > rating / = 2 ;
m_node - > score / = 2 ;
}
}
m_dropped = true ;
}
void Session : : disconnect ( DisconnectReason _reason )
{
clogS ( NetConnect ) < < " Disconnecting (our reason: " < < reasonOf ( _reason ) < < " ) " ;
if ( m_socket . is_open ( ) )
{
if ( m_disconnect = = chrono : : steady_clock : : time_point : : max ( ) )
{
RLPStream s ;
prep ( s , DisconnectPacket , 1 ) < < _reason ;
sealAndSend ( s ) ;
m_disconnect = chrono : : steady_clock : : now ( ) ;
}
else
dropped ( ) ;
RLPStream s ;
prep ( s , DisconnectPacket , 1 ) < < ( int ) _reason ;
sealAndSend ( s ) ;
}
drop ( _reason ) ;
}
void Session : : start ( )
@ -481,15 +514,13 @@ void Session::start()
< < m_server - > id ( ) ;
sealAndSend ( s ) ;
ping ( ) ;
getPeers ( ) ;
doRead ( ) ;
}
void Session : : doRead ( )
{
// ignore packets received while waiting to disconnect
if ( chrono : : steady_clock : : now ( ) - m_disconnect > ch rono : : s econ ds ( 0 ) )
if ( m_dropp ed )
return ;
auto self ( shared_from_this ( ) ) ;
@ -500,7 +531,7 @@ void Session::doRead()
{
// got here with length of 1241...
clogS ( NetWarn ) < < " Error reading: " < < ec . message ( ) ;
dropped ( ) ;
drop ( TCPError ) ;
}
else if ( ec & & length = = 0 )
{
@ -541,8 +572,8 @@ void Session::doRead()
RLP r ( data . cropped ( 8 ) ) ;
if ( ! interpret ( r ) )
{
// error
dropped ( ) ;
// error - bad protocol
disconnect ( BadProtocol ) ;
return ;
}
}
@ -555,12 +586,12 @@ void Session::doRead()
catch ( Exception const & _e )
{
clogS ( NetWarn ) < < " ERROR: " < < diagnostic_information ( _e ) ;
dropped ( ) ;
drop ( BadProtocol ) ;
}
catch ( std : : exception const & _e )
{
clogS ( NetWarn ) < < " ERROR: " < < _e . what ( ) ;
dropped ( ) ;
drop ( BadProtocol ) ;
}
}
} ) ;