Browse Source

Avoid attempting to import invalid blocks.

Reduce verbosity.
Minor additions to exponential backoff.
cl-refactor
Gav Wood 10 years ago
parent
commit
c5cb5aeed4
  1. 6
      libethereum/BlockChain.cpp
  2. 2
      libethereum/BlockChain.h
  3. 11
      libethereum/BlockQueue.cpp
  4. 1
      libethereum/BlockQueue.h
  5. 15
      libethereum/Client.cpp
  6. 3
      libethereum/EthereumPeer.cpp
  7. 2
      libethereum/State.h
  8. 3
      libethereum/TransactionQueue.cpp
  9. 3
      libethereum/TransactionQueue.h
  10. 3
      libp2p/Host.cpp
  11. 19
      libp2p/Session.cpp
  12. 7
      libp2p/Session.h

6
libethereum/BlockChain.cpp

@ -305,7 +305,7 @@ LastHashes BlockChain::lastHashes(unsigned _n) const
return m_lastLastHashes;
}
tuple<ImportRoute, bool> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max)
tuple<ImportRoute, bool, unsigned> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max)
{
// _bq.tick(*this);
@ -315,6 +315,7 @@ tuple<ImportRoute, bool> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _sta
h256s fresh;
h256s dead;
h256s badBlocks;
unsigned count = 0;
for (VerifiedBlock const& block: blocks)
if (!badBlocks.empty())
badBlocks.push_back(block.verified.info.hash());
@ -328,6 +329,7 @@ tuple<ImportRoute, bool> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _sta
r = import(block.verified, _stateDB, ImportRequirements::Default & ~ImportRequirements::ValidNonce & ~ImportRequirements::CheckUncles);
fresh += r.liveBlocks;
dead += r.deadBlocks;
++count;
}
catch (dev::eth::UnknownParent)
{
@ -353,7 +355,7 @@ tuple<ImportRoute, bool> BlockChain::sync(BlockQueue& _bq, OverlayDB const& _sta
badBlocks.push_back(block.verified.info.hash());
}
}
return make_tuple(ImportRoute{dead, fresh}, _bq.doneDrain(badBlocks));
return make_tuple(ImportRoute{dead, fresh}, _bq.doneDrain(badBlocks), count);
}
pair<ImportResult, ImportRoute> BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB, ImportRequirements::value _ir) noexcept

2
libethereum/BlockChain.h

@ -117,7 +117,7 @@ public:
/// Sync the chain with any incoming blocks. All blocks should, if processed in order.
/// @returns fresh blocks, dead blocks and true iff there are additional blocks to be processed waiting.
std::tuple<ImportRoute, bool> sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max);
std::tuple<ImportRoute, bool, unsigned> sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max);
/// Attempt to import the given block directly into the CanonBlockChain and sync with the state DB.
/// @returns the block hashes of any blocks that came into/went out of the canonical block chain.

11
libethereum/BlockQueue.cpp

