diff --git a/lib/fs.js b/lib/fs.js index b75b69abab..25ce376d5a 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -492,99 +492,107 @@ var FileReadStream = fs.FileReadStream = function(path, options) { options = options || {}; for (var i in options) this[i] = options[i]; - var - self = this, - buffer = null; + var self = this; - function read() { - if (!self.readable || self.paused) { + fs.open(this.path, this.flags, this.mode, function(err, fd) { + if (err) { + self.emit('error', err); + self.readable = false; return; } - fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) { - if (err) { - self.emit('error', err); - self.readable = false; - return; - } - - if (bytesRead === 0) { - self.emit('end'); - self.forceClose(); - return; - } - - // do not emit events if the stream is paused - if (self.paused) { - buffer = data; - return; - } + self.fd = fd; + self.emit('open', fd); + self._read(); + }); +}; +sys.inherits(FileReadStream, events.EventEmitter); - // do not emit events anymore after we declared the stream unreadable - if (!self.readable) { - return; - } - self.emit('data', data); - read(); - }); - } +FileReadStream.prototype._read = function () { + var self = this; + if (!self.readable || self.paused) return; - fs.open(this.path, this.flags, this.mode, function(err, fd) { + fs.read(self.fd, + self.bufferSize, + undefined, + self.encoding, + function(err, data, bytesRead) { if (err) { self.emit('error', err); self.readable = false; return; } - self.fd = fd; - self.emit('open', fd); - read(); + if (bytesRead === 0) { + self.emit('end'); + self.forceClose(); + return; + } + + // do not emit events if the stream is paused + if (self.paused) { + self.buffer = data; + return; + } + + // do not emit events anymore after we declared the stream unreadable + if (!self.readable) { + return; + } + + self.emit('data', data); + self._read(); }); +}; - this.forceClose = function(cb) { - this.readable = false; - function close() { - fs.close(self.fd, function(err) { - if (err) { - if (cb) { - cb(err); - } - self.emit('error', err); - return; - } +FileReadStream.prototype.forceClose = function (cb) { + var self = this; + this.readable = false; + function close() { + fs.close(self.fd, function(err) { + if (err) { if (cb) { - cb(null); + cb(err); } - self.emit('close'); - }); - } + self.emit('error', err); + return; + } - if (this.fd) { - close(); - } else { - this.addListener('open', close); - } - }; + if (cb) { + cb(null); + } + self.emit('close'); + }); + } + + if (this.fd) { + close(); + } else { + this.addListener('open', close); + } +}; - this.pause = function() { - this.paused = true; - }; - this.resume = function() { - this.paused = false; +FileReadStream.prototype.pause = function() { + this.paused = true; +}; - if (buffer !== null) { - self.emit('data', buffer); - buffer = null; - } - read(); - }; +FileReadStream.prototype.resume = function() { + this.paused = false; + + if (this.buffer) { + this.emit('data', this.buffer); + this.buffer = null; + } + + this._read(); }; -sys.inherits(FileReadStream, events.EventEmitter); + + fs.createWriteStream = function(path, options) { return new FileWriteStream(path, options); @@ -604,105 +612,108 @@ var FileWriteStream = fs.FileWriteStream = function(path, options) { options = options || {}; for (var i in options) this[i] = options[i]; - var - self = this, - queue = [], - busy = false; + this.busy = false; + this._queue = []; - queue.push([fs.open, this.path, this.flags, this.mode, undefined]); + this._queue.push([fs.open, this.path, this.flags, this.mode, undefined]); + this.flush(); +}; +sys.inherits(FileWriteStream, events.EventEmitter); - function flush() { - if (busy) { - return; - } - var args = queue.shift(); - if (!args) { - return self.emit('drain'); - } - busy = true; +FileWriteStream.prototype.flush = function () { + if (this.busy) return; + var self = this; - var - method = args.shift(), - cb = args.pop(); + var args = this._queue.shift(); + if (!args) return self.emit('drain'); - args.push(function(err) { - busy = false; + this.busy = true; - if (err) { - self.writeable = false; - if (cb) { - cb(err); - } - self.emit('error', err); - return; - } + var + method = args.shift(), + cb = args.pop(); - // stop flushing after close - if (method === fs.close) { - if (cb) { - cb(null); - } - self.emit('close'); - return; - } + var self = this; - // save reference for file pointer - if (method === fs.open) { - self.fd = arguments[1]; - self.emit('open', self.fd); - } else if (cb) { - // write callback - cb(null, arguments[1]); + args.push(function(err) { + self.busy = false; + + if (err) { + self.writeable = false; + if (cb) { + cb(err); } + self.emit('error', err); + return; + } - flush(); - }); + // stop flushing after close + if (method === fs.close) { + if (cb) { + cb(null); + } + self.emit('close'); + return; + } - // Inject the file pointer - if (method !== fs.open) { - args.unshift(self.fd); + // save reference for file pointer + if (method === fs.open) { + self.fd = arguments[1]; + self.emit('open', self.fd); + } else if (cb) { + // write callback + cb(null, arguments[1]); } - method.apply(this, args); - }; + self.flush(); + }); - this.write = function(data, cb) { - if (!this.writeable) { - throw new Error('stream not writeable'); - } + // Inject the file pointer + if (method !== fs.open) { + args.unshift(self.fd); + } - queue.push([fs.write, data, undefined, this.encoding, cb]); - flush(); - return false; - }; + method.apply(this, args); +}; - this.close = function(cb) { - this.writeable = false; - queue.push([fs.close, cb]); - flush(); - }; - this.forceClose = function(cb) { - this.writeable = false; - fs.close(self.fd, function(err) { - if (err) { - if (cb) { - cb(err); - } +FileWriteStream.prototype.write = function(data, cb) { + if (!this.writeable) { + throw new Error('stream not writeable'); + } - self.emit('error', err); - return; - } + this._queue.push([fs.write, data, undefined, this.encoding, cb]); + this.flush(); + + return false; +}; + +FileWriteStream.prototype.close = function (cb) { + this.writeable = false; + this._queue.push([fs.close, cb]); + this.flush(); +}; + + +FileWriteStream.prototype.forceClose = function (cb) { + this.writeable = false; + fs.close(self.fd, function(err) { + if (err) { if (cb) { - cb(null); + cb(err); } - self.emit('close'); - }); - }; - flush(); + self.emit('error', err); + return; + } + + if (cb) { + cb(null); + } + self.emit('close'); + }); }; -sys.inherits(FileWriteStream, events.EventEmitter); +