Browse Source

continue syncing refactoring

cl-refactor
arkpar 10 years ago
parent
commit
18127c1b4e
  1. 510
      libethereum/BlockChainSync.cpp
  2. 153
      libethereum/BlockChainSync.h
  3. 31
      libethereum/BlockQueue.cpp
  4. 4
      libethereum/BlockQueue.h
  5. 5
      libethereum/EthereumHost.cpp
  6. 1
      libethereum/EthereumHost.h
  7. 5
      libethereum/EthereumPeer.h

510
libethereum/BlockChainSync.cpp

@ -14,7 +14,7 @@
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file EthereumHost.cpp
/** @file BlockChainSync.cpp
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
@ -22,30 +22,32 @@
#include "BlockChainSync.h"
#include <chrono>
#include <thread>
#include <libdevcore/Common.h>
#include <libp2p/Host.h>
#include <libp2p/Session.h>
#include <libethcore/Exceptions.h>
#include <libethcore/Params.h>
#include "BlockChain.h"
#include "TransactionQueue.h"
#include "BlockQueue.h"
#include "EthereumPeer.h"
#include "EthereumHost.h"
#include "DownloadMan.h"
using namespace std;
using namespace dev;
using namespace dev::eth;
using namespace p2p;
unsigned const c_chainReorgSize = 30000;
BlockChainSync::BlockChainSync(EthereumHost& _host):
m_host(_host)
{
m_bqRoomAvailable = host().bq().onRoomAvailable([this]()
{
RecursiveGuard l(x_sync);
continueSync();
});
}
BlockChainSync::~BlockChainSync()
@ -69,9 +71,27 @@ void BlockChainSync::abortSync()
downloadMan().resetToChain(h256s());
}
void BlockChainSync::onPeerStatus(EthereumPeer*)
void BlockChainSync::onPeerStatus(EthereumPeer* _peer)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
if (_peer->m_genesisHash != host().chain().genesisHash())
_peer->disable("Invalid genesis hash");
else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion)
_peer->disable("Invalid protocol version.");
else if (_peer->m_networkId != host().networkId())
_peer->disable("Invalid network identifier.");
else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos)
_peer->disable("Blacklisted client version.");
else if (host().isBanned(_peer->session()->id()))
_peer->disable("Peer banned for previous bad behaviour.");
else
{
unsigned estimatedHashes = estimateHashes();
_peer->m_expectedHashes = estimatedHashes;
onNewPeer(_peer);
}
DEV_INVARIANT_CHECK;
}
unsigned BlockChainSync::estimateHashes()
@ -88,9 +108,183 @@ unsigned BlockChainSync::estimateHashes()
return blockCount;
}
void BlockChainSync::requestBlocks(EthereumPeer* _peer)
{
if (host().bq().knownFull())
{
clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
m_lastActiveState = m_state;
pauseSync();
_peer->setIdle();
return;
}
_peer->requestBlocks();
if (_peer->m_asking != Asking::Blocks) //nothing to download
{
peerDoneBlocks(_peer);
if (downloadMan().isComplete())
completeSync();
return;
}
}
void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
_peer->setIdle();
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks)
clog(NetWarn) << "Unexpected Blocks received!";
if (m_state == SyncState::Waiting)
{
clog(NetAllDetail) << "Ignored blocks while waiting";
return;
}
if (itemCount == 0)
{
// Got to this peer's latest block - just give up.
peerDoneBlocks(_peer);
if (downloadMan().isComplete())
completeSync();
return;
}
unsigned success = 0;
unsigned future = 0;
unsigned unknown = 0;
unsigned got = 0;
unsigned repeated = 0;
u256 maxUnknownNumber = 0;
h256 maxUnknown;
for (unsigned i = 0; i < itemCount; ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
if (_peer->m_sub.noteBlock(h))
{
_peer->addRating(10);
switch (host().bq().import(_r[i].data(), host().chain()))
{
case ImportResult::Success:
success++;
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::FutureTimeKnown:
future++;
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
got++;
break;
case ImportResult::FutureTimeUnkwnown:
future++; //Fall through
case ImportResult::UnknownParent:
{
unknown++;
if (m_state == SyncState::NewBlocks)
{
BlockInfo bi;
bi.populateFromHeader(_r[i][0]);
if (bi.number > maxUnknownNumber)
{
maxUnknownNumber = bi.number;
maxUnknown = h;
}
}
break;
}
default:;
}
}
else
{
_peer->addRating(0); // -1?
repeated++;
}
}
clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received.";
if (host().bq().unknownFull())
{
clog(NetWarn) << "Too many unknown blocks, restarting sync";
restartSync();
return;
}
if (m_state == SyncState::NewBlocks && unknown > 0)
resetSyncFor(_peer, maxUnknown, std::numeric_limits<u256>::max()); //TODO: proper total difficuty
else if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
if (downloadMan().isComplete())
completeSync();
else if (!got)
requestBlocks(_peer);
else
peerDoneBlocks(_peer);
}
DEV_INVARIANT_CHECK;
}
void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{
DEV_INVARIANT_CHECK;
RecursiveGuard l(x_sync);
auto h = BlockInfo::headerHash(_r[0].data());
clog(NetMessageSummary) << "NewBlock: " << h;
if (_r.itemCount() != 2)
_peer->disable("NewBlock without 2 data fields.");
else
{
switch (host().bq().import(_r[0].data(), host().chain()))
{
case ImportResult::Success:
_peer->addRating(100);
break;
case ImportResult::FutureTimeKnown:
//TODO: Rating dependent on how far in future it is.
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::FutureTimeUnkwnown:
case ImportResult::UnknownParent:
clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
resetSyncFor(_peer, h, _r[1].toInt<u256>());
break;
default:;
}
DEV_GUARDED(_peer->x_knownBlocks)
_peer->m_knownBlocks.insert(h);
}
DEV_INVARIANT_CHECK;
}
PV60Sync::PV60Sync(EthereumHost& _host):
BlockChainSync(_host)
{
resetSync();
}
SyncStatus PV60Sync::status() const
@ -123,13 +317,44 @@ void PV60Sync::setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing, bool
changeSyncer(nullptr, _needHelp);
assert(!!m_syncer || _s == SyncState::Idle);
}
if (!_isSyncing)
{
m_syncingLatestHash = h256();
m_syncingTotalDifficulty = 0;
m_syncingNeededBlocks.clear();
}
void PV60Sync::resetSync()
{
m_syncingLatestHash = h256();
m_syncingLastReceivedHash = h256();
m_syncingTotalDifficulty = 0;
m_syncingNeededBlocks.clear();
}
void PV60Sync::restartSync()
{
resetSync();
host().bq().clear();
if (isSyncing())
transition(m_syncer, SyncState::Idle);
}
void PV60Sync::completeSync()
{
if (isSyncing())
transition(m_syncer, SyncState::Idle);
}
void PV60Sync::pauseSync()
{
if (isSyncing())
setState(m_syncer, SyncState::Waiting, true);
}
void PV60Sync::continueSync()
{
transition(m_syncer, SyncState::Blocks);
}
void PV60Sync::onNewPeer(EthereumPeer* _peer)
{
setNeedsSyncing(_peer, _peer->m_latestHash, _peer->m_totalDifficulty);
}
void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _needHelp)
@ -151,7 +376,7 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
m_syncingLatestHash = _peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
setState(_peer, _s, true);
_peer->requestHashes(m_syncingLatestHash);
_peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash);
DEV_INVARIANT_CHECK;
return;
}
@ -175,22 +400,23 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
return;
}
if (shouldGrabBlocks(_peer))
if (shouldGrabBlocks())
{
clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash << ", was" << host().latestBlockSent() << "]";
downloadMan().resetToChain(m_syncingNeededBlocks);
resetSync();
}
else
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
m_syncingLatestHash = h256();
resetSync();
setState(_peer, SyncState::Idle, false);
return;
}
assert (isSyncing(_peer));
}
// run through into...
if (m_state == SyncState::Idle || m_state == SyncState::Hashes || m_state == SyncState::Blocks)
if (m_state == SyncState::Idle || m_state == SyncState::Hashes || m_state == SyncState::Blocks || m_state == SyncState::Waiting)
{
// Looks like it's the best yet for total difficulty. Set to download.
setState(_peer, SyncState::Blocks, isSyncing(_peer), _needHelp); // will kick off other peers to help if available.
@ -201,7 +427,7 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
}
else if (_s == SyncState::NewBlocks)
{
if (m_state != SyncState::Idle && m_state != SyncState::NewBlocks)
if (m_state != SyncState::Idle && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting)
clog(NetWarn) << "Bad state: Asking new blocks while syncing!";
else
{
@ -211,6 +437,16 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
return;
}
}
else if (_s == SyncState::Waiting)
{
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Hashes && m_state != SyncState::Waiting)
clog(NetWarn) << "Bad state: Entering waiting state while not downloading blocks!";
else
{
setState(_peer, SyncState::Waiting, isSyncing(_peer), _needHelp);
return;
}
}
else if (_s == SyncState::Idle)
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
@ -240,16 +476,9 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
clog(NetWarn) << "Invalid state transition:" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : "");
}
void PV60Sync::requestBlocks(EthereumPeer* _peer)
void PV60Sync::resetSyncFor(EthereumPeer* _peer, h256 _latestHash, u256 _td)
{
_peer->requestBlocks();
if (_peer->m_asking != Asking::Blocks) //nothing to download
{
noteDoneBlocks(_peer, false);
if (downloadMan().isComplete())
transition(_peer, SyncState::Idle);
return;
}
setNeedsSyncing(_peer, _latestHash, _td);
}
void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td)
@ -263,16 +492,21 @@ void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td)
_peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : ""));
}
bool PV60Sync::needsSyncing(EthereumPeer* _peer) const
{
return !!_peer->m_latestHash;
}
bool PV60Sync::isSyncing(EthereumPeer* _peer) const
{
return m_syncer == _peer;
}
bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const
bool PV60Sync::shouldGrabBlocks() const
{
auto td = _peer->m_totalDifficulty;
auto lh = _peer->m_latestHash;
auto ctd = host().chain().details().totalDifficulty;
auto td = m_syncingTotalDifficulty;
auto lh = m_syncingLatestHash;
auto ctd = host().chain().details().totalDifficulty;
if (m_syncingNeededBlocks.empty())
return false;
@ -300,11 +534,12 @@ void PV60Sync::attemptSync(EthereumPeer* _peer)
return;
}
h256 c = host().chain().currentHash();
unsigned n = host().chain().number();
u256 td = host().chain().details().totalDifficulty;
if (host().bq().isActive())
td += host().bq().difficulty();
clog(NetAllDetail) << "Attempt chain-grab? Latest:" << c << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty;
clog(NetAllDetail) << "Attempt chain-grab? Latest:" << (m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash) << ", number:" << n << ", TD:" << td << " versus " << _peer->m_totalDifficulty;
if (td >= _peer->m_totalDifficulty)
{
clog(NetAllDetail) << "No. Our chain is better.";
@ -326,7 +561,7 @@ void PV60Sync::noteNeedsSyncing(EthereumPeer* _peer)
{
clog(NetAllDetail) << "Sync in progress: Just set to help out.";
if (m_state == SyncState::Blocks)
_peer->requestBlocks();
requestBlocks(_peer);
}
else
// otherwise check to see if we should be downloading...
@ -370,6 +605,11 @@ void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
assert(!!m_syncer || m_state == SyncState::Idle);
}
void PV60Sync::peerDoneBlocks(EthereumPeer* _peer)
{
noteDoneBlocks(_peer, false);
}
void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
{
resetNeedsSyncing(_peer);
@ -399,125 +639,6 @@ void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
_peer->m_sub.doneFetch();
}
void PV60Sync::onPeerStatus(EthereumPeer* _peer)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
if (_peer->m_genesisHash != host().chain().genesisHash())
_peer->disable("Invalid genesis hash");
else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion)
_peer->disable("Invalid protocol version.");
else if (_peer->m_networkId != host().networkId())
_peer->disable("Invalid network identifier.");
else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos)
_peer->disable("Blacklisted client version.");
else if (host().isBanned(_peer->session()->id()))
_peer->disable("Peer banned for previous bad behaviour.");
else
{
unsigned estimatedHashes = estimateHashes();
_peer->m_expectedHashes = estimatedHashes;
setNeedsSyncing(_peer, _peer->m_latestHash, _peer->m_totalDifficulty);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
_peer->setIdle();
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks)
clog(NetWarn) << "Unexpected Blocks received!";
if (itemCount == 0)
{
// Got to this peer's latest block - just give up.
noteDoneBlocks(_peer, false);
if (downloadMan().isComplete())
transition(_peer, SyncState::Idle);
return;
}
unsigned success = 0;
unsigned future = 0;
unsigned unknown = 0;
unsigned got = 0;
unsigned repeated = 0;
u256 maxDifficulty = 0;
h256 maxUnknown;
for (unsigned i = 0; i < itemCount; ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
if (_peer->m_sub.noteBlock(h))
{
_peer->addRating(10);
switch (host().bq().import(_r[i].data(), host().chain()))
{
case ImportResult::Success:
success++;
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::FutureTimeKnown:
future++;
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
got++;
break;
case ImportResult::FutureTimeUnkwnown:
future++; //Fall through
case ImportResult::UnknownParent:
{
unknown++;
if (m_state == SyncState::NewBlocks)
{
BlockInfo bi;
bi.populateFromHeader(_r[i][0]);
if (bi.difficulty > maxDifficulty)
{
maxDifficulty = bi.difficulty;
maxUnknown = h;
}
}
break;
}
default:;
}
}
else
{
_peer->addRating(0); // -1?
repeated++;
}
}
clog(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known," << repeated << " repeats received.";
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
if (downloadMan().isComplete())
transition(_peer, SyncState::Idle);
else if (!got)
transition(_peer, m_state);
else
noteDoneBlocks(_peer, false);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
@ -565,9 +686,7 @@ void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
if (m_syncingNeededBlocks.size() > _peer->m_expectedHashes)
{
_peer->disable("Too many hashes");
m_syncingNeededBlocks.clear();
m_syncingLatestHash = h256();
transition(_peer, SyncState::Idle);
restartSync();
return;
}
// run through - ask for more.
@ -575,66 +694,6 @@ void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
DEV_INVARIANT_CHECK;
}
void PV60Sync::abortSync(EthereumPeer* _peer)
{
if (isSyncing(_peer))
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
transition(_peer, SyncState::Idle, true);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerAborting(EthereumPeer* _peer)
{
abortSync(_peer);
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{
DEV_INVARIANT_CHECK;
RecursiveGuard l(x_sync);
auto h = BlockInfo::headerHash(_r[0].data());
clog(NetMessageSummary) << "NewBlock: " << h;
if (_r.itemCount() != 2)
_peer->disable("NewBlock without 2 data fields.");
else
{
switch (host().bq().import(_r[0].data(), host().chain()))
{
case ImportResult::Success:
_peer->addRating(100);
break;
case ImportResult::FutureTimeKnown:
//TODO: Rating dependent on how far in future it is.
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
_peer->disable("Malformed block received.");
return;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::FutureTimeUnkwnown:
case ImportResult::UnknownParent:
clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
setNeedsSyncing(_peer, h, _r[1].toInt<u256>());
break;
default:;
}
DEV_GUARDED(_peer->x_knownBlocks)
_peer->m_knownBlocks.insert(h);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
@ -672,11 +731,28 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
{
clog(NetNote) << "Not syncing and new block hash discovered: syncing without help.";
downloadMan().resetToChain(m_syncingNeededBlocks);
resetSync();
transition(_peer, SyncState::NewBlocks, false, false);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::abortSync(EthereumPeer* _peer)
{
if (isSyncing(_peer))
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
transition(_peer, SyncState::Idle, true);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerAborting(EthereumPeer* _peer)
{
abortSync(_peer);
DEV_INVARIANT_CHECK;
}
bool PV60Sync::invariants() const
{
if (m_state == SyncState::Idle && !!m_syncer)
@ -696,6 +772,8 @@ bool PV60Sync::invariants() const
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; });
if (!blocks)
return false;
if (downloadMan().isComplete())
return false;
}
return true;
}

153
libethereum/BlockChainSync.h

@ -14,7 +14,7 @@
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file EthereumHost.h
/** @file BlockChainSync.h
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
@ -22,23 +22,14 @@
#pragma once
#include <mutex>
#include <unordered_map>
#include <vector>
#include <unordered_set>
#include <memory>
#include <utility>
#include <thread>
#include <libdevcore/Guards.h>
#include <libdevcore/Worker.h>
#include <libdevcore/RangeMask.h>
#include <libethcore/Common.h>
#include <libp2p/Common.h>
#include "CommonNet.h"
#include "EthereumPeer.h" //TODO: forward decl
#include "DownloadMan.h"
namespace dev
{
@ -49,98 +40,174 @@ namespace eth
class EthereumHost;
class BlockQueue;
class EthereumPeer;
/**
* @brief BlockChain synchronization strategy class
* @doWork Syncs to peers and sends new blocks and transactions.
* @brief Base BlockChain synchronization strategy class.
* Syncs to peers and keeps up to date. Base class handles blocks downloading but does not contain any details on state transfer logic.
*/
class BlockChainSync: public HasInvariants
{
public:
BlockChainSync(EthereumHost& _host);
/// Will block on network process events.
virtual ~BlockChainSync();
void abortSync();
void abortSync(); ///< Abort all sync activity
DownloadMan const& downloadMan() const;
DownloadMan& downloadMan();
/// @returns true is Sync is in progress
virtual bool isSyncing() const = 0;
virtual void onPeerStatus(EthereumPeer* _peer); ///< Called by peer to report status
virtual void onPeerBlocks(EthereumPeer* _peer, RLP const& _r) = 0; ///< Called by peer once it has new blocks during syn
virtual void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) = 0; ///< Called by peer once it has new blocks
virtual void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) = 0; ///< Called by peer once it has new hashes
virtual void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) = 0; ///< Called by peer once it has another sequential block of hashes during sync
virtual void onPeerAborting(EthereumPeer* _peer) = 0; ///< Called by peer when it is disconnecting
/// Called by peer to report status
virtual void onPeerStatus(EthereumPeer* _peer);
/// Called by peer once it has new blocks during syn
virtual void onPeerBlocks(EthereumPeer* _peer, RLP const& _r);
/// Called by peer once it has new blocks
virtual void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r);
/// Called by peer once it has new hashes
virtual void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) = 0;
/// Called by peer once it has another sequential block of hashes during sync
virtual void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) = 0;
/// Called by peer when it is disconnecting
virtual void onPeerAborting(EthereumPeer* _peer) = 0;
/// @returns Synchonization status
virtual SyncStatus status() const = 0;
static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; }
private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
protected:
//To be implemented in derived classes:
/// New valid peer appears
virtual void onNewPeer(EthereumPeer* _peer) = 0;
void setState(SyncState _s);
/// Peer done downloading blocks
virtual void peerDoneBlocks(EthereumPeer* _peer) = 0;
bool invariants() const override = 0;
/// Resume downloading after witing state
virtual void continueSync() = 0;
EthereumHost& m_host;
Handler m_bqRoomAvailable;
HashDownloadMan m_hashMan;
/// Restart sync
virtual void restartSync() = 0;
protected:
/// Called after all blocks have been donloaded
virtual void completeSync() = 0;
/// Enter waiting state
virtual void pauseSync() = 0;
/// Restart sync for given peer
virtual void resetSyncFor(EthereumPeer* _peer, h256 _latestHash, u256 _td) = 0;
EthereumHost& host() { return m_host; }
EthereumHost const& host() const { return m_host; }
/// Estimates max number of hashes peers can give us.
unsigned estimateHashes();
/// Request blocks from peer if needed
void requestBlocks(EthereumPeer* _peer);
private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
bool invariants() const override = 0;
EthereumHost& m_host;
HashDownloadMan m_hashMan;
protected:
Handler m_bqRoomAvailable;
mutable RecursiveMutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state
SyncState m_lastActiveState = SyncState::Idle; ///< Saved state before entering waiting queue mode
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
};
/**
* @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete
* Syncs to peers and keeps up to date
*/
class PV60Sync: public BlockChainSync
{
public:
PV60Sync(EthereumHost& _host);
/// @returns true is Sync is in progress
bool isSyncing() const override { return !!m_syncer; }
void onPeerStatus(EthereumPeer* _peer) override; ///< Called by peer to report status
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r) override; ///< Called by peer once it has new blocks during syn
void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) override; ///< Called by peer once it has new blocks
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) override; ///< Called by peer once it has new hashes
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) override; ///< Called by peer once it has another sequential block of hashes during sync
void onPeerAborting(EthereumPeer* _peer) override; ///< Called by peer when it is disconnecting
/// Called by peer once it has new hashes
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) override;
/// Called by peer once it has another sequential block of hashes during sync
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) override;
/// Called by peer when it is disconnecting
void onPeerAborting(EthereumPeer* _peer) override;
/// @returns Sync status
SyncStatus status() const override;
void onNewPeer(EthereumPeer* _peer) override;
void continueSync() override;
void peerDoneBlocks(EthereumPeer* _peer) override;
void restartSync() override;
void completeSync() override;
void pauseSync() override;
void resetSyncFor(EthereumPeer* _peer, h256 _latestHash, u256 _td) override;
private:
/// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer
void transition(EthereumPeer* _peer, SyncState _s, bool _force = false, bool _needHelp = true);
/// Reset peer syncing requirements state.
void resetNeedsSyncing(EthereumPeer* _peer) { setNeedsSyncing(_peer, h256(), 0); }
bool needsSyncing(EthereumPeer* _peer) const { return !!_peer->m_latestHash; }
/// Update peer syncing requirements state.
void setNeedsSyncing(EthereumPeer* _peer, h256 _latestHash, u256 _td);
bool shouldGrabBlocks(EthereumPeer* _peer) const;
/// Do we presently need syncing with this peer?
bool needsSyncing(EthereumPeer* _peer) const;
/// Check whether the session should bother grabbing blocks.
bool shouldGrabBlocks() const;
/// Attempt to begin syncing with the peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks
void attemptSync(EthereumPeer* _peer);
/// Update our syncing state
void setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing = false, bool _needHelp = false);
/// Check if peer is main syncer
bool isSyncing(EthereumPeer* _peer) const;
/// Check if we need (re-)syncing with the peer.
void noteNeedsSyncing(EthereumPeer* _who);
/// Set main syncing peer
void changeSyncer(EthereumPeer* _syncer, bool _needHelp);
/// Called when peer done downloading blocks
void noteDoneBlocks(EthereumPeer* _who, bool _clemency);
/// Abort syncing for peer
void abortSync(EthereumPeer* _peer);
void requestBlocks(EthereumPeer* _peer);
/// Reset hash chain syncing
void resetSync();
private:
bool invariants() const override;
h256s m_knownHashes; ///< List of block hashes we need to download.
h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr
};
}
}

