From 066fc1811618bbe3c6e8748f2b8ff5ef12c26bae Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Tue, 16 Sep 2014 07:53:34 -0400 Subject: [PATCH] Move out worker thread stuff into other class. p2p::Host is worker rather than WebThree. Client is single worker. EthereumHost works. --- exp/main.cpp | 5 +- libdevcore/Worker.cpp | 63 +++++++++++++++++++++++ libdevcore/Worker.h | 49 ++++++++++++++++++ libethereum/Client.cpp | 97 ++++++++++-------------------------- libethereum/Client.h | 22 +++----- libethereum/EthereumHost.cpp | 36 +++++++------ libethereum/EthereumHost.h | 18 ++++--- libethereum/EthereumPeer.cpp | 28 +++++------ libp2p/Host.cpp | 15 +++++- libp2p/Host.h | 16 +++--- libp2p/HostCapability.h | 3 ++ libwebthree/WebThree.cpp | 54 -------------------- libwebthree/WebThree.h | 9 ++-- 13 files changed, 220 insertions(+), 195 deletions(-) create mode 100644 libdevcore/Worker.cpp create mode 100644 libdevcore/Worker.h diff --git a/exp/main.cpp b/exp/main.cpp index 9e9971852..77d4aa00f 100644 --- a/exp/main.cpp +++ b/exp/main.cpp @@ -326,16 +326,17 @@ int main(int argc, char** argv) ph.registerCapability(new WhisperHost()); auto wh = ph.cap(); + ph.start(); + if (!remoteHost.empty()) ph.connect(remoteHost, remotePort); /// Only interested in the packet if the lowest bit is 1 auto w = wh->installWatch(MessageFilter(std::vector >({{fromHex("0000000000000000000000000000000000000000000000000000000000000001"), fromHex("0000000000000000000000000000000000000000000000000000000000000001")}}))); + for (int i = 0; ; ++i) { - this_thread::sleep_for(chrono::milliseconds(1000)); - ph.process(); wh->sendRaw(h256(u256(i * i)).asBytes(), h256(u256(i)).asBytes(), 1000); for (auto i: wh->checkWatch(w)) cnote << "New message:" << (u256)h256(wh->message(i).payload); diff --git a/libdevcore/Worker.cpp b/libdevcore/Worker.cpp new file mode 100644 index 000000000..fe0a4fe92 --- /dev/null +++ b/libdevcore/Worker.cpp @@ -0,0 +1,63 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . +*/ +/** @file Worker.cpp + * @author Gav Wood + * @date 2014 + */ + +#include "Worker.h" + +#include +#include +#include "Log.h" +using namespace std; +using namespace dev; + +void Worker::startWorking() +{ + cdebug << "startWorking for thread" << m_name; + Guard l(x_work); + if (m_work) + return; + cdebug << "Spawning" << m_name; + m_stop = false; + m_work.reset(new thread([&]() + { + setThreadName(m_name.c_str()); + while (!m_stop) + { + this_thread::sleep_for(chrono::milliseconds(1)); + doWork(); + } + cdebug << "Finishing up worker thread"; + doneWorking(); + })); +} + +void Worker::stopWorking() +{ + cdebug << "stopWorking for thread" << m_name; + Guard l(x_work); + if (!m_work) + return; + cdebug << "Stopping" << m_name; + m_stop = true; + m_work->join(); + m_work.reset(); + cdebug << "Stopped" << m_name; +} + diff --git a/libdevcore/Worker.h b/libdevcore/Worker.h new file mode 100644 index 000000000..2d07b5592 --- /dev/null +++ b/libdevcore/Worker.h @@ -0,0 +1,49 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . +*/ +/** @file Worker.h + * @author Gav Wood + * @date 2014 + */ + +#pragma once + +#include +#include +#include "Guards.h" + +namespace dev +{ + +class Worker +{ +protected: + Worker(std::string const& _name): m_name(_name) {} + virtual ~Worker() { stopWorking(); } + void startWorking(); + void stopWorking(); + bool isWorking() const { Guard l(x_work); return !!m_work; } + virtual void doWork() = 0; + virtual void doneWorking() {} + +private: + mutable Mutex x_work; ///< Lock for the network existance. + std::unique_ptr m_work; ///< The network thread. + bool m_stop = false; + std::string m_name; +}; + +} diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index 9779acfbe..53222f343 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -54,79 +54,47 @@ void VersionChecker::setOk() } Client::Client(p2p::Host* _extNet, std::string const& _dbPath, bool _forceClean, u256 _networkId): + Worker("eth"), m_vc(_dbPath), m_bc(_dbPath, !m_vc.ok() || _forceClean), m_stateDB(State::openDB(_dbPath, !m_vc.ok() || _forceClean)), m_preMine(Address(), m_stateDB), m_postMine(Address(), m_stateDB) { - m_extHost = _extNet->registerCapability(new EthereumHost(m_bc, _networkId)); + m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId)); // setMiningThreads(); if (_dbPath.size()) Defaults::setDBPath(_dbPath); m_vc.setOk(); - work(); + doWork(); - static const char* c_threadName = "ethsync"; - m_workNet.reset(new thread([&]() - { - setThreadName(c_threadName); - m_workNetState.store(Active, std::memory_order_release); - while (m_workNetState.load(std::memory_order_acquire) != Deleting) - workNet(); - m_workNetState.store(Deleted, std::memory_order_release); - })); - - ensureWorking(); + startWorking(); } Client::~Client() { - if (m_work) - { - if (m_workState.load(std::memory_order_acquire) == Active) - m_workState.store(Deleting, std::memory_order_release); - while (m_workState.load(std::memory_order_acquire) != Deleted) - this_thread::sleep_for(chrono::milliseconds(10)); - m_work->join(); - m_work.reset(nullptr); - } - if (m_workNet) - { - if (m_workNetState.load(std::memory_order_acquire) == Active) - m_workNetState.store(Deleting, std::memory_order_release); - while (m_workNetState.load(std::memory_order_acquire) != Deleted) - this_thread::sleep_for(chrono::milliseconds(10)); - m_workNet->join(); - m_workNet.reset(nullptr); - } + stopWorking(); } -void Client::ensureWorking() +void Client::setNetworkId(u256 _n) { - static const char* c_threadName = "eth"; + if (auto h = m_host.lock()) + h->setNetworkId(_n); +} - if (!m_work) - m_work.reset(new thread([&]() - { - setThreadName(c_threadName); - m_workState.store(Active, std::memory_order_release); - while (m_workState.load(std::memory_order_acquire) != Deleting) - work(); - m_workState.store(Deleted, std::memory_order_release); - - // Synchronise the state according to the head of the block chain. - // TODO: currently it contains keys for *all* blocks. Make it remove old ones. - WriteGuard l(x_stateDB); - m_preMine.sync(m_bc); - m_postMine = m_preMine; - })); +void Client::doneWorking() +{ + // Synchronise the state according to the head of the block chain. + // TODO: currently it contains keys for *all* blocks. Make it remove old ones. + WriteGuard l(x_stateDB); + m_preMine.sync(m_bc); + m_postMine = m_preMine; } void Client::flushTransactions() { - work(); + doWork(); } void Client::killChain() @@ -223,12 +191,6 @@ void Client::appendFromNewBlock(h256 _block, h256Set& o_changed) const o_changed.insert(i.first); } -void Client::setNetworkId(u256 _n) -{ - if (auto h = m_extHost.lock()) - h->setNetworkId(_n); -} - void Client::setMiningThreads(unsigned _threads) { stopMining(); @@ -295,7 +257,7 @@ void Client::setupState(State& _s) void Client::transact(Secret _secret, u256 _value, Address _dest, bytes const& _data, u256 _gas, u256 _gasPrice) { - ensureWorking(); + startWorking(); Transaction t; // cdebug << "Nonce at " << toAddress(_secret) << " pre:" << m_preMine.transactionsFrom(toAddress(_secret)) << " post:" << m_postMine.transactionsFrom(toAddress(_secret)); @@ -337,7 +299,7 @@ bytes Client::call(Secret _secret, u256 _value, Address _dest, bytes const& _dat Address Client::transact(Secret _secret, u256 _endowment, bytes const& _init, u256 _gas, u256 _gasPrice) { - ensureWorking(); + startWorking(); Transaction t; { @@ -357,23 +319,12 @@ Address Client::transact(Secret _secret, u256 _endowment, bytes const& _init, u2 void Client::inject(bytesConstRef _rlp) { - ensureWorking(); + startWorking(); m_tq.attemptImport(_rlp); } -void Client::workNet() -{ - // Process network events. - // Synchronise block chain with network. - // Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks. - if (auto h = m_extHost.lock()) - h->sync(m_tq, m_bq); - - this_thread::sleep_for(chrono::milliseconds(1)); -} - -void Client::work() +void Client::doWork() { // TODO: Use condition variable rather than polling. @@ -430,7 +381,8 @@ void Client::work() cwork << "preSTATE <== CHAIN"; if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address()) { - cnote << "New block on chain: Restarting mining operation."; + if (isMining()) + cnote << "New block on chain: Restarting mining operation."; m_postMine = m_preMine; rsm = true; changeds.insert(PendingChangedFilter); @@ -446,7 +398,8 @@ void Client::work() appendFromNewPending(i, changeds); changeds.insert(PendingChangedFilter); - cnote << "Additional transaction ready: Restarting mining operation."; + if (isMining()) + cnote << "Additional transaction ready: Restarting mining operation."; rsm = true; } } diff --git a/libethereum/Client.h b/libethereum/Client.h index 9d0b16767..25ec794af 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -108,7 +109,7 @@ struct WorkChannel: public LogChannel { static const char* name() { return "-W-" /** * @brief Main API hub for interfacing with Ethereum. */ -class Client: public MinerHost, public Interface +class Client: public MinerHost, public Interface, Worker { friend class Miner; @@ -217,7 +218,7 @@ public: unsigned miningThreads() const { ReadGuard l(x_miners); return m_miners.size(); } /// Start mining. /// NOT thread-safe - call it & stopMining only from a single thread - void startMining() { ensureWorking(); ReadGuard l(x_miners); for (auto& m: m_miners) m.start(); } + void startMining() { startWorking(); ReadGuard l(x_miners); for (auto& m: m_miners) m.start(); } /// Stop mining. /// NOT thread-safe void stopMining() { ReadGuard l(x_miners); for (auto& m: m_miners) m.stop(); } @@ -238,15 +239,10 @@ public: void killChain(); private: - /// Ensure the worker thread is running. Needed for blockchain maintenance & mining. - void ensureWorking(); - /// Do some work. Handles blockchain maintenance and mining. - /// @param _justQueue If true will only processing the transaction queues. - void work(); + virtual void doWork(); - /// Syncs the queues with the network. - void workNet(); + virtual void doneWorking(); /// Overrides for being a mining host. virtual void setupState(State& _s); @@ -281,13 +277,7 @@ private: State m_preMine; ///< The present state of the client. State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added). - std::unique_ptr m_workNet; ///< The network thread. - std::atomic m_workNetState; - - std::weak_ptr m_extHost; ///< Our Ethereum Host. Don't do anything if we can't lock. - - std::unique_ptr m_work; ///< The work thread. - std::atomic m_workState; + std::weak_ptr m_host; ///< Our Ethereum Host. Don't do anything if we can't lock. std::vector m_miners; mutable boost::shared_mutex x_miners; diff --git a/libethereum/EthereumHost.cpp b/libethereum/EthereumHost.cpp index a80c0df6f..2b88fc768 100644 --- a/libethereum/EthereumHost.cpp +++ b/libethereum/EthereumHost.cpp @@ -39,9 +39,12 @@ using namespace dev; using namespace dev::eth; using namespace p2p; -EthereumHost::EthereumHost(BlockChain const& _ch, u256 _networkId): +EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): HostCapability(), - m_chain (&_ch), + Worker("ethsync"), + m_chain (_ch), + m_tq (_tq), + m_bq (_bq), m_networkId (_networkId) { m_latestBlockSent = _ch.currentHash(); @@ -82,7 +85,7 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq) if (!m_latestBlockSent) { // First time - just initialise. - m_latestBlockSent = m_chain->currentHash(); + m_latestBlockSent = m_chain.currentHash(); clog(NetNote) << "Initialising: latest=" << m_latestBlockSent.abridged(); for (auto const& i: _tq.transactions()) @@ -102,7 +105,7 @@ void EthereumHost::noteDoneBlocks() clog(NetNote) << "No more blocks coming. Missing" << m_blocksNeeded.size() << "blocks."; else clog(NetNote) << "No more blocks to get."; - m_latestBlockSent = m_chain->currentHash(); + m_latestBlockSent = m_chain.currentHash(); } } @@ -110,7 +113,7 @@ bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data) { Guard l(x_blocksNeeded); m_blocksOnWay.erase(_hash); - if (!m_chain->details(_hash)) + if (!m_chain.details(_hash)) { lock_guard l(m_incomingLock); m_incomingBlocks.push_back(_data.toBytes()); @@ -119,13 +122,14 @@ bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data) return false; } -bool EthereumHost::sync(TransactionQueue& _tq, BlockQueue& _bq) +void EthereumHost::doWork() { - bool netChange = ensureInitialised(_tq); - auto h = m_chain->currentHash(); - maintainTransactions(_tq, h); - maintainBlocks(_bq, h); - return netChange; + bool netChange = ensureInitialised(m_tq); + auto h = m_chain.currentHash(); + maintainTransactions(m_tq, h); + maintainBlocks(m_bq, h); +// return netChange; + // TODO: Figure out what to do with netChange. } void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash) @@ -172,7 +176,7 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) { lock_guard l(m_incomingLock); for (auto it = m_incomingBlocks.rbegin(); it != m_incomingBlocks.rend(); ++it) - if (_bq.import(&*it, *m_chain)) + if (_bq.import(&*it, m_chain)) {} else{} // TODO: don't forward it. m_incomingBlocks.clear(); @@ -191,9 +195,9 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) EthereumPeer::prep(ts); bytes bs; unsigned c = 0; - for (auto h: m_chain->treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true)) + for (auto h: m_chain.treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true)) { - bs += m_chain->block(h); + bs += m_chain.block(h); ++c; } clog(NetMessageSummary) << "Sending" << c << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")"; @@ -221,9 +225,9 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from) if (_from->m_neededBlocks.empty()) return; - clog(NetNote) << "Hash-chain COMPLETE:" << _from->m_totalDifficulty << "vs" << m_chain->details().totalDifficulty << "," << m_totalDifficultyOfNeeded << ";" << _from->m_neededBlocks.size() << " blocks, ends" << _from->m_neededBlocks.back().abridged(); + clog(NetNote) << "Hash-chain COMPLETE:" << _from->m_totalDifficulty << "vs" << m_chain.details().totalDifficulty << "," << m_totalDifficultyOfNeeded << ";" << _from->m_neededBlocks.size() << " blocks, ends" << _from->m_neededBlocks.back().abridged(); - if ((m_totalDifficultyOfNeeded && td < m_totalDifficultyOfNeeded) || td < m_chain->details().totalDifficulty) + if ((m_totalDifficultyOfNeeded && td < m_totalDifficultyOfNeeded) || td < m_chain.details().totalDifficulty) { clog(NetNote) << "Difficulty of hashchain LOWER. Ignoring."; return; diff --git a/libethereum/EthereumHost.h b/libethereum/EthereumHost.h index 61b716209..7c6b4b865 100644 --- a/libethereum/EthereumHost.h +++ b/libethereum/EthereumHost.h @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include "CommonNet.h" @@ -49,13 +50,13 @@ class BlockQueue; * @brief The EthereumHost class * @warning None of this is thread-safe. You have been warned. */ -class EthereumHost: public p2p::HostCapability +class EthereumHost: public p2p::HostCapability, Worker { friend class EthereumPeer; public: /// Start server, but don't listen. - EthereumHost(BlockChain const& _ch, u256 _networkId); + EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId); /// Will block on network process events. virtual ~EthereumHost(); @@ -64,9 +65,6 @@ public: u256 networkId() const { return m_networkId; } void setNetworkId(u256 _n) { m_networkId = _n; } - /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. - bool sync(TransactionQueue&, BlockQueue& _bc); - private: /// Session wants to pass us a block that we might not have. /// @returns true if we didn't have it. @@ -76,6 +74,9 @@ private: /// Called when the peer can no longer provide us with any needed blocks. void noteDoneBlocks(); + /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. + void doWork(); + void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock); void maintainBlocks(BlockQueue& _bq, h256 _currentBlock); @@ -90,7 +91,12 @@ private: /// Initialises the network peer-state, doing the stuff that needs to be once-only. @returns true if it really was first. bool ensureInitialised(TransactionQueue& _tq); - BlockChain const* m_chain = nullptr; + virtual void onStarting() { startWorking(); } + virtual void onStopping() { stopWorking(); } + + BlockChain const& m_chain; + TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. + BlockQueue& m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). u256 m_networkId; diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index 5918a7c49..238bc3d8d 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -57,9 +57,9 @@ void EthereumPeer::sendStatus() s.appendList(6) << StatusPacket << host()->protocolVersion() << host()->networkId() - << host()->m_chain->details().totalDifficulty - << host()->m_chain->currentHash() - << host()->m_chain->genesisHash(); + << host()->m_chain.details().totalDifficulty + << host()->m_chain.currentHash() + << host()->m_chain.genesisHash(); sealAndSend(s); } @@ -73,11 +73,11 @@ void EthereumPeer::startInitialSync() sealAndSend(s); } - h256 c = host()->m_chain->currentHash(); - unsigned n = host()->m_chain->number(); - u256 td = max(host()->m_chain->details().totalDifficulty, host()->m_totalDifficultyOfNeeded); + h256 c = host()->m_chain.currentHash(); + unsigned n = host()->m_chain.number(); + u256 td = max(host()->m_chain.details().totalDifficulty, host()->m_totalDifficultyOfNeeded); - clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << host()->m_chain->details().totalDifficulty << "," << host()->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty; + clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << host()->m_chain.details().totalDifficulty << "," << host()->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty; if (td > m_totalDifficulty) return; // All good - we have the better chain. @@ -132,7 +132,7 @@ bool EthereumPeer::interpret(RLP const& _r) clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged(); - if (genesisHash != host()->m_chain->genesisHash()) + if (genesisHash != host()->m_chain.genesisHash()) disable("Invalid genesis hash"); if (m_protocolVersion != host()->protocolVersion()) disable("Invalid protocol version."); @@ -162,12 +162,12 @@ bool EthereumPeer::interpret(RLP const& _r) unsigned limit = _r[2].toInt(); clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later.abridged() << ")"; - unsigned c = min(host()->m_chain->number(later), limit); + unsigned c = min(host()->m_chain.number(later), limit); RLPStream s; prep(s).appendList(1 + c).append(BlockHashesPacket); - h256 p = host()->m_chain->details(later).parent; - for (unsigned i = 0; i < c; ++i, p = host()->m_chain->details(p).parent) + h256 p = host()->m_chain.details(later).parent; + for (unsigned i = 0; i < c; ++i, p = host()->m_chain.details(p).parent) s << p; sealAndSend(s); break; @@ -183,7 +183,7 @@ bool EthereumPeer::interpret(RLP const& _r) for (unsigned i = 1; i < _r.itemCount(); ++i) { auto h = _r[i].toHash(); - if (host()->m_chain->details(h)) + if (host()->m_chain.details(h)) { host()->noteHaveChain(this); return true; @@ -206,7 +206,7 @@ bool EthereumPeer::interpret(RLP const& _r) unsigned n = 0; for (unsigned i = 1; i < _r.itemCount() && i <= c_maxBlocks; ++i) { - auto b = host()->m_chain->block(_r[i].toHash()); + auto b = host()->m_chain.block(_r[i].toHash()); if (b.size()) { rlp += b; @@ -248,7 +248,7 @@ bool EthereumPeer::interpret(RLP const& _r) auto h = BlockInfo::headerHash(_r[i].data()); BlockInfo bi(_r[i].data()); Guard l(x_knownBlocks); - if (!host()->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash)) + if (!host()->m_chain.details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash)) { unknownParents++; clogS(NetAllDetail) << "Unknown parent" << bi.parentHash << "of block" << h; diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index ac3d6bc10..7e9589646 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -17,7 +17,7 @@ /** @file Host.cpp * @authors: * Gav Wood - * Eric Lombrozo + * Eric Lombrozo (Windows version of populateAddresses()) * @date 2014 */ @@ -55,6 +55,7 @@ static const set c_rejectAddresses = { }; Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool _start): + Worker("p2p"), m_clientVersion(_clientVersion), m_netPrefs(_n), m_acceptor(m_ioService), @@ -104,10 +105,20 @@ void Host::start() ensureAccepting(); m_lastPeersRequest = chrono::steady_clock::time_point::min(); clog(NetNote) << "Id:" << m_id.abridged(); + + for (auto const& h: m_capabilities) + h.second->onStarting(); + + startWorking(); } void Host::stop() { + for (auto const& h: m_capabilities) + h.second->onStopping(); + + stopWorking(); + if (m_acceptor.is_open()) { if (m_accepting) @@ -473,7 +484,7 @@ std::vector Host::peers(bool _updatePing) const return ret; } -void Host::process() +void Host::doWork() { growPeers(); prunePeers(); diff --git a/libp2p/Host.h b/libp2p/Host.h index b055b9aeb..57f0a0022 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -29,6 +29,7 @@ #include #include #include +#include #include "HostCapability.h" namespace ba = boost::asio; namespace bi = boost::asio::ip; @@ -55,7 +56,7 @@ struct NetworkPreferences * @brief The Host class * Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe. */ -class Host +class Host: public Worker { friend class Session; friend class HostCapabilityFace; @@ -84,11 +85,6 @@ public: void connect(std::string const& _addr, unsigned short _port = 30303) noexcept; void connect(bi::tcp::endpoint const& _ep); - /// Conduct I/O, polling, syncing, whatever. - /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway. - /// This won't touch alter the blockchain. - void process(); - /// @returns true iff we have the a peer of the given id. bool havePeer(h512 _id) const; @@ -117,12 +113,13 @@ public: void start(); void stop(); + bool isStarted() const { return isWorking(); } h512 id() const { return m_id; } void registerPeer(std::shared_ptr _s, std::vector const& _caps); -protected: +private: /// Called when the session has provided us with a new peer we can connect to. void noteNewPeers() {} @@ -134,6 +131,11 @@ protected: void growPeers(); void prunePeers(); + /// Conduct I/O, polling, syncing, whatever. + /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway. + /// This won't touch alter the blockchain. + virtual void doWork(); + std::map potentialPeers(); std::string m_clientVersion; diff --git a/libp2p/HostCapability.h b/libp2p/HostCapability.h index cf2399b83..1c532788b 100644 --- a/libp2p/HostCapability.h +++ b/libp2p/HostCapability.h @@ -49,6 +49,9 @@ protected: virtual std::string name() const = 0; virtual Capability* newPeerCapability(Session* _s) = 0; + virtual void onStarting() {} + virtual void onStopping() {} + void seal(bytes& _b); private: diff --git a/libwebthree/WebThree.cpp b/libwebthree/WebThree.cpp index 6718ec96d..55e82a36f 100644 --- a/libwebthree/WebThree.cpp +++ b/libwebthree/WebThree.cpp @@ -51,88 +51,34 @@ WebThreeDirect::WebThreeDirect(std::string const& _clientVersion, std::string co WebThreeDirect::~WebThreeDirect() { - stopNetwork(); -} - -void WebThreeDirect::startNetwork() -{ - static const char* c_threadName = "p2p"; - - m_net.start(); - - UpgradableGuard l(x_work); - { - UpgradeGuard ul(l); - - if (!m_work) - m_work.reset(new thread([&]() - { - setThreadName(c_threadName); - m_workState.store(Active, std::memory_order_release); - while (m_workState.load(std::memory_order_acquire) != Deleting) - { - this_thread::sleep_for(chrono::milliseconds(1)); - ReadGuard l(x_work); - m_net.process(); // must be in guard for now since it uses the blockchain. - } - m_workState.store(Deleted, std::memory_order_release); - })); - } -} - -void WebThreeDirect::stopNetwork() -{ - m_net.stop(); - - UpgradableGuard l(x_work); - - if (m_work) - { - if (m_workState.load(std::memory_order_acquire) == Active) - m_workState.store(Deleting, std::memory_order_release); - while (m_workState.load(std::memory_order_acquire) != Deleted) - this_thread::sleep_for(chrono::milliseconds(10)); - m_work->join(); - } - if (m_work) - { - UpgradeGuard ul(l); - m_work.reset(nullptr); - } } std::vector WebThreeDirect::peers() { - ReadGuard l(x_work); return m_net.peers(); } size_t WebThreeDirect::peerCount() const { - ReadGuard l(x_work); return m_net.peerCount(); } void WebThreeDirect::setIdealPeerCount(size_t _n) { - ReadGuard l(x_work); return m_net.setIdealPeerCount(_n); } bytes WebThreeDirect::savePeers() { - ReadGuard l(x_work); return m_net.savePeers(); } void WebThreeDirect::restorePeers(bytesConstRef _saved) { - ReadGuard l(x_work); return m_net.restorePeers(_saved); } void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port) { - ReadGuard l(x_work); m_net.connect(_seedHost, _port); } diff --git a/libwebthree/WebThree.h b/libwebthree/WebThree.h index 4799e7765..da41f9e76 100644 --- a/libwebthree/WebThree.h +++ b/libwebthree/WebThree.h @@ -104,15 +104,15 @@ public: /// Sets the ideal number of peers. void setIdealPeerCount(size_t _n); - bool haveNetwork() const { return !!m_work; } + bool haveNetwork() const { return m_net.isStarted(); } void setNetworkPreferences(p2p::NetworkPreferences const& _n) { auto had = haveNetwork(); if (had) stopNetwork(); m_netPrefs = _n; if (had) startNetwork(); } /// Start the network subsystem. - void startNetwork(); + void startNetwork() { m_net.start(); } /// Stop the network subsystem. - void stopNetwork(); + void stopNetwork() { m_net.stop(); } private: std::string m_clientVersion; ///< Our end-application client's name/version. @@ -121,9 +121,6 @@ private: std::unique_ptr m_whisper; ///< Main interface for Whisper ("shh") protocol. p2p::Host m_net; ///< Should run in background and send us events when blocks found and allow us to send blocks as required. - std::unique_ptr m_work; ///< The network thread. - mutable boost::shared_mutex x_work; ///< Lock for the network existance. - std::atomic m_workState; p2p::NetworkPreferences m_netPrefs; };