Browse Source

Merge pull request #2385 from ethereum/revert-2384-txqueueasyncverify

Revert "Txqueueasyncverify"
cl-refactor
Gav Wood 9 years ago
parent
commit
76e970b7ad
  1. 29
      libdevcore/Guards.h
  2. 7
      libethcore/Common.h
  3. 23
      libethcore/Ethash.cpp
  4. 5
      libethcore/Transaction.cpp
  5. 4
      libethcore/Transaction.h
  6. 10
      libethereum/BlockQueue.cpp
  7. 4
      libethereum/Client.cpp
  8. 14
      libethereum/State.cpp
  9. 4
      libethereum/Transaction.cpp
  10. 4
      libethereum/Transaction.h
  11. 114
      libethereum/TransactionQueue.cpp
  12. 32
      libethereum/TransactionQueue.h
  13. 15
      test/libethereum/transactionqueue.cpp

29
libdevcore/Guards.h

@ -23,7 +23,6 @@
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <condition_variable>
#include <boost/thread.hpp> #include <boost/thread.hpp>
namespace dev namespace dev
@ -34,7 +33,6 @@ using RecursiveMutex = std::recursive_mutex;
using SharedMutex = boost::shared_mutex; using SharedMutex = boost::shared_mutex;
using Guard = std::lock_guard<std::mutex>; using Guard = std::lock_guard<std::mutex>;
using UniqueGuard = std::unique_lock<std::mutex>;
using RecursiveGuard = std::lock_guard<std::recursive_mutex>; using RecursiveGuard = std::lock_guard<std::recursive_mutex>;
using ReadGuard = boost::shared_lock<boost::shared_mutex>; using ReadGuard = boost::shared_lock<boost::shared_mutex>;
using UpgradableGuard = boost::upgrade_lock<boost::shared_mutex>; using UpgradableGuard = boost::upgrade_lock<boost::shared_mutex>;
@ -124,31 +122,4 @@ using SpinGuard = std::lock_guard<SpinLock>;
#define DEV_WRITE_UNGUARDED(MUTEX) \ #define DEV_WRITE_UNGUARDED(MUTEX) \
for (GenericUnguardBool<SharedMutex> __eth_l(MUTEX); __eth_l.b; __eth_l.b = false) for (GenericUnguardBool<SharedMutex> __eth_l(MUTEX); __eth_l.b; __eth_l.b = false)
template <class N>
class Notified
{
public:
Notified() {}
Notified(N const& _v): m_value(_v) {}
Notified(Notified const&) = delete;
Notified& operator=(N const& _v) { UniqueGuard l(m_mutex); m_value = _v; m_cv.notify_all(); return *this; }
operator N() const { UniqueGuard l(m_mutex); return m_value; }
void wait() const { UniqueGuard l(m_mutex); m_cv.wait(l); }
void wait(N const& _v) const { UniqueGuard l(m_mutex); m_cv.wait(l, [&](){return m_value == _v;}); }
void wait_not(N const& _v) const { UniqueGuard l(m_mutex); m_cv.wait(l, [&](){return m_value != _v;}); }
template <class F> void wait(F const& _f) const { UniqueGuard l(m_mutex); m_cv.wait(l, _f); }
template <class R, class P> void wait(std::chrono::duration<R, P> const& _duration) const { UniqueGuard l(m_mutex); m_cv.wait_for(l, _duration); }
template <class R, class P> void wait(std::chrono::duration<R, P> const& _duration, N const& _v) const { UniqueGuard l(m_mutex); m_cv.wait_for(l, _duration, [&](){return m_value == _v;}); }
template <class R, class P> void wait_not(std::chrono::duration<R, P> const& _duration, N const& _v) const { UniqueGuard l(m_mutex); m_cv.wait_for(l, _duration, [&](){return m_value != _v;}); }
template <class R, class P, class F> void wait(std::chrono::duration<R, P> const& _duration, F const& _f) const { UniqueGuard l(m_mutex); m_cv.wait_for(l, _duration, _f); }
private:
mutable Mutex m_mutex;
mutable std::condition_variable m_cv;
N m_value;
};
} }

7
libethcore/Common.h

