Browse Source

Possible network fix.

cl-refactor
Gav Wood 11 years ago
parent
commit
5b1d8b858f
  1. 13
      libethereum/PeerServer.cpp
  2. 56
      libethereum/PeerSession.cpp
  3. 4
      libethereum/PeerSession.h

13
libethereum/PeerServer.cpp

@ -117,6 +117,19 @@ unsigned PeerServer::protocolVersion()
return c_protocolVersion; return c_protocolVersion;
} }
void PeerServer::seal(bytes& _b)
{
_b[0] = 0x22;
_b[1] = 0x40;
_b[2] = 0x08;
_b[3] = 0x91;
uint32_t len = (uint32_t)_b.size() - 8;
_b[4] = (len >> 24) & 0xff;
_b[5] = (len >> 16) & 0xff;
_b[6] = (len >> 8) & 0xff;
_b[7] = len & 0xff;
}
void PeerServer::determinePublic(string const& _publicAddress, bool _upnp) void PeerServer::determinePublic(string const& _publicAddress, bool _upnp)
{ {
if (_upnp) if (_upnp)

56
libethereum/PeerSession.cpp

@ -52,8 +52,8 @@ PeerSession::~PeerSession()
{ {
m_strand.post([=]() m_strand.post([=]()
{ {
if (!m_writeq.empty()) if (!m_writeQueue.empty())
m_writeq.clear(); m_writeQueue.clear();
try { try {
if (m_socket.is_open()) if (m_socket.is_open())
@ -395,19 +395,6 @@ RLPStream& PeerSession::prep(RLPStream& _s)
return _s.appendRaw(bytes(8, 0)); return _s.appendRaw(bytes(8, 0));
} }
void PeerServer::seal(bytes& _b)
{
_b[0] = 0x22;
_b[1] = 0x40;
_b[2] = 0x08;
_b[3] = 0x91;
uint32_t len = (uint32_t)_b.size() - 8;
_b[4] = (len >> 24) & 0xff;
_b[5] = (len >> 16) & 0xff;
_b[6] = (len >> 8) & 0xff;
_b[7] = len & 0xff;
}
void PeerSession::sealAndSend(RLPStream& _s) void PeerSession::sealAndSend(RLPStream& _s)
{ {
bytes b; bytes b;
@ -459,31 +446,32 @@ void PeerSession::send(bytesConstRef _msg)
void PeerSession::writeImpl(bytes& _buffer) void PeerSession::writeImpl(bytes& _buffer)
{ {
m_writeq.push_back(_buffer); m_writeQueue.push_back(_buffer);
if (m_writeq.size() > 1) if (m_writeQueue.size() > 1)
return; return;
this->write(); write();
} }
void PeerSession::write() void PeerSession::write()
{ {
if (m_writeq.empty()) if (m_writeQueue.empty())
return; return;
const bytes& bytes = m_writeq[0]; const bytes& bytes = m_writeQueue[0];
if (m_socket.is_open()) if (m_socket.is_open())
ba::async_write(m_socket, ba::buffer(bytes), m_strand.wrap([this](boost::system::error_code ec, std::size_t /*length*/) ba::async_write(m_socket, ba::buffer(bytes), m_strand.wrap([this](boost::system::error_code ec, std::size_t /*length*/)
{ {
// must check que, as write callback can occur following dropped() // must check queue, as write callback can occur following dropped()
if (!m_writeq.empty()) if (!m_writeQueue.empty())
this->m_writeq.pop_front(); m_writeQueue.pop_front();
if (ec) if (ec)
{ {
cwarn << "Error sending: " << ec.message(); cwarn << "Error sending: " << ec.message();
this->dropped(); dropped();
} else }
else
m_strand.post(boost::bind(&PeerSession::write, this)); m_strand.post(boost::bind(&PeerSession::write, this));
})); }));
} }
@ -491,15 +479,22 @@ void PeerSession::write()
void PeerSession::dropped() void PeerSession::dropped()
{ {
if (m_socket.is_open()) if (m_socket.is_open())
try { try
{
clogS(NetNote) << "Closing " << m_socket.remote_endpoint(); clogS(NetNote) << "Closing " << m_socket.remote_endpoint();
m_socket.close(); m_socket.close();
}catch (...){} }
catch (...) {}
// block future writes by running in strand and clearing queue // block future writes by running in strand and clearing queue
m_strand.post([=]() m_strand.post([=]()
{ {
m_writeq.clear(); m_writeQueue.clear();
if (!m_willBeDeleted) // Don't want two deleters on the queue at once!
{
m_willBeDeleted = true;
m_strand.post([=]()
{
for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i)
if (i->second.lock().get() == this) if (i->second.lock().get() == this)
{ {
@ -507,6 +502,8 @@ void PeerSession::dropped()
break; break;
} }
}); });
}
});
} }
void PeerSession::disconnect(int _reason) void PeerSession::disconnect(int _reason)
@ -568,10 +565,7 @@ void PeerSession::doRead()
while (m_incoming.size() > 8) while (m_incoming.size() > 8)
{ {
if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91) if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91)
{
doRead(); doRead();
}
else else
{ {
uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data() + 4, 4)); uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data() + 4, 4));

4
libethereum/PeerSession.h

@ -66,7 +66,7 @@ private:
void write(); void write();
PeerServer* m_server; PeerServer* m_server;
boost::asio::strand m_strand; boost::asio::strand m_strand;
std::deque<bytes> m_writeq; std::deque<bytes> m_writeQueue;
bi::tcp::socket m_socket; bi::tcp::socket m_socket;
std::array<byte, 65536> m_data; std::array<byte, 65536> m_data;
@ -89,6 +89,8 @@ private:
std::set<h256> m_knownBlocks; std::set<h256> m_knownBlocks;
std::set<h256> m_knownTransactions; std::set<h256> m_knownTransactions;
bool m_willBeDeleted = false; ///< True if we already posted a deleter on the strand.
}; };
} }

Loading…
Cancel
Save