Browse Source

Less hangy.

cl-refactor
Gav Wood 11 years ago
parent
commit
8be99bd36c
  1. 34
      TODO
  2. 45
      libethereum/BlockChain.cpp
  3. 11
      libethereum/BlockChain.h
  4. 24
      libethereum/Client.cpp
  5. 262
      libethereum/PeerNetwork.cpp
  6. 9
      libethereum/PeerNetwork.h
  7. 2
      test/peer.cpp

34
TODO

@ -7,12 +7,20 @@ Crypto stuff:
- kFromMessage - kFromMessage
- Check all the tweak instructions. - Check all the tweak instructions.
Better handling of corrupt blocks.
- Kill DB & restart.
Network: Network:
- Crypto on network. TLS? - *** Exponential backoff on bad connection.
- *** Randomly select peers from incoming peers.
- NotInChain will be very bad for new peers - it'll run through until the genesis.
- Check how many it has first.
- Crypto on network - use id as public key?
- Make work with IPv6 - Make work with IPv6
- Peers rated.
- Useful/useless - new blocks/transactions or useful peers?
- Solid communications?
- Strategy for peer suggestion?
CLI client CLI client
- Implement CLI option "--help". - Implement CLI option "--help".
@ -23,22 +31,24 @@ General:
- Move over to new system. - Move over to new system.
- Remove block chain on protocol change (i.e. store protocol with block chain). - Remove block chain on protocol change (i.e. store protocol with block chain).
Robustness Robustness
- Remove aborts - Remove aborts
- Recover from all exceptions. - Recover from all exceptions.
- Store version alongside BC DB.
- Better handling of corrupt blocks.
- Kill DB & restart.
### Gav GUI
- Make address/block chain list model-based, JIT populated.
- Make everything else model-based
- Qt/QML class.
For PoC2: For PoC3:
- Shared contract acceptence tests.
- Use mining state for nonce. - Use mining state for nonce.
Network:
- NotInChain will be very bad for new peers - it'll run through until the genesis.
- Check how many it has first.
BUG: need to discard transactions if nonce too old, even when not mining. BUG: need to discard transactions if nonce too old, even when not mining.
### Marko ### Marko
Ubuntu builds Ubuntu builds

45
libethereum/BlockChain.cpp

