Browse Source

Fix a bug regarding queueSize, add asserts

v0.7.4-release
Ryan Dahl 14 years ago
parent
commit
0b1214c16b
  1. 37
      src/node_io_watcher.cc
  2. 2
      test/simple/test-dumper-unix.js
  3. 3
      test/simple/test-dumper.js
  4. 1
      test/simple/test-pipe.js

37
src/node_io_watcher.cc

@ -285,7 +285,7 @@ void IOWatcher::Dump() {
io->dumps_++; io->dumps_++;
io->last_dump_ = ev_now(EV_DEFAULT_UC); io->last_dump_ = ev_now(EV_DEFAULT_UC);
DEBUG_PRINT("dumping fd %d", io->watcher_.fd); DEBUG_PRINT("<%d> dumping", io->watcher_.fd);
// Number of items we've stored in iov // Number of items we've stored in iov
int iovcnt = 0; int iovcnt = 0;
@ -299,7 +299,7 @@ void IOWatcher::Dump() {
// Unix sockets don't like huge messages. TCP sockets do. // Unix sockets don't like huge messages. TCP sockets do.
// TODO: handle EMSGSIZE after sendmsg(). // TODO: handle EMSGSIZE after sendmsg().
size_t max_to_write = unix_socket ? 8*KB : 64*KB; size_t max_to_write = unix_socket ? 8*KB : 256*KB;
int fd_to_send = -1; int fd_to_send = -1;
@ -312,6 +312,7 @@ void IOWatcher::Dump() {
} }
size_t first_offset = offset; size_t first_offset = offset;
DEBUG_PRINT("<%d> offset=%ld", io->watcher_.fd, offset);
// Loop over all the buckets for this particular watcher/socket in order // Loop over all the buckets for this particular watcher/socket in order
// to fill iov. // to fill iov.
@ -367,7 +368,7 @@ void IOWatcher::Dump() {
Local<Value> fd_v = bucket->Get(fd_sym); Local<Value> fd_v = bucket->Get(fd_sym);
if (fd_v->IsInt32()) { if (fd_v->IsInt32()) {
fd_to_send = fd_v->Int32Value(); fd_to_send = fd_v->Int32Value();
DEBUG_PRINT("got fd to send: %d", fd_to_send); DEBUG_PRINT("<%d> got fd to send: %d", io->watcher_.fd, fd_to_send);
assert(fd_to_send >= 0); assert(fd_to_send >= 0);
} }
} }
@ -406,7 +407,8 @@ void IOWatcher::Dump() {
written = writev(io->watcher_.fd, iov, iovcnt); written = writev(io->watcher_.fd, iov, iovcnt);
} }
DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", DEBUG_PRINT("<%d> iovcnt: %d, to_write: %ld, written: %ld",
io->watcher_.fd,
iovcnt, iovcnt,
to_write, to_write,
written); written);
@ -415,6 +417,7 @@ void IOWatcher::Dump() {
// Allow EAGAIN. // Allow EAGAIN.
// TODO: handle EMSGSIZE after sendmsg(). // TODO: handle EMSGSIZE after sendmsg().
if (errno == EAGAIN) { if (errno == EAGAIN) {
DEBUG_PRINT("<%d> EAGAIN", io->watcher_.fd);
io->Start(); io->Start();
} else { } else {
// Emit error event // Emit error event
@ -441,6 +444,7 @@ void IOWatcher::Dump() {
// what about written == 0 ? // what about written == 0 ?
size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
DEBUG_PRINT("<%d> queue_size=%ld", io->watcher_.fd, queue_size);
assert(queue_size >= offset); assert(queue_size >= offset);
// Now drop the buckets that have been written. // Now drop the buckets that have been written.
@ -475,26 +479,34 @@ void IOWatcher::Dump() {
bucket->Set(fd_sym, Null()); bucket->Set(fd_sym, Null());
} }
DEBUG_PRINT("<%d,%ld> bucket_len: %ld, offset: %ld",
io->watcher_.fd,
bucket_index,
bucket_len,
offset);
assert(bucket_len > offset); assert(bucket_len > offset);
DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset);
queue_size -= written;
// Only on the first bucket does is the offset > 0. // Only on the first bucket does is the offset > 0.
if (offset + written < bucket_len) { if (offset + written < bucket_len) {
// we have not written the entire bucket // we have not written the entire bucket
DEBUG_PRINT("[%ld] Only wrote part of the buffer. " DEBUG_PRINT("<%d,%ld> Only wrote part of the buffer. "
"setting watcher.offset = %ld", "setting watcher.offset = %ld",
io->watcher_.fd,
bucket_index, bucket_index,
offset + written); offset + written);
watcher->Set(offset_sym, watcher->Set(offset_sym,
Integer::NewFromUnsigned(offset + written)); Integer::NewFromUnsigned(offset + written));
break; break;
} else { } else {
DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.", DEBUG_PRINT("<%d,%ld> wrote the whole bucket. discarding.",
io->watcher_.fd,
bucket_index); bucket_index);
assert(bucket_len <= queue_size);
queue_size -= bucket_len;
assert(bucket_len - offset <= written);
written -= bucket_len - offset; written -= bucket_len - offset;
Local<Value> bucket_callback_v = bucket->Get(callback_sym); Local<Value> bucket_callback_v = bucket->Get(callback_sym);
@ -519,7 +531,6 @@ void IOWatcher::Dump() {
watcher->Set(first_bucket_sym, bucket->Get(next_sym)); watcher->Set(first_bucket_sym, bucket->Get(next_sym));
} }
// Set the queue size.
watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
} }
@ -536,7 +547,7 @@ void IOWatcher::Dump() {
// Still have buckets to be written. Wait for fd to become writable. // Still have buckets to be written. Wait for fd to become writable.
io->Start(); io->Start();
DEBUG_PRINT("Started watcher %d", io->watcher_.fd); DEBUG_PRINT("<%d> Started watcher", io->watcher_.fd);
} else { } else {
// No more buckets in the queue. Make sure the last_bucket_sym is // No more buckets in the queue. Make sure the last_bucket_sym is
// updated and then go to the next watcher. // updated and then go to the next watcher.
@ -546,7 +557,7 @@ void IOWatcher::Dump() {
// become writable. // become writable.
io->Stop(); io->Stop();
DEBUG_PRINT("Stop watcher %d", io->watcher_.fd); DEBUG_PRINT("<%d> Stop watcher", io->watcher_.fd);
// Emit drain event // Emit drain event
if (watcher->Has(ondrain_sym)) { if (watcher->Has(ondrain_sym)) {

2
test/simple/test-dumper-unix.js

@ -87,11 +87,13 @@ function test (N, b, cb) {
w.firstBucket = { data: b }; w.firstBucket = { data: b };
w.lastBucket = w.firstBucket; w.lastBucket = w.firstBucket;
w.queueSize = b.length;
for (var i = 0; i < N-1; i++) { for (var i = 0; i < N-1; i++) {
var bucket = { data: b }; var bucket = { data: b };
w.lastBucket.next = bucket; w.lastBucket.next = bucket;
w.lastBucket = bucket; w.lastBucket = bucket;
w.queueSize += b.length;
// Kind of randomly fill these buckets with fds. // Kind of randomly fill these buckets with fds.
if (fdsSent < 5 && i % 2 == 0) { if (fdsSent < 5 && i % 2 == 0) {
bucket.fd = 1; // send stdout bucket.fd = 1; // send stdout

3
test/simple/test-dumper.js

@ -49,7 +49,6 @@ function test (N, b, cb) {
stream.readable = true; stream.readable = true;
stream.resume(); stream.resume();
// Count the data as it arrives on the read end of the pipe. // Count the data as it arrives on the read end of the pipe.
stream.on('data', function (d) { stream.on('data', function (d) {
nread += d.length; nread += d.length;
@ -84,12 +83,14 @@ function test (N, b, cb) {
w.firstBucket = { data: b }; w.firstBucket = { data: b };
w.lastBucket = w.firstBucket; w.lastBucket = w.firstBucket;
w.queueSize = b.length;
for (var i = 0; i < N-1; i++) { for (var i = 0; i < N-1; i++) {
var bucket = { data: b }; var bucket = { data: b };
assert.ok(!w.lastBucket.next); assert.ok(!w.lastBucket.next);
w.lastBucket.next = bucket; w.lastBucket.next = bucket;
w.lastBucket = bucket; w.lastBucket = bucket;
w.queueSize += b.length;
} }
} }

1
test/simple/test-pipe.js

@ -93,7 +93,6 @@ function startClient () {
req.write(buffer); req.write(buffer);
req.end(); req.end();
console.log("request fd=%d", req.connection.fd); console.log("request fd=%d", req.connection.fd);
// note the queue includes http headers. // note the queue includes http headers.

Loading…
Cancel
Save