Browse Source

Implement new stream method, destroySoon

Still missing on fs.WriteStream
v0.7.4-release
Ryan Dahl 14 years ago
parent
commit
2957382991
  1. 5
      doc/api/streams.markdown
  2. 3
      lib/fs.js
  3. 15
      lib/http.js
  4. 8
      lib/net.js
  5. 14
      lib/tls.js

5
doc/api/streams.markdown

@ -65,6 +65,11 @@ Resumes the incoming `'data'` events after a `pause()`.
Closes the underlying file descriptor. Stream will not emit any more events. Closes the underlying file descriptor. Stream will not emit any more events.
### stream.destroySoon()
After the write queue is drained, close the file descriptor.
### stream.pipe(destination, [options]) ### stream.pipe(destination, [options])
This is a `Stream.prototype` method available on all `Stream`s. This is a `Stream.prototype` method available on all `Stream`s.

3
lib/fs.js

@ -981,3 +981,6 @@ WriteStream.prototype.destroy = function(cb) {
} }
}; };
// TODO: WriteStream.prototype.destroySoon()

15
lib/http.js

@ -733,13 +733,11 @@ function httpSocketSetup(socket) {
// An array of outgoing messages for the socket. In pipelined connections // An array of outgoing messages for the socket. In pipelined connections
// we need to keep track of the order they were sent. // we need to keep track of the order they were sent.
socket._outgoing = []; socket._outgoing = [];
socket.__destroyOnDrain = false;
// NOTE: be sure not to use ondrain elsewhere in this file! // NOTE: be sure not to use ondrain elsewhere in this file!
socket.ondrain = function() { socket.ondrain = function() {
var message = socket._outgoing[0]; var message = socket._outgoing[0];
if (message) message.emit('drain'); if (message) message.emit('drain');
if (socket.__destroyOnDrain) socket.destroy();
}; };
} }
@ -833,18 +831,7 @@ function connectionListener(socket) {
if (message._last) { if (message._last) {
// No more messages to be pushed out. // No more messages to be pushed out.
if (!socket._writeQueue) { socket.destroySoon();
// Putting this here for https. Really need to add below hack to
// both socket and https interfaces.
socket.end();
} else {
// HACK: need way to do this with socket interface
if (socket._writeQueue.length) {
socket.__destroyOnDrain = true; //socket.end();
} else {
socket.destroy();
}
}
} else if (socket._outgoing.length) { } else if (socket._outgoing.length) {
// Push out the next message. // Push out the next message.

8
lib/net.js

@ -554,6 +554,7 @@ Stream.prototype._onWritable = function() {
if (this.flush()) { if (this.flush()) {
if (this._events && this._events['drain']) this.emit('drain'); if (this._events && this._events['drain']) this.emit('drain');
if (this.ondrain) this.ondrain(); // Optimization if (this.ondrain) this.ondrain(); // Optimization
if (this.__destroyOnDrain) this.destroy();
} }
}; };
@ -705,6 +706,13 @@ Stream.prototype.resume = function() {
this._readWatcher.start(); this._readWatcher.start();
}; };
Stream.prototype.destroySoon = function() {
if (this.flush()) {
this.destroy();
} else {
this.__destroyOnDrain = true;
}
};
Stream.prototype.destroy = function(exception) { Stream.prototype.destroy = function(exception) {
// pool is shared between sockets, so don't need to free it here. // pool is shared between sockets, so don't need to free it here.

14
lib/tls.js

@ -139,6 +139,19 @@ CryptoStream.prototype.end = function(d) {
}; };
CryptoStream.prototype.destroySoon = function(err) {
if (this.pair._done) return;
this.pair._cycle();
if (this._pending.length) {
this.__destroyOnDrain = true;
} else {
this.end();
}
};
CryptoStream.prototype.destroy = function(err) { CryptoStream.prototype.destroy = function(err) {
if (this.pair._done) return; if (this.pair._done) return;
this.pair._destroy(); this.pair._destroy();
@ -249,6 +262,7 @@ CryptoStream.prototype._suck = function() {
if (havePending && this._pending && this._pending.length === 0) { if (havePending && this._pending && this._pending.length === 0) {
debug('drain'); debug('drain');
this.emit('drain'); this.emit('drain');
if (this.__destroyOnDrain) this.end();
} }
}; };

Loading…
Cancel
Save