@ -113,12 +113,7 @@ enum class ImportResult
AlreadyKnown, AlreadyKnown,
Malformed, Malformed,
OverbidGasPrice, OverbidGasPrice,
GasPriceTooLow, BadChain
BadChain,
NonceTooLow,
FarAway,
TooMuchGas,
Unknown
}; };
struct ImportRequirements struct ImportRequirements

23
libethcore/Ethash.cpp

@ -225,6 +225,29 @@ std::string Ethash::CPUMiner::platformInfo()
#if ETH_ETHASHCL || !ETH_TRUE #if ETH_ETHASHCL || !ETH_TRUE
using UniqueGuard = std::unique_lock<std::mutex>;
template <class N>
class Notified
{
public:
Notified() {}
Notified(N const& _v): m_value(_v) {}
Notified(Notified const&) = delete;
Notified& operator=(N const& _v) { UniqueGuard l(m_mutex); m_value = _v; m_cv.notify_all(); return *this; }
operator N() const { UniqueGuard l(m_mutex); return m_value; }
void wait() const { UniqueGuard l(m_mutex); m_cv.wait(l); }
void wait(N const& _v) const { UniqueGuard l(m_mutex); m_cv.wait(l, [&](){return m_value == _v;}); }
template <class F> void wait(F const& _f) const { UniqueGuard l(m_mutex); m_cv.wait(l, _f); }
private:
mutable Mutex m_mutex;
mutable std::condition_variable m_cv;
N m_value;
};
class EthashCLHook: public ethash_cl_miner::search_hook class EthashCLHook: public ethash_cl_miner::search_hook
{ {
public: public:

5
libethcore/Transaction.cpp

@ -29,8 +29,7 @@ using namespace std;
using namespace dev; using namespace dev;
using namespace dev::eth; using namespace dev::eth;
TransactionBase::TransactionBase(bytesConstRef _rlpData, CheckTransaction _checkSig, h256 const& _precomputed): TransactionBase::TransactionBase(bytesConstRef _rlpData, CheckTransaction _checkSig)
m_hashWith(_precomputed)
{ {
int field = 0; int field = 0;
RLP rlp(_rlpData); RLP rlp(_rlpData);
@ -55,7 +54,7 @@ TransactionBase::TransactionBase(bytesConstRef _rlpData, CheckTransaction _check
h256 s = rlp[field = 8].toInt<u256>(); h256 s = rlp[field = 8].toInt<u256>();
if (rlp.itemCount() > 9) if (rlp.itemCount() > 9)
BOOST_THROW_EXCEPTION(InvalidTransactionFormat() << errinfo_comment("too many fields in the transaction RLP")); BOOST_THROW_EXCEPTION(InvalidTransactionFormat() << errinfo_comment("to many fields in the transaction RLP"));
m_vrs = SignatureStruct{ r, s, v }; m_vrs = SignatureStruct{ r, s, v };
if (_checkSig >= CheckTransaction::Cheap && !m_vrs.isValid()) if (_checkSig >= CheckTransaction::Cheap && !m_vrs.isValid())

4
libethcore/Transaction.h

@ -64,10 +64,10 @@ public:
TransactionBase(u256 const& _value, u256 const& _gasPrice, u256 const& _gas, bytes const& _data, u256 const& _nonce = 0): m_type(ContractCreation), m_nonce(_nonce), m_value(_value), m_gasPrice(_gasPrice), m_gas(_gas), m_data(_data) {} TransactionBase(u256 const& _value, u256 const& _gasPrice, u256 const& _gas, bytes const& _data, u256 const& _nonce = 0): m_type(ContractCreation), m_nonce(_nonce), m_value(_value), m_gasPrice(_gasPrice), m_gas(_gas), m_data(_data) {}
/// Constructs a transaction from the given RLP. /// Constructs a transaction from the given RLP.
explicit TransactionBase(bytesConstRef _rlp, CheckTransaction _checkSig, h256 const& _precomputed = h256()); explicit TransactionBase(bytesConstRef _rlp, CheckTransaction _checkSig);
/// Constructs a transaction from the given RLP. /// Constructs a transaction from the given RLP.
explicit TransactionBase(bytes const& _rlp, CheckTransaction _checkSig, h256 const& _precomputed = h256()): TransactionBase(&_rlp, _checkSig, _precomputed) {} explicit TransactionBase(bytes const& _rlp, CheckTransaction _checkSig): TransactionBase(&_rlp, _checkSig) {}
/// Checks equality of transactions. /// Checks equality of transactions.

10
libethereum/BlockQueue.cpp

@ -53,7 +53,7 @@ BlockQueue::BlockQueue():
unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U; unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
for (unsigned i = 0; i < verifierThreads; ++i) for (unsigned i = 0; i < verifierThreads; ++i)
m_verifiers.emplace_back([=](){ m_verifiers.emplace_back([=](){
setThreadName("blockcheck" + toString(i)); setThreadName("verifier" + toString(i));
this->verifierBody(); this->verifierBody();
}); });
} }
@ -92,8 +92,6 @@ void BlockQueue::verifierBody()
{ {
UnverifiedBlock work; UnverifiedBlock work;
DEV_READ_GUARDED(m_lock)
DEV_INVARIANT_CHECK;
{ {
unique_lock<Mutex> l(m_verification); unique_lock<Mutex> l(m_verification);
m_moreToVerify.wait(l, [&](){ return !m_unverified.empty() || m_deleting; }); m_moreToVerify.wait(l, [&](){ return !m_unverified.empty() || m_deleting; });
@ -106,8 +104,6 @@ void BlockQueue::verifierBody()
bi.parentHash = work.parentHash; bi.parentHash = work.parentHash;
m_verifying.emplace_back(move(bi)); m_verifying.emplace_back(move(bi));
} }
DEV_READ_GUARDED(m_lock)
DEV_INVARIANT_CHECK;
VerifiedBlock res; VerifiedBlock res;
swap(work.block, res.blockData); swap(work.block, res.blockData);
@ -120,7 +116,6 @@ void BlockQueue::verifierBody()
// bad block. // bad block.
// has to be this order as that's how invariants() assumes. // has to be this order as that's how invariants() assumes.
WriteGuard l2(m_lock); WriteGuard l2(m_lock);
DEV_INVARIANT_CHECK;
unique_lock<Mutex> l(m_verification); unique_lock<Mutex> l(m_verification);
m_readySet.erase(work.hash); m_readySet.erase(work.hash);
m_knownBad.insert(work.hash); m_knownBad.insert(work.hash);
@ -139,7 +134,6 @@ void BlockQueue::verifierBody()
bool ready = false; bool ready = false;
{ {
WriteGuard l2(m_lock); WriteGuard l2(m_lock);
DEV_INVARIANT_CHECK;
unique_lock<Mutex> l(m_verification); unique_lock<Mutex> l(m_verification);
if (!m_verifying.empty() && m_verifying.front().verified.info.mixHash == work.hash) if (!m_verifying.empty() && m_verifying.front().verified.info.mixHash == work.hash)
{ {
@ -466,8 +460,6 @@ void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
auto h = bs.verified.info.hash(); auto h = bs.verified.info.hash();
m_drainingSet.insert(h); m_drainingSet.insert(h);
m_drainingDifficulty += bs.verified.info.difficulty; m_drainingDifficulty += bs.verified.info.difficulty;
if (!m_readySet.count(h))
cwarn << "ODD: Invariant will fail: ready set doesn't contain drained verified block" << h;
m_readySet.erase(h); m_readySet.erase(h);
m_knownSize -= bs.verified.block.size(); m_knownSize -= bs.verified.block.size();
m_knownCount--; m_knownCount--;

4
libethereum/Client.cpp

@ -604,7 +604,7 @@ void Client::onChainChanged(ImportRoute const& _ir)
for (auto const& t: m_bc.transactions(h)) for (auto const& t: m_bc.transactions(h))
{ {
clog(ClientTrace) << "Resubmitting dead-block transaction " << Transaction(t, CheckTransaction::None); clog(ClientTrace) << "Resubmitting dead-block transaction " << Transaction(t, CheckTransaction::None);
m_tq.import(t, ImportCallback(), IfDropped::Retry); m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry);
} }
} }
@ -651,7 +651,7 @@ void Client::onChainChanged(ImportRoute const& _ir)
for (auto const& t: m_postMine.pending()) for (auto const& t: m_postMine.pending())
{ {
clog(ClientTrace) << "Resubmitting post-mine transaction " << t; clog(ClientTrace) << "Resubmitting post-mine transaction " << t;
auto ir = m_tq.import(t, ImportCallback(), IfDropped::Retry); auto ir = m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry);
if (ir != ImportResult::Success) if (ir != ImportResult::Success)
onTransactionQueueReady(); onTransactionQueueReady();
} }

14
libethereum/State.cpp

@ -524,7 +524,7 @@ pair<TransactionReceipts, bool> State::sync(BlockChain const& _bc, TransactionQu
{ {
// less than 90% of our ask price for gas. drop. // less than 90% of our ask price for gas. drop.
cnote << i.first << "Dropping El Cheapo transaction (<90% of ask price)"; cnote << i.first << "Dropping El Cheapo transaction (<90% of ask price)";
_tq.drop(i.first, ImportResult::GasPriceTooLow); _tq.drop(i.first);
} }
} }
catch (InvalidNonce const& in) catch (InvalidNonce const& in)
@ -541,12 +541,12 @@ pair<TransactionReceipts, bool> State::sync(BlockChain const& _bc, TransactionQu
if (t.nonce() < i.second.nonce()) if (t.nonce() < i.second.nonce())
{ {
cnote << i.first << "Dropping old transaction (nonce too low)"; cnote << i.first << "Dropping old transaction (nonce too low)";
_tq.drop(i.first, ImportResult::NonceTooLow); _tq.drop(i.first);
} }
else if (t.nonce() == i.second.nonce() && t.gasPrice() <= i.second.gasPrice()) else if (t.nonce() == i.second.nonce() && t.gasPrice() <= i.second.gasPrice())
{ {
cnote << i.first << "Dropping old transaction (gas price lower)"; cnote << i.first << "Dropping old transaction (gas price lower)";
_tq.drop(i.first, ImportResult::OverbidGasPrice); _tq.drop(i.first);
} }
} }
} }
@ -554,7 +554,7 @@ pair<TransactionReceipts, bool> State::sync(BlockChain const& _bc, TransactionQu
{ {
// too new // too new
cnote << i.first << "Dropping new transaction (too many nonces ahead)"; cnote << i.first << "Dropping new transaction (too many nonces ahead)";
_tq.drop(i.first, ImportResult::FarAway); _tq.drop(i.first);
} }
else else
_tq.setFuture(i); _tq.setFuture(i);
@ -565,7 +565,7 @@ pair<TransactionReceipts, bool> State::sync(BlockChain const& _bc, TransactionQu
if (got > m_currentBlock.gasLimit) if (got > m_currentBlock.gasLimit)
{ {
cnote << i.first << "Dropping over-gassy transaction (gas > block's gas limit)"; cnote << i.first << "Dropping over-gassy transaction (gas > block's gas limit)";
_tq.drop(i.first, ImportResult::TooMuchGas); _tq.drop(i.first);
} }
else else
{ {
@ -579,12 +579,12 @@ pair<TransactionReceipts, bool> State::sync(BlockChain const& _bc, TransactionQu
{ {
// Something else went wrong - drop it. // Something else went wrong - drop it.
cnote << i.first << "Dropping invalid transaction:" << diagnostic_information(_e); cnote << i.first << "Dropping invalid transaction:" << diagnostic_information(_e);
_tq.drop(i.first, ImportResult::Malformed); _tq.drop(i.first);
} }
catch (std::exception const&) catch (std::exception const&)
{ {
// Something else went wrong - drop it. // Something else went wrong - drop it.
_tq.drop(i.first, ImportResult::Malformed); _tq.drop(i.first);
cnote << i.first << "Transaction caused low-level exception :("; cnote << i.first << "Transaction caused low-level exception :(";
} }
} }

4
libethereum/Transaction.cpp

@ -94,8 +94,8 @@ std::ostream& dev::eth::operator<<(std::ostream& _out, TransactionException cons
return _out; return _out;
} }
Transaction::Transaction(bytesConstRef _rlpData, CheckTransaction _checkSig, h256 const& _precomputed): Transaction::Transaction(bytesConstRef _rlpData, CheckTransaction _checkSig):
TransactionBase(_rlpData, _checkSig, _precomputed) TransactionBase(_rlpData, _checkSig)
{ {
if (_checkSig >= CheckTransaction::Cheap && !checkPayment()) if (_checkSig >= CheckTransaction::Cheap && !checkPayment())
BOOST_THROW_EXCEPTION(OutOfGasIntrinsic() << RequirementError(gasRequired(), (bigint)gas())); BOOST_THROW_EXCEPTION(OutOfGasIntrinsic() << RequirementError(gasRequired(), (bigint)gas()));

4
libethereum/Transaction.h

@ -106,10 +106,10 @@ public:
{} {}
/// Constructs a transaction from the given RLP. /// Constructs a transaction from the given RLP.
explicit Transaction(bytesConstRef _rlp, CheckTransaction _checkSig, h256 const& _precomputed = h256()); explicit Transaction(bytesConstRef _rlp, CheckTransaction _checkSig);
/// Constructs a transaction from the given RLP. /// Constructs a transaction from the given RLP.
explicit Transaction(bytes const& _rlp, CheckTransaction _checkSig, h256 const& _precomputed = h256()): Transaction(&_rlp, _checkSig, _precomputed) {} explicit Transaction(bytes const& _rlp, CheckTransaction _checkSig): Transaction(&_rlp, _checkSig) {}
/// @returns true if the transaction contains enough gas for the basic payment. /// @returns true if the transaction contains enough gas for the basic payment.
bool checkPayment() const { return m_gas >= gasRequired(); } bool checkPayment() const { return m_gas >= gasRequired(); }

114
libethereum/TransactionQueue.cpp

@ -31,34 +31,12 @@ using namespace dev::eth;
const char* TransactionQueueChannel::name() { return EthCyan "┉┅▶"; } const char* TransactionQueueChannel::name() { return EthCyan "┉┅▶"; }
const char* TransactionQueueTraceChannel::name() { return EthCyan " ┅▶"; } const char* TransactionQueueTraceChannel::name() { return EthCyan " ┅▶"; }
TransactionQueue::TransactionQueue()
{
// Allow some room for other activity
unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
for (unsigned i = 0; i < verifierThreads; ++i)
m_verifiers.emplace_back([=](){
setThreadName("txcheck" + toString(i));
this->verifierBody();
});
}
TransactionQueue::~TransactionQueue()
{
m_deleting = true;
m_moreToVerify.notify_all();
for (auto& i: m_verifiers)
i.join();
}
bool TransactionQueue::invariants() const
{
return true;
}
ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallback const& _cb, IfDropped _ik) ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallback const& _cb, IfDropped _ik)
{ {
// Check if we already know this transaction. // Check if we already know this transaction.
auto h = sha3(_transactionRLP); h256 h = sha3(_transactionRLP);
Transaction t;
ImportResult ir; ImportResult ir;
{ {
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
@ -67,69 +45,24 @@ ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallb
if (ir != ImportResult::Success) if (ir != ImportResult::Success)
return ir; return ir;
UpgradeGuard ll(l);
m_submitted.insert(h);
DEV_GUARDED(m_verification)
{
m_unverified.push_back(UnverifiedTransaction{h, _transactionRLP.toBytes(), _cb});
m_moreToVerify.notify_one();
}
}
// cdebug << "import-END: Nonce of" << t.sender() << "now" << maxNonce(t.sender());
return ir;
}
void TransactionQueue::verifierBody()
{
while (!m_deleting)
{
UnverifiedTransaction work;
{
DEV_INVARIANT_CHECK;
unique_lock<Mutex> l(m_verification);
m_moreToVerify.wait(l, [&](){ return !m_unverified.empty() || m_deleting; });
if (m_deleting)
return;
swap(work, m_unverified.front());
m_unverified.pop_front();
}
Transaction res;
try try
{ {
res = Transaction(work.data, CheckTransaction::Everything, work.hash); t = Transaction(_transactionRLP, CheckTransaction::Everything);
} UpgradeGuard ul(l);
catch (Exception& ex) ir = manageImport_WITH_LOCK(h, t, _cb);
{
cdebug << "Bad transation inserted" << ex.what();
cdebug << boost::diagnostic_information(ex);
// bad transaction.
// has to be this order as that's how invariants() assumes.
WriteGuard l(m_lock);
DEV_INVARIANT_CHECK;
m_submitted.erase(work.hash);
m_dropped.insert(work.hash);
if (work.cb)
work.cb(ImportResult::Malformed);
continue;
} }
catch (...)
ImportResult ir;
{ {
WriteGuard l(m_lock); return ImportResult::Malformed;
DEV_INVARIANT_CHECK;
m_submitted.erase(work.hash);
ir = manageImport_WITH_LOCK(work.hash, res, work.cb);
} }
if (ir != ImportResult::Success && work.cb)
work.cb(ir);
} }
// cdebug << "import-END: Nonce of" << t.sender() << "now" << maxNonce(t.sender());
return ir;
} }
ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik) ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik)
{ {
if (m_known.count(_h) || m_submitted.count(_h)) if (m_known.count(_h))
return ImportResult::AlreadyKnown; return ImportResult::AlreadyKnown;
if (m_dropped.count(_h) && _ik == IfDropped::Ignore) if (m_dropped.count(_h) && _ik == IfDropped::Ignore)
@ -306,24 +239,7 @@ void TransactionQueue::setFuture(std::pair<h256, Transaction> const& _t)
void TransactionQueue::noteGood(std::pair<h256, Transaction> const& _t) void TransactionQueue::noteGood(std::pair<h256, Transaction> const& _t)
{ {
// cdebug << "txQ::noteGood" << _t.first; // cdebug << "txQ::noteGood" << _t.first;
UpgradableGuard l(m_lock); WriteGuard l(m_lock);
if (!m_known.count(_t.first))
return;
// Point out it has been successfully been placed in a block.
if (m_callbacks.count(_t.first) && m_callbacks[_t.first])
m_callbacks[_t.first](ImportResult::Success);
UpgradeGuard l2(l);
// Erase the transaction itself.
m_current.erase(_t.first);
m_callbacks.erase(_t.first);
// Bring the a now-value transaction with the next nonce from m_future into m_current.
// At present it reinserts all transactions from the good transaction's sender.
// TODO: Should really just insert the transaction with the following nonce.
auto r = m_senders.equal_range(_t.second.sender()); auto r = m_senders.equal_range(_t.second.sender());
for (auto it = r.first; it != r.second; ++it) for (auto it = r.first; it != r.second; ++it)
{ {
@ -336,20 +252,16 @@ void TransactionQueue::noteGood(std::pair<h256, Transaction> const& _t)
} }
} }
void TransactionQueue::drop(h256 const& _txHash, ImportResult _ir) void TransactionQueue::drop(h256 const& _txHash)
{ {
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
if (!m_known.count(_txHash)) if (!m_known.count(_txHash))
return; return;
if (m_callbacks.count(_txHash) && m_callbacks[_txHash])
m_callbacks[_txHash](_ir);
UpgradeGuard ul(l); UpgradeGuard ul(l);
m_dropped.insert(_txHash); m_dropped.insert(_txHash);
m_known.erase(_txHash); m_known.erase(_txHash);
m_callbacks.erase(_txHash);
remove_WITH_LOCK(_txHash); remove_WITH_LOCK(_txHash);
} }

32
libethereum/TransactionQueue.h

@ -21,11 +21,7 @@
#pragma once #pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional> #include <functional>
#include <deque>
#include <libdevcore/Common.h> #include <libdevcore/Common.h>
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include <libdevcore/Log.h> #include <libdevcore/Log.h>
@ -45,30 +41,20 @@ struct TransactionQueueTraceChannel: public LogChannel { static const char* name
enum class IfDropped { Ignore, Retry }; enum class IfDropped { Ignore, Retry };
using ImportCallback = std::function<void(ImportResult)>;
struct UnverifiedTransaction
{
h256 hash;
bytes data;
ImportCallback cb;
};
/** /**
* @brief A queue of Transactions, each stored as RLP. * @brief A queue of Transactions, each stored as RLP.
* @threadsafe * @threadsafe
*/ */
class TransactionQueue: HasInvariants class TransactionQueue
{ {
public: public:
TransactionQueue(); using ImportCallback = std::function<void(ImportResult)>;
virtual ~TransactionQueue();
ImportResult import(Transaction const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); ImportResult import(Transaction const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore);
ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); } ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); }
ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore);
void drop(h256 const& _txHash, ImportResult _ir = ImportResult::Unknown); void drop(h256 const& _txHash);
unsigned waiting(Address const& _a) const; unsigned waiting(Address const& _a) const;
std::unordered_map<h256, Transaction> transactions() const; std::unordered_map<h256, Transaction> transactions() const;
@ -89,10 +75,6 @@ private:
bool remove_WITH_LOCK(h256 const& _txHash); bool remove_WITH_LOCK(h256 const& _txHash);
u256 maxNonce_WITH_LOCK(Address const& _a) const; u256 maxNonce_WITH_LOCK(Address const& _a) const;
bool invariants() const override;
void verifierBody();
mutable SharedMutex m_lock; ///< General lock. mutable SharedMutex m_lock; ///< General lock.
h256Hash m_known; ///< Hashes of transactions in both sets. h256Hash m_known; ///< Hashes of transactions in both sets.
std::unordered_multimap<Address, h256> m_senders; ///< Mapping from the sender address to the transaction hash; useful for determining the nonce of a given sender. std::unordered_multimap<Address, h256> m_senders; ///< Mapping from the sender address to the transaction hash; useful for determining the nonce of a given sender.
@ -100,15 +82,7 @@ private:
std::unordered_map<h256, Transaction> m_future; ///< For transactions that have a future nonce; we re-insert into current once the sender has a valid TX. std::unordered_map<h256, Transaction> m_future; ///< For transactions that have a future nonce; we re-insert into current once the sender has a valid TX.
std::unordered_map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once. std::unordered_map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once.
h256Hash m_dropped; ///< Transactions that have previously been dropped. h256Hash m_dropped; ///< Transactions that have previously been dropped.
h256Hash m_submitted; ///< Hashes of transactions that have been submitted but not yet processed (unverified or verifying).
Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast.
mutable Mutex m_verification; ///< Mutex that allows writing to m_unverified & m_moreToVerify.
std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry.
std::deque<UnverifiedTransaction> m_unverified; ///< List of <tx hash, tx data> ready for verification.
std::vector<std::thread> m_verifiers; ///< Threads who only verify.
bool m_deleting = false; ///< Exit condition for verifiers.
}; };
} }