31
libethereum/BlockQueue.cpp

@ -37,7 +37,7 @@ const char* BlockQueueChannel::name() { return EthOrange "[]>"; }
const char* BlockQueueChannel::name() { return EthOrange "▣┅▶"; }
#endif
size_t const c_maxKnownCount = 100000;
size_t const c_maxKnownCount = 10000;
size_t const c_maxKnownSize = 128 * 1024 * 1024;
size_t const c_maxUnknownCount = 100000;
size_t const c_maxUnknownSize = 512 * 1024 * 1024; // Block size can be ~50kb
@ -81,6 +81,8 @@ void BlockQueue::clear()
m_unknownCount = 0;
m_knownSize = 0;
m_knownCount = 0;
m_difficulty = 0;
m_drainingDifficulty = 0;
}
void BlockQueue::verifierBody()
@ -194,7 +196,6 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
// VERIFY: populates from the block and checks the block is internally coherent.
BlockInfo bi;
try
{
// TODO: quick verify
@ -229,6 +230,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
cblockq << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf;
m_unknownSize += _block.size();
m_unknownCount++;
m_difficulty += bi.difficulty;
bool unknown = !m_readySet.count(bi.parentHash) && !m_drainingSet.count(bi.parentHash) && !_bc.isKnown(bi.parentHash);
return unknown ? ImportResult::FutureTimeUnkwnown : ImportResult::FutureTimeKnown;
}
@ -249,6 +251,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
m_unknown.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes())));
m_unknownSet.insert(h);
m_unknownSize += _block.size();
m_difficulty += bi.difficulty;
m_unknownCount++;
return ImportResult::UnknownParent;
@ -262,6 +265,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
m_moreToVerify.notify_one();
m_readySet.insert(h);
m_knownSize += _block.size();
m_difficulty += bi.difficulty;
m_knownCount++;
noteReady_WITH_LOCK(h);
@ -351,13 +355,16 @@ bool BlockQueue::doneDrain(h256s const& _bad)
WriteGuard l(m_lock);
DEV_INVARIANT_CHECK;
m_drainingSet.clear();
m_difficulty -= m_drainingDifficulty;
m_drainingDifficulty = 0;
if (_bad.size())
{
// at least one of them was bad.
m_knownBad += _bad;
for (h256 const& b : _bad)
updateBad(b);
} return !m_readySet.empty();
}
return !m_readySet.empty();
}
void BlockQueue::tick(BlockChain const& _bc)
@ -434,6 +441,7 @@ void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
if (m_drainingSet.empty())
{
bool wasFull = knownFull();
m_drainingDifficulty = 0;
DEV_GUARDED(m_verification)
{
o_out.resize(min<unsigned>(_max, m_verified.size()));
@ -446,6 +454,7 @@ void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
// TODO: @optimise use map<h256, bytes> rather than vector<bytes> & set<h256>.
auto h = bs.verified.info.hash();
m_drainingSet.insert(h);
m_drainingDifficulty += bs.verified.info.difficulty;
m_readySet.erase(h);
m_knownSize -= bs.verified.block.size();
m_knownCount--;
@ -525,3 +534,19 @@ std::ostream& dev::eth::operator<<(std::ostream& _out, BlockQueueStatus const& _
return _out;
}
u256 BlockQueue::difficulty() const
{
UpgradableGuard l(m_lock);
return m_difficulty;
}
bool BlockQueue::isActive() const
{
UpgradableGuard l(m_lock);
if (m_readySet.empty() && m_drainingSet.empty())
DEV_GUARDED(m_verification)
if (m_verified.empty() && m_verifying.empty() && m_unverified.empty())
return false;
return true;
}

4
libethereum/BlockQueue.h

@ -117,6 +117,8 @@ public:
bool knownFull() const;
bool unknownFull() const;
u256 difficulty() const; // Total difficulty of queueud blocks
bool isActive() const;
private:
struct UnverifiedBlock
@ -158,6 +160,8 @@ private:
std::atomic<size_t> m_knownSize; ///< Tracks total size in bytes of all known blocks;
std::atomic<size_t> m_unknownCount; ///< Tracks total count of unknown blocks. Used to avoid additional syncing
std::atomic<size_t> m_knownCount; ///< Tracks total count of known blocks. Used to avoid additional syncing
u256 m_difficulty; ///< Total difficulty of blocks in the queue
u256 m_drainingDifficulty; ///< Total difficulty of blocks in draining
};
std::ostream& operator<<(std::ostream& _out, BlockQueueStatus const& _s);

5
libethereum/EthereumHost.cpp

@ -289,6 +289,11 @@ void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
{
if (_peer->isCriticalSyncing())
{
clog(NetAllDetail) << "Ignoring transaction from peer we are syncing with";
return;
}
unsigned itemCount = _r.itemCount();
clog(NetAllDetail) << "Transactions (" << dec << itemCount << "entries)";
Guard l(_peer->x_knownTransactions);

1
libethereum/EthereumHost.h

@ -80,6 +80,7 @@ public:
BlockChain const& chain() const { return m_chain; }
BlockQueue& bq() { return m_bq; }
BlockQueue const& bq() const { return m_bq; }
SyncStatus status() const;
h256 latestBlockSent() { return m_latestBlockSent; }
static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; }

5
libethereum/EthereumPeer.h

@ -141,11 +141,8 @@ private:
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
unsigned m_expectedHashes = 0; ///< Estimated upper bound of hashes to expect from this peer.
u256 m_syncHashNumber = 0; ///< Number of latest hash we sync to (PV61+)
u256 m_syncHashNumber = 0; ///< Number of latest hash we sync to (PV61+)
h256 m_syncHash; ///< Latest hash we sync to (PV60)
/// Once we're asking for blocks, this becomes in use.

Loading…
Cancel
Save