You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1060 lines
35 KiB
1060 lines
35 KiB
/*
|
|
Copyright (c) 2012-2014 Martin Sustrik All rights reserved.
|
|
Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"),
|
|
to deal in the Software without restriction, including without limitation
|
|
the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
and/or sell copies of the Software, and to permit persons to whom
|
|
the Software is furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included
|
|
in all copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
IN THE SOFTWARE.
|
|
*/
|
|
|
|
#include "../protocol.h"
|
|
#include "../transport.h"
|
|
|
|
#include "sock.h"
|
|
#include "global.h"
|
|
#include "ep.h"
|
|
|
|
#include "../utils/err.h"
|
|
#include "../utils/cont.h"
|
|
#include "../utils/fast.h"
|
|
#include "../utils/alloc.h"
|
|
#include "../utils/msg.h"
|
|
|
|
#include <limits.h>
|
|
|
|
/*  Bits kept in the socket's 'flags' field. Each one records whether the
    corresponding efd is currently signalled, which lets the code skip
    redundant signalling and unsignalling of the efd objects. */
#define NN_SOCK_FLAG_IN 1
#define NN_SOCK_FLAG_OUT 2

/*  States of the socket state machine. */
#define NN_SOCK_STATE_INIT 1
#define NN_SOCK_STATE_ACTIVE 2
#define NN_SOCK_STATE_ZOMBIE 3
#define NN_SOCK_STATE_STOPPING_EPS 4
#define NN_SOCK_STATE_STOPPING 5

/*  Actions (events) delivered to the state machine. */
#define NN_SOCK_ACTION_ZOMBIFY 1
#define NN_SOCK_ACTION_STOPPED 2

/*  Identifiers of subordinate event sources. */
#define NN_SOCK_SRC_EP 1
|
|
|
|
/*  Forward declarations of the file-local helpers defined further below. */
static struct nn_optset *nn_sock_optset (struct nn_sock *self, int id);
static int nn_sock_setopt_inner (struct nn_sock *self, int level,
    int option, const void *optval, size_t optvallen);
static void nn_sock_onleave (struct nn_ctx *self);
static void nn_sock_handler (struct nn_fsm *self, int src, int type,
    void *srcptr);
static void nn_sock_shutdown (struct nn_fsm *self, int src, int type,
    void *srcptr);
