/*
    Copyright (c) 2013 Martin Sustrik  All rights reserved.
    Copyright (c) 2013 GoPivotal, Inc.  All rights reserved.

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom
    the Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included
    in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
    IN THE SOFTWARE.
*/

#include "../nn_config.h"
#include "../utils/alloc.h"
#include "../utils/closefd.h"
#include "../utils/cont.h"
#include "../utils/fast.h"
#include "../utils/err.h"
#include "../utils/attr.h"

/*  NOTE(review): the names of the four system headers were lost when this
    file was extracted; the four below are reconstructed from what the code
    uses (memset/memcpy, close, fcntl, struct iovec) -- confirm against the
    original. <errno.h>, <stdint.h>, <stdio.h> and <stdlib.h> are added because
    the code uses errno constants, fixed-width types, printf and malloc/free. */
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/uio.h>

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/*  States of the usock state machine. */
#define NN_USOCK_STATE_IDLE 1
#define NN_USOCK_STATE_STARTING 2
#define NN_USOCK_STATE_BEING_ACCEPTED 3
#define NN_USOCK_STATE_ACCEPTED 4
#define NN_USOCK_STATE_CONNECTING 5
#define NN_USOCK_STATE_ACTIVE 6
#define NN_USOCK_STATE_REMOVING_FD 7
#define NN_USOCK_STATE_DONE 8
#define NN_USOCK_STATE_LISTENING 9
#define NN_USOCK_STATE_ACCEPTING 10
#define NN_USOCK_STATE_CANCELLING 11
#define NN_USOCK_STATE_STOPPING 12
#define NN_USOCK_STATE_STOPPING_ACCEPT 13
#define NN_USOCK_STATE_ACCEPTING_ERROR 14

/*  Actions the usock sends to its own state machine. */
#define NN_USOCK_ACTION_ACCEPT 1
#define NN_USOCK_ACTION_BEING_ACCEPTED 2
#define NN_USOCK_ACTION_CANCEL 3
#define NN_USOCK_ACTION_LISTEN 4
#define NN_USOCK_ACTION_CONNECT 5
#define NN_USOCK_ACTION_ACTIVATE 6
#define NN_USOCK_ACTION_DONE 7
#define NN_USOCK_ACTION_ERROR 8
#define NN_USOCK_ACTION_STARTED 9

/*  Event sources: the file descriptor and the individual worker tasks. */
#define NN_USOCK_SRC_FD 1
#define NN_USOCK_SRC_TASK_CONNECTING 2
#define NN_USOCK_SRC_TASK_CONNECTED 3
#define NN_USOCK_SRC_TASK_ACCEPT 4
#define NN_USOCK_SRC_TASK_SEND 5
#define NN_USOCK_SRC_TASK_RECV 6
#define NN_USOCK_SRC_TASK_STOP 7

/*  Private functions. */
static void nn_usock_init_from_fd (struct nn_usock *self, int s);
static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr);
static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len);
static int nn_usock_geterr (struct nn_usock *self);
static void nn_usock_handler (struct nn_fsm *self, int src, int type,
    void *srcptr);
static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
    void *srcptr);

/*  Initialise the usock object. 'src' is the source id under which 'owner'
    will see events from this usock. No OS socket is created here; the object
    is left in the IDLE state with s == -1. */
void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner)
{
    /*  Initialise the state machine. */
    nn_fsm_init (&self->fsm, nn_usock_handler, nn_usock_shutdown,
        src, self, owner);
    self->state = NN_USOCK_STATE_IDLE;

    /*  Choose a worker thread to handle this socket. */
    self->worker = nn_fsm_choose_worker (&self->fsm);

    /*  Actual file descriptor will be generated during 'start' step. */
    self->s = -1;
    self->errnum = 0;

    /*  No receive is pending and no batch buffer is allocated yet. */
    self->in.buf = NULL;
    self->in.len = 0;
    self->in.batch = NULL;
    self->in.batch_len = 0;
    self->in.batch_pos = 0;
    self->in.pfd = NULL;

    memset (&self->out.hdr, 0, sizeof (struct msghdr));

    /*  Initialise tasks for the worker thread. */
    nn_worker_fd_init (&self->wfd, NN_USOCK_SRC_FD, &self->fsm);
    nn_worker_task_init (&self->task_connecting, NN_USOCK_SRC_TASK_CONNECTING,
        &self->fsm);
    nn_worker_task_init (&self->task_connected, NN_USOCK_SRC_TASK_CONNECTED,
        &self->fsm);
    nn_worker_task_init (&self->task_accept, NN_USOCK_SRC_TASK_ACCEPT,
        &self->fsm);
    nn_worker_task_init (&self->task_send, NN_USOCK_SRC_TASK_SEND, &self->fsm);
    nn_worker_task_init (&self->task_recv, NN_USOCK_SRC_TASK_RECV, &self->fsm);
    nn_worker_task_init (&self->task_stop, NN_USOCK_SRC_TASK_STOP, &self->fsm);

    /*  Initialise events raised by usock. */
    nn_fsm_event_init (&self->event_established);
    nn_fsm_event_init (&self->event_sent);
    nn_fsm_event_init (&self->event_received);
    nn_fsm_event_init (&self->event_error);

    /*  Accepting is not going on at the moment. */
    self->asock = NULL;
}

/*  Release everything nn_usock_init set up, in reverse order. The usock must
    be back in the IDLE state. */
void nn_usock_term (struct nn_usock *self)
{
    nn_assert_state (self, NN_USOCK_STATE_IDLE);

    if (self->in.batch)
        nn_free (self->in.batch);

    nn_fsm_event_term (&self->event_error);
    nn_fsm_event_term (&self->event_received);
    nn_fsm_event_term (&self->event_sent);
    nn_fsm_event_term (&self->event_established);

    /*  Cancel a possibly queued recv task before terminating the tasks. */
    nn_worker_cancel (self->worker, &self->task_recv);

    nn_worker_task_term (&self->task_stop);
    nn_worker_task_term (&self->task_recv);
    nn_worker_task_term (&self->task_send);
    nn_worker_task_term (&self->task_accept);
    nn_worker_task_term (&self->task_connected);
    nn_worker_task_term (&self->task_connecting);
    nn_worker_fd_term (&self->wfd);

    nn_fsm_term (&self->fsm);
}

/*  Returns whatever nn_fsm_isidle reports for the underlying state machine. */
int nn_usock_isidle (struct nn_usock *self)
{
    return nn_fsm_isidle (&self->fsm);
}

