diff --git a/deps/evcom/evcom.c b/deps/evcom/evcom.c index b070f58268..f3a69cf144 100644 --- a/deps/evcom/evcom.c +++ b/deps/evcom/evcom.c @@ -35,21 +35,22 @@ #include /* TCP_NODELAY */ #include #include /* shutdown */ +#include #include #include #if EV_MULTIPLICITY -# define SOCKET_LOOP_ socket->loop, +# define STREAM_LOOP_ stream->loop, # define SERVER_LOOP_ server->loop, #else -# define SOCKET_LOOP_ +# define STREAM_LOOP_ # define SERVER_LOOP_ #endif // EV_MULTIPLICITY #if EVCOM_HAVE_GNUTLS -static int secure_full_goodbye (evcom_socket *socket); -static int secure_half_goodbye (evcom_socket *socket); +static int secure_full_goodbye (evcom_stream *stream); +static int secure_half_goodbye (evcom_stream *stream); #endif #undef TRUE @@ -70,6 +71,7 @@ static int secure_half_goodbye (evcom_socket *socket); #define GOT_HALF_CLOSE(s) ((s)->flags & EVCOM_GOT_HALF_CLOSE) #define GOT_FULL_CLOSE(s) ((s)->flags & EVCOM_GOT_FULL_CLOSE) #define TOO_MANY_CONN(s) ((s)->flags & EVCOM_TOO_MANY_CONN) +#define READ_PAUSED(s) ((s)->flags & EVCOM_READ_PAUSED) static void @@ -137,110 +139,110 @@ evcom_buf_new (const char *base, size_t len) } static int -full_close (evcom_socket *socket) +full_close (evcom_stream *stream) { - //printf("close(%d)\n", socket->fd); - if (close(socket->fd) == -1) { + //printf("close(%d)\n", stream->fd); + if (close(stream->fd) == -1) { if (errno == EINTR) { return AGAIN; } else { - socket->errorno = errno; + stream->errorno = errno; return ERROR; } } - socket->read_action = NULL; - socket->write_action = NULL; - socket->fd = -1; + stream->read_action = NULL; + stream->write_action = NULL; + stream->fd = -1; return OKAY; } static inline void -close_asap (evcom_socket *socket) +close_asap (evcom_stream *stream) { - socket->read_action = full_close; - socket->write_action = full_close; + stream->read_action = full_close; + stream->write_action = full_close; } static int -half_close (evcom_socket *socket) +half_close (evcom_stream *stream) { - int r = shutdown(socket->fd, SHUT_WR); + int r = shutdown(stream->fd, SHUT_WR); if (r == -1) { switch (errno) { case ENOTCONN: - socket->errorno = errno; + stream->errorno = errno; return ERROR; default: perror("shutdown()"); - socket->errorno = errno; + stream->errorno = errno; assert(0 && "Shouldn't get an error on shutdown"); return ERROR; } } - socket->write_action = NULL; - if (socket->read_action == NULL) - socket->fd = -1; + stream->write_action = NULL; + if (stream->read_action == NULL) + stream->fd = -1; return OKAY; } // This is to be called when ever the out_stream is empty // and we need to change state. static void -change_state_for_empty_out_stream (evcom_socket *socket) +change_state_for_empty_out_stream (evcom_stream *stream) { /* * a very complicated bunch of close logic! * XXX this is awful. FIXME */ - if (socket->write_action == full_close || socket->read_action == full_close) { + if (stream->write_action == full_close || stream->read_action == full_close) { return; } - if (!GOT_HALF_CLOSE(socket)) { - if (!GOT_FULL_CLOSE(socket)) { + if (!GOT_HALF_CLOSE(stream)) { + if (!GOT_FULL_CLOSE(stream)) { /* Normal situation. Didn't get any close signals. */ - ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); + ev_io_stop(STREAM_LOOP_ &stream->write_watcher); } else { /* Got Full Close. */ - if (socket->read_action) { + if (stream->read_action) { #if EVCOM_HAVE_GNUTLS - socket->read_action = SECURE(socket) ? secure_full_goodbye : full_close; + stream->read_action = SECURE(stream) ? secure_full_goodbye : full_close; #else - socket->read_action = full_close; + stream->read_action = full_close; #endif } - if (socket->write_action) { + if (stream->write_action) { #if EVCOM_HAVE_GNUTLS - socket->write_action = SECURE(socket) ? secure_full_goodbye : full_close; + stream->write_action = SECURE(stream) ? secure_full_goodbye : full_close; #else - socket->write_action = full_close; + stream->write_action = full_close; #endif } } } else { /* Got Half Close. */ - if (socket->write_action) { + if (stream->write_action) { #if EVCOM_HAVE_GNUTLS - socket->write_action = SECURE(socket) ? secure_half_goodbye : half_close; + stream->write_action = SECURE(stream) ? secure_half_goodbye : half_close; #else - socket->write_action = half_close; + stream->write_action = half_close; #endif - ev_io_start(SOCKET_LOOP_ &socket->write_watcher); + ev_io_start(STREAM_LOOP_ &stream->write_watcher); } } } static void -update_write_buffer_after_send (evcom_socket *socket, ssize_t sent) +update_write_buffer_after_send (evcom_stream *stream, ssize_t sent) { - evcom_queue *q = evcom_queue_last(&socket->out_stream); + evcom_queue *q = evcom_queue_last(&stream->out_stream); evcom_buf *to_write = evcom_queue_data(q, evcom_buf, queue); to_write->written += sent; - socket->written += sent; + stream->written += sent; if (to_write->written == to_write->len) { @@ -250,24 +252,24 @@ update_write_buffer_after_send (evcom_socket *socket, ssize_t sent) to_write->release(to_write); } - if (evcom_queue_empty(&socket->out_stream)) { - change_state_for_empty_out_stream(socket); - if (socket->on_drain) - socket->on_drain(socket); + if (evcom_queue_empty(&stream->out_stream)) { + change_state_for_empty_out_stream(stream); + if (stream->on_drain) + stream->on_drain(stream); } } } #if EVCOM_HAVE_GNUTLS -static int secure_socket_send (evcom_socket *socket); -static int secure_socket_recv (evcom_socket *socket); +static int secure_stream_send (evcom_stream *stream); +static int secure_stream_recv (evcom_stream *stream); /* TODO can this be done without ignoring SIGPIPE? */ static ssize_t nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len) { - evcom_socket *socket = (evcom_socket*)data; - assert(SECURE(socket)); + evcom_stream *stream = (evcom_stream*)data; + assert(SECURE(stream)); int flags = 0; #ifdef MSG_NOSIGNAL flags |= MSG_NOSIGNAL; @@ -275,75 +277,75 @@ nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len) #ifdef MSG_DONTWAIT flags |= MSG_DONTWAIT; #endif - int r = send(socket->fd, buf, len, flags); + int r = send(stream->fd, buf, len, flags); if (r == -1) { - gnutls_transport_set_errno(socket->session, errno); /* necessary ? */ + stream->errorno = errno; } return r; } static int -secure_handshake (evcom_socket *socket) +secure_handshake (evcom_stream *stream) { - assert(SECURE(socket)); + assert(SECURE(stream)); - int r = gnutls_handshake(socket->session); + int r = gnutls_handshake(stream->session); if (gnutls_error_is_fatal(r)) { - socket->gnutls_errorno = r; + stream->gnutls_errorno = r; return ERROR; } if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN; - evcom_socket_reset_timeout(socket); + evcom_stream_reset_timeout(stream); - if (!CONNECTED(socket)) { - socket->flags |= EVCOM_CONNECTED; - if (socket->on_connect) socket->on_connect(socket); + if (!CONNECTED(stream)) { + stream->flags |= EVCOM_CONNECTED; + if (stream->on_connect) stream->on_connect(stream); } - if (socket->read_action == secure_handshake) { - socket->read_action = secure_socket_recv; + if (stream->read_action == secure_handshake) { + stream->read_action = secure_stream_recv; } - if (socket->write_action == secure_handshake) { - socket->write_action = secure_socket_send; + if (stream->write_action == secure_handshake) { + stream->write_action = secure_stream_send; } return OKAY; } static int -secure_socket_send (evcom_socket *socket) +secure_stream_send (evcom_stream *stream) { ssize_t sent; - if (evcom_queue_empty(&socket->out_stream)) { - ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); + if (evcom_queue_empty(&stream->out_stream)) { + ev_io_stop(STREAM_LOOP_ &stream->write_watcher); return AGAIN; } - evcom_queue *q = evcom_queue_last(&socket->out_stream); + evcom_queue *q = evcom_queue_last(&stream->out_stream); evcom_buf *to_write = evcom_queue_data(q, evcom_buf, queue); - assert(SECURE(socket)); + assert(SECURE(stream)); - sent = gnutls_record_send(socket->session, + sent = gnutls_record_send(stream->session, to_write->base + to_write->written, to_write->len - to_write->written); if (gnutls_error_is_fatal(sent)) { - socket->gnutls_errorno = sent; + stream->gnutls_errorno = sent; return ERROR; } if (sent == 0) return AGAIN; - evcom_socket_reset_timeout(socket); + evcom_stream_reset_timeout(stream); if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) { return AGAIN; @@ -351,9 +353,9 @@ secure_socket_send (evcom_socket *socket) if (sent > 0) { /* make sure the callbacks are correct */ - if (socket->read_action) - socket->read_action = secure_socket_recv; - update_write_buffer_after_send(socket, sent); + if (stream->read_action) + stream->read_action = secure_stream_recv; + update_write_buffer_after_send(stream, sent); return OKAY; } @@ -362,20 +364,20 @@ secure_socket_send (evcom_socket *socket) } static int -secure_socket_recv (evcom_socket *socket) +secure_stream_recv (evcom_stream *stream) { - char recv_buffer[socket->chunksize]; - size_t recv_buffer_size = socket->chunksize; + char recv_buffer[stream->chunksize]; + size_t recv_buffer_size = stream->chunksize; ssize_t recved; - assert(SECURE(socket)); + assert(SECURE(stream)); - recved = gnutls_record_recv(socket->session, recv_buffer, recv_buffer_size); + recved = gnutls_record_recv(stream->session, recv_buffer, recv_buffer_size); - //printf("secure socket recv %d %p\n", recved, socket->on_connect); + //printf("secure stream recv %d %p\n", recved, stream->on_connect); if (gnutls_error_is_fatal(recved)) { - socket->gnutls_errorno = recved; + stream->gnutls_errorno = recved; return ERROR; } @@ -383,18 +385,18 @@ secure_socket_recv (evcom_socket *socket) return AGAIN; } - evcom_socket_reset_timeout(socket); + evcom_stream_reset_timeout(stream); /* A server may also receive GNUTLS_E_REHANDSHAKE when a client has * initiated a handshake. In that case the server can only initiate a * handshake or terminate the connection. */ if (recved == GNUTLS_E_REHANDSHAKE) { - if (socket->write_action) { - socket->read_action = secure_handshake; - socket->write_action = secure_handshake; + if (stream->write_action) { + stream->read_action = secure_handshake; + stream->write_action = secure_handshake; return OKAY; } else { - socket->read_action = full_close; + stream->read_action = full_close; // set error return ERROR; } @@ -403,16 +405,16 @@ secure_socket_recv (evcom_socket *socket) if (recved >= 0) { /* Got EOF */ if (recved == 0) { - socket->read_action = NULL; - if (socket->write_action == NULL) close_asap(socket); + stream->read_action = NULL; + if (stream->write_action == NULL) close_asap(stream); } - if (socket->write_action) { - socket->write_action = secure_socket_send; + if (stream->write_action) { + stream->write_action = secure_stream_send; } - if (socket->on_read) { - socket->on_read(socket, recv_buffer, recved); + if (stream->on_read) { + stream->on_read(stream, recv_buffer, recved); } return OKAY; @@ -423,64 +425,64 @@ secure_socket_recv (evcom_socket *socket) } static int -secure_full_goodbye (evcom_socket *socket) +secure_full_goodbye (evcom_stream *stream) { - assert(SECURE(socket)); + assert(SECURE(stream)); - int r = gnutls_bye(socket->session, GNUTLS_SHUT_RDWR); + int r = gnutls_bye(stream->session, GNUTLS_SHUT_RDWR); if (gnutls_error_is_fatal(r)) { - socket->gnutls_errorno = r; + stream->gnutls_errorno = r; return ERROR; } if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN; - close_asap(socket); + close_asap(stream); return OKAY; } static int -secure_half_goodbye (evcom_socket *socket) +secure_half_goodbye (evcom_stream *stream) { - assert(SECURE(socket)); + assert(SECURE(stream)); - int r = gnutls_bye(socket->session, GNUTLS_SHUT_WR); + int r = gnutls_bye(stream->session, GNUTLS_SHUT_WR); if (gnutls_error_is_fatal(r)) { - socket->gnutls_errorno = r; + stream->gnutls_errorno = r; return ERROR; } if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) return AGAIN; - if (socket->write_action) socket->write_action = half_close; + if (stream->write_action) stream->write_action = half_close; return OKAY; } void -evcom_socket_set_secure_session (evcom_socket *socket, gnutls_session_t session) +evcom_stream_set_secure_session (evcom_stream *stream, gnutls_session_t session) { - socket->session = session; - socket->flags |= EVCOM_SECURE; + stream->session = session; + stream->flags |= EVCOM_SECURE; } #endif /* HAVE GNUTLS */ static int -socket_send (evcom_socket *socket) +stream_send (evcom_stream *stream) { ssize_t sent; - assert(!SECURE(socket)); + assert(!SECURE(stream)); - if (evcom_queue_empty(&socket->out_stream)) { - ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); + if (evcom_queue_empty(&stream->out_stream)) { + ev_io_stop(STREAM_LOOP_ &stream->write_watcher); return AGAIN; } - evcom_queue *q = evcom_queue_last(&socket->out_stream); + evcom_queue *q = evcom_queue_last(&stream->out_stream); evcom_buf *to_write = evcom_queue_data(q, evcom_buf, queue); int flags = 0; @@ -493,7 +495,7 @@ socket_send (evcom_socket *socket) /* TODO use writev() here */ - sent = send(socket->fd, + sent = send(stream->fd, to_write->base + to_write->written, to_write->len - to_write->written, flags); @@ -508,39 +510,39 @@ socket_send (evcom_socket *socket) return AGAIN; default: - socket->errorno = errno; + stream->errorno = errno; return ERROR; } } - evcom_socket_reset_timeout(socket); + evcom_stream_reset_timeout(stream); - if (!CONNECTED(socket)) { - socket->flags |= EVCOM_CONNECTED; - if (socket->on_connect) socket->on_connect(socket); + if (!CONNECTED(stream)) { + stream->flags |= EVCOM_CONNECTED; + if (stream->on_connect) stream->on_connect(stream); } - update_write_buffer_after_send(socket, sent); + update_write_buffer_after_send(stream, sent); return OKAY; } static int -socket_recv (evcom_socket *socket) +stream_recv (evcom_stream *stream) { - char buf[socket->chunksize]; - size_t buf_size = socket->chunksize; + char buf[stream->chunksize]; + size_t buf_size = stream->chunksize; ssize_t recved; - assert(!SECURE(socket)); + assert(!SECURE(stream)); - if (!CONNECTED(socket)) { - socket->flags |= EVCOM_CONNECTED; - if (socket->on_connect) socket->on_connect(socket); + if (!CONNECTED(stream)) { + stream->flags |= EVCOM_CONNECTED; + if (stream->on_connect) stream->on_connect(stream); //return OKAY; } - recved = recv(socket->fd, buf, buf_size, 0); + recved = recv(stream->fd, buf, buf_size, 0); if (recved < 0) { switch (errno) { @@ -555,45 +557,45 @@ socket_recv (evcom_socket *socket) return AGAIN; default: - socket->errorno = errno; + stream->errorno = errno; return ERROR; } } - evcom_socket_reset_timeout(socket); + evcom_stream_reset_timeout(stream); if (recved == 0) { - evcom_socket_read_stop(socket); - socket->read_action = NULL; - if (socket->write_action == NULL) close_asap(socket); + evcom_stream_read_pause(stream); + stream->read_action = NULL; + if (stream->write_action == NULL) close_asap(stream); } /* NOTE: EOF is signaled with recved == 0 on callback */ - if (socket->on_read) socket->on_read(socket, buf, recved); + if (stream->on_read) stream->on_read(stream, buf, recved); return OKAY; } static void -assign_file_descriptor (evcom_socket *socket, int fd) +assign_file_descriptor (evcom_stream *stream, int fd) { - socket->fd = fd; + stream->fd = fd; - ev_io_set (&socket->read_watcher, fd, EV_READ); - ev_io_set (&socket->write_watcher, fd, EV_WRITE); + ev_io_set (&stream->read_watcher, fd, EV_READ); + ev_io_set (&stream->write_watcher, fd, EV_WRITE); - socket->read_action = socket_recv; - socket->write_action = socket_send; + stream->read_action = stream_recv; + stream->write_action = stream_send; #if EVCOM_HAVE_GNUTLS - if (SECURE(socket)) { - gnutls_transport_set_lowat(socket->session, 0); - gnutls_transport_set_push_function(socket->session, nosigpipe_push); - gnutls_transport_set_ptr2(socket->session, - (gnutls_transport_ptr_t)fd, /* recv */ - socket); /* send */ - socket->read_action = secure_handshake; - socket->write_action = secure_handshake; + if (SECURE(stream)) { + gnutls_transport_set_lowat(stream->session, 0); + gnutls_transport_set_push_function(stream->session, nosigpipe_push); + gnutls_transport_set_ptr2(stream->session, + (gnutls_transport_ptr_t)(intptr_t)fd, /* recv */ + stream); /* send */ + stream->read_action = secure_handshake; + stream->write_action = secure_handshake; } #endif } @@ -614,11 +616,11 @@ server_close_with_error (evcom_server *server, int errorno) } -/* Retruns evcom_socket if a connection could be accepted. - * The returned socket is not yet attached to the event loop. +/* Retruns evcom_stream if a connection could be accepted. + * The returned stream is not yet attached to the event loop. * Otherwise NULL */ -static evcom_socket* +static evcom_stream* accept_connection (evcom_server *server) { struct sockaddr address; /* connector's address information */ @@ -639,13 +641,13 @@ accept_connection (evcom_server *server) } } - evcom_socket *socket = NULL; + evcom_stream *stream = NULL; if (server->on_connection) { - socket = server->on_connection(server, (struct sockaddr*)&address); + stream = server->on_connection(server, (struct sockaddr*)&address); } - if (socket == NULL) { + if (stream == NULL) { close(fd); return NULL; } @@ -654,14 +656,14 @@ accept_connection (evcom_server *server) #ifdef SO_NOSIGPIPE int flags = 1; - int r = setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); + int r = setsockopt(fd, SOL_STREAM, SO_NOSIGPIPE, &flags, sizeof(flags)); if (r < 0) goto error; #endif - socket->server = server; - assign_file_descriptor(socket, fd); + stream->server = server; + assign_file_descriptor(stream, fd); - return socket; + return stream; error: server_close_with_error(server, errno); @@ -689,22 +691,42 @@ on_connection (EV_P_ ev_io *watcher, int revents) } /* accept as many connections as possible */ - evcom_socket *socket; - while ((socket = accept_connection(server))) { - evcom_socket_attach(EV_A_ socket); + evcom_stream *stream; + while ((stream = accept_connection(server))) { + evcom_stream_attach(EV_A_ stream); } } +static inline socklen_t +address_length (struct sockaddr *address) +{ + struct sockaddr_un* unix_address = (struct sockaddr_un*)address; + + switch (address->sa_family) { + case AF_INET: + return sizeof(struct sockaddr_in); + + case AF_INET6: + return sizeof(struct sockaddr_in6); + + case AF_UNIX: + return strlen(unix_address->sun_path) + sizeof(unix_address->sun_family); + + default: + assert(0 && "Unsupported socket family"); + } + return 0; +} + int -evcom_server_listen (evcom_server *server, struct addrinfo *addrinfo, int backlog) +evcom_server_listen (evcom_server *server, struct sockaddr *address, int backlog) { int fd = -1; assert(!LISTENING(server)); - fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, - addrinfo->ai_protocol); + fd = socket(address->sa_family, SOCK_STREAM, 0); if (fd < 0) { - perror("socket()"); + perror("stream()"); return -1; } @@ -722,7 +744,7 @@ evcom_server_listen (evcom_server *server, struct addrinfo *addrinfo, int backlo */ //setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); - if (bind(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) < 0) { + if (bind(fd, address, address_length(address)) < 0) { perror("bind()"); close(fd); return -1; @@ -782,203 +804,206 @@ evcom_server_init (evcom_server *server) server->on_close = NULL; } -/* Internal callback. called by socket->timeout_watcher */ +/* Internal callback. called by stream->timeout_watcher */ static void on_timeout (EV_P_ ev_timer *watcher, int revents) { - evcom_socket *socket = watcher->data; + evcom_stream *stream = watcher->data; #if EV_MULTIPLICITY - assert(socket->loop == loop); + assert(stream->loop == loop); #endif assert(revents == EV_TIMEOUT); - assert(watcher == &socket->timeout_watcher); + assert(watcher == &stream->timeout_watcher); - // printf("on_timeout\n"); + if (READ_PAUSED(stream)) { + evcom_stream_reset_timeout(stream); + return; + } - if (socket->on_timeout) socket->on_timeout(socket); + if (stream->on_timeout) stream->on_timeout(stream); // timeout does not automatically kill your connection. you must! } static void -release_write_buffer(evcom_socket *socket) +release_write_buffer(evcom_stream *stream) { - while (!evcom_queue_empty(&socket->out_stream)) { - evcom_queue *q = evcom_queue_last(&socket->out_stream); + while (!evcom_queue_empty(&stream->out_stream)) { + evcom_queue *q = evcom_queue_last(&stream->out_stream); evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); evcom_queue_remove(q); if (buf->release) buf->release(buf); } } -/* Internal callback. called by socket->read_watcher */ +/* Internal callback. called by stream->read_watcher */ static void on_io_event(EV_P_ ev_io *watcher, int revents) { - evcom_socket *socket = watcher->data; + evcom_stream *stream = watcher->data; if (revents & EV_ERROR) { - socket->errorno = 1; - close_asap(socket); + stream->errorno = 1; + close_asap(stream); } int r; - int have_read_event = (socket->read_action != NULL); - int have_write_event = (socket->write_action != NULL); + int have_read_event = (stream->read_action != NULL); + int have_write_event = (stream->write_action != NULL); while (have_read_event || have_write_event) { /* RECV LOOP - TRY TO CLEAR THE BUFFER */ - if (socket->read_action == NULL) { + if (stream->read_action == NULL) { have_read_event = FALSE; } else { - r = socket->read_action(socket); + r = stream->read_action(stream); if (r == AGAIN) { have_read_event = FALSE; } else { - if (r == ERROR) close_asap(socket); + if (r == ERROR) close_asap(stream); } } /* SEND LOOP - TRY TO CLEAR THE BUFFER */ - if (socket->write_action == NULL) { + if (stream->write_action == NULL) { have_write_event = FALSE; } else { - r = socket->write_action(socket); + r = stream->write_action(stream); if (r == AGAIN) { have_write_event = FALSE; } else { - if (r == ERROR) close_asap(socket); + if (r == ERROR) close_asap(stream); } } } // Close when both sides of the stream are closed. - if (socket->write_action == NULL && socket->read_action == NULL) { - release_write_buffer(socket); + if (stream->write_action == NULL && stream->read_action == NULL) { + release_write_buffer(stream); - ev_clear_pending (EV_A_ &socket->write_watcher); - ev_clear_pending (EV_A_ &socket->read_watcher); - ev_clear_pending (EV_A_ &socket->timeout_watcher); + ev_clear_pending (EV_A_ &stream->write_watcher); + ev_clear_pending (EV_A_ &stream->read_watcher); + ev_clear_pending (EV_A_ &stream->timeout_watcher); - evcom_socket_detach(socket); - assert(socket->fd == -1); + evcom_stream_detach(stream); + assert(stream->fd == -1); - if (socket->server) { - accept_new_connections(socket->server); + if (stream->server) { + accept_new_connections(stream->server); } - if (socket->on_close) socket->on_close(socket); - /* WARNING: user can free socket in on_close so no more + if (stream->on_close) stream->on_close(stream); + /* WARNING: user can free stream in on_close so no more * access beyond this point. */ } } /** * If using SSL do consider setting - * gnutls_db_set_retrieve_function (socket->session, _); - * gnutls_db_set_remove_function (socket->session, _); - * gnutls_db_set_store_function (socket->session, _); - * gnutls_db_set_ptr (socket->session, _); + * gnutls_db_set_retrieve_function (stream->session, _); + * gnutls_db_set_remove_function (stream->session, _); + * gnutls_db_set_store_function (stream->session, _); + * gnutls_db_set_ptr (stream->session, _); */ void -evcom_socket_init (evcom_socket *socket, float timeout) +evcom_stream_init (evcom_stream *stream, float timeout) { - socket->fd = -1; - socket->server = NULL; + stream->fd = -1; + stream->server = NULL; #if EV_MULTIPLICITY - socket->loop = NULL; + stream->loop = NULL; #endif - socket->flags = 0; + stream->flags = 0; - evcom_queue_init(&socket->out_stream); + evcom_queue_init(&stream->out_stream); - ev_init(&socket->write_watcher, on_io_event); - socket->write_watcher.data = socket; + ev_init(&stream->write_watcher, on_io_event); + stream->write_watcher.data = stream; - ev_init(&socket->read_watcher, on_io_event); - socket->read_watcher.data = socket; + ev_init(&stream->read_watcher, on_io_event); + stream->read_watcher.data = stream; - socket->errorno = 0; + stream->errorno = 0; #if EVCOM_HAVE_GNUTLS - socket->gnutls_errorno = 0; - socket->session = NULL; + stream->gnutls_errorno = 0; + stream->session = NULL; #endif - ev_timer_init(&socket->timeout_watcher, on_timeout, 0., timeout); - socket->timeout_watcher.data = socket; + ev_timer_init(&stream->timeout_watcher, on_timeout, 0., timeout); + stream->timeout_watcher.data = stream; - socket->read_action = NULL; - socket->write_action = NULL; + stream->read_action = NULL; + stream->write_action = NULL; - socket->chunksize = 8*1024; + stream->chunksize = 8*1024; - socket->on_connect = NULL; - socket->on_read = NULL; - socket->on_drain = NULL; - socket->on_timeout = NULL; + stream->on_connect = NULL; + stream->on_read = NULL; + stream->on_drain = NULL; + stream->on_timeout = NULL; } void -evcom_socket_close (evcom_socket *socket) +evcom_stream_close (evcom_stream *stream) { - socket->flags |= EVCOM_GOT_HALF_CLOSE; - if (evcom_queue_empty(&socket->out_stream)) { - change_state_for_empty_out_stream(socket); + stream->flags |= EVCOM_GOT_HALF_CLOSE; + if (evcom_queue_empty(&stream->out_stream)) { + change_state_for_empty_out_stream(stream); } } void -evcom_socket_full_close (evcom_socket *socket) +evcom_stream_full_close (evcom_stream *stream) { - socket->flags |= EVCOM_GOT_FULL_CLOSE; - if (evcom_queue_empty(&socket->out_stream)) { - change_state_for_empty_out_stream(socket); + stream->flags |= EVCOM_GOT_FULL_CLOSE; + if (evcom_queue_empty(&stream->out_stream)) { + change_state_for_empty_out_stream(stream); } } -void evcom_socket_force_close (evcom_socket *socket) +void evcom_stream_force_close (evcom_stream *stream) { - release_write_buffer(socket); + release_write_buffer(stream); - ev_clear_pending (SOCKET_LOOP_ &socket->write_watcher); - ev_clear_pending (SOCKET_LOOP_ &socket->read_watcher); - ev_clear_pending (SOCKET_LOOP_ &socket->timeout_watcher); + ev_clear_pending (STREAM_LOOP_ &stream->write_watcher); + ev_clear_pending (STREAM_LOOP_ &stream->read_watcher); + ev_clear_pending (STREAM_LOOP_ &stream->timeout_watcher); - socket->write_action = socket->read_action = NULL; - // socket->errorno = EVCOM_SOCKET_ERROR_FORCE_CLOSE + stream->write_action = stream->read_action = NULL; + // stream->errorno = EVCOM_STREAM_ERROR_FORCE_CLOSE - if (socket->fd > 0) { - close(socket->fd); - if (socket->server) { - accept_new_connections(socket->server); + if (stream->fd > 0) { + close(stream->fd); + if (stream->server) { + accept_new_connections(stream->server); } } - socket->fd = -1; + stream->fd = -1; - evcom_socket_detach(socket); + evcom_stream_detach(stream); } void -evcom_socket_write (evcom_socket *socket, evcom_buf *buf) +evcom_stream_write (evcom_stream *stream, evcom_buf *buf) { - if (socket->write_action == NULL) { - assert(0 && "Do not write to a closed socket"); + if (stream->write_action == NULL) { + assert(0 && "Do not write to a closed stream"); goto error; } - if (GOT_FULL_CLOSE(socket) || GOT_HALF_CLOSE(socket)) { - assert(0 && "Do not write to a closing socket"); + if (GOT_FULL_CLOSE(stream) || GOT_HALF_CLOSE(stream)) { + assert(0 && "Do not write to a closing stream"); goto error; } - evcom_queue_insert_head(&socket->out_stream, &buf->queue); + evcom_queue_insert_head(&stream->out_stream, &buf->queue); buf->written = 0; - if (ATTACHED(socket)) { - ev_io_start(SOCKET_LOOP_ &socket->write_watcher); + if (ATTACHED(stream)) { + ev_io_start(STREAM_LOOP_ &stream->write_watcher); } return; @@ -987,9 +1012,9 @@ error: } void -evcom_socket_reset_timeout (evcom_socket *socket) +evcom_stream_reset_timeout (evcom_stream *stream) { - ev_timer_again(SOCKET_LOOP_ &socket->timeout_watcher); + ev_timer_again(STREAM_LOOP_ &stream->timeout_watcher); } static void @@ -999,74 +1024,76 @@ free_simple_buf (evcom_buf *buf) free(buf); } -/* Writes a string to the socket. +/* Writes a string to the stream. * NOTE: Allocates memory. Avoid for performance applications. */ void -evcom_socket_write_simple (evcom_socket *socket, const char *str, size_t len) +evcom_stream_write_simple (evcom_stream *stream, const char *str, size_t len) { evcom_buf *buf = malloc(sizeof(evcom_buf)); buf->release = free_simple_buf; buf->base = strdup(str); buf->len = len; - evcom_socket_write(socket, buf); + evcom_stream_write(stream, buf); } void -evcom_socket_attach (EV_P_ evcom_socket *socket) +evcom_stream_attach (EV_P_ evcom_stream *stream) { #if EV_MULTIPLICITY - socket->loop = loop; + stream->loop = loop; #endif - socket->flags |= EVCOM_ATTACHED; + stream->flags |= EVCOM_ATTACHED; - ev_timer_again(EV_A_ &socket->timeout_watcher); + ev_timer_again(EV_A_ &stream->timeout_watcher); - if (socket->read_action) { - ev_io_start(EV_A_ &socket->read_watcher); + if (stream->read_action) { + ev_io_start(EV_A_ &stream->read_watcher); } - if (socket->write_action) { - ev_io_start(EV_A_ &socket->write_watcher); + if (stream->write_action) { + ev_io_start(EV_A_ &stream->write_watcher); } } void -evcom_socket_detach (evcom_socket *socket) +evcom_stream_detach (evcom_stream *stream) { - if (ATTACHED(socket)) { - ev_io_stop(SOCKET_LOOP_ &socket->write_watcher); - ev_io_stop(SOCKET_LOOP_ &socket->read_watcher); - ev_timer_stop(SOCKET_LOOP_ &socket->timeout_watcher); + if (ATTACHED(stream)) { + ev_io_stop(STREAM_LOOP_ &stream->write_watcher); + ev_io_stop(STREAM_LOOP_ &stream->read_watcher); + ev_timer_stop(STREAM_LOOP_ &stream->timeout_watcher); #if EV_MULTIPLICITY - socket->loop = NULL; + stream->loop = NULL; #endif - socket->flags &= ~EVCOM_ATTACHED; + stream->flags &= ~EVCOM_ATTACHED; } } void -evcom_socket_read_stop (evcom_socket *socket) +evcom_stream_read_pause (evcom_stream *stream) { - ev_io_stop(SOCKET_LOOP_ &socket->read_watcher); - ev_clear_pending(SOCKET_LOOP_ &socket->read_watcher); + ev_io_stop(STREAM_LOOP_ &stream->read_watcher); + ev_clear_pending(STREAM_LOOP_ &stream->read_watcher); + stream->flags |= EVCOM_READ_PAUSED; } void -evcom_socket_read_start (evcom_socket *socket) +evcom_stream_read_resume (evcom_stream *stream) { - if (socket->read_action) { - ev_io_start(SOCKET_LOOP_ &socket->read_watcher); + stream->flags &= ~EVCOM_READ_PAUSED; + + if (stream->read_action) { + ev_io_start(STREAM_LOOP_ &stream->read_watcher); /* XXX feed event? */ } } int -evcom_socket_connect (evcom_socket *s, struct addrinfo *addrinfo) +evcom_stream_connect (evcom_stream *s, struct sockaddr *address) { - int fd = socket(addrinfo->ai_family, addrinfo->ai_socktype, - addrinfo->ai_protocol); + int fd = socket(address->sa_family, SOCK_STREAM, 0); if (fd < 0) { perror("socket()"); return -1; @@ -1083,7 +1110,7 @@ evcom_socket_connect (evcom_socket *s, struct addrinfo *addrinfo) setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags)); #endif - r = connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen); + r = connect(fd, address, address_length(address)); if (r < 0 && errno != EINPROGRESS) { perror("connect"); diff --git a/deps/evcom/evcom.h b/deps/evcom/evcom.h index 3161dcec2a..6921e76eb2 100644 --- a/deps/evcom/evcom.h +++ b/deps/evcom/evcom.h @@ -45,9 +45,9 @@ extern "C" { typedef struct evcom_queue evcom_queue; typedef struct evcom_buf evcom_buf; typedef struct evcom_server evcom_server; -typedef struct evcom_socket evcom_socket; +typedef struct evcom_stream evcom_stream; -/* flags for socket and server */ +/* flags for stream and server */ #define EVCOM_ATTACHED 0x0001 #define EVCOM_LISTENING 0x0002 #define EVCOM_CONNECTED 0x0004 @@ -55,58 +55,59 @@ typedef struct evcom_socket evcom_socket; #define EVCOM_GOT_HALF_CLOSE 0x0010 #define EVCOM_GOT_FULL_CLOSE 0x0020 #define EVCOM_TOO_MANY_CONN 0x0040 +#define EVCOM_READ_PAUSED 0x0080 void evcom_server_init (evcom_server *); - int evcom_server_listen (evcom_server *, struct addrinfo *addrinfo, int backlog); + int evcom_server_listen (evcom_server *, struct sockaddr *address, int backlog); void evcom_server_attach (EV_P_ evcom_server *); void evcom_server_detach (evcom_server *); void evcom_server_close (evcom_server *); // synchronous -void evcom_socket_init (evcom_socket *, float timeout); - int evcom_socket_connect (evcom_socket *, struct addrinfo *addrinfo); -void evcom_socket_attach (EV_P_ evcom_socket *); -void evcom_socket_detach (evcom_socket *); -void evcom_socket_read_start (evcom_socket *); -void evcom_socket_read_stop (evcom_socket *); +void evcom_stream_init (evcom_stream *, float timeout); + int evcom_stream_connect (evcom_stream *, struct sockaddr *address); +void evcom_stream_attach (EV_P_ evcom_stream *); +void evcom_stream_detach (evcom_stream *); +void evcom_stream_read_resume (evcom_stream *); +void evcom_stream_read_pause (evcom_stream *); -/* Resets the timeout to stay alive for another socket->timeout seconds +/* Resets the timeout to stay alive for another stream->timeout seconds */ -void evcom_socket_reset_timeout (evcom_socket *); +void evcom_stream_reset_timeout (evcom_stream *); -/* Writes a buffer to the socket. +/* Writes a buffer to the stream. */ -void evcom_socket_write (evcom_socket *, evcom_buf *); +void evcom_stream_write (evcom_stream *, evcom_buf *); -void evcom_socket_write_simple (evcom_socket *, const char *str, size_t len); +void evcom_stream_write_simple (evcom_stream *, const char *str, size_t len); -/* Once the write buffer is drained, evcom_socket_close will shutdown the - * writing end of the socket and will close the read end once the server +/* Once the write buffer is drained, evcom_stream_close will shutdown the + * writing end of the stream and will close the read end once the server * replies with an EOF. */ -void evcom_socket_close (evcom_socket *); +void evcom_stream_close (evcom_stream *); /* Do not wait for the server to reply with EOF. This will only be called * once the write buffer is drained. - * Warning: For TCP socket, the OS kernel may (should) reply with RST + * Warning: For TCP stream, the OS kernel may (should) reply with RST * packets if this is called when data is still being received from the * server. */ -void evcom_socket_full_close (evcom_socket *); +void evcom_stream_full_close (evcom_stream *); /* The most extreme measure. * Will not wait for the write queue to complete. */ -void evcom_socket_force_close (evcom_socket *); +void evcom_stream_force_close (evcom_stream *); #if EVCOM_HAVE_GNUTLS -/* Tells the socket to use transport layer security (SSL). evcom_socket does +/* Tells the stream to use transport layer security (SSL). evcom_stream does * not want to make any decisions about security requirements, so the * majoirty of GnuTLS configuration is left to the user. Only the transport - * layer of GnuTLS is controlled by evcom_socket. That is, do not use + * layer of GnuTLS is controlled by evcom_stream. That is, do not use * gnutls_transport_* functions. Do use the rest of GnuTLS's API. */ -void evcom_socket_set_secure_session (evcom_socket *, gnutls_session_t); +void evcom_stream_set_secure_session (evcom_stream *, gnutls_session_t); #endif evcom_buf * evcom_buf_new (const char* base, size_t len); @@ -144,7 +145,7 @@ struct evcom_server { /* PUBLIC */ - evcom_socket* (*on_connection) (evcom_server *, struct sockaddr *remote_addr); + evcom_stream* (*on_connection) (evcom_server *, struct sockaddr *remote_addr); /* Executed when a server is closed. * If evcom_server_close() was called errorno will be 0. @@ -156,7 +157,7 @@ struct evcom_server { void *data; }; -struct evcom_socket { +struct evcom_stream { /* read only */ int fd; #if EV_MULTIPLICITY @@ -167,9 +168,9 @@ struct evcom_socket { size_t written; unsigned flags; - /* NULL = that end of the socket is closed. */ - int (*read_action) (evcom_socket *); - int (*write_action) (evcom_socket *); + /* NULL = that end of the stream is closed. */ + int (*read_action) (evcom_stream *); + int (*write_action) (evcom_stream *); /* ERROR CODES. 0 = no error. Check on_close. */ int errorno; @@ -187,11 +188,11 @@ struct evcom_socket { /* public */ size_t chunksize; /* the maximum chunk that on_read() will return */ - void (*on_connect) (evcom_socket *); - void (*on_read) (evcom_socket *, const void *buf, size_t count); - void (*on_drain) (evcom_socket *); - void (*on_close) (evcom_socket *); - void (*on_timeout) (evcom_socket *); + void (*on_connect) (evcom_stream *); + void (*on_read) (evcom_stream *, const void *buf, size_t count); + void (*on_drain) (evcom_stream *); + void (*on_close) (evcom_stream *); + void (*on_timeout) (evcom_stream *); void *data; }; diff --git a/deps/evcom/test/echo.c b/deps/evcom/test/echo.c index 45a2ee882d..969cefb57d 100644 --- a/deps/evcom/test/echo.c +++ b/deps/evcom/test/echo.c @@ -16,22 +16,22 @@ #define HOST "127.0.0.1" #define SOCKFILE "/tmp/oi.sock" -#define PORT "5000" +#define PORT 5000 static int nconnections; static void -on_peer_close (evcom_socket *socket) +on_peer_close (evcom_stream *stream) { - assert(socket->errorno == 0); + assert(stream->errorno == 0); //printf("server connection closed\n"); - free(socket); + free(stream); } static void -on_peer_timeout (evcom_socket *socket) +on_peer_timeout (evcom_stream *stream) { - assert(socket); + assert(stream); fprintf(stderr, "peer connection timeout\n"); assert(0); } @@ -42,31 +42,31 @@ on_peer_timeout (evcom_socket *socket) #define TIMEOUT 5.0 static void -on_peer_read (evcom_socket *socket, const void *base, size_t len) +on_peer_read (evcom_stream *stream, const void *base, size_t len) { if(len == 0) return; - evcom_socket_write_simple(socket, base, len); + evcom_stream_write_simple(stream, base, len); } -static evcom_socket* +static evcom_stream* on_server_connection (evcom_server *server, struct sockaddr *addr) { assert(server); assert(addr); - evcom_socket *socket = malloc(sizeof(evcom_socket)); - evcom_socket_init(socket, TIMEOUT); - socket->on_read = on_peer_read; - socket->on_close = on_peer_close; - socket->on_timeout = on_peer_timeout; + evcom_stream *stream = malloc(sizeof(evcom_stream)); + evcom_stream_init(stream, TIMEOUT); + stream->on_read = on_peer_read; + stream->on_close = on_peer_close; + stream->on_timeout = on_peer_timeout; nconnections++; //printf("on server connection\n"); - return socket; + return stream; } int @@ -76,27 +76,22 @@ main (void) evcom_server server; //printf("sizeof(evcom_server): %d\n", sizeof(evcom_server)); - //printf("sizeof(evcom_socket): %d\n", sizeof(evcom_socket)); + //printf("sizeof(evcom_stream): %d\n", sizeof(evcom_stream)); evcom_server_init(&server); server.on_connection = on_server_connection; - struct addrinfo *servinfo; - struct addrinfo hints; - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - r = getaddrinfo(NULL, PORT, &hints, &servinfo); - assert(r == 0); + struct sockaddr_in address; + memset(&address, 0, sizeof(struct sockaddr_in)); + address.sin_family = AF_INET; + address.sin_port = htons(PORT); + address.sin_addr.s_addr = INADDR_ANY; - r = evcom_server_listen(&server, servinfo, 10); + r = evcom_server_listen(&server, (struct sockaddr*)&address, 10); assert(r == 0); evcom_server_attach(EV_DEFAULT_ &server); ev_loop(EV_DEFAULT_ 0); - freeaddrinfo(servinfo); - return 0; } diff --git a/deps/evcom/test/test.c b/deps/evcom/test/test.c index 78701763d1..16998ba907 100644 --- a/deps/evcom/test/test.c +++ b/deps/evcom/test/test.c @@ -16,18 +16,10 @@ # include #endif -static const struct addrinfo tcp_hints = -/* ai_flags */ { .ai_flags = 0 -/* ai_family */ , .ai_family = AF_UNSPEC -/* ai_socktype */ , .ai_socktype = SOCK_STREAM - , 0 - }; - #define MARK_PROGRESS write(STDERR_FILENO, ".", 1) -#define HOST "127.0.0.1" #define SOCKFILE "/tmp/oi.sock" -#define PORT "5000" +#define PORT 5000 static evcom_server server; static int nconnections; @@ -43,28 +35,28 @@ common_on_server_close (evcom_server *server, int errorno) } static void -common_on_peer_close (evcom_socket *socket) +common_on_peer_close (evcom_stream *stream) { - assert(socket->errorno == 0); + assert(stream->errorno == 0); printf("server connection closed\n"); #if EVCOM_HAVE_GNUTLS - assert(socket->gnutls_errorno == 0); - if (use_tls) gnutls_deinit(socket->session); + assert(stream->gnutls_errorno == 0); + if (use_tls) gnutls_deinit(stream->session); #endif - free(socket); + free(stream); } static void -common_on_client_timeout (evcom_socket *socket) +common_on_client_timeout (evcom_stream *stream) { - assert(socket); + assert(stream); printf("client connection timeout\n"); } static void -common_on_peer_timeout (evcom_socket *socket) +common_on_peer_timeout (evcom_stream *stream) { - assert(socket); + assert(stream); fprintf(stderr, "peer connection timeout\n"); assert(0); } @@ -75,10 +67,10 @@ static gnutls_anon_server_credentials_t server_credentials; const int kx_prio[] = { GNUTLS_KX_ANON_DH, 0 }; static gnutls_dh_params_t dh_params; -void anon_tls_server (evcom_socket *socket) +void anon_tls_server (evcom_stream *stream) { gnutls_session_t session; - socket->data = session; + stream->data = session; int r = gnutls_init(&session, GNUTLS_SERVER); assert(r == 0); @@ -87,10 +79,10 @@ void anon_tls_server (evcom_socket *socket) gnutls_credentials_set(session, GNUTLS_CRD_ANON, server_credentials); gnutls_dh_set_prime_bits(session, DH_BITS); - evcom_socket_set_secure_session(socket, session); + evcom_stream_set_secure_session(stream, session); } -void anon_tls_client (evcom_socket *socket) +void anon_tls_client (evcom_stream *stream) { gnutls_session_t client_session; gnutls_anon_client_credentials_t client_credentials; @@ -102,8 +94,8 @@ void anon_tls_client (evcom_socket *socket) /* Need to enable anonymous KX specifically. */ gnutls_credentials_set(client_session, GNUTLS_CRD_ANON, client_credentials); - evcom_socket_set_secure_session(socket, client_session); - assert(socket->flags & EVCOM_SECURE); + evcom_stream_set_secure_session(stream, client_session); + assert(stream->flags & EVCOM_SECURE); } #endif // EVCOM_HAVE_GNUTLS @@ -120,10 +112,10 @@ void anon_tls_client (evcom_socket *socket) static int successful_ping_count; static void -pingpong_on_peer_read (evcom_socket *socket, const void *base, size_t len) +pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len) { if (len == 0) { - evcom_socket_close(socket); + evcom_stream_close(stream); return; } @@ -132,52 +124,52 @@ pingpong_on_peer_read (evcom_socket *socket, const void *base, size_t len) buf[len] = 0; printf("server got message: %s\n", buf); - evcom_socket_write_simple(socket, PONG, sizeof PONG); + evcom_stream_write_simple(stream, PONG, sizeof PONG); } static void -pingpong_on_client_close (evcom_socket *socket) +pingpong_on_client_close (evcom_stream *stream) { - assert(socket); + assert(stream); printf("client connection closed\n"); evcom_server_close(&server); } -static evcom_socket* +static evcom_stream* pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr) { assert(_server == &server); assert(addr); - evcom_socket *socket = malloc(sizeof(evcom_socket)); - evcom_socket_init(socket, PINGPONG_TIMEOUT); - socket->on_read = pingpong_on_peer_read; - socket->on_close = common_on_peer_close; - socket->on_timeout = common_on_peer_timeout; + evcom_stream *stream = malloc(sizeof(evcom_stream)); + evcom_stream_init(stream, PINGPONG_TIMEOUT); + stream->on_read = pingpong_on_peer_read; + stream->on_close = common_on_peer_close; + stream->on_timeout = common_on_peer_timeout; nconnections++; #if EVCOM_HAVE_GNUTLS - if (use_tls) anon_tls_server(socket); + if (use_tls) anon_tls_server(stream); #endif printf("on server connection\n"); - return socket; + return stream; } static void -pingpong_on_client_connect (evcom_socket *socket) +pingpong_on_client_connect (evcom_stream *stream) { printf("client connected. sending ping\n"); - evcom_socket_write_simple(socket, PING, sizeof PING); + evcom_stream_write_simple(stream, PING, sizeof PING); } static void -pingpong_on_client_read (evcom_socket *socket, const void *base, size_t len) +pingpong_on_client_read (evcom_stream *stream, const void *base, size_t len) { if(len == 0) { - evcom_socket_close(socket); + evcom_stream_close(stream); return; } @@ -191,37 +183,37 @@ pingpong_on_client_read (evcom_socket *socket, const void *base, size_t len) assert(strcmp(buf, PONG) == 0); if (++successful_ping_count > EXCHANGES) { - evcom_socket_close(socket); + evcom_stream_close(stream); return; } if (successful_ping_count % (EXCHANGES/20) == 0) MARK_PROGRESS; - evcom_socket_write_simple(socket, PING, sizeof PING); + evcom_stream_write_simple(stream, PING, sizeof PING); } int -pingpong (struct addrinfo *servinfo) +pingpong (struct sockaddr *address) { int r; - evcom_socket client; + evcom_stream client; successful_ping_count = 0; nconnections = 0; got_server_close = 0; printf("sizeof(evcom_server): %d\n", sizeof(evcom_server)); - printf("sizeof(evcom_socket): %d\n", sizeof(evcom_socket)); + printf("sizeof(evcom_stream): %d\n", sizeof(evcom_stream)); evcom_server_init(&server); server.on_connection = pingpong_on_server_connection; server.on_close = common_on_server_close; - r = evcom_server_listen(&server, servinfo, 10); + r = evcom_server_listen(&server, address, 10); assert(r == 0); evcom_server_attach(EV_DEFAULT_ &server); - evcom_socket_init(&client, PINGPONG_TIMEOUT); + evcom_stream_init(&client, PINGPONG_TIMEOUT); client.on_read = pingpong_on_client_read; client.on_connect = pingpong_on_client_connect; client.on_close = pingpong_on_client_close; @@ -231,9 +223,9 @@ pingpong (struct addrinfo *servinfo) if (use_tls) anon_tls_client(&client); #endif - r = evcom_socket_connect(&client, servinfo); + r = evcom_stream_connect(&client, address); assert(r == 0 && "problem connecting"); - evcom_socket_attach(EV_DEFAULT_ &client); + evcom_stream_attach(EV_DEFAULT_ &client); ev_loop(EV_DEFAULT_ 0); @@ -252,53 +244,53 @@ pingpong (struct addrinfo *servinfo) #define CONNINT_TIMEOUT 1000.0 static void -connint_on_peer_read(evcom_socket *socket, const void *base, size_t len) +connint_on_peer_read(evcom_stream *stream, const void *base, size_t len) { assert(base); assert(len == 0); - evcom_socket_write_simple(socket, "BYE", 3); + evcom_stream_write_simple(stream, "BYE", 3); printf("server wrote bye\n"); } static void -connint_on_peer_drain(evcom_socket *socket) +connint_on_peer_drain(evcom_stream *stream) { - evcom_socket_close(socket); + evcom_stream_close(stream); } -static evcom_socket* +static evcom_stream* connint_on_server_connection(evcom_server *_server, struct sockaddr *addr) { assert(_server == &server); assert(addr); - evcom_socket *socket = malloc(sizeof(evcom_socket)); - evcom_socket_init(socket, CONNINT_TIMEOUT); - socket->on_read = connint_on_peer_read; - socket->on_drain = connint_on_peer_drain; - socket->on_close = common_on_peer_close; - socket->on_timeout = common_on_peer_timeout; + evcom_stream *stream = malloc(sizeof(evcom_stream)); + evcom_stream_init(stream, CONNINT_TIMEOUT); + stream->on_read = connint_on_peer_read; + stream->on_drain = connint_on_peer_drain; + stream->on_close = common_on_peer_close; + stream->on_timeout = common_on_peer_timeout; #if EVCOM_HAVE_GNUTLS - if (use_tls) anon_tls_server(socket); + if (use_tls) anon_tls_server(stream); #endif printf("on server connection\n"); - return socket; + return stream; } static void -connint_on_client_connect (evcom_socket *socket) +connint_on_client_connect (evcom_stream *stream) { printf("on client connection\n"); - evcom_socket_close(socket); + evcom_stream_close(stream); } static void -connint_on_client_close (evcom_socket *socket) +connint_on_client_close (evcom_stream *stream) { - evcom_socket_close(socket); // already closed, but it shouldn't crash if we try to do it again + evcom_stream_close(stream); // already closed, but it shouldn't crash if we try to do it again printf("client connection closed\n"); @@ -311,10 +303,10 @@ connint_on_client_close (evcom_socket *socket) } static void -connint_on_client_read (evcom_socket *socket, const void *base, size_t len) +connint_on_client_read (evcom_stream *stream, const void *base, size_t len) { if (len == 0) { - evcom_socket_close(socket); + evcom_stream_close(stream); return; } @@ -325,11 +317,11 @@ connint_on_client_read (evcom_socket *socket, const void *base, size_t len) printf("client got message: %s\n", buf); assert(strcmp(buf, "BYE") == 0); - evcom_socket_close(socket); + evcom_stream_close(stream); } int -connint (struct addrinfo *servinfo) +connint (struct sockaddr *address) { int r; @@ -341,14 +333,14 @@ connint (struct addrinfo *servinfo) server.on_close = common_on_server_close; - evcom_server_listen(&server, servinfo, 1000); + evcom_server_listen(&server, address, 1000); evcom_server_attach(EV_DEFAULT_ &server); - evcom_socket clients[NCONN]; + evcom_stream clients[NCONN]; int i; for (i = 0; i < NCONN; i++) { - evcom_socket *client = &clients[i]; - evcom_socket_init(client, CONNINT_TIMEOUT); + evcom_stream *client = &clients[i]; + evcom_stream_init(client, CONNINT_TIMEOUT); client->on_read = connint_on_client_read; client->on_connect = connint_on_client_connect; client->on_close = connint_on_client_close; @@ -356,9 +348,9 @@ connint (struct addrinfo *servinfo) #if EVCOM_HAVE_GNUTLS if (use_tls) anon_tls_client(client); #endif - r = evcom_socket_connect(client, servinfo); + r = evcom_stream_connect(client, address); assert(r == 0 && "problem connecting"); - evcom_socket_attach(EV_DEFAULT_ client); + evcom_stream_attach(EV_DEFAULT_ client); } ev_loop(EV_DEFAULT_ 0); @@ -370,52 +362,26 @@ connint (struct addrinfo *servinfo) } -struct addrinfo * -create_tcp_address (void) -{ - struct addrinfo *servinfo; - int r = getaddrinfo(NULL, PORT, &tcp_hints, &servinfo); - assert(r == 0); - return servinfo; -} - -void -free_tcp_address (struct addrinfo *servinfo) -{ - freeaddrinfo(servinfo); -} - - -struct addrinfo * +struct sockaddr * create_unix_address (void) { - struct addrinfo *servinfo; struct stat tstat; if (lstat(SOCKFILE, &tstat) == 0) { assert(S_ISSOCK(tstat.st_mode)); unlink(SOCKFILE); } - servinfo = malloc(sizeof(struct addrinfo)); - servinfo->ai_family = AF_UNIX; - servinfo->ai_socktype = SOCK_STREAM; - servinfo->ai_protocol = 0; + struct sockaddr_un *address = calloc(1, sizeof(struct sockaddr_un)); + address->sun_family = AF_UNIX; + strcpy(address->sun_path, SOCKFILE); - struct sockaddr_un *sockaddr = calloc(sizeof(struct sockaddr_un), 1); - sockaddr->sun_family = AF_UNIX; - strcpy(sockaddr->sun_path, SOCKFILE); - - servinfo->ai_addr = (struct sockaddr*)sockaddr; - servinfo->ai_addrlen = sizeof(struct sockaddr_un); - - return servinfo; + return (struct sockaddr*)address; } void -free_unix_address (struct addrinfo *servinfo) +free_unix_address (struct sockaddr *address) { - free(servinfo->ai_addr); - free(servinfo); + free(address); } @@ -434,21 +400,26 @@ main (void) gnutls_anon_set_server_dh_params (server_credentials, dh_params); #endif - struct addrinfo *tcp_address = create_tcp_address(); - struct addrinfo *unix_address; + struct sockaddr_in tcp_address; + memset(&tcp_address, 0, sizeof(struct sockaddr_in)); + tcp_address.sin_family = AF_INET; + tcp_address.sin_port = htons(PORT); + tcp_address.sin_addr.s_addr = INADDR_ANY; use_tls = 0; - assert(pingpong(tcp_address) == 0); - assert(connint(tcp_address) == 0); + assert(pingpong((struct sockaddr*)&tcp_address) == 0); + assert(connint((struct sockaddr*)&tcp_address) == 0); #if EVCOM_HAVE_GNUTLS use_tls = 1; - assert(pingpong(tcp_address) == 0); - assert(connint(tcp_address) == 0); + assert(pingpong((struct sockaddr*)&tcp_address) == 0); + assert(connint((struct sockaddr*)&tcp_address) == 0); #endif + struct sockaddr *unix_address; + use_tls = 0; @@ -473,6 +444,5 @@ main (void) #endif - free_tcp_address(tcp_address); return 0; } diff --git a/src/net.cc b/src/net.cc index 13cb2a3c23..467d8a0591 100644 --- a/src/net.cc +++ b/src/net.cc @@ -99,18 +99,18 @@ Connection::Init (void) { opening = false; double timeout = 60.0; // default - evcom_socket_init(&socket_, timeout); - socket_.on_connect = Connection::on_connect; - socket_.on_read = Connection::on_read; - socket_.on_drain = Connection::on_drain; - socket_.on_close = Connection::on_close; - socket_.on_timeout = Connection::on_timeout; - socket_.data = this; + evcom_stream_init(&stream_, timeout); + stream_.on_connect = Connection::on_connect; + stream_.on_read = Connection::on_read; + stream_.on_drain = Connection::on_drain; + stream_.on_close = Connection::on_close; + stream_.on_timeout = Connection::on_timeout; + stream_.data = this; } Connection::~Connection () { - assert(socket_.fd < 0 && "garbage collecting open Connection"); + assert(stream_.fd < 0 && "garbage collecting open Connection"); ForceClose(); } @@ -128,19 +128,19 @@ Connection::New (const Arguments& args) enum Connection::readyState Connection::ReadyState (void) { - if (socket_.flags & EVCOM_GOT_FULL_CLOSE) + if (stream_.flags & EVCOM_GOT_FULL_CLOSE) return CLOSED; - if (socket_.flags & EVCOM_GOT_HALF_CLOSE) - return (socket_.read_action == NULL ? CLOSED : READ_ONLY); + if (stream_.flags & EVCOM_GOT_HALF_CLOSE) + return (stream_.read_action == NULL ? CLOSED : READ_ONLY); - if (socket_.read_action && socket_.write_action) + if (stream_.read_action && stream_.write_action) return OPEN; - else if (socket_.write_action) + else if (stream_.write_action) return WRITE_ONLY; - else if (socket_.read_action) + else if (stream_.read_action) return READ_ONLY; else if (opening) @@ -165,9 +165,9 @@ Connection::Connect (const Arguments& args) connection->Init(); // in case we're reusing the socket... ? } - assert(connection->socket_.fd < 0); - assert(connection->socket_.read_action == NULL); - assert(connection->socket_.write_action == NULL); + assert(connection->stream_.fd < 0); + assert(connection->stream_.read_action == NULL); + assert(connection->stream_.write_action == NULL); if (args.Length() == 0) return ThrowException(String::New("Must specify a port.")); @@ -267,23 +267,23 @@ Connection::AfterResolve (eio_req *req) connection->opening = false; int r = 0; - if (req->result == 0) r = connection->Connect(address); + if (req->result == 0) r = connection->Connect(address->ai_addr); if (address_list) freeaddrinfo(address_list); // no error. return. if (r == 0 && req->result == 0) { - evcom_socket_attach (EV_DEFAULT_UC_ &connection->socket_); + evcom_stream_attach (EV_DEFAULT_UC_ &connection->stream_); goto out; } /* RESOLVE ERROR */ - /* TODO: the whole resolve process should be moved into evcom_socket. + /* TODO: the whole resolve process should be moved into evcom_stream. * The fact that I'm modifying a read-only variable here should be * good evidence of this. */ - connection->socket_.errorno = r | req->result; + connection->stream_.errorno = r | req->result; connection->OnDisconnect(); @@ -451,7 +451,7 @@ Connection::OnDisconnect () HandleScope scope; Handle argv[1]; - argv[0] = socket_.errorno == 0 ? False() : True(); + argv[0] = stream_.errorno == 0 ? False() : True(); Emit("disconnect", 1, argv); } @@ -623,7 +623,7 @@ Server::Listen (const Arguments& args) address = AddressDefaultToIPv4(address_list); - server->Listen(address, backlog); + server->Listen(address->ai_addr, backlog); if (address_list) freeaddrinfo(address_list); diff --git a/src/net.h b/src/net.h index e18e76c08e..3b142b2048 100644 --- a/src/net.h +++ b/src/net.h @@ -42,13 +42,13 @@ protected: } virtual ~Connection (void); - int Connect (struct addrinfo *address) { - return evcom_socket_connect (&socket_, address); + int Connect (struct sockaddr *address) { + return evcom_stream_connect (&stream_, address); } - void Send (evcom_buf *buf) { evcom_socket_write(&socket_, buf); } - void Close (void) { evcom_socket_close(&socket_); } - void FullClose (void) { evcom_socket_full_close(&socket_); } - void ForceClose (void) { evcom_socket_force_close(&socket_); } + void Send (evcom_buf *buf) { evcom_stream_write(&stream_, buf); } + void Close (void) { evcom_stream_close(&stream_); } + void FullClose (void) { evcom_stream_full_close(&stream_); } + void ForceClose (void) { evcom_stream_force_close(&stream_); } virtual void OnConnect (void); virtual void OnReceive (const void *buf, size_t len); @@ -68,12 +68,12 @@ protected: private: /* liboi callbacks */ - static void on_connect (evcom_socket *s) { + static void on_connect (evcom_stream *s) { Connection *connection = static_cast (s->data); connection->OnConnect(); } - static void on_read (evcom_socket *s, const void *buf, size_t len) { + static void on_read (evcom_stream *s, const void *buf, size_t len) { Connection *connection = static_cast (s->data); assert(connection->attached_); if (len == 0) @@ -82,17 +82,17 @@ private: connection->OnReceive(buf, len); } - static void on_drain (evcom_socket *s) { + static void on_drain (evcom_stream *s) { Connection *connection = static_cast (s->data); connection->OnDrain(); } - static void on_close (evcom_socket *s) { + static void on_close (evcom_stream *s) { Connection *connection = static_cast (s->data); - assert(connection->socket_.fd < 0); - assert(connection->socket_.read_action == NULL); - assert(connection->socket_.write_action == NULL); + assert(connection->stream_.fd < 0); + assert(connection->stream_.read_action == NULL); + assert(connection->stream_.write_action == NULL); connection->OnDisconnect(); @@ -105,7 +105,7 @@ private: connection->Detach(); } - static void on_timeout (evcom_socket *s) { + static void on_timeout (evcom_stream *s) { Connection *connection = static_cast (s->data); connection->OnTimeout(); } @@ -116,7 +116,7 @@ private: static int AfterResolve (eio_req *req); char *host_; char *port_; - evcom_socket socket_; + evcom_stream stream_; friend class Server; }; @@ -143,7 +143,7 @@ protected: evcom_server_close (&server_); } - int Listen (struct addrinfo *address, int backlog) { + int Listen (struct sockaddr *address, int backlog) { int r = evcom_server_listen (&server_, address, backlog); if(r != 0) return r; evcom_server_attach (EV_DEFAULT_ &server_); @@ -160,10 +160,10 @@ protected: private: Connection* OnConnection (struct sockaddr *addr); - static evcom_socket* on_connection (evcom_server *s, struct sockaddr *addr) { + static evcom_stream* on_connection (evcom_server *s, struct sockaddr *addr) { Server *server = static_cast (s->data); Connection *connection = server->OnConnection (addr); - return &connection->socket_; + return &connection->stream_; } void OnClose (int errorno);