From f7ec6e3a0bc323db8a472b2f99c08cefd39bfd43 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Fri, 29 Jul 2016 18:15:25 -0300 Subject: [PATCH] implement "buckets" to split big values in mongodb --- lib/server.js | 8 +- lib/storage.js | 230 +++++++++++++++++++++++-------------- test/integration/server.js | 20 ++-- 3 files changed, 156 insertions(+), 102 deletions(-) diff --git a/lib/server.js b/lib/server.js index 80e6c72..7d16874 100644 --- a/lib/server.js +++ b/lib/server.js @@ -2638,6 +2638,7 @@ WalletService.prototype._normalizeTxHistory = function(txs) { WalletService.prototype.getTxHistory = function(opts, cb) { var self = this; + opts = opts || {}; opts.limit = (_.isUndefined(opts.limit) ? Defaults.HISTORY_LIMIT : opts.limit); if (opts.limit > Defaults.HISTORY_LIMIT) @@ -2821,14 +2822,11 @@ WalletService.prototype.getTxHistory = function(opts, cb) { var txsToCache = _.filter(normalizedTxs, function(i) { return i.confirmations >= Defaults.CONFIRMATIONS_TO_START_CACHING; - }).reverse(); + }); if (!txsToCache.length) return nextSerie(err); - - var fwdIndex = totalItems - to; - if (fwdIndex < 0) fwdIndex = 0; - self.storage.storeTxHistoryCache(self.walletId, totalItems, fwdIndex, txsToCache, function(err) { + self.storage.storeTxHistoryCache(self.walletId, totalItems, to, txsToCache, function(err) { nextSerie(err); }) } diff --git a/lib/storage.js b/lib/storage.js index 2f7cc06..f15b155 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -609,63 +609,6 @@ Storage.prototype.storeActiveAddresses = function(walletId, addresses, cb) { }, cb); }; -// -------- --------------------------- Total -// > Time > -// ^to <= ^from -// ^fwdIndex => ^end - -Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) { - var self = this; - $.checkArgument(from >= 0); - $.checkArgument(from <= to); - - self.db.collection(collections.CACHE).findOne({ - walletId: walletId, - type: 'historyCacheStatus', - key: null - }, function(err, result) { - if (err) return cb(err); - if (!result) return cb(); - if (!result.isUpdated) return cb(); - - // Reverse indexes - var fwdIndex = result.totalItems - to; - - if (fwdIndex < 0) { - fwdIndex = 0; - } - - var end = result.totalItems - from; - - // nothing to return - if (end <=0) - return cb(null, []); - - - // Cache is OK. - self.db.collection(collections.CACHE).findOne({ - walletId: walletId, - type: 'historyCache', - key: null - }, function(err, result) { - if (err) return cb(err); - - if (!result || result.history.length < end) - return cb(); - - var ret = result.history.slice(fwdIndex, end); - - if (_.any(ret, function(i) { - return !i; - })) { - // some items are not yet defined. - return cb(); - } - return cb(null, ret.reverse()); - }); - }) -}; - Storage.prototype.softResetAllTxHistoryCache = function(cb) { this.db.collection(collections.CACHE).update({ type: 'historyCacheStatus', @@ -696,10 +639,9 @@ Storage.prototype.clearTxHistoryCache = function(walletId, cb) { var self = this; self.db.collection(collections.CACHE).remove({ walletId: walletId, - type: 'historyCache', - key: null + type: 'historyCache' }, { - w: 1 + multi: 1 }, function(err) { self.db.collection(collections.CACHE).remove({ walletId: walletId, @@ -711,75 +653,191 @@ Storage.prototype.clearTxHistoryCache = function(walletId, cb) { }); }; -// items should be in CHRONOLOGICAL order -// firstPosition, is the -Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, firstPosition, items, cb) { - $.shouldBeNumber(firstPosition); - $.checkArgument(firstPosition >= 0); - $.shouldBeNumber(totalItems); - $.checkArgument(totalItems >= 0); +var bucketKey = function(bucket, size) { + return bucket + ':' + size; +}; + +var BUCKET_SIZE = 100; +Storage.prototype._doGetTxHistoryCacheBucket = function(walletId, fwdIndex, cb) { var self = this; + var bucket = Math.floor(fwdIndex / BUCKET_SIZE); + var bucketStart = bucket * BUCKET_SIZE; + self.db.collection(collections.CACHE).findOne({ walletId: walletId, type: 'historyCache', + key: bucketKey(bucket, BUCKET_SIZE), + }, function(err, res1) { + if (err) return cb(err); + + var h1 = res1 ? res1.history || [] : []; + + if (h1.length < BUCKET_SIZE) + h1[BUCKET_SIZE-1] = null; + + self.db.collection(collections.CACHE).findOne({ + walletId: walletId, + type: 'historyCache', + key: bucketKey(bucket + 1, BUCKET_SIZE), + }, function(err, res2) { + if (err) return cb(err); + var h2 = res2 ? res2.history || [] : []; + + if (h2.length < BUCKET_SIZE) + h2[BUCKET_SIZE-1] = null; + var h = (new Array(bucketStart)).concat(h1).concat(h2); + return cb(null, h, bucket); + }); + }); +}; + + + +Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) { + var self = this; + $.checkArgument(from >= 0); + $.checkArgument(from <= to); + + self.db.collection(collections.CACHE).findOne({ + walletId: walletId, + type: 'historyCacheStatus', key: null }, function(err, result) { if (err) return cb(err); + if (!result) return cb(); + if (!result.isUpdated) return cb(); + + // Reverse indexes + var fwdIndex = result.totalItems - to; - result = result || {}; - var h = result.history || []; + if (fwdIndex < 0) { + fwdIndex = 0; + } + var end = result.totalItems - from; - //create a sparce array, from the input - _.each(items, function(i) { - h[firstPosition++] = i; + // nothing to return + if (end <= 0) + return cb(null, []); + + self._doGetTxHistoryCacheBucket(walletId, fwdIndex, function(err, h) { + if (err) return cb(err); + + if (!h) //|| result.history.length < end) + return cb(); + + var res = h.slice(fwdIndex, end); + + if (_.any(res, function(i) { + return !i; + })) { + // some items are not yet defined. + return cb(); + } + return cb(null, res.reverse()); }); + }) +}; + + +Storage.prototype._doSaveTxHistoryCache = function(walletId, fwdIndex, items, cb) { + $.checkArgument(items.length < BUCKET_SIZE); + + var self = this; + var bucket = Math.floor(fwdIndex / BUCKET_SIZE); + var bucketStart = bucket * BUCKET_SIZE; + + self._doGetTxHistoryCacheBucket(walletId, fwdIndex, function(err, h, bucket) { + // Add new items + _.each(items, function(i) { + h[fwdIndex++] = i; + }); - // TODO: check txid uniqness? + var toSave = h.slice(bucketStart, bucketStart + BUCKET_SIZE); self.db.collection(collections.CACHE).update({ walletId: walletId, type: 'historyCache', - key: null + key: bucketKey(bucket, BUCKET_SIZE), }, { walletId: walletId, type: 'historyCache', - key: null, - history: h + key: bucketKey(bucket, BUCKET_SIZE), + history: toSave, }, { w: 1, upsert: true, }, function(err) { if (err) return cb(err); - var cacheIsComplete = !!h[0]; - var now = Date.now(); + bucket++; + bucketStart += BUCKET_SIZE; + + toSave = h.slice(bucketStart, bucketStart + BUCKET_SIZE); self.db.collection(collections.CACHE).update({ walletId: walletId, - type: 'historyCacheStatus', - key: null + type: 'historyCache', + key: bucketKey(bucket, BUCKET_SIZE), }, { walletId: walletId, - type: 'historyCacheStatus', - key: null, - totalItems: totalItems, - updatedOn: now, - isComplete: cacheIsComplete, - isUpdated: true, + type: 'historyCache', + key: bucketKey(bucket, BUCKET_SIZE), + history: toSave, }, { w: 1, upsert: true, }, cb); }); + }); +}; + + + + + + +// items should be in reverse CHRONOLOGICAL order +// firstPosition, is the +Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, to, items, cb) { + $.shouldBeNumber(to); + $.checkArgument(to >= 0); + $.shouldBeNumber(totalItems); + $.checkArgument(totalItems >= 0); + + var fwdIndex = totalItems - to; + if (fwdIndex < 0) fwdIndex = 0; + var self = this; + + self._doSaveTxHistoryCache(walletId, fwdIndex, items.reverse(), function(err) { + if (err) return cb(err); + + var now = Date.now(); + self.db.collection(collections.CACHE).update({ + walletId: walletId, + type: 'historyCacheStatus', + key: null + }, { + walletId: walletId, + type: 'historyCacheStatus', + key: null, + totalItems: totalItems, + updatedOn: now, + isUpdated: true, + }, { + w: 1, + upsert: true, + }, cb); }); }; + + Storage.prototype.fetchActiveAddresses = function(walletId, cb) { var self = this; diff --git a/test/integration/server.js b/test/integration/server.js index 47f30bd..4c78801 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -6219,7 +6219,7 @@ describe('Wallet service', function() { }); }); - describe('#getTxHistory cache', function() { + describe.only('#getTxHistory cache', function() { var server, wallet, mainAddresses, changeAddresses; var _threshold = Defaults.HISTORY_CACHE_ADDRESS_THRESOLD; beforeEach(function(done) { @@ -6259,9 +6259,6 @@ describe('Wallet service', function() { txs.length.should.equal(limit); var calls = storeTxHistoryCacheSpy.getCalls(); calls.length.should.equal(1); - - calls[0].args[1].should.equal(200); // total - calls[0].args[2].should.equal(200 - skip - limit); // position calls[0].args[3].length.should.equal(5); // 5 txs have confirmations>= 100 // should be reversed! @@ -6309,8 +6306,6 @@ describe('Wallet service', function() { var calls = storeTxHistoryCacheSpy.getCalls(); calls.length.should.equal(1); - calls[0].args[1].should.equal(200); // total - calls[0].args[2].should.equal(200 - skip - limit); // position calls[0].args[3].length.should.equal(5); // should be reversed! @@ -6323,9 +6318,12 @@ describe('Wallet service', function() { describe('Downloading history', function() { var h; - beforeEach(function() { + beforeEach(function(done) { h = helpers.historyCacheTest(200); helpers.stubHistory(h); + server.storage.clearTxHistoryCache(server.walletId, function() { + done(); + }); }); it('from 0 to 200, two times, in order', function(done) { @@ -6377,15 +6375,15 @@ describe('Wallet service', function() { next(); }); }, function() { - async.eachSeries(_.range(0, 200, 5), function(i, next) { + async.eachSeries(_.range(0, 190, 7), function(i, next) { server.getTxHistory({ skip: i, - limit: 5, + limit: 7, }, function(err, txs, fromCache) { should.not.exist(err); should.exist(txs); - txs.length.should.equal(5); - var s = h.slice(i, i + 5); + txs.length.should.equal(7); + var s = h.slice(i, i + 7); _.pluck(txs, 'txid').should.deep.equal(_.pluck(s, 'txid')); fromCache.should.equal(i >= Defaults.CONFIRMATIONS_TO_START_CACHING); next();