Browse Source

http: allow async createConnection()

This commit adds support for async createConnection()
implementations and is still backwards compatible with
synchronous createConnection() implementations.

This commit also makes the http client more friendly with
generic stream objects produced by createConnection() by
checking stream.writable instead of stream.destroyed as the
latter is currently a net.Socket-ism and not set by the core
stream implementations.

PR-URL: https://github.com/nodejs/node/pull/4638
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
process-exit-stdio-flushing
Brian White 9 years ago
parent
commit
9bee03aaf2
  1. 18
      doc/api/http.markdown
  2. 110
      lib/_http_agent.js
  3. 41
      lib/_http_client.js
  4. 68
      test/parallel/test-http-createConnection.js

18
doc/api/http.markdown

@ -117,6 +117,18 @@ options.agent = keepAliveAgent;
http.request(options, onResponseCallback); http.request(options, onResponseCallback);
``` ```
### agent.createConnection(options[, callback])
Produces a socket/stream to be used for HTTP requests.
By default, this function is the same as [`net.createConnection()`][]. However,
custom Agents may override this method in case greater flexibility is desired.
A socket/stream can be supplied in one of two ways: by returning the
socket/stream from this function, or by passing the socket/stream to `callback`.
`callback` has a signature of `(err, stream)`.
### agent.destroy() ### agent.destroy()
Destroy any sockets that are currently in use by the agent. Destroy any sockets that are currently in use by the agent.
@ -1117,6 +1129,10 @@ Options:
- `Agent` object: explicitly use the passed in `Agent`. - `Agent` object: explicitly use the passed in `Agent`.
- `false`: opts out of connection pooling with an Agent, defaults request to - `false`: opts out of connection pooling with an Agent, defaults request to
`Connection: close`. `Connection: close`.
- `createConnection`: A function that produces a socket/stream to use for the
request when the `agent` option is not used. This can be used to avoid
creating a custom Agent class just to override the default `createConnection`
function. See [`agent.createConnection()`][] for more details.
The optional `callback` parameter will be added as a one time listener for The optional `callback` parameter will be added as a one time listener for
the `'response'` event. the `'response'` event.
@ -1192,6 +1208,7 @@ There are a few special headers that should be noted.
[`'listening'`]: net.html#net_event_listening [`'listening'`]: net.html#net_event_listening
[`'response'`]: #http_event_response [`'response'`]: #http_event_response
[`Agent`]: #http_class_http_agent [`Agent`]: #http_class_http_agent
[`agent.createConnection`]: #http_agent_createconnection
[`Buffer`]: buffer.html#buffer_buffer [`Buffer`]: buffer.html#buffer_buffer
[`destroy()`]: #http_agent_destroy [`destroy()`]: #http_agent_destroy
[`EventEmitter`]: events.html#events_class_events_eventemitter [`EventEmitter`]: events.html#events_class_events_eventemitter
@ -1203,6 +1220,7 @@ There are a few special headers that should be noted.
[`http.Server`]: #http_class_http_server [`http.Server`]: #http_class_http_server
[`http.ServerResponse`]: #http_class_http_serverresponse [`http.ServerResponse`]: #http_class_http_serverresponse
[`message.headers`]: #http_message_headers [`message.headers`]: #http_message_headers
[`net.createConnection`]: net.html#net_net_createconnection_options_connectlistener
[`net.Server`]: net.html#net_class_net_server [`net.Server`]: net.html#net_class_net_server
[`net.Server.close()`]: net.html#net_server_close_callback [`net.Server.close()`]: net.html#net_server_close_callback
[`net.Server.listen()`]: net.html#net_server_listen_handle_callback [`net.Server.listen()`]: net.html#net_server_listen_handle_callback

110
lib/_http_agent.js