/*  Create the OS socket and start the state machine.
    Returns 0 on success or -errno if socket() fails. */
int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol)
{
    int s;

    /*  If the operating system allows to directly open the socket with CLOEXEC
        flag, do so. That way there are no race conditions.
        (Disabled in this version of the file.) */
//#ifdef SOCK_CLOEXEC
//    type |= SOCK_CLOEXEC;
//#endif

    /*  Open the underlying socket. */
    s = socket (domain, type, protocol);
    if (nn_slow (s < 0))
        return -errno;
    //printf("got socket s.%d\n",s);

    nn_usock_init_from_fd (self, s);

    /*  Start the state machine. */
    nn_fsm_start (&self->fsm);

    return 0;
}

/*  Adopt an already existing file descriptor, start the state machine and
    send it the STARTED action (STARTING -> ACTIVE in nn_usock_handler). */
void nn_usock_start_fd (struct nn_usock *self, int fd)
{
    nn_usock_init_from_fd (self, fd);
    nn_fsm_start (&self->fsm);
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_STARTED);
}

/*  Store descriptor 's' in the usock and apply the per-socket settings:
    SO_NOSIGPIPE where available and non-blocking mode. */
static void nn_usock_init_from_fd (struct nn_usock *self, int s)
{
    int rc;
    int opt;

    /*  The second operand used to be the bare constant
        NN_USOCK_STATE_BEING_ACCEPTED, which is non-zero and made the
        assertion always true. */
    nn_assert (self->state == NN_USOCK_STATE_IDLE ||
        self->state == NN_USOCK_STATE_BEING_ACCEPTED);

    /*  Store the file descriptor. */
    nn_assert (self->s == -1);
    self->s = s;

    /*  Setting FD_CLOEXEC option immediately after socket creation is the
        second best option after using SOCK_CLOEXEC. There is a race condition
        here (if process is forked between socket creation and setting
        the option) but the problem is pretty unlikely to happen.
        (Disabled in this version of the file.) */
//#if defined FD_CLOEXEC
//    rc = fcntl (self->s, F_SETFD, FD_CLOEXEC);
//#if defined NN_HAVE_OSX
//    errno_assert (rc != -1 || errno == EINVAL);
//#else
//    errno_assert (rc != -1);
//#endif
//#endif

    /*  If applicable, prevent SIGPIPE signal when writing to the connection
        already closed by the peer. */
#ifdef SO_NOSIGPIPE
    opt = 1;
    rc = setsockopt (self->s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
#if defined NN_HAVE_OSX
    errno_assert (rc == 0 || errno == EINVAL);
#else
    errno_assert (rc == 0);
#endif
#endif

    /*  Switch the socket to the non-blocking mode. All underlying sockets
        are always used in the asynchronous mode. A failing F_GETFL is treated
        as "no flags set". */
    opt = fcntl (self->s, F_GETFL, 0);
    if (opt == -1)
        opt = 0;
    if (!(opt & O_NONBLOCK)) {
        rc = fcntl (self->s, F_SETFL, opt | O_NONBLOCK);
#if defined NN_HAVE_OSX
        errno_assert (rc != -1 || errno == EINVAL);
#else
        errno_assert (rc != -1);
#endif
    }
}

/*  Ask the state machine to stop; handled by nn_usock_shutdown. */
void nn_usock_stop (struct nn_usock *self)
{
    nn_fsm_stop (&self->fsm);
}

/*  Queue the stop task on the worker thread and raise NN_USOCK_SHUTDOWN
    through the error event. */
void nn_usock_async_stop (struct nn_usock *self)
{
    nn_worker_execute (self->worker, &self->task_stop);
    nn_fsm_raise (&self->fsm, &self->event_error, NN_USOCK_SHUTDOWN);
}

/*  Swap the owner of the underlying state machine (see nn_fsm_swap_owner). */
void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner)
{
    nn_fsm_swap_owner (&self->fsm, owner);
}

/*  Set an option on the OS socket. Returns 0 on success, -errno on failure. */
int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
    const void *optval, size_t optlen)
{
    int rc;

    /*  The socket can be modified only before it's active. */
    nn_assert (self->state == NN_USOCK_STATE_STARTING ||
        self->state == NN_USOCK_STATE_ACCEPTED);

    /*  EINVAL errors are ignored on OSX platform. The reason for that is buggy
        OSX behaviour where setsockopt returns EINVAL if the peer have already
        disconnected. Thus, nn_usock_setsockopt() can succeed on OSX even though
        the option value was invalid, but the peer have already closed the
        connection. This behaviour should be relatively harmless. */
    rc = setsockopt (self->s, level, optname, optval, (socklen_t) optlen);
#if defined NN_HAVE_OSX
    if (nn_slow (rc != 0 && errno != EINVAL))
        return -errno;
#else
    if (nn_slow (rc != 0))
        return -errno;
#endif

    return 0;
}

/*  Bind the socket to 'addr'. Returns 0 on success, -errno on failure. */
int nn_usock_bind (struct nn_usock *self, const struct sockaddr *addr,
    size_t addrlen)
{
    int rc;
    int opt;
    int err;

    /*  The socket can be bound only before it's connected. */
    nn_assert_state (self, NN_USOCK_STATE_STARTING);

    /*  Allow re-using the address. */
    opt = 1;
    printf("call setsockopt %d SOL_SOCKET.%d SO_REUSEADDR.%d in nn_usock_bind\n",self->s,SOL_SOCKET,SO_REUSEADDR);
    rc = setsockopt (self->s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
    printf("called setsockopt in nn_usock_bind returns %d\n",rc);
    /*  Ignore SO_REUSEADDR failures. */
    //errno_assert (rc == 0);

    rc = bind (self->s, addr, (socklen_t) addrlen);
    /*  Capture errno before printf gets a chance to overwrite it. */
    err = errno;
    printf("usock.%d -> bind rc.%d errno.%d %s\n",self->s,rc,err,nn_strerror(err));
    if (nn_slow (rc != 0))
        return -err;

    return 0;
}

/*  Start listening. Returns 0 on success, -errno on failure. */
int nn_usock_listen (struct nn_usock *self, int backlog)
{
    int rc;
    int err;

    /*  You can start listening only before the socket is connected. */
    nn_assert_state (self, NN_USOCK_STATE_STARTING);

    /*  Start listening for incoming connections. */
    rc = listen (self->s, backlog);
    /*  Capture errno before printf gets a chance to overwrite it. */
    err = errno;
    printf("usock.%d -> listen rc.%d errno.%d %s\n",self->s,rc,err,nn_strerror(err));
    if (nn_slow (rc != 0))
        return -err;

    /*  Notify the state machine. */
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_LISTEN);

    return 0;
}

