From 913789da3e33a0777bd26ccf022b8264521860ef Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 2 Nov 2010 15:51:16 -0700 Subject: [PATCH 01/20] IOWatcher::Dump(), writev --- src/node_io_watcher.cc | 389 +++++++++++++++++++++++++++++++- src/node_io_watcher.h | 6 + src/node_net.cc | 2 +- test/simple/test-dumper-unix.js | 133 +++++++++++ test/simple/test-dumper.js | 127 +++++++++++ 5 files changed, 655 insertions(+), 2 deletions(-) create mode 100644 test/simple/test-dumper-unix.js create mode 100644 test/simple/test-dumper.js diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 7e516155ab..98c2403eb2 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -2,17 +2,41 @@ #include #include +#include #include + +#include /* writev */ +#include +#include /* IOV_MAX */ + +#include +#include + + #include namespace node { using namespace v8; +static ev_prepare dumper; +static Persistent dump_queue; + Persistent IOWatcher::constructor_template; Persistent callback_symbol; +static Persistent next_sym; +static Persistent prev_sym; +static Persistent ondrain_sym; +static Persistent onerror_sym; +static Persistent data_sym; +static Persistent offset_sym; +static Persistent fd_sym; +static Persistent is_unix_socket_sym; +static Persistent first_bucket_sym; +static Persistent last_bucket_sym; + void IOWatcher::Initialize(Handle target) { HandleScope scope; @@ -26,9 +50,32 @@ void IOWatcher::Initialize(Handle target) { NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop); NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set); - target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction()); + Local io_watcher = constructor_template->GetFunction(); + target->Set(String::NewSymbol("IOWatcher"), io_watcher); callback_symbol = NODE_PSYMBOL("callback"); + + next_sym = NODE_PSYMBOL("next"); + prev_sym = NODE_PSYMBOL("prev"); + ondrain_sym = NODE_PSYMBOL("ondrain"); + onerror_sym = NODE_PSYMBOL("onerror"); + first_bucket_sym = NODE_PSYMBOL("firstBucket"); + last_bucket_sym = NODE_PSYMBOL("lastBucket"); + offset_sym = NODE_PSYMBOL("offset"); + fd_sym = NODE_PSYMBOL("fd"); + is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); + data_sym = NODE_PSYMBOL("data"); + + + ev_prepare_init(&dumper, IOWatcher::Dump); + ev_prepare_start(EV_DEFAULT_UC_ &dumper); + // Need to make sure that Dump runs *after* all other prepare watchers - + // in particular the next tick one. + ev_set_priority(&dumper, EV_MINPRI); + ev_unref(EV_DEFAULT_UC); + + dump_queue = Persistent::New(Object::New()); + io_watcher->Set(String::NewSymbol("dumpQueue"), dump_queue); } @@ -143,6 +190,346 @@ Handle IOWatcher::Set(const Arguments& args) { return Undefined(); } +#define KB 1024 + +/* + * A large javascript object structure is built up in net.js. The function + * Dump is called at the end of each iteration, before select() is called, + * to push all the data out to sockets. + * + * The structure looks like this: + * + * IOWatcher . dumpQueue + * | + * watcher . buckets - b - b - b - b + * | + * watcher . buckets - b - b + * | + * watcher . buckets - b + * | + * watcher . buckets - b - b - b + * + * The 'b' nodes are little javascript objects buckets. Each has a 'data' + * member. 'data' is either a string or buffer. E.G. + * + * b = { data: "hello world" } + * + */ + +// To enable this debug output, add '-DDUMP_DEBUG' to CPPFLAGS +// in 'build/c4che/default.cache.py' and 'make clean all' +#ifdef DUMP_DEBUG +#define DEBUG_PRINT(fmt,...) 
\ + fprintf(stderr, "(dump:%d) " fmt "\n", __LINE__, ##__VA_ARGS__) +#else +#define DEBUG_PRINT(fmt,...) +#endif + + +void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { + assert(revents == EV_PREPARE); + assert(w == &dumper); + + HandleScope scope; + + static struct iovec iov[IOV_MAX]; + + // Loop over all watchers in the dump queue. Each one stands for a socket + // that has stuff to be written out. + // + // There are several possible outcomes for each watcher. + // 1. All the buckets associated with the watcher are written out. In this + // case the watcher is disabled; it is removed from the dump_queue. + // 2. Some of the data was written, but there still remains buckets. In + // this case the watcher is enabled (i.e. we wait for the file + // descriptor to become readable) and we remove it from the dump_queue. + // When it becomes readable, we'll get a callback in net.js and add it + // again to the dump_queue + // 3. writev returns EAGAIN. This is the same as case 2. + // + // In any case, the dump queue should be empty when we exit this function. + // (See the assert at the end of the outermost for loop. + Local watcher_v; + Local watcher; + + for (watcher_v = dump_queue->Get(next_sym); + watcher_v->IsObject(); + dump_queue->Set(next_sym, (watcher_v = watcher->Get(next_sym))), + watcher->Set(next_sym, Null())) { + watcher = watcher_v->ToObject(); + + IOWatcher *io = ObjectWrap::Unwrap(watcher); + + // stats (just for fun) + io->dumps_++; + io->last_dump_ = ev_now(EV_DEFAULT_UC); + + DEBUG_PRINT("dumping fd %d", io->watcher_.fd); + + // Number of items we've stored in iov + int iovcnt = 0; + // Number of bytes we've stored in iov + size_t to_write = 0; + + bool unix_socket = false; + if (watcher->Has(is_unix_socket_sym) && watcher->Get(is_unix_socket_sym)->IsTrue()) { + unix_socket = true; + } + + // Unix sockets don't like huge messages. TCP sockets do. + // TODO: handle EMSGSIZE after sendmsg(). + size_t max_to_write = unix_socket ? 8*KB : 64*KB; + + int fd_to_send = -1; + + // Offset is only as large as the first buffer of data. (See assert + // below) Offset > 0 occurs when a previous writev could not entirely + // drain a bucket. + size_t offset = 0; + if (watcher->Has(offset_sym)) { + offset = watcher->Get(offset_sym)->Uint32Value(); + } + size_t first_offset = offset; + + + // Loop over all the buckets for this particular watcher/socket in order + // to fill iov. + Local bucket_v; + Local bucket; + unsigned int bucket_index = 0; + + for (bucket_v = watcher->Get(first_bucket_sym); + // Break if we have an FD to send. + // sendmsg can only handle one FD at a time. + fd_to_send < 0 && + // break if we've hit the end + bucket_v->IsObject() && + // break if iov contains a lot of data + to_write < max_to_write && + // break if iov is running out of space + iovcnt < IOV_MAX; + bucket_v = bucket->Get(next_sym), bucket_index++) { + assert(bucket_v->IsObject()); + bucket = bucket_v->ToObject(); + + Local data_v = bucket->Get(data_sym); + // net.js will be setting this 'data' value. We can ensure that it is + // never empty. + assert(!data_v.IsEmpty()); + + Local buf_object; + + if (data_v->IsString()) { + // TODO: insert v8::String::Pointers() hack here. + // TODO: handle different encodings. 
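+ // Convert the string to a Buffer once and store it back on the bucket, so a
+ // later Dump() pass sees a Buffer and takes the fast path below.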
+ Local s = data_v->ToString(); + buf_object = Local::New(Buffer::New(s)); + bucket->Set(data_sym, buf_object); + } else if (Buffer::HasInstance(data_v)) { + buf_object = data_v->ToObject(); + } else { + assert(0); + } + + size_t l = Buffer::Length(buf_object); + + assert(first_offset < l); + iov[iovcnt].iov_base = Buffer::Data(buf_object) + first_offset; + iov[iovcnt].iov_len = l - first_offset; + to_write += iov[iovcnt].iov_len; + iovcnt++; + + first_offset = 0; // only the first buffer will be offset. + + if (unix_socket && bucket->Has(fd_sym)) { + Local fd_v = bucket->Get(fd_sym); + if (fd_v->IsInt32()) { + fd_to_send = fd_v->Int32Value(); + DEBUG_PRINT("got fd to send: %d", fd_to_send); + assert(fd_to_send >= 0); + } + } + } + + if (to_write == 0) continue; + + ssize_t written; + + if (unix_socket) { + struct msghdr msg; + char scratch[64]; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iovcnt; + msg.msg_control = NULL; // void* + msg.msg_controllen = 0; // socklen_t + msg.msg_flags = 0; // int + + if (fd_to_send >= 0) { + struct cmsghdr *cmsg; + + msg.msg_control = (void *) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + *(int*) CMSG_DATA(cmsg) = fd_to_send; + } + + written = sendmsg(io->watcher_.fd, &msg, 0); + } else { + written = writev(io->watcher_.fd, iov, iovcnt); + } + + DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", + iovcnt, + to_write, + written); + + if (written < 0) { + // Allow EAGAIN. + // TODO: handle EMSGSIZE after sendmsg(). + if (errno == EAGAIN) { + io->Start(); + } else { + // Emit error event + if (watcher->Has(onerror_sym)) { + Local callback_v = io->handle_->Get(onerror_sym); + assert(callback_v->IsFunction()); + Local callback = Local::Cast(callback_v); + + Local argv[1] = { Integer::New(errno) }; + + TryCatch try_catch; + + callback->Call(io->handle_, 1, argv); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + } + } + // Continue with the next watcher. + continue; + } + + // what about written == 0 ? + + // Now drop the buckets that have been written. + bucket_index = 0; + + while (written > 0) { + bucket_v = watcher->Get(first_bucket_sym); + if (!bucket_v->IsObject()) { + // No more buckets in the queue. Make sure the last_bucket_sym is + // updated and then go to the next watcher. + watcher->Set(last_bucket_sym, Null()); + break; + } + + bucket = bucket_v->ToObject(); + + Local data_v = bucket->Get(data_sym); + assert(!data_v.IsEmpty()); + + // At the moment we're turning all string into buffers + // so we assert that this is not a string. However, when the + // "Pointer patch" lands, this assert will need to be removed. + assert(!data_v->IsString()); + // When the "Pointer patch" lands, we will need to be careful + // to somehow store the length of strings that we're optimizing on + // so that it need not be recalculated here. Note the "Pointer patch" + // will only apply to ASCII strings - UTF8 ones will need to be + // serialized onto a buffer. + size_t bucket_len = Buffer::Length(data_v->ToObject()); + + + if (unix_socket && bucket->Has(fd_sym)) { + bucket->Set(fd_sym, Null()); + } + + + assert(bucket_len > offset); + DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); + + // Only on the first bucket does is the offset > 0. 
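+ // 'written' counts bytes the kernel accepted; if it does not cover the rest
+ // of this bucket, remember the new offset and stop dropping buckets.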
+ if (offset + written < bucket_len) { + // we have not written the entire bucket + DEBUG_PRINT("[%ld] Only wrote part of the buffer. " + "setting watcher.offset = %ld", + bucket_index, + offset + written); + + watcher->Set(offset_sym, + Integer::NewFromUnsigned(offset + written)); + break; + } else { + DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.", + bucket_index); + + written -= bucket_len - offset; + + // Offset is now zero + watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); + } + + offset = 0; // the next bucket will have zero offset; + bucket_index++; + + // unshift + watcher->Set(first_bucket_sym, bucket->Get(next_sym)); + } + + // Finished dumping the buckets. + // + // If our list of buckets is empty, we can emit 'drain' and forget about + // this socket. Nothing needs to be done. + // + // Otherwise we need to prepare the io_watcher to wait for the interface + // to become writable again. + + if (watcher->Get(first_bucket_sym)->IsObject()) { + // Still have buckets to be written. Wait for fd to become writable. + io->Start(); + + DEBUG_PRINT("Started watcher %d", io->watcher_.fd); + } else { + // No more buckets in the queue. Make sure the last_bucket_sym is + // updated and then go to the next watcher. + watcher->Set(last_bucket_sym, Null()); + + // Emptied the buckets queue for this socket. Don't wait for it to + // become writable. + io->Stop(); + + DEBUG_PRINT("Stop watcher %d", io->watcher_.fd); + + // Emit drain event + if (watcher->Has(ondrain_sym)) { + Local callback_v = io->handle_->Get(ondrain_sym); + assert(callback_v->IsFunction()); + Local callback = Local::Cast(callback_v); + + TryCatch try_catch; + + callback->Call(io->handle_, 0, NULL); + + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + } + } + } + + // Assert that the dump_queue is empty. + assert(!dump_queue->Get(next_sym)->IsObject()); +} + + } // namespace node diff --git a/src/node_io_watcher.h b/src/node_io_watcher.h index 06d431ece9..2c2de7809f 100644 --- a/src/node_io_watcher.h +++ b/src/node_io_watcher.h @@ -33,9 +33,15 @@ class IOWatcher : ObjectWrap { private: static void Callback(EV_P_ ev_io *watcher, int revents); + static void Dump(EV_P_ ev_prepare *watcher, int revents); + void Start(); void Stop(); + // stats. TODO: expose to js, add reset() method + uint64_t dumps_; + ev_tstamp last_dump_; + ev_io watcher_; }; diff --git a/src/node_net.cc b/src/node_net.cc index a74e2dcef0..f068fc802f 100644 --- a/src/node_net.cc +++ b/src/node_net.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -37,7 +38,6 @@ #define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a))) - namespace node { using namespace v8; diff --git a/test/simple/test-dumper-unix.js b/test/simple/test-dumper-unix.js new file mode 100644 index 0000000000..d6e7457c72 --- /dev/null +++ b/test/simple/test-dumper-unix.js @@ -0,0 +1,133 @@ +var assert =require('assert'); +var IOWatcher = process.binding('io_watcher').IOWatcher; +var errnoException = process.binding('net').errnoException; +var close = process.binding('net').close; +var net = require('net'); + +var ncomplete = 0; + +function test (N, b, cb) { + var fdsSent = 0; + var fdsRecv = 0; + //console.trace(); + var expected = N * b.length; + var nread = 0; + + // Create a socketpair + var fds = process.binding('net').socketpair(); + + // Use writev/dumper to send data down the one of the sockets, fds[1]. + // This requires a IOWatcher. 
+ var w = new IOWatcher(); + w.set(fds[1], false, true); + w.isUnixSocket = true; + + w.callback = function (readable, writable) { + assert.ok(!readable && writable); // not really important. + // Insert watcher into dumpQueue + w.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = w; + } + + var ndrain = 0; + w.ondrain = function () { + ndrain++; + } + + var nerror = 0; + w.onerror = function (errno) { + throw errnoException(errno); + nerror++; + } + + // The read end, fds[0], will be used to count how much comes through. + // This sets up a readable stream on fds[0]. + var stream = new net.Stream({ fd: fds[0], type: 'unix' }); + //stream.readable = true; + stream.resume(); + + stream.on('fd', function (fd) { + console.log('got fd %d', fd); + fdsRecv++; + }); + + // Count the data as it arrives on the other end + stream.on('data', function (d) { + nread += d.length; + + if (nread >= expected) { + assert.ok(nread === expected); + assert.equal(1, ndrain); + assert.equal(0, nerror); + console.error("done. wrote %d bytes\n", nread); + close(fds[1]); + } + }); + + + stream.on('close', function () { + assert.equal(fdsSent, fdsRecv); + // check to make sure the watcher isn't in the dump queue. + for (var x = IOWatcher.dumpQueue; x; x = x.next) { + assert.ok(x !== w); + } + assert.equal(null, w.next); + // completely flushed + assert.ok(!w.firstBucket); + assert.ok(!w.lastBucket); + + ncomplete++; + if (cb) cb(); + }); + + + // Insert watcher into dumpQueue + w.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = w; + + w.firstBucket = { data: b }; + w.lastBucket = w.firstBucket; + + for (var i = 0; i < N-1; i++) { + var bucket = { data: b }; + w.lastBucket.next = bucket; + w.lastBucket = bucket; + // Kind of randomly fill these buckets with fds. + if (fdsSent < 5 && i % 2 == 0) { + bucket.fd = 1; // send stdout + fdsSent++; + } + } +} + + +function runTests (values) { + expectedToComplete = values.length; + + function go () { + if (ncomplete < values.length) { + var v = values[ncomplete]; + console.log("test N=%d, size=%d", v[0], v[1].length); + test(v[0], v[1], go); + } + } + + go(); +} + +runTests([ [30, Buffer(1000)] + , [4, Buffer(10000)] + , [1, "hello world\n"] + , [50, Buffer(1024*1024)] + , [500, Buffer(40960+1)] + , [500, Buffer(40960-1)] + , [500, Buffer(40960)] + , [500, Buffer(1024*1024+1)] + , [50000, "hello world\n"] + ]); + + +process.on('exit', function () { + assert.equal(expectedToComplete, ncomplete); +}); + diff --git a/test/simple/test-dumper.js b/test/simple/test-dumper.js new file mode 100644 index 0000000000..af9ad87018 --- /dev/null +++ b/test/simple/test-dumper.js @@ -0,0 +1,127 @@ +var assert =require('assert'); +var IOWatcher = process.binding('io_watcher').IOWatcher; +var errnoException = process.binding('net').errnoException; +var close = process.binding('net').close; +var net = require('net'); + +var ncomplete = 0; + + + + + +function test (N, b, cb) { + //console.trace(); + var expected = N * b.length; + var nread = 0; + + // Create a pipe + var fds = process.binding('net').pipe(); + console.log("fds == %j", fds); + + // Use writev/dumper to send data down the write end of the pipe, fds[1]. + // This requires a IOWatcher. + var w = new IOWatcher(); + w.set(fds[1], false, true); + + w.callback = function (readable, writable) { + assert.ok(!readable && writable); // not really important. 
+ // Insert watcher into dumpQueue + w.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = w; + } + + var ndrain = 0; + w.ondrain = function () { + ndrain++; + } + + var nerror = 0; + w.onerror = function (errno) { + throw errnoException(errno); + nerror++; + } + + // The read end, fds[0], will be used to count how much comes through. + // This sets up a readable stream on fds[0]. + var stream = new net.Stream(); + stream.open(fds[0]); + stream.readable = true; + stream.resume(); + + + // Count the data as it arrives on the read end of the pipe. + stream.on('data', function (d) { + nread += d.length; + + if (nread >= expected) { + assert.ok(nread === expected); + assert.equal(1, ndrain); + assert.equal(0, nerror); + console.error("done. wrote %d bytes\n", nread); + close(fds[1]); + } + }); + + stream.on('close', function () { + // check to make sure the watcher isn't in the dump queue. + for (var x = IOWatcher.dumpQueue; x; x = x.next) { + assert.ok(x !== w); + } + assert.equal(null, w.next); + // completely flushed + assert.ok(!w.firstBucket); + assert.ok(!w.lastBucket); + + ncomplete++; + if (cb) cb(); + }); + + + // Insert watcher into dumpQueue + w.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = w; + + w.firstBucket = { data: b }; + w.lastBucket = w.firstBucket; + + for (var i = 0; i < N-1; i++) { + var bucket = { data: b }; + assert.ok(!w.lastBucket.next); + w.lastBucket.next = bucket; + w.lastBucket = bucket; + } +} + + +function runTests (values) { + expectedToComplete = values.length; + + function go () { + if (ncomplete < values.length) { + var v = values[ncomplete]; + console.log("test N=%d, size=%d", v[0], v[1].length); + test(v[0], v[1], go); + } + } + + go(); +} + +runTests([ [3, Buffer(1000)], + [30, Buffer(1000)], + [4, Buffer(10000)], + [1, "hello world\n"], + [50, Buffer(1024*1024)], + [500, Buffer(40960+1)], + [500, Buffer(40960-1)], + [500, Buffer(40960)], + [500, Buffer(1024*1024+1)], + [50000, "hello world\n"] + ]); + + +process.on('exit', function () { + assert.equal(expectedToComplete, ncomplete); +}); + From dcc547d798c11ed81f4617724a0120dad484d0ce Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 5 Nov 2010 13:03:32 -0700 Subject: [PATCH 02/20] Dumper net.js integration --- lib/http.js | 4 +- lib/net.js | 266 +++++++++++----------------------------------------- 2 files changed, 58 insertions(+), 212 deletions(-) diff --git a/lib/http.js b/lib/http.js index b0e86f2833..1a00efce83 100644 --- a/lib/http.js +++ b/lib/http.js @@ -827,8 +827,8 @@ function connectionListener (socket) { // No more messages to be pushed out. 
// HACK: need way to do this with socket interface - if (socket._writeQueue.length) { - socket.__destroyOnDrain = true; //socket.end(); + if (socket._writeWatcher.firstBucket) { + socket._eof = true; } else { socket.destroy(); } diff --git a/lib/net.js b/lib/net.js index 6373b40f52..b79dbbd545 100644 --- a/lib/net.js +++ b/lib/net.js @@ -54,6 +54,24 @@ var ioWatchers = new FreeList("iowatcher", 100, function () { return new IOWatcher(); }); + +IOWatcher.prototype.ondrain = function () { + assert(this.socket); + if (this.writable || this.readable) { + require('timers').active(this.socket); + this.socket.emit('drain'); + } + + if (this.socket._eof) this.socket._shutdown(); +}; + + +IOWatcher.prototype.onerror = function (errno) { + assert(this.socket); + this.socket.destroy(errnoException(errno, 'write')); +}; + + exports.isIP = binding.isIP; exports.isIPv4 = function (input) { @@ -92,16 +110,6 @@ function setImplmentationMethods (self) { }; if (self.type == 'unix') { - self._writeImpl = function (buf, off, len, fd, flags) { - // Detect and disallow zero-byte writes wth an attached file - // descriptor. This is an implementation limitation of sendmsg(2). - if (fd && noData(buf, off, len)) { - throw new Error('File descriptors can only be written with data'); - } - - return sendMsg(self.fd, buf, off, len, fd, flags); - }; - self._readImpl = function (buf, off, len) { var bytesRead = recvMsg(self.fd, buf, off, len); @@ -123,25 +131,10 @@ function setImplmentationMethods (self) { return bytesRead; }; } else { - self._writeImpl = function (buf, off, len, fd, flags) { - // XXX: TLS support requires that 0-byte writes get processed - // by the kernel for some reason. Otherwise, we'd just - // fast-path return here. - - // Drop 'fd' and 'flags' as these are not supported by the write(2) - // system call - return write(self.fd, buf, off, len); - }; - self._readImpl = function (buf, off, len) { return read(self.fd, buf, off, len); }; } - - self._shutdownImpl = function () { - shutdown(self.fd, 'write'); - }; - }; @@ -168,11 +161,6 @@ function initStream (self) { self._readWatcher.callback = onReadable; self.readable = false; - // Queue of buffers and string that need to be written to socket. - self._writeQueue = []; - self._writeQueueEncoding = []; - self._writeQueueFD = []; - self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; self._writeWatcher.callback = onWritable; @@ -255,182 +243,44 @@ Object.defineProperty(Stream.prototype, 'readyState', { }); -// Returns true if all the data was flushed to socket. Returns false if -// something was queued. If data was queued, then the "drain" event will -// signal when it has been finally flushed to socket. -Stream.prototype.write = function (data, encoding, fd) { - if (this._connecting || (this._writeQueue && this._writeQueue.length)) { - if (!this._writeQueue) { - this._writeQueue = []; - this._writeQueueEncoding = []; - this._writeQueueFD = []; - } - - // Slow. There is already a write queue, so let's append to it. 
- if (this._writeQueueLast() === END_OF_FILE) { - throw new Error('Stream.end() called already; cannot write.'); - } - - if (typeof data == 'string' && - this._writeQueue.length && - typeof this._writeQueue[this._writeQueue.length-1] === 'string' && - this._writeQueueEncoding[this._writeQueueEncoding.length-1] === encoding) { - // optimization - concat onto last - this._writeQueue[this._writeQueue.length-1] += data; - } else { - this._writeQueue.push(data); - this._writeQueueEncoding.push(encoding); - } - - if (fd != undefined) { - this._writeQueueFD.push(fd); - } - - return false; - } else { - // Fast. - // The most common case. There is no write queue. Just push the data - // directly to the socket. - return this._writeOut(data, encoding, fd); - } -}; - -// Directly writes the data to socket. -// -// Steps: -// 1. If it's a string, write it to the `pool`. (If not space remains -// on the pool make a new one.) -// 2. Write data to socket. Return true if flushed. -// 3. Slice out remaining -// 4. Unshift remaining onto _writeQueue. Return false. -Stream.prototype._writeOut = function (data, encoding, fd) { - if (!this.writable) { - throw new Error('Stream is not writable'); - } +Stream.prototype._appendBucket = function (data, encoding, fd) { + var newBucket = { data: data }; + if (encoding) newBucket.encoding = encoding; + if (fd) newBucket.fd = fd; - var buffer, off, len; - var bytesWritten, charsWritten; - var queuedData = false; + var queueSize = data.length; - if (typeof data != 'string') { - // 'data' is a buffer, ignore 'encoding' - buffer = data; - off = 0; - len = data.length; + // TODO properly calculate queueSize + if (this._writeWatcher.lastBucket) { + this._writeWatcher.lastBucket.next = newBucket; } else { - assert(typeof data == 'string'); - - if (!pool || pool.length - pool.used < kMinPoolSpace) { - pool = null; - allocNewPool(); - } - - if (!encoding || encoding == 'utf8' || encoding == 'utf-8') { - // default to utf8 - bytesWritten = pool.write(data, 'utf8', pool.used); - charsWritten = Buffer._charsWritten; - } else { - bytesWritten = pool.write(data, encoding, pool.used); - charsWritten = bytesWritten; - } - - if (encoding && data.length > 0) { - assert(bytesWritten > 0); - } - - buffer = pool; - len = bytesWritten; - off = pool.used; - - pool.used += bytesWritten; - - debug('wrote ' + bytesWritten + ' bytes to pool'); - - if (charsWritten != data.length) { - //debug("couldn't fit " + (data.length - charsWritten) + " bytes into the pool\n"); - // Unshift whatever didn't fit onto the buffer - this._writeQueue.unshift(data.slice(charsWritten)); - this._writeQueueEncoding.unshift(encoding); - this._writeWatcher.start(); - queuedData = true; - } + this._writeWatcher.firstBucket = newBucket; } - try { - bytesWritten = this._writeImpl(buffer, off, len, fd, 0); - } catch (e) { - this.destroy(e); - return false; - } - - debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n"); + this._writeWatcher.lastBucket = newBucket; - require('timers').active(this); + return queueSize; +}; - if (bytesWritten == len) { - // awesome. sent to buffer. - if (buffer === pool) { - // If we're just writing from the pool then we can make a little - // optimization and save the space. 
- buffer.used -= len; - } - if (queuedData) { - return false; - } else { - return true; - } +Stream.prototype.write = function (data, encoding, fd) { + if (this._eof) { + throw new Error('Stream.end() called already; cannot write.'); } - // Didn't write the entire thing to buffer. - // Need to wait for the socket to become available before trying again. - this._writeWatcher.start(); - - // Slice out the data left. - var leftOver = buffer.slice(off + bytesWritten, off + len); - leftOver.used = leftOver.length; // used the whole thing... - - // util.error('data.used = ' + data.used); - //if (!this._writeQueue) initWriteStream(this); - - // data should be the next thing to write. - this._writeQueue.unshift(leftOver); - this._writeQueueEncoding.unshift(null); - - // If didn't successfully write any bytes, enqueue our fd and try again - if (!bytesWritten) { - this._writeQueueFD.unshift(fd); + if (!this.writable) { + throw new Error('Stream is not writable'); } - return false; -}; - - -// Flushes the write buffer out. -// Returns true if the entire buffer was flushed. -Stream.prototype.flush = function () { - while (this._writeQueue && this._writeQueue.length) { - var data = this._writeQueue.shift(); - var encoding = this._writeQueueEncoding.shift(); - var fd = this._writeQueueFD.shift(); - - if (data === END_OF_FILE) { - this._shutdown(); - return true; - } + var queueSize = this._appendBucket(data, encoding, fd); - var flushed = this._writeOut(data,encoding,fd); - if (!flushed) return false; - } - if (this._writeWatcher) this._writeWatcher.stop(); - return true; -}; + if (this._connecting) return false; + this._onWritable(); // Insert writeWatcher into the dumpQueue + require('timers').active(this); -Stream.prototype._writeQueueLast = function () { - return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1] - : null; + return queueSize < (64*1024); }; @@ -481,7 +331,7 @@ Stream.prototype._onConnect = function () { } - if (this._writeQueue && this._writeQueue.length) { + if (this._writeWatcher.firstBucket) { // Flush this in case any writes are queued up while connecting. this._onWritable(); } @@ -493,11 +343,10 @@ Stream.prototype._onConnect = function () { Stream.prototype._onWritable = function () { - // Stream becomes writable on connect() but don't flush if there's - // nothing actually to write - if (this.flush()) { - if (this._events && this._events['drain']) this.emit("drain"); - if (this.ondrain) this.ondrain(); // Optimization + // Stick it into the dumpQueue + if (!this._writeWatcher.next) { + this._writeWatcher.next = IOWatcher.dumpQueue.next; + IOWatcher.dumpQueue.next = this._writeWatcher; } }; @@ -648,15 +497,13 @@ Stream.prototype.destroy = function (exception) { // pool is shared between sockets, so don't need to free it here. var self = this; - // TODO would like to set _writeQueue to null to avoid extra object alloc, - // but lots of code assumes this._writeQueue is always an array. 
- this._writeQueue = []; - this.readable = this.writable = false; if (this._writeWatcher) { this._writeWatcher.stop(); this._writeWatcher.socket = null; + this._writeWatcher.firstBucket = null; + this._writeWatcher.lastBucket = null; ioWatchers.free(this._writeWatcher); this._writeWatcher = null; } @@ -695,7 +542,7 @@ Stream.prototype._shutdown = function () { this.writable = false; try { - this._shutdownImpl(); + shutdown(this.fd, 'write'); } catch (e) { this.destroy(e); } @@ -708,15 +555,14 @@ Stream.prototype._shutdown = function () { Stream.prototype.end = function (data, encoding) { - if (this.writable) { - if (this._writeQueueLast() !== END_OF_FILE) { - if (data) this.write(data, encoding); - this._writeQueue.push(END_OF_FILE); - if (!this._connecting) { - this.flush(); - } - } - } + if (!this.writable) return; // TODO this should throw error + if (this._eof) return; // TODO this should also throw error + + if (data) this._appendBucket(data, encoding); + this._eof = true; + + // If this isn't in the dumpQueue then we shutdown now. + if (!this._writeWatcher.next) this._shutdown(); }; From d74c5060447c01044e4122d9bcda9482bda2a351 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 11 Nov 2010 14:26:41 -0800 Subject: [PATCH 03/20] Support encoding --- src/node_buffer.cc | 8 +++++--- src/node_buffer.h | 4 +++- src/node_io_watcher.cc | 6 ++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/node_buffer.cc b/src/node_buffer.cc index 8d7fc6dc91..d838577a15 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -82,7 +82,8 @@ static size_t ByteLength (Handle string, enum encoding enc) { } -Handle Buffer::New(Handle string) { +Handle Buffer::New(Handle string, + Handle encoding) { HandleScope scope; // get Buffer from global scope. @@ -91,8 +92,9 @@ Handle Buffer::New(Handle string) { assert(bv->IsFunction()); Local b = Local::Cast(bv); - Local argv[1] = { Local::New(string) }; - Local instance = b->NewInstance(1, argv); + Local argv[2] = { Local::New(string), + Local::New(encoding) }; + Local instance = b->NewInstance(2, argv); return scope.Close(instance); } diff --git a/src/node_buffer.h b/src/node_buffer.h index 909be452f7..efc982ecf7 100644 --- a/src/node_buffer.h +++ b/src/node_buffer.h @@ -25,7 +25,9 @@ class Buffer : public ObjectWrap { typedef void (*free_callback)(char *data, void *hint); // C++ API for constructing fast buffer - static v8::Handle New(v8::Handle string); + static v8::Handle New( + v8::Handle string, + v8::Handle encoding = v8::Handle()); static void Initialize(v8::Handle target); static Buffer* New(size_t length); // public constructor diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 98c2403eb2..298a35b9f2 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -31,6 +31,7 @@ static Persistent prev_sym; static Persistent ondrain_sym; static Persistent onerror_sym; static Persistent data_sym; +static Persistent encoding_sym; static Persistent offset_sym; static Persistent fd_sym; static Persistent is_unix_socket_sym; @@ -65,6 +66,7 @@ void IOWatcher::Initialize(Handle target) { fd_sym = NODE_PSYMBOL("fd"); is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); data_sym = NODE_PSYMBOL("data"); + encoding_sym = NODE_PSYMBOL("encoding"); ev_prepare_init(&dumper, IOWatcher::Dump); @@ -321,9 +323,9 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { if (data_v->IsString()) { // TODO: insert v8::String::Pointers() hack here. - // TODO: handle different encodings. 
Local s = data_v->ToString(); - buf_object = Local::New(Buffer::New(s)); + Local e = bucket->Get(encoding_sym); + buf_object = Local::New(Buffer::New(s, e)); bucket->Set(data_sym, buf_object); } else if (Buffer::HasInstance(data_v)) { buf_object = data_v->ToObject(); From 5ba0be61661a89fb92c784c1dbc24911eabfecd0 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 11 Nov 2010 17:31:38 -0800 Subject: [PATCH 04/20] Don't send null data segments --- lib/net.js | 1 + src/node_io_watcher.cc | 2 ++ 2 files changed, 3 insertions(+) diff --git a/lib/net.js b/lib/net.js index b79dbbd545..cd83b15720 100644 --- a/lib/net.js +++ b/lib/net.js @@ -244,6 +244,7 @@ Object.defineProperty(Stream.prototype, 'readyState', { Stream.prototype._appendBucket = function (data, encoding, fd) { + // TODO reject empty data. var newBucket = { data: data }; if (encoding) newBucket.encoding = encoding; if (fd) newBucket.fd = fd; diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 298a35b9f2..a991dd9a8f 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -335,6 +335,8 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { size_t l = Buffer::Length(buf_object); + if (l == 0) continue; + assert(first_offset < l); iov[iovcnt].iov_base = Buffer::Data(buf_object) + first_offset; iov[iovcnt].iov_len = l - first_offset; From e1250a3333d22f003b37f66978a856e1f7ec96db Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 12 Nov 2010 09:18:44 -0800 Subject: [PATCH 05/20] Reset _eof on socket reuse --- lib/http.js | 2 +- lib/net.js | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/http.js b/lib/http.js index 1a00efce83..70585ca810 100644 --- a/lib/http.js +++ b/lib/http.js @@ -828,7 +828,7 @@ function connectionListener (socket) { // HACK: need way to do this with socket interface if (socket._writeWatcher.firstBucket) { - socket._eof = true; + socket.__destroyOnDrain = true; } else { socket.destroy(); } diff --git a/lib/net.js b/lib/net.js index cd83b15720..59af9b9f32 100644 --- a/lib/net.js +++ b/lib/net.js @@ -160,6 +160,7 @@ function initStream (self) { self._readWatcher.socket = self; self._readWatcher.callback = onReadable; self.readable = false; + self._eof = false; self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; @@ -498,7 +499,7 @@ Stream.prototype.destroy = function (exception) { // pool is shared between sockets, so don't need to free it here. 
var self = this; - this.readable = this.writable = false; + this._eof = this.readable = this.writable = false; if (this._writeWatcher) { this._writeWatcher.stop(); From 5a84461e46cdf4c05d6beb418645989db59a546a Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 12 Nov 2010 09:54:48 -0800 Subject: [PATCH 06/20] Also do ondrain callback for socket (needed by http) --- lib/net.js | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/net.js b/lib/net.js index 59af9b9f32..998322678e 100644 --- a/lib/net.js +++ b/lib/net.js @@ -57,12 +57,16 @@ var ioWatchers = new FreeList("iowatcher", 100, function () { IOWatcher.prototype.ondrain = function () { assert(this.socket); - if (this.writable || this.readable) { - require('timers').active(this.socket); - this.socket.emit('drain'); + var socket = this.socket; + + if (socket.writable || socket.readable) { + require('timers').active(socket); } - if (this.socket._eof) this.socket._shutdown(); + socket.emit('drain'); + if (socket.ondrain) socket.ondrain(); + + if (socket._eof) socket._shutdown(); }; From 7c3c5c6861eed57c9b1a423bf0a6dab7a9f20c6a Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 12 Nov 2010 11:24:30 -0800 Subject: [PATCH 07/20] Maintain queueSize for each socket --- lib/net.js | 10 +++++++--- src/node_io_watcher.cc | 12 ++++++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/lib/net.js b/lib/net.js index 998322678e..230d8e3a58 100644 --- a/lib/net.js +++ b/lib/net.js @@ -254,8 +254,6 @@ Stream.prototype._appendBucket = function (data, encoding, fd) { if (encoding) newBucket.encoding = encoding; if (fd) newBucket.fd = fd; - var queueSize = data.length; - // TODO properly calculate queueSize if (this._writeWatcher.lastBucket) { @@ -266,7 +264,13 @@ Stream.prototype._appendBucket = function (data, encoding, fd) { this._writeWatcher.lastBucket = newBucket; - return queueSize; + if (this._writeWatcher.queueSize === undefined) { + this._writeWatcher.queueSize = 0; + } + assert(this._writeWatcher.queueSize >= 0); + this._writeWatcher.queueSize += data.length; + + return this._writeWatcher.queueSize; }; diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index a991dd9a8f..38f44cc46f 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -37,6 +37,7 @@ static Persistent fd_sym; static Persistent is_unix_socket_sym; static Persistent first_bucket_sym; static Persistent last_bucket_sym; +static Persistent queue_size_sym; void IOWatcher::Initialize(Handle target) { @@ -62,6 +63,7 @@ void IOWatcher::Initialize(Handle target) { onerror_sym = NODE_PSYMBOL("onerror"); first_bucket_sym = NODE_PSYMBOL("firstBucket"); last_bucket_sym = NODE_PSYMBOL("lastBucket"); + queue_size_sym = NODE_PSYMBOL("queueSize"); offset_sym = NODE_PSYMBOL("offset"); fd_sym = NODE_PSYMBOL("fd"); is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); @@ -423,6 +425,9 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { // what about written == 0 ? + size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); + assert(queue_size >= offset); + // Now drop the buckets that have been written. bucket_index = 0; @@ -451,15 +456,15 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { // serialized onto a buffer. 
size_t bucket_len = Buffer::Length(data_v->ToObject()); - if (unix_socket && bucket->Has(fd_sym)) { bucket->Set(fd_sym, Null()); } - assert(bucket_len > offset); DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); + queue_size -= written; + // Only on the first bucket does is the offset > 0. if (offset + written < bucket_len) { // we have not written the entire bucket @@ -488,6 +493,9 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { watcher->Set(first_bucket_sym, bucket->Get(next_sym)); } + // Set the queue size. + watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); + // Finished dumping the buckets. // // If our list of buckets is empty, we can emit 'drain' and forget about From 10ff559ec39ccb27d9f12b41960207fe493a27e7 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 12 Nov 2010 11:31:39 -0800 Subject: [PATCH 08/20] Add IOWatcher.flush() To be called if sockets get too much data. This is to force a flush before the tick ends. --- lib/net.js | 4 ++++ src/node_io_watcher.cc | 15 +++++++++++++++ src/node_io_watcher.h | 2 ++ 3 files changed, 21 insertions(+) diff --git a/lib/net.js b/lib/net.js index 230d8e3a58..12db1f45e5 100644 --- a/lib/net.js +++ b/lib/net.js @@ -290,6 +290,10 @@ Stream.prototype.write = function (data, encoding, fd) { this._onWritable(); // Insert writeWatcher into the dumpQueue require('timers').active(this); + if (queueSize > (64*1024)) { + IOWatcher.flush(); + } + return queueSize < (64*1024); }; diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 38f44cc46f..943deb7056 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -55,6 +55,10 @@ void IOWatcher::Initialize(Handle target) { Local io_watcher = constructor_template->GetFunction(); target->Set(String::NewSymbol("IOWatcher"), io_watcher); + NODE_SET_METHOD(constructor_template->GetFunction(), + "flush", + IOWatcher::Flush); + callback_symbol = NODE_PSYMBOL("callback"); next_sym = NODE_PSYMBOL("next"); @@ -194,6 +198,13 @@ Handle IOWatcher::Set(const Arguments& args) { return Undefined(); } + +Handle IOWatcher::Flush(const Arguments& args) { + HandleScope scope; // unneccessary? 
+ IOWatcher::Dump(); + return Undefined(); +} + #define KB 1024 /* @@ -233,7 +244,11 @@ Handle IOWatcher::Set(const Arguments& args) { void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { assert(revents == EV_PREPARE); assert(w == &dumper); + Dump(); +} + +void IOWatcher::Dump() { HandleScope scope; static struct iovec iov[IOV_MAX]; diff --git a/src/node_io_watcher.h b/src/node_io_watcher.h index 2c2de7809f..f1ae3babf1 100644 --- a/src/node_io_watcher.h +++ b/src/node_io_watcher.h @@ -26,6 +26,7 @@ class IOWatcher : ObjectWrap { } static v8::Handle New(const v8::Arguments& args); + static v8::Handle Flush(const v8::Arguments& args); static v8::Handle Start(const v8::Arguments& args); static v8::Handle Stop(const v8::Arguments& args); static v8::Handle Set(const v8::Arguments& args); @@ -34,6 +35,7 @@ class IOWatcher : ObjectWrap { static void Callback(EV_P_ ev_io *watcher, int revents); static void Dump(EV_P_ ev_prepare *watcher, int revents); + static void Dump(); void Start(); void Stop(); From 5d6a03c9fe562b80ecb6ae2b3fc34798a1235e7d Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 12 Nov 2010 11:47:31 -0800 Subject: [PATCH 09/20] Don't append buckets of zero length --- lib/net.js | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/lib/net.js b/lib/net.js index 12db1f45e5..a25673c3a0 100644 --- a/lib/net.js +++ b/lib/net.js @@ -249,20 +249,22 @@ Object.defineProperty(Stream.prototype, 'readyState', { Stream.prototype._appendBucket = function (data, encoding, fd) { - // TODO reject empty data. - var newBucket = { data: data }; - if (encoding) newBucket.encoding = encoding; - if (fd) newBucket.fd = fd; + if (data.length != 0) { + // TODO reject empty data. + var newBucket = { data: data }; + if (encoding) newBucket.encoding = encoding; + if (fd) newBucket.fd = fd; - // TODO properly calculate queueSize + // TODO properly calculate queueSize - if (this._writeWatcher.lastBucket) { - this._writeWatcher.lastBucket.next = newBucket; - } else { - this._writeWatcher.firstBucket = newBucket; - } + if (this._writeWatcher.lastBucket) { + this._writeWatcher.lastBucket.next = newBucket; + } else { + this._writeWatcher.firstBucket = newBucket; + } - this._writeWatcher.lastBucket = newBucket; + this._writeWatcher.lastBucket = newBucket; + } if (this._writeWatcher.queueSize === undefined) { this._writeWatcher.queueSize = 0; From a6d84253824694b6e6ec148ec57cb344727d3a84 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 12 Nov 2010 14:05:26 -0800 Subject: [PATCH 10/20] make sure unix sockets are tagged correctly --- lib/net.js | 5 +++++ src/node_io_watcher.cc | 2 -- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/net.js b/lib/net.js index a25673c3a0..03b09c3c5a 100644 --- a/lib/net.js +++ b/lib/net.js @@ -215,6 +215,10 @@ Stream.prototype.open = function (fd, type) { setImplmentationMethods(this); + if (this.type === "unix") { + this._writeWatcher.isUnixSocket = true; + } + this._writeWatcher.set(this.fd, false, true); this.writable = true; }; @@ -520,6 +524,7 @@ Stream.prototype.destroy = function (exception) { this._writeWatcher.socket = null; this._writeWatcher.firstBucket = null; this._writeWatcher.lastBucket = null; + this._writeWatcher.isUnixSocket = false; ioWatchers.free(this._writeWatcher); this._writeWatcher = null; } diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 943deb7056..3f85076279 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -557,6 +557,4 @@ void IOWatcher::Dump() { } - - } // 
namespace node From fa556a142512ab932b7359760e5e4585e4e035b6 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 12 Nov 2010 16:24:53 -0800 Subject: [PATCH 11/20] Add callback to socket.write(), fix test-sendfds --- lib/net.js | 46 +++++++++++++++++++++++++++++--------- src/node_io_watcher.cc | 13 +++++++++++ test/fixtures/recvfd.js | 16 ++++++------- test/simple/test-sendfd.js | 17 +++++++------- 4 files changed, 63 insertions(+), 29 deletions(-) diff --git a/lib/net.js b/lib/net.js index 03b09c3c5a..7019d9d107 100644 --- a/lib/net.js +++ b/lib/net.js @@ -56,17 +56,18 @@ var ioWatchers = new FreeList("iowatcher", 100, function () { IOWatcher.prototype.ondrain = function () { - assert(this.socket); - var socket = this.socket; + if (this.socket) { + var socket = this.socket; - if (socket.writable || socket.readable) { - require('timers').active(socket); - } + if (socket.writable || socket.readable) { + require('timers').active(socket); + } - socket.emit('drain'); - if (socket.ondrain) socket.ondrain(); + socket.emit('drain'); + if (socket.ondrain) socket.ondrain(); - if (socket._eof) socket._shutdown(); + if (socket._eof) socket._shutdown(); + } }; @@ -252,12 +253,13 @@ Object.defineProperty(Stream.prototype, 'readyState', { }); -Stream.prototype._appendBucket = function (data, encoding, fd) { +Stream.prototype._appendBucket = function (data, encoding, fd, callback) { if (data.length != 0) { // TODO reject empty data. var newBucket = { data: data }; if (encoding) newBucket.encoding = encoding; if (fd) newBucket.fd = fd; + if (callback) newBucket.callback = callback; // TODO properly calculate queueSize @@ -280,7 +282,7 @@ Stream.prototype._appendBucket = function (data, encoding, fd) { }; -Stream.prototype.write = function (data, encoding, fd) { +Stream.prototype.write = function (data /* encoding, fd, callback */) { if (this._eof) { throw new Error('Stream.end() called already; cannot write.'); } @@ -289,7 +291,29 @@ Stream.prototype.write = function (data, encoding, fd) { throw new Error('Stream is not writable'); } - var queueSize = this._appendBucket(data, encoding, fd); + // parse the arguments. ugly. 
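+ // Accepted forms: write(data), write(data, encoding), write(data, fd),
+ // write(data, callback), write(data, encoding, fd), write(data, encoding,
+ // callback), write(data, fd, callback), write(data, encoding, fd, callback).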
+ + var encoding, fd, callback; + + if (arguments[1] === undefined || typeof arguments[1] == 'string') { + encoding = arguments[1]; + if (typeof arguments[2] == 'number') { + fd = arguments[2]; + callback = arguments[3]; + } else { + callback = arguments[2]; + } + } else if (typeof arguments[1] == 'number') { + fd = arguments[1]; + callback = arguments[2]; + } else if (typeof arguments[1] == 'function') { + callback = arguments[1]; + } else { + throw new Error("Bad type for second argument"); + } + + + var queueSize = this._appendBucket(data, encoding, fd, callback); if (this._connecting) return false; diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 3f85076279..62c5646463 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -38,6 +38,7 @@ static Persistent is_unix_socket_sym; static Persistent first_bucket_sym; static Persistent last_bucket_sym; static Persistent queue_size_sym; +static Persistent callback_sym; void IOWatcher::Initialize(Handle target) { @@ -73,6 +74,7 @@ void IOWatcher::Initialize(Handle target) { is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); data_sym = NODE_PSYMBOL("data"); encoding_sym = NODE_PSYMBOL("encoding"); + callback_sym = NODE_PSYMBOL("callback"); ev_prepare_init(&dumper, IOWatcher::Dump); @@ -497,6 +499,17 @@ void IOWatcher::Dump() { written -= bucket_len - offset; + Local bucket_callback_v = bucket->Get(callback_sym); + if (bucket_callback_v->IsFunction()) { + Local bucket_callback = + Local::Cast(bucket_callback_v); + TryCatch try_catch; + bucket_callback->Call(io->handle_, 0, NULL); + if (try_catch.HasCaught()) { + FatalException(try_catch); + } + } + // Offset is now zero watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); } diff --git a/test/fixtures/recvfd.js b/test/fixtures/recvfd.js index 09b2864b7e..8f06469389 100644 --- a/test/fixtures/recvfd.js +++ b/test/fixtures/recvfd.js @@ -22,35 +22,33 @@ function processData(s) { // version of our modified object back. Clean up when we're done. var pipeStream = new net.Stream(fd); - var drainFunc = function() { + pipeStream.resume(); + + pipeStream.write(JSON.stringify(d) + '\n', function () { pipeStream.destroy(); if (++numSentMessages == 2) { s.destroy(); } - }; - - pipeStream.addListener('drain', drainFunc); - pipeStream.resume(); - - if (pipeStream.write(JSON.stringify(d) + '\n')) { - drainFunc(); - } + }); }; // Create a UNIX socket to the path defined by argv[2] and read a file // descriptor and misc data from it. 
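// File descriptors passed over the UNIX socket with SCM_RIGHTS arrive as
// 'fd' events, separately from the 'data' events carrying the JSON payload.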
var s = new net.Stream(); + s.addListener('fd', function(fd) { receivedFDs.unshift(fd); processData(s); }); + s.addListener('data', function(data) { data.toString('utf8').trim().split('\n').forEach(function(d) { receivedData.unshift(JSON.parse(d)); }); processData(s); }); + s.connect(process.argv[2]); // vim:ts=2 sw=2 et diff --git a/test/simple/test-sendfd.js b/test/simple/test-sendfd.js index 7ed7b02c1a..8052a13667 100644 --- a/test/simple/test-sendfd.js +++ b/test/simple/test-sendfd.js @@ -53,7 +53,7 @@ var logChild = function(d) { d.split('\n').forEach(function(l) { if (l.length > 0) { - common.debug('CHILD: ' + l); + console.error('CHILD: ' + l); } }); }; @@ -96,19 +96,18 @@ var srv = net.createServer(function(s) { buf.write(JSON.stringify(DATA) + '\n', 'utf8'); s.write(str, 'utf8', pipeFDs[1]); - if (s.write(buf, undefined, pipeFDs[1])) { + + s.write(buf, pipeFDs[1], function () { + console.error("close pipeFDs[1]"); netBinding.close(pipeFDs[1]); - } else { - s.addListener('drain', function() { - netBinding.close(pipeFDs[1]); - }); - } + }); }); srv.listen(SOCK_PATH); // Spawn a child running test/fixtures/recvfd.js -var cp = child_process.spawn(process.argv[0], - [path.join(common.fixturesDir, 'recvfd.js'), SOCK_PATH]); +var cp = child_process.spawn(process.execPath, + [path.join(common.fixturesDir, 'recvfd.js'), + SOCK_PATH]); cp.stdout.addListener('data', logChild); cp.stderr.addListener('data', logChild); From d3fbe3e3d141973a00722f5631f200cdf5c69c46 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 17 Nov 2010 10:58:45 -0800 Subject: [PATCH 12/20] Emit drain and stop IOWatcher even on empty buffer --- src/node_io_watcher.cc | 245 +++++++++++++++++++++-------------------- 1 file changed, 123 insertions(+), 122 deletions(-) diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 62c5646463..45bc424eea 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -374,155 +374,156 @@ void IOWatcher::Dump() { } } - if (to_write == 0) continue; - - ssize_t written; - - if (unix_socket) { - struct msghdr msg; - char scratch[64]; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = iovcnt; - msg.msg_control = NULL; // void* - msg.msg_controllen = 0; // socklen_t - msg.msg_flags = 0; // int - - if (fd_to_send >= 0) { - struct cmsghdr *cmsg; - - msg.msg_control = (void *) scratch; - msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + if (to_write > 0) { + ssize_t written; + + if (unix_socket) { + struct msghdr msg; + char scratch[64]; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iovcnt; + msg.msg_control = NULL; // void* + msg.msg_controllen = 0; // socklen_t + msg.msg_flags = 0; // int + + if (fd_to_send >= 0) { + struct cmsghdr *cmsg; + + msg.msg_control = (void *) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + *(int*) CMSG_DATA(cmsg) = fd_to_send; + } - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = msg.msg_controllen; - *(int*) CMSG_DATA(cmsg) = fd_to_send; + written = sendmsg(io->watcher_.fd, &msg, 0); + } else { + written = writev(io->watcher_.fd, iov, iovcnt); } - written = sendmsg(io->watcher_.fd, &msg, 0); - } else { - written = writev(io->watcher_.fd, iov, iovcnt); - } + DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", + iovcnt, + to_write, + written); - 
DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", - iovcnt, - to_write, - written); - - if (written < 0) { - // Allow EAGAIN. - // TODO: handle EMSGSIZE after sendmsg(). - if (errno == EAGAIN) { - io->Start(); - } else { - // Emit error event - if (watcher->Has(onerror_sym)) { - Local callback_v = io->handle_->Get(onerror_sym); - assert(callback_v->IsFunction()); - Local callback = Local::Cast(callback_v); + if (written < 0) { + // Allow EAGAIN. + // TODO: handle EMSGSIZE after sendmsg(). + if (errno == EAGAIN) { + io->Start(); + } else { + // Emit error event + if (watcher->Has(onerror_sym)) { + Local callback_v = io->handle_->Get(onerror_sym); + assert(callback_v->IsFunction()); + Local callback = Local::Cast(callback_v); - Local argv[1] = { Integer::New(errno) }; + Local argv[1] = { Integer::New(errno) }; - TryCatch try_catch; + TryCatch try_catch; - callback->Call(io->handle_, 1, argv); + callback->Call(io->handle_, 1, argv); - if (try_catch.HasCaught()) { - FatalException(try_catch); + if (try_catch.HasCaught()) { + FatalException(try_catch); + } } } + // Continue with the next watcher. + continue; } - // Continue with the next watcher. - continue; - } - - // what about written == 0 ? - size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); - assert(queue_size >= offset); + // what about written == 0 ? - // Now drop the buckets that have been written. - bucket_index = 0; + size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); + assert(queue_size >= offset); - while (written > 0) { - bucket_v = watcher->Get(first_bucket_sym); - if (!bucket_v->IsObject()) { - // No more buckets in the queue. Make sure the last_bucket_sym is - // updated and then go to the next watcher. - watcher->Set(last_bucket_sym, Null()); - break; - } + // Now drop the buckets that have been written. + bucket_index = 0; - bucket = bucket_v->ToObject(); - - Local data_v = bucket->Get(data_sym); - assert(!data_v.IsEmpty()); + while (written > 0) { + bucket_v = watcher->Get(first_bucket_sym); + if (!bucket_v->IsObject()) { + // No more buckets in the queue. Make sure the last_bucket_sym is + // updated and then go to the next watcher. + watcher->Set(last_bucket_sym, Null()); + break; + } - // At the moment we're turning all string into buffers - // so we assert that this is not a string. However, when the - // "Pointer patch" lands, this assert will need to be removed. - assert(!data_v->IsString()); - // When the "Pointer patch" lands, we will need to be careful - // to somehow store the length of strings that we're optimizing on - // so that it need not be recalculated here. Note the "Pointer patch" - // will only apply to ASCII strings - UTF8 ones will need to be - // serialized onto a buffer. - size_t bucket_len = Buffer::Length(data_v->ToObject()); + bucket = bucket_v->ToObject(); - if (unix_socket && bucket->Has(fd_sym)) { - bucket->Set(fd_sym, Null()); - } + Local data_v = bucket->Get(data_sym); + assert(!data_v.IsEmpty()); - assert(bucket_len > offset); - DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); + // At the moment we're turning all string into buffers + // so we assert that this is not a string. However, when the + // "Pointer patch" lands, this assert will need to be removed. + assert(!data_v->IsString()); + // When the "Pointer patch" lands, we will need to be careful + // to somehow store the length of strings that we're optimizing on + // so that it need not be recalculated here. 
Note the "Pointer patch" + // will only apply to ASCII strings - UTF8 ones will need to be + // serialized onto a buffer. + size_t bucket_len = Buffer::Length(data_v->ToObject()); - queue_size -= written; - - // Only on the first bucket does is the offset > 0. - if (offset + written < bucket_len) { - // we have not written the entire bucket - DEBUG_PRINT("[%ld] Only wrote part of the buffer. " - "setting watcher.offset = %ld", - bucket_index, - offset + written); + if (unix_socket && bucket->Has(fd_sym)) { + bucket->Set(fd_sym, Null()); + } - watcher->Set(offset_sym, - Integer::NewFromUnsigned(offset + written)); - break; - } else { - DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.", - bucket_index); - - written -= bucket_len - offset; - - Local bucket_callback_v = bucket->Get(callback_sym); - if (bucket_callback_v->IsFunction()) { - Local bucket_callback = - Local::Cast(bucket_callback_v); - TryCatch try_catch; - bucket_callback->Call(io->handle_, 0, NULL); - if (try_catch.HasCaught()) { - FatalException(try_catch); + assert(bucket_len > offset); + DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); + + queue_size -= written; + + // Only on the first bucket does is the offset > 0. + if (offset + written < bucket_len) { + // we have not written the entire bucket + DEBUG_PRINT("[%ld] Only wrote part of the buffer. " + "setting watcher.offset = %ld", + bucket_index, + offset + written); + + watcher->Set(offset_sym, + Integer::NewFromUnsigned(offset + written)); + break; + } else { + DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.", + bucket_index); + + written -= bucket_len - offset; + + Local bucket_callback_v = bucket->Get(callback_sym); + if (bucket_callback_v->IsFunction()) { + Local bucket_callback = + Local::Cast(bucket_callback_v); + TryCatch try_catch; + bucket_callback->Call(io->handle_, 0, NULL); + if (try_catch.HasCaught()) { + FatalException(try_catch); + } } + + // Offset is now zero + watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); } - // Offset is now zero - watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); - } + offset = 0; // the next bucket will have zero offset; + bucket_index++; - offset = 0; // the next bucket will have zero offset; - bucket_index++; + // unshift + watcher->Set(first_bucket_sym, bucket->Get(next_sym)); + } - // unshift - watcher->Set(first_bucket_sym, bucket->Get(next_sym)); + // Set the queue size. + watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); } - // Set the queue size. - watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); // Finished dumping the buckets. 
// From 42357645cb322864d2dade81c6e29ecd42fc7a7f Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 17 Nov 2010 11:40:43 -0800 Subject: [PATCH 13/20] Dump after ev_loop --- src/node.cc | 2 ++ src/node.js | 4 ++++ src/node_io_watcher.h | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/node.cc b/src/node.cc index 46c56e2b34..cdcd66d1e3 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1967,6 +1967,8 @@ int Start(int argc, char *argv[]) { Tick(); + IOWatcher::Dump(); + } while (need_tick_cb || ev_activecnt(EV_DEFAULT_UC) > 0); diff --git a/src/node.js b/src/node.js index 31e7da4fa3..ea445debde 100644 --- a/src/node.js +++ b/src/node.js @@ -29,6 +29,10 @@ process.assert = function (x, msg) { var writeError = process.binding('stdio').writeError; +// Need to force-load this binding so that we can IOWatcher::Dump in +// src/node.cc +var IOWatcher = process.binding('io_watcher'); + // nextTick() var nextTickQueue = []; diff --git a/src/node_io_watcher.h b/src/node_io_watcher.h index f1ae3babf1..71d714217f 100644 --- a/src/node_io_watcher.h +++ b/src/node_io_watcher.h @@ -10,6 +10,7 @@ namespace node { class IOWatcher : ObjectWrap { public: static void Initialize(v8::Handle target); + static void Dump(); protected: static v8::Persistent constructor_template; @@ -35,7 +36,6 @@ class IOWatcher : ObjectWrap { static void Callback(EV_P_ ev_io *watcher, int revents); static void Dump(EV_P_ ev_prepare *watcher, int revents); - static void Dump(); void Start(); void Stop(); From 02039c9b53811cbce3b324c655f9bdfc7504813f Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 17 Nov 2010 11:50:51 -0800 Subject: [PATCH 14/20] 'connect' event may disconnect socket --- lib/net.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/net.js b/lib/net.js index 7019d9d107..6d0aef9ca8 100644 --- a/lib/net.js +++ b/lib/net.js @@ -367,6 +367,12 @@ Stream.prototype._onConnect = function () { this._connecting = false; this.resume(); this.readable = this.writable = true; + + if (this._writeWatcher.firstBucket) { + // Flush this in case any writes are queued up while connecting. + this._onWritable(); + } + try { this.emit('connect'); } catch (e) { @@ -374,12 +380,6 @@ Stream.prototype._onConnect = function () { return; } - - if (this._writeWatcher.firstBucket) { - // Flush this in case any writes are queued up while connecting. 
- this._onWritable(); - } - } else if (errno != EINPROGRESS) { this.destroy(errnoException(errno, 'connect')); } From 3884b4185a72d63ee0344d16390eea852e8b1f89 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 18 Nov 2010 11:18:07 -0800 Subject: [PATCH 15/20] Small clean ups --- lib/net.js | 6 ++++++ src/node_buffer.cc | 4 ++-- src/node_buffer.h | 2 +- src/node_io_watcher.cc | 7 +++---- test/simple/test-pipe.js | 14 +++++++++++--- 5 files changed, 23 insertions(+), 10 deletions(-) diff --git a/lib/net.js b/lib/net.js index 6d0aef9ca8..38b804eb43 100644 --- a/lib/net.js +++ b/lib/net.js @@ -207,6 +207,11 @@ Stream.prototype._onTimeout = function () { }; +Stream.prototype.writeQueueSize = function () { + return this._writeWatcher.queueSize || 0; +}; + + Stream.prototype.open = function (fd, type) { initStream(this); @@ -532,6 +537,7 @@ Stream.prototype.pause = function () { Stream.prototype.resume = function () { if (this.fd === null) throw new Error('Cannot resume() closed Stream.'); + this._readWatcher.stop(); this._readWatcher.set(this.fd, true, false); this._readWatcher.start(); }; diff --git a/src/node_buffer.cc b/src/node_buffer.cc index d838577a15..d95cf515ee 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -82,8 +82,8 @@ static size_t ByteLength (Handle string, enum encoding enc) { } -Handle Buffer::New(Handle string, - Handle encoding) { +Local Buffer::New(Handle string, + Handle encoding) { HandleScope scope; // get Buffer from global scope. diff --git a/src/node_buffer.h b/src/node_buffer.h index efc982ecf7..34b353d799 100644 --- a/src/node_buffer.h +++ b/src/node_buffer.h @@ -25,7 +25,7 @@ class Buffer : public ObjectWrap { typedef void (*free_callback)(char *data, void *hint); // C++ API for constructing fast buffer - static v8::Handle New( + static v8::Local New( v8::Handle string, v8::Handle encoding = v8::Handle()); diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 45bc424eea..71a640cb0e 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -344,12 +344,11 @@ void IOWatcher::Dump() { // TODO: insert v8::String::Pointers() hack here. 
Local s = data_v->ToString(); Local e = bucket->Get(encoding_sym); - buf_object = Local::New(Buffer::New(s, e)); + buf_object = Buffer::New(s, e); bucket->Set(data_sym, buf_object); - } else if (Buffer::HasInstance(data_v)) { - buf_object = data_v->ToObject(); } else { - assert(0); + assert(Buffer::HasInstance(data_v)); + buf_object = data_v->ToObject(); } size_t l = Buffer::Length(buf_object); diff --git a/test/simple/test-pipe.js b/test/simple/test-pipe.js index 75db48eabb..3341e875ca 100644 --- a/test/simple/test-pipe.js +++ b/test/simple/test-pipe.js @@ -17,20 +17,22 @@ var bufferSize = 5 * 1024 * 1024; */ var buffer = Buffer(bufferSize); for (var i = 0; i < buffer.length; i++) { - buffer[i] = parseInt(Math.random()*10000) % 256; + buffer[i] = 100; //parseInt(Math.random()*10000) % 256; } var web = http.Server(function (req, res) { web.close(); + console.log("web server connection fd=%d", req.connection.fd); + console.log(req.headers); var socket = net.Stream(); socket.connect(tcpPort); socket.on('connect', function () { - console.log('socket connected'); + console.log('http->tcp connected fd=%d', socket.fd); }); req.pipe(socket); @@ -54,7 +56,7 @@ web.listen(webPort, startClient); var tcp = net.Server(function (s) { tcp.close(); - console.log("tcp server connection"); + console.log("tcp server connection fd=%d", s.fd); var i = 0; @@ -91,6 +93,12 @@ function startClient () { req.write(buffer); req.end(); + + console.log("request fd=%d", req.connection.fd); + + // note the queue includes http headers. + assert.ok(req.connection.writeQueueSize() > buffer.length); + req.on('response', function (res) { console.log('Got response'); res.setEncoding('utf8'); From 0b1214c16b4382deb67f70350240f910fe8a2114 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 18 Nov 2010 11:45:32 -0800 Subject: [PATCH 16/20] Fix a bug regarding queueSize, add asserts --- src/node_io_watcher.cc | 37 +++++++++++++++++++++------------ test/simple/test-dumper-unix.js | 2 ++ test/simple/test-dumper.js | 3 ++- test/simple/test-pipe.js | 1 - 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 71a640cb0e..a9477573f0 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -285,7 +285,7 @@ void IOWatcher::Dump() { io->dumps_++; io->last_dump_ = ev_now(EV_DEFAULT_UC); - DEBUG_PRINT("dumping fd %d", io->watcher_.fd); + DEBUG_PRINT("<%d> dumping", io->watcher_.fd); // Number of items we've stored in iov int iovcnt = 0; @@ -299,7 +299,7 @@ void IOWatcher::Dump() { // Unix sockets don't like huge messages. TCP sockets do. // TODO: handle EMSGSIZE after sendmsg(). - size_t max_to_write = unix_socket ? 8*KB : 64*KB; + size_t max_to_write = unix_socket ? 8*KB : 256*KB; int fd_to_send = -1; @@ -312,6 +312,7 @@ void IOWatcher::Dump() { } size_t first_offset = offset; + DEBUG_PRINT("<%d> offset=%ld", io->watcher_.fd, offset); // Loop over all the buckets for this particular watcher/socket in order // to fill iov. 
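The structure all of this drains is a plain linked list of buckets that net.js (and, further down in this patch, the dumper tests) build on the write watcher: firstBucket/lastBucket point at the ends, each bucket carries data (a string or Buffer, optionally an fd when writing to a unix socket), and queueSize tracks the bytes queued. A minimal sketch of enqueueing a write under those assumptions; enqueue itself is an invented helper, not an API from net.js:

// Append one write to a watcher's bucket queue, the way the dumper tests
// wire it up by hand (firstBucket/lastBucket linked list + queueSize).
function enqueue(watcher, data, fd) {
  var bucket = { data: data };
  if (fd !== undefined) bucket.fd = fd;  // only meaningful on unix sockets

  if (!watcher.firstBucket) {
    watcher.firstBucket = bucket;
    watcher.queueSize = 0;
  } else {
    watcher.lastBucket.next = bucket;
  }
  watcher.lastBucket = bucket;
  watcher.queueSize += data.length;
  return watcher.queueSize;
}

var w = {};
enqueue(w, new Buffer(1024));
enqueue(w, 'hello world', 1);            // also pass stdout's fd along
console.log(w.queueSize);                // 1035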
@@ -367,7 +368,7 @@ void IOWatcher::Dump() { Local fd_v = bucket->Get(fd_sym); if (fd_v->IsInt32()) { fd_to_send = fd_v->Int32Value(); - DEBUG_PRINT("got fd to send: %d", fd_to_send); + DEBUG_PRINT("<%d> got fd to send: %d", io->watcher_.fd, fd_to_send); assert(fd_to_send >= 0); } } @@ -406,7 +407,8 @@ void IOWatcher::Dump() { written = writev(io->watcher_.fd, iov, iovcnt); } - DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", + DEBUG_PRINT("<%d> iovcnt: %d, to_write: %ld, written: %ld", + io->watcher_.fd, iovcnt, to_write, written); @@ -415,6 +417,7 @@ void IOWatcher::Dump() { // Allow EAGAIN. // TODO: handle EMSGSIZE after sendmsg(). if (errno == EAGAIN) { + DEBUG_PRINT("<%d> EAGAIN", io->watcher_.fd); io->Start(); } else { // Emit error event @@ -441,6 +444,7 @@ void IOWatcher::Dump() { // what about written == 0 ? size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); + DEBUG_PRINT("<%d> queue_size=%ld", io->watcher_.fd, queue_size); assert(queue_size >= offset); // Now drop the buckets that have been written. @@ -475,26 +479,34 @@ void IOWatcher::Dump() { bucket->Set(fd_sym, Null()); } + DEBUG_PRINT("<%d,%ld> bucket_len: %ld, offset: %ld", + io->watcher_.fd, + bucket_index, + bucket_len, + offset); assert(bucket_len > offset); - DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); - - queue_size -= written; // Only on the first bucket does is the offset > 0. if (offset + written < bucket_len) { // we have not written the entire bucket - DEBUG_PRINT("[%ld] Only wrote part of the buffer. " + DEBUG_PRINT("<%d,%ld> Only wrote part of the buffer. " "setting watcher.offset = %ld", + io->watcher_.fd, bucket_index, offset + written); watcher->Set(offset_sym, - Integer::NewFromUnsigned(offset + written)); + Integer::NewFromUnsigned(offset + written)); break; } else { - DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.", + DEBUG_PRINT("<%d,%ld> wrote the whole bucket. discarding.", + io->watcher_.fd, bucket_index); + assert(bucket_len <= queue_size); + queue_size -= bucket_len; + + assert(bucket_len - offset <= written); written -= bucket_len - offset; Local bucket_callback_v = bucket->Get(callback_sym); @@ -519,7 +531,6 @@ void IOWatcher::Dump() { watcher->Set(first_bucket_sym, bucket->Get(next_sym)); } - // Set the queue size. watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); } @@ -536,7 +547,7 @@ void IOWatcher::Dump() { // Still have buckets to be written. Wait for fd to become writable. io->Start(); - DEBUG_PRINT("Started watcher %d", io->watcher_.fd); + DEBUG_PRINT("<%d> Started watcher", io->watcher_.fd); } else { // No more buckets in the queue. Make sure the last_bucket_sym is // updated and then go to the next watcher. @@ -546,7 +557,7 @@ void IOWatcher::Dump() { // become writable. io->Stop(); - DEBUG_PRINT("Stop watcher %d", io->watcher_.fd); + DEBUG_PRINT("<%d> Stop watcher", io->watcher_.fd); // Emit drain event if (watcher->Has(ondrain_sym)) { diff --git a/test/simple/test-dumper-unix.js b/test/simple/test-dumper-unix.js index d6e7457c72..2e510360d5 100644 --- a/test/simple/test-dumper-unix.js +++ b/test/simple/test-dumper-unix.js @@ -87,11 +87,13 @@ function test (N, b, cb) { w.firstBucket = { data: b }; w.lastBucket = w.firstBucket; + w.queueSize = b.length; for (var i = 0; i < N-1; i++) { var bucket = { data: b }; w.lastBucket.next = bucket; w.lastBucket = bucket; + w.queueSize += b.length; // Kind of randomly fill these buckets with fds. 
if (fdsSent < 5 && i % 2 == 0) { bucket.fd = 1; // send stdout diff --git a/test/simple/test-dumper.js b/test/simple/test-dumper.js index af9ad87018..c1597749f3 100644 --- a/test/simple/test-dumper.js +++ b/test/simple/test-dumper.js @@ -49,7 +49,6 @@ function test (N, b, cb) { stream.readable = true; stream.resume(); - // Count the data as it arrives on the read end of the pipe. stream.on('data', function (d) { nread += d.length; @@ -84,12 +83,14 @@ function test (N, b, cb) { w.firstBucket = { data: b }; w.lastBucket = w.firstBucket; + w.queueSize = b.length; for (var i = 0; i < N-1; i++) { var bucket = { data: b }; assert.ok(!w.lastBucket.next); w.lastBucket.next = bucket; w.lastBucket = bucket; + w.queueSize += b.length; } } diff --git a/test/simple/test-pipe.js b/test/simple/test-pipe.js index 3341e875ca..d12c2b163e 100644 --- a/test/simple/test-pipe.js +++ b/test/simple/test-pipe.js @@ -93,7 +93,6 @@ function startClient () { req.write(buffer); req.end(); - console.log("request fd=%d", req.connection.fd); // note the queue includes http headers. From 544877d12e24b84917d2358caedc00be1538bbe9 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 18 Nov 2010 11:59:15 -0800 Subject: [PATCH 17/20] Fix socket.end() problem on non-empty queue --- lib/net.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/net.js b/lib/net.js index 38b804eb43..50cfc293cd 100644 --- a/lib/net.js +++ b/lib/net.js @@ -73,7 +73,9 @@ IOWatcher.prototype.ondrain = function () { IOWatcher.prototype.onerror = function (errno) { assert(this.socket); - this.socket.destroy(errnoException(errno, 'write')); + var e = errnoException(errno, 'write'); + e.message += " fd=" + this.socket.fd; + this.socket.destroy(e); }; @@ -613,7 +615,7 @@ Stream.prototype.end = function (data, encoding) { this._eof = true; // If this isn't in the dumpQueue then we shutdown now. 
- if (!this._writeWatcher.next) this._shutdown(); + if (!this._writeWatcher.firstBucket) this._shutdown(); }; From 849792e46b36297b63e3424e4b2e183dd9f1d76f Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 18 Nov 2010 12:04:33 -0800 Subject: [PATCH 18/20] Add todo about test-pipe.js and throwing on resume() --- lib/net.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/net.js b/lib/net.js index 50cfc293cd..62abffdaf5 100644 --- a/lib/net.js +++ b/lib/net.js @@ -538,7 +538,11 @@ Stream.prototype.pause = function () { Stream.prototype.resume = function () { - if (this.fd === null) throw new Error('Cannot resume() closed Stream.'); + if (this.fd === null) { + // TODO, FIXME: throwing here breaks test/simple/test-pipe.js + // throw new Error('Cannot resume() closed Stream.'); + return; + } this._readWatcher.stop(); this._readWatcher.set(this.fd, true, false); this._readWatcher.start(); From 2ba3c10d6244d32a4196b59b4d46f069c7482795 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 18 Nov 2010 14:19:09 -0800 Subject: [PATCH 19/20] Only try to flush big buffers once --- lib/net.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/net.js b/lib/net.js index 62abffdaf5..90c44a0419 100644 --- a/lib/net.js +++ b/lib/net.js @@ -59,6 +59,8 @@ IOWatcher.prototype.ondrain = function () { if (this.socket) { var socket = this.socket; + socket._haveTriedFlush = false; + if (socket.writable || socket.readable) { require('timers').active(socket); } @@ -327,8 +329,9 @@ Stream.prototype.write = function (data /* encoding, fd, callback */) { this._onWritable(); // Insert writeWatcher into the dumpQueue require('timers').active(this); - if (queueSize > (64*1024)) { + if (queueSize > (64*1024) && !this._haveTriedFlush) { IOWatcher.flush(); + this._haveTriedFlush = true; } return queueSize < (64*1024); From 1f18648394d226246b231772257d62da9a06b9fa Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 18 Nov 2010 16:28:10 -0800 Subject: [PATCH 20/20] Fix typo --- lib/net.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/net.js b/lib/net.js index 90c44a0419..f5abf50470 100644 --- a/lib/net.js +++ b/lib/net.js @@ -147,14 +147,14 @@ function setImplmentationMethods (self) { }; -function onReadable (readable, writeable) { +function onReadable (readable, writable) { assert(this.socket); var socket = this.socket; socket._onReadable(); } -function onWritable (readable, writeable) { +function onWritable (readable, writable) { assert(this.socket); var socket = this.socket; if (socket._connecting) {
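// With the queueing changes in patches 15-19, callers get a backpressure
// signal: Stream.prototype.write() queues a bucket and returns false once
// more than 64KB is pending (trying IOWatcher.flush() at most once per drain
// cycle), writeQueueSize() exposes the pending byte count, and 'drain' fires
// once Dump() empties the queue. A sketch of the consumer side under those
// assumptions -- pump(), the port and the payload are illustrative only and
// not part of these patches:

var net = require('net');

// Write an array of chunks to a Stream, stopping whenever write() reports
// that the queue is over the high-water mark and resuming on 'drain'.
function pump(stream, chunks) {
  var done = false;
  function writeSome() {
    if (done) return;
    while (chunks.length) {
      if (!stream.write(chunks.shift())) {
        // More than ~64KB queued; wait for the dumper to drain it.
        return;
      }
    }
    done = true;
    stream.end();
  }
  stream.on('drain', writeSome);
  writeSome();
}

var chunks = [];
for (var i = 0; i < 100; i++) chunks.push(new Buffer(16 * 1024));

var s = net.createConnection(8000);
s.on('connect', function () {
  pump(s, chunks);
  console.log('queued ' + s.writeQueueSize() + ' bytes');
});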