@ -44,7 +44,7 @@ function Agent(options) {
var name = self.getName(options); var name = self.getName(options);
debug('agent.on(free)', name); debug('agent.on(free)', name);
if (!socket.destroyed && if (socket.writable &&
self.requests[name] && self.requests[name].length) { self.requests[name] && self.requests[name].length) {
self.requests[name].shift().onSocket(socket); self.requests[name].shift().onSocket(socket);
if (self.requests[name].length === 0) { if (self.requests[name].length === 0) {
@ -57,7 +57,7 @@ function Agent(options) {
var req = socket._httpMessage; var req = socket._httpMessage;
if (req && if (req &&
req.shouldKeepAlive && req.shouldKeepAlive &&
!socket.destroyed && socket.writable &&
self.keepAlive) { self.keepAlive) {
var freeSockets = self.freeSockets[name]; var freeSockets = self.freeSockets[name];
var freeLen = freeSockets ? freeSockets.length : 0; var freeLen = freeSockets ? freeSockets.length : 0;
@ -138,7 +138,15 @@ Agent.prototype.addRequest = function(req, options) {
} else if (sockLen < this.maxSockets) { } else if (sockLen < this.maxSockets) {
debug('call onSocket', sockLen, freeLen); debug('call onSocket', sockLen, freeLen);
// If we are under maxSockets create a new one. // If we are under maxSockets create a new one.
req.onSocket(this.createSocket(req, options)); this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
req.emit('error', err);
});
return;
}
req.onSocket(newSocket);
});
} else { } else {
debug('wait for socket'); debug('wait for socket');
// We are over limit so we'll add it to the queue. // We are over limit so we'll add it to the queue.
@ -149,18 +157,16 @@ Agent.prototype.addRequest = function(req, options) {
} }
}; };
Agent.prototype.createSocket = function(req, options) { Agent.prototype.createSocket = function(req, options, cb) {
var self = this; var self = this;
options = util._extend({}, options); options = util._extend({}, options);
options = util._extend(options, self.options); options = util._extend(options, self.options);
if (!options.servername) { if (!options.servername) {
options.servername = options.host; options.servername = options.host;
if (req) { const hostHeader = req.getHeader('host');
var hostHeader = req.getHeader('host'); if (hostHeader) {
if (hostHeader) { options.servername = hostHeader.replace(/:.*$/, '');
options.servername = hostHeader.replace(/:.*$/, '');
}
} }
} }
@ -169,48 +175,58 @@ Agent.prototype.createSocket = function(req, options) {
debug('createConnection', name, options); debug('createConnection', name, options);
options.encoding = null; options.encoding = null;
var s = self.createConnection(options); var called = false;
if (!self.sockets[name]) { const newSocket = self.createConnection(options, oncreate);
self.sockets[name] = []; if (newSocket)
} oncreate(null, newSocket);
this.sockets[name].push(s); function oncreate(err, s) {
debug('sockets', name, this.sockets[name].length); if (called)
return;
called = true;
if (err)
return cb(err);
if (!self.sockets[name]) {
self.sockets[name] = [];
}
self.sockets[name].push(s);
debug('sockets', name, self.sockets[name].length);
function onFree() { function onFree() {
self.emit('free', s, options); self.emit('free', s, options);
} }
s.on('free', onFree); s.on('free', onFree);
function onClose(err) { function onClose(err) {
debug('CLIENT socket onClose'); debug('CLIENT socket onClose');
// This is the only place where sockets get removed from the Agent. // This is the only place where sockets get removed from the Agent.
// If you want to remove a socket from the pool, just close it. // If you want to remove a socket from the pool, just close it.
// All socket errors end in a close event anyway. // All socket errors end in a close event anyway.
self.removeSocket(s, options); self.removeSocket(s, options);
} }
s.on('close', onClose); s.on('close', onClose);
function onRemove() { function onRemove() {
// We need this function for cases like HTTP 'upgrade' // We need this function for cases like HTTP 'upgrade'
// (defined by WebSockets) where we need to remove a socket from the // (defined by WebSockets) where we need to remove a socket from the
// pool because it'll be locked up indefinitely // pool because it'll be locked up indefinitely
debug('CLIENT socket onRemove'); debug('CLIENT socket onRemove');
self.removeSocket(s, options); self.removeSocket(s, options);
s.removeListener('close', onClose); s.removeListener('close', onClose);
s.removeListener('free', onFree); s.removeListener('free', onFree);
s.removeListener('agentRemove', onRemove); s.removeListener('agentRemove', onRemove);
}
s.on('agentRemove', onRemove);
cb(null, s);
} }
s.on('agentRemove', onRemove);
return s;
}; };
Agent.prototype.removeSocket = function(s, options) { Agent.prototype.removeSocket = function(s, options) {
var name = this.getName(options); var name = this.getName(options);
debug('removeSocket', name, 'destroyed:', s.destroyed); debug('removeSocket', name, 'writable:', s.writable);
var sets = [this.sockets]; var sets = [this.sockets];
// If the socket was destroyed, remove it from the free buffers too. // If the socket was destroyed, remove it from the free buffers too.
if (s.destroyed) if (!s.writable)
sets.push(this.freeSockets); sets.push(this.freeSockets);
for (var sk = 0; sk < sets.length; sk++) { for (var sk = 0; sk < sets.length; sk++) {
@ -231,7 +247,15 @@ Agent.prototype.removeSocket = function(s, options) {
debug('removeSocket, have a request, make a socket'); debug('removeSocket, have a request, make a socket');
var req = this.requests[name][0]; var req = this.requests[name][0];
// If we have pending requests and a socket gets closed make a new one // If we have pending requests and a socket gets closed make a new one
this.createSocket(req, options).emit('free'); this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
req.emit('error', err);
});
return;
}
newSocket.emit('free');
});
} }
}; };

