Browse Source

pv61 sync

re-enable pv60
cl-refactor
arkpar 10 years ago
parent
commit
cfcd722c51
  1. 2
      alethzero/MainWin.cpp
  2. 345
      libethereum/BlockChainSync.cpp
  3. 46
      libethereum/BlockChainSync.h
  4. 1
      libethereum/CommonNet.h
  5. 23
      libethereum/EthereumHost.cpp
  6. 2
      libethereum/EthereumHost.h
  7. 17
      libethereum/EthereumPeer.cpp
  8. 2
      libethereum/EthereumPeer.h
  9. 5
      libp2p/Session.cpp

2
alethzero/MainWin.cpp

@ -1252,7 +1252,7 @@ void Main::refreshBlockCount()
auto d = ethereum()->blockChain().details();
BlockQueueStatus b = ethereum()->blockQueueStatus();
SyncStatus sync = ethereum()->syncStatus();
QString syncStatus = EthereumHost::stateName(sync.state);
QString syncStatus = QString("PV%1 %2").arg(sync.protocolVersion).arg(EthereumHost::stateName(sync.state));
if (sync.state == SyncState::Hashes)
syncStatus += QString(": %1/%2%3").arg(sync.hashesReceived).arg(sync.hashesEstimated ? "~" : "").arg(sync.hashesTotal);
if (sync.state == SyncState::Blocks || sync.state == SyncState::NewBlocks)

345
libethereum/BlockChainSync.cpp

