Browse Source

used weak_ptr for session and peers, minor sync optimization

cl-refactor
arkpar 10 years ago
parent
commit
cd3355f23f
  1. 1
      libethcore/Farm.h
  2. 124
      libethereum/BlockChainSync.cpp
  3. 66
      libethereum/BlockChainSync.h
  4. 3
      libethereum/Client.cpp
  5. 34
      libethereum/EthereumHost.cpp
  6. 19
      libethereum/EthereumHost.h
  7. 64
      libethereum/EthereumPeer.cpp
  8. 5
      libethereum/EthereumPeer.h
  9. 12
      libp2p/Capability.cpp
  10. 8
      libp2p/Capability.h
  11. 2
      libp2p/Host.cpp
  12. 4
      libp2p/HostCapability.h
  13. 2
      libwhisper/WhisperPeer.cpp
  14. 2
      libwhisper/WhisperPeer.h

1
libethcore/Farm.h

@ -95,7 +95,6 @@ public:
void stop() void stop()
{ {
WriteGuard l(x_minerWork); WriteGuard l(x_minerWork);
cdebug << "Farm::stop()";
m_miners.clear(); m_miners.clear();
m_work.reset(); m_work.reset();
m_isMining = false; m_isMining = false;

124
libethereum/BlockChainSync.cpp

@ -69,12 +69,11 @@ DownloadMan& BlockChainSync::downloadMan()
void BlockChainSync::abortSync() void BlockChainSync::abortSync()
{ {
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; });
downloadMan().resetToChain(h256s()); downloadMan().resetToChain(h256s());
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
void BlockChainSync::onPeerStatus(EthereumPeer* _peer) void BlockChainSync::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{ {
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
@ -111,7 +110,7 @@ unsigned BlockChainSync::estimatedHashes() const
return blockCount; return blockCount;
} }
void BlockChainSync::requestBlocks(EthereumPeer* _peer) void BlockChainSync::requestBlocks(std::shared_ptr<EthereumPeer> _peer)
{ {
if (host().bq().knownFull()) if (host().bq().knownFull())
{ {
@ -130,7 +129,13 @@ void BlockChainSync::requestBlocks(EthereumPeer* _peer)
} }
} }
void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) void BlockChainSync::logNewBlock(h256 const& _h)
{
if (m_state == SyncState::NewBlocks)
clog(NetAllDetail) << "NewBlock: " << _h;
}
void BlockChainSync::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{ {
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
@ -138,7 +143,7 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
_peer->setIdle(); _peer->setIdle();
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks) if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting)
clog(NetWarn) << "Unexpected Blocks received!"; clog(NetWarn) << "Unexpected Blocks received!";
if (m_state == SyncState::Waiting) if (m_state == SyncState::Waiting)
{ {
@ -173,14 +178,17 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{ {
case ImportResult::Success: case ImportResult::Success:
success++; success++;
logNewBlock(h);
break; break;
case ImportResult::Malformed: case ImportResult::Malformed:
case ImportResult::BadChain: case ImportResult::BadChain:
logNewBlock(h);
_peer->disable("Malformed block received."); _peer->disable("Malformed block received.");
return; return;
case ImportResult::FutureTimeKnown: case ImportResult::FutureTimeKnown:
logNewBlock(h);
future++; future++;
break; break;
case ImportResult::AlreadyInChain: case ImportResult::AlreadyInChain:
@ -194,6 +202,7 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
case ImportResult::UnknownParent: case ImportResult::UnknownParent:
{ {
unknown++; unknown++;
logNewBlock(h);
if (m_state == SyncState::NewBlocks) if (m_state == SyncState::NewBlocks)
{ {
BlockInfo bi; BlockInfo bi;
@ -241,7 +250,7 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) void BlockChainSync::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{ {
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
@ -256,6 +265,7 @@ void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{ {
case ImportResult::Success: case ImportResult::Success:
_peer->addRating(100); _peer->addRating(100);
logNewBlock(h);
break; break;
case ImportResult::FutureTimeKnown: case ImportResult::FutureTimeKnown:
//TODO: Rating dependent on how far in future it is. //TODO: Rating dependent on how far in future it is.
@ -263,6 +273,7 @@ void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
case ImportResult::Malformed: case ImportResult::Malformed:
case ImportResult::BadChain: case ImportResult::BadChain:
logNewBlock(h);
_peer->disable("Malformed block received."); _peer->disable("Malformed block received.");
return; return;
@ -272,6 +283,7 @@ void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
case ImportResult::FutureTimeUnknown: case ImportResult::FutureTimeUnknown:
case ImportResult::UnknownParent: case ImportResult::UnknownParent:
logNewBlock(h);
clog(NetMessageSummary) << "Received block with no known parent. Resyncing..."; clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
resetSyncFor(_peer, h, _r[1].toInt<u256>()); resetSyncFor(_peer, h, _r[1].toInt<u256>());
break; break;
@ -309,17 +321,17 @@ SyncStatus PV60Sync::status() const
return res; return res;
} }
void PV60Sync::setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing, bool _needHelp) void PV60Sync::setState(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _isSyncing, bool _needHelp)
{ {
bool changedState = (m_state != _s); bool changedState = (m_state != _s);
m_state = _s; m_state = _s;
if (_isSyncing != (m_syncer == _peer) || (_isSyncing && changedState)) if (_isSyncing != (m_syncer.lock() == _peer) || (_isSyncing && changedState))
changeSyncer(_isSyncing ? _peer : nullptr, _needHelp); changeSyncer(_isSyncing ? _peer : nullptr, _needHelp);
else if (_s == SyncState::Idle) else if (_s == SyncState::Idle)
changeSyncer(nullptr, _needHelp); changeSyncer(nullptr, _needHelp);
assert(!!m_syncer || _s == SyncState::Idle); assert(isSyncing() || _s == SyncState::Idle);
} }
void PV60Sync::resetSync() void PV60Sync::resetSync()
@ -335,32 +347,33 @@ void PV60Sync::restartSync()
resetSync(); resetSync();
host().bq().clear(); host().bq().clear();
if (isSyncing()) if (isSyncing())
transition(m_syncer, SyncState::Idle); transition(m_syncer.lock(), SyncState::Idle);
} }
void PV60Sync::completeSync() void PV60Sync::completeSync()
{ {
if (isSyncing()) if (isSyncing())
transition(m_syncer, SyncState::Idle); transition(m_syncer.lock(), SyncState::Idle);
} }
void PV60Sync::pauseSync() void PV60Sync::pauseSync()
{ {
if (isSyncing()) if (isSyncing())
setState(m_syncer, SyncState::Waiting, true); setState(m_syncer.lock(), SyncState::Waiting, true);
} }
void PV60Sync::continueSync() void PV60Sync::continueSync()
{ {
transition(m_syncer, SyncState::Blocks); if (isSyncing())
transition(m_syncer.lock(), SyncState::Blocks);
} }
void PV60Sync::onNewPeer(EthereumPeer* _peer) void PV60Sync::onNewPeer(std::shared_ptr<EthereumPeer> _peer)
{ {
setNeedsSyncing(_peer, _peer->m_latestHash, _peer->m_totalDifficulty); setNeedsSyncing(_peer, _peer->m_latestHash, _peer->m_totalDifficulty);
} }
void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _needHelp) void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _force, bool _needHelp)
{ {
clog(NetMessageSummary) << "Transition!" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : ""); clog(NetMessageSummary) << "Transition!" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : "");
@ -451,7 +464,7 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
} }
else if (_s == SyncState::Idle) else if (_s == SyncState::Idle)
{ {
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; }); host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p) { _p->setIdle(); return true; });
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{ {
clog(NetNote) << "Finishing blocks fetch..."; clog(NetNote) << "Finishing blocks fetch...";
@ -478,12 +491,12 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
clog(NetWarn) << "Invalid state transition:" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : ""); clog(NetWarn) << "Invalid state transition:" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : "");
} }
void PV60Sync::resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td) void PV60Sync::resetSyncFor(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td)
{ {
setNeedsSyncing(_peer, _latestHash, _td); setNeedsSyncing(_peer, _latestHash, _td);
} }
void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td) void PV60Sync::setNeedsSyncing(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td)
{ {
_peer->m_latestHash = _latestHash; _peer->m_latestHash = _latestHash;
_peer->m_totalDifficulty = _td; _peer->m_totalDifficulty = _td;
@ -494,17 +507,17 @@ void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 const& _latestHash, u25
_peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : "")); _peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : ""));
} }
bool PV60Sync::needsSyncing(EthereumPeer* _peer) const bool PV60Sync::needsSyncing(std::shared_ptr<EthereumPeer> _peer) const
{ {
return !!_peer->m_latestHash; return !!_peer->m_latestHash;
} }
bool PV60Sync::isSyncing(EthereumPeer* _peer) const bool PV60Sync::isSyncing(std::shared_ptr<EthereumPeer> _peer) const
{ {
return m_syncer == _peer; return m_syncer.lock() == _peer;
} }
bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const bool PV60Sync::shouldGrabBlocks(std::shared_ptr<EthereumPeer> _peer) const
{ {
auto td = _peer->m_totalDifficulty; auto td = _peer->m_totalDifficulty;
auto lh = _peer->m_latestHash; auto lh = _peer->m_latestHash;
@ -521,7 +534,7 @@ bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const
return true; return true;
} }
void PV60Sync::attemptSync(EthereumPeer* _peer) void PV60Sync::attemptSync(std::shared_ptr<EthereumPeer> _peer)
{ {
if (m_state != SyncState::Idle) if (m_state != SyncState::Idle)
{ {
@ -556,7 +569,7 @@ void PV60Sync::attemptSync(EthereumPeer* _peer)
} }
} }
void PV60Sync::noteNeedsSyncing(EthereumPeer* _peer) void PV60Sync::noteNeedsSyncing(std::shared_ptr<EthereumPeer> _peer)
{ {
// if already downloading hash-chain, ignore. // if already downloading hash-chain, ignore.
if (isSyncing()) if (isSyncing())
@ -570,7 +583,7 @@ void PV60Sync::noteNeedsSyncing(EthereumPeer* _peer)
attemptSync(_peer); attemptSync(_peer);
} }
void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp) void PV60Sync::changeSyncer(std::shared_ptr<EthereumPeer> _syncer, bool _needHelp)
{ {
if (_syncer) if (_syncer)
clog(NetAllDetail) << "Changing syncer to" << _syncer->session()->socketId(); clog(NetAllDetail) << "Changing syncer to" << _syncer->session()->socketId();
@ -581,7 +594,7 @@ void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
if (isSyncing()) if (isSyncing())
{ {
if (_needHelp && (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)) if (_needHelp && (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks))
host().foreachPeer([&](EthereumPeer* _p) host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
{ {
clog(NetNote) << "Getting help with downloading blocks"; clog(NetNote) << "Getting help with downloading blocks";
if (_p != _syncer && _p->m_asking == Asking::Nothing) if (_p != _syncer && _p->m_asking == Asking::Nothing)
@ -592,7 +605,7 @@ void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
else else
{ {
// start grabbing next hash chain if there is one. // start grabbing next hash chain if there is one.
host().foreachPeer([this](EthereumPeer* _p) host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p)
{ {
attemptSync(_p); attemptSync(_p);
return !isSyncing(); return !isSyncing();
@ -604,15 +617,15 @@ void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
clog(NetNote) << "No more peers to sync with."; clog(NetNote) << "No more peers to sync with.";
} }
} }
assert(!!m_syncer || m_state == SyncState::Idle); assert(isSyncing() || m_state == SyncState::Idle);
} }
void PV60Sync::peerDoneBlocks(EthereumPeer* _peer) void PV60Sync::peerDoneBlocks(std::shared_ptr<EthereumPeer> _peer)
{ {
noteDoneBlocks(_peer, false); noteDoneBlocks(_peer, false);
} }
void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency) void PV60Sync::noteDoneBlocks(std::shared_ptr<EthereumPeer> _peer, bool _clemency)
{ {
resetNeedsSyncing(_peer); resetNeedsSyncing(_peer);
if (downloadMan().isComplete()) if (downloadMan().isComplete())
@ -643,7 +656,7 @@ void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
_peer->m_sub.doneFetch(); _peer->m_sub.doneFetch();
} }
void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{ {
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
@ -698,11 +711,11 @@ void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) void PV60Sync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{ {
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
if (isSyncing()) if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer)))
{ {
clog(NetMessageSummary) << "Ignoring since we're already downloading."; clog(NetMessageSummary) << "Ignoring since we're already downloading.";
return; return;
@ -734,43 +747,56 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns"; clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
if (unknowns > 0) if (unknowns > 0)
{ {
clog(NetNote) << "Not syncing and new block hash discovered: syncing without help."; if (m_state == SyncState::NewBlocks)
downloadMan().resetToChain(m_syncingNeededBlocks); {
clog(NetNote) << "Downloading new blocks and seeing new hashes. Trying grabbing blocks";
_peer->requestBlocks(m_syncingNeededBlocks);
}
else
{
clog(NetNote) << "Not syncing and new block hash discovered: syncing without help.";
downloadMan().resetToChain(m_syncingNeededBlocks);
transition(_peer, SyncState::NewBlocks, false, false);
}
resetSync(); resetSync();
transition(_peer, SyncState::NewBlocks, false, false);
for (auto const& h: m_syncingNeededBlocks)
if (!m_knownNewHashes.count(h))
{
m_knownNewHashes.insert(h);
clog(NetMessageDetail) << "NewHashes: " << h;
}
} }
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
void PV60Sync::abortSync(EthereumPeer* _peer) void PV60Sync::abortSync()
{ {
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet. // Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
if (isSyncing(_peer)) host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p) { _p->setIdle(); return true; });
{ setState(std::shared_ptr<EthereumPeer>(), SyncState::Idle, false, true);
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
transition(_peer, SyncState::Idle, true);
}
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
void PV60Sync::onPeerAborting(EthereumPeer* _peer) void PV60Sync::onPeerAborting()
{ {
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet. // Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
abortSync(_peer); if (m_syncer.expired())
abortSync();
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
} }
bool PV60Sync::invariants() const bool PV60Sync::invariants() const
{ {
if (m_state == SyncState::Idle && !!m_syncer) if (m_state == SyncState::Idle && isSyncing())
return false; return false;
if (m_state != SyncState::Idle && !m_syncer) if (m_state != SyncState::Idle && !isSyncing())
return false; return false;
if (m_state == SyncState::Hashes) if (m_state == SyncState::Hashes)
{ {
bool hashes = false; bool hashes = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; }); host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; });
if (!hashes) if (!hashes)
return false; return false;
if (!m_syncingLatestHash) if (!m_syncingLatestHash)
@ -781,7 +807,7 @@ bool PV60Sync::invariants() const
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks) if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{ {
bool blocks = false; bool blocks = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; }); host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; });
if (!blocks) if (!blocks)
return false; return false;
if (downloadMan().isComplete()) if (downloadMan().isComplete())
@ -790,7 +816,7 @@ bool PV60Sync::invariants() const
if (m_state == SyncState::Idle) if (m_state == SyncState::Idle)
{ {
bool busy = false; bool busy = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; }); host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; });
if (busy) if (busy)
return false; return false;
} }

