Browse Source

Merge pull request #2254 from arkpar/bc_rf

p2p/sync fixes
cl-refactor
Gav Wood 10 years ago
parent
commit
e0d6e20619
  1. 1
      libethcore/Farm.h
  2. 143
      libethereum/BlockChainSync.cpp
  3. 66
      libethereum/BlockChainSync.h
  4. 3
      libethereum/Client.cpp
  5. 34
      libethereum/EthereumHost.cpp
  6. 19
      libethereum/EthereumHost.h
  7. 64
      libethereum/EthereumPeer.cpp
  8. 5
      libethereum/EthereumPeer.h
  9. 12
      libp2p/Capability.cpp
  10. 8
      libp2p/Capability.h
  11. 2
      libp2p/Host.cpp
  12. 4
      libp2p/HostCapability.h
  13. 2
      libwhisper/WhisperPeer.cpp
  14. 2
      libwhisper/WhisperPeer.h
  15. 2
      test/libp2p/capability.cpp

1
libethcore/Farm.h

@ -95,7 +95,6 @@ public:
void stop()
{
WriteGuard l(x_minerWork);
cdebug << "Farm::stop()";
m_miners.clear();
m_work.reset();
m_isMining = false;

143
libethereum/BlockChainSync.cpp

@ -68,13 +68,10 @@ DownloadMan& BlockChainSync::downloadMan()
void BlockChainSync::abortSync()
{
DEV_INVARIANT_CHECK;
host().foreachPeer([this](EthereumPeer* _p) { onPeerAborting(_p); return true; });
downloadMan().resetToChain(h256s());
DEV_INVARIANT_CHECK;
}
void BlockChainSync::onPeerStatus(EthereumPeer* _peer)
void BlockChainSync::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
@ -111,7 +108,7 @@ unsigned BlockChainSync::estimatedHashes() const
return blockCount;
}
void BlockChainSync::requestBlocks(EthereumPeer* _peer)
void BlockChainSync::requestBlocks(std::shared_ptr<EthereumPeer> _peer)
{
if (host().bq().knownFull())
{
@ -130,7 +127,14 @@ void BlockChainSync::requestBlocks(EthereumPeer* _peer)
}
}
void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
void BlockChainSync::logNewBlock(h256 const& _h)
{
if (m_state == SyncState::NewBlocks)
clog(NetNote) << "NewBlock: " << _h;
m_knownNewHashes.erase(_h);
}
void BlockChainSync::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
@ -138,7 +142,7 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
_peer->setIdle();
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks)
if (m_state != SyncState::Blocks && m_state != SyncState::NewBlocks && m_state != SyncState::Waiting)
clog(NetWarn) << "Unexpected Blocks received!";
if (m_state == SyncState::Waiting)
{
@ -173,14 +177,17 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{
case ImportResult::Success:
success++;
logNewBlock(h);
break;
case ImportResult::Malformed:
case ImportResult::BadChain:
logNewBlock(h);
_peer->disable("Malformed block received.");
return;
case ImportResult::FutureTimeKnown:
logNewBlock(h);
future++;
break;
case ImportResult::AlreadyInChain:
@ -194,6 +201,7 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
case ImportResult::UnknownParent:
{
unknown++;
logNewBlock(h);
if (m_state == SyncState::NewBlocks)
{
BlockInfo bi;
@ -241,12 +249,11 @@ void BlockChainSync::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
DEV_INVARIANT_CHECK;
}
void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
void BlockChainSync::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
DEV_INVARIANT_CHECK;
RecursiveGuard l(x_sync);
auto h = BlockInfo::headerHash(_r[0].data());
clog(NetMessageSummary) << "NewBlock: " << h;
if (_r.itemCount() != 2)
_peer->disable("NewBlock without 2 data fields.");
@ -256,6 +263,7 @@ void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{
case ImportResult::Success:
_peer->addRating(100);
logNewBlock(h);
break;
case ImportResult::FutureTimeKnown:
//TODO: Rating dependent on how far in future it is.
@ -263,6 +271,7 @@ void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
case ImportResult::Malformed:
case ImportResult::BadChain:
logNewBlock(h);
_peer->disable("Malformed block received.");
return;
@ -272,6 +281,7 @@ void BlockChainSync::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
case ImportResult::FutureTimeUnknown:
case ImportResult::UnknownParent:
logNewBlock(h);
clog(NetMessageSummary) << "Received block with no known parent. Resyncing...";
resetSyncFor(_peer, h, _r[1].toInt<u256>());
break;
@ -309,17 +319,17 @@ SyncStatus PV60Sync::status() const
return res;
}
void PV60Sync::setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing, bool _needHelp)
void PV60Sync::setState(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _isSyncing, bool _needHelp)
{
bool changedState = (m_state != _s);
m_state = _s;
if (_isSyncing != (m_syncer == _peer) || (_isSyncing && changedState))
if (_isSyncing != (m_syncer.lock() == _peer) || (_isSyncing && changedState))
changeSyncer(_isSyncing ? _peer : nullptr, _needHelp);
else if (_s == SyncState::Idle)
changeSyncer(nullptr, _needHelp);
assert(!!m_syncer || _s == SyncState::Idle);
assert(isSyncing() || _s == SyncState::Idle);
}
void PV60Sync::resetSync()
@ -335,32 +345,33 @@ void PV60Sync::restartSync()
resetSync();
host().bq().clear();
if (isSyncing())
transition(m_syncer, SyncState::Idle);
transition(m_syncer.lock(), SyncState::Idle);
}
void PV60Sync::completeSync()
{
if (isSyncing())
transition(m_syncer, SyncState::Idle);
transition(m_syncer.lock(), SyncState::Idle);
}
void PV60Sync::pauseSync()
{
if (isSyncing())
setState(m_syncer, SyncState::Waiting, true);
setState(m_syncer.lock(), SyncState::Waiting, true);
}
void PV60Sync::continueSync()
{
transition(m_syncer, SyncState::Blocks);
if (isSyncing())
transition(m_syncer.lock(), SyncState::Blocks);
}
void PV60Sync::onNewPeer(EthereumPeer* _peer)
void PV60Sync::onNewPeer(std::shared_ptr<EthereumPeer> _peer)
{
setNeedsSyncing(_peer, _peer->m_latestHash, _peer->m_totalDifficulty);
}
void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _needHelp)
void PV60Sync::transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _force, bool _needHelp)
{
clog(NetMessageSummary) << "Transition!" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : "");
@ -404,13 +415,13 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
}
if (shouldGrabBlocks(_peer))
{
clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash << ", was" << host().latestBlockSent() << "]";
clog(NetMessageDetail) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash << ", was" << host().latestBlockSent() << "]";
downloadMan().resetToChain(m_syncingNeededBlocks);
resetSync();
}
else
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
clog(NetMessageDetail) << "Difficulty of hashchain not HIGHER. Ignoring.";
resetSync();
setState(_peer, SyncState::Idle, false);
return;
@ -451,10 +462,10 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
}
else if (_s == SyncState::Idle)
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p) { _p->setIdle(); return true; });
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
clog(NetNote) << "Finishing blocks fetch...";
clog(NetMessageDetail) << "Finishing blocks fetch...";
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if (isSyncing(_peer))
@ -467,7 +478,7 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
}
else if (m_state == SyncState::Hashes)
{
clog(NetNote) << "Finishing hashes fetch...";
clog(NetMessageDetail) << "Finishing hashes fetch...";
setState(_peer, SyncState::Idle, false);
}
// Otherwise it's fine. We don't care if it's Nothing->Nothing.
@ -478,12 +489,12 @@ void PV60Sync::transition(EthereumPeer* _peer, SyncState _s, bool _force, bool _
clog(NetWarn) << "Invalid state transition:" << EthereumHost::stateName(_s) << "from" << EthereumHost::stateName(m_state) << ", " << (isSyncing(_peer) ? "syncing" : "holding") << (needsSyncing(_peer) ? "& needed" : "");
}
void PV60Sync::resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td)
void PV60Sync::resetSyncFor(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td)
{
setNeedsSyncing(_peer, _latestHash, _td);
}
void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td)
void PV60Sync::setNeedsSyncing(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td)
{
_peer->m_latestHash = _latestHash;
_peer->m_totalDifficulty = _td;
@ -494,17 +505,17 @@ void PV60Sync::setNeedsSyncing(EthereumPeer* _peer, h256 const& _latestHash, u25
_peer->session()->addNote("sync", string(isSyncing(_peer) ? "ongoing" : "holding") + (needsSyncing(_peer) ? " & needed" : ""));
}
bool PV60Sync::needsSyncing(EthereumPeer* _peer) const
bool PV60Sync::needsSyncing(std::shared_ptr<EthereumPeer> _peer) const
{
return !!_peer->m_latestHash;
}
bool PV60Sync::isSyncing(EthereumPeer* _peer) const
bool PV60Sync::isSyncing(std::shared_ptr<EthereumPeer> _peer) const
{
return m_syncer == _peer;
return m_syncer.lock() == _peer;
}
bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const
bool PV60Sync::shouldGrabBlocks(std::shared_ptr<EthereumPeer> _peer) const
{
auto td = _peer->m_totalDifficulty;
auto lh = _peer->m_latestHash;
@ -513,7 +524,7 @@ bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const
if (m_syncingNeededBlocks.empty())
return false;
clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back();
clog(NetMessageDetail) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back();
if (td < ctd || (td == ctd && host().chain().currentHash() == lh))
return false;
@ -521,7 +532,7 @@ bool PV60Sync::shouldGrabBlocks(EthereumPeer* _peer) const
return true;
}
void PV60Sync::attemptSync(EthereumPeer* _peer)
void PV60Sync::attemptSync(std::shared_ptr<EthereumPeer> _peer)
{
if (m_state != SyncState::Idle)
{
@ -556,7 +567,7 @@ void PV60Sync::attemptSync(EthereumPeer* _peer)
}
}
void PV60Sync::noteNeedsSyncing(EthereumPeer* _peer)
void PV60Sync::noteNeedsSyncing(std::shared_ptr<EthereumPeer> _peer)
{
// if already downloading hash-chain, ignore.
if (isSyncing())
@ -570,7 +581,7 @@ void PV60Sync::noteNeedsSyncing(EthereumPeer* _peer)
attemptSync(_peer);
}
void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
void PV60Sync::changeSyncer(std::shared_ptr<EthereumPeer> _syncer, bool _needHelp)
{
if (_syncer)
clog(NetAllDetail) << "Changing syncer to" << _syncer->session()->socketId();
@ -581,9 +592,9 @@ void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
if (isSyncing())
{
if (_needHelp && (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks))
host().foreachPeer([&](EthereumPeer* _p)
host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
{
clog(NetNote) << "Getting help with downloading blocks";
clog(NetMessageDetail) << "Getting help with downloading blocks";
if (_p != _syncer && _p->m_asking == Asking::Nothing)
transition(_p, m_state);
return true;
@ -592,7 +603,7 @@ void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
else
{
// start grabbing next hash chain if there is one.
host().foreachPeer([this](EthereumPeer* _p)
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p)
{
attemptSync(_p);
return !isSyncing();
@ -601,24 +612,24 @@ void PV60Sync::changeSyncer(EthereumPeer* _syncer, bool _needHelp)
{
if (m_state != SyncState::Idle)
setState(_syncer, SyncState::Idle);
clog(NetNote) << "No more peers to sync with.";
clog(NetMessageDetail) << "No more peers to sync with.";
}
}
assert(!!m_syncer || m_state == SyncState::Idle);
assert(isSyncing() || m_state == SyncState::Idle);
}
void PV60Sync::peerDoneBlocks(EthereumPeer* _peer)
void PV60Sync::peerDoneBlocks(std::shared_ptr<EthereumPeer> _peer)
{
noteDoneBlocks(_peer, false);
}
void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
void PV60Sync::noteDoneBlocks(std::shared_ptr<EthereumPeer> _peer, bool _clemency)
{
resetNeedsSyncing(_peer);
if (downloadMan().isComplete())
{
// Done our chain-get.
clog(NetNote) << "Chain download complete.";
clog(NetMessageDetail) << "Chain download complete.";
// 1/100th for each useful block hash.
_peer->addRating(downloadMan().chainSize() / 100);
downloadMan().reset();
@ -643,7 +654,7 @@ void PV60Sync::noteDoneBlocks(EthereumPeer* _peer, bool _clemency)
_peer->m_sub.doneFetch();
}
void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
void PV60Sync::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
@ -698,11 +709,11 @@ void PV60Sync::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
void PV60Sync::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
RecursiveGuard l(x_sync);
DEV_INVARIANT_CHECK;
if (isSyncing())
if (isSyncing() && (m_state != SyncState::NewBlocks || isSyncing(_peer)))
{
clog(NetMessageSummary) << "Ignoring since we're already downloading.";
return;
@ -734,43 +745,55 @@ void PV60Sync::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
if (unknowns > 0)
{
clog(NetNote) << "Not syncing and new block hash discovered: syncing without help.";
downloadMan().resetToChain(m_syncingNeededBlocks);
if (m_state == SyncState::NewBlocks)
{
clog(NetMessageDetail) << "Downloading new blocks and seeing new hashes. Trying grabbing blocks";
_peer->requestBlocks(m_syncingNeededBlocks);
}
else
{
clog(NetMessageDetail) << "Not syncing and new block hash discovered: syncing without help.";
downloadMan().resetToChain(m_syncingNeededBlocks);
transition(_peer, SyncState::NewBlocks, false, false);
}
for (auto const& h: m_syncingNeededBlocks)
if (!m_knownNewHashes.count(h))
{
m_knownNewHashes.insert(h);
clog(NetNote) << "NewHash: " << h;
}
resetSync();
transition(_peer, SyncState::NewBlocks, false, false);
}
DEV_INVARIANT_CHECK;
}
void PV60Sync::abortSync(EthereumPeer* _peer)
void PV60Sync::abortSync()
{
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
if (isSyncing(_peer))
{
host().foreachPeer([this](EthereumPeer* _p) { _p->setIdle(); return true; });
transition(_peer, SyncState::Idle, true);
}
host().foreachPeer([this](std::shared_ptr<EthereumPeer> _p) { _p->setIdle(); return true; });
setState(std::shared_ptr<EthereumPeer>(), SyncState::Idle, false, true);
DEV_INVARIANT_CHECK;
}
void PV60Sync::onPeerAborting(EthereumPeer* _peer)
void PV60Sync::onPeerAborting()
{
RecursiveGuard l(x_sync);
// Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
abortSync(_peer);
if (m_syncer.expired())
abortSync();
DEV_INVARIANT_CHECK;
}
bool PV60Sync::invariants() const
{
if (m_state == SyncState::Idle && !!m_syncer)
if (m_state == SyncState::Idle && isSyncing())
return false;
if (m_state != SyncState::Idle && !m_syncer)
if (m_state != SyncState::Idle && !isSyncing())
return false;
if (m_state == SyncState::Hashes)
{
bool hashes = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; });
host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p) { if (_p->m_asking == Asking::Hashes) hashes = true; return !hashes; });
if (!hashes)
return false;
if (!m_syncingLatestHash)
@ -781,7 +804,7 @@ bool PV60Sync::invariants() const
if (m_state == SyncState::Blocks || m_state == SyncState::NewBlocks)
{
bool blocks = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; });
host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p) { if (_p->m_asking == Asking::Blocks) blocks = true; return !blocks; });
if (!blocks)
return false;
if (downloadMan().isComplete())
@ -790,7 +813,7 @@ bool PV60Sync::invariants() const
if (m_state == SyncState::Idle)
{
bool busy = false;
host().foreachPeer([&](EthereumPeer* _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; });
host().foreachPeer([&](std::shared_ptr<EthereumPeer> _p) { if (_p->m_asking != Asking::Nothing && _p->m_asking != Asking::State) busy = true; return !busy; });
if (busy)
return false;
}

66
libethereum/BlockChainSync.h

@ -60,22 +60,22 @@ public:
virtual bool isSyncing() const = 0;
/// Called by peer to report status
virtual void onPeerStatus(EthereumPeer* _peer);
virtual void onPeerStatus(std::shared_ptr<EthereumPeer> _peer);
/// Called by peer once it has new blocks during syn
virtual void onPeerBlocks(EthereumPeer* _peer, RLP const& _r);
virtual void onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
/// Called by peer once it has new blocks
virtual void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r);
virtual void onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
/// Called by peer once it has new hashes
virtual void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) = 0;
virtual void onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) = 0;
/// Called by peer once it has another sequential block of hashes during sync
virtual void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) = 0;
virtual void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) = 0;
/// Called by peer when it is disconnecting
virtual void onPeerAborting(EthereumPeer* _peer) = 0;
virtual void onPeerAborting() = 0;
/// @returns Synchonization status
virtual SyncStatus status() const = 0;
@ -85,10 +85,10 @@ public:
protected:
//To be implemented in derived classes:
/// New valid peer appears
virtual void onNewPeer(EthereumPeer* _peer) = 0;
virtual void onNewPeer(std::shared_ptr<EthereumPeer> _peer) = 0;
/// Peer done downloading blocks
virtual void peerDoneBlocks(EthereumPeer* _peer) = 0;
virtual void peerDoneBlocks(std::shared_ptr<EthereumPeer> _peer) = 0;
/// Resume downloading after witing state
virtual void continueSync() = 0;
@ -103,7 +103,7 @@ protected:
virtual void pauseSync() = 0;
/// Restart sync for given peer
virtual void resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td) = 0;
virtual void resetSyncFor(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td) = 0;
EthereumHost& host() { return m_host; }
EthereumHost const& host() const { return m_host; }
@ -112,17 +112,20 @@ protected:
unsigned estimatedHashes() const;
/// Request blocks from peer if needed
void requestBlocks(EthereumPeer* _peer);
void requestBlocks(std::shared_ptr<EthereumPeer> _peer);
protected:
Handler m_bqRoomAvailable; ///< Triggered once block queue
mutable RecursiveMutex x_sync;
SyncState m_state = SyncState::Idle; ///< Current sync state
unsigned m_estimatedHashes = 0; ///< Number of estimated hashes for the last peer over PV60. Used for status reporting only.
h256Hash m_knownNewHashes; ///< New hashes we know about use for logging only
private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)];
bool invariants() const override = 0;
void logNewBlock(h256 const& _h);
EthereumHost& m_host;
HashDownloadMan m_hashMan;
};
@ -202,65 +205,65 @@ public:
PV60Sync(EthereumHost& _host);
/// @returns true is Sync is in progress
bool isSyncing() const override { return !!m_syncer; }
bool isSyncing() const override { return !!m_syncer.lock(); }
/// Called by peer once it has new hashes
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) override;
void onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) override;
/// Called by peer once it has another sequential block of hashes during sync
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) override;
void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes) override;
/// Called by peer when it is disconnecting
void onPeerAborting(EthereumPeer* _peer) override;
void onPeerAborting() override;
/// @returns Sync status
SyncStatus status() const override;
protected:
void onNewPeer(EthereumPeer* _peer) override;
void onNewPeer(std::shared_ptr<EthereumPeer> _peer) override;
void continueSync() override;
void peerDoneBlocks(EthereumPeer* _peer) override;
void peerDoneBlocks(std::shared_ptr<EthereumPeer> _peer) override;
void restartSync() override;
void completeSync() override;
void pauseSync() override;
void resetSyncFor(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td) override;
void resetSyncFor(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td) override;
private:
/// Transition sync state in a particular direction. @param _peer Peer that is responsible for state tranfer
void transition(EthereumPeer* _peer, SyncState _s, bool _force = false, bool _needHelp = true);
void transition(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _force = false, bool _needHelp = true);
/// Reset peer syncing requirements state.
void resetNeedsSyncing(EthereumPeer* _peer) { setNeedsSyncing(_peer, h256(), 0); }
void resetNeedsSyncing(std::shared_ptr<EthereumPeer> _peer) { setNeedsSyncing(_peer, h256(), 0); }
/// Update peer syncing requirements state.
void setNeedsSyncing(EthereumPeer* _peer, h256 const& _latestHash, u256 const& _td);
void setNeedsSyncing(std::shared_ptr<EthereumPeer> _peer, h256 const& _latestHash, u256 const& _td);
/// Do we presently need syncing with this peer?
bool needsSyncing(EthereumPeer* _peer) const;
bool needsSyncing(std::shared_ptr<EthereumPeer> _peer) const;
/// Check whether the session should bother grabbing blocks from a peer.
bool shouldGrabBlocks(EthereumPeer* _peer) const;
bool shouldGrabBlocks(std::shared_ptr<EthereumPeer> _peer) const;
/// Attempt to begin syncing with the peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks
void attemptSync(EthereumPeer* _peer);
void attemptSync(std::shared_ptr<EthereumPeer> _peer);
/// Update our syncing state
void setState(EthereumPeer* _peer, SyncState _s, bool _isSyncing = false, bool _needHelp = false);
void setState(std::shared_ptr<EthereumPeer> _peer, SyncState _s, bool _isSyncing = false, bool _needHelp = false);
/// Check if peer is main syncer
bool isSyncing(EthereumPeer* _peer) const;
bool isSyncing(std::shared_ptr<EthereumPeer> _peer) const;
/// Check if we need (re-)syncing with the peer.
void noteNeedsSyncing(EthereumPeer* _who);
void noteNeedsSyncing(std::shared_ptr<EthereumPeer> _who);
/// Set main syncing peer
void changeSyncer(EthereumPeer* _syncer, bool _needHelp);
void changeSyncer(std::shared_ptr<EthereumPeer> _syncer, bool _needHelp);
/// Called when peer done downloading blocks
void noteDoneBlocks(EthereumPeer* _who, bool _clemency);
void noteDoneBlocks(std::shared_ptr<EthereumPeer> _who, bool _clemency);
/// Abort syncing for peer
void abortSync(EthereumPeer* _peer);
/// Abort syncing
void abortSync();
/// Reset hash chain syncing
void resetSync();
@ -271,8 +274,7 @@ private:
h256 m_syncingLastReceivedHash; ///< Hash most recently received from peer.
h256 m_syncingLatestHash; ///< Latest block's hash of the peer we are syncing to, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Latest block's total difficulty of the peer we aresyncing to, as of the current sync.
// TODO: switch to weak_ptr
EthereumPeer* m_syncer = nullptr; ///< Peer we are currently syncing with
std::weak_ptr<EthereumPeer> m_syncer; ///< Peer we are currently syncing with
};
}
}

