Browse Source

import transactions in a separate thread

cl-refactor
arkpar 10 years ago
parent
commit
440f6e7dc2
  1. 13
      libethcore/Common.h
  2. 8
      libethereum/BlockChain.cpp
  3. 2
      libethereum/BlockChainSync.h
  4. 8
      libethereum/BlockQueue.h
  5. 13
      libethereum/Client.cpp
  6. 4
      libethereum/Client.h
  7. 63
      libethereum/EthereumHost.cpp
  8. 2
      libethereum/EthereumHost.h
  9. 1
      libethereum/State.cpp
  10. 2
      libethereum/State.h
  11. 138
      libethereum/TransactionQueue.cpp
  12. 47
      libethereum/TransactionQueue.h
  13. 5
      libp2p/Host.h
  14. 1
      test/libethereum/blockchain.cpp

13
libethcore/Common.h

@ -97,10 +97,13 @@ enum class RelativeBlock: BlockNumber
Pending = PendingBlock Pending = PendingBlock
}; };
class Transaction;
struct ImportRoute struct ImportRoute
{ {
h256s deadBlocks; h256s deadBlocks;
h256s liveBlocks; h256s liveBlocks;
std::vector<Transaction> goodTranactions;
}; };
enum class ImportResult enum class ImportResult
@ -129,10 +132,10 @@ struct ImportRequirements
}; };
/// Super-duper signal mechanism. TODO: replace with somthing a bit heavier weight. /// Super-duper signal mechanism. TODO: replace with somthing a bit heavier weight.
class Signal template<typename... Args> class Signal
{ {
public: public:
using Callback = std::function<void()>; using Callback = std::function<void(Args...)>;
class HandlerAux class HandlerAux
{ {
@ -141,7 +144,7 @@ public:
public: public:
~HandlerAux() { if (m_s) m_s->m_fire.erase(m_i); m_s = nullptr; } ~HandlerAux() { if (m_s) m_s->m_fire.erase(m_i); m_s = nullptr; }
void reset() { m_s = nullptr; } void reset() { m_s = nullptr; }
void fire() { m_h(); } void fire(Args&&... _args) { m_h(std::forward<Args>(_args)...); }
private: private:
HandlerAux(unsigned _i, Signal* _s, Callback const& _h): m_i(_i), m_s(_s), m_h(_h) {} HandlerAux(unsigned _i, Signal* _s, Callback const& _h): m_i(_i), m_s(_s), m_h(_h) {}
@ -165,13 +168,13 @@ public:
return h; return h;
} }
void operator()() { for (auto const& f: m_fire) f.second->fire(); } void operator()(Args&... _args) { for (auto const& f: m_fire) f.second->fire(std::forward<Args>(_args)...); }
private: private:
std::map<unsigned, std::shared_ptr<Signal::HandlerAux>> m_fire; std::map<unsigned, std::shared_ptr<Signal::HandlerAux>> m_fire;
}; };
using Handler = std::shared_ptr<Signal::HandlerAux>; template<class... Args> using Handler = std::shared_ptr<typename Signal<Args...>::HandlerAux>;
struct TransactionSkeleton struct TransactionSkeleton
{ {

8
libethereum/BlockChain.cpp

@ -337,6 +337,7 @@ tuple<ImportRoute, bool, unsigned> BlockChain::sync(BlockQueue& _bq, OverlayDB c
h256s fresh; h256s fresh;
h256s dead; h256s dead;
h256s badBlocks; h256s badBlocks;
Transactions goodTransactions;
unsigned count = 0; unsigned count = 0;
for (VerifiedBlock const& block: blocks) for (VerifiedBlock const& block: blocks)
if (!badBlocks.empty()) if (!badBlocks.empty())
@ -351,6 +352,7 @@ tuple<ImportRoute, bool, unsigned> BlockChain::sync(BlockQueue& _bq, OverlayDB c
r = import(block.verified, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles); r = import(block.verified, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles);
fresh += r.liveBlocks; fresh += r.liveBlocks;
dead += r.deadBlocks; dead += r.deadBlocks;
goodTransactions += r.goodTranactions;
++count; ++count;
} }
catch (dev::eth::UnknownParent) catch (dev::eth::UnknownParent)
@ -377,7 +379,7 @@ tuple<ImportRoute, bool, unsigned> BlockChain::sync(BlockQueue& _bq, OverlayDB c
badBlocks.push_back(block.verified.info.hash()); badBlocks.push_back(block.verified.info.hash());
} }
} }
return make_tuple(ImportRoute{dead, fresh}, _bq.doneDrain(badBlocks), count); return make_tuple(ImportRoute{dead, fresh, goodTransactions}, _bq.doneDrain(badBlocks), count);
} }
pair<ImportResult, ImportRoute> BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB, ImportRequirements::value _ir) noexcept pair<ImportResult, ImportRoute> BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB, ImportRequirements::value _ir) noexcept
@ -497,6 +499,7 @@ ImportRoute BlockChain::import(VerifiedBlockRef const& _block, OverlayDB const&
BlockReceipts br; BlockReceipts br;
u256 td; u256 td;
Transactions goodTransactions;
#if ETH_CATCH #if ETH_CATCH
try try
#endif #endif
@ -510,6 +513,7 @@ ImportRoute BlockChain::import(VerifiedBlockRef const& _block, OverlayDB const&
{ {
blb.blooms.push_back(s.receipt(i).bloom()); blb.blooms.push_back(s.receipt(i).bloom());
br.receipts.push_back(s.receipt(i)); br.receipts.push_back(s.receipt(i));
goodTransactions.push_back(s.pending()[i]);
} }
s.cleanup(true); s.cleanup(true);
@ -750,7 +754,7 @@ ImportRoute BlockChain::import(VerifiedBlockRef const& _block, OverlayDB const&
dead.push_back(h); dead.push_back(h);
else else
fresh.push_back(h); fresh.push_back(h);
return ImportRoute{dead, fresh}; return ImportRoute{dead, fresh, move(goodTransactions)};
} }
void BlockChain::clearBlockBlooms(unsigned _begin, unsigned _end) void BlockChain::clearBlockBlooms(unsigned _begin, unsigned _end)

2
libethereum/BlockChainSync.h

@ -114,7 +114,7 @@ protected:
void requestBlocks(std::shared_ptr<EthereumPeer> _peer); void requestBlocks(std::shared_ptr<EthereumPeer> _peer);
protected: protected:
Handler m_bqRoomAvailable; ///< Triggered once block queue Handler<> m_bqRoomAvailable; ///< Triggered once block queue
mutable RecursiveMutex x_sync; mutable RecursiveMutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state SyncState m_state = SyncState::Idle; ///< Current sync state
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.

8
libethereum/BlockQueue.h

@ -111,8 +111,8 @@ public:
/// Get some infomration on the given block's status regarding us. /// Get some infomration on the given block's status regarding us.
QueueStatus blockStatus(h256 const& _h) const; QueueStatus blockStatus(h256 const& _h) const;
template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); } template <class T> Handler<> onReady(T const& _t) { return m_onReady.add(_t); }
template <class T> Handler onRoomAvailable(T const& _t) { return m_onRoomAvailable.add(_t); } template <class T> Handler<> onRoomAvailable(T const& _t) { return m_onRoomAvailable.add(_t); }
template <class T> void setOnBad(T const& _t) { m_onBad = _t; } template <class T> void setOnBad(T const& _t) { m_onBad = _t; }
@ -145,8 +145,8 @@ private:
std::unordered_multimap<h256, std::pair<h256, bytes>> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears. std::unordered_multimap<h256, std::pair<h256, bytes>> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears.
h256Hash m_knownBad; ///< Set of blocks that we know will never be valid. h256Hash m_knownBad; ///< Set of blocks that we know will never be valid.
std::multimap<unsigned, std::pair<h256, bytes>> m_future; ///< Set of blocks that are not yet valid. Ordered by timestamp std::multimap<unsigned, std::pair<h256, bytes>> m_future; ///< Set of blocks that are not yet valid. Ordered by timestamp
Signal m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast. Signal<> m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast.
Signal m_onRoomAvailable; ///< Called when space for new blocks becomes availabe after a drain. Be nice and exit fast. Signal<> m_onRoomAvailable; ///< Called when space for new blocks becomes availabe after a drain. Be nice and exit fast.
mutable Mutex m_verification; ///< Mutex that allows writing to m_verified, m_verifying and m_unverified. mutable Mutex m_verification; ///< Mutex that allows writing to m_verified, m_verifying and m_unverified.
std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry. std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry.

