Browse Source

Pinging.

cl-refactor
Gav Wood 11 years ago
parent
commit
97d1232b21
  1. 53
      libethereum/PeerNetwork.cpp
  2. 24
      libethereum/PeerNetwork.h
  3. 4
      test/peer.cpp

53
libethereum/PeerNetwork.cpp

@ -24,6 +24,15 @@
using namespace std; using namespace std;
using namespace eth; using namespace eth;
PeerSession::PeerSession(bi::tcp::socket _socket, uint _rNId): m_socket(std::move(_socket)), m_reqNetworkId(_rNId)
{
}
PeerSession::~PeerSession()
{
disconnect();
}
bool PeerSession::interpret(RLP const& _r) bool PeerSession::interpret(RLP const& _r)
{ {
switch (_r[0].toInt<unsigned>()) switch (_r[0].toInt<unsigned>())
@ -39,22 +48,34 @@ bool PeerSession::interpret(RLP const& _r)
} }
cout << std::setw(2) << m_socket.native_handle() << " | Client version: " << m_clientVersion << endl; cout << std::setw(2) << m_socket.native_handle() << " | Client version: " << m_clientVersion << endl;
break; break;
case Disconnect:
m_socket.close();
return false;
case Ping:
{
RLPStream s;
sealAndSend(prep(s).appendList(1) << Pong);
break;
} }
return true; case Pong:
cout << "Latency: " << chrono::duration_cast<chrono::milliseconds>(std::chrono::steady_clock::now() - m_ping).count() << " ms" << endl;
break;
default:
break;
} }
return true;
PeerSession::PeerSession(bi::tcp::socket _socket, uint _rNId): m_socket(std::move(_socket)), m_reqNetworkId(_rNId)
{
} }
PeerSession::~PeerSession() void PeerSession::ping()
{ {
disconnect(); RLPStream s;
sealAndSend(prep(s).appendList(1) << Ping);
m_ping = std::chrono::steady_clock::now();
} }
void PeerSession::prep(RLPStream& _s) RLPStream& PeerSession::prep(RLPStream& _s)
{ {
_s.appendRaw(bytes(8, 0)); return _s.appendRaw(bytes(8, 0));
} }
void PeerSession::sealAndSend(RLPStream& _s) void PeerSession::sealAndSend(RLPStream& _s)
@ -89,6 +110,7 @@ void PeerSession::disconnect()
s.appendList(1) << Disconnect; s.appendList(1) << Disconnect;
sealAndSend(s); sealAndSend(s);
sleep(1); sleep(1);
m_socket.close();
} }
void PeerSession::start() void PeerSession::start()
@ -108,7 +130,6 @@ void PeerSession::doRead()
{ {
if (!ec) if (!ec)
{ {
std::cout << "Got data" << std::endl;
m_incoming.resize(m_incoming.size() + length); m_incoming.resize(m_incoming.size() + length);
memcpy(m_incoming.data() + m_incoming.size() - length, m_data.data(), length); memcpy(m_incoming.data() + m_incoming.size() - length, m_data.data(), length);
while (m_incoming.size() > 8) while (m_incoming.size() > 8)
@ -128,7 +149,6 @@ void PeerSession::doRead()
break; break;
} }
} }
//doWrite(length);
doRead(); doRead();
}); });
} }
@ -185,6 +205,19 @@ bool PeerServer::connect(string const& _addr, uint _port)
} }
} }
void PeerServer::process()
{
m_ioService.poll();
// TODO: Gather all transactions, blocks & peers.
}
void PeerServer::pingAll()
{
for (auto& i: m_peers)
if (auto j = i.lock())
j->ping();
}
void PeerServer::sync(BlockChain& _bc, TransactionQueue const& _tq) void PeerServer::sync(BlockChain& _bc, TransactionQueue const& _tq)
{ {
/* /*

24
libethereum/PeerNetwork.h

@ -57,40 +57,48 @@ public:
~PeerSession(); ~PeerSession();
void start(); void start();
bool interpret(RLP const& _r);
void disconnect(); void disconnect();
void ping();
private: private:
void doRead(); void doRead();
void doWrite(std::size_t length); void doWrite(std::size_t length);
bool interpret(RLP const& _r);
void prep(RLPStream& _s); RLPStream& prep(RLPStream& _s);
void sealAndSend(RLPStream& _s); void sealAndSend(RLPStream& _s);
void send(bytes& _msg); void send(bytes& _msg);
bi::tcp::socket m_socket; bi::tcp::socket m_socket;
std::array<byte, 1024> m_data; std::array<byte, 65536> m_data;
bytes m_incoming; bytes m_incoming;
std::string m_clientVersion; std::string m_clientVersion;
uint m_protocolVersion; uint m_protocolVersion;
uint m_networkId; uint m_networkId;
uint m_reqNetworkId; uint m_reqNetworkId;
std::vector<bytes> m_incomingTransactions;
std::vector<bytes> m_incomingBlocks;
std::vector<bi::tcp::endpoint> m_incomingPeers;
std::chrono::steady_clock::time_point m_ping;
}; };
class PeerServer class PeerServer
{ {
public: public:
/// Start server, listening for connections on the given port.
PeerServer(uint _networkId, short _port); PeerServer(uint _networkId, short _port);
/// Start server, but don't listen.
PeerServer(uint _networkId); PeerServer(uint _networkId);
/// Conduct I/O, polling, syncing, whatever. /// Conduct I/O, polling, syncing, whatever.
/// Ideally all time-consuming I/O is done in a background thread, but you get this call every 100ms or so anyway. /// Ideally all time-consuming I/O is done in a background thread, but you get this call every 100ms or so anyway.
void run() { m_ioService.run(); } void process();
void process() { m_ioService.poll(); }
/// Connect to a peer explicitly.
bool connect(std::string const& _addr = "127.0.0.1", uint _port = 30303); bool connect(std::string const& _addr = "127.0.0.1", uint _port = 30303);
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
@ -102,6 +110,8 @@ public:
/// Remove incoming transaction from the queue. Make sure you've finished with the data from any previous incomingTransaction() calls. /// Remove incoming transaction from the queue. Make sure you've finished with the data from any previous incomingTransaction() calls.
void popIncomingTransaction() {} void popIncomingTransaction() {}
void pingAll();
private: private:
void doAccept(); void doAccept();

4
test/peer.cpp

@ -49,10 +49,12 @@ int peerTest(int argc, char** argv)
if (!remoteHost.empty()) if (!remoteHost.empty())
pn.connect(remoteHost, remotePort); pn.connect(remoteHost, remotePort);
while (true) for (int i = 0; ; ++i)
{ {
usleep(100000); usleep(100000);
pn.process(); pn.process();
if (!(i % 10))
pn.pingAll();
} }
return 0; return 0;

Loading…
Cancel
Save