Browse Source

update oi_socket - modify node code to match

v0.7.4-release
Ryan 16 years ago
parent
commit
30450388d6
  1. 23
      deps/liboi/LICENSE
  2. 36
      deps/liboi/README
  3. 24
      deps/liboi/config.mk
  4. 6
      deps/liboi/oi.h
  5. 278
      deps/liboi/oi.pod
  6. 41
      deps/liboi/oi_buf.c
  7. 30
      deps/liboi/oi_buf.h
  8. 28
      deps/liboi/oi_error.h
  9. 69
      deps/liboi/oi_queue.h
  10. 559
      deps/liboi/oi_socket.c
  11. 130
      deps/liboi/oi_socket.h
  12. 10
      deps/liboi/test/common.c
  13. 18
      deps/liboi/test/connection_interruption.c
  14. 12
      deps/liboi/test/echo.c
  15. 28
      deps/liboi/test/ping_pong.c
  16. 75
      src/net.cc
  17. 24
      src/net.h
  18. 3
      test/test-pingpong.js
  19. 2
      wscript

23
deps/liboi/LICENSE

@ -1,23 +0,0 @@
liboi is Copyright (C) 2009 Ryan Dahl.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

36
deps/liboi/README

@ -1,36 +0,0 @@
liboi is a C library for doing evented I/O. It is intended for building
efficent internet programs.
liboi is released under the X11 license.
= Feature Summary
* The library has a minimalist design
- Does not make internal allocations
- Does not wrap functionality of GnuTLS or libev. The user must use those
libraries in conjuction with liboi.
* Supports both server and client sockets.
* Supports evented file I/O emulation through a thread pool.
* SSL support
* Sendfile (file to socket) with emulation on platforms that do not support
it.
= Building
1 Edit config.mk. You almost certainly will need to set the EVDIR and
GNUTLSDIR variables.
2 Run "make"
= Documentation
1 make doc
2 man ./oi.3
= Website
http://github.com/ry/liboi
= Author
Ryan Dahl (ry@tinyclouds.org)

24
deps/liboi/config.mk

@ -1,24 +0,0 @@
# Define EVDIR=/foo/bar if your libev header and library files are in
# /foo/bar/include and /foo/bar/lib directories.
EVDIR=$(HOME)/local/libev
# Define GNUTLSDIR=/foo/bar if your gnutls header and library files are in
# /foo/bar/include and /foo/bar/lib directories.
#GNUTLSDIR=/usr
uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
uname_O := $(shell sh -c 'uname -o 2>/dev/null || echo not')
uname_R := $(shell sh -c 'uname -r 2>/dev/null || echo not')
uname_P := $(shell sh -c 'uname -p 2>/dev/null || echo not')
# CFLAGS and LDFLAGS are for the users to override from the command line.
CFLAGS = -g
LDFLAGS =
PREFIX = $(HOME)/local/liboi
CC = gcc
AR = ar
RM = rm -f
RANLIB = ranlib

6
deps/liboi/oi.h

@ -1,6 +0,0 @@
#ifndef oi_h
#define oi_h
#include <oi_socket.h>
#endif

278
deps/liboi/oi.pod

@ -1,278 +0,0 @@
=head1 NAME
liboi - a C library for doing evented I/O.
=head1 SYNOPSIS
#include <oi.h>
=head1 DESCRIPTION
liboi is an object oriented library for doing evented socket and file I/O.
The API is mostly about registering callbacks to be executed on certain
events.
Because most systems do not support asynchornous file I/O, the behavior is
emulated with an internal thread pool. The thread pool is accessed with the
C<oi_async> and C<oi_task> objects. Typically one will not need to use these
directly as C<oi_file> wraps that functionality.
=head2 CONVENTIONS
liboi's goal is to be very simple layer above the POSIX API. To that end it
avoids internal allocations as much as possible. Unless otherwise noted you
should assume all pointers passed into liboi will remain your responsibility
to maintain. That means you should not free the data passed into liboi
until the object in question has completed.
C<oi_socket> and C<oi_file> objects must be attached to an event loop. This
is completed with the C<*_attach> and C<*_detach> methods. When an object
is detached, other methods can be called - just the loop will not churn out
callbacks.
Both C<oi_socket> and C<oi_file> contain a number of callback pointers.
These are to be set manually after calling their initalization functions.
All classes include a C<void *data> member which is left for you to use.
=head1 ERROR HANDLING
=head1 Sockets
The C<oi_socket> structure represents a socket.
The callbacks inside C<oi_socket> are
void (*on_connect) (oi_socket *);
void (*on_read) (oi_socket *, const void *buf, size_t count);
void (*on_drain) (oi_socket *);
void (*on_error) (oi_socket *, struct oi_error e);
void (*on_close) (oi_socket *);
void (*on_timeout) (oi_socket *);
A the memory for a socket is released when the C<on_close()> callback is
made. That is, the user may free the memory for the socket with-in the
C<on_close()> callback.
=over 4
=item void oi_socket_init (oi_socket *, float timeout);
Initialize a socket. C<timeout> is the number of seconds of inactivity that
is allowed before the socket closes. The timeout only starts once the
socket is attached to a loop and open.
A C<timeout> argument of 0.0 signals that no timeout should be used.
After calling this function, register your callbacks manually. Thus your
code will probably look like this
oi_socket socket;
oi_socket_init(&socket, 60.0);
socket.on_connect = my_on_connect;
socket.on_read = my_on_read;
/* etc */
=item int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo);
Open a client connect to the specified address. When the connection is made
C<socket.on_connect> will be called.
Here is an example of filling in C<addrinfo> for a local TCP connection on
port 5555:
struct addrinfo *servinfo;
struct addrinfo hints;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
r = getaddrinfo(NULL, "5555", &hints, &servinfo);
assert(r == 0);
oi_socket_connect(socket, servinfo);
=item void oi_socket_attach (oi_socket *, struct ev_loop *loop);
A socket must be attached to a loop in order before any callbacks will be
made.
=item void oi_socket_detach (oi_socket *);
Detaching a socket will not close the connection.
=item void oi_socket_read_start (oi_socket *);
This will make the socket start receiving data. When data is received the
C<socket.on_read> callback is made. The maximum amount of data that can be
receive at a time is controlled by C<socket.chunksize>.
The buffer returned by C<socket.on_read> is statically allocated exists only
for the length of the callback. That means if you need to save any of the
data coming down the line, you must copy it to a new buffer.
Ideally you will have a parser attached to the C<on_read> callback which can
be interrupted at any time.
C<socket.chunksize> can be changed at any time.
=item void oi_socket_read_stop (oi_socket *);
Stops receiving data. You may receive spurious C<on_read> attempts even
though the socket reading is stopped - be prepared to handle them.
=item void oi_socket_reset_timeout (oi_socket *);
Reset the timeout to allow the socket to exist for another few seconds
(however long you specified in the initialization function).
=item void oi_socket_write (oi_socket *socket, oi_buf *buf);
Write the I<buf> to the socket. Each socket has a queue of C<oi_buf> objects
to be written - this appends the specified buffer to the end of that queue.
You will be notified when the queue is empty with the C<socket.on_drain()>
callback. When the socket has written the buffer C<buf.release()> will be
called. The release callback does not imply that the buffer was successfully
written.
=item void oi_socket_write_simple (oi_socket *, const char *str, size_t len);
Sometimes you are just hacking around and need to quickly write a string to
the socket. This convenience function allocates an C<oi_buf> object, and
C<strdup>s the given string. The allocated buffer will be freed by liboi
internally.
Most production most applications will use their own memory pool and will
not need this function.
=item void oi_socket_write_eof (oi_socket *);
This closes the write end of the socket. Further writes are not allowed
after this.
=item void oi_socket_close (oi_socket *);
Attempts to close the socket.
If the socket is secure, an SSL bye message will be sent.
SSL recommends that you wait for a bye response from the peer however this
tends to be overkill for most people. By default liboi will not wait for
peer to send a matching bye message. If you require this then set
C<socket.wait_for_secure_hangup> to 1.
When the close is complete C<on_close()> is made. The C<on_close()>
callback is not made until the program returns to the event loop. This is
because C<on_close()> may free the socket memory and if C<on_close()> was
called from C<oi_socket_close()>, then the socket object might unexpectedly
be gone. To summarize: C<oi_socket_close()> does not call C<on_close()> and
the socket memory is still accessable immediately after making calling
C<oi_socket_close()>.
=item void oi_socket_set_secure_session (oi_socket *, gnutls_session_t);
This make a socket use SSL. You must create the GnuTLS session yourself and
assign its credentials.
=back
=head1 Servers
A server simply listens on an address for new connections. The connections
come in the form of C<oi_socket> objects. The key is to give a
C<server.on_connection()> callback which returns an initialized C<oi_socket>.
The callback looks like this
oi_socket* (*on_connection) (oi_server *, struct sockaddr *remote_addr, socklen_t remove_addr_len);
Returning NULL from C<on_connection()> will reject the connection.
=over 4
=item void oi_server_init (oi_server *, int backlog);
Initializes a server object. C<backlog> is the argument given
internally to C<listen()>. Set the C<server.on_connection()> callback
after calling this.
=item int oi_server_listen (oi_server *, struct addrinfo *addrinfo);
Listens on the specified address. The server will not accept connections
until it is attached to a loop, however.
=item void oi_server_attach (oi_server *, struct ev_loop *loop);
Attaches a server to a loop.
=item void oi_server_detach (oi_server *);
Detaches a server to a loop. Does not close the server.
=item void oi_server_close (oi_server *);
Stops the server from listening.
=back
=head1 Files
Files internally use a thread pool to operate without blocking.
The thread pool is started once a file is attached and it continues until
program termination.
The following callbacks are used inside of the file object
void (*on_open) (oi_file *);
void (*on_read) (oi_file *, size_t count);
void (*on_drain) (oi_file *);
void (*on_error) (oi_file *, struct oi_error);
void (*on_close) (oi_file *);
=over 4
=item int oi_file_init (oi_file *);
Initializes a file object.
=item void oi_file_attach (oi_file *, struct ev_loop *);
Attaches a file object to a loop. If the thread pool has not been started,
then it is started at this call.
=item void oi_file_detach (oi_file *);
Detaches a file object from the loop.
=item int oi_file_open_path (oi_file *, const char *path, int flags, mode_t mode);
Opens a file specified by the path. The C<flag> and C<mode> arguments are
the same as used by L<open.2>. The C<file.on_open> callback is triggered
when the file is opened. Returns 0 on success. Returns -1 if the given file
is already open.
WARNING: path argument must be valid until C<oi_file> object is closed and
the C<file.on_close()> callback is made. I.E., liboi does not strdup the path
pointer.
=item int oi_file_open_stdin (oi_file *);
=item int oi_file_open_stdout (oi_file *);
=item int oi_file_open_stderr (oi_file *);
=item void oi_file_read_start (oi_file *, void *buffer, size_t bufsize);
=item void oi_file_read_stop (oi_file *);
=item int oi_file_write (oi_file *, oi_buf *to_write);
=item int oi_file_write_simple (oi_file *, const char *, size_t);
=item int oi_file_send (oi_file *source, oi_socket *destination, off_t offset, size_t length);
=item void oi_file_close (oi_file *);
=over 4
=back
=head1 AUTHOR
Ryan Dahl <ry@tinyclouds.org>

