/*
    Copyright (c) 2013 Martin Sustrik  All rights reserved.
    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom
    the Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included
    in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
    IN THE SOFTWARE.
*/

#include "worker.h"

#include "../utils/err.h"
#include "../utils/cont.h"
#include "../utils/alloc.h"

#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <limits.h>

/*  States of the usock state machine. */
#define NN_USOCK_STATE_IDLE 1
#define NN_USOCK_STATE_STARTING 2
#define NN_USOCK_STATE_BEING_ACCEPTED 3
#define NN_USOCK_STATE_ACCEPTED 4
#define NN_USOCK_STATE_CONNECTING 5
#define NN_USOCK_STATE_ACTIVE 6
#define NN_USOCK_STATE_CANCELLING_IO 7
#define NN_USOCK_STATE_DONE 8
#define NN_USOCK_STATE_LISTENING 9
#define NN_USOCK_STATE_ACCEPTING 10
#define NN_USOCK_STATE_CANCELLING 11
#define NN_USOCK_STATE_STOPPING 12
#define NN_USOCK_STATE_STOPPING_ACCEPT 13

/*  Actions the usock posts to its own state machine. */
#define NN_USOCK_ACTION_ACCEPT 1
#define NN_USOCK_ACTION_BEING_ACCEPTED 2
#define NN_USOCK_ACTION_CANCEL 3
#define NN_USOCK_ACTION_LISTEN 4
#define NN_USOCK_ACTION_CONNECT 5
#define NN_USOCK_ACTION_ACTIVATE 6
#define NN_USOCK_ACTION_DONE 7
#define NN_USOCK_ACTION_ERROR 8

/*  Event sources: the inbound and outbound worker operations. */
#define NN_USOCK_SRC_IN 1
#define NN_USOCK_SRC_OUT 2

/*  Private functions. */
static void nn_usock_handler (struct nn_fsm *self, int src, int type,
    void *srcptr);
static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
    void *srcptr);
static int nn_usock_cancel_io (struct nn_usock *self);
static void nn_usock_create_io_completion (struct nn_usock *self);
DWORD nn_usock_open_pipe (struct nn_usock *self, const char *name);
void nn_usock_accept_pipe (struct nn_usock *self, struct nn_usock *listener);

/*  Initialises the usock object: sets up the state machine with 'owner' as
    its owner, the two worker operations, the events raised by the usock and
    the named-pipe bookkeeping. The object starts in the IDLE state with no
    underlying socket. */
void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner)
{
    nn_fsm_init (&self->fsm, nn_usock_handler, nn_usock_shutdown,
        src, self, owner);
    self->state = NN_USOCK_STATE_IDLE;
    self->s = INVALID_SOCKET;
    self->isaccepted = 0;
    nn_worker_op_init (&self->in, NN_USOCK_SRC_IN, &self->fsm);
    nn_worker_op_init (&self->out, NN_USOCK_SRC_OUT, &self->fsm);
    self->domain = -1;
    self->type = -1;
    self->protocol = -1;

    /*  Initialise events raised by usock. */
    nn_fsm_event_init (&self->event_established);
    nn_fsm_event_init (&self->event_sent);
    nn_fsm_event_init (&self->event_received);
    nn_fsm_event_init (&self->event_error);

    /*  No accepting is going on at the moment. */
    self->asock = NULL;
    self->ainfo = NULL;

    /*  NamedPipe-related stuff. */
    memset (&self->pipename, 0, sizeof (self->pipename));
    self->pipesendbuf = NULL;
}

/*  Releases everything allocated by nn_usock_init and by later operations
    (accept info buffer, pending named-pipe send buffer). The usock must be
    in the IDLE state. */
void nn_usock_term (struct nn_usock *self)
{
    nn_assert_state (self, NN_USOCK_STATE_IDLE);

    if (self->ainfo)
        nn_free (self->ainfo);
    if (self->pipesendbuf)
        nn_free (self->pipesendbuf);
    nn_fsm_event_term (&self->event_error);
    nn_fsm_event_term (&self->event_received);
    nn_fsm_event_term (&self->event_sent);
    nn_fsm_event_term (&self->event_established);
    nn_worker_op_term (&self->out);
    nn_worker_op_term (&self->in);
    nn_fsm_term (&self->fsm);
}

/*  Returns the result of nn_fsm_isidle for the usock's state machine. */
int nn_usock_isidle (struct nn_usock *self)
{
    return nn_fsm_isidle (&self->fsm);
}