66
libethereum/BlockChainSync.h

@ -60,22 +60,22 @@ public:
virtual bool isSyncing() const = 0; virtual bool isSyncing() const = 0;
/// Called by peer to report status /// Called by peer to report status
virtual void onPeerStatus(EthereumPeer* _peer); virtual void onPeerStatus(std::shared_ptr<EthereumPeer> _peer);
/// Called by peer once it has new blocks during syn /// Called by peer once it has new blocks during syn
virtual void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); virtual void onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
/// Called by peer once it has new blocks /// Called by peer once it has new blocks
virtual void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); virtual void onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
/// Called by peer once it has new hashes /// Called by peer once it has new hashes
virtual void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) = 0; virtual void onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) = 0;
/// Called by peer once it has another sequential block of hashes during sync /// Called by peer once it has another sequential block of hashes during sync
virtual void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) = 0; virtual void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) = 0;
/// Called by peer when it is disconnecting /// Called by peer when it is disconnecting
virtual void onPeerAborting(EthereumPeer* _peer) = 0; virtual void onPeerAborting() = 0;
/// @returns Synchonization status /// @returns Synchonization status
virtual SyncStatus status() const = 0; virtual SyncStatus status() const = 0;
@ -85,10 +85,10 @@ public:
protected: protected:
//To be implemented in derived classes: //To be implemented in derived classes:
/// New valid peer appears /// New valid peer appears
virtual void onNewPeer(EthereumPeer* _peer) = 0; virtual void onNewPeer(std::shared_ptr<EthereumPeer> _peer) = 0;
/// Peer done downloading blocks /// Peer done downloading blocks
virtual void peerDoneBlocks(EthereumPeer* _peer) = 0; virtual void peerDoneBlocks(std::shared_ptr<EthereumPeer> _peer) = 0;
/// Resume downloading after witing state /// Resume downloading after witing state
virtual void continueSync() = 0; virtual void continueSync() = 0;
@ -103,7 +103,7 @@ protected:
virtual void pauseSync() = 0; virtual void pauseSync() = 0;
/// Restart sync for given peer /// Restart sync for given peer
virtual void resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td) = 0; virtual void resetSyncFor(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td) = 0;
EthereumHost& host() { return m_host; } EthereumHost& host() { return m_host; }
EthereumHost const& host() const { return m_host; } EthereumHost const& host() const { return m_host; }
@ -112,7 +112,7 @@ protected:
unsigned estimatedHashes() const; unsigned estimatedHashes() const;
/// Request blocks from peer if needed /// Request blocks from peer if needed
void requestBlocks(EthereumPeer* _peer); void requestBlocks(std::shared_ptr<EthereumPeer> _peer);
protected: protected:
Handler m_bqRoomAvailable; ///< Triggered once block queue Handler m_bqRoomAvailable; ///< Triggered once block queue
@ -123,6 +123,8 @@ protected:
private: private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)]; static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
bool invariants() const override = 0; bool invariants() const override = 0;
void logNewBlock(h256 const& _h);
EthereumHost& m_host; EthereumHost& m_host;
HashDownloadMan m_hashMan; HashDownloadMan m_hashMan;
}; };
@ -202,65 +204,65 @@ public:
PV60Sync(EthereumHost& _host); PV60Sync(EthereumHost& _host);
/// @returns true is Sync is in progress /// @returns true is Sync is in progress
bool isSyncing() const override { return !!m_syncer; } bool isSyncing() const override { return !!m_syncer.lock(); }
/// Called by peer once it has new hashes /// Called by peer once it has new hashes
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) override; void onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) override;
/// Called by peer once it has another sequential block of hashes during sync /// Called by peer once it has another sequential block of hashes during sync
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) override; void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) override;
/// Called by peer when it is disconnecting /// Called by peer when it is disconnecting
void onPeerAborting(EthereumPeer* _peer) override; void onPeerAborting() override;
/// @returns Sync status /// @returns Sync status
SyncStatus status() const override; SyncStatus status() const override;
protected: protected:
void onNewPeer(EthereumPeer* _peer) override; void onNewPeer(std::shared_ptr<EthereumPeer> _peer) override;
void continueSync() override; void continueSync() override;
void peerDoneBlocks(EthereumPeer* _peer) override; void peerDoneBlocks(std::shared_ptr<EthereumPeer> _peer) override;
void restartSync() override; void restartSync() override;
void completeSync() override; void completeSync() override;
void pauseSync() override; void pauseSync() override;
void resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td) override; void resetSyncFor(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td) override;
private: private:
/// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer /// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer
void transition(EthereumPeer* _peer, SyncState _s, bool _force = false, bool _needHelp = true); void transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _force = false, bool _needHelp = true);
/// Reset peer syncing requirements state. /// Reset peer syncing requirements state.
void resetNeedsSyncing(EthereumPeer* _peer) { setNeedsSyncing(_peer, h256(), 0); } void resetNeedsSyncing(std::shared_ptr<EthereumPeer> _peer) { setNeedsSyncing(_peer, h256(), 0); }
/// Update peer syncing requirements state. /// Update peer syncing requirements state.
void setNeedsSyncing(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td); void setNeedsSyncing(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td);
/// Do we presently need syncing with this peer? /// Do we presently need syncing with this peer?
bool needsSyncing(EthereumPeer* _peer) const; bool needsSyncing(std::shared_ptr<EthereumPeer> _peer) const;
/// Check whether the session should bother grabbing blocks from a peer. /// Check whether the session should bother grabbing blocks from a peer.
bool shouldGrabBlocks(EthereumPeer* _peer) const; bool shouldGrabBlocks(std::shared_ptr<EthereumPeer> _peer) const;
/// Attempt to begin syncing with the peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks /// Attempt to begin syncing with the peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks
void attemptSync(EthereumPeer* _peer); void attemptSync(std::shared_ptr<EthereumPeer> _peer);
/// Update our syncing state /// Update our syncing state
void setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing = false, bool _needHelp = false); void setState(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _isSyncing = false, bool _needHelp = false);
/// Check if peer is main syncer /// Check if peer is main syncer
bool isSyncing(EthereumPeer* _peer) const; bool isSyncing(std::shared_ptr<EthereumPeer> _peer) const;
/// Check if we need (re-)syncing with the peer. /// Check if we need (re-)syncing with the peer.
void noteNeedsSyncing(EthereumPeer* _who); void noteNeedsSyncing(std::shared_ptr<EthereumPeer> _who);
/// Set main syncing peer /// Set main syncing peer
void changeSyncer(EthereumPeer* _syncer, bool _needHelp); void changeSyncer(std::shared_ptr<EthereumPeer> _syncer, bool _needHelp);
/// Called when peer done downloading blocks /// Called when peer done downloading blocks
void noteDoneBlocks(EthereumPeer* _who, bool _clemency); void noteDoneBlocks(std::shared_ptr<EthereumPeer> _who, bool _clemency);
/// Abort syncing for peer /// Abort syncing
void abortSync(EthereumPeer* _peer); void abortSync();
/// Reset hash chain syncing /// Reset hash chain syncing
void resetSync(); void resetSync();
@ -271,8 +273,8 @@ private:
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer. h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Latest block's hash of the peer we are syncing to, as of the current sync. h256 m_syncingLatestHash; ///< Latest block's hash of the peer we are syncing to, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync. u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync.
// TODO: switch to weak_ptr h256Hash m_knownNewHashes; ///< New hashes we know about use for logging only
EthereumPeer* m_syncer = nullptr; ///< Peer we are currently syncing with std::weak_ptr<EthereumPeer> m_syncer; ///< Peer we are currently syncing with
}; };
} }
} }

