
IOWatcher::Dump(), writev

v0.7.4-release
Ryan Dahl 14 years ago
commit 913789da3e
  1. src/node_io_watcher.cc (389 changed lines)
  2. src/node_io_watcher.h (6 changed lines)
  3. src/node_net.cc (2 changed lines)
  4. test/simple/test-dumper-unix.js (133 changed lines)
  5. test/simple/test-dumper.js (127 changed lines)

src/node_io_watcher.cc (389 changed lines)

@@ -2,17 +2,41 @@
#include <node_io_watcher.h>
#include <node.h>
#include <node_buffer.h>
#include <v8.h>
#include <sys/uio.h> /* writev */
#include <errno.h>
#include <limits.h> /* IOV_MAX */
#include <sys/types.h>
#include <sys/socket.h>
#include <assert.h>
namespace node {
using namespace v8;
static ev_prepare dumper;
static Persistent<Object> dump_queue;
Persistent<FunctionTemplate> IOWatcher::constructor_template;
Persistent<String> callback_symbol;
static Persistent<String> next_sym;
static Persistent<String> prev_sym;
static Persistent<String> ondrain_sym;
static Persistent<String> onerror_sym;
static Persistent<String> data_sym;
static Persistent<String> offset_sym;
static Persistent<String> fd_sym;
static Persistent<String> is_unix_socket_sym;
static Persistent<String> first_bucket_sym;
static Persistent<String> last_bucket_sym;
void IOWatcher::Initialize(Handle<Object> target) {
HandleScope scope;
@@ -26,9 +50,32 @@ void IOWatcher::Initialize(Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop);
NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set);
Local<Function> io_watcher = constructor_template->GetFunction();
target->Set(String::NewSymbol("IOWatcher"), io_watcher);
callback_symbol = NODE_PSYMBOL("callback");
next_sym = NODE_PSYMBOL("next");
prev_sym = NODE_PSYMBOL("prev");
ondrain_sym = NODE_PSYMBOL("ondrain");
onerror_sym = NODE_PSYMBOL("onerror");
first_bucket_sym = NODE_PSYMBOL("firstBucket");
last_bucket_sym = NODE_PSYMBOL("lastBucket");
offset_sym = NODE_PSYMBOL("offset");
fd_sym = NODE_PSYMBOL("fd");
is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket");
data_sym = NODE_PSYMBOL("data");
ev_prepare_init(&dumper, IOWatcher::Dump);
ev_prepare_start(EV_DEFAULT_UC_ &dumper);
// Need to make sure that Dump runs *after* all other prepare watchers -
// in particular the next tick one.
ev_set_priority(&dumper, EV_MINPRI);
ev_unref(EV_DEFAULT_UC);
dump_queue = Persistent<Object>::New(Object::New());
io_watcher->Set(String::NewSymbol("dumpQueue"), dump_queue);
}
@@ -143,6 +190,346 @@ Handle<Value> IOWatcher::Set(const Arguments& args) {
return Undefined();
}
#define KB 1024
/*
* A large javascript object structure is built up in net.js. The function
* Dump is called at the end of each iteration, before select() is called,
* to push all the data out to sockets.
*
* The structure looks like this:
*
* IOWatcher . dumpQueue
* |
* watcher . buckets - b - b - b - b
* |
* watcher . buckets - b - b
* |
* watcher . buckets - b
* |
* watcher . buckets - b - b - b
*
* The 'b' nodes are little javascript objects called buckets. Each has a 'data'
* member. 'data' is either a string or buffer. E.G.
*
* b = { data: "hello world" }
*
*/
// To enable this debug output, add '-DDUMP_DEBUG' to CPPFLAGS
// in 'build/c4che/default.cache.py' and 'make clean all'
#ifdef DUMP_DEBUG
#define DEBUG_PRINT(fmt,...) \
fprintf(stderr, "(dump:%d) " fmt "\n", __LINE__, ##__VA_ARGS__)
#else
#define DEBUG_PRINT(fmt,...)
#endif
void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
assert(revents == EV_PREPARE);
assert(w == &dumper);
HandleScope scope;
static struct iovec iov[IOV_MAX];
// Loop over all watchers in the dump queue. Each one stands for a socket
// that has stuff to be written out.
//
// There are several possible outcomes for each watcher.
// 1. All the buckets associated with the watcher are written out. In this
// case the watcher is disabled; it is removed from the dump_queue.
// 2. Some of the data was written, but some buckets still remain. In
// this case the watcher is enabled (i.e. we wait for the file
// descriptor to become writable) and we remove it from the dump_queue.
// When it becomes writable, we'll get a callback in net.js and add it
// again to the dump_queue.
// 3. writev returns EAGAIN. This is the same as case 2.
//
// In any case, the dump queue should be empty when we exit this function.
// (See the assert at the end of the outermost for loop.)
Local<Value> watcher_v;
Local<Object> watcher;
for (watcher_v = dump_queue->Get(next_sym);
watcher_v->IsObject();
dump_queue->Set(next_sym, (watcher_v = watcher->Get(next_sym))),
watcher->Set(next_sym, Null())) {
watcher = watcher_v->ToObject();
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(watcher);
// stats (just for fun)
io->dumps_++;
io->last_dump_ = ev_now(EV_DEFAULT_UC);
DEBUG_PRINT("dumping fd %d", io->watcher_.fd);
// Number of items we've stored in iov
int iovcnt = 0;
// Number of bytes we've stored in iov
size_t to_write = 0;
bool unix_socket = false;
if (watcher->Has(is_unix_socket_sym) && watcher->Get(is_unix_socket_sym)->IsTrue()) {
unix_socket = true;
}
// Unix sockets don't like huge messages. TCP sockets do.
// TODO: handle EMSGSIZE after sendmsg().
size_t max_to_write = unix_socket ? 8*KB : 64*KB;
int fd_to_send = -1;
// Offset is only as large as the first buffer of data. (See assert
// below) Offset > 0 occurs when a previous writev could not entirely
// drain a bucket.
size_t offset = 0;
if (watcher->Has(offset_sym)) {
offset = watcher->Get(offset_sym)->Uint32Value();
}
size_t first_offset = offset;
// Loop over all the buckets for this particular watcher/socket in order
// to fill iov.
Local<Value> bucket_v;
Local<Object> bucket;
unsigned int bucket_index = 0;
for (bucket_v = watcher->Get(first_bucket_sym);
// Break if we have an FD to send.
// sendmsg can only handle one FD at a time.
fd_to_send < 0 &&
// break if we've hit the end
bucket_v->IsObject() &&
// break if iov contains a lot of data
to_write < max_to_write &&
// break if iov is running out of space
iovcnt < IOV_MAX;
bucket_v = bucket->Get(next_sym), bucket_index++) {
assert(bucket_v->IsObject());
bucket = bucket_v->ToObject();
Local<Value> data_v = bucket->Get(data_sym);
// net.js will be setting this 'data' value. We can ensure that it is
// never empty.
assert(!data_v.IsEmpty());
Local<Object> buf_object;
if (data_v->IsString()) {
// TODO: insert v8::String::Pointers() hack here.
// TODO: handle different encodings.
Local<String> s = data_v->ToString();
buf_object = Local<Object>::New(Buffer::New(s));
bucket->Set(data_sym, buf_object);
} else if (Buffer::HasInstance(data_v)) {
buf_object = data_v->ToObject();
} else {
assert(0);
}
size_t l = Buffer::Length(buf_object);
assert(first_offset < l);
iov[iovcnt].iov_base = Buffer::Data(buf_object) + first_offset;
iov[iovcnt].iov_len = l - first_offset;
to_write += iov[iovcnt].iov_len;
iovcnt++;
first_offset = 0; // only the first buffer will be offset.
if (unix_socket && bucket->Has(fd_sym)) {
Local<Value> fd_v = bucket->Get(fd_sym);
if (fd_v->IsInt32()) {
fd_to_send = fd_v->Int32Value();
DEBUG_PRINT("got fd to send: %d", fd_to_send);
assert(fd_to_send >= 0);
}
}
}
if (to_write == 0) continue;
ssize_t written;
if (unix_socket) {
struct msghdr msg;
char scratch[64];
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = iovcnt;
msg.msg_control = NULL; // void*
msg.msg_controllen = 0; // socklen_t
msg.msg_flags = 0; // int
if (fd_to_send >= 0) {
struct cmsghdr *cmsg;
msg.msg_control = (void *) scratch;
msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = msg.msg_controllen;
*(int*) CMSG_DATA(cmsg) = fd_to_send;
}
written = sendmsg(io->watcher_.fd, &msg, 0);
} else {
written = writev(io->watcher_.fd, iov, iovcnt);
}
DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld",
iovcnt,
to_write,
written);
if (written < 0) {
// Allow EAGAIN.
// TODO: handle EMSGSIZE after sendmsg().
if (errno == EAGAIN) {
io->Start();
} else {
// Emit error event
if (watcher->Has(onerror_sym)) {
Local<Value> callback_v = io->handle_->Get(onerror_sym);
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);
Local<Value> argv[1] = { Integer::New(errno) };
TryCatch try_catch;
callback->Call(io->handle_, 1, argv);
if (try_catch.HasCaught()) {
FatalException(try_catch);
}
}
}
// Continue with the next watcher.
continue;
}
// what about written == 0 ?
// Now drop the buckets that have been written.
bucket_index = 0;
while (written > 0) {
bucket_v = watcher->Get(first_bucket_sym);
if (!bucket_v->IsObject()) {
// No more buckets in the queue. Make sure the last_bucket_sym is
// updated and then go to the next watcher.
watcher->Set(last_bucket_sym, Null());
break;
}
bucket = bucket_v->ToObject();
Local<Value> data_v = bucket->Get(data_sym);
assert(!data_v.IsEmpty());
// At the moment we're turning all strings into buffers
// so we assert that this is not a string. However, when the
// "Pointer patch" lands, this assert will need to be removed.
assert(!data_v->IsString());
// When the "Pointer patch" lands, we will need to be careful
// to somehow store the length of strings that we're optimizing on
// so that it need not be recalculated here. Note the "Pointer patch"
// will only apply to ASCII strings - UTF8 ones will need to be
// serialized onto a buffer.
size_t bucket_len = Buffer::Length(data_v->ToObject());
if (unix_socket && bucket->Has(fd_sym)) {
bucket->Set(fd_sym, Null());
}
assert(bucket_len > offset);
DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset);
// Only on the first bucket is the offset > 0.
if (offset + written < bucket_len) {
// we have not written the entire bucket
DEBUG_PRINT("[%ld] Only wrote part of the buffer. "
"setting watcher.offset = %ld",
bucket_index,
offset + written);
watcher->Set(offset_sym,
Integer::NewFromUnsigned(offset + written));
break;
} else {
DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.",
bucket_index);
written -= bucket_len - offset;
// Offset is now zero
watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
}
offset = 0; // the next bucket will have zero offset;
bucket_index++;
// unshift
watcher->Set(first_bucket_sym, bucket->Get(next_sym));
}
// Finished dumping the buckets.
//
// If our list of buckets is empty, we can emit 'drain' and forget about
// this socket. Nothing needs to be done.
//
// Otherwise we need to prepare the io_watcher to wait for the interface
// to become writable again.
if (watcher->Get(first_bucket_sym)->IsObject()) {
// Still have buckets to be written. Wait for fd to become writable.
io->Start();
DEBUG_PRINT("Started watcher %d", io->watcher_.fd);
} else {
// No more buckets in the queue. Make sure the last_bucket_sym is
// updated and then go to the next watcher.
watcher->Set(last_bucket_sym, Null());
// Emptied the buckets queue for this socket. Don't wait for it to
// become writable.
io->Stop();
DEBUG_PRINT("Stop watcher %d", io->watcher_.fd);
// Emit drain event
if (watcher->Has(ondrain_sym)) {
Local<Value> callback_v = io->handle_->Get(ondrain_sym);
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);
TryCatch try_catch;
callback->Call(io->handle_, 0, NULL);
if (try_catch.HasCaught()) {
FatalException(try_catch);
}
}
}
}
// Assert that the dump_queue is empty.
assert(!dump_queue->Get(next_sym)->IsObject());
}
} // namespace node
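
For context, Dump() above is built around the writev(2) scatter-gather pattern its comments describe: gather every pending bucket into an iovec array, write them all with one system call, then account for a short write or EAGAIN. Below is a minimal standalone sketch of that pattern, separate from the commit; the bucket strings, the flush_buckets name, and the use of stdout are illustrative only.

/* Minimal sketch of the scatter-gather write pattern Dump() relies on.
 * Not part of the commit; buffer contents and the target fd are illustrative. */
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/uio.h>   /* writev, struct iovec */
#include <unistd.h>    /* STDOUT_FILENO */

static int flush_buckets(int fd) {
  const char *bucket[] = { "hello ", "world", "\n" };   /* pending data */
  struct iovec iov[3];
  size_t to_write = 0;

  for (int i = 0; i < 3; i++) {
    iov[i].iov_base = (void *) bucket[i];
    iov[i].iov_len = strlen(bucket[i]);
    to_write += iov[i].iov_len;
  }

  ssize_t written = writev(fd, iov, 3);   /* one syscall for all buckets */

  if (written < 0) {
    if (errno == EAGAIN) return 0;        /* fd full: wait for writability */
    perror("writev");
    return -1;
  }

  if ((size_t) written < to_write) {
    /* Partial write: like Dump(), remember how far into the first
     * unwritten bucket we got and retry the remainder later. */
    fprintf(stderr, "wrote %zd of %zu bytes\n", written, to_write);
  }
  return 0;
}

int main(void) {
  return flush_buckets(STDOUT_FILENO) < 0 ? 1 : 0;
}

Dump() does the same per-watcher bookkeeping, additionally persisting the partial-write position in the watcher's 'offset' property so the next pass can resume mid-bucket.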

src/node_io_watcher.h (6 changed lines)

@@ -33,9 +33,15 @@ class IOWatcher : ObjectWrap {
private:
static void Callback(EV_P_ ev_io *watcher, int revents);
static void Dump(EV_P_ ev_prepare *watcher, int revents);
void Start();
void Stop();
// stats. TODO: expose to js, add reset() method
uint64_t dumps_;
ev_tstamp last_dump_;
ev_io watcher_;
};
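
The Dump declaration added here is the callback that Initialize() (in node_io_watcher.cc above) registers as a libev prepare watcher, so it runs once per loop iteration just before the loop polls for I/O. Below is a minimal standalone sketch of that registration pattern, separate from the commit; it assumes libev, and dump_cb with its body is a placeholder.

/* Sketch of the ev_prepare hook-up used for IOWatcher::Dump().
 * Not part of the commit; dump_cb is a placeholder callback. */
#include <ev.h>
#include <stdio.h>

static ev_prepare dumper;

static void dump_cb(EV_P_ ev_prepare *w, int revents) {
  /* Runs every loop iteration, right before the loop polls for I/O. */
  fprintf(stderr, "about to block; flush pending writes here\n");
}

int main(void) {
  struct ev_loop *loop = EV_DEFAULT;

  ev_prepare_init(&dumper, dump_cb);
  /* Lowest priority so it runs after all other prepare watchers
   * (in particular the next-tick one), as Initialize() does. */
  ev_set_priority(&dumper, EV_MINPRI);
  ev_prepare_start(loop, &dumper);
  ev_unref(loop);   /* don't let this internal watcher keep the loop alive */

  /* ev_loop(loop, 0) on the libev 3.x bundled at the time; with nothing
   * else registered the loop exits almost immediately. */
  ev_run(loop, 0);
  return 0;
}

EV_MINPRI and ev_unref mirror the Initialize() code above: the flush must run after every other prepare watcher, and the internal dumper watcher should not by itself keep the event loop running.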

src/node_net.cc (2 changed lines)

@@ -3,6 +3,7 @@
#include <node.h>
#include <node_buffer.h>
#include <node_io_watcher.h>
#include <string.h>
#include <stdlib.h>
@@ -37,7 +38,6 @@
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a)))
namespace node {
using namespace v8;

test/simple/test-dumper-unix.js (133 changed lines)

@@ -0,0 +1,133 @@
var assert = require('assert');
var IOWatcher = process.binding('io_watcher').IOWatcher;
var errnoException = process.binding('net').errnoException;
var close = process.binding('net').close;
var net = require('net');
var ncomplete = 0;
function test (N, b, cb) {
var fdsSent = 0;
var fdsRecv = 0;
//console.trace();
var expected = N * b.length;
var nread = 0;
// Create a socketpair
var fds = process.binding('net').socketpair();
// Use writev/dumper to send data down the one of the sockets, fds[1].
// This requires an IOWatcher.
var w = new IOWatcher();
w.set(fds[1], false, true);
w.isUnixSocket = true;
w.callback = function (readable, writable) {
assert.ok(!readable && writable); // not really important.
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
}
var ndrain = 0;
w.ondrain = function () {
ndrain++;
}
var nerror = 0;
w.onerror = function (errno) {
throw errnoException(errno);
nerror++;
}
// The read end, fds[0], will be used to count how much comes through.
// This sets up a readable stream on fds[0].
var stream = new net.Stream({ fd: fds[0], type: 'unix' });
//stream.readable = true;
stream.resume();
stream.on('fd', function (fd) {
console.log('got fd %d', fd);
fdsRecv++;
});
// Count the data as it arrives on the other end
stream.on('data', function (d) {
nread += d.length;
if (nread >= expected) {
assert.ok(nread === expected);
assert.equal(1, ndrain);
assert.equal(0, nerror);
console.error("done. wrote %d bytes\n", nread);
close(fds[1]);
}
});
stream.on('close', function () {
assert.equal(fdsSent, fdsRecv);
// check to make sure the watcher isn't in the dump queue.
for (var x = IOWatcher.dumpQueue; x; x = x.next) {
assert.ok(x !== w);
}
assert.equal(null, w.next);
// completely flushed
assert.ok(!w.firstBucket);
assert.ok(!w.lastBucket);
ncomplete++;
if (cb) cb();
});
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
w.firstBucket = { data: b };
w.lastBucket = w.firstBucket;
for (var i = 0; i < N-1; i++) {
var bucket = { data: b };
w.lastBucket.next = bucket;
w.lastBucket = bucket;
// Kind of randomly fill these buckets with fds.
if (fdsSent < 5 && i % 2 == 0) {
bucket.fd = 1; // send stdout
fdsSent++;
}
}
}
function runTests (values) {
expectedToComplete = values.length;
function go () {
if (ncomplete < values.length) {
var v = values[ncomplete];
console.log("test N=%d, size=%d", v[0], v[1].length);
test(v[0], v[1], go);
}
}
go();
}
runTests([ [30, Buffer(1000)]
, [4, Buffer(10000)]
, [1, "hello world\n"]
, [50, Buffer(1024*1024)]
, [500, Buffer(40960+1)]
, [500, Buffer(40960-1)]
, [500, Buffer(40960)]
, [500, Buffer(1024*1024+1)]
, [50000, "hello world\n"]
]);
process.on('exit', function () {
assert.equal(expectedToComplete, ncomplete);
});
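
The fd-passing side of this test exercises the sendmsg()/SCM_RIGHTS branch that Dump() takes when a bucket on a unix socket carries an 'fd'. As background, here is a minimal standalone sketch of descriptor passing over a socketpair, separate from the commit; the send_fd helper and the one-byte payload are illustrative only.

/* Sketch of SCM_RIGHTS descriptor passing, the mechanism Dump() uses when
 * a bucket on a unix socket carries an 'fd'. Not part of the commit. */
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Send one byte of ordinary data plus the descriptor fd_to_send. */
static ssize_t send_fd(int sock, int fd_to_send) {
  char byte = 'x';
  struct iovec iov;
  iov.iov_base = &byte;
  iov.iov_len = 1;

  char control[CMSG_SPACE(sizeof(int))];
  struct msghdr msg;
  memset(&msg, 0, sizeof(msg));
  msg.msg_iov = &iov;
  msg.msg_iovlen = 1;
  msg.msg_control = control;
  msg.msg_controllen = sizeof(control);

  struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
  cmsg->cmsg_level = SOL_SOCKET;
  cmsg->cmsg_type = SCM_RIGHTS;               /* payload is descriptors */
  cmsg->cmsg_len = CMSG_LEN(sizeof(int));
  memcpy(CMSG_DATA(cmsg), &fd_to_send, sizeof(int));

  return sendmsg(sock, &msg, 0);
}

int main(void) {
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0)
    return 1;
  /* Send our stdout descriptor down one end, as the test does with fd = 1. */
  return send_fd(fds[1], STDOUT_FILENO) < 0 ? 1 : 0;
}

On the receiving end, net.js surfaces the passed descriptor through the stream's 'fd' event, which is what the test counts in fdsRecv.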

test/simple/test-dumper.js (127 changed lines)

@@ -0,0 +1,127 @@
var assert = require('assert');
var IOWatcher = process.binding('io_watcher').IOWatcher;
var errnoException = process.binding('net').errnoException;
var close = process.binding('net').close;
var net = require('net');
var ncomplete = 0;
function test (N, b, cb) {
//console.trace();
var expected = N * b.length;
var nread = 0;
// Create a pipe
var fds = process.binding('net').pipe();
console.log("fds == %j", fds);
// Use writev/dumper to send data down the write end of the pipe, fds[1].
// This requires an IOWatcher.
var w = new IOWatcher();
w.set(fds[1], false, true);
w.callback = function (readable, writable) {
assert.ok(!readable && writable); // not really important.
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
}
var ndrain = 0;
w.ondrain = function () {
ndrain++;
}
var nerror = 0;
w.onerror = function (errno) {
throw errnoException(errno);
nerror++;
}
// The read end, fds[0], will be used to count how much comes through.
// This sets up a readable stream on fds[0].
var stream = new net.Stream();
stream.open(fds[0]);
stream.readable = true;
stream.resume();
// Count the data as it arrives on the read end of the pipe.
stream.on('data', function (d) {
nread += d.length;
if (nread >= expected) {
assert.ok(nread === expected);
assert.equal(1, ndrain);
assert.equal(0, nerror);
console.error("done. wrote %d bytes\n", nread);
close(fds[1]);
}
});
stream.on('close', function () {
// check to make sure the watcher isn't in the dump queue.
for (var x = IOWatcher.dumpQueue; x; x = x.next) {
assert.ok(x !== w);
}
assert.equal(null, w.next);
// completely flushed
assert.ok(!w.firstBucket);
assert.ok(!w.lastBucket);
ncomplete++;
if (cb) cb();
});
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
w.firstBucket = { data: b };
w.lastBucket = w.firstBucket;
for (var i = 0; i < N-1; i++) {
var bucket = { data: b };
assert.ok(!w.lastBucket.next);
w.lastBucket.next = bucket;
w.lastBucket = bucket;
}
}
function runTests (values) {
expectedToComplete = values.length;
function go () {
if (ncomplete < values.length) {
var v = values[ncomplete];
console.log("test N=%d, size=%d", v[0], v[1].length);
test(v[0], v[1], go);
}
}
go();
}
runTests([ [3, Buffer(1000)],
[30, Buffer(1000)],
[4, Buffer(10000)],
[1, "hello world\n"],
[50, Buffer(1024*1024)],
[500, Buffer(40960+1)],
[500, Buffer(40960-1)],
[500, Buffer(40960)],
[500, Buffer(1024*1024+1)],
[50000, "hello world\n"]
]);
process.on('exit', function () {
assert.equal(expectedToComplete, ncomplete);
});