Browse Source

Move out worker thread stuff into other class.

p2p::Host is worker rather than WebThree.
Client is single worker.
EthereumHost works.
cl-refactor
Gav Wood 10 years ago
parent
commit
066fc18116
  1. 5
      exp/main.cpp
  2. 63
      libdevcore/Worker.cpp
  3. 49
      libdevcore/Worker.h
  4. 97
      libethereum/Client.cpp
  5. 22
      libethereum/Client.h
  6. 36
      libethereum/EthereumHost.cpp
  7. 18
      libethereum/EthereumHost.h
  8. 28
      libethereum/EthereumPeer.cpp
  9. 15
      libp2p/Host.cpp
  10. 16
      libp2p/Host.h
  11. 3
      libp2p/HostCapability.h
  12. 54
      libwebthree/WebThree.cpp
  13. 9
      libwebthree/WebThree.h

5
exp/main.cpp

@ -326,16 +326,17 @@ int main(int argc, char** argv)
ph.registerCapability(new WhisperHost());
auto wh = ph.cap<WhisperHost>();
ph.start();
if (!remoteHost.empty())
ph.connect(remoteHost, remotePort);
/// Only interested in the packet if the lowest bit is 1
auto w = wh->installWatch(MessageFilter(std::vector<std::pair<bytes, bytes> >({{fromHex("0000000000000000000000000000000000000000000000000000000000000001"), fromHex("0000000000000000000000000000000000000000000000000000000000000001")}})));
for (int i = 0; ; ++i)
{
this_thread::sleep_for(chrono::milliseconds(1000));
ph.process();
wh->sendRaw(h256(u256(i * i)).asBytes(), h256(u256(i)).asBytes(), 1000);
for (auto i: wh->checkWatch(w))
cnote << "New message:" << (u256)h256(wh->message(i).payload);

63
libdevcore/Worker.cpp

@ -0,0 +1,63 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file Worker.cpp
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#include "Worker.h"
#include <chrono>
#include <thread>
#include "Log.h"
using namespace std;
using namespace dev;
void Worker::startWorking()
{
cdebug << "startWorking for thread" << m_name;
Guard l(x_work);
if (m_work)
return;
cdebug << "Spawning" << m_name;
m_stop = false;
m_work.reset(new thread([&]()
{
setThreadName(m_name.c_str());
while (!m_stop)
{
this_thread::sleep_for(chrono::milliseconds(1));
doWork();
}
cdebug << "Finishing up worker thread";
doneWorking();
}));
}
void Worker::stopWorking()
{
cdebug << "stopWorking for thread" << m_name;
Guard l(x_work);
if (!m_work)
return;
cdebug << "Stopping" << m_name;
m_stop = true;
m_work->join();
m_work.reset();
cdebug << "Stopped" << m_name;
}

49
libdevcore/Worker.h

@ -0,0 +1,49 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file Worker.h
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#pragma once
#include <string>
#include <thread>
#include "Guards.h"
namespace dev
{
class Worker
{
protected:
Worker(std::string const& _name): m_name(_name) {}
virtual ~Worker() { stopWorking(); }
void startWorking();
void stopWorking();
bool isWorking() const { Guard l(x_work); return !!m_work; }
virtual void doWork() = 0;
virtual void doneWorking() {}
private:
mutable Mutex x_work; ///< Lock for the network existance.
std::unique_ptr<std::thread> m_work; ///< The network thread.
bool m_stop = false;
std::string m_name;
};
}

97
libethereum/Client.cpp

@ -54,79 +54,47 @@ void VersionChecker::setOk()
}
Client::Client(p2p::Host* _extNet, std::string const& _dbPath, bool _forceClean, u256 _networkId):
Worker("eth"),
m_vc(_dbPath),
m_bc(_dbPath, !m_vc.ok() || _forceClean),
m_stateDB(State::openDB(_dbPath, !m_vc.ok() || _forceClean)),
m_preMine(Address(), m_stateDB),
m_postMine(Address(), m_stateDB)
{
m_extHost = _extNet->registerCapability(new EthereumHost(m_bc, _networkId));
m_host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId));
// setMiningThreads();
if (_dbPath.size())
Defaults::setDBPath(_dbPath);
m_vc.setOk();
work();
doWork();
static const char* c_threadName = "ethsync";
m_workNet.reset(new thread([&]()
{
setThreadName(c_threadName);
m_workNetState.store(Active, std::memory_order_release);
while (m_workNetState.load(std::memory_order_acquire) != Deleting)
workNet();
m_workNetState.store(Deleted, std::memory_order_release);
}));
ensureWorking();
startWorking();
}
Client::~Client()
{
if (m_work)
{
if (m_workState.load(std::memory_order_acquire) == Active)
m_workState.store(Deleting, std::memory_order_release);
while (m_workState.load(std::memory_order_acquire) != Deleted)
this_thread::sleep_for(chrono::milliseconds(10));
m_work->join();
m_work.reset(nullptr);
}
if (m_workNet)
{
if (m_workNetState.load(std::memory_order_acquire) == Active)
m_workNetState.store(Deleting, std::memory_order_release);
while (m_workNetState.load(std::memory_order_acquire) != Deleted)
this_thread::sleep_for(chrono::milliseconds(10));
m_workNet->join();
m_workNet.reset(nullptr);
}
stopWorking();
}
void Client::ensureWorking()
void Client::setNetworkId(u256 _n)
{
static const char* c_threadName = "eth";
if (auto h = m_host.lock())
h->setNetworkId(_n);
}
if (!m_work)
m_work.reset(new thread([&]()
{
setThreadName(c_threadName);
m_workState.store(Active, std::memory_order_release);
while (m_workState.load(std::memory_order_acquire) != Deleting)
work();
m_workState.store(Deleted, std::memory_order_release);
// Synchronise the state according to the head of the block chain.
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
WriteGuard l(x_stateDB);
m_preMine.sync(m_bc);
m_postMine = m_preMine;
}));
void Client::doneWorking()
{
// Synchronise the state according to the head of the block chain.
// TODO: currently it contains keys for *all* blocks. Make it remove old ones.
WriteGuard l(x_stateDB);
m_preMine.sync(m_bc);
m_postMine = m_preMine;
}
void Client::flushTransactions()
{
work();
doWork();
}
void Client::killChain()
@ -223,12 +191,6 @@ void Client::appendFromNewBlock(h256 _block, h256Set& o_changed) const
o_changed.insert(i.first);
}
void Client::setNetworkId(u256 _n)
{
if (auto h = m_extHost.lock())
h->setNetworkId(_n);
}
void Client::setMiningThreads(unsigned _threads)
{
stopMining();
@ -295,7 +257,7 @@ void Client::setupState(State& _s)
void Client::transact(Secret _secret, u256 _value, Address _dest, bytes const& _data, u256 _gas, u256 _gasPrice)
{
ensureWorking();
startWorking();
Transaction t;
// cdebug << "Nonce at " << toAddress(_secret) << " pre:" << m_preMine.transactionsFrom(toAddress(_secret)) << " post:" << m_postMine.transactionsFrom(toAddress(_secret));
@ -337,7 +299,7 @@ bytes Client::call(Secret _secret, u256 _value, Address _dest, bytes const& _dat
Address Client::transact(Secret _secret, u256 _endowment, bytes const& _init, u256 _gas, u256 _gasPrice)
{
ensureWorking();
startWorking();
Transaction t;
{
@ -357,23 +319,12 @@ Address Client::transact(Secret _secret, u256 _endowment, bytes const& _init, u2
void Client::inject(bytesConstRef _rlp)
{
ensureWorking();
startWorking();
m_tq.attemptImport(_rlp);
}
void Client::workNet()
{
// Process network events.
// Synchronise block chain with network.
// Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks.
if (auto h = m_extHost.lock())
h->sync(m_tq, m_bq);
this_thread::sleep_for(chrono::milliseconds(1));
}
void Client::work()
void Client::doWork()
{
// TODO: Use condition variable rather than polling.
@ -430,7 +381,8 @@ void Client::work()
cwork << "preSTATE <== CHAIN";
if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address())
{
cnote << "New block on chain: Restarting mining operation.";
if (isMining())
cnote << "New block on chain: Restarting mining operation.";
m_postMine = m_preMine;
rsm = true;
changeds.insert(PendingChangedFilter);
@ -446,7 +398,8 @@ void Client::work()
appendFromNewPending(i, changeds);
changeds.insert(PendingChangedFilter);
cnote << "Additional transaction ready: Restarting mining operation.";
if (isMining())
cnote << "Additional transaction ready: Restarting mining operation.";
rsm = true;
}
}

22
libethereum/Client.h

@ -29,6 +29,7 @@
#include <libdevcore/Common.h>
#include <libdevcore/CommonIO.h>
#include <libdevcore/Guards.h>
#include <libdevcore/Worker.h>
#include <libevm/FeeStructure.h>
#include <libethcore/Dagger.h>
#include <libp2p/Common.h>
@ -108,7 +109,7 @@ struct WorkChannel: public LogChannel { static const char* name() { return "-W-"
/**
* @brief Main API hub for interfacing with Ethereum.
*/
class Client: public MinerHost, public Interface
class Client: public MinerHost, public Interface, Worker
{
friend class Miner;
@ -217,7 +218,7 @@ public:
unsigned miningThreads() const { ReadGuard l(x_miners); return m_miners.size(); }
/// Start mining.
/// NOT thread-safe - call it & stopMining only from a single thread
void startMining() { ensureWorking(); ReadGuard l(x_miners); for (auto& m: m_miners) m.start(); }
void startMining() { startWorking(); ReadGuard l(x_miners); for (auto& m: m_miners) m.start(); }
/// Stop mining.
/// NOT thread-safe
void stopMining() { ReadGuard l(x_miners); for (auto& m: m_miners) m.stop(); }
@ -238,15 +239,10 @@ public:
void killChain();
private:
/// Ensure the worker thread is running. Needed for blockchain maintenance & mining.
void ensureWorking();
/// Do some work. Handles blockchain maintenance and mining.
/// @param _justQueue If true will only processing the transaction queues.
void work();
virtual void doWork();
/// Syncs the queues with the network.
void workNet();
virtual void doneWorking();
/// Overrides for being a mining host.
virtual void setupState(State& _s);
@ -281,13 +277,7 @@ private:
State m_preMine; ///< The present state of the client.
State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added).
std::unique_ptr<std::thread> m_workNet; ///< The network thread.
std::atomic<ClientWorkState> m_workNetState;
std::weak_ptr<EthereumHost> m_extHost; ///< Our Ethereum Host. Don't do anything if we can't lock.
std::unique_ptr<std::thread> m_work; ///< The work thread.
std::atomic<ClientWorkState> m_workState;
std::weak_ptr<EthereumHost> m_host; ///< Our Ethereum Host. Don't do anything if we can't lock.
std::vector<Miner> m_miners;
mutable boost::shared_mutex x_miners;

36
libethereum/EthereumHost.cpp

@ -39,9 +39,12 @@ using namespace dev;
using namespace dev::eth;
using namespace p2p;
EthereumHost::EthereumHost(BlockChain const& _ch, u256 _networkId):
EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId):
HostCapability<EthereumPeer>(),
m_chain (&_ch),
Worker("ethsync"),
m_chain (_ch),
m_tq (_tq),
m_bq (_bq),
m_networkId (_networkId)
{
m_latestBlockSent = _ch.currentHash();
@ -82,7 +85,7 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq)
if (!m_latestBlockSent)
{
// First time - just initialise.
m_latestBlockSent = m_chain->currentHash();
m_latestBlockSent = m_chain.currentHash();
clog(NetNote) << "Initialising: latest=" << m_latestBlockSent.abridged();
for (auto const& i: _tq.transactions())
@ -102,7 +105,7 @@ void EthereumHost::noteDoneBlocks()
clog(NetNote) << "No more blocks coming. Missing" << m_blocksNeeded.size() << "blocks.";
else
clog(NetNote) << "No more blocks to get.";
m_latestBlockSent = m_chain->currentHash();
m_latestBlockSent = m_chain.currentHash();
}
}
@ -110,7 +113,7 @@ bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data)
{
Guard l(x_blocksNeeded);
m_blocksOnWay.erase(_hash);
if (!m_chain->details(_hash))
if (!m_chain.details(_hash))
{
lock_guard<recursive_mutex> l(m_incomingLock);
m_incomingBlocks.push_back(_data.toBytes());
@ -119,13 +122,14 @@ bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data)
return false;
}
bool EthereumHost::sync(TransactionQueue& _tq, BlockQueue& _bq)
void EthereumHost::doWork()
{
bool netChange = ensureInitialised(_tq);
auto h = m_chain->currentHash();
maintainTransactions(_tq, h);
maintainBlocks(_bq, h);
return netChange;
bool netChange = ensureInitialised(m_tq);
auto h = m_chain.currentHash();
maintainTransactions(m_tq, h);
maintainBlocks(m_bq, h);
// return netChange;
// TODO: Figure out what to do with netChange.
}
void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash)
@ -172,7 +176,7 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
{
lock_guard<recursive_mutex> l(m_incomingLock);
for (auto it = m_incomingBlocks.rbegin(); it != m_incomingBlocks.rend(); ++it)
if (_bq.import(&*it, *m_chain))
if (_bq.import(&*it, m_chain))
{}
else{} // TODO: don't forward it.
m_incomingBlocks.clear();
@ -191,9 +195,9 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
EthereumPeer::prep(ts);
bytes bs;
unsigned c = 0;
for (auto h: m_chain->treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true))
for (auto h: m_chain.treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true))
{
bs += m_chain->block(h);
bs += m_chain.block(h);
++c;
}
clog(NetMessageSummary) << "Sending" << c << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
@ -221,9 +225,9 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from)
if (_from->m_neededBlocks.empty())
return;
clog(NetNote) << "Hash-chain COMPLETE:" << _from->m_totalDifficulty << "vs" << m_chain->details().totalDifficulty << "," << m_totalDifficultyOfNeeded << ";" << _from->m_neededBlocks.size() << " blocks, ends" << _from->m_neededBlocks.back().abridged();
clog(NetNote) << "Hash-chain COMPLETE:" << _from->m_totalDifficulty << "vs" << m_chain.details().totalDifficulty << "," << m_totalDifficultyOfNeeded << ";" << _from->m_neededBlocks.size() << " blocks, ends" << _from->m_neededBlocks.back().abridged();
if ((m_totalDifficultyOfNeeded && td < m_totalDifficultyOfNeeded) || td < m_chain->details().totalDifficulty)
if ((m_totalDifficultyOfNeeded && td < m_totalDifficultyOfNeeded) || td < m_chain.details().totalDifficulty)
{
clog(NetNote) << "Difficulty of hashchain LOWER. Ignoring.";
return;

18
libethereum/EthereumHost.h

@ -29,6 +29,7 @@
#include <utility>
#include <thread>
#include <libdevcore/Guards.h>
#include <libdevcore/Worker.h>
#include <libethcore/CommonEth.h>
#include <libp2p/Common.h>
#include "CommonNet.h"
@ -49,13 +50,13 @@ class BlockQueue;
* @brief The EthereumHost class
* @warning None of this is thread-safe. You have been warned.
*/
class EthereumHost: public p2p::HostCapability<EthereumPeer>
class EthereumHost: public p2p::HostCapability<EthereumPeer>, Worker
{
friend class EthereumPeer;
public:
/// Start server, but don't listen.
EthereumHost(BlockChain const& _ch, u256 _networkId);
EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId);
/// Will block on network process events.
virtual ~EthereumHost();
@ -64,9 +65,6 @@ public:
u256 networkId() const { return m_networkId; }
void setNetworkId(u256 _n) { m_networkId = _n; }
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
bool sync(TransactionQueue&, BlockQueue& _bc);
private:
/// Session wants to pass us a block that we might not have.
/// @returns true if we didn't have it.
@ -76,6 +74,9 @@ private:
/// Called when the peer can no longer provide us with any needed blocks.
void noteDoneBlocks();
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void doWork();
void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock);
void maintainBlocks(BlockQueue& _bq, h256 _currentBlock);
@ -90,7 +91,12 @@ private:
/// Initialises the network peer-state, doing the stuff that needs to be once-only. @returns true if it really was first.
bool ensureInitialised(TransactionQueue& _tq);
BlockChain const* m_chain = nullptr;
virtual void onStarting() { startWorking(); }
virtual void onStopping() { stopWorking(); }
BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
BlockQueue& m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported).
u256 m_networkId;