41
deps/liboi/oi_buf.c

@ -1,41 +0,0 @@
#include <oi_buf.h>
#include <stdlib.h>
#include <string.h>
void oi_buf_destroy
( oi_buf *buf
)
{
free(buf->base);
free(buf);
}
oi_buf * oi_buf_new2
( size_t len
)
{
oi_buf *buf = malloc(sizeof(oi_buf));
if(!buf)
return NULL;
buf->base = malloc(len);
if(!buf->base) {
free(buf);
return NULL;
}
buf->len = len;
buf->release = oi_buf_destroy;
return buf;
}
oi_buf * oi_buf_new
( const char *base
, size_t len
)
{
oi_buf *buf = oi_buf_new2(len);
if(!buf)
return NULL;
memcpy(buf->base, base, len);
return buf;
}

30
deps/liboi/oi_buf.h

@ -1,30 +0,0 @@
#include <oi_queue.h>
#ifndef oi_buf_h
#define oi_buf_h
#ifdef __cplusplus
extern "C" {
#endif
typedef struct oi_buf oi_buf;
struct oi_buf {
/* public */
char *base;
size_t len;
void (*release) (oi_buf *); /* called when oi is done with the object */
void *data;
/* private */
size_t written;
oi_queue queue;
};
oi_buf * oi_buf_new (const char* base, size_t len);
oi_buf * oi_buf_new2 (size_t len);
void oi_buf_destroy (oi_buf *);
#ifdef __cplusplus
}
#endif
#endif // oi_buf_h

28
deps/liboi/oi_error.h

@ -1,28 +0,0 @@
#ifndef oi_error_h
#define oi_error_h
#ifdef __cplusplus
extern "C" {
#endif
enum oi_error_domain
{ OI_ERROR_GNUTLS
, OI_ERROR_EV
, OI_ERROR_CLOSE
, OI_ERROR_SHUTDOWN
, OI_ERROR_OPEN
, OI_ERROR_SEND
, OI_ERROR_RECV
, OI_ERROR_WRITE
, OI_ERROR_READ
, OI_ERROR_SENDFILE
};
struct oi_error {
enum oi_error_domain domain;
int code; /* errno */
};
#ifdef __cplusplus
}
#endif
#endif // oi_error_h

69
deps/liboi/oi_queue.h

@ -1,69 +0,0 @@
/* Copyright (C) 2002-2009 Igor Sysoev
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#ifndef oi_queue_h
#define oi_queue_h
#ifdef __cplusplus
extern "C" {
#endif
#include <stddef.h> /* offsetof() */
typedef struct oi_queue oi_queue;
struct oi_queue {
oi_queue *prev;
oi_queue *next;
};
#define oi_queue_init(q) \
(q)->prev = q; \
(q)->next = q
#define oi_queue_empty(h) \
(h == (h)->prev)
#define oi_queue_insert_head(h, x) \
(x)->next = (h)->next; \
(x)->next->prev = x; \
(x)->prev = h; \
(h)->next = x
#define oi_queue_head(h) \
(h)->next
#define oi_queue_last(h) \
(h)->prev
#define oi_queue_remove(x) \
(x)->next->prev = (x)->prev; \
(x)->prev->next = (x)->next; \
(x)->prev = NULL; \
(x)->next = NULL
#define oi_queue_data(q, type, link) \
(type *) ((unsigned char *) q - offsetof(type, link))
#ifdef __cplusplus
}
#endif
#endif // oi_queue_h

