diff --git a/doc/api/streams.markdown b/doc/api/streams.markdown index 88ae04f5c0..6392eec7bb 100644 --- a/doc/api/streams.markdown +++ b/doc/api/streams.markdown @@ -65,7 +65,7 @@ Resumes the incoming `'data'` events after a `pause()`. Closes the underlying file descriptor. Stream will not emit any more events. -### stream.pipe(destination, [options], [filter]) +### stream.pipe(destination, [options]) This is a `Stream.prototype` method available on all `Stream`s. @@ -92,48 +92,6 @@ NOTE: If the source stream does not support `pause()` and `resume()`, this funct adds simple definitions which simply emit `'pause'` and `'resume'` events on the source stream. - -The `filter` argument is an optional callback which can be used to filter all -data passing through the pipe. This makes it easy to do arbitrary transforms -(like gzip) while still maintaining the proper throttling. `filter` gets -three arguments: a buffer, a write function, and a done function. Here is an -example of a chat which uses a `filter` to append each message with the -address of the sender. - - var net = require('net'); - var people = []; - - function address(socket) { - return '<' + socket.remoteAddress + ':' + socket.remotePort + '> '; - } - - net.Server(function (socket) { - socket.write("hello!\r\n"); - - people.forEach(function (p) { - socket.pipe(p, { end: false }, function (d, write, done) { - write(address(socket)); - write(d); - done(); - }); - - p.pipe(socket, { end: false }, function (d, write, done) { - write(address(p)); - write(d); - done(); - }); - }); - - people.push(socket); - - socket.on('end', function () { - people.splice(people.indexOf(socket), 1); - }); - }).listen(8000); - - - - ## Writable Stream A `Writable Stream` has the following methods, members, and events. diff --git a/lib/stream.js b/lib/stream.js index e92c1bc5db..98f0c37fcb 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -7,55 +7,16 @@ function Stream() { util.inherits(Stream, events.EventEmitter); exports.Stream = Stream; -Stream.prototype.pipe = function(dest /* options, filter */) { +Stream.prototype.pipe = function(dest, options) { var source = this; - // parse arguments - var options, filter; - if (typeof arguments[1] == 'object') { - options = arguments[1]; - filter = arguments[2]; - } else { - filter = arguments[1]; - } - function ondata(chunk) { - // FIXME shouldn't need to test writable - this is working around bug. - // .writable should not change before a 'end' event is fired. if (dest.writable) { if (false === dest.write(chunk)) source.pause(); } } - if (!filter) { - source.on('data', ondata); - } else { - // - // TODO: needs tests - // - var wait = false; - var waitQueue = []; - - function done () { - wait = false; - // Drain the waitQueue - if (dest.writable && waitQueue.length) { - wait = true; - filter(waitQueue.shift(), ondata, done); - } - } - - source.on('data', function (d) { - if (wait) { - waitQueue.push(d); - source.pause(); - } else { - wait = true; - filter(d, ondata, done); - } - }); - } - + source.on('data', ondata); function ondrain() { if (source.readable) source.resume();