/*  Opens the underlying socket (skipped for AF_UNIX, which is served by named
    pipes created later), remembers domain/type/protocol and starts the state
    machine. Returns 0 on success or a negative POSIX error code if socket()
    fails. */
int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol)
{
    int rc;
#if defined IPV6_V6ONLY
    DWORD only;
#endif
#if defined HANDLE_FLAG_INHERIT
    BOOL brc;
#endif

    /*  NamedPipes aren't sockets. They don't need all the socket
        initialisation stuff. */
    if (domain != AF_UNIX) {

        /*  Open the underlying socket. */
        self->s = socket (domain, type, protocol);
        if (self->s == INVALID_SOCKET)
            return -nn_err_wsa_to_posix (WSAGetLastError ());

        /*  Disable inheriting the socket to the child processes.
            NOTE(review): the socket is accessed through self->p here;
            presumably 's' and 'p' share storage in struct nn_usock --
            confirm against the header. */
#if defined HANDLE_FLAG_INHERIT
        brc = SetHandleInformation (self->p, HANDLE_FLAG_INHERIT, 0);
        win_assert (brc);
#endif

        /*  IPv4 mapping for IPv6 sockets is disabled by default. Switch it
            on. */
#if defined IPV6_V6ONLY
        if (domain == AF_INET6) {
            only = 0;
            rc = setsockopt (self->s, IPPROTO_IPV6, IPV6_V6ONLY,
                (const char*) &only, sizeof (only));
            wsa_assert (rc != SOCKET_ERROR);
        }
#endif

        /*  Associate the socket with a worker thread/completion port. */
        nn_usock_create_io_completion (self);
    }

    /*  Remember the type of the socket. */
    self->domain = domain;
    self->type = type;
    self->protocol = protocol;

    /*  Start the state machine. */
    nn_fsm_start (&self->fsm);

    return 0;
}

/*  Starting a usock from an existing file descriptor is not implemented in
    this file; calling it trips an assertion. */
void nn_usock_start_fd (struct nn_usock *self, int fd)
{
    nn_assert (0);
}

/*  Asks the state machine to stop; handled by nn_usock_shutdown. */
void nn_usock_stop (struct nn_usock *self)
{
    nn_fsm_stop (&self->fsm);
}

/*  Exchanges the owner of the usock's state machine with '*owner'. */
void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner)
{
    nn_fsm_swap_owner (&self->fsm, owner);
}

/*  Sets a socket option on the underlying socket. Options are silently
    ignored for AF_UNIX (named pipes). Returns 0 on success or a negative
    POSIX error code. */
int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
    const void *optval, size_t optlen)
{
    int rc;

    /*  NamedPipes aren't sockets. We can't set socket options on them.
        For now we'll ignore the options. */
    if (self->domain == AF_UNIX)
        return 0;

    /*  The socket can be modified only before it's active. */
    nn_assert (self->state == NN_USOCK_STATE_STARTING ||
        self->state == NN_USOCK_STATE_ACCEPTED);

    nn_assert (optlen < INT_MAX);

    rc = setsockopt (self->s, level, optname, (char*) optval, (int) optlen);
    if (nn_slow (rc == SOCKET_ERROR))
        return -nn_err_wsa_to_posix (WSAGetLastError ());

    return 0;
}

/*  Binds the socket to 'addr'. For AF_UNIX the address is only stored in
    self->pipename for later use; -EINVAL is returned if it does not fit.
    Returns 0 on success or a negative POSIX error code. */
int nn_usock_bind (struct nn_usock *self, const struct sockaddr *addr,
    size_t addrlen)
{
    int rc;
    ULONG opt;

    /*  In the case of named pipes, let's save the address
        for the later use. */
    if (self->domain == AF_UNIX) {
        if (addrlen > sizeof (struct sockaddr_un))
            return -EINVAL;
        memcpy (&self->pipename, addr, addrlen);
        return 0;
    }

    /*  You can set socket options only before the socket is connected. */
    nn_assert_state (self, NN_USOCK_STATE_STARTING);

    /*  On Windows, the bound port can be hijacked
        if SO_EXCLUSIVEADDRUSE is not set. */
    opt = 1;
    rc = setsockopt (self->s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
        (const char*) &opt, sizeof (opt));
    wsa_assert (rc != SOCKET_ERROR);

    nn_assert (addrlen < INT_MAX);
    rc = bind (self->s, addr, (int) addrlen);
    if (nn_slow (rc == SOCKET_ERROR))
        return -nn_err_wsa_to_posix (WSAGetLastError ());

    return 0;
}

/*  Puts the socket into listening mode and moves the state machine to
    LISTENING. Returns 0 on success or a negative POSIX error code if
    listen() fails. */
