Browse Source

Compilable etheruem network rewrite.

cl-refactor
Gav Wood 11 years ago
parent
commit
611caef125
  1. 140
      libethereum/EthereumHost.cpp
  2. 22
      libethereum/EthereumHost.h
  3. 126
      libethereum/EthereumPeer.cpp
  4. 46
      libethereum/EthereumPeer.h

140
libethereum/EthereumHost.cpp

@ -52,7 +52,7 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu
EthereumHost::~EthereumHost()
{
for (auto const& i: peers())
i->cap<EthereumPeer>()->giveUpOnFetch();
i->cap<EthereumPeer>()->abortSync();
}
bool EthereumHost::ensureInitialised(TransactionQueue& _tq)
@ -70,108 +70,76 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq)
return false;
}
void EthereumHost::notePeerStateChanged(EthereumPeer* _who)
void EthereumHost::noteNeedsSyncing(EthereumPeer* _who)
{
clog(NetAllDetail) << "Peer state changed.";
// TODO: FIX: BUG: Better state management!
// if already downloading hash-chain, ignore.
if (m_grabbing != Asking::Nothing)
if (isSyncing())
{
for (auto const& i: peers())
if (i->cap<EthereumPeer>()->m_grabbing == m_grabbing || m_grabbing == Asking::Presync)
{
clog(NetAllDetail) << "Already downloading chain. Just set to help out.";
_who->ensureGettingChain();
return;
}
m_grabbing = Asking::Nothing;
clog(NetAllDetail) << "Sync in progress: Just set to help out.";
if (m_syncer->m_asking == Asking::Blocks)
_who->transition(Asking::Blocks);
}
// otherwise check to see if we should be downloading...
_who->attemptSyncing();
else
// otherwise check to see if we should be downloading...
_who->attemptSync();
}
void EthereumHost::updateGrabbing(Asking _g, EthereumPeer* _ignore)
void EthereumHost::updateSyncer(EthereumPeer* _syncer)
{
m_grabbing = _g;
if (_g == Asking::Nothing)
readyForSync();
else if (_g == Asking::Blocks)
if (_syncer)
{
for (auto j: peers())
if (j->cap<EthereumPeer>().get() != _ignore && j->cap<EthereumPeer>()->m_asking == Asking::Nothing)
if (j->cap<EthereumPeer>().get() != _syncer && j->cap<EthereumPeer>()->m_asking == Asking::Nothing)
j->cap<EthereumPeer>()->transition(Asking::Blocks);
}
bool EthereumHost::shouldGrabBlocks(EthereumPeer* _from)
{
auto td = _from->m_syncingTotalDifficulty;
auto lh = _from->m_syncingLatestHash;
if (_from->m_syncingNeededBlocks.empty())
{
updateGrabbing(Asking::Nothing);
return false;
}
clog(NetNote) << "Hash-chain COMPLETE:" << td << "vs" << m_chain.details().totalDifficulty << ";" << _from->m_syncingNeededBlocks.size() << " blocks, ends" << _from->m_syncingNeededBlocks.back().abridged();
if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == lh))
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
updateGrabbing(Asking::Nothing);
return false;
}
clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << lh.abridged() << ", was" << m_latestBlockSent.abridged() << "]";
return true;
}
void EthereumHost::readyForSync()
{
// start grabbing next hash chain if there is one.
for (auto j: peers())
else
{
j->cap<EthereumPeer>()->attemptSyncing();
if (j->cap<EthereumPeer>()->m_grabbing == Asking::Hashes)
// start grabbing next hash chain if there is one.
for (auto j: peers())
{
m_grabbing = Asking::Hashes;
return;
j->cap<EthereumPeer>()->attemptSync();
if (isSyncing())
return;
}
clog(NetNote) << "No more peers to sync with.";
}
clog(NetNote) << "No more peers to sync with.";
}
void EthereumHost::noteDoneBlocks(EthereumPeer* _who)
void EthereumHost::noteDoneBlocks(EthereumPeer* _who, bool _clemency)
{
if (m_man.isComplete())
{
// Done our chain-get.
clog(NetNote) << "Chain download complete.";
updateGrabbing(Asking::Nothing);
// 1/100th for each useful block hash.
_who->addRating(m_man.chain().size() / 100);
m_man.reset();
}
if (_who->isSyncing())
{
// Done our chain-get.
clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished.";
// TODO: note that peer is BADBADBAD!
updateGrabbing(Asking::Nothing);
if (_clemency)
clog(NetNote) << "Chain download failed. Aborted while incomplete.";
else
{
// Done our chain-get.
clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished.";
m_banned.insert(_who->session()->id()); // We know who you are!
_who->disable("Peer sent hashes but was unable to provide the blocks.");
}
m_man.reset();
}
}
bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data)
void EthereumHost::reset()
{
if (!m_chain.details(_hash))
{
lock_guard<recursive_mutex> l(m_incomingLock);
m_incomingBlocks.push_back(_data.toBytes());
return true;
}
return false;
if (m_syncer)
m_syncer->abortSync();
m_man.resetToChain(h256s());
m_latestBlockSent = h256();
m_transactionsSent.clear();
}
void EthereumHost::doWork()
@ -187,16 +155,7 @@ void EthereumHost::doWork()
void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash)
{
bool resendAll = (m_grabbing == Asking::Nothing && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent);
{
lock_guard<recursive_mutex> l(m_incomingLock);
for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it)
if (_tq.import(&*it))
{}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce...
else
m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on.
m_incomingTransactions.clear();
}
bool resendAll = (!isSyncing() && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent);
// Send any new transactions.
for (auto const& p: peers())
@ -226,23 +185,10 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
}
}
void EthereumHost::reset()
{
m_grabbing = Asking::Nothing;
m_man.resetToChain(h256s());
m_incomingTransactions.clear();
m_incomingBlocks.clear();
m_latestBlockSent = h256();
m_transactionsSent.clear();
}
void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
{
// If we've finished our initial sync send any new blocks.
if (m_grabbing == Asking::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty)
if (!isSyncing() && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty)
{
// TODO: clean up
h256s hs;

22
libethereum/EthereumHost.h

@ -70,19 +70,16 @@ public:
void reset();
DownloadMan const& downloadMan() const { return m_man; }
bool isSyncing() const { return m_grabbing == Asking::Chain; }
bool isSyncing() const { return !!m_syncer; }
private:
void noteHavePeerState(EthereumPeer* _who);
/// Session wants to pass us a block that we might not have.
/// @returns true if we didn't have it.
bool noteBlock(h256 _hash, bytesConstRef _data);
bool isBanned(h512 _id) const { return m_banned.count(_id); }
/// Session has finished getting the chain of hashes.
bool shouldGrabBlocks(EthereumPeer* _who);
private:
/// Session is tell us that we may need (re-)syncing with the peer.
void noteNeedsSyncing(EthereumPeer* _who);
/// Called when the peer can no longer provide us with any needed blocks.
void noteDoneBlocks(EthereumPeer* _who);
void noteDoneBlocks(EthereumPeer* _who, bool _clemency);
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void doWork();
@ -104,8 +101,7 @@ private:
virtual void onStarting() { startWorking(); }
virtual void onStopping() { stopWorking(); }
void readyForSync();
void updateGrabbing(Asking _g, EthereumPeer* _ignore);
void updateSyncer(EthereumPeer* _ignore);
BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
@ -113,12 +109,14 @@ private:
u256 m_networkId;
Asking m_grabbing = Asking::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id.
EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr
DownloadMan m_man;
h256 m_latestBlockSent;
h256Set m_transactionsSent;
std::set<h512> m_banned;
};
}

