Browse Source

http_parser: consume StreamBase instance

Consume StreamBase instance and operate on incoming data directly
without allocating Buffer instances. Improves performance.

PR-URL: https://github.com/nodejs/node/pull/2355
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
v3.x
Fedor Indutny 10 years ago
committed by Rod Vagg
parent
commit
1bc446863f
  1. 53
      lib/_http_server.js
  2. 11
      src/env-inl.h
  3. 5
      src/env.h
  4. 204
      src/node_http_parser.cc
  5. 30
      test/parallel/test-http-server-unconsume.js

53
lib/_http_server.js

@ -79,6 +79,8 @@ const STATUS_CODES = exports.STATUS_CODES = {
511 : 'Network Authentication Required' // RFC 6585
};
const kOnExecute = HTTPParser.kOnExecute | 0;
function ServerResponse(req) {
OutgoingMessage.call(this);
@ -317,6 +319,21 @@ function connectionListener(socket) {
socket.on('end', socketOnEnd);
socket.on('data', socketOnData);
// We are consuming socket, so it won't get any actual data
socket.on('resume', onSocketResume);
socket.on('pause', onSocketPause);
socket.on('drain', socketOnDrain);
// Override on to unconsume on `data`, `readable` listeners
socket.on = socketOnWrap;
var external = socket._handle._externalStream;
if (external)
parser.consume(external);
external = null;
parser[kOnExecute] = onParserExecute;
// TODO(isaacs): Move all these functions out of here
function socketOnError(e) {
self.emit('clientError', e, this);
@ -326,6 +343,16 @@ function connectionListener(socket) {
assert(!socket._paused);
debug('SERVER socketOnData %d', d.length);
var ret = parser.execute(d);
onParserExecuteCommon(ret, d);
}
function onParserExecute(ret, d) {
debug('SERVER socketOnParserExecute %d', ret);
onParserExecuteCommon(ret, undefined);
}
function onParserExecuteCommon(ret, d) {
if (ret instanceof Error) {
debug('parse error');
socket.destroy(ret);
@ -335,9 +362,13 @@ function connectionListener(socket) {
var req = parser.incoming;
debug('SERVER upgrade or connect', req.method);
if (!d)
d = parser.getCurrentBuffer();
socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('close', serverSocketCloseListener);
parser.unconsume(socket._handle._externalStream);
parser.finish();
freeParser(parser, req, null);
parser = null;
@ -400,7 +431,6 @@ function connectionListener(socket) {
socket.resume();
}
}
socket.on('drain', socketOnDrain);
function parserOnIncoming(req, shouldKeepAlive) {
incoming.push(req);
@ -480,3 +510,24 @@ function connectionListener(socket) {
}
}
exports._connectionListener = connectionListener;
function onSocketResume() {
this._handle.readStart();
}
function onSocketPause() {
this._handle.readStop();
}
function socketOnWrap(ev, fn) {
var res = net.Socket.prototype.on.call(this, ev, fn);
if (!this.parser) {
this.on = net.Socket.prototype.on;
return res;
}
if (ev === 'data' || ev === 'readable')
this.parser.unconsume(this._handle._externalStream);
return res;
}

11
src/env-inl.h

@ -178,6 +178,7 @@ inline Environment::Environment(v8::Local<v8::Context> context,
printed_error_(false),
trace_sync_io_(false),
debugger_agent_(this),
http_parser_buffer_(nullptr),
context_(context->GetIsolate(), context) {
// We'll be creating new objects so make sure we've entered the context.
v8::HandleScope handle_scope(isolate());
@ -200,6 +201,7 @@ inline Environment::~Environment() {
isolate_data()->Put();
delete[] heap_statistics_buffer_;
delete[] http_parser_buffer_;
}
inline void Environment::CleanupHandles() {
@ -338,6 +340,15 @@ inline void Environment::set_heap_statistics_buffer(uint32_t* pointer) {
heap_statistics_buffer_ = pointer;
}
inline char* Environment::http_parser_buffer() const {
return http_parser_buffer_;
}
inline void Environment::set_http_parser_buffer(char* buffer) {
CHECK_EQ(http_parser_buffer_, nullptr); // Should be set only once.
http_parser_buffer_ = buffer;
}
inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
return ContainerOf(&Environment::cares_timer_handle_, handle);
}

5
src/env.h

@ -429,6 +429,9 @@ class Environment {
inline uint32_t* heap_statistics_buffer() const;
inline void set_heap_statistics_buffer(uint32_t* pointer);
inline char* http_parser_buffer() const;
inline void set_http_parser_buffer(char* buffer);
inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
inline void ThrowRangeError(const char* errmsg);
@ -526,6 +529,8 @@ class Environment {
uint32_t* heap_statistics_buffer_ = nullptr;
char* http_parser_buffer_;
#define V(PropertyName, TypeName) \
v8::Persistent<TypeName> PropertyName ## _;
ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V)

204
src/node_http_parser.cc

@ -6,6 +6,8 @@
#include "base-object-inl.h"
#include "env.h"
#include "env-inl.h"
#include "stream_base.h"
#include "stream_base-inl.h"
#include "util.h"
#include "util-inl.h"
#include "v8.h"
@ -36,6 +38,7 @@ namespace node {
using v8::Array;
using v8::Boolean;
using v8::Context;
using v8::EscapableHandleScope;
using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
@ -54,6 +57,7 @@ const uint32_t kOnHeaders = 0;
const uint32_t kOnHeadersComplete = 1;
const uint32_t kOnBody = 2;
const uint32_t kOnMessageComplete = 3;
const uint32_t kOnExecute = 4;
#define HTTP_CB(name) \
@ -295,7 +299,7 @@ class Parser : public BaseObject {
HTTP_DATA_CB(on_body) {
HandleScope scope(env()->isolate());
EscapableHandleScope scope(env()->isolate());
Local<Object> obj = object();
Local<Value> cb = obj->Get(kOnBody);
@ -303,6 +307,15 @@ class Parser : public BaseObject {
if (!cb->IsFunction())
return 0;
// We came from consumed stream
if (current_buffer_.IsEmpty()) {
// Make sure Buffer will be in parent HandleScope
current_buffer_ = scope.Escape(Buffer::Copy(
env()->isolate(),
current_buffer_data_,
current_buffer_len_).ToLocalChecked());
}
Local<Value> argv[3] = {
current_buffer_,
Integer::NewFromUnsigned(env()->isolate(), at - current_buffer_data_),
@ -374,8 +387,6 @@ class Parser : public BaseObject {
// var bytesParsed = parser->execute(buffer);
static void Execute(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Parser* parser = Unwrap<Parser>(args.Holder());
CHECK(parser->current_buffer_.IsEmpty());
CHECK_EQ(parser->current_buffer_len_, 0);
@ -390,40 +401,11 @@ class Parser : public BaseObject {
// amount of overhead. Nothing else will run while http_parser_execute()
// runs, therefore this pointer can be set and used for the execution.
parser->current_buffer_ = buffer_obj;
parser->current_buffer_len_ = buffer_len;
parser->current_buffer_data_ = buffer_data;
parser->got_exception_ = false;
size_t nparsed =
http_parser_execute(&parser->parser_, &settings, buffer_data, buffer_len);
parser->Save();
// Unassign the 'buffer_' variable
parser->current_buffer_.Clear();
parser->current_buffer_len_ = 0;
parser->current_buffer_data_ = nullptr;
// If there was an exception in one of the callbacks
if (parser->got_exception_)
return;
Local<Integer> nparsed_obj = Integer::New(env->isolate(), nparsed);
// If there was a parse error in one of the callbacks
// TODO(bnoordhuis) What if there is an error on EOF?
if (!parser->parser_.upgrade && nparsed != buffer_len) {
enum http_errno err = HTTP_PARSER_ERRNO(&parser->parser_);
Local<Value> e = Exception::Error(env->parse_error_string());
Local<Object> obj = e->ToObject(env->isolate());
obj->Set(env->bytes_parsed_string(), nparsed_obj);
obj->Set(env->code_string(),
OneByteString(env->isolate(), http_errno_name(err)));
Local<Value> ret = parser->Execute(buffer_data, buffer_len);
args.GetReturnValue().Set(e);
} else {
args.GetReturnValue().Set(nparsed_obj);
}
if (!ret.IsEmpty())
args.GetReturnValue().Set(ret);
}
@ -478,7 +460,150 @@ class Parser : public BaseObject {
}
private:
static void Consume(const FunctionCallbackInfo<Value>& args) {
Parser* parser = Unwrap<Parser>(args.Holder());
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);
stream->Consume();
parser->prev_alloc_cb_ = stream->alloc_cb();
parser->prev_read_cb_ = stream->read_cb();
stream->set_alloc_cb({ OnAllocImpl, parser });
stream->set_read_cb({ OnReadImpl, parser });
}
static void Unconsume(const FunctionCallbackInfo<Value>& args) {
Parser* parser = Unwrap<Parser>(args.Holder());
// Already unconsumed
if (parser->prev_alloc_cb_.is_empty())
return;
CHECK(args[0]->IsExternal());
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);
stream->set_alloc_cb(parser->prev_alloc_cb_);
stream->set_read_cb(parser->prev_read_cb_);
}
static void GetCurrentBuffer(const FunctionCallbackInfo<Value>& args) {
Parser* parser = Unwrap<Parser>(args.Holder());
Local<Object> ret = Buffer::Copy(
parser->env(),
parser->current_buffer_data_,
parser->current_buffer_len_).ToLocalChecked();
args.GetReturnValue().Set(ret);
}
protected:
static const size_t kAllocBufferSize = 64 * 1024;
static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
Parser* parser = static_cast<Parser*>(ctx);
Environment* env = parser->env();
if (env->http_parser_buffer() == nullptr)
env->set_http_parser_buffer(new char[kAllocBufferSize]);
buf->base = env->http_parser_buffer();
buf->len = kAllocBufferSize;
}
static void OnReadImpl(ssize_t nread,
const uv_buf_t* buf,
uv_handle_type pending,
void* ctx) {
Parser* parser = static_cast<Parser*>(ctx);
HandleScope scope(parser->env()->isolate());
if (nread < 0) {
uv_buf_t tmp_buf;
tmp_buf.base = nullptr;
tmp_buf.len = 0;
parser->prev_read_cb_.fn(nread,
&tmp_buf,
pending,
parser->prev_read_cb_.ctx);
return;
}
// Ignore, empty reads have special meaning in http parser
if (nread == 0)
return;
parser->current_buffer_.Clear();
Local<Value> ret = parser->Execute(buf->base, nread);
// Exception
if (ret.IsEmpty())
return;
Local<Object> obj = parser->object();
Local<Value> cb = obj->Get(kOnExecute);
if (!cb->IsFunction())
return;
// Hooks for GetCurrentBuffer
parser->current_buffer_len_ = nread;
parser->current_buffer_data_ = buf->base;
cb.As<Function>()->Call(obj, 1, &ret);
parser->current_buffer_len_ = 0;
parser->current_buffer_data_ = nullptr;
parser->env()->KickNextTick();
}
Local<Value> Execute(char* data, size_t len) {
EscapableHandleScope scope(env()->isolate());
current_buffer_len_ = len;
current_buffer_data_ = data;
got_exception_ = false;
size_t nparsed =
http_parser_execute(&parser_, &settings, data, len);
Save();
// Unassign the 'buffer_' variable
current_buffer_.Clear();
current_buffer_len_ = 0;
current_buffer_data_ = nullptr;
// If there was an exception in one of the callbacks
if (got_exception_)
return scope.Escape(Local<Value>());
Local<Integer> nparsed_obj = Integer::New(env()->isolate(), nparsed);
// If there was a parse error in one of the callbacks
// TODO(bnoordhuis) What if there is an error on EOF?
if (!parser_.upgrade && nparsed != len) {
enum http_errno err = HTTP_PARSER_ERRNO(&parser_);
Local<Value> e = Exception::Error(env()->parse_error_string());
Local<Object> obj = e->ToObject(env()->isolate());
obj->Set(env()->bytes_parsed_string(), nparsed_obj);
obj->Set(env()->code_string(),
OneByteString(env()->isolate(), http_errno_name(err)));
return scope.Escape(e);
}
return scope.Escape(nparsed_obj);
}
Local<Array> CreateHeaders() {
// num_values_ is either -1 or the entry # of the last header
@ -542,6 +667,8 @@ class Parser : public BaseObject {
Local<Object> current_buffer_;
size_t current_buffer_len_;
char* current_buffer_data_;
StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
static const struct http_parser_settings settings;
};
@ -581,6 +708,8 @@ void InitHttpParser(Handle<Object> target,
Integer::NewFromUnsigned(env->isolate(), kOnBody));
t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnMessageComplete"),
Integer::NewFromUnsigned(env->isolate(), kOnMessageComplete));
t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnExecute"),
Integer::NewFromUnsigned(env->isolate(), kOnExecute));
Local<Array> methods = Array::New(env->isolate());
#define V(num, name, string) \
@ -595,6 +724,9 @@ void InitHttpParser(Handle<Object> target,
env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize);
env->SetProtoMethod(t, "pause", Parser::Pause<true>);
env->SetProtoMethod(t, "resume", Parser::Pause<false>);
env->SetProtoMethod(t, "consume", Parser::Consume);
env->SetProtoMethod(t, "unconsume", Parser::Unconsume);
env->SetProtoMethod(t, "getCurrentBuffer", Parser::GetCurrentBuffer);
target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "HTTPParser"),
t->GetFunction());

30
test/parallel/test-http-server-unconsume.js

@ -0,0 +1,30 @@
'use strict';
var common = require('../common');
var assert = require('assert');
var http = require('http');
var net = require('net');
var received = '';
var server = http.createServer(function(req, res) {
res.writeHead(200);
res.end();
req.socket.on('data', function(data) {
received += data;
});
server.close();
}).listen(common.PORT, function() {
var socket = net.connect(common.PORT, function() {
socket.write('PUT / HTTP/1.1\r\n\r\n');
socket.once('data', function() {
socket.end('hello world');
});
});
});
process.on('exit', function() {
assert.equal(received, 'hello world');
});
Loading…
Cancel
Save