/*  Accept a connection from 'listener' into 'self'. Tries a synchronous
    accept first; if nothing is pending the two usocks are paired and the
    worker thread is asked to wait for the next connection. */
void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener)
{
    int s;
    int err;

    /*  Start the actual accepting. */
    if (nn_fsm_isidle (&self->fsm)) {
        nn_fsm_start (&self->fsm);
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED);
    }
    nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT);

    /*  Try to accept new connection in synchronous manner. */
#if NN_HAVE_ACCEPT4
    s = accept4 (listener->s, NULL, NULL, SOCK_CLOEXEC);
#else
    s = accept (listener->s, NULL, NULL);
#endif
    /*  Capture errno immediately: every decision below depends on it and
        the trace printf may overwrite it. */
    err = errno;
    printf("usock.%d -> accept errno.%d s.%d %s\n",self->s,err,s,nn_strerror(err));

    /*  Immediate success. */
    if (nn_fast (s >= 0)) {

        /*  Disassociate the listener socket from the accepted socket.
            Is useful if we restart accepting on ACCEPT_ERROR. */
        listener->asock = NULL;
        self->asock = NULL;

        nn_usock_init_from_fd (self, s);
        nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE);
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
        return;
    }

    /*  Detect a failure. Note that in ECONNABORTED case we simply ignore
        the error and wait for next connection in asynchronous manner.
        errno is restored first so that a failing assertion reports the
        error returned by accept. */
    errno = err;
    errno_assert (err == EAGAIN || err == EWOULDBLOCK ||
        err == ECONNABORTED || err == ENFILE || err == EMFILE ||
        err == ENOBUFS || err == ENOMEM);

    /*  Pair the two sockets. They are already paired in case
        previous attempt failed on ACCEPT_ERROR. */
    nn_assert (!self->asock || self->asock == listener);
    self->asock = listener;
    nn_assert (!listener->asock || listener->asock == self);
    listener->asock = self;

    /*  Some errors are just ok to ignore for now. We also stop repeating
        any errors until next IN_FD event so that we are not in a tight loop
        and allow processing other events in the meantime. */
    if (nn_slow (err != EAGAIN && err != EWOULDBLOCK
          && err != ECONNABORTED && err != listener->errnum)) {
        printf("listen errno.%d\n",err);
        listener->errnum = err;
        listener->state = NN_USOCK_STATE_ACCEPTING_ERROR;
        nn_fsm_raise (&listener->fsm,
            &listener->event_error, NN_USOCK_ACCEPT_ERROR);
        return;
    }

    /*  Ask the worker thread to wait for the new connection. */
    nn_worker_execute (listener->worker, &listener->task_accept);
}

/*  Move an ACCEPTED usock to the ACTIVE state. */
void nn_usock_activate (struct nn_usock *self)
{
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ACTIVATE);
}

/*  Start connecting to 'addr'. The outcome is reported through the state
    machine: DONE on immediate success, ERROR on immediate failure, otherwise
    the worker thread is asked to wait for the connect to finish. */
void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
    size_t addrlen)
{
    int rc;
    int err;
    unsigned long long prefix;

    /*  Notify the state machine that we've started connecting. */
    nn_fsm_action (&self->fsm, NN_USOCK_ACTION_CONNECT);

    /*  Do the connect itself. */
    rc = connect (self->s, addr, (socklen_t) addrlen);
    /*  Capture errno before printf gets a chance to overwrite it. */
    err = errno;

    /*  The trace shows the leading bytes of the address. They are copied out
        with memcpy, bounded by addrlen, rather than by dereferencing the
        sockaddr through a 'long long' pointer. */
    prefix = 0;
    memcpy (&prefix, addr, addrlen < sizeof (prefix) ? addrlen : sizeof (prefix));
    printf("usock.%d <- connect (%llx) rc.%d errno.%d %s\n",self->s,prefix,rc,err,nn_strerror(err));

    /*  Immediate success. */
    if (nn_fast (rc == 0)) {
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
        return;
    }

    /*  Immediate error. */
    if (nn_slow (err != EINPROGRESS)) {
        self->errnum = err;
        printf("error.%d not EINPROGRESS\n",err);
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
        return;
    }

    /*  Start asynchronous connect. */
    nn_worker_execute (self->worker, &self->task_connecting);
}

/*  Send the 'iovcnt' buffers in 'iov'. Raises NN_USOCK_SENT once everything
    is written; if the socket would block, the remainder is sent by the
    worker thread. */
void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
    int iovcnt)
{
    int rc;
    int i;
    int out;

    /*  Make sure that the socket is actually alive. */
    nn_assert_state (self, NN_USOCK_STATE_ACTIVE);

    /*  Copy the iovecs to the socket, skipping the empty ones. */
    nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT);
    self->out.hdr.msg_iov = self->out.iov;
    out = 0;
    for (i = 0; i != iovcnt; ++i) {
        if (iov [i].iov_len == 0)
            continue;
        self->out.iov [out].iov_base = iov [i].iov_base;
        self->out.iov [out].iov_len = iov [i].iov_len;
        out++;
        printf("{%d} ",(int)iov [i].iov_len);
    }
    self->out.hdr.msg_iovlen = out;

    /*  Try to send the data immediately. */
    rc = nn_usock_send_raw (self, &self->out.hdr);
    printf("iov[%d] nn_usock_send_raw -> rc.%d\n",out,rc);

    /*  Success. */
    if (nn_fast (rc == 0)) {
        nn_fsm_raise (&self->fsm, &self->event_sent, NN_USOCK_SENT);
        return;
    }

    /*  Errors. */
    if (nn_slow (rc != -EAGAIN)) {
        errnum_assert (rc == -ECONNRESET, -rc);
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
        return;
    }

    /*  Ask the worker thread to send the remaining data. */
    nn_worker_execute (self->worker, &self->task_send);
}

/*  Receive exactly 'len' bytes into 'buf'. 'fd' is stored in in.pfd; see
    nn_process_cmsg for how a received descriptor is delivered through it.
    Raises NN_USOCK_RECEIVED once the buffer is full; otherwise the worker
    thread receives the rest. */