559
deps/liboi/oi_socket.c

@ -1,3 +1,29 @@
/* Copyright (c) 2008,2009 Ryan Dahl
*
* oi_queue comes from ngx_queue.h
* Copyright (C) 2002-2009 Igor Sysoev
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
@ -21,9 +47,13 @@
#if HAVE_GNUTLS
# include <gnutls/gnutls.h>
# define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1)
# define GNUTLS_NEED_READ (gnutls_record_get_direction(socket->session) == 0)
#endif
#endif // HAVE_GNUTLS
/* a few forwards
* they wont even be defined if not having gnutls
* */
static int secure_full_goodbye (oi_socket *socket);
static int secure_half_goodbye (oi_socket *socket);
#undef TRUE
#define TRUE 1
@ -36,30 +66,56 @@
#define AGAIN 1
#define ERROR 2
#define RAISE_ERROR(s, _domain, _code) do { \
if(s->on_error) { \
struct oi_error __oi_error; \
__oi_error.domain = _domain; \
__oi_error.code = _code; \
s->on_error(s, __oi_error); \
} \
} while(0) \
void
oi_buf_destroy (oi_buf *buf)
{
free(buf->base);
free(buf);
}
oi_buf *
oi_buf_new2 (size_t len)
{
oi_buf *buf = malloc(sizeof(oi_buf));
if(!buf)
return NULL;
buf->base = malloc(len);
if(!buf->base) {
free(buf);
return NULL;
}
buf->len = len;
buf->release = oi_buf_destroy;
return buf;
}
oi_buf *
oi_buf_new (const char *base, size_t len)
{
oi_buf *buf = oi_buf_new2(len);
if(!buf)
return NULL;
memcpy(buf->base, base, len);
return buf;
}
#define CLOSE_ASAP(socket) do { \
if ((socket)->read_action) { \
(socket)->read_action = full_close; \
} \
if ((socket)->write_action) { \
(socket)->write_action = full_close; \
} \
} while (0)
static int
full_close(oi_socket *socket)
{
if(-1 == close(socket->fd) && errno == EINTR) {
/* TODO fd still open. next loop call close again? */
assert(0 && "implement me");
return ERROR;
}
if (close(socket->fd) == -1)
return errno == EINTR ? AGAIN : ERROR;
socket->read_action = NULL;
socket->write_action = NULL;
if(socket->attached) {
ev_feed_event(SOCKET_LOOP_ &socket->read_watcher, EV_READ);
}
return OKAY;
}
@ -67,43 +123,79 @@ static int
half_close(oi_socket *socket)
{
int r = shutdown(socket->fd, SHUT_WR);
if(r == -1) {
RAISE_ERROR(socket, OI_ERROR_SHUTDOWN, errno);
if (r == -1) {
socket->errorno = errno;
assert(0 && "Shouldn't get an error on shutdown");
return ERROR;
}
socket->write_action = NULL;
/* TODO set timer to zero so we get a callback soon */
return OKAY;
}
// This is to be called when ever the out_stream is empty
// and we need to change state.
static void
update_write_buffer_after_send(oi_socket *socket, ssize_t sent)
change_state_for_empty_out_stream (oi_socket *socket)
{
/*
* a very complicated bunch of close logic!
* XXX this is awful. FIXME
*/
if (socket->got_half_close == FALSE) {
if (socket->got_full_close == FALSE) {
/* Normal situation. Didn't get any close signals. */
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
} else {
/* Got Full Close. */
if (socket->read_action)
#if HAVE_GNUTLS
socket->read_action = socket->secure ? secure_full_goodbye : full_close;
#else
socket->read_action = full_close;
#endif
if (socket->write_action)
#if HAVE_GNUTLS
socket->write_action = socket->secure ? secure_full_goodbye : full_close;
#else
socket->write_action = full_close;
#endif
}
} else {
/* Got Half Close. */
if (socket->write_action)
#if HAVE_GNUTLS
socket->write_action = socket->secure ? secure_half_goodbye : half_close;
#else
socket->write_action = half_close;
#endif
}
}
static void
update_write_buffer_after_send (oi_socket *socket, ssize_t sent)
{
oi_queue *q = oi_queue_last(&socket->out_stream);
oi_buf *to_write = oi_queue_data(q, oi_buf, queue);
to_write->written += sent;
socket->written += sent;
if(to_write->written == to_write->len) {
if (to_write->written == to_write->len) {
oi_queue_remove(q);
if(to_write->release) {
if (to_write->release) {
to_write->release(to_write);
}
if(oi_queue_empty(&socket->out_stream)) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
if(socket->on_drain)
if (oi_queue_empty(&socket->out_stream)) {
change_state_for_empty_out_stream(socket);
if (socket->on_drain)
socket->on_drain(socket);
}
}
}
#if HAVE_GNUTLS
static int secure_socket_send(oi_socket *socket);
static int secure_socket_recv(oi_socket *socket);
@ -123,7 +215,7 @@ nosigpipe_push(gnutls_transport_ptr_t data, const void *buf, size_t len)
#endif
int r = send(socket->fd, buf, len, flags);
if(r == -1) {
if (r == -1) {
gnutls_transport_set_errno(socket->session, errno); /* necessary ? */
}
@ -137,26 +229,25 @@ secure_handshake(oi_socket *socket)
int r = gnutls_handshake(socket->session);
if(gnutls_error_is_fatal(r)) {
RAISE_ERROR(socket, OI_ERROR_GNUTLS, r);
if (gnutls_error_is_fatal(r)) {
socket->gnutls_errorno = r;
return ERROR;
}
if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
return AGAIN;
oi_socket_reset_timeout(socket);
if(!socket->connected) {
if (!socket->connected) {
socket->connected = TRUE;
if(socket->on_connect)
socket->on_connect(socket);
if (socket->on_connect) socket->on_connect(socket);
}
if(socket->read_action)
if (socket->read_action)
socket->read_action = secure_socket_recv;
if(socket->write_action)
if (socket->write_action)
socket->write_action = secure_socket_send;
return OKAY;
@ -167,7 +258,7 @@ secure_socket_send(oi_socket *socket)
{
ssize_t sent;
if(oi_queue_empty(&socket->out_stream)) {
if (oi_queue_empty(&socket->out_stream)) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
@ -182,32 +273,23 @@ secure_socket_send(oi_socket *socket)
, to_write->len - to_write->written
);
if(gnutls_error_is_fatal(sent)) {
RAISE_ERROR(socket, OI_ERROR_GNUTLS, sent);
if (gnutls_error_is_fatal(sent)) {
socket->gnutls_errorno = sent;
return ERROR;
}
if(sent == 0)
if (sent == 0)
return AGAIN;
oi_socket_reset_timeout(socket);
if(sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
if(GNUTLS_NEED_READ) {
if(socket->read_action) {
socket->read_action = secure_socket_send;
} else {
/* TODO GnuTLS needs read but already got EOF */
assert(0 && "needs read but already got EOF");
return ERROR;
}
}
if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
return AGAIN;
}
if(sent > 0) {
if (sent > 0) {
/* make sure the callbacks are correct */
if(socket->read_action)
if (socket->read_action)
socket->read_action = secure_socket_recv;
update_write_buffer_after_send(socket, sent);
return OKAY;
@ -220,8 +302,8 @@ secure_socket_send(oi_socket *socket)
static int
secure_socket_recv(oi_socket *socket)
{
char recv_buffer[TCP_MAXWIN];
size_t recv_buffer_size = MIN(TCP_MAXWIN, socket->chunksize);
char recv_buffer[socket->chunksize];
size_t recv_buffer_size = socket->chunksize;
ssize_t recved;
assert(socket->secure);
@ -230,22 +312,12 @@ secure_socket_recv(oi_socket *socket)
//printf("secure socket recv %d %p\n", recved, socket->on_connect);
if(gnutls_error_is_fatal(recved)) {
RAISE_ERROR(socket, OI_ERROR_GNUTLS, recved);
if (gnutls_error_is_fatal(recved)) {
socket->gnutls_errorno = recved;
return ERROR;
}
if(recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) {
if(GNUTLS_NEED_WRITE) {
if(socket->write_action) {
printf("need write\n");
socket->write_action = secure_socket_recv;
} else {
/* TODO GnuTLS needs send but already closed write end */
assert(0 && "needs read but cannot");
return ERROR;
}
}
if (recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) {
return AGAIN;
}
@ -254,27 +326,27 @@ secure_socket_recv(oi_socket *socket)
/* A server may also receive GNUTLS_E_REHANDSHAKE when a client has
* initiated a handshake. In that case the server can only initiate a
* handshake or terminate the connection. */
if(recved == GNUTLS_E_REHANDSHAKE) {
if(socket->write_action) {
if (recved == GNUTLS_E_REHANDSHAKE) {
if (socket->write_action) {
socket->read_action = secure_handshake;
socket->write_action = secure_handshake;
return OKAY;
} else {
/* TODO */
assert(0 && "needs read but cannot");
socket->read_action = full_close;
// set error
return ERROR;
}
}
if(recved >= 0) {
if (recved >= 0) {
/* Got EOF */
if(recved == 0)
if (recved == 0)
socket->read_action = NULL;
if(socket->write_action)
if (socket->write_action)
socket->write_action = secure_socket_send;
if(socket->on_read) { socket->on_read(socket, recv_buffer, recved); }
if (socket->on_read) { socket->on_read(socket, recv_buffer, recved); }
return OKAY;
}
@ -284,51 +356,46 @@ secure_socket_recv(oi_socket *socket)
}
static int
secure_goodbye(oi_socket *socket, gnutls_close_request_t how)
secure_full_goodbye (oi_socket *socket)
{
assert(socket->secure);
int r = gnutls_bye(socket->session, how);
int r = gnutls_bye(socket->session, GNUTLS_SHUT_RDWR);
if(gnutls_error_is_fatal(r)) {
RAISE_ERROR(socket, OI_ERROR_GNUTLS, r);
if (gnutls_error_is_fatal(r)) {
socket->gnutls_errorno = r;
return ERROR;
}
if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
return AGAIN;
CLOSE_ASAP(socket);
return OKAY;
}
static int
secure_full_goodbye(oi_socket *socket)
secure_half_goodbye (oi_socket *socket)
{
int r = secure_goodbye(socket, GNUTLS_SHUT_RDWR);
if(OKAY == r) {
return full_close(socket);
}
return r;
}
assert(socket->secure);
static int
secure_half_goodbye(oi_socket *socket)
{
int r = secure_goodbye(socket, GNUTLS_SHUT_WR);
if(OKAY == r) {
return half_close(socket);
int r = gnutls_bye(socket->session, GNUTLS_SHUT_WR);
if (gnutls_error_is_fatal(r)) {
socket->gnutls_errorno = r;
return ERROR;
}
return r;
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN)
return AGAIN;
if (socket->write_action)
socket->write_action = half_close;
return OKAY;
}
/* Tells the socket to use transport layer security (SSL). liboi does not
* want to make any decisions about security requirements, so the
* majoirty of GnuTLS configuration is left to the user. Only the transport
* layer of GnuTLS is controlled by liboi.
*
* That is, do not use gnutls_transport_* functions.
* Do use the rest of GnuTLS's API.
*/
void
oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session)
{
@ -338,13 +405,13 @@ oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session)
#endif /* HAVE GNUTLS */
static int
socket_send(oi_socket *socket)
socket_send (oi_socket *socket)
{
ssize_t sent;
assert(socket->secure == FALSE);
if(oi_queue_empty(&socket->out_stream)) {
if (oi_queue_empty(&socket->out_stream)) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
return AGAIN;
}
@ -368,32 +435,36 @@ socket_send(oi_socket *socket)
, flags
);
if(sent < 0) {
switch(errno) {
if (sent < 0) {
switch (errno) {
#ifdef EWOULDBLOCK
case EWOULDBLOCK:
#else
case EAGAIN:
#endif
return AGAIN;
case ECONNREFUSED:
socket->errorno = errno;
return ERROR;
case ECONNRESET:
socket->write_action = NULL;
/* TODO maybe just clear write buffer instead of error?
* They should be able to read still from the socket.
*/
RAISE_ERROR(socket, OI_ERROR_SEND, errno);
socket->errorno = errno;
return ERROR;
default:
perror("send()");
assert(0 && "oi shouldn't let this happen.");
socket->errorno = errno;
return ERROR;
}
}
oi_socket_reset_timeout(socket);
if(!socket->connected) {
if (!socket->connected) {
socket->connected = TRUE;
if(socket->on_connect) { socket->on_connect(socket); }
if (socket->on_connect) socket->on_connect(socket);
}
update_write_buffer_after_send(socket, sent);
@ -402,7 +473,7 @@ socket_send(oi_socket *socket)
}
static int
socket_recv(oi_socket *socket)
socket_recv (oi_socket *socket)
{
char buf[TCP_MAXWIN];
size_t buf_size = TCP_MAXWIN;
@ -410,28 +481,30 @@ socket_recv(oi_socket *socket)
assert(socket->secure == FALSE);
if(!socket->connected) {
if (!socket->connected) {
socket->connected = TRUE;
if(socket->on_connect) { socket->on_connect(socket); }
if (socket->on_connect) socket->on_connect(socket);
return OKAY;
}
recved = recv(socket->fd, buf, buf_size, 0);
if(recved < 0) {
switch(errno) {
case EAGAIN:
if (recved < 0) {
switch (errno) {
#ifdef EWOULDBLOCK
case EWOULDBLOCK:
#else
case EAGAIN:
#endif
return AGAIN;
case EINTR:
return AGAIN;
/* A remote host refused to allow the network connection (typically
* because it is not running the requested service). */
case ECONNREFUSED:
RAISE_ERROR(socket, OI_ERROR_RECV, errno);
return ERROR;
case ECONNRESET:
RAISE_ERROR(socket, OI_ERROR_RECV, errno);
socket->errorno = errno;
return ERROR;
default:
@ -444,19 +517,19 @@ socket_recv(oi_socket *socket)
oi_socket_reset_timeout(socket);
if(recved == 0) {
if (recved == 0) {
oi_socket_read_stop(socket);
socket->read_action = NULL;
}
/* NOTE: EOF is signaled with recved == 0 on callback */
if(socket->on_read) { socket->on_read(socket, buf, recved); }
if (socket->on_read) { socket->on_read(socket, buf, recved); }
return OKAY;
}
static void
assign_file_descriptor(oi_socket *socket, int fd)
assign_file_descriptor (oi_socket *socket, int fd)
{
socket->fd = fd;
@ -467,7 +540,7 @@ assign_file_descriptor(oi_socket *socket, int fd)
socket->write_action = socket_send;
#if HAVE_GNUTLS
if(socket->secure) {
if (socket->secure) {
gnutls_transport_set_lowat(socket->session, 0);
gnutls_transport_set_push_function(socket->session, nosigpipe_push);
gnutls_transport_set_ptr2 ( socket->session
@ -485,7 +558,7 @@ assign_file_descriptor(oi_socket *socket, int fd)
* Called by server->connection_watcher.
*/
static void
on_connection(EV_P_ ev_io *watcher, int revents)
on_connection (EV_P_ ev_io *watcher, int revents)
{
oi_server *server = watcher->data;
@ -497,7 +570,7 @@ on_connection(EV_P_ ev_io *watcher, int revents)
#endif
assert(&server->connection_watcher == watcher);
if(EV_ERROR & revents) {
if (EV_ERROR & revents) {
oi_server_close(server);
return;
}
@ -507,23 +580,23 @@ on_connection(EV_P_ ev_io *watcher, int revents)
/* TODO accept all possible connections? currently: just one */
int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len);
if(fd < 0) {
if (fd < 0) {
perror("accept()");
return;
}
oi_socket *socket = NULL;
if(server->on_connection)
if (server->on_connection)
socket = server->on_connection(server, (struct sockaddr*)&address, addr_len);
if(socket == NULL) {
if (socket == NULL) {
close(fd);
return;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if(r < 0) {
if (r < 0) {
/* TODO error report */
}
@ -541,21 +614,20 @@ int
oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
{
int fd = -1;
struct linger ling = {0, 0};
assert(server->listening == FALSE);
fd = socket( addrinfo->ai_family
, addrinfo->ai_socktype
, addrinfo->ai_protocol
);
if(fd < 0) {
if (fd < 0) {
perror("socket()");
return -1;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if(r < 0) {
if (r < 0) {
perror("fcntl()");
return -1;
}
@ -563,7 +635,6 @@ oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
flags = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
/* XXX: Sending single byte chunks in a response body? Perhaps there is a
* need to enable the Nagel algorithm dynamically. For now disabling.
@ -596,7 +667,7 @@ oi_server_listen(oi_server *server, struct addrinfo *addrinfo)
void
oi_server_close(oi_server *server)
{
if(server->listening) {
if (server->listening) {
oi_server_detach(server);
close(server->fd);
/* TODO do this on the loop? check return value? */
@ -647,23 +718,20 @@ on_timeout(EV_P_ ev_timer *watcher, int revents)
assert(watcher == &socket->timeout_watcher);
// printf("on_timeout\n");
if(socket->on_timeout) { socket->on_timeout(socket); }
// printf("on_timeout\n");
/* TODD set timer to zero */
full_close(socket);
if (socket->on_timeout) { socket->on_timeout(socket); }
// timeout does not automatically kill your connection. you must!
}
static void
release_write_buffer(oi_socket *socket)
{
while(!oi_queue_empty(&socket->out_stream)) {
while (!oi_queue_empty(&socket->out_stream)) {
oi_queue *q = oi_queue_last(&socket->out_stream);
oi_buf *buf = oi_queue_data(q, oi_buf, queue);
oi_queue_remove(q);
if(buf->release) { buf->release(buf); }
if (buf->release) { buf->release(buf); }
}
}
@ -673,57 +741,55 @@ on_io_event(EV_P_ ev_io *watcher, int revents)
{
oi_socket *socket = watcher->data;
if(revents & EV_ERROR) {
RAISE_ERROR(socket, OI_ERROR_EV, 0);
goto close;
if (revents & EV_ERROR) {
socket->errorno = 1;
CLOSE_ASAP(socket);
}
int r;
int have_read_event = TRUE;
int have_write_event = TRUE;
int have_read_event = (socket->read_action != NULL);
int have_write_event = (socket->write_action != NULL);
while(have_read_event || have_write_event) {
if(socket->read_action) {
r = socket->read_action(socket);
if(r == ERROR) goto close;
if(r == AGAIN) have_read_event = FALSE;
} else {
while (have_read_event || have_write_event) {
/* RECV LOOP - TRY TO CLEAR THE BUFFER */
if (socket->read_action == NULL)
have_read_event = FALSE;
}
else {
r = socket->read_action(socket);
if(socket->write_action) {
r = socket->write_action(socket);
if(r == ERROR) goto close;
if(r == AGAIN) have_write_event = FALSE;
} else {
have_write_event = FALSE;
if (r == AGAIN)
have_read_event = FALSE;
else if (r == ERROR)
CLOSE_ASAP(socket);
}
if(socket->read_watcher.active == FALSE)
have_read_event = FALSE;
if(socket->write_watcher.active == FALSE)
/* SEND LOOP - TRY TO CLEAR THE BUFFER */
if (socket->write_action == NULL)
have_write_event = FALSE;
}
if(socket->write_action == NULL && socket->read_action == NULL)
goto close;
else {
r = socket->write_action(socket);
return;
if (r == AGAIN)
have_write_event = FALSE;
else if (r == ERROR)
CLOSE_ASAP(socket);
}
}
close:
release_write_buffer(socket);
// Close when both sides of the stream are closed.
if (socket->write_action == NULL && socket->read_action == NULL) {
release_write_buffer(socket);
ev_clear_pending (EV_A_ &socket->write_watcher);
ev_clear_pending (EV_A_ &socket->read_watcher);
ev_clear_pending (EV_A_ &socket->timeout_watcher);
ev_clear_pending (EV_A_ &socket->write_watcher);
ev_clear_pending (EV_A_ &socket->read_watcher);
ev_clear_pending (EV_A_ &socket->timeout_watcher);
oi_socket_detach(socket);
oi_socket_detach(socket);
if(socket->on_close) { socket->on_close(socket); }
/* WARNING: user can free socket in on_close so no more
* access beyond this point. */
if (socket->on_close) { socket->on_close(socket); }
/* WARNING: user can free socket in on_close so no more
* access beyond this point. */
}
}
/**
@ -752,13 +818,17 @@ oi_socket_init(oi_socket *socket, float timeout)
ev_init(&socket->read_watcher, on_io_event);
socket->read_watcher.data = socket;
socket->got_full_close = FALSE;
socket->got_half_close = FALSE;
socket->errorno = 0;
socket->secure = FALSE;
socket->wait_for_secure_hangup = FALSE;
#if HAVE_GNUTLS
socket->gnutls_errorno = 0;
socket->session = NULL;
#endif
/* TODO higher resolution timer */
ev_timer_init(&socket->timeout_watcher, on_timeout, 0., timeout);
socket->timeout_watcher.data = socket;
@ -769,90 +839,50 @@ oi_socket_init(oi_socket *socket, float timeout)
socket->on_connect = NULL;
socket->on_read = NULL;
socket->on_drain = NULL;
socket->on_error = NULL;
socket->on_timeout = NULL;
}
void
oi_socket_write_eof (oi_socket *socket)
oi_socket_close (oi_socket *socket)
{
#if HAVE_GNUTLS
/* try to hang up properly for secure connections */
if(socket->secure)
{
if( socket->connected /* completed handshake */
&& socket->write_action /* write end is open */
)
{
socket->write_action = secure_half_goodbye;
if(socket->attached)
ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
return;
}
/* secure servers cannot handle half-closed connections? */
full_close(socket);
return;
}
#endif // HAVE_GNUTLS
if(socket->write_action)
half_close(socket);
else
full_close(socket);
socket->got_half_close = TRUE;
if (oi_queue_empty(&socket->out_stream))
change_state_for_empty_out_stream(socket);
}
void
oi_socket_close (oi_socket *socket)
oi_socket_full_close (oi_socket *socket)
{
#if HAVE_GNUTLS
/* try to hang up properly for secure connections */
if( socket->secure
&& socket->connected /* completed handshake */
&& socket->write_action /* write end is open */
)
{
if(socket->wait_for_secure_hangup && socket->read_action) {
socket->write_action = secure_full_goodbye;
socket->read_action = secure_full_goodbye;
} else {
socket->write_action = secure_half_goodbye;
socket->read_action = NULL;
}
if(socket->attached)
ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
return;
}
#endif // HAVE_GNUTLS
full_close(socket);
socket->got_full_close = TRUE;
if (oi_queue_empty(&socket->out_stream))
change_state_for_empty_out_stream(socket);
}
/*
* Resets the timeout to stay alive for another socket->timeout seconds
*/
void
oi_socket_reset_timeout(oi_socket *socket)
void oi_socket_force_close (oi_socket *socket)
{
ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
// socket->errorno = OI_SOCKET_ERROR_FORCE_CLOSE
CLOSE_ASAP(socket);
}
/**
* Writes a string to the socket. This is actually sets a watcher which may
* take multiple iterations to write the entire string.
*/
void
oi_socket_write(oi_socket *socket, oi_buf *buf)
{
if(socket->write_action == NULL)
return;
assert(socket->write_action != NULL && "Do not write to a closed socket");
assert(socket->got_full_close == FALSE && "Do not write to a closing socket");
assert(socket->got_half_close == FALSE && "Do not write to a closing socket");
oi_queue_insert_head(&socket->out_stream, &buf->queue);
buf->written = 0;
// XXX if (socket->attached) ??
ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
if (socket->attached) {
ev_io_start(SOCKET_LOOP_ &socket->write_watcher);
}
}
void
oi_socket_reset_timeout(oi_socket *socket)
{
ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher);
}
static void
@ -886,20 +916,17 @@ oi_socket_attach(EV_P_ oi_socket *socket)
ev_timer_again(EV_A_ &socket->timeout_watcher);
if(socket->read_action)
if (socket->read_action)
ev_io_start(EV_A_ &socket->read_watcher);
if(socket->write_action)
if (socket->write_action)
ev_io_start(EV_A_ &socket->write_watcher);
/* make sure the io_event happens soon in the case we're being reattached */
ev_feed_event(EV_A_ &socket->read_watcher, EV_READ);
}
void
oi_socket_detach(oi_socket *socket)
{
if(socket->attached) {
if (socket->attached) {
ev_io_stop(SOCKET_LOOP_ &socket->write_watcher);
ev_io_stop(SOCKET_LOOP_ &socket->read_watcher);
ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher);
@ -920,7 +947,7 @@ oi_socket_read_stop (oi_socket *socket)
void
oi_socket_read_start (oi_socket *socket)
{
if(socket->read_action) {
if (socket->read_action) {
ev_io_start(SOCKET_LOOP_ &socket->read_watcher);
/* XXX feed event? */
}
@ -933,14 +960,14 @@ oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo)
, addrinfo->ai_socktype
, addrinfo->ai_protocol
);
if(fd < 0) {
if (fd < 0) {
perror("socket()");
return -1;
}
int flags = fcntl(fd, F_GETFL, 0);
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if(r < 0) {
if (r < 0) {
perror("fcntl()");
return -1;
}
@ -955,7 +982,7 @@ oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo)
, addrinfo->ai_addrlen
);
if(r < 0 && errno != EINPROGRESS) {
if (r < 0 && errno != EINPROGRESS) {
perror("connect");
close(fd);
return -1;

130
deps/liboi/oi_socket.h

@ -1,8 +1,32 @@
/* Copyright (c) 2008,2009 Ryan Dahl
*
* oi_queue comes from ngx_queue.h
* Copyright (C) 2002-2009 Igor Sysoev
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <netdb.h>
#include <ev.h>
#include <oi_queue.h>
#include <oi_error.h>
#include <oi_buf.h>
#include <stddef.h> /* offsetof() */
#ifndef oi_socket_h
#define oi_socket_h
@ -16,10 +40,48 @@ extern "C" {
#if HAVE_GNUTLS
# include <gnutls/gnutls.h>
#endif
typedef struct oi_queue oi_queue;
struct oi_queue {
oi_queue *prev;
oi_queue *next;
};
#define oi_queue_init(q) \
(q)->prev = q; \
(q)->next = q
#define oi_queue_empty(h) \
(h == (h)->prev)
#define oi_queue_insert_head(h, x) \
(x)->next = (h)->next; \
(x)->next->prev = x; \
(x)->prev = h; \
(h)->next = x
#define oi_queue_head(h) \
(h)->next
#define oi_queue_last(h) \
(h)->prev
#define oi_queue_remove(x) \
(x)->next->prev = (x)->prev; \
(x)->prev->next = (x)->next; \
(x)->prev = NULL; \
(x)->next = NULL
#define oi_queue_data(q, type, link) \
(type *) ((unsigned char *) q - offsetof(type, link))
typedef struct oi_buf oi_buf;
typedef struct oi_server oi_server;
typedef struct oi_socket oi_socket;
oi_buf * oi_buf_new (const char* base, size_t len);
oi_buf * oi_buf_new2 (size_t len);
void oi_buf_destroy (oi_buf *);
void oi_server_init (oi_server *, int backlog);
int oi_server_listen (oi_server *, struct addrinfo *addrinfo);
void oi_server_attach (EV_P_ oi_server *);
@ -27,21 +89,65 @@ void oi_server_detach (oi_server *);
void oi_server_close (oi_server *);
void oi_socket_init (oi_socket *, float timeout);
int oi_socket_pair (oi_socket *a, oi_socket *b); /* TODO */
int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo);
void oi_socket_attach (EV_P_ oi_socket *);
void oi_socket_detach (oi_socket *);
void oi_socket_read_start (oi_socket *);
void oi_socket_read_stop (oi_socket *);
/* Resets the timeout to stay alive for another socket->timeout seconds
*/
void oi_socket_reset_timeout (oi_socket *);
/* Writes a buffer to the socket.
* (Do not send a NULL oi_buf or a buffer with oi_buf->base == NULL.)
*/
void oi_socket_write (oi_socket *, oi_buf *);
void oi_socket_write_simple (oi_socket *, const char *str, size_t len);
void oi_socket_write_eof (oi_socket *);
/* Once the write buffer is drained, oi_socket_close will shutdown the
* writing end of the socket and will close the read end once the server
* replies with an EOF.
*/
void oi_socket_close (oi_socket *);
/* Do not wait for the server to reply with EOF. This will only be called
* once the write buffer is drained.
* Warning: For TCP socket, the OS kernel may (should) reply with RST
* packets if this is called when data is still being received from the
* server.
*/
void oi_socket_full_close (oi_socket *);
/* The most extreme measure.
* Will not wait for the write queue to complete.
*/
void oi_socket_force_close (oi_socket *);
#if HAVE_GNUTLS
/* Tells the socket to use transport layer security (SSL). oi_socket does
* not want to make any decisions about security requirements, so the
* majoirty of GnuTLS configuration is left to the user. Only the transport
* layer of GnuTLS is controlled by oi_socket. That is, do not use
* gnutls_transport_* functions. Do use the rest of GnuTLS's API.
*/
void oi_socket_set_secure_session (oi_socket *, gnutls_session_t);
#endif
struct oi_buf {
/* public */
char *base;
size_t len;
void (*release) (oi_buf *); /* called when oi is done with the object */
void *data;
/* private */
size_t written;
oi_queue queue;
};
struct oi_server {
/* read only */
int fd;
@ -57,7 +163,7 @@ struct oi_server {
/* public */
oi_socket* (*on_connection) (oi_server *, struct sockaddr *remote_addr, socklen_t remove_addr_len);
void (*on_error) (oi_server *, struct oi_error e);
void (*on_error) (oi_server *);
void *data;
};
@ -73,12 +179,19 @@ struct oi_socket {
unsigned attached:1;
unsigned connected:1;
unsigned secure:1;
unsigned wait_for_secure_hangup:1;
unsigned got_full_close:1;
unsigned got_half_close:1;
/* if these are NULL then it means that end of the socket is closed. */
/* NULL = that end of the socket is closed. */
int (*read_action) (oi_socket *);
int (*write_action) (oi_socket *);
/* ERROR CODES. 0 = no error. Check on_close. */
int errorno;
#if HAVE_GNUTLS
int gnutls_errorno;
#endif
/* private */
ev_io write_watcher;
ev_io read_watcher;
@ -92,7 +205,6 @@ struct oi_socket {
void (*on_connect) (oi_socket *);
void (*on_read) (oi_socket *, const void *buf, size_t count);
void (*on_drain) (oi_socket *);
void (*on_error) (oi_socket *, struct oi_error e);
void (*on_close) (oi_socket *);
void (*on_timeout) (oi_socket *);
void *data;

10
deps/liboi/test/common.c

@ -11,7 +11,7 @@
#include <ev.h>
#include <oi.h>
#include <oi_socket.h>
#include <gnutls/gnutls.h>
#define HOST "127.0.0.1"
@ -23,8 +23,10 @@ int nconnections;
static void
on_peer_close(oi_socket *socket)
{
assert(socket->errorno == 0);
//printf("server connection closed\n");
#if HAVE_GNUTLS
assert(socket->gnutls_errorno == 0);
#if SECURE
gnutls_deinit(socket->session);
#endif
@ -46,12 +48,6 @@ on_peer_timeout(oi_socket *socket)
assert(0);
}
static void
on_client_error(oi_socket *socket, struct oi_error e)
{
assert(0);
}
#if HAVE_GNUTLS
#if SECURE

18
deps/liboi/test/connection_interruption.c

@ -18,13 +18,6 @@ on_peer_drain(oi_socket *socket)
oi_socket_close(socket);
}
static void
on_peer_error2(oi_socket *socket, struct oi_error e)
{
if(e.domain == OI_ERROR_GNUTLS) return;
assert(0);
}
static oi_socket*
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
@ -32,7 +25,6 @@ on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
socket->on_drain = on_peer_drain;
socket->on_error = on_peer_error2;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
@ -51,7 +43,7 @@ static void
on_client_connect(oi_socket *socket)
{
//printf("on client connection\n");
oi_socket_write_eof(socket);
oi_socket_close(socket);
}
static void
@ -69,13 +61,18 @@ on_client_close(oi_socket *socket)
static void
on_client_read(oi_socket *socket, const void *base, size_t len)
{
if (len == 0) {
oi_socket_close(socket);
return;
}
char buf[200000];
strncpy(buf, base, len);
buf[len] = 0;
//printf("client got message: %s\n", buf);
if(strcmp(buf, "BYE") == 0) {
if (strcmp(buf, "BYE") == 0) {
oi_socket_close(socket);
} else {
assert(0);
@ -132,7 +129,6 @@ main(int argc, const char *argv[])
oi_socket *client = malloc(sizeof(oi_socket));
oi_socket_init(client, TIMEOUT);
client->on_read = on_client_read;
client->on_error = on_client_error;
client->on_connect = on_client_connect;
client->on_close = on_client_close;
client->on_timeout = on_client_timeout;

12
deps/liboi/test/echo.c

@ -1,5 +1,8 @@
#include "test/common.c"
// timeout must match the timeout in timeout.rb
#define TIMEOUT 5.0
int successful_ping_count;
static void
@ -11,19 +14,12 @@ on_peer_read(oi_socket *socket, const void *base, size_t len)
oi_socket_write_simple(socket, base, len);
}
static void
on_peer_error(oi_socket *socket, struct oi_error e)
{
assert(0);
}
static oi_socket*
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
oi_socket *socket = malloc(sizeof(oi_socket));
oi_socket_init(socket, 5.0);
oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
socket->on_error = on_peer_error;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;

28
deps/liboi/test/ping_pong.c

@ -2,15 +2,18 @@
#define PING "PING"
#define PONG "PONG"
#define EXCHANGES 100
#define EXCHANGES 500
#define TIMEOUT 5.0
int successful_ping_count;
static void
on_peer_read(oi_socket *socket, const void *base, size_t len)
{
if(len == 0)
if (len == 0) {
oi_socket_close(socket);
return;
}
char buf[2000];
strncpy(buf, base, len);
@ -20,12 +23,6 @@ on_peer_read(oi_socket *socket, const void *base, size_t len)
oi_socket_write_simple(socket, PONG, sizeof PONG);
}
static void
on_peer_error(oi_socket *socket, struct oi_error e)
{
assert(0);
}
static void
on_client_close(oi_socket *socket)
{
@ -37,9 +34,8 @@ static oi_socket*
on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
{
oi_socket *socket = malloc(sizeof(oi_socket));
oi_socket_init(socket, 5.0);
oi_socket_init(socket, TIMEOUT);
socket->on_read = on_peer_read;
socket->on_error = on_peer_error;
socket->on_close = on_peer_close;
socket->on_timeout = on_peer_timeout;
@ -57,15 +53,20 @@ on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len)
}
static void
on_client_connect(oi_socket *socket)
on_client_connect (oi_socket *socket)
{
//printf("client connected. sending ping\n");
oi_socket_write_simple(socket, PING, sizeof PING);
}
static void
on_client_read(oi_socket *socket, const void *base, size_t len)
on_client_read (oi_socket *socket, const void *base, size_t len)
{
if(len == 0) {
oi_socket_close(socket);
return;
}
char buf[200000];
strncpy(buf, base, len);
buf[len] = 0;
@ -134,9 +135,8 @@ main(int argc, const char *argv[])
assert(r == 0);
oi_server_attach(EV_DEFAULT_ &server);
oi_socket_init(&client, 5.0);
oi_socket_init(&client, TIMEOUT);
client.on_read = on_client_read;
client.on_error = on_client_error;
client.on_connect = on_client_connect;
client.on_close = on_client_close;
client.on_timeout = on_client_timeout;

75
src/net.cc

@ -26,7 +26,7 @@ using namespace node;
#define CONNECT_SYMBOL String::NewSymbol("connect")
#define ENCODING_SYMBOL String::NewSymbol("encoding")
#define TIMEOUT_SYMBOL String::NewSymbol("timeout")
#define SERVER_SYMBOL String::NewSymbol("server")
#define SERVER_SYMBOL String::NewSymbol("server")
#define PROTOCOL_SYMBOL String::NewSymbol("protocol")
#define PROTOCOL_CLASS_SYMBOL String::NewSymbol("protocol_class")
@ -55,9 +55,10 @@ Connection::Initialize (v8::Handle<v8::Object> target)
constructor_template->InstanceTemplate()->SetInternalFieldCount(1);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "connect", v8Connect);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "close", v8Close);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "send", v8Send);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "sendEOF", v8SendEOF);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "close", v8Close);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "fullClose", v8FullClose);
NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "forceClose", v8ForceClose);
target->Set(String::NewSymbol("TCPConnection"), constructor_template->GetFunction());
}
@ -97,12 +98,17 @@ Connection::Connection (Handle<Object> handle, Handle<Function> protocol_class)
socket_.on_connect = Connection::on_connect;
socket_.on_read = Connection::on_read;
socket_.on_drain = Connection::on_drain;
socket_.on_error = Connection::on_error;
socket_.on_close = Connection::on_close;
socket_.on_timeout = Connection::on_timeout;
socket_.data = this;
}
Connection::~Connection ()
{
handle_->Delete(SEND_SYMBOL);
Close();
}
Local<Object>
Connection::GetProtocol (void)
{
@ -207,9 +213,7 @@ Connection::AfterResolve (eio_req *req)
return 0;
}
oi_error e; // TODO better error!
connection->OnError(e);
connection->OnDisconnect();
return 0;
}
@ -224,15 +228,31 @@ Connection::v8Close (const Arguments& args)
}
Handle<Value>
Connection::v8Send (const Arguments& args)
Connection::v8FullClose (const Arguments& args)
{
HandleScope scope;
Connection *connection = NODE_UNWRAP(Connection, args.Holder());
connection->FullClose();
return Undefined();
}
if (args[0] == Null()) {
oi_socket_write_eof(&connection->socket_);
Handle<Value>
Connection::v8ForceClose (const Arguments& args)
{
HandleScope scope;
Connection *connection = NODE_UNWRAP(Connection, args.Holder());
connection->ForceClose();
return Undefined();
}
Handle<Value>
Connection::v8Send (const Arguments& args)
{
HandleScope scope;
Connection *connection = NODE_UNWRAP(Connection, args.Holder());
} else if (args[0]->IsString()) {
if (args[0]->IsString()) {
// utf8 encoding
Local<String> s = args[0]->ToString();
size_t length = s->Utf8Length();
@ -256,15 +276,6 @@ Connection::v8Send (const Arguments& args)
return Undefined();
}
Handle<Value>
Connection::v8SendEOF (const Arguments& args)
{
HandleScope scope;
Connection *connection = NODE_UNWRAP(Connection, args.Holder());
connection->SendEOF();
return Undefined();
}
void
Connection::OnReceive (const void *buf, size_t len)
{
@ -305,18 +316,6 @@ Connection::OnReceive (const void *buf, size_t len)
fatal_exception(try_catch); // XXX is this the right action to take?
}
void
Connection::OnError (oi_error e)
{
HandleScope scope;
Local<Object> protocol = GetProtocol();
Local<Value> callback_v = protocol->Get(ON_ERROR_SYMBOL);
if (!callback_v->IsFunction()) return;
Handle<Function> callback = Handle<Function>::Cast(callback_v);
// TODO call with error arg
callback->Call(protocol, 0, NULL);
}
#define DEFINE_SIMPLE_CALLBACK(name, symbol) \
void name () \
{ \
@ -372,7 +371,6 @@ Acceptor::Acceptor (Handle<Object> handle, Handle<Function> protocol_class, Han
oi_server_init(&server_, backlog);
server_.on_connection = Acceptor::on_connection;
server_.on_error = Acceptor::on_error;
server_.data = this;
}
@ -398,17 +396,6 @@ Acceptor::OnConnection (struct sockaddr *addr, socklen_t len)
return connection;
}
void
Acceptor::OnError (struct oi_error error)
{
HandleScope scope;
Local<Value> callback_v = handle_->Get(ON_ERROR_SYMBOL);
if (!callback_v->IsFunction()) return;
Local<Function> callback = Local<Function>::Cast(callback_v);
callback->Call(handle_, 0, NULL); // TODO args
}
Handle<Value>
Acceptor::v8New (const Arguments& args)
{

24
src/net.h

@ -19,16 +19,18 @@ protected:
static v8::Handle<v8::Value> v8New (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Connect (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Send (const v8::Arguments& args);
static v8::Handle<v8::Value> v8SendEOF (const v8::Arguments& args);
static v8::Handle<v8::Value> v8Close (const v8::Arguments& args);
static v8::Handle<v8::Value> v8FullClose (const v8::Arguments& args);
static v8::Handle<v8::Value> v8ForceClose (const v8::Arguments& args);
Connection (v8::Handle<v8::Object> handle, v8::Handle<v8::Function> protocol_class);
virtual ~Connection () { Close(); }
virtual ~Connection ();
int Connect (struct addrinfo *address) { return oi_socket_connect (&socket_, address); }
void Send (oi_buf *buf) { oi_socket_write (&socket_, buf); }
void SendEOF (void) { oi_socket_write_eof (&socket_); }
void Close (void) { oi_socket_close (&socket_); }
void Send (oi_buf *buf) { oi_socket_write(&socket_, buf); }
void Close (void) { oi_socket_close(&socket_); }
void FullClose (void) { oi_socket_full_close(&socket_); }
void ForceClose (void) { oi_socket_force_close(&socket_); }
void SetAcceptor (v8::Handle<v8::Object> acceptor_handle);
@ -37,7 +39,6 @@ protected:
virtual void OnDrain (void);
virtual void OnEOF (void);
virtual void OnDisconnect (void);
virtual void OnError (oi_error e);
virtual void OnTimeout (void);
v8::Local<v8::Object> GetProtocol (void);
@ -65,11 +66,6 @@ private:
connection->OnDrain();
}
static void on_error (oi_socket *s, oi_error e) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnError(e);
}
static void on_close (oi_socket *s) {
Connection *connection = static_cast<Connection*> (s->data);
connection->OnDisconnect();
@ -118,7 +114,6 @@ protected:
}
virtual Connection* OnConnection (struct sockaddr *addr, socklen_t len);
virtual void OnError (struct oi_error error);
private:
static oi_socket* on_connection (oi_server *s, struct sockaddr *addr, socklen_t len) {
@ -130,11 +125,6 @@ private:
return NULL;
}
static void on_error (oi_server *s, struct oi_error error) {
Acceptor *acceptor = static_cast<Acceptor*> (s->data);
acceptor->OnError (error);
}
oi_server server_;
};

3
test/test-pingpong.js

@ -22,7 +22,6 @@ function Ponger (socket) {
this.onEOF = function () {
puts("ponger: onEOF");
socket.send("QUIT");
socket.close();
};
@ -47,7 +46,7 @@ function Pinger (socket) {
socket.send("PING");
} else {
puts("sending FIN");
socket.sendEOF();
socket.close();
}
};

2
wscript

@ -110,7 +110,7 @@ def build(bld):
### oi
oi = bld.new_task_gen("cc", "staticlib")
oi.source = "deps/liboi/oi_socket.c deps/liboi/oi_buf.c"
oi.source = "deps/liboi/oi_socket.c"
oi.includes = "deps/liboi/"
oi.name = "oi"
oi.target = "oi"

Loading…
Cancel
Save