You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

148 lines
3.8 KiB

// Basically just a wrapper around an fs.ReadStream
module.exports = FileReader
var fs = require("graceful-fs")
, fstream = require("../fstream.js")
, Reader = fstream.Reader
, inherits = require("inherits")
, mkdir = require("mkdirp")
, Reader = require("./reader.js")
, EOF = {EOF: true}
, CLOSE = {CLOSE: true}
// Make FileReader inherit from Reader (using the `inherits` module required
// above).  FileReader is a function declaration, so it is already defined
// here even though its body appears below.
inherits(FileReader, Reader)
// Reader for a single file (or link) entry.
//
// props: entry description.  Its `type` must be "Link" with a truthy
//        `props.Link`, or "File" with a truthy `props.File`; the entry is
//        expected to have been classified before it gets here.
//
// Throws an Error when invoked without `new`, or when props does not
// describe a File/Link entry.
//
// XXX Todo: preserve hardlinks by tracking dev+inode+nlink,
// with a HardLinkReader class.
function FileReader (props) {
  var self = this

  if (!(self instanceof FileReader)) {
    throw new Error("FileReader must be called as constructor.")
  }

  var kind = props.type
    , isLinkEntry = kind === "Link" && props.Link
    , isFileEntry = !isLinkEntry && kind === "File" && props.File

  if (!isLinkEntry && !isFileEntry) {
    throw new Error("Non-file type "+ kind)
  }

  // Queue state is set up before handing over to the Reader constructor.
  self._buffer = []
  self._bytesEmitted = 0

  Reader.call(self, props)
}
// Open the underlying fs.ReadStream for me._path (passing me.props as the
// stream options), store it on me._stream, and relay its events through
// this reader.
//
// While the reader is paused, or while earlier events are still queued in
// me._buffer, new "data", "end" and "close" events are appended to the
// queue (as the chunk, the EOF marker, or the CLOSE marker) so that
// listeners see them in their original order once _read() drains it.
FileReader.prototype._getStream = function () {
  var me = this
    , stream = me._stream = fs.createReadStream(me._path, me.props)

  if (me.props.blksize) {
    // NOTE(review): bufferSize is set after the stream is created; confirm
    // that the Node versions in use still honour it.
    stream.bufferSize = me.props.blksize
  }

  stream.on("open", me.emit.bind(me, "open"))

  stream.on("data", function (c) {
    // Count every chunk, even ones that are not forwarded right away, so
    // the total can be compared against props.size at "end".
    me._bytesEmitted += c.length

    // no point saving empty chunks
    if (!c.length) return
    else if (me._paused || me._buffer.length) {
      me._buffer.push(c)
      me._read()
    } else me.emit("data", c)
  })

  stream.on("end", function () {
    if (me._paused || me._buffer.length) {
      me._buffer.push(EOF)
      me._read()
    } else {
      me.emit("end")
    }

    // Report a mismatch between the bytes read and the size in props.
    if (me._bytesEmitted !== me.props.size) {
      me.error("Didn't get expected byte count\n"+
               "expect: "+me.props.size + "\n" +
               "actual: "+me._bytesEmitted)
    }
  })

  stream.on("close", function () {
    if (me._paused || me._buffer.length) {
      me._buffer.push(CLOSE)
      me._read()
    } else {
      me.emit("close")
    }
  })

  // Surface open/read failures on the reader itself.  Without a listener,
  // an "error" event on the stream (e.g. ENOENT) is thrown as an uncaught
  // exception and callers never get a chance to handle it.
  stream.on("error", function (er) {
    me.emit("error", er)
  })

  me._read()
}
// Deliver queued events to listeners, opening the stream on first use.
//
// - Does nothing while the reader is paused.
// - If no stream exists yet, creates one via _getStream() and returns.
// - Otherwise drains me._buffer in order, emitting "data" for chunks,
//   "end" for the EOF marker and "close" for the CLOSE marker, and stops
//   as soon as a listener pauses the reader.
FileReader.prototype._read = function () {
  var me = this

  if (me._paused) return

  if (!me._stream) return me._getStream()

  // Each entry is removed from the queue *before* it is emitted.  If a
  // listener pauses in response, the entry it just received must not still
  // be queued, or it would be delivered a second time on resume.  Working
  // on the live queue also keeps anything appended while draining.
  while (me._buffer.length && !me._paused) {
    var c = me._buffer.shift()
    if (c === EOF) {
      me.emit("end")
    } else if (c === CLOSE) {
      me.emit("close")
    } else {
      me.emit("data", c)
    }
  }
}
// Pause the reader.  No-op when already paused.
//
// who: optional originator of the pause, passed along with the "pause"
//      event; falls back to this reader when falsy.
//
// Marks the reader as paused, pauses the underlying stream if one has been
// opened, then emits "pause".
FileReader.prototype.pause = function (who) {
  var self = this

  if (!self._paused) {
    self._paused = true
    if (self._stream) self._stream.pause()
    self.emit("pause", who || self)
  }
}
// Resume a paused reader.  No-op when the reader is not paused.
//
// who: optional originator of the resume, passed along with the "resume"
//      event; falls back to this reader when falsy.
//
// The "resume" event is emitted while the reader is still flagged as
// paused; only afterwards is the flag cleared, the underlying stream (if
// any) resumed, and _read() called to flush whatever was queued.
FileReader.prototype.resume = function (who) {
  var self = this

  if (self._paused) {
    self.emit("resume", who || self)
    self._paused = false
    if (self._stream) self._stream.resume()
    self._read()
  }
}