Browse Source

Decent transaction import result provision.

Give network a hint about what's going on for peer backoffs.
Avoid sleeping in main loop when there's still work on.
cl-refactor
Gav Wood 10 years ago
parent
commit
105be32bb4
  1. 8
      eth/main.cpp
  2. 10
      libethcore/Common.h
  3. 6
      libethereum/BlockChain.cpp
  4. 2
      libethereum/BlockChain.h
  5. 14
      libethereum/BlockQueue.h
  6. 13
      libethereum/Client.cpp
  7. 4
      libethereum/ClientBase.cpp
  8. 20
      libethereum/EthereumPeer.cpp
  9. 12
      libethereum/TransactionQueue.cpp
  10. 8
      libethereum/TransactionQueue.h
  11. 2
      libp2p/Capability.cpp
  12. 2
      libp2p/Capability.h
  13. 4
      libp2p/Session.cpp
  14. 2
      libp2p/Session.h
  15. 2
      test/blockchain.cpp

8
eth/main.cpp

@ -120,7 +120,7 @@ void help()
<< " -j,--json-rpc Enable JSON-RPC server (default: off)." << endl << " -j,--json-rpc Enable JSON-RPC server (default: off)." << endl
<< " --json-rpc-port Specify JSON-RPC server port (implies '-j', default: " << SensibleHttpPort << ")." << endl << " --json-rpc-port Specify JSON-RPC server port (implies '-j', default: " << SensibleHttpPort << ")." << endl
#endif #endif
<< " -K,--kill-blockchain First kill the blockchain." << endl << " -K,--kill First kill the blockchain." << endl
<< " --listen-ip <port> Listen on the given port for incoming connections (default: 30303)." << endl << " --listen-ip <port> Listen on the given port for incoming connections (default: 30303)." << endl
<< " -l,--listen <ip> Listen on the given IP for incoming connections (default: 0.0.0.0)." << endl << " -l,--listen <ip> Listen on the given IP for incoming connections (default: 0.0.0.0)." << endl
<< " -u,--public-ip <ip> Force public ip to given (default: auto)." << endl << " -u,--public-ip <ip> Force public ip to given (default: auto)." << endl
@ -129,7 +129,7 @@ void help()
<< " -o,--mode <full/peer> Start a full node or a peer node (Default: full)." << endl << " -o,--mode <full/peer> Start a full node or a peer node (Default: full)." << endl
<< " -p,--port <port> Connect to remote port (default: 30303)." << endl << " -p,--port <port> Connect to remote port (default: 30303)." << endl
<< " -P,--priority <0 - 100> Default % priority of a transaction (default: 50)." << endl << " -P,--priority <0 - 100> Default % priority of a transaction (default: 50)." << endl
<< " -R,--rebuild-blockchain First rebuild the blockchain from the existing database." << endl << " -R,--rebuild First rebuild the blockchain from the existing database." << endl
<< " -r,--remote <host> Connect to remote host (default: none)." << endl << " -r,--remote <host> Connect to remote host (default: none)." << endl
<< " -s,--secret <secretkeyhex> Set the secret key for use with send command (default: auto)." << endl << " -s,--secret <secretkeyhex> Set the secret key for use with send command (default: auto)." << endl
<< " -t,--miners <number> Number of mining threads to start (Default: " << thread::hardware_concurrency() << ")" << endl << " -t,--miners <number> Number of mining threads to start (Default: " << thread::hardware_concurrency() << ")" << endl
@ -274,9 +274,9 @@ int main(int argc, char** argv)
return -1; return -1;
} }
} }
else if (arg == "-K" || arg == "--kill-blockchain") else if (arg == "-K" || arg == "--kill-blockchain" || arg == "--kill")
killChain = WithExisting::Kill; killChain = WithExisting::Kill;
else if (arg == "-B" || arg == "--rebuild-blockchain") else if (arg == "-B" || arg == "--rebuild")
killChain = WithExisting::Verify; killChain = WithExisting::Verify;
else if ((arg == "-c" || arg == "--client-name") && i + 1 < argc) else if ((arg == "-c" || arg == "--client-name") && i + 1 < argc)
clientName = argv[++i]; clientName = argv[++i];

