Browse Source

Optimisations and fixes for the BlockQueue.

cl-refactor
Gav Wood 10 years ago
parent
commit
be6dd3b62b
  1. 2
      libethereum/BlockChain.cpp
  2. 51
      libethereum/BlockQueue.cpp
  3. 18
      libethereum/BlockQueue.h
  4. 5
      libethereum/Client.cpp
  5. 3
      libethereum/Transaction.h
  6. 46
      libethereum/TransactionQueue.cpp
  7. 4
      libethereum/TransactionQueue.h

2
libethereum/BlockChain.cpp

@ -303,7 +303,7 @@ LastHashes BlockChain::lastHashes(unsigned _n) const
tuple<h256s, h256s, bool> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max) tuple<h256s, h256s, bool> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max)
{ {
_bq.tick(*this); // _bq.tick(*this);
vector<bytes> blocks; vector<bytes> blocks;
_bq.drain(blocks, _max); _bq.drain(blocks, _max);

51
libethereum/BlockQueue.cpp

@ -74,17 +74,19 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
} }
UpgradeGuard ul(l); UpgradeGuard ul(l);
invariants_WITH_LOCK();
// Check it's not in the future // Check it's not in the future
(void)_isOurs; (void)_isOurs;
if (bi.timestamp > (u256)time(0)/* && !_isOurs*/) if (bi.timestamp > (u256)time(0)/* && !_isOurs*/)
{ {
m_future.insert(make_pair((unsigned)bi.timestamp, _block.toBytes())); m_future.insert(make_pair((unsigned)bi.timestamp, make_pair(h, _block.toBytes())));
char buf[24]; char buf[24];
time_t bit = (unsigned)bi.timestamp; time_t bit = (unsigned)bi.timestamp;
if (strftime(buf, 24, "%X", localtime(&bit)) == 0) if (strftime(buf, 24, "%X", localtime(&bit)) == 0)
buf[0] = '\0'; // empty if case strftime fails buf[0] = '\0'; // empty if case strftime fails
cblockq << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf; cblockq << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf;
invariants_WITH_LOCK();
return ImportResult::FutureTime; return ImportResult::FutureTime;
} }
else else
@ -94,6 +96,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
{ {
m_knownBad.insert(bi.hash()); m_knownBad.insert(bi.hash());
// bad parent; this is bad too, note it as such // bad parent; this is bad too, note it as such
invariants_WITH_LOCK();
return ImportResult::BadChain; return ImportResult::BadChain;
} }
else if (!m_readySet.count(bi.parentHash) && !m_drainingSet.count(bi.parentHash) && !_bc.isKnown(bi.parentHash)) else if (!m_readySet.count(bi.parentHash) && !m_drainingSet.count(bi.parentHash) && !_bc.isKnown(bi.parentHash))
@ -103,16 +106,18 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
m_unknown.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes()))); m_unknown.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes())));
m_unknownSet.insert(h); m_unknownSet.insert(h);
invariants_WITH_LOCK();
return ImportResult::UnknownParent; return ImportResult::UnknownParent;
} }
else else
{ {
// If valid, append to blocks. // If valid, append to blocks.
cblockq << "OK - ready for chain insertion."; cblockq << "OK - ready for chain insertion.";
m_ready.push_back(_block.toBytes()); m_ready.push_back(make_pair(h, _block.toBytes()));
m_readySet.insert(h); m_readySet.insert(h);
invariants_WITH_LOCK();
noteReadyWithoutWriteGuard(h); noteReady_WITH_LOCK(h);
m_onReady(); m_onReady();
return ImportResult::Success; return ImportResult::Success;
} }
@ -122,27 +127,33 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
bool BlockQueue::doneDrain(h256s const& _bad) bool BlockQueue::doneDrain(h256s const& _bad)
{ {
WriteGuard l(m_lock); WriteGuard l(m_lock);
invariants_WITH_LOCK();
m_drainingSet.clear(); m_drainingSet.clear();
if (_bad.size()) if (_bad.size())
{ {
vector<bytes> old; vector<pair<h256, bytes>> old;
swap(m_ready, old); swap(m_ready, old);
for (auto& b: old) for (auto& b: old)
{ {
BlockInfo bi(b); BlockInfo bi(b.second);
if (m_knownBad.count(bi.parentHash)) if (m_knownBad.count(bi.parentHash))
m_knownBad.insert(bi.hash()); {
m_knownBad.insert(b.first);
m_readySet.erase(b.first);
}
else else
m_ready.push_back(std::move(b)); m_ready.push_back(std::move(b));
} }
} }
m_knownBad += _bad; m_knownBad += _bad;
// GAA!!!! NEVER EMPTY?!?!?! TODO: remove items from readySet!
invariants_WITH_LOCK();
return !m_readySet.empty(); return !m_readySet.empty();
} }
void BlockQueue::tick(BlockChain const& _bc) void BlockQueue::tick(BlockChain const& _bc)
{ {
vector<bytes> todo; vector<pair<h256, bytes>> todo;
{ {
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
if (m_future.empty()) if (m_future.empty())
@ -158,16 +169,18 @@ void BlockQueue::tick(BlockChain const& _bc)
{ {
UpgradeGuard l2(l); UpgradeGuard l2(l);
invariants_WITH_LOCK();
auto end = m_future.lower_bound(t); auto end = m_future.lower_bound(t);
for (auto i = m_future.begin(); i != end; ++i) for (auto i = m_future.begin(); i != end; ++i)
todo.push_back(move(i->second)); todo.push_back(move(i->second));
m_future.erase(m_future.begin(), end); m_future.erase(m_future.begin(), end);
invariants_WITH_LOCK();
} }
} }
cblockq << "Importing" << todo.size() << "past-future blocks."; cblockq << "Importing" << todo.size() << "past-future blocks.";
for (auto const& b: todo) for (auto const& b: todo)
import(&b, _bc); import(&b.second, _bc);
} }
template <class T> T advanced(T _t, unsigned _n) template <class T> T advanced(T _t, unsigned _n)
@ -194,25 +207,34 @@ QueueStatus BlockQueue::blockStatus(h256 const& _h) const
void BlockQueue::drain(std::vector<bytes>& o_out, unsigned _max) void BlockQueue::drain(std::vector<bytes>& o_out, unsigned _max)
{ {
WriteGuard l(m_lock); WriteGuard l(m_lock);
invariants_WITH_LOCK();
if (m_drainingSet.empty()) if (m_drainingSet.empty())
{ {
o_out.resize(min<unsigned>(_max, m_ready.size())); o_out.resize(min<unsigned>(_max, m_ready.size()));
for (unsigned i = 0; i < o_out.size(); ++i) for (unsigned i = 0; i < o_out.size(); ++i)
swap(o_out[i], m_ready[i]); swap(o_out[i], m_ready[i].second);
m_ready.erase(m_ready.begin(), advanced(m_ready.begin(), o_out.size())); m_ready.erase(m_ready.begin(), advanced(m_ready.begin(), o_out.size()));
for (auto const& bs: o_out) for (auto const& bs: o_out)
{ {
auto h = sha3(bs); // TODO: @optimise use map<h256, bytes> rather than vector<bytes> & set<h256>.
auto h = BlockInfo::headerHash(bs);
m_drainingSet.insert(h); m_drainingSet.insert(h);
m_readySet.erase(h); m_readySet.erase(h);
} }
// swap(o_out, m_ready); // swap(o_out, m_ready);
// swap(m_drainingSet, m_readySet); // swap(m_drainingSet, m_readySet);
} }
invariants_WITH_LOCK();
} }
void BlockQueue::noteReadyWithoutWriteGuard(h256 _good) void BlockQueue::invariants_WITH_LOCK() const
{ {
assert(m_readySet.size() == m_ready.size());
}
void BlockQueue::noteReady_WITH_LOCK(h256 const& _good)
{
invariants_WITH_LOCK();
list<h256> goodQueue(1, _good); list<h256> goodQueue(1, _good);
while (!goodQueue.empty()) while (!goodQueue.empty())
{ {
@ -220,7 +242,7 @@ void BlockQueue::noteReadyWithoutWriteGuard(h256 _good)
goodQueue.pop_front(); goodQueue.pop_front();
for (auto it = r.first; it != r.second; ++it) for (auto it = r.first; it != r.second; ++it)
{ {
m_ready.push_back(it->second.second); m_ready.push_back(it->second);
auto newReady = it->second.first; auto newReady = it->second.first;
m_unknownSet.erase(newReady); m_unknownSet.erase(newReady);
m_readySet.insert(newReady); m_readySet.insert(newReady);
@ -228,16 +250,19 @@ void BlockQueue::noteReadyWithoutWriteGuard(h256 _good)
} }
m_unknown.erase(r.first, r.second); m_unknown.erase(r.first, r.second);
} }
invariants_WITH_LOCK();
} }
void BlockQueue::retryAllUnknown() void BlockQueue::retryAllUnknown()
{ {
invariants_WITH_LOCK();
for (auto it = m_unknown.begin(); it != m_unknown.end(); ++it) for (auto it = m_unknown.begin(); it != m_unknown.end(); ++it)
{ {
m_ready.push_back(it->second.second); m_ready.push_back(it->second);
auto newReady = it->second.first; auto newReady = it->second.first;
m_unknownSet.erase(newReady); m_unknownSet.erase(newReady);
m_readySet.insert(newReady); m_readySet.insert(newReady);
} }
m_unknown.clear(); m_unknown.clear();
invariants_WITH_LOCK();
} }

