From 471c5701c30ae0d8abfb2dbd83bd176cf18cb8e2 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 5 Oct 2011 12:33:05 -0700 Subject: [PATCH 1/4] uv_write2 uv_read2_start binding --- src/pipe_wrap.cc | 8 +- src/pipe_wrap.h | 2 +- src/stream_wrap.cc | 80 +++++++- src/stream_wrap.h | 7 +- src/tcp_wrap.cc | 474 +++++++++++++++++++++++---------------------- src/tcp_wrap.h | 37 ++++ 6 files changed, 363 insertions(+), 245 deletions(-) create mode 100644 src/tcp_wrap.h diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc index c4f965d65f..1bf59bd2de 100644 --- a/src/pipe_wrap.cc +++ b/src/pipe_wrap.cc @@ -86,16 +86,16 @@ Handle PipeWrap::New(const Arguments& args) { assert(args.IsConstructCall()); HandleScope scope; - PipeWrap* wrap = new PipeWrap(args.This()); + PipeWrap* wrap = new PipeWrap(args.This(), args[0]->IsTrue()); assert(wrap); return scope.Close(args.This()); } -PipeWrap::PipeWrap(Handle object) : StreamWrap(object, - (uv_stream_t*) &handle_) { - int r = uv_pipe_init(uv_default_loop(), &handle_, 0); +PipeWrap::PipeWrap(Handle object, bool ipc) + : StreamWrap(object, (uv_stream_t*) &handle_) { + int r = uv_pipe_init(uv_default_loop(), &handle_, ipc); assert(r == 0); // How do we proxy this error up to javascript? // Suggestion: uv_pipe_init() returns void. handle_.data = reinterpret_cast(this); diff --git a/src/pipe_wrap.h b/src/pipe_wrap.h index 1ec51c71f6..df18e8f5bd 100644 --- a/src/pipe_wrap.h +++ b/src/pipe_wrap.h @@ -12,7 +12,7 @@ class PipeWrap : StreamWrap { static void Initialize(v8::Handle target); private: - PipeWrap(v8::Handle object); + PipeWrap(v8::Handle object, bool ipc); static v8::Handle New(const v8::Arguments& args); static v8::Handle Bind(const v8::Arguments& args); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 56d57b23be..548f978987 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -2,6 +2,7 @@ #include #include #include +#include #include @@ -95,7 +96,14 @@ Handle StreamWrap::ReadStart(const Arguments& args) { UNWRAP - int r = uv_read_start(wrap->stream_, OnAlloc, OnRead); + bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE && + ((uv_pipe_t*)wrap->stream_)->ipc; + int r; + if (ipc_pipe) { + r = uv_read2_start(wrap->stream_, OnAlloc, OnRead2); + } else { + r = uv_read_start(wrap->stream_, OnAlloc, OnRead); + } // Error starting the tcp. if (r) SetErrno(uv_last_error(uv_default_loop()).code); @@ -170,9 +178,13 @@ uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) { return buf; } -void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { + +void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, + uv_buf_t buf, uv_handle_type pending) { HandleScope scope; + assert(pending == UV_UNKNOWN_HANDLE); // TODO + StreamWrap* wrap = static_cast(handle->data); // We should not be getting this callback if someone as already called @@ -201,25 +213,59 @@ void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { } if (nread > 0) { - Local argv[3] = { + int argc = 3; + Local argv[4] = { slab_v, Integer::New(wrap->slab_offset_), Integer::New(nread) }; - MakeCallback(wrap->object_, "onread", 3, argv); + + if (pending == UV_TCP) { + // Instantiate the client javascript object and handle. + Local pending_obj = TCPWrap::Instantiate(); + + // Unwrap the client javascript object. + assert(pending_obj->InternalFieldCount() > 0); + TCPWrap* pending_wrap = + static_cast(pending_obj->GetPointerFromInternalField(0)); + + int r = uv_accept(handle, pending_wrap->GetStream()); + assert(r == 0); + + argv[3] = pending_obj; + argc++; + } else { + // We only support sending UV_TCP right now. + assert(pending == UV_UNKNOWN_HANDLE); + } + + MakeCallback(wrap->object_, "onread", argc, argv); } } +void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { + OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE); +} + + +void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf, + uv_handle_type pending) { + OnReadCommon((uv_stream_t*)handle, nread, buf, pending); +} + + Handle StreamWrap::Write(const Arguments& args) { HandleScope scope; UNWRAP + bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE && + ((uv_pipe_t*)wrap->stream_)->ipc; + // The first argument is a buffer. assert(Buffer::HasInstance(args[0])); Local buffer_obj = args[0]->ToObject(); - size_t offset = 0; size_t length = Buffer::Length(buffer_obj); @@ -239,7 +285,29 @@ Handle StreamWrap::Write(const Arguments& args) { buf.base = Buffer::Data(buffer_obj) + offset; buf.len = length; - int r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite); + int r; + + if (!ipc_pipe) { + r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite); + } else { + uv_stream_t* send_stream = NULL; + + if (args.Length() > 3) { + assert(args[3]->IsObject()); + Local send_stream_obj = args[3]->ToObject(); + assert(send_stream_obj->InternalFieldCount() > 0); + StreamWrap* send_stream_wrap = static_cast( + send_stream_obj->GetPointerFromInternalField(0)); + send_stream = send_stream_wrap->GetStream(); + } + + r = uv_write2(&req_wrap->req_, + wrap->stream_, + &buf, + 1, + send_stream, + StreamWrap::AfterWrite); + } req_wrap->Dispatched(); diff --git a/src/stream_wrap.h b/src/stream_wrap.h index dd246327c6..93ad7f08c8 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -32,9 +32,14 @@ class StreamWrap : public HandleWrap { // Callbacks for libuv static void AfterWrite(uv_write_t* req, int status); static uv_buf_t OnAlloc(uv_handle_t* handle, size_t suggested_size); - static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf); static void AfterShutdown(uv_shutdown_t* req, int status); + static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf); + static void OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf, + uv_handle_type pending); + static void OnReadCommon(uv_stream_t* handle, ssize_t nread, + uv_buf_t buf, uv_handle_type pending); + size_t slab_offset_; uv_stream_t* stream_; }; diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 9c0786c1a8..3ba5ef055c 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -3,6 +3,7 @@ #include #include #include +#include // Temporary hack: libuv should provide uv_inet_pton and uv_inet_ntop. #if defined(__MINGW32__) || defined(_MSC_VER) @@ -45,8 +46,7 @@ using v8::Context; using v8::Arguments; using v8::Integer; -Persistent tcpConstructor; - +static Persistent tcpConstructor; static Persistent family_symbol; static Persistent address_symbol; static Persistent port_symbol; @@ -55,314 +55,322 @@ static Persistent port_symbol; typedef class ReqWrap ConnectWrap; -class TCPWrap : public StreamWrap { - public: +Local TCPWrap::Instantiate() { + HandleScope scope; + return scope.Close(tcpConstructor->NewInstance()); +} - static void Initialize(Handle target) { - HandleWrap::Initialize(target); - StreamWrap::Initialize(target); - HandleScope scope; +void TCPWrap::Initialize(Handle target) { + HandleWrap::Initialize(target); + StreamWrap::Initialize(target); - Local t = FunctionTemplate::New(New); - t->SetClassName(String::NewSymbol("TCP")); + HandleScope scope; - t->InstanceTemplate()->SetInternalFieldCount(1); + Local t = FunctionTemplate::New(New); + t->SetClassName(String::NewSymbol("TCP")); - NODE_SET_PROTOTYPE_METHOD(t, "close", HandleWrap::Close); + t->InstanceTemplate()->SetInternalFieldCount(1); - NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart); - NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop); - NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write); - NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown); + NODE_SET_PROTOTYPE_METHOD(t, "close", HandleWrap::Close); - NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind); - NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen); - NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); - NODE_SET_PROTOTYPE_METHOD(t, "bind6", Bind6); - NODE_SET_PROTOTYPE_METHOD(t, "connect6", Connect6); - NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName); - NODE_SET_PROTOTYPE_METHOD(t, "getpeername", GetPeerName); + NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart); + NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop); + NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write); + NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown); - tcpConstructor = Persistent::New(t->GetFunction()); + NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind); + NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen); + NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); + NODE_SET_PROTOTYPE_METHOD(t, "bind6", Bind6); + NODE_SET_PROTOTYPE_METHOD(t, "connect6", Connect6); + NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName); + NODE_SET_PROTOTYPE_METHOD(t, "getpeername", GetPeerName); - family_symbol = NODE_PSYMBOL("family"); - address_symbol = NODE_PSYMBOL("address"); - port_symbol = NODE_PSYMBOL("port"); + tcpConstructor = Persistent::New(t->GetFunction()); - target->Set(String::NewSymbol("TCP"), tcpConstructor); - } + family_symbol = NODE_PSYMBOL("family"); + address_symbol = NODE_PSYMBOL("address"); + port_symbol = NODE_PSYMBOL("port"); - private: - static Handle New(const Arguments& args) { - // This constructor should not be exposed to public javascript. - // Therefore we assert that we are not trying to call this as a - // normal function. - assert(args.IsConstructCall()); + target->Set(String::NewSymbol("TCP"), tcpConstructor); +} - HandleScope scope; - TCPWrap* wrap = new TCPWrap(args.This()); - assert(wrap); - return scope.Close(args.This()); - } +Handle TCPWrap::New(const Arguments& args) { + // This constructor should not be exposed to public javascript. + // Therefore we assert that we are not trying to call this as a + // normal function. + assert(args.IsConstructCall()); - TCPWrap(Handle object) : StreamWrap(object, - (uv_stream_t*) &handle_) { - int r = uv_tcp_init(uv_default_loop(), &handle_); - assert(r == 0); // How do we proxy this error up to javascript? - // Suggestion: uv_tcp_init() returns void. - UpdateWriteQueueSize(); - } + HandleScope scope; + TCPWrap* wrap = new TCPWrap(args.This()); + assert(wrap); - ~TCPWrap() { - assert(object_.IsEmpty()); - } + return scope.Close(args.This()); +} + + +TCPWrap::TCPWrap(Handle object) + : StreamWrap(object, (uv_stream_t*) &handle_) { + int r = uv_tcp_init(uv_default_loop(), &handle_); + assert(r == 0); // How do we proxy this error up to javascript? + // Suggestion: uv_tcp_init() returns void. + UpdateWriteQueueSize(); +} + + +TCPWrap::~TCPWrap() { + assert(object_.IsEmpty()); +} - static Handle GetSockName(const Arguments& args) { - HandleScope scope; - struct sockaddr_storage address; - int family; - int port; - char ip[INET6_ADDRSTRLEN]; - - UNWRAP - - int addrlen = sizeof(address); - int r = uv_tcp_getsockname(&wrap->handle_, - reinterpret_cast(&address), - &addrlen); - - Local sockname = Object::New(); - if (r != 0) { - SetErrno(uv_last_error(uv_default_loop()).code); - } else { - family = address.ss_family; - if (family == AF_INET) { - struct sockaddr_in* addrin = (struct sockaddr_in*)&address; - uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN); - port = ntohs(addrin->sin_port); - } else if (family == AF_INET6) { - struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&address; - uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN); - port = ntohs(addrin6->sin6_port); - } - - sockname->Set(port_symbol, Integer::New(port)); - sockname->Set(family_symbol, Integer::New(family)); - sockname->Set(address_symbol, String::New(ip)); + +Handle TCPWrap::GetSockName(const Arguments& args) { + HandleScope scope; + struct sockaddr_storage address; + int family; + int port; + char ip[INET6_ADDRSTRLEN]; + + UNWRAP + + int addrlen = sizeof(address); + int r = uv_tcp_getsockname(&wrap->handle_, + reinterpret_cast(&address), + &addrlen); + + Local sockname = Object::New(); + if (r != 0) { + SetErrno(uv_last_error(uv_default_loop()).code); + } else { + family = address.ss_family; + if (family == AF_INET) { + struct sockaddr_in* addrin = (struct sockaddr_in*)&address; + uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN); + port = ntohs(addrin->sin_port); + } else if (family == AF_INET6) { + struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&address; + uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN); + port = ntohs(addrin6->sin6_port); } - return scope.Close(sockname); + sockname->Set(port_symbol, Integer::New(port)); + sockname->Set(family_symbol, Integer::New(family)); + sockname->Set(address_symbol, String::New(ip)); } - - static Handle GetPeerName(const Arguments& args) { - HandleScope scope; - struct sockaddr_storage address; - int family; - int port; - char ip[INET6_ADDRSTRLEN]; - - UNWRAP - - int addrlen = sizeof(address); - int r = uv_tcp_getpeername(&wrap->handle_, - reinterpret_cast(&address), - &addrlen); - - Local sockname = Object::New(); - if (r != 0) { - SetErrno(uv_last_error(uv_default_loop()).code); - } else { - family = address.ss_family; - if (family == AF_INET) { - struct sockaddr_in* addrin = (struct sockaddr_in*)&address; - uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN); - port = ntohs(addrin->sin_port); - } else if (family == AF_INET6) { - struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&address; - uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN); - port = ntohs(addrin6->sin6_port); - } - - sockname->Set(port_symbol, Integer::New(port)); - sockname->Set(family_symbol, Integer::New(family)); - sockname->Set(address_symbol, String::New(ip)); + return scope.Close(sockname); +} + + +Handle TCPWrap::GetPeerName(const Arguments& args) { + HandleScope scope; + struct sockaddr_storage address; + int family; + int port; + char ip[INET6_ADDRSTRLEN]; + + UNWRAP + + int addrlen = sizeof(address); + int r = uv_tcp_getpeername(&wrap->handle_, + reinterpret_cast(&address), + &addrlen); + + Local sockname = Object::New(); + if (r != 0) { + SetErrno(uv_last_error(uv_default_loop()).code); + } else { + family = address.ss_family; + if (family == AF_INET) { + struct sockaddr_in* addrin = (struct sockaddr_in*)&address; + uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN); + port = ntohs(addrin->sin_port); + } else if (family == AF_INET6) { + struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&address; + uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN); + port = ntohs(addrin6->sin6_port); } - return scope.Close(sockname); + sockname->Set(port_symbol, Integer::New(port)); + sockname->Set(family_symbol, Integer::New(family)); + sockname->Set(address_symbol, String::New(ip)); } + return scope.Close(sockname); +} - static Handle Bind(const Arguments& args) { - HandleScope scope; - UNWRAP +Handle TCPWrap::Bind(const Arguments& args) { + HandleScope scope; - String::AsciiValue ip_address(args[0]->ToString()); - int port = args[1]->Int32Value(); + UNWRAP - struct sockaddr_in address = uv_ip4_addr(*ip_address, port); - int r = uv_tcp_bind(&wrap->handle_, address); + String::AsciiValue ip_address(args[0]->ToString()); + int port = args[1]->Int32Value(); - // Error starting the tcp. - if (r) SetErrno(uv_last_error(uv_default_loop()).code); + struct sockaddr_in address = uv_ip4_addr(*ip_address, port); + int r = uv_tcp_bind(&wrap->handle_, address); - return scope.Close(Integer::New(r)); - } + // Error starting the tcp. + if (r) SetErrno(uv_last_error(uv_default_loop()).code); - static Handle Bind6(const Arguments& args) { - HandleScope scope; + return scope.Close(Integer::New(r)); +} - UNWRAP - String::AsciiValue ip6_address(args[0]->ToString()); - int port = args[1]->Int32Value(); +Handle TCPWrap::Bind6(const Arguments& args) { + HandleScope scope; - struct sockaddr_in6 address = uv_ip6_addr(*ip6_address, port); - int r = uv_tcp_bind6(&wrap->handle_, address); + UNWRAP - // Error starting the tcp. - if (r) SetErrno(uv_last_error(uv_default_loop()).code); + String::AsciiValue ip6_address(args[0]->ToString()); + int port = args[1]->Int32Value(); - return scope.Close(Integer::New(r)); - } + struct sockaddr_in6 address = uv_ip6_addr(*ip6_address, port); + int r = uv_tcp_bind6(&wrap->handle_, address); - static Handle Listen(const Arguments& args) { - HandleScope scope; + // Error starting the tcp. + if (r) SetErrno(uv_last_error(uv_default_loop()).code); - UNWRAP + return scope.Close(Integer::New(r)); +} - int backlog = args[0]->Int32Value(); - int r = uv_listen((uv_stream_t*)&wrap->handle_, backlog, OnConnection); +Handle TCPWrap::Listen(const Arguments& args) { + HandleScope scope; - // Error starting the tcp. - if (r) SetErrno(uv_last_error(uv_default_loop()).code); + UNWRAP - return scope.Close(Integer::New(r)); - } + int backlog = args[0]->Int32Value(); - static void OnConnection(uv_stream_t* handle, int status) { - HandleScope scope; + int r = uv_listen((uv_stream_t*)&wrap->handle_, backlog, OnConnection); - TCPWrap* wrap = static_cast(handle->data); - assert(&wrap->handle_ == (uv_tcp_t*)handle); + // Error starting the tcp. + if (r) SetErrno(uv_last_error(uv_default_loop()).code); - // We should not be getting this callback if someone as already called - // uv_close() on the handle. - assert(wrap->object_.IsEmpty() == false); + return scope.Close(Integer::New(r)); +} - Handle argv[1]; - if (status == 0) { - // Instantiate the client javascript object and handle. - Local client_obj = tcpConstructor->NewInstance(); +void TCPWrap::OnConnection(uv_stream_t* handle, int status) { + HandleScope scope; - // Unwrap the client javascript object. - assert(client_obj->InternalFieldCount() > 0); - TCPWrap* client_wrap = - static_cast(client_obj->GetPointerFromInternalField(0)); + TCPWrap* wrap = static_cast(handle->data); + assert(&wrap->handle_ == (uv_tcp_t*)handle); - int r = uv_accept(handle, (uv_stream_t*)&client_wrap->handle_); + // We should not be getting this callback if someone as already called + // uv_close() on the handle. + assert(wrap->object_.IsEmpty() == false); - // uv_accept should always work. - assert(r == 0); + Handle argv[1]; - // Successful accept. Call the onconnection callback in JavaScript land. - argv[0] = client_obj; - } else { - SetErrno(uv_last_error(uv_default_loop()).code); - argv[0] = v8::Null(); - } + if (status == 0) { + // Instantiate the client javascript object and handle. + Local client_obj = Instantiate(); - MakeCallback(wrap->object_, "onconnection", 1, argv); - } + // Unwrap the client javascript object. + assert(client_obj->InternalFieldCount() > 0); + TCPWrap* client_wrap = + static_cast(client_obj->GetPointerFromInternalField(0)); - static void AfterConnect(uv_connect_t* req, int status) { - ConnectWrap* req_wrap = (ConnectWrap*) req->data; - TCPWrap* wrap = (TCPWrap*) req->handle->data; + int r = uv_accept(handle, (uv_stream_t*)&client_wrap->handle_); - HandleScope scope; + // uv_accept should always work. + assert(r == 0); - // The wrap and request objects should still be there. - assert(req_wrap->object_.IsEmpty() == false); - assert(wrap->object_.IsEmpty() == false); + // Successful accept. Call the onconnection callback in JavaScript land. + argv[0] = client_obj; + } else { + SetErrno(uv_last_error(uv_default_loop()).code); + argv[0] = v8::Null(); + } - if (status) { - SetErrno(uv_last_error(uv_default_loop()).code); - } + MakeCallback(wrap->object_, "onconnection", 1, argv); +} - Local argv[3] = { - Integer::New(status), - Local::New(wrap->object_), - Local::New(req_wrap->object_) - }; - MakeCallback(req_wrap->object_, "oncomplete", 3, argv); +void TCPWrap::AfterConnect(uv_connect_t* req, int status) { + ConnectWrap* req_wrap = (ConnectWrap*) req->data; + TCPWrap* wrap = (TCPWrap*) req->handle->data; - delete req_wrap; + HandleScope scope; + + // The wrap and request objects should still be there. + assert(req_wrap->object_.IsEmpty() == false); + assert(wrap->object_.IsEmpty() == false); + + if (status) { + SetErrno(uv_last_error(uv_default_loop()).code); } - static Handle Connect(const Arguments& args) { - HandleScope scope; + Local argv[3] = { + Integer::New(status), + Local::New(wrap->object_), + Local::New(req_wrap->object_) + }; - UNWRAP + MakeCallback(req_wrap->object_, "oncomplete", 3, argv); - String::AsciiValue ip_address(args[0]->ToString()); - int port = args[1]->Int32Value(); + delete req_wrap; +} - struct sockaddr_in address = uv_ip4_addr(*ip_address, port); - // I hate when people program C++ like it was C, and yet I do it too. - // I'm too lazy to come up with the perfect class hierarchy here. Let's - // just do some type munging. - ConnectWrap* req_wrap = new ConnectWrap(); - - int r = uv_tcp_connect(&req_wrap->req_, &wrap->handle_, address, - AfterConnect); +Handle TCPWrap::Connect(const Arguments& args) { + HandleScope scope; - req_wrap->Dispatched(); + UNWRAP - if (r) { - SetErrno(uv_last_error(uv_default_loop()).code); - delete req_wrap; - return scope.Close(v8::Null()); - } else { - return scope.Close(req_wrap->object_); - } - } + String::AsciiValue ip_address(args[0]->ToString()); + int port = args[1]->Int32Value(); - static Handle Connect6(const Arguments& args) { - HandleScope scope; + struct sockaddr_in address = uv_ip4_addr(*ip_address, port); - UNWRAP + // I hate when people program C++ like it was C, and yet I do it too. + // I'm too lazy to come up with the perfect class hierarchy here. Let's + // just do some type munging. + ConnectWrap* req_wrap = new ConnectWrap(); + + int r = uv_tcp_connect(&req_wrap->req_, &wrap->handle_, address, + AfterConnect); - String::AsciiValue ip_address(args[0]->ToString()); - int port = args[1]->Int32Value(); + req_wrap->Dispatched(); - struct sockaddr_in6 address = uv_ip6_addr(*ip_address, port); + if (r) { + SetErrno(uv_last_error(uv_default_loop()).code); + delete req_wrap; + return scope.Close(v8::Null()); + } else { + return scope.Close(req_wrap->object_); + } +} - ConnectWrap* req_wrap = new ConnectWrap(); - int r = uv_tcp_connect6(&req_wrap->req_, &wrap->handle_, address, - AfterConnect); +Handle TCPWrap::Connect6(const Arguments& args) { + HandleScope scope; - req_wrap->Dispatched(); + UNWRAP - if (r) { - SetErrno(uv_last_error(uv_default_loop()).code); - delete req_wrap; - return scope.Close(v8::Null()); - } else { - return scope.Close(req_wrap->object_); - } - } + String::AsciiValue ip_address(args[0]->ToString()); + int port = args[1]->Int32Value(); + + struct sockaddr_in6 address = uv_ip6_addr(*ip_address, port); + ConnectWrap* req_wrap = new ConnectWrap(); - uv_tcp_t handle_; -}; + int r = uv_tcp_connect6(&req_wrap->req_, &wrap->handle_, address, + AfterConnect); + + req_wrap->Dispatched(); + + if (r) { + SetErrno(uv_last_error(uv_default_loop()).code); + delete req_wrap; + return scope.Close(v8::Null()); + } else { + return scope.Close(req_wrap->object_); + } +} } // namespace node diff --git a/src/tcp_wrap.h b/src/tcp_wrap.h new file mode 100644 index 0000000000..f0bf9efab5 --- /dev/null +++ b/src/tcp_wrap.h @@ -0,0 +1,37 @@ +#ifndef TCP_WRAP_H_ +#define TCP_WRAP_H_ +#include + +namespace node { + +class TCPWrap : public StreamWrap { + public: + static v8::Local Instantiate(); + static TCPWrap* Unwrap(v8::Local obj); + static void Initialize(v8::Handle target); + + private: + TCPWrap(v8::Handle object); + ~TCPWrap(); + + static v8::Handle New(const v8::Arguments& args); + static v8::Handle GetSockName(const v8::Arguments& args); + static v8::Handle GetPeerName(const v8::Arguments& args); + static v8::Handle Bind(const v8::Arguments& args); + static v8::Handle Bind6(const v8::Arguments& args); + static v8::Handle Listen(const v8::Arguments& args); + static v8::Handle Connect(const v8::Arguments& args); + static v8::Handle Connect6(const v8::Arguments& args); + static v8::Handle Open(const v8::Arguments& args); + + static void OnConnection(uv_stream_t* handle, int status); + static void AfterConnect(uv_connect_t* req, int status); + + uv_tcp_t handle_; +}; + + +} // namespace node + + +#endif // TCP_WRAP_H_ From f6d889523ddf222ed3739846f3b91918e9223e2d Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 6 Oct 2011 22:56:29 -0700 Subject: [PATCH 2/4] mv test-child-process-spawn-node.js test/simple/test-child-process-fork.js --- ...est-child-process-spawn-node.js => test-child-process-fork.js} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/simple/{test-child-process-spawn-node.js => test-child-process-fork.js} (100%) diff --git a/test/simple/test-child-process-spawn-node.js b/test/simple/test-child-process-fork.js similarity index 100% rename from test/simple/test-child-process-spawn-node.js rename to test/simple/test-child-process-fork.js From 08c12de8e2032f9998eebcd9a7b353fc4570b67f Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 7 Oct 2011 00:47:13 -0700 Subject: [PATCH 3/4] Upgrade libuv to 886b112 --- deps/uv/Makefile | 2 +- deps/uv/src/unix/process.c | 10 +++++++--- deps/uv/src/unix/stream.c | 19 +++++++++++-------- deps/uv/test/test-fs.c | 28 +++++++++++++++++++++++++--- 4 files changed, 44 insertions(+), 15 deletions(-) diff --git a/deps/uv/Makefile b/deps/uv/Makefile index 884f514bf2..cf1e7880bf 100644 --- a/deps/uv/Makefile +++ b/deps/uv/Makefile @@ -80,7 +80,7 @@ endif TESTS=test/blackhole-server.c test/echo-server.c test/test-*.c BENCHMARKS=test/blackhole-server.c test/echo-server.c test/dns-server.c test/benchmark-*.c -all: uv.a test/run-tests$(E) test/run-benchmarks$(E) +all: uv.a $(CARES_OBJS): %.o: %.c $(CC) -o $*.o -c $(CFLAGS) $(CPPFLAGS) $< -DHAVE_CONFIG_H diff --git a/deps/uv/src/unix/process.c b/deps/uv/src/unix/process.c index 06af65d5b8..c43660d800 100644 --- a/deps/uv/src/unix/process.c +++ b/deps/uv/src/unix/process.c @@ -110,6 +110,7 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, #endif int status; pid_t pid; + int flags; uv__handle_init(loop, (uv_handle_t*)process, UV_PROCESS); loop->counters.process_init++; @@ -255,8 +256,9 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, assert(stdin_pipe[0] >= 0); uv__close(stdin_pipe[0]); uv__nonblock(stdin_pipe[1], 1); + flags = UV_WRITABLE | (options.stdin_stream->ipc ? UV_READABLE : 0); uv__stream_open((uv_stream_t*)options.stdin_stream, stdin_pipe[1], - UV_WRITABLE); + flags); } if (stdout_pipe[0] >= 0) { @@ -264,8 +266,9 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, assert(stdout_pipe[1] >= 0); uv__close(stdout_pipe[1]); uv__nonblock(stdout_pipe[0], 1); + flags = UV_READABLE | (options.stdout_stream->ipc ? UV_WRITABLE : 0); uv__stream_open((uv_stream_t*)options.stdout_stream, stdout_pipe[0], - UV_READABLE); + flags); } if (stderr_pipe[0] >= 0) { @@ -273,8 +276,9 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, assert(stderr_pipe[1] >= 0); uv__close(stderr_pipe[1]); uv__nonblock(stderr_pipe[0], 1); + flags = UV_READABLE | (options.stderr_stream->ipc ? UV_WRITABLE : 0); uv__stream_open((uv_stream_t*)options.stderr_stream, stderr_pipe[0], - UV_READABLE); + flags); } return 0; diff --git a/deps/uv/src/unix/stream.c b/deps/uv/src/unix/stream.c index 0268158252..a68d7a964a 100644 --- a/deps/uv/src/unix/stream.c +++ b/deps/uv/src/unix/stream.c @@ -563,6 +563,7 @@ static void uv__read(uv_stream_t* stream) { return; } else { /* Successful read */ + size_t buflen = buf.len; if (stream->read_cb) { stream->read_cb(stream, nread, buf); @@ -599,6 +600,11 @@ static void uv__read(uv_stream_t* stream) { } else { stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE); } + + /* Return if we didn't fill the buffer, there is no more data to read. */ + if (nread < buflen) { + return; + } } } } @@ -907,14 +913,11 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, int uv_read_stop(uv_stream_t* stream) { - uv_tcp_t* tcp = (uv_tcp_t*)stream; - - ((uv_handle_t*)tcp)->flags &= ~UV_READING; - - ev_io_stop(tcp->loop->ev, &tcp->read_watcher); - tcp->read_cb = NULL; - tcp->read2_cb = NULL; - tcp->alloc_cb = NULL; + ev_io_stop(stream->loop->ev, &stream->read_watcher); + stream->flags &= ~UV_READING; + stream->read_cb = NULL; + stream->read2_cb = NULL; + stream->alloc_cb = NULL; return 0; } diff --git a/deps/uv/test/test-fs.c b/deps/uv/test/test-fs.c index d5e7e2915a..73939e2bfd 100644 --- a/deps/uv/test/test-fs.c +++ b/deps/uv/test/test-fs.c @@ -1163,13 +1163,21 @@ TEST_IMPL(fs_symlink) { TEST_IMPL(fs_utime) { utime_check_t checkme; - const char* path = "."; + const char* path = "test_file"; double atime; double mtime; uv_fs_t req; int r; + /* Setup. */ loop = uv_default_loop(); + unlink(path); + r = uv_fs_open(loop, &req, path, O_RDWR | O_CREAT, + S_IWRITE | S_IREAD, NULL); + ASSERT(r != -1); + ASSERT(req.result != -1); + uv_fs_req_cleanup(&req); + close(r); atime = mtime = 400497753; /* 1982-09-10 11:22:33 */ @@ -1196,24 +1204,35 @@ TEST_IMPL(fs_utime) { uv_run(loop); ASSERT(utime_cb_count == 1); + /* Cleanup. */ + unlink(path); + return 0; } TEST_IMPL(fs_futime) { utime_check_t checkme; - const char* path = "."; + const char* path = "test_file"; double atime; double mtime; uv_file file; uv_fs_t req; int r; + /* Setup. */ loop = uv_default_loop(); + unlink(path); + r = uv_fs_open(loop, &req, path, O_RDWR | O_CREAT, + S_IWRITE | S_IREAD, NULL); + ASSERT(r != -1); + ASSERT(req.result != -1); + uv_fs_req_cleanup(&req); + close(r); atime = mtime = 400497753; /* 1982-09-10 11:22:33 */ - r = uv_fs_open(loop, &req, path, O_RDONLY, 0, NULL); + r = uv_fs_open(loop, &req, path, O_RDWR, 0, NULL); ASSERT(r != -1); ASSERT(req.result != -1); file = req.result; /* FIXME probably not how it's supposed to be used */ @@ -1243,6 +1262,9 @@ TEST_IMPL(fs_futime) { uv_run(loop); ASSERT(futime_cb_count == 1); + /* Cleanup. */ + unlink(path); + return 0; } From 26c5905a99538f1c8f5aea131d0e24149625f254 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 7 Oct 2011 00:57:37 -0700 Subject: [PATCH 4/4] Reimplement child_process.fork Fixes test/simple/test-child-process-fork.js --- lib/child_process_uv.js | 121 ++++++++++++++++++++++++++++++++++++++-- src/node.cc | 3 + 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/lib/child_process_uv.js b/lib/child_process_uv.js index 56c3b673ed..3dbe5f11b7 100644 --- a/lib/child_process_uv.js +++ b/lib/child_process_uv.js @@ -24,11 +24,18 @@ var Process = process.binding('process_wrap').Process; var inherits = require('util').inherits; var constants; // if (!constants) constants = process.binding('constants'); +var LF = '\n'.charCodeAt(0); +var Pipe; + // constructors for lazy loading -function createPipe() { - var Pipe = process.binding('pipe_wrap').Pipe; - return new Pipe(); +function createPipe(ipc) { + // Lazy load + if (!Pipe) { + Pipe = new process.binding('pipe_wrap').Pipe; + } + + return new Pipe(ipc); } function createSocket(pipe, readable) { @@ -61,6 +68,106 @@ function mergeOptions(target, overrides) { } +function setupChannel(target, channel) { + target._channel = channel; + + var jsonBuffer = ''; + + channel.onread = function(pool, offset, length) { + if (pool) { + for (var i = 0; i < length; i++) { + if (pool[offset + i] === LF) { + jsonBuffer += pool.toString('ascii', offset, offset + i); + var message = JSON.parse(jsonBuffer); + jsonBuffer = pool.toString('ascii', i, length); + offset = i + 1; + + target.emit('message', message); + } + } + } else { + channel.close(); + target._channel = null; + } + }; + + target.send = function(message, fd) { + if (fd) throw new Error("not yet implemented"); + + if (!target._channel) throw new Error("channel closed"); + + // For overflow protection don't write if channel queue is too deep. + if (channel.writeQueueSize > 1024 * 1024) { + return false; + } + + var buffer = Buffer(JSON.stringify(message) + '\n'); + + var writeReq = channel.write(buffer); + + if (!writeReq) { + throw new Error(errno + " cannot write to IPC channel."); + } else { + writeReq.oncomplete = nop; + } + + return true; + }; + + channel.readStart(); +} + + +function nop() { } + + +exports.fork = function(modulePath, args, options) { + if (!options) options = {}; + + if (!args) args = []; + args.unshift(modulePath); + + if (options.stdinStream) { + throw new Error("stdinStream not allowed for fork()"); + } + + if (options.customFds) { + throw new Error("customFds not allowed for fork()"); + } + + // Leave stdin open for the IPC channel. stdout and stderr should be the + // same as the parent's. + options.customFds = [ -1, 1, 2 ]; + + // Just need to set this - child process won't actually use the fd. + // For backwards compat - this can be changed to 'NODE_CHANNEL' before v0.6. + options.env = { NODE_CHANNEL_FD: 42 }; + + // stdin is the IPC channel. + options.stdinStream = createPipe(true); + + var child = spawn(process.execPath, args, options); + + setupChannel(child, options.stdinStream); + + child.on('exit', function() { + if (child._channel) { + child._channel.close(); + } + }); + + return child; +}; + + +exports._forkChild = function() { + // set process.send() + var p = createPipe(true); + p.open(0); + setupChannel(process, p); +}; + + exports.exec = function(command /*, options, callback */) { var file, args, options, callback; @@ -213,7 +320,8 @@ var spawn = exports.spawn = function(file, args, options) { cwd: options ? options.cwd : null, windowsVerbatimArguments: !!(options && options.windowsVerbatimArguments), envPairs: envPairs, - customFds: options ? options.customFds : null + customFds: options ? options.customFds : null, + stdinStream: options ? options.stdinStream : null }); return child; @@ -266,6 +374,9 @@ inherits(ChildProcess, EventEmitter); function setStreamOption(name, index, options) { + // Skip if we already have options.stdinStream + if (options[name]) return; + if (options.customFds && typeof options.customFds[index] == 'number' && options.customFds[index] !== -1) { @@ -283,6 +394,8 @@ function setStreamOption(name, index, options) { ChildProcess.prototype.spawn = function(options) { var self = this; + debugger; + setStreamOption('stdinStream', 0, options); setStreamOption('stdoutStream', 1, options); setStreamOption('stderrStream', 2, options); diff --git a/src/node.cc b/src/node.cc index 61c2fcc97a..b46705943a 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1078,6 +1078,9 @@ void MakeCallback(Handle object, HandleScope scope; Local callback_v = object->Get(String::New(method)); + if (!callback_v->IsFunction()) { + fprintf(stderr, "method = %s", method); + } assert(callback_v->IsFunction()); Local callback = Local::Cast(callback_v);