Browse Source

tcp.Connection.prototype.write should return boolean

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
0e844d3bcb
  1. 10
      deps/evcom/evcom.c
  2. 2
      deps/evcom/evcom.h
  3. 4
      doc/api.txt
  4. 8
      src/node_net.cc
  5. 4
      src/node_net.h
  6. 67
      test/pummel/test-tcp-pause.js
  7. 55
      test/pummel/test-tcp-throttle-kernel-buffer.js
  8. 72
      test/pummel/test-tcp-throttle.js

10
deps/evcom/evcom.c

@ -1139,12 +1139,13 @@ void evcom_stream_force_close (evcom_stream *stream)
evcom_stream_detach(stream);
}
void
/* Returns the number of bytes flushed to the buffer */
ssize_t
evcom_stream_write (evcom_stream *stream, const char *str, size_t len)
{
if (!WRITABLE(stream) || GOT_CLOSE(stream)) {
assert(0 && "Do not write to a closed stream");
return;
return -1;
}
ssize_t sent = 0;
@ -1188,7 +1189,7 @@ evcom_stream_write (evcom_stream *stream, const char *str, size_t len)
} /* TODO else { memcpy to last buffer on head } */
assert(sent >= 0);
if ((size_t)sent == len) return; /* sent the whole buffer */
if ((size_t)sent == len) return sent; /* sent the whole buffer */
len -= sent;
str += sent;
@ -1202,7 +1203,7 @@ evcom_stream_write (evcom_stream *stream, const char *str, size_t len)
if (ATTACHED(stream)) {
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
}
return;
return sent;
close:
stream->send_action = stream_send__close;
@ -1210,6 +1211,7 @@ close:
if (ATTACHED(stream)) {
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
}
return -1;
}
void

2
deps/evcom/evcom.h