18
libethereum/BlockQueue.h

@ -78,7 +78,7 @@ public:
bool doneDrain(h256s const& _knownBad = h256s()); bool doneDrain(h256s const& _knownBad = h256s());
/// Notify the queue that the chain has changed and a new block has attained 'ready' status (i.e. is in the chain). /// Notify the queue that the chain has changed and a new block has attained 'ready' status (i.e. is in the chain).
void noteReady(h256 _b) { WriteGuard l(m_lock); noteReadyWithoutWriteGuard(_b); } void noteReady(h256 const& _b) { WriteGuard l(m_lock); noteReady_WITH_LOCK(_b); }
/// Force a retry of all the blocks with unknown parents. /// Force a retry of all the blocks with unknown parents.
void retryAllUnknown(); void retryAllUnknown();
@ -87,7 +87,7 @@ public:
std::pair<unsigned, unsigned> items() const { ReadGuard l(m_lock); return std::make_pair(m_ready.size(), m_unknown.size()); } std::pair<unsigned, unsigned> items() const { ReadGuard l(m_lock); return std::make_pair(m_ready.size(), m_unknown.size()); }
/// Clear everything. /// Clear everything.
void clear() { WriteGuard l(m_lock); m_readySet.clear(); m_drainingSet.clear(); m_ready.clear(); m_unknownSet.clear(); m_unknown.clear(); m_future.clear(); } void clear() { WriteGuard l(m_lock); invariants_WITH_LOCK(); m_readySet.clear(); m_drainingSet.clear(); m_ready.clear(); m_unknownSet.clear(); m_unknown.clear(); m_future.clear(); invariants_WITH_LOCK(); }
/// Return first block with an unknown parent. /// Return first block with an unknown parent.
h256 firstUnknown() const { ReadGuard l(m_lock); return m_unknownSet.size() ? *m_unknownSet.begin() : h256(); } h256 firstUnknown() const { ReadGuard l(m_lock); return m_unknownSet.size() ? *m_unknownSet.begin() : h256(); }
@ -101,18 +101,18 @@ public:
template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); } template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); }
private: private:
void noteReadyWithoutWriteGuard(h256 _b); void noteReady_WITH_LOCK(h256 const& _b);
void notePresentWithoutWriteGuard(bytesConstRef _block); void invariants_WITH_LOCK() const;
mutable boost::shared_mutex m_lock; ///< General lock. mutable boost::shared_mutex m_lock; ///< General lock.
std::set<h256> m_readySet; ///< All blocks ready for chain-import.
std::set<h256> m_drainingSet; ///< All blocks being imported. std::set<h256> m_drainingSet; ///< All blocks being imported.
std::vector<bytes> m_ready; ///< List of blocks, in correct order, ready for chain-import. std::set<h256> m_readySet; ///< All blocks ready for chain-import.
std::vector<std::pair<h256, bytes>> m_ready; ///< List of blocks, in correct order, ready for chain-import.
std::set<h256> m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain. std::set<h256> m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain.
std::multimap<h256, std::pair<h256, bytes>> m_unknown; ///< For transactions that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears. std::multimap<h256, std::pair<h256, bytes>> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears.
std::multimap<unsigned, bytes> m_future; ///< Set of blocks that are not yet valid.
std::set<h256> m_knownBad; ///< Set of blocks that we know will never be valid. std::set<h256> m_knownBad; ///< Set of blocks that we know will never be valid.
Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. std::multimap<unsigned, std::pair<h256, bytes>> m_future;///< Set of blocks that are not yet valid.
Signal m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast.
}; };
} }

