Browse Source

Merge branch 'goodnet' into develop

Conflicts:
	libethereum/BlockQueue.cpp
cl-refactor
Gav Wood 10 years ago
parent
commit
3fec74fac1
  1. 13
      libethereum/BlockQueue.cpp
  2. 15
      libethereum/BlockQueue.h
  3. 13
      libethereum/CommonNet.h
  4. 190
      libethereum/EthereumHost.cpp
  5. 35
      libethereum/EthereumHost.h
  6. 405
      libethereum/EthereumPeer.cpp
  7. 70
      libethereum/EthereumPeer.h
  8. 14
      libp2p/Session.cpp

13
libethereum/BlockQueue.cpp

@ -29,7 +29,7 @@ using namespace std;
using namespace dev;
using namespace dev::eth;
bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
{
// Check if we already know this block.
h256 h = BlockInfo::headerHash(_block);
@ -42,7 +42,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
{
// Already know about this one.
cblockq << "Already known.";
return false;
return ImportResult::AlreadyKnown;
}
// VERIFY: populates from the block and checks the block is internally coherent.
@ -60,6 +60,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
{
cwarn << "Ignoring malformed block: " << diagnostic_information(_e);
return false;
return ImportResult::Malformed;
}
#endif
@ -67,7 +68,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
if (_bc.details(h))
{
cblockq << "Already known in chain.";
return false;
return ImportResult::AlreadyInChain;
}
UpgradeGuard ul(l);
@ -77,6 +78,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
{
m_future.insert(make_pair((unsigned)bi.timestamp, _block.toBytes()));
cblockq << "OK - queued for future.";
return ImportResult::FutureTime;
}
else
{
@ -87,6 +89,8 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
cblockq << "OK - queued as unknown parent:" << bi.parentHash.abridged();
m_unknown.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes())));
m_unknownSet.insert(h);
return ImportResult::UnknownParent;
}
else
{
@ -96,10 +100,9 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
m_readySet.insert(h);
noteReadyWithoutWriteGuard(h);
return ImportResult::Success;
}
}
return true;
}
void BlockQueue::tick(BlockChain const& _bc)

15
libethereum/BlockQueue.h

