Browse Source

Peer network code NYW.

cl-refactor
Gav Wood 11 years ago
parent
commit
ee3f311607
  1. 2
      eth/main.cpp
  2. 115
      libethereum/PeerNetwork.cpp
  3. 174
      libethereum/PeerNetwork.h
  4. 8
      test/main.cpp
  5. 99
      test/peer.cpp
  6. 2
      test/trie.cpp

2
eth/main.cpp

@ -43,7 +43,7 @@ int main()
s.sync(bc);
s.sync(tq);
PeerNetwork net; // TODO: Implement - should run in background and send us events when blocks found and allow us to send blocks as required.
PeerServer net(0, 30303); // TODO: Implement - should run in background and send us events when blocks found and allow us to send blocks as required.
while (true)
{
// Process network events.

115
libethereum/PeerNetwork.cpp

@ -24,16 +24,104 @@
using namespace std;
using namespace eth;
PeerNetwork::PeerNetwork()
bool PeerServer::connect(string const& _addr, uint _port)
{
bi::tcp::resolver resolver(m_ioService);
cout << "Attempting connection to " << _addr << " @" << dec << _port << endl;
try
{
bi::tcp::socket s(m_ioService);
boost::asio::connect(s, resolver.resolve({ _addr, toString(_port) }));
auto p = make_shared<PeerSession>(std::move(s), m_requiredNetworkId);
m_peers.push_back(p);
p->start();
return true;
}
catch (exception& _e)
{
cout << "Connection refused (" << _e.what() << ")" << endl;
return false;
}
}
#if 0
void PeerConnection::start()
{
cout << "Connected." << endl;
RLPStream s;
prep(s);
s.appendList(4) << (uint)Hello << (uint)0 << (uint)0 << "Ethereum++/0.1.0";
sealAndSend(s);
handleRead();
}
void PeerConnection::handleRead()
{
m_socket.async_read_some(boost::asio::buffer(m_buffer), [&](boost::system::error_code ec, std::size_t length)
{
if (ec)
return; // bomb out on error.
std::cout << "Got data" << std::endl;
m_incoming.resize(m_incoming.size() + length);
memcpy(m_incoming.data() + m_incoming.size() - length, m_buffer.data(), length);
while (m_incoming.size() > 8)
{
uint32_t len = fromBigEndian<uint32_t>(bytesConstRef(m_incoming.data() + 4, 4));
if (m_incoming.size() - 8 >= len)
{
// enough has come in.
RLP r(bytesConstRef(m_incoming.data() + 8, len));
if (!interpret(r))
// error
break;
memmove(m_incoming.data(), m_incoming.data() + len + 8, m_incoming.size() - (len + 8));
m_incoming.resize(m_incoming.size() - (len + 8));
}
else
break;
}
handleRead();
});
}
PeerNetwork::PeerNetwork(uint _networkId):
m_ioService(),
m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), 0)),
m_networkId(_networkId)
{
m_ioService.run();
}
PeerNetwork::PeerNetwork(uint _networkId, uint _listenPort):
m_ioService(),
m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), _listenPort)),
m_networkId(_networkId)
{
try
{
start();
}
catch (exception& _e)
{
cerr << "Network error: " << _e.what() << endl;
exit(1);
}
}
PeerNetwork::~PeerNetwork()
{
}
bool PeerNetwork::connect(std::string const& _addr, uint _port)
{
cout << "Connecting to " << _addr << " @" << dec << _port << endl;
PeerConnection::pointer newConnection = PeerConnection::create(m_acceptor.get_io_service(), m_networkId);
return newConnection->connect(_addr, _port, m_ioService);
}
void PeerNetwork::process()
{
m_ioService.run();
}
void PeerNetwork::sync(BlockChain& _bc, TransactionQueue const& _tq)
@ -49,3 +137,28 @@ void PeerNetwork::sync(BlockChain& _bc, TransactionQueue const& _tq)
}
*/
}
void PeerNetwork::start()
{
m_listener = new thread([&](){ justListen(); });
}
void PeerNetwork::justListen()
{
bi::tcp::endpoint ep(bi::tcp::v4(), 30303);
while (true)
{
PeerConnection::pointer newConnection = PeerConnection::create(m_acceptor.get_io_service(), m_networkId);
cout << "Accepting incoming connections..." << endl;
try
{
m_acceptor.accept(newConnection->socket(), ep);
newConnection->start();
}
catch (exception& _e)
{
return;
}
}
}
#endif

