Browse Source

Merge pull request #315 from subtly/bugfix

mutex shared host/peer sets, write-loop crash fix
cl-refactor
Gav Wood 11 years ago
parent
commit
57220e5b17
  1. 53
      libethereum/EthereumHost.cpp
  2. 3
      libethereum/EthereumHost.h
  3. 4
      libethereum/EthereumPeer.cpp
  4. 3
      libethereum/EthereumPeer.h
  5. 22
      libp2p/Session.cpp
  6. 2
      libp2p/Session.h

53
libethereum/EthereumHost.cpp

@ -131,37 +131,44 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
{
bool resendAll = (_currentHash != m_latestBlockSent);
for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it)
if (_tq.import(&*it))
{}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce...
else
m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on.
m_incomingTransactions.clear();
{
lock_guard<recursive_mutex> l(m_incomingLock);
for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it)
if (_tq.import(&*it))
{}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce...
else
m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on.
m_incomingTransactions.clear();
}
// Send any new transactions.
for (auto const& p: peers())
{
auto ep = p->cap<EthereumPeer>();
bytes b;
unsigned n = 0;
for (auto const& i: _tq.transactions())
if ((!m_transactionsSent.count(i.first) && !ep->m_knownTransactions.count(i.first)) || ep->m_requireTransactions || resendAll)
if (ep)
{
bytes b;
unsigned n = 0;
for (auto const& i: _tq.transactions())
if ((!m_transactionsSent.count(i.first) && !ep->m_knownTransactions.count(i.first)) || ep->m_requireTransactions || resendAll)
{
b += i.second;
++n;
m_transactionsSent.insert(i.first);
}
ep->clearKnownTransactions();
if (n)
{
b += i.second;
++n;
m_transactionsSent.insert(i.first);
RLPStream ts;
EthereumPeer::prep(ts);
ts.appendList(n + 1) << TransactionsPacket;
ts.appendRaw(b, n).swapOut(b);
seal(b);
ep->send(&b);
}
if (n)
{
RLPStream ts;
EthereumPeer::prep(ts);
ts.appendList(n + 1) << TransactionsPacket;
ts.appendRaw(b, n).swapOut(b);
seal(b);
ep->send(&b);
ep->m_requireTransactions = false;
}
ep->m_knownTransactions.clear();
ep->m_requireTransactions = false;
}
}

3
libethereum/EthereumHost.h

@ -75,6 +75,9 @@ private:
/// Called when the peer can no longer provide us with any needed blocks.
void noteDoneBlocks();
/// Called by peer to add incoming transactions.
void addIncomingTransaction(bytes const& _bytes) { std::lock_guard<std::recursive_mutex> l(m_incomingLock); m_incomingTransactions.push_back(_bytes); }
void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock);
void maintainBlocks(BlockQueue& _bq, h256 _currentBlock);

4
libethereum/EthereumPeer.cpp

@ -142,7 +142,9 @@ bool EthereumPeer::interpret(RLP const& _r)
addRating(_r.itemCount() - 1);
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
host()->m_incomingTransactions.push_back(_r[i].data().toBytes());
host()->addIncomingTransaction(_r[i].data().toBytes());
lock_guard<mutex> l(x_knownTransactions);
m_knownTransactions.insert(sha3(_r[i].data()));
}
break;

3
libethereum/EthereumPeer.h

@ -67,6 +67,8 @@ private:
void giveUpOnFetch();
void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); }
unsigned m_protocolVersion;
u256 m_networkId;
@ -82,6 +84,7 @@ private:
std::set<h256> m_knownBlocks;
std::set<h256> m_knownTransactions;
std::mutex x_knownTransactions;
};
}

22
libp2p/Session.cpp

@ -266,19 +266,19 @@ void Session::writeImpl(bytes& _buffer)
if (!m_socket.is_open())
return;
lock_guard<recursive_mutex> l(m_writeLock);
m_writeQueue.push_back(_buffer);
if (m_writeQueue.size() == 1)
bool doWrite = false;
{
lock_guard<mutex> l(m_writeLock);
m_writeQueue.push_back(_buffer);
doWrite = (m_writeQueue.size() == 1);
}
if (doWrite)
write();
}
void Session::write()
{
// cerr << (void*)this << " write" << endl;
lock_guard<recursive_mutex> l(m_writeLock);
if (m_writeQueue.empty())
return;
const bytes& bytes = m_writeQueue[0];
auto self(shared_from_this());
ba::async_write(m_socket, ba::buffer(bytes), [this, self](boost::system::error_code ec, std::size_t /*length*/)
@ -290,12 +290,16 @@ void Session::write()
{
cwarn << "Error sending: " << ec.message();
dropped();
return;
}
else
{
lock_guard<mutex> l(m_writeLock);
m_writeQueue.pop_front();
write();
if (m_writeQueue.empty())
return;
}
write();
});
}

2
libp2p/Session.h

@ -86,7 +86,7 @@ private:
Host* m_server;
std::recursive_mutex m_writeLock;
std::mutex m_writeLock;
std::deque<bytes> m_writeQueue;
mutable bi::tcp::socket m_socket; ///< Mutable to ask for native_handle().

Loading…
Cancel
Save