@ -38,7 +38,8 @@ using namespace dev;
using namespace dev::eth;
using namespace p2p;
unsigned const c_chainReorgSize = 30000;
unsigned const c_chainReorgSize = 30000; /// Added to estimated hashes to account for potential chain reorganiation
unsigned const c_hashSubchainSize = 8192; /// PV61 subchain size
BlockChainSync::BlockChainSync(EthereumHost& _host):
m_host(_host)
@ -114,7 +115,6 @@ void BlockChainSync::requestBlocks(std::shared_ptr<EthereumPeer> _peer)
{
clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
pauseSync();
_peer->setIdle();
return;
}
_peer->requestBlocks();
@ -137,11 +137,9 @@ void BlockChainSync::logNewBlock(h256 const& _h)
void BlockChainSync::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
_peer->setIdle();
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting)
clog(NetWarn) << "Unexpected Blocks received!";
if (m_state == SyncState::Waiting)
@ -305,6 +303,7 @@ SyncStatus PV60Sync::status() const
RecursiveGuard l(x_sync);
SyncStatus res;
res.state = m_state;
res.protocolVersion = EthereumHost::c_oldProtocolVersion;
if (m_state == SyncState::Hashes)
{
res.hashesTotal = m_estimatedHashes;
@ -381,26 +380,10 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
RLPStream s;
if (_s == SyncState::Hashes)
{
if (m_state == SyncState::Idle)
{
if (isSyncing(_peer))
clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
m_syncingLatestHash = _peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
setState(_peer, _s, true);
_peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash);
DEV_INVARIANT_CHECK;
return;
}
else if (m_state == SyncState::Hashes)
if (m_state == SyncState::Idle || m_state == SyncState::Hashes)
{
if (!isSyncing(_peer))
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setState(_peer, _s, true);
_peer->requestHashes(m_syncingLastReceivedHash);
DEV_INVARIANT_CHECK;
m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize;
syncHashes(_peer);
return;
}
}
@ -462,7 +445,6 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
}
else if (_s == SyncState::Idle)
{
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p) { _p->setIdle(); return true; });
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
clog(NetMessageDetail) << "Finishing blocks fetch...";
@ -473,7 +455,6 @@ void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, boo
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
_peer->m_sub.doneFetch();
_peer->setIdle();
setState(_peer, SyncState::Idle, false);
}
else if (m_state == SyncState::Hashes)
@ -562,7 +543,6 @@ void PV60Sync::attemptSync(std::shared_ptr<EthereumPeer> _peer)
else
{
clog(NetAllDetail) << "Yes. Their chain is better.";
m_estimatedHashes = _peer->m_expectedHashes - c_chainReorgSize;
transition(_peer, SyncState::Hashes);
}
}
@ -573,7 +553,9 @@ void PV60Sync::noteNeedsSyncing(std::shared_ptr<EthereumPeer> _peer)
if (isSyncing())
{
clog(NetAllDetail) << "Sync in progress: Just set to help out.";
if (m_state == SyncState::Blocks)
if (m_state == SyncState::Hashes && _peer->m_asking == Asking::Nothing)
requestSubchain(_peer);
else if (m_state == SyncState::Blocks)
requestBlocks(_peer);
}
else
@ -649,20 +631,45 @@ void PV60Sync::noteDoneBlocks(std::shared_ptr<EthereumPeer> _peer, bool _clemenc
}
resetSync();
downloadMan().reset();
}
_peer->m_sub.doneFetch();
}
void PV60Sync::syncHashes(std::shared_ptr<EthereumPeer> _peer)
{
if (m_state == SyncState::Idle)
{
if (isSyncing(_peer))
clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
m_syncingLatestHash = _peer->m_latestHash;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
setState(_peer, SyncState::Hashes, true);
_peer->requestHashes(m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash);
}
else if (m_state == SyncState::Hashes)
{
if (!isSyncing(_peer))
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setState(_peer, SyncState::Hashes, true);
_peer->requestHashes(m_syncingLastReceivedHash);
}
}
void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
_peer->setIdle();
if (!isSyncing(_peer))
{
clog(NetMessageSummary) << "Ignoring hashes since not syncing";
return;
}
if (_peer->m_syncHash != (m_syncingLastReceivedHash ? m_syncingLastReceivedHash : m_syncingLatestHash))
{
clog(NetMessageSummary) << "Ignoring unexpected hashes";
return;
}
if (_hashes.size() == 0)
{
transition(_peer, SyncState::Blocks);
@ -711,7 +718,6 @@ void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _h
void PV60Sync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer)))
{
clog(NetMessageSummary) << "Ignoring since we're already downloading.";
@ -769,7 +775,6 @@ void PV60Sync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const&
void PV60Sync::abortSync()
{
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p) { _p->setIdle(); return true; });
setState(std::shared_ptr<EthereumPeer>(), SyncState::Idle, false, true);
DEV_INVARIANT_CHECK;
}
@ -820,3 +825,281 @@ bool PV60Sync::invariants() const
return false;
return true;
}
PV61Sync::PV61Sync(EthereumHost& _host):
PV60Sync(_host)
{
}
void PV61Sync::syncHashes(std::shared_ptr<EthereumPeer> _peer)
{
if (_peer->m_protocolVersion != host().protocolVersion())
{
m_readyChainMap.clear();
m_completeChainMap.clear();
m_downloadingChainMap.clear();
m_syncingBlockNumber = 0;
m_chainSyncPeers.clear();
m_knownHashes.clear();
PV60Sync::syncHashes(_peer);
return;
}
if (m_state == SyncState::Idle)
{
if (isSyncing(_peer))
clog(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
if (m_syncingBlockNumber == 0)
m_syncingBlockNumber = host().chain().number() + c_hashSubchainSize;
m_syncingTotalDifficulty = _peer->m_totalDifficulty;
setState(_peer, SyncState::Hashes, true);
_peer->requestHashes(m_syncingBlockNumber, 1);
}
else if (m_state == SyncState::Hashes)
{
if (!isSyncing(_peer))
clog(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
m_syncingBlockNumber += c_hashSubchainSize;
setState(_peer, SyncState::Hashes, true);
_peer->requestHashes(m_syncingBlockNumber, 1);
}
}
void PV61Sync::requestSubchain(std::shared_ptr<EthereumPeer> _peer)
{
auto syncPeer = m_chainSyncPeers.find(_peer);
if (syncPeer != m_chainSyncPeers.end())
{
// Already downoading, request next batch
h256s& d = m_downloadingChainMap.at(syncPeer->second);
_peer->requestHashes(d.back());
}
else if (needsSyncing(_peer) && !m_readyChainMap.empty())
{
clog(NetAllDetail) << "Helping with hashchin download";
h256s& d = m_readyChainMap.begin()->second;
_peer->requestHashes(d.back());
m_downloadingChainMap[m_readyChainMap.begin()->first] = move(d);
m_chainSyncPeers[_peer] = m_readyChainMap.begin()->first;
m_readyChainMap.erase(m_readyChainMap.begin());
}
}
void PV61Sync::requestSubchains()
{
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p)
{
if (_p->m_asking == Asking::Nothing)
requestSubchain(_p);
return true;
});
}
void PV61Sync::completeSubchain(std::shared_ptr<EthereumPeer> _peer, unsigned _n)
{
m_completeChainMap[_n] = move(m_downloadingChainMap.at(_n));
m_downloadingChainMap.erase(_n);
_peer->m_syncHashNumber = 0;
auto syncer = m_syncer.lock();
if (!syncer)
{
restartSync();
return;
}
if (m_readyChainMap.empty() && m_downloadingChainMap.empty() && syncer->m_asking == Asking::Nothing)
{
//Done chain-get
m_syncingNeededBlocks.clear();
for (auto h = m_completeChainMap.rbegin(); h != m_completeChainMap.rend(); ++h)
m_syncingNeededBlocks.insert(m_syncingNeededBlocks.end(), h->second.begin(), h->second.end());
m_completeChainMap.clear();
m_knownHashes.clear();
m_syncingBlockNumber = 0;
transition(syncer, SyncState::Blocks);
}
else
requestSubchain(_peer);
}
void PV61Sync::restartSync()
{
m_completeChainMap.clear();
m_readyChainMap.clear();
m_downloadingChainMap.clear();
m_chainSyncPeers.clear();
m_syncingBlockNumber = 0;
m_knownHashes.clear();
PV60Sync::restartSync();
}
void PV61Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
if (m_syncingBlockNumber == 0 || (_peer == m_syncer.lock() && _peer->m_protocolVersion != host().protocolVersion()))
{
// Syncing in pv60 mode
PV60Sync::onPeerHashes(_peer, _hashes);
return;
}
if (_hashes.size() == 0)
{
if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber)
{
// End of hash chain, add last chunk to download
m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _peer->m_latestHash }));
_peer->m_syncHashNumber = 0;
requestSubchain(_peer);
}
else
{
auto syncPeer = m_chainSyncPeers.find(_peer);
if (syncPeer == m_chainSyncPeers.end())
clog(NetWarn) << "Hashes response from unexpected peer";
else
{
// Peer does not have request hashes, move back from downloading to ready
unsigned number = syncPeer->second;
m_chainSyncPeers.erase(_peer);
m_readyChainMap[number] = move(m_downloadingChainMap.at(number));
m_downloadingChainMap.erase(number);
resetNeedsSyncing(_peer);
requestSubchains();
}
}
return;
}
if (isSyncing(_peer) && _peer->m_syncHashNumber == m_syncingBlockNumber)
{
// Got new subchain marker
assert(_hashes.size() == 1);
m_knownHashes.insert(_hashes[0]);
m_readyChainMap.insert(make_pair(m_syncingBlockNumber, h256s { _hashes[0] }));
if ((m_readyChainMap.size() + m_downloadingChainMap.size() + m_completeChainMap.size()) * c_hashSubchainSize > _peer->m_expectedHashes)
{
_peer->disable("Too many hashes from lead peer");
restartSync();
return;
}
transition(_peer, SyncState::Hashes);
requestSubchains();
}
else
{
auto syncPeer = m_chainSyncPeers.find(_peer);
if (syncPeer == m_chainSyncPeers.end())
{
clog(NetWarn) << "Hashes response from unexpected peer";
return;
}
unsigned number = syncPeer->second;
h256s& hashes = m_downloadingChainMap.at(number);
unsigned knowns = 0;
unsigned unknowns = 0;
for (unsigned i = 0; i < _hashes.size(); ++i)
{
auto h = _hashes[i];
auto status = host().bq().blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h) || !!m_knownHashes.count(h))
{
clog(NetMessageSummary) << "Subchain download complete";
m_chainSyncPeers.erase(_peer);
completeSubchain(_peer, number);
return;
}
else if (status == QueueStatus::Bad)
{
cwarn << "block hash bad!" << h << ". Bailing...";
restartSync();
return;
}
else if (status == QueueStatus::Unknown)
{
unknowns++;
hashes.push_back(h);
}
else
knowns++;
}
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << hashes.back();
if (hashes.size() > c_hashSubchainSize)
{
_peer->disable("Too many subchain hashes");
restartSync();
return;
}
requestSubchain(_peer);
}
DEV_INVARIANT_CHECK;
}
void PV61Sync::onPeerAborting()
{
RecursiveGuard l(x_sync);
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
for (auto s = m_chainSyncPeers.begin(); s != m_chainSyncPeers.end();)
{
if (s->first.expired())
{
unsigned number = s->second;
m_readyChainMap[number] = move(m_downloadingChainMap.at(number));
m_downloadingChainMap.erase(number);
m_chainSyncPeers.erase(s++);
}
else
++s;
}
if (m_syncer.expired())
abortSync();
else if (isPV61Syncing())
requestSubchains();
DEV_INVARIANT_CHECK;
}
SyncStatus PV61Sync::status() const
{
RecursiveGuard l(x_sync);
SyncStatus res = PV60Sync::status();
if (m_state == SyncState::Hashes && isPV61Syncing())
{
res.protocolVersion = 61;
res.hashesReceived = 0;
for (auto const& d : m_readyChainMap)
res.hashesReceived += d.second.size();
for (auto const& d : m_downloadingChainMap)
res.hashesReceived += d.second.size();
for (auto const& d : m_completeChainMap)
res.hashesReceived += d.second.size();
}
return res;
}
bool PV61Sync::isPV61Syncing() const
{
return m_syncingBlockNumber != 0;
}
bool PV61Sync::invariants() const
{
if (m_downloadingChainMap.size() != m_chainSyncPeers.size())
return false;
if (m_state == SyncState::Idle && isSyncing())
return false;
if (m_state != SyncState::Idle && !isSyncing())
return false;
if (m_state == SyncState::Hashes)
{
bool hashes = false;
host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; });
if (!hashes)
return false;
if (isPV61Syncing() && !m_syncingBlockNumber)
return false;
}
else if (!PV60Sync::invariants())
return false;
return true;
}

