Browse Source

Merge pull request #2048 from arkpar/bc

PV61 hash downloading fixes
cl-refactor
Gav Wood 10 years ago
parent
commit
c8856355db
  1. 90
      libethereum/EthereumHost.cpp
  2. 5
      libethereum/EthereumHost.h
  3. 20
      libethereum/EthereumPeer.cpp

90
libethereum/EthereumHost.cpp

@ -91,7 +91,7 @@ void EthereumHost::doWork()
bool netChange = ensureInitialised(); bool netChange = ensureInitialised();
auto h = m_chain.currentHash(); auto h = m_chain.currentHash();
// If we've finished our initial sync (including getting all the blocks into the chain so as to reduce invalid transactions), start trading transactions & blocks // If we've finished our initial sync (including getting all the blocks into the chain so as to reduce invalid transactions), start trading transactions & blocks
if (!isSyncing() && m_chain.isKnown(m_latestBlockSent)) if (isSyncing() && m_chain.isKnown(m_latestBlockSent))
{ {
if (m_newTransactions) if (m_newTransactions)
{ {
@ -249,7 +249,6 @@ void EthereumHost::onPeerStatus(EthereumPeer* _peer)
_peer->disable("Peer banned for previous bad behaviour."); _peer->disable("Peer banned for previous bad behaviour.");
else else
{ {
_peer->m_protocolVersion = EthereumHost::c_oldProtocolVersion; //force V60 for now
if (_peer->m_protocolVersion != protocolVersion()) if (_peer->m_protocolVersion != protocolVersion())
estimatePeerHashes(_peer); estimatePeerHashes(_peer);
else if (_peer->m_latestBlockNumber > m_chain.number()) else if (_peer->m_latestBlockNumber > m_chain.number())
@ -291,6 +290,7 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool
unsigned knowns = 0; unsigned knowns = 0;
unsigned unknowns = 0; unsigned unknowns = 0;
h256s neededBlocks; h256s neededBlocks;
bool syncByNumber = !m_syncingLatestHash;
for (unsigned i = 0; i < _hashes.size(); ++i) for (unsigned i = 0; i < _hashes.size(); ++i)
{ {
_peer->addRating(1); _peer->addRating(1);
@ -298,11 +298,15 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool
auto status = m_bq.blockStatus(h); auto status = m_bq.blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h)) if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h))
{ {
clog(NetMessageSummary) << "block hash ready:" << h << ". Start blocks download..."; clog(NetMessageSummary) << "Block hash already known:" << h;
if (!syncByNumber)
{
m_hashes += neededBlocks; m_hashes += neededBlocks;
clog(NetMessageSummary) << "Start blocks download...";
onPeerDoneHashes(_peer, true); onPeerDoneHashes(_peer, true);
return; return;
} }
}
else if (status == QueueStatus::Bad) else if (status == QueueStatus::Bad)
{ {
cwarn << "block hash bad!" << h << ". Bailing..."; cwarn << "block hash bad!" << h << ". Bailing...";
@ -316,65 +320,25 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, h256s const& _hashes, bool
} }
else else
knowns++; knowns++;
if (!syncByNumber)
m_syncingLatestHash = h; m_syncingLatestHash = h;
} }
m_hashes += neededBlocks; if (syncByNumber)
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash;
if (_complete)
{
m_needSyncBlocks = true;
continueSync(_peer);
}
else if (m_hashes.size() > _peer->m_expectedHashes)
{ {
_peer->disable("Too many hashes"); m_man.appendToChain(neededBlocks); // Append to download manager immediatelly
m_hashes.clear(); clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
m_syncingLatestHash = h256();
continueSync(); ///Try with some other peer, keep the chain
} }
else else
continueSync(_peer); /// Grab next hashes
}
void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s const& _hashes)
{
Guard l(x_sync);
assert(_peer->m_asking == Asking::Nothing);
if (_hashes.empty())
{
onPeerDoneHashes(_peer, true);
return;
}
unsigned knowns = 0;
unsigned unknowns = 0;
h256s neededBlocks;
for (unsigned i = 0; i < _hashes.size(); ++i)
{ {
_peer->addRating(1); m_hashes += neededBlocks; // Append to local list
auto h = _hashes[i]; clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns; now at" << m_syncingLatestHash;
auto status = m_bq.blockStatus(h);
if (status == QueueStatus::Importing || status == QueueStatus::Ready || m_chain.isKnown(h))
{
clog(NetWarn) << "block hash already known:" << h;
}
else if (status == QueueStatus::Bad)
{
clog(NetWarn) << "block hash bad!" << h << ". Bailing...";
_peer->setIdle();
return;
} }
else if (status == QueueStatus::Unknown) if (_complete)
{ {
unknowns++; m_needSyncBlocks = true;
neededBlocks.push_back(h); continueSync(_peer);
}
else
knowns++;
} }
m_man.appendToChain(neededBlocks); else if (syncByNumber && m_hashMan.isComplete())
clog(NetMessageSummary) << knowns << "knowns," << unknowns << "unknowns";
if (m_hashMan.isComplete())
{ {
// Done our chain-get. // Done our chain-get.
m_needSyncHashes = false; m_needSyncHashes = false;
@ -384,8 +348,15 @@ void EthereumHost::onPeerHashes(EthereumPeer* _peer, unsigned /*_index*/, h256s
m_hashMan.reset(m_chain.number() + 1); m_hashMan.reset(m_chain.number() + 1);
continueSync(); continueSync();
} }
else if (m_hashes.size() > _peer->m_expectedHashes)
{
_peer->disable("Too many hashes");
m_hashes.clear();
m_syncingLatestHash = h256();
continueSync(); ///Try with some other peer, keep the chain
}
else else
continueSync(_peer); continueSync(_peer); /// Grab next hashes
} }
void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain) void EthereumHost::onPeerDoneHashes(EthereumPeer* _peer, bool _localChain)
@ -479,7 +450,7 @@ void EthereumHost::onPeerBlocks(EthereumPeer* _peer, RLP const& _r)
void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes) void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
{ {
Guard l(x_sync); Guard l(x_sync);
if (_peer->m_asking != Asking::Nothing) if (isSyncing_UNSAFE())
{ {
clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading."; clog(NetMessageSummary) << "Ignoring new hashes since we're already downloading.";
return; return;
@ -491,7 +462,7 @@ void EthereumHost::onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes)
void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r) void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
{ {
Guard l(x_sync); Guard l(x_sync);
if (_peer->m_asking != Asking::Nothing) if (isSyncing_UNSAFE())
{ {
clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading."; clog(NetMessageSummary) << "Ignoring new blocks since we're already downloading.";
return; return;
@ -533,7 +504,7 @@ void EthereumHost::onPeerNewBlock(EthereumPeer* _peer, RLP const& _r)
_peer->m_totalDifficulty = difficulty; _peer->m_totalDifficulty = difficulty;
m_needSyncHashes = true; m_needSyncHashes = true;
m_needSyncBlocks = true; m_needSyncBlocks = true;
m_syncingLatestHash = _peer->m_latestHash; m_syncingLatestHash = h;
sync = true; sync = true;
} }
} }
@ -654,9 +625,10 @@ bool EthereumHost::peerShouldGrabChain(EthereumPeer* _peer) const
} }
} }
bool EthereumHost::isSyncing() const bool EthereumHost::isSyncing_UNSAFE() const
{ {
Guard l(x_sync); /// We need actual peer information here to handle the case when we are the first ever peer on the network to mine.
/// I.e. on a new private network the first node mining has noone to sync with and should start block propogation immediately.
bool syncing = false; bool syncing = false;
forEachPeer([&](EthereumPeer* _p) forEachPeer([&](EthereumPeer* _p)
{ {

5
libethereum/EthereumHost.h

@ -70,8 +70,7 @@ public:
void reset(); void reset();
DownloadMan const& downloadMan() const { return m_man; } DownloadMan const& downloadMan() const { return m_man; }
bool isSyncing() const; bool isSyncing() const { Guard l(x_sync); return isSyncing_UNSAFE(); }
bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); } bool isBanned(p2p::NodeId _id) const { return !!m_banned.count(_id); }
void noteNewTransactions() { m_newTransactions = true; } void noteNewTransactions() { m_newTransactions = true; }
@ -82,7 +81,6 @@ public:
void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks void onPeerNewBlock(EthereumPeer* _peer, RLP const& _r); ///< Called by peer once it has new blocks
void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has new hashes void onPeerNewHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has new hashes
void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has another sequential block of hashes during sync void onPeerHashes(EthereumPeer* _peer, h256s const& _hashes); ///< Called by peer once it has another sequential block of hashes during sync
void onPeerHashes(EthereumPeer* _peer, unsigned _index, h256s const& _hashes); ///< Called by peer once it has a new ordered block of hashes starting with a particular number
void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); ///< Called by peer when it has new transactions void onPeerTransactions(EthereumPeer* _peer, RLP const& _r); ///< Called by peer when it has new transactions
DownloadMan& downloadMan() { return m_man; } DownloadMan& downloadMan() { return m_man; }
@ -96,6 +94,7 @@ private:
void forEachPeerPtr(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f) const; void forEachPeerPtr(std::function<void(std::shared_ptr<EthereumPeer>)> const& _f) const;
void forEachPeer(std::function<void(EthereumPeer*)> const& _f) const; void forEachPeer(std::function<void(EthereumPeer*)> const& _f) const;
bool isSyncing_UNSAFE() const;
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void doWork(); void doWork();

20
libethereum/EthereumPeer.cpp

@ -40,7 +40,6 @@ EthereumPeer::EthereumPeer(Session* _s, HostCapabilityFace* _h, unsigned _i, Cap
m_hashSub(host()->hashDownloadMan()), m_hashSub(host()->hashDownloadMan()),
m_peerCapabilityVersion(_cap.second) m_peerCapabilityVersion(_cap.second)
{ {
m_peerCapabilityVersion = EthereumHost::c_oldProtocolVersion;
m_syncHashNumber = host()->chain().number() + 1; m_syncHashNumber = host()->chain().number() + 1;
requestStatus(); requestStatus();
} }
@ -78,7 +77,6 @@ string toString(Asking _a)
return "?"; return "?";
} }
void EthereumPeer::setIdle() void EthereumPeer::setIdle()
{ {
m_sub.doneFetch(); m_sub.doneFetch();
@ -88,8 +86,7 @@ void EthereumPeer::setIdle()
void EthereumPeer::requestStatus() void EthereumPeer::requestStatus()
{ {
if (m_asking != Asking::Nothing) assert(m_asking == Asking::Nothing);
clog(NetWarn) << "Bad state: requesting state should be the first action";
setAsking(Asking::State); setAsking(Asking::State);
RLPStream s; RLPStream s;
bool latest = m_peerCapabilityVersion == host()->protocolVersion(); bool latest = m_peerCapabilityVersion == host()->protocolVersion();
@ -106,22 +103,22 @@ void EthereumPeer::requestStatus()
void EthereumPeer::requestHashes() void EthereumPeer::requestHashes()
{ {
if (m_asking == Asking::Blocks) assert(m_asking == Asking::Nothing);
return;
m_syncHashNumber = m_hashSub.nextFetch(c_maxHashesAsk); m_syncHashNumber = m_hashSub.nextFetch(c_maxHashesAsk);
setAsking(Asking::Hashes); setAsking(Asking::Hashes);
RLPStream s; RLPStream s;
prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << c_maxHashesAsk; prep(s, GetBlockHashesByNumberPacket, 2) << m_syncHashNumber << c_maxHashesAsk;
clog(NetMessageDetail) << "Requesting block hashes for numbers " << m_syncHashNumber << "-" << m_syncHashNumber + c_maxHashesAsk - 1;
sealAndSend(s); sealAndSend(s);
} }
void EthereumPeer::requestHashes(h256 const& _lastHash) void EthereumPeer::requestHashes(h256 const& _lastHash)
{ {
if (m_asking == Asking::Blocks) assert(m_asking == Asking::Nothing);
return;
setAsking(Asking::Hashes); setAsking(Asking::Hashes);
RLPStream s; RLPStream s;
prep(s, GetBlockHashesPacket, 2) << _lastHash << c_maxHashesAsk; prep(s, GetBlockHashesPacket, 2) << _lastHash << c_maxHashesAsk;
clog(NetMessageDetail) << "Requesting block hashes staring from " << _lastHash;
sealAndSend(s); sealAndSend(s);
} }
@ -212,7 +209,7 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
u256 number256 = _r[0].toInt<u256>(); u256 number256 = _r[0].toInt<u256>();
unsigned number = (unsigned) number256; unsigned number = (unsigned) number256;
unsigned limit = _r[1].toInt<unsigned>(); unsigned limit = _r[1].toInt<unsigned>();
clog(NetMessageSummary) << "GetBlockHashesByNumber (" << number << "-" << number + limit << ")"; clog(NetMessageSummary) << "GetBlockHashesByNumber (" << number << "-" << number + limit - 1 << ")";
RLPStream s; RLPStream s;
if (number <= host()->chain().number()) if (number <= host()->chain().number())
{ {
@ -248,11 +245,8 @@ bool EthereumPeer::interpret(unsigned _id, RLP const& _r)
m_hashSub.noteHash(m_syncHashNumber + i, 1); m_hashSub.noteHash(m_syncHashNumber + i, 1);
} }
if (m_protocolVersion == host()->protocolVersion())
host()->onPeerHashes(this, m_syncHashNumber, hashes); // V61+, report hashes by number
else
host()->onPeerHashes(this, hashes);
m_syncHashNumber += itemCount; m_syncHashNumber += itemCount;
host()->onPeerHashes(this, hashes);
break; break;
} }
case GetBlocksPacket: case GetBlocksPacket:

Loading…
Cancel
Save