3
libethereum/Client.cpp

@ -632,7 +632,8 @@ void Client::syncBlockQueue()
tie(ir, m_syncBlockQueue, count) = m_bc.sync(m_bq, m_stateDB, m_syncAmount); tie(ir, m_syncBlockQueue, count) = m_bc.sync(m_bq, m_stateDB, m_syncAmount);
double elapsed = t.elapsed(); double elapsed = t.elapsed();
cnote << count << "blocks imported in" << unsigned(elapsed * 1000) << "ms (" << (count / elapsed) << "blocks/s)"; if (count)
cnote << count << "blocks imported in" << unsigned(elapsed * 1000) << "ms (" << (count / elapsed) << "blocks/s)";
if (elapsed > c_targetDuration * 1.1 && count > c_syncMin) if (elapsed > c_targetDuration * 1.1 && count > c_syncMin)
m_syncAmount = max(c_syncMin, count * 9 / 10); m_syncAmount = max(c_syncMin, count * 9 / 10);

34
libethereum/EthereumHost.cpp

@ -105,7 +105,7 @@ void EthereumHost::doWork()
} }
} }
foreachPeer([](EthereumPeer* _p) { _p->tick(); return true; }); foreachPeer([](std::shared_ptr<EthereumPeer> _p) { _p->tick(); return true; });
// return netChange; // return netChange;
// TODO: Figure out what to do with netChange. // TODO: Figure out what to do with netChange.
@ -126,7 +126,7 @@ void EthereumHost::maintainTransactions()
} }
for (auto const& t: ts) for (auto const& t: ts)
m_transactionsSent.insert(t.first); m_transactionsSent.insert(t.first);
foreachPeerPtr([&](shared_ptr<EthereumPeer> _p) foreachPeer([&](shared_ptr<EthereumPeer> _p)
{ {
bytes b; bytes b;
unsigned n = 0; unsigned n = 0;
@ -151,17 +151,7 @@ void EthereumHost::maintainTransactions()
}); });
} }
void EthereumHost::foreachPeer(std::function<bool(EthereumPeer*)> const& _f) const void EthereumHost::foreachPeer(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const
{
foreachPeerPtr([&](std::shared_ptr<EthereumPeer> _p)
{
if (_p)
return _f(_p.get());
return true;
});
}
void EthereumHost::foreachPeerPtr(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const
{ {
for (auto s: peerSessions()) for (auto s: peerSessions())
if (!_f(s.first->cap<EthereumPeer>())) if (!_f(s.first->cap<EthereumPeer>()))
@ -246,7 +236,7 @@ BlockChainSync& EthereumHost::sync()
return *m_sync; // We only chose sync strategy once return *m_sync; // We only chose sync strategy once
bool pv61 = false; bool pv61 = false;
foreachPeer([&](EthereumPeer* _p) foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
{ {
if (_p->m_protocolVersion == protocolVersion()) if (_p->m_protocolVersion == protocolVersion())
pv61 = true; pv61 = true;
@ -256,37 +246,37 @@ BlockChainSync& EthereumHost::sync()
return *m_sync; return *m_sync;
} }
void EthereumHost::onPeerStatus(EthereumPeer* _peer) void EthereumHost::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{ {
Guard l(x_sync); Guard l(x_sync);
sync().onPeerStatus(_peer); sync().onPeerStatus(_peer);
} }
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{ {
Guard l(x_sync); Guard l(x_sync);
sync().onPeerHashes(_peer, _hashes); sync().onPeerHashes(_peer, _hashes);
} }
void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{ {
Guard l(x_sync); Guard l(x_sync);
sync().onPeerBlocks(_peer, _r); sync().onPeerBlocks(_peer, _r);
} }
void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{ {
Guard l(x_sync); Guard l(x_sync);
sync().onPeerNewHashes(_peer, _hashes); sync().onPeerNewHashes(_peer, _hashes);
} }
void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{ {
Guard l(x_sync); Guard l(x_sync);
sync().onPeerNewBlock(_peer, _r); sync().onPeerNewBlock(_peer, _r);
} }
void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{ {
if (_peer->isCriticalSyncing()) if (_peer->isCriticalSyncing())
{ {
@ -319,11 +309,11 @@ void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
} }
} }
void EthereumHost::onPeerAborting(EthereumPeer* _peer) void EthereumHost::onPeerAborting()
{ {
Guard l(x_sync); Guard l(x_sync);
if (m_sync) if (m_sync)
m_sync->onPeerAborting(_peer); m_sync->onPeerAborting();
} }
bool EthereumHost::isSyncing() const bool EthereumHost::isSyncing() const

19
libethereum/EthereumHost.h

@ -86,16 +86,15 @@ public:
static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; } static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; }
static unsigned const c_oldProtocolVersion; static unsigned const c_oldProtocolVersion;
void foreachPeerPtr(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const; void foreachPeer(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const;
void foreachPeer(std::function<bool(EthereumPeer*)> const& _f) const;
void onPeerStatus(std::shared_ptr<EthereumPeer> _peer);
void onPeerStatus(EthereumPeer* _peer); void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes);
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); void onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r); void onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes);
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); void onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); void onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); void onPeerAborting();
void onPeerAborting(EthereumPeer* _peer);
private: private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)]; static char const* const s_stateNames[static_cast<int>(SyncState::Size)];