10
libethcore/Common.h

@ -85,5 +85,15 @@ enum class RelativeBlock: BlockNumber
Pending = PendingBlock Pending = PendingBlock
}; };
enum class ImportResult
{
Success = 0,
UnknownParent,
FutureTime,
AlreadyInChain,
AlreadyKnown,
Malformed
};
} }
} }

6
libethereum/BlockChain.cpp

@ -267,7 +267,7 @@ LastHashes BlockChain::lastHashes(unsigned _n) const
return m_lastLastHashes; return m_lastLastHashes;
} }
h256s BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max) pair<h256s, bool> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max)
{ {
_bq.tick(*this); _bq.tick(*this);
@ -295,8 +295,8 @@ h256s BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max
catch (...) catch (...)
{} {}
} }
_bq.doneDrain(); bool yetMore = _bq.doneDrain();
return ret; return make_pair(ret, yetMore);
} }
h256s BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB, bool _force) noexcept h256s BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB, bool _force) noexcept

2
libethereum/BlockChain.h

@ -99,7 +99,7 @@ public:
void process(); void process();
/// Sync the chain with any incoming blocks. All blocks should, if processed in order /// Sync the chain with any incoming blocks. All blocks should, if processed in order
h256s sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max); std::pair<h256s, bool> sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max);
/// Attempt to import the given block directly into the CanonBlockChain and sync with the state DB. /// Attempt to import the given block directly into the CanonBlockChain and sync with the state DB.
/// @returns the block hashes of any blocks that came into/went out of the canonical block chain. /// @returns the block hashes of any blocks that came into/went out of the canonical block chain.

14
libethereum/BlockQueue.h

