/* This file is part of cpp-ethereum. cpp-ethereum is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. cpp-ethereum is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with cpp-ethereum. If not, see . */ /** @file EthereumHost.cpp * @author Gav Wood * @date 2014 */ #include "EthereumHost.h" #include #include #include #include #include #include #include #include "BlockChain.h" #include "TransactionQueue.h" #include "BlockQueue.h" #include "EthereumPeer.h" #include "DownloadMan.h" using namespace std; using namespace dev; using namespace dev::eth; using namespace p2p; EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): HostCapability(), Worker ("ethsync"), m_chain (_ch), m_tq (_tq), m_bq (_bq), m_networkId (_networkId) { m_latestBlockSent = _ch.currentHash(); } EthereumHost::~EthereumHost() { for (auto const& i: peers()) i->cap()->giveUpOnFetch(); } h256Set EthereumHost::neededBlocks(h256Set const& _exclude) { Guard l(x_blocksNeeded); h256Set ret; if (m_blocksNeeded.size()) { int s = m_blocksNeeded.size() - 1; for (; ret.size() < c_maxBlocksAsk && s < (int)m_blocksNeeded.size() && s >= 0; --s) if (!_exclude.count(m_blocksNeeded[s])) { auto it = m_blocksNeeded.begin() + s; ret.insert(*it); m_blocksOnWay.insert(*it); m_blocksNeeded.erase(it); } } if (ret.empty()) for (auto i = m_blocksOnWay.begin(); ret.size() < c_maxBlocksAsk && i != m_blocksOnWay.end() && !_exclude.count(*i); ++i) ret.insert(*i); if (ret.size()) clog(NetMessageSummary) << "Asking for" << ret.size() << "blocks that we don't yet have." << m_blocksNeeded.size() << "blocks still needed," << m_blocksOnWay.size() << "total blocks on way."; return ret; } bool EthereumHost::ensureInitialised(TransactionQueue& _tq) { if (!m_latestBlockSent) { // First time - just initialise. m_latestBlockSent = m_chain.currentHash(); clog(NetNote) << "Initialising: latest=" << m_latestBlockSent.abridged(); for (auto const& i: _tq.transactions()) m_transactionsSent.insert(i.first); return true; } return false; } void EthereumHost::noteHavePeerState(EthereumPeer* _who) { clog(NetAllDetail) << "Have peer state."; // if already downloading hash-chain, ignore. if (m_grabbing != Grabbing::Nothing) { clog(NetAllDetail) << "Already downloading chain. Just set to help out."; _who->restartGettingChain(); return; } // otherwise check to see if we should be downloading... _who->tryGrabbingHashChain(); } void EthereumHost::updateGrabbing(Grabbing _g) { m_grabbing = _g; if (_g == Grabbing::Nothing) readyForSync(); else if (_g == Grabbing::Chain) for (auto j: peers()) j->cap()->restartGettingChain(); } void EthereumHost::noteHaveChain(EthereumPeer* _from) { auto td = _from->m_totalDifficulty; if (_from->m_neededBlocks.empty()) { _from->m_grabbing = Grabbing::Nothing; updateGrabbing(Grabbing::Nothing); return; } clog(NetNote) << "Hash-chain COMPLETE:" << _from->m_totalDifficulty << "vs" << m_chain.details().totalDifficulty << "," << m_totalDifficultyOfNeeded << ";" << _from->m_neededBlocks.size() << " blocks, ends" << _from->m_neededBlocks.back().abridged(); if ((m_totalDifficultyOfNeeded && (td < m_totalDifficultyOfNeeded || (td == m_totalDifficultyOfNeeded && m_latestBlockSent == _from->m_latestHash))) || td < m_chain.details().totalDifficulty || (td == m_chain.details().totalDifficulty && m_chain.currentHash() == _from->m_latestHash)) { clog(NetNote) << "Difficulty of hashchain not HIGHER. Ignoring."; _from->m_grabbing = Grabbing::Nothing; updateGrabbing(Grabbing::Nothing); return; } clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << _from->m_latestHash.abridged() << ", was" << m_latestBlockSent.abridged() << "]"; // Looks like it's the best yet for total difficulty. Set to download. { Guard l(x_blocksNeeded); m_blocksNeeded = _from->m_neededBlocks; m_blocksOnWay.clear(); m_totalDifficultyOfNeeded = td; m_latestBlockSent = _from->m_latestHash; } _from->m_grabbing = Grabbing::Chain; updateGrabbing(Grabbing::Chain); } void EthereumHost::readyForSync() { // start grabbing next hash chain if there is one. for (auto j: peers()) { j->cap()->tryGrabbingHashChain(); if (j->cap()->m_grabbing == Grabbing::Hashes) { m_grabbing = Grabbing::Hashes; return; } } clog(NetNote) << "No more peers to sync with."; } void EthereumHost::noteDoneBlocks() { if (m_blocksOnWay.empty()) { // Done our chain-get. if (m_blocksNeeded.size()) clog(NetNote) << "No more blocks coming. Missing" << m_blocksNeeded.size() << "blocks."; else clog(NetNote) << "No more blocks to get."; updateGrabbing(Grabbing::Nothing); } } bool EthereumHost::noteBlock(h256 _hash, bytesConstRef _data) { Guard l(x_blocksNeeded); m_blocksOnWay.erase(_hash); if (!m_chain.details(_hash)) { lock_guard l(m_incomingLock); m_incomingBlocks.push_back(_data.toBytes()); return true; } return false; } void EthereumHost::doWork() { bool netChange = ensureInitialised(m_tq); auto h = m_chain.currentHash(); maintainTransactions(m_tq, h); maintainBlocks(m_bq, h); // return netChange; // TODO: Figure out what to do with netChange. (void)netChange; } void EthereumHost::maintainTransactions(TransactionQueue& _tq, h256 _currentHash) { bool resendAll = (_currentHash != m_latestBlockSent); { lock_guard l(m_incomingLock); for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it) if (_tq.import(&*it)) {}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce... else m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on. m_incomingTransactions.clear(); } // Send any new transactions. for (auto const& p: peers()) { auto ep = p->cap(); if (ep) { bytes b; unsigned n = 0; for (auto const& i: _tq.transactions()) if ((!m_transactionsSent.count(i.first) && !ep->m_knownTransactions.count(i.first)) || ep->m_requireTransactions || resendAll) { b += i.second; ++n; m_transactionsSent.insert(i.first); } ep->clearKnownTransactions(); if (n) { RLPStream ts; EthereumPeer::prep(ts); ts.appendList(n + 1) << TransactionsPacket; ts.appendRaw(b, n).swapOut(b); seal(b); ep->send(&b); } ep->m_requireTransactions = false; } } } void EthereumHost::reset() { m_grabbing = Grabbing::Nothing; m_incomingTransactions.clear(); m_incomingBlocks.clear(); m_totalDifficultyOfNeeded = 0; m_blocksNeeded.clear(); m_blocksOnWay.clear(); m_latestBlockSent = h256(); m_transactionsSent.clear(); } void EthereumHost::maintainBlocks(BlockQueue& _bq, h256 _currentHash) { // Import new blocks { lock_guard l(m_incomingLock); for (auto it = m_incomingBlocks.rbegin(); it != m_incomingBlocks.rend(); ++it) if (_bq.import(&*it, m_chain)) {} else{} // TODO: don't forward it. m_incomingBlocks.clear(); } // If we've finished our initial sync send any new blocks. if (m_grabbing == Grabbing::Nothing && m_chain.details(m_latestBlockSent) && m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty) { RLPStream ts; EthereumPeer::prep(ts); bytes bs; unsigned c = 0; for (auto h: m_chain.treeRoute(m_latestBlockSent, _currentHash, nullptr, false, true)) { bs += m_chain.block(h); ++c; } clog(NetMessageSummary) << "Sending" << c << "new blocks (current is" << _currentHash << ", was" << m_latestBlockSent << ")"; ts.appendList(1 + c).append(BlocksPacket).appendRaw(bs, c); bytes b; ts.swapOut(b); seal(b); for (auto j: peers()) { auto p = j->cap(); Guard l(p->x_knownBlocks); if (!p->m_knownBlocks.count(_currentHash)) p->send(&b); p->m_knownBlocks.clear(); } m_latestBlockSent = _currentHash; } }