@ -37,6 +37,16 @@ class BlockChain;
struct BlockQueueChannel: public LogChannel { static const char* name() { return "[]Q"; } static const int verbosity = 4; };
#define cblockq dev::LogOutputStream<dev::eth::BlockQueueChannel, true>()
enum class ImportResult
{
Success = 0,
UnknownParent,
FutureTime,
AlreadyInChain,
AlreadyKnown,
Malformed
};
/**
* @brief A queue of blocks. Sits between network or other I/O and the BlockChain.
* Sorts them ready for blockchain insertion (with the BlockChain::sync() method).
@ -46,7 +56,7 @@ class BlockQueue
{
public:
/// Import a block into the queue.
bool import(bytesConstRef _tx, BlockChain const& _bc);
ImportResult import(bytesConstRef _tx, BlockChain const& _bc);
/// Notes that time has moved on and some blocks that used to be "in the future" may no be valid.
void tick(BlockChain const& _bc);
@ -67,6 +77,9 @@ public:
/// Clear everything.
void clear() { WriteGuard l(m_lock); m_readySet.clear(); m_drainingSet.clear(); m_ready.clear(); m_unknownSet.clear(); m_unknown.clear(); m_future.clear(); }
/// Return first block with an unknown parent.
h256 firstUnknown() const { ReadGuard l(m_lock); return m_unknownSet.size() ? *m_unknownSet.begin() : h256(); }
private:
void noteReadyWithoutWriteGuard(h256 _b);
void notePresentWithoutWriteGuard(bytesConstRef _block);

13
libethereum/CommonNet.h

@ -59,16 +59,23 @@ enum EthereumPacket
BlockHashesPacket,
GetBlocksPacket,
BlocksPacket,
NewBlockPacket,
};
enum class Grabbing
enum class Asking
{
State,
Hashes,
Chain,
ChainHelper,
Blocks,
Nothing
};
enum class Syncing
{
Waiting,
Executing,
Done
};
}
}

190
libethereum/EthereumHost.cpp

@ -52,10 +52,10 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu
EthereumHost::~EthereumHost()
{
for (auto const& i: peers())
i->cap<EthereumPeer>()->giveUpOnFetch();
i->cap<EthereumPeer>()->abortSync();
}
bool EthereumHost::ensureInitialised(TransactionQueue& _tq)
bool EthereumHost::ensureInitialised()
{
if (!m_latestBlockSent)
{
@ -63,155 +63,109 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq)
m_latestBlockSent = m_chain.currentHash();
clog(NetNote) << "Initialising: latest=" << m_latestBlockSent.abridged();
for (auto const& i: _tq.transactions())
for (auto const& i: m_tq.transactions())
m_transactionsSent.insert(i.first);
return true;
}
return false;
}
void EthereumHost::noteHavePeerState(EthereumPeer* _who)
void EthereumHost::noteNeedsSyncing(EthereumPeer* _who)
{
clog(NetAllDetail) << "Have peer state.";
// TODO: FIX: BUG: Better state management!
// if already downloading hash-chain, ignore.
if (m_grabbing != Grabbing::Nothing)
if (isSyncing())
{
for (auto const& i: peers())
if (i->cap<EthereumPeer>()->m_grabbing == m_grabbing || m_grabbing == Grabbing::State)
{
clog(NetAllDetail) << "Already downloading chain. Just set to help out.";
_who->ensureGettingChain();
return;
clog(NetAllDetail) << "Sync in progress: Just set to help out.";
if (m_syncer->m_asking == Asking::Blocks)
_who->transition(Asking::Blocks);
}
m_grabbing = Grabbing::Nothing;
}
else
// otherwise check to see if we should be downloading...
_who->tryGrabbingHashChain();
_who->attemptSync();
}
void EthereumHost::updateGrabbing(Grabbing _g)
void EthereumHost::changeSyncer(EthereumPeer* _syncer)
{
m_grabbing = _g;
if (_g == Grabbing::Nothing)
readyForSync();
else if (_g == Grabbing::Chain)
for (auto j: peers())
j->cap<EthereumPeer>()->ensureGettingChain();
}
void EthereumHost::noteHaveChain(EthereumPeer* _from)
{
auto td = _from->m_totalDifficulty;
if (_from->m_neededBlocks.empty())
m_syncer = _syncer;
if (isSyncing())
{
_from->setGrabbing(Grabbing::Nothing);
updateGrabbing(Grabbing::Nothing);
return;
}
clog(NetNote) << "Hash-chain COMPLETE:" << _from->m_totalDifficulty << "vs" << m_chain.details().totalDifficulty << ";" << _from->m_neededBlocks.size() << " blocks, ends" << _from->m_neededBlocks.back().abridged();
if (td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == _from->m_latestHash))
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
_from->setGrabbing(Grabbing::Nothing);
updateGrabbing(Grabbing::Nothing);
return;
}
clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << _from->m_latestHash.abridged() << ", was" << m_latestBlockSent.abridged() << "]";
// Looks like it's the best yet for total difficulty. Set to download.
m_man.resetToChain(_from->m_neededBlocks);
m_latestBlockSent = _from->m_latestHash;
_from->setGrabbing(Grabbing::Chain);
updateGrabbing(Grabbing::Chain);
if (_syncer->m_asking == Asking::Blocks)
for (auto j: peers())
if (j->cap<EthereumPeer>().get() != _syncer && j->cap<EthereumPeer>()->m_asking == Asking::Nothing)
j->cap<EthereumPeer>()->transition(Asking::Blocks);
}
void EthereumHost::readyForSync()
else
{
// start grabbing next hash chain if there is one.
for (auto j: peers())
{
j->cap<EthereumPeer>()->tryGrabbingHashChain();
if (j->cap<EthereumPeer>()->m_grabbing == Grabbing::Hashes)
{
m_grabbing = Grabbing::Hashes;
j->cap<EthereumPeer>()->attemptSync();
if (isSyncing())
return;
}
}
clog(NetNote) << "No more peers to sync with.";
}
}
void EthereumHost::noteDoneBlocks(EthereumPeer* _who)
void EthereumHost::noteDoneBlocks(EthereumPeer* _who, bool _clemency)
{
if (m_man.isComplete())
{
// Done our chain-get.
clog(NetNote) << "Chain download complete.";
updateGrabbing(Grabbing::Nothing);
// 1/100th for each useful block hash.
_who->addRating(m_man.chain().size() / 100);
m_man.reset();
}
if (_who->m_grabbing == Grabbing::Chain)
else if (_who->isSyncing())
{
if (_clemency)
clog(NetNote) << "Chain download failed. Aborted while incomplete.";
else
{
// Done our chain-get.
clog(NetNote) << "Chain download failed. Peer with blocks didn't have them all. This peer is bad and should be punished.";
// TODO: note that peer is BADBADBAD!
updateGrabbing(Grabbing::Nothing);
m_banned.insert(_who->session()->id()); // We know who you are!
_who->disable("Peer sent hashes but was unable to provide the blocks.");
}
m_man.reset();
}
}
bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data)
{
if (!m_chain.details(_hash))
void EthereumHost::reset()
{
lock_guard<recursive_mutex> l(m_incomingLock);
m_incomingBlocks.push_back(_data.toBytes());
return true;
}
return false;
if (m_syncer)
m_syncer->abortSync();
m_man.resetToChain(h256s());
m_latestBlockSent = h256();
m_transactionsSent.clear();
}
void EthereumHost::doWork()
{
bool netChange = ensureInitialised(m_tq);
bool netChange = ensureInitialised();
auto h = m_chain.currentHash();
maintainTransactions(m_tq, h);
maintainBlocks(m_bq, h);
maintainTransactions(h);
maintainBlocks(h);
// return netChange;
// TODO: Figure out what to do with netChange.
(void)netChange;
}
void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash)
{
bool resendAll = (m_grabbing == Grabbing::Nothing && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent);
void EthereumHost::maintainTransactions(h256 _currentHash)
{
lock_guard<recursive_mutex> l(m_incomingLock);
for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it)
if (_tq.import(&*it))
{}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce...
else
m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on.
m_incomingTransactions.clear();
}
bool resendAll = (!isSyncing() && m_chain.isKnown(m_latestBlockSent) && _currentHash != m_latestBlockSent);
// Send any new transactions.
for (auto const& p: peers())
{
auto ep = p->cap<EthereumPeer>();
if (ep)
if (auto ep = p->cap<EthereumPeer>())
{
bytes b;
unsigned n = 0;
for (auto const& i: _tq.transactions())
for (auto const& i: m_tq.transactions())
if ((!m_transactionsSent.count(i.first) && !ep->m_knownTransactions.count(i.first)) || ep->m_requireTransactions || resendAll)
{
b += i.second;
@ -220,7 +174,7 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
}
ep->clearKnownTransactions();
if (n)
if (n || ep->m_requireTransactions)
{
RLPStream ts;
EthereumPeer::prep(ts);
@ -232,53 +186,23 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
ep->m_requireTransactions = false;
}
}
}
void EthereumHost::reset()
void EthereumHost::maintainBlocks(h256 _currentHash)
{
m_grabbing = Grabbing::Nothing;
m_man.resetToChain(h256s());
m_incomingTransactions.clear();
m_incomingBlocks.clear();
m_latestBlockSent = h256();
m_transactionsSent.clear();
}
void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
{
// Import new blocks
{
lock_guard<recursive_mutex> l(m_incomingLock);
for (auto it = m_incomingBlocks.rbegin(); it != m_incomingBlocks.rend(); ++it)
if (_bq.import(&*it, m_chain))
{}
else{} // TODO: don't forward it.
m_incomingBlocks.clear();
}
// If we've finished our initial sync send any new blocks.
if (m_grabbing == Grabbing::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty)
if (!isSyncing() && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty)
{
// TODO: clean up
h256s hs;
hs.push_back(_currentHash);
RLPStream ts;
EthereumPeer::prep(ts);
bytes bs;
unsigned c = 0;
for (auto h: m_chain.treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true))
{
for (auto h: hs)
bs += m_chain.block(h);
++c;
}
clog(NetMessageSummary) << "Sending" << c << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
if (c > 1000)
{
cwarn << "Gaa this would be an awful lot of new blocks. Not bothering";
return;
}
clog(NetMessageSummary) << "Sending" << hs.size() << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
ts.appendList(1 + c).append(BlocksPacket).appendRaw(bs, c);
ts.appendList(1 + hs.size()).append(BlocksPacket).appendRaw(bs, hs.size());
bytes b;
ts.swapOut(b);
seal(b);

35
libethereum/EthereumHost.h

@ -70,26 +70,22 @@ public:
void reset();
DownloadMan const& downloadMan() const { return m_man; }
bool isSyncing() const { return m_grabbing == Grabbing::Chain; }
bool isSyncing() const { return !!m_syncer; }
bool isBanned(h512 _id) const { return m_banned.count(_id); }
private:
void noteHavePeerState(EthereumPeer* _who);
/// Session wants to pass us a block that we might not have.
/// @returns true if we didn't have it.
bool noteBlock(h256 _hash, bytesConstRef _data);
/// Session has finished getting the chain of hashes.
void noteHaveChain(EthereumPeer* _who);
/// Session is tell us that we may need (re-)syncing with the peer.
void noteNeedsSyncing(EthereumPeer* _who);
/// Called when the peer can no longer provide us with any needed blocks.
void noteDoneBlocks(EthereumPeer* _who);
void noteDoneBlocks(EthereumPeer* _who, bool _clemency);
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void doWork();
/// Called by peer to add incoming transactions.
void addIncomingTransaction(bytes const& _bytes) { std::lock_guard<std::recursive_mutex> l(m_incomingLock); m_incomingTransactions.push_back(_bytes); }
void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock);
void maintainBlocks(BlockQueue& _bq, h256 _currentBlock);
void maintainTransactions(h256 _currentBlock);
void maintainBlocks(h256 _currentBlock);
/// Get a bunch of needed blocks.
/// Removes them from our list of needed blocks.
@ -100,13 +96,12 @@ private:
bool isInitialised() const { return m_latestBlockSent; }
/// Initialises the network peer-state, doing the stuff that needs to be once-only. @returns true if it really was first.
bool ensureInitialised(TransactionQueue& _tq);
bool ensureInitialised();
virtual void onStarting() { startWorking(); }
virtual void onStopping() { stopWorking(); }
void readyForSync();
void updateGrabbing(Grabbing _g);
void changeSyncer(EthereumPeer* _ignore);
BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
@ -114,16 +109,14 @@ private:
u256 m_networkId;
Grabbing m_grabbing = Grabbing::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id.
mutable std::recursive_mutex m_incomingLock;
std::vector<bytes> m_incomingTransactions;
std::vector<bytes> m_incomingBlocks;
EthereumPeer* m_syncer = nullptr; // TODO: switch to weak_ptr
DownloadMan m_man;
h256 m_latestBlockSent;
h256Set m_transactionsSent;
std::set<h512> m_banned;
};
}

405
libethereum/EthereumPeer.cpp

@ -27,6 +27,8 @@
#include <libp2p/Session.h>
#include "BlockChain.h"
#include "EthereumHost.h"
#include "TransactionQueue.h"
#include "BlockQueue.h"
using namespace std;
using namespace dev;
using namespace dev::eth;
@ -38,13 +40,18 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h):
Capability(_s, _h),
m_sub(host()->m_man)
{
setGrabbing(Grabbing::State);
sendStatus();
transition(Asking::State);
}
EthereumPeer::~EthereumPeer()
{
giveUpOnFetch();
abortSync();
}
void EthereumPeer::abortSync()
{
if (isSyncing())
transition(Asking::Nothing, true);
}
EthereumHost* EthereumPeer::host() const
@ -52,10 +59,33 @@ EthereumHost* EthereumPeer::host() const
return static_cast<EthereumHost*>(Capability::hostCapability());
}
void EthereumPeer::sendStatus()
/*
* Possible asking/syncing states for two peers:
*/
string toString(Asking _a)
{
switch (_a)
{
case Asking::Blocks: return "Blocks";
case Asking::Hashes: return "Hashes";
case Asking::Nothing: return "Nothing";
case Asking::State: return "State";
}
return "?";
}
void EthereumPeer::transition(Asking _a, bool _force)
{
clogS(NetMessageSummary) << "Transition!" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : "");
RLPStream s;
prep(s);
if (_a == Asking::State)
{
if (m_asking == Asking::Nothing)
{
setAsking(Asking::State, false);
s.appendList(6) << StatusPacket
<< host()->protocolVersion()
<< host()->networkId()
@ -63,25 +93,174 @@ void EthereumPeer::sendStatus()
<< host()->m_chain.currentHash()
<< host()->m_chain.genesisHash();
sealAndSend(s);
return;
}
}
else if (_a == Asking::Hashes)
{
if (m_asking == Asking::State || m_asking == Asking::Nothing)
{
if (isSyncing())
clogS(NetWarn) << "Bad state: not asking for Hashes, yet syncing!";
void EthereumPeer::startInitialSync()
m_syncingLatestHash = m_latestHash;
m_syncingTotalDifficulty = m_totalDifficulty;
resetNeedsSyncing();
setAsking(_a, true);
s.appendList(3) << GetBlockHashesPacket << m_syncingLatestHash << c_maxHashesAsk;
m_syncingNeededBlocks = h256s(1, m_syncingLatestHash);
sealAndSend(s);
return;
}
else if (m_asking == Asking::Hashes)
{
// Grab transactions off them.
if (!isSyncing())
clogS(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
setAsking(_a, true);
s.appendList(3) << GetBlockHashesPacket << m_syncingNeededBlocks.back() << c_maxHashesAsk;
sealAndSend(s);
return;
}
}
else if (_a == Asking::Blocks)
{
RLPStream s;
prep(s).appendList(1);
s << GetTransactionsPacket;
if (m_asking == Asking::Hashes)
{
if (!isSyncing())
clogS(NetWarn) << "Bad state: asking for Hashes yet not syncing!";
if (shouldGrabBlocks())
{
clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash.abridged() << ", was" << host()->m_latestBlockSent.abridged() << "]";
host()->m_man.resetToChain(m_syncingNeededBlocks);
host()->m_latestBlockSent = m_syncingLatestHash;
}
else
{
clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring.";
m_syncingLatestHash = h256();
setAsking(Asking::Nothing, false);
return;
}
}
// run through into...
if (m_asking == Asking::Nothing || m_asking == Asking::Hashes || m_asking == Asking::Blocks)
{
// Looks like it's the best yet for total difficulty. Set to download.
setAsking(Asking::Blocks, true); // will kick off other peers to help if available.
auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
if (blocks.size())
{
s.appendList(blocks.size() + 1) << GetBlocksPacket;
for (auto const& i: blocks)
s << i;
sealAndSend(s);
}
else
transition(Asking::Nothing);
return;
}
}
else if (_a == Asking::Nothing)
{
if (m_asking == Asking::Blocks)
{
clogS(NetNote) << "Finishing blocks fetch...";
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if (isSyncing())
host()->noteDoneBlocks(this, _force);
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
m_sub.doneFetch();
setAsking(Asking::Nothing, false);
}
else if (m_asking == Asking::Hashes)
{
clogS(NetNote) << "Finishing hashes fetch...";
setAsking(Asking::Nothing, false);
}
else if (m_asking == Asking::State)
{
setAsking(Asking::Nothing, false);
// Just got the state - should check to see if we can be of help downloading the chain if any.
// Otherwise, should put ourselves up for sync.
setNeedsSyncing(m_latestHash, m_totalDifficulty);
}
// Otherwise it's fine. We don't care if it's Nothing->Nothing.
return;
}
clogS(NetWarn) << "Invalid state transition:" << ::toString(_a) << "from" << ::toString(m_asking) << ", " << (isSyncing() ? "syncing" : "holding") << (needsSyncing() ? "& needed" : "");
}
void EthereumPeer::setAsking(Asking _a, bool _isSyncing)
{
bool changedAsking = (m_asking != _a);
m_asking = _a;
if (_isSyncing != (host()->m_syncer == this) || (_isSyncing && changedAsking))
host()->changeSyncer(_isSyncing ? this : nullptr);
if (!_isSyncing)
{
m_syncingLatestHash = h256();
m_syncingTotalDifficulty = 0;
m_syncingNeededBlocks.clear();
}
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : ""));
}
void EthereumPeer::setNeedsSyncing(h256 _latestHash, u256 _td)
{
m_latestHash = _latestHash;
m_totalDifficulty = _td;
if (m_latestHash)
host()->noteNeedsSyncing(this);
host()->noteHavePeerState(this);
session()->addNote("sync", string(isSyncing() ? "ongoing" : "holding") + (needsSyncing() ? " & needed" : ""));
}
void EthereumPeer::tryGrabbingHashChain()
bool EthereumPeer::isSyncing() const
{
return host()->m_syncer == this;
}
bool EthereumPeer::shouldGrabBlocks() const
{
auto td = m_syncingTotalDifficulty;
auto lh = m_syncingLatestHash;
auto ctd = host()->m_chain.details().totalDifficulty;
if (m_syncingNeededBlocks.empty())
return false;
clog(NetNote) << "Should grab blocks? " << td << "vs" << ctd << ";" << m_syncingNeededBlocks.size() << " blocks, ends" << m_syncingNeededBlocks.back().abridged();
if (td < ctd || (td == ctd && host()->m_chain.currentHash() == lh))
return false;
return true;
}
void EthereumPeer::attemptSync()
{
if (m_asking != Asking::Nothing)
{
clogS(NetAllDetail) << "Can't synced with this peer - outstanding asks.";
return;
}
// if already done this, then ignore.
if (m_grabbing != Grabbing::State)
if (!needsSyncing())
{
clogS(NetAllDetail) << "Already synced with this peer.";
return;
@ -95,39 +274,16 @@ void EthereumPeer::tryGrabbingHashChain()
if (td >= m_totalDifficulty)
{
clogS(NetAllDetail) << "No. Our chain is better.";
setGrabbing(Grabbing::Nothing);
return; // All good - we have the better chain.
resetNeedsSyncing();
transition(Asking::Nothing);
}
// Our chain isn't better - grab theirs.
else
{
clogS(NetAllDetail) << "Yes. Their chain is better.";
host()->updateGrabbing(Grabbing::Hashes);
setGrabbing(Grabbing::Hashes);
RLPStream s;
prep(s).appendList(3);
s << GetBlockHashesPacket << m_latestHash << c_maxHashesAsk;
m_neededBlocks = h256s(1, m_latestHash);
sealAndSend(s);
transition(Asking::Hashes);
}
}
void EthereumPeer::giveUpOnFetch()
{
clogS(NetNote) << "Finishing fetch...";
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper)
{
host()->noteDoneBlocks(this);
setGrabbing(Grabbing::Nothing);
}
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
m_sub.doneFetch();
}
bool EthereumPeer::interpret(RLP const& _r)
{
switch (_r[0].toInt<unsigned>())
@ -136,6 +292,8 @@ bool EthereumPeer::interpret(RLP const& _r)
{
m_protocolVersion = _r[1].toInt<unsigned>();
m_networkId = _r[2].toInt<u256>();
// a bit dirty as we're misusing these to communicate the values to transition, but harmless.
m_totalDifficulty = _r[3].toInt<u256>();
m_latestHash = _r[4].toHash<h256>();
auto genesisHash = _r[5].toHash<h256>();
@ -150,8 +308,17 @@ bool EthereumPeer::interpret(RLP const& _r)
disable("Invalid network identifier.");
else if (session()->info().clientVersion.find("/v0.6.9/") != string::npos)
disable("Blacklisted client version.");
else if (host()->isBanned(session()->id()))
disable("Peer banned for previous bad behaviour.");
else
startInitialSync();
{
// Grab transactions off them.
RLPStream s;
prep(s).appendList(1);
s << GetTransactionsPacket;
sealAndSend(s);
transition(Asking::Nothing);
}
break;
}
case GetTransactionsPacket:
@ -163,13 +330,14 @@ bool EthereumPeer::interpret(RLP const& _r)
{
clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)";
addRating(_r.itemCount() - 1);
lock_guard<recursive_mutex> l(host()->m_incomingLock);
Guard l(x_knownTransactions);
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
host()->addIncomingTransaction(_r[i].data().toBytes());
lock_guard<mutex> l(x_knownTransactions);
m_knownTransactions.insert(sha3(_r[i].data()));
auto h = sha3(_r[i].data());
m_knownTransactions.insert(h);
if (!host()->m_tq.import(_r[i].data()))
// if we already had the transaction, then don't bother sending it on.
host()->m_transactionsSent.insert(h);
}
break;
}
@ -193,14 +361,14 @@ bool EthereumPeer::interpret(RLP const& _r)
{
clogS(NetMessageSummary) << "BlockHashes (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreHashes");
if (m_grabbing != Grabbing::Hashes)
if (m_asking != Asking::Hashes)
{
cwarn << "Peer giving us hashes when we didn't ask for them.";
break;
}
if (_r.itemCount() == 1)
{
host()->noteHaveChain(this);
transition(Asking::Blocks);
return true;
}
for (unsigned i = 1; i < _r.itemCount(); ++i)
@ -208,17 +376,14 @@ bool EthereumPeer::interpret(RLP const& _r)
auto h = _r[i].toHash<h256>();
if (host()->m_chain.isKnown(h))
{
host()->noteHaveChain(this);
transition(Asking::Blocks);
return true;
}
else
m_neededBlocks.push_back(h);
m_syncingNeededBlocks.push_back(h);
}
// run through - ask for more.
RLPStream s;
prep(s).appendList(3);
s << GetBlockHashesPacket << m_neededBlocks.back() << c_maxHashesAsk;
sealAndSend(s);
transition(Asking::Hashes);
break;
}
case GetBlocksPacket:
@ -244,93 +409,99 @@ bool EthereumPeer::interpret(RLP const& _r)
{
clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreBlocks");
if (m_asking != Asking::Blocks)
clogS(NetWarn) << "Unexpected Blocks received!";
if (_r.itemCount() == 1)
{
// Couldn't get any from last batch - probably got to this peer's latest block - just give up.
if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper)
giveUpOnFetch();
// Got to this peer's latest block - just give up.
transition(Asking::Nothing);
break;
}
unsigned used = 0;
unsigned success = 0;
unsigned future = 0;
unsigned unknown = 0;
unsigned got = 0;
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
m_sub.noteBlock(h);
if (host()->noteBlock(h, _r[i].data()))
used++;
Guard l(x_knownBlocks);
m_knownBlocks.insert(h);
}
addRating(used);
unsigned knownParents = 0;
unsigned unknownParents = 0;
if (g_logVerbosity >= NetMessageSummary::verbosity)
{
unsigned ic = _r.itemCount();
for (unsigned i = 1; i < ic; ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
BlockInfo bi(_r[i].data());
Guard l(x_knownBlocks);
if (!host()->m_chain.details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
{
unknownParents++;
clogS(NetAllDetail) << "Unknown parent" << bi.parentHash.abridged() << "of block" << h.abridged();
m_knownBlocks.insert(h);
}
else
switch (host()->m_bq.import(_r[i].data(), host()->m_chain))
{
knownParents++;
clogS(NetAllDetail) << "Known parent" << bi.parentHash.abridged() << "of block" << h.abridged();
}
}
}
clogS(NetMessageSummary) << dec << knownParents << "known parents," << unknownParents << "unknown," << used << "used.";
if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper)
continueGettingChain();
case ImportResult::Success:
addRating(1);
success++;
break;
}
default:
return false;
}
case ImportResult::Malformed:
disable("Malformed block received.");
return true;
}
void EthereumPeer::ensureGettingChain()
{
if (m_grabbing == Grabbing::ChainHelper)
return; // Already asked & waiting for some.
case ImportResult::FutureTime:
future++;
break;
// Switch to ChainHelper otherwise, unless we're already the Chain grabber.
if (m_grabbing != Grabbing::Chain)
setGrabbing(Grabbing::ChainHelper);
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
got++;
break;
continueGettingChain();
case ImportResult::UnknownParent:
unknown++;
break;
}
}
void EthereumPeer::continueGettingChain()
{
// If we're getting the hashes already, then we shouldn't be asking for the chain.
if (m_grabbing == Grabbing::Hashes)
return;
clogS(NetMessageSummary) << dec << success << "imported OK," << unknown << "with unknown parents," << future << "with future timestamps," << got << " already known.";
auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
if (m_asking == Asking::Blocks)
transition(Asking::Blocks);
break;
}
case NewBlockPacket:
{
auto h = BlockInfo::headerHash(_r[1].data());
clogS(NetMessageSummary) << "NewBlock: " << h.abridged();
if (blocks.size())
if (_r.itemCount() != 3)
disable("NewBlock without 2 data fields.");
else
{
RLPStream s;
prep(s);
s.appendList(blocks.size() + 1) << GetBlocksPacket;
for (auto const& i: blocks)
s << i;
sealAndSend(s);
switch (host()->m_bq.import(_r[1].data(), host()->m_chain))
{
case ImportResult::Success:
case ImportResult::FutureTime:
addRating(1);
break;
case ImportResult::Malformed:
disable("Malformed block received.");
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
break;
case ImportResult::UnknownParent:
clogS(NetMessageSummary) << "Received block with no known parent. Resyncing...";
setNeedsSyncing(h, _r[2].toInt<u256>());
break;
}
else
giveUpOnFetch();
Guard l(x_knownBlocks);
m_knownBlocks.insert(h);
}
void EthereumPeer::setGrabbing(Grabbing _g)
{
m_grabbing = _g;
session()->addNote("grab", _g == Grabbing::Nothing ? "nothing" : _g == Grabbing::State ? "state" : _g == Grabbing::Hashes ? "hashes" : _g == Grabbing::Chain ? "chain" : _g == Grabbing::ChainHelper ? "chainhelper" : "?");
break;
}
default:
return false;
}
return true;
}

