Browse Source

Merge pull request #191 from isocolsky/mongo

Mongo
activeAddress
Matias Alejo Garcia 10 years ago
parent
commit
2fb5754da4
  1. 37
      bws.js
  2. 20
      config.js
  3. 9
      lib/expressapp.js
  4. 2
      lib/model/address.js
  5. 6
      lib/model/copayer.js
  6. 9
      lib/model/index.js
  7. 5
      lib/model/wallet.js
  8. 40
      lib/server.js
  9. 461
      lib/storage.js
  10. 383
      lib/storage_leveldb.js
  11. 70
      multilevel/clientMultilevel.js
  12. 17
      multilevel/multilevel.js
  13. 7
      package.json
  14. 63
      scripts/level2mongo.js
  15. 269
      test/integration/server.js
  16. 193
      test/storage.js

37
bws.js

@ -28,30 +28,35 @@ if (config.https) {
serverOpts.cert = fs.readFileSync(config.certificateFile || './ssl/certificate.pem'); serverOpts.cert = fs.readFileSync(config.certificateFile || './ssl/certificate.pem');
} }
var start = function() { var start = function(cb) {
var server; var server;
if (config.cluster) { if (config.cluster) {
server = sticky(clusterInstances, function() { server = sticky(clusterInstances, function() {
var app = ExpressApp.start(config); ExpressApp.start(config, function(err, app) {
var server = config.https ? serverModule.createServer(serverOpts, app) : var server = config.https ? serverModule.createServer(serverOpts, app) :
serverModule.Server(app);
WsApp.start(server, config);
return server;
});
});
return cb(server);
} else {
ExpressApp.start(config, function(err, app) {
server = config.https ? serverModule.createServer(serverOpts, app) :
serverModule.Server(app); serverModule.Server(app);
WsApp.start(server, config); WsApp.start(server, config);
return server; return cb(server);
}); });
} else { };
var app = ExpressApp.start(config); };
server = config.https ? serverModule.createServer(serverOpts, app) :
serverModule.Server(app); if (config.cluster && !config.lockOpts.lockerServer)
WsApp.start(server, config); throw 'When running in cluster mode, locker server need to be configured';
}
start(function(server) {
server.listen(port, function(err) { server.listen(port, function(err) {
if (err) console.log('ERROR: ', err); if (err) console.log('ERROR: ', err);
log.info('Bitcore Wallet Service running on port ' + port); log.info('Bitcore Wallet Service running on port ' + port);
}); });
}; });
if (config.cluster && (!config.storageOpts.multiLevel || !config.lockOpts.lockerServer))
throw 'When running in cluster mode, multilevel and locker server need to be configured';
start();

20
config.js