@ -168,19 +168,21 @@ void BlockChain::import(bytes const& _block, Overlay const& _db)
auto tdIncrease = s.playback(&_block, bi, biParent, biGrandParent, true); auto tdIncrease = s.playback(&_block, bi, biParent, biGrandParent, true);
td = pd.totalDifficulty + tdIncrease; td = pd.totalDifficulty + tdIncrease;
#if !NDEBUG #if PARANOIA
checkConsistency(); checkConsistency();
#endif #endif
// All ok - insert into DB // All ok - insert into DB
m_details[newHash] = BlockDetails((uint)pd.number + 1, td, bi.parentHash, {}); {
m_detailsDB->Put(m_writeOptions, ldb::Slice((char const*)&newHash, 32), (ldb::Slice)eth::ref(m_details[newHash].rlp())); lock_guard<mutex> l(m_lock);
m_details[newHash] = BlockDetails((uint)pd.number + 1, td, bi.parentHash, {});
m_details[bi.parentHash].children.push_back(newHash);
}
m_details[bi.parentHash].children.push_back(newHash); m_detailsDB->Put(m_writeOptions, ldb::Slice((char const*)&newHash, 32), (ldb::Slice)eth::ref(m_details[newHash].rlp()));
m_detailsDB->Put(m_writeOptions, ldb::Slice((char const*)&bi.parentHash, 32), (ldb::Slice)eth::ref(m_details[bi.parentHash].rlp())); m_detailsDB->Put(m_writeOptions, ldb::Slice((char const*)&bi.parentHash, 32), (ldb::Slice)eth::ref(m_details[bi.parentHash].rlp()));
m_db->Put(m_writeOptions, ldb::Slice((char const*)&newHash, 32), (ldb::Slice)ref(_block)); m_db->Put(m_writeOptions, ldb::Slice((char const*)&newHash, 32), (ldb::Slice)ref(_block));
#if !NDEBUG #if PARANOIA
checkConsistency(); checkConsistency();
#endif #endif
} }
@ -193,7 +195,7 @@ void BlockChain::import(bytes const& _block, Overlay const& _db)
// cnote << "Parent " << bi.parentHash << " has " << details(bi.parentHash).children.size() << " children."; // cnote << "Parent " << bi.parentHash << " has " << details(bi.parentHash).children.size() << " children.";
// This might be the new last block... // This might be the new last block...
if (td > m_details[m_lastBlockHash].totalDifficulty) if (td > details(m_lastBlockHash).totalDifficulty)
{ {
m_lastBlockHash = newHash; m_lastBlockHash = newHash;
m_detailsDB->Put(m_writeOptions, ldb::Slice("best"), ldb::Slice((char const*)&newHash, 32)); m_detailsDB->Put(m_writeOptions, ldb::Slice("best"), ldb::Slice((char const*)&newHash, 32));
@ -201,7 +203,7 @@ void BlockChain::import(bytes const& _block, Overlay const& _db)
} }
else else
{ {
clog(BlockChainNote) << " Imported but not best (oTD:" << m_details[m_lastBlockHash].totalDifficulty << ", TD:" << td << ")"; clog(BlockChainNote) << " Imported but not best (oTD:" << details(m_lastBlockHash).totalDifficulty << ", TD:" << td << ")";
} }
} }
@ -230,14 +232,26 @@ bytesConstRef BlockChain::block(h256 _hash) const
if (_hash == m_genesisHash) if (_hash == m_genesisHash)
return &m_genesisBlock; return &m_genesisBlock;
m_db->Get(m_readOptions, ldb::Slice((char const*)&_hash, 32), &m_cache[_hash]); string d;
return bytesConstRef(&m_cache[_hash]); m_db->Get(m_readOptions, ldb::Slice((char const*)&_hash, 32), &d);
{
lock_guard<mutex> l(m_lock);
swap(m_cache[_hash], d);
return bytesConstRef(&m_cache[_hash]);
}
} }
BlockDetails const& BlockChain::details(h256 _h) const BlockDetails const& BlockChain::details(h256 _h) const
{ {
auto it = m_details.find(_h); std::map<h256, BlockDetails>::const_iterator it;
if (it == m_details.end()) bool fetchRequired;
{
lock_guard<mutex> l(m_lock);
it = m_details.find(_h);
fetchRequired = (it == m_details.end());
}
if (fetchRequired)
{ {
std::string s; std::string s;
m_detailsDB->Get(m_readOptions, ldb::Slice((char const*)&_h, 32), &s); m_detailsDB->Get(m_readOptions, ldb::Slice((char const*)&_h, 32), &s);
@ -246,8 +260,11 @@ BlockDetails const& BlockChain::details(h256 _h) const
// cout << "Not found in DB: " << _h << endl; // cout << "Not found in DB: " << _h << endl;
return NullBlockDetails; return NullBlockDetails;
} }
bool ok; {
tie(it, ok) = m_details.insert(make_pair(_h, BlockDetails(RLP(s)))); lock_guard<mutex> l(m_lock);
bool ok;
tie(it, ok) = m_details.insert(make_pair(_h, BlockDetails(RLP(s))));
}
} }
return it->second; return it->second;
} }

11
libethereum/BlockChain.h

