Browse Source

block queue limiting and other fixes

cl-refactor
arkpar 10 years ago
parent
commit
0a44c7ab4f
  1. 2
      libdevcore/Log.cpp
  2. 4
      libethereum/BlockQueue.cpp
  3. 52
      libethereum/EthereumHost.cpp
  4. 3
      libethereum/EthereumHost.h
  5. 6
      libethereum/EthereumPeer.cpp
  6. 2
      libethereum/State.cpp

2
libdevcore/Log.cpp

@ -40,7 +40,7 @@ mutex x_logOverride;
/// or equal to the currently output verbosity (g_logVerbosity). /// or equal to the currently output verbosity (g_logVerbosity).
static map<type_info const*, bool> s_logOverride; static map<type_info const*, bool> s_logOverride;
bool isChannelVisible(std::type_info const* _ch, bool _default) bool dev::isChannelVisible(std::type_info const* _ch, bool _default)
{ {
Guard l(x_logOverride); Guard l(x_logOverride);
if (s_logOverride.count(_ch)) if (s_logOverride.count(_ch))

4
libethereum/BlockQueue.cpp

@ -38,9 +38,9 @@ const char* BlockQueueChannel::name() { return EthOrange "▣┅▶"; }
#endif #endif
size_t const c_maxKnownCount = 100000; ///< M size_t const c_maxKnownCount = 100000; ///< M
size_t const c_maxKnownSize = 64 * 1024 * 1024; size_t const c_maxKnownSize = 128 * 1024 * 1024;
size_t const c_maxUnknownCount = 100000; size_t const c_maxUnknownCount = 100000;
size_t const c_maxUnknownSize = 64 * 1024 * 1024; size_t const c_maxUnknownSize = 128 * 1024 * 1024;
BlockQueue::BlockQueue(): BlockQueue::BlockQueue():
m_unknownSize(0), m_unknownSize(0),

52
libethereum/EthereumHost.cpp

@ -39,6 +39,7 @@ using namespace dev::eth;
using namespace p2p; using namespace p2p;
unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common unsigned const EthereumHost::c_oldProtocolVersion = 60; //TODO: remove this once v61+ is common
unsigned const c_chainReorgSize = 30000;
EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId): EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQueue& _bq, u256 _networkId):
HostCapability<EthereumPeer>(), HostCapability<EthereumPeer>(),
@ -50,6 +51,7 @@ EthereumHost::EthereumHost(BlockChain const& _ch, TransactionQueue& _tq, BlockQu
{ {
m_latestBlockSent = _ch.currentHash(); m_latestBlockSent = _ch.currentHash();
m_hashMan.reset(m_chain.number() + 1); m_hashMan.reset(m_chain.number() + 1);
m_bqRoomAvailable = m_bq.onRoomAvailable([this](){ this->continueSync(); });
} }
EthereumHost::~EthereumHost() EthereumHost::~EthereumHost()
@ -250,37 +252,43 @@ void EthereumHost::onPeerStatus(EthereumPeer* _peer)
_peer->disable("Peer banned for previous bad behaviour."); _peer->disable("Peer banned for previous bad behaviour.");
else else
{ {
if (_peer->m_protocolVersion != protocolVersion()) unsigned estimatedHashes = estimateHashes();
estimatePeerHashes(_peer); if (_peer->m_protocolVersion == protocolVersion())
else
{ {
if (_peer->m_latestBlockNumber > m_chain.number()) if (_peer->m_latestBlockNumber > m_chain.number())
_peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number(); _peer->m_expectedHashes = (unsigned)_peer->m_latestBlockNumber - m_chain.number();
if (m_hashMan.chainSize() < _peer->m_expectedHashes) if (_peer->m_expectedHashes > estimatedHashes)
_peer->disable("Too many hashes");
else if (m_hashMan.chainSize() < _peer->m_expectedHashes)
m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes); m_hashMan.resetToRange(m_chain.number() + 1, _peer->m_expectedHashes);
} }
else
_peer->m_expectedHashes = estimatedHashes;
continueSync(_peer); continueSync(_peer);
} }
} }
void EthereumHost::estimatePeerHashes(EthereumPeer* _peer) unsigned EthereumHost::estimateHashes()
{ {
BlockInfo block = m_chain.info(); BlockInfo block = m_chain.info();
time_t lastBlockTime = (block.hash() == m_chain.genesisHash()) ? 1428192000 : (time_t)block.timestamp; time_t lastBlockTime = (block.hash() == m_chain.genesisHash()) ? 1428192000 : (time_t)block.timestamp;
time_t now = time(0); time_t now = time(0);
unsigned blockCount = 30000; unsigned blockCount = c_chainReorgSize;
if (lastBlockTime > now) if (lastBlockTime > now)
clog(NetWarn) << "Clock skew? Latest block is in the future"; clog(NetWarn) << "Clock skew? Latest block is in the future";
else else
blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit; blockCount += (now - lastBlockTime) / (unsigned)c_durationLimit;
clog(NetAllDetail) << "Estimated hashes: " << blockCount; clog(NetAllDetail) << "Estimated hashes: " << blockCount;
_peer->m_expectedHashes = blockCount; return blockCount;
} }
void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes)
{ {
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
assert(_peer->m_asking == Asking::Nothing); if (_peer->m_syncHashNumber > 0)
_peer->m_syncHashNumber += _hashes.size();
_peer->setAsking(Asking::Nothing);
onPeerHashes(_peer, _hashes, false); onPeerHashes(_peer, _hashes, false);
} }
@ -392,7 +400,7 @@ void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain)
void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
{ {
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
assert(_peer->m_asking == Asking::Nothing); _peer->setAsking(Asking::Nothing);
unsigned itemCount = _r.itemCount(); unsigned itemCount = _r.itemCount();
clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks"); clog(NetMessageSummary) << "Blocks (" << dec << itemCount << "entries)" << (itemCount ? "" : ": NoMoreBlocks");
@ -594,8 +602,14 @@ void EthereumHost::continueSync(EthereumPeer* _peer)
assert(_peer->m_asking == Asking::Nothing); assert(_peer->m_asking == Asking::Nothing);
bool otherPeerV60Sync = false; bool otherPeerV60Sync = false;
bool otherPeerV61Sync = false; bool otherPeerV61Sync = false;
if (m_needSyncHashes && peerShouldGrabChain(_peer)) if (m_needSyncHashes)
{
if (!peerShouldGrabChain(_peer))
{ {
_peer->setIdle();
return;
}
foreachPeer([&](EthereumPeer* _p) foreachPeer([&](EthereumPeer* _p)
{ {
if (_p != _peer && _p->m_asking == Asking::Hashes) if (_p != _peer && _p->m_asking == Asking::Hashes)
@ -642,7 +656,23 @@ void EthereumHost::continueSync(EthereumPeer* _peer)
} }
} }
else if (m_needSyncBlocks && peerCanHelp(_peer)) // Check if this peer can help with downloading blocks else if (m_needSyncBlocks && peerCanHelp(_peer)) // Check if this peer can help with downloading blocks
{
// Check block queue status
if (m_bq.unknownFull())
{
clog(NetWarn) << "Too many unknown blocks, restarting sync";
m_bq.clear();
reset();
continueSync();
}
else if (m_bq.knownFull())
{
clog(NetAllDetail) << "Waiting for block queue before downloading blocks";
_peer->setIdle();
}
else
_peer->requestBlocks(); _peer->requestBlocks();
}
else else
_peer->setIdle(); _peer->setIdle();
} }
@ -709,6 +739,6 @@ HashChainStatus EthereumHost::status()
RecursiveGuard l(x_sync); RecursiveGuard l(x_sync);
if (m_syncingV61) if (m_syncingV61)
return HashChainStatus { static_cast<unsigned>(m_hashMan.chainSize()), static_cast<unsigned>(m_hashMan.gotCount()), false }; return HashChainStatus { static_cast<unsigned>(m_hashMan.chainSize()), static_cast<unsigned>(m_hashMan.gotCount()), false };
return HashChainStatus { m_estimatedHashes - 30000, static_cast<unsigned>(m_hashes.size()), true }; return HashChainStatus { m_estimatedHashes > 0 ? m_estimatedHashes - c_chainReorgSize : 0, static_cast<unsigned>(m_hashes.size()), m_estimatedHashes > 0 };
} }