13
libethereum/Client.cpp

@ -604,19 +604,18 @@ void Client::onChainChanged(ImportRoute const& _ir)
for (auto const& t: m_bc.transactions(h)) for (auto const& t: m_bc.transactions(h))
{ {
clog(ClientTrace) << "Resubmitting dead-block transaction " << Transaction(t, CheckTransaction::None); clog(ClientTrace) << "Resubmitting dead-block transaction " << Transaction(t, CheckTransaction::None);
m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry); m_tq.import(t, IfDropped::Retry);
} }
} }
// remove transactions from m_tq nicely rather than relying on out of date nonce later on. // remove transactions from m_tq nicely rather than relying on out of date nonce later on.
for (auto const& h: _ir.liveBlocks) for (auto const& h: _ir.liveBlocks)
{
clog(ClientTrace) << "Live block:" << h; clog(ClientTrace) << "Live block:" << h;
for (auto const& th: m_bc.transactionHashes(h))
for (auto const& t: _ir.goodTranactions)
{ {
clog(ClientTrace) << "Safely dropping transaction " << th; clog(ClientTrace) << "Safely dropping transaction " << t.sha3();
m_tq.drop(th); m_tq.dropGood(t);
}
} }
if (auto h = m_host.lock()) if (auto h = m_host.lock())
@ -651,7 +650,7 @@ void Client::onChainChanged(ImportRoute const& _ir)
for (auto const& t: m_postMine.pending()) for (auto const& t: m_postMine.pending())
{ {
clog(ClientTrace) << "Resubmitting post-mine transaction " << t; clog(ClientTrace) << "Resubmitting post-mine transaction " << t;
auto ir = m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry); auto ir = m_tq.import(t, IfDropped::Retry);
if (ir != ImportResult::Success) if (ir != ImportResult::Success)
onTransactionQueueReady(); onTransactionQueueReady();
} }

