From 61785afb3d22133fb1968ac7c0274999d4e432eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 2 Mar 2010 23:12:52 +0100 Subject: [PATCH 01/11] Initial write stream implementation --- lib/fs.js | 87 +++++++++++++++++++++++++++ test/simple/test-file-write-stream.js | 47 +++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 test/simple/test-file-write-stream.js diff --git a/lib/fs.js b/lib/fs.js index 5687424dc2..02701f687f 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -375,3 +375,90 @@ exports.realpath = function(path, callback) { callback(null, normalize(path)); }); } + +exports.fileWriteStream = function(path, options) { + return new FileWriteStream(path, options); +}; + +var FileWriteStream = exports.FileWriteStream = function(path, options) { + this.path = path; + this.fd = null; + this.closed = false; + + this.flags = 'w'; + this.encoding = 'binary'; + this.mode = 0666; + + process.mixin(this, options || {}); + + var + self = this, + queue = [], + busy = false; + + queue.push([fs.open, this.path, this.flags, this.mode]); + + function pump() { + if (busy) { + return; + } + + var args = queue.shift(); + if (!args) { + return self.emit('drain'); + } + + busy = true; + + var method = args.shift(); + + args.push(function(err) { + busy = false; + + if (err) { + self.emit('error', err); + return; + } + + // save reference for file pointer + if (method === fs.open) { + self.fd = arguments[1]; + self.emit('open', self.fd); + } + + // stop pumping after close + if (method === fs.close) { + self.emit('close'); + return; + } + + pump(); + }); + + // Inject the file pointer + if (method !== fs.open) { + args.unshift(self.fd); + } + + method.apply(null, args); + }; + + this.write = function(data) { + if (this.closed) { + throw new Error('stream already closed'); + } + + queue.push([fs.write, data, undefined, this.encoding]); + pump(); + return false; + }; + + this.close = function() { + this.closed = true; + queue.push([fs.close,]); + pump(); + }; + + pump(); +}; +FileWriteStream.prototype.__proto__ = process.EventEmitter.prototype; diff --git a/test/simple/test-file-write-stream.js b/test/simple/test-file-write-stream.js new file mode 100644 index 0000000000..669985e178 --- /dev/null +++ b/test/simple/test-file-write-stream.js @@ -0,0 +1,47 @@ +process.mixin(require('../common')); + +var + fn = path.join(fixturesDir, "write.txt"), + file = fs.fileWriteStream(fn), + + EXPECTED = '0123456789', + + callbacks = { + open: -1, + drain: -2, + close: -1 + }; + +file + .addListener('open', function(fd) { + callbacks.open++; + assert.equal('number', typeof fd); + }) + .addListener('drain', function() { + callbacks.drain++; + if (callbacks.drain == -1) { + assert.equal(EXPECTED, fs.readFileSync(fn)); + file.write(EXPECTED); + } else if (callbacks.drain == 0) { + assert.equal(EXPECTED+EXPECTED, fs.readFileSync(fn)); + file.close(); + } + }) + .addListener('close', function() { + callbacks.close++; + assert.throws(function() { + file.write('should not work anymore'); + }); + + fs.unlinkSync(fn); + }); + +for (var i = 0; i < 10; i++) { + assert.strictEqual(false, file.write(i)); +} + +process.addListener('exit', function() { + for (var k in callbacks) { + assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]); + } +}); \ No newline at end of file From 18a70ffda13d3692aee34c097ad9330756d6479b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Tue, 2 Mar 2010 23:28:00 +0100 Subject: [PATCH 02/11] Tweaks - Add 'writeable' property - Renamed pump->flush - Use sys.mixin instead of process.mixin --- lib/fs.js | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/lib/fs.js b/lib/fs.js index 02701f687f..00f6168c15 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1,3 +1,5 @@ +var sys = require('sys'); + exports.Stats = process.Stats; process.Stats.prototype._checkModeProperty = function (property) { @@ -383,13 +385,13 @@ exports.fileWriteStream = function(path, options) { var FileWriteStream = exports.FileWriteStream = function(path, options) { this.path = path; this.fd = null; - this.closed = false; + this.writeable = true; this.flags = 'w'; this.encoding = 'binary'; this.mode = 0666; - process.mixin(this, options || {}); + sys.mixin(this, options || {}); var self = this, @@ -398,7 +400,7 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { queue.push([fs.open, this.path, this.flags, this.mode]); - function pump() { + function flush() { if (busy) { return; } @@ -416,6 +418,7 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { busy = false; if (err) { + self.writeable = false; self.emit('error', err); return; } @@ -426,13 +429,13 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { self.emit('open', self.fd); } - // stop pumping after close + // stop flushing after close if (method === fs.close) { self.emit('close'); return; } - pump(); + flush(); }); // Inject the file pointer @@ -444,21 +447,21 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { }; this.write = function(data) { - if (this.closed) { - throw new Error('stream already closed'); + if (!this.writeable) { + throw new Error('stream not writeable'); } queue.push([fs.write, data, undefined, this.encoding]); - pump(); + flush(); return false; }; this.close = function() { - this.closed = true; + this.writeable = false; queue.push([fs.close,]); - pump(); + flush(); }; - pump(); + flush(); }; -FileWriteStream.prototype.__proto__ = process.EventEmitter.prototype; +FileWriteStream.prototype.__proto__ = process.EventEmitter.prototype; \ No newline at end of file From 9415ca909ea0e5e5c0b390fa60759952143beed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 3 Mar 2010 12:39:17 +0100 Subject: [PATCH 03/11] Use process.mixin instead of sys.mixin The process namespace has not been cleaned up yet, so mixin is still attached to process. --- lib/fs.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/fs.js b/lib/fs.js index 00f6168c15..24542ae0c4 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1,5 +1,3 @@ -var sys = require('sys'); - exports.Stats = process.Stats; process.Stats.prototype._checkModeProperty = function (property) { @@ -391,7 +389,7 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { this.encoding = 'binary'; this.mode = 0666; - sys.mixin(this, options || {}); + process.mixin(this, options || {}); var self = this, From f6e00759effd1d550657aa6ce0208ee04eda87bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 3 Mar 2010 12:39:41 +0100 Subject: [PATCH 04/11] Initial read stream implementation --- lib/fs.js | 90 ++++++++++++++++++++++++++++ test/simple/test-file-read-stream.js | 54 +++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 test/simple/test-file-read-stream.js diff --git a/lib/fs.js b/lib/fs.js index 24542ae0c4..ac13b2670f 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -376,6 +376,96 @@ exports.realpath = function(path, callback) { }); } +exports.fileReadStream = function(path, options) { + return new FileReadStream(path, options); +}; + +var FileReadStream = exports.FileReadStream = function(path, options) { + this.path = path; + this.fd = null; + this.readable = true; + this.paused = false; + + this.flags = 'r'; + this.encoding = 'binary'; + this.mode = 0666; + this.bufferSize = 4 * 1024; + + process.mixin(this, options || {}); + + var + self = this, + buffer = []; + + function read() { + if (!self.readable || self.paused) { + return; + } + + fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) { + if (bytesRead === 0) { + self.emit('end'); + self.close(); + return; + } + + // do not emit events if the stream is paused + if (self.paused) { + buffer.push(data); + return; + } + + self.emit('data', data); + read(); + }); + } + + fs.open(this.path, this.flags, this.mode, function(err, fd) { + if (err) { + self.emit('error', err); + return; + } + + self.fd = fd; + self.emit('open', fd); + read(); + }); + + this.close = function() { + this.readable = false; + fs.close(this.fd, function(err) { + if (err) { + self.emit('error', err); + return; + } + + self.emit('close'); + }); + }; + + this.pause = function() { + this.paused = true; + }; + + this.resume = function() { + this.paused = false; + + // emit any buffered read events before continuing + var data; + while (!this.paused) { + data = buffer.shift(); + if (data === undefined) { + break; + } + + self.emit('data', data); + } + + read(); + }; +}; +FileReadStream.prototype.__proto__ = process.EventEmitter.prototype; + exports.fileWriteStream = function(path, options) { return new FileWriteStream(path, options); }; diff --git a/test/simple/test-file-read-stream.js b/test/simple/test-file-read-stream.js new file mode 100644 index 0000000000..3b358b5c5c --- /dev/null +++ b/test/simple/test-file-read-stream.js @@ -0,0 +1,54 @@ +process.mixin(require('../common')); + +var + fn = path.join(fixturesDir, 'multipart.js'), + file = fs.fileReadStream(fn), + + callbacks = { + open: -1, + end: -1, + close: -1 + }, + + paused = false, + + fileContent = ''; + +file + .addListener('open', function(fd) { + callbacks.open++; + assert.equal('number', typeof fd); + assert.ok(file.readable); + }) + .addListener('error', function(err) { + throw err; + }) + .addListener('data', function(data) { + assert.ok(!paused); + fileContent += data; + + paused = true; + file.pause(); + assert.ok(file.paused); + + setTimeout(function() { + paused = false; + file.resume(); + assert.ok(!file.paused); + }, 10); + }) + .addListener('end', function(chunk) { + callbacks.end++; + }) + .addListener('close', function() { + callbacks.close++; + assert.ok(!file.readable); + + assert.equal(fs.readFileSync(fn), fileContent); + }); + +process.addListener('exit', function() { + for (var k in callbacks) { + assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]); + } +}); \ No newline at end of file From b4fba5fe8ee5fe3a43405fbd8838feab5f259bc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 4 Mar 2010 14:25:59 +0100 Subject: [PATCH 05/11] Simplify buffering There is no way more than one read event would be buffered. --- lib/fs.js | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/lib/fs.js b/lib/fs.js index ac13b2670f..de1bf94ce3 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -395,7 +395,7 @@ var FileReadStream = exports.FileReadStream = function(path, options) { var self = this, - buffer = []; + buffer = null; function read() { if (!self.readable || self.paused) { @@ -411,7 +411,7 @@ var FileReadStream = exports.FileReadStream = function(path, options) { // do not emit events if the stream is paused if (self.paused) { - buffer.push(data); + buffer = data; return; } @@ -450,15 +450,9 @@ var FileReadStream = exports.FileReadStream = function(path, options) { this.resume = function() { this.paused = false; - // emit any buffered read events before continuing - var data; - while (!this.paused) { - data = buffer.shift(); - if (data === undefined) { - break; - } - - self.emit('data', data); + if (buffer !== null) { + self.emit('data', buffer); + buffer = null; } read(); From 48562fa9389946ae7ac5b977004fc0d47e5af947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Thu, 4 Mar 2010 22:06:06 +0100 Subject: [PATCH 06/11] Updated file streams Read streams now only support forceClose() Write streams support close() and forceClose() --- lib/fs.js | 22 ++++++++++++++++++++-- test/simple/test-file-write-stream.js | 3 +++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/lib/fs.js b/lib/fs.js index de1bf94ce3..cf133158e8 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -405,7 +405,7 @@ var FileReadStream = exports.FileReadStream = function(path, options) { fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) { if (bytesRead === 0) { self.emit('end'); - self.close(); + self.forceClose(); return; } @@ -415,6 +415,11 @@ var FileReadStream = exports.FileReadStream = function(path, options) { return; } + // do not emit events anymore after we declared the stream unreadable + if (!self.readable) { + return; + } + self.emit('data', data); read(); }); @@ -431,7 +436,7 @@ var FileReadStream = exports.FileReadStream = function(path, options) { read(); }); - this.close = function() { + this.forceClose = function() { this.readable = false; fs.close(this.fd, function(err) { if (err) { @@ -544,6 +549,19 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { flush(); }; + this.forceClose = function() { + this.writeable = false; + fs.close(self.fd, function(err) { + if (err) { + self.emit('error', err); + return; + } + + self.emit('close'); + }); + }; + + flush(); }; FileWriteStream.prototype.__proto__ = process.EventEmitter.prototype; \ No newline at end of file diff --git a/test/simple/test-file-write-stream.js b/test/simple/test-file-write-stream.js index 669985e178..5f15bfe1c3 100644 --- a/test/simple/test-file-write-stream.js +++ b/test/simple/test-file-write-stream.js @@ -17,6 +17,9 @@ file callbacks.open++; assert.equal('number', typeof fd); }) + .addListener('error', function(err) { + throw err; + }) .addListener('drain', function() { callbacks.drain++; if (callbacks.drain == -1) { From 0fcc94525a59df4ae1a90a7c3d4c62df0e19b36c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 5 Mar 2010 18:56:25 +0100 Subject: [PATCH 07/11] Renamed fileReadStream -> createReadStream Did the same for fileWriteStream as well. --- lib/fs.js | 4 ++-- test/simple/test-file-read-stream.js | 2 +- test/simple/test-file-write-stream.js | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/fs.js b/lib/fs.js index cf133158e8..579199d93e 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -376,7 +376,7 @@ exports.realpath = function(path, callback) { }); } -exports.fileReadStream = function(path, options) { +exports.createReadStream = function(path, options) { return new FileReadStream(path, options); }; @@ -465,7 +465,7 @@ var FileReadStream = exports.FileReadStream = function(path, options) { }; FileReadStream.prototype.__proto__ = process.EventEmitter.prototype; -exports.fileWriteStream = function(path, options) { +exports.createWriteStream = function(path, options) { return new FileWriteStream(path, options); }; diff --git a/test/simple/test-file-read-stream.js b/test/simple/test-file-read-stream.js index 3b358b5c5c..447629e41b 100644 --- a/test/simple/test-file-read-stream.js +++ b/test/simple/test-file-read-stream.js @@ -2,7 +2,7 @@ process.mixin(require('../common')); var fn = path.join(fixturesDir, 'multipart.js'), - file = fs.fileReadStream(fn), + file = fs.createReadStream(fn), callbacks = { open: -1, diff --git a/test/simple/test-file-write-stream.js b/test/simple/test-file-write-stream.js index 5f15bfe1c3..89059033ab 100644 --- a/test/simple/test-file-write-stream.js +++ b/test/simple/test-file-write-stream.js @@ -2,7 +2,7 @@ process.mixin(require('../common')); var fn = path.join(fixturesDir, "write.txt"), - file = fs.fileWriteStream(fn), + file = fs.createWriteStream(fn), EXPECTED = '0123456789', From 145fac2b56a29270c798996db508a9fc6338c1f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 5 Mar 2010 19:24:20 +0100 Subject: [PATCH 08/11] Use sys inherits Also use events.EventEmitter instead of process.EventEmitter. --- lib/fs.js | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/fs.js b/lib/fs.js index 579199d93e..165ece8155 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1,3 +1,7 @@ +var + sys = require('sys'), + events = require('events'); + exports.Stats = process.Stats; process.Stats.prototype._checkModeProperty = function (property) { @@ -381,6 +385,8 @@ exports.createReadStream = function(path, options) { }; var FileReadStream = exports.FileReadStream = function(path, options) { + events.EventEmitter.call(this); + this.path = path; this.fd = null; this.readable = true; @@ -463,13 +469,15 @@ var FileReadStream = exports.FileReadStream = function(path, options) { read(); }; }; -FileReadStream.prototype.__proto__ = process.EventEmitter.prototype; +sys.inherits(FileReadStream, events.EventEmitter); exports.createWriteStream = function(path, options) { return new FileWriteStream(path, options); }; var FileWriteStream = exports.FileWriteStream = function(path, options) { + events.EventEmitter.call(this); + this.path = path; this.fd = null; this.writeable = true; @@ -564,4 +572,4 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { flush(); }; -FileWriteStream.prototype.__proto__ = process.EventEmitter.prototype; \ No newline at end of file +sys.inherits(FileWriteStream, events.EventEmitter); \ No newline at end of file From 78c61000c20e0456a3d887bc397fe29815f2f9e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 5 Mar 2010 19:53:59 +0100 Subject: [PATCH 09/11] Properly handle read errors Also set readable to false if the initial fs.open call failed. --- lib/fs.js | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/fs.js b/lib/fs.js index 165ece8155..56c739389b 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -409,6 +409,12 @@ var FileReadStream = exports.FileReadStream = function(path, options) { } fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) { + if (err) { + self.emit('error', err); + self.readable = false; + return; + } + if (bytesRead === 0) { self.emit('end'); self.forceClose(); @@ -434,6 +440,7 @@ var FileReadStream = exports.FileReadStream = function(path, options) { fs.open(this.path, this.flags, this.mode, function(err, fd) { if (err) { self.emit('error', err); + self.readable = false; return; } From a96b5c792e49c0e2e425ce14746276d1c59c45ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 5 Mar 2010 19:54:28 +0100 Subject: [PATCH 10/11] Documentation for FileReadStream --- doc/api.txt | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/doc/api.txt b/doc/api.txt index 73f4f54c2e..c4a0ec355c 100644 --- a/doc/api.txt +++ b/doc/api.txt @@ -783,6 +783,47 @@ Objects returned from +fs.stat()+ and +fs.lstat()+ are of this type. +stats.isSocket()+:: ... +=== +fs.FileReadStream+ + +[cols="1,2,10",options="header"] +|========================================================= +|Event | Parameters | Notes + +|+"open"+ | +fd+ | The file descriptor was opened. +|+"data"+ | +chunk+ | A chunk of data was read. +|+"error"+ | +err+ | An error occured. This stops the stream. +|+"end"+ | | The end of the file was reached. +|+"close"+ | | The file descriptor was closed. +|========================================================= + ++fs.createReadStream(path, [options]);+ :: +Returns a new FileReadStream object. ++ ++options+ is an object with the following defaults: ++ +---------------------------------------- +{ "flags": "r" +, "encoding": "binary" +, "mode": 0666 +, "bufferSize": 4 * 1024 +} +---------------------------------------- + ++readStream.readable+ :: +A boolean that is +true+ by default, but turns +false+ after an +"error"+ +occured, the stream came to an "end", or +forceClose()+ was called. + ++readStream.pause()+ :: +Stops the stream from reading further data. No +"data"+ event will be fired +until the stream is resumed. + ++readStream.resume()+ :: +Resumes the stream. Together with +pause()+ this useful to throttle reading. + ++readStream.forceClose()+ :: +Allows to close the stream before the +"end"+ is reached. No more events other +than +"close"+ will be fired after this method has been called. + == HTTP To use the HTTP server and client one must +require("http")+. From dbf9e466bc263cc08b217403cdf4a14a66f82935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Fri, 5 Mar 2010 20:04:19 +0100 Subject: [PATCH 11/11] Documentation for FileWriteStream --- doc/api.txt | 39 +++++++++++++++++++++++++++++++++++++++ lib/fs.js | 1 - 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/doc/api.txt b/doc/api.txt index c4a0ec355c..c8d62c2b1a 100644 --- a/doc/api.txt +++ b/doc/api.txt @@ -824,6 +824,45 @@ Resumes the stream. Together with +pause()+ this useful to throttle reading. Allows to close the stream before the +"end"+ is reached. No more events other than +"close"+ will be fired after this method has been called. +=== +fs.FileWriteStream+ + +[cols="1,2,10",options="header"] +|========================================================= +|Event | Parameters | Notes + +|+"open"+ | +fd+ | The file descriptor was opened. +|+"drain"+ | | No more data needs to be written. +|+"error"+ | +err+ | An error occured. This stops the stream. +|+"close"+ | | The file descriptor was closed. +|========================================================= + ++fs.createWriteStream(path, [options]);+ :: +Returns a new FileWriteStream object. ++ ++options+ is an object with the following defaults: ++ +---------------------------------------- +{ "flags": "r" +, "encoding": "binary" +, "mode": 0666 +} +---------------------------------------- + ++writeStream.writeable+ :: +A boolean that is +true+ by default, but turns +false+ after an +"error"+ +occured or +close()+ / +forceClose()+ was called. + ++writeStream.write(data)+ :: +Returns +true+ if the data was flushed to the kernel, and +false+ if it was +queued up for being written later. A +"drain"+ will fire after all queued data +has been written. + ++writeStream.close()+ :: +Closes the stream right after all queued +write()+ calls have finished. + ++writeStream.forceClose()+ :: +Allows to close the stream regardless of its current state. + == HTTP To use the HTTP server and client one must +require("http")+. diff --git a/lib/fs.js b/lib/fs.js index 56c739389b..fa8ba89bda 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -576,7 +576,6 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { }); }; - flush(); }; sys.inherits(FileWriteStream, events.EventEmitter); \ No newline at end of file