int nn_usock_listen (struct nn_usock *self, int backlog)
{
    int rc;

    /*  You can start listening only before the socket is connected. */
    nn_assert_state (self, NN_USOCK_STATE_STARTING);

    /*  Start listening for incoming connections. NamedPipes are already
        created in the listening state, so no need to do anything here. */
    if (self->domain != AF_UNIX) {
        rc = listen (self->s, backlog);
        if (nn_slow (rc == SOCKET_ERROR))
            return -nn_err_wsa_to_posix (WSAGetLastError ());
    }

    /*  Notify the state machine. */
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_LISTEN);

    return 0;
}

/*  Starts accepting a connection from 'listener' into 'self'. 'self' is
    started with the listener's domain/type/protocol. Completion is signalled
    through the state machines, either immediately or asynchronously via the
    listener's inbound worker operation. */
void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener)
{
    int rc;
    BOOL brc;
    DWORD nbytes;

    /*  NamedPipes have their own accepting mechanism. */
    if (listener->domain == AF_UNIX) {
        nn_usock_accept_pipe (self, listener);
        return;
    }

    rc = nn_usock_start (self, listener->domain, listener->type,
        listener->protocol);
    /*  TODO: EMFILE can be returned here. */
    errnum_assert (rc == 0, -rc);
    nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT);
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED);

    /*  If the memory for accept information is not yet allocated, do so. */
    if (!listener->ainfo) {
        listener->ainfo = nn_alloc (512, "accept info");
        alloc_assert (listener->ainfo);
    }

    /*  Wait for the incoming connection. The 512-byte buffer is split into
        two 256-byte address areas; no payload is received (length 0). */
    memset (&listener->in.olpd, 0, sizeof (listener->in.olpd));
    brc = AcceptEx (listener->s, self->s, listener->ainfo, 0, 256, 256,
        &nbytes, &listener->in.olpd);

    /*  Immediate success. */
    if (nn_fast (brc == TRUE)) {
        nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE);
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
        return;
    }

    /*  We don't expect a synchronous failure at this point. */
    wsa_assert (nn_slow (WSAGetLastError () == WSA_IO_PENDING));

    /*  Pair the two sockets. */
    nn_assert (!self->asock);
    self->asock = listener;
    nn_assert (!listener->asock);
    listener->asock = self;

    /*  Asynchronous accept. */
    nn_worker_op_start (&listener->in, 0);
}

/*  Posts the ACTIVATE action, moving an ACCEPTED usock to ACTIVE. */
void nn_usock_activate (struct nn_usock *self)
{
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ACTIVATE);
}

/*  Starts connecting to 'addr'. For AF_UNIX the named pipe identified by
    sun_path is opened; otherwise ConnectEx is used. The outcome is posted to
    the state machine as DONE, ERROR, or (when pending) delivered later via
    the outbound worker operation. */
void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
    size_t addrlen)
{
    BOOL brc;
    const GUID fid = WSAID_CONNECTEX;
    LPFN_CONNECTEX pconnectex;
    DWORD nbytes;
    DWORD winerror;

    /*  Fail if the socket is already connected, closed or such. */
    nn_assert_state (self, NN_USOCK_STATE_STARTING);

    /*  Notify the state machine that we've started connecting. */
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_CONNECT);

    nn_assert (addrlen < INT_MAX);
    memset (&self->out.olpd, 0, sizeof (self->out.olpd));

    if (self->domain == AF_UNIX) {
        winerror = nn_usock_open_pipe (self,
            ((struct sockaddr_un*) addr)->sun_path);
    }
    else {

        /*  Get the pointer to connect function. */
        brc = WSAIoctl (self->s, SIO_GET_EXTENSION_FUNCTION_POINTER,
            (void*) &fid, sizeof (fid), (void*) &pconnectex,
            sizeof (pconnectex), &nbytes, NULL, NULL) == 0;
        wsa_assert (brc == TRUE);
        nn_assert (nbytes == sizeof (pconnectex));

        /*  Connect itself. The cast is safe; addrlen was checked above. */
        brc = pconnectex (self->s, (struct sockaddr*) addr, (int) addrlen,
            NULL, 0, NULL, &self->out.olpd);
        winerror = brc ? ERROR_SUCCESS : WSAGetLastError ();
    }

    /*  Immediate success. */
    if (nn_fast (winerror == ERROR_SUCCESS)) {
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
        return;
    }

    /*  Immediate error. */
    if (nn_slow (winerror != WSA_IO_PENDING)) {
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
        return;
    }

    /*  Asynchronous connect. */
    nn_worker_op_start (&self->out, 0);
}

