Browse Source

Merge pull request #2322 from arkpar/bc_rf

PV61 sync
cl-refactor
Gav Wood 10 years ago
parent
commit
ee460772b7
  1. 2
      alethzero/MainWin.cpp
  2. 486
      libethereum/BlockChainSync.cpp
  3. 47
      libethereum/BlockChainSync.h
  4. 9
      libethereum/BlockQueue.cpp
  5. 1
      libethereum/CommonNet.h
  6. 30
      libethereum/EthereumHost.cpp
  7. 2
      libethereum/EthereumHost.h
  8. 34
      libethereum/EthereumPeer.cpp
  9. 4
      libethereum/EthereumPeer.h
  10. 2
      libp2p/RLPXSocket.h
  11. 3
      libp2p/Session.cpp

2
alethzero/MainWin.cpp

@ -1290,7 +1290,7 @@ void Main::refreshBlockCount()
auto d = ethereum()->blockChain().details();
BlockQueueStatus b = ethereum()->blockQueueStatus();
SyncStatus sync = ethereum()->syncStatus();
QString syncStatus = EthereumHost::stateName(sync.state);
QString syncStatus = QString("PV%1 %2").arg(sync.protocolVersion).arg(EthereumHost::stateName(sync.state));
if (sync.state == SyncState::Hashes)
syncStatus += QString(": %1/%2%3").arg(sync.hashesReceived).arg(sync.hashesEstimated ? "~" : "").arg(sync.hashesTotal);
if (sync.state == SyncState::Blocks || sync.state == SyncState::NewBlocks)

486
libethereum/BlockChainSync.cpp

