diff --git a/lib/fs.js b/lib/fs.js index a9d5893636..ad6b977457 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1,10 +1,13 @@ -var sys = require('sys'), - events = require('events'), - Buffer = require('buffer').Buffer; +var sys = require('sys'); +var events = require('events'); +var Buffer = require('buffer').Buffer; var binding = process.binding('fs'); var fs = exports; +var kMinPoolSpace = 128; +var kPoolSize = 40*1024; + fs.Stats = binding.Stats; fs.Stats.prototype._checkModeProperty = function (property) { @@ -565,8 +568,16 @@ fs.realpath = function (path, callback) { } } next(); +}; + +var pool; +function allocNewPool () { + pool = new Buffer(kPoolSize); + pool.used = 0; } + + fs.createReadStream = function(path, options) { return new ReadStream(path, options); }; @@ -580,7 +591,6 @@ var ReadStream = fs.ReadStream = function(path, options) { this.paused = false; this.flags = 'r'; - this.encoding = 'binary'; this.mode = 0666; this.bufferSize = 4 * 1024; @@ -614,19 +624,40 @@ sys.inherits(ReadStream, events.EventEmitter); fs.FileReadStream = fs.ReadStream; // support the legacy name -ReadStream.prototype.setEncoding = function(encoding) { - this.encoding = encoding; +ReadStream.prototype.setEncoding = function (encoding) { + var Utf8Decoder = require("utf8decoder").Utf8Decoder; // lazy load + var self = this; + this._encoding = enc.toLowerCase(); + if (this._encoding == 'utf-8' || this._encoding == 'utf8') { + this._decoder = new Utf8Decoder(); + this._decoder.onString = function(str) { + self.emit('data', str); + }; + } else if (this._decoder) { + delete this._decoder; + } }; + ReadStream.prototype._read = function () { var self = this; if (!self.readable || self.paused) return; - fs.read(self.fd, - self.bufferSize, - undefined, - self.encoding, - function(err, data, bytesRead) { + if (!pool || pool.length - pool.used < kMinPoolSpace) { + // discard the old pool. Can't add to the free list because + // users might have refernces to slices on it. + pool = null; + allocNewPool(); + } + + // Grab another reference to the pool in the case that while we're in the + // thread pool another read() finishes up the pool, and allocates a new + // one. + var thisPool = pool; + var toRead = Math.min(pool.length - pool.used, this.bufferSize); + var start = pool.used; + + function afterRead (err, bytesRead) { if (err) { self.emit('error', err); self.readable = false; @@ -639,20 +670,39 @@ ReadStream.prototype._read = function () { return; } + var b = thisPool.slice(start, start+bytesRead); + + // Possible optimizition here? + // Reclaim some bytes if bytesRead < toRead? + // Would need to ensure that pool === thisPool. + // do not emit events if the stream is paused if (self.paused) { - self.buffer = data; + self.buffer = b; return; } // do not emit events anymore after we declared the stream unreadable - if (!self.readable) { - return; - } + if (!self.readable) return; - self.emit('data', data); + self._emitData(b); self._read(); - }); + } + + fs.read(self.fd, pool, pool.used, toRead, undefined, afterRead); + pool.used += toRead; +}; + + +ReadStream.prototype._emitData = function (d) { + if (!this._encoding) { + this.emit('data', d); + } else if (this._decoder) { + this._decoder.write(d); + } else { + var string = d.toString(this._encoding, 0, d.length); + this.emit('data', string); + } }; @@ -664,7 +714,7 @@ ReadStream.prototype.forceClose = function (cb) { sys.error(readStreamForceCloseWarning); } return this.destroy(cb); -} +}; ReadStream.prototype.destroy = function (cb) { @@ -674,16 +724,12 @@ ReadStream.prototype.destroy = function (cb) { function close() { fs.close(self.fd, function(err) { if (err) { - if (cb) { - cb(err); - } + if (cb) cb(err); self.emit('error', err); return; } - if (cb) { - cb(null); - } + if (cb) cb(null); self.emit('close'); }); } @@ -705,7 +751,7 @@ ReadStream.prototype.resume = function() { this.paused = false; if (this.buffer) { - this.emit('data', this.buffer); + this._emitData(this.buffer); this.buffer = null; } @@ -858,7 +904,7 @@ WriteStream.prototype.forceClose = function (cb) { sys.error(writeStreamForceCloseWarning); } return this.destroy(cb); -} +}; WriteStream.prototype.forceClose = function (cb) { diff --git a/test/simple/test-file-read-stream.js b/test/simple/test-file-read-stream.js index a7fa8b69ab..c615030a4c 100644 --- a/test/simple/test-file-read-stream.js +++ b/test/simple/test-file-read-stream.js @@ -1,54 +1,65 @@ require('../common'); -var - path = require('path'), - fs = require('fs'), - fn = path.join(fixturesDir, 'test_ca.pem'), - file = fs.createReadStream(fn), - - callbacks = { - open: -1, - end: -1, - close: -1, - destroy: -1 - }, - - paused = false, - - fileContent = ''; - -file - .addListener('open', function(fd) { - callbacks.open++; - assert.equal('number', typeof fd); - assert.ok(file.readable); - }) - .addListener('error', function(err) { - throw err; - }) - .addListener('data', function(data) { - assert.ok(!paused); - fileContent += data; - - paused = true; - file.pause(); - assert.ok(file.paused); - - setTimeout(function() { - paused = false; - file.resume(); - assert.ok(!file.paused); - }, 10); - }) - .addListener('end', function(chunk) { - callbacks.end++; - }) - .addListener('close', function() { - callbacks.close++; - assert.ok(!file.readable); - - assert.equal(fs.readFileSync(fn), fileContent); - }); +// TODO Improved this test. test_ca.pem is too small. A proper test would +// great a large utf8 (with multibyte chars) file and stream it in, +// performing sanity checks throughout. + +Buffer = require('buffer').Buffer; +path = require('path'); +fs = require('fs'); +fn = path.join(fixturesDir, 'test_ca.pem'); + +file = fs.createReadStream(fn); + +callbacks = { + open: -1, + end: -1, + data: -1, + close: -1, + destroy: -1 +}; + +paused = false; + +fileContent = ''; + +file.addListener('open', function(fd) { + callbacks.open++; + assert.equal('number', typeof fd); + assert.ok(file.readable); +}); + +file.addListener('error', function(err) { + throw err; +}); + +file.addListener('data', function(data) { + callbacks.data++; + assert.ok(data instanceof Buffer); + assert.ok(!paused); + fileContent += data; + + paused = true; + file.pause(); + assert.ok(file.paused); + + setTimeout(function() { + paused = false; + file.resume(); + assert.ok(!file.paused); + }, 10); +}); + +file.addListener('end', function(chunk) { + callbacks.end++; +}); + +file.addListener('close', function() { + callbacks.close++; + assert.ok(!file.readable); + + assert.equal(fs.readFileSync(fn), fileContent); +}); var file2 = fs.createReadStream(fn); file2.destroy(function(err) {