void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
{
    int rc;
    size_t nbytes;

    /*  Make sure that the socket is actually alive. */
    nn_assert_state (self, NN_USOCK_STATE_ACTIVE);

    /*  Try to receive the data immediately. */
    nbytes = len;
    self->in.pfd = fd;
    rc = nn_usock_recv_raw (self, buf, &nbytes);
    if (nn_slow (rc < 0)) {
        errnum_assert (rc == -ECONNRESET, -rc);
        //printf("rc.%d vs ECONNRESET\n",rc,ECONNRESET);
        nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
        return;
    }

    /*  NOTE(review): the text between the error check above and the
        nn_fsm_raise call below was damaged in extraction (a commented-out
        debug loop and the opening of this 'if' were lost). The condition is
        reconstructed as "whole buffer received" -- confirm against the
        original file. */

    /*  Success. */
    if (nn_fast (nbytes == len)) {
        nn_fsm_raise (&self->fsm, &self->event_received, NN_USOCK_RECEIVED);
        return;
    }

    /*  There are still data to receive in the background. */
    self->in.buf = ((uint8_t*) buf) + nbytes;
    self->in.len = len - nbytes;

    /*  Ask the worker thread to receive the remaining data. */
    nn_worker_execute (self->worker, &self->task_recv);
}

/*  Handle the tasks the user thread posts to the worker thread. Returns 1
    if 'src' was one of those tasks (and has been handled), 0 otherwise. */
static int nn_internal_tasks (struct nn_usock *usock, int src, int type)
{

/******************************************************************************/
/*  Internal tasks sent from the user thread to the worker thread.            */
/******************************************************************************/
    switch (src) {
    case NN_USOCK_SRC_TASK_SEND:
        nn_assert (type == NN_WORKER_TASK_EXECUTE);
        nn_worker_set_out (usock->worker, &usock->wfd);
        return 1;
    case NN_USOCK_SRC_TASK_RECV:
        nn_assert (type == NN_WORKER_TASK_EXECUTE);
        nn_worker_set_in (usock->worker, &usock->wfd);
        return 1;
    case NN_USOCK_SRC_TASK_CONNECTED:
        nn_assert (type == NN_WORKER_TASK_EXECUTE);
        nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
        return 1;
    case NN_USOCK_SRC_TASK_CONNECTING:
        nn_assert (type == NN_WORKER_TASK_EXECUTE);
        nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
        nn_worker_set_out (usock->worker, &usock->wfd);
        return 1;
    case NN_USOCK_SRC_TASK_ACCEPT:
        nn_assert (type == NN_WORKER_TASK_EXECUTE);
        nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
        nn_worker_set_in (usock->worker, &usock->wfd);
        return 1;
    }

    return 0;
}

/*  Shutdown handler of the state machine. Depending on the current state the
    stop is finished synchronously (finish1/finish2/finish3), deferred until
    the listener cancels the pending accept, or deferred until the worker
    thread executes the stop task. */
static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
    NN_UNUSED void *srcptr)
{
    struct nn_usock *usock;

    usock = nn_cont (self, struct nn_usock, fsm);

    if (nn_internal_tasks (usock, src, type))
        return;

    if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {

        /*  Socket in ACCEPTING or CANCELLING state cannot be closed.
            Stop the socket being accepted first. */
        nn_assert (usock->state != NN_USOCK_STATE_ACCEPTING &&
            usock->state != NN_USOCK_STATE_CANCELLING);

        usock->errnum = 0;

        /*  Synchronous stop. */
        if (usock->state == NN_USOCK_STATE_IDLE)
            goto finish3;
        if (usock->state == NN_USOCK_STATE_DONE)
            goto finish2;
        if (usock->state == NN_USOCK_STATE_STARTING ||
              usock->state == NN_USOCK_STATE_ACCEPTED ||
              usock->state == NN_USOCK_STATE_ACCEPTING_ERROR ||
              usock->state == NN_USOCK_STATE_LISTENING)
            goto finish1;

        /*  When socket that's being accepted is asked to stop, we have to
            ask the listener socket to stop accepting first. */
        if (usock->state == NN_USOCK_STATE_BEING_ACCEPTED) {
            nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_CANCEL);
            usock->state = NN_USOCK_STATE_STOPPING_ACCEPT;
            return;
        }

        /*  Asynchronous stop. In REMOVING_FD the stop task was already
            queued when that state was entered. */
        if (usock->state != NN_USOCK_STATE_REMOVING_FD)
            nn_usock_async_stop (usock);
        usock->state = NN_USOCK_STATE_STOPPING;
        return;
    }
    if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING_ACCEPT)) {
        nn_assert (src == NN_FSM_ACTION && type == NN_USOCK_ACTION_DONE);
        goto finish2;
    }
    if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING)) {
        /*  Everything except the stop task is ignored while stopping. */
        if (src != NN_USOCK_SRC_TASK_STOP)
            return;
        nn_assert (type == NN_WORKER_TASK_EXECUTE);
        nn_worker_rm_fd (usock->worker, &usock->wfd);
finish1:
        nn_closefd (usock->s);
        usock->s = -1;
finish2:
        usock->state = NN_USOCK_STATE_IDLE;
        nn_fsm_stopped (&usock->fsm, NN_USOCK_STOPPED);
finish3:
        return;
    }

    nn_fsm_bad_state(usock->state, src, type);
}

/*  Main handler of the state machine: dispatches on the current state, then
    on the event source, then on the event type. */