5
libethereum/Client.cpp

@ -582,11 +582,12 @@ void Client::onChainChanged(ImportRoute const& _ir)
m_preMine = newPreMine; m_preMine = newPreMine;
DEV_TIMED(working) ETH_WRITE_GUARDED(x_working) DEV_TIMED(working) ETH_WRITE_GUARDED(x_working)
m_working = newPreMine; m_working = newPreMine;
// Transactions ts = m_postMine.pending();
ETH_READ_GUARDED(x_postMine) ETH_READ_GUARDED(x_postMine)
for (auto const& t: m_postMine.pending()) for (auto const& t: m_postMine.pending())
{ {
clog(ClientNote) << "Resubmitting post-mine transaction " << t; clog(ClientNote) << "Resubmitting post-mine transaction " << t;
m_tq.import(t.rlp(), TransactionQueue::ImportCallback(), IfDropped::Retry); m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry);
} }
ETH_READ_GUARDED(x_working) DEV_TIMED(post) ETH_WRITE_GUARDED(x_postMine) ETH_READ_GUARDED(x_working) DEV_TIMED(post) ETH_WRITE_GUARDED(x_postMine)
m_postMine = m_working; m_postMine = m_working;
@ -662,7 +663,7 @@ void Client::doWork()
bool t = true; bool t = true;
if (m_syncBlockQueue.compare_exchange_strong(t, false)) if (m_syncBlockQueue.compare_exchange_strong(t, false))
syncBlockQueue(); syncBlockQueue(); // GAAA!!!!! CALLED TOO OFTEN!!!
t = true; t = true;
if (m_syncTransactionQueue.compare_exchange_strong(t, false) && !m_remoteWorking) if (m_syncTransactionQueue.compare_exchange_strong(t, false) && !m_remoteWorking)