3
libethereum/Client.cpp

@ -632,7 +632,8 @@ void Client::syncBlockQueue()
tie(ir, m_syncBlockQueue, count) = m_bc.sync(m_bq, m_stateDB, m_syncAmount);
double elapsed = t.elapsed();
cnote << count << "blocks imported in" << unsigned(elapsed * 1000) << "ms (" << (count / elapsed) << "blocks/s)";
if (count)
cnote << count << "blocks imported in" << unsigned(elapsed * 1000) << "ms (" << (count / elapsed) << "blocks/s)";
if (elapsed > c_targetDuration * 1.1 && count > c_syncMin)
m_syncAmount = max(c_syncMin, count * 9 / 10);

34
libethereum/EthereumHost.cpp

@ -104,7 +104,7 @@ void EthereumHost::doWork()
}
}
foreachPeer([](EthereumPeer* _p) { _p->tick(); return true; });
foreachPeer([](std::shared_ptr<EthereumPeer> _p) { _p->tick(); return true; });
// return netChange;
// TODO: Figure out what to do with netChange.
@ -125,7 +125,7 @@ void EthereumHost::maintainTransactions()
}
for (auto const& t: ts)
m_transactionsSent.insert(t.first);
foreachPeerPtr([&](shared_ptr<EthereumPeer> _p)
foreachPeer([&](shared_ptr<EthereumPeer> _p)
{
bytes b;
unsigned n = 0;
@ -150,17 +150,7 @@ void EthereumHost::maintainTransactions()
});
}
void EthereumHost::foreachPeer(std::function<bool(EthereumPeer*)> const& _f) const
{
foreachPeerPtr([&](std::shared_ptr<EthereumPeer> _p)
{
if (_p)
return _f(_p.get());
return true;
});
}
void EthereumHost::foreachPeerPtr(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const
void EthereumHost::foreachPeer(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const
{
for (auto s: peerSessions())
if (!_f(s.first->cap<EthereumPeer>()))
@ -245,7 +235,7 @@ BlockChainSync& EthereumHost::sync()
return *m_sync; // We only chose sync strategy once
bool pv61 = false;
foreachPeer([&](EthereumPeer* _p)
foreachPeer([&](std::shared_ptr<EthereumPeer> _p)
{
if (_p->m_protocolVersion == protocolVersion())
pv61 = true;
@ -255,37 +245,37 @@ BlockChainSync& EthereumHost::sync()
return *m_sync;
}
void EthereumHost::onPeerStatus(EthereumPeer* _peer)
void EthereumHost::onPeerStatus(std::shared_ptr<EthereumPeer> _peer)
{
Guard l(x_sync);
sync().onPeerStatus(_peer);
}
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
void EthereumHost::onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
Guard l(x_sync);
sync().onPeerHashes(_peer, _hashes);
}
void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
void EthereumHost::onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
Guard l(x_sync);
sync().onPeerBlocks(_peer, _r);
}
void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
void EthereumHost::onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes)
{
Guard l(x_sync);
sync().onPeerNewHashes(_peer, _hashes);
}
void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
void EthereumHost::onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
Guard l(x_sync);
sync().onPeerNewBlock(_peer, _r);
}
void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
void EthereumHost::onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r)
{
if (_peer->isCriticalSyncing())
{
@ -318,11 +308,11 @@ void EthereumHost::onPeerTransactions(EthereumPeer* _peer, RLP const& _r)
}
}
void EthereumHost::onPeerAborting(EthereumPeer* _peer)
void EthereumHost::onPeerAborting()
{
Guard l(x_sync);
if (m_sync)
m_sync->onPeerAborting(_peer);
m_sync->onPeerAborting();
}
bool EthereumHost::isSyncing() const

