Browse Source

GetBlockHashesByNumber packet

cl-refactor
arkpar 10 years ago
parent
commit
597f56843b
  1. 3
      libethereum/Client.cpp
  2. 1
      libethereum/CommonNet.h
  3. 1
      libethereum/DownloadMan.cpp
  4. 12
      libethereum/DownloadMan.h
  5. 146
      libethereum/EthereumHost.cpp
  6. 51
      libethereum/EthereumHost.h
  7. 224
      libethereum/EthereumPeer.cpp
  8. 40
      libethereum/EthereumPeer.h
  9. 1
      libp2p/Capability.h
  10. 6
      libp2p/Host.cpp
  11. 4
      libp2p/HostCapability.h
  12. 2
      libwhisper/WhisperPeer.cpp
  13. 3
      libwhisper/WhisperPeer.h
  14. 2
      test/libp2p/capability.cpp

3
libethereum/Client.cpp

@ -185,10 +185,9 @@ Client::Client(p2p::Host* _extNet, std::shared_ptr<GasPricer> _gp, std::string c
m_gp->update(m_bc);
auto host = _extNet->registerCapability(new EthereumHost(m_bc, m_tq, m_bq, _networkId));
m_host = host;
_extNet->addCapability(host, EthereumHost::staticName(), EthereumHost::staticVersion() - 1);
_extNet->addCapability(host, EthereumHost::staticName(), EthereumHost::c_oldProtocolVersion); //TODO: remove this one v61+ protocol is common
if (_dbPath.size())
Defaults::setDBPath(_dbPath);

1
libethereum/CommonNet.h

@ -63,6 +63,7 @@ enum
GetBlocksPacket,
BlocksPacket,
NewBlockPacket,
GetBlockHashesByNumberPacket,
PacketCount
};

1
libethereum/DownloadMan.cpp

@ -122,5 +122,6 @@ void HashDownloadSub::noteHash(unsigned _index, unsigned _size)
Guard l(m_fetch);
if (m_man)
for(unsigned i = _index; i < _index + _size; ++i)
if (i >= m_man->m_got.all().first && i < m_man->m_got.all().second)
m_man->m_got += i;
}

12
libethereum/DownloadMan.h

@ -234,12 +234,6 @@ public:
m_chainStart = _start;
m_chainCount = 0;
m_got = RangeMask<unsigned>(_start, _start);
{
ReadGuard l(x_subs);
for (auto i: m_subs)
i->resetFetch();
}
}
RangeMask<unsigned> taken(bool _desperate = false) const
@ -259,7 +253,7 @@ public:
{
ReadGuard l(m_lock);
return m_got.full();
}\
}
size_t chainSize() const { ReadGuard l(m_lock); return m_chainCount; }
size_t chainEmpty() const { ReadGuard l(m_lock); return m_chainCount == 0; }
@ -277,10 +271,6 @@ private:
std::unordered_set<HashDownloadSub*> m_subs;
};
}
}

146
libethereum/EthereumHost.cpp

