Browse Source

PoC-6 networking.

cl-refactor
Gav Wood 11 years ago
parent
commit
f5c22035fc
  1. 2
      libethcore/BlockInfo.cpp
  2. 2
      libethcore/CommonEth.cpp
  3. 2
      libethential/FixedHash.h
  4. 2
      libethereum/BlockChain.cpp
  5. 6
      libethereum/Client.h
  6. 2
      libethereum/PeerNetwork.cpp
  7. 16
      libethereum/PeerNetwork.h
  8. 63
      libethereum/PeerServer.cpp
  9. 24
      libethereum/PeerServer.h
  10. 296
      libethereum/PeerSession.cpp
  11. 16
      libethereum/PeerSession.h

2
libethcore/BlockInfo.cpp

@ -30,7 +30,7 @@
using namespace std; using namespace std;
using namespace eth; using namespace eth;
u256 eth::c_genesisDifficulty = (u256)1 << 22; u256 eth::c_genesisDifficulty = (u256)1 << 12;
BlockInfo::BlockInfo(): timestamp(Invalid256) BlockInfo::BlockInfo(): timestamp(Invalid256)
{ {

2
libethcore/CommonEth.cpp

@ -29,7 +29,7 @@ using namespace eth;
//#define ETH_ADDRESS_DEBUG 1 //#define ETH_ADDRESS_DEBUG 1
const unsigned eth::c_protocolVersion = 26; const unsigned eth::c_protocolVersion = 27;
const unsigned eth::c_databaseVersion = 1; const unsigned eth::c_databaseVersion = 1;
static const vector<pair<u256, string>> g_units = static const vector<pair<u256, string>> g_units =

2
libethential/FixedHash.h

@ -102,7 +102,7 @@ public:
byte operator[](unsigned _i) const { return m_data[_i]; } byte operator[](unsigned _i) const { return m_data[_i]; }
/// @returns an abridged version of the hash as a user-readable hex string. /// @returns an abridged version of the hash as a user-readable hex string.
std::string abridged() const { return toHex(ref().cropped(0, 4)) + ".."; } std::string abridged() const { return toHex(ref().cropped(0, 4)) + "\342\200\246"; }
/// @returns a mutable byte vector_ref to the object's data. /// @returns a mutable byte vector_ref to the object's data.
bytesRef ref() { return bytesRef(m_data.data(), N); } bytesRef ref() { return bytesRef(m_data.data(), N); }

2
libethereum/BlockChain.cpp

@ -402,7 +402,7 @@ bytes BlockChain::block(h256 _hash) const
memcpy(m_cache[_hash].data(), d.data(), d.size()); memcpy(m_cache[_hash].data(), d.data(), d.size());
if (!d.size()) if (!d.size())
cwarn << "Couldn't find requested block:" << _hash; cwarn << "Couldn't find requested block:" << _hash.abridged();
return m_cache[_hash]; return m_cache[_hash];
} }

6
libethereum/Client.h

@ -154,9 +154,9 @@ struct ClientWatch
struct WatchChannel: public LogChannel { static const char* name() { return "(o)"; } static const int verbosity = 7; }; struct WatchChannel: public LogChannel { static const char* name() { return "(o)"; } static const int verbosity = 7; };
#define cwatch eth::LogOutputStream<eth::WatchChannel, true>() #define cwatch eth::LogOutputStream<eth::WatchChannel, true>()
struct WorkInChannel: public LogChannel { static const char* name() { return ">W>"; } static const int verbosity = 6; }; struct WorkInChannel: public LogChannel { static const char* name() { return ">W>"; } static const int verbosity = 16; };
struct WorkOutChannel: public LogChannel { static const char* name() { return "<W<"; } static const int verbosity = 6; }; struct WorkOutChannel: public LogChannel { static const char* name() { return "<W<"; } static const int verbosity = 16; };
struct WorkChannel: public LogChannel { static const char* name() { return "-W-"; } static const int verbosity = 6; }; struct WorkChannel: public LogChannel { static const char* name() { return "-W-"; } static const int verbosity = 16; };
#define cwork eth::LogOutputStream<eth::WorkChannel, true>() #define cwork eth::LogOutputStream<eth::WorkChannel, true>()
#define cworkin eth::LogOutputStream<eth::WorkInChannel, true>() #define cworkin eth::LogOutputStream<eth::WorkInChannel, true>()
#define cworkout eth::LogOutputStream<eth::WorkOutChannel, true>() #define cworkout eth::LogOutputStream<eth::WorkOutChannel, true>()

