diff --git a/deps/evcom/evcom.c b/deps/evcom/evcom.c index febed43a38..629864d58f 100644 --- a/deps/evcom/evcom.c +++ b/deps/evcom/evcom.c @@ -226,7 +226,10 @@ stream_send__close_one (evcom_stream *stream) /* TODO recover from EINTR */ stream__set_send_closed(stream); - if (DUPLEX(stream)) stream__set_recv_closed(stream); + + if (DUPLEX(stream) || stream->recv_action == stream_recv__wait_for_close) { + stream__set_recv_closed(stream); + } return OKAY; } @@ -253,8 +256,11 @@ stream__close_both (evcom_stream *stream) static int stream_send__close (evcom_stream *stream) { - stream->send_action = DUPLEX(stream) ? - stream_send__close_one : stream__close_both; + if (DUPLEX(stream) || stream->recvfd < 0) { + stream->send_action = stream_send__close_one; + } else { + stream->send_action = stream__close_both; + } return OKAY; } @@ -268,7 +274,10 @@ stream_recv__close_one (evcom_stream *stream) /* TODO recover from EINTR */ stream__set_recv_closed(stream); - if (DUPLEX(stream)) stream__set_send_closed(stream); + + if (DUPLEX(stream)) { + stream__set_send_closed(stream); + } return OKAY; } @@ -276,8 +285,11 @@ stream_recv__close_one (evcom_stream *stream) static int stream_recv__close (evcom_stream *stream) { - stream->recv_action = DUPLEX(stream) ? - stream_recv__close_one : stream__close_both; + if (DUPLEX(stream) || stream->sendfd < 0) { + stream->recv_action = stream_recv__close_one; + } else { + stream->recv_action = stream__close_both; + } return OKAY; } @@ -526,13 +538,13 @@ stream_recv__data (evcom_stream *stream) if (recved == 0) { stream->flags &= ~EVCOM_READABLE; ev_io_stop(D_LOOP_(stream) &stream->read_watcher); + stream->recv_action = stream_recv__wait_for_close; } /* NOTE: EOF is signaled with recved == 0 on callback */ if (stream->on_read) stream->on_read(stream, buf, recved); if (recved == 0) { - stream->recv_action = stream_recv__wait_for_close; return OKAY; } } @@ -682,7 +694,7 @@ stream_send__wait_for_connection (evcom_stream *stream) return OKAY; } -static void +void evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd) { assert(recvfd >= 0); @@ -690,6 +702,14 @@ evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd) if (recvfd == sendfd) stream->flags |= EVCOM_DUPLEX; + if (set_nonblock(recvfd) != 0) { + evcom_perror("set_nonblock(recvfd)", errno); + } + + if (set_nonblock(sendfd) != 0) { + evcom_perror("set_nonblock(sendfd)", errno); + } + #ifdef SO_NOSIGPIPE if (DUPLEX(stream)) { int flags = 1; @@ -736,7 +756,9 @@ accept_connection (evcom_server *server) if (fd < 0) { switch (errno) { case EMFILE: + case ENFILE: too_many_connections = 1; + server->flags |= EVCOM_TOO_MANY_CONN; evcom_server_detach(server); return NULL; @@ -992,10 +1014,11 @@ stream_event (EV_P_ ev_io *w, int revents) if (stream->sendfd < 0 && stream->recvfd < 0) { ev_timer_stop(EV_A_ &stream->timeout_watcher); - if (too_many_connections && stream->server) { + if (stream->server && (stream->server->flags & EVCOM_TOO_MANY_CONN)) { #if EV_MULTIPLICITY struct ev_loop *loop = stream->server->loop; #endif + stream->server->flags &= ~EVCOM_TOO_MANY_CONN; evcom_server_attach(EV_A_ stream->server); } too_many_connections = 0; @@ -1049,19 +1072,24 @@ void evcom_stream_close (evcom_stream *stream) { stream->flags |= EVCOM_GOT_CLOSE; - if (WRITABLE(stream)) { - ev_io_start(D_LOOP_(stream) &stream->write_watcher); + if (ATTACHED(stream)) { + // start the watchers if attached. + evcom_stream_attach(D_LOOP_(stream) stream); } } void evcom_stream_force_close (evcom_stream *stream) { - close(stream->recvfd); - /* XXX What to do on EINTR? */ - stream__set_recv_closed(stream); + if (stream->recvfd >= 0) { + close(stream->recvfd); + /* XXX What to do on EINTR? */ + stream__set_recv_closed(stream); + } - if (!DUPLEX(stream)) close(stream->sendfd); - stream__set_send_closed(stream); + if (!DUPLEX(stream) && stream->sendfd >= 0) { + close(stream->sendfd); + stream__set_send_closed(stream); + } evcom_stream_detach(stream); } diff --git a/deps/evcom/evcom.h b/deps/evcom/evcom.h index 1bbfd03f15..fae9244747 100644 --- a/deps/evcom/evcom.h +++ b/deps/evcom/evcom.h @@ -57,6 +57,7 @@ extern "C" { #define EVCOM_PAUSED 0x0040 #define EVCOM_READABLE 0x0080 #define EVCOM_WRITABLE 0x0100 +#define EVCOM_TOO_MANY_CONN 0x0200 enum evcom_stream_state { EVCOM_INITIALIZED , EVCOM_CONNECTING @@ -181,6 +182,7 @@ void evcom_stream_init (evcom_stream *, float timeout); int evcom_stream_pair (evcom_stream *a, evcom_stream *b); int evcom_stream_connect (evcom_stream *, struct sockaddr *address); +void evcom_stream_assign_fds (evcom_stream *, int recvfd, int sendfd); void evcom_stream_attach (EV_P_ evcom_stream *); void evcom_stream_detach (evcom_stream *); diff --git a/deps/evcom/test/test.c b/deps/evcom/test/test.c index 6b4b765fa0..7aadebeebc 100644 --- a/deps/evcom/test/test.c +++ b/deps/evcom/test/test.c @@ -218,9 +218,6 @@ pingpong (struct sockaddr *address) nconnections = 0; got_server_close = 0; - printf("sizeof(evcom_server): %d\n", (int)sizeof(evcom_server)); - printf("sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream)); - evcom_server_init(&server); server.on_connection = pingpong_on_server_connection; server.on_close = common_on_server_close; @@ -549,7 +546,7 @@ void b_read (evcom_stream *stream, const void *buf, size_t len) } int -pair_pingpong () +pair_pingpong (int use_pipe) { a_got_close = 0; a_got_connect = 0; @@ -573,8 +570,18 @@ pair_pingpong () if (use_tls) anon_tls_server(&b); #endif - int r = evcom_stream_pair(&a, &b); - assert(r == 0); + if (use_pipe) { + int pipeA[2], pipeB[2]; + assert(0 == pipe(pipeA)); + assert(0 == pipe(pipeB)); + + evcom_stream_assign_fds(&a, pipeA[0], pipeB[1]); + evcom_stream_assign_fds(&b, pipeB[0], pipeA[1]); + + } else { + int r = evcom_stream_pair(&a, &b); + assert(r == 0); + } evcom_stream_attach(EV_DEFAULT_ &a); evcom_stream_attach(EV_DEFAULT_ &b); @@ -764,6 +771,11 @@ free_unix_address (struct sockaddr *address) int main (void) { + fprintf(stderr, "sizeof(evcom_server): %d\n", (int)sizeof(evcom_server)); + fprintf(stderr, "sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream)); + fprintf(stderr, "sizeof(evcom_reader): %d\n", (int)sizeof(evcom_reader)); + fprintf(stderr, "sizeof(evcom_writer): %d\n", (int)sizeof(evcom_writer)); + #if EVCOM_HAVE_GNUTLS gnutls_global_init(); @@ -785,6 +797,14 @@ main (void) use_tls = 0; + fprintf(stderr, "pair_pingpong use_pipe=1: "); + assert(pair_pingpong(1) == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "pair_pingpong use_pipe=0: "); + assert(pair_pingpong(0) == 0); + fprintf(stderr, "\n"); + fprintf(stderr, "zero_stream tcp: "); assert(zero_stream((struct sockaddr*)&tcp_address, 5*1024*1024) == 0); fprintf(stderr, "\n"); @@ -793,10 +813,6 @@ main (void) assert(pipe_stream() == 0); fprintf(stderr, "\n"); - fprintf(stderr, "pair_pingpong: "); - assert(pair_pingpong() == 0); - fprintf(stderr, "\n"); - fprintf(stderr, "pingpong tcp: "); assert(pingpong((struct sockaddr*)&tcp_address) == 0); fprintf(stderr, "\n"); @@ -812,8 +828,12 @@ main (void) assert(zero_stream((struct sockaddr*)&tcp_address, 50*1024) == 0); fprintf(stderr, "\n"); - fprintf(stderr, "pair_pingpong ssl: "); - assert(pair_pingpong() == 0); + fprintf(stderr, "pair_pingpong ssl use_pipe=1: "); + assert(pair_pingpong(1) == 0); + fprintf(stderr, "\n"); + + fprintf(stderr, "pair_pingpong ssl use_pipe=0: "); + assert(pair_pingpong(0) == 0); fprintf(stderr, "\n"); fprintf(stderr, "pingpong ssl: ");