From 5b1d8b858f1528c4fcd0554682206429fd4c2ff0 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Wed, 23 Jul 2014 23:30:05 +0200 Subject: [PATCH 1/3] Possible network fix. --- libethereum/PeerServer.cpp | 13 +++++++ libethereum/PeerSession.cpp | 68 +++++++++++++++++-------------------- libethereum/PeerSession.h | 4 ++- 3 files changed, 47 insertions(+), 38 deletions(-) diff --git a/libethereum/PeerServer.cpp b/libethereum/PeerServer.cpp index 275d9ace7..99e4ad0b0 100644 --- a/libethereum/PeerServer.cpp +++ b/libethereum/PeerServer.cpp @@ -117,6 +117,19 @@ unsigned PeerServer::protocolVersion() return c_protocolVersion; } +void PeerServer::seal(bytes& _b) +{ + _b[0] = 0x22; + _b[1] = 0x40; + _b[2] = 0x08; + _b[3] = 0x91; + uint32_t len = (uint32_t)_b.size() - 8; + _b[4] = (len >> 24) & 0xff; + _b[5] = (len >> 16) & 0xff; + _b[6] = (len >> 8) & 0xff; + _b[7] = len & 0xff; +} + void PeerServer::determinePublic(string const& _publicAddress, bool _upnp) { if (_upnp) diff --git a/libethereum/PeerSession.cpp b/libethereum/PeerSession.cpp index a80cc6b27..f02ca2de3 100644 --- a/libethereum/PeerSession.cpp +++ b/libethereum/PeerSession.cpp @@ -52,8 +52,8 @@ PeerSession::~PeerSession() { m_strand.post([=]() { - if (!m_writeq.empty()) - m_writeq.clear(); + if (!m_writeQueue.empty()) + m_writeQueue.clear(); try { if (m_socket.is_open()) @@ -395,19 +395,6 @@ RLPStream& PeerSession::prep(RLPStream& _s) return _s.appendRaw(bytes(8, 0)); } -void PeerServer::seal(bytes& _b) -{ - _b[0] = 0x22; - _b[1] = 0x40; - _b[2] = 0x08; - _b[3] = 0x91; - uint32_t len = (uint32_t)_b.size() - 8; - _b[4] = (len >> 24) & 0xff; - _b[5] = (len >> 16) & 0xff; - _b[6] = (len >> 8) & 0xff; - _b[7] = len & 0xff; -} - void PeerSession::sealAndSend(RLPStream& _s) { bytes b; @@ -459,31 +446,32 @@ void PeerSession::send(bytesConstRef _msg) void PeerSession::writeImpl(bytes& _buffer) { - m_writeq.push_back(_buffer); - if (m_writeq.size() > 1) + m_writeQueue.push_back(_buffer); + if (m_writeQueue.size() > 1) return; - this->write(); + write(); } void PeerSession::write() { - if (m_writeq.empty()) + if (m_writeQueue.empty()) return; - const bytes& bytes = m_writeq[0]; + const bytes& bytes = m_writeQueue[0]; if (m_socket.is_open()) ba::async_write(m_socket, ba::buffer(bytes), m_strand.wrap([this](boost::system::error_code ec, std::size_t /*length*/) { - // must check que, as write callback can occur following dropped() - if (!m_writeq.empty()) - this->m_writeq.pop_front(); + // must check queue, as write callback can occur following dropped() + if (!m_writeQueue.empty()) + m_writeQueue.pop_front(); if (ec) { cwarn << "Error sending: " << ec.message(); - this->dropped(); - } else + dropped(); + } + else m_strand.post(boost::bind(&PeerSession::write, this)); })); } @@ -491,21 +479,30 @@ void PeerSession::write() void PeerSession::dropped() { if (m_socket.is_open()) - try { + try + { clogS(NetNote) << "Closing " << m_socket.remote_endpoint(); m_socket.close(); - }catch (...){} - + } + catch (...) {} + // block future writes by running in strand and clearing queue m_strand.post([=]() { - m_writeq.clear(); - for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) - if (i->second.lock().get() == this) + m_writeQueue.clear(); + if (!m_willBeDeleted) // Don't want two deleters on the queue at once! + { + m_willBeDeleted = true; + m_strand.post([=]() { - m_server->m_peers.erase(i); - break; - } + for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) + if (i->second.lock().get() == this) + { + m_server->m_peers.erase(i); + break; + } + }); + } }); } @@ -568,10 +565,7 @@ void PeerSession::doRead() while (m_incoming.size() > 8) { if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91) - { doRead(); - - } else { uint32_t len = fromBigEndian(bytesConstRef(m_incoming.data() + 4, 4)); diff --git a/libethereum/PeerSession.h b/libethereum/PeerSession.h index b0d93d100..014050bae 100644 --- a/libethereum/PeerSession.h +++ b/libethereum/PeerSession.h @@ -66,7 +66,7 @@ private: void write(); PeerServer* m_server; boost::asio::strand m_strand; - std::deque m_writeq; + std::deque m_writeQueue; bi::tcp::socket m_socket; std::array m_data; @@ -89,6 +89,8 @@ private: std::set m_knownBlocks; std::set m_knownTransactions; + + bool m_willBeDeleted = false; ///< True if we already posted a deleter on the strand. }; } From 8d65de7e28c16c14672d3c1519103ec127bdb0d1 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Wed, 23 Jul 2014 23:58:44 +0200 Subject: [PATCH 2/3] More network fixes. --- alethzero/Main.ui | 57 +++++++++++++++++++++++-------------- alethzero/MainWin.cpp | 2 +- libethereum/PeerSession.cpp | 14 ++++----- walleth/MainWin.cpp | 4 +-- 4 files changed, 43 insertions(+), 34 deletions(-) diff --git a/alethzero/Main.ui b/alethzero/Main.ui index 53466c510..3f78ce6f1 100644 --- a/alethzero/Main.ui +++ b/alethzero/Main.ui @@ -269,7 +269,7 @@ - + 1024 @@ -282,53 +282,66 @@ - - - - 1 + + + + - - 5 + + + + + Automatic - - + + - Ideal &Peers + &Listen on - idealPeers + port - - + + - &Client Name + Ideal &Peers - clientName + idealPeers - - - - &Listen on + + + + 1 - - port + + 5 - + Anonymous + + + + &Client Name + + + clientName + + + diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index a892bc125..82df1cc4a 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -1266,7 +1266,7 @@ void Main::on_net_triggered() m_client->setClientVersion(n); if (ui->net->isChecked()) { - m_client->startNetwork(ui->port->value(), string(), 0, NodeMode::Full, ui->idealPeers->value(), std::string(), ui->upnp->isChecked()); + m_client->startNetwork(ui->port->value(), string(), 0, NodeMode::Full, ui->idealPeers->value(), ui->forceAddress->text().toStdString(), ui->upnp->isChecked()); if (m_peers.size() && ui->usePast->isChecked()) m_client->peerServer()->restorePeers(bytesConstRef((byte*)m_peers.data(), m_peers.size())); } diff --git a/libethereum/PeerSession.cpp b/libethereum/PeerSession.cpp index f02ca2de3..9a5d269a6 100644 --- a/libethereum/PeerSession.cpp +++ b/libethereum/PeerSession.cpp @@ -50,16 +50,12 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi PeerSession::~PeerSession() { - m_strand.post([=]() + try { - if (!m_writeQueue.empty()) - m_writeQueue.clear(); - - try { - if (m_socket.is_open()) - m_socket.close(); - }catch (...){} - }); + if (m_socket.is_open()) + m_socket.close(); + } + catch (...){} } bi::tcp::endpoint PeerSession::endpoint() const diff --git a/walleth/MainWin.cpp b/walleth/MainWin.cpp index 2c36d94fc..5343ab240 100644 --- a/walleth/MainWin.cpp +++ b/walleth/MainWin.cpp @@ -243,10 +243,10 @@ void Main::on_net_triggered(bool _auto) if (_auto) { QString s = m_servers[rand() % m_servers.size()]; - client()->startNetwork(m_port, s.section(':', 0, 0).toStdString(), s.section(':', 1).toInt(), NodeMode::Full, m_idealPeers, std::string(), ui->upnp->isChecked()); + client()->startNetwork(m_port, s.section(':', 0, 0).toStdString(), s.section(':', 1).toInt(), NodeMode::Full, m_idealPeers, "", ui->upnp->isChecked()); } else - client()->startNetwork(m_port, string(), 0, NodeMode::Full, m_idealPeers, std::string(), ui->upnp->isChecked()); + client()->startNetwork(m_port, string(), 0, NodeMode::Full, m_idealPeers, "", ui->upnp->isChecked()); if (m_peers.size()) client()->peerServer()->restorePeers(bytesConstRef((byte*)m_peers.data(), m_peers.size())); } From 7ffa5135867ee62d6a3b1645762b6b016ee2ae0c Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Thu, 24 Jul 2014 00:47:52 +0200 Subject: [PATCH 3/3] Networking fix. --- alethzero/MainWin.cpp | 2 ++ libethereum/PeerSession.cpp | 71 ++++++++++++++++++------------------- libethereum/PeerSession.h | 4 ++- 3 files changed, 39 insertions(+), 38 deletions(-) diff --git a/alethzero/MainWin.cpp b/alethzero/MainWin.cpp index 82df1cc4a..dec7f252a 100644 --- a/alethzero/MainWin.cpp +++ b/alethzero/MainWin.cpp @@ -372,6 +372,7 @@ void Main::writeSettings() s.setValue("address", b); s.setValue("upnp", ui->upnp->isChecked()); + s.setValue("forceAddress", ui->forceAddress->text()); s.setValue("usePast", ui->usePast->isChecked()); s.setValue("paranoia", ui->paranoia->isChecked()); s.setValue("showAll", ui->showAll->isChecked()); @@ -418,6 +419,7 @@ void Main::readSettings() m_client->setAddress(m_myKeys.back().address()); m_peers = s.value("peers").toByteArray(); ui->upnp->setChecked(s.value("upnp", true).toBool()); + ui->forceAddress->setText(s.value("forceAddress", "").toString()); ui->usePast->setChecked(s.value("usePast", true).toBool()); ui->paranoia->setChecked(s.value("paranoia", false).toBool()); ui->showAll->setChecked(s.value("showAll", false).toBool()); diff --git a/libethereum/PeerSession.cpp b/libethereum/PeerSession.cpp index 9a5d269a6..f06b2fcbf 100644 --- a/libethereum/PeerSession.cpp +++ b/libethereum/PeerSession.cpp @@ -37,7 +37,6 @@ static const eth::uint c_maxBlocksAsk = 256; ///< Maximum number of blocks we as PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi::address _peerAddress, unsigned short _peerPort): m_server(_s), - m_strand(_socket.get_io_service()), m_socket(std::move(_socket)), m_reqNetworkId(_rNId), m_listenPort(_peerPort), @@ -50,6 +49,7 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId, bi PeerSession::~PeerSession() { + // Read-chain finished for one reason or another. try { if (m_socket.is_open()) @@ -424,7 +424,7 @@ void PeerSession::sendDestroy(bytes& _msg) } bytes buffer = bytes(std::move(_msg)); - m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer)); + writeImpl(buffer); } void PeerSession::send(bytesConstRef _msg) @@ -437,43 +437,51 @@ void PeerSession::send(bytesConstRef _msg) } bytes buffer = bytes(_msg.toBytes()); - m_strand.post(boost::bind(&PeerSession::writeImpl, this, buffer)); + writeImpl(buffer); } void PeerSession::writeImpl(bytes& _buffer) { - m_writeQueue.push_back(_buffer); - if (m_writeQueue.size() > 1) +// cerr << (void*)this << " writeImpl" << endl; + if (!m_socket.is_open()) return; - write(); + lock_guard l(m_writeLock); + m_writeQueue.push_back(_buffer); + if (m_writeQueue.size() == 1) + write(); } void PeerSession::write() { +// cerr << (void*)this << " write" << endl; + lock_guard l(m_writeLock); if (m_writeQueue.empty()) return; const bytes& bytes = m_writeQueue[0]; - if (m_socket.is_open()) - ba::async_write(m_socket, ba::buffer(bytes), m_strand.wrap([this](boost::system::error_code ec, std::size_t /*length*/) + auto self(shared_from_this()); + ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/) + { +// cerr << (void*)this << " write.callback" << endl; + + // must check queue, as write callback can occur following dropped() + if (ec) { - // must check queue, as write callback can occur following dropped() - if (!m_writeQueue.empty()) - m_writeQueue.pop_front(); - - if (ec) - { - cwarn << "Error sending: " << ec.message(); - dropped(); - } - else - m_strand.post(boost::bind(&PeerSession::write, this)); - })); + cwarn << "Error sending: " << ec.message(); + dropped(); + } + else + { + m_writeQueue.pop_front(); + write(); + } + }); } void PeerSession::dropped() { +// cerr << (void*)this << " dropped" << endl; if (m_socket.is_open()) try { @@ -482,24 +490,13 @@ void PeerSession::dropped() } catch (...) {} - // block future writes by running in strand and clearing queue - m_strand.post([=]() - { - m_writeQueue.clear(); - if (!m_willBeDeleted) // Don't want two deleters on the queue at once! + // Remove from peer server + for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) + if (i->second.lock().get() == this) { - m_willBeDeleted = true; - m_strand.post([=]() - { - for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i) - if (i->second.lock().get() == this) - { - m_server->m_peers.erase(i); - break; - } - }); + m_server->m_peers.erase(i); + break; } - }); } void PeerSession::disconnect(int _reason) @@ -548,7 +545,7 @@ void PeerSession::doRead() cwarn << "Error reading: " << ec.message(); dropped(); } - else if(ec && length == 0) + else if (ec && length == 0) { return; } diff --git a/libethereum/PeerSession.h b/libethereum/PeerSession.h index 014050bae..1a28c66ec 100644 --- a/libethereum/PeerSession.h +++ b/libethereum/PeerSession.h @@ -21,6 +21,7 @@ #pragma once +#include #include #include #include @@ -65,7 +66,8 @@ private: void writeImpl(bytes& _buffer); void write(); PeerServer* m_server; - boost::asio::strand m_strand; + + std::recursive_mutex m_writeLock; std::deque m_writeQueue; bi::tcp::socket m_socket;