Browse Source

More work on the peer state transition system

cl-refactor
Gav Wood 11 years ago
parent
commit
6df207655c
  1. 39
      libethereum/EthereumHost.cpp
  2. 6
      libethereum/EthereumHost.h
  3. 325
      libethereum/EthereumPeer.cpp
  4. 18
      libethereum/EthereumPeer.h

39
libethereum/EthereumHost.cpp

@ -90,48 +90,43 @@ void EthereumHost::notePeerStateChanged(EthereumPeer* _who)
} }
// otherwise check to see if we should be downloading... // otherwise check to see if we should be downloading...
_who->tryGrabbingHashChain(); _who->attemptSyncing();
} }
void EthereumHost::updateGrabbing(Asking _g) void EthereumHost::updateGrabbing(Asking _g, EthereumPeer* _ignore)
{ {
m_grabbing = _g; m_grabbing = _g;
if (_g == Asking::Nothing) if (_g == Asking::Nothing)
readyForSync(); readyForSync();
else if (_g == Asking::Chain) else if (_g == Asking::Blocks)
for (auto j: peers()) for (auto j: peers())
j->cap<EthereumPeer>()->ensureGettingChain(); if (j->cap<EthereumPeer>().get() != _ignore && j->cap<EthereumPeer>()->m_asking == Asking::Nothing)
j->cap<EthereumPeer>()->transition(Asking::Blocks);
} }
void EthereumHost::noteHaveChain(EthereumPeer* _from) bool EthereumHost::shouldGrabBlocks(EthereumPeer* _from)
{ {
auto td = _from->m_totalDifficulty; auto td = _from->m_syncingTotalDifficulty;
auto lh = _from->m_syncingLatestHash;
if (_from->m_neededBlocks.empty()) if (_from->m_syncingNeededBlocks.empty())
{ {
_from->setGrabbing(Asking::Nothing);
updateGrabbing(Asking::Nothing); updateGrabbing(Asking::Nothing);
return; return false;
} }
clog(NetNote) << "Hash-chain COMPLETE:" << _from->m_totalDifficulty << "vs" << m_chain.details().totalDifficulty << ";" << _from->m_neededBlocks.size() << " blocks, ends" << _from->m_neededBlocks.back().abridged(); clog(NetNote) << "Hash-chain COMPLETE:" << td << "vs" << m_chain.details().totalDifficulty << ";" << _from->m_syncingNeededBlocks.size() << " blocks, ends" << _from->m_syncingNeededBlocks.back().abridged();
if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == _from->m_latestHash)) if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == lh))
{ {
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring."; clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
_from->setGrabbing(Asking::Nothing);
updateGrabbing(Asking::Nothing); updateGrabbing(Asking::Nothing);
return; return false;
} }
clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << _from->m_latestHash.abridged() << ", was" << m_latestBlockSent.abridged() << "]"; clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << lh.abridged() << ", was" << m_latestBlockSent.abridged() << "]";
// Looks like it's the best yet for total difficulty. Set to download.
m_man.resetToChain(_from->m_neededBlocks);
m_latestBlockSent = _from->m_latestHash;
_from->setGrabbing(Asking::Chain); return true;
updateGrabbing(Asking::Chain);
} }
void EthereumHost::readyForSync() void EthereumHost::readyForSync()
@ -139,7 +134,7 @@ void EthereumHost::readyForSync()
// start grabbing next hash chain if there is one. // start grabbing next hash chain if there is one.
for (auto j: peers()) for (auto j: peers())
{ {
j->cap<EthereumPeer>()->tryGrabbingHashChain(); j->cap<EthereumPeer>()->attemptSyncing();
if (j->cap<EthereumPeer>()->m_grabbing == Asking::Hashes) if (j->cap<EthereumPeer>()->m_grabbing == Asking::Hashes)
{ {
m_grabbing = Asking::Hashes; m_grabbing = Asking::Hashes;
@ -158,7 +153,7 @@ void EthereumHost::noteDoneBlocks(EthereumPeer* _who)
updateGrabbing(Asking::Nothing); updateGrabbing(Asking::Nothing);
m_man.reset(); m_man.reset();
} }
if (_who->m_grabbing == Asking::Chain) if (_who->isSyncing())
{ {
// Done our chain-get. // Done our chain-get.
clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished."; clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished.";

6
libethereum/EthereumHost.h

@ -77,8 +77,10 @@ private:
/// Session wants to pass us a block that we might not have. /// Session wants to pass us a block that we might not have.
/// @returns true if we didn't have it. /// @returns true if we didn't have it.
bool noteBlock(h256 _hash, bytesConstRef _data); bool noteBlock(h256 _hash, bytesConstRef _data);
/// Session has finished getting the chain of hashes. /// Session has finished getting the chain of hashes.
void noteHaveChain(EthereumPeer* _who); bool shouldGrabBlocks(EthereumPeer* _who);
/// Called when the peer can no longer provide us with any needed blocks. /// Called when the peer can no longer provide us with any needed blocks.
void noteDoneBlocks(EthereumPeer* _who); void noteDoneBlocks(EthereumPeer* _who);
@ -103,7 +105,7 @@ private:
virtual void onStopping() { stopWorking(); } virtual void onStopping() { stopWorking(); }
void readyForSync(); void readyForSync();
void updateGrabbing(Asking _g); void updateGrabbing(Asking _g, EthereumPeer* _ignore);
BlockChain const& m_chain; BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.

325
libethereum/EthereumPeer.cpp

@ -38,7 +38,7 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h):
Capability(_s, _h), Capability(_s, _h),
m_sub(host()->m_man) m_sub(host()->m_man)
{ {
setAsking(Asking::State, Syncing::Done); setAsking(Asking::State, false);
sendStatus(); sendStatus();
} }
@ -65,20 +65,133 @@ void EthereumPeer::sendStatus()
sealAndSend(s); sealAndSend(s);
} }
void EthereumPeer::startInitialSync() /*
{ * Possible asking/syncing states for two peers:
// Grab transactions off them. */
void EthereumPeer::transition(Asking _a)
{ {
RLPStream s; RLPStream s;
prep(s).appendList(1); prep(s);
s << GetTransactionsPacket; if (_a == Asking::Hashes)
{
if (m_asking == Asking::State || m_asking == Asking::Nothing)
{
if (isSyncing())
clogS(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
m_syncingLatestHash = m_latestHash;
m_syncingTotalDifficulty = m_totalDifficulty;
m_latestHash = h256();
setAsking(_a, true);
s.appendList(3) << GetBlockHashesPacket << m_syncingLatestHash << c_maxHashesAsk;
m_syncingNeededBlocks = h256s(1, m_syncingLatestHash);
host()->updateGrabbing(Asking::Hashes);
sealAndSend(s); sealAndSend(s);
return;
}
else if (m_asking == Asking::Hashes)
{
if (!isSyncing())
clogS(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setAsking(_a, true);
s.appendList(3) << GetBlockHashesPacket << m_syncingNeededBlocks.back() << c_maxHashesAsk;
sealAndSend(s);
return;
}
}
else if (_a == Asking::Blocks)
{
if (m_asking == Asking::Hashes)
{
if (host()->shouldGrabBlocks(this))
{
host()->m_man.resetToChain(m_syncingNeededBlocks);
host()->m_latestBlockSent = m_syncingLatestHash;
host()->updateGrabbing(Asking::Blocks, this);
}
else
{
setAsking(Asking::Nothing, false);
return;
}
}
// run through into...
if (m_asking == Asking::Nothing || m_asking == Asking::Hashes || m_asking == Asking::Blocks)
{
// Looks like it's the best yet for total difficulty. Set to download.
setAsking(Asking::Blocks, true);
auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
if (blocks.size())
{
s.appendList(blocks.size() + 1) << GetBlocksPacket;
for (auto const& i: blocks)
s << i;
sealAndSend(s);
}
else
transition(Asking::Nothing);
return;
}
}
else if (_a == Asking::Nothing)
{
if (m_asking == Asking::Blocks)
{
clogS(NetNote) << "Finishing block fetch...";
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if (isSyncing())
host()->noteDoneBlocks(this);
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
m_sub.doneFetch();
setAsking(Asking::Nothing, false);
}
else if (m_asking == Asking::Hashes)
{
clogS(NetNote) << "Finishing hashes fetch...";
if (isSyncing())
host()->noteDoneBlocks(this);
setAsking(Asking::Nothing, false);
}
else if (m_asking == Asking::State)
{
setAsking(Asking::Nothing, false);
// TODO: Just got the state - should check to see if we can be of help downloading the chain if any.
// TODO: Otherwise, should put ourselves up for sync.
}
// Otherwise it's fine. We don't care if it's Nothing->Nothing.
return;
}
clogS(NetWarn) << "Invalid state transition:" << (int)_a << "from" << (int)m_asking << "/" << boolalpha << isSyncing() << needsSyncing();
} }
void EthereumPeer::setAsking(Asking _a, bool _isSyncing)
{
m_asking = _a;
m_isSyncing = _isSyncing;
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : ""));
}
void EthereumPeer::setNeedsSyncing(h256 _latestHash, u256 _td)
{
m_latestHash = _latestHash;
m_totalDifficulty = _td;
// TODO: should be "noteNeedsSyncing" or some such.
host()->notePeerStateChanged(this); host()->notePeerStateChanged(this);
} }
void EthereumPeer::tryGrabbingHashChain() void EthereumPeer::attemptSyncing()
{ {
if (m_asking != Asking::Nothing) if (m_asking != Asking::Nothing)
{ {
@ -87,7 +200,7 @@ void EthereumPeer::tryGrabbingHashChain()
} }
// if already done this, then ignore. // if already done this, then ignore.
if (m_syncing == Syncing::Done) if (!needsSyncing())
{ {
clogS(NetAllDetail) << "Already synced with this peer."; clogS(NetAllDetail) << "Already synced with this peer.";
return; return;
@ -101,39 +214,15 @@ void EthereumPeer::tryGrabbingHashChain()
if (td >= m_totalDifficulty) if (td >= m_totalDifficulty)
{ {
clogS(NetAllDetail) << "No. Our chain is better."; clogS(NetAllDetail) << "No. Our chain is better.";
setAsking(Asking::Nothing, Syncing::Done); transition(Asking::Nothing);
return; // All good - we have the better chain.
} }
else
// Our chain isn't better - grab theirs.
{ {
clogS(NetAllDetail) << "Yes. Their chain is better."; clogS(NetAllDetail) << "Yes. Their chain is better.";
transition(Asking::Hashes);
host()->updateGrabbing(Asking::Hashes);
setAsking(Asking::Hashes, Syncing::Executing);
RLPStream s;
prep(s).appendList(3);
s << GetBlockHashesPacket << m_latestHash << c_maxHashesAsk;
m_neededBlocks = h256s(1, m_latestHash);
sealAndSend(s);
} }
} }
void EthereumPeer::finishSync()
{
clogS(NetNote) << "Finishing fetch...";
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper)
{
host()->noteDoneBlocks(this);
setAsking(Asking::Nothing);
}
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
m_sub.doneFetch();
}
bool EthereumPeer::interpret(RLP const& _r) bool EthereumPeer::interpret(RLP const& _r)
{ {
switch (_r[0].toInt<unsigned>()) switch (_r[0].toInt<unsigned>())
@ -142,8 +231,8 @@ bool EthereumPeer::interpret(RLP const& _r)
{ {
m_protocolVersion = _r[1].toInt<unsigned>(); m_protocolVersion = _r[1].toInt<unsigned>();
m_networkId = _r[2].toInt<u256>(); m_networkId = _r[2].toInt<u256>();
m_totalDifficulty = _r[3].toInt<u256>(); auto totalDifficulty = _r[3].toInt<u256>();
m_latestHash = _r[4].toHash<h256>(); auto latestHash = _r[4].toHash<h256>();
auto genesisHash = _r[5].toHash<h256>(); auto genesisHash = _r[5].toHash<h256>();
clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged(); clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged();
@ -157,7 +246,15 @@ bool EthereumPeer::interpret(RLP const& _r)
else if (session()->info().clientVersion.find("/v0.6.9/") != string::npos) else if (session()->info().clientVersion.find("/v0.6.9/") != string::npos)
disable("Blacklisted client version."); disable("Blacklisted client version.");
else else
startInitialSync(); {
// Grab transactions off them.
RLPStream s;
prep(s).appendList(1);
s << GetTransactionsPacket;
sealAndSend(s);
setNeedsSyncing(latestHash, totalDifficulty);
}
break; break;
} }
case GetTransactionsPacket: case GetTransactionsPacket:
@ -205,7 +302,7 @@ bool EthereumPeer::interpret(RLP const& _r)
} }
if (_r.itemCount() == 1) if (_r.itemCount() == 1)
{ {
host()->noteHaveChain(this); transition(Asking::Blocks);
return true; return true;
} }
for (unsigned i = 1; i < _r.itemCount(); ++i) for (unsigned i = 1; i < _r.itemCount(); ++i)
@ -213,17 +310,14 @@ bool EthereumPeer::interpret(RLP const& _r)
auto h = _r[i].toHash<h256>(); auto h = _r[i].toHash<h256>();
if (host()->m_chain.isKnown(h)) if (host()->m_chain.isKnown(h))
{ {
host()->noteHaveChain(this); transition(Asking::Blocks);
return true; return true;
} }
else else
m_neededBlocks.push_back(h); m_syncingNeededBlocks.push_back(h);
} }
// run through - ask for more. // run through - ask for more.
RLPStream s; transition(Asking::Hashes);
prep(s).appendList(3);
s << GetBlockHashesPacket << m_neededBlocks.back() << c_maxHashesAsk;
sealAndSend(s);
break; break;
} }
case GetBlocksPacket: case GetBlocksPacket:
@ -245,40 +339,6 @@ bool EthereumPeer::interpret(RLP const& _r)
sealAndSend(prep(s).appendList(n + 1).append(BlocksPacket).appendRaw(rlp, n)); sealAndSend(prep(s).appendList(n + 1).append(BlocksPacket).appendRaw(rlp, n));
break; break;
} }
case NewBlockPacket:
{
auto h = BlockInfo::headerHash(bd(_r[1].data()));
clogS(NetMessageSummary) << "NewBlock: " << h.abridged();
if (_r.itemCount() != 3)
disable("NewBlock without 2 data fields.");
else
{
switch (host()->m_bq.import(_r[1].data(), host()->m_chain))
{
case ImportResult::Success:
case ImportResult::FutureTime:
addRating(1);
break;
case ImportResult::Malformed:
disable("Malformed block received.");
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::UnknownParent:
clogS(NetMessageSummary) << "Received block with no known parent. Resyncing...";
setNeedsSyncing(h, _r[2].toInt<u256>());
break;
}
Guard l(x_knownBlocks);
m_knownBlocks.insert(h);
}
break;
}
case BlocksPacket: case BlocksPacket:
{ {
clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreBlocks"); clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreBlocks");
@ -289,8 +349,7 @@ bool EthereumPeer::interpret(RLP const& _r)
if (_r.itemCount() == 1) if (_r.itemCount() == 1)
{ {
// Got to this peer's latest block - just give up. // Got to this peer's latest block - just give up.
if (m_asking == Asking::Blocks) transition(Asking::Nothing);
finishSync();
break; break;
} }
@ -335,31 +394,44 @@ bool EthereumPeer::interpret(RLP const& _r)
} }
} }
unsigned knownParents = 0; clogS(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known.";
unsigned unknownParents = 0;
if (g_logVerbosity >= NetMessageSummary::verbosity) if (m_asking == Asking::Blocks)
{ transition(Asking::Blocks);
unsigned ic = _r.itemCount(); break;
for (unsigned i = 1; i < ic; ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
BlockInfo bi(_r[i].data());
Guard l(x_knownBlocks);
if (!host()->m_chain.details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
{
unknownParents++;
clogS(NetAllDetail) << "Unknown parent" << bi.parentHash.abridged() << "of block" << h.abridged();
} }
case NewBlockPacket:
{
auto h = BlockInfo::headerHash(bd(_r[1].data()));
clogS(NetMessageSummary) << "NewBlock: " << h.abridged();
if (_r.itemCount() != 3)
disable("NewBlock without 2 data fields.");
else else
{ {
knownParents++; switch (host()->m_bq.import(_r[1].data(), host()->m_chain))
clogS(NetAllDetail) << "Known parent" << bi.parentHash.abridged() << "of block" << h.abridged(); {
} case ImportResult::Success:
case ImportResult::FutureTime:
addRating(1);
break;
case ImportResult::Malformed:
disable("Malformed block received.");
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::UnknownParent:
clogS(NetMessageSummary) << "Received block with no known parent. Resyncing...";
setNeedsSyncing(h, _r[2].toInt<u256>());
break;
} }
Guard l(x_knownBlocks);
m_knownBlocks.insert(h);
} }
clogS(NetMessageSummary) << dec << success << "known parents," << unknownParents << "unknown," << used << "used.";
if (m_asking == Asking::Blocks)
continueSync();
break; break;
} }
default: default:
@ -367,54 +439,3 @@ bool EthereumPeer::interpret(RLP const& _r)
} }
return true; return true;
} }
void EthereumPeer::ensureAskingBlocks()
{
if (m_asking != Asking::Nothing)
return; // Already asked & waiting for some.
continueSync();
}
void EthereumPeer::continueSync()
{
// If we're getting the hashes already, then we shouldn't be asking for the chain.
if (m_asking == Asking::Hashes)
return;
auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
if (blocks.size())
{
RLPStream s;
prep(s);
s.appendList(blocks.size() + 1) << GetBlocksPacket;
for (auto const& i: blocks)
s << i;
sealAndSend(s);
}
else
finishSync();
}
/*
* Possible asking/syncing states for two peers:
* state/ presync
* presync hashes
* presync chain (transiently)
* presync+ chain
* presync nothing
* hashes nothing
* chain hashes
* presync chain (transiently)
* presync+ chain
* presync nothing
*/
void EthereumPeer::setAsking(Asking _a, bool _isSyncing)
{
m_asking = _a;
m_isSyncing = _isSyncing;
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? "needed" : "ok"));
}