/*  Starts an overlapped send of the 'iovcnt' buffers in 'iov'. For named
    pipes the buffers are first copied into one contiguous heap buffer
    (self->pipesendbuf), which the handler frees when the send completes.
    Synchronous failures are reported by setting self->errnum and posting
    the ERROR action. */
void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
    int iovcnt)
{
    int rc;
    BOOL brc;
    WSABUF wbuf [NN_USOCK_MAX_IOVCNT];
    int i;
    size_t len;
    size_t idx;
    DWORD error;

    /*  Make sure that the socket is actually alive. */
    nn_assert_state (self, NN_USOCK_STATE_ACTIVE);

    /*  Create a WinAPI-style iovec. */
    len = 0;
    nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT);
    for (i = 0; i != iovcnt; ++i) {
        wbuf [i].buf = (char FAR*) iov [i].iov_base;
        wbuf [i].len = (u_long) iov [i].iov_len;
        len += iov [i].iov_len;
    }

    /*  Start the send operation. */
    memset (&self->out.olpd, 0, sizeof (self->out.olpd));
    if (self->domain == AF_UNIX) {

        /*  TODO: Do not copy the buffer, find an efficient way to Write
            multiple buffers that doesn't affect the state machine. */
        nn_assert (!self->pipesendbuf);
        self->pipesendbuf = nn_alloc (len, "named pipe sendbuf");
        alloc_assert (self->pipesendbuf);

        /*  Gather the scattered buffers into the contiguous one. */
        idx = 0;
        for (i = 0; i != iovcnt; ++i) {
            memcpy ((char*) (self->pipesendbuf) + idx, iov [i].iov_base,
                iov [i].iov_len);
            idx += iov [i].iov_len;
        }

        brc = WriteFile (self->p, self->pipesendbuf, (DWORD) len, NULL,
            &self->out.olpd);
        if (nn_fast (brc || GetLastError () == ERROR_IO_PENDING)) {
            nn_worker_op_start (&self->out, 0);
            return;
        }

        /*  ERROR_NO_DATA is the only synchronous failure expected here. */
        error = GetLastError ();
        win_assert (error == ERROR_NO_DATA);
        self->errnum = EINVAL;
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
        return;
    }

    rc = WSASend (self->s, wbuf, iovcnt, NULL, 0, &self->out.olpd, NULL);
    if (nn_fast (rc == 0)) {
        nn_worker_op_start (&self->out, 0);
        return;
    }
    error = WSAGetLastError ();
    if (nn_fast (error == WSA_IO_PENDING)) {
        nn_worker_op_start (&self->out, 0);
        return;
    }
    wsa_assert (error == WSAECONNABORTED || error == WSAECONNRESET ||
        error == WSAENETDOWN || error == WSAENETRESET ||
        error == WSAENOBUFS || error == WSAEWOULDBLOCK);
    self->errnum = nn_err_wsa_to_posix (error);
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
}

/*  Starts an overlapped receive of 'len' bytes into 'buf'. If 'fd' is not
    NULL it is set to -1. Connection-level failures post the ERROR action;
    any other synchronous failure trips an assertion. */
void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
{
    int rc;
    BOOL brc;
    WSABUF wbuf;
    DWORD wflags;
    DWORD error;

    /*  Passing file descriptors is not implemented on Windows platform. */
    if (fd)
        *fd = -1;

    /*  Make sure that the socket is actually alive. */
    nn_assert_state (self, NN_USOCK_STATE_ACTIVE);

    /*  Start the receive operation. */
    wbuf.len = (u_long) len;
    wbuf.buf = (char FAR*) buf;
    wflags = MSG_WAITALL;
    memset (&self->in.olpd, 0, sizeof (self->in.olpd));
    if (self->domain == AF_UNIX) {
        brc = ReadFile (self->p, buf, (DWORD) len, NULL, &self->in.olpd);
        error = brc ? ERROR_SUCCESS : GetLastError ();
    }
    else {
        rc = WSARecv (self->s, &wbuf, 1, NULL, &wflags, &self->in.olpd, NULL);
        error = (rc == 0) ? ERROR_SUCCESS : WSAGetLastError ();
    }

    /*  Both immediate success and a pending operation are completed through
        the inbound worker operation. */
    if (nn_fast (error == ERROR_SUCCESS)) {
        nn_worker_op_start (&self->in, 1);
        return;
    }

    if (nn_fast (error == WSA_IO_PENDING)) {
        nn_worker_op_start (&self->in, 1);
        return;
    }

    if (error == WSAECONNABORTED || error == WSAECONNRESET ||
          error == WSAENETDOWN || error == WSAENETRESET ||
          error == WSAETIMEDOUT || error == WSAEWOULDBLOCK ||
          error == ERROR_PIPE_NOT_CONNECTED || error == ERROR_BROKEN_PIPE) {
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
        return;
    }

    wsa_assert (0);
}

