Browse Source

remove leveldb

activeAddress
Ivan Socolsky 10 years ago
parent
commit
b3c33b2781
  1. 440
      lib/storage.js
  2. 391
      lib/storage_leveldb.js
  3. 311
      lib/storage_mongo.js
  4. 12
      package.json
  5. 17
      test/integration/server.js
  6. 30
      test/storage.js

440
lib/storage.js

@ -1,121 +1,86 @@
'use strict'; 'use strict';
var _ = require('lodash'); var _ = require('lodash');
var levelup = require('levelup');
var multilevel = require('multilevel');
var net = require('net');
var async = require('async'); var async = require('async');
var $ = require('preconditions').singleton(); var $ = require('preconditions').singleton();
var log = require('npmlog'); var log = require('npmlog');
var util = require('util');
log.debug = log.verbose; log.debug = log.verbose;
log.disableColor(); log.disableColor();
var util = require('util');
var mongodb = require('mongodb');
var Wallet = require('./model/wallet'); var Model = require('./model');
var Copayer = require('./model/copayer');
var Address = require('./model/address'); var collections = {
var TxProposal = require('./model/txproposal'); WALLETS: 'wallets',
var Notification = require('./model/notification'); TXS: 'txs',
ADDRESSES: 'addresses',
NOTIFICATIONS: 'notifications',
};
var Storage = function(opts) { var Storage = function(opts) {
opts = opts || {}; opts = opts || {};
this.db = opts.db; this.db = opts.db;
if (!this.db) { if (!this.db) {
if (opts.multiLevel) { var url = 'mongodb://localhost:27017/bws';
this.db = multilevel.client(); mongodb.MongoClient.connect(url, function(err, db) {
var con = net.connect(opts.multiLevel); if (err) {
con.pipe(this.db.createRpcStream()).pipe(con); log.error('Unable to connect to the mongoDB server. Error:', err);
log.info('Using multilevel server:' + opts.multiLevel.host + ':' + opts.multiLevel.port); return;
} else {
this.db = levelup(opts.dbPath || './db/bws.db', {
valueEncoding: 'json'
});
} }
this.db = db;
console.log('Connection established to ', url);
});
} }
}; };
var zeroPad = function(x, length) {
return _.padLeft(parseInt(x), length, '0');
};
var walletPrefix = function(id) {
return 'w!' + id;
};
var opKey = function(key) {
return key ? '!' + key : '';
};
var MAX_TS = _.repeat('9', 14);
var KEY = {
WALLET: function(walletId) {
return walletPrefix(walletId) + '!main';
},
COPAYER: function(id) {
return 'copayer!' + id;
},
TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!txp' + opKey(txProposalId);
},
NOTIFICATION: function(walletId, notificationId) {
return walletPrefix(walletId) + '!not' + opKey(notificationId);
},
PENDING_TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId);
},
ADDRESS: function(walletId, address) {
return walletPrefix(walletId) + '!addr' + opKey(address);
},
};
Storage.prototype.fetchWallet = function(id, cb) { Storage.prototype.fetchWallet = function(id, cb) {
this.db.get(KEY.WALLET(id), function(err, data) { this.db.collection(collections.WALLETS).findOne({
if (err) { id: id
if (err.notFound) return cb(); }, function(err, result) {
return cb(err); if (err) return cb(err);
} if (!result) return cb();
return cb(null, Wallet.fromObj(data)); return cb(null, Model.Wallet.fromObj(result));
}); });
}; };
Storage.prototype.storeWallet = function(wallet, cb) { Storage.prototype.storeWallet = function(wallet, cb) {
this.db.put(KEY.WALLET(wallet.id), wallet, cb); this.db.collection(collections.WALLETS).update({
id: wallet.id
}, wallet, {
w: 1,
upsert: true,
}, cb);
}; };
Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) {
var ops = []; return this.storeWallet(wallet, cb);
ops.push({
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet
});
_.each(wallet.copayers, function(copayer) {
var value = {
walletId: wallet.id,
requestPubKey: copayer.requestPubKey,
};
ops.push({
type: 'put',
key: KEY.COPAYER(copayer.id),
value: value
});
});
this.db.batch(ops, cb);
}; };
Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { Storage.prototype.fetchCopayerLookup = function(copayerId, cb) {
this.db.get(KEY.COPAYER(copayerId), function(err, data) { this.db.collection(collections.WALLETS).findOne({
if (err) { 'copayers.id': copayerId
if (err.notFound) return cb(); }, {
return cb(err); fields: {
} id: 1,
return cb(null, data); copayers: 1,
},
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var copayer = _.find(result.copayers, {
id: copayerId
});
return cb(null, {
walletId: result.id,
requestPubKey: copayer.requestPubKey,
});
}); });
}; };
// TODO: should be done client-side
Storage.prototype._completeTxData = function(walletId, txs, cb) { Storage.prototype._completeTxData = function(walletId, txs, cb) {
var txList = [].concat(txs); var txList = [].concat(txs);
this.fetchWallet(walletId, function(err, wallet) { this.fetchWallet(walletId, function(err, wallet) {
@ -132,12 +97,14 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) {
Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
var self = this; var self = this;
this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) {
if (err) { this.db.collection(collections.TXS).findOne({
if (err.notFound) return cb(); id: txProposalId,
return cb(err); walletId: walletId
} }, function(err, result) {
return self._completeTxData(walletId, TxProposal.fromObj(data), cb); if (err) return cb(err);
if (!result) return cb();
return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb);
}); });
}; };
@ -145,20 +112,17 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
Storage.prototype.fetchPendingTxs = function(walletId, cb) { Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var self = this; var self = this;
var txs = []; this.db.collection(collections.TXS).find({
var key = KEY.PENDING_TXP(walletId); walletId: walletId,
this.db.createReadStream({ isPending: true
gte: key, }).sort({
lt: key + '~' createdOn: -1
}) }).toArray(function(err, result) {
.on('data', function(data) { if (err) return cb(err);
txs.push(TxProposal.fromObj(data.value)); if (!result) return cb();
}) var txs = _.map(result, function(tx) {
.on('error', function(err) { return Model.TxProposal.fromObj(tx);
if (err.notFound) return cb(); });
return cb(err);
})
.on('end', function() {
return self._completeTxData(walletId, txs, cb); return self._completeTxData(walletId, txs, cb);
}); });
}; };
@ -174,29 +138,28 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) {
Storage.prototype.fetchTxs = function(walletId, opts, cb) { Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var self = this; var self = this;
var txs = [];
opts = opts || {}; opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; var tsFilter = {};
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
var key = KEY.TXP(walletId, opts.minTs);
var endkey = KEY.TXP(walletId, opts.maxTs); var filter = {
walletId: walletId
this.db.createReadStream({ };
gt: key, if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
lt: endkey + '~',
reverse: true, var mods = {};
limit: opts.limit, if (_.isNumber(opts.limit)) mods.limit = opts.limit;
})
.on('data', function(data) { this.db.collection(collections.TXS).find(filter, mods).sort({
txs.push(TxProposal.fromObj(data.value)); createdOn: -1
}) }).toArray(function(err, result) {
.on('error', function(err) { if (err) return cb(err);
if (err.notFound) return cb(); if (!result) return cb();
return cb(err); var txs = _.map(result, function(tx) {
}) return Model.TxProposal.fromObj(tx);
.on('end', function() { });
return self._completeTxData(walletId, txs, cb); return self._completeTxData(walletId, txs, cb);
}); });
}; };
@ -209,182 +172,139 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) {
* @param opts.minTs * @param opts.minTs
* @param opts.maxTs * @param opts.maxTs
* @param opts.limit * @param opts.limit
* @param opts.reverse
*/ */
Storage.prototype.fetchNotifications = function(walletId, opts, cb) { Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var txs = []; var self = this;
opts = opts || {}; opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.NOTIFICATION(walletId, opts.minTs);
var endkey = KEY.NOTIFICATION(walletId, opts.maxTs);
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: opts.reverse,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(Notification.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, txs);
});
};
var tsFilter = {};
if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
Storage.prototype.storeNotification = function(walletId, notification, cb) { var filter = {
this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); walletId: walletId
}; };
if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
var mods = {};
if (_.isNumber(opts.limit)) mods.limit = opts.limit;
// TODO should we store only txp.id on keys for indexing this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({
// or the whole txp? For now, the entire record makes sense id: opts.reverse ? -1 : 1,
// (faster + easier to access) }).toArray(function(err, result) {
Storage.prototype.storeTx = function(walletId, txp, cb) { if (err) return cb(err);
var ops = [{ if (!result) return cb();
type: 'put', var notifications = _.map(result, function(notification) {
key: KEY.TXP(walletId, txp.id), return Model.Notification.fromObj(notification);
value: txp,
}];
if (txp.isPending()) {
ops.push({
type: 'put',
key: KEY.PENDING_TXP(walletId, txp.id),
value: txp,
}); });
} else { return cb(null, notifications);
ops.push({
type: 'del',
key: KEY.PENDING_TXP(walletId, txp.id),
}); });
}
this.db.batch(ops, cb);
}; };
Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
var ops = [{
type: 'del',
key: KEY.TXP(walletId, txProposalId),
}, {
type: 'del',
key: KEY.PENDING_TXP(walletId, txProposalId),
}];
this.db.batch(ops, cb); // TODO: remove walletId from signature
Storage.prototype.storeNotification = function(walletId, notification, cb) {
this.db.collection(collections.NOTIFICATIONS).insert(notification, {
w: 1
}, cb);
}; };
Storage.prototype._delByKey = function(key, cb) { // TODO: remove walletId from signature
var self = this; Storage.prototype.storeTx = function(walletId, txp, cb) {
var keys = []; txp.isPending = txp.isPending(); // Persist attribute to use when querying
this.db.createKeyStream({ this.db.collection(collections.TXS).update({
gte: key, id: txp.id,
lt: key + '~', walletId: walletId
}) }, txp, {
.on('data', function(key) { w: 1,
keys.push(key); upsert: true,
}) }, cb);
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function(err) {
self.db.batch(_.map(keys, function(k) {
return {
key: k,
type: 'del'
};
}), function(err) {
return cb(err);
});
});
}; };
Storage.prototype._removeCopayers = function(walletId, cb) { Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
var self = this; this.db.collection(collections.TXS).findAndRemove({
id: txProposalId,
this.fetchWallet(walletId, function(err, w) { walletId: walletId
if (err || !w) return cb(err); }, {
w: 1
self.db.batch(_.map(w.copayers, function(c) { }, cb);
return {
type: 'del',
key: KEY.COPAYER(c.id),
};
}), cb);
});
}; };
Storage.prototype.removeWallet = function(walletId, cb) { Storage.prototype.removeWallet = function(walletId, cb) {
var self = this; var self = this;
async.series([ async.parallel([
function(next) { function(next) {
// This should be the first step. Will check the wallet exists self.db.collection(collections.WALLETS).findAndRemove({
self._removeCopayers(walletId, next); id: walletId
}, next);
},
function(next) {
self.db.collection(collections.ADDRESSES).remove({
walletId: walletId
}, next);
}, },
function(next) { function(next) {
self._delByKey(walletPrefix(walletId), cb); self.db.collection(collections.TXS).remove({
walletId: walletId
}, next);
},
function(next) {
self.db.collection(collections.NOTIFICATIONS).remove({
walletId: walletId
}, next);
}, },
], cb); ], cb);
}; };
Storage.prototype.fetchAddresses = function(walletId, cb) { Storage.prototype.fetchAddresses = function(walletId, cb) {
var addresses = []; var self = this;
var key = KEY.ADDRESS(walletId);
this.db.createReadStream({ this.db.collection(collections.ADDRESSES).find({
gte: key, walletId: walletId,
lt: key + '~' }).sort({
}) createdOn: 1
.on('data', function(data) { }).toArray(function(err, result) {
addresses.push(Address.fromObj(data.value)); if (err) return cb(err);
}) if (!result) return cb();
.on('error', function(err) { var addresses = _.map(result, function(address) {
if (err.notFound) return cb(); return Model.Address.fromObj(address);
return cb(err); });
})
.on('end', function() {
return cb(null, addresses); return cb(null, addresses);
}); });
}; };
Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
var ops = _.map([].concat(addresses), function(address) { var self = this;
return { var addresses = [].concat(addresses);
type: 'put', if (addresses.length == 0) return cb();
key: KEY.ADDRESS(wallet.id, address.address), this.db.collection(collections.ADDRESSES).insert(addresses, {
value: address, w: 1
}; }, function(err) {
}); if (err) return cb(err);
ops.unshift({ self.storeWallet(wallet, cb);
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet,
}); });
this.db.batch(ops, cb);
}; };
Storage.prototype._dump = function(cb, fn) { Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log; fn = fn || console.log;
cb = cb || function() {};
this.db.readStream() var self = this;
.on('data', function(data) { this.db.collections(function(err, collections) {
fn(util.inspect(data, { if (err) return cb(err);
depth: 10 async.eachSeries(collections, function(col, next) {
})); col.find().toArray(function(err, items) {
}) fn('--------', col.s.name);
.on('end', function() { fn(items);
if (cb) return cb(); fn('------------------------------------------------------------------\n\n');
next(err);
});
}, cb);
}); });
}; };