3
libethereum/Transaction.h

@ -149,7 +149,7 @@ public:
bytes rlp(IncludeSignature _sig = WithSignature) const { RLPStream s; streamRLP(s, _sig); return s.out(); } bytes rlp(IncludeSignature _sig = WithSignature) const { RLPStream s; streamRLP(s, _sig); return s.out(); }
/// @returns the SHA3 hash of the RLP serialisation of this transaction. /// @returns the SHA3 hash of the RLP serialisation of this transaction.
h256 sha3(IncludeSignature _sig = WithSignature) const { RLPStream s; streamRLP(s, _sig); return dev::sha3(s.out()); } h256 sha3(IncludeSignature _sig = WithSignature) const { if (_sig == WithSignature && m_hashWith) return m_hashWith; RLPStream s; streamRLP(s, _sig); auto ret = dev::sha3(s.out()); if (_sig == WithSignature) m_hashWith = ret; return ret; }
/// @returns the amount of ETH to be transferred by this (message-call) transaction, in Wei. Synonym for endowment(). /// @returns the amount of ETH to be transferred by this (message-call) transaction, in Wei. Synonym for endowment().
u256 value() const { return m_value; } u256 value() const { return m_value; }
@ -211,6 +211,7 @@ private:
bytes m_data; ///< The data associated with the transaction, or the initialiser if it's a creation transaction. bytes m_data; ///< The data associated with the transaction, or the initialiser if it's a creation transaction.
SignatureStruct m_vrs; ///< The signature of the transaction. Encodes the sender. SignatureStruct m_vrs; ///< The signature of the transaction. Encodes the sender.
mutable h256 m_hashWith; ///< Cached hash of transaction with signature.
mutable Address m_sender; ///< Cached sender, determined from signature. mutable Address m_sender; ///< Cached sender, determined from signature.
mutable bigint m_gasRequired = 0; ///< Memoised amount required for the transaction to run. mutable bigint m_gasRequired = 0; ///< Memoised amount required for the transaction to run.
}; };