@ -38,7 +38,8 @@ using namespace dev;
using namespace dev::eth;
using namespace p2p;
unsigned const c_chainReorgSize = 30000;
unsigned const c_chainReorgSize = 30000; /// Added to estimated hashes to account for potential chain reorganiation
unsigned const c_hashSubchainSize = 8192; /// PV61 subchain size
BlockChainSync::BlockChainSync(EthereumHost& _host):
m_host(_host)
@ -75,15 +76,18 @@ void BlockChainSync::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
std::shared_ptr<Session> session = _peer->session();
if (!session)
return; // Expired
if (_peer->m_genesisHash != host().chain().genesisHash())
_peer->disable("Invalid genesis hash");
else if (_peer->m_protocolVersion != host().protocolVersion() && _peer->m_protocolVersion != EthereumHost::c_oldProtocolVersion)
_peer->disable("Invalid protocol version.");
else if (_peer->m_networkId != host().networkId())
_peer->disable("Invalid network identifier.");
else if (_peer->session()->info().clientVersion.find("/v0.7.0/") != string::npos)
else if (session->info().clientVersion.find("/v0.7.0/") != string::npos)
_peer->disable("Blacklisted client version.");
else if (host().isBanned(_peer->session()->id()))
else if (host().isBanned(session->id()))
_peer->disable("Peer banned for previous bad behaviour.");
else
{
@ -91,7 +95,6 @@ void BlockChainSync::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
_peer->m_expectedHashes = hashes;
onNewPeer(_peer);
}
DEV_INVARIANT_CHECK;
}
unsigned BlockChainSync::estimatedHashes() const
@ -114,7 +117,6 @@ void BlockChainSync::requestBlocks(std::shared_ptr<EthereumPeer> _peer)
{
clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
pauseSync();
_peer->setIdle();
return;
}
_peer->requestBlocks();
@ -137,13 +139,14 @@ void BlockChainSync::logNewBlock(h256 const& _h)
void BlockChainSync::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
_peer->setIdle();
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting)
clog(NetWarn) << "Unexpected Blocks received!";
{
clog(NetMessageSummary) << "Ignoring unexpected blocks";
return;
}
if (m_state == SyncState::Waiting)
{
clog(NetAllDetail) << "Ignored blocks while waiting";
@ -184,6 +187,7 @@ void BlockChainSync::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const
case ImportResult::BadChain:
logNewBlock(h);
_peer->disable("Malformed block received.");
restartSync();
return;
case ImportResult::FutureTimeKnown:
@ -282,7 +286,7 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP con
case ImportResult::FutureTimeUnknown:
case ImportResult::UnknownParent:
logNewBlock(h);
clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
clog(NetMessageDetail) << "Received block with no known parent. Resyncing...";
resetSyncFor(_peer, h, _r[1].toInt<u256>());
break;
default:;
@ -291,7 +295,6 @@ void BlockChainSync::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP con
DEV_GUARDED(_peer->x_knownBlocks)
_peer->m_knownBlocks.insert(h);
}
DEV_INVARIANT_CHECK;
}
PV60Sync::PV60Sync(EthereumHost& _host):
@ -305,6 +308,7 @@ SyncStatus PV60Sync::status() const
RecursiveGuard l(x_sync);
SyncStatus res;
res.state = m_state;
res.protocolVersion = EthereumHost::c_oldProtocolVersion;
if (m_state == SyncState::Hashes)
{
res.hashesTotal = m_estimatedHashes;
@ -344,26 +348,30 @@ void PV60Sync::restartSync()
{
resetSync();
host().bq().clear();
if (isSyncing())
transition(m_syncer.lock(), SyncState::Idle);
std::shared_ptr<EthereumPeer> syncer = m_syncer.lock();
if (syncer)
transition(syncer, SyncState::Idle);
}
void PV60Sync::completeSync()
{
if (isSyncing())
transition(m_syncer.lock(), SyncState::Idle);
std::shared_ptr<EthereumPeer> syncer = m_syncer.lock();
if (syncer)
transition(syncer, SyncState::Idle);
}
void PV60Sync::pauseSync()
{
if (isSyncing())
setState(m_syncer.lock(), SyncState::Waiting, true);
std::shared_ptr<EthereumPeer> syncer = m_syncer.lock();
if (syncer)
transition(syncer, SyncState::Waiting, true);
}
void PV60Sync::continueSync()
{
if (isSyncing())
transition(m_syncer.lock(), SyncState::Blocks);
std::shared_ptr<EthereumPeer> syncer = m_syncer.lock();
if (syncer)
transition(syncer, SyncState::Blocks);
}
void PV60Sync::onNewPeer(std::shared_ptr<EthereumPeer> _peer)
@ -381,26 +389,10 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
RLPStream s;
if (_s == SyncState::Hashes)
{
if (m_state == SyncState::Idle)
{
if (isSyncing(_peer))
clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
m_syncingLatestHash = _peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
setState(_peer, _s, true);
_peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash);
DEV_INVARIANT_CHECK;
return;
}
else if (m_state == SyncState::Hashes)
if (m_state == SyncState::Idle || m_state == SyncState::Hashes)
{
if (!isSyncing(_peer))
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setState(_peer, _s, true);
_peer->requestHashes(m_syncingLastReceivedHash);
DEV_INVARIANT_CHECK;
m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize;
syncHashes(_peer);
return;
}
}
@ -462,7 +454,6 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
}
else if (_s == SyncState::Idle)
{
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p) { _p->setIdle(); return true; });
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
clog(NetMessageDetail) << "Finishing blocks fetch...";
@ -473,7 +464,6 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
_peer->m_sub.doneFetch();
_peer->setIdle();
setState(_peer, SyncState::Idle, false);
}
else if (m_state == SyncState::Hashes)
@ -502,7 +492,9 @@ void PV60Sync::setNeedsSyncing(std::shared_ptr<EthereumPeer> _peer, h256 const&
if (_peer->m_latestHash)
noteNeedsSyncing(_peer);
_peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : ""));
shared_ptr<Session> session = _peer->session();
if (session)
session->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : ""));
}
bool PV60Sync::needsSyncing(std::shared_ptr<EthereumPeer> _peer) const
@ -562,7 +554,6 @@ void PV60Sync::attemptSync(std::shared_ptr<EthereumPeer> _peer)
else
{
clog(NetAllDetail) << "Yes. Their chain is better.";
m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize;
transition(_peer, SyncState::Hashes);
}
}
@ -573,7 +564,9 @@ void PV60Sync::noteNeedsSyncing(std::shared_ptr<EthereumPeer> _peer)
if (isSyncing())
{
clog(NetAllDetail) << "Sync in progress: Just set to help out.";
if (m_state == SyncState::Blocks)
if (m_state == SyncState::Hashes && _peer->m_asking == Asking::Nothing)
requestSubchain(_peer);
else if (m_state == SyncState::Blocks)
requestBlocks(_peer);
}
else
@ -649,20 +642,45 @@ void PV60Sync::noteDoneBlocks(std::shared_ptr<EthereumPeer> _peer, bool _clemenc
}
resetSync();
downloadMan().reset();
}
_peer->m_sub.doneFetch();
}
void PV60Sync::syncHashes(std::shared_ptr<EthereumPeer> _peer)
{
if (m_state == SyncState::Idle)
{
if (isSyncing(_peer))
clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
m_syncingLatestHash = _peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
setState(_peer, SyncState::Hashes, true);
_peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash);
}
else if (m_state == SyncState::Hashes)
{
if (!isSyncing(_peer))
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setState(_peer, SyncState::Hashes, true);
_peer->requestHashes(m_syncingLastReceivedHash);
}
}
void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
_peer->setIdle();
if (!isSyncing(_peer))
{
clog(NetMessageSummary) << "Ignoring hashes since not syncing";
return;
}
if (_peer->m_syncHash != (m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash))
{
clog(NetMessageSummary) << "Ignoring unexpected hashes";
return;
}
if (_hashes.size() == 0)
{
transition(_peer, SyncState::Blocks);
@ -684,7 +702,8 @@ void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _h
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
transition(_peer, SyncState::Idle);
_peer->disable("Bad blocks");
restartSync();
return;
}
else if (status == QueueStatus::Unknown)
@ -711,7 +730,6 @@ void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _h
void PV60Sync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer)))
{
clog(NetMessageSummary) << "Ignoring since we're already downloading.";
@ -769,8 +787,33 @@ void PV60Sync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const&
void PV60Sync::abortSync()
{
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p) { _p->setIdle(); return true; });
bool continueSync = false;
if (m_state == SyncState::Blocks)
{
// Main syncer aborted, try to find a replacement
host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
{
if (_p->m_asking == Asking::Blocks)
{
setState(_p, SyncState::Blocks, true, true); // will kick off other peers to help if available.
continueSync = true;
return false;
}
if (_p->m_asking == Asking::Nothing && shouldGrabBlocks(_p))
{
transition(_p, SyncState::Blocks);
clog(NetMessageDetail) << "New sync peer selected";
continueSync = true;
return false;
}
return true;
});
}
if (!continueSync)
{
// Just set to idle. Hashchain is keept, Sync will be continued if there are more peers to sync with
setState(std::shared_ptr<EthereumPeer>(), SyncState::Idle, false, true);
}
DEV_INVARIANT_CHECK;
}
@ -778,37 +821,366 @@ void PV60Sync::onPeerAborting()
{
RecursiveGuard l(x_sync);
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
if (m_syncer.expired())
if (m_syncer.expired() && m_state != SyncState::Idle)
{
clog(NetWarn) << "Syncing peer disconnected, restarting sync";
m_syncer.reset();
abortSync();
}
DEV_INVARIANT_CHECK;
}
bool PV60Sync::invariants() const
{
if (m_state == SyncState::Idle && isSyncing())
return false;
BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Idle while peer syncing"));
if (m_state != SyncState::Idle && !isSyncing())
return false;
BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Active while peer not syncing"));
if (m_state == SyncState::Hashes)
{
if (!m_syncingLatestHash)
return false;
BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("m_syncingLatestHash is not set while downloading hashes"));
if (m_syncingNeededBlocks.empty() != (!m_syncingLastReceivedHash))
return false;
BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Received hashes but the hashes list is empty (or the other way around)"));
}
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
if (downloadMan().isComplete())
return false;
BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Block download complete but the state is still Blocks"));
}
if (m_state == SyncState::Waiting && !host().bq().isActive())
BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Waiting while block queue is idle"));
return true;
}
PV61Sync::PV61Sync(EthereumHost& _host):
PV60Sync(_host)
{
}
void PV61Sync::syncHashes(std::shared_ptr<EthereumPeer> _peer)
{
if (_peer->m_protocolVersion != host().protocolVersion())
{
m_readyChainMap.clear();
m_completeChainMap.clear();
m_downloadingChainMap.clear();
m_syncingBlockNumber = 0;
m_chainSyncPeers.clear();
m_knownHashes.clear();
m_hashScanComplete = false;
PV60Sync::syncHashes(_peer);
return;
}
if (m_state == SyncState::Idle)
{
bool busy = false;
host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; });
if (busy)
if (isSyncing(_peer))
clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
if (m_syncingBlockNumber == 0)
m_syncingBlockNumber = host().chain().number() + c_hashSubchainSize;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
setState(_peer, SyncState::Hashes, true);
_peer->requestHashes(m_syncingBlockNumber, 1);
}
else if (m_state == SyncState::Hashes)
{
if (!isSyncing(_peer))
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
m_syncingBlockNumber += c_hashSubchainSize;
setState(_peer, SyncState::Hashes, true);
_peer->requestHashes(m_syncingBlockNumber, 1);
}
}
void PV61Sync::requestSubchain(std::shared_ptr<EthereumPeer> _peer)
{
auto syncPeer = m_chainSyncPeers.find(_peer);
if (syncPeer != m_chainSyncPeers.end())
{
// Already downoading, request next batch
h256s& d = m_downloadingChainMap.at(syncPeer->second);
_peer->requestHashes(d.back());
}
else if (needsSyncing(_peer))
{
if (!m_readyChainMap.empty())
{
clog(NetAllDetail) << "Helping with hashchin download";
h256s& d = m_readyChainMap.begin()->second;
_peer->requestHashes(d.back());
m_downloadingChainMap[m_readyChainMap.begin()->first] = move(d);
m_chainSyncPeers[_peer] = m_readyChainMap.begin()->first;
m_readyChainMap.erase(m_readyChainMap.begin());
}
else if (!m_downloadingChainMap.empty() && m_hashScanComplete)
{
// Lead syncer is done, just grab whatever we can
h256s& d = m_downloadingChainMap.begin()->second;
_peer->requestHashes(d.back());
}
}
}
void PV61Sync::requestSubchains()
{
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p)
{
if (_p->m_asking == Asking::Nothing)
requestSubchain(_p);
return true;
});
}
void PV61Sync::completeSubchain(std::shared_ptr<EthereumPeer> _peer, unsigned _n)
{
m_completeChainMap[_n] = move(m_downloadingChainMap.at(_n));
m_downloadingChainMap.erase(_n);
for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end(); ++s)
if (s->second == _n) //TODO: optimize this
{
m_chainSyncPeers.erase(s);
break;
}
_peer->m_syncHashNumber = 0;
auto syncer = m_syncer.lock();
if (!syncer)
{
restartSync();
return;
}
if (m_readyChainMap.empty() && m_downloadingChainMap.empty() && m_hashScanComplete)
{
//Done chain-get
m_syncingNeededBlocks.clear();
for (auto h = m_completeChainMap.rbegin(); h != m_completeChainMap.rend(); ++h)
m_syncingNeededBlocks.insert(m_syncingNeededBlocks.end(), h->second.begin(), h->second.end());
m_completeChainMap.clear();
m_knownHashes.clear();
m_syncingBlockNumber = 0;
transition(syncer, SyncState::Blocks);
}
else
requestSubchain(_peer);
}
void PV61Sync::restartSync()
{
m_completeChainMap.clear();
m_readyChainMap.clear();
m_downloadingChainMap.clear();
m_chainSyncPeers.clear();
m_syncingBlockNumber = 0;
m_knownHashes.clear();
m_hashScanComplete = false;
PV60Sync::restartSync();
}
void PV61Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
if (m_syncingBlockNumber == 0 || (_peer == m_syncer.lock() && _peer->m_protocolVersion != host().protocolVersion()))
{
// Syncing in pv60 mode
PV60Sync::onPeerHashes(_peer, _hashes);
return;
}
if (_hashes.size() == 0)
{
if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber)
{
// End of hash chain, add last chunk to download
m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _peer->m_latestHash }));
m_hashScanComplete = true;
_peer->m_syncHashNumber = 0;
requestSubchain(_peer);
}
else
{
auto syncPeer = m_chainSyncPeers.find(_peer);
if (syncPeer == m_chainSyncPeers.end())
clog(NetMessageDetail) << "Hashes response from unexpected peer";
else
{
// Peer does not have request hashes, move back from downloading to ready
unsigned number = syncPeer->second;
m_chainSyncPeers.erase(_peer);
m_readyChainMap[number] = move(m_downloadingChainMap.at(number));
m_downloadingChainMap.erase(number);
resetNeedsSyncing(_peer);
requestSubchains();
}
}
return;
}
if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber)
{
// Got new subchain marker
assert(_hashes.size() == 1);
m_knownHashes.insert(_hashes[0]);
m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _hashes[0] }));
if ((m_readyChainMap.size() + m_downloadingChainMap.size() + m_completeChainMap.size()) * c_hashSubchainSize > _peer->m_expectedHashes)
{
_peer->disable("Too many hashes from lead peer");
restartSync();
return;
}
transition(_peer, SyncState::Hashes);
requestSubchains();
}
else
{
auto syncPeer = m_chainSyncPeers.find(_peer);
unsigned number = 0;
if (syncPeer == m_chainSyncPeers.end())
{
//check downlading peers
for (auto const& downloader: m_downloadingChainMap)
if (downloader.second.back() == _peer->m_syncHash)
{
number = downloader.first;
break;
}
}
else
number = syncPeer->second;
if (number == 0)
{
clog(NetAllDetail) << "Hashes response from unexpected/expired peer";
return;
}
auto downloadingPeer = m_downloadingChainMap.find(number);
if (downloadingPeer == m_downloadingChainMap.end() || downloadingPeer->second.back() != _peer->m_syncHash)
{
// Too late, other peer has already downloaded our hashes
m_chainSyncPeers.erase(_peer);
requestSubchain(_peer);
return;
}
h256s& hashes = downloadingPeer->second;
unsigned knowns = 0;
unsigned unknowns = 0;
for (unsigned i = 0; i < _hashes.size(); ++i)
{
auto h = _hashes[i];
auto status = host().bq().blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h) || !!m_knownHashes.count(h))
{
clog(NetMessageSummary) << "Subchain download complete";
m_chainSyncPeers.erase(_peer);
completeSubchain(_peer, number);
return;
}
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
_peer->disable("Bad hashes");
if (isSyncing(_peer))
restartSync();
else
{
//try with other peer
m_readyChainMap[number] = move(m_downloadingChainMap.at(number));
m_downloadingChainMap.erase(number);
m_chainSyncPeers.erase(_peer);
}
return;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
hashes.push_back(h);
}
else
knowns++;
}
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << hashes.back();
if (hashes.size() > c_hashSubchainSize)
{
_peer->disable("Too many subchain hashes");
restartSync();
return;
}
requestSubchain(_peer);
}
DEV_INVARIANT_CHECK;
}
void PV61Sync::onPeerAborting()
{
RecursiveGuard l(x_sync);
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end();)
{
if (s->first.expired())
{
unsigned number = s->second;
m_readyChainMap[number] = move(m_downloadingChainMap.at(number));
m_downloadingChainMap.erase(number);
m_chainSyncPeers.erase(s++);
}
else
++s;
}
if (m_syncer.expired())
{
if (m_state == SyncState::Hashes)
{
// Main syncer aborted, other peers are probably still downloading hashes, just set one of them as syncer
host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
{
if (_p->m_asking != Asking::Hashes)
return true;
setState(_p, SyncState::Hashes, true, true);
return false;
});
}
if (m_state == SyncState::Waiting && !host().bq().isActive())
if (m_syncer.expired())
PV60Sync::onPeerAborting();
}
else if (isPV61Syncing() && m_state == SyncState::Hashes)
requestSubchains();
DEV_INVARIANT_CHECK;
}
SyncStatus PV61Sync::status() const
{
RecursiveGuard l(x_sync);
SyncStatus res = PV60Sync::status();
if (m_state == SyncState::Hashes && isPV61Syncing())
{
res.protocolVersion = 61;
res.hashesReceived = 0;
for (auto const& d : m_readyChainMap)
res.hashesReceived += d.second.size();
for (auto const& d : m_downloadingChainMap)
res.hashesReceived += d.second.size();
for (auto const& d : m_completeChainMap)
res.hashesReceived += d.second.size();
}
return res;
}
bool PV61Sync::isPV61Syncing() const
{
return m_syncingBlockNumber != 0;
}
bool PV61Sync::invariants() const
{
if (m_state == SyncState::Hashes)
{
if (isPV61Syncing() && !m_syncingBlockNumber)
BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Syncing in PV61 with no block number set"));
}
else if (!PV60Sync::invariants())
return false;
return true;
}

