You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1359 lines
47 KiB
1359 lines
47 KiB
/*
|
|
Copyright (c) 2013 Martin Sustrik All rights reserved.
|
|
Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"),
|
|
to deal in the Software without restriction, including without limitation
|
|
the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
and/or sell copies of the Software, and to permit persons to whom
|
|
the Software is furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included
|
|
in all copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
IN THE SOFTWARE.
|
|
*/
|
|
|
|
#include "../nn_config.h"
|
|
|
|
#include "../utils/alloc.h"
|
|
#include "../utils/closefd.h"
|
|
#include "../utils/cont.h"
|
|
#include "../utils/fast.h"
|
|
#include "../utils/err.h"
|
|
#include "../utils/attr.h"
|
|
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
#include <unistd.h>
|
|
#include <fcntl.h>
|
|
|
|
#define NN_USOCK_STATE_IDLE 1
|
|
#define NN_USOCK_STATE_STARTING 2
|
|
#define NN_USOCK_STATE_BEING_ACCEPTED 3
|
|
#define NN_USOCK_STATE_ACCEPTED 4
|
|
#define NN_USOCK_STATE_CONNECTING 5
|
|
#define NN_USOCK_STATE_ACTIVE 6
|
|
#define NN_USOCK_STATE_REMOVING_FD 7
|
|
#define NN_USOCK_STATE_DONE 8
|
|
#define NN_USOCK_STATE_LISTENING 9
|
|
#define NN_USOCK_STATE_ACCEPTING 10
|
|
#define NN_USOCK_STATE_CANCELLING 11
|
|
#define NN_USOCK_STATE_STOPPING 12
|
|
#define NN_USOCK_STATE_STOPPING_ACCEPT 13
|
|
#define NN_USOCK_STATE_ACCEPTING_ERROR 14
|
|
|
|
#define NN_USOCK_ACTION_ACCEPT 1
|
|
#define NN_USOCK_ACTION_BEING_ACCEPTED 2
|
|
#define NN_USOCK_ACTION_CANCEL 3
|
|
#define NN_USOCK_ACTION_LISTEN 4
|
|
#define NN_USOCK_ACTION_CONNECT 5
|
|
#define NN_USOCK_ACTION_ACTIVATE 6
|
|
#define NN_USOCK_ACTION_DONE 7
|
|
#define NN_USOCK_ACTION_ERROR 8
|
|
#define NN_USOCK_ACTION_STARTED 9
|
|
|
|
#define NN_USOCK_SRC_FD 1
|
|
#define NN_USOCK_SRC_TASK_CONNECTING 2
|
|
#define NN_USOCK_SRC_TASK_CONNECTED 3
|
|
#define NN_USOCK_SRC_TASK_ACCEPT 4
|
|
#define NN_USOCK_SRC_TASK_SEND 5
|
|
#define NN_USOCK_SRC_TASK_RECV 6
|
|
#define NN_USOCK_SRC_TASK_STOP 7
|
|
|
|
/* Private functions. */
|
|
static void nn_usock_init_from_fd (struct nn_usock *self, int s);
|
|
static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr);
|
|
static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len);
|
|
static int nn_usock_geterr (struct nn_usock *self);
|
|
static void nn_usock_handler (struct nn_fsm *self, int src, int type,
|
|
void *srcptr);
|
|
static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
|
|
void *srcptr);
|
|
|
|
void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner)
|
|
{
|
|
/* Initalise the state machine. */
|
|
nn_fsm_init (&self->fsm, nn_usock_handler, nn_usock_shutdown,
|
|
src, self, owner);
|
|
self->state = NN_USOCK_STATE_IDLE;
|
|
|
|
/* Choose a worker thread to handle this socket. */
|
|
self->worker = nn_fsm_choose_worker (&self->fsm);
|
|
|
|
/* Actual file descriptor will be generated during 'start' step. */
|
|
self->s = -1;
|
|
self->errnum = 0;
|
|
|
|
self->in.buf = NULL;
|
|
self->in.len = 0;
|
|
self->in.batch = NULL;
|
|
self->in.batch_len = 0;
|
|
self->in.batch_pos = 0;
|
|
self->in.pfd = NULL;
|
|
|
|
memset (&self->out.hdr, 0, sizeof (struct msghdr));
|
|
|
|
/* Initialise tasks for the worker thread. */
|
|
nn_worker_fd_init (&self->wfd, NN_USOCK_SRC_FD, &self->fsm);
|
|
nn_worker_task_init (&self->task_connecting, NN_USOCK_SRC_TASK_CONNECTING,
|
|
&self->fsm);
|
|
nn_worker_task_init (&self->task_connected, NN_USOCK_SRC_TASK_CONNECTED,
|
|
&self->fsm);
|
|
nn_worker_task_init (&self->task_accept, NN_USOCK_SRC_TASK_ACCEPT,
|
|
&self->fsm);
|
|
nn_worker_task_init (&self->task_send, NN_USOCK_SRC_TASK_SEND, &self->fsm);
|
|
nn_worker_task_init (&self->task_recv, NN_USOCK_SRC_TASK_RECV, &self->fsm);
|
|
nn_worker_task_init (&self->task_stop, NN_USOCK_SRC_TASK_STOP, &self->fsm);
|
|
|
|
/* Intialise events raised by usock. */
|
|
nn_fsm_event_init (&self->event_established);
|
|
nn_fsm_event_init (&self->event_sent);
|
|
nn_fsm_event_init (&self->event_received);
|
|
nn_fsm_event_init (&self->event_error);
|
|
|
|
/* accepting is not going on at the moment. */
|
|
self->asock = NULL;
|
|
}
|
|
|
|
void nn_usock_term (struct nn_usock *self)
|
|
{
|
|
nn_assert_state (self, NN_USOCK_STATE_IDLE);
|
|
|
|
if (self->in.batch)
|
|
nn_free (self->in.batch);
|
|
|
|
nn_fsm_event_term (&self->event_error);
|
|
nn_fsm_event_term (&self->event_received);
|
|
nn_fsm_event_term (&self->event_sent);
|
|
nn_fsm_event_term (&self->event_established);
|
|
|
|
nn_worker_cancel (self->worker, &self->task_recv);
|
|
|
|
nn_worker_task_term (&self->task_stop);
|
|
nn_worker_task_term (&self->task_recv);
|
|
nn_worker_task_term (&self->task_send);
|
|
nn_worker_task_term (&self->task_accept);
|
|
nn_worker_task_term (&self->task_connected);
|
|
nn_worker_task_term (&self->task_connecting);
|
|
nn_worker_fd_term (&self->wfd);
|
|
|
|
nn_fsm_term (&self->fsm);
|
|
}
|
|
|
|
int nn_usock_isidle (struct nn_usock *self)
|
|
{
|
|
return nn_fsm_isidle (&self->fsm);
|
|
}
|
|
|
|
int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol)
|
|
{
|
|
int s;
|
|
|
|
/* If the operating system allows to directly open the socket with CLOEXEC
|
|
flag, do so. That way there are no race conditions. */
|
|
//#ifdef SOCK_CLOEXEC
|
|
// type |= SOCK_CLOEXEC;
|
|
//#endif
|
|
|
|
/* Open the underlying socket. */
|
|
s = socket (domain, type, protocol);
|
|
if (nn_slow (s < 0))
|
|
return -errno;
|
|
//printf("got socket s.%d\n",s);
|
|
nn_usock_init_from_fd (self, s);
|
|
|
|
/* Start the state machine. */
|
|
nn_fsm_start (&self->fsm);
|
|
|
|
return 0;
|
|
}
|
|
|
|
void nn_usock_start_fd (struct nn_usock *self, int fd)
|
|
{
|
|
nn_usock_init_from_fd (self, fd);
|
|
nn_fsm_start (&self->fsm);
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_STARTED);
|
|
}
|
|
|
|
static void nn_usock_init_from_fd (struct nn_usock *self, int s)
|
|
{
|
|
int rc;
|
|
int opt;
|
|
|
|
nn_assert (self->state == NN_USOCK_STATE_IDLE || NN_USOCK_STATE_BEING_ACCEPTED);
|
|
|
|
/* Store the file descriptor. */
|
|
nn_assert (self->s == -1);
|
|
self->s = s;
|
|
|
|
/* Setting FD_CLOEXEC option immediately after socket creation is the
|
|
second best option after using SOCK_CLOEXEC. There is a race condition
|
|
here (if process is forked between socket creation and setting
|
|
the option) but the problem is pretty unlikely to happen. */
|
|
//#if defined FD_CLOEXEC
|
|
// rc = fcntl (self->s, F_SETFD, FD_CLOEXEC);
|
|
//#if defined NN_HAVE_OSX
|
|
// errno_assert (rc != -1 || errno == EINVAL);
|
|
//#else
|
|
// errno_assert (rc != -1);
|
|
//#endif
|
|
//#endif
|
|
|
|
// If applicable, prevent SIGPIPE signal when writing to the connection already closed by the peer
|
|
#ifdef SO_NOSIGPIPE
|
|
opt = 1;
|
|
rc = setsockopt (self->s, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof (opt));
|
|
#if defined NN_HAVE_OSX
|
|
errno_assert (rc == 0 || errno == EINVAL);
|
|
#else
|
|
errno_assert (rc == 0);
|
|
#endif
|
|
#endif
|
|
|
|
// Switch the socket to the non-blocking mode. All underlying sockets are always used in the callbackhronous mode
|
|
opt = fcntl(self->s, F_GETFL, 0);
|
|
if ( opt == -1 )
|
|
opt = 0;
|
|
if ( !(opt & O_NONBLOCK) )
|
|
{
|
|
rc = fcntl(self->s, F_SETFL, opt | O_NONBLOCK);
|
|
#if defined NN_HAVE_OSX
|
|
errno_assert (rc != -1 || errno == EINVAL);
|
|
#else
|
|
errno_assert (rc != -1);
|
|
#endif
|
|
}
|
|
}
|
|
|
|
void nn_usock_stop (struct nn_usock *self)
|
|
{
|
|
nn_fsm_stop (&self->fsm);
|
|
}
|
|
|
|
void nn_usock_async_stop (struct nn_usock *self)
|
|
{
|
|
nn_worker_execute (self->worker, &self->task_stop);
|
|
nn_fsm_raise (&self->fsm, &self->event_error, NN_USOCK_SHUTDOWN);
|
|
}
|
|
|
|
void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner)
|
|
{
|
|
nn_fsm_swap_owner (&self->fsm, owner);
|
|
}
|
|
|
|
int nn_usock_setsockopt (struct nn_usock *self, int level, int optname,
|
|
const void *optval, size_t optlen)
|
|
{
|
|
int rc;
|
|
|
|
/* The socket can be modified only before it's active. */
|
|
nn_assert (self->state == NN_USOCK_STATE_STARTING ||
|
|
self->state == NN_USOCK_STATE_ACCEPTED);
|
|
|
|
/* EINVAL errors are ignored on OSX platform. The reason for that is buggy
|
|
OSX behaviour where setsockopt returns EINVAL if the peer have already
|
|
disconnected. Thus, nn_usock_setsockopt() can succeed on OSX even though
|
|
the option value was invalid, but the peer have already closed the
|
|
connection. This behaviour should be relatively harmless. */
|
|
rc = setsockopt (self->s, level, optname, optval, (socklen_t) optlen);
|
|
#if defined NN_HAVE_OSX
|
|
if (nn_slow (rc != 0 && errno != EINVAL))
|
|
return -errno;
|
|
#else
|
|
if (nn_slow (rc != 0))
|
|
return -errno;
|
|
#endif
|
|
|
|
return 0;
|
|
}
|
|
|
|
int nn_usock_bind (struct nn_usock *self, const struct sockaddr *addr,
|
|
size_t addrlen)
|
|
{
|
|
int rc;
|
|
int opt;
|
|
|
|
/* The socket can be bound only before it's connected. */
|
|
nn_assert_state (self, NN_USOCK_STATE_STARTING);
|
|
|
|
/* Allow re-using the address. */
|
|
opt = 1;
|
|
printf("call setsockopt %d SOL_SOCKET.%d SO_REUSEADDR.%d in nn_usock_bind\n",self->s,SOL_SOCKET,SO_REUSEADDR);
|
|
rc = setsockopt (self->s, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof (opt));
|
|
printf("called setsockopt in nn_usock_bind returns %d\n",rc);
|
|
// ignore SO_REUSEADDR failures
|
|
//errno_assert (rc == 0);
|
|
|
|
rc = bind (self->s, addr, (socklen_t) addrlen);
|
|
printf("usock.%d -> bind rc.%d errno.%d %s\n",self->s,rc,errno,nn_strerror(errno));
|
|
if (nn_slow (rc != 0))
|
|
return -errno;
|
|
|
|
return 0;
|
|
}
|
|
|
|
int nn_usock_listen (struct nn_usock *self, int backlog)
|
|
{
|
|
int rc;
|
|
|
|
/* You can start listening only before the socket is connected. */
|
|
nn_assert_state (self, NN_USOCK_STATE_STARTING);
|
|
|
|
/* Start listening for incoming connections. */
|
|
rc = listen (self->s, backlog);
|
|
printf("usock.%d -> listen rc.%d errno.%d %s\n",self->s,rc,errno,nn_strerror(errno));
|
|
if (nn_slow (rc != 0))
|
|
return -errno;
|
|
|
|
/* Notify the state machine. */
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_LISTEN);
|
|
|
|
return 0;
|
|
}
|
|
|
|
void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener)
|
|
{
|
|
int s;
|
|
|
|
/* Start the actual accepting. */
|
|
if (nn_fsm_isidle(&self->fsm)) {
|
|
nn_fsm_start (&self->fsm);
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_BEING_ACCEPTED);
|
|
}
|
|
nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_ACCEPT);
|
|
|
|
/* Try to accept new connection in synchronous manner. */
|
|
#if NN_HAVE_ACCEPT4
|
|
s = accept4 (listener->s, NULL, NULL, SOCK_CLOEXEC);
|
|
#else
|
|
s = accept (listener->s, NULL, NULL);
|
|
#endif
|
|
printf("usock.%d -> accept errno.%d s.%d %s\n",self->s,errno,s,nn_strerror(errno));
|
|
|
|
/* Immediate success. */
|
|
if (nn_fast (s >= 0)) {
|
|
/* Disassociate the listener socket from the accepted
|
|
socket. Is useful if we restart accepting on ACCEPT_ERROR */
|
|
listener->asock = NULL;
|
|
self->asock = NULL;
|
|
|
|
nn_usock_init_from_fd (self, s);
|
|
nn_fsm_action (&listener->fsm, NN_USOCK_ACTION_DONE);
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_DONE);
|
|
return;
|
|
}
|
|
|
|
/* Detect a failure. Note that in ECONNABORTED case we simply ignore
|
|
the error and wait for next connection in asynchronous manner. */
|
|
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
|
|
errno == ECONNABORTED || errno == ENFILE || errno == EMFILE ||
|
|
errno == ENOBUFS || errno == ENOMEM);
|
|
|
|
/* Pair the two sockets. They are already paired in case
|
|
previous attempt failed on ACCEPT_ERROR */
|
|
nn_assert (!self->asock || self->asock == listener);
|
|
self->asock = listener;
|
|
nn_assert (!listener->asock || listener->asock == self);
|
|
listener->asock = self;
|
|
|
|
/* Some errors are just ok to ignore for now. We also stop repeating
|
|
any errors until next IN_FD event so that we are not in a tight loop
|
|
and allow processing other events in the meantime */
|
|
if (nn_slow (errno != EAGAIN && errno != EWOULDBLOCK && errno != ECONNABORTED && errno != listener->errnum))
|
|
{
|
|
printf("listen errno.%d\n",errno);
|
|
listener->errnum = errno;
|
|
listener->state = NN_USOCK_STATE_ACCEPTING_ERROR;
|
|
nn_fsm_raise (&listener->fsm,
|
|
&listener->event_error, NN_USOCK_ACCEPT_ERROR);
|
|
return;
|
|
}
|
|
|
|
/* Ask the worker thread to wait for the new connection. */
|
|
nn_worker_execute (listener->worker, &listener->task_accept);
|
|
}
|
|
|
|
void nn_usock_activate (struct nn_usock *self)
|
|
{
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ACTIVATE);
|
|
}
|
|
|
|
void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
|
|
size_t addrlen)
|
|
{
|
|
int rc;
|
|
|
|
/* Notify the state machine that we've started connecting. */
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_CONNECT);
|
|
|
|
/* Do the connect itself. */
|
|
rc = connect(self->s,addr,(socklen_t)addrlen);
|
|
printf("usock.%d <- connect (%llx) rc.%d errno.%d %s\n",self->s,*(long long *)addr,rc,errno,nn_strerror(errno));
|
|
/* Immediate success. */
|
|
if ( nn_fast(rc == 0) )
|
|
{
|
|
nn_fsm_action(&self->fsm,NN_USOCK_ACTION_DONE);
|
|
return;
|
|
}
|
|
/* Immediate error. */
|
|
if ( nn_slow(errno != EINPROGRESS) )
|
|
{
|
|
self->errnum = errno;
|
|
printf("error.%d not EINPROGRESS\n",errno);
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
|
|
return;
|
|
}
|
|
|
|
/* Start asynchronous connect. */
|
|
nn_worker_execute (self->worker, &self->task_connecting);
|
|
}
|
|
|
|
void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
|
|
int iovcnt)
|
|
{
|
|
int rc;
|
|
int i;
|
|
int out;
|
|
|
|
/* Make sure that the socket is actually alive. */
|
|
nn_assert_state (self, NN_USOCK_STATE_ACTIVE);
|
|
|
|
/* Copy the iovecs to the socket. */
|
|
nn_assert (iovcnt <= NN_USOCK_MAX_IOVCNT);
|
|
self->out.hdr.msg_iov = self->out.iov;
|
|
out = 0;
|
|
for (i = 0; i != iovcnt; ++i) {
|
|
if (iov [i].iov_len == 0)
|
|
continue;
|
|
self->out.iov [out].iov_base = iov [i].iov_base;
|
|
self->out.iov [out].iov_len = iov [i].iov_len;
|
|
out++;
|
|
printf("{%d} ",(int)iov [i].iov_len);
|
|
}
|
|
self->out.hdr.msg_iovlen = out;
|
|
|
|
/* Try to send the data immediately. */
|
|
rc = nn_usock_send_raw (self, &self->out.hdr);
|
|
printf("iov[%d] nn_usock_send_raw -> rc.%d\n",out,rc);
|
|
|
|
/* Success. */
|
|
if (nn_fast (rc == 0)) {
|
|
nn_fsm_raise (&self->fsm, &self->event_sent, NN_USOCK_SENT);
|
|
return;
|
|
}
|
|
|
|
/* Errors. */
|
|
if (nn_slow (rc != -EAGAIN)) {
|
|
errnum_assert (rc == -ECONNRESET, -rc);
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
|
|
return;
|
|
}
|
|
|
|
/* Ask the worker thread to send the remaining data. */
|
|
nn_worker_execute (self->worker, &self->task_send);
|
|
}
|
|
|
|
void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
|
|
{
|
|
int rc;
|
|
size_t nbytes;
|
|
|
|
/* Make sure that the socket is actually alive. */
|
|
nn_assert_state (self, NN_USOCK_STATE_ACTIVE);
|
|
|
|
/* Try to receive the data immediately. */
|
|
nbytes = len;
|
|
self->in.pfd = fd;
|
|
rc = nn_usock_recv_raw (self, buf, &nbytes);
|
|
if (nn_slow (rc < 0)) {
|
|
errnum_assert (rc == -ECONNRESET, -rc);
|
|
//printf("rc.%d vs ECONNRESET\n",rc,ECONNRESET);
|
|
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
|
|
return;
|
|
}
|
|
//int i;
|
|
//for (i=0; i<16&&i<nbytes; i++)
|
|
// printf("%02x ",((uint8_t *)buf)[i]);
|
|
//printf("nn_usock_recv nbytes.%d\n",(int)nbytes);
|
|
/* Success. */
|
|
if (nn_fast (nbytes == len)) {
|
|
//printf("raise NN_USOCK_RECEIVED\n");
|
|
nn_fsm_raise (&self->fsm, &self->event_received, NN_USOCK_RECEIVED);
|
|
return;
|
|
}
|
|
|
|
/* There are still data to receive in the background. */
|
|
self->in.buf = ((uint8_t*) buf) + nbytes;
|
|
self->in.len = len - nbytes;
|
|
|
|
/* Ask the worker thread to receive the remaining data. */
|
|
nn_worker_execute (self->worker, &self->task_recv);
|
|
}
|
|
|
|
static int nn_internal_tasks (struct nn_usock *usock, int src, int type)
|
|
{
|
|
|
|
/******************************************************************************/
|
|
/* Internal tasks sent from the user thread to the worker thread. */
|
|
/******************************************************************************/
|
|
switch (src) {
|
|
case NN_USOCK_SRC_TASK_SEND:
|
|
nn_assert (type == NN_WORKER_TASK_EXECUTE);
|
|
nn_worker_set_out (usock->worker, &usock->wfd);
|
|
return 1;
|
|
case NN_USOCK_SRC_TASK_RECV:
|
|
nn_assert (type == NN_WORKER_TASK_EXECUTE);
|
|
nn_worker_set_in (usock->worker, &usock->wfd);
|
|
return 1;
|
|
case NN_USOCK_SRC_TASK_CONNECTED:
|
|
nn_assert (type == NN_WORKER_TASK_EXECUTE);
|
|
nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
|
|
return 1;
|
|
case NN_USOCK_SRC_TASK_CONNECTING:
|
|
nn_assert (type == NN_WORKER_TASK_EXECUTE);
|
|
nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
|
|
nn_worker_set_out (usock->worker, &usock->wfd);
|
|
return 1;
|
|
case NN_USOCK_SRC_TASK_ACCEPT:
|
|
nn_assert (type == NN_WORKER_TASK_EXECUTE);
|
|
nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
|
|
nn_worker_set_in (usock->worker, &usock->wfd);
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void nn_usock_shutdown (struct nn_fsm *self, int src, int type,
|
|
NN_UNUSED void *srcptr)
|
|
{
|
|
struct nn_usock *usock;
|
|
|
|
usock = nn_cont (self, struct nn_usock, fsm);
|
|
|
|
if (nn_internal_tasks (usock, src, type))
|
|
return;
|
|
|
|
if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
|
|
|
|
/* Socket in ACCEPTING or CANCELLING state cannot be closed.
|
|
Stop the socket being accepted first. */
|
|
nn_assert (usock->state != NN_USOCK_STATE_ACCEPTING &&
|
|
usock->state != NN_USOCK_STATE_CANCELLING);
|
|
|
|
usock->errnum = 0;
|
|
|
|
/* Synchronous stop. */
|
|
if (usock->state == NN_USOCK_STATE_IDLE)
|
|
goto finish3;
|
|
if (usock->state == NN_USOCK_STATE_DONE)
|
|
goto finish2;
|
|
if (usock->state == NN_USOCK_STATE_STARTING ||
|
|
usock->state == NN_USOCK_STATE_ACCEPTED ||
|
|
usock->state == NN_USOCK_STATE_ACCEPTING_ERROR ||
|
|
usock->state == NN_USOCK_STATE_LISTENING)
|
|
goto finish1;
|
|
|
|
/* When socket that's being accepted is asked to stop, we have to
|
|
ask the listener socket to stop accepting first. */
|
|
if (usock->state == NN_USOCK_STATE_BEING_ACCEPTED) {
|
|
nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_CANCEL);
|
|
usock->state = NN_USOCK_STATE_STOPPING_ACCEPT;
|
|
return;
|
|
}
|
|
|
|
/* Asynchronous stop. */
|
|
if (usock->state != NN_USOCK_STATE_REMOVING_FD)
|
|
nn_usock_async_stop (usock);
|
|
usock->state = NN_USOCK_STATE_STOPPING;
|
|
return;
|
|
}
|
|
if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING_ACCEPT)) {
|
|
nn_assert (src == NN_FSM_ACTION && type == NN_USOCK_ACTION_DONE);
|
|
goto finish2;
|
|
}
|
|
if (nn_slow (usock->state == NN_USOCK_STATE_STOPPING)) {
|
|
if (src != NN_USOCK_SRC_TASK_STOP)
|
|
return;
|
|
nn_assert (type == NN_WORKER_TASK_EXECUTE);
|
|
nn_worker_rm_fd (usock->worker, &usock->wfd);
|
|
finish1:
|
|
nn_closefd (usock->s);
|
|
usock->s = -1;
|
|
finish2:
|
|
usock->state = NN_USOCK_STATE_IDLE;
|
|
nn_fsm_stopped (&usock->fsm, NN_USOCK_STOPPED);
|
|
finish3:
|
|
return;
|
|
}
|
|
|
|
nn_fsm_bad_state(usock->state, src, type);
|
|
}
|
|
|
|
static void nn_usock_handler (struct nn_fsm *self, int src, int type,
|
|
NN_UNUSED void *srcptr)
|
|
{
|
|
int rc;
|
|
struct nn_usock *usock;
|
|
int s;
|
|
size_t sz;
|
|
int sockerr;
|
|
|
|
usock = nn_cont (self, struct nn_usock, fsm);
|
|
|
|
if(nn_internal_tasks(usock, src, type))
|
|
return;
|
|
|
|
switch (usock->state) {
|
|
|
|
/******************************************************************************/
|
|
/* IDLE state. */
|
|
/* nn_usock object is initialised, but underlying OS socket is not yet */
|
|
/* created. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_IDLE:
|
|
switch (src) {
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_FSM_START:
|
|
usock->state = NN_USOCK_STATE_STARTING;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* STARTING state. */
|
|
/* Underlying OS socket is created, but it's not yet passed to the worker */
|
|
/* thread. In this state we can set socket options, local and remote */
|
|
/* address etc. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_STARTING:
|
|
|
|
/* Events from the owner of the usock. */
|
|
switch (src) {
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_USOCK_ACTION_LISTEN:
|
|
usock->state = NN_USOCK_STATE_LISTENING;
|
|
return;
|
|
case NN_USOCK_ACTION_CONNECT:
|
|
usock->state = NN_USOCK_STATE_CONNECTING;
|
|
return;
|
|
case NN_USOCK_ACTION_BEING_ACCEPTED:
|
|
usock->state = NN_USOCK_STATE_BEING_ACCEPTED;
|
|
return;
|
|
case NN_USOCK_ACTION_STARTED:
|
|
nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
|
|
usock->state = NN_USOCK_STATE_ACTIVE;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* BEING_ACCEPTED state. */
|
|
/* accept() was called on the usock. Now the socket is waiting for a new */
|
|
/* connection to arrive. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_BEING_ACCEPTED:
|
|
switch (src) {
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_USOCK_ACTION_DONE:
|
|
usock->state = NN_USOCK_STATE_ACCEPTED;
|
|
nn_fsm_raise (&usock->fsm, &usock->event_established,
|
|
NN_USOCK_ACCEPTED);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* ACCEPTED state. */
|
|
/* Connection was accepted, now it can be tuned. Afterwards, it'll move to */
|
|
/* the active state. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_ACCEPTED:
|
|
switch (src) {
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_USOCK_ACTION_ACTIVATE:
|
|
nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
|
|
usock->state = NN_USOCK_STATE_ACTIVE;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* CONNECTING state. */
|
|
/* Asynchronous connecting is going on. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_CONNECTING:
|
|
switch (src) {
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_USOCK_ACTION_DONE:
|
|
usock->state = NN_USOCK_STATE_ACTIVE;
|
|
nn_worker_execute (usock->worker, &usock->task_connected);
|
|
nn_fsm_raise (&usock->fsm, &usock->event_established,
|
|
NN_USOCK_CONNECTED);
|
|
return;
|
|
case NN_USOCK_ACTION_ERROR:
|
|
nn_closefd (usock->s);
|
|
usock->s = -1;
|
|
usock->state = NN_USOCK_STATE_DONE;
|
|
nn_fsm_raise (&usock->fsm, &usock->event_error,
|
|
NN_USOCK_ERROR);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
case NN_USOCK_SRC_FD:
|
|
switch (type) {
|
|
case NN_WORKER_FD_OUT:
|
|
nn_worker_reset_out (usock->worker, &usock->wfd);
|
|
usock->state = NN_USOCK_STATE_ACTIVE;
|
|
sockerr = nn_usock_geterr(usock);
|
|
if (sockerr == 0) {
|
|
nn_fsm_raise (&usock->fsm, &usock->event_established,
|
|
NN_USOCK_CONNECTED);
|
|
} else {
|
|
usock->errnum = sockerr;
|
|
nn_worker_rm_fd (usock->worker, &usock->wfd);
|
|
rc = close (usock->s);
|
|
errno_assert (rc == 0);
|
|
usock->s = -1;
|
|
usock->state = NN_USOCK_STATE_DONE;
|
|
nn_fsm_raise (&usock->fsm,
|
|
&usock->event_error, NN_USOCK_ERROR);
|
|
}
|
|
return;
|
|
case NN_WORKER_FD_ERR:
|
|
nn_worker_rm_fd (usock->worker, &usock->wfd);
|
|
nn_closefd (usock->s);
|
|
usock->s = -1;
|
|
usock->state = NN_USOCK_STATE_DONE;
|
|
nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* ACTIVE state. */
|
|
/* Socket is connected. It can be used for sending and receiving data. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_ACTIVE:
|
|
switch (src) {
|
|
case NN_USOCK_SRC_FD:
|
|
switch (type) {
|
|
case NN_WORKER_FD_IN:
|
|
sz = usock->in.len;
|
|
rc = nn_usock_recv_raw (usock, usock->in.buf, &sz);
|
|
if (nn_fast (rc == 0)) {
|
|
usock->in.len -= sz;
|
|
usock->in.buf += sz;
|
|
if (!usock->in.len) {
|
|
nn_worker_reset_in (usock->worker, &usock->wfd);
|
|
nn_fsm_raise (&usock->fsm, &usock->event_received,
|
|
NN_USOCK_RECEIVED);
|
|
}
|
|
return;
|
|
}
|
|
errnum_assert (rc == -ECONNRESET, -rc);
|
|
goto error;
|
|
case NN_WORKER_FD_OUT:
|
|
rc = nn_usock_send_raw (usock, &usock->out.hdr);
|
|
if (nn_fast (rc == 0)) {
|
|
nn_worker_reset_out (usock->worker, &usock->wfd);
|
|
nn_fsm_raise (&usock->fsm, &usock->event_sent,
|
|
NN_USOCK_SENT);
|
|
return;
|
|
}
|
|
if (nn_fast (rc == -EAGAIN))
|
|
return;
|
|
errnum_assert (rc == -ECONNRESET, -rc);
|
|
goto error;
|
|
case NN_WORKER_FD_ERR:
|
|
error:
|
|
nn_worker_rm_fd (usock->worker, &usock->wfd);
|
|
nn_closefd (usock->s);
|
|
usock->s = -1;
|
|
usock->state = NN_USOCK_STATE_DONE;
|
|
nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_USOCK_ACTION_ERROR:
|
|
usock->state = NN_USOCK_STATE_REMOVING_FD;
|
|
nn_usock_async_stop (usock);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source(usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* REMOVING_FD state. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_REMOVING_FD:
|
|
switch (src) {
|
|
case NN_USOCK_SRC_TASK_STOP:
|
|
switch (type) {
|
|
case NN_WORKER_TASK_EXECUTE:
|
|
nn_worker_rm_fd (usock->worker, &usock->wfd);
|
|
nn_closefd (usock->s);
|
|
usock->s = -1;
|
|
usock->state = NN_USOCK_STATE_DONE;
|
|
nn_fsm_raise (&usock->fsm, &usock->event_error, NN_USOCK_ERROR);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
|
|
/* Events from the file descriptor are ignored while it is being
|
|
removed. */
|
|
case NN_USOCK_SRC_FD:
|
|
return;
|
|
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* DONE state. */
|
|
/* Socket is closed. The only thing that can be done in this state is */
|
|
/* stopping the usock. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_DONE:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
|
|
/******************************************************************************/
|
|
/* LISTENING state. */
|
|
/* Socket is listening for new incoming connections, however, user is not */
|
|
/* accepting a new connection. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_LISTENING:
|
|
switch (src) {
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_USOCK_ACTION_ACCEPT:
|
|
usock->state = NN_USOCK_STATE_ACCEPTING;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* ACCEPTING state. */
|
|
/* User is waiting asynchronouslyfor a new inbound connection */
|
|
/* to be accepted. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_ACCEPTING:
|
|
switch (src) {
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_USOCK_ACTION_DONE:
|
|
usock->state = NN_USOCK_STATE_LISTENING;
|
|
return;
|
|
case NN_USOCK_ACTION_CANCEL:
|
|
usock->state = NN_USOCK_STATE_CANCELLING;
|
|
nn_worker_execute (usock->worker, &usock->task_stop);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
case NN_USOCK_SRC_FD:
|
|
switch (type) {
|
|
case NN_WORKER_FD_IN:
|
|
|
|
/* New connection arrived in asynchronous manner. */
|
|
#if NN_HAVE_ACCEPT4
|
|
s = accept4 (usock->s, NULL, NULL, SOCK_CLOEXEC);
|
|
#else
|
|
s = accept (usock->s, NULL, NULL);
|
|
#endif
|
|
|
|
/* ECONNABORTED is an valid error. New connection was closed
|
|
by the peer before we were able to accept it. If it happens
|
|
do nothing and wait for next incoming connection. */
|
|
if (nn_slow (s < 0 && errno == ECONNABORTED))
|
|
return;
|
|
|
|
/* Resource allocation errors. It's not clear from POSIX
|
|
specification whether the new connection is closed in this
|
|
case or whether it remains in the backlog. In the latter
|
|
case it would be wise to wait here for a while to prevent
|
|
busy looping. */
|
|
if (nn_slow (s < 0 && (errno == ENFILE || errno == EMFILE ||
|
|
errno == ENOBUFS || errno == ENOMEM))) {
|
|
usock->errnum = errno;
|
|
usock->state = NN_USOCK_STATE_ACCEPTING_ERROR;
|
|
|
|
/* Wait till the user starts accepting once again. */
|
|
nn_worker_rm_fd (usock->worker, &usock->wfd);
|
|
|
|
nn_fsm_raise (&usock->fsm,
|
|
&usock->event_error, NN_USOCK_ACCEPT_ERROR);
|
|
return;
|
|
}
|
|
|
|
/* Any other error is unexpected. */
|
|
errno_assert (s >= 0);
|
|
|
|
/* Initialise the new usock object. */
|
|
nn_usock_init_from_fd (usock->asock, s);
|
|
usock->asock->state = NN_USOCK_STATE_ACCEPTED;
|
|
|
|
/* Notify the user that connection was accepted. */
|
|
nn_fsm_raise (&usock->asock->fsm,
|
|
&usock->asock->event_established, NN_USOCK_ACCEPTED);
|
|
|
|
/* Disassociate the listener socket from the accepted
|
|
socket. */
|
|
usock->asock->asock = NULL;
|
|
usock->asock = NULL;
|
|
|
|
/* Wait till the user starts accepting once again. */
|
|
nn_worker_rm_fd (usock->worker, &usock->wfd);
|
|
usock->state = NN_USOCK_STATE_LISTENING;
|
|
|
|
return;
|
|
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* ACCEPTING_ERROR state. */
|
|
/* Waiting the socket to accept the error and restart */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_ACCEPTING_ERROR:
|
|
switch (src) {
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_USOCK_ACTION_ACCEPT:
|
|
usock->state = NN_USOCK_STATE_ACCEPTING;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* CANCELLING state. */
|
|
/******************************************************************************/
|
|
case NN_USOCK_STATE_CANCELLING:
|
|
switch (src) {
|
|
case NN_USOCK_SRC_TASK_STOP:
|
|
switch (type) {
|
|
case NN_WORKER_TASK_EXECUTE:
|
|
nn_worker_rm_fd (usock->worker, &usock->wfd);
|
|
usock->state = NN_USOCK_STATE_LISTENING;
|
|
|
|
/* Notify the accepted socket that it was stopped. */
|
|
nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE);
|
|
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
case NN_USOCK_SRC_FD:
|
|
switch (type) {
|
|
case NN_WORKER_FD_IN:
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (usock->state, src, type);
|
|
}
|
|
default:
|
|
nn_fsm_bad_source (usock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* Invalid state */
|
|
/******************************************************************************/
|
|
default:
|
|
nn_fsm_bad_state (usock->state, src, type);
|
|
}
|
|
}
|
|
|
|
int32_t nn_getiovec_size(uint8_t *buf,int32_t maxlen,struct msghdr *hdr)
|
|
{
|
|
int32_t i,size = 0; struct iovec *iov;
|
|
for (i=0; i<hdr->msg_iovlen; i++)
|
|
{
|
|
iov = &hdr->msg_iov[i];
|
|
if ( nn_slow(iov->iov_len == NN_MSG) )
|
|
{
|
|
errno = EINVAL;
|
|
printf("ERROR: iov->iov_len == NN_MSG\n");
|
|
return(-1);
|
|
}
|
|
if ( nn_slow(!iov->iov_base && iov->iov_len) )
|
|
{
|
|
errno = EFAULT;
|
|
printf("ERROR: !iov->iov_base && iov->iov_len\n");
|
|
return(-1);
|
|
}
|
|
if ( maxlen > 0 && nn_slow(size + iov->iov_len > maxlen) )
|
|
{
|
|
errno = EINVAL;
|
|
printf("ERROR: sz.%d + iov->iov_len.%d < maxlen.%d\n",(int32_t)size,(int32_t)iov->iov_len,maxlen);
|
|
return(-1);
|
|
}
|
|
if ( iov->iov_len > 0 )
|
|
{
|
|
if ( buf != 0 )
|
|
memcpy(&buf[size],iov->iov_base,iov->iov_len);
|
|
size += (int32_t)iov->iov_len;
|
|
}
|
|
}
|
|
return(size);
|
|
}
|
|
|
|
ssize_t mysendmsg(int32_t usock,struct msghdr *hdr,int32_t flags)
|
|
{
|
|
ssize_t nbytes = 0; int32_t veclen,offset,clen,err = 0; uint8_t *buf,_buf[8192];
|
|
if ( (veclen= nn_getiovec_size(0,0,hdr)) > 0 )
|
|
{
|
|
clen = hdr->msg_controllen;
|
|
if ( hdr->msg_control == 0 )
|
|
clen = 0;
|
|
nn_assert(clen == 0); // no support control messagies
|
|
if ( veclen > sizeof(_buf) ) // - clen - 5) )
|
|
buf = malloc(veclen);// + clen + 5);
|
|
else buf = _buf;
|
|
offset = 0;
|
|
/*buf[offset++] = (veclen & 0xff);
|
|
buf[offset++] = ((veclen>>8) & 0xff);
|
|
buf[offset++] = ((veclen>>15) & 0xff);
|
|
buf[offset++] = (clen & 0xff);
|
|
buf[offset++] = ((clen>>8) & 0xff);
|
|
if ( clen > 0 )
|
|
memcpy(&buf[offset],hdr->msg_control,clen), offset += clen;*/
|
|
if ( nn_getiovec_size(&buf[offset],veclen,hdr) == veclen )
|
|
{
|
|
nbytes = send(usock,buf,offset + veclen,0);
|
|
printf(">>>>>>>>> send.[%d %d %d %d] (n.%d v.%d c.%d)-> usock.%d nbytes.%d\n",buf[offset],buf[offset+1],buf[offset+2],buf[offset+3],(int32_t)offset+veclen,veclen,clen,usock,(int32_t)nbytes);
|
|
if ( nbytes != offset + veclen )
|
|
{
|
|
printf("nbytes.%d != offset.%d veclen.%d errno.%d usock.%d\n",(int32_t)nbytes,(int32_t)offset,veclen,errno,usock);
|
|
}
|
|
if ( nbytes >= offset )
|
|
nbytes -= offset;
|
|
}
|
|
else
|
|
{
|
|
err = -errno;
|
|
printf("mysendmsg: unexpected nn_getiovec_size error %d\n",err);
|
|
}
|
|
if ( buf != _buf )
|
|
free(buf);
|
|
if ( err != 0 )
|
|
{
|
|
printf("nn_usock_send_raw errno.%d err.%d\n",errno,err);
|
|
return(-errno);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
printf("nn_usock_send_raw errno.%d invalid iovec size\n",errno);
|
|
return(-errno);
|
|
}
|
|
return(nbytes);
|
|
}
|
|
|
|
ssize_t myrecvmsg(int32_t usock,struct msghdr *hdr,int32_t flags,int32_t len)
|
|
{
|
|
ssize_t nbytes; struct iovec *iov; //uint8_t lens[5];
|
|
iov = hdr->msg_iov;
|
|
/*if ( (n= (int32_t)recv(usock,lens,sizeof(lens),0)) != sizeof(lens) )
|
|
{
|
|
printf("error getting veclen/clen n.%d vs %d from usock.%d\n",n,(int32_t)sizeof(lens),usock);
|
|
return(0);
|
|
} else printf("GOT %d bytes from usock.%d\n",n,usock);
|
|
offset = 0;
|
|
veclen = lens[offset++];
|
|
veclen |= ((int32_t)lens[offset++] << 8);
|
|
veclen |= ((int32_t)lens[offset++] << 16);
|
|
clen = lens[offset++];
|
|
clen |= ((int32_t)lens[offset++] << 8);
|
|
printf("veclen.%d clen.%d waiting in usock.%d\n",veclen,clen,usock);
|
|
if ( clen > 0 )
|
|
{
|
|
if ( (cbytes= (int32_t)recv(usock,hdr->msg_control,clen,0)) != clen )
|
|
{
|
|
printf("myrecvmsg: unexpected cbytes.%d vs clen.%d\n",cbytes,clen);
|
|
}
|
|
} else cbytes = 0;*/
|
|
hdr->msg_controllen = 0;
|
|
if ( (nbytes= (int32_t)recv(usock,iov->iov_base,len,0)) != len )
|
|
{
|
|
//printf("myrecvmsg: partial nbytes.%d vs veclen.%d\n",(int32_t)nbytes,len);
|
|
}
|
|
//printf("GOT nbytes.%d of len.%d from usock.%d\n",(int32_t)nbytes,len,usock);
|
|
if ( 0 && nbytes > 0 )
|
|
{
|
|
printf("got nbytes.%d from usock.%d [%d %d %d %d]\n",(int32_t)nbytes,usock,((uint8_t *)iov->iov_base)[0],((uint8_t *)iov->iov_base)[1],((uint8_t *)iov->iov_base)[2],((uint8_t *)iov->iov_base)[3]);
|
|
}
|
|
return(nbytes);
|
|
}
|
|
|
|
static int nn_usock_send_raw (struct nn_usock *self, struct msghdr *hdr)
|
|
{
|
|
ssize_t nbytes;
|
|
#if NN_USE_MYMSG
|
|
nbytes = mysendmsg(self->s,hdr,0);
|
|
#else
|
|
#if defined MSG_NOSIGNAL
|
|
nbytes = sendmsg(self->s,hdr,MSG_NOSIGNAL);
|
|
#else
|
|
nbytes = sendmsg(self->s,hdr,0);
|
|
printf("nn_usock_send_raw nbytes.%d\n",(int32_t)nbytes);
|
|
#endif
|
|
#endif
|
|
/* Handle errors. */
|
|
if (nn_slow (nbytes < 0)) {
|
|
if (nn_fast (errno == EAGAIN || errno == EWOULDBLOCK))
|
|
nbytes = 0;
|
|
else {
|
|
|
|
/* If the connection fails, return ECONNRESET. */
|
|
errno_assert (errno == ECONNRESET || errno == ETIMEDOUT ||
|
|
errno == EPIPE || errno == ECONNREFUSED || errno == ENOTCONN);
|
|
return -ECONNRESET;
|
|
}
|
|
}
|
|
|
|
/* Some bytes were sent. Adjust the iovecs accordingly. */
|
|
while (nbytes) {
|
|
if (nbytes >= (ssize_t)hdr->msg_iov->iov_len) {
|
|
--hdr->msg_iovlen;
|
|
if (!hdr->msg_iovlen) {
|
|
nn_assert (nbytes == (ssize_t)hdr->msg_iov->iov_len);
|
|
return 0;
|
|
}
|
|
nbytes -= hdr->msg_iov->iov_len;
|
|
++hdr->msg_iov;
|
|
}
|
|
else {
|
|
hdr->msg_iov->iov_base = &((uint8_t *)hdr->msg_iov->iov_base)[nbytes];
|
|
//*((uint8_t **)&(hdr->msg_iov->iov_base)) += nbytes;
|
|
hdr->msg_iov->iov_len -= nbytes;
|
|
return -EAGAIN;
|
|
}
|
|
}
|
|
|
|
if (hdr->msg_iovlen > 0)
|
|
return -EAGAIN;
|
|
|
|
return 0;
|
|
}
|
|
|
|
int32_t nn_process_cmsg(struct nn_usock *self,struct msghdr *hdr)
|
|
{
|
|
// Extract the associated file descriptor, if any
|
|
int32_t retval = -1;
|
|
#if defined NN_HAVE_MSG_CONTROL
|
|
struct cmsghdr *cmsg;
|
|
cmsg = CMSG_FIRSTHDR(hdr);
|
|
while ( cmsg )
|
|
{
|
|
if ( cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS )
|
|
{
|
|
memcpy(&retval,(int32_t *)CMSG_DATA(cmsg),sizeof(int32_t));
|
|
if ( self->in.pfd )
|
|
{
|
|
printf("CMSG set self->in.pfd (%d)\n",retval);
|
|
*self->in.pfd = retval;
|
|
self->in.pfd = NULL;
|
|
}
|
|
else
|
|
{
|
|
printf("CMSG nn_closefd(%d)\n",retval);
|
|
nn_closefd(retval);
|
|
}
|
|
break;
|
|
}
|
|
cmsg = CMSG_NXTHDR(hdr,cmsg);
|
|
}
|
|
#else
|
|
if ( hdr->msg_accrightslen > 0 )
|
|
{
|
|
nn_assert(hdr->msg_accrightslen == sizeof(int32_t));
|
|
retval = *((int32_t *)hdr->msg_accrights);
|
|
if ( self->in.pfd )
|
|
{
|
|
*self->in.pfd = retval;
|
|
self->in.pfd = NULL;
|
|
}
|
|
else nn_closefd(retval);
|
|
}
|
|
#endif
|
|
return(retval);
|
|
}
|
|
|
|
static int nn_usock_recv_raw(struct nn_usock *self, void *buf, size_t *len)
|
|
{
|
|
int usebuf = 0;
|
|
size_t sz;
|
|
size_t length;
|
|
ssize_t nbytes;
|
|
struct iovec iov;
|
|
struct msghdr hdr;
|
|
unsigned char ctrl [256];
|
|
length = *len;
|
|
/* If batch buffer doesn't exist, allocate it. The point of delayed
|
|
deallocation to allow non-receiving sockets, such as TCP listening
|
|
sockets, to do without the batch buffer. */
|
|
if (nn_slow (!self->in.batch)) {
|
|
self->in.batch = nn_alloc (NN_USOCK_BATCH_SIZE, "AIO batch buffer");
|
|
alloc_assert (self->in.batch);
|
|
}
|
|
/* Try to satisfy the recv request by data from the batch buffer. */
|
|
sz = self->in.batch_len - self->in.batch_pos;
|
|
if (sz) {
|
|
if (sz > length)
|
|
sz = length;
|
|
memcpy (buf, self->in.batch + self->in.batch_pos, sz);
|
|
self->in.batch_pos += sz;
|
|
buf = ((char*) buf) + sz;
|
|
length -= sz;
|
|
if (!length)
|
|
return 0;
|
|
}
|
|
#if NN_USE_MYMSG
|
|
usebuf = (length >= NN_USOCK_BATCH_SIZE);
|
|
#else
|
|
usebuf = (length >= NN_USOCK_BATCH_SIZE);
|
|
#endif
|
|
// If recv request is greater than the batch buffer, get the data directly into the place. Otherwise, read data to the batch buffer
|
|
if ( usebuf != 0 )
|
|
{
|
|
iov.iov_base = buf;
|
|
iov.iov_len = length;
|
|
}
|
|
else {
|
|
iov.iov_base = self->in.batch;
|
|
iov.iov_len = NN_USOCK_BATCH_SIZE;
|
|
}
|
|
memset(&hdr,0,sizeof(hdr));
|
|
hdr.msg_iov = &iov;
|
|
hdr.msg_iovlen = 1;
|
|
#if defined NN_HAVE_MSG_CONTROL
|
|
hdr.msg_control = ctrl;
|
|
hdr.msg_controllen = sizeof(ctrl);
|
|
#else
|
|
*((int*) ctrl) = -1;
|
|
hdr.msg_accrights = ctrl;
|
|
hdr.msg_accrightslen = sizeof(int);
|
|
#endif
|
|
|
|
#if NN_USE_MYMSG
|
|
nbytes = myrecvmsg(self->s,&hdr,0,(int32_t)iov.iov_len);
|
|
printf("got nbytes.%d from recvmsg errno.%d %s\n",(int32_t)nbytes,errno,nn_strerror(errno));
|
|
#else
|
|
nbytes = recvmsg (self->s, &hdr, 0);
|
|
#endif
|
|
if ( nn_slow(nbytes <= 0) )
|
|
{
|
|
if ( nn_slow(nbytes == 0) )
|
|
return -ECONNRESET;
|
|
if ( nn_fast(errno == EAGAIN || errno == EWOULDBLOCK) ) // Zero bytes received
|
|
nbytes = 0;
|
|
else
|
|
{
|
|
printf("recvraw errno.%d %s\n",errno,nn_strerror(errno));
|
|
// If the peer closes the connection, return ECONNRESET
|
|
errno_assert(errno == ECONNRESET || errno == ENOTCONN || errno == ECONNREFUSED || errno == ETIMEDOUT || errno == EHOSTUNREACH
|
|
#if NN_USE_MYMSG
|
|
|
|
// || errno == EADDRINUSE || errno == EINPROGRESS
|
|
#endif
|
|
);
|
|
return -ECONNRESET;
|
|
}
|
|
} else if ( hdr.msg_controllen > 0 )
|
|
nn_process_cmsg(self,&hdr);
|
|
printf("nbytes.%d length.%d *len %d\n",(int)nbytes,(int)length,(int)*len);
|
|
|
|
// If the data were received directly into the place we can return straight away
|
|
if ( usebuf != 0 )
|
|
{
|
|
length -= nbytes;
|
|
*len -= length;
|
|
return 0;
|
|
}
|
|
// New data were read to the batch buffer. Copy the requested amount of it to the user-supplied buffer
|
|
self->in.batch_len = nbytes;
|
|
self->in.batch_pos = 0;
|
|
if (nbytes) {
|
|
sz = nbytes > (ssize_t)length ? length : (size_t)nbytes;
|
|
memcpy (buf, self->in.batch, sz);
|
|
length -= sz;
|
|
self->in.batch_pos += sz;
|
|
}
|
|
|
|
*len -= length;
|
|
return 0;
|
|
}
|
|
|
|
static int nn_usock_geterr (struct nn_usock *self)
|
|
{
|
|
int rc;
|
|
int opt;
|
|
#if defined NN_HAVE_HPUX
|
|
int optsz;
|
|
#else
|
|
socklen_t optsz;
|
|
#endif
|
|
|
|
opt = 0;
|
|
optsz = sizeof (opt);
|
|
rc = getsockopt (self->s, SOL_SOCKET, SO_ERROR, &opt, &optsz);
|
|
|
|
/* The following should handle both Solaris and UNIXes derived from BSD. */
|
|
if (rc == -1)
|
|
return errno;
|
|
errno_assert (rc == 0);
|
|
nn_assert (optsz == sizeof (opt));
|
|
return opt;
|
|
}
|
|
|