diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 62c5646463..45bc424eea 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -374,155 +374,156 @@ void IOWatcher::Dump() { } } - if (to_write == 0) continue; - - ssize_t written; - - if (unix_socket) { - struct msghdr msg; - char scratch[64]; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = iovcnt; - msg.msg_control = NULL; // void* - msg.msg_controllen = 0; // socklen_t - msg.msg_flags = 0; // int - - if (fd_to_send >= 0) { - struct cmsghdr *cmsg; - - msg.msg_control = (void *) scratch; - msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + if (to_write > 0) { + ssize_t written; + + if (unix_socket) { + struct msghdr msg; + char scratch[64]; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iovcnt; + msg.msg_control = NULL; // void* + msg.msg_controllen = 0; // socklen_t + msg.msg_flags = 0; // int + + if (fd_to_send >= 0) { + struct cmsghdr *cmsg; + + msg.msg_control = (void *) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + *(int*) CMSG_DATA(cmsg) = fd_to_send; + } - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = msg.msg_controllen; - *(int*) CMSG_DATA(cmsg) = fd_to_send; + written = sendmsg(io->watcher_.fd, &msg, 0); + } else { + written = writev(io->watcher_.fd, iov, iovcnt); } - written = sendmsg(io->watcher_.fd, &msg, 0); - } else { - written = writev(io->watcher_.fd, iov, iovcnt); - } + DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", + iovcnt, + to_write, + written); - DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld", - iovcnt, - to_write, - written); - - if (written < 0) { - // Allow EAGAIN. - // TODO: handle EMSGSIZE after sendmsg(). - if (errno == EAGAIN) { - io->Start(); - } else { - // Emit error event - if (watcher->Has(onerror_sym)) { - Local callback_v = io->handle_->Get(onerror_sym); - assert(callback_v->IsFunction()); - Local callback = Local::Cast(callback_v); + if (written < 0) { + // Allow EAGAIN. + // TODO: handle EMSGSIZE after sendmsg(). + if (errno == EAGAIN) { + io->Start(); + } else { + // Emit error event + if (watcher->Has(onerror_sym)) { + Local callback_v = io->handle_->Get(onerror_sym); + assert(callback_v->IsFunction()); + Local callback = Local::Cast(callback_v); - Local argv[1] = { Integer::New(errno) }; + Local argv[1] = { Integer::New(errno) }; - TryCatch try_catch; + TryCatch try_catch; - callback->Call(io->handle_, 1, argv); + callback->Call(io->handle_, 1, argv); - if (try_catch.HasCaught()) { - FatalException(try_catch); + if (try_catch.HasCaught()) { + FatalException(try_catch); + } } } + // Continue with the next watcher. + continue; } - // Continue with the next watcher. - continue; - } - - // what about written == 0 ? - size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); - assert(queue_size >= offset); + // what about written == 0 ? - // Now drop the buckets that have been written. - bucket_index = 0; + size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); + assert(queue_size >= offset); - while (written > 0) { - bucket_v = watcher->Get(first_bucket_sym); - if (!bucket_v->IsObject()) { - // No more buckets in the queue. Make sure the last_bucket_sym is - // updated and then go to the next watcher. - watcher->Set(last_bucket_sym, Null()); - break; - } + // Now drop the buckets that have been written. + bucket_index = 0; - bucket = bucket_v->ToObject(); - - Local data_v = bucket->Get(data_sym); - assert(!data_v.IsEmpty()); + while (written > 0) { + bucket_v = watcher->Get(first_bucket_sym); + if (!bucket_v->IsObject()) { + // No more buckets in the queue. Make sure the last_bucket_sym is + // updated and then go to the next watcher. + watcher->Set(last_bucket_sym, Null()); + break; + } - // At the moment we're turning all string into buffers - // so we assert that this is not a string. However, when the - // "Pointer patch" lands, this assert will need to be removed. - assert(!data_v->IsString()); - // When the "Pointer patch" lands, we will need to be careful - // to somehow store the length of strings that we're optimizing on - // so that it need not be recalculated here. Note the "Pointer patch" - // will only apply to ASCII strings - UTF8 ones will need to be - // serialized onto a buffer. - size_t bucket_len = Buffer::Length(data_v->ToObject()); + bucket = bucket_v->ToObject(); - if (unix_socket && bucket->Has(fd_sym)) { - bucket->Set(fd_sym, Null()); - } + Local data_v = bucket->Get(data_sym); + assert(!data_v.IsEmpty()); - assert(bucket_len > offset); - DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); + // At the moment we're turning all string into buffers + // so we assert that this is not a string. However, when the + // "Pointer patch" lands, this assert will need to be removed. + assert(!data_v->IsString()); + // When the "Pointer patch" lands, we will need to be careful + // to somehow store the length of strings that we're optimizing on + // so that it need not be recalculated here. Note the "Pointer patch" + // will only apply to ASCII strings - UTF8 ones will need to be + // serialized onto a buffer. + size_t bucket_len = Buffer::Length(data_v->ToObject()); - queue_size -= written; - - // Only on the first bucket does is the offset > 0. - if (offset + written < bucket_len) { - // we have not written the entire bucket - DEBUG_PRINT("[%ld] Only wrote part of the buffer. " - "setting watcher.offset = %ld", - bucket_index, - offset + written); + if (unix_socket && bucket->Has(fd_sym)) { + bucket->Set(fd_sym, Null()); + } - watcher->Set(offset_sym, - Integer::NewFromUnsigned(offset + written)); - break; - } else { - DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.", - bucket_index); - - written -= bucket_len - offset; - - Local bucket_callback_v = bucket->Get(callback_sym); - if (bucket_callback_v->IsFunction()) { - Local bucket_callback = - Local::Cast(bucket_callback_v); - TryCatch try_catch; - bucket_callback->Call(io->handle_, 0, NULL); - if (try_catch.HasCaught()) { - FatalException(try_catch); + assert(bucket_len > offset); + DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset); + + queue_size -= written; + + // Only on the first bucket does is the offset > 0. + if (offset + written < bucket_len) { + // we have not written the entire bucket + DEBUG_PRINT("[%ld] Only wrote part of the buffer. " + "setting watcher.offset = %ld", + bucket_index, + offset + written); + + watcher->Set(offset_sym, + Integer::NewFromUnsigned(offset + written)); + break; + } else { + DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.", + bucket_index); + + written -= bucket_len - offset; + + Local bucket_callback_v = bucket->Get(callback_sym); + if (bucket_callback_v->IsFunction()) { + Local bucket_callback = + Local::Cast(bucket_callback_v); + TryCatch try_catch; + bucket_callback->Call(io->handle_, 0, NULL); + if (try_catch.HasCaught()) { + FatalException(try_catch); + } } + + // Offset is now zero + watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); } - // Offset is now zero - watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); - } + offset = 0; // the next bucket will have zero offset; + bucket_index++; - offset = 0; // the next bucket will have zero offset; - bucket_index++; + // unshift + watcher->Set(first_bucket_sym, bucket->Get(next_sym)); + } - // unshift - watcher->Set(first_bucket_sym, bucket->Get(next_sym)); + // Set the queue size. + watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); } - // Set the queue size. - watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); // Finished dumping the buckets. //