diff --git a/libp2p/Host.cpp b/libp2p/Host.cpp index cad1b179c..7d08910aa 100644 --- a/libp2p/Host.cpp +++ b/libp2p/Host.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -199,7 +200,7 @@ Host::Host(std::string const& _clientVersion, NetworkPreferences const& _n, bool m_clientVersion(_clientVersion), m_netPrefs(_n), m_ifAddresses(getInterfaceAddresses()), - m_ioService(new ba::io_service), + m_ioService(new ba::io_service(2)), m_acceptor(new bi::tcp::acceptor(*m_ioService)), m_socket(new bi::tcp::socket(*m_ioService)), m_key(KeyPair::create()) @@ -227,7 +228,7 @@ void Host::stop() { { // prevent m_run from being set to false at same time as set to true by start() - lock_guard l(x_runtimer); + Guard l(x_runTimer); // once m_run is false the scheduler will shutdown network and stopWorking() m_run = false; } @@ -536,7 +537,16 @@ void Host::connect(std::shared_ptr const& _n) // if there's no ioService, it means we've had quit() called - bomb out - we're not allowed in here. if (!m_ioService) return; - + + // prevent concurrently connecting to a node; todo: better abstraction + Node *nptr = _n.get(); + { + Guard l(x_pendingNodeConns); + if (m_pendingNodeConns.count(nptr)) + return; + m_pendingNodeConns.insert(nptr); + } + clog(NetConnect) << "Attempting connection to node" << _n->id.abridged() << "@" << _n->address << "from" << id().abridged(); _n->lastAttempted = std::chrono::system_clock::now(); _n->failedAttempts++; @@ -559,6 +569,8 @@ void Host::connect(std::shared_ptr const& _n) p->start(); } delete s; + Guard l(x_pendingNodeConns); + m_pendingNodeConns.erase(nptr); }); } @@ -685,11 +697,10 @@ PeerInfos Host::peers(bool _updatePing) const ret.push_back(j->m_info); return ret; } - + void Host::run(boost::system::error_code const& error) { - static unsigned s_lasttick = 0; - s_lasttick += c_timerInterval; + m_lastTick += c_timerInterval; if (error || !m_ioService) { @@ -701,11 +712,11 @@ void Host::run(boost::system::error_code const& error) // network running if (m_run) { - if (s_lasttick >= c_timerInterval * 50) + if (m_lastTick >= c_timerInterval * 10) { growPeers(); prunePeers(); - s_lasttick = 0; + m_lastTick = 0; } if (m_hadNewNodes) @@ -771,7 +782,7 @@ void Host::run(boost::system::error_code const& error) m_socket->close(); // m_run is false, so we're stopping; kill timer - s_lasttick = 0; + m_lastTick = 0; // causes parent thread's stop() to continue which calls stopWorking() m_timer.reset(); @@ -794,7 +805,7 @@ void Host::startedWorking() // prevent m_run from being set to true at same time as set to false by stop() // don't release mutex until m_timer is set so in case stop() is called at same // time, stop will wait on m_timer and graceful network shutdown. - lock_guard l(x_runtimer); + Guard l(x_runTimer); // reset io service and create deadline timer m_timer.reset(new boost::asio::deadline_timer(*m_ioService)); m_run = true; diff --git a/libp2p/Host.h b/libp2p/Host.h index c82ecf84c..644afeb69 100644 --- a/libp2p/Host.h +++ b/libp2p/Host.h @@ -224,7 +224,7 @@ private: Nodes potentialPeers(RangeMask const& _known); bool m_run = false; ///< Whether network is running. - std::mutex x_runtimer; ///< Start/stop mutex. + std::mutex x_runTimer; ///< Start/stop mutex. std::string m_clientVersion; ///< Our version string. @@ -241,6 +241,10 @@ private: std::unique_ptr m_timer; ///< Timer which, when network is running, calls scheduler() every c_timerInterval ms. static const unsigned c_timerInterval = 100; ///< Interval which m_timer is run when network is connected. + unsigned m_lastTick = 0; ///< Used by run() for scheduling; must not be mutated outside of run(). + + std::set m_pendingNodeConns; /// Used only by connect(Node&) to limit concurrently connecting to same node. See connect(shared_ptrconst&). + Mutex x_pendingNodeConns; bi::tcp::endpoint m_public; ///< Our public listening endpoint. KeyPair m_key; ///< Our unique ID. diff --git a/libwhisper/WhisperHost.cpp b/libwhisper/WhisperHost.cpp index 9b26e3260..769b39057 100644 --- a/libwhisper/WhisperHost.cpp +++ b/libwhisper/WhisperHost.cpp @@ -118,7 +118,6 @@ unsigned WhisperHost::installWatch(shh::TopicFilter const& _f) h256s WhisperHost::watchMessages(unsigned _watchId) { - cleanup(); h256s ret; auto wit = m_watches.find(_watchId); if (wit == m_watches.end()) @@ -160,6 +159,7 @@ void WhisperHost::doWork() { for (auto& i: peers()) i->cap()->sendMessages(); + cleanup(); } void WhisperHost::cleanup() diff --git a/libwhisper/WhisperPeer.cpp b/libwhisper/WhisperPeer.cpp index c3a28e3c3..0951a3726 100644 --- a/libwhisper/WhisperPeer.cpp +++ b/libwhisper/WhisperPeer.cpp @@ -82,33 +82,23 @@ bool WhisperPeer::interpret(unsigned _id, RLP const& _r) void WhisperPeer::sendMessages() { - RLPStream amalg; - unsigned n = 0; - + if (m_unseen.size()) { - Guard l(x_unseen); - while (m_unseen.size()) + RLPStream amalg; + unsigned msgCount; { - auto p = *m_unseen.begin(); - m_unseen.erase(m_unseen.begin()); - host()->streamMessage(p.second, amalg); - n++; + Guard l(x_unseen); + msgCount = m_unseen.size(); + while (m_unseen.size()) + { + auto p = *m_unseen.begin(); + m_unseen.erase(m_unseen.begin()); + host()->streamMessage(p.second, amalg); + } } - } - - // the message subsystem should really just keep pumping out messages while m_unseen.size() and there's bandwidth for them. - auto diff = chrono::duration_cast(chrono::system_clock::now() - m_timer); - if (n || diff.count() > 0) - { - RLPStream s; - prep(s, MessagesPacket, n).appendRaw(amalg.out(), n); - sealAndSend(s); - m_timer = chrono::system_clock::now(); - } - - { + RLPStream s; - prep(s, MessagesPacket, n).appendRaw(amalg.out(), n); + prep(s, MessagesPacket, msgCount).appendRaw(amalg.out(), msgCount); sealAndSend(s); } }