Browse Source

Fix for #246. Removed sync() to prevent race condition within dropped() and . Raised maxblocks to 64.

cl-refactor
subtly 11 years ago
parent
commit
0f5df0a8a8
  1. 24
      libethereum/PeerServer.cpp
  2. 1
      libethereum/PeerServer.h
  3. 117
      libethereum/PeerSession.cpp
  4. 4
      libethereum/PeerSession.h
  5. 2
      test/peer.cpp

24
libethereum/PeerServer.cpp

@ -323,25 +323,6 @@ void PeerServer::connect(bi::tcp::endpoint const& _ep)
});
}
bool PeerServer::sync()
{
bool ret = false;
if (isInitialised())
for (auto i = m_peers.begin(); i != m_peers.end();)
{
auto p = i->second.lock();
if (p && p->m_socket.is_open() &&
(p->m_disconnect == chrono::steady_clock::time_point::max() || chrono::steady_clock::now() - p->m_disconnect < chrono::seconds(1))) // kill old peers that should be disconnected.
++i;
else
{
i = m_peers.erase(i);
ret = true;
}
}
return ret;
}
bool PeerServer::ensureInitialised(BlockChain& _bc, TransactionQueue& _tq)
{
if (m_latestBlockSent == h256())
@ -361,10 +342,7 @@ bool PeerServer::ensureInitialised(BlockChain& _bc, TransactionQueue& _tq)
bool PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o)
{
bool ret = ensureInitialised(_bc, _tq);
if (sync())
ret = true;
if (m_mode == NodeMode::Full)
{
for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it)

1
libethereum/PeerServer.h

@ -58,7 +58,6 @@ public:
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
bool sync(BlockChain& _bc, TransactionQueue&, OverlayDB& _o);
bool sync();
/// Conduct I/O, polling, syncing, whatever.
/// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway.

117
libethereum/PeerSession.cpp

@ -32,11 +32,12 @@ using namespace eth;
#define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
static const eth::uint c_maxHashes = 32; ///< Maximum number of hashes GetChain will ever send.
static const eth::uint c_maxBlocks = 32; ///< Maximum number of blocks Blocks will ever send. BUG: if this gets too big (e.g. 2048) stuff starts going wrong.
static const eth::uint c_maxBlocks = 64; ///< Maximum number of blocks Blocks will ever send. BUG: if this gets too big (e.g. 2048) stuff starts going wrong.
static const eth::uint c_maxBlocksAsk = 256; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi::address _peerAddress, unsigned short _peerPort):
m_server(_s),
m_strand(_socket.get_io_service()),
m_socket(std::move(_socket)),
m_reqNetworkId(_rNId),
m_listenPort(_peerPort),
@ -49,7 +50,16 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi
PeerSession::~PeerSession()
{
m_socket.close();
m_strand.post([=]()
{
if (!m_writeq.empty())
m_writeq.clear();
try {
if (m_socket.is_open())
m_socket.close();
}catch (...){}
});
}
bi::tcp::endpoint PeerSession::endpoint() const
@ -62,8 +72,6 @@ bi::tcp::endpoint PeerSession::endpoint() const
return bi::tcp::endpoint();
}
// TODO: BUG! 256 -> work out why things start to break with big packet sizes -> g.t. ~370 blocks.
bool PeerSession::interpret(RLP const& _r)
{
clogS(NetRight) << _r;
@ -281,21 +289,26 @@ bool PeerSession::interpret(RLP const& _r)
uint parentNumber = 0;
RLPStream s;
if (m_server->m_chain->details(parent))
// try to find parent in our blockchain
// todo: add some delta() fn to blockchain
BlockDetails f_parent = m_server->m_chain->details(parent);
if (f_parent)
{
latestNumber = m_server->m_chain->number(latest);
parentNumber = m_server->m_chain->number(parent);
parentNumber = f_parent.number;
uint count = min<uint>(latestNumber - parentNumber, baseCount);
clogS(NetAllDetail) << "Requires " << dec << (latestNumber - parentNumber) << " blocks from " << latestNumber << " to " << parentNumber;
clogS(NetAllDetail) << latest << " - " << parent;
prep(s);
s.appendList(1 + count) << BlocksPacket;
uint endNumber = m_server->m_chain->number(parent);
uint endNumber = parentNumber;
uint startNumber = endNumber + count;
clogS(NetAllDetail) << "Sending " << dec << count << " blocks from " << startNumber << " to " << endNumber;
// append blocks
uint n = latestNumber;
// seek back (occurs when count is limited by baseCount)
for (; n > startNumber; n--, h = m_server->m_chain->details(h).parent) {}
for (uint i = 0; i < count; ++i, --n, h = m_server->m_chain->details(h).parent)
{
@ -429,19 +442,8 @@ void PeerSession::sendDestroy(bytes& _msg)
cwarn << "INVALID PACKET CONSTRUCTED!";
}
auto self(shared_from_this());
bytes* buffer = new bytes(std::move(_msg));
if (m_socket.is_open())
ba::async_write(m_socket, ba::buffer(*buffer), [self, buffer](boost::system::error_code ec, std::size_t /*length*/)
{
delete buffer;
if (ec)
{
cwarn << "Error sending: " << ec.message();
self->dropped();
}
// cbug << length << " bytes written (EC: " << ec << ")";
});
bytes buffer = bytes(std::move(_msg));
m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer));
}
void PeerSession::send(bytesConstRef _msg)
@ -453,19 +455,39 @@ void PeerSession::send(bytesConstRef _msg)
cwarn << "INVALID PACKET CONSTRUCTED!";
}
auto self(shared_from_this());
bytes* buffer = new bytes(_msg.toBytes());
bytes buffer = bytes(_msg.toBytes());
m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer));
}
void PeerSession::writeImpl(bytes& _buffer)
{
m_writeq.push_back(_buffer);
if (m_writeq.size() > 1)
return;
this->write();
}
void PeerSession::write()
{
if (m_writeq.empty())
return;
const bytes& bytes = m_writeq[0];
if (m_socket.is_open())
ba::async_write(m_socket, ba::buffer(*buffer), [self, buffer](boost::system::error_code ec, std::size_t /*length*/)
ba::async_write(m_socket, ba::buffer(bytes), m_strand.wrap([this](boost::system::error_code ec, std::size_t /*length*/)
{
delete buffer;
// must check que, as write callback can occur following dropped()
if (!m_writeq.empty())
this->m_writeq.pop_front();
if (ec)
{
cwarn << "Error sending: " << ec.message();
self->dropped();
}
// cbug << length << " bytes written (EC: " << ec << ")";
});
this->dropped();
} else
m_strand.post(boost::bind(&PeerSession::write, this));
}));
}
void PeerSession::dropped()
@ -475,12 +497,18 @@ void PeerSession::dropped()
clogS(NetNote) << "Closing " << m_socket.remote_endpoint();
m_socket.close();
}catch (...){}
for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i)
if (i->second.lock().get() == this)
{
m_server->m_peers.erase(i);
break;
}
// block future writes by running in strand and clearing queue
m_strand.post([=]()
{
m_writeq.clear();
for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i)
if (i->second.lock().get() == this)
{
m_server->m_peers.erase(i);
break;
}
});
}
void PeerSession::disconnect(int _reason)
@ -515,14 +543,24 @@ void PeerSession::start()
void PeerSession::doRead()
{
// ignore packets received while waiting to disconnect
if (chrono::steady_clock::now() - m_disconnect > chrono::seconds(0))
return;
auto self(shared_from_this());
m_socket.async_read_some(boost::asio::buffer(m_data), [this,self](boost::system::error_code ec, std::size_t length)
{
if (ec)
// If error is end of file, ignore
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
{
// got here with length of 1241...
cwarn << "Error reading: " << ec.message();
dropped();
}
else if(ec && length == 0)
{
return;
}
else
{
try
@ -533,12 +571,8 @@ void PeerSession::doRead()
{
if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91)
{
clogS(NetWarn) << "Out of alignment.";
disconnect(BadProtocol);
return;
clogS(NetNote) << "Skipping: " << hex << showbase << (int)m_incoming[0] << dec;
memmove(m_incoming.data(), m_incoming.data() + 1, m_incoming.size() - 1);
m_incoming.resize(m_incoming.size() - 1);
doRead();
}
else
{
@ -548,7 +582,6 @@ void PeerSession::doRead()
break;
// enough has come in.
// cerr << "Received " << len << ": " << toHex(bytesConstRef(m_incoming.data() + 8, len)) << endl;
auto data = bytesConstRef(m_incoming.data(), tlen);
if (!checkPacket(data))
{

4
libethereum/PeerSession.h

@ -62,7 +62,11 @@ private:
void sealAndSend(RLPStream& _s);
void sendDestroy(bytes& _msg);
void send(bytesConstRef _msg);
void writeImpl(bytes& _buffer);
void write();
PeerServer* m_server;
boost::asio::strand m_strand;
std::deque<bytes> m_writeq;
bi::tcp::socket m_socket;
std::array<byte, 65536> m_data;

2
test/peer.cpp

@ -57,7 +57,7 @@ int peerTest(int argc, char** argv)
for (int i = 0; ; ++i)
{
this_thread::sleep_for(chrono::milliseconds(100));
pn.sync();
// pn.sync();
if (!(i % 10))
pn.pingAll();
}

Loading…
Cancel
Save