diff --git a/deps/liboi/oi_socket.c b/deps/evnet/evnet.c similarity index 68% rename from deps/liboi/oi_socket.c rename to deps/evnet/evnet.c index 3fd2c2d328..3bbc49e35d 100644 --- a/deps/liboi/oi_socket.c +++ b/deps/evnet/evnet.c @@ -1,6 +1,6 @@ /* Copyright (c) 2008,2009 Ryan Dahl * - * oi_queue comes from ngx_queue.h + * evnet_queue comes from ngx_queue.h * Copyright (C) 2002-2009 Igor Sysoev * * Redistribution and use in source and binary forms, with or without @@ -27,9 +27,9 @@ #include #include #include -#include /* close() */ -#include /* fcntl() */ -#include /* for the default methods */ +#include +#include +#include #include /* memset */ #include /* TCP_NODELAY */ @@ -37,7 +37,7 @@ #include /* shutdown */ #include -#include +#include #if EV_MULTIPLICITY # define SOCKET_LOOP_ socket->loop, @@ -47,15 +47,10 @@ # define SERVER_LOOP_ #endif // EV_MULTIPLICITY -#if HAVE_GNUTLS -# include - -/* a few forwards - * they wont even be defined if not having gnutls - * */ -static int secure_full_goodbye (oi_socket *socket); -static int secure_half_goodbye (oi_socket *socket); -#endif // HAVE_GNUTLS +#if EVNET_HAVE_GNUTLS +static int secure_full_goodbye (evnet_socket *socket); +static int secure_half_goodbye (evnet_socket *socket); +#endif #undef TRUE #define TRUE 1 @@ -68,36 +63,47 @@ static int secure_half_goodbye (oi_socket *socket); #define AGAIN 1 #define ERROR 2 +EV_INLINE int +set_nonblock (int fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + if (flags == -1) return -1; + + int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + if (r == -1) return -1; + + return 0; +} + void -oi_buf_destroy (oi_buf *buf) +evnet_buf_destroy (evnet_buf *buf) { free(buf->base); free(buf); } -oi_buf * -oi_buf_new2 (size_t len) +evnet_buf * +evnet_buf_new2 (size_t len) { - oi_buf *buf = malloc(sizeof(oi_buf)); - if(!buf) - return NULL; + evnet_buf *buf = malloc(sizeof(evnet_buf)); + if (!buf) return NULL; buf->base = malloc(len); - if(!buf->base) { + if (!buf->base) { free(buf); return NULL; } buf->len = len; - buf->release = oi_buf_destroy; + buf->release = evnet_buf_destroy; return buf; } -oi_buf * -oi_buf_new (const char *base, size_t len) +evnet_buf * +evnet_buf_new (const char *base, size_t len) { - oi_buf *buf = oi_buf_new2(len); - if(!buf) - return NULL; + evnet_buf *buf = evnet_buf_new2(len); + if (!buf) return NULL; memcpy(buf->base, base, len); + return buf; } @@ -107,13 +113,13 @@ oi_buf_new (const char *base, size_t len) } while (0) static int -full_close(oi_socket *socket) +full_close (evnet_socket *socket) { //printf("close(%d)\n", socket->fd); if (close(socket->fd) == -1) { - if (errno == EINTR) + if (errno == EINTR) { return AGAIN; - else { + } else { socket->errorno = errno; return ERROR; } @@ -122,11 +128,12 @@ full_close(oi_socket *socket) socket->read_action = NULL; socket->write_action = NULL; socket->fd = -1; + return OKAY; } static int -half_close(oi_socket *socket) +half_close (evnet_socket *socket) { int r = shutdown(socket->fd, SHUT_WR); if (r == -1) { @@ -151,14 +158,15 @@ half_close(oi_socket *socket) // This is to be called when ever the out_stream is empty // and we need to change state. static void -change_state_for_empty_out_stream (oi_socket *socket) +change_state_for_empty_out_stream (evnet_socket *socket) { /* * a very complicated bunch of close logic! * XXX this is awful. FIXME */ - if (socket->write_action == full_close || socket->read_action == full_close) + if (socket->write_action == full_close || socket->read_action == full_close) { return; + } if (socket->got_half_close == FALSE) { if (socket->got_full_close == FALSE) { @@ -166,48 +174,51 @@ change_state_for_empty_out_stream (oi_socket *socket) ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); } else { /* Got Full Close. */ - if (socket->read_action) -#if HAVE_GNUTLS + if (socket->read_action) { +#if EVNET_HAVE_GNUTLS socket->read_action = socket->secure ? secure_full_goodbye : full_close; #else socket->read_action = full_close; #endif + } - if (socket->write_action) -#if HAVE_GNUTLS + if (socket->write_action) { +#if EVNET_HAVE_GNUTLS socket->write_action = socket->secure ? secure_full_goodbye : full_close; #else socket->write_action = full_close; #endif + } } } else { /* Got Half Close. */ - if (socket->write_action) -#if HAVE_GNUTLS + if (socket->write_action) { +#if EVNET_HAVE_GNUTLS socket->write_action = socket->secure ? secure_half_goodbye : half_close; #else socket->write_action = half_close; #endif + } } } static void -update_write_buffer_after_send (oi_socket *socket, ssize_t sent) +update_write_buffer_after_send (evnet_socket *socket, ssize_t sent) { - oi_queue *q = oi_queue_last(&socket->out_stream); - oi_buf *to_write = oi_queue_data(q, oi_buf, queue); + evnet_queue *q = evnet_queue_last(&socket->out_stream); + evnet_buf *to_write = evnet_queue_data(q, evnet_buf, queue); to_write->written += sent; socket->written += sent; if (to_write->written == to_write->len) { - oi_queue_remove(q); + evnet_queue_remove(q); if (to_write->release) { to_write->release(to_write); } - if (oi_queue_empty(&socket->out_stream)) { + if (evnet_queue_empty(&socket->out_stream)) { change_state_for_empty_out_stream(socket); if (socket->on_drain) socket->on_drain(socket); @@ -215,15 +226,15 @@ update_write_buffer_after_send (oi_socket *socket, ssize_t sent) } } -#if HAVE_GNUTLS -static int secure_socket_send(oi_socket *socket); -static int secure_socket_recv(oi_socket *socket); +#if EVNET_HAVE_GNUTLS +static int secure_socket_send (evnet_socket *socket); +static int secure_socket_recv (evnet_socket *socket); /* TODO can this be done without ignoring SIGPIPE? */ static ssize_t -nosigpipe_push(gnutls_transport_ptr_t data, const void *buf, size_t len) +nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len) { - oi_socket *socket = (oi_socket*)data; + evnet_socket *socket = (evnet_socket*)data; assert(socket->secure); int flags = 0; #ifdef MSG_NOSIGNAL @@ -242,7 +253,7 @@ nosigpipe_push(gnutls_transport_ptr_t data, const void *buf, size_t len) } static int -secure_handshake(oi_socket *socket) +secure_handshake (evnet_socket *socket) { assert(socket->secure); @@ -256,51 +267,51 @@ secure_handshake(oi_socket *socket) if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN; - oi_socket_reset_timeout(socket); + evnet_socket_reset_timeout(socket); if (!socket->connected) { socket->connected = TRUE; if (socket->on_connect) socket->on_connect(socket); } - if (socket->read_action == secure_handshake) + if (socket->read_action == secure_handshake) { socket->read_action = secure_socket_recv; - - if (socket->write_action == secure_handshake) + } + + if (socket->write_action == secure_handshake) { socket->write_action = secure_socket_send; + } return OKAY; } static int -secure_socket_send(oi_socket *socket) +secure_socket_send (evnet_socket *socket) { ssize_t sent; - if (oi_queue_empty(&socket->out_stream)) { + if (evnet_queue_empty(&socket->out_stream)) { ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); return AGAIN; } - oi_queue *q = oi_queue_last(&socket->out_stream); - oi_buf *to_write = oi_queue_data(q, oi_buf, queue); + evnet_queue *q = evnet_queue_last(&socket->out_stream); + evnet_buf *to_write = evnet_queue_data(q, evnet_buf, queue); assert(socket->secure); - sent = gnutls_record_send( socket->session - , to_write->base + to_write->written - , to_write->len - to_write->written - ); + sent = gnutls_record_send(socket->session, + to_write->base + to_write->written, + to_write->len - to_write->written); if (gnutls_error_is_fatal(sent)) { socket->gnutls_errorno = sent; return ERROR; } - if (sent == 0) - return AGAIN; + if (sent == 0) return AGAIN; - oi_socket_reset_timeout(socket); + evnet_socket_reset_timeout(socket); if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) { return AGAIN; @@ -319,7 +330,7 @@ secure_socket_send(oi_socket *socket) } static int -secure_socket_recv(oi_socket *socket) +secure_socket_recv (evnet_socket *socket) { char recv_buffer[socket->chunksize]; size_t recv_buffer_size = socket->chunksize; @@ -340,7 +351,7 @@ secure_socket_recv(oi_socket *socket) return AGAIN; } - oi_socket_reset_timeout(socket); + evnet_socket_reset_timeout(socket); /* A server may also receive GNUTLS_E_REHANDSHAKE when a client has * initiated a handshake. In that case the server can only initiate a @@ -361,14 +372,16 @@ secure_socket_recv(oi_socket *socket) /* Got EOF */ if (recved == 0) { socket->read_action = NULL; - if (socket->write_action == NULL) - CLOSE_ASAP(socket); + if (socket->write_action == NULL) CLOSE_ASAP(socket); } - if (socket->write_action) + if (socket->write_action) { socket->write_action = secure_socket_send; + } - if (socket->on_read) { socket->on_read(socket, recv_buffer, recved); } + if (socket->on_read) { + socket->on_read(socket, recv_buffer, recved); + } return OKAY; } @@ -378,7 +391,7 @@ secure_socket_recv(oi_socket *socket) } static int -secure_full_goodbye (oi_socket *socket) +secure_full_goodbye (evnet_socket *socket) { assert(socket->secure); @@ -389,8 +402,7 @@ secure_full_goodbye (oi_socket *socket) return ERROR; } - if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) - return AGAIN; + if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN; CLOSE_ASAP(socket); @@ -398,7 +410,7 @@ secure_full_goodbye (oi_socket *socket) } static int -secure_half_goodbye (oi_socket *socket) +secure_half_goodbye (evnet_socket *socket) { assert(socket->secure); @@ -409,17 +421,15 @@ secure_half_goodbye (oi_socket *socket) return ERROR; } - if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) - return AGAIN; + if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN; - if (socket->write_action) - socket->write_action = half_close; + if (socket->write_action) socket->write_action = half_close; return OKAY; } void -oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session) +evnet_socket_set_secure_session (evnet_socket *socket, gnutls_session_t session) { socket->session = session; socket->secure = TRUE; @@ -427,19 +437,19 @@ oi_socket_set_secure_session (oi_socket *socket, gnutls_session_t session) #endif /* HAVE GNUTLS */ static int -socket_send (oi_socket *socket) +socket_send (evnet_socket *socket) { ssize_t sent; assert(socket->secure == FALSE); - if (oi_queue_empty(&socket->out_stream)) { + if (evnet_queue_empty(&socket->out_stream)) { ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); return AGAIN; } - oi_queue *q = oi_queue_last(&socket->out_stream); - oi_buf *to_write = oi_queue_data(q, oi_buf, queue); + evnet_queue *q = evnet_queue_last(&socket->out_stream); + evnet_buf *to_write = evnet_queue_data(q, evnet_buf, queue); int flags = 0; #ifdef MSG_NOSIGNAL @@ -451,11 +461,10 @@ socket_send (oi_socket *socket) /* TODO use writev() here */ - sent = send( socket->fd - , to_write->base + to_write->written - , to_write->len - to_write->written - , flags - ); + sent = send(socket->fd, + to_write->base + to_write->written, + to_write->len - to_write->written, + flags); if (sent < 0) { switch (errno) { @@ -472,7 +481,7 @@ socket_send (oi_socket *socket) } } - oi_socket_reset_timeout(socket); + evnet_socket_reset_timeout(socket); if (!socket->connected) { socket->connected = TRUE; @@ -485,7 +494,7 @@ socket_send (oi_socket *socket) } static int -socket_recv (oi_socket *socket) +socket_recv (evnet_socket *socket) { char buf[TCP_MAXWIN]; size_t buf_size = TCP_MAXWIN; @@ -519,23 +528,22 @@ socket_recv (oi_socket *socket) } } - oi_socket_reset_timeout(socket); + evnet_socket_reset_timeout(socket); if (recved == 0) { - oi_socket_read_stop(socket); + evnet_socket_read_stop(socket); socket->read_action = NULL; - if (socket->write_action == NULL) - CLOSE_ASAP(socket); + if (socket->write_action == NULL) CLOSE_ASAP(socket); } /* NOTE: EOF is signaled with recved == 0 on callback */ - if (socket->on_read) { socket->on_read(socket, buf, recved); } + if (socket->on_read) socket->on_read(socket, buf, recved); return OKAY; } static void -assign_file_descriptor (oi_socket *socket, int fd) +assign_file_descriptor (evnet_socket *socket, int fd) { socket->fd = fd; @@ -545,100 +553,130 @@ assign_file_descriptor (oi_socket *socket, int fd) socket->read_action = socket_recv; socket->write_action = socket_send; -#if HAVE_GNUTLS +#if EVNET_HAVE_GNUTLS if (socket->secure) { gnutls_transport_set_lowat(socket->session, 0); gnutls_transport_set_push_function(socket->session, nosigpipe_push); - gnutls_transport_set_ptr2 ( socket->session - /* recv */ , (gnutls_transport_ptr_t)fd - /* send */ , socket - ); + gnutls_transport_set_ptr2(socket->session, + (gnutls_transport_ptr_t)fd, /* recv */ + socket); /* send */ socket->read_action = secure_handshake; socket->write_action = secure_handshake; } #endif } - -/* Internal callback - * Called by server->connection_watcher. - */ -static void -on_connection (EV_P_ ev_io *watcher, int revents) +static void +server_close_with_error (evnet_server *server, int errorno) { - oi_server *server = watcher->data; - - // printf("on connection!\n"); + if (server->listening) { + evnet_server_detach(server); + close(server->fd); /* TODO do this on the loop? check return value? */ + server->fd = -1; + server->listening = FALSE; - assert(server->listening); -#if EV_MULTIPLICITY - assert(server->loop == loop); -#endif - assert(&server->connection_watcher == watcher); - - if (EV_ERROR & revents) { - oi_server_close(server); - return; + if (server->on_close) { + server->on_close(server, errorno); + } } - +} + + +/* Retruns evnet_socket if a connection could be accepted. + * The returned socket is not yet attached to the event loop. + * Otherwise NULL + */ +static evnet_socket* +accept_connection (evnet_server *server) +{ struct sockaddr address; /* connector's address information */ socklen_t addr_len = sizeof(address); - /* TODO accept all possible connections? currently: just one */ int fd = accept(server->fd, (struct sockaddr*)&address, &addr_len); if (fd < 0) { - perror("accept()"); - return; +#ifdef EWOULDBLOCK + if (errno == EWOULDBLOCK) return NULL; +#else + if (errno == EAGAIN) return NULL; +#endif + goto error; } - oi_socket *socket = NULL; - if (server->on_connection) - socket = server->on_connection(server, (struct sockaddr*)&address, addr_len); + evnet_socket *socket = NULL; + + if (server->on_connection) { + socket = server->on_connection(server, (struct sockaddr*)&address); + } if (socket == NULL) { close(fd); - return; - } - - int flags = fcntl(fd, F_GETFL, 0); - int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - if (r < 0) { - /* TODO error report */ + return NULL; } + if (set_nonblock(fd) != 0) goto error; + #ifdef SO_NOSIGPIPE flags = 1; - setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); + r = setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); + if (r < 0) goto error; #endif socket->server = server; assign_file_descriptor(socket, fd); - oi_socket_attach(EV_A_ socket); + + return socket; + +error: + server_close_with_error(server, errno); + return NULL; +} + + +/* Internal callback + * Called by server->connection_watcher. + */ +static void +on_connection (EV_P_ ev_io *watcher, int revents) +{ + evnet_server *server = watcher->data; + + assert(server->listening); +#if EV_MULTIPLICITY + assert(server->loop == loop); +#endif + assert(&server->connection_watcher == watcher); + + if (EV_ERROR & revents) { + server_close_with_error(server, 1); + return; + } + + /* accept as many connections as possible */ + evnet_socket *socket; + while ((socket = accept_connection(server))) { + evnet_socket_attach(EV_A_ socket); + } } int -oi_server_listen(oi_server *server, struct addrinfo *addrinfo) +evnet_server_listen (evnet_server *server, struct addrinfo *addrinfo, int backlog) { int fd = -1; assert(server->listening == FALSE); - fd = socket( addrinfo->ai_family - , addrinfo->ai_socktype - , addrinfo->ai_protocol - ); + fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); if (fd < 0) { perror("socket()"); return -1; } - int flags = fcntl(fd, F_GETFL, 0); - int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - if (r < 0) { - perror("fcntl()"); + if (set_nonblock(fd) != 0) { + perror("set_nonblock()"); return -1; } - flags = 1; + int flags = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); @@ -653,7 +691,7 @@ oi_server_listen(oi_server *server, struct addrinfo *addrinfo) return -1; } - if (listen(fd, server->backlog) < 0) { + if (listen(fd, backlog) < 0) { perror("listen()"); close(fd); return -1; @@ -671,18 +709,13 @@ oi_server_listen(oi_server *server, struct addrinfo *addrinfo) * existing connections. */ void -oi_server_close(oi_server *server) +evnet_server_close (evnet_server *server) { - if (server->listening) { - oi_server_detach(server); - close(server->fd); - /* TODO do this on the loop? check return value? */ - server->listening = FALSE; - } + server_close_with_error(server, 0); } void -oi_server_attach (EV_P_ oi_server *server) +evnet_server_attach (EV_P_ evnet_server *server) { ev_io_start (EV_A_ &server->connection_watcher); #if EV_MULTIPLICITY @@ -692,7 +725,7 @@ oi_server_attach (EV_P_ oi_server *server) } void -oi_server_detach (oi_server *server) +evnet_server_detach (evnet_server *server) { ev_io_stop (SERVER_LOOP_ &server->connection_watcher); #if EV_MULTIPLICITY @@ -702,42 +735,43 @@ oi_server_detach (oi_server *server) } void -oi_server_init(oi_server *server, int backlog) +evnet_server_init (evnet_server *server) { - server->backlog = backlog; server->attached = FALSE; server->listening = FALSE; server->fd = -1; server->connection_watcher.data = server; ev_init (&server->connection_watcher, on_connection); - server->on_connection = NULL; - server->on_error = NULL; - server->data = NULL; + server->on_close = NULL; } /* Internal callback. called by socket->timeout_watcher */ static void -on_timeout(EV_P_ ev_timer *watcher, int revents) +on_timeout (EV_P_ ev_timer *watcher, int revents) { - oi_socket *socket = watcher->data; + evnet_socket *socket = watcher->data; +#if EV_MULTIPLICITY + assert(socket->loop == loop); +#endif + assert(revents == EV_TIMEOUT); assert(watcher == &socket->timeout_watcher); // printf("on_timeout\n"); - if (socket->on_timeout) { socket->on_timeout(socket); } + if (socket->on_timeout) socket->on_timeout(socket); // timeout does not automatically kill your connection. you must! } static void -release_write_buffer(oi_socket *socket) +release_write_buffer(evnet_socket *socket) { - while (!oi_queue_empty(&socket->out_stream)) { - oi_queue *q = oi_queue_last(&socket->out_stream); - oi_buf *buf = oi_queue_data(q, oi_buf, queue); - oi_queue_remove(q); - if (buf->release) { buf->release(buf); } + while (!evnet_queue_empty(&socket->out_stream)) { + evnet_queue *q = evnet_queue_last(&socket->out_stream); + evnet_buf *buf = evnet_queue_data(q, evnet_buf, queue); + evnet_queue_remove(q); + if (buf->release) buf->release(buf); } } @@ -745,7 +779,7 @@ release_write_buffer(oi_socket *socket) static void on_io_event(EV_P_ ev_io *watcher, int revents) { - oi_socket *socket = watcher->data; + evnet_socket *socket = watcher->data; if (revents & EV_ERROR) { socket->errorno = 1; @@ -758,27 +792,29 @@ on_io_event(EV_P_ ev_io *watcher, int revents) while (have_read_event || have_write_event) { /* RECV LOOP - TRY TO CLEAR THE BUFFER */ - if (socket->read_action == NULL) + if (socket->read_action == NULL) { have_read_event = FALSE; - else { + } else { r = socket->read_action(socket); - if (r == AGAIN) + if (r == AGAIN) { have_read_event = FALSE; - else if (r == ERROR) - CLOSE_ASAP(socket); + } else { + if (r == ERROR) CLOSE_ASAP(socket); + } } /* SEND LOOP - TRY TO CLEAR THE BUFFER */ - if (socket->write_action == NULL) + if (socket->write_action == NULL) { have_write_event = FALSE; - else { + } else { r = socket->write_action(socket); - if (r == AGAIN) + if (r == AGAIN) { have_write_event = FALSE; - else if (r == ERROR) - CLOSE_ASAP(socket); + } else { + if (r == ERROR) CLOSE_ASAP(socket); + } } } @@ -790,10 +826,10 @@ on_io_event(EV_P_ ev_io *watcher, int revents) ev_clear_pending (EV_A_ &socket->read_watcher); ev_clear_pending (EV_A_ &socket->timeout_watcher); - oi_socket_detach(socket); + evnet_socket_detach(socket); assert(socket->fd == -1); - if (socket->on_close) { socket->on_close(socket); } + if (socket->on_close) socket->on_close(socket); /* WARNING: user can free socket in on_close so no more * access beyond this point. */ } @@ -807,7 +843,7 @@ on_io_event(EV_P_ ev_io *watcher, int revents) * gnutls_db_set_ptr (socket->session, _); */ void -oi_socket_init(oi_socket *socket, float timeout) +evnet_socket_init (evnet_socket *socket, float timeout) { socket->fd = -1; socket->server = NULL; @@ -817,9 +853,9 @@ oi_socket_init(oi_socket *socket, float timeout) socket->attached = FALSE; socket->connected = FALSE; - oi_queue_init(&socket->out_stream); + evnet_queue_init(&socket->out_stream); - ev_init (&socket->write_watcher, on_io_event); + ev_init(&socket->write_watcher, on_io_event); socket->write_watcher.data = socket; ev_init(&socket->read_watcher, on_io_event); @@ -831,7 +867,7 @@ oi_socket_init(oi_socket *socket, float timeout) socket->errorno = 0; socket->secure = FALSE; -#if HAVE_GNUTLS +#if EVNET_HAVE_GNUTLS socket->gnutls_errorno = 0; socket->session = NULL; #endif @@ -842,7 +878,8 @@ oi_socket_init(oi_socket *socket, float timeout) socket->read_action = NULL; socket->write_action = NULL; - socket->chunksize = TCP_MAXWIN; + socket->chunksize = 8192; + socket->on_connect = NULL; socket->on_read = NULL; socket->on_drain = NULL; @@ -850,22 +887,24 @@ oi_socket_init(oi_socket *socket, float timeout) } void -oi_socket_close (oi_socket *socket) +evnet_socket_close (evnet_socket *socket) { socket->got_half_close = TRUE; - if (oi_queue_empty(&socket->out_stream)) + if (evnet_queue_empty(&socket->out_stream)) { change_state_for_empty_out_stream(socket); + } } void -oi_socket_full_close (oi_socket *socket) +evnet_socket_full_close (evnet_socket *socket) { socket->got_full_close = TRUE; - if (oi_queue_empty(&socket->out_stream)) + if (evnet_queue_empty(&socket->out_stream)) { change_state_for_empty_out_stream(socket); + } } -void oi_socket_force_close (oi_socket *socket) +void evnet_socket_force_close (evnet_socket *socket) { release_write_buffer(socket); @@ -874,19 +913,16 @@ void oi_socket_force_close (oi_socket *socket) ev_clear_pending (SOCKET_LOOP_ &socket->timeout_watcher); socket->write_action = socket->read_action = NULL; - // socket->errorno = OI_SOCKET_ERROR_FORCE_CLOSE - // + // socket->errorno = EVNET_SOCKET_ERROR_FORCE_CLOSE - if (socket->fd > 0) { - close(socket->fd); - } + if (socket->fd > 0) close(socket->fd); socket->fd = -1; - oi_socket_detach(socket); + evnet_socket_detach(socket); } void -oi_socket_write(oi_socket *socket, oi_buf *buf) +evnet_socket_write (evnet_socket *socket, evnet_buf *buf) { if (socket->write_action == NULL) { assert(0 && "Do not write to a closed socket"); @@ -897,7 +933,7 @@ oi_socket_write(oi_socket *socket, oi_buf *buf) goto error; } - oi_queue_insert_head(&socket->out_stream, &buf->queue); + evnet_queue_insert_head(&socket->out_stream, &buf->queue); buf->written = 0; if (socket->attached) { @@ -910,13 +946,13 @@ error: } void -oi_socket_reset_timeout(oi_socket *socket) +evnet_socket_reset_timeout (evnet_socket *socket) { ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher); } static void -free_simple_buf ( oi_buf *buf ) +free_simple_buf (evnet_buf *buf) { free(buf->base); free(buf); @@ -926,18 +962,18 @@ free_simple_buf ( oi_buf *buf ) * NOTE: Allocates memory. Avoid for performance applications. */ void -oi_socket_write_simple(oi_socket *socket, const char *str, size_t len) +evnet_socket_write_simple (evnet_socket *socket, const char *str, size_t len) { - oi_buf *buf = malloc(sizeof(oi_buf)); + evnet_buf *buf = malloc(sizeof(evnet_buf)); buf->release = free_simple_buf; buf->base = strdup(str); buf->len = len; - oi_socket_write(socket, buf); + evnet_socket_write(socket, buf); } void -oi_socket_attach(EV_P_ oi_socket *socket) +evnet_socket_attach (EV_P_ evnet_socket *socket) { #if EV_MULTIPLICITY socket->loop = loop; @@ -946,15 +982,17 @@ oi_socket_attach(EV_P_ oi_socket *socket) ev_timer_again(EV_A_ &socket->timeout_watcher); - if (socket->read_action) + if (socket->read_action) { ev_io_start(EV_A_ &socket->read_watcher); + } - if (socket->write_action) + if (socket->write_action) { ev_io_start(EV_A_ &socket->write_watcher); + } } void -oi_socket_detach(oi_socket *socket) +evnet_socket_detach (evnet_socket *socket) { if (socket->attached) { ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); @@ -968,14 +1006,14 @@ oi_socket_detach(oi_socket *socket) } void -oi_socket_read_stop (oi_socket *socket) +evnet_socket_read_stop (evnet_socket *socket) { ev_io_stop(SOCKET_LOOP_ &socket->read_watcher); - ev_clear_pending (SOCKET_LOOP_ &socket->read_watcher); + ev_clear_pending(SOCKET_LOOP_ &socket->read_watcher); } void -oi_socket_read_start (oi_socket *socket) +evnet_socket_read_start (evnet_socket *socket) { if (socket->read_action) { ev_io_start(SOCKET_LOOP_ &socket->read_watcher); @@ -984,21 +1022,18 @@ oi_socket_read_start (oi_socket *socket) } int -oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo) +evnet_socket_connect (evnet_socket *s, struct addrinfo *addrinfo) { - int fd = socket( addrinfo->ai_family - , addrinfo->ai_socktype - , addrinfo->ai_protocol - ); + int fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, + addrinfo->ai_protocol); if (fd < 0) { perror("socket()"); return -1; } - int flags = fcntl(fd, F_GETFL, 0); - int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); + int r = set_nonblock(fd); if (r < 0) { - perror("fcntl()"); + perror("set_nonblock()"); return -1; } @@ -1007,10 +1042,7 @@ oi_socket_connect(oi_socket *s, struct addrinfo *addrinfo) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); #endif - r = connect( fd - , addrinfo->ai_addr - , addrinfo->ai_addrlen - ); + r = connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen); if (r < 0 && errno != EINPROGRESS) { perror("connect"); diff --git a/deps/evnet/evnet.h b/deps/evnet/evnet.h new file mode 100644 index 0000000000..77c47a2f77 --- /dev/null +++ b/deps/evnet/evnet.h @@ -0,0 +1,232 @@ +/* Copyright (c) 2008,2009 Ryan Dahl + * + * evnet_queue comes from Nginx, ngx_queue.h + * Copyright (C) 2002-2009 Igor Sysoev + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include +#include +#include /* offsetof() */ + +#ifndef evnet_h +#define evnet_h + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef EVNET_HAVE_GNUTLS +# define EVNET_HAVE_GNUTLS 0 +#endif +#if EVNET_HAVE_GNUTLS +# include +#endif + +typedef struct evnet_queue evnet_queue; +typedef struct evnet_buf evnet_buf; +typedef struct evnet_server evnet_server; +typedef struct evnet_socket evnet_socket; + +void evnet_server_init (evnet_server *); + int evnet_server_listen (evnet_server *, struct addrinfo *addrinfo, int backlog); +void evnet_server_attach (EV_P_ evnet_server *); +void evnet_server_detach (evnet_server *); +void evnet_server_close (evnet_server *); // synchronous + +void evnet_socket_init (evnet_socket *, float timeout); + int evnet_socket_connect (evnet_socket *, struct addrinfo *addrinfo); +void evnet_socket_attach (EV_P_ evnet_socket *); +void evnet_socket_detach (evnet_socket *); +void evnet_socket_read_start (evnet_socket *); +void evnet_socket_read_stop (evnet_socket *); + +/* Resets the timeout to stay alive for another socket->timeout seconds + */ +void evnet_socket_reset_timeout (evnet_socket *); + +/* Writes a buffer to the socket. + * (Do not send a NULL evnet_buf or a buffer with evnet_buf->base == NULL.) + */ +void evnet_socket_write (evnet_socket *, evnet_buf *); + +void evnet_socket_write_simple (evnet_socket *, const char *str, size_t len); + +/* Once the write buffer is drained, evnet_socket_close will shutdown the + * writing end of the socket and will close the read end once the server + * replies with an EOF. + */ +void evnet_socket_close (evnet_socket *); + +/* Do not wait for the server to reply with EOF. This will only be called + * once the write buffer is drained. + * Warning: For TCP socket, the OS kernel may (should) reply with RST + * packets if this is called when data is still being received from the + * server. + */ +void evnet_socket_full_close (evnet_socket *); + +/* The most extreme measure. + * Will not wait for the write queue to complete. + */ +void evnet_socket_force_close (evnet_socket *); + + +#if EVNET_HAVE_GNUTLS +/* Tells the socket to use transport layer security (SSL). evnet_socket does + * not want to make any decisions about security requirements, so the + * majoirty of GnuTLS configuration is left to the user. Only the transport + * layer of GnuTLS is controlled by evnet_socket. That is, do not use + * gnutls_transport_* functions. Do use the rest of GnuTLS's API. + */ +void evnet_socket_set_secure_session (evnet_socket *, gnutls_session_t); +#endif + +evnet_buf * evnet_buf_new (const char* base, size_t len); +evnet_buf * evnet_buf_new2 (size_t len); +void evnet_buf_destroy (evnet_buf *); + + +struct evnet_queue { + evnet_queue *prev; + evnet_queue *next; +}; + +struct evnet_buf { + /* public */ + char *base; + size_t len; + void (*release) (evnet_buf *); /* called when oi is done with the object */ + void *data; + + /* private */ + size_t written; + evnet_queue queue; +}; + +struct evnet_server { + /* read only */ + int fd; +#if EV_MULTIPLICITY + struct ev_loop *loop; +#endif + unsigned attached:1; + unsigned listening:1; + + /* PRIVATE */ + ev_io connection_watcher; + + /* PUBLIC */ + + evnet_socket* (*on_connection) (evnet_server *, struct sockaddr *remote_addr); + + /* Executed when a server is closed. + * If evnet_server_close() was called errorno will be 0. + * An libev error is indicated with errorno == 1 + * Otherwise errorno is a stdlib errno from a system call, e.g. accept() + */ + void (*on_close) (evnet_server *, int errorno); + + void *data; +}; + +struct evnet_socket { + /* read only */ + int fd; +#if EV_MULTIPLICITY + struct ev_loop *loop; +#endif + evnet_server *server; + evnet_queue out_stream; + size_t written; + unsigned attached:1; + unsigned connected:1; + unsigned secure:1; + unsigned got_full_close:1; + unsigned got_half_close:1; + + /* NULL = that end of the socket is closed. */ + int (*read_action) (evnet_socket *); + int (*write_action) (evnet_socket *); + + /* ERROR CODES. 0 = no error. Check on_close. */ + int errorno; +#if EVNET_HAVE_GNUTLS + int gnutls_errorno; +#endif + + /* private */ + ev_io write_watcher; + ev_io read_watcher; + ev_timer timeout_watcher; +#if EVNET_HAVE_GNUTLS + gnutls_session_t session; +#endif + + /* public */ + size_t chunksize; /* the maximum chunk that on_read() will return */ + void (*on_connect) (evnet_socket *); + void (*on_read) (evnet_socket *, const void *buf, size_t count); + void (*on_drain) (evnet_socket *); + void (*on_close) (evnet_socket *); + void (*on_timeout) (evnet_socket *); + void *data; +}; + +EV_INLINE void +evnet_queue_init (evnet_queue *q) +{ + q->prev = q; + q->next = q; +} + +EV_INLINE void +evnet_queue_insert_head (evnet_queue *h, evnet_queue *x) +{ + (x)->next = (h)->next; + (x)->next->prev = x; + (x)->prev = h; + (h)->next = x; +} + +EV_INLINE void +evnet_queue_remove (evnet_queue *x) +{ + (x)->next->prev = (x)->prev; + (x)->prev->next = (x)->next; +#ifndef NDEBUG + (x)->prev = NULL; + (x)->next = NULL; +#endif +} + +#define evnet_queue_empty(h) (h == (h)->prev) +#define evnet_queue_head(h) (h)->next +#define evnet_queue_last(h) (h)->prev +#define evnet_queue_data(q, type, link) \ + (type *) ((unsigned char *) q - offsetof(type, link)) + + +#ifdef __cplusplus +} +#endif +#endif /* evnet_h */ diff --git a/deps/evnet/test/echo.c b/deps/evnet/test/echo.c new file mode 100644 index 0000000000..5d58b06537 --- /dev/null +++ b/deps/evnet/test/echo.c @@ -0,0 +1,102 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + + +#include +#include +#include + +#define HOST "127.0.0.1" +#define SOCKFILE "/tmp/oi.sock" +#define PORT "5000" + +static int nconnections; + +static void +on_peer_close (evnet_socket *socket) +{ + assert(socket->errorno == 0); + //printf("server connection closed\n"); + free(socket); +} + +static void +on_peer_timeout (evnet_socket *socket) +{ + assert(socket); + fprintf(stderr, "peer connection timeout\n"); + assert(0); +} + + + +// timeout must match the timeout in timeout.rb +#define TIMEOUT 5.0 + +static void +on_peer_read (evnet_socket *socket, const void *base, size_t len) +{ + if(len == 0) return; + + evnet_socket_write_simple(socket, base, len); +} + +static evnet_socket* +on_server_connection (evnet_server *server, struct sockaddr *addr) +{ + assert(server); + assert(addr); + + evnet_socket *socket = malloc(sizeof(evnet_socket)); + evnet_socket_init(socket, TIMEOUT); + socket->on_read = on_peer_read; + socket->on_close = on_peer_close; + socket->on_timeout = on_peer_timeout; + + nconnections++; + + + //printf("on server connection\n"); + + return socket; +} + +int +main (void) +{ + int r; + evnet_server server; + + //printf("sizeof(evnet_server): %d\n", sizeof(evnet_server)); + //printf("sizeof(evnet_socket): %d\n", sizeof(evnet_socket)); + + evnet_server_init(&server); + server.on_connection = on_server_connection; + + struct addrinfo *servinfo; + struct addrinfo hints; + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + r = getaddrinfo(NULL, PORT, &hints, &servinfo); + assert(r == 0); + + r = evnet_server_listen(&server, servinfo, 10); + assert(r == 0); + evnet_server_attach(EV_DEFAULT_ &server); + + ev_loop(EV_DEFAULT_ 0); + + freeaddrinfo(servinfo); + + return 0; +} diff --git a/deps/evnet/test/test.c b/deps/evnet/test/test.c new file mode 100644 index 0000000000..a3cbc0a9d9 --- /dev/null +++ b/deps/evnet/test/test.c @@ -0,0 +1,478 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#if EVNET_HAVE_GNUTLS +# include +#endif + +static const struct addrinfo tcp_hints = +/* ai_flags */ { .ai_flags = 0 +/* ai_family */ , .ai_family = AF_UNSPEC +/* ai_socktype */ , .ai_socktype = SOCK_STREAM + , 0 + }; + +#define MARK_PROGRESS write(STDERR_FILENO, ".", 1) + +#define HOST "127.0.0.1" +#define SOCKFILE "/tmp/oi.sock" +#define PORT "5000" + +static evnet_server server; +static int nconnections; +static int use_tls; +static int got_server_close; + +static void +common_on_server_close (evnet_server *server, int errorno) +{ + assert(server); + assert(errorno == 0); + got_server_close = 1; +} + +static void +common_on_peer_close (evnet_socket *socket) +{ + assert(socket->errorno == 0); + printf("server connection closed\n"); +#if EVNET_HAVE_GNUTLS + assert(socket->gnutls_errorno == 0); + if (use_tls) gnutls_deinit(socket->session); +#endif + free(socket); +} + +static void +common_on_client_timeout (evnet_socket *socket) +{ + assert(socket); + printf("client connection timeout\n"); +} + +static void +common_on_peer_timeout (evnet_socket *socket) +{ + assert(socket); + fprintf(stderr, "peer connection timeout\n"); + assert(0); +} + +#if EVNET_HAVE_GNUTLS +#define DH_BITS 768 +static gnutls_anon_server_credentials_t server_credentials; +const int kx_prio[] = { GNUTLS_KX_ANON_DH, 0 }; +static gnutls_dh_params_t dh_params; + +void anon_tls_server (evnet_socket *socket) +{ + gnutls_session_t session; + socket->data = session; + + int r = gnutls_init(&session, GNUTLS_SERVER); + assert(r == 0); + gnutls_set_default_priority(session); + gnutls_kx_set_priority (session, kx_prio); + gnutls_credentials_set(session, GNUTLS_CRD_ANON, server_credentials); + gnutls_dh_set_prime_bits(session, DH_BITS); + + evnet_socket_set_secure_session(socket, session); +} + +void anon_tls_client (evnet_socket *socket) +{ + gnutls_session_t client_session; + gnutls_anon_client_credentials_t client_credentials; + + gnutls_anon_allocate_client_credentials (&client_credentials); + gnutls_init(&client_session, GNUTLS_CLIENT); + gnutls_set_default_priority(client_session); + gnutls_kx_set_priority(client_session, kx_prio); + /* Need to enable anonymous KX specifically. */ + gnutls_credentials_set(client_session, GNUTLS_CRD_ANON, client_credentials); + + evnet_socket_set_secure_session(socket, client_session); + assert(socket->secure); +} + +#endif // EVNET_HAVE_GNUTLS + + + + + +#define PING "PING" +#define PONG "PONG" +#define EXCHANGES 5000 +#define PINGPONG_TIMEOUT 5.0 + +static int successful_ping_count; + +static void +pingpong_on_peer_read (evnet_socket *socket, const void *base, size_t len) +{ + if (len == 0) { + evnet_socket_close(socket); + return; + } + + char buf[2000]; + strncpy(buf, base, len); + buf[len] = 0; + printf("server got message: %s\n", buf); + + evnet_socket_write_simple(socket, PONG, sizeof PONG); +} + +static void +pingpong_on_client_close (evnet_socket *socket) +{ + assert(socket); + printf("client connection closed\n"); + evnet_server_close(&server); +} + +static evnet_socket* +pingpong_on_server_connection (evnet_server *_server, struct sockaddr *addr) +{ + assert(_server == &server); + assert(addr); + + evnet_socket *socket = malloc(sizeof(evnet_socket)); + evnet_socket_init(socket, PINGPONG_TIMEOUT); + socket->on_read = pingpong_on_peer_read; + socket->on_close = common_on_peer_close; + socket->on_timeout = common_on_peer_timeout; + + nconnections++; + +#if EVNET_HAVE_GNUTLS + if (use_tls) anon_tls_server(socket); +#endif + + printf("on server connection\n"); + + return socket; +} + +static void +pingpong_on_client_connect (evnet_socket *socket) +{ + printf("client connected. sending ping\n"); + evnet_socket_write_simple(socket, PING, sizeof PING); +} + +static void +pingpong_on_client_read (evnet_socket *socket, const void *base, size_t len) +{ + if(len == 0) { + evnet_socket_close(socket); + return; + } + + assert(len = strlen(PONG)); + + char buf[len+1]; + strncpy(buf, base, len); + buf[len] = 0; + printf("client got message: %s\n", buf); + + assert(strcmp(buf, PONG) == 0); + + if (++successful_ping_count > EXCHANGES) { + evnet_socket_close(socket); + return; + } + + if (successful_ping_count % (EXCHANGES/20) == 0) MARK_PROGRESS; + + evnet_socket_write_simple(socket, PING, sizeof PING); +} + +int +pingpong (struct addrinfo *servinfo) +{ + int r; + evnet_socket client; + + successful_ping_count = 0; + nconnections = 0; + got_server_close = 0; + + printf("sizeof(evnet_server): %d\n", sizeof(evnet_server)); + printf("sizeof(evnet_socket): %d\n", sizeof(evnet_socket)); + + evnet_server_init(&server); + server.on_connection = pingpong_on_server_connection; + server.on_close = common_on_server_close; + + r = evnet_server_listen(&server, servinfo, 10); + assert(r == 0); + evnet_server_attach(EV_DEFAULT_ &server); + + evnet_socket_init(&client, PINGPONG_TIMEOUT); + client.on_read = pingpong_on_client_read; + client.on_connect = pingpong_on_client_connect; + client.on_close = pingpong_on_client_close; + client.on_timeout = common_on_client_timeout; + +#if EVNET_HAVE_GNUTLS + if (use_tls) anon_tls_client(&client); +#endif + + r = evnet_socket_connect(&client, servinfo); + assert(r == 0 && "problem connecting"); + evnet_socket_attach(EV_DEFAULT_ &client); + + ev_loop(EV_DEFAULT_ 0); + + printf("successful_ping_count = %d\n", successful_ping_count); + assert(successful_ping_count == EXCHANGES + 1); + assert(nconnections == 1); + assert(got_server_close); + + return 0; +} + + + + +#define NCONN 100 +#define CONNINT_TIMEOUT 1000.0 + +static void +connint_on_peer_read(evnet_socket *socket, const void *base, size_t len) +{ + assert(base); + assert(len == 0); + evnet_socket_write_simple(socket, "BYE", 3); + printf("server wrote bye\n"); +} + +static void +connint_on_peer_drain(evnet_socket *socket) +{ + evnet_socket_close(socket); +} + +static evnet_socket* +connint_on_server_connection(evnet_server *_server, struct sockaddr *addr) +{ + assert(_server == &server); + assert(addr); + + evnet_socket *socket = malloc(sizeof(evnet_socket)); + evnet_socket_init(socket, CONNINT_TIMEOUT); + socket->on_read = connint_on_peer_read; + socket->on_drain = connint_on_peer_drain; + socket->on_close = common_on_peer_close; + socket->on_timeout = common_on_peer_timeout; + +#if EVNET_HAVE_GNUTLS + if (use_tls) anon_tls_server(socket); +#endif + + printf("on server connection\n"); + + return socket; +} + +static void +connint_on_client_connect (evnet_socket *socket) +{ + printf("on client connection\n"); + evnet_socket_close(socket); +} + +static void +connint_on_client_close (evnet_socket *socket) +{ + evnet_socket_close(socket); // already closed, but it shouldn't crash if we try to do it again + + printf("client connection closed\n"); + + if (nconnections % (NCONN/20) == 0) MARK_PROGRESS; + + if(++nconnections == NCONN) { + evnet_server_close(&server); + printf("closing server\n"); + } +} + +static void +connint_on_client_read (evnet_socket *socket, const void *base, size_t len) +{ + if (len == 0) { + evnet_socket_close(socket); + return; + } + + char buf[200000]; + strncpy(buf, base, len); + buf[len] = 0; + + printf("client got message: %s\n", buf); + + assert(strcmp(buf, "BYE") == 0); + evnet_socket_close(socket); +} + +int +connint (struct addrinfo *servinfo) +{ + int r; + + nconnections = 0; + got_server_close = 0; + + evnet_server_init(&server); + server.on_connection = connint_on_server_connection; + server.on_close = common_on_server_close; + + + evnet_server_listen(&server, servinfo, 1000); + evnet_server_attach(EV_DEFAULT_ &server); + + evnet_socket clients[NCONN]; + int i; + for (i = 0; i < NCONN; i++) { + evnet_socket *client = &clients[i]; + evnet_socket_init(client, CONNINT_TIMEOUT); + client->on_read = connint_on_client_read; + client->on_connect = connint_on_client_connect; + client->on_close = connint_on_client_close; + client->on_timeout = common_on_client_timeout; +#if EVNET_HAVE_GNUTLS + if (use_tls) anon_tls_client(client); +#endif + r = evnet_socket_connect(client, servinfo); + assert(r == 0 && "problem connecting"); + evnet_socket_attach(EV_DEFAULT_ client); + } + + ev_loop(EV_DEFAULT_ 0); + + assert(nconnections == NCONN); + assert(got_server_close); + + return 0; +} + + +struct addrinfo * +create_tcp_address (void) +{ + struct addrinfo *servinfo; + int r = getaddrinfo(NULL, PORT, &tcp_hints, &servinfo); + assert(r == 0); + return servinfo; +} + +void +free_tcp_address (struct addrinfo *servinfo) +{ + freeaddrinfo(servinfo); +} + + +struct addrinfo * +create_unix_address (void) +{ + struct addrinfo *servinfo; + struct stat tstat; + if (lstat(SOCKFILE, &tstat) == 0) { + assert(S_ISSOCK(tstat.st_mode)); + unlink(SOCKFILE); + } + + servinfo = malloc(sizeof(struct addrinfo)); + servinfo->ai_family = AF_UNIX; + servinfo->ai_socktype = SOCK_STREAM; + servinfo->ai_protocol = 0; + + struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1); + sockaddr->sun_family = AF_UNIX; + strcpy(sockaddr->sun_path, SOCKFILE); + + servinfo->ai_addr = (struct sockaddr*)sockaddr; + servinfo->ai_addrlen = sizeof(struct sockaddr_un); + + return servinfo; +} + +void +free_unix_address (struct addrinfo *servinfo) +{ + free(servinfo->ai_addr); + free(servinfo); +} + + +int +main (void) +{ +#if EVNET_HAVE_GNUTLS + gnutls_global_init(); + + gnutls_dh_params_init (&dh_params); + + fsync((int)stderr); + gnutls_dh_params_generate2 (dh_params, DH_BITS); + + gnutls_anon_allocate_server_credentials (&server_credentials); + gnutls_anon_set_server_dh_params (server_credentials, dh_params); +#endif + + struct addrinfo *tcp_address = create_tcp_address(); + struct addrinfo *unix_address; + + + use_tls = 0; + assert(pingpong(tcp_address) == 0); + assert(connint(tcp_address) == 0); + +#if EVNET_HAVE_GNUTLS + use_tls = 1; + assert(pingpong(tcp_address) == 0); + assert(connint(tcp_address) == 0); +#endif + + + + use_tls = 0; + + unix_address = create_unix_address(); + assert(pingpong(unix_address) == 0); + free_unix_address(unix_address); + + unix_address = create_unix_address(); + assert(connint(unix_address) == 0); + free_unix_address(unix_address); + +#if EVNET_HAVE_GNUTLS + use_tls = 1; + + unix_address = create_unix_address(); + assert(pingpong(unix_address) == 0); + free_unix_address(unix_address); + + unix_address = create_unix_address(); + assert(connint(unix_address) == 0); + free_unix_address(unix_address); +#endif + + + free_tcp_address(tcp_address); + return 0; +} diff --git a/deps/liboi/test/timeout.rb b/deps/evnet/test/timeout.rb similarity index 94% rename from deps/liboi/test/timeout.rb rename to deps/evnet/test/timeout.rb index dd1dd729ed..db45d84acb 100755 --- a/deps/liboi/test/timeout.rb +++ b/deps/evnet/test/timeout.rb @@ -1,20 +1,22 @@ #!/usr/bin/env ruby +require 'socket' def test(description) pid = fork do exec(File.dirname(__FILE__) + "/echo") end + puts "#{description}: " begin sleep 0.5 # give time for the server to start yield(pid) rescue - puts "\033[1;31mFAIL\033[m: #{description}" + puts "\033[1;31mFAIL\033[m" raise $! ensure `kill -9 #{pid}` end - puts "\033[1;32mPASS\033[m: #{description}" + puts "\033[1;32mPASS\033[m" end test("make sure echo server works") do diff --git a/deps/liboi/oi_socket.h b/deps/liboi/oi_socket.h deleted file mode 100644 index 878ebffbe5..0000000000 --- a/deps/liboi/oi_socket.h +++ /dev/null @@ -1,216 +0,0 @@ -/* Copyright (c) 2008,2009 Ryan Dahl - * - * oi_queue comes from ngx_queue.h - * Copyright (C) 2002-2009 Igor Sysoev - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS - * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) - * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY - * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ -#include -#include -#include /* offsetof() */ - -#ifndef oi_socket_h -#define oi_socket_h -#ifdef __cplusplus -extern "C" { -#endif - -#ifndef HAVE_GNUTLS -# define HAVE_GNUTLS 0 -#endif -#if HAVE_GNUTLS -# include -#endif -typedef struct oi_queue oi_queue; -struct oi_queue { - oi_queue *prev; - oi_queue *next; -}; - -#define oi_queue_init(q) \ - (q)->prev = q; \ - (q)->next = q - -#define oi_queue_empty(h) \ - (h == (h)->prev) - -#define oi_queue_insert_head(h, x) \ - (x)->next = (h)->next; \ - (x)->next->prev = x; \ - (x)->prev = h; \ - (h)->next = x - -#define oi_queue_head(h) \ - (h)->next - -#define oi_queue_last(h) \ - (h)->prev - -#define oi_queue_remove(x) \ - (x)->next->prev = (x)->prev; \ - (x)->prev->next = (x)->next; \ - (x)->prev = NULL; \ - (x)->next = NULL - -#define oi_queue_data(q, type, link) \ - (type *) ((unsigned char *) q - offsetof(type, link)) - -typedef struct oi_buf oi_buf; -typedef struct oi_server oi_server; -typedef struct oi_socket oi_socket; - -oi_buf * oi_buf_new (const char* base, size_t len); -oi_buf * oi_buf_new2 (size_t len); -void oi_buf_destroy (oi_buf *); - -void oi_server_init (oi_server *, int backlog); - int oi_server_listen (oi_server *, struct addrinfo *addrinfo); -void oi_server_attach (EV_P_ oi_server *); -void oi_server_detach (oi_server *); -void oi_server_close (oi_server *); - -void oi_socket_init (oi_socket *, float timeout); - int oi_socket_connect (oi_socket *, struct addrinfo *addrinfo); -void oi_socket_attach (EV_P_ oi_socket *); -void oi_socket_detach (oi_socket *); -void oi_socket_read_start (oi_socket *); -void oi_socket_read_stop (oi_socket *); - -/* Resets the timeout to stay alive for another socket->timeout seconds - */ -void oi_socket_reset_timeout (oi_socket *); - -/* Writes a buffer to the socket. - * (Do not send a NULL oi_buf or a buffer with oi_buf->base == NULL.) - */ -void oi_socket_write (oi_socket *, oi_buf *); - -void oi_socket_write_simple (oi_socket *, const char *str, size_t len); - -/* Once the write buffer is drained, oi_socket_close will shutdown the - * writing end of the socket and will close the read end once the server - * replies with an EOF. - */ -void oi_socket_close (oi_socket *); - -/* Do not wait for the server to reply with EOF. This will only be called - * once the write buffer is drained. - * Warning: For TCP socket, the OS kernel may (should) reply with RST - * packets if this is called when data is still being received from the - * server. - */ -void oi_socket_full_close (oi_socket *); - -/* The most extreme measure. - * Will not wait for the write queue to complete. - */ -void oi_socket_force_close (oi_socket *); - - -#if HAVE_GNUTLS -/* Tells the socket to use transport layer security (SSL). oi_socket does - * not want to make any decisions about security requirements, so the - * majoirty of GnuTLS configuration is left to the user. Only the transport - * layer of GnuTLS is controlled by oi_socket. That is, do not use - * gnutls_transport_* functions. Do use the rest of GnuTLS's API. - */ -void oi_socket_set_secure_session (oi_socket *, gnutls_session_t); -#endif - -struct oi_buf { - /* public */ - char *base; - size_t len; - void (*release) (oi_buf *); /* called when oi is done with the object */ - void *data; - - /* private */ - size_t written; - oi_queue queue; -}; - -struct oi_server { - /* read only */ - int fd; - int backlog; -#if EV_MULTIPLICITY - struct ev_loop *loop; -#endif - unsigned attached:1; - unsigned listening:1; - - /* private */ - ev_io connection_watcher; - - /* public */ - oi_socket* (*on_connection) (oi_server *, struct sockaddr *remote_addr, socklen_t remove_addr_len); - void (*on_error) (oi_server *); - void *data; -}; - -struct oi_socket { - /* read only */ - int fd; -#if EV_MULTIPLICITY - struct ev_loop *loop; -#endif - oi_server *server; - oi_queue out_stream; - size_t written; - unsigned attached:1; - unsigned connected:1; - unsigned secure:1; - unsigned got_full_close:1; - unsigned got_half_close:1; - - /* NULL = that end of the socket is closed. */ - int (*read_action) (oi_socket *); - int (*write_action) (oi_socket *); - - /* ERROR CODES. 0 = no error. Check on_close. */ - int errorno; -#if HAVE_GNUTLS - int gnutls_errorno; -#endif - - /* private */ - ev_io write_watcher; - ev_io read_watcher; - ev_timer timeout_watcher; -#if HAVE_GNUTLS - gnutls_session_t session; -#endif - - /* public */ - size_t chunksize; /* the maximum chunk that on_read() will return */ - void (*on_connect) (oi_socket *); - void (*on_read) (oi_socket *, const void *buf, size_t count); - void (*on_drain) (oi_socket *); - void (*on_close) (oi_socket *); - void (*on_timeout) (oi_socket *); - void *data; -}; - -#ifdef __cplusplus -} -#endif -#endif /* oi_socket_h */ diff --git a/deps/liboi/test/common.c b/deps/liboi/test/common.c deleted file mode 100644 index 9003152299..0000000000 --- a/deps/liboi/test/common.c +++ /dev/null @@ -1,106 +0,0 @@ -#include -#include -#include -#include -#include - -#include -#include -#include -#include - - -#include -#include -#include - -#define HOST "127.0.0.1" -#define SOCKFILE "/tmp/oi.sock" -#define PORT "5000" - -int nconnections; - -static void -on_peer_close(oi_socket *socket) -{ - assert(socket->errorno == 0); - //printf("server connection closed\n"); -#if HAVE_GNUTLS - assert(socket->gnutls_errorno == 0); -#if SECURE - gnutls_deinit(socket->session); -#endif -#endif - free(socket); -} - -static void -on_client_timeout(oi_socket *socket) -{ - printf("client connection timeout\n"); - assert(0); -} - -static void -on_peer_timeout(oi_socket *socket) -{ - fprintf(stderr, "peer connection timeout\n"); - assert(0); -} - - -#if HAVE_GNUTLS -#if SECURE -#define DH_BITS 768 -gnutls_anon_server_credentials_t server_credentials; -const int kx_prio[] = { GNUTLS_KX_ANON_DH, 0 }; -static gnutls_dh_params_t dh_params; - -void anon_tls_init() -{ - gnutls_global_init(); - - gnutls_dh_params_init (&dh_params); - - fprintf(stderr, ".."); - fsync((int)stderr); - gnutls_dh_params_generate2 (dh_params, DH_BITS); - fprintf(stderr, "."); - - gnutls_anon_allocate_server_credentials (&server_credentials); - gnutls_anon_set_server_dh_params (server_credentials, dh_params); -} - -void anon_tls_server(oi_socket *socket) -{ - gnutls_session_t session; - socket->data = session; - - int r = gnutls_init(&session, GNUTLS_SERVER); - assert(r == 0); - gnutls_set_default_priority(session); - gnutls_kx_set_priority (session, kx_prio); - gnutls_credentials_set(session, GNUTLS_CRD_ANON, server_credentials); - gnutls_dh_set_prime_bits(session, DH_BITS); - - oi_socket_set_secure_session(socket, session); -} - -void anon_tls_client(oi_socket *socket) -{ - gnutls_session_t client_session; - gnutls_anon_client_credentials_t client_credentials; - - gnutls_anon_allocate_client_credentials (&client_credentials); - gnutls_init (&client_session, GNUTLS_CLIENT); - gnutls_set_default_priority(client_session); - gnutls_kx_set_priority(client_session, kx_prio); - /* Need to enable anonymous KX specifically. */ - gnutls_credentials_set (client_session, GNUTLS_CRD_ANON, client_credentials); - - oi_socket_set_secure_session(socket, client_session); - assert(socket->secure); -} - -#endif // SECURE -#endif // HAVE_GNUTLS diff --git a/deps/liboi/test/connection_interruption.c b/deps/liboi/test/connection_interruption.c deleted file mode 100644 index bcc0df532d..0000000000 --- a/deps/liboi/test/connection_interruption.c +++ /dev/null @@ -1,154 +0,0 @@ -#include "test/common.c" -#define NCONN 100 -#define TIMEOUT 1000.0 - -static oi_server server; - -static void -on_peer_read(oi_socket *socket, const void *base, size_t len) -{ - assert(len == 0); - oi_socket_write_simple(socket, "BYE", 3); - //printf("server wrote bye\n"); -} - -static void -on_peer_drain(oi_socket *socket) -{ - oi_socket_close(socket); -} - -static oi_socket* -on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len) -{ - oi_socket *socket = malloc(sizeof(oi_socket)); - oi_socket_init(socket, TIMEOUT); - socket->on_read = on_peer_read; - socket->on_drain = on_peer_drain; - socket->on_close = on_peer_close; - socket->on_timeout = on_peer_timeout; - -#if HAVE_GNUTLS -# if SECURE - anon_tls_server(socket); -# endif -#endif - - //printf("on server connection\n"); - - return socket; -} - -static void -on_client_connect(oi_socket *socket) -{ - //printf("on client connection\n"); - oi_socket_close(socket); -} - -static void -on_client_close(oi_socket *socket) -{ - oi_socket_close(socket); // already closed, but it shouldn't crash if we try to do it again - - //printf("client connection closed\n"); - if(++nconnections == NCONN) { - oi_server_detach(&server); - //printf("detaching server\n"); - } -} - -static void -on_client_read(oi_socket *socket, const void *base, size_t len) -{ - if (len == 0) { - oi_socket_close(socket); - return; - } - - char buf[200000]; - strncpy(buf, base, len); - buf[len] = 0; - - //printf("client got message: %s\n", buf); - - if (strcmp(buf, "BYE") == 0) { - oi_socket_close(socket); - } else { - assert(0); - } -} - -int -main(int argc, const char *argv[]) -{ - int r; - - oi_server_init(&server, 1000); - server.on_connection = on_server_connection; -#if HAVE_GNUTLS -# if SECURE - anon_tls_init(); -# endif -#endif - - struct addrinfo *servinfo; - struct addrinfo hints; - memset(&hints, 0, sizeof hints); -#if TCP - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - r = getaddrinfo(NULL, PORT, &hints, &servinfo); - assert(r == 0); -#else - struct stat tstat; - if (lstat(SOCKFILE, &tstat) == 0) { - if (S_ISSOCK(tstat.st_mode)) - unlink(SOCKFILE); - } - - servinfo = malloc(sizeof(struct addrinfo)); - servinfo->ai_family = AF_UNIX; - servinfo->ai_socktype = SOCK_STREAM; - servinfo->ai_protocol = 0; - - struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1); - sockaddr->sun_family = AF_UNIX; - strcpy(sockaddr->sun_path, SOCKFILE); - - servinfo->ai_addr = (struct sockaddr*)sockaddr; - servinfo->ai_addrlen = sizeof(struct sockaddr_un); -#endif - - oi_server_listen(&server, servinfo); - oi_server_attach(EV_DEFAULT_ &server); - - int i; - for(i = 0; i < NCONN; i++) { - oi_socket *client = malloc(sizeof(oi_socket)); - oi_socket_init(client, TIMEOUT); - client->on_read = on_client_read; - client->on_connect = on_client_connect; - client->on_close = on_client_close; - client->on_timeout = on_client_timeout; -#if HAVE_GNUTLS -#if SECURE - anon_tls_client(client); -#endif -#endif - r = oi_socket_connect(client, servinfo); - assert(r == 0 && "problem connecting"); - oi_socket_attach(EV_DEFAULT_ client); - } - - ev_loop(EV_DEFAULT_ 0); - - assert(nconnections == NCONN); - -#if TCP - freeaddrinfo(servinfo); -#endif - - return 0; -} diff --git a/deps/liboi/test/echo.c b/deps/liboi/test/echo.c deleted file mode 100644 index 7b7a4068ec..0000000000 --- a/deps/liboi/test/echo.c +++ /dev/null @@ -1,97 +0,0 @@ -#include "test/common.c" - -// timeout must match the timeout in timeout.rb -#define TIMEOUT 5.0 - -int successful_ping_count; - -static void -on_peer_read(oi_socket *socket, const void *base, size_t len) -{ - if(len == 0) - return; - - oi_socket_write_simple(socket, base, len); -} - -static oi_socket* -on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len) -{ - oi_socket *socket = malloc(sizeof(oi_socket)); - oi_socket_init(socket, TIMEOUT); - socket->on_read = on_peer_read; - socket->on_close = on_peer_close; - socket->on_timeout = on_peer_timeout; - - nconnections++; - -#if HAVE_GNUTLS -# if SECURE - anon_tls_server(socket); -# endif -#endif - - //printf("on server connection\n"); - - return socket; -} - -int -main(int argc, const char *argv[]) -{ - int r; - oi_server server; - oi_socket client; - - //printf("sizeof(oi_server): %d\n", sizeof(oi_server)); - //printf("sizeof(oi_socket): %d\n", sizeof(oi_socket)); - - oi_server_init(&server, 10); - server.on_connection = on_server_connection; - -#if HAVE_GNUTLS -# if SECURE - anon_tls_init(); -# endif -#endif - - struct addrinfo *servinfo; - struct addrinfo hints; - memset(&hints, 0, sizeof hints); -#if TCP - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - r = getaddrinfo(NULL, PORT, &hints, &servinfo); - assert(r == 0); -#else - struct stat tstat; - if (lstat(SOCKFILE, &tstat) == 0) { - if (S_ISSOCK(tstat.st_mode)) - unlink(SOCKFILE); - } - - servinfo = malloc(sizeof(struct addrinfo)); - servinfo->ai_family = AF_UNIX; - servinfo->ai_socktype = SOCK_STREAM; - servinfo->ai_protocol = 0; - - struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1); - sockaddr->sun_family = AF_UNIX; - strcpy(sockaddr->sun_path, SOCKFILE); - - servinfo->ai_addr = (struct sockaddr*)sockaddr; - servinfo->ai_addrlen = sizeof(struct sockaddr_un); -#endif - r = oi_server_listen(&server, servinfo); - assert(r == 0); - oi_server_attach(EV_DEFAULT_ &server); - - ev_loop(EV_DEFAULT_ 0); - -#if TCP - freeaddrinfo(servinfo); -#endif - - return 0; -} diff --git a/deps/liboi/test/ping_pong.c b/deps/liboi/test/ping_pong.c deleted file mode 100644 index 08ffabecfa..0000000000 --- a/deps/liboi/test/ping_pong.c +++ /dev/null @@ -1,164 +0,0 @@ -#include "test/common.c" - -#define PING "PING" -#define PONG "PONG" -#define EXCHANGES 500 -#define TIMEOUT 5.0 - -int successful_ping_count; - -static void -on_peer_read(oi_socket *socket, const void *base, size_t len) -{ - if (len == 0) { - oi_socket_close(socket); - return; - } - - char buf[2000]; - strncpy(buf, base, len); - buf[len] = 0; - //printf("server got message: %s\n", buf); - - oi_socket_write_simple(socket, PONG, sizeof PONG); -} - -static void -on_client_close(oi_socket *socket) -{ - //printf("client connection closed\n"); - ev_unloop(EV_DEFAULT_ EVUNLOOP_ALL); -} - -static oi_socket* -on_server_connection(oi_server *server, struct sockaddr *addr, socklen_t len) -{ - oi_socket *socket = malloc(sizeof(oi_socket)); - oi_socket_init(socket, TIMEOUT); - socket->on_read = on_peer_read; - socket->on_close = on_peer_close; - socket->on_timeout = on_peer_timeout; - - nconnections++; - -#if HAVE_GNUTLS -# if SECURE - anon_tls_server(socket); -# endif -#endif - - //printf("on server connection\n"); - - return socket; -} - -static void -on_client_connect (oi_socket *socket) -{ - //printf("client connected. sending ping\n"); - oi_socket_write_simple(socket, PING, sizeof PING); -} - -static void -on_client_read (oi_socket *socket, const void *base, size_t len) -{ - if(len == 0) { - oi_socket_close(socket); - return; - } - - char buf[200000]; - strncpy(buf, base, len); - buf[len] = 0; - //printf("client got message: %s\n", buf); - - if(strcmp(buf, PONG) == 0) { - - if(++successful_ping_count > EXCHANGES) { - oi_socket_close(socket); - return; - } - oi_socket_write_simple(socket, PING, sizeof PING); - } else { - assert(0); - } -} - -int -main(int argc, const char *argv[]) -{ - int r; - oi_server server; - oi_socket client; - - //printf("sizeof(oi_server): %d\n", sizeof(oi_server)); - //printf("sizeof(oi_socket): %d\n", sizeof(oi_socket)); - - oi_server_init(&server, 10); - server.on_connection = on_server_connection; - -#if HAVE_GNUTLS -# if SECURE - anon_tls_init(); -# endif -#endif - - struct addrinfo *servinfo; - struct addrinfo hints; - memset(&hints, 0, sizeof hints); -#if TCP - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - r = getaddrinfo(NULL, PORT, &hints, &servinfo); - assert(r == 0); -#else - struct stat tstat; - if (lstat(SOCKFILE, &tstat) == 0) { - if (S_ISSOCK(tstat.st_mode)) - unlink(SOCKFILE); - } - - servinfo = malloc(sizeof(struct addrinfo)); - servinfo->ai_family = AF_UNIX; - servinfo->ai_socktype = SOCK_STREAM; - servinfo->ai_protocol = 0; - - struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1); - sockaddr->sun_family = AF_UNIX; - strcpy(sockaddr->sun_path, SOCKFILE); - - servinfo->ai_addr = (struct sockaddr*)sockaddr; - servinfo->ai_addrlen = sizeof(struct sockaddr_un); -#endif - r = oi_server_listen(&server, servinfo); - assert(r == 0); - oi_server_attach(EV_DEFAULT_ &server); - - oi_socket_init(&client, TIMEOUT); - client.on_read = on_client_read; - client.on_connect = on_client_connect; - client.on_close = on_client_close; - client.on_timeout = on_client_timeout; - -#if HAVE_GNUTLS -# if SECURE - anon_tls_client(&client); -# endif -#endif - - r = oi_socket_connect(&client, servinfo); - assert(r == 0 && "problem connecting"); - oi_socket_attach(EV_DEFAULT_ &client); - - ev_loop(EV_DEFAULT_ 0); - - assert(successful_ping_count == EXCHANGES + 1); - assert(nconnections == 1); - -#if TCP - freeaddrinfo(servinfo); -#endif - - return 0; -} diff --git a/src/net.cc b/src/net.cc index 4118db1f62..0eeb6265f6 100644 --- a/src/net.cc +++ b/src/net.cc @@ -101,7 +101,7 @@ Connection::Init (void) { opening = false; double timeout = 60.0; // default - oi_socket_init(&socket_, timeout); + evnet_socket_init(&socket_, timeout); socket_.on_connect = Connection::on_connect; socket_.on_read = Connection::on_read; socket_.on_drain = Connection::on_drain; @@ -267,13 +267,13 @@ Connection::AfterResolve (eio_req *req) // no error. return. if (r == 0 && req->result == 0) { - oi_socket_attach (EV_DEFAULT_UC_ &connection->socket_); + evnet_socket_attach (EV_DEFAULT_UC_ &connection->socket_); goto out; } /* RESOLVE ERROR */ - /* TODO: the whole resolve process should be moved into oi_socket. + /* TODO: the whole resolve process should be moved into evnet_socket. * The fact that I'm modifying a read-only variable here should be * good evidence of this. */ @@ -368,7 +368,7 @@ Connection::Send (const Arguments& args) // XXX // A lot of improvement can be made here. First of all we're allocating - // oi_bufs for every send which is clearly inefficent - it should use a + // evnet_bufs for every send which is clearly inefficent - it should use a // memory pool or ring buffer. Of course, expressing binary data as an // array of integers is extremely inefficent. This can improved when v8 // bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been @@ -378,7 +378,7 @@ Connection::Send (const Arguments& args) enum encoding enc = ParseEncoding(args[1]); Local s = args[0]->ToString(); size_t len = s->Utf8Length(); - oi_buf *buf = node::buf_new(len); + evnet_buf *buf = node::buf_new(len); switch (enc) { case RAW: case ASCII: @@ -397,7 +397,7 @@ Connection::Send (const Arguments& args) } else if (args[0]->IsArray()) { Handle array = Handle::Cast(args[0]); size_t len = array->Length(); - oi_buf *buf = node::buf_new(len); + evnet_buf *buf = node::buf_new(len); for (size_t i = 0; i < len; i++) { Local int_value = array->Get(Integer::New(i)); buf->base[i] = int_value->IntegerValue(); @@ -517,12 +517,10 @@ Server::UnwrapConnection (Local connection) } Connection* -Server::OnConnection (struct sockaddr *addr, socklen_t len) +Server::OnConnection (struct sockaddr *addr) { HandleScope scope; - assert(len > 0); // just to get rid of the warning. - TryCatch try_catch; Local js_connection = diff --git a/src/net.h b/src/net.h index a3b9a24f61..2862d64d85 100644 --- a/src/net.h +++ b/src/net.h @@ -4,7 +4,7 @@ #include "node.h" #include "events.h" #include -#include +#include namespace node { @@ -41,12 +41,12 @@ protected: virtual ~Connection (void); int Connect (struct addrinfo *address) { - return oi_socket_connect (&socket_, address); + return evnet_socket_connect (&socket_, address); } - void Send (oi_buf *buf) { oi_socket_write(&socket_, buf); } - void Close (void) { oi_socket_close(&socket_); } - void FullClose (void) { oi_socket_full_close(&socket_); } - void ForceClose (void) { oi_socket_force_close(&socket_); } + void Send (evnet_buf *buf) { evnet_socket_write(&socket_, buf); } + void Close (void) { evnet_socket_close(&socket_); } + void FullClose (void) { evnet_socket_full_close(&socket_); } + void ForceClose (void) { evnet_socket_force_close(&socket_); } virtual void OnConnect (void); virtual void OnReceive (const void *buf, size_t len); @@ -66,12 +66,12 @@ protected: private: /* liboi callbacks */ - static void on_connect (oi_socket *s) { + static void on_connect (evnet_socket *s) { Connection *connection = static_cast (s->data); connection->OnConnect(); } - static void on_read (oi_socket *s, const void *buf, size_t len) { + static void on_read (evnet_socket *s, const void *buf, size_t len) { Connection *connection = static_cast (s->data); if (len == 0) connection->OnEOF(); @@ -79,12 +79,12 @@ private: connection->OnReceive(buf, len); } - static void on_drain (oi_socket *s) { + static void on_drain (evnet_socket *s) { Connection *connection = static_cast (s->data); connection->OnDrain(); } - static void on_close (oi_socket *s) { + static void on_close (evnet_socket *s) { Connection *connection = static_cast (s->data); connection->OnDisconnect(); @@ -96,7 +96,7 @@ private: connection->Detach(); } - static void on_timeout (oi_socket *s) { + static void on_timeout (evnet_socket *s) { Connection *connection = static_cast (s->data); connection->OnTimeout(); } @@ -107,7 +107,7 @@ private: static int AfterResolve (eio_req *req); char *host_; char *port_; - oi_socket socket_; + evnet_socket socket_; friend class Server; }; @@ -124,25 +124,25 @@ protected: Server (void) : EventEmitter() { - oi_server_init(&server_, 1024); + evnet_server_init(&server_); server_.on_connection = Server::on_connection; server_.data = this; } virtual ~Server () { - oi_server_close (&server_); + evnet_server_close (&server_); } int Listen (struct addrinfo *address) { - int r = oi_server_listen (&server_, address); + int r = evnet_server_listen (&server_, address, 1024); if(r != 0) return r; - oi_server_attach (EV_DEFAULT_ &server_); + evnet_server_attach (EV_DEFAULT_ &server_); Attach(); return 0; } void Close ( ) { - oi_server_close (&server_); + evnet_server_close (&server_); Detach(); } @@ -150,14 +150,14 @@ protected: virtual Connection* UnwrapConnection (v8::Local connection); private: - Connection* OnConnection (struct sockaddr *addr, socklen_t len); - static oi_socket* on_connection (oi_server *s, struct sockaddr *addr, socklen_t len) { + Connection* OnConnection (struct sockaddr *addr); + static evnet_socket* on_connection (evnet_server *s, struct sockaddr *addr) { Server *server = static_cast (s->data); - Connection *connection = server->OnConnection (addr, len); + Connection *connection = server->OnConnection (addr); return &connection->socket_; } - oi_server server_; + evnet_server server_; }; } // namespace node diff --git a/src/node.cc b/src/node.cc index a60aa784b4..f5306619af 100644 --- a/src/node.cc +++ b/src/node.cc @@ -24,21 +24,21 @@ using namespace node; using namespace std; static void -buf_free (oi_buf *b) +buf_free (evnet_buf *b) { V8::AdjustAmountOfExternalAllocatedMemory(-b->len); free(b); } -oi_buf * +evnet_buf * node::buf_new (size_t size) { - size_t total = sizeof(oi_buf) + size; + size_t total = sizeof(evnet_buf) + size; void *p = malloc(total); if (p == NULL) return NULL; - oi_buf *b = static_cast(p); - b->base = static_cast(p) + sizeof(oi_buf); + evnet_buf *b = static_cast(p); + b->base = static_cast(p) + sizeof(evnet_buf); b->len = size; b->release = buf_free; diff --git a/src/node.h b/src/node.h index e172d3f764..ec3042505f 100644 --- a/src/node.h +++ b/src/node.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include "object_wrap.h" @@ -30,7 +30,7 @@ do { \ enum encoding {ASCII, UTF8, RAW}; enum encoding ParseEncoding (v8::Handle encoding_v); void FatalException (v8::TryCatch &try_catch); -oi_buf * buf_new (size_t size); +evnet_buf * buf_new (size_t size); } // namespace node #endif // node_h diff --git a/src/process.cc b/src/process.cc index db0735a209..0b02f555e4 100644 --- a/src/process.cc +++ b/src/process.cc @@ -100,13 +100,13 @@ Process::Write (const Arguments& args) // XXX // A lot of improvement can be made here. First of all we're allocating - // oi_bufs for every send which is clearly inefficent - it should use a + // evnet_bufs for every send which is clearly inefficent - it should use a // memory pool or ring buffer. Of course, expressing binary data as an // array of integers is extremely inefficent. This can improved when v8 // bug 270 (http://code.google.com/p/v8/issues/detail?id=270) has been // addressed. - oi_buf *buf; + evnet_buf *buf; size_t len; if (args[0]->IsString()) { @@ -196,7 +196,7 @@ Process::Process () pid_ = 0; - oi_queue_init(&out_stream_); + evnet_queue_init(&out_stream_); } Process::~Process () @@ -208,10 +208,10 @@ void Process::Shutdown () { // Clear the out_stream - while (!oi_queue_empty(&out_stream_)) { - oi_queue *q = oi_queue_last(&out_stream_); - oi_buf *buf = (oi_buf*) oi_queue_data(q, oi_buf, queue); - oi_queue_remove(q); + while (!evnet_queue_empty(&out_stream_)) { + evnet_queue *q = evnet_queue_last(&out_stream_); + evnet_buf *buf = (evnet_buf*) evnet_queue_data(q, evnet_buf, queue); + evnet_queue_remove(q); if (buf->release) buf->release(buf); } @@ -394,9 +394,9 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents) assert(revents == EV_WRITE); assert(process->stdin_pipe_[1] >= 0); - while (!oi_queue_empty(&process->out_stream_)) { - oi_queue *q = oi_queue_last(&process->out_stream_); - oi_buf *to_write = (oi_buf*) oi_queue_data(q, oi_buf, queue); + while (!evnet_queue_empty(&process->out_stream_)) { + evnet_queue *q = evnet_queue_last(&process->out_stream_); + evnet_buf *to_write = (evnet_buf*) evnet_queue_data(q, evnet_buf, queue); sent = write( process->stdin_pipe_[1] , to_write->base + to_write->written @@ -417,12 +417,12 @@ Process::OnWritable (EV_P_ ev_io *watcher, int revents) to_write->written += sent; if (to_write->written == to_write->len) { - oi_queue_remove(q); + evnet_queue_remove(q); if (to_write->release) to_write->release(to_write); } } - if (oi_queue_empty(&process->out_stream_)) { + if (evnet_queue_empty(&process->out_stream_)) { ev_io_stop(EV_DEFAULT_UC_ &process->stdin_watcher_); if (process->got_close_) { close(process->stdin_pipe_[1]); @@ -461,10 +461,10 @@ Process::OnCHLD (EV_P_ ev_child *watcher, int revents) } int -Process::Write (oi_buf *buf) +Process::Write (evnet_buf *buf) { if (STDIN_CLOSED || got_close_ || got_chld_) return -1; - oi_queue_insert_head(&out_stream_, &buf->queue); + evnet_queue_insert_head(&out_stream_, &buf->queue); buf->written = 0; ev_io_start(EV_DEFAULT_UC_ &stdin_watcher_); return 0; diff --git a/src/process.h b/src/process.h index a169ccce6b..2799834de0 100644 --- a/src/process.h +++ b/src/process.h @@ -5,7 +5,7 @@ #include "events.h" #include #include -#include +#include namespace node { @@ -26,7 +26,7 @@ class Process : EventEmitter { ~Process(); int Spawn (const char *command); - int Write (oi_buf *buf); + int Write (evnet_buf *buf); int Close (void); int Kill (int sig); @@ -54,7 +54,7 @@ class Process : EventEmitter { bool got_chld_; int exit_code_; - oi_queue out_stream_; + evnet_queue out_stream_; }; } // namespace node diff --git a/wscript b/wscript index 277154d250..b04b56527d 100644 --- a/wscript +++ b/wscript @@ -112,16 +112,16 @@ def build(bld): v8_debug.rule = v8rule % ( v8dir_src , deps_tgt , v8dir_tgt, scons, "debug") v8_debug.target = join("deps/v8", bld.env["staticlib_PATTERN"] % "v8_g") - ### oi - oi = bld.new_task_gen("cc", "staticlib") - oi.source = "deps/liboi/oi_socket.c" - oi.includes = "deps/liboi/ deps/libev/" - oi.name = "oi" - oi.target = "oi" - # oi.uselib = "GNUTLS" - oi.install_path = None + ### evnet + evnet = bld.new_task_gen("cc", "staticlib") + evnet.source = "deps/evnet/evnet.c" + evnet.includes = "deps/evnet/ deps/libev/" + evnet.name = "evnet" + evnet.target = "evnet" + # evnet.uselib = "GNUTLS" + evnet.install_path = None if bld.env["USE_DEBUG"]: - oi.clone("debug") + evnet.clone("debug") ### http_parser http_parser = bld.new_task_gen("cc", "staticlib") @@ -173,10 +173,10 @@ def build(bld): deps/v8/include deps/libev deps/libeio - deps/liboi + deps/evnet deps/http_parser """ - node.uselib_local = "oi ev eio http_parser" + node.uselib_local = "evnet ev eio http_parser" node.uselib = "V8 EXECINFO PROFILER EFENCE" node.install_path = '${PREFIX}/bin' node.chmod = 0755