From 86214c9f164c152b57b140d33deefa48eff50920 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 14 Jun 2011 18:02:58 +0200 Subject: [PATCH] tcp_wrap: Initial wrap of uv_read_start --- src/tcp_wrap.cc | 131 ++++++++++++++++++++++++++++ test/common.js | 4 + test/simple/test-tcp-wrap-listen.js | 56 ++++++++++++ 3 files changed, 191 insertions(+) create mode 100644 test/simple/test-tcp-wrap-listen.js diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc index 4c589c896d..444dd43208 100644 --- a/src/tcp_wrap.cc +++ b/src/tcp_wrap.cc @@ -1,6 +1,10 @@ #include #include #include +#include + +#define SLAB_SIZE (1024 * 1024) +#define MIN(a, b) ((a) < (b) ? (a) : (b)) // Rules: // @@ -50,6 +54,12 @@ using v8::Arguments; using v8::Integer; static Persistent constructor; +static size_t slab_used; +static uv_tcp_t* handle_that_last_alloced; + +static Persistent slab_sym; +static Persistent offset_sym; +static Persistent length_sym; class TCPWrap { public: @@ -64,10 +74,16 @@ class TCPWrap { NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind); NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen); + NODE_SET_PROTOTYPE_METHOD(t, "readStart", ReadStart); + NODE_SET_PROTOTYPE_METHOD(t, "readStop", ReadStop); NODE_SET_PROTOTYPE_METHOD(t, "close", Close); constructor = Persistent::New(t->GetFunction()); + slab_sym = Persistent::New(String::NewSymbol("slab")); + offset_sym = Persistent::New(String::NewSymbol("offset")); + length_sym = Persistent::New(String::NewSymbol("length")); + target->Set(String::NewSymbol("TCP"), constructor); } @@ -170,6 +186,120 @@ class TCPWrap { MakeCallback(wrap->object_, "onconnection", 1, argv); } + static Handle ReadStart(const Arguments& args) { + HandleScope scope; + + UNWRAP + + int r = uv_read_start(&wrap->handle_, OnAlloc, OnRead); + + // Error starting the tcp. + if (r) SetErrno(uv_last_error().code); + + return scope.Close(Integer::New(r)); + } + + static Handle ReadStop(const Arguments& args) { + HandleScope scope; + + UNWRAP + + int r = uv_read_stop(&wrap->handle_); + + // Error starting the tcp. + if (r) SetErrno(uv_last_error().code); + + return scope.Close(Integer::New(r)); + } + + static inline char* NewSlab(Handle global, Handle wrap_obj) { + Buffer* b = Buffer::New(SLAB_SIZE); + global->SetHiddenValue(slab_sym, b->handle_); + assert(Buffer::Length(b) == SLAB_SIZE); + slab_used = 0; + wrap_obj->SetHiddenValue(slab_sym, b->handle_); + return Buffer::Data(b); + } + + static uv_buf_t OnAlloc(uv_tcp_t* handle, size_t suggested_size) { + HandleScope scope; + + TCPWrap* wrap = static_cast(handle->data); + assert(&wrap->handle_ == handle); + + char* slab = NULL; + + Handle global = Context::GetCurrent()->Global(); + Local slab_v = global->GetHiddenValue(slab_sym); + + if (slab_v.IsEmpty()) { + // No slab currently. Create a new one. + slab = NewSlab(global, wrap->object_); + } else { + // Use existing slab. + Local slab_obj = slab_v->ToObject(); + slab = Buffer::Data(slab_obj); + assert(Buffer::Length(slab_obj) == SLAB_SIZE); + assert(SLAB_SIZE > slab_used); + + // If less than 64kb is remaining on the slab allocate a new one. + if (SLAB_SIZE - slab_used < 64 * 1024) { + slab = NewSlab(global, wrap->object_); + } else { + wrap->object_->SetHiddenValue(slab_sym, slab_obj); + } + } + + uv_buf_t buf; + buf.base = slab + slab_used; + buf.len = MIN(SLAB_SIZE - slab_used, suggested_size); + + wrap->slab_offset_ = slab_used; + slab_used += buf.len; + + handle_that_last_alloced = handle; + + return buf; + } + + static void OnRead(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) { + HandleScope scope; + + TCPWrap* wrap = static_cast(handle->data); + + // Remove the reference to the slab to avoid memory leaks; + Local slab_v = wrap->object_->GetHiddenValue(slab_sym); + wrap->object_->SetHiddenValue(slab_sym, v8::Null()); + + if (nread < 0) { + // EOF or Error + if (handle_that_last_alloced == handle) { + slab_used -= buf.len; + } + + SetErrno(uv_last_error().code); + MakeCallback(wrap->object_, "onread", 0, NULL); + return; + } + + assert(nread <= buf.len); + + if (handle_that_last_alloced == handle) { + slab_used -= (buf.len - nread); + } + + if (nread > 0) { + Local slice = Object::New(); + slice->Set(slab_sym, slab_v); + slice->Set(offset_sym, Integer::New(wrap->slab_offset_)); + slice->Set(length_sym, Integer::New(nread)); + + Local argv[1] = { slice }; + MakeCallback(wrap->object_, "onread", 1, argv); + } + } + + // TODO: share me? static Handle Close(const Arguments& args) { HandleScope scope; @@ -190,6 +320,7 @@ class TCPWrap { uv_tcp_t handle_; Persistent object_; + size_t slab_offset_; }; diff --git a/test/common.js b/test/common.js index 044f74a93f..ba9663b566 100644 --- a/test/common.js +++ b/test/common.js @@ -60,6 +60,10 @@ process.on('exit', function() { process, global]; + if (global.errno) { + knownGlobals.push(errno); + } + if (global.gc) { knownGlobals.push(gc); } diff --git a/test/simple/test-tcp-wrap-listen.js b/test/simple/test-tcp-wrap-listen.js new file mode 100644 index 0000000000..c2e8a00b55 --- /dev/null +++ b/test/simple/test-tcp-wrap-listen.js @@ -0,0 +1,56 @@ +var common = require('../common'); +var assert = require('assert'); + +var TCP = process.binding('tcp_wrap').TCP; + +var server = new TCP(); + +var r = server.bind("0.0.0.0", common.PORT); +assert.equal(0, r); + +server.listen(128); + +var slice, sliceCount = 0, eofCount = 0; + +server.onconnection = function(client) { + console.log("got connection"); + + client.readStart(); + client.onread = function(s) { + if (s) { + slice = s; + sliceCount++; + } else { + console.log("eof"); + client.close(); + server.close(); + eofCount++; + } + }; +}; + + + +var net = require('net'); + +var c = net.createConnection(common.PORT); +c.on('connect', function() { + c.end("hello world"); +}); + +c.on('close', function() { + console.error("client closed"); +}); + +process.on('exit', function() { + assert.equal(1, sliceCount); + assert.equal(1, eofCount); + assert.ok(slice); + assert.ok(slice.slab); + + var s = slice.slab.slice(slice.offset, slice.length, "ascii"); + assert.equal("hello world", s); +}); + + +