Browse Source

New strategy - send all new blocks out, even when (re)syncing (though

only when resyncing < 20 blocks).
Make a note of bad transactions to avoid re-importing when bad nodes
pass them to us.
cl-refactor
Gav Wood 10 years ago
parent
commit
a5f2dc881e
  1. 1
      cmake/scripts/jsonrpcstub.cmake
  2. 2
      libdevcore/Common.cpp
  3. 19
      libethereum/Client.cpp
  4. 1
      libethereum/Client.h
  5. 28
      libethereum/EthereumHost.cpp
  6. 2
      libethereum/EthereumPeer.cpp
  7. 8
      libethereum/TransactionQueue.cpp
  8. 12
      libethereum/TransactionQueue.h
  9. 10
      libp2p/NodeTable.cpp

1
cmake/scripts/jsonrpcstub.cmake

@ -42,4 +42,3 @@ else()
replace_if_different("${SERVER_TMPFILE}" "${SERVER_OUTFILE}") replace_if_different("${SERVER_TMPFILE}" "${SERVER_OUTFILE}")
replace_if_different("${CLIENT_TMPFILE}" "${CLIENT_OUTFILE}") replace_if_different("${CLIENT_TMPFILE}" "${CLIENT_OUTFILE}")
endif() endif()

2
libdevcore/Common.cpp

@ -27,7 +27,7 @@ using namespace dev;
namespace dev namespace dev
{ {
char const* Version = "0.9.10"; char const* Version = "0.9.11";
} }

19
libethereum/Client.cpp

@ -395,6 +395,9 @@ ExecutionResult Client::call(Address _dest, bytes const& _data, u256 _gas, u256
ProofOfWork::WorkPackage Client::getWork() ProofOfWork::WorkPackage Client::getWork()
{ {
// lock the work so a later submission isn't invalidated by processing a transaction elsewhere.
// this will be reset as soon as a new block arrives, allowing more transactions to be processed.
m_remoteWorking = true;
return ProofOfWork::package(m_miningInfo); return ProofOfWork::package(m_miningInfo);
} }
@ -448,7 +451,7 @@ void Client::syncTransactionQueue()
appendFromNewPending(newPendingReceipts[i], changeds, m_postMine.pending()[i].sha3()); appendFromNewPending(newPendingReceipts[i], changeds, m_postMine.pending()[i].sha3());
changeds.insert(PendingChangedFilter); changeds.insert(PendingChangedFilter);
// TODO: Tell farm about new transaction (i.e. restartProofOfWork mining). // Tell farm about new transaction (i.e. restartProofOfWork mining).
onPostStateChanged(); onPostStateChanged();
// Tell watches about the new transactions. // Tell watches about the new transactions.
@ -468,7 +471,7 @@ void Client::onChainChanged(ImportRoute const& _ir)
for (auto const& t: m_bc.transactions(h)) for (auto const& t: m_bc.transactions(h))
{ {
clog(ClientNote) << "Resubmitting transaction " << Transaction(t, CheckTransaction::None); clog(ClientNote) << "Resubmitting transaction " << Transaction(t, CheckTransaction::None);
m_tq.import(t); m_tq.import(t, TransactionQueue::ImportCallback(), IfDropped::Retry);
} }
} }
@ -525,6 +528,7 @@ void Client::onPostStateChanged()
} }
m_farm.setWork(m_miningInfo); m_farm.setWork(m_miningInfo);
} }
m_remoteWorking = false;
} }
void Client::startMining() void Client::startMining()
@ -561,16 +565,17 @@ void Client::doWork()
// TODO: Use condition variable rather than this rubbish. // TODO: Use condition variable rather than this rubbish.
bool t = true; bool t = true;
if (m_syncTransactionQueue.compare_exchange_strong(t, false))
syncTransactionQueue();
t = true;
if (m_syncBlockQueue.compare_exchange_strong(t, false)) if (m_syncBlockQueue.compare_exchange_strong(t, false))
syncBlockQueue(); syncBlockQueue();
t = true;
if (m_syncTransactionQueue.compare_exchange_strong(t, false) && !m_remoteWorking)
syncTransactionQueue();
tick(); tick();
this_thread::sleep_for(chrono::milliseconds(20)); if (!m_syncBlockQueue && !m_syncTransactionQueue)
this_thread::sleep_for(chrono::milliseconds(20));
} }
void Client::tick() void Client::tick()

1
libethereum/Client.h

@ -277,6 +277,7 @@ private:
mutable SharedMutex x_postMine; ///< Lock on the OverlayDB and other attributes of m_postMine. mutable SharedMutex x_postMine; ///< Lock on the OverlayDB and other attributes of m_postMine.
State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added). State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added).
BlockInfo m_miningInfo; ///< The header we're attempting to mine on (derived from m_postMine). BlockInfo m_miningInfo; ///< The header we're attempting to mine on (derived from m_postMine).
bool m_remoteWorking = false; ///< Is there an acive and valid remote worker?
std::weak_ptr<EthereumHost> m_host; ///< Our Ethereum Host. Don't do anything if we can't lock. std::weak_ptr<EthereumHost> m_host; ///< Our Ethereum Host. Don't do anything if we can't lock.

