Browse Source

Minimise write-locking of DB. Fixes #1676

cl-refactor
Gav Wood 10 years ago
parent
commit
976c990ddb
  1. 90
      libethereum/BlockChain.cpp
  2. 1
      libethereum/BlockDetails.h
  3. 89
      libethereum/Client.cpp
  4. 6
      libethereum/Client.h

90
libethereum/BlockChain.cpp

@ -25,6 +25,7 @@
#include <gperftools/profiler.h>
#endif
#include <leveldb/db.h>
#include <leveldb/write_batch.h>
#include <boost/timer.hpp>
#include <boost/filesystem.hpp>
#include <test/JsonSpiritHeaders.h>
@ -442,6 +443,11 @@ ImportRoute BlockChain::import(bytes const& _block, OverlayDB const& _db, Import
t.restart();
#endif
ldb::WriteBatch blocksBatch;
ldb::WriteBatch extrasBatch;
h256 newLastBlockHash;
unsigned newLastBlockNumber = 0;
u256 td;
#if ETH_CATCH
try
@ -470,6 +476,7 @@ ImportRoute BlockChain::import(bytes const& _block, OverlayDB const& _db, Import
#if ETH_PARANOIA
checkConsistency();
#endif
// All ok - insert into DB
{
// ensure parent is cached for later addition.
@ -478,19 +485,9 @@ ImportRoute BlockChain::import(bytes const& _block, OverlayDB const& _db, Import
// This is safe in practice since the caches don't get flushed nearly often enough to be
// done here.
details(bi.parentHash);
WriteGuard l(x_details);
m_details[bi.hash()] = BlockDetails((unsigned)pd.number + 1, td, bi.parentHash, {});
m_details[bi.parentHash].children.push_back(bi.hash());
}
{
WriteGuard l(x_logBlooms);
m_logBlooms[bi.hash()] = blb;
}
{
WriteGuard l(x_receipts);
m_receipts[bi.hash()] = br;
}
#if ETH_TIMED_IMPORTS
collation = t.elapsed();
@ -498,15 +495,12 @@ ImportRoute BlockChain::import(bytes const& _block, OverlayDB const& _db, Import
#endif
{
ReadGuard l1(x_blocks);
ReadGuard l2(x_details);
ReadGuard l4(x_receipts);
ReadGuard l5(x_logBlooms);
m_extrasDB->Put(m_writeOptions, toSlice(bi.hash(), ExtraDetails), (ldb::Slice)dev::ref(m_details[bi.hash()].rlp()));
m_extrasDB->Put(m_writeOptions, toSlice(bi.parentHash, ExtraDetails), (ldb::Slice)dev::ref(m_details[bi.parentHash].rlp()));
m_extrasDB->Put(m_writeOptions, toSlice(bi.hash(), ExtraLogBlooms), (ldb::Slice)dev::ref(m_logBlooms[bi.hash()].rlp()));
m_extrasDB->Put(m_writeOptions, toSlice(bi.hash(), ExtraReceipts), (ldb::Slice)dev::ref(m_receipts[bi.hash()].rlp()));
m_blocksDB->Put(m_writeOptions, toSlice(bi.hash()), (ldb::Slice)ref(_block));
extrasBatch.Put(toSlice(bi.hash(), ExtraDetails), (ldb::Slice)dev::ref(BlockDetails((unsigned)pd.number + 1, td, bi.parentHash, {}).rlp()));
extrasBatch.Put(toSlice(bi.parentHash, ExtraDetails), (ldb::Slice)dev::ref(m_details[bi.parentHash].rlp()));
extrasBatch.Put(toSlice(bi.hash(), ExtraLogBlooms), (ldb::Slice)dev::ref(blb.rlp()));
extrasBatch.Put(toSlice(bi.hash(), ExtraReceipts), (ldb::Slice)dev::ref(br.rlp()));
blocksBatch.Put(toSlice(bi.hash()), (ldb::Slice)ref(_block));
}
#if ETH_TIMED_IMPORTS
@ -552,8 +546,11 @@ ImportRoute BlockChain::import(bytes const& _block, OverlayDB const& _db, Import
h256 last = currentHash();
if (td > details(last).totalDifficulty)
{
// don't include bi.hash() in treeRoute, since it's not yet in details DB...
// just tack it on afterwards.
unsigned commonIndex;
tie(route, common, commonIndex) = treeRoute(last, bi.hash());
tie(route, common, commonIndex) = treeRoute(last, bi.parentHash);
route.push_back(bi.hash());
// Most of the time these two will be equal - only when we're doing a chain revert will they not be
if (common != last)
@ -564,20 +561,24 @@ ImportRoute BlockChain::import(bytes const& _block, OverlayDB const& _db, Import
// Go through ret backwards until hash != last.parent and update m_transactionAddresses, m_blockHashes
for (auto i = route.rbegin(); i != route.rend() && *i != common; ++i)
{
auto b = block(*i);
BlockInfo bi(b);
BlockInfo tbi;
if (*i == bi.hash())
tbi = bi;
else
tbi = BlockInfo(block(*i));
// Collate logs into blooms.
h256s alteredBlooms;
{
LogBloom blockBloom = bi.logBloom;
blockBloom.shiftBloom<3>(sha3(bi.coinbaseAddress.ref()));
LogBloom blockBloom = tbi.logBloom;
blockBloom.shiftBloom<3>(sha3(tbi.coinbaseAddress.ref()));
// Pre-memoize everything we need before locking x_blocksBlooms
for (unsigned level = 0, index = (unsigned)bi.number; level < c_bloomIndexLevels; level++, index /= c_bloomIndexSize)
for (unsigned level = 0, index = (unsigned)tbi.number; level < c_bloomIndexLevels; level++, index /= c_bloomIndexSize)
blocksBlooms(chunkId(level, index / c_bloomIndexSize));
WriteGuard l(x_blocksBlooms);
for (unsigned level = 0, index = (unsigned)bi.number; level < c_bloomIndexLevels; level++, index /= c_bloomIndexSize)
for (unsigned level = 0, index = (unsigned)tbi.number; level < c_bloomIndexLevels; level++, index /= c_bloomIndexSize)
{
unsigned i = index / c_bloomIndexSize;
unsigned o = index % c_bloomIndexSize;
@ -588,38 +589,26 @@ ImportRoute BlockChain::import(bytes const& _block, OverlayDB const& _db, Import
// Collate transaction hashes and remember who they were.
h256s newTransactionAddresses;
{
RLP blockRLP(b);
bytes blockBytes;
RLP blockRLP(*i == bi.hash() ? _block : (blockBytes = block(*i)));
TransactionAddress ta;
ta.blockHash = bi.hash();
WriteGuard l(x_transactionAddresses);
ta.blockHash = tbi.hash();
for (ta.index = 0; ta.index < blockRLP[1].itemCount(); ++ta.index)
{
newTransactionAddresses.push_back(sha3(blockRLP[1][ta.index].data()));
m_transactionAddresses[newTransactionAddresses.back()] = ta;
}
}
{
WriteGuard l(x_blockHashes);
m_blockHashes[h256(bi.number)].value = bi.hash();
extrasBatch.Put(toSlice(sha3(blockRLP[1][ta.index].data()), ExtraTransactionAddress), (ldb::Slice)dev::ref(ta.rlp()));
}
// Update database with them.
ReadGuard l1(x_blocksBlooms);
ReadGuard l3(x_blockHashes);
ReadGuard l6(x_transactionAddresses);
for (auto const& h: alteredBlooms)
m_extrasDB->Put(m_writeOptions, toSlice(h, ExtraBlocksBlooms), (ldb::Slice)dev::ref(m_blocksBlooms[h].rlp()));
m_extrasDB->Put(m_writeOptions, toSlice(h256(bi.number), ExtraBlockHash), (ldb::Slice)dev::ref(m_blockHashes[h256(bi.number)].rlp()));
for (auto const& h: newTransactionAddresses)
m_extrasDB->Put(m_writeOptions, toSlice(h, ExtraTransactionAddress), (ldb::Slice)dev::ref(m_transactionAddresses[h].rlp()));
extrasBatch.Put(toSlice(h, ExtraBlocksBlooms), (ldb::Slice)dev::ref(m_blocksBlooms[h].rlp()));
extrasBatch.Put(toSlice(h256(tbi.number), ExtraBlockHash), (ldb::Slice)dev::ref(BlockHash(tbi.hash()).rlp()));
}
// FINALLY! change our best hash.
{
WriteGuard l(x_lastBlockHash);
m_lastBlockHash = bi.hash();
m_lastBlockNumber = (unsigned)bi.number;
m_extrasDB->Put(m_writeOptions, ldb::Slice("best"), ldb::Slice((char const*)&(bi.hash()), 32));
newLastBlockHash = bi.hash();
newLastBlockNumber = (unsigned)bi.number;
extrasBatch.Put(ldb::Slice("best"), ldb::Slice((char const*)&(bi.hash()), 32));
}
clog(BlockChainNote) << " Imported and best" << td << " (#" << bi.number << "). Has" << (details(bi.parentHash).children.size() - 1) << "siblings. Route:" << route;
@ -637,6 +626,15 @@ ImportRoute BlockChain::import(bytes const& _block, OverlayDB const& _db, Import
clog(BlockChainChat) << " Imported but not best (oTD:" << details(last).totalDifficulty << " > TD:" << td << ")";
}
m_blocksDB->Write(m_writeOptions, &blocksBatch);
m_extrasDB->Write(m_writeOptions, &extrasBatch);
ETH_WRITE_GUARDED(x_lastBlockHash)
{
m_lastBlockHash = newLastBlockHash;
m_lastBlockNumber = newLastBlockNumber;
}
#if ETH_TIMED_IMPORTS
checkBest = t.elapsed();
cnote << "Import took:" << total.elapsed();

1
libethereum/BlockDetails.h

@ -92,6 +92,7 @@ struct BlockReceipts
struct BlockHash
{
BlockHash() {}
BlockHash(h256 const& _h): value(_h) {}
BlockHash(RLP const& _r) { value = _r.toHash<h256>(); }
bytes rlp() const { return dev::rlp(value); }

89
libethereum/Client.cpp

@ -36,6 +36,12 @@ using namespace dev;
using namespace dev::eth;
using namespace p2p;
namespace dev
{
struct TimerHelper { TimerHelper(char const* _id): id(_id) {} ~TimerHelper() { cdebug << "Timer" << id << t.elapsed() << "s"; } boost::timer t; char const* id; };
#define DEV_TIMED(S) for (::std::pair<::dev::TimerHelper, bool> __eth_t(#S, true); __eth_t.second; __eth_t.second = false)
}
VersionChecker::VersionChecker(string const& _dbPath):
m_path(_dbPath.size() ? _dbPath : Defaults::dbPath())
{
@ -246,9 +252,13 @@ void Client::startedWorking()
ETH_WRITE_GUARDED(x_preMine)
m_preMine.sync(m_bc);
ETH_WRITE_GUARDED(x_postMine)
ETH_READ_GUARDED(x_preMine)
ETH_READ_GUARDED(x_preMine)
{
ETH_WRITE_GUARDED(x_working)
m_working = m_preMine;
ETH_WRITE_GUARDED(x_postMine)
m_postMine = m_preMine;
}
}
void Client::doneWorking()
@ -257,9 +267,13 @@ void Client::doneWorking()
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
ETH_WRITE_GUARDED(x_preMine)
m_preMine.sync(m_bc);
ETH_WRITE_GUARDED(x_postMine)
ETH_READ_GUARDED(x_preMine)
ETH_READ_GUARDED(x_preMine)
{
ETH_WRITE_GUARDED(x_working)
m_working = m_preMine;
ETH_WRITE_GUARDED(x_postMine)
m_postMine = m_preMine;
}
}
void Client::killChain()
@ -453,18 +467,20 @@ ProofOfWork::WorkPackage Client::getWork()
bool Client::submitWork(ProofOfWork::Solution const& _solution)
{
bytes newBlock;
{
WriteGuard l(x_postMine);
if (!m_postMine.completeMine<ProofOfWork>(_solution))
DEV_TIMED(working) ETH_WRITE_GUARDED(x_working)
if (!m_working.completeMine<ProofOfWork>(_solution))
return false;
newBlock = m_postMine.blockData();
// OPTIMISE: very inefficient to not utilise the existing OverlayDB in m_postMine that contains all trie changes.
ETH_READ_GUARDED(x_working)
{
DEV_TIMED(post) ETH_WRITE_GUARDED(x_postMine)
m_postMine = m_working;
newBlock = m_working.blockData();
}
// OPTIMISE: very inefficient to not utilise the existing OverlayDB in m_postMine that contains all trie changes.
m_bq.import(&newBlock, m_bc, true);
/*
ImportRoute ir = m_bc.attemptImport(newBlock, m_stateDB);
if (!ir.first.empty())
onChainChanged(ir);*/
return true;
}
@ -489,12 +505,16 @@ void Client::syncTransactionQueue()
h256Set changeds;
TransactionReceipts newPendingReceipts;
ETH_WRITE_GUARDED(x_postMine)
tie(newPendingReceipts, m_syncTransactionQueue) = m_postMine.sync(m_bc, m_tq, *m_gp);
DEV_TIMED(working) ETH_WRITE_GUARDED(x_working)
tie(newPendingReceipts, m_syncTransactionQueue) = m_working.sync(m_bc, m_tq, *m_gp);
if (newPendingReceipts.empty())
return;
ETH_READ_GUARDED(x_working)
DEV_TIMED(post) ETH_WRITE_GUARDED(x_postMine)
m_postMine = m_working;
ETH_READ_GUARDED(x_postMine)
for (size_t i = 0; i < newPendingReceipts.size(); i++)
appendFromNewPending(newPendingReceipts[i], changeds, m_postMine.pending()[i].sha3());
@ -519,7 +539,7 @@ void Client::onChainChanged(ImportRoute const& _ir)
clog(ClientNote) << "Dead block:" << h;
for (auto const& t: m_bc.transactions(h))
{
clog(ClientNote) << "Resubmitting transaction " << Transaction(t, CheckTransaction::None);
clog(ClientNote) << "Resubmitting dead-block transaction " << Transaction(t, CheckTransaction::None);
m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry);
}
}
@ -545,18 +565,32 @@ void Client::onChainChanged(ImportRoute const& _ir)
// RESTART MINING
// LOCKS REALLY NEEDED?
bool preChanged = false;
ETH_WRITE_GUARDED(x_preMine)
preChanged = m_preMine.sync(m_bc);
State newPreMine;
ETH_READ_GUARDED(x_preMine)
newPreMine = m_preMine;
// TODO: use m_postMine to avoid re-evaluating our own blocks.
preChanged = newPreMine.sync(m_bc);
if (preChanged || m_postMine.address() != m_preMine.address())
{
if (isMining())
cnote << "New block on chain.";
ETH_WRITE_GUARDED(x_postMine)
ETH_READ_GUARDED(x_preMine)
m_postMine = m_preMine;
ETH_WRITE_GUARDED(x_preMine)
m_preMine = newPreMine;
DEV_TIMED(working) ETH_WRITE_GUARDED(x_working)
m_working = newPreMine;
ETH_READ_GUARDED(x_postMine)
for (auto const& t: m_postMine.pending())
{
clog(ClientNote) << "Resubmitting post-mine transaction " << t;
m_tq.import(t.rlp(), TransactionQueue::ImportCallback(), IfDropped::Retry);
}
ETH_READ_GUARDED(x_working) DEV_TIMED(post) ETH_WRITE_GUARDED(x_postMine)
m_postMine = m_working;
changeds.insert(PendingChangedFilter);
onPostStateChanged();
@ -575,9 +609,12 @@ void Client::onPostStateChanged()
cnote << "Post state changed: Restarting mining...";
if (isMining() || remoteActive())
{
DEV_TIMED(working) ETH_WRITE_GUARDED(x_working)
m_working.commitToMine(m_bc);
ETH_READ_GUARDED(x_working)
{
WriteGuard l(x_postMine);
m_postMine.commitToMine(m_bc);
DEV_TIMED(post) ETH_WRITE_GUARDED(x_postMine)
m_postMine = m_working;
m_miningInfo = m_postMine.info();
}
m_farm.setWork(m_miningInfo);
@ -695,7 +732,9 @@ eth::State Client::state(h256 _block) const
eth::State Client::state(unsigned _txi) const
{
return m_postMine.fromPending(_txi);
ETH_READ_GUARDED(x_postMine)
return m_postMine.fromPending(_txi);
assert(false);
}
void Client::flushTransactions()

6
libethereum/Client.h

@ -279,10 +279,12 @@ private:
std::shared_ptr<GasPricer> m_gp; ///< The gas pricer.
OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it.
mutable SharedMutex x_preMine; ///< Lock on the OverlayDB and other attributes of m_preMine.
mutable SharedMutex x_preMine; ///< Lock on m_preMine.
State m_preMine; ///< The present state of the client.
mutable SharedMutex x_postMine; ///< Lock on the OverlayDB and other attributes of m_postMine.
mutable SharedMutex x_postMine; ///< Lock on m_postMine.
State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added).
mutable SharedMutex x_working; ///< Lock on m_working.
State m_working; ///< The state of the client which we're mining (i.e. it'll have all the rewards added), while we're actually working on it.
BlockInfo m_miningInfo; ///< The header we're attempting to mine on (derived from m_postMine).
bool remoteActive() const; ///< Is there an active and valid remote worker?
bool m_remoteWorking = false; ///< Has the remote worker recently been reset?

Loading…
Cancel
Save