4
libethereum/Client.h

@ -310,8 +310,8 @@ private:
GenericFarm<ProofOfWork> m_farm; ///< Our mining farm. GenericFarm<ProofOfWork> m_farm; ///< Our mining farm.
Handler m_tqReady; Handler<> m_tqReady;
Handler m_bqReady; Handler<> m_bqReady;
bool m_wouldMine = false; ///< True if we /should/ be mining. bool m_wouldMine = false; ///< True if we /should/ be mining.
bool m_turboMining = false; ///< Don't squander all of our time mining actually just sleeping. bool m_turboMining = false; ///< Don't squander all of our time mining actually just sleeping.

63
libethereum/EthereumHost.cpp

@ -54,6 +54,7 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu
m_networkId (_networkId) m_networkId (_networkId)
{ {
m_latestBlockSent = _ch.currentHash(); m_latestBlockSent = _ch.currentHash();
m_tq.onImport([this](ImportResult _ir, h256 const& _h, h512 const& _nodeId) { onTransactionImported(_ir, _h, _nodeId); });
} }
EthereumHost::~EthereumHost() EthereumHost::~EthereumHost()
@ -68,6 +69,7 @@ bool EthereumHost::ensureInitialised()
m_latestBlockSent = m_chain.currentHash(); m_latestBlockSent = m_chain.currentHash();
clog(NetNote) << "Initialising: latest=" << m_latestBlockSent; clog(NetNote) << "Initialising: latest=" << m_latestBlockSent;
Guard l(x_transactions);
m_transactionsSent = m_tq.knownTransactions(); m_transactionsSent = m_tq.knownTransactions();
return true; return true;
} }
@ -82,6 +84,7 @@ void EthereumHost::reset()
m_sync.reset(); m_sync.reset();
m_latestBlockSent = h256(); m_latestBlockSent = h256();
Guard tl(x_transactions);
m_transactionsSent.clear(); m_transactionsSent.clear();
} }
@ -116,6 +119,8 @@ void EthereumHost::maintainTransactions()
// Send any new transactions. // Send any new transactions.
unordered_map<std::shared_ptr<EthereumPeer>, std::vector<size_t>> peerTransactions; unordered_map<std::shared_ptr<EthereumPeer>, std::vector<size_t>> peerTransactions;
auto ts = m_tq.topTransactions(c_maxSendTransactions); auto ts = m_tq.topTransactions(c_maxSendTransactions);
{
Guard l(x_transactions);
for (size_t i = 0; i < ts.size(); ++i) for (size_t i = 0; i < ts.size(); ++i)
{ {
auto const& t = ts[i]; auto const& t = ts[i];
@ -126,6 +131,7 @@ void EthereumHost::maintainTransactions()
} }
for (auto const& t: ts) for (auto const& t: ts)
m_transactionsSent.insert(t.sha3()); m_transactionsSent.insert(t.sha3());
}
foreachPeer([&](shared_ptr<EthereumPeer> _p) foreachPeer([&](shared_ptr<EthereumPeer> _p)
{ {
bytes b; bytes b;
@ -291,28 +297,7 @@ void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP c
} }
unsigned itemCount = _r.itemCount(); unsigned itemCount = _r.itemCount();
clog(NetAllDetail) << "Transactions (" << dec << itemCount << "entries)"; clog(NetAllDetail) << "Transactions (" << dec << itemCount << "entries)";
Guard l(_peer->x_knownTransactions); m_tq.enqueue(_r, _peer->session()->id());
for (unsigned i = 0; i < min<unsigned>(itemCount, 32); ++i) // process 256 transactions at most. TODO: much better solution.
{
auto h = sha3(_r[i].data());
_peer->m_knownTransactions.insert(h);
ImportResult ir = m_tq.import(_r[i].data());
switch (ir)
{
case ImportResult::Malformed:
_peer->addRating(-100);
break;
case ImportResult::AlreadyKnown:
// if we already had the transaction, then don't bother sending it on.
m_transactionsSent.insert(h);
_peer->addRating(0);
break;
case ImportResult::Success:
_peer->addRating(100);
break;
default:;
}
}
} }
void EthereumHost::onPeerAborting() void EthereumHost::onPeerAborting()
@ -344,3 +329,37 @@ SyncStatus EthereumHost::status() const
return SyncStatus(); return SyncStatus();
return m_sync->status(); return m_sync->status();
} }
void EthereumHost::onTransactionImported(ImportResult _ir, h256 const& _h, h512 const& _nodeId)
{
auto session = host()->peerSession(_nodeId);
if (!session)
return;
std::shared_ptr<EthereumPeer> peer = session->cap<EthereumPeer>();
if (!peer)
peer = session->cap<EthereumPeer>(c_oldProtocolVersion);
if (!peer)
return;
Guard l(peer->x_knownTransactions);
peer->m_knownTransactions.insert(_h);
switch (_ir)
{
case ImportResult::Malformed:
peer->addRating(-100);
break;
case ImportResult::AlreadyKnown:
// if we already had the transaction, then don't bother sending it on.
{
Guard l(x_transactions);
m_transactionsSent.insert(_h);
}
peer->addRating(0);
break;
case ImportResult::Success:
peer->addRating(100);
break;
default:;
}
}