/*  Associates the usock's handle with the completion port of a worker
    chosen by nn_fsm_choose_worker. Asserts on failure. */
static void nn_usock_create_io_completion (struct nn_usock *self)
{
    struct nn_worker *worker;
    HANDLE cp;

    /*  Associate the socket with a worker thread/completion port. */
    worker = nn_fsm_choose_worker (&self->fsm);
    cp = CreateIoCompletionPort (
        self->p,
        nn_worker_getcp (worker),
        (ULONG_PTR) NULL,
        0);
    nn_assert (cp);
}

/*  Creates the server end of the named pipe "\\.\pipe\<name>" in overlapped
    byte mode, stores it in self->p, marks the usock as accepted and
    associates the handle with a completion port. */
static void nn_usock_create_pipe (struct nn_usock *self, const char *name)
{
    char fullname [256];

    /*  First, create a fully qualified name for the named pipe.
        _snprintf does not NUL-terminate on truncation, so do it by hand. */
    _snprintf (fullname, sizeof (fullname), "\\\\.\\pipe\\%s", name);
    fullname [sizeof (fullname) - 1] = '\0';

    /*  TODO: Expose custom nOutBufferSize, nInBufferSize, nDefaultTimeOut,
              lpSecurityAttributes */
    self->p = CreateNamedPipeA (
        (char*) fullname,
        PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT |
            PIPE_REJECT_REMOTE_CLIENTS,
        PIPE_UNLIMITED_INSTANCES,
        4096,
        4096,
        0,
        NULL);

    /*  TODO: How to properly handle self->p == INVALID_HANDLE_VALUE? */
    win_assert (self->p != INVALID_HANDLE_VALUE);

    self->isaccepted = 1;
    nn_usock_create_io_completion (self);
}

/*  Opens the client end of the named pipe "\\.\pipe\<name>" and stores the
    handle in self->p. Returns ERROR_SUCCESS on success or the Windows error
    code of the failing call; on failure self->p is INVALID_HANDLE_VALUE. */
DWORD nn_usock_open_pipe (struct nn_usock *self, const char *name)
{
    char fullname [256];
    DWORD winerror;
    DWORD mode;
    BOOL brc;

    /*  First, create a fully qualified name for the named pipe.
        _snprintf does not NUL-terminate on truncation, so do it by hand. */
    _snprintf (fullname, sizeof (fullname), "\\\\.\\pipe\\%s", name);
    fullname [sizeof (fullname) - 1] = '\0';

    /*  TODO: Expose a way to pass lpSecurityAttributes */
    self->p = CreateFileA (
        fullname,
        GENERIC_READ | GENERIC_WRITE,
        0,
        NULL,
        OPEN_ALWAYS,
        FILE_FLAG_OVERLAPPED,
        NULL);

    if (self->p == INVALID_HANDLE_VALUE)
        return GetLastError ();

    mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
    brc = SetNamedPipeHandleState (
        self->p,
        &mode,
        NULL,
        NULL);
    if (!brc) {

        /*  Capture the error before CloseHandle gets a chance to
            overwrite the thread's last-error value. */
        winerror = GetLastError ();
        CloseHandle (self->p);
        self->p = INVALID_HANDLE_VALUE;
        return winerror;
    }
    self->isaccepted = 0;
    nn_usock_create_io_completion (self);

    /*  Every failure above has either returned or asserted. The thread's
        last-error value is deliberately not re-read here: successful calls
        are not required to reset it, so it may hold a stale code. */
    return ERROR_SUCCESS;
}

/*  Named-pipe counterpart of nn_usock_accept: creates a new pipe instance
    for 'self' using the name stored in the listener and waits for a client
    to connect. The overlapped structure of the listener's inbound operation
    is used, so completion is delivered to the listener. */