46
libethereum/BlockChainSync.h

@ -132,8 +132,8 @@ private:
/**
* @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash downaload is complete
* Syncs to peers and keeps up to date
* @brief Syncrhonization over PV60. Selects a single peer and tries to downloading hashes from it. After hash download is complete
* syncs to peers and keeps up to date
*/
/**
@ -228,7 +228,7 @@ protected:
void pauseSync() override;
void resetSyncFor(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td) override;
private:
protected:
/// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer
void transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _force = false, bool _needHelp = true);
@ -262,6 +262,12 @@ private:
/// Called when peer done downloading blocks
void noteDoneBlocks(std::shared_ptr<EthereumPeer> _who, bool _clemency);
/// Start chainhash sync
virtual void syncHashes(std::shared_ptr<EthereumPeer> _peer);
/// Request subchain, no-op for pv60
virtual void requestSubchain(std::shared_ptr<EthereumPeer> /*_peer*/) {}
/// Abort syncing
void abortSync();
@ -276,5 +282,39 @@ private:
u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync.
std::weak_ptr<EthereumPeer> m_syncer; ///< Peer we are currently syncing with
};
/**
* @brief Syncrhonization over PV61. Selects a single peer and requests every c_hashSubchainSize hash, splitting the hashchain into subchains and downloading each subchain in parallel.
* Syncs to peers and keeps up to date
*/
class PV61Sync: public PV60Sync
{
public:
PV61Sync(EthereumHost& _host);
protected:
void restartSync() override;
void requestSubchain(std::shared_ptr<EthereumPeer> _peer) override;
void syncHashes(std::shared_ptr<EthereumPeer> _peer) override;
void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) override;
void onPeerAborting() override;
SyncStatus status() const override;
bool invariants() const override;
private:
/// Called when subchain is complete. Check if if hashchain is fully downloaded and proceed to downloading blocks
void completeSubchain(std::shared_ptr<EthereumPeer> _peer, unsigned _n);
/// Find a subchain for peers to downloading
void requestSubchains();
/// Check if downloading hashes in parallel
bool isPV61Syncing() const;
std::map<unsigned, h256s> m_completeChainMap; ///< Fully downloaded subchains
std::map<unsigned, h256s> m_readyChainMap; ///< Subchains ready for download
std::map<unsigned, h256s> m_downloadingChainMap; ///< Subchains currently being downloading. In sync with m_chainSyncPeers
std::map<std::weak_ptr<EthereumPeer>, unsigned, std::owner_less<std::weak_ptr<EthereumPeer>>> m_chainSyncPeers; ///< Peers to m_downloadingSubchain number map
h256Hash m_knownHashes; ///< Subchain start markers. Used to track suchain completion
unsigned m_syncingBlockNumber = 0; ///< Current subchain marker
};
}
}

