Browse Source

Use events for all HTTP messages.

This is a rather large refactor! Mostly for the better side. I've had to
remove some functionality like req.interrupt(). A lot of other work is left
messy or incomplete.
v0.7.4-release
Ryan 16 years ago
parent
commit
70fe920fb5
  1. 2
      benchmark/http_simple.js
  2. 2
      src/events.js
  3. 145
      src/http.cc
  4. 88
      src/http.js
  5. 1
      src/node.cc
  6. 2
      test/mjsunit/test-http-client-race.js
  7. 4
      test/mjsunit/test-http-proxy.js
  8. 2
      test/mjsunit/test-http.js

2
benchmark/http_simple.js

@ -3,7 +3,7 @@ for (var i = 0; i < 20*1024; i++) {
fixed += "C"; fixed += "C";
} }
stored = {}; stored = {};
new node.http.Server(function (req, res) { node.http.createServer(function (req, res) {
var commands = req.uri.path.split("/"); var commands = req.uri.path.split("/");
var command = commands[1]; var command = commands[1];
var body = ""; var body = "";

2
src/events.js

@ -4,9 +4,11 @@
var emitter = node.EventEmitter.prototype; var emitter = node.EventEmitter.prototype;
emitter.addListener = function (type, listener) { emitter.addListener = function (type, listener) {
if (listener instanceof Function) {
if (!this._events) this._events = {}; if (!this._events) this._events = {};
if (!this._events.hasOwnProperty(type)) this._events[type] = []; if (!this._events.hasOwnProperty(type)) this._events[type] = [];
this._events[type].push(listener); this._events[type].push(listener);
}
}; };
emitter.listeners = function (type, listener) { emitter.listeners = function (type, listener) {

145
src/http.cc

@ -87,52 +87,47 @@ int
HTTPConnection::on_message_begin (http_parser *parser) HTTPConnection::on_message_begin (http_parser *parser)
{ {
HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data); HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
HandleScope scope; connection->Emit("MessageBegin", 0, NULL);
return 0;
Local<Value> on_message_v = connection->handle_->Get(ON_MESSAGE_SYMBOL); }
if (!on_message_v->IsFunction()) return -1;
Handle<Function> on_message = Handle<Function>::Cast(on_message_v);
TryCatch try_catch; int
Local<Object> message_handler = on_message->NewInstance(); HTTPConnection::on_message_complete (http_parser *parser)
if (try_catch.HasCaught()) { {
FatalException(try_catch); HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
return -1; connection->Emit("MessageComplete", 0, NULL);
} return 0;
}
connection->handle_->SetHiddenValue(MESSAGE_HANDLER_SYMBOL, message_handler); int
HTTPConnection::on_uri (http_parser *parser, const char *buf, size_t len)
{
HandleScope scope;
HTTPConnection *connection = static_cast<HTTPConnection*>(parser->data);
Local<Value> argv[1] = { String::New(buf, len) };
connection->Emit("URI", 1, argv);
return 0; return 0;
} }
#define DEFINE_PARSER_CALLBACK(name, symbol) \ int
int \ HTTPConnection::on_header_field (http_parser *parser, const char *buf, size_t len)
HTTPConnection::name (http_parser *parser, const char *buf, size_t len) \ {
{ \ HandleScope scope;
HandleScope scope; \ HTTPConnection *connection = static_cast<HTTPConnection*>(parser->data);
HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data); \ Local<Value> argv[1] = { String::New(buf, len) };
Local<Value> message_handler_v = \ connection->Emit("HeaderField", 1, argv);
connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL); \ return 0;
if (message_handler_v->IsObject() == false) \
return -1; \
Local<Object> message_handler = message_handler_v->ToObject(); \
Local<Value> callback_v = message_handler->Get(symbol); \
if (callback_v->IsFunction() == false) \
return 0; \
Local<Function> callback = Local<Function>::Cast(callback_v); \
TryCatch try_catch; \
Local<Value> argv[1] = { String::New(buf, len) }; \
Local<Value> ret = callback->Call(message_handler, 1, argv); \
if (ret.IsEmpty()) { \
FatalException(try_catch); \
return -2; \
} \
if (ret->IsFalse()) return -3; \
return 0; \
} }
DEFINE_PARSER_CALLBACK(on_uri, ON_URI_SYMBOL) int
DEFINE_PARSER_CALLBACK(on_header_field, ON_HEADER_FIELD_SYMBOL) HTTPConnection::on_header_value (http_parser *parser, const char *buf, size_t len)
DEFINE_PARSER_CALLBACK(on_header_value, ON_HEADER_VALUE_SYMBOL) {
HandleScope scope;
HTTPConnection *connection = static_cast<HTTPConnection*>(parser->data);
Local<Value> argv[1] = { String::New(buf, len) };
connection->Emit("HeaderValue", 1, argv);
return 0;
}
static inline Local<String> static inline Local<String>
GetMethod (int method) GetMethod (int method)
@ -162,17 +157,15 @@ HTTPConnection::on_headers_complete (http_parser *parser)
HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data); HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
HandleScope scope; HandleScope scope;
Local<Value> message_handler_v = Local<Object> message_info = Object::New();
connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL);
Local<Object> message_handler = message_handler_v->ToObject();
// METHOD // METHOD
if (connection->parser_.type == HTTP_REQUEST) if (connection->parser_.type == HTTP_REQUEST)
message_handler->Set(METHOD_SYMBOL, GetMethod(connection->parser_.method)); message_info->Set(METHOD_SYMBOL, GetMethod(connection->parser_.method));
// STATUS // STATUS
if (connection->parser_.type == HTTP_RESPONSE) if (connection->parser_.type == HTTP_RESPONSE)
message_handler->Set(STATUS_CODE_SYMBOL, message_info->Set(STATUS_CODE_SYMBOL,
Integer::New(connection->parser_.status_code)); Integer::New(connection->parser_.status_code));
// VERSION // VERSION
@ -183,25 +176,14 @@ HTTPConnection::on_headers_complete (http_parser *parser)
, connection->parser_.version_major , connection->parser_.version_major
, connection->parser_.version_minor , connection->parser_.version_minor
); );
message_handler->Set(HTTP_VERSION_SYMBOL, String::New(version)); message_info->Set(HTTP_VERSION_SYMBOL, String::New(version));
message_handler->Set(SHOULD_KEEP_ALIVE_SYMBOL, message_info->Set(SHOULD_KEEP_ALIVE_SYMBOL,
http_parser_should_keep_alive(&connection->parser_) ? True() : False()); http_parser_should_keep_alive(&connection->parser_) ? True() : False());
Local<Value> on_headers_complete_v = Local<Value> argv[1] = { message_info };
message_handler->Get(ON_HEADERS_COMPLETE_SYMBOL);
if (on_headers_complete_v->IsFunction() == false) return 0;
Handle<Function> on_headers_complete =
Handle<Function>::Cast(on_headers_complete_v);
TryCatch try_catch; connection->Emit("HeadersComplete", 1, argv);
Local<Value> ret = on_headers_complete->Call(message_handler, 0, NULL);
if (ret.IsEmpty()) {
FatalException(try_catch);
return -2;
}
if (ret->IsFalse()) return -3;
return 0; return 0;
} }
@ -214,17 +196,11 @@ HTTPConnection::on_body (http_parser *parser, const char *buf, size_t len)
HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data); HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
HandleScope scope; HandleScope scope;
Local<Value> message_handler_v =
connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL);
Local<Object> message_handler = message_handler_v->ToObject();
Local<Value> on_body_v = message_handler->Get(ON_BODY_SYMBOL);
if (on_body_v->IsFunction() == false) return 0;
Handle<Function> on_body = Handle<Function>::Cast(on_body_v);
Handle<Value> argv[1]; Handle<Value> argv[1];
// TODO each message should have their encoding. // TODO each message should have their encoding.
// don't look at the conneciton for encoding // don't look at the conneciton for encoding
if (connection->encoding_ == UTF8) { if (connection->encoding_ == UTF8) {
// utf8 encoding // utf8 encoding
Handle<String> chunk = String::New((const char*)buf, len); Handle<String> chunk = String::New((const char*)buf, len);
@ -240,40 +216,7 @@ HTTPConnection::on_body (http_parser *parser, const char *buf, size_t len)
argv[0] = array; argv[0] = array;
} }
TryCatch try_catch; connection->Emit("Body", 1, argv);
Local<Value> ret = on_body->Call(message_handler, 1, argv);
if (ret.IsEmpty()) {
FatalException(try_catch);
return -2;
}
if (ret->IsFalse()) return -3;
return 0;
}
int
HTTPConnection::on_message_complete (http_parser *parser)
{
HTTPConnection *connection = static_cast<HTTPConnection*> (parser->data);
HandleScope scope;
Local<Value> message_handler_v =
connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL);
connection->handle_->DeleteHiddenValue(MESSAGE_HANDLER_SYMBOL);
Local<Object> message_handler = message_handler_v->ToObject();
Local<Value> on_msg_complete_v = message_handler->Get(ON_MESSAGE_COMPLETE_SYMBOL);
if (on_msg_complete_v->IsFunction() == false) return 0;
Handle<Function> on_msg_complete = Handle<Function>::Cast(on_msg_complete_v);
TryCatch try_catch;
Local<Value> ret = on_msg_complete->Call(message_handler, 0, NULL);
if (ret.IsEmpty()) {
FatalException(try_catch);
return -2;
}
if (ret->IsFalse()) return -3;
return 0; return 0;
} }

