Browse Source

net2 HTTPClient work

v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
6db43f4c29
  1. 513
      lib/http2.js
  2. 4
      test/simple/test-http-client-upload.js

513
lib/http2.js

@ -4,6 +4,108 @@ var events = require('events');
var HTTPParser = process.binding('http_parser').HTTPParser; var HTTPParser = process.binding('http_parser').HTTPParser;
var parserFreeList = [];
function newParser (type) {
var parser;
if (parserFreeList.length) {
parser = parserFreeList.shift();
parser.reinitialize(type);
} else {
parser = new HTTPParser(type);
parser.onMessageBegin = function () {
parser.incoming = new IncomingMessage(parser.socket);
parser.field = null;
parser.value = null;
};
// Only servers will get URL events.
parser.onURL = function (b, start, len) {
var slice = b.asciiSlice(start, start+len);
if (parser.incoming.url) {
parser.incoming.url += slice;
} else {
// Almost always will branch here.
parser.incoming.url = slice;
}
};
parser.onHeaderField = function (b, start, len) {
var slice = b.asciiSlice(start, start+len).toLowerCase();
if (parser.value) {
parser.incoming._addHeaderLine(parser.field, parser.value);
parser.field = null;
parser.value = null;
}
if (parser.field) {
parser.field += slice;
} else {
parser.field = slice;
}
};
parser.onHeaderValue = function (b, start, len) {
var slice = b.asciiSlice(start, start+len);
if (parser.value) {
parser.value += slice;
} else {
parser.value = slice;
}
};
parser.onHeadersComplete = function (info) {
if (parser.field && parser.value) {
parser.incoming._addHeaderLine(parser.field, parser.value);
}
parser.incoming.httpVersionMajor = info.versionMajor;
parser.incoming.httpVersionMinor = info.versionMinor;
if (info.method) {
// server only
parser.incoming.method = info.method;
} else {
// client only
parser.incoming.statusCode = info.statusCode;
}
parser.onIncoming(parser.incoming, info.shouldKeepAlive);
};
parser.onBody = function (b, start, len) {
// TODO body encoding?
var enc = parser.incoming._encoding;
if (!enc) {
parser.incoming.emit('data', b.slice(start, start+len));
} else {
var string;
switch (enc) {
case 'utf8':
string = b.utf8Slice(start, start+len);
break;
case 'ascii':
string = b.asciiSlice(start, start+len);
break;
default:
throw new Error('Unsupported encoding ' + self._encoding + '. Use Buffer');
}
parser.incoming.emit('data', string);
}
};
parser.onMessageComplete = function () {
parser.incoming.emit("end");
};
}
return parser;
}
function freeParser (parser) {
if (parserFreeList.length < 1000) parserFreeList.push(parser);
}
var CRLF = "\r\n"; var CRLF = "\r\n";
var STATUS_CODES = exports.STATUS_CODES = { var STATUS_CODES = exports.STATUS_CODES = {
100 : 'Continue', 100 : 'Continue',
@ -45,11 +147,11 @@ var STATUS_CODES = exports.STATUS_CODES = {
505 : 'HTTP Version not supported' 505 : 'HTTP Version not supported'
}; };
var connectionExpression = /Connection/i; var connection_expression = /Connection/i;
var transferEncodingExpression = /Transfer-Encoding/i; var transfer_encoding_expression = /Transfer-Encoding/i;
var closeExpression = /close/i; var close_expression = /close/i;
var chunkExpression = /chunk/i; var chunk_expression = /chunk/i;
var contentLengthExpression = /Content-Length/i; var content_length_expression = /Content-Length/i;
/* Abstract base class for ServerRequest and ClientResponse. */ /* Abstract base class for ServerRequest and ClientResponse. */
@ -60,10 +162,14 @@ function IncomingMessage (socket) {
this.httpVersion = null; this.httpVersion = null;
this.headers = {}; this.headers = {};
// request (server) only
this.url = "";
this.method = null; this.method = null;
// response (client) only // response (client) only
this.statusCode = null; this.statusCode = null;
this.client = this.socket;
} }
sys.inherits(IncomingMessage, events.EventEmitter); sys.inherits(IncomingMessage, events.EventEmitter);
exports.IncomingMessage = IncomingMessage; exports.IncomingMessage = IncomingMessage;
@ -73,16 +179,21 @@ IncomingMessage.prototype._parseQueryString = function () {
}; };
IncomingMessage.prototype.setBodyEncoding = function (enc) { IncomingMessage.prototype.setBodyEncoding = function (enc) {
// TODO: Find a cleaner way of doing this. // TODO deprecation message?
this.socket.setEncoding(enc); this.setEncoding(enc);
};
IncomingMessage.prototype.setEncoding = function (enc) {
// TODO check values, error out on bad, and deprecation message?
this._encoding = enc.toLowerCase();
}; };
IncomingMessage.prototype.pause = function () { IncomingMessage.prototype.pause = function () {
this.socket.readPause(); this.socket.pause();
}; };
IncomingMessage.prototype.resume = function () { IncomingMessage.prototype.resume = function () {
this.socket.readResume(); this.socket.resume();
}; };
IncomingMessage.prototype._addHeaderLine = function (field, value) { IncomingMessage.prototype._addHeaderLine = function (field, value) {
@ -95,18 +206,21 @@ IncomingMessage.prototype._addHeaderLine = function (field, value) {
} }
}; };
function OutgoingMessage () { function OutgoingMessage (socket) {
events.EventEmitter.call(this); events.EventEmitter.call(this, socket);
this.socket = socket;
this.output = []; this.output = [];
this.outputEncodings = []; this.outputEncodings = [];
this.closeOnFinish = false; this.closeOnFinish = false;
this.chunkEncoding = false; this.chunked_encoding = false;
this.shouldKeepAlive = true; this.should_keep_alive = true;
this.useChunkedEncodingByDefault = true; this.use_chunked_encoding_by_default = true;
this.flushing = false; this.flushing = false;
this.headWritten = false;
this.finished = false; this.finished = false;
} }
@ -141,14 +255,14 @@ OutgoingMessage.prototype._send = function (data, encoding) {
this.outputEncodings.push(encoding); this.outputEncodings.push(encoding);
}; };
OutgoingMessage.prototype._sendHeaderLines = function (first_line, headers) { OutgoingMessage.prototype.sendHeaderLines = function (first_line, headers) {
var sentConnectionHeader = false; var sent_connection_header = false;
var sendContentLengthHeader = false; var sent_content_length_header = false;
var sendTransferEncodingHeader = false; var sent_transfer_encoding_header = false;
// first_line in the case of request is: "GET /index.html HTTP/1.1\r\n" // first_line in the case of request is: "GET /index.html HTTP/1.1\r\n"
// in the case of response it is: "HTTP/1.1 200 OK\r\n" // in the case of response it is: "HTTP/1.1 200 OK\r\n"
var messageHeader = first_line; var message_header = first_line;
var field, value; var field, value;
for (var i in headers) { for (var i in headers) {
if (headers[i] instanceof Array) { if (headers[i] instanceof Array) {
@ -160,52 +274,64 @@ OutgoingMessage.prototype._sendHeaderLines = function (first_line, headers) {
value = headers[i]; value = headers[i];
} }
messageHeader += field + ": " + value + CRLF; message_header += field + ": " + value + CRLF;
if (connectionExpression.test(field)) { if (connection_expression.test(field)) {
sentConnectionHeader = true; sent_connection_header = true;
if (closeExpression.test(value)) this.closeOnFinish = true; if (close_expression.test(value)) this.closeOnFinish = true;
} else if (transferEncodingExpression.test(field)) { } else if (transfer_encoding_expression.test(field)) {
sendTransferEncodingHeader = true; sent_transfer_encoding_header = true;
if (chunkExpression.test(value)) this.chunkEncoding = true; if (chunk_expression.test(value)) this.chunked_encoding = true;
} else if (contentLengthExpression.test(field)) { } else if (content_length_expression.test(field)) {
sendContentLengthHeader = true; sent_content_length_header = true;
} }
} }
// keep-alive logic // keep-alive logic
if (sentConnectionHeader == false) { if (sent_connection_header == false) {
if (this.shouldKeepAlive && if (this.should_keep_alive &&
(sendContentLengthHeader || this.useChunkedEncodingByDefault)) { (sent_content_length_header || this.use_chunked_encoding_by_default)) {
messageHeader += "Connection: keep-alive\r\n"; message_header += "Connection: keep-alive\r\n";
} else { } else {
this.closeOnFinish = true; this.closeOnFinish = true;
messageHeader += "Connection: close\r\n"; message_header += "Connection: close\r\n";
} }
} }
if (sendContentLengthHeader == false && sendTransferEncodingHeader == false) { if (sent_content_length_header == false && sent_transfer_encoding_header == false) {
if (this.useChunkedEncodingByDefault) { if (this.use_chunked_encoding_by_default) {
messageHeader += "Transfer-Encoding: chunked\r\n"; message_header += "Transfer-Encoding: chunked\r\n";
this.chunkEncoding = true; this.chunked_encoding = true;
} }
else { else {
this.closeOnFinish = true; this.closeOnFinish = true;
} }
} }
messageHeader += CRLF; message_header += CRLF;
this._send(messageHeader); this._send(message_header);
// wait until the first body chunk, or finish(), is sent to flush. // wait until the first body chunk, or close(), is sent to flush.
}; };
OutgoingMessage.prototype.sendBody = function () {
throw new Error("sendBody() has been renamed to write(). " +
"The 'body' event has been renamed to 'data' and " +
"the 'complete' event has been renamed to 'end'.");
};
OutgoingMessage.prototype.write = function (chunk, encoding) { OutgoingMessage.prototype.write = function (chunk, encoding) {
if ( (this instanceof ServerResponse) && !this.headWritten) {
throw new Error("writeHead() must be called before write()")
}
encoding = encoding || "ascii"; encoding = encoding || "ascii";
if (this.chunkEncoding) { if (this.chunked_encoding) {
this._send(process._byteLength(chunk, encoding).toString(16)); this._send(process._byteLength(chunk, encoding).toString(16));
this._send(CRLF); this._send(CRLF);
this._send(chunk, encoding); this._send(chunk, encoding);
@ -221,65 +347,90 @@ OutgoingMessage.prototype.write = function (chunk, encoding) {
} }
}; };
OutgoingMessage.prototype.sendBody = function () {
throw new Error('sendBody() renamed to write()');
};
OutgoingMessage.prototype.flush = function () { OutgoingMessage.prototype.flush = function () {
this.emit("flush"); this.emit("flush");
}; };
OutgoingMessage.prototype.finish = function () {
throw new Error("finish() has been renamed to close().");
};
OutgoingMessage.prototype.close = function () { OutgoingMessage.prototype.close = function () {
if (this.chunkEncoding) this._send("0\r\n\r\n"); // last chunk if (this.chunked_encoding) this._send("0\r\n\r\n"); // last chunk
this.finished = true; this.finished = true;
this.flush(); this.flush();
}; };
function ServerResponse (req) { function ServerResponse (req) {
OutgoingMessage.call(this); OutgoingMessage.call(this, req.socket);
if (req.httpVersionMajor < 1 || req.httpVersionMinor < 1) { if (req.httpVersionMajor < 1 || req.httpVersionMinor < 1) {
this.useChunkedEncodingByDefault = false; this.use_chunked_encoding_by_default = false;
this.shouldKeepAlive = false; this.should_keep_alive = false;
} }
} }
sys.inherits(ServerResponse, OutgoingMessage); sys.inherits(ServerResponse, OutgoingMessage);
exports.ServerResponse = ServerResponse; exports.ServerResponse = ServerResponse;
ServerResponse.prototype.writeHead = function (statusCode, headers) {
var reason = STATUS_CODES[statusCode] || "unknown";
var status_line = "HTTP/1.1 " + statusCode.toString() + " " + reason + CRLF;
this._sendHeaderLines(status_line, headers);
};
ServerResponse.prototype.writeHeader = ServerResponse.prototype.writeHead; ServerResponse.prototype.writeHead = function (statusCode) {
var reasonPhrase, headers, headerIndex;
ServerResponse.prototype.sendHeader = function () { if (typeof arguments[1] == 'string') {
throw new Error('sendHeader renamed to writeHead()'); reasonPhrase = arguments[1];
headerIndex = 2;
} else {
reasonPhrase = STATUS_CODES[statusCode] || "unknown";
headerIndex = 1;
}
if (typeof arguments[headerIndex] == 'object') {
headers = arguments[headerIndex];
} else {
headers = {};
}
var status_line = "HTTP/1.1 " + statusCode.toString() + " "
+ reasonPhrase + CRLF;
this.sendHeaderLines(status_line, headers);
this.headWritten = true;
}; };
// TODO eventually remove sendHeader(), writeHeader()
ServerResponse.prototype.sendHeader = ServerResponse.prototype.writeHead;
ServerResponse.prototype.writeHeader = ServerResponse.prototype.writeHead;
function ClientRequest (method, url, headers) { function ClientRequest (socket, method, url, headers) {
OutgoingMessage.call(this); OutgoingMessage.call(this, socket);
this.shouldKeepAlive = false; this.should_keep_alive = false;
if (method === "GET" || method === "HEAD") { if (method === "GET" || method === "HEAD") {
this.useChunkedEncodingByDefault = false; this.use_chunked_encoding_by_default = false;
} else { } else {
this.useChunkedEncodingByDefault = true; this.use_chunked_encoding_by_default = true;
} }
this.closeOnFinish = true; this.closeOnFinish = true;
this._sendHeaderLines(method + " " + url + " HTTP/1.1\r\n", headers); this.sendHeaderLines(method + " " + url + " HTTP/1.1\r\n", headers);
} }
sys.inherits(ClientRequest, OutgoingMessage); sys.inherits(ClientRequest, OutgoingMessage);
exports.ClientRequest = ClientRequest; exports.ClientRequest = ClientRequest;
ClientRequest.prototype.finish = function (responseListener) { ClientRequest.prototype.finish = function () {
this.addListener("response", responseListener); throw new Error( "finish() has been renamed to close() and no longer takes "
OutgoingMessage.prototype.finish.call(this); + "a response handler as an argument. Manually add a 'response' listener "
+ "to the request object."
);
};
ClientRequest.prototype.close = function () {
if (arguments.length > 0) {
throw new Error( "ClientRequest.prototype.close does not take any arguments. "
+ "Add a response listener manually to the request object."
);
}
OutgoingMessage.prototype.close.call(this);
}; };
@ -309,97 +460,27 @@ function flushMessageQueue (socket, queue) {
} }
var parserFreeList = []; function Server (requestListener) {
net.Server.call(this);
function newParser (type) { this.addListener("request", requestListener);
var parser; this.addListener("connection", connectionListener);
if (parserFreeList.length) {
parser = parserFreeList.shift();
parser.reinitialize(type);
} else {
parser = new HTTPParser(type);
parser.onMessageBegin = function () {
parser.incoming = new IncomingMessage(parser.socket);
parser.field = null;
parser.value = null;
};
// Only servers will get URL events.
parser.onURL = function (b, start, len) {
var slice = b.asciiSlice(start, start+len);
if (parser.incoming.url) {
parser.incoming.url += slice;
} else {
// Almost always will branch here.
parser.incoming.url = slice;
}
};
parser.onHeaderField = function (b, start, len) {
var slice = b.asciiSlice(start, start+len).toLowerCase();
if (parser.value) {
parser.incoming._addHeaderLine(parser.field, parser.value);
parser.field = null;
parser.value = null;
}
if (parser.field) {
parser.field += slice;
} else {
parser.field = slice;
}
};
parser.onHeaderValue = function (b, start, len) {
var slice = b.asciiSlice(start, start+len);
if (parser.value) {
parser.value += slice;
} else {
parser.value = slice;
}
};
parser.onHeadersComplete = function (info) {
if (parser.field && parser.value) {
parser.incoming._addHeaderLine(parser.field, parser.value);
}
parser.incoming.httpVersionMajor = info.versionMajor;
parser.incoming.httpVersionMinor = info.versionMinor;
if (info.method) {
// server only
parser.incoming.method = info.method;
} else {
// client only
parser.incoming.statusCode = info.statusCode;
}
parser.onIncoming(parser.incoming, info.shouldKeepAlive);
};
parser.onBody = function (b, start, len) {
parser.incoming.emit("data", b.slice(start, start+len));
};
parser.onMessageComplete = function () {
parser.incoming.emit("end");
};
}
return parser;
} }
sys.inherits(Server, net.Server);
function freeParser (parser) { exports.Server = Server;
if (parserFreeList.length < 1000) parserFreeList.push(parser);
} exports.createServer = function (requestListener) {
return new Server(requestListener);
};
function connectionListener (socket) { function connectionListener (socket) {
var self = this; var self = this;
var parser = newParser('request');
// An array of responses for each socket. In pipelined connections // An array of responses for each socket. In pipelined connections
// we need to keep track of the order they were sent. // we need to keep track of the order they were sent.
var responses = []; var responses = [];
var parser = newParser('request');
socket.ondata = function (d, start, end) { socket.ondata = function (d, start, end) {
parser.execute(d, start, end - start); parser.execute(d, start, end - start);
}; };
@ -437,64 +518,56 @@ function connectionListener (socket) {
} }
function Server (requestListener, options) { function Client ( ) {
net.Server.call(this, connectionListener);
//server.setOptions(options);
this.addListener('request', requestListener);
}
sys.inherits(Server, net.Server);
exports.Server = Server;
exports.createServer = function (requestListener, options) {
return new Server(requestListener, options);
};
function Client () {
net.Stream.call(this); net.Stream.call(this);
var self = this; var self = this;
var requests = []; var requests = [];
var currentRequest; var currentRequest;
var parser = newParser('response'); var parser = newParser('response');
parser.socket = self; parser.socket = this;
self.addListener("connect", function () {
self.resetParser();
currentRequest = requests.shift();
currentRequest.flush();
});
self.ondata = function (d, start, end) { self._reconnect = function () {
parser.execute(d, start, end - start); if (self.readyState != "opening") {
//sys.debug("HTTP CLIENT: reconnecting readyState = " + self.readyState);
self.connect(self.port, self.host);
}
}; };
parser.onIncoming = function (res) { self._pushRequest = function (req) {
//sys.debug("incoming response!"); req.addListener("flush", function () {
/*
res.addListener('end', function ( ) { if (self.readyState == "closed") {
//sys.debug("request complete disconnecting. readyState = " + self.readyState); //sys.debug("HTTP CLIENT request flush. reconnect. readyState = " + self.readyState);
self.close(); self._reconnect();
return;
}
*/
//sys.debug("self flush readyState = " + self.readyState);
if (req == currentRequest) flushMessageQueue(self, [req]);
}); });
requests.push(req);
currentRequest.emit("response", res);
}; };
self._pushRequest = function (req) { this.ondata = function (d, start, end) {
parser.execute(d, start, end - start);
}; };
self.addListener("end", function () { self.addListener("connect", function () {
self.close(); parser.reinitialize('response');
currentRequest = requests.shift();
currentRequest.flush();
}); });
self.onend = function () { self.addListener("end", function () {
parser.finish(); parser.finish();
// unref the parser for easy gc
freeParser(parser); freeParser(parser);
//sys.debug("self got end closing. readyState = " + self.readyState); //sys.debug("self got end closing. readyState = " + self.readyState);
self.close(); self.close();
}; });
self.addListener("close", function (had_error) { self.addListener("close", function (had_error) {
if (had_error) { if (had_error) {
@ -509,53 +582,30 @@ function Client () {
self._reconnect(); self._reconnect();
} }
}); });
}
sys.inherits(Client, net.Stream);
parser.onIncoming = function (res) {
sys.debug("incoming response!");
exports.Client = Client; res.addListener('end', function ( ) {
//sys.debug("request complete disconnecting. readyState = " + self.readyState);
self.close();
exports.createClient = function (port, host) { });
var client = new Client();
client.port = port;
client.host = host;
client.connect(port, host);
return client;
};
Client.prototype._reconnect = function () { currentRequest.emit("response", res);
if (this.readyState != "opening") { };
//sys.debug("HTTP CLIENT: reconnecting readyState = " + self.readyState);
this.connect(this.port, this.host);
}
}; };
sys.inherits(Client, net.Stream);
exports.Client = Client;
Client.prototype.request = function (method, url, headers) { exports.createClient = function (port, host) {
var self = this; var c = new Client;
c.port = port;
if (typeof(url) != "string") { // assume method was omitted, shift arguments c.host = host;
headers = url; c.connect(port, host);
url = method; return c;
method = null; }
}
var req = new ClientRequest(this, method || "GET", url, headers);
req.addListener("flush", function () {
if (self.readyState == "closed") {
//sys.debug("HTTP CLIENT request flush. reconnect. readyState = " + self.readyState);
self._reconnect();
return;
}
//sys.debug("self flush readyState = " + self.readyState);
if (req == currentRequest) flushMessageQueue(self, [req]);
});
requests.push(req);
return req;
};
Client.prototype.get = function () { Client.prototype.get = function () {
throw new Error("client.get(...) is now client.request('GET', ...)"); throw new Error("client.get(...) is now client.request('GET', ...)");
@ -577,6 +627,17 @@ Client.prototype.put = function () {
throw new Error("client.put(...) is now client.request('PUT', ...)"); throw new Error("client.put(...) is now client.request('PUT', ...)");
}; };
Client.prototype.request = function (method, url, headers) {
if (typeof(url) != "string") { // assume method was omitted, shift arguments
headers = url;
url = method;
method = null;
}
var req = new ClientRequest(this, method || "GET", url, headers);
this._pushRequest(req);
return req;
};
exports.cat = function (url, encoding_, headers_) { exports.cat = function (url, encoding_, headers_) {
var encoding = 'utf8', var encoding = 'utf8',

4
test/simple/test-http-client-upload.js

@ -1,5 +1,5 @@
require("../common"); require("../common");
http = require("http"); http = require("http2");
var sent_body = ""; var sent_body = "";
var server_req_complete = false; var server_req_complete = false;
@ -33,7 +33,7 @@ req.write('3\n');
puts("client finished sending request"); puts("client finished sending request");
req.addListener('response', function(res) { req.addListener('response', function(res) {
res.setBodyEncoding("utf8"); res.setEncoding("utf8");
res.addListener('data', function(chunk) { res.addListener('data', function(chunk) {
puts(chunk); puts(chunk);
}); });

Loading…
Cancel
Save