From 9a9f08b1bc01edff0b66943334b2e34d3afb7016 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Sun, 7 Mar 2010 16:33:21 +0100 Subject: [PATCH] Add callbacks to stream methods Allows for more fine graining, especially finding out about an individual chunk of data being flushed in a write stream rather than the whole queue. This commit also fixes a bug causing forceClose to fail on a readStream that did not finish opening yet. --- doc/api.txt | 11 ++-- lib/fs.js | 75 +++++++++++++++++++-------- test/simple/test-file-read-stream.js | 9 +++- test/simple/test-file-write-stream.js | 20 +++++-- 4 files changed, 84 insertions(+), 31 deletions(-) diff --git a/doc/api.txt b/doc/api.txt index 6695699c14..a6ca6bf11a 100644 --- a/doc/api.txt +++ b/doc/api.txt @@ -823,7 +823,7 @@ until the stream is resumed. +readStream.resume()+ :: Resumes the stream. Together with +pause()+ this useful to throttle reading. -+readStream.forceClose()+ :: ++readStream.forceClose([callback])+ :: Allows to close the stream before the +"end"+ is reached. No more events other than +"close"+ will be fired after this method has been called. @@ -855,15 +855,18 @@ Returns a new FileWriteStream object. A boolean that is +true+ by default, but turns +false+ after an +"error"+ occured or +close()+ / +forceClose()+ was called. -+writeStream.write(data)+ :: ++writeStream.write(data, [callback])+ :: Returns +true+ if the data was flushed to the kernel, and +false+ if it was queued up for being written later. A +"drain"+ will fire after all queued data has been written. ++ +You can also specify +callback+ to be notified when the data from this write +has been flushed. The first param is +err+, the second is +bytesWritten+. -+writeStream.close()+ :: ++writeStream.close([callback])+ :: Closes the stream right after all queued +write()+ calls have finished. -+writeStream.forceClose()+ :: ++writeStream.forceClose([callback])+ :: Allows to close the stream regardless of its current state. == HTTP diff --git a/lib/fs.js b/lib/fs.js index 74e1e3ef85..5b57b3603f 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -495,16 +495,31 @@ var FileReadStream = exports.FileReadStream = function(path, options) { read(); }); - this.forceClose = function() { + this.forceClose = function(cb) { this.readable = false; - fs.close(this.fd, function(err) { - if (err) { - self.emit('error', err); - return; - } - self.emit('close'); - }); + function close() { + fs.close(self.fd, function(err) { + if (err) { + if (cb) { + cb(err); + } + self.emit('error', err); + return; + } + + if (cb) { + cb(null); + } + self.emit('close'); + }); + } + + if (this.fd) { + close(); + } else { + this.addListener('open', close); + } }; this.pause = function() { @@ -546,7 +561,7 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { queue = [], busy = false; - queue.push([fs.open, this.path, this.flags, this.mode]); + queue.push([fs.open, this.path, this.flags, this.mode, undefined]); function flush() { if (busy) { @@ -560,29 +575,40 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { busy = true; - var method = args.shift(); + var + method = args.shift(), + cb = args.pop(); args.push(function(err) { busy = false; if (err) { self.writeable = false; + if (cb) { + cb(err); + } self.emit('error', err); return; } - // save reference for file pointer - if (method === fs.open) { - self.fd = arguments[1]; - self.emit('open', self.fd); - } - // stop flushing after close if (method === fs.close) { + if (cb) { + cb(null); + } self.emit('close'); return; } + // save reference for file pointer + if (method === fs.open) { + self.fd = arguments[1]; + self.emit('open', self.fd); + } else if (cb) { + // write callback + cb(null, arguments[1]); + } + flush(); }); @@ -594,30 +620,37 @@ var FileWriteStream = exports.FileWriteStream = function(path, options) { method.apply(null, args); }; - this.write = function(data) { + this.write = function(data, cb) { if (!this.writeable) { throw new Error('stream not writeable'); } - queue.push([fs.write, data, undefined, this.encoding]); + queue.push([fs.write, data, undefined, this.encoding, cb]); flush(); return false; }; - this.close = function() { + this.close = function(cb) { this.writeable = false; - queue.push([fs.close,]); + queue.push([fs.close, cb]); flush(); }; - this.forceClose = function() { + this.forceClose = function(cb) { this.writeable = false; fs.close(self.fd, function(err) { if (err) { + if (cb) { + cb(err); + } + self.emit('error', err); return; } + if (cb) { + cb(null); + } self.emit('close'); }); }; diff --git a/test/simple/test-file-read-stream.js b/test/simple/test-file-read-stream.js index 447629e41b..445957f6dc 100644 --- a/test/simple/test-file-read-stream.js +++ b/test/simple/test-file-read-stream.js @@ -7,7 +7,8 @@ var callbacks = { open: -1, end: -1, - close: -1 + close: -1, + forceClose: -1 }, paused = false, @@ -47,6 +48,12 @@ file assert.equal(fs.readFileSync(fn), fileContent); }); +var file2 = fs.createReadStream(fn); +file2.forceClose(function(err) { + assert.ok(!err); + callbacks.forceClose++; +}); + process.addListener('exit', function() { for (var k in callbacks) { assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]); diff --git a/test/simple/test-file-write-stream.js b/test/simple/test-file-write-stream.js index 89059033ab..bb74db58a2 100644 --- a/test/simple/test-file-write-stream.js +++ b/test/simple/test-file-write-stream.js @@ -4,12 +4,14 @@ var fn = path.join(fixturesDir, "write.txt"), file = fs.createWriteStream(fn), - EXPECTED = '0123456789', + EXPECTED = '012345678910', callbacks = { open: -1, drain: -2, - close: -1 + close: -1, + closeCb: -1, + write: -11, }; file @@ -27,7 +29,10 @@ file file.write(EXPECTED); } else if (callbacks.drain == 0) { assert.equal(EXPECTED+EXPECTED, fs.readFileSync(fn)); - file.close(); + file.close(function(err) { + assert.ok(!err); + callbacks.closeCb++; + }); } }) .addListener('close', function() { @@ -39,8 +44,13 @@ file fs.unlinkSync(fn); }); -for (var i = 0; i < 10; i++) { - assert.strictEqual(false, file.write(i)); +for (var i = 0; i < 11; i++) { + (function(i) { + assert.strictEqual(false, file.write(i, function(err, bytesWritten) { + callbacks.write++; + assert.equal(new String(i).length, bytesWritten); + })); + })(i); } process.addListener('exit', function() {