2
libethereum/EthereumHost.h

@ -105,6 +105,7 @@ private:
void maintainTransactions(); void maintainTransactions();
void maintainBlocks(h256 const& _currentBlock); void maintainBlocks(h256 const& _currentBlock);
void onTransactionImported(ImportResult _ir, h256 const& _h, h512 const& _nodeId);
/// Check to see if the network peer-state initialisation has happened. /// Check to see if the network peer-state initialisation has happened.
bool isInitialised() const { return (bool)m_latestBlockSent; } bool isInitialised() const { return (bool)m_latestBlockSent; }
@ -132,6 +133,7 @@ private:
bool m_newBlocks = false; bool m_newBlocks = false;
mutable Mutex x_sync; mutable Mutex x_sync;
mutable Mutex x_transactions;
DownloadMan m_man; DownloadMan m_man;
std::unique_ptr<BlockChainSync> m_sync; std::unique_ptr<BlockChainSync> m_sync;
}; };

1
libethereum/State.cpp

@ -38,6 +38,7 @@
#include "Executive.h" #include "Executive.h"
#include "CachedAddressState.h" #include "CachedAddressState.h"
#include "CanonBlockChain.h" #include "CanonBlockChain.h"
#include "TransactionQueue.h"
using namespace std; using namespace std;
using namespace dev; using namespace dev;
using namespace dev::eth; using namespace dev::eth;