3
libethereum/EthereumHost.h

@ -125,11 +125,12 @@ private:
bool peerShouldGrabBlocks(EthereumPeer* _peer) const; bool peerShouldGrabBlocks(EthereumPeer* _peer) const;
bool peerShouldGrabChain(EthereumPeer* _peer) const; bool peerShouldGrabChain(EthereumPeer* _peer) const;
bool peerCanHelp(EthereumPeer* _peer) const; bool peerCanHelp(EthereumPeer* _peer) const;
void estimatePeerHashes(EthereumPeer* _peer); unsigned estimateHashes();
BlockChain const& m_chain; BlockChain const& m_chain;
TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain. TransactionQueue& m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
BlockQueue& m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported). BlockQueue& m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported).
Handler m_bqRoomAvailable;
u256 m_networkId; u256 m_networkId;

6
libethereum/EthereumPeer.cpp

@ -265,13 +265,10 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
clog(NetWarn) << "Peer giving us hashes when we didn't ask for them."; clog(NetWarn) << "Peer giving us hashes when we didn't ask for them.";
break; break;
} }
setAsking(Asking::Nothing);
h256s hashes(itemCount); h256s hashes(itemCount);
for (unsigned i = 0; i < itemCount; ++i) for (unsigned i = 0; i < itemCount; ++i)
hashes[i] = _r[i].toHash<h256>(); hashes[i] = _r[i].toHash<h256>();
if (m_syncHashNumber > 0)
m_syncHashNumber += itemCount;
host()->onPeerHashes(this, hashes); host()->onPeerHashes(this, hashes);
break; break;
} }
@ -314,10 +311,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
if (m_asking != Asking::Blocks) if (m_asking != Asking::Blocks)
clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them."; clog(NetImpolite) << "Peer giving us blocks when we didn't ask for them.";
else else
{
setAsking(Asking::Nothing);
host()->onPeerBlocks(this, _r); host()->onPeerBlocks(this, _r);
}
break; break;
} }
case NewBlockPacket: case NewBlockPacket:

2
libethereum/State.cpp

@ -777,6 +777,7 @@ void State::cleanup(bool _fullCommit)
paranoia("immediately before database commit", true); paranoia("immediately before database commit", true);
// Commit the new trie to disk. // Commit the new trie to disk.
if (isChannelVisible<StateTrace>()) // Avoid calling toHex if not needed
clog(StateTrace) << "Committing to disk: stateRoot" << m_currentBlock.stateRoot << "=" << rootHash() << "=" << toHex(asBytes(m_db.lookup(rootHash()))); clog(StateTrace) << "Committing to disk: stateRoot" << m_currentBlock.stateRoot << "=" << rootHash() << "=" << toHex(asBytes(m_db.lookup(rootHash())));
try { try {
@ -790,6 +791,7 @@ void State::cleanup(bool _fullCommit)
} }
m_db.commit(); m_db.commit();
if (isChannelVisible<StateTrace>()) // Avoid calling toHex if not needed
clog(StateTrace) << "Committed: stateRoot" << m_currentBlock.stateRoot << "=" << rootHash() << "=" << toHex(asBytes(m_db.lookup(rootHash()))); clog(StateTrace) << "Committed: stateRoot" << m_currentBlock.stateRoot << "=" << rootHash() << "=" << toHex(asBytes(m_db.lookup(rootHash())));
paranoia("immediately after database commit", true); paranoia("immediately after database commit", true);

Loading…
Cancel
Save