diff --git a/src/req_wrap.h b/src/req_wrap.h new file mode 100644 index 0000000000..cc60547013 --- /dev/null +++ b/src/req_wrap.h @@ -0,0 +1,35 @@ +#ifndef REQ_WRAP_H_ +#define REQ_WRAP_H_ + +namespace node { + +template +class ReqWrap { + public: + ReqWrap() { + v8::HandleScope scope; + object_ = v8::Persistent::New(v8::Object::New()); + } + + ~ReqWrap() { + // Assert that someone has called Dispatched() + assert(req_.data == this); + assert(!object_.IsEmpty()); + object_.Dispose(); + object_.Clear(); + } + + // Call this after the req has been dispatched. + void Dispatched() { + req_.data = this; + } + + v8::Persistent object_; + T req_; +}; + + +} // namespace node + + +#endif // REQ_WRAP_H_ diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc new file mode 100644 index 0000000000..df509c485b --- /dev/null +++ b/src/stream_wrap.cc @@ -0,0 +1,367 @@ +#include +#include +#include +#include + + +namespace node { + + +#define SLAB_SIZE (1024 * 1024) +#define MIN(a, b) ((a) < (b) ? (a) : (b)) + + +using v8::Object; +using v8::Handle; +using v8::Local; +using v8::Persistent; +using v8::Value; +using v8::HandleScope; +using v8::FunctionTemplate; +using v8::String; +using v8::Function; +using v8::TryCatch; +using v8::Context; +using v8::Arguments; +using v8::Integer; + + +#define UNWRAP \ + assert(!args.Holder().IsEmpty()); \ + assert(args.Holder()->InternalFieldCount() > 0); \ + StreamWrap* wrap = \ + static_cast(args.Holder()->GetPointerFromInternalField(0)); \ + if (!wrap) { \ + SetErrno(UV_EBADF); \ + return scope.Close(Integer::New(-1)); \ + } + + +typedef class ReqWrap ShutdownWrap; +typedef class ReqWrap WriteWrap; + + +static size_t slab_used; +static uv_stream_t* handle_that_last_alloced; +Persistent slab_sym; +Persistent buffer_sym; +Persistent write_queue_size_sym; +bool initialized; + + +void StreamWrap::Initialize(Handle target) { + if (initialized) { + return; + } else { + initialized = true; + } + + HandleScope scope; + + slab_sym = Persistent::New(String::NewSymbol("slab")); + buffer_sym = Persistent::New(String::NewSymbol("buffer")); + write_queue_size_sym = + Persistent::New(String::NewSymbol("writeQueueSize")); +} + + +StreamWrap::StreamWrap(Handle object, uv_stream_t* stream) { + HandleScope scope; + + stream_ = stream; + stream->data = this; + + assert(object_.IsEmpty()); + assert(object->InternalFieldCount() > 0); + object_ = v8::Persistent::New(object); + object_->SetPointerInInternalField(0, this); + + UpdateWriteQueueSize(); +} + + +StreamWrap::~StreamWrap() { + assert(object_.IsEmpty()); +} + + +// Free the C++ object on the close callback. +void StreamWrap::OnClose(uv_handle_t* handle) { + StreamWrap* wrap = static_cast(handle->data); + + // The wrap object should still be there. + assert(wrap->object_.IsEmpty() == false); + + wrap->object_->SetPointerInInternalField(0, NULL); + wrap->object_.Dispose(); + wrap->object_.Clear(); + delete wrap; +} + + +void StreamWrap::UpdateWriteQueueSize() { + object_->Set(write_queue_size_sym, Integer::New(stream_->write_queue_size)); +} + + +Handle StreamWrap::ReadStart(const Arguments& args) { + HandleScope scope; + + UNWRAP + + int r = uv_read_start(wrap->stream_, OnAlloc, OnRead); + + // Error starting the tcp. + if (r) SetErrno(uv_last_error().code); + + return scope.Close(Integer::New(r)); +} + + +Handle StreamWrap::ReadStop(const Arguments& args) { + HandleScope scope; + + UNWRAP + + int r = uv_read_stop(wrap->stream_); + + // Error starting the tcp. + if (r) SetErrno(uv_last_error().code); + + return scope.Close(Integer::New(r)); +} + + +inline char* StreamWrap::NewSlab(Handle global, + Handle wrap_obj) { + Buffer* b = Buffer::New(SLAB_SIZE); + global->SetHiddenValue(slab_sym, b->handle_); + assert(Buffer::Length(b) == SLAB_SIZE); + slab_used = 0; + wrap_obj->SetHiddenValue(slab_sym, b->handle_); + return Buffer::Data(b); +} + + +uv_buf_t StreamWrap::OnAlloc(uv_stream_t* handle, size_t suggested_size) { + HandleScope scope; + + StreamWrap* wrap = static_cast(handle->data); + assert(wrap->stream_ == handle); + + char* slab = NULL; + + Handle global = Context::GetCurrent()->Global(); + Local slab_v = global->GetHiddenValue(slab_sym); + + if (slab_v.IsEmpty()) { + // No slab currently. Create a new one. + slab = NewSlab(global, wrap->object_); + } else { + // Use existing slab. + Local slab_obj = slab_v->ToObject(); + slab = Buffer::Data(slab_obj); + assert(Buffer::Length(slab_obj) == SLAB_SIZE); + assert(SLAB_SIZE >= slab_used); + + // If less than 64kb is remaining on the slab allocate a new one. + if (SLAB_SIZE - slab_used < 64 * 1024) { + slab = NewSlab(global, wrap->object_); + } else { + wrap->object_->SetHiddenValue(slab_sym, slab_obj); + } + } + + uv_buf_t buf; + buf.base = slab + slab_used; + buf.len = MIN(SLAB_SIZE - slab_used, suggested_size); + + wrap->slab_offset_ = slab_used; + slab_used += buf.len; + + handle_that_last_alloced = handle; + + return buf; +} + +void StreamWrap::OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { + HandleScope scope; + + StreamWrap* wrap = static_cast(handle->data); + + // We should not be getting this callback if someone as already called + // uv_close() on the handle. + assert(wrap->object_.IsEmpty() == false); + + // Remove the reference to the slab to avoid memory leaks; + Local slab_v = wrap->object_->GetHiddenValue(slab_sym); + wrap->object_->SetHiddenValue(slab_sym, v8::Null()); + + if (nread < 0) { + // EOF or Error + if (handle_that_last_alloced == handle) { + slab_used -= buf.len; + } + + SetErrno(uv_last_error().code); + MakeCallback(wrap->object_, "onread", 0, NULL); + return; + } + + assert(nread <= buf.len); + + if (handle_that_last_alloced == handle) { + slab_used -= (buf.len - nread); + } + + if (nread > 0) { + Local argv[3] = { + slab_v, + Integer::New(wrap->slab_offset_), + Integer::New(nread) + }; + MakeCallback(wrap->object_, "onread", 3, argv); + } +} + +// TODO: share me? +Handle StreamWrap::Close(const Arguments& args) { + HandleScope scope; + + UNWRAP + + assert(!wrap->object_.IsEmpty()); + int r = uv_close((uv_handle_t*) wrap->stream_, OnClose); + + if (r) { + SetErrno(uv_last_error().code); + + wrap->object_->SetPointerInInternalField(0, NULL); + wrap->object_.Dispose(); + wrap->object_.Clear(); + } + return scope.Close(Integer::New(r)); +} + + +Handle StreamWrap::Write(const Arguments& args) { + HandleScope scope; + + UNWRAP + + // The first argument is a buffer. + assert(Buffer::HasInstance(args[0])); + Local buffer_obj = args[0]->ToObject(); + + size_t offset = 0; + size_t length = Buffer::Length(buffer_obj); + + if (args.Length() > 1) { + offset = args[1]->IntegerValue(); + } + + if (args.Length() > 2) { + length = args[2]->IntegerValue(); + } + + WriteWrap* req_wrap = new WriteWrap(); + + req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj); + + uv_buf_t buf; + buf.base = Buffer::Data(buffer_obj) + offset; + buf.len = length; + + int r = uv_write(&req_wrap->req_, wrap->stream_, &buf, 1, StreamWrap::AfterWrite); + + req_wrap->Dispatched(); + + wrap->UpdateWriteQueueSize(); + + if (r) { + SetErrno(uv_last_error().code); + delete req_wrap; + return scope.Close(v8::Null()); + } else { + return scope.Close(req_wrap->object_); + } +} + + +void StreamWrap::AfterWrite(uv_write_t* req, int status) { + WriteWrap* req_wrap = (WriteWrap*) req->data; + StreamWrap* wrap = (StreamWrap*) req->handle->data; + + HandleScope scope; + + // The wrap and request objects should still be there. + assert(req_wrap->object_.IsEmpty() == false); + assert(wrap->object_.IsEmpty() == false); + + if (status) { + SetErrno(uv_last_error().code); + } + + wrap->UpdateWriteQueueSize(); + + Local argv[4] = { + Integer::New(status), + Local::New(wrap->object_), + Local::New(req_wrap->object_), + req_wrap->object_->GetHiddenValue(buffer_sym), + }; + + MakeCallback(req_wrap->object_, "oncomplete", 4, argv); + + delete req_wrap; +} + + +Handle StreamWrap::Shutdown(const Arguments& args) { + HandleScope scope; + + UNWRAP + + ShutdownWrap* req_wrap = new ShutdownWrap(); + + int r = uv_shutdown(&req_wrap->req_, wrap->stream_, AfterShutdown); + + req_wrap->Dispatched(); + + if (r) { + SetErrno(uv_last_error().code); + delete req_wrap; + return scope.Close(v8::Null()); + } else { + return scope.Close(req_wrap->object_); + } +} + + +void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { + ReqWrap* req_wrap = (ReqWrap*) req->data; + StreamWrap* wrap = (StreamWrap*) req->handle->data; + + // The wrap and request objects should still be there. + assert(req_wrap->object_.IsEmpty() == false); + assert(wrap->object_.IsEmpty() == false); + + HandleScope scope; + + if (status) { + SetErrno(uv_last_error().code); + } + + Local argv[3] = { + Integer::New(status), + Local::New(wrap->object_), + Local::New(req_wrap->object_) + }; + + MakeCallback(req_wrap->object_, "oncomplete", 3, argv); + + delete req_wrap; +} + + +} diff --git a/src/stream_wrap.h b/src/stream_wrap.h new file mode 100644 index 0000000000..fb233a447d --- /dev/null +++ b/src/stream_wrap.h @@ -0,0 +1,42 @@ +#ifndef STREAM_WRAP_H_ +#define STREAM_WRAP_H_ + +namespace node { + +class StreamWrap { + public: + static void Initialize(v8::Handle target); + + // JavaScript functions + static v8::Handle Write(const v8::Arguments& args); + static v8::Handle ReadStart(const v8::Arguments& args); + static v8::Handle ReadStop(const v8::Arguments& args); + static v8::Handle Shutdown(const v8::Arguments& args); + static v8::Handle Close(const v8::Arguments& args); + + protected: + StreamWrap(v8::Handle object, uv_stream_t* stream); + ~StreamWrap(); + + v8::Persistent object_; + + private: + void UpdateWriteQueueSize(); + static inline char* NewSlab(v8::Handle global, v8::Handle wrap_obj); + + // Callbacks for libuv + static void AfterWrite(uv_write_t* req, int status); + static uv_buf_t OnAlloc(uv_stream_t* handle, size_t suggested_size); + static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf); + static void AfterShutdown(uv_shutdown_t* req, int status); + static void OnClose(uv_handle_t* handle); + + size_t slab_offset_; + uv_stream_t* stream_; +}; + + +} // namespace node + + +#endif // STREAM_WRAP_H_ diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 4a7e1a08e9..cbeb4a3d7a 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -1,5 +1,7 @@ #include #include +#include +#include // Temporary hack: libuv should provide uv_inet_pton and uv_inet_ntop. #ifdef __MINGW32__ @@ -16,9 +18,6 @@ # define uv_inet_ntop inet_ntop #endif -#define SLAB_SIZE (1024 * 1024) -#define MIN(a, b) ((a) < (b) ? (a) : (b)) - // Rules: // // - Do not throw from handle methods. Set errno. @@ -67,53 +66,21 @@ using v8::Arguments; using v8::Integer; static Persistent constructor; -static size_t slab_used; -static uv_tcp_t* handle_that_last_alloced; -static Persistent slab_sym; -static Persistent buffer_sym; static Persistent family_symbol; static Persistent address_symbol; static Persistent port_symbol; -static Persistent write_queue_size_sym; - -class TCPWrap; - -template -class ReqWrap { - public: - ReqWrap() { - HandleScope scope; - object_ = Persistent::New(Object::New()); - } - - ~ReqWrap() { - // Assert that someone has called Dispatched() - assert(req_.data == this); - assert(!object_.IsEmpty()); - object_.Dispose(); - object_.Clear(); - } - - // Call this after the req has been dispatched. - void Dispatched() { - req_.data = this; - } - - Persistent object_; - T req_; -}; -typedef class ReqWrap ShutdownWrap; -typedef class ReqWrap WriteWrap; typedef class ReqWrap ConnectWrap; -class TCPWrap { +class TCPWrap : StreamWrap { public: static void Initialize(Handle target) { + StreamWrap::Initialize(target); + HandleScope scope; Local t = FunctionTemplate::New(New); @@ -121,25 +88,21 @@ class TCPWrap { t->InstanceTemplate()->SetInternalFieldCount(1); + NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart); + NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop); + NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write); + NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown); + NODE_SET_PROTOTYPE_METHOD(t, "close", StreamWrap::Close); + NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind); NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen); - NODE_SET_PROTOTYPE_METHOD(t, "readStart", ReadStart); - NODE_SET_PROTOTYPE_METHOD(t, "readStop", ReadStop); - NODE_SET_PROTOTYPE_METHOD(t, "write", Write); NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect); - NODE_SET_PROTOTYPE_METHOD(t, "shutdown", Shutdown); - NODE_SET_PROTOTYPE_METHOD(t, "close", Close); NODE_SET_PROTOTYPE_METHOD(t, "bind6", Bind6); NODE_SET_PROTOTYPE_METHOD(t, "connect6", Connect6); NODE_SET_PROTOTYPE_METHOD(t, "getsockname", GetSockName); constructor = Persistent::New(t->GetFunction()); - slab_sym = Persistent::New(String::NewSymbol("slab")); - buffer_sym = Persistent::New(String::NewSymbol("buffer")); - write_queue_size_sym = - Persistent::New(String::NewSymbol("writeQueueSize")); - family_symbol = NODE_PSYMBOL("family"); address_symbol = NODE_PSYMBOL("address"); port_symbol = NODE_PSYMBOL("port"); @@ -161,40 +124,17 @@ class TCPWrap { return scope.Close(args.This()); } - TCPWrap(Handle object) { + TCPWrap(Handle object) : StreamWrap(object, + (uv_stream_t*) &handle_) { int r = uv_tcp_init(&handle_); - handle_.data = this; assert(r == 0); // How do we proxy this error up to javascript? // Suggestion: uv_tcp_init() returns void. - assert(object_.IsEmpty()); - assert(object->InternalFieldCount() > 0); - object_ = v8::Persistent::New(object); - object_->SetPointerInInternalField(0, this); - - UpdateWriteQueueSize(); } ~TCPWrap() { assert(object_.IsEmpty()); } - // Free the C++ object on the close callback. - static void OnClose(uv_handle_t* handle) { - TCPWrap* wrap = static_cast(handle->data); - - // The wrap object should still be there. - assert(wrap->object_.IsEmpty() == false); - - wrap->object_->SetPointerInInternalField(0, NULL); - wrap->object_.Dispose(); - wrap->object_.Clear(); - delete wrap; - } - - inline void UpdateWriteQueueSize() { - object_->Set(write_queue_size_sym, Integer::New(handle_.write_queue_size)); - } - static Handle GetSockName(const Arguments& args) { HandleScope scope; struct sockaddr address; @@ -315,213 +255,6 @@ class TCPWrap { MakeCallback(wrap->object_, "onconnection", 1, argv); } - static Handle ReadStart(const Arguments& args) { - HandleScope scope; - - UNWRAP - - int r = uv_read_start((uv_stream_t*)&wrap->handle_, OnAlloc, OnRead); - - // Error starting the tcp. - if (r) SetErrno(uv_last_error().code); - - return scope.Close(Integer::New(r)); - } - - static Handle ReadStop(const Arguments& args) { - HandleScope scope; - - UNWRAP - - int r = uv_read_stop((uv_stream_t*)&wrap->handle_); - - // Error starting the tcp. - if (r) SetErrno(uv_last_error().code); - - return scope.Close(Integer::New(r)); - } - - static inline char* NewSlab(Handle global, Handle wrap_obj) { - Buffer* b = Buffer::New(SLAB_SIZE); - global->SetHiddenValue(slab_sym, b->handle_); - assert(Buffer::Length(b) == SLAB_SIZE); - slab_used = 0; - wrap_obj->SetHiddenValue(slab_sym, b->handle_); - return Buffer::Data(b); - } - - static uv_buf_t OnAlloc(uv_stream_t* handle, size_t suggested_size) { - HandleScope scope; - - TCPWrap* wrap = static_cast(handle->data); - assert(&wrap->handle_ == (uv_tcp_t*)handle); - - char* slab = NULL; - - Handle global = Context::GetCurrent()->Global(); - Local slab_v = global->GetHiddenValue(slab_sym); - - if (slab_v.IsEmpty()) { - // No slab currently. Create a new one. - slab = NewSlab(global, wrap->object_); - } else { - // Use existing slab. - Local slab_obj = slab_v->ToObject(); - slab = Buffer::Data(slab_obj); - assert(Buffer::Length(slab_obj) == SLAB_SIZE); - assert(SLAB_SIZE >= slab_used); - - // If less than 64kb is remaining on the slab allocate a new one. - if (SLAB_SIZE - slab_used < 64 * 1024) { - slab = NewSlab(global, wrap->object_); - } else { - wrap->object_->SetHiddenValue(slab_sym, slab_obj); - } - } - - uv_buf_t buf; - buf.base = slab + slab_used; - buf.len = MIN(SLAB_SIZE - slab_used, suggested_size); - - wrap->slab_offset_ = slab_used; - slab_used += buf.len; - - handle_that_last_alloced = (uv_tcp_t*)handle; - - return buf; - } - - static void OnRead(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { - HandleScope scope; - - TCPWrap* wrap = static_cast(handle->data); - - // We should not be getting this callback if someone as already called - // uv_close() on the handle. - assert(wrap->object_.IsEmpty() == false); - - // Remove the reference to the slab to avoid memory leaks; - Local slab_v = wrap->object_->GetHiddenValue(slab_sym); - wrap->object_->SetHiddenValue(slab_sym, v8::Null()); - - if (nread < 0) { - // EOF or Error - if (handle_that_last_alloced == (uv_tcp_t*)handle) { - slab_used -= buf.len; - } - - SetErrno(uv_last_error().code); - MakeCallback(wrap->object_, "onread", 0, NULL); - return; - } - - assert(nread <= buf.len); - - if (handle_that_last_alloced == (uv_tcp_t*)handle) { - slab_used -= (buf.len - nread); - } - - if (nread > 0) { - Local argv[3] = { - slab_v, - Integer::New(wrap->slab_offset_), - Integer::New(nread) - }; - MakeCallback(wrap->object_, "onread", 3, argv); - } - } - - // TODO: share me? - static Handle Close(const Arguments& args) { - HandleScope scope; - - UNWRAP - - assert(!wrap->object_.IsEmpty()); - int r = uv_close((uv_handle_t*) &wrap->handle_, OnClose); - - if (r) { - SetErrno(uv_last_error().code); - - wrap->object_->SetPointerInInternalField(0, NULL); - wrap->object_.Dispose(); - wrap->object_.Clear(); - } - return scope.Close(Integer::New(r)); - } - - static void AfterWrite(uv_write_t* req, int status) { - WriteWrap* req_wrap = (WriteWrap*) req->data; - TCPWrap* wrap = (TCPWrap*) req->handle->data; - - HandleScope scope; - - // The wrap and request objects should still be there. - assert(req_wrap->object_.IsEmpty() == false); - assert(wrap->object_.IsEmpty() == false); - - if (status) { - SetErrno(uv_last_error().code); - } - - wrap->UpdateWriteQueueSize(); - - Local argv[4] = { - Integer::New(status), - Local::New(wrap->object_), - Local::New(req_wrap->object_), - req_wrap->object_->GetHiddenValue(buffer_sym), - }; - - MakeCallback(req_wrap->object_, "oncomplete", 4, argv); - - delete req_wrap; - } - - static Handle Write(const Arguments& args) { - HandleScope scope; - - UNWRAP - - // The first argument is a buffer. - assert(Buffer::HasInstance(args[0])); - Local buffer_obj = args[0]->ToObject(); - - size_t offset = 0; - size_t length = Buffer::Length(buffer_obj); - - if (args.Length() > 1) { - offset = args[1]->IntegerValue(); - } - - if (args.Length() > 2) { - length = args[2]->IntegerValue(); - } - - WriteWrap* req_wrap = new WriteWrap(); - - req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj); - - uv_buf_t buf; - buf.base = Buffer::Data(buffer_obj) + offset; - buf.len = length; - - int r = uv_write(&req_wrap->req_, (uv_stream_t*)&wrap->handle_, &buf, 1, - AfterWrite); - - req_wrap->Dispatched(); - - wrap->UpdateWriteQueueSize(); - - if (r) { - SetErrno(uv_last_error().code); - delete req_wrap; - return scope.Close(v8::Null()); - } else { - return scope.Close(req_wrap->object_); - } - } - static void AfterConnect(uv_connect_t* req, int status) { ConnectWrap* req_wrap = (ConnectWrap*) req->data; TCPWrap* wrap = (TCPWrap*) req->handle->data; @@ -602,55 +335,8 @@ class TCPWrap { } } - static void AfterShutdown(uv_shutdown_t* req, int status) { - ReqWrap* req_wrap = (ReqWrap*) req->data; - TCPWrap* wrap = (TCPWrap*) req->handle->data; - - // The wrap and request objects should still be there. - assert(req_wrap->object_.IsEmpty() == false); - assert(wrap->object_.IsEmpty() == false); - - HandleScope scope; - - if (status) { - SetErrno(uv_last_error().code); - } - - Local argv[3] = { - Integer::New(status), - Local::New(wrap->object_), - Local::New(req_wrap->object_) - }; - - MakeCallback(req_wrap->object_, "oncomplete", 3, argv); - - delete req_wrap; - } - - static Handle Shutdown(const Arguments& args) { - HandleScope scope; - - UNWRAP - - ShutdownWrap* req_wrap = new ShutdownWrap(); - - int r = uv_shutdown(&req_wrap->req_, (uv_stream_t*) &wrap->handle_, - AfterShutdown); - - req_wrap->Dispatched(); - - if (r) { - SetErrno(uv_last_error().code); - delete req_wrap; - return scope.Close(v8::Null()); - } else { - return scope.Close(req_wrap->object_); - } - } uv_tcp_t handle_; - Persistent object_; - size_t slab_offset_; }; diff --git a/wscript b/wscript index ec647bad47..398856961c 100644 --- a/wscript +++ b/wscript @@ -846,6 +846,7 @@ def build(bld): src/node_dtrace.cc src/node_string.cc src/timer_wrap.cc + src/stream_wrap.cc src/tcp_wrap.cc src/cares_wrap.cc """