@ -21,6 +21,7 @@
#pragma once #pragma once
#include <mutex>
#include "Common.h" #include "Common.h"
#include "AddressState.h" #include "AddressState.h"
namespace ldb = leveldb; namespace ldb = leveldb;
@ -82,17 +83,14 @@ public:
BlockDetails const& details(h256 _hash) const; BlockDetails const& details(h256 _hash) const;
BlockDetails const& details() const { return details(currentHash()); } BlockDetails const& details() const { return details(currentHash()); }
/// Get a given block (RLP format). /// Get a given block (RLP format). Thread-safe.
bytesConstRef block(h256 _hash) const; bytesConstRef block(h256 _hash) const;
bytesConstRef block() const { return block(currentHash()); } bytesConstRef block() const { return block(currentHash()); }
/// Get a given block (RLP format). /// Get a given block (RLP format). Thread-safe.
h256 currentHash() const { return m_lastBlockHash; } h256 currentHash() const { return m_lastBlockHash; }
/// Get the coinbase address of a given block. /// Get the hash of the genesis block.
Address coinbaseAddress(h256 _hash) const;
Address coinbaseAddress() const { return coinbaseAddress(currentHash()); }
h256 genesisHash() const { return m_genesisHash; } h256 genesisHash() const { return m_genesisHash; }
std::vector<std::pair<Address, AddressState>> interestQueue() { std::vector<std::pair<Address, AddressState>> ret; swap(ret, m_interestQueue); return ret; } std::vector<std::pair<Address, AddressState>> interestQueue() { std::vector<std::pair<Address, AddressState>> ret; swap(ret, m_interestQueue); return ret; }
@ -105,6 +103,7 @@ private:
/// Get fully populated from disk DB. /// Get fully populated from disk DB.
mutable std::map<h256, BlockDetails> m_details; mutable std::map<h256, BlockDetails> m_details;
mutable std::map<h256, std::string> m_cache; mutable std::map<h256, std::string> m_cache;
mutable std::mutex m_lock;
/// The queue of transactions that have happened that we're interested in. /// The queue of transactions that have happened that we're interested in.
std::map<Address, int> m_interest; std::map<Address, int> m_interest;

24
libethereum/Client.cpp