28
libethereum/EthereumPeer.cpp

@ -57,9 +57,9 @@ void EthereumPeer::sendStatus()
s.appendList(6) << StatusPacket
<< host()->protocolVersion()
<< host()->networkId()
<< host()->m_chain->details().totalDifficulty
<< host()->m_chain->currentHash()
<< host()->m_chain->genesisHash();
<< host()->m_chain.details().totalDifficulty
<< host()->m_chain.currentHash()
<< host()->m_chain.genesisHash();
sealAndSend(s);
}
@ -73,11 +73,11 @@ void EthereumPeer::startInitialSync()
sealAndSend(s);
}
h256 c = host()->m_chain->currentHash();
unsigned n = host()->m_chain->number();
u256 td = max(host()->m_chain->details().totalDifficulty, host()->m_totalDifficultyOfNeeded);
h256 c = host()->m_chain.currentHash();
unsigned n = host()->m_chain.number();
u256 td = max(host()->m_chain.details().totalDifficulty, host()->m_totalDifficultyOfNeeded);
clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << host()->m_chain->details().totalDifficulty << "," << host()->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty;
clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << host()->m_chain.details().totalDifficulty << "," << host()->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty;
if (td > m_totalDifficulty)
return; // All good - we have the better chain.
@ -132,7 +132,7 @@ bool EthereumPeer::interpret(RLP const& _r)
clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged();
if (genesisHash != host()->m_chain->genesisHash())
if (genesisHash != host()->m_chain.genesisHash())
disable("Invalid genesis hash");
if (m_protocolVersion != host()->protocolVersion())
disable("Invalid protocol version.");
@ -162,12 +162,12 @@ bool EthereumPeer::interpret(RLP const& _r)
unsigned limit = _r[2].toInt<unsigned>();
clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later.abridged() << ")";
unsigned c = min<unsigned>(host()->m_chain->number(later), limit);
unsigned c = min<unsigned>(host()->m_chain.number(later), limit);
RLPStream s;
prep(s).appendList(1 + c).append(BlockHashesPacket);
h256 p = host()->m_chain->details(later).parent;
for (unsigned i = 0; i < c; ++i, p = host()->m_chain->details(p).parent)
h256 p = host()->m_chain.details(later).parent;
for (unsigned i = 0; i < c; ++i, p = host()->m_chain.details(p).parent)
s << p;
sealAndSend(s);
break;
@ -183,7 +183,7 @@ bool EthereumPeer::interpret(RLP const& _r)
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
auto h = _r[i].toHash<h256>();
if (host()->m_chain->details(h))
if (host()->m_chain.details(h))
{
host()->noteHaveChain(this);
return true;
@ -206,7 +206,7 @@ bool EthereumPeer::interpret(RLP const& _r)
unsigned n = 0;
for (unsigned i = 1; i < _r.itemCount() && i <= c_maxBlocks; ++i)
{
auto b = host()->m_chain->block(_r[i].toHash<h256>());
auto b = host()->m_chain.block(_r[i].toHash<h256>());
if (b.size())
{
rlp += b;
@ -248,7 +248,7 @@ bool EthereumPeer::interpret(RLP const& _r)
auto h = BlockInfo::headerHash(_r[i].data());
BlockInfo bi(_r[i].data());
Guard l(x_knownBlocks);
if (!host()->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
if (!host()->m_chain.details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
{
unknownParents++;
clogS(NetAllDetail) << "Unknown parent" << bi.parentHash << "of block" << h;

15
libp2p/Host.cpp

@ -17,7 +17,7 @@
/** @file Host.cpp
* @authors:
* Gav Wood <i@gavwood.com>
* Eric Lombrozo <elombrozo@gmail.com>
* Eric Lombrozo <elombrozo@gmail.com> (Windows version of populateAddresses())
* @date 2014
*/
@ -55,6 +55,7 @@ static const set<bi::address> c_rejectAddresses = {
};
Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool _start):
Worker("p2p"),
m_clientVersion(_clientVersion),
m_netPrefs(_n),
m_acceptor(m_ioService),
@ -104,10 +105,20 @@ void Host::start()
ensureAccepting();
m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << m_id.abridged();
for (auto const& h: m_capabilities)
h.second->onStarting();
startWorking();
}
void Host::stop()
{
for (auto const& h: m_capabilities)
h.second->onStopping();
stopWorking();
if (m_acceptor.is_open())
{
if (m_accepting)
@ -473,7 +484,7 @@ std::vector<PeerInfo> Host::peers(bool _updatePing) const
return ret;
}
void Host::process()
void Host::doWork()
{
growPeers();
prunePeers();

16
libp2p/Host.h

@ -29,6 +29,7 @@
#include <utility>
#include <thread>
#include <libdevcore/Guards.h>
#include <libdevcore/Worker.h>
#include "HostCapability.h"
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
@ -55,7 +56,7 @@ struct NetworkPreferences
* @brief The Host class
* Capabilities should be registered prior to startNetwork, since m_capabilities is not thread-safe.
*/
class Host
class Host: public Worker
{
friend class Session;
friend class HostCapabilityFace;
@ -84,11 +85,6 @@ public:
void connect(std::string const& _addr, unsigned short _port = 30303) noexcept;
void connect(bi::tcp::endpoint const& _ep);
/// Conduct I/O, polling, syncing, whatever.
/// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway.
/// This won't touch alter the blockchain.
void process();
/// @returns true iff we have the a peer of the given id.
bool havePeer(h512 _id) const;
@ -117,12 +113,13 @@ public:
void start();
void stop();
bool isStarted() const { return isWorking(); }
h512 id() const { return m_id; }
void registerPeer(std::shared_ptr<Session> _s, std::vector<std::string> const& _caps);
protected:
private:
/// Called when the session has provided us with a new peer we can connect to.
void noteNewPeers() {}
@ -134,6 +131,11 @@ protected:
void growPeers();
void prunePeers();
/// Conduct I/O, polling, syncing, whatever.
/// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway.
/// This won't touch alter the blockchain.
virtual void doWork();
std::map<h512, bi::tcp::endpoint> potentialPeers();
std::string m_clientVersion;

3
libp2p/HostCapability.h

@ -49,6 +49,9 @@ protected:
virtual std::string name() const = 0;
virtual Capability* newPeerCapability(Session* _s) = 0;
virtual void onStarting() {}
virtual void onStopping() {}
void seal(bytes& _b);
private:

54
libwebthree/WebThree.cpp

@ -51,88 +51,34 @@ WebThreeDirect::WebThreeDirect(std::string const& _clientVersion, std::string co
WebThreeDirect::~WebThreeDirect()
{
stopNetwork();
}
void WebThreeDirect::startNetwork()
{
static const char* c_threadName = "p2p";
m_net.start();
UpgradableGuard l(x_work);
{
UpgradeGuard ul(l);
if (!m_work)
m_work.reset(new thread([&]()
{
setThreadName(c_threadName);
m_workState.store(Active, std::memory_order_release);
while (m_workState.load(std::memory_order_acquire) != Deleting)
{
this_thread::sleep_for(chrono::milliseconds(1));
ReadGuard l(x_work);
m_net.process(); // must be in guard for now since it uses the blockchain.
}
m_workState.store(Deleted, std::memory_order_release);
}));
}
}
void WebThreeDirect::stopNetwork()
{
m_net.stop();
UpgradableGuard l(x_work);
if (m_work)
{
if (m_workState.load(std::memory_order_acquire) == Active)
m_workState.store(Deleting, std::memory_order_release);
while (m_workState.load(std::memory_order_acquire) != Deleted)
this_thread::sleep_for(chrono::milliseconds(10));
m_work->join();
}
if (m_work)
{
UpgradeGuard ul(l);
m_work.reset(nullptr);
}
}
std::vector<PeerInfo> WebThreeDirect::peers()
{
ReadGuard l(x_work);
return m_net.peers();
}
size_t WebThreeDirect::peerCount() const
{
ReadGuard l(x_work);
return m_net.peerCount();
}
void WebThreeDirect::setIdealPeerCount(size_t _n)
{
ReadGuard l(x_work);
return m_net.setIdealPeerCount(_n);
}
bytes WebThreeDirect::savePeers()
{
ReadGuard l(x_work);
return m_net.savePeers();
}
void WebThreeDirect::restorePeers(bytesConstRef _saved)
{
ReadGuard l(x_work);
return m_net.restorePeers(_saved);
}
void WebThreeDirect::connect(std::string const& _seedHost, unsigned short _port)
{
ReadGuard l(x_work);
m_net.connect(_seedHost, _port);
}

9
libwebthree/WebThree.h

@ -104,15 +104,15 @@ public:
/// Sets the ideal number of peers.
void setIdealPeerCount(size_t _n);
bool haveNetwork() const { return !!m_work; }
bool haveNetwork() const { return m_net.isStarted(); }
void setNetworkPreferences(p2p::NetworkPreferences const& _n) { auto had = haveNetwork(); if (had) stopNetwork(); m_netPrefs = _n; if (had) startNetwork(); }
/// Start the network subsystem.
void startNetwork();
void startNetwork() { m_net.start(); }
/// Stop the network subsystem.
void stopNetwork();
void stopNetwork() { m_net.stop(); }
private:
std::string m_clientVersion; ///< Our end-application client's name/version.
@ -121,9 +121,6 @@ private:
std::unique_ptr<shh::WhisperHost> m_whisper; ///< Main interface for Whisper ("shh") protocol.
p2p::Host m_net; ///< Should run in background and send us events when blocks found and allow us to send blocks as required.
std::unique_ptr<std::thread> m_work; ///< The network thread.
mutable boost::shared_mutex x_work; ///< Lock for the network existance.
std::atomic<WorkState> m_workState;
p2p::NetworkPreferences m_netPrefs;
};

Loading…
Cancel
Save