Browse Source

Remove incoming queue. Put things straight into actual queues.

Make state items more fitting.
cl-refactor
Gav Wood 10 years ago
parent
commit
3f61b506db
  1. 14
      libethereum/BlockQueue.cpp
  2. 17
      libethereum/BlockQueue.h
  3. 10
      libethereum/CommonNet.h
  4. 39
      libethereum/EthereumHost.cpp
  5. 7
      libethereum/EthereumHost.h
  6. 118
      libethereum/EthereumPeer.cpp
  7. 6
      libethereum/EthereumPeer.h
  8. 14
      libp2p/Session.cpp

14
libethereum/BlockQueue.cpp

@ -29,7 +29,7 @@ using namespace std;
using namespace dev;
using namespace dev::eth;
bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
{
// Check if we already know this block.
h256 h = BlockInfo::headerHash(_block);
@ -42,7 +42,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
{
// Already know about this one.
cblockq << "Already known.";
return false;
return ImportResult::AlreadyKnown;
}
// VERIFY: populates from the block and checks the block is internally coherent.
@ -59,7 +59,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
catch (Exception const& _e)
{
cwarn << "Ignoring malformed block: " << _e.description();
return false;
return ImportResult::Malformed;
}
#endif
@ -67,7 +67,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
if (_bc.details(h))
{
cblockq << "Already known in chain.";
return false;
return ImportResult::AlreadyInChain;
}
UpgradeGuard ul(l);
@ -77,6 +77,7 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
{
m_future.insert(make_pair((unsigned)bi.timestamp, _block.toBytes()));
cblockq << "OK - queued for future.";
return ImportResult::FutureTime;
}
else
{
@ -87,6 +88,8 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
cblockq << "OK - queued as unknown parent:" << bi.parentHash.abridged();
m_unknown.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes())));
m_unknownSet.insert(h);
return ImportResult::UnknownParent;
}
else
{
@ -96,10 +99,9 @@ bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
m_readySet.insert(h);
noteReadyWithoutWriteGuard(h);
return ImportResult::Success;
}
}
return true;
}
void BlockQueue::tick(BlockChain const& _bc)

17
libethereum/BlockQueue.h