47
libethereum/BlockChainSync.h

@ -131,8 +131,8 @@ private:
/**
* @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete
* Syncs to peers and keeps up to date
* @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash download is complete
* syncs to peers and keeps up to date
*/
/**
@ -227,7 +227,7 @@ protected:
void pauseSync() override;
void resetSyncFor(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td) override;
private:
protected:
/// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer
void transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _force = false, bool _needHelp = true);
@ -261,6 +261,12 @@ private:
/// Called when peer done downloading blocks
void noteDoneBlocks(std::shared_ptr<EthereumPeer> _who, bool _clemency);
/// Start chainhash sync
virtual void syncHashes(std::shared_ptr<EthereumPeer> _peer);
/// Request subchain, no-op for pv60
virtual void requestSubchain(std::shared_ptr<EthereumPeer> /*_peer*/) {}
/// Abort syncing
void abortSync();
@ -275,5 +281,40 @@ private:
u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync.
std::weak_ptr<EthereumPeer> m_syncer; ///< Peer we are currently syncing with
};
/**
* @brief Syncrhonization over PV61. Selects a single peer and requests every c_hashSubchainSize hash, splitting the hashchain into subchains and downloading each subchain in parallel.
* Syncs to peers and keeps up to date
*/
class PV61Sync: public PV60Sync
{
public:
PV61Sync(EthereumHost& _host);
protected:
void restartSync() override;
void requestSubchain(std::shared_ptr<EthereumPeer> _peer) override;
void syncHashes(std::shared_ptr<EthereumPeer> _peer) override;
void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) override;
void onPeerAborting() override;
SyncStatus status() const override;
bool invariants() const override;
private:
/// Called when subchain is complete. Check if if hashchain is fully downloaded and proceed to downloading blocks
void completeSubchain(std::shared_ptr<EthereumPeer> _peer, unsigned _n);
/// Find a subchain for peers to downloading
void requestSubchains();
/// Check if downloading hashes in parallel
bool isPV61Syncing() const;
std::map<unsigned, h256s> m_completeChainMap; ///< Fully downloaded subchains
std::map<unsigned, h256s> m_readyChainMap; ///< Subchains ready for download
std::map<unsigned, h256s> m_downloadingChainMap; ///< Subchains currently being downloading. In sync with m_chainSyncPeers
std::map<std::weak_ptr<EthereumPeer>, unsigned, std::owner_less<std::weak_ptr<EthereumPeer>>> m_chainSyncPeers; ///< Peers to m_downloadingSubchain number map
h256Hash m_knownHashes; ///< Subchain start markers. Used to track suchain completion
unsigned m_syncingBlockNumber = 0; ///< Current subchain marker
bool m_hashScanComplete = false; ///< True if leading peer completed hashchain scan and we have a list of subchains ready
};
}
}