174
libethereum/PeerNetwork.h

@ -21,7 +21,15 @@
#pragma once
#include <memory>
#include <utility>
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <thread>
#include "RLP.h"
#include "Common.h"
namespace ba = boost::asio;
namespace bi = boost::asio::ip;
namespace eth
{
@ -29,19 +37,150 @@ namespace eth
class BlockChain;
class TransactionQueue;
class PeerNetwork
enum PacketType
{
Hello = 0,
Disconnect,
Ping,
Pong,
GetPeers = 0x10,
Peers,
Transactions,
Blocks,
GetChain
};
class PeerSession: public std::enable_shared_from_this<PeerSession>
{
public:
PeerSession(bi::tcp::socket _socket, uint _rNId): m_socket(std::move(_socket)), m_reqNetworkId(_rNId)
{
}
void start()
{
doRead();
}
bool interpret(RLP const& _r)
{
switch (_r[0].toInt<unsigned>())
{
case Hello:
m_protocolVersion = _r[1].toInt<uint>();
m_networkId = _r[2].toInt<uint>();
m_clientVersion = _r[3].toString();
if (m_protocolVersion != 0 || m_networkId != m_reqNetworkId)
{
disconnect();
return false;
}
break;
}
return true;
}
void disconnect()
{
RLPStream s;
prep(s);
s.appendList(1) << Disconnect;
sealAndSend(s);
sleep(1);
}
private:
void doRead()
{
auto self(shared_from_this());
m_socket.async_read_some(boost::asio::buffer(m_data), [this, self](boost::system::error_code ec, std::size_t length)
{
if (!ec)
doWrite(length);
});
}
void doWrite(std::size_t length)
{
auto self(shared_from_this());
boost::asio::async_write(m_socket, boost::asio::buffer(m_data, length), [this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
doRead();
});
}
void prep(RLPStream& _s)
{
_s.appendRaw(bytes(8, 0));
}
void sealAndSend(RLPStream& _s)
{
bytes b;
_s.swapOut(b);
b[0] = 0x22;
b[1] = 0x40;
b[2] = 0x08;
b[3] = 0x91;
uint32_t len = b.size() - 8;
b[4] = len >> 24;
b[5] = len >> 16;
b[6] = len >> 8;
b[7] = len;
send(b);
}
void send(bytes& _msg)
{
bytes* buffer = new bytes;
swap(*buffer, _msg);
ba::async_write(m_socket, ba::buffer(*buffer), [&](boost::system::error_code ec, std::size_t length)
{
if (!ec)
// Callback for how the write went. For now, just kill the buffer.
delete buffer;
});
}
bi::tcp::socket m_socket;
std::array<byte, 1024> m_data;
bytes m_incoming;
std::string m_clientVersion;
uint m_protocolVersion;
uint m_networkId;
uint m_reqNetworkId;
};
class PeerServer
{
public:
PeerNetwork();
~PeerNetwork();
PeerServer(uint _networkId, short _port):
m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), _port)),
m_socket(m_ioService),
m_requiredNetworkId(_networkId)
{
doAccept();
}
PeerServer(uint _networkId):
m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), 0)),
m_socket(m_ioService),
m_requiredNetworkId(_networkId)
{
}
/// Conduct I/O, polling, syncing, whatever.
/// Ideally all time-consuming I/O is done in a background thread, but you get this call every 100ms or so anyway.
void process();
void run() { m_ioService.run(); }
void process() { m_ioService.poll(); }
bool connect(std::string const& _addr = "127.0.0.1", uint _port = 30303);
/// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network.
void sync(BlockChain& _bc, TransactionQueue const&);
void sync(BlockChain& _bc, TransactionQueue const&) {}
/// Get an incoming transaction from the queue. @returns bytes() if nothing waiting.
bytes const& incomingTransaction() { return NullBytes; }
@ -49,8 +188,27 @@ public:
void popIncomingTransaction() {}
private:
};
void doAccept()
{
m_acceptor.async_accept(m_socket, [&](boost::system::error_code ec)
{
if (!ec)
{
auto p = std::make_shared<PeerSession>(std::move(m_socket), m_requiredNetworkId);
m_peers.push_back(p);
p->start();
}
doAccept();
});
}
}
ba::io_service m_ioService;
bi::tcp::acceptor m_acceptor;
bi::tcp::socket m_socket;
uint m_requiredNetworkId;
std::vector<std::weak_ptr<PeerSession>> m_peers;
};
}

