mirror of https://github.com/lukechilds/node.git
89 changed files with 4403 additions and 1013 deletions
@ -0,0 +1,63 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
// a duplex stream is just a stream that is both readable and writable.
|
||||
|
// Since JS doesn't have multiple prototypal inheritance, this class
|
||||
|
// prototypally inherits from Readable, and then parasitically from
|
||||
|
// Writable.
|
||||
|
|
||||
|
module.exports = Duplex; |
||||
|
var util = require('util'); |
||||
|
var Readable = require('_stream_readable'); |
||||
|
var Writable = require('_stream_writable'); |
||||
|
|
||||
|
util.inherits(Duplex, Readable); |
||||
|
|
||||
|
Object.keys(Writable.prototype).forEach(function(method) { |
||||
|
if (!Duplex.prototype[method]) |
||||
|
Duplex.prototype[method] = Writable.prototype[method]; |
||||
|
}); |
||||
|
|
||||
|
function Duplex(options) { |
||||
|
if (!(this instanceof Duplex)) |
||||
|
return new Duplex(options); |
||||
|
|
||||
|
Readable.call(this, options); |
||||
|
Writable.call(this, options); |
||||
|
|
||||
|
this.allowHalfOpen = true; |
||||
|
if (options && options.allowHalfOpen === false) |
||||
|
this.allowHalfOpen = false; |
||||
|
|
||||
|
this.once('end', onend); |
||||
|
} |
||||
|
|
||||
|
// the no-half-open enforcer
|
||||
|
function onend() { |
||||
|
// if we allow half-open state, or if the writable side ended,
|
||||
|
// then we're ok.
|
||||
|
if (this.allowHalfOpen || this._writableState.ended) |
||||
|
return; |
||||
|
|
||||
|
// no more data can be written.
|
||||
|
// But allow more writes to happen in this tick.
|
||||
|
process.nextTick(this.end.bind(this)); |
||||
|
} |
@ -0,0 +1,752 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
module.exports = Readable; |
||||
|
Readable.ReadableState = ReadableState; |
||||
|
|
||||
|
var Stream = require('stream'); |
||||
|
var util = require('util'); |
||||
|
var assert = require('assert'); |
||||
|
var StringDecoder; |
||||
|
|
||||
|
util.inherits(Readable, Stream); |
||||
|
|
||||
|
function ReadableState(options, stream) { |
||||
|
options = options || {}; |
||||
|
|
||||
|
// the argument passed to this._read(n,cb)
|
||||
|
this.bufferSize = options.hasOwnProperty('bufferSize') ? |
||||
|
options.bufferSize : 16 * 1024; |
||||
|
|
||||
|
// the point at which it stops calling _read() to fill the buffer
|
||||
|
this.highWaterMark = options.hasOwnProperty('highWaterMark') ? |
||||
|
options.highWaterMark : 16 * 1024; |
||||
|
|
||||
|
// the minimum number of bytes to buffer before emitting 'readable'
|
||||
|
// default to pushing everything out as fast as possible.
|
||||
|
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ? |
||||
|
options.lowWaterMark : 0; |
||||
|
|
||||
|
// cast to ints.
|
||||
|
assert(typeof this.bufferSize === 'number'); |
||||
|
assert(typeof this.lowWaterMark === 'number'); |
||||
|
assert(typeof this.highWaterMark === 'number'); |
||||
|
this.bufferSize = ~~this.bufferSize; |
||||
|
this.lowWaterMark = ~~this.lowWaterMark; |
||||
|
this.highWaterMark = ~~this.highWaterMark; |
||||
|
assert(this.bufferSize >= 0); |
||||
|
assert(this.lowWaterMark >= 0); |
||||
|
assert(this.highWaterMark >= this.lowWaterMark, |
||||
|
this.highWaterMark + '>=' + this.lowWaterMark); |
||||
|
|
||||
|
this.buffer = []; |
||||
|
this.length = 0; |
||||
|
this.pipes = null; |
||||
|
this.pipesCount = 0; |
||||
|
this.flowing = false; |
||||
|
this.ended = false; |
||||
|
this.endEmitted = false; |
||||
|
this.reading = false; |
||||
|
this.sync = false; |
||||
|
this.onread = function(er, data) { |
||||
|
onread(stream, er, data); |
||||
|
}; |
||||
|
|
||||
|
// whenever we return null, then we set a flag to say
|
||||
|
// that we're awaiting a 'readable' event emission.
|
||||
|
this.needReadable = false; |
||||
|
this.emittedReadable = false; |
||||
|
|
||||
|
// when piping, we only care about 'readable' events that happen
|
||||
|
// after read()ing all the bytes and not getting any pushback.
|
||||
|
this.ranOut = false; |
||||
|
|
||||
|
// the number of writers that are awaiting a drain event in .pipe()s
|
||||
|
this.awaitDrain = 0; |
||||
|
this.pipeChunkSize = null; |
||||
|
|
||||
|
this.decoder = null; |
||||
|
if (options.encoding) { |
||||
|
if (!StringDecoder) |
||||
|
StringDecoder = require('string_decoder').StringDecoder; |
||||
|
this.decoder = new StringDecoder(options.encoding); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
function Readable(options) { |
||||
|
if (!(this instanceof Readable)) |
||||
|
return new Readable(options); |
||||
|
|
||||
|
this._readableState = new ReadableState(options, this); |
||||
|
|
||||
|
// legacy
|
||||
|
this.readable = true; |
||||
|
|
||||
|
Stream.apply(this); |
||||
|
} |
||||
|
|
||||
|
// backwards compatibility.
|
||||
|
Readable.prototype.setEncoding = function(enc) { |
||||
|
if (!StringDecoder) |
||||
|
StringDecoder = require('string_decoder').StringDecoder; |
||||
|
this._readableState.decoder = new StringDecoder(enc); |
||||
|
}; |
||||
|
|
||||
|
|
||||
|
function howMuchToRead(n, state) { |
||||
|
if (state.length === 0 && state.ended) |
||||
|
return 0; |
||||
|
|
||||
|
if (isNaN(n) || n === null) |
||||
|
return state.length; |
||||
|
|
||||
|
if (n <= 0) |
||||
|
return 0; |
||||
|
|
||||
|
// don't have that much. return null, unless we've ended.
|
||||
|
if (n > state.length) { |
||||
|
if (!state.ended) { |
||||
|
state.needReadable = true; |
||||
|
return 0; |
||||
|
} else |
||||
|
return state.length; |
||||
|
} |
||||
|
|
||||
|
return n; |
||||
|
} |
||||
|
|
||||
|
// you can override either this method, or _read(n, cb) below.
|
||||
|
Readable.prototype.read = function(n) { |
||||
|
var state = this._readableState; |
||||
|
var nOrig = n; |
||||
|
|
||||
|
if (typeof n !== 'number' || n > 0) |
||||
|
state.emittedReadable = false; |
||||
|
|
||||
|
n = howMuchToRead(n, state); |
||||
|
|
||||
|
// if we've ended, and we're now clear, then finish it up.
|
||||
|
if (n === 0 && state.ended) { |
||||
|
endReadable(this); |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
// All the actual chunk generation logic needs to be
|
||||
|
// *below* the call to _read. The reason is that in certain
|
||||
|
// synthetic stream cases, such as passthrough streams, _read
|
||||
|
// may be a completely synchronous operation which may change
|
||||
|
// the state of the read buffer, providing enough data when
|
||||
|
// before there was *not* enough.
|
||||
|
//
|
||||
|
// So, the steps are:
|
||||
|
// 1. Figure out what the state of things will be after we do
|
||||
|
// a read from the buffer.
|
||||
|
//
|
||||
|
// 2. If that resulting state will trigger a _read, then call _read.
|
||||
|
// Note that this may be asynchronous, or synchronous. Yes, it is
|
||||
|
// deeply ugly to write APIs this way, but that still doesn't mean
|
||||
|
// that the Readable class should behave improperly, as streams are
|
||||
|
// designed to be sync/async agnostic.
|
||||
|
// Take note if the _read call is sync or async (ie, if the read call
|
||||
|
// has returned yet), so that we know whether or not it's safe to emit
|
||||
|
// 'readable' etc.
|
||||
|
//
|
||||
|
// 3. Actually pull the requested chunks out of the buffer and return.
|
||||
|
|
||||
|
// if we need a readable event, then we need to do some reading.
|
||||
|
var doRead = state.needReadable; |
||||
|
|
||||
|
// if we currently have less than the highWaterMark, then also read some
|
||||
|
if (state.length - n <= state.highWaterMark) |
||||
|
doRead = true; |
||||
|
|
||||
|
// however, if we've ended, then there's no point, and if we're already
|
||||
|
// reading, then it's unnecessary.
|
||||
|
if (state.ended || state.reading) |
||||
|
doRead = false; |
||||
|
|
||||
|
if (doRead) { |
||||
|
state.reading = true; |
||||
|
state.sync = true; |
||||
|
// if the length is currently zero, then we *need* a readable event.
|
||||
|
if (state.length === 0) |
||||
|
state.needReadable = true; |
||||
|
// call internal read method
|
||||
|
this._read(state.bufferSize, state.onread); |
||||
|
state.sync = false; |
||||
|
} |
||||
|
|
||||
|
// If _read called its callback synchronously, then `reading`
|
||||
|
// will be false, and we need to re-evaluate how much data we
|
||||
|
// can return to the user.
|
||||
|
if (doRead && !state.reading) |
||||
|
n = howMuchToRead(nOrig, state); |
||||
|
|
||||
|
var ret; |
||||
|
if (n > 0) |
||||
|
ret = fromList(n, state.buffer, state.length, !!state.decoder); |
||||
|
else |
||||
|
ret = null; |
||||
|
|
||||
|
if (ret === null || ret.length === 0) { |
||||
|
state.needReadable = true; |
||||
|
n = 0; |
||||
|
} |
||||
|
|
||||
|
state.length -= n; |
||||
|
|
||||
|
// If we have nothing in the buffer, then we want to know
|
||||
|
// as soon as we *do* get something into the buffer.
|
||||
|
if (state.length === 0 && !state.ended) |
||||
|
state.needReadable = true; |
||||
|
|
||||
|
return ret; |
||||
|
}; |
||||
|
|
||||
|
function onread(stream, er, chunk) { |
||||
|
var state = stream._readableState; |
||||
|
var sync = state.sync; |
||||
|
|
||||
|
state.reading = false; |
||||
|
if (er) |
||||
|
return stream.emit('error', er); |
||||
|
|
||||
|
if (!chunk || !chunk.length) { |
||||
|
// eof
|
||||
|
state.ended = true; |
||||
|
if (state.decoder) { |
||||
|
chunk = state.decoder.end(); |
||||
|
if (chunk && chunk.length) { |
||||
|
state.buffer.push(chunk); |
||||
|
state.length += chunk.length; |
||||
|
} |
||||
|
} |
||||
|
// if we've ended and we have some data left, then emit
|
||||
|
// 'readable' now to make sure it gets picked up.
|
||||
|
if (!sync) { |
||||
|
if (state.length > 0) { |
||||
|
state.needReadable = false; |
||||
|
if (!state.emittedReadable) { |
||||
|
state.emittedReadable = true; |
||||
|
stream.emit('readable'); |
||||
|
} |
||||
|
} else |
||||
|
endReadable(stream); |
||||
|
} |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
if (state.decoder) |
||||
|
chunk = state.decoder.write(chunk); |
||||
|
|
||||
|
// update the buffer info.
|
||||
|
if (chunk) { |
||||
|
state.length += chunk.length; |
||||
|
state.buffer.push(chunk); |
||||
|
} |
||||
|
|
||||
|
// if we haven't gotten enough to pass the lowWaterMark,
|
||||
|
// and we haven't ended, then don't bother telling the user
|
||||
|
// that it's time to read more data. Otherwise, emitting 'readable'
|
||||
|
// probably will trigger another stream.read(), which can trigger
|
||||
|
// another _read(n,cb) before this one returns!
|
||||
|
if (state.length <= state.lowWaterMark) { |
||||
|
state.reading = true; |
||||
|
stream._read(state.bufferSize, state.onread); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
if (state.needReadable && !sync) { |
||||
|
state.needReadable = false; |
||||
|
if (!state.emittedReadable) { |
||||
|
state.emittedReadable = true; |
||||
|
stream.emit('readable'); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// abstract method. to be overridden in specific implementation classes.
|
||||
|
// call cb(er, data) where data is <= n in length.
|
||||
|
// for virtual (non-string, non-buffer) streams, "length" is somewhat
|
||||
|
// arbitrary, and perhaps not very meaningful.
|
||||
|
Readable.prototype._read = function(n, cb) { |
||||
|
process.nextTick(function() { |
||||
|
cb(new Error('not implemented')); |
||||
|
}); |
||||
|
}; |
||||
|
|
||||
|
Readable.prototype.pipe = function(dest, pipeOpts) { |
||||
|
var src = this; |
||||
|
var state = this._readableState; |
||||
|
|
||||
|
switch (state.pipesCount) { |
||||
|
case 0: |
||||
|
state.pipes = dest; |
||||
|
break; |
||||
|
case 1: |
||||
|
state.pipes = [state.pipes, dest]; |
||||
|
break; |
||||
|
default: |
||||
|
state.pipes.push(dest); |
||||
|
break; |
||||
|
} |
||||
|
state.pipesCount += 1; |
||||
|
|
||||
|
if ((!pipeOpts || pipeOpts.end !== false) && |
||||
|
dest !== process.stdout && |
||||
|
dest !== process.stderr) { |
||||
|
src.once('end', onend); |
||||
|
dest.on('unpipe', function(readable) { |
||||
|
if (readable === src) |
||||
|
src.removeListener('end', onend); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
if (pipeOpts && pipeOpts.chunkSize) |
||||
|
state.pipeChunkSize = pipeOpts.chunkSize; |
||||
|
|
||||
|
function onend() { |
||||
|
dest.end(); |
||||
|
} |
||||
|
|
||||
|
// when the dest drains, it reduces the awaitDrain counter
|
||||
|
// on the source. This would be more elegant with a .once()
|
||||
|
// handler in flow(), but adding and removing repeatedly is
|
||||
|
// too slow.
|
||||
|
var ondrain = pipeOnDrain(src); |
||||
|
dest.on('drain', ondrain); |
||||
|
dest.on('unpipe', function(readable) { |
||||
|
if (readable === src) |
||||
|
dest.removeListener('drain', ondrain); |
||||
|
|
||||
|
// if the reader is waiting for a drain event from this
|
||||
|
// specific writer, then it would cause it to never start
|
||||
|
// flowing again.
|
||||
|
// So, if this is awaiting a drain, then we just call it now.
|
||||
|
// If we don't know, then assume that we are waiting for one.
|
||||
|
if (!dest._writableState || dest._writableState.needDrain) |
||||
|
ondrain(); |
||||
|
}); |
||||
|
|
||||
|
// if the dest has an error, then stop piping into it.
|
||||
|
// however, don't suppress the throwing behavior for this.
|
||||
|
dest.once('error', function(er) { |
||||
|
unpipe(); |
||||
|
if (dest.listeners('error').length === 0) |
||||
|
dest.emit('error', er); |
||||
|
}); |
||||
|
|
||||
|
// if the dest emits close, then presumably there's no point writing
|
||||
|
// to it any more.
|
||||
|
dest.on('close', unpipe); |
||||
|
dest.on('finish', function() { |
||||
|
dest.removeListener('close', unpipe); |
||||
|
}); |
||||
|
|
||||
|
function unpipe() { |
||||
|
src.unpipe(dest); |
||||
|
} |
||||
|
|
||||
|
// tell the dest that it's being piped to
|
||||
|
dest.emit('pipe', src); |
||||
|
|
||||
|
// start the flow if it hasn't been started already.
|
||||
|
if (!state.flowing) { |
||||
|
// the handler that waits for readable events after all
|
||||
|
// the data gets sucked out in flow.
|
||||
|
// This would be easier to follow with a .once() handler
|
||||
|
// in flow(), but that is too slow.
|
||||
|
this.on('readable', pipeOnReadable); |
||||
|
|
||||
|
state.flowing = true; |
||||
|
process.nextTick(function() { |
||||
|
flow(src); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
return dest; |
||||
|
}; |
||||
|
|
||||
|
function pipeOnDrain(src) { |
||||
|
return function() { |
||||
|
var dest = this; |
||||
|
var state = src._readableState; |
||||
|
state.awaitDrain--; |
||||
|
if (state.awaitDrain === 0) |
||||
|
flow(src); |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
function flow(src) { |
||||
|
var state = src._readableState; |
||||
|
var chunk; |
||||
|
state.awaitDrain = 0; |
||||
|
|
||||
|
function write(dest, i, list) { |
||||
|
var written = dest.write(chunk); |
||||
|
if (false === written) { |
||||
|
state.awaitDrain++; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
while (state.pipesCount && |
||||
|
null !== (chunk = src.read(state.pipeChunkSize))) { |
||||
|
|
||||
|
if (state.pipesCount === 1) |
||||
|
write(state.pipes, 0, null); |
||||
|
else |
||||
|
state.pipes.forEach(write); |
||||
|
|
||||
|
src.emit('data', chunk); |
||||
|
|
||||
|
// if anyone needs a drain, then we have to wait for that.
|
||||
|
if (state.awaitDrain > 0) |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
// if every destination was unpiped, either before entering this
|
||||
|
// function, or in the while loop, then stop flowing.
|
||||
|
//
|
||||
|
// NB: This is a pretty rare edge case.
|
||||
|
if (state.pipesCount === 0) { |
||||
|
state.flowing = false; |
||||
|
|
||||
|
// if there were data event listeners added, then switch to old mode.
|
||||
|
if (src.listeners('data').length) |
||||
|
emitDataEvents(src); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
// at this point, no one needed a drain, so we just ran out of data
|
||||
|
// on the next readable event, start it over again.
|
||||
|
state.ranOut = true; |
||||
|
} |
||||
|
|
||||
|
function pipeOnReadable() { |
||||
|
if (this._readableState.ranOut) { |
||||
|
this._readableState.ranOut = false; |
||||
|
flow(this); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
|
||||
|
Readable.prototype.unpipe = function(dest) { |
||||
|
var state = this._readableState; |
||||
|
|
||||
|
// if we're not piping anywhere, then do nothing.
|
||||
|
if (state.pipesCount === 0) |
||||
|
return this; |
||||
|
|
||||
|
// just one destination. most common case.
|
||||
|
if (state.pipesCount === 1) { |
||||
|
// passed in one, but it's not the right one.
|
||||
|
if (dest && dest !== state.pipes) |
||||
|
return this; |
||||
|
|
||||
|
if (!dest) |
||||
|
dest = state.pipes; |
||||
|
|
||||
|
// got a match.
|
||||
|
state.pipes = null; |
||||
|
state.pipesCount = 0; |
||||
|
this.removeListener('readable', pipeOnReadable); |
||||
|
if (dest) |
||||
|
dest.emit('unpipe', this); |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
// slow case. multiple pipe destinations.
|
||||
|
|
||||
|
if (!dest) { |
||||
|
// remove all.
|
||||
|
var dests = state.pipes; |
||||
|
var len = state.pipesCount; |
||||
|
state.pipes = null; |
||||
|
state.pipesCount = 0; |
||||
|
this.removeListener('readable', pipeOnReadable); |
||||
|
|
||||
|
for (var i = 0; i < len; i++) |
||||
|
dests[i].emit('unpipe', this); |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
// try to find the right one.
|
||||
|
var i = state.pipes.indexOf(dest); |
||||
|
if (i === -1) |
||||
|
return this; |
||||
|
|
||||
|
state.pipes.splice(i, 1); |
||||
|
state.pipesCount -= 1; |
||||
|
if (state.pipesCount === 1) |
||||
|
state.pipes = state.pipes[0]; |
||||
|
|
||||
|
dest.emit('unpipe', this); |
||||
|
|
||||
|
return this; |
||||
|
}; |
||||
|
|
||||
|
// kludge for on('data', fn) consumers. Sad.
|
||||
|
// This is *not* part of the new readable stream interface.
|
||||
|
// It is an ugly unfortunate mess of history.
|
||||
|
Readable.prototype.on = function(ev, fn) { |
||||
|
// https://github.com/isaacs/readable-stream/issues/16
|
||||
|
// if we're already flowing, then no need to set up data events.
|
||||
|
if (ev === 'data' && !this._readableState.flowing) |
||||
|
emitDataEvents(this); |
||||
|
|
||||
|
return Stream.prototype.on.call(this, ev, fn); |
||||
|
}; |
||||
|
Readable.prototype.addListener = Readable.prototype.on; |
||||
|
|
||||
|
// pause() and resume() are remnants of the legacy readable stream API
|
||||
|
// If the user uses them, then switch into old mode.
|
||||
|
Readable.prototype.resume = function() { |
||||
|
emitDataEvents(this); |
||||
|
this.read(0); |
||||
|
this.emit('resume'); |
||||
|
}; |
||||
|
|
||||
|
Readable.prototype.pause = function() { |
||||
|
emitDataEvents(this, true); |
||||
|
this.emit('pause'); |
||||
|
}; |
||||
|
|
||||
|
function emitDataEvents(stream, startPaused) { |
||||
|
var state = stream._readableState; |
||||
|
|
||||
|
if (state.flowing) { |
||||
|
// https://github.com/isaacs/readable-stream/issues/16
|
||||
|
throw new Error('Cannot switch to old mode now.'); |
||||
|
} |
||||
|
|
||||
|
var paused = startPaused || false; |
||||
|
var readable = false; |
||||
|
|
||||
|
// convert to an old-style stream.
|
||||
|
stream.readable = true; |
||||
|
stream.pipe = Stream.prototype.pipe; |
||||
|
stream.on = stream.addEventListener = Stream.prototype.on; |
||||
|
|
||||
|
stream.on('readable', function() { |
||||
|
readable = true; |
||||
|
|
||||
|
var c; |
||||
|
while (!paused && (null !== (c = stream.read()))) |
||||
|
stream.emit('data', c); |
||||
|
|
||||
|
if (c === null) { |
||||
|
readable = false; |
||||
|
stream._readableState.needReadable = true; |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
stream.pause = function() { |
||||
|
paused = true; |
||||
|
this.emit('pause'); |
||||
|
}; |
||||
|
|
||||
|
stream.resume = function() { |
||||
|
paused = false; |
||||
|
if (readable) |
||||
|
process.nextTick(function() { |
||||
|
stream.emit('readable'); |
||||
|
}); |
||||
|
else |
||||
|
this.read(0); |
||||
|
this.emit('resume'); |
||||
|
}; |
||||
|
|
||||
|
// now make it start, just in case it hadn't already.
|
||||
|
stream.emit('readable'); |
||||
|
} |
||||
|
|
||||
|
// wrap an old-style stream as the async data source.
|
||||
|
// This is *not* part of the readable stream interface.
|
||||
|
// It is an ugly unfortunate mess of history.
|
||||
|
Readable.prototype.wrap = function(stream) { |
||||
|
var state = this._readableState; |
||||
|
var paused = false; |
||||
|
|
||||
|
var self = this; |
||||
|
stream.on('end', function() { |
||||
|
state.ended = true; |
||||
|
if (state.decoder) { |
||||
|
var chunk = state.decoder.end(); |
||||
|
if (chunk && chunk.length) { |
||||
|
state.buffer.push(chunk); |
||||
|
state.length += chunk.length; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if (state.length > 0) |
||||
|
self.emit('readable'); |
||||
|
else |
||||
|
endReadable(self); |
||||
|
}); |
||||
|
|
||||
|
stream.on('data', function(chunk) { |
||||
|
if (state.decoder) |
||||
|
chunk = state.decoder.write(chunk); |
||||
|
if (!chunk || !chunk.length) |
||||
|
return; |
||||
|
|
||||
|
state.buffer.push(chunk); |
||||
|
state.length += chunk.length; |
||||
|
self.emit('readable'); |
||||
|
|
||||
|
// if not consumed, then pause the stream.
|
||||
|
if (state.length > state.lowWaterMark && !paused) { |
||||
|
paused = true; |
||||
|
stream.pause(); |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
// proxy all the other methods.
|
||||
|
// important when wrapping filters and duplexes.
|
||||
|
for (var i in stream) { |
||||
|
if (typeof stream[i] === 'function' && |
||||
|
typeof this[i] === 'undefined') { |
||||
|
this[i] = function(method) { return function() { |
||||
|
return stream[method].apply(stream, arguments); |
||||
|
}}(i); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
// proxy certain important events.
|
||||
|
var events = ['error', 'close', 'destroy', 'pause', 'resume']; |
||||
|
events.forEach(function(ev) { |
||||
|
stream.on(ev, self.emit.bind(self, ev)); |
||||
|
}); |
||||
|
|
||||
|
// consume some bytes. if not all is consumed, then
|
||||
|
// pause the underlying stream.
|
||||
|
this.read = function(n) { |
||||
|
if (state.length === 0) { |
||||
|
state.needReadable = true; |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
if (isNaN(n) || n <= 0) |
||||
|
n = state.length; |
||||
|
|
||||
|
if (n > state.length) { |
||||
|
if (!state.ended) { |
||||
|
state.needReadable = true; |
||||
|
return null; |
||||
|
} else |
||||
|
n = state.length; |
||||
|
} |
||||
|
|
||||
|
var ret = fromList(n, state.buffer, state.length, !!state.decoder); |
||||
|
state.length -= n; |
||||
|
|
||||
|
if (state.length === 0 && !state.ended) |
||||
|
state.needReadable = true; |
||||
|
|
||||
|
if (state.length <= state.lowWaterMark && paused) { |
||||
|
stream.resume(); |
||||
|
paused = false; |
||||
|
} |
||||
|
|
||||
|
if (state.length === 0 && state.ended) |
||||
|
endReadable(this); |
||||
|
|
||||
|
return ret; |
||||
|
}; |
||||
|
}; |
||||
|
|
||||
|
|
||||
|
|
||||
|
// exposed for testing purposes only.
|
||||
|
Readable._fromList = fromList; |
||||
|
|
||||
|
// Pluck off n bytes from an array of buffers.
|
||||
|
// Length is the combined lengths of all the buffers in the list.
|
||||
|
function fromList(n, list, length, stringMode) { |
||||
|
var ret; |
||||
|
|
||||
|
// nothing in the list, definitely empty.
|
||||
|
if (list.length === 0) { |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
if (length === 0) |
||||
|
ret = null; |
||||
|
else if (!n || n >= length) { |
||||
|
// read it all, truncate the array.
|
||||
|
if (stringMode) |
||||
|
ret = list.join(''); |
||||
|
else |
||||
|
ret = Buffer.concat(list, length); |
||||
|
list.length = 0; |
||||
|
} else { |
||||
|
// read just some of it.
|
||||
|
if (n < list[0].length) { |
||||
|
// just take a part of the first list item.
|
||||
|
// slice is the same for buffers and strings.
|
||||
|
var buf = list[0]; |
||||
|
ret = buf.slice(0, n); |
||||
|
list[0] = buf.slice(n); |
||||
|
} else if (n === list[0].length) { |
||||
|
// first list is a perfect match
|
||||
|
ret = list.shift(); |
||||
|
} else { |
||||
|
// complex case.
|
||||
|
// we have enough to cover it, but it spans past the first buffer.
|
||||
|
if (stringMode) |
||||
|
ret = ''; |
||||
|
else |
||||
|
ret = new Buffer(n); |
||||
|
|
||||
|
var c = 0; |
||||
|
for (var i = 0, l = list.length; i < l && c < n; i++) { |
||||
|
var buf = list[0]; |
||||
|
var cpy = Math.min(n - c, buf.length); |
||||
|
|
||||
|
if (stringMode) |
||||
|
ret += buf.slice(0, cpy); |
||||
|
else |
||||
|
buf.copy(ret, c, 0, cpy); |
||||
|
|
||||
|
if (cpy < buf.length) |
||||
|
list[0] = buf.slice(cpy); |
||||
|
else |
||||
|
list.shift(); |
||||
|
|
||||
|
c += cpy; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return ret; |
||||
|
} |
||||
|
|
||||
|
function endReadable(stream) { |
||||
|
var state = stream._readableState; |
||||
|
if (state.endEmitted) |
||||
|
return; |
||||
|
state.ended = true; |
||||
|
state.endEmitted = true; |
||||
|
process.nextTick(function() { |
||||
|
stream.readable = false; |
||||
|
stream.emit('end'); |
||||
|
}); |
||||
|
} |
@ -0,0 +1,231 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
|
||||
|
// a transform stream is a readable/writable stream where you do
|
||||
|
// something with the data. Sometimes it's called a "filter",
|
||||
|
// but that's not a great name for it, since that implies a thing where
|
||||
|
// some bits pass through, and others are simply ignored. (That would
|
||||
|
// be a valid example of a transform, of course.)
|
||||
|
//
|
||||
|
// While the output is causally related to the input, it's not a
|
||||
|
// necessarily symmetric or synchronous transformation. For example,
|
||||
|
// a zlib stream might take multiple plain-text writes(), and then
|
||||
|
// emit a single compressed chunk some time in the future.
|
||||
|
//
|
||||
|
// Here's how this works:
|
||||
|
//
|
||||
|
// The Transform stream has all the aspects of the readable and writable
|
||||
|
// stream classes. When you write(chunk), that calls _write(chunk,cb)
|
||||
|
// internally, and returns false if there's a lot of pending writes
|
||||
|
// buffered up. When you call read(), that calls _read(n,cb) until
|
||||
|
// there's enough pending readable data buffered up.
|
||||
|
//
|
||||
|
// In a transform stream, the written data is placed in a buffer. When
|
||||
|
// _read(n,cb) is called, it transforms the queued up data, calling the
|
||||
|
// buffered _write cb's as it consumes chunks. If consuming a single
|
||||
|
// written chunk would result in multiple output chunks, then the first
|
||||
|
// outputted bit calls the readcb, and subsequent chunks just go into
|
||||
|
// the read buffer, and will cause it to emit 'readable' if necessary.
|
||||
|
//
|
||||
|
// This way, back-pressure is actually determined by the reading side,
|
||||
|
// since _read has to be called to start processing a new chunk. However,
|
||||
|
// a pathological inflate type of transform can cause excessive buffering
|
||||
|
// here. For example, imagine a stream where every byte of input is
|
||||
|
// interpreted as an integer from 0-255, and then results in that many
|
||||
|
// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in
|
||||
|
// 1kb of data being output. In this case, you could write a very small
|
||||
|
// amount of input, and end up with a very large amount of output. In
|
||||
|
// such a pathological inflating mechanism, there'd be no way to tell
|
||||
|
// the system to stop doing the transform. A single 4MB write could
|
||||
|
// cause the system to run out of memory.
|
||||
|
//
|
||||
|
// However, even in such a pathological case, only a single written chunk
|
||||
|
// would be consumed, and then the rest would wait (un-transformed) until
|
||||
|
// the results of the previous transformed chunk were consumed. Because
|
||||
|
// the transform happens on-demand, it will only transform as much as is
|
||||
|
// necessary to fill the readable buffer to the specified lowWaterMark.
|
||||
|
|
||||
|
module.exports = Transform; |
||||
|
|
||||
|
var Duplex = require('_stream_duplex'); |
||||
|
var util = require('util'); |
||||
|
util.inherits(Transform, Duplex); |
||||
|
|
||||
|
function TransformState(stream) { |
||||
|
this.buffer = []; |
||||
|
this.transforming = false; |
||||
|
this.pendingReadCb = null; |
||||
|
this.output = function(chunk) { |
||||
|
stream._output(chunk); |
||||
|
}; |
||||
|
} |
||||
|
|
||||
|
function Transform(options) { |
||||
|
if (!(this instanceof Transform)) |
||||
|
return new Transform(options); |
||||
|
|
||||
|
Duplex.call(this, options); |
||||
|
|
||||
|
// bind output so that it can be passed around as a regular function.
|
||||
|
var stream = this; |
||||
|
|
||||
|
// the queue of _write chunks that are pending being transformed
|
||||
|
var ts = this._transformState = new TransformState(stream); |
||||
|
|
||||
|
// when the writable side finishes, then flush out anything remaining.
|
||||
|
this.once('finish', function() { |
||||
|
if ('function' === typeof this._flush) |
||||
|
this._flush(ts.output, function(er) { |
||||
|
done(stream, er); |
||||
|
}); |
||||
|
else |
||||
|
done(stream); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
// This is the part where you do stuff!
|
||||
|
// override this function in implementation classes.
|
||||
|
// 'chunk' is an input chunk.
|
||||
|
//
|
||||
|
// Call `output(newChunk)` to pass along transformed output
|
||||
|
// to the readable side. You may call 'output' zero or more times.
|
||||
|
//
|
||||
|
// Call `cb(err)` when you are done with this chunk. If you pass
|
||||
|
// an error, then that'll put the hurt on the whole operation. If you
|
||||
|
// never call cb(), then you'll never get another chunk.
|
||||
|
Transform.prototype._transform = function(chunk, output, cb) { |
||||
|
throw new Error('not implemented'); |
||||
|
}; |
||||
|
|
||||
|
Transform.prototype._write = function(chunk, cb) { |
||||
|
var ts = this._transformState; |
||||
|
var rs = this._readableState; |
||||
|
ts.buffer.push([chunk, cb]); |
||||
|
|
||||
|
// no need for auto-pull if already in the midst of one.
|
||||
|
if (ts.transforming) |
||||
|
return; |
||||
|
|
||||
|
// now we have something to transform, if we were waiting for it.
|
||||
|
// kick off a _read to pull it in.
|
||||
|
if (ts.pendingReadCb) { |
||||
|
var readcb = ts.pendingReadCb; |
||||
|
ts.pendingReadCb = null; |
||||
|
this._read(0, readcb); |
||||
|
} |
||||
|
|
||||
|
// if we weren't waiting for it, but nothing is queued up, then
|
||||
|
// still kick off a transform, just so it's there when the user asks.
|
||||
|
var doRead = rs.needReadable || rs.length <= rs.highWaterMark; |
||||
|
if (doRead && !rs.reading) { |
||||
|
var ret = this.read(0); |
||||
|
if (ret !== null) |
||||
|
return cb(new Error('invalid stream transform state')); |
||||
|
} |
||||
|
}; |
||||
|
|
||||
|
Transform.prototype._read = function(n, readcb) { |
||||
|
var ws = this._writableState; |
||||
|
var rs = this._readableState; |
||||
|
var ts = this._transformState; |
||||
|
|
||||
|
if (ts.pendingReadCb) |
||||
|
throw new Error('_read while _read already in progress'); |
||||
|
|
||||
|
ts.pendingReadCb = readcb; |
||||
|
|
||||
|
// if there's nothing pending, then we just wait.
|
||||
|
// if we're already transforming, then also just hold on a sec.
|
||||
|
// we've already stashed the readcb, so we can come back later
|
||||
|
// when we have something to transform
|
||||
|
if (ts.buffer.length === 0 || ts.transforming) |
||||
|
return; |
||||
|
|
||||
|
// go ahead and transform that thing, now that someone wants it
|
||||
|
var req = ts.buffer.shift(); |
||||
|
var chunk = req[0]; |
||||
|
var writecb = req[1]; |
||||
|
ts.transforming = true; |
||||
|
this._transform(chunk, ts.output, function(er, data) { |
||||
|
ts.transforming = false; |
||||
|
if (data) |
||||
|
ts.output(data); |
||||
|
writecb(er); |
||||
|
}); |
||||
|
}; |
||||
|
|
||||
|
Transform.prototype._output = function(chunk) { |
||||
|
if (!chunk || !chunk.length) |
||||
|
return; |
||||
|
|
||||
|
// if we've got a pending readcb, then just call that,
|
||||
|
// and let Readable take care of it. If not, then we fill
|
||||
|
// the readable buffer ourselves, and emit whatever's needed.
|
||||
|
var ts = this._transformState; |
||||
|
var readcb = ts.pendingReadCb; |
||||
|
if (readcb) { |
||||
|
ts.pendingReadCb = null; |
||||
|
readcb(null, chunk); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
// otherwise, it's up to us to fill the rs buffer.
|
||||
|
var rs = this._readableState; |
||||
|
var len = rs.length; |
||||
|
rs.buffer.push(chunk); |
||||
|
rs.length += chunk.length; |
||||
|
if (rs.needReadable) { |
||||
|
rs.needReadable = false; |
||||
|
this.emit('readable'); |
||||
|
} |
||||
|
}; |
||||
|
|
||||
|
function done(stream, er) { |
||||
|
if (er) |
||||
|
return stream.emit('error', er); |
||||
|
|
||||
|
// if there's nothing in the write buffer, then that means
|
||||
|
// that nothing more will ever be provided
|
||||
|
var ws = stream._writableState; |
||||
|
var rs = stream._readableState; |
||||
|
var ts = stream._transformState; |
||||
|
|
||||
|
if (ws.length) |
||||
|
throw new Error('calling transform done when ws.length != 0'); |
||||
|
|
||||
|
if (ts.transforming) |
||||
|
throw new Error('calling transform done when still transforming'); |
||||
|
|
||||
|
// if we were waiting on a read, let them know that it isn't coming.
|
||||
|
var readcb = ts.pendingReadCb; |
||||
|
if (readcb) |
||||
|
return readcb(); |
||||
|
|
||||
|
rs.ended = true; |
||||
|
// we may have gotten a 'null' read before, and since there is
|
||||
|
// no more data coming from the writable side, we need to emit
|
||||
|
// now so that the consumer knows to pick up the tail bits.
|
||||
|
if (rs.length && rs.needReadable) |
||||
|
stream.emit('readable'); |
||||
|
else if (rs.length === 0) |
||||
|
stream.emit('end'); |
||||
|
} |
@ -0,0 +1,257 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
// A bit simpler than readable streams.
|
||||
|
// Implement an async ._write(chunk, cb), and it'll handle all
|
||||
|
// the drain event emission and buffering.
|
||||
|
|
||||
|
module.exports = Writable; |
||||
|
Writable.WritableState = WritableState; |
||||
|
|
||||
|
var util = require('util'); |
||||
|
var assert = require('assert'); |
||||
|
var Stream = require('stream'); |
||||
|
|
||||
|
util.inherits(Writable, Stream); |
||||
|
|
||||
|
function WritableState(options, stream) { |
||||
|
options = options || {}; |
||||
|
|
||||
|
// the point at which write() starts returning false
|
||||
|
this.highWaterMark = options.hasOwnProperty('highWaterMark') ? |
||||
|
options.highWaterMark : 16 * 1024; |
||||
|
|
||||
|
// the point that it has to get to before we call _write(chunk,cb)
|
||||
|
// default to pushing everything out as fast as possible.
|
||||
|
this.lowWaterMark = options.hasOwnProperty('lowWaterMark') ? |
||||
|
options.lowWaterMark : 0; |
||||
|
|
||||
|
// cast to ints.
|
||||
|
assert(typeof this.lowWaterMark === 'number'); |
||||
|
assert(typeof this.highWaterMark === 'number'); |
||||
|
this.lowWaterMark = ~~this.lowWaterMark; |
||||
|
this.highWaterMark = ~~this.highWaterMark; |
||||
|
assert(this.lowWaterMark >= 0); |
||||
|
assert(this.highWaterMark >= this.lowWaterMark, |
||||
|
this.highWaterMark + '>=' + this.lowWaterMark); |
||||
|
|
||||
|
this.needDrain = false; |
||||
|
// at the start of calling end()
|
||||
|
this.ending = false; |
||||
|
// when end() has been called, and returned
|
||||
|
this.ended = false; |
||||
|
// when 'finish' has emitted
|
||||
|
this.finished = false; |
||||
|
// when 'finish' is being emitted
|
||||
|
this.finishing = false; |
||||
|
|
||||
|
// should we decode strings into buffers before passing to _write?
|
||||
|
// this is here so that some node-core streams can optimize string
|
||||
|
// handling at a lower level.
|
||||
|
this.decodeStrings = options.hasOwnProperty('decodeStrings') ? |
||||
|
options.decodeStrings : true; |
||||
|
|
||||
|
// not an actual buffer we keep track of, but a measurement
|
||||
|
// of how much we're waiting to get pushed to some underlying
|
||||
|
// socket or file.
|
||||
|
this.length = 0; |
||||
|
|
||||
|
// a flag to see when we're in the middle of a write.
|
||||
|
this.writing = false; |
||||
|
|
||||
|
// a flag to be able to tell if the onwrite cb is called immediately,
|
||||
|
// or on a later tick.
|
||||
|
this.sync = false; |
||||
|
|
||||
|
// the callback that's passed to _write(chunk,cb)
|
||||
|
this.onwrite = function(er) { |
||||
|
onwrite(stream, er); |
||||
|
}; |
||||
|
|
||||
|
// the callback that the user supplies to write(chunk,encoding,cb)
|
||||
|
this.writecb = null; |
||||
|
|
||||
|
// the amount that is being written when _write is called.
|
||||
|
this.writelen = 0; |
||||
|
|
||||
|
this.buffer = []; |
||||
|
} |
||||
|
|
||||
|
function Writable(options) { |
||||
|
// Writable ctor is applied to Duplexes, though they're not
|
||||
|
// instanceof Writable, they're instanceof Readable.
|
||||
|
if (!(this instanceof Writable) && !(this instanceof Stream.Duplex)) |
||||
|
return new Writable(options); |
||||
|
|
||||
|
this._writableState = new WritableState(options, this); |
||||
|
|
||||
|
// legacy.
|
||||
|
this.writable = true; |
||||
|
|
||||
|
Stream.call(this); |
||||
|
} |
||||
|
|
||||
|
// Override this method or _write(chunk, cb)
|
||||
|
Writable.prototype.write = function(chunk, encoding, cb) { |
||||
|
var state = this._writableState; |
||||
|
|
||||
|
if (typeof encoding === 'function') { |
||||
|
cb = encoding; |
||||
|
encoding = null; |
||||
|
} |
||||
|
|
||||
|
if (state.ended) { |
||||
|
var er = new Error('write after end'); |
||||
|
if (typeof cb === 'function') |
||||
|
cb(er); |
||||
|
this.emit('error', er); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
var l = chunk.length; |
||||
|
if (false === state.decodeStrings) |
||||
|
chunk = [chunk, encoding || 'utf8']; |
||||
|
else if (typeof chunk === 'string' || encoding) { |
||||
|
chunk = new Buffer(chunk + '', encoding); |
||||
|
l = chunk.length; |
||||
|
} |
||||
|
|
||||
|
state.length += l; |
||||
|
|
||||
|
var ret = state.length < state.highWaterMark; |
||||
|
if (ret === false) |
||||
|
state.needDrain = true; |
||||
|
|
||||
|
// if we're already writing something, then just put this
|
||||
|
// in the queue, and wait our turn.
|
||||
|
if (state.writing) { |
||||
|
state.buffer.push([chunk, cb]); |
||||
|
return ret; |
||||
|
} |
||||
|
|
||||
|
state.writing = true; |
||||
|
state.sync = true; |
||||
|
state.writelen = l; |
||||
|
state.writecb = cb; |
||||
|
this._write(chunk, state.onwrite); |
||||
|
state.sync = false; |
||||
|
|
||||
|
return ret; |
||||
|
}; |
||||
|
|
||||
|
function onwrite(stream, er) { |
||||
|
var state = stream._writableState; |
||||
|
var sync = state.sync; |
||||
|
var cb = state.writecb; |
||||
|
var l = state.writelen; |
||||
|
|
||||
|
state.writing = false; |
||||
|
state.writelen = null; |
||||
|
state.writecb = null; |
||||
|
|
||||
|
if (er) { |
||||
|
if (cb) { |
||||
|
if (sync) |
||||
|
process.nextTick(function() { |
||||
|
cb(er); |
||||
|
}); |
||||
|
else |
||||
|
cb(er); |
||||
|
} |
||||
|
|
||||
|
// backwards compatibility. still emit if there was a cb.
|
||||
|
stream.emit('error', er); |
||||
|
return; |
||||
|
} |
||||
|
state.length -= l; |
||||
|
|
||||
|
if (cb) { |
||||
|
// don't call the cb until the next tick if we're in sync mode.
|
||||
|
// also, defer if we're about to write some more right now.
|
||||
|
if (sync || state.buffer.length) |
||||
|
process.nextTick(cb); |
||||
|
else |
||||
|
cb(); |
||||
|
} |
||||
|
|
||||
|
if (state.length === 0 && (state.ended || state.ending) && |
||||
|
!state.finished && !state.finishing) { |
||||
|
// emit 'finish' at the very end.
|
||||
|
state.finishing = true; |
||||
|
stream.emit('finish'); |
||||
|
state.finished = true; |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
// if there's something in the buffer waiting, then do that, too.
|
||||
|
if (state.buffer.length) { |
||||
|
var chunkCb = state.buffer.shift(); |
||||
|
var chunk = chunkCb[0]; |
||||
|
cb = chunkCb[1]; |
||||
|
|
||||
|
if (false === state.decodeStrings) |
||||
|
l = chunk[0].length; |
||||
|
else |
||||
|
l = chunk.length; |
||||
|
|
||||
|
state.writelen = l; |
||||
|
state.writecb = cb; |
||||
|
state.writechunk = chunk; |
||||
|
state.writing = true; |
||||
|
stream._write(chunk, state.onwrite); |
||||
|
} |
||||
|
|
||||
|
if (state.length <= state.lowWaterMark && state.needDrain) { |
||||
|
// Must force callback to be called on nextTick, so that we don't
|
||||
|
// emit 'drain' before the write() consumer gets the 'false' return
|
||||
|
// value, and has a chance to attach a 'drain' listener.
|
||||
|
process.nextTick(function() { |
||||
|
if (!state.needDrain) |
||||
|
return; |
||||
|
state.needDrain = false; |
||||
|
stream.emit('drain'); |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
Writable.prototype._write = function(chunk, cb) { |
||||
|
process.nextTick(function() { |
||||
|
cb(new Error('not implemented')); |
||||
|
}); |
||||
|
}; |
||||
|
|
||||
|
Writable.prototype.end = function(chunk, encoding) { |
||||
|
var state = this._writableState; |
||||
|
|
||||
|
// ignore unnecessary end() calls.
|
||||
|
if (state.ending || state.ended || state.finished) |
||||
|
return; |
||||
|
|
||||
|
state.ending = true; |
||||
|
if (chunk) |
||||
|
this.write(chunk, encoding); |
||||
|
else if (state.length === 0 && !state.finishing && !state.finished) { |
||||
|
state.finishing = true; |
||||
|
this.emit('finish'); |
||||
|
state.finished = true; |
||||
|
} |
||||
|
state.ended = true; |
||||
|
}; |
@ -0,0 +1 @@ |
|||||
|
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx |
@ -0,0 +1,320 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
|
||||
|
var common = require('../common.js'); |
||||
|
var R = require('_stream_readable'); |
||||
|
var assert = require('assert'); |
||||
|
|
||||
|
var util = require('util'); |
||||
|
var EE = require('events').EventEmitter; |
||||
|
|
||||
|
function TestReader(n) { |
||||
|
R.apply(this); |
||||
|
this._buffer = new Buffer(n || 100); |
||||
|
this._buffer.fill('x'); |
||||
|
this._pos = 0; |
||||
|
this._bufs = 10; |
||||
|
} |
||||
|
|
||||
|
util.inherits(TestReader, R); |
||||
|
|
||||
|
TestReader.prototype.read = function(n) { |
||||
|
var max = this._buffer.length - this._pos; |
||||
|
n = n || max; |
||||
|
n = Math.max(n, 0); |
||||
|
var toRead = Math.min(n, max); |
||||
|
if (toRead === 0) { |
||||
|
// simulate the read buffer filling up with some more bytes some time
|
||||
|
// in the future.
|
||||
|
setTimeout(function() { |
||||
|
this._pos = 0; |
||||
|
this._bufs -= 1; |
||||
|
if (this._bufs <= 0) { |
||||
|
// read them all!
|
||||
|
if (!this.ended) { |
||||
|
this.emit('end'); |
||||
|
this.ended = true; |
||||
|
} |
||||
|
} else { |
||||
|
this.emit('readable'); |
||||
|
} |
||||
|
}.bind(this), 10); |
||||
|
return null; |
||||
|
} |
||||
|
|
||||
|
var ret = this._buffer.slice(this._pos, this._pos + toRead); |
||||
|
this._pos += toRead; |
||||
|
return ret; |
||||
|
}; |
||||
|
|
||||
|
/////
|
||||
|
|
||||
|
function TestWriter() { |
||||
|
EE.apply(this); |
||||
|
this.received = []; |
||||
|
this.flush = false; |
||||
|
} |
||||
|
|
||||
|
util.inherits(TestWriter, EE); |
||||
|
|
||||
|
TestWriter.prototype.write = function(c) { |
||||
|
this.received.push(c.toString()); |
||||
|
this.emit('write', c); |
||||
|
return true; |
||||
|
|
||||
|
// flip back and forth between immediate acceptance and not.
|
||||
|
this.flush = !this.flush; |
||||
|
if (!this.flush) setTimeout(this.emit.bind(this, 'drain'), 10); |
||||
|
return this.flush; |
||||
|
}; |
||||
|
|
||||
|
TestWriter.prototype.end = function(c) { |
||||
|
if (c) this.write(c); |
||||
|
this.emit('end', this.received); |
||||
|
}; |
||||
|
|
||||
|
////////
|
||||
|
|
||||
|
// tiny node-tap lookalike.
|
||||
|
var tests = []; |
||||
|
function test(name, fn) { |
||||
|
tests.push([name, fn]); |
||||
|
} |
||||
|
|
||||
|
function run() { |
||||
|
var next = tests.shift(); |
||||
|
if (!next) |
||||
|
return console.error('ok'); |
||||
|
|
||||
|
var name = next[0]; |
||||
|
var fn = next[1]; |
||||
|
console.log('# %s', name); |
||||
|
fn({ |
||||
|
same: assert.deepEqual, |
||||
|
equal: assert.equal, |
||||
|
end: run |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
process.nextTick(run); |
||||
|
|
||||
|
|
||||
|
test('a most basic test', function(t) { |
||||
|
var r = new TestReader(20); |
||||
|
|
||||
|
var reads = []; |
||||
|
var expect = [ 'x', |
||||
|
'xx', |
||||
|
'xxx', |
||||
|
'xxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxxxxx', |
||||
|
'xxxxxxxxx', |
||||
|
'xxx', |
||||
|
'xxxxxxxxxxxx', |
||||
|
'xxxxxxxx', |
||||
|
'xxxxxxxxxxxxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxxxxxxxxxxxxxxx', |
||||
|
'xx', |
||||
|
'xxxxxxxxxxxxxxxxxxxx', |
||||
|
'xxxxxxxxxxxxxxxxxxxx', |
||||
|
'xxxxxxxxxxxxxxxxxxxx', |
||||
|
'xxxxxxxxxxxxxxxxxxxx', |
||||
|
'xxxxxxxxxxxxxxxxxxxx' ]; |
||||
|
|
||||
|
r.on('end', function() { |
||||
|
t.same(reads, expect); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
var readSize = 1; |
||||
|
function flow() { |
||||
|
var res; |
||||
|
while (null !== (res = r.read(readSize++))) { |
||||
|
reads.push(res.toString()); |
||||
|
} |
||||
|
r.once('readable', flow); |
||||
|
} |
||||
|
|
||||
|
flow(); |
||||
|
}); |
||||
|
|
||||
|
test('pipe', function(t) { |
||||
|
var r = new TestReader(5); |
||||
|
|
||||
|
var expect = [ 'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx' ] |
||||
|
|
||||
|
var w = new TestWriter; |
||||
|
var flush = true; |
||||
|
w.on('end', function(received) { |
||||
|
t.same(received, expect); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
r.pipe(w); |
||||
|
}); |
||||
|
|
||||
|
|
||||
|
|
||||
|
[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) { |
||||
|
test('unpipe', function(t) { |
||||
|
var r = new TestReader(5); |
||||
|
|
||||
|
// unpipe after 3 writes, then write to another stream instead.
|
||||
|
var expect = [ 'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx' ]; |
||||
|
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ]; |
||||
|
|
||||
|
var w = [ new TestWriter(), new TestWriter() ]; |
||||
|
|
||||
|
var writes = SPLIT; |
||||
|
w[0].on('write', function() { |
||||
|
if (--writes === 0) { |
||||
|
r.unpipe(); |
||||
|
t.equal(r._readableState.pipes, null); |
||||
|
w[0].end(); |
||||
|
r.pipe(w[1]); |
||||
|
t.equal(r._readableState.pipes, w[1]); |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
var ended = 0; |
||||
|
|
||||
|
var ended0 = false; |
||||
|
var ended1 = false; |
||||
|
w[0].on('end', function(results) { |
||||
|
t.equal(ended0, false); |
||||
|
ended0 = true; |
||||
|
ended++; |
||||
|
t.same(results, expect[0]); |
||||
|
}); |
||||
|
|
||||
|
w[1].on('end', function(results) { |
||||
|
t.equal(ended1, false); |
||||
|
ended1 = true; |
||||
|
ended++; |
||||
|
t.equal(ended, 2); |
||||
|
t.same(results, expect[1]); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
r.pipe(w[0]); |
||||
|
}); |
||||
|
}); |
||||
|
|
||||
|
|
||||
|
// both writers should get the same exact data.
|
||||
|
test('multipipe', function(t) { |
||||
|
var r = new TestReader(5); |
||||
|
var w = [ new TestWriter, new TestWriter ]; |
||||
|
|
||||
|
var expect = [ 'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx' ]; |
||||
|
|
||||
|
var c = 2; |
||||
|
w[0].on('end', function(received) { |
||||
|
t.same(received, expect, 'first'); |
||||
|
if (--c === 0) t.end(); |
||||
|
}); |
||||
|
w[1].on('end', function(received) { |
||||
|
t.same(received, expect, 'second'); |
||||
|
if (--c === 0) t.end(); |
||||
|
}); |
||||
|
|
||||
|
r.pipe(w[0]); |
||||
|
r.pipe(w[1]); |
||||
|
}); |
||||
|
|
||||
|
|
||||
|
[1,2,3,4,5,6,7,8,9].forEach(function(SPLIT) { |
||||
|
test('multi-unpipe', function(t) { |
||||
|
var r = new TestReader(5); |
||||
|
|
||||
|
// unpipe after 3 writes, then write to another stream instead.
|
||||
|
var expect = [ 'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx', |
||||
|
'xxxxx' ]; |
||||
|
expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ]; |
||||
|
|
||||
|
var w = [ new TestWriter(), new TestWriter(), new TestWriter() ]; |
||||
|
|
||||
|
var writes = SPLIT; |
||||
|
w[0].on('write', function() { |
||||
|
if (--writes === 0) { |
||||
|
r.unpipe(); |
||||
|
w[0].end(); |
||||
|
r.pipe(w[1]); |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
var ended = 0; |
||||
|
|
||||
|
w[0].on('end', function(results) { |
||||
|
ended++; |
||||
|
t.same(results, expect[0]); |
||||
|
}); |
||||
|
|
||||
|
w[1].on('end', function(results) { |
||||
|
ended++; |
||||
|
t.equal(ended, 2); |
||||
|
t.same(results, expect[1]); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
r.pipe(w[0]); |
||||
|
r.pipe(w[2]); |
||||
|
}); |
||||
|
}); |
@ -0,0 +1,76 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
|
||||
|
var common = require('../common.js'); |
||||
|
var R = require('_stream_readable'); |
||||
|
var assert = require('assert'); |
||||
|
|
||||
|
var fs = require('fs'); |
||||
|
var FSReadable = fs.ReadStream; |
||||
|
|
||||
|
var path = require('path'); |
||||
|
var file = path.resolve(common.fixturesDir, 'x1024.txt'); |
||||
|
|
||||
|
var size = fs.statSync(file).size; |
||||
|
|
||||
|
// expect to see chunks no more than 10 bytes each.
|
||||
|
var expectLengths = []; |
||||
|
for (var i = size; i > 0; i -= 10) { |
||||
|
expectLengths.push(Math.min(i, 10)); |
||||
|
} |
||||
|
|
||||
|
var util = require('util'); |
||||
|
var Stream = require('stream'); |
||||
|
|
||||
|
util.inherits(TestWriter, Stream); |
||||
|
|
||||
|
function TestWriter() { |
||||
|
Stream.apply(this); |
||||
|
this.buffer = []; |
||||
|
this.length = 0; |
||||
|
} |
||||
|
|
||||
|
TestWriter.prototype.write = function(c) { |
||||
|
this.buffer.push(c.toString()); |
||||
|
this.length += c.length; |
||||
|
return true; |
||||
|
}; |
||||
|
|
||||
|
TestWriter.prototype.end = function(c) { |
||||
|
if (c) this.buffer.push(c.toString()); |
||||
|
this.emit('results', this.buffer); |
||||
|
} |
||||
|
|
||||
|
var r = new FSReadable(file, { bufferSize: 10 }); |
||||
|
var w = new TestWriter(); |
||||
|
|
||||
|
w.on('results', function(res) { |
||||
|
console.error(res, w.length); |
||||
|
assert.equal(w.length, size); |
||||
|
var l = 0; |
||||
|
assert.deepEqual(res.map(function (c) { |
||||
|
return c.length; |
||||
|
}), expectLengths); |
||||
|
console.log('ok'); |
||||
|
}); |
||||
|
|
||||
|
r.pipe(w, { chunkSize: 10 }); |
@ -0,0 +1,105 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
var common = require('../common'); |
||||
|
var assert = require('assert'); |
||||
|
var stream = require('stream'); |
||||
|
|
||||
|
(function testErrorListenerCatches() { |
||||
|
var count = 1000; |
||||
|
|
||||
|
var source = new stream.Readable(); |
||||
|
source._read = function(n, cb) { |
||||
|
n = Math.min(count, n); |
||||
|
count -= n; |
||||
|
cb(null, new Buffer(n)); |
||||
|
}; |
||||
|
|
||||
|
var unpipedDest; |
||||
|
source.unpipe = function(dest) { |
||||
|
unpipedDest = dest; |
||||
|
stream.Readable.prototype.unpipe.call(this, dest); |
||||
|
}; |
||||
|
|
||||
|
var dest = new stream.Writable(); |
||||
|
dest._write = function(chunk, cb) { |
||||
|
cb(); |
||||
|
}; |
||||
|
|
||||
|
source.pipe(dest); |
||||
|
|
||||
|
var gotErr = null; |
||||
|
dest.on('error', function(err) { |
||||
|
gotErr = err; |
||||
|
}); |
||||
|
|
||||
|
var unpipedSource; |
||||
|
dest.on('unpipe', function(src) { |
||||
|
unpipedSource = src; |
||||
|
}); |
||||
|
|
||||
|
var err = new Error('This stream turned into bacon.'); |
||||
|
dest.emit('error', err); |
||||
|
assert.strictEqual(gotErr, err); |
||||
|
assert.strictEqual(unpipedSource, source); |
||||
|
assert.strictEqual(unpipedDest, dest); |
||||
|
})(); |
||||
|
|
||||
|
(function testErrorWithoutListenerThrows() { |
||||
|
var count = 1000; |
||||
|
|
||||
|
var source = new stream.Readable(); |
||||
|
source._read = function(n, cb) { |
||||
|
n = Math.min(count, n); |
||||
|
count -= n; |
||||
|
cb(null, new Buffer(n)); |
||||
|
}; |
||||
|
|
||||
|
var unpipedDest; |
||||
|
source.unpipe = function(dest) { |
||||
|
unpipedDest = dest; |
||||
|
stream.Readable.prototype.unpipe.call(this, dest); |
||||
|
}; |
||||
|
|
||||
|
var dest = new stream.Writable(); |
||||
|
dest._write = function(chunk, cb) { |
||||
|
cb(); |
||||
|
}; |
||||
|
|
||||
|
source.pipe(dest); |
||||
|
|
||||
|
var unpipedSource; |
||||
|
dest.on('unpipe', function(src) { |
||||
|
unpipedSource = src; |
||||
|
}); |
||||
|
|
||||
|
var err = new Error('This stream turned into bacon.'); |
||||
|
|
||||
|
var gotErr = null; |
||||
|
try { |
||||
|
dest.emit('error', err); |
||||
|
} catch (e) { |
||||
|
gotErr = e; |
||||
|
} |
||||
|
assert.strictEqual(gotErr, err); |
||||
|
assert.strictEqual(unpipedSource, source); |
||||
|
assert.strictEqual(unpipedDest, dest); |
||||
|
})(); |
@ -0,0 +1,109 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
var assert = require('assert'); |
||||
|
var common = require('../common.js'); |
||||
|
var fromList = require('_stream_readable')._fromList; |
||||
|
|
||||
|
// tiny node-tap lookalike.
|
||||
|
var tests = []; |
||||
|
function test(name, fn) { |
||||
|
tests.push([name, fn]); |
||||
|
} |
||||
|
|
||||
|
function run() { |
||||
|
var next = tests.shift(); |
||||
|
if (!next) |
||||
|
return console.error('ok'); |
||||
|
|
||||
|
var name = next[0]; |
||||
|
var fn = next[1]; |
||||
|
console.log('# %s', name); |
||||
|
fn({ |
||||
|
same: assert.deepEqual, |
||||
|
equal: assert.equal, |
||||
|
end: run |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
process.nextTick(run); |
||||
|
|
||||
|
|
||||
|
|
||||
|
test('buffers', function(t) { |
||||
|
// have a length
|
||||
|
var len = 16; |
||||
|
var list = [ new Buffer('foog'), |
||||
|
new Buffer('bark'), |
||||
|
new Buffer('bazy'), |
||||
|
new Buffer('kuel') ]; |
||||
|
|
||||
|
// read more than the first element.
|
||||
|
var ret = fromList(6, list, 16); |
||||
|
t.equal(ret.toString(), 'foogba'); |
||||
|
|
||||
|
// read exactly the first element.
|
||||
|
ret = fromList(2, list, 10); |
||||
|
t.equal(ret.toString(), 'rk'); |
||||
|
|
||||
|
// read less than the first element.
|
||||
|
ret = fromList(2, list, 8); |
||||
|
t.equal(ret.toString(), 'ba'); |
||||
|
|
||||
|
// read more than we have.
|
||||
|
ret = fromList(100, list, 6); |
||||
|
t.equal(ret.toString(), 'zykuel'); |
||||
|
|
||||
|
// all consumed.
|
||||
|
t.same(list, []); |
||||
|
|
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
test('strings', function(t) { |
||||
|
// have a length
|
||||
|
var len = 16; |
||||
|
var list = [ 'foog', |
||||
|
'bark', |
||||
|
'bazy', |
||||
|
'kuel' ]; |
||||
|
|
||||
|
// read more than the first element.
|
||||
|
var ret = fromList(6, list, 16, true); |
||||
|
t.equal(ret, 'foogba'); |
||||
|
|
||||
|
// read exactly the first element.
|
||||
|
ret = fromList(2, list, 10, true); |
||||
|
t.equal(ret, 'rk'); |
||||
|
|
||||
|
// read less than the first element.
|
||||
|
ret = fromList(2, list, 8, true); |
||||
|
t.equal(ret, 'ba'); |
||||
|
|
||||
|
// read more than we have.
|
||||
|
ret = fromList(100, list, 6, true); |
||||
|
t.equal(ret, 'zykuel'); |
||||
|
|
||||
|
// all consumed.
|
||||
|
t.same(list, []); |
||||
|
|
||||
|
t.end(); |
||||
|
}); |
@ -0,0 +1,299 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
|
||||
|
var common = require('../common.js'); |
||||
|
var assert = require('assert'); |
||||
|
var R = require('_stream_readable'); |
||||
|
var util = require('util'); |
||||
|
|
||||
|
// tiny node-tap lookalike.
|
||||
|
var tests = []; |
||||
|
function test(name, fn) { |
||||
|
tests.push([name, fn]); |
||||
|
} |
||||
|
|
||||
|
function run() { |
||||
|
var next = tests.shift(); |
||||
|
if (!next) |
||||
|
return console.error('ok'); |
||||
|
|
||||
|
var name = next[0]; |
||||
|
var fn = next[1]; |
||||
|
console.log('# %s', name); |
||||
|
fn({ |
||||
|
same: assert.deepEqual, |
||||
|
equal: assert.equal, |
||||
|
end: run |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
process.nextTick(run); |
||||
|
|
||||
|
/////
|
||||
|
|
||||
|
util.inherits(TestReader, R); |
||||
|
|
||||
|
function TestReader(n, opts) { |
||||
|
R.call(this, util._extend({ |
||||
|
bufferSize: 5 |
||||
|
}, opts)); |
||||
|
|
||||
|
this.pos = 0; |
||||
|
this.len = n || 100; |
||||
|
} |
||||
|
|
||||
|
TestReader.prototype._read = function(n, cb) { |
||||
|
setTimeout(function() { |
||||
|
|
||||
|
if (this.pos >= this.len) { |
||||
|
return cb(); |
||||
|
} |
||||
|
|
||||
|
n = Math.min(n, this.len - this.pos); |
||||
|
if (n <= 0) { |
||||
|
return cb(); |
||||
|
} |
||||
|
|
||||
|
this.pos += n; |
||||
|
var ret = new Buffer(n); |
||||
|
ret.fill('a'); |
||||
|
|
||||
|
return cb(null, ret); |
||||
|
}.bind(this), 1); |
||||
|
}; |
||||
|
|
||||
|
test('setEncoding utf8', function(t) { |
||||
|
var tr = new TestReader(100); |
||||
|
tr.setEncoding('utf8'); |
||||
|
var out = []; |
||||
|
var expect = |
||||
|
[ 'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa' ]; |
||||
|
|
||||
|
tr.on('readable', function flow() { |
||||
|
var chunk; |
||||
|
while (null !== (chunk = tr.read(10))) |
||||
|
out.push(chunk); |
||||
|
}); |
||||
|
|
||||
|
tr.on('end', function() { |
||||
|
t.same(out, expect); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
// just kick it off.
|
||||
|
tr.emit('readable'); |
||||
|
}); |
||||
|
|
||||
|
|
||||
|
test('setEncoding hex', function(t) { |
||||
|
var tr = new TestReader(100); |
||||
|
tr.setEncoding('hex'); |
||||
|
var out = []; |
||||
|
var expect = |
||||
|
[ '6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161' ]; |
||||
|
|
||||
|
tr.on('readable', function flow() { |
||||
|
var chunk; |
||||
|
while (null !== (chunk = tr.read(10))) |
||||
|
out.push(chunk); |
||||
|
}); |
||||
|
|
||||
|
tr.on('end', function() { |
||||
|
t.same(out, expect); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
// just kick it off.
|
||||
|
tr.emit('readable'); |
||||
|
}); |
||||
|
|
||||
|
test('setEncoding hex with read(13)', function(t) { |
||||
|
var tr = new TestReader(100); |
||||
|
tr.setEncoding('hex'); |
||||
|
var out = []; |
||||
|
var expect = |
||||
|
[ "6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"16161" ]; |
||||
|
|
||||
|
tr.on('readable', function flow() { |
||||
|
var chunk; |
||||
|
while (null !== (chunk = tr.read(13))) |
||||
|
out.push(chunk); |
||||
|
}); |
||||
|
|
||||
|
tr.on('end', function() { |
||||
|
t.same(out, expect); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
// just kick it off.
|
||||
|
tr.emit('readable'); |
||||
|
}); |
||||
|
|
||||
|
test('encoding: utf8', function(t) { |
||||
|
var tr = new TestReader(100, { encoding: 'utf8' }); |
||||
|
var out = []; |
||||
|
var expect = |
||||
|
[ 'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa', |
||||
|
'aaaaaaaaaa' ]; |
||||
|
|
||||
|
tr.on('readable', function flow() { |
||||
|
var chunk; |
||||
|
while (null !== (chunk = tr.read(10))) |
||||
|
out.push(chunk); |
||||
|
}); |
||||
|
|
||||
|
tr.on('end', function() { |
||||
|
t.same(out, expect); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
// just kick it off.
|
||||
|
tr.emit('readable'); |
||||
|
}); |
||||
|
|
||||
|
|
||||
|
test('encoding: hex', function(t) { |
||||
|
var tr = new TestReader(100, { encoding: 'hex' }); |
||||
|
var out = []; |
||||
|
var expect = |
||||
|
[ '6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161', |
||||
|
'6161616161' ]; |
||||
|
|
||||
|
tr.on('readable', function flow() { |
||||
|
var chunk; |
||||
|
while (null !== (chunk = tr.read(10))) |
||||
|
out.push(chunk); |
||||
|
}); |
||||
|
|
||||
|
tr.on('end', function() { |
||||
|
t.same(out, expect); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
// just kick it off.
|
||||
|
tr.emit('readable'); |
||||
|
}); |
||||
|
|
||||
|
test('encoding: hex with read(13)', function(t) { |
||||
|
var tr = new TestReader(100, { encoding: 'hex' }); |
||||
|
var out = []; |
||||
|
var expect = |
||||
|
[ "6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"1616161616161", |
||||
|
"6161616161616", |
||||
|
"16161" ]; |
||||
|
|
||||
|
tr.on('readable', function flow() { |
||||
|
var chunk; |
||||
|
while (null !== (chunk = tr.read(13))) |
||||
|
out.push(chunk); |
||||
|
}); |
||||
|
|
||||
|
tr.on('end', function() { |
||||
|
t.same(out, expect); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
// just kick it off.
|
||||
|
tr.emit('readable'); |
||||
|
}); |
@ -0,0 +1,314 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
var assert = require('assert'); |
||||
|
var common = require('../common.js'); |
||||
|
var PassThrough = require('_stream_passthrough'); |
||||
|
var Transform = require('_stream_transform'); |
||||
|
|
||||
|
// tiny node-tap lookalike.
|
||||
|
var tests = []; |
||||
|
function test(name, fn) { |
||||
|
tests.push([name, fn]); |
||||
|
} |
||||
|
|
||||
|
function run() { |
||||
|
var next = tests.shift(); |
||||
|
if (!next) |
||||
|
return console.error('ok'); |
||||
|
|
||||
|
var name = next[0]; |
||||
|
var fn = next[1]; |
||||
|
console.log('# %s', name); |
||||
|
fn({ |
||||
|
same: assert.deepEqual, |
||||
|
equal: assert.equal, |
||||
|
end: run |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
process.nextTick(run); |
||||
|
|
||||
|
/////
|
||||
|
|
||||
|
test('passthrough', function(t) { |
||||
|
var pt = new PassThrough(); |
||||
|
|
||||
|
pt.write(new Buffer('foog')); |
||||
|
pt.write(new Buffer('bark')); |
||||
|
pt.write(new Buffer('bazy')); |
||||
|
pt.write(new Buffer('kuel')); |
||||
|
pt.end(); |
||||
|
|
||||
|
t.equal(pt.read(5).toString(), 'foogb'); |
||||
|
t.equal(pt.read(5).toString(), 'arkba'); |
||||
|
t.equal(pt.read(5).toString(), 'zykue'); |
||||
|
t.equal(pt.read(5).toString(), 'l'); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
test('simple transform', function(t) { |
||||
|
var pt = new Transform; |
||||
|
pt._transform = function(c, output, cb) { |
||||
|
var ret = new Buffer(c.length); |
||||
|
ret.fill('x'); |
||||
|
output(ret); |
||||
|
cb(); |
||||
|
}; |
||||
|
|
||||
|
pt.write(new Buffer('foog')); |
||||
|
pt.write(new Buffer('bark')); |
||||
|
pt.write(new Buffer('bazy')); |
||||
|
pt.write(new Buffer('kuel')); |
||||
|
pt.end(); |
||||
|
|
||||
|
t.equal(pt.read(5).toString(), 'xxxxx'); |
||||
|
t.equal(pt.read(5).toString(), 'xxxxx'); |
||||
|
t.equal(pt.read(5).toString(), 'xxxxx'); |
||||
|
t.equal(pt.read(5).toString(), 'x'); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
test('async passthrough', function(t) { |
||||
|
var pt = new Transform; |
||||
|
pt._transform = function(chunk, output, cb) { |
||||
|
setTimeout(function() { |
||||
|
output(chunk); |
||||
|
cb(); |
||||
|
}, 10); |
||||
|
}; |
||||
|
|
||||
|
pt.write(new Buffer('foog')); |
||||
|
pt.write(new Buffer('bark')); |
||||
|
pt.write(new Buffer('bazy')); |
||||
|
pt.write(new Buffer('kuel')); |
||||
|
pt.end(); |
||||
|
|
||||
|
setTimeout(function() { |
||||
|
t.equal(pt.read(5).toString(), 'foogb'); |
||||
|
t.equal(pt.read(5).toString(), 'arkba'); |
||||
|
t.equal(pt.read(5).toString(), 'zykue'); |
||||
|
t.equal(pt.read(5).toString(), 'l'); |
||||
|
t.end(); |
||||
|
}, 100); |
||||
|
}); |
||||
|
|
||||
|
test('assymetric transform (expand)', function(t) { |
||||
|
var pt = new Transform; |
||||
|
|
||||
|
// emit each chunk 2 times.
|
||||
|
pt._transform = function(chunk, output, cb) { |
||||
|
setTimeout(function() { |
||||
|
output(chunk); |
||||
|
setTimeout(function() { |
||||
|
output(chunk); |
||||
|
cb(); |
||||
|
}, 10) |
||||
|
}, 10); |
||||
|
}; |
||||
|
|
||||
|
pt.write(new Buffer('foog')); |
||||
|
pt.write(new Buffer('bark')); |
||||
|
pt.write(new Buffer('bazy')); |
||||
|
pt.write(new Buffer('kuel')); |
||||
|
pt.end(); |
||||
|
|
||||
|
setTimeout(function() { |
||||
|
t.equal(pt.read(5).toString(), 'foogf'); |
||||
|
t.equal(pt.read(5).toString(), 'oogba'); |
||||
|
t.equal(pt.read(5).toString(), 'rkbar'); |
||||
|
t.equal(pt.read(5).toString(), 'kbazy'); |
||||
|
t.equal(pt.read(5).toString(), 'bazyk'); |
||||
|
t.equal(pt.read(5).toString(), 'uelku'); |
||||
|
t.equal(pt.read(5).toString(), 'el'); |
||||
|
t.end(); |
||||
|
}, 200); |
||||
|
}); |
||||
|
|
||||
|
test('assymetric transform (compress)', function(t) { |
||||
|
var pt = new Transform; |
||||
|
|
||||
|
// each output is the first char of 3 consecutive chunks,
|
||||
|
// or whatever's left.
|
||||
|
pt.state = ''; |
||||
|
|
||||
|
pt._transform = function(chunk, output, cb) { |
||||
|
if (!chunk) |
||||
|
chunk = ''; |
||||
|
var s = chunk.toString(); |
||||
|
setTimeout(function() { |
||||
|
this.state += s.charAt(0); |
||||
|
if (this.state.length === 3) { |
||||
|
output(new Buffer(this.state)); |
||||
|
this.state = ''; |
||||
|
} |
||||
|
cb(); |
||||
|
}.bind(this), 10); |
||||
|
}; |
||||
|
|
||||
|
pt._flush = function(output, cb) { |
||||
|
// just output whatever we have.
|
||||
|
setTimeout(function() { |
||||
|
output(new Buffer(this.state)); |
||||
|
this.state = ''; |
||||
|
cb(); |
||||
|
}.bind(this), 10); |
||||
|
}; |
||||
|
|
||||
|
pt._writableState.lowWaterMark = 3; |
||||
|
|
||||
|
pt.write(new Buffer('aaaa')); |
||||
|
pt.write(new Buffer('bbbb')); |
||||
|
pt.write(new Buffer('cccc')); |
||||
|
pt.write(new Buffer('dddd')); |
||||
|
pt.write(new Buffer('eeee')); |
||||
|
pt.write(new Buffer('aaaa')); |
||||
|
pt.write(new Buffer('bbbb')); |
||||
|
pt.write(new Buffer('cccc')); |
||||
|
pt.write(new Buffer('dddd')); |
||||
|
pt.write(new Buffer('eeee')); |
||||
|
pt.write(new Buffer('aaaa')); |
||||
|
pt.write(new Buffer('bbbb')); |
||||
|
pt.write(new Buffer('cccc')); |
||||
|
pt.write(new Buffer('dddd')); |
||||
|
pt.end(); |
||||
|
|
||||
|
// 'abcdeabcdeabcd'
|
||||
|
setTimeout(function() { |
||||
|
t.equal(pt.read(5).toString(), 'abcde'); |
||||
|
t.equal(pt.read(5).toString(), 'abcde'); |
||||
|
t.equal(pt.read(5).toString(), 'abcd'); |
||||
|
t.end(); |
||||
|
}, 200); |
||||
|
}); |
||||
|
|
||||
|
|
||||
|
test('passthrough event emission', function(t) { |
||||
|
var pt = new PassThrough({ |
||||
|
lowWaterMark: 0 |
||||
|
}); |
||||
|
var emits = 0; |
||||
|
pt.on('readable', function() { |
||||
|
var state = pt._readableState; |
||||
|
console.error('>>> emit readable %d', emits); |
||||
|
emits++; |
||||
|
}); |
||||
|
|
||||
|
var i = 0; |
||||
|
|
||||
|
pt.write(new Buffer('foog')); |
||||
|
pt.write(new Buffer('bark')); |
||||
|
|
||||
|
t.equal(pt.read(5).toString(), 'foogb'); |
||||
|
t.equal(pt.read(5) + '', 'null'); |
||||
|
|
||||
|
console.error('need emit 0'); |
||||
|
|
||||
|
pt.write(new Buffer('bazy')); |
||||
|
console.error('should have emitted, but not again'); |
||||
|
pt.write(new Buffer('kuel')); |
||||
|
|
||||
|
console.error('should have emitted readable now 1 === %d', emits); |
||||
|
t.equal(emits, 1); |
||||
|
|
||||
|
t.equal(pt.read(5).toString(), 'arkba'); |
||||
|
t.equal(pt.read(5).toString(), 'zykue'); |
||||
|
t.equal(pt.read(5), null); |
||||
|
|
||||
|
console.error('need emit 1'); |
||||
|
|
||||
|
pt.end(); |
||||
|
|
||||
|
t.equal(emits, 2); |
||||
|
|
||||
|
t.equal(pt.read(5).toString(), 'l'); |
||||
|
t.equal(pt.read(5), null); |
||||
|
|
||||
|
console.error('should not have emitted again'); |
||||
|
t.equal(emits, 2); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
test('passthrough event emission reordered', function(t) { |
||||
|
var pt = new PassThrough; |
||||
|
var emits = 0; |
||||
|
pt.on('readable', function() { |
||||
|
console.error('emit readable', emits) |
||||
|
emits++; |
||||
|
}); |
||||
|
|
||||
|
pt.write(new Buffer('foog')); |
||||
|
pt.write(new Buffer('bark')); |
||||
|
|
||||
|
t.equal(pt.read(5).toString(), 'foogb'); |
||||
|
t.equal(pt.read(5), null); |
||||
|
|
||||
|
console.error('need emit 0'); |
||||
|
pt.once('readable', function() { |
||||
|
t.equal(pt.read(5).toString(), 'arkba'); |
||||
|
|
||||
|
t.equal(pt.read(5), null); |
||||
|
|
||||
|
console.error('need emit 1'); |
||||
|
pt.once('readable', function() { |
||||
|
t.equal(pt.read(5).toString(), 'zykue'); |
||||
|
t.equal(pt.read(5), null); |
||||
|
pt.once('readable', function() { |
||||
|
t.equal(pt.read(5).toString(), 'l'); |
||||
|
t.equal(pt.read(5), null); |
||||
|
t.equal(emits, 3); |
||||
|
t.end(); |
||||
|
}); |
||||
|
pt.end(); |
||||
|
}); |
||||
|
pt.write(new Buffer('kuel')); |
||||
|
}); |
||||
|
|
||||
|
pt.write(new Buffer('bazy')); |
||||
|
}); |
||||
|
|
||||
|
test('passthrough facaded', function(t) { |
||||
|
console.error('passthrough facaded'); |
||||
|
var pt = new PassThrough; |
||||
|
var datas = []; |
||||
|
pt.on('data', function(chunk) { |
||||
|
datas.push(chunk.toString()); |
||||
|
}); |
||||
|
|
||||
|
pt.on('end', function() { |
||||
|
t.same(datas, ['foog', 'bark', 'bazy', 'kuel']); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
pt.write(new Buffer('foog')); |
||||
|
setTimeout(function() { |
||||
|
pt.write(new Buffer('bark')); |
||||
|
setTimeout(function() { |
||||
|
pt.write(new Buffer('bazy')); |
||||
|
setTimeout(function() { |
||||
|
pt.write(new Buffer('kuel')); |
||||
|
setTimeout(function() { |
||||
|
pt.end(); |
||||
|
}, 10); |
||||
|
}, 10); |
||||
|
}, 10); |
||||
|
}, 10); |
||||
|
}); |
@ -0,0 +1,246 @@ |
|||||
|
// Copyright Joyent, Inc. and other Node contributors.
|
||||
|
//
|
||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
|
// copy of this software and associated documentation files (the
|
||||
|
// "Software"), to deal in the Software without restriction, including
|
||||
|
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
|
// persons to whom the Software is furnished to do so, subject to the
|
||||
|
// following conditions:
|
||||
|
//
|
||||
|
// The above copyright notice and this permission notice shall be included
|
||||
|
// in all copies or substantial portions of the Software.
|
||||
|
//
|
||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
||||
|
var common = require('../common.js'); |
||||
|
var W = require('_stream_writable'); |
||||
|
var assert = require('assert'); |
||||
|
|
||||
|
var util = require('util'); |
||||
|
util.inherits(TestWriter, W); |
||||
|
|
||||
|
function TestWriter() { |
||||
|
W.apply(this, arguments); |
||||
|
this.buffer = []; |
||||
|
this.written = 0; |
||||
|
} |
||||
|
|
||||
|
TestWriter.prototype._write = function(chunk, cb) { |
||||
|
// simulate a small unpredictable latency
|
||||
|
setTimeout(function() { |
||||
|
this.buffer.push(chunk.toString()); |
||||
|
this.written += chunk.length; |
||||
|
cb(); |
||||
|
}.bind(this), Math.floor(Math.random() * 10)); |
||||
|
}; |
||||
|
|
||||
|
var chunks = new Array(50); |
||||
|
for (var i = 0; i < chunks.length; i++) { |
||||
|
chunks[i] = new Array(i + 1).join('x'); |
||||
|
} |
||||
|
|
||||
|
// tiny node-tap lookalike.
|
||||
|
var tests = []; |
||||
|
function test(name, fn) { |
||||
|
tests.push([name, fn]); |
||||
|
} |
||||
|
|
||||
|
function run() { |
||||
|
var next = tests.shift(); |
||||
|
if (!next) |
||||
|
return console.log('ok'); |
||||
|
|
||||
|
var name = next[0]; |
||||
|
var fn = next[1]; |
||||
|
|
||||
|
if (!fn) |
||||
|
return run(); |
||||
|
|
||||
|
console.log('# %s', name); |
||||
|
fn({ |
||||
|
same: assert.deepEqual, |
||||
|
equal: assert.equal, |
||||
|
end: run |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
process.nextTick(run); |
||||
|
|
||||
|
test('write fast', function(t) { |
||||
|
var tw = new TestWriter({ |
||||
|
lowWaterMark: 5, |
||||
|
highWaterMark: 100 |
||||
|
}); |
||||
|
|
||||
|
tw.on('finish', function() { |
||||
|
t.same(tw.buffer, chunks, 'got chunks in the right order'); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
chunks.forEach(function(chunk) { |
||||
|
// screw backpressure. Just buffer it all up.
|
||||
|
tw.write(chunk); |
||||
|
}); |
||||
|
tw.end(); |
||||
|
}); |
||||
|
|
||||
|
test('write slow', function(t) { |
||||
|
var tw = new TestWriter({ |
||||
|
lowWaterMark: 5, |
||||
|
highWaterMark: 100 |
||||
|
}); |
||||
|
|
||||
|
tw.on('finish', function() { |
||||
|
t.same(tw.buffer, chunks, 'got chunks in the right order'); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
var i = 0; |
||||
|
(function W() { |
||||
|
tw.write(chunks[i++]); |
||||
|
if (i < chunks.length) |
||||
|
setTimeout(W, 10); |
||||
|
else |
||||
|
tw.end(); |
||||
|
})(); |
||||
|
}); |
||||
|
|
||||
|
test('write backpressure', function(t) { |
||||
|
var tw = new TestWriter({ |
||||
|
lowWaterMark: 5, |
||||
|
highWaterMark: 50 |
||||
|
}); |
||||
|
|
||||
|
var drains = 0; |
||||
|
|
||||
|
tw.on('finish', function() { |
||||
|
t.same(tw.buffer, chunks, 'got chunks in the right order'); |
||||
|
t.equal(drains, 17); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
tw.on('drain', function() { |
||||
|
drains++; |
||||
|
}); |
||||
|
|
||||
|
var i = 0; |
||||
|
(function W() { |
||||
|
do { |
||||
|
var ret = tw.write(chunks[i++]); |
||||
|
} while (ret !== false && i < chunks.length); |
||||
|
|
||||
|
if (i < chunks.length) { |
||||
|
assert(tw._writableState.length >= 50); |
||||
|
tw.once('drain', W); |
||||
|
} else { |
||||
|
tw.end(); |
||||
|
} |
||||
|
})(); |
||||
|
}); |
||||
|
|
||||
|
test('write bufferize', function(t) { |
||||
|
var tw = new TestWriter({ |
||||
|
lowWaterMark: 5, |
||||
|
highWaterMark: 100 |
||||
|
}); |
||||
|
|
||||
|
var encodings = |
||||
|
[ 'hex', |
||||
|
'utf8', |
||||
|
'utf-8', |
||||
|
'ascii', |
||||
|
'binary', |
||||
|
'base64', |
||||
|
'ucs2', |
||||
|
'ucs-2', |
||||
|
'utf16le', |
||||
|
'utf-16le', |
||||
|
undefined ]; |
||||
|
|
||||
|
tw.on('finish', function() { |
||||
|
t.same(tw.buffer, chunks, 'got the expected chunks'); |
||||
|
}); |
||||
|
|
||||
|
chunks.forEach(function(chunk, i) { |
||||
|
var enc = encodings[ i % encodings.length ]; |
||||
|
chunk = new Buffer(chunk); |
||||
|
tw.write(chunk.toString(enc), enc); |
||||
|
}); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
test('write no bufferize', function(t) { |
||||
|
var tw = new TestWriter({ |
||||
|
lowWaterMark: 5, |
||||
|
highWaterMark: 100, |
||||
|
decodeStrings: false |
||||
|
}); |
||||
|
|
||||
|
tw._write = function(chunk, cb) { |
||||
|
assert(Array.isArray(chunk)); |
||||
|
assert(typeof chunk[0] === 'string'); |
||||
|
chunk = new Buffer(chunk[0], chunk[1]); |
||||
|
return TestWriter.prototype._write.call(this, chunk, cb); |
||||
|
}; |
||||
|
|
||||
|
var encodings = |
||||
|
[ 'hex', |
||||
|
'utf8', |
||||
|
'utf-8', |
||||
|
'ascii', |
||||
|
'binary', |
||||
|
'base64', |
||||
|
'ucs2', |
||||
|
'ucs-2', |
||||
|
'utf16le', |
||||
|
'utf-16le', |
||||
|
undefined ]; |
||||
|
|
||||
|
tw.on('finish', function() { |
||||
|
t.same(tw.buffer, chunks, 'got the expected chunks'); |
||||
|
}); |
||||
|
|
||||
|
chunks.forEach(function(chunk, i) { |
||||
|
var enc = encodings[ i % encodings.length ]; |
||||
|
chunk = new Buffer(chunk); |
||||
|
tw.write(chunk.toString(enc), enc); |
||||
|
}); |
||||
|
t.end(); |
||||
|
}); |
||||
|
|
||||
|
test('write callbacks', function (t) { |
||||
|
var callbacks = chunks.map(function(chunk, i) { |
||||
|
return [i, function(er) { |
||||
|
callbacks._called[i] = chunk; |
||||
|
}]; |
||||
|
}).reduce(function(set, x) { |
||||
|
set['callback-' + x[0]] = x[1]; |
||||
|
return set; |
||||
|
}, {}); |
||||
|
callbacks._called = []; |
||||
|
|
||||
|
var tw = new TestWriter({ |
||||
|
lowWaterMark: 5, |
||||
|
highWaterMark: 100 |
||||
|
}); |
||||
|
|
||||
|
tw.on('finish', function() { |
||||
|
process.nextTick(function() { |
||||
|
t.same(tw.buffer, chunks, 'got chunks in the right order'); |
||||
|
t.same(callbacks._called, chunks, 'called all callbacks'); |
||||
|
t.end(); |
||||
|
}); |
||||
|
}); |
||||
|
|
||||
|
chunks.forEach(function(chunk, i) { |
||||
|
tw.write(chunk, callbacks['callback-' + i]); |
||||
|
}); |
||||
|
tw.end(); |
||||
|
}); |
Loading…
Reference in new issue