64
libethereum/EthereumPeer.cpp

@ -49,7 +49,7 @@ string toString(Asking _a)
return "?"; return "?";
} }
EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap): EthereumPeer::EthereumPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap):
Capability(_s, _h, _i), Capability(_s, _h, _i),
m_sub(host()->downloadMan()), m_sub(host()->downloadMan()),
m_peerCapabilityVersion(_cap.second) m_peerCapabilityVersion(_cap.second)
@ -71,30 +71,39 @@ EthereumPeer::~EthereumPeer()
bool EthereumPeer::isRude() const bool EthereumPeer::isRude() const
{ {
return repMan().isRude(*session(), name()); auto s = session();
if (s)
return repMan().isRude(*s, name());
return false;
} }
unsigned EthereumPeer::askOverride() const unsigned EthereumPeer::askOverride() const
{ {
std::string static const badGeth = "Geth/v0.9.27"; std::string static const badGeth = "Geth/v0.9.27";
if (session()->info().clientVersion.substr(0, badGeth.size()) == badGeth) auto s = session();
if (!s)
return c_maxBlocksAsk;
if (s->info().clientVersion.substr(0, badGeth.size()) == badGeth)
return 1; return 1;
bytes const& d = repMan().data(*session(), name()); bytes const& d = repMan().data(*s, name());
return d.empty() ? c_maxBlocksAsk : RLP(d).toInt<unsigned>(RLP::LaisezFaire); return d.empty() ? c_maxBlocksAsk : RLP(d).toInt<unsigned>(RLP::LaisezFaire);
} }
void EthereumPeer::setRude() void EthereumPeer::setRude()
{ {
auto s = session();
if (!s)
return;
auto old = askOverride(); auto old = askOverride();
repMan().setData(*session(), name(), rlp(askOverride() / 2 + 1)); repMan().setData(*s, name(), rlp(askOverride() / 2 + 1));
cnote << "Rude behaviour; askOverride now" << askOverride() << ", was" << old; cnote << "Rude behaviour; askOverride now" << askOverride() << ", was" << old;
repMan().noteRude(*session(), name()); repMan().noteRude(*s, name());
session()->addNote("manners", "RUDE"); session()->addNote("manners", "RUDE");
} }
void EthereumPeer::abortSync() void EthereumPeer::abortSync()
{ {
host()->onPeerAborting(this); host()->onPeerAborting();
} }
EthereumHost* EthereumPeer::host() const EthereumHost* EthereumPeer::host() const
@ -153,6 +162,21 @@ void EthereumPeer::requestHashes(h256 const& _lastHash)
sealAndSend(s); sealAndSend(s);
} }
void EthereumPeer::requestBlocks(h256s const& _blocks)
{
setAsking(Asking::Blocks);
if (_blocks.size())
{
RLPStream s;
prep(s, GetBlocksPacket, _blocks.size());
for (auto const& i: _blocks)
s << i;
sealAndSend(s);
}
else
setIdle();
}
void EthereumPeer::requestBlocks() void EthereumPeer::requestBlocks()
{ {
setAsking(Asking::Blocks); setAsking(Asking::Blocks);
@ -167,7 +191,6 @@ void EthereumPeer::requestBlocks()
} }
else else
setIdle(); setIdle();
return;
} }
void EthereumPeer::setAsking(Asking _a) void EthereumPeer::setAsking(Asking _a)
@ -175,15 +198,20 @@ void EthereumPeer::setAsking(Asking _a)
m_asking = _a; m_asking = _a;
m_lastAsk = chrono::system_clock::now(); m_lastAsk = chrono::system_clock::now();
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?"); auto s = session();
session()->addNote("sync", string(isCriticalSyncing() ? "ONGOING" : "holding") + (needsSyncing() ? " & needed" : "")); if (s)
{
s->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
s->addNote("sync", string(isCriticalSyncing() ? "ONGOING" : "holding") + (needsSyncing() ? " & needed" : ""));
}
} }
void EthereumPeer::tick() void EthereumPeer::tick()
{ {
if (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing) auto s = session();
if (s && (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing))
// timeout // timeout
session()->disconnect(PingTimeout); s->disconnect(PingTimeout);
} }
bool EthereumPeer::isConversing() const bool EthereumPeer::isConversing() const
@ -225,12 +253,12 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash; clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
setAsking(Asking::Nothing); setAsking(Asking::Nothing);
host()->onPeerStatus(this); host()->onPeerStatus(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())));
break; break;
} }
case TransactionsPacket: case TransactionsPacket:
{ {
host()->onPeerTransactions(this, _r); host()->onPeerTransactions(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())), _r);
break; break;
} }
case GetBlockHashesPacket: case GetBlockHashesPacket:
@ -285,7 +313,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
for (unsigned i = 0; i < itemCount; ++i) for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = _r[i].toHash<h256>(); hashes[i] = _r[i].toHash<h256>();
host()->onPeerHashes(this, hashes); host()->onPeerHashes(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), hashes);
break; break;
} }
case GetBlocksPacket: case GetBlocksPacket:
@ -327,12 +355,12 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
if (m_asking != Asking::Blocks) if (m_asking != Asking::Blocks)
clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them."; clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them.";
else else
host()->onPeerBlocks(this, _r); host()->onPeerBlocks(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
break; break;
} }
case NewBlockPacket: case NewBlockPacket:
{ {
host()->onPeerNewBlock(this, _r); host()->onPeerNewBlock(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
break; break;
} }
case NewBlockHashesPacket: case NewBlockHashesPacket:
@ -344,7 +372,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
for (unsigned i = 0; i < itemCount; ++i) for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = _r[i].toHash<h256>(); hashes[i] = _r[i].toHash<h256>();
host()->onPeerNewHashes(this, hashes); host()->onPeerNewHashes(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), hashes);
break; break;
} }
default: default:

5
libethereum/EthereumPeer.h

@ -56,7 +56,7 @@ class EthereumPeer: public p2p::Capability
public: public:
/// Basic constructor. /// Basic constructor.
EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h, unsigned _i, p2p::CapDesc const& _cap); EthereumPeer(std::shared_ptr<p2p::Session> _s, p2p::HostCapabilityFace* _h, unsigned _i, p2p::CapDesc const& _cap);
/// Basic destructor. /// Basic destructor.
virtual ~EthereumPeer(); virtual ~EthereumPeer();
@ -85,6 +85,9 @@ public:
/// Request blocks. Uses block download manager. /// Request blocks. Uses block download manager.
void requestBlocks(); void requestBlocks();
/// Request specified blocks from peer.
void requestBlocks(h256s const& _blocks);
/// Check if this node is rude. /// Check if this node is rude.
bool isRude() const; bool isRude() const;

12
libp2p/Capability.cpp

@ -28,14 +28,14 @@ using namespace std;
using namespace dev; using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
Capability::Capability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset): m_session(_s), m_hostCap(_h), m_idOffset(_idOffset) Capability::Capability(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _idOffset): m_session(_s), m_hostCap(_h), m_idOffset(_idOffset)
{ {
clog(NetConnect) << "New session for capability" << m_hostCap->name() << "; idOffset:" << m_idOffset; clog(NetConnect) << "New session for capability" << m_hostCap->name() << "; idOffset:" << m_idOffset;
} }
void Capability::disable(std::string const& _problem) void Capability::disable(std::string const& _problem)
{ {
clog(NetWarn) << "DISABLE: Disabling capability '" << m_hostCap->name() << "'. Reason:" << _problem; clog(NetNote) << "DISABLE: Disabling capability '" << m_hostCap->name() << "'. Reason:" << _problem;
m_enabled = false; m_enabled = false;
} }
@ -46,12 +46,16 @@ RLPStream& Capability::prep(RLPStream& _s, unsigned _id, unsigned _args)
void Capability::sealAndSend(RLPStream& _s) void Capability::sealAndSend(RLPStream& _s)
{ {
m_session->sealAndSend(_s); shared_ptr<Session> session = m_session.lock();
if (session)
session->sealAndSend(_s);
} }
void Capability::addRating(int _r) void Capability::addRating(int _r)
{ {
m_session->addRating(_r); shared_ptr<Session> session = m_session.lock();
if (session)
session->addRating(_r);
} }
ReputationManager& Capability::repMan() const ReputationManager& Capability::repMan() const

