Browse Source

large http.js refactor

v0.7.4-release
Ryan 16 years ago
parent
commit
ef09b2c65d
  1. 538
      src/http.js
  2. 6
      src/net.h

538
src/http.js

@ -139,27 +139,6 @@ function toRaw(string) {
return a; return a;
} }
function send (output, data, encoding) {
if (data.constructor === String)
encoding = encoding || "ascii";
else
encoding = "raw";
output.push([data, encoding]);
}
/* This is a wrapper around the LowLevelServer interface. It provides
* connection handling, overflow checking, and some data buffering.
*/
node.http.createServer = function (requestListener, options) {
var server = new node.http.LowLevelServer();
server.addListener("connection", connectionListener);
server.addListener("request", requestListener);
//server.setOptions(options);
return server;
};
/* Abstract base class for ServerRequest and ClientResponse. */ /* Abstract base class for ServerRequest and ClientResponse. */
var IncomingMessage = function (connection) { var IncomingMessage = function (connection) {
@ -168,7 +147,14 @@ var IncomingMessage = function (connection) {
this.connection = connection; this.connection = connection;
this.httpVersion = null; this.httpVersion = null;
this.headers = []; this.headers = [];
this.last_was_value = false; // TODO: remove me.
// request (server) only
this.uri = "";
this.method = null;
// response (client) only
this.statusCode = null;
this.client = this.connection;
}; };
inherits(IncomingMessage, node.EventEmitter); inherits(IncomingMessage, node.EventEmitter);
@ -186,215 +172,301 @@ IncomingMessage.prototype._emitComplete = function () {
}; };
var ServerRequest = function (connection) { var OutgoingMessage = function () {
IncomingMessage.call(this, connection); node.EventEmitter.call(this);
this.uri = ""; this.output = [];
this.method = null;
};
inherits(ServerRequest, IncomingMessage);
this.sent_connection_header = false;
this.sent_content_length_header = false;
this.sent_transfer_encoding_header = false;
var ClientResponse = function (connection) { this.closeOnFinish = false;
IncomingMessage.call(this, connection); this.chunked_encoding = false;
this.should_keep_alive = true;
this.use_chunked_encoding_by_default = true;
this.statusCode = null; this.finished = false;
this.client = this.connection;
}; };
inherits(ClientResponse, IncomingMessage); inherits(OutgoingMessage, node.EventEmitter);
OutgoingMessage.prototype.send = function (data, encoding) {
if (data.constructor === String) {
encoding = encoding || "ascii";
} else {
encoding = "raw";
}
this.output.push([data, encoding]);
};
OutgoingMessage.prototype.sendHeaderLines = function (first_line, header_lines) {
header_lines = header_lines || [];
// first_line in the case of request is: "GET /index.html HTTP/1.1\r\n"
// in the case of response it is: "HTTP/1.1 200 OK\r\n"
var header = first_line;
for (var i = 0; i < header_lines.length; i++) {
var field = header_lines[i][0];
var value = header_lines[i][1];
function connectionListener (connection) { header += field + ": " + value + CRLF;
// An array of responses for each connection. In pipelined connections
// we need to keep track of the order they were sent. if (connection_expression.exec(field)) {
connection.responses = []; this.sent_connection_header = true;
if (close_expression.exec(value)) this.closeOnFinish = true;
// is this really needed? } else if (transfer_encoding_expression.exec(field)) {
connection.addListener("eof", function () { this.sent_transfer_encoding_header = true;
if (connection.responses.length == 0) { if (chunk_expression.exec(value)) this.chunked_encoding = true;
connection.close();
} else if (content_length_expression.exec(field)) {
this.sent_content_length_header = true;
}
}
// keep-alive logic
if (this.sent_connection_header == false) {
if (this.should_keep_alive) {
header += "Connection: keep-alive\r\n";
} else { } else {
connection.responses[connection.responses.length-1].closeOnFinish = true; this.closeOnFinish = true;
header += "Connection: close\r\n";
} }
}); }
if (this.sent_content_length_header == false && this.sent_transfer_encoding_header == false) {
if (this.use_chunked_encoding_by_default) {
header += "Transfer-Encoding: chunked\r\n";
this.chunked_encoding = true;
}
}
header += CRLF;
this.send(header);
};
OutgoingMessage.prototype.sendBody = function (chunk, encoding) {
/*
if (this.sent_content_length_header == false && this.chunked_encoding == false) {
throw "Content-Length header (or Transfer-Encoding:chunked) not set";
}
*/
if (this.chunked_encoding) {
this.send(chunk.length.toString(16));
this.send(CRLF);
this.send(chunk, encoding);
this.send(CRLF);
} else {
this.send(chunk, encoding);
}
var req, res; this.flush();
};
OutgoingMessage.prototype.flush = function () {
this.emit("flush");
};
OutgoingMessage.prototype.finish = function () {
if (this.chunked_encoding) this.send("0\r\n\r\n"); // last chunk
this.finished = true;
this.flush();
};
var ServerResponse = function () {
OutgoingMessage.call(this);
this.should_keep_alive = true;
this.use_chunked_encoding_by_default = true;
};
inherits(ServerResponse, OutgoingMessage);
ServerResponse.prototype.sendHeader = function (statusCode, headers) {
var reason = node.http.STATUS_CODES[statusCode] || "unknown";
var status_line = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF;
this.sendHeaderLines(status_line, headers);
};
var ClientRequest = function (method, uri, header_lines) {
OutgoingMessage.call(this);
this.should_keep_alive = false;
this.use_chunked_encoding_by_default = false;
this.closeOnFinish = true;
this.sendHeaderLines(method + " " + uri + " HTTP/1.1\r\n", header_lines);
};
inherits(ClientRequest, OutgoingMessage);
ClientRequest.prototype.finish = function (responseListener) {
this.addListener("response", responseListener);
OutgoingMessage.prototype.finish.call(this);
};
function createIncomingMessageStream (connection, incoming_listener) {
var stream = new node.EventEmitter();
stream.addListener("incoming", incoming_listener);
var incoming;
var last_header_was_a_value = false;
connection.addListener("message_begin", function () { connection.addListener("message_begin", function () {
req = new ServerRequest(connection); incoming = new IncomingMessage(connection);
res = new node.http.ServerResponse(connection);
}); });
// Only servers will get URI events.
connection.addListener("uri", function (data) { connection.addListener("uri", function (data) {
req.uri += data; incoming.uri += data;
}); });
connection.addListener("header_field", function (data) { connection.addListener("header_field", function (data) {
if (req.headers.length > 0 && req.last_was_value == false) if (incoming.headers.length > 0 && last_header_was_a_value == false) {
req.headers[req.headers.length-1][0] += data; incoming.headers[incoming.headers.length-1][0] += data;
else } else {
req.headers.push([data]); incoming.headers.push([data]);
req.last_was_value = false; }
last_header_was_a_value = false;
}); });
connection.addListener("header_value", function (data) { connection.addListener("header_value", function (data) {
var last_pair = req.headers[req.headers.length-1]; var last_pair = incoming.headers[incoming.headers.length-1];
if (last_pair.length == 1) if (last_pair.length == 1) {
last_pair[1] = data; last_pair[1] = data;
else } else {
last_pair[1] += data; last_pair[1] += data;
req.last_was_value = true; }
last_header_was_a_value = true;
}); });
connection.addListener("headers_complete", function (info) { connection.addListener("headers_complete", function (info) {
req.httpVersion = info.httpVersion; incoming.httpVersion = info.httpVersion;
req.method = info.method;
req.uri = node.http.parseUri(req.uri); // TODO parse the URI lazily?
res.should_keep_alive = info.should_keep_alive; if (info.method) {
// server only
incoming.method = info.method;
incoming.uri = node.http.parseUri(incoming.uri); // TODO parse the URI lazily?
} else {
// client only
incoming.statusCode = info.statusCode;
}
connection.server.emit("request", [req, res]); stream.emit("incoming", [incoming, info.should_keep_alive]);
}); });
connection.addListener("body", function (chunk) { connection.addListener("body", function (chunk) {
req._emitBody(chunk); incoming.emit("body", [chunk]);
}); });
connection.addListener("message_complete", function () { connection.addListener("message_complete", function () {
req._emitComplete() incoming.emit("complete");
}); });
return stream;
} }
node.http.ServerResponse = function (connection) { /* Returns true if the message queue is finished and the connection
var responses = connection.responses; * should be closed. */
responses.push(this); function flushMessageQueue (connection, queue) {
this.connection = connection; if (connection.readyState === "closed" || connection.readyState === "readOnly") {
this.closeOnFinish = false; return false;
var output = []; }
var chunked_encoding = false;
this.sendHeader = function (statusCode, headers) {
var sent_connection_header = false;
var sent_transfer_encoding_header = false;
var sent_content_length_header = false;
var reason = node.http.STATUS_CODES[statusCode] || "unknown";
var header = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF;
for (var i = 0; i < headers.length; i++) {
var field = headers[i][0];
var value = headers[i][1];
header += field + ": " + value + CRLF;
if (connection_expression.exec(field)) {
sent_connection_header = true;
if (close_expression.exec(value)) this.closeOnFinish = true;
} else if (transfer_encoding_expression.exec(field)) {
sent_transfer_encoding_header = true;
if (chunk_expression.exec(value)) chunked_encoding = true;
} else if (content_length_expression.exec(field)) {
sent_content_length_header = true;
}
}
// keep-alive logic while (queue[0]) {
if (sent_connection_header == false) { var message = queue[0];
if (this.should_keep_alive) {
header += "Connection: keep-alive\r\n";
} else {
this.closeOnFinish = true;
header += "Connection: close\r\n";
}
}
if ( sent_content_length_header == false && sent_transfer_encoding_header == false ) { while (message.output.length > 0) {
header += "Transfer-Encoding: chunked\r\n"; var out = message.output.shift();
chunked_encoding = true; connection.send(out[0], out[1]);
} }
if (!message.finished) break;
header += CRLF; message.emit("sent");
queue.shift();
send(output, header); if (message.closeOnFinish) return true;
}; }
return false;
}
this.sendBody = function (chunk, encoding) { /* This is a wrapper around the LowLevelServer interface. It provides
if (chunked_encoding) { * connection handling, overflow checking, and some data buffering.
send(output, chunk.length.toString(16)); */
send(output, CRLF); node.http.createServer = function (requestListener, options) {
send(output, chunk, encoding); var server = new node.http.LowLevelServer();
send(output, CRLF); //server.setOptions(options);
} else { server.addListener("request", requestListener);
send(output, chunk, encoding); server.addListener("connection", connectionListener);
} return server;
};
this.flush(); function connectionListener (connection) {
}; // An array of responses for each connection. In pipelined connections
// we need to keep track of the order they were sent.
var responses = [];
this.flush = function () { // is this really needed?
if (connection.readyState === "closed" || connection.readyState === "readOnly") connection.addListener("eof", function () {
{ if (responses.length == 0) {
responses = []; connection.close();
return; } else {
} responses[responses.length-1].closeOnFinish = true;
if (responses.length > 0 && responses[0] === this) {
while (output.length > 0) {
var out = output.shift();
connection.send(out[0], out[1]);
}
} }
}; });
this.finished = false;
this.finish = function () {
if (chunked_encoding) send(output, "0\r\n\r\n"); // last chunk
this.finished = true;
while (responses.length > 0 && responses[0].finished) { var flushResponse = function () {
var res = responses[0]; if(flushMessageQueue(connection, responses)) {
res.flush(); connection.fullClose();
if (res.closeOnFinish)
connection.fullClose();
responses.shift();
} }
}; };
};
createIncomingMessageStream(connection, function (incoming, should_keep_alive) {
var req = incoming;
node.http.Client = node.http.LowLevelClient; // FIXME var res = new ServerResponse(connection);
res.should_keep_alive = should_keep_alive;
res.addListener("flush", flushResponse);
responses.push(res);
connection.server.emit("request", [req, res]);
});
}
node.http.Client.prototype.flush = function (request) { node.http.Client = node.http.LowLevelClient; // FIXME
//p(request);
if (this.readyState == "closed") {
this.reconnect();
return;
}
//node.debug("HTTP CLIENT flush. readyState = " + connection.readyState);
while ( request === this.requests[0] && request.output.length > 0 && this.readyState == "open" ) {
var out = request.output.shift();
this.send(out[0], out[1]);
}
};
node.http.createClient = function (port, host) { node.http.createClient = function (port, host) {
var client = new node.http.Client(); var client = new node.http.Client();
client.requests = []; var requests = [];
client.reconnect = function () { return client.connect(port, host); }; client._pushRequest = function (req) {
req.addListener("flush", function () {
if (client.readyState == "closed") {
//node.debug("HTTP CLIENT request flush. reconnect. readyState = " + client.readyState);
client.connect(port, host); // reconnect
return;
}
if (req == requests[0]) flushMessageQueue(client, [req]);
});
requests.push(req);
};
client.addListener("connect", function () { client.addListener("connect", function () {
//node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState); //node.debug("HTTP CLIENT onConnect. readyState = " + client.readyState);
//node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'"); //node.debug("client.requests[0].uri = '" + client.requests[0].uri + "'");
client.flush(client.requests[0]); requests[0].flush();
}); });
client.addListener("eof", function () { client.addListener("eof", function () {
//node.debug("client got eof closing. readyState = " + client.readyState);
client.close(); client.close();
}); });
@ -405,159 +477,61 @@ node.http.createClient = function (port, host) {
} }
//node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState); //node.debug("HTTP CLIENT onDisconnect. readyState = " + client.readyState);
// If there are more requests to handle, reconnect. // If there are more requests to handle, reconnect.
if (client.requests.length > 0) { if (requests.length > 0 && client.readyState != "opening") {
//node.debug("HTTP CLIENT: reconnecting"); //node.debug("HTTP CLIENT: reconnecting readyState = " + client.readyState);
client.connect(port, host); client.connect(port, host); // reconnect
} }
}); });
var req, res; createIncomingMessageStream(client, function (res) {
//node.debug("incoming response!");
client.addListener("message_begin", function () {
req = client.requests.shift();
res = new ClientResponse(client);
});
client.addListener("header_field", function (data) { res.addListener("complete", function ( ) {
if (res.headers.length > 0 && res.last_was_value == false) //node.debug("request complete disconnecting. readyState = " + client.readyState);
res.headers[res.headers.length-1][0] += data; client.close();
else });
res.headers.push([data]);
res.last_was_value = false;
});
client.addListener("header_value", function (data) {
var last_pair = res.headers[res.headers.length-1];
if (last_pair.length == 1) {
last_pair[1] = data;
} else {
last_pair[1] += data;
}
res.last_was_value = true;
});
client.addListener("headers_complete", function (info) {
res.statusCode = info.statusCode;
res.httpVersion = info.httpVersion;
var req = requests.shift();
req.emit("response", [res]); req.emit("response", [res]);
}); });
client.addListener("body", function (chunk) {
res._emitBody(chunk);
});
client.addListener("message_complete", function () {
client.close();
res._emitComplete();
});
return client; return client;
}; };
node.http.Client.prototype.get = function (uri, headers) { node.http.Client.prototype.get = function (uri, headers) {
var req = createClientRequest(this, "GET", uri, headers); var req = new ClientRequest("GET", uri, headers);
this.requests.push(req); this._pushRequest(req);
return req; return req;
}; };
node.http.Client.prototype.head = function (uri, headers) { node.http.Client.prototype.head = function (uri, headers) {
var req = createClientRequest(this, "HEAD", uri, headers); var req = new ClientRequest("HEAD", uri, headers);
this.requests.push(req); this._pushRequest(req);
return req; return req;
}; };
node.http.Client.prototype.post = function (uri, headers) { node.http.Client.prototype.post = function (uri, headers) {
var req = createClientRequest(this, "POST", uri, headers); var req = new ClientRequest("POST", uri, headers);
this.requests.push(req); this._pushRequest(req);
return req; return req;
}; };
node.http.Client.prototype.del = function (uri, headers) { node.http.Client.prototype.del = function (uri, headers) {
var req = createClientRequest(this, "DELETE", uri, headers); var req = new ClientRequest("DELETE", uri, headers);
this.requests.push(req); this._pushRequest(req);
return req; return req;
}; };
node.http.Client.prototype.put = function (uri, headers) { node.http.Client.prototype.put = function (uri, headers) {
var req = createClientRequest(this, "PUT", uri, headers); var req = new ClientRequest("PUT", uri, headers);
this.requests.push(req); this._pushRequest(req);
return req; return req;
}; };
function createClientRequest (connection, method, uri, header_lines) {
var req = new node.EventEmitter;
req.uri = uri;
var chunked_encoding = false;
req.closeOnFinish = false;
var sent_connection_header = false;
var sent_transfer_encoding_header = false;
var sent_content_length_header = false;
var header = method + " " + uri + " HTTP/1.1\r\n";
header_lines = header_lines || [];
for (var i = 0; i < header_lines.length; i++) {
var field = header_lines[i][0];
var value = header_lines[i][1];
header += field + ": " + value + CRLF;
if (connection_expression.exec(field)) {
sent_connection_header = true;
if (close_expression.exec(value)) req.closeOnFinish = true;
} else if (transfer_encoding_expression.exec(field)) {
sent_transfer_encoding_header = true;
if (chunk_expression.exec(value)) chunked_encoding = true;
} else if (content_length_expression.exec(field)) {
sent_content_length_header = true;
}
}
if (sent_connection_header == false) {
header += "Connection: keep-alive\r\n";
}
header += CRLF;
req.output = [];
send(req.output, header);
req.sendBody = function (chunk, encoding) {
if (sent_content_length_header == false && chunked_encoding == false) {
throw "Content-Length header (or Transfer-Encoding:chunked) not set";
}
if (chunked_encoding) {
send(req.output, chunk.length.toString(16));
send(req.output, CRLF);
send(req.output, chunk, encoding);
send(req.output, CRLF);
} else {
send(req.output, chunk, encoding);
}
connection.flush(req);
};
req.finished = false;
req.finish = function (responseListener) {
req.addListener("response", responseListener);
if (chunked_encoding)
send(req.output, "0\r\n\r\n"); // last chunk
connection.flush(req);
};
return req;
}
node.http.cat = function (url, encoding) { node.http.cat = function (url, encoding) {
var promise = new node.Promise(); var promise = new node.Promise();

6
src/net.h

@ -88,6 +88,11 @@ private:
static void on_close (evnet_socket *s) { static void on_close (evnet_socket *s) {
Connection *connection = static_cast<Connection*> (s->data); Connection *connection = static_cast<Connection*> (s->data);
assert(connection->socket_.fd < 0);
assert(connection->socket_.read_action == NULL);
assert(connection->socket_.write_action == NULL);
connection->OnDisconnect(); connection->OnDisconnect();
if (s->errorno) { if (s->errorno) {
@ -95,6 +100,7 @@ private:
} }
assert(connection->attached_); assert(connection->attached_);
connection->Detach(); connection->Detach();
} }

Loading…
Cancel
Save