2
libethereum/State.h

@ -32,7 +32,6 @@
#include <libethcore/ProofOfWork.h> #include <libethcore/ProofOfWork.h>
#include <libethcore/Miner.h> #include <libethcore/Miner.h>
#include <libevm/ExtVMFace.h> #include <libevm/ExtVMFace.h>
#include "TransactionQueue.h"
#include "Account.h" #include "Account.h"
#include "Transaction.h" #include "Transaction.h"
#include "TransactionReceipt.h" #include "TransactionReceipt.h"
@ -67,6 +66,7 @@ using LogBloomRequirementError = boost::tuple<errinfo_required_LogBloom, errinfo
class BlockChain; class BlockChain;
class State; class State;
class TransactionQueue;
struct VerifiedBlockRef; struct VerifiedBlockRef;
struct StateChat: public LogChannel { static const char* name(); static const int verbosity = 4; }; struct StateChat: public LogChannel { static const char* name(); static const int verbosity = 4; };

138
libethereum/TransactionQueue.cpp

@ -31,7 +31,25 @@ using namespace dev::eth;
const char* TransactionQueueChannel::name() { return EthCyan "┉┅▶"; } const char* TransactionQueueChannel::name() { return EthCyan "┉┅▶"; }
const char* TransactionQueueTraceChannel::name() { return EthCyan " ┅▶"; } const char* TransactionQueueTraceChannel::name() { return EthCyan " ┅▶"; }
ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallback const& _cb, IfDropped _ik) TransactionQueue::TransactionQueue(unsigned _limit, unsigned _futureLimit):
m_current(PriorityCompare { *this }),
m_limit(_limit),
m_futureLimit(_futureLimit),
m_verifier([=](){
setThreadName("tr verified");
this->verifierBody();
})
{
}
TransactionQueue::~TransactionQueue()
{
m_aborting = true;
m_queueReady.notify_all();
m_verifier.join();
}
ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik)
{ {
// Check if we already know this transaction. // Check if we already know this transaction.
h256 h = sha3(_transactionRLP); h256 h = sha3(_transactionRLP);
@ -49,14 +67,13 @@ ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallb
{ {
t = Transaction(_transactionRLP, CheckTransaction::Everything); t = Transaction(_transactionRLP, CheckTransaction::Everything);
UpgradeGuard ul(l); UpgradeGuard ul(l);
ir = manageImport_WITH_LOCK(h, t, _cb); ir = manageImport_WITH_LOCK(h, t);
} }
catch (...) catch (...)
{ {
return ImportResult::Malformed; return ImportResult::Malformed;
} }
} }
// cdebug << "import-END: Nonce of" << t.sender() << "now" << maxNonce(t.sender());
return ir; return ir;
} }
@ -71,27 +88,24 @@ ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik)
return ImportResult::Success; return ImportResult::Success;
} }
ImportResult TransactionQueue::import(Transaction const& _transaction, ImportCallback const& _cb, IfDropped _ik) ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
{ {
// Check if we already know this transaction. // Check if we already know this transaction.
h256 h = _transaction.sha3(WithSignature); h256 h = _transaction.sha3(WithSignature);
// cdebug << "import-BEGIN: Nonce of sender" << maxNonce(_transaction.sender());
ImportResult ret; ImportResult ret;
{ {
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
// TODO: keep old transactions around and check in State for nonce validity // TODO: keep old transactions around and check in State for nonce validity
auto ir = check_WITH_LOCK(h, _ik); auto ir = check_WITH_LOCK(h, _ik);
if (ir != ImportResult::Success) if (ir != ImportResult::Success)
return ir; return ir;
{ {
UpgradeGuard ul(l); UpgradeGuard ul(l);
ret = manageImport_WITH_LOCK(h, _transaction, _cb); ret = manageImport_WITH_LOCK(h, _transaction);
} }
} }
// cdebug << "import-END: Nonce of" << _transaction.sender() << "now" << maxNonce(_transaction.sender());
return ret; return ret;
} }
@ -111,7 +125,7 @@ h256Hash TransactionQueue::knownTransactions() const
return m_known; return m_known;
} }
ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb) ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction)
{ {
try try
{ {
@ -149,10 +163,8 @@ ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transactio
} }
} }
} }
// If valid, append to blocks. // If valid, append to transactions.
insertCurrent_WITH_LOCK(make_pair(_h, _transaction)); insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
if (_cb)
m_callbacks[_h] = _cb;
clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h; clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h;
while (m_current.size() > m_limit) while (m_current.size() > m_limit)
@ -179,7 +191,6 @@ ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transactio
u256 TransactionQueue::maxNonce(Address const& _a) const u256 TransactionQueue::maxNonce(Address const& _a) const
{ {
// cdebug << "txQ::maxNonce" << _a;
ReadGuard l(m_lock); ReadGuard l(m_lock);
return maxNonce_WITH_LOCK(_a); return maxNonce_WITH_LOCK(_a);
} }
@ -212,29 +223,7 @@ void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> cons
m_currentByHash[_p.first] = handle; m_currentByHash[_p.first] = handle;
// Move following transactions from future to current // Move following transactions from future to current
auto fs = m_future.find(t.from()); makeCurrent_WITH_LOCK(t);
if (fs != m_future.end())
{
u256 nonce = t.nonce() + 1;
auto fb = fs->second.find(nonce);
if (fb != fs->second.end())
{
auto ft = fb;
while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
{
inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
inserted.first->second = handle;
m_currentByHash[(*handle).transaction.sha3()] = handle;
--m_futureSize;
++ft;
++nonce;
}
fs->second.erase(fb, ft);
if (fs->second.empty())
m_future.erase(t.from());
}
}
m_known.insert(_p.first); m_known.insert(_p.first);
} }
@ -296,6 +285,33 @@ void TransactionQueue::setFuture(h256 const& _txHash)
m_currentByAddressAndNonce.erase(from); m_currentByAddressAndNonce.erase(from);
} }
void TransactionQueue::makeCurrent_WITH_LOCK(Transaction const& _t)
{
auto fs = m_future.find(_t.from());
if (fs != m_future.end())
{
u256 nonce = _t.nonce() + 1;
auto fb = fs->second.find(nonce);
if (fb != fs->second.end())
{
auto ft = fb;
while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
{
auto inserted = m_currentByAddressAndNonce[_t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
inserted.first->second = handle;
m_currentByHash[(*handle).transaction.sha3()] = handle;
--m_futureSize;
++ft;
++nonce;
}
fs->second.erase(fb, ft);
if (fs->second.empty())
m_future.erase(_t.from());
}
}
}
void TransactionQueue::drop(h256 const& _txHash) void TransactionQueue::drop(h256 const& _txHash)
{ {
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
@ -305,9 +321,17 @@ void TransactionQueue::drop(h256 const& _txHash)
UpgradeGuard ul(l); UpgradeGuard ul(l);
m_dropped.insert(_txHash); m_dropped.insert(_txHash);
remove_WITH_LOCK(_txHash); remove_WITH_LOCK(_txHash);
}
void TransactionQueue::dropGood(Transaction const& _t)
{
WriteGuard l(m_lock);
makeCurrent_WITH_LOCK(_t);
if (!m_known.count(_t.sha3()))
return;
m_dropped.insert(_t.sha3());
remove_WITH_LOCK(_t.sha3());
} }
void TransactionQueue::clear() void TransactionQueue::clear()
@ -320,3 +344,43 @@ void TransactionQueue::clear()
m_future.clear(); m_future.clear();
m_futureSize = 0; m_futureSize = 0;
} }
void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId)
{
unique_lock<Mutex> l(x_queue);
unsigned itemCount = _data.itemCount();
for (unsigned i = 0; i < itemCount; ++i)
m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
m_queueReady.notify_all();
}
void TransactionQueue::verifierBody()
{
while (!m_aborting)
{
UnverifiedTransaction work;
{
unique_lock<Mutex> l(x_queue);
m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
if (m_aborting)
return;
work = move(m_unverified.front());
m_unverified.pop_front();
}
try
{
Transaction t(work.transaction, CheckTransaction::Cheap);
ImportResult ir = import(t);
m_onImport(ir, t.sha3(), work.nodeId);
}
catch (...)
{
// should not happen as exceptions are handled in import.
cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
}
}
}