@ -27,6 +27,7 @@
#include <libp2p/Host.h>
#include <libp2p/Session.h>
#include <libethcore/Exceptions.h>
#include <libethcore/Params.h>
#include "BlockChain.h"
#include "TransactionQueue.h"
#include "BlockQueue.h"
@ -37,7 +38,7 @@ using namespace dev;
using namespace dev::eth;
using namespace p2p;
const unsigned c_prevProtocolVersion = 60;
unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common
EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId):
HostCapability<EthereumPeer>(),
@ -71,12 +72,6 @@ bool EthereumHost::ensureInitialised()
return false;
}
void EthereumHost::noteNeedsSyncing(EthereumPeer* _who)
{
if (_who->m_asking == Asking::Nothing)
continueSync(_who);
}
void EthereumHost::reset()
{
forEachPeer([](EthereumPeer* _p) { _p->abortSync(); });
@ -88,7 +83,7 @@ void EthereumHost::reset()
m_syncingTotalDifficulty = 0;
m_latestBlockSent = h256();
m_transactionsSent.clear();
m_v60Hashes.clear();
m_hashes.clear();
}
void EthereumHost::doWork()
@ -130,7 +125,7 @@ void EthereumHost::maintainTransactions()
}
for (auto const& t: ts)
m_transactionsSent.insert(t.first);
forEachPeer([&](shared_ptr<EthereumPeer> _p)
forEachPeerPtr([&](shared_ptr<EthereumPeer> _p)
{
bytes b;
unsigned n = 0;
@ -153,29 +148,27 @@ void EthereumHost::maintainTransactions()
});
}
void EthereumHost::forEachPeer(std::function<void(EthereumPeer*)> const& _f)
void EthereumHost::forEachPeer(std::function<void(EthereumPeer*)> const& _f) const
{
forEachPeer([&](std::shared_ptr<EthereumPeer> _p)
forEachPeerPtr([&](std::shared_ptr<EthereumPeer> _p)
{
if (_p)
_f(_p.get());
});
}
void EthereumHost::forEachPeer(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f)
void EthereumHost::forEachPeerPtr(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f) const
{
for (auto s: peerSessions())
_f(s.first->cap<EthereumPeer>());
for (auto s: peerSessions(protocolVersion() - 1)) //TODO:
_f(s.first->cap<EthereumPeer>(protocolVersion() - 1));
for (auto s: peerSessions(c_oldProtocolVersion)) //TODO: remove once v61+ is common
_f(s.first->cap<EthereumPeer>(c_oldProtocolVersion));
}
pair<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>> EthereumHost::randomSelection(unsigned _percent, std::function<bool(EthereumPeer*)> const& _allow)
{
pair<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>> ret;
vector<shared_ptr<EthereumPeer>> peers;
forEachPeer([&](shared_ptr<EthereumPeer> _p)
forEachPeerPtr([&](shared_ptr<EthereumPeer> _p)
{
if (_p && _allow(_p.get()))
ret.second.push_back(_p);
@ -233,16 +226,12 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash)
}
}
void EthereumHost::onPeerState(EthereumPeer* _peer)
void EthereumHost::onPeerStatus(EthereumPeer* _peer)
{
if (!_peer->enabled())
{
clog(NetNote) << "Ignoring status from disabled peer";
return;
}
Guard l(x_sync);
if (_peer->m_genesisHash != m_chain.genesisHash())
_peer->disable("Invalid genesis hash");
else if (_peer->m_protocolVersion != protocolVersion())// && _peer->m_protocolVersion != c_prevProtocolVersion)
else if (_peer->m_protocolVersion != protocolVersion() && _peer->m_protocolVersion != c_oldProtocolVersion)
_peer->disable("Invalid protocol version.");
else if (_peer->m_networkId != networkId())
_peer->disable("Invalid network identifier.");
@ -252,16 +241,44 @@ void EthereumHost::onPeerState(EthereumPeer* _peer)
_peer->disable("Peer banned for previous bad behaviour.");
else
{
_peer->m_expectedHashes = 500000; //TODO:
if (_peer->m_protocolVersion != protocolVersion())
estimatePeerHashes(_peer);
else if (_peer->m_latestBlockNumber > m_chain.number())
_peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number();
if (m_hashMan.chainSize() < _peer->m_expectedHashes)
m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes);
continueSync(_peer);
}
}
void EthereumHost::estimatePeerHashes(EthereumPeer* _peer)
{
BlockInfo block = m_chain.info();
time_t lastBlockTime = (block.hash() == m_chain.genesisHash()) ? 1428192000 : (time_t)block.timestamp;
time_t now = time(0);
unsigned blockCount = 1000;
if (lastBlockTime > now)
clog(NetWarn) << "Clock skew? Latest block is in the future";
else
blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit;
clog(NetAllDetail) << "Estimated hashes: " << blockCount;
_peer->m_expectedHashes = blockCount;
}
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
{
Guard l(x_sync);
assert(_peer->m_asking == Asking::Nothing);
onPeerHashes(_peer, _hashes, false);
}
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete)
{
if (_hashes.empty())
{
onPeerDoneHashes(_peer, true);
return;
}
unsigned knowns = 0;
unsigned unknowns = 0;
h256s neededBlocks;
@ -273,8 +290,8 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h))
{
clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download...";
m_v60Hashes += neededBlocks;
onPeerDoneHashes(_peer, false);
m_hashes += neededBlocks;
onPeerDoneHashes(_peer, true);
return;
}
else if (status == QueueStatus::Bad)
@ -292,7 +309,7 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
knowns++;
m_syncingLatestHash = h;
}
m_v60Hashes += neededBlocks;
m_hashes += neededBlocks;
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash;
if (_complete)
{
@ -312,6 +329,13 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s const& _hashes)
{
Guard l(x_sync);
assert(_peer->m_asking == Asking::Nothing);
if (_hashes.empty())
{
onPeerDoneHashes(_peer, true);
return;
}
unsigned knowns = 0;
unsigned unknowns = 0;
h256s neededBlocks;
@ -322,11 +346,11 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s
auto status = m_bq.blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h))
{
clog(NetWarn) << "block hash alrady known:" << h;
clog(NetWarn) << "block hash already known:" << h;
}
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
clog(NetWarn) << "block hash bad!" << h << ". Bailing...";
_peer->setIdle();
return;
}
@ -337,10 +361,9 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s
}
else
knowns++;
m_syncingLatestHash = h;
}
m_man.appendToChain(neededBlocks);
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash;
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
if (m_hashMan.isComplete())
{
@ -356,33 +379,30 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s
continueSync(_peer);
}
void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _new)
void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain)
{
assert(_peer->m_asking == Asking::Nothing);
m_needSyncHashes = false;
if (_peer->m_protocolVersion == protocolVersion() || _new)
if (_peer->m_protocolVersion != protocolVersion() || _localChain)
{
continueSync(_peer);
m_man.resetToChain(m_hashes);
m_hashes.clear();
}
else
{
m_man.resetToChain(m_v60Hashes);
continueSync();
}
}
void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
if (!_peer->enabled())
{
clog(NetNote) << "Ignoring blocks from disabled peer";
return;
}
Guard l(x_sync);
assert(_peer->m_asking == Asking::Nothing);
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
if (itemCount == 0)
{
// Got to this peer's latest block - just give up.
clog(NetNote) << "Finishing blocks fetch...";
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
_peer->setIdle();
return;
}
@ -550,9 +570,9 @@ void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
void EthereumHost::continueSync()
{
clog(NetAllDetail) << "Getting help with downloading hashes and blocks";
forEachPeer([&](EthereumPeer* _p)
{
clog(NetNote) << "Getting help with downloading hashes and blocks";
if (_p->m_asking == Asking::Nothing)
continueSync(_p);
});
@ -560,29 +580,23 @@ void EthereumHost::continueSync()
void EthereumHost::continueSync(EthereumPeer* _peer)
{
assert(_peer->m_asking == Asking::Nothing);
bool otherPeerSync = false;
bool thisPeerSync = false;
if (m_needSyncHashes && peerShouldGrabChain(_peer))
{
forEachPeer([&](EthereumPeer* _p)
{
if (_p->m_asking == Asking::Hashes && _p->m_protocolVersion != protocolVersion())
{
// Already have a peer downloading hash chain with old protocol, do nothing
if (_p == _peer)
thisPeerSync = true;
else
otherPeerSync = true;
}
if (_p != _peer && _p->m_asking == Asking::Hashes && _p->m_protocolVersion != protocolVersion())
otherPeerSync = true; // Already have a peer downloading hash chain with old protocol, do nothing
});
if (otherPeerSync)
{
/// Downloading from other peer with v60 protocol, nothing ese we can do
_peer->setIdle();
return;
}
if (_peer->m_protocolVersion == protocolVersion())
_peer->requestHashes();
if (_peer->m_protocolVersion == protocolVersion() && !m_syncingLatestHash)
_peer->requestHashes(); /// v61+ and not catching up to a particular hash
else
{
// Restart/continue sync in single peer mode
@ -606,11 +620,9 @@ bool EthereumHost::peerShouldGrabBlocks(EthereumPeer* _peer) const
auto lh = m_syncingLatestHash;
auto ctd = m_chain.details().totalDifficulty;
clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd;
clog(NetAllDetail) << "Should grab blocks? " << td << "vs" << ctd;
if (td < ctd || (td == ctd && m_chain.currentHash() == lh))
return false;
return true;
}
@ -632,3 +644,15 @@ bool EthereumHost::peerShouldGrabChain(EthereumPeer* _peer) const
return true;
}
}
bool EthereumHost::isSyncing() const
{
Guard l(x_sync);
bool syncing = false;
forEachPeer([&](EthereumPeer* _p)
{
if (_p->m_asking != Asking::Nothing)
syncing = true;
});
return syncing;
}

51
libethereum/EthereumHost.h

@ -56,7 +56,6 @@ class BlockQueue;
*/
class EthereumHost: public p2p::HostCapability<EthereumPeer>, Worker
{
friend class EthereumPeer;
public:
/// Start server, but don't listen.
EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId);
@ -71,22 +70,30 @@ public:
void reset();
DownloadMan const& downloadMan() const { return m_man; }
bool isSyncing() const { return m_needSyncBlocks || m_needSyncHashes; }
bool isSyncing() const;
bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); }
void noteNewTransactions() { m_newTransactions = true; }
void noteNewBlocks() { m_newBlocks = true; }
void onPeerState(EthereumPeer* _peer);
void onPeerStatus(EthereumPeer* _peer); ///< Called by peer to report status
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks during syn
void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has new hashes
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has another sequential block of hashes during sync
void onPeerHashes(EthereumPeer* _peer, unsigned _index, h256s const& _hashes); ///< Called by peer once it has a new ordered block of hashes starting with a particular number
void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); ///< Called by peer when it has new transactions
DownloadMan& downloadMan() { return m_man; }
HashDownloadMan& hashDownloadMan() { return m_hashMan; }
BlockChain const& chain() { return m_chain; }
static unsigned const c_oldProtocolVersion;
private:
std::pair<std::vector<std::shared_ptr<EthereumPeer>>, std::vector<std::shared_ptr<EthereumPeer>>> randomSelection(unsigned _percent = 25, std::function<bool(EthereumPeer*)> const& _allow = [](EthereumPeer const*){ return true; });
void forEachPeer(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f);
void forEachPeer(std::function<void(EthereumPeer*)> const& _f);
/// Session is tell us that we may need (re-)syncing with the peer.
void noteNeedsSyncing(EthereumPeer* _who);
void forEachPeerPtr(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f) const;
void forEachPeer(std::function<void(EthereumPeer*)> const& _f) const;
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void doWork();
@ -108,15 +115,13 @@ private:
virtual void onStarting() { startWorking(); }
virtual void onStopping() { stopWorking(); }
void changeSyncer(EthereumPeer* _ignore, bool _needHelp = true);
void continueSync();
void continueSync(EthereumPeer* _peer);
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r);
void onPeerDoneHashes(EthereumPeer* _peer, bool _new);
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes);
void onPeerHashes(EthereumPeer* _peer, unsigned _index, h256s const& _hashes);
void continueSync(); /// Find something to do for all peers
void continueSync(EthereumPeer* _peer); /// Find some work to do for a peer
void onPeerDoneHashes(EthereumPeer* _peer, bool _new); /// Called when done downloading hashes from peer
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete);
bool peerShouldGrabBlocks(EthereumPeer* _peer) const;
bool peerShouldGrabChain(EthereumPeer* _peer) const;
void estimatePeerHashes(EthereumPeer* _peer);
BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
@ -124,8 +129,6 @@ private:
u256 m_networkId;
std::weak_ptr<EthereumPeer> m_hashSyncer;
DownloadMan m_man;
HashDownloadMan m_hashMan;
@ -137,14 +140,12 @@ private:
bool m_newTransactions = false;
bool m_newBlocks = false;
unsigned m_maxKnownNumber = 0;
u256 m_maxKnownDifficulty;
bool m_needSyncHashes = true;
bool m_needSyncBlocks = true;
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
h256s m_v60Hashes;
mutable Mutex x_sync;
bool m_needSyncHashes = true; ///< Indicates if need to downlad hashes
bool m_needSyncBlocks = true; ///< Indicates if we still need to download some blocks
h256 m_syncingLatestHash; ///< Latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty, as of the current sync.
h256s m_hashes; ///< List of hashes with unknown block numbers. Used for v60 chain downloading and catching up to a particular unknown
};
}

