Browse Source

Upgrade evcom - fix API issues.

v0.7.4-release
Ryan 16 years ago
parent
commit
368ea93bfe
  1. 1233
      deps/evcom/evcom.c
  2. 127
      deps/evcom/evcom.h
  3. 37
      deps/evcom/recv_states.dot
  4. 65
      deps/evcom/send_states.dot
  5. 6
      deps/evcom/test/echo.c
  6. 478
      deps/evcom/test/test.c
  7. 2
      src/http.js
  8. 32
      src/net.cc
  9. 7
      src/net.h
  10. 3
      test/mjsunit/test-tcp-many-clients.js
  11. 2
      test/mjsunit/test-tcp-reconnect.js
  12. 2
      test/mjsunit/test-tcp-throttle-kernel-buffer.js
  13. 2
      test/mjsunit/test-tcp-throttle.js

1233
deps/evcom/evcom.c

File diff suppressed because it is too large

127
deps/evcom/evcom.h

@ -33,7 +33,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#endif
#ifndef EVCOM_HAVE_GNUTLS
# define EVCOM_HAVE_GNUTLS 0
@ -52,12 +52,11 @@ extern "C" {
#define EVCOM_LISTENING 0x0002
#define EVCOM_CONNECTED 0x0004
#define EVCOM_SECURE 0x0008
#define EVCOM_GOT_HALF_CLOSE 0x0010
#define EVCOM_GOT_FULL_CLOSE 0x0020
#define EVCOM_DUPLEX 0x0010
#define EVCOM_GOT_CLOSE 0x0020
#define EVCOM_PAUSED 0x0040
#define EVCOM_READABLE 0x0080
#define EVCOM_WRITABLE 0x0100
#define EVCOM_GOT_WRITE_EVENT 0x0200
enum evcom_stream_state { EVCOM_INITIALIZED
, EVCOM_CONNECTING
@ -91,95 +90,112 @@ typedef struct evcom_buf {
# define EVCOM_LOOP
#endif
#define EVCOM_DESCRIPTOR(type) \
unsigned int flags; /* private */ \
int (*action) (struct evcom_descriptor*); /* private */ \
int errorno; /* read-only */ \
int fd; /* read-only */ \
EVCOM_LOOP /* read-only */ \
void *data; /* public */ \
void (*on_close) (struct type*); /* public */
#define EVCOM_DESCRIPTOR(type) \
/* private */ unsigned int flags; \
/* private */ int (*action) (struct evcom_descriptor*); \
/* read-only */ int errorno; \
/* read-only */ int fd; \
/* read-only */ EVCOM_LOOP \
/* public */ void *data; \
/* public */ void (*on_close) (struct type*);
/* abstract base class */
typedef struct evcom_descriptor {
EVCOM_DESCRIPTOR(evcom_descriptor)
} evcom_descriptor;
typedef struct evcom_server {
EVCOM_DESCRIPTOR(evcom_server)
/* PRIVATE */
ev_io watcher;
typedef struct evcom_reader {
EVCOM_DESCRIPTOR(evcom_reader)
ev_io read_watcher; /* private */
void (*on_read) (struct evcom_reader*, const void* buf, size_t len); /* public */
} evcom_reader;
/* PUBLIC */
struct evcom_stream*
(*on_connection)(struct evcom_server *, struct sockaddr *remote_addr);
} evcom_server;
typedef struct evcom_writer {
EVCOM_DESCRIPTOR(evcom_writer)
ev_io write_watcher; /* private */
evcom_queue out; /* private */
} evcom_writer;
typedef struct evcom_stream {
EVCOM_DESCRIPTOR(evcom_stream)
/* PRIVATE */
ev_io write_watcher;
/* PRIVATE */
EVCOM_LOOP
int errorno;
unsigned int flags;
evcom_queue out;
ev_io read_watcher;
ev_io write_watcher;
int (*send_action) (struct evcom_stream*);
int (*recv_action) (struct evcom_stream*);
ev_timer timeout_watcher;
#if EVCOM_HAVE_GNUTLS
gnutls_session_t session;
#endif
/* READ-ONLY */
/* READ-ONLY */
int recvfd;
int sendfd;
struct evcom_server *server;
evcom_queue out;
#if EVCOM_HAVE_GNUTLS
int gnutls_errorno;
#endif
/* PUBLIC */
void (*on_connect) (struct evcom_stream *);
void (*on_read) (struct evcom_stream *, const void *buf, size_t count);
void (*on_drain) (struct evcom_stream *);
void (*on_timeout) (struct evcom_stream *);
void (*on_read) (struct evcom_stream *, const void* buf, size_t len);
void (*on_close) (struct evcom_stream *);
void *data;
} evcom_stream;
typedef struct evcom_server {
EVCOM_DESCRIPTOR(evcom_server)
/* PRIVATE */
ev_io watcher;
/* PUBLIC */
struct evcom_stream*
(*on_connection)(struct evcom_server *, struct sockaddr *remote_addr);
} evcom_server;
void evcom_reader_init (evcom_reader*);
void evcom_reader_set (evcom_reader*, int fd);
void evcom_reader_attach (EV_P_ evcom_reader*);
void evcom_reader_detach (evcom_reader*);
void evcom_reader_close (evcom_reader*);
void evcom_writer_init (evcom_writer*);
void evcom_writer_set (evcom_writer*, int fd);
void evcom_writer_attach (EV_P_ evcom_writer*);
void evcom_writer_detach (evcom_writer*);
void evcom_writer_write (evcom_writer*, const char *str, size_t len);
void evcom_writer_close (evcom_writer*);
void evcom_server_init (evcom_server *);
int evcom_server_listen (evcom_server *, struct sockaddr *address, int backlog);
void evcom_server_attach (EV_P_ evcom_server *);
void evcom_server_detach (evcom_server *);
void evcom_server_close (evcom_server *); // synchronous
void evcom_server_close (evcom_server *);
void evcom_stream_init (evcom_stream *, float timeout);
int evcom_stream_pair (evcom_stream *a, evcom_stream *b);
int evcom_stream_connect (evcom_stream *, struct sockaddr *address);
void evcom_stream_attach (EV_P_ evcom_stream *);
void evcom_stream_detach (evcom_stream *);
void evcom_stream_read_resume (evcom_stream *);
void evcom_stream_read_pause (evcom_stream *);
/* Resets the timeout to stay alive for another stream->timeout seconds
*/
/* Resets the timeout to stay alive for another stream->timeout seconds */
void evcom_stream_reset_timeout (evcom_stream *);
/* Writes a buffer to the stream.
*/
void evcom_stream_write (evcom_stream *, evcom_buf *);
void evcom_stream_write_simple (evcom_stream *, const char *str, size_t len);
void evcom_stream_write (evcom_stream *, const char *str, size_t len);
/* Once the write buffer is drained, evcom_stream_close will shutdown the
* writing end of the stream and will close the read end once the server
* replies with an EOF.
* replies with an EOF.
*/
void evcom_stream_close (evcom_stream *);
/* Do not wait for the server to reply with EOF. This will only be called
* once the write buffer is drained.
* Warning: For TCP stream, the OS kernel may (should) reply with RST
* packets if this is called when data is still being received from the
* server.
*/
void evcom_stream_full_close (evcom_stream *);
/* The most extreme measure.
* Will not wait for the write queue to complete.
*/
/* Will not wait for the write queue to complete. Closes both directions */
void evcom_stream_force_close (evcom_stream *);
@ -195,9 +211,8 @@ void evcom_stream_set_secure_session (evcom_stream *, gnutls_session_t);
enum evcom_stream_state evcom_stream_state (evcom_stream *stream);
evcom_buf * evcom_buf_new (const char* base, size_t len);
evcom_buf * evcom_buf_new2 (size_t len);
void evcom_buf_destroy (evcom_buf *);
evcom_buf* evcom_buf_new (const char* base, size_t len);
evcom_buf* evcom_buf_new2 (size_t len);
EV_INLINE void
evcom_queue_init (evcom_queue *q)
@ -235,5 +250,5 @@ evcom_queue_remove (evcom_queue *x)
#ifdef __cplusplus
}
#endif
#endif
#endif /* evcom_h */

37
deps/evcom/recv_states.dot

@ -0,0 +1,37 @@
strict digraph recv_states {
start [peripheries=2];
end [peripheries=2];
handshake;
recv_data;
wait_for_resume;
wait_for_close;
close_one;
close_both;
node [label="", shape="box", height=0.1, width=0.1];
close;
start -> handshake [label="tls"];
start -> recv_data;
handshake -> close [label="error"];
handshake -> recv_data;
recv_data -> handshake [label="rehandshake"];
recv_data -> wait_for_resume [label="pause"];
recv_data -> wait_for_close [label="eof"];
recv_data -> close [label="error"];
wait_for_resume -> recv_data;
wait_for_close -> close;
close -> close_one [label="duplex"];
close -> close_both;
close_one -> end;
close_both -> end;
}

65
deps/evcom/send_states.dot

@ -0,0 +1,65 @@
strict digraph send_states {
start [peripheries=2];
end [peripheries=2];
connection_established;
handshake;
send_data;
shutdown;
gnutls_bye;
close_one;
close_both;
wait_for_connect;
wait_for_buf;
wait_for_eof;
node [label="", shape="box", height=0.1, width=0.1];
close;
drain;
hangup;
hangup_unsecure;
start -> wait_for_connect [label="duplex"];
start -> connection_established;
wait_for_connect -> connection_established;
wait_for_connect -> close [label="error"];
connection_established -> handshake [label="tls"];
connection_established -> send_data;
handshake -> close [label="error"];
handshake -> send_data;
send_data -> close [label="error"];
send_data -> drain [label="drain"];
drain -> wait_for_buf;
drain -> hangup [label="got_close"];
wait_for_buf -> send_data;
wait_for_buf -> drain [label="empty_buf"];
hangup -> gnutls_bye [label="tls"];
hangup -> hangup_unsecure;
gnutls_bye -> wait_for_eof;
gnutls_bye -> close [label="error"];
hangup_unsecure -> shutdown [label="duplex"];
hangup_unsecure -> close_one;
shutdown -> wait_for_eof;
shutdown -> close [label="error"];
wait_for_eof -> close_one;
close_one -> wait_for_eof [label="readable"];
close -> close_both;
close -> close_one [label="duplex"];
close_both -> end;
close_one -> end;
}

6
deps/evcom/test/echo.c

@ -12,7 +12,9 @@
#include <ev.h>
#include <evcom.h>
#include <gnutls/gnutls.h>
#if EVCOM_HAVE_GNUTLS
# include <gnutls/gnutls.h>
#endif
#define HOST "127.0.0.1"
#define SOCKFILE "/tmp/oi.sock"
@ -46,7 +48,7 @@ on_peer_read (evcom_stream *stream, const void *base, size_t len)
{
if(len == 0) return;
evcom_stream_write_simple(stream, base, len);
evcom_stream_write(stream, base, len);
}
static evcom_stream*

478
deps/evcom/test/test.c

@ -16,13 +16,20 @@
# include <gnutls/gnutls.h>
#endif
#define MARK_PROGRESS write(STDERR_FILENO, ".", 1)
#undef MAX
#define MAX(a,b) ((a) > (b) ? (a) : (b))
#undef MIN
#define MIN(a,b) ((a) < (b) ? (a) : (b))
#define MARK_PROGRESS(c,cur,max) \
if (cur % (MAX(max,50)/50) == 0) write(STDERR_FILENO, c, 1)
#define SOCKFILE "/tmp/oi.sock"
#define PORT 5000
static evcom_server server;
static int nconnections;
static int nconnections;
static int use_tls;
static int got_server_close;
@ -36,7 +43,7 @@ common_on_server_close (evcom_server *s)
evcom_server_detach(s);
}
static void
static void
common_on_peer_close (evcom_stream *stream)
{
assert(EVCOM_CLOSED == evcom_stream_state(stream));
@ -50,14 +57,14 @@ common_on_peer_close (evcom_stream *stream)
free(stream);
}
static void
static void
common_on_client_timeout (evcom_stream *stream)
{
assert(stream);
printf("client connection timeout\n");
}
static void
static void
common_on_peer_timeout (evcom_stream *stream)
{
assert(stream);
@ -110,12 +117,12 @@ void anon_tls_client (evcom_stream *stream)
#define PING "PING"
#define PONG "PONG"
#define EXCHANGES 500
#define EXCHANGES 500
#define PINGPONG_TIMEOUT 5.0
static int successful_ping_count;
static int successful_ping_count;
static void
static void
pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len)
{
if (len == 0) {
@ -128,10 +135,10 @@ pingpong_on_peer_read (evcom_stream *stream, const void *base, size_t len)
buf[len] = 0;
printf("server got message: %s\n", buf);
evcom_stream_write_simple(stream, PONG, sizeof PONG);
evcom_stream_write(stream, PONG, sizeof PONG);
}
static void
static void
pingpong_on_client_close (evcom_stream *stream)
{
assert(EVCOM_CLOSED == evcom_stream_state(stream));
@ -141,7 +148,7 @@ pingpong_on_client_close (evcom_stream *stream)
evcom_stream_detach(stream);
}
static evcom_stream*
static evcom_stream*
pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr)
{
assert(_server == &server);
@ -166,15 +173,15 @@ pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr)
return stream;
}
static void
static void
pingpong_on_client_connect (evcom_stream *stream)
{
printf("client connected. sending ping\n");
evcom_stream_write_simple(stream, PING, sizeof PING);
evcom_stream_write(stream, PING, sizeof PING);
assert(EVCOM_CONNECTED_RW == evcom_stream_state(stream));
}
static void
static void
pingpong_on_client_read (evcom_stream *stream, const void *base, size_t len)
{
if(len == 0) {
@ -188,17 +195,17 @@ pingpong_on_client_read (evcom_stream *stream, const void *base, size_t len)
strncpy(buf, base, len);
buf[len] = 0;
printf("client got message: %s\n", buf);
assert(strcmp(buf, PONG) == 0);
if (++successful_ping_count > EXCHANGES) {
evcom_stream_close(stream);
return;
}
}
if (successful_ping_count % (EXCHANGES/20) == 0) MARK_PROGRESS;
MARK_PROGRESS(".", successful_ping_count, EXCHANGES);
evcom_stream_write_simple(stream, PING, sizeof PING);
evcom_stream_write(stream, PING, sizeof PING);
}
int
@ -206,13 +213,13 @@ pingpong (struct sockaddr *address)
{
int r;
evcom_stream client;
successful_ping_count = 0;
nconnections = 0;
got_server_close = 0;
printf("sizeof(evcom_server): %d\n", sizeof(evcom_server));
printf("sizeof(evcom_stream): %d\n", sizeof(evcom_stream));
printf("sizeof(evcom_server): %d\n", (int)sizeof(evcom_server));
printf("sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream));
evcom_server_init(&server);
server.on_connection = pingpong_on_server_connection;
@ -253,17 +260,17 @@ pingpong (struct sockaddr *address)
#define NCONN 50
#define CONNINT_TIMEOUT 10.0
static void
static void
send_bye_and_close(evcom_stream *stream, const void *base, size_t len)
{
assert(base);
assert(len == 0);
evcom_stream_write_simple(stream, "BYE", 3);
evcom_stream_write(stream, "BYE", 3);
printf("server wrote bye\n");
evcom_stream_close(stream);
}
static evcom_stream*
static evcom_stream*
connint_on_connection(evcom_server *_server, struct sockaddr *addr)
{
assert(_server == &server);
@ -284,21 +291,21 @@ connint_on_connection(evcom_server *_server, struct sockaddr *addr)
return stream;
}
static void
static void
connint_on_client_connect (evcom_stream *stream)
{
printf("on client connection\n");
evcom_stream_close(stream);
}
static void
static void
connint_on_client_close (evcom_stream *stream)
{
evcom_stream_close(stream); // already closed, but it shouldn't crash if we try to do it again
printf("client connection closed\n");
if (nconnections % (NCONN/20) == 0) MARK_PROGRESS;
MARK_PROGRESS(".", nconnections, NCONN);
if(++nconnections == NCONN) {
evcom_server_close(&server);
@ -308,7 +315,7 @@ connint_on_client_close (evcom_stream *stream)
evcom_stream_detach(stream);
}
static void
static void
connint_on_client_read (evcom_stream *stream, const void *base, size_t len)
{
if (len == 0) {
@ -321,12 +328,12 @@ connint_on_client_read (evcom_stream *stream, const void *base, size_t len)
buf[len] = 0;
printf("client got message: %s\n", buf);
assert(strcmp(buf, "BYE") == 0);
evcom_stream_close(stream);
}
int
int
connint (struct sockaddr *address)
{
int r;
@ -367,6 +374,365 @@ connint (struct sockaddr *address)
}
static evcom_reader reader;
static evcom_writer writer;
static int reader_got_close = 0;
static int reader_got_eof = 0;
static int reader_got_hello = 0;
static int reader_cnt = 0;
static int writer_got_close = 0;
#define PIPE_MSG "hello world"
#define PIPE_CNT 5000
static void
reader_read (evcom_reader *r, const void *str, size_t len)
{
assert(r == &reader);
if (len == 0) {
reader_got_eof = 1;
return;
}
assert(len == strlen(PIPE_MSG));
if (strncmp(str, PIPE_MSG, strlen(PIPE_MSG)) == 0) {
reader_got_hello = 1;
}
if (++reader_cnt < PIPE_CNT) {
MARK_PROGRESS(".", reader_cnt, PIPE_CNT);
evcom_writer_write(&writer, PIPE_MSG, strlen(PIPE_MSG));
} else {
evcom_writer_close(&writer);
}
}
static void
reader_close (evcom_reader *r)
{
assert(r == &reader);
reader_got_close = 1;
evcom_reader_detach(r);
}
static void
writer_close (evcom_writer *w)
{
assert(w == &writer);
writer_got_close = 1;
evcom_writer_detach(w);
}
int
pipe_stream (void)
{
reader_cnt = 0;
reader_got_close = 0;
reader_got_hello = 0;
reader_got_eof = 0;
writer_got_close = 0;
int pipefd[2];
int r = pipe(pipefd);
if (r < 0) {
perror("pipe()");
return -1;
}
evcom_reader_init(&reader);
reader.on_read = reader_read;
reader.on_close = reader_close;
evcom_reader_set(&reader, pipefd[0]);
evcom_reader_attach(EV_DEFAULT_ &reader);
evcom_writer_init(&writer);
writer.on_close = writer_close;
evcom_writer_set(&writer, pipefd[1]);
evcom_writer_attach(EV_DEFAULT_ &writer);
evcom_writer_write(&writer, PIPE_MSG, strlen(PIPE_MSG));
ev_loop(EV_DEFAULT_ 0);
assert(reader_got_close);
assert(reader_got_hello);
assert(reader_got_eof);
assert(writer_got_close);
assert(reader_cnt == PIPE_CNT);
return 0;
}
#define PAIR_PINGPONG_TIMEOUT 5000.0
#define PAIR_PINGPONG_EXCHANGES 50
static int a_got_close;
static int a_got_connect;
static int b_got_close;
static int b_got_connect;
static int pair_pingpong_cnt;
static evcom_stream a, b;
void a_connect (evcom_stream *stream)
{
assert(stream == &a);
a_got_connect = 1;
}
void a_close (evcom_stream *stream)
{
evcom_stream_detach(stream);
assert(stream == &a);
a_got_close = 1;
assert(stream->errorno == 0);
#if EVCOM_HAVE_GNUTLS
if (stream->gnutls_errorno) {
fprintf(stderr, "\nGNUTLS ERROR: %s\n", gnutls_strerror(stream->gnutls_errorno));
}
assert(stream->gnutls_errorno == 0);
if (use_tls) gnutls_deinit(stream->session);
#endif
}
void a_read (evcom_stream *stream, const void *buf, size_t len)
{
assert(stream == &a);
if (len == 0) return;
assert(len == strlen(PONG));
assert(strncmp(buf, PONG, strlen(PONG)) == 0);
if (++pair_pingpong_cnt < PAIR_PINGPONG_EXCHANGES) {
evcom_stream_write(&a, PING, strlen(PING));
} else if (pair_pingpong_cnt == PAIR_PINGPONG_EXCHANGES) {
evcom_stream_close(stream);
}
MARK_PROGRESS(".", pair_pingpong_cnt, PAIR_PINGPONG_EXCHANGES);
}
void b_connect (evcom_stream *stream)
{
assert(stream == &b);
b_got_connect = 1;
}
void b_close (evcom_stream *stream)
{
evcom_stream_detach(stream);
assert(stream == &b);
b_got_close = 1;
assert(stream->errorno == 0);
#if EVCOM_HAVE_GNUTLS
if (stream->gnutls_errorno) {
fprintf(stderr, "\nGNUTLS ERROR: %s\n", gnutls_strerror(stream->gnutls_errorno));
}
assert(stream->gnutls_errorno == 0);
if (use_tls) gnutls_deinit(stream->session);
#endif
}
void b_read (evcom_stream *stream, const void *buf, size_t len)
{
assert(stream == &b);
if (len == 0) {
evcom_stream_close(stream);
return;
}
assert(len == strlen(PING));
assert(strncmp(buf, PING, strlen(PING)) == 0);
evcom_stream_write(&b, PONG, strlen(PONG));
}
int
pair_pingpong ()
{
a_got_close = 0;
a_got_connect = 0;
b_got_close = 0;
b_got_connect = 0;
pair_pingpong_cnt = 0;
evcom_stream_init(&a, PAIR_PINGPONG_TIMEOUT);
a.on_close = a_close;
a.on_connect = a_connect;
a.on_read = a_read;
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_client(&a);
#endif
evcom_stream_init(&b, PAIR_PINGPONG_TIMEOUT);
b.on_close = b_close;
b.on_connect = b_connect;
b.on_read = b_read;
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_server(&b);
#endif
int r = evcom_stream_pair(&a, &b);
assert(r == 0);
evcom_stream_attach(EV_DEFAULT_ &a);
evcom_stream_attach(EV_DEFAULT_ &b);
evcom_stream_write(&a, PING, strlen(PING));
ev_loop(EV_DEFAULT_ 0);
assert(a_got_close);
assert(a_got_connect);
assert(b_got_close);
assert(b_got_connect);
assert(pair_pingpong_cnt == PAIR_PINGPONG_EXCHANGES);
return 0;
}
static void
free_stream (evcom_stream *stream)
{
assert(stream->errorno == 0);
free(stream);
}
#define ZERO_TIMEOUT 50.0
static size_t zero_to_write = 0;
static size_t zero_written = 0;
static size_t zero_read = 0;
static size_t zero_client_closed = 0;
static void
error_out (evcom_stream *stream)
{
assert(stream);
fprintf(stderr, "peer connection timeout\n");
assert(0);
}
static void
echo (evcom_stream *stream, const void *base, size_t len)
{
if(len == 0) {
fprintf(stderr, "close");
evcom_stream_close(stream);
} else {
evcom_stream_write(stream, base, len);
}
}
static evcom_stream*
make_echo_connection (evcom_server *server, struct sockaddr *addr)
{
assert(server);
assert(addr);
evcom_stream *stream = malloc(sizeof(evcom_stream));
evcom_stream_init(stream, ZERO_TIMEOUT);
stream->on_read = echo;
stream->on_close = free_stream;
stream->on_timeout = error_out;
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_server(stream);
#endif
return stream;
}
static void
zero_start (evcom_stream *stream)
{
evcom_stream_write(stream, "0", 1);
zero_written++;
}
static void
zero_close (evcom_stream *stream)
{
assert(stream);
zero_client_closed = 1;
}
static void
zero_recv (evcom_stream *stream, const void *buf, size_t len)
{
MARK_PROGRESS("-", zero_read, zero_to_write);
zero_read += len;
size_t i;
for (i = 0; i < len; i++) {
assert(((char*)buf)[i] == '0');
}
for (i = 0; i < MIN(zero_to_write - zero_written, 90000); i++) {
evcom_stream_write(stream, "0", 1);
zero_written++;
MARK_PROGRESS(".", zero_written, zero_to_write);
if (zero_written == zero_to_write) {
fprintf(stderr, "CLOSE");
evcom_stream_close(stream);
}
}
if (len == 0) {
fprintf(stderr, "finish");
evcom_server_close(&server);
}
}
int
zero_stream (struct sockaddr *address, size_t to_write)
{
int r;
assert(to_write >= 1024); // should be kind of big at least.
zero_to_write = to_write;
got_server_close = 0;
zero_written = 0;
zero_read = 0;
zero_client_closed = 0;
evcom_server_init(&server);
server.on_connection = make_echo_connection;
server.on_close = common_on_server_close;
evcom_server_listen(&server, address, 1000);
evcom_server_attach(EV_DEFAULT_ &server);
evcom_stream client;
evcom_stream_init(&client, ZERO_TIMEOUT);
client.on_read = zero_recv;
client.on_connect = zero_start;
client.on_close = zero_close;
client.on_timeout = error_out;
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_client(&client);
#endif
r = evcom_stream_connect(&client, address);
assert(r == 0 && "problem connecting");
evcom_stream_attach(EV_DEFAULT_ &client);
ev_loop(EV_DEFAULT_ 0);
assert(got_server_close);
assert(zero_written == zero_to_write);
assert(zero_read == zero_to_write);
assert(zero_client_closed) ;
return 0;
}
struct sockaddr *
create_unix_address (void)
{
@ -386,6 +752,11 @@ create_unix_address (void)
void
free_unix_address (struct sockaddr *address)
{
struct stat tstat;
if (lstat(SOCKFILE, &tstat) == 0) {
assert(S_ISSOCK(tstat.st_mode));
unlink(SOCKFILE);
}
free(address);
}
@ -405,48 +776,87 @@ main (void)
gnutls_anon_set_server_dh_params (server_credentials, dh_params);
#endif
struct sockaddr_in tcp_address;
memset(&tcp_address, 0, sizeof(struct sockaddr_in));
tcp_address.sin_family = AF_INET;
tcp_address.sin_port = htons(PORT);
tcp_address.sin_addr.s_addr = INADDR_ANY;
use_tls = 0;
fprintf(stderr, "zero_stream tcp: ");
assert(zero_stream((struct sockaddr*)&tcp_address, 5*1024*1024) == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pipe_stream: ");
assert(pipe_stream() == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pair_pingpong: ");
assert(pair_pingpong() == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pingpong tcp: ");
assert(pingpong((struct sockaddr*)&tcp_address) == 0);
fprintf(stderr, "\n");
fprintf(stderr, "connint tcp: ");
assert(connint((struct sockaddr*)&tcp_address) == 0);
fprintf(stderr, "\n");
#if EVCOM_HAVE_GNUTLS
use_tls = 1;
fprintf(stderr, "zero_stream ssl: ");
assert(zero_stream((struct sockaddr*)&tcp_address, 50*1024) == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pair_pingpong ssl: ");
assert(pair_pingpong() == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pingpong ssl: ");
assert(pingpong((struct sockaddr*)&tcp_address) == 0);
fprintf(stderr, "\n");
fprintf(stderr, "connint ssl: ");
assert(connint((struct sockaddr*)&tcp_address) == 0);
#endif
fprintf(stderr, "\n");
#endif
struct sockaddr *unix_address;
use_tls = 0;
fprintf(stderr, "pingpong unix: ");
unix_address = create_unix_address();
assert(pingpong(unix_address) == 0);
free_unix_address(unix_address);
fprintf(stderr, "\n");
fprintf(stderr, "connint unix: ");
unix_address = create_unix_address();
assert(connint(unix_address) == 0);
free_unix_address(unix_address);
fprintf(stderr, "\n");
#if EVCOM_HAVE_GNUTLS
use_tls = 1;
fprintf(stderr, "pingpong unix ssl: ");
unix_address = create_unix_address();
assert(pingpong(unix_address) == 0);
free_unix_address(unix_address);
fprintf(stderr, "\n");
fprintf(stderr, "connint unix ssl: ");
unix_address = create_unix_address();
assert(connint(unix_address) == 0);
free_unix_address(unix_address);
#endif
fprintf(stderr, "\n");
#endif
return 0;
}

2
src/http.js

@ -394,7 +394,7 @@ function connectionListener (connection) {
res.should_keep_alive = should_keep_alive;
res.addListener("flush", function () {
if(flushMessageQueue(connection, responses)) {
connection.fullClose();
connection.close();
}
});
responses.push(res);

32
src/net.cc

@ -62,7 +62,6 @@ Connection::Initialize (v8::Handle<v8::Object> target)
NODE_SET_PROTOTYPE_METHOD(constructor_template, "connect", Connect);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "send", Send);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "close", Close);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "fullClose", FullClose);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "forceClose", ForceClose);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "setEncoding", SetEncoding);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "readPause", ReadPause);
@ -116,7 +115,8 @@ Connection::Init (void)
Connection::~Connection ()
{
assert(stream_.fd < 0 && "garbage collecting open Connection");
assert(stream_.recvfd < 0 && "garbage collecting open Connection");
assert(stream_.sendfd < 0 && "garbage collecting open Connection");
ForceClose();
}
@ -149,7 +149,8 @@ Connection::Connect (const Arguments& args)
return ThrowException(String::New("Socket is not in CLOSED state."));
}
assert(connection->stream_.fd < 0);
assert(connection->stream_.recvfd < 0);
assert(connection->stream_.sendfd < 0);
if (args.Length() == 0)
return ThrowException(String::New("Must specify a port."));
@ -344,17 +345,6 @@ Connection::Close (const Arguments& args)
return Undefined();
}
Handle<Value>
Connection::FullClose (const Arguments& args)
{
HandleScope scope;
Connection *connection = ObjectWrap::Unwrap<Connection>(args.Holder());
assert(connection);
connection->FullClose();
return Undefined();
}
Handle<Value>
Connection::ForceClose (const Arguments& args)
{
@ -394,31 +384,31 @@ Connection::Send (const Arguments& args)
enum encoding enc = ParseEncoding(args[1]);
Local<String> s = args[0]->ToString();
size_t len = s->Utf8Length();
evcom_buf *buf = node::buf_new(len);
char buf[len];
switch (enc) {
case RAW:
case ASCII:
s->WriteAscii(buf->base, 0, len);
s->WriteAscii(buf, 0, len);
break;
case UTF8:
s->WriteUtf8(buf->base, len);
s->WriteUtf8(buf, len);
break;
default:
assert(0 && "unhandled string encoding");
}
connection->Send(buf);
connection->Send(buf, len);
} else if (args[0]->IsArray()) {
Handle<Array> array = Handle<Array>::Cast(args[0]);
size_t len = array->Length();
evcom_buf *buf = node::buf_new(len);
char buf[len];
for (size_t i = 0; i < len; i++) {
Local<Value> int_value = array->Get(Integer::New(i));
buf->base[i] = int_value->IntegerValue();
buf[i] = int_value->IntegerValue();
}
connection->Send(buf);
connection->Send(buf, len);
} else return ThrowException(String::New("Bad argument"));

7
src/net.h

@ -24,7 +24,6 @@ protected:
static v8::Handle<v8::Value> Send (const v8::Arguments& args);
static v8::Handle<v8::Value> SendUtf8 (const v8::Arguments& args);
static v8::Handle<v8::Value> Close (const v8::Arguments& args);
static v8::Handle<v8::Value> FullClose (const v8::Arguments& args);
static v8::Handle<v8::Value> ForceClose (const v8::Arguments& args);
static v8::Handle<v8::Value> SetEncoding (const v8::Arguments& args);
static v8::Handle<v8::Value> ReadPause (const v8::Arguments& args);
@ -47,9 +46,8 @@ protected:
int Connect (struct sockaddr *address) {
return evcom_stream_connect (&stream_, address);
}
void Send (evcom_buf *buf) { evcom_stream_write(&stream_, buf); }
void Send (const char *buf, size_t len) { evcom_stream_write(&stream_, buf, len); }
void Close (void) { evcom_stream_close(&stream_); }
void FullClose (void) { evcom_stream_full_close(&stream_); }
void ForceClose (void) { evcom_stream_force_close(&stream_); }
void ReadPause (void) { evcom_stream_read_pause(&stream_); }
void ReadResume (void) { evcom_stream_read_resume(&stream_); }
@ -92,7 +90,8 @@ private:
evcom_stream_detach(s);
assert(connection->stream_.fd < 0);
assert(connection->stream_.recvfd < 0);
assert(connection->stream_.sendfd < 0);
connection->OnClose();

3
test/mjsunit/test-tcp-many-clients.js

@ -18,7 +18,7 @@ var server = node.tcp.createServer(function (c) {
total_connections++;
print("#");
c.send(body);
c.fullClose();
c.close();
});
});
server.listen(port);
@ -29,6 +29,7 @@ function runClient (callback) {
client.setEncoding("utf8");
client.addListener("connect", function () {
print("c");
client.recved = "";
client.connections += 1;
});

2
test/mjsunit/test-tcp-reconnect.js

@ -36,7 +36,7 @@ function onLoad () {
client_recv_count += 1;
puts("client_recv_count " + client_recv_count);
assertEquals("hello\r\n", chunk);
client.fullClose();
client.close();
});
client.addListener("close", function (had_error) {

2
test/mjsunit/test-tcp-throttle-kernel-buffer.js

@ -13,7 +13,7 @@ puts("start server on port " + PORT);
server = node.tcp.createServer(function (connection) {
connection.addListener("connect", function () {
connection.send(body);
connection.fullClose();
connection.close();
});
});
server.listen(PORT);

2
test/mjsunit/test-tcp-throttle.js

@ -5,7 +5,7 @@ N = 500;
server = node.tcp.createServer(function (connection) {
function send (j) {
if (j >= N) {
connection.fullClose();
connection.close();
return;
}
setTimeout(function () {

Loading…
Cancel
Save