47
libethereum/TransactionQueue.h

@ -22,6 +22,8 @@
#pragma once #pragma once
#include <functional> #include <functional>
#include <condition_variable>
#include <thread>
#include <libdevcore/Common.h> #include <libdevcore/Common.h>
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include <libdevcore/Log.h> #include <libdevcore/Log.h>
@ -49,15 +51,14 @@ enum class IfDropped { Ignore, Retry };
class TransactionQueue class TransactionQueue
{ {
public: public:
using ImportCallback = std::function<void(ImportResult)>;
/// @brief TransactionQueue /// @brief TransactionQueue
/// @param _limit Maximum number of pending transactions in the queue /// @param _limit Maximum number of pending transactions in the queue
/// @param _futureLimit Maximum number of future nonce transactions /// @param _futureLimit Maximum number of future nonce transactions
TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024): m_current(PriorityCompare { *this }), m_limit(_limit), m_futureLimit(_futureLimit) {} TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024);
ImportResult import(Transaction const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); ~TransactionQueue();
ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); } void enqueue(RLP const& _data, h512 const& _nodeId);
ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); ImportResult import(bytes const& _tx, IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _ik); }
ImportResult import(Transaction const& _tx, IfDropped _ik = IfDropped::Ignore);
void drop(h256 const& _txHash); void drop(h256 const& _txHash);
@ -66,9 +67,11 @@ public:
h256Hash knownTransactions() const; h256Hash knownTransactions() const;
u256 maxNonce(Address const& _a) const; u256 maxNonce(Address const& _a) const;
void setFuture(h256 const& _t); void setFuture(h256 const& _t);
void dropGood(Transaction const& _t);
void clear(); void clear();
template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); } template <class T> Handler<> onReady(T const& _t) { return m_onReady.add(_t); }
template <class T> Handler<ImportResult, h256 const&, h512 const&> onImport(T const& _t) { return m_onImport.add(_t); }
private: private:
struct VerifiedTransaction struct VerifiedTransaction
@ -77,11 +80,25 @@ private:
VerifiedTransaction(VerifiedTransaction&& _t): transaction(std::move(_t.transaction)) {} VerifiedTransaction(VerifiedTransaction&& _t): transaction(std::move(_t.transaction)) {}
VerifiedTransaction(VerifiedTransaction const&) = delete; VerifiedTransaction(VerifiedTransaction const&) = delete;
VerifiedTransaction operator=(VerifiedTransaction const&) = delete; VerifiedTransaction& operator=(VerifiedTransaction const&) = delete;
Transaction transaction; Transaction transaction;
}; };
struct UnverifiedTransaction
{
UnverifiedTransaction() {}
UnverifiedTransaction(bytesConstRef const& _t, h512 const& _nodeId): transaction(std::move(_t.toBytes())), nodeId(_nodeId) {}
UnverifiedTransaction(UnverifiedTransaction&& _t): transaction(std::move(_t.transaction)) {}
UnverifiedTransaction& operator=(UnverifiedTransaction&& _other) { transaction = std::move(_other.transaction); nodeId = std::move(_other.nodeId); return *this; }
UnverifiedTransaction(UnverifiedTransaction const&) = delete;
UnverifiedTransaction& operator=(UnverifiedTransaction const&) = delete;
bytes transaction;
h512 nodeId;
};
struct PriorityCompare struct PriorityCompare
{ {
TransactionQueue& queue; TransactionQueue& queue;
@ -96,12 +113,15 @@ private:
// Use a set with dynamic comparator for minmax priority queue. The comparator takes into account min account nonce. Updating it does not affect the order. // Use a set with dynamic comparator for minmax priority queue. The comparator takes into account min account nonce. Updating it does not affect the order.
using PriorityQueue = std::multiset<VerifiedTransaction, PriorityCompare>; using PriorityQueue = std::multiset<VerifiedTransaction, PriorityCompare>;
ImportResult import(bytesConstRef _tx, IfDropped _ik = IfDropped::Ignore);
ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik); ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik);
ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb); ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction);
void insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p); void insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p);
void makeCurrent_WITH_LOCK(Transaction const& _t);
bool remove_WITH_LOCK(h256 const& _txHash); bool remove_WITH_LOCK(h256 const& _txHash);
u256 maxNonce_WITH_LOCK(Address const& _a) const; u256 maxNonce_WITH_LOCK(Address const& _a) const;
void verifierBody();
mutable SharedMutex m_lock; ///< General lock. mutable SharedMutex m_lock; ///< General lock.
h256Hash m_known; ///< Hashes of transactions in both sets. h256Hash m_known; ///< Hashes of transactions in both sets.
@ -114,10 +134,17 @@ private:
std::unordered_map<Address, std::map<u256, PriorityQueue::iterator>> m_currentByAddressAndNonce; ///< Transactions grouped by account and nonce std::unordered_map<Address, std::map<u256, PriorityQueue::iterator>> m_currentByAddressAndNonce; ///< Transactions grouped by account and nonce
std::unordered_map<Address, std::map<u256, VerifiedTransaction>> m_future; /// Future transactions std::unordered_map<Address, std::map<u256, VerifiedTransaction>> m_future; /// Future transactions
Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. Signal<> m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast.
Signal<ImportResult, h256 const&, h512 const&> m_onImport; ///< Called for each import attempt. Arguments are result, transaction id an node id. Be nice and exit fast.
unsigned m_limit; ///< Max number of pending transactions unsigned m_limit; ///< Max number of pending transactions
unsigned m_futureLimit; ///< Max number of future transactions unsigned m_futureLimit; ///< Max number of future transactions
unsigned m_futureSize = 0; ///< Current number of future transactions unsigned m_futureSize = 0; ///< Current number of future transactions
std::condition_variable m_queueReady; ///< Signaled when m_unverified has a new entry.
std::thread m_verifier; ///< Verification thread
std::deque<UnverifiedTransaction> m_unverified; ///< Pending verification queue
mutable Mutex x_queue; ///< Verification queue mutex
bool m_aborting = false; ///< Exit condition for verifier.
}; };
} }

