Browse Source

Added session to hostcapabilityface::peers to ensure it isn't deallocated when shared-ptr to cap is returned. Previously hosts depended on using Session however this could result in an infinite session when hostcapface::peers is processed concurrently (mutexes can make this a likely event). This will be cleaner with better integration of Session and Peer.

cl-refactor
subtly 10 years ago
parent
commit
572e451bab
  1. 25
      libethereum/EthereumHost.cpp
  2. 10
      libp2p/HostCapability.cpp
  3. 3
      libp2p/HostCapability.h
  4. 2
      libp2p/Session.cpp
  5. 16
      libwhisper/WhisperHost.cpp

25
libethereum/EthereumHost.cpp

@ -51,8 +51,8 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu
EthereumHost::~EthereumHost()
{
for (auto const& i: peers())
i->cap<EthereumPeer>()->abortSync();
for (auto i: peerSessions())
i.first->cap<EthereumPeer>().get()->abortSync();
}
bool EthereumHost::ensureInitialised()
@ -95,16 +95,19 @@ void EthereumHost::changeSyncer(EthereumPeer* _syncer)
if (isSyncing())
{
if (_syncer->m_asking == Asking::Blocks)
for (auto j: peers())
if (j->cap<EthereumPeer>().get() != _syncer && j->cap<EthereumPeer>()->m_asking == Asking::Nothing)
j->cap<EthereumPeer>()->transition(Asking::Blocks);
for (auto j: peerSessions())
{
auto e = j.first->cap<EthereumPeer>().get();
if (e != _syncer && e->m_asking == Asking::Nothing)
e->transition(Asking::Blocks);
}
}
else
{
// start grabbing next hash chain if there is one.
for (auto j: peers())
for (auto j: peerSessions())
{
j->cap<EthereumPeer>()->attemptSync();
j.first->cap<EthereumPeer>()->attemptSync();
if (isSyncing())
return;
}
@ -167,8 +170,8 @@ void EthereumHost::doWork()
void EthereumHost::maintainTransactions()
{
// Send any new transactions.
for (auto const& p: peers())
if (auto ep = p->cap<EthereumPeer>())
for (auto p: peerSessions())
if (auto ep = p.first->cap<EthereumPeer>().get())
{
bytes b;
unsigned n = 0;
@ -198,9 +201,9 @@ void EthereumHost::maintainBlocks(h256 _currentHash)
{
clog(NetMessageSummary) << "Sending a new block (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
for (auto j: peers())
for (auto j: peerSessions())
{
auto p = j->cap<EthereumPeer>();
auto p = j.first->cap<EthereumPeer>().get();
RLPStream ts;
p->prep(ts, NewBlockPacket, 2).appendRaw(m_chain.block(), 1).append(m_chain.details().totalDifficulty);

10
libp2p/HostCapability.cpp

@ -32,13 +32,13 @@ void HostCapabilityFace::seal(bytes& _b)
m_host->seal(_b);
}
std::vector<std::shared_ptr<Session> > HostCapabilityFace::peers() const
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> HostCapabilityFace::peerSessions() const
{
RecursiveGuard l(m_host->x_sessions);
std::vector<std::shared_ptr<Session> > ret;
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> ret;
for (auto const& i: m_host->m_sessions)
if (std::shared_ptr<Session> p = i.second.lock())
if (p->m_capabilities.count(capDesc()))
ret.push_back(p);
if (std::shared_ptr<Session> s = i.second.lock())
if (s->m_capabilities.count(capDesc()))
ret.push_back(make_pair(s,s->m_peer));
return ret;
}

3
libp2p/HostCapability.h

@ -23,6 +23,7 @@
#pragma once
#include "Peer.h"
#include "Common.h"
namespace dev
@ -44,7 +45,7 @@ public:
Host* host() const { return m_host; }
std::vector<std::shared_ptr<Session> > peers() const;
std::vector<std::pair<std::shared_ptr<Session>,std::shared_ptr<Peer>>> peerSessions() const;
protected:
virtual std::string name() const = 0;

2
libp2p/Session.cpp

@ -205,6 +205,8 @@ bool Session::interpret(RLP const& _r)
{
// Already connected.
clogS(NetWarn) << "Already connected to a peer with id" << id.abridged();
// Possible that two nodes continually connect to each other with exact same timing.
this_thread::sleep_for(chrono::milliseconds(rand() % 100));
disconnect(DuplicatePeer);
return true;
}

16
libwhisper/WhisperHost.cpp

@ -79,11 +79,15 @@ void WhisperHost::inject(Envelope const& _m, WhisperPeer* _p)
noteChanged(h, f.first);
}
for (auto& i: peers())
if (i->cap<WhisperPeer>().get() == _p)
i->addRating(1);
// TODO p2p: capability-based rating
for (auto i: peerSessions())
{
auto w = i.first->cap<WhisperPeer>().get();
if (w == _p)
w->addRating(1);
else
i->cap<WhisperPeer>()->noteNewMessage(h, _m);
w->noteNewMessage(h, _m);
}
}
void WhisperHost::noteChanged(h256 _messageHash, h256 _filter)
@ -158,8 +162,8 @@ void WhisperHost::uninstallWatch(unsigned _i)
void WhisperHost::doWork()
{
for (auto& i: peers())
i->cap<WhisperPeer>()->sendMessages();
for (auto& i: peerSessions())
i.first->cap<WhisperPeer>().get()->sendMessages();
cleanup();
}

Loading…
Cancel
Save