@ -36,6 +36,7 @@ const char* BlockQueueChannel::name() { return EthOrange "[]>"; }
#else
const char* BlockQueueChannel::name() { return EthOrange "▣┅▶"; }
#endif
const char* BlockQueueTraceChannel::name() { return EthOrange "▣ ▶"; }
size_t const c_maxKnownCount = 100000;
size_t const c_maxKnownSize = 128 * 1024 * 1024;
@ -183,14 +184,14 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
// Check if we already know this block.
h256 h = BlockInfo::headerHash(_block);
cblockq << "Queuing block" << h << "for import...";
clog(BlockQueueTraceChannel) << "Queuing block" << h << "for import...";
UpgradableGuard l(m_lock);
if (m_readySet.count(h) || m_drainingSet.count(h) || m_unknownSet.count(h) || m_knownBad.count(h))
{
// Already know about this one.
cblockq << "Already known.";
clog(BlockQueueTraceChannel) << "Already known.";
return ImportResult::AlreadyKnown;
}
@ -228,7 +229,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
time_t bit = (unsigned)bi.timestamp;
if (strftime(buf, 24, "%X", localtime(&bit)) == 0)
buf[0] = '\0'; // empty if case strftime fails
cblockq << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf;
clog(BlockQueueTraceChannel) << "OK - queued for future [" << bi.timestamp << "vs" << time(0) << "] - will wait until" << buf;
m_unknownSize += _block.size();
m_unknownCount++;
m_difficulty += bi.difficulty;
@ -248,7 +249,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
else if (!m_readySet.count(bi.parentHash) && !m_drainingSet.count(bi.parentHash) && !_bc.isKnown(bi.parentHash))
{
// We don't know the parent (yet) - queue it up for later. It'll get resent to us if we find out about its ancestry later on.
cblockq << "OK - queued as unknown parent:" << bi.parentHash;
clog(BlockQueueTraceChannel) << "OK - queued as unknown parent:" << bi.parentHash;
m_unknown.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes())));
m_unknownSet.insert(h);
m_unknownSize += _block.size();
@ -260,7 +261,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
else
{
// If valid, append to blocks.
cblockq << "OK - ready for chain insertion.";
clog(BlockQueueTraceChannel) << "OK - ready for chain insertion.";
DEV_GUARDED(m_verification)
m_unverified.push_back(UnverifiedBlock { h, bi.parentHash, _block.toBytes() });
m_moreToVerify.notify_one();

1
libethereum/BlockQueue.h

@ -42,6 +42,7 @@ namespace eth
class BlockChain;
struct BlockQueueChannel: public LogChannel { static const char* name(); static const int verbosity = 4; };
struct BlockQueueTraceChannel: public LogChannel { static const char* name(); static const int verbosity = 7; };
#define cblockq dev::LogOutputStream<dev::eth::BlockQueueChannel, true>()
struct BlockQueueStatus

15
libethereum/Client.cpp

@ -612,22 +612,23 @@ bool Client::submitWork(ProofOfWork::Solution const& _solution)
}
unsigned static const c_syncMin = 1;
unsigned static const c_syncMax = 100;
unsigned static const c_syncMax = 1000;
double static const c_targetDuration = 1;
void Client::syncBlockQueue()
{
ImportRoute ir;
cwork << "BQ ==> CHAIN ==> STATE";
ImportRoute ir;
unsigned count;
boost::timer t;
tie(ir, m_syncBlockQueue) = m_bc.sync(m_bq, m_stateDB, m_syncAmount);
tie(ir, m_syncBlockQueue, count) = m_bc.sync(m_bq, m_stateDB, m_syncAmount);
double elapsed = t.elapsed();
cnote << m_syncAmount << "blocks imported in" << unsigned(elapsed * 1000) << "ms (" << (m_syncAmount / elapsed) << "blocks/s)";
cnote << count << "blocks imported in" << unsigned(elapsed * 1000) << "ms (" << (count / elapsed) << "blocks/s)";
if (elapsed > c_targetDuration * 1.1 && m_syncAmount > c_syncMin)
m_syncAmount = max(c_syncMin, m_syncAmount * 9 / 10);
else if (elapsed < c_targetDuration * 0.9 && m_syncAmount < c_syncMax)
if (elapsed > c_targetDuration * 1.1 && count > c_syncMin)
m_syncAmount = max(c_syncMin, count * 9 / 10);
else if (count == m_syncAmount && elapsed < c_targetDuration * 0.9 && m_syncAmount < c_syncMax)
m_syncAmount = min(c_syncMax, m_syncAmount * 11 / 10 + 1);
if (ir.liveBlocks.empty())
return;

3
libethereum/EthereumPeer.cpp

@ -76,6 +76,9 @@ bool EthereumPeer::isRude() const
unsigned EthereumPeer::askOverride() const
{
std::string static const badGeth = "Geth/v0.9.27";
if (session()->info().clientVersion.substr(0, badGeth.size()) == badGeth)
return 1;
bytes const& d = repMan().data(*session(), name());
return d.empty() ? c_maxBlocksAsk : RLP(d).toInt<unsigned>(RLP::LaisezFaire);
}

2
libethereum/State.h

@ -206,6 +206,8 @@ public:
return false;
PoW::assignResult(_result, m_currentBlock);
if (!PoW::verify(m_currentBlock))
return false;
cnote << "Completed" << m_currentBlock.headerHash(WithoutNonce) << m_currentBlock.nonce << m_currentBlock.difficulty << PoW::verify(m_currentBlock);

3
libethereum/TransactionQueue.cpp

@ -29,6 +29,7 @@ using namespace dev;
using namespace dev::eth;
const char* TransactionQueueChannel::name() { return EthCyan "┉┅▶"; }
const char* TransactionQueueTraceChannel::name() { return EthCyan " ┅▶"; }
ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, ImportCallback const& _cb, IfDropped _ik)
{
@ -115,7 +116,7 @@ ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transactio
m_known.insert(_h);
if (_cb)
m_callbacks[_h] = _cb;
ctxq << "Queued vaguely legit-looking transaction" << _h;
clog(TransactionQueueTraceChannel) << "Queued vaguely legit-looking transaction" << _h;
m_onReady();
}
catch (Exception const& _e)

