Browse Source

Separate out miner thread in preparation for own class.

cl-refactor
Gav Wood 11 years ago
parent
commit
2a85f95542
  1. 147
      libethereum/Client.cpp
  2. 24
      libethereum/Client.h

147
libethereum/Client.cpp

@ -74,7 +74,21 @@ Client::Client(std::string const& _clientVersion, Address _us, std::string const
if (_dbPath.size())
Defaults::setDBPath(_dbPath);
m_vc.setOk();
work(true);
work();
}
Client::~Client()
{
if (m_work)
{
if (m_workState.load(std::memory_order_acquire) == Active)
m_workState.store(Deleting, std::memory_order_release);
while (m_workState.load(std::memory_order_acquire) != Deleted)
this_thread::sleep_for(chrono::milliseconds(10));
m_work->join();
m_work.reset(nullptr);
}
stopNetwork();
}
void Client::ensureWorking()
@ -98,23 +112,9 @@ void Client::ensureWorking()
}));
}
Client::~Client()
{
if (m_work)
{
if (m_workState.load(std::memory_order_acquire) == Active)
m_workState.store(Deleting, std::memory_order_release);
while (m_workState.load(std::memory_order_acquire) != Deleted)
this_thread::sleep_for(chrono::milliseconds(10));
m_work->join();
m_work.reset(nullptr);
}
stopNetwork();
}
void Client::flushTransactions()
{
work(true);
work();
}
void Client::clearPending()
@ -127,6 +127,7 @@ void Client::clearPending()
appendFromNewPending(m_postMine.bloom(i), changeds);
changeds.insert(PendingChangedFilter);
m_postMine = m_preMine;
m_miningStatus = Preparing;
noteChanged(changeds);
}
@ -302,15 +303,33 @@ void Client::connect(std::string const& _seedHost, unsigned short _port)
void Client::startMining()
{
ensureWorking();
static const char* c_threadName = "miner";
m_doMine = true;
m_restartMining = true;
if (!m_workMine)
m_workMine.reset(new thread([&]()
{
setThreadName(c_threadName);
m_workMineState.store(Active, std::memory_order_release);
m_miningStatus = Preparing;
while (m_workMineState.load(std::memory_order_acquire) != Deleting)
workMine();
m_workMineState.store(Deleted, std::memory_order_release);
}));
ensureWorking();
}
void Client::stopMining()
{
m_doMine = false;
if (m_workMine)
{
if (m_workMineState.load(std::memory_order_acquire) == Active)
m_workMineState.store(Deleting, std::memory_order_release);
while (m_workMineState.load(std::memory_order_acquire) != Deleted)
this_thread::sleep_for(chrono::milliseconds(10));
m_workMine->join();
}
m_workMine.reset(nullptr);
}
void Client::transact(Secret _secret, u256 _value, Address _dest, bytes const& _data, u256 _gas, u256 _gasPrice)
@ -404,83 +423,94 @@ void Client::workNet()
this_thread::sleep_for(chrono::milliseconds(1));
}
void Client::work(bool _justQueue)
void Client::workMine()
{
cworkin << "WORK";
h256Set changeds;
// Do some mining.
if (!_justQueue && (m_pendingCount || m_forceMining))
if ((m_pendingCount || m_forceMining) && m_miningStatus != Mined)
{
// TODO: Separate "Miner" object.
if (m_doMine)
if (m_miningStatus == Preparing)
{
if (m_restartMining)
m_miningStatus = Mining;
{
ReadGuard l(x_stateDB);
m_mineState = m_postMine;
}
{
Guard l(x_mineProgress);
m_mineProgress.best = (double)-1;
m_mineProgress.hashes = 0;
m_mineProgress.ms = 0;
WriteGuard l(x_stateDB);
}
if (m_paranoia)
{
if (m_postMine.amIJustParanoid(m_bc))
if (m_mineState.amIJustParanoid(m_bc))
{
cnote << "I'm just paranoid. Block is fine.";
m_postMine.commitToMine(m_bc);
m_mineState.commitToMine(m_bc);
}
else
{
cwarn << "I'm not just paranoid. Cannot mine. Please file a bug report.";
m_doMine = false;
}
}
else
m_postMine.commitToMine(m_bc);
}
m_restartMining = false;
m_mineState.commitToMine(m_bc);
}
if (m_doMine)
{
cwork << "MINE";
// Mine for a while.
MineInfo mineInfo = m_postMine.mine(100, m_turboMining);
MineInfo mineInfo = m_mineState.mine(100, m_turboMining);
{
Guard l(x_mineProgress);
m_mineProgress.best = min(m_mineProgress.best, mineInfo.best);
m_mineProgress.current = mineInfo.best;
m_mineProgress.requirement = mineInfo.requirement;
m_mineProgress.ms += 100;
m_mineProgress.hashes += mineInfo.hashes;
WriteGuard l(x_stateDB);
m_mineHistory.push_back(mineInfo);
}
if (mineInfo.completed)
{
// Import block.
cwork << "COMPLETE MINE";
m_postMine.completeMine();
cwork << "CHAIN <== postSTATE";
h256s hs = m_bc.attemptImport(m_postMine.blockData(), m_stateDB);
if (hs.size())
{
for (auto h: hs)
appendFromNewBlock(h, changeds);
changeds.insert(ChainChangedFilter);
//changeds.insert(PendingChangedFilter); // if we mined the new block, then we've probably reset the pending transactions.
// Must lock stateDB here since we're actually pushing out to the database.
WriteGuard l(x_stateDB);
cwork << "COMPLETE MINE";
m_mineState.completeMine();
}
m_miningStatus = Mined;
}
}
else
{
cwork << "SLEEP";
this_thread::sleep_for(chrono::milliseconds(100));
}
}
else
}
void Client::work()
{
cworkin << "WORK";
h256Set changeds;
if (m_miningStatus == Mined)
{
cwork << "SLEEP";
this_thread::sleep_for(chrono::milliseconds(100));
cwork << "CHAIN <== postSTATE";
h256s hs = m_bc.attemptImport(m_mineState.blockData(), m_stateDB);
if (hs.size())
{
for (auto h: hs)
appendFromNewBlock(h, changeds);
changeds.insert(ChainChangedFilter);
//changeds.insert(PendingChangedFilter); // if we mined the new block, then we've probably reset the pending transactions.
}
m_miningStatus = Preparing;
}
// Synchronise state to block chain.
@ -507,13 +537,14 @@ void Client::work(bool _justQueue)
if (newBlocks.size())
m_stateDB = db;
bool rsm = false;
cwork << "preSTATE <== CHAIN";
if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address())
{
if (m_doMine)
cnote << "New block on chain: Restarting mining operation.";
m_restartMining = true; // need to re-commit to mine.
m_postMine = m_preMine;
rsm = true;
changeds.insert(PendingChangedFilter);
}
@ -526,11 +557,13 @@ void Client::work(bool _justQueue)
appendFromNewPending(i, changeds);
changeds.insert(PendingChangedFilter);
if (m_doMine)
cnote << "Additional transaction ready: Restarting mining operation.";
m_restartMining = true;
rsm = true;
}
m_pendingCount = m_postMine.pending().size();
if (rsm)
m_miningStatus = Preparing;
}
cwork << "noteChanged" << changeds.size() << "items";