void nn_usock_accept_pipe (struct nn_usock *self, struct nn_usock *listener)
{
    int rc;
    BOOL brc;
    DWORD winerror;

    /*  TODO: EMFILE can be returned here. */
    rc = nn_usock_start (self, listener->domain, listener->type,
        listener->protocol);
    errnum_assert (rc == 0, -rc);

    nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT);
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED);

    /*  If the memory for accept information is not yet allocated, do so
        now. */
    if (!listener->ainfo) {
        listener->ainfo = nn_alloc (512, "accept info");
        alloc_assert (listener->ainfo);
    }

    /*  Wait for the incoming connection. */
    memset (&listener->in.olpd, 0, sizeof (listener->in.olpd));
    nn_usock_create_pipe (self, listener->pipename.sun_path);
    brc = ConnectNamedPipe (self->p, (LPOVERLAPPED) &listener->in.olpd);

    /*  TODO: Can this function possibly succeed? */
    nn_assert (brc == 0);
    winerror = GetLastError ();

    /*  Immediate success. */
    if (nn_fast (winerror == ERROR_PIPE_CONNECTED)) {
        nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE);
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
        return;
    }

    /*  We don't expect a synchronous failure at this point. */
    wsa_assert (nn_slow (winerror == WSA_IO_PENDING));

    /*  Pair the two sockets. */
    nn_assert (!self->asock);
    self->asock = listener;
    nn_assert (!listener->asock);
    listener->asock = self;

    /*  Asynchronous accept. */
    nn_worker_op_start (&listener->in, 0);
}

/*  Closes the underlying pipe handle or socket and marks it invalid. A pipe
    created on the accepting side is disconnected first. Closing a pipe usock
    whose handle is already invalid is a no-op. */
static void nn_usock_close (struct nn_usock *self)
{
    int rc;
    BOOL brc;

    if (self->domain == AF_UNIX) {
        if (self->p == INVALID_HANDLE_VALUE)
            return;
        if (self->isaccepted)
            DisconnectNamedPipe (self->p);
        brc = CloseHandle (self->p);
        self->p = INVALID_HANDLE_VALUE;
        win_assert (brc);
    }
    else {
        rc = closesocket (self->s);
        self->s = INVALID_SOCKET;
        wsa_assert (rc == 0);
    }
}

/*  Shutdown handler of the state machine. Reacts to the STOP request and to
    the events that arrive while the usock is in one of the STOPPING states.
    The finish1/finish2/finish3 labels form a ladder: close the handle, then
    go IDLE and report NN_USOCK_STOPPED, then return. */
static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
    void *srcptr)
{
    struct nn_usock *usock;

    usock = nn_cont (self, struct nn_usock, fsm);

    if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {

        /*  Socket in ACCEPTING state cannot be closed.
            Stop the socket being accepted first. */
        nn_assert (usock->state != NN_USOCK_STATE_ACCEPTING);

        /*  Synchronous stop. */
        if (usock->state == NN_USOCK_STATE_IDLE)
            goto finish3;
        if (usock->state == NN_USOCK_STATE_DONE)
            goto finish2;
        if (usock->state == NN_USOCK_STATE_STARTING ||
              usock->state == NN_USOCK_STATE_ACCEPTED ||
              usock->state == NN_USOCK_STATE_LISTENING)
            goto finish1;

        /*  When socket that's being accepted is asked to stop, we have to
            ask the listener socket to stop accepting first. */
        if (usock->state == NN_USOCK_STATE_BEING_ACCEPTED) {
            nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_CANCEL);
            usock->state = NN_USOCK_STATE_STOPPING_ACCEPT;
            return;
        }

        /*  If we were already in the process of cancelling overlapped
            operations, we don't have to do anything. Continue waiting
            till cancelling is finished. */
        if (usock->state == NN_USOCK_STATE_CANCELLING_IO) {
            usock->state = NN_USOCK_STATE_STOPPING;
            return;
        }

        /*  Notify our parent that pipe socket is shutting down  */
        nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_SHUTDOWN);

        /*  In all remaining states we'll simply cancel all overlapped
            operations. */
        if (nn_usock_cancel_io (usock) == 0)
            goto finish1;
        usock->state = NN_USOCK_STATE_STOPPING;
        return;
    }
    if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING_ACCEPT)) {
        nn_assert (src == NN_FSM_ACTION && type == NN_USOCK_ACTION_DONE);
        goto finish1;
    }
    if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING)) {

        /*  Wait until both overlapped operations have finished. */
        if (!nn_worker_op_isidle (&usock->in) ||
              !nn_worker_op_isidle (&usock->out))
            return;
finish1:
        nn_usock_close (usock);
finish2:
        usock->state = NN_USOCK_STATE_IDLE;
        nn_fsm_stopped (&usock->fsm, NN_USOCK_STOPPED);
finish3:
        return;
    }

    nn_fsm_bad_state (usock->state, src, type);
}