224
libethereum/EthereumPeer.cpp

@ -34,12 +34,14 @@ using namespace dev;
using namespace dev::eth;
using namespace p2p;
EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i):
EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap):
Capability(_s, _h, _i),
m_sub(host()->m_man),
m_hashSub(host()->m_hashMan)
m_sub(host()->downloadMan()),
m_hashSub(host()->hashDownloadMan()),
m_peerCapabilityVersion(_cap.second)
{
requestState();
m_syncHashNumber = host()->chain().number() + 1;
requestStatus();
}
EthereumPeer::~EthereumPeer()
@ -78,55 +80,44 @@ string toString(Asking _a)
void EthereumPeer::setIdle()
{
if (m_asking == Asking::Blocks)
{
clog(NetNote) << "Finishing blocks fetch...";
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
m_sub.doneFetch();
m_hashSub.doneFetch();
setAsking(Asking::Nothing);
}
else if (m_asking == Asking::Hashes)
{
clog(NetNote) << "Finishing hashes fetch...";
setAsking(Asking::Nothing);
}
else if (m_asking == Asking::State)
{
setAsking(Asking::Nothing);
}
}
void EthereumPeer::requestState()
void EthereumPeer::requestStatus()
{
if (m_asking != Asking::Nothing)
clog(NetWarn) << "Bad state: requesting state should be the first action";
setAsking(Asking::State);
RLPStream s;
prep(s, StatusPacket, 5)
<< host()->protocolVersion() - 1
bool latest = m_peerCapabilityVersion == host()->protocolVersion();
prep(s, StatusPacket, latest ? 6 : 5)
<< (latest ? host()->protocolVersion() : EthereumHost::c_oldProtocolVersion)
<< host()->networkId()
<< host()->m_chain.details().totalDifficulty
<< host()->m_chain.currentHash()
<< host()->m_chain.genesisHash();
<< host()->chain().details().totalDifficulty
<< host()->chain().currentHash()
<< host()->chain().genesisHash();
if (latest)
s << u256(host()->chain().number());
sealAndSend(s);
}
void EthereumPeer::requestHashes()
{
assert(m_asking != Asking::Blocks);
m_syncHashNumber = m_hashSub.nextFetch(c_maxBlocksAsk);
if (m_asking == Asking::Blocks)
return;
m_syncHashNumber = m_hashSub.nextFetch(c_maxHashesAsk);
setAsking(Asking::Hashes);
RLPStream s;
prep(s, GetBlockHashesPacket, 2) << m_syncHashNumber << c_maxHashesAsk;
prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << c_maxHashesAsk;
sealAndSend(s);
}
void EthereumPeer::requestHashes(h256 const& _lastHash)
{
assert(m_asking != Asking::Blocks);
if (m_asking == Asking::Blocks)
return;
setAsking(Asking::Hashes);
RLPStream s;
prep(s, GetBlockHashesPacket, 2) << _lastHash << c_maxHashesAsk;
@ -135,8 +126,7 @@ void EthereumPeer::requestHashes(h256 const& _lastHash)
void EthereumPeer::requestBlocks()
{
// Looks like it's the best yet for total difficulty. Set to download.
setAsking(Asking::Blocks); // will kick off other peers to help if available.
setAsking(Asking::Blocks);
auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
if (blocks.size())
{
@ -154,31 +144,12 @@ void EthereumPeer::requestBlocks()
void EthereumPeer::setAsking(Asking _a)
{
m_asking = _a;
if (!isSyncing())
{
m_syncingLatestHash = h256();
m_syncingTotalDifficulty = 0;
m_syncingNeededBlocks.clear();
}
m_lastAsk = chrono::system_clock::now();
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : ""));
}
void EthereumPeer::setNeedsSyncing(h256 _latestHash, u256 _td)
{
m_latestHash = _latestHash;
m_totalDifficulty = _td;
if (m_latestHash)
host()->noteNeedsSyncing(this);
session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : ""));
}
void EthereumPeer::tick()
{
if (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing)
@ -200,62 +171,55 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
case StatusPacket:
{
m_protocolVersion = _r[0].toInt<unsigned>();
if (!!session()->cap<EthereumPeer>(EthereumHost::staticVersion()))
m_protocolVersion = host()->protocolVersion();
m_networkId = _r[1].toInt<u256>();
// a bit dirty as we're misusing these to communicate the values to transition, but harmless.
m_totalDifficulty = _r[2].toInt<u256>();
m_latestHash = _r[3].toHash<h256>();
m_genesisHash = _r[4].toHash<h256>();
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
host()->onPeerState(this);
if (m_peerCapabilityVersion == host()->protocolVersion())
{
m_protocolVersion = host()->protocolVersion();
m_latestBlockNumber = _r[5].toInt<u256>();
}
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
setAsking(Asking::Nothing);
host()->onPeerStatus(this);
break;
}
case TransactionsPacket:
{
unsigned itemCount = _r.itemCount();
clog(NetAllDetail) << "Transactions (" << dec << itemCount << "entries)";
Guard l(x_knownTransactions);
for (unsigned i = 0; i < itemCount; ++i)
{
auto h = sha3(_r[i].data());
m_knownTransactions.insert(h);
ImportResult ir = host()->m_tq.import(_r[i].data());
switch (ir)
{
case ImportResult::Malformed:
addRating(-100);
break;
case ImportResult::AlreadyKnown:
// if we already had the transaction, then don't bother sending it on.
host()->m_transactionsSent.insert(h);
addRating(0);
break;
case ImportResult::Success:
addRating(100);
break;
default:;
}
}
host()->onPeerTransactions(this, _r);
break;
}
case GetBlockHashesPacket:
{
if (m_protocolVersion == host()->protocolVersion())
h256 later = _r[0].toHash<h256>();
unsigned limit = _r[1].toInt<unsigned>();
clog(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later << ")";
unsigned c = min<unsigned>(host()->chain().number(later), limit);
RLPStream s;
prep(s, BlockHashesPacket, c);
h256 p = host()->chain().details(later).parent;
for (unsigned i = 0; i < c && p; ++i, p = host()->chain().details(p).parent)
s << p;
sealAndSend(s);
addRating(0);
break;
}
case GetBlockHashesByNumberPacket:
{
u256 number256 = _r[0].toInt<u256>();
unsigned number = (unsigned) number256;
unsigned limit = _r[1].toInt<unsigned>();
clog(NetMessageSummary) << "GetBlockHashes (" << number << "-" << number + limit << ")";
clog(NetMessageSummary) << "GetBlockHashesByNumber (" << number << "-" << number + limit << ")";
RLPStream s;
if (number <= host()->m_chain.number())
if (number <= host()->chain().number())
{
unsigned c = min<unsigned>(host()->m_chain.number() - number + 1, limit);
unsigned c = min<unsigned>(host()->chain().number() - number + 1, limit);
prep(s, BlockHashesPacket, c);
for (unsigned n = number; n < number + c; n++)
{
h256 p = host()->m_chain.numberHash(n);
h256 p = host()->chain().numberHash(n);
s << p;
}
}
@ -263,24 +227,6 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
prep(s, BlockHashesPacket, 0);
sealAndSend(s);
addRating(0);
}
else
{
// Support V60 protocol
h256 later = _r[0].toHash<h256>();
unsigned limit = _r[1].toInt<unsigned>();
clog(NetMessageSummary) << "GetBlockHashes (" << limit << "entries," << later << ")";
unsigned c = min<unsigned>(host()->m_chain.number(later), limit);
RLPStream s;
prep(s, BlockHashesPacket, c);
h256 p = host()->m_chain.details(later).parent;
for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent)
s << p;
sealAndSend(s);
addRating(0);
}
break;
}
case BlockHashesPacket:
@ -290,14 +236,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
if (m_asking != Asking::Hashes)
{
cwarn << "Peer giving us hashes when we didn't ask for them.";
clog(NetWarn) << "Peer giving us hashes when we didn't ask for them.";
break;
}
if (itemCount == 0)
{
host()->onPeerDoneHashes(this, false);
return true;
}
setAsking(Asking::Nothing);
h256s hashes(itemCount);
for (unsigned i = 0; i < itemCount; ++i)
{
@ -306,10 +248,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
}
if (m_protocolVersion == host()->protocolVersion())
{
//v61, report hashes ordered by number
host()->onPeerHashes(this, m_syncHashNumber, hashes);
}
host()->onPeerHashes(this, m_syncHashNumber, hashes); // V61+, report hashes by number
else
host()->onPeerHashes(this, hashes);
m_syncHashNumber += itemCount;
@ -332,9 +271,9 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
for (unsigned i = 0; i < min(count, c_maxBlocks); ++i)
{
auto h = _r[i].toHash<h256>();
if (host()->m_chain.isKnown(h))
if (host()->chain().isKnown(h))
{
rlp += host()->m_chain.block(_r[i].toHash<h256>());
rlp += host()->chain().block(_r[i].toHash<h256>());
++n;
}
}
@ -351,54 +290,21 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
}
case BlocksPacket:
{
if (m_asking != Asking::Blocks)
clog(NetWarn) << "Peer giving us blocks when we didn't ask for them.";
else
{
setAsking(Asking::Nothing);
host()->onPeerBlocks(this, _r);
}
break;
}
case NewBlockPacket:
{
auto h = BlockInfo::headerHash(_r[0].data());
clog(NetMessageSummary) << "NewBlock: " << h;
if (_r.itemCount() != 2)
disable("NewBlock without 2 data fields.");
else
{
switch (host()->m_bq.import(_r[0].data(), host()->m_chain))
{
case ImportResult::Success:
addRating(100);
break;
case ImportResult::FutureTime:
//TODO: Rating dependent on how far in future it is.
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
disable("Malformed block received.");
return true;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::UnknownParent:
clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
setNeedsSyncing(h, _r[1].toInt<u256>());
break;
default:;
}
DEV_GUARDED(x_knownBlocks)
m_knownBlocks.insert(h);
}
host()->onPeerNewBlock(this, _r);
break;
}
case NewBlockHashesPacket:
{
clog(NetMessageSummary) << "NewBlockHashes";
if (host()->isSyncing())
clog(NetMessageSummary) << "Ignoring since we're already downloading.";
else
{
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "BlockHashes (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreHashes");
@ -407,11 +313,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = _r[i].toHash<h256>();
clog(NetNote) << "Not syncing and new block hash discovered: syncing without help.";
host()->onPeerHashes(this, hashes);
host()->onPeerDoneHashes(this, true);
return true;
}
host()->onPeerNewHashes(this, hashes);
break;
}
default:

