From be6dd3b62b74d7e4f14c23f835b3cebacdc91389 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 3 May 2015 15:38:42 +0100 Subject: [PATCH] Optimisations and fixes for the BlockQueue. --- libethereum/BlockChain.cpp | 2 +- libethereum/BlockQueue.cpp | 51 ++++++++++++++++++++++++-------- libethereum/BlockQueue.h | 18 +++++------ libethereum/Client.cpp | 5 ++-- libethereum/Transaction.h | 3 +- libethereum/TransactionQueue.cpp | 46 +++++++++++++++++++++++----- libethereum/TransactionQueue.h | 4 +++ 7 files changed, 95 insertions(+), 34 deletions(-) diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index e7728f1d2..69078b400 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -303,7 +303,7 @@ LastHashes BlockChain::lastHashes(unsigned _n) const tuple BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max) { - _bq.tick(*this); +// _bq.tick(*this); vector blocks; _bq.drain(blocks, _max); diff --git a/libethereum/BlockQueue.cpp b/libethereum/BlockQueue.cpp index 44ddda637..0cccf8e6b 100644 --- a/libethereum/BlockQueue.cpp +++ b/libethereum/BlockQueue.cpp @@ -74,17 +74,19 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo } UpgradeGuard ul(l); + invariants_WITH_LOCK(); // Check it's not in the future (void)_isOurs; if (bi.timestamp > (u256)time(0)/* && !_isOurs*/) { - m_future.insert(make_pair((unsigned)bi.timestamp, _block.toBytes())); + m_future.insert(make_pair((unsigned)bi.timestamp, make_pair(h, _block.toBytes()))); char buf[24]; time_t bit = (unsigned)bi.timestamp; if (strftime(buf, 24, "%X", localtime(&bit)) == 0) buf[0] = '\0'; // empty if case strftime fails cblockq << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf; + invariants_WITH_LOCK(); return ImportResult::FutureTime; } else @@ -94,6 +96,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo { m_knownBad.insert(bi.hash()); // bad parent; this is bad too, note it as such + invariants_WITH_LOCK(); return ImportResult::BadChain; } else if (!m_readySet.count(bi.parentHash) && !m_drainingSet.count(bi.parentHash) && !_bc.isKnown(bi.parentHash)) @@ -103,16 +106,18 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo m_unknown.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes()))); m_unknownSet.insert(h); + invariants_WITH_LOCK(); return ImportResult::UnknownParent; } else { // If valid, append to blocks. cblockq << "OK - ready for chain insertion."; - m_ready.push_back(_block.toBytes()); + m_ready.push_back(make_pair(h, _block.toBytes())); m_readySet.insert(h); + invariants_WITH_LOCK(); - noteReadyWithoutWriteGuard(h); + noteReady_WITH_LOCK(h); m_onReady(); return ImportResult::Success; } @@ -122,27 +127,33 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo bool BlockQueue::doneDrain(h256s const& _bad) { WriteGuard l(m_lock); + invariants_WITH_LOCK(); m_drainingSet.clear(); if (_bad.size()) { - vector old; + vector> old; swap(m_ready, old); for (auto& b: old) { - BlockInfo bi(b); + BlockInfo bi(b.second); if (m_knownBad.count(bi.parentHash)) - m_knownBad.insert(bi.hash()); + { + m_knownBad.insert(b.first); + m_readySet.erase(b.first); + } else m_ready.push_back(std::move(b)); } } m_knownBad += _bad; + // GAA!!!! NEVER EMPTY?!?!?! TODO: remove items from readySet! + invariants_WITH_LOCK(); return !m_readySet.empty(); } void BlockQueue::tick(BlockChain const& _bc) { - vector todo; + vector> todo; { UpgradableGuard l(m_lock); if (m_future.empty()) @@ -158,16 +169,18 @@ void BlockQueue::tick(BlockChain const& _bc) { UpgradeGuard l2(l); + invariants_WITH_LOCK(); auto end = m_future.lower_bound(t); for (auto i = m_future.begin(); i != end; ++i) todo.push_back(move(i->second)); m_future.erase(m_future.begin(), end); + invariants_WITH_LOCK(); } } cblockq << "Importing" << todo.size() << "past-future blocks."; for (auto const& b: todo) - import(&b, _bc); + import(&b.second, _bc); } template T advanced(T _t, unsigned _n) @@ -194,25 +207,34 @@ QueueStatus BlockQueue::blockStatus(h256 const& _h) const void BlockQueue::drain(std::vector& o_out, unsigned _max) { WriteGuard l(m_lock); + invariants_WITH_LOCK(); if (m_drainingSet.empty()) { o_out.resize(min(_max, m_ready.size())); for (unsigned i = 0; i < o_out.size(); ++i) - swap(o_out[i], m_ready[i]); + swap(o_out[i], m_ready[i].second); m_ready.erase(m_ready.begin(), advanced(m_ready.begin(), o_out.size())); for (auto const& bs: o_out) { - auto h = sha3(bs); + // TODO: @optimise use map rather than vector & set. + auto h = BlockInfo::headerHash(bs); m_drainingSet.insert(h); m_readySet.erase(h); } // swap(o_out, m_ready); // swap(m_drainingSet, m_readySet); } + invariants_WITH_LOCK(); } -void BlockQueue::noteReadyWithoutWriteGuard(h256 _good) +void BlockQueue::invariants_WITH_LOCK() const { + assert(m_readySet.size() == m_ready.size()); +} + +void BlockQueue::noteReady_WITH_LOCK(h256 const& _good) +{ + invariants_WITH_LOCK(); list goodQueue(1, _good); while (!goodQueue.empty()) { @@ -220,7 +242,7 @@ void BlockQueue::noteReadyWithoutWriteGuard(h256 _good) goodQueue.pop_front(); for (auto it = r.first; it != r.second; ++it) { - m_ready.push_back(it->second.second); + m_ready.push_back(it->second); auto newReady = it->second.first; m_unknownSet.erase(newReady); m_readySet.insert(newReady); @@ -228,16 +250,19 @@ void BlockQueue::noteReadyWithoutWriteGuard(h256 _good) } m_unknown.erase(r.first, r.second); } + invariants_WITH_LOCK(); } void BlockQueue::retryAllUnknown() { + invariants_WITH_LOCK(); for (auto it = m_unknown.begin(); it != m_unknown.end(); ++it) { - m_ready.push_back(it->second.second); + m_ready.push_back(it->second); auto newReady = it->second.first; m_unknownSet.erase(newReady); m_readySet.insert(newReady); } m_unknown.clear(); + invariants_WITH_LOCK(); } diff --git a/libethereum/BlockQueue.h b/libethereum/BlockQueue.h index adcc6ab39..1b1612fb7 100644 --- a/libethereum/BlockQueue.h +++ b/libethereum/BlockQueue.h @@ -78,7 +78,7 @@ public: bool doneDrain(h256s const& _knownBad = h256s()); /// Notify the queue that the chain has changed and a new block has attained 'ready' status (i.e. is in the chain). - void noteReady(h256 _b) { WriteGuard l(m_lock); noteReadyWithoutWriteGuard(_b); } + void noteReady(h256 const& _b) { WriteGuard l(m_lock); noteReady_WITH_LOCK(_b); } /// Force a retry of all the blocks with unknown parents. void retryAllUnknown(); @@ -87,7 +87,7 @@ public: std::pair items() const { ReadGuard l(m_lock); return std::make_pair(m_ready.size(), m_unknown.size()); } /// Clear everything. - void clear() { WriteGuard l(m_lock); m_readySet.clear(); m_drainingSet.clear(); m_ready.clear(); m_unknownSet.clear(); m_unknown.clear(); m_future.clear(); } + void clear() { WriteGuard l(m_lock); invariants_WITH_LOCK(); m_readySet.clear(); m_drainingSet.clear(); m_ready.clear(); m_unknownSet.clear(); m_unknown.clear(); m_future.clear(); invariants_WITH_LOCK(); } /// Return first block with an unknown parent. h256 firstUnknown() const { ReadGuard l(m_lock); return m_unknownSet.size() ? *m_unknownSet.begin() : h256(); } @@ -101,18 +101,18 @@ public: template Handler onReady(T const& _t) { return m_onReady.add(_t); } private: - void noteReadyWithoutWriteGuard(h256 _b); - void notePresentWithoutWriteGuard(bytesConstRef _block); + void noteReady_WITH_LOCK(h256 const& _b); + void invariants_WITH_LOCK() const; mutable boost::shared_mutex m_lock; ///< General lock. - std::set m_readySet; ///< All blocks ready for chain-import. std::set m_drainingSet; ///< All blocks being imported. - std::vector m_ready; ///< List of blocks, in correct order, ready for chain-import. + std::set m_readySet; ///< All blocks ready for chain-import. + std::vector> m_ready; ///< List of blocks, in correct order, ready for chain-import. std::set m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain. - std::multimap> m_unknown; ///< For transactions that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears. - std::multimap m_future; ///< Set of blocks that are not yet valid. + std::multimap> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears. std::set m_knownBad; ///< Set of blocks that we know will never be valid. - Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. + std::multimap> m_future;///< Set of blocks that are not yet valid. + Signal m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast. }; } diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index 0643ea31a..8d049d404 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -582,11 +582,12 @@ void Client::onChainChanged(ImportRoute const& _ir) m_preMine = newPreMine; DEV_TIMED(working) ETH_WRITE_GUARDED(x_working) m_working = newPreMine; +// Transactions ts = m_postMine.pending(); ETH_READ_GUARDED(x_postMine) for (auto const& t: m_postMine.pending()) { clog(ClientNote) << "Resubmitting post-mine transaction " << t; - m_tq.import(t.rlp(), TransactionQueue::ImportCallback(), IfDropped::Retry); + m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry); } ETH_READ_GUARDED(x_working) DEV_TIMED(post) ETH_WRITE_GUARDED(x_postMine) m_postMine = m_working; @@ -662,7 +663,7 @@ void Client::doWork() bool t = true; if (m_syncBlockQueue.compare_exchange_strong(t, false)) - syncBlockQueue(); + syncBlockQueue(); // GAAA!!!!! CALLED TOO OFTEN!!! t = true; if (m_syncTransactionQueue.compare_exchange_strong(t, false) && !m_remoteWorking) diff --git a/libethereum/Transaction.h b/libethereum/Transaction.h index 09102e0ba..4271a5f27 100644 --- a/libethereum/Transaction.h +++ b/libethereum/Transaction.h @@ -149,7 +149,7 @@ public: bytes rlp(IncludeSignature _sig = WithSignature) const { RLPStream s; streamRLP(s, _sig); return s.out(); } /// @returns the SHA3 hash of the RLP serialisation of this transaction. - h256 sha3(IncludeSignature _sig = WithSignature) const { RLPStream s; streamRLP(s, _sig); return dev::sha3(s.out()); } + h256 sha3(IncludeSignature _sig = WithSignature) const { if (_sig == WithSignature && m_hashWith) return m_hashWith; RLPStream s; streamRLP(s, _sig); auto ret = dev::sha3(s.out()); if (_sig == WithSignature) m_hashWith = ret; return ret; } /// @returns the amount of ETH to be transferred by this (message-call) transaction, in Wei. Synonym for endowment(). u256 value() const { return m_value; } @@ -211,6 +211,7 @@ private: bytes m_data; ///< The data associated with the transaction, or the initialiser if it's a creation transaction. SignatureStruct m_vrs; ///< The signature of the transaction. Encodes the sender. + mutable h256 m_hashWith; ///< Cached hash of transaction with signature. mutable Address m_sender; ///< Cached sender, determined from signature. mutable bigint m_gasRequired = 0; ///< Memoised amount required for the transaction to run. }; diff --git a/libethereum/TransactionQueue.cpp b/libethereum/TransactionQueue.cpp index 57429d32c..1bfdf535a 100644 --- a/libethereum/TransactionQueue.cpp +++ b/libethereum/TransactionQueue.cpp @@ -38,26 +38,56 @@ ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallb UpgradableGuard l(m_lock); // TODO: keep old transactions around and check in State for nonce validity - if (m_known.count(h)) + auto ir = check_WITH_LOCK(h, _ik); + if (ir != ImportResult::Success) + return ir; + + Transaction t(_transactionRLP, CheckTransaction::Everything); + UpgradeGuard ul(l); + return manageImport_WITH_LOCK(h, t, _cb); +} + +ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik) +{ + if (m_known.count(_h)) return ImportResult::AlreadyKnown; - if (m_dropped.count(h) && _ik == IfDropped::Ignore) + if (m_dropped.count(_h) && _ik == IfDropped::Ignore) return ImportResult::AlreadyInChain; + return ImportResult::Success; +} + +ImportResult TransactionQueue::import(Transaction const& _transaction, ImportCallback const& _cb, IfDropped _ik) +{ + // Check if we already know this transaction. + h256 h = _transaction.sha3(WithSignature); + + UpgradableGuard l(m_lock); + // TODO: keep old transactions around and check in State for nonce validity + + auto ir = check_WITH_LOCK(h, _ik); + if (ir != ImportResult::Success) + return ir; + + UpgradeGuard ul(l); + return manageImport_WITH_LOCK(h, _transaction, _cb); +} + +ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb) +{ try { // Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender. // If it doesn't work, the signature is bad. // The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction). - Transaction t(_transactionRLP, CheckTransaction::Everything); - UpgradeGuard ul(l); // If valid, append to blocks. - insertCurrent_WITH_LOCK(make_pair(h, t)); - m_known.insert(h); + insertCurrent_WITH_LOCK(make_pair(_h, _transaction)); + m_known.insert(_h); if (_cb) - m_callbacks[h] = _cb; - ctxq << "Queued vaguely legit-looking transaction" << h; + m_callbacks[_h] = _cb; + ctxq << "Queued vaguely legit-looking transaction" << _h; m_onReady(); } catch (Exception const& _e) diff --git a/libethereum/TransactionQueue.h b/libethereum/TransactionQueue.h index 16bc34641..69e1c935f 100644 --- a/libethereum/TransactionQueue.h +++ b/libethereum/TransactionQueue.h @@ -49,6 +49,7 @@ class TransactionQueue public: using ImportCallback = std::function; + ImportResult import(Transaction const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); } ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); @@ -65,6 +66,9 @@ public: template Handler onReady(T const& _t) { return m_onReady.add(_t); } private: + ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik); + ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb); + void insertCurrent_WITH_LOCK(std::pair const& _p); bool removeCurrent_WITH_LOCK(h256 const& _txHash);