Browse Source

stream_wrap: use new slab allocator

v0.9.1-release
Ben Noordhuis 13 years ago
parent
commit
1e13a2d242
  1. 97
      src/stream_wrap.cc

97
src/stream_wrap.cc

@ -22,6 +22,7 @@
#include "node.h"
#include "node_buffer.h"
#include "handle_wrap.h"
#include "slab_allocator.h"
#include "stream_wrap.h"
#include "pipe_wrap.h"
#include "tcp_wrap.h"
@ -29,14 +30,11 @@
#include <stdlib.h> // abort()
namespace node {
#define SLAB_SIZE (1024 * 1024)
#define MIN(a, b) ((a) < (b) ? (a) : (b))
namespace node {
using v8::Object;
using v8::Handle;
using v8::Local;
@ -69,26 +67,20 @@ typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
typedef class ReqWrap<uv_write_t> WriteWrap;
static size_t slab_used;
static uv_stream_t* handle_that_last_alloced;
static Persistent<String> slab_sym;
static Persistent<String> buffer_sym;
static Persistent<String> write_queue_size_sym;
static SlabAllocator slab_allocator(SLAB_SIZE);
static bool initialized;
void StreamWrap::Initialize(Handle<Object> target) {
if (initialized) {
return;
} else {
initialized = true;
}
if (initialized) return;
initialized = true;
HandleScope scope;
HandleWrap::Initialize(target);
slab_sym = Persistent<String>::New(String::NewSymbol("slab"));
buffer_sym = Persistent<String>::New(String::NewSymbol("buffer"));
write_queue_size_sym =
Persistent<String>::New(String::NewSymbol("writeQueueSize"));
@ -152,60 +144,11 @@ Handle<Value> StreamWrap::ReadStop(const Arguments& args) {
}
char* StreamWrap::NewSlab(Handle<Object> global,
Handle<Object> wrap_obj) {
HandleScope scope;
Local<Value> arg = Integer::NewFromUnsigned(SLAB_SIZE);
Local<Object> b = Buffer::constructor_template->GetFunction()->
NewInstance(1, &arg);
if (b.IsEmpty()) return NULL;
global->SetHiddenValue(slab_sym, b);
assert(Buffer::Length(b) == SLAB_SIZE);
slab_used = 0;
wrap_obj->SetHiddenValue(slab_sym, b);
return Buffer::Data(b);
}
uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
HandleScope scope;
StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
assert(wrap->stream_ == reinterpret_cast<uv_stream_t*>(handle));
char* slab = NULL;
Handle<Object> global = Context::GetCurrent()->Global();
Local<Value> slab_v = global->GetHiddenValue(slab_sym);
if (slab_v.IsEmpty()) {
// No slab currently. Create a new one.
slab = NewSlab(global, wrap->object_);
} else {
// Use existing slab.
Local<Object> slab_obj = slab_v->ToObject();
slab = Buffer::Data(slab_obj);
assert(Buffer::Length(slab_obj) == SLAB_SIZE);
assert(SLAB_SIZE >= slab_used);
// If less than 64kb is remaining on the slab allocate a new one.
if (SLAB_SIZE - slab_used < 64 * 1024) {
slab = NewSlab(global, wrap->object_);
} else {
wrap->object_->SetHiddenValue(slab_sym, slab_obj);
}
}
uv_buf_t buf;
buf.base = slab + slab_used;
buf.len = MIN(SLAB_SIZE - slab_used, suggested_size);
wrap->slab_offset_ = slab_used;
slab_used += buf.len;
handle_that_last_alloced = reinterpret_cast<uv_stream_t*>(handle);
return buf;
char* buf = slab_allocator.Allocate(wrap->object_, suggested_size);
return uv_buf_init(buf, suggested_size);
}
@ -219,34 +162,24 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
// uv_close() on the handle.
assert(wrap->object_.IsEmpty() == false);
// Remove the reference to the slab to avoid memory leaks;
Local<Value> slab_v = wrap->object_->GetHiddenValue(slab_sym);
wrap->object_->SetHiddenValue(slab_sym, v8::Null());
Local<Object> slab = slab_allocator.Shrink(wrap->object_,
buf.base,
nread < 0 ? 0 : nread);
if (nread < 0) {
// EOF or Error
if (handle_that_last_alloced == handle) {
slab_used -= buf.len;
}
SetErrno(uv_last_error(uv_default_loop()));
MakeCallback(wrap->object_, "onread", 0, NULL);
return;
}
assert(static_cast<size_t>(nread) <= buf.len);
if (handle_that_last_alloced == handle) {
slab_used -= (buf.len - nread);
}
if (nread == 0) return;
assert(static_cast<size_t>(nread) <= buf.len);
int argc = 3;
Local<Value> argv[4] = {
slab_v,
Integer::New(wrap->slab_offset_),
Integer::New(nread)
slab,
Integer::NewFromUnsigned(buf.base - Buffer::Data(slab)),
Integer::NewFromUnsigned(nread)
};
Local<Object> pending_obj;

Loading…
Cancel
Save