|
|
@ -37,7 +37,6 @@ static const eth::uint c_maxBlocksAsk = 256; ///< Maximum number of blocks we as |
|
|
|
|
|
|
|
PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi::address _peerAddress, unsigned short _peerPort): |
|
|
|
m_server(_s), |
|
|
|
m_strand(_socket.get_io_service()), |
|
|
|
m_socket(std::move(_socket)), |
|
|
|
m_reqNetworkId(_rNId), |
|
|
|
m_listenPort(_peerPort), |
|
|
@ -50,16 +49,13 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi |
|
|
|
|
|
|
|
PeerSession::~PeerSession() |
|
|
|
{ |
|
|
|
m_strand.post([=]() |
|
|
|
// Read-chain finished for one reason or another.
|
|
|
|
try |
|
|
|
{ |
|
|
|
if (!m_writeq.empty()) |
|
|
|
m_writeq.clear(); |
|
|
|
|
|
|
|
try { |
|
|
|
if (m_socket.is_open()) |
|
|
|
m_socket.close(); |
|
|
|
}catch (...){} |
|
|
|
}); |
|
|
|
} |
|
|
|
catch (...){} |
|
|
|
} |
|
|
|
|
|
|
|
bi::tcp::endpoint PeerSession::endpoint() const |
|
|
@ -395,19 +391,6 @@ RLPStream& PeerSession::prep(RLPStream& _s) |
|
|
|
return _s.appendRaw(bytes(8, 0)); |
|
|
|
} |
|
|
|
|
|
|
|
void PeerServer::seal(bytes& _b) |
|
|
|
{ |
|
|
|
_b[0] = 0x22; |
|
|
|
_b[1] = 0x40; |
|
|
|
_b[2] = 0x08; |
|
|
|
_b[3] = 0x91; |
|
|
|
uint32_t len = (uint32_t)_b.size() - 8; |
|
|
|
_b[4] = (len >> 24) & 0xff; |
|
|
|
_b[5] = (len >> 16) & 0xff; |
|
|
|
_b[6] = (len >> 8) & 0xff; |
|
|
|
_b[7] = len & 0xff; |
|
|
|
} |
|
|
|
|
|
|
|
void PeerSession::sealAndSend(RLPStream& _s) |
|
|
|
{ |
|
|
|
bytes b; |
|
|
@ -441,7 +424,7 @@ void PeerSession::sendDestroy(bytes& _msg) |
|
|
|
} |
|
|
|
|
|
|
|
bytes buffer = bytes(std::move(_msg)); |
|
|
|
m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer)); |
|
|
|
writeImpl(buffer); |
|
|
|
} |
|
|
|
|
|
|
|
void PeerSession::send(bytesConstRef _msg) |
|
|
@ -454,59 +437,66 @@ void PeerSession::send(bytesConstRef _msg) |
|
|
|
} |
|
|
|
|
|
|
|
bytes buffer = bytes(_msg.toBytes()); |
|
|
|
m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer)); |
|
|
|
writeImpl(buffer); |
|
|
|
} |
|
|
|
|
|
|
|
void PeerSession::writeImpl(bytes& _buffer) |
|
|
|
{ |
|
|
|
m_writeq.push_back(_buffer); |
|
|
|
if (m_writeq.size() > 1) |
|
|
|
// cerr << (void*)this << " writeImpl" << endl;
|
|
|
|
if (!m_socket.is_open()) |
|
|
|
return; |
|
|
|
|
|
|
|
this->write(); |
|
|
|
lock_guard<recursive_mutex> l(m_writeLock); |
|
|
|
m_writeQueue.push_back(_buffer); |
|
|
|
if (m_writeQueue.size() == 1) |
|
|
|
write(); |
|
|
|
} |
|
|
|
|
|
|
|
void PeerSession::write() |
|
|
|
{ |
|
|
|
if (m_writeq.empty()) |
|
|
|
// cerr << (void*)this << " write" << endl;
|
|
|
|
lock_guard<recursive_mutex> l(m_writeLock); |
|
|
|
if (m_writeQueue.empty()) |
|
|
|
return; |
|
|
|
|
|
|
|
const bytes& bytes = m_writeq[0]; |
|
|
|
if (m_socket.is_open()) |
|
|
|
ba::async_write(m_socket, ba::buffer(bytes), m_strand.wrap([this](boost::system::error_code ec, std::size_t /*length*/) |
|
|
|
const bytes& bytes = m_writeQueue[0]; |
|
|
|
auto self(shared_from_this()); |
|
|
|
ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/) |
|
|
|
{ |
|
|
|
// must check que, as write callback can occur following dropped()
|
|
|
|
if (!m_writeq.empty()) |
|
|
|
this->m_writeq.pop_front(); |
|
|
|
// cerr << (void*)this << " write.callback" << endl;
|
|
|
|
|
|
|
|
// must check queue, as write callback can occur following dropped()
|
|
|
|
if (ec) |
|
|
|
{ |
|
|
|
cwarn << "Error sending: " << ec.message(); |
|
|
|
this->dropped(); |
|
|
|
} else |
|
|
|
m_strand.post(boost::bind(&PeerSession::write, this)); |
|
|
|
})); |
|
|
|
dropped(); |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
m_writeQueue.pop_front(); |
|
|
|
write(); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
void PeerSession::dropped() |
|
|
|
{ |
|
|
|
// cerr << (void*)this << " dropped" << endl;
|
|
|
|
if (m_socket.is_open()) |
|
|
|
try { |
|
|
|
try |
|
|
|
{ |
|
|
|
clogS(NetNote) << "Closing " << m_socket.remote_endpoint(); |
|
|
|
m_socket.close(); |
|
|
|
}catch (...){} |
|
|
|
} |
|
|
|
catch (...) {} |
|
|
|
|
|
|
|
// block future writes by running in strand and clearing queue
|
|
|
|
m_strand.post([=]() |
|
|
|
{ |
|
|
|
m_writeq.clear(); |
|
|
|
// Remove from peer server
|
|
|
|
for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) |
|
|
|
if (i->second.lock().get() == this) |
|
|
|
{ |
|
|
|
m_server->m_peers.erase(i); |
|
|
|
break; |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
void PeerSession::disconnect(int _reason) |
|
|
@ -568,10 +558,7 @@ void PeerSession::doRead() |
|
|
|
while (m_incoming.size() > 8) |
|
|
|
{ |
|
|
|
if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91) |
|
|
|
{ |
|
|
|
doRead(); |
|
|
|
|
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data() + 4, 4)); |
|
|
|