From 09e778212c02013f08d784f5d6a4d4e99f7b98c0 Mon Sep 17 00:00:00 2001
From: Ivan Socolsky <>
Date: Wed, 11 Jan 2017 18:15:00 -0300
Subject: [PATCH] Revert "Remove two step balance & active addresses cache"

 lib/blockchainmonitor.js   |  15 +-
 lib/expressapp.js          |   2 +
 lib/server.js              |  96 +++++++++++-
 lib/storage.js             |  63 ++++++++
 test/expressapp.js         |  38 +++++
 test/integration/server.js | 309 +++++++++++++++++++++++++++++++++++++
 6 files changed, 518 insertions(+), 5 deletions(-)

diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js
index e2df13a..8545522 100644
--- a/lib/blockchainmonitor.js
+++ b/lib/blockchainmonitor.js
@@ -170,7 +170,9 @@ BlockchainMonitor.prototype._handleTxOuts = function(data) {
         walletId: walletId,
       });, function() {
-        self._storeAndBroadcastNotification(notification, next);
+        self._updateActiveAddresses(address, function() {
+          self._storeAndBroadcastNotification(notification, next);
+        });
   }, function(err) {
@@ -178,6 +180,17 @@ BlockchainMonitor.prototype._handleTxOuts = function(data) {
+BlockchainMonitor.prototype._updateActiveAddresses = function(address, cb) {
+  var self = this;
+, address.address, function(err) {
+    if (err) {
+      log.warn('Could not update wallet cache', err);
+    }
+    return cb(err);
+  });
 BlockchainMonitor.prototype._handleIncomingTx = function(data) {
diff --git a/lib/expressapp.js b/lib/expressapp.js
index c9fb8ef..bfc402b 100644
--- a/lib/expressapp.js
+++ b/lib/expressapp.js
@@ -281,6 +281,7 @@ ExpressApp.prototype.start = function(opts, cb) {
     getServerWithAuth(req, res, function(server) {
       var opts = {};
       if (req.query.includeExtendedInfo == '1') opts.includeExtendedInfo = true;
+      if (req.query.twoStep == '1') opts.twoStep = true;
       server.getStatus(opts, function(err, status) {
         if (err) return returnError(err, res, req);
@@ -382,6 +383,7 @@ ExpressApp.prototype.start = function(opts, cb) {
   router.get('/v1/balance/', function(req, res) {
     getServerWithAuth(req, res, function(server) {
       var opts = {};
+      if (req.query.twoStep == '1') opts.twoStep = true;
       server.getBalance(opts, function(err, balance) {
         if (err) return returnError(err, res, req);
diff --git a/lib/server.js b/lib/server.js
index e594aa0..214318d 100644
--- a/lib/server.js
+++ b/lib/server.js
@@ -393,6 +393,7 @@ WalletService.prototype.getWallet = function(opts, cb) {
  * Retrieves wallet status.
  * @param {Object} opts
+ * @param {Object} opts.twoStep[=false] - Optional: use 2-step balance computation for improved performance
  * @param {Object} opts.includeExtendedInfo - Include PKR info & address managers for wallet & copayers
  * @returns {Object} status
@@ -1131,24 +1132,111 @@ WalletService.prototype._getBalanceFromAddresses = function(addresses, cb) {
+WalletService.prototype._getBalanceOneStep = function(opts, cb) {
+  var self = this;
+, function(err, addresses) {
+    if (err) return cb(err);
+    self._getBalanceFromAddresses(addresses, function(err, balance) {
+      if (err) return cb(err);
+      // Update cache
+      async.series([
+        function(next) {
+, next);
+        },
+        function(next) {
+          var active = _.pluck(balance.byAddress, 'address')
+, active, next);
+        },
+      ], function(err) {
+        if (err) {
+          log.warn('Could not update wallet cache', err);
+        }
+        return cb(null, balance);
+      });
+    });
+  });
+WalletService.prototype._getActiveAddresses = function(cb) {
+  var self = this;
+, function(err, active) {
+    if (err) {
+      log.warn('Could not fetch active addresses from cache', err);
+      return cb();
+    }
+    if (!_.isArray(active)) return cb();
+, function(err, allAddresses) {
+      if (err) return cb(err);
+      var now = Math.floor( / 1000);
+      var recent = _.pluck(_.filter(allAddresses, function(address) {
+        return address.createdOn > (now - 24 * 3600);
+      }), 'address');
+      var result = _.union(active, recent);
+      var index = _.indexBy(allAddresses, 'address');
+      result = _.compact(, function(r) {
+        return index[r];
+      }));
+      return cb(null, result);
+    });
+  });
  * Get wallet balance.
  * @param {Object} opts
+ * @param {Boolean} opts.twoStep[=false] - Optional - Use 2 step balance computation for improved performance
  * @returns {Object} balance - Total amount & locked amount.
 WalletService.prototype.getBalance = function(opts, cb) {
   var self = this;
-, function(err, addresses) {
+  opts = opts || {};
+  if (!opts.twoStep)
+    return self._getBalanceOneStep(opts, cb);
+, function(err, nbAddresses) {
     if (err) return cb(err);
-    self._getBalanceFromAddresses(addresses, function(err, balance) {
+    if (nbAddresses < Defaults.TWO_STEP_BALANCE_THRESHOLD) {
+      return self._getBalanceOneStep(opts, cb);
+    }
+    self._getActiveAddresses(function(err, activeAddresses) {
       if (err) return cb(err);
-      return cb(null, balance);
+      if (!_.isArray(activeAddresses)) {
+        return self._getBalanceOneStep(opts, cb);
+      } else {
+        log.debug('Requesting partial balance for ' + activeAddresses.length + ' out of ' + nbAddresses + ' addresses');
+        self._getBalanceFromAddresses(activeAddresses, function(err, partialBalance) {
+          if (err) return cb(err);
+          cb(null, partialBalance);
+          setTimeout(function() {
+            self._getBalanceOneStep(opts, function(err, fullBalance) {
+              if (err) return;
+              if (!_.isEqual(partialBalance, fullBalance)) {
+      'Balance in active addresses differs from final balance');
+                self._notify('BalanceUpdated', fullBalance, {
+                  isGlobal: true
+                });
+              }
+            });
+          }, 1);
+          return;
+        });
+      }
  * Return info needed to send all funds in the wallet
  * @param {Object} opts
diff --git a/lib/storage.js b/lib/storage.js
index a10013c..f63f968 100644
--- a/lib/storage.js
+++ b/lib/storage.js
@@ -563,6 +563,51 @@ Storage.prototype.fetchEmailByNotification = function(notificationId, cb) {
+Storage.prototype.cleanActiveAddresses = function(walletId, cb) {
+  var self = this;
+  async.series([
+    function(next) {
+      self.db.collection(collections.CACHE).remove({
+        walletId: walletId,
+        type: 'activeAddresses',
+      }, {
+        w: 1
+      }, next);
+    },
+    function(next) {
+      self.db.collection(collections.CACHE).insert({
+        walletId: walletId,
+        type: 'activeAddresses',
+        key: null
+      }, {
+        w: 1
+      }, next);
+    },
+  ], cb);
+Storage.prototype.storeActiveAddresses = function(walletId, addresses, cb) {
+  var self = this;
+  async.each(addresses, function(address, next) {
+    var record = {
+      walletId: walletId,
+      type: 'activeAddresses',
+      key: address,
+    };
+    self.db.collection(collections.CACHE).update({
+      walletId: record.walletId,
+      type: record.type,
+      key: record.key,
+    }, record, {
+      w: 1,
+      upsert: true,
+    }, next);
+  }, cb);
 // --------         ---------------------------  Total
 //           > Time >                  
 //                       ^to     <=  ^from
@@ -714,6 +759,24 @@ Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, firstPosi
+Storage.prototype.fetchActiveAddresses = function(walletId, cb) {
+  var self = this;
+  self.db.collection(collections.CACHE).find({
+    walletId: walletId,
+    type: 'activeAddresses',
+  }).toArray(function(err, result) {
+    if (err) return cb(err);
+    if (_.isEmpty(result)) return cb();
+    return cb(null, _.compact(_.pluck(result, 'key')));
+  });
 Storage.prototype.storeFiatRate = function(providerName, rates, cb) {
   var self = this;
diff --git a/test/expressapp.js b/test/expressapp.js
index 0a07626..d437297 100644
--- a/test/expressapp.js
+++ b/test/expressapp.js
@@ -150,6 +150,44 @@ describe('ExpressApp', function() {
+      describe('Balance', function() {
+        it('should handle cache argument', function(done) {
+          var server = {
+            getBalance: sinon.stub().callsArgWith(1, null, {}),
+          };
+          var TestExpressApp = proxyquire('../lib/expressapp', {
+            './server': {
+              initialize: sinon.stub().callsArg(1),
+              getInstanceWithAuth: sinon.stub().callsArgWith(1, null, server),
+            }
+          });
+          start(TestExpressApp, function() {
+            var reqOpts = {
+              url: testHost + ':' + testPort + config.basePath + '/v1/balance',
+              headers: {
+                'x-identity': 'identity',
+                'x-signature': 'signature'
+              }
+            };
+            request(reqOpts, function(err, res, body) {
+              should.not.exist(err);
+              res.statusCode.should.equal(200);
+              var args = server.getBalance.getCalls()[0].args[0];
+              should.not.exist(args.twoStep);
+              reqOpts.url += '?twoStep=1';
+              request(reqOpts, function(err, res, body) {
+                should.not.exist(err);
+                res.statusCode.should.equal(200);
+                var args = server.getBalance.getCalls()[1].args[0];
+                args.twoStep.should.equal(true);
+                done();
+              });
+            });
+          });
+        });
+      });
       describe('/v1/notifications', function(done) {
         var server, TestExpressApp, clock;
         beforeEach(function() {
diff --git a/test/integration/server.js b/test/integration/server.js
index d6d71d9..aa66225 100644
--- a/test/integration/server.js
+++ b/test/integration/server.js
@@ -1832,6 +1832,315 @@ describe('Wallet service', function() {
+  describe('#getBalance 2 steps', function() {
+    var server, wallet, clock;
+    var _threshold = Defaults.TWO_STEP_BALANCE_THRESHOLD;
+    beforeEach(function(done) {
+      clock = sinon.useFakeTimers(, 'Date');
+      helpers.createAndJoinWallet(1, 1, function(s, w) {
+        server = s;
+        wallet = w;
+        done();
+      });
+    });
+    afterEach(function() {
+      clock.restore();
+      Defaults.TWO_STEP_BALANCE_THRESHOLD = _threshold;
+    });
+    it('should get balance', function(done) {
+      helpers.stubUtxos(server, wallet, [1, 'u2', 3], function() {
+        server.getBalance({
+          twoStep: true
+        }, function(err, balance) {
+          should.not.exist(err);
+          should.exist(balance);
+          balance.totalAmount.should.equal(helpers.toSatoshi(6));
+          balance.lockedAmount.should.equal(0);
+          balance.availableAmount.should.equal(helpers.toSatoshi(6));
+          balance.totalConfirmedAmount.should.equal(helpers.toSatoshi(4));
+          balance.lockedConfirmedAmount.should.equal(0);
+          balance.availableConfirmedAmount.should.equal(helpers.toSatoshi(4));
+          should.exist(balance.byAddress);
+          balance.byAddress.length.should.equal(2);
+          balance.byAddress[0].amount.should.equal(helpers.toSatoshi(4));
+          balance.byAddress[1].amount.should.equal(helpers.toSatoshi(2));
+          setTimeout(done, 100);
+        });
+      });
+    });
+    it('should trigger notification when balance of non-prioritary addresses is updated', function(done) {
+      var oldAddrs, newAddrs;
+      async.series([
+        function(next) {
+          helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
+            oldAddrs = addrs;
+            next();
+          });
+        },
+        function(next) {
+          clock.tick(7 * 24 * 3600 * 1000);
+          helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
+            newAddrs = addrs;
+            server._getActiveAddresses(function(err, active) {
+              should.not.exist(err);
+              should.not.exist(active);
+              helpers.stubUtxos(server, wallet, [1, 2], {
+                addresses: [oldAddrs[0], newAddrs[0]],
+              }, function() {
+                next();
+              });
+            });
+          });
+        },
+        function(next) {
+          server.getBalance({
+            twoStep: true
+          }, function(err, balance) {
+            should.not.exist(err);
+            should.exist(balance);
+            balance.totalAmount.should.equal(helpers.toSatoshi(3));
+            next();
+          });
+        },
+        function(next) {
+          setTimeout(next, 100);
+        },
+        function(next) {
+          server._getActiveAddresses(function(err, active) {
+            should.not.exist(err);
+            should.exist(active);
+            active.length.should.equal(3);
+            next();
+          });
+        },
+        function(next) {
+          helpers.stubUtxos(server, wallet, 0.5, {
+            addresses: oldAddrs[1],
+            keepUtxos: true,
+          }, function() {
+            next();
+          });
+        },
+        function(next) {
+          server.getBalance({
+            twoStep: true
+          }, function(err, balance) {
+            should.not.exist(err);
+            should.exist(balance);
+            balance.totalAmount.should.equal(helpers.toSatoshi(3));
+            next();
+          });
+        },
+        function(next) {
+          setTimeout(next, 100);
+        },
+        function(next) {
+          server.getNotifications({}, function(err, notifications) {
+            should.not.exist(err);
+            var last = _.last(notifications);
+            last.type.should.equal('BalanceUpdated');
+            var balance =;
+            balance.totalAmount.should.equal(helpers.toSatoshi(3.5));
+            next();
+          });
+        },
+      ], function(err) {
+        should.not.exist(err);
+        done();
+      });
+    });
+    it('should not trigger notification when only balance of prioritary addresses is updated', function(done) {
+      var oldAddrs, newAddrs;
+      async.series([
+        function(next) {
+          helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
+            oldAddrs = addrs;
+            next();
+          });
+        },
+        function(next) {
+          clock.tick(7 * 24 * 3600 * 1000);
+          helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
+            newAddrs = addrs;
+            helpers.stubUtxos(server, wallet, [1, 2], {
+              addresses: newAddrs,
+            }, function() {
+              next();
+            });
+          });
+        },
+        function(next) {
+          server.getBalance({
+            twoStep: true
+          }, function(err, balance) {
+            should.not.exist(err);
+            should.exist(balance);
+            balance.totalAmount.should.equal(helpers.toSatoshi(3));
+            next();
+          });
+        },
+        function(next) {
+          setTimeout(next, 100);
+        },
+        function(next) {
+          helpers.stubUtxos(server, wallet, 0.5, {
+            addresses: newAddrs[0],
+            keepUtxos: true,
+          }, function() {
+            next();
+          });
+        },
+        function(next) {
+          server.getBalance({
+            twoStep: true
+          }, function(err, balance) {
+            should.not.exist(err);
+            should.exist(balance);
+            balance.totalAmount.should.equal(helpers.toSatoshi(3.5));
+            next();
+          });
+        },
+        function(next) {
+          setTimeout(next, 100);
+        },
+        function(next) {
+          server.getNotifications({}, function(err, notifications) {
+            should.not.exist(err);
+            var last = _.last(notifications);
+            last.type.should.not.equal('BalanceUpdated');
+            next();
+          });
+        },
+      ], function(err) {
+        should.not.exist(err);
+        done();
+      });
+    });
+    it('should resolve balance of new addresses immediately', function(done) {
+      var addresses;
+      async.series([
+        function(next) {
+          helpers.createAddresses(server, wallet, 4, 0, function(addrs) {
+            addresses = addrs;
+            helpers.stubUtxos(server, wallet, [1, 2], {
+              addresses: _.take(addresses, 2),
+            }, function() {
+              next();
+            });
+          });
+        },
+        function(next) {
+          server.getBalance({
+            twoStep: true
+          }, function(err, balance) {
+            should.not.exist(err);
+            should.exist(balance);
+            balance.totalAmount.should.equal(helpers.toSatoshi(3));
+            next();
+          });
+        },
+        function(next) {
+          server.createAddress({}, function(err, addr) {
+            helpers.stubUtxos(server, wallet, 0.5, {
+              addresses: addr,
+              keepUtxos: true,
+            }, function() {
+              next();
+            });
+          });
+        },
+        function(next) {
+          server.getBalance({
+            twoStep: true
+          }, function(err, balance) {
+            should.not.exist(err);
+            should.exist(balance);
+            balance.totalAmount.should.equal(helpers.toSatoshi(3.5));
+            next();
+          });
+        },
+        function(next) {
+          setTimeout(next, 100);
+        },
+        function(next) {
+          server.getNotifications({}, function(err, notifications) {
+            should.not.exist(err);
+            var last = _.last(notifications);
+            last.type.should.not.equal('BalanceUpdated');
+            next();
+          });
+        },
+      ], function(err) {
+        should.not.exist(err);
+        done();
+      });
+    });
+    it('should not perform 2 steps when nb of addresses below threshold', function(done) {
+      var oldAddrs, newAddrs;
+      async.series([
+        function(next) {
+          helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
+            oldAddrs = addrs;
+            next();
+          });
+        },
+        function(next) {
+          clock.tick(7 * 24 * 3600 * 1000);
+          helpers.createAddresses(server, wallet, 2, 0, function(addrs) {
+            newAddrs = addrs;
+            helpers.stubUtxos(server, wallet, [1, 2], {
+              addresses: [oldAddrs[0], newAddrs[0]],
+            }, function() {
+              next();
+            });
+          });
+        },
+        function(next) {
+          server.getBalance({
+            twoStep: true
+          }, function(err, balance) {
+            should.not.exist(err);
+            should.exist(balance);
+            balance.totalAmount.should.equal(helpers.toSatoshi(3));
+            next();
+          });
+        },
+        function(next) {
+          setTimeout(next, 100);
+        },
+        function(next) {
+          server.getNotifications({}, function(err, notifications) {
+            should.not.exist(err);
+            var last = _.last(notifications);
+            last.type.should.not.equal('BalanceUpdated');
+            next();
+          });
+        },
+      ], function(err) {
+        should.not.exist(err);
+        done();
+      });
+    });
+  });
   describe('#getFeeLevels', function() {
     var server, wallet, levels;
     before(function() {