From b4191025fd1fb7a3d0bd9bb7e6e518003a7f636d Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Mon, 28 Jul 2014 16:18:12 +0200 Subject: [PATCH] Network has own thread. --- libethereum/Client.cpp | 73 ++++++++++++++++++++++++++---------------- libethereum/Client.h | 41 +++++++++++++----------- 2 files changed, 68 insertions(+), 46 deletions(-) diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index 2c9b4453b..805dcac75 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -99,14 +99,7 @@ void Client::ensureWorking() Client::~Client() { - if (m_work) - { - if (m_workState.load(std::memory_order_acquire) == Active) - m_workState.store(Deleting, std::memory_order_release); - while (m_workState.load(std::memory_order_acquire) != Deleted) - this_thread::sleep_for(chrono::milliseconds(10)); - m_work->join(); - } + stopNetwork(); } void Client::flushTransactions() @@ -196,6 +189,18 @@ void Client::noteChanged(h256Set const& _filters) void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHost, unsigned short _port, NodeMode _mode, unsigned _peers, string const& _publicIP, bool _upnp) { + static const char* c_threadName = "net"; + + if (!m_workNet) + m_workNet.reset(new thread([&]() + { + setThreadName(c_threadName); + m_workNetState.store(Active, std::memory_order_release); + while (m_workNetState.load(std::memory_order_acquire) != Deleting) + workNet(); + m_workNetState.store(Deleted, std::memory_order_release); + })); + ensureWorking(); { @@ -242,8 +247,19 @@ void Client::connect(std::string const& _seedHost, unsigned short _port) void Client::stopNetwork() { - Guard l(x_net); - m_net.reset(nullptr); + { + Guard l(x_net); + m_net.reset(nullptr); + } + + if (m_workNet) + { + if (m_workNetState.load(std::memory_order_acquire) == Active) + m_workNetState.store(Deleting, std::memory_order_release); + while (m_workNetState.load(std::memory_order_acquire) != Deleted) + this_thread::sleep_for(chrono::milliseconds(10)); + m_workNet->join(); + } } void Client::startMining() @@ -303,28 +319,29 @@ void Client::inject(bytesConstRef _rlp) m_tq.attemptImport(_rlp); } -void Client::work(bool _justQueue) +void Client::workNet() { - cworkin << "WORK"; - h256Set changeds; - // Process network events. // Synchronise block chain with network. // Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks. + Guard l(x_net); + if (m_net) { - Guard l(x_net); - if (m_net && !_justQueue) - { - cwork << "NETWORK"; - m_net->process(); // must be in guard for now since it uses the blockchain. + cwork << "NETWORK"; + m_net->process(); // must be in guard for now since it uses the blockchain. - // returns h256Set as block hashes, once for each block that has come in/gone out. - cwork << "NET <==> TQ ; CHAIN ==> NET ==> BQ"; - m_net->sync(m_tq, m_bq); + // returns h256Set as block hashes, once for each block that has come in/gone out. + cwork << "NET <==> TQ ; CHAIN ==> NET ==> BQ"; + m_net->sync(m_tq, m_bq); - cwork << "TQ:" << m_tq.items() << "; BQ:" << m_bq.items(); - } + cwork << "TQ:" << m_tq.items() << "; BQ:" << m_bq.items(); } +} + +void Client::work(bool _justQueue) +{ + cworkin << "WORK"; + h256Set changeds; // Do some mining. if (!_justQueue) @@ -407,7 +424,7 @@ void Client::work(bool _justQueue) cwork << "BQ ==> CHAIN ==> STATE"; OverlayDB db = m_stateDB; - m_lock.unlock(); + x_stateDB.unlock(); h256s newBlocks = m_bc.sync(m_bq, db, 100); // TODO: remove transactions from m_tq nicely rather than relying on out of date nonce later on. if (newBlocks.size()) { @@ -415,7 +432,7 @@ void Client::work(bool _justQueue) appendFromNewBlock(i, changeds); changeds.insert(NewBlockFilter); } - m_lock.lock(); + x_stateDB.lock(); if (newBlocks.size()) m_stateDB = db; @@ -451,12 +468,12 @@ void Client::work(bool _justQueue) void Client::lock() const { - m_lock.lock(); + x_stateDB.lock(); } void Client::unlock() const { - m_lock.unlock(); + x_stateDB.unlock(); } unsigned Client::numberOf(int _n) const diff --git a/libethereum/Client.h b/libethereum/Client.h index 17d1bc149..405415d9f 100644 --- a/libethereum/Client.h +++ b/libethereum/Client.h @@ -291,13 +291,16 @@ public: void clearPending(); private: - /// Ensure the worker thread is running. Needed for networking & mining. + /// Ensure the worker thread is running. Needed for blockchain maintenance & mining. void ensureWorking(); - /// Do some work. Handles networking and mining. + /// Do some work. Handles blockchain maintenance and mining. /// @param _justQueue If true will only processing the transaction queues. void work(bool _justQueue = false); + /// Do some work on the network. + void workNet(); + /// Collate the changed filters for the bloom filter of the given pending transaction. /// Insert any filters that are activated into @a o_changed. void appendFromNewPending(h256 _pendingTransactionBloom, h256Set& o_changed) const; @@ -316,24 +319,26 @@ private: State asOf(int _h) const; State asOf(unsigned _h) const; - std::string m_clientVersion; ///< Our end-application client's name/version. - VersionChecker m_vc; ///< Dummy object to check & update the protocol version. - BlockChain m_bc; ///< Maintains block database. - TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. - BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). - OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it. - State m_preMine; ///< The present state of the client. - State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added). - - mutable std::mutex x_net; ///< Lock for the network. - std::unique_ptr m_net; ///< Should run in background and send us events when blocks found and allow us to send blocks as required. - - std::unique_ptr m_work;///< The work thread. - - mutable std::recursive_mutex m_lock; + std::string m_clientVersion; ///< Our end-application client's name/version. + VersionChecker m_vc; ///< Dummy object to check & update the protocol version. + BlockChain m_bc; ///< Maintains block database. + TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. + BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). + mutable std::recursive_mutex x_stateDB; // TODO: remove in favour of copying m_stateDB as required and thread-safing/copying State. Have a good think about what state objects there should be. Probably want 4 (pre, post, mining, user-visible). + OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it. + State m_preMine; ///< The present state of the client. + State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added). + + std::unique_ptr m_workNet; ///< The network thread. + std::atomic m_workNetState; + mutable std::mutex x_net; ///< Lock for the network. // TODO: make network thread-safe. + std::unique_ptr m_net; ///< Should run in background and send us events when blocks found and allow us to send blocks as required. + + std::unique_ptr m_work; ///< The work thread. std::atomic m_workState; + bool m_paranoia = false; - bool m_doMine = false; ///< Are we supposed to be mining? + bool m_doMine = false; ///< Are we supposed to be mining? MineProgress m_mineProgress; std::list m_mineHistory; mutable bool m_restartMining = false;