From 440f6e7dc2d867098ff3eda5675e3621eac0a68e Mon Sep 17 00:00:00 2001 From: arkpar Date: Sun, 5 Jul 2015 22:11:40 +0200 Subject: [PATCH] import transactions in a separate thread --- libethcore/Common.h | 13 +-- libethereum/BlockChain.cpp | 8 +- libethereum/BlockChainSync.h | 2 +- libethereum/BlockQueue.h | 8 +- libethereum/Client.cpp | 15 ++-- libethereum/Client.h | 4 +- libethereum/EthereumHost.cpp | 79 +++++++++++------- libethereum/EthereumHost.h | 2 + libethereum/State.cpp | 1 + libethereum/State.h | 2 +- libethereum/TransactionQueue.cpp | 138 ++++++++++++++++++++++--------- libethereum/TransactionQueue.h | 47 ++++++++--- libp2p/Host.h | 7 +- test/libethereum/blockchain.cpp | 1 + 14 files changed, 225 insertions(+), 102 deletions(-) diff --git a/libethcore/Common.h b/libethcore/Common.h index 732d09981..ba8c554fb 100644 --- a/libethcore/Common.h +++ b/libethcore/Common.h @@ -97,10 +97,13 @@ enum class RelativeBlock: BlockNumber Pending = PendingBlock }; +class Transaction; + struct ImportRoute { h256s deadBlocks; h256s liveBlocks; + std::vector goodTranactions; }; enum class ImportResult @@ -129,10 +132,10 @@ struct ImportRequirements }; /// Super-duper signal mechanism. TODO: replace with somthing a bit heavier weight. -class Signal +template class Signal { public: - using Callback = std::function; + using Callback = std::function; class HandlerAux { @@ -141,7 +144,7 @@ public: public: ~HandlerAux() { if (m_s) m_s->m_fire.erase(m_i); m_s = nullptr; } void reset() { m_s = nullptr; } - void fire() { m_h(); } + void fire(Args&&... _args) { m_h(std::forward(_args)...); } private: HandlerAux(unsigned _i, Signal* _s, Callback const& _h): m_i(_i), m_s(_s), m_h(_h) {} @@ -165,13 +168,13 @@ public: return h; } - void operator()() { for (auto const& f: m_fire) f.second->fire(); } + void operator()(Args&... _args) { for (auto const& f: m_fire) f.second->fire(std::forward(_args)...); } private: std::map> m_fire; }; -using Handler = std::shared_ptr; +template using Handler = std::shared_ptr::HandlerAux>; struct TransactionSkeleton { diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index 4e0266b5b..d8dbd266f 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -337,6 +337,7 @@ tuple BlockChain::sync(BlockQueue& _bq, OverlayDB c h256s fresh; h256s dead; h256s badBlocks; + Transactions goodTransactions; unsigned count = 0; for (VerifiedBlock const& block: blocks) if (!badBlocks.empty()) @@ -351,6 +352,7 @@ tuple BlockChain::sync(BlockQueue& _bq, OverlayDB c r = import(block.verified, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles); fresh += r.liveBlocks; dead += r.deadBlocks; + goodTransactions += r.goodTranactions; ++count; } catch (dev::eth::UnknownParent) @@ -377,7 +379,7 @@ tuple BlockChain::sync(BlockQueue& _bq, OverlayDB c badBlocks.push_back(block.verified.info.hash()); } } - return make_tuple(ImportRoute{dead, fresh}, _bq.doneDrain(badBlocks), count); + return make_tuple(ImportRoute{dead, fresh, goodTransactions}, _bq.doneDrain(badBlocks), count); } pair BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB, ImportRequirements::value _ir) noexcept @@ -497,6 +499,7 @@ ImportRoute BlockChain::import(VerifiedBlockRef const& _block, OverlayDB const& BlockReceipts br; u256 td; + Transactions goodTransactions; #if ETH_CATCH try #endif @@ -510,6 +513,7 @@ ImportRoute BlockChain::import(VerifiedBlockRef const& _block, OverlayDB const& { blb.blooms.push_back(s.receipt(i).bloom()); br.receipts.push_back(s.receipt(i)); + goodTransactions.push_back(s.pending()[i]); } s.cleanup(true); @@ -750,7 +754,7 @@ ImportRoute BlockChain::import(VerifiedBlockRef const& _block, OverlayDB const& dead.push_back(h); else fresh.push_back(h); - return ImportRoute{dead, fresh}; + return ImportRoute{dead, fresh, move(goodTransactions)}; } void BlockChain::clearBlockBlooms(unsigned _begin, unsigned _end) diff --git a/libethereum/BlockChainSync.h b/libethereum/BlockChainSync.h index 5501075fd..6f1c4306d 100644 --- a/libethereum/BlockChainSync.h +++ b/libethereum/BlockChainSync.h @@ -114,7 +114,7 @@ protected: void requestBlocks(std::shared_ptr _peer); protected: - Handler m_bqRoomAvailable; ///< Triggered once block queue + Handler<> m_bqRoomAvailable; ///< Triggered once block queue mutable RecursiveMutex x_sync; SyncState m_state = SyncState::Idle; ///< Current sync state unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only. diff --git a/libethereum/BlockQueue.h b/libethereum/BlockQueue.h index fc9d62300..b9e6f5b3b 100644 --- a/libethereum/BlockQueue.h +++ b/libethereum/BlockQueue.h @@ -111,8 +111,8 @@ public: /// Get some infomration on the given block's status regarding us. QueueStatus blockStatus(h256 const& _h) const; - template Handler onReady(T const& _t) { return m_onReady.add(_t); } - template Handler onRoomAvailable(T const& _t) { return m_onRoomAvailable.add(_t); } + template Handler<> onReady(T const& _t) { return m_onReady.add(_t); } + template Handler<> onRoomAvailable(T const& _t) { return m_onRoomAvailable.add(_t); } template void setOnBad(T const& _t) { m_onBad = _t; } @@ -145,8 +145,8 @@ private: std::unordered_multimap> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears. h256Hash m_knownBad; ///< Set of blocks that we know will never be valid. std::multimap> m_future; ///< Set of blocks that are not yet valid. Ordered by timestamp - Signal m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast. - Signal m_onRoomAvailable; ///< Called when space for new blocks becomes availabe after a drain. Be nice and exit fast. + Signal<> m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast. + Signal<> m_onRoomAvailable; ///< Called when space for new blocks becomes availabe after a drain. Be nice and exit fast. mutable Mutex m_verification; ///< Mutex that allows writing to m_verified, m_verifying and m_unverified. std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry. diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index e812fb4e9..fba8d51fa 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -604,19 +604,18 @@ void Client::onChainChanged(ImportRoute const& _ir) for (auto const& t: m_bc.transactions(h)) { clog(ClientTrace) << "Resubmitting dead-block transaction " << Transaction(t, CheckTransaction::None); - m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry); + m_tq.import(t, IfDropped::Retry); } } // remove transactions from m_tq nicely rather than relying on out of date nonce later on. for (auto const& h: _ir.liveBlocks) - { clog(ClientTrace) << "Live block:" << h; - for (auto const& th: m_bc.transactionHashes(h)) - { - clog(ClientTrace) << "Safely dropping transaction " << th; - m_tq.drop(th); - } + + for (auto const& t: _ir.goodTranactions) + { + clog(ClientTrace) << "Safely dropping transaction " << t.sha3(); + m_tq.dropGood(t); } if (auto h = m_host.lock()) @@ -651,7 +650,7 @@ void Client::onChainChanged(ImportRoute const& _ir) for (auto const& t: m_postMine.pending()) { clog(ClientTrace) << "Resubmitting post-mine transaction " << t; - auto ir = m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry); + auto ir = m_tq.import(t, IfDropped::Retry); if (ir != ImportResult::Success) onTransactionQueueReady(); } diff --git a/libethereum/Client.h b/libethereum/Client.h index 343ca5b60..7f9a65875 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -310,8 +310,8 @@ private: GenericFarm m_farm; ///< Our mining farm. - Handler m_tqReady; - Handler m_bqReady; + Handler<> m_tqReady; + Handler<> m_bqReady; bool m_wouldMine = false; ///< True if we /should/ be mining. bool m_turboMining = false; ///< Don't squander all of our time mining actually just sleeping. diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index ab3b91ba5..186eb6fa8 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -54,6 +54,7 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu m_networkId (_networkId) { m_latestBlockSent = _ch.currentHash(); + m_tq.onImport([this](ImportResult _ir, h256 const& _h, h512 const& _nodeId) { onTransactionImported(_ir, _h, _nodeId); }); } EthereumHost::~EthereumHost() @@ -68,6 +69,7 @@ bool EthereumHost::ensureInitialised() m_latestBlockSent = m_chain.currentHash(); clog(NetNote) << "Initialising: latest=" << m_latestBlockSent; + Guard l(x_transactions); m_transactionsSent = m_tq.knownTransactions(); return true; } @@ -82,6 +84,7 @@ void EthereumHost::reset() m_sync.reset(); m_latestBlockSent = h256(); + Guard tl(x_transactions); m_transactionsSent.clear(); } @@ -116,16 +119,19 @@ void EthereumHost::maintainTransactions() // Send any new transactions. unordered_map, std::vector> peerTransactions; auto ts = m_tq.topTransactions(c_maxSendTransactions); - for (size_t i = 0; i < ts.size(); ++i) { - auto const& t = ts[i]; - bool unsent = !m_transactionsSent.count(t.sha3()); - auto peers = get<1>(randomSelection(0, [&](EthereumPeer* p) { return p->m_requireTransactions || (unsent && !p->m_knownTransactions.count(t.sha3())); })); - for (auto const& p: peers) - peerTransactions[p].push_back(i); + Guard l(x_transactions); + for (size_t i = 0; i < ts.size(); ++i) + { + auto const& t = ts[i]; + bool unsent = !m_transactionsSent.count(t.sha3()); + auto peers = get<1>(randomSelection(0, [&](EthereumPeer* p) { return p->m_requireTransactions || (unsent && !p->m_knownTransactions.count(t.sha3())); })); + for (auto const& p: peers) + peerTransactions[p].push_back(i); + } + for (auto const& t: ts) + m_transactionsSent.insert(t.sha3()); } - for (auto const& t: ts) - m_transactionsSent.insert(t.sha3()); foreachPeer([&](shared_ptr _p) { bytes b; @@ -291,28 +297,7 @@ void EthereumHost::onPeerTransactions(std::shared_ptr _peer, RLP c } unsigned itemCount = _r.itemCount(); clog(NetAllDetail) << "Transactions (" << dec << itemCount << "entries)"; - Guard l(_peer->x_knownTransactions); - for (unsigned i = 0; i < min(itemCount, 32); ++i) // process 256 transactions at most. TODO: much better solution. - { - auto h = sha3(_r[i].data()); - _peer->m_knownTransactions.insert(h); - ImportResult ir = m_tq.import(_r[i].data()); - switch (ir) - { - case ImportResult::Malformed: - _peer->addRating(-100); - break; - case ImportResult::AlreadyKnown: - // if we already had the transaction, then don't bother sending it on. - m_transactionsSent.insert(h); - _peer->addRating(0); - break; - case ImportResult::Success: - _peer->addRating(100); - break; - default:; - } - } + m_tq.enqueue(_r, _peer->session()->id()); } void EthereumHost::onPeerAborting() @@ -344,3 +329,37 @@ SyncStatus EthereumHost::status() const return SyncStatus(); return m_sync->status(); } + +void EthereumHost::onTransactionImported(ImportResult _ir, h256 const& _h, h512 const& _nodeId) +{ + auto session = host()->peerSession(_nodeId); + if (!session) + return; + + std::shared_ptr peer = session->cap(); + if (!peer) + peer = session->cap(c_oldProtocolVersion); + if (!peer) + return; + + Guard l(peer->x_knownTransactions); + peer->m_knownTransactions.insert(_h); + switch (_ir) + { + case ImportResult::Malformed: + peer->addRating(-100); + break; + case ImportResult::AlreadyKnown: + // if we already had the transaction, then don't bother sending it on. + { + Guard l(x_transactions); + m_transactionsSent.insert(_h); + } + peer->addRating(0); + break; + case ImportResult::Success: + peer->addRating(100); + break; + default:; + } +} diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 5c5f16ff7..b65ddb5d0 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -105,6 +105,7 @@ private: void maintainTransactions(); void maintainBlocks(h256 const& _currentBlock); + void onTransactionImported(ImportResult _ir, h256 const& _h, h512 const& _nodeId); /// Check to see if the network peer-state initialisation has happened. bool isInitialised() const { return (bool)m_latestBlockSent; } @@ -132,6 +133,7 @@ private: bool m_newBlocks = false; mutable Mutex x_sync; + mutable Mutex x_transactions; DownloadMan m_man; std::unique_ptr m_sync; }; diff --git a/libethereum/State.cpp b/libethereum/State.cpp index 5362f6f68..d1afd0403 100644 --- a/libethereum/State.cpp +++ b/libethereum/State.cpp @@ -38,6 +38,7 @@ #include "Executive.h" #include "CachedAddressState.h" #include "CanonBlockChain.h" +#include "TransactionQueue.h" using namespace std; using namespace dev; using namespace dev::eth; diff --git a/libethereum/State.h b/libethereum/State.h index 2a63aeda4..ef8a3251a 100644 --- a/libethereum/State.h +++ b/libethereum/State.h @@ -32,7 +32,6 @@ #include #include #include -#include "TransactionQueue.h" #include "Account.h" #include "Transaction.h" #include "TransactionReceipt.h" @@ -67,6 +66,7 @@ using LogBloomRequirementError = boost::tupleverifierBody(); + }) +{ +} + +TransactionQueue::~TransactionQueue() +{ + m_aborting = true; + m_queueReady.notify_all(); + m_verifier.join(); +} + +ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik) { // Check if we already know this transaction. h256 h = sha3(_transactionRLP); @@ -49,14 +67,13 @@ ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallb { t = Transaction(_transactionRLP, CheckTransaction::Everything); UpgradeGuard ul(l); - ir = manageImport_WITH_LOCK(h, t, _cb); + ir = manageImport_WITH_LOCK(h, t); } catch (...) { return ImportResult::Malformed; } } -// cdebug << "import-END: Nonce of" << t.sender() << "now" << maxNonce(t.sender()); return ir; } @@ -71,27 +88,24 @@ ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik) return ImportResult::Success; } -ImportResult TransactionQueue::import(Transaction const& _transaction, ImportCallback const& _cb, IfDropped _ik) +ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik) { // Check if we already know this transaction. h256 h = _transaction.sha3(WithSignature); -// cdebug << "import-BEGIN: Nonce of sender" << maxNonce(_transaction.sender()); ImportResult ret; { UpgradableGuard l(m_lock); // TODO: keep old transactions around and check in State for nonce validity - auto ir = check_WITH_LOCK(h, _ik); if (ir != ImportResult::Success) return ir; { UpgradeGuard ul(l); - ret = manageImport_WITH_LOCK(h, _transaction, _cb); + ret = manageImport_WITH_LOCK(h, _transaction); } } -// cdebug << "import-END: Nonce of" << _transaction.sender() << "now" << maxNonce(_transaction.sender()); return ret; } @@ -111,7 +125,7 @@ h256Hash TransactionQueue::knownTransactions() const return m_known; } -ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb) +ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction) { try { @@ -149,10 +163,8 @@ ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transactio } } } - // If valid, append to blocks. + // If valid, append to transactions. insertCurrent_WITH_LOCK(make_pair(_h, _transaction)); - if (_cb) - m_callbacks[_h] = _cb; clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h; while (m_current.size() > m_limit) @@ -179,7 +191,6 @@ ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transactio u256 TransactionQueue::maxNonce(Address const& _a) const { -// cdebug << "txQ::maxNonce" << _a; ReadGuard l(m_lock); return maxNonce_WITH_LOCK(_a); } @@ -212,29 +223,7 @@ void TransactionQueue::insertCurrent_WITH_LOCK(std::pair cons m_currentByHash[_p.first] = handle; // Move following transactions from future to current - auto fs = m_future.find(t.from()); - if (fs != m_future.end()) - { - u256 nonce = t.nonce() + 1; - auto fb = fs->second.find(nonce); - if (fb != fs->second.end()) - { - auto ft = fb; - while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce) - { - inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator())); - PriorityQueue::iterator handle = m_current.emplace(move(ft->second)); - inserted.first->second = handle; - m_currentByHash[(*handle).transaction.sha3()] = handle; - --m_futureSize; - ++ft; - ++nonce; - } - fs->second.erase(fb, ft); - if (fs->second.empty()) - m_future.erase(t.from()); - } - } + makeCurrent_WITH_LOCK(t); m_known.insert(_p.first); } @@ -296,6 +285,33 @@ void TransactionQueue::setFuture(h256 const& _txHash) m_currentByAddressAndNonce.erase(from); } +void TransactionQueue::makeCurrent_WITH_LOCK(Transaction const& _t) +{ + auto fs = m_future.find(_t.from()); + if (fs != m_future.end()) + { + u256 nonce = _t.nonce() + 1; + auto fb = fs->second.find(nonce); + if (fb != fs->second.end()) + { + auto ft = fb; + while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce) + { + auto inserted = m_currentByAddressAndNonce[_t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator())); + PriorityQueue::iterator handle = m_current.emplace(move(ft->second)); + inserted.first->second = handle; + m_currentByHash[(*handle).transaction.sha3()] = handle; + --m_futureSize; + ++ft; + ++nonce; + } + fs->second.erase(fb, ft); + if (fs->second.empty()) + m_future.erase(_t.from()); + } + } +} + void TransactionQueue::drop(h256 const& _txHash) { UpgradableGuard l(m_lock); @@ -305,9 +321,17 @@ void TransactionQueue::drop(h256 const& _txHash) UpgradeGuard ul(l); m_dropped.insert(_txHash); - remove_WITH_LOCK(_txHash); +} +void TransactionQueue::dropGood(Transaction const& _t) +{ + WriteGuard l(m_lock); + makeCurrent_WITH_LOCK(_t); + if (!m_known.count(_t.sha3())) + return; + m_dropped.insert(_t.sha3()); + remove_WITH_LOCK(_t.sha3()); } void TransactionQueue::clear() @@ -320,3 +344,43 @@ void TransactionQueue::clear() m_future.clear(); m_futureSize = 0; } + +void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId) +{ + unique_lock l(x_queue); + unsigned itemCount = _data.itemCount(); + for (unsigned i = 0; i < itemCount; ++i) + m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId)); + m_queueReady.notify_all(); +} + +void TransactionQueue::verifierBody() +{ + while (!m_aborting) + { + UnverifiedTransaction work; + + { + unique_lock l(x_queue); + m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; }); + if (m_aborting) + return; + work = move(m_unverified.front()); + m_unverified.pop_front(); + } + + try + { + + Transaction t(work.transaction, CheckTransaction::Cheap); + ImportResult ir = import(t); + m_onImport(ir, t.sha3(), work.nodeId); + } + catch (...) + { + // should not happen as exceptions are handled in import. + cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information(); + } + } +} + diff --git a/libethereum/TransactionQueue.h b/libethereum/TransactionQueue.h index d92a73dab..cc6b1ca3e 100644 --- a/libethereum/TransactionQueue.h +++ b/libethereum/TransactionQueue.h @@ -22,6 +22,8 @@ #pragma once #include +#include +#include #include #include #include @@ -49,15 +51,14 @@ enum class IfDropped { Ignore, Retry }; class TransactionQueue { public: - using ImportCallback = std::function; - /// @brief TransactionQueue /// @param _limit Maximum number of pending transactions in the queue /// @param _futureLimit Maximum number of future nonce transactions - TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024): m_current(PriorityCompare { *this }), m_limit(_limit), m_futureLimit(_futureLimit) {} - ImportResult import(Transaction const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); - ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); } - ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); + TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024); + ~TransactionQueue(); + void enqueue(RLP const& _data, h512 const& _nodeId); + ImportResult import(bytes const& _tx, IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _ik); } + ImportResult import(Transaction const& _tx, IfDropped _ik = IfDropped::Ignore); void drop(h256 const& _txHash); @@ -66,9 +67,11 @@ public: h256Hash knownTransactions() const; u256 maxNonce(Address const& _a) const; void setFuture(h256 const& _t); + void dropGood(Transaction const& _t); void clear(); - template Handler onReady(T const& _t) { return m_onReady.add(_t); } + template Handler<> onReady(T const& _t) { return m_onReady.add(_t); } + template Handler onImport(T const& _t) { return m_onImport.add(_t); } private: struct VerifiedTransaction @@ -77,11 +80,25 @@ private: VerifiedTransaction(VerifiedTransaction&& _t): transaction(std::move(_t.transaction)) {} VerifiedTransaction(VerifiedTransaction const&) = delete; - VerifiedTransaction operator=(VerifiedTransaction const&) = delete; + VerifiedTransaction& operator=(VerifiedTransaction const&) = delete; Transaction transaction; }; + struct UnverifiedTransaction + { + UnverifiedTransaction() {} + UnverifiedTransaction(bytesConstRef const& _t, h512 const& _nodeId): transaction(std::move(_t.toBytes())), nodeId(_nodeId) {} + UnverifiedTransaction(UnverifiedTransaction&& _t): transaction(std::move(_t.transaction)) {} + UnverifiedTransaction& operator=(UnverifiedTransaction&& _other) { transaction = std::move(_other.transaction); nodeId = std::move(_other.nodeId); return *this; } + + UnverifiedTransaction(UnverifiedTransaction const&) = delete; + UnverifiedTransaction& operator=(UnverifiedTransaction const&) = delete; + + bytes transaction; + h512 nodeId; + }; + struct PriorityCompare { TransactionQueue& queue; @@ -96,12 +113,15 @@ private: // Use a set with dynamic comparator for minmax priority queue. The comparator takes into account min account nonce. Updating it does not affect the order. using PriorityQueue = std::multiset; + ImportResult import(bytesConstRef _tx, IfDropped _ik = IfDropped::Ignore); ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik); - ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb); + ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction); void insertCurrent_WITH_LOCK(std::pair const& _p); + void makeCurrent_WITH_LOCK(Transaction const& _t); bool remove_WITH_LOCK(h256 const& _txHash); u256 maxNonce_WITH_LOCK(Address const& _a) const; + void verifierBody(); mutable SharedMutex m_lock; ///< General lock. h256Hash m_known; ///< Hashes of transactions in both sets. @@ -114,10 +134,17 @@ private: std::unordered_map> m_currentByAddressAndNonce; ///< Transactions grouped by account and nonce std::unordered_map> m_future; /// Future transactions - Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. + Signal<> m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. + Signal m_onImport; ///< Called for each import attempt. Arguments are result, transaction id an node id. Be nice and exit fast. unsigned m_limit; ///< Max number of pending transactions unsigned m_futureLimit; ///< Max number of future transactions unsigned m_futureSize = 0; ///< Current number of future transactions + + std::condition_variable m_queueReady; ///< Signaled when m_unverified has a new entry. + std::thread m_verifier; ///< Verification thread + std::deque m_unverified; ///< Pending verification queue + mutable Mutex x_queue; ///< Verification queue mutex + bool m_aborting = false; ///< Exit condition for verifier. }; } diff --git a/libp2p/Host.h b/libp2p/Host.h index b7ebb3951..17e98f8f6 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -202,6 +202,9 @@ public: /// Validates and starts peer session, taking ownership of _io. Disconnects and returns false upon error. void startPeerSession(Public const& _id, RLP const& _hello, RLPXFrameCoder* _io, std::shared_ptr const& _s); + /// Get session by id + std::shared_ptr peerSession(NodeId const& _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? m_sessions[_id].lock() : std::shared_ptr(); } + protected: void onNodeTableEvent(NodeId const& _n, NodeTableEventType const& _e); @@ -211,8 +214,8 @@ protected: private: enum PeerSlotRatio { Egress = 2, Ingress = 9 }; - bool havePeerSession(NodeId _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? !!m_sessions[_id].lock() : false; } - + bool havePeerSession(NodeId const& _id) { return !!peerSession(_id); } + /// Determines and sets m_tcpPublic to publicly advertised address. void determinePublic(); diff --git a/test/libethereum/blockchain.cpp b/test/libethereum/blockchain.cpp index 6455eee17..6dea609e2 100644 --- a/test/libethereum/blockchain.cpp +++ b/test/libethereum/blockchain.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include using namespace std;