429 lines
10 KiB
429 lines
10 KiB
'use strict';
var _ = require('lodash');
var async = require('async');
var $ = require('preconditions').singleton();
var log = require('npmlog');
log.debug = log.verbose;
var util = require('util');
var mongodb = require('mongodb');
var Model = require('./model');
var collections = {
WALLETS: 'wallets',
TXS: 'txs',
ADDRESSES: 'addresses',
NOTIFICATIONS: 'notifications',
COPAYERS_LOOKUP: 'copayers_lookup',
PREFERENCES: 'preferences',
EMAIL_QUEUE: 'email_queue',
var Storage = function(opts) {
opts = opts || {};
this.db = opts.db;
Storage.prototype._createIndexes = function() {
id: 1
copayerId: 1
walletId: 1,
id: 1,
walletId: 1,
isPending: 1,
walletId: 1,
id: 1,
walletId: 1
address: 1,
Storage.prototype.connect = function(opts, cb) {
var self = this;
opts = opts || {};
if (this.db) return cb();
var config = opts.mongoDb || {};
mongodb.MongoClient.connect(config.uri, function(err, db) {
if (err) {
log.error('Unable to connect to the mongoDB server on ', config.uri);
return cb(err);
self.db = db;
console.log('Connection established to ', config.uri);
return cb();
Storage.prototype.disconnect = function(cb) {
var self = this;
this.db.close(true, function(err) {
if (err) return cb(err);
self.db = null;
return cb();
Storage.prototype.fetchWallet = function(id, cb) {
id: id
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return cb(null, Model.Wallet.fromObj(result));
Storage.prototype.storeWallet = function(wallet, cb) {
id: wallet.id
}, wallet, {
w: 1,
upsert: true,
}, cb);
Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) {
var self = this;
var copayerLookups = _.map(wallet.copayers, function(copayer) {
return {
copayerId: copayer.id,
walletId: wallet.id,
requestPubKey: copayer.requestPubKey,
walletId: wallet.id
}, {
w: 1
}, function(err) {
if (err) return cb(err);
self.db.collection(collections.COPAYERS_LOOKUP).insert(copayerLookups, {
w: 1
}, function(err) {
if (err) return cb(err);
return self.storeWallet(wallet, cb);
Storage.prototype.fetchCopayerLookup = function(copayerId, cb) {
copayerId: copayerId
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return cb(null, result);
// TODO: should be done client-side
Storage.prototype._completeTxData = function(walletId, txs, cb) {
var txList = [].concat(txs);
this.fetchWallet(walletId, function(err, wallet) {
if (err) return cb(err);
_.each(txList, function(tx) {
tx.creatorName = wallet.getCopayer(tx.creatorId).name;
_.each(tx.actions, function(action) {
action.copayerName = wallet.getCopayer(action.copayerId).name;
return cb(null, txs);
Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
var self = this;
id: txProposalId,
walletId: walletId
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb);
Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var self = this;
walletId: walletId,
isPending: true
createdOn: -1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var txs = _.map(result, function(tx) {
return Model.TxProposal.fromObj(tx);
return self._completeTxData(walletId, txs, cb);
* fetchTxs. Times are in UNIX EPOCH (seconds)
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var self = this;
opts = opts || {};
var tsFilter = {};
if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
var filter = {
walletId: walletId
if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
var mods = {};
if (_.isNumber(opts.limit)) mods.limit = opts.limit;
this.db.collection(collections.TXS).find(filter, mods).sort({
createdOn: -1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var txs = _.map(result, function(tx) {
return Model.TxProposal.fromObj(tx);
return self._completeTxData(walletId, txs, cb);
* fetchNotifications
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
* @param opts.reverse
Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var self = this;
opts = opts || {};
var tsFilter = {};
if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
var filter = {
walletId: walletId
if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
var mods = {};
if (_.isNumber(opts.limit)) mods.limit = opts.limit;
this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({
id: opts.reverse ? -1 : 1,
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var notifications = _.map(result, function(notification) {
return Model.Notification.fromObj(notification);
return cb(null, notifications);
// TODO: remove walletId from signature
Storage.prototype.storeNotification = function(walletId, notification, cb) {
this.db.collection(collections.NOTIFICATIONS).insert(notification, {
w: 1
}, cb);
// TODO: remove walletId from signature
Storage.prototype.storeTx = function(walletId, txp, cb) {
txp.isPending = txp.isPending(); // Persist attribute to use when querying
id: txp.id,
walletId: walletId
}, txp, {
w: 1,
upsert: true,
}, cb);
Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
id: txProposalId,
walletId: walletId
}, {
w: 1
}, cb);
Storage.prototype.removeWallet = function(walletId, cb) {
var self = this;
function(next) {
id: walletId
}, next);
function(next) {
var otherCollections = _.without(_.values(collections), collections.WALLETS);
async.each(otherCollections, function(col, next) {
walletId: walletId
}, next);
}, next);
], cb);
Storage.prototype.fetchAddresses = function(walletId, cb) {
var self = this;
walletId: walletId,
createdOn: 1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var addresses = _.map(result, function(address) {
return Model.Address.fromObj(address);
return cb(null, addresses);
Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
var self = this;
var addresses = [].concat(addresses);
if (addresses.length == 0) return cb();
this.db.collection(collections.ADDRESSES).insert(addresses, {
w: 1
}, function(err) {
if (err) return cb(err);
self.storeWallet(wallet, cb);
Storage.prototype.fetchAddress = function(address, cb) {
var self = this;
address: address,
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return cb(null, Model.Address.fromObj(result));
Storage.prototype.fetchPreferences = function(walletId, copayerId, cb) {
walletId: walletId,
}).toArray(function(err, result) {
if (err) return cb(err);
if (copayerId) {
result = _.find(result, {
copayerId: copayerId
if (!result) return cb();
var preferences = _.map([].concat(result), function(r) {
return Model.Preferences.fromObj(r);
if (copayerId) {
preferences = preferences[0];
return cb(null, preferences);
Storage.prototype.storePreferences = function(preferences, cb) {
walletId: preferences.walletId,
copayerId: preferences.copayerId,
}, preferences, {
w: 1,
upsert: true,
}, cb);
Storage.prototype.storeEmail = function(email, cb) {
id: email.id,
}, email, {
w: 1,
upsert: true,
}, cb);
Storage.prototype.fetchUnsentEmails = function(cb) {
status: 'pending',
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result || _.isEmpty(result)) return cb(null, []);
return cb(null, Model.Email.fromObj(result));
Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log;
cb = cb || function() {};
var self = this;
this.db.collections(function(err, collections) {
if (err) return cb(err);
async.eachSeries(collections, function(col, next) {
col.find().toArray(function(err, items) {
fn('--------', col.s.name);
}, cb);
module.exports = Storage;