Browse Source

Merge pull request #543 from ethereum/network

ioservice with multiple threads
cl-refactor
Gav Wood 10 years ago
parent
commit
fca8873f10
  1. 31
      libp2p/Host.cpp
  2. 6
      libp2p/Host.h
  3. 2
      libwhisper/WhisperHost.cpp
  4. 36
      libwhisper/WhisperPeer.cpp

31
libp2p/Host.cpp

@ -34,6 +34,7 @@
#include <set>
#include <chrono>
#include <thread>
#include <mutex>
#include <boost/algorithm/string.hpp>
#include <libdevcore/Common.h>
#include <libdevcore/CommonIO.h>
@ -199,7 +200,7 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool
m_clientVersion(_clientVersion),
m_netPrefs(_n),
m_ifAddresses(getInterfaceAddresses()),
m_ioService(new ba::io_service),
m_ioService(new ba::io_service(2)),
m_acceptor(new bi::tcp::acceptor(*m_ioService)),
m_socket(new bi::tcp::socket(*m_ioService)),
m_key(KeyPair::create())
@ -227,7 +228,7 @@ void Host::stop()
{
{
// prevent m_run from being set to false at same time as set to true by start()
lock_guard<mutex> l(x_runtimer);
Guard l(x_runTimer);
// once m_run is false the scheduler will shutdown network and stopWorking()
m_run = false;
}
@ -536,7 +537,16 @@ void Host::connect(std::shared_ptr<Node> const& _n)
// if there's no ioService, it means we've had quit() called - bomb out - we're not allowed in here.
if (!m_ioService)
return;
// prevent concurrently connecting to a node; todo: better abstraction
Node *nptr = _n.get();
{
Guard l(x_pendingNodeConns);
if (m_pendingNodeConns.count(nptr))
return;
m_pendingNodeConns.insert(nptr);
}
clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged();
_n->lastAttempted = std::chrono::system_clock::now();
_n->failedAttempts++;
@ -559,6 +569,8 @@ void Host::connect(std::shared_ptr<Node> const& _n)
p->start();
}
delete s;
Guard l(x_pendingNodeConns);
m_pendingNodeConns.erase(nptr);
});
}
@ -685,11 +697,10 @@ PeerInfos Host::peers(bool _updatePing) const
ret.push_back(j->m_info);
return ret;
}
void Host::run(boost::system::error_code const& error)
{
static unsigned s_lasttick = 0;
s_lasttick += c_timerInterval;
m_lastTick += c_timerInterval;
if (error || !m_ioService)
{
@ -701,11 +712,11 @@ void Host::run(boost::system::error_code const& error)
// network running
if (m_run)
{
if (s_lasttick >= c_timerInterval * 50)
if (m_lastTick >= c_timerInterval * 10)
{
growPeers();
prunePeers();
s_lasttick = 0;
m_lastTick = 0;
}
if (m_hadNewNodes)
@ -771,7 +782,7 @@ void Host::run(boost::system::error_code const& error)
m_socket->close();
// m_run is false, so we're stopping; kill timer
s_lasttick = 0;
m_lastTick = 0;
// causes parent thread's stop() to continue which calls stopWorking()
m_timer.reset();
@ -794,7 +805,7 @@ void Host::startedWorking()
// prevent m_run from being set to true at same time as set to false by stop()
// don't release mutex until m_timer is set so in case stop() is called at same
// time, stop will wait on m_timer and graceful network shutdown.
lock_guard<mutex> l(x_runtimer);
Guard l(x_runTimer);
// reset io service and create deadline timer
m_timer.reset(new boost::asio::deadline_timer(*m_ioService));
m_run = true;

6
libp2p/Host.h

@ -224,7 +224,7 @@ private:
Nodes potentialPeers(RangeMask<unsigned> const& _known);
bool m_run = false; ///< Whether network is running.
std::mutex x_runtimer; ///< Start/stop mutex.
std::mutex x_runTimer; ///< Start/stop mutex.
std::string m_clientVersion; ///< Our version string.
@ -241,6 +241,10 @@ private:
std::unique_ptr<boost::asio::deadline_timer> m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms.
static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected.
unsigned m_lastTick = 0; ///< Used by run() for scheduling; must not be mutated outside of run().
std::set<Node*> m_pendingNodeConns; /// Used only by connect(Node&) to limit concurrently connecting to same node. See connect(shared_ptr<Node>const&).
Mutex x_pendingNodeConns;
bi::tcp::endpoint m_public; ///< Our public listening endpoint.
KeyPair m_key; ///< Our unique ID.

2
libwhisper/WhisperHost.cpp

@ -118,7 +118,6 @@ unsigned WhisperHost::installWatch(shh::TopicFilter const& _f)
h256s WhisperHost::watchMessages(unsigned _watchId)
{
cleanup();
h256s ret;
auto wit = m_watches.find(_watchId);
if (wit == m_watches.end())
@ -160,6 +159,7 @@ void WhisperHost::doWork()
{
for (auto& i: peers())
i->cap<WhisperPeer>()->sendMessages();
cleanup();
}
void WhisperHost::cleanup()

36
libwhisper/WhisperPeer.cpp

@ -82,33 +82,23 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r)
void WhisperPeer::sendMessages()
{
RLPStream amalg;
unsigned n = 0;
if (m_unseen.size())
{
Guard l(x_unseen);
while (m_unseen.size())
RLPStream amalg;
unsigned msgCount;
{
auto p = *m_unseen.begin();
m_unseen.erase(m_unseen.begin());
host()->streamMessage(p.second, amalg);
n++;
Guard l(x_unseen);
msgCount = m_unseen.size();
while (m_unseen.size())
{
auto p = *m_unseen.begin();
m_unseen.erase(m_unseen.begin());
host()->streamMessage(p.second, amalg);
}
}
}
// the message subsystem should really just keep pumping out messages while m_unseen.size() and there's bandwidth for them.
auto diff = chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - m_timer);
if (n || diff.count() > 0)
{
RLPStream s;
prep(s, MessagesPacket, n).appendRaw(amalg.out(), n);
sealAndSend(s);
m_timer = chrono::system_clock::now();
}
{
RLPStream s;
prep(s, MessagesPacket, n).appendRaw(amalg.out(), n);
prep(s, MessagesPacket, msgCount).appendRaw(amalg.out(), msgCount);
sealAndSend(s);
}
}

Loading…
Cancel
Save