Browse Source

Network a lot smoother.

cl-refactor
Gav Wood 11 years ago
parent
commit
268ca545ac
  1. 32
      libethereum/BlockChain.cpp
  2. 9
      libethereum/BlockChain.h
  3. 102
      libethereum/BlockQueue.cpp
  4. 66
      libethereum/BlockQueue.h
  5. 45
      libethereum/Client.cpp
  6. 10
      libethereum/Client.h
  7. 3
      libethereum/Guards.h
  8. 155
      libethereum/PeerServer.cpp
  9. 34
      libethereum/PeerServer.h
  10. 26
      libethereum/PeerSession.cpp
  11. 35
      libethereum/TransactionQueue.cpp
  12. 15
      libethereum/TransactionQueue.h

32
libethereum/BlockChain.cpp

@ -163,20 +163,40 @@ bool contains(T const& _t, V const& _v)
return false;
}
h256s BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB)
h256s BlockChain::sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max)
{
vector<bytes> blocks;
_bq.drain(blocks);
h256s ret;
for (auto const& block: blocks)
try
{
for (auto h: import(block, _stateDB))
if (!_max--)
break;
else
ret.push_back(h);
}
catch (UnknownParent)
{
cwarn << "Unknown parent of block!!!" << eth::sha3(block).abridged();
_bq.import(&block, *this);
}
catch (...){}
return ret;
}
h256s BlockChain::attemptImport(bytes const& _block, OverlayDB const& _stateDB) noexcept
{
#if ETH_CATCH
try
#endif
{
return import(_block, _stateDB);
}
#if ETH_CATCH
catch (...)
{
return h256s();
}
#endif
}
h256s BlockChain::import(bytes const& _block, OverlayDB const& _db)
@ -220,7 +240,7 @@ h256s BlockChain::import(bytes const& _block, OverlayDB const& _db)
if (bi.timestamp > (u256)time(0))
{
clog(BlockChainNote) << newHash << ": Future time " << bi.timestamp << " (now at " << time(0) << ")";
// We don't know the parent (yet) - discard for now. It'll get resent to us if we find out about its ancestry later on.
// Block has a timestamp in the future. This is no good.
throw FutureTime();
}

9
libethereum/BlockChain.h

@ -28,6 +28,7 @@
#include "Guards.h"
#include "BlockDetails.h"
#include "AddressState.h"
#include "BlockQueue.h"
namespace ldb = leveldb;
namespace eth
@ -66,8 +67,12 @@ public:
/// To be called from main loop every 100ms or so.
void process();
/// Attempt to import the given block.
h256s attemptImport(bytes const& _block, OverlayDB const& _stateDB);
/// Sync the chain with any incoming blocks. All blocks should, if processed in order
h256s sync(BlockQueue& _bq, OverlayDB const& _stateDB, unsigned _max);
/// Attempt to import the given block directly into the BlockChain and sync with the state DB.
/// @returns the block hashes of any blocks that came into/went out of the canonical block chain.
h256s attemptImport(bytes const& _block, OverlayDB const& _stateDB) noexcept;
/// Import block into disk-backed DB
/// @returns the block hashes of any blocks that came into/went out of the canonical block chain.

102
libethereum/BlockQueue.cpp

@ -0,0 +1,102 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file BlockQueue.cpp
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#include "BlockQueue.h"
#include <libethential/Log.h>
#include <libethcore/Exceptions.h>
#include <libethcore/BlockInfo.h>
#include "BlockChain.h"
using namespace std;
using namespace eth;
bool BlockQueue::import(bytesConstRef _block, BlockChain const& _bc)
{
// Check if we already know this block.
h256 h = sha3(_block);
UpgradableGuard l(m_lock);
if (m_readySet.count(h) || m_futureSet.count(h))
// Already know about this one.
return false;
// VERIFY: populates from the block and checks the block is internally coherent.
BlockInfo bi;
#if ETH_CATCH
try
#endif
{
bi.populate(_block);
bi.verifyInternals(_block);
}
#if ETH_CATCH
catch (Exception const& _e)
{
cwarn << "Ignoring malformed block: " << _e.description();
return false;
}
#endif
auto newHash = eth::sha3(_block);
// Check block doesn't already exist first!
if (_bc.details(newHash))
return false;
// Check it's not crazy
if (bi.timestamp > (u256)time(0))
return false;
UpgradeGuard ul(l);
// We now know it.
if (!m_readySet.count(bi.parentHash) && !_bc.details(bi.parentHash))
{
// We don't know the parent (yet) - queue it up for later. It'll get resent to us if we find out about its ancestry later on.
m_future.insert(make_pair(bi.parentHash, make_pair(h, _block.toBytes())));
m_futureSet.insert(h);
return true;
}
// If valid, append to blocks.
m_ready.push_back(_block.toBytes());
m_readySet.insert(h);
noteReadyWithoutWriteGuard(h);
return true;
}
void BlockQueue::noteReadyWithoutWriteGuard(h256 _b)
{
auto r = m_future.equal_range(_b);
h256s good;
for (auto it = r.first; it != r.second; ++it)
{
m_futureSet.erase(it->second.first);
m_ready.push_back(it->second.second);
m_readySet.erase(it->second.first);
good.push_back(it->second.first);
}
m_future.erase(r.first, r.second);
for (auto g: good)
noteReadyWithoutWriteGuard(g);
}