static void nn_usock_handler (struct nn_fsm *self, int src, int type,
    NN_UNUSED void *srcptr)
{
    int rc;
    struct nn_usock *usock;
    int s;
    size_t sz;
    int sockerr;

    usock = nn_cont (self, struct nn_usock, fsm);

    if(nn_internal_tasks(usock, src, type))
        return;

    switch (usock->state) {

/******************************************************************************/
/*  IDLE state.                                                               */
/*  nn_usock object is initialised, but underlying OS socket is not yet       */
/*  created.                                                                  */
/******************************************************************************/
    case NN_USOCK_STATE_IDLE:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_FSM_START:
                usock->state = NN_USOCK_STATE_STARTING;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  STARTING state.                                                           */
/*  Underlying OS socket is created, but it's not yet passed to the worker    */
/*  thread. In this state we can set socket options, local and remote         */
/*  address etc.                                                              */
/******************************************************************************/
    case NN_USOCK_STATE_STARTING:

        /*  Events from the owner of the usock. */
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_LISTEN:
                usock->state = NN_USOCK_STATE_LISTENING;
                return;
            case NN_USOCK_ACTION_CONNECT:
                usock->state = NN_USOCK_STATE_CONNECTING;
                return;
            case NN_USOCK_ACTION_BEING_ACCEPTED:
                usock->state = NN_USOCK_STATE_BEING_ACCEPTED;
                return;
            case NN_USOCK_ACTION_STARTED:
                nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
                usock->state = NN_USOCK_STATE_ACTIVE;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  BEING_ACCEPTED state.                                                     */
/*  accept() was called on the usock. Now the socket is waiting for a new     */
/*  connection to arrive.                                                     */
/******************************************************************************/
    case NN_USOCK_STATE_BEING_ACCEPTED:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_DONE:
                usock->state = NN_USOCK_STATE_ACCEPTED;
                nn_fsm_raise (&usock->fsm, &usock->event_established,
                    NN_USOCK_ACCEPTED);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  ACCEPTED state.                                                           */
/*  Connection was accepted, now it can be tuned. Afterwards, it'll move to   */
/*  the active state.                                                         */
/******************************************************************************/
    case NN_USOCK_STATE_ACCEPTED:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_ACTIVATE:
                nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
                usock->state = NN_USOCK_STATE_ACTIVE;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  CONNECTING state.                                                         */
/*  Asynchronous connecting is going on.                                      */
/******************************************************************************/
    case NN_USOCK_STATE_CONNECTING:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_DONE:
                usock->state = NN_USOCK_STATE_ACTIVE;
                nn_worker_execute (usock->worker, &usock->task_connected);
                nn_fsm_raise (&usock->fsm, &usock->event_established,
                    NN_USOCK_CONNECTED);
                return;
            case NN_USOCK_ACTION_ERROR:
                nn_closefd (usock->s);
                usock->s = -1;
                usock->state = NN_USOCK_STATE_DONE;
                nn_fsm_raise (&usock->fsm, &usock->event_error,
                    NN_USOCK_ERROR);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        case NN_USOCK_SRC_FD:
            switch (type) {
            case NN_WORKER_FD_OUT:
                nn_worker_reset_out (usock->worker, &usock->wfd);
                usock->state = NN_USOCK_STATE_ACTIVE;
                /*  Writability alone does not mean the connect succeeded;
                    the socket error decides. */
                sockerr = nn_usock_geterr(usock);
                if (sockerr == 0) {
                    nn_fsm_raise (&usock->fsm, &usock->event_established,
                        NN_USOCK_CONNECTED);
                } else {
                    usock->errnum = sockerr;
                    nn_worker_rm_fd (usock->worker, &usock->wfd);
                    rc = close (usock->s);
                    errno_assert (rc == 0);
                    usock->s = -1;
                    usock->state = NN_USOCK_STATE_DONE;
                    nn_fsm_raise (&usock->fsm,
                        &usock->event_error, NN_USOCK_ERROR);
                }
                return;
            case NN_WORKER_FD_ERR:
                nn_worker_rm_fd (usock->worker, &usock->wfd);
                nn_closefd (usock->s);
                usock->s = -1;
                usock->state = NN_USOCK_STATE_DONE;
                nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  ACTIVE state.                                                             */
/*  Socket is connected. It can be used for sending and receiving data.       */
/******************************************************************************/
    case NN_USOCK_STATE_ACTIVE:
        switch (src) {
        case NN_USOCK_SRC_FD:
            switch (type) {
            case NN_WORKER_FD_IN:
                sz = usock->in.len;
                rc = nn_usock_recv_raw (usock, usock->in.buf, &sz);
                if (nn_fast (rc == 0)) {
                    usock->in.len -= sz;
                    usock->in.buf += sz;
                    /*  The receive is complete once nothing is left. */
                    if (!usock->in.len) {
                        nn_worker_reset_in (usock->worker, &usock->wfd);
                        nn_fsm_raise (&usock->fsm, &usock->event_received,
                            NN_USOCK_RECEIVED);
                    }
                    return;
                }
                errnum_assert (rc == -ECONNRESET, -rc);
                goto error;
            case NN_WORKER_FD_OUT:
                rc = nn_usock_send_raw (usock, &usock->out.hdr);
                if (nn_fast (rc == 0)) {
                    nn_worker_reset_out (usock->worker, &usock->wfd);
                    nn_fsm_raise (&usock->fsm, &usock->event_sent,
                        NN_USOCK_SENT);
                    return;
                }
                if (nn_fast (rc == -EAGAIN))
                    return;
                errnum_assert (rc == -ECONNRESET, -rc);
                goto error;
            case NN_WORKER_FD_ERR:
error:
                nn_worker_rm_fd (usock->worker, &usock->wfd);
                nn_closefd (usock->s);
                usock->s = -1;
                usock->state = NN_USOCK_STATE_DONE;
                nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_ERROR:
                usock->state = NN_USOCK_STATE_REMOVING_FD;
                nn_usock_async_stop (usock);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source(usock->state, src, type);
        }

/******************************************************************************/
/*  REMOVING_FD state.                                                        */
/******************************************************************************/
    case NN_USOCK_STATE_REMOVING_FD:
        switch (src) {
        case NN_USOCK_SRC_TASK_STOP:
            switch (type) {
            case NN_WORKER_TASK_EXECUTE:
                nn_worker_rm_fd (usock->worker, &usock->wfd);
                nn_closefd (usock->s);
                usock->s = -1;
                usock->state = NN_USOCK_STATE_DONE;
                nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }

        /*  Events from the file descriptor are ignored while it is being
            removed. */
        case NN_USOCK_SRC_FD:
            return;

        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  DONE state.                                                               */
/*  Socket is closed. The only thing that can be done in this state is        */
/*  stopping the usock.                                                       */
/******************************************************************************/
    case NN_USOCK_STATE_DONE:
        nn_fsm_bad_source (usock->state, src, type);

/******************************************************************************/
/*  LISTENING state.                                                          */
/*  Socket is listening for new incoming connections, however, user is not    */
/*  accepting a new connection.                                               */
/******************************************************************************/
    case NN_USOCK_STATE_LISTENING:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_ACCEPT:
                usock->state = NN_USOCK_STATE_ACCEPTING;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  ACCEPTING state.                                                          */
/*  User is waiting asynchronously for a new inbound connection               */
/*  to be accepted.                                                           */
/******************************************************************************/
    case NN_USOCK_STATE_ACCEPTING:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_DONE:
                usock->state = NN_USOCK_STATE_LISTENING;
                return;
            case NN_USOCK_ACTION_CANCEL:
                usock->state = NN_USOCK_STATE_CANCELLING;
                nn_worker_execute (usock->worker, &usock->task_stop);
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        case NN_USOCK_SRC_FD:
            switch (type) {
            case NN_WORKER_FD_IN:

                /*  New connection arrived in asynchronous manner. */
#if NN_HAVE_ACCEPT4
                s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
#else
                s = accept (usock->s, NULL, NULL);
#endif

                /*  ECONNABORTED is an valid error. New connection was closed
                    by the peer before we were able to accept it. If it happens
                    do nothing and wait for next incoming connection. */
                if (nn_slow (s < 0 && errno == ECONNABORTED))
                    return;

                /*  Resource allocation errors. It's not clear from POSIX
                    specification whether the new connection is closed in this
                    case or whether it remains in the backlog. In the latter
                    case it would be wise to wait here for a while to prevent
                    busy looping. */
                if (nn_slow (s < 0 && (errno == ENFILE || errno == EMFILE ||
                      errno == ENOBUFS || errno == ENOMEM))) {
                    usock->errnum = errno;
                    usock->state = NN_USOCK_STATE_ACCEPTING_ERROR;

                    /*  Wait till the user starts accepting once again. */
                    nn_worker_rm_fd (usock->worker, &usock->wfd);

                    nn_fsm_raise (&usock->fsm,
                        &usock->event_error, NN_USOCK_ACCEPT_ERROR);
                    return;
                }

                /*  Any other error is unexpected. */
                errno_assert (s >= 0);

                /*  Initialise the new usock object. */
                nn_usock_init_from_fd (usock->asock, s);
                usock->asock->state = NN_USOCK_STATE_ACCEPTED;

                /*  Notify the user that connection was accepted. */
                nn_fsm_raise (&usock->asock->fsm,
                    &usock->asock->event_established, NN_USOCK_ACCEPTED);

                /*  Disassociate the listener socket from the accepted
                    socket. */
                usock->asock->asock = NULL;
                usock->asock = NULL;

                /*  Wait till the user starts accepting once again. */
                nn_worker_rm_fd (usock->worker, &usock->wfd);
                usock->state = NN_USOCK_STATE_LISTENING;

                return;

            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  ACCEPTING_ERROR state.                                                    */
/*  Waiting the socket to accept the error and restart                        */
/******************************************************************************/
    case NN_USOCK_STATE_ACCEPTING_ERROR:
        switch (src) {
        case NN_FSM_ACTION:
            switch (type) {
            case NN_USOCK_ACTION_ACCEPT:
                usock->state = NN_USOCK_STATE_ACCEPTING;
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  CANCELLING state.                                                         */
/******************************************************************************/
    case NN_USOCK_STATE_CANCELLING:
        switch (src) {
        case NN_USOCK_SRC_TASK_STOP:
            switch (type) {
            case NN_WORKER_TASK_EXECUTE:
                nn_worker_rm_fd (usock->worker, &usock->wfd);
                usock->state = NN_USOCK_STATE_LISTENING;

                /*  Notify the accepted socket that it was stopped. */
                nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE);

                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        case NN_USOCK_SRC_FD:
            switch (type) {
            case NN_WORKER_FD_IN:
                return;
            default:
                nn_fsm_bad_action (usock->state, src, type);
            }
        default:
            nn_fsm_bad_source (usock->state, src, type);
        }

/******************************************************************************/
/*  Invalid state                                                             */
/******************************************************************************/
    default:
        nn_fsm_bad_state (usock->state, src, type);
    }
}

/*  Validate the iovecs of 'hdr' and compute their total length.
    If 'buf' is non-NULL the payload is also copied into it back to back.
    If 'maxlen' is positive the total must not exceed it.
    Returns the total length, or -1 with errno set to EINVAL (NN_MSG length or
    maxlen exceeded), EFAULT (NULL base with non-zero length) or EMSGSIZE
    (total does not fit into int32_t). */
int32_t nn_getiovec_size(uint8_t *buf,int32_t maxlen,struct msghdr *hdr)
{
    int32_t i,size = 0;
    struct iovec *iov;

    /*  NOTE(review): the loop condition was damaged in extraction
        ("imsg_iovlen"); reconstructed as i < hdr->msg_iovlen. */
    for (i=0; i<(int32_t)hdr->msg_iovlen; i++)
    {
        iov = &hdr->msg_iov[i];
        if ( nn_slow(iov->iov_len == NN_MSG) )
        {
            errno = EINVAL;
            printf("ERROR: iov->iov_len == NN_MSG\n");
            return(-1);
        }
        if ( nn_slow(!iov->iov_base && iov->iov_len) )
        {
            errno = EFAULT;
            printf("ERROR: !iov->iov_base && iov->iov_len\n");
            return(-1);
        }
        if ( maxlen > 0 && nn_slow((size_t)size + iov->iov_len > (size_t)maxlen) )
        {
            errno = EINVAL;
            printf("ERROR: sz.%d + iov->iov_len.%d < maxlen.%d\n",(int32_t)size,(int32_t)iov->iov_len,maxlen);
            return(-1);
        }
        /*  Refuse totals that would overflow the signed 32-bit counter. */
        if ( nn_slow(iov->iov_len > (size_t)(0x7fffffff - size)) )
        {
            errno = EMSGSIZE;
            return(-1);
        }
        if ( iov->iov_len > 0 )
        {
            if ( buf != 0 )
                memcpy(&buf[size],iov->iov_base,iov->iov_len);
            size += (int32_t)iov->iov_len;
        }
    }
    return(size);
}

/*  sendmsg() replacement: flattens the iovecs of 'hdr' into one contiguous
    buffer and hands it to send(). Control messages are not supported and
    'flags' is ignored (send() is always called with flags 0).
    Returns the number of payload bytes sent (0 for an empty message).
    On failure a negative value is returned and errno holds the error:
    -1 if send() itself failed, -errno for invalid iovecs or allocation
    failure. */
ssize_t mysendmsg(int32_t usock,struct msghdr *hdr,int32_t flags)
{
    ssize_t nbytes = 0;
    int32_t veclen,offset,clen,err = 0;
    uint8_t *buf,_buf[8192];

    (void)flags;

    veclen = nn_getiovec_size(0,0,hdr);
    if ( veclen < 0 )
    {
        /*  Save errno: printf may overwrite it and the caller inspects it. */
        err = (errno != 0) ? errno : EINVAL;
        printf("nn_usock_send_raw errno.%d invalid iovec size\n",err);
        errno = err;
        return(-err);
    }
    /*  Nothing to send. This used to be reported as an error using whatever
        stale value errno happened to hold. */
    if ( veclen == 0 )
        return(0);

    clen = hdr->msg_controllen;
    if ( hdr->msg_control == 0 )
        clen = 0;
    nn_assert(clen == 0); /*  Control messages are not supported. */

    /*  Small messages are staged on the stack, large ones on the heap. */
    if ( (size_t)veclen > sizeof(_buf) ) // - clen - 5) )
    {
        buf = malloc((size_t)veclen);// + clen + 5);
        if ( buf == NULL )
        {
            errno = ENOMEM;
            return(-ENOMEM);
        }
    }
    else
    {
        buf = _buf;
        /*  The trace below always prints four bytes; make sure they are
            initialised even for shorter messages. */
        memset(_buf,0,4);
    }

    /*  The length-prefix framing is disabled, so the payload starts at 0. */
    offset = 0;
    /*buf[offset++] = (veclen & 0xff);
    buf[offset++] = ((veclen>>8) & 0xff);
    buf[offset++] = ((veclen>>15) & 0xff);
    buf[offset++] = (clen & 0xff);
    buf[offset++] = ((clen>>8) & 0xff);
    if ( clen > 0 )
        memcpy(&buf[offset],hdr->msg_control,clen), offset += clen;*/

    if ( nn_getiovec_size(&buf[offset],veclen,hdr) == veclen )
    {
        int32_t senderr;

        nbytes = send(usock,buf,offset + veclen,0);
        senderr = errno;
        printf(">>>>>>>>> send.[%d %d %d %d] (n.%d v.%d c.%d)-> usock.%d nbytes.%d\n",buf[offset],buf[offset+1],buf[offset+2],buf[offset+3],(int32_t)offset+veclen,veclen,clen,usock,(int32_t)nbytes);
        if ( nbytes != offset + veclen )
        {
            printf("nbytes.%d != offset.%d veclen.%d errno.%d usock.%d\n",(int32_t)nbytes,(int32_t)offset,veclen,senderr,usock);
        }
        if ( nbytes >= offset )
            nbytes -= offset;
        if ( buf != _buf )
            free(buf);
        /*  Hand the error from send() back to the caller untouched. */
        if ( nbytes < 0 )
            errno = senderr;
        return(nbytes);
    }

    /*  The second pass over the same iovecs failed; not expected. */
    err = (errno != 0) ? errno : EINVAL;
    printf("mysendmsg: unexpected nn_getiovec_size error %d\n",-err);
    if ( buf != _buf )
        free(buf);
    printf("nn_usock_send_raw errno.%d err.%d\n",err,-err);
    errno = err;
    return(-err);
}

/*  recvmsg() replacement: receives up to 'len' bytes into the first iovec of
    'hdr' with a single recv() call. msg_controllen is reset to 0 and 'flags'
    is not used. Returns what recv() returned, narrowed to 32 bits. */
ssize_t myrecvmsg(int32_t usock,struct msghdr *hdr,int32_t flags,int32_t len)
{
    ssize_t nbytes;
    struct iovec *iov;
    //uint8_t lens[5];

    iov = hdr->msg_iov;
    /*if ( (n= (int32_t)recv(usock,lens,sizeof(lens),0)) != sizeof(lens) )
    {
        printf("error getting veclen/clen n.%d vs %d from usock.%d\n",n,(int32_t)sizeof(lens),usock);
        return(0);
    } else printf("GOT %d bytes from usock.%d\n",n,usock);
    offset = 0;
    veclen = lens[offset++];
    veclen |= ((int32_t)lens[offset++] << 8);
    veclen |= ((int32_t)lens[offset++] << 16);
    clen = lens[offset++];
    clen |= ((int32_t)lens[offset++] << 8);
    printf("veclen.%d clen.%d waiting in usock.%d\n",veclen,clen,usock);
    if ( clen > 0 )
    {
        if ( (cbytes= (int32_t)recv(usock,hdr->msg_control,clen,0)) != clen )
        {
            printf("myrecvmsg: unexpected cbytes.%d vs clen.%d\n",cbytes,clen);
        }
    } else cbytes = 0;*/
    hdr->msg_controllen = 0;
    if ( (nbytes= (int32_t)recv(usock,iov->iov_base,len,0)) != len )
    {
        //printf("myrecvmsg: partial nbytes.%d vs veclen.%d\n",(int32_t)nbytes,len);
    }
    //printf("GOT nbytes.%d of len.%d from usock.%d\n",(int32_t)nbytes,len,usock);
    /*  Disabled debug trace. */
    if ( 0 && nbytes > 0 )
    {
        printf("got nbytes.%d from usock.%d [%d %d %d %d]\n",(int32_t)nbytes,usock,((uint8_t *)iov->iov_base)[0],((uint8_t *)iov->iov_base)[1],((uint8_t *)iov->iov_base)[2],((uint8_t *)iov->iov_base)[3]);
    }
    return(nbytes);
}

/*  Send as much of 'hdr' as the socket takes without blocking and advance
    the iovecs past the bytes that were sent.
    Returns 0 when everything was sent, -EAGAIN when data remain and
    -ECONNRESET when the connection failed. Any other send error aborts via
    errno_assert. */
static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr)
{
    ssize_t nbytes;
    int err;

    /*  errno is captured right after each send call so that the trace
        printf cannot overwrite it before it is examined. */
#if NN_USE_MYMSG
    nbytes = mysendmsg(self->s,hdr,0);
    err = errno;
#else
#if defined MSG_NOSIGNAL
    nbytes = sendmsg(self->s,hdr,MSG_NOSIGNAL);
    err = errno;
#else
    nbytes = sendmsg(self->s,hdr,0);
    err = errno;
    printf("nn_usock_send_raw nbytes.%d\n",(int32_t)nbytes);
#endif
#endif

    /*  Handle errors. */
    if (nn_slow (nbytes < 0)) {
        if (nn_fast (err == EAGAIN || err == EWOULDBLOCK))
            nbytes = 0;
        else {

            /*  If the connection fails, return ECONNRESET. */
            errno = err;
            errno_assert (err == ECONNRESET || err == ETIMEDOUT ||
                err == EPIPE || err == ECONNREFUSED || err == ENOTCONN);
            return -ECONNRESET;
        }
    }

    /*  Some bytes were sent. Adjust the iovecs accordingly. */
    while (nbytes) {
        if (nbytes >= (ssize_t)hdr->msg_iov->iov_len) {
            /*  The current iovec went out completely; move to the next. */
            --hdr->msg_iovlen;
            if (!hdr->msg_iovlen) {
                nn_assert (nbytes == (ssize_t)hdr->msg_iov->iov_len);
                return 0;
            }
            nbytes -= hdr->msg_iov->iov_len;
            ++hdr->msg_iov;
        }
        else {
            /*  The current iovec went out partially; trim its front. */
            hdr->msg_iov->iov_base = &((uint8_t *)hdr->msg_iov->iov_base)[nbytes];
            //*((uint8_t **)&(hdr->msg_iov->iov_base)) += nbytes;
            hdr->msg_iov->iov_len -= nbytes;
            return -EAGAIN;
        }
    }

    if (hdr->msg_iovlen > 0)
        return -EAGAIN;

    return 0;
}

int32_t nn_process_cmsg(struct nn_usock *self,struct msghdr *hdr)
{
    // Extract the associated file descriptor, if any
    int32_t retval = -1;
#if defined NN_HAVE_MSG_CONTROL
    struct cmsghdr *cmsg;
    cmsg = CMSG_FIRSTHDR(hdr);
    while ( cmsg )
    {
        if ( cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS )
        {
            memcpy(&retval,(int32_t *)CMSG_DATA(cmsg),sizeof(int32_t));
            if ( self->in.pfd )
            {
                printf("CMSG set self->in.pfd (%d)\n",retval);
                *self->in.pfd = retval;
                self->in.pfd = NULL;
            }
            else
            {
                printf("CMSG nn_closefd(%d)\n",retval);
                nn_closefd(retval);
            }
            break;
        }
        cmsg = CMSG_NXTHDR(hdr,cmsg);
    }
#else
    if ( hdr->msg_accrightslen > 0 )
    {
        nn_assert(hdr->msg_accrightslen == sizeof(int32_t));
        retval = 
*((int32_t *)hdr->msg_accrights); if ( self->in.pfd ) { *self->in.pfd = retval; self->in.pfd = NULL; } else nn_closefd(retval); } #endif return(retval); } static int nn_usock_recv_raw(struct nn_usock *self, void *buf, size_t *len) { int usebuf = 0; size_t sz; size_t length; ssize_t nbytes; struct iovec iov; struct msghdr hdr; unsigned char ctrl [256]; length = *len; /* If batch buffer doesn't exist, allocate it. The point of delayed deallocation to allow non-receiving sockets, such as TCP listening sockets, to do without the batch buffer. */ if (nn_slow (!self->in.batch)) { self->in.batch = nn_alloc (NN_USOCK_BATCH_SIZE, "AIO batch buffer"); alloc_assert (self->in.batch); } /* Try to satisfy the recv request by data from the batch buffer. */ sz = self->in.batch_len - self->in.batch_pos; if (sz) { if (sz > length) sz = length; memcpy (buf, self->in.batch + self->in.batch_pos, sz); self->in.batch_pos += sz; buf = ((char*) buf) + sz; length -= sz; if (!length) return 0; } #if NN_USE_MYMSG usebuf = (length >= NN_USOCK_BATCH_SIZE); #else usebuf = (length >= NN_USOCK_BATCH_SIZE); #endif // If recv request is greater than the batch buffer, get the data directly into the place. 
Otherwise, read data to the batch buffer if ( usebuf != 0 ) { iov.iov_base = buf; iov.iov_len = length; } else { iov.iov_base = self->in.batch; iov.iov_len = NN_USOCK_BATCH_SIZE; } memset(&hdr,0,sizeof(hdr)); hdr.msg_iov = &iov; hdr.msg_iovlen = 1; #if defined NN_HAVE_MSG_CONTROL hdr.msg_control = ctrl; hdr.msg_controllen = sizeof(ctrl); #else *((int*) ctrl) = -1; hdr.msg_accrights = ctrl; hdr.msg_accrightslen = sizeof(int); #endif #if NN_USE_MYMSG nbytes = myrecvmsg(self->s,&hdr,0,(int32_t)iov.iov_len); printf("got nbytes.%d from recvmsg errno.%d %s\n",(int32_t)nbytes,errno,nn_strerror(errno)); #else nbytes = recvmsg (self->s, &hdr, 0); #endif if ( nn_slow(nbytes <= 0) ) { if ( nn_slow(nbytes == 0) ) return -ECONNRESET; if ( nn_fast(errno == EAGAIN || errno == EWOULDBLOCK) ) // Zero bytes received nbytes = 0; else { printf("recvraw errno.%d %s\n",errno,nn_strerror(errno)); // If the peer closes the connection, return ECONNRESET errno_assert(errno == ECONNRESET || errno == ENOTCONN || errno == ECONNREFUSED || errno == ETIMEDOUT || errno == EHOSTUNREACH #if NN_USE_MYMSG // || errno == EADDRINUSE || errno == EINPROGRESS #endif ); return -ECONNRESET; } } else if ( hdr.msg_controllen > 0 ) nn_process_cmsg(self,&hdr); printf("nbytes.%d length.%d *len %d\n",(int)nbytes,(int)length,(int)*len); // If the data were received directly into the place we can return straight away if ( usebuf != 0 ) { length -= nbytes; *len -= length; return 0; } // New data were read to the batch buffer. Copy the requested amount of it to the user-supplied buffer self->in.batch_len = nbytes; self->in.batch_pos = 0; if (nbytes) { sz = nbytes > (ssize_t)length ? 
length : (size_t)nbytes; memcpy (buf, self->in.batch, sz); length -= sz; self->in.batch_pos += sz; } *len -= length; return 0; } static int nn_usock_geterr (struct nn_usock *self) { int rc; int opt; #if defined NN_HAVE_HPUX int optsz; #else socklen_t optsz; #endif opt = 0; optsz = sizeof (opt); rc = getsockopt (self->s, SOL_SOCKET, SO_ERROR, &opt, &optsz); /* The following should handle both Solaris and UNIXes derived from BSD. */ if (rc == -1) return errno; errno_assert (rc == 0); nn_assert (optsz == sizeof (opt)); return opt; }