18
libethereum/EthereumPeer.h

@ -42,6 +42,7 @@ namespace eth
/** /**
* @brief The EthereumPeer class * @brief The EthereumPeer class
* @todo Document fully. * @todo Document fully.
* @todo make state transitions thread-safe.
*/ */
class EthereumPeer: public p2p::Capability class EthereumPeer: public p2p::Capability
{ {
@ -59,21 +60,20 @@ private:
virtual bool interpret(RLP const& _r); virtual bool interpret(RLP const& _r);
void sendStatus(); void sendStatus();
void startInitialSync();
void tryGrabbingHashChain(); void transition(Asking _wantState);
void attemptSyncing();
/// Ensure that we are waiting for a bunch of blocks from our peer. /// Ensure that we are waiting for a bunch of blocks from our peer.
void ensureAskingBlocks(); void ensureAskingBlocks();
/// Ensure that we are waiting for a bunch of blocks from our peer.
void continueSync();
void finishSync(); void finishSync();
void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); } void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); }
void setAsking(Asking _g, bool _isSyncing); void setAsking(Asking _g, bool _isSyncing);
void setNeedsSyncing(h256 _latestHash, u256 _td) { m_latestHash = _latestHash; m_totalDifficulty = _td; } void setNeedsSyncing(h256 _latestHash, u256 _td);
bool needsSyncing() const { return !!m_latestHash; } bool needsSyncing() const { return !!m_latestHash; }
bool isSyncing() const { return m_isSyncing; } bool isSyncing() const { return m_isSyncing; }
@ -89,13 +89,15 @@ private:
bool m_isSyncing = false; bool m_isSyncing = false;
/// These are determined through either a Status message or from NewBlock. /// These are determined through either a Status message or from NewBlock.
h256 m_latestHash; ///< Peer's latest block's hash. h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync.
u256 m_totalDifficulty; ///< Peer's latest block's total difficulty. u256 m_totalDifficulty; ///< Peer's latest block's total difficulty.
/// Once a sync is started on this peer, they are cleared. /// Once a sync is started on this peer, they are cleared and moved into m_syncing*.
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the /// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks. /// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.
h256s m_neededBlocks; ///< The blocks that we should download from this peer. h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
/// Once we're asking for blocks, this becomes in use. /// Once we're asking for blocks, this becomes in use.
DownloadSub m_sub; DownloadSub m_sub;

Loading…
Cancel
Save