Browse Source

Slowing bashing code into shape.

cl-refactor
Gav Wood 10 years ago
parent
commit
9e76b49578
  1. 5
      libethereum/Client.cpp
  2. 9
      libethereum/Client.h
  3. 4
      libethereum/CommonNet.h
  4. 46
      libethereum/EthereumHost.cpp
  5. 5
      libethereum/EthereumHost.h
  6. 76
      libethereum/EthereumPeer.cpp
  7. 18
      libethereum/EthereumPeer.h
  8. 8
      libethereumx/Ethereum.cpp
  9. 4
      libethereumx/Ethereum.h
  10. 47
      libethnet/Common.cpp
  11. 35
      libethnet/Common.h
  12. 13
      libethnet/PeerHost.cpp
  13. 5
      libethnet/PeerHost.h
  14. 18
      libethnet/PeerSession.cpp
  15. 13
      libethnet/PeerSession.h

5
libethereum/Client.cpp

@ -25,6 +25,7 @@
#include <thread>
#include <boost/filesystem.hpp>
#include <libethential/Log.h>
#include <libethnet/PeerHost.h>
#include "Defaults.h"
#include "EthereumHost.h"
using namespace std;
@ -213,7 +214,9 @@ void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHo
try
{
m_net.reset(new EthereumHost(m_clientVersion, m_bc, _networkId, _listenPort, _mode, _publicIP, _upnp));
m_net.reset(new PeerHost(m_clientVersion, _listenPort, _publicIP, _upnp));
if (_mode == NodeMode::Full)
m_net->registerCapability(new EthereumHost(m_bc, _networkId));
}
catch (std::exception const&)
{

9
libethereum/Client.h

@ -28,9 +28,10 @@
#include <boost/utility.hpp>
#include <libethential/Common.h>
#include <libethential/CommonIO.h>
#include <libethential/Guards.h>
#include <libevm/FeeStructure.h>
#include <libethcore/Dagger.h>
#include <libethential/Guards.h>
#include <libethnet/Common.h>
#include "BlockChain.h"
#include "TransactionQueue.h"
#include "State.h"
@ -51,6 +52,12 @@ enum ClientWorkState
Deleted
};
enum class NodeMode
{
Peer,
Full
};
class VersionChecker
{
public:

4
libethereum/CommonNet.h

@ -41,9 +41,9 @@ class OverlayDB;
class BlockChain;
class TransactionQueue;
class EthereumHost;
class EthereumSession;
class EthereumPeer;
enum PacketType
enum
{
StatusPacket = 0x10,
GetTransactionsPacket,

46
libethereum/EthereumHost.cpp

@ -28,26 +28,27 @@
#include <thread>
#include <libethential/Common.h>
#include <libethnet/PeerHost.h>
#include <libethnet/PeerSession.h>
#include <libethcore/UPnP.h>
#include <libethcore/Exceptions.h>
#include "BlockChain.h"
#include "TransactionQueue.h"
#include "BlockQueue.h"
#include "EthereumSession.h"
#include "EthereumPeer.h"
using namespace std;
using namespace eth;
EthereumHost::EthereumHost(BlockChain const& _ch, u256 _networkId):
m_chain(&_ch),
m_networkId(_networkId)
HostCapability<EthereumPeer>(),
m_chain (&_ch),
m_networkId (_networkId)
{
}
EthereumHost::~EthereumHost()
{
for (auto i: host()->m_peers)
if (shared_ptr<EthereumSession> p = i.second.lock())
p->giveUpOnFetch();
for (auto const& i: peers())
i->cap<EthereumPeer>()->giveUpOnFetch();
}
h256Set EthereumHost::neededBlocks(h256Set const& _exclude)
@ -84,7 +85,6 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq)
for (auto const& i: _tq.transactions())
m_transactionsSent.insert(i.first);
m_lastPeersRequest = chrono::steady_clock::time_point::min();
return true;
}
return false;
@ -135,14 +135,13 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
m_incomingTransactions.clear();
// Send any new transactions.
Guard l(host()->x_peers);
for (auto j: host()->m_peers)
if (auto p = j.second.lock())
for (auto const& p: peers())
{
auto ep = p->cap<EthereumPeer>();
bytes b;
uint n = 0;
for (auto const& i: _tq.transactions())
if ((!m_transactionsSent.count(i.first) && !p->m_knownTransactions.count(i.first)) || p->m_requireTransactions || resendAll)
if ((!m_transactionsSent.count(i.first) && !ep->m_knownTransactions.count(i.first)) || ep->m_requireTransactions || resendAll)
{
b += i.second;
++n;
@ -151,14 +150,14 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
if (n)
{
RLPStream ts;
EthereumSession::prep(ts);
EthereumPeer::prep(ts);
ts.appendList(n + 1) << TransactionsPacket;
ts.appendRaw(b, n).swapOut(b);
seal(b);
p->send(&b);
ep->send(&b);
}
p->m_knownTransactions.clear();
p->m_requireTransactions = false;
ep->m_knownTransactions.clear();
ep->m_requireTransactions = false;
}
}
@ -184,7 +183,7 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
if (m_latestBlockSent != _currentHash)
{
RLPStream ts;
EthereumSession::prep(ts);
EthereumPeer::prep(ts);
bytes bs;
unsigned c = 0;
for (auto h: m_chain->treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true))
@ -198,10 +197,9 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
ts.swapOut(b);
seal(b);
Guard l(host()->x_peers);
for (auto j: host()->m_peers)
if (auto p = j.second.lock())
for (auto j: peers())
{
auto p = j->cap<EthereumPeer>();
if (!p->m_knownBlocks.count(_currentHash))
p->send(&b);
p->m_knownBlocks.clear();
@ -210,7 +208,7 @@ void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
}
}
void EthereumHost::noteHaveChain(std::shared_ptr<EthereumSession> const& _from)
void EthereumHost::noteHaveChain(EthereumPeer* _from)
{
auto td = _from->m_totalDifficulty;
@ -235,10 +233,6 @@ void EthereumHost::noteHaveChain(std::shared_ptr<EthereumSession> const& _from)
m_totalDifficultyOfNeeded = td;
}
{
Guard l(host()->x_peers);
for (auto const& i: host()->m_peers)
if (shared_ptr<EthereumSession> p = i.second.lock())
p->restartGettingChain();
}
for (auto j: peers())
j->cap<EthereumPeer>()->restartGettingChain();
}

