Browse Source

Network has own thread.

cl-refactor
Gav Wood 11 years ago
parent
commit
b4191025fd
  1. 73
      libethereum/Client.cpp
  2. 41
      libethereum/Client.h

73
libethereum/Client.cpp

@ -99,14 +99,7 @@ void Client::ensureWorking()
Client::~Client() Client::~Client()
{ {
if (m_work) stopNetwork();
{
if (m_workState.load(std::memory_order_acquire) == Active)
m_workState.store(Deleting, std::memory_order_release);
while (m_workState.load(std::memory_order_acquire) != Deleted)
this_thread::sleep_for(chrono::milliseconds(10));
m_work->join();
}
} }
void Client::flushTransactions() void Client::flushTransactions()
@ -196,6 +189,18 @@ void Client::noteChanged(h256Set const& _filters)
void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHost, unsigned short _port, NodeMode _mode, unsigned _peers, string const& _publicIP, bool _upnp) void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHost, unsigned short _port, NodeMode _mode, unsigned _peers, string const& _publicIP, bool _upnp)
{ {
static const char* c_threadName = "net";
if (!m_workNet)
m_workNet.reset(new thread([&]()
{
setThreadName(c_threadName);
m_workNetState.store(Active, std::memory_order_release);
while (m_workNetState.load(std::memory_order_acquire) != Deleting)
workNet();
m_workNetState.store(Deleted, std::memory_order_release);
}));
ensureWorking(); ensureWorking();
{ {
@ -242,8 +247,19 @@ void Client::connect(std::string const& _seedHost, unsigned short _port)
void Client::stopNetwork() void Client::stopNetwork()
{ {
Guard l(x_net); {
m_net.reset(nullptr); Guard l(x_net);
m_net.reset(nullptr);
}
if (m_workNet)
{
if (m_workNetState.load(std::memory_order_acquire) == Active)
m_workNetState.store(Deleting, std::memory_order_release);
while (m_workNetState.load(std::memory_order_acquire) != Deleted)
this_thread::sleep_for(chrono::milliseconds(10));
m_workNet->join();
}
} }
void Client::startMining() void Client::startMining()
@ -303,28 +319,29 @@ void Client::inject(bytesConstRef _rlp)
m_tq.attemptImport(_rlp); m_tq.attemptImport(_rlp);
} }
void Client::work(bool _justQueue) void Client::workNet()
{ {
cworkin << "WORK";
h256Set changeds;
// Process network events. // Process network events.
// Synchronise block chain with network. // Synchronise block chain with network.
// Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks. // Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks.
Guard l(x_net);
if (m_net)
{ {
Guard l(x_net); cwork << "NETWORK";
if (m_net && !_justQueue) m_net->process(); // must be in guard for now since it uses the blockchain.
{
cwork << "NETWORK";
m_net->process(); // must be in guard for now since it uses the blockchain.
// returns h256Set as block hashes, once for each block that has come in/gone out. // returns h256Set as block hashes, once for each block that has come in/gone out.
cwork << "NET <==> TQ ; CHAIN ==> NET ==> BQ"; cwork << "NET <==> TQ ; CHAIN ==> NET ==> BQ";
m_net->sync(m_tq, m_bq); m_net->sync(m_tq, m_bq);
cwork << "TQ:" << m_tq.items() << "; BQ:" << m_bq.items(); cwork << "TQ:" << m_tq.items() << "; BQ:" << m_bq.items();
}
} }
}
void Client::work(bool _justQueue)
{
cworkin << "WORK";
h256Set changeds;
// Do some mining. // Do some mining.
if (!_justQueue) if (!_justQueue)
@ -407,7 +424,7 @@ void Client::work(bool _justQueue)
cwork << "BQ ==> CHAIN ==> STATE"; cwork << "BQ ==> CHAIN ==> STATE";
OverlayDB db = m_stateDB; OverlayDB db = m_stateDB;
m_lock.unlock(); x_stateDB.unlock();
h256s newBlocks = m_bc.sync(m_bq, db, 100); // TODO: remove transactions from m_tq nicely rather than relying on out of date nonce later on. h256s newBlocks = m_bc.sync(m_bq, db, 100); // TODO: remove transactions from m_tq nicely rather than relying on out of date nonce later on.
if (newBlocks.size()) if (newBlocks.size())
{ {
@ -415,7 +432,7 @@ void Client::work(bool _justQueue)
appendFromNewBlock(i, changeds); appendFromNewBlock(i, changeds);
changeds.insert(NewBlockFilter); changeds.insert(NewBlockFilter);
} }
m_lock.lock(); x_stateDB.lock();
if (newBlocks.size()) if (newBlocks.size())
m_stateDB = db; m_stateDB = db;
@ -451,12 +468,12 @@ void Client::work(bool _justQueue)
void Client::lock() const void Client::lock() const
{ {
m_lock.lock(); x_stateDB.lock();
} }
void Client::unlock() const void Client::unlock() const
{ {
m_lock.unlock(); x_stateDB.unlock();
} }
unsigned Client::numberOf(int _n) const unsigned Client::numberOf(int _n) const

41
libethereum/Client.h

@ -291,13 +291,16 @@ public:
void clearPending(); void clearPending();
private: private:
/// Ensure the worker thread is running. Needed for networking & mining. /// Ensure the worker thread is running. Needed for blockchain maintenance & mining.
void ensureWorking(); void ensureWorking();
/// Do some work. Handles networking and mining. /// Do some work. Handles blockchain maintenance and mining.
/// @param _justQueue If true will only processing the transaction queues. /// @param _justQueue If true will only processing the transaction queues.
void work(bool _justQueue = false); void work(bool _justQueue = false);
/// Do some work on the network.
void workNet();
/// Collate the changed filters for the bloom filter of the given pending transaction. /// Collate the changed filters for the bloom filter of the given pending transaction.
/// Insert any filters that are activated into @a o_changed. /// Insert any filters that are activated into @a o_changed.
void appendFromNewPending(h256 _pendingTransactionBloom, h256Set& o_changed) const; void appendFromNewPending(h256 _pendingTransactionBloom, h256Set& o_changed) const;
@ -316,24 +319,26 @@ private:
State asOf(int _h) const; State asOf(int _h) const;
State asOf(unsigned _h) const; State asOf(unsigned _h) const;
std::string m_clientVersion; ///< Our end-application client's name/version. std::string m_clientVersion; ///< Our end-application client's name/version.
VersionChecker m_vc; ///< Dummy object to check & update the protocol version. VersionChecker m_vc; ///< Dummy object to check & update the protocol version.
BlockChain m_bc; ///< Maintains block database. BlockChain m_bc; ///< Maintains block database.
TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported).
OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it. mutable std::recursive_mutex x_stateDB; // TODO: remove in favour of copying m_stateDB as required and thread-safing/copying State. Have a good think about what state objects there should be. Probably want 4 (pre, post, mining, user-visible).
State m_preMine; ///< The present state of the client. OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it.
State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added). State m_preMine; ///< The present state of the client.
State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added).
mutable std::mutex x_net; ///< Lock for the network.
std::unique_ptr<PeerServer> m_net; ///< Should run in background and send us events when blocks found and allow us to send blocks as required. std::unique_ptr<std::thread> m_workNet; ///< The network thread.
std::atomic<ClientWorkState> m_workNetState;
std::unique_ptr<std::thread> m_work;///< The work thread. mutable std::mutex x_net; ///< Lock for the network. // TODO: make network thread-safe.
std::unique_ptr<PeerServer> m_net; ///< Should run in background and send us events when blocks found and allow us to send blocks as required.
mutable std::recursive_mutex m_lock;
std::unique_ptr<std::thread> m_work; ///< The work thread.
std::atomic<ClientWorkState> m_workState; std::atomic<ClientWorkState> m_workState;
bool m_paranoia = false; bool m_paranoia = false;
bool m_doMine = false; ///< Are we supposed to be mining? bool m_doMine = false; ///< Are we supposed to be mining?
MineProgress m_mineProgress; MineProgress m_mineProgress;
std::list<MineInfo> m_mineHistory; std::list<MineInfo> m_mineHistory;
mutable bool m_restartMining = false; mutable bool m_restartMining = false;

Loading…
Cancel
Save