From 0e01d6398f14074675393a65cf430ea0a7a81ab8 Mon Sep 17 00:00:00 2001 From: isaacs Date: Tue, 2 Oct 2012 16:15:39 -0700 Subject: [PATCH] zlib: streams2 --- lib/zlib.js | 215 +++++++++++-------------- src/node_zlib.cc | 13 ++ test/simple/test-zlib-destroy.js | 36 ----- test/simple/test-zlib-invalid-input.js | 7 - 4 files changed, 104 insertions(+), 167 deletions(-) delete mode 100644 test/simple/test-zlib-destroy.js diff --git a/lib/zlib.js b/lib/zlib.js index 9b56241146..bc3e9330f2 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -19,9 +19,10 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. +var Transform = require('_stream_transform'); + var binding = process.binding('zlib'); var util = require('util'); -var Stream = require('stream'); var assert = require('assert').ok; // zlib doesn't provide these, so kludge them in following the same @@ -138,33 +139,35 @@ function zlibBuffer(engine, buffer, callback) { var buffers = []; var nread = 0; + engine.on('error', onError); + engine.on('end', onEnd); + + engine.end(buffer); + flow(); + + function flow() { + var chunk; + while (null !== (chunk = engine.read())) { + buffers.push(chunk); + nread += chunk.length; + } + engine.once('readable', flow); + } + function onError(err) { engine.removeListener('end', onEnd); - engine.removeListener('error', onError); + engine.removeListener('readable', flow); callback(err); } - function onData(chunk) { - buffers.push(chunk); - nread += chunk.length; - } - function onEnd() { var buf = Buffer.concat(buffers, nread); buffers = []; callback(null, buf); } - - engine.on('error', onError); - engine.on('data', onData); - engine.on('end', onEnd); - - engine.write(buffer); - engine.end(); } - // generic zlib // minimal 2-byte header function Deflate(opts) { @@ -217,15 +220,13 @@ function Unzip(opts) { // you call the .write() method. function Zlib(opts, mode) { - Stream.call(this); - this._opts = opts = opts || {}; - this._queue = []; - this._processing = false; - this._ended = false; - this.readable = true; - this.writable = true; - this._flush = binding.Z_NO_FLUSH; + this._chunkSize = opts.chunkSize || exports.Z_DEFAULT_CHUNK; + + Transform.call(this, opts); + + // means a different thing there. + this._readableState.chunkSize = null; if (opts.chunkSize) { if (opts.chunkSize < exports.Z_MIN_CHUNK || @@ -274,13 +275,12 @@ function Zlib(opts, mode) { this._binding = new binding.Zlib(mode); var self = this; + this._hadError = false; this._binding.onerror = function(message, errno) { // there is no way to cleanly recover. // continuing only obscures problems. self._binding = null; self._hadError = true; - self._queue.length = 0; - self._processing = false; var error = new Error(message); error.errno = errno; @@ -294,7 +294,6 @@ function Zlib(opts, mode) { opts.strategy || exports.Z_DEFAULT_STRATEGY, opts.dictionary); - this._chunkSize = opts.chunkSize || exports.Z_DEFAULT_CHUNK; this._buffer = new Buffer(this._chunkSize); this._offset = 0; this._closed = false; @@ -302,59 +301,47 @@ function Zlib(opts, mode) { this.once('end', this.close); } -util.inherits(Zlib, Stream); - -Zlib.prototype.write = function write(chunk, cb) { - if (this._hadError) return true; - - if (this._ended) { - return this.emit('error', new Error('Cannot write after end')); - } - - if (arguments.length === 1 && typeof chunk === 'function') { - cb = chunk; - chunk = null; - } - - if (!chunk) { - chunk = null; - } else if (typeof chunk === 'string') { - chunk = new Buffer(chunk); - } else if (!Buffer.isBuffer(chunk)) { - return this.emit('error', new Error('Invalid argument')); - } - - - var empty = this._queue.length === 0; - - this._queue.push([chunk, cb]); - this._process(); - if (!empty) { - this._needDrain = true; - } - return empty; -}; +util.inherits(Zlib, Transform); Zlib.prototype.reset = function reset() { return this._binding.reset(); }; -Zlib.prototype.flush = function flush(cb) { - this._flush = binding.Z_SYNC_FLUSH; - return this.write(cb); +Zlib.prototype._flush = function(output, callback) { + var rs = this._readableState; + var self = this; + this._transform(null, output, function(er) { + if (er) + return callback(er); + + // now a weird thing happens... it could be that you called flush + // but everything had already actually been consumed, but it wasn't + // enough to get over the Readable class's lowWaterMark. + // In that case, we emit 'readable' now to make sure it's consumed. + if (rs.length && + rs.length < rs.lowWaterMark && + !rs.ended && + rs.needReadable) + self.emit('readable'); + + callback(); + }); }; -Zlib.prototype.end = function end(chunk, cb) { - if (this._hadError) return true; +Zlib.prototype.flush = function(callback) { + var ws = this._writableState; + var ts = this._transformState; - var self = this; - this._ending = true; - var ret = this.write(chunk, function() { - self.emit('end'); - if (cb) cb(); - }); - this._ended = true; - return ret; + if (ws.writing) { + ws.needDrain = true; + var self = this; + this.once('drain', function() { + self._flush(ts.output, callback); + }); + return; + } + + this._flush(ts.output, callback || function() {}); }; Zlib.prototype.close = function(callback) { @@ -368,37 +355,37 @@ Zlib.prototype.close = function(callback) { this._binding.close(); - process.nextTick(this.emit.bind(this, 'close')); + var self = this; + process.nextTick(function() { + self.emit('close'); + }); }; -Zlib.prototype._process = function() { - if (this._hadError) return; - - if (this._processing || this._paused) return; - - if (this._queue.length === 0) { - if (this._needDrain) { - this._needDrain = false; - this.emit('drain'); - } - // nothing to do, waiting for more data at this point. - return; - } - - var req = this._queue.shift(); - var cb = req.pop(); - var chunk = req.pop(); - - if (this._ending && this._queue.length === 0) { - this._flush = binding.Z_FINISH; - } +Zlib.prototype._transform = function(chunk, output, cb) { + var flushFlag; + var ws = this._writableState; + var ending = ws.ending || ws.ended; + var last = ending && (!chunk || ws.length === chunk.length); + + if (chunk !== null && !Buffer.isBuffer(chunk)) + return cb(new Error('invalid input')); + + // If it's the last chunk, or a final flush, we use the Z_FINISH flush flag. + // If it's explicitly flushing at some other time, then we use + // Z_FULL_FLUSH. Otherwise, use Z_NO_FLUSH for maximum compression + // goodness. + if (last) + flushFlag = binding.Z_FINISH; + else if (chunk === null) + flushFlag = binding.Z_FULL_FLUSH; + else + flushFlag = binding.Z_NO_FLUSH; - var self = this; var availInBefore = chunk && chunk.length; var availOutBefore = this._chunkSize - this._offset; - var inOff = 0; - var req = this._binding.write(this._flush, + + var req = this._binding.write(flushFlag, chunk, // in inOff, // in_off availInBefore, // in_len @@ -408,23 +395,23 @@ Zlib.prototype._process = function() { req.buffer = chunk; req.callback = callback; - this._processing = req; + var self = this; function callback(availInAfter, availOutAfter, buffer) { - if (self._hadError) return; + if (self._hadError) + return; var have = availOutBefore - availOutAfter; - assert(have >= 0, 'have should not go down'); if (have > 0) { var out = self._buffer.slice(self._offset, self._offset + have); self._offset += have; - self.emit('data', out); + // serve some output to the consumer. + output(out); } - // XXX Maybe have a 'min buffer' size so we don't dip into the - // thread pool with only 1 byte available or something? + // exhausted the output buffer, or used all the input create a new one. if (availOutAfter === 0 || self._offset >= self._chunkSize) { availOutBefore = self._chunkSize; self._offset = 0; @@ -439,7 +426,7 @@ Zlib.prototype._process = function() { inOff += (availInBefore - availInAfter); availInBefore = availInAfter; - var newReq = self._binding.write(self._flush, + var newReq = self._binding.write(flushFlag, chunk, inOff, availInBefore, @@ -448,34 +435,14 @@ Zlib.prototype._process = function() { self._chunkSize); newReq.callback = callback; // this same function newReq.buffer = chunk; - self._processing = newReq; return; } // finished with the chunk. - self._processing = false; - if (cb) cb(); - self._process(); + cb(); } }; -Zlib.prototype.pause = function() { - this._paused = true; - this.emit('pause'); -}; - -Zlib.prototype.resume = function() { - this._paused = false; - this._process(); -}; - -Zlib.prototype.destroy = function() { - this.readable = false; - this.writable = false; - this._ended = true; - this.emit('close'); -}; - util.inherits(Deflate, Zlib); util.inherits(Inflate, Zlib); util.inherits(Gzip, Zlib); diff --git a/src/node_zlib.cc b/src/node_zlib.cc index 13f94e9020..881b20ce62 100644 --- a/src/node_zlib.cc +++ b/src/node_zlib.cc @@ -109,7 +109,19 @@ class ZCtx : public ObjectWrap { assert(!ctx->write_in_progress_ && "write already in progress"); ctx->write_in_progress_ = true; + assert(!args[0]->IsUndefined() && "must provide flush value"); + unsigned int flush = args[0]->Uint32Value(); + + if (flush != Z_NO_FLUSH && + flush != Z_PARTIAL_FLUSH && + flush != Z_SYNC_FLUSH && + flush != Z_FULL_FLUSH && + flush != Z_FINISH && + flush != Z_BLOCK) { + assert(0 && "Invalid flush value"); + } + Bytef *in; Bytef *out; size_t in_off, in_len, out_off, out_len; @@ -483,6 +495,7 @@ void InitZlib(Handle target) { callback_sym = NODE_PSYMBOL("callback"); onerror_sym = NODE_PSYMBOL("onerror"); + // valid flush values. NODE_DEFINE_CONSTANT(target, Z_NO_FLUSH); NODE_DEFINE_CONSTANT(target, Z_PARTIAL_FLUSH); NODE_DEFINE_CONSTANT(target, Z_SYNC_FLUSH); diff --git a/test/simple/test-zlib-destroy.js b/test/simple/test-zlib-destroy.js deleted file mode 100644 index 7a1120e284..0000000000 --- a/test/simple/test-zlib-destroy.js +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -var common = require('../common'); -var assert = require('assert'); -var zlib = require('zlib'); - -['Deflate', 'Inflate', 'Gzip', 'Gunzip', 'DeflateRaw', 'InflateRaw', 'Unzip'] - .forEach(function (name) { - var a = false; - var zStream = new zlib[name](); - zStream.on('close', function () { - a = true; - }); - zStream.destroy(); - - assert.equal(a, true, name+'#destroy() must emit \'close\''); - }); diff --git a/test/simple/test-zlib-invalid-input.js b/test/simple/test-zlib-invalid-input.js index f97c5831ad..c3d8b5b47a 100644 --- a/test/simple/test-zlib-invalid-input.js +++ b/test/simple/test-zlib-invalid-input.js @@ -50,13 +50,6 @@ unzips.forEach(function (uz, i) { uz.on('error', function(er) { console.error('Error event', er); hadError[i] = true; - - // to be friendly to the Stream API, zlib objects just return true and - // ignore data on the floor after an error. It's up to the user to - // catch the 'error' event and do something intelligent. They do not - // emit any more data, however. - assert.equal(uz.write('also invalid'), true); - assert.equal(uz.end(), true); }); uz.on('end', function(er) {