41
lib/_http_client.js

@ -33,7 +33,7 @@ function ClientRequest(options, cb) {
if (agent === false) { if (agent === false) {
agent = new defaultAgent.constructor(); agent = new defaultAgent.constructor();
} else if ((agent === null || agent === undefined) && } else if ((agent === null || agent === undefined) &&
!options.createConnection) { typeof options.createConnection !== 'function') {
agent = defaultAgent; agent = defaultAgent;
} }
self.agent = agent; self.agent = agent;
@ -118,10 +118,20 @@ function ClientRequest(options, cb) {
self._renderHeaders()); self._renderHeaders());
} }
var called = false;
if (self.socketPath) { if (self.socketPath) {
self._last = true; self._last = true;
self.shouldKeepAlive = false; self.shouldKeepAlive = false;
self.onSocket(self.agent.createConnection({ path: self.socketPath })); const optionsPath = {
path: self.socketPath
};
const newSocket = self.agent.createConnection(optionsPath, oncreate);
if (newSocket && !called) {
called = true;
self.onSocket(newSocket);
} else {
return;
}
} else if (self.agent) { } else if (self.agent) {
// If there is an agent we should default to Connection:keep-alive, // If there is an agent we should default to Connection:keep-alive,
// but only if the Agent will actually reuse the connection! // but only if the Agent will actually reuse the connection!
@ -139,14 +149,37 @@ function ClientRequest(options, cb) {
// No agent, default to Connection:close. // No agent, default to Connection:close.
self._last = true; self._last = true;
self.shouldKeepAlive = false; self.shouldKeepAlive = false;
if (options.createConnection) { if (typeof options.createConnection === 'function') {
self.onSocket(options.createConnection(options)); const newSocket = options.createConnection(options, oncreate);
if (newSocket && !called) {
called = true;
self.onSocket(newSocket);
} else {
return;
}
} else { } else {
debug('CLIENT use net.createConnection', options); debug('CLIENT use net.createConnection', options);
self.onSocket(net.createConnection(options)); self.onSocket(net.createConnection(options));
} }
} }
function oncreate(err, socket) {
if (called)
return;
called = true;
if (err) {
process.nextTick(function() {
self.emit('error', err);
});
return;
}
self.onSocket(socket);
self._deferToConnect(null, null, function() {
self._flush();
self = null;
});
}
self._deferToConnect(null, null, function() { self._deferToConnect(null, null, function() {
self._flush(); self._flush();
self = null; self = null;

68
test/parallel/test-http-createConnection.js

@ -1,27 +1,61 @@
'use strict'; 'use strict';
var common = require('../common'); const common = require('../common');
var assert = require('assert'); const http = require('http');
var http = require('http'); const net = require('net');
var net = require('net'); const assert = require('assert');
var create = 0; const server = http.createServer(common.mustCall(function(req, res) {
var response = 0;
process.on('exit', function() {
assert.equal(1, create, 'createConnection() http option was not called');
assert.equal(1, response, 'http server "request" callback was not called');
});
var server = http.createServer(function(req, res) {
res.end(); res.end();
response++; }, 4)).listen(common.PORT, '127.0.0.1', function() {
}).listen(common.PORT, '127.0.0.1', function() { let fn = common.mustCall(createConnection);
http.get({ createConnection: createConnection }, function(res) { http.get({ createConnection: fn }, function(res) {
res.resume(); res.resume();
server.close(); fn = common.mustCall(createConnectionAsync);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionBoth1);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionBoth2);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionError);
http.get({ createConnection: fn }, function(res) {
assert.fail(null, null, 'Unexpected response callback');
}).on('error', common.mustCall(function(err) {
assert.equal(err.message, 'Could not create socket');
server.close();
}));
});
});
});
}); });
}); });
function createConnection() { function createConnection() {
create++;
return net.createConnection(common.PORT, '127.0.0.1'); return net.createConnection(common.PORT, '127.0.0.1');
} }
function createConnectionAsync(options, cb) {
setImmediate(function() {
cb(null, net.createConnection(common.PORT, '127.0.0.1'));
});
}
function createConnectionBoth1(options, cb) {
const socket = net.createConnection(common.PORT, '127.0.0.1');
setImmediate(function() {
cb(null, socket);
});
return socket;
}
function createConnectionBoth2(options, cb) {
const socket = net.createConnection(common.PORT, '127.0.0.1');
cb(null, socket);
return socket;
}
function createConnectionError(options, cb) {
process.nextTick(cb, new Error('Could not create socket'));
}

Loading…
Cancel
Save