66
libethereum/BlockQueue.h

@ -0,0 +1,66 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
cpp-ethereum is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file BlockQueue.h
* @author Gav Wood <i@gavwood.com>
* @date 2014
*/
#pragma once
#include <boost/thread.hpp>
#include <libethential/Common.h>
#include "libethcore/CommonEth.h"
#include "Guards.h"
namespace eth
{
class BlockChain;
/**
* @brief A queue of blocks. Sits between network or other I/O and the BlockChain.
* Sorts them ready for blockchain insertion (with the BlockChain::sync() method).
* @threadsafe
*/
class BlockQueue
{
public:
/// Import a block into the queue.
bool import(bytesConstRef _tx, BlockChain const& _bc);
/// Grabs the blocks that are ready, giving them in the correct order for insertion into the chain.
void drain(std::vector<bytes>& o_out) { WriteGuard l(m_lock); swap(o_out, m_ready); m_readySet.clear(); }
/// Notify the queue that the chain has changed and a new block has attained 'ready' status (i.e. is in the chain).
void noteReady(h256 _b) { WriteGuard l(m_lock); noteReadyWithoutWriteGuard(_b); }
/// Get information on the items queued.
std::pair<unsigned, unsigned> items() const { ReadGuard l(m_lock); return std::make_pair(m_ready.size(), m_future.size()); }
private:
void noteReadyWithoutWriteGuard(h256 _b);
mutable boost::shared_mutex m_lock; ///< General lock.
std::set<h256> m_readySet; ///< All blocks ready for chain-import.
std::vector<bytes> m_ready; ///< List of blocks, in correct order, ready for chain-import.
std::set<h256> m_futureSet; ///< Set of all blocks whose parents are not ready/in-chain.
std::multimap<h256, std::pair<h256, bytes>> m_future; ///< For transactions that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears.
};
}

45
libethereum/Client.cpp