391
lib/storage_leveldb.js

@ -0,0 +1,391 @@
'use strict';
var _ = require('lodash');
var levelup = require('levelup');
var multilevel = require('multilevel');
var net = require('net');
var async = require('async');
var $ = require('preconditions').singleton();
var log = require('npmlog');
var util = require('util');
log.debug = log.verbose;
log.disableColor();
var Wallet = require('./model/wallet');
var Copayer = require('./model/copayer');
var Address = require('./model/address');
var TxProposal = require('./model/txproposal');
var Notification = require('./model/notification');
var Storage = function(opts) {
opts = opts || {};
this.db = opts.db;
if (!this.db) {
if (opts.multiLevel) {
this.db = multilevel.client();
var con = net.connect(opts.multiLevel);
con.pipe(this.db.createRpcStream()).pipe(con);
log.info('Using multilevel server:' + opts.multiLevel.host + ':' + opts.multiLevel.port);
} else {
this.db = levelup(opts.dbPath || './db/bws.db', {
valueEncoding: 'json'
});
}
}
};
var zeroPad = function(x, length) {
return _.padLeft(parseInt(x), length, '0');
};
var walletPrefix = function(id) {
return 'w!' + id;
};
var opKey = function(key) {
return key ? '!' + key : '';
};
var MAX_TS = _.repeat('9', 14);
var KEY = {
WALLET: function(walletId) {
return walletPrefix(walletId) + '!main';
},
COPAYER: function(id) {
return 'copayer!' + id;
},
TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!txp' + opKey(txProposalId);
},
NOTIFICATION: function(walletId, notificationId) {
return walletPrefix(walletId) + '!not' + opKey(notificationId);
},
PENDING_TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId);
},
ADDRESS: function(walletId, address) {
return walletPrefix(walletId) + '!addr' + opKey(address);
},
};
Storage.prototype.fetchWallet = function(id, cb) {
this.db.get(KEY.WALLET(id), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return cb(null, Wallet.fromObj(data));
});
};
Storage.prototype.storeWallet = function(wallet, cb) {
this.db.put(KEY.WALLET(wallet.id), wallet, cb);
};
Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) {
var ops = [];
ops.push({
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet
});
_.each(wallet.copayers, function(copayer) {
var value = {
walletId: wallet.id,
requestPubKey: copayer.requestPubKey,
};
ops.push({
type: 'put',
key: KEY.COPAYER(copayer.id),
value: value
});
});
this.db.batch(ops, cb);
};
Storage.prototype.fetchCopayerLookup = function(copayerId, cb) {
this.db.get(KEY.COPAYER(copayerId), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return cb(null, data);
});
};
Storage.prototype._completeTxData = function(walletId, txs, cb) {
var txList = [].concat(txs);
this.fetchWallet(walletId, function(err, wallet) {
if (err) return cb(err);
_.each(txList, function(tx) {
tx.creatorName = wallet.getCopayer(tx.creatorId).name;
_.each(tx.actions, function(action) {
action.copayerName = wallet.getCopayer(action.copayerId).name;
});
});
return cb(null, txs);
});
};
Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
var self = this;
this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return self._completeTxData(walletId, TxProposal.fromObj(data), cb);
});
};
Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var self = this;
var txs = [];
var key = KEY.PENDING_TXP(walletId);
this.db.createReadStream({
gte: key,
lt: key + '~'
})
.on('data', function(data) {
txs.push(TxProposal.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return self._completeTxData(walletId, txs, cb);
});
};
/**
* fetchTxs. Times are in UNIX EPOCH (seconds)
*
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
*/
Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var self = this;
var txs = [];
opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.TXP(walletId, opts.minTs);
var endkey = KEY.TXP(walletId, opts.maxTs);
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: true,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(TxProposal.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return self._completeTxData(walletId, txs, cb);
});
};
/**
* fetchNotifications
*
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
*/
Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var txs = [];
opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.NOTIFICATION(walletId, opts.minTs);
var endkey = KEY.NOTIFICATION(walletId, opts.maxTs);
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: opts.reverse,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(Notification.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, txs);
});
};
Storage.prototype.storeNotification = function(walletId, notification, cb) {
this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb);
};
// TODO should we store only txp.id on keys for indexing
// or the whole txp? For now, the entire record makes sense
// (faster + easier to access)
Storage.prototype.storeTx = function(walletId, txp, cb) {
var ops = [{
type: 'put',
key: KEY.TXP(walletId, txp.id),
value: txp,
}];
if (txp.isPending()) {
ops.push({
type: 'put',
key: KEY.PENDING_TXP(walletId, txp.id),
value: txp,
});
} else {
ops.push({
type: 'del',
key: KEY.PENDING_TXP(walletId, txp.id),
});
}
this.db.batch(ops, cb);
};
Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
var ops = [{
type: 'del',
key: KEY.TXP(walletId, txProposalId),
}, {
type: 'del',
key: KEY.PENDING_TXP(walletId, txProposalId),
}];
this.db.batch(ops, cb);
};
Storage.prototype._delByKey = function(key, cb) {
var self = this;
var keys = [];
this.db.createKeyStream({
gte: key,
lt: key + '~',
})
.on('data', function(key) {
keys.push(key);
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function(err) {
self.db.batch(_.map(keys, function(k) {
return {
key: k,
type: 'del'
};
}), function(err) {
return cb(err);
});
});
};
Storage.prototype._removeCopayers = function(walletId, cb) {
var self = this;
this.fetchWallet(walletId, function(err, w) {
if (err || !w) return cb(err);
self.db.batch(_.map(w.copayers, function(c) {
return {
type: 'del',
key: KEY.COPAYER(c.id),
};
}), cb);
});
};
Storage.prototype.removeWallet = function(walletId, cb) {
var self = this;
async.series([
function(next) {
// This should be the first step. Will check the wallet exists
self._removeCopayers(walletId, next);
},
function(next) {
self._delByKey(walletPrefix(walletId), cb);
},
], cb);
};
Storage.prototype.fetchAddresses = function(walletId, cb) {
var addresses = [];
var key = KEY.ADDRESS(walletId);
this.db.createReadStream({
gte: key,
lt: key + '~'
})
.on('data', function(data) {
addresses.push(Address.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, addresses);
});
};
Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
var ops = _.map([].concat(addresses), function(address) {
return {
type: 'put',
key: KEY.ADDRESS(wallet.id, address.address),
value: address,
};
});
ops.unshift({
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet,
});
this.db.batch(ops, cb);
};
Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log;
this.db.readStream()
.on('data', function(data) {
fn(util.inspect(data, {
depth: 10
}));
})
.on('end', function() {
if (cb) return cb();
});
};
module.exports = Storage;

