From 70fe920fb5fb3a50d75522ea4e5a558edcbd6ad9 Mon Sep 17 00:00:00 2001 From: Ryan Date: Fri, 26 Jun 2009 18:30:55 +0200 Subject: [PATCH] Use events for all HTTP messages. This is a rather large refactor! Mostly for the better side. I've had to remove some functionality like req.interrupt(). A lot of other work is left messy or incomplete. --- benchmark/http_simple.js | 2 +- src/events.js | 8 +- src/http.cc | 145 ++++++++------------------ src/http.js | 88 +++++++++------- src/node.cc | 1 - test/mjsunit/test-http-client-race.js | 2 +- test/mjsunit/test-http-proxy.js | 4 +- test/mjsunit/test-http.js | 2 +- 8 files changed, 103 insertions(+), 149 deletions(-) diff --git a/benchmark/http_simple.js b/benchmark/http_simple.js index 2e3bbb3cad..099a641862 100644 --- a/benchmark/http_simple.js +++ b/benchmark/http_simple.js @@ -3,7 +3,7 @@ for (var i = 0; i < 20*1024; i++) { fixed += "C"; } stored = {}; -new node.http.Server(function (req, res) { +node.http.createServer(function (req, res) { var commands = req.uri.path.split("/"); var command = commands[1]; var body = ""; diff --git a/src/events.js b/src/events.js index f665cec977..f63641c23a 100644 --- a/src/events.js +++ b/src/events.js @@ -4,9 +4,11 @@ var emitter = node.EventEmitter.prototype; emitter.addListener = function (type, listener) { - if (!this._events) this._events = {}; - if (!this._events.hasOwnProperty(type)) this._events[type] = []; - this._events[type].push(listener); + if (listener instanceof Function) { + if (!this._events) this._events = {}; + if (!this._events.hasOwnProperty(type)) this._events[type] = []; + this._events[type].push(listener); + } }; emitter.listeners = function (type, listener) { diff --git a/src/http.cc b/src/http.cc index 5e43ebff58..a6d8efc237 100644 --- a/src/http.cc +++ b/src/http.cc @@ -87,52 +87,47 @@ int HTTPConnection::on_message_begin (http_parser *parser) { HTTPConnection *connection = static_cast (parser->data); - HandleScope scope; - - Local on_message_v = connection->handle_->Get(ON_MESSAGE_SYMBOL); - if (!on_message_v->IsFunction()) return -1; - Handle on_message = Handle::Cast(on_message_v); + connection->Emit("MessageBegin", 0, NULL); + return 0; +} - TryCatch try_catch; - Local message_handler = on_message->NewInstance(); - if (try_catch.HasCaught()) { - FatalException(try_catch); - return -1; - } +int +HTTPConnection::on_message_complete (http_parser *parser) +{ + HTTPConnection *connection = static_cast (parser->data); + connection->Emit("MessageComplete", 0, NULL); + return 0; +} - connection->handle_->SetHiddenValue(MESSAGE_HANDLER_SYMBOL, message_handler); +int +HTTPConnection::on_uri (http_parser *parser, const char *buf, size_t len) +{ + HandleScope scope; + HTTPConnection *connection = static_cast(parser->data); + Local argv[1] = { String::New(buf, len) }; + connection->Emit("URI", 1, argv); return 0; } -#define DEFINE_PARSER_CALLBACK(name, symbol) \ -int \ -HTTPConnection::name (http_parser *parser, const char *buf, size_t len) \ -{ \ - HandleScope scope; \ - HTTPConnection *connection = static_cast (parser->data); \ - Local message_handler_v = \ - connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL); \ - if (message_handler_v->IsObject() == false) \ - return -1; \ - Local message_handler = message_handler_v->ToObject(); \ - Local callback_v = message_handler->Get(symbol); \ - if (callback_v->IsFunction() == false) \ - return 0; \ - Local callback = Local::Cast(callback_v); \ - TryCatch try_catch; \ - Local argv[1] = { String::New(buf, len) }; \ - Local ret = callback->Call(message_handler, 1, argv); \ - if (ret.IsEmpty()) { \ - FatalException(try_catch); \ - return -2; \ - } \ - if (ret->IsFalse()) return -3; \ - return 0; \ +int +HTTPConnection::on_header_field (http_parser *parser, const char *buf, size_t len) +{ + HandleScope scope; + HTTPConnection *connection = static_cast(parser->data); + Local argv[1] = { String::New(buf, len) }; + connection->Emit("HeaderField", 1, argv); + return 0; } -DEFINE_PARSER_CALLBACK(on_uri, ON_URI_SYMBOL) -DEFINE_PARSER_CALLBACK(on_header_field, ON_HEADER_FIELD_SYMBOL) -DEFINE_PARSER_CALLBACK(on_header_value, ON_HEADER_VALUE_SYMBOL) +int +HTTPConnection::on_header_value (http_parser *parser, const char *buf, size_t len) +{ + HandleScope scope; + HTTPConnection *connection = static_cast(parser->data); + Local argv[1] = { String::New(buf, len) }; + connection->Emit("HeaderValue", 1, argv); + return 0; +} static inline Local GetMethod (int method) @@ -162,17 +157,15 @@ HTTPConnection::on_headers_complete (http_parser *parser) HTTPConnection *connection = static_cast (parser->data); HandleScope scope; - Local message_handler_v = - connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL); - Local message_handler = message_handler_v->ToObject(); + Local message_info = Object::New(); // METHOD if (connection->parser_.type == HTTP_REQUEST) - message_handler->Set(METHOD_SYMBOL, GetMethod(connection->parser_.method)); + message_info->Set(METHOD_SYMBOL, GetMethod(connection->parser_.method)); // STATUS if (connection->parser_.type == HTTP_RESPONSE) - message_handler->Set(STATUS_CODE_SYMBOL, + message_info->Set(STATUS_CODE_SYMBOL, Integer::New(connection->parser_.status_code)); // VERSION @@ -183,25 +176,14 @@ HTTPConnection::on_headers_complete (http_parser *parser) , connection->parser_.version_major , connection->parser_.version_minor ); - message_handler->Set(HTTP_VERSION_SYMBOL, String::New(version)); + message_info->Set(HTTP_VERSION_SYMBOL, String::New(version)); - message_handler->Set(SHOULD_KEEP_ALIVE_SYMBOL, + message_info->Set(SHOULD_KEEP_ALIVE_SYMBOL, http_parser_should_keep_alive(&connection->parser_) ? True() : False()); - Local on_headers_complete_v = - message_handler->Get(ON_HEADERS_COMPLETE_SYMBOL); - if (on_headers_complete_v->IsFunction() == false) return 0; - - Handle on_headers_complete = - Handle::Cast(on_headers_complete_v); + Local argv[1] = { message_info }; - TryCatch try_catch; - Local ret = on_headers_complete->Call(message_handler, 0, NULL); - if (ret.IsEmpty()) { - FatalException(try_catch); - return -2; - } - if (ret->IsFalse()) return -3; + connection->Emit("HeadersComplete", 1, argv); return 0; } @@ -214,17 +196,11 @@ HTTPConnection::on_body (http_parser *parser, const char *buf, size_t len) HTTPConnection *connection = static_cast (parser->data); HandleScope scope; - Local message_handler_v = - connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL); - Local message_handler = message_handler_v->ToObject(); - - Local on_body_v = message_handler->Get(ON_BODY_SYMBOL); - if (on_body_v->IsFunction() == false) return 0; - Handle on_body = Handle::Cast(on_body_v); - Handle argv[1]; + // TODO each message should have their encoding. // don't look at the conneciton for encoding + if (connection->encoding_ == UTF8) { // utf8 encoding Handle chunk = String::New((const char*)buf, len); @@ -240,40 +216,7 @@ HTTPConnection::on_body (http_parser *parser, const char *buf, size_t len) argv[0] = array; } - TryCatch try_catch; - Local ret = on_body->Call(message_handler, 1, argv); - if (ret.IsEmpty()) { - FatalException(try_catch); - return -2; - } - if (ret->IsFalse()) return -3; - - return 0; -} - -int -HTTPConnection::on_message_complete (http_parser *parser) -{ - HTTPConnection *connection = static_cast (parser->data); - HandleScope scope; - - Local message_handler_v = - connection->handle_->GetHiddenValue(MESSAGE_HANDLER_SYMBOL); - connection->handle_->DeleteHiddenValue(MESSAGE_HANDLER_SYMBOL); - - Local message_handler = message_handler_v->ToObject(); - - Local on_msg_complete_v = message_handler->Get(ON_MESSAGE_COMPLETE_SYMBOL); - if (on_msg_complete_v->IsFunction() == false) return 0; - Handle on_msg_complete = Handle::Cast(on_msg_complete_v); - - TryCatch try_catch; - Local ret = on_msg_complete->Call(message_handler, 0, NULL); - if (ret.IsEmpty()) { - FatalException(try_catch); - return -2; - } - if (ret->IsFalse()) return -3; + connection->Emit("Body", 1, argv); return 0; } diff --git a/src/http.js b/src/http.js index 848265a8d6..a9c8ff6914 100644 --- a/src/http.js +++ b/src/http.js @@ -341,16 +341,34 @@ node.http.ServerResponse = function (connection) { node.http.Client = node.http.LowLevelClient; // FIXME +node.http.Client.prototype.flush = function (request) { + //p(request); + if (this.readyState == "closed") { + this.reconnect(); + return; + } + //node.debug("HTTP CLIENT flush. readyState = " + connection.readyState); + while ( request === this.requests[0] + && request.output.length > 0 + && this.readyState == "open" + ) + { + var out = request.output.shift(); + this.send(out[0], out[1]); + } +}; + node.http.createClient = function (port, host) { var client = new node.http.Client(); - var requests = client.requests = []; + + client.requests = []; client.reconnect = function () { return client.connect(port, host) }; client.addListener("Connect", function () { //node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState); - //node.debug("requests[0].uri = '" + requests[0].uri + "'"); - requests[0].flush(); + //node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'"); + client.flush(client.requests[0]); }); client.addListener("EOF", function () { @@ -365,7 +383,7 @@ node.http.createClient = function (port, host) { //node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState); // If there are more requests to handle, reconnect. - if (requests.length > 0) { + if (client.requests.length > 0) { //node.debug("HTTP CLIENT: reconnecting"); client.connect(port, host); } @@ -374,7 +392,7 @@ node.http.createClient = function (port, host) { var req, res; client.addListener("MessageBegin", function () { - req = requests.shift(); + req = client.requests.shift(); res = createClientResponse(client); }); @@ -416,30 +434,37 @@ node.http.createClient = function (port, host) { }; node.http.Client.prototype.get = function (uri, headers) { - return createClientRequest(this, "GET", uri, headers); + var req = createClientRequest(this, "GET", uri, headers); + this.requests.push(req); + return req; }; node.http.Client.prototype.head = function (uri, headers) { - return createClientRequest(this, "HEAD", uri, headers); + var req = createClientRequest(this, "HEAD", uri, headers); + this.requests.push(req); + return req; }; node.http.Client.prototype.post = function (uri, headers) { - return createClientRequest(this, "POST", uri, headers); + var req = createClientRequest(this, "POST", uri, headers); + this.requests.push(req); + return req; }; node.http.Client.prototype.del = function (uri, headers) { - return createClientRequest(this, "DELETE", uri, headers); + var req = createClientRequest(this, "DELETE", uri, headers); + this.requests.push(req); + return req; }; node.http.Client.prototype.put = function (uri, headers) { - return createClientRequest(this, "PUT", uri, headers); + var req = createClientRequest(this, "PUT", uri, headers); + this.requests.push(req); + return req; }; function createClientRequest (connection, method, uri, header_lines) { var req = new node.EventEmitter; - var requests = connection.requests; - - requests.push(this); req.uri = uri; @@ -476,8 +501,9 @@ function createClientRequest (connection, method, uri, header_lines) { header += CRLF; - var output = []; - send(output, header); + req.output = []; + + send(req.output, header); req.sendBody = function (chunk, encoding) { if (sent_content_length_header == false && chunked_encoding == false) { @@ -486,31 +512,15 @@ function createClientRequest (connection, method, uri, header_lines) { } if (chunked_encoding) { - send(output, chunk.length.toString(16)); - send(output, CRLF); - send(output, chunk, encoding); - send(output, CRLF); + send(req.output, chunk.length.toString(16)); + send(req.output, CRLF); + send(req.output, chunk, encoding); + send(req.output, CRLF); } else { - send(output, chunk, encoding); + send(req.output, chunk, encoding); } - req.flush(); - }; - - req.flush = function ( ) { - if (connection.readyState == "closed") { - connection.reconnect(); - return; - } - //node.debug("HTTP CLIENT flush. readyState = " + connection.readyState); - while ( req === requests[0] - && output.length > 0 - && connection.readyState == "open" - ) - { - var out = output.shift(); - connection.send(out[0], out[1]); - } + connection.flush(req); }; req.finished = false; @@ -519,9 +529,9 @@ function createClientRequest (connection, method, uri, header_lines) { req.addListener("Response", responseListener); if (chunked_encoding) - send(output, "0\r\n\r\n"); // last chunk + send(req.output, "0\r\n\r\n"); // last chunk - req.flush(); + connection.flush(req); }; return req; diff --git a/src/node.cc b/src/node.cc index c1375d4ca9..405a80e3d1 100644 --- a/src/node.cc +++ b/src/node.cc @@ -237,7 +237,6 @@ OnFatalError (const char* location, const char* message) exit(1); } - void node::FatalException (TryCatch &try_catch) { diff --git a/test/mjsunit/test-http-client-race.js b/test/mjsunit/test-http-client-race.js index 379cce46b1..875f53f993 100644 --- a/test/mjsunit/test-http-client-race.js +++ b/test/mjsunit/test-http-client-race.js @@ -15,7 +15,7 @@ var server = node.http.createServer(function (req, res) { }); server.listen(PORT); -var client = new node.http.Client(PORT); +var client = node.http.createClient(PORT); var body1 = ""; var body2 = ""; diff --git a/test/mjsunit/test-http-proxy.js b/test/mjsunit/test-http-proxy.js index 620f601da2..5a3e88f7a7 100644 --- a/test/mjsunit/test-http-proxy.js +++ b/test/mjsunit/test-http-proxy.js @@ -12,7 +12,7 @@ var backend = node.http.createServer(function (req, res) { // node.debug("listen backend") backend.listen(BACKEND_PORT); -var proxy_client = new node.http.Client(BACKEND_PORT); +var proxy_client = node.http.createClient(BACKEND_PORT); var proxy = node.http.createServer(function (req, res) { // node.debug("proxy req"); var proxy_req = proxy_client.get(req.uri.path); @@ -33,7 +33,7 @@ proxy.listen(PROXY_PORT); var body = ""; function onLoad () { - var client = new node.http.Client(PROXY_PORT); + var client = node.http.createClient(PROXY_PORT); var req = client.get("/test"); // node.debug("client req") req.finish(function (res) { diff --git a/test/mjsunit/test-http.js b/test/mjsunit/test-http.js index f90b73a115..2dea8aaa58 100644 --- a/test/mjsunit/test-http.js +++ b/test/mjsunit/test-http.js @@ -29,7 +29,7 @@ function onLoad () { //assertEquals("127.0.0.1", res.connection.remoteAddress); }).listen(PORT); - var client = new node.http.Client(PORT); + var client = node.http.createClient(PORT); var req = client.get("/hello"); req.finish(function (res) { assertEquals(200, res.statusCode);