From 91c1819d4331063adf6e7c8ec78c3d0afc60c0e2 Mon Sep 17 00:00:00 2001 From: subtly Date: Wed, 17 Sep 2014 17:15:34 +0200 Subject: [PATCH] mutex shared host/peer sets, write-loop crash fix --- libethereum/EthereumHost.cpp | 53 ++++++++++++++++++++---------------- libethereum/EthereumHost.h | 3 ++ libethereum/EthereumPeer.cpp | 4 ++- libethereum/EthereumPeer.h | 3 ++ libp2p/Session.cpp | 22 +++++++++------ libp2p/Session.h | 2 +- 6 files changed, 53 insertions(+), 34 deletions(-) diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index 7d3bd9a92..1e623b605 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -131,37 +131,44 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash { bool resendAll = (_currentHash != m_latestBlockSent); - for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it) - if (_tq.import(&*it)) - {}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce... - else - m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on. - m_incomingTransactions.clear(); + { + lock_guard l(m_incomingLock); + for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it) + if (_tq.import(&*it)) + {}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce... + else + m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on. + m_incomingTransactions.clear(); + } // Send any new transactions. for (auto const& p: peers()) { auto ep = p->cap(); - bytes b; - unsigned n = 0; - for (auto const& i: _tq.transactions()) - if ((!m_transactionsSent.count(i.first) && !ep->m_knownTransactions.count(i.first)) || ep->m_requireTransactions || resendAll) + if (ep) + { + bytes b; + unsigned n = 0; + for (auto const& i: _tq.transactions()) + if ((!m_transactionsSent.count(i.first) && !ep->m_knownTransactions.count(i.first)) || ep->m_requireTransactions || resendAll) + { + b += i.second; + ++n; + m_transactionsSent.insert(i.first); + } + ep->clearKnownTransactions(); + + if (n) { - b += i.second; - ++n; - m_transactionsSent.insert(i.first); + RLPStream ts; + EthereumPeer::prep(ts); + ts.appendList(n + 1) << TransactionsPacket; + ts.appendRaw(b, n).swapOut(b); + seal(b); + ep->send(&b); } - if (n) - { - RLPStream ts; - EthereumPeer::prep(ts); - ts.appendList(n + 1) << TransactionsPacket; - ts.appendRaw(b, n).swapOut(b); - seal(b); - ep->send(&b); + ep->m_requireTransactions = false; } - ep->m_knownTransactions.clear(); - ep->m_requireTransactions = false; } } diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index bd0a5d0bc..6cd9332a2 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -75,6 +75,9 @@ private: /// Called when the peer can no longer provide us with any needed blocks. void noteDoneBlocks(); + /// Called by peer to add incoming transactions. + void addIncomingTransaction(bytes const& _bytes) { std::lock_guard l(m_incomingLock); m_incomingTransactions.push_back(_bytes); } + void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock); void maintainBlocks(BlockQueue& _bq, h256 _currentBlock); diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 9e72f8429..c961f6733 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -142,7 +142,9 @@ bool EthereumPeer::interpret(RLP const& _r) addRating(_r.itemCount() - 1); for (unsigned i = 1; i < _r.itemCount(); ++i) { - host()->m_incomingTransactions.push_back(_r[i].data().toBytes()); + host()->addIncomingTransaction(_r[i].data().toBytes()); + + lock_guard l(x_knownTransactions); m_knownTransactions.insert(sha3(_r[i].data())); } break; diff --git a/libethereum/EthereumPeer.h b/libethereum/EthereumPeer.h index 2e166b1d0..92eb475ec 100644 --- a/libethereum/EthereumPeer.h +++ b/libethereum/EthereumPeer.h @@ -67,6 +67,8 @@ private: void giveUpOnFetch(); + void clearKnownTransactions() { std::lock_guard l(x_knownTransactions); m_knownTransactions.clear(); } + unsigned m_protocolVersion; u256 m_networkId; @@ -82,6 +84,7 @@ private: std::set m_knownBlocks; std::set m_knownTransactions; + std::mutex x_knownTransactions; }; } diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 5ed987cff..777717371 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -266,19 +266,19 @@ void Session::writeImpl(bytes& _buffer) if (!m_socket.is_open()) return; - lock_guard l(m_writeLock); - m_writeQueue.push_back(_buffer); - if (m_writeQueue.size() == 1) + bool doWrite = false; + { + lock_guard l(m_writeLock); + m_writeQueue.push_back(_buffer); + doWrite = (m_writeQueue.size() == 1); + } + + if (doWrite) write(); } void Session::write() { -// cerr << (void*)this << " write" << endl; - lock_guard l(m_writeLock); - if (m_writeQueue.empty()) - return; - const bytes& bytes = m_writeQueue[0]; auto self(shared_from_this()); ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/) @@ -290,12 +290,16 @@ void Session::write() { cwarn << "Error sending: " << ec.message(); dropped(); + return; } else { + lock_guard l(m_writeLock); m_writeQueue.pop_front(); - write(); + if (m_writeQueue.empty()) + return; } + write(); }); } diff --git a/libp2p/Session.h b/libp2p/Session.h index bc43934b3..7c3fc3732 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -86,7 +86,7 @@ private: Host* m_server; - std::recursive_mutex m_writeLock; + std::mutex m_writeLock; std::deque m_writeQueue; mutable bi::tcp::socket m_socket; ///< Mutable to ask for native_handle().