diff --git a/lib/net.js b/lib/net.js index 2604cb5c6b..3b02993cef 100644 --- a/lib/net.js +++ b/lib/net.js @@ -28,7 +28,7 @@ var getaddrinfo = process.getaddrinfo; var needsLookup = process.needsLookup; var EINPROGRESS = process.EINPROGRESS; var ENOENT = process.ENOENT; -var END_OF_FILE = 42; +var END_OF_FILE = 0; function Socket (peerInfo) { process.EventEmitter.call(); @@ -39,6 +39,7 @@ function Socket (peerInfo) { self.recvBuffer = null; self.readWatcher = new IOWatcher(); + self.readWatcher.host = this; self.readWatcher.callback = function () { // If this is the first recv (recvBuffer doesn't exist) or we've used up // most of the recvBuffer, allocate a new one. @@ -61,9 +62,9 @@ function Socket (peerInfo) { } } else { bytesRead = read(self.fd, - self.recvBuffer, - self.recvBuffer.used, - self.recvBuffer.length - self.recvBuffer.used); + self.recvBuffer, + self.recvBuffer.used, + self.recvBuffer.length - self.recvBuffer.used); } debug('bytesRead ' + bytesRead + '\n'); @@ -99,6 +100,7 @@ function Socket (peerInfo) { } }; self.writeWatcher = new IOWatcher(); + self.writeWatcher.host = this; self.writeWatcher.callback = self._doFlush; self.writable = false; @@ -131,6 +133,7 @@ Socket.prototype._allocateNewRecvBuf = function () { var newBufferSize = 1024; // TODO make this adjustable from user API + /* if (toRead) { // Is the extra system call even worth it? var bytesToRead = toRead(self.fd); @@ -145,6 +148,7 @@ Socket.prototype._allocateNewRecvBuf = function () { newBufferSize = 128; } } + */ self.recvBuffer = new process.Buffer(newBufferSize); self.recvBuffer.used = 0; @@ -421,6 +425,7 @@ Socket.prototype.forceClose = function (exception) { this.writeWatcher.stop(); this.readWatcher.stop(); + close(this.fd); debug('close socket ' + this.fd); this.fd = null; @@ -455,6 +460,7 @@ function Server (listener) { } self.watcher = new IOWatcher(); + self.watcher.host = self; self.watcher.callback = function (readable, writeable) { while (self.fd) { var peerInfo = accept(self.fd); diff --git a/src/node_buffer.cc b/src/node_buffer.cc index fa11ae42cf..ee9ce8406f 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -33,24 +33,15 @@ bool IsBuffer(v8::Handle val) { /* Determines the absolute position for a relative offset */ -static inline size_t buffer_abs_off(buffer *buffer, size_t off) { +size_t buffer_abs_off(buffer *buffer, size_t off) { struct buffer *root = buffer_root(buffer); off += buffer->offset; return MIN(root->length, off); } -static inline void buffer_ref(struct buffer *buffer) { - assert(buffer->root == NULL); - buffer->refs++; -} - - -static inline void buffer_unref(struct buffer *buffer) { - assert(buffer->root == NULL); - assert(buffer->refs > 0); - buffer->refs--; - if (buffer->refs == 0 && buffer->weak) free(buffer); +void buffer_ref(struct buffer *buffer) { + buffer_root(buffer)->refs++; } @@ -69,15 +60,26 @@ static void RootWeakCallback(Persistent value, void *data) struct buffer *buffer = static_cast(data); assert(buffer->root == NULL); // this is the root assert(value == buffer->handle); - buffer->handle.Dispose(); + value.ClearWeak(); if (buffer->refs) { buffer->weak = true; } else { + buffer->handle.Dispose(); free(buffer); } } +void buffer_unref(struct buffer *buffer) { + struct buffer * root = buffer_root(buffer); + assert(root->refs > 0); + root->refs--; + if (root->refs == 0 && root->weak) { + root->handle.MakeWeak(root, RootWeakCallback); + } +} + + static void SliceWeakCallback(Persistent value, void *data) { struct buffer *buffer = static_cast(data); diff --git a/src/node_buffer.h b/src/node_buffer.h index 2c189fd232..122d589d85 100644 --- a/src/node_buffer.h +++ b/src/node_buffer.h @@ -56,6 +56,9 @@ static inline size_t buffer_remaining(struct buffer *buffer, size_t off) { return end - buffer_p(buffer, off); } -} +void buffer_ref(struct buffer *buffer); +void buffer_unref(struct buffer *buffer); + +} // namespace node buffer #endif // NODE_BUFFER diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 972e2bad89..5d133318ef 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -19,6 +19,30 @@ // No copying is performed when slicing the buffer, only small reference // allocations. + #include + #include + #include + + /* Obtain a backtrace and print it to stdout. */ + void + print_trace (void) + { + void *array[10]; + size_t size; + char **strings; + size_t i; + + size = backtrace (array, 10); + strings = backtrace_symbols (array, size); + + printf ("Obtained %zd stack frames.\n", size); + + for (i = 0; i < size; i++) + printf ("%s\n", strings[i]); + + free (strings); + } + namespace node { using namespace v8; @@ -93,6 +117,7 @@ static Persistent should_keep_alive_sym; , Integer::New(length) \ }; \ Local ret = cb->Call(parser->handle_, 3, argv); \ + assert(parser->buffer_); \ return ret.IsEmpty() ? -1 : 0; \ } @@ -141,6 +166,10 @@ class Parser : public ObjectWrap { parser_.data = this; } + ~Parser() { + assert(buffer_ == NULL && "Destroying a parser while it's parsing"); + } + DEFINE_HTTP_CB(on_message_begin) DEFINE_HTTP_CB(on_message_complete) @@ -215,6 +244,7 @@ class Parser : public ObjectWrap { Parser *parser = ObjectWrap::Unwrap(args.This()); if (parser->buffer_) { + print_trace(); return ThrowException(Exception::TypeError( String::New("Already parsing a buffer"))); } @@ -243,9 +273,13 @@ class Parser : public ObjectWrap { // Assign 'buffer_' while we parse. The callbacks will access that varible. parser->buffer_ = buffer; + buffer_ref(parser->buffer_); + size_t nparsed = http_parser_execute(&(parser->parser_), buffer_p(buffer, off), len); + buffer_unref(parser->buffer_); + // Unassign the 'buffer_' variable assert(parser->buffer_); parser->buffer_ = NULL; @@ -266,6 +300,17 @@ class Parser : public ObjectWrap { return scope.Close(nparsed_obj); } + static Handle ExecuteEOF(const Arguments& args) { + HandleScope scope; + + Parser *parser = ObjectWrap::Unwrap(args.This()); + + assert(!parser->buffer_); + http_parser_execute(&parser->parser_, NULL, 0); + + return Undefined(); + } + private: @@ -282,6 +327,7 @@ void InitHttpParser(Handle target) { //t->SetClassName(String::NewSymbol("HTTPParser")); NODE_SET_PROTOTYPE_METHOD(t, "execute", Parser::Execute); + NODE_SET_PROTOTYPE_METHOD(t, "executeEOF", Parser::ExecuteEOF); target->Set(String::NewSymbol("HTTPParser"), t->GetFunction()); diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index 61e49f6bc5..2f05c7fd4e 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -59,9 +59,8 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) { // -// var io = new process.IOWatcher(function (readable, writable) { -// -// }); +// var io = new process.IOWatcher(); +// io.callback = function (readable, writable) { ... }; // io.set(fd, true, false); // io.start(); // @@ -69,6 +68,8 @@ Handle IOWatcher::New(const Arguments& args) { HandleScope scope; IOWatcher *s = new IOWatcher(); + + s->Wrap(args.This()); return args.This(); @@ -78,11 +79,10 @@ Handle IOWatcher::New(const Arguments& args) { Handle IOWatcher::Start(const Arguments& args) { HandleScope scope; - IOWatcher *io = ObjectWrap::Unwrap(args.Holder()); + IOWatcher *io = Unwrap(args.Holder()); ev_io_start(EV_DEFAULT_UC_ &io->watcher_); - - io->Ref(); + assert(ev_is_active(&io->watcher_)); return Undefined(); } @@ -90,7 +90,7 @@ Handle IOWatcher::Start(const Arguments& args) { Handle IOWatcher::Set(const Arguments& args) { HandleScope scope; - IOWatcher *io = ObjectWrap::Unwrap(args.Holder()); + IOWatcher *io = Unwrap(args.Holder()); if (!args[0]->IsInt32()) { return ThrowException(Exception::TypeError( @@ -122,16 +122,16 @@ Handle IOWatcher::Set(const Arguments& args) { Handle IOWatcher::Stop(const Arguments& args) { HandleScope scope; - IOWatcher *io = ObjectWrap::Unwrap(args.Holder()); + IOWatcher *io = Unwrap(args.This()); io->Stop(); return Undefined(); } void IOWatcher::Stop () { - if (watcher_.active) { + if (ev_is_active(&watcher_)) { ev_io_stop(EV_DEFAULT_UC_ &watcher_); - Unref(); + assert(!ev_is_active(&watcher_)); } } diff --git a/src/node_io_watcher.h b/src/node_io_watcher.h index 5e73177076..d1d9d2dc2a 100644 --- a/src/node_io_watcher.h +++ b/src/node_io_watcher.h @@ -7,20 +7,20 @@ namespace node { -class IOWatcher : ObjectWrap { +class IOWatcher { public: static void Initialize(v8::Handle target); protected: static v8::Persistent constructor_template; - IOWatcher() : ObjectWrap() { + IOWatcher() { ev_init(&watcher_, IOWatcher::Callback); watcher_.data = this; } ~IOWatcher() { - ev_io_stop(EV_DEFAULT_UC_ &watcher_); + assert(!ev_is_active(&watcher_)); } static v8::Handle New(const v8::Arguments& args); @@ -28,12 +28,47 @@ class IOWatcher : ObjectWrap { static v8::Handle Stop(const v8::Arguments& args); static v8::Handle Set(const v8::Arguments& args); + inline void Wrap(v8::Handle handle) { + assert(handle_.IsEmpty()); + assert(handle->InternalFieldCount() > 0); + handle_ = v8::Persistent::New(handle); + handle_->SetInternalField(0, v8::External::New(this)); + MakeWeak(); + } + + inline void MakeWeak(void) { + handle_.MakeWeak(this, WeakCallback); + } + + + private: static void Callback(EV_P_ ev_io *watcher, int revents); + static void WeakCallback (v8::Persistent value, void *data) + { + IOWatcher *io = static_cast(data); + assert(value == io->handle_); + if (!ev_is_active(&io->watcher_)) { + value.Dispose(); + delete io; + } else { + //value.ClearWeak(); + io->MakeWeak(); + } + } + + static IOWatcher* Unwrap(v8::Handle handle) { + assert(!handle.IsEmpty()); + assert(handle->InternalFieldCount() > 0); + return static_cast(v8::Handle::Cast( + handle->GetInternalField(0))->Value()); + } + void Stop(); ev_io watcher_; + v8::Persistent handle_; }; } // namespace node diff --git a/src/node_object_wrap.h b/src/node_object_wrap.h index 6656664bdf..c392055a63 100644 --- a/src/node_object_wrap.h +++ b/src/node_object_wrap.h @@ -53,6 +53,7 @@ class ObjectWrap { assert(!handle_.IsEmpty()); assert(handle_.IsWeak()); refs_++; + MakeWeak(); } /* Unref() marks an object as detached from the event loop. This is its