Browse Source

Merge pull request #546 from isocolsky/rm/2-step-balance

Remove two step balance & active addresses cache
activeAddress
Matias Alejo Garcia 8 years ago
committed by GitHub
parent
commit
163418f504
  1. 15
      lib/blockchainmonitor.js
  2. 2
      lib/expressapp.js
  3. 96
      lib/server.js
  4. 63
      lib/storage.js
  5. 38
      test/expressapp.js
  6. 309
      test/integration/server.js

15
lib/blockchainmonitor.js

@ -170,9 +170,7 @@ BlockchainMonitor.prototype._handleTxOuts = function(data) {
walletId: walletId,
});
self.storage.softResetTxHistoryCache(walletId, function() {
self._updateActiveAddresses(address, function() {
self._storeAndBroadcastNotification(notification, next);
});
self._storeAndBroadcastNotification(notification, next);
});
});
}, function(err) {
@ -180,17 +178,6 @@ BlockchainMonitor.prototype._handleTxOuts = function(data) {
});
};
BlockchainMonitor.prototype._updateActiveAddresses = function(address, cb) {
var self = this;
self.storage.storeActiveAddresses(address.walletId, address.address, function(err) {
if (err) {
log.warn('Could not update wallet cache', err);
}
return cb(err);
});
};
BlockchainMonitor.prototype._handleIncomingTx = function(data) {
this._handleTxId(data);
this._handleTxOuts(data);

2
lib/expressapp.js

@ -281,7 +281,6 @@ ExpressApp.prototype.start = function(opts, cb) {
getServerWithAuth(req, res, function(server) {
var opts = {};
if (req.query.includeExtendedInfo == '1') opts.includeExtendedInfo = true;
if (req.query.twoStep == '1') opts.twoStep = true;
server.getStatus(opts, function(err, status) {
if (err) return returnError(err, res, req);
@ -383,7 +382,6 @@ ExpressApp.prototype.start = function(opts, cb) {
router.get('/v1/balance/', function(req, res) {
getServerWithAuth(req, res, function(server) {
var opts = {};
if (req.query.twoStep == '1') opts.twoStep = true;
server.getBalance(opts, function(err, balance) {
if (err) return returnError(err, res, req);
res.json(balance);

96
lib/server.js

@ -393,7 +393,6 @@ WalletService.prototype.getWallet = function(opts, cb) {
/**
* Retrieves wallet status.
* @param {Object} opts
* @param {Object} opts.twoStep[=false] - Optional: use 2-step balance computation for improved performance
* @param {Object} opts.includeExtendedInfo - Include PKR info & address managers for wallet & copayers
* @returns {Object} status
*/
@ -1132,111 +1131,24 @@ WalletService.prototype._getBalanceFromAddresses = function(addresses, cb) {
});
};
WalletService.prototype._getBalanceOneStep = function(opts, cb) {
var self = this;
self.storage.fetchAddresses(self.walletId, function(err, addresses) {
if (err) return cb(err);
self._getBalanceFromAddresses(addresses, function(err, balance) {
if (err) return cb(err);
// Update cache
async.series([
function(next) {
self.storage.cleanActiveAddresses(self.walletId, next);
},
function(next) {
var active = _.pluck(balance.byAddress, 'address')
self.storage.storeActiveAddresses(self.walletId, active, next);
},
], function(err) {
if (err) {
log.warn('Could not update wallet cache', err);
}
return cb(null, balance);
});
});
});
};
WalletService.prototype._getActiveAddresses = function(cb) {
var self = this;
self.storage.fetchActiveAddresses(self.walletId, function(err, active) {
if (err) {
log.warn('Could not fetch active addresses from cache', err);
return cb();
}
if (!_.isArray(active)) return cb();
self.storage.fetchAddresses(self.walletId, function(err, allAddresses) {
if (err) return cb(err);
var now = Math.floor(Date.now() / 1000);
var recent = _.pluck(_.filter(allAddresses, function(address) {
return address.createdOn > (now - 24 * 3600);
}), 'address');
var result = _.union(active, recent);
var index = _.indexBy(allAddresses, 'address');
result = _.compact(_.map(result, function(r) {
return index[r];
}));
return cb(null, result);
});
});
};
/**
* Get wallet balance.
* @param {Object} opts
* @param {Boolean} opts.twoStep[=false] - Optional - Use 2 step balance computation for improved performance
* @returns {Object} balance - Total amount & locked amount.
*/
WalletService.prototype.getBalance = function(opts, cb) {
var self = this;
opts = opts || {};
if (!opts.twoStep)
return self._getBalanceOneStep(opts, cb);
self.storage.countAddresses(self.walletId, function(err, nbAddresses) {
self.storage.fetchAddresses(self.walletId, function(err, addresses) {
if (err) return cb(err);
if (nbAddresses < Defaults.TWO_STEP_BALANCE_THRESHOLD) {
return self._getBalanceOneStep(opts, cb);
}
self._getActiveAddresses(function(err, activeAddresses) {
self._getBalanceFromAddresses(addresses, function(err, balance) {
if (err) return cb(err);
if (!_.isArray(activeAddresses)) {
return self._getBalanceOneStep(opts, cb);
} else {
log.debug('Requesting partial balance for ' + activeAddresses.length + ' out of ' + nbAddresses + ' addresses');
self._getBalanceFromAddresses(activeAddresses, function(err, partialBalance) {
if (err) return cb(err);
cb(null, partialBalance);
setTimeout(function() {
self._getBalanceOneStep(opts, function(err, fullBalance) {
if (err) return;
if (!_.isEqual(partialBalance, fullBalance)) {
log.info('Balance in active addresses differs from final balance');
self._notify('BalanceUpdated', fullBalance, {
isGlobal: true
});
}
});
}, 1);
return;
});
}
return cb(null, balance);
});
});
};
/**
* Return info needed to send all funds in the wallet
* @param {Object} opts

63
lib/storage.js

@ -563,51 +563,6 @@ Storage.prototype.fetchEmailByNotification = function(notificationId, cb) {
});
};
Storage.prototype.cleanActiveAddresses = function(walletId, cb) {
var self = this;
async.series([
function(next) {
self.db.collection(collections.CACHE).remove({
walletId: walletId,
type: 'activeAddresses',
}, {
w: 1
}, next);
},
function(next) {
self.db.collection(collections.CACHE).insert({
walletId: walletId,
type: 'activeAddresses',
key: null
}, {
w: 1
}, next);
},
], cb);
};
Storage.prototype.storeActiveAddresses = function(walletId, addresses, cb) {
var self = this;
async.each(addresses, function(address, next) {
var record = {
walletId: walletId,
type: 'activeAddresses',
key: address,
};
self.db.collection(collections.CACHE).update({
walletId: record.walletId,
type: record.type,
key: record.key,
}, record, {
w: 1,
upsert: true,
}, next);
}, cb);
};
// -------- --------------------------- Total
// > Time >
// ^to <= ^from
@ -759,24 +714,6 @@ Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, firstPosi
});
};
Storage.prototype.fetchActiveAddresses = function(walletId, cb) {
var self = this;
self.db.collection(collections.CACHE).find({
walletId: walletId,
type: 'activeAddresses',
}).toArray(function(err, result) {
if (err) return cb(err);
if (_.isEmpty(result)) return cb();
return cb(null, _.compact(_.pluck(result, 'key')));
});
};
Storage.prototype.storeFiatRate = function(providerName, rates, cb) {
var self = this;

38
test/expressapp.js

@ -150,44 +150,6 @@ describe('ExpressApp', function() {
});
});
describe('Balance', function() {
it('should handle cache argument', function(done) {
var server = {
getBalance: sinon.stub().callsArgWith(1, null, {}),
};
var TestExpressApp = proxyquire('../lib/expressapp', {
'./server': {
initialize: sinon.stub().callsArg(1),
getInstanceWithAuth: sinon.stub().callsArgWith(1, null, server),
}
});
start(TestExpressApp, function() {
var reqOpts = {
url: testHost + ':' + testPort + config.basePath + '/v1/balance',
headers: {
'x-identity': 'identity',
'x-signature': 'signature'
}
};
request(reqOpts, function(err, res, body) {
should.not.exist(err);
res.statusCode.should.equal(200);
var args = server.getBalance.getCalls()[0].args[0];
should.not.exist(args.twoStep);
reqOpts.url += '?twoStep=1';
request(reqOpts, function(err, res, body) {
should.not.exist(err);
res.statusCode.should.equal(200);
var args = server.getBalance.getCalls()[1].args[0];
args.twoStep.should.equal(true);
done();
});
});
});
});
});
describe('/v1/notifications', function(done) {
var server, TestExpressApp, clock;
beforeEach(function() {

309
test/integration/server.js

@ -1832,315 +1832,6 @@ describe('Wallet service', function() {
});
});
describe('#getBalance 2 steps', function() {
var server, wallet, clock;
var _threshold = Defaults.TWO_STEP_BALANCE_THRESHOLD;
beforeEach(function(done) {
clock = sinon.useFakeTimers(Date.now(), 'Date');
Defaults.TWO_STEP_BALANCE_THRESHOLD = 0;
helpers.createAndJoinWallet(1, 1, function(s, w) {
server = s;
wallet = w;
done();
});
});
afterEach(function() {
clock.restore();
Defaults.TWO_STEP_BALANCE_THRESHOLD = _threshold;
});
it('should get balance', function(done) {
helpers.stubUtxos(server, wallet, [1, 'u2', 3], function() {
server.getBalance({
twoStep: true
}, function(err, balance) {
should.not.exist(err);
should.exist(balance);
balance.totalAmount.should.equal(helpers.toSatoshi(6));
balance.lockedAmount.should.equal(0);
balance.availableAmount.should.equal(helpers.toSatoshi(6));
balance.totalConfirmedAmount.should.equal(helpers.toSatoshi(4));
balance.lockedConfirmedAmount.should.equal(0);
balance.availableConfirmedAmount.should.equal(helpers.toSatoshi(4));
should.exist(balance.byAddress);
balance.byAddress.length.should.equal(2);
balance.byAddress[0].amount.should.equal(helpers.toSatoshi(4));
balance.byAddress[1].amount.should.equal(helpers.toSatoshi(2));
setTimeout(done, 100);
});
});
});
it('should trigger notification when balance of non-prioritary addresses is updated', function(done) {
var oldAddrs, newAddrs;
async.series([
function(next) {
helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
oldAddrs = addrs;
next();
});
},
function(next) {
clock.tick(7 * 24 * 3600 * 1000);
helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
newAddrs = addrs;
server._getActiveAddresses(function(err, active) {
should.not.exist(err);
should.not.exist(active);
helpers.stubUtxos(server, wallet, [1, 2], {
addresses: [oldAddrs[0], newAddrs[0]],
}, function() {
next();
});
});
});
},
function(next) {
server.getBalance({
twoStep: true
}, function(err, balance) {
should.not.exist(err);
should.exist(balance);
balance.totalAmount.should.equal(helpers.toSatoshi(3));
next();
});
},
function(next) {
setTimeout(next, 100);
},
function(next) {
server._getActiveAddresses(function(err, active) {
should.not.exist(err);
should.exist(active);
active.length.should.equal(3);
next();
});
},
function(next) {
helpers.stubUtxos(server, wallet, 0.5, {
addresses: oldAddrs[1],
keepUtxos: true,
}, function() {
next();
});
},
function(next) {
server.getBalance({
twoStep: true
}, function(err, balance) {
should.not.exist(err);
should.exist(balance);
balance.totalAmount.should.equal(helpers.toSatoshi(3));
next();
});
},
function(next) {
setTimeout(next, 100);
},
function(next) {
server.getNotifications({}, function(err, notifications) {
should.not.exist(err);
var last = _.last(notifications);
last.type.should.equal('BalanceUpdated');
var balance = last.data;
balance.totalAmount.should.equal(helpers.toSatoshi(3.5));
next();
});
},
], function(err) {
should.not.exist(err);
done();
});
});
it('should not trigger notification when only balance of prioritary addresses is updated', function(done) {
var oldAddrs, newAddrs;
async.series([
function(next) {
helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
oldAddrs = addrs;
next();
});
},
function(next) {
clock.tick(7 * 24 * 3600 * 1000);
helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
newAddrs = addrs;
helpers.stubUtxos(server, wallet, [1, 2], {
addresses: newAddrs,
}, function() {
next();
});
});
},
function(next) {
server.getBalance({
twoStep: true
}, function(err, balance) {
should.not.exist(err);
should.exist(balance);
balance.totalAmount.should.equal(helpers.toSatoshi(3));
next();
});
},
function(next) {
setTimeout(next, 100);
},
function(next) {
helpers.stubUtxos(server, wallet, 0.5, {
addresses: newAddrs[0],
keepUtxos: true,
}, function() {
next();
});
},
function(next) {
server.getBalance({
twoStep: true
}, function(err, balance) {
should.not.exist(err);
should.exist(balance);
balance.totalAmount.should.equal(helpers.toSatoshi(3.5));
next();
});
},
function(next) {
setTimeout(next, 100);
},
function(next) {
server.getNotifications({}, function(err, notifications) {
should.not.exist(err);
var last = _.last(notifications);
last.type.should.not.equal('BalanceUpdated');
next();
});
},
], function(err) {
should.not.exist(err);
done();
});
});
it('should resolve balance of new addresses immediately', function(done) {
var addresses;
async.series([
function(next) {
helpers.createAddresses(server, wallet, 4, 0, function(addrs) {
addresses = addrs;
helpers.stubUtxos(server, wallet, [1, 2], {
addresses: _.take(addresses, 2),
}, function() {
next();
});
});
},
function(next) {
server.getBalance({
twoStep: true
}, function(err, balance) {
should.not.exist(err);
should.exist(balance);
balance.totalAmount.should.equal(helpers.toSatoshi(3));
next();
});
},
function(next) {
server.createAddress({}, function(err, addr) {
helpers.stubUtxos(server, wallet, 0.5, {
addresses: addr,
keepUtxos: true,
}, function() {
next();
});
});
},
function(next) {
server.getBalance({
twoStep: true
}, function(err, balance) {
should.not.exist(err);
should.exist(balance);
balance.totalAmount.should.equal(helpers.toSatoshi(3.5));
next();
});
},
function(next) {
setTimeout(next, 100);
},
function(next) {
server.getNotifications({}, function(err, notifications) {
should.not.exist(err);
var last = _.last(notifications);
last.type.should.not.equal('BalanceUpdated');
next();
});
},
], function(err) {
should.not.exist(err);
done();
});
});
it('should not perform 2 steps when nb of addresses below threshold', function(done) {
var oldAddrs, newAddrs;
Defaults.TWO_STEP_BALANCE_THRESHOLD = 5;
async.series([
function(next) {
helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
oldAddrs = addrs;
next();
});
},
function(next) {
clock.tick(7 * 24 * 3600 * 1000);
helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
newAddrs = addrs;
helpers.stubUtxos(server, wallet, [1, 2], {
addresses: [oldAddrs[0], newAddrs[0]],
}, function() {
next();
});
});
},
function(next) {
server.getBalance({
twoStep: true
}, function(err, balance) {
should.not.exist(err);
should.exist(balance);
balance.totalAmount.should.equal(helpers.toSatoshi(3));
next();
});
},
function(next) {
setTimeout(next, 100);
},
function(next) {
server.getNotifications({}, function(err, notifications) {
should.not.exist(err);
var last = _.last(notifications);
last.type.should.not.equal('BalanceUpdated');
next();
});
},
], function(err) {
should.not.exist(err);
done();
});
});
});
describe('#getFeeLevels', function() {
var server, wallet, levels;
before(function() {

Loading…
Cancel
Save