diff --git a/eth/main.cpp b/eth/main.cpp index c7a549a84..37c5fa38c 100644 --- a/eth/main.cpp +++ b/eth/main.cpp @@ -43,7 +43,7 @@ int main() s.sync(bc); s.sync(tq); - PeerNetwork net; // TODO: Implement - should run in background and send us events when blocks found and allow us to send blocks as required. + PeerServer net(0, 30303); // TODO: Implement - should run in background and send us events when blocks found and allow us to send blocks as required. while (true) { // Process network events. diff --git a/libethereum/PeerNetwork.cpp b/libethereum/PeerNetwork.cpp index 47c43af9c..15fe325f4 100644 --- a/libethereum/PeerNetwork.cpp +++ b/libethereum/PeerNetwork.cpp @@ -24,16 +24,104 @@ using namespace std; using namespace eth; -PeerNetwork::PeerNetwork() +bool PeerServer::connect(string const& _addr, uint _port) { + bi::tcp::resolver resolver(m_ioService); + cout << "Attempting connection to " << _addr << " @" << dec << _port << endl; + try + { + bi::tcp::socket s(m_ioService); + boost::asio::connect(s, resolver.resolve({ _addr, toString(_port) })); + auto p = make_shared(std::move(s), m_requiredNetworkId); + m_peers.push_back(p); + p->start(); + return true; + } + catch (exception& _e) + { + cout << "Connection refused (" << _e.what() << ")" << endl; + return false; + } +} + +#if 0 +void PeerConnection::start() +{ + cout << "Connected." << endl; + RLPStream s; + prep(s); + s.appendList(4) << (uint)Hello << (uint)0 << (uint)0 << "Ethereum++/0.1.0"; + sealAndSend(s); + handleRead(); +} + +void PeerConnection::handleRead() +{ + m_socket.async_read_some(boost::asio::buffer(m_buffer), [&](boost::system::error_code ec, std::size_t length) + { + if (ec) + return; // bomb out on error. + std::cout << "Got data" << std::endl; + m_incoming.resize(m_incoming.size() + length); + memcpy(m_incoming.data() + m_incoming.size() - length, m_buffer.data(), length); + while (m_incoming.size() > 8) + { + uint32_t len = fromBigEndian(bytesConstRef(m_incoming.data() + 4, 4)); + if (m_incoming.size() - 8 >= len) + { + // enough has come in. + RLP r(bytesConstRef(m_incoming.data() + 8, len)); + if (!interpret(r)) + // error + break; + memmove(m_incoming.data(), m_incoming.data() + len + 8, m_incoming.size() - (len + 8)); + m_incoming.resize(m_incoming.size() - (len + 8)); + } + else + break; + } + handleRead(); + }); +} + +PeerNetwork::PeerNetwork(uint _networkId): + m_ioService(), + m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), 0)), + m_networkId(_networkId) +{ + m_ioService.run(); +} + +PeerNetwork::PeerNetwork(uint _networkId, uint _listenPort): + m_ioService(), + m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), _listenPort)), + m_networkId(_networkId) +{ + try + { + start(); + } + catch (exception& _e) + { + cerr << "Network error: " << _e.what() << endl; + exit(1); + } } PeerNetwork::~PeerNetwork() { } +bool PeerNetwork::connect(std::string const& _addr, uint _port) +{ + cout << "Connecting to " << _addr << " @" << dec << _port << endl; + PeerConnection::pointer newConnection = PeerConnection::create(m_acceptor.get_io_service(), m_networkId); + return newConnection->connect(_addr, _port, m_ioService); +} + void PeerNetwork::process() { + m_ioService.run(); } void PeerNetwork::sync(BlockChain& _bc, TransactionQueue const& _tq) @@ -49,3 +137,28 @@ void PeerNetwork::sync(BlockChain& _bc, TransactionQueue const& _tq) } */ } + +void PeerNetwork::start() +{ + m_listener = new thread([&](){ justListen(); }); +} + +void PeerNetwork::justListen() +{ + bi::tcp::endpoint ep(bi::tcp::v4(), 30303); + while (true) + { + PeerConnection::pointer newConnection = PeerConnection::create(m_acceptor.get_io_service(), m_networkId); + cout << "Accepting incoming connections..." << endl; + try + { + m_acceptor.accept(newConnection->socket(), ep); + newConnection->start(); + } + catch (exception& _e) + { + return; + } + } +} +#endif diff --git a/libethereum/PeerNetwork.h b/libethereum/PeerNetwork.h index 9ef847b72..c37cb0909 100644 --- a/libethereum/PeerNetwork.h +++ b/libethereum/PeerNetwork.h @@ -21,7 +21,15 @@ #pragma once +#include +#include +#include +#include +#include +#include "RLP.h" #include "Common.h" +namespace ba = boost::asio; +namespace bi = boost::asio::ip; namespace eth { @@ -29,19 +37,150 @@ namespace eth class BlockChain; class TransactionQueue; -class PeerNetwork +enum PacketType +{ + Hello = 0, + Disconnect, + Ping, + Pong, + GetPeers = 0x10, + Peers, + Transactions, + Blocks, + GetChain +}; + +class PeerSession: public std::enable_shared_from_this +{ +public: + PeerSession(bi::tcp::socket _socket, uint _rNId): m_socket(std::move(_socket)), m_reqNetworkId(_rNId) + { + } + + void start() + { + doRead(); + } + + bool interpret(RLP const& _r) + { + switch (_r[0].toInt()) + { + case Hello: + m_protocolVersion = _r[1].toInt(); + m_networkId = _r[2].toInt(); + m_clientVersion = _r[3].toString(); + if (m_protocolVersion != 0 || m_networkId != m_reqNetworkId) + { + disconnect(); + return false; + } + break; + } + return true; + } + + void disconnect() + { + RLPStream s; + prep(s); + s.appendList(1) << Disconnect; + sealAndSend(s); + sleep(1); + } + +private: + void doRead() + { + auto self(shared_from_this()); + m_socket.async_read_some(boost::asio::buffer(m_data), [this, self](boost::system::error_code ec, std::size_t length) + { + if (!ec) + doWrite(length); + }); + } + + void doWrite(std::size_t length) + { + auto self(shared_from_this()); + boost::asio::async_write(m_socket, boost::asio::buffer(m_data, length), [this, self](boost::system::error_code ec, std::size_t /*length*/) + { + if (!ec) + doRead(); + }); + } + + void prep(RLPStream& _s) + { + _s.appendRaw(bytes(8, 0)); + } + + void sealAndSend(RLPStream& _s) + { + bytes b; + _s.swapOut(b); + b[0] = 0x22; + b[1] = 0x40; + b[2] = 0x08; + b[3] = 0x91; + uint32_t len = b.size() - 8; + b[4] = len >> 24; + b[5] = len >> 16; + b[6] = len >> 8; + b[7] = len; + send(b); + } + + void send(bytes& _msg) + { + bytes* buffer = new bytes; + swap(*buffer, _msg); + ba::async_write(m_socket, ba::buffer(*buffer), [&](boost::system::error_code ec, std::size_t length) + { + if (!ec) + // Callback for how the write went. For now, just kill the buffer. + delete buffer; + }); + } + + bi::tcp::socket m_socket; + std::array m_data; + + bytes m_incoming; + std::string m_clientVersion; + uint m_protocolVersion; + uint m_networkId; + uint m_reqNetworkId; +}; + +class PeerServer { public: - PeerNetwork(); - ~PeerNetwork(); + PeerServer(uint _networkId, short _port): + m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), _port)), + m_socket(m_ioService), + m_requiredNetworkId(_networkId) + { + doAccept(); + } + + PeerServer(uint _networkId): + m_acceptor(m_ioService, bi::tcp::endpoint(bi::tcp::v4(), 0)), + m_socket(m_ioService), + m_requiredNetworkId(_networkId) + { + } /// Conduct I/O, polling, syncing, whatever. /// Ideally all time-consuming I/O is done in a background thread, but you get this call every 100ms or so anyway. - void process(); + void run() { m_ioService.run(); } + void process() { m_ioService.poll(); } + + bool connect(std::string const& _addr = "127.0.0.1", uint _port = 30303); /// Sync with the BlockChain. It might contain one of our mined blocks, we might have new candidates from the network. - void sync(BlockChain& _bc, TransactionQueue const&); - + void sync(BlockChain& _bc, TransactionQueue const&) {} + /// Get an incoming transaction from the queue. @returns bytes() if nothing waiting. bytes const& incomingTransaction() { return NullBytes; } @@ -49,8 +188,27 @@ public: void popIncomingTransaction() {} private: -}; + void doAccept() + { + m_acceptor.async_accept(m_socket, [&](boost::system::error_code ec) + { + if (!ec) + { + auto p = std::make_shared(std::move(m_socket), m_requiredNetworkId); + m_peers.push_back(p); + p->start(); + } + doAccept(); + }); + } -} + ba::io_service m_ioService; + bi::tcp::acceptor m_acceptor; + bi::tcp::socket m_socket; + + uint m_requiredNetworkId; + std::vector> m_peers; +}; +} diff --git a/test/main.cpp b/test/main.cpp index e082b6cac..2ef451662 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -28,15 +28,17 @@ int daggerTest(); int cryptoTest(); int stateTest(); int hexPrefixTest(); +int peerTest(int argc, char** argv); -int main() +int main(int argc, char** argv) { // hexPrefixTest(); // rlpTest(); -// trieTest(); + trieTest(); // daggerTest(); // cryptoTest(); - stateTest(); +// stateTest(); + peerTest(argc, argv); return 0; } diff --git a/test/peer.cpp b/test/peer.cpp new file mode 100644 index 000000000..667f132f3 --- /dev/null +++ b/test/peer.cpp @@ -0,0 +1,99 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Foobar is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with Foobar. If not, see . +*/ +/** @file peer.cpp + * @author Gav Wood + * @date 2014 + * Peer Network test functions. + */ + +#include +using namespace std; +using namespace eth; +using boost::asio::ip::tcp; + +int peerTest(int argc, char** argv) +{ + int port = 30303; + PeerServer s(0, port); + s.run(); + /* + if (argc == 1) + { + boost::asio::io_service io_service; + tcp::acceptor acceptor_(io_service, tcp::endpoint(tcp::v4(), port)); + tcp::socket socket_(io_service); + function do_accept; + do_accept = [&]() + { + acceptor_.async_accept(socket_, [&](boost::system::error_code ec) + { + if (!ec) + { + auto s = move(socket_); + enum { max_length = 1024 }; + char data_[max_length]; + + function do_read; + do_read = [&]() + { + s.async_read_some(boost::asio::buffer(data_, max_length), [&](boost::system::error_code ec, std::size_t length) + { + if (!ec) + boost::asio::async_write(s, boost::asio::buffer(data_, length), [&](boost::system::error_code ec, std::size_t) + { + if (!ec) + do_read(); + }); + }); + }; + } + do_accept(); + }); + }; + io_service.run(); + } + else + { + + }*/ + + + +/* if (argc == 1) + { + PeerNetwork pn(0, 30303); + while (true) + { + usleep(100000); + pn.process(); + } + } + else + { + PeerNetwork pn(0); + if (pn.connect("127.0.0.1", 30303)) + cout << "CONNECTED" << endl; + while (true) + { + usleep(100000); + pn.process(); + } + }*/ + + return 0; +} + diff --git a/test/trie.cpp b/test/trie.cpp index bb41c12cf..8fe596a7f 100644 --- a/test/trie.cpp +++ b/test/trie.cpp @@ -121,6 +121,7 @@ int trieTest() assert(d.root() == hash256(s)); for (auto const& i: s) { + (void)i; assert(t.at(i.first) == i.second); assert(d.at(i.first) == i.second); } @@ -145,6 +146,7 @@ int trieTest() assert(d.root() == hash256(s)); for (auto const& i: s) { + (void)i; assert(t.at(i.first) == i.second); assert(d.at(i.first) == i.second); }