diff --git a/deps/evcom/evcom.c b/deps/evcom/evcom.c index 1ee02a1f0a..1088181cef 100644 --- a/deps/evcom/evcom.c +++ b/deps/evcom/evcom.c @@ -1139,12 +1139,13 @@ void evcom_stream_force_close (evcom_stream *stream) evcom_stream_detach(stream); } -void +/* Returns the number of bytes flushed to the buffer */ +ssize_t evcom_stream_write (evcom_stream *stream, const char *str, size_t len) { if (!WRITABLE(stream) || GOT_CLOSE(stream)) { assert(0 && "Do not write to a closed stream"); - return; + return -1; } ssize_t sent = 0; @@ -1188,7 +1189,7 @@ evcom_stream_write (evcom_stream *stream, const char *str, size_t len) } /* TODO else { memcpy to last buffer on head } */ assert(sent >= 0); - if ((size_t)sent == len) return; /* sent the whole buffer */ + if ((size_t)sent == len) return sent; /* sent the whole buffer */ len -= sent; str += sent; @@ -1202,7 +1203,7 @@ evcom_stream_write (evcom_stream *stream, const char *str, size_t len) if (ATTACHED(stream)) { ev_io_start(D_LOOP_(stream) &stream->write_watcher); } - return; + return sent; close: stream->send_action = stream_send__close; @@ -1210,6 +1211,7 @@ close: if (ATTACHED(stream)) { ev_io_start(D_LOOP_(stream) &stream->write_watcher); } + return -1; } void diff --git a/deps/evcom/evcom.h b/deps/evcom/evcom.h index fd03a5bf05..3b07289fb5 100644 --- a/deps/evcom/evcom.h +++ b/deps/evcom/evcom.h @@ -194,7 +194,7 @@ void evcom_stream_read_resume (evcom_stream *); void evcom_stream_read_pause (evcom_stream *); void evcom_stream_reset_timeout (evcom_stream *, float timeout); void evcom_stream_set_no_delay (evcom_stream *, int no_delay); -void evcom_stream_write (evcom_stream *, const char *str, size_t len); +ssize_t evcom_stream_write (evcom_stream *, const char *str, size_t len); /* Once the write buffer is drained, evcom_stream_close will shutdown the * writing end of the stream and will close the read end once the server * replies with an EOF. diff --git a/doc/api.txt b/doc/api.txt index 9466fc6e45..1e7abcfb20 100644 --- a/doc/api.txt +++ b/doc/api.txt @@ -1493,6 +1493,10 @@ Sets the encoding (either +"ascii"+, +"utf8"+, or +"binary"+) for data that is r Sends data on the connection. The second parameter specifies the encoding in the case of a string--it defaults to ASCII because encoding to UTF8 is rather slow. ++ +Returns +true+ if the entire data was flushed successfully to the kernel +buffer. Returns +false+ if all or part of the data was queued in user memory. ++'drain'+ will be emitted when the buffer is again free. +connection.close()+:: diff --git a/src/node_net.cc b/src/node_net.cc index b9f739c0e5..bc0f67315b 100644 --- a/src/node_net.cc +++ b/src/node_net.cc @@ -633,12 +633,12 @@ Handle Connection::Write(const Arguments& args) { } char * buf = new char[len]; - ssize_t written = DecodeWrite(buf, len, args[0], enc); - assert(written == len); - connection->Write(buf, written); + ssize_t bufsize = DecodeWrite(buf, len, args[0], enc); + assert(bufsize == len); + ssize_t sent = connection->Write(buf, bufsize); delete [] buf; - return scope.Close(Integer::New(written)); + return sent == bufsize ? True() : False(); } void Connection::OnReceive(const void *buf, size_t len) { diff --git a/src/node_net.h b/src/node_net.h index 12e9e1dbd6..8ad1471a6a 100644 --- a/src/node_net.h +++ b/src/node_net.h @@ -62,8 +62,8 @@ class Connection : public EventEmitter { return evcom_stream_connect(&stream_, address); } - void Write(const char *buf, size_t len) { - evcom_stream_write(&stream_, buf, len); + ssize_t Write(const char *buf, size_t len) { + return evcom_stream_write(&stream_, buf, len); } void Close() { diff --git a/test/pummel/test-tcp-pause.js b/test/pummel/test-tcp-pause.js new file mode 100644 index 0000000000..0316e7797d --- /dev/null +++ b/test/pummel/test-tcp-pause.js @@ -0,0 +1,67 @@ +process.mixin(require("../common")); +tcp = require("tcp"); +N = 200; + +server = tcp.createServer(function (connection) { + function write (j) { + if (j >= N) { + connection.close(); + return; + } + setTimeout(function () { + connection.write("C"); + write(j+1); + }, 10); + } + write(0); +}); +server.listen(PORT); + + +recv = ""; +chars_recved = 0; + +client = tcp.createConnection(PORT); +client.setEncoding("ascii"); +client.addListener("data", function (d) { + print(d); + recv += d; +}); + +setTimeout(function () { + chars_recved = recv.length; + puts("pause at: " + chars_recved); + assert.equal(true, chars_recved > 1); + client.pause(); + setTimeout(function () { + puts("resume at: " + chars_recved); + assert.equal(chars_recved, recv.length); + client.resume(); + + setTimeout(function () { + chars_recved = recv.length; + puts("pause at: " + chars_recved); + client.pause(); + + setTimeout(function () { + puts("resume at: " + chars_recved); + assert.equal(chars_recved, recv.length); + client.resume(); + + }, 500); + + }, 500); + + }, 500); + +}, 500); + +client.addListener("end", function () { + server.close(); + client.close(); +}); + +process.addListener("exit", function () { + assert.equal(N, recv.length); + debug("Exit"); +}); diff --git a/test/pummel/test-tcp-throttle-kernel-buffer.js b/test/pummel/test-tcp-throttle-kernel-buffer.js deleted file mode 100644 index 8914e3556d..0000000000 --- a/test/pummel/test-tcp-throttle-kernel-buffer.js +++ /dev/null @@ -1,55 +0,0 @@ -process.mixin(require("../common")); -tcp = require("tcp"); -N = 30*1024; // 500kb - -puts("build big string"); -var body = ""; -for (var i = 0; i < N; i++) { - body += "C"; -} - -puts("start server on port " + PORT); - -server = tcp.createServer(function (connection) { - connection.addListener("connect", function () { - connection.write(body); - connection.close(); - }); -}); -server.listen(PORT); - - -chars_recved = 0; -npauses = 0; - - -var paused = false; -client = tcp.createConnection(PORT); -client.setEncoding("ascii"); -client.addListener("data", function (d) { - chars_recved += d.length; - puts("got " + chars_recved); - if (!paused) { - client.pause(); - npauses += 1; - paused = true; - puts("pause"); - x = chars_recved; - setTimeout(function () { - assert.equal(chars_recved, x); - client.resume(); - puts("resume"); - paused = false; - }, 100); - } -}); - -client.addListener("end", function () { - server.close(); - client.close(); -}); - -process.addListener("exit", function () { - assert.equal(N, chars_recved); - assert.equal(true, npauses > 2); -}); diff --git a/test/pummel/test-tcp-throttle.js b/test/pummel/test-tcp-throttle.js index 0316e7797d..32a7363722 100644 --- a/test/pummel/test-tcp-throttle.js +++ b/test/pummel/test-tcp-throttle.js @@ -1,60 +1,48 @@ process.mixin(require("../common")); tcp = require("tcp"); -N = 200; +N = 60*1024; // 30kb + +puts("build big string"); +var body = ""; +for (var i = 0; i < N; i++) { + body += "C"; +} + +puts("start server on port " + PORT); server = tcp.createServer(function (connection) { - function write (j) { - if (j >= N) { - connection.close(); - return; - } - setTimeout(function () { - connection.write("C"); - write(j+1); - }, 10); - } - write(0); + connection.addListener("connect", function () { + assert.equal(false, connection.write(body)); + connection.close(); + }); }); server.listen(PORT); -recv = ""; chars_recved = 0; +npauses = 0; + +var paused = false; client = tcp.createConnection(PORT); client.setEncoding("ascii"); client.addListener("data", function (d) { - print(d); - recv += d; -}); - -setTimeout(function () { - chars_recved = recv.length; - puts("pause at: " + chars_recved); - assert.equal(true, chars_recved > 1); - client.pause(); - setTimeout(function () { - puts("resume at: " + chars_recved); - assert.equal(chars_recved, recv.length); - client.resume(); - + chars_recved += d.length; + puts("got " + chars_recved); + if (!paused) { + client.pause(); + npauses += 1; + paused = true; + puts("pause"); + x = chars_recved; setTimeout(function () { - chars_recved = recv.length; - puts("pause at: " + chars_recved); - client.pause(); - - setTimeout(function () { - puts("resume at: " + chars_recved); - assert.equal(chars_recved, recv.length); - client.resume(); - - }, 500); - - }, 500); - - }, 500); - -}, 500); + assert.equal(chars_recved, x); + client.resume(); + puts("resume"); + paused = false; + }, 100); + } +}); client.addListener("end", function () { server.close(); @@ -62,6 +50,6 @@ client.addListener("end", function () { }); process.addListener("exit", function () { - assert.equal(N, recv.length); - debug("Exit"); + assert.equal(N, chars_recved); + assert.equal(true, npauses > 2); });