2
libethereum/PeerNetwork.cpp

@ -35,7 +35,7 @@ bool eth::isPrivateAddress(bi::address _addressToCheck)
bi::address_v4::bytes_type bytesToCheck = v4Address.to_bytes(); bi::address_v4::bytes_type bytesToCheck = v4Address.to_bytes();
if (bytesToCheck[0] == 10 || bytesToCheck[0] == 127) if (bytesToCheck[0] == 10 || bytesToCheck[0] == 127)
return true; return true;
if (bytesToCheck[0] == 172 && (bytesToCheck[1] >= 16 && bytesToCheck[1] <=31)) if (bytesToCheck[0] == 172 && (bytesToCheck[1] >= 16 && bytesToCheck[1] <= 31))
return true; return true;
if (bytesToCheck[0] == 192 && bytesToCheck[1] == 168) if (bytesToCheck[0] == 192 && bytesToCheck[1] == 168)
return true; return true;

16
libethereum/PeerNetwork.h

@ -37,6 +37,11 @@ namespace eth
bool isPrivateAddress(bi::address _addressToCheck); bool isPrivateAddress(bi::address _addressToCheck);
static const eth::uint c_maxHashes = 32; ///< Maximum number of hashes BlockHashes will ever send.
static const eth::uint c_maxHashesAsk = 32; ///< Maximum number of hashes GetBlockHashes will ever ask for.
static const eth::uint c_maxBlocks = 16; ///< Maximum number of blocks Blocks will ever send.
static const eth::uint c_maxBlocksAsk = 16; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
class OverlayDB; class OverlayDB;
class BlockChain; class BlockChain;
class TransactionQueue; class TransactionQueue;
@ -50,9 +55,9 @@ struct NetConnect: public LogChannel { static const char* name() { return "+N+";
struct NetMessageDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 5; }; struct NetMessageDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 5; };
struct NetTriviaSummary: public LogChannel { static const char* name() { return "-N-"; } static const int verbosity = 10; }; struct NetTriviaSummary: public LogChannel { static const char* name() { return "-N-"; } static const int verbosity = 10; };
struct NetTriviaDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 11; }; struct NetTriviaDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 11; };
struct NetAllDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 15; }; struct NetAllDetail: public LogChannel { static const char* name() { return "=N="; } static const int verbosity = 13; };
struct NetRight: public LogChannel { static const char* name() { return ">N>"; } static const int verbosity = 18; }; struct NetRight: public LogChannel { static const char* name() { return ">N>"; } static const int verbosity = 14; };
struct NetLeft: public LogChannel { static const char* name() { return "<N<"; } static const int verbosity = 19; }; struct NetLeft: public LogChannel { static const char* name() { return "<N<"; } static const int verbosity = 15; };
enum PacketType enum PacketType
{ {
@ -66,7 +71,10 @@ enum PacketType
BlocksPacket, BlocksPacket,
GetChainPacket, GetChainPacket,
NotInChainPacket, NotInChainPacket,
GetTransactionsPacket GetTransactionsPacket,
GetBlockHashesPacket,
BlockHashesPacket,
GetBlocksPacket,
}; };
enum DisconnectReason enum DisconnectReason

63
libethereum/PeerServer.cpp

