You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
526 lines
17 KiB
526 lines
17 KiB
/*
|
|
Copyright (c) 2012-2013 Martin Sustrik All rights reserved.
|
|
Copyright 2015 Garrett D'Amore <garrett@damore.org>
|
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"),
|
|
to deal in the Software without restriction, including without limitation
|
|
the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
and/or sell copies of the Software, and to permit persons to whom
|
|
the Software is furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included
|
|
in all copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
|
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
IN THE SOFTWARE.
|
|
*/
|
|
|
|
#include "surveyor.h"
|
|
#include "xsurveyor.h"
|
|
|
|
#include "../../nn.h"
|
|
#include "../../survey.h"
|
|
|
|
#include "../../aio/fsm.h"
|
|
#include "../../aio/timer.h"
|
|
|
|
#include "../../utils/err.h"
|
|
#include "../../utils/cont.h"
|
|
#include "../../utils/fast.h"
|
|
#include "../../utils/wire.h"
|
|
#include "../../utils/alloc.h"
|
|
#include "../../utils/random.h"
|
|
#include "../../utils/list.h"
|
|
#include "../../utils/int.h"
|
|
#include "../../utils/attr.h"
|
|
|
|
#include <string.h>
|
|
|
|
#define NN_SURVEYOR_DEFAULT_DEADLINE 1000
|
|
|
|
#define NN_SURVEYOR_STATE_IDLE 1
|
|
#define NN_SURVEYOR_STATE_PASSIVE 2
|
|
#define NN_SURVEYOR_STATE_ACTIVE 3
|
|
#define NN_SURVEYOR_STATE_CANCELLING 4
|
|
#define NN_SURVEYOR_STATE_STOPPING_TIMER 5
|
|
#define NN_SURVEYOR_STATE_STOPPING 6
|
|
|
|
#define NN_SURVEYOR_ACTION_START 1
|
|
#define NN_SURVEYOR_ACTION_CANCEL 2
|
|
|
|
#define NN_SURVEYOR_SRC_DEADLINE_TIMER 1
|
|
|
|
#define NN_SURVEYOR_TIMEDOUT 1
|
|
|
|
struct nn_surveyor {
|
|
|
|
/* The underlying raw SP socket. */
|
|
struct nn_xsurveyor xsurveyor;
|
|
|
|
/* The state machine. */
|
|
struct nn_fsm fsm;
|
|
int state;
|
|
|
|
/* Survey ID of the current survey. */
|
|
uint32_t surveyid;
|
|
|
|
/* Timer for timing out the survey. */
|
|
struct nn_timer timer;
|
|
|
|
/* When starting the survey, the message is temporarily stored here. */
|
|
struct nn_msg tosend;
|
|
|
|
/* Protocol-specific socket options. */
|
|
int deadline;
|
|
|
|
/* Flag if surveyor has timed out */
|
|
int timedout;
|
|
};
|
|
|
|
/* Private functions. */
|
|
static void nn_surveyor_init (struct nn_surveyor *self,
|
|
const struct nn_sockbase_vfptr *vfptr, void *hint);
|
|
static void nn_surveyor_term (struct nn_surveyor *self);
|
|
static void nn_surveyor_handler (struct nn_fsm *self, int src, int type,
|
|
void *srcptr);
|
|
static void nn_surveyor_shutdown (struct nn_fsm *self, int src, int type,
|
|
void *srcptr);
|
|
static int nn_surveyor_inprogress (struct nn_surveyor *self);
|
|
static void nn_surveyor_resend (struct nn_surveyor *self);
|
|
|
|
/* Implementation of nn_sockbase's virtual functions. */
|
|
static void nn_surveyor_stop (struct nn_sockbase *self);
|
|
static void nn_surveyor_destroy (struct nn_sockbase *self);
|
|
static int nn_surveyor_events (struct nn_sockbase *self);
|
|
static int nn_surveyor_send (struct nn_sockbase *self, struct nn_msg *msg);
|
|
static int nn_surveyor_recv (struct nn_sockbase *self, struct nn_msg *msg);
|
|
static int nn_surveyor_setopt (struct nn_sockbase *self, int level, int option,
|
|
const void *optval, size_t optvallen);
|
|
static int nn_surveyor_getopt (struct nn_sockbase *self, int level, int option,
|
|
void *optval, size_t *optvallen);
|
|
static const struct nn_sockbase_vfptr nn_surveyor_sockbase_vfptr = {
|
|
nn_surveyor_stop,
|
|
nn_surveyor_destroy,
|
|
nn_xsurveyor_add,
|
|
nn_xsurveyor_rm,
|
|
nn_xsurveyor_in,
|
|
nn_xsurveyor_out,
|
|
nn_surveyor_events,
|
|
nn_surveyor_send,
|
|
nn_surveyor_recv,
|
|
nn_surveyor_setopt,
|
|
nn_surveyor_getopt
|
|
};
|
|
|
|
static void nn_surveyor_init (struct nn_surveyor *self,
|
|
const struct nn_sockbase_vfptr *vfptr, void *hint)
|
|
{
|
|
nn_xsurveyor_init (&self->xsurveyor, vfptr, hint);
|
|
nn_fsm_init_root (&self->fsm, nn_surveyor_handler, nn_surveyor_shutdown,
|
|
nn_sockbase_getctx (&self->xsurveyor.sockbase));
|
|
self->state = NN_SURVEYOR_STATE_IDLE;
|
|
|
|
/* Start assigning survey IDs beginning with a random number. This way
|
|
there should be no key clashes even if the executable is re-started. */
|
|
nn_random_generate (&self->surveyid, sizeof (self->surveyid));
|
|
|
|
nn_timer_init (&self->timer, NN_SURVEYOR_SRC_DEADLINE_TIMER, &self->fsm);
|
|
nn_msg_init (&self->tosend, 0);
|
|
self->deadline = NN_SURVEYOR_DEFAULT_DEADLINE;
|
|
self->timedout = 0;
|
|
|
|
/* Start the state machine. */
|
|
nn_fsm_start (&self->fsm);
|
|
}
|
|
|
|
static void nn_surveyor_term (struct nn_surveyor *self)
|
|
{
|
|
nn_msg_term (&self->tosend);
|
|
nn_timer_term (&self->timer);
|
|
nn_fsm_term (&self->fsm);
|
|
nn_xsurveyor_term (&self->xsurveyor);
|
|
}
|
|
|
|
void nn_surveyor_stop (struct nn_sockbase *self)
|
|
{
|
|
struct nn_surveyor *surveyor;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
|
|
|
|
nn_fsm_stop (&surveyor->fsm);
|
|
}
|
|
|
|
void nn_surveyor_destroy (struct nn_sockbase *self)
|
|
{
|
|
struct nn_surveyor *surveyor;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
|
|
|
|
nn_surveyor_term (surveyor);
|
|
nn_free (surveyor);
|
|
}
|
|
|
|
static int nn_surveyor_inprogress (struct nn_surveyor *self)
|
|
{
|
|
/* Return 1 if there's a survey going on. 0 otherwise. */
|
|
return self->state == NN_SURVEYOR_STATE_IDLE ||
|
|
self->state == NN_SURVEYOR_STATE_PASSIVE ||
|
|
self->state == NN_SURVEYOR_STATE_STOPPING ? 0 : 1;
|
|
}
|
|
|
|
static int nn_surveyor_events (struct nn_sockbase *self)
|
|
{
|
|
int rc;
|
|
struct nn_surveyor *surveyor;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
|
|
|
|
/* Determine the actual readability/writability of the socket. */
|
|
rc = nn_xsurveyor_events (&surveyor->xsurveyor.sockbase);
|
|
|
|
/* If there's no survey going on we'll signal IN to interrupt polling
|
|
when the survey expires. nn_recv() will return -EFSM afterwards. */
|
|
if (!nn_surveyor_inprogress (surveyor))
|
|
rc |= NN_SOCKBASE_EVENT_IN;
|
|
|
|
return rc;
|
|
}
|
|
|
|
static int nn_surveyor_send (struct nn_sockbase *self, struct nn_msg *msg)
|
|
{
|
|
struct nn_surveyor *surveyor;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
|
|
|
|
/* Generate new survey ID. */
|
|
++surveyor->surveyid;
|
|
surveyor->surveyid |= 0x80000000;
|
|
|
|
/* Tag the survey body with survey ID. */
|
|
nn_assert (nn_chunkref_size (&msg->sphdr) == 0);
|
|
nn_chunkref_term (&msg->sphdr);
|
|
nn_chunkref_init (&msg->sphdr, 4);
|
|
nn_putl (nn_chunkref_data (&msg->sphdr), surveyor->surveyid);
|
|
|
|
/* Store the survey, so that it can be sent later on. */
|
|
nn_msg_term (&surveyor->tosend);
|
|
nn_msg_mv (&surveyor->tosend, msg);
|
|
nn_msg_init (msg, 0);
|
|
|
|
/* Cancel any ongoing survey, if any. */
|
|
if (nn_slow (nn_surveyor_inprogress (surveyor))) {
|
|
|
|
/* First check whether the survey can be sent at all. */
|
|
if (!(nn_xsurveyor_events (&surveyor->xsurveyor.sockbase) &
|
|
NN_SOCKBASE_EVENT_OUT))
|
|
return -EAGAIN;
|
|
|
|
/* Cancel the current survey. */
|
|
nn_fsm_action (&surveyor->fsm, NN_SURVEYOR_ACTION_CANCEL);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Notify the state machine that the survey was started. */
|
|
nn_fsm_action (&surveyor->fsm, NN_SURVEYOR_ACTION_START);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int nn_surveyor_recv (struct nn_sockbase *self, struct nn_msg *msg)
|
|
{
|
|
int rc;
|
|
struct nn_surveyor *surveyor;
|
|
uint32_t surveyid;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
|
|
|
|
/* If no survey is going on return EFSM error. */
|
|
if (nn_slow (!nn_surveyor_inprogress (surveyor))) {
|
|
if (surveyor->timedout == NN_SURVEYOR_TIMEDOUT) {
|
|
surveyor->timedout = 0;
|
|
return -ETIMEDOUT;
|
|
} else
|
|
return -EFSM;
|
|
}
|
|
|
|
while (1) {
|
|
|
|
/* Get next response. */
|
|
rc = nn_xsurveyor_recv (&surveyor->xsurveyor.sockbase, msg);
|
|
if (nn_slow (rc == -EAGAIN))
|
|
return -EAGAIN;
|
|
errnum_assert (rc == 0, -rc);
|
|
|
|
/* Get the survey ID. Ignore any stale responses. */
|
|
/* TODO: This should be done asynchronously! */
|
|
if (nn_slow (nn_chunkref_size (&msg->sphdr) != sizeof (uint32_t)))
|
|
continue;
|
|
surveyid = nn_getl (nn_chunkref_data (&msg->sphdr));
|
|
if (nn_slow (surveyid != surveyor->surveyid))
|
|
continue;
|
|
|
|
/* Discard the header and return the message to the user. */
|
|
nn_chunkref_term (&msg->sphdr);
|
|
nn_chunkref_init (&msg->sphdr, 0);
|
|
break;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int nn_surveyor_setopt (struct nn_sockbase *self, int level, int option,
|
|
const void *optval, size_t optvallen)
|
|
{
|
|
struct nn_surveyor *surveyor;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
|
|
|
|
if (level != NN_SURVEYOR)
|
|
return -ENOPROTOOPT;
|
|
|
|
if (option == NN_SURVEYOR_DEADLINE) {
|
|
if (nn_slow (optvallen != sizeof (int)))
|
|
return -EINVAL;
|
|
surveyor->deadline = *(int*) optval;
|
|
return 0;
|
|
}
|
|
|
|
return -ENOPROTOOPT;
|
|
}
|
|
|
|
static int nn_surveyor_getopt (struct nn_sockbase *self, int level, int option,
|
|
void *optval, size_t *optvallen)
|
|
{
|
|
struct nn_surveyor *surveyor;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, xsurveyor.sockbase);
|
|
|
|
if (level != NN_SURVEYOR)
|
|
return -ENOPROTOOPT;
|
|
|
|
if (option == NN_SURVEYOR_DEADLINE) {
|
|
if (nn_slow (*optvallen < sizeof (int)))
|
|
return -EINVAL;
|
|
*(int*) optval = surveyor->deadline;
|
|
*optvallen = sizeof (int);
|
|
return 0;
|
|
}
|
|
|
|
return -ENOPROTOOPT;
|
|
}
|
|
|
|
static void nn_surveyor_shutdown (struct nn_fsm *self, int src, int type,
|
|
NN_UNUSED void *srcptr)
|
|
{
|
|
struct nn_surveyor *surveyor;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, fsm);
|
|
|
|
if (nn_slow (src== NN_FSM_ACTION && type == NN_FSM_STOP)) {
|
|
nn_timer_stop (&surveyor->timer);
|
|
surveyor->state = NN_SURVEYOR_STATE_STOPPING;
|
|
}
|
|
if (nn_slow (surveyor->state == NN_SURVEYOR_STATE_STOPPING)) {
|
|
if (!nn_timer_isidle (&surveyor->timer))
|
|
return;
|
|
surveyor->state = NN_SURVEYOR_STATE_IDLE;
|
|
nn_fsm_stopped_noevent (&surveyor->fsm);
|
|
nn_sockbase_stopped (&surveyor->xsurveyor.sockbase);
|
|
return;
|
|
}
|
|
|
|
nn_fsm_bad_state(surveyor->state, src, type);
|
|
}
|
|
|
|
static void nn_surveyor_handler (struct nn_fsm *self, int src, int type,
|
|
NN_UNUSED void *srcptr)
|
|
{
|
|
struct nn_surveyor *surveyor;
|
|
|
|
surveyor = nn_cont (self, struct nn_surveyor, fsm);
|
|
|
|
switch (surveyor->state) {
|
|
|
|
/******************************************************************************/
|
|
/* IDLE state. */
|
|
/* The socket was created recently. */
|
|
/******************************************************************************/
|
|
case NN_SURVEYOR_STATE_IDLE:
|
|
switch (src) {
|
|
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_FSM_START:
|
|
surveyor->state = NN_SURVEYOR_STATE_PASSIVE;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (surveyor->state, src, type);
|
|
}
|
|
|
|
default:
|
|
nn_fsm_bad_source (surveyor->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* PASSIVE state. */
|
|
/* There's no survey going on. */
|
|
/******************************************************************************/
|
|
case NN_SURVEYOR_STATE_PASSIVE:
|
|
switch (src) {
|
|
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_SURVEYOR_ACTION_START:
|
|
nn_surveyor_resend (surveyor);
|
|
nn_timer_start (&surveyor->timer, surveyor->deadline);
|
|
surveyor->state = NN_SURVEYOR_STATE_ACTIVE;
|
|
return;
|
|
|
|
default:
|
|
nn_fsm_bad_action (surveyor->state, src, type);
|
|
}
|
|
|
|
default:
|
|
nn_fsm_bad_source (surveyor->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* ACTIVE state. */
|
|
/* Survey was sent, waiting for responses. */
|
|
/******************************************************************************/
|
|
case NN_SURVEYOR_STATE_ACTIVE:
|
|
switch (src) {
|
|
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_SURVEYOR_ACTION_CANCEL:
|
|
nn_timer_stop (&surveyor->timer);
|
|
surveyor->state = NN_SURVEYOR_STATE_CANCELLING;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (surveyor->state, src, type);
|
|
}
|
|
|
|
case NN_SURVEYOR_SRC_DEADLINE_TIMER:
|
|
switch (type) {
|
|
case NN_TIMER_TIMEOUT:
|
|
nn_timer_stop (&surveyor->timer);
|
|
surveyor->state = NN_SURVEYOR_STATE_STOPPING_TIMER;
|
|
surveyor->timedout = NN_SURVEYOR_TIMEDOUT;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (surveyor->state, src, type);
|
|
}
|
|
|
|
default:
|
|
nn_fsm_bad_source (surveyor->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* CANCELLING state. */
|
|
/* Survey was cancelled, but the old timer haven't stopped yet. The new */
|
|
/* survey thus haven't been sent and is stored in 'tosend'. */
|
|
/******************************************************************************/
|
|
case NN_SURVEYOR_STATE_CANCELLING:
|
|
switch (src) {
|
|
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_SURVEYOR_ACTION_CANCEL:
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (surveyor->state, src, type);
|
|
}
|
|
|
|
case NN_SURVEYOR_SRC_DEADLINE_TIMER:
|
|
switch (type) {
|
|
case NN_TIMER_STOPPED:
|
|
nn_surveyor_resend (surveyor);
|
|
nn_timer_start (&surveyor->timer, surveyor->deadline);
|
|
surveyor->state = NN_SURVEYOR_STATE_ACTIVE;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (surveyor->state, src, type);
|
|
}
|
|
|
|
default:
|
|
nn_fsm_bad_source (surveyor->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* STOPPING_TIMER state. */
|
|
/* Survey timeout expired. Now we are stopping the timer. */
|
|
/******************************************************************************/
|
|
case NN_SURVEYOR_STATE_STOPPING_TIMER:
|
|
switch (src) {
|
|
|
|
case NN_FSM_ACTION:
|
|
switch (type) {
|
|
case NN_SURVEYOR_ACTION_CANCEL:
|
|
surveyor->state = NN_SURVEYOR_STATE_CANCELLING;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (surveyor->state, src, type);
|
|
}
|
|
|
|
case NN_SURVEYOR_SRC_DEADLINE_TIMER:
|
|
switch (type) {
|
|
case NN_TIMER_STOPPED:
|
|
surveyor->state = NN_SURVEYOR_STATE_PASSIVE;
|
|
return;
|
|
default:
|
|
nn_fsm_bad_action (surveyor->state, src, type);
|
|
}
|
|
|
|
default:
|
|
nn_fsm_bad_source (surveyor->state, src, type);
|
|
}
|
|
|
|
/******************************************************************************/
|
|
/* Invalid state. */
|
|
/******************************************************************************/
|
|
default:
|
|
nn_fsm_bad_state (surveyor->state, src, type);
|
|
}
|
|
}
|
|
|
|
static void nn_surveyor_resend (struct nn_surveyor *self)
|
|
{
|
|
int rc;
|
|
struct nn_msg msg;
|
|
|
|
nn_msg_cp (&msg, &self->tosend);
|
|
rc = nn_xsurveyor_send (&self->xsurveyor.sockbase, &msg);
|
|
errnum_assert (rc == 0, -rc);
|
|
}
|
|
|
|
static int nn_surveyor_create (void *hint, struct nn_sockbase **sockbase)
|
|
{
|
|
struct nn_surveyor *self;
|
|
|
|
self = nn_alloc (sizeof (struct nn_surveyor), "socket (surveyor)");
|
|
alloc_assert (self);
|
|
nn_surveyor_init (self, &nn_surveyor_sockbase_vfptr, hint);
|
|
*sockbase = &self->xsurveyor.sockbase;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static struct nn_socktype nn_surveyor_socktype_struct = {
|
|
AF_SP,
|
|
NN_SURVEYOR,
|
|
0,
|
|
nn_surveyor_create,
|
|
nn_xsurveyor_ispeer,
|
|
NN_LIST_ITEM_INITIALIZER
|
|
};
|
|
|
|
struct nn_socktype *nn_surveyor_socktype = &nn_surveyor_socktype_struct;
|
|
|
|
|