40
libethereum/EthereumPeer.h

@ -49,11 +49,11 @@ namespace eth
*/
class EthereumPeer: public p2p::Capability
{
friend class EthereumHost;
friend class EthereumHost; //TODO: remove this
public:
/// Basic constructor.
EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h, unsigned _i);
EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h, unsigned _i, p2p::CapDesc const& _cap);
/// Basic destructor.
virtual ~EthereumPeer();
@ -70,10 +70,16 @@ public:
/// What is the ethereum subprotocol host object.
EthereumHost* host() const;
/// Abort sync and reset fetch
void setIdle();
void requestState();
/// Request hashes. Uses hash download manager to get hash number. v61+ protocol version only
void requestHashes();
/// Request hashes for given parent hash.
void requestHashes(h256 const& _lastHash);
/// Request blocks. Uses block download manager.
void requestBlocks();
private:
@ -82,6 +88,9 @@ private:
/// Interpret an incoming message.
virtual bool interpret(unsigned _id, RLP const& _r);
/// Request status. Called from constructor
void requestStatus();
/// Abort the sync operation.
void abortSync();
@ -91,24 +100,18 @@ private:
/// Update our asking state.
void setAsking(Asking _g);
/// Update our syncing requirements state.
void setNeedsSyncing(h256 _latestHash, u256 _td);
void resetNeedsSyncing() { setNeedsSyncing(h256(), 0); }
/// Do we presently need syncing with this peer?
bool needsSyncing() const { return !!m_latestHash; }
/// Are we presently syncing with this peer?
bool isSyncing() const;
/// Check whether the session should bother grabbing the peer's blocks.
bool shouldGrabBlocks() const;
/// Runs period checks to check up on the peer.
void tick();
/// Peer's protocol version.
unsigned m_protocolVersion;
/// Peer's network id.
u256 m_networkId;
@ -117,28 +120,24 @@ private:
/// When we asked for it. Allows a time out.
std::chrono::system_clock::time_point m_lastAsk;
/// Whether this peer is in the process of syncing or not. Only one peer can be syncing at once.
bool m_isSyncing = false;
/// These are determined through either a Status message or from NewBlock.
h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync.
u256 m_totalDifficulty; ///< Peer's latest block's total difficulty.
h256 m_genesisHash; ///< Peer's genesis hash
/// Once a sync is started on this peer, they are cleared and moved into m_syncing*.
u256 m_latestBlockNumber; ///< Number of the latest block this peer has
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.
h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
unsigned m_expectedHashes = 0; ///< Estimated Upper bound of hashes to expect from this peer.
unsigned m_syncHashNumber = 0;
unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer.
unsigned m_syncHashNumber = 0; ///< Number of latest hash we sync to
/// Once we're asking for blocks, this becomes in use.
DownloadSub m_sub;
/// Once we're asking for hashes, this becomes in use.
HashDownloadSub m_hashSub;
u256 m_peerCapabilityVersion; ///< Protocol version this peer supports received as capability
/// Have we received a GetTransactions packet that we haven't yet answered?
bool m_requireTransactions = false;
@ -146,7 +145,6 @@ private:
h256Hash m_knownBlocks; ///< Blocks that the peer already knows about (that don't need to be sent to them).
Mutex x_knownTransactions;
h256Hash m_knownTransactions; ///< Transactions that the peer already knows of.
};
}

