Browse Source

net: implement ._writev for .cork/uncork() support

Add Writev method to StreamWrap class for writing mixed array of strings
and buffers. Expose this method for TCP class.
v0.11.2-release
Fedor Indutny 12 years ago
parent
commit
60ed2c5434
  1. 69
      lib/net.js
  2. 312
      src/stream_wrap.cc
  3. 23
      src/stream_wrap.h
  4. 1
      src/tcp_wrap.cc

69
lib/net.js

@ -130,6 +130,10 @@ function initSocketHandle(self) {
if (self._handle) {
self._handle.owner = self;
self._handle.onread = onread;
// If handle doesn't support writev - neither do we
if (!self._handle.writev)
self._writev = null;
}
}
@ -597,7 +601,7 @@ Socket.prototype.write = function(chunk, encoding, cb) {
};
Socket.prototype._write = function(data, encoding, cb) {
Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while
// waiting for this one to be done.
@ -605,7 +609,7 @@ Socket.prototype._write = function(data, encoding, cb) {
this._pendingData = data;
this._pendingEncoding = encoding;
this.once('connect', function() {
this._write(data, encoding, cb);
this._writeGeneric(writev, data, encoding, cb);
});
return;
}
@ -619,8 +623,31 @@ Socket.prototype._write = function(data, encoding, cb) {
return false;
}
var enc = Buffer.isBuffer(data) ? 'buffer' : encoding;
var writeReq = createWriteReq(this._handle, data, enc);
var writeReq;
if (writev) {
var chunks = new Array(data.length << 1);
for (var i = 0; i < data.length; i++) {
var entry = data[i];
var enc = entry.encoding;
var chunk = entry.chunk;
var code = getEncodingId(enc);
// Buffer encoding, translate argument to buffer
if (code === 0 && !Buffer.isBuffer(chunk))
chunk = new Buffer(chunk, enc);
chunks[i * 2] = chunk;
chunks[i * 2 + 1] = code;
}
var writeReq = this._handle.writev(chunks);
// Retain chunks
if (writeReq)
writeReq._chunks = chunks;
} else {
var enc = Buffer.isBuffer(data) ? 'buffer' : encoding;
var writeReq = createWriteReq(this._handle, data, enc);
}
if (!writeReq || typeof writeReq !== 'object')
return this._destroy(errnoException(process._errno, 'write'), cb);
@ -636,6 +663,40 @@ Socket.prototype._write = function(data, encoding, cb) {
writeReq.cb = cb;
};
Socket.prototype._writev = function(chunks, cb) {
this._writeGeneric(true, chunks, '', cb);
};
Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};
// Important: this should have the same values as in src/stream_wrap.h
function getEncodingId(encoding) {
switch (encoding) {
case 'buffer':
return 0;
case 'utf8':
case 'utf-8':
return 1;
case 'ascii':
return 2;
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
return 3;
default:
return 0;
}
}
function createWriteReq(handle, data, encoding) {
switch (encoding) {
case 'buffer':

312
src/stream_wrap.cc

@ -40,6 +40,7 @@ namespace node {
using v8::AccessorInfo;
using v8::Arguments;
using v8::Array;
using v8::Context;
using v8::Exception;
using v8::Function;
@ -283,6 +284,109 @@ void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
}
size_t StreamWrap::WriteBuffer(WriteWrap* req,
Handle<Value> val,
uv_buf_t* buf) {
assert(Buffer::HasInstance(val));
// Simple non-writev case
buf->base = Buffer::Data(val);
buf->len = Buffer::Length(val);
return buf->len;
}
template <WriteEncoding encoding>
size_t StreamWrap::WriteStringImpl(char* storage,
size_t storage_size,
Handle<Value> val,
uv_buf_t* buf) {
assert(val->IsString());
Handle<String> string = val.As<String>();
size_t data_size;
switch (encoding) {
case kAscii:
data_size = string->WriteOneByte(
reinterpret_cast<uint8_t*>(storage),
0,
-1,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
break;
case kUtf8:
data_size = string->WriteUtf8(
storage,
-1,
NULL,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
break;
case kUcs2: {
int chars_copied = string->Write(
reinterpret_cast<uint16_t*>(storage),
0,
-1,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
data_size = chars_copied * sizeof(uint16_t);
break;
}
default:
// Unreachable
assert(0);
}
assert(data_size <= storage_size);
buf->base = storage;
buf->len = data_size;
return data_size;
}
template <WriteEncoding encoding>
size_t StreamWrap::GetStringSizeImpl(Handle<Value> val) {
assert(val->IsString());
Handle<String> string = val.As<String>();
switch (encoding) {
case kAscii:
return string->Length();
break;
case kUtf8:
if (!(string->MayContainNonAscii())) {
// If the string has only ascii characters, we know exactly how big
// the storage should be.
return string->Length();
} else if (string->Length() < 65536) {
// A single UCS2 codepoint never takes up more than 3 utf8 bytes.
// Unless the string is really long we just allocate so much space that
// we're certain the string fits in there entirely.
// TODO: maybe check handle->write_queue_size instead of string length?
return 3 * string->Length();
} else {
// The string is really long. Compute the allocation size that we
// actually need.
return string->Utf8Length();
}
break;
case kUcs2:
return string->Length() * sizeof(uint16_t);
break;
default:
// Unreachable.
assert(0);
}
return 0;
}
Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {
HandleScope scope(node_isolate);
@ -290,17 +394,14 @@ Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {
// The first argument is a buffer.
assert(args.Length() >= 1 && Buffer::HasInstance(args[0]));
Local<Object> buffer_obj = args[0]->ToObject();
size_t offset = 0;
size_t length = Buffer::Length(buffer_obj);
size_t length = Buffer::Length(args[0]);
char* storage = new char[sizeof(WriteWrap)];
WriteWrap* req_wrap = new (storage) WriteWrap();
req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
req_wrap->object_->SetHiddenValue(buffer_sym, args[0]);
uv_buf_t buf;
buf.base = Buffer::Data(buffer_obj) + offset;
buf.len = length;
WriteBuffer(req_wrap, args[0], &buf);
int r = uv_write(&req_wrap->req_,
wrap->stream_,
@ -344,38 +445,7 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
Local<String> string = args[0]->ToString();
// Compute the size of the storage that the string will be flattened into.
size_t storage_size;
switch (encoding) {
case kAscii:
storage_size = string->Length();
break;
case kUtf8:
if (!(string->MayContainNonAscii())) {
// If the string has only ascii characters, we know exactly how big
// the storage should be.
storage_size = string->Length();
} else if (string->Length() < 65536) {
// A single UCS2 codepoint never takes up more than 3 utf8 bytes.
// Unless the string is really long we just allocate so much space that
// we're certain the string fits in there entirely.
// TODO: maybe check handle->write_queue_size instead of string length?
storage_size = 3 * string->Length();
} else {
// The string is really long. Compute the allocation size that we
// actually need.
storage_size = string->Utf8Length();
}
break;
case kUcs2:
storage_size = string->Length() * sizeof(uint16_t);
break;
default:
// Unreachable.
assert(0);
}
size_t storage_size = GetStringSizeImpl<encoding>(string);
if (storage_size > INT_MAX) {
uv_err_t err;
@ -389,35 +459,10 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
char* data = reinterpret_cast<char*>(ROUND_UP(
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
size_t data_size;
switch (encoding) {
case kAscii:
data_size = string->WriteOneByte(reinterpret_cast<uint8_t*>(data), 0, -1,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
break;
case kUtf8:
data_size = string->WriteUtf8(data, -1, NULL,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
break;
case kUcs2: {
int chars_copied = string->Write((uint16_t*) data, 0, -1,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
data_size = chars_copied * sizeof(uint16_t);
break;
}
default:
// Unreachable
assert(0);
}
assert(data_size <= storage_size);
uv_buf_t buf;
buf.base = data;
buf.len = data_size;
size_t data_size =
WriteStringImpl<encoding>(data, storage_size, string, &buf);
bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
((uv_pipe_t*)wrap->stream_)->ipc;
@ -478,6 +523,143 @@ Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
}
Handle<Value> StreamWrap::Writev(const Arguments& args) {
HandleScope scope;
UNWRAP(StreamWrap)
if (args.Length() < 1)
return ThrowTypeError("Not enough arguments");
if (!args[0]->IsArray())
return ThrowTypeError("Argument should be array");
Handle<Array> chunks = args[0].As<Array>();
size_t count = chunks->Length() >> 1;
uv_buf_t bufs_[16];
uv_buf_t* bufs = bufs_;
if (ARRAY_SIZE(bufs_) < count)
bufs = new uv_buf_t[count];
// Determine storage size first
size_t storage_size = 0;
for (size_t i = 0; i < count; i++) {
Handle<Value> chunk = chunks->Get(i * 2);
if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required
// String chunk
Handle<Value> string = chunk->ToString();
switch (static_cast<WriteEncoding>(chunks->Get(i * 2 + 1)->Int32Value())) {
case kAscii:
storage_size += GetStringSizeImpl<kAscii>(string);
break;
case kUtf8:
storage_size += GetStringSizeImpl<kUtf8>(string);
break;
case kUcs2:
storage_size += GetStringSizeImpl<kUcs2>(string);
break;
default:
assert(0); // Unreachable
}
storage_size += 15;
}
if (storage_size > INT_MAX) {
uv_err_t err;
err.code = UV_ENOBUFS;
SetErrno(err);
return scope.Close(v8::Null(node_isolate));
}
storage_size += sizeof(WriteWrap);
char* storage = new char[storage_size];
WriteWrap* req_wrap = new (storage) WriteWrap();
uint32_t bytes = 0;
size_t offset = sizeof(WriteWrap);
for (size_t i = 0; i < count; i++) {
Handle<Value> chunk = chunks->Get(i * 2);
// Write buffer
if (Buffer::HasInstance(chunk)) {
bytes += WriteBuffer(req_wrap, chunk, &bufs[i]);
continue;
}
// Write string
offset = ROUND_UP(offset, 16);
assert(offset < storage_size);
char* str_storage = storage + offset;
size_t str_size = storage_size - offset;
Handle<String> string = chunk->ToString();
switch (static_cast<WriteEncoding>(chunks->Get(i * 2 + 1)->Int32Value())) {
case kAscii:
str_size = WriteStringImpl<kAscii>(str_storage,
str_size,
string,
&bufs[i]);
break;
case kUtf8:
str_size = WriteStringImpl<kUtf8>(str_storage,
str_size,
string,
&bufs[i]);
break;
case kUcs2:
str_size = WriteStringImpl<kUcs2>(str_storage,
str_size,
string,
&bufs[i]);
break;
default:
assert(0);
}
offset += str_size;
bytes += str_size;
}
int r = uv_write(&req_wrap->req_,
wrap->stream_,
bufs,
count,
StreamWrap::AfterWrite);
// Deallocate space
if (bufs != bufs_)
delete[] bufs;
req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym, Number::New(bytes));
wrap->UpdateWriteQueueSize();
if (r) {
SetErrno(uv_last_error(uv_default_loop()));
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null(node_isolate));
} else {
if (wrap->stream_->type == UV_TCP) {
NODE_COUNT_NET_BYTES_SENT(bytes);
} else if (wrap->stream_->type == UV_NAMED_PIPE) {
NODE_COUNT_PIPE_BYTES_SENT(bytes);
}
return scope.Close(req_wrap->object_);
}
}
Handle<Value> StreamWrap::WriteAsciiString(const Arguments& args) {
return WriteStringImpl<kAscii>(args);
}

23
src/stream_wrap.h

@ -29,10 +29,15 @@
namespace node {
// Forward declaration
class WriteWrap;
// Important: this should have the same values as in lib/net.js
enum WriteEncoding {
kAscii,
kUtf8,
kUcs2
kUtf8 = 0x1,
kAscii = 0x2,
kUcs2 = 0x3
};
@ -50,12 +55,24 @@ class StreamWrap : public HandleWrap {
static v8::Handle<v8::Value> ReadStop(const v8::Arguments& args);
static v8::Handle<v8::Value> Shutdown(const v8::Arguments& args);
static v8::Handle<v8::Value> Writev(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteBuffer(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteAsciiString(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteUtf8String(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteUcs2String(const v8::Arguments& args);
protected:
static size_t WriteBuffer(WriteWrap* req,
v8::Handle<v8::Value> val,
uv_buf_t* buf);
template <enum WriteEncoding encoding>
static size_t WriteStringImpl(char* storage,
size_t storage_size,
v8::Handle<v8::Value> val,
uv_buf_t* buf);
template <enum WriteEncoding encoding>
static size_t GetStringSizeImpl(v8::Handle<v8::Value> val);
StreamWrap(v8::Handle<v8::Object> object, uv_stream_t* stream);
virtual void SetHandle(uv_handle_t* h);
void StateChange() { }

1
src/tcp_wrap.cc

@ -103,6 +103,7 @@ void TCPWrap::Initialize(Handle<Object> target) {
NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString);
NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String);
NODE_SET_PROTOTYPE_METHOD(t, "writeUcs2String", StreamWrap::WriteUcs2String);
NODE_SET_PROTOTYPE_METHOD(t, "writev", StreamWrap::Writev);
NODE_SET_PROTOTYPE_METHOD(t, "open", Open);
NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);

Loading…
Cancel
Save