@ -37,6 +37,16 @@ class BlockChain;
struct BlockQueueChannel: public LogChannel { static const char* name() { return "[]Q"; } static const int verbosity = 4; };
#define cblockq dev::LogOutputStream<dev::eth::BlockQueueChannel, true>()
enum class ImportResult
{
Success = 0,
UnknownParent,
FutureTime,
AlreadyInChain,
AlreadyKnown,
Malformed
};
/**
* @brief A queue of blocks. Sits between network or other I/O and the BlockChain.
* Sorts them ready for blockchain insertion (with the BlockChain::sync() method).
@ -46,7 +56,7 @@ class BlockQueue
{
public:
/// Import a block into the queue.
bool import(bytesConstRef _tx, BlockChain const& _bc);
ImportResult import(bytesConstRef _tx, BlockChain const& _bc);
/// Notes that time has moved on and some blocks that used to be "in the future" may no be valid.
void tick(BlockChain const& _bc);
@ -67,6 +77,9 @@ public:
/// Clear everything.
void clear() { WriteGuard l(m_lock); m_readySet.clear(); m_drainingSet.clear(); m_ready.clear(); m_unknownSet.clear(); m_unknown.clear(); m_future.clear(); }
/// Return first block with an unknown parent.
h256 firstUnknown() const { ReadGuard l(m_lock); return m_unknownSet.size() ? *m_unknownSet.begin() : h256(); }
private:
void noteReadyWithoutWriteGuard(h256 _b);
void notePresentWithoutWriteGuard(bytesConstRef _block);
@ -77,7 +90,7 @@ private:
std::vector<bytes> m_ready; ///< List of blocks, in correct order, ready for chain-import.
std::set<h256> m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain.
std::multimap<h256, std::pair<h256, bytes>> m_unknown; ///< For transactions that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears.
std::multimap<unsigned, bytes> m_future; ///< Set of blocks that are not yet valid.
std::multimap<unsigned, bytes> m_future; ///< Set of blocks that are not yet valid.
};
}

10
libethereum/CommonNet.h

@ -65,10 +65,16 @@ enum class Grabbing
{
State,
Hashes,
Chain,
ChainHelper,
Blocks,
Nothing
};
enum class Syncing
{
Waiting,
Executing,
Done
};
}
}

39
libethereum/EthereumHost.cpp

@ -70,9 +70,9 @@ bool EthereumHost::ensureInitialised(TransactionQueue& _tq)
return false;
}
void EthereumHost::noteHavePeerState(EthereumPeer* _who)
void EthereumHost::notePeerStateChanged(EthereumPeer* _who)
{
clog(NetAllDetail) << "Have peer state.";
clog(NetAllDetail) << "Peer state changed.";
// TODO: FIX: BUG: Better state management!
@ -80,7 +80,7 @@ void EthereumHost::noteHavePeerState(EthereumPeer* _who)
if (m_grabbing != Grabbing::Nothing)
{
for (auto const& i: peers())
if (i->cap<EthereumPeer>()->m_grabbing == m_grabbing || m_grabbing == Grabbing::State)
if (i->cap<EthereumPeer>()->m_grabbing == m_grabbing || m_grabbing == Grabbing::Presync)
{
clog(NetAllDetail) << "Already downloading chain. Just set to help out.";
_who->ensureGettingChain();
@ -205,9 +205,7 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
// Send any new transactions.
for (auto const& p: peers())
{
auto ep = p->cap<EthereumPeer>();
if (ep)
if (auto ep = p->cap<EthereumPeer>())
{
bytes b;
unsigned n = 0;
@ -231,7 +229,6 @@ void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash
}
ep->m_requireTransactions = false;
}
}
}
void EthereumHost::reset()
@ -249,36 +246,20 @@ void EthereumHost::reset()
void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
{
// Import new blocks
{
lock_guard<recursive_mutex> l(m_incomingLock);
for (auto it = m_incomingBlocks.rbegin(); it != m_incomingBlocks.rend(); ++it)
if (_bq.import(&*it, m_chain))
{}
else{} // TODO: don't forward it.
m_incomingBlocks.clear();
}
// If we've finished our initial sync send any new blocks.
if (m_grabbing == Grabbing::Nothing && m_chain.isKnown(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty)
{
// TODO: clean up
h256s hs;
hs.push_back(_currentHash);
RLPStream ts;
EthereumPeer::prep(ts);
bytes bs;
unsigned c = 0;
for (auto h: m_chain.treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true))
{
for (auto h: hs)
bs += m_chain.block(h);
++c;
}
clog(NetMessageSummary) << "Sending" << c << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
if (c > 1000)
{
cwarn << "Gaa this would be an awful lot of new blocks. Not bothering";
return;
}
clog(NetMessageSummary) << "Sending" << hs.size() << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
ts.appendList(1 + c).append(BlocksPacket).appendRaw(bs, c);
ts.appendList(1 + hs.size()).append(BlocksPacket).appendRaw(bs, hs.size());
bytes b;
ts.swapOut(b);
seal(b);

7
libethereum/EthereumHost.h

@ -85,9 +85,6 @@ private:
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void doWork();
/// Called by peer to add incoming transactions.
void addIncomingTransaction(bytes const& _bytes) { std::lock_guard<std::recursive_mutex> l(m_incomingLock); m_incomingTransactions.push_back(_bytes); }
void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock);
void maintainBlocks(BlockQueue& _bq, h256 _currentBlock);
@ -116,10 +113,6 @@ private:
Grabbing m_grabbing = Grabbing::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id.
mutable std::recursive_mutex m_incomingLock;
std::vector<bytes> m_incomingTransactions;
std::vector<bytes> m_incomingBlocks;
DownloadMan m_man;
h256 m_latestBlockSent;

118
libethereum/EthereumPeer.cpp

@ -38,7 +38,7 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h):
Capability(_s, _h),
m_sub(host()->m_man)
{
setGrabbing(Grabbing::State);
setAsking(Asking::State, Syncing::Done);
sendStatus();
}
@ -75,13 +75,19 @@ void EthereumPeer::startInitialSync()
sealAndSend(s);
}
host()->noteHavePeerState(this);
host()->notePeerStateChanged(this);
}
void EthereumPeer::tryGrabbingHashChain()
{
if (m_asking != Asking::Nothing)
{
clogS(NetAllDetail) << "Can't synced with this peer - outstanding asks.";
return;
}
// if already done this, then ignore.
if (m_grabbing != Grabbing::State)
if (m_syncing == Syncing::Done)
{
clogS(NetAllDetail) << "Already synced with this peer.";
return;
@ -95,7 +101,7 @@ void EthereumPeer::tryGrabbingHashChain()
if (td >= m_totalDifficulty)
{
clogS(NetAllDetail) << "No. Our chain is better.";
setGrabbing(Grabbing::Nothing);
setAsking(Asking::Nothing, Syncing::Done);
return; // All good - we have the better chain.
}
@ -103,8 +109,8 @@ void EthereumPeer::tryGrabbingHashChain()
{
clogS(NetAllDetail) << "Yes. Their chain is better.";
host()->updateGrabbing(Grabbing::Hashes);
setGrabbing(Grabbing::Hashes);
host()->updateGrabbing(Asking::Hashes);
setAsking(Asking::Hashes, Syncing::Executing);
RLPStream s;
prep(s).appendList(3);
s << GetBlockHashesPacket << m_latestHash << c_maxHashesAsk;
@ -118,10 +124,10 @@ void EthereumPeer::giveUpOnFetch()
clogS(NetNote) << "Finishing fetch...";
// a bit overkill given that the other nodes may yet have the needed blocks, but better to be safe than sorry.
if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper)
if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper)
{
host()->noteDoneBlocks(this);
setGrabbing(Grabbing::Nothing);
setAsking(Asking::Nothing);
}
// NOTE: need to notify of giving up on chain-hashes, too, altering state as necessary.
@ -163,12 +169,11 @@ bool EthereumPeer::interpret(RLP const& _r)
{
clogS(NetMessageSummary) << "Transactions (" << dec << (_r.itemCount() - 1) << "entries)";
addRating(_r.itemCount() - 1);
lock_guard<recursive_mutex> l(host()->m_incomingLock);
RecursiveGuard l(m_incomingLock);
Guard l(x_knownTransactions);
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
host()->addIncomingTransaction(_r[i].data().toBytes());
lock_guard<mutex> l(x_knownTransactions);
m_incomingTransactions.push_back(_r[i].data().toBytes());
m_knownTransactions.insert(sha3(_r[i].data()));
}
break;
@ -193,7 +198,7 @@ bool EthereumPeer::interpret(RLP const& _r)
{
clogS(NetMessageSummary) << "BlockHashes (" << dec << (_r.itemCount() - 1) << "entries)" << (_r.itemCount() - 1 ? "" : ": NoMoreHashes");
if (m_grabbing != Grabbing::Hashes)
if (m_asking != Asking::Hashes)
{
cwarn << "Peer giving us hashes when we didn't ask for them.";
break;
@ -247,21 +252,62 @@ bool EthereumPeer::interpret(RLP const& _r)
if (_r.itemCount() == 1)
{
// Couldn't get any from last batch - probably got to this peer's latest block - just give up.
if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper)
if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper)
giveUpOnFetch();
break;
}
unsigned used = 0;
unsigned success = 0;
unsigned got = 0;
unsigned bad = 0;
unsigned unknown = 0;
unsigned future = 0;
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
auto h = BlockInfo::headerHash(_r[i].data());
m_sub.noteBlock(h);
if (host()->noteBlock(h, _r[i].data()))
used++;
Guard l(x_knownBlocks);
m_knownBlocks.insert(h);
{
Guard l(x_knownBlocks);
m_knownBlocks.insert(h);
}
switch (host()->m_bq.import(_r[i].data(), host()->m_chain))
{
case ImportResult::Success:
success++;
break;
case ImportResult::Malformed:
bad++;
break;
case ImportResult::FutureTime:
future++;
break;
case ImportResult::AlreadyInChain:
case ImportResult::AlreadyKnown:
got++;
break;
case ImportResult::UnknownParent:
unknown++;
break;
}
}
if (unknown && m_asking == Asking::Nothing)
{
// TODO: kick off resync.
}
if (bad)
{
// TODO: punish peer
}
addRating(used);
unsigned knownParents = 0;
unsigned unknownParents = 0;
@ -286,7 +332,7 @@ bool EthereumPeer::interpret(RLP const& _r)
}
}
clogS(NetMessageSummary) << dec << knownParents << "known parents," << unknownParents << "unknown," << used << "used.";
if (m_grabbing == Grabbing::Chain || m_grabbing == Grabbing::ChainHelper)
if (m_asking == Asking::Blocks || m_asking == Asking::ChainHelper)
continueGettingChain();
break;
}
@ -298,20 +344,18 @@ bool EthereumPeer::interpret(RLP const& _r)
void EthereumPeer::ensureGettingChain()
{
if (m_grabbing == Grabbing::ChainHelper)
if (m_helping)
return; // Already asked & waiting for some.
// Switch to ChainHelper otherwise, unless we're already the Chain grabber.
if (m_grabbing != Grabbing::Chain)
setGrabbing(Grabbing::ChainHelper);
// Help otherwise, unless we're already the Chain grabber.
setHelping(true);
continueGettingChain();
}
void EthereumPeer::continueGettingChain()
{
// If we're getting the hashes already, then we shouldn't be asking for the chain.
if (m_grabbing == Grabbing::Hashes)
if (m_asking == Asking::Hashes)
return;
auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
@ -329,8 +373,24 @@ void EthereumPeer::continueGettingChain()
giveUpOnFetch();
}
void EthereumPeer::setGrabbing(Grabbing _g)
/*
* Possible asking/syncing states for two peers:
* state/ presync
* presync hashes
* presync chain (transiently)
* presync+ chain
* presync nothing
* hashes nothing
* chain hashes
* presync chain (transiently)
* presync+ chain
* presync nothing
*/
void EthereumPeer::setAsking(Asking _a, Syncing _s)
{
m_grabbing = _g;
session()->addNote("grab", _g == Grabbing::Nothing ? "nothing" : _g == Grabbing::State ? "state" : _g == Grabbing::Hashes ? "hashes" : _g == Grabbing::Chain ? "chain" : _g == Grabbing::ChainHelper ? "chainhelper" : "?");
m_asking = _a;
m_syncing = _s;
session()->addNote("ask", _a == Asking::Nothing ? "nothing" : _a == Asking::State ? "state" : _a == Asking::Hashes ? "hashes" : _a == Asking::Blocks ? "blocks" : "?");
session()->addNote("sync", _s == Syncing::Done ? "done" : _s == Syncing::Waiting ? "wait" : _s == Syncing::Executing ? "exec" : "?");
}

