Browse Source

Upgrade evcom; Add setTimeout method to node.tcp.Connection

The default timeout is 60 seconds, but it can now be changed.

evcom upgrade includes fixes to force_close.
v0.7.4-release
Ryan 16 years ago
parent
commit
7beea2cd5f
  1. 58
      deps/evcom/evcom.c
  2. 5
      deps/evcom/evcom.h
  3. 3
      deps/evcom/test/echo.c
  4. 24
      deps/evcom/test/test.c
  5. 19
      src/net.cc
  6. 5
      src/net.h
  7. 79
      test/mjsunit/test-tcp-timeout.js
  8. 9
      website/api.txt

58
deps/evcom/evcom.c

@ -392,7 +392,7 @@ stream__handshake (evcom_stream *stream)
return OKAY;
}
evcom_stream_reset_timeout(stream);
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) {
if (0 == gnutls_record_get_direction((stream)->session)) {
@ -411,11 +411,14 @@ stream__handshake (evcom_stream *stream)
stream->flags |= EVCOM_CONNECTED;
if (stream->on_connect) stream->on_connect(stream);
ev_io_start(D_LOOP_(stream) &stream->read_watcher);
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
/* evcom_stream_force_close might have been called. */
if (stream->recvfd >= 0 && stream->sendfd >= 0) {
ev_io_start(D_LOOP_(stream) &stream->read_watcher);
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
stream->send_action = stream_send__data;
stream->recv_action = stream_recv__data;
stream->send_action = stream_send__data;
stream->recv_action = stream_recv__data;
}
return OKAY;
}
@ -541,7 +544,7 @@ stream_recv__data (evcom_stream *stream)
return OKAY;
}
evcom_stream_reset_timeout(stream);
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
assert(recved >= 0);
@ -614,7 +617,7 @@ stream_send__data (evcom_stream *stream)
return OKAY;
}
evcom_stream_reset_timeout(stream);
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
assert(sent >= 0);
@ -638,14 +641,24 @@ stream_send__shutdown (evcom_stream *stream)
int r = shutdown(stream->sendfd, SHUT_WR);
if (r < 0) {
stream->errorno = errno;
evcom_perror("shutdown()", errno);
switch (errno) {
case EINTR:
assert(stream->send_action == stream_send__shutdown);
return OKAY;
case ENOTCONN:
break;
default:
stream->errorno = errno;
evcom_perror("shutdown()", errno);
break;
}
stream->send_action = stream_send__close;
return OKAY;
}
stream->flags &= ~EVCOM_WRITABLE;
stream->send_action = stream_send__wait_for_eof;
return OKAY;
}
@ -985,13 +998,15 @@ on_timeout (EV_P_ ev_timer *watcher, int revents)
assert(watcher == &stream->timeout_watcher);
if (PAUSED(stream)) {
evcom_stream_reset_timeout(stream);
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
return;
}
if (stream->on_timeout) stream->on_timeout(stream);
evcom_stream_force_close(stream);
if (stream->on_close) stream->on_close(stream);
}
static void
@ -1045,7 +1060,7 @@ stream_event (EV_P_ ev_io *w, int revents)
* gnutls_db_set_ptr (stream->session, _);
*/
void
evcom_stream_init (evcom_stream *stream, float timeout)
evcom_stream_init (evcom_stream *stream)
{
stream->flags = 0;
stream->errorno = 0;
@ -1069,7 +1084,7 @@ evcom_stream_init (evcom_stream *stream, float timeout)
stream->gnutls_errorno = 0;
stream->session = NULL;
#endif
ev_timer_init(&stream->timeout_watcher, on_timeout, 0., timeout);
ev_timer_init(&stream->timeout_watcher, on_timeout, 0., 60.);
stream->timeout_watcher.data = stream;
stream->on_connect = NULL;
@ -1098,8 +1113,8 @@ void evcom_stream_force_close (evcom_stream *stream)
if (!DUPLEX(stream) && stream->sendfd >= 0) {
close(stream->sendfd);
stream__set_send_closed(stream);
}
stream__set_send_closed(stream);
evcom_stream_detach(stream);
}
@ -1175,9 +1190,12 @@ close:
}
void
evcom_stream_reset_timeout (evcom_stream *stream)
evcom_stream_reset_timeout (evcom_stream *stream, float timeout)
{
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
stream->timeout_watcher.repeat = timeout;
if (ATTACHED(stream)) {
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
}
}
void
@ -1211,6 +1229,7 @@ void
evcom_stream_read_pause (evcom_stream *stream)
{
stream->flags |= EVCOM_PAUSED;
ev_timer_stop(D_LOOP_(stream) &stream->timeout_watcher);
if (stream->recv_action == stream_recv__data) {
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
stream->recv_action = stream_recv__wait_for_resume;
@ -1221,12 +1240,13 @@ void
evcom_stream_read_resume (evcom_stream *stream)
{
stream->flags &= ~EVCOM_PAUSED;
evcom_stream_reset_timeout(stream);
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
if (stream->recv_action == stream_recv__wait_for_resume) {
stream->recv_action = stream_recv__data;
}
if (ATTACHED(stream) && READABLE(stream)) {
ev_io_start(D_LOOP_(stream) &stream->read_watcher);
if (ATTACHED(stream)) {
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
if (READABLE(stream)) ev_io_start(D_LOOP_(stream) &stream->read_watcher);
}
}

5
deps/evcom/evcom.h

@ -181,7 +181,7 @@ void evcom_server_attach (EV_P_ evcom_server *);
void evcom_server_detach (evcom_server *);
void evcom_server_close (evcom_server *);
void evcom_stream_init (evcom_stream *, float timeout);
void evcom_stream_init (evcom_stream *);
int evcom_stream_pair (evcom_stream *a, evcom_stream *b);
int evcom_stream_connect (evcom_stream *, struct sockaddr *address);
@ -191,8 +191,7 @@ void evcom_stream_attach (EV_P_ evcom_stream *);
void evcom_stream_detach (evcom_stream *);
void evcom_stream_read_resume (evcom_stream *);
void evcom_stream_read_pause (evcom_stream *);
/* Resets the timeout to stay alive for another stream->timeout seconds */
void evcom_stream_reset_timeout (evcom_stream *);
void evcom_stream_reset_timeout (evcom_stream *, float timeout);
void evcom_stream_write (evcom_stream *, const char *str, size_t len);
/* Once the write buffer is drained, evcom_stream_close will shutdown the
* writing end of the stream and will close the read end once the server

3
deps/evcom/test/echo.c

@ -58,10 +58,11 @@ on_server_connection (evcom_server *server, struct sockaddr *addr)
assert(addr);
evcom_stream *stream = malloc(sizeof(evcom_stream));
evcom_stream_init(stream, TIMEOUT);
evcom_stream_init(stream);
stream->on_read = on_peer_read;
stream->on_close = on_peer_close;
stream->on_timeout = on_peer_timeout;
evcom_stream_reset_timeout(stream, TIMEOUT);
nconnections++;

24
deps/evcom/test/test.c

@ -155,10 +155,11 @@ pingpong_on_server_connection (evcom_server *_server, struct sockaddr *addr)
assert(addr);
evcom_stream *stream = malloc(sizeof(evcom_stream));
evcom_stream_init(stream, PINGPONG_TIMEOUT);
evcom_stream_init(stream);
stream->on_read = pingpong_on_peer_read;
stream->on_close = common_on_peer_close;
stream->on_timeout = common_on_peer_timeout;
evcom_stream_reset_timeout(stream, PINGPONG_TIMEOUT);
assert(EVCOM_INITIALIZED == evcom_stream_state(stream));
@ -226,11 +227,12 @@ pingpong (struct sockaddr *address)
assert(r == 0);
evcom_server_attach(EV_DEFAULT_ &server);
evcom_stream_init(&client, PINGPONG_TIMEOUT);
evcom_stream_init(&client);
client.on_read = pingpong_on_client_read;
client.on_connect = pingpong_on_client_connect;
client.on_close = pingpong_on_client_close;
client.on_timeout = common_on_client_timeout;
evcom_stream_reset_timeout(&client, PINGPONG_TIMEOUT);
assert(EVCOM_INITIALIZED == evcom_stream_state(&client));
@ -274,10 +276,11 @@ connint_on_connection(evcom_server *_server, struct sockaddr *addr)
assert(addr);
evcom_stream *stream = malloc(sizeof(evcom_stream));
evcom_stream_init(stream, CONNINT_TIMEOUT);
evcom_stream_init(stream);
stream->on_read = send_bye_and_close;
stream->on_close = common_on_peer_close;
stream->on_timeout = common_on_peer_timeout;
evcom_stream_reset_timeout(stream, CONNINT_TIMEOUT);
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_server(stream);
@ -349,11 +352,12 @@ connint (struct sockaddr *address)
int i;
for (i = 0; i < NCONN; i++) {
evcom_stream *client = &clients[i];
evcom_stream_init(client, CONNINT_TIMEOUT);
evcom_stream_init(client);
client->on_read = connint_on_client_read;
client->on_connect = connint_on_client_connect;
client->on_close = connint_on_client_close;
client->on_timeout = common_on_client_timeout;
evcom_stream_reset_timeout(client, CONNINT_TIMEOUT);
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_client(client);
#endif
@ -554,18 +558,20 @@ pair_pingpong (int use_pipe)
b_got_connect = 0;
pair_pingpong_cnt = 0;
evcom_stream_init(&a, PAIR_PINGPONG_TIMEOUT);
evcom_stream_init(&a);
a.on_close = a_close;
a.on_connect = a_connect;
a.on_read = a_read;
evcom_stream_reset_timeout(&a, PAIR_PINGPONG_TIMEOUT);
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_client(&a);
#endif
evcom_stream_init(&b, PAIR_PINGPONG_TIMEOUT);
evcom_stream_init(&b);
b.on_close = b_close;
b.on_connect = b_connect;
b.on_read = b_read;
evcom_stream_reset_timeout(&b, PAIR_PINGPONG_TIMEOUT);
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_server(&b);
#endif
@ -639,10 +645,11 @@ make_echo_connection (evcom_server *server, struct sockaddr *addr)
assert(addr);
evcom_stream *stream = malloc(sizeof(evcom_stream));
evcom_stream_init(stream, ZERO_TIMEOUT);
evcom_stream_init(stream);
stream->on_read = echo;
stream->on_close = free_stream;
stream->on_timeout = error_out;
evcom_stream_reset_timeout(stream, ZERO_TIMEOUT);
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_server(stream);
@ -717,11 +724,12 @@ zero_stream (struct sockaddr *address, size_t to_write)
evcom_server_attach(EV_DEFAULT_ &server);
evcom_stream client;
evcom_stream_init(&client, ZERO_TIMEOUT);
evcom_stream_init(&client);
client.on_read = zero_recv;
client.on_connect = zero_start;
client.on_close = zero_close;
client.on_timeout = error_out;
evcom_stream_reset_timeout(&client, ZERO_TIMEOUT);
#if EVCOM_HAVE_GNUTLS
if (use_tls) anon_tls_client(&client);
#endif

19
src/net.cc

@ -66,6 +66,7 @@ Connection::Initialize (v8::Handle<v8::Object> target)
NODE_SET_PROTOTYPE_METHOD(constructor_template, "setEncoding", SetEncoding);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "readPause", ReadPause);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "readResume", ReadResume);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "setTimeout", SetTimeout);
constructor_template->PrototypeTemplate()->SetAccessor(
READY_STATE_SYMBOL,
@ -104,8 +105,7 @@ void
Connection::Init (void)
{
resolving_ = false;
double timeout = 60.0; // default
evcom_stream_init(&stream_, timeout);
evcom_stream_init(&stream_);
stream_.on_connect = Connection::on_connect;
stream_.on_read = Connection::on_read;
stream_.on_close = Connection::on_close;
@ -333,6 +333,21 @@ Connection::ReadResume (const Arguments& args)
return Undefined();
}
Handle<Value>
Connection::SetTimeout (const Arguments& args)
{
HandleScope scope;
Connection *connection = ObjectWrap::Unwrap<Connection>(args.This());
assert(connection);
float timeout = (float)(args[0]->IntegerValue()) / 1000;
connection->SetTimeout(timeout);
return Undefined();
}
Handle<Value>
Connection::Close (const Arguments& args)
{

5
src/net.h

@ -28,6 +28,7 @@ protected:
static v8::Handle<v8::Value> SetEncoding (const v8::Arguments& args);
static v8::Handle<v8::Value> ReadPause (const v8::Arguments& args);
static v8::Handle<v8::Value> ReadResume (const v8::Arguments& args);
static v8::Handle<v8::Value> SetTimeout (const v8::Arguments& args);
static v8::Handle<v8::Value> ReadyStateGetter (v8::Local<v8::String> _,
const v8::AccessorInfo& info);
@ -51,6 +52,10 @@ protected:
void ForceClose (void) { evcom_stream_force_close(&stream_); }
void ReadPause (void) { evcom_stream_read_pause(&stream_); }
void ReadResume (void) { evcom_stream_read_resume(&stream_); }
void SetTimeout (float timeout)
{
evcom_stream_reset_timeout(&stream_, timeout);
}
virtual void OnConnect (void);
virtual void OnReceive (const void *buf, size_t len);

79
test/mjsunit/test-tcp-timeout.js

@ -0,0 +1,79 @@
include("mjsunit.js");
port = 9992;
exchanges = 0;
starttime = null;
timeouttime = null;
timeout = 1000;
var echo_server = node.tcp.createServer(function (socket) {
socket.setTimeout(timeout);
socket.addListener("timeout", function (d) {
puts("server timeout");
timeouttime = new Date;
p(timeouttime);
});
socket.addListener("receive", function (d) {
p(d);
socket.send(d);
});
socket.addListener("eof", function () {
socket.close();
});
});
echo_server.listen(port);
puts("server listening at " + port);
var client = node.tcp.createConnection(port);
client.setEncoding("UTF8");
client.setTimeout(0); // disable the timeout for client
client.addListener("connect", function () {
puts("client connected.");
client.send("hello\r\n");
});
client.addListener("receive", function (chunk) {
assertEquals("hello\r\n", chunk);
if (exchanges++ < 5) {
setTimeout(function () {
puts("client send 'hello'");
client.send("hello\r\n");
}, 500);
if (exchanges == 5) {
puts("wait for timeout - should come in " + timeout + " ms");
starttime = new Date;
p(starttime);
}
}
});
client.addListener("timeout", function () {
puts("client timeout - this shouldn't happen");
assertFalse(true);
});
client.addListener("eof", function () {
puts("client eof");
client.close();
});
client.addListener("close", function (had_error) {
puts("client disconnect");
echo_server.close();
assertFalse(had_error);
});
process.addListener("exit", function () {
assertTrue(starttime != null);
assertTrue(timeouttime != null);
diff = timeouttime - starttime;
puts("diff = " + diff);
assertTrue(timeout < diff);
// Allow for 800 milliseconds more
assertTrue(diff < timeout + 800);
});

9
website/api.txt

@ -921,6 +921,9 @@ socket for +node.tcp.Server+.
will be +"writeOnly"+. One should probably
just call +connection.close()+ when this
event is emitted.
|+"timeout"+ | | Emitted if the connection times out from
inactivity. The +"close"+ event will be
emitted immediately following this event.
|+"close"+ | +had_error+ | Emitted once the connection is fully
closed. The argument +had_error+
is a boolean which says if the connection
@ -984,6 +987,12 @@ Useful to throttle back an upload.
+connection.readResume()+::
Resumes reading if reading was paused by +readPause()+.
+connection.setTimeout(timeout)+::
Sets the connection to timeout after +timeout+ milliseconds of inacitivty on
the connection. By default all +node.tcp.Connection+ objects have a timeout
of 60 seconds (60000 ms).
+
If +timeout+ is 0, then the idle timeout is disabled.
=== DNS

Loading…
Cancel
Save