Browse Source

Better stream.pipe() tracking.

This commit does three things:

1. Uses an exposed counter rather than a hidden array for tracking dest
streams that may have multiple inputs.  This allows for significantly
faster lookups, since the counter can be checked in constant time rather
than searching an array for the dest object.  (A proper O(1) WeakMap
would be better, but that may have to wait for Harmony.)

2. Calls the 'end' event logic when there is an 'error' event on the
source object (and then throws if there are no other error listeners.)
This is important, because otherwise 'error' events would lead to
memory leaks.

3. Clean up the style a bit.  Function Declarations are not allowed
within blocks in ES strict.  Prefer Function Declarations to Function
Expressions, because hoisting allows for more expressive ordering of
logic.

Downside: It adds "_pipeCount" as part of the Stream API.  It'll work
fine if the member is missing, but if anyone tries to use it for some
other purpose, it can mess things up.
v0.7.4-release
isaacs 14 years ago
parent
commit
7c6f0147df
  1. 68
      lib/stream.js

68
lib/stream.js

@ -28,13 +28,9 @@ function Stream() {
util.inherits(Stream, events.EventEmitter); util.inherits(Stream, events.EventEmitter);
exports.Stream = Stream; exports.Stream = Stream;
var pipes = [];
Stream.prototype.pipe = function(dest, options) { Stream.prototype.pipe = function(dest, options) {
var source = this; var source = this;
pipes.push(dest);
function ondata(chunk) { function ondata(chunk) {
if (dest.writable) { if (dest.writable) {
if (false === dest.write(chunk)) source.pause(); if (false === dest.write(chunk)) source.pause();
@ -49,31 +45,48 @@ Stream.prototype.pipe = function(dest, options) {
dest.on('drain', ondrain); dest.on('drain', ondrain);
/* // If the 'end' option is not supplied, dest.end() will be called when
* If the 'end' option is not supplied, dest.end() will be called when // source gets the 'end' or 'close' events. Only dest.end() once, and
* source gets the 'end' event. // only when all sources have ended.
*/
if (!options || options.end !== false) { if (!options || options.end !== false) {
function onend() { dest._pipeCount = dest._pipeCount || 0;
var index = pipes.indexOf(dest); dest._pipeCount++;
pipes.splice(index, 1);
source.on('end', onend);
source.on('close', onend);
}
var didOnEnd = false;
function onend() {
if (didOnEnd) return;
didOnEnd = true;
dest._pipeCount--;
if (pipes.indexOf(dest) > -1) { // remove the listeners
return; cleanup();
}
dest.end(); if (dest._pipeCount > 0) {
// waiting for other incoming streams to end.
return;
} }
source.on('end', onend); dest.end();
source.on('close', onend); }
// don't leave dangling pipes when there are errors.
function onerror(er) {
cleanup();
if (this.listeners('error').length === 1) {
throw er; // Unhandled stream error in pipe.
}
} }
/* source.on('error', onerror);
* Questionable: dest.on('error', onerror);
*/
// guarantee that source streams can be paused and resumed, even
// if the only effect is to proxy the event back up the pipe chain.
if (!source.pause) { if (!source.pause) {
source.pause = function() { source.pause = function() {
source.emit('pause'); source.emit('pause');
@ -86,27 +99,32 @@ Stream.prototype.pipe = function(dest, options) {
}; };
} }
var onpause = function() { function onpause() {
source.pause(); source.pause();
} }
dest.on('pause', onpause); dest.on('pause', onpause);
var onresume = function() { function onresume() {
if (source.readable) source.resume(); if (source.readable) source.resume();
}; }
dest.on('resume', onresume); dest.on('resume', onresume);
var cleanup = function () { // remove all the event listeners that were added.
function cleanup() {
source.removeListener('data', ondata); source.removeListener('data', ondata);
dest.removeListener('drain', ondrain); dest.removeListener('drain', ondrain);
source.removeListener('end', onend); source.removeListener('end', onend);
source.removeListener('close', onend); source.removeListener('close', onend);
dest.removeListener('pause', onpause); dest.removeListener('pause', onpause);
dest.removeListener('resume', onresume); dest.removeListener('resume', onresume);
source.removeListener('error', onerror);
dest.removeListener('error', onerror);
source.removeListener('end', cleanup); source.removeListener('end', cleanup);
source.removeListener('close', cleanup); source.removeListener('close', cleanup);

Loading…
Cancel
Save