/* Copyright (c) 2013 Martin Sustrik All rights reserved. Copyright (c) 2013 GoPivotal, Inc. All rights reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include "../utils/alloc.h" #include "../utils/closefd.h" #include "../utils/cont.h" #include "../utils/fast.h" #include "../utils/err.h" #include "../utils/attr.h" #include #include #include //#ifndef __PNACL #include //#else //#include //#endif #define NN_USOCK_STATE_IDLE 1 #define NN_USOCK_STATE_STARTING 2 #define NN_USOCK_STATE_BEING_ACCEPTED 3 #define NN_USOCK_STATE_ACCEPTED 4 #define NN_USOCK_STATE_CONNECTING 5 #define NN_USOCK_STATE_ACTIVE 6 #define NN_USOCK_STATE_REMOVING_FD 7 #define NN_USOCK_STATE_DONE 8 #define NN_USOCK_STATE_LISTENING 9 #define NN_USOCK_STATE_ACCEPTING 10 #define NN_USOCK_STATE_CANCELLING 11 #define NN_USOCK_STATE_STOPPING 12 #define NN_USOCK_STATE_STOPPING_ACCEPT 13 #define NN_USOCK_STATE_ACCEPTING_ERROR 14 #define NN_USOCK_ACTION_ACCEPT 1 #define NN_USOCK_ACTION_BEING_ACCEPTED 2 #define NN_USOCK_ACTION_CANCEL 3 #define NN_USOCK_ACTION_LISTEN 4 #define NN_USOCK_ACTION_CONNECT 5 #define NN_USOCK_ACTION_ACTIVATE 6 #define NN_USOCK_ACTION_DONE 7 #define NN_USOCK_ACTION_ERROR 8 #define NN_USOCK_ACTION_STARTED 9 #define NN_USOCK_SRC_FD 1 #define NN_USOCK_SRC_TASK_CONNECTING 2 #define NN_USOCK_SRC_TASK_CONNECTED 3 #define NN_USOCK_SRC_TASK_ACCEPT 4 #define NN_USOCK_SRC_TASK_SEND 5 #define NN_USOCK_SRC_TASK_RECV 6 #define NN_USOCK_SRC_TASK_STOP 7 /* Private functions. */ static void nn_usock_init_from_fd (struct nn_usock *self, int s); static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr); static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len); static int nn_usock_geterr (struct nn_usock *self); static void nn_usock_handler (struct nn_fsm *self, int src, int type, void *srcptr); static void nn_usock_shutdown (struct nn_fsm *self, int src, int type, void *srcptr); void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner) { /* Initalise the state machine. */ nn_fsm_init (&self->fsm, nn_usock_handler, nn_usock_shutdown, src, self, owner); self->state = NN_USOCK_STATE_IDLE; /* Choose a worker thread to handle this socket. */ self->worker = nn_fsm_choose_worker (&self->fsm); /* Actual file descriptor will be generated during 'start' step. */ self->s = -1; self->errnum = 0; self->in.buf = NULL; self->in.len = 0; self->in.batch = NULL; self->in.batch_len = 0; self->in.batch_pos = 0; self->in.pfd = NULL; memset (&self->out.hdr, 0, sizeof (struct msghdr)); /* Initialise tasks for the worker thread. */ nn_worker_fd_init (&self->wfd, NN_USOCK_SRC_FD, &self->fsm); nn_worker_task_init (&self->task_connecting, NN_USOCK_SRC_TASK_CONNECTING,&self->fsm); nn_worker_task_init (&self->task_connected, NN_USOCK_SRC_TASK_CONNECTED,&self->fsm); nn_worker_task_init (&self->task_accept, NN_USOCK_SRC_TASK_ACCEPT,&self->fsm); nn_worker_task_init (&self->task_send, NN_USOCK_SRC_TASK_SEND, &self->fsm); nn_worker_task_init (&self->task_recv, NN_USOCK_SRC_TASK_RECV, &self->fsm); nn_worker_task_init (&self->task_stop, NN_USOCK_SRC_TASK_STOP, &self->fsm); /* Intialise events raised by usock. */ nn_fsm_event_init (&self->event_established); nn_fsm_event_init (&self->event_sent); nn_fsm_event_init (&self->event_received); nn_fsm_event_init (&self->event_error); /* accepting is not going on at the moment. */ self->asock = NULL; } void nn_usock_term (struct nn_usock *self) { nn_assert_state (self, NN_USOCK_STATE_IDLE); if (self->in.batch) nn_free (self->in.batch); nn_fsm_event_term (&self->event_error); nn_fsm_event_term (&self->event_received); nn_fsm_event_term (&self->event_sent); nn_fsm_event_term (&self->event_established); nn_worker_cancel (self->worker, &self->task_recv); nn_worker_task_term (&self->task_stop); nn_worker_task_term (&self->task_recv); nn_worker_task_term (&self->task_send); nn_worker_task_term (&self->task_accept); nn_worker_task_term (&self->task_connected); nn_worker_task_term (&self->task_connecting); nn_worker_fd_term (&self->wfd); nn_fsm_term (&self->fsm); } int nn_usock_isidle (struct nn_usock *self) { return nn_fsm_isidle (&self->fsm); } int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol) { int s; /* If the operating system allows to directly open the socket with CLOEXEC flag, do so. That way there are no race conditions. */ #ifdef SOCK_CLOEXEC type |= SOCK_CLOEXEC; #endif /* Open the underlying socket. */ s = socket (domain, type, protocol); if (nn_slow (s < 0)) return -errno; nn_usock_init_from_fd (self, s); /* Start the state machine. */ nn_fsm_start (&self->fsm); return 0; } void nn_usock_start_fd (struct nn_usock *self, int fd) { nn_usock_init_from_fd (self, fd); nn_fsm_start (&self->fsm); nn_fsm_action (&self->fsm, NN_USOCK_ACTION_STARTED); } static void nn_usock_init_from_fd (struct nn_usock *self, int s) { int rc; int opt; nn_assert (self->state == NN_USOCK_STATE_IDLE || NN_USOCK_STATE_BEING_ACCEPTED); /* Store the file descriptor. */ nn_assert (self->s == -1); self->s = s; /* Setting FD_CLOEXEC option immediately after socket creation is the second best option after using SOCK_CLOEXEC. There is a race condition here (if process is forked between socket creation and setting the option) but the problem is pretty unlikely to happen. */ #if defined FD_CLOEXEC rc = fcntl (self->s, F_SETFD, FD_CLOEXEC); #if defined __APPLE__ errno_assert (rc != -1 || errno == EINVAL); #else errno_assert (rc != -1); #endif #endif /* If applicable, prevent SIGPIPE signal when writing to the connection already closed by the peer. */ #ifdef SO_NOSIGPIPE opt = 1; rc = setsockopt (self->s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt)); #if defined __APPLE__ errno_assert (rc == 0 || errno == EINVAL); #else errno_assert (rc == 0); #endif #endif /* Switch the socket to the non-blocking mode. All underlying sockets are always used in the callbackhronous mode. */ opt = fcntl (self->s, F_GETFL, 0); if (opt == -1) opt = 0; if (!(opt & O_NONBLOCK)) { rc = fcntl (self->s, F_SETFL, opt | O_NONBLOCK); #if defined __APPLE__ errno_assert (rc != -1 || errno == EINVAL); #else errno_assert (rc != -1); #endif } } void nn_usock_stop (struct nn_usock *self) { nn_fsm_stop (&self->fsm); } void nn_usock_async_stop (struct nn_usock *self) { nn_worker_execute (self->worker, &self->task_stop); nn_fsm_raise (&self->fsm, &self->event_error, NN_USOCK_SHUTDOWN); } void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner) { nn_fsm_swap_owner (&self->fsm, owner); } int nn_usock_setsockopt (struct nn_usock *self, int level, int optname, const void *optval, size_t optlen) { int rc; /* The socket can be modified only before it's active. */ nn_assert (self->state == NN_USOCK_STATE_STARTING || self->state == NN_USOCK_STATE_ACCEPTED); /* EINVAL errors are ignored on OSX platform. The reason for that is buggy OSX behaviour where setsockopt returns EINVAL if the peer have already disconnected. Thus, nn_usock_setsockopt() can succeed on OSX even though the option value was invalid, but the peer have already closed the connection. This behaviour should be relatively harmless. */ rc = setsockopt (self->s, level, optname, optval, (socklen_t) optlen); #if defined __APPLE__ if (nn_slow (rc != 0 && errno != EINVAL)) return -errno; #else if (nn_slow (rc != 0)) return -errno; #endif return 0; } int nn_usock_bind (struct nn_usock *self, const struct sockaddr *addr, size_t addrlen) { int rc; int opt; /* The socket can be bound only before it's connected. */ nn_assert_state (self, NN_USOCK_STATE_STARTING); /* Allow re-using the address. */ opt = 1; rc = setsockopt (self->s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt)); // ignore SO_REUSEADDR failures //errno_assert (rc == 0); rc = bind (self->s, addr, (socklen_t) addrlen); if (nn_slow (rc != 0)) return -errno; return 0; } int nn_usock_listen (struct nn_usock *self, int backlog) { int rc; /* You can start listening only before the socket is connected. */ nn_assert_state (self, NN_USOCK_STATE_STARTING); /* Start listening for incoming connections. */ rc = listen (self->s, backlog); if (nn_slow (rc != 0)) return -errno; /* Notify the state machine. */ nn_fsm_action (&self->fsm, NN_USOCK_ACTION_LISTEN); return 0; } void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener) { int s; /* Start the actual accepting. */ if (nn_fsm_isidle(&self->fsm)) { nn_fsm_start (&self->fsm); nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED); } nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT); /* Try to accept new connection in synchronous manner. */ #if NN_HAVE_ACCEPT4 s = accept4 (listener->s, NULL, NULL, SOCK_CLOEXEC); #else s = accept (listener->s, NULL, NULL); #endif /* Immediate success. */ if (nn_fast (s >= 0)) { /* Disassociate the listener socket from the accepted socket. Is useful if we restart accepting on ACCEPT_ERROR */ listener->asock = NULL; self->asock = NULL; nn_usock_init_from_fd (self, s); nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE); nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE); return; } /* Detect a failure. Note that in ECONNABORTED case we simply ignore the error and wait for next connection in asynchronous manner. */ errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == ECONNABORTED || errno == ENFILE || errno == EMFILE || errno == ENOBUFS || errno == ENOMEM); /* Pair the two sockets. They are already paired in case previous attempt failed on ACCEPT_ERROR */ nn_assert (!self->asock || self->asock == listener); self->asock = listener; nn_assert (!listener->asock || listener->asock == self); listener->asock = self; /* Some errors are just ok to ignore for now. We also stop repeating any errors until next IN_FD event so that we are not in a tight loop and allow processing other events in the meantime */ if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK && errno != ECONNABORTED && errno != listener->errnum)) { listener->errnum = errno; listener->state = NN_USOCK_STATE_ACCEPTING_ERROR; nn_fsm_raise (&listener->fsm, &listener->event_error, NN_USOCK_ACCEPT_ERROR); return; } /* Ask the worker thread to wait for the new connection. */ nn_worker_execute (listener->worker, &listener->task_accept); } void nn_usock_activate (struct nn_usock *self) { nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ACTIVATE); } void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr, size_t addrlen) { int rc; /* Notify the state machine that we've started connecting. */ nn_fsm_action (&self->fsm, NN_USOCK_ACTION_CONNECT); /* Do the connect itself. */ rc = connect (self->s, addr, (socklen_t) addrlen); /* Immediate success. */ if (nn_fast (rc == 0)) { nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE); return; } /* Immediate error. */ if (nn_slow (errno != EINPROGRESS)) { self->errnum = errno; nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR); return; } /* Start asynchronous connect. */ nn_worker_execute (self->worker, &self->task_connecting); } void nn_usock_send(struct nn_usock *self,const struct nn_iovec *iov,int32_t iovcnt) { int32_t rc,i,out; // Make sure that the socket is actually alive nn_assert_state (self, NN_USOCK_STATE_ACTIVE); // Copy the iovecs to the socket nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT); self->out.hdr.msg_iov = self->out.iov; out = 0; for (i=0; iout.iov [out].iov_base = iov [i].iov_base; self->out.iov [out].iov_len = iov [i].iov_len; out++; } self->out.hdr.msg_iovlen = out; rc = nn_usock_send_raw (self, &self->out.hdr); // Try to send the data immediately if ( nn_fast(rc == 0) ) // Success { nn_fsm_raise (&self->fsm, &self->event_sent, NN_USOCK_SENT); return; } if ( nn_slow(rc != -EAGAIN) ) // Errors { errnum_assert (rc == -ECONNRESET, -rc); nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR); return; } nn_worker_execute (self->worker, &self->task_send); // Ask the worker thread to send the remaining data } void nn_usock_recv(struct nn_usock *self, void *buf, size_t len, int *fd) { int rc; size_t nbytes; /* Make sure that the socket is actually alive. */ nn_assert_state (self, NN_USOCK_STATE_ACTIVE); /* Try to receive the data immediately. */ nbytes = len; self->in.pfd = fd; rc = nn_usock_recv_raw (self, buf, &nbytes); if (nn_slow (rc < 0)) { errnum_assert (rc == -ECONNRESET, -rc); nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR); return; } /* Success. */ if (nn_fast (nbytes == len)) { nn_fsm_raise (&self->fsm, &self->event_received, NN_USOCK_RECEIVED); return; } /* There are still data to receive in the background. */ self->in.buf = ((uint8_t*) buf) + nbytes; self->in.len = len - nbytes; /* Ask the worker thread to receive the remaining data. */ nn_worker_execute (self->worker, &self->task_recv); } static int nn_internal_tasks (struct nn_usock *usock, int src, int type) { /******************************************************************************/ /* Internal tasks sent from the user thread to the worker thread. */ /******************************************************************************/ switch (src) { case NN_USOCK_SRC_TASK_SEND: nn_assert (type == NN_WORKER_TASK_EXECUTE); nn_worker_set_out (usock->worker, &usock->wfd); return 1; case NN_USOCK_SRC_TASK_RECV: nn_assert (type == NN_WORKER_TASK_EXECUTE); nn_worker_set_in (usock->worker, &usock->wfd); return 1; case NN_USOCK_SRC_TASK_CONNECTED: nn_assert (type == NN_WORKER_TASK_EXECUTE); nn_worker_add_fd (usock->worker, usock->s, &usock->wfd); return 1; case NN_USOCK_SRC_TASK_CONNECTING: nn_assert (type == NN_WORKER_TASK_EXECUTE); nn_worker_add_fd (usock->worker, usock->s, &usock->wfd); nn_worker_set_out (usock->worker, &usock->wfd); return 1; case NN_USOCK_SRC_TASK_ACCEPT: nn_assert (type == NN_WORKER_TASK_EXECUTE); nn_worker_add_fd (usock->worker, usock->s, &usock->wfd); nn_worker_set_in (usock->worker, &usock->wfd); return 1; } return 0; } static void nn_usock_shutdown (struct nn_fsm *self, int src, int type, NN_UNUSED void *srcptr) { struct nn_usock *usock; usock = nn_cont (self, struct nn_usock, fsm); if (nn_internal_tasks (usock, src, type)) return; if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { /* Socket in ACCEPTING or CANCELLING state cannot be closed. Stop the socket being accepted first. */ nn_assert (usock->state != NN_USOCK_STATE_ACCEPTING && usock->state != NN_USOCK_STATE_CANCELLING); usock->errnum = 0; /* Synchronous stop. */ if (usock->state == NN_USOCK_STATE_IDLE) goto finish3; if (usock->state == NN_USOCK_STATE_DONE) goto finish2; if (usock->state == NN_USOCK_STATE_STARTING || usock->state == NN_USOCK_STATE_ACCEPTED || usock->state == NN_USOCK_STATE_ACCEPTING_ERROR || usock->state == NN_USOCK_STATE_LISTENING) goto finish1; /* When socket that's being accepted is asked to stop, we have to ask the listener socket to stop accepting first. */ if (usock->state == NN_USOCK_STATE_BEING_ACCEPTED) { nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_CANCEL); usock->state = NN_USOCK_STATE_STOPPING_ACCEPT; return; } /* Asynchronous stop. */ if (usock->state != NN_USOCK_STATE_REMOVING_FD) nn_usock_async_stop (usock); usock->state = NN_USOCK_STATE_STOPPING; return; } if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING_ACCEPT)) { nn_assert (src == NN_FSM_ACTION && type == NN_USOCK_ACTION_DONE); goto finish2; } if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING)) { if (src != NN_USOCK_SRC_TASK_STOP) return; nn_assert (type == NN_WORKER_TASK_EXECUTE); nn_worker_rm_fd (usock->worker, &usock->wfd); finish1: nn_closefd (usock->s); usock->s = -1; finish2: usock->state = NN_USOCK_STATE_IDLE; nn_fsm_stopped (&usock->fsm, NN_USOCK_STOPPED); finish3: return; } nn_fsm_bad_state(usock->state, src, type); } static void nn_usock_handler (struct nn_fsm *self, int src, int type, NN_UNUSED void *srcptr) { int rc; struct nn_usock *usock; int s; size_t sz; int sockerr; usock = nn_cont (self, struct nn_usock, fsm); if(nn_internal_tasks(usock, src, type)) return; switch (usock->state) { /******************************************************************************/ /* IDLE state. */ /* nn_usock object is initialised, but underlying OS socket is not yet */ /* created. */ /******************************************************************************/ case NN_USOCK_STATE_IDLE: switch (src) { case NN_FSM_ACTION: switch (type) { case NN_FSM_START: usock->state = NN_USOCK_STATE_STARTING; return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* STARTING state. */ /* Underlying OS socket is created, but it's not yet passed to the worker */ /* thread. In this state we can set socket options, local and remote */ /* address etc. */ /******************************************************************************/ case NN_USOCK_STATE_STARTING: /* Events from the owner of the usock. */ switch (src) { case NN_FSM_ACTION: switch (type) { case NN_USOCK_ACTION_LISTEN: usock->state = NN_USOCK_STATE_LISTENING; return; case NN_USOCK_ACTION_CONNECT: usock->state = NN_USOCK_STATE_CONNECTING; return; case NN_USOCK_ACTION_BEING_ACCEPTED: usock->state = NN_USOCK_STATE_BEING_ACCEPTED; return; case NN_USOCK_ACTION_STARTED: nn_worker_add_fd (usock->worker, usock->s, &usock->wfd); usock->state = NN_USOCK_STATE_ACTIVE; return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* BEING_ACCEPTED state. */ /* accept() was called on the usock. Now the socket is waiting for a new */ /* connection to arrive. */ /******************************************************************************/ case NN_USOCK_STATE_BEING_ACCEPTED: switch (src) { case NN_FSM_ACTION: switch (type) { case NN_USOCK_ACTION_DONE: usock->state = NN_USOCK_STATE_ACCEPTED; nn_fsm_raise (&usock->fsm, &usock->event_established, NN_USOCK_ACCEPTED); return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* ACCEPTED state. */ /* Connection was accepted, now it can be tuned. Afterwards, it'll move to */ /* the active state. */ /******************************************************************************/ case NN_USOCK_STATE_ACCEPTED: switch (src) { case NN_FSM_ACTION: switch (type) { case NN_USOCK_ACTION_ACTIVATE: nn_worker_add_fd (usock->worker, usock->s, &usock->wfd); usock->state = NN_USOCK_STATE_ACTIVE; return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* CONNECTING state. */ /* Asynchronous connecting is going on. */ /******************************************************************************/ case NN_USOCK_STATE_CONNECTING: switch (src) { case NN_FSM_ACTION: switch (type) { case NN_USOCK_ACTION_DONE: usock->state = NN_USOCK_STATE_ACTIVE; nn_worker_execute (usock->worker, &usock->task_connected); nn_fsm_raise (&usock->fsm, &usock->event_established, NN_USOCK_CONNECTED); return; case NN_USOCK_ACTION_ERROR: nn_closefd (usock->s); usock->s = -1; usock->state = NN_USOCK_STATE_DONE; nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR); return; default: nn_fsm_bad_action (usock->state, src, type); } case NN_USOCK_SRC_FD: switch (type) { case NN_WORKER_FD_OUT: nn_worker_reset_out (usock->worker, &usock->wfd); usock->state = NN_USOCK_STATE_ACTIVE; sockerr = nn_usock_geterr(usock); if (sockerr == 0) { nn_fsm_raise (&usock->fsm, &usock->event_established, NN_USOCK_CONNECTED); } else { usock->errnum = sockerr; nn_worker_rm_fd (usock->worker, &usock->wfd); rc = close (usock->s); errno_assert (rc == 0); usock->s = -1; usock->state = NN_USOCK_STATE_DONE; nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR); } return; case NN_WORKER_FD_ERR: nn_worker_rm_fd (usock->worker, &usock->wfd); nn_closefd (usock->s); usock->s = -1; usock->state = NN_USOCK_STATE_DONE; nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR); return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* ACTIVE state. */ /* Socket is connected. It can be used for sending and receiving data. */ /******************************************************************************/ case NN_USOCK_STATE_ACTIVE: switch (src) { case NN_USOCK_SRC_FD: switch (type) { case NN_WORKER_FD_IN: sz = usock->in.len; rc = nn_usock_recv_raw (usock, usock->in.buf, &sz); if (nn_fast (rc == 0)) { usock->in.len -= sz; usock->in.buf += sz; if (!usock->in.len) { nn_worker_reset_in (usock->worker, &usock->wfd); nn_fsm_raise (&usock->fsm, &usock->event_received, NN_USOCK_RECEIVED); } return; } errnum_assert (rc == -ECONNRESET, -rc); goto error; case NN_WORKER_FD_OUT: rc = nn_usock_send_raw (usock, &usock->out.hdr); if (nn_fast (rc == 0)) { nn_worker_reset_out (usock->worker, &usock->wfd); nn_fsm_raise (&usock->fsm, &usock->event_sent, NN_USOCK_SENT); return; } if (nn_fast (rc == -EAGAIN)) return; errnum_assert (rc == -ECONNRESET, -rc); goto error; case NN_WORKER_FD_ERR: error: nn_worker_rm_fd (usock->worker, &usock->wfd); nn_closefd (usock->s); usock->s = -1; usock->state = NN_USOCK_STATE_DONE; nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR); return; default: nn_fsm_bad_action (usock->state, src, type); } case NN_FSM_ACTION: switch (type) { case NN_USOCK_ACTION_ERROR: usock->state = NN_USOCK_STATE_REMOVING_FD; nn_usock_async_stop (usock); return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source(usock->state, src, type); } /******************************************************************************/ /* REMOVING_FD state. */ /******************************************************************************/ case NN_USOCK_STATE_REMOVING_FD: switch (src) { case NN_USOCK_SRC_TASK_STOP: switch (type) { case NN_WORKER_TASK_EXECUTE: nn_worker_rm_fd (usock->worker, &usock->wfd); nn_closefd (usock->s); usock->s = -1; usock->state = NN_USOCK_STATE_DONE; nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR); return; default: nn_fsm_bad_action (usock->state, src, type); } /* Events from the file descriptor are ignored while it is being removed. */ case NN_USOCK_SRC_FD: return; default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* DONE state. */ /* Socket is closed. The only thing that can be done in this state is */ /* stopping the usock. */ /******************************************************************************/ case NN_USOCK_STATE_DONE: nn_fsm_bad_source (usock->state, src, type); /******************************************************************************/ /* LISTENING state. */ /* Socket is listening for new incoming connections, however, user is not */ /* accepting a new connection. */ /******************************************************************************/ case NN_USOCK_STATE_LISTENING: switch (src) { case NN_FSM_ACTION: switch (type) { case NN_USOCK_ACTION_ACCEPT: usock->state = NN_USOCK_STATE_ACCEPTING; return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* ACCEPTING state. */ /* User is waiting asynchronouslyfor a new inbound connection */ /* to be accepted. */ /******************************************************************************/ case NN_USOCK_STATE_ACCEPTING: switch (src) { case NN_FSM_ACTION: switch (type) { case NN_USOCK_ACTION_DONE: usock->state = NN_USOCK_STATE_LISTENING; return; case NN_USOCK_ACTION_CANCEL: usock->state = NN_USOCK_STATE_CANCELLING; nn_worker_execute (usock->worker, &usock->task_stop); return; default: nn_fsm_bad_action (usock->state, src, type); } case NN_USOCK_SRC_FD: switch (type) { case NN_WORKER_FD_IN: /* New connection arrived in asynchronous manner. */ #if NN_HAVE_ACCEPT4 s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC); #else s = accept (usock->s, NULL, NULL); #endif /* ECONNABORTED is an valid error. New connection was closed by the peer before we were able to accept it. If it happens do nothing and wait for next incoming connection. */ if (nn_slow (s < 0 && errno == ECONNABORTED)) return; /* Resource allocation errors. It's not clear from POSIX specification whether the new connection is closed in this case or whether it remains in the backlog. In the latter case it would be wise to wait here for a while to prevent busy looping. */ if (nn_slow (s < 0 && (errno == ENFILE || errno == EMFILE || errno == ENOBUFS || errno == ENOMEM))) { usock->errnum = errno; usock->state = NN_USOCK_STATE_ACCEPTING_ERROR; /* Wait till the user starts accepting once again. */ nn_worker_rm_fd (usock->worker, &usock->wfd); nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ACCEPT_ERROR); return; } /* Any other error is unexpected. */ errno_assert (s >= 0); /* Initialise the new usock object. */ nn_usock_init_from_fd (usock->asock, s); usock->asock->state = NN_USOCK_STATE_ACCEPTED; /* Notify the user that connection was accepted. */ nn_fsm_raise (&usock->asock->fsm, &usock->asock->event_established, NN_USOCK_ACCEPTED); /* Disassociate the listener socket from the accepted socket. */ usock->asock->asock = NULL; usock->asock = NULL; /* Wait till the user starts accepting once again. */ nn_worker_rm_fd (usock->worker, &usock->wfd); usock->state = NN_USOCK_STATE_LISTENING; return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* ACCEPTING_ERROR state. */ /* Waiting the socket to accept the error and restart */ /******************************************************************************/ case NN_USOCK_STATE_ACCEPTING_ERROR: switch (src) { case NN_FSM_ACTION: switch (type) { case NN_USOCK_ACTION_ACCEPT: usock->state = NN_USOCK_STATE_ACCEPTING; return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* CANCELLING state. */ /******************************************************************************/ case NN_USOCK_STATE_CANCELLING: switch (src) { case NN_USOCK_SRC_TASK_STOP: switch (type) { case NN_WORKER_TASK_EXECUTE: nn_worker_rm_fd (usock->worker, &usock->wfd); usock->state = NN_USOCK_STATE_LISTENING; /* Notify the accepted socket that it was stopped. */ nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE); return; default: nn_fsm_bad_action (usock->state, src, type); } case NN_USOCK_SRC_FD: switch (type) { case NN_WORKER_FD_IN: return; default: nn_fsm_bad_action (usock->state, src, type); } default: nn_fsm_bad_source (usock->state, src, type); } /******************************************************************************/ /* Invalid state */ /******************************************************************************/ default: nn_fsm_bad_state (usock->state, src, type); } } static int32_t nn_usock_send_raw(struct nn_usock *self,struct msghdr *hdr) { ssize_t nbytes; // Try to send the data #if defined MSG_NOSIGNAL nbytes = sendmsg(self->s,hdr,MSG_NOSIGNAL); #else nbytes = sendmsg(self->s,hdr,0); #endif printf("nn_usock_send_raw nbytes.%d for sock.%d\n",(int32_t)nbytes,self->s); if ( nn_slow(nbytes < 0) ) // Handle errors { if ( nn_fast(errno == EAGAIN || errno == EWOULDBLOCK) ) nbytes = 0; #ifdef __PNACL else if ( errno < 0 ) { PostMessage("nn_usock_send_raw err.%d\n",(int32_t)nbytes); return -ECONNRESET; } #endif else { // If the connection fails, return ECONNRESET errno_assert(errno == ECONNRESET || errno == ETIMEDOUT || errno == EPIPE || errno == ECONNREFUSED || errno == ENOTCONN); return -ECONNRESET; } } while ( nbytes != 0 ) // Some bytes were sent. Adjust the iovecs accordingly { if ( nbytes >= (ssize_t)hdr->msg_iov->iov_len ) { --hdr->msg_iovlen; if ( !hdr->msg_iovlen ) { nn_assert(nbytes == (ssize_t)hdr->msg_iov->iov_len); return 0; } nbytes -= hdr->msg_iov->iov_len; ++hdr->msg_iov; } else { *((uint8_t **)&(hdr->msg_iov->iov_base)) += nbytes; hdr->msg_iov->iov_len -= nbytes; return -EAGAIN; } } if ( hdr->msg_iovlen > 0 ) return -EAGAIN; return 0; } static int nn_usock_recv_raw(struct nn_usock *self, void *buf, size_t *len) { size_t sz,length,nbytes; struct iovec iov; struct msghdr hdr; uint8_t ctrl [256]; #if defined NN_HAVE_MSG_CONTROL struct cmsghdr *cmsg; #endif /* If batch buffer doesn't exist, allocate it. The point of delayed deallocation to allow non-receiving sockets, such as TCP listening sockets, to do without the batch buffer. */ if (nn_slow (!self->in.batch)) { self->in.batch = nn_alloc (NN_USOCK_BATCH_SIZE, "AIO batch buffer"); alloc_assert (self->in.batch); } /* Try to satisfy the recv request by data from the batch buffer. */ length = *len; sz = self->in.batch_len - self->in.batch_pos; if (sz) { if (sz > length) sz = length; memcpy (buf, self->in.batch + self->in.batch_pos, sz); self->in.batch_pos += sz; buf = ((char*) buf) + sz; length -= sz; if (!length) return 0; } /* If recv request is greater than the batch buffer, get the data directly into the place. Otherwise, read data to the batch buffer. */ if (length > NN_USOCK_BATCH_SIZE) { iov.iov_base = buf; iov.iov_len = length; } else { iov.iov_base = self->in.batch; iov.iov_len = NN_USOCK_BATCH_SIZE; } memset (&hdr, 0, sizeof (hdr)); hdr.msg_iov = &iov; hdr.msg_iovlen = 1; #if defined NN_HAVE_MSG_CONTROL hdr.msg_control = ctrl; hdr.msg_controllen = sizeof (ctrl); #else *((int*) ctrl) = -1; hdr.msg_accrights = ctrl; hdr.msg_accrightslen = sizeof (int); #endif nbytes = recvmsg (self->s, &hdr, 0); /* Handle any possible errors. */ if (nn_slow (nbytes <= 0)) { if (nn_slow (nbytes == 0)) return -ECONNRESET; /* Zero bytes received. */ if (nn_fast (errno == EAGAIN || errno == EWOULDBLOCK)) nbytes = 0; else { /* If the peer closes the connection, return ECONNRESET. */ errno_assert (errno == ECONNRESET || errno == ENOTCONN || errno == ECONNREFUSED || errno == ETIMEDOUT || errno == EHOSTUNREACH); return -ECONNRESET; } } /* Extract the associated file descriptor, if any. */ if (nbytes > 0) { #if defined NN_HAVE_MSG_CONTROL cmsg = CMSG_FIRSTHDR (&hdr); while (cmsg) { if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { if (self->in.pfd) { *self->in.pfd = *((int*) CMSG_DATA (cmsg)); self->in.pfd = NULL; } else { nn_closefd (*((int*) CMSG_DATA (cmsg))); } break; } cmsg = CMSG_NXTHDR (&hdr, cmsg); } #else if (hdr.msg_accrightslen > 0) { nn_assert (hdr.msg_accrightslen == sizeof (int)); if (self->in.pfd) { *self->in.pfd = *((int*) hdr.msg_accrights); self->in.pfd = NULL; } else { nn_closefd (*((int*) hdr.msg_accrights)); } } #endif } /* If the data were received directly into the place we can return straight away. */ if (length > NN_USOCK_BATCH_SIZE) { length -= nbytes; *len -= length; return 0; } /* New data were read to the batch buffer. Copy the requested amount of it to the user-supplied buffer. */ self->in.batch_len = nbytes; self->in.batch_pos = 0; if (nbytes) { sz = nbytes > (ssize_t)length ? length : (size_t)nbytes; memcpy (buf, self->in.batch, sz); length -= sz; self->in.batch_pos += sz; } *len -= length; return 0; } static int nn_usock_geterr (struct nn_usock *self) { int rc; int opt; #if defined NN_HAVE_HPUX int optsz; #else socklen_t optsz; #endif opt = 0; optsz = sizeof (opt); rc = getsockopt (self->s, SOL_SOCKET, SO_ERROR, &opt, &optsz); /* The following should handle both Solaris and UNIXes derived from BSD. */ if (rc == -1) return errno; errno_assert (rc == 0); nn_assert (optsz == sizeof (opt)); return opt; }