static void nn_sock_action_zombify (struct nn_sock *self);
|
|
|
|
int32_t nn_sock_init(struct nn_sock *self,struct nn_socktype *socktype,int32_t fd)
|
|
{
|
|
int32_t i,rc = 0;
|
|
// Make sure that at least one message direction is supported
|
|
nn_assert(!(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND) || !(socktype->flags & NN_SOCKTYPE_FLAG_NORECV));
|
|
// Create the AIO context for the SP socket
|
|
nn_ctx_init(&self->ctx,nn_global_getpool(),nn_sock_onleave);
|
|
// Initialise the state machine
|
|
nn_fsm_init_root(&self->fsm, nn_sock_handler,nn_sock_shutdown, &self->ctx);
|
|
self->state = NN_SOCK_STATE_INIT;
|
|
// Open the NN_SNDFD and NN_RCVFD efds. Do so, only if the socket type supports send/recv, as appropriate
|
|
if ( socktype->flags & NN_SOCKTYPE_FLAG_NOSEND )
|
|
memset(&self->sndfd,0xcd,sizeof(self->sndfd));
|
|
else
|
|
{
|
|
rc = nn_efd_init(&self->sndfd);
|
|
if ( nn_slow(rc < 0) )
|
|
return rc;
|
|
}
|
|
if ( socktype->flags & NN_SOCKTYPE_FLAG_NORECV )
|
|
memset(&self->rcvfd,0xcd,sizeof(self->rcvfd));
|
|
else
|
|
{
|
|
rc = nn_efd_init(&self->rcvfd);
|
|
if ( nn_slow(rc < 0) )
|
|
{
|
|
if ( !(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND) )
|
|
nn_efd_term(&self->sndfd);
|
|
return rc;
|
|
}
|
|
}
|
|
nn_sem_init(&self->termsem);
|
|
if ( nn_slow (rc < 0) )
|
|
{
|
|
if ( !(socktype->flags & NN_SOCKTYPE_FLAG_NORECV) )
|
|
nn_efd_term(&self->rcvfd);
|
|
if ( !(socktype->flags & NN_SOCKTYPE_FLAG_NOSEND) )
|
|
nn_efd_term(&self->sndfd);
|
|
return rc;
|
|
}
|
|
self->flags = 0;
|
|
nn_clock_init(&self->clock);
|
|
nn_list_init(&self->eps);
|
|
nn_list_init(&self->sdeps);
|
|
self->eid = 1;
|
|
// Default values for NN_SOL_SOCKET options
|
|
self->linger = 1000;
|
|
self->sndbuf = NN_USOCK_BATCH_SIZE > 65536 ? NN_USOCK_BATCH_SIZE : 65536;
|
|
self->rcvbuf = NN_USOCK_BATCH_SIZE > 65536 ? NN_USOCK_BATCH_SIZE : 65536;
|
|
self->rcvmaxsize = 1024 * 1024;
|
|
self->sndtimeo = -1;
|
|
self->rcvtimeo = -1;
|
|
self->reconnect_ivl = 100;
|
|
self->reconnect_ivl_max = 0;
|
|
self->ep_template.sndprio = 8;
|
|
self->ep_template.rcvprio = 8;
|
|
self->ep_template.ipv4only = 1;
|
|
// Initialize statistic entries
|
|
self->statistics.established_connections = 0;
|
|
self->statistics.accepted_connections = 0;
|
|
self->statistics.dropped_connections = 0;
|
|
self->statistics.broken_connections = 0;
|
|
self->statistics.connect_errors = 0;
|
|
self->statistics.bind_errors = 0;
|
|
self->statistics.accept_errors = 0;
|
|
self->statistics.messages_sent = 0;
|
|
self->statistics.messages_received = 0;
|
|
self->statistics.bytes_sent = 0;
|
|
self->statistics.bytes_received = 0;
|
|
self->statistics.current_connections = 0;
|
|
self->statistics.inprogress_connections = 0;
|
|
self->statistics.current_snd_priority = 0;
|
|
self->statistics.current_ep_errors = 0;
|
|
// Should be pretty much enough space for just the number
|
|
sprintf(self->socket_name,"%d",fd);
|
|
// The transport-specific options are not initialised immediately, rather, they are allocated later on when needed
|
|
for (i=0; i<NN_MAX_TRANSPORT; i++)
|
|
self->optsets[i] = NULL;
|
|
// Create the specific socket type itself
|
|
rc = socktype->create((void*)self,&self->sockbase);
|
|
errnum_assert(rc == 0, -rc);
|
|
self->socktype = socktype;
|
|
// Launch the state machine
|
|
nn_ctx_enter (&self->ctx);
|
|
nn_fsm_start (&self->fsm);
|
|
nn_ctx_leave (&self->ctx);
|
|
return 0;
|
|
}
|
|
|
|
void nn_sock_stopped (struct nn_sock *self)
|
|
{
|
|
/* TODO: Do the following in a more sane way. */
|
|
self->fsm.stopped.fsm = &self->fsm;
|
|
self->fsm.stopped.src = NN_FSM_ACTION;
|
|
self->fsm.stopped.srcptr = NULL;
|
|
self->fsm.stopped.type = NN_SOCK_ACTION_STOPPED;
|
|
nn_ctx_raise (self->fsm.ctx, &self->fsm.stopped);
|
|
}
|
|
|
|
void nn_sock_zombify (struct nn_sock *self)
|
|
{
|
|
nn_ctx_enter (&self->ctx);
|
|
nn_fsm_action (&self->fsm, NN_SOCK_ACTION_ZOMBIFY);
|
|
nn_ctx_leave (&self->ctx);
|
|
}
|
|
|
|
int nn_sock_term (struct nn_sock *self)
|
|
{
|
|
int rc;
|
|
int i;
|
|
|
|
/* Ask the state machine to start closing the socket. */
|
|
nn_ctx_enter (&self->ctx);
|
|
nn_fsm_stop (&self->fsm);
|
|
nn_ctx_leave (&self->ctx);
|
|
|
|
/* Shutdown process was already started but some endpoints may still
|
|
alive. Here we are going to wait till they are all closed. */
|
|
rc = nn_sem_wait (&self->termsem);
|
|
if (nn_slow (rc == -EINTR))
|
|
return -EINTR;
|
|
errnum_assert (rc == 0, -rc);
|
|
|
|
/* The thread that posted the semaphore can still have the ctx locked
|
|
for a short while. By simply entering the context and exiting it
|
|
immediately we can be sure that the thread in question have already
|
|
exited the context. */
|
|
nn_ctx_enter (&self->ctx);
|
|
nn_ctx_leave (&self->ctx);
|
|
|
|
/* Deallocate the resources. */
|
|
nn_fsm_stopped_noevent (&self->fsm);
|
|
nn_fsm_term (&self->fsm);
|
|
nn_sem_term (&self->termsem);
|
|
nn_list_term (&self->sdeps);
|
|
nn_list_term (&self->eps);
|
|
nn_clock_term (&self->clock);
|
|
nn_ctx_term (&self->ctx);
|
|
|
|
/* Destroy any optsets associated with the socket. */
|
|
for (i = 0; i != NN_MAX_TRANSPORT; ++i)
|
|
if (self->optsets [i])
|
|
self->optsets [i]->vfptr->destroy (self->optsets [i]);
|
|
|
|
return 0;
|
|
}
|
|
|
|
struct nn_ctx *nn_sock_getctx (struct nn_sock *self)
|
|
{
|
|
return &self->ctx;
|
|
}
|
|
|
|
int nn_sock_ispeer (struct nn_sock *self, int socktype)
|
|
{
|
|
/* If the peer implements a different SP protocol it is not a valid peer.
|
|
Checking it here ensures that even if faulty protocol implementation
|
|
allows for cross-protocol communication, it will never happen
|
|
in practice. */
|
|
if ((self->socktype->protocol & 0xfff0) != (socktype & 0xfff0))
|
|
return 0;
|
|
|
|
/* As long as the peer speaks the same protocol, socket type itself
|
|
decides which socket types are to be accepted. */
|
|
return self->socktype->ispeer (socktype);
|
|
}
|
|
|
|
int nn_sock_setopt (struct nn_sock *self, int level, int option,
|
|
const void *optval, size_t optvallen)
|
|
{
|
|
int rc;
|
|
|
|
nn_ctx_enter (&self->ctx);
|
|
if (nn_slow (self->state == NN_SOCK_STATE_ZOMBIE)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return -ETERM;
|
|
}
|
|
rc = nn_sock_setopt_inner (self, level, option, optval, optvallen);
|
|
nn_ctx_leave (&self->ctx);
|
|
|
|
return rc;
|
|
}
|
|
|
|
static int nn_sock_setopt_inner (struct nn_sock *self, int level,
|
|
int option, const void *optval, size_t optvallen)
|
|
{
|
|
struct nn_optset *optset;
|
|
int val;
|
|
int *dst;
|
|
|
|
/* Protocol-specific socket options. */
|
|
if (level > NN_SOL_SOCKET)
|
|
return self->sockbase->vfptr->setopt (self->sockbase, level, option,
|
|
optval, optvallen);
|
|
|
|
/* Transport-specific options. */
|
|
if (level < NN_SOL_SOCKET) {
|
|
optset = nn_sock_optset (self, level);
|
|
if (!optset)
|
|
return -ENOPROTOOPT;
|
|
return optset->vfptr->setopt (optset, option, optval, optvallen);
|
|
}
|
|
|
|
/* Special-casing socket name for now as it's the only string option */
|
|
if (level == NN_SOL_SOCKET && option == NN_SOCKET_NAME) {
|
|
if (optvallen > 63)
|
|
return -EINVAL;
|
|
memcpy (self->socket_name, optval, optvallen);
|
|
self->socket_name [optvallen] = 0;
|
|
return 0;
|
|
}
|
|
|
|
/* At this point we assume that all options are of type int. */
|
|
if (optvallen != sizeof (int))
|
|
return -EINVAL;
|
|
val = *(int*) optval;
|
|
|
|
/* Generic socket-level options. */
|
|
if (level == NN_SOL_SOCKET) {
|
|
switch (option) {
|
|
case NN_LINGER:
|
|
dst = &self->linger;
|
|
break;
|
|
case NN_SNDBUF:
|
|
if (nn_slow (val <= 0))
|
|
return -EINVAL;
|
|
dst = &self->sndbuf;
|
|
break;
|
|
case NN_RCVBUF:
|
|
if (nn_slow (val <= 0))
|
|
return -EINVAL;
|
|
dst = &self->rcvbuf;
|
|
break;
|
|
case NN_RCVMAXSIZE:
|
|
if (nn_slow (val < -1))
|
|
return -EINVAL;
|
|
dst = &self->rcvmaxsize;
|
|
break;
|
|
case NN_SNDTIMEO:
|
|
dst = &self->sndtimeo;
|
|
break;
|
|
case NN_RCVTIMEO:
|
|
dst = &self->rcvtimeo;
|
|
break;
|
|
case NN_RECONNECT_IVL:
|
|
if (nn_slow (val < 0))
|
|
return -EINVAL;
|
|
dst = &self->reconnect_ivl;
|
|
break;
|
|
case NN_RECONNECT_IVL_MAX:
|
|
if (nn_slow (val < 0))
|
|
return -EINVAL;
|
|
dst = &self->reconnect_ivl_max;
|
|
break;
|
|
case NN_SNDPRIO:
|
|
if (nn_slow (val < 1 || val > 16))
|
|
return -EINVAL;
|
|
dst = &self->ep_template.sndprio;
|
|
break;
|
|
case NN_RCVPRIO:
|
|
if (nn_slow (val < 1 || val > 16))
|
|
return -EINVAL;
|
|
dst = &self->ep_template.rcvprio;
|
|
break;
|
|
case NN_IPV4ONLY:
|
|
if (nn_slow (val != 0 && val != 1))
|
|
return -EINVAL;
|
|
dst = &self->ep_template.ipv4only;
|
|
break;
|
|
default:
|
|
return -ENOPROTOOPT;
|
|
}
|
|
*dst = val;
|
|
|
|
return 0;
|
|
}
|
|
|
|
nn_assert (0);
|
|
}
|
|
|
|
int nn_sock_getopt (struct nn_sock *self, int level, int option,
|
|
void *optval, size_t *optvallen)
|
|
{
|
|
int rc;
|
|
|
|
nn_ctx_enter (&self->ctx);
|
|
if (nn_slow (self->state == NN_SOCK_STATE_ZOMBIE)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return -ETERM;
|
|
}
|
|
rc = nn_sock_getopt_inner (self, level, option, optval, optvallen);
|
|
nn_ctx_leave (&self->ctx);
|
|
|
|
return rc;
|
|
}
|
|
|
|
int nn_sock_getopt_inner (struct nn_sock *self, int level,
|
|
int option, void *optval, size_t *optvallen)
|
|
{
|
|
int rc;
|
|
struct nn_optset *optset;
|
|
int intval;
|
|
nn_fd fd;
|
|
|
|
/* Generic socket-level options. */
|
|
if (level == NN_SOL_SOCKET) {
|
|
switch (option) {
|
|
case NN_DOMAIN:
|
|
intval = self->socktype->domain;
|
|
break;
|
|
case NN_PROTOCOL:
|
|
intval = self->socktype->protocol;
|
|
break;
|
|
case NN_LINGER:
|
|
intval = self->linger;
|
|
break;
|
|
case NN_SNDBUF:
|
|
intval = self->sndbuf;
|
|
break;
|
|
case NN_RCVBUF:
|
|
intval = self->rcvbuf;
|
|
break;
|
|
case NN_RCVMAXSIZE:
|
|
intval = self->rcvmaxsize;
|
|
break;
|
|
case NN_SNDTIMEO:
|
|
intval = self->sndtimeo;
|
|
break;
|
|
case NN_RCVTIMEO:
|
|
intval = self->rcvtimeo;
|
|
break;
|
|
case NN_RECONNECT_IVL:
|
|
intval = self->reconnect_ivl;
|
|
break;
|
|
case NN_RECONNECT_IVL_MAX:
|
|
intval = self->reconnect_ivl_max;
|
|
break;
|
|
case NN_SNDPRIO:
|
|
intval = self->ep_template.sndprio;
|
|
break;
|
|
case NN_RCVPRIO:
|
|
intval = self->ep_template.rcvprio;
|
|
break;
|
|
case NN_IPV4ONLY:
|
|
intval = self->ep_template.ipv4only;
|
|
break;
|
|
case NN_SNDFD:
|
|
if (self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)
|
|
return -ENOPROTOOPT;
|
|
fd = nn_efd_getfd (&self->sndfd);
|
|
memcpy (optval, &fd,
|
|
*optvallen < sizeof (nn_fd) ? *optvallen : sizeof (nn_fd));
|
|
*optvallen = sizeof (nn_fd);
|
|
return 0;
|
|
case NN_RCVFD:
|
|
if (self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)
|
|
return -ENOPROTOOPT;
|
|
fd = nn_efd_getfd (&self->rcvfd);
|
|
memcpy (optval, &fd,
|
|
*optvallen < sizeof (nn_fd) ? *optvallen : sizeof (nn_fd));
|
|
*optvallen = sizeof (nn_fd);
|
|
return 0;
|
|
case NN_SOCKET_NAME:
|
|
strncpy (optval, self->socket_name, *optvallen);
|
|
*optvallen = strlen(self->socket_name);
|
|
return 0;
|
|
default:
|
|
return -ENOPROTOOPT;
|
|
}
|
|
|
|
memcpy (optval, &intval,
|
|
*optvallen < sizeof (int) ? *optvallen : sizeof (int));
|
|
*optvallen = sizeof (int);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Protocol-specific socket options. */
|
|
if (level > NN_SOL_SOCKET)
|
|
return rc = self->sockbase->vfptr->getopt (self->sockbase,
|
|
level, option, optval, optvallen);
|
|
|
|
/* Transport-specific options. */
|
|
if (level < NN_SOL_SOCKET) {
|
|
optset = nn_sock_optset (self, level);
|
|
if (!optset)
|
|
return -ENOPROTOOPT;
|
|
return optset->vfptr->getopt (optset, option, optval, optvallen);
|
|
}
|
|
nn_assert (0);
|
|
}
|
|
|
|
struct nn_ep *nn_find_ep(struct nn_sock *self,int32_t eid,const char *addr,struct nn_transport *transport,int32_t bind)
|
|
{
|
|
struct nn_ep *ep; struct nn_list_item *it;
|
|
ep = NULL; // Find the specified enpoint
|
|
for (it=nn_list_begin(&self->eps); it!=nn_list_end(&self->eps); it=nn_list_next(&self->eps,it))
|
|
{
|
|
ep = nn_cont(it,struct nn_ep,item);
|
|
if ( addr == 0 && ep->eid == eid )
|
|
break;
|
|
else if ( addr != 0 && transport != 0 && strcmp(addr,ep->addr) == 0 && ep->transport == transport && ep->bind == bind )
|
|
break;
|
|
ep = NULL;
|
|
}
|
|
return(ep);
|
|
}
|
|
|
|
int nn_sock_add_ep(struct nn_sock *self,struct nn_transport *transport,int32_t bind,const char *addr)
|
|
{
|
|
int rc,eid; struct nn_ep *ep;
|
|
printf("nn_sock_add_ep\n");
|
|
nn_ctx_enter (&self->ctx);
|
|
if ( (ep= nn_find_ep(self,0,addr,transport,bind)) == NULL ) // The endpoint doesn't exist
|
|
{
|
|
ep = nn_alloc(sizeof(struct nn_ep),"endpoint"); // Instantiate the endpoint
|
|
rc = nn_ep_init(ep,NN_SOCK_SRC_EP,self,self->eid,transport,bind,addr);
|
|
if ( nn_slow(rc < 0) )
|
|
{
|
|
printf("nn_sock_add_ep nn_ep_init rc.%d\n",rc);
|
|
nn_free(ep);
|
|
nn_ctx_leave(&self->ctx);
|
|
return rc;
|
|
}
|
|
nn_ep_start(ep);
|
|
printf("ep sock.(%s) started %s://(%s) bind.%d\n",self->socket_name,transport->name,addr,bind);
|
|
eid = self->eid++; // Increase the endpoint ID for the next endpoint
|
|
nn_list_insert(&self->eps,&ep->item,nn_list_end(&self->eps)); // Add to the list of active endpoints
|
|
nn_ctx_leave (&self->ctx);
|
|
} else printf("self->sock.(%s) %p already has (%s)\n",self->socket_name,self->sockbase->sock,addr);
|
|
return(ep->eid);
|
|
}
|
|
|
|
int32_t nn_sock_rm_ep(struct nn_sock *self,int32_t eid)
|
|
{
|
|
struct nn_ep *ep;
|
|
nn_ctx_enter(&self->ctx);
|
|
if ( (ep= nn_find_ep(self,eid,0,0,0)) == NULL ) // The endpoint doesn't exist
|
|
{
|
|
nn_ctx_leave(&self->ctx);
|
|
return -EINVAL;
|
|
}
|
|
// Move the endpoint from the list of active endpoints to the list of shutting down endpoints.
|
|
nn_list_erase(&self->eps,&ep->item);
|
|
nn_list_insert(&self->sdeps,&ep->item,nn_list_end(&self->sdeps));
|
|
// Ask the endpoint to stop. Actual terminatation may be delayed by the transport.
|
|
nn_ep_stop(ep);
|
|
nn_ctx_leave(&self->ctx);
|
|
return 0;
|
|
}
|
|
|
|
int nn_sock_send(struct nn_sock *self, struct nn_msg *msg, int flags)
|
|
{
|
|
int rc;
|
|
uint64_t deadline;
|
|
uint64_t now;
|
|
int timeout;
|
|
|
|
/* Some sockets types cannot be used for sending messages. */
|
|
if (nn_slow (self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
|
|
return -ENOTSUP;
|
|
|
|
nn_ctx_enter (&self->ctx);
|
|
|
|
/* Compute the deadline for SNDTIMEO timer. */
|
|
if (self->sndtimeo < 0) {
|
|
deadline = -1;
|
|
timeout = -1;
|
|
}
|
|
else {
|
|
deadline = nn_clock_now (&self->clock) + self->sndtimeo;
|
|
timeout = self->sndtimeo;
|
|
}
|
|
|
|
while (1) {
|
|
|
|
/* If nn_term() was already called, return ETERM. */
|
|
if (nn_slow (self->state == NN_SOCK_STATE_ZOMBIE)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return -ETERM;
|
|
}
|
|
|
|
/* Try to send the message in a non-blocking way. */
|
|
rc = self->sockbase->vfptr->send (self->sockbase, msg);
|
|
printf("sockbase send rc.%d\n",rc);
|
|
if (nn_fast (rc == 0)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return 0;
|
|
}
|
|
nn_assert (rc < 0);
|
|
|
|
/* Any unexpected error is forwarded to the caller. */
|
|
if (nn_slow (rc != -EAGAIN)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return rc;
|
|
}
|
|
|
|
// If the message cannot be sent at the moment and the send call is non-blocking, return immediately.
|
|
if (nn_fast (flags & NN_DONTWAIT)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return -EAGAIN;
|
|
}
|
|
|
|
/* With blocking send, wait while there are new pipes available
|
|
for sending. */
|
|
nn_ctx_leave (&self->ctx);
|
|
rc = nn_efd_wait (&self->sndfd, timeout);
|
|
if (nn_slow (rc == -ETIMEDOUT))
|
|
return -EAGAIN;
|
|
if (nn_slow (rc == -EINTR))
|
|
return -EINTR;
|
|
errnum_assert (rc == 0, rc);
|
|
nn_ctx_enter (&self->ctx);
|
|
/*
|
|
* Double check if pipes are still available for sending
|
|
*/
|
|
if (!nn_efd_wait (&self->sndfd, 0)) {
|
|
self->flags |= NN_SOCK_FLAG_OUT;
|
|
}
|
|
|
|
/* If needed, re-compute the timeout to reflect the time that have
|
|
already elapsed. */
|
|
if (self->sndtimeo >= 0) {
|
|
now = nn_clock_now (&self->clock);
|
|
timeout = (int) (now > deadline ? 0 : deadline - now);
|
|
}
|
|
}
|
|
}
|
|
|
|
int nn_sock_recv(struct nn_sock *self, struct nn_msg *msg, int flags)
|
|
{
|
|
int rc;
|
|
uint64_t deadline;
|
|
uint64_t now;
|
|
int timeout;
|
|
|
|
/* Some sockets types cannot be used for receiving messages. */
|
|
if (nn_slow (self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV))
|
|
return -ENOTSUP;
|
|
nn_ctx_enter (&self->ctx);
|
|
|
|
/* Compute the deadline for RCVTIMEO timer. */
|
|
if (self->rcvtimeo < 0) {
|
|
deadline = -1;
|
|
timeout = -1;
|
|
}
|
|
else {
|
|
deadline = nn_clock_now (&self->clock) + self->rcvtimeo;
|
|
timeout = self->rcvtimeo;
|
|
}
|
|
|
|
while (1) {
|
|
|
|
/* If nn_term() was already called, return ETERM. */
|
|
if (nn_slow (self->state == NN_SOCK_STATE_ZOMBIE)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return -ETERM;
|
|
}
|
|
/* Try to receive the message in a non-blocking way. */
|
|
rc = self->sockbase->vfptr->recv(self->sockbase,msg);
|
|
//printf("%p inside nn_sock_recv rc.%d\n",self->sockbase->vfptr->recv,rc);
|
|
if (nn_fast (rc == 0)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return 0;
|
|
}
|
|
nn_assert (rc < 0);
|
|
|
|
/* Any unexpected error is forwarded to the caller. */
|
|
if (nn_slow (rc != -EAGAIN)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return rc;
|
|
}
|
|
|
|
/* If the message cannot be received at the moment and the recv call
|
|
is non-blocking, return immediately. */
|
|
if (nn_fast (flags & NN_DONTWAIT)) {
|
|
nn_ctx_leave (&self->ctx);
|
|
return -EAGAIN;
|
|
}
|
|
|
|
/* With blocking recv, wait while there are new pipes available
|
|
for receiving. */
|
|
nn_ctx_leave (&self->ctx);
|
|
rc = nn_efd_wait (&self->rcvfd, timeout);
|
|
if (nn_slow (rc == -ETIMEDOUT))
|
|
return -EAGAIN;
|
|
if (nn_slow (rc == -EINTR))
|
|
return -EINTR;
|
|
errnum_assert (rc == 0, rc);
|
|
nn_ctx_enter (&self->ctx);
|
|
/*
|
|
* Double check if pipes are still available for receiving
|
|
*/
|
|
if (!nn_efd_wait (&self->rcvfd, 0)) {
|
|
self->flags |= NN_SOCK_FLAG_IN;
|
|
}
|
|
|
|
// If needed, re-compute the timeout to reflect the time that have already elapsed
|
|
if (self->rcvtimeo >= 0) {
|
|
now = nn_clock_now (&self->clock);
|
|
timeout = (int) (now > deadline ? 0 : deadline - now);
|
|
}
|
|
}
|
|
}
|
|
|
|
int nn_sock_add(struct nn_sock *self, struct nn_pipe *pipe)
|
|
{
|
|
int rc;
|
|
rc = self->sockbase->vfptr->add(self->sockbase,pipe);
|
|
printf("nn_sock_add rc.%d\n",rc);
|
|
if (nn_slow (rc >= 0)) {
|
|
nn_sock_stat_increment (self, NN_STAT_CURRENT_CONNECTIONS, 1);
|
|
}
|
|
return rc;
|
|
}
|
|
|
|
void nn_sock_rm (struct nn_sock *self, struct nn_pipe *pipe)
|
|
{
|
|
self->sockbase->vfptr->rm (self->sockbase, pipe);
|
|
nn_sock_stat_increment (self, NN_STAT_CURRENT_CONNECTIONS, -1);
|
|
}
|
|
|
|
static void nn_sock_onleave (struct nn_ctx *self)
|
|
{
|
|
struct nn_sock *sock;
|
|
int events;
|
|
|
|
sock = nn_cont (self, struct nn_sock, ctx);
|
|
|
|
/* If nn_close() was already called there's no point in adjusting the
|
|
snd/rcv file descriptors. */
|
|
if (nn_slow (sock->state != NN_SOCK_STATE_ACTIVE))
|
|
return;
|
|
|
|
/* Check whether socket is readable and/or writable at the moment. */
|
|
events = sock->sockbase->vfptr->events (sock->sockbase);
|
|
errnum_assert (events >= 0, -events);
|
|
|
|
/* Signal/unsignal IN as needed. */
|
|
if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)) {
|
|
if (events & NN_SOCKBASE_EVENT_IN) {
|
|
if (!(sock->flags & NN_SOCK_FLAG_IN)) {
|
|
sock->flags |= NN_SOCK_FLAG_IN;
|
|
nn_efd_signal (&sock->rcvfd);
|
|
}
|
|
}
|
|
else {
|
|
if (sock->flags & NN_SOCK_FLAG_IN) {
|
|
sock->flags &= ~NN_SOCK_FLAG_IN;
|
|
nn_efd_unsignal (&sock->rcvfd);
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Signal/unsignal OUT as needed. */
|
|
if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)) {
|
|
if (events & NN_SOCKBASE_EVENT_OUT) {
|
|
if (!(sock->flags & NN_SOCK_FLAG_OUT)) {
|
|
sock->flags |= NN_SOCK_FLAG_OUT;
|
|
nn_efd_signal (&sock->sndfd);
|
|
}
|
|
}
|
|
else {
|
|
if (sock->flags & NN_SOCK_FLAG_OUT) {
|
|
sock->flags &= ~NN_SOCK_FLAG_OUT;
|
|
nn_efd_unsignal (&sock->sndfd);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static struct nn_optset *nn_sock_optset (struct nn_sock *self, int id)
|
|
{
|
|
int index;
|
|
struct nn_transport *tp;
|
|
|
|
/* Transport IDs are negative and start from -1. */
|
|
index = (-id) - 1;
|
|
|
|
/* Check for invalid indices. */
|
|
if (nn_slow (index < 0 || index >= NN_MAX_TRANSPORT))
|
|
return NULL;
|
|
|
|
/* If the option set already exists return it. */
|
|
if (nn_fast (self->optsets [index] != NULL))
|
|
return self->optsets [index];
|
|
|
|
/* If the option set doesn't exist yet, create it. */
|
|
tp = nn_global_transport (id);
|
|
if (nn_slow (!tp))
|
|
return NULL;
|
|
if (nn_slow (!tp->optset))
|
|
return NULL;
|
|
self->optsets [index] = tp->optset ();
|
|
|
|
return self->optsets [index];
|
|
}
|
|
|
|
static void nn_sock_shutdown (struct nn_fsm *self, int src, int type,
|
|
void *srcptr)
|
|
{
|
|
struct nn_sock *sock;
|
|
struct nn_list_item *it;
|
|
struct nn_ep *ep;
|
|
|
|
sock = nn_cont (self, struct nn_sock, fsm);
|
|
|
|
if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
|
|
nn_assert (sock->state == NN_SOCK_STATE_ACTIVE ||
|
|
sock->state == NN_SOCK_STATE_ZOMBIE);
|
|
|
|
/* Close sndfd and rcvfd. This should make any current
|
|
select/poll using SNDFD and/or RCVFD exit. */
|
|
if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NORECV)) {
|
|
nn_efd_term (&sock->rcvfd);
|
|
memset (&sock->rcvfd, 0xcd, sizeof (sock->rcvfd));
|
|
}
|
|
if (!(sock->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND)) {
|
|
nn_efd_term (&sock->sndfd);
|
|
memset (&sock->sndfd, 0xcd, sizeof (sock->sndfd));
|
|
}
|
|
|
|
/* Ask all the associated endpoints to stop. */
|
|
it = nn_list_begin (&sock->eps);
|
|
while (it != nn_list_end (&sock->eps)) {
|
|
ep = nn_cont (it, struct nn_ep, item);
|
|
it = nn_list_next (&sock->eps, it);
|
|
nn_list_erase (&sock->eps, &ep->item);
|
|
nn_list_insert (&sock->sdeps, &ep->item,
|
|
nn_list_end (&sock->sdeps));
|
|
nn_ep_stop (ep);
|
|
|
|
}
|
|
sock->state = NN_SOCK_STATE_STOPPING_EPS;
|
|
goto finish2;
|
|
}
|
|
if (nn_slow (sock->state == NN_SOCK_STATE_STOPPING_EPS)) {
|
|
|
|
/* Endpoint is stopped. Now we can safely deallocate it. */
|
|
if (!(src == NN_SOCK_SRC_EP && type == NN_EP_STOPPED)) {
|
|
fprintf (stderr, "src=%d type=%d\n", (int) src, (int) type);
|
|
nn_assert (src == NN_SOCK_SRC_EP && type == NN_EP_STOPPED);
|
|
}
|
|
ep = (struct nn_ep*) srcptr;
|
|
nn_list_erase (&sock->sdeps, &ep->item);
|
|
nn_ep_term (ep);
|
|
nn_free (ep);
|
|
|
|
finish2:
|
|
/* If all the endpoints are deallocated, we can start stopping
|
|
protocol-specific part of the socket. If there' no stop function
|
|
we can consider it stopped straight away. */
|
|
if (!nn_list_empty (&sock->sdeps))
|
|
return;
|
|
nn_assert (nn_list_empty (&sock->eps));
|
|
sock->state = NN_SOCK_STATE_STOPPING;
|
|
if (!sock->sockbase->vfptr->stop)
|
|
goto finish1;
|
|
sock->sockbase->vfptr->stop (sock->sockbase);
|
|
return;
|
|
}
|
|
if (nn_slow (sock->state == NN_SOCK_STATE_STOPPING)) {
|
|
|
|
/* We get here when the deallocation of the socket was delayed by the
|
|
specific socket type. */
|
|
nn_assert (src == NN_FSM_ACTION && type == NN_SOCK_ACTION_STOPPED);
|
|
|
|
finish1:
|
|
/* Protocol-specific part of the socket is stopped.
|
|
We can safely deallocate it. */
|
|
sock->sockbase->vfptr->destroy (sock->sockbase);
|
|
sock->state = NN_SOCK_STATE_INIT;
|
|
|
|
/* Now we can unblock the application thread blocked in
|
|
the nn_close() call. */
|
|
nn_sem_post (&sock->termsem);
|
|
|
|
return;
|
|
}
|
|
|
|
nn_fsm_bad_state(sock->state, src, type);
|
|
}
|
|
|
|
static void nn_sock_handler(struct nn_fsm *self,int src,int type,void *srcptr)
|
|
{
|
|
struct nn_sock *sock;
|
|
struct nn_ep *ep;
|
|
|
|
sock = nn_cont (self, struct nn_sock, fsm);
|
|
//printf("sock handler state.%d\n",sock->state);
|
|
switch ( sock->state )
|
|
{
|
|
|
|
/******************************************************************************/
|
|
/* INIT state. */
|
|
/******************************************************************************/
|
|
case NN_SOCK_STATE_INIT:
|
|
switch (src) {
|
|
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_FSM_START:
|
|
sock->state = NN_SOCK_STATE_ACTIVE;
|
|
return;
|
|
case NN_SOCK_ACTION_ZOMBIFY:
|
|
nn_sock_action_zombify (sock);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (sock->state, src, type);
|
|
}
|
|
|
|
default:
|
|
nn_fsm_bad_source (sock->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* ACTIVE state. */
|
|
/******************************************************************************/
|
|
case NN_SOCK_STATE_ACTIVE:
|
|
switch (src) {
|
|
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_SOCK_ACTION_ZOMBIFY:
|
|
nn_sock_action_zombify (sock);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (sock->state, src, type);
|
|
}
|
|
|
|
case NN_SOCK_SRC_EP:
|
|
switch (type) {
|
|
case NN_EP_STOPPED:
|
|
printf("NN_SOCK_SRC_EP NN_EP_STOPPED\n");
|
|
// This happens when an endpoint is closed using nn_shutdown() function
|
|
ep = (struct nn_ep*) srcptr;
|
|
nn_list_erase (&sock->sdeps, &ep->item);
|
|
nn_ep_term (ep);
|
|
nn_free (ep);
|
|
return;
|
|
|
|
default:
|
|
nn_fsm_bad_action (sock->state, src, type);
|
|
}
|
|
|
|
default:
|
|
|
|
// The assumption is that all the other events come from pipes
|
|
//printf("nn_sock_handler state.%d type.%d\n",sock->state,type);
|
|
switch ( type )
|
|
{
|
|
case NN_PIPE_IN:
|
|
sock->sockbase->vfptr->in(sock->sockbase,(struct nn_pipe *)srcptr);
|
|
return;
|
|
case NN_PIPE_OUT:
|
|
sock->sockbase->vfptr->out(sock->sockbase,(struct nn_pipe *)srcptr);
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action(sock->state, src, type);
|
|
}
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* ZOMBIE state. */
|
|
/******************************************************************************/
|
|
case NN_SOCK_STATE_ZOMBIE:
|
|
nn_fsm_bad_state (sock->state, src, type);
|
|
|
|
/******************************************************************************/
|
|
/* Invalid state. */
|
|
/******************************************************************************/
|
|
default:
|
|
nn_fsm_bad_state (sock->state, src, type);
|
|
}
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* State machine actions. */
|
|
/******************************************************************************/
|
|
|
|
static void nn_sock_action_zombify (struct nn_sock *self)
|
|
{
|
|
/* Switch to the zombie state. From now on all the socket
|
|
functions will return ETERM. */
|
|
self->state = NN_SOCK_STATE_ZOMBIE;
|
|
|
|
/* Set IN and OUT events to unblock any polling function. */
|
|
if (!(self->flags & NN_SOCK_FLAG_IN)) {
|
|
self->flags |= NN_SOCK_FLAG_IN;
|
|
if (!(self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV))
|
|
nn_efd_signal (&self->rcvfd);
|
|
}
|
|
if (!(self->flags & NN_SOCK_FLAG_OUT)) {
|
|
self->flags |= NN_SOCK_FLAG_OUT;
|
|
if (!(self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
|
|
nn_efd_signal (&self->sndfd);
|
|
}
|
|
}
|
|
|
|
void nn_sock_report_error(struct nn_sock *self,struct nn_ep *ep,int32_t errnum,char *fname,int32_t linenum)
|
|
{
|
|
#ifndef __PNACL
|
|
if ( !nn_global_print_errors() )
|
|
return;
|
|
#endif
|
|
if ( errnum == 0 )
|
|
return;
|
|
if ( ep != 0 )
|
|
{
|
|
fprintf(stderr,"nanomsg: socket.%s[%s]: Error: %s\n",self->socket_name,nn_ep_getaddr(ep),nn_strerror(errnum));
|
|
PNACL_msg("nanomsg: socket.%s[%s]: [%s:%d] Error: %s\n",self->socket_name,nn_ep_getaddr(ep),fname,linenum,nn_strerror(errnum));
|
|
}
|
|
else
|
|
{
|
|
fprintf(stderr,"nanomsg: socket.%s: Error: %s\n",self->socket_name, nn_strerror(errnum));
|
|
PNACL_msg("nanomsg: socket.%s: [%s:%d] Error: %s\n",self->socket_name,fname,linenum,nn_strerror(errnum));
|
|
}
|
|
}
|
|
|
|
void nn_sock_stat_increment (struct nn_sock *self, int name, int64_t increment)
|
|
{
|
|
switch (name) {
|
|
case NN_STAT_ESTABLISHED_CONNECTIONS:
|
|
nn_assert (increment > 0);
|
|
self->statistics.established_connections += increment;
|
|
break;
|
|
case NN_STAT_ACCEPTED_CONNECTIONS:
|
|
nn_assert (increment > 0);
|
|
self->statistics.accepted_connections += increment;
|
|
break;
|
|
case NN_STAT_DROPPED_CONNECTIONS:
|
|
nn_assert (increment > 0);
|
|
self->statistics.dropped_connections += increment;
|
|
break;
|
|
case NN_STAT_BROKEN_CONNECTIONS:
|
|
nn_assert (increment > 0);
|
|
self->statistics.broken_connections += increment;
|
|
break;
|
|
case NN_STAT_CONNECT_ERRORS:
|
|
nn_assert (increment > 0);
|
|
self->statistics.connect_errors += increment;
|
|
break;
|
|
case NN_STAT_BIND_ERRORS:
|
|
nn_assert (increment > 0);
|
|
self->statistics.bind_errors += increment;
|
|
break;
|
|
case NN_STAT_ACCEPT_ERRORS:
|
|
nn_assert (increment > 0);
|
|
self->statistics.accept_errors += increment;
|
|
break;
|
|
case NN_STAT_MESSAGES_SENT:
|
|
nn_assert (increment > 0);
|
|
self->statistics.messages_sent += increment;
|
|
break;
|
|
case NN_STAT_MESSAGES_RECEIVED:
|
|
nn_assert (increment > 0);
|
|
self->statistics.messages_received += increment;
|
|
break;
|
|
case NN_STAT_BYTES_SENT:
|
|
nn_assert (increment >= 0);
|
|
self->statistics.bytes_sent += increment;
|
|
break;
|
|
case NN_STAT_BYTES_RECEIVED:
|
|
nn_assert (increment >= 0);
|
|
self->statistics.bytes_received += increment;
|
|
break;
|
|
|
|
case NN_STAT_CURRENT_CONNECTIONS:
|
|
nn_assert (increment > 0 ||
|
|
self->statistics.current_connections >= -increment);
|
|
nn_assert(increment < INT_MAX && increment > -INT_MAX);
|
|
self->statistics.current_connections += (int) increment;
|
|
break;
|
|
case NN_STAT_INPROGRESS_CONNECTIONS:
|
|
nn_assert (increment > 0 ||
|
|
self->statistics.inprogress_connections >= -increment);
|
|
nn_assert(increment < INT_MAX && increment > -INT_MAX);
|
|
self->statistics.inprogress_connections += (int) increment;
|
|
break;
|
|
case NN_STAT_CURRENT_SND_PRIORITY:
|
|
/* This is an exception, we don't want to increment priority */
|
|
nn_assert((increment > 0 && increment <= 16) || increment == -1);
|
|
self->statistics.current_snd_priority = (int) increment;
|
|
break;
|
|
case NN_STAT_CURRENT_EP_ERRORS:
|
|
nn_assert (increment > 0 ||
|
|
self->statistics.current_ep_errors >= -increment);
|
|
nn_assert(increment < INT_MAX && increment > -INT_MAX);
|
|
self->statistics.current_ep_errors += (int) increment;
|
|
break;
|
|
}
|
|
}
|
|
|