@ -26,6 +26,7 @@
#include <libdevcore/Log.h> #include <libdevcore/Log.h>
#include <libethcore/Common.h> #include <libethcore/Common.h>
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include <libethcore/Common.h>
namespace dev namespace dev
{ {
@ -37,16 +38,6 @@ class BlockChain;
struct BlockQueueChannel: public LogChannel { static const char* name() { return "[]Q"; } static const int verbosity = 4; }; struct BlockQueueChannel: public LogChannel { static const char* name() { return "[]Q"; } static const int verbosity = 4; };
#define cblockq dev::LogOutputStream<dev::eth::BlockQueueChannel, true>() #define cblockq dev::LogOutputStream<dev::eth::BlockQueueChannel, true>()
enum class ImportResult
{
Success = 0,
UnknownParent,
FutureTime,
AlreadyInChain,
AlreadyKnown,
Malformed
};
/** /**
* @brief A queue of blocks. Sits between network or other I/O and the BlockChain. * @brief A queue of blocks. Sits between network or other I/O and the BlockChain.
* Sorts them ready for blockchain insertion (with the BlockChain::sync() method). * Sorts them ready for blockchain insertion (with the BlockChain::sync() method).
@ -66,7 +57,8 @@ public:
void drain(std::vector<bytes>& o_out, unsigned _max); void drain(std::vector<bytes>& o_out, unsigned _max);
/// Must be called after a drain() call. Notes that the drained blocks have been imported into the blockchain, so we can forget about them. /// Must be called after a drain() call. Notes that the drained blocks have been imported into the blockchain, so we can forget about them.
void doneDrain() { WriteGuard l(m_lock); m_drainingSet.clear(); } /// @returns true iff there are additional blocks ready to be processed.
bool doneDrain() { WriteGuard l(m_lock); m_drainingSet.clear(); return !m_readySet.empty(); }
/// Notify the queue that the chain has changed and a new block has attained 'ready' status (i.e. is in the chain). /// Notify the queue that the chain has changed and a new block has attained 'ready' status (i.e. is in the chain).
void noteReady(h256 _b) { WriteGuard l(m_lock); noteReadyWithoutWriteGuard(_b); } void noteReady(h256 _b) { WriteGuard l(m_lock); noteReadyWithoutWriteGuard(_b); }

13
libethereum/Client.cpp

@ -120,7 +120,7 @@ void BasicGasPricer::update(BlockChain const& _bc)
Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId, int _miners): Client::Client(p2p::Host* _extNet, std::string const& _dbPath, WithExisting _forceAction, u256 _networkId, int _miners):
Worker("eth"), Worker("eth"),
m_vc(_dbPath), m_vc(_dbPath),
m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "..." << endl; }), m_bc(_dbPath, max(m_vc.action(), _forceAction), [](unsigned d, unsigned t){ cerr << "REVISING BLOCKCHAIN: Processed " << d << " of " << t << "...\r"; }),
m_gp(new TrivialGasPricer), m_gp(new TrivialGasPricer),
m_stateDB(State::openDB(_dbPath, max(m_vc.action(), _forceAction))), m_stateDB(State::openDB(_dbPath, max(m_vc.action(), _forceAction))),
m_preMine(Address(), m_stateDB), m_preMine(Address(), m_stateDB),
@ -441,6 +441,8 @@ void Client::doWork()
{ {
// TODO: Use condition variable rather than polling. // TODO: Use condition variable rather than polling.
bool stillGotWork = false;
cworkin << "WORK"; cworkin << "WORK";
h256Set changeds; h256Set changeds;
@ -496,7 +498,10 @@ void Client::doWork()
cwork << "BQ ==> CHAIN ==> STATE"; cwork << "BQ ==> CHAIN ==> STATE";
OverlayDB db = m_stateDB; OverlayDB db = m_stateDB;
x_stateDB.unlock(); x_stateDB.unlock();
h256s newBlocks = m_bc.sync(m_bq, db, 100); // TODO: remove transactions from m_tq nicely rather than relying on out of date nonce later on. h256s newBlocks;
bool sgw;
tie(newBlocks, sgw) = m_bc.sync(m_bq, db, 100); // TODO: remove transactions from m_tq nicely rather than relying on out of date nonce later on.
stillGotWork = stillGotWork | sgw;
if (newBlocks.size()) if (newBlocks.size())
{ {
for (auto i: newBlocks) for (auto i: newBlocks)
@ -544,7 +549,9 @@ void Client::doWork()
noteChanged(changeds); noteChanged(changeds);
cworkout << "WORK"; cworkout << "WORK";
if (!stillGotWork)
this_thread::sleep_for(chrono::milliseconds(100)); this_thread::sleep_for(chrono::milliseconds(100));
if (chrono::system_clock::now() - m_lastGarbageCollection > chrono::seconds(5)) if (chrono::system_clock::now() - m_lastGarbageCollection > chrono::seconds(5))
{ {
// watches garbage collection // watches garbage collection
@ -601,7 +608,7 @@ void Client::inject(bytesConstRef _rlp)
{ {
startWorking(); startWorking();
m_tq.attemptImport(_rlp); m_tq.import(_rlp);
} }
void Client::flushTransactions() void Client::flushTransactions()

4
libethereum/ClientBase.cpp

@ -44,7 +44,7 @@ void ClientBase::submitTransaction(Secret _secret, u256 _value, Address _dest, b
u256 n = postMine().transactionsFrom(toAddress(_secret)); u256 n = postMine().transactionsFrom(toAddress(_secret));
Transaction t(_value, _gasPrice, _gas, _dest, _data, n, _secret); Transaction t(_value, _gasPrice, _gas, _dest, _data, n, _secret);
m_tq.attemptImport(t.rlp()); m_tq.import(t.rlp());
StructuredLogger::transactionReceived(t.sha3().abridged(), t.sender().abridged()); StructuredLogger::transactionReceived(t.sha3().abridged(), t.sender().abridged());
cnote << "New transaction " << t; cnote << "New transaction " << t;
@ -56,7 +56,7 @@ Address ClientBase::submitTransaction(Secret _secret, u256 _endowment, bytes con
u256 n = postMine().transactionsFrom(toAddress(_secret)); u256 n = postMine().transactionsFrom(toAddress(_secret));
Transaction t(_endowment, _gasPrice, _gas, _init, n, _secret); Transaction t(_endowment, _gasPrice, _gas, _init, n, _secret);
m_tq.attemptImport(t.rlp()); m_tq.import(t.rlp());
StructuredLogger::transactionReceived(t.sha3().abridged(), t.sender().abridged()); StructuredLogger::transactionReceived(t.sha3().abridged(), t.sender().abridged());
cnote << "New transaction " << t; cnote << "New transaction " << t;

20
libethereum/EthereumPeer.cpp

@ -326,15 +326,27 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
case TransactionsPacket: case TransactionsPacket:
{ {
clogS(NetMessageSummary) << "Transactions (" << dec << _r.itemCount() << "entries)"; clogS(NetMessageSummary) << "Transactions (" << dec << _r.itemCount() << "entries)";
addRating(_r.itemCount());
Guard l(x_knownTransactions); Guard l(x_knownTransactions);
for (unsigned i = 0; i < _r.itemCount(); ++i) for (unsigned i = 0; i < _r.itemCount(); ++i)
{ {
auto h = sha3(_r[i].data()); auto h = sha3(_r[i].data());
m_knownTransactions.insert(h); m_knownTransactions.insert(h);
if (!host()->m_tq.import(_r[i].data())) ImportResult ir = host()->m_tq.import(_r[i].data());
switch (ir)
{
case ImportResult::Malformed:
addRating(-100);
break;
case ImportResult::AlreadyKnown:
// if we already had the transaction, then don't bother sending it on. // if we already had the transaction, then don't bother sending it on.
addRating(0);
break;
case ImportResult::Success:
addRating(100);
host()->m_transactionsSent.insert(h); host()->m_transactionsSent.insert(h);
break;
default:;
}
} }
break; break;
} }
@ -352,6 +364,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent) for (unsigned i = 0; i < c && p; ++i, p = host()->m_chain.details(p).parent)
s << p; s << p;
sealAndSend(s); sealAndSend(s);
addRating(0);
break; break;
} }
case BlockHashesPacket: case BlockHashesPacket:
@ -370,6 +383,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
} }
for (unsigned i = 0; i < _r.itemCount(); ++i) for (unsigned i = 0; i < _r.itemCount(); ++i)
{ {
addRating(1);
auto h = _r[i].toHash<h256>(); auto h = _r[i].toHash<h256>();
if (host()->m_chain.isKnown(h)) if (host()->m_chain.isKnown(h))
{ {
@ -398,6 +412,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
++n; ++n;
} }
} }
addRating(0);
RLPStream s; RLPStream s;
prep(s, BlocksPacket, n).appendRaw(rlp, n); prep(s, BlocksPacket, n).appendRaw(rlp, n);
sealAndSend(s); sealAndSend(s);
@ -497,6 +512,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
setNeedsSyncing(h, _r[1].toInt<u256>()); setNeedsSyncing(h, _r[1].toInt<u256>());
break; break;
} }
Guard l(x_knownBlocks); Guard l(x_knownBlocks);
m_knownBlocks.insert(h); m_knownBlocks.insert(h);
} }

12
libethereum/TransactionQueue.cpp

@ -28,14 +28,16 @@ using namespace std;
using namespace dev; using namespace dev;
using namespace dev::eth; using namespace dev::eth;
bool TransactionQueue::import(bytesConstRef _transactionRLP) ImportResult TransactionQueue::import(bytesConstRef _transactionRLP)
{ {
// Check if we already know this transaction. // Check if we already know this transaction.
h256 h = sha3(_transactionRLP); h256 h = sha3(_transactionRLP);
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
// TODO: keep old transactions around and check in State for nonce validity
if (m_known.count(h)) if (m_known.count(h))
return false; return ImportResult::AlreadyKnown;
try try
{ {
@ -52,15 +54,15 @@ bool TransactionQueue::import(bytesConstRef _transactionRLP)
catch (Exception const& _e) catch (Exception const& _e)
{ {
cwarn << "Ignoring invalid transaction: " << diagnostic_information(_e); cwarn << "Ignoring invalid transaction: " << diagnostic_information(_e);
return false; return ImportResult::Malformed;
} }
catch (std::exception const& _e) catch (std::exception const& _e)
{ {
cwarn << "Ignoring invalid transaction: " << _e.what(); cwarn << "Ignoring invalid transaction: " << _e.what();
return false; return ImportResult::Malformed;
} }
return true; return ImportResult::Success;
} }
void TransactionQueue::setFuture(std::pair<h256, Transaction> const& _t) void TransactionQueue::setFuture(std::pair<h256, Transaction> const& _t)

8
libethereum/TransactionQueue.h

@ -23,8 +23,8 @@
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <libdevcore/Common.h> #include <libdevcore/Common.h>
#include "libethcore/Common.h"
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include "libethcore/Common.h"
#include "Transaction.h" #include "Transaction.h"
namespace dev namespace dev
@ -34,6 +34,7 @@ namespace eth
class BlockChain; class BlockChain;
/** /**
* @brief A queue of Transactions, each stored as RLP. * @brief A queue of Transactions, each stored as RLP.
* @threadsafe * @threadsafe
@ -41,9 +42,8 @@ class BlockChain;
class TransactionQueue class TransactionQueue
{ {
public: public:
bool attemptImport(bytesConstRef _tx) { try { import(_tx); return true; } catch (...) { return false; } } ImportResult import(bytes const& _tx) { return import(&_tx); }
bool attemptImport(bytes const& _tx) { return attemptImport(&_tx); } ImportResult import(bytesConstRef _tx);
bool import(bytesConstRef _tx);
void drop(h256 _txHash); void drop(h256 _txHash);

2
libp2p/Capability.cpp

@ -53,7 +53,7 @@ void Capability::sealAndSend(RLPStream& _s)
m_session->sealAndSend(_s); m_session->sealAndSend(_s);
} }
void Capability::addRating(unsigned _r) void Capability::addRating(int _r)
{ {
m_session->addRating(_r); m_session->addRating(_r);
} }

2
libp2p/Capability.h

@ -52,7 +52,7 @@ protected:
RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0); RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0);
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);
void addRating(unsigned _r); void addRating(int _r);
private: private:
Session* m_session; Session* m_session;

4
libp2p/Session.cpp

@ -77,12 +77,14 @@ NodeId Session::id() const
return m_peer ? m_peer->id : NodeId(); return m_peer ? m_peer->id : NodeId();
} }
void Session::addRating(unsigned _r) void Session::addRating(int _r)
{ {
if (m_peer) if (m_peer)
{ {
m_peer->m_rating += _r; m_peer->m_rating += _r;
m_peer->m_score += _r; m_peer->m_score += _r;
if (_r >= 0)
m_peer->noteSessionGood();
} }
} }

2
libp2p/Session.h

@ -74,7 +74,7 @@ public:
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);
int rating() const; int rating() const;
void addRating(unsigned _r); void addRating(int _r);
void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; } void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; }

2
test/blockchain.cpp

@ -98,7 +98,7 @@ void doBlockchainTests(json_spirit::mValue& _v, bool _fillin)
{ {
mObject tx = txObj.get_obj(); mObject tx = txObj.get_obj();
importer.importTransaction(tx); importer.importTransaction(tx);
if (!txs.attemptImport(importer.m_transaction.rlp())) if (txs.import(importer.m_transaction.rlp()) != ImportResult::Success)
cnote << "failed importing transaction\n"; cnote << "failed importing transaction\n";
} }

Loading…
Cancel
Save