9
libethereum/BlockQueue.cpp

@ -21,6 +21,7 @@
#include "BlockQueue.h"
#include <thread>
#include <sstream>
#include <libdevcore/Log.h>
#include <libethcore/Exceptions.h>
#include <libethcore/BlockInfo.h>
@ -473,7 +474,13 @@ void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
bool BlockQueue::invariants() const
{
Guard l(m_verification);
return m_readySet.size() == m_verified.size() + m_unverified.size() + m_verifying.size();
if (!(m_readySet.size() == m_verified.size() + m_unverified.size() + m_verifying.size()))
{
std::stringstream s;
s << "Failed BlockQueue invariant: m_readySet: " << m_readySet.size() << " m_verified: " << m_verified.size() << " m_unverified: " << m_unverified.size() << " m_verifying" << m_verifying.size();
BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment(s.str()));
}
return true;
}
void BlockQueue::noteReady_WITH_LOCK(h256 const& _good)

1
libethereum/CommonNet.h

@ -91,6 +91,7 @@ enum class SyncState
struct SyncStatus
{
SyncState state = SyncState::Idle;
unsigned protocolVersion = 0;
unsigned hashesTotal = 0;
unsigned hashesReceived = 0;
bool hashesEstimated = false;

30
libethereum/EthereumHost.cpp

@ -230,10 +230,10 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash)
}
}
BlockChainSync& EthereumHost::sync()
BlockChainSync* EthereumHost::sync()
{
if (m_sync)
return *m_sync; // We only chose sync strategy once
return m_sync.get(); // We only chose sync strategy once
bool pv61 = false;
foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
@ -242,38 +242,43 @@ BlockChainSync& EthereumHost::sync()
pv61 = true;
return !pv61;
});
m_sync.reset(pv61 ? new PV60Sync(*this) : new PV60Sync(*this));
return *m_sync;
m_sync.reset(pv61 ? new PV61Sync(*this) : new PV60Sync(*this));
return m_sync.get();
}
void EthereumHost::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{
Guard l(x_sync);
sync().onPeerStatus(_peer);
if (sync())
sync()->onPeerStatus(_peer);
}
void EthereumHost::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
Guard l(x_sync);
sync().onPeerHashes(_peer, _hashes);
if (sync())
sync()->onPeerHashes(_peer, _hashes);
}
void EthereumHost::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
Guard l(x_sync);
sync().onPeerBlocks(_peer, _r);
if (sync())
sync()->onPeerBlocks(_peer, _r);
}
void EthereumHost::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
Guard l(x_sync);
sync().onPeerNewHashes(_peer, _hashes);
if (sync())
sync()->onPeerNewHashes(_peer, _hashes);
}
void EthereumHost::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
Guard l(x_sync);
sync().onPeerNewBlock(_peer, _r);
if (sync())
sync()->onPeerNewBlock(_peer, _r);
}
void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
@ -312,8 +317,15 @@ void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP c
void EthereumHost::onPeerAborting()
{
Guard l(x_sync);
try
{
if (m_sync)
m_sync->onPeerAborting();
}
catch (Exception&)
{
cwarn << "Exception on peer destruciton: " << boost::current_exception_diagnostic_information();
}
}
bool EthereumHost::isSyncing() const