1
libethereum/CommonNet.h

@ -91,6 +91,7 @@ enum class SyncState
struct SyncStatus
{
SyncState state = SyncState::Idle;
unsigned protocolVersion = 0;
unsigned hashesTotal = 0;
unsigned hashesReceived = 0;
bool hashesEstimated = false;

23
libethereum/EthereumHost.cpp

@ -230,10 +230,10 @@ void EthereumHost::maintainBlocks(h256 const& _currentHash)
}
}
BlockChainSync& EthereumHost::sync()
BlockChainSync* EthereumHost::sync()
{
if (m_sync)
return *m_sync; // We only chose sync strategy once
return m_sync.get(); // We only chose sync strategy once
bool pv61 = false;
foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
@ -242,38 +242,43 @@ BlockChainSync& EthereumHost::sync()
pv61 = true;
return !pv61;
});
m_sync.reset(pv61 ? new PV60Sync(*this) : new PV60Sync(*this));
return *m_sync;
m_sync.reset(pv61 ? new PV61Sync(*this) : new PV60Sync(*this));
return m_sync.get();
}
void EthereumHost::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{
Guard l(x_sync);
sync().onPeerStatus(_peer);
if (sync())
sync()->onPeerStatus(_peer);
}
void EthereumHost::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
Guard l(x_sync);
sync().onPeerHashes(_peer, _hashes);
if (sync())
sync()->onPeerHashes(_peer, _hashes);
}
void EthereumHost::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
Guard l(x_sync);
sync().onPeerBlocks(_peer, _r);
if (sync())
sync()->onPeerBlocks(_peer, _r);
}
void EthereumHost::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
Guard l(x_sync);
sync().onPeerNewHashes(_peer, _hashes);
if (sync())
sync()->onPeerNewHashes(_peer, _hashes);
}
void EthereumHost::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
Guard l(x_sync);
sync().onPeerNewBlock(_peer, _r);
if (sync())
sync()->onPeerNewBlock(_peer, _r);
}
void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)