6
libethereum/EthereumPeer.h

@ -71,12 +71,14 @@ private:
void giveUpOnFetch();
void clearKnownTransactions() { std::lock_guard<std::mutex> l(x_knownTransactions); m_knownTransactions.clear(); }
void setGrabbing(Grabbing _g);
void setAsking(Asking _g, bool _helping = false);
void setHelping(bool _helping = false) { setAsking(m_asking, _helping); }
unsigned m_protocolVersion;
u256 m_networkId;
Grabbing m_grabbing;
Asking m_asking;
Syncing m_syncing;
h256 m_latestHash; ///< Peer's latest block's hash.
u256 m_totalDifficulty; ///< Peer's latest block's total difficulty.

14
libp2p/Session.cpp

@ -87,7 +87,7 @@ bool Session::interpret(RLP const& _r)
if (m_server->havePeer(m_id))
{
// Already connected.
cwarn << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint();
clogS(NetWarn) << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint();
disconnect(DuplicatePeer);
return false;
}
@ -240,7 +240,7 @@ void Session::sendDestroy(bytes& _msg)
if (!checkPacket(bytesConstRef(&_msg)))
{
cwarn << "INVALID PACKET CONSTRUCTED!";
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
}
bytes buffer = bytes(std::move(_msg));
@ -253,7 +253,7 @@ void Session::send(bytesConstRef _msg)
if (!checkPacket(_msg))
{
cwarn << "INVALID PACKET CONSTRUCTED!";
clogS(NetWarn) << "INVALID PACKET CONSTRUCTED!";
}
bytes buffer = bytes(_msg.toBytes());
@ -288,7 +288,7 @@ void Session::write()
// must check queue, as write callback can occur following dropped()
if (ec)
{
cwarn << "Error sending: " << ec.message();
clogS(NetWarn) << "Error sending: " << ec.message();
dropped();
return;
}
@ -363,7 +363,7 @@ void Session::doRead()
if (ec && ec.category() != boost::asio::error::get_misc_category() && ec.value() != boost::asio::error::eof)
{
// got here with length of 1241...
cwarn << "Error reading: " << ec.message();
clogS(NetWarn) << "Error reading: " << ec.message();
dropped();
}
else if (ec && length == 0)
@ -380,7 +380,7 @@ void Session::doRead()
{
if (m_incoming[0] != 0x22 || m_incoming[1] != 0x40 || m_incoming[2] != 0x08 || m_incoming[3] != 0x91)
{
cwarn << "INVALID SYNCHRONISATION TOKEN; expected = 22400891; received = " << toHex(bytesConstRef(m_incoming.data(), 4));
clogS(NetWarn) << "INVALID SYNCHRONISATION TOKEN; expected = 22400891; received = " << toHex(bytesConstRef(m_incoming.data(), 4));
disconnect(BadProtocol);
return;
}
@ -396,7 +396,7 @@ void Session::doRead()
if (!checkPacket(data))
{
cerr << "Received " << len << ": " << toHex(bytesConstRef(m_incoming.data() + 8, len)) << endl;
cwarn << "INVALID MESSAGE RECEIVED";
clogS(NetWarn) << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return;
}

Loading…
Cancel
Save