Browse Source

Merge pull request #2425 from arkpar/tq

TransactionQueue docs added
cl-refactor
Gav Wood 10 years ago
parent
commit
50622504b7
  1. 21
      libethereum/TransactionQueue.cpp
  2. 61
      libethereum/TransactionQueue.h

21
libethereum/TransactionQueue.cpp

@ -36,12 +36,12 @@ TransactionQueue::TransactionQueue(unsigned _limit, unsigned _futureLimit):
m_limit(_limit), m_limit(_limit),
m_futureLimit(_futureLimit) m_futureLimit(_futureLimit)
{ {
unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U; unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
for (unsigned i = 0; i < verifierThreads; ++i) for (unsigned i = 0; i < verifierThreads; ++i)
m_verifiers.emplace_back([=](){ m_verifiers.emplace_back([=](){
setThreadName("txcheck" + toString(i)); setThreadName("txcheck" + toString(i));
this->verifierBody(); this->verifierBody();
}); });
} }
TransactionQueue::~TransactionQueue() TransactionQueue::~TransactionQueue()
@ -68,6 +68,9 @@ ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _
try try
{ {
// Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender.
// If it doesn't work, the signature is bad.
// The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction).
t = Transaction(_transactionRLP, CheckTransaction::Everything); t = Transaction(_transactionRLP, CheckTransaction::Everything);
UpgradeGuard ul(l); UpgradeGuard ul(l);
ir = manageImport_WITH_LOCK(h, t); ir = manageImport_WITH_LOCK(h, t);
@ -99,7 +102,6 @@ ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped
ImportResult ret; ImportResult ret;
{ {
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
// TODO: keep old transactions around and check in State for nonce validity
auto ir = check_WITH_LOCK(h, _ik); auto ir = check_WITH_LOCK(h, _ik);
if (ir != ImportResult::Success) if (ir != ImportResult::Success)
return ir; return ir;
@ -132,11 +134,7 @@ ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transactio
{ {
try try
{ {
// Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender.
// If it doesn't work, the signature is bad.
// The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction).
assert(_h == _transaction.sha3()); assert(_h == _transaction.sha3());
// Remove any prior transaction with the same nonce but a lower gas price. // Remove any prior transaction with the same nonce but a lower gas price.
// Bomb out if there's a prior transaction with higher gas price. // Bomb out if there's a prior transaction with higher gas price.
auto cs = m_currentByAddressAndNonce.find(_transaction.from()); auto cs = m_currentByAddressAndNonce.find(_transaction.from());
@ -263,7 +261,6 @@ unsigned TransactionQueue::waiting(Address const& _a) const
void TransactionQueue::setFuture(h256 const& _txHash) void TransactionQueue::setFuture(h256 const& _txHash)
{ {
// cdebug << "txQ::setFuture" << _t.first;
WriteGuard l(m_lock); WriteGuard l(m_lock);
auto it = m_currentByHash.find(_txHash); auto it = m_currentByHash.find(_txHash);
if (it == m_currentByHash.end()) if (it == m_currentByHash.end())

61
libethereum/TransactionQueue.h

@ -42,39 +42,84 @@ struct TransactionQueueChannel: public LogChannel { static const char* name(); s
struct TransactionQueueTraceChannel: public LogChannel { static const char* name(); static const int verbosity = 7; }; struct TransactionQueueTraceChannel: public LogChannel { static const char* name(); static const int verbosity = 7; };
#define ctxq dev::LogOutputStream<dev::eth::TransactionQueueTraceChannel, true>() #define ctxq dev::LogOutputStream<dev::eth::TransactionQueueTraceChannel, true>()
enum class IfDropped { Ignore, Retry }; /// Import transaction policy
enum class IfDropped
{
Ignore, ///< Don't import transaction that was previously dropped.
Retry ///< Import transaction even if it was dropped before.
};
/** /**
* @brief A queue of Transactions, each stored as RLP. * @brief A queue of Transactions, each stored as RLP.
* Maintains a transaction queue sorted by nonce diff and gas price * Maintains a transaction queue sorted by nonce diff and gas price.
* @threadsafe * @threadsafe
*/ */
class TransactionQueue class TransactionQueue
{ {
public: public:
/// @brief TransactionQueue /// @brief TransactionQueue
/// @param _limit Maximum number of pending transactions in the queue /// @param _limit Maximum number of pending transactions in the queue.
/// @param _futureLimit Maximum number of future nonce transactions /// @param _futureLimit Maximum number of future nonce transactions.
TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024); TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024);
~TransactionQueue(); ~TransactionQueue();
/// Add transaction to the queue to be verified and imported.
/// @param _data RLP encoded transaction data.
/// @param _nodeId Optional network identified of a node transaction comes from.
void enqueue(RLP const& _data, h512 const& _nodeId); void enqueue(RLP const& _data, h512 const& _nodeId);
/// Verify and add transaction to the queue synchronously.
/// @param _tx RLP encoded transaction data.
/// @param _ik Set to Retry to force re-addinga transaction that was previously dropped.
/// @returns Import result code.
ImportResult import(bytes const& _tx, IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _ik); } ImportResult import(bytes const& _tx, IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _ik); }
/// Verify and add transaction to the queue synchronously.
/// @param _tx Trasnaction data.
/// @param _ik Set to Retry to force re-addinga transaction that was previously dropped.
/// @returns Import result code.
ImportResult import(Transaction const& _tx, IfDropped _ik = IfDropped::Ignore); ImportResult import(Transaction const& _tx, IfDropped _ik = IfDropped::Ignore);
/// Remove transaction from the queue
/// @param _txHash Trasnaction hash
void drop(h256 const& _txHash); void drop(h256 const& _txHash);
/// Get number of pending transactions for account.
/// @returns Pending transaction count.
unsigned waiting(Address const& _a) const; unsigned waiting(Address const& _a) const;
/// Get top transactions from the queue. Returned transactions are not removed from the queue automatically.
/// @param _limit Max number of transactions to return.
/// @returns up to _limit transactions ordered by nonce and gas price.
Transactions topTransactions(unsigned _limit) const; Transactions topTransactions(unsigned _limit) const;
/// Get a hash set of transactions in the queue
/// @returns A hash set of all transactions in the queue
h256Hash knownTransactions() const; h256Hash knownTransactions() const;
/// Get max nonce for an account
/// @returns Max transaction nonce for account in the queue
u256 maxNonce(Address const& _a) const; u256 maxNonce(Address const& _a) const;
/// Mark transaction as future. It wont be retured in topTransactions list until a transaction with a preceeding nonce is imported or marked with dropGood
/// @param _t Transaction hash
void setFuture(h256 const& _t); void setFuture(h256 const& _t);
/// Drop a trasnaction from the list if exists and move following future trasnactions to current (if any)
/// @param _t Transaction hash
void dropGood(Transaction const& _t); void dropGood(Transaction const& _t);
/// Clear the queue
void clear(); void clear();
/// Register a handler that will be called once there is a new transaction imported
template <class T> Handler<> onReady(T const& _t) { return m_onReady.add(_t); } template <class T> Handler<> onReady(T const& _t) { return m_onReady.add(_t); }
/// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported
template <class T> Handler<ImportResult, h256 const&, h512 const&> onImport(T const& _t) { return m_onImport.add(_t); } template <class T> Handler<ImportResult, h256 const&, h512 const&> onImport(T const& _t) { return m_onImport.add(_t); }
private: private:
/// Verified and imported transaction
struct VerifiedTransaction struct VerifiedTransaction
{ {
VerifiedTransaction(Transaction const& _t): transaction(_t) {} VerifiedTransaction(Transaction const& _t): transaction(_t) {}
@ -83,9 +128,10 @@ private:
VerifiedTransaction(VerifiedTransaction const&) = delete; VerifiedTransaction(VerifiedTransaction const&) = delete;
VerifiedTransaction& operator=(VerifiedTransaction const&) = delete; VerifiedTransaction& operator=(VerifiedTransaction const&) = delete;
Transaction transaction; Transaction transaction; ///< Transaction data
}; };
/// Trasnaction pending verification
struct UnverifiedTransaction struct UnverifiedTransaction
{ {
UnverifiedTransaction() {} UnverifiedTransaction() {}
@ -96,13 +142,14 @@ private:
UnverifiedTransaction(UnverifiedTransaction const&) = delete; UnverifiedTransaction(UnverifiedTransaction const&) = delete;
UnverifiedTransaction& operator=(UnverifiedTransaction const&) = delete; UnverifiedTransaction& operator=(UnverifiedTransaction const&) = delete;
bytes transaction; bytes transaction; ///< RLP encoded transaction data
h512 nodeId; h512 nodeId; ///< Network Id of the peer transaction comes from
}; };
struct PriorityCompare struct PriorityCompare
{ {
TransactionQueue& queue; TransactionQueue& queue;
/// Compare transaction by nonce height and gas price.
bool operator()(VerifiedTransaction const& _first, VerifiedTransaction const& _second) const bool operator()(VerifiedTransaction const& _first, VerifiedTransaction const& _second) const
{ {
u256 const& height1 = _first.transaction.nonce() - queue.m_currentByAddressAndNonce[_first.transaction.sender()].begin()->first; u256 const& height1 = _first.transaction.nonce() - queue.m_currentByAddressAndNonce[_first.transaction.sender()].begin()->first;

Loading…
Cancel
Save