19
libethereum/EthereumHost.h

@ -86,16 +86,15 @@ public:
static char const* stateName(SyncState _s) { return s_stateNames[static_cast<int>(_s)]; }
static unsigned const c_oldProtocolVersion;
void foreachPeerPtr(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const;
void foreachPeer(std::function<bool(EthereumPeer*)> const& _f) const;
void onPeerStatus(EthereumPeer* _peer);
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes);
void onPeerBlocks(EthereumPeer* _peer, RLP const& _r);
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes);
void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r);
void onPeerTransactions(EthereumPeer* _peer, RLP const& _r);
void onPeerAborting(EthereumPeer* _peer);
void foreachPeer(std::function<bool(std::shared_ptr<EthereumPeer>)> const& _f) const;
void onPeerStatus(std::shared_ptr<EthereumPeer> _peer);
void onPeerHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes);
void onPeerBlocks(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
void onPeerNewHashes(std::shared_ptr<EthereumPeer> _peer, h256s const& _hashes);
void onPeerNewBlock(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
void onPeerTransactions(std::shared_ptr<EthereumPeer> _peer, RLP const& _r);
void onPeerAborting();
private:
static char const* const s_stateNames[static_cast<int>(SyncState::Size)];

64
libethereum/EthereumPeer.cpp

@ -49,7 +49,7 @@ string toString(Asking _a)
return "?";
}
EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap):
EthereumPeer::EthereumPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap):
Capability(_s, _h, _i),
m_sub(host()->downloadMan()),
m_peerCapabilityVersion(_cap.second)
@ -71,30 +71,39 @@ EthereumPeer::~EthereumPeer()
bool EthereumPeer::isRude() const
{
return repMan().isRude(*session(), name());
auto s = session();
if (s)
return repMan().isRude(*s, name());
return false;
}
unsigned EthereumPeer::askOverride() const
{
std::string static const badGeth = "Geth/v0.9.27";
if (session()->info().clientVersion.substr(0, badGeth.size()) == badGeth)
auto s = session();
if (!s)
return c_maxBlocksAsk;
if (s->info().clientVersion.substr(0, badGeth.size()) == badGeth)
return 1;
bytes const& d = repMan().data(*session(), name());
bytes const& d = repMan().data(*s, name());
return d.empty() ? c_maxBlocksAsk : RLP(d).toInt<unsigned>(RLP::LaisezFaire);
}
void EthereumPeer::setRude()
{
auto s = session();
if (!s)
return;
auto old = askOverride();
repMan().setData(*session(), name(), rlp(askOverride() / 2 + 1));
repMan().setData(*s, name(), rlp(askOverride() / 2 + 1));
cnote << "Rude behaviour; askOverride now" << askOverride() << ", was" << old;
repMan().noteRude(*session(), name());
repMan().noteRude(*s, name());
session()->addNote("manners", "RUDE");
}
void EthereumPeer::abortSync()
{
host()->onPeerAborting(this);
host()->onPeerAborting();
}
EthereumHost* EthereumPeer::host() const
@ -153,6 +162,21 @@ void EthereumPeer::requestHashes(h256 const& _lastHash)
sealAndSend(s);
}
void EthereumPeer::requestBlocks(h256s const& _blocks)
{
setAsking(Asking::Blocks);
if (_blocks.size())
{
RLPStream s;
prep(s, GetBlocksPacket, _blocks.size());
for (auto const& i: _blocks)
s << i;
sealAndSend(s);
}
else
setIdle();
}
void EthereumPeer::requestBlocks()
{
setAsking(Asking::Blocks);
@ -167,7 +191,6 @@ void EthereumPeer::requestBlocks()
}
else
setIdle();
return;
}
void EthereumPeer::setAsking(Asking _a)
@ -175,15 +198,20 @@ void EthereumPeer::setAsking(Asking _a)
m_asking = _a;
m_lastAsk = chrono::system_clock::now();
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
session()->addNote("sync", string(isCriticalSyncing() ? "ONGOING" : "holding") + (needsSyncing() ? " & needed" : ""));
auto s = session();
if (s)
{
s->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
s->addNote("sync", string(isCriticalSyncing() ? "ONGOING" : "holding") + (needsSyncing() ? " & needed" : ""));
}
}
void EthereumPeer::tick()
{
if (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing)
auto s = session();
if (s && (chrono::system_clock::now() - m_lastAsk > chrono::seconds(10) && m_asking != Asking::Nothing))
// timeout
session()->disconnect(PingTimeout);
s->disconnect(PingTimeout);
}
bool EthereumPeer::isConversing() const
@ -225,12 +253,12 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
clog(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << m_genesisHash << "/" << m_latestBlockNumber << ", TD:" << m_totalDifficulty << "=" << m_latestHash;
setAsking(Asking::Nothing);
host()->onPeerStatus(this);
host()->onPeerStatus(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())));
break;
}
case TransactionsPacket:
{
host()->onPeerTransactions(this, _r);
host()->onPeerTransactions(dynamic_pointer_cast<EthereumPeer>(dynamic_pointer_cast<EthereumPeer>(shared_from_this())), _r);
break;
}
case GetBlockHashesPacket:
@ -285,7 +313,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = _r[i].toHash<h256>();
host()->onPeerHashes(this, hashes);
host()->onPeerHashes(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), hashes);
break;
}
case GetBlocksPacket:
@ -327,12 +355,12 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
if (m_asking != Asking::Blocks)
clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them.";
else
host()->onPeerBlocks(this, _r);
host()->onPeerBlocks(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
break;
}
case NewBlockPacket:
{
host()->onPeerNewBlock(this, _r);
host()->onPeerNewBlock(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), _r);
break;
}
case NewBlockHashesPacket:
@ -344,7 +372,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = _r[i].toHash<h256>();
host()->onPeerNewHashes(this, hashes);
host()->onPeerNewHashes(dynamic_pointer_cast<EthereumPeer>(shared_from_this()), hashes);
break;
}
default:

5
libethereum/EthereumPeer.h

@ -56,7 +56,7 @@ class EthereumPeer: public p2p::Capability
public:
/// Basic constructor.
EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h, unsigned _i, p2p::CapDesc const& _cap);
EthereumPeer(std::shared_ptr<p2p::Session> _s, p2p::HostCapabilityFace* _h, unsigned _i, p2p::CapDesc const& _cap);
/// Basic destructor.
virtual ~EthereumPeer();
@ -85,6 +85,9 @@ public:
/// Request blocks. Uses block download manager.
void requestBlocks();
/// Request specified blocks from peer.
void requestBlocks(h256s const& _blocks);
/// Check if this node is rude.
bool isRude() const;

12
libp2p/Capability.cpp

@ -28,14 +28,14 @@ using namespace std;
using namespace dev;
using namespace dev::p2p;
Capability::Capability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset): m_session(_s), m_hostCap(_h), m_idOffset(_idOffset)
Capability::Capability(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _idOffset): m_session(_s), m_hostCap(_h), m_idOffset(_idOffset)
{
clog(NetConnect) << "New session for capability" << m_hostCap->name() << "; idOffset:" << m_idOffset;
}
void Capability::disable(std::string const& _problem)
{
clog(NetWarn) << "DISABLE: Disabling capability '" << m_hostCap->name() << "'. Reason:" << _problem;
clog(NetNote) << "DISABLE: Disabling capability '" << m_hostCap->name() << "'. Reason:" << _problem;
m_enabled = false;
}
@ -46,12 +46,16 @@ RLPStream& Capability::prep(RLPStream& _s, unsigned _id, unsigned _args)
void Capability::sealAndSend(RLPStream& _s)
{
m_session->sealAndSend(_s);
shared_ptr<Session> session = m_session.lock();
if (session)
session->sealAndSend(_s);
}
void Capability::addRating(int _r)
{
m_session->addRating(_r);
shared_ptr<Session> session = m_session.lock();
if (session)
session->addRating(_r);
}
ReputationManager& Capability::repMan() const