311
lib/storage_mongo.js

@ -1,311 +0,0 @@
'use strict';
var _ = require('lodash');
var async = require('async');
var $ = require('preconditions').singleton();
var log = require('npmlog');
log.debug = log.verbose;
log.disableColor();
var util = require('util');
var mongodb = require('mongodb');
var Model = require('./model');
var collections = {
WALLETS: 'wallets',
TXS: 'txs',
ADDRESSES: 'addresses',
NOTIFICATIONS: 'notifications',
};
var Storage = function(opts) {
opts = opts || {};
this.db = opts.db;
if (!this.db) {
var url = 'mongodb://localhost:27017/bws';
mongodb.MongoClient.connect(url, function(err, db) {
if (err) {
log.error('Unable to connect to the mongoDB server. Error:', err);
return;
}
this.db = db;
console.log('Connection established to ', url);
});
}
};
Storage.prototype.fetchWallet = function(id, cb) {
this.db.collection(collections.WALLETS).findOne({
id: id
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return cb(null, Model.Wallet.fromObj(result));
});
};
Storage.prototype.storeWallet = function(wallet, cb) {
this.db.collection(collections.WALLETS).update({
id: wallet.id
}, wallet, {
w: 1,
upsert: true,
}, cb);
};
Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) {
return this.storeWallet(wallet, cb);
};
Storage.prototype.fetchCopayerLookup = function(copayerId, cb) {
this.db.collection(collections.WALLETS).findOne({
'copayers.id': copayerId
}, {
fields: {
id: 1,
copayers: 1,
},
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var copayer = _.find(result.copayers, {
id: copayerId
});
return cb(null, {
walletId: result.id,
requestPubKey: copayer.requestPubKey,
});
});
};
// TODO: should be done client-side
Storage.prototype._completeTxData = function(walletId, txs, cb) {
var txList = [].concat(txs);
this.fetchWallet(walletId, function(err, wallet) {
if (err) return cb(err);
_.each(txList, function(tx) {
tx.creatorName = wallet.getCopayer(tx.creatorId).name;
_.each(tx.actions, function(action) {
action.copayerName = wallet.getCopayer(action.copayerId).name;
});
});
return cb(null, txs);
});
};
Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
var self = this;
this.db.collection(collections.TXS).findOne({
id: txProposalId,
walletId: walletId
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb);
});
};
Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var self = this;
this.db.collection(collections.TXS).find({
walletId: walletId,
isPending: true
}).sort({
createdOn: -1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var txs = _.map(result, function(tx) {
return Model.TxProposal.fromObj(tx);
});
return self._completeTxData(walletId, txs, cb);
});
};
/**
* fetchTxs. Times are in UNIX EPOCH (seconds)
*
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
*/
Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var self = this;
opts = opts || {};
var tsFilter = {};
if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
var filter = {
walletId: walletId
};
if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
var mods = {};
if (_.isNumber(opts.limit)) mods.limit = opts.limit;
this.db.collection(collections.TXS).find(filter, mods).sort({
createdOn: -1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var txs = _.map(result, function(tx) {
return Model.TxProposal.fromObj(tx);
});
return self._completeTxData(walletId, txs, cb);
});
};
/**
* fetchNotifications
*
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
* @param opts.reverse
*/
Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var self = this;
opts = opts || {};
var tsFilter = {};
if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
var filter = {
walletId: walletId
};
if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
var mods = {};
if (_.isNumber(opts.limit)) mods.limit = opts.limit;
this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({
id: opts.reverse ? -1 : 1,
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var notifications = _.map(result, function(notification) {
return Model.Notification.fromObj(notification);
});
return cb(null, notifications);
});
};
// TODO: remove walletId from signature
Storage.prototype.storeNotification = function(walletId, notification, cb) {
this.db.collection(collections.NOTIFICATIONS).insert(notification, {
w: 1
}, cb);
};
// TODO: remove walletId from signature
Storage.prototype.storeTx = function(walletId, txp, cb) {
txp.isPending = txp.isPending(); // Persist attribute to use when querying
this.db.collection(collections.TXS).update({
id: txp.id,
walletId: walletId
}, txp, {
w: 1,
upsert: true,
}, cb);
};
Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
this.db.collection(collections.TXS).findAndRemove({
id: txProposalId,
walletId: walletId
}, {
w: 1
}, cb);
};
Storage.prototype.removeWallet = function(walletId, cb) {
var self = this;
async.parallel([
function(next) {
self.db.collection(collections.WALLETS).findAndRemove({
id: walletId
}, next);
},
function(next) {
self.db.collection(collections.ADDRESSES).remove({
walletId: walletId
}, next);
},
function(next) {
self.db.collection(collections.TXS).remove({
walletId: walletId
}, next);
},
function(next) {
self.db.collection(collections.NOTIFICATIONS).remove({
walletId: walletId
}, next);
},
], cb);
};
Storage.prototype.fetchAddresses = function(walletId, cb) {
var self = this;
this.db.collection(collections.ADDRESSES).find({
walletId: walletId,
}).sort({
createdOn: 1
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
var addresses = _.map(result, function(address) {
return Model.Address.fromObj(address);
});
return cb(null, addresses);
});
};
Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
var self = this;
var addresses = [].concat(addresses);
if (addresses.length == 0) return cb();
this.db.collection(collections.ADDRESSES).insert(addresses, {
w: 1
}, function(err) {
if (err) return cb(err);
self.storeWallet(wallet, cb);
});
};
Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log;
cb = cb || function() {};
var self = this;
this.db.collections(function(err, collections) {
if (err) return cb(err);
async.eachSeries(collections, function(col, next) {
col.find().toArray(function(err, items) {
fn('--------', col.s.name);
fn(items);
fn('------------------------------------------------------------------\n\n');
next(err);
});
}, cb);
});
};
module.exports = Storage;

