Browse Source

Merge pull request #2376 from arkpar/tq_rf

TransactionQueue priority queue
cl-refactor
Gav Wood 10 years ago
parent
commit
afc016c88c
  1. 25
      libethereum/EthereumHost.cpp
  2. 61
      libethereum/State.cpp
  3. 215
      libethereum/TransactionQueue.cpp
  4. 55
      libethereum/TransactionQueue.h
  5. 2
      libethereum/VerifiedBlock.h
  6. 2
      libp2p/Common.h
  7. 4
      test/libethereum/blockchain.cpp
  8. 108
      test/libethereum/transactionqueue.cpp

25
libethereum/EthereumHost.cpp

@ -41,6 +41,7 @@ using namespace dev::eth;
using namespace p2p; using namespace p2p;
unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common
static unsigned const c_maxSendTransactions = 256;
char const* const EthereumHost::s_stateNames[static_cast<int>(SyncState::Size)] = {"Idle", "Waiting", "Hashes", "Blocks", "NewBlocks" }; char const* const EthereumHost::s_stateNames[static_cast<int>(SyncState::Size)] = {"Idle", "Waiting", "Hashes", "Blocks", "NewBlocks" };
@ -67,8 +68,7 @@ bool EthereumHost::ensureInitialised()
m_latestBlockSent = m_chain.currentHash(); m_latestBlockSent = m_chain.currentHash();
clog(NetNote) << "Initialising: latest=" << m_latestBlockSent; clog(NetNote) << "Initialising: latest=" << m_latestBlockSent;
for (auto const& i: m_tq.transactions()) m_transactionsSent = m_tq.knownTransactions();
m_transactionsSent.insert(i.first);
return true; return true;
} }
return false; return false;
@ -114,25 +114,26 @@ void EthereumHost::doWork()
void EthereumHost::maintainTransactions() void EthereumHost::maintainTransactions()
{ {
// Send any new transactions. // Send any new transactions.
unordered_map<std::shared_ptr<EthereumPeer>, h256s> peerTransactions; unordered_map<std::shared_ptr<EthereumPeer>, std::vector<size_t>> peerTransactions;
auto ts = m_tq.transactions(); auto ts = m_tq.topTransactions(c_maxSendTransactions);
for (auto const& i: ts) for (size_t i = 0; i < ts.size(); ++i)
{ {
bool unsent = !m_transactionsSent.count(i.first); auto const& t = ts[i];
auto peers = get<1>(randomSelection(0, [&](EthereumPeer* p) { return p->m_requireTransactions || (unsent && !p->m_knownTransactions.count(i.first)); })); bool unsent = !m_transactionsSent.count(t.sha3());
auto peers = get<1>(randomSelection(0, [&](EthereumPeer* p) { return p->m_requireTransactions || (unsent && !p->m_knownTransactions.count(t.sha3())); }));
for (auto const& p: peers) for (auto const& p: peers)
peerTransactions[p].push_back(i.first); peerTransactions[p].push_back(i);
} }
for (auto const& t: ts) for (auto const& t: ts)
m_transactionsSent.insert(t.first); m_transactionsSent.insert(t.sha3());
foreachPeer([&](shared_ptr<EthereumPeer> _p) foreachPeer([&](shared_ptr<EthereumPeer> _p)
{ {
bytes b; bytes b;
unsigned n = 0; unsigned n = 0;
for (auto const& h: peerTransactions[_p]) for (auto const& i: peerTransactions[_p])
{ {
_p->m_knownTransactions.insert(h); _p->m_knownTransactions.insert(ts[i].sha3());
b += ts[h].rlp(); b += ts[i].rlp();
++n; ++n;
} }

61
libethereum/State.cpp

@ -47,6 +47,7 @@ namespace fs = boost::filesystem;
#define ETH_TIMED_ENACTMENTS 0 #define ETH_TIMED_ENACTMENTS 0
static const u256 c_blockReward = c_network == Network::Olympic ? (1500 * finney) : (5 * ether); static const u256 c_blockReward = c_network == Network::Olympic ? (1500 * finney) : (5 * ether);
static const unsigned c_maxSyncTransactions = 256;
const char* StateSafeExceptions::name() { return EthViolet "" EthBlue ""; } const char* StateSafeExceptions::name() { return EthViolet "" EthBlue ""; }
const char* StateDetail::name() { return EthViolet "" EthWhite ""; } const char* StateDetail::name() { return EthViolet "" EthWhite ""; }
@ -495,7 +496,7 @@ pair<TransactionReceipts, bool> State::sync(BlockChain const& _bc, TransactionQu
pair<TransactionReceipts, bool> ret; pair<TransactionReceipts, bool> ret;
ret.second = false; ret.second = false;
auto ts = _tq.transactions(); auto ts = _tq.topTransactions(c_maxSyncTransactions);
LastHashes lh; LastHashes lh;
@ -504,27 +505,26 @@ pair<TransactionReceipts, bool> State::sync(BlockChain const& _bc, TransactionQu
for (int goodTxs = 1; goodTxs; ) for (int goodTxs = 1; goodTxs; )
{ {
goodTxs = 0; goodTxs = 0;
for (auto const& i: ts) for (auto const& t: ts)
if (!m_transactionSet.count(i.first)) if (!m_transactionSet.count(t.sha3()))
{ {
try try
{ {
if (i.second.gasPrice() >= _gp.ask(*this)) if (t.gasPrice() >= _gp.ask(*this))
{ {
// Timer t; // Timer t;
if (lh.empty()) if (lh.empty())
lh = _bc.lastHashes(); lh = _bc.lastHashes();
execute(lh, i.second); execute(lh, t);
ret.first.push_back(m_receipts.back()); ret.first.push_back(m_receipts.back());
_tq.noteGood(i);
++goodTxs; ++goodTxs;
// cnote << "TX took:" << t.elapsed() * 1000; // cnote << "TX took:" << t.elapsed() * 1000;
} }
else if (i.second.gasPrice() < _gp.ask(*this) * 9 / 10) else if (t.gasPrice() < _gp.ask(*this) * 9 / 10)
{ {
// less than 90% of our ask price for gas. drop. // less than 90% of our ask price for gas. drop.
cnote << i.first << "Dropping El Cheapo transaction (<90% of ask price)"; cnote << t.sha3() << "Dropping El Cheapo transaction (<90% of ask price)";
_tq.drop(i.first); _tq.drop(t.sha3());
} }
} }
catch (InvalidNonce const& in) catch (InvalidNonce const& in)
@ -535,57 +535,54 @@ pair<TransactionReceipts, bool> State::sync(BlockChain const& _bc, TransactionQu
if (req > got) if (req > got)
{ {
// too old // too old
for (Transaction const& t: m_transactions) for (Transaction const& mt: m_transactions)
if (t.from() == i.second.from()) {
if (mt.from() == t.from())
{ {
if (t.nonce() < i.second.nonce()) if (mt.nonce() < t.nonce())
{ cnote << t.sha3() << "Dropping old transaction (nonce too low)";
cnote << i.first << "Dropping old transaction (nonce too low)"; else if (mt.nonce() == t.nonce() && mt.gasPrice() <= t.gasPrice())
_tq.drop(i.first); cnote << t.sha3() << "Dropping old transaction (gas price lower)";
}
else if (t.nonce() == i.second.nonce() && t.gasPrice() <= i.second.gasPrice())
{
cnote << i.first << "Dropping old transaction (gas price lower)";
_tq.drop(i.first);
}
} }
}
_tq.drop(t.sha3());
} }
else if (got > req + _tq.waiting(i.second.sender())) else if (got > req + _tq.waiting(t.sender()))
{ {
// too new // too new
cnote << i.first << "Dropping new transaction (too many nonces ahead)"; cnote << t.sha3() << "Dropping new transaction (too many nonces ahead)";
_tq.drop(i.first); _tq.drop(t.sha3());
} }
else else
_tq.setFuture(i); _tq.setFuture(t.sha3());
} }
catch (BlockGasLimitReached const& e) catch (BlockGasLimitReached const& e)
{ {
bigint const& got = *boost::get_error_info<errinfo_got>(e); bigint const& got = *boost::get_error_info<errinfo_got>(e);
if (got > m_currentBlock.gasLimit) if (got > m_currentBlock.gasLimit)
{ {
cnote << i.first << "Dropping over-gassy transaction (gas > block's gas limit)"; cnote << t.sha3() << "Dropping over-gassy transaction (gas > block's gas limit)";
_tq.drop(i.first); _tq.drop(t.sha3());
} }
else else
{ {
// Temporarily no gas left in current block. // Temporarily no gas left in current block.
// OPTIMISE: could note this and then we don't evaluate until a block that does have the gas left. // OPTIMISE: could note this and then we don't evaluate until a block that does have the gas left.
// for now, just leave alone. // for now, just leave alone.
// _tq.setFuture(i); // _tq.setFuture(t.sha3());
} }
} }
catch (Exception const& _e) catch (Exception const& _e)
{ {
// Something else went wrong - drop it. // Something else went wrong - drop it.
cnote << i.first << "Dropping invalid transaction:" << diagnostic_information(_e); cnote << t.sha3() << "Dropping invalid transaction:" << diagnostic_information(_e);
_tq.drop(i.first); _tq.drop(t.sha3());
} }
catch (std::exception const&) catch (std::exception const&)
{ {
// Something else went wrong - drop it. // Something else went wrong - drop it.
_tq.drop(i.first); _tq.drop(t.sha3());
cnote << i.first << "Transaction caused low-level exception :("; cnote << t.sha3() << "Transaction caused low-level exception :(";
} }
} }
if (chrono::steady_clock::now() > deadline) if (chrono::steady_clock::now() > deadline)

215
libethereum/TransactionQueue.cpp

@ -95,10 +95,20 @@ ImportResult TransactionQueue::import(Transaction const& _transaction, ImportCal
return ret; return ret;
} }
std::unordered_map<h256, Transaction> TransactionQueue::transactions() const Transactions TransactionQueue::topTransactions(unsigned _limit) const
{ {
ReadGuard l(m_lock); ReadGuard l(m_lock);
return m_current; Transactions res;
unsigned n = _limit;
for (auto t = m_current.begin(); n != 0 && t != m_current.end(); ++t, --n)
res.push_back(t->transaction);
return res;
}
h256Hash TransactionQueue::knownTransactions() const
{
ReadGuard l(m_lock);
return m_known;
} }
ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb) ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb)
@ -108,35 +118,49 @@ ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transactio
// Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender. // Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender.
// If it doesn't work, the signature is bad. // If it doesn't work, the signature is bad.
// The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction). // The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction).
assert(_h == _transaction.sha3());
// Remove any prior transaction with the same nonce but a lower gas price. // Remove any prior transaction with the same nonce but a lower gas price.
// Bomb out if there's a prior transaction with higher gas price. // Bomb out if there's a prior transaction with higher gas price.
auto r = m_senders.equal_range(_transaction.from()); auto cs = m_currentByAddressAndNonce.find(_transaction.from());
for (auto it = r.first; it != r.second; ++it) if (cs != m_currentByAddressAndNonce.end())
if (m_current.count(it->second) && m_current[it->second].nonce() == _transaction.nonce()) {
if (_transaction.gasPrice() < m_current[it->second].gasPrice()) auto t = cs->second.find(_transaction.nonce());
if (t != cs->second.end())
{
if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
return ImportResult::OverbidGasPrice; return ImportResult::OverbidGasPrice;
else else
{ remove_WITH_LOCK((*t->second).transaction.sha3());
remove_WITH_LOCK(it->second); }
break; }
} auto fs = m_future.find(_transaction.from());
else if (m_future.count(it->second) && m_future[it->second].nonce() == _transaction.nonce()) if (fs != m_future.end())
if (_transaction.gasPrice() < m_future[it->second].gasPrice()) {
auto t = fs->second.find(_transaction.nonce());
if (t != fs->second.end())
{
if (_transaction.gasPrice() < t->second.transaction.gasPrice())
return ImportResult::OverbidGasPrice; return ImportResult::OverbidGasPrice;
else else
{ {
remove_WITH_LOCK(it->second); fs->second.erase(t);
break; --m_futureSize;
} }
else {} }
}
// If valid, append to blocks. // If valid, append to blocks.
insertCurrent_WITH_LOCK(make_pair(_h, _transaction)); insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
m_known.insert(_h);
if (_cb) if (_cb)
m_callbacks[_h] = _cb; m_callbacks[_h] = _cb;
clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h; clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h;
while (m_current.size() > m_limit)
{
clog(TransactionQueueTraceChannel) << "Dropping out of bounds transaction" << _h;
remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
}
m_onReady(); m_onReady();
} }
catch (Exception const& _e) catch (Exception const& _e)
@ -163,93 +187,113 @@ u256 TransactionQueue::maxNonce(Address const& _a) const
u256 TransactionQueue::maxNonce_WITH_LOCK(Address const& _a) const u256 TransactionQueue::maxNonce_WITH_LOCK(Address const& _a) const
{ {
u256 ret = 0; u256 ret = 0;
auto r = m_senders.equal_range(_a); auto cs = m_currentByAddressAndNonce.find(_a);
for (auto it = r.first; it != r.second; ++it) if (cs != m_currentByAddressAndNonce.end() && !cs->second.empty())
if (m_current.count(it->second)) ret = cs->second.rbegin()->first;
{ auto fs = m_future.find(_a);
// cdebug << it->first << "1+" << m_current.at(it->second).nonce(); if (fs != m_future.end() && !fs->second.empty())
ret = max(ret, m_current.at(it->second).nonce() + 1); ret = std::max(ret, fs->second.rbegin()->first);
} return ret + 1;
else if (m_future.count(it->second))
{
// cdebug << it->first << "1+" << m_future.at(it->second).nonce();
ret = max(ret, m_future.at(it->second).nonce() + 1);
}
else
{
cwarn << "ERRROR!!!!! m_senders references non-current transaction";
cwarn << "Sender" << it->first << "has transaction" << it->second;
cwarn << "Count of m_current for" << it->second << "is" << m_current.count(it->second);
}
return ret;
} }
void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p) void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
{ {
// cdebug << "txQ::insertCurrent" << _p.first << _p.second.sender() << _p.second.nonce(); if (m_currentByHash.count(_p.first))
m_senders.insert(make_pair(_p.second.sender(), _p.first)); {
if (m_current.count(_p.first))
cwarn << "Transaction hash" << _p.first << "already in current?!"; cwarn << "Transaction hash" << _p.first << "already in current?!";
m_current.insert(_p); return;
} }
bool TransactionQueue::remove_WITH_LOCK(h256 const& _txHash) Transaction const& t = _p.second;
{ // Insert into current
// cdebug << "txQ::remove" << _txHash; auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
for (std::unordered_map<h256, Transaction>* pool: { &m_current, &m_future }) PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
inserted.first->second = handle;
m_currentByHash[_p.first] = handle;
// Move following transactions from future to current
auto fs = m_future.find(t.from());
if (fs != m_future.end())
{ {
auto pit = pool->find(_txHash); u256 nonce = t.nonce() + 1;
if (pit != pool->end()) auto fb = fs->second.find(nonce);
if (fb != fs->second.end())
{ {
auto r = m_senders.equal_range(pit->second.sender()); auto ft = fb;
for (auto i = r.first; i != r.second; ++i) while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
if (i->second == _txHash) {
{ inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
m_senders.erase(i); PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
break; inserted.first->second = handle;
} m_currentByHash[(*handle).transaction.sha3()] = handle;
// cdebug << "=> nonce" << pit->second.nonce(); --m_futureSize;
pool->erase(pit); ++ft;
return true; ++nonce;
}
fs->second.erase(fb, ft);
if (fs->second.empty())
m_future.erase(t.from());
} }
} }
return false; m_known.insert(_p.first);
}
bool TransactionQueue::remove_WITH_LOCK(h256 const& _txHash)
{
auto t = m_currentByHash.find(_txHash);
if (t == m_currentByHash.end())
return false;
Address from = (*t->second).transaction.from();
auto it = m_currentByAddressAndNonce.find(from);
assert (it != m_currentByAddressAndNonce.end());
it->second.erase((*t->second).transaction.nonce());
m_current.erase(t->second);
m_currentByHash.erase(t);
if (it->second.empty())
m_currentByAddressAndNonce.erase(it);
m_known.erase(_txHash);
return true;
} }
unsigned TransactionQueue::waiting(Address const& _a) const unsigned TransactionQueue::waiting(Address const& _a) const
{ {
ReadGuard l(m_lock); ReadGuard l(m_lock);
auto it = m_senders.equal_range(_a);
unsigned ret = 0; unsigned ret = 0;
for (auto i = it.first; i != it.second; ++i, ++ret) {} auto cs = m_currentByAddressAndNonce.find(_a);
if (cs != m_currentByAddressAndNonce.end())
ret = cs->second.size();
auto fs = m_future.find(_a);
if (fs != m_future.end())
ret += fs->second.size();
return ret; return ret;
} }
void TransactionQueue::setFuture(std::pair<h256, Transaction> const& _t) void TransactionQueue::setFuture(h256 const& _txHash)
{ {
// cdebug << "txQ::setFuture" << _t.first; // cdebug << "txQ::setFuture" << _t.first;
WriteGuard l(m_lock); WriteGuard l(m_lock);
if (m_current.count(_t.first)) auto it = m_currentByHash.find(_txHash);
{ if (it == m_currentByHash.end())
m_future.insert(_t); return;
m_current.erase(_t.first);
}
}
void TransactionQueue::noteGood(std::pair<h256, Transaction> const& _t) VerifiedTransaction const& st = *(it->second);
{
// cdebug << "txQ::noteGood" << _t.first; Address from = st.transaction.from();
WriteGuard l(m_lock); auto& queue = m_currentByAddressAndNonce[from];
auto r = m_senders.equal_range(_t.second.sender()); auto& target = m_future[from];
for (auto it = r.first; it != r.second; ++it) auto cutoff = queue.lower_bound(st.transaction.nonce());
for (auto m = cutoff; m != queue.end(); ++m)
{ {
auto fit = m_future.find(it->second); VerifiedTransaction& t = const_cast<VerifiedTransaction&>(*(m->second)); // set has only const iterators. Since we are moving out of container that's fine
if (fit != m_future.end()) m_currentByHash.erase(t.transaction.sha3());
{ target.emplace(t.transaction.nonce(), move(t));
m_current.insert(*fit); m_current.erase(m->second);
m_future.erase(fit); ++m_futureSize;
}
} }
queue.erase(cutoff, queue.end());
if (queue.empty())
m_currentByAddressAndNonce.erase(from);
} }
void TransactionQueue::drop(h256 const& _txHash) void TransactionQueue::drop(h256 const& _txHash)
@ -261,7 +305,18 @@ void TransactionQueue::drop(h256 const& _txHash)
UpgradeGuard ul(l); UpgradeGuard ul(l);
m_dropped.insert(_txHash); m_dropped.insert(_txHash);
m_known.erase(_txHash);
remove_WITH_LOCK(_txHash); remove_WITH_LOCK(_txHash);
}
void TransactionQueue::clear()
{
WriteGuard l(m_lock);
m_known.clear();
m_current.clear();
m_currentByAddressAndNonce.clear();
m_currentByHash.clear();
m_future.clear();
m_futureSize = 0;
} }

55
libethereum/TransactionQueue.h

@ -43,6 +43,7 @@ enum class IfDropped { Ignore, Retry };
/** /**
* @brief A queue of Transactions, each stored as RLP. * @brief A queue of Transactions, each stored as RLP.
* Maintains a transaction queue sorted by nonce diff and gas price
* @threadsafe * @threadsafe
*/ */
class TransactionQueue class TransactionQueue
@ -50,6 +51,10 @@ class TransactionQueue
public: public:
using ImportCallback = std::function<void(ImportResult)>; using ImportCallback = std::function<void(ImportResult)>;
/// @brief TransactionQueue
/// @param _limit Maximum number of pending transactions in the queue
/// @param _futureLimit Maximum number of future nonce transactions
TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024): m_current(PriorityCompare { *this }), m_limit(_limit), m_futureLimit(_futureLimit) {}
ImportResult import(Transaction const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); ImportResult import(Transaction const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore);
ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); } ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); }
ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore);
@ -57,17 +62,40 @@ public:
void drop(h256 const& _txHash); void drop(h256 const& _txHash);
unsigned waiting(Address const& _a) const; unsigned waiting(Address const& _a) const;
std::unordered_map<h256, Transaction> transactions() const; Transactions topTransactions(unsigned _limit) const;
std::pair<unsigned, unsigned> items() const { ReadGuard l(m_lock); return std::make_pair(m_current.size(), m_future.size()); } h256Hash knownTransactions() const;
u256 maxNonce(Address const& _a) const; u256 maxNonce(Address const& _a) const;
void setFuture(h256 const& _t);
void setFuture(std::pair<h256, Transaction> const& _t); void clear();
void noteGood(std::pair<h256, Transaction> const& _t);
void clear() { WriteGuard l(m_lock); m_senders.clear(); m_known.clear(); m_current.clear(); m_future.clear(); }
template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); } template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); }
private: private:
struct VerifiedTransaction
{
VerifiedTransaction(Transaction const& _t): transaction(_t) {}
VerifiedTransaction(VerifiedTransaction&& _t): transaction(std::move(_t.transaction)) {}
VerifiedTransaction(VerifiedTransaction const&) = delete;
VerifiedTransaction operator=(VerifiedTransaction const&) = delete;
Transaction transaction;
};
struct PriorityCompare
{
TransactionQueue& queue;
bool operator()(VerifiedTransaction const& _first, VerifiedTransaction const& _second) const
{
u256 const& height1 = _first.transaction.nonce() - queue.m_currentByAddressAndNonce[_first.transaction.sender()].begin()->first;
u256 const& height2 = _second.transaction.nonce() - queue.m_currentByAddressAndNonce[_second.transaction.sender()].begin()->first;
return height1 < height2 || (height1 == height2 && _first.transaction.gasPrice() > _second.transaction.gasPrice());
}
};
// Use a set with dynamic comparator for minmax priority queue. The comparator takes into account min account nonce. Updating it does not affect the order.
using PriorityQueue = std::multiset<VerifiedTransaction, PriorityCompare>;
ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik); ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik);
ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb); ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb);
@ -77,12 +105,19 @@ private:
mutable SharedMutex m_lock; ///< General lock. mutable SharedMutex m_lock; ///< General lock.
h256Hash m_known; ///< Hashes of transactions in both sets. h256Hash m_known; ///< Hashes of transactions in both sets.
std::unordered_multimap<Address, h256> m_senders; ///< Mapping from the sender address to the transaction hash; useful for determining the nonce of a given sender.
std::unordered_map<h256, Transaction> m_current; ///< Map of SHA3(tx) to tx.
std::unordered_map<h256, Transaction> m_future; ///< For transactions that have a future nonce; we re-insert into current once the sender has a valid TX.
std::unordered_map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once. std::unordered_map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once.
h256Hash m_dropped; ///< Transactions that have previously been dropped. h256Hash m_dropped; ///< Transactions that have previously been dropped
PriorityQueue m_current;
std::unordered_map<h256, PriorityQueue::iterator> m_currentByHash; ///< Transaction hash to set ref
std::unordered_map<Address, std::map<u256, PriorityQueue::iterator>> m_currentByAddressAndNonce; ///< Transactions grouped by account and nonce
std::unordered_map<Address, std::map<u256, VerifiedTransaction>> m_future; /// Future transactions
Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast.
unsigned m_limit; ///< Max number of pending transactions
unsigned m_futureLimit; ///< Max number of future transactions
unsigned m_futureSize = 0; ///< Current number of future transactions
}; };
} }