1
libp2p/Capability.h

@ -44,7 +44,6 @@ public:
*/
Session* session() const { return m_session; }
HostCapabilityFace* hostCapability() const { return m_host; }
bool enabled() { return m_enabled; }
protected:
virtual bool interpret(unsigned _id, RLP const&) = 0;

6
libp2p/Host.cpp

@ -202,6 +202,10 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
// clang error (previously: ... << hex << caps ...)
// "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments"
stringstream capslog;
if (caps.size() > 1)
caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r){ return any_of(caps.begin(), caps.end(), [&](CapDesc const& _o){ return _r.first == _o.first && _o.second > _r.second; }); }), caps.end());
for (auto cap: caps)
capslog << "(" << cap.first << "," << dec << cap.second << ")";
clog(NetMessageSummary) << "Hello: " << clientVersion << "V[" << protocolVersion << "]" << _id << showbase << capslog.str() << dec << listenPort;
@ -237,7 +241,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameIO* _io
for (auto const& i: caps)
if (haveCapability(i))
{
ps->m_capabilities[i] = shared_ptr<Capability>(m_capabilities[i]->newPeerCapability(ps.get(), o));
ps->m_capabilities[i] = shared_ptr<Capability>(m_capabilities[i]->newPeerCapability(ps.get(), o, i));
o += m_capabilities[i]->messageCount();
}
ps->start();