12
package.json

@ -26,15 +26,12 @@
"coveralls": "^2.11.2", "coveralls": "^2.11.2",
"express": "^4.10.0", "express": "^4.10.0",
"inherits": "^2.0.1", "inherits": "^2.0.1",
"leveldown": "^0.10.0",
"levelup": "^0.19.0",
"locker": "^0.1.0", "locker": "^0.1.0",
"locker-server": "^0.1.3", "locker-server": "^0.1.3",
"lodash": "^3.3.1", "lodash": "^3.3.1",
"mocha-lcov-reporter": "0.0.1", "mocha-lcov-reporter": "0.0.1",
"mongodb": "^2.0.27", "mongodb": "^2.0.27",
"morgan": "*", "morgan": "*",
"multilevel": "^6.1.0",
"npmlog": "^0.1.1", "npmlog": "^0.1.1",
"preconditions": "^1.0.7", "preconditions": "^1.0.7",
"read": "^1.0.5", "read": "^1.0.5",
@ -61,11 +58,14 @@
"test": "./node_modules/.bin/mocha", "test": "./node_modules/.bin/mocha",
"coveralls": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage" "coveralls": "./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage"
}, },
"contributors": [{ "contributors": [
{
"name": "Ivan Socolsky", "name": "Ivan Socolsky",
"email": "ivan@bitpay.com" "email": "ivan@bitpay.com"
}, { },
{
"name": "Matias Alejo Garcia", "name": "Matias Alejo Garcia",
"email": "ematiu@gmail.com" "email": "ematiu@gmail.com"
}] }
]
} }

