@ -8,27 +8,29 @@ const Stream = require('stream');
const Buffer = require ( 'buffer' ) . Buffer ;
const util = require ( 'util' ) ;
const debug = util . debuglog ( 'stream' ) ;
const BufferList = require ( 'internal/streams/BufferList' ) ;
var StringDecoder ;
util . inherits ( Readable , Stream ) ;
const hasPrependListener = typeof EE . prototype . prependListener === 'function' ;
function prependListener ( emitter , event , fn ) {
if ( hasPrependListener )
var prependListener ;
if ( typeof EE . prototype . prependListener === 'function' ) {
prependListener = function prependListener ( emitter , event , fn ) {
return emitter . prependListener ( event , fn ) ;
// This is a brutally ugly hack to make sure that our error handler
// is attached before any userland ones. NEVER DO THIS. This is here
// only because this code needs to continue to work with older versions
// of Node.js that do not include the prependListener() method. The goal
// is to eventually remove this hack.
if ( ! emitter . _ events || ! emitter . _ events [ event ] )
emitter . on ( event , fn ) ;
else if ( Array . isArray ( emitter . _ events [ event ] ) )
emitter . _ events [ event ] . unshift ( fn ) ;
else
emitter . _ events [ event ] = [ fn , emitter . _ events [ event ] ] ;
} ;
} else {
prependListener = function prependListener ( emitter , event , fn ) {
// This is a hack to make sure that our error handler is attached before any
// userland ones. NEVER DO THIS. This is here only because this code needs
// to continue to work with older versions of Node.js that do not include
// the prependListener() method. The goal is to eventually remove this hack.
if ( ! emitter . _ events || ! emitter . _ events [ event ] )
emitter . on ( event , fn ) ;
else if ( Array . isArray ( emitter . _ events [ event ] ) )
emitter . _ events [ event ] . unshift ( fn ) ;
else
emitter . _ events [ event ] = [ fn , emitter . _ events [ event ] ] ;
} ;
}
function ReadableState ( options , stream ) {
@ -50,7 +52,10 @@ function ReadableState(options, stream) {
// cast to ints.
this . highWaterMark = ~ ~ this . highWaterMark ;
this . buffer = [ ] ;
// A linked list is used to store data chunks instead of an array because the
// linked list can remove elements from the beginning faster than
// array.shift()
this . buffer = new BufferList ( ) ;
this . length = 0 ;
this . pipes = null ;
this . pipesCount = 0 ;
@ -223,7 +228,8 @@ function computeNewHighWaterMark(n) {
if ( n >= MAX_HWM ) {
n = MAX_HWM ;
} else {
// Get the next highest power of 2
// Get the next highest power of 2 to prevent increasing hwm excessively in
// tiny amounts
n -- ;
n |= n >>> 1 ;
n |= n >>> 2 ;
@ -235,51 +241,41 @@ function computeNewHighWaterMark(n) {
return n ;
}
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function howMuchToRead ( n , state ) {
if ( state . length === 0 && state . ended )
if ( n <= 0 || ( state . length === 0 && state . ended ) )
return 0 ;
if ( state . objectMode )
return n === 0 ? 0 : 1 ;
if ( n === null || isNaN ( n ) ) {
// only flow one buffer at a time
if ( state . flowing && state . buffer . length )
return state . buffer [ 0 ] . length ;
return 1 ;
if ( n !== n ) {
// Only flow one buffer at a time
if ( state . flowing && state . length )
return state . buffer . head . data . length ;
else
return state . length ;
}
if ( n <= 0 )
return 0 ;
// If we're asking for more than the target buffer level,
// then raise the water mark. Bump up to the next highest
// power of 2, to prevent increasing it excessively in tiny
// amounts.
// If we're asking for more than the current hwm, then raise the hwm.
if ( n > state . highWaterMark )
state . highWaterMark = computeNewHighWaterMark ( n ) ;
// don't have that much. return null, unless we've ended.
if ( n > state . length ) {
if ( ! state . ended ) {
state . needReadable = true ;
return 0 ;
} else {
return state . length ;
}
if ( n <= state . length )
return n ;
// Don't have enough
if ( ! state . ended ) {
state . needReadable = true ;
return 0 ;
}
return n ;
return state . length ;
}
// you can override either this method, or the async _read(n) below.
Readable . prototype . read = function ( n ) {
debug ( 'read' , n ) ;
n = parseInt ( n , 10 ) ;
var state = this . _ readableState ;
var nOrig = n ;
if ( typeof n !== 'number' || n > 0 )
if ( n !== 0 )
state . emittedReadable = false ;
// if we're doing read(0) to trigger a readable event, but we
@ -342,9 +338,7 @@ Readable.prototype.read = function(n) {
if ( state . ended || state . reading ) {
doRead = false ;
debug ( 'reading or ended' , doRead ) ;
}
if ( doRead ) {
} else if ( doRead ) {
debug ( 'do read' ) ;
state . reading = true ;
state . sync = true ;
@ -354,13 +348,12 @@ Readable.prototype.read = function(n) {
// call internal read method
this . _ read ( state . highWaterMark ) ;
state . sync = false ;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if ( ! state . reading )
n = howMuchToRead ( nOrig , state ) ;
}
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
if ( doRead && ! state . reading )
n = howMuchToRead ( nOrig , state ) ;
var ret ;
if ( n > 0 )
ret = fromList ( n , state ) ;
@ -370,18 +363,20 @@ Readable.prototype.read = function(n) {
if ( ret === null ) {
state . needReadable = true ;
n = 0 ;
} else {
state . length -= n ;
}
state . length -= n ;
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
if ( state . length === 0 && ! state . ended )
state . needReadable = true ;
if ( state . length === 0 ) {
// If we have nothing in the buffer, then we want to know
// as soon as we *do* get something into the buffer.
if ( ! state . ended )
state . needReadable = true ;
// If we tried to read() past the EOF, then emit end on the next tick.
if ( nOrig !== n && state . ended && state . length === 0 )
endReadable ( this ) ;
// If we tried to read() past the EOF, then emit end on the next tick.
if ( nOrig !== n && state . ended )
endReadable ( this ) ;
}
if ( ret !== null )
this . emit ( 'data' , ret ) ;
@ -683,20 +678,17 @@ Readable.prototype.unpipe = function(dest) {
// set up data events if they are asked for
// Ensure readable listeners eventually get something
Readable . prototype . on = function ( ev , fn ) {
var res = Stream . prototype . on . call ( this , ev , fn ) ;
// If listening to data, and it has not explicitly been paused,
// then call resume to start the flow of data on the next tick.
if ( ev === 'data' && false !== this . _ readableState . flowing ) {
this . resume ( ) ;
}
if ( ev === 'readable' && ! this . _ readableState . endEmitted ) {
var state = this . _ readableState ;
if ( ! state . readableListening ) {
state . readableListening = true ;
const res = Stream . prototype . on . call ( this , ev , fn ) ;
if ( ev === 'data' ) {
// Start flowing on next tick if stream isn't explicitly paused
if ( this . _ readableState . flowing !== false )
this . resume ( ) ;
} else if ( ev === 'readable' ) {
const state = this . _ readableState ;
if ( ! state . endEmitted && ! state . readableListening ) {
state . readableListening = state . needReadable = true ;
state . emittedReadable = false ;
state . needReadable = true ;
if ( ! state . reading ) {
process . nextTick ( nReadingNextTick , this ) ;
} else if ( state . length ) {
@ -758,13 +750,9 @@ Readable.prototype.pause = function() {
} ;
function flow ( stream ) {
var state = stream . _ readableState ;
const state = stream . _ readableState ;
debug ( 'flow' , state . flowing ) ;
if ( state . flowing ) {
do {
var chunk = stream . read ( ) ;
} while ( null !== chunk && state . flowing ) ;
}
while ( state . flowing && stream . read ( ) !== null ) ;
}
// wrap an old-style stream as the async data source.
@ -839,69 +827,120 @@ Readable._fromList = fromList;
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromList ( n , state ) {
var list = state . buffer ;
var length = state . length ;
var stringMode = ! ! state . decoder ;
var objectMode = ! ! state . objectMode ;
// nothing buffered
if ( state . length === 0 )
return null ;
var ret ;
if ( state . objectMode )
ret = state . buffer . shift ( ) ;
else if ( ! n || n >= state . length ) {
// read it all, truncate the list
if ( state . decoder )
ret = state . buffer . join ( '' ) ;
else if ( state . buffer . length === 1 )
ret = state . buffer . head . data ;
else
ret = state . buffer . concat ( state . length ) ;
state . buffer . clear ( ) ;
} else {
// read part of list
ret = fromListPartial ( n , state . buffer , state . decoder ) ;
}
// nothing in the list, definitely empty.
if ( list . length === 0 )
return null ;
return ret ;
}
if ( length === 0 )
ret = null ;
else if ( objectMode )
// Extracts only enough buffered data to satisfy the amount requested.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromListPartial ( n , list , hasStrings ) {
var ret ;
if ( n < list . head . data . length ) {
// slice is the same for buffers and strings
ret = list . head . data . slice ( 0 , n ) ;
list . head . data = list . head . data . slice ( n ) ;
} else if ( n === list . head . data . length ) {
// first chunk is a perfect match
ret = list . shift ( ) ;
else if ( ! n || n >= length ) {
// read it all, truncate the array.
if ( stringMode )
ret = list . join ( '' ) ;
else if ( list . length === 1 )
ret = list [ 0 ] ;
else
ret = Buffer . concat ( list , length ) ;
list . length = 0 ;
} else {
// read just some of it.
if ( n < list [ 0 ] . length ) {
// just take a part of the first list item.
// slice is the same for buffers and strings.
const buf = list [ 0 ] ;
ret = buf . slice ( 0 , n ) ;
list [ 0 ] = buf . slice ( n ) ;
} else if ( n === list [ 0 ] . length ) {
// first list is a perfect match
ret = list . shift ( ) ;
} else {
// complex case.
// we have enough to cover it, but it spans past the first buffer.
if ( stringMode )
ret = '' ;
else
ret = Buffer . allocUnsafe ( n ) ;
var c = 0 ;
for ( var i = 0 , l = list . length ; i < l && c < n ; i ++ ) {
const buf = list [ 0 ] ;
var cpy = Math . min ( n - c , buf . length ) ;
if ( stringMode )
ret += buf . slice ( 0 , cpy ) ;
else
buf . copy ( ret , c , 0 , cpy ) ;
// result spans more than one buffer
ret = ( hasStrings
? copyFromBufferString ( n , list )
: copyFromBuffer ( n , list ) ) ;
}
return ret ;
}
if ( cpy < buf . length )
list [ 0 ] = buf . slice ( cpy ) ;
// Copies a specified amount of characters from the list of buffered data
// chunks.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function copyFromBufferString ( n , list ) {
var p = list . head ;
var c = 1 ;
var ret = p . data ;
n -= ret . length ;
while ( p = p . next ) {
const str = p . data ;
const nb = ( n > str . length ? str . length : n ) ;
if ( nb === str . length )
ret += str ;
else
ret += str . slice ( 0 , n ) ;
n -= nb ;
if ( n === 0 ) {
if ( nb === str . length ) {
++ c ;
if ( p . next )
list . head = p . next ;
else
list . shift ( ) ;
c += cpy ;
list . head = list . tail = null ;
} else {
list . head = p ;
p . data = str . slice ( nb ) ;
}
break ;
}
++ c ;
}
list . length -= c ;
return ret ;
}
// Copies a specified amount of bytes from the list of buffered data chunks.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function copyFromBuffer ( n , list ) {
const ret = Buffer . allocUnsafe ( n ) ;
var p = list . head ;
var c = 1 ;
p . data . copy ( ret ) ;
n -= p . data . length ;
while ( p = p . next ) {
const buf = p . data ;
const nb = ( n > buf . length ? buf . length : n ) ;
buf . copy ( ret , ret . length - n , 0 , nb ) ;
n -= nb ;
if ( n === 0 ) {
if ( nb === buf . length ) {
++ c ;
if ( p . next )
list . head = p . next ;
else
list . head = list . tail = null ;
} else {
list . head = p ;
p . data = buf . slice ( nb ) ;
}
break ;
}
++ c ;
}
list . length -= c ;
return ret ;
}