diff --git a/libethereum/PeerServer.cpp b/libethereum/PeerServer.cpp index 8dd982cbb..f87eebf4f 100644 --- a/libethereum/PeerServer.cpp +++ b/libethereum/PeerServer.cpp @@ -323,25 +323,6 @@ void PeerServer::connect(bi::tcp::endpoint const& _ep) }); } -bool PeerServer::sync() -{ - bool ret = false; - if (isInitialised()) - for (auto i = m_peers.begin(); i != m_peers.end();) - { - auto p = i->second.lock(); - if (p && p->m_socket.is_open() && - (p->m_disconnect == chrono::steady_clock::time_point::max() || chrono::steady_clock::now() - p->m_disconnect < chrono::seconds(1))) // kill old peers that should be disconnected. - ++i; - else - { - i = m_peers.erase(i); - ret = true; - } - } - return ret; -} - bool PeerServer::ensureInitialised(BlockChain& _bc, TransactionQueue& _tq) { if (m_latestBlockSent == h256()) @@ -361,10 +342,7 @@ bool PeerServer::ensureInitialised(BlockChain& _bc, TransactionQueue& _tq) bool PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o) { bool ret = ensureInitialised(_bc, _tq); - - if (sync()) - ret = true; - + if (m_mode == NodeMode::Full) { for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it) diff --git a/libethereum/PeerServer.h b/libethereum/PeerServer.h index 6faa4cad8..ec67b8469 100644 --- a/libethereum/PeerServer.h +++ b/libethereum/PeerServer.h @@ -58,7 +58,6 @@ public: /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. bool sync(BlockChain& _bc, TransactionQueue&, OverlayDB& _o); - bool sync(); /// Conduct I/O, polling, syncing, whatever. /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway. diff --git a/libethereum/PeerSession.cpp b/libethereum/PeerSession.cpp index 9ceb949c9..1c3253316 100644 --- a/libethereum/PeerSession.cpp +++ b/libethereum/PeerSession.cpp @@ -32,11 +32,12 @@ using namespace eth; #define clogS(X) eth::LogOutputStream(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " static const eth::uint c_maxHashes = 32; ///< Maximum number of hashes GetChain will ever send. -static const eth::uint c_maxBlocks = 32; ///< Maximum number of blocks Blocks will ever send. BUG: if this gets too big (e.g. 2048) stuff starts going wrong. +static const eth::uint c_maxBlocks = 64; ///< Maximum number of blocks Blocks will ever send. BUG: if this gets too big (e.g. 2048) stuff starts going wrong. static const eth::uint c_maxBlocksAsk = 256; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain). PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi::address _peerAddress, unsigned short _peerPort): m_server(_s), + m_strand(_socket.get_io_service()), m_socket(std::move(_socket)), m_reqNetworkId(_rNId), m_listenPort(_peerPort), @@ -49,7 +50,16 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi PeerSession::~PeerSession() { - m_socket.close(); + m_strand.post([=]() + { + if (!m_writeq.empty()) + m_writeq.clear(); + + try { + if (m_socket.is_open()) + m_socket.close(); + }catch (...){} + }); } bi::tcp::endpoint PeerSession::endpoint() const @@ -62,8 +72,6 @@ bi::tcp::endpoint PeerSession::endpoint() const return bi::tcp::endpoint(); } -// TODO: BUG! 256 -> work out why things start to break with big packet sizes -> g.t. ~370 blocks. - bool PeerSession::interpret(RLP const& _r) { clogS(NetRight) << _r; @@ -281,21 +289,26 @@ bool PeerSession::interpret(RLP const& _r) uint parentNumber = 0; RLPStream s; - if (m_server->m_chain->details(parent)) + // try to find parent in our blockchain + // todo: add some delta() fn to blockchain + BlockDetails f_parent = m_server->m_chain->details(parent); + if (f_parent) { latestNumber = m_server->m_chain->number(latest); - parentNumber = m_server->m_chain->number(parent); + parentNumber = f_parent.number; uint count = min(latestNumber - parentNumber, baseCount); clogS(NetAllDetail) << "Requires " << dec << (latestNumber - parentNumber) << " blocks from " << latestNumber << " to " << parentNumber; clogS(NetAllDetail) << latest << " - " << parent; prep(s); s.appendList(1 + count) << BlocksPacket; - uint endNumber = m_server->m_chain->number(parent); + uint endNumber = parentNumber; uint startNumber = endNumber + count; clogS(NetAllDetail) << "Sending " << dec << count << " blocks from " << startNumber << " to " << endNumber; + // append blocks uint n = latestNumber; + // seek back (occurs when count is limited by baseCount) for (; n > startNumber; n--, h = m_server->m_chain->details(h).parent) {} for (uint i = 0; i < count; ++i, --n, h = m_server->m_chain->details(h).parent) { @@ -429,19 +442,8 @@ void PeerSession::sendDestroy(bytes& _msg) cwarn << "INVALID PACKET CONSTRUCTED!"; } - auto self(shared_from_this()); - bytes* buffer = new bytes(std::move(_msg)); - if (m_socket.is_open()) - ba::async_write(m_socket, ba::buffer(*buffer), [self, buffer](boost::system::error_code ec, std::size_t /*length*/) - { - delete buffer; - if (ec) - { - cwarn << "Error sending: " << ec.message(); - self->dropped(); - } - // cbug << length << " bytes written (EC: " << ec << ")"; - }); + bytes buffer = bytes(std::move(_msg)); + m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer)); } void PeerSession::send(bytesConstRef _msg) @@ -453,19 +455,39 @@ void PeerSession::send(bytesConstRef _msg) cwarn << "INVALID PACKET CONSTRUCTED!"; } - auto self(shared_from_this()); - bytes* buffer = new bytes(_msg.toBytes()); + bytes buffer = bytes(_msg.toBytes()); + m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer)); +} + +void PeerSession::writeImpl(bytes& _buffer) +{ + m_writeq.push_back(_buffer); + if (m_writeq.size() > 1) + return; + + this->write(); +} + +void PeerSession::write() +{ + if (m_writeq.empty()) + return; + + const bytes& bytes = m_writeq[0]; if (m_socket.is_open()) - ba::async_write(m_socket, ba::buffer(*buffer), [self, buffer](boost::system::error_code ec, std::size_t /*length*/) + ba::async_write(m_socket, ba::buffer(bytes), m_strand.wrap([this](boost::system::error_code ec, std::size_t /*length*/) { - delete buffer; + // must check que, as write callback can occur following dropped() + if (!m_writeq.empty()) + this->m_writeq.pop_front(); + if (ec) { cwarn << "Error sending: " << ec.message(); - self->dropped(); - } - // cbug << length << " bytes written (EC: " << ec << ")"; - }); + this->dropped(); + } else + m_strand.post(boost::bind(&PeerSession::write, this)); + })); } void PeerSession::dropped() @@ -475,12 +497,18 @@ void PeerSession::dropped() clogS(NetNote) << "Closing " << m_socket.remote_endpoint(); m_socket.close(); }catch (...){} - for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) - if (i->second.lock().get() == this) - { - m_server->m_peers.erase(i); - break; - } + + // block future writes by running in strand and clearing queue + m_strand.post([=]() + { + m_writeq.clear(); + for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) + if (i->second.lock().get() == this) + { + m_server->m_peers.erase(i); + break; + } + }); } void PeerSession::disconnect(int _reason) @@ -515,14 +543,24 @@ void PeerSession::start() void PeerSession::doRead() { + // ignore packets received while waiting to disconnect + if (chrono::steady_clock::now() - m_disconnect > chrono::seconds(0)) + return; + auto self(shared_from_this()); m_socket.async_read_some(boost::asio::buffer(m_data), [this,self](boost::system::error_code ec, std::size_t length) { - if (ec) + // If error is end of file, ignore + if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof) { + // got here with length of 1241... cwarn << "Error reading: " << ec.message(); dropped(); } + else if(ec && length == 0) + { + return; + } else { try @@ -533,12 +571,8 @@ void PeerSession::doRead() { if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91) { - clogS(NetWarn) << "Out of alignment."; - disconnect(BadProtocol); - return; - clogS(NetNote) << "Skipping: " << hex << showbase << (int)m_incoming[0] << dec; - memmove(m_incoming.data(), m_incoming.data() + 1, m_incoming.size() - 1); - m_incoming.resize(m_incoming.size() - 1); + doRead(); + } else { @@ -548,7 +582,6 @@ void PeerSession::doRead() break; // enough has come in. -// cerr << "Received " << len << ": " << toHex(bytesConstRef(m_incoming.data() + 8, len)) << endl; auto data = bytesConstRef(m_incoming.data(), tlen); if (!checkPacket(data)) { diff --git a/libethereum/PeerSession.h b/libethereum/PeerSession.h index e00a1e111..b0d93d100 100644 --- a/libethereum/PeerSession.h +++ b/libethereum/PeerSession.h @@ -62,7 +62,11 @@ private: void sealAndSend(RLPStream& _s); void sendDestroy(bytes& _msg); void send(bytesConstRef _msg); + void writeImpl(bytes& _buffer); + void write(); PeerServer* m_server; + boost::asio::strand m_strand; + std::deque m_writeq; bi::tcp::socket m_socket; std::array m_data; diff --git a/test/peer.cpp b/test/peer.cpp index 20f2d3225..7370df34b 100644 --- a/test/peer.cpp +++ b/test/peer.cpp @@ -57,7 +57,7 @@ int peerTest(int argc, char** argv) for (int i = 0; ; ++i) { this_thread::sleep_for(chrono::milliseconds(100)); - pn.sync(); +// pn.sync(); if (!(i % 10)) pn.pingAll(); }