17
test/integration/server.js

@ -7,22 +7,23 @@ var inspect = require('util').inspect;
var chai = require('chai'); var chai = require('chai');
var sinon = require('sinon'); var sinon = require('sinon');
var should = chai.should(); var should = chai.should();
var levelup = require('levelup');
var memdown = require('memdown');
var mongodb = require('mongodb');
var log = require('npmlog'); var log = require('npmlog');
log.debug = log.verbose; log.debug = log.verbose;
var mongodb = require('mongodb');
var Utils = require('../../lib/utils'); var Utils = require('../../lib/utils');
var WalletUtils = require('bitcore-wallet-utils'); var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore; var Bitcore = WalletUtils.Bitcore;
var Storage = require('../../lib/storage_mongo'); var Storage = require('../../lib/storage');
var BlockchainMonitor = require('../../lib/blockchainmonitor'); var BlockchainMonitor = require('../../lib/blockchainmonitor');
var Wallet = require('../../lib/model/wallet'); var Model = require('../../lib/model');
var TxProposal = require('../../lib/model/txproposal'); var Wallet = Model.Wallet;
var Address = require('../../lib/model/address'); var TxProposal = Model.TxProposal;
var Copayer = require('../../lib/model/copayer'); var Address = Model.Address;
var Copayer = Model.Copayer;
var WalletService = require('../../lib/server'); var WalletService = require('../../lib/server');
var NotificationBroadcaster = require('../../lib/notificationbroadcaster'); var NotificationBroadcaster = require('../../lib/notificationbroadcaster');
var TestData = require('../testdata'); var TestData = require('../testdata');

