diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 7e516155ab..98c2403eb2 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -2,17 +2,41 @@ #include #include +#include #include + +#include /* writev */ +#include +#include /* IOV_MAX */ + +#include +#include + + #include namespace node { using namespace v8; +static ev_prepare dumper; +static Persistent dump_queue; + Persistent IOWatcher::constructor_template; Persistent callback_symbol; +static Persistent next_sym; +static Persistent prev_sym; +static Persistent ondrain_sym; +static Persistent onerror_sym; +static Persistent data_sym; +static Persistent offset_sym; +static Persistent fd_sym; +static Persistent is_unix_socket_sym; +static Persistent first_bucket_sym; +static Persistent last_bucket_sym; + void IOWatcher::Initialize(Handle target) { HandleScope scope; @@ -26,9 +50,32 @@ void IOWatcher::Initialize(Handle target) { NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop); NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set); - target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction()); + Local io_watcher = constructor_template->GetFunction(); + target->Set(String::NewSymbol("IOWatcher"), io_watcher); callback_symbol = NODE_PSYMBOL("callback"); + + next_sym = NODE_PSYMBOL("next"); + prev_sym = NODE_PSYMBOL("prev"); + ondrain_sym = NODE_PSYMBOL("ondrain"); + onerror_sym = NODE_PSYMBOL("onerror"); + first_bucket_sym = NODE_PSYMBOL("firstBucket"); + last_bucket_sym = NODE_PSYMBOL("lastBucket"); + offset_sym = NODE_PSYMBOL("offset"); + fd_sym = NODE_PSYMBOL("fd"); + is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); + data_sym = NODE_PSYMBOL("data"); + + + ev_prepare_init(&dumper, IOWatcher::Dump); + ev_prepare_start(EV_DEFAULT_UC_ &dumper); + // Need to make sure that Dump runs *after* all other prepare watchers - + // in particular the next tick one. + ev_set_priority(&dumper, EV_MINPRI); + ev_unref(EV_DEFAULT_UC); + + dump_queue = Persistent::New(Object::New()); + io_watcher->Set(String::NewSymbol("dumpQueue"), dump_queue); } @@ -143,6 +190,346 @@ Handle IOWatcher::Set(const Arguments& args) { return Undefined(); } +#define KB 1024 + +/* + * A large javascript object structure is built up in net.js. The function + * Dump is called at the end of each iteration, before select() is called, + * to push all the data out to sockets. + * + * The structure looks like this: + * + * IOWatcher . dumpQueue + * | + * watcher . buckets - b - b - b - b + * | + * watcher . buckets - b - b + * | + * watcher . buckets - b + * | + * watcher . buckets - b - b - b + * + * The 'b' nodes are little javascript objects buckets. Each has a 'data' + * member. 'data' is either a string or buffer. E.G. + * + * b = { data: "hello world" } + * + */ + +// To enable this debug output, add '-DDUMP_DEBUG' to CPPFLAGS +// in 'build/c4che/default.cache.py' and 'make clean all' +#ifdef DUMP_DEBUG +#define DEBUG_PRINT(fmt,...) \ + fprintf(stderr, "(dump:%d) " fmt "\n", __LINE__, ##__VA_ARGS__) +#else +#define DEBUG_PRINT(fmt,...) +#endif + + +void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { + assert(revents == EV_PREPARE); + assert(w == &dumper); + + HandleScope scope; + + static struct iovec iov[IOV_MAX]; + + // Loop over all watchers in the dump queue. Each one stands for a socket + // that has stuff to be written out. + // + // There are several possible outcomes for each watcher. + // 1. All the buckets associated with the watcher are written out. In this + // case the watcher is disabled; it is removed from the dump_queue. + // 2. Some of the data was written, but there still remains buckets. In + // this case the watcher is enabled (i.e. we wait for the file + // descriptor to become readable) and we remove it from the dump_queue. + // When it becomes readable, we'll get a callback in net.js and add it + // again to the dump_queue + // 3. writev returns EAGAIN. This is the same as case 2. + // + // In any case, the dump queue should be empty when we exit this function. + // (See the assert at the end of the outermost for loop. + Local watcher_v; + Local watcher; + + for (watcher_v = dump_queue->Get(next_sym); + watcher_v->IsObject(); + dump_queue->Set(next_sym, (watcher_v = watcher->Get(next_sym))), + watcher->Set(next_sym, Null())) { + watcher = watcher_v->ToObject(); + + IOWatcher *io = ObjectWrap::Unwrap(watcher); + + // stats (just for fun) + io->dumps_++; + io->last_dump_ = ev_now(EV_DEFAULT_UC); + + DEBUG_PRINT("dumping fd %d", io->watcher_.fd); + + // Number of items we've stored in iov + int iovcnt = 0; + // Number of bytes we've stored in iov + size_t to_write = 0; + + bool unix_socket = false; + if (watcher->Has(is_unix_socket_sym) && watcher->Get(is_unix_socket_sym)->IsTrue()) { + unix_socket = true; + } + + // Unix sockets don't like huge messages. TCP sockets do. + // TODO: handle EMSGSIZE after sendmsg(). + size_t max_to_write = unix_socket ? 8*KB : 64*KB; + + int fd_to_send = -1; + + // Offset is only as large as the first buffer of data. (See assert + // below) Offset > 0 occurs when a previous writev could not entirely + // drain a bucket. + size_t offset = 0; + if (watcher->Has(offset_sym)) { + offset = watcher->Get(offset_sym)->Uint32Value(); + } + size_t first_offset = offset; + + + // Loop over all the buckets for this particular watcher/socket in order + // to fill iov. + Local bucket_v; + Local bucket; + unsigned int bucket_index = 0; + + for (bucket_v = watcher->Get(first_bucket_sym); + // Break if we have an FD to send. + // sendmsg can only handle one FD at a time. + fd_to_send < 0 && + // break if we've hit the end + bucket_v->IsObject() && + // break if iov contains a lot of data + to_write < max_to_write && + // break if iov is running out of space + iovcnt < IOV_MAX; + bucket_v = bucket->Get(next_sym), bucket_index++) { + assert(bucket_v->IsObject()); + bucket = bucket_v->ToObject(); + + Local data_v = bucket->Get(data_sym); + // net.js will be setting this 'data' value. We can ensure that it is + // never empty. + assert(!data_v.IsEmpty()); + + Local buf_object; + + if (data_v->IsString()) { + // TODO: insert v8::String::Pointers() hack here. + // TODO: handle different encodings. + Local s = data_v->ToString(); + buf_object = Local::New(Buffer::New(s)); + bucket->Set(data_sym, buf_object); + } else if (Buffer::HasInstance(data_v)) { + buf_object = data_v->ToObject(); + } else { + assert(0); + } + + size_t l = Buffer::Length(buf_object); + + assert(first_offset < l); + iov[iovcnt].iov_base = Buffer::Data(buf_object) + first_offset; + iov[iovcnt].iov_len = l - first_offset; + to_write += iov[iovcnt].iov_len; + iovcnt++; + + first_offset = 0; // only the first buffer will be offset. + + if (unix_socket && bucket->Has(fd_sym)) { + Local fd_v = bucket->Get(fd_sym); + if (fd_v->IsInt32()) { + fd_to_send = fd_v->Int32Value(); + DEBUG_PRINT("got fd to send: %d", fd_to_send); + assert(fd_to_send >= 0); + } + } + } + + if (to_write == 0) continue; + + ssize_t written; + + if (unix_socket) { + struct msghdr msg; + char scratch[64]; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iovcnt; + msg.msg_control = NULL; // void* + msg.msg_controllen = 0; // socklen_t + msg.msg_flags = 0; // int + + if (fd_to_send >= 0) { + struct cmsghdr *cmsg; + + msg.msg_control = (void *) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + *(int*) CMSG_DATA(cmsg) = fd_to_send; + } + + written = sendmsg(io->watcher_.fd, &msg, 0); + } else { + written = writev(io->watcher_.fd, iov, iovcnt); + } + + DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", + iovcnt, + to_write, + written); + + if (written < 0) { + // Allow EAGAIN. + // TODO: handle EMSGSIZE after sendmsg(). + if (errno == EAGAIN) { + io->Start(); + } else { + // Emit error event + if (watcher->Has(onerror_sym)) { + Local callback_v = io->handle_->Get(onerror_sym); + assert(callback_v->IsFunction()); + Local callback = Local::Cast(callback_v); + + Local argv[1] = { Integer::New(errno) }; + + TryCatch try_catch; + + callback->Call(io->handle_, 1, argv); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + } + } + // Continue with the next watcher. + continue; + } + + // what about written == 0 ? + + // Now drop the buckets that have been written. + bucket_index = 0; + + while (written > 0) { + bucket_v = watcher->Get(first_bucket_sym); + if (!bucket_v->IsObject()) { + // No more buckets in the queue. Make sure the last_bucket_sym is + // updated and then go to the next watcher. + watcher->Set(last_bucket_sym, Null()); + break; + } + + bucket = bucket_v->ToObject(); + + Local data_v = bucket->Get(data_sym); + assert(!data_v.IsEmpty()); + + // At the moment we're turning all string into buffers + // so we assert that this is not a string. However, when the + // "Pointer patch" lands, this assert will need to be removed. + assert(!data_v->IsString()); + // When the "Pointer patch" lands, we will need to be careful + // to somehow store the length of strings that we're optimizing on + // so that it need not be recalculated here. Note the "Pointer patch" + // will only apply to ASCII strings - UTF8 ones will need to be + // serialized onto a buffer. + size_t bucket_len = Buffer::Length(data_v->ToObject()); + + + if (unix_socket && bucket->Has(fd_sym)) { + bucket->Set(fd_sym, Null()); + } + + + assert(bucket_len > offset); + DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); + + // Only on the first bucket does is the offset > 0. + if (offset + written < bucket_len) { + // we have not written the entire bucket + DEBUG_PRINT("[%ld] Only wrote part of the buffer. " + "setting watcher.offset = %ld", + bucket_index, + offset + written); + + watcher->Set(offset_sym, + Integer::NewFromUnsigned(offset + written)); + break; + } else { + DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.", + bucket_index); + + written -= bucket_len - offset; + + // Offset is now zero + watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); + } + + offset = 0; // the next bucket will have zero offset; + bucket_index++; + + // unshift + watcher->Set(first_bucket_sym, bucket->Get(next_sym)); + } + + // Finished dumping the buckets. + // + // If our list of buckets is empty, we can emit 'drain' and forget about + // this socket. Nothing needs to be done. + // + // Otherwise we need to prepare the io_watcher to wait for the interface + // to become writable again. + + if (watcher->Get(first_bucket_sym)->IsObject()) { + // Still have buckets to be written. Wait for fd to become writable. + io->Start(); + + DEBUG_PRINT("Started watcher %d", io->watcher_.fd); + } else { + // No more buckets in the queue. Make sure the last_bucket_sym is + // updated and then go to the next watcher. + watcher->Set(last_bucket_sym, Null()); + + // Emptied the buckets queue for this socket. Don't wait for it to + // become writable. + io->Stop(); + + DEBUG_PRINT("Stop watcher %d", io->watcher_.fd); + + // Emit drain event + if (watcher->Has(ondrain_sym)) { + Local callback_v = io->handle_->Get(ondrain_sym); + assert(callback_v->IsFunction()); + Local callback = Local::Cast(callback_v); + + TryCatch try_catch; + + callback->Call(io->handle_, 0, NULL); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + } + } + } + + // Assert that the dump_queue is empty. + assert(!dump_queue->Get(next_sym)->IsObject()); +} + + } // namespace node diff --git a/src/node_io_watcher.h b/src/node_io_watcher.h index 06d431ece9..2c2de7809f 100644 --- a/src/node_io_watcher.h +++ b/src/node_io_watcher.h @@ -33,9 +33,15 @@ class IOWatcher : ObjectWrap { private: static void Callback(EV_P_ ev_io *watcher, int revents); + static void Dump(EV_P_ ev_prepare *watcher, int revents); + void Start(); void Stop(); + // stats. TODO: expose to js, add reset() method + uint64_t dumps_; + ev_tstamp last_dump_; + ev_io watcher_; }; diff --git a/src/node_net.cc b/src/node_net.cc index a74e2dcef0..f068fc802f 100644 --- a/src/node_net.cc +++ b/src/node_net.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -37,7 +38,6 @@ #define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a))) - namespace node { using namespace v8; diff --git a/test/simple/test-dumper-unix.js b/test/simple/test-dumper-unix.js new file mode 100644 index 0000000000..d6e7457c72 --- /dev/null +++ b/test/simple/test-dumper-unix.js @@ -0,0 +1,133 @@ +var assert =require('assert'); +var IOWatcher = process.binding('io_watcher').IOWatcher; +var errnoException = process.binding('net').errnoException; +var close = process.binding('net').close; +var net = require('net'); + +var ncomplete = 0; + +function test (N, b, cb) { + var fdsSent = 0; + var fdsRecv = 0; + //console.trace(); + var expected = N * b.length; + var nread = 0; + + // Create a socketpair + var fds = process.binding('net').socketpair(); + + // Use writev/dumper to send data down the one of the sockets, fds[1]. + // This requires a IOWatcher. + var w = new IOWatcher(); + w.set(fds[1], false, true); + w.isUnixSocket = true; + + w.callback = function (readable, writable) { + assert.ok(!readable && writable); // not really important. + // Insert watcher into dumpQueue + w.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = w; + } + + var ndrain = 0; + w.ondrain = function () { + ndrain++; + } + + var nerror = 0; + w.onerror = function (errno) { + throw errnoException(errno); + nerror++; + } + + // The read end, fds[0], will be used to count how much comes through. + // This sets up a readable stream on fds[0]. + var stream = new net.Stream({ fd: fds[0], type: 'unix' }); + //stream.readable = true; + stream.resume(); + + stream.on('fd', function (fd) { + console.log('got fd %d', fd); + fdsRecv++; + }); + + // Count the data as it arrives on the other end + stream.on('data', function (d) { + nread += d.length; + + if (nread >= expected) { + assert.ok(nread === expected); + assert.equal(1, ndrain); + assert.equal(0, nerror); + console.error("done. wrote %d bytes\n", nread); + close(fds[1]); + } + }); + + + stream.on('close', function () { + assert.equal(fdsSent, fdsRecv); + // check to make sure the watcher isn't in the dump queue. + for (var x = IOWatcher.dumpQueue; x; x = x.next) { + assert.ok(x !== w); + } + assert.equal(null, w.next); + // completely flushed + assert.ok(!w.firstBucket); + assert.ok(!w.lastBucket); + + ncomplete++; + if (cb) cb(); + }); + + + // Insert watcher into dumpQueue + w.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = w; + + w.firstBucket = { data: b }; + w.lastBucket = w.firstBucket; + + for (var i = 0; i < N-1; i++) { + var bucket = { data: b }; + w.lastBucket.next = bucket; + w.lastBucket = bucket; + // Kind of randomly fill these buckets with fds. + if (fdsSent < 5 && i % 2 == 0) { + bucket.fd = 1; // send stdout + fdsSent++; + } + } +} + + +function runTests (values) { + expectedToComplete = values.length; + + function go () { + if (ncomplete < values.length) { + var v = values[ncomplete]; + console.log("test N=%d, size=%d", v[0], v[1].length); + test(v[0], v[1], go); + } + } + + go(); +} + +runTests([ [30, Buffer(1000)] + , [4, Buffer(10000)] + , [1, "hello world\n"] + , [50, Buffer(1024*1024)] + , [500, Buffer(40960+1)] + , [500, Buffer(40960-1)] + , [500, Buffer(40960)] + , [500, Buffer(1024*1024+1)] + , [50000, "hello world\n"] + ]); + + +process.on('exit', function () { + assert.equal(expectedToComplete, ncomplete); +}); + diff --git a/test/simple/test-dumper.js b/test/simple/test-dumper.js new file mode 100644 index 0000000000..af9ad87018 --- /dev/null +++ b/test/simple/test-dumper.js @@ -0,0 +1,127 @@ +var assert =require('assert'); +var IOWatcher = process.binding('io_watcher').IOWatcher; +var errnoException = process.binding('net').errnoException; +var close = process.binding('net').close; +var net = require('net'); + +var ncomplete = 0; + + + + + +function test (N, b, cb) { + //console.trace(); + var expected = N * b.length; + var nread = 0; + + // Create a pipe + var fds = process.binding('net').pipe(); + console.log("fds == %j", fds); + + // Use writev/dumper to send data down the write end of the pipe, fds[1]. + // This requires a IOWatcher. + var w = new IOWatcher(); + w.set(fds[1], false, true); + + w.callback = function (readable, writable) { + assert.ok(!readable && writable); // not really important. + // Insert watcher into dumpQueue + w.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = w; + } + + var ndrain = 0; + w.ondrain = function () { + ndrain++; + } + + var nerror = 0; + w.onerror = function (errno) { + throw errnoException(errno); + nerror++; + } + + // The read end, fds[0], will be used to count how much comes through. + // This sets up a readable stream on fds[0]. + var stream = new net.Stream(); + stream.open(fds[0]); + stream.readable = true; + stream.resume(); + + + // Count the data as it arrives on the read end of the pipe. + stream.on('data', function (d) { + nread += d.length; + + if (nread >= expected) { + assert.ok(nread === expected); + assert.equal(1, ndrain); + assert.equal(0, nerror); + console.error("done. wrote %d bytes\n", nread); + close(fds[1]); + } + }); + + stream.on('close', function () { + // check to make sure the watcher isn't in the dump queue. + for (var x = IOWatcher.dumpQueue; x; x = x.next) { + assert.ok(x !== w); + } + assert.equal(null, w.next); + // completely flushed + assert.ok(!w.firstBucket); + assert.ok(!w.lastBucket); + + ncomplete++; + if (cb) cb(); + }); + + + // Insert watcher into dumpQueue + w.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = w; + + w.firstBucket = { data: b }; + w.lastBucket = w.firstBucket; + + for (var i = 0; i < N-1; i++) { + var bucket = { data: b }; + assert.ok(!w.lastBucket.next); + w.lastBucket.next = bucket; + w.lastBucket = bucket; + } +} + + +function runTests (values) { + expectedToComplete = values.length; + + function go () { + if (ncomplete < values.length) { + var v = values[ncomplete]; + console.log("test N=%d, size=%d", v[0], v[1].length); + test(v[0], v[1], go); + } + } + + go(); +} + +runTests([ [3, Buffer(1000)], + [30, Buffer(1000)], + [4, Buffer(10000)], + [1, "hello world\n"], + [50, Buffer(1024*1024)], + [500, Buffer(40960+1)], + [500, Buffer(40960-1)], + [500, Buffer(40960)], + [500, Buffer(1024*1024+1)], + [50000, "hello world\n"] + ]); + + +process.on('exit', function () { + assert.equal(expectedToComplete, ncomplete); +}); +