diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index e29c31702b..515fa9f41e 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -22,6 +22,7 @@ #include "node.h" #include "node_buffer.h" #include "handle_wrap.h" +#include "slab_allocator.h" #include "stream_wrap.h" #include "pipe_wrap.h" #include "tcp_wrap.h" @@ -29,14 +30,11 @@ #include // abort() - -namespace node { - - #define SLAB_SIZE (1024 * 1024) -#define MIN(a, b) ((a) < (b) ? (a) : (b)) +namespace node { + using v8::Object; using v8::Handle; using v8::Local; @@ -69,26 +67,20 @@ typedef class ReqWrap ShutdownWrap; typedef class ReqWrap WriteWrap; -static size_t slab_used; -static uv_stream_t* handle_that_last_alloced; -static Persistent slab_sym; static Persistent buffer_sym; static Persistent write_queue_size_sym; +static SlabAllocator slab_allocator(SLAB_SIZE); static bool initialized; void StreamWrap::Initialize(Handle target) { - if (initialized) { - return; - } else { - initialized = true; - } + if (initialized) return; + initialized = true; HandleScope scope; HandleWrap::Initialize(target); - slab_sym = Persistent::New(String::NewSymbol("slab")); buffer_sym = Persistent::New(String::NewSymbol("buffer")); write_queue_size_sym = Persistent::New(String::NewSymbol("writeQueueSize")); @@ -152,60 +144,11 @@ Handle StreamWrap::ReadStop(const Arguments& args) { } -char* StreamWrap::NewSlab(Handle global, - Handle wrap_obj) { - HandleScope scope; - Local arg = Integer::NewFromUnsigned(SLAB_SIZE); - Local b = Buffer::constructor_template->GetFunction()-> - NewInstance(1, &arg); - if (b.IsEmpty()) return NULL; - global->SetHiddenValue(slab_sym, b); - assert(Buffer::Length(b) == SLAB_SIZE); - slab_used = 0; - wrap_obj->SetHiddenValue(slab_sym, b); - return Buffer::Data(b); -} - - uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) { - HandleScope scope; - StreamWrap* wrap = static_cast(handle->data); assert(wrap->stream_ == reinterpret_cast(handle)); - - char* slab = NULL; - - Handle global = Context::GetCurrent()->Global(); - Local slab_v = global->GetHiddenValue(slab_sym); - - if (slab_v.IsEmpty()) { - // No slab currently. Create a new one. - slab = NewSlab(global, wrap->object_); - } else { - // Use existing slab. - Local slab_obj = slab_v->ToObject(); - slab = Buffer::Data(slab_obj); - assert(Buffer::Length(slab_obj) == SLAB_SIZE); - assert(SLAB_SIZE >= slab_used); - - // If less than 64kb is remaining on the slab allocate a new one. - if (SLAB_SIZE - slab_used < 64 * 1024) { - slab = NewSlab(global, wrap->object_); - } else { - wrap->object_->SetHiddenValue(slab_sym, slab_obj); - } - } - - uv_buf_t buf; - buf.base = slab + slab_used; - buf.len = MIN(SLAB_SIZE - slab_used, suggested_size); - - wrap->slab_offset_ = slab_used; - slab_used += buf.len; - - handle_that_last_alloced = reinterpret_cast(handle); - - return buf; + char* buf = slab_allocator.Allocate(wrap->object_, suggested_size); + return uv_buf_init(buf, suggested_size); } @@ -219,34 +162,24 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread, // uv_close() on the handle. assert(wrap->object_.IsEmpty() == false); - // Remove the reference to the slab to avoid memory leaks; - Local slab_v = wrap->object_->GetHiddenValue(slab_sym); - wrap->object_->SetHiddenValue(slab_sym, v8::Null()); + Local slab = slab_allocator.Shrink(wrap->object_, + buf.base, + nread < 0 ? 0 : nread); if (nread < 0) { - // EOF or Error - if (handle_that_last_alloced == handle) { - slab_used -= buf.len; - } - SetErrno(uv_last_error(uv_default_loop())); MakeCallback(wrap->object_, "onread", 0, NULL); return; } - assert(static_cast(nread) <= buf.len); - - if (handle_that_last_alloced == handle) { - slab_used -= (buf.len - nread); - } - if (nread == 0) return; + assert(static_cast(nread) <= buf.len); int argc = 3; Local argv[4] = { - slab_v, - Integer::New(wrap->slab_offset_), - Integer::New(nread) + slab, + Integer::NewFromUnsigned(buf.base - Buffer::Data(slab)), + Integer::NewFromUnsigned(nread) }; Local pending_obj;