From 530585b2d10eea72e71007cb5d8ab18149653c99 Mon Sep 17 00:00:00 2001 From: isaacs Date: Mon, 7 Jan 2013 19:40:08 -0800 Subject: [PATCH] stream: Use push() for readable.wrap() --- lib/_stream_readable.js | 54 +++++++---------------------------------- 1 file changed, 9 insertions(+), 45 deletions(-) diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index bd9126baef..6dac946f2a 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -611,16 +611,11 @@ Readable.prototype.wrap = function(stream) { state.ended = true; if (state.decoder) { var chunk = state.decoder.end(); - if (chunk && chunk.length) { - state.buffer.push(chunk); - state.length += chunk.length; - } + if (chunk && chunk.length) + self.push(chunk); } - if (state.length > 0) - self.emit('readable'); - else - endReadable(self); + self.push(null); }); stream.on('data', function(chunk) { @@ -629,12 +624,8 @@ Readable.prototype.wrap = function(stream) { if (!chunk || !chunk.length) return; - state.buffer.push(chunk); - state.length += chunk.length; - self.emit('readable'); - - // if not consumed, then pause the stream. - if (state.length > state.lowWaterMark && !paused) { + var ret = self.push(chunk); + if (!ret) { paused = true; stream.pause(); } @@ -657,40 +648,13 @@ Readable.prototype.wrap = function(stream) { stream.on(ev, self.emit.bind(self, ev)); }); - // consume some bytes. if not all is consumed, then - // pause the underlying stream. - this.read = function(n) { - if (state.length === 0) { - state.needReadable = true; - return null; - } - - if (isNaN(n) || n <= 0) - n = state.length; - - if (n > state.length) { - if (!state.ended) { - state.needReadable = true; - return null; - } else - n = state.length; - } - - var ret = fromList(n, state.buffer, state.length, !!state.decoder); - state.length -= n; - - if (state.length === 0 && !state.ended) - state.needReadable = true; - - if (state.length <= state.lowWaterMark && paused) { + // when we try to consume some more bytes, simply unpause the + // underlying stream. + self._read = function(n, cb) { + if (paused) { stream.resume(); paused = false; } - - if (state.length === 0 && state.ended) - endReadable(this); - - return ret; }; };