70
libethereum/EthereumPeer.h

@ -42,54 +42,90 @@ namespace eth
/**
* @brief The EthereumPeer class
* @todo Document fully.
* @todo make state transitions thread-safe.
*/
class EthereumPeer: public p2p::Capability
{
friend class EthereumHost;
public:
/// Basic constructor.
EthereumPeer(p2p::Session* _s, p2p::HostCapabilityFace* _h);
/// Basic destructor.
virtual ~EthereumPeer();
/// What is our name?
static std::string name() { return "eth"; }
/// What is the ethereum subprotocol host object.
EthereumHost* host() const;
private:
/// Interpret an incoming message.
virtual bool interpret(RLP const& _r);
void sendStatus();
void startInitialSync();
void tryGrabbingHashChain();
/// Transition state in a particular direction.
void transition(Asking _wantState, bool _force = false);
/// Ensure that we are waiting for a bunch of blocks from our peer.
void ensureGettingChain();
/// Ensure that we are waiting for a bunch of blocks from our peer.
void continueGettingChain();
/// Attempt to begin syncing with this peer; first check the peer has a more difficlult chain to download, then start asking for hashes, then move to blocks.
void attemptSync();
void giveUpOnFetch();
/// Abort the sync operation.
void abortSync();
/// Clear all known transactions.
void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); }
void setGrabbing(Grabbing _g);
/// Update our asking state.
void setAsking(Asking _g, bool _isSyncing);
/// Update our syncing requirements state.
void setNeedsSyncing(h256 _latestHash, u256 _td);
void resetNeedsSyncing() { setNeedsSyncing(h256(), 0); }
/// Do we presently need syncing with this peer?
bool needsSyncing() const { return !!m_latestHash; }
/// Are we presently syncing with this peer?
bool isSyncing() const;
/// Check whether the session should bother grabbing the peer's blocks.
bool shouldGrabBlocks() const;
/// Peer's protocol version.
unsigned m_protocolVersion;
/// Peer's network id.
u256 m_networkId;
Grabbing m_grabbing;
/// What, if anything, we last asked the other peer for.
Asking m_asking = Asking::Nothing;
/// Whether this peer is in the process of syncing or not. Only one peer can be syncing at once.
bool m_isSyncing = false;
h256 m_latestHash; ///< Peer's latest block's hash.
/// These are determined through either a Status message or from NewBlock.
h256 m_latestHash; ///< Peer's latest block's hash that we know about or default null value if no need to sync.
u256 m_totalDifficulty; ///< Peer's latest block's total difficulty.
h256s m_neededBlocks; ///< The blocks that we should download from this peer.
/// Once a sync is started on this peer, they are cleared and moved into m_syncing*.
/// This is built as we ask for hashes. Once no more hashes are given, we present this to the
/// host who initialises the DownloadMan and m_sub becomes active for us to begin asking for blocks.
h256s m_syncingNeededBlocks; ///< The blocks that we should download from this peer.
h256 m_syncingLatestHash; ///< Peer's latest block's hash, as of the current sync.
u256 m_syncingTotalDifficulty; ///< Peer's latest block's total difficulty, as of the current sync.
/// Once we're asking for blocks, this becomes in use.
DownloadSub m_sub;
/// Have we received a GetTransactions packet that we haven't yet answered?
bool m_requireTransactions;
Mutex x_knownBlocks;
std::set<h256> m_knownBlocks;
std::set<h256> m_knownTransactions;
std::mutex x_knownTransactions;
h256Set m_knownBlocks; ///< Blocks that the peer already knows about (that don't need to be sent to them).
Mutex x_knownTransactions;
h256Set m_knownTransactions; ///< Transactions that the peer already knows of.
DownloadSub m_sub;
};
}