5
libethereum/EthereumHost.h

@ -32,13 +32,14 @@
#include <libethcore/CommonEth.h>
#include <libethnet/Common.h>
#include "CommonNet.h"
#include "EthereumSession.h"
#include "EthereumPeer.h"
namespace eth
{
class RLPStream;
class TransactionQueue;
class BlockQueue;
/**
* @brief The EthereumHost class
@ -66,7 +67,7 @@ private:
/// @returns true if we didn't have it.
bool noteBlock(h256 _hash, bytesConstRef _data);
/// Session has finished getting the chain of hashes.
void noteHaveChain(std::shared_ptr<EthereumSession> const& _who);
void noteHaveChain(EthereumPeer* _who);
/// Called when the peer can no longer provide us with any needed blocks.
void noteDoneBlocks();

76
libethereum/EthereumSession.cpp → libethereum/EthereumPeer.cpp

@ -14,24 +14,25 @@
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file EthereumSession.cpp
/** @file EthereumPeer.cpp
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#include "EthereumSession.h"
#include "EthereumPeer.h"
#include <chrono>
#include <libethential/Common.h>
#include <libethcore/Exceptions.h>
#include <libethnet/PeerSession.h>
#include "BlockChain.h"
#include "EthereumHost.h"
using namespace std;
using namespace eth;
#define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
#define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << session()->id() << "] "
EthereumPeer::EthereumPeer(PeerSession* _s, HostCapability* _h): PeerCapability(_s, _h)
EthereumPeer::EthereumPeer(PeerSession* _s, HostCapabilityFace* _h): PeerCapability(_s, _h)
{
sendStatus();
}
@ -41,16 +42,21 @@ EthereumPeer::~EthereumPeer()
giveUpOnFetch();
}
EthereumHost* EthereumPeer::host() const
{
return static_cast<EthereumHost*>(PeerCapability::hostCapability());
}
void EthereumPeer::sendStatus()
{
RLPStream s;
m_session->prep(s);
prep(s);
s.appendList(9) << StatusPacket
<< hostCapability()->protocolVersion()
<< hostCapability()->networkId()
<< hostCapability()->m_chain->details().totalDifficulty
<< hostCapability()->m_chain->currentHash()
<< hostCapability()->m_chain->genesisHash();
<< host()->protocolVersion()
<< host()->networkId()
<< host()->m_chain->details().totalDifficulty
<< host()->m_chain->currentHash()
<< host()->m_chain->genesisHash();
sealAndSend(s);
}
@ -64,11 +70,11 @@ void EthereumPeer::startInitialSync()
sealAndSend(s);
}
h256 c = m_server->m_chain->currentHash();
uint n = m_server->m_chain->number();
u256 td = max(m_server->m_chain->details().totalDifficulty, m_server->m_totalDifficultyOfNeeded);
h256 c = host()->m_chain->currentHash();
uint n = host()->m_chain->number();
u256 td = max(host()->m_chain->details().totalDifficulty, host()->m_totalDifficultyOfNeeded);
clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << m_server->m_chain->details().totalDifficulty << "," << m_server->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty;
clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << host()->m_chain->details().totalDifficulty << "," << host()->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty;
if (td > m_totalDifficulty)
return; // All good - we have the better chain.
@ -97,13 +103,13 @@ void EthereumPeer::giveUpOnFetch()
clogS(NetNote) << "GIVE UP FETCH; can't get " << toString(m_askedBlocks);
if (m_askedBlocks.size())
{
Guard l (m_server->x_blocksNeeded);
m_server->m_blocksNeeded.reserve(m_server->m_blocksNeeded.size() + m_askedBlocks.size());
Guard l (host()->x_blocksNeeded);
host()->m_blocksNeeded.reserve(host()->m_blocksNeeded.size() + m_askedBlocks.size());
for (auto i: m_askedBlocks)
{
m_failedBlocks.insert(i);
m_server->m_blocksOnWay.erase(i);
m_server->m_blocksNeeded.push_back(i);
host()->m_blocksOnWay.erase(i);
host()->m_blocksNeeded.push_back(i);
}
m_askedBlocks.clear();
}
@ -123,11 +129,11 @@ bool EthereumPeer::interpret(RLP const& _r)
clogS(NetMessageSummary) << "Status: " << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged();
if (genesisHash != hostCapability()->m_chain->genesisHash())
if (genesisHash != host()->m_chain->genesisHash())
disable("Invalid genesis hash");
if (m_protocolVersion != hostCapability()->protocolVersion())
if (m_protocolVersion != host()->protocolVersion())
disable("Invalid protocol version.");
if (m_networkId != hostCapability()->networkId() || !m_id)
if (m_networkId != host()->networkId())
disable("Invalid network identifier.");
startInitialSync();
@ -140,10 +146,10 @@ bool EthereumPeer::interpret(RLP const& _r)
}
case TransactionsPacket:
clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << " entries)";
m_rating += _r.itemCount() - 1;
addRating(_r.itemCount() - 1);
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
m_server->m_incomingTransactions.push_back(_r[i].data().toBytes());
host()->m_incomingTransactions.push_back(_r[i].data().toBytes());
m_knownTransactions.insert(sha3(_r[i].data()));
}
break;
@ -153,12 +159,12 @@ bool EthereumPeer::interpret(RLP const& _r)
unsigned limit = _r[2].toInt<unsigned>();
clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries, " << later.abridged() << ")";
unsigned c = min<unsigned>(m_server->m_chain->number(later), limit);
unsigned c = min<unsigned>(host()->m_chain->number(later), limit);
RLPStream s;
prep(s).appendList(1 + c).append(BlockHashesPacket);
h256 p = m_server->m_chain->details(later).parent;
for (unsigned i = 0; i < c; ++i, p = m_server->m_chain->details(p).parent)
h256 p = host()->m_chain->details(later).parent;
for (unsigned i = 0; i < c; ++i, p = host()->m_chain->details(p).parent)
s << p;
sealAndSend(s);
break;
@ -168,15 +174,15 @@ bool EthereumPeer::interpret(RLP const& _r)
clogS(NetMessageSummary) << "BlockHashes (" << dec << (_r.itemCount() - 1) << " entries)";
if (_r.itemCount() == 1)
{
m_server->noteHaveChain(shared_from_this());
host()->noteHaveChain(this);
return true;
}
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
auto h = _r[i].toHash<h256>();
if (m_server->m_chain->details(h))
if (host()->m_chain->details(h))
{
m_server->noteHaveChain(shared_from_this());
host()->noteHaveChain(this);
return true;
}
else
@ -197,7 +203,7 @@ bool EthereumPeer::interpret(RLP const& _r)
unsigned n = 0;
for (unsigned i = 1; i < _r.itemCount() && i <= c_maxBlocks; ++i)
{
auto b = m_server->m_chain->block(_r[i].toHash<h256>());
auto b = host()->m_chain->block(_r[i].toHash<h256>());
if (b.size())
{
rlp += b;
@ -223,12 +229,12 @@ bool EthereumPeer::interpret(RLP const& _r)
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
if (m_server->noteBlock(h, _r[i].data()))
if (host()->noteBlock(h, _r[i].data()))
used++;
m_askedBlocks.erase(h);
m_knownBlocks.insert(h);
}
m_rating += used;
addRating(used);
unsigned knownParents = 0;
unsigned unknownParents = 0;
if (g_logVerbosity >= NetMessageSummary::verbosity)
@ -237,7 +243,7 @@ bool EthereumPeer::interpret(RLP const& _r)
{
auto h = BlockInfo::headerHash(_r[i].data());
BlockInfo bi(_r[i].data());
if (!m_server->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
if (!host()->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
{
unknownParents++;
clogS(NetAllDetail) << "Unknown parent " << bi.parentHash << " of block " << h;
@ -282,7 +288,7 @@ void EthereumPeer::ensureGettingChain()
void EthereumPeer::continueGettingChain()
{
if (!m_askedBlocks.size())
m_askedBlocks = m_server->neededBlocks(m_failedBlocks);
m_askedBlocks = host()->neededBlocks(m_failedBlocks);
if (m_askedBlocks.size())
{
@ -296,6 +302,6 @@ void EthereumPeer::continueGettingChain()
else
{
clogS(NetMessageSummary) << "No blocks left to get. Peer doesn't seem to have " << m_failedBlocks.size() << "of our needed blocks.";
m_server->noteDoneBlocks();
host()->noteDoneBlocks();
}
}

18
libethereum/EthereumSession.h → libethereum/EthereumPeer.h

@ -14,7 +14,7 @@
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file EthereumSession.h
/** @file EthereumPeer.h
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
@ -28,13 +28,15 @@
#include <utility>
#include <libethential/RLP.h>
#include <libethcore/CommonEth.h>
#include <libethnet/Common.h>
#include "CommonNet.h"
#include "EthereumHost.h"
namespace eth
{
class WhisperSession: public PeerSession
class HostCapabilityFace;
class WhisperSession: public PeerCapability
{
public:
WhisperSession();
@ -43,11 +45,11 @@ public:
static std::string name() { return "shh"; }
private:
virtual bool interpret(RLP const&) {}
virtual bool interpret(RLP const&) { return false; }
};
/**
* @brief The EthereumSession class
* @brief The EthereumPeer class
* @todo Document fully.
*/
class EthereumPeer: public PeerCapability
@ -55,12 +57,12 @@ class EthereumPeer: public PeerCapability
friend class EthereumHost;
public:
EthereumPeer(PeerSession* _s, HostCapability* _h);
EthereumPeer(PeerSession* _s, HostCapabilityFace* _h);
virtual ~EthereumPeer();
static std::string name() { return "eth"; }
EthereumHost* hostCapability() const { return static_cast<EthereumHost*>(PeerCapability::hostCapability()); }
EthereumHost* host() const;
private:
virtual bool interpret(RLP const& _r);
@ -77,8 +79,6 @@ private:
void giveUpOnFetch();
EthereumHost* m_host;
uint m_protocolVersion;
u256 m_networkId;

8
libethereumx/Ethereum.cpp

@ -38,11 +38,11 @@ void Ethereum::ensureReady()
{
m_client = unique_ptr<Client>(new Client("+ethereum+"));
if (m_client)
startServer();
startRPCServer();
}
catch (DatabaseAlreadyOpen)
{
startClient();
connectToRPCServer();
}
}
@ -55,11 +55,11 @@ bool Ethereum::connectionOpen() const
return false;
}
void Ethereum::startClient()
void Ethereum::connectToRPCServer()
{
}
void Ethereum::startServer()
void Ethereum::startRPCServer()
{
}