15
test/libethereum/transactionqueue.cpp

@ -40,15 +40,13 @@ BOOST_AUTO_TEST_CASE(maxNonce)
Address dest = Address("0x095e7baea6a6c7c4c2dfeb977efac326af552d87"); Address dest = Address("0x095e7baea6a6c7c4c2dfeb977efac326af552d87");
Address to = Address("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b"); Address to = Address("0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b");
Secret sec = Secret("0x45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8"); Secret sec = Secret("0x45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8");
Transaction tx0(0, gasCost, gas, dest, bytes(), 0, sec); Transaction tx0(0, gasCost, gas, dest, bytes(), 0, sec );
Transaction tx0_1(1, gasCost, gas, dest, bytes(), 0, sec); Transaction tx0_1(1, gasCost, gas, dest, bytes(), 0, sec );
Transaction tx1(0, gasCost, gas, dest, bytes(), 1, sec); Transaction tx1(0, gasCost, gas, dest, bytes(), 1, sec );
Transaction tx2(0, gasCost, gas, dest, bytes(), 2, sec); Transaction tx2(0, gasCost, gas, dest, bytes(), 2, sec );
Transaction tx9(0, gasCost, gas, dest, bytes(), 9, sec); Transaction tx9(0, gasCost, gas, dest, bytes(), 9, sec );
txq.import(tx0.rlp()); txq.import(tx0);
while (txq.transactions().empty())
this_thread::sleep_for(chrono::milliseconds(20));
BOOST_CHECK(1 == txq.maxNonce(to)); BOOST_CHECK(1 == txq.maxNonce(to));
txq.import(tx0); txq.import(tx0);
BOOST_CHECK(1 == txq.maxNonce(to)); BOOST_CHECK(1 == txq.maxNonce(to));
@ -60,6 +58,7 @@ BOOST_AUTO_TEST_CASE(maxNonce)
BOOST_CHECK(10 == txq.maxNonce(to)); BOOST_CHECK(10 == txq.maxNonce(to));
txq.import(tx2); txq.import(tx2);
BOOST_CHECK(10 == txq.maxNonce(to)); BOOST_CHECK(10 == txq.maxNonce(to));
} }
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save