mirror of https://github.com/lukechilds/node.git
isaacs
12 years ago
7 changed files with 829 additions and 4 deletions
@ -0,0 +1,81 @@ |
|||
// Copyright Joyent, Inc. and other Node contributors.
|
|||
//
|
|||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|||
// copy of this software and associated documentation files (the
|
|||
// "Software"), to deal in the Software without restriction, including
|
|||
// without limitation the rights to use, copy, modify, merge, publish,
|
|||
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|||
// persons to whom the Software is furnished to do so, subject to the
|
|||
// following conditions:
|
|||
//
|
|||
// The above copyright notice and this permission notice shall be included
|
|||
// in all copies or substantial portions of the Software.
|
|||
//
|
|||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
|
|||
// a duplex stream is just a stream that is both readable and writable.
|
|||
// Since JS doesn't have multiple prototypal inheritance, this class
|
|||
// prototypally inherits from Readable, and then parasitically from
|
|||
// Writable.
|
|||
|
|||
module.exports = Duplex; |
|||
var util = require('util'); |
|||
var Readable = require('_stream_readable'); |
|||
var Writable = require('_stream_writable'); |
|||
|
|||
util.inherits(Duplex, Readable); |
|||
|
|||
Object.keys(Writable.prototype).forEach(function(method) { |
|||
if (!Duplex.prototype[method]) |
|||
Duplex.prototype[method] = Writable.prototype[method]; |
|||
}); |
|||
|
|||
function Duplex(options) { |
|||
Readable.call(this, options); |
|||
Writable.call(this, options); |
|||
|
|||
this.allowHalfOpen = true; |
|||
if (options && options.allowHalfOpen === false) |
|||
this.allowHalfOpen = false; |
|||
|
|||
this.once('finish', onfinish); |
|||
this.once('end', onend); |
|||
} |
|||
|
|||
// the no-half-open enforcers.
|
|||
function onfinish() { |
|||
// if we allow half-open state, or if the readable side ended,
|
|||
// then we're ok.
|
|||
if (this.allowHalfOpen || this._readableState.ended) |
|||
return; |
|||
|
|||
// mark that we're done.
|
|||
this._readableState.ended = true; |
|||
|
|||
// tell the user
|
|||
if (this._readableState.length === 0) |
|||
this.emit('end'); |
|||
else |
|||
this.emit('readable'); |
|||
} |
|||
|
|||
function onend() { |
|||
// if we allow half-open state, or if the writable side ended,
|
|||
// then we're ok.
|
|||
if (this.allowHalfOpen || this._writableState.ended) |
|||
return; |
|||
|
|||
// just in case the user is about to call write() again.
|
|||
this.write = function() { |
|||
return false; |
|||
}; |
|||
|
|||
// no more data can be written.
|
|||
this.end(); |
|||
} |
@ -0,0 +1,39 @@ |
|||
// Copyright Joyent, Inc. and other Node contributors.
|
|||
//
|
|||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|||
// copy of this software and associated documentation files (the
|
|||
// "Software"), to deal in the Software without restriction, including
|
|||
// without limitation the rights to use, copy, modify, merge, publish,
|
|||
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|||
// persons to whom the Software is furnished to do so, subject to the
|
|||
// following conditions:
|
|||
//
|
|||
// The above copyright notice and this permission notice shall be included
|
|||
// in all copies or substantial portions of the Software.
|
|||
//
|
|||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
|
|||
// a passthrough stream.
|
|||
// basically just the most minimal sort of Transform stream.
|
|||
// Every written chunk gets output as-is.
|
|||
|
|||
module.exports = PassThrough; |
|||
|
|||
var Transform = require('_stream_transform'); |
|||
var util = require('util'); |
|||
util.inherits(PassThrough, Transform); |
|||
|
|||
function PassThrough(options) { |
|||
Transform.call(this, options); |
|||
} |
|||
|
|||
PassThrough.prototype._transform = function(chunk, output, cb) { |
|||
output(chunk); |
|||
cb(); |
|||
}; |
@ -0,0 +1,429 @@ |
|||
// Copyright Joyent, Inc. and other Node contributors.
|
|||
//
|
|||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|||
// copy of this software and associated documentation files (the
|
|||
// "Software"), to deal in the Software without restriction, including
|
|||
// without limitation the rights to use, copy, modify, merge, publish,
|
|||
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|||
// persons to whom the Software is furnished to do so, subject to the
|
|||
// following conditions:
|
|||
//
|
|||
// The above copyright notice and this permission notice shall be included
|
|||
// in all copies or substantial portions of the Software.
|
|||
//
|
|||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
|
|||
module.exports = Readable; |
|||
|
|||
var Stream = require('stream'); |
|||
var util = require('util'); |
|||
var assert = require('assert'); |
|||
|
|||
util.inherits(Readable, Stream); |
|||
|
|||
function ReadableState(options, stream) { |
|||
options = options || {}; |
|||
|
|||
this.bufferSize = options.bufferSize || 16 * 1024; |
|||
assert(typeof this.bufferSize === 'number'); |
|||
// cast to an int
|
|||
this.bufferSize = ~~this.bufferSize; |
|||
|
|||
this.lowWaterMark = options.lowWaterMark || 1024; |
|||
this.buffer = []; |
|||
this.length = 0; |
|||
this.pipes = []; |
|||
this.flowing = false; |
|||
this.ended = false; |
|||
this.stream = stream; |
|||
this.reading = false; |
|||
|
|||
// whenever we return null, then we set a flag to say
|
|||
// that we're awaiting a 'readable' event emission.
|
|||
this.needReadable = false; |
|||
} |
|||
|
|||
function Readable(options) { |
|||
this._readableState = new ReadableState(options, this); |
|||
Stream.apply(this); |
|||
} |
|||
|
|||
// you can override either this method, or _read(n, cb) below.
|
|||
Readable.prototype.read = function(n) { |
|||
var state = this._readableState; |
|||
|
|||
if (state.length === 0 && state.ended) { |
|||
process.nextTick(this.emit.bind(this, 'end')); |
|||
return null; |
|||
} |
|||
|
|||
if (isNaN(n) || n <= 0) |
|||
n = state.length |
|||
|
|||
// XXX: controversial.
|
|||
// don't have that much. return null, unless we've ended.
|
|||
// However, if the low water mark is lower than the number of bytes,
|
|||
// then we still need to return what we have, or else it won't kick
|
|||
// off another _read() call. For example,
|
|||
// lwm=5
|
|||
// len=9
|
|||
// read(10)
|
|||
// We don't have that many bytes, so it'd be tempting to return null,
|
|||
// but then it won't ever cause _read to be called, so in that case,
|
|||
// we just return what we have, and let the programmer deal with it.
|
|||
if (n > state.length) { |
|||
if (!state.ended && state.length < state.lowWaterMark) { |
|||
state.needReadable = true; |
|||
n = 0; |
|||
} else |
|||
n = state.length; |
|||
} |
|||
|
|||
var ret = n > 0 ? fromList(n, state.buffer, state.length) : null; |
|||
|
|||
if (ret === null || ret.length === 0) |
|||
state.needReadable = true; |
|||
|
|||
state.length -= n; |
|||
|
|||
if (!state.ended && |
|||
state.length < state.lowWaterMark && |
|||
!state.reading) { |
|||
state.reading = true; |
|||
// call internal read method
|
|||
this._read(state.bufferSize, function onread(er, chunk) { |
|||
state.reading = false; |
|||
if (er) |
|||
return this.emit('error', er); |
|||
|
|||
if (!chunk || !chunk.length) { |
|||
state.ended = true; |
|||
// if we've ended and we have some data left, then emit
|
|||
// 'readable' now to make sure it gets picked up.
|
|||
if (state.length > 0) |
|||
this.emit('readable'); |
|||
return; |
|||
} |
|||
|
|||
state.length += chunk.length; |
|||
state.buffer.push(chunk); |
|||
if (state.length < state.lowWaterMark) |
|||
this._read(state.bufferSize, onread.bind(this)); |
|||
|
|||
// now we have something to call this.read() to get.
|
|||
if (state.needReadable) { |
|||
state.needReadable = false; |
|||
this.emit('readable'); |
|||
} |
|||
}.bind(this)); |
|||
} |
|||
|
|||
return ret; |
|||
}; |
|||
|
|||
// abstract method. to be overridden in specific implementation classes.
|
|||
// call cb(er, data) where data is <= n in length.
|
|||
// for virtual (non-string, non-buffer) streams, "length" is somewhat
|
|||
// arbitrary, and perhaps not very meaningful.
|
|||
Readable.prototype._read = function(n, cb) { |
|||
process.nextTick(cb.bind(this, new Error('not implemented'))); |
|||
}; |
|||
|
|||
Readable.prototype.pipe = function(dest, pipeOpts) { |
|||
var src = this; |
|||
var state = this._readableState; |
|||
if (!pipeOpts) |
|||
pipeOpts = {}; |
|||
state.pipes.push(dest); |
|||
|
|||
if ((!pipeOpts || pipeOpts.end !== false) && |
|||
dest !== process.stdout && |
|||
dest !== process.stderr) { |
|||
src.once('end', onend); |
|||
dest.on('unpipe', function(readable) { |
|||
if (readable === src) |
|||
src.removeListener('end', onend); |
|||
}); |
|||
} |
|||
|
|||
function onend() { |
|||
dest.end(); |
|||
} |
|||
|
|||
dest.emit('pipe', src); |
|||
|
|||
// start the flow.
|
|||
if (!state.flowing) |
|||
process.nextTick(flow.bind(null, src, pipeOpts)); |
|||
|
|||
return dest; |
|||
}; |
|||
|
|||
function flow(src, pipeOpts) { |
|||
var state = src._readableState; |
|||
var chunk; |
|||
var dest; |
|||
var needDrain = 0; |
|||
|
|||
function ondrain() { |
|||
needDrain--; |
|||
if (needDrain === 0) |
|||
flow(src, pipeOpts); |
|||
} |
|||
|
|||
while (state.pipes.length && |
|||
null !== (chunk = src.read(pipeOpts.chunkSize))) { |
|||
state.pipes.forEach(function(dest, i, list) { |
|||
var written = dest.write(chunk); |
|||
if (false === written) { |
|||
needDrain++; |
|||
dest.once('drain', ondrain); |
|||
} |
|||
}); |
|||
src.emit('data', chunk); |
|||
|
|||
// if anyone needs a drain, then we have to wait for that.
|
|||
if (needDrain > 0) |
|||
return; |
|||
} |
|||
|
|||
// if every destination was unpiped, either before entering this
|
|||
// function, or in the while loop, then stop flowing.
|
|||
//
|
|||
// NB: This is a pretty rare edge case.
|
|||
if (state.pipes.length === 0) { |
|||
state.flowing = false; |
|||
|
|||
// if there were data event listeners added, then switch to old mode.
|
|||
if (this.listeners('data').length) |
|||
emitDataEvents(this); |
|||
return; |
|||
} |
|||
|
|||
// at this point, no one needed a drain, so we just ran out of data
|
|||
// on the next readable event, start it over again.
|
|||
src.once('readable', flow.bind(null, src, pipeOpts)); |
|||
} |
|||
|
|||
Readable.prototype.unpipe = function(dest) { |
|||
var state = this._readableState; |
|||
if (!dest) { |
|||
// remove all of them.
|
|||
state.pipes.forEach(function(dest, i, list) { |
|||
dest.emit('unpipe', this); |
|||
}, this); |
|||
state.pipes.length = 0; |
|||
} else { |
|||
var i = state.pipes.indexOf(dest); |
|||
if (i !== -1) { |
|||
dest.emit('unpipe', this); |
|||
state.pipes.splice(i, 1); |
|||
} |
|||
} |
|||
return this; |
|||
}; |
|||
|
|||
// kludge for on('data', fn) consumers. Sad.
|
|||
// This is *not* part of the new readable stream interface.
|
|||
// It is an ugly unfortunate mess of history.
|
|||
Readable.prototype.on = function(ev, fn) { |
|||
// https://github.com/isaacs/readable-stream/issues/16
|
|||
// if we're already flowing, then no need to set up data events.
|
|||
if (ev === 'data' && !this._readableState.flowing) |
|||
emitDataEvents(this); |
|||
|
|||
return Stream.prototype.on.call(this, ev, fn); |
|||
}; |
|||
Readable.prototype.addListener = Readable.prototype.on; |
|||
|
|||
// pause() and resume() are remnants of the legacy readable stream API
|
|||
// If the user uses them, then switch into old mode.
|
|||
Readable.prototype.resume = function() { |
|||
emitDataEvents(this); |
|||
return this.resume(); |
|||
}; |
|||
|
|||
Readable.prototype.pause = function() { |
|||
emitDataEvents(this); |
|||
return this.pause(); |
|||
}; |
|||
|
|||
function emitDataEvents(stream) { |
|||
var state = stream._readableState; |
|||
|
|||
if (state.flowing) { |
|||
// https://github.com/isaacs/readable-stream/issues/16
|
|||
throw new Error('Cannot switch to old mode now.'); |
|||
} |
|||
|
|||
var paused = false; |
|||
var readable = false; |
|||
|
|||
// convert to an old-style stream.
|
|||
stream.readable = true; |
|||
stream.pipe = Stream.prototype.pipe; |
|||
stream.on = stream.addEventListener = Stream.prototype.on; |
|||
|
|||
stream.on('readable', function() { |
|||
readable = true; |
|||
var c; |
|||
while (!paused && (null !== (c = stream.read()))) |
|||
stream.emit('data', c); |
|||
|
|||
if (c === null) { |
|||
readable = false; |
|||
stream._readableState.needReadable = true; |
|||
} |
|||
}); |
|||
|
|||
stream.pause = function() { |
|||
paused = true; |
|||
}; |
|||
|
|||
stream.resume = function() { |
|||
paused = false; |
|||
if (readable) |
|||
stream.emit('readable'); |
|||
}; |
|||
|
|||
// now make it start, just in case it hadn't already.
|
|||
process.nextTick(function() { |
|||
stream.emit('readable'); |
|||
}); |
|||
} |
|||
|
|||
// wrap an old-style stream as the async data source.
|
|||
// This is *not* part of the readable stream interface.
|
|||
// It is an ugly unfortunate mess of history.
|
|||
Readable.prototype.wrap = function(stream) { |
|||
var state = this._readableState; |
|||
var paused = false; |
|||
|
|||
stream.on('end', function() { |
|||
state.ended = true; |
|||
if (state.length === 0) |
|||
this.emit('end'); |
|||
}.bind(this)); |
|||
|
|||
stream.on('data', function(chunk) { |
|||
state.buffer.push(chunk); |
|||
state.length += chunk.length; |
|||
this.emit('readable'); |
|||
|
|||
// if not consumed, then pause the stream.
|
|||
if (state.length > state.lowWaterMark && !paused) { |
|||
paused = true; |
|||
stream.pause(); |
|||
} |
|||
}.bind(this)); |
|||
|
|||
// proxy all the other methods.
|
|||
// important when wrapping filters and duplexes.
|
|||
for (var i in stream) { |
|||
if (typeof stream[i] === 'function' && |
|||
typeof this[i] === 'undefined') { |
|||
this[i] = function(method) { return function() { |
|||
return stream[method].apply(stream, arguments); |
|||
}}(i); |
|||
} |
|||
} |
|||
|
|||
// proxy certain important events.
|
|||
var events = ['error', 'close', 'destroy', 'pause', 'resume']; |
|||
events.forEach(function(ev) { |
|||
stream.on(ev, this.emit.bind(this, ev)); |
|||
}.bind(this)); |
|||
|
|||
// consume some bytes. if not all is consumed, then
|
|||
// pause the underlying stream.
|
|||
this.read = function(n) { |
|||
if (state.length === 0) { |
|||
state.needReadable = true; |
|||
return null; |
|||
} |
|||
|
|||
if (isNaN(n) || n <= 0) |
|||
n = state.length; |
|||
|
|||
if (n > state.length) { |
|||
if (!state.ended) { |
|||
state.needReadable = true; |
|||
return null; |
|||
} else |
|||
n = state.length; |
|||
} |
|||
|
|||
var ret = fromList(n, state.buffer, state.length); |
|||
state.length -= n; |
|||
|
|||
if (state.length < state.lowWaterMark && paused) { |
|||
stream.resume(); |
|||
paused = false; |
|||
} |
|||
|
|||
if (state.length === 0 && state.ended) |
|||
process.nextTick(this.emit.bind(this, 'end')); |
|||
|
|||
return ret; |
|||
}; |
|||
}; |
|||
|
|||
|
|||
|
|||
// exposed for testing purposes only.
|
|||
Readable._fromList = fromList; |
|||
|
|||
// Pluck off n bytes from an array of buffers.
|
|||
// Length is the combined lengths of all the buffers in the list.
|
|||
// If there's no data, then
|
|||
function fromList(n, list, length) { |
|||
var ret; |
|||
|
|||
// nothing in the list, definitely empty.
|
|||
if (list.length === 0) { |
|||
return null; |
|||
} |
|||
|
|||
if (length === 0) { |
|||
ret = null; |
|||
} else if (!n || n >= length) { |
|||
// read it all, truncate the array.
|
|||
ret = Buffer.concat(list, length); |
|||
list.length = 0; |
|||
} else { |
|||
// read just some of it.
|
|||
if (n < list[0].length) { |
|||
// just take a part of the first list item.
|
|||
var buf = list[0]; |
|||
ret = buf.slice(0, n); |
|||
list[0] = buf.slice(n); |
|||
} else if (n === list[0].length) { |
|||
// first list is a perfect match
|
|||
ret = list.shift(); |
|||
} else { |
|||
// complex case.
|
|||
// we have enough to cover it, but it spans past the first buffer.
|
|||
ret = new Buffer(n); |
|||
var c = 0; |
|||
for (var i = 0, l = list.length; i < l && c < n; i++) { |
|||
var buf = list[0]; |
|||
var cpy = Math.min(n - c, buf.length); |
|||
buf.copy(ret, c, 0, cpy); |
|||
if (cpy < buf.length) { |
|||
list[0] = buf.slice(cpy); |
|||
} else { |
|||
list.shift(); |
|||
} |
|||
c += cpy; |
|||
} |
|||
} |
|||
} |
|||
|
|||
return ret; |
|||
} |
@ -0,0 +1,123 @@ |
|||
// Copyright Joyent, Inc. and other Node contributors.
|
|||
//
|
|||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|||
// copy of this software and associated documentation files (the
|
|||
// "Software"), to deal in the Software without restriction, including
|
|||
// without limitation the rights to use, copy, modify, merge, publish,
|
|||
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|||
// persons to whom the Software is furnished to do so, subject to the
|
|||
// following conditions:
|
|||
//
|
|||
// The above copyright notice and this permission notice shall be included
|
|||
// in all copies or substantial portions of the Software.
|
|||
//
|
|||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
|
|||
// a transform stream is a readable/writable stream where you do
|
|||
// something with the data. Sometimes it's called a "filter",
|
|||
// but that's not a great name for it, since that implies a thing where
|
|||
// some bits pass through, and others are simply ignored. (That would
|
|||
// be a valid example of a transform, of course.)
|
|||
//
|
|||
// While the output is causally related to the input, it's not a
|
|||
// necessarily symmetric or synchronous transformation. For example,
|
|||
// a zlib stream might take multiple plain-text writes(), and then
|
|||
// emit a single compressed chunk some time in the future.
|
|||
|
|||
module.exports = Transform; |
|||
|
|||
var Duplex = require('_stream_duplex'); |
|||
var util = require('util'); |
|||
util.inherits(Transform, Duplex); |
|||
|
|||
function Transform(options) { |
|||
Duplex.call(this, options); |
|||
|
|||
// bind output so that it can be passed around as a regular function.
|
|||
this._output = this._output.bind(this); |
|||
|
|||
// when the writable side finishes, then flush out anything remaining.
|
|||
this.once('finish', function() { |
|||
if ('function' === typeof this._flush) |
|||
this._flush(this._output, done.bind(this)); |
|||
else |
|||
done.call(this); |
|||
}); |
|||
} |
|||
|
|||
// This is the part where you do stuff!
|
|||
// override this function in implementation classes.
|
|||
// 'chunk' is an input chunk.
|
|||
//
|
|||
// Call `output(newChunk)` to pass along transformed output
|
|||
// to the readable side. You may call 'output' zero or more times.
|
|||
//
|
|||
// Call `cb(err)` when you are done with this chunk. If you pass
|
|||
// an error, then that'll put the hurt on the whole operation. If you
|
|||
// never call cb(), then you'll never get another chunk.
|
|||
Transform.prototype._transform = function(chunk, output, cb) { |
|||
throw new Error('not implemented'); |
|||
}; |
|||
|
|||
|
|||
Transform.prototype._write = function(chunk, cb) { |
|||
this._transform(chunk, this._output, cb); |
|||
}; |
|||
|
|||
Transform.prototype._read = function(n, cb) { |
|||
var ws = this._writableState; |
|||
var rs = this._readableState; |
|||
|
|||
// basically a no-op, since the _transform will fill the
|
|||
// _readableState.buffer and emit 'readable' for us, and set ended
|
|||
// Usually, we want to just not call the cb, and set the reading
|
|||
// flag to false, so that another _read will happen next time,
|
|||
// but no state changes.
|
|||
rs.reading = false; |
|||
|
|||
// however, if the writable side has ended, and its buffer is clear,
|
|||
// then that means that the input has all been consumed, and no more
|
|||
// will ever be provide. treat this as an EOF, and pass back 0 bytes.
|
|||
if ((ws.ended || ws.ending) && ws.length === 0) |
|||
cb(); |
|||
}; |
|||
|
|||
Transform.prototype._output = function(chunk) { |
|||
if (!chunk || !chunk.length) |
|||
return; |
|||
|
|||
var state = this._readableState; |
|||
var len = state.length; |
|||
state.buffer.push(chunk); |
|||
state.length += chunk.length; |
|||
if (state.needReadable) { |
|||
state.needReadable = false; |
|||
this.emit('readable'); |
|||
} |
|||
}; |
|||
|
|||
function done(er) { |
|||
if (er) |
|||
return this.emit('error', er); |
|||
|
|||
// if there's nothing in the write buffer, then that means
|
|||
// that nothing more will ever be provided
|
|||
var ws = this._writableState; |
|||
var rs = this._readableState; |
|||
|
|||
rs.ended = true; |
|||
// we may have gotten a 'null' read before, and since there is
|
|||
// no more data coming from the writable side, we need to emit
|
|||
// now so that the consumer knows to pick up the tail bits.
|
|||
if (rs.length && rs.needReadable) |
|||
this.emit('readable'); |
|||
else if (rs.length === 0) { |
|||
this.emit('end'); |
|||
} |
|||
} |
@ -0,0 +1,135 @@ |
|||
// Copyright Joyent, Inc. and other Node contributors.
|
|||
//
|
|||
// Permission is hereby granted, free of charge, to any person obtaining a
|
|||
// copy of this software and associated documentation files (the
|
|||
// "Software"), to deal in the Software without restriction, including
|
|||
// without limitation the rights to use, copy, modify, merge, publish,
|
|||
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|||
// persons to whom the Software is furnished to do so, subject to the
|
|||
// following conditions:
|
|||
//
|
|||
// The above copyright notice and this permission notice shall be included
|
|||
// in all copies or substantial portions of the Software.
|
|||
//
|
|||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|||
|
|||
// A bit simpler than readable streams.
|
|||
// Implement an async ._write(chunk, cb), and it'll handle all
|
|||
// the drain event emission and buffering.
|
|||
|
|||
module.exports = Writable |
|||
|
|||
var util = require('util'); |
|||
var Stream = require('stream'); |
|||
|
|||
util.inherits(Writable, Stream); |
|||
|
|||
function WritableState(options) { |
|||
options = options || {}; |
|||
this.highWaterMark = options.highWaterMark || 16 * 1024; |
|||
this.lowWaterMark = options.lowWaterMark || 1024; |
|||
this.needDrain = false; |
|||
this.ended = false; |
|||
this.ending = false; |
|||
|
|||
// not an actual buffer we keep track of, but a measurement
|
|||
// of how much we're waiting to get pushed to some underlying
|
|||
// socket or file.
|
|||
this.length = 0; |
|||
|
|||
this.writing = false; |
|||
this.buffer = []; |
|||
} |
|||
|
|||
function Writable(options) { |
|||
this._writableState = new WritableState(options); |
|||
|
|||
// legacy.
|
|||
this.writable = true; |
|||
|
|||
Stream.call(this); |
|||
} |
|||
|
|||
// Override this method for sync streams
|
|||
// override the _write(chunk, cb) method for async streams
|
|||
Writable.prototype.write = function(chunk, encoding) { |
|||
var state = this._writableState; |
|||
if (state.ended) { |
|||
this.emit('error', new Error('write after end')); |
|||
return; |
|||
} |
|||
|
|||
if (typeof chunk === 'string' && encoding) |
|||
chunk = new Buffer(chunk, encoding); |
|||
|
|||
var ret = state.length >= state.highWaterMark; |
|||
if (ret === false) |
|||
state.needDrain = true; |
|||
|
|||
var l = chunk.length; |
|||
state.length += l; |
|||
|
|||
if (state.writing) { |
|||
state.buffer.push(chunk); |
|||
return ret; |
|||
} |
|||
|
|||
state.writing = true; |
|||
this._write(chunk, function writecb(er) { |
|||
state.writing = false; |
|||
if (er) { |
|||
this.emit('error', er); |
|||
return; |
|||
} |
|||
state.length -= l; |
|||
|
|||
if (state.length === 0 && (state.ended || state.ending)) { |
|||
// emit 'finish' at the very end.
|
|||
this.emit('finish'); |
|||
return; |
|||
} |
|||
|
|||
// if there's something in the buffer waiting, then do that, too.
|
|||
if (state.buffer.length) { |
|||
chunk = state.buffer.shift(); |
|||
l = chunk.length; |
|||
state.writing = true; |
|||
this._write(chunk, writecb.bind(this)); |
|||
} |
|||
|
|||
if (state.length < state.lowWaterMark && state.needDrain) { |
|||
// Must force callback to be called on nextTick, so that we don't
|
|||
// emit 'drain' before the write() consumer gets the 'false' return
|
|||
// value, and has a chance to attach a 'drain' listener.
|
|||
process.nextTick(function() { |
|||
if (!state.needDrain) |
|||
return; |
|||
state.needDrain = false; |
|||
this.emit('drain'); |
|||
}.bind(this)); |
|||
} |
|||
|
|||
}.bind(this)); |
|||
|
|||
return ret; |
|||
}; |
|||
|
|||
Writable.prototype._write = function(chunk, cb) { |
|||
process.nextTick(cb.bind(this, new Error('not implemented'))); |
|||
}; |
|||
|
|||
Writable.prototype.end = function(chunk, encoding) { |
|||
var state = this._writableState; |
|||
state.ending = true; |
|||
if (chunk) |
|||
this.write(chunk, encoding); |
|||
else if (state.length === 0) |
|||
this.emit('finish'); |
|||
state.ended = true; |
|||
}; |
Loading…
Reference in new issue