diff --git a/libp2p/UDP.h b/libp2p/UDP.h index 31ec170f6..ec491288b 100644 --- a/libp2p/UDP.h +++ b/libp2p/UDP.h @@ -57,17 +57,17 @@ struct UDPSocketEvents /** * @brief UDP Interface - * Handler must implement UDPSocketEvents. S is maximum data size (bytes) of UDP datagram. + * Handler must implement UDPSocketEvents. */ -template -class UDPSocket: UDPSocketFace, public std::enable_shared_from_this> +template +class UDPSocket: UDPSocketFace, public std::enable_shared_from_this> { public: - static constexpr unsigned datagramSize = S; - static_assert(datagramSize < 65507, "UDP datagrams cannot be larger than 65507 bytes"); + static constexpr unsigned maxDatagramSize = MaxDatagramSize; + static_assert(maxDatagramSize < 65507, "UDP datagrams cannot be larger than 65507 bytes"); /// Construct open socket to endpoint. - UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_socket(_io, bi::udp::endpoint(bi::udp::v4(), _port)) { m_started.store(false); m_closed.store(true); }; + UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_endpoint(bi::udp::v4(), _port), m_socket(_io) { m_started.store(false); m_closed.store(true); }; virtual ~UDPSocket() { disconnect(); } /// Socket will begin listening for and delivering packets @@ -76,6 +76,9 @@ public: bool expect = false; if (!m_started.compare_exchange_strong(expect, true)) return; + + m_socket.open(bi::udp::v4()); + m_socket.bind(m_endpoint); m_closed = false; doRead(); @@ -83,7 +86,7 @@ public: void send(UDPDatagram const& _datagram) { - if (!m_started) + if (m_closed) return; Guard l(x_sendQ); @@ -91,13 +94,15 @@ public: if (sendQ.size() == 1 && !m_closed.load()) doWrite(); } + + bool isOpen() { return !m_closed; } void disconnect() { disconnectWithError(boost::asio::error::connection_reset); } protected: void doRead() { - auto self(UDPSocket::shared_from_this()); + auto self(UDPSocket::shared_from_this()); m_socket.async_receive_from(boost::asio::buffer(recvData), recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len) { if (_ec) @@ -113,7 +118,7 @@ protected: void doWrite() { const UDPDatagram& datagram = sendQ[0]; - auto self(UDPSocket::shared_from_this()); + auto self(UDPSocket::shared_from_this()); m_socket.async_send_to(boost::asio::buffer(datagram.data), datagram.to, [this, self](boost::system::error_code _ec, std::size_t) { if (_ec) @@ -132,7 +137,7 @@ protected: void disconnectWithError(boost::system::error_code _ec) { // If !started and already stopped, shutdown has already occured. (EOF or Operation canceled) - if (!m_started && m_closed) + if (!m_started && m_closed && !m_socket.is_open()) return; assert(_ec); @@ -146,8 +151,8 @@ protected: // TODO: (if non-zero error) schedule high-priority writes // prevent concurrent disconnect - bool yes = true; - if (!m_started.compare_exchange_strong(yes, false)) + bool expected = true; + if (!m_started.compare_exchange_strong(expected, false)) return; // set m_closed to true to prevent undeliverable egress messages @@ -170,10 +175,11 @@ protected: std::atomic m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start and disconnectWithError. UDPSocketEvents& m_host; ///< Interface which owns this socket. + bi::udp::endpoint m_endpoint; ///< Endpoint which we listen to. Mutex x_sendQ; std::deque sendQ; - std::array recvData; ///< Buffer for ingress datagrams. + std::array recvData; ///< Buffer for ingress datagrams. bi::udp::endpoint recvEndpoint; ///< Endpoint data was received from. bi::udp::socket m_socket;