diff --git a/deps/evcom/evcom.c b/deps/evcom/evcom.c index 6fe862e73b..febed43a38 100644 --- a/deps/evcom/evcom.c +++ b/deps/evcom/evcom.c @@ -1,6 +1,6 @@ /* Copyright (c) 2008,2009 Ryan Dahl * - * evcom_queue comes from ngx_queue.h + * evcom_queue comes from ngx_queue.h * Copyright (C) 2002-2009 Igor Sysoev * * Redistribution and use in source and binary forms, with or without @@ -36,20 +36,45 @@ #include #include /* shutdown */ #include +#include /* sockaddr_in, sockaddr_in6 */ #include #include #if EV_MULTIPLICITY # define D_LOOP_(d) (d)->loop, +# define D_LOOP_SET(d, _loop) do { (d)->loop = (_loop); } while (0) #else # define D_LOOP_(d) +# define D_LOOP_SET(d, _loop) #endif // EV_MULTIPLICITY + +/* SEND STATES */ +static int stream_send__wait_for_connection (evcom_stream*); +static int stream_send__data (evcom_stream*); +static int stream_send__drain (evcom_stream*); +static int stream_send__wait_for_eof (evcom_stream*); +static int stream_send__wait_for_buf (evcom_stream*); +static int stream_send__shutdown (evcom_stream*); +#if EVCOM_HAVE_GNUTLS +static int stream_send__gnutls_bye (evcom_stream*); +#endif +static int stream_send__close_one (evcom_stream*); +static int stream_send__close (evcom_stream*); + +/* RECV STATES */ +static int stream_recv__data (evcom_stream*); +static int stream_recv__wait_for_resume (evcom_stream*); +static int stream_recv__wait_for_close (evcom_stream*); +static int stream_recv__close_one (evcom_stream*); +static int stream_recv__close (evcom_stream*); + +/* COMMON STATES */ #if EVCOM_HAVE_GNUTLS -static int secure_hangup (evcom_descriptor *); +static int stream__handshake (evcom_stream*); #endif -static int recv_send (evcom_descriptor *); +static int stream__close_both (evcom_stream*); #undef TRUE #define TRUE 1 @@ -60,18 +85,16 @@ static int recv_send (evcom_descriptor *); #define OKAY 0 #define AGAIN 1 -#define ERROR 2 #define ATTACHED(s) ((s)->flags & EVCOM_ATTACHED) #define LISTENING(s) ((s)->flags & EVCOM_LISTENING) #define CONNECTED(s) ((s)->flags & EVCOM_CONNECTED) #define SECURE(s) ((s)->flags & EVCOM_SECURE) -#define GOT_HALF_CLOSE(s) ((s)->flags & EVCOM_GOT_HALF_CLOSE) -#define GOT_FULL_CLOSE(s) ((s)->flags & EVCOM_GOT_FULL_CLOSE) +#define DUPLEX(s) ((s)->flags & EVCOM_DUPLEX) +#define GOT_CLOSE(s) ((s)->flags & EVCOM_GOT_CLOSE) #define PAUSED(s) ((s)->flags & EVCOM_PAUSED) #define READABLE(s) ((s)->flags & EVCOM_READABLE) #define WRITABLE(s) ((s)->flags & EVCOM_WRITABLE) -#define GOT_WRITE_EVENT(s) ((s)->flags & EVCOM_GOT_WRITE_EVENT) static int too_many_connections = 0; @@ -87,33 +110,26 @@ set_nonblock (int fd) return 0; } -void -evcom_buf_destroy (evcom_buf *buf) -{ - free(buf->base); - free(buf); -} - evcom_buf * evcom_buf_new2 (size_t len) { - evcom_buf *buf = malloc(sizeof(evcom_buf)); - if (!buf) return NULL; - buf->base = malloc(len); - if (!buf->base) { - free(buf); - return NULL; - } + void *data = malloc(sizeof(evcom_buf) + len); + if (!data) return NULL; + + evcom_buf *buf = data; buf->len = len; - buf->release = evcom_buf_destroy; + buf->release = (void (*)(evcom_buf*))free; + buf->base = data + sizeof(evcom_buf); + return buf; } evcom_buf * evcom_buf_new (const char *base, size_t len) { - evcom_buf *buf = evcom_buf_new2(len); + evcom_buf* buf = evcom_buf_new2(len); if (!buf) return NULL; + memcpy(buf->base, base, len); return buf; @@ -124,12 +140,6 @@ close_asap (evcom_descriptor *d) { if (d->fd < 0) return OKAY; - /* In any case we need to feed an event in order - * to get the on_close callback. In the case of EINTR - * we need an event so that we can call close() again. - */ - ev_feed_fd_event(D_LOOP_(d) d->fd, EV_READ); - int r = close(d->fd); if (r < 0) { @@ -146,102 +156,171 @@ close_asap (evcom_descriptor *d) return OKAY; } - + +#define release_write_buffer(writer) \ +do { \ + while (!evcom_queue_empty(&(writer)->out)) { \ + evcom_queue *q = evcom_queue_last(&(writer)->out); \ + evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); \ + evcom_queue_remove(q); \ + if (buf->release) buf->release(buf); \ + } \ +} while (0) + +static int +close_writer_asap (evcom_writer *writer) +{ + release_write_buffer(writer); + ev_feed_event(D_LOOP_(writer) &writer->write_watcher, EV_WRITE); + return close_asap((evcom_descriptor*)writer); +} + static inline void -release_write_buffer(evcom_stream *stream) +evcom_perror (const char *msg, int errorno) { - while (!evcom_queue_empty(&stream->out)) { - evcom_queue *q = evcom_queue_last(&stream->out); - evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); - evcom_queue_remove(q); - if (buf->release) buf->release(buf); - } + fprintf(stderr, "(evcom) %s %s\n", msg, strerror(errorno)); } static int -close_stream_asap (evcom_stream *stream) +stream_send__wait_for_buf (evcom_stream *stream) { - release_write_buffer(stream); // needed? - - if (too_many_connections && stream->server) { -#if EV_MULTIPLICITY - struct ev_loop *loop = stream->server->loop; -#endif - evcom_server_attach(EV_A_ stream->server); + if (evcom_queue_empty(&stream->out)) { + if (GOT_CLOSE(stream)) { + stream->send_action = stream_send__drain; + return OKAY; + } + ev_io_stop(D_LOOP_(stream) &stream->write_watcher); + return AGAIN; } - too_many_connections = 0; - int r = close_asap((evcom_descriptor*)stream); - if (r == AGAIN) return AGAIN; - - evcom_stream_detach(stream); + stream->send_action = stream_send__data; return OKAY; } static inline void -evcom_perror (const char *msg, int errorno) +stream__set_recv_closed (evcom_stream *stream) { - fprintf(stderr, "(evcom) %s %s\n", msg, strerror(errorno)); + stream->flags &= ~EVCOM_READABLE; + stream->recvfd = -1; + stream->recv_action = NULL; + ev_io_stop(D_LOOP_(stream) &stream->read_watcher); } -// This is to be called when ever the out is empty -// and we need to change state. static inline void -change_state_for_empty_out (evcom_stream *stream) +stream__set_send_closed (evcom_stream *stream) { - if (GOT_FULL_CLOSE(stream)) { -#if EVCOM_HAVE_GNUTLS - if (SECURE(stream) && READABLE(stream) && WRITABLE(stream)) { - secure_hangup((evcom_descriptor*)stream); - } else -#endif - { - close_stream_asap(stream); - } - return; - } + release_write_buffer(stream); + stream->flags &= ~EVCOM_WRITABLE; + stream->sendfd = -1; + stream->send_action = NULL; + ev_io_stop(D_LOOP_(stream) &stream->write_watcher); +} - if (GOT_HALF_CLOSE(stream)) { - if (WRITABLE(stream)) { - stream->action = recv_send; - recv_send((evcom_descriptor*)stream); - } else { - close_stream_asap(stream); - } - return; - } +static int +stream_send__close_one (evcom_stream *stream) +{ + assert(stream->sendfd >= 0); - if (ATTACHED(stream)) { - ev_io_stop(D_LOOP_(stream) &stream->write_watcher); - } + close(stream->sendfd); + + /* TODO recover from EINTR */ + + stream__set_send_closed(stream); + if (DUPLEX(stream)) stream__set_recv_closed(stream); + + return OKAY; } -static inline void -update_write_buffer_after_send (evcom_stream *stream, ssize_t sent) +static int +stream__close_both (evcom_stream *stream) { - evcom_queue *q = evcom_queue_last(&stream->out); - evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); - buf->written += sent; + assert(stream->sendfd != stream->recvfd); - if (buf->written == buf->len) { - evcom_queue_remove(q); + assert(stream->sendfd >= 0); + assert(stream->recvfd >= 0); - if (buf->release) buf->release(buf); + close(stream->recvfd); + close(stream->sendfd); - if (evcom_queue_empty(&stream->out)) { - change_state_for_empty_out(stream); - } - } + /* TODO recover from EINTR */ + + stream__set_send_closed(stream); + stream__set_recv_closed(stream); + + return OKAY; +} + +static int +stream_send__close (evcom_stream *stream) +{ + stream->send_action = DUPLEX(stream) ? + stream_send__close_one : stream__close_both; + return OKAY; +} + +static int +stream_recv__close_one (evcom_stream *stream) +{ + assert(stream->recvfd >= 0); + + close(stream->recvfd); + + /* TODO recover from EINTR */ + + stream__set_recv_closed(stream); + if (DUPLEX(stream)) stream__set_send_closed(stream); + + return OKAY; +} + +static int +stream_recv__close (evcom_stream *stream) +{ + stream->recv_action = DUPLEX(stream) ? + stream_recv__close_one : stream__close_both; + return OKAY; } +static int +stream_send__drain (evcom_stream *stream) +{ + if (!GOT_CLOSE(stream)) { + stream->send_action = stream_send__wait_for_buf; + return OKAY; + } + #if EVCOM_HAVE_GNUTLS -/* TODO can this be done without ignoring SIGPIPE? */ -static ssize_t -nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len) + if (SECURE(stream)) { + stream->send_action = stream_send__gnutls_bye; + return OKAY; + } +#endif + + if (DUPLEX(stream)) { + stream->send_action = stream_send__shutdown; + return OKAY; + } + + stream->send_action = stream_send__close_one; + return OKAY; +} + +static int +stream_send__wait_for_eof (evcom_stream *stream) { - evcom_stream *stream = (evcom_stream*)data; - assert(SECURE(stream)); + if (READABLE(stream)) { + ev_io_stop(D_LOOP_(stream) &stream->write_watcher); + assert(stream->send_action == stream_send__wait_for_eof); + return AGAIN; + } + + stream->send_action = stream_send__close_one; + return OKAY; +} +static inline ssize_t +nosigpipe_send (int fd, const void *buf, size_t len) +{ int flags = 0; #ifdef MSG_NOSIGNAL flags |= MSG_NOSIGNAL; @@ -249,73 +328,99 @@ nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len) #ifdef MSG_DONTWAIT flags |= MSG_DONTWAIT; #endif - ssize_t r = send(stream->fd, buf, len, flags); + return send(fd, buf, len, flags); +} - return r; +static inline ssize_t +nosigpipe_stream_send (evcom_stream *stream, const void *buf, size_t len) +{ + return write(stream->sendfd, buf, len); } -#define SET_DIRECTION(stream) \ -do { \ - if (0 == gnutls_record_get_direction((stream)->session)) { \ - ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher); \ - ev_io_start(D_LOOP_(stream) &(stream)->read_watcher); \ - } else { \ - ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher); \ - ev_io_start(D_LOOP_(stream) &(stream)->write_watcher); \ - } \ -} while (0) +#if EVCOM_HAVE_GNUTLS +static ssize_t +nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len) +{ + evcom_stream *stream = (evcom_stream*)data; + assert(SECURE(stream)); -static int -secure_handshake (evcom_descriptor *d) + return nosigpipe_stream_send(stream, buf, len); +} + +static ssize_t +pull (gnutls_transport_ptr_t data, void* buf, size_t len) { - evcom_stream *stream = (evcom_stream*) d; + evcom_stream *stream = (evcom_stream*)data; + assert(SECURE(stream)); + return read(stream->recvfd, buf, len); +} + +static int +stream__handshake (evcom_stream *stream) +{ assert(SECURE(stream)); int r = gnutls_handshake(stream->session); if (gnutls_error_is_fatal(r)) { stream->gnutls_errorno = r; - return close_stream_asap(stream); + stream->send_action = stream_send__close; + stream->recv_action = stream_recv__close; + return OKAY; } + evcom_stream_reset_timeout(stream); + if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) { - SET_DIRECTION(stream); - stream->action = secure_handshake; + if (0 == gnutls_record_get_direction((stream)->session)) { + ev_io_start(D_LOOP_(stream) &(stream)->read_watcher); + ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher); + } else { + ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher); + ev_io_start(D_LOOP_(stream) &(stream)->write_watcher); + } + assert(stream->recv_action == stream__handshake); + assert(stream->send_action == stream__handshake); return AGAIN; } - stream->action = recv_send; - assert(!CONNECTED(stream)); stream->flags |= EVCOM_CONNECTED; if (stream->on_connect) stream->on_connect(stream); - evcom_stream_reset_timeout(stream); + ev_io_start(D_LOOP_(stream) &stream->read_watcher); + ev_io_start(D_LOOP_(stream) &stream->write_watcher); + + stream->send_action = stream_send__data; + stream->recv_action = stream_recv__data; - return recv_send((evcom_descriptor*)stream); + return OKAY; } static int -secure_hangup (evcom_descriptor *d) +stream_send__gnutls_bye (evcom_stream *stream) { - evcom_stream *stream = (evcom_stream*)d; - assert(SECURE(stream)); - int r = gnutls_bye(stream->session, GNUTLS_SHUT_RDWR); + int r = gnutls_bye(stream->session, GNUTLS_SHUT_WR); if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) { - SET_DIRECTION(stream); - stream->action = secure_hangup; + assert(1 == gnutls_record_get_direction((stream)->session)); + assert(stream->send_action == stream_send__gnutls_bye); return AGAIN; } if (gnutls_error_is_fatal(r)) { stream->gnutls_errorno = r; + stream->send_action = stream_send__close; + return OKAY; } - return close_stream_asap(stream); + stream->flags &= ~EVCOM_WRITABLE; + + stream->send_action = stream_send__wait_for_eof; + return OKAY; } void @@ -326,58 +431,92 @@ evcom_stream_set_secure_session (evcom_stream *stream, gnutls_session_t session) } #endif /* HAVE GNUTLS */ -static inline int -recv_data (evcom_stream *stream) +static int +stream_recv__wait_for_close (evcom_stream *stream) +{ + assert(!READABLE(stream)); + + if (!WRITABLE(stream)) { + stream->recv_action = stream_recv__close; + return OKAY; + } + + ev_io_stop(D_LOOP_(stream) &stream->read_watcher); + return AGAIN; +} + +static int +stream_recv__wait_for_resume (evcom_stream *stream) +{ + stream->flags |= EVCOM_PAUSED; + ev_io_stop(D_LOOP_(stream) &stream->read_watcher); + assert(stream->recv_action == stream_recv__wait_for_resume); + return AGAIN; +} + +static int +stream_recv__data (evcom_stream *stream) { char buf[EVCOM_CHUNKSIZE]; size_t buf_size = EVCOM_CHUNKSIZE; ssize_t recved; - while (stream->fd >= 0) { - assert(READABLE(stream)); + while (READABLE(stream)) { + assert(CONNECTED(stream)); if (PAUSED(stream)) { - ev_io_stop(D_LOOP_(stream) &stream->read_watcher); - return AGAIN; + stream->recv_action = stream_recv__wait_for_resume; + return OKAY; } - if (!SECURE(stream)) { - recved = recv(stream->fd, buf, buf_size, 0); - } #if EVCOM_HAVE_GNUTLS - else { + if (SECURE(stream)) { recved = gnutls_record_recv(stream->session, buf, buf_size); if (gnutls_error_is_fatal(recved)) { stream->gnutls_errorno = recved; - return close_stream_asap(stream); + stream->recv_action = stream_recv__close; + return OKAY; } if (recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) { - SET_DIRECTION(stream); + if (1 == gnutls_record_get_direction((stream)->session)) { + fprintf(stderr, "(evcom) gnutls recv: unexpected switch direction!\n"); + ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher); + ev_io_start(D_LOOP_(stream) &(stream)->write_watcher); + } return AGAIN; } /* A server may also receive GNUTLS_E_REHANDSHAKE when a client has - * initiated a handshake. In that case the server can only initiate a + * initiated a andshake. In that case the server can only initiate a * handshake or terminate the connection. */ if (recved == GNUTLS_E_REHANDSHAKE) { - if (READABLE(stream) && WRITABLE(stream)) { - stream->action = secure_handshake; - return OKAY; - } else { - stream->gnutls_errorno = GNUTLS_E_REHANDSHAKE; - return close_stream_asap(stream); - } + assert(WRITABLE(stream)); + stream->recv_action = stream__handshake; + stream->send_action = stream__handshake; + return OKAY; } - } + } else #endif /* EVCOM_HAVE_GNUTLS */ + { + recved = read(stream->recvfd, buf, buf_size); + } if (recved < 0) { - if (errno == EAGAIN || errno == EINTR) return AGAIN; + if (errno == EAGAIN || errno == EINTR) { + assert(stream->recv_action == stream_recv__data); + return AGAIN; + } + + if (errno != ECONNRESET) { + evcom_perror("recv()", stream->errorno); + } + stream->errorno = errno; - return close_stream_asap(stream); + stream->recv_action = stream_recv__close; + return OKAY; } evcom_stream_reset_timeout(stream); @@ -393,7 +532,7 @@ recv_data (evcom_stream *stream) if (stream->on_read) stream->on_read(stream, buf, recved); if (recved == 0) { - if (!WRITABLE(stream)) return close_stream_asap(stream); + stream->recv_action = stream_recv__wait_for_close; return OKAY; } } @@ -401,11 +540,11 @@ recv_data (evcom_stream *stream) } static int -send_data (evcom_stream *stream) +stream_send__data (evcom_stream *stream) { ssize_t sent; - while (stream->fd >= 0 && !evcom_queue_empty(&stream->out)) { + while (!evcom_queue_empty(&stream->out)) { assert(WRITABLE(stream)); evcom_queue *q = evcom_queue_last(&stream->out); @@ -414,202 +553,176 @@ send_data (evcom_stream *stream) #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { sent = gnutls_record_send(stream->session, - buf->base + buf->written, - buf->len - buf->written); + buf->base + buf->written, + buf->len - buf->written); + + if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) { + if (0 == gnutls_record_get_direction((stream)->session)) { + fprintf(stderr, "(evcom) gnutls send: unexpected switch direction!\n"); + ev_io_start(D_LOOP_(stream) &(stream)->read_watcher); + ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher); + } + return AGAIN; + } if (gnutls_error_is_fatal(sent)) { stream->gnutls_errorno = sent; - return close_stream_asap(stream); + stream->send_action = stream_send__close; + return OKAY; } } else #endif // EVCOM_HAVE_GNUTLS { - - int flags = 0; -#ifdef MSG_NOSIGNAL - flags |= MSG_NOSIGNAL; -#endif -#ifdef MSG_DONTWAIT - flags |= MSG_DONTWAIT; -#endif - /* TODO use writev() here? */ - sent = send(stream->fd, - buf->base + buf->written, - buf->len - buf->written, - flags); + sent = nosigpipe_stream_send(stream, + buf->base + buf->written, + buf->len - buf->written); } if (sent <= 0) { - switch (errno) { - case EAGAIN: - case EINTR: - return AGAIN; + if (errno == EAGAIN || errno == EINTR) { + assert(stream->send_action == stream_send__data); + return AGAIN; + } - case EPIPE: - stream->flags &= ~EVCOM_WRITABLE; - if (!READABLE(stream)) return close_stream_asap(stream); - return OKAY; + stream->errorno = errno; + evcom_perror("send()", errno); - default: - stream->errorno = errno; - return close_stream_asap(stream); - } + stream->send_action = stream_send__close; + return OKAY; } evcom_stream_reset_timeout(stream); - update_write_buffer_after_send(stream, sent); + assert(sent >= 0); + + buf->written += sent; + + if (buf->written == buf->len) { + evcom_queue_remove(q); + if (buf->release) buf->release(buf); + } } assert(evcom_queue_empty(&stream->out)); - ev_io_stop(D_LOOP_(stream) &stream->write_watcher); - return AGAIN; + stream->send_action = stream_send__drain; + return OKAY; } static int -shutdown_write (evcom_stream *stream) +stream_send__shutdown (evcom_stream *stream) { - int r; - -#if EVCOM_HAVE_GNUTLS - if (SECURE(stream)) { - r = gnutls_bye(stream->session, GNUTLS_SHUT_WR); - - if (gnutls_error_is_fatal(r)) { - stream->gnutls_errorno = r; - return close_stream_asap(stream); - } - - if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) { - SET_DIRECTION(stream); - } - } -#endif - - r = shutdown(stream->fd, SHUT_WR); + int r = shutdown(stream->sendfd, SHUT_WR); if (r < 0) { stream->errorno = errno; evcom_perror("shutdown()", errno); - return close_stream_asap(stream); + stream->send_action = stream_send__close; + return OKAY; } stream->flags &= ~EVCOM_WRITABLE; + stream->send_action = stream_send__wait_for_eof; return OKAY; } -static int -recv_send (evcom_descriptor *d) -{ - evcom_stream *stream = (evcom_stream*) d; - - int r = AGAIN; - - if (READABLE(stream) && !PAUSED(stream)) { - r = recv_data(stream); - } - - if (stream->fd < 0) return AGAIN; - - if (WRITABLE(stream)) { - if (GOT_HALF_CLOSE(stream) && evcom_queue_empty(&stream->out)) { - - if (READABLE(stream)) { - return shutdown_write(stream); - } else { - return close_stream_asap(stream); - } - - } else { - return send_data(stream); - } - } - - return r; -} - -static inline int -connection_established (evcom_stream *stream) +static int +stream__connection_established (evcom_stream *stream) { - ev_io_start(D_LOOP_(stream) &stream->read_watcher); assert(!CONNECTED(stream)); #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { - stream->action = secure_handshake; - return secure_handshake((evcom_descriptor*)stream); - } else -#endif /* EVCOM_HAVE_GNUTLS */ + stream->send_action = stream__handshake; + stream->recv_action = stream__handshake; + } else +#endif { stream->flags |= EVCOM_CONNECTED; if (stream->on_connect) stream->on_connect(stream); - stream->action = recv_send; - return recv_send((evcom_descriptor*)stream); + stream->send_action = stream_send__data; + stream->recv_action = stream_recv__data; } + + ev_io_start(D_LOOP_(stream) &stream->write_watcher); + ev_io_start(D_LOOP_(stream) &stream->read_watcher); + + return OKAY; } static int -wait_for_connection (evcom_descriptor *d) +stream_send__wait_for_connection (evcom_stream *stream) { - evcom_stream *stream = (evcom_stream*)d; - - if (!GOT_WRITE_EVENT(d)) { - ev_io_stop(D_LOOP_(stream) &stream->read_watcher); - return AGAIN; - } + assert(DUPLEX(stream)); int connect_error; socklen_t len = sizeof(int); - int r = getsockopt(d->fd, SOL_SOCKET, SO_ERROR, &connect_error, &len); + int r = getsockopt(stream->sendfd, SOL_SOCKET, SO_ERROR, &connect_error, &len); + if (r < 0) { - d->errorno = r; - return close_asap(d); + stream->errorno = r; + stream->send_action = stream_send__close; + return OKAY; } - switch (connect_error) { - case 0: - return connection_established((evcom_stream*)d); - - case EINTR: - case EINPROGRESS: - return AGAIN; + if (connect_error == 0) { + stream->send_action = stream__connection_established; + return OKAY; - default: - d->errorno = connect_error; - return close_asap(d); + } else if (connect_error == EINPROGRESS || connect_error == EINTR) { + assert(stream->send_action == stream_send__wait_for_connection); + return AGAIN; } + + stream->errorno = connect_error; + stream->send_action = stream_send__close; + return OKAY; } static void -assign_file_descriptor (evcom_stream *stream, int fd) +evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd) { - stream->fd = fd; + assert(recvfd >= 0); + assert(sendfd >= 0); - ev_io_set (&stream->read_watcher, fd, EV_READ); - ev_io_set (&stream->write_watcher, fd, EV_WRITE); + if (recvfd == sendfd) stream->flags |= EVCOM_DUPLEX; + +#ifdef SO_NOSIGPIPE + if (DUPLEX(stream)) { + int flags = 1; + int r = setsockopt(sendfd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); + if (r < 0) { + evcom_perror("setsockopt(SO_NOSIGPIPE)", errno); + } + } +#endif + + ev_io_set(&stream->read_watcher, recvfd, EV_READ); + ev_io_set(&stream->write_watcher, sendfd, EV_WRITE); + + stream->recvfd = recvfd; + stream->sendfd = sendfd; + + stream->send_action = stream__connection_established; + stream->recv_action = stream__connection_established; stream->flags |= EVCOM_READABLE; stream->flags |= EVCOM_WRITABLE; #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { - gnutls_transport_set_lowat(stream->session, 0); + gnutls_transport_set_lowat(stream->session, 0); gnutls_transport_set_push_function(stream->session, nosigpipe_push); - gnutls_transport_set_ptr2(stream->session, - (gnutls_transport_ptr_t)(intptr_t)fd, /* recv */ - stream); /* send */ + gnutls_transport_set_pull_function(stream->session, pull); + gnutls_transport_set_ptr2(stream->session, stream, stream); } -#endif - - stream->action = wait_for_connection; +#endif } - -/* Retruns evcom_stream if a connection could be accepted. +/* Retruns evcom_stream if a connection could be accepted. * The returned stream is not yet attached to the event loop. * Otherwise NULL */ @@ -618,7 +731,7 @@ accept_connection (evcom_server *server) { struct sockaddr address; /* connector's address information */ socklen_t addr_len = sizeof(address); - + int fd = accept(server->fd, &address, &addr_len); if (fd < 0) { switch (errno) { @@ -648,31 +761,22 @@ accept_connection (evcom_server *server) close(fd); return NULL; } - + if (set_nonblock(fd) != 0) { evcom_perror("set_nonblock()", errno); return NULL; } - -#ifdef SO_NOSIGPIPE - int flags = 1; - int r = setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); - if (r < 0) { - evcom_perror("setsockopt()", errno); - return NULL; - } -#endif stream->server = server; - assign_file_descriptor(stream, fd); + evcom_stream_assign_fds(stream, fd, fd); return stream; } -/* Internal callback +/* Internal callback * Called by server->watcher. */ -static int +static int accept_connections (evcom_descriptor *d) { evcom_server *server = (evcom_server *)d; @@ -683,7 +787,6 @@ accept_connections (evcom_descriptor *d) evcom_stream *stream; while (server->fd >= 0 && (stream = accept_connection(server))) { evcom_stream_attach(D_LOOP_(server) stream); - connection_established(stream); } return AGAIN; @@ -747,17 +850,17 @@ evcom_server_listen (evcom_server *server, struct sockaddr *address, int backlog close(fd); return -1; } - + if (listen(fd, backlog) < 0) { server->errorno = errno; evcom_perror("listen()", errno); close(fd); return -1; } - + server->flags |= EVCOM_LISTENING; server->action = accept_connections; - + return 0; } @@ -765,9 +868,12 @@ evcom_server_listen (evcom_server *server, struct sockaddr *address, int backlog * Stops the server. Will not accept new connections. Does not drop * existing connections. */ -void +void evcom_server_close (evcom_server *server) { + ev_io_start(D_LOOP_(server) &server->watcher); + ev_feed_event(D_LOOP_(server) &server->watcher, EV_READ); + close_asap((evcom_descriptor*)server); } @@ -775,9 +881,7 @@ void evcom_server_attach (EV_P_ evcom_server *server) { ev_io_start (EV_A_ &server->watcher); -#if EV_MULTIPLICITY - server->loop = EV_A; -#endif + D_LOOP_SET(server, EV_A); server->flags |= EVCOM_ATTACHED; } @@ -785,33 +889,26 @@ void evcom_server_detach (evcom_server *server) { ev_io_stop (D_LOOP_(server) &server->watcher); -#if EV_MULTIPLICITY - server->loop = NULL; -#endif + D_LOOP_SET(server, NULL); server->flags &= ~EVCOM_ATTACHED; } -static void +static void io_event(EV_P_ ev_io *watcher, int revents) { evcom_descriptor *d = watcher->data; #if EV_MULTIPLICITY assert(d->loop == loop); -#endif +#endif + int r = OKAY; if (revents & EV_ERROR) { d->errorno = 1; - close_asap(d); - } - - if (revents & EV_WRITE) { - d->flags |= EVCOM_GOT_WRITE_EVENT; + r = close_asap(d); } - int r = OKAY; - while (r == OKAY && d->action && d->fd >= 0) { r = d->action(d); } @@ -824,20 +921,27 @@ io_event(EV_P_ ev_io *watcher, int revents) } } -void +static void +evcom_descriptor_init (evcom_descriptor *d) +{ + d->fd = -1; + D_LOOP_SET(d, NULL); + d->flags = 0; + d->errorno = 0; + d->action = NULL; +} + +void evcom_server_init (evcom_server *server) { - server->flags = 0; - server->fd = -1; - server->watcher.data = server; - server->action = NULL; + evcom_descriptor_init((evcom_descriptor*)server); ev_init (&server->watcher, io_event); + server->watcher.data = server; server->on_connection = NULL; - server->on_close = NULL; } /* Internal callback. called by stream->timeout_watcher */ -static void +static void on_timeout (EV_P_ ev_timer *watcher, int revents) { evcom_stream *stream = watcher->data; @@ -854,9 +958,51 @@ on_timeout (EV_P_ ev_timer *watcher, int revents) } if (stream->on_timeout) stream->on_timeout(stream); - // timeout does not automatically kill your connection. you must! + + evcom_stream_force_close(stream); } +static void +stream_event (EV_P_ ev_io *w, int revents) +{ + evcom_stream *stream = w->data; + + if (revents & EV_READ) { + while (stream->recv_action) { + int r = stream->recv_action(stream); + if (r == AGAIN) break; + } + } + + if (revents & EV_WRITE) { + while (stream->send_action) { + int r = stream->send_action(stream); + if (r == AGAIN) break; + } + } + + if (stream->send_action == NULL) { + ev_io_stop(EV_A_ &stream->write_watcher); + } + + if (stream->recv_action == NULL) { + ev_io_stop(EV_A_ &stream->read_watcher); + } + + if (stream->sendfd < 0 && stream->recvfd < 0) { + ev_timer_stop(EV_A_ &stream->timeout_watcher); + + if (too_many_connections && stream->server) { +#if EV_MULTIPLICITY + struct ev_loop *loop = stream->server->loop; +#endif + evcom_server_attach(EV_A_ stream->server); + } + too_many_connections = 0; + + if (stream->on_close) stream->on_close(stream); + } +} /** * If using SSL do consider setting @@ -865,139 +1011,151 @@ on_timeout (EV_P_ ev_timer *watcher, int revents) * gnutls_db_set_store_function (stream->session, _); * gnutls_db_set_ptr (stream->session, _); */ -void +void evcom_stream_init (evcom_stream *stream, float timeout) { - stream->fd = -1; - stream->server = NULL; -#if EV_MULTIPLICITY - stream->loop = NULL; -#endif stream->flags = 0; + stream->errorno = 0; + stream->recvfd = -1; + stream->sendfd = -1; - evcom_queue_init(&stream->out); + // reader things + ev_init(&stream->read_watcher, stream_event); + stream->read_watcher.data = stream; + stream->recv_action = NULL; - ev_init(&stream->write_watcher, io_event); - ev_init(&stream->read_watcher, io_event); + // writer things + ev_init(&stream->write_watcher, stream_event); stream->write_watcher.data = stream; - stream->read_watcher.data = stream; - - stream->errorno = 0; + evcom_queue_init(&stream->out); + stream->send_action = NULL; + // stream things + stream->server = NULL; #if EVCOM_HAVE_GNUTLS stream->gnutls_errorno = 0; stream->session = NULL; -#endif - +#endif ev_timer_init(&stream->timeout_watcher, on_timeout, 0., timeout); - stream->timeout_watcher.data = stream; - - stream->action = NULL; + stream->timeout_watcher.data = stream; stream->on_connect = NULL; - stream->on_read = NULL; - stream->on_drain = NULL; stream->on_timeout = NULL; + stream->on_read = NULL; + stream->on_close = NULL; } -void +void evcom_stream_close (evcom_stream *stream) { - stream->flags |= EVCOM_GOT_HALF_CLOSE; - if (evcom_queue_empty(&stream->out)) { - change_state_for_empty_out(stream); - } -} - -void -evcom_stream_full_close (evcom_stream *stream) -{ - stream->flags |= EVCOM_GOT_FULL_CLOSE; - if (evcom_queue_empty(&stream->out)) { - change_state_for_empty_out(stream); + stream->flags |= EVCOM_GOT_CLOSE; + if (WRITABLE(stream)) { + ev_io_start(D_LOOP_(stream) &stream->write_watcher); } } void evcom_stream_force_close (evcom_stream *stream) { - close_stream_asap(stream); - - // Even if close returned EINTR - stream->action = NULL; - stream->fd = -1; + close(stream->recvfd); + /* XXX What to do on EINTR? */ + stream__set_recv_closed(stream); + + if (!DUPLEX(stream)) close(stream->sendfd); + stream__set_send_closed(stream); evcom_stream_detach(stream); } -void -evcom_stream_write (evcom_stream *stream, evcom_buf *buf) +void +evcom_stream_write (evcom_stream *stream, const char *str, size_t len) { - if (!WRITABLE(stream) || GOT_FULL_CLOSE(stream) || GOT_HALF_CLOSE(stream)) { - assert(0 && "Do not write to a closed stream"); - if (buf->release) buf->release(buf); + if (!WRITABLE(stream) || GOT_CLOSE(stream)) { + assert(0 && "Do not write to a closed stream"); return; } - evcom_queue_insert_head(&stream->out, &buf->queue); - buf->written = 0; + ssize_t sent = 0; + + if ( stream->send_action == stream_send__wait_for_buf + && evcom_queue_empty(&stream->out) + ) + { + assert(CONNECTED(stream)); +#if EVCOM_HAVE_GNUTLS + if (SECURE(stream)) { + sent = gnutls_record_send(stream->session, str, len); + + if (gnutls_error_is_fatal(sent)) { + stream->gnutls_errorno = sent; + goto close; + } + } else +#endif // EVCOM_HAVE_GNUTLS + { + /* TODO use writev() here? */ + sent = nosigpipe_stream_send(stream, str, len); + } + + if (sent < 0) { + switch (errno) { + case EINTR: + case EAGAIN: + sent = 0; + break; + + default: + stream->errorno = errno; + evcom_perror("send()", stream->errorno); + goto close; + } + } + } /* TODO else { memcpy to last buffer on head } */ + + assert(sent >= 0); + if ((size_t)sent == len) return; /* sent the whole buffer */ + + len -= sent; + str += sent; + + evcom_buf *b = evcom_buf_new(str, len); + evcom_queue_insert_head(&stream->out, &b->queue); + b->written = 0; + + assert(stream->sendfd >= 0); if (ATTACHED(stream)) { ev_io_start(D_LOOP_(stream) &stream->write_watcher); - if (stream->action == recv_send) { - send_data(stream); - } } -} - -void -evcom_stream_reset_timeout (evcom_stream *stream) -{ - ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); -} + return; -static void -free_simple_buf (evcom_buf *buf) -{ - free(buf->base); - free(buf); +close: + stream->send_action = stream_send__close; + stream->recv_action = stream_recv__close; + if (ATTACHED(stream)) { + ev_io_start(D_LOOP_(stream) &stream->write_watcher); + } } -/* Writes a string to the stream. - * NOTE: Allocates memory. Avoid for performance applications. - */ void -evcom_stream_write_simple (evcom_stream *stream, const char *str, size_t len) +evcom_stream_reset_timeout (evcom_stream *stream) { - evcom_buf *buf = malloc(sizeof(evcom_buf)); - buf->release = free_simple_buf; - buf->base = strdup(str); - buf->len = len; - - evcom_stream_write(stream, buf); + ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); } void evcom_stream_attach (EV_P_ evcom_stream *stream) { -#if EV_MULTIPLICITY - stream->loop = EV_A; -#endif + D_LOOP_SET(stream, EV_A); stream->flags |= EVCOM_ATTACHED; ev_timer_again(EV_A_ &stream->timeout_watcher); - if (!CONNECTED(stream)) { - ev_io_start(EV_A_ &stream->write_watcher); - } else { - if (READABLE(stream) && !PAUSED(stream)) { - ev_io_start(EV_A_ &stream->read_watcher); - } - - if (WRITABLE(stream)) { - ev_io_start(EV_A_ &stream->write_watcher); - } + if (READABLE(stream)) { + ev_io_start(EV_A_ &stream->read_watcher); + } - ev_feed_fd_event(D_LOOP_(stream) stream->fd, EV_WRITE); + if (WRITABLE(stream)) { + ev_io_start(EV_A_ &stream->write_watcher); } } @@ -1007,28 +1165,29 @@ evcom_stream_detach (evcom_stream *stream) ev_io_stop(D_LOOP_(stream) &stream->write_watcher); ev_io_stop(D_LOOP_(stream) &stream->read_watcher); ev_timer_stop(D_LOOP_(stream) &stream->timeout_watcher); -#if EV_MULTIPLICITY - stream->loop = NULL; -#endif + D_LOOP_SET(stream, NULL); stream->flags &= ~EVCOM_ATTACHED; } void evcom_stream_read_pause (evcom_stream *stream) { - ev_io_stop(D_LOOP_(stream) &stream->read_watcher); - ev_clear_pending(D_LOOP_(stream) &stream->read_watcher); stream->flags |= EVCOM_PAUSED; + if (stream->recv_action == stream_recv__data) { + ev_io_stop(D_LOOP_(stream) &stream->read_watcher); + stream->recv_action = stream_recv__wait_for_resume; + } } void evcom_stream_read_resume (evcom_stream *stream) { - evcom_stream_reset_timeout(stream); - stream->flags &= ~EVCOM_PAUSED; - - if (READABLE(stream)) { + evcom_stream_reset_timeout(stream); + if (stream->recv_action == stream_recv__wait_for_resume) { + stream->recv_action = stream_recv__data; + } + if (ATTACHED(stream) && READABLE(stream)) { ev_io_start(D_LOOP_(stream) &stream->read_watcher); } } @@ -1050,11 +1209,6 @@ evcom_stream_connect (evcom_stream *stream, struct sockaddr *address) close(fd); return -1; } - -#ifdef SO_NOSIGPIPE - int flags = 1; - setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); -#endif r = connect(fd, address, address_length(address)); @@ -1065,23 +1219,53 @@ evcom_stream_connect (evcom_stream *stream, struct sockaddr *address) return -1; } - assign_file_descriptor(stream, fd); + evcom_stream_assign_fds(stream, fd, fd); + + stream->send_action = stream_send__wait_for_connection; + stream->recv_action = NULL; + + return 0; +} + +int evcom_stream_pair (evcom_stream *a, evcom_stream *b) +{ + int sv[2]; + int old_errno; + + int r = socketpair(PF_LOCAL, SOCK_STREAM, 0, sv); + if (r < 0) return -1; + + r = set_nonblock(sv[0]); + if (r < 0) goto set_nonblock_error; + r = set_nonblock(sv[1]); + if (r < 0) goto set_nonblock_error; + + evcom_stream_assign_fds(a, sv[0], sv[0]); + evcom_stream_assign_fds(b, sv[1], sv[1]); return 0; + +set_nonblock_error: + old_errno = errno; + evcom_perror("set_nonblock()", errno); + close(sv[0]); + close(sv[1]); + errno = old_errno; + return -1; } enum evcom_stream_state evcom_stream_state (evcom_stream *stream) { - if (stream->fd < 0 && stream->flags == 0) return EVCOM_INITIALIZED; + if (stream->recvfd < 0 && stream->sendfd && stream->flags == 0) { + return EVCOM_INITIALIZED; + } - if (stream->fd < 0) return EVCOM_CLOSED; + if (stream->recvfd < 0 && stream->sendfd < 0) return EVCOM_CLOSED; if (!CONNECTED(stream)) return EVCOM_CONNECTING; - if (GOT_FULL_CLOSE(stream)) return EVCOM_CLOSING; - - if (GOT_HALF_CLOSE(stream)) { + if (GOT_CLOSE(stream)) { if (READABLE(stream)) { return EVCOM_CONNECTED_RO; } else { @@ -1098,3 +1282,216 @@ evcom_stream_state (evcom_stream *stream) return EVCOM_CLOSING; } +static int +reader_recv (evcom_descriptor *d) +{ + evcom_reader* reader = (evcom_reader*) d; + + char buf[EVCOM_CHUNKSIZE]; + size_t buf_size = EVCOM_CHUNKSIZE; + ssize_t recved; + + while (reader->fd >= 0) { + recved = read(reader->fd, buf, buf_size); + + if (recved < 0) { + if (errno == EAGAIN || errno == EINTR) return AGAIN; + reader->errorno = errno; + evcom_perror("read()", reader->errorno); + return close_asap(d); + } + + /* NOTE: EOF is signaled with recved == 0 on callback */ + if (reader->on_read) reader->on_read(reader, buf, recved); + + if (recved == 0) return close_asap(d); + } + return AGAIN; +} + +void +evcom_reader_init (evcom_reader *reader) +{ + evcom_descriptor_init((evcom_descriptor*)reader); + + reader->on_close = NULL; + reader->on_read = NULL; + + ev_init(&reader->read_watcher, io_event); + reader->read_watcher.data = reader; +} + +void +evcom_reader_set (evcom_reader *reader, int fd) +{ + assert(fd >= 0); + reader->fd = fd; + + ev_io_set(&reader->read_watcher, fd, EV_READ); + reader->action = reader_recv; +} + +void +evcom_reader_attach (EV_P_ evcom_reader *reader) +{ + ev_io_start(EV_A_ &reader->read_watcher); + D_LOOP_SET(reader, EV_A); +} + +void +evcom_reader_detach (evcom_reader *reader) +{ + ev_io_stop(D_LOOP_(reader) &reader->read_watcher); + D_LOOP_SET(reader, NULL); +} + +void +evcom_reader_close (evcom_reader *reader) +{ + ev_io_start(D_LOOP_(reader) &reader->read_watcher); + ev_feed_event(D_LOOP_(reader) &reader->read_watcher, EV_READ); + + close_asap((evcom_descriptor*)reader); +} + +static int +writer_send (evcom_descriptor *d) +{ + evcom_writer* writer = (evcom_writer*) d; + assert(writer->fd >= 0); + + while (!evcom_queue_empty(&writer->out)) { + evcom_queue *q = evcom_queue_last(&writer->out); + evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); + + ssize_t sent = write(writer->fd, buf->base + buf->written, + buf->len - buf->written); + + if (sent < 0) { + switch (errno) { + case ECONNRESET: + case EPIPE: + return close_writer_asap(writer); + + case EINTR: + case EAGAIN: + sent = 0; + return AGAIN; + + default: + writer->errorno = errno; + evcom_perror("send()", writer->errorno); + return close_writer_asap(writer); + } + } + assert(sent >= 0); + + buf->written += sent; + + if (buf->written == buf->len) { + evcom_queue_remove(q); + if (buf->release) buf->release(buf); + } + } + + if (GOT_CLOSE(writer)) { + assert(evcom_queue_empty(&writer->out)); + return close_writer_asap(writer); + } else { + ev_io_stop(D_LOOP_(writer) &writer->write_watcher); + return AGAIN; + } +} + +void +evcom_writer_init (evcom_writer* writer) +{ + evcom_descriptor_init((evcom_descriptor*)writer); + + writer->on_close = NULL; + + ev_init(&writer->write_watcher, io_event); + writer->write_watcher.data = writer; + + evcom_queue_init(&writer->out); +} + +void +evcom_writer_set (evcom_writer* writer, int fd) +{ + assert(fd >= 0); + writer->fd = fd; + + ev_io_set(&writer->write_watcher, fd, EV_WRITE); + writer->action = writer_send; +} + +void +evcom_writer_attach (EV_P_ evcom_writer* writer) +{ + if (!evcom_queue_empty(&writer->out)) { + ev_io_start (EV_A_ &writer->write_watcher); + } + D_LOOP_SET(writer, EV_A); +} + +void +evcom_writer_detach (evcom_writer* writer) +{ + ev_io_stop(D_LOOP_(writer) &writer->write_watcher); + D_LOOP_SET(writer, NULL); +} + +void +evcom_writer_write (evcom_writer* writer, const char* buf, size_t len) +{ + assert(writer->fd >= 0); + + ssize_t sent = 0; + + if (evcom_queue_empty(&writer->out)) { + sent = write(writer->fd, buf, len); + + if (sent < 0) { + switch (errno) { + case ECONNRESET: + case EPIPE: + goto close; + + case EINTR: + case EAGAIN: + sent = 0; + break; + + default: + writer->errorno = errno; + evcom_perror("send()", writer->errorno); + goto close; + } + } + } /* TODO else { memcpy to last buffer on head } */ + + assert(sent >= 0); + if ((size_t)sent == len) return; /* sent the whole buffer */ + + len -= sent; + buf += sent; + + evcom_buf *b = evcom_buf_new(buf, len); + evcom_queue_insert_head(&writer->out, &b->queue); + b->written = 0; + + assert(writer->fd >= 0); + ev_io_start(D_LOOP_(writer) &writer->write_watcher); + return; + +close: + close_writer_asap(writer); +} + +void +evcom_writer_close (evcom_writer* writer) +{ + writer->flags |= EVCOM_GOT_CLOSE; + if (evcom_queue_empty(&writer->out)) close_writer_asap(writer); +} diff --git a/deps/evcom/evcom.h b/deps/evcom/evcom.h index f34dc3c73c..1bbfd03f15 100644 --- a/deps/evcom/evcom.h +++ b/deps/evcom/evcom.h @@ -33,7 +33,7 @@ #ifdef __cplusplus extern "C" { -#endif +#endif #ifndef EVCOM_HAVE_GNUTLS # define EVCOM_HAVE_GNUTLS 0 @@ -52,12 +52,11 @@ extern "C" { #define EVCOM_LISTENING 0x0002 #define EVCOM_CONNECTED 0x0004 #define EVCOM_SECURE 0x0008 -#define EVCOM_GOT_HALF_CLOSE 0x0010 -#define EVCOM_GOT_FULL_CLOSE 0x0020 +#define EVCOM_DUPLEX 0x0010 +#define EVCOM_GOT_CLOSE 0x0020 #define EVCOM_PAUSED 0x0040 #define EVCOM_READABLE 0x0080 #define EVCOM_WRITABLE 0x0100 -#define EVCOM_GOT_WRITE_EVENT 0x0200 enum evcom_stream_state { EVCOM_INITIALIZED , EVCOM_CONNECTING @@ -91,95 +90,112 @@ typedef struct evcom_buf { # define EVCOM_LOOP #endif -#define EVCOM_DESCRIPTOR(type) \ - unsigned int flags; /* private */ \ - int (*action) (struct evcom_descriptor*); /* private */ \ - int errorno; /* read-only */ \ - int fd; /* read-only */ \ - EVCOM_LOOP /* read-only */ \ - void *data; /* public */ \ - void (*on_close) (struct type*); /* public */ +#define EVCOM_DESCRIPTOR(type) \ + /* private */ unsigned int flags; \ + /* private */ int (*action) (struct evcom_descriptor*); \ + /* read-only */ int errorno; \ + /* read-only */ int fd; \ + /* read-only */ EVCOM_LOOP \ + /* public */ void *data; \ + /* public */ void (*on_close) (struct type*); +/* abstract base class */ typedef struct evcom_descriptor { EVCOM_DESCRIPTOR(evcom_descriptor) } evcom_descriptor; -typedef struct evcom_server { - EVCOM_DESCRIPTOR(evcom_server) - - /* PRIVATE */ - ev_io watcher; +typedef struct evcom_reader { + EVCOM_DESCRIPTOR(evcom_reader) + ev_io read_watcher; /* private */ + void (*on_read) (struct evcom_reader*, const void* buf, size_t len); /* public */ +} evcom_reader; - /* PUBLIC */ - struct evcom_stream* - (*on_connection)(struct evcom_server *, struct sockaddr *remote_addr); -} evcom_server; +typedef struct evcom_writer { + EVCOM_DESCRIPTOR(evcom_writer) + ev_io write_watcher; /* private */ + evcom_queue out; /* private */ +} evcom_writer; typedef struct evcom_stream { - EVCOM_DESCRIPTOR(evcom_stream) - - /* PRIVATE */ - ev_io write_watcher; + /* PRIVATE */ + EVCOM_LOOP + int errorno; + unsigned int flags; + evcom_queue out; ev_io read_watcher; + ev_io write_watcher; + int (*send_action) (struct evcom_stream*); + int (*recv_action) (struct evcom_stream*); ev_timer timeout_watcher; #if EVCOM_HAVE_GNUTLS gnutls_session_t session; #endif - /* READ-ONLY */ + /* READ-ONLY */ + int recvfd; + int sendfd; struct evcom_server *server; - evcom_queue out; #if EVCOM_HAVE_GNUTLS int gnutls_errorno; #endif /* PUBLIC */ void (*on_connect) (struct evcom_stream *); - void (*on_read) (struct evcom_stream *, const void *buf, size_t count); - void (*on_drain) (struct evcom_stream *); void (*on_timeout) (struct evcom_stream *); + void (*on_read) (struct evcom_stream *, const void* buf, size_t len); + void (*on_close) (struct evcom_stream *); + void *data; } evcom_stream; +typedef struct evcom_server { + EVCOM_DESCRIPTOR(evcom_server) + + /* PRIVATE */ + ev_io watcher; + + /* PUBLIC */ + struct evcom_stream* + (*on_connection)(struct evcom_server *, struct sockaddr *remote_addr); +} evcom_server; + +void evcom_reader_init (evcom_reader*); +void evcom_reader_set (evcom_reader*, int fd); +void evcom_reader_attach (EV_P_ evcom_reader*); +void evcom_reader_detach (evcom_reader*); +void evcom_reader_close (evcom_reader*); + +void evcom_writer_init (evcom_writer*); +void evcom_writer_set (evcom_writer*, int fd); +void evcom_writer_attach (EV_P_ evcom_writer*); +void evcom_writer_detach (evcom_writer*); +void evcom_writer_write (evcom_writer*, const char *str, size_t len); +void evcom_writer_close (evcom_writer*); + void evcom_server_init (evcom_server *); int evcom_server_listen (evcom_server *, struct sockaddr *address, int backlog); void evcom_server_attach (EV_P_ evcom_server *); void evcom_server_detach (evcom_server *); -void evcom_server_close (evcom_server *); // synchronous +void evcom_server_close (evcom_server *); void evcom_stream_init (evcom_stream *, float timeout); + + int evcom_stream_pair (evcom_stream *a, evcom_stream *b); int evcom_stream_connect (evcom_stream *, struct sockaddr *address); + void evcom_stream_attach (EV_P_ evcom_stream *); void evcom_stream_detach (evcom_stream *); void evcom_stream_read_resume (evcom_stream *); void evcom_stream_read_pause (evcom_stream *); - -/* Resets the timeout to stay alive for another stream->timeout seconds - */ +/* Resets the timeout to stay alive for another stream->timeout seconds */ void evcom_stream_reset_timeout (evcom_stream *); - -/* Writes a buffer to the stream. - */ -void evcom_stream_write (evcom_stream *, evcom_buf *); - -void evcom_stream_write_simple (evcom_stream *, const char *str, size_t len); - +void evcom_stream_write (evcom_stream *, const char *str, size_t len); /* Once the write buffer is drained, evcom_stream_close will shutdown the * writing end of the stream and will close the read end once the server - * replies with an EOF. + * replies with an EOF. */ void evcom_stream_close (evcom_stream *); -/* Do not wait for the server to reply with EOF. This will only be called - * once the write buffer is drained. - * Warning: For TCP stream, the OS kernel may (should) reply with RST - * packets if this is called when data is still being received from the - * server. - */ -void evcom_stream_full_close (evcom_stream *); - -/* The most extreme measure. - * Will not wait for the write queue to complete. - */ +/* Will not wait for the write queue to complete. Closes both directions */ void evcom_stream_force_close (evcom_stream *); @@ -195,9 +211,8 @@ void evcom_stream_set_secure_session (evcom_stream *, gnutls_session_t); enum evcom_stream_state evcom_stream_state (evcom_stream *stream); -evcom_buf * evcom_buf_new (const char* base, size_t len); -evcom_buf * evcom_buf_new2 (size_t len); -void evcom_buf_destroy (evcom_buf *); +evcom_buf* evcom_buf_new (const char* base, size_t len); +evcom_buf* evcom_buf_new2 (size_t len); EV_INLINE void evcom_queue_init (evcom_queue *q) @@ -235,5 +250,5 @@ evcom_queue_remove (evcom_queue *x) #ifdef __cplusplus } -#endif +#endif #endif /* evcom_h */ diff --git a/deps/evcom/recv_states.dot b/deps/evcom/recv_states.dot new file mode 100644 index 0000000000..52f9624230 --- /dev/null +++ b/deps/evcom/recv_states.dot @@ -0,0 +1,37 @@ +strict digraph recv_states { + start [peripheries=2]; + end [peripheries=2]; + handshake; + recv_data; + wait_for_resume; + wait_for_close; + close_one; + close_both; + + node [label="", shape="box", height=0.1, width=0.1]; + close; + + + + start -> handshake [label="tls"]; + start -> recv_data; + + handshake -> close [label="error"]; + handshake -> recv_data; + + recv_data -> handshake [label="rehandshake"]; + recv_data -> wait_for_resume [label="pause"]; + recv_data -> wait_for_close [label="eof"]; + recv_data -> close [label="error"]; + + wait_for_resume -> recv_data; + + wait_for_close -> close; + + close -> close_one [label="duplex"]; + close -> close_both; + + close_one -> end; + close_both -> end; + +} diff --git a/deps/evcom/send_states.dot b/deps/evcom/send_states.dot new file mode 100644 index 0000000000..bb913736c0 --- /dev/null +++ b/deps/evcom/send_states.dot @@ -0,0 +1,65 @@ +strict digraph send_states { + start [peripheries=2]; + end [peripheries=2]; + connection_established; + handshake; + send_data; + shutdown; + gnutls_bye; + close_one; + close_both; + + wait_for_connect; + wait_for_buf; + wait_for_eof; + + node [label="", shape="box", height=0.1, width=0.1]; + close; + drain; + hangup; + hangup_unsecure; + + + + start -> wait_for_connect [label="duplex"]; + start -> connection_established; + + wait_for_connect -> connection_established; + wait_for_connect -> close [label="error"]; + + connection_established -> handshake [label="tls"]; + connection_established -> send_data; + + handshake -> close [label="error"]; + handshake -> send_data; + + send_data -> close [label="error"]; + send_data -> drain [label="drain"]; + + drain -> wait_for_buf; + drain -> hangup [label="got_close"]; + + wait_for_buf -> send_data; + wait_for_buf -> drain [label="empty_buf"]; + + hangup -> gnutls_bye [label="tls"]; + hangup -> hangup_unsecure; + + gnutls_bye -> wait_for_eof; + gnutls_bye -> close [label="error"]; + + hangup_unsecure -> shutdown [label="duplex"]; + hangup_unsecure -> close_one; + + shutdown -> wait_for_eof; + shutdown -> close [label="error"]; + + wait_for_eof -> close_one; + close_one -> wait_for_eof [label="readable"]; + + close -> close_both; + close -> close_one [label="duplex"]; + + close_both -> end; + close_one -> end; +} diff --git a/deps/evcom/test/echo.c b/deps/evcom/test/echo.c index 969cefb57d..4e05f14831 100644 --- a/deps/evcom/test/echo.c +++ b/deps/evcom/test/echo.c @@ -12,7 +12,9 @@ #include #include -#include +#if EVCOM_HAVE_GNUTLS +# include +#endif #define HOST "127.0.0.1" #define SOCKFILE "/tmp/oi.sock" @@ -46,7 +48,7 @@ on_peer_read (evcom_stream *stream, const void *base, size_t len) { if(len == 0) return; - evcom_stream_write_simple(stream, base, len); + evcom_stream_write(stream, base, len); } static evcom_stream* diff --git a/deps/evcom/test/test.c b/deps/evcom/test/test.c index 9d789296f1..6b4b765fa0 100644 --- a/deps/evcom/test/test.c +++ b/deps/evcom/test/test.c @@ -16,13 +16,20 @@ # include #endif -#define MARK_PROGRESS write(STDERR_FILENO, ".", 1) +#undef MAX +#define MAX(a,b) ((a) > (b) ? (a) : (b)) + +#undef MIN +#define MIN(a,b) ((a) < (b) ? (a) : (b)) + +#define MARK_PROGRESS(c,cur,max) \ + if (cur % (MAX(max,50)/50) == 0) write(STDERR_FILENO, c, 1) #define SOCKFILE "/tmp/oi.sock" #define PORT 5000 static evcom_server server; -static int nconnections; +static int nconnections; static int use_tls; static int got_server_close; @@ -36,7 +43,7 @@ common_on_server_close (evcom_server *s) evcom_server_detach(s); } -static void +static void common_on_peer_close (evcom_stream *stream) { assert(EVCOM_CLOSED == evcom_stream_state(stream)); @@ -50,14 +57,14 @@ common_on_peer_close (evcom_stream *stream) free(stream); } -static void +static void common_on_client_timeout (evcom_stream *stream) { assert(stream); printf("client connection timeout\n"); } -static void +static void common_on_peer_timeout (evcom_stream *stream) { assert(stream); @@ -110,12 +117,12 @@ void anon_tls_client (evcom_stream *stream) #define PING "PING" #define PONG "PONG" -#define EXCHANGES 500 +#define EXCHANGES 500 #define PINGPONG_TIMEOUT 5.0 -static int successful_ping_count; +static int successful_ping_count; -static void +static void pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len) { if (len == 0) { @@ -128,10 +135,10 @@ pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len) buf[len] = 0; printf("server got message: %s\n", buf); - evcom_stream_write_simple(stream, PONG, sizeof PONG); + evcom_stream_write(stream, PONG, sizeof PONG); } -static void +static void pingpong_on_client_close (evcom_stream *stream) { assert(EVCOM_CLOSED == evcom_stream_state(stream)); @@ -141,7 +148,7 @@ pingpong_on_client_close (evcom_stream *stream) evcom_stream_detach(stream); } -static evcom_stream* +static evcom_stream* pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr) { assert(_server == &server); @@ -166,15 +173,15 @@ pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr) return stream; } -static void +static void pingpong_on_client_connect (evcom_stream *stream) { printf("client connected. sending ping\n"); - evcom_stream_write_simple(stream, PING, sizeof PING); + evcom_stream_write(stream, PING, sizeof PING); assert(EVCOM_CONNECTED_RW == evcom_stream_state(stream)); } -static void +static void pingpong_on_client_read (evcom_stream *stream, const void *base, size_t len) { if(len == 0) { @@ -188,17 +195,17 @@ pingpong_on_client_read (evcom_stream *stream, const void *base, size_t len) strncpy(buf, base, len); buf[len] = 0; printf("client got message: %s\n", buf); - + assert(strcmp(buf, PONG) == 0); if (++successful_ping_count > EXCHANGES) { evcom_stream_close(stream); return; - } + } - if (successful_ping_count % (EXCHANGES/20) == 0) MARK_PROGRESS; + MARK_PROGRESS(".", successful_ping_count, EXCHANGES); - evcom_stream_write_simple(stream, PING, sizeof PING); + evcom_stream_write(stream, PING, sizeof PING); } int @@ -206,13 +213,13 @@ pingpong (struct sockaddr *address) { int r; evcom_stream client; - + successful_ping_count = 0; nconnections = 0; got_server_close = 0; - printf("sizeof(evcom_server): %d\n", sizeof(evcom_server)); - printf("sizeof(evcom_stream): %d\n", sizeof(evcom_stream)); + printf("sizeof(evcom_server): %d\n", (int)sizeof(evcom_server)); + printf("sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream)); evcom_server_init(&server); server.on_connection = pingpong_on_server_connection; @@ -253,17 +260,17 @@ pingpong (struct sockaddr *address) #define NCONN 50 #define CONNINT_TIMEOUT 10.0 -static void +static void send_bye_and_close(evcom_stream *stream, const void *base, size_t len) { assert(base); assert(len == 0); - evcom_stream_write_simple(stream, "BYE", 3); + evcom_stream_write(stream, "BYE", 3); printf("server wrote bye\n"); evcom_stream_close(stream); } -static evcom_stream* +static evcom_stream* connint_on_connection(evcom_server *_server, struct sockaddr *addr) { assert(_server == &server); @@ -284,21 +291,21 @@ connint_on_connection(evcom_server *_server, struct sockaddr *addr) return stream; } -static void +static void connint_on_client_connect (evcom_stream *stream) { printf("on client connection\n"); evcom_stream_close(stream); } -static void +static void connint_on_client_close (evcom_stream *stream) { evcom_stream_close(stream); // already closed, but it shouldn't crash if we try to do it again printf("client connection closed\n"); - if (nconnections % (NCONN/20) == 0) MARK_PROGRESS; + MARK_PROGRESS(".", nconnections, NCONN); if(++nconnections == NCONN) { evcom_server_close(&server); @@ -308,7 +315,7 @@ connint_on_client_close (evcom_stream *stream) evcom_stream_detach(stream); } -static void +static void connint_on_client_read (evcom_stream *stream, const void *base, size_t len) { if (len == 0) { @@ -321,12 +328,12 @@ connint_on_client_read (evcom_stream *stream, const void *base, size_t len) buf[len] = 0; printf("client got message: %s\n", buf); - + assert(strcmp(buf, "BYE") == 0); evcom_stream_close(stream); } -int +int connint (struct sockaddr *address) { int r; @@ -367,6 +374,365 @@ connint (struct sockaddr *address) } +static evcom_reader reader; +static evcom_writer writer; +static int reader_got_close = 0; +static int reader_got_eof = 0; +static int reader_got_hello = 0; +static int reader_cnt = 0; +static int writer_got_close = 0; +#define PIPE_MSG "hello world" +#define PIPE_CNT 5000 + +static void +reader_read (evcom_reader *r, const void *str, size_t len) +{ + assert(r == &reader); + + if (len == 0) { + reader_got_eof = 1; + return; + } + + assert(len == strlen(PIPE_MSG)); + + if (strncmp(str, PIPE_MSG, strlen(PIPE_MSG)) == 0) { + reader_got_hello = 1; + } + + if (++reader_cnt < PIPE_CNT) { + MARK_PROGRESS(".", reader_cnt, PIPE_CNT); + evcom_writer_write(&writer, PIPE_MSG, strlen(PIPE_MSG)); + } else { + evcom_writer_close(&writer); + } +} + +static void +reader_close (evcom_reader *r) +{ + assert(r == &reader); + reader_got_close = 1; + evcom_reader_detach(r); +} + +static void +writer_close (evcom_writer *w) +{ + assert(w == &writer); + writer_got_close = 1; + evcom_writer_detach(w); +} + +int +pipe_stream (void) +{ + reader_cnt = 0; + reader_got_close = 0; + reader_got_hello = 0; + reader_got_eof = 0; + writer_got_close = 0; + + int pipefd[2]; + int r = pipe(pipefd); + if (r < 0) { + perror("pipe()"); + return -1; + } + + evcom_reader_init(&reader); + reader.on_read = reader_read; + reader.on_close = reader_close; + evcom_reader_set(&reader, pipefd[0]); + evcom_reader_attach(EV_DEFAULT_ &reader); + + evcom_writer_init(&writer); + writer.on_close = writer_close; + evcom_writer_set(&writer, pipefd[1]); + evcom_writer_attach(EV_DEFAULT_ &writer); + + evcom_writer_write(&writer, PIPE_MSG, strlen(PIPE_MSG)); + + ev_loop(EV_DEFAULT_ 0); + + assert(reader_got_close); + assert(reader_got_hello); + assert(reader_got_eof); + assert(writer_got_close); + assert(reader_cnt == PIPE_CNT); + + return 0; +} + +#define PAIR_PINGPONG_TIMEOUT 5000.0 +#define PAIR_PINGPONG_EXCHANGES 50 +static int a_got_close; +static int a_got_connect; +static int b_got_close; +static int b_got_connect; +static int pair_pingpong_cnt; +static evcom_stream a, b; + +void a_connect (evcom_stream *stream) +{ + assert(stream == &a); + a_got_connect = 1; +} + +void a_close (evcom_stream *stream) +{ + evcom_stream_detach(stream); + assert(stream == &a); + a_got_close = 1; + + assert(stream->errorno == 0); +#if EVCOM_HAVE_GNUTLS + if (stream->gnutls_errorno) { + fprintf(stderr, "\nGNUTLS ERROR: %s\n", gnutls_strerror(stream->gnutls_errorno)); + } + assert(stream->gnutls_errorno == 0); + if (use_tls) gnutls_deinit(stream->session); +#endif +} + +void a_read (evcom_stream *stream, const void *buf, size_t len) +{ + assert(stream == &a); + if (len == 0) return; + + assert(len == strlen(PONG)); + assert(strncmp(buf, PONG, strlen(PONG)) == 0); + + if (++pair_pingpong_cnt < PAIR_PINGPONG_EXCHANGES) { + evcom_stream_write(&a, PING, strlen(PING)); + } else if (pair_pingpong_cnt == PAIR_PINGPONG_EXCHANGES) { + evcom_stream_close(stream); + } + + MARK_PROGRESS(".", pair_pingpong_cnt, PAIR_PINGPONG_EXCHANGES); +} + +void b_connect (evcom_stream *stream) +{ + assert(stream == &b); + b_got_connect = 1; +} + +void b_close (evcom_stream *stream) +{ + evcom_stream_detach(stream); + assert(stream == &b); + b_got_close = 1; + + assert(stream->errorno == 0); +#if EVCOM_HAVE_GNUTLS + if (stream->gnutls_errorno) { + fprintf(stderr, "\nGNUTLS ERROR: %s\n", gnutls_strerror(stream->gnutls_errorno)); + } + assert(stream->gnutls_errorno == 0); + if (use_tls) gnutls_deinit(stream->session); +#endif +} + +void b_read (evcom_stream *stream, const void *buf, size_t len) +{ + assert(stream == &b); + if (len == 0) { + evcom_stream_close(stream); + return; + } + + assert(len == strlen(PING)); + assert(strncmp(buf, PING, strlen(PING)) == 0); + + evcom_stream_write(&b, PONG, strlen(PONG)); +} + +int +pair_pingpong () +{ + a_got_close = 0; + a_got_connect = 0; + b_got_close = 0; + b_got_connect = 0; + pair_pingpong_cnt = 0; + + evcom_stream_init(&a, PAIR_PINGPONG_TIMEOUT); + a.on_close = a_close; + a.on_connect = a_connect; + a.on_read = a_read; +#if EVCOM_HAVE_GNUTLS + if (use_tls) anon_tls_client(&a); +#endif + + evcom_stream_init(&b, PAIR_PINGPONG_TIMEOUT); + b.on_close = b_close; + b.on_connect = b_connect; + b.on_read = b_read; +#if EVCOM_HAVE_GNUTLS + if (use_tls) anon_tls_server(&b); +#endif + + int r = evcom_stream_pair(&a, &b); + assert(r == 0); + + evcom_stream_attach(EV_DEFAULT_ &a); + evcom_stream_attach(EV_DEFAULT_ &b); + + evcom_stream_write(&a, PING, strlen(PING)); + + ev_loop(EV_DEFAULT_ 0); + + assert(a_got_close); + assert(a_got_connect); + assert(b_got_close); + assert(b_got_connect); + assert(pair_pingpong_cnt == PAIR_PINGPONG_EXCHANGES); + + return 0; +} + + +static void +free_stream (evcom_stream *stream) +{ + assert(stream->errorno == 0); + free(stream); +} + +#define ZERO_TIMEOUT 50.0 +static size_t zero_to_write = 0; +static size_t zero_written = 0; +static size_t zero_read = 0; +static size_t zero_client_closed = 0; + +static void +error_out (evcom_stream *stream) +{ + assert(stream); + fprintf(stderr, "peer connection timeout\n"); + assert(0); +} + +static void +echo (evcom_stream *stream, const void *base, size_t len) +{ + if(len == 0) { + fprintf(stderr, "close"); + evcom_stream_close(stream); + } else { + evcom_stream_write(stream, base, len); + } +} + +static evcom_stream* +make_echo_connection (evcom_server *server, struct sockaddr *addr) +{ + assert(server); + assert(addr); + + evcom_stream *stream = malloc(sizeof(evcom_stream)); + evcom_stream_init(stream, ZERO_TIMEOUT); + stream->on_read = echo; + stream->on_close = free_stream; + stream->on_timeout = error_out; + +#if EVCOM_HAVE_GNUTLS + if (use_tls) anon_tls_server(stream); +#endif + + return stream; +} + + +static void +zero_start (evcom_stream *stream) +{ + evcom_stream_write(stream, "0", 1); + zero_written++; +} + +static void +zero_close (evcom_stream *stream) +{ + assert(stream); + zero_client_closed = 1; +} + +static void +zero_recv (evcom_stream *stream, const void *buf, size_t len) +{ + MARK_PROGRESS("-", zero_read, zero_to_write); + zero_read += len; + + size_t i; + + for (i = 0; i < len; i++) { + assert(((char*)buf)[i] == '0'); + } + + for (i = 0; i < MIN(zero_to_write - zero_written, 90000); i++) { + evcom_stream_write(stream, "0", 1); + zero_written++; + + MARK_PROGRESS(".", zero_written, zero_to_write); + + if (zero_written == zero_to_write) { + + fprintf(stderr, "CLOSE"); + evcom_stream_close(stream); + } + } + + if (len == 0) { + fprintf(stderr, "finish"); + evcom_server_close(&server); + } +} + +int +zero_stream (struct sockaddr *address, size_t to_write) +{ + int r; + + assert(to_write >= 1024); // should be kind of big at least. + zero_to_write = to_write; + got_server_close = 0; + zero_written = 0; + zero_read = 0; + zero_client_closed = 0; + + evcom_server_init(&server); + server.on_connection = make_echo_connection; + server.on_close = common_on_server_close; + + evcom_server_listen(&server, address, 1000); + evcom_server_attach(EV_DEFAULT_ &server); + + evcom_stream client; + evcom_stream_init(&client, ZERO_TIMEOUT); + client.on_read = zero_recv; + client.on_connect = zero_start; + client.on_close = zero_close; + client.on_timeout = error_out; +#if EVCOM_HAVE_GNUTLS + if (use_tls) anon_tls_client(&client); +#endif + r = evcom_stream_connect(&client, address); + assert(r == 0 && "problem connecting"); + evcom_stream_attach(EV_DEFAULT_ &client); + + ev_loop(EV_DEFAULT_ 0); + + assert(got_server_close); + assert(zero_written == zero_to_write); + assert(zero_read == zero_to_write); + assert(zero_client_closed) ; + + return 0; +} + + struct sockaddr * create_unix_address (void) { @@ -386,6 +752,11 @@ create_unix_address (void) void free_unix_address (struct sockaddr *address) { + struct stat tstat; + if (lstat(SOCKFILE, &tstat) == 0) { + assert(S_ISSOCK(tstat.st_mode)); + unlink(SOCKFILE); + } free(address); } @@ -405,48 +776,87 @@ main (void) gnutls_anon_set_server_dh_params (server_credentials, dh_params); #endif + struct sockaddr_in tcp_address; memset(&tcp_address, 0, sizeof(struct sockaddr_in)); tcp_address.sin_family = AF_INET; tcp_address.sin_port = htons(PORT); tcp_address.sin_addr.s_addr = INADDR_ANY; - use_tls = 0; + + fprintf(stderr, "zero_stream tcp: "); + assert(zero_stream((struct sockaddr*)&tcp_address, 5*1024*1024) == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "pipe_stream: "); + assert(pipe_stream() == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "pair_pingpong: "); + assert(pair_pingpong() == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "pingpong tcp: "); assert(pingpong((struct sockaddr*)&tcp_address) == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "connint tcp: "); assert(connint((struct sockaddr*)&tcp_address) == 0); + fprintf(stderr, "\n"); #if EVCOM_HAVE_GNUTLS use_tls = 1; + + fprintf(stderr, "zero_stream ssl: "); + assert(zero_stream((struct sockaddr*)&tcp_address, 50*1024) == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "pair_pingpong ssl: "); + assert(pair_pingpong() == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "pingpong ssl: "); assert(pingpong((struct sockaddr*)&tcp_address) == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "connint ssl: "); assert(connint((struct sockaddr*)&tcp_address) == 0); -#endif + fprintf(stderr, "\n"); +#endif struct sockaddr *unix_address; - use_tls = 0; + fprintf(stderr, "pingpong unix: "); unix_address = create_unix_address(); assert(pingpong(unix_address) == 0); free_unix_address(unix_address); + fprintf(stderr, "\n"); + fprintf(stderr, "connint unix: "); unix_address = create_unix_address(); assert(connint(unix_address) == 0); free_unix_address(unix_address); + fprintf(stderr, "\n"); #if EVCOM_HAVE_GNUTLS use_tls = 1; + fprintf(stderr, "pingpong unix ssl: "); unix_address = create_unix_address(); assert(pingpong(unix_address) == 0); free_unix_address(unix_address); + fprintf(stderr, "\n"); + fprintf(stderr, "connint unix ssl: "); unix_address = create_unix_address(); assert(connint(unix_address) == 0); free_unix_address(unix_address); -#endif + fprintf(stderr, "\n"); +#endif return 0; } diff --git a/src/http.js b/src/http.js index 773cdb2b71..b59896a20b 100644 --- a/src/http.js +++ b/src/http.js @@ -394,7 +394,7 @@ function connectionListener (connection) { res.should_keep_alive = should_keep_alive; res.addListener("flush", function () { if(flushMessageQueue(connection, responses)) { - connection.fullClose(); + connection.close(); } }); responses.push(res); diff --git a/src/net.cc b/src/net.cc index 8d5e98433c..adedb1e8e7 100644 --- a/src/net.cc +++ b/src/net.cc @@ -62,7 +62,6 @@ Connection::Initialize (v8::Handle target) NODE_SET_PROTOTYPE_METHOD(constructor_template, "connect", Connect); NODE_SET_PROTOTYPE_METHOD(constructor_template, "send", Send); NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Close); - NODE_SET_PROTOTYPE_METHOD(constructor_template, "fullClose", FullClose); NODE_SET_PROTOTYPE_METHOD(constructor_template, "forceClose", ForceClose); NODE_SET_PROTOTYPE_METHOD(constructor_template, "setEncoding", SetEncoding); NODE_SET_PROTOTYPE_METHOD(constructor_template, "readPause", ReadPause); @@ -116,7 +115,8 @@ Connection::Init (void) Connection::~Connection () { - assert(stream_.fd < 0 && "garbage collecting open Connection"); + assert(stream_.recvfd < 0 && "garbage collecting open Connection"); + assert(stream_.sendfd < 0 && "garbage collecting open Connection"); ForceClose(); } @@ -149,7 +149,8 @@ Connection::Connect (const Arguments& args) return ThrowException(String::New("Socket is not in CLOSED state.")); } - assert(connection->stream_.fd < 0); + assert(connection->stream_.recvfd < 0); + assert(connection->stream_.sendfd < 0); if (args.Length() == 0) return ThrowException(String::New("Must specify a port.")); @@ -344,17 +345,6 @@ Connection::Close (const Arguments& args) return Undefined(); } -Handle -Connection::FullClose (const Arguments& args) -{ - HandleScope scope; - Connection *connection = ObjectWrap::Unwrap(args.Holder()); - assert(connection); - - connection->FullClose(); - return Undefined(); -} - Handle Connection::ForceClose (const Arguments& args) { @@ -394,31 +384,31 @@ Connection::Send (const Arguments& args) enum encoding enc = ParseEncoding(args[1]); Local s = args[0]->ToString(); size_t len = s->Utf8Length(); - evcom_buf *buf = node::buf_new(len); + char buf[len]; switch (enc) { case RAW: case ASCII: - s->WriteAscii(buf->base, 0, len); + s->WriteAscii(buf, 0, len); break; case UTF8: - s->WriteUtf8(buf->base, len); + s->WriteUtf8(buf, len); break; default: assert(0 && "unhandled string encoding"); } - connection->Send(buf); + connection->Send(buf, len); } else if (args[0]->IsArray()) { Handle array = Handle::Cast(args[0]); size_t len = array->Length(); - evcom_buf *buf = node::buf_new(len); + char buf[len]; for (size_t i = 0; i < len; i++) { Local int_value = array->Get(Integer::New(i)); - buf->base[i] = int_value->IntegerValue(); + buf[i] = int_value->IntegerValue(); } - connection->Send(buf); + connection->Send(buf, len); } else return ThrowException(String::New("Bad argument")); diff --git a/src/net.h b/src/net.h index f71faa368f..404d22ea5e 100644 --- a/src/net.h +++ b/src/net.h @@ -24,7 +24,6 @@ protected: static v8::Handle Send (const v8::Arguments& args); static v8::Handle SendUtf8 (const v8::Arguments& args); static v8::Handle Close (const v8::Arguments& args); - static v8::Handle FullClose (const v8::Arguments& args); static v8::Handle ForceClose (const v8::Arguments& args); static v8::Handle SetEncoding (const v8::Arguments& args); static v8::Handle ReadPause (const v8::Arguments& args); @@ -47,9 +46,8 @@ protected: int Connect (struct sockaddr *address) { return evcom_stream_connect (&stream_, address); } - void Send (evcom_buf *buf) { evcom_stream_write(&stream_, buf); } + void Send (const char *buf, size_t len) { evcom_stream_write(&stream_, buf, len); } void Close (void) { evcom_stream_close(&stream_); } - void FullClose (void) { evcom_stream_full_close(&stream_); } void ForceClose (void) { evcom_stream_force_close(&stream_); } void ReadPause (void) { evcom_stream_read_pause(&stream_); } void ReadResume (void) { evcom_stream_read_resume(&stream_); } @@ -92,7 +90,8 @@ private: evcom_stream_detach(s); - assert(connection->stream_.fd < 0); + assert(connection->stream_.recvfd < 0); + assert(connection->stream_.sendfd < 0); connection->OnClose(); diff --git a/test/mjsunit/test-tcp-many-clients.js b/test/mjsunit/test-tcp-many-clients.js index 1231562d19..624513254c 100644 --- a/test/mjsunit/test-tcp-many-clients.js +++ b/test/mjsunit/test-tcp-many-clients.js @@ -18,7 +18,7 @@ var server = node.tcp.createServer(function (c) { total_connections++; print("#"); c.send(body); - c.fullClose(); + c.close(); }); }); server.listen(port); @@ -29,6 +29,7 @@ function runClient (callback) { client.setEncoding("utf8"); client.addListener("connect", function () { + print("c"); client.recved = ""; client.connections += 1; }); diff --git a/test/mjsunit/test-tcp-reconnect.js b/test/mjsunit/test-tcp-reconnect.js index 292e5fd25a..379b0c2681 100644 --- a/test/mjsunit/test-tcp-reconnect.js +++ b/test/mjsunit/test-tcp-reconnect.js @@ -36,7 +36,7 @@ function onLoad () { client_recv_count += 1; puts("client_recv_count " + client_recv_count); assertEquals("hello\r\n", chunk); - client.fullClose(); + client.close(); }); client.addListener("close", function (had_error) { diff --git a/test/mjsunit/test-tcp-throttle-kernel-buffer.js b/test/mjsunit/test-tcp-throttle-kernel-buffer.js index 5639c6aa59..7092b7390a 100644 --- a/test/mjsunit/test-tcp-throttle-kernel-buffer.js +++ b/test/mjsunit/test-tcp-throttle-kernel-buffer.js @@ -13,7 +13,7 @@ puts("start server on port " + PORT); server = node.tcp.createServer(function (connection) { connection.addListener("connect", function () { connection.send(body); - connection.fullClose(); + connection.close(); }); }); server.listen(PORT); diff --git a/test/mjsunit/test-tcp-throttle.js b/test/mjsunit/test-tcp-throttle.js index b5aab9a81d..37bc9bc344 100644 --- a/test/mjsunit/test-tcp-throttle.js +++ b/test/mjsunit/test-tcp-throttle.js @@ -5,7 +5,7 @@ N = 500; server = node.tcp.createServer(function (connection) { function send (j) { if (j >= N) { - connection.fullClose(); + connection.close(); return; } setTimeout(function () {