@ -108,6 +108,10 @@ PeerServer::PeerServer(std::string const& _clientVersion, BlockChain const& _ch,
PeerServer::~PeerServer() PeerServer::~PeerServer()
{ {
disconnectPeers(); disconnectPeers();
for (auto i: m_peers)
if (shared_ptr<PeerSession> p = i.second.lock())
p->giveUpOnFetch();
} }
void PeerServer::registerPeer(std::shared_ptr<PeerSession> _s) void PeerServer::registerPeer(std::shared_ptr<PeerSession> _s)
@ -363,13 +367,46 @@ void PeerServer::connect(bi::tcp::endpoint const& _ep)
}); });
} }
h256Set PeerServer::neededBlocks()
{
Guard l(x_blocksNeeded);
h256Set ret;
if (m_blocksNeeded.size())
{
while (ret.size() < c_maxBlocksAsk && m_blocksNeeded.size())
{
ret.insert(m_blocksNeeded.back());
m_blocksOnWay.insert(m_blocksNeeded.back());
m_blocksNeeded.pop_back();
}
}
else
for (auto i = m_blocksOnWay.begin(); ret.size() < c_maxBlocksAsk && i != m_blocksOnWay.end(); ++i)
ret.insert(*i);
return ret;
}
bool PeerServer::havePeer(Public _id) const
{
Guard l(x_peers);
// Remove dead peers from list.
for (auto i = m_peers.begin(); i != m_peers.end();)
if (i->second.lock().get())
++i;
else
i = m_peers.erase(i);
return m_peers.count(_id);
}
bool PeerServer::ensureInitialised(TransactionQueue& _tq) bool PeerServer::ensureInitialised(TransactionQueue& _tq)
{ {
if (m_latestBlockSent == h256()) if (m_latestBlockSent == h256())
{ {
// First time - just initialise. // First time - just initialise.
m_latestBlockSent = m_chain->currentHash(); m_latestBlockSent = m_chain->currentHash();
clog(NetNote) << "Initialising: latest=" << m_latestBlockSent; clog(NetNote) << "Initialising: latest=" << m_latestBlockSent.abridged();
for (auto const& i: _tq.transactions()) for (auto const& i: _tq.transactions())
m_transactionsSent.insert(i.first); m_transactionsSent.insert(i.first);
@ -381,6 +418,8 @@ bool PeerServer::ensureInitialised(TransactionQueue& _tq)
bool PeerServer::noteBlock(h256 _hash, bytesConstRef _data) bool PeerServer::noteBlock(h256 _hash, bytesConstRef _data)
{ {
Guard l(x_blocksNeeded);
m_blocksOnWay.erase(_hash);
if (!m_chain->details(_hash)) if (!m_chain->details(_hash))
{ {
lock_guard<recursive_mutex> l(m_incomingLock); lock_guard<recursive_mutex> l(m_incomingLock);
@ -522,6 +561,28 @@ void PeerServer::growPeers()
} }
} }
void PeerServer::noteHaveChain(std::shared_ptr<PeerSession> const& _from)
{
auto td = _from->m_totalDifficulty;
if ((m_totalDifficultyOfNeeded && td < m_totalDifficultyOfNeeded) || td < m_chain->details().totalDifficulty)
return;
{
Guard l(x_blocksNeeded);
m_blocksNeeded = _from->m_neededBlocks;
}
// Looks like it's the best yet for total difficulty. Set to download.
{
Guard l(x_peers);
for (auto const& i: m_peers)
if (shared_ptr<PeerSession> p = i.second.lock())
p->ensureGettingChain();
}
}
void PeerServer::prunePeers() void PeerServer::prunePeers()
{ {
Guard l(x_peers); Guard l(x_peers);

24
libethereum/PeerServer.h

@ -37,6 +37,7 @@ namespace bi = boost::asio::ip;
namespace eth namespace eth
{ {
class RLPStream;
class TransactionQueue; class TransactionQueue;
class BlockQueue; class BlockQueue;
@ -77,11 +78,13 @@ public:
/// This won't touch alter the blockchain. /// This won't touch alter the blockchain.
void process() { if (isInitialised()) m_ioService.poll(); } void process() { if (isInitialised()) m_ioService.poll(); }
bool havePeer(Public _id) const { Guard l(x_peers); return m_peers.count(_id) != 0; } /// @returns true iff we have the a peer of the given id.
bool havePeer(Public _id) const;
/// Set ideal number of peers. /// Set ideal number of peers.
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; } void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
/// Set the mode of operation on the network.
void setMode(NodeMode _m) { m_mode = _m; } void setMode(NodeMode _m) { m_mode = _m; }
/// Get peer information. /// Get peer information.
@ -96,7 +99,10 @@ public:
/// Get the port we're listening on currently. /// Get the port we're listening on currently.
unsigned short listenPort() const { return m_public.port(); } unsigned short listenPort() const { return m_public.port(); }
/// Serialise the set of known peers.
bytes savePeers() const; bytes savePeers() const;
/// Deserialise the data and populate the set of known peers.
void restorePeers(bytesConstRef _b); void restorePeers(bytesConstRef _b);
void registerPeer(std::shared_ptr<PeerSession> _s); void registerPeer(std::shared_ptr<PeerSession> _s);
@ -105,6 +111,10 @@ private:
/// Session wants to pass us a block that we might not have. /// Session wants to pass us a block that we might not have.
/// @returns true if we didn't have it. /// @returns true if we didn't have it.
bool noteBlock(h256 _hash, bytesConstRef _data); bool noteBlock(h256 _hash, bytesConstRef _data);
/// Session has finished getting the chain of hashes.
void noteHaveChain(std::shared_ptr<PeerSession> const& _who);
/// Called when the session has provided us with a new peer we can connect to.
void noteNewPeers() {}
void seal(bytes& _b); void seal(bytes& _b);
void populateAddresses(); void populateAddresses();
@ -116,6 +126,11 @@ private:
void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock); void maintainTransactions(TransactionQueue& _tq, h256 _currentBlock);
void maintainBlocks(BlockQueue& _bq, h256 _currentBlock); void maintainBlocks(BlockQueue& _bq, h256 _currentBlock);
/// Get a bunch of needed blocks.
/// Removes them from our list of needed blocks.
/// @returns empty if there's no more blocks left to fetch, otherwise the blocks to fetch.
h256Set neededBlocks();
/// Check to see if the network peer-state initialisation has happened. /// Check to see if the network peer-state initialisation has happened.
bool isInitialised() const { return m_latestBlockSent; } bool isInitialised() const { return m_latestBlockSent; }
/// Initialises the network peer-state, doing the stuff that needs to be once-only. @returns true if it really was first. /// Initialises the network peer-state, doing the stuff that needs to be once-only. @returns true if it really was first.
@ -140,7 +155,7 @@ private:
u256 m_networkId; u256 m_networkId;
mutable std::mutex x_peers; mutable std::mutex x_peers;
std::map<Public, std::weak_ptr<PeerSession>> m_peers; mutable std::map<Public, std::weak_ptr<PeerSession>> m_peers; // mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
mutable std::recursive_mutex m_incomingLock; mutable std::recursive_mutex m_incomingLock;
std::vector<bytes> m_incomingTransactions; std::vector<bytes> m_incomingTransactions;
@ -148,6 +163,11 @@ private:
std::map<Public, std::pair<bi::tcp::endpoint, unsigned>> m_incomingPeers; std::map<Public, std::pair<bi::tcp::endpoint, unsigned>> m_incomingPeers;
std::vector<Public> m_freePeers; std::vector<Public> m_freePeers;
mutable std::mutex x_blocksNeeded;
u256 m_totalDifficultyOfNeeded;
h256s m_blocksNeeded; /// From latest to earliest.
h256Set m_blocksOnWay;
h256 m_latestBlockSent; h256 m_latestBlockSent;
std::set<h256> m_transactionsSent; std::set<h256> m_transactionsSent;

296
libethereum/PeerSession.cpp

@ -31,10 +31,6 @@ using namespace eth;
#define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] " #define clogS(X) eth::LogOutputStream<X, true>(false) << "| " << std::setw(2) << m_socket.native_handle() << "] "
static const eth::uint c_maxHashes = 4096; ///< Maximum number of hashes GetChain will ever send.
static const eth::uint c_maxBlocks = 2048; ///< Maximum number of blocks Blocks will ever send.
static const eth::uint c_maxBlocksAsk = 512; ///< Maximum number of blocks we ask to receive in Blocks (when using GetChain).
PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, u256 _rNId, bi::address _peerAddress, unsigned short _peerPort): PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, u256 _rNId, bi::address _peerAddress, unsigned short _peerPort):
m_server(_s), m_server(_s),
m_socket(std::move(_socket)), m_socket(std::move(_socket)),
@ -49,6 +45,8 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, u256 _rNId, bi
PeerSession::~PeerSession() PeerSession::~PeerSession()
{ {
giveUpOnFetch();
// Read-chain finished for one reason or another. // Read-chain finished for one reason or another.
try try
{ {
@ -58,6 +56,21 @@ PeerSession::~PeerSession()
catch (...){} catch (...){}
} }
void PeerSession::giveUpOnFetch()
{
if (m_askedBlocks.size())
{
Guard l (m_server->x_blocksNeeded);
m_server->m_blocksNeeded.reserve(m_server->m_blocksNeeded.size() + m_askedBlocks.size());
for (auto i: m_askedBlocks)
{
m_server->m_blocksOnWay.erase(i);
m_server->m_blocksNeeded.push_back(i);
}
m_askedBlocks.clear();
}
}
bi::tcp::endpoint PeerSession::endpoint() const bi::tcp::endpoint PeerSession::endpoint() const
{ {
if (m_socket.is_open()) if (m_socket.is_open())
@ -83,6 +96,8 @@ bool PeerSession::interpret(RLP const& _r)
m_caps = _r[4].toInt<uint>(); m_caps = _r[4].toInt<uint>();
m_listenPort = _r[5].toInt<unsigned short>(); m_listenPort = _r[5].toInt<unsigned short>();
m_id = _r[6].toHash<h512>(); m_id = _r[6].toHash<h512>();
m_totalDifficulty = _r[7].toInt<u256>();
m_latestHash = _r[8].toHash<h256>();
clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "/" << m_networkId << "]" << m_id.abridged() << showbase << hex << m_caps << dec << m_listenPort; clogS(NetMessageSummary) << "Hello: " << clientVersion << "V[" << m_protocolVersion << "/" << m_networkId << "]" << m_id.abridged() << showbase << hex << m_caps << dec << m_listenPort;
@ -153,7 +168,7 @@ bool PeerSession::interpret(RLP const& _r)
s << PeersPacket; s << PeersPacket;
for (auto i: peers) for (auto i: peers)
{ {
clogS(NetTriviaDetail) << "Sending peer " << toHex(i.first.ref().cropped(0, 4)) << i.second; clogS(NetTriviaDetail) << "Sending peer " << i.first.abridged() << i.second;
s.appendList(3) << bytesConstRef(i.second.address().to_v4().to_bytes().data(), 4) << i.second.port() << i.first; s.appendList(3) << bytesConstRef(i.second.address().to_v4().to_bytes().data(), 4) << i.second.port() << i.first;
} }
sealAndSend(s); sealAndSend(s);
@ -186,6 +201,7 @@ bool PeerSession::interpret(RLP const& _r)
goto CONTINUE; goto CONTINUE;
m_server->m_incomingPeers[id] = make_pair(ep, 0); m_server->m_incomingPeers[id] = make_pair(ep, 0);
m_server->m_freePeers.push_back(id); m_server->m_freePeers.push_back(id);
m_server->noteNewPeers();
clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")"; clogS(NetTriviaDetail) << "New peer: " << ep << "(" << id << ")";
CONTINUE:; CONTINUE:;
} }
@ -201,25 +217,99 @@ bool PeerSession::interpret(RLP const& _r)
m_knownTransactions.insert(sha3(_r[i].data())); m_knownTransactions.insert(sha3(_r[i].data()));
} }
break; break;
case GetBlockHashesPacket:
{
if (m_server->m_mode == NodeMode::PeerServer)
break;
unsigned limit = _r[1].toInt<unsigned>();
h256 later = _r[2].toHash<h256>();
clogS(NetMessageSummary) << "GetBlockHashes (" << limit << "entries, " << later.abridged() << ")";
unsigned c = min<unsigned>(m_server->m_chain->number(later), limit);
RLPStream s;
prep(s).appendList(1 + c).append(BlockHashesPacket);
h256 p = m_server->m_chain->details(later).parent;
for (unsigned i = 0; i < c; ++i, p = m_server->m_chain->details(p).parent)
s << p;
sealAndSend(s);
break;
}
case BlockHashesPacket:
{
if (m_server->m_mode == NodeMode::PeerServer)
break;
clogS(NetMessageSummary) << "BlockHashes (" << dec << (_r.itemCount() - 1) << " entries)";
if (_r.itemCount() == 1)
{
m_server->noteHaveChain(shared_from_this());
return true;
}
for (unsigned i = 1; i < _r.itemCount(); ++i)
{
auto h = _r[i].toHash<h256>();
if (m_server->m_chain->details(h))
{
m_server->noteHaveChain(shared_from_this());
return true;
}
else
m_neededBlocks.push_back(h);
}
// run through - ask for more.
RLPStream s;
prep(s).appendList(3);
s << GetBlockHashesPacket << c_maxHashesAsk << m_neededBlocks.back();
sealAndSend(s);
break;
}
case GetBlocksPacket:
{
if (m_server->m_mode == NodeMode::PeerServer)
break;
clogS(NetMessageSummary) << "GetBlocks (" << dec << (_r.itemCount() - 1) << " entries)";
// TODO: return the requested blocks.
bytes rlp;
unsigned n = 0;
for (unsigned i = 1; i < _r.itemCount() && i <= c_maxBlocks; ++i)
{
auto b = m_server->m_chain->block(_r[i].toHash<h256>());
if (b.size())
{
rlp += b;
++n;
}
}
RLPStream s;
sealAndSend(prep(s).appendList(n + 1).append(BlocksPacket).appendRaw(rlp, n));
break;
}
case BlocksPacket: case BlocksPacket:
{ {
if (m_server->m_mode == NodeMode::PeerServer) if (m_server->m_mode == NodeMode::PeerServer)
break; break;
clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << " entries)"; clogS(NetMessageSummary) << "Blocks (" << dec << (_r.itemCount() - 1) << " entries)";
if (_r.itemCount() == 1)
{
// Couldn't get any from last batch - probably got to this peer's latest block - just give up.
giveUpOnFetch();
break;
}
unsigned used = 0; unsigned used = 0;
for (unsigned i = 1; i < _r.itemCount(); ++i) for (unsigned i = 1; i < _r.itemCount(); ++i)
{ {
auto h = sha3(_r[i].data()); auto h = sha3(_r[i].data());
if (m_server->noteBlock(h, _r[i].data())) if (m_server->noteBlock(h, _r[i].data()))
{
m_knownBlocks.insert(h);
used++; used++;
} m_askedBlocks.erase(h);
m_knownBlocks.insert(h);
} }
m_rating += used; m_rating += used;
unsigned knownParents = 0; unsigned knownParents = 0;
unsigned unknownParents = 0; unsigned unknownParents = 0;
if (g_logVerbosity >= 2) if (g_logVerbosity >= NetMessageSummary::verbosity)
{ {
for (unsigned i = 1; i < _r.itemCount(); ++i) for (unsigned i = 1; i < _r.itemCount(); ++i)
{ {
@ -228,138 +318,17 @@ bool PeerSession::interpret(RLP const& _r)
if (!m_server->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash)) if (!m_server->m_chain->details(bi.parentHash) && !m_knownBlocks.count(bi.parentHash))
{ {
unknownParents++; unknownParents++;
clogS(NetMessageDetail) << "Unknown parent " << bi.parentHash << " of block " << h; clogS(NetAllDetail) << "Unknown parent " << bi.parentHash << " of block " << h;
} }
else else
{ {
knownParents++; knownParents++;
clogS(NetMessageDetail) << "Known parent " << bi.parentHash << " of block " << h; clogS(NetAllDetail) << "Known parent " << bi.parentHash << " of block " << h;
} }
} }
} }
clogS(NetMessageSummary) << dec << knownParents << " known parents, " << unknownParents << "unknown, " << used << "used."; clogS(NetMessageSummary) << dec << knownParents << " known parents, " << unknownParents << "unknown, " << used << "used.";
if (used) // we received some - check if there's any more ensureGettingChain();
{
RLPStream s;
prep(s).appendList(3);
s << GetChainPacket;
s << sha3(_r[1].data());
s << c_maxBlocksAsk;
sealAndSend(s);
}
else
clogS(NetMessageSummary) << "Peer sent all blocks in chain.";
break;
}
case GetChainPacket:
{
if (m_server->m_mode == NodeMode::PeerServer)
break;
clogS(NetMessageSummary) << "GetChain (" << (_r.itemCount() - 2) << " hashes, " << (_r[_r.itemCount() - 1].toInt<bigint>()) << ")";
// ********************************************************************
// NEEDS FULL REWRITE!
h256s parents;
parents.reserve(_r.itemCount() - 2);
for (unsigned i = 1; i < _r.itemCount() - 1; ++i)
parents.push_back(_r[i].toHash<h256>());
if (_r.itemCount() == 2)
break;
// return 2048 block max.
uint baseCount = (uint)min<bigint>(_r[_r.itemCount() - 1].toInt<bigint>(), c_maxBlocks);
clogS(NetMessageSummary) << "GetChain (" << baseCount << " max, from " << parents.front() << " to " << parents.back() << ")";
for (auto parent: parents)
{
auto h = m_server->m_chain->currentHash();
h256 latest = m_server->m_chain->currentHash();
uint latestNumber = 0;
uint parentNumber = 0;
RLPStream s;
// try to find parent in our blockchain
// todo: add some delta() fn to blockchain
BlockDetails fParent = m_server->m_chain->details(parent);
if (fParent)
{
latestNumber = m_server->m_chain->number(latest);
parentNumber = fParent.number;
uint count = min<uint>(latestNumber - parentNumber, baseCount);
clogS(NetAllDetail) << "Requires " << dec << (latestNumber - parentNumber) << " blocks from " << latestNumber << " to " << parentNumber;
clogS(NetAllDetail) << latest << " - " << parent;
prep(s);
s.appendList(1 + count) << BlocksPacket;
uint endNumber = parentNumber;
uint startNumber = endNumber + count;
clogS(NetAllDetail) << "Sending " << dec << count << " blocks from " << startNumber << " to " << endNumber;
// append blocks
uint n = latestNumber;
// seek back (occurs when count is limited by baseCount)
for (; n > startNumber; n--, h = m_server->m_chain->details(h).parent) {}
for (uint i = 0; i < count; ++i, --n, h = m_server->m_chain->details(h).parent)
{
if (h == parent || n == endNumber)
{
cwarn << "BUG! Couldn't create the reply for GetChain!";
return true;
}
clogS(NetAllDetail) << " " << dec << i << " " << h;
s.appendRaw(m_server->m_chain->block(h));
}
if (!count)
clogS(NetMessageSummary) << "Sent peer all we have.";
clogS(NetAllDetail) << "Parent: " << h;
}
else if (parent != parents.back())
continue;
if (h != parent)
{
// not in the blockchain;
if (parent == parents.back())
{
// out of parents...
clogS(NetAllDetail) << "GetChain failed; not in chain";
// No good - must have been on a different branch.
s.clear();
prep(s).appendList(2) << NotInChainPacket << parents.back();
}
else
// still some parents left - try them.
continue;
}
// send the packet (either Blocks or NotInChain) & exit.
sealAndSend(s);
break;
// ********************************************************************
}
break;
}
case NotInChainPacket:
{
if (m_server->m_mode == NodeMode::PeerServer)
break;
h256 noGood = _r[1].toHash<h256>();
clogS(NetMessageSummary) << "NotInChain (" << noGood << ")";
if (noGood == m_server->m_chain->genesisHash())
{
clogS(NetWarn) << "Discordance over genesis block! Disconnect.";
disconnect(WrongGenesis);
}
else
{
uint count = std::min(c_maxHashes, m_server->m_chain->number(noGood));
RLPStream s;
prep(s).appendList(2 + count);
s << GetChainPacket;
auto h = m_server->m_chain->details(noGood).parent;
for (uint i = 0; i < count; ++i, h = m_server->m_chain->details(h).parent)
s << h;
s << c_maxBlocksAsk;
sealAndSend(s);
}
break;
} }
case GetTransactionsPacket: case GetTransactionsPacket:
{ {
@ -374,6 +343,24 @@ bool PeerSession::interpret(RLP const& _r)
return true; return true;
} }
void PeerSession::ensureGettingChain()
{
if (!m_askedBlocks.size())
m_askedBlocks = m_server->neededBlocks();
if (m_askedBlocks.size())
{
RLPStream s;
prep(s);
s.appendList(m_askedBlocks.size() + 1) << GetBlocksPacket;
for (auto i: m_askedBlocks)
s << i;
sealAndSend(s);
}
else
clogS(NetMessageSummary) << "No blocks left to get.";
}
void PeerSession::ping() void PeerSession::ping()
{ {
RLPStream s; RLPStream s;
@ -381,6 +368,12 @@ void PeerSession::ping()
m_ping = std::chrono::steady_clock::now(); m_ping = std::chrono::steady_clock::now();
} }
void PeerSession::getPeers()
{
RLPStream s;
sealAndSend(prep(s).appendList(1) << GetPeersPacket);
}
RLPStream& PeerSession::prep(RLPStream& _s) RLPStream& PeerSession::prep(RLPStream& _s)
{ {
return _s.appendRaw(bytes(8, 0)); return _s.appendRaw(bytes(8, 0));
@ -508,30 +501,37 @@ void PeerSession::start()
{ {
RLPStream s; RLPStream s;
prep(s); prep(s);
s.appendList(7) << HelloPacket << (uint)PeerServer::protocolVersion() << m_server->networkId() << m_server->m_clientVersion << (m_server->m_mode == NodeMode::Full ? 0x07 : m_server->m_mode == NodeMode::PeerServer ? 0x01 : 0) << m_server->m_public.port() << m_server->m_key.pub(); s.appendList(9) << HelloPacket
<< (uint)PeerServer::protocolVersion()
<< m_server->networkId()
<< m_server->m_clientVersion
<< (m_server->m_mode == NodeMode::Full ? 0x07 : m_server->m_mode == NodeMode::PeerServer ? 0x01 : 0)
<< m_server->m_public.port()
<< m_server->m_key.pub()
<< m_server->m_chain->details().totalDifficulty
<< m_server->m_chain->currentHash();
sealAndSend(s); sealAndSend(s);
ping(); ping();
getPeers();
doRead(); doRead();
} }
void PeerSession::startInitialSync() void PeerSession::startInitialSync()
{ {
uint n = m_server->m_chain->number(m_server->m_latestBlockSent); h256 c = m_server->m_chain->currentHash();
clogS(NetAllDetail) << "Want chain. Latest:" << m_server->m_latestBlockSent << ", number:" << n; uint n = m_server->m_chain->number();
uint count = std::min(c_maxHashes, n + 1); u256 td = max(m_server->m_chain->details().totalDifficulty, m_server->m_totalDifficultyOfNeeded);
RLPStream s;
prep(s).appendList(2 + count); clogS(NetAllDetail) << "Initial sync. Latest:" << c.abridged() << ", number:" << n << ", TD: max(" << m_server->m_chain->details().totalDifficulty << "," << m_server->m_totalDifficultyOfNeeded << ") versus " << m_totalDifficulty;
s << GetChainPacket; if (td > m_totalDifficulty)
auto h = m_server->m_latestBlockSent; return; // All good - we have the better chain.
for (uint i = 0; i < count; ++i, h = m_server->m_chain->details(h).parent)
{
clogS(NetAllDetail) << " " << i << ":" << h;
s << h;
}
s << c_maxBlocksAsk; // Our chain isn't better - grab theirs.
RLPStream s;
prep(s).appendList(3);
s << GetBlockHashesPacket << c_maxHashesAsk << m_latestHash;
m_neededBlocks = h256s(1, m_latestHash);
sealAndSend(s); sealAndSend(s);
} }

16
libethereum/PeerSession.h

@ -33,6 +33,10 @@
namespace eth namespace eth
{ {
/**
* @brief The PeerSession class
* @todo Document fully.
*/
class PeerSession: public std::enable_shared_from_this<PeerSession> class PeerSession: public std::enable_shared_from_this<PeerSession>
{ {
friend class PeerServer; friend class PeerServer;
@ -52,6 +56,12 @@ public:
private: private:
void startInitialSync(); void startInitialSync();
void getPeers();
/// Ensure that we are waiting for a bunch of blocks from our peer.
void ensureGettingChain();
void giveUpOnFetch();
void dropped(); void dropped();
void doRead(); void doRead();
@ -84,6 +94,12 @@ private:
unsigned short m_listenPort; ///< Port that the remote client is listening on for connections. Useful for giving to peers. unsigned short m_listenPort; ///< Port that the remote client is listening on for connections. Useful for giving to peers.
uint m_caps; uint m_caps;
h256 m_latestHash; ///< Peer's latest block's hash.
u256 m_totalDifficulty; ///< Peer's latest block's total difficulty.
h256s m_neededBlocks; ///< The blocks that we should download from this peer.
h256Set m_askedBlocks; ///< The blocks for which we sent the last GetBlocks for but haven't received a corresponding Blocks.
std::chrono::steady_clock::time_point m_ping; std::chrono::steady_clock::time_point m_ping;
std::chrono::steady_clock::time_point m_connect; std::chrono::steady_clock::time_point m_connect;
std::chrono::steady_clock::time_point m_disconnect; std::chrono::steady_clock::time_point m_disconnect;

Loading…
Cancel
Save