diff --git a/lib/net.js b/lib/net.js index 998322678e..230d8e3a58 100644 --- a/lib/net.js +++ b/lib/net.js @@ -254,8 +254,6 @@ Stream.prototype._appendBucket = function (data, encoding, fd) { if (encoding) newBucket.encoding = encoding; if (fd) newBucket.fd = fd; - var queueSize = data.length; - // TODO properly calculate queueSize if (this._writeWatcher.lastBucket) { @@ -266,7 +264,13 @@ Stream.prototype._appendBucket = function (data, encoding, fd) { this._writeWatcher.lastBucket = newBucket; - return queueSize; + if (this._writeWatcher.queueSize === undefined) { + this._writeWatcher.queueSize = 0; + } + assert(this._writeWatcher.queueSize >= 0); + this._writeWatcher.queueSize += data.length; + + return this._writeWatcher.queueSize; }; diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index a991dd9a8f..38f44cc46f 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -37,6 +37,7 @@ static Persistent fd_sym; static Persistent is_unix_socket_sym; static Persistent first_bucket_sym; static Persistent last_bucket_sym; +static Persistent queue_size_sym; void IOWatcher::Initialize(Handle target) { @@ -62,6 +63,7 @@ void IOWatcher::Initialize(Handle target) { onerror_sym = NODE_PSYMBOL("onerror"); first_bucket_sym = NODE_PSYMBOL("firstBucket"); last_bucket_sym = NODE_PSYMBOL("lastBucket"); + queue_size_sym = NODE_PSYMBOL("queueSize"); offset_sym = NODE_PSYMBOL("offset"); fd_sym = NODE_PSYMBOL("fd"); is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); @@ -423,6 +425,9 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { // what about written == 0 ? + size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); + assert(queue_size >= offset); + // Now drop the buckets that have been written. bucket_index = 0; @@ -451,15 +456,15 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { // serialized onto a buffer. size_t bucket_len = Buffer::Length(data_v->ToObject()); - if (unix_socket && bucket->Has(fd_sym)) { bucket->Set(fd_sym, Null()); } - assert(bucket_len > offset); DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); + queue_size -= written; + // Only on the first bucket does is the offset > 0. if (offset + written < bucket_len) { // we have not written the entire bucket @@ -488,6 +493,9 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { watcher->Set(first_bucket_sym, bucket->Get(next_sym)); } + // Set the queue size. + watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); + // Finished dumping the buckets. // // If our list of buckets is empty, we can emit 'drain' and forget about