@ -194,7 +194,7 @@ void evcom_stream_read_resume (evcom_stream *);
void evcom_stream_read_pause (evcom_stream *);
void evcom_stream_reset_timeout (evcom_stream *, float timeout);
void evcom_stream_set_no_delay (evcom_stream *, int no_delay);
void evcom_stream_write (evcom_stream *, const char *str, size_t len);
ssize_t evcom_stream_write (evcom_stream *, const char *str, size_t len);
/* Once the write buffer is drained, evcom_stream_close will shutdown the
* writing end of the stream and will close the read end once the server
* replies with an EOF.

4
doc/api.txt

@ -1493,6 +1493,10 @@ Sets the encoding (either +"ascii"+, +"utf8"+, or +"binary"+) for data that is r
Sends data on the connection. The second parameter specifies the encoding
in the case of a string--it defaults to ASCII because encoding to UTF8 is
rather slow.
+
Returns +true+ if the entire data was flushed successfully to the kernel
buffer. Returns +false+ if all or part of the data was queued in user memory.
+'drain'+ will be emitted when the buffer is again free.
+connection.close()+::

8
src/node_net.cc

@ -633,12 +633,12 @@ Handle<Value> Connection::Write(const Arguments& args) {
}
char * buf = new char[len];
ssize_t written = DecodeWrite(buf, len, args[0], enc);
assert(written == len);
connection->Write(buf, written);
ssize_t bufsize = DecodeWrite(buf, len, args[0], enc);
assert(bufsize == len);
ssize_t sent = connection->Write(buf, bufsize);
delete [] buf;
return scope.Close(Integer::New(written));
return sent == bufsize ? True() : False();
}
void Connection::OnReceive(const void *buf, size_t len) {

4
src/node_net.h

@ -62,8 +62,8 @@ class Connection : public EventEmitter {
return evcom_stream_connect(&stream_, address);
}
void Write(const char *buf, size_t len) {
evcom_stream_write(&stream_, buf, len);
ssize_t Write(const char *buf, size_t len) {
return evcom_stream_write(&stream_, buf, len);
}
void Close() {

67
test/pummel/test-tcp-pause.js

@ -0,0 +1,67 @@
process.mixin(require("../common"));
tcp = require("tcp");
N = 200;
server = tcp.createServer(function (connection) {
function write (j) {
if (j >= N) {
connection.close();
return;
}
setTimeout(function () {
connection.write("C");
write(j+1);
}, 10);
}
write(0);
});
server.listen(PORT);
recv = "";
chars_recved = 0;
client = tcp.createConnection(PORT);
client.setEncoding("ascii");
client.addListener("data", function (d) {
print(d);
recv += d;
});
setTimeout(function () {
chars_recved = recv.length;
puts("pause at: " + chars_recved);
assert.equal(true, chars_recved > 1);
client.pause();
setTimeout(function () {
puts("resume at: " + chars_recved);
assert.equal(chars_recved, recv.length);
client.resume();
setTimeout(function () {
chars_recved = recv.length;
puts("pause at: " + chars_recved);
client.pause();
setTimeout(function () {
puts("resume at: " + chars_recved);
assert.equal(chars_recved, recv.length);
client.resume();
}, 500);
}, 500);
}, 500);
}, 500);
client.addListener("end", function () {
server.close();
client.close();
});
process.addListener("exit", function () {
assert.equal(N, recv.length);
debug("Exit");
});

55
test/pummel/test-tcp-throttle-kernel-buffer.js

@ -1,55 +0,0 @@
process.mixin(require("../common"));
tcp = require("tcp");
N = 30*1024; // 500kb
puts("build big string");
var body = "";
for (var i = 0; i < N; i++) {
body += "C";
}
puts("start server on port " + PORT);
server = tcp.createServer(function (connection) {
connection.addListener("connect", function () {
connection.write(body);
connection.close();
});
});
server.listen(PORT);
chars_recved = 0;
npauses = 0;
var paused = false;
client = tcp.createConnection(PORT);
client.setEncoding("ascii");
client.addListener("data", function (d) {
chars_recved += d.length;
puts("got " + chars_recved);
if (!paused) {
client.pause();
npauses += 1;
paused = true;
puts("pause");
x = chars_recved;
setTimeout(function () {
assert.equal(chars_recved, x);
client.resume();
puts("resume");
paused = false;
}, 100);
}
});
client.addListener("end", function () {
server.close();
client.close();
});
process.addListener("exit", function () {
assert.equal(N, chars_recved);
assert.equal(true, npauses > 2);
});

72
test/pummel/test-tcp-throttle.js

@ -1,60 +1,48 @@
process.mixin(require("../common"));
tcp = require("tcp");
N = 200;
N = 60*1024; // 30kb
puts("build big string");
var body = "";
for (var i = 0; i < N; i++) {
body += "C";
}
puts("start server on port " + PORT);
server = tcp.createServer(function (connection) {
function write (j) {
if (j >= N) {
connection.addListener("connect", function () {
assert.equal(false, connection.write(body));
connection.close();
return;
}
setTimeout(function () {
connection.write("C");
write(j+1);
}, 10);
}
write(0);
});
});
server.listen(PORT);
recv = "";
chars_recved = 0;
npauses = 0;
var paused = false;
client = tcp.createConnection(PORT);
client.setEncoding("ascii");
client.addListener("data", function (d) {
print(d);
recv += d;
});
setTimeout(function () {
chars_recved = recv.length;
puts("pause at: " + chars_recved);
assert.equal(true, chars_recved > 1);
chars_recved += d.length;
puts("got " + chars_recved);
if (!paused) {
client.pause();
npauses += 1;
paused = true;
puts("pause");
x = chars_recved;
setTimeout(function () {
puts("resume at: " + chars_recved);
assert.equal(chars_recved, recv.length);
assert.equal(chars_recved, x);
client.resume();
setTimeout(function () {
chars_recved = recv.length;
puts("pause at: " + chars_recved);
client.pause();
setTimeout(function () {
puts("resume at: " + chars_recved);
assert.equal(chars_recved, recv.length);
client.resume();
}, 500);
}, 500);
}, 500);
}, 500);
puts("resume");
paused = false;
}, 100);
}
});
client.addListener("end", function () {
server.close();
@ -62,6 +50,6 @@ client.addListener("end", function () {
});
process.addListener("exit", function () {
assert.equal(N, recv.length);
debug("Exit");
assert.equal(N, chars_recved);
assert.equal(true, npauses > 2);
});

Loading…
Cancel
Save