5
libp2p/Host.h

@ -202,6 +202,9 @@ public:
/// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error. /// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error.
void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameCoder* _io, std::shared_ptr<RLPXSocket> const& _s); void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameCoder* _io, std::shared_ptr<RLPXSocket> const& _s);
/// Get session by id
std::shared_ptr<Session> peerSession(NodeId const& _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? m_sessions[_id].lock() : std::shared_ptr<Session>(); }
protected: protected:
void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e); void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e);
@ -211,7 +214,7 @@ protected:
private: private:
enum PeerSlotRatio { Egress = 2, Ingress = 9 }; enum PeerSlotRatio { Egress = 2, Ingress = 9 };
bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? !!m_sessions[_id].lock() : false; } bool havePeerSession(NodeId const& _id) { return !!peerSession(_id); }
/// Determines and sets m_tcpPublic to publicly advertised address. /// Determines and sets m_tcpPublic to publicly advertised address.
void determinePublic(); void determinePublic();

1
test/libethereum/blockchain.cpp

@ -24,6 +24,7 @@
#include <libdevcore/FileSystem.h> #include <libdevcore/FileSystem.h>
#include <libdevcore/TransientDirectory.h> #include <libdevcore/TransientDirectory.h>
#include <libethereum/CanonBlockChain.h> #include <libethereum/CanonBlockChain.h>
#include <libethereum/TransactionQueue.h>
#include <test/TestHelper.h> #include <test/TestHelper.h>
using namespace std; using namespace std;

Loading…
Cancel
Save