@ -31,13 +31,12 @@ using namespace eth;
# define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
# define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
static const eth : : uint c_maxHashes = 32 ; ///< Maximum number of hashes GetChain will ever send.
static const eth : : uint c_maxHashes = 4096 ; ///< Maximum number of hashes GetChain will ever send.
static const eth : : uint c_maxBlocks = 64 ; ///< Maximum number of blocks Blocks will ever send. BUG: if this gets too big (e.g. 2048) stuff starts going wrong .
static const eth : : uint c_maxBlocks = 2048 ; ///< Maximum number of blocks Blocks will ever send.
static const eth : : uint c_maxBlocksAsk = 256 ; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
static const eth : : uint c_maxBlocksAsk = 512 ; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
PeerSession : : PeerSession ( PeerServer * _s , bi : : tcp : : socket _socket , uint _rNId , bi : : address _peerAddress , unsigned short _peerPort ) :
PeerSession : : PeerSession ( PeerServer * _s , bi : : tcp : : socket _socket , uint _rNId , bi : : address _peerAddress , unsigned short _peerPort ) :
m_server ( _s ) ,
m_server ( _s ) ,
m_strand ( _socket . get_io_service ( ) ) ,
m_socket ( std : : move ( _socket ) ) ,
m_socket ( std : : move ( _socket ) ) ,
m_reqNetworkId ( _rNId ) ,
m_reqNetworkId ( _rNId ) ,
m_listenPort ( _peerPort ) ,
m_listenPort ( _peerPort ) ,
@ -50,24 +49,23 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi
PeerSession : : ~ PeerSession ( )
PeerSession : : ~ PeerSession ( )
{
{
m_strand . post ( [ = ] ( )
// Read-chain finished for one reason or another.
try
{
{
if ( ! m_writeq . empty ( ) )
m_writeq . clear ( ) ;
try {
if ( m_socket . is_open ( ) )
if ( m_socket . is_open ( ) )
m_socket . close ( ) ;
m_socket . close ( ) ;
} catch ( . . . ) { }
}
} ) ;
catch ( . . . ) { }
}
}
bi : : tcp : : endpoint PeerSession : : endpoint ( ) const
bi : : tcp : : endpoint PeerSession : : endpoint ( ) const
{
{
if ( m_socket . is_open ( ) )
if ( m_socket . is_open ( ) )
try {
try
{
return bi : : tcp : : endpoint ( m_socket . remote_endpoint ( ) . address ( ) , m_listenPort ) ;
return bi : : tcp : : endpoint ( m_socket . remote_endpoint ( ) . address ( ) , m_listenPort ) ;
} catch ( . . . ) { }
}
catch ( . . . ) { }
return bi : : tcp : : endpoint ( ) ;
return bi : : tcp : : endpoint ( ) ;
}
}
@ -395,19 +393,6 @@ RLPStream& PeerSession::prep(RLPStream& _s)
return _s . appendRaw ( bytes ( 8 , 0 ) ) ;
return _s . appendRaw ( bytes ( 8 , 0 ) ) ;
}
}
void PeerServer : : seal ( bytes & _b )
{
_b [ 0 ] = 0x22 ;
_b [ 1 ] = 0x40 ;
_b [ 2 ] = 0x08 ;
_b [ 3 ] = 0x91 ;
uint32_t len = ( uint32_t ) _b . size ( ) - 8 ;
_b [ 4 ] = ( len > > 24 ) & 0xff ;
_b [ 5 ] = ( len > > 16 ) & 0xff ;
_b [ 6 ] = ( len > > 8 ) & 0xff ;
_b [ 7 ] = len & 0xff ;
}
void PeerSession : : sealAndSend ( RLPStream & _s )
void PeerSession : : sealAndSend ( RLPStream & _s )
{
{
bytes b ;
bytes b ;
@ -441,7 +426,7 @@ void PeerSession::sendDestroy(bytes& _msg)
}
}
bytes buffer = bytes ( std : : move ( _msg ) ) ;
bytes buffer = bytes ( std : : move ( _msg ) ) ;
m_strand . post ( boost : : bind ( & PeerSession : : writeImpl , this , buffer ) ) ;
writeImpl ( buffer ) ;
}
}
void PeerSession : : send ( bytesConstRef _msg )
void PeerSession : : send ( bytesConstRef _msg )
@ -454,59 +439,66 @@ void PeerSession::send(bytesConstRef _msg)
}
}
bytes buffer = bytes ( _msg . toBytes ( ) ) ;
bytes buffer = bytes ( _msg . toBytes ( ) ) ;
m_strand . post ( boost : : bind ( & PeerSession : : writeImpl , this , buffer ) ) ;
writeImpl ( buffer ) ;
}
}
void PeerSession : : writeImpl ( bytes & _buffer )
void PeerSession : : writeImpl ( bytes & _buffer )
{
{
m_writeq . push_back ( _buffer ) ;
// cerr << (void*)this << " writeImpl" << endl;
if ( m_writeq . size ( ) > 1 )
if ( ! m_socket . is_open ( ) )
return ;
return ;
lock_guard < recursive_mutex > l ( m_writeLock ) ;
m_writeQueue . push_back ( _buffer ) ;
if ( m_writeQueue . size ( ) = = 1 )
write ( ) ;
write ( ) ;
}
}
void PeerSession : : write ( )
void PeerSession : : write ( )
{
{
if ( m_writeq . empty ( ) )
// cerr << (void*)this << " write" << endl;
lock_guard < recursive_mutex > l ( m_writeLock ) ;
if ( m_writeQueue . empty ( ) )
return ;
return ;
const bytes & bytes = m_writeq [ 0 ] ;
const bytes & bytes = m_writeQueue [ 0 ] ;
if ( m_socket . is_open ( ) )
auto self ( shared_from_this ( ) ) ;
ba : : async_write ( m_socket , ba : : buffer ( bytes ) , m_strand . wrap ( [ this ] ( boost : : system : : error_code ec , std : : size_t /*length*/ )
ba : : async_write ( m_socket , ba : : buffer ( bytes ) , [ this , self ] ( boost : : system : : error_code ec , std : : size_t /*length*/ )
{
{
// must check que, as write callback can occur following dropped()
// cerr << (void*)this << " write.callback" << endl;
if ( ! m_writeq . empty ( ) )
m_writeq . pop_front ( ) ;
// must check queue, as write callback can occur following dropped()
if ( ec )
if ( ec )
{
{
cwarn < < " Error sending: " < < ec . message ( ) ;
cwarn < < " Error sending: " < < ec . message ( ) ;
dropped ( ) ;
dropped ( ) ;
} else
}
m_strand . post ( boost : : bind ( & PeerSession : : write , this ) ) ;
else
} ) ) ;
{
m_writeQueue . pop_front ( ) ;
write ( ) ;
}
} ) ;
}
}
void PeerSession : : dropped ( )
void PeerSession : : dropped ( )
{
{
// cerr << (void*)this << " dropped" << endl;
if ( m_socket . is_open ( ) )
if ( m_socket . is_open ( ) )
try {
try
{
clogS ( NetNote ) < < " Closing " < < m_socket . remote_endpoint ( ) ;
clogS ( NetNote ) < < " Closing " < < m_socket . remote_endpoint ( ) ;
m_socket . close ( ) ;
m_socket . close ( ) ;
} catch ( . . . ) { }
}
catch ( . . . ) { }
// block future writes by running in strand and clearing queue
// Remove from peer server
m_strand . post ( [ = ] ( )
{
m_writeq . clear ( ) ;
for ( auto i = m_server - > m_peers . begin ( ) ; i ! = m_server - > m_peers . end ( ) ; + + i )
for ( auto i = m_server - > m_peers . begin ( ) ; i ! = m_server - > m_peers . end ( ) ; + + i )
if ( i - > second . lock ( ) . get ( ) = = this )
if ( i - > second . lock ( ) . get ( ) = = this )
{
{
m_server - > m_peers . erase ( i ) ;
m_server - > m_peers . erase ( i ) ;
break ;
break ;
}
}
} ) ;
}
}
void PeerSession : : disconnect ( int _reason )
void PeerSession : : disconnect ( int _reason )
@ -568,10 +560,7 @@ void PeerSession::doRead()
while ( m_incoming . size ( ) > 8 )
while ( m_incoming . size ( ) > 8 )
{
{
if ( m_incoming [ 0 ] ! = 0x22 | | m_incoming [ 1 ] ! = 0x40 | | m_incoming [ 2 ] ! = 0x08 | | m_incoming [ 3 ] ! = 0x91 )
if ( m_incoming [ 0 ] ! = 0x22 | | m_incoming [ 1 ] ! = 0x40 | | m_incoming [ 2 ] ! = 0x08 | | m_incoming [ 3 ] ! = 0x91 )
{
doRead ( ) ;
doRead ( ) ;
}
else
else
{
{
uint32_t len = fromBigEndian < uint32_t > ( bytesConstRef ( m_incoming . data ( ) + 4 , 4 ) ) ;
uint32_t len = fromBigEndian < uint32_t > ( bytesConstRef ( m_incoming . data ( ) + 4 , 4 ) ) ;