28
libethereum/EthereumHost.cpp

@ -242,18 +242,28 @@ std::vector<std::shared_ptr<EthereumPeer>> EthereumHost::randomSelection(unsigne
void EthereumHost::maintainBlocks(h256 _currentHash) void EthereumHost::maintainBlocks(h256 _currentHash)
{ {
// Send any new blocks. // Send any new blocks.
if (m_chain.details(m_latestBlockSent).totalDifficulty < m_chain.details(_currentHash).totalDifficulty) auto detailsFrom = m_chain.details(m_latestBlockSent);
auto detailsTo = m_chain.details(_currentHash);
if (detailsFrom.totalDifficulty < detailsTo.totalDifficulty)
{ {
clog(NetMessageSummary) << "Sending a new block (current is" << _currentHash << ", was" << m_latestBlockSent << ")"; if (diff(detailsFrom.number, detailsTo.number) < 20)
for (auto const& p: randomSelection(25, [&](EthereumPeer* p){return !p->m_knownBlocks.count(_currentHash); }))
{ {
RLPStream ts; // don't be sending more than 20 "new" blocks. if there are any more we were probably waaaay behind.
p->prep(ts, NewBlockPacket, 2).appendRaw(m_chain.block(), 1).append(m_chain.details().totalDifficulty); clog(NetMessageSummary) << "Sending a new block (current is" << _currentHash << ", was" << m_latestBlockSent << ")";
h256s blocks = get<0>(m_chain.treeRoute(m_latestBlockSent, _currentHash, false, false, true));
for (auto const& p: randomSelection(25, [&](EthereumPeer* p){return !p->m_knownBlocks.count(_currentHash); }))
for (auto const& b: blocks)
if (!p->m_knownBlocks.count(b))
{
RLPStream ts;
p->prep(ts, NewBlockPacket, 2).appendRaw(m_chain.block(b), 1).append(m_chain.details(b).totalDifficulty);
Guard l(p->x_knownBlocks); Guard l(p->x_knownBlocks);
p->sealAndSend(ts); p->sealAndSend(ts);
p->m_knownBlocks.clear(); p->m_knownBlocks.clear();
}
} }
m_latestBlockSent = _currentHash; m_latestBlockSent = _currentHash;
} }

2
libethereum/EthereumPeer.cpp

@ -142,7 +142,7 @@ void EthereumPeer::transition(Asking _a, bool _force)
clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash.abridged() << ", was" << host()->m_latestBlockSent.abridged() << "]"; clog(NetNote) << "Difficulty of hashchain HIGHER. Grabbing" << m_syncingNeededBlocks.size() << "blocks [latest now" << m_syncingLatestHash.abridged() << ", was" << host()->m_latestBlockSent.abridged() << "]";
host()->m_man.resetToChain(m_syncingNeededBlocks); host()->m_man.resetToChain(m_syncingNeededBlocks);
host()->m_latestBlockSent = m_syncingLatestHash; // host()->m_latestBlockSent = m_syncingLatestHash;
} }
else else
{ {

8
libethereum/TransactionQueue.cpp

@ -28,7 +28,7 @@ using namespace std;
using namespace dev; using namespace dev;
using namespace dev::eth; using namespace dev::eth;
ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallback const& _cb) ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallback const& _cb, IfDropped _ik)
{ {
// Check if we already know this transaction. // Check if we already know this transaction.
h256 h = sha3(_transactionRLP); h256 h = sha3(_transactionRLP);
@ -39,6 +39,9 @@ ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallb
if (m_known.count(h)) if (m_known.count(h))
return ImportResult::AlreadyKnown; return ImportResult::AlreadyKnown;
if (m_dropped.count(h) && _ik == IfDropped::Ignore)
return ImportResult::AlreadyInChain;
try try
{ {
// Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender. // Check validity of _transactionRLP as a transaction. To do this we just deserialise and attempt to determine the sender.
@ -88,7 +91,7 @@ void TransactionQueue::noteGood(std::pair<h256, Transaction> const& _t)
m_unknown.erase(r.first, r.second); m_unknown.erase(r.first, r.second);
} }
void TransactionQueue::drop(h256 _txHash) void TransactionQueue::drop(h256 const& _txHash)
{ {
UpgradableGuard l(m_lock); UpgradableGuard l(m_lock);
@ -96,6 +99,7 @@ void TransactionQueue::drop(h256 _txHash)
return; return;
UpgradeGuard ul(l); UpgradeGuard ul(l);
m_dropped.insert(_txHash);
m_known.erase(_txHash); m_known.erase(_txHash);
if (m_current.count(_txHash)) if (m_current.count(_txHash))

12
libethereum/TransactionQueue.h

@ -22,7 +22,6 @@
#pragma once #pragma once
#include <functional> #include <functional>
#include <boost/thread.hpp>
#include <libdevcore/Common.h> #include <libdevcore/Common.h>
#include <libdevcore/Guards.h> #include <libdevcore/Guards.h>
#include <libdevcore/Log.h> #include <libdevcore/Log.h>
@ -39,6 +38,8 @@ class BlockChain;
struct TransactionQueueChannel: public LogChannel { static const char* name() { return "->Q"; } static const int verbosity = 4; }; struct TransactionQueueChannel: public LogChannel { static const char* name() { return "->Q"; } static const int verbosity = 4; };
#define ctxq dev::LogOutputStream<dev::eth::TransactionQueueChannel, true>() #define ctxq dev::LogOutputStream<dev::eth::TransactionQueueChannel, true>()
enum class IfDropped { Ignore, Retry };
/** /**
* @brief A queue of Transactions, each stored as RLP. * @brief A queue of Transactions, each stored as RLP.
* @threadsafe * @threadsafe
@ -48,10 +49,10 @@ class TransactionQueue
public: public:
using ImportCallback = std::function<void(ImportResult)>; using ImportCallback = std::function<void(ImportResult)>;
ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback()) { return import(&_tx, _cb); } ImportResult import(bytes const& _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _cb, _ik); }
ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback()); ImportResult import(bytesConstRef _tx, ImportCallback const& _cb = ImportCallback(), IfDropped _ik = IfDropped::Ignore);
void drop(h256 _txHash); void drop(h256 const& _txHash);
std::map<h256, Transaction> transactions() const { ReadGuard l(m_lock); return m_current; } std::map<h256, Transaction> transactions() const { ReadGuard l(m_lock); return m_current; }
std::pair<unsigned, unsigned> items() const { ReadGuard l(m_lock); return std::make_pair(m_current.size(), m_unknown.size()); } std::pair<unsigned, unsigned> items() const { ReadGuard l(m_lock); return std::make_pair(m_current.size(), m_unknown.size()); }
@ -63,11 +64,12 @@ public:
template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); } template <class T> Handler onReady(T const& _t) { return m_onReady.add(_t); }
private: private:
mutable boost::shared_mutex m_lock; ///< General lock. mutable SharedMutex m_lock; ///< General lock.
std::set<h256> m_known; ///< Hashes of transactions in both sets. std::set<h256> m_known; ///< Hashes of transactions in both sets.
std::map<h256, Transaction> m_current; ///< Map of SHA3(tx) to tx. std::map<h256, Transaction> m_current; ///< Map of SHA3(tx) to tx.
std::multimap<Address, std::pair<h256, Transaction>> m_unknown; ///< For transactions that have a future nonce; we map their sender address to the tx stuff, and insert once the sender has a valid TX. std::multimap<Address, std::pair<h256, Transaction>> m_unknown; ///< For transactions that have a future nonce; we map their sender address to the tx stuff, and insert once the sender has a valid TX.
std::map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once. std::map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once.
std::set<h256> m_dropped; ///< Transactions that have previously been dropped.
Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. Signal m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast.
}; };

10
libp2p/NodeTable.cpp

@ -384,7 +384,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
// h256 + Signature + type + RLP (smallest possible packet is empty neighbours packet which is 3 bytes) // h256 + Signature + type + RLP (smallest possible packet is empty neighbours packet which is 3 bytes)
if (_packet.size() < h256::size + Signature::size + 1 + 3) if (_packet.size() < h256::size + Signature::size + 1 + 3)
{ {
clog(NodeTableWarn) << "Invalid Message size from " << _from.address().to_string() << ":" << _from.port(); clog(NodeTableWarn) << "Invalid message size from " << _from.address().to_string() << ":" << _from.port();
return; return;
} }
@ -392,7 +392,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
h256 hashSigned(sha3(hashedBytes)); h256 hashSigned(sha3(hashedBytes));
if (!_packet.cropped(0, h256::size).contentsEqual(hashSigned.asBytes())) if (!_packet.cropped(0, h256::size).contentsEqual(hashSigned.asBytes()))
{ {
clog(NodeTableWarn) << "Invalid Message hash from " << _from.address().to_string() << ":" << _from.port(); clog(NodeTableWarn) << "Invalid message hash from " << _from.address().to_string() << ":" << _from.port();
return; return;
} }
@ -404,7 +404,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(signedBytes))); Public nodeid(dev::recover(*(Signature const*)sigBytes.data(), sha3(signedBytes)));
if (!nodeid) if (!nodeid)
{ {
clog(NodeTableWarn) << "Invalid Message signature from " << _from.address().to_string() << ":" << _from.port(); clog(NodeTableWarn) << "Invalid message signature from " << _from.address().to_string() << ":" << _from.port();
return; return;
} }
@ -471,7 +471,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
if (!expected) if (!expected)
{ {
clog(NetConnect) << "Dropping unsolicited Neighbours packet from " << _from.address(); clog(NetConnect) << "Dropping unsolicited neighbours packet from " << _from.address();
break; break;
} }
@ -518,7 +518,7 @@ void NodeTable::onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytes
} }
default: default:
clog(NodeTableWarn) << "Invalid Message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port(); clog(NodeTableWarn) << "Invalid message, " << hex << packetType << ", received from " << _from.address().to_string() << ":" << dec << _from.port();
return; return;
} }

Loading…
Cancel
Save