diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index 2b192d6b4a..c419266bc3 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -436,8 +436,8 @@ Resumes the incoming `'data'` events after a `pause()`. A `Writable` Stream has the following methods, members, and events. Note that `stream.Writable` is an abstract class designed to be -extended with an underlying implementation of the `_write(chunk, cb)` -method. (See below.) +extended with an underlying implementation of the +`_write(chunk, encoding, cb)` method. (See below.) ### new stream.Writable([options]) @@ -451,10 +451,16 @@ In classes that extend the Writable class, make sure to call the constructor so that the buffering settings can be properly initialized. -### writable.\_write(chunk, callback) +### writable.\_write(chunk, encoding, callback) -* `chunk` {Buffer | Array} The data to be written -* `callback` {Function} Called with an error, or null when finished +* `chunk` {Buffer | String} The chunk to be written. Will always + be a buffer unless the `decodeStrings` option was set to `false`. +* `encoding` {String} If the chunk is a string, then this is the + encoding type. Ignore chunk is a buffer. Note that chunk will + **always** be a buffer unless the `decodeStrings` option is + explicitly set to `false`. +* `callback` {Function} Call this function (optionally with an error + argument) when you are done processing the supplied chunk. All Writable stream implementations must provide a `_write` method to send data to the underlying resource. @@ -467,9 +473,12 @@ Call the callback using the standard `callback(error)` pattern to signal that the write completed successfully or with an error. If the `decodeStrings` flag is set in the constructor options, then -`chunk` will be an array rather than a Buffer. This is to support +`chunk` may be a string rather than a Buffer, and `encoding` will +indicate the sort of string that it is. This is to support implementations that have an optimized handling for certain string -data encodings. +data encodings. If you do not explicitly set the `decodeStrings` +option to `false`, then you can safely ignore the `encoding` argument, +and assume that `chunk` will always be a Buffer. This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user @@ -543,13 +552,13 @@ TCP socket connection. Note that `stream.Duplex` is an abstract class designed to be extended with an underlying implementation of the `_read(size)` -and `_write(chunk, callback)` methods as you would with a Readable or +and `_write(chunk, encoding, callback)` methods as you would with a Readable or Writable stream class. Since JavaScript doesn't have multiple prototypal inheritance, this class prototypally inherits from Readable, and then parasitically from Writable. It is thus up to the user to implement both the lowlevel -`_read(n)` method as well as the lowlevel `_write(chunk,cb)` method +`_read(n)` method as well as the lowlevel `_write(chunk, encoding, cb)` method on extension duplex classes. ### new stream.Duplex(options) @@ -589,9 +598,12 @@ In classes that extend the Transform class, make sure to call the constructor so that the buffering settings can be properly initialized. -### transform.\_transform(chunk, callback) +### transform.\_transform(chunk, encoding, callback) -* `chunk` {Buffer} The chunk to be transformed. +* `chunk` {Buffer | String} The chunk to be transformed. Will always + be a buffer unless the `decodeStrings` option was set to `false`. +* `encoding` {String} If the chunk is a string, then this is the + encoding type. (Ignore if `decodeStrings` chunk is a buffer.) * `callback` {Function} Call this function (optionally with an error argument) when you are done processing the supplied chunk. @@ -671,7 +683,7 @@ function SimpleProtocol(options) { SimpleProtocol.prototype = Object.create( Transform.prototype, { constructor: { value: SimpleProtocol }}); -SimpleProtocol.prototype._transform = function(chunk, done) { +SimpleProtocol.prototype._transform = function(chunk, encoding, done) { if (!this._inBody) { // check if the chunk has a \n\n var split = -1; diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js index 557d6de99a..a5e986430d 100644 --- a/lib/_stream_passthrough.js +++ b/lib/_stream_passthrough.js @@ -36,6 +36,6 @@ function PassThrough(options) { Transform.call(this, options); } -PassThrough.prototype._transform = function(chunk, cb) { +PassThrough.prototype._transform = function(chunk, encoding, cb) { cb(null, chunk); }; diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 222b1390f8..013bebde2a 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -155,10 +155,11 @@ Transform.prototype._transform = function(chunk, output, cb) { throw new Error('not implemented'); }; -Transform.prototype._write = function(chunk, cb) { +Transform.prototype._write = function(chunk, encoding, cb) { var ts = this._transformState; ts.writecb = cb; ts.writechunk = chunk; + ts.writeencoding = encoding; if (!ts.transforming) { var rs = this._readableState; if (ts.needTransform || @@ -176,7 +177,7 @@ Transform.prototype._read = function(n) { if (ts.writechunk && ts.writecb && !ts.transforming) { ts.transforming = true; - this._transform(ts.writechunk, ts.afterTransform); + this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); } else { // mark that we need a transform, so that any data that comes in // will get processed, now that we've asked for it. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2dff2d8c75..57926ad57b 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -146,15 +146,6 @@ function validChunk(stream, state, chunk, cb) { return valid; } -function decodeChunk(state, chunk, encoding) { - if (!state.objectMode && - state.decodeStrings !== false && - typeof chunk === 'string') { - chunk = new Buffer(chunk, encoding); - } - return chunk; -} - Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; var ret = false; @@ -177,6 +168,15 @@ Writable.prototype.write = function(chunk, encoding, cb) { return ret; }; +function decodeChunk(state, chunk, encoding) { + if (!state.objectMode && + state.decodeStrings !== false && + typeof chunk === 'string') { + chunk = new Buffer(chunk, encoding); + } + return chunk; +} + // if we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. @@ -184,17 +184,13 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { chunk = decodeChunk(state, chunk, encoding); var len = state.objectMode ? 1 : chunk.length; - // XXX Remove. _write() should take an encoding. - if (state.decodeStrings === false) - chunk = [chunk, encoding]; - state.length += len; var ret = state.length < state.highWaterMark; state.needDrain = !ret; if (state.writing) - state.buffer.push([chunk, cb]); // XXX [chunk,encoding,cb] + state.buffer.push([chunk, encoding, cb]); else doWrite(stream, state, len, chunk, encoding, cb); @@ -206,8 +202,7 @@ function doWrite(stream, state, len, chunk, encoding, cb) { state.writecb = cb; state.writing = true; state.sync = true; - // XXX stream._write(chunk, encoding, state.onwrite) - stream._write(chunk, state.onwrite); + stream._write(chunk, encoding, state.onwrite); state.sync = false; } @@ -271,21 +266,12 @@ function onwriteDrain(stream, state) { function clearBuffer(stream, state) { state.bufferProcessing = true; - // XXX buffer entry should be [chunk, encoding, cb] for (var c = 0; c < state.buffer.length; c++) { - var chunkCb = state.buffer[c]; - var chunk = chunkCb[0]; - var cb = chunkCb[1]; - var encoding = ''; - var len; - - if (state.objectMode) - len = 1; - else if (false === state.decodeStrings) { - len = chunk[0].length; - encoding = chunk[1]; - } else - len = chunk.length; + var entry = state.buffer[c]; + var chunk = entry[0]; + var encoding = entry[1]; + var cb = entry[2]; + var len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, len, chunk, encoding, cb); @@ -306,10 +292,8 @@ function clearBuffer(stream, state) { state.buffer.length = 0; } -Writable.prototype._write = function(chunk, cb) { - process.nextTick(function() { - cb(new Error('not implemented')); - }); +Writable.prototype._write = function(chunk, encoding, cb) { + cb(new Error('not implemented')); }; Writable.prototype.end = function(chunk, encoding, cb) { diff --git a/lib/crypto.js b/lib/crypto.js index 500e14d2f8..01d4b7125b 100644 --- a/lib/crypto.js +++ b/lib/crypto.js @@ -160,8 +160,8 @@ function Hash(algorithm, options) { util.inherits(Hash, stream.Transform); -Hash.prototype._transform = function(chunk, callback) { - this._binding.update(chunk); +Hash.prototype._transform = function(chunk, encoding, callback) { + this._binding.update(chunk, encoding); callback(); }; @@ -226,8 +226,8 @@ function Cipher(cipher, password, options) { util.inherits(Cipher, stream.Transform); -Cipher.prototype._transform = function(chunk, callback) { - this.push(this._binding.update(chunk)); +Cipher.prototype._transform = function(chunk, encoding, callback) { + this.push(this._binding.update(chunk, encoding)); callback(); }; @@ -351,8 +351,8 @@ function Sign(algorithm, options) { util.inherits(Sign, stream.Writable); -Sign.prototype._write = function(chunk, callback) { - this._binding.update(chunk); +Sign.prototype._write = function(chunk, encoding, callback) { + this._binding.update(chunk, encoding); callback(); }; diff --git a/lib/fs.js b/lib/fs.js index d467c5e0cc..39a34abc96 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1650,12 +1650,14 @@ WriteStream.prototype.open = function() { }; -WriteStream.prototype._write = function(data, cb) { +WriteStream.prototype._write = function(data, encoding, cb) { if (!Buffer.isBuffer(data)) return this.emit('error', new Error('Invalid data')); if (typeof this.fd !== 'number') - return this.once('open', this._write.bind(this, data, cb)); + return this.once('open', function() { + this._write(data, encoding, cb); + }); var self = this; fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) { diff --git a/lib/net.js b/lib/net.js index 35223c3396..4821665218 100644 --- a/lib/net.js +++ b/lib/net.js @@ -161,7 +161,8 @@ function Socket(options) { initSocketHandle(this); - this._pendingWrite = null; + this._pendingData = null; + this._pendingEncoding = ''; // handle strings directly this._writableState.decodeStrings = false; @@ -583,22 +584,20 @@ Socket.prototype.write = function(chunk, encoding, cb) { }; -Socket.prototype._write = function(dataEncoding, cb) { - // assert(Array.isArray(dataEncoding)); - var data = dataEncoding[0]; - var encoding = dataEncoding[1] || 'utf8'; - +Socket.prototype._write = function(data, encoding, cb) { // If we are still connecting, then buffer this for later. // The Writable logic will buffer up any more writes while // waiting for this one to be done. if (this._connecting) { - this._pendingWrite = dataEncoding; + this._pendingData = data; + this._pendingEncoding = encoding; this.once('connect', function() { - this._write(dataEncoding, cb); + this._write(data, encoding, cb); }); return; } - this._pendingWrite = null; + this._pendingData = null; + this._pendingEncoding = ''; timers.active(this); @@ -651,15 +650,16 @@ function createWriteReq(handle, data, encoding) { Socket.prototype.__defineGetter__('bytesWritten', function() { var bytes = this._bytesDispatched, state = this._writableState, - pending = this._pendingWrite; + data = this._pendingData, + encoding = this._pendingEncoding; state.buffer.forEach(function(el) { el = el[0]; bytes += Buffer.byteLength(el[0], el[1]); }); - if (pending) - bytes += Buffer.byteLength(pending[0], pending[1]); + if (data) + bytes += Buffer.byteLength(data, encoding); return bytes; }); diff --git a/lib/tls.js b/lib/tls.js index 86ace15b16..515761410b 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -239,6 +239,7 @@ function CryptoStream(pair, options) { this.pair = pair; this._pending = null; + this._pendingEncoding = ''; this._pendingCallback = null; this._doneFlag = false; this._resumingSession = false; @@ -300,7 +301,7 @@ function onCryptoStreamEnd() { } -CryptoStream.prototype._write = function write(data, cb) { +CryptoStream.prototype._write = function write(data, encoding, cb) { assert(this._pending === null); // Black-hole data @@ -361,6 +362,7 @@ CryptoStream.prototype._write = function write(data, cb) { // No write has happened this._pending = data; + this._pendingEncoding = encoding; this._pendingCallback = cb; if (this === this.pair.cleartext) { @@ -373,11 +375,13 @@ CryptoStream.prototype._write = function write(data, cb) { CryptoStream.prototype._writePending = function writePending() { var data = this._pending, + encoding = this._pendingEncoding, cb = this._pendingCallback; this._pending = null; + this._pendingEncoding = ''; this._pendingCallback = null; - this._write(data, cb); + this._write(data, encoding, cb); }; diff --git a/lib/zlib.js b/lib/zlib.js index d3aa858fba..dc0aeca304 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -309,7 +309,7 @@ Zlib.prototype.reset = function reset() { }; Zlib.prototype._flush = function(callback) { - this._transform(null, callback); + this._transform(null, '', callback); }; Zlib.prototype.flush = function(callback) { @@ -343,7 +343,7 @@ Zlib.prototype.close = function(callback) { }); }; -Zlib.prototype._transform = function(chunk, cb) { +Zlib.prototype._transform = function(chunk, encoding, cb) { var flushFlag; var ws = this._writableState; var ending = ws.ending || ws.ended; diff --git a/test/simple/test-stream2-finish-pipe.js b/test/simple/test-stream2-finish-pipe.js index bcb57a74a0..39b274f977 100644 --- a/test/simple/test-stream2-finish-pipe.js +++ b/test/simple/test-stream2-finish-pipe.js @@ -29,7 +29,7 @@ r._read = function(size) { }; var w = new stream.Writable(); -w._write = function(data, cb) { +w._write = function(data, encoding, cb) { cb(null); }; diff --git a/test/simple/test-stream2-objects.js b/test/simple/test-stream2-objects.js index 8939ad7a68..ba626cb1e7 100644 --- a/test/simple/test-stream2-objects.js +++ b/test/simple/test-stream2-objects.js @@ -261,7 +261,7 @@ test('high watermark push', function(t) { test('can write objects to stream', function(t) { var w = new Writable({ objectMode: true }); - w._write = function(chunk, cb) { + w._write = function(chunk, encoding, cb) { assert.deepEqual(chunk, { foo: 'bar' }); cb(); }; @@ -278,7 +278,7 @@ test('can write multiple objects to stream', function(t) { var w = new Writable({ objectMode: true }); var list = []; - w._write = function(chunk, cb) { + w._write = function(chunk, encoding, cb) { list.push(chunk); cb(); }; @@ -303,7 +303,7 @@ test('can write strings as objects', function(t) { }); var list = []; - w._write = function(chunk, cb) { + w._write = function(chunk, encoding, cb) { list.push(chunk); process.nextTick(cb); }; @@ -328,7 +328,7 @@ test('buffers finish until cb is called', function(t) { }); var called = false; - w._write = function(chunk, cb) { + w._write = function(chunk, encoding, cb) { assert.equal(chunk, 'foo'); process.nextTick(function() { diff --git a/test/simple/test-stream2-pipe-error-handling.js b/test/simple/test-stream2-pipe-error-handling.js index 82c9a79be9..cf7531cbd3 100644 --- a/test/simple/test-stream2-pipe-error-handling.js +++ b/test/simple/test-stream2-pipe-error-handling.js @@ -40,7 +40,7 @@ var stream = require('stream'); }; var dest = new stream.Writable(); - dest._write = function(chunk, cb) { + dest._write = function(chunk, encoding, cb) { cb(); }; @@ -80,7 +80,7 @@ var stream = require('stream'); }; var dest = new stream.Writable(); - dest._write = function(chunk, cb) { + dest._write = function(chunk, encoding, cb) { cb(); }; diff --git a/test/simple/test-stream2-push.js b/test/simple/test-stream2-push.js index 29b438d32e..b63edc3084 100644 --- a/test/simple/test-stream2-push.js +++ b/test/simple/test-stream2-push.js @@ -90,9 +90,9 @@ var expectWritten = 'asdfgasdfgasdfgasdfg', 'asdfgasdfgasdfgasdfg' ]; -writer._write = function(chunk, cb) { - console.error('WRITE %s', chunk[0]); - written.push(chunk[0]); +writer._write = function(chunk, encoding, cb) { + console.error('WRITE %s', chunk); + written.push(chunk); process.nextTick(cb); }; diff --git a/test/simple/test-stream2-stderr-sync.js b/test/simple/test-stream2-stderr-sync.js index 992c79db41..2b83617e9d 100644 --- a/test/simple/test-stream2-stderr-sync.js +++ b/test/simple/test-stream2-stderr-sync.js @@ -61,7 +61,7 @@ function child0() { Writable.call(this, opts); } - W.prototype._write = function(chunk, cb) { + W.prototype._write = function(chunk, encoding, cb) { var req = handle.writeUtf8String(chunk.toString() + '\n'); // here's the problem. // it needs to tell the Writable machinery that it's ok to write diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index baef18d0c7..a329dee1d9 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -67,7 +67,7 @@ test('writable side consumption', function(t) { }); var transformed = 0; - tx._transform = function(chunk, cb) { + tx._transform = function(chunk, encoding, cb) { transformed += chunk.length; tx.push(chunk); cb(); @@ -106,7 +106,7 @@ test('passthrough', function(t) { test('simple transform', function(t) { var pt = new Transform; - pt._transform = function(c, cb) { + pt._transform = function(c, e, cb) { var ret = new Buffer(c.length); ret.fill('x'); pt.push(ret); @@ -128,7 +128,7 @@ test('simple transform', function(t) { test('async passthrough', function(t) { var pt = new Transform; - pt._transform = function(chunk, cb) { + pt._transform = function(chunk, encoding, cb) { setTimeout(function() { pt.push(chunk); cb(); @@ -154,7 +154,7 @@ test('assymetric transform (expand)', function(t) { var pt = new Transform; // emit each chunk 2 times. - pt._transform = function(chunk, cb) { + pt._transform = function(chunk, encoding, cb) { setTimeout(function() { pt.push(chunk); setTimeout(function() { @@ -189,7 +189,7 @@ test('assymetric transform (compress)', function(t) { // or whatever's left. pt.state = ''; - pt._transform = function(chunk, cb) { + pt._transform = function(chunk, encoding, cb) { if (!chunk) chunk = ''; var s = chunk.toString(); @@ -359,7 +359,7 @@ test('passthrough facaded', function(t) { test('object transform (json parse)', function(t) { console.error('json parse stream'); var jp = new Transform({ objectMode: true }); - jp._transform = function(data, cb) { + jp._transform = function(data, encoding, cb) { try { jp.push(JSON.parse(data)); cb(); @@ -399,7 +399,7 @@ test('object transform (json parse)', function(t) { test('object transform (json stringify)', function(t) { console.error('json parse stream'); var js = new Transform({ objectMode: true }); - js._transform = function(data, cb) { + js._transform = function(data, encoding, cb) { try { js.push(JSON.stringify(data)); cb(); diff --git a/test/simple/test-stream2-writable.js b/test/simple/test-stream2-writable.js index 537660263b..1c1bb97ce8 100644 --- a/test/simple/test-stream2-writable.js +++ b/test/simple/test-stream2-writable.js @@ -33,7 +33,7 @@ function TestWriter() { this.written = 0; } -TestWriter.prototype._write = function(chunk, cb) { +TestWriter.prototype._write = function(chunk, encoding, cb) { // simulate a small unpredictable latency setTimeout(function() { this.buffer.push(chunk.toString()); @@ -186,11 +186,10 @@ test('write no bufferize', function(t) { decodeStrings: false }); - tw._write = function(chunk, cb) { - assert(Array.isArray(chunk)); - assert(typeof chunk[0] === 'string'); - chunk = new Buffer(chunk[0], chunk[1]); - return TestWriter.prototype._write.call(this, chunk, cb); + tw._write = function(chunk, encoding, cb) { + assert(typeof chunk === 'string'); + chunk = new Buffer(chunk, encoding); + return TestWriter.prototype._write.call(this, chunk, encoding, cb); }; var encodings = @@ -279,7 +278,7 @@ test('end callback after .write() call', function (t) { test('encoding should be ignored for buffers', function(t) { var tw = new W(); var hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb'; - tw._write = function(chunk, cb) { + tw._write = function(chunk, encoding, cb) { t.equal(chunk.toString('hex'), hex); t.end(); };