8
libp2p/Capability.h

@ -31,12 +31,12 @@ namespace p2p
class ReputationManager;
class Capability
class Capability: public std::enable_shared_from_this<Capability>
{
friend class Session;
public:
Capability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset);
Capability(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _idOffset);
virtual ~Capability() {}
// Implement these in the derived class.
@ -44,7 +44,7 @@ public:
static u256 version() { return 0; }
static unsigned messageCount() { return 0; }
*/
Session* session() const { return m_session; }
std::shared_ptr<Session> session() const { return m_session.lock(); }
HostCapabilityFace* hostCapability() const { return m_hostCap; }
Host* host() const { return m_hostCap->host(); }
ReputationManager& repMan() const;
@ -59,7 +59,7 @@ protected:
void addRating(int _r);
private:
Session* m_session;
std::weak_ptr<Session> m_session;
HostCapabilityFace* m_hostCap;
bool m_enabled = true;
unsigned m_idOffset;

2
libp2p/Host.cpp

@ -284,7 +284,7 @@ void Host::startPeerSession(Public const& _id, RLP const& _rlp, RLPXFrameCoder*
for (auto const& i: caps)
if (haveCapability(i))
{
ps->m_capabilities[i] = shared_ptr<Capability>(m_capabilities[i]->newPeerCapability(ps.get(), o, i));
ps->m_capabilities[i] = shared_ptr<Capability>(m_capabilities[i]->newPeerCapability(ps, o, i));
o += m_capabilities[i]->messageCount();
}
ps->start();

4
libp2p/HostCapability.h

@ -53,7 +53,7 @@ protected:
virtual u256 version() const = 0;
CapDesc capDesc() const { return std::make_pair(name(), version()); }
virtual unsigned messageCount() const = 0;
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset, CapDesc const& _cap) = 0;
virtual Capability* newPeerCapability(std::shared_ptr<Session> _s, unsigned _idOffset, CapDesc const& _cap) = 0;
virtual void onStarting() {}
virtual void onStopping() {}
@ -77,7 +77,7 @@ protected:
virtual std::string name() const { return PeerCap::name(); }
virtual u256 version() const { return PeerCap::version(); }
virtual unsigned messageCount() const { return PeerCap::messageCount(); }
virtual Capability* newPeerCapability(Session* _s, unsigned _idOffset, CapDesc const& _cap) { return new PeerCap(_s, this, _idOffset, _cap); }
virtual Capability* newPeerCapability(std::shared_ptr<Session> _s, unsigned _idOffset, CapDesc const& _cap) { return new PeerCap(_s, this, _idOffset, _cap); }
};
}

