/* Copyright (c) 2008,2009 Ryan Dahl * * evcom_queue comes from ngx_queue.h * Copyright (C) 2002-2009 Igor Sysoev * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include #include #include #include #include #include #include /* memset */ #include #include /* TCP_NODELAY */ #include #include /* shutdown */ #include #include /* sockaddr_in, sockaddr_in6 */ #include #include #if EV_MULTIPLICITY # define D_LOOP_(d) (d)->loop, # define D_LOOP_SET(d, _loop) do { (d)->loop = (_loop); } while (0) #else # define D_LOOP_(d) # define D_LOOP_SET(d, _loop) #endif // EV_MULTIPLICITY /* SEND STATES */ static int stream_send__wait_for_connection (evcom_stream*); static int stream_send__data (evcom_stream*); static int stream_send__drain (evcom_stream*); static int stream_send__wait_for_eof (evcom_stream*); static int stream_send__wait_for_buf (evcom_stream*); static int stream_send__shutdown (evcom_stream*); #if EVCOM_HAVE_GNUTLS static int stream_send__gnutls_bye (evcom_stream*); #endif static int stream_send__close_one (evcom_stream*); static int stream_send__close (evcom_stream*); /* RECV STATES */ static int stream_recv__data (evcom_stream*); static int stream_recv__wait_for_resume (evcom_stream*); static int stream_recv__wait_for_close (evcom_stream*); static int stream_recv__close_one (evcom_stream*); static int stream_recv__close (evcom_stream*); /* COMMON STATES */ #if EVCOM_HAVE_GNUTLS static int stream__handshake (evcom_stream*); #endif static int stream__close_both (evcom_stream*); #undef TRUE #define TRUE 1 #undef FALSE #define FALSE 0 #undef MIN #define MIN(a,b) ((a) < (b) ? (a) : (b)) #define OKAY 0 #define AGAIN 1 #define ATTACHED(s) ((s)->flags & EVCOM_ATTACHED) #define LISTENING(s) ((s)->flags & EVCOM_LISTENING) #define CONNECTED(s) ((s)->flags & EVCOM_CONNECTED) #define SECURE(s) ((s)->flags & EVCOM_SECURE) #define DUPLEX(s) ((s)->flags & EVCOM_DUPLEX) #define GOT_CLOSE(s) ((s)->flags & EVCOM_GOT_CLOSE) #define PAUSED(s) ((s)->flags & EVCOM_PAUSED) #define READABLE(s) ((s)->flags & EVCOM_READABLE) #define WRITABLE(s) ((s)->flags & EVCOM_WRITABLE) static int too_many_connections = 0; EV_INLINE int set_nonblock (int fd) { int flags = fcntl(fd, F_GETFL, 0); if (flags == -1) return -1; int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); if (r == -1) return -1; return 0; } evcom_buf * evcom_buf_new2 (size_t len) { void *data = malloc(sizeof(evcom_buf) + len); if (!data) return NULL; evcom_buf *buf = data; buf->len = len; buf->release = (void (*)(evcom_buf*))free; buf->base = data + sizeof(evcom_buf); return buf; } evcom_buf * evcom_buf_new (const char *base, size_t len) { evcom_buf* buf = evcom_buf_new2(len); if (!buf) return NULL; memcpy(buf->base, base, len); return buf; } void evcom_ignore_sigpipe (void) { struct sigaction sa; bzero(&sa, sizeof(sa)); sa.sa_handler = SIG_IGN; sigaction(SIGPIPE, &sa, NULL); } static int close_asap (evcom_descriptor *d) { if (d->fd < 0) return OKAY; int r = close(d->fd); if (r < 0) { if (errno == EINTR) { d->action = close_asap; return AGAIN; } else { d->errorno = errno; } } d->action = NULL; d->fd = -1; return OKAY; } #define release_write_buffer(writer) \ do { \ while (!evcom_queue_empty(&(writer)->out)) { \ evcom_queue *q = evcom_queue_last(&(writer)->out); \ evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); \ evcom_queue_remove(q); \ if (buf->release) buf->release(buf); \ } \ } while (0) static int close_writer_asap (evcom_writer *writer) { release_write_buffer(writer); ev_feed_event(D_LOOP_(writer) &writer->write_watcher, EV_WRITE); return close_asap((evcom_descriptor*)writer); } static inline void evcom_perror (const char *msg, int errorno) { fprintf(stderr, "(evcom) %s %s\n", msg, strerror(errorno)); } static int stream_send__wait_for_buf (evcom_stream *stream) { if (evcom_queue_empty(&stream->out)) { if (GOT_CLOSE(stream)) { stream->send_action = stream_send__drain; return OKAY; } ev_io_stop(D_LOOP_(stream) &stream->write_watcher); return AGAIN; } stream->send_action = stream_send__data; return OKAY; } static inline void stream__set_recv_closed (evcom_stream *stream) { stream->flags &= ~EVCOM_READABLE; stream->recvfd = -1; stream->recv_action = NULL; ev_io_stop(D_LOOP_(stream) &stream->read_watcher); } static inline void stream__set_send_closed (evcom_stream *stream) { release_write_buffer(stream); stream->flags &= ~EVCOM_WRITABLE; stream->sendfd = -1; stream->send_action = NULL; ev_io_stop(D_LOOP_(stream) &stream->write_watcher); } static int stream_send__close_one (evcom_stream *stream) { assert(stream->sendfd >= 0); close(stream->sendfd); /* TODO recover from EINTR */ stream__set_send_closed(stream); if (DUPLEX(stream) || stream->recv_action == stream_recv__wait_for_close) { stream__set_recv_closed(stream); } return OKAY; } static int stream__close_both (evcom_stream *stream) { assert(stream->sendfd != stream->recvfd); assert(stream->sendfd >= 0); assert(stream->recvfd >= 0); close(stream->recvfd); close(stream->sendfd); /* TODO recover from EINTR */ stream__set_send_closed(stream); stream__set_recv_closed(stream); return OKAY; } static int stream_send__close (evcom_stream *stream) { if (DUPLEX(stream) || stream->recvfd < 0) { stream->send_action = stream_send__close_one; } else { stream->send_action = stream__close_both; } return OKAY; } static int stream_recv__close_one (evcom_stream *stream) { assert(stream->recvfd >= 0); close(stream->recvfd); /* TODO recover from EINTR */ stream__set_recv_closed(stream); if (DUPLEX(stream)) { stream__set_send_closed(stream); } return OKAY; } static int stream_recv__close (evcom_stream *stream) { if (DUPLEX(stream) || stream->sendfd < 0) { stream->recv_action = stream_recv__close_one; } else { stream->recv_action = stream__close_both; } return OKAY; } static int stream_send__drain (evcom_stream *stream) { if (stream->on_drain) { stream->on_drain(stream); } if (!GOT_CLOSE(stream)) { stream->send_action = stream_send__wait_for_buf; return OKAY; } #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { stream->send_action = stream_send__gnutls_bye; return OKAY; } #endif if (DUPLEX(stream)) { stream->send_action = stream_send__shutdown; return OKAY; } stream->send_action = stream_send__close_one; return OKAY; } static int stream_send__wait_for_eof (evcom_stream *stream) { if (READABLE(stream)) { ev_io_stop(D_LOOP_(stream) &stream->write_watcher); assert(stream->send_action == stream_send__wait_for_eof); return AGAIN; } stream->send_action = stream_send__close_one; return OKAY; } static inline ssize_t nosigpipe_send (int fd, const void *buf, size_t len) { int flags = 0; #ifdef MSG_NOSIGNAL flags |= MSG_NOSIGNAL; #endif #ifdef MSG_DONTWAIT flags |= MSG_DONTWAIT; #endif return send(fd, buf, len, flags); } static inline ssize_t nosigpipe_stream_send (evcom_stream *stream, const void *buf, size_t len) { return write(stream->sendfd, buf, len); } #if EVCOM_HAVE_GNUTLS static ssize_t nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len) { evcom_stream *stream = (evcom_stream*)data; assert(SECURE(stream)); return nosigpipe_stream_send(stream, buf, len); } static ssize_t pull (gnutls_transport_ptr_t data, void* buf, size_t len) { evcom_stream *stream = (evcom_stream*)data; assert(SECURE(stream)); return read(stream->recvfd, buf, len); } static int stream__handshake (evcom_stream *stream) { assert(SECURE(stream)); int r = gnutls_handshake(stream->session); if (gnutls_error_is_fatal(r)) { stream->gnutls_errorno = r; stream->send_action = stream_send__close; stream->recv_action = stream_recv__close; return OKAY; } ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) { if (0 == gnutls_record_get_direction((stream)->session)) { ev_io_start(D_LOOP_(stream) &(stream)->read_watcher); ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher); } else { ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher); ev_io_start(D_LOOP_(stream) &(stream)->write_watcher); } assert(stream->recv_action == stream__handshake); assert(stream->send_action == stream__handshake); return AGAIN; } assert(!CONNECTED(stream)); stream->flags |= EVCOM_CONNECTED; if (stream->on_connect) stream->on_connect(stream); /* evcom_stream_force_close might have been called. */ if (stream->recvfd >= 0 && stream->sendfd >= 0) { ev_io_start(D_LOOP_(stream) &stream->read_watcher); ev_io_start(D_LOOP_(stream) &stream->write_watcher); stream->send_action = stream_send__data; stream->recv_action = stream_recv__data; } return OKAY; } static int stream_send__gnutls_bye (evcom_stream *stream) { assert(SECURE(stream)); int r = gnutls_bye(stream->session, GNUTLS_SHUT_WR); if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) { assert(1 == gnutls_record_get_direction((stream)->session)); assert(stream->send_action == stream_send__gnutls_bye); return AGAIN; } if (gnutls_error_is_fatal(r)) { stream->gnutls_errorno = r; stream->send_action = stream_send__close; return OKAY; } stream->flags &= ~EVCOM_WRITABLE; stream->send_action = stream_send__wait_for_eof; return OKAY; } void evcom_stream_set_secure_session (evcom_stream *stream, gnutls_session_t session) { stream->session = session; stream->flags |= EVCOM_SECURE; } #endif /* HAVE GNUTLS */ static int stream_recv__wait_for_close (evcom_stream *stream) { assert(!READABLE(stream)); if (!WRITABLE(stream)) { stream->recv_action = stream_recv__close; return OKAY; } ev_io_stop(D_LOOP_(stream) &stream->read_watcher); return AGAIN; } static int stream_recv__wait_for_resume (evcom_stream *stream) { stream->flags |= EVCOM_PAUSED; ev_io_stop(D_LOOP_(stream) &stream->read_watcher); assert(stream->recv_action == stream_recv__wait_for_resume); return AGAIN; } static int stream_recv__data (evcom_stream *stream) { char buf[EVCOM_CHUNKSIZE]; size_t buf_size = EVCOM_CHUNKSIZE; ssize_t recved; while (READABLE(stream)) { assert(CONNECTED(stream)); if (PAUSED(stream)) { stream->recv_action = stream_recv__wait_for_resume; return OKAY; } #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { recved = gnutls_record_recv(stream->session, buf, buf_size); if (gnutls_error_is_fatal(recved)) { stream->gnutls_errorno = recved; stream->recv_action = stream_recv__close; return OKAY; } if (recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) { if (1 == gnutls_record_get_direction((stream)->session)) { fprintf(stderr, "(evcom) gnutls recv: unexpected switch direction!\n"); ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher); ev_io_start(D_LOOP_(stream) &(stream)->write_watcher); } return AGAIN; } /* A server may also receive GNUTLS_E_REHANDSHAKE when a client has * initiated a andshake. In that case the server can only initiate a * handshake or terminate the connection. */ if (recved == GNUTLS_E_REHANDSHAKE) { assert(WRITABLE(stream)); stream->recv_action = stream__handshake; stream->send_action = stream__handshake; return OKAY; } } else #endif /* EVCOM_HAVE_GNUTLS */ { recved = read(stream->recvfd, buf, buf_size); } if (recved < 0) { if (errno == EAGAIN || errno == EINTR) { assert(stream->recv_action == stream_recv__data); return AGAIN; } if (errno != ECONNRESET) { evcom_perror("recv()", stream->errorno); } stream->errorno = errno; stream->recv_action = stream_recv__close; return OKAY; } ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); assert(recved >= 0); if (recved == 0) { stream->flags &= ~EVCOM_READABLE; ev_io_stop(D_LOOP_(stream) &stream->read_watcher); stream->recv_action = stream_recv__wait_for_close; } /* NOTE: EOF is signaled with recved == 0 on callback */ if (stream->on_read) stream->on_read(stream, buf, recved); if (recved == 0) { return OKAY; } } return AGAIN; } static int stream_send__data (evcom_stream *stream) { ssize_t sent; while (!evcom_queue_empty(&stream->out)) { assert(WRITABLE(stream)); evcom_queue *q = evcom_queue_last(&stream->out); evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { sent = gnutls_record_send(stream->session, buf->base + buf->written, buf->len - buf->written); if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) { if (0 == gnutls_record_get_direction((stream)->session)) { fprintf(stderr, "(evcom) gnutls send: unexpected switch direction!\n"); ev_io_start(D_LOOP_(stream) &(stream)->read_watcher); ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher); } return AGAIN; } if (gnutls_error_is_fatal(sent)) { stream->gnutls_errorno = sent; stream->send_action = stream_send__close; return OKAY; } } else #endif // EVCOM_HAVE_GNUTLS { /* TODO use writev() here? */ sent = nosigpipe_stream_send(stream, buf->base + buf->written, buf->len - buf->written); } if (sent <= 0) { switch (errno) { case EAGAIN: case EINTR: assert(stream->send_action == stream_send__data); return AGAIN; default: stream->errorno = errno; evcom_perror("send()", errno); /* pass through */ case EPIPE: stream->send_action = stream_send__close; return OKAY; } } ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); assert(sent >= 0); buf->written += sent; if (buf->written == buf->len) { evcom_queue_remove(q); if (buf->release) buf->release(buf); } } assert(evcom_queue_empty(&stream->out)); stream->send_action = stream_send__drain; return OKAY; } static int stream_send__shutdown (evcom_stream *stream) { int r = shutdown(stream->sendfd, SHUT_WR); if (r < 0) { switch (errno) { case EINTR: assert(stream->send_action == stream_send__shutdown); return OKAY; case ENOTCONN: break; default: stream->errorno = errno; evcom_perror("shutdown()", errno); break; } stream->send_action = stream_send__close; return OKAY; } stream->flags &= ~EVCOM_WRITABLE; stream->send_action = stream_send__wait_for_eof; return OKAY; } static int stream__connection_established (evcom_stream *stream) { assert(!CONNECTED(stream)); #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { stream->send_action = stream__handshake; stream->recv_action = stream__handshake; } else #endif { stream->flags |= EVCOM_CONNECTED; if (stream->on_connect) stream->on_connect(stream); stream->send_action = stream_send__data; stream->recv_action = stream_recv__data; } ev_io_start(D_LOOP_(stream) &stream->write_watcher); ev_io_start(D_LOOP_(stream) &stream->read_watcher); return OKAY; } static int stream_send__wait_for_connection (evcom_stream *stream) { assert(DUPLEX(stream)); int connect_error; socklen_t len = sizeof(int); int r = getsockopt(stream->sendfd, SOL_SOCKET, SO_ERROR, &connect_error, &len); if (r < 0) { stream->errorno = r; stream->send_action = stream_send__close; return OKAY; } if (connect_error == 0) { stream->send_action = stream__connection_established; return OKAY; } else if (connect_error == EINPROGRESS || connect_error == EINTR) { assert(stream->send_action == stream_send__wait_for_connection); return AGAIN; } stream->errorno = connect_error; stream->send_action = stream_send__close; return OKAY; } void evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd) { assert(recvfd >= 0); assert(sendfd >= 0); if (recvfd == sendfd) stream->flags |= EVCOM_DUPLEX; if (set_nonblock(recvfd) != 0) { evcom_perror("set_nonblock(recvfd)", errno); } if (set_nonblock(sendfd) != 0) { evcom_perror("set_nonblock(sendfd)", errno); } #ifdef SO_NOSIGPIPE if (DUPLEX(stream)) { int flags = 1; int r = setsockopt(sendfd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); if (r < 0) { evcom_perror("setsockopt(SO_NOSIGPIPE)", errno); } } #endif ev_io_set(&stream->read_watcher, recvfd, EV_READ); ev_io_set(&stream->write_watcher, sendfd, EV_WRITE); stream->recvfd = recvfd; stream->sendfd = sendfd; stream->send_action = stream__connection_established; stream->recv_action = stream__connection_established; stream->flags |= EVCOM_READABLE; stream->flags |= EVCOM_WRITABLE; #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { gnutls_transport_set_lowat(stream->session, 0); gnutls_transport_set_push_function(stream->session, nosigpipe_push); gnutls_transport_set_pull_function(stream->session, pull); gnutls_transport_set_ptr2(stream->session, stream, stream); } #endif } /* Retruns evcom_stream if a connection could be accepted. * The returned stream is not yet attached to the event loop. * Otherwise NULL */ static evcom_stream* accept_connection (evcom_server *server) { struct sockaddr address; /* connector's address information */ socklen_t addr_len = sizeof(address); int fd = accept(server->fd, &address, &addr_len); if (fd < 0) { switch (errno) { case EMFILE: case ENFILE: too_many_connections = 1; server->flags |= EVCOM_TOO_MANY_CONN; evcom_server_detach(server); return NULL; case EINTR: case EAGAIN: return NULL; default: evcom_perror("accept()", errno); return NULL; } assert(0 && "no reach"); } evcom_stream *stream = NULL; if (server->on_connection) { stream = server->on_connection(server, &address); } if (stream == NULL) { close(fd); return NULL; } if (set_nonblock(fd) != 0) { evcom_perror("set_nonblock()", errno); return NULL; } stream->server = server; evcom_stream_assign_fds(stream, fd, fd); return stream; } /* Internal callback * Called by server->watcher. */ static int accept_connections (evcom_descriptor *d) { evcom_server *server = (evcom_server *)d; assert(LISTENING(server)); /* accept as many connections as possible */ evcom_stream *stream; while (server->fd >= 0 && (stream = accept_connection(server))) { evcom_stream_attach(D_LOOP_(server) stream); } return AGAIN; } static inline socklen_t address_length (struct sockaddr *address) { struct sockaddr_un* unix_address = (struct sockaddr_un*)address; int length = 0; switch (address->sa_family) { case AF_INET: length = sizeof(struct sockaddr_in); break; case AF_INET6: length = sizeof(struct sockaddr_in6); break; case AF_UNIX: #ifdef SUN_LEN length = SUN_LEN(unix_address); #else length = strlen(unix_address->sun_path) + sizeof(unix_address->sun_family); #endif break; default: assert(0 && "Unsupported socket family"); } return length; } int evcom_server_listen (evcom_server *server, struct sockaddr *address, int backlog) { assert(!LISTENING(server)); int fd = socket(address->sa_family, SOCK_STREAM, 0); if (fd < 0) { server->errorno = errno; evcom_perror("socket()", errno); return -1; } server->fd = fd; ev_io_set(&server->watcher, server->fd, EV_READ); if (set_nonblock(fd) != 0) { server->errorno = errno; evcom_perror("set_nonblock()", errno); close(fd); return -1; } int flags = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); /* XXX: Sending single byte chunks in a response body? Perhaps there is a * need to enable the Nagel algorithm dynamically. For now disabling. */ //setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); if (bind(fd, address, address_length(address)) < 0) { server->errorno = errno; evcom_perror("bind()", errno); close(fd); return -1; } if (listen(fd, backlog) < 0) { server->errorno = errno; evcom_perror("listen()", errno); close(fd); return -1; } server->flags |= EVCOM_LISTENING; server->action = accept_connections; return 0; } /** * Stops the server. Will not accept new connections. Does not drop * existing connections. */ void evcom_server_close (evcom_server *server) { ev_io_start(D_LOOP_(server) &server->watcher); ev_feed_event(D_LOOP_(server) &server->watcher, EV_READ); close_asap((evcom_descriptor*)server); } void evcom_server_attach (EV_P_ evcom_server *server) { ev_io_start (EV_A_ &server->watcher); D_LOOP_SET(server, EV_A); server->flags |= EVCOM_ATTACHED; } void evcom_server_detach (evcom_server *server) { ev_io_stop (D_LOOP_(server) &server->watcher); D_LOOP_SET(server, NULL); server->flags &= ~EVCOM_ATTACHED; } static void io_event(EV_P_ ev_io *watcher, int revents) { evcom_descriptor *d = watcher->data; #if EV_MULTIPLICITY assert(d->loop == loop); #endif int r = OKAY; if (revents & EV_ERROR) { d->errorno = 1; r = close_asap(d); } while (r == OKAY && d->action && d->fd >= 0) { r = d->action(d); } if (d->fd < 0) { assert(d->action == NULL); ev_clear_pending(EV_A_ watcher); ev_io_stop(EV_A_ watcher); if (d->on_close) d->on_close(d); } } static void evcom_descriptor_init (evcom_descriptor *d) { d->fd = -1; D_LOOP_SET(d, NULL); d->flags = 0; d->errorno = 0; d->action = NULL; } void evcom_server_init (evcom_server *server) { evcom_descriptor_init((evcom_descriptor*)server); ev_init (&server->watcher, io_event); server->watcher.data = server; server->on_connection = NULL; } /* Internal callback. called by stream->timeout_watcher */ static void on_timeout (EV_P_ ev_timer *watcher, int revents) { evcom_stream *stream = watcher->data; #if EV_MULTIPLICITY assert(stream->loop == loop); #endif assert(revents == EV_TIMEOUT); assert(watcher == &stream->timeout_watcher); if (PAUSED(stream)) { ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); return; } if (stream->on_timeout) stream->on_timeout(stream); // Hack to get error in Node on 'close' event. // should probably be made into a proper error code. stream->errorno = 1; ev_timer_stop(EV_A_ watcher); evcom_stream_force_close(stream); if (stream->on_close) stream->on_close(stream); } static void stream_event (EV_P_ ev_io *w, int revents) { evcom_stream *stream = w->data; if (revents & EV_READ) { while (stream->recv_action) { int r = stream->recv_action(stream); if (r == AGAIN) break; } } if (revents & EV_WRITE) { while (stream->send_action) { int r = stream->send_action(stream); if (r == AGAIN) break; } } if (stream->send_action == NULL) { ev_io_stop(EV_A_ &stream->write_watcher); } if (stream->recv_action == NULL) { ev_io_stop(EV_A_ &stream->read_watcher); } if (stream->sendfd < 0 && stream->recvfd < 0) { ev_timer_stop(EV_A_ &stream->timeout_watcher); if (stream->server && (stream->server->flags & EVCOM_TOO_MANY_CONN)) { #if EV_MULTIPLICITY struct ev_loop *loop = stream->server->loop; #endif stream->server->flags &= ~EVCOM_TOO_MANY_CONN; evcom_server_attach(EV_A_ stream->server); } too_many_connections = 0; if (stream->on_close) stream->on_close(stream); } } /** * If using SSL do consider setting * gnutls_db_set_retrieve_function (stream->session, _); * gnutls_db_set_remove_function (stream->session, _); * gnutls_db_set_store_function (stream->session, _); * gnutls_db_set_ptr (stream->session, _); */ void evcom_stream_init (evcom_stream *stream) { stream->flags = 0; stream->errorno = 0; stream->recvfd = -1; stream->sendfd = -1; // reader things ev_init(&stream->read_watcher, stream_event); stream->read_watcher.data = stream; stream->recv_action = NULL; // writer things ev_init(&stream->write_watcher, stream_event); stream->write_watcher.data = stream; evcom_queue_init(&stream->out); stream->send_action = NULL; // stream things stream->server = NULL; #if EVCOM_HAVE_GNUTLS stream->gnutls_errorno = 0; stream->session = NULL; #endif ev_timer_init(&stream->timeout_watcher, on_timeout, 0., 60.); stream->timeout_watcher.data = stream; stream->on_connect = NULL; stream->on_timeout = NULL; stream->on_read = NULL; stream->on_drain = NULL; stream->on_close = NULL; } void evcom_stream_close (evcom_stream *stream) { stream->flags |= EVCOM_GOT_CLOSE; if (ATTACHED(stream)) { // start the watchers if attached. evcom_stream_attach(D_LOOP_(stream) stream); } } void evcom_stream_force_close (evcom_stream *stream) { if (stream->recvfd >= 0) { close(stream->recvfd); /* XXX What to do on EINTR? */ stream__set_recv_closed(stream); } if (!DUPLEX(stream) && stream->sendfd >= 0) { close(stream->sendfd); } stream__set_send_closed(stream); evcom_stream_detach(stream); } /* Returns the number of bytes flushed to the buffer */ ssize_t evcom_stream_write (evcom_stream *stream, const char *str, size_t len) { if (!WRITABLE(stream) || GOT_CLOSE(stream)) { assert(0 && "Do not write to a closed stream"); return -1; } ssize_t sent = 0; if ( stream->send_action == stream_send__wait_for_buf && evcom_queue_empty(&stream->out) ) { assert(CONNECTED(stream)); #if EVCOM_HAVE_GNUTLS if (SECURE(stream)) { sent = gnutls_record_send(stream->session, str, len); if (gnutls_error_is_fatal(sent)) { stream->gnutls_errorno = sent; goto close; } } else #endif // EVCOM_HAVE_GNUTLS { /* TODO use writev() here? */ sent = nosigpipe_stream_send(stream, str, len); } if (sent < 0) { switch (errno) { case EPIPE: goto close; case EINTR: case EAGAIN: sent = 0; break; default: stream->errorno = errno; evcom_perror("send()", stream->errorno); goto close; } } } /* TODO else { memcpy to last buffer on head } */ assert(sent >= 0); if ((size_t)sent == len) return sent; /* sent the whole buffer */ len -= sent; str += sent; evcom_buf *b = evcom_buf_new(str, len); evcom_queue_insert_head(&stream->out, &b->queue); b->written = 0; assert(stream->sendfd >= 0); if (ATTACHED(stream)) { ev_io_start(D_LOOP_(stream) &stream->write_watcher); } return sent; close: stream->send_action = stream_send__close; stream->recv_action = stream_recv__close; if (ATTACHED(stream)) { ev_io_start(D_LOOP_(stream) &stream->write_watcher); } return -1; } void evcom_stream_reset_timeout (evcom_stream *stream, float timeout) { stream->timeout_watcher.repeat = timeout; if (ATTACHED(stream)) { ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); } } void evcom_stream_set_no_delay (evcom_stream *stream, int no_delay) { if (DUPLEX(stream)) { int flags = no_delay ? 1 : 0; setsockopt(stream->recvfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); } } void evcom_stream_attach (EV_P_ evcom_stream *stream) { D_LOOP_SET(stream, EV_A); stream->flags |= EVCOM_ATTACHED; ev_timer_again(EV_A_ &stream->timeout_watcher); if (READABLE(stream)) { ev_io_start(EV_A_ &stream->read_watcher); } if (WRITABLE(stream)) { ev_io_start(EV_A_ &stream->write_watcher); } } void evcom_stream_detach (evcom_stream *stream) { ev_io_stop(D_LOOP_(stream) &stream->write_watcher); ev_io_stop(D_LOOP_(stream) &stream->read_watcher); ev_timer_stop(D_LOOP_(stream) &stream->timeout_watcher); D_LOOP_SET(stream, NULL); stream->flags &= ~EVCOM_ATTACHED; } void evcom_stream_read_pause (evcom_stream *stream) { stream->flags |= EVCOM_PAUSED; ev_timer_stop(D_LOOP_(stream) &stream->timeout_watcher); if (stream->recv_action == stream_recv__data) { ev_io_stop(D_LOOP_(stream) &stream->read_watcher); stream->recv_action = stream_recv__wait_for_resume; } } void evcom_stream_read_resume (evcom_stream *stream) { stream->flags &= ~EVCOM_PAUSED; ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); if (stream->recv_action == stream_recv__wait_for_resume) { stream->recv_action = stream_recv__data; } if (ATTACHED(stream)) { ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher); if (READABLE(stream)) ev_io_start(D_LOOP_(stream) &stream->read_watcher); } } int evcom_stream_connect (evcom_stream *stream, struct sockaddr *address) { int fd = socket(address->sa_family, SOCK_STREAM, 0); if (fd < 0) { evcom_perror("socket()", errno); return -1; } int r = set_nonblock(fd); if (r < 0) { stream->errorno = errno; evcom_perror("set_nonblock()", errno); close(fd); return -1; } r = connect(fd, address, address_length(address)); if (r < 0 && errno != EINPROGRESS) { stream->errorno = errno; evcom_perror("connect", errno); close(fd); return -1; } evcom_stream_assign_fds(stream, fd, fd); stream->send_action = stream_send__wait_for_connection; stream->recv_action = NULL; return 0; } int evcom_stream_pair (evcom_stream *a, evcom_stream *b) { int sv[2]; int old_errno; int r = socketpair(PF_UNIX, SOCK_STREAM, 0, sv); if (r < 0) return -1; r = set_nonblock(sv[0]); if (r < 0) goto set_nonblock_error; r = set_nonblock(sv[1]); if (r < 0) goto set_nonblock_error; evcom_stream_assign_fds(a, sv[0], sv[0]); evcom_stream_assign_fds(b, sv[1], sv[1]); return 0; set_nonblock_error: old_errno = errno; evcom_perror("set_nonblock()", errno); close(sv[0]); close(sv[1]); errno = old_errno; return -1; } enum evcom_stream_state evcom_stream_state (evcom_stream *stream) { if (stream->recvfd < 0 && stream->sendfd && stream->flags == 0) { return EVCOM_INITIALIZED; } if (stream->recvfd < 0 && stream->sendfd < 0) return EVCOM_CLOSED; if (!CONNECTED(stream)) return EVCOM_CONNECTING; if (GOT_CLOSE(stream)) { if (READABLE(stream)) { return EVCOM_CONNECTED_RO; } else { return EVCOM_CLOSING; } } if (READABLE(stream) && WRITABLE(stream)) return EVCOM_CONNECTED_RW; if (WRITABLE(stream)) return EVCOM_CONNECTED_WO; if (READABLE(stream)) return EVCOM_CONNECTED_RO; return EVCOM_CLOSING; } static int reader_recv (evcom_descriptor *d) { evcom_reader* reader = (evcom_reader*) d; char buf[EVCOM_CHUNKSIZE]; size_t buf_size = EVCOM_CHUNKSIZE; ssize_t recved; while (reader->fd >= 0) { recved = read(reader->fd, buf, buf_size); if (recved < 0) { if (errno == EAGAIN || errno == EINTR) return AGAIN; reader->errorno = errno; evcom_perror("read()", reader->errorno); return close_asap(d); } /* NOTE: EOF is signaled with recved == 0 on callback */ if (reader->on_read) reader->on_read(reader, buf, recved); if (recved == 0) return close_asap(d); } return AGAIN; } void evcom_reader_init (evcom_reader *reader) { evcom_descriptor_init((evcom_descriptor*)reader); reader->on_close = NULL; reader->on_read = NULL; ev_init(&reader->read_watcher, io_event); reader->read_watcher.data = reader; } void evcom_reader_set (evcom_reader *reader, int fd) { assert(fd >= 0); reader->fd = fd; ev_io_set(&reader->read_watcher, fd, EV_READ); reader->action = reader_recv; } void evcom_reader_attach (EV_P_ evcom_reader *reader) { ev_io_start(EV_A_ &reader->read_watcher); D_LOOP_SET(reader, EV_A); } void evcom_reader_detach (evcom_reader *reader) { ev_io_stop(D_LOOP_(reader) &reader->read_watcher); D_LOOP_SET(reader, NULL); } void evcom_reader_close (evcom_reader *reader) { ev_io_start(D_LOOP_(reader) &reader->read_watcher); ev_feed_event(D_LOOP_(reader) &reader->read_watcher, EV_READ); close_asap((evcom_descriptor*)reader); } static int writer_send (evcom_descriptor *d) { evcom_writer* writer = (evcom_writer*) d; assert(writer->fd >= 0); while (!evcom_queue_empty(&writer->out)) { evcom_queue *q = evcom_queue_last(&writer->out); evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); ssize_t sent = write(writer->fd, buf->base + buf->written, buf->len - buf->written); if (sent < 0) { switch (errno) { case ECONNRESET: case EPIPE: return close_writer_asap(writer); case EINTR: case EAGAIN: sent = 0; return AGAIN; default: writer->errorno = errno; evcom_perror("send()", writer->errorno); return close_writer_asap(writer); } } assert(sent >= 0); buf->written += sent; if (buf->written == buf->len) { evcom_queue_remove(q); if (buf->release) buf->release(buf); } } if (GOT_CLOSE(writer)) { assert(evcom_queue_empty(&writer->out)); return close_writer_asap(writer); } else { ev_io_stop(D_LOOP_(writer) &writer->write_watcher); return AGAIN; } } void evcom_writer_init (evcom_writer* writer) { evcom_descriptor_init((evcom_descriptor*)writer); writer->on_close = NULL; ev_init(&writer->write_watcher, io_event); writer->write_watcher.data = writer; evcom_queue_init(&writer->out); } void evcom_writer_set (evcom_writer* writer, int fd) { assert(fd >= 0); writer->fd = fd; ev_io_set(&writer->write_watcher, fd, EV_WRITE); writer->action = writer_send; } void evcom_writer_attach (EV_P_ evcom_writer* writer) { if (!evcom_queue_empty(&writer->out)) { ev_io_start (EV_A_ &writer->write_watcher); } D_LOOP_SET(writer, EV_A); } void evcom_writer_detach (evcom_writer* writer) { ev_io_stop(D_LOOP_(writer) &writer->write_watcher); D_LOOP_SET(writer, NULL); } void evcom_writer_write (evcom_writer* writer, const char* buf, size_t len) { assert(writer->fd >= 0); ssize_t sent = 0; if (evcom_queue_empty(&writer->out)) { sent = write(writer->fd, buf, len); if (sent < 0) { switch (errno) { case ECONNRESET: case EPIPE: goto close; case EINTR: case EAGAIN: sent = 0; break; default: writer->errorno = errno; evcom_perror("send()", writer->errorno); goto close; } } } /* TODO else { memcpy to last buffer on head } */ assert(sent >= 0); if ((size_t)sent == len) return; /* sent the whole buffer */ len -= sent; buf += sent; evcom_buf *b = evcom_buf_new(buf, len); evcom_queue_insert_head(&writer->out, &b->queue); b->written = 0; assert(writer->fd >= 0); ev_io_start(D_LOOP_(writer) &writer->write_watcher); return; close: close_writer_asap(writer); } void evcom_writer_close (evcom_writer* writer) { writer->flags |= EVCOM_GOT_CLOSE; if (evcom_queue_empty(&writer->out)) close_writer_asap(writer); }