2
libethereum/VerifiedBlock.h

@ -43,7 +43,7 @@ struct VerifiedBlockRef
/// @brief Verified block info, combines block data and verified info/transactions /// @brief Verified block info, combines block data and verified info/transactions
struct VerifiedBlock struct VerifiedBlock
{ {
VerifiedBlock() {}; VerifiedBlock() {}
VerifiedBlock(BlockInfo&& _bi) VerifiedBlock(BlockInfo&& _bi)
{ {

2
libp2p/Common.h

@ -236,7 +236,7 @@ template <> struct hash<bi::address>
return boost::hash_range(range.begin(), range.end()); return boost::hash_range(range.begin(), range.end());
} }
if (_a.is_unspecified()) if (_a.is_unspecified())
return static_cast<size_t>(0x3487194039229152ul); // Chosen by fair dice roll, guaranteed to be random return static_cast<size_t>(0x3487194039229152ull); // Chosen by fair dice roll, guaranteed to be random
return std::hash<std::string>()(_a.to_string()); return std::hash<std::string>()(_a.to_string());
} }
}; };

4
test/libethereum/blockchain.cpp

@ -200,8 +200,8 @@ void doBlockchainTests(json_spirit::mValue& _v, bool _fillin)
//get valid transactions //get valid transactions
Transactions txList; Transactions txList;
for (auto const& txi: txs.transactions()) for (auto const& txi: txs.topTransactions(std::numeric_limits<unsigned>::max()))
txList.push_back(txi.second); txList.push_back(txi);
blObj["transactions"] = writeTransactionsToJson(txList); blObj["transactions"] = writeTransactionsToJson(txList);
BlockInfo current_BlockHeader = state.info(); BlockInfo current_BlockHeader = state.info();

