Browse Source

stream: Use push() for readable.wrap()

v0.9.6-release
isaacs 12 years ago
parent
commit
530585b2d1
  1. 54
      lib/_stream_readable.js

54
lib/_stream_readable.js

@ -611,16 +611,11 @@ Readable.prototype.wrap = function(stream) {
state.ended = true;
if (state.decoder) {
var chunk = state.decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
state.length += chunk.length;
}
if (chunk && chunk.length)
self.push(chunk);
}
if (state.length > 0)
self.emit('readable');
else
endReadable(self);
self.push(null);
});
stream.on('data', function(chunk) {
@ -629,12 +624,8 @@ Readable.prototype.wrap = function(stream) {
if (!chunk || !chunk.length)
return;
state.buffer.push(chunk);
state.length += chunk.length;
self.emit('readable');
// if not consumed, then pause the stream.
if (state.length > state.lowWaterMark && !paused) {
var ret = self.push(chunk);
if (!ret) {
paused = true;
stream.pause();
}
@ -657,40 +648,13 @@ Readable.prototype.wrap = function(stream) {
stream.on(ev, self.emit.bind(self, ev));
});
// consume some bytes. if not all is consumed, then
// pause the underlying stream.
this.read = function(n) {
if (state.length === 0) {
state.needReadable = true;
return null;
}
if (isNaN(n) || n <= 0)
n = state.length;
if (n > state.length) {
if (!state.ended) {
state.needReadable = true;
return null;
} else
n = state.length;
}
var ret = fromList(n, state.buffer, state.length, !!state.decoder);
state.length -= n;
if (state.length === 0 && !state.ended)
state.needReadable = true;
if (state.length <= state.lowWaterMark && paused) {
// when we try to consume some more bytes, simply unpause the
// underlying stream.
self._read = function(n, cb) {
if (paused) {
stream.resume();
paused = false;
}
if (state.length === 0 && state.ended)
endReadable(this);
return ret;
};
};

Loading…
Cancel
Save