4
libethereumx/Ethereum.h

@ -132,9 +132,9 @@ private:
/// Check to see if the client/server connection is open.
bool connectionOpen() const;
/// Start the API client.
void startClient();
void connectToRPCServer();
/// Start the API server.
void startServer();
void startRPCServer();
std::unique_ptr<Client> m_client;

47
libethnet/Common.cpp

@ -20,6 +20,10 @@
*/
#include "Common.h"
#include <libethential/RLP.h>
#include "PeerSession.h"
#include "PeerHost.h"
using namespace std;
using namespace eth;
@ -61,6 +65,47 @@ std::string eth::reasonOf(DisconnectReason _r)
void PeerCapability::disable(std::string const& _problem)
{
clogS(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem;
clog(NetConnect) << "Disabling capability '" << m_host->name() << "'. Reason:" << _problem;
m_enabled = false;
}
void HostCapabilityFace::seal(bytes& _b)
{
m_host->seal(_b);
}
std::vector<std::shared_ptr<PeerSession> > HostCapabilityFace::peers() const
{
Guard l(m_host->x_peers);
std::vector<std::shared_ptr<PeerSession> > ret;
for (auto const& i: m_host->m_peers)
if (std::shared_ptr<PeerSession> p = i.second.lock())
if (p->m_capabilities.count(name()))
ret.push_back(p);
return ret;
}
RLPStream& PeerCapability::prep(RLPStream& _s)
{
return PeerSession::prep(_s);
}
void PeerCapability::sealAndSend(RLPStream& _s)
{
m_session->sealAndSend(_s);
}
void PeerCapability::sendDestroy(bytes& _msg)
{
m_session->sendDestroy(_msg);
}
void PeerCapability::send(bytesConstRef _msg)
{
m_session->send(_msg);
}
void PeerCapability::addRating(unsigned _r)
{
m_session->addRating(_r);
}

35
libethnet/Common.h

@ -38,6 +38,7 @@ namespace eth
bool isPrivateAddress(bi::address _addressToCheck);
class RLP;
class RLPStream;
class PeerHost;
class PeerSession;
@ -72,6 +73,7 @@ enum DisconnectReason
TooManyPeers,
DuplicatePeer,
IncompatibleProtocol,
InvalidIdentity,
ClientQuit,
UserReason = 0x10
};
@ -94,35 +96,41 @@ class PeerCapability;
class HostCapabilityFace
{
friend class PeerHost;
template <class T> friend class HostCapability;
friend class PeerCapability;
public:
HostCapabilityFace(PeerHost*) {}
HostCapabilityFace() {}
virtual ~HostCapabilityFace() {}
PeerHost* host() const { return m_host; }
std::vector<std::shared_ptr<PeerSession> > peers() const;
protected:
virtual std::string name() const = 0;
virtual PeerCapability* newPeerCapability(PeerSession* _s) = 0;
virtual bool isInitialised() const = 0;
void seal(bytes& _b);
private:
PeerHost* m_host = nullptr;
};
template<class PeerCap>
class HostCapability: public HostCapabilityFace
{
public:
HostCapability(PeerHost* _h): m_host(_h) {}
HostCapability() {}
virtual ~HostCapability() {}
static std::string staticName() { return PeerCap::name(); }
PeerHost* host() const { return m_host; }
protected:
virtual bool isInitialised() const = 0;
virtual std::string name() const { return PeerCap::name(); }
virtual PeerCapability* newPeerCapability(PeerSession* _s) { return new PeerCap(_s, this); }
private:
PeerHost* m_host;
};
class PeerCapability
@ -130,23 +138,30 @@ class PeerCapability
friend class PeerSession;
public:
PeerCapability(PeerSession* _s, HostCapability* _h): m_session(_s), m_host(_h) {}
PeerCapability(PeerSession* _s, HostCapabilityFace* _h): m_session(_s), m_host(_h) {}
virtual ~PeerCapability() {}
/// Must return the capability name.
static std::string name() { return ""; }
PeerSession* session() const { return m_session; }
HostCapability* hostCapability() const { return m_host; }
HostCapabilityFace* hostCapability() const { return m_host; }
protected:
virtual bool interpret(RLP const&) = 0;
void disable(std::string const& _problem);
static RLPStream& prep(RLPStream& _s);
void sealAndSend(RLPStream& _s);
void sendDestroy(bytes& _msg);
void send(bytesConstRef _msg);
void addRating(unsigned _r);
private:
PeerSession* m_session;
HostCapability* m_host;
HostCapabilityFace* m_host;
bool m_enabled = true;
};