108
test/libethereum/transactionqueue.cpp

@ -61,4 +61,112 @@ BOOST_AUTO_TEST_CASE(maxNonce)
} }
BOOST_AUTO_TEST_CASE(priority)
{
dev::eth::TransactionQueue txq;
const u256 gasCostCheap = 10 * szabo;
const u256 gasCostMed = 20 * szabo;
const u256 gasCostHigh = 30 * szabo;
const u256 gas = 25000;
Address dest = Address("0x095e7baea6a6c7c4c2dfeb977efac326af552d87");
Secret sender1 = Secret("0x3333333333333333333333333333333333333333333333333333333333333333");
Secret sender2 = Secret("0x4444444444444444444444444444444444444444444444444444444444444444");
Transaction tx0(0, gasCostCheap, gas, dest, bytes(), 0, sender1 );
Transaction tx0_1(1, gasCostMed, gas, dest, bytes(), 0, sender1 );
Transaction tx1(0, gasCostCheap, gas, dest, bytes(), 1, sender1 );
Transaction tx2(0, gasCostHigh, gas, dest, bytes(), 0, sender2 );
Transaction tx3(0, gasCostCheap + 1, gas, dest, bytes(), 1, sender2 );
Transaction tx4(0, gasCostHigh, gas, dest, bytes(), 2, sender1 );
Transaction tx5(0, gasCostMed, gas, dest, bytes(), 2, sender2 );
txq.import(tx0);
BOOST_CHECK(Transactions { tx0 } == txq.topTransactions(256));
txq.import(tx0);
BOOST_CHECK(Transactions { tx0 } == txq.topTransactions(256));
txq.import(tx0_1);
BOOST_CHECK(Transactions { tx0_1 } == txq.topTransactions(256));
txq.import(tx1);
BOOST_CHECK((Transactions { tx0_1, tx1 }) == txq.topTransactions(256));
txq.import(tx2);
BOOST_CHECK((Transactions { tx2, tx0_1, tx1 }) == txq.topTransactions(256));
txq.import(tx3);
BOOST_CHECK((Transactions { tx2, tx0_1, tx1, tx3 }) == txq.topTransactions(256));
txq.import(tx4);
BOOST_CHECK((Transactions { tx2, tx0_1, tx1, tx3, tx4 }) == txq.topTransactions(256));
txq.import(tx5);
BOOST_CHECK((Transactions { tx2, tx0_1, tx1, tx3, tx5, tx4 }) == txq.topTransactions(256));
txq.drop(tx0_1.sha3());
BOOST_CHECK((Transactions { tx2, tx1, tx3, tx5, tx4 }) == txq.topTransactions(256));
txq.drop(tx1.sha3());
BOOST_CHECK((Transactions { tx2, tx3, tx5, tx4 }) == txq.topTransactions(256));
txq.drop(tx5.sha3());
BOOST_CHECK((Transactions { tx2, tx3, tx4 }) == txq.topTransactions(256));
Transaction tx6(0, gasCostMed, gas, dest, bytes(), 20, sender1 );
txq.import(tx6);
BOOST_CHECK((Transactions { tx2, tx3, tx4, tx6 }) == txq.topTransactions(256));
Transaction tx7(0, gasCostMed, gas, dest, bytes(), 2, sender2 );
txq.import(tx7);
BOOST_CHECK((Transactions { tx2, tx3, tx4, tx6, tx7 }) == txq.topTransactions(256));
}
BOOST_AUTO_TEST_CASE(future)
{
dev::eth::TransactionQueue txq;
// from a94f5374fce5edbc8e2a8697c15331677e6ebf0b
const u256 gasCostMed = 20 * szabo;
const u256 gas = 25000;
Address dest = Address("0x095e7baea6a6c7c4c2dfeb977efac326af552d87");
Secret sender = Secret("0x3333333333333333333333333333333333333333333333333333333333333333");
Transaction tx0(0, gasCostMed, gas, dest, bytes(), 0, sender );
Transaction tx1(0, gasCostMed, gas, dest, bytes(), 1, sender );
Transaction tx2(0, gasCostMed, gas, dest, bytes(), 2, sender );
Transaction tx3(0, gasCostMed, gas, dest, bytes(), 3, sender );
Transaction tx4(0, gasCostMed, gas, dest, bytes(), 4, sender );
txq.import(tx0);
txq.import(tx1);
txq.import(tx2);
txq.import(tx3);
txq.import(tx4);
BOOST_CHECK((Transactions { tx0, tx1, tx2, tx3, tx4 }) == txq.topTransactions(256));
txq.setFuture(tx2.sha3());
BOOST_CHECK((Transactions { tx0, tx1 }) == txq.topTransactions(256));
Transaction tx2_2(1, gasCostMed, gas, dest, bytes(), 2, sender );
txq.import(tx2_2);
BOOST_CHECK((Transactions { tx0, tx1, tx2_2, tx3, tx4 }) == txq.topTransactions(256));
}
BOOST_AUTO_TEST_CASE(lmits)
{
dev::eth::TransactionQueue txq(3, 3);
const u256 gasCostMed = 20 * szabo;
const u256 gas = 25000;
Address dest = Address("0x095e7baea6a6c7c4c2dfeb977efac326af552d87");
Secret sender = Secret("0x3333333333333333333333333333333333333333333333333333333333333333");
Secret sender2 = Secret("0x4444444444444444444444444444444444444444444444444444444444444444");
Transaction tx0(0, gasCostMed, gas, dest, bytes(), 0, sender );
Transaction tx1(0, gasCostMed, gas, dest, bytes(), 1, sender );
Transaction tx2(0, gasCostMed, gas, dest, bytes(), 2, sender );
Transaction tx3(0, gasCostMed, gas, dest, bytes(), 3, sender );
Transaction tx4(0, gasCostMed, gas, dest, bytes(), 4, sender );
Transaction tx5(0, gasCostMed + 1, gas, dest, bytes(), 0, sender2 );
txq.import(tx0);
txq.import(tx1);
txq.import(tx2);
txq.import(tx3);
txq.import(tx4);
txq.import(tx5);
BOOST_CHECK((Transactions { tx5, tx0, tx1 }) == txq.topTransactions(256));
}
BOOST_AUTO_TEST_SUITE_END() BOOST_AUTO_TEST_SUITE_END()

Loading…
Cancel
Save