46
libethereum/TransactionQueue.cpp

@ -38,26 +38,56 @@ ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallb
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
// TODO: keep old transactions around and check in State for nonce validity // TODO: keep old transactions around and check in State for nonce validity
if (m_known.count(h)) auto ir = check_WITH_LOCK(h, _ik);
if (ir != ImportResult::Success)
return ir;
Transaction t(_transactionRLP, CheckTransaction::Everything);
UpgradeGuard ul(l);
return manageImport_WITH_LOCK(h, t, _cb);
}
ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik)
{
if (m_known.count(_h))
return ImportResult::AlreadyKnown; return ImportResult::AlreadyKnown;
if (m_dropped.count(h) && _ik == IfDropped::Ignore) if (m_dropped.count(_h) && _ik == IfDropped::Ignore)
return ImportResult::AlreadyInChain; return ImportResult::AlreadyInChain;
return ImportResult::Success;
}
ImportResult TransactionQueue::import(Transaction const& _transaction, ImportCallback const& _cb, IfDropped _ik)
{
// Check if we already know this transaction.
h256 h = _transaction.sha3(WithSignature);
UpgradableGuard l(m_lock);
// TODO: keep old transactions around and check in State for nonce validity
auto ir = check_WITH_LOCK(h, _ik);
if (ir != ImportResult::Success)
return ir;
UpgradeGuard ul(l);
return manageImport_WITH_LOCK(h, _transaction, _cb);
}
ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb)
{
try try
{ {
// Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender. // Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender.
// If it doesn't work, the signature is bad. // If it doesn't work, the signature is bad.
// The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction). // The transaction's nonce may yet be invalid (or, it could be "valid" but we may be missing a marginally older transaction).
Transaction t(_transactionRLP, CheckTransaction::Everything);
UpgradeGuard ul(l);
// If valid, append to blocks. // If valid, append to blocks.
insertCurrent_WITH_LOCK(make_pair(h, t)); insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
m_known.insert(h); m_known.insert(_h);
if (_cb) if (_cb)
m_callbacks[h] = _cb; m_callbacks[_h] = _cb;
ctxq << "Queued vaguely legit-looking transaction" << h; ctxq << "Queued vaguely legit-looking transaction" << _h;
m_onReady(); m_onReady();
} }
catch (Exception const& _e) catch (Exception const& _e)

4
libethereum/TransactionQueue.h

@ -49,6 +49,7 @@ class TransactionQueue
public: public:
using ImportCallback = std::function<void(ImportResult)>; using ImportCallback = std::function<void(ImportResult)>;
ImportResult import(Transaction const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore);
ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); } ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); }
ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore); ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore);
@ -65,6 +66,9 @@ public:
template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); } template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); }
private: private:
ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik);
ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction, ImportCallback const& _cb);
void insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p); void insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p);
bool removeCurrent_WITH_LOCK(h256 const& _txHash); bool removeCurrent_WITH_LOCK(h256 const& _txHash);

Loading…
Cancel
Save