Browse Source

fs: streams2

v0.9.4-release
isaacs 12 years ago
parent
commit
44b308b1f7
  1. 415
      lib/fs.js
  2. 35
      test/simple/test-file-write-stream.js
  3. 40
      test/simple/test-file-write-stream2.js
  4. 4
      test/simple/test-fs-read-stream.js

415
lib/fs.js

@ -34,6 +34,9 @@ var fs = exports;
var Stream = require('stream').Stream;
var EventEmitter = require('events').EventEmitter;
var Readable = Stream.Readable;
var Writable = Stream.Writable;
var kMinPoolSpace = 128;
var kPoolSize = 40 * 1024;
@ -1386,34 +1389,30 @@ fs.createReadStream = function(path, options) {
return new ReadStream(path, options);
};
var ReadStream = fs.ReadStream = function(path, options) {
if (!(this instanceof ReadStream)) return new ReadStream(path, options);
Stream.call(this);
var self = this;
this.path = path;
this.fd = null;
this.readable = true;
this.paused = false;
util.inherits(ReadStream, Readable);
fs.ReadStream = ReadStream;
this.flags = 'r';
this.mode = 438; /*=0666*/
this.bufferSize = 64 * 1024;
function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
options = options || {};
// a little bit bigger buffer and water marks by default
options = util._extend({
bufferSize: 64 * 1024,
lowWaterMark: 16 * 1024,
highWaterMark: 64 * 1024
}, options || {});
// Mixin options into this
var keys = Object.keys(options);
for (var index = 0, length = keys.length; index < length; index++) {
var key = keys[index];
this[key] = options[key];
}
Readable.call(this, options);
assertEncoding(this.encoding);
this.path = path;
this.fd = options.hasOwnProperty('fd') ? options.fd : null;
this.flags = options.hasOwnProperty('flags') ? options.flags : 'r';
this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
if (this.encoding) this.setEncoding(this.encoding);
this.start = options.hasOwnProperty('start') ? options.start : undefined;
this.end = options.hasOwnProperty('start') ? options.end : undefined;
this.pos = undefined;
if (this.start !== undefined) {
if ('number' !== typeof this.start) {
@ -1432,41 +1431,40 @@ var ReadStream = fs.ReadStream = function(path, options) {
this.pos = this.start;
}
if (this.fd !== null) {
process.nextTick(function() {
self._read();
if (typeof this.fd !== 'number')
this.open();
this.on('end', function() {
this.destroy();
});
return;
}
fs.open(this.path, this.flags, this.mode, function(err, fd) {
if (err) {
self.emit('error', err);
self.readable = false;
fs.FileReadStream = fs.ReadStream; // support the legacy name
ReadStream.prototype.open = function() {
var self = this;
fs.open(this.path, this.flags, this.mode, function(er, fd) {
if (er) {
self.destroy();
self.emit('error', er);
return;
}
self.fd = fd;
self.emit('open', fd);
self._read();
// start the flow of data.
self.read();
});
};
util.inherits(ReadStream, Stream);
fs.FileReadStream = fs.ReadStream; // support the legacy name
ReadStream.prototype.setEncoding = function(encoding) {
assertEncoding(encoding);
var StringDecoder = require('string_decoder').StringDecoder; // lazy load
this._decoder = new StringDecoder(encoding);
};
ReadStream.prototype._read = function() {
var self = this;
if (!this.readable || this.paused || this.reading) return;
ReadStream.prototype._read = function(n, cb) {
if (typeof this.fd !== 'number')
return this.once('open', function() {
this._read(n, cb);
});
this.reading = true;
if (this.destroyed)
return;
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool. Can't add to the free list because
@ -1475,149 +1473,110 @@ ReadStream.prototype._read = function() {
allocNewPool();
}
// Grab another reference to the pool in the case that while we're in the
// thread pool another read() finishes up the pool, and allocates a new
// one.
// Grab another reference to the pool in the case that while we're
// in the thread pool another read() finishes up the pool, and
// allocates a new one.
var thisPool = pool;
var toRead = Math.min(pool.length - pool.used, ~~this.bufferSize);
var toRead = Math.min(pool.length - pool.used, n);
var start = pool.used;
if (this.pos !== undefined) {
if (this.pos !== undefined)
toRead = Math.min(this.end - this.pos + 1, toRead);
}
function afterRead(err, bytesRead) {
self.reading = false;
if (err) {
fs.close(self.fd, function() {
self.fd = null;
self.emit('error', err);
self.readable = false;
});
return;
}
if (bytesRead === 0) {
if (this._decoder) {
var ret = this._decoder.end();
if (ret)
this.emit('data', ret);
}
self.emit('end');
self.destroy();
return;
}
var b = thisPool.slice(start, start + bytesRead);
// already read everything we were supposed to read!
// treat as EOF.
if (toRead <= 0)
return cb();
// Possible optimizition here?
// Reclaim some bytes if bytesRead < toRead?
// Would need to ensure that pool === thisPool.
// do not emit events if the stream is paused
if (self.paused) {
self.buffer = b;
return;
}
// the actual read.
var self = this;
fs.read(this.fd, pool, pool.used, toRead, this.pos, onread);
// do not emit events anymore after we declared the stream unreadable
if (!self.readable) return;
// move the pool positions, and internal position for reading.
if (this.pos !== undefined)
this.pos += toRead;
pool.used += toRead;
self._emitData(b);
self._read();
function onread(er, bytesRead) {
if (er) {
self.destroy();
return cb(er);
}
fs.read(this.fd, pool, pool.used, toRead, this.pos, afterRead);
var b = null;
if (bytesRead > 0)
b = thisPool.slice(start, start + bytesRead);
if (this.pos !== undefined) {
this.pos += toRead;
cb(null, b);
}
pool.used += toRead;
};
ReadStream.prototype._emitData = function(d) {
if (this._decoder) {
var string = this._decoder.write(d);
if (string.length) this.emit('data', string);
} else {
this.emit('data', d);
}
ReadStream.prototype.destroy = function() {
if (this.destroyed)
return;
this.destroyed = true;
if ('number' === typeof this.fd)
this.close();
};
ReadStream.prototype.destroy = function() {
ReadStream.prototype.close = function(cb) {
if (cb)
this.once('close', cb);
if (this.closed || 'number' !== typeof this.fd) {
if ('number' !== typeof this.fd)
this.once('open', close);
return process.nextTick(this.emit.bind(this, 'close'));
}
this.closed = true;
var self = this;
if (!this.readable) return;
this.readable = false;
close();
function close() {
fs.close(self.fd, function(err) {
if (err) {
self.emit('error', err);
} else {
fs.close(self.fd, function(er) {
if (er)
self.emit('error', er);
else
self.emit('close');
}
});
}
if (this.fd === null) {
this.addListener('open', close);
} else {
close();
self.fd = null;
}
};
ReadStream.prototype.pause = function() {
this.paused = true;
};
ReadStream.prototype.resume = function() {
this.paused = false;
if (this.buffer) {
var buffer = this.buffer;
this.buffer = null;
this._emitData(buffer);
}
// hasn't opened yet.
if (null == this.fd) return;
this._read();
};
fs.createWriteStream = function(path, options) {
return new WriteStream(path, options);
};
var WriteStream = fs.WriteStream = function(path, options) {
if (!(this instanceof WriteStream)) return new WriteStream(path, options);
util.inherits(WriteStream, Writable);
fs.WriteStream = WriteStream;
function WriteStream(path, options) {
if (!(this instanceof WriteStream))
return new WriteStream(path, options);
// a little bit bigger buffer and water marks by default
options = util._extend({
bufferSize: 64 * 1024,
lowWaterMark: 16 * 1024,
highWaterMark: 64 * 1024
}, options || {});
Stream.call(this);
Writable.call(this, options);
this.path = path;
this.fd = null;
this.writable = true;
this.flags = 'w';
this.encoding = 'binary';
this.mode = 438; /*=0666*/
this.bytesWritten = 0;
options = options || {};
this.fd = options.hasOwnProperty('fd') ? options.fd : null;
this.flags = options.hasOwnProperty('flags') ? options.flags : 'w';
this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
// Mixin options into this
var keys = Object.keys(options);
for (var index = 0, length = keys.length; index < length; index++) {
var key = keys[index];
this[key] = options[key];
}
this.start = options.hasOwnProperty('start') ? options.start : undefined;
this.pos = undefined;
this.bytesWritten = 0;
if (this.start !== undefined) {
if ('number' !== typeof this.start) {
@ -1630,154 +1589,54 @@ var WriteStream = fs.WriteStream = function(path, options) {
this.pos = this.start;
}
this.busy = false;
this._queue = [];
if ('number' !== typeof this.fd)
this.open();
if (this.fd === null) {
this._open = fs.open;
this._queue.push([this._open, this.path, this.flags, this.mode, undefined]);
this.flush();
// dispose on finish.
this.once('finish', this.close);
}
};
util.inherits(WriteStream, Stream);
fs.FileWriteStream = fs.WriteStream; // support the legacy name
WriteStream.prototype.flush = function() {
if (this.busy) return;
var self = this;
var args = this._queue.shift();
if (!args) {
if (this.drainable) { this.emit('drain'); }
return;
}
this.busy = true;
var method = args.shift(),
cb = args.pop();
args.push(function(err) {
self.busy = false;
if (err) {
self.writable = false;
function emit() {
self.fd = null;
if (cb) cb(err);
self.emit('error', err);
}
if (self.fd === null) {
emit();
} else {
fs.close(self.fd, emit);
}
return;
}
if (method == fs.write) {
self.bytesWritten += arguments[1];
if (cb) {
// write callback
cb(null, arguments[1]);
}
} else if (method === self._open) {
// save reference for file pointer
self.fd = arguments[1];
self.emit('open', self.fd);
} else if (method === fs.close) {
// stop flushing after close
if (cb) {
cb(null);
}
self.emit('close');
WriteStream.prototype.open = function() {
fs.open(this.path, this.flags, this.mode, function(er, fd) {
if (er) {
this.destroy();
this.emit('error', er);
return;
}
self.flush();
});
// Inject the file pointer
if (method !== self._open) {
args.unshift(this.fd);
}
method.apply(this, args);
this.fd = fd;
this.emit('open', fd);
}.bind(this));
};
WriteStream.prototype.write = function(data) {
if (!this.writable) {
this.emit('error', new Error('stream not writable'));
return false;
}
this.drainable = true;
WriteStream.prototype._write = function(data, cb) {
if (!Buffer.isBuffer(data))
return this.emit('error', new Error('Invalid data'));
var cb;
if (typeof(arguments[arguments.length - 1]) == 'function') {
cb = arguments[arguments.length - 1];
}
if (typeof this.fd !== 'number')
return this.once('open', this._write.bind(this, data, cb));
if (!Buffer.isBuffer(data)) {
var encoding = 'utf8';
if (typeof(arguments[1]) == 'string') encoding = arguments[1];
assertEncoding(encoding);
data = new Buffer('' + data, encoding);
var self = this;
fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {
if (er) {
self.destroy();
return cb(er);
}
self.bytesWritten += bytes;
cb();
});
this._queue.push([fs.write, data, 0, data.length, this.pos, cb]);
if (this.pos !== undefined) {
if (this.pos !== undefined)
this.pos += data.length;
}
this.flush();
return false;
};
WriteStream.prototype.end = function(data, encoding, cb) {
if (typeof(data) === 'function') {
cb = data;
} else if (typeof(encoding) === 'function') {
cb = encoding;
this.write(data);
} else if (arguments.length > 0) {
this.write(data, encoding);
}
this.writable = false;
this._queue.push([fs.close, cb]);
this.flush();
};
WriteStream.prototype.destroy = function() {
var self = this;
if (!this.writable) return;
this.writable = false;
function close() {
fs.close(self.fd, function(err) {
if (err) {
self.emit('error', err);
} else {
self.emit('close');
}
});
}
if (this.fd === null) {
this.addListener('open', close);
} else {
close();
}
};
WriteStream.prototype.destroy = ReadStream.prototype.destroy;
WriteStream.prototype.close = ReadStream.prototype.close;
// There is no shutdown() for files.
WriteStream.prototype.destroySoon = WriteStream.prototype.end;

35
test/simple/test-file-write-stream.js

@ -22,46 +22,50 @@
var common = require('../common');
var assert = require('assert');
var path = require('path'),
fs = require('fs'),
fn = path.join(common.tmpDir, 'write.txt'),
file = fs.createWriteStream(fn),
var path = require('path');
var fs = require('fs');
var fn = path.join(common.tmpDir, 'write.txt');
var file = fs.createWriteStream(fn, {
lowWaterMark: 3,
highWaterMark: 10
});
EXPECTED = '012345678910',
var EXPECTED = '012345678910';
callbacks = {
var callbacks = {
open: -1,
drain: -2,
close: -1,
endCb: -1
close: -1
};
file
.on('open', function(fd) {
console.error('open!');
callbacks.open++;
assert.equal('number', typeof fd);
})
.on('error', function(err) {
throw err;
console.error('error!', err.stack);
})
.on('drain', function() {
console.error('drain!', callbacks.drain);
callbacks.drain++;
if (callbacks.drain == -1) {
assert.equal(EXPECTED, fs.readFileSync(fn));
assert.equal(EXPECTED, fs.readFileSync(fn, 'utf8'));
file.write(EXPECTED);
} else if (callbacks.drain == 0) {
assert.equal(EXPECTED + EXPECTED, fs.readFileSync(fn));
file.end(function(err) {
assert.ok(!err);
callbacks.endCb++;
});
assert.equal(EXPECTED + EXPECTED, fs.readFileSync(fn, 'utf8'));
file.end();
}
})
.on('close', function() {
console.error('close!');
assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);
callbacks.close++;
assert.throws(function() {
console.error('write after end should not be allowed');
file.write('should not work anymore');
});
@ -70,7 +74,7 @@ file
for (var i = 0; i < 11; i++) {
(function(i) {
assert.strictEqual(false, file.write(i));
file.write('' + i);
})(i);
}
@ -78,4 +82,5 @@ process.on('exit', function() {
for (var k in callbacks) {
assert.equal(0, callbacks[k], k + ' count off by ' + callbacks[k]);
}
console.log('ok');
});

40
test/simple/test-file-write-stream2.js

@ -22,18 +22,18 @@
var common = require('../common');
var assert = require('assert');
var path = require('path'),
fs = require('fs'),
util = require('util');
var path = require('path');
var fs = require('fs');
var util = require('util');
var filepath = path.join(common.tmpDir, 'write.txt'),
file;
var filepath = path.join(common.tmpDir, 'write.txt');
var file;
var EXPECTED = '012345678910';
var cb_expected = 'write open drain write drain close error ',
cb_occurred = '';
var cb_expected = 'write open drain write drain close error ';
var cb_occurred = '';
var countDrains = 0;
@ -47,6 +47,8 @@ process.on('exit', function() {
assert.strictEqual(cb_occurred, cb_expected,
'events missing or out of order: "' +
cb_occurred + '" !== "' + cb_expected + '"');
} else {
console.log('ok');
}
});
@ -59,22 +61,30 @@ function removeTestFile() {
removeTestFile();
file = fs.createWriteStream(filepath);
// drain at 0, return false at 10.
file = fs.createWriteStream(filepath, {
lowWaterMark: 0,
highWaterMark: 11
});
file.on('open', function(fd) {
console.error('open');
cb_occurred += 'open ';
assert.equal(typeof fd, 'number');
});
file.on('drain', function() {
console.error('drain');
cb_occurred += 'drain ';
++countDrains;
if (countDrains === 1) {
assert.equal(fs.readFileSync(filepath), EXPECTED);
file.write(EXPECTED);
console.error('drain=1, write again');
assert.equal(fs.readFileSync(filepath, 'utf8'), EXPECTED);
console.error('ondrain write ret=%j', file.write(EXPECTED));
cb_occurred += 'write ';
} else if (countDrains == 2) {
assert.equal(fs.readFileSync(filepath), EXPECTED + EXPECTED);
console.error('second drain, end');
assert.equal(fs.readFileSync(filepath, 'utf8'), EXPECTED + EXPECTED);
file.end();
}
});
@ -88,11 +98,15 @@ file.on('close', function() {
file.on('error', function(err) {
cb_occurred += 'error ';
assert.ok(err.message.indexOf('not writable') >= 0);
assert.ok(err.message.indexOf('write after end') >= 0);
});
for (var i = 0; i < 11; i++) {
assert.strictEqual(file.write(i), false);
var ret = file.write(i + '');
console.error('%d %j', i, ret);
// return false when i hits 10
assert(ret === (i != 10));
}
cb_occurred += 'write ';

4
test/simple/test-fs-read-stream.js

@ -60,12 +60,10 @@ file.on('data', function(data) {
paused = true;
file.pause();
assert.ok(file.paused);
setTimeout(function() {
paused = false;
file.resume();
assert.ok(!file.paused);
}, 10);
});
@ -77,7 +75,6 @@ file.on('end', function(chunk) {
file.on('close', function() {
callbacks.close++;
assert.ok(!file.readable);
//assert.equal(fs.readFileSync(fn), fileContent);
});
@ -104,6 +101,7 @@ process.on('exit', function() {
assert.equal(2, callbacks.close);
assert.equal(30000, file.length);
assert.equal(10000, file3.length);
console.error('ok');
});
var file4 = fs.createReadStream(rangeFile, {bufferSize: 1, start: 1, end: 2});

Loading…
Cancel
Save