From 60ed2c5434d893fccd3702d288c2ba2ab325805a Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 9 Apr 2013 12:56:56 +0400 Subject: [PATCH] net: implement ._writev for .cork/uncork() support Add Writev method to StreamWrap class for writing mixed array of strings and buffers. Expose this method for TCP class. --- lib/net.js | 69 +++++++++- src/stream_wrap.cc | 312 +++++++++++++++++++++++++++++++++++---------- src/stream_wrap.h | 23 +++- src/tcp_wrap.cc | 1 + 4 files changed, 333 insertions(+), 72 deletions(-) diff --git a/lib/net.js b/lib/net.js index 646983f6ca..4affa8d754 100644 --- a/lib/net.js +++ b/lib/net.js @@ -130,6 +130,10 @@ function initSocketHandle(self) { if (self._handle) { self._handle.owner = self; self._handle.onread = onread; + + // If handle doesn't support writev - neither do we + if (!self._handle.writev) + self._writev = null; } } @@ -597,7 +601,7 @@ Socket.prototype.write = function(chunk, encoding, cb) { }; -Socket.prototype._write = function(data, encoding, cb) { +Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { // If we are still connecting, then buffer this for later. // The Writable logic will buffer up any more writes while // waiting for this one to be done. @@ -605,7 +609,7 @@ Socket.prototype._write = function(data, encoding, cb) { this._pendingData = data; this._pendingEncoding = encoding; this.once('connect', function() { - this._write(data, encoding, cb); + this._writeGeneric(writev, data, encoding, cb); }); return; } @@ -619,8 +623,31 @@ Socket.prototype._write = function(data, encoding, cb) { return false; } - var enc = Buffer.isBuffer(data) ? 'buffer' : encoding; - var writeReq = createWriteReq(this._handle, data, enc); + var writeReq; + if (writev) { + var chunks = new Array(data.length << 1); + for (var i = 0; i < data.length; i++) { + var entry = data[i]; + var enc = entry.encoding; + var chunk = entry.chunk; + var code = getEncodingId(enc); + + // Buffer encoding, translate argument to buffer + if (code === 0 && !Buffer.isBuffer(chunk)) + chunk = new Buffer(chunk, enc); + + chunks[i * 2] = chunk; + chunks[i * 2 + 1] = code; + } + var writeReq = this._handle.writev(chunks); + + // Retain chunks + if (writeReq) + writeReq._chunks = chunks; + } else { + var enc = Buffer.isBuffer(data) ? 'buffer' : encoding; + var writeReq = createWriteReq(this._handle, data, enc); + } if (!writeReq || typeof writeReq !== 'object') return this._destroy(errnoException(process._errno, 'write'), cb); @@ -636,6 +663,40 @@ Socket.prototype._write = function(data, encoding, cb) { writeReq.cb = cb; }; + +Socket.prototype._writev = function(chunks, cb) { + this._writeGeneric(true, chunks, '', cb); +}; + + +Socket.prototype._write = function(data, encoding, cb) { + this._writeGeneric(false, data, encoding, cb); +}; + +// Important: this should have the same values as in src/stream_wrap.h +function getEncodingId(encoding) { + switch (encoding) { + case 'buffer': + return 0; + + case 'utf8': + case 'utf-8': + return 1; + + case 'ascii': + return 2; + + case 'ucs2': + case 'ucs-2': + case 'utf16le': + case 'utf-16le': + return 3; + + default: + return 0; + } +} + function createWriteReq(handle, data, encoding) { switch (encoding) { case 'buffer': diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 01fc8de894..6c5b5b2b1f 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -40,6 +40,7 @@ namespace node { using v8::AccessorInfo; using v8::Arguments; +using v8::Array; using v8::Context; using v8::Exception; using v8::Function; @@ -283,6 +284,109 @@ void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf, } +size_t StreamWrap::WriteBuffer(WriteWrap* req, + Handle val, + uv_buf_t* buf) { + assert(Buffer::HasInstance(val)); + + // Simple non-writev case + buf->base = Buffer::Data(val); + buf->len = Buffer::Length(val); + + return buf->len; +} + + +template +size_t StreamWrap::WriteStringImpl(char* storage, + size_t storage_size, + Handle val, + uv_buf_t* buf) { + assert(val->IsString()); + Handle string = val.As(); + + size_t data_size; + switch (encoding) { + case kAscii: + data_size = string->WriteOneByte( + reinterpret_cast(storage), + 0, + -1, + String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); + break; + + case kUtf8: + data_size = string->WriteUtf8( + storage, + -1, + NULL, + String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); + break; + + case kUcs2: { + int chars_copied = string->Write( + reinterpret_cast(storage), + 0, + -1, + String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); + data_size = chars_copied * sizeof(uint16_t); + break; + } + + default: + // Unreachable + assert(0); + } + assert(data_size <= storage_size); + + buf->base = storage; + buf->len = data_size; + + return data_size; +} + + +template +size_t StreamWrap::GetStringSizeImpl(Handle val) { + assert(val->IsString()); + Handle string = val.As(); + + switch (encoding) { + case kAscii: + return string->Length(); + break; + + case kUtf8: + if (!(string->MayContainNonAscii())) { + // If the string has only ascii characters, we know exactly how big + // the storage should be. + return string->Length(); + } else if (string->Length() < 65536) { + // A single UCS2 codepoint never takes up more than 3 utf8 bytes. + // Unless the string is really long we just allocate so much space that + // we're certain the string fits in there entirely. + // TODO: maybe check handle->write_queue_size instead of string length? + return 3 * string->Length(); + } else { + // The string is really long. Compute the allocation size that we + // actually need. + return string->Utf8Length(); + } + break; + + case kUcs2: + return string->Length() * sizeof(uint16_t); + break; + + default: + // Unreachable. + assert(0); + } + + return 0; +} + + Handle StreamWrap::WriteBuffer(const Arguments& args) { HandleScope scope(node_isolate); @@ -290,17 +394,14 @@ Handle StreamWrap::WriteBuffer(const Arguments& args) { // The first argument is a buffer. assert(args.Length() >= 1 && Buffer::HasInstance(args[0])); - Local buffer_obj = args[0]->ToObject(); - size_t offset = 0; - size_t length = Buffer::Length(buffer_obj); + size_t length = Buffer::Length(args[0]); char* storage = new char[sizeof(WriteWrap)]; WriteWrap* req_wrap = new (storage) WriteWrap(); - req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj); + req_wrap->object_->SetHiddenValue(buffer_sym, args[0]); uv_buf_t buf; - buf.base = Buffer::Data(buffer_obj) + offset; - buf.len = length; + WriteBuffer(req_wrap, args[0], &buf); int r = uv_write(&req_wrap->req_, wrap->stream_, @@ -344,38 +445,7 @@ Handle StreamWrap::WriteStringImpl(const Arguments& args) { Local string = args[0]->ToString(); // Compute the size of the storage that the string will be flattened into. - size_t storage_size; - switch (encoding) { - case kAscii: - storage_size = string->Length(); - break; - - case kUtf8: - if (!(string->MayContainNonAscii())) { - // If the string has only ascii characters, we know exactly how big - // the storage should be. - storage_size = string->Length(); - } else if (string->Length() < 65536) { - // A single UCS2 codepoint never takes up more than 3 utf8 bytes. - // Unless the string is really long we just allocate so much space that - // we're certain the string fits in there entirely. - // TODO: maybe check handle->write_queue_size instead of string length? - storage_size = 3 * string->Length(); - } else { - // The string is really long. Compute the allocation size that we - // actually need. - storage_size = string->Utf8Length(); - } - break; - - case kUcs2: - storage_size = string->Length() * sizeof(uint16_t); - break; - - default: - // Unreachable. - assert(0); - } + size_t storage_size = GetStringSizeImpl(string); if (storage_size > INT_MAX) { uv_err_t err; @@ -389,35 +459,10 @@ Handle StreamWrap::WriteStringImpl(const Arguments& args) { char* data = reinterpret_cast(ROUND_UP( reinterpret_cast(storage) + sizeof(WriteWrap), 16)); - size_t data_size; - switch (encoding) { - case kAscii: - data_size = string->WriteOneByte(reinterpret_cast(data), 0, -1, - String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); - break; - - case kUtf8: - data_size = string->WriteUtf8(data, -1, NULL, - String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); - break; - - case kUcs2: { - int chars_copied = string->Write((uint16_t*) data, 0, -1, - String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED); - data_size = chars_copied * sizeof(uint16_t); - break; - } - - default: - // Unreachable - assert(0); - } - - assert(data_size <= storage_size); uv_buf_t buf; - buf.base = data; - buf.len = data_size; + size_t data_size = + WriteStringImpl(data, storage_size, string, &buf); bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE && ((uv_pipe_t*)wrap->stream_)->ipc; @@ -478,6 +523,143 @@ Handle StreamWrap::WriteStringImpl(const Arguments& args) { } +Handle StreamWrap::Writev(const Arguments& args) { + HandleScope scope; + + UNWRAP(StreamWrap) + + if (args.Length() < 1) + return ThrowTypeError("Not enough arguments"); + + if (!args[0]->IsArray()) + return ThrowTypeError("Argument should be array"); + + Handle chunks = args[0].As(); + size_t count = chunks->Length() >> 1; + + uv_buf_t bufs_[16]; + uv_buf_t* bufs = bufs_; + + if (ARRAY_SIZE(bufs_) < count) + bufs = new uv_buf_t[count]; + + // Determine storage size first + size_t storage_size = 0; + for (size_t i = 0; i < count; i++) { + Handle chunk = chunks->Get(i * 2); + + if (Buffer::HasInstance(chunk)) + continue; + // Buffer chunk, no additional storage required + + // String chunk + Handle string = chunk->ToString(); + switch (static_cast(chunks->Get(i * 2 + 1)->Int32Value())) { + case kAscii: + storage_size += GetStringSizeImpl(string); + break; + + case kUtf8: + storage_size += GetStringSizeImpl(string); + break; + + case kUcs2: + storage_size += GetStringSizeImpl(string); + break; + + default: + assert(0); // Unreachable + } + storage_size += 15; + } + + if (storage_size > INT_MAX) { + uv_err_t err; + err.code = UV_ENOBUFS; + SetErrno(err); + return scope.Close(v8::Null(node_isolate)); + } + + storage_size += sizeof(WriteWrap); + char* storage = new char[storage_size]; + WriteWrap* req_wrap = new (storage) WriteWrap(); + + uint32_t bytes = 0; + size_t offset = sizeof(WriteWrap); + for (size_t i = 0; i < count; i++) { + Handle chunk = chunks->Get(i * 2); + + // Write buffer + if (Buffer::HasInstance(chunk)) { + bytes += WriteBuffer(req_wrap, chunk, &bufs[i]); + continue; + } + + // Write string + offset = ROUND_UP(offset, 16); + assert(offset < storage_size); + char* str_storage = storage + offset; + size_t str_size = storage_size - offset; + + Handle string = chunk->ToString(); + switch (static_cast(chunks->Get(i * 2 + 1)->Int32Value())) { + case kAscii: + str_size = WriteStringImpl(str_storage, + str_size, + string, + &bufs[i]); + break; + case kUtf8: + str_size = WriteStringImpl(str_storage, + str_size, + string, + &bufs[i]); + break; + case kUcs2: + str_size = WriteStringImpl(str_storage, + str_size, + string, + &bufs[i]); + break; + default: + assert(0); + } + offset += str_size; + bytes += str_size; + } + + int r = uv_write(&req_wrap->req_, + wrap->stream_, + bufs, + count, + StreamWrap::AfterWrite); + + // Deallocate space + if (bufs != bufs_) + delete[] bufs; + + req_wrap->Dispatched(); + req_wrap->object_->Set(bytes_sym, Number::New(bytes)); + + wrap->UpdateWriteQueueSize(); + + if (r) { + SetErrno(uv_last_error(uv_default_loop())); + req_wrap->~WriteWrap(); + delete[] storage; + return scope.Close(v8::Null(node_isolate)); + } else { + if (wrap->stream_->type == UV_TCP) { + NODE_COUNT_NET_BYTES_SENT(bytes); + } else if (wrap->stream_->type == UV_NAMED_PIPE) { + NODE_COUNT_PIPE_BYTES_SENT(bytes); + } + + return scope.Close(req_wrap->object_); + } +} + + Handle StreamWrap::WriteAsciiString(const Arguments& args) { return WriteStringImpl(args); } diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 77bd234a9d..84f6acc45f 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -29,10 +29,15 @@ namespace node { +// Forward declaration +class WriteWrap; + + +// Important: this should have the same values as in lib/net.js enum WriteEncoding { - kAscii, - kUtf8, - kUcs2 + kUtf8 = 0x1, + kAscii = 0x2, + kUcs2 = 0x3 }; @@ -50,12 +55,24 @@ class StreamWrap : public HandleWrap { static v8::Handle ReadStop(const v8::Arguments& args); static v8::Handle Shutdown(const v8::Arguments& args); + static v8::Handle Writev(const v8::Arguments& args); static v8::Handle WriteBuffer(const v8::Arguments& args); static v8::Handle WriteAsciiString(const v8::Arguments& args); static v8::Handle WriteUtf8String(const v8::Arguments& args); static v8::Handle WriteUcs2String(const v8::Arguments& args); protected: + static size_t WriteBuffer(WriteWrap* req, + v8::Handle val, + uv_buf_t* buf); + template + static size_t WriteStringImpl(char* storage, + size_t storage_size, + v8::Handle val, + uv_buf_t* buf); + template + static size_t GetStringSizeImpl(v8::Handle val); + StreamWrap(v8::Handle object, uv_stream_t* stream); virtual void SetHandle(uv_handle_t* h); void StateChange() { } diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 3b9a9b9251..5cd70b2979 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -103,6 +103,7 @@ void TCPWrap::Initialize(Handle target) { NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString); NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String); NODE_SET_PROTOTYPE_METHOD(t, "writeUcs2String", StreamWrap::WriteUcs2String); + NODE_SET_PROTOTYPE_METHOD(t, "writev", StreamWrap::Writev); NODE_SET_PROTOTYPE_METHOD(t, "open", Open); NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);