/*  Main handler of the state machine: dispatches on the current state, then
    on the event source and finally on the event type. Unexpected sources,
    actions and states are reported via the nn_fsm_bad_* helpers. */
static void nn_usock_handler (struct nn_fsm *self, int src, int type,
    void *srcptr)
{
    struct nn_usock *usock;

    usock = nn_cont (self, struct nn_usock, fsm);

    switch (usock->state) {

/*****************************************************************************/
/*  IDLE state.                                                              */
/*****************************************************************************/
    case NN_USOCK_STATE_IDLE:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_FSM_START:
                usock->state = NN_USOCK_STATE_STARTING;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  STARTING state.                                                          */
/*****************************************************************************/
    case NN_USOCK_STATE_STARTING:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_LISTEN:
                usock->state = NN_USOCK_STATE_LISTENING;
                return;
            case NN_USOCK_ACTION_CONNECT:
                usock->state = NN_USOCK_STATE_CONNECTING;
                return;
            case NN_USOCK_ACTION_BEING_ACCEPTED:
                usock->state = NN_USOCK_STATE_BEING_ACCEPTED;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  BEING_ACCEPTED state.                                                    */
/*****************************************************************************/
    case NN_USOCK_STATE_BEING_ACCEPTED:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_DONE:
                usock->state = NN_USOCK_STATE_ACCEPTED;
                nn_fsm_raise (&usock->fsm, &usock->event_established,
                    NN_USOCK_ACCEPTED);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  ACCEPTED state.                                                          */
/*****************************************************************************/
    case NN_USOCK_STATE_ACCEPTED:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_ACTIVATE:
                usock->state = NN_USOCK_STATE_ACTIVE;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  CONNECTING state.                                                        */
/*****************************************************************************/
    case NN_USOCK_STATE_CONNECTING:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_DONE:
                usock->state = NN_USOCK_STATE_ACTIVE;
                nn_fsm_raise (&usock->fsm, &usock->event_established,
                    NN_USOCK_CONNECTED);
                return;
            case NN_USOCK_ACTION_ERROR:
                nn_usock_close (usock);
                usock->state = NN_USOCK_STATE_DONE;
                nn_fsm_raise (&usock->fsm, &usock->event_error,
                    NN_USOCK_ERROR);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        case NN_USOCK_SRC_OUT:
            switch (type) {
            case NN_WORKER_OP_DONE:
                usock->state = NN_USOCK_STATE_ACTIVE;
                nn_fsm_raise (&usock->fsm, &usock->event_established,
                    NN_USOCK_CONNECTED);
                return;
            case NN_WORKER_OP_ERROR:
                nn_usock_close (usock);
                usock->state = NN_USOCK_STATE_DONE;
                nn_fsm_raise (&usock->fsm, &usock->event_error,
                    NN_USOCK_ERROR);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  ACTIVE state.                                                            */
/*****************************************************************************/
    case NN_USOCK_STATE_ACTIVE:
        switch (src) {
        case NN_USOCK_SRC_IN:
            switch (type) {
            case NN_WORKER_OP_DONE:
                nn_fsm_raise (&usock->fsm, &usock->event_received,
                    NN_USOCK_RECEIVED);
                return;
            case NN_WORKER_OP_ERROR:

                /*  If nothing is left to cancel, finish right away;
                    otherwise wait for the cancellations to complete. */
                if (nn_usock_cancel_io (usock) == 0) {
                    nn_fsm_raise (&usock->fsm, &usock->event_error,
                        NN_USOCK_ERROR);
                    nn_usock_close (usock);
                    usock->state = NN_USOCK_STATE_DONE;
                    return;
                }
                usock->state = NN_USOCK_STATE_CANCELLING_IO;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        case NN_USOCK_SRC_OUT:
            switch (type) {
            case NN_WORKER_OP_DONE:

                /*  Release the gather buffer used for named-pipe sends. */
                if (usock->pipesendbuf) {
                    nn_free (usock->pipesendbuf);
                    usock->pipesendbuf = NULL;
                }
                nn_fsm_raise (&usock->fsm, &usock->event_sent,
                    NN_USOCK_SENT);
                return;
            case NN_WORKER_OP_ERROR:
                if (nn_usock_cancel_io (usock) == 0) {
                    nn_fsm_raise (&usock->fsm, &usock->event_error,
                        NN_USOCK_ERROR);
                    nn_usock_close (usock);
                    usock->state = NN_USOCK_STATE_DONE;
                    return;
                }
                usock->state = NN_USOCK_STATE_CANCELLING_IO;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_ERROR:
                if (nn_usock_cancel_io (usock) == 0) {
                    nn_fsm_raise (&usock->fsm, &usock->event_error,
                        NN_USOCK_SHUTDOWN);
                    nn_usock_close (usock);
                    usock->state = NN_USOCK_STATE_DONE;
                    return;
                }
                usock->state = NN_USOCK_STATE_CANCELLING_IO;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  CANCELLING_IO state.                                                     */
/*****************************************************************************/
    case NN_USOCK_STATE_CANCELLING_IO:
        switch (src) {
        case NN_USOCK_SRC_IN:
        case NN_USOCK_SRC_OUT:

            /*  Proceed only once both operations have finished. */
            if (!nn_worker_op_isidle (&usock->in) ||
                  !nn_worker_op_isidle (&usock->out))
                return;
            nn_fsm_raise (&usock->fsm, &usock->event_error,
                NN_USOCK_SHUTDOWN);
            nn_usock_close (usock);
            usock->state = NN_USOCK_STATE_DONE;
            return;
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  DONE state.                                                              */
/*****************************************************************************/
    case NN_USOCK_STATE_DONE:
        nn_fsm_bad_source (usock->state, src, type);

/*****************************************************************************/
/*  LISTENING state.                                                         */
/*****************************************************************************/
    case NN_USOCK_STATE_LISTENING:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_ACCEPT:
                usock->state = NN_USOCK_STATE_ACCEPTING;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  ACCEPTING state.                                                         */
/*****************************************************************************/
    case NN_USOCK_STATE_ACCEPTING:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_DONE:
                usock->state = NN_USOCK_STATE_LISTENING;
                return;
            case NN_USOCK_ACTION_CANCEL:

                /*  For named pipes the pending connect was issued on the
                    handle of the socket being accepted, so temporarily
                    borrow that handle to cancel the operation. */
                if (usock->p == INVALID_HANDLE_VALUE &&
                      usock->asock != NULL && usock->domain == AF_UNIX) {
                    usock->p = usock->asock->p;
                    nn_usock_cancel_io (usock);
                    usock->p = INVALID_HANDLE_VALUE;
                }
                else {
                    nn_usock_cancel_io (usock);
                }
                usock->state = NN_USOCK_STATE_CANCELLING;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        case NN_USOCK_SRC_IN:
            switch (type) {
            case NN_WORKER_OP_DONE:

                /*  Adjust the new usock object. */
                usock->asock->state = NN_USOCK_STATE_ACCEPTED;

                /*  Notify the user that connection was accepted. */
                nn_fsm_raise (&usock->asock->fsm,
                    &usock->asock->event_established, NN_USOCK_ACCEPTED);

                /*  Disassociate the listener socket from the accepted
                    socket. */
                usock->asock->asock = NULL;
                usock->asock = NULL;

                /*  Wait till the user starts accepting once again. */
                usock->state = NN_USOCK_STATE_LISTENING;

                return;

            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  CANCELLING state.                                                        */
/*****************************************************************************/
    case NN_USOCK_STATE_CANCELLING:
        switch (src) {
        case NN_USOCK_SRC_IN:
            switch (type) {
            case NN_WORKER_OP_DONE:
            case NN_WORKER_OP_ERROR:

                /*  TODO: The socket being accepted should be closed here. */

                usock->state = NN_USOCK_STATE_LISTENING;

                /*  Notify the accepted socket that it was stopped. */
                nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE);

                return;

            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/*****************************************************************************/
/*  Invalid state.                                                           */
/*****************************************************************************/
    default:
        nn_fsm_bad_state (usock->state, src, type);
    }
}

/*****************************************************************************/
/*  State machine actions.                                                   */
/*****************************************************************************/

/*  Requests cancellation of whichever overlapped operations (in/out) are
    not idle. Returns 0 if there's nothing to cancel or 1 otherwise. */
static int nn_usock_cancel_io (struct nn_usock *self)
{
    int rc;
    BOOL brc;

    /*  For some reason simple CancelIo doesn't seem to work here.
        We have to use CancelIoEx instead. */
    rc = 0;
    if (!nn_worker_op_isidle (&self->in)) {
        brc = CancelIoEx (self->p, &self->in.olpd);

        /*  ERROR_NOT_FOUND is tolerated alongside success. */
        win_assert (brc || GetLastError () == ERROR_NOT_FOUND);
        rc = 1;
    }
    if (!nn_worker_op_isidle (&self->out)) {
        brc = CancelIoEx (self->p, &self->out.olpd);
        win_assert (brc || GetLastError () == ERROR_NOT_FOUND);
        rc = 1;
    }

    return rc;
}