Browse Source

Emit drain and stop IOWatcher even on empty buffer

v0.7.4-release
Ryan Dahl 14 years ago
parent
commit
d3fbe3e3d1
  1. 245
      src/node_io_watcher.cc

245
src/node_io_watcher.cc

@ -374,155 +374,156 @@ void IOWatcher::Dump() {
} }
} }
if (to_write == 0) continue; if (to_write > 0) {
ssize_t written;
ssize_t written;
if (unix_socket) {
if (unix_socket) { struct msghdr msg;
struct msghdr msg; char scratch[64];
char scratch[64];
msg.msg_name = NULL;
msg.msg_name = NULL; msg.msg_namelen = 0;
msg.msg_namelen = 0; msg.msg_iov = iov;
msg.msg_iov = iov; msg.msg_iovlen = iovcnt;
msg.msg_iovlen = iovcnt; msg.msg_control = NULL; // void*
msg.msg_control = NULL; // void* msg.msg_controllen = 0; // socklen_t
msg.msg_controllen = 0; // socklen_t msg.msg_flags = 0; // int
msg.msg_flags = 0; // int
if (fd_to_send >= 0) {
if (fd_to_send >= 0) { struct cmsghdr *cmsg;
struct cmsghdr *cmsg;
msg.msg_control = (void *) scratch;
msg.msg_control = (void *) scratch; msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = msg.msg_controllen;
*(int*) CMSG_DATA(cmsg) = fd_to_send;
}
cmsg = CMSG_FIRSTHDR(&msg); written = sendmsg(io->watcher_.fd, &msg, 0);
cmsg->cmsg_level = SOL_SOCKET; } else {
cmsg->cmsg_type = SCM_RIGHTS; written = writev(io->watcher_.fd, iov, iovcnt);
cmsg->cmsg_len = msg.msg_controllen;
*(int*) CMSG_DATA(cmsg) = fd_to_send;
} }
written = sendmsg(io->watcher_.fd, &msg, 0); DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld",
} else { iovcnt,
written = writev(io->watcher_.fd, iov, iovcnt); to_write,
} written);
DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", if (written < 0) {
iovcnt, // Allow EAGAIN.
to_write, // TODO: handle EMSGSIZE after sendmsg().
written); if (errno == EAGAIN) {
io->Start();
if (written < 0) { } else {
// Allow EAGAIN. // Emit error event
// TODO: handle EMSGSIZE after sendmsg(). if (watcher->Has(onerror_sym)) {
if (errno == EAGAIN) { Local<Value> callback_v = io->handle_->Get(onerror_sym);
io->Start(); assert(callback_v->IsFunction());
} else { Local<Function> callback = Local<Function>::Cast(callback_v);
// Emit error event
if (watcher->Has(onerror_sym)) {
Local<Value> callback_v = io->handle_->Get(onerror_sym);
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);
Local<Value> argv[1] = { Integer::New(errno) }; Local<Value> argv[1] = { Integer::New(errno) };
TryCatch try_catch; TryCatch try_catch;
callback->Call(io->handle_, 1, argv); callback->Call(io->handle_, 1, argv);
if (try_catch.HasCaught()) { if (try_catch.HasCaught()) {
FatalException(try_catch); FatalException(try_catch);
}
} }
} }
// Continue with the next watcher.
continue;
} }
// Continue with the next watcher.
continue;
}
// what about written == 0 ?
size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); // what about written == 0 ?
assert(queue_size >= offset);
// Now drop the buckets that have been written. size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
bucket_index = 0; assert(queue_size >= offset);
while (written > 0) { // Now drop the buckets that have been written.
bucket_v = watcher->Get(first_bucket_sym); bucket_index = 0;
if (!bucket_v->IsObject()) {
// No more buckets in the queue. Make sure the last_bucket_sym is
// updated and then go to the next watcher.
watcher->Set(last_bucket_sym, Null());
break;
}
bucket = bucket_v->ToObject(); while (written > 0) {
bucket_v = watcher->Get(first_bucket_sym);
Local<Value> data_v = bucket->Get(data_sym); if (!bucket_v->IsObject()) {
assert(!data_v.IsEmpty()); // No more buckets in the queue. Make sure the last_bucket_sym is
// updated and then go to the next watcher.
watcher->Set(last_bucket_sym, Null());
break;
}
// At the moment we're turning all string into buffers bucket = bucket_v->ToObject();
// so we assert that this is not a string. However, when the
// "Pointer patch" lands, this assert will need to be removed.
assert(!data_v->IsString());
// When the "Pointer patch" lands, we will need to be careful
// to somehow store the length of strings that we're optimizing on
// so that it need not be recalculated here. Note the "Pointer patch"
// will only apply to ASCII strings - UTF8 ones will need to be
// serialized onto a buffer.
size_t bucket_len = Buffer::Length(data_v->ToObject());
if (unix_socket && bucket->Has(fd_sym)) { Local<Value> data_v = bucket->Get(data_sym);
bucket->Set(fd_sym, Null()); assert(!data_v.IsEmpty());
}
assert(bucket_len > offset); // At the moment we're turning all string into buffers
DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); // so we assert that this is not a string. However, when the
// "Pointer patch" lands, this assert will need to be removed.
assert(!data_v->IsString());
// When the "Pointer patch" lands, we will need to be careful
// to somehow store the length of strings that we're optimizing on
// so that it need not be recalculated here. Note the "Pointer patch"
// will only apply to ASCII strings - UTF8 ones will need to be
// serialized onto a buffer.
size_t bucket_len = Buffer::Length(data_v->ToObject());
queue_size -= written; if (unix_socket && bucket->Has(fd_sym)) {
bucket->Set(fd_sym, Null());
// Only on the first bucket does is the offset > 0. }
if (offset + written < bucket_len) {
// we have not written the entire bucket
DEBUG_PRINT("[%ld] Only wrote part of the buffer. "
"setting watcher.offset = %ld",
bucket_index,
offset + written);
watcher->Set(offset_sym, assert(bucket_len > offset);
Integer::NewFromUnsigned(offset + written)); DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset);
break;
} else { queue_size -= written;
DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.",
bucket_index); // Only on the first bucket does is the offset > 0.
if (offset + written < bucket_len) {
written -= bucket_len - offset; // we have not written the entire bucket
DEBUG_PRINT("[%ld] Only wrote part of the buffer. "
Local<Value> bucket_callback_v = bucket->Get(callback_sym); "setting watcher.offset = %ld",
if (bucket_callback_v->IsFunction()) { bucket_index,
Local<Function> bucket_callback = offset + written);
Local<Function>::Cast(bucket_callback_v);
TryCatch try_catch; watcher->Set(offset_sym,
bucket_callback->Call(io->handle_, 0, NULL); Integer::NewFromUnsigned(offset + written));
if (try_catch.HasCaught()) { break;
FatalException(try_catch); } else {
DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.",
bucket_index);
written -= bucket_len - offset;
Local<Value> bucket_callback_v = bucket->Get(callback_sym);
if (bucket_callback_v->IsFunction()) {
Local<Function> bucket_callback =
Local<Function>::Cast(bucket_callback_v);
TryCatch try_catch;
bucket_callback->Call(io->handle_, 0, NULL);
if (try_catch.HasCaught()) {
FatalException(try_catch);
}
} }
// Offset is now zero
watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
} }
// Offset is now zero offset = 0; // the next bucket will have zero offset;
watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); bucket_index++;
}
offset = 0; // the next bucket will have zero offset; // unshift
bucket_index++; watcher->Set(first_bucket_sym, bucket->Get(next_sym));
}
// unshift // Set the queue size.
watcher->Set(first_bucket_sym, bucket->Get(next_sym)); watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
} }
// Set the queue size.
watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
// Finished dumping the buckets. // Finished dumping the buckets.
// //

Loading…
Cancel
Save