@ -198,7 +198,8 @@ void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHo
{
ensureWorking();
ClientGuard l(this);
{
Guard l(x_net);
if (m_net.get())
return;
try
@ -213,25 +214,27 @@ void Client::startNetwork(unsigned short _listenPort, std::string const& _seedHo
}
m_net->setIdealPeerCount(_peers);
}
if (_seedHost.size())
connect(_seedHost, _port);
}
std::vector<PeerInfo> Client::peers()
{
ClientGuard l(this);
Guard l(x_net);
return m_net ? m_net->peers() : std::vector<PeerInfo>();
}
size_t Client::peerCount() const
{
ClientGuard l(this);
Guard l(x_net);
return m_net ? m_net->peerCount() : 0;
}
void Client::connect(std::string const& _seedHost, unsigned short _port)
{
ClientGuard l(this);
Guard l(x_net);
if (!m_net.get())
return;
m_net->connect(_seedHost, _port);
@ -239,7 +242,7 @@ void Client::connect(std::string const& _seedHost, unsigned short _port)
void Client::stopNetwork()
{
ClientGuard l(this);
Guard l(x_net);
m_net.reset(nullptr);
}
@ -308,27 +311,26 @@ void Client::work(bool _justQueue)
// Process network events.
// Synchronise block chain with network.
// Will broadcast any of our (new) transactions and blocks, and collect & add any of their (new) transactions and blocks.
{
Guard l(x_net);
if (m_net && !_justQueue)
{
cdebug << "--- WORK: LOCK";
ClientGuard l(this);
cdebug << "--- WORK: NETWORK";
m_net->process(); // must be in guard for now since it uses the blockchain.
// returns h256Set as block hashes, once for each block that has come in/gone out.
cdebug << "--- WORK: TQ <== NET ==> CHAIN";
h256Set newBlocks = m_net->sync(m_bc, m_tq, m_stateDB, 100);
if (newBlocks.size())
{
for (auto i: newBlocks)
appendFromNewBlock(i, changeds);
changeds.insert(NewBlockFilter);
cdebug << "--- WORK: NET <==> TQ ; CHAIN ==> NET ==> BQ";
m_net->sync(m_tq, m_bq);
cdebug << "--- TQ:" << m_tq.items() << "; BQ:" << m_bq.items();
}
}
// Do some mining.
if (!_justQueue)
{
// TODO: Separate "Miner" object.
if (m_doMine)
{
if (m_restartMining)
@ -402,6 +404,21 @@ void Client::work(bool _justQueue)
// Resynchronise state with block chain & trans
{
ClientGuard l(this);
cdebug << "--- WORK: BQ ==> CHAIN ==> STATE";
OverlayDB db = m_stateDB;
m_lock.unlock();
h256s newBlocks = m_bc.sync(m_bq, db, 100);
if (newBlocks.size())
{
for (auto i: newBlocks)
appendFromNewBlock(i, changeds);
changeds.insert(NewBlockFilter);
}
m_lock.lock();
if (newBlocks.size())
m_stateDB = db;
cdebug << "--- WORK: preSTATE <== CHAIN";
if (m_preMine.sync(m_bc) || m_postMine.address() != m_preMine.address())
{

10
libethereum/Client.h

@ -252,9 +252,9 @@ public:
/// Stop the network subsystem.
void stopNetwork();
/// Is the network subsystem up?
bool haveNetwork() { return !!m_net; }
/// Get access to the peer server object. This will be null if the network isn't online.
PeerServer* peerServer() const { return m_net.get(); }
bool haveNetwork() { Guard l(x_net); return !!m_net; }
/// Get access to the peer server object. This will be null if the network isn't online. DANGEROUS! DO NOT USE!
PeerServer* peerServer() const { Guard l(x_net); return m_net.get(); }
// Mining stuff:
@ -311,11 +311,13 @@ private:
std::string m_clientVersion; ///< Our end-application client's name/version.
VersionChecker m_vc; ///< Dummy object to check & update the protocol version.
BlockChain m_bc; ///< Maintains block database.
TransactionQueue m_tq; ///< Maintains list of incoming transactions not yet on the block chain.
TransactionQueue m_tq; ///< Maintains a list of incoming transactions not yet in a block on the blockchain.
BlockQueue m_bq; ///< Maintains a list of incoming blocks not yet on the blockchain (to be imported).
OverlayDB m_stateDB; ///< Acts as the central point for the state database, so multiple States can share it.
State m_preMine; ///< The present state of the client.
State m_postMine; ///< The state of the client which we're mining (i.e. it'll have all the rewards added).
mutable std::mutex x_net; ///< Lock for the network.
std::unique_ptr<PeerServer> m_net; ///< Should run in background and send us events when blocks found and allow us to send blocks as required.
std::unique_ptr<std::thread> m_work;///< The work thread.

3
libethereum/Guards.h

@ -21,11 +21,14 @@
#pragma once
#include <mutex>
#include <boost/thread.hpp>
namespace eth
{
using Guard = std::lock_guard<std::mutex>;
using RecursiveGuard = std::lock_guard<std::recursive_mutex>;
using ReadGuard = boost::shared_lock<boost::shared_mutex>;
using UpgradableGuard = boost::upgrade_lock<boost::shared_mutex>;
using UpgradeGuard = boost::upgrade_to_unique_lock<boost::shared_mutex>;

155
libethereum/PeerServer.cpp

@ -39,6 +39,7 @@
#include <libethcore/Exceptions.h>
#include "BlockChain.h"
#include "TransactionQueue.h"
#include "BlockQueue.h"
#include "PeerSession.h"
using namespace std;
using namespace eth;
@ -106,9 +107,34 @@ PeerServer::PeerServer(std::string const& _clientVersion, BlockChain const& _ch,
PeerServer::~PeerServer()
{
for (auto const& i: m_peers)
disconnectPeers();
}
void PeerServer::registerPeer(std::shared_ptr<PeerSession> _s)
{
Guard l(x_peers);
m_peers[_s->m_id] = _s;
}
void PeerServer::disconnectPeers()
{
for (unsigned n = 0;; n = 0)
{
{
Guard l(x_peers);
for (auto i: m_peers)
if (auto p = i.second.lock())
{
p->disconnect(ClientQuit);
n++;
}
}
if (!n)
break;
m_ioService.poll();
usleep(100000);
}
delete m_upnp;
}
@ -252,6 +278,7 @@ std::map<Public, bi::tcp::endpoint> PeerServer::potentialPeers()
std::map<Public, bi::tcp::endpoint> ret;
if (!m_public.address().is_unspecified())
ret.insert(make_pair(m_key.pub(), m_public));
Guard l(x_peers);
for (auto i: m_peers)
if (auto j = i.second.lock())
{
@ -288,7 +315,7 @@ void PeerServer::ensureAccepting()
clog(NetWarn) << "ERROR: " << _e.what();
}
m_accepting = false;
if (ec.value() != 1 && (m_mode == NodeMode::PeerServer || m_peers.size() < m_idealPeerCount * 2))
if (ec.value() != 1 && (m_mode == NodeMode::PeerServer || peerCount() < m_idealPeerCount * 2))
ensureAccepting();
});
}
@ -336,12 +363,12 @@ void PeerServer::connect(bi::tcp::endpoint const& _ep)
});
}
bool PeerServer::ensureInitialised(BlockChain& _bc, TransactionQueue& _tq)
bool PeerServer::ensureInitialised(TransactionQueue& _tq)
{
if (m_latestBlockSent == h256())
{
// First time - just initialise.
m_latestBlockSent = _bc.currentHash();
m_latestBlockSent = m_chain->currentHash();
clog(NetNote) << "Initialising: latest=" << m_latestBlockSent;
for (auto const& i: _tq.transactions())
@ -363,14 +390,34 @@ bool PeerServer::noteBlock(h256 _hash, bytesConstRef _data)
return false;
}
h256Set PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o, unsigned _max)
bool PeerServer::sync(TransactionQueue& _tq, BlockQueue& _bq)
{
h256Set ret;
bool netChange = ensureInitialised(_bc, _tq);
bool netChange = ensureInitialised(_tq);
if (m_mode == NodeMode::Full)
{
auto h = m_chain->currentHash();
maintainTransactions(_tq, h);
maintainBlocks(_bq, h);
// Connect to additional peers
growPeers();
}
// platform for consensus of social contract.
// restricts your freedom but does so fairly. and that's the value proposition.
// guarantees that everyone else respect the rules of the system. (i.e. obeys laws).
prunePeers();
return netChange;
}
void PeerServer::maintainTransactions(TransactionQueue& _tq, h256 _currentHash)
{
bool resendAll = (_currentHash != m_latestBlockSent);
for (auto it = m_incomingTransactions.begin(); it != m_incomingTransactions.end(); ++it)
if (_tq.import(&*it))
{}//ret = true; // just putting a transaction in the queue isn't enough to change the state - it might have an invalid nonce...
@ -378,10 +425,8 @@ h256Set PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o,
m_transactionsSent.insert(sha3(*it)); // if we already had the transaction, then don't bother sending it on.
m_incomingTransactions.clear();
auto h = _bc.currentHash();
bool resendAll = (h != m_latestBlockSent);
// Send any new transactions.
Guard l(x_peers);
for (auto j: m_peers)
if (auto p = j.second.lock())
{
@ -406,68 +451,46 @@ h256Set PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o,
p->m_knownTransactions.clear();
p->m_requireTransactions = false;
}
}
void PeerServer::maintainBlocks(BlockQueue& _bq, h256 _currentHash)
{
// Import new blocks
{
lock_guard<recursive_mutex> l(m_incomingLock);
for (auto it = m_incomingBlocks.rbegin(); it != m_incomingBlocks.rend(); ++it)
if (_bq.import(&*it, *m_chain))
{}
else{} // TODO: don't forward it.
m_incomingBlocks.clear();
}
// Send any new blocks.
if (h != m_latestBlockSent)
if (_currentHash != m_latestBlockSent)
{
// TODO: find where they diverge and send complete new branch.
RLPStream ts;
PeerSession::prep(ts);
ts.appendList(2) << BlocksPacket;
bytes b;
ts.appendRaw(_bc.block(_bc.currentHash())).swapOut(b);
ts.appendRaw(m_chain->block()).swapOut(b);
seal(b);
Guard l(x_peers);
for (auto j: m_peers)
if (auto p = j.second.lock())
{
if (!p->m_knownBlocks.count(_bc.currentHash()))
if (!p->m_knownBlocks.count(_currentHash))
p->send(&b);
p->m_knownBlocks.clear();
}
}
m_latestBlockSent = h;
unsigned totalAccepted = 0;
for (int accepted = 1, n = 0; accepted && totalAccepted < _max; ++n)
{
accepted = 0;
lock_guard<recursive_mutex> l(m_incomingLock);
if (m_incomingBlocks.size())
for (auto it = prev(m_incomingBlocks.end()); totalAccepted < _max; --it)
{
try
{
for (auto h: _bc.import(*it, _o))
ret.insert(h);
it = m_incomingBlocks.erase(it);
++accepted;
++totalAccepted;
netChange = true;
}
catch (UnknownParent)
{
// Don't (yet) know its parent. Leave it for later.
m_unknownParentBlocks.push_back(*it);
it = m_incomingBlocks.erase(it);
}
catch (...)
{
// Some other error - erase it.
it = m_incomingBlocks.erase(it);
}
if (it == m_incomingBlocks.begin())
break;
}
if (!n && accepted)
{
for (auto i: m_unknownParentBlocks)
m_incomingBlocks.push_back(i);
m_unknownParentBlocks.clear();
}
}
m_latestBlockSent = _currentHash;
}
// Connect to additional peers
void PeerServer::growPeers()
{
Guard l(x_peers);
while (m_peers.size() < m_idealPeerCount)
{
if (m_freePeers.empty())
@ -497,12 +520,11 @@ h256Set PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o,
connect(m_incomingPeers[m_freePeers[x]].first);
m_freePeers.erase(m_freePeers.begin() + x);
}
}
// platform for consensus of social contract.
// restricts your freedom but does so fairly. and that's the value proposition.
// guarantees that everyone else respect the rules of the system. (i.e. obeys laws).
}
void PeerServer::prunePeers()
{
Guard l(x_peers);
// We'll keep at most twice as many as is ideal, halfing what counts as "too young to kill" until we get there.
for (uint old = 15000; m_peers.size() > m_idealPeerCount * 2 && old > 100; old /= 2)
while (m_peers.size() > m_idealPeerCount)
@ -524,12 +546,17 @@ h256Set PeerServer::sync(BlockChain& _bc, TransactionQueue& _tq, OverlayDB& _o,
worst->disconnect(TooManyPeers);
}
(void)netChange;
return ret;
// Remove dead peers from list.
for (auto i = m_peers.begin(); i != m_peers.end();)
if (i->second.lock().get())
++i;
else
i = m_peers.erase(i);
}
std::vector<PeerInfo> PeerServer::peers(bool _updatePing) const
{
Guard l(x_peers);
if (_updatePing)
const_cast<PeerServer*>(this)->pingAll();
this_thread::sleep_for(chrono::milliseconds(200));
@ -543,6 +570,7 @@ std::vector<PeerInfo> PeerServer::peers(bool _updatePing) const
void PeerServer::pingAll()
{
Guard l(x_peers);
for (auto& i: m_peers)
if (auto j = i.second.lock())
j->ping();
@ -550,6 +578,7 @@ void PeerServer::pingAll()
bytes PeerServer::savePeers() const
{
Guard l(x_peers);
RLPStream ret;
int n = 0;
for (auto& i: m_peers)

34
libethereum/PeerServer.h

@ -30,12 +30,20 @@
#include <thread>
#include <libethcore/CommonEth.h>
#include "PeerNetwork.h"
#include "Guards.h"
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
namespace eth
{
class TransactionQueue;
class BlockQueue;
/**
* @brief The PeerServer class
* @warning None of this is thread-safe. You have been warned.
*/
class PeerServer
{
friend class PeerSession;
@ -48,8 +56,12 @@ public:
/// Start server, but don't listen.
PeerServer(std::string const& _clientVersion, BlockChain const& _ch, unsigned int _networkId, NodeMode _m = NodeMode::Full);
/// Will block on network process events.
~PeerServer();
/// Closes all peers.
void disconnectPeers();
static unsigned protocolVersion();
unsigned networkId() { return m_networkId; }
@ -58,13 +70,15 @@ public:
void connect(bi::tcp::endpoint const& _ep);
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
h256Set sync(BlockChain& _bc, TransactionQueue&, OverlayDB& _o, unsigned _max);
bool sync(TransactionQueue&, BlockQueue& _bc);
/// Conduct I/O, polling, syncing, whatever.
/// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway.
/// This won't touch alter the blockchain.
void process() { if (isInitialised()) m_ioService.poll(); }
bool havePeer(Public _id) const { Guard l(x_peers); return m_peers.count(_id); }
/// Set ideal number of peers.
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
@ -74,7 +88,7 @@ public:
std::vector<PeerInfo> peers(bool _updatePing = false) const;
/// Get number of peers connected; equivalent to, but faster than, peers().size().
size_t peerCount() const { return m_peers.size(); }
size_t peerCount() const { Guard l(x_peers); return m_peers.size(); }
/// Ping the peers, to update the latency information.
void pingAll();
@ -85,6 +99,8 @@ public:
bytes savePeers() const;
void restorePeers(bytesConstRef _b);
void registerPeer(std::shared_ptr<PeerSession> _s);
private:
/// Session wants to pass us a block that we might not have.
/// @returns true if we didn't have it.
@ -95,10 +111,15 @@ private:
void determinePublic(std::string const& _publicAddress, bool _upnp);
void ensureAccepting();
void growPeers();
void prunePeers();
void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock);
void maintainBlocks(BlockQueue& _bq, h256 _currentBlock);
/// Check to see if the network peer-state initialisation has happened.
bool isInitialised() const { return m_latestBlockSent; }
/// Initialises the network peer-state, doing the stuff that needs to be once-only. @returns true if it really was first.
bool ensureInitialised(BlockChain& _bc, TransactionQueue& _tq);
bool ensureInitialised(TransactionQueue& _tq);
std::map<Public, bi::tcp::endpoint> potentialPeers();
@ -117,14 +138,15 @@ private:
KeyPair m_key;
unsigned m_networkId;
mutable std::mutex x_peers;
std::map<Public, std::weak_ptr<PeerSession>> m_peers;
mutable std::recursive_mutex m_incomingLock;
std::vector<bytes> m_incomingTransactions;
std::vector<bytes> m_incomingBlocks;
mutable std::recursive_mutex m_incomingLock;
std::vector<bytes> m_unknownParentBlocks;
std::vector<Public> m_freePeers;
std::map<Public, std::pair<bi::tcp::endpoint, unsigned>> m_incomingPeers;
std::vector<Public> m_freePeers;
h256 m_latestBlockSent;
std::set<h256> m_transactionsSent;

26
libethereum/PeerSession.cpp

@ -86,12 +86,10 @@ bool PeerSession::interpret(RLP const& _r)
clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "/" << m_networkId << "]" << m_id.abridged() << showbase << hex << m_caps << dec << m_listenPort;
if (m_server->m_peers.count(m_id))
if (auto l = m_server->m_peers[m_id].lock())
if (l.get() != this && l->isOpen())
if (m_server->havePeer(m_id))
{
// Already connected.
cwarn << "Already have peer id" << m_id.abridged() << "at" << l->endpoint() << "rather than" << endpoint();
cwarn << "Already have peer id" << m_id.abridged();// << "at" << l->endpoint() << "rather than" << endpoint();
disconnect(DuplicatePeer);
return false;
}
@ -109,7 +107,8 @@ bool PeerSession::interpret(RLP const& _r)
return false;
}
m_server->m_peers[m_id] = shared_from_this();
m_server->registerPeer(shared_from_this());
startInitialSync();
// Grab trsansactions off them.
{
@ -173,7 +172,7 @@ bool PeerSession::interpret(RLP const& _r)
clogS(NetAllDetail) << "Checking: " << ep << "(" << toHex(id.ref().cropped(0, 4)) << ")";
// check that it's not us or one we already know:
if (id && (m_server->m_key.pub() == id || m_server->m_peers.count(id) || m_server->m_incomingPeers.count(id)))
if (id && (m_server->m_key.pub() == id || m_server->havePeer(id) || m_server->m_incomingPeers.count(id)))
goto CONTINUE;
// check that we're not already connected to addr:
@ -182,13 +181,6 @@ bool PeerSession::interpret(RLP const& _r)
for (auto i: m_server->m_addresses)
if (ep.address() == i && ep.port() == m_server->listenPort())
goto CONTINUE;
for (auto i: m_server->m_peers)
if (shared_ptr<PeerSession> p = i.second.lock())
{
clogS(NetAllDetail) << " ...against " << p->endpoint();
if (p->m_socket.is_open() && p->endpoint() == ep)
goto CONTINUE;
}
for (auto i: m_server->m_incomingPeers)
if (i.second.first == ep)
goto CONTINUE;
@ -492,14 +484,6 @@ void PeerSession::dropped()
m_socket.close();
}
catch (...) {}
// Remove from peer server
for (auto i = m_server->m_peers.begin(); i != m_server->m_peers.end(); ++i)
if (i->second.lock().get() == this)
{
m_server->m_peers.erase(i);
break;
}
}
void PeerSession::disconnect(int _reason)

35
libethereum/TransactionQueue.cpp

@ -31,9 +31,7 @@ bool TransactionQueue::import(bytesConstRef _block)
{
// Check if we already know this transaction.
h256 h = sha3(_block);
UpgradableGuard l(x_data);
if (m_data.count(h))
if (m_known.count(h))
return false;
try
@ -44,8 +42,7 @@ bool TransactionQueue::import(bytesConstRef _block)
auto s = t.sender();
// If valid, append to blocks.
UpgradeGuard ul(l);
m_data[h] = _block.toBytes();
m_current[h] = _block.toBytes();
}
catch (InvalidTransactionFormat const& _e)
{
@ -63,20 +60,36 @@ bool TransactionQueue::import(bytesConstRef _block)
void TransactionQueue::setFuture(std::pair<h256, bytes> const& _t)
{
UpgradableGuard l(x_data);
if (m_data.count(_t.first))
if (m_current.count(_t.first))
{
UpgradeGuard ul(l);
m_data.erase(_t.first);
m_current.erase(_t.first);
m_future.insert(make_pair(Transaction(_t.second).sender(), _t));
}
}
void TransactionQueue::noteGood(std::pair<h256, bytes> const& _t)
{
WriteGuard l(x_data);
auto r = m_future.equal_range(Transaction(_t.second).sender());
for (auto it = r.first; it != r.second; ++it)
m_data.insert(_t);
m_current.insert(it->second);
m_future.erase(r.first, r.second);
}
void TransactionQueue::drop(h256 _txHash)
{
WriteGuard l(m_lock);
if (!m_known.erase(_txHash))
return;
if (m_current.count(_txHash))
m_current.erase(_txHash);
else
{
for (auto i = m_future.begin(); i != m_future.end(); ++i)
if (i->second.first == _txHash)
{
m_future.erase(i);
break;
}
}
}

15
libethereum/TransactionQueue.h

@ -38,21 +38,22 @@ class BlockChain;
class TransactionQueue
{
public:
bool attemptImport(bytesConstRef _tx) { try { import(_block); return true; } catch (...) { return false; } }
bool attemptImport(bytes const& _tx) { return attemptImport(&_block); }
bool attemptImport(bytesConstRef _tx) { try { import(_tx); return true; } catch (...) { return false; } }
bool attemptImport(bytes const& _tx) { return attemptImport(&_tx); }
bool import(bytesConstRef _tx);
void drop(h256 _txHash) { WriteGuard l(x_data); m_data.erase(_txHash); }
void drop(h256 _txHash);
std::map<h256, bytes> transactions() const { ReadGuard l(x_data); return m_data; }
std::map<h256, bytes> transactions() const { ReadGuard l(m_lock); return m_current; }
std::pair<unsigned, unsigned> items() const { ReadGuard l(m_lock); return std::make_pair(m_current.size(), m_future.size()); }
void setFuture(std::pair<h256, bytes> const& _t);
void noteGood(std::pair<h256, bytes> const& _t);
private:
std::map<h256, bytes> m_data; ///< Map of SHA3(tx) to tx.
boost::shared_mutex x_data;
mutable boost::shared_mutex m_lock; ///< General lock.
std::set<h256> m_known; ///< Hashes of transactions in both sets.
std::map<h256, bytes> m_current; ///< Map of SHA3(tx) to tx.
std::multimap<Address, std::pair<h256, bytes>> m_future; ///< For transactions that have a future nonce; we map their sender address to the tx stuff, and insert once the sender has a valid TX.
};

Loading…
Cancel
Save