You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
148 lines
4.4 KiB
148 lines
4.4 KiB
9 years ago
|
/*
|
||
|
Copyright (c) 2012-2013 Martin Sustrik All rights reserved.
|
||
|
|
||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||
|
of this software and associated documentation files (the "Software"),
|
||
|
to deal in the Software without restriction, including without limitation
|
||
|
the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||
|
and/or sell copies of the Software, and to permit persons to whom
|
||
|
the Software is furnished to do so, subject to the following conditions:
|
||
|
|
||
|
The above copyright notice and this permission notice shall be included
|
||
|
in all copies or substantial portions of the Software.
|
||
|
|
||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
|
||
|
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||
|
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||
|
IN THE SOFTWARE.
|
||
|
*/
|
||
|
|
||
|
#include "msgqueue.h"
|
||
|
|
||
|
#include "../../utils/alloc.h"
|
||
|
#include "../../utils/fast.h"
|
||
|
#include "../../utils/err.h"
|
||
|
|
||
|
#include <string.h>
|
||
|
|
||
|
void nn_msgqueue_init (struct nn_msgqueue *self, size_t maxmem)
|
||
|
{
|
||
|
struct nn_msgqueue_chunk *chunk;
|
||
|
|
||
|
self->count = 0;
|
||
|
self->mem = 0;
|
||
|
self->maxmem = maxmem;
|
||
|
|
||
|
chunk = nn_alloc (sizeof (struct nn_msgqueue_chunk), "msgqueue chunk");
|
||
|
alloc_assert (chunk);
|
||
|
chunk->next = NULL;
|
||
|
|
||
|
self->out.chunk = chunk;
|
||
|
self->out.pos = 0;
|
||
|
self->in.chunk = chunk;
|
||
|
self->in.pos = 0;
|
||
|
|
||
|
self->cache = NULL;
|
||
|
}
|
||
|
|
||
|
void nn_msgqueue_term (struct nn_msgqueue *self)
|
||
|
{
|
||
|
int rc;
|
||
|
struct nn_msg msg;
|
||
|
|
||
|
/* Deallocate messages in the pipe. */
|
||
|
while (1) {
|
||
|
rc = nn_msgqueue_recv (self, &msg);
|
||
|
if (rc == -EAGAIN)
|
||
|
break;
|
||
|
errnum_assert (rc >= 0, -rc);
|
||
|
nn_msg_term (&msg);
|
||
|
}
|
||
|
|
||
|
/* There are no more messages in the pipe so there's at most one chunk
|
||
|
in the queue. Deallocate it. */
|
||
|
nn_assert (self->in.chunk == self->out.chunk);
|
||
|
nn_free (self->in.chunk);
|
||
|
|
||
|
/* Deallocate the cached chunk, if any. */
|
||
|
if (self->cache)
|
||
|
nn_free (self->cache);
|
||
|
}
|
||
|
|
||
|
int nn_msgqueue_empty (struct nn_msgqueue *self)
|
||
|
{
|
||
|
return self->count == 0 ? 1 : 0;
|
||
|
}
|
||
|
|
||
|
int nn_msgqueue_send (struct nn_msgqueue *self, struct nn_msg *msg)
|
||
|
{
|
||
|
size_t msgsz;
|
||
|
|
||
|
/* By allowing one message of arbitrary size to be written to the queue,
|
||
|
we allow even messages that exceed max buffer size to pass through.
|
||
|
Beyond that we'll apply the buffer limit as specified by the user. */
|
||
|
msgsz = nn_chunkref_size (&msg->sphdr) + nn_chunkref_size (&msg->body);
|
||
|
if (nn_slow (self->count > 0 && self->mem + msgsz >= self->maxmem))
|
||
|
return -EAGAIN;
|
||
|
|
||
|
/* Adjust the statistics. */
|
||
|
++self->count;
|
||
|
self->mem += msgsz;
|
||
|
|
||
|
/* Move the content of the message to the pipe. */
|
||
|
nn_msg_mv (&self->out.chunk->msgs [self->out.pos], msg);
|
||
|
++self->out.pos;
|
||
|
|
||
|
/* If there's no space for a new message in the pipe, either re-use
|
||
|
the cache chunk or allocate a new chunk if it does not exist. */
|
||
|
if (nn_slow (self->out.pos == NN_MSGQUEUE_GRANULARITY)) {
|
||
|
if (nn_slow (!self->cache)) {
|
||
|
self->cache = nn_alloc (sizeof (struct nn_msgqueue_chunk),
|
||
|
"msgqueue chunk");
|
||
|
alloc_assert (self->cache);
|
||
|
self->cache->next = NULL;
|
||
|
}
|
||
|
self->out.chunk->next = self->cache;
|
||
|
self->out.chunk = self->cache;
|
||
|
self->cache = NULL;
|
||
|
self->out.pos = 0;
|
||
|
}
|
||
|
|
||
|
return 0;
|
||
|
}
|
||
|
|
||
|
int nn_msgqueue_recv (struct nn_msgqueue *self, struct nn_msg *msg)
|
||
|
{
|
||
|
struct nn_msgqueue_chunk *o;
|
||
|
|
||
|
/* If there is no message in the queue. */
|
||
|
if (nn_slow (!self->count))
|
||
|
return -EAGAIN;
|
||
|
|
||
|
/* Move the message from the pipe to the user. */
|
||
|
nn_msg_mv (msg, &self->in.chunk->msgs [self->in.pos]);
|
||
|
|
||
|
/* Move to the next position. */
|
||
|
++self->in.pos;
|
||
|
if (nn_slow (self->in.pos == NN_MSGQUEUE_GRANULARITY)) {
|
||
|
o = self->in.chunk;
|
||
|
self->in.chunk = self->in.chunk->next;
|
||
|
self->in.pos = 0;
|
||
|
if (nn_fast (!self->cache))
|
||
|
self->cache = o;
|
||
|
else
|
||
|
nn_free (o);
|
||
|
}
|
||
|
|
||
|
/* Adjust the statistics. */
|
||
|
--self->count;
|
||
|
self->mem -= (nn_chunkref_size (&msg->sphdr) +
|
||
|
nn_chunkref_size (&msg->body));
|
||
|
|
||
|
return 0;
|
||
|
}
|
||
|
|