You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

80 lines
1.6 KiB

'use strict';
function or(option, alternate, required) {
var result = option === false ? false : option || alternate;
if ((required && !result) || (result && typeof result !== 'string')) {
throw new TypeError(alternate + 'Event must be a string.');
}
return result;
}
module.exports = function create(Observable) {
return function (stream, opts) {
opts = opts || {};
var complete = false;
var dataListeners = [];
var awaited = opts.await;
var dataEvent = or(opts.dataEvent, 'data', true);
var errorEvent = or(opts.errorEvent, 'error');
var endEvent = or(opts.endEvent, 'end');
function cleanup() {
complete = true;
dataListeners.forEach(function (listener) {
stream.removeListener(dataEvent, listener);
});
dataListeners = null;
}
var completion = new Promise(function (resolve, reject) {
function onEnd(result) {
if (awaited) {
awaited.then(resolve);
} else {
resolve(result);
}
}
if (endEvent) {
stream.once(endEvent, onEnd);
} else if (awaited) {
onEnd();
}
if (errorEvent) {
stream.once(errorEvent, reject);
}
if (awaited) {
awaited.catch(reject);
}
}).catch(function (err) {
cleanup();
throw err;
}).then(function (result) {
cleanup();
return result;
});
return new Observable(function (observer) {
if (!complete) {
var onData = function onData(data) {
observer.next(data);
};
stream.on(dataEvent, onData);
dataListeners.push(onData);
}
completion
.catch(function (err) {
observer.error(err);
})
.then(function (result) {
observer.complete(result);
});
});
};
};