@ -1,7 +1,7 @@
var config = { var config = {
basePath: '/bws/api', basePath: '/bws/api',
disableLogs: false, disableLogs: false,
port: 3232, port: 3232,
// Uncomment to make BWS a forking server // Uncomment to make BWS a forking server
// cluster: true, // cluster: true,
// Uncomment to use the nr of availalbe CPUs // Uncomment to use the nr of availalbe CPUs
@ -12,19 +12,17 @@ var config = {
// certificateFile: 'cert.pem', // certificateFile: 'cert.pem',
storageOpts: { storageOpts: {
dbPath: './db', mongoDb: {
// Uncomment to use multilevel server host: 'localhost',
// multiLevel: { port: 27017,
// host: 'localhost', },
// port: 3230,
// },
}, },
lockOpts: { lockOpts: {
// To use locker-server, uncomment this: // To use locker-server, uncomment this:
// lockerServer: { // lockerServer: {
// host: 'localhost', // host: 'localhost',
// port: 3231, // port: 3231,
// }, // },
}, },
blockchainExplorerOpts: { blockchainExplorerOpts: {
livenet: { livenet: {

9
lib/expressapp.js

@ -22,11 +22,11 @@ var ExpressApp = function() {};
* @param opts.WalletService options for WalletService class * @param opts.WalletService options for WalletService class
* @param opts.basePath * @param opts.basePath
* @param opts.disableLogs * @param opts.disableLogs
* @param {Callback} cb
*/ */
ExpressApp.start = function(opts) { ExpressApp.start = function(opts, cb) {
opts = opts || {}; opts = opts || {};
WalletService.initialize(opts);
var app = express(); var app = express();
app.use(function(req, res, next) { app.use(function(req, res, next) {
res.setHeader('Access-Control-Allow-Origin', '*'); res.setHeader('Access-Control-Allow-Origin', '*');
@ -316,7 +316,10 @@ ExpressApp.start = function(opts) {
app.use(opts.basePath || '/bws/api', router); app.use(opts.basePath || '/bws/api', router);
return app;
WalletService.initialize(opts, function(err) {
return cb(err,app);
});
}; };
module.exports = ExpressApp; module.exports = ExpressApp;

2
lib/model/address.js

@ -13,6 +13,7 @@ Address.create = function(opts) {
x.createdOn = Math.floor(Date.now() / 1000); x.createdOn = Math.floor(Date.now() / 1000);
x.address = opts.address; x.address = opts.address;
x.walletId = opts.walletId;
x.isChange = opts.isChange; x.isChange = opts.isChange;
x.path = opts.path; x.path = opts.path;
x.publicKeys = opts.publicKeys; x.publicKeys = opts.publicKeys;
@ -24,6 +25,7 @@ Address.fromObj = function(obj) {
x.createdOn = obj.createdOn; x.createdOn = obj.createdOn;
x.address = obj.address; x.address = obj.address;
x.walletId = obj.walletId;
x.isChange = obj.isChange; x.isChange = obj.isChange;
x.path = obj.path; x.path = obj.path;
x.publicKeys = obj.publicKeys; x.publicKeys = obj.publicKeys;

6
lib/model/copayer.js

@ -60,7 +60,11 @@ Copayer.prototype.createAddress = function(wallet, isChange) {
$.checkState(wallet.isComplete()); $.checkState(wallet.isComplete());
var path = this.addressManager.getNewAddressPath(isChange); var path = this.addressManager.getNewAddressPath(isChange);
var address = Address.create(WalletUtils.deriveAddress(wallet.publicKeyRing, path, wallet.m, wallet.network)); var raw = Address.create(WalletUtils.deriveAddress(wallet.publicKeyRing, path, wallet.m, wallet.network));
var address = Address.create(_.extend(raw, {
walletId: wallet.id,
}));
address.isChange = isChange; address.isChange = isChange;
return address; return address;
}; };

9
lib/model/index.js

@ -0,0 +1,9 @@
var Model = {};
Model.Wallet = require('./wallet');
Model.Copayer = require('./copayer');
Model.TxProposal = require('./txproposal');
Model.Address = require('./address');
Model.Notification = require('./notification');
module.exports = Model;

5
lib/model/wallet.js

@ -145,7 +145,10 @@ Wallet.prototype.createAddress = function(isChange) {
$.checkState(this.isComplete()); $.checkState(this.isComplete());
var path = this.addressManager.getNewAddressPath(isChange); var path = this.addressManager.getNewAddressPath(isChange);
var address = Address.create(WalletUtils.deriveAddress(this.publicKeyRing, path, this.m, this.network)); var raw = WalletUtils.deriveAddress(this.publicKeyRing, path, this.m, this.network);
var address = Address.create(_.extend(raw, {
walletId: this.id,
}));
address.isChange = isChange; address.isChange = isChange;
return address; return address;
}; };

40
lib/server.js

@ -53,14 +53,44 @@ WalletService.onNotification = function(func) {
* @param {Object} opts * @param {Object} opts
* @param {Storage} [opts.storage] - The storage provider. * @param {Storage} [opts.storage] - The storage provider.
* @param {Storage} [opts.blockchainExplorer] - The blockchainExporer provider. * @param {Storage} [opts.blockchainExplorer] - The blockchainExporer provider.
* @param {Callback} cb
*/ */
WalletService.initialize = function(opts) { WalletService.initialize = function(opts, cb) {
$.shouldBeFunction(cb);
opts = opts || {}; opts = opts || {};
lock = opts.lock || new Lock(opts.lockOpts); lock = opts.lock || new Lock(opts.lockOpts);
storage = opts.storage || new Storage(opts.storageOpts);
blockchainExplorer = opts.blockchainExplorer; blockchainExplorer = opts.blockchainExplorer;
blockchainExplorerOpts = opts.blockchainExplorerOpts; blockchainExplorerOpts = opts.blockchainExplorerOpts;
initialized = true;
if (initialized)
return cb();
if (opts.storage) {
storage = opts.storage;
initialized = true;
return cb();
} else {
var newStorage = new Storage();
newStorage.connect(opts.storageOpts, function(err) {
if (err) return cb(err);
storage = newStorage;
initialized = true;
return cb();
});
}
};
WalletService.shutDown = function(cb) {
if (initialized) {
storage.disconnect(function(err) {
if (err) return cb(err);
initialized = false;
return cb();
});
}
}; };
WalletService.getInstance = function() { WalletService.getInstance = function() {
@ -942,7 +972,7 @@ WalletService.prototype.getPendingTxs = function(opts, cb) {
* @param {Object} opts.minTs (defaults to 0) * @param {Object} opts.minTs (defaults to 0)
* @param {Object} opts.maxTs (defaults to now) * @param {Object} opts.maxTs (defaults to now)
* @param {Object} opts.limit * @param {Object} opts.limit
* @returns {TxProposal[]} Transaction proposals, first newer * @returns {TxProposal[]} Transaction proposals, newer first
*/ */
WalletService.prototype.getTxs = function(opts, cb) { WalletService.prototype.getTxs = function(opts, cb) {
var self = this; var self = this;
@ -1120,7 +1150,7 @@ WalletService.prototype.getTxHistory = function(opts, cb) {
async.parallel([ async.parallel([
function(next) { function(next) {
self.storage.fetchTxs(self.walletId, opts, function(err, txps) { self.storage.fetchTxs(self.walletId, {}, function(err, txps) {
if (err) return next(err); if (err) return next(err);
next(null, txps); next(null, txps);
}); });

461
lib/storage.js

@ -1,121 +1,116 @@
'use strict'; 'use strict';
var _ = require('lodash'); var _ = require('lodash');
var levelup = require('levelup');
var multilevel = require('multilevel');
var net = require('net');
var async = require('async'); var async = require('async');
var $ = require('preconditions').singleton(); var $ = require('preconditions').singleton();
var log = require('npmlog'); var log = require('npmlog');
var util = require('util');
log.debug = log.verbose; log.debug = log.verbose;
log.disableColor(); log.disableColor();
var util = require('util');
var mongodb = require('mongodb');
var Wallet = require('./model/wallet'); var Model = require('./model');
var Copayer = require('./model/copayer');
var Address = require('./model/address'); var collections = {
var TxProposal = require('./model/txproposal'); WALLETS: 'wallets',
var Notification = require('./model/notification'); TXS: 'txs',
ADDRESSES: 'addresses',
NOTIFICATIONS: 'notifications',
COPAYERS_LOOKUP: 'copayers_lookup',
};
var Storage = function(opts) { var Storage = function(opts) {
opts = opts || {}; opts = opts || {};
this.db = opts.db; this.db = opts.db;
if (!this.db) {
if (opts.multiLevel) {
this.db = multilevel.client();
var con = net.connect(opts.multiLevel);
con.pipe(this.db.createRpcStream()).pipe(con);
log.info('Using multilevel server:' + opts.multiLevel.host + ':' + opts.multiLevel.port);
} else {
this.db = levelup(opts.dbPath || './db/bws.db', {
valueEncoding: 'json'
});
}
}
}; };
var zeroPad = function(x, length) { Storage.prototype.connect = function(opts, cb) {
return _.padLeft(parseInt(x), length, '0'); var self = this;
};
var walletPrefix = function(id) { opts = opts || {};
return 'w!' + id;
};
var opKey = function(key) { if (this.db) return cb(null);
return key ? '!' + key : '';
var config = opts.mongoDb || {};
var url = 'mongodb://' + (config.host || 'localhost') + ':' + (config.port ||  27017) + '/bws';
mongodb.MongoClient.connect(url, function(err, db) {
if (err) {
log.error('Unable to connect to the mongoDB server.');
return cb(err);
}
self.db = db;
console.log('Connection established to ', url);
return cb(null);
});
}; };
var MAX_TS = _.repeat('9', 14);
Storage.prototype.disconnect = function(cb) {
var self = this;
var KEY = { this.db.close(true, function(err) {
WALLET: function(walletId) { if (err) return cb(err);
return walletPrefix(walletId) + '!main'; self.db = null;
}, return cb();
COPAYER: function(id) { });
return 'copayer!' + id;
},
TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!txp' + opKey(txProposalId);
},
NOTIFICATION: function(walletId, notificationId) {
return walletPrefix(walletId) + '!not' + opKey(notificationId);
},
PENDING_TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId);
},
ADDRESS: function(walletId, address) {
return walletPrefix(walletId) + '!addr' + opKey(address);
},
}; };
Storage.prototype.fetchWallet = function(id, cb) { Storage.prototype.fetchWallet = function(id, cb) {
this.db.get(KEY.WALLET(id), function(err, data) { this.db.collection(collections.WALLETS).findOne({
if (err) { id: id
if (err.notFound) return cb(); }, function(err, result) {
return cb(err); if (err) return cb(err);
} if (!result) return cb();
return cb(null, Wallet.fromObj(data)); return cb(null, Model.Wallet.fromObj(result));
}); });
}; };
Storage.prototype.storeWallet = function(wallet, cb) { Storage.prototype.storeWallet = function(wallet, cb) {
this.db.put(KEY.WALLET(wallet.id), wallet, cb); this.db.collection(collections.WALLETS).update({
id: wallet.id
}, wallet, {
w: 1,
upsert: true,
}, cb);
}; };
Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) {
var ops = []; var self = this;
ops.push({
type: 'put', var copayerLookups = _.map(wallet.copayers, function(copayer) {
key: KEY.WALLET(wallet.id), return {
value: wallet copayerId: copayer.id,
});
_.each(wallet.copayers, function(copayer) {
var value = {
walletId: wallet.id, walletId: wallet.id,
requestPubKey: copayer.requestPubKey, requestPubKey: copayer.requestPubKey,
}; };
ops.push({ });
type: 'put',
key: KEY.COPAYER(copayer.id), this.db.collection(collections.COPAYERS_LOOKUP).remove({
value: value walletId: wallet.id
}, {
w: 1
}, function(err) {
if (err) return cb(err);
self.db.collection(collections.COPAYERS_LOOKUP).insert(copayerLookups, {
w: 1
}, function(err) {
if (err) return cb(err);
return self.storeWallet(wallet, cb);
}); });
}); });
this.db.batch(ops, cb);
}; };
Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { Storage.prototype.fetchCopayerLookup = function(copayerId, cb) {
this.db.get(KEY.COPAYER(copayerId), function(err, data) { this.db.collection(collections.COPAYERS_LOOKUP).findOne({
if (err) { copayerId: copayerId
if (err.notFound) return cb(); }, function(err, result) {
return cb(err); if (err) return cb(err);
} if (!result) return cb();
return cb(null, data); return cb(null, result);
}); });
}; };
// TODO: should be done client-side
Storage.prototype._completeTxData = function(walletId, txs, cb) { Storage.prototype._completeTxData = function(walletId, txs, cb) {
var txList = [].concat(txs); var txList = [].concat(txs);
this.fetchWallet(walletId, function(err, wallet) { this.fetchWallet(walletId, function(err, wallet) {
@ -132,12 +127,14 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) {
Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
var self = this; var self = this;
this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) {
if (err) { this.db.collection(collections.TXS).findOne({
if (err.notFound) return cb(); id: txProposalId,
return cb(err); walletId: walletId
} }, function(err, result) {
return self._completeTxData(walletId, TxProposal.fromObj(data), cb); if (err) return cb(err);
if (!result) return cb();
return self._completeTxData(walletId, Model.TxProposal.fromObj(result), cb);
}); });
}; };
@ -145,22 +142,19 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
Storage.prototype.fetchPendingTxs = function(walletId, cb) { Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var self = this; var self = this;
var txs = []; this.db.collection(collections.TXS).find({
var key = KEY.PENDING_TXP(walletId); walletId: walletId,
this.db.createReadStream({ isPending: true
gte: key, }).sort({
lt: key + '~' createdOn: -1
}) }).toArray(function(err, result) {
.on('data', function(data) { if (err) return cb(err);
txs.push(TxProposal.fromObj(data.value)); if (!result) return cb();
}) var txs = _.map(result, function(tx) {
.on('error', function(err) { return Model.TxProposal.fromObj(tx);
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return self._completeTxData(walletId, txs, cb);
}); });
return self._completeTxData(walletId, txs, cb);
});
}; };
/** /**
@ -174,31 +168,30 @@ Storage.prototype.fetchPendingTxs = function(walletId, cb) {
Storage.prototype.fetchTxs = function(walletId, opts, cb) { Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var self = this; var self = this;
var txs = [];
opts = opts || {}; opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0; var tsFilter = {};
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS; if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
var key = KEY.TXP(walletId, opts.minTs);
var endkey = KEY.TXP(walletId, opts.maxTs); var filter = {
walletId: walletId
this.db.createReadStream({ };
gt: key, if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
lt: endkey + '~',
reverse: true, var mods = {};
limit: opts.limit, if (_.isNumber(opts.limit)) mods.limit = opts.limit;
})
.on('data', function(data) { this.db.collection(collections.TXS).find(filter, mods).sort({
txs.push(TxProposal.fromObj(data.value)); createdOn: -1
}) }).toArray(function(err, result) {
.on('error', function(err) { if (err) return cb(err);
if (err.notFound) return cb(); if (!result) return cb();
return cb(err); var txs = _.map(result, function(tx) {
}) return Model.TxProposal.fromObj(tx);
.on('end', function() {
return self._completeTxData(walletId, txs, cb);
}); });
return self._completeTxData(walletId, txs, cb);
});
}; };
@ -209,183 +202,133 @@ Storage.prototype.fetchTxs = function(walletId, opts, cb) {
* @param opts.minTs * @param opts.minTs
* @param opts.maxTs * @param opts.maxTs
* @param opts.limit * @param opts.limit
* @param opts.reverse
*/ */
Storage.prototype.fetchNotifications = function(walletId, opts, cb) { Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var txs = []; var self = this;
opts = opts || {}; opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.NOTIFICATION(walletId, opts.minTs);
var endkey = KEY.NOTIFICATION(walletId, opts.maxTs);
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: opts.reverse,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(Notification.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, txs);
});
};
var tsFilter = {};
if (_.isNumber(opts.minTs)) tsFilter.$gte = opts.minTs;
if (_.isNumber(opts.maxTs)) tsFilter.$lte = opts.maxTs;
Storage.prototype.storeNotification = function(walletId, notification, cb) { var filter = {
this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb); walletId: walletId
}; };
if (!_.isEmpty(tsFilter)) filter.createdOn = tsFilter;
var mods = {};
if (_.isNumber(opts.limit)) mods.limit = opts.limit;
// TODO should we store only txp.id on keys for indexing this.db.collection(collections.NOTIFICATIONS).find(filter, mods).sort({
// or the whole txp? For now, the entire record makes sense id: opts.reverse ? -1 : 1,
// (faster + easier to access) }).toArray(function(err, result) {
Storage.prototype.storeTx = function(walletId, txp, cb) { if (err) return cb(err);
var ops = [{ if (!result) return cb();
type: 'put', var notifications = _.map(result, function(notification) {
key: KEY.TXP(walletId, txp.id), return Model.Notification.fromObj(notification);
value: txp,
}];
if (txp.isPending()) {
ops.push({
type: 'put',
key: KEY.PENDING_TXP(walletId, txp.id),
value: txp,
});
} else {
ops.push({
type: 'del',
key: KEY.PENDING_TXP(walletId, txp.id),
}); });
} return cb(null, notifications);
this.db.batch(ops, cb); });
}; };
Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
var ops = [{
type: 'del',
key: KEY.TXP(walletId, txProposalId),
}, {
type: 'del',
key: KEY.PENDING_TXP(walletId, txProposalId),
}];
this.db.batch(ops, cb); // TODO: remove walletId from signature
Storage.prototype.storeNotification = function(walletId, notification, cb) {
this.db.collection(collections.NOTIFICATIONS).insert(notification, {
w: 1
}, cb);
}; };
Storage.prototype._delByKey = function(key, cb) { // TODO: remove walletId from signature
var self = this; Storage.prototype.storeTx = function(walletId, txp, cb) {
var keys = []; txp.isPending = txp.isPending(); // Persist attribute to use when querying
this.db.createKeyStream({ this.db.collection(collections.TXS).update({
gte: key, id: txp.id,
lt: key + '~', walletId: walletId
}) }, txp, {
.on('data', function(key) { w: 1,
keys.push(key); upsert: true,
}) }, cb);
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function(err) {
self.db.batch(_.map(keys, function(k) {
return {
key: k,
type: 'del'
};
}), function(err) {
return cb(err);
});
});
}; };
Storage.prototype._removeCopayers = function(walletId, cb) { Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
var self = this; this.db.collection(collections.TXS).findAndRemove({
id: txProposalId,
this.fetchWallet(walletId, function(err, w) { walletId: walletId
if (err || !w) return cb(err); }, {
w: 1
self.db.batch(_.map(w.copayers, function(c) { }, cb);
return {
type: 'del',
key: KEY.COPAYER(c.id),
};
}), cb);
});
}; };
Storage.prototype.removeWallet = function(walletId, cb) { Storage.prototype.removeWallet = function(walletId, cb) {
var self = this; var self = this;
async.series([ async.parallel([
function(next) { function(next) {
// This should be the first step. Will check the wallet exists self.db.collection(collections.WALLETS).findAndRemove({
self._removeCopayers(walletId, next); id: walletId
}, next);
}, },
function(next) { function(next) {
self._delByKey(walletPrefix(walletId), cb); var otherCollections = _.without(_.values(collections), collections.WALLETS);
async.each(otherCollections, function(col, next) {
self.db.collection(col).remove({
walletId: walletId
}, next);
}, next);
}, },
], cb); ], cb);
}; };
Storage.prototype.fetchAddresses = function(walletId, cb) { Storage.prototype.fetchAddresses = function(walletId, cb) {
var addresses = []; var self = this;
var key = KEY.ADDRESS(walletId);
this.db.createReadStream({ this.db.collection(collections.ADDRESSES).find({
gte: key, walletId: walletId,
lt: key + '~' }).sort({
}) createdOn: 1
.on('data', function(data) { }).toArray(function(err, result) {
addresses.push(Address.fromObj(data.value)); if (err) return cb(err);
}) if (!result) return cb();
.on('error', function(err) { var addresses = _.map(result, function(address) {
if (err.notFound) return cb(); return Model.Address.fromObj(address);
return cb(err);
})
.on('end', function() {
return cb(null, addresses);
}); });
return cb(null, addresses);
});
}; };
Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
var ops = _.map([].concat(addresses), function(address) { var self = this;
return { var addresses = [].concat(addresses);
type: 'put', if (addresses.length == 0) return cb();
key: KEY.ADDRESS(wallet.id, address.address), this.db.collection(collections.ADDRESSES).insert(addresses, {
value: address, w: 1
}; }, function(err) {
}); if (err) return cb(err);
ops.unshift({ self.storeWallet(wallet, cb);
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet,
}); });
this.db.batch(ops, cb);
}; };
Storage.prototype._dump = function(cb, fn) { Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log; fn = fn || console.log;
cb = cb || function() {};
this.db.readStream() var self = this;
.on('data', function(data) { this.db.collections(function(err, collections) {
fn(util.inspect(data, { if (err) return cb(err);
depth: 10 async.eachSeries(collections, function(col, next) {
})); col.find().toArray(function(err, items) {
}) fn('--------', col.s.name);
.on('end', function() { fn(items);
if (cb) return cb(); fn('------------------------------------------------------------------\n\n');
}); next(err);
});
}, cb);
});
}; };
module.exports = Storage; module.exports = Storage;

383
lib/storage_leveldb.js

@ -0,0 +1,383 @@
'use strict';
var _ = require('lodash');
var levelup = require('levelup');
var net = require('net');
var async = require('async');
var $ = require('preconditions').singleton();
var log = require('npmlog');
var util = require('util');
log.debug = log.verbose;
log.disableColor();
var Wallet = require('./model/wallet');
var Copayer = require('./model/copayer');
var Address = require('./model/address');
var TxProposal = require('./model/txproposal');
var Notification = require('./model/notification');
var Storage = function(opts) {
opts = opts || {};
this.db = opts.db;
if (!this.db) {
this.db = levelup(opts.dbPath || './db/bws.db', {
valueEncoding: 'json'
});
}
};
var zeroPad = function(x, length) {
return _.padLeft(parseInt(x), length, '0');
};
var walletPrefix = function(id) {
return 'w!' + id;
};
var opKey = function(key) {
return key ? '!' + key : '';
};
var MAX_TS = _.repeat('9', 14);
var KEY = {
WALLET: function(walletId) {
return walletPrefix(walletId) + '!main';
},
COPAYER: function(id) {
return 'copayer!' + id;
},
TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!txp' + opKey(txProposalId);
},
NOTIFICATION: function(walletId, notificationId) {
return walletPrefix(walletId) + '!not' + opKey(notificationId);
},
PENDING_TXP: function(walletId, txProposalId) {
return walletPrefix(walletId) + '!ptxp' + opKey(txProposalId);
},
ADDRESS: function(walletId, address) {
return walletPrefix(walletId) + '!addr' + opKey(address);
},
};
Storage.prototype.fetchWallet = function(id, cb) {
this.db.get(KEY.WALLET(id), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return cb(null, Wallet.fromObj(data));
});
};
Storage.prototype.storeWallet = function(wallet, cb) {
this.db.put(KEY.WALLET(wallet.id), wallet, cb);
};
Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) {
var ops = [];
ops.push({
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet
});
_.each(wallet.copayers, function(copayer) {
var value = {
walletId: wallet.id,
requestPubKey: copayer.requestPubKey,
};
ops.push({
type: 'put',
key: KEY.COPAYER(copayer.id),
value: value
});
});
this.db.batch(ops, cb);
};
Storage.prototype.fetchCopayerLookup = function(copayerId, cb) {
this.db.get(KEY.COPAYER(copayerId), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return cb(null, data);
});
};
Storage.prototype._completeTxData = function(walletId, txs, cb) {
var txList = [].concat(txs);
this.fetchWallet(walletId, function(err, wallet) {
if (err) return cb(err);
_.each(txList, function(tx) {
tx.creatorName = wallet.getCopayer(tx.creatorId).name;
_.each(tx.actions, function(action) {
action.copayerName = wallet.getCopayer(action.copayerId).name;
});
});
return cb(null, txs);
});
};
Storage.prototype.fetchTx = function(walletId, txProposalId, cb) {
var self = this;
this.db.get(KEY.TXP(walletId, txProposalId), function(err, data) {
if (err) {
if (err.notFound) return cb();
return cb(err);
}
return self._completeTxData(walletId, TxProposal.fromObj(data), cb);
});
};
Storage.prototype.fetchPendingTxs = function(walletId, cb) {
var self = this;
var txs = [];
var key = KEY.PENDING_TXP(walletId);
this.db.createReadStream({
gte: key,
lt: key + '~'
})
.on('data', function(data) {
txs.push(TxProposal.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return self._completeTxData(walletId, txs, cb);
});
};
/**
* fetchTxs. Times are in UNIX EPOCH (seconds)
*
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
*/
Storage.prototype.fetchTxs = function(walletId, opts, cb) {
var self = this;
var txs = [];
opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.TXP(walletId, opts.minTs);
var endkey = KEY.TXP(walletId, opts.maxTs);
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: true,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(TxProposal.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return self._completeTxData(walletId, txs, cb);
});
};
/**
* fetchNotifications
*
* @param walletId
* @param opts.minTs
* @param opts.maxTs
* @param opts.limit
*/
Storage.prototype.fetchNotifications = function(walletId, opts, cb) {
var txs = [];
opts = opts || {};
opts.limit = _.isNumber(opts.limit) ? parseInt(opts.limit) : -1;
opts.minTs = _.isNumber(opts.minTs) ? zeroPad(opts.minTs, 11) : 0;
opts.maxTs = _.isNumber(opts.maxTs) ? zeroPad(opts.maxTs, 11) : MAX_TS;
var key = KEY.NOTIFICATION(walletId, opts.minTs);
var endkey = KEY.NOTIFICATION(walletId, opts.maxTs);
this.db.createReadStream({
gt: key,
lt: endkey + '~',
reverse: opts.reverse,
limit: opts.limit,
})
.on('data', function(data) {
txs.push(Notification.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, txs);
});
};
Storage.prototype.storeNotification = function(walletId, notification, cb) {
this.db.put(KEY.NOTIFICATION(walletId, notification.id), notification, cb);
};
// TODO should we store only txp.id on keys for indexing
// or the whole txp? For now, the entire record makes sense
// (faster + easier to access)
Storage.prototype.storeTx = function(walletId, txp, cb) {
var ops = [{
type: 'put',
key: KEY.TXP(walletId, txp.id),
value: txp,
}];
if (txp.isPending()) {
ops.push({
type: 'put',
key: KEY.PENDING_TXP(walletId, txp.id),
value: txp,
});
} else {
ops.push({
type: 'del',
key: KEY.PENDING_TXP(walletId, txp.id),
});
}
this.db.batch(ops, cb);
};
Storage.prototype.removeTx = function(walletId, txProposalId, cb) {
var ops = [{
type: 'del',
key: KEY.TXP(walletId, txProposalId),
}, {
type: 'del',
key: KEY.PENDING_TXP(walletId, txProposalId),
}];
this.db.batch(ops, cb);
};
Storage.prototype._delByKey = function(key, cb) {
var self = this;
var keys = [];
this.db.createKeyStream({
gte: key,
lt: key + '~',
})
.on('data', function(key) {
keys.push(key);
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function(err) {
self.db.batch(_.map(keys, function(k) {
return {
key: k,
type: 'del'
};
}), function(err) {
return cb(err);
});
});
};
Storage.prototype._removeCopayers = function(walletId, cb) {
var self = this;
this.fetchWallet(walletId, function(err, w) {
if (err || !w) return cb(err);
self.db.batch(_.map(w.copayers, function(c) {
return {
type: 'del',
key: KEY.COPAYER(c.id),
};
}), cb);
});
};
Storage.prototype.removeWallet = function(walletId, cb) {
var self = this;
async.series([
function(next) {
// This should be the first step. Will check the wallet exists
self._removeCopayers(walletId, next);
},
function(next) {
self._delByKey(walletPrefix(walletId), cb);
},
], cb);
};
Storage.prototype.fetchAddresses = function(walletId, cb) {
var addresses = [];
var key = KEY.ADDRESS(walletId);
this.db.createReadStream({
gte: key,
lt: key + '~'
})
.on('data', function(data) {
addresses.push(Address.fromObj(data.value));
})
.on('error', function(err) {
if (err.notFound) return cb();
return cb(err);
})
.on('end', function() {
return cb(null, addresses);
});
};
Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) {
var ops = _.map([].concat(addresses), function(address) {
return {
type: 'put',
key: KEY.ADDRESS(wallet.id, address.address),
value: address,
};
});
ops.unshift({
type: 'put',
key: KEY.WALLET(wallet.id),
value: wallet,
});
this.db.batch(ops, cb);
};
Storage.prototype._dump = function(cb, fn) {
fn = fn || console.log;
this.db.readStream()
.on('data', function(data) {
fn(util.inspect(data, {
depth: 10
}));
})
.on('end', function() {
if (cb) return cb();
});
};
module.exports = Storage;

70
multilevel/clientMultilevel.js

@ -1,70 +0,0 @@
#!/usr/bin/env node
var multilevel = require('multilevel');
var net = require('net');
var moment = require('moment');
var PORT = 3230;
var otherDate;
//trying to parse optional parameter to get stats on any given date
try {
otherDate = process.argv[2] && moment(process.argv[2]).isValid() ? moment(process.argv[2]) : null;
} catch (e) {
console.log('Enter the date in the format YYYY-MM-DD.');
}
var db = multilevel.client();
var con = net.connect(PORT);
con.pipe(db.createRpcStream()).pipe(con);
var Today = otherDate || moment();
var TotalTx = 0;
var TotalAmount = 0;
var TotalNewWallets = 0;
var IsToday = function(date) {
if (!date) return false;
var date = moment(date * 1000);
return (date >= Today.startOf('day') && date <= Today.endOf('day'));
}
var TotalTxpForToday = function(data) {
if (!data) return;
if (data.key.indexOf('!txp!') < 0) return;
if (!data.value || !IsToday(data.value.createdOn)) return;
TotalTx++;
TotalAmount = TotalAmount + data.value.amount;
};
var TotalNewWalletForToday = function(data) {
if (!data) return;
if (data.key.indexOf('!main') < 0) return;
if (!data.value || !IsToday(data.value.createdOn)) return;
TotalNewWallets++;
};
var PrintStats = function() {
console.log('Stats for date : ', Today.format("YYYY-MM-DD"));
console.log('New wallets : ', TotalNewWallets);
console.log('Total tx : ', TotalTx);
console.log('Total amount in tx (satoshis) : ', TotalAmount);
};
var ProcessData = function(data) {
TotalTxpForToday(data);
TotalNewWalletForToday(data);
};
// streams
db.createReadStream().on('data', function(data) {
ProcessData(data);
}).on('error', function(err) {
console.log('Error : ', err);
process.exit(code = 1);
}).on('close', function() {
PrintStats();
process.exit(code = 0);
});

17
multilevel/multilevel.js

@ -1,17 +0,0 @@
#!/usr/bin/env node
var multilevel = require('multilevel');
var net = require('net');
var level = require('levelup');
var db = level('./db', {
valueEncoding: 'json'
});
var HOST = 'localhost';
var PORT = 3230;
console.log('Server started at port ' + PORT + '...');
net.createServer(function(con) {
con.pipe(multilevel.server(db)).pipe(con);
}).listen(PORT, HOST);

7
package.json

@ -26,14 +26,12 @@
"coveralls": "^2.11.2", "coveralls": "^2.11.2",
"express": "^4.10.0", "express": "^4.10.0",
"inherits": "^2.0.1", "inherits": "^2.0.1",
"leveldown": "^0.10.0",
"levelup": "^0.19.0",
"locker": "^0.1.0", "locker": "^0.1.0",
"locker-server": "^0.1.3", "locker-server": "^0.1.3",
"lodash": "^3.3.1", "lodash": "^3.3.1",
"mocha-lcov-reporter": "0.0.1", "mocha-lcov-reporter": "0.0.1",
"mongodb": "^2.0.27",
"morgan": "*", "morgan": "*",
"multilevel": "^6.1.0",
"npmlog": "^0.1.1", "npmlog": "^0.1.1",
"preconditions": "^1.0.7", "preconditions": "^1.0.7",
"read": "^1.0.5", "read": "^1.0.5",
@ -51,7 +49,8 @@
"memdown": "^1.0.0", "memdown": "^1.0.0",
"mocha": "^1.18.2", "mocha": "^1.18.2",
"sinon": "^1.10.3", "sinon": "^1.10.3",
"supertest": "*" "supertest": "*",
"tingodb": "^0.3.4"
}, },
"scripts": { "scripts": {
"start": "node bws.js", "start": "node bws.js",

63
scripts/level2mongo.js

@ -0,0 +1,63 @@
'use strict';
var LevelStorage = require('../lib/storage_leveldb');
var MongoStorage = require('../lib/storage');
var level = new LevelStorage({
dbPath: './db/bws.db',
});
var mongo = new MongoStorage();
mongo.connect({
host: 'localhost',
port: '27017'
}, function(err) {
if (err) throw err;
mongo.db.dropDatabase(function(err) {
if (err) throw err;
run(function(err) {
if (err) throw err;
console.log('All data successfully migrated');
process.exit(0);
// mongo._dump(function() {
// process.exit(0);
// });
});
});
});
function run(cb) {
level.db.readStream()
.on('data', function(data) {
migrate(data.key, data.value, function(err) {
if (err) throw err;
});
})
.on('error', function(err) {
return cb(err);
})
.on('end', function() {
return cb();
});
};
function migrate(key, value, cb) {
if (key.match(/^copayer!/)) {
value.copayerId = key.substring(key.indexOf('!') + 1);
mongo.db.collection('copayers_lookup').insert(value, cb);
} else if (key.match(/!addr!/)) {
value.walletId = key.substring(2, key.indexOf('!addr'));
mongo.db.collection('addresses').insert(value, cb);
} else if (key.match(/!not!/)) {
mongo.db.collection('notifications').insert(value, cb);
} else if (key.match(/!p?txp!/)) {
value.isPending = key.indexOf('!ptxp!') != -1;
mongo.db.collection('txs').insert(value, cb);
} else if (key.match(/!main$/)) {
mongo.db.collection('wallets').insert(value, cb);
} else {
return cb(new Error('Invalid key ' + key));
}
};

269
test/integration/server.js

@ -7,21 +7,26 @@ var inspect = require('util').inspect;
var chai = require('chai'); var chai = require('chai');
var sinon = require('sinon'); var sinon = require('sinon');
var should = chai.should(); var should = chai.should();
var levelup = require('levelup');
var memdown = require('memdown');
var log = require('npmlog'); var log = require('npmlog');
log.debug = log.verbose; log.debug = log.verbose;
var fs = require('fs');
var tingodb = require('tingodb')({
memStore: true
});
var Utils = require('../../lib/utils'); var Utils = require('../../lib/utils');
var WalletUtils = require('bitcore-wallet-utils'); var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore; var Bitcore = WalletUtils.Bitcore;
var Storage = require('../../lib/storage'); var Storage = require('../../lib/storage');
var BlockchainMonitor = require('../../lib/blockchainmonitor'); var BlockchainMonitor = require('../../lib/blockchainmonitor');
var Wallet = require('../../lib/model/wallet'); var Model = require('../../lib/model');
var TxProposal = require('../../lib/model/txproposal'); var Wallet = Model.Wallet;
var Address = require('../../lib/model/address'); var TxProposal = Model.TxProposal;
var Copayer = require('../../lib/model/copayer'); var Address = Model.Address;
var Copayer = Model.Copayer;
var WalletService = require('../../lib/server'); var WalletService = require('../../lib/server');
var NotificationBroadcaster = require('../../lib/notificationbroadcaster'); var NotificationBroadcaster = require('../../lib/notificationbroadcaster');
var TestData = require('../testdata'); var TestData = require('../testdata');
@ -208,27 +213,45 @@ helpers.createAddresses = function(server, wallet, main, change, cb) {
var db, storage, blockchainExplorer; var db, storage, blockchainExplorer;
function openDb(cb) {
db = new tingodb.Db('./db/test', {});
return cb();
};
function resetDb(cb) {
if (!db) return cb();
db.dropDatabase(function(err) {
return cb();
});
};
describe('Wallet service', function() { describe('Wallet service', function() {
beforeEach(function() { before(function(done) {
db = levelup(memdown, { openDb(function() {
valueEncoding: 'json' storage = new Storage({
}); db: db
storage = new Storage({ });
db: db done();
}); });
blockchainExplorer = sinon.stub(); });
beforeEach(function(done) {
WalletService.initialize({ resetDb(function() {
storage: storage, blockchainExplorer = sinon.stub();
blockchainExplorer: blockchainExplorer, WalletService.initialize({
storage: storage,
blockchainExplorer: blockchainExplorer,
}, function() {
helpers.offset = 0;
done();
});
}); });
helpers.offset = 0;
}); });
after(function(done) {
WalletService.shutDown(done);
});
describe('#getInstanceWithAuth', function() { describe('#getInstanceWithAuth', function() {
beforeEach(function() {});
it('should get server instance for existing copayer', function(done) { it('should get server instance for existing copayer', function(done) {
@ -1172,6 +1195,7 @@ describe('Wallet service', function() {
helpers.stubUtxos(server, wallet, _.range(1, 9), function() { helpers.stubUtxos(server, wallet, _.range(1, 9), function() {
var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 10, null, TestData.copayers[0].privKey_1H_0); var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 10, null, TestData.copayers[0].privKey_1H_0);
server.createTx(txOpts, function(err, tx) { server.createTx(txOpts, function(err, tx) {
should.not.exist(err); should.not.exist(err);
should.exist(tx); should.exist(tx);
txid = tx.id; txid = tx.id;
@ -1814,9 +1838,7 @@ describe('Wallet service', function() {
var server, wallet, clock; var server, wallet, clock;
beforeEach(function(done) { beforeEach(function(done) {
if (server) return done();
this.timeout(5000); this.timeout(5000);
console.log('\tCreating TXS...');
clock = sinon.useFakeTimers(); clock = sinon.useFakeTimers();
helpers.createAndJoinWallet(1, 1, function(s, w) { helpers.createAndJoinWallet(1, 1, function(s, w) {
server = s; server = s;
@ -1824,7 +1846,7 @@ describe('Wallet service', function() {
helpers.stubUtxos(server, wallet, _.range(10), function() { helpers.stubUtxos(server, wallet, _.range(10), function() {
var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 0.1, null, TestData.copayers[0].privKey_1H_0); var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 0.1, null, TestData.copayers[0].privKey_1H_0);
async.eachSeries(_.range(10), function(i, next) { async.eachSeries(_.range(10), function(i, next) {
clock.tick(10000); clock.tick(10 * 1000);
server.createTx(txOpts, function(err, tx) { server.createTx(txOpts, function(err, tx) {
next(); next();
}); });
@ -1883,17 +1905,18 @@ describe('Wallet service', function() {
}); });
it('should txs from times 50 to 70', function(done) { it('should txs from times 50 to 70',
server.getTxs({ function(done) {
minTs: 50, server.getTxs({
maxTs: 70, minTs: 50,
}, function(err, txps) { maxTs: 70,
should.not.exist(err); }, function(err, txps) {
var times = _.pluck(txps, 'createdOn'); should.not.exist(err);
times.should.deep.equal([70, 60, 50]); var times = _.pluck(txps, 'createdOn');
done(); times.should.deep.equal([70, 60, 50]);
done();
});
}); });
});
}); });
describe('Notifications', function() { describe('Notifications', function() {
@ -2073,62 +2096,110 @@ describe('Wallet service', function() {
}); });
}); });
}); });
it('should delete a wallet', function(done) { it('should delete a wallet', function(done) {
var i = 0; server.removeWallet({}, function(err) {
var count = function() { should.not.exist(err);
return ++i; server.getWallet({}, function(err, w) {
}; should.exist(err);
server.storage._dump(function() { err.message.should.equal('Wallet not found');
i.should.above(1); should.not.exist(w);
server.removeWallet({}, function(err) { async.parallel([
i = 0;
server.storage._dump(function() { function(next) {
server.storage._dump(); server.storage.fetchAddresses(wallet.id, function(err, items) {
i.should.equal(0); items.length.should.equal(0);
next();
});
},
function(next) {
server.storage.fetchTxs(wallet.id, {}, function(err, items) {
items.length.should.equal(0);
next();
});
},
function(next) {
server.storage.fetchNotifications(wallet.id, {}, function(err, items) {
items.length.should.equal(0);
next();
});
},
], function(err) {
should.not.exist(err);
done(); done();
}, count); });
}); });
}, count); });
}); });
// creates 2 wallet, and deletes only 1. // creates 2 wallet, and deletes only 1.
it('should delete a wallet, and only that wallet', function(done) { it('should delete a wallet, and only that wallet', function(done) {
var i = 0; var server2, wallet2;
var db = []; async.series([
var cat = function(data) {
db.push(data); function(next) {
}; helpers.offset = 1;
server.storage._dump(function() { helpers.createAndJoinWallet(1, 1, function(s, w) {
var before = _.clone(db); server2 = s;
db.length.should.above(1); wallet2 = w;
helpers.offset = 1; helpers.stubUtxos(server2, wallet2, _.range(1, 3), function() {
helpers.createAndJoinWallet(2, 3, function(s, w) { var txOpts = helpers.createProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 0.1, 'some message', TestData.copayers[1].privKey_1H_0);
server = s; async.eachSeries(_.range(2), function(i, next) {
wallet = w; server2.createTx(txOpts, function(err, tx) {
should.not.exist(err);
helpers.stubUtxos(server, wallet, _.range(2), function() { next(err);
var txOpts = { });
toAddress: '18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', }, next);
amount: helpers.toSatoshi(0.1), });
};
async.eachSeries(_.range(2), function(i, next) {
server.createTx(txOpts, function(err, tx) {
next();
});
}, function() {
server.removeWallet({}, function(err) {
db = [];
server.storage._dump(function() {
var after = _.clone(db);
after.should.deep.equal(before);
done();
}, cat);
});
}, cat);
}); });
}); },
}, cat); function(next) {
server.removeWallet({}, next);
},
function(next) {
server.getWallet({}, function(err, wallet) {
should.exist(err);
err.message.should.contain('not found');
next();
});
},
function(next) {
server2.getWallet({}, function(err, wallet) {
should.not.exist(err);
should.exist(wallet);
wallet.id.should.equal(wallet2.id);
next();
});
},
function(next) {
server2.getMainAddresses({}, function(err, addresses) {
should.not.exist(err);
should.exist(addresses);
addresses.length.should.above(0);
next();
});
},
function(next) {
server2.getTxs({}, function(err, txs) {
should.not.exist(err);
should.exist(txs);
txs.length.should.equal(2);
next();
});
},
function(next) {
server2.getNotifications({}, function(err, notifications) {
should.not.exist(err);
should.exist(notifications);
notifications.length.should.above(0);
next();
});
},
], function(err) {
should.not.exist(err);
done();
});
}); });
}); });
@ -2967,29 +3038,37 @@ describe('Wallet service', function() {
describe('Blockchain monitor', function() { describe('Blockchain monitor', function() {
var addressSubscriber; var addressSubscriber;
beforeEach(function() { before(function(done) {
db = levelup(memdown, { openDb(function() {
valueEncoding: 'json' storage = new Storage({
}); db: db
storage = new Storage({ });
db: db done();
});
blockchainExplorer = sinon.stub();
WalletService.initialize({
storage: storage,
blockchainExplorer: blockchainExplorer,
}); });
helpers.offset = 0; });
beforeEach(function(done) {
addressSubscriber = sinon.stub(); addressSubscriber = sinon.stub();
addressSubscriber.subscribe = sinon.stub(); addressSubscriber.subscribe = sinon.stub();
sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber); sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber);
});
resetDb(function() {
blockchainExplorer = sinon.stub();
WalletService.initialize({
storage: storage,
blockchainExplorer: blockchainExplorer,
}, function() {
helpers.offset = 0;
done();
});
});
});
afterEach(function() { afterEach(function() {
BlockchainMonitor.prototype._getAddressSubscriber.restore(); BlockchainMonitor.prototype._getAddressSubscriber.restore();
}); });
after(function(done) {
WalletService.shutDown(done);
});
it('should subscribe wallet', function(done) { it('should subscribe wallet', function(done) {
var monitor = new BlockchainMonitor(); var monitor = new BlockchainMonitor();

193
test/storage.js

@ -0,0 +1,193 @@
'use strict';
var _ = require('lodash');
var async = require('async');
var chai = require('chai');
var sinon = require('sinon');
var should = chai.should();
var tingodb = require('tingodb')({
memStore: true
});
var Storage = require('../lib/storage');
var Model = require('../lib/model');
var db, storage;
function openDb(cb) {
db = new tingodb.Db('./db/test', {});
return cb();
};
function resetDb(cb) {
if (!db) return cb();
db.dropDatabase(function(err) {
return cb();
});
};
describe('Storage', function() {
before(function(done) {
openDb(function() {
storage = new Storage({
db: db
});
done();
});
});
beforeEach(function(done) {
resetDb(done);
});
describe('Store & fetch wallet', function() {
it('should correctly store and fetch wallet', function(done) {
var wallet = Model.Wallet.create({
id: '123',
name: 'my wallet',
m: 2,
n: 3,
});
should.exist(wallet);
storage.storeWallet(wallet, function(err) {
should.not.exist(err);
storage.fetchWallet('123', function(err, w) {
should.not.exist(err);
should.exist(w);
w.id.should.equal(wallet.id);
w.name.should.equal(wallet.name);
w.m.should.equal(wallet.m);
w.n.should.equal(wallet.n);
done();
})
});
});
it('should not return error if wallet not found', function(done) {
storage.fetchWallet('123', function(err, w) {
should.not.exist(err);
should.not.exist(w);
done();
});
});
});
describe('Copayer lookup', function() {
it('should correctly store and fetch copayer lookup', function(done) {
var wallet = Model.Wallet.create({
id: '123',
name: 'my wallet',
m: 2,
n: 3,
});
_.each(_.range(3), function(i) {
var copayer = Model.Copayer.create({
name: 'copayer ' + i,
xPubKey: 'xPubKey ' + i,
requestPubKey: 'requestPubKey ' + i,
});
wallet.addCopayer(copayer);
});
should.exist(wallet);
storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) {
should.not.exist(err);
storage.fetchCopayerLookup(wallet.copayers[1].id, function(err, lookup) {
should.not.exist(err);
should.exist(lookup);
lookup.walletId.should.equal('123');
lookup.requestPubKey.should.equal('requestPubKey 1');
done();
})
});
});
it('should not return error if copayer not found', function(done) {
storage.fetchCopayerLookup('2', function(err, lookup) {
should.not.exist(err);
should.not.exist(lookup);
done();
});
});
});
describe('Transaction proposals', function() {
var wallet, proposals;
beforeEach(function(done) {
wallet = Model.Wallet.create({
id: '123',
name: 'my wallet',
m: 2,
n: 3,
});
_.each(_.range(3), function(i) {
var copayer = Model.Copayer.create({
name: 'copayer ' + i,
xPubKey: 'xPubKey ' + i,
requestPubKey: 'requestPubKey ' + i,
});
wallet.addCopayer(copayer);
});
should.exist(wallet);
storage.storeWalletAndUpdateCopayersLookup(wallet, function(err) {
should.not.exist(err);
proposals = _.map(_.range(4), function(i) {
var tx = Model.TxProposal.create({
walletId: '123',
creatorId: wallet.copayers[0].id,
amount: i + 100,
});
if (i % 2 == 0) {
tx.status = 'rejected';
tx.isPending().should.be.false;
}
return tx;
});
async.each(proposals, function(tx, next) {
storage.storeTx('123', tx, next);
}, function(err) {
should.not.exist(err);
done();
});
});
});
it('should fetch tx', function(done) {
storage.fetchTx('123', proposals[0].id, function(err, tx) {
should.not.exist(err);
should.exist(tx);
tx.id.should.equal(proposals[0].id);
tx.walletId.should.equal(proposals[0].walletId);
tx.creatorName.should.equal('copayer 0');
done();
});
});
it('should fetch all pending txs', function(done) {
storage.fetchPendingTxs('123', function(err, txs) {
should.not.exist(err);
should.exist(txs);
txs.length.should.equal(2);
txs = _.sortBy(txs, 'amount');
txs[0].amount.should.equal(101);
txs[1].amount.should.equal(103);
done();
});
});
it('should remove tx', function(done) {
storage.removeTx('123', proposals[0].id, function(err) {
should.not.exist(err);
storage.fetchTx('123', proposals[0].id, function(err, tx) {
should.not.exist(err);
should.not.exist(tx);
storage.fetchTxs('123', {}, function(err, txs) {
should.not.exist(err);
should.exist(txs);
txs.length.should.equal(3);
_.any(txs, {
id: proposals[0].id
}).should.be.false;
done();
});
});
});
});
});
});
Loading…
Cancel
Save