diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index e045ead396..fff50d36fe 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -69,7 +69,9 @@ function ReadableState(options, stream) { this.endEmitted = false; this.reading = false; this.sync = false; - this.onread = onread.bind(stream); + this.onread = function(er, data) { + onread(stream, er, data); + }; // whenever we return null, then we set a flag to say // that we're awaiting a 'readable' event emission. @@ -208,13 +210,13 @@ Readable.prototype.read = function(n) { return ret; }; -function onread(er, chunk) { - var state = this._readableState; +function onread(stream, er, chunk) { + var state = stream._readableState; var sync = state.sync; state.reading = false; if (er) - return this.emit('error', er); + return stream.emit('error', er); if (!chunk || !chunk.length) { // eof @@ -233,10 +235,10 @@ function onread(er, chunk) { state.needReadable = false; if (!state.emittedReadable) { state.emittedReadable = true; - this.emit('readable'); + stream.emit('readable'); } } else - endReadable(this); + endReadable(stream); } return; } @@ -257,7 +259,7 @@ function onread(er, chunk) { // another _read(n,cb) before this one returns! if (state.length <= state.lowWaterMark) { state.reading = true; - this._read(state.bufferSize, state.onread); + stream._read(state.bufferSize, state.onread); return; } @@ -265,7 +267,7 @@ function onread(er, chunk) { state.needReadable = false; if (!state.emittedReadable) { state.emittedReadable = true; - this.emit('readable'); + stream.emit('readable'); } } } @@ -275,7 +277,9 @@ function onread(er, chunk) { // for virtual (non-string, non-buffer) streams, "length" is somewhat // arbitrary, and perhaps not very meaningful. Readable.prototype._read = function(n, cb) { - process.nextTick(cb.bind(this, new Error('not implemented'))); + process.nextTick(function() { + cb(new Error('not implemented')); + }); }; Readable.prototype.pipe = function(dest, pipeOpts) { @@ -316,7 +320,9 @@ Readable.prototype.pipe = function(dest, pipeOpts) { // start the flow. if (!state.flowing) { state.flowing = true; - process.nextTick(flow.bind(null, src, pipeOpts)); + process.nextTick(function() { + flow(src, pipeOpts); + }); } return dest; @@ -371,7 +377,9 @@ function flow(src, pipeOpts) { // at this point, no one needed a drain, so we just ran out of data // on the next readable event, start it over again. - src.once('readable', flow.bind(null, src, pipeOpts)); + src.once('readable', function() { + flow(src, pipeOpts); + }); } Readable.prototype.unpipe = function(dest) { @@ -504,6 +512,7 @@ Readable.prototype.wrap = function(stream) { var state = this._readableState; var paused = false; + var self = this; stream.on('end', function() { state.ended = true; if (state.decoder) { @@ -515,10 +524,10 @@ Readable.prototype.wrap = function(stream) { } if (state.length > 0) - this.emit('readable'); + self.emit('readable'); else - endReadable(this); - }.bind(this)); + endReadable(self); + }); stream.on('data', function(chunk) { if (state.decoder) @@ -528,14 +537,14 @@ Readable.prototype.wrap = function(stream) { state.buffer.push(chunk); state.length += chunk.length; - this.emit('readable'); + self.emit('readable'); // if not consumed, then pause the stream. if (state.length > state.lowWaterMark && !paused) { paused = true; stream.pause(); } - }.bind(this)); + }); // proxy all the other methods. // important when wrapping filters and duplexes. @@ -551,8 +560,8 @@ Readable.prototype.wrap = function(stream) { // proxy certain important events. var events = ['error', 'close', 'destroy', 'pause', 'resume']; events.forEach(function(ev) { - stream.on(ev, this.emit.bind(this, ev)); - }.bind(this)); + stream.on(ev, self.emit.bind(self, ev)); + }); // consume some bytes. if not all is consumed, then // pause the underlying stream. @@ -660,5 +669,7 @@ function endReadable(stream) { return; state.ended = true; state.endEmitted = true; - process.nextTick(stream.emit.bind(stream, 'end')); + process.nextTick(function() { + stream.emit('end'); + }); } diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index cf1c2e3b0e..b0819de29a 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -70,10 +70,13 @@ var Duplex = require('_stream_duplex'); var util = require('util'); util.inherits(Transform, Duplex); -function TransformState() { +function TransformState(stream) { this.buffer = []; this.transforming = false; this.pendingReadCb = null; + this.output = function(chunk) { + stream._output(chunk); + }; } function Transform(options) { @@ -83,17 +86,19 @@ function Transform(options) { Duplex.call(this, options); // bind output so that it can be passed around as a regular function. - this._output = this._output.bind(this); + var stream = this; // the queue of _write chunks that are pending being transformed - this._transformState = new TransformState(); + var ts = this._transformState = new TransformState(stream); // when the writable side finishes, then flush out anything remaining. this.once('finish', function() { if ('function' === typeof this._flush) - this._flush(this._output, done.bind(this)); + this._flush(ts.output, function(er) { + done(stream, er); + }); else - done.call(this); + done(stream); }); } @@ -159,14 +164,13 @@ Transform.prototype._read = function(n, readcb) { var req = ts.buffer.shift(); var chunk = req[0]; var writecb = req[1]; - var output = this._output; ts.transforming = true; - this._transform(chunk, output, function(er, data) { + this._transform(chunk, ts.output, function(er, data) { ts.transforming = false; if (data) - output(data); + ts.output(data); writecb(er); - }.bind(this)); + }); }; Transform.prototype._output = function(chunk) { @@ -185,25 +189,25 @@ Transform.prototype._output = function(chunk) { } // otherwise, it's up to us to fill the rs buffer. - var state = this._readableState; - var len = state.length; - state.buffer.push(chunk); - state.length += chunk.length; - if (state.needReadable) { - state.needReadable = false; + var rs = this._readableState; + var len = rs.length; + rs.buffer.push(chunk); + rs.length += chunk.length; + if (rs.needReadable) { + rs.needReadable = false; this.emit('readable'); } }; -function done(er) { +function done(stream, er) { if (er) - return this.emit('error', er); + return stream.emit('error', er); // if there's nothing in the write buffer, then that means // that nothing more will ever be provided - var ws = this._writableState; - var rs = this._readableState; - var ts = this._transformState; + var ws = stream._writableState; + var rs = stream._readableState; + var ts = stream._transformState; if (ws.length) throw new Error('calling transform done when ws.length != 0'); @@ -221,7 +225,7 @@ function done(er) { // no more data coming from the writable side, we need to emit // now so that the consumer knows to pick up the tail bits. if (rs.length && rs.needReadable) - this.emit('readable'); + stream.emit('readable'); else if (rs.length === 0) - this.emit('end'); + stream.emit('end'); } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index cfcd2e25d7..00702cab9a 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -82,7 +82,9 @@ function WritableState(options, stream) { this.sync = false; // the callback that's passed to _write(chunk,cb) - this.onwrite = onwrite.bind(stream); + this.onwrite = function(er) { + onwrite(stream, er); + }; // the callback that the user supplies to write(chunk,encoding,cb) this.writecb = null; @@ -155,8 +157,8 @@ Writable.prototype.write = function(chunk, encoding, cb) { return ret; }; -function onwrite(er) { - var state = this._writableState; +function onwrite(stream, er) { + var state = stream._writableState; var sync = state.sync; var cb = state.writecb; var l = state.writelen; @@ -168,11 +170,13 @@ function onwrite(er) { if (er) { if (cb) { if (sync) - process.nextTick(cb.bind(null, er)); + process.nextTick(function() { + cb(er); + }); else cb(er); } else - this.emit('error', er); + stream.emit('error', er); return; } state.length -= l; @@ -189,7 +193,7 @@ function onwrite(er) { if (state.length === 0 && (state.ended || state.ending)) { // emit 'finish' at the very end. state.finishing = true; - this.emit('finish'); + stream.emit('finish'); state.finished = true; return; } @@ -209,7 +213,7 @@ function onwrite(er) { state.writecb = cb; state.writechunk = chunk; state.writing = true; - this._write(chunk, state.onwrite); + stream._write(chunk, state.onwrite); } if (state.length <= state.lowWaterMark && state.needDrain) { @@ -220,13 +224,15 @@ function onwrite(er) { if (!state.needDrain) return; state.needDrain = false; - this.emit('drain'); - }.bind(this)); + stream.emit('drain'); + }); } } Writable.prototype._write = function(chunk, cb) { - process.nextTick(cb.bind(this, new Error('not implemented'))); + process.nextTick(function() { + cb(new Error('not implemented')); + }); }; Writable.prototype.end = function(chunk, encoding) {