Browse Source

Network functionality improved, code somewhat dirty now though.

cl-refactor
Gav Wood 11 years ago
parent
commit
be68b7a1f1
  1. 5
      libethereum/BlockChain.cpp
  2. 142
      libethereum/PeerNetwork.cpp
  3. 13
      libethereum/PeerNetwork.h

5
libethereum/BlockChain.cpp

@ -118,15 +118,16 @@ void BlockChain::import(bytes const& _block, Overlay const& _db)
bi.verifyInternals(&_block); bi.verifyInternals(&_block);
auto newHash = eth::sha3(_block); auto newHash = eth::sha3(_block);
cout << "Attempting import of " << newHash << "..." << endl;
// Check block doesn't already exist first! // Check block doesn't already exist first!
if (details(newHash)) if (details(newHash))
{ {
cout << " Not new." << endl; // cout << " Not new." << endl;
throw AlreadyHaveBlock(); throw AlreadyHaveBlock();
} }
cout << "Attempting import of " << newHash << "..." << endl;
// Work out its number as the parent's number + 1 // Work out its number as the parent's number + 1
auto pd = details(bi.parentHash); auto pd = details(bi.parentHash);
if (!pd) if (!pd)

142
libethereum/PeerNetwork.cpp

@ -31,7 +31,6 @@ PeerSession::PeerSession(PeerServer* _s, bi::tcp::socket _socket, uint _rNId):
m_socket(std::move(_socket)), m_socket(std::move(_socket)),
m_reqNetworkId(_rNId) m_reqNetworkId(_rNId)
{ {
} }
PeerSession::~PeerSession() PeerSession::~PeerSession()
@ -54,15 +53,27 @@ bool PeerSession::interpret(RLP const& _r)
disconnect(); disconnect();
return false; return false;
} }
m_info = PeerInfo({clientVersion, m_socket.remote_endpoint().address().to_string(), (short)m_socket.remote_endpoint().port(), std::chrono::steady_clock::duration()}); try
{ m_info = PeerInfo({clientVersion, m_socket.remote_endpoint().address().to_string(), (short)m_socket.remote_endpoint().port(), std::chrono::steady_clock::duration()}); }
catch (...)
{
disconnect();
return false;
}
cout << std::setw(2) << m_socket.native_handle() << " | Client version: " << clientVersion << endl; cout << std::setw(2) << m_socket.native_handle() << " | Client version: " << clientVersion << endl;
// Grab their block chain off them. // Grab their block chain off them.
{ {
RLPStream s; RLPStream s;
prep(s); prep(s).appendList(3);
s.appendList(3) << (uint)GetChain << m_server->m_latestBlockSent << 256; s << (uint)GetChain;
unsigned count = std::min<unsigned>(256, m_server->m_chain->details(m_server->m_latestBlockSent).number);
s.appendList(count);
auto h = m_server->m_chain->details(m_server->m_latestBlockSent).parent;
for (unsigned i = 0; i < count; ++i, h = m_server->m_chain->details(h).parent)
s << h;
s << 256;
sealAndSend(s); sealAndSend(s);
} }
break; break;
@ -95,7 +106,7 @@ bool PeerSession::interpret(RLP const& _r)
break; break;
} }
case Peers: case Peers:
cout << std::setw(2) << m_socket.native_handle() << " | Peers (" << _r[1].itemCount() << " entries)" << endl; cout << std::setw(2) << m_socket.native_handle() << " | Peers (" << dec << _r[1].itemCount() << " entries)" << endl;
for (auto i: _r[1]) for (auto i: _r[1])
{ {
auto ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>()); auto ep = bi::tcp::endpoint(bi::address_v4(i[0].toArray<byte, 4>()), i[1].toInt<short>());
@ -104,7 +115,7 @@ bool PeerSession::interpret(RLP const& _r)
} }
break; break;
case Transactions: case Transactions:
cout << std::setw(2) << m_socket.native_handle() << " | Transactions (" << _r[1].itemCount() << " entries)" << endl; cout << std::setw(2) << m_socket.native_handle() << " | Transactions (" << dec << _r[1].itemCount() << " entries)" << endl;
for (auto i: _r[1]) for (auto i: _r[1])
{ {
m_server->m_incomingTransactions.push_back(i.data().toBytes()); m_server->m_incomingTransactions.push_back(i.data().toBytes());
@ -112,49 +123,93 @@ bool PeerSession::interpret(RLP const& _r)
} }
break; break;
case Blocks: case Blocks:
cout << std::setw(2) << m_socket.native_handle() << " | Blocks (" << _r[1].itemCount() << " entries)" << endl; cout << std::setw(2) << m_socket.native_handle() << " | Blocks (" << dec << _r[1].itemCount() << " entries)" << endl;
for (auto i: _r[1]) for (auto i: _r[1])
{ {
m_server->m_incomingBlocks.push_back(i.data().toBytes()); m_server->m_incomingBlocks.push_back(i.data().toBytes());
m_knownBlocks.insert(sha3(i.data())); m_knownBlocks.insert(sha3(i.data()));
} }
if (_r[1].itemCount()) // we received some - check if there's any more
{
RLPStream s;
prep(s).appendList(3);
s << (uint)GetChain;
s.appendList(1);
s << sha3(_r[1][0].data());
s << 256;
sealAndSend(s);
}
break; break;
case GetChain: case GetChain:
{ {
h256 parent = _r[1].toHash<h256>(); // ********************************************************************
// return 256 block max. // NEEDS FULL REWRITE!
uint count = (uint)min<bigint>(_r[1].toInt<bigint>(), 2048);
cout << std::setw(2) << m_socket.native_handle() << " | GetChain (" << count << " max, from " << parent << ")" << endl; h256s parents = _r[1].toVector<h256>();
if (!parents.size())
break;
// return 2048 block max.
uint baseCount = (uint)min<bigint>(_r[2].toInt<bigint>(), 256);
cout << std::setw(2) << m_socket.native_handle() << " | GetChain (" << baseCount << " max, from " << parents.front() << " to " << parents.back() << ")" << endl;
for (auto parent: parents)
{
auto h = m_server->m_chain->currentHash();
h256 latest = m_server->m_chain->currentHash(); h256 latest = m_server->m_chain->currentHash();
uint latestNumber = 0; uint latestNumber = 0;
uint parentNumber = 0; uint parentNumber = 0;
RLPStream s;
if (m_server->m_chain->details(parent)) if (m_server->m_chain->details(parent))
{ {
latestNumber = m_server->m_chain->details(latest).number; latestNumber = m_server->m_chain->details(latest).number;
parentNumber = m_server->m_chain->details(parent).number; parentNumber = m_server->m_chain->details(parent).number;
} uint count = min<uint>(latestNumber - parentNumber, baseCount);
count = min<uint>(latestNumber - parentNumber, count); cout << "Requires " << dec << (latestNumber - parentNumber) << " blocks from " << latestNumber << " to " << parentNumber << endl;
RLPStream s; cout << latest << " - " << parent << endl;
prep(s); prep(s);
s.appendList(2) << (uint)Blocks; s.appendList(2) << (uint)Blocks;
s.appendList(count); s.appendList(count);
uint endNumber = m_server->m_chain->details(parent).number; uint endNumber = m_server->m_chain->details(parent).number;
uint startNumber = endNumber + count; uint startNumber = endNumber + count;
auto h = m_server->m_chain->currentHash(); cout << "Sending " << dec << count << " blocks from " << startNumber << " to " << endNumber << endl;
uint n = latestNumber; uint n = latestNumber;
for (; n > startNumber; n--, h = m_server->m_chain->details(h).parent) {} for (; n > startNumber; n--, h = m_server->m_chain->details(h).parent) {}
for (uint i = 0; h != parent && n > endNumber && i < count; ++i, --n, h = m_server->m_chain->details(h).parent) for (uint i = 0; h != parent && n > endNumber && i < count; ++i, --n, h = m_server->m_chain->details(h).parent)
{
// cout << " " << dec << i << " " << h << endl;
s.appendRaw(m_server->m_chain->block(h)); s.appendRaw(m_server->m_chain->block(h));
}
cout << "Parent: " << h << endl;
}
else if (parent != parents.back())
continue;
if (h != parent) if (h == parent)
{
}
else
{ {
// not in the blockchain;
if (parent == parents.back())
{
// out of parents...
cout << std::setw(2) << m_socket.native_handle() << " | GetChain failed; not in chain" << endl; cout << std::setw(2) << m_socket.native_handle() << " | GetChain failed; not in chain" << endl;
// No good - must have been on a different branch. // No good - must have been on a different branch.
s.clear(); s.clear();
prep(s).appendList(2) << (uint)NotInChain << parent; prep(s).appendList(2) << (uint)NotInChain << parents.back();
}
else
// still some parents left - try them.
continue;
} }
// send the packet (either Blocks or NotInChain) & exit.
sealAndSend(s); sealAndSend(s);
break; break;
// ********************************************************************
}
break;
} }
case NotInChain: case NotInChain:
{ {
@ -164,7 +219,13 @@ bool PeerSession::interpret(RLP const& _r)
{ {
RLPStream s; RLPStream s;
prep(s).appendList(3); prep(s).appendList(3);
s << (uint)GetChain << m_server->m_chain->details(noGood).parent << 2048; s << (uint)GetChain;
unsigned count = std::min<unsigned>(256, m_server->m_chain->details(noGood).number);
s.appendList(count);
auto h = m_server->m_chain->details(noGood).parent;
for (unsigned i = 0; i < count; ++i, h = m_server->m_chain->details(h).parent)
s << h;
s << 2048;
sealAndSend(s); sealAndSend(s);
} }
// else our peer obviously knows nothing if they're unable to give the descendents of the genesis! // else our peer obviously knows nothing if they're unable to give the descendents of the genesis!
@ -214,12 +275,13 @@ void PeerSession::sendDestroy(bytes& _msg)
std::shared_ptr<bytes> buffer = std::make_shared<bytes>(); std::shared_ptr<bytes> buffer = std::make_shared<bytes>();
swap(*buffer, _msg); swap(*buffer, _msg);
assert((*buffer)[0] == 0x22); assert((*buffer)[0] == 0x22);
cout << "Sending " << (buffer->size() - 8) << endl;
// cout << "Sending " << RLP(bytesConstRef(buffer.get()).cropped(8)) << endl; // cout << "Sending " << RLP(bytesConstRef(buffer.get()).cropped(8)) << endl;
ba::async_write(m_socket, ba::buffer(*buffer), [=](boost::system::error_code ec, std::size_t length) ba::async_write(m_socket, ba::buffer(*buffer), [=](boost::system::error_code ec, std::size_t length)
{ {
if (ec) if (ec)
dropped(); dropped();
// cout << length << " bytes written (EC: " << ec << ")" << endl; cout << length << " bytes written (EC: " << ec << ")" << endl;
}); });
} }
@ -227,12 +289,12 @@ void PeerSession::send(bytesConstRef _msg)
{ {
std::shared_ptr<bytes> buffer = std::make_shared<bytes>(_msg.toBytes()); std::shared_ptr<bytes> buffer = std::make_shared<bytes>(_msg.toBytes());
assert((*buffer)[0] == 0x22); assert((*buffer)[0] == 0x22);
// cout << "Sending " << RLP(bytesConstRef(buffer.get()).cropped(8)) << endl; cout << "Sending " << (_msg.size() - 8) << endl;// RLP(bytesConstRef(buffer.get()).cropped(8)) << endl;
ba::async_write(m_socket, ba::buffer(*buffer), [=](boost::system::error_code ec, std::size_t length) ba::async_write(m_socket, ba::buffer(*buffer), [=](boost::system::error_code ec, std::size_t length)
{ {
if (ec) if (ec)
dropped(); dropped();
// cout << length << " bytes written (EC: " << ec << ")" << endl; cout << length << " bytes written (EC: " << ec << ")" << endl;
}); });
} }
@ -291,7 +353,7 @@ void PeerSession::doRead()
else else
{ {
uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data() + 4, 4)); uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data() + 4, 4));
// cout << "Received packet of " << len << " bytes" << endl; cout << "Received packet of " << len << " bytes" << endl;
if (m_incoming.size() - 8 < len) if (m_incoming.size() - 8 < len)
break; break;
@ -327,6 +389,13 @@ PeerServer::PeerServer(std::string const& _clientVersion, uint _networkId):
{ {
} }
PeerServer::~PeerServer()
{
for (auto const& i: m_peers)
if (auto p = i.lock())
p->disconnect();
}
std::vector<bi::tcp::endpoint> PeerServer::potentialPeers() std::vector<bi::tcp::endpoint> PeerServer::potentialPeers()
{ {
std::vector<bi::tcp::endpoint> ret; std::vector<bi::tcp::endpoint> ret;
@ -421,6 +490,7 @@ void PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
m_latestBlockSent = _bc.currentHash(); m_latestBlockSent = _bc.currentHash();
for (auto const& i: _tq.transactions()) for (auto const& i: _tq.transactions())
m_transactionsSent.insert(i.first); m_transactionsSent.insert(i.first);
m_lastPeersRequest = chrono::steady_clock::now();
} }
process(_bc); process(_bc);
@ -481,7 +551,8 @@ void PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
for (bool accepted = 1; accepted;) for (bool accepted = 1; accepted;)
{ {
accepted = 0; accepted = 0;
for (auto it = m_incomingBlocks.begin(); it != m_incomingBlocks.end();) if (m_incomingBlocks.size())
for (auto it = prev(m_incomingBlocks.end());; --it)
{ {
try try
{ {
@ -492,22 +563,41 @@ void PeerServer::process(BlockChain& _bc, TransactionQueue& _tq, Overlay& _o)
catch (UnknownParent) catch (UnknownParent)
{ {
// Don't (yet) know its parent. Leave it for later. // Don't (yet) know its parent. Leave it for later.
++it;
} }
catch (...) catch (...)
{ {
// Some other error - erase it. // Some other error - erase it.
it = m_incomingBlocks.erase(it); it = m_incomingBlocks.erase(it);
} }
if (it == m_incomingBlocks.begin())
break;
} }
} }
// Connect to additional peers // Connect to additional peers
while (m_peers.size() < m_idealPeerCount && m_incomingPeers.size()) // TODO: Need to avoid connecting to self & existing peers. Existing peers is easy, but need portable method of listing all addresses we can listen to avoid self.
/*
while (m_peers.size() < m_idealPeerCount)
{
if (m_incomingPeers.empty())
{ {
if (chrono::steady_clock::now() - m_lastPeersRequest > chrono::seconds(10))
{
RLPStream s;
bytes b;
(PeerSession::prep(s).appendList(1) << GetPeers).swapOut(b);
PeerSession::seal(b);
for (auto const& i: m_peers)
if (auto p = i.lock())
p->send(&b);
m_lastPeersRequest = chrono::steady_clock::now();
}
break;
}
connect(m_incomingPeers.back()); connect(m_incomingPeers.back());
m_incomingPeers.pop_back(); m_incomingPeers.pop_back();
} }*/
} }
std::vector<PeerInfo> PeerServer::peers() const std::vector<PeerInfo> PeerServer::peers() const