8
test/main.cpp

@ -28,15 +28,17 @@ int daggerTest();
int cryptoTest();
int stateTest();
int hexPrefixTest();
int peerTest(int argc, char** argv);
int main()
int main(int argc, char** argv)
{
// hexPrefixTest();
// rlpTest();
// trieTest();
trieTest();
// daggerTest();
// cryptoTest();
stateTest();
// stateTest();
peerTest(argc, argv);
return 0;
}

99
test/peer.cpp

@ -0,0 +1,99 @@
/*
This file is part of cpp-ethereum.
cpp-ethereum is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Foobar is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Foobar. If not, see <http://www.gnu.org/licenses/>.
*/
/** @file peer.cpp
* @author Gav Wood <i@gavwood.com>
* @date 2014
* Peer Network test functions.
*/
#include <PeerNetwork.h>
using namespace std;
using namespace eth;
using boost::asio::ip::tcp;
int peerTest(int argc, char** argv)
{
int port = 30303;
PeerServer s(0, port);
s.run();
/*
if (argc == 1)
{
boost::asio::io_service io_service;
tcp::acceptor acceptor_(io_service, tcp::endpoint(tcp::v4(), port));
tcp::socket socket_(io_service);
function<void()> do_accept;
do_accept = [&]()
{
acceptor_.async_accept(socket_, [&](boost::system::error_code ec)
{
if (!ec)
{
auto s = move(socket_);
enum { max_length = 1024 };
char data_[max_length];
function<void()> do_read;
do_read = [&]()
{
s.async_read_some(boost::asio::buffer(data_, max_length), [&](boost::system::error_code ec, std::size_t length)
{
if (!ec)
boost::asio::async_write(s, boost::asio::buffer(data_, length), [&](boost::system::error_code ec, std::size_t)
{
if (!ec)
do_read();
});
});
};
}
do_accept();
});
};
io_service.run();
}
else
{
}*/
/* if (argc == 1)
{
PeerNetwork pn(0, 30303);
while (true)
{
usleep(100000);
pn.process();
}
}
else
{
PeerNetwork pn(0);
if (pn.connect("127.0.0.1", 30303))
cout << "CONNECTED" << endl;
while (true)
{
usleep(100000);
pn.process();
}
}*/
return 0;
}

2
test/trie.cpp

@ -121,6 +121,7 @@ int trieTest()
assert(d.root() == hash256(s));
for (auto const& i: s)
{
(void)i;
assert(t.at(i.first) == i.second);
assert(d.at(i.first) == i.second);
}
@ -145,6 +146,7 @@ int trieTest()
assert(d.root() == hash256(s));
for (auto const& i: s)
{
(void)i;
assert(t.at(i.first) == i.second);
assert(d.at(i.first) == i.second);
}

Loading…
Cancel
Save