Browse Source

Initial read stream implementation

v0.7.4-release
Felix Geisendörfer 15 years ago
parent
commit
f6e00759ef
  1. 90
      lib/fs.js
  2. 54
      test/simple/test-file-read-stream.js

90
lib/fs.js

@ -376,6 +376,96 @@ exports.realpath = function(path, callback) {
}); });
} }
exports.fileReadStream = function(path, options) {
return new FileReadStream(path, options);
};
var FileReadStream = exports.FileReadStream = function(path, options) {
this.path = path;
this.fd = null;
this.readable = true;
this.paused = false;
this.flags = 'r';
this.encoding = 'binary';
this.mode = 0666;
this.bufferSize = 4 * 1024;
process.mixin(this, options || {});
var
self = this,
buffer = [];
function read() {
if (!self.readable || self.paused) {
return;
}
fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) {
if (bytesRead === 0) {
self.emit('end');
self.close();
return;
}
// do not emit events if the stream is paused
if (self.paused) {
buffer.push(data);
return;
}
self.emit('data', data);
read();
});
}
fs.open(this.path, this.flags, this.mode, function(err, fd) {
if (err) {
self.emit('error', err);
return;
}
self.fd = fd;
self.emit('open', fd);
read();
});
this.close = function() {
this.readable = false;
fs.close(this.fd, function(err) {
if (err) {
self.emit('error', err);
return;
}
self.emit('close');
});
};
this.pause = function() {
this.paused = true;
};
this.resume = function() {
this.paused = false;
// emit any buffered read events before continuing
var data;
while (!this.paused) {
data = buffer.shift();
if (data === undefined) {
break;
}
self.emit('data', data);
}
read();
};
};
FileReadStream.prototype.__proto__ = process.EventEmitter.prototype;
exports.fileWriteStream = function(path, options) { exports.fileWriteStream = function(path, options) {
return new FileWriteStream(path, options); return new FileWriteStream(path, options);
}; };

54
test/simple/test-file-read-stream.js

@ -0,0 +1,54 @@
process.mixin(require('../common'));
var
fn = path.join(fixturesDir, 'multipart.js'),
file = fs.fileReadStream(fn),
callbacks = {
open: -1,
end: -1,
close: -1
},
paused = false,
fileContent = '';
file
.addListener('open', function(fd) {
callbacks.open++;
assert.equal('number', typeof fd);
assert.ok(file.readable);
})
.addListener('error', function(err) {
throw err;
})
.addListener('data', function(data) {
assert.ok(!paused);
fileContent += data;
paused = true;
file.pause();
assert.ok(file.paused);
setTimeout(function() {
paused = false;
file.resume();
assert.ok(!file.paused);
}, 10);
})
.addListener('end', function(chunk) {
callbacks.end++;
})
.addListener('close', function() {
callbacks.close++;
assert.ok(!file.readable);
assert.equal(fs.readFileSync(fn), fileContent);
});
process.addListener('exit', function() {
for (var k in callbacks) {
assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]);
}
});
Loading…
Cancel
Save