@@ -117,7 +117,7 @@ void BasicGasPricer::update(BlockChain const& _bc)
    }
}

Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId, int _miners):
Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId):
    Worker("eth"),
    m_vc(_dbPath),
    m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }),
@@ -126,14 +126,14 @@ Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _for
    m_preMine(m_stateDB, BaseState::CanonGenesis),
    m_postMine(m_stateDB)
{
    m_tqReady = m_tq->onReady([=](){ this->onTransactionQueueReady(); });  // TODO: should read m_tq->onReady(thisThread, syncTransactionQueue);
    m_bqReady = m_bq->onReady([=](){ this->onBlockQueueReady(); });  // TODO: should read m_bq->onReady(thisThread, syncBlockQueue);
    m_farm->onSolutionFound([=](ProofOfWork::Solution const& s){ return this->submitWork(s); });
    m_gp->update(m_bc);
    m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId));
    if (_miners > -1)
        setMiningThreads(_miners);
    else
        setMiningThreads();
    if (_dbPath.size())
        Defaults::setDBPath(_dbPath);
    m_vc.setOk();
@@ -142,7 +142,7 @@ Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _for
    startWorking();
}

Client::Client(p2p::Host* _extNet, std::shared_ptr<GasPricer> _gp, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId, int _miners):
Client::Client(p2p::Host* _extNet, std::shared_ptr<GasPricer> _gp, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId):
    Worker("eth"),
    m_vc(_dbPath),
    m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }),
@@ -151,14 +151,14 @@ Client::Client(p2p::Host* _extNet, std::shared_ptr<GasPricer> _gp, std::string c
    m_preMine(m_stateDB),
    m_postMine(m_stateDB)
{
    m_tq->onReady([=](){ this->onTransactionQueueReady(); });
    m_bq->onReady([=](){ this->onBlockQueueReady(); });
    m_farm->onSolutionFound([=](ProofOfWork::Solution const& s){ return this->submitWork(s); });
    m_gp->update(m_bc);
    m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId));
    if (_miners > -1)
        setMiningThreads(_miners);
    else
        setMiningThreads();
    if (_dbPath.size())
        Defaults::setDBPath(_dbPath);
    m_vc.setOk();
@@ -229,8 +229,6 @@ void Client::killChain()
    doWork();
    setMiningThreads(0);
    startWorking();
    if (wasMining)
        startMining();
@@ -271,26 +269,6 @@ static string filtersToString(T const& _fs)
    return ret.str();
}

void Client::noteChanged(h256Set const& _filters)
{
    Guard l(x_filtersWatches);
    if (_filters.size())
        cnote << "noteChanged(" << filtersToString(_filters) << ")";
    // accrue all changes left in each filter into the watches.
    for (auto& w: m_watches)
        if (_filters.count(w.second.id))
        {
            cwatch << "!!!" << w.first << (m_filters.count(w.second.id) ? w.second.id.abridged() : w.second.id == PendingChangedFilter ? "pending" : w.second.id == ChainChangedFilter ? "chain" : "???");
            if (m_filters.count(w.second.id))   // Normal filtering watch
                w.second.changes += m_filters.at(w.second.id).changes;
            else                                // Special ('pending'/'latest') watch
                w.second.changes.push_back(LocalisedLogEntry(SpecialLogEntry, 0));
        }
    // clear the filters now.
    for (auto& i: m_filters)
        i.second.changes.clear();
}

void Client::appendFromNewPending(TransactionReceipt const& _receipt, h256Set& io_changed, h256 _transactionHash)
{
    Guard l(x_filtersWatches);
@@ -342,22 +320,6 @@ void Client::setForceMining(bool _enable)
        m.noteStateChange();
}

void Client::setMiningThreads(unsigned _threads)
{
    stopMining();
    auto t = _threads ? _threads : thread::hardware_concurrency();
#if ETH_ETHASHCL || !ETH_TRUE
    if (m_turboMining)
        t = 1;
#endif
    WriteGuard l(x_localMiners);
    m_localMiners.clear();
    m_localMiners.resize(t);
    unsigned i = 0;
    for (auto& m: m_localMiners)
        m.setup(this, i++);
}

MineProgress Client::miningProgress() const
{
    MineProgress ret;
@@ -452,160 +414,161 @@ pair<h256, u256> Client::getWork()
    return make_pair(m_remoteMiner.workHash(), m_remoteMiner.difficulty());
}

bool Client::submitWork(ProofOfWork::Solution const& _proof)
bool Client::submitWork(ProofOfWork::Solution const& _solution)
{
    Guard l(x_remoteMiner);
    return m_remoteMiner.submitWork(_proof);
    bytes newBlock;
    {
        WriteGuard l(x_stateDB);
        if (!m_postMine.completeMine(_solution))
            return false;
        newBlock = m_postMine.blockData();
    }
    ImportRoute ir = m_bc.attemptImport(newBlock, m_stateDB);
    if (!ir.first.empty())
        onChainChanged(ir);
    return true;
}
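
// Note on the reworked submitWork() above: rather than delegating to the remote miner,
// the solution now completes the block held in m_postMine (under x_stateDB), the sealed
// block bytes are imported via m_bc.attemptImport(), and a non-empty import route
// triggers onChainChanged() so filters, the transaction queue and mining state get updated.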

void Client::doWork()
void Client::syncBlockQueue()
{
    // TODO: Use condition variable rather than polling.
    bool stillGotWork = false;
    cworkin << "WORK";
    h256Set changeds;
    auto maintainMiner = [&](OldMiner& m)
    {
        if (m.isComplete())
        {
            // TODO: enable a short-circuit option since we mined it. will need to get the end state from the miner.
            auto lm = dynamic_cast<LocalMiner*>(&m);
            h256s hs;
            h256 c;
            if (false && lm && !m_verifyOwnBlocks)
            {
                // TODO: implement
                //m_bc.attemptImport(m_blockData(), m_stateDB, lm->state());
                // TODO: derive hs from lm->state()
            }
            else
            {
                cwork << "CHAIN <== postSTATE";
                WriteGuard l(x_stateDB);
                tie(hs, c) = m_bc.attemptImport(m.blockData(), m_stateDB);
            }
            if (hs.size())
            {
                for (auto const& h: hs)
                    if (h != c)
                        appendFromNewBlock(h, changeds);
                changeds.insert(ChainChangedFilter);
            }
            for (auto& m: m_localMiners)
                m.noteStateChange();
        }
    };
    {
        ReadGuard l(x_localMiners);
        for (auto& m: m_localMiners)
            maintainMiner(m);
    }
    {
        Guard l(x_remoteMiner);
        maintainMiner(m_remoteMiner);
    }
    ImportRoute ir;
    // Synchronise state to block chain.
    // This should remove any transactions on our queue that are included within our state.
    // It also guarantees that the state reflects the longest (valid!) chain on the block chain.
    // This might mean reverting to an earlier state and replaying some blocks, or, (worst-case:
    // if there are no checkpoints before our fork) reverting to the genesis block and replaying
    // all blocks.
    // Resynchronise state with block chain & trans
    bool resyncStateNeeded = false;
    {
        WriteGuard l(x_stateDB);
        cwork << "BQ ==> CHAIN ==> STATE";
        OverlayDB db = m_stateDB;
        x_stateDB.unlock();
        h256s fresh;
        h256s dead;
        bool sgw;
        tie(fresh, dead, sgw) = m_bc.sync(m_bq, db, 100);
        // insert transactions that we are declaring the dead part of the chain
        for (auto const& h: dead)
        {
            clog(ClientNote) << "Dead block:" << h.abridged();
            for (auto const& t: m_bc.transactions(h))
            {
                clog(ClientNote) << "Resubmitting transaction " << Transaction(t, CheckTransaction::None);
                m_tq.import(t);
            }
        }
        // remove transactions from m_tq nicely rather than relying on out of date nonce later on.
        for (auto const& h: fresh)
        {
            clog(ClientChat) << "Live block:" << h.abridged();
            for (auto const& th: m_bc.transactionHashes(h))
            {
                clog(ClientNote) << "Safely dropping transaction " << th.abridged();
                m_tq.drop(th);
            }
        }
        tie(ir.first, ir.second, m_syncBlockQueue) = m_bc.sync(m_bq, db, 100);
        stillGotWork = stillGotWork | sgw;
        if (!fresh.empty())
        {
            for (auto i: fresh)
                appendFromNewBlock(i, changeds);
            changeds.insert(ChainChangedFilter);
        }
        x_stateDB.lock();
        if (fresh.size())
            m_stateDB = db;
    }
    if (!ir.first.empty())
        onChainChanged(ir);
    return true;
}
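
// syncTransactionQueue() below drains the transaction queue into the pending (post-mine)
// state via m_postMine.sync(), records a pending-change for each new receipt through
// appendFromNewPending(), and notifies the network host that new transactions are available.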

void Client::syncTransactionQueue()
{
    // returns TransactionReceipts, once for each transaction.
    cwork << "postSTATE <== TQ";
    TransactionReceipts newPendingReceipts = m_postMine.sync(m_bc, m_tq, *m_gp);
    if (newPendingReceipts.size())
    {
        for (size_t i = 0; i < newPendingReceipts.size(); i++)
            appendFromNewPending(newPendingReceipts[i], changeds, m_postMine.pending()[i].sha3());
        cwork << "preSTATE <== CHAIN";
        if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address())
        changeds.insert(PendingChangedFilter);
        if (isMining())
            cnote << "Additional transaction ready: Restarting mining operation.";
        resyncStateNeeded = true;
        if (auto h = m_host.lock())
            h->noteNewTransactions();
    }
}
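
// onChainChanged() below reacts to a chain reorganisation described by the import route:
// transactions from blocks now considered dead are re-imported into the queue, transactions
// included in newly live blocks are dropped, the host is told about the new blocks, watch
// filters are updated, and when the head state changes mining is restarted on top of the
// new pre-mine state.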

void Client::onChainChanged(ImportRoute const& _ir)
{
    // insert transactions that we are declaring the dead part of the chain
    for (auto const& h: _ir.second)
    {
        clog(ClientNote) << "Dead block:" << h.abridged();
        for (auto const& t: m_bc.transactions(h))
        {
            if (isMining())
                cnote << "New block on chain: Restarting mining operation.";
            m_postMine = m_preMine;
            resyncStateNeeded = true;
            changeds.insert(PendingChangedFilter);
            // TODO: Move transactions pending from m_postMine back to transaction queue.
            clog(ClientNote) << "Resubmitting transaction " << Transaction(t, CheckTransaction::None);
            m_tq.import(t);
        }
    }

    // returns TransactionReceipts, once for each transaction.
    cwork << "postSTATE <== TQ";
    TransactionReceipts newPendingReceipts = m_postMine.sync(m_bc, m_tq, *m_gp);
    if (newPendingReceipts.size())
    // remove transactions from m_tq nicely rather than relying on out of date nonce later on.
    for (auto const& h: _ir.first)
    {
        clog(ClientChat) << "Live block:" << h.abridged();
        for (auto const& th: m_bc.transactionHashes(h))
        {
            for (size_t i = 0; i < newPendingReceipts.size(); i++)
                appendFromNewPending(newPendingReceipts[i], changeds, m_postMine.pending()[i].sha3());
            changeds.insert(PendingChangedFilter);
            if (isMining())
                cnote << "Additional transaction ready: Restarting mining operation.";
            resyncStateNeeded = true;
            if (auto h = m_host.lock())
                h->noteNewTransactions();
            clog(ClientNote) << "Safely dropping transaction " << th.abridged();
            m_tq.drop(th);
        }
    }

    if (!changeds.empty())
        if (auto h = m_host.lock())
            h->noteNewBlocks();
    if (auto h = m_host.lock())
        h->noteNewBlocks();

    h256Set changeds;
    for (auto const& h: _ir.first)
        if (h != _ir.second)
            appendFromNewBlock(h, changeds);
    changeds.insert(ChainChangedFilter);
    noteChanged(changeds);

    // RESTART MINING
    if (resyncStateNeeded)
    // LOCKS NEEDED?
    Guard l(x_stateDB);
    cwork << "preSTATE <== CHAIN";
    if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address())
    {
        ReadGuard l(x_localMiners);
        for (auto& m: m_localMiners)
            m.noteStateChange();
        if (isMining())
            cnote << "New block on chain: Restarting mining operation.";
        m_postMine = m_preMine;
        resyncStateNeeded = true;
        changeds.insert(PendingChangedFilter);
        m_postMine.commitToMine(m_bc);
        m_farm.setWork(m_postMine.info());
    }
    }

    cwork << "noteChanged" << changeds.size() << "items";
    noteChanged(changeds);
    cworkout << "WORK";

void Client::noteChanged(h256Set const& _filters)
{
    Guard l(x_filtersWatches);
    if (_filters.size())
        cnote << "noteChanged(" << filtersToString(_filters) << ")";
    // accrue all changes left in each filter into the watches.
    for (auto& w: m_watches)
        if (_filters.count(w.second.id))
        {
            cwatch << "!!!" << w.first << (m_filters.count(w.second.id) ? w.second.id.abridged() : w.second.id == PendingChangedFilter ? "pending" : w.second.id == ChainChangedFilter ? "chain" : "???");
            if (m_filters.count(w.second.id))   // Normal filtering watch
                w.second.changes += m_filters.at(w.second.id).changes;
            else                                // Special ('pending'/'latest') watch
                w.second.changes.push_back(LocalisedLogEntry(SpecialLogEntry, 0));
        }
    // clear the filters now.
    for (auto& i: m_filters)
        i.second.changes.clear();
}
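
// In the reworked doWork() below, the old polling loop appears to be replaced by simple
// ready flags: m_syncTransactionQueue and m_syncBlockQueue (presumably set by the
// onTransactionQueueReady()/onBlockQueueReady() callbacks registered in the constructors)
// are cleared and dispatched to the matching sync step, then watches are garbage-collected
// and the worker sleeps briefly.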

void Client::doWork()
{
    // TODO: Use condition variable rather than this rubbish.
    Guard l(x_fakeSignalSystemState);
    if (m_syncTransactionQueue)
    {
        m_syncTransactionQueue = false;
        syncTransactionQueue();
    }
    if (m_syncBlockQueue)
    {
        m_syncBlockQueue = false;
        syncBlockQueue();
    }
    checkWatchGarbage();
    if (!stillGotWork)
        this_thread::sleep_for(chrono::milliseconds(100));
    this_thread::sleep_for(chrono::milliseconds(20));
}

void Client::checkWatchGarbage()
{
    if (chrono::system_clock::now() - m_lastGarbageCollection > chrono::seconds(5))
    {
        // watches garbage collection