Browse Source

Get blocks in right order.

cl-refactor
Gav Wood 11 years ago
parent
commit
17f55abd37
  1. 5
      libdevcore/RangeMask.h
  2. 21
      libethereum/DownloadMan.cpp
  3. 12
      libethereum/DownloadMan.h
  4. 7
      libethereum/EthereumHost.cpp
  5. 2
      libethereum/EthereumHost.h
  6. 18
      libethereum/EthereumPeer.cpp
  7. 8
      libp2p/Host.cpp

5
libdevcore/RangeMask.h

@ -146,6 +146,11 @@ public:
return m_ranges.empty(); return m_ranges.empty();
} }
bool full() const
{
return m_ranges.size() == 1 && m_ranges.begin()->first == m_all.first && m_ranges.begin()->second == m_all.second;
}
void clear() void clear()
{ {
m_ranges.clear(); m_ranges.clear();

21
libethereum/DownloadMan.cpp

@ -39,26 +39,31 @@ DownloadSub::~DownloadSub()
} }
} }
h256s DownloadSub::nextFetch(unsigned _n) h256Set DownloadSub::nextFetch(unsigned _n)
{ {
Guard l(m_fetch); Guard l(m_fetch);
if (m_remaining.size())
return m_remaining;
m_asked.clear(); m_asked.clear();
m_indices.clear();
m_remaining.clear();
if (!m_man) if (!m_man)
return h256s(); return h256Set();
m_asked = (~(m_man->taken() + m_attempted)).lowest(_n); m_asked = (~(m_man->taken() + m_attempted)).lowest(_n);
if (m_asked.empty()) if (m_asked.empty())
m_asked = (~(m_man->taken(true) + m_attempted)).lowest(_n); m_asked = (~(m_man->taken(true) + m_attempted)).lowest(_n);
m_attempted += m_asked; m_attempted += m_asked;
m_indices.clear();
h256s ret;
for (auto i: m_asked) for (auto i: m_asked)
{ {
ret.push_back(m_man->m_chain[i]); auto x = m_man->m_chain[i];
m_indices[ret.back()] = i; m_remaining.insert(x);
m_indices[x] = i;
} }
return ret; return m_remaining;
} }
void DownloadSub::noteBlock(h256 _hash) void DownloadSub::noteBlock(h256 _hash)
@ -66,5 +71,5 @@ void DownloadSub::noteBlock(h256 _hash)
Guard l(m_fetch); Guard l(m_fetch);
if (m_man && m_indices.count(_hash)) if (m_man && m_indices.count(_hash))
m_man->m_blocksGot += m_indices[_hash]; m_man->m_blocksGot += m_indices[_hash];
m_remaining.erase(_hash);
} }

12
libethereum/DownloadMan.h

@ -46,17 +46,19 @@ public:
~DownloadSub(); ~DownloadSub();
/// Finished last fetch - grab the next bunch of block hashes to download. /// Finished last fetch - grab the next bunch of block hashes to download.
h256s nextFetch(unsigned _n); h256Set nextFetch(unsigned _n);
/// Note that we've received a particular block. /// Note that we've received a particular block.
void noteBlock(h256 _hash); void noteBlock(h256 _hash);
void doneFetch() { nextFetch(0); } /// Nothing doing here.
void doneFetch() { resetFetch(); }
private: private:
void resetFetch() // Called by DownloadMan when we need to reset the download. void resetFetch() // Called by DownloadMan when we need to reset the download.
{ {
Guard l(m_fetch); Guard l(m_fetch);
m_remaining.clear();
m_indices.clear(); m_indices.clear();
m_asked.clear(); m_asked.clear();
m_attempted.clear(); m_attempted.clear();
@ -65,6 +67,7 @@ private:
DownloadMan* m_man = nullptr; DownloadMan* m_man = nullptr;
Mutex m_fetch; Mutex m_fetch;
h256Set m_remaining;
std::map<h256, unsigned> m_indices; std::map<h256, unsigned> m_indices;
RangeMask<unsigned> m_asked; RangeMask<unsigned> m_asked;
RangeMask<unsigned> m_attempted; RangeMask<unsigned> m_attempted;
@ -105,6 +108,11 @@ public:
return ret; return ret;
} }
bool isComplete() const
{
return m_blocksGot.full();
}
private: private:
h256s m_chain; h256s m_chain;
RangeMask<unsigned> m_blocksGot; RangeMask<unsigned> m_blocksGot;

7
libethereum/EthereumHost.cpp

