From 105be32bb4bfaf1f4e49f5993ffb3b863228b5eb Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sun, 5 Apr 2015 16:33:51 +0200 Subject: [PATCH] Decent transaction import result provision. Give network a hint about what's going on for peer backoffs. Avoid sleeping in main loop when there's still work on. --- eth/main.cpp | 8 ++++---- libethcore/Common.h | 10 ++++++++++ libethereum/BlockChain.cpp | 6 +++--- libethereum/BlockChain.h | 2 +- libethereum/BlockQueue.h | 14 +++----------- libethereum/Client.cpp | 15 +++++++++++---- libethereum/ClientBase.cpp | 4 ++-- libethereum/EthereumPeer.cpp | 20 ++++++++++++++++++-- libethereum/TransactionQueue.cpp | 12 +++++++----- libethereum/TransactionQueue.h | 8 ++++---- libp2p/Capability.cpp | 2 +- libp2p/Capability.h | 2 +- libp2p/Session.cpp | 4 +++- libp2p/Session.h | 2 +- test/blockchain.cpp | 2 +- 15 files changed, 70 insertions(+), 41 deletions(-) diff --git a/eth/main.cpp b/eth/main.cpp index 3cdcba472..9ec6dec98 100644 --- a/eth/main.cpp +++ b/eth/main.cpp @@ -120,7 +120,7 @@ void help() << " -j,--json-rpc Enable JSON-RPC server (default: off)." << endl << " --json-rpc-port Specify JSON-RPC server port (implies '-j', default: " << SensibleHttpPort << ")." << endl #endif - << " -K,--kill-blockchain First kill the blockchain." << endl + << " -K,--kill First kill the blockchain." << endl << " --listen-ip Listen on the given port for incoming connections (default: 30303)." << endl << " -l,--listen Listen on the given IP for incoming connections (default: 0.0.0.0)." << endl << " -u,--public-ip Force public ip to given (default: auto)." << endl @@ -129,7 +129,7 @@ void help() << " -o,--mode Start a full node or a peer node (Default: full)." << endl << " -p,--port Connect to remote port (default: 30303)." << endl << " -P,--priority <0 - 100> Default % priority of a transaction (default: 50)." << endl - << " -R,--rebuild-blockchain First rebuild the blockchain from the existing database." << endl + << " -R,--rebuild First rebuild the blockchain from the existing database." << endl << " -r,--remote Connect to remote host (default: none)." << endl << " -s,--secret Set the secret key for use with send command (default: auto)." << endl << " -t,--miners Number of mining threads to start (Default: " << thread::hardware_concurrency() << ")" << endl @@ -274,9 +274,9 @@ int main(int argc, char** argv) return -1; } } - else if (arg == "-K" || arg == "--kill-blockchain") + else if (arg == "-K" || arg == "--kill-blockchain" || arg == "--kill") killChain = WithExisting::Kill; - else if (arg == "-B" || arg == "--rebuild-blockchain") + else if (arg == "-B" || arg == "--rebuild") killChain = WithExisting::Verify; else if ((arg == "-c" || arg == "--client-name") && i + 1 < argc) clientName = argv[++i]; diff --git a/libethcore/Common.h b/libethcore/Common.h index aabe663cd..56f1b704a 100644 --- a/libethcore/Common.h +++ b/libethcore/Common.h @@ -85,5 +85,15 @@ enum class RelativeBlock: BlockNumber Pending = PendingBlock }; +enum class ImportResult +{ + Success = 0, + UnknownParent, + FutureTime, + AlreadyInChain, + AlreadyKnown, + Malformed +}; + } } diff --git a/libethereum/BlockChain.cpp b/libethereum/BlockChain.cpp index 832d9465f..ae319cc14 100644 --- a/libethereum/BlockChain.cpp +++ b/libethereum/BlockChain.cpp @@ -267,7 +267,7 @@ LastHashes BlockChain::lastHashes(unsigned _n) const return m_lastLastHashes; } -h256s BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max) +pair BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max) { _bq.tick(*this); @@ -295,8 +295,8 @@ h256s BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max catch (...) {} } - _bq.doneDrain(); - return ret; + bool yetMore = _bq.doneDrain(); + return make_pair(ret, yetMore); } h256s BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB, bool _force) noexcept diff --git a/libethereum/BlockChain.h b/libethereum/BlockChain.h index a45be0ab0..add4b43cd 100644 --- a/libethereum/BlockChain.h +++ b/libethereum/BlockChain.h @@ -99,7 +99,7 @@ public: void process(); /// Sync the chain with any incoming blocks. All blocks should, if processed in order - h256s sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max); + std::pair sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max); /// Attempt to import the given block directly into the CanonBlockChain and sync with the state DB. /// @returns the block hashes of any blocks that came into/went out of the canonical block chain. diff --git a/libethereum/BlockQueue.h b/libethereum/BlockQueue.h index 9c473d766..f34a53812 100644 --- a/libethereum/BlockQueue.h +++ b/libethereum/BlockQueue.h @@ -26,6 +26,7 @@ #include #include #include +#include namespace dev { @@ -37,16 +38,6 @@ class BlockChain; struct BlockQueueChannel: public LogChannel { static const char* name() { return "[]Q"; } static const int verbosity = 4; }; #define cblockq dev::LogOutputStream() -enum class ImportResult -{ - Success = 0, - UnknownParent, - FutureTime, - AlreadyInChain, - AlreadyKnown, - Malformed -}; - /** * @brief A queue of blocks. Sits between network or other I/O and the BlockChain. * Sorts them ready for blockchain insertion (with the BlockChain::sync() method). @@ -66,7 +57,8 @@ public: void drain(std::vector& o_out, unsigned _max); /// Must be called after a drain() call. Notes that the drained blocks have been imported into the blockchain, so we can forget about them. - void doneDrain() { WriteGuard l(m_lock); m_drainingSet.clear(); } + /// @returns true iff there are additional blocks ready to be processed. + bool doneDrain() { WriteGuard l(m_lock); m_drainingSet.clear(); return !m_readySet.empty(); } /// Notify the queue that the chain has changed and a new block has attained 'ready' status (i.e. is in the chain). void noteReady(h256 _b) { WriteGuard l(m_lock); noteReadyWithoutWriteGuard(_b); } diff --git a/libethereum/Client.cpp b/libethereum/Client.cpp index 3d9f17335..2f65e5f4b 100644 --- a/libethereum/Client.cpp +++ b/libethereum/Client.cpp @@ -120,7 +120,7 @@ void BasicGasPricer::update(BlockChain const& _bc) Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId, int _miners): Worker("eth"), m_vc(_dbPath), - m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "..." << endl; }), + m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }), m_gp(new TrivialGasPricer), m_stateDB(State::openDB(_dbPath, max(m_vc.action(), _forceAction))), m_preMine(Address(), m_stateDB), @@ -441,6 +441,8 @@ void Client::doWork() { // TODO: Use condition variable rather than polling. + bool stillGotWork = false; + cworkin << "WORK"; h256Set changeds; @@ -496,7 +498,10 @@ void Client::doWork() cwork << "BQ ==> CHAIN ==> STATE"; OverlayDB db = m_stateDB; x_stateDB.unlock(); - h256s newBlocks = m_bc.sync(m_bq, db, 100); // TODO: remove transactions from m_tq nicely rather than relying on out of date nonce later on. + h256s newBlocks; + bool sgw; + tie(newBlocks, sgw) = m_bc.sync(m_bq, db, 100); // TODO: remove transactions from m_tq nicely rather than relying on out of date nonce later on. + stillGotWork = stillGotWork | sgw; if (newBlocks.size()) { for (auto i: newBlocks) @@ -544,7 +549,9 @@ void Client::doWork() noteChanged(changeds); cworkout << "WORK"; - this_thread::sleep_for(chrono::milliseconds(100)); + if (!stillGotWork) + this_thread::sleep_for(chrono::milliseconds(100)); + if (chrono::system_clock::now() - m_lastGarbageCollection > chrono::seconds(5)) { // watches garbage collection @@ -601,7 +608,7 @@ void Client::inject(bytesConstRef _rlp) { startWorking(); - m_tq.attemptImport(_rlp); + m_tq.import(_rlp); } void Client::flushTransactions() diff --git a/libethereum/ClientBase.cpp b/libethereum/ClientBase.cpp index f949391fd..fae534ff8 100644 --- a/libethereum/ClientBase.cpp +++ b/libethereum/ClientBase.cpp @@ -44,7 +44,7 @@ void ClientBase::submitTransaction(Secret _secret, u256 _value, Address _dest, b u256 n = postMine().transactionsFrom(toAddress(_secret)); Transaction t(_value, _gasPrice, _gas, _dest, _data, n, _secret); - m_tq.attemptImport(t.rlp()); + m_tq.import(t.rlp()); StructuredLogger::transactionReceived(t.sha3().abridged(), t.sender().abridged()); cnote << "New transaction " << t; @@ -56,7 +56,7 @@ Address ClientBase::submitTransaction(Secret _secret, u256 _endowment, bytes con u256 n = postMine().transactionsFrom(toAddress(_secret)); Transaction t(_endowment, _gasPrice, _gas, _init, n, _secret); - m_tq.attemptImport(t.rlp()); + m_tq.import(t.rlp()); StructuredLogger::transactionReceived(t.sha3().abridged(), t.sender().abridged()); cnote << "New transaction " << t; diff --git a/libethereum/EthereumPeer.cpp b/libethereum/EthereumPeer.cpp index f318a1757..15ce5c3f6 100644 --- a/libethereum/EthereumPeer.cpp +++ b/libethereum/EthereumPeer.cpp @@ -326,15 +326,27 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) case TransactionsPacket: { clogS(NetMessageSummary) << "Transactions (" << dec << _r.itemCount() << "entries)"; - addRating(_r.itemCount()); Guard l(x_knownTransactions); for (unsigned i = 0; i < _r.itemCount(); ++i) { auto h = sha3(_r[i].data()); m_knownTransactions.insert(h); - if (!host()->m_tq.import(_r[i].data())) + ImportResult ir = host()->m_tq.import(_r[i].data()); + switch (ir) + { + case ImportResult::Malformed: + addRating(-100); + break; + case ImportResult::AlreadyKnown: // if we already had the transaction, then don't bother sending it on. + addRating(0); + break; + case ImportResult::Success: + addRating(100); host()->m_transactionsSent.insert(h); + break; + default:; + } } break; } @@ -352,6 +364,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent) s << p; sealAndSend(s); + addRating(0); break; } case BlockHashesPacket: @@ -370,6 +383,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) } for (unsigned i = 0; i < _r.itemCount(); ++i) { + addRating(1); auto h = _r[i].toHash(); if (host()->m_chain.isKnown(h)) { @@ -398,6 +412,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) ++n; } } + addRating(0); RLPStream s; prep(s, BlocksPacket, n).appendRaw(rlp, n); sealAndSend(s); @@ -497,6 +512,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r) setNeedsSyncing(h, _r[1].toInt()); break; } + Guard l(x_knownBlocks); m_knownBlocks.insert(h); } diff --git a/libethereum/TransactionQueue.cpp b/libethereum/TransactionQueue.cpp index 803d320ee..26962ed90 100644 --- a/libethereum/TransactionQueue.cpp +++ b/libethereum/TransactionQueue.cpp @@ -28,14 +28,16 @@ using namespace std; using namespace dev; using namespace dev::eth; -bool TransactionQueue::import(bytesConstRef _transactionRLP) +ImportResult TransactionQueue::import(bytesConstRef _transactionRLP) { // Check if we already know this transaction. h256 h = sha3(_transactionRLP); UpgradableGuard l(m_lock); + // TODO: keep old transactions around and check in State for nonce validity + if (m_known.count(h)) - return false; + return ImportResult::AlreadyKnown; try { @@ -52,15 +54,15 @@ bool TransactionQueue::import(bytesConstRef _transactionRLP) catch (Exception const& _e) { cwarn << "Ignoring invalid transaction: " << diagnostic_information(_e); - return false; + return ImportResult::Malformed; } catch (std::exception const& _e) { cwarn << "Ignoring invalid transaction: " << _e.what(); - return false; + return ImportResult::Malformed; } - return true; + return ImportResult::Success; } void TransactionQueue::setFuture(std::pair const& _t) diff --git a/libethereum/TransactionQueue.h b/libethereum/TransactionQueue.h index b104b98ca..cf40e1209 100644 --- a/libethereum/TransactionQueue.h +++ b/libethereum/TransactionQueue.h @@ -23,8 +23,8 @@ #include #include -#include "libethcore/Common.h" #include +#include "libethcore/Common.h" #include "Transaction.h" namespace dev @@ -34,6 +34,7 @@ namespace eth class BlockChain; + /** * @brief A queue of Transactions, each stored as RLP. * @threadsafe @@ -41,9 +42,8 @@ class BlockChain; class TransactionQueue { public: - bool attemptImport(bytesConstRef _tx) { try { import(_tx); return true; } catch (...) { return false; } } - bool attemptImport(bytes const& _tx) { return attemptImport(&_tx); } - bool import(bytesConstRef _tx); + ImportResult import(bytes const& _tx) { return import(&_tx); } + ImportResult import(bytesConstRef _tx); void drop(h256 _txHash); diff --git a/libp2p/Capability.cpp b/libp2p/Capability.cpp index dccc130cd..f59fd8cdd 100644 --- a/libp2p/Capability.cpp +++ b/libp2p/Capability.cpp @@ -53,7 +53,7 @@ void Capability::sealAndSend(RLPStream& _s) m_session->sealAndSend(_s); } -void Capability::addRating(unsigned _r) +void Capability::addRating(int _r) { m_session->addRating(_r); } diff --git a/libp2p/Capability.h b/libp2p/Capability.h index ad8127bb5..d09391655 100644 --- a/libp2p/Capability.h +++ b/libp2p/Capability.h @@ -52,7 +52,7 @@ protected: RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0); void sealAndSend(RLPStream& _s); - void addRating(unsigned _r); + void addRating(int _r); private: Session* m_session; diff --git a/libp2p/Session.cpp b/libp2p/Session.cpp index 133d8a30b..0bf07bbe1 100644 --- a/libp2p/Session.cpp +++ b/libp2p/Session.cpp @@ -77,12 +77,14 @@ NodeId Session::id() const return m_peer ? m_peer->id : NodeId(); } -void Session::addRating(unsigned _r) +void Session::addRating(int _r) { if (m_peer) { m_peer->m_rating += _r; m_peer->m_score += _r; + if (_r >= 0) + m_peer->noteSessionGood(); } } diff --git a/libp2p/Session.h b/libp2p/Session.h index 51db5adc3..95053d2a9 100644 --- a/libp2p/Session.h +++ b/libp2p/Session.h @@ -74,7 +74,7 @@ public: void sealAndSend(RLPStream& _s); int rating() const; - void addRating(unsigned _r); + void addRating(int _r); void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; } diff --git a/test/blockchain.cpp b/test/blockchain.cpp index ffb55da30..21345abfd 100644 --- a/test/blockchain.cpp +++ b/test/blockchain.cpp @@ -98,7 +98,7 @@ void doBlockchainTests(json_spirit::mValue& _v, bool _fillin) { mObject tx = txObj.get_obj(); importer.importTransaction(tx); - if (!txs.attemptImport(importer.m_transaction.rlp())) + if (txs.import(importer.m_transaction.rlp()) != ImportResult::Success) cnote << "failed importing transaction\n"; }