mirror of https://github.com/lukechilds/node.git
Browse Source
Improve readability of lib/stream.js by moving the legacy abstract Stream into lib/internal/streams/legacy.js. PR-URL: https://github.com/nodejs/node/pull/8197 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>v6
committed by
Matteo Collina
3 changed files with 97 additions and 96 deletions
@ -0,0 +1,93 @@ |
|||
'use strict'; |
|||
|
|||
const EE = require('events'); |
|||
const util = require('util'); |
|||
|
|||
function Stream() { |
|||
EE.call(this); |
|||
} |
|||
util.inherits(Stream, EE); |
|||
|
|||
Stream.prototype.pipe = function(dest, options) { |
|||
var source = this; |
|||
|
|||
function ondata(chunk) { |
|||
if (dest.writable) { |
|||
if (false === dest.write(chunk) && source.pause) { |
|||
source.pause(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
source.on('data', ondata); |
|||
|
|||
function ondrain() { |
|||
if (source.readable && source.resume) { |
|||
source.resume(); |
|||
} |
|||
} |
|||
|
|||
dest.on('drain', ondrain); |
|||
|
|||
// If the 'end' option is not supplied, dest.end() will be called when
|
|||
// source gets the 'end' or 'close' events. Only dest.end() once.
|
|||
if (!dest._isStdio && (!options || options.end !== false)) { |
|||
source.on('end', onend); |
|||
source.on('close', onclose); |
|||
} |
|||
|
|||
var didOnEnd = false; |
|||
function onend() { |
|||
if (didOnEnd) return; |
|||
didOnEnd = true; |
|||
|
|||
dest.end(); |
|||
} |
|||
|
|||
|
|||
function onclose() { |
|||
if (didOnEnd) return; |
|||
didOnEnd = true; |
|||
|
|||
if (typeof dest.destroy === 'function') dest.destroy(); |
|||
} |
|||
|
|||
// don't leave dangling pipes when there are errors.
|
|||
function onerror(er) { |
|||
cleanup(); |
|||
if (EE.listenerCount(this, 'error') === 0) { |
|||
throw er; // Unhandled stream error in pipe.
|
|||
} |
|||
} |
|||
|
|||
source.on('error', onerror); |
|||
dest.on('error', onerror); |
|||
|
|||
// remove all the event listeners that were added.
|
|||
function cleanup() { |
|||
source.removeListener('data', ondata); |
|||
dest.removeListener('drain', ondrain); |
|||
|
|||
source.removeListener('end', onend); |
|||
source.removeListener('close', onclose); |
|||
|
|||
source.removeListener('error', onerror); |
|||
dest.removeListener('error', onerror); |
|||
|
|||
source.removeListener('end', cleanup); |
|||
source.removeListener('close', cleanup); |
|||
|
|||
dest.removeListener('close', cleanup); |
|||
} |
|||
|
|||
source.on('end', cleanup); |
|||
source.on('close', cleanup); |
|||
|
|||
dest.on('close', cleanup); |
|||
dest.emit('pipe', source); |
|||
|
|||
// Allow for unix-like usage: A.pipe(B).pipe(C)
|
|||
return dest; |
|||
}; |
|||
|
|||
module.exports = Stream; |
Loading…
Reference in new issue