4
libp2p/HostCapability.h

@ -53,7 +53,7 @@ protected:
virtual u256 version() const = 0;
CapDesc capDesc() const { return std::make_pair(name(), version()); }
virtual unsigned messageCount() const = 0;
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset) = 0;
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset, CapDesc const& _cap) = 0;
virtual void onStarting() {}
virtual void onStopping() {}
@ -77,7 +77,7 @@ protected:
virtual std::string name() const { return PeerCap::name(); }
virtual u256 version() const { return PeerCap::version(); }
virtual unsigned messageCount() const { return PeerCap::messageCount(); }
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset) { return new PeerCap(_s, this, _idOffset); }
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset, CapDesc const& _cap) { return new PeerCap(_s, this, _idOffset, _cap); }
};
}

2
libwhisper/WhisperPeer.cpp

@ -29,7 +29,7 @@ using namespace dev;
using namespace dev::p2p;
using namespace dev::shh;
WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i): Capability(_s, _h, _i)
WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i)
{
RLPStream s;
sealAndSend(prep(s, StatusPacket, 1) << version());

3
libwhisper/WhisperPeer.h

@ -42,6 +42,7 @@ using p2p::Session;
using p2p::HostCapabilityFace;
using p2p::HostCapability;
using p2p::Capability;
using p2p::CapDesc;
/**
*/
@ -50,7 +51,7 @@ class WhisperPeer: public Capability
friend class WhisperHost;
public:
WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i);
WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap);
virtual ~WhisperPeer();
static std::string name() { return "shh"; }

2
test/libp2p/capability.cpp

@ -49,7 +49,7 @@ struct VerbosityHolder
class TestCapability: public Capability
{
public:
TestCapability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset): Capability(_s, _h, _idOffset), m_cntReceivedMessages(0), m_testSum(0) {}
TestCapability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset, CapDesc const&): Capability(_s, _h, _idOffset), m_cntReceivedMessages(0), m_testSum(0) {}
virtual ~TestCapability() {}
int countReceivedMessages() { return m_cntReceivedMessages; }
int testSum() { return m_testSum; }

Loading…
Cancel
Save