diff --git a/lib/storage.js b/lib/storage.js index f2bed0b..19fe047 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -1,121 +1,86 @@ 'use strict'; var _ = require('lodash'); -var levelup = require('levelup'); -var multilevel = require('multilevel'); -var net = require('net'); var async = require('async'); var $ = require('preconditions').singleton(); var log = require('npmlog'); -var util = require('util'); log.debug = log.verbose; log.disableColor(); +var util = require('util'); + +var mongodb = require('mongodb'); -var Wallet = require('./model/wallet'); -var Copayer = require('./model/copayer'); -var Address = require('./model/address'); -var TxProposal = require('./model/txproposal'); -var Notification = require('./model/notification'); +var Model = require('./model'); + +var collections = { + WALLETS: 'wallets', + TXS: 'txs', + ADDRESSES: 'addresses', + NOTIFICATIONS: 'notifications', +}; var Storage = function(opts) { opts = opts || {}; this.db = opts.db; if (!this.db) { - if (opts.multiLevel) { - this.db = multilevel.client(); - var con = net.connect(opts.multiLevel); - con.pipe(this.db.createRpcStream()).pipe(con); - log.info('Using multilevel server:' + opts.multiLevel.host + ':' + opts.multiLevel.port); - } else { - this.db = levelup(opts.dbPath || './db/bws.db', { - valueEncoding: 'json' - }); - } + var url = 'mongodb://localhost:27017/bws'; + mongodb.MongoClient.connect(url, function(err, db) { + if (err) { + log.error('Unable to connect to the mongoDB server. Error:', err); + return; + } + this.db = db; + console.log('Connection established to ', url); + }); } }; -var zeroPad = function(x, length) { - return _.padLeft(parseInt(x), length, '0'); -}; - -var walletPrefix = function(id) { - return 'w!' + id; -}; - -var opKey = function(key) { - return key ? '!' + key : ''; -}; - -var MAX_TS = _.repeat('9', 14); - - -var KEY = { - WALLET: function(walletId) { - return walletPrefix(walletId) + '!main'; - }, - COPAYER: function(id) { - return 'copayer!' + id; - }, - TXP: function(walletId, txProposalId) { - return walletPrefix(walletId) + '!txp' + opKey(txProposalId); - }, - NOTIFICATION: function(walletId, notificationId) { - return walletPrefix(walletId) + '!not' + opKey(notificationId); - }, - PENDING_TXP: function(walletId, txProposalId) { - return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId); - }, - ADDRESS: function(walletId, address) { - return walletPrefix(walletId) + '!addr' + opKey(address); - }, -}; - Storage.prototype.fetchWallet = function(id, cb) { - this.db.get(KEY.WALLET(id), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return cb(null, Wallet.fromObj(data)); + this.db.collection(collections.WALLETS).findOne({ + id: id + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + return cb(null, Model.Wallet.fromObj(result)); }); }; Storage.prototype.storeWallet = function(wallet, cb) { - this.db.put(KEY.WALLET(wallet.id), wallet, cb); + this.db.collection(collections.WALLETS).update({ + id: wallet.id + }, wallet, { + w: 1, + upsert: true, + }, cb); }; Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { - var ops = []; - ops.push({ - type: 'put', - key: KEY.WALLET(wallet.id), - value: wallet - }); - _.each(wallet.copayers, function(copayer) { - var value = { - walletId: wallet.id, - requestPubKey: copayer.requestPubKey, - }; - ops.push({ - type: 'put', - key: KEY.COPAYER(copayer.id), - value: value - }); - }); - this.db.batch(ops, cb); + return this.storeWallet(wallet, cb); }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { - this.db.get(KEY.COPAYER(copayerId), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return cb(null, data); + this.db.collection(collections.WALLETS).findOne({ + 'copayers.id': copayerId + }, { + fields: { + id: 1, + copayers: 1, + }, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var copayer = _.find(result.copayers, { + id: copayerId + }); + return cb(null, { + walletId: result.id, + requestPubKey: copayer.requestPubKey, + }); }); }; +// TODO: should be done client-side Storage.prototype._completeTxData = function(walletId, txs, cb) { var txList = [].concat(txs); this.fetchWallet(walletId, function(err, wallet) { @@ -132,12 +97,14 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) { Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { var self = this; - this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) { - if (err) { - if (err.notFound) return cb(); - return cb(err); - } - return self._completeTxData(walletId, TxProposal.fromObj(data), cb); + + this.db.collection(collections.TXS).findOne({ + id: txProposalId, + walletId: walletId + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb); }); }; @@ -145,22 +112,19 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { Storage.prototype.fetchPendingTxs = function(walletId, cb) { var self = this; - var txs = []; - var key = KEY.PENDING_TXP(walletId); - this.db.createReadStream({ - gte: key, - lt: key + '~' - }) - .on('data', function(data) { - txs.push(TxProposal.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return self._completeTxData(walletId, txs, cb); + this.db.collection(collections.TXS).find({ + walletId: walletId, + isPending: true + }).sort({ + createdOn: -1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var txs = _.map(result, function(tx) { + return Model.TxProposal.fromObj(tx); }); + return self._completeTxData(walletId, txs, cb); + }); }; /** @@ -174,31 +138,30 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) { Storage.prototype.fetchTxs = function(walletId, opts, cb) { var self = this; - var txs = []; opts = opts || {}; - opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; - opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; - opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; - - var key = KEY.TXP(walletId, opts.minTs); - var endkey = KEY.TXP(walletId, opts.maxTs); - - this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: true, - limit: opts.limit, - }) - .on('data', function(data) { - txs.push(TxProposal.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return self._completeTxData(walletId, txs, cb); + + var tsFilter = {}; + if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; + if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; + + var filter = { + walletId: walletId + }; + if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; + + var mods = {}; + if (_.isNumber(opts.limit)) mods.limit = opts.limit; + + this.db.collection(collections.TXS).find(filter, mods).sort({ + createdOn: -1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var txs = _.map(result, function(tx) { + return Model.TxProposal.fromObj(tx); }); + return self._completeTxData(walletId, txs, cb); + }); }; @@ -209,183 +172,140 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) { * @param opts.minTs * @param opts.maxTs * @param opts.limit + * @param opts.reverse */ Storage.prototype.fetchNotifications = function(walletId, opts, cb) { - var txs = []; + var self = this; + opts = opts || {}; - opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; - opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; - opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; - - var key = KEY.NOTIFICATION(walletId, opts.minTs); - var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); - - this.db.createReadStream({ - gt: key, - lt: endkey + '~', - reverse: opts.reverse, - limit: opts.limit, - }) - .on('data', function(data) { - txs.push(Notification.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return cb(null, txs); - }); -}; + var tsFilter = {}; + if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; + if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; -Storage.prototype.storeNotification = function(walletId, notification, cb) { - this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); -}; + var filter = { + walletId: walletId + }; + if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; + var mods = {}; + if (_.isNumber(opts.limit)) mods.limit = opts.limit; -// TODO should we store only txp.id on keys for indexing -// or the whole txp? For now, the entire record makes sense -// (faster + easier to access) -Storage.prototype.storeTx = function(walletId, txp, cb) { - var ops = [{ - type: 'put', - key: KEY.TXP(walletId, txp.id), - value: txp, - }]; - - if (txp.isPending()) { - ops.push({ - type: 'put', - key: KEY.PENDING_TXP(walletId, txp.id), - value: txp, - }); - } else { - ops.push({ - type: 'del', - key: KEY.PENDING_TXP(walletId, txp.id), + this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({ + id: opts.reverse ? -1 : 1, + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var notifications = _.map(result, function(notification) { + return Model.Notification.fromObj(notification); }); - } - this.db.batch(ops, cb); + return cb(null, notifications); + }); }; -Storage.prototype.removeTx = function(walletId, txProposalId, cb) { - var ops = [{ - type: 'del', - key: KEY.TXP(walletId, txProposalId), - }, { - type: 'del', - key: KEY.PENDING_TXP(walletId, txProposalId), - }]; - this.db.batch(ops, cb); +// TODO: remove walletId from signature +Storage.prototype.storeNotification = function(walletId, notification, cb) { + this.db.collection(collections.NOTIFICATIONS).insert(notification, { + w: 1 + }, cb); }; -Storage.prototype._delByKey = function(key, cb) { - var self = this; - var keys = []; - this.db.createKeyStream({ - gte: key, - lt: key + '~', - }) - .on('data', function(key) { - keys.push(key); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function(err) { - self.db.batch(_.map(keys, function(k) { - return { - key: k, - type: 'del' - }; - }), function(err) { - return cb(err); - }); - }); +// TODO: remove walletId from signature +Storage.prototype.storeTx = function(walletId, txp, cb) { + txp.isPending = txp.isPending(); // Persist attribute to use when querying + this.db.collection(collections.TXS).update({ + id: txp.id, + walletId: walletId + }, txp, { + w: 1, + upsert: true, + }, cb); }; -Storage.prototype._removeCopayers = function(walletId, cb) { - var self = this; - - this.fetchWallet(walletId, function(err, w) { - if (err || !w) return cb(err); - - self.db.batch(_.map(w.copayers, function(c) { - return { - type: 'del', - key: KEY.COPAYER(c.id), - }; - }), cb); - }); +Storage.prototype.removeTx = function(walletId, txProposalId, cb) { + this.db.collection(collections.TXS).findAndRemove({ + id: txProposalId, + walletId: walletId + }, { + w: 1 + }, cb); }; Storage.prototype.removeWallet = function(walletId, cb) { var self = this; - async.series([ + async.parallel([ function(next) { - // This should be the first step. Will check the wallet exists - self._removeCopayers(walletId, next); + self.db.collection(collections.WALLETS).findAndRemove({ + id: walletId + }, next); + }, + function(next) { + self.db.collection(collections.ADDRESSES).remove({ + walletId: walletId + }, next); + }, + function(next) { + self.db.collection(collections.TXS).remove({ + walletId: walletId + }, next); }, function(next) { - self._delByKey(walletPrefix(walletId), cb); + self.db.collection(collections.NOTIFICATIONS).remove({ + walletId: walletId + }, next); }, ], cb); }; Storage.prototype.fetchAddresses = function(walletId, cb) { - var addresses = []; - var key = KEY.ADDRESS(walletId); - this.db.createReadStream({ - gte: key, - lt: key + '~' - }) - .on('data', function(data) { - addresses.push(Address.fromObj(data.value)); - }) - .on('error', function(err) { - if (err.notFound) return cb(); - return cb(err); - }) - .on('end', function() { - return cb(null, addresses); + var self = this; + + this.db.collection(collections.ADDRESSES).find({ + walletId: walletId, + }).sort({ + createdOn: 1 + }).toArray(function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + var addresses = _.map(result, function(address) { + return Model.Address.fromObj(address); }); + return cb(null, addresses); + }); }; Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { - var ops = _.map([].concat(addresses), function(address) { - return { - type: 'put', - key: KEY.ADDRESS(wallet.id, address.address), - value: address, - }; - }); - ops.unshift({ - type: 'put', - key: KEY.WALLET(wallet.id), - value: wallet, + var self = this; + var addresses = [].concat(addresses); + if (addresses.length == 0) return cb(); + this.db.collection(collections.ADDRESSES).insert(addresses, { + w: 1 + }, function(err) { + if (err) return cb(err); + self.storeWallet(wallet, cb); }); - - this.db.batch(ops, cb); }; Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; + cb = cb || function() {}; - this.db.readStream() - .on('data', function(data) { - fn(util.inspect(data, { - depth: 10 - })); - }) - .on('end', function() { - if (cb) return cb(); - }); + var self = this; + this.db.collections(function(err, collections) { + if (err) return cb(err); + async.eachSeries(collections, function(col, next) { + col.find().toArray(function(err, items) { + fn('--------', col.s.name); + fn(items); + fn('------------------------------------------------------------------\n\n'); + next(err); + }); + }, cb); + }); }; module.exports = Storage; diff --git a/lib/storage_leveldb.js b/lib/storage_leveldb.js new file mode 100644 index 0000000..f2bed0b --- /dev/null +++ b/lib/storage_leveldb.js @@ -0,0 +1,391 @@ +'use strict'; + +var _ = require('lodash'); +var levelup = require('levelup'); +var multilevel = require('multilevel'); +var net = require('net'); +var async = require('async'); +var $ = require('preconditions').singleton(); +var log = require('npmlog'); +var util = require('util'); +log.debug = log.verbose; +log.disableColor(); + +var Wallet = require('./model/wallet'); +var Copayer = require('./model/copayer'); +var Address = require('./model/address'); +var TxProposal = require('./model/txproposal'); +var Notification = require('./model/notification'); + +var Storage = function(opts) { + opts = opts || {}; + this.db = opts.db; + + if (!this.db) { + if (opts.multiLevel) { + this.db = multilevel.client(); + var con = net.connect(opts.multiLevel); + con.pipe(this.db.createRpcStream()).pipe(con); + log.info('Using multilevel server:' + opts.multiLevel.host + ':' + opts.multiLevel.port); + } else { + this.db = levelup(opts.dbPath || './db/bws.db', { + valueEncoding: 'json' + }); + } + } +}; + +var zeroPad = function(x, length) { + return _.padLeft(parseInt(x), length, '0'); +}; + +var walletPrefix = function(id) { + return 'w!' + id; +}; + +var opKey = function(key) { + return key ? '!' + key : ''; +}; + +var MAX_TS = _.repeat('9', 14); + + +var KEY = { + WALLET: function(walletId) { + return walletPrefix(walletId) + '!main'; + }, + COPAYER: function(id) { + return 'copayer!' + id; + }, + TXP: function(walletId, txProposalId) { + return walletPrefix(walletId) + '!txp' + opKey(txProposalId); + }, + NOTIFICATION: function(walletId, notificationId) { + return walletPrefix(walletId) + '!not' + opKey(notificationId); + }, + PENDING_TXP: function(walletId, txProposalId) { + return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId); + }, + ADDRESS: function(walletId, address) { + return walletPrefix(walletId) + '!addr' + opKey(address); + }, +}; + +Storage.prototype.fetchWallet = function(id, cb) { + this.db.get(KEY.WALLET(id), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return cb(null, Wallet.fromObj(data)); + }); +}; + +Storage.prototype.storeWallet = function(wallet, cb) { + this.db.put(KEY.WALLET(wallet.id), wallet, cb); +}; + +Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { + var ops = []; + ops.push({ + type: 'put', + key: KEY.WALLET(wallet.id), + value: wallet + }); + _.each(wallet.copayers, function(copayer) { + var value = { + walletId: wallet.id, + requestPubKey: copayer.requestPubKey, + }; + ops.push({ + type: 'put', + key: KEY.COPAYER(copayer.id), + value: value + }); + }); + this.db.batch(ops, cb); +}; + +Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { + this.db.get(KEY.COPAYER(copayerId), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return cb(null, data); + }); +}; + +Storage.prototype._completeTxData = function(walletId, txs, cb) { + var txList = [].concat(txs); + this.fetchWallet(walletId, function(err, wallet) { + if (err) return cb(err); + _.each(txList, function(tx) { + tx.creatorName = wallet.getCopayer(tx.creatorId).name; + _.each(tx.actions, function(action) { + action.copayerName = wallet.getCopayer(action.copayerId).name; + }); + }); + return cb(null, txs); + }); +}; + +Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { + var self = this; + this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) { + if (err) { + if (err.notFound) return cb(); + return cb(err); + } + return self._completeTxData(walletId, TxProposal.fromObj(data), cb); + }); +}; + + +Storage.prototype.fetchPendingTxs = function(walletId, cb) { + var self = this; + + var txs = []; + var key = KEY.PENDING_TXP(walletId); + this.db.createReadStream({ + gte: key, + lt: key + '~' + }) + .on('data', function(data) { + txs.push(TxProposal.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return self._completeTxData(walletId, txs, cb); + }); +}; + +/** + * fetchTxs. Times are in UNIX EPOCH (seconds) + * + * @param walletId + * @param opts.minTs + * @param opts.maxTs + * @param opts.limit + */ +Storage.prototype.fetchTxs = function(walletId, opts, cb) { + var self = this; + + var txs = []; + opts = opts || {}; + opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; + opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; + opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; + + var key = KEY.TXP(walletId, opts.minTs); + var endkey = KEY.TXP(walletId, opts.maxTs); + + this.db.createReadStream({ + gt: key, + lt: endkey + '~', + reverse: true, + limit: opts.limit, + }) + .on('data', function(data) { + txs.push(TxProposal.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return self._completeTxData(walletId, txs, cb); + }); +}; + + +/** + * fetchNotifications + * + * @param walletId + * @param opts.minTs + * @param opts.maxTs + * @param opts.limit + */ +Storage.prototype.fetchNotifications = function(walletId, opts, cb) { + var txs = []; + opts = opts || {}; + opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1; + opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; + opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; + + var key = KEY.NOTIFICATION(walletId, opts.minTs); + var endkey = KEY.NOTIFICATION(walletId, opts.maxTs); + + this.db.createReadStream({ + gt: key, + lt: endkey + '~', + reverse: opts.reverse, + limit: opts.limit, + }) + .on('data', function(data) { + txs.push(Notification.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return cb(null, txs); + }); +}; + + +Storage.prototype.storeNotification = function(walletId, notification, cb) { + this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); +}; + + +// TODO should we store only txp.id on keys for indexing +// or the whole txp? For now, the entire record makes sense +// (faster + easier to access) +Storage.prototype.storeTx = function(walletId, txp, cb) { + var ops = [{ + type: 'put', + key: KEY.TXP(walletId, txp.id), + value: txp, + }]; + + if (txp.isPending()) { + ops.push({ + type: 'put', + key: KEY.PENDING_TXP(walletId, txp.id), + value: txp, + }); + } else { + ops.push({ + type: 'del', + key: KEY.PENDING_TXP(walletId, txp.id), + }); + } + this.db.batch(ops, cb); +}; + +Storage.prototype.removeTx = function(walletId, txProposalId, cb) { + var ops = [{ + type: 'del', + key: KEY.TXP(walletId, txProposalId), + }, { + type: 'del', + key: KEY.PENDING_TXP(walletId, txProposalId), + }]; + + this.db.batch(ops, cb); +}; + +Storage.prototype._delByKey = function(key, cb) { + var self = this; + var keys = []; + this.db.createKeyStream({ + gte: key, + lt: key + '~', + }) + .on('data', function(key) { + keys.push(key); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function(err) { + self.db.batch(_.map(keys, function(k) { + return { + key: k, + type: 'del' + }; + }), function(err) { + return cb(err); + }); + }); +}; + +Storage.prototype._removeCopayers = function(walletId, cb) { + var self = this; + + this.fetchWallet(walletId, function(err, w) { + if (err || !w) return cb(err); + + self.db.batch(_.map(w.copayers, function(c) { + return { + type: 'del', + key: KEY.COPAYER(c.id), + }; + }), cb); + }); +}; + +Storage.prototype.removeWallet = function(walletId, cb) { + var self = this; + + async.series([ + + function(next) { + // This should be the first step. Will check the wallet exists + self._removeCopayers(walletId, next); + }, + function(next) { + self._delByKey(walletPrefix(walletId), cb); + }, + ], cb); +}; + + +Storage.prototype.fetchAddresses = function(walletId, cb) { + var addresses = []; + var key = KEY.ADDRESS(walletId); + this.db.createReadStream({ + gte: key, + lt: key + '~' + }) + .on('data', function(data) { + addresses.push(Address.fromObj(data.value)); + }) + .on('error', function(err) { + if (err.notFound) return cb(); + return cb(err); + }) + .on('end', function() { + return cb(null, addresses); + }); +}; + +Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { + var ops = _.map([].concat(addresses), function(address) { + return { + type: 'put', + key: KEY.ADDRESS(wallet.id, address.address), + value: address, + }; + }); + ops.unshift({ + type: 'put', + key: KEY.WALLET(wallet.id), + value: wallet, + }); + + this.db.batch(ops, cb); +}; + +Storage.prototype._dump = function(cb, fn) { + fn = fn || console.log; + + this.db.readStream() + .on('data', function(data) { + fn(util.inspect(data, { + depth: 10 + })); + }) + .on('end', function() { + if (cb) return cb(); + }); +}; + +module.exports = Storage; diff --git a/lib/storage_mongo.js b/lib/storage_mongo.js deleted file mode 100644 index 19fe047..0000000 --- a/lib/storage_mongo.js +++ /dev/null @@ -1,311 +0,0 @@ -'use strict'; - -var _ = require('lodash'); -var async = require('async'); -var $ = require('preconditions').singleton(); -var log = require('npmlog'); -log.debug = log.verbose; -log.disableColor(); -var util = require('util'); - -var mongodb = require('mongodb'); - -var Model = require('./model'); - -var collections = { - WALLETS: 'wallets', - TXS: 'txs', - ADDRESSES: 'addresses', - NOTIFICATIONS: 'notifications', -}; - -var Storage = function(opts) { - opts = opts || {}; - this.db = opts.db; - - if (!this.db) { - var url = 'mongodb://localhost:27017/bws'; - mongodb.MongoClient.connect(url, function(err, db) { - if (err) { - log.error('Unable to connect to the mongoDB server. Error:', err); - return; - } - this.db = db; - console.log('Connection established to ', url); - }); - } -}; - -Storage.prototype.fetchWallet = function(id, cb) { - this.db.collection(collections.WALLETS).findOne({ - id: id - }, function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - return cb(null, Model.Wallet.fromObj(result)); - }); -}; - -Storage.prototype.storeWallet = function(wallet, cb) { - this.db.collection(collections.WALLETS).update({ - id: wallet.id - }, wallet, { - w: 1, - upsert: true, - }, cb); -}; - -Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { - return this.storeWallet(wallet, cb); -}; - -Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { - this.db.collection(collections.WALLETS).findOne({ - 'copayers.id': copayerId - }, { - fields: { - id: 1, - copayers: 1, - }, - }, function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var copayer = _.find(result.copayers, { - id: copayerId - }); - return cb(null, { - walletId: result.id, - requestPubKey: copayer.requestPubKey, - }); - }); -}; - -// TODO: should be done client-side -Storage.prototype._completeTxData = function(walletId, txs, cb) { - var txList = [].concat(txs); - this.fetchWallet(walletId, function(err, wallet) { - if (err) return cb(err); - _.each(txList, function(tx) { - tx.creatorName = wallet.getCopayer(tx.creatorId).name; - _.each(tx.actions, function(action) { - action.copayerName = wallet.getCopayer(action.copayerId).name; - }); - }); - return cb(null, txs); - }); -}; - -Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { - var self = this; - - this.db.collection(collections.TXS).findOne({ - id: txProposalId, - walletId: walletId - }, function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb); - }); -}; - - -Storage.prototype.fetchPendingTxs = function(walletId, cb) { - var self = this; - - this.db.collection(collections.TXS).find({ - walletId: walletId, - isPending: true - }).sort({ - createdOn: -1 - }).toArray(function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var txs = _.map(result, function(tx) { - return Model.TxProposal.fromObj(tx); - }); - return self._completeTxData(walletId, txs, cb); - }); -}; - -/** - * fetchTxs. Times are in UNIX EPOCH (seconds) - * - * @param walletId - * @param opts.minTs - * @param opts.maxTs - * @param opts.limit - */ -Storage.prototype.fetchTxs = function(walletId, opts, cb) { - var self = this; - - opts = opts || {}; - - var tsFilter = {}; - if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; - if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - - var filter = { - walletId: walletId - }; - if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; - - var mods = {}; - if (_.isNumber(opts.limit)) mods.limit = opts.limit; - - this.db.collection(collections.TXS).find(filter, mods).sort({ - createdOn: -1 - }).toArray(function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var txs = _.map(result, function(tx) { - return Model.TxProposal.fromObj(tx); - }); - return self._completeTxData(walletId, txs, cb); - }); -}; - - -/** - * fetchNotifications - * - * @param walletId - * @param opts.minTs - * @param opts.maxTs - * @param opts.limit - * @param opts.reverse - */ -Storage.prototype.fetchNotifications = function(walletId, opts, cb) { - var self = this; - - opts = opts || {}; - - var tsFilter = {}; - if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs; - if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs; - - var filter = { - walletId: walletId - }; - if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter; - - var mods = {}; - if (_.isNumber(opts.limit)) mods.limit = opts.limit; - - this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({ - id: opts.reverse ? -1 : 1, - }).toArray(function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var notifications = _.map(result, function(notification) { - return Model.Notification.fromObj(notification); - }); - return cb(null, notifications); - }); -}; - - -// TODO: remove walletId from signature -Storage.prototype.storeNotification = function(walletId, notification, cb) { - this.db.collection(collections.NOTIFICATIONS).insert(notification, { - w: 1 - }, cb); -}; - -// TODO: remove walletId from signature -Storage.prototype.storeTx = function(walletId, txp, cb) { - txp.isPending = txp.isPending(); // Persist attribute to use when querying - this.db.collection(collections.TXS).update({ - id: txp.id, - walletId: walletId - }, txp, { - w: 1, - upsert: true, - }, cb); -}; - -Storage.prototype.removeTx = function(walletId, txProposalId, cb) { - this.db.collection(collections.TXS).findAndRemove({ - id: txProposalId, - walletId: walletId - }, { - w: 1 - }, cb); -}; - -Storage.prototype.removeWallet = function(walletId, cb) { - var self = this; - - async.parallel([ - - function(next) { - self.db.collection(collections.WALLETS).findAndRemove({ - id: walletId - }, next); - }, - function(next) { - self.db.collection(collections.ADDRESSES).remove({ - walletId: walletId - }, next); - }, - function(next) { - self.db.collection(collections.TXS).remove({ - walletId: walletId - }, next); - }, - function(next) { - self.db.collection(collections.NOTIFICATIONS).remove({ - walletId: walletId - }, next); - }, - ], cb); -}; - - -Storage.prototype.fetchAddresses = function(walletId, cb) { - var self = this; - - this.db.collection(collections.ADDRESSES).find({ - walletId: walletId, - }).sort({ - createdOn: 1 - }).toArray(function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - var addresses = _.map(result, function(address) { - return Model.Address.fromObj(address); - }); - return cb(null, addresses); - }); -}; - -Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { - var self = this; - var addresses = [].concat(addresses); - if (addresses.length == 0) return cb(); - this.db.collection(collections.ADDRESSES).insert(addresses, { - w: 1 - }, function(err) { - if (err) return cb(err); - self.storeWallet(wallet, cb); - }); -}; - -Storage.prototype._dump = function(cb, fn) { - fn = fn || console.log; - cb = cb || function() {}; - - var self = this; - this.db.collections(function(err, collections) { - if (err) return cb(err); - async.eachSeries(collections, function(col, next) { - col.find().toArray(function(err, items) { - fn('--------', col.s.name); - fn(items); - fn('------------------------------------------------------------------\n\n'); - next(err); - }); - }, cb); - }); -}; - -module.exports = Storage; diff --git a/package.json b/package.json index 66b52f3..48953f0 100644 --- a/package.json +++ b/package.json @@ -26,15 +26,12 @@ "coveralls": "^2.11.2", "express": "^4.10.0", "inherits": "^2.0.1", - "leveldown": "^0.10.0", - "levelup": "^0.19.0", "locker": "^0.1.0", "locker-server": "^0.1.3", "lodash": "^3.3.1", "mocha-lcov-reporter": "0.0.1", "mongodb": "^2.0.27", "morgan": "*", - "multilevel": "^6.1.0", "npmlog": "^0.1.1", "preconditions": "^1.0.7", "read": "^1.0.5", @@ -61,11 +58,14 @@ "test": "./node_modules/.bin/mocha", "coveralls": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" }, - "contributors": [{ - "name": "Ivan Socolsky", - "email": "ivan@bitpay.com" - }, { - "name": "Matias Alejo Garcia", - "email": "ematiu@gmail.com" - }] + "contributors": [ + { + "name": "Ivan Socolsky", + "email": "ivan@bitpay.com" + }, + { + "name": "Matias Alejo Garcia", + "email": "ematiu@gmail.com" + } + ] } diff --git a/test/integration/server.js b/test/integration/server.js index 1b05a35..1017c08 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -7,22 +7,23 @@ var inspect = require('util').inspect; var chai = require('chai'); var sinon = require('sinon'); var should = chai.should(); -var levelup = require('levelup'); -var memdown = require('memdown'); -var mongodb = require('mongodb'); var log = require('npmlog'); log.debug = log.verbose; +var mongodb = require('mongodb'); + var Utils = require('../../lib/utils'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; -var Storage = require('../../lib/storage_mongo'); +var Storage = require('../../lib/storage'); var BlockchainMonitor = require('../../lib/blockchainmonitor'); -var Wallet = require('../../lib/model/wallet'); -var TxProposal = require('../../lib/model/txproposal'); -var Address = require('../../lib/model/address'); -var Copayer = require('../../lib/model/copayer'); +var Model = require('../../lib/model'); +var Wallet = Model.Wallet; +var TxProposal = Model.TxProposal; +var Address = Model.Address; +var Copayer = Model.Copayer; + var WalletService = require('../../lib/server'); var NotificationBroadcaster = require('../../lib/notificationbroadcaster'); var TestData = require('../testdata'); diff --git a/test/storage.js b/test/storage.js index 143259d..e2ed216 100644 --- a/test/storage.js +++ b/test/storage.js @@ -5,24 +5,13 @@ var async = require('async'); var chai = require('chai'); var sinon = require('sinon'); var should = chai.should(); -var levelup = require('levelup'); -var memdown = require('memdown'); var mongodb = require('mongodb'); - -var StorageLevelDb = require('../lib/storage'); -var StorageMongoDb = require('../lib/storage_mongo'); +var Storage = require('../lib/storage'); var Model = require('../lib/model'); -function initStorageLevelDb(cb) { - var db = levelup(memdown, { - valueEncoding: 'json' - }); - return cb(null, db); -}; - -function initStorageMongoDb(cb) { +function initDb(cb) { var url = 'mongodb://localhost:27017/bws'; mongodb.MongoClient.connect(url, function(err, db) { should.not.exist(err); @@ -33,21 +22,6 @@ function initStorageMongoDb(cb) { }; - -var Storage, initDb; - - -var useLevel = false; - -if (useLevel) { - Storage = StorageLevelDb; - initDb = initStorageLevelDb; -} else { - Storage = StorageMongoDb; - initDb = initStorageMongoDb; -} - - describe('Storage', function() { var storage; beforeEach(function(done) {