Browse Source

fixed bad blocks handling in BlockQueue

cl-refactor
arkpar 10 years ago
parent
commit
f32961daa8
  1. 2
      libdevcore/Common.h
  2. 122
      libethereum/BlockQueue.cpp
  3. 12
      libethereum/BlockQueue.h

2
libdevcore/Common.h

@ -181,7 +181,7 @@ private:
/// Scope guard for invariant check in a class derived from HasInvariants. /// Scope guard for invariant check in a class derived from HasInvariants.
#if ETH_DEBUG #if ETH_DEBUG
#define DEV_INVARIANT_CHECK ::dev::InvariantChecker __dev_invariantCheck(this) #define DEV_INVARIANT_CHECK { ::dev::InvariantChecker __dev_invariantCheck(this); }
#else #else
#define DEV_INVARIANT_CHECK (void)0; #define DEV_INVARIANT_CHECK (void)0;
#endif #endif

122
libethereum/BlockQueue.cpp

@ -37,10 +37,10 @@ const char* BlockQueueChannel::name() { return EthOrange "[]>"; }
const char* BlockQueueChannel::name() { return EthOrange "▣┅▶"; } const char* BlockQueueChannel::name() { return EthOrange "▣┅▶"; }
#endif #endif
size_t const c_maxKnownCount = 100000; ///< M size_t const c_maxKnownCount = 100000;
size_t const c_maxKnownSize = 128 * 1024 * 1024; size_t const c_maxKnownSize = 128 * 1024 * 1024;
size_t const c_maxUnknownCount = 100000; size_t const c_maxUnknownCount = 100000;
size_t const c_maxUnknownSize = 128 * 1024 * 1024; size_t const c_maxUnknownSize = 512 * 1024 * 1024; // Block size can be ~50kb
BlockQueue::BlockQueue(): BlockQueue::BlockQueue():
m_unknownSize(0), m_unknownSize(0),
@ -87,7 +87,7 @@ void BlockQueue::verifierBody()
{ {
while (!m_deleting) while (!m_deleting)
{ {
std::pair<h256, bytes> work; UnverifiedBlock work;
{ {
unique_lock<Mutex> l(m_verification); unique_lock<Mutex> l(m_verification);
@ -97,12 +97,13 @@ void BlockQueue::verifierBody()
swap(work, m_unverified.front()); swap(work, m_unverified.front());
m_unverified.pop_front(); m_unverified.pop_front();
BlockInfo bi; BlockInfo bi;
bi.mixHash = work.first; bi.mixHash = work.hash;
bi.parentHash = work.parentHash;
m_verifying.push_back(VerifiedBlock { VerifiedBlockRef { bytesConstRef(), move(bi), Transactions() }, bytes() }); m_verifying.push_back(VerifiedBlock { VerifiedBlockRef { bytesConstRef(), move(bi), Transactions() }, bytes() });
} }
VerifiedBlock res; VerifiedBlock res;
swap(work.second, res.blockData); swap(work.block, res.blockData);
try try
{ {
res.verified = BlockChain::verifyBlock(res.blockData, m_onBad); res.verified = BlockChain::verifyBlock(res.blockData, m_onBad);
@ -114,13 +115,13 @@ void BlockQueue::verifierBody()
// has to be this order as that's how invariants() assumes. // has to be this order as that's how invariants() assumes.
WriteGuard l2(m_lock); WriteGuard l2(m_lock);
unique_lock<Mutex> l(m_verification); unique_lock<Mutex> l(m_verification);
m_readySet.erase(work.first); m_readySet.erase(work.hash);
m_knownBad.insert(work.first); m_knownBad.insert(work.hash);
} }
unique_lock<Mutex> l(m_verification); unique_lock<Mutex> l(m_verification);
for (auto it = m_verifying.begin(); it != m_verifying.end(); ++it) for (auto it = m_verifying.begin(); it != m_verifying.end(); ++it)
if (it->verified.info.mixHash == work.first) if (it->verified.info.mixHash == work.hash)
{ {
m_verifying.erase(it); m_verifying.erase(it);
goto OK1; goto OK1;
@ -132,12 +133,13 @@ void BlockQueue::verifierBody()
bool ready = false; bool ready = false;
{ {
WriteGuard l2(m_lock);
unique_lock<Mutex> l(m_verification); unique_lock<Mutex> l(m_verification);
if (!m_verifying.empty() && m_verifying.front().verified.info.mixHash == work.first) if (!m_verifying.empty() && m_verifying.front().verified.info.mixHash == work.hash)
{ {
// we're next! // we're next!
m_verifying.pop_front(); m_verifying.pop_front();
if (m_knownBad.count(res.verified.info.hash())) if (m_knownBad.count(res.verified.info.parentHash))
{ {
m_readySet.erase(res.verified.info.hash()); m_readySet.erase(res.verified.info.hash());
m_knownBad.insert(res.verified.info.hash()); m_knownBad.insert(res.verified.info.hash());
@ -146,7 +148,7 @@ void BlockQueue::verifierBody()
m_verified.push_back(move(res)); m_verified.push_back(move(res));
while (m_verifying.size() && !m_verifying.front().blockData.empty()) while (m_verifying.size() && !m_verifying.front().blockData.empty())
{ {
if (m_knownBad.count(m_verifying.front().verified.info.hash())) if (m_knownBad.count(m_verifying.front().verified.info.parentHash))
{ {
m_readySet.erase(m_verifying.front().verified.info.hash()); m_readySet.erase(m_verifying.front().verified.info.hash());
m_knownBad.insert(res.verified.info.hash()); m_knownBad.insert(res.verified.info.hash());
@ -160,7 +162,7 @@ void BlockQueue::verifierBody()
else else
{ {
for (auto& i: m_verifying) for (auto& i: m_verifying)
if (i.verified.info.mixHash == work.first) if (i.verified.info.mixHash == work.hash)
{ {
i = move(res); i = move(res);
goto OK; goto OK;
@ -235,6 +237,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
if (m_knownBad.count(bi.parentHash)) if (m_knownBad.count(bi.parentHash))
{ {
m_knownBad.insert(bi.hash()); m_knownBad.insert(bi.hash());
updateBad(bi.hash());
// bad parent; this is bad too, note it as such // bad parent; this is bad too, note it as such
return ImportResult::BadChain; return ImportResult::BadChain;
} }
@ -254,7 +257,7 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
// If valid, append to blocks. // If valid, append to blocks.
cblockq << "OK - ready for chain insertion."; cblockq << "OK - ready for chain insertion.";
DEV_GUARDED(m_verification) DEV_GUARDED(m_verification)
m_unverified.push_back(make_pair(h, _block.toBytes())); m_unverified.push_back(UnverifiedBlock { h, bi.parentHash, _block.toBytes() });
m_moreToVerify.notify_one(); m_moreToVerify.notify_one();
m_readySet.insert(h); m_readySet.insert(h);
m_knownSize += _block.size(); m_knownSize += _block.size();
@ -267,39 +270,93 @@ ImportResult BlockQueue::import(bytesConstRef _block, BlockChain const& _bc, boo
} }
} }
bool BlockQueue::doneDrain(h256s const& _bad) void BlockQueue::updateBad(h256 const& _bad)
{ {
WriteGuard l(m_lock);
DEV_INVARIANT_CHECK; DEV_INVARIANT_CHECK;
m_drainingSet.clear(); DEV_GUARDED(m_verification)
if (_bad.size())
{ {
// at least one of them was bad. collectUnknownBad(_bad);
m_knownBad += _bad; bool moreBad = true;
DEV_GUARDED(m_verification) while (moreBad)
{ {
moreBad = false;
std::vector<VerifiedBlock> oldVerified; std::vector<VerifiedBlock> oldVerified;
swap(m_verified, oldVerified); swap(m_verified, oldVerified);
for (auto& b: oldVerified) for (auto& b: oldVerified)
if (m_knownBad.count(b.verified.info.parentHash)) if (m_knownBad.count(b.verified.info.parentHash) || m_knownBad.count(b.verified.info.hash()))
{ {
m_knownBad.insert(b.verified.info.hash()); m_knownBad.insert(b.verified.info.hash());
m_readySet.erase(b.verified.info.hash()); m_readySet.erase(b.verified.info.hash());
collectUnknownBad(b.verified.info.hash());
moreBad = true;
} }
else else
m_verified.push_back(std::move(b)); m_verified.push_back(std::move(b));
std::deque<UnverifiedBlock> oldUnverified;
swap(m_unverified, oldUnverified);
for (auto& b: oldUnverified)
if (m_knownBad.count(b.parentHash) || m_knownBad.count(b.hash))
{
m_knownBad.insert(b.hash);
m_readySet.erase(b.hash);
collectUnknownBad(b.hash);
moreBad = true;
}
else
m_unverified.push_back(std::move(b));
std::deque<VerifiedBlock> oldVerifying;
swap(m_verifying, oldVerifying);
for (auto& b: oldVerifying)
if (m_knownBad.count(b.verified.info.parentHash) || m_knownBad.count(b.verified.info.mixHash))
{
h256 const& h = b.blockData.size() != 0 ? b.verified.info.hash() : b.verified.info.mixHash;
m_knownBad.insert(h);
m_readySet.erase(h);
collectUnknownBad(h);
moreBad = true;
}
else
m_verifying.push_back(std::move(b));
} }
} }
/* DEV_GUARDED(m_verification) DEV_INVARIANT_CHECK;
}
void BlockQueue::collectUnknownBad(h256 const& _bad)
{
list<h256> badQueue(1, _bad);
while (!badQueue.empty())
{
auto r = m_unknown.equal_range(badQueue.front());
badQueue.pop_front();
for (auto it = r.first; it != r.second; ++it)
{ {
m_knownBad += _bad; m_unknownSize -= it->second.second.size();
m_knownBad += m_readySet; m_unknownCount--;
m_readySet.clear(); auto newBad = it->second.first;
m_verified.clear(); m_unknownSet.erase(newBad);
m_verifying.clear(); m_knownBad.insert(newBad);
m_unverified.clear(); badQueue.push_back(newBad);
}*/ }
return !m_readySet.empty(); m_unknown.erase(r.first, r.second);
}
}
bool BlockQueue::doneDrain(h256s const& _bad)
{
WriteGuard l(m_lock);
DEV_INVARIANT_CHECK;
m_drainingSet.clear();
if (_bad.size())
{
// at least one of them was bad.
m_knownBad += _bad;
for (h256 const& b : _bad)
updateBad(b);
} return !m_readySet.empty();
} }
void BlockQueue::tick(BlockChain const& _bc) void BlockQueue::tick(BlockChain const& _bc)
@ -416,7 +473,7 @@ void BlockQueue::noteReady_WITH_LOCK(h256 const& _good)
for (auto it = r.first; it != r.second; ++it) for (auto it = r.first; it != r.second; ++it)
{ {
DEV_GUARDED(m_verification) DEV_GUARDED(m_verification)
m_unverified.push_back(it->second); m_unverified.push_back(UnverifiedBlock { it->second.first, it->first, it->second.second });
m_knownSize += it->second.second.size(); m_knownSize += it->second.second.size();
m_knownCount++; m_knownCount++;
m_unknownSize -= it->second.second.size(); m_unknownSize -= it->second.second.size();
@ -431,6 +488,7 @@ void BlockQueue::noteReady_WITH_LOCK(h256 const& _good)
} }
if (notify) if (notify)
m_moreToVerify.notify_all(); m_moreToVerify.notify_all();
DEV_INVARIANT_CHECK;
} }
void BlockQueue::retryAllUnknown() void BlockQueue::retryAllUnknown()
@ -440,7 +498,7 @@ void BlockQueue::retryAllUnknown()
for (auto it = m_unknown.begin(); it != m_unknown.end(); ++it) for (auto it = m_unknown.begin(); it != m_unknown.end(); ++it)
{ {
DEV_GUARDED(m_verification) DEV_GUARDED(m_verification)
m_unverified.push_back(it->second); m_unverified.push_back(UnverifiedBlock { it->second.first, it->first, it->second.second });
auto newReady = it->second.first; auto newReady = it->second.first;
m_unknownSet.erase(newReady); m_unknownSet.erase(newReady);
m_readySet.insert(newReady); m_readySet.insert(newReady);

12
libethereum/BlockQueue.h

@ -119,11 +119,21 @@ public:
bool unknownFull() const; bool unknownFull() const;
private: private:
struct UnverifiedBlock
{
h256 hash;
h256 parentHash;
bytes block;
};
void noteReady_WITH_LOCK(h256 const& _b); void noteReady_WITH_LOCK(h256 const& _b);
bool invariants() const override; bool invariants() const override;
void verifierBody(); void verifierBody();
void collectUnknownBad(h256 const& _bad);
void updateBad(h256 const& _bad);
mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown. mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown.
h256Hash m_drainingSet; ///< All blocks being imported. h256Hash m_drainingSet; ///< All blocks being imported.
@ -139,7 +149,7 @@ private:
std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry. std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry.
std::vector<VerifiedBlock> m_verified; ///< List of blocks, in correct order, verified and ready for chain-import. std::vector<VerifiedBlock> m_verified; ///< List of blocks, in correct order, verified and ready for chain-import.
std::deque<VerifiedBlock> m_verifying; ///< List of blocks being verified; as long as the block component (bytes) is empty, it's not finished. std::deque<VerifiedBlock> m_verifying; ///< List of blocks being verified; as long as the block component (bytes) is empty, it's not finished.
std::deque<std::pair<h256, bytes>> m_unverified; ///< List of blocks, in correct order, ready for verification. std::deque<UnverifiedBlock> m_unverified; ///< List of <block hash, parent hash, block data> in correct order, ready for verification.
std::vector<std::thread> m_verifiers; ///< Threads who only verify. std::vector<std::thread> m_verifiers; ///< Threads who only verify.
bool m_deleting = false; ///< Exit condition for verifiers. bool m_deleting = false; ///< Exit condition for verifiers.

Loading…
Cancel
Save