2
libethereum/EthereumHost.h

@ -116,7 +116,7 @@ private:
virtual void onStarting() override { startWorking(); }
virtual void onStopping() override { stopWorking(); }
BlockChainSync& sync();
BlockChainSync* sync();
BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.

17
libethereum/EthereumPeer.cpp

@ -141,6 +141,7 @@ void EthereumPeer::requestStatus()
void EthereumPeer::requestHashes(u256 _number, unsigned _count)
{
assert(m_asking == Asking::Nothing);
assert(m_protocolVersion == host()->protocolVersion());
m_syncHashNumber = _number;
m_syncHash = h256();
setAsking(Asking::Hashes);
@ -198,7 +199,7 @@ void EthereumPeer::requestBlocks()
void EthereumPeer::setAsking(Asking _a)
{
m_asking = _a;
m_lastAsk = chrono::system_clock::now();
m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
auto s = session();
if (s)
@ -211,9 +212,14 @@ void EthereumPeer::setAsking(Asking _a)
void EthereumPeer::tick()
{
auto s = session();
if (s && (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing))
time_t now = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
if (s && (now - m_lastAsk > 10 && m_asking != Asking::Nothing))
{
clog(NetWarn) << "timeout: " << (now - m_lastAsk) << " " <<
(m_asking == Asking::Nothing ? "nothing" : m_asking == Asking::State ? "state" : m_asking == Asking::Hashes ? "hashes" : m_asking == Asking::Blocks ? "blocks" : "?");
// timeout
s->disconnect(PingTimeout);
}
}
bool EthereumPeer::isConversing() const
@ -228,6 +234,7 @@ bool EthereumPeer::isCriticalSyncing() const
bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
{
m_lastAsk = std::chrono::system_clock::to_time_t(chrono::system_clock::now());
try
{
switch (_id)
@ -254,7 +261,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
}
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
setAsking(Asking::Nothing);
setIdle();
host()->onPeerStatus(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())));
break;
}
@ -311,6 +318,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
clog(NetWarn) << "Peer giving us hashes when we didn't ask for them.";
break;
}
setIdle();
h256s hashes(itemCount);
for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = _r[i].toHash<h256>();
@ -357,7 +365,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
if (m_asking != Asking::Blocks)
clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them.";
else
{
setIdle();
host()->onPeerBlocks(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
}
break;
}
case NewBlockPacket:

2
libethereum/EthereumPeer.h

@ -136,7 +136,7 @@ private:
/// What, if anything, we last asked the other peer for.
Asking m_asking = Asking::Nothing;
/// When we asked for it. Allows a time out.
std::chrono::system_clock::time_point m_lastAsk;
std::atomic<time_t> m_lastAsk;
/// These are determined through either a Status message or from NewBlock.
h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync.

5
libp2p/Session.cpp

@ -367,7 +367,7 @@ void Session::drop(DisconnectReason _reason)
if (socket.is_open())
try
{
clog(NetConnect) << "Closing " << socket.remote_endpoint() << "(" << reasonOf(_reason) << ")";
clog(NetWarn) << "Closing " << socket.remote_endpoint() << "(" << reasonOf(_reason) << ")";
boost::system::error_code ec;
socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket.close();
@ -386,11 +386,12 @@ void Session::drop(DisconnectReason _reason)
void Session::disconnect(DisconnectReason _reason)
{
clog(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")";
size_t peerCount = m_server->peerCount(); //needs to be outside of lock to avoid deadlocking with other thread that capture x_info/x_sessions in reverse order
DEV_GUARDED(x_info)
StructuredLogger::p2pDisconnected(
m_info.id.abridged(),
m_peer->endpoint, // TODO: may not be 100% accurate
m_server->peerCount()
peerCount
);
if (m_socket->ref().is_open())
{

Loading…
Cancel
Save