8
libp2p/Capability.h

@ -31,12 +31,12 @@ namespace p2p
class ReputationManager; class ReputationManager;
class Capability class Capability: public std::enable_shared_from_this<Capability>
{ {
friend class Session; friend class Session;
public: public:
Capability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset); Capability(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _idOffset);
virtual ~Capability() {} virtual ~Capability() {}
// Implement these in the derived class. // Implement these in the derived class.
@ -44,7 +44,7 @@ public:
static u256 version() { return 0; } static u256 version() { return 0; }
static unsigned messageCount() { return 0; } static unsigned messageCount() { return 0; }
*/ */
Session* session() const { return m_session; } std::shared_ptr<Session> session() const { return m_session.lock(); }
HostCapabilityFace* hostCapability() const { return m_hostCap; } HostCapabilityFace* hostCapability() const { return m_hostCap; }
Host* host() const { return m_hostCap->host(); } Host* host() const { return m_hostCap->host(); }
ReputationManager& repMan() const; ReputationManager& repMan() const;
@ -59,7 +59,7 @@ protected:
void addRating(int _r); void addRating(int _r);
private: private:
Session* m_session; std::weak_ptr<Session> m_session;
HostCapabilityFace* m_hostCap; HostCapabilityFace* m_hostCap;
bool m_enabled = true; bool m_enabled = true;
unsigned m_idOffset; unsigned m_idOffset;

2
libp2p/Host.cpp

@ -284,7 +284,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder*
for (auto const& i: caps) for (auto const& i: caps)
if (haveCapability(i)) if (haveCapability(i))
{ {
ps->m_capabilities[i] = shared_ptr<Capability>(m_capabilities[i]->newPeerCapability(ps.get(), o, i)); ps->m_capabilities[i] = shared_ptr<Capability>(m_capabilities[i]->newPeerCapability(ps, o, i));
o += m_capabilities[i]->messageCount(); o += m_capabilities[i]->messageCount();
} }
ps->start(); ps->start();

4
libp2p/HostCapability.h

@ -53,7 +53,7 @@ protected:
virtual u256 version() const = 0; virtual u256 version() const = 0;
CapDesc capDesc() const { return std::make_pair(name(), version()); } CapDesc capDesc() const { return std::make_pair(name(), version()); }
virtual unsigned messageCount() const = 0; virtual unsigned messageCount() const = 0;
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset, CapDesc const& _cap) = 0; virtual Capability* newPeerCapability(std::shared_ptr<Session> _s, unsigned _idOffset, CapDesc const& _cap) = 0;
virtual void onStarting() {} virtual void onStarting() {}
virtual void onStopping() {} virtual void onStopping() {}
@ -77,7 +77,7 @@ protected:
virtual std::string name() const { return PeerCap::name(); } virtual std::string name() const { return PeerCap::name(); }
virtual u256 version() const { return PeerCap::version(); } virtual u256 version() const { return PeerCap::version(); }
virtual unsigned messageCount() const { return PeerCap::messageCount(); } virtual unsigned messageCount() const { return PeerCap::messageCount(); }
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset, CapDesc const& _cap) { return new PeerCap(_s, this, _idOffset, _cap); } virtual Capability* newPeerCapability(std::shared_ptr<Session> _s, unsigned _idOffset, CapDesc const& _cap) { return new PeerCap(_s, this, _idOffset, _cap); }
}; };
} }

2
libwhisper/WhisperPeer.cpp

@ -29,7 +29,7 @@ using namespace dev;
using namespace dev::p2p; using namespace dev::p2p;
using namespace dev::shh; using namespace dev::shh;
WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i) WhisperPeer::WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i)
{ {
RLPStream s; RLPStream s;
sealAndSend(prep(s, StatusPacket, 1) << version()); sealAndSend(prep(s, StatusPacket, 1) << version());

2
libwhisper/WhisperPeer.h

@ -51,7 +51,7 @@ class WhisperPeer: public Capability
friend class WhisperHost; friend class WhisperHost;
public: public:
WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap); WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap);
virtual ~WhisperPeer(); virtual ~WhisperPeer();
static std::string name() { return "shh"; } static std::string name() { return "shh"; }

Loading…
Cancel
Save