diff --git a/doc/api.txt b/doc/api.txt index 1e7abcfb20..75b02d6009 100644 --- a/doc/api.txt +++ b/doc/api.txt @@ -785,6 +785,86 @@ Objects returned from +fs.stat()+ and +fs.lstat()+ are of this type. +stats.isSocket()+:: ... +=== +fs.FileReadStream+ + +[cols="1,2,10",options="header"] +|========================================================= +|Event | Parameters | Notes + +|+"open"+ | +fd+ | The file descriptor was opened. +|+"data"+ | +chunk+ | A chunk of data was read. +|+"error"+ | +err+ | An error occured. This stops the stream. +|+"end"+ | | The end of the file was reached. +|+"close"+ | | The file descriptor was closed. +|========================================================= + ++fs.createReadStream(path, [options]);+ :: +Returns a new FileReadStream object. ++ ++options+ is an object with the following defaults: ++ +---------------------------------------- +{ "flags": "r" +, "encoding": "binary" +, "mode": 0666 +, "bufferSize": 4 * 1024 +} +---------------------------------------- + ++readStream.readable+ :: +A boolean that is +true+ by default, but turns +false+ after an +"error"+ +occured, the stream came to an "end", or +forceClose()+ was called. + ++readStream.pause()+ :: +Stops the stream from reading further data. No +"data"+ event will be fired +until the stream is resumed. + ++readStream.resume()+ :: +Resumes the stream. Together with +pause()+ this useful to throttle reading. + ++readStream.forceClose()+ :: +Allows to close the stream before the +"end"+ is reached. No more events other +than +"close"+ will be fired after this method has been called. + +=== +fs.FileWriteStream+ + +[cols="1,2,10",options="header"] +|========================================================= +|Event | Parameters | Notes + +|+"open"+ | +fd+ | The file descriptor was opened. +|+"drain"+ | | No more data needs to be written. +|+"error"+ | +err+ | An error occured. This stops the stream. +|+"close"+ | | The file descriptor was closed. +|========================================================= + ++fs.createWriteStream(path, [options]);+ :: +Returns a new FileWriteStream object. ++ ++options+ is an object with the following defaults: ++ +---------------------------------------- +{ "flags": "r" +, "encoding": "binary" +, "mode": 0666 +} +---------------------------------------- + ++writeStream.writeable+ :: +A boolean that is +true+ by default, but turns +false+ after an +"error"+ +occured or +close()+ / +forceClose()+ was called. + ++writeStream.write(data)+ :: +Returns +true+ if the data was flushed to the kernel, and +false+ if it was +queued up for being written later. A +"drain"+ will fire after all queued data +has been written. + ++writeStream.close()+ :: +Closes the stream right after all queued +write()+ calls have finished. + ++writeStream.forceClose()+ :: +Allows to close the stream regardless of its current state. + == HTTP To use the HTTP server and client one must +require("http")+. diff --git a/lib/fs.js b/lib/fs.js index 3fd95de4d5..74e1e3ef85 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1,3 +1,7 @@ +var + sys = require('sys'), + events = require('events'); + exports.Stats = process.Stats; process.Stats.prototype._checkModeProperty = function (property) { @@ -421,3 +425,203 @@ exports.realpath = function (path, callback) { } next(); } + +exports.createReadStream = function(path, options) { + return new FileReadStream(path, options); +}; + +var FileReadStream = exports.FileReadStream = function(path, options) { + events.EventEmitter.call(this); + + this.path = path; + this.fd = null; + this.readable = true; + this.paused = false; + + this.flags = 'r'; + this.encoding = 'binary'; + this.mode = 0666; + this.bufferSize = 4 * 1024; + + process.mixin(this, options || {}); + + var + self = this, + buffer = null; + + function read() { + if (!self.readable || self.paused) { + return; + } + + fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) { + if (err) { + self.emit('error', err); + self.readable = false; + return; + } + + if (bytesRead === 0) { + self.emit('end'); + self.forceClose(); + return; + } + + // do not emit events if the stream is paused + if (self.paused) { + buffer = data; + return; + } + + // do not emit events anymore after we declared the stream unreadable + if (!self.readable) { + return; + } + + self.emit('data', data); + read(); + }); + } + + fs.open(this.path, this.flags, this.mode, function(err, fd) { + if (err) { + self.emit('error', err); + self.readable = false; + return; + } + + self.fd = fd; + self.emit('open', fd); + read(); + }); + + this.forceClose = function() { + this.readable = false; + fs.close(this.fd, function(err) { + if (err) { + self.emit('error', err); + return; + } + + self.emit('close'); + }); + }; + + this.pause = function() { + this.paused = true; + }; + + this.resume = function() { + this.paused = false; + + if (buffer !== null) { + self.emit('data', buffer); + buffer = null; + } + + read(); + }; +}; +sys.inherits(FileReadStream, events.EventEmitter); + +exports.createWriteStream = function(path, options) { + return new FileWriteStream(path, options); +}; + +var FileWriteStream = exports.FileWriteStream = function(path, options) { + events.EventEmitter.call(this); + + this.path = path; + this.fd = null; + this.writeable = true; + + this.flags = 'w'; + this.encoding = 'binary'; + this.mode = 0666; + + process.mixin(this, options || {}); + + var + self = this, + queue = [], + busy = false; + + queue.push([fs.open, this.path, this.flags, this.mode]); + + function flush() { + if (busy) { + return; + } + + var args = queue.shift(); + if (!args) { + return self.emit('drain'); + } + + busy = true; + + var method = args.shift(); + + args.push(function(err) { + busy = false; + + if (err) { + self.writeable = false; + self.emit('error', err); + return; + } + + // save reference for file pointer + if (method === fs.open) { + self.fd = arguments[1]; + self.emit('open', self.fd); + } + + // stop flushing after close + if (method === fs.close) { + self.emit('close'); + return; + } + + flush(); + }); + + // Inject the file pointer + if (method !== fs.open) { + args.unshift(self.fd); + } + + method.apply(null, args); + }; + + this.write = function(data) { + if (!this.writeable) { + throw new Error('stream not writeable'); + } + + queue.push([fs.write, data, undefined, this.encoding]); + flush(); + return false; + }; + + this.close = function() { + this.writeable = false; + queue.push([fs.close,]); + flush(); + }; + + this.forceClose = function() { + this.writeable = false; + fs.close(self.fd, function(err) { + if (err) { + self.emit('error', err); + return; + } + + self.emit('close'); + }); + }; + + flush(); +}; +sys.inherits(FileWriteStream, events.EventEmitter); \ No newline at end of file diff --git a/test/simple/test-file-read-stream.js b/test/simple/test-file-read-stream.js new file mode 100644 index 0000000000..447629e41b --- /dev/null +++ b/test/simple/test-file-read-stream.js @@ -0,0 +1,54 @@ +process.mixin(require('../common')); + +var + fn = path.join(fixturesDir, 'multipart.js'), + file = fs.createReadStream(fn), + + callbacks = { + open: -1, + end: -1, + close: -1 + }, + + paused = false, + + fileContent = ''; + +file + .addListener('open', function(fd) { + callbacks.open++; + assert.equal('number', typeof fd); + assert.ok(file.readable); + }) + .addListener('error', function(err) { + throw err; + }) + .addListener('data', function(data) { + assert.ok(!paused); + fileContent += data; + + paused = true; + file.pause(); + assert.ok(file.paused); + + setTimeout(function() { + paused = false; + file.resume(); + assert.ok(!file.paused); + }, 10); + }) + .addListener('end', function(chunk) { + callbacks.end++; + }) + .addListener('close', function() { + callbacks.close++; + assert.ok(!file.readable); + + assert.equal(fs.readFileSync(fn), fileContent); + }); + +process.addListener('exit', function() { + for (var k in callbacks) { + assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]); + } +}); \ No newline at end of file diff --git a/test/simple/test-file-write-stream.js b/test/simple/test-file-write-stream.js new file mode 100644 index 0000000000..89059033ab --- /dev/null +++ b/test/simple/test-file-write-stream.js @@ -0,0 +1,50 @@ +process.mixin(require('../common')); + +var + fn = path.join(fixturesDir, "write.txt"), + file = fs.createWriteStream(fn), + + EXPECTED = '0123456789', + + callbacks = { + open: -1, + drain: -2, + close: -1 + }; + +file + .addListener('open', function(fd) { + callbacks.open++; + assert.equal('number', typeof fd); + }) + .addListener('error', function(err) { + throw err; + }) + .addListener('drain', function() { + callbacks.drain++; + if (callbacks.drain == -1) { + assert.equal(EXPECTED, fs.readFileSync(fn)); + file.write(EXPECTED); + } else if (callbacks.drain == 0) { + assert.equal(EXPECTED+EXPECTED, fs.readFileSync(fn)); + file.close(); + } + }) + .addListener('close', function() { + callbacks.close++; + assert.throws(function() { + file.write('should not work anymore'); + }); + + fs.unlinkSync(fn); + }); + +for (var i = 0; i < 10; i++) { + assert.strictEqual(false, file.write(i)); +} + +process.addListener('exit', function() { + for (var k in callbacks) { + assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]); + } +}); \ No newline at end of file