Browse Source

socket is created in disconnected state. socket can't be created in open state because shared_ptr methods aren't available to doRead until after class has been constructed and the socket is dependent on ioservice running.

cl-refactor
subtly 10 years ago
parent
commit
dd4c7152b4
  1. 32
      libp2p/UDP.h

32
libp2p/UDP.h

@ -57,17 +57,17 @@ struct UDPSocketEvents
/** /**
* @brief UDP Interface * @brief UDP Interface
* Handler must implement UDPSocketEvents. S is maximum data size (bytes) of UDP datagram. * Handler must implement UDPSocketEvents.
*/ */
template <typename Handler, unsigned S> template <typename Handler, unsigned MaxDatagramSize>
class UDPSocket: UDPSocketFace, public std::enable_shared_from_this<UDPSocket<Handler, S>> class UDPSocket: UDPSocketFace, public std::enable_shared_from_this<UDPSocket<Handler, MaxDatagramSize>>
{ {
public: public:
static constexpr unsigned datagramSize = S; static constexpr unsigned maxDatagramSize = MaxDatagramSize;
static_assert(datagramSize < 65507, "UDP datagrams cannot be larger than 65507 bytes"); static_assert(maxDatagramSize < 65507, "UDP datagrams cannot be larger than 65507 bytes");
/// Construct open socket to endpoint. /// Construct open socket to endpoint.
UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_socket(_io, bi::udp::endpoint(bi::udp::v4(), _port)) { m_started.store(false); m_closed.store(true); }; UDPSocket(ba::io_service& _io, UDPSocketEvents& _host, unsigned _port): m_host(_host), m_endpoint(bi::udp::v4(), _port), m_socket(_io) { m_started.store(false); m_closed.store(true); };
virtual ~UDPSocket() { disconnect(); } virtual ~UDPSocket() { disconnect(); }
/// Socket will begin listening for and delivering packets /// Socket will begin listening for and delivering packets
@ -77,13 +77,16 @@ public:
if (!m_started.compare_exchange_strong(expect, true)) if (!m_started.compare_exchange_strong(expect, true))
return; return;
m_socket.open(bi::udp::v4());
m_socket.bind(m_endpoint);
m_closed = false; m_closed = false;
doRead(); doRead();
} }
void send(UDPDatagram const& _datagram) void send(UDPDatagram const& _datagram)
{ {
if (!m_started) if (m_closed)
return; return;
Guard l(x_sendQ); Guard l(x_sendQ);
@ -92,12 +95,14 @@ public:
doWrite(); doWrite();
} }
bool isOpen() { return !m_closed; }
void disconnect() { disconnectWithError(boost::asio::error::connection_reset); } void disconnect() { disconnectWithError(boost::asio::error::connection_reset); }
protected: protected:
void doRead() void doRead()
{ {
auto self(UDPSocket<Handler, S>::shared_from_this()); auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
m_socket.async_receive_from(boost::asio::buffer(recvData), recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len) m_socket.async_receive_from(boost::asio::buffer(recvData), recvEndpoint, [this, self](boost::system::error_code _ec, size_t _len)
{ {
if (_ec) if (_ec)
@ -113,7 +118,7 @@ protected:
void doWrite() void doWrite()
{ {
const UDPDatagram& datagram = sendQ[0]; const UDPDatagram& datagram = sendQ[0];
auto self(UDPSocket<Handler, S>::shared_from_this()); auto self(UDPSocket<Handler, MaxDatagramSize>::shared_from_this());
m_socket.async_send_to(boost::asio::buffer(datagram.data), datagram.to, [this, self](boost::system::error_code _ec, std::size_t) m_socket.async_send_to(boost::asio::buffer(datagram.data), datagram.to, [this, self](boost::system::error_code _ec, std::size_t)
{ {
if (_ec) if (_ec)
@ -132,7 +137,7 @@ protected:
void disconnectWithError(boost::system::error_code _ec) void disconnectWithError(boost::system::error_code _ec)
{ {
// If !started and already stopped, shutdown has already occured. (EOF or Operation canceled) // If !started and already stopped, shutdown has already occured. (EOF or Operation canceled)
if (!m_started && m_closed) if (!m_started && m_closed && !m_socket.is_open())
return; return;
assert(_ec); assert(_ec);
@ -146,8 +151,8 @@ protected:
// TODO: (if non-zero error) schedule high-priority writes // TODO: (if non-zero error) schedule high-priority writes
// prevent concurrent disconnect // prevent concurrent disconnect
bool yes = true; bool expected = true;
if (!m_started.compare_exchange_strong(yes, false)) if (!m_started.compare_exchange_strong(expected, false))
return; return;
// set m_closed to true to prevent undeliverable egress messages // set m_closed to true to prevent undeliverable egress messages
@ -170,10 +175,11 @@ protected:
std::atomic<bool> m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start and disconnectWithError. std::atomic<bool> m_started; ///< Atomically ensure connection is started once. Start cannot occur unless m_started is false. Managed by start and disconnectWithError.
UDPSocketEvents& m_host; ///< Interface which owns this socket. UDPSocketEvents& m_host; ///< Interface which owns this socket.
bi::udp::endpoint m_endpoint; ///< Endpoint which we listen to.
Mutex x_sendQ; Mutex x_sendQ;
std::deque<UDPDatagram> sendQ; std::deque<UDPDatagram> sendQ;
std::array<byte, datagramSize> recvData; ///< Buffer for ingress datagrams. std::array<byte, maxDatagramSize> recvData; ///< Buffer for ingress datagrams.
bi::udp::endpoint recvEndpoint; ///< Endpoint data was received from. bi::udp::endpoint recvEndpoint; ///< Endpoint data was received from.
bi::udp::socket m_socket; bi::udp::socket m_socket;

Loading…
Cancel
Save