Browse Source

Work on making states good and various docs.

cl-refactor
Gav Wood 10 years ago
parent
commit
f73b46d2c1
  1. 3
      libethereum/CommonNet.h
  2. 42
      libethereum/EthereumHost.cpp
  3. 6
      libethereum/EthereumHost.h
  4. 96
      libethereum/EthereumPeer.cpp
  5. 30
      libethereum/EthereumPeer.h

3
libethereum/CommonNet.h

@ -59,9 +59,10 @@ enum EthereumPacket
BlockHashesPacket,
GetBlocksPacket,
BlocksPacket,
NewBlockPacket,
};
enum class Grabbing
enum class Asking
{
State,
Hashes,

42
libethereum/EthereumHost.cpp

@ -77,28 +77,28 @@ void EthereumHost::notePeerStateChanged(EthereumPeer* _who)
// TODO: FIX: BUG: Better state management!
// if already downloading hash-chain, ignore.
if (m_grabbing != Grabbing::Nothing)
if (m_grabbing != Asking::Nothing)
{
for (auto const& i: peers())
if (i->cap<EthereumPeer>()->m_grabbing == m_grabbing || m_grabbing == Grabbing::Presync)
if (i->cap<EthereumPeer>()->m_grabbing == m_grabbing || m_grabbing == Asking::Presync)
{
clog(NetAllDetail) << "Already downloading chain. Just set to help out.";
_who->ensureGettingChain();
return;
}
m_grabbing = Grabbing::Nothing;
m_grabbing = Asking::Nothing;
}
// otherwise check to see if we should be downloading...
_who->tryGrabbingHashChain();
}
void EthereumHost::updateGrabbing(Grabbing _g)
void EthereumHost::updateGrabbing(Asking _g)
{
m_grabbing = _g;
if (_g == Grabbing::Nothing)
if (_g == Asking::Nothing)
readyForSync();
else if (_g == Grabbing::Chain)
else if (_g == Asking::Chain)
for (auto j: peers())
j->cap<EthereumPeer>()->ensureGettingChain();
}
@ -109,8 +109,8 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from)
if (_from->m_neededBlocks.empty())
{
_from->setGrabbing(Grabbing::Nothing);
updateGrabbing(Grabbing::Nothing);
_from->setGrabbing(Asking::Nothing);
updateGrabbing(Asking::Nothing);
return;
}
@ -119,8 +119,8 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from)
if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == _from->m_latestHash))
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
_from->setGrabbing(Grabbing::Nothing);
updateGrabbing(Grabbing::Nothing);
_from->setGrabbing(Asking::Nothing);
updateGrabbing(Asking::Nothing);
return;
}
@ -130,8 +130,8 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from)
m_man.resetToChain(_from->m_neededBlocks);
m_latestBlockSent = _from->m_latestHash;
_from->setGrabbing(Grabbing::Chain);
updateGrabbing(Grabbing::Chain);
_from->setGrabbing(Asking::Chain);
updateGrabbing(Asking::Chain);
}
void EthereumHost::readyForSync()
@ -140,9 +140,9 @@ void EthereumHost::readyForSync()
for (auto j: peers())
{
j->cap<EthereumPeer>()->tryGrabbingHashChain();
if (j->cap<EthereumPeer>()->m_grabbing == Grabbing::Hashes)
if (j->cap<EthereumPeer>()->m_grabbing == Asking::Hashes)
{
m_grabbing = Grabbing::Hashes;
m_grabbing = Asking::Hashes;
return;
}
}
@ -155,15 +155,15 @@ void EthereumHost::noteDoneBlocks(EthereumPeer* _who)
{
// Done our chain-get.
clog(NetNote) << "Chain download complete.";
updateGrabbing(Grabbing::Nothing);
updateGrabbing(Asking::Nothing);
m_man.reset();
}
if (_who->m_grabbing == Grabbing::Chain)
if (_who->m_grabbing == Asking::Chain)
{
// Done our chain-get.
clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished.";
// TODO: note that peer is BADBADBAD!
updateGrabbing(Grabbing::Nothing);
updateGrabbing(Asking::Nothing);
m_man.reset();
}
}
@ -192,7 +192,7 @@ void EthereumHost::doWork()
void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash)
{
bool resendAll = (m_grabbing == Grabbing::Nothing && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent);
bool resendAll = (m_grabbing == Asking::Nothing && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent);
{
lock_guard<recursive_mutex> l(m_incomingLock);
for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it)
@ -218,7 +218,7 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
}
ep->clearKnownTransactions();
if (n)
if (n || ep->m_requireTransactions)
{
RLPStream ts;
EthereumPeer::prep(ts);
@ -233,7 +233,7 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
void EthereumHost::reset()
{
m_grabbing = Grabbing::Nothing;
m_grabbing = Asking::Nothing;
m_man.resetToChain(h256s());
@ -247,7 +247,7 @@ void EthereumHost::reset()
void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
{
// If we've finished our initial sync send any new blocks.
if (m_grabbing == Grabbing::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty)
if (m_grabbing == Asking::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty)
{
// TODO: clean up
h256s hs;

6
libethereum/EthereumHost.h

@ -70,7 +70,7 @@ public:
void reset();
DownloadMan const& downloadMan() const { return m_man; }
bool isSyncing() const { return m_grabbing == Grabbing::Chain; }
bool isSyncing() const { return m_grabbing == Asking::Chain; }
private:
void noteHavePeerState(EthereumPeer* _who);
@ -103,7 +103,7 @@ private:
virtual void onStopping() { stopWorking(); }
void readyForSync();
void updateGrabbing(Grabbing _g);
void updateGrabbing(Asking _g);
BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
@ -111,7 +111,7 @@ private:
u256 m_networkId;
Grabbing m_grabbing = Grabbing::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id.
Asking m_grabbing = Asking::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id.
DownloadMan m_man;

96
libethereum/EthereumPeer.cpp

@ -44,7 +44,7 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h):
EthereumPeer::~EthereumPeer()
{
giveUpOnFetch();
finishSync();
}
EthereumHost* EthereumPeer::host() const
@ -119,7 +119,7 @@ void EthereumPeer::tryGrabbingHashChain()
}
}
void EthereumPeer::giveUpOnFetch()
void EthereumPeer::finishSync()
{
clogS(NetNote) << "Finishing fetch...";
@ -169,12 +169,12 @@ bool EthereumPeer::interpret(RLP const& _r)
{
clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)";
addRating(_r.itemCount() - 1);
RecursiveGuard l(m_incomingLock);
Guard l(x_knownTransactions);
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
m_incomingTransactions.push_back(_r[i].data().toBytes());
m_knownTransactions.insert(sha3(_r[i].data()));
if (!_tq.import(_r[i].data())) // if we already had the transaction, then don't bother sending it on.
host()->m_transactionsSent.insert(sha3(*it));
}
break;
}
@ -245,23 +245,59 @@ bool EthereumPeer::interpret(RLP const& _r)
sealAndSend(prep(s).appendList(n + 1).append(BlocksPacket).appendRaw(rlp, n));
break;
}
case NewBlockPacket:
{
auto h = BlockInfo::headerHash(bd(_r[1].data()));
clogS(NetMessageSummary) << "NewBlock: " << h.abridged();
if (_r.itemCount() != 3)
disable("NewBlock without 2 data fields.");
else
{
switch (host()->m_bq.import(_r[1].data(), host()->m_chain))
{
case ImportResult::Success:
case ImportResult::FutureTime:
addRating(1);
break;
case ImportResult::Malformed:
disable("Malformed block received.");
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::UnknownParent:
clogS(NetMessageSummary) << "Received block with no known parent. Resyncing...";
setNeedsSyncing(h, _r[2].toInt<u256>());
break;
}
Guard l(x_knownBlocks);
m_knownBlocks.insert(h);
}
break;
}
case BlocksPacket:
{
clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreBlocks");
if (m_asking != Asking::Blocks)
clogS(NetWarn) << "Unexpected Blocks received!";
if (_r.itemCount() == 1)
{
// Couldn't get any from last batch - probably got to this peer's latest block - just give up.
if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper)
giveUpOnFetch();
// Got to this peer's latest block - just give up.
if (m_asking == Asking::Blocks)
finishSync();
break;
}
unsigned success = 0;
unsigned got = 0;
unsigned bad = 0;
unsigned unknown = 0;
unsigned future = 0;
unsigned unknown = 0;
unsigned got = 0;
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
@ -276,12 +312,13 @@ bool EthereumPeer::interpret(RLP const& _r)
switch (host()->m_bq.import(_r[i].data(), host()->m_chain))
{
case ImportResult::Success:
addRating(1);
success++;
break;
case ImportResult::Malformed:
bad++;
break;
disable("Malformed block received.");
return true;
case ImportResult::FutureTime:
future++;
@ -298,17 +335,6 @@ bool EthereumPeer::interpret(RLP const& _r)
}
}
if (unknown && m_asking == Asking::Nothing)
{
// TODO: kick off resync.
}
if (bad)
{
// TODO: punish peer
}
addRating(used);
unsigned knownParents = 0;
unsigned unknownParents = 0;
if (g_logVerbosity >= NetMessageSummary::verbosity)
@ -331,9 +357,9 @@ bool EthereumPeer::interpret(RLP const& _r)
}
}
}
clogS(NetMessageSummary) << dec << knownParents << "known parents," << unknownParents << "unknown," << used << "used.";
if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper)
continueGettingChain();
clogS(NetMessageSummary) << dec << success << "known parents," << unknownParents << "unknown," << used << "used.";
if (m_asking == Asking::Blocks)
continueSync();
break;
}
default:
@ -342,17 +368,15 @@ bool EthereumPeer::interpret(RLP const& _r)
return true;
}
void EthereumPeer::ensureGettingChain()
void EthereumPeer::ensureAskingBlocks()
{
if (m_helping)
if (m_asking != Asking::Nothing)
return; // Already asked & waiting for some.
// Help otherwise, unless we're already the Chain grabber.
setHelping(true);
continueGettingChain();
continueSync();
}
void EthereumPeer::continueGettingChain()
void EthereumPeer::continueSync()
{
// If we're getting the hashes already, then we shouldn't be asking for the chain.
if (m_asking == Asking::Hashes)
@ -370,7 +394,7 @@ void EthereumPeer::continueGettingChain()
sealAndSend(s);
}
else
giveUpOnFetch();
finishSync();
}
/*
@ -387,10 +411,10 @@ void EthereumPeer::continueGettingChain()
* presync nothing
*/
void EthereumPeer::setAsking(Asking _a, Syncing _s)
void EthereumPeer::setAsking(Asking _a, bool _isSyncing)
{
m_asking = _a;
m_syncing = _s;
m_isSyncing = _isSyncing;
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
session()->addNote("sync", _s == Syncing::Done ? "done" : _s == Syncing::Waiting ? "wait" : _s == Syncing::Executing ? "exec" : "?");
session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? "needed" : "ok"));
}

30
libethereum/EthereumPeer.h

@ -64,26 +64,43 @@ private:
void tryGrabbingHashChain();
/// Ensure that we are waiting for a bunch of blocks from our peer.
void ensureGettingChain();
void ensureAskingBlocks();
/// Ensure that we are waiting for a bunch of blocks from our peer.
void continueGettingChain();
void continueSync();
void giveUpOnFetch();
void finishSync();
void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); }
void setAsking(Asking _g, bool _helping = false);
void setHelping(bool _helping = false) { setAsking(m_asking, _helping); }
void setAsking(Asking _g, bool _isSyncing);
void setNeedsSyncing(h256 _latestHash, u256 _td) { m_latestHash = _latestHash; m_totalDifficulty = _td; }
bool needsSyncing() const { return !!m_latestHash; }
bool isSyncing() const { return m_isSyncing; }
/// Peer's protocol version.
unsigned m_protocolVersion;
/// Peer's network id.
u256 m_networkId;
/// What, if anything, we last asked the other peer for.
Asking m_asking;
Syncing m_syncing;
/// Whether this peer is in the process of syncing or not. Only one peer can be syncing at once.
bool m_isSyncing = false;
/// These are determined through either a Status message or from NewBlock.
h256 m_latestHash; ///< Peer's latest block's hash.
u256 m_totalDifficulty; ///< Peer's latest block's total difficulty.
/// Once a sync is started on this peer, they are cleared.
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.
h256s m_neededBlocks; ///< The blocks that we should download from this peer.
/// Once we're asking for blocks, this becomes in use.
DownloadSub m_sub;
/// Have we received a GetTransactions packet that we haven't yet answered?
bool m_requireTransactions;
Mutex x_knownBlocks;
@ -91,7 +108,6 @@ private:
std::set<h256> m_knownTransactions;
std::mutex x_knownTransactions;
DownloadSub m_sub;
};
}

Loading…
Cancel
Save