13
libethnet/PeerHost.cpp

@ -62,6 +62,7 @@ PeerHost::PeerHost(std::string const& _clientVersion, unsigned short _port, stri
populateAddresses();
determinePublic(_publicAddress, _upnp);
ensureAccepting();
m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4));
}
@ -78,6 +79,7 @@ PeerHost::PeerHost(std::string const& _clientVersion, string const& _publicAddre
populateAddresses();
determinePublic(_publicAddress, _upnp);
ensureAccepting();
m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4));
}
@ -90,6 +92,7 @@ PeerHost::PeerHost(std::string const& _clientVersion):
{
// populate addresses.
populateAddresses();
m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << toHex(m_key.address().ref().cropped(0, 4));
}
@ -103,15 +106,15 @@ unsigned PeerHost::protocolVersion() const
return 0;
}
void PeerHost::registerPeer(std::shared_ptr<PeerSession> _s)
void PeerHost::registerPeer(std::shared_ptr<PeerSession> _s, vector<string> const& _caps)
{
{
Guard l(x_peers);
m_peers[_s->m_id] = _s;
}
for (auto const& i: _s->m_caps)
for (auto const& i: _caps)
if (haveCapability(i))
_s->m_capabilities.push_back(shared_ptr<PeerCapability>(m_capabilities[i]->newPeerCapability(_s.get())));
_s->m_capabilities[i] = shared_ptr<PeerCapability>(m_capabilities[i]->newPeerCapability(_s.get()));
}
void PeerHost::disconnectPeers()
@ -453,8 +456,8 @@ std::vector<PeerInfo> PeerHost::peers(bool _updatePing) const
void PeerHost::process()
{
for (auto const& i: m_capabilities)
if (!i->isInitialised())
return false;
if (!i.second->isInitialised())
return;
growPeers();
prunePeers();
m_ioService.poll();

5
libethnet/PeerHost.h

@ -48,6 +48,7 @@ class BlockQueue;
class PeerHost
{
friend class PeerSession;
friend class HostCapabilityFace;
public:
/// Start server, listening for connections on the given port.
@ -67,7 +68,7 @@ public:
unsigned protocolVersion() const;
/// Register a peer-capability; all new peer connections will have this capability.
template <class T> void registerCapability() { m_capabilities[T::name()] = std::shared_ptr<HostCapabilityFace>(new T(this)); }
template <class T> void registerCapability(T* _t) { _t->m_host = this; m_capabilities[T::name()] = std::shared_ptr<HostCapabilityFace>(_t); }
/// Connect to a peer explicitly.
void connect(std::string const& _addr, unsigned short _port = 30303) noexcept;
@ -102,7 +103,7 @@ public:
/// Deserialise the data and populate the set of known peers.
void restorePeers(bytesConstRef _b);
void registerPeer(std::shared_ptr<PeerSession> _s);
void registerPeer(std::shared_ptr<PeerSession> _s, std::vector<std::string> const& _caps);
bool haveCapability(std::string const& _name) const { return m_capabilities.count(_name); }
std::vector<std::string> caps() const { std::vector<std::string> ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }

18
libethnet/PeerSession.cpp

@ -45,7 +45,7 @@ PeerSession::~PeerSession()
{
// Read-chain finished for one reason or another.
for (auto& i: m_capabilities)
i.reset();
i.second.reset();
try
{
@ -76,11 +76,11 @@ bool PeerSession::interpret(RLP const& _r)
{
m_protocolVersion = _r[1].toInt<uint>();
auto clientVersion = _r[2].toString();
m_caps = _r[3].toVector<string>();
auto caps = _r[3].toVector<string>();
m_listenPort = _r[4].toInt<unsigned short>();
m_id = _r[5].toHash<h512>();
clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "]" << m_id.abridged() << showbase << hex << m_caps << dec << m_listenPort;
clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "]" << m_id.abridged() << showbase << hex << caps << dec << m_listenPort;
if (m_server->havePeer(m_id))
{
@ -89,8 +89,12 @@ bool PeerSession::interpret(RLP const& _r)
disconnect(DuplicatePeer);
return false;
}
if (m_protocolVersion != m_server->protocolVersion() || !m_id)
if (!m_id)
{
disconnect(InvalidIdentity);
return false;
}
if (m_protocolVersion != m_server->protocolVersion())
{
disconnect(IncompatibleProtocol);
return false;
@ -103,7 +107,7 @@ bool PeerSession::interpret(RLP const& _r)
return false;
}
m_server->registerPeer(shared_from_this());
m_server->registerPeer(shared_from_this(), caps);
break;
}
case DisconnectPacket:
@ -180,7 +184,7 @@ bool PeerSession::interpret(RLP const& _r)
break;
default:
for (auto const& i: m_capabilities)
if (i->m_enabled && i->interpret(_r))
if (i.second->m_enabled && i.second->interpret(_r))
return true;
return false;
}

13
libethnet/PeerSession.h

@ -40,6 +40,7 @@ namespace eth
class PeerSession: public std::enable_shared_from_this<PeerSession>
{
friend class PeerHost;
friend class HostCapabilityFace;
public:
PeerSession(PeerHost* _server, bi::tcp::socket _socket, bi::address _peerAddress, unsigned short _peerPort = 0);
@ -52,13 +53,20 @@ public:
bool isOpen() const { return m_socket.is_open(); }
unsigned id() const { return m_socket.native_handle(); }
bi::tcp::endpoint endpoint() const; ///< for other peers to connect to.
template <class PeerCap>
PeerCap* cap() const { try { return static_cast<PeerCap*>(m_capabilities.at(PeerCap::name())); } catch (...) { return nullptr; } }
static RLPStream& prep(RLPStream& _s);
void sealAndSend(RLPStream& _s);
void sendDestroy(bytes& _msg);
void send(bytesConstRef _msg);
void addRating(unsigned _r) { m_rating += _r; }
private:
void dropped();
void doRead();
@ -77,7 +85,7 @@ private:
std::recursive_mutex m_writeLock;
std::deque<bytes> m_writeQueue;
bi::tcp::socket m_socket;
mutable bi::tcp::socket m_socket; ///< Mutable to ask for native_handle().
std::array<byte, 65536> m_data;
PeerInfo m_info;
Public m_id;
@ -85,7 +93,6 @@ private:
bytes m_incoming;
uint m_protocolVersion;
unsigned short m_listenPort; ///< Port that the remote client is listening on for connections. Useful for giving to peers.
std::vector<std::string> m_caps;
std::chrono::steady_clock::time_point m_ping;
std::chrono::steady_clock::time_point m_connect;
@ -93,7 +100,7 @@ private:
uint m_rating;
std::vector<std::shared_ptr<PeerCapability>> m_capabilities;
std::map<std::string, std::shared_ptr<PeerCapability>> m_capabilities;
bool m_willBeDeleted = false; ///< True if we already posted a deleter on the strand.
};

Loading…
Cancel
Save