2
libethereum/EthereumHost.h

@ -115,7 +115,7 @@ private:
virtual void onStarting() override { startWorking(); }
virtual void onStopping() override { stopWorking(); }
BlockChainSync& sync();
BlockChainSync* sync();
BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.

34
libethereum/EthereumPeer.cpp

@ -127,20 +127,19 @@ void EthereumPeer::requestStatus()
m_requireTransactions = true;
RLPStream s;
bool latest = m_peerCapabilityVersion == host()->protocolVersion();
prep(s, StatusPacket, latest ? 6 : 5)
prep(s, StatusPacket, 5)
<< (latest ? host()->protocolVersion() : EthereumHost::c_oldProtocolVersion)
<< host()->networkId()
<< host()->chain().details().totalDifficulty
<< host()->chain().currentHash()
<< host()->chain().genesisHash();
if (latest)
s << u256(host()->chain().number());
sealAndSend(s);
}
void EthereumPeer::requestHashes(u256 _number, unsigned _count)
{
assert(m_asking == Asking::Nothing);
assert(m_protocolVersion == host()->protocolVersion());
m_syncHashNumber = _number;
m_syncHash = h256();
setAsking(Asking::Hashes);
@ -198,7 +197,7 @@ void EthereumPeer::requestBlocks()
void EthereumPeer::setAsking(Asking _a)
{
m_asking = _a;
m_lastAsk = chrono::system_clock::now();
m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
auto s = session();
if (s)
@ -211,7 +210,8 @@ void EthereumPeer::setAsking(Asking _a)
void EthereumPeer::tick()
{
auto s = session();
if (s && (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing))
time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
if (s && (now - m_lastAsk > 10 && m_asking != Asking::Nothing))
// timeout
s->disconnect(PingTimeout);
}
@ -228,6 +228,7 @@ bool EthereumPeer::isCriticalSyncing() const
bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
{
m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
try
{
switch (_id)
@ -240,21 +241,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
m_latestHash = _r[3].toHash<h256>();
m_genesisHash = _r[4].toHash<h256>();
if (m_peerCapabilityVersion == host()->protocolVersion())
{
if (_r.itemCount() != 6)
{
clog(NetImpolite) << "Peer does not support PV61+ status extension.";
m_protocolVersion = EthereumHost::c_oldProtocolVersion;
}
else
{
m_protocolVersion = host()->protocolVersion();
m_latestBlockNumber = _r[5].toInt<u256>();
}
}
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
setAsking(Asking::Nothing);
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
setIdle();
host()->onPeerStatus(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())));
break;
}
@ -311,6 +301,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
clog(NetWarn) << "Peer giving us hashes when we didn't ask for them.";
break;
}
setIdle();
h256s hashes(itemCount);
for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = _r[i].toHash<h256>();
@ -357,7 +348,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
if (m_asking != Asking::Blocks)
clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them.";
else
{
setIdle();
host()->onPeerBlocks(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
}
break;
}
case NewBlockPacket:
@ -381,9 +375,9 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
return false;
}
}
catch (Exception const& _e)
catch (Exception const&)
{
clog(NetWarn) << "Peer causing an Exception:" << _e.what() << _r;
clog(NetWarn) << "Peer causing an Exception:" << boost::current_exception_diagnostic_information() << _r;
}
catch (std::exception const& _e)
{

4
libethereum/EthereumPeer.h

@ -135,14 +135,12 @@ private:
/// What, if anything, we last asked the other peer for.
Asking m_asking = Asking::Nothing;
/// When we asked for it. Allows a time out.
std::chrono::system_clock::time_point m_lastAsk;
std::atomic<time_t> m_lastAsk;
/// These are determined through either a Status message or from NewBlock.
h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync.
u256 m_totalDifficulty; ///< Peer's latest block's total difficulty.
h256 m_genesisHash; ///< Peer's genesis hash
u256 m_latestBlockNumber; ///< Number of the latest block this peer has
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.

2
libp2p/RLPXSocket.h

@ -45,7 +45,7 @@ public:
bool isConnected() const { return m_socket.is_open(); }
void close() { try { boost::system::error_code ec; m_socket.shutdown(bi::tcp::socket::shutdown_both, ec); if (m_socket.is_open()) m_socket.close(); } catch (...){} }
bi::tcp::endpoint remoteEndpoint() { try { return m_socket.remote_endpoint(); } catch (...){ return bi::tcp::endpoint(); } }
bi::tcp::endpoint remoteEndpoint() { boost::system::error_code ec; return m_socket.remote_endpoint(ec); }
bi::tcp::socket& ref() { return m_socket; }
protected:

3
libp2p/Session.cpp

@ -334,11 +334,12 @@ void Session::drop(DisconnectReason _reason)
void Session::disconnect(DisconnectReason _reason)
{
clog(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")";
size_t peerCount = m_server->peerCount(); //needs to be outside of lock to avoid deadlocking with other thread that capture x_info/x_sessions in reverse order
DEV_GUARDED(x_info)
StructuredLogger::p2pDisconnected(
m_info.id.abridged(),
m_peer->endpoint, // TODO: may not be 100% accurate
m_server->peerCount()
peerCount
);
if (m_socket->ref().is_open())
{

Loading…
Cancel
Save