3
libethereum/TransactionQueue.h

@ -36,7 +36,8 @@ namespace eth
class BlockChain;
struct TransactionQueueChannel: public LogChannel { static const char* name(); static const int verbosity = 4; };
#define ctxq dev::LogOutputStream<dev::eth::TransactionQueueChannel, true>()
struct TransactionQueueTraceChannel: public LogChannel { static const char* name(); static const int verbosity = 7; };
#define ctxq dev::LogOutputStream<dev::eth::TransactionQueueTraceChannel, true>()
enum class IfDropped { Ignore, Retry };

3
libp2p/Host.cpp

@ -580,7 +580,8 @@ PeerSessionInfos Host::peerSessionInfo() const
for (auto& i: m_sessions)
if (auto j = i.second.lock())
if (j->isConnected())
ret.push_back(j->m_info);
DEV_GUARDED(j->x_info)
ret.push_back(j->m_info);
return ret;
}

19
libp2p/Session.cpp

@ -44,7 +44,8 @@ Session::Session(Host* _h, RLPXFrameCoder* _io, std::shared_ptr<RLPXSocket> cons
{
m_peer->m_lastDisconnect = NoDisconnect;
m_lastReceived = m_connect = chrono::steady_clock::now();
m_info.socketId = m_socket->ref().native_handle();
DEV_GUARDED(x_info)
m_info.socketId = m_socket->ref().native_handle();
}
Session::~Session()
@ -187,9 +188,12 @@ bool Session::interpret(PacketType _t, RLP const& _r)
break;
}
case PongPacket:
m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
{
DEV_GUARDED(x_info)
m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
break;
}
case GetPeersPacket:
// Disabled for interop testing.
// GetPeers/PeersPacket will be modified to only exchange new nodes which it's peers are interested in.
@ -382,11 +386,12 @@ void Session::drop(DisconnectReason _reason)
void Session::disconnect(DisconnectReason _reason)
{
clog(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")";
StructuredLogger::p2pDisconnected(
m_info.id.abridged(),
m_peer->endpoint, // TODO: may not be 100% accurate
m_server->peerCount()
);
DEV_GUARDED(x_info)
StructuredLogger::p2pDisconnected(
m_info.id.abridged(),
m_peer->endpoint, // TODO: may not be 100% accurate
m_server->peerCount()
);
if (m_socket->ref().is_open())
{
RLPStream s;

7
libp2p/Session.h

@ -67,7 +67,7 @@ public:
bool isConnected() const { return m_socket->ref().is_open(); }
NodeId id() const;
unsigned socketId() const { return m_info.socketId; }
unsigned socketId() const { Guard l(x_info); return m_info.socketId; }
template <class PeerCap>
std::shared_ptr<PeerCap> cap() const { try { return std::static_pointer_cast<PeerCap>(m_capabilities.at(std::make_pair(PeerCap::name(), PeerCap::version()))); } catch (...) { return nullptr; } }
@ -81,9 +81,9 @@ public:
int rating() const;
void addRating(int _r);
void addNote(std::string const& _k, std::string const& _v) { m_info.notes[_k] = _v; }
void addNote(std::string const& _k, std::string const& _v) { Guard l(x_info); m_info.notes[_k] = _v; }
PeerSessionInfo const& info() const { return m_info; }
PeerSessionInfo info() const { Guard l(x_info); return m_info; }
void ensureNodesRequested();
void serviceNodesRequest();
@ -119,6 +119,7 @@ private:
std::shared_ptr<Peer> m_peer; ///< The Peer object.
bool m_dropped = false; ///< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.
mutable Mutex x_info;
PeerSessionInfo m_info; ///< Dynamic information about this peer.
bool m_theyRequestedNodes = false; ///< Has the peer requested nodes from us without receiveing an answer from us?

Loading…
Cancel
Save