88
src/http.js

@ -341,16 +341,34 @@ node.http.ServerResponse = function (connection) {
node.http.Client = node.http.LowLevelClient; // FIXME node.http.Client = node.http.LowLevelClient; // FIXME
node.http.Client.prototype.flush = function (request) {
//p(request);
if (this.readyState == "closed") {
this.reconnect();
return;
}
//node.debug("HTTP CLIENT flush. readyState = " + connection.readyState);
while ( request === this.requests[0]
&& request.output.length > 0
&& this.readyState == "open"
)
{
var out = request.output.shift();
this.send(out[0], out[1]);
}
};
node.http.createClient = function (port, host) { node.http.createClient = function (port, host) {
var client = new node.http.Client(); var client = new node.http.Client();
var requests = client.requests = [];
client.requests = [];
client.reconnect = function () { return client.connect(port, host) }; client.reconnect = function () { return client.connect(port, host) };
client.addListener("Connect", function () { client.addListener("Connect", function () {
//node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState); //node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState);
//node.debug("requests[0].uri = '" + requests[0].uri + "'"); //node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'");
requests[0].flush(); client.flush(client.requests[0]);
}); });
client.addListener("EOF", function () { client.addListener("EOF", function () {
@ -365,7 +383,7 @@ node.http.createClient = function (port, host) {
//node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState); //node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState);
// If there are more requests to handle, reconnect. // If there are more requests to handle, reconnect.
if (requests.length > 0) { if (client.requests.length > 0) {
//node.debug("HTTP CLIENT: reconnecting"); //node.debug("HTTP CLIENT: reconnecting");
client.connect(port, host); client.connect(port, host);
} }
@ -374,7 +392,7 @@ node.http.createClient = function (port, host) {
var req, res; var req, res;
client.addListener("MessageBegin", function () { client.addListener("MessageBegin", function () {
req = requests.shift(); req = client.requests.shift();
res = createClientResponse(client); res = createClientResponse(client);
}); });
@ -416,30 +434,37 @@ node.http.createClient = function (port, host) {
}; };
node.http.Client.prototype.get = function (uri, headers) { node.http.Client.prototype.get = function (uri, headers) {
return createClientRequest(this, "GET", uri, headers); var req = createClientRequest(this, "GET", uri, headers);
this.requests.push(req);
return req;
}; };
node.http.Client.prototype.head = function (uri, headers) { node.http.Client.prototype.head = function (uri, headers) {
return createClientRequest(this, "HEAD", uri, headers); var req = createClientRequest(this, "HEAD", uri, headers);
this.requests.push(req);
return req;
}; };
node.http.Client.prototype.post = function (uri, headers) { node.http.Client.prototype.post = function (uri, headers) {
return createClientRequest(this, "POST", uri, headers); var req = createClientRequest(this, "POST", uri, headers);
this.requests.push(req);
return req;
}; };
node.http.Client.prototype.del = function (uri, headers) { node.http.Client.prototype.del = function (uri, headers) {
return createClientRequest(this, "DELETE", uri, headers); var req = createClientRequest(this, "DELETE", uri, headers);
this.requests.push(req);
return req;
}; };
node.http.Client.prototype.put = function (uri, headers) { node.http.Client.prototype.put = function (uri, headers) {
return createClientRequest(this, "PUT", uri, headers); var req = createClientRequest(this, "PUT", uri, headers);
this.requests.push(req);
return req;
}; };
function createClientRequest (connection, method, uri, header_lines) { function createClientRequest (connection, method, uri, header_lines) {
var req = new node.EventEmitter; var req = new node.EventEmitter;
var requests = connection.requests;
requests.push(this);
req.uri = uri; req.uri = uri;
@ -476,8 +501,9 @@ function createClientRequest (connection, method, uri, header_lines) {
header += CRLF; header += CRLF;
var output = []; req.output = [];
send(output, header);
send(req.output, header);
req.sendBody = function (chunk, encoding) { req.sendBody = function (chunk, encoding) {
if (sent_content_length_header == false && chunked_encoding == false) { if (sent_content_length_header == false && chunked_encoding == false) {
@ -486,31 +512,15 @@ function createClientRequest (connection, method, uri, header_lines) {
} }
if (chunked_encoding) { if (chunked_encoding) {
send(output, chunk.length.toString(16)); send(req.output, chunk.length.toString(16));
send(output, CRLF); send(req.output, CRLF);
send(output, chunk, encoding); send(req.output, chunk, encoding);
send(output, CRLF); send(req.output, CRLF);
} else { } else {
send(output, chunk, encoding); send(req.output, chunk, encoding);
} }
req.flush(); connection.flush(req);
};
req.flush = function ( ) {
if (connection.readyState == "closed") {
connection.reconnect();
return;
}
//node.debug("HTTP CLIENT flush. readyState = " + connection.readyState);
while ( req === requests[0]
&& output.length > 0
&& connection.readyState == "open"
)
{
var out = output.shift();
connection.send(out[0], out[1]);
}
}; };
req.finished = false; req.finished = false;
@ -519,9 +529,9 @@ function createClientRequest (connection, method, uri, header_lines) {
req.addListener("Response", responseListener); req.addListener("Response", responseListener);
if (chunked_encoding) if (chunked_encoding)
send(output, "0\r\n\r\n"); // last chunk send(req.output, "0\r\n\r\n"); // last chunk
req.flush(); connection.flush(req);
}; };
return req; return req;

1
src/node.cc

@ -237,7 +237,6 @@ OnFatalError (const char* location, const char* message)
exit(1); exit(1);
} }
void void
node::FatalException (TryCatch &try_catch) node::FatalException (TryCatch &try_catch)
{ {

2
test/mjsunit/test-http-client-race.js

@ -15,7 +15,7 @@ var server = node.http.createServer(function (req, res) {
}); });
server.listen(PORT); server.listen(PORT);
var client = new node.http.Client(PORT); var client = node.http.createClient(PORT);
var body1 = ""; var body1 = "";
var body2 = ""; var body2 = "";

4
test/mjsunit/test-http-proxy.js

@ -12,7 +12,7 @@ var backend = node.http.createServer(function (req, res) {
// node.debug("listen backend") // node.debug("listen backend")
backend.listen(BACKEND_PORT); backend.listen(BACKEND_PORT);
var proxy_client = new node.http.Client(BACKEND_PORT); var proxy_client = node.http.createClient(BACKEND_PORT);
var proxy = node.http.createServer(function (req, res) { var proxy = node.http.createServer(function (req, res) {
// node.debug("proxy req"); // node.debug("proxy req");
var proxy_req = proxy_client.get(req.uri.path); var proxy_req = proxy_client.get(req.uri.path);
@ -33,7 +33,7 @@ proxy.listen(PROXY_PORT);
var body = ""; var body = "";
function onLoad () { function onLoad () {
var client = new node.http.Client(PROXY_PORT); var client = node.http.createClient(PROXY_PORT);
var req = client.get("/test"); var req = client.get("/test");
// node.debug("client req") // node.debug("client req")
req.finish(function (res) { req.finish(function (res) {

2
test/mjsunit/test-http.js

@ -29,7 +29,7 @@ function onLoad () {
//assertEquals("127.0.0.1", res.connection.remoteAddress); //assertEquals("127.0.0.1", res.connection.remoteAddress);
}).listen(PORT); }).listen(PORT);
var client = new node.http.Client(PORT); var client = node.http.createClient(PORT);
var req = client.get("/hello"); var req = client.get("/hello");
req.finish(function (res) { req.finish(function (res) {
assertEquals(200, res.statusCode); assertEquals(200, res.statusCode);

Loading…
Cancel
Save