24
libethereum/Client.h

@ -286,17 +286,19 @@ public:
/// Get the coinbase address.
Address address() const { return m_preMine.address(); }
/// Start mining.
/// NOT thread-safe
void startMining();
/// Stop mining.
/// NOT thread-safe
void stopMining();
/// Are we mining now?
bool isMining() { return m_doMine; }
bool isMining() { return !!m_workMine; }
/// Register a callback for information concerning mining.
/// This callback will be in an arbitrary thread, blocking progress. JUST COPY THE DATA AND GET OUT.
/// Check the progress of the mining.
MineProgress miningProgress() const { return m_mineProgress; }
MineProgress miningProgress() const { Guard l(x_mineProgress); return m_mineProgress; }
/// Get and clear the mining history.
std::list<MineInfo> miningHistory() { auto ret = m_mineHistory; m_mineHistory.clear(); return ret; }
std::list<MineInfo> miningHistory() { Guard l(x_mineProgress); auto ret = m_mineHistory; m_mineHistory.clear(); return ret; }
bool forceMining() const { return m_forceMining; }
void setForceMining(bool _enable) { m_forceMining = _enable; }
@ -313,11 +315,14 @@ private:
/// Do some work. Handles blockchain maintenance and mining.
/// @param _justQueue If true will only processing the transaction queues.
void work(bool _justQueue = false);
void work();
/// Do some work on the network.
void workNet();
/// Do some work on the mining.
void workMine();
/// Collate the changed filters for the bloom filter of the given pending transaction.
/// Insert any filters that are activated into @a o_changed.
void appendFromNewPending(h256 _pendingTransactionBloom, h256Set& o_changed) const;
@ -341,7 +346,8 @@ private:
BlockChain m_bc; ///< Maintains block database.
TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported).
mutable boost::shared_mutex x_stateDB; // TODO: remove in favour of copying m_stateDB as required and thread-safing/copying State. Have a good think about what state objects there should be. Probably want 4 (pre, post, mining, user-visible).
// TODO: remove in favour of copying m_stateDB as required and thread-safing/copying State. Have a good think about what state objects there should be. Probably want 4 (pre, post, mining, user-visible).
mutable boost::shared_mutex x_stateDB; ///< Lock on the state DB, effectively a lock on m_postMine.
OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it.
State m_preMine; ///< The present state of the client.
State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added).
@ -354,13 +360,17 @@ private:
std::unique_ptr<std::thread> m_work; ///< The work thread.
std::atomic<ClientWorkState> m_workState;
std::unique_ptr<std::thread> m_workMine;///< The work thread.
std::atomic<ClientWorkState> m_workMineState;
enum MiningStatus { Preparing, Mining, Mined };
MiningStatus m_miningStatus = Preparing;///< TODO: consider mutex/atomic variable.
State m_mineState; ///< The state on which we are mining, generally equivalent to m_postMine.
bool m_paranoia = false;
bool m_doMine = false; ///< Are we supposed to be mining?
bool m_turboMining = false; ///< Don't squander all of our time mining actually just sleeping.
bool m_forceMining = false; ///< Mine even when there are no transactions pending?
mutable std::mutex x_mineProgress; ///< Lock for the mining progress & history.
MineProgress m_mineProgress;
std::list<MineInfo> m_mineHistory;
mutable bool m_restartMining = false;
mutable unsigned m_pendingCount = 0;
mutable std::mutex m_filterLock;

Loading…
Cancel
Save