2
libwhisper/WhisperPeer.cpp

@ -29,7 +29,7 @@ using namespace dev;
using namespace dev::p2p;
using namespace dev::shh;
WhisperPeer::WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i)
WhisperPeer::WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const&): Capability(_s, _h, _i)
{
RLPStream s;
sealAndSend(prep(s, StatusPacket, 1) << version());

2
libwhisper/WhisperPeer.h

@ -51,7 +51,7 @@ class WhisperPeer: public Capability
friend class WhisperHost;
public:
WhisperPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap);
WhisperPeer(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _i, CapDesc const& _cap);
virtual ~WhisperPeer();
static std::string name() { return "shh"; }

2
test/libp2p/capability.cpp

@ -42,7 +42,7 @@ struct P2PFixture
class TestCapability: public Capability
{
public:
TestCapability(Session* _s, HostCapabilityFace* _h, unsigned _idOffset, CapDesc const&): Capability(_s, _h, _idOffset), m_cntReceivedMessages(0), m_testSum(0) {}
TestCapability(std::shared_ptr<Session> _s, HostCapabilityFace* _h, unsigned _idOffset, CapDesc const&): Capability(_s, _h, _idOffset), m_cntReceivedMessages(0), m_testSum(0) {}
virtual ~TestCapability() {}
int countReceivedMessages() { return m_cntReceivedMessages; }
int testSum() { return m_testSum; }

Loading…
Cancel
Save