13
libethereum/PeerNetwork.h

@ -53,7 +53,6 @@ enum PacketType
class PeerServer; class PeerServer;
// TODO: include in PeerSession for ease of copying out later.
struct PeerInfo struct PeerInfo
{ {
std::string clientVersion; std::string clientVersion;
@ -112,6 +111,7 @@ public:
PeerServer(std::string const& _clientVersion, BlockChain const& _ch, uint _networkId, short _port); PeerServer(std::string const& _clientVersion, BlockChain const& _ch, uint _networkId, short _port);
/// Start server, but don't listen. /// Start server, but don't listen.
PeerServer(std::string const& _clientVersion, uint _networkId); PeerServer(std::string const& _clientVersion, uint _networkId);
~PeerServer();
/// Connect to a peer explicitly. /// Connect to a peer explicitly.
bool connect(std::string const& _addr = "127.0.0.1", uint _port = 30303); bool connect(std::string const& _addr = "127.0.0.1", uint _port = 30303);
@ -119,20 +119,20 @@ public:
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
/// Conduct I/O, polling, syncing, whatever. /// Conduct I/O, polling, syncing, whatever.
/// Ideally all time-consuming I/O is done in a background thread, but you get this call every 100ms or so anyway. /// Ideally all time-consuming I/O is done in a background thread or otherwise asynchronously, but you get this call every 100ms or so anyway.
void process(BlockChain& _bc, TransactionQueue&, Overlay& _o); void process(BlockChain& _bc, TransactionQueue&, Overlay& _o);
void process(BlockChain& _bc); void process(BlockChain& _bc);
/// Get number of peers connected.
unsigned peerCount() const { return m_peers.size(); }
/// Set ideal number of peers. /// Set ideal number of peers.
void setIdealPeerCount(uint _n) { m_idealPeerCount = _n; } void setIdealPeerCount(uint _n) { m_idealPeerCount = _n; }
/// Get peer information. /// Get peer information.
std::vector<PeerInfo> peers() const; std::vector<PeerInfo> peers() const;
/// Ping the peers. /// Get number of peers connected; equivalent to, but faster than, peers().size().
unsigned peerCount() const { return m_peers.size(); }
/// Ping the peers, to update the latency information.
void pingAll(); void pingAll();
private: private:
@ -156,6 +156,7 @@ private:
h256 m_latestBlockSent; h256 m_latestBlockSent;
std::set<h256> m_transactionsSent; std::set<h256> m_transactionsSent;
std::chrono::steady_clock::time_point m_lastPeersRequest;
unsigned m_idealPeerCount = 5; unsigned m_idealPeerCount = 5;
}; };

Loading…
Cancel
Save