Browse Source

streams: Don't emit 'end' until read() past EOF

This prevents the following sort of thing from being confusing:

```javascript
stream.on('data', function() { console.error('got data'); });
stream.pause(); // stop reading

// turns out no data is available
stream.push(null);

// Hand the stream to someone else, who does stuff...
setTimeout(function() {
  // too late! 'end' is already emitted!
  stream.on('end', function() { console.error('got end'); });
});
```

With this change, the `end` event is not emitted until you call `read()`
*past* the EOF null.  So, a paused stream will not swallow the `end`
event and emit it before you `resume()` the stream.
v0.11.5-release
isaacs 12 years ago
parent
commit
993bb93e0a
  1. 31
      lib/_stream_readable.js
  2. 3
      test/simple/test-http-after-connect.js
  3. 1
      test/simple/test-http-client-abort.js
  4. 4
      test/simple/test-http-pause-resume-one-end.js
  5. 53
      test/simple/test-stream-end-paused.js
  6. 4
      test/simple/test-stream2-transform.js

31
lib/_stream_readable.js

@ -50,12 +50,6 @@ function ReadableState(options, stream) {
this.endEmitted = false;
this.reading = false;
// In streams that never have any data, and do push(null) right away,
// the consumer can miss the 'end' event if they do some I/O before
// consuming the stream. So, we don't emit('end') until some reading
// happens.
this.calledRead = false;
// a flag to be able to tell if the onwrite cb is called immediately,
// or on a later tick. We set this to true at first, becuase any
// actions that shouldn't happen until "later" should generally also
@ -253,7 +247,6 @@ function howMuchToRead(n, state) {
Readable.prototype.read = function(n) {
debug('read', n);
var state = this._readableState;
state.calledRead = true;
var nOrig = n;
if (!IS_NUMBER(n) || n > 0)
@ -265,7 +258,7 @@ Readable.prototype.read = function(n) {
if (n === 0 &&
state.needReadable &&
(state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable');
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
endReadable(this);
else
@ -333,9 +326,8 @@ Readable.prototype.read = function(n) {
state.sync = false;
}
// If _read called its callback synchronously, then `reading`
// will be false, and we need to re-evaluate how much data we
// can return to the user.
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if (doRead && !state.reading)
n = howMuchToRead(nOrig, state);
@ -357,14 +349,13 @@ Readable.prototype.read = function(n) {
if (state.length === 0 && !state.ended)
state.needReadable = true;
// If we happened to read() exactly the remaining amount in the
// buffer, and the EOF has been seen at this point, then make sure
// that we emit 'end' on the very next tick.
if (state.ended && !state.endEmitted && state.length === 0)
// If we tried to read() past the EOF, then emit end on the next tick.
if (nOrig !== n && state.ended && state.length === 0)
endReadable(this);
if (!IS_NULL(ret))
this.emit('data', ret);
return ret;
};
@ -391,12 +382,8 @@ function onEofChunk(stream, state) {
}
state.ended = true;
// if we've ended and we have some data left, then emit
// 'readable' now to make sure it gets picked up.
if (state.length > 0)
emitReadable(stream);
else
endReadable(stream);
// emit 'readable' now to make sure it gets picked up.
emitReadable(stream);
}
// Don't emit readable right away in sync mode, because this can trigger
@ -879,7 +866,7 @@ function endReadable(stream) {
if (state.length > 0)
throw new Error('endReadable called on non-empty stream');
if (!state.endEmitted && state.calledRead) {
if (!state.endEmitted) {
state.ended = true;
process.nextTick(function() {
// Check that we didn't get one last unshift.

3
test/simple/test-http-after-connect.js

@ -29,6 +29,7 @@ var clientResponses = 0;
var server = http.createServer(function(req, res) {
common.debug('Server got GET request');
req.resume();
++serverRequests;
res.writeHead(200);
res.write('');
@ -40,6 +41,7 @@ server.on('connect', function(req, socket, firstBodyChunk) {
common.debug('Server got CONNECT request');
serverConnected = true;
socket.write('HTTP/1.1 200 Connection established\r\n\r\n');
socket.resume();
socket.on('end', function() {
socket.end();
});
@ -57,6 +59,7 @@ server.listen(common.PORT, function() {
doRequest(0);
doRequest(1);
});
socket.resume();
});
req.end();
});

1
test/simple/test-http-client-abort.js

@ -62,6 +62,7 @@ server.listen(common.PORT, function() {
var req = http.get(options, function(res) {
console.log('Client response code ' + res.statusCode);
res.resume();
if (++responses == N) {
console.log('All clients connected, destroying.');
requests.forEach(function(outReq) {

4
test/simple/test-http-pause-resume-one-end.js

@ -42,11 +42,13 @@ server.listen(common.PORT, function() {
res.on('data', function(chunk) {
dataCount++;
res.pause();
setTimeout(function() {
res.resume();
});
});
res.on('end', function() {
endCount++;
res.resume();
});
});
});

53
test/simple/test-stream-end-paused.js

@ -0,0 +1,53 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var gotEnd = false;
// Make sure we don't miss the end event for paused 0-length streams
var Readable = require('stream').Readable;
var stream = new Readable();
var calledRead = false;
stream._read = function() {
assert(!calledRead);
calledRead = true;
this.push(null);
};
stream.on('data', function() {
throw new Error('should not ever get data');
});
stream.pause();
setTimeout(function() {
stream.on('end', function() {
gotEnd = true;
});
stream.resume();
});
process.on('exit', function() {
assert(gotEnd);
assert(calledRead);
console.log('ok');
});

4
test/simple/test-stream2-transform.js

@ -470,6 +470,8 @@ test('object transform (json parse)', function(t) {
});
jp.end();
// read one more time to get the 'end' event
jp.read();
process.nextTick(function() {
t.ok(ended);
@ -510,6 +512,8 @@ test('object transform (json stringify)', function(t) {
});
js.end();
// read one more time to get the 'end' event
js.read();
process.nextTick(function() {
t.ok(ended);

Loading…
Cancel
Save