@ -53,10 +53,8 @@ function WritableState(options, stream) {
this . ending = false ;
// when end() has been called, and returned
this . ended = false ;
// when 'finish' ha s emitted
// when 'finish' i s emitted
this . finished = false ;
// when 'finish' is being emitted
this . finishing = false ;
// should we decode strings into buffers before passing to _write?
// this is here so that some node-core streams can optimize string
@ -116,183 +114,196 @@ Writable.prototype.pipe = function() {
this . emit ( 'error' , new Error ( 'Cannot pipe. Not readable.' ) ) ;
} ;
// Override this method or _write(chunk, cb)
Writable . prototype . write = function ( chunk , encoding , cb ) {
var state = this . _ writableState ;
if ( typeof encoding === 'function' ) {
cb = encoding ;
encoding = null ;
}
if ( state . ended ) {
var self = this ;
var er = new Error ( 'write after end' ) ;
// TODO: defer error events consistently everywhere, not just the cb
self . emit ( 'error' , er ) ;
if ( typeof cb === 'function' ) {
process . nextTick ( function ( ) {
cb ( er ) ;
} ) ;
}
return ;
}
function writeAfterEnd ( stream , state , cb ) {
var er = new Error ( 'write after end' ) ;
// TODO: defer error events consistently everywhere, not just the cb
stream . emit ( 'error' , er ) ;
process . nextTick ( function ( ) {
cb ( er ) ;
} ) ;
}
// If we get something that is not a buffer, string, null, or undefined,
// and we're not in objectMode, then that's an error.
// Otherwise stream chunks are all considered to be of length=1, and the
// watermarks determine how many objects to keep in the buffer, rather than
// how many bytes or characters.
// If we get something that is not a buffer, string, null, or undefined,
// and we're not in objectMode, then that's an error.
// Otherwise stream chunks are all considered to be of length=1, and the
// watermarks determine how many objects to keep in the buffer, rather than
// how many bytes or characters.
function validChunk ( stream , state , chunk , cb ) {
var valid = true ;
if ( ! Buffer . isBuffer ( chunk ) &&
'string' !== typeof chunk &&
chunk !== null &&
chunk !== undefined &&
! state . objectMode ) {
var er = new TypeError ( 'Invalid non-string/buffer chunk' ) ;
if ( typeof cb === 'function' )
stream . emit ( 'error' , er ) ;
process . nextTick ( function ( ) {
cb ( er ) ;
this . emit ( 'error' , er ) ;
return ;
} ) ;
valid = false ;
}
return valid ;
}
var len ;
if ( state . objectMode )
len = 1 ;
else {
len = chunk . length ;
if ( false === state . decodeStrings )
chunk = [ chunk , encoding || 'utf8' ] ;
else if ( typeof chunk === 'string' ) {
chunk = new Buffer ( chunk , encoding ) ;
len = chunk . length ;
}
function decodeChunk ( state , chunk , encoding ) {
if ( ! state . objectMode &&
state . decodeStrings !== false &&
typeof chunk === 'string' ) {
chunk = new Buffer ( chunk , encoding ) ;
}
return chunk ;
}
Writable . prototype . write = function ( chunk , encoding , cb ) {
var state = this . _ writableState ;
var ret = false ;
if ( typeof encoding === 'function' ) {
cb = encoding ;
encoding = null ;
}
if ( ! encoding )
encoding = 'utf8' ;
if ( typeof cb !== 'function' )
cb = function ( ) { } ;
if ( state . ended )
writeAfterEnd ( this , state , cb ) ;
else if ( validChunk ( this , state , chunk , cb ) )
ret = writeOrBuffer ( this , state , chunk , encoding , cb ) ;
return ret ;
} ;
// if we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer ( stream , state , chunk , encoding , cb ) {
chunk = decodeChunk ( state , chunk , encoding ) ;
var len = state . objectMode ? 1 : chunk . length ;
// XXX Remove. _write() should take an encoding.
if ( state . decodeStrings === false )
chunk = [ chunk , encoding ] ;
state . length += len ;
var ret = state . length < state . highWaterMark ;
if ( ret === false )
state . needDrain = true ;
// if we're already writing something, then just put this
// in the queue, and wait our turn.
if ( state . writing ) {
state . buffer . push ( [ chunk , cb ] ) ;
return ret ;
}
state . needDrain = ! ret ;
state . writing = true ;
state . sync = true ;
if ( state . writing )
state . buffer . push ( [ chunk , cb ] ) ; // XXX [chunk,encoding,cb]
else
doWrite ( stream , state , len , chunk , encoding , cb ) ;
return ret ;
}
function doWrite ( stream , state , len , chunk , encoding , cb ) {
state . writelen = len ;
state . writecb = cb ;
this . _ write ( chunk , state . onwrite ) ;
state . writing = true ;
state . sync = true ;
// XXX stream._write(chunk, encoding, state.onwrite)
stream . _ write ( chunk , state . onwrite ) ;
state . sync = false ;
}
return ret ;
} ;
function onwriteError ( stream , state , sync , er , cb ) {
if ( sync )
process . nextTick ( function ( ) {
cb ( er ) ;
} ) ;
else
cb ( er ) ;
stream . emit ( 'error' , er ) ;
}
function onwriteStateUpdate ( state ) {
state . writing = false ;
state . writecb = null ;
state . length -= state . writelen ;
state . writelen = 0 ;
}
function onwrite ( stream , er ) {
var state = stream . _ writableState ;
var sync = state . sync ;
var cb = state . writecb ;
var len = state . writelen ;
state . writing = false ;
state . writelen = null ;
state . writecb = null ;
onwriteStateUpdate ( state ) ;
if ( er ) {
if ( cb ) {
// If _write(chunk,cb) calls cb() in this tick, we still defer
// the *user's* write callback to the next tick.
// Never present an external API that is *sometimes* async!
if ( sync )
process . nextTick ( function ( ) {
cb ( er ) ;
} ) ;
else
cb ( er ) ;
}
if ( er )
onwriteError ( stream , state , sync , er , cb ) ;
else {
if ( ! finishMaybe ( stream , state ) ) {
if ( state . length === 0 && state . needDrain )
onwriteDrain ( stream , state ) ;
// backwards compatibility. still emit if there was a cb.
stream . emit ( 'error' , er ) ;
return ;
}
state . length -= len ;
if ( ! state . bufferProcessing && state . buffer . length )
clearBuffer ( stream , state ) ;
}
if ( cb ) {
// Don't call the cb until the next tick if we're in sync mode.
if ( sync )
process . nextTick ( cb ) ;
else
cb ( ) ;
}
}
if ( state . length === 0 && ( state . ended || state . ending ) &&
! state . finished && ! state . finishing ) {
// emit 'finish' at the very end.
state . finishing = true ;
stream . emit ( 'finish' ) ;
state . finished = true ;
return ;
}
if ( state . length === 0 && state . needDrain ) {
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
process . nextTick ( function ( ) {
if ( ! state . needDrain )
return ;
// Must force callback to be called on nextTick, so that we don't
// emit 'drain' before the write() consumer gets the 'false' return
// value, and has a chance to attach a 'drain' listener.
function onwriteDrain ( stream , state ) {
process . nextTick ( function ( ) {
if ( state . needDrain ) {
state . needDrain = false ;
stream . emit ( 'drain' ) ;
} ) ;
}
// if there's something in the buffer waiting, then process it
// It would be nice if there were TCO in JS, and we could just
// shift the top off the buffer and _write that, but that approach
// causes RangeErrors when you have a very large number of very
// small writes, and is not very efficient otherwise.
if ( ! state . bufferProcessing && state . buffer . length ) {
state . bufferProcessing = true ;
for ( var c = 0 ; c < state . buffer . length ; c ++ ) {
var chunkCb = state . buffer [ c ] ;
var chunk = chunkCb [ 0 ] ;
cb = chunkCb [ 1 ] ;
if ( state . objectMode )
len = 1 ;
else if ( false === state . decodeStrings )
len = chunk [ 0 ] . length ;
else
len = chunk . length ;
state . writelen = len ;
state . writecb = cb ;
state . writechunk = chunk ;
state . writing = true ;
state . sync = true ;
stream . _ write ( chunk , state . onwrite ) ;
state . sync = false ;
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if ( state . writing ) {
c ++ ;
break ;
}
}
} ) ;
}
state . bufferProcessing = false ;
if ( c < state . buffer . length )
state . buffer = state . buffer . slice ( c ) ;
else
state . buffer . length = 0 ;
// if there's something in the buffer waiting, then process it
function clearBuffer ( stream , state ) {
state . bufferProcessing = true ;
// XXX buffer entry should be [chunk, encoding, cb]
for ( var c = 0 ; c < state . buffer . length ; c ++ ) {
var chunkCb = state . buffer [ c ] ;
var chunk = chunkCb [ 0 ] ;
var cb = chunkCb [ 1 ] ;
var encoding = '' ;
var len ;
if ( state . objectMode )
len = 1 ;
else if ( false === state . decodeStrings ) {
len = chunk [ 0 ] . length ;
encoding = chunk [ 1 ] ;
} else
len = chunk . length ;
doWrite ( stream , state , len , chunk , encoding , cb ) ;
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if ( state . writing ) {
c ++ ;
break ;
}
}
state . bufferProcessing = false ;
if ( c < state . buffer . length )
state . buffer = state . buffer . slice ( c ) ;
else
state . buffer . length = 0 ;
}
Writable . prototype . _ write = function ( chunk , cb ) {
@ -317,19 +328,23 @@ Writable.prototype.end = function(chunk, encoding, cb) {
this . write ( chunk , encoding ) ;
// ignore unnecessary end() calls.
if ( ! state . ending && ! state . ended && ! state . finished )
if ( ! state . ending && ! state . finished )
endWritable ( this , state , cb ) ;
} ;
function endWritable ( stream , state , cb ) {
state . ending = true ;
if ( state . length === 0 && ! state . finishing ) {
state . finishing = true ;
stream . emit ( 'finish' ) ;
function finishMaybe ( stream , state ) {
if ( state . ending && state . length === 0 && ! state . finished ) {
state . finished = true ;
stream . emit ( 'finish' ) ;
}
return state . finished ;
}
function endWritable ( stream , state , cb ) {
state . ending = true ;
finishMaybe ( stream , state ) ;
if ( cb ) {
if ( state . finished || state . finishing )
if ( state . finished )
process . nextTick ( cb ) ;
else
stream . once ( 'finish' , cb ) ;