diff --git a/libp2p/Network.cpp b/libp2p/Network.cpp index 94413e93f..d0276d67e 100644 --- a/libp2p/Network.cpp +++ b/libp2p/Network.cpp @@ -38,12 +38,6 @@ using namespace std; using namespace dev; using namespace dev::p2p; -template -Socket::Socket(SocketEventFace* _seface): m_eventDelegate(_seface), m_socket(m_eventDelegate->ioService()) {} - -template -Socket::Socket(SocketEventFace* _seface, endpointType _endpoint): m_eventDelegate(_seface), m_socket(m_eventDelegate->ioService(), _endpoint) {} - std::vector Network::getInterfaceAddresses() { std::vector addresses; diff --git a/libp2p/Network.h b/libp2p/Network.h index 3f96fd457..aeeabf329 100644 --- a/libp2p/Network.h +++ b/libp2p/Network.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include "Common.h" @@ -46,98 +47,6 @@ struct NetworkPreferences bool localNetworking = false; }; -struct Packet -{ - bytes payload() const { return s.out(); } - - bool required = false; - RLPStream s; -}; - -class SocketFace -{ - virtual void send(Packet const& _msg) = 0; -}; -class SocketEventFace; - -/** - * @brief Generic Socket Interface - * Owners of sockets must outlive the socket. - * Boost ASIO uses lowercase template for udp/tcp, which is adopted here. - */ -template -class Socket: SocketFace, public std::enable_shared_from_this> -{ -public: - using socketType = typename T::socket; - using endpointType = typename T::endpoint; - Socket(SocketEventFace* _seface); - Socket(SocketEventFace* _seface, endpointType _endpoint); - -protected: - void send(Packet const& _msg) - { - if (!m_started) - return; - - Guard l(x_sendQ); - sendQ.push_back(_msg.payload()); - if (sendQ.size() == 1 && !m_stopped) - doWrite(); - } - - void doWrite() - { - const bytes& bytes = sendQ[0]; - auto self(Socket::shared_from_this()); -// boost::asio::async_write(m_socket, boost::asio::buffer(bytes), [this, self](boost::system::error_code _ec, std::size_t /*length*/) -// { -// if (_ec) -// return stopWithError(_ec); -// else -// { -// Guard l(x_sendQ); -// sendQ.pop_front(); -// if (sendQ.empty()) -// return; -// } -// doWrite(); -// }); - } - - void stopWithError(boost::system::error_code _ec); - - std::atomic m_stopped; ///< Set when connection is stopping or stopped. Handshake cannot occur unless m_stopped is true. - std::atomic m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start() and shutdown(bool). - - SocketEventFace* m_eventDelegate = nullptr; - - Mutex x_sendQ; - std::deque sendQ; - bytes recvBuffer; - size_t recvdBytes = 0; - socketType m_socket; - - Mutex x_socketError; ///< Mutex for error which can occur from host or IO thread. - boost::system::error_code socketError; ///< Set when shut down due to error. -}; - -class SocketEventFace -{ -public: - virtual ba::io_service& ioService() = 0; - virtual void onStopped(SocketFace*) = 0; - virtual void onReceive(SocketFace*, Packet&) = 0; -}; - -struct UDPSocket: public Socket -{ - UDPSocket(ba::io_service& _io, unsigned _port): Socket(nullptr, bi::udp::endpoint(bi::udp::v4(), _port)) {} - ~UDPSocket() { boost::system::error_code ec; m_socket.shutdown(bi::udp::socket::shutdown_both, ec); m_socket.close(); } - -// bi::udp::socket m_socket; -}; - /** * @brief Network Class * Static network operations and interface(s). diff --git a/libp2p/UDP.cpp b/libp2p/UDP.cpp new file mode 100644 index 000000000..9fc65b7c3 --- /dev/null +++ b/libp2p/UDP.cpp @@ -0,0 +1,20 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file UDP.cpp + * @author Alex Leverington + * @date 2014 + */ diff --git a/libp2p/UDP.h b/libp2p/UDP.h new file mode 100644 index 000000000..ff94df2ac --- /dev/null +++ b/libp2p/UDP.h @@ -0,0 +1,181 @@ +/* + This file is part of cpp-ethereum. + + cpp-ethereum is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + cpp-ethereum is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with cpp-ethereum. If not, see . + */ +/** @file UDP.h + * @author Alex Leverington + * @date 2014 + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "Common.h" +namespace ba = boost::asio; +namespace bi = ba::ip; + +namespace dev +{ +namespace p2p +{ + +struct UDPDatagram +{ + bi::udp::endpoint to; + bytes data; +}; + +struct UDPSocketFace +{ + virtual void send(UDPDatagram const& _msg) = 0; + virtual void disconnect() = 0; +}; + +struct UDPSocketEvents +{ + virtual void onDisconnected(UDPSocketFace*) {}; + virtual void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packetData) = 0; +}; + +/** + * @brief UDP Interface + * Handler must implement UDPSocketEvents. S is maximum data size (bytes) of UDP datagram. + */ +template +class UDPSocket: UDPSocketFace, public std::enable_shared_from_this> +{ +public: + static constexpr unsigned datagramSize = S; + static_assert(datagramSize < 65507, "UDP datagrams cannot be larger than 65507 bytes"); + + /// Construct open socket to endpoint. + UDPSocket(ba::io_service& _io, Handler& _host, unsigned _port): m_host(_host), m_socket(_io, bi::udp::endpoint(bi::udp::v4(), _port)) {}; + virtual ~UDPSocket() { disconnect(); } + + void connect() + { + bool no = false; + if (!m_started.compare_exchange_strong(no, true)) + return; + m_closed = false; + doRead(); + } + + void send(UDPDatagram const& _datagram) + { + if (!m_started) + return; + + Guard l(x_sendQ); + sendQ.push_back(_datagram); + if (sendQ.size() == 1 && !m_closed) + doWrite(); + } + + void disconnect() { disconnectWithError(boost::asio::error::connection_reset); } + +protected: + void doRead() + { + auto self(UDPSocket::shared_from_this()); + m_socket.async_receive_from(boost::asio::buffer(recvData), recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len) + { + if (_ec) + return disconnectWithError(_ec); + + assert(_len); + m_host.onReceived(this, recvEndpoint, bytesConstRef(recvData.data(), _len)); + if (!m_closed) + doRead(); + }); + } + + void doWrite() + { + const UDPDatagram& datagram = sendQ[0]; + auto self(UDPSocket::shared_from_this()); + m_socket.async_send_to(boost::asio::buffer(datagram.data), datagram.to, [this, self](boost::system::error_code _ec, std::size_t) + { + if (_ec) + return disconnectWithError(_ec); + else + { + Guard l(x_sendQ); + sendQ.pop_front(); + if (sendQ.empty()) + return; + } + doWrite(); + }); + } + + void disconnectWithError(boost::system::error_code _ec) + { + // If !started and already stopped, shutdown has already occured. (EOF or Operation canceled) + if (!m_started && m_closed) + return; + + assert(_ec); + { + // disconnect-operation following prior non-zero errors are ignored + Guard l(x_socketError); + if (socketError != boost::system::error_code()) + return; + socketError = _ec; + } + // TODO: (if non-zero error) schedule high-priority writes + + // prevent concurrent disconnect + bool yes = true; + if (!m_started.compare_exchange_strong(yes, false)) + return; + + // set m_closed to true to prevent undeliverable egress messages + bool wasClosed = m_closed; + m_closed = true; + + // close sockets + boost::system::error_code ec; + m_socket.shutdown(bi::udp::socket::shutdown_both, ec); + m_socket.close(); + + // socket never started if it never left stopped-state (pre-handshake) + if (wasClosed) + return; + + m_host.onDisconnected(this); + } + + std::atomic m_closed; ///< Set when connection is stopping or stopped. Handshake cannot occur unless m_closed is true. + std::atomic m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start and disconnectWithError. + + Handler& m_host; ///< Interface which owns this socket. + + Mutex x_sendQ; + std::deque sendQ; + std::array recvData; ///< Buffer for ingress datagrams. + bi::udp::endpoint recvEndpoint; ///< Endpoint data was received from. + bi::udp::socket m_socket; + + Mutex x_socketError; ///< Mutex for error which can occur from host or IO thread. + boost::system::error_code socketError; ///< Set when shut down due to error. +}; + +} +} \ No newline at end of file diff --git a/test/kademlia.cpp b/test/kademlia.cpp index 21c28cb87..a9d7701cf 100644 --- a/test/kademlia.cpp +++ b/test/kademlia.cpp @@ -17,17 +17,5 @@ /** @file kademlia.cpp * @author Alex Leverington * @date 2014 - * Basic networking tests */ -#include -#include -using namespace std; -using namespace dev; -using namespace dev::p2p; - -BOOST_AUTO_TEST_CASE(host_listen_udp4) -{ - -} - diff --git a/test/net.cpp b/test/net.cpp index acdd649d9..e52654411 100644 --- a/test/net.cpp +++ b/test/net.cpp @@ -14,42 +14,76 @@ You should have received a copy of the GNU General Public License along with cpp-ethereum. If not, see . */ -/** @file network.cpp - * @author Marko Simovic +/** @file net.cpp + * @author Alex Leverington * @date 2014 - * Basic networking tests */ #include -#include -#include -#include -#include -#include "TestHelper.h" +#include +#include using namespace std; using namespace dev; -using namespace dev::eth; +using namespace dev::p2p; +namespace ba = boost::asio; +namespace bi = ba::ip; -// Disabled since tests shouldn't block (not the worst offender, but timeout should be reduced anyway). -/* -BOOST_AUTO_TEST_CASE(listen_port_busy) +class TestA: UDPSocketEvents, public Worker { - short port = 20000; +public: + TestA(): Worker("test",0), m_io(), m_socket(new UDPSocket(m_io, *this, 30300)) {} + ~TestA() { m_io.stop(); stopWorking(); } + + void start() { startWorking(); } + void doWork() { m_io.run(); } + + void onDisconnected(UDPSocketFace*) {}; + void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet) { if(_packet.toString() == "AAAA") success = true; }; - //make use of the port ahead of our client - ba::io_service ioService; - bi::tcp::endpoint endPoint(bi::tcp::v4(), port); - bi::tcp::acceptor acceptor(ioService, endPoint); - acceptor.listen(10); + ba::io_service m_io; + shared_ptr> m_socket; + + bool success = false; +}; - //prepare client and try to listen on same, used, port - Client c1("TestClient1", KeyPair::create().address(), - (boost::filesystem::temp_directory_path() / boost::filesystem::unique_path()).string()); +//struct TestBProtocol: UDPSocketEvents +//{ +// void onDisconnected(UDPSocketFace*) {}; +// void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet) { cout << "received TestBProtocol" << endl; }; +//}; +// +//class TestB: TestBProtocol +//{ +//public: +// TestB(): m_io(), m_socket(m_io, *this, 30300) {} +////private: +// ba::io_service m_io; +// UDPSocket m_socket; +//}; +// +//class TestC +//{ +//public: +// TestC(): m_io(), m_socket(m_io, m_rpc, 30300) {} +////private: +// ba::io_service m_io; +// TestBProtocol m_rpc; +// UDPSocket m_socket; +//}; - c1.startNetwork(port); +BOOST_AUTO_TEST_SUITE(p2p) - BOOST_REQUIRE(c1.haveNetwork()); - BOOST_REQUIRE(c1.peerServer()->listenPort() != 0); - BOOST_REQUIRE(c1.peerServer()->listenPort() != port); +BOOST_AUTO_TEST_CASE(test) +{ + UDPDatagram d; + d.to = boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 30300); + d.data = bytes({65,65,65,65}); + + TestA a; a.start(); a.m_socket->connect(); + a.m_socket->send(d); + sleep(1); + BOOST_REQUIRE_EQUAL(true, a.success); } -*/ + +BOOST_AUTO_TEST_SUITE_END() +