Browse Source

fs.ReadStream should emit Buffers

And do proper utf8 encoding.
v0.7.4-release
Ryan Dahl 15 years ago
parent
commit
19f475c573
  1. 96
      lib/fs.js
  2. 45
      test/simple/test-file-read-stream.js

96
lib/fs.js

@ -1,10 +1,13 @@
var sys = require('sys'), var sys = require('sys');
events = require('events'), var events = require('events');
Buffer = require('buffer').Buffer; var Buffer = require('buffer').Buffer;
var binding = process.binding('fs'); var binding = process.binding('fs');
var fs = exports; var fs = exports;
var kMinPoolSpace = 128;
var kPoolSize = 40*1024;
fs.Stats = binding.Stats; fs.Stats = binding.Stats;
fs.Stats.prototype._checkModeProperty = function (property) { fs.Stats.prototype._checkModeProperty = function (property) {
@ -565,8 +568,16 @@ fs.realpath = function (path, callback) {
} }
} }
next(); next();
};
var pool;
function allocNewPool () {
pool = new Buffer(kPoolSize);
pool.used = 0;
} }
fs.createReadStream = function(path, options) { fs.createReadStream = function(path, options) {
return new ReadStream(path, options); return new ReadStream(path, options);
}; };
@ -580,7 +591,6 @@ var ReadStream = fs.ReadStream = function(path, options) {
this.paused = false; this.paused = false;
this.flags = 'r'; this.flags = 'r';
this.encoding = 'binary';
this.mode = 0666; this.mode = 0666;
this.bufferSize = 4 * 1024; this.bufferSize = 4 * 1024;
@ -615,18 +625,39 @@ sys.inherits(ReadStream, events.EventEmitter);
fs.FileReadStream = fs.ReadStream; // support the legacy name fs.FileReadStream = fs.ReadStream; // support the legacy name
ReadStream.prototype.setEncoding = function (encoding) { ReadStream.prototype.setEncoding = function (encoding) {
this.encoding = encoding; var Utf8Decoder = require("utf8decoder").Utf8Decoder; // lazy load
var self = this;
this._encoding = enc.toLowerCase();
if (this._encoding == 'utf-8' || this._encoding == 'utf8') {
this._decoder = new Utf8Decoder();
this._decoder.onString = function(str) {
self.emit('data', str);
};
} else if (this._decoder) {
delete this._decoder;
}
}; };
ReadStream.prototype._read = function () { ReadStream.prototype._read = function () {
var self = this; var self = this;
if (!self.readable || self.paused) return; if (!self.readable || self.paused) return;
fs.read(self.fd, if (!pool || pool.length - pool.used < kMinPoolSpace) {
self.bufferSize, // discard the old pool. Can't add to the free list because
undefined, // users might have refernces to slices on it.
self.encoding, pool = null;
function(err, data, bytesRead) { allocNewPool();
}
// Grab another reference to the pool in the case that while we're in the
// thread pool another read() finishes up the pool, and allocates a new
// one.
var thisPool = pool;
var toRead = Math.min(pool.length - pool.used, this.bufferSize);
var start = pool.used;
function afterRead (err, bytesRead) {
if (err) { if (err) {
self.emit('error', err); self.emit('error', err);
self.readable = false; self.readable = false;
@ -639,20 +670,39 @@ ReadStream.prototype._read = function () {
return; return;
} }
var b = thisPool.slice(start, start+bytesRead);
// Possible optimizition here?
// Reclaim some bytes if bytesRead < toRead?
// Would need to ensure that pool === thisPool.
// do not emit events if the stream is paused // do not emit events if the stream is paused
if (self.paused) { if (self.paused) {
self.buffer = data; self.buffer = b;
return; return;
} }
// do not emit events anymore after we declared the stream unreadable // do not emit events anymore after we declared the stream unreadable
if (!self.readable) { if (!self.readable) return;
return;
}
self.emit('data', data); self._emitData(b);
self._read(); self._read();
}); }
fs.read(self.fd, pool, pool.used, toRead, undefined, afterRead);
pool.used += toRead;
};
ReadStream.prototype._emitData = function (d) {
if (!this._encoding) {
this.emit('data', d);
} else if (this._decoder) {
this._decoder.write(d);
} else {
var string = d.toString(this._encoding, 0, d.length);
this.emit('data', string);
}
}; };
@ -664,7 +714,7 @@ ReadStream.prototype.forceClose = function (cb) {
sys.error(readStreamForceCloseWarning); sys.error(readStreamForceCloseWarning);
} }
return this.destroy(cb); return this.destroy(cb);
} };
ReadStream.prototype.destroy = function (cb) { ReadStream.prototype.destroy = function (cb) {
@ -674,16 +724,12 @@ ReadStream.prototype.destroy = function (cb) {
function close() { function close() {
fs.close(self.fd, function(err) { fs.close(self.fd, function(err) {
if (err) { if (err) {
if (cb) { if (cb) cb(err);
cb(err);
}
self.emit('error', err); self.emit('error', err);
return; return;
} }
if (cb) { if (cb) cb(null);
cb(null);
}
self.emit('close'); self.emit('close');
}); });
} }
@ -705,7 +751,7 @@ ReadStream.prototype.resume = function() {
this.paused = false; this.paused = false;
if (this.buffer) { if (this.buffer) {
this.emit('data', this.buffer); this._emitData(this.buffer);
this.buffer = null; this.buffer = null;
} }
@ -858,7 +904,7 @@ WriteStream.prototype.forceClose = function (cb) {
sys.error(writeStreamForceCloseWarning); sys.error(writeStreamForceCloseWarning);
} }
return this.destroy(cb); return this.destroy(cb);
} };
WriteStream.prototype.forceClose = function (cb) { WriteStream.prototype.forceClose = function (cb) {

45
test/simple/test-file-read-stream.js

@ -1,32 +1,41 @@
require('../common'); require('../common');
var // TODO Improved this test. test_ca.pem is too small. A proper test would
path = require('path'), // great a large utf8 (with multibyte chars) file and stream it in,
fs = require('fs'), // performing sanity checks throughout.
fn = path.join(fixturesDir, 'test_ca.pem'),
file = fs.createReadStream(fn), Buffer = require('buffer').Buffer;
path = require('path');
fs = require('fs');
fn = path.join(fixturesDir, 'test_ca.pem');
file = fs.createReadStream(fn);
callbacks = { callbacks = {
open: -1, open: -1,
end: -1, end: -1,
data: -1,
close: -1, close: -1,
destroy: -1 destroy: -1
}, };
paused = false, paused = false;
fileContent = ''; fileContent = '';
file file.addListener('open', function(fd) {
.addListener('open', function(fd) {
callbacks.open++; callbacks.open++;
assert.equal('number', typeof fd); assert.equal('number', typeof fd);
assert.ok(file.readable); assert.ok(file.readable);
}) });
.addListener('error', function(err) {
file.addListener('error', function(err) {
throw err; throw err;
}) });
.addListener('data', function(data) {
file.addListener('data', function(data) {
callbacks.data++;
assert.ok(data instanceof Buffer);
assert.ok(!paused); assert.ok(!paused);
fileContent += data; fileContent += data;
@ -39,11 +48,13 @@ file
file.resume(); file.resume();
assert.ok(!file.paused); assert.ok(!file.paused);
}, 10); }, 10);
}) });
.addListener('end', function(chunk) {
file.addListener('end', function(chunk) {
callbacks.end++; callbacks.end++;
}) });
.addListener('close', function() {
file.addListener('close', function() {
callbacks.close++; callbacks.close++;
assert.ok(!file.readable); assert.ok(!file.readable);

Loading…
Cancel
Save