30
test/storage.js

@ -5,24 +5,13 @@ var async = require('async');
var chai = require('chai'); var chai = require('chai');
var sinon = require('sinon'); var sinon = require('sinon');
var should = chai.should(); var should = chai.should();
var levelup = require('levelup');
var memdown = require('memdown');
var mongodb = require('mongodb'); var mongodb = require('mongodb');
var Storage = require('../lib/storage');
var StorageLevelDb = require('../lib/storage');
var StorageMongoDb = require('../lib/storage_mongo');
var Model = require('../lib/model'); var Model = require('../lib/model');
function initStorageLevelDb(cb) { function initDb(cb) {
var db = levelup(memdown, {
valueEncoding: 'json'
});
return cb(null, db);
};
function initStorageMongoDb(cb) {
var url = 'mongodb://localhost:27017/bws'; var url = 'mongodb://localhost:27017/bws';
mongodb.MongoClient.connect(url, function(err, db) { mongodb.MongoClient.connect(url, function(err, db) {
should.not.exist(err); should.not.exist(err);
@ -33,21 +22,6 @@ function initStorageMongoDb(cb) {
}; };
var Storage, initDb;
var useLevel = false;
if (useLevel) {
Storage = StorageLevelDb;
initDb = initStorageLevelDb;
} else {
Storage = StorageMongoDb;
initDb = initStorageMongoDb;
}
describe('Storage', function() { describe('Storage', function() {
var storage; var storage;
beforeEach(function(done) { beforeEach(function(done) {

Loading…
Cancel
Save