Browse Source

HTTP works somewhat on net2 now

However it's not working very well: Hitting a 'hello world' server with many
requests (ab -t 60 -c 10) will cause it to crash with the following error.

  Obtained 3 stack frames.

  ./node(_Z11print_tracev+0x1c) [0x80d1b3c]
  ./node(_ZN4node6Parser7ExecuteERKN2v89ArgumentsE+0x69) [0x80d3759]
  ./node [0x811f44b]
  TypeError: Already parsing a buffer
      at Socket.<anonymous> (/home/ryan/projects/node/lib/http2.js:393:20)
      at IOWatcher.callback (/home/ryan/projects/node/lib/net.js:81:12)
      at node.js:985:9
      at node.js:989:1
v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
741e3fa91b
  1. 8
      lib/net.js
  2. 28
      src/node_buffer.cc
  3. 5
      src/node_buffer.h
  4. 46
      src/node_http_parser.cc
  5. 20
      src/node_io_watcher.cc
  6. 41
      src/node_io_watcher.h
  7. 1
      src/node_object_wrap.h

8
lib/net.js

@ -28,7 +28,7 @@ var getaddrinfo = process.getaddrinfo;
var needsLookup = process.needsLookup; var needsLookup = process.needsLookup;
var EINPROGRESS = process.EINPROGRESS; var EINPROGRESS = process.EINPROGRESS;
var ENOENT = process.ENOENT; var ENOENT = process.ENOENT;
var END_OF_FILE = 42; var END_OF_FILE = 0;
function Socket (peerInfo) { function Socket (peerInfo) {
process.EventEmitter.call(); process.EventEmitter.call();
@ -39,6 +39,7 @@ function Socket (peerInfo) {
self.recvBuffer = null; self.recvBuffer = null;
self.readWatcher = new IOWatcher(); self.readWatcher = new IOWatcher();
self.readWatcher.host = this;
self.readWatcher.callback = function () { self.readWatcher.callback = function () {
// If this is the first recv (recvBuffer doesn't exist) or we've used up // If this is the first recv (recvBuffer doesn't exist) or we've used up
// most of the recvBuffer, allocate a new one. // most of the recvBuffer, allocate a new one.
@ -99,6 +100,7 @@ function Socket (peerInfo) {
} }
}; };
self.writeWatcher = new IOWatcher(); self.writeWatcher = new IOWatcher();
self.writeWatcher.host = this;
self.writeWatcher.callback = self._doFlush; self.writeWatcher.callback = self._doFlush;
self.writable = false; self.writable = false;
@ -131,6 +133,7 @@ Socket.prototype._allocateNewRecvBuf = function () {
var newBufferSize = 1024; // TODO make this adjustable from user API var newBufferSize = 1024; // TODO make this adjustable from user API
/*
if (toRead) { if (toRead) {
// Is the extra system call even worth it? // Is the extra system call even worth it?
var bytesToRead = toRead(self.fd); var bytesToRead = toRead(self.fd);
@ -145,6 +148,7 @@ Socket.prototype._allocateNewRecvBuf = function () {
newBufferSize = 128; newBufferSize = 128;
} }
} }
*/
self.recvBuffer = new process.Buffer(newBufferSize); self.recvBuffer = new process.Buffer(newBufferSize);
self.recvBuffer.used = 0; self.recvBuffer.used = 0;
@ -421,6 +425,7 @@ Socket.prototype.forceClose = function (exception) {
this.writeWatcher.stop(); this.writeWatcher.stop();
this.readWatcher.stop(); this.readWatcher.stop();
close(this.fd); close(this.fd);
debug('close socket ' + this.fd); debug('close socket ' + this.fd);
this.fd = null; this.fd = null;
@ -455,6 +460,7 @@ function Server (listener) {
} }
self.watcher = new IOWatcher(); self.watcher = new IOWatcher();
self.watcher.host = self;
self.watcher.callback = function (readable, writeable) { self.watcher.callback = function (readable, writeable) {
while (self.fd) { while (self.fd) {
var peerInfo = accept(self.fd); var peerInfo = accept(self.fd);

28
src/node_buffer.cc

@ -33,24 +33,15 @@ bool IsBuffer(v8::Handle<v8::Value> val) {
/* Determines the absolute position for a relative offset */ /* Determines the absolute position for a relative offset */
static inline size_t buffer_abs_off(buffer *buffer, size_t off) { size_t buffer_abs_off(buffer *buffer, size_t off) {
struct buffer *root = buffer_root(buffer); struct buffer *root = buffer_root(buffer);
off += buffer->offset; off += buffer->offset;
return MIN(root->length, off); return MIN(root->length, off);
} }
static inline void buffer_ref(struct buffer *buffer) { void buffer_ref(struct buffer *buffer) {
assert(buffer->root == NULL); buffer_root(buffer)->refs++;
buffer->refs++;
}
static inline void buffer_unref(struct buffer *buffer) {
assert(buffer->root == NULL);
assert(buffer->refs > 0);
buffer->refs--;
if (buffer->refs == 0 && buffer->weak) free(buffer);
} }
@ -69,15 +60,26 @@ static void RootWeakCallback(Persistent<Value> value, void *data)
struct buffer *buffer = static_cast<struct buffer*>(data); struct buffer *buffer = static_cast<struct buffer*>(data);
assert(buffer->root == NULL); // this is the root assert(buffer->root == NULL); // this is the root
assert(value == buffer->handle); assert(value == buffer->handle);
buffer->handle.Dispose(); value.ClearWeak();
if (buffer->refs) { if (buffer->refs) {
buffer->weak = true; buffer->weak = true;
} else { } else {
buffer->handle.Dispose();
free(buffer); free(buffer);
} }
} }
void buffer_unref(struct buffer *buffer) {
struct buffer * root = buffer_root(buffer);
assert(root->refs > 0);
root->refs--;
if (root->refs == 0 && root->weak) {
root->handle.MakeWeak(root, RootWeakCallback);
}
}
static void SliceWeakCallback(Persistent<Value> value, void *data) static void SliceWeakCallback(Persistent<Value> value, void *data)
{ {
struct buffer *buffer = static_cast<struct buffer*>(data); struct buffer *buffer = static_cast<struct buffer*>(data);

5
src/node_buffer.h

@ -56,6 +56,9 @@ static inline size_t buffer_remaining(struct buffer *buffer, size_t off) {
return end - buffer_p(buffer, off); return end - buffer_p(buffer, off);
} }
} void buffer_ref(struct buffer *buffer);
void buffer_unref(struct buffer *buffer);
} // namespace node buffer
#endif // NODE_BUFFER #endif // NODE_BUFFER

46
src/node_http_parser.cc

@ -19,6 +19,30 @@
// No copying is performed when slicing the buffer, only small reference // No copying is performed when slicing the buffer, only small reference
// allocations. // allocations.
#include <execinfo.h>
#include <stdio.h>
#include <stdlib.h>
/* Obtain a backtrace and print it to stdout. */
void
print_trace (void)
{
void *array[10];
size_t size;
char **strings;
size_t i;
size = backtrace (array, 10);
strings = backtrace_symbols (array, size);
printf ("Obtained %zd stack frames.\n", size);
for (i = 0; i < size; i++)
printf ("%s\n", strings[i]);
free (strings);
}
namespace node { namespace node {
using namespace v8; using namespace v8;
@ -93,6 +117,7 @@ static Persistent<String> should_keep_alive_sym;
, Integer::New(length) \ , Integer::New(length) \
}; \ }; \
Local<Value> ret = cb->Call(parser->handle_, 3, argv); \ Local<Value> ret = cb->Call(parser->handle_, 3, argv); \
assert(parser->buffer_); \
return ret.IsEmpty() ? -1 : 0; \ return ret.IsEmpty() ? -1 : 0; \
} }
@ -141,6 +166,10 @@ class Parser : public ObjectWrap {
parser_.data = this; parser_.data = this;
} }
~Parser() {
assert(buffer_ == NULL && "Destroying a parser while it's parsing");
}
DEFINE_HTTP_CB(on_message_begin) DEFINE_HTTP_CB(on_message_begin)
DEFINE_HTTP_CB(on_message_complete) DEFINE_HTTP_CB(on_message_complete)
@ -215,6 +244,7 @@ class Parser : public ObjectWrap {
Parser *parser = ObjectWrap::Unwrap<Parser>(args.This()); Parser *parser = ObjectWrap::Unwrap<Parser>(args.This());
if (parser->buffer_) { if (parser->buffer_) {
print_trace();
return ThrowException(Exception::TypeError( return ThrowException(Exception::TypeError(
String::New("Already parsing a buffer"))); String::New("Already parsing a buffer")));
} }
@ -243,9 +273,13 @@ class Parser : public ObjectWrap {
// Assign 'buffer_' while we parse. The callbacks will access that varible. // Assign 'buffer_' while we parse. The callbacks will access that varible.
parser->buffer_ = buffer; parser->buffer_ = buffer;
buffer_ref(parser->buffer_);
size_t nparsed = size_t nparsed =
http_parser_execute(&(parser->parser_), buffer_p(buffer, off), len); http_parser_execute(&(parser->parser_), buffer_p(buffer, off), len);
buffer_unref(parser->buffer_);
// Unassign the 'buffer_' variable // Unassign the 'buffer_' variable
assert(parser->buffer_); assert(parser->buffer_);
parser->buffer_ = NULL; parser->buffer_ = NULL;
@ -266,6 +300,17 @@ class Parser : public ObjectWrap {
return scope.Close(nparsed_obj); return scope.Close(nparsed_obj);
} }
static Handle<Value> ExecuteEOF(const Arguments& args) {
HandleScope scope;
Parser *parser = ObjectWrap::Unwrap<Parser>(args.This());
assert(!parser->buffer_);
http_parser_execute(&parser->parser_, NULL, 0);
return Undefined();
}
private: private:
@ -282,6 +327,7 @@ void InitHttpParser(Handle<Object> target) {
//t->SetClassName(String::NewSymbol("HTTPParser")); //t->SetClassName(String::NewSymbol("HTTPParser"));
NODE_SET_PROTOTYPE_METHOD(t, "execute", Parser::Execute); NODE_SET_PROTOTYPE_METHOD(t, "execute", Parser::Execute);
NODE_SET_PROTOTYPE_METHOD(t, "executeEOF", Parser::ExecuteEOF);
target->Set(String::NewSymbol("HTTPParser"), t->GetFunction()); target->Set(String::NewSymbol("HTTPParser"), t->GetFunction());

20
src/node_io_watcher.cc

@ -59,9 +59,8 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
// //
// var io = new process.IOWatcher(function (readable, writable) { // var io = new process.IOWatcher();
// // io.callback = function (readable, writable) { ... };
// });
// io.set(fd, true, false); // io.set(fd, true, false);
// io.start(); // io.start();
// //
@ -69,6 +68,8 @@ Handle<Value> IOWatcher::New(const Arguments& args) {
HandleScope scope; HandleScope scope;
IOWatcher *s = new IOWatcher(); IOWatcher *s = new IOWatcher();
s->Wrap(args.This()); s->Wrap(args.This());
return args.This(); return args.This();
@ -78,11 +79,10 @@ Handle<Value> IOWatcher::New(const Arguments& args) {
Handle<Value> IOWatcher::Start(const Arguments& args) { Handle<Value> IOWatcher::Start(const Arguments& args) {
HandleScope scope; HandleScope scope;
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder()); IOWatcher *io = Unwrap(args.Holder());
ev_io_start(EV_DEFAULT_UC_ &io->watcher_); ev_io_start(EV_DEFAULT_UC_ &io->watcher_);
assert(ev_is_active(&io->watcher_));
io->Ref();
return Undefined(); return Undefined();
} }
@ -90,7 +90,7 @@ Handle<Value> IOWatcher::Start(const Arguments& args) {
Handle<Value> IOWatcher::Set(const Arguments& args) { Handle<Value> IOWatcher::Set(const Arguments& args) {
HandleScope scope; HandleScope scope;
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder()); IOWatcher *io = Unwrap(args.Holder());
if (!args[0]->IsInt32()) { if (!args[0]->IsInt32()) {
return ThrowException(Exception::TypeError( return ThrowException(Exception::TypeError(
@ -122,16 +122,16 @@ Handle<Value> IOWatcher::Set(const Arguments& args) {
Handle<Value> IOWatcher::Stop(const Arguments& args) { Handle<Value> IOWatcher::Stop(const Arguments& args) {
HandleScope scope; HandleScope scope;
IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder()); IOWatcher *io = Unwrap(args.This());
io->Stop(); io->Stop();
return Undefined(); return Undefined();
} }
void IOWatcher::Stop () { void IOWatcher::Stop () {
if (watcher_.active) { if (ev_is_active(&watcher_)) {
ev_io_stop(EV_DEFAULT_UC_ &watcher_); ev_io_stop(EV_DEFAULT_UC_ &watcher_);
Unref(); assert(!ev_is_active(&watcher_));
} }
} }

41
src/node_io_watcher.h

@ -7,20 +7,20 @@
namespace node { namespace node {
class IOWatcher : ObjectWrap { class IOWatcher {
public: public:
static void Initialize(v8::Handle<v8::Object> target); static void Initialize(v8::Handle<v8::Object> target);
protected: protected:
static v8::Persistent<v8::FunctionTemplate> constructor_template; static v8::Persistent<v8::FunctionTemplate> constructor_template;
IOWatcher() : ObjectWrap() { IOWatcher() {
ev_init(&watcher_, IOWatcher::Callback); ev_init(&watcher_, IOWatcher::Callback);
watcher_.data = this; watcher_.data = this;
} }
~IOWatcher() { ~IOWatcher() {
ev_io_stop(EV_DEFAULT_UC_ &watcher_); assert(!ev_is_active(&watcher_));
} }
static v8::Handle<v8::Value> New(const v8::Arguments& args); static v8::Handle<v8::Value> New(const v8::Arguments& args);
@ -28,12 +28,47 @@ class IOWatcher : ObjectWrap {
static v8::Handle<v8::Value> Stop(const v8::Arguments& args); static v8::Handle<v8::Value> Stop(const v8::Arguments& args);
static v8::Handle<v8::Value> Set(const v8::Arguments& args); static v8::Handle<v8::Value> Set(const v8::Arguments& args);
inline void Wrap(v8::Handle<v8::Object> handle) {
assert(handle_.IsEmpty());
assert(handle->InternalFieldCount() > 0);
handle_ = v8::Persistent<v8::Object>::New(handle);
handle_->SetInternalField(0, v8::External::New(this));
MakeWeak();
}
inline void MakeWeak(void) {
handle_.MakeWeak(this, WeakCallback);
}
private: private:
static void Callback(EV_P_ ev_io *watcher, int revents); static void Callback(EV_P_ ev_io *watcher, int revents);
static void WeakCallback (v8::Persistent<v8::Value> value, void *data)
{
IOWatcher *io = static_cast<IOWatcher*>(data);
assert(value == io->handle_);
if (!ev_is_active(&io->watcher_)) {
value.Dispose();
delete io;
} else {
//value.ClearWeak();
io->MakeWeak();
}
}
static IOWatcher* Unwrap(v8::Handle<v8::Object> handle) {
assert(!handle.IsEmpty());
assert(handle->InternalFieldCount() > 0);
return static_cast<IOWatcher*>(v8::Handle<v8::External>::Cast(
handle->GetInternalField(0))->Value());
}
void Stop(); void Stop();
ev_io watcher_; ev_io watcher_;
v8::Persistent<v8::Object> handle_;
}; };
} // namespace node } // namespace node

1
src/node_object_wrap.h

@ -53,6 +53,7 @@ class ObjectWrap {
assert(!handle_.IsEmpty()); assert(!handle_.IsEmpty());
assert(handle_.IsWeak()); assert(handle_.IsWeak());
refs_++; refs_++;
MakeWeak();
} }
/* Unref() marks an object as detached from the event loop. This is its /* Unref() marks an object as detached from the event loop. This is its

Loading…
Cancel
Save