14
libp2p/Session.cpp

@ -87,7 +87,7 @@ bool Session::interpret(RLP const& _r)
if (m_server->havePeer(m_id))
{
// Already connected.
cwarn << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint();
clogS(NetWarn) << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint();
disconnect(DuplicatePeer);
return false;
}
@ -240,7 +240,7 @@ void Session::sendDestroy(bytes& _msg)
if (!checkPacket(bytesConstRef(&_msg)))
{
cwarn << "INVALID PACKET CONSTRUCTED!";
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
}
bytes buffer = bytes(std::move(_msg));
@ -253,7 +253,7 @@ void Session::send(bytesConstRef _msg)
if (!checkPacket(_msg))
{
cwarn << "INVALID PACKET CONSTRUCTED!";
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
}
bytes buffer = bytes(_msg.toBytes());
@ -288,7 +288,7 @@ void Session::write()
// must check queue, as write callback can occur following dropped()
if (ec)
{
cwarn << "Error sending: " << ec.message();
clogS(NetWarn) << "Error sending: " << ec.message();
dropped();
return;
}
@ -363,7 +363,7 @@ void Session::doRead()
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
{
// got here with length of 1241...
cwarn << "Error reading: " << ec.message();
clogS(NetWarn) << "Error reading: " << ec.message();
dropped();
}
else if (ec && length == 0)
@ -380,7 +380,7 @@ void Session::doRead()
{
if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91)
{
cwarn << "INVALID SYNCHRONISATION TOKEN; expected = 22400891; received = " << toHex(bytesConstRef(m_incoming.data(), 4));
clogS(NetWarn) << "INVALID SYNCHRONISATION TOKEN; expected = 22400891; received = " << toHex(bytesConstRef(m_incoming.data(), 4));
disconnect(BadProtocol);
return;
}
@ -396,7 +396,7 @@ void Session::doRead()
if (!checkPacket(data))
{
cerr << "Received " << len << ": " << toHex(bytesConstRef(m_incoming.data() + 8, len)) << endl;
cwarn << "INVALID MESSAGE RECEIVED";
clogS(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return;
}

Loading…
Cancel
Save