Browse Source

streams2: Writable organization, add 'finishing' flag

v0.9.4-release
isaacs 12 years ago
parent
commit
0118584433
  1. 199
      lib/_stream_writable.js

199
lib/_stream_writable.js

@ -27,20 +27,41 @@ module.exports = Writable;
Writable.WritableState = WritableState;
var util = require('util');
var assert = require('assert');
var Stream = require('stream');
util.inherits(Writable, Stream);
function WritableState(options) {
function WritableState(options, stream) {
options = options || {};
this.highWaterMark = options.highWaterMark || 16 * 1024;
// the point at which write() starts returning false
this.highWaterMark = options.hasOwnProperty('highWaterMark') ?
options.highWaterMark : 16 * 1024;
// the point that it has to get to before we call _write(chunk,cb)
// default to pushing everything out as fast as possible.
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ?
options.lowWaterMark : 1024;
options.lowWaterMark : 0;
// cast to ints.
assert(typeof this.lowWaterMark === 'number');
assert(typeof this.highWaterMark === 'number');
this.lowWaterMark = ~~this.lowWaterMark;
this.highWaterMark = ~~this.highWaterMark;
assert(this.lowWaterMark >= 0);
assert(this.highWaterMark >= this.lowWaterMark,
this.highWaterMark + '>=' + this.lowWaterMark);
this.needDrain = false;
this.ended = false;
// at the start of calling end()
this.ending = false;
// when end() has been called, and returned
this.ended = false;
// when 'finish' has emitted
this.finished = false;
// when 'finish' is being emitted
this.finishing = false;
// should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
@ -53,7 +74,22 @@ function WritableState(options) {
// socket or file.
this.length = 0;
// a flag to see when we're in the middle of a write.
this.writing = false;
// a flag to be able to tell if the onwrite cb is called immediately,
// or on a later tick.
this.sync = false;
// the callback that's passed to _write(chunk,cb)
this.onwrite = onwrite.bind(stream);
// the callback that the user supplies to write(chunk,encoding,cb)
this.writecb = null;
// the amount that is being written when _write is called.
this.writelen = 0;
this.buffer = [];
}
@ -63,7 +99,7 @@ function Writable(options) {
if (!(this instanceof Writable) && !(this instanceof Stream.Duplex))
return new Writable(options);
this._writableState = new WritableState(options);
this._writableState = new WritableState(options, this);
// legacy.
this.writable = true;
@ -71,23 +107,26 @@ function Writable(options) {
Stream.call(this);
}
// Override this method for sync streams
// override the _write(chunk, cb) method for async streams
// Override this method or _write(chunk, cb)
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
if (state.ended) {
this.emit('error', new Error('write after end'));
return;
}
if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
if (state.ended) {
var er = new Error('write after end');
if (typeof cb === 'function')
cb(er);
this.emit('error', er);
return;
}
var l = chunk.length;
if (false === state.decodeStrings)
chunk = [chunk, encoding];
chunk = [chunk, encoding || 'utf8'];
else if (typeof chunk === 'string' || encoding) {
chunk = new Buffer(chunk + '', encoding);
l = chunk.length;
@ -107,70 +146,84 @@ Writable.prototype.write = function(chunk, encoding, cb) {
}
state.writing = true;
var sync = true;
this._write(chunk, writecb.bind(this));
sync = false;
state.sync = true;
state.writelen = l;
state.writecb = cb;
this._write(chunk, state.onwrite);
state.sync = false;
return ret;
};
function writecb(er) {
state.writing = false;
if (er) {
if (cb) {
if (sync)
process.nextTick(cb.bind(null, er));
else
cb(er);
} else
this.emit('error', er);
return;
}
state.length -= l;
function onwrite(er) {
var state = this._writableState;
var sync = state.sync;
var cb = state.writecb;
var l = state.writelen;
state.writing = false;
state.writelen = null;
state.writecb = null;
if (er) {
if (cb) {
// don't call the cb until the next tick if we're in sync mode.
// also, defer if we're about to write some more right now.
if (sync || state.buffer.length)
process.nextTick(cb);
else
cb();
}
if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end.
this.emit('finish');
return;
}
// if there's something in the buffer waiting, then do that, too.
if (state.buffer.length) {
var chunkCb = state.buffer.shift();
chunk = chunkCb[0];
cb = chunkCb[1];
if (false === state.decodeStrings)
l = chunk[0].length;
if (sync)
process.nextTick(cb.bind(null, er));
else
l = chunk.length;
state.writing = true;
this._write(chunk, writecb.bind(this));
}
if (state.length <= state.lowWaterMark && state.needDrain) {
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
process.nextTick(function() {
if (!state.needDrain)
return;
state.needDrain = false;
this.emit('drain');
}.bind(this));
}
cb(er);
} else
this.emit('error', er);
return;
}
state.length -= l;
if (cb) {
// don't call the cb until the next tick if we're in sync mode.
// also, defer if we're about to write some more right now.
if (sync || state.buffer.length)
process.nextTick(cb);
else
cb();
}
};
if (state.length === 0 && (state.ended || state.ending)) {
// emit 'finish' at the very end.
state.finishing = true;
this.emit('finish');
state.finished = true;
return;
}
// if there's something in the buffer waiting, then do that, too.
if (state.buffer.length) {
var chunkCb = state.buffer.shift();
var chunk = chunkCb[0];
cb = chunkCb[1];
if (false === state.decodeStrings)
l = chunk[0].length;
else
l = chunk.length;
state.writelen = l;
state.writecb = cb;
state.writechunk = chunk;
state.writing = true;
this._write(chunk, state.onwrite);
}
if (state.length <= state.lowWaterMark && state.needDrain) {
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
process.nextTick(function() {
if (!state.needDrain)
return;
state.needDrain = false;
this.emit('drain');
}.bind(this));
}
}
Writable.prototype._write = function(chunk, cb) {
process.nextTick(cb.bind(this, new Error('not implemented')));
@ -178,10 +231,18 @@ Writable.prototype._write = function(chunk, cb) {
Writable.prototype.end = function(chunk, encoding) {
var state = this._writableState;
// ignore unnecessary end() calls.
if (state.ending || state.ended || state.finished)
return;
state.ending = true;
if (chunk)
this.write(chunk, encoding);
else if (state.length === 0)
else if (state.length === 0) {
state.finishing = true;
this.emit('finish');
state.finished = true;
}
state.ended = true;
};

Loading…
Cancel
Save