@ -120,15 +120,19 @@ void Client::transact(Secret _secret, Address _dest, u256 _amount, u256s _data)
void Client::work() void Client::work()
{ {
m_lock.lock();
bool changed = false; bool changed = false;
// Process network events. // Process network events.
// Synchronise block chain with network. // Synchronise block chain with network.
// Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks. // Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks.
if (m_net) if (m_net)
if (m_net->process(m_bc, m_tq, m_stateDB)) {
m_net->process();
lock_guard<mutex> l(m_lock);
if (m_net->sync(m_bc, m_tq, m_stateDB))
changed = true; changed = true;
}
// Synchronise state to block chain. // Synchronise state to block chain.
// This should remove any transactions on our queue that are included within our state. // This should remove any transactions on our queue that are included within our state.
@ -136,18 +140,21 @@ void Client::work()
// This might mean reverting to an earlier state and replaying some blocks, or, (worst-case: // This might mean reverting to an earlier state and replaying some blocks, or, (worst-case:
// if there are no checkpoints before our fork) reverting to the genesis block and replaying // if there are no checkpoints before our fork) reverting to the genesis block and replaying
// all blocks. // all blocks.
// Resynchronise state with block chain & trans // Resynchronise state with block chain & trans
if (m_s.sync(m_bc))
{ {
changed = true; lock_guard<mutex> l(m_lock);
m_mined = m_s; if (m_s.sync(m_bc))
{
changed = true;
m_mined = m_s;
}
} }
m_lock.unlock();
if (m_doMine) if (m_doMine)
{ {
if (m_miningStarted) if (m_miningStarted)
{ {
lock_guard<mutex> l(m_lock);
m_mined = m_s; m_mined = m_s;
m_mined.sync(m_tq); m_mined.sync(m_tq);
m_mined.commitToMine(m_bc); m_mined.commitToMine(m_bc);
@ -164,10 +171,9 @@ void Client::work()
if (mineInfo.completed) if (mineInfo.completed)
{ {
// Import block. // Import block.
m_lock.lock(); lock_guard<mutex> l(m_lock);
m_bc.attemptImport(m_mined.blockData(), m_stateDB); m_bc.attemptImport(m_mined.blockData(), m_stateDB);
m_mineProgress.best = 0; m_mineProgress.best = 0;
m_lock.unlock();
m_changed = true; m_changed = true;
m_miningStarted = true; // need to re-commit to mine. m_miningStarted = true; // need to re-commit to mine.
} }

262
libethereum/PeerNetwork.cpp

@ -791,33 +791,25 @@ void PeerServer::connect(bi::tcp::endpoint const& _ep)
}); });
} }
bool PeerServer::process(BlockChain& _bc) bool PeerServer::sync()
{ {
bool ret = false; bool ret = false;
m_ioService.poll(); for (auto i = m_peers.begin(); i != m_peers.end();)
{
auto n = chrono::steady_clock::now(); auto p = i->second.lock();
bool fullProcess = (n > m_lastFullProcess + chrono::seconds(1)); if (p && p->m_socket.is_open() &&
if (fullProcess) (p->m_disconnect == chrono::steady_clock::time_point::max() || chrono::steady_clock::now() - p->m_disconnect < chrono::seconds(1))) // kill old peers that should be disconnected.
m_lastFullProcess = n; ++i;
else
if (fullProcess)
for (auto i = m_peers.begin(); i != m_peers.end();)
{ {
auto p = i->second.lock(); i = m_peers.erase(i);
if (p && p->m_socket.is_open() && ret = true;
(p->m_disconnect == chrono::steady_clock::time_point::max() || chrono::steady_clock::now() - p->m_disconnect < chrono::seconds(1))) // kill old peers that should be disconnected.
++i;
else
{
i = m_peers.erase(i);
ret = true;
}
} }
}
return ret; return ret;
} }
bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o) bool PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
{ {
bool ret = false; bool ret = false;
@ -830,14 +822,10 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
for (auto const& i: _tq.transactions()) for (auto const& i: _tq.transactions())
m_transactionsSent.insert(i.first); m_transactionsSent.insert(i.first);
m_lastPeersRequest = chrono::steady_clock::time_point::min(); m_lastPeersRequest = chrono::steady_clock::time_point::min();
m_lastFullProcess = chrono::steady_clock::time_point::min();
ret = true; ret = true;
} }
auto n = chrono::steady_clock::now(); if (sync())
bool fullProcess = (n > m_lastFullProcess + chrono::seconds(1));
if (process(_bc))
ret = true; ret = true;
if (m_mode == NodeMode::Full) if (m_mode == NodeMode::Full)
@ -850,118 +838,115 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
m_incomingTransactions.clear(); m_incomingTransactions.clear();
// Send any new transactions. // Send any new transactions.
if (fullProcess) for (auto j: m_peers)
if (auto p = j.second.lock())
{
bytes b;
uint n = 0;
for (auto const& i: _tq.transactions())
if ((!m_transactionsSent.count(i.first) && !p->m_knownTransactions.count(i.first)) || p->m_requireTransactions)
{
b += i.second;
++n;
m_transactionsSent.insert(i.first);
}
if (n)
{
RLPStream ts;
PeerSession::prep(ts);
ts.appendList(n + 1) << TransactionsPacket;
ts.appendRaw(b, n).swapOut(b);
seal(b);
p->send(&b);
}
p->m_knownTransactions.clear();
p->m_requireTransactions = false;
}
// Send any new blocks.
auto h = _bc.currentHash();
if (h != m_latestBlockSent)
{ {
// TODO: find where they diverge and send complete new branch.
RLPStream ts;
PeerSession::prep(ts);
ts.appendList(2) << BlocksPacket;
bytes b;
ts.appendRaw(_bc.block(_bc.currentHash())).swapOut(b);
seal(b);
for (auto j: m_peers) for (auto j: m_peers)
if (auto p = j.second.lock()) if (auto p = j.second.lock())
{ {
bytes b; if (!p->m_knownBlocks.count(_bc.currentHash()))
uint n = 0;
for (auto const& i: _tq.transactions())
if ((!m_transactionsSent.count(i.first) && !p->m_knownTransactions.count(i.first)) || p->m_requireTransactions)
{
b += i.second;
++n;
m_transactionsSent.insert(i.first);
}
if (n)
{
RLPStream ts;
PeerSession::prep(ts);
ts.appendList(n + 1) << TransactionsPacket;
ts.appendRaw(b, n).swapOut(b);
seal(b);
p->send(&b); p->send(&b);
} p->m_knownBlocks.clear();
p->m_knownTransactions.clear();
p->m_requireTransactions = false;
} }
}
m_latestBlockSent = h;
// Send any new blocks. for (int accepted = 1, n = 0; accepted; ++n)
auto h = _bc.currentHash(); {
if (h != m_latestBlockSent) accepted = 0;
{
// TODO: find where they diverge and send complete new branch. if (m_incomingBlocks.size())
RLPStream ts; for (auto it = prev(m_incomingBlocks.end());; --it)
PeerSession::prep(ts); {
ts.appendList(2) << BlocksPacket; try
bytes b;
ts.appendRaw(_bc.block(_bc.currentHash())).swapOut(b);
seal(b);
for (auto j: m_peers)
if (auto p = j.second.lock())
{ {
if (!p->m_knownBlocks.count(_bc.currentHash())) _bc.import(*it, _o);
p->send(&b); it = m_incomingBlocks.erase(it);
p->m_knownBlocks.clear(); ++accepted;
ret = true;
} }
} catch (UnknownParent)
m_latestBlockSent = h;
for (int accepted = 1, n = 0; accepted; ++n)
{
accepted = 0;
if (m_incomingBlocks.size())
for (auto it = prev(m_incomingBlocks.end());; --it)
{ {
try // Don't (yet) know its parent. Leave it for later.
{ m_unknownParentBlocks.push_back(*it);
_bc.import(*it, _o); it = m_incomingBlocks.erase(it);
it = m_incomingBlocks.erase(it);
++accepted;
ret = true;
}
catch (UnknownParent)
{
// Don't (yet) know its parent. Leave it for later.
m_unknownParentBlocks.push_back(*it);
it = m_incomingBlocks.erase(it);
}
catch (...)
{
// Some other error - erase it.
it = m_incomingBlocks.erase(it);
}
if (it == m_incomingBlocks.begin())
break;
} }
if (!n && accepted) catch (...)
{ {
for (auto i: m_unknownParentBlocks) // Some other error - erase it.
m_incomingBlocks.push_back(i); it = m_incomingBlocks.erase(it);
m_unknownParentBlocks.clear(); }
if (it == m_incomingBlocks.begin())
break;
} }
if (!n && accepted)
{
for (auto i: m_unknownParentBlocks)
m_incomingBlocks.push_back(i);
m_unknownParentBlocks.clear();
} }
}
// Connect to additional peers // Connect to additional peers
while (m_peers.size() < m_idealPeerCount) while (m_peers.size() < m_idealPeerCount)
{
if (m_incomingPeers.empty())
{ {
if (m_incomingPeers.empty()) if (chrono::steady_clock::now() > m_lastPeersRequest + chrono::seconds(10))
{ {
if (chrono::steady_clock::now() > m_lastPeersRequest + chrono::seconds(10)) RLPStream s;
{ bytes b;
RLPStream s; (PeerSession::prep(s).appendList(1) << GetPeersPacket).swapOut(b);
bytes b; seal(b);
(PeerSession::prep(s).appendList(1) << GetPeersPacket).swapOut(b); for (auto const& i: m_peers)
seal(b); if (auto p = i.second.lock())
for (auto const& i: m_peers) if (p->isOpen())
if (auto p = i.second.lock()) p->send(&b);
if (p->isOpen()) m_lastPeersRequest = chrono::steady_clock::now();
p->send(&b); }
m_lastPeersRequest = chrono::steady_clock::now();
}
if (!m_accepting) if (!m_accepting)
ensureAccepting(); ensureAccepting();
break; break;
}
connect(m_incomingPeers.begin()->second);
m_incomingPeers.erase(m_incomingPeers.begin());
} }
connect(m_incomingPeers.begin()->second);
m_incomingPeers.erase(m_incomingPeers.begin());
} }
} }
@ -969,29 +954,26 @@ bool PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
// restricts your freedom but does so fairly. and that's the value proposition. // restricts your freedom but does so fairly. and that's the value proposition.
// guarantees that everyone else respect the rules of the system. (i.e. obeys laws). // guarantees that everyone else respect the rules of the system. (i.e. obeys laws).
if (fullProcess) // We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there.
{ for (uint old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2)
// We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there. while (m_peers.size() > m_idealPeerCount)
for (uint old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2) {
while (m_peers.size() > m_idealPeerCount) // look for worst peer to kick off
{ // first work out how many are old enough to kick off.
// look for worst peer to kick off shared_ptr<PeerSession> worst;
// first work out how many are old enough to kick off. unsigned agedPeers = 0;
shared_ptr<PeerSession> worst; for (auto i: m_peers)
unsigned agedPeers = 0; if (auto p = i.second.lock())
for (auto i: m_peers) if ((m_mode != NodeMode::PeerServer || p->m_caps != 0x01) && chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers.
if (auto p = i.second.lock()) {
if ((m_mode != NodeMode::PeerServer || p->m_caps != 0x01) && chrono::steady_clock::now() > p->m_connect + chrono::milliseconds(old)) // don't throw off new peers; peer-servers should never kick off other peer-servers. ++agedPeers;
{ if ((!worst || p->m_rating < worst->m_rating || (p->m_rating == worst->m_rating && p->m_connect > worst->m_connect))) // kill older ones
++agedPeers; worst = p;
if ((!worst || p->m_rating < worst->m_rating || (p->m_rating == worst->m_rating && p->m_connect > worst->m_connect))) // kill older ones }
worst = p; if (!worst || agedPeers <= m_idealPeerCount)
} break;
if (!worst || agedPeers <= m_idealPeerCount) worst->disconnect(TooManyPeers);
break; }
worst->disconnect(TooManyPeers);
}
}
return ret; return ret;
} }