@ -144,9 +144,12 @@ void EthereumHost::noteHaveChain(EthereumPeer* _from)
clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << _from->m_latestHash.abridged() << ", was" << m_latestBlockSent.abridged() << "]"; clog(NetNote) << "Difficulty of hashchain HIGHER. Replacing fetch queue [latest now" << _from->m_latestHash.abridged() << ", was" << m_latestBlockSent.abridged() << "]";
// Looks like it's the best yet for total difficulty. Set to download. // Looks like it's the best yet for total difficulty. Set to download.
m_man.resetToChain(_from->m_neededBlocks);
{ {
Guard l(x_blocksNeeded); Guard l(x_blocksNeeded);
m_blocksNeeded = _from->m_neededBlocks; m_blocksNeeded.clear();
for (auto i = _from->m_neededBlocks.rbegin(); i != _from->m_neededBlocks.rend(); ++i)
m_blocksNeeded.push_back(*i);
m_blocksOnWay.clear(); m_blocksOnWay.clear();
m_totalDifficultyOfNeeded = td; m_totalDifficultyOfNeeded = td;
m_latestBlockSent = _from->m_latestHash; m_latestBlockSent = _from->m_latestHash;
@ -173,7 +176,7 @@ void EthereumHost::readyForSync()
void EthereumHost::noteDoneBlocks() void EthereumHost::noteDoneBlocks()
{ {
if (m_blocksOnWay.empty()) if (m_man.isComplete())
{ {
// Done our chain-get. // Done our chain-get.
if (m_blocksNeeded.size()) if (m_blocksNeeded.size())

2
libethereum/EthereumHost.h

@ -111,7 +111,7 @@ private:
u256 m_networkId; u256 m_networkId;
Grabbing m_grabbing = Grabbing::Nothing; Grabbing m_grabbing = Grabbing::Nothing; // TODO: needs to be thread-safe & switch to just having a peer id.
mutable std::recursive_mutex m_incomingLock; mutable std::recursive_mutex m_incomingLock;
std::vector<bytes> m_incomingTransactions; std::vector<bytes> m_incomingTransactions;

18
libethereum/EthereumPeer.cpp

@ -66,7 +66,7 @@ void EthereumPeer::sendStatus()
void EthereumPeer::startInitialSync() void EthereumPeer::startInitialSync()
{ {
// Grab trsansactions off them. // Grab transactions off them.
{ {
RLPStream s; RLPStream s;
prep(s).appendList(1); prep(s).appendList(1);
@ -102,6 +102,7 @@ void EthereumPeer::tryGrabbingHashChain()
{ {
clogS(NetAllDetail) << "Yes. Their chain is better."; clogS(NetAllDetail) << "Yes. Their chain is better.";
host()->updateGrabbing(Grabbing::Hashes);
m_grabbing = Grabbing::Hashes; m_grabbing = Grabbing::Hashes;
RLPStream s; RLPStream s;
prep(s).appendList(3); prep(s).appendList(3);
@ -255,6 +256,7 @@ bool EthereumPeer::interpret(RLP const& _r)
if (_r.itemCount() == 1 && !m_askedBlocksChanged) if (_r.itemCount() == 1 && !m_askedBlocksChanged)
{ {
// Couldn't get any from last batch - probably got to this peer's latest block - just give up. // Couldn't get any from last batch - probably got to this peer's latest block - just give up.
m_sub.doneFetch();
giveUpOnFetch(); giveUpOnFetch();
} }
m_askedBlocksChanged = false; m_askedBlocksChanged = false;
@ -263,6 +265,7 @@ bool EthereumPeer::interpret(RLP const& _r)
for (unsigned i = 1; i < _r.itemCount(); ++i) for (unsigned i = 1; i < _r.itemCount(); ++i)
{ {
auto h = BlockInfo::headerHash(_r[i].data()); auto h = BlockInfo::headerHash(_r[i].data());
m_sub.noteBlock(h);
if (host()->noteBlock(h, _r[i].data())) if (host()->noteBlock(h, _r[i].data()))
used++; used++;
m_askedBlocks.erase(h); m_askedBlocks.erase(h);
@ -282,12 +285,12 @@ bool EthereumPeer::interpret(RLP const& _r)
if (!host()->m_chain.details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash)) if (!host()->m_chain.details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
{ {
unknownParents++; unknownParents++;
clogS(NetAllDetail) << "Unknown parent" << bi.parentHash << "of block" << h; clogS(NetAllDetail) << "Unknown parent" << bi.parentHash.abridged() << "of block" << h.abridged();
} }
else else
{ {
knownParents++; knownParents++;
clogS(NetAllDetail) << "Known parent" << bi.parentHash << "of block" << h; clogS(NetAllDetail) << "Known parent" << bi.parentHash.abridged() << "of block" << h.abridged();
} }
} }
} }
@ -323,15 +326,14 @@ void EthereumPeer::ensureGettingChain()
void EthereumPeer::continueGettingChain() void EthereumPeer::continueGettingChain()
{ {
if (!m_askedBlocks.size()) auto blocks = m_sub.nextFetch(c_maxBlocksAsk);
m_askedBlocks = host()->neededBlocks(m_failedBlocks);
if (m_askedBlocks.size()) if (blocks.size())
{ {
RLPStream s; RLPStream s;
prep(s); prep(s);
s.appendList(m_askedBlocks.size() + 1) << GetBlocksPacket; s.appendList(blocks.size() + 1) << GetBlocksPacket;
for (auto i: m_askedBlocks) for (auto const& i: blocks)
s << i; s << i;
sealAndSend(s); sealAndSend(s);
} }

8
libp2p/Host.cpp

@ -77,7 +77,9 @@ Host::~Host()
void Host::start() void Host::start()
{ {
stop(); if (isWorking())
stop();
for (unsigned i = 0; i < 2; ++i) for (unsigned i = 0; i < 2; ++i)
{ {
bi::tcp::endpoint endpoint(bi::tcp::v4(), i ? 0 : m_netPrefs.listenPort); bi::tcp::endpoint endpoint(bi::tcp::v4(), i ? 0 : m_netPrefs.listenPort);
@ -104,6 +106,10 @@ void Host::start()
determinePublic(m_netPrefs.publicIP, m_netPrefs.upnp); determinePublic(m_netPrefs.publicIP, m_netPrefs.upnp);
ensureAccepting(); ensureAccepting();
m_incomingPeers.clear();
m_freePeers.clear();
m_lastPeersRequest = chrono::steady_clock::time_point::min(); m_lastPeersRequest = chrono::steady_clock::time_point::min();
clog(NetNote) << "Id:" << m_id.abridged(); clog(NetNote) << "Id:" << m_id.abridged();

Loading…
Cancel
Save