From 30450388d619788ca188183fb6fc15122ebf4f67 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 5 May 2009 12:52:18 +0200 Subject: [PATCH] update oi_socket - modify node code to match --- deps/liboi/LICENSE | 23 - deps/liboi/README | 36 -- deps/liboi/config.mk | 24 - deps/liboi/oi.h | 6 - deps/liboi/oi.pod | 278 ----------- deps/liboi/oi_buf.c | 41 -- deps/liboi/oi_buf.h | 30 -- deps/liboi/oi_error.h | 28 -- deps/liboi/oi_queue.h | 69 --- deps/liboi/oi_socket.c | 559 ++++++++++++---------- deps/liboi/oi_socket.h | 130 ++++- deps/liboi/test/common.c | 10 +- deps/liboi/test/connection_interruption.c | 18 +- deps/liboi/test/echo.c | 12 +- deps/liboi/test/ping_pong.c | 28 +- src/net.cc | 75 ++- src/net.h | 24 +- test/test-pingpong.js | 3 +- wscript | 2 +- 19 files changed, 482 insertions(+), 914 deletions(-) delete mode 100644 deps/liboi/LICENSE delete mode 100644 deps/liboi/README delete mode 100644 deps/liboi/config.mk delete mode 100644 deps/liboi/oi.h delete mode 100644 deps/liboi/oi.pod delete mode 100644 deps/liboi/oi_buf.c delete mode 100644 deps/liboi/oi_buf.h delete mode 100644 deps/liboi/oi_error.h delete mode 100644 deps/liboi/oi_queue.h diff --git a/deps/liboi/LICENSE b/deps/liboi/LICENSE deleted file mode 100644 index a5a6e0191f..0000000000 --- a/deps/liboi/LICENSE +++ /dev/null @@ -1,23 +0,0 @@ -liboi is Copyright (C) 2009 Ryan Dahl. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above - copyright notice, this list of conditions and the following disclaimer in - the documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -POSSIBILITY OF SUCH DAMAGE. diff --git a/deps/liboi/README b/deps/liboi/README deleted file mode 100644 index 0609ab4961..0000000000 --- a/deps/liboi/README +++ /dev/null @@ -1,36 +0,0 @@ -liboi is a C library for doing evented I/O. It is intended for building -efficent internet programs. - -liboi is released under the X11 license. - -= Feature Summary - - * The library has a minimalist design - - Does not make internal allocations - - Does not wrap functionality of GnuTLS or libev. The user must use those - libraries in conjuction with liboi. - * Supports both server and client sockets. - * Supports evented file I/O emulation through a thread pool. - * SSL support - * Sendfile (file to socket) with emulation on platforms that do not support - it. - -= Building - - 1 Edit config.mk. You almost certainly will need to set the EVDIR and - GNUTLSDIR variables. - 2 Run "make" - -= Documentation - - 1 make doc - 2 man ./oi.3 - -= Website - -http://github.com/ry/liboi - -= Author - -Ryan Dahl (ry@tinyclouds.org) - diff --git a/deps/liboi/config.mk b/deps/liboi/config.mk deleted file mode 100644 index b5d6fd0b82..0000000000 --- a/deps/liboi/config.mk +++ /dev/null @@ -1,24 +0,0 @@ -# Define EVDIR=/foo/bar if your libev header and library files are in -# /foo/bar/include and /foo/bar/lib directories. -EVDIR=$(HOME)/local/libev - -# Define GNUTLSDIR=/foo/bar if your gnutls header and library files are in -# /foo/bar/include and /foo/bar/lib directories. -#GNUTLSDIR=/usr - -uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') -uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') -uname_O := $(shell sh -c 'uname -o 2>/dev/null || echo not') -uname_R := $(shell sh -c 'uname -r 2>/dev/null || echo not') -uname_P := $(shell sh -c 'uname -p 2>/dev/null || echo not') - -# CFLAGS and LDFLAGS are for the users to override from the command line. -CFLAGS = -g -LDFLAGS = - -PREFIX = $(HOME)/local/liboi - -CC = gcc -AR = ar -RM = rm -f -RANLIB = ranlib diff --git a/deps/liboi/oi.h b/deps/liboi/oi.h deleted file mode 100644 index b4d7f0a3ec..0000000000 --- a/deps/liboi/oi.h +++ /dev/null @@ -1,6 +0,0 @@ -#ifndef oi_h -#define oi_h - -#include - -#endif diff --git a/deps/liboi/oi.pod b/deps/liboi/oi.pod deleted file mode 100644 index 8b07115ee6..0000000000 --- a/deps/liboi/oi.pod +++ /dev/null @@ -1,278 +0,0 @@ -=head1 NAME - -liboi - a C library for doing evented I/O. - -=head1 SYNOPSIS - - #include - -=head1 DESCRIPTION - -liboi is an object oriented library for doing evented socket and file I/O. -The API is mostly about registering callbacks to be executed on certain -events. - -Because most systems do not support asynchornous file I/O, the behavior is -emulated with an internal thread pool. The thread pool is accessed with the -C and C objects. Typically one will not need to use these -directly as C wraps that functionality. - -=head2 CONVENTIONS - -liboi's goal is to be very simple layer above the POSIX API. To that end it -avoids internal allocations as much as possible. Unless otherwise noted you -should assume all pointers passed into liboi will remain your responsibility -to maintain. That means you should not free the data passed into liboi -until the object in question has completed. - -C and C objects must be attached to an event loop. This -is completed with the C<*_attach> and C<*_detach> methods. When an object -is detached, other methods can be called - just the loop will not churn out -callbacks. - -Both C and C contain a number of callback pointers. -These are to be set manually after calling their initalization functions. -All classes include a C member which is left for you to use. - -=head1 ERROR HANDLING - - -=head1 Sockets - -The C structure represents a socket. -The callbacks inside C are - void (*on_connect) (oi_socket *); - void (*on_read) (oi_socket *, const void *buf, size_t count); - void (*on_drain) (oi_socket *); - void (*on_error) (oi_socket *, struct oi_error e); - void (*on_close) (oi_socket *); - void (*on_timeout) (oi_socket *); - -A the memory for a socket is released when the C callback is -made. That is, the user may free the memory for the socket with-in the -C callback. - -=over 4 - -=item void oi_socket_init (oi_socket *, float timeout); - -Initialize a socket. C is the number of seconds of inactivity that -is allowed before the socket closes. The timeout only starts once the -socket is attached to a loop and open. - -A C argument of 0.0 signals that no timeout should be used. - -After calling this function, register your callbacks manually. Thus your -code will probably look like this - oi_socket socket; - oi_socket_init(&socket, 60.0); - socket.on_connect = my_on_connect; - socket.on_read = my_on_read; - /* etc */ - - -=item int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo); - -Open a client connect to the specified address. When the connection is made -C will be called. - -Here is an example of filling in C for a local TCP connection on -port 5555: - struct addrinfo *servinfo; - struct addrinfo hints; - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - r = getaddrinfo(NULL, "5555", &hints, &servinfo); - assert(r == 0); - oi_socket_connect(socket, servinfo); - -=item void oi_socket_attach (oi_socket *, struct ev_loop *loop); - -A socket must be attached to a loop in order before any callbacks will be -made. - -=item void oi_socket_detach (oi_socket *); - -Detaching a socket will not close the connection. - -=item void oi_socket_read_start (oi_socket *); - -This will make the socket start receiving data. When data is received the -C callback is made. The maximum amount of data that can be -receive at a time is controlled by C. - -The buffer returned by C is statically allocated exists only -for the length of the callback. That means if you need to save any of the -data coming down the line, you must copy it to a new buffer. - -Ideally you will have a parser attached to the C callback which can -be interrupted at any time. - -C can be changed at any time. - -=item void oi_socket_read_stop (oi_socket *); - -Stops receiving data. You may receive spurious C attempts even -though the socket reading is stopped - be prepared to handle them. - -=item void oi_socket_reset_timeout (oi_socket *); - -Reset the timeout to allow the socket to exist for another few seconds -(however long you specified in the initialization function). - -=item void oi_socket_write (oi_socket *socket, oi_buf *buf); - -Write the I to the socket. Each socket has a queue of C objects -to be written - this appends the specified buffer to the end of that queue. -You will be notified when the queue is empty with the C -callback. When the socket has written the buffer C will be -called. The release callback does not imply that the buffer was successfully -written. - -=item void oi_socket_write_simple (oi_socket *, const char *str, size_t len); - -Sometimes you are just hacking around and need to quickly write a string to -the socket. This convenience function allocates an C object, and -Cs the given string. The allocated buffer will be freed by liboi -internally. - -Most production most applications will use their own memory pool and will -not need this function. - -=item void oi_socket_write_eof (oi_socket *); - -This closes the write end of the socket. Further writes are not allowed -after this. - -=item void oi_socket_close (oi_socket *); - -Attempts to close the socket. - -If the socket is secure, an SSL bye message will be sent. -SSL recommends that you wait for a bye response from the peer however this -tends to be overkill for most people. By default liboi will not wait for -peer to send a matching bye message. If you require this then set -C to 1. - -When the close is complete C is made. The C -callback is not made until the program returns to the event loop. This is -because C may free the socket memory and if C was -called from C, then the socket object might unexpectedly -be gone. To summarize: C does not call C and -the socket memory is still accessable immediately after making calling -C. - -=item void oi_socket_set_secure_session (oi_socket *, gnutls_session_t); - -This make a socket use SSL. You must create the GnuTLS session yourself and -assign its credentials. - -=back - -=head1 Servers - -A server simply listens on an address for new connections. The connections -come in the form of C objects. The key is to give a -C callback which returns an initialized C. -The callback looks like this - - oi_socket* (*on_connection) (oi_server *, struct sockaddr *remote_addr, socklen_t remove_addr_len); - -Returning NULL from C will reject the connection. - -=over 4 - -=item void oi_server_init (oi_server *, int backlog); - -Initializes a server object. C is the argument given -internally to C. Set the C callback -after calling this. - -=item int oi_server_listen (oi_server *, struct addrinfo *addrinfo); - -Listens on the specified address. The server will not accept connections -until it is attached to a loop, however. - -=item void oi_server_attach (oi_server *, struct ev_loop *loop); - -Attaches a server to a loop. - -=item void oi_server_detach (oi_server *); - -Detaches a server to a loop. Does not close the server. - -=item void oi_server_close (oi_server *); - -Stops the server from listening. - -=back - -=head1 Files - -Files internally use a thread pool to operate without blocking. -The thread pool is started once a file is attached and it continues until -program termination. - -The following callbacks are used inside of the file object - void (*on_open) (oi_file *); - void (*on_read) (oi_file *, size_t count); - void (*on_drain) (oi_file *); - void (*on_error) (oi_file *, struct oi_error); - void (*on_close) (oi_file *); - -=over 4 - -=item int oi_file_init (oi_file *); - -Initializes a file object. - -=item void oi_file_attach (oi_file *, struct ev_loop *); - -Attaches a file object to a loop. If the thread pool has not been started, -then it is started at this call. - -=item void oi_file_detach (oi_file *); - -Detaches a file object from the loop. - -=item int oi_file_open_path (oi_file *, const char *path, int flags, mode_t mode); - -Opens a file specified by the path. The C and C arguments are -the same as used by L. The C callback is triggered -when the file is opened. Returns 0 on success. Returns -1 if the given file -is already open. - -WARNING: path argument must be valid until C object is closed and -the C callback is made. I.E., liboi does not strdup the path -pointer. - -=item int oi_file_open_stdin (oi_file *); - -=item int oi_file_open_stdout (oi_file *); - -=item int oi_file_open_stderr (oi_file *); - -=item void oi_file_read_start (oi_file *, void *buffer, size_t bufsize); - - - -=item void oi_file_read_stop (oi_file *); - -=item int oi_file_write (oi_file *, oi_buf *to_write); - -=item int oi_file_write_simple (oi_file *, const char *, size_t); - -=item int oi_file_send (oi_file *source, oi_socket *destination, off_t offset, size_t length); - -=item void oi_file_close (oi_file *); - -=over 4 - -=back - -=head1 AUTHOR - -Ryan Dahl - diff --git a/deps/liboi/oi_buf.c b/deps/liboi/oi_buf.c deleted file mode 100644 index a192f7dc58..0000000000 --- a/deps/liboi/oi_buf.c +++ /dev/null @@ -1,41 +0,0 @@ -#include -#include -#include - -void oi_buf_destroy - ( oi_buf *buf - ) -{ - free(buf->base); - free(buf); -} - -oi_buf * oi_buf_new2 - ( size_t len - ) -{ - oi_buf *buf = malloc(sizeof(oi_buf)); - if(!buf) - return NULL; - buf->base = malloc(len); - if(!buf->base) { - free(buf); - return NULL; - } - buf->len = len; - buf->release = oi_buf_destroy; - return buf; -} - -oi_buf * oi_buf_new - ( const char *base - , size_t len - ) -{ - oi_buf *buf = oi_buf_new2(len); - if(!buf) - return NULL; - memcpy(buf->base, base, len); - return buf; -} - diff --git a/deps/liboi/oi_buf.h b/deps/liboi/oi_buf.h deleted file mode 100644 index b810c050e5..0000000000 --- a/deps/liboi/oi_buf.h +++ /dev/null @@ -1,30 +0,0 @@ -#include - -#ifndef oi_buf_h -#define oi_buf_h -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct oi_buf oi_buf; - -struct oi_buf { - /* public */ - char *base; - size_t len; - void (*release) (oi_buf *); /* called when oi is done with the object */ - void *data; - - /* private */ - size_t written; - oi_queue queue; -}; - -oi_buf * oi_buf_new (const char* base, size_t len); -oi_buf * oi_buf_new2 (size_t len); -void oi_buf_destroy (oi_buf *); - -#ifdef __cplusplus -} -#endif -#endif // oi_buf_h diff --git a/deps/liboi/oi_error.h b/deps/liboi/oi_error.h deleted file mode 100644 index 571a5701fd..0000000000 --- a/deps/liboi/oi_error.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef oi_error_h -#define oi_error_h -#ifdef __cplusplus -extern "C" { -#endif - -enum oi_error_domain - { OI_ERROR_GNUTLS - , OI_ERROR_EV - , OI_ERROR_CLOSE - , OI_ERROR_SHUTDOWN - , OI_ERROR_OPEN - , OI_ERROR_SEND - , OI_ERROR_RECV - , OI_ERROR_WRITE - , OI_ERROR_READ - , OI_ERROR_SENDFILE - }; - -struct oi_error { - enum oi_error_domain domain; - int code; /* errno */ -}; - -#ifdef __cplusplus -} -#endif -#endif // oi_error_h diff --git a/deps/liboi/oi_queue.h b/deps/liboi/oi_queue.h deleted file mode 100644 index 0405e336f5..0000000000 --- a/deps/liboi/oi_queue.h +++ /dev/null @@ -1,69 +0,0 @@ -/* Copyright (C) 2002-2009 Igor Sysoev - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS - * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ -#ifndef oi_queue_h -#define oi_queue_h -#ifdef __cplusplus -extern "C" { -#endif - -#include /* offsetof() */ - -typedef struct oi_queue oi_queue; -struct oi_queue { - oi_queue *prev; - oi_queue *next; -}; - -#define oi_queue_init(q) \ - (q)->prev = q; \ - (q)->next = q - -#define oi_queue_empty(h) \ - (h == (h)->prev) - -#define oi_queue_insert_head(h, x) \ - (x)->next = (h)->next; \ - (x)->next->prev = x; \ - (x)->prev = h; \ - (h)->next = x - -#define oi_queue_head(h) \ - (h)->next - -#define oi_queue_last(h) \ - (h)->prev - -#define oi_queue_remove(x) \ - (x)->next->prev = (x)->prev; \ - (x)->prev->next = (x)->next; \ - (x)->prev = NULL; \ - (x)->next = NULL - -#define oi_queue_data(q, type, link) \ - (type *) ((unsigned char *) q - offsetof(type, link)) - -#ifdef __cplusplus -} -#endif -#endif // oi_queue_h diff --git a/deps/liboi/oi_socket.c b/deps/liboi/oi_socket.c index 57dd3fa95d..47643f9926 100644 --- a/deps/liboi/oi_socket.c +++ b/deps/liboi/oi_socket.c @@ -1,3 +1,29 @@ +/* Copyright (c) 2008,2009 Ryan Dahl + * + * oi_queue comes from ngx_queue.h + * Copyright (C) 2002-2009 Igor Sysoev + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ #include #include #include @@ -21,9 +47,13 @@ #if HAVE_GNUTLS # include -# define GNUTLS_NEED_WRITE (gnutls_record_get_direction(socket->session) == 1) -# define GNUTLS_NEED_READ (gnutls_record_get_direction(socket->session) == 0) -#endif +#endif // HAVE_GNUTLS + +/* a few forwards + * they wont even be defined if not having gnutls + * */ +static int secure_full_goodbye (oi_socket *socket); +static int secure_half_goodbye (oi_socket *socket); #undef TRUE #define TRUE 1 @@ -36,30 +66,56 @@ #define AGAIN 1 #define ERROR 2 -#define RAISE_ERROR(s, _domain, _code) do { \ - if(s->on_error) { \ - struct oi_error __oi_error; \ - __oi_error.domain = _domain; \ - __oi_error.code = _code; \ - s->on_error(s, __oi_error); \ - } \ -} while(0) \ +void +oi_buf_destroy (oi_buf *buf) +{ + free(buf->base); + free(buf); +} + +oi_buf * +oi_buf_new2 (size_t len) +{ + oi_buf *buf = malloc(sizeof(oi_buf)); + if(!buf) + return NULL; + buf->base = malloc(len); + if(!buf->base) { + free(buf); + return NULL; + } + buf->len = len; + buf->release = oi_buf_destroy; + return buf; +} + +oi_buf * +oi_buf_new (const char *base, size_t len) +{ + oi_buf *buf = oi_buf_new2(len); + if(!buf) + return NULL; + memcpy(buf->base, base, len); + return buf; +} + +#define CLOSE_ASAP(socket) do { \ + if ((socket)->read_action) { \ + (socket)->read_action = full_close; \ + } \ + if ((socket)->write_action) { \ + (socket)->write_action = full_close; \ + } \ +} while (0) static int full_close(oi_socket *socket) { - if(-1 == close(socket->fd) && errno == EINTR) { - /* TODO fd still open. next loop call close again? */ - assert(0 && "implement me"); - return ERROR; - } + if (close(socket->fd) == -1) + return errno == EINTR ? AGAIN : ERROR; socket->read_action = NULL; socket->write_action = NULL; - - if(socket->attached) { - ev_feed_event(SOCKET_LOOP_ &socket->read_watcher, EV_READ); - } return OKAY; } @@ -67,43 +123,79 @@ static int half_close(oi_socket *socket) { int r = shutdown(socket->fd, SHUT_WR); - - if(r == -1) { - RAISE_ERROR(socket, OI_ERROR_SHUTDOWN, errno); + if (r == -1) { + socket->errorno = errno; + assert(0 && "Shouldn't get an error on shutdown"); return ERROR; } - socket->write_action = NULL; - - /* TODO set timer to zero so we get a callback soon */ return OKAY; } +// This is to be called when ever the out_stream is empty +// and we need to change state. static void -update_write_buffer_after_send(oi_socket *socket, ssize_t sent) +change_state_for_empty_out_stream (oi_socket *socket) +{ + /* + * a very complicated bunch of close logic! + * XXX this is awful. FIXME + */ + if (socket->got_half_close == FALSE) { + if (socket->got_full_close == FALSE) { + /* Normal situation. Didn't get any close signals. */ + ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); + } else { + /* Got Full Close. */ + if (socket->read_action) +#if HAVE_GNUTLS + socket->read_action = socket->secure ? secure_full_goodbye : full_close; +#else + socket->read_action = full_close; +#endif + + if (socket->write_action) +#if HAVE_GNUTLS + socket->write_action = socket->secure ? secure_full_goodbye : full_close; +#else + socket->write_action = full_close; +#endif + } + } else { + /* Got Half Close. */ + if (socket->write_action) +#if HAVE_GNUTLS + socket->write_action = socket->secure ? secure_half_goodbye : half_close; +#else + socket->write_action = half_close; +#endif + } +} + +static void +update_write_buffer_after_send (oi_socket *socket, ssize_t sent) { oi_queue *q = oi_queue_last(&socket->out_stream); oi_buf *to_write = oi_queue_data(q, oi_buf, queue); to_write->written += sent; socket->written += sent; - if(to_write->written == to_write->len) { + if (to_write->written == to_write->len) { oi_queue_remove(q); - if(to_write->release) { + if (to_write->release) { to_write->release(to_write); } - if(oi_queue_empty(&socket->out_stream)) { - ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); - if(socket->on_drain) + if (oi_queue_empty(&socket->out_stream)) { + change_state_for_empty_out_stream(socket); + if (socket->on_drain) socket->on_drain(socket); } } } - #if HAVE_GNUTLS static int secure_socket_send(oi_socket *socket); static int secure_socket_recv(oi_socket *socket); @@ -123,7 +215,7 @@ nosigpipe_push(gnutls_transport_ptr_t data, const void *buf, size_t len) #endif int r = send(socket->fd, buf, len, flags); - if(r == -1) { + if (r == -1) { gnutls_transport_set_errno(socket->session, errno); /* necessary ? */ } @@ -137,26 +229,25 @@ secure_handshake(oi_socket *socket) int r = gnutls_handshake(socket->session); - if(gnutls_error_is_fatal(r)) { - RAISE_ERROR(socket, OI_ERROR_GNUTLS, r); + if (gnutls_error_is_fatal(r)) { + socket->gnutls_errorno = r; return ERROR; } - if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) + if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN; oi_socket_reset_timeout(socket); - if(!socket->connected) { + if (!socket->connected) { socket->connected = TRUE; - if(socket->on_connect) - socket->on_connect(socket); + if (socket->on_connect) socket->on_connect(socket); } - if(socket->read_action) + if (socket->read_action) socket->read_action = secure_socket_recv; - if(socket->write_action) + if (socket->write_action) socket->write_action = secure_socket_send; return OKAY; @@ -167,7 +258,7 @@ secure_socket_send(oi_socket *socket) { ssize_t sent; - if(oi_queue_empty(&socket->out_stream)) { + if (oi_queue_empty(&socket->out_stream)) { ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); return AGAIN; } @@ -182,32 +273,23 @@ secure_socket_send(oi_socket *socket) , to_write->len - to_write->written ); - if(gnutls_error_is_fatal(sent)) { - RAISE_ERROR(socket, OI_ERROR_GNUTLS, sent); + if (gnutls_error_is_fatal(sent)) { + socket->gnutls_errorno = sent; return ERROR; } - if(sent == 0) + if (sent == 0) return AGAIN; oi_socket_reset_timeout(socket); - if(sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) { - if(GNUTLS_NEED_READ) { - if(socket->read_action) { - socket->read_action = secure_socket_send; - } else { - /* TODO GnuTLS needs read but already got EOF */ - assert(0 && "needs read but already got EOF"); - return ERROR; - } - } + if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) { return AGAIN; } - if(sent > 0) { + if (sent > 0) { /* make sure the callbacks are correct */ - if(socket->read_action) + if (socket->read_action) socket->read_action = secure_socket_recv; update_write_buffer_after_send(socket, sent); return OKAY; @@ -220,8 +302,8 @@ secure_socket_send(oi_socket *socket) static int secure_socket_recv(oi_socket *socket) { - char recv_buffer[TCP_MAXWIN]; - size_t recv_buffer_size = MIN(TCP_MAXWIN, socket->chunksize); + char recv_buffer[socket->chunksize]; + size_t recv_buffer_size = socket->chunksize; ssize_t recved; assert(socket->secure); @@ -230,22 +312,12 @@ secure_socket_recv(oi_socket *socket) //printf("secure socket recv %d %p\n", recved, socket->on_connect); - if(gnutls_error_is_fatal(recved)) { - RAISE_ERROR(socket, OI_ERROR_GNUTLS, recved); + if (gnutls_error_is_fatal(recved)) { + socket->gnutls_errorno = recved; return ERROR; } - if(recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) { - if(GNUTLS_NEED_WRITE) { - if(socket->write_action) { - printf("need write\n"); - socket->write_action = secure_socket_recv; - } else { - /* TODO GnuTLS needs send but already closed write end */ - assert(0 && "needs read but cannot"); - return ERROR; - } - } + if (recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) { return AGAIN; } @@ -254,27 +326,27 @@ secure_socket_recv(oi_socket *socket) /* A server may also receive GNUTLS_E_REHANDSHAKE when a client has * initiated a handshake. In that case the server can only initiate a * handshake or terminate the connection. */ - if(recved == GNUTLS_E_REHANDSHAKE) { - if(socket->write_action) { + if (recved == GNUTLS_E_REHANDSHAKE) { + if (socket->write_action) { socket->read_action = secure_handshake; socket->write_action = secure_handshake; return OKAY; } else { - /* TODO */ - assert(0 && "needs read but cannot"); + socket->read_action = full_close; + // set error return ERROR; } } - if(recved >= 0) { + if (recved >= 0) { /* Got EOF */ - if(recved == 0) + if (recved == 0) socket->read_action = NULL; - if(socket->write_action) + if (socket->write_action) socket->write_action = secure_socket_send; - if(socket->on_read) { socket->on_read(socket, recv_buffer, recved); } + if (socket->on_read) { socket->on_read(socket, recv_buffer, recved); } return OKAY; } @@ -284,51 +356,46 @@ secure_socket_recv(oi_socket *socket) } static int -secure_goodbye(oi_socket *socket, gnutls_close_request_t how) +secure_full_goodbye (oi_socket *socket) { assert(socket->secure); - int r = gnutls_bye(socket->session, how); + int r = gnutls_bye(socket->session, GNUTLS_SHUT_RDWR); - if(gnutls_error_is_fatal(r)) { - RAISE_ERROR(socket, OI_ERROR_GNUTLS, r); + if (gnutls_error_is_fatal(r)) { + socket->gnutls_errorno = r; return ERROR; } - if(r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) + if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN; + CLOSE_ASAP(socket); + return OKAY; } static int -secure_full_goodbye(oi_socket *socket) +secure_half_goodbye (oi_socket *socket) { - int r = secure_goodbye(socket, GNUTLS_SHUT_RDWR); - if(OKAY == r) { - return full_close(socket); - } - return r; -} + assert(socket->secure); -static int -secure_half_goodbye(oi_socket *socket) -{ - int r = secure_goodbye(socket, GNUTLS_SHUT_WR); - if(OKAY == r) { - return half_close(socket); + int r = gnutls_bye(socket->session, GNUTLS_SHUT_WR); + + if (gnutls_error_is_fatal(r)) { + socket->gnutls_errorno = r; + return ERROR; } - return r; + + if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) + return AGAIN; + + if (socket->write_action) + socket->write_action = half_close; + + return OKAY; } -/* Tells the socket to use transport layer security (SSL). liboi does not - * want to make any decisions about security requirements, so the - * majoirty of GnuTLS configuration is left to the user. Only the transport - * layer of GnuTLS is controlled by liboi. - * - * That is, do not use gnutls_transport_* functions. - * Do use the rest of GnuTLS's API. - */ void oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session) { @@ -338,13 +405,13 @@ oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session) #endif /* HAVE GNUTLS */ static int -socket_send(oi_socket *socket) +socket_send (oi_socket *socket) { ssize_t sent; assert(socket->secure == FALSE); - if(oi_queue_empty(&socket->out_stream)) { + if (oi_queue_empty(&socket->out_stream)) { ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); return AGAIN; } @@ -368,32 +435,36 @@ socket_send(oi_socket *socket) , flags ); - if(sent < 0) { - switch(errno) { + if (sent < 0) { + switch (errno) { +#ifdef EWOULDBLOCK + case EWOULDBLOCK: +#else case EAGAIN: +#endif return AGAIN; case ECONNREFUSED: + socket->errorno = errno; + return ERROR; + case ECONNRESET: - socket->write_action = NULL; - /* TODO maybe just clear write buffer instead of error? - * They should be able to read still from the socket. - */ - RAISE_ERROR(socket, OI_ERROR_SEND, errno); + socket->errorno = errno; return ERROR; default: perror("send()"); assert(0 && "oi shouldn't let this happen."); + socket->errorno = errno; return ERROR; } } oi_socket_reset_timeout(socket); - if(!socket->connected) { + if (!socket->connected) { socket->connected = TRUE; - if(socket->on_connect) { socket->on_connect(socket); } + if (socket->on_connect) socket->on_connect(socket); } update_write_buffer_after_send(socket, sent); @@ -402,7 +473,7 @@ socket_send(oi_socket *socket) } static int -socket_recv(oi_socket *socket) +socket_recv (oi_socket *socket) { char buf[TCP_MAXWIN]; size_t buf_size = TCP_MAXWIN; @@ -410,28 +481,30 @@ socket_recv(oi_socket *socket) assert(socket->secure == FALSE); - if(!socket->connected) { + if (!socket->connected) { socket->connected = TRUE; - if(socket->on_connect) { socket->on_connect(socket); } + if (socket->on_connect) socket->on_connect(socket); return OKAY; } recved = recv(socket->fd, buf, buf_size, 0); - if(recved < 0) { - switch(errno) { - case EAGAIN: + if (recved < 0) { + switch (errno) { +#ifdef EWOULDBLOCK + case EWOULDBLOCK: +#else + case EAGAIN: +#endif + return AGAIN; + case EINTR: return AGAIN; /* A remote host refused to allow the network connection (typically * because it is not running the requested service). */ case ECONNREFUSED: - RAISE_ERROR(socket, OI_ERROR_RECV, errno); - return ERROR; - - case ECONNRESET: - RAISE_ERROR(socket, OI_ERROR_RECV, errno); + socket->errorno = errno; return ERROR; default: @@ -444,19 +517,19 @@ socket_recv(oi_socket *socket) oi_socket_reset_timeout(socket); - if(recved == 0) { + if (recved == 0) { oi_socket_read_stop(socket); socket->read_action = NULL; } /* NOTE: EOF is signaled with recved == 0 on callback */ - if(socket->on_read) { socket->on_read(socket, buf, recved); } + if (socket->on_read) { socket->on_read(socket, buf, recved); } return OKAY; } static void -assign_file_descriptor(oi_socket *socket, int fd) +assign_file_descriptor (oi_socket *socket, int fd) { socket->fd = fd; @@ -467,7 +540,7 @@ assign_file_descriptor(oi_socket *socket, int fd) socket->write_action = socket_send; #if HAVE_GNUTLS - if(socket->secure) { + if (socket->secure) { gnutls_transport_set_lowat(socket->session, 0); gnutls_transport_set_push_function(socket->session, nosigpipe_push); gnutls_transport_set_ptr2 ( socket->session @@ -485,7 +558,7 @@ assign_file_descriptor(oi_socket *socket, int fd) * Called by server->connection_watcher. */ static void -on_connection(EV_P_ ev_io *watcher, int revents) +on_connection (EV_P_ ev_io *watcher, int revents) { oi_server *server = watcher->data; @@ -497,7 +570,7 @@ on_connection(EV_P_ ev_io *watcher, int revents) #endif assert(&server->connection_watcher == watcher); - if(EV_ERROR & revents) { + if (EV_ERROR & revents) { oi_server_close(server); return; } @@ -507,23 +580,23 @@ on_connection(EV_P_ ev_io *watcher, int revents) /* TODO accept all possible connections? currently: just one */ int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len); - if(fd < 0) { + if (fd < 0) { perror("accept()"); return; } oi_socket *socket = NULL; - if(server->on_connection) + if (server->on_connection) socket = server->on_connection(server, (struct sockaddr*)&address, addr_len); - if(socket == NULL) { + if (socket == NULL) { close(fd); return; } int flags = fcntl(fd, F_GETFL, 0); int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - if(r < 0) { + if (r < 0) { /* TODO error report */ } @@ -541,21 +614,20 @@ int oi_server_listen(oi_server *server, struct addrinfo *addrinfo) { int fd = -1; - struct linger ling = {0, 0}; assert(server->listening == FALSE); fd = socket( addrinfo->ai_family , addrinfo->ai_socktype , addrinfo->ai_protocol ); - if(fd < 0) { + if (fd < 0) { perror("socket()"); return -1; } int flags = fcntl(fd, F_GETFL, 0); int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - if(r < 0) { + if (r < 0) { perror("fcntl()"); return -1; } @@ -563,7 +635,6 @@ oi_server_listen(oi_server *server, struct addrinfo *addrinfo) flags = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); - setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); /* XXX: Sending single byte chunks in a response body? Perhaps there is a * need to enable the Nagel algorithm dynamically. For now disabling. @@ -596,7 +667,7 @@ oi_server_listen(oi_server *server, struct addrinfo *addrinfo) void oi_server_close(oi_server *server) { - if(server->listening) { + if (server->listening) { oi_server_detach(server); close(server->fd); /* TODO do this on the loop? check return value? */ @@ -647,23 +718,20 @@ on_timeout(EV_P_ ev_timer *watcher, int revents) assert(watcher == &socket->timeout_watcher); - // printf("on_timeout\n"); - - if(socket->on_timeout) { socket->on_timeout(socket); } - + // printf("on_timeout\n"); - /* TODD set timer to zero */ - full_close(socket); + if (socket->on_timeout) { socket->on_timeout(socket); } + // timeout does not automatically kill your connection. you must! } static void release_write_buffer(oi_socket *socket) { - while(!oi_queue_empty(&socket->out_stream)) { + while (!oi_queue_empty(&socket->out_stream)) { oi_queue *q = oi_queue_last(&socket->out_stream); oi_buf *buf = oi_queue_data(q, oi_buf, queue); oi_queue_remove(q); - if(buf->release) { buf->release(buf); } + if (buf->release) { buf->release(buf); } } } @@ -673,57 +741,55 @@ on_io_event(EV_P_ ev_io *watcher, int revents) { oi_socket *socket = watcher->data; - if(revents & EV_ERROR) { - RAISE_ERROR(socket, OI_ERROR_EV, 0); - goto close; + if (revents & EV_ERROR) { + socket->errorno = 1; + CLOSE_ASAP(socket); } int r; - int have_read_event = TRUE; - int have_write_event = TRUE; + int have_read_event = (socket->read_action != NULL); + int have_write_event = (socket->write_action != NULL); - while(have_read_event || have_write_event) { - - if(socket->read_action) { - r = socket->read_action(socket); - if(r == ERROR) goto close; - if(r == AGAIN) have_read_event = FALSE; - } else { + while (have_read_event || have_write_event) { + /* RECV LOOP - TRY TO CLEAR THE BUFFER */ + if (socket->read_action == NULL) have_read_event = FALSE; - } + else { + r = socket->read_action(socket); - if(socket->write_action) { - r = socket->write_action(socket); - if(r == ERROR) goto close; - if(r == AGAIN) have_write_event = FALSE; - } else { - have_write_event = FALSE; + if (r == AGAIN) + have_read_event = FALSE; + else if (r == ERROR) + CLOSE_ASAP(socket); } - - if(socket->read_watcher.active == FALSE) - have_read_event = FALSE; - if(socket->write_watcher.active == FALSE) + /* SEND LOOP - TRY TO CLEAR THE BUFFER */ + if (socket->write_action == NULL) have_write_event = FALSE; - } - - if(socket->write_action == NULL && socket->read_action == NULL) - goto close; + else { + r = socket->write_action(socket); - return; + if (r == AGAIN) + have_write_event = FALSE; + else if (r == ERROR) + CLOSE_ASAP(socket); + } + } -close: - release_write_buffer(socket); + // Close when both sides of the stream are closed. + if (socket->write_action == NULL && socket->read_action == NULL) { + release_write_buffer(socket); - ev_clear_pending (EV_A_ &socket->write_watcher); - ev_clear_pending (EV_A_ &socket->read_watcher); - ev_clear_pending (EV_A_ &socket->timeout_watcher); + ev_clear_pending (EV_A_ &socket->write_watcher); + ev_clear_pending (EV_A_ &socket->read_watcher); + ev_clear_pending (EV_A_ &socket->timeout_watcher); - oi_socket_detach(socket); + oi_socket_detach(socket); - if(socket->on_close) { socket->on_close(socket); } - /* WARNING: user can free socket in on_close so no more - * access beyond this point. */ + if (socket->on_close) { socket->on_close(socket); } + /* WARNING: user can free socket in on_close so no more + * access beyond this point. */ + } } /** @@ -752,13 +818,17 @@ oi_socket_init(oi_socket *socket, float timeout) ev_init(&socket->read_watcher, on_io_event); socket->read_watcher.data = socket; + socket->got_full_close = FALSE; + socket->got_half_close = FALSE; + + socket->errorno = 0; + socket->secure = FALSE; - socket->wait_for_secure_hangup = FALSE; #if HAVE_GNUTLS + socket->gnutls_errorno = 0; socket->session = NULL; #endif - /* TODO higher resolution timer */ ev_timer_init(&socket->timeout_watcher, on_timeout, 0., timeout); socket->timeout_watcher.data = socket; @@ -769,90 +839,50 @@ oi_socket_init(oi_socket *socket, float timeout) socket->on_connect = NULL; socket->on_read = NULL; socket->on_drain = NULL; - socket->on_error = NULL; socket->on_timeout = NULL; } void -oi_socket_write_eof (oi_socket *socket) +oi_socket_close (oi_socket *socket) { -#if HAVE_GNUTLS - /* try to hang up properly for secure connections */ - if(socket->secure) - { - if( socket->connected /* completed handshake */ - && socket->write_action /* write end is open */ - ) - { - socket->write_action = secure_half_goodbye; - if(socket->attached) - ev_io_start(SOCKET_LOOP_ &socket->write_watcher); - return; - } - /* secure servers cannot handle half-closed connections? */ - full_close(socket); - return; - } -#endif // HAVE_GNUTLS - - if(socket->write_action) - half_close(socket); - else - full_close(socket); + socket->got_half_close = TRUE; + if (oi_queue_empty(&socket->out_stream)) + change_state_for_empty_out_stream(socket); } void -oi_socket_close (oi_socket *socket) +oi_socket_full_close (oi_socket *socket) { -#if HAVE_GNUTLS - /* try to hang up properly for secure connections */ - if( socket->secure - && socket->connected /* completed handshake */ - && socket->write_action /* write end is open */ - ) - { - if(socket->wait_for_secure_hangup && socket->read_action) { - socket->write_action = secure_full_goodbye; - socket->read_action = secure_full_goodbye; - } else { - socket->write_action = secure_half_goodbye; - socket->read_action = NULL; - } - - if(socket->attached) - ev_io_start(SOCKET_LOOP_ &socket->write_watcher); - - return; - } -#endif // HAVE_GNUTLS - - full_close(socket); + socket->got_full_close = TRUE; + if (oi_queue_empty(&socket->out_stream)) + change_state_for_empty_out_stream(socket); } -/* - * Resets the timeout to stay alive for another socket->timeout seconds - */ -void -oi_socket_reset_timeout(oi_socket *socket) +void oi_socket_force_close (oi_socket *socket) { - ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher); + // socket->errorno = OI_SOCKET_ERROR_FORCE_CLOSE + CLOSE_ASAP(socket); } -/** - * Writes a string to the socket. This is actually sets a watcher which may - * take multiple iterations to write the entire string. - */ void oi_socket_write(oi_socket *socket, oi_buf *buf) { - if(socket->write_action == NULL) - return; + assert(socket->write_action != NULL && "Do not write to a closed socket"); + assert(socket->got_full_close == FALSE && "Do not write to a closing socket"); + assert(socket->got_half_close == FALSE && "Do not write to a closing socket"); oi_queue_insert_head(&socket->out_stream, &buf->queue); - buf->written = 0; - // XXX if (socket->attached) ?? - ev_io_start(SOCKET_LOOP_ &socket->write_watcher); + + if (socket->attached) { + ev_io_start(SOCKET_LOOP_ &socket->write_watcher); + } +} + +void +oi_socket_reset_timeout(oi_socket *socket) +{ + ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher); } static void @@ -886,20 +916,17 @@ oi_socket_attach(EV_P_ oi_socket *socket) ev_timer_again(EV_A_ &socket->timeout_watcher); - if(socket->read_action) + if (socket->read_action) ev_io_start(EV_A_ &socket->read_watcher); - if(socket->write_action) + if (socket->write_action) ev_io_start(EV_A_ &socket->write_watcher); - - /* make sure the io_event happens soon in the case we're being reattached */ - ev_feed_event(EV_A_ &socket->read_watcher, EV_READ); } void oi_socket_detach(oi_socket *socket) { - if(socket->attached) { + if (socket->attached) { ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); ev_io_stop(SOCKET_LOOP_ &socket->read_watcher); ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher); @@ -920,7 +947,7 @@ oi_socket_read_stop (oi_socket *socket) void oi_socket_read_start (oi_socket *socket) { - if(socket->read_action) { + if (socket->read_action) { ev_io_start(SOCKET_LOOP_ &socket->read_watcher); /* XXX feed event? */ } @@ -933,14 +960,14 @@ oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo) , addrinfo->ai_socktype , addrinfo->ai_protocol ); - if(fd < 0) { + if (fd < 0) { perror("socket()"); return -1; } int flags = fcntl(fd, F_GETFL, 0); int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - if(r < 0) { + if (r < 0) { perror("fcntl()"); return -1; } @@ -955,7 +982,7 @@ oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo) , addrinfo->ai_addrlen ); - if(r < 0 && errno != EINPROGRESS) { + if (r < 0 && errno != EINPROGRESS) { perror("connect"); close(fd); return -1; diff --git a/deps/liboi/oi_socket.h b/deps/liboi/oi_socket.h index 2298c633b1..878ebffbe5 100644 --- a/deps/liboi/oi_socket.h +++ b/deps/liboi/oi_socket.h @@ -1,8 +1,32 @@ +/* Copyright (c) 2008,2009 Ryan Dahl + * + * oi_queue comes from ngx_queue.h + * Copyright (C) 2002-2009 Igor Sysoev + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ #include #include -#include -#include -#include +#include /* offsetof() */ #ifndef oi_socket_h #define oi_socket_h @@ -16,10 +40,48 @@ extern "C" { #if HAVE_GNUTLS # include #endif +typedef struct oi_queue oi_queue; +struct oi_queue { + oi_queue *prev; + oi_queue *next; +}; + +#define oi_queue_init(q) \ + (q)->prev = q; \ + (q)->next = q + +#define oi_queue_empty(h) \ + (h == (h)->prev) + +#define oi_queue_insert_head(h, x) \ + (x)->next = (h)->next; \ + (x)->next->prev = x; \ + (x)->prev = h; \ + (h)->next = x + +#define oi_queue_head(h) \ + (h)->next + +#define oi_queue_last(h) \ + (h)->prev + +#define oi_queue_remove(x) \ + (x)->next->prev = (x)->prev; \ + (x)->prev->next = (x)->next; \ + (x)->prev = NULL; \ + (x)->next = NULL +#define oi_queue_data(q, type, link) \ + (type *) ((unsigned char *) q - offsetof(type, link)) + +typedef struct oi_buf oi_buf; typedef struct oi_server oi_server; typedef struct oi_socket oi_socket; +oi_buf * oi_buf_new (const char* base, size_t len); +oi_buf * oi_buf_new2 (size_t len); +void oi_buf_destroy (oi_buf *); + void oi_server_init (oi_server *, int backlog); int oi_server_listen (oi_server *, struct addrinfo *addrinfo); void oi_server_attach (EV_P_ oi_server *); @@ -27,21 +89,65 @@ void oi_server_detach (oi_server *); void oi_server_close (oi_server *); void oi_socket_init (oi_socket *, float timeout); - int oi_socket_pair (oi_socket *a, oi_socket *b); /* TODO */ int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo); void oi_socket_attach (EV_P_ oi_socket *); void oi_socket_detach (oi_socket *); void oi_socket_read_start (oi_socket *); void oi_socket_read_stop (oi_socket *); + +/* Resets the timeout to stay alive for another socket->timeout seconds + */ void oi_socket_reset_timeout (oi_socket *); + +/* Writes a buffer to the socket. + * (Do not send a NULL oi_buf or a buffer with oi_buf->base == NULL.) + */ void oi_socket_write (oi_socket *, oi_buf *); + void oi_socket_write_simple (oi_socket *, const char *str, size_t len); -void oi_socket_write_eof (oi_socket *); + +/* Once the write buffer is drained, oi_socket_close will shutdown the + * writing end of the socket and will close the read end once the server + * replies with an EOF. + */ void oi_socket_close (oi_socket *); + +/* Do not wait for the server to reply with EOF. This will only be called + * once the write buffer is drained. + * Warning: For TCP socket, the OS kernel may (should) reply with RST + * packets if this is called when data is still being received from the + * server. + */ +void oi_socket_full_close (oi_socket *); + +/* The most extreme measure. + * Will not wait for the write queue to complete. + */ +void oi_socket_force_close (oi_socket *); + + #if HAVE_GNUTLS +/* Tells the socket to use transport layer security (SSL). oi_socket does + * not want to make any decisions about security requirements, so the + * majoirty of GnuTLS configuration is left to the user. Only the transport + * layer of GnuTLS is controlled by oi_socket. That is, do not use + * gnutls_transport_* functions. Do use the rest of GnuTLS's API. + */ void oi_socket_set_secure_session (oi_socket *, gnutls_session_t); #endif +struct oi_buf { + /* public */ + char *base; + size_t len; + void (*release) (oi_buf *); /* called when oi is done with the object */ + void *data; + + /* private */ + size_t written; + oi_queue queue; +}; + struct oi_server { /* read only */ int fd; @@ -57,7 +163,7 @@ struct oi_server { /* public */ oi_socket* (*on_connection) (oi_server *, struct sockaddr *remote_addr, socklen_t remove_addr_len); - void (*on_error) (oi_server *, struct oi_error e); + void (*on_error) (oi_server *); void *data; }; @@ -73,12 +179,19 @@ struct oi_socket { unsigned attached:1; unsigned connected:1; unsigned secure:1; - unsigned wait_for_secure_hangup:1; + unsigned got_full_close:1; + unsigned got_half_close:1; - /* if these are NULL then it means that end of the socket is closed. */ + /* NULL = that end of the socket is closed. */ int (*read_action) (oi_socket *); int (*write_action) (oi_socket *); + /* ERROR CODES. 0 = no error. Check on_close. */ + int errorno; +#if HAVE_GNUTLS + int gnutls_errorno; +#endif + /* private */ ev_io write_watcher; ev_io read_watcher; @@ -92,7 +205,6 @@ struct oi_socket { void (*on_connect) (oi_socket *); void (*on_read) (oi_socket *, const void *buf, size_t count); void (*on_drain) (oi_socket *); - void (*on_error) (oi_socket *, struct oi_error e); void (*on_close) (oi_socket *); void (*on_timeout) (oi_socket *); void *data; diff --git a/deps/liboi/test/common.c b/deps/liboi/test/common.c index 4703b402fd..9003152299 100644 --- a/deps/liboi/test/common.c +++ b/deps/liboi/test/common.c @@ -11,7 +11,7 @@ #include -#include +#include #include #define HOST "127.0.0.1" @@ -23,8 +23,10 @@ int nconnections; static void on_peer_close(oi_socket *socket) { + assert(socket->errorno == 0); //printf("server connection closed\n"); #if HAVE_GNUTLS + assert(socket->gnutls_errorno == 0); #if SECURE gnutls_deinit(socket->session); #endif @@ -46,12 +48,6 @@ on_peer_timeout(oi_socket *socket) assert(0); } -static void -on_client_error(oi_socket *socket, struct oi_error e) -{ - assert(0); -} - #if HAVE_GNUTLS #if SECURE diff --git a/deps/liboi/test/connection_interruption.c b/deps/liboi/test/connection_interruption.c index 7dd659194c..bcc0df532d 100644 --- a/deps/liboi/test/connection_interruption.c +++ b/deps/liboi/test/connection_interruption.c @@ -18,13 +18,6 @@ on_peer_drain(oi_socket *socket) oi_socket_close(socket); } -static void -on_peer_error2(oi_socket *socket, struct oi_error e) -{ - if(e.domain == OI_ERROR_GNUTLS) return; - assert(0); -} - static oi_socket* on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len) { @@ -32,7 +25,6 @@ on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len) oi_socket_init(socket, TIMEOUT); socket->on_read = on_peer_read; socket->on_drain = on_peer_drain; - socket->on_error = on_peer_error2; socket->on_close = on_peer_close; socket->on_timeout = on_peer_timeout; @@ -51,7 +43,7 @@ static void on_client_connect(oi_socket *socket) { //printf("on client connection\n"); - oi_socket_write_eof(socket); + oi_socket_close(socket); } static void @@ -69,13 +61,18 @@ on_client_close(oi_socket *socket) static void on_client_read(oi_socket *socket, const void *base, size_t len) { + if (len == 0) { + oi_socket_close(socket); + return; + } + char buf[200000]; strncpy(buf, base, len); buf[len] = 0; //printf("client got message: %s\n", buf); - if(strcmp(buf, "BYE") == 0) { + if (strcmp(buf, "BYE") == 0) { oi_socket_close(socket); } else { assert(0); @@ -132,7 +129,6 @@ main(int argc, const char *argv[]) oi_socket *client = malloc(sizeof(oi_socket)); oi_socket_init(client, TIMEOUT); client->on_read = on_client_read; - client->on_error = on_client_error; client->on_connect = on_client_connect; client->on_close = on_client_close; client->on_timeout = on_client_timeout; diff --git a/deps/liboi/test/echo.c b/deps/liboi/test/echo.c index 69cd430ea4..7b7a4068ec 100644 --- a/deps/liboi/test/echo.c +++ b/deps/liboi/test/echo.c @@ -1,5 +1,8 @@ #include "test/common.c" +// timeout must match the timeout in timeout.rb +#define TIMEOUT 5.0 + int successful_ping_count; static void @@ -11,19 +14,12 @@ on_peer_read(oi_socket *socket, const void *base, size_t len) oi_socket_write_simple(socket, base, len); } -static void -on_peer_error(oi_socket *socket, struct oi_error e) -{ - assert(0); -} - static oi_socket* on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len) { oi_socket *socket = malloc(sizeof(oi_socket)); - oi_socket_init(socket, 5.0); + oi_socket_init(socket, TIMEOUT); socket->on_read = on_peer_read; - socket->on_error = on_peer_error; socket->on_close = on_peer_close; socket->on_timeout = on_peer_timeout; diff --git a/deps/liboi/test/ping_pong.c b/deps/liboi/test/ping_pong.c index 26b3f689d2..08ffabecfa 100644 --- a/deps/liboi/test/ping_pong.c +++ b/deps/liboi/test/ping_pong.c @@ -2,15 +2,18 @@ #define PING "PING" #define PONG "PONG" -#define EXCHANGES 100 +#define EXCHANGES 500 +#define TIMEOUT 5.0 int successful_ping_count; static void on_peer_read(oi_socket *socket, const void *base, size_t len) { - if(len == 0) + if (len == 0) { + oi_socket_close(socket); return; + } char buf[2000]; strncpy(buf, base, len); @@ -20,12 +23,6 @@ on_peer_read(oi_socket *socket, const void *base, size_t len) oi_socket_write_simple(socket, PONG, sizeof PONG); } -static void -on_peer_error(oi_socket *socket, struct oi_error e) -{ - assert(0); -} - static void on_client_close(oi_socket *socket) { @@ -37,9 +34,8 @@ static oi_socket* on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len) { oi_socket *socket = malloc(sizeof(oi_socket)); - oi_socket_init(socket, 5.0); + oi_socket_init(socket, TIMEOUT); socket->on_read = on_peer_read; - socket->on_error = on_peer_error; socket->on_close = on_peer_close; socket->on_timeout = on_peer_timeout; @@ -57,15 +53,20 @@ on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len) } static void -on_client_connect(oi_socket *socket) +on_client_connect (oi_socket *socket) { //printf("client connected. sending ping\n"); oi_socket_write_simple(socket, PING, sizeof PING); } static void -on_client_read(oi_socket *socket, const void *base, size_t len) +on_client_read (oi_socket *socket, const void *base, size_t len) { + if(len == 0) { + oi_socket_close(socket); + return; + } + char buf[200000]; strncpy(buf, base, len); buf[len] = 0; @@ -134,9 +135,8 @@ main(int argc, const char *argv[]) assert(r == 0); oi_server_attach(EV_DEFAULT_ &server); - oi_socket_init(&client, 5.0); + oi_socket_init(&client, TIMEOUT); client.on_read = on_client_read; - client.on_error = on_client_error; client.on_connect = on_client_connect; client.on_close = on_client_close; client.on_timeout = on_client_timeout; diff --git a/src/net.cc b/src/net.cc index e1a8e8643d..20119b02d3 100644 --- a/src/net.cc +++ b/src/net.cc @@ -26,7 +26,7 @@ using namespace node; #define CONNECT_SYMBOL String::NewSymbol("connect") #define ENCODING_SYMBOL String::NewSymbol("encoding") #define TIMEOUT_SYMBOL String::NewSymbol("timeout") -#define SERVER_SYMBOL String::NewSymbol("server") +#define SERVER_SYMBOL String::NewSymbol("server") #define PROTOCOL_SYMBOL String::NewSymbol("protocol") #define PROTOCOL_CLASS_SYMBOL String::NewSymbol("protocol_class") @@ -55,9 +55,10 @@ Connection::Initialize (v8::Handle target) constructor_template->InstanceTemplate()->SetInternalFieldCount(1); NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "connect", v8Connect); - NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "close", v8Close); NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "send", v8Send); - NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "sendEOF", v8SendEOF); + NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "close", v8Close); + NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "fullClose", v8FullClose); + NODE_SET_METHOD(constructor_template->PrototypeTemplate(), "forceClose", v8ForceClose); target->Set(String::NewSymbol("TCPConnection"), constructor_template->GetFunction()); } @@ -97,12 +98,17 @@ Connection::Connection (Handle handle, Handle protocol_class) socket_.on_connect = Connection::on_connect; socket_.on_read = Connection::on_read; socket_.on_drain = Connection::on_drain; - socket_.on_error = Connection::on_error; socket_.on_close = Connection::on_close; socket_.on_timeout = Connection::on_timeout; socket_.data = this; } +Connection::~Connection () +{ + handle_->Delete(SEND_SYMBOL); + Close(); +} + Local Connection::GetProtocol (void) { @@ -207,9 +213,7 @@ Connection::AfterResolve (eio_req *req) return 0; } - - oi_error e; // TODO better error! - connection->OnError(e); + connection->OnDisconnect(); return 0; } @@ -224,15 +228,31 @@ Connection::v8Close (const Arguments& args) } Handle -Connection::v8Send (const Arguments& args) +Connection::v8FullClose (const Arguments& args) { HandleScope scope; Connection *connection = NODE_UNWRAP(Connection, args.Holder()); + connection->FullClose(); + return Undefined(); +} - if (args[0] == Null()) { - oi_socket_write_eof(&connection->socket_); +Handle +Connection::v8ForceClose (const Arguments& args) +{ + HandleScope scope; + Connection *connection = NODE_UNWRAP(Connection, args.Holder()); + connection->ForceClose(); + return Undefined(); +} + + +Handle +Connection::v8Send (const Arguments& args) +{ + HandleScope scope; + Connection *connection = NODE_UNWRAP(Connection, args.Holder()); - } else if (args[0]->IsString()) { + if (args[0]->IsString()) { // utf8 encoding Local s = args[0]->ToString(); size_t length = s->Utf8Length(); @@ -256,15 +276,6 @@ Connection::v8Send (const Arguments& args) return Undefined(); } -Handle -Connection::v8SendEOF (const Arguments& args) -{ - HandleScope scope; - Connection *connection = NODE_UNWRAP(Connection, args.Holder()); - connection->SendEOF(); - return Undefined(); -} - void Connection::OnReceive (const void *buf, size_t len) { @@ -305,18 +316,6 @@ Connection::OnReceive (const void *buf, size_t len) fatal_exception(try_catch); // XXX is this the right action to take? } -void -Connection::OnError (oi_error e) -{ - HandleScope scope; - Local protocol = GetProtocol(); - Local callback_v = protocol->Get(ON_ERROR_SYMBOL); - if (!callback_v->IsFunction()) return; - Handle callback = Handle::Cast(callback_v); - // TODO call with error arg - callback->Call(protocol, 0, NULL); -} - #define DEFINE_SIMPLE_CALLBACK(name, symbol) \ void name () \ { \ @@ -372,7 +371,6 @@ Acceptor::Acceptor (Handle handle, Handle protocol_class, Han oi_server_init(&server_, backlog); server_.on_connection = Acceptor::on_connection; - server_.on_error = Acceptor::on_error; server_.data = this; } @@ -398,17 +396,6 @@ Acceptor::OnConnection (struct sockaddr *addr, socklen_t len) return connection; } -void -Acceptor::OnError (struct oi_error error) -{ - HandleScope scope; - - Local callback_v = handle_->Get(ON_ERROR_SYMBOL); - if (!callback_v->IsFunction()) return; - Local callback = Local::Cast(callback_v); - callback->Call(handle_, 0, NULL); // TODO args -} - Handle Acceptor::v8New (const Arguments& args) { diff --git a/src/net.h b/src/net.h index 5cf9b0a693..e4b03a599b 100644 --- a/src/net.h +++ b/src/net.h @@ -19,16 +19,18 @@ protected: static v8::Handle v8New (const v8::Arguments& args); static v8::Handle v8Connect (const v8::Arguments& args); static v8::Handle v8Send (const v8::Arguments& args); - static v8::Handle v8SendEOF (const v8::Arguments& args); static v8::Handle v8Close (const v8::Arguments& args); + static v8::Handle v8FullClose (const v8::Arguments& args); + static v8::Handle v8ForceClose (const v8::Arguments& args); Connection (v8::Handle handle, v8::Handle protocol_class); - virtual ~Connection () { Close(); } + virtual ~Connection (); int Connect (struct addrinfo *address) { return oi_socket_connect (&socket_, address); } - void Send (oi_buf *buf) { oi_socket_write (&socket_, buf); } - void SendEOF (void) { oi_socket_write_eof (&socket_); } - void Close (void) { oi_socket_close (&socket_); } + void Send (oi_buf *buf) { oi_socket_write(&socket_, buf); } + void Close (void) { oi_socket_close(&socket_); } + void FullClose (void) { oi_socket_full_close(&socket_); } + void ForceClose (void) { oi_socket_force_close(&socket_); } void SetAcceptor (v8::Handle acceptor_handle); @@ -37,7 +39,6 @@ protected: virtual void OnDrain (void); virtual void OnEOF (void); virtual void OnDisconnect (void); - virtual void OnError (oi_error e); virtual void OnTimeout (void); v8::Local GetProtocol (void); @@ -65,11 +66,6 @@ private: connection->OnDrain(); } - static void on_error (oi_socket *s, oi_error e) { - Connection *connection = static_cast (s->data); - connection->OnError(e); - } - static void on_close (oi_socket *s) { Connection *connection = static_cast (s->data); connection->OnDisconnect(); @@ -118,7 +114,6 @@ protected: } virtual Connection* OnConnection (struct sockaddr *addr, socklen_t len); - virtual void OnError (struct oi_error error); private: static oi_socket* on_connection (oi_server *s, struct sockaddr *addr, socklen_t len) { @@ -130,11 +125,6 @@ private: return NULL; } - static void on_error (oi_server *s, struct oi_error error) { - Acceptor *acceptor = static_cast (s->data); - acceptor->OnError (error); - } - oi_server server_; }; diff --git a/test/test-pingpong.js b/test/test-pingpong.js index dfb206d637..e23c49861b 100644 --- a/test/test-pingpong.js +++ b/test/test-pingpong.js @@ -22,7 +22,6 @@ function Ponger (socket) { this.onEOF = function () { puts("ponger: onEOF"); - socket.send("QUIT"); socket.close(); }; @@ -47,7 +46,7 @@ function Pinger (socket) { socket.send("PING"); } else { puts("sending FIN"); - socket.sendEOF(); + socket.close(); } }; diff --git a/wscript b/wscript index baf7fc18b6..994ad37bb7 100644 --- a/wscript +++ b/wscript @@ -110,7 +110,7 @@ def build(bld): ### oi oi = bld.new_task_gen("cc", "staticlib") - oi.source = "deps/liboi/oi_socket.c deps/liboi/oi_buf.c" + oi.source = "deps/liboi/oi_socket.c" oi.includes = "deps/liboi/" oi.name = "oi" oi.target = "oi"