9
libethereum/PeerNetwork.h

@ -162,10 +162,13 @@ public:
void connect(bi::tcp::endpoint const& _ep); void connect(bi::tcp::endpoint const& _ep);
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
bool sync(BlockChain& _bc, TransactionQueue&, Overlay& _o);
bool sync();
/// Conduct I/O, polling, syncing, whatever. /// Conduct I/O, polling, syncing, whatever.
/// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway. /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway.
bool process(BlockChain& _bc, TransactionQueue&, Overlay& _o); /// This won't touch alter the blockchain.
bool process(BlockChain& _bc); void process() { m_ioService.poll(); }
/// Set ideal number of peers. /// Set ideal number of peers.
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
@ -222,8 +225,6 @@ private:
std::chrono::steady_clock::time_point m_lastPeersRequest; std::chrono::steady_clock::time_point m_lastPeersRequest;
unsigned m_idealPeerCount = 5; unsigned m_idealPeerCount = 5;
std::chrono::steady_clock::time_point m_lastFullProcess;
std::vector<bi::address_v4> m_addresses; std::vector<bi::address_v4> m_addresses;
std::vector<bi::address_v4> m_peerAddresses; std::vector<bi::address_v4> m_peerAddresses;

2
test/peer.cpp

@ -56,7 +56,7 @@ int peerTest(int argc, char** argv)
for (int i = 0; ; ++i) for (int i = 0; ; ++i)
{ {
this_thread::sleep_for(chrono::milliseconds(100)); this_thread::sleep_for(chrono::milliseconds(100));
pn.process(ch); pn.sync();
if (!(i % 10)) if (!(i % 10))
pn.pingAll(); pn.pingAll();
} }

Loading…
Cancel
Save