126
libethereum/EthereumPeer.cpp

@ -27,6 +27,8 @@
#include <libp2p/Session.h>
#include "BlockChain.h"
#include "EthereumHost.h"
#include "TransactionQueue.h"
#include "BlockQueue.h"
using namespace std;
using namespace dev;
using namespace dev::eth;
@ -38,13 +40,18 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h):
Capability(_s, _h),
m_sub(host()->m_man)
{
setAsking(Asking::State, false);
sendStatus();
transition(Asking::State);
}
EthereumPeer::~EthereumPeer()
{
finishSync();
abortSync();
}
void EthereumPeer::abortSync()
{
if (isSyncing())
transition(Asking::Nothing, true);
}
EthereumHost* EthereumPeer::host() const
@ -54,26 +61,46 @@ EthereumHost* EthereumPeer::host() const
void EthereumPeer::sendStatus()
{
RLPStream s;
prep(s);
s.appendList(6) << StatusPacket
<< host()->protocolVersion()
<< host()->networkId()
<< host()->m_chain.details().totalDifficulty
<< host()->m_chain.currentHash()
<< host()->m_chain.genesisHash();
sealAndSend(s);
}
/*
* Possible asking/syncing states for two peers:
*/
void EthereumPeer::transition(Asking _a)
string toString(Asking _a)
{
switch (_a)
{
case Asking::Blocks: return "Blocks";
case Asking::Hashes: return "Hashes";
case Asking::Nothing: return "Nothing";
case Asking::State: return "State";
}
return "?";
}
void EthereumPeer::transition(Asking _a, bool _force)
{
clogS(NetMessageSummary) << "Transition!" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : "");
RLPStream s;
prep(s);
if (_a == Asking::Hashes)
if (_a == Asking::State)
{
if (m_asking == Asking::Nothing)
{
setAsking(Asking::State, false);
s.appendList(6) << StatusPacket
<< host()->protocolVersion()
<< host()->networkId()
<< host()->m_chain.details().totalDifficulty
<< host()->m_chain.currentHash()
<< host()->m_chain.genesisHash();
sealAndSend(s);
return;
}
}
else if (_a == Asking::Hashes)
{
if (m_asking == Asking::State || m_asking == Asking::Nothing)
{
@ -87,7 +114,6 @@ void EthereumPeer::transition(Asking _a)
setAsking(_a, true);
s.appendList(3) << GetBlockHashesPacket << m_syncingLatestHash << c_maxHashesAsk;
m_syncingNeededBlocks = h256s(1, m_syncingLatestHash);
host()->updateGrabbing(Asking::Hashes);
sealAndSend(s);
return;
}
@ -106,15 +132,17 @@ void EthereumPeer::transition(Asking _a)
{
if (m_asking == Asking::Hashes)
{
if (host()->shouldGrabBlocks(this))
if (shouldGrabBlocks())
{
clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash.abridged() << ", was" << host()->m_latestBlockSent.abridged() << "]";
host()->m_man.resetToChain(m_syncingNeededBlocks);
host()->m_latestBlockSent = m_syncingLatestHash;
host()->updateGrabbing(Asking::Blocks, this);
}
else
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
setAsking(Asking::Nothing, false);
return;
}
@ -145,7 +173,7 @@ void EthereumPeer::transition(Asking _a)
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if (isSyncing())
host()->noteDoneBlocks(this);
host()->noteDoneBlocks(this, _force);
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
m_sub.doneFetch();
@ -156,28 +184,28 @@ void EthereumPeer::transition(Asking _a)
{
clogS(NetNote) << "Finishing hashes fetch...";
if (isSyncing())
host()->noteDoneBlocks(this);
setAsking(Asking::Nothing, false);
}
else if (m_asking == Asking::State)
{
setAsking(Asking::Nothing, false);
// TODO: Just got the state - should check to see if we can be of help downloading the chain if any.
// TODO: Otherwise, should put ourselves up for sync.
// Just got the state - should check to see if we can be of help downloading the chain if any.
// Otherwise, should put ourselves up for sync.
setNeedsSyncing(m_latestHash, m_totalDifficulty);
}
// Otherwise it's fine. We don't care if it's Nothing->Nothing.
return;
}
clogS(NetWarn) << "Invalid state transition:" << (int)_a << "from" << (int)m_asking << "/" << boolalpha << isSyncing() << needsSyncing();
clogS(NetWarn) << "Invalid state transition:" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : "");
}
void EthereumPeer::setAsking(Asking _a, bool _isSyncing)
{
m_asking = _a;
m_isSyncing = _isSyncing;
if (_isSyncing != (host()->m_syncer == this))
host()->updateSyncer(_isSyncing ? this : nullptr);
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : ""));
}
@ -187,11 +215,32 @@ void EthereumPeer::setNeedsSyncing(h256 _latestHash, u256 _td)
m_latestHash = _latestHash;
m_totalDifficulty = _td;
// TODO: should be "noteNeedsSyncing" or some such.
host()->notePeerStateChanged(this);
host()->noteNeedsSyncing(this);
}
bool EthereumPeer::isSyncing() const
{
return host()->m_syncer == this;
}
void EthereumPeer::attemptSyncing()
bool EthereumPeer::shouldGrabBlocks() const
{
auto td = m_syncingTotalDifficulty;
auto lh = m_syncingLatestHash;
auto ctd = host()->m_chain.details().totalDifficulty;
if (m_syncingNeededBlocks.empty())
return false;
clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back().abridged();
if (td < ctd || (td == ctd && host()->m_chain.currentHash() == lh))
return false;
return true;
}
void EthereumPeer::attemptSync()
{
if (m_asking != Asking::Nothing)
{
@ -231,8 +280,8 @@ bool EthereumPeer::interpret(RLP const& _r)
{
m_protocolVersion = _r[1].toInt<unsigned>();
m_networkId = _r[2].toInt<u256>();
auto totalDifficulty = _r[3].toInt<u256>();
auto latestHash = _r[4].toHash<h256>();
m_totalDifficulty = _r[3].toInt<u256>();
m_latestHash = _r[4].toHash<h256>();
auto genesisHash = _r[5].toHash<h256>();
clogS(NetMessageSummary) << "Status:" << m_protocolVersion << "/" << m_networkId << "/" << genesisHash.abridged() << ", TD:" << m_totalDifficulty << "=" << m_latestHash.abridged();
@ -245,6 +294,8 @@ bool EthereumPeer::interpret(RLP const& _r)
disable("Invalid network identifier.");
else if (session()->info().clientVersion.find("/v0.6.9/") != string::npos)
disable("Blacklisted client version.");
else if (host()->isBanned(session()->id()))
disable("Peer banned for previous bad behaviour.");
else
{
// Grab transactions off them.
@ -252,8 +303,7 @@ bool EthereumPeer::interpret(RLP const& _r)
prep(s).appendList(1);
s << GetTransactionsPacket;
sealAndSend(s);
setNeedsSyncing(latestHash, totalDifficulty);
transition(Asking::Nothing);
}
break;
}
@ -269,9 +319,11 @@ bool EthereumPeer::interpret(RLP const& _r)
Guard l(x_knownTransactions);
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
m_knownTransactions.insert(sha3(_r[i].data()));
if (!_tq.import(_r[i].data())) // if we already had the transaction, then don't bother sending it on.
host()->m_transactionsSent.insert(sha3(*it));
auto h = sha3(_r[i].data());
m_knownTransactions.insert(h);
if (!host()->m_tq.import(_r[i].data()))
// if we already had the transaction, then don't bother sending it on.
host()->m_transactionsSent.insert(h);
}
break;
}
@ -402,7 +454,7 @@ bool EthereumPeer::interpret(RLP const& _r)
}
case NewBlockPacket:
{
auto h = BlockInfo::headerHash(bd(_r[1].data()));
auto h = BlockInfo::headerHash(_r[1].data());
clogS(NetMessageSummary) << "NewBlock: " << h.abridged();
if (_r.itemCount() != 3)

46
libethereum/EthereumPeer.h

@ -49,33 +49,51 @@ class EthereumPeer: public p2p::Capability
friend class EthereumHost;
public:
/// Basic constructor.
EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h);
/// Basic destructor.
virtual ~EthereumPeer();
/// What is our name?
static std::string name() { return "eth"; }
/// What is the ethereum subprotocol host object.
EthereumHost* host() const;
private:
/// Interpret an incoming message.
virtual bool interpret(RLP const& _r);
/// Send our status to peer.
void sendStatus();
void transition(Asking _wantState);
void attemptSyncing();
/// Transition state in a particular direction.
void transition(Asking _wantState, bool _force = false);
/// Ensure that we are waiting for a bunch of blocks from our peer.
void ensureAskingBlocks();
/// Attempt to begin syncing with this peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks.
void attemptSync();
void finishSync();
/// Abort the sync operation.
void abortSync();
/// Clear all known transactions.
void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); }
/// Update our asking state.
void setAsking(Asking _g, bool _isSyncing);
/// Update our syncing requirements state.
void setNeedsSyncing(h256 _latestHash, u256 _td);
/// Do we presently need syncing with this peer?
bool needsSyncing() const { return !!m_latestHash; }
bool isSyncing() const { return m_isSyncing; }
/// Are we presently syncing with this peer?
bool isSyncing() const;
/// Check whether the session should bother grabbing the peer's blocks.
bool shouldGrabBlocks() const;
/// Peer's protocol version.
unsigned m_protocolVersion;
@ -83,7 +101,7 @@ private:
u256 m_networkId;
/// What, if anything, we last asked the other peer for.
Asking m_asking;
Asking m_asking = Asking::Nothing;
/// Whether this peer is in the process of syncing or not. Only one peer can be syncing at once.
bool m_isSyncing = false;
@ -95,9 +113,9 @@ private:
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.
h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
/// Once we're asking for blocks, this becomes in use.
DownloadSub m_sub;
@ -106,9 +124,9 @@ private:
bool m_requireTransactions;
Mutex x_knownBlocks;
std::set<h256> m_knownBlocks;
std::set<h256> m_knownTransactions;
std::mutex x_knownTransactions;
h256Set m_knownBlocks; ///< Blocks that the peer already knows about (that don't need to be sent to them).
Mutex x_knownTransactions;
h256Set m_knownTransactions; ///< Transactions that the peer already knows of.
};

Loading…
Cancel
Save