Browse Source

reverted to old pv61 sync code

cl-refactor
arkpar 10 years ago
parent
commit
2ad9a6d265
  1. 3
      libethcore/Common.h
  2. 2
      libethereum/BlockChain.cpp
  3. 701
      libethereum/BlockChainSync.cpp
  4. 146
      libethereum/BlockChainSync.h
  5. 3
      libethereum/BlockQueue.cpp
  6. 6
      libethereum/CommonNet.h
  7. 578
      libethereum/EthereumHost.cpp
  8. 64
      libethereum/EthereumHost.h
  9. 11
      libethereum/EthereumPeer.cpp
  10. 16
      libethereum/EthereumPeer.h

3
libethcore/Common.h

@ -100,7 +100,8 @@ enum class ImportResult
{ {
Success = 0, Success = 0,
UnknownParent, UnknownParent,
FutureTime, FutureTimeKnown,
FutureTimeUnkwnown,
AlreadyInChain, AlreadyInChain,
AlreadyKnown, AlreadyKnown,
Malformed, Malformed,

2
libethereum/BlockChain.cpp

@ -372,7 +372,7 @@ pair<ImportResult, ImportRoute> BlockChain::attemptImport(bytes const& _block, O
} }
catch (FutureTime&) catch (FutureTime&)
{ {
return make_pair(ImportResult::FutureTime, make_pair(h256s(), h256s())); return make_pair(ImportResult::FutureTimeKnown, make_pair(h256s(), h256s()));
} }
catch (Exception& ex) catch (Exception& ex)
{ {

701
libethereum/BlockChainSync.cpp

@ -0,0 +1,701 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file EthereumHost.cpp
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#include "BlockChainSync.h"
#include <chrono>
#include <thread>
#include <libdevcore/Common.h>
#include <libp2p/Host.h>
#include <libp2p/Session.h>
#include <libethcore/Exceptions.h>
#include <libethcore/Params.h>
#include "BlockChain.h"
#include "TransactionQueue.h"
#include "BlockQueue.h"
#include "EthereumPeer.h"
#include "EthereumHost.h"
#include "DownloadMan.h"
using namespace std;
using namespace dev;
using namespace dev::eth;
using namespace p2p;
unsigned const c_chainReorgSize = 30000;
BlockChainSync::BlockChainSync(EthereumHost& _host):
m_host(_host)
{
}
BlockChainSync::~BlockChainSync()
{
abortSync();
}
DownloadMan const& BlockChainSync::downloadMan() const
{
return host().downloadMan();
}
DownloadMan& BlockChainSync::downloadMan()
{
return host().downloadMan();
}
void BlockChainSync::abortSync()
{
host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; });
downloadMan().resetToChain(h256s());
}
void BlockChainSync::onPeerStatus(EthereumPeer*)
{
}
unsigned BlockChainSync::estimateHashes()
{
BlockInfo block = host().chain().info();
time_t lastBlockTime = (block.hash() == host().chain().genesisHash()) ? 1428192000 : (time_t)block.timestamp;
time_t now = time(0);
unsigned blockCount = c_chainReorgSize;
if (lastBlockTime > now)
clog(NetWarn) << "Clock skew? Latest block is in the future";
else
blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit;
clog(NetAllDetail) << "Estimated hashes: " << blockCount;
return blockCount;
}
PV60Sync::PV60Sync(EthereumHost& _host):
BlockChainSync(_host)
{
}
SyncStatus PV60Sync::status() const
{
RecursiveGuard l(x_sync);
SyncStatus res;
res.state = m_state;
if (m_state == SyncState::Hashes)
{
res.hashesTotal = m_estimatedHashes;
res.hashesReceived = static_cast<unsigned>(m_syncingNeededBlocks.size());
res.hashesEstimated = true;
}
else if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks || m_state == SyncState::Waiting)
{
res.blocksTotal = downloadMan().chainSize();
res.blocksReceived = downloadMan().blocksGot().size();
}
return res;
}
void PV60Sync::setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing, bool _needHelp)
{
bool changedState = (m_state != _s);
m_state = _s;
if (_isSyncing != (m_syncer == _peer) || (_isSyncing && changedState))
changeSyncer(_isSyncing ? _peer : nullptr, _needHelp);
else if (_s == SyncState::Idle)
changeSyncer(nullptr, _needHelp);
assert(!!m_syncer || _s == SyncState::Idle);
if (!_isSyncing)
{
m_syncingLatestHash = h256();
m_syncingTotalDifficulty = 0;
m_syncingNeededBlocks.clear();
}
}
void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _needHelp)
{
clog(NetMessageSummary) << "Transition!" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : "");
//DEV_INVARIANT_CHECK;
if (m_state == SyncState::Idle && _s != SyncState::Idle)
_peer->m_requireTransactions = true;
RLPStream s;
if (_s == SyncState::Hashes)
{
if (m_state == SyncState::Idle)
{
if (isSyncing(_peer))
clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
m_syncingLatestHash = _peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
setState(_peer, _s, true);
_peer->requestHashes(m_syncingLatestHash);
DEV_INVARIANT_CHECK;
return;
}
else if (m_state == SyncState::Hashes)
{
if (!isSyncing(_peer))
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setState(_peer, _s, true);
_peer->requestHashes(m_syncingLastReceivedHash);
DEV_INVARIANT_CHECK;
return;
}
}
else if (_s == SyncState::Blocks)
{
if (m_state == SyncState::Hashes)
{
if (!isSyncing(_peer))
{
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
return;
}
if (shouldGrabBlocks(_peer))
{
clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash << ", was" << host().latestBlockSent() << "]";
downloadMan().resetToChain(m_syncingNeededBlocks);
}
else
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
m_syncingLatestHash = h256();
setState(_peer, SyncState::Idle, false);
return;
}
assert (isSyncing(_peer));
}
// run through into...
if (m_state == SyncState::Idle || m_state == SyncState::Hashes || m_state == SyncState::Blocks)
{
// Looks like it's the best yet for total difficulty. Set to download.
setState(_peer, SyncState::Blocks, isSyncing(_peer), _needHelp); // will kick off other peers to help if available.
requestBlocks(_peer);
DEV_INVARIANT_CHECK;
return;
}
}
else if (_s == SyncState::NewBlocks)
{
if (m_state != SyncState::Idle && m_state != SyncState::NewBlocks)
clog(NetWarn) << "Bad state: Asking new blocks while syncing!";
else
{
setState(_peer, SyncState::NewBlocks, true, _needHelp);
requestBlocks(_peer);
DEV_INVARIANT_CHECK;
return;
}
}
else if (_s == SyncState::Idle)
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
clog(NetNote) << "Finishing blocks fetch...";
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if (isSyncing(_peer))
noteDoneBlocks(_peer, _force);
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
_peer->m_sub.doneFetch();
_peer->setIdle();
setState(_peer, SyncState::Idle, false);
}
else if (m_state == SyncState::Hashes)
{
clog(NetNote) << "Finishing hashes fetch...";
setState(_peer, SyncState::Idle, false);
}
// Otherwise it's fine. We don't care if it's Nothing->Nothing.
DEV_INVARIANT_CHECK;
return;
}
clog(NetWarn) << "Invalid state transition:" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : "");
}
void PV60Sync::requestBlocks(EthereumPeer* _peer)
{
_peer->requestBlocks();
if (_peer->m_asking != Asking::Blocks) //nothing to download
{
noteDoneBlocks(_peer, false);
if (downloadMan().isComplete())
transition(_peer, SyncState::Idle);
return;
}
}
void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td)
{
_peer->m_latestHash = _latestHash;
_peer->m_totalDifficulty = _td;
if (_peer->m_latestHash)
noteNeedsSyncing(_peer);
_peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : ""));
}
bool PV60Sync::isSyncing(EthereumPeer* _peer) const
{
return m_syncer == _peer;
}
bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const
{
auto td = _peer->m_totalDifficulty;
auto lh = _peer->m_latestHash;
auto ctd = host().chain().details().totalDifficulty;
if (m_syncingNeededBlocks.empty())
return false;
clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back();
if (td < ctd || (td == ctd && host().chain().currentHash() == lh))
return false;
return true;
}
void PV60Sync::attemptSync(EthereumPeer* _peer)
{
if (m_state != SyncState::Idle)
{
clog(NetAllDetail) << "Can't sync with this peer - outstanding asks.";
return;
}
// if already done this, then ignore.
if (!needsSyncing(_peer))
{
clog(NetAllDetail) << "Already synced with this peer.";
return;
}
h256 c = host().chain().currentHash();
unsigned n = host().chain().number();
u256 td = host().chain().details().totalDifficulty;
clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty;
if (td >= _peer->m_totalDifficulty)
{
clog(NetAllDetail) << "No. Our chain is better.";
resetNeedsSyncing(_peer);
transition(_peer, SyncState::Idle);
}
else
{
clog(NetAllDetail) << "Yes. Their chain is better.";
m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize;
transition(_peer, SyncState::Hashes);
}
}
void PV60Sync::noteNeedsSyncing(EthereumPeer* _peer)
{
// if already downloading hash-chain, ignore.
if (isSyncing())
{
clog(NetAllDetail) << "Sync in progress: Just set to help out.";
if (m_state == SyncState::Blocks)
_peer->requestBlocks();
}
else
// otherwise check to see if we should be downloading...
attemptSync(_peer);
}
void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
{
if (_syncer)
clog(NetAllDetail) << "Changing syncer to" << _syncer->session()->socketId();
else
clog(NetAllDetail) << "Clearing syncer.";
m_syncer = _syncer;
if (isSyncing())
{
if (_needHelp && (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks))
host().foreachPeer([&](EthereumPeer* _p)
{
clog(NetNote) << "Getting help with downloading blocks";
if (_p != _syncer && _p->m_asking == Asking::Nothing)
transition(_p, m_state);
return true;
});
}
else
{
// start grabbing next hash chain if there is one.
host().foreachPeer([this](EthereumPeer* _p)
{
attemptSync(_p);
return !isSyncing();
});
if (!isSyncing())
{
if (m_state != SyncState::Idle)
setState(_syncer, SyncState::Idle);
clog(NetNote) << "No more peers to sync with.";
}
}
assert(!!m_syncer || m_state == SyncState::Idle);
}
void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
{
resetNeedsSyncing(_peer);
if (downloadMan().isComplete())
{
// Done our chain-get.
clog(NetNote) << "Chain download complete.";
// 1/100th for each useful block hash.
_peer->addRating(downloadMan().chainSize() / 100);
downloadMan().reset();
}
else if (isSyncing(_peer))
{
if (_clemency)
clog(NetNote) << "Chain download failed. Aborted while incomplete.";
else
{
// Done our chain-get.
clog(NetWarn) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished.";
clog(NetWarn) << downloadMan().remaining();
clog(NetWarn) << "WOULD BAN.";
// m_banned.insert(_peer->session()->id()); // We know who you are!
// _peer->disable("Peer sent hashes but was unable to provide the blocks.");
}
downloadMan().reset();
}
_peer->m_sub.doneFetch();
}
void PV60Sync::onPeerStatus(EthereumPeer* _peer)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
if (_peer->m_genesisHash != host().chain().genesisHash())
_peer->disable("Invalid genesis hash");
else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion)
_peer->disable("Invalid protocol version.");
else if (_peer->m_networkId != host().networkId())
_peer->disable("Invalid network identifier.");
else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos)
_peer->disable("Blacklisted client version.");
else if (host().isBanned(_peer->session()->id()))
_peer->disable("Peer banned for previous bad behaviour.");
else
{
unsigned estimatedHashes = estimateHashes();
_peer->m_expectedHashes = estimatedHashes;
setNeedsSyncing(_peer, _peer->m_latestHash, _peer->m_totalDifficulty);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
_peer->setIdle();
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks)
clog(NetWarn) << "Unexpected Blocks received!";
if (itemCount == 0)
{
// Got to this peer's latest block - just give up.
noteDoneBlocks(_peer, false);
if (downloadMan().isComplete())
transition(_peer, SyncState::Idle);
return;
}
unsigned success = 0;
unsigned future = 0;
unsigned unknown = 0;
unsigned got = 0;
unsigned repeated = 0;
u256 maxDifficulty = 0;
h256 maxUnknown;
for (unsigned i = 0; i < itemCount; ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
if (_peer->m_sub.noteBlock(h))
{
_peer->addRating(10);
switch (host().bq().import(_r[i].data(), host().chain()))
{
case ImportResult::Success:
success++;
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::FutureTimeKnown:
future++;
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
got++;
break;
case ImportResult::FutureTimeUnkwnown:
future++; //Fall through
case ImportResult::UnknownParent:
{
unknown++;
if (m_state == SyncState::NewBlocks)
{
BlockInfo bi;
bi.populateFromHeader(_r[i][0]);
if (bi.difficulty > maxDifficulty)
{
maxDifficulty = bi.difficulty;
maxUnknown = h;
}
}
break;
}
default:;
}
}
else
{
_peer->addRating(0); // -1?
repeated++;
}
}
clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received.";
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
if (downloadMan().isComplete())
transition(_peer, SyncState::Idle);
else if (!got)
transition(_peer, m_state);
else
noteDoneBlocks(_peer, false);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
_peer->setIdle();
if (!isSyncing(_peer))
{
clog(NetMessageSummary) << "Ignoring hashes synce not syncing";
return;
}
if (_hashes.size() == 0)
{
transition(_peer, SyncState::Blocks);
return;
}
unsigned knowns = 0;
unsigned unknowns = 0;
for (unsigned i = 0; i < _hashes.size(); ++i)
{
auto h = _hashes[i];
auto status = host().bq().blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h))
{
clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download...";
assert (isSyncing(_peer));
transition(_peer, SyncState::Blocks);
return;
}
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
transition(_peer, SyncState::Idle);
return;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
m_syncingNeededBlocks.push_back(h);
}
else
knowns++;
m_syncingLastReceivedHash = h;
}
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLastReceivedHash;
if (m_syncingNeededBlocks.size() > _peer->m_expectedHashes)
{
_peer->disable("Too many hashes");
m_syncingNeededBlocks.clear();
m_syncingLatestHash = h256();
transition(_peer, SyncState::Idle);
return;
}
// run through - ask for more.
transition(_peer, SyncState::Hashes);
DEV_INVARIANT_CHECK;
}
void PV60Sync::abortSync(EthereumPeer* _peer)
{
if (isSyncing(_peer))
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
transition(_peer, SyncState::Idle, true);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerAborting(EthereumPeer* _peer)
{
abortSync(_peer);
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{
DEV_INVARIANT_CHECK;
RecursiveGuard l(x_sync);
auto h = BlockInfo::headerHash(_r[0].data());
clog(NetMessageSummary) << "NewBlock: " << h;
if (_r.itemCount() != 2)
_peer->disable("NewBlock without 2 data fields.");
else
{
switch (host().bq().import(_r[0].data(), host().chain()))
{
case ImportResult::Success:
_peer->addRating(100);
break;
case ImportResult::FutureTimeKnown:
//TODO: Rating dependent on how far in future it is.
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::FutureTimeUnkwnown:
case ImportResult::UnknownParent:
clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
setNeedsSyncing(_peer, h, _r[1].toInt<u256>());
break;
default:;
}
DEV_GUARDED(_peer->x_knownBlocks)
_peer->m_knownBlocks.insert(h);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
if (isSyncing())
{
clog(NetMessageSummary) << "Ignoring since we're already downloading.";
return;
}
unsigned knowns = 0;
unsigned unknowns = 0;
for (auto h: _hashes)
{
_peer->addRating(1);
DEV_GUARDED(_peer->x_knownBlocks)
_peer->m_knownBlocks.insert(h);
auto status = host().bq().blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h))
knowns++;
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
return;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
m_syncingNeededBlocks.push_back(h);
}
else
knowns++;
}
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
if (unknowns > 0)
{
clog(NetNote) << "Not syncing and new block hash discovered: syncing without help.";
downloadMan().resetToChain(m_syncingNeededBlocks);
transition(_peer, SyncState::NewBlocks, false, false);
}
DEV_INVARIANT_CHECK;
}
bool PV60Sync::invariants() const
{
if (m_state == SyncState::Idle && !!m_syncer)
return false;
if (m_state != SyncState::Idle && !m_syncer)
return false;
if (m_state == SyncState::Hashes)
{
bool hashes = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; });
if (!hashes)
return false;
}
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
bool blocks = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; });
if (!blocks)
return false;
}
return true;
}

146
libethereum/BlockChainSync.h

@ -0,0 +1,146 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file EthereumHost.h
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#pragma once
#include <mutex>
#include <unordered_map>
#include <vector>
#include <unordered_set>
#include <memory>
#include <utility>
#include <thread>
#include <libdevcore/Guards.h>
#include <libdevcore/Worker.h>
#include <libdevcore/RangeMask.h>
#include <libethcore/Common.h>
#include <libp2p/Common.h>
#include "CommonNet.h"
#include "EthereumPeer.h" //TODO: forward decl
#include "DownloadMan.h"
namespace dev
{
class RLPStream;
namespace eth
{
class EthereumHost;
class BlockQueue;
/**
* @brief BlockChain synchronization strategy class
* @doWork Syncs to peers and sends new blocks and transactions.
*/
class BlockChainSync: public HasInvariants
{
public:
BlockChainSync(EthereumHost& _host);
/// Will block on network process events.
virtual ~BlockChainSync();
void abortSync();
DownloadMan const& downloadMan() const;
DownloadMan& downloadMan();
virtual bool isSyncing() const = 0;
virtual void onPeerStatus(EthereumPeer* _peer); ///< Called by peer to report status
virtual void onPeerBlocks(EthereumPeer* _peer, RLP const& _r) = 0; ///< Called by peer once it has new blocks during syn
virtual void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) = 0; ///< Called by peer once it has new blocks
virtual void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) = 0; ///< Called by peer once it has new hashes
virtual void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) = 0; ///< Called by peer once it has another sequential block of hashes during sync
virtual void onPeerAborting(EthereumPeer* _peer) = 0; ///< Called by peer when it is disconnecting
virtual SyncStatus status() const = 0;
static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; }
private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
void setState(SyncState _s);
bool invariants() const override = 0;
EthereumHost& m_host;
Handler m_bqRoomAvailable;
HashDownloadMan m_hashMan;
protected:
EthereumHost& host() { return m_host; }
EthereumHost const& host() const { return m_host; }
unsigned estimateHashes();
mutable RecursiveMutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state
SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
};
class PV60Sync: public BlockChainSync
{
public:
PV60Sync(EthereumHost& _host);
bool isSyncing() const override { return !!m_syncer; }
void onPeerStatus(EthereumPeer* _peer) override; ///< Called by peer to report status
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r) override; ///< Called by peer once it has new blocks during syn
void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) override; ///< Called by peer once it has new blocks
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) override; ///< Called by peer once it has new hashes
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) override; ///< Called by peer once it has another sequential block of hashes during sync
void onPeerAborting(EthereumPeer* _peer) override; ///< Called by peer when it is disconnecting
SyncStatus status() const override;
void transition(EthereumPeer* _peer, SyncState _s, bool _force = false, bool _needHelp = true);
void resetNeedsSyncing(EthereumPeer* _peer) { setNeedsSyncing(_peer, h256(), 0); }
bool needsSyncing(EthereumPeer* _peer) const { return !!_peer->m_latestHash; }
void setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td);
bool shouldGrabBlocks(EthereumPeer* _peer) const;
void attemptSync(EthereumPeer* _peer);
void setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing = false, bool _needHelp = false);
bool isSyncing(EthereumPeer* _peer) const;
void noteNeedsSyncing(EthereumPeer* _who);
void changeSyncer(EthereumPeer* _syncer, bool _needHelp);
void noteDoneBlocks(EthereumPeer* _who, bool _clemency);
void abortSync(EthereumPeer* _peer);
void requestBlocks(EthereumPeer* _peer);
private:
bool invariants() const override;
h256s m_knownHashes; ///< List of block hashes we need to download.
h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr
};
}
}

3
libethereum/BlockQueue.cpp

@ -229,7 +229,8 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
cblockq << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf; cblockq << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf;
m_unknownSize += _block.size(); m_unknownSize += _block.size();
m_unknownCount++; m_unknownCount++;
return ImportResult::FutureTime; bool unknown = !m_readySet.count(bi.parentHash) && !m_drainingSet.count(bi.parentHash) && !_bc.isKnown(bi.parentHash);
return unknown ? ImportResult::FutureTimeUnkwnown : ImportResult::FutureTimeKnown;
} }
else else
{ {

6
libethereum/CommonNet.h

@ -80,10 +80,8 @@ enum class Asking
enum class SyncState enum class SyncState
{ {
Idle, ///< Initial chain sync complete. Waiting for new packets Idle, ///< Initial chain sync complete. Waiting for new packets
WaitingQueue, ///< Block downloading paused. Waiting for block queue to process blocks and free space Waiting, ///< Block downloading paused. Waiting for block queue to process blocks and free space
HashesNegotiate, ///< Waiting for first hashes to arrive Hashes, ///< Downloading hashes from multiple peers over
HashesSingle, ///< Locked on and downloading hashes from a single peer
HashesParallel, ///< Downloading hashes from multiple peers over
Blocks, ///< Downloading blocks Blocks, ///< Downloading blocks
NewBlocks, ///< Downloading blocks learned from NewHashes packet NewBlocks, ///< Downloading blocks learned from NewHashes packet

578
libethereum/EthereumHost.cpp

@ -33,6 +33,8 @@
#include "BlockQueue.h" #include "BlockQueue.h"
#include "EthereumPeer.h" #include "EthereumPeer.h"
#include "DownloadMan.h" #include "DownloadMan.h"
#include "BlockChainSync.h"
using namespace std; using namespace std;
using namespace dev; using namespace dev;
using namespace dev::eth; using namespace dev::eth;
@ -41,7 +43,7 @@ using namespace p2p;
unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common
unsigned const c_chainReorgSize = 30000; unsigned const c_chainReorgSize = 30000;
char const* const EthereumHost::s_stateNames[static_cast<int>(SyncState::Size)] = {"Idle", "WaitingQueue", "HashesNegotiate", "HashesSingle", "HashesParallel", "Blocks", "NewBlocks" }; char const* const EthereumHost::s_stateNames[static_cast<int>(SyncState::Size)] = {"Idle", "Waiting", "Hashes", "Blocks", "NewBlocks" };
EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId):
HostCapability<EthereumPeer>(), HostCapability<EthereumPeer>(),
@ -51,15 +53,12 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu
m_bq (_bq), m_bq (_bq),
m_networkId (_networkId) m_networkId (_networkId)
{ {
setState(SyncState::HashesNegotiate);
m_latestBlockSent = _ch.currentHash(); m_latestBlockSent = _ch.currentHash();
m_hashMan.reset(m_chain.number() + 1);
m_bqRoomAvailable = m_bq.onRoomAvailable([this](){ m_continueSync = true; });
} }
EthereumHost::~EthereumHost() EthereumHost::~EthereumHost()
{ {
foreachPeer([](EthereumPeer* _p) { _p->abortSync(); }); //foreachPeer([](EthereumPeer* _p) { _p->abortSync(); });
} }
bool EthereumHost::ensureInitialised() bool EthereumHost::ensureInitialised()
@ -79,31 +78,13 @@ bool EthereumHost::ensureInitialised()
void EthereumHost::reset() void EthereumHost::reset()
{ {
foreachPeer([](EthereumPeer* _p) { _p->abortSync(); }); Guard l(x_sync);
m_man.resetToChain(h256s()); if (m_sync)
m_hashMan.reset(m_chain.number() + 1); m_sync->abortSync();
setState(SyncState::HashesNegotiate); m_sync.reset();
m_syncingLatestHash = h256();
m_syncingTotalDifficulty = 0;
m_latestBlockSent = h256(); m_latestBlockSent = h256();
m_transactionsSent.clear(); m_transactionsSent.clear();
m_hashes.clear();
}
void EthereumHost::resetSyncTo(h256 const& _h)
{
setState(SyncState::HashesNegotiate);
m_syncingLatestHash = _h;
}
void EthereumHost::setState(SyncState _s)
{
if (m_state != _s)
{
clog(NetAllDetail) << "SyncState changed from " << stateName(m_state) << " to " << stateName(_s);
m_state = _s;
}
} }
void EthereumHost::doWork() void EthereumHost::doWork()
@ -125,14 +106,7 @@ void EthereumHost::doWork()
} }
} }
if (m_continueSync) foreachPeer([](EthereumPeer* _p) { _p->tick(); return true; });
{
m_continueSync = false;
RecursiveGuard l(x_sync);
continueSync();
}
foreachPeer([](EthereumPeer* _p) { _p->tick(); });
// return netChange; // return netChange;
// TODO: Figure out what to do with netChange. // TODO: Figure out what to do with netChange.
@ -174,24 +148,28 @@ void EthereumHost::maintainTransactions()
cnote << "Sent" << n << "transactions to " << _p->session()->info().clientVersion; cnote << "Sent" << n << "transactions to " << _p->session()->info().clientVersion;
} }
_p->m_requireTransactions = false; _p->m_requireTransactions = false;
return true;
}); });
} }
void EthereumHost::foreachPeer(std::function<void(EthereumPeer*)> const& _f) const void EthereumHost::foreachPeer(std::function<bool(EthereumPeer*)> const& _f) const
{ {
foreachPeerPtr([&](std::shared_ptr<EthereumPeer> _p) foreachPeerPtr([&](std::shared_ptr<EthereumPeer> _p)
{ {
if (_p) if (_p)
_f(_p.get()); return _f(_p.get());
return true;
}); });
} }
void EthereumHost::foreachPeerPtr(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f) const void EthereumHost::foreachPeerPtr(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const
{ {
for (auto s: peerSessions()) for (auto s: peerSessions())
_f(s.first->cap<EthereumPeer>()); if (!_f(s.first->cap<EthereumPeer>()))
return;
for (auto s: peerSessions(c_oldProtocolVersion)) //TODO: remove once v61+ is common for (auto s: peerSessions(c_oldProtocolVersion)) //TODO: remove once v61+ is common
_f(s.first->cap<EthereumPeer>(c_oldProtocolVersion)); if (!_f(s.first->cap<EthereumPeer>(c_oldProtocolVersion)))
return;
} }
tuple<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<Session>>> EthereumHost::randomSelection(unsigned _percent, std::function<bool(EthereumPeer*)> const& _allow) tuple<vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<EthereumPeer>>, vector<shared_ptr<Session>>> EthereumHost::randomSelection(unsigned _percent, std::function<bool(EthereumPeer*)> const& _allow)
@ -263,326 +241,50 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash)
} }
} }
void EthereumHost::onPeerStatus(EthereumPeer* _peer) BlockChainSync& EthereumHost::sync()
{ {
RecursiveGuard l(x_sync); if (m_sync)
DEV_INVARIANT_CHECK; return *m_sync; // We only chose sync strategy once
if (_peer->m_genesisHash != m_chain.genesisHash())
_peer->disable("Invalid genesis hash"); bool pv61 = false;
else if (_peer->m_protocolVersion != protocolVersion() && _peer->m_protocolVersion != c_oldProtocolVersion) foreachPeer([&](EthereumPeer* _p)
_peer->disable("Invalid protocol version.");
else if (_peer->m_networkId != networkId())
_peer->disable("Invalid network identifier.");
else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos)
_peer->disable("Blacklisted client version.");
else if (isBanned(_peer->session()->id()))
_peer->disable("Peer banned for previous bad behaviour.");
else
{ {
unsigned estimatedHashes = estimateHashes(); if (_p->m_protocolVersion == protocolVersion())
if (_peer->m_protocolVersion == protocolVersion()) pv61 = true;
{ return !pv61;
if (_peer->m_latestBlockNumber > m_chain.number()) });
_peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number(); m_sync.reset(pv61 ? new PV60Sync(*this) : new PV60Sync(*this));
if (_peer->m_expectedHashes > estimatedHashes) return *m_sync;
_peer->disable("Too many hashes");
else if (needHashes() && m_hashMan.chainSize() < _peer->m_expectedHashes)
m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes);
}
else
_peer->m_expectedHashes = estimatedHashes;
continueSync(_peer);
}
DEV_INVARIANT_CHECK;
} }
unsigned EthereumHost::estimateHashes() void EthereumHost::onPeerStatus(EthereumPeer* _peer)
{ {
BlockInfo block = m_chain.info(); Guard l(x_sync);
time_t lastBlockTime = (block.hash() == m_chain.genesisHash()) ? 1428192000 : (time_t)block.timestamp; sync().onPeerStatus(_peer);
time_t now = time(0);
unsigned blockCount = c_chainReorgSize;
if (lastBlockTime > now)
clog(NetWarn) << "Clock skew? Latest block is in the future";
else
blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit;
clog(NetAllDetail) << "Estimated hashes: " << blockCount;
return blockCount;
} }
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
{ {
RecursiveGuard l(x_sync); Guard l(x_sync);
if (_peer->m_syncHashNumber > 0) sync().onPeerHashes(_peer, _hashes);
_peer->m_syncHashNumber += _hashes.size();
_peer->setAsking(Asking::Nothing);
onPeerHashes(_peer, _hashes, false);
}
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete)
{
DEV_INVARIANT_CHECK;
if (_hashes.empty())
{
_peer->m_hashSub.doneFetch();
continueSync();
return;
}
bool syncByNumber = _peer->m_syncHashNumber;
if (!syncByNumber && !_complete && _peer->m_syncHash != m_syncingLatestHash)
{
// Obsolete hashes, discard
continueSync(_peer);
return;
}
unsigned knowns = 0;
unsigned unknowns = 0;
h256s neededBlocks;
unsigned firstNumber = _peer->m_syncHashNumber - _hashes.size();
for (unsigned i = 0; i < _hashes.size(); ++i)
{
_peer->addRating(1);
auto h = _hashes[i];
auto status = m_bq.blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h))
{
clog(NetMessageSummary) << "Block hash already known:" << h;
if (!syncByNumber)
{
m_hashes += neededBlocks;
clog(NetMessageSummary) << "Start blocks download...";
onPeerDoneHashes(_peer, true);
return;
}
}
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
_peer->setIdle();
return;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
neededBlocks.push_back(h);
}
else
knowns++;
if (!syncByNumber)
m_syncingLatestHash = h;
else
_peer->m_hashSub.noteHash(firstNumber + i, 1);
}
if (syncByNumber)
{
m_man.appendToChain(neededBlocks); // Append to download manager immediatelly
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
}
else
{
m_hashes += neededBlocks; // Append to local list
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash;
}
if (_complete)
{
clog(NetMessageSummary) << "Start new blocks download...";
m_syncingLatestHash = h256();
setState(SyncState::NewBlocks);
m_man.resetToChain(m_hashes);
m_hashes.clear();
m_hashMan.reset(m_chain.number() + 1);
continueSync(_peer);
}
else if (syncByNumber && m_hashMan.isComplete())
{
// Done our chain-get.
clog(NetNote) << "Hashes download complete.";
onPeerDoneHashes(_peer, false);
}
else if (m_hashes.size() > _peer->m_expectedHashes)
{
_peer->disable("Too many hashes");
m_hashes.clear();
m_syncingLatestHash = h256();
setState(SyncState::HashesNegotiate);
continueSync(); ///Try with some other peer, keep the chain
}
else
continueSync(_peer); /// Grab next hashes
DEV_INVARIANT_CHECK;
}
void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain)
{
assert(_peer->m_asking == Asking::Nothing);
m_syncingLatestHash = h256();
setState(SyncState::Blocks);
if (_peer->m_protocolVersion != protocolVersion() || _localChain)
{
m_man.resetToChain(m_hashes);
_peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers?
}
m_hashMan.reset(m_chain.number() + 1);
m_hashes.clear();
continueSync();
} }
void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{ {
RecursiveGuard l(x_sync); Guard l(x_sync);
DEV_INVARIANT_CHECK; sync().onPeerBlocks(_peer, _r);
_peer->setAsking(Asking::Nothing);
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
if (itemCount == 0)
{
// Got to this peer's latest block - just give up.
clog(NetNote) << "Finishing blocks fetch...";
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
_peer->m_sub.doneFetch();
_peer->setIdle();
return;
}
unsigned success = 0;
unsigned future = 0;
unsigned unknown = 0;
unsigned got = 0;
unsigned repeated = 0;
h256 lastUnknown;
for (unsigned i = 0; i < itemCount; ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
if (_peer->m_sub.noteBlock(h))
{
_peer->addRating(10);
switch (m_bq.import(_r[i].data(), m_chain))
{
case ImportResult::Success:
success++;
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::FutureTime:
future++;
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
got++;
break;
case ImportResult::UnknownParent:
lastUnknown = h;
unknown++;
break;
default:;
}
}
else
{
_peer->addRating(0); // -1?
repeated++;
}
}
clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received.";
if (m_state == SyncState::NewBlocks && unknown > 0)
{
_peer->m_latestHash = lastUnknown;
resetSyncTo(lastUnknown);
}
continueSync(_peer);
DEV_INVARIANT_CHECK;
} }
void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
{ {
RecursiveGuard l(x_sync); Guard l(x_sync);
DEV_INVARIANT_CHECK; sync().onPeerNewHashes(_peer, _hashes);
if (isSyncing() || _peer->isConversing())
{
clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading.";
return;
}
clog(NetNote) << "New block hash discovered: syncing without help.";
_peer->m_syncHashNumber = 0;
onPeerHashes(_peer, _hashes, true);
DEV_INVARIANT_CHECK;
} }
void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{ {
RecursiveGuard l(x_sync); Guard l(x_sync);
DEV_INVARIANT_CHECK; sync().onPeerNewBlock(_peer, _r);
if ((isSyncing() || _peer->isConversing()) && m_state != SyncState::NewBlocks)
{
clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading.";
return;
}
auto h = BlockInfo::headerHash(_r[0].data());
clog(NetMessageSummary) << "NewBlock: " << h;
if (_r.itemCount() != 2)
_peer->disable("NewBlock without 2 data fields.");
else
{
bool sync = false;
switch (m_bq.import(_r[0].data(), m_chain))
{
case ImportResult::Success:
_peer->addRating(100);
break;
case ImportResult::FutureTime:
//TODO: Rating dependent on how far in future it is.
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::UnknownParent:
if (h)
{
u256 difficulty = _r[1].toInt<u256>();
if (m_syncingTotalDifficulty < difficulty)
{
clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
_peer->m_latestHash = h;
_peer->m_totalDifficulty = difficulty;
resetSyncTo(h);;
sync = true;
}
}
break;
default:;
}
DEV_GUARDED(_peer->x_knownBlocks)
_peer->m_knownBlocks.insert(h);
if (sync)
continueSync();
}
DEV_INVARIANT_CHECK;
} }
void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
@ -615,202 +317,28 @@ void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
void EthereumHost::onPeerAborting(EthereumPeer* _peer) void EthereumHost::onPeerAborting(EthereumPeer* _peer)
{ {
RecursiveGuard l(x_sync); Guard l(x_sync);
if (_peer->isConversing()) if (m_sync)
{ m_sync->onPeerAborting(_peer);
_peer->setIdle();
if (_peer->isCriticalSyncing())
_peer->setRude();
continueSync();
}
}
void EthereumHost::continueSync()
{
if (m_state == SyncState::WaitingQueue)
setState(m_lastActiveState);
clog(NetAllDetail) << "Continuing sync for all peers";
foreachPeer([&](EthereumPeer* _p)
{
if (_p->m_asking == Asking::Nothing)
continueSync(_p);
});
}
void EthereumHost::continueSync(EthereumPeer* _peer)
{
DEV_INVARIANT_CHECK;
assert(_peer->m_asking == Asking::Nothing);
bool otherPeerV60Sync = false;
bool otherPeerV61Sync = false;
if (needHashes())
{
if (!peerShouldGrabChain(_peer))
{
_peer->setIdle();
return;
}
foreachPeer([&](EthereumPeer* _p)
{
if (_p != _peer && _p->m_asking == Asking::Hashes)
{
if (_p->m_protocolVersion != protocolVersion())
otherPeerV60Sync = true; // Already have a peer downloading hash chain with old protocol, do nothing
else
otherPeerV61Sync = true; // Already have a peer downloading hash chain with V61+ protocol, join if supported
}
});
if (otherPeerV60Sync && !m_hashes.empty())
{
/// Downloading from other peer with v60 protocol, nothing else we can do
_peer->setIdle();
return;
}
if (otherPeerV61Sync && _peer->m_protocolVersion != protocolVersion())
{
/// Downloading from other peer with v61+ protocol which this peer does not support,
_peer->setIdle();
return;
}
if (_peer->m_protocolVersion == protocolVersion() && !m_hashMan.isComplete())
{
setState(SyncState::HashesParallel);
_peer->requestHashes(); /// v61+ and not catching up to a particular hash
}
else
{
// Restart/continue sync in single peer mode
if (!m_syncingLatestHash)
{
m_syncingLatestHash =_peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
}
if (_peer->m_totalDifficulty >= m_syncingTotalDifficulty)
{
_peer->requestHashes(m_syncingLatestHash);
setState(SyncState::HashesSingle);
m_estimatedHashes = _peer->m_expectedHashes - (_peer->m_protocolVersion == protocolVersion() ? 0 : c_chainReorgSize);
}
else
_peer->setIdle();
}
}
else if (needBlocks())
{
if (m_man.isComplete())
{
// Done our chain-get.
setState(SyncState::Idle);
clog(NetNote) << "Chain download complete.";
// 1/100th for each useful block hash.
_peer->addRating(m_man.chainSize() / 100); //TODO: what about other peers?
m_man.reset();
_peer->setIdle();
return;
}
else if (peerCanHelp(_peer))
{
// Check block queue status
if (m_bq.unknownFull())
{
clog(NetWarn) << "Too many unknown blocks, restarting sync";
m_bq.clear();
reset();
continueSync();
}
else if (m_bq.knownFull())
{
clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
m_lastActiveState = m_state;
setState(SyncState::WaitingQueue);
_peer->setIdle();
}
else
_peer->requestBlocks();
}
}
else
_peer->setIdle();
DEV_INVARIANT_CHECK;
}
bool EthereumHost::peerCanHelp(EthereumPeer* _peer) const
{
(void)_peer;
return true;
}
bool EthereumHost::peerShouldGrabBlocks(EthereumPeer* _peer) const
{
// this is only good for deciding whether to go ahead and grab a particular peer's hash chain,
// yet it's being used in determining whether to allow a peer help with downloading an existing
// chain of blocks.
auto td = _peer->m_totalDifficulty;
auto lh = m_syncingLatestHash;
auto ctd = m_chain.details().totalDifficulty;
clog(NetAllDetail) << "Should grab blocks? " << td << "vs" << ctd;
if (td < ctd || (td == ctd && m_chain.currentHash() == lh))
return false;
return true;
}
bool EthereumHost::peerShouldGrabChain(EthereumPeer* _peer) const
{
h256 c = m_chain.currentHash();
unsigned n = m_chain.number();
u256 td = m_chain.details().totalDifficulty;
clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty;
if (td >= _peer->m_totalDifficulty)
{
clog(NetAllDetail) << "No. Our chain is better.";
return false;
}
else
{
clog(NetAllDetail) << "Yes. Their chain is better.";
return true;
}
} }
bool EthereumHost::isSyncing() const bool EthereumHost::isSyncing() const
{ {
return m_state != SyncState::Idle; Guard l(x_sync);
if (!m_sync)
return false;
return m_sync->isSyncing();
} }
SyncStatus EthereumHost::status() const SyncStatus EthereumHost::status() const
{ {
RecursiveGuard l(x_sync); Guard l(x_sync);
SyncStatus res; if (!m_sync)
res.state = m_state; return SyncStatus();
if (m_state == SyncState::HashesParallel) return m_sync->status();
{
res.hashesReceived = m_hashMan.hashesGot().size();
res.hashesTotal = m_hashMan.chainSize();
}
else if (m_state == SyncState::HashesSingle)
{
res.hashesTotal = m_estimatedHashes;
res.hashesReceived = static_cast<unsigned>(m_hashes.size());
res.hashesEstimated = true;
}
else if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks || m_state == SyncState::WaitingQueue)
{
res.blocksTotal = m_man.chainSize();
res.blocksReceived = m_man.blocksGot().size();
}
return res;
} }
bool EthereumHost::invariants() const bool EthereumHost::invariants() const
{ {
if (m_state == SyncState::HashesNegotiate && !m_hashes.empty())
return false;
if (needBlocks() && (m_syncingLatestHash || !m_hashes.empty()))
return false;
return true; return true;
} }

64
libethereum/EthereumHost.h

@ -48,6 +48,7 @@ namespace eth
class TransactionQueue; class TransactionQueue;
class BlockQueue; class BlockQueue;
class BlockChainSync;
/** /**
* @brief The EthereumHost class * @brief The EthereumHost class
@ -57,7 +58,6 @@ class BlockQueue;
class EthereumHost: public p2p::HostCapability<EthereumPeer>, Worker, HasInvariants class EthereumHost: public p2p::HostCapability<EthereumPeer>, Worker, HasInvariants
{ {
public: public:
/// Start server, but don't listen. /// Start server, but don't listen.
EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId); EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId);
@ -71,50 +71,42 @@ public:
void reset(); void reset();
DownloadMan const& downloadMan() const { return m_man; } DownloadMan const& downloadMan() const { return m_man; }
DownloadMan& downloadMan() { return m_man; }
bool isSyncing() const; bool isSyncing() const;
bool isBanned(p2p::NodeId const& _id) const { return !!m_banned.count(_id); } bool isBanned(p2p::NodeId const& _id) const { return !!m_banned.count(_id); }
void noteNewTransactions() { m_newTransactions = true; } void noteNewTransactions() { m_newTransactions = true; }
void noteNewBlocks() { m_newBlocks = true; } void noteNewBlocks() { m_newBlocks = true; }
void onPeerStatus(EthereumPeer* _peer); ///< Called by peer to report status BlockChain const& chain() const { return m_chain; }
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks during syn BlockQueue& bq() { return m_bq; }
void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has new hashes
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has another sequential block of hashes during sync
void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); ///< Called by peer when it has new transactions
void onPeerAborting(EthereumPeer* _peer); ///< Called by peer when it is disconnecting
DownloadMan& downloadMan() { return m_man; }
HashDownloadMan& hashDownloadMan() { return m_hashMan; }
BlockChain const& chain() { return m_chain; }
SyncStatus status() const; SyncStatus status() const;
h256 latestBlockSent() { return m_latestBlockSent; }
static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; } static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; }
static unsigned const c_oldProtocolVersion; static unsigned const c_oldProtocolVersion;
void foreachPeerPtr(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const;
void foreachPeer(std::function<bool(EthereumPeer*)> const& _f) const;
void onPeerStatus(EthereumPeer* _peer);
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes);
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r);
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes);
void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r);
void onPeerTransactions(EthereumPeer* _peer, RLP const& _r);
void onPeerAborting(EthereumPeer* _peer);
private: private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)]; static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
std::tuple<std::vector<std::shared_ptr<EthereumPeer>>, std::vector<std::shared_ptr<EthereumPeer>>, std::vector<std::shared_ptr<p2p::Session>>> randomSelection(unsigned _percent = 25, std::function<bool(EthereumPeer*)> const& _allow = [](EthereumPeer const*){ return true; }); std::tuple<std::vector<std::shared_ptr<EthereumPeer>>, std::vector<std::shared_ptr<EthereumPeer>>, std::vector<std::shared_ptr<p2p::Session>>> randomSelection(unsigned _percent = 25, std::function<bool(EthereumPeer*)> const& _allow = [](EthereumPeer const*){ return true; });
void foreachPeerPtr(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f) const;
void foreachPeer(std::function<void(EthereumPeer*)> const& _f) const;
void resetSyncTo(h256 const& _h);
bool needHashes() const { return m_state == SyncState::HashesNegotiate || m_state == SyncState::HashesSingle || m_state == SyncState::HashesParallel; }
bool needBlocks() const { return m_state == SyncState::Blocks || m_state == SyncState::NewBlocks; }
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void doWork(); void doWork();
void maintainTransactions(); void maintainTransactions();
void maintainBlocks(h256 const& _currentBlock); void maintainBlocks(h256 const& _currentBlock);
/// Get a bunch of needed blocks.
/// Removes them from our list of needed blocks.
/// @returns empty if there's no more blocks left to fetch, otherwise the blocks to fetch.
h256Hash neededBlocks(h256Hash const& _exclude);
/// Check to see if the network peer-state initialisation has happened. /// Check to see if the network peer-state initialisation has happened.
bool isInitialised() const { return (bool)m_latestBlockSent; } bool isInitialised() const { return (bool)m_latestBlockSent; }
@ -124,29 +116,16 @@ private:
virtual void onStarting() { startWorking(); } virtual void onStarting() { startWorking(); }
virtual void onStopping() { stopWorking(); } virtual void onStopping() { stopWorking(); }
void continueSync(); /// Find something to do for all peers BlockChainSync& sync();
void continueSync(EthereumPeer* _peer); /// Find some work to do for a peer
void onPeerDoneHashes(EthereumPeer* _peer, bool _new); /// Called when done downloading hashes from peer
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool _complete);
bool peerShouldGrabBlocks(EthereumPeer* _peer) const;
bool peerShouldGrabChain(EthereumPeer* _peer) const;
bool peerCanHelp(EthereumPeer* _peer) const;
unsigned estimateHashes();
void estimatePeerHashes(EthereumPeer* _peer);
void setState(SyncState _s);
bool invariants() const override; bool invariants() const override;
BlockChain const& m_chain; BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
BlockQueue& m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). BlockQueue& m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported).
Handler m_bqRoomAvailable;
u256 m_networkId; u256 m_networkId;
DownloadMan m_man;
HashDownloadMan m_hashMan;
h256 m_latestBlockSent; h256 m_latestBlockSent;
h256Hash m_transactionsSent; h256Hash m_transactionsSent;
@ -155,14 +134,9 @@ private:
bool m_newTransactions = false; bool m_newTransactions = false;
bool m_newBlocks = false; bool m_newBlocks = false;
mutable RecursiveMutex x_sync; mutable Mutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state DownloadMan m_man;
SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode std::unique_ptr<BlockChainSync> m_sync;
h256 m_syncingLatestHash; ///< Latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty, as of the current sync.
h256s m_hashes; ///< List of hashes with unknown block numbers. Used for PV60 chain downloading and catching up to a particular unknown
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
bool m_continueSync = false; ///< True when the block queue has processed a block; we should restart grabbing blocks.
}; };
} }

11
libethereum/EthereumPeer.cpp

@ -30,6 +30,8 @@
#include "EthereumHost.h" #include "EthereumHost.h"
#include "TransactionQueue.h" #include "TransactionQueue.h"
#include "BlockQueue.h" #include "BlockQueue.h"
#include "BlockChainSync.h"
using namespace std; using namespace std;
using namespace dev; using namespace dev;
using namespace dev::eth; using namespace dev::eth;
@ -38,7 +40,6 @@ using namespace p2p;
EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap): EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap):
Capability(_s, _h, _i), Capability(_s, _h, _i),
m_sub(host()->downloadMan()), m_sub(host()->downloadMan()),
m_hashSub(host()->hashDownloadMan()),
m_peerCapabilityVersion(_cap.second) m_peerCapabilityVersion(_cap.second)
{ {
session()->addNote("manners", isRude() ? "RUDE" : "nice"); session()->addNote("manners", isRude() ? "RUDE" : "nice");
@ -91,8 +92,6 @@ string toString(Asking _a)
void EthereumPeer::setIdle() void EthereumPeer::setIdle()
{ {
m_sub.doneFetch();
m_hashSub.doneFetch();
setAsking(Asking::Nothing); setAsking(Asking::Nothing);
} }
@ -113,14 +112,14 @@ void EthereumPeer::requestStatus()
sealAndSend(s); sealAndSend(s);
} }
void EthereumPeer::requestHashes() void EthereumPeer::requestHashes(u256 _number, unsigned _count)
{ {
assert(m_asking == Asking::Nothing); assert(m_asking == Asking::Nothing);
m_syncHashNumber = m_hashSub.nextFetch(c_maxHashesAsk); m_syncHashNumber = _number;
m_syncHash = h256(); m_syncHash = h256();
setAsking(Asking::Hashes); setAsking(Asking::Hashes);
RLPStream s; RLPStream s;
prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << c_maxHashesAsk; prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << _count;
clog(NetMessageDetail) << "Requesting block hashes for numbers " << m_syncHashNumber << "-" << m_syncHashNumber + c_maxHashesAsk - 1; clog(NetMessageDetail) << "Requesting block hashes for numbers " << m_syncHashNumber << "-" << m_syncHashNumber + c_maxHashesAsk - 1;
sealAndSend(s); sealAndSend(s);
} }

16
libethereum/EthereumPeer.h

@ -50,6 +50,9 @@ namespace eth
class EthereumPeer: public p2p::Capability class EthereumPeer: public p2p::Capability
{ {
friend class EthereumHost; //TODO: remove this friend class EthereumHost; //TODO: remove this
friend class BlockChainSync; //TODO: remove this
friend class PV60Sync; //TODO: remove this
friend class PV61Sync; //TODO: remove this
public: public:
/// Basic constructor. /// Basic constructor.
@ -73,8 +76,8 @@ public:
/// Abort sync and reset fetch /// Abort sync and reset fetch
void setIdle(); void setIdle();
/// Request hashes. Uses hash download manager to get hash number. v61+ protocol version only /// Request hashes by number. v61+ protocol version only
void requestHashes(); void requestHashes(u256 _number, unsigned _count);
/// Request hashes for given parent hash. /// Request hashes for given parent hash.
void requestHashes(h256 const& _lastHash); void requestHashes(h256 const& _lastHash);
@ -135,18 +138,19 @@ private:
h256 m_genesisHash; ///< Peer's genesis hash h256 m_genesisHash; ///< Peer's genesis hash
u256 m_latestBlockNumber; ///< Number of the latest block this peer has u256 m_latestBlockNumber; ///< Number of the latest block this peer has
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer. unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer.
unsigned m_syncHashNumber = 0; ///< Number of latest hash we sync to (PV61+) u256 m_syncHashNumber = 0; ///< Number of latest hash we sync to (PV61+)
h256 m_syncHash; ///< Latest hash we sync to (PV60) h256 m_syncHash; ///< Latest hash we sync to (PV60)
/// Once we're asking for blocks, this becomes in use. /// Once we're asking for blocks, this becomes in use.
DownloadSub m_sub; DownloadSub m_sub;
/// Once we're asking for hashes, this becomes in use.
HashDownloadSub m_hashSub;
u256 m_peerCapabilityVersion; ///< Protocol version this peer supports received as capability u256 m_peerCapabilityVersion; ///< Protocol version this peer supports received as capability
/// Have we received a GetTransactions packet that we haven't yet answered? /// Have we received a GetTransactions packet that we haven't yet answered?
bool m_requireTransactions = false; bool m_requireTransactions = false;

Loading…
Cancel
Save