Browse Source

Sync evcom

v0.7.4-release
Ryan 16 years ago
parent
commit
ed3602dddc
  1. 60
      deps/evcom/evcom.c
  2. 2
      deps/evcom/evcom.h
  3. 44
      deps/evcom/test/test.c

60
deps/evcom/evcom.c

@ -226,7 +226,10 @@ stream_send__close_one (evcom_stream *stream)
/* TODO recover from EINTR */ /* TODO recover from EINTR */
stream__set_send_closed(stream); stream__set_send_closed(stream);
if (DUPLEX(stream)) stream__set_recv_closed(stream);
if (DUPLEX(stream) || stream->recv_action == stream_recv__wait_for_close) {
stream__set_recv_closed(stream);
}
return OKAY; return OKAY;
} }
@ -253,8 +256,11 @@ stream__close_both (evcom_stream *stream)
static int static int
stream_send__close (evcom_stream *stream) stream_send__close (evcom_stream *stream)
{ {
stream->send_action = DUPLEX(stream) ? if (DUPLEX(stream) || stream->recvfd < 0) {
stream_send__close_one : stream__close_both; stream->send_action = stream_send__close_one;
} else {
stream->send_action = stream__close_both;
}
return OKAY; return OKAY;
} }
@ -268,7 +274,10 @@ stream_recv__close_one (evcom_stream *stream)
/* TODO recover from EINTR */ /* TODO recover from EINTR */
stream__set_recv_closed(stream); stream__set_recv_closed(stream);
if (DUPLEX(stream)) stream__set_send_closed(stream);
if (DUPLEX(stream)) {
stream__set_send_closed(stream);
}
return OKAY; return OKAY;
} }
@ -276,8 +285,11 @@ stream_recv__close_one (evcom_stream *stream)
static int static int
stream_recv__close (evcom_stream *stream) stream_recv__close (evcom_stream *stream)
{ {
stream->recv_action = DUPLEX(stream) ? if (DUPLEX(stream) || stream->sendfd < 0) {
stream_recv__close_one : stream__close_both; stream->recv_action = stream_recv__close_one;
} else {
stream->recv_action = stream__close_both;
}
return OKAY; return OKAY;
} }
@ -526,13 +538,13 @@ stream_recv__data (evcom_stream *stream)
if (recved == 0) { if (recved == 0) {
stream->flags &= ~EVCOM_READABLE; stream->flags &= ~EVCOM_READABLE;
ev_io_stop(D_LOOP_(stream) &stream->read_watcher); ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
stream->recv_action = stream_recv__wait_for_close;
} }
/* NOTE: EOF is signaled with recved == 0 on callback */ /* NOTE: EOF is signaled with recved == 0 on callback */
if (stream->on_read) stream->on_read(stream, buf, recved); if (stream->on_read) stream->on_read(stream, buf, recved);
if (recved == 0) { if (recved == 0) {
stream->recv_action = stream_recv__wait_for_close;
return OKAY; return OKAY;
} }
} }
@ -682,7 +694,7 @@ stream_send__wait_for_connection (evcom_stream *stream)
return OKAY; return OKAY;
} }
static void void
evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd) evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd)
{ {
assert(recvfd >= 0); assert(recvfd >= 0);
@ -690,6 +702,14 @@ evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd)
if (recvfd == sendfd) stream->flags |= EVCOM_DUPLEX; if (recvfd == sendfd) stream->flags |= EVCOM_DUPLEX;
if (set_nonblock(recvfd) != 0) {
evcom_perror("set_nonblock(recvfd)", errno);
}
if (set_nonblock(sendfd) != 0) {
evcom_perror("set_nonblock(sendfd)", errno);
}
#ifdef SO_NOSIGPIPE #ifdef SO_NOSIGPIPE
if (DUPLEX(stream)) { if (DUPLEX(stream)) {
int flags = 1; int flags = 1;
@ -736,7 +756,9 @@ accept_connection (evcom_server *server)
if (fd < 0) { if (fd < 0) {
switch (errno) { switch (errno) {
case EMFILE: case EMFILE:
case ENFILE:
too_many_connections = 1; too_many_connections = 1;
server->flags |= EVCOM_TOO_MANY_CONN;
evcom_server_detach(server); evcom_server_detach(server);
return NULL; return NULL;
@ -992,10 +1014,11 @@ stream_event (EV_P_ ev_io *w, int revents)
if (stream->sendfd < 0 && stream->recvfd < 0) { if (stream->sendfd < 0 && stream->recvfd < 0) {
ev_timer_stop(EV_A_ &stream->timeout_watcher); ev_timer_stop(EV_A_ &stream->timeout_watcher);
if (too_many_connections && stream->server) { if (stream->server && (stream->server->flags & EVCOM_TOO_MANY_CONN)) {
#if EV_MULTIPLICITY #if EV_MULTIPLICITY
struct ev_loop *loop = stream->server->loop; struct ev_loop *loop = stream->server->loop;
#endif #endif
stream->server->flags &= ~EVCOM_TOO_MANY_CONN;
evcom_server_attach(EV_A_ stream->server); evcom_server_attach(EV_A_ stream->server);
} }
too_many_connections = 0; too_many_connections = 0;
@ -1049,19 +1072,24 @@ void
evcom_stream_close (evcom_stream *stream) evcom_stream_close (evcom_stream *stream)
{ {
stream->flags |= EVCOM_GOT_CLOSE; stream->flags |= EVCOM_GOT_CLOSE;
if (WRITABLE(stream)) { if (ATTACHED(stream)) {
ev_io_start(D_LOOP_(stream) &stream->write_watcher); // start the watchers if attached.
evcom_stream_attach(D_LOOP_(stream) stream);
} }
} }
void evcom_stream_force_close (evcom_stream *stream) void evcom_stream_force_close (evcom_stream *stream)
{ {
close(stream->recvfd); if (stream->recvfd >= 0) {
/* XXX What to do on EINTR? */ close(stream->recvfd);
stream__set_recv_closed(stream); /* XXX What to do on EINTR? */
stream__set_recv_closed(stream);
}
if (!DUPLEX(stream)) close(stream->sendfd); if (!DUPLEX(stream) && stream->sendfd >= 0) {
stream__set_send_closed(stream); close(stream->sendfd);
stream__set_send_closed(stream);
}
evcom_stream_detach(stream); evcom_stream_detach(stream);
} }

2
deps/evcom/evcom.h

@ -57,6 +57,7 @@ extern "C" {
#define EVCOM_PAUSED 0x0040 #define EVCOM_PAUSED 0x0040
#define EVCOM_READABLE 0x0080 #define EVCOM_READABLE 0x0080
#define EVCOM_WRITABLE 0x0100 #define EVCOM_WRITABLE 0x0100
#define EVCOM_TOO_MANY_CONN 0x0200
enum evcom_stream_state { EVCOM_INITIALIZED enum evcom_stream_state { EVCOM_INITIALIZED
, EVCOM_CONNECTING , EVCOM_CONNECTING
@ -181,6 +182,7 @@ void evcom_stream_init (evcom_stream *, float timeout);
int evcom_stream_pair (evcom_stream *a, evcom_stream *b); int evcom_stream_pair (evcom_stream *a, evcom_stream *b);
int evcom_stream_connect (evcom_stream *, struct sockaddr *address); int evcom_stream_connect (evcom_stream *, struct sockaddr *address);
void evcom_stream_assign_fds (evcom_stream *, int recvfd, int sendfd);
void evcom_stream_attach (EV_P_ evcom_stream *); void evcom_stream_attach (EV_P_ evcom_stream *);
void evcom_stream_detach (evcom_stream *); void evcom_stream_detach (evcom_stream *);

44
deps/evcom/test/test.c

@ -218,9 +218,6 @@ pingpong (struct sockaddr *address)
nconnections = 0; nconnections = 0;
got_server_close = 0; got_server_close = 0;
printf("sizeof(evcom_server): %d\n", (int)sizeof(evcom_server));
printf("sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream));
evcom_server_init(&server); evcom_server_init(&server);
server.on_connection = pingpong_on_server_connection; server.on_connection = pingpong_on_server_connection;
server.on_close = common_on_server_close; server.on_close = common_on_server_close;
@ -549,7 +546,7 @@ void b_read (evcom_stream *stream, const void *buf, size_t len)
} }
int int
pair_pingpong () pair_pingpong (int use_pipe)
{ {
a_got_close = 0; a_got_close = 0;
a_got_connect = 0; a_got_connect = 0;
@ -573,8 +570,18 @@ pair_pingpong ()
if (use_tls) anon_tls_server(&b); if (use_tls) anon_tls_server(&b);
#endif #endif
int r = evcom_stream_pair(&a, &b); if (use_pipe) {
assert(r == 0); int pipeA[2], pipeB[2];
assert(0 == pipe(pipeA));
assert(0 == pipe(pipeB));
evcom_stream_assign_fds(&a, pipeA[0], pipeB[1]);
evcom_stream_assign_fds(&b, pipeB[0], pipeA[1]);
} else {
int r = evcom_stream_pair(&a, &b);
assert(r == 0);
}
evcom_stream_attach(EV_DEFAULT_ &a); evcom_stream_attach(EV_DEFAULT_ &a);
evcom_stream_attach(EV_DEFAULT_ &b); evcom_stream_attach(EV_DEFAULT_ &b);
@ -764,6 +771,11 @@ free_unix_address (struct sockaddr *address)
int int
main (void) main (void)
{ {
fprintf(stderr, "sizeof(evcom_server): %d\n", (int)sizeof(evcom_server));
fprintf(stderr, "sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream));
fprintf(stderr, "sizeof(evcom_reader): %d\n", (int)sizeof(evcom_reader));
fprintf(stderr, "sizeof(evcom_writer): %d\n", (int)sizeof(evcom_writer));
#if EVCOM_HAVE_GNUTLS #if EVCOM_HAVE_GNUTLS
gnutls_global_init(); gnutls_global_init();
@ -785,6 +797,14 @@ main (void)
use_tls = 0; use_tls = 0;
fprintf(stderr, "pair_pingpong use_pipe=1: ");
assert(pair_pingpong(1) == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pair_pingpong use_pipe=0: ");
assert(pair_pingpong(0) == 0);
fprintf(stderr, "\n");
fprintf(stderr, "zero_stream tcp: "); fprintf(stderr, "zero_stream tcp: ");
assert(zero_stream((struct sockaddr*)&tcp_address, 5*1024*1024) == 0); assert(zero_stream((struct sockaddr*)&tcp_address, 5*1024*1024) == 0);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
@ -793,10 +813,6 @@ main (void)
assert(pipe_stream() == 0); assert(pipe_stream() == 0);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
fprintf(stderr, "pair_pingpong: ");
assert(pair_pingpong() == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pingpong tcp: "); fprintf(stderr, "pingpong tcp: ");
assert(pingpong((struct sockaddr*)&tcp_address) == 0); assert(pingpong((struct sockaddr*)&tcp_address) == 0);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
@ -812,8 +828,12 @@ main (void)
assert(zero_stream((struct sockaddr*)&tcp_address, 50*1024) == 0); assert(zero_stream((struct sockaddr*)&tcp_address, 50*1024) == 0);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
fprintf(stderr, "pair_pingpong ssl: "); fprintf(stderr, "pair_pingpong ssl use_pipe=1: ");
assert(pair_pingpong() == 0); assert(pair_pingpong(1) == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pair_pingpong ssl use_pipe=0: ");
assert(pair_pingpong(0) == 0);
fprintf(stderr, "\n"); fprintf(stderr, "\n");
fprintf(stderr, "pingpong ssl: "); fprintf(stderr, "pingpong ssl: ");

Loading…
Cancel
Save