Browse Source

implement "buckets" to split big values in mongodb

activeAddress
Matias Alejo Garcia 9 years ago
parent
commit
f7ec6e3a0b
No known key found for this signature in database GPG Key ID: 2470DB551277AB3
  1. 8
      lib/server.js
  2. 230
      lib/storage.js
  3. 20
      test/integration/server.js

8
lib/server.js

@ -2638,6 +2638,7 @@ WalletService.prototype._normalizeTxHistory = function(txs) {
WalletService.prototype.getTxHistory = function(opts, cb) { WalletService.prototype.getTxHistory = function(opts, cb) {
var self = this; var self = this;
opts = opts || {}; opts = opts || {};
opts.limit = (_.isUndefined(opts.limit) ? Defaults.HISTORY_LIMIT : opts.limit); opts.limit = (_.isUndefined(opts.limit) ? Defaults.HISTORY_LIMIT : opts.limit);
if (opts.limit > Defaults.HISTORY_LIMIT) if (opts.limit > Defaults.HISTORY_LIMIT)
@ -2821,14 +2822,11 @@ WalletService.prototype.getTxHistory = function(opts, cb) {
var txsToCache = _.filter(normalizedTxs, function(i) { var txsToCache = _.filter(normalizedTxs, function(i) {
return i.confirmations >= Defaults.CONFIRMATIONS_TO_START_CACHING; return i.confirmations >= Defaults.CONFIRMATIONS_TO_START_CACHING;
}).reverse(); });
if (!txsToCache.length) if (!txsToCache.length)
return nextSerie(err); return nextSerie(err);
self.storage.storeTxHistoryCache(self.walletId, totalItems, to, txsToCache, function(err) {
var fwdIndex = totalItems - to;
if (fwdIndex < 0) fwdIndex = 0;
self.storage.storeTxHistoryCache(self.walletId, totalItems, fwdIndex, txsToCache, function(err) {
nextSerie(err); nextSerie(err);
}) })
} }

230
lib/storage.js

@ -609,63 +609,6 @@ Storage.prototype.storeActiveAddresses = function(walletId, addresses, cb) {
}, cb); }, cb);
}; };
// -------- --------------------------- Total
// > Time >
// ^to <= ^from
// ^fwdIndex => ^end
Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) {
var self = this;
$.checkArgument(from >= 0);
$.checkArgument(from <= to);
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
type: 'historyCacheStatus',
key: null
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
if (!result.isUpdated) return cb();
// Reverse indexes
var fwdIndex = result.totalItems - to;
if (fwdIndex < 0) {
fwdIndex = 0;
}
var end = result.totalItems - from;
// nothing to return
if (end <=0)
return cb(null, []);
// Cache is OK.
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
type: 'historyCache',
key: null
}, function(err, result) {
if (err) return cb(err);
if (!result || result.history.length < end)
return cb();
var ret = result.history.slice(fwdIndex, end);
if (_.any(ret, function(i) {
return !i;
})) {
// some items are not yet defined.
return cb();
}
return cb(null, ret.reverse());
});
})
};
Storage.prototype.softResetAllTxHistoryCache = function(cb) { Storage.prototype.softResetAllTxHistoryCache = function(cb) {
this.db.collection(collections.CACHE).update({ this.db.collection(collections.CACHE).update({
type: 'historyCacheStatus', type: 'historyCacheStatus',
@ -696,10 +639,9 @@ Storage.prototype.clearTxHistoryCache = function(walletId, cb) {
var self = this; var self = this;
self.db.collection(collections.CACHE).remove({ self.db.collection(collections.CACHE).remove({
walletId: walletId, walletId: walletId,
type: 'historyCache', type: 'historyCache'
key: null
}, { }, {
w: 1 multi: 1
}, function(err) { }, function(err) {
self.db.collection(collections.CACHE).remove({ self.db.collection(collections.CACHE).remove({
walletId: walletId, walletId: walletId,
@ -711,75 +653,191 @@ Storage.prototype.clearTxHistoryCache = function(walletId, cb) {
}); });
}; };
// items should be in CHRONOLOGICAL order var bucketKey = function(bucket, size) {
// firstPosition, is the return bucket + ':' + size;
Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, firstPosition, items, cb) { };
$.shouldBeNumber(firstPosition);
$.checkArgument(firstPosition >= 0); var BUCKET_SIZE = 100;
$.shouldBeNumber(totalItems);
$.checkArgument(totalItems >= 0);
Storage.prototype._doGetTxHistoryCacheBucket = function(walletId, fwdIndex, cb) {
var self = this; var self = this;
var bucket = Math.floor(fwdIndex / BUCKET_SIZE);
var bucketStart = bucket * BUCKET_SIZE;
self.db.collection(collections.CACHE).findOne({ self.db.collection(collections.CACHE).findOne({
walletId: walletId, walletId: walletId,
type: 'historyCache', type: 'historyCache',
key: bucketKey(bucket, BUCKET_SIZE),
}, function(err, res1) {
if (err) return cb(err);
var h1 = res1 ? res1.history || [] : [];
if (h1.length < BUCKET_SIZE)
h1[BUCKET_SIZE-1] = null;
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
type: 'historyCache',
key: bucketKey(bucket + 1, BUCKET_SIZE),
}, function(err, res2) {
if (err) return cb(err);
var h2 = res2 ? res2.history || [] : [];
if (h2.length < BUCKET_SIZE)
h2[BUCKET_SIZE-1] = null;
var h = (new Array(bucketStart)).concat(h1).concat(h2);
return cb(null, h, bucket);
});
});
};
Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) {
var self = this;
$.checkArgument(from >= 0);
$.checkArgument(from <= to);
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
type: 'historyCacheStatus',
key: null key: null
}, function(err, result) { }, function(err, result) {
if (err) return cb(err); if (err) return cb(err);
if (!result) return cb();
if (!result.isUpdated) return cb();
// Reverse indexes
var fwdIndex = result.totalItems - to;
result = result || {}; if (fwdIndex < 0) {
var h = result.history || []; fwdIndex = 0;
}
var end = result.totalItems - from;
//create a sparce array, from the input // nothing to return
_.each(items, function(i) { if (end <= 0)
h[firstPosition++] = i; return cb(null, []);
self._doGetTxHistoryCacheBucket(walletId, fwdIndex, function(err, h) {
if (err) return cb(err);
if (!h) //|| result.history.length < end)
return cb();
var res = h.slice(fwdIndex, end);
if (_.any(res, function(i) {
return !i;
})) {
// some items are not yet defined.
return cb();
}
return cb(null, res.reverse());
}); });
})
};
Storage.prototype._doSaveTxHistoryCache = function(walletId, fwdIndex, items, cb) {
$.checkArgument(items.length < BUCKET_SIZE);
var self = this;
var bucket = Math.floor(fwdIndex / BUCKET_SIZE);
var bucketStart = bucket * BUCKET_SIZE;
self._doGetTxHistoryCacheBucket(walletId, fwdIndex, function(err, h, bucket) {
// Add new items
_.each(items, function(i) {
h[fwdIndex++] = i;
});
// TODO: check txid uniqness? var toSave = h.slice(bucketStart, bucketStart + BUCKET_SIZE);
self.db.collection(collections.CACHE).update({ self.db.collection(collections.CACHE).update({
walletId: walletId, walletId: walletId,
type: 'historyCache', type: 'historyCache',
key: null key: bucketKey(bucket, BUCKET_SIZE),
}, { }, {
walletId: walletId, walletId: walletId,
type: 'historyCache', type: 'historyCache',
key: null, key: bucketKey(bucket, BUCKET_SIZE),
history: h history: toSave,
}, { }, {
w: 1, w: 1,
upsert: true, upsert: true,
}, function(err) { }, function(err) {
if (err) return cb(err); if (err) return cb(err);
var cacheIsComplete = !!h[0]; bucket++;
var now = Date.now(); bucketStart += BUCKET_SIZE;
toSave = h.slice(bucketStart, bucketStart + BUCKET_SIZE);
self.db.collection(collections.CACHE).update({ self.db.collection(collections.CACHE).update({
walletId: walletId, walletId: walletId,
type: 'historyCacheStatus', type: 'historyCache',
key: null key: bucketKey(bucket, BUCKET_SIZE),
}, { }, {
walletId: walletId, walletId: walletId,
type: 'historyCacheStatus', type: 'historyCache',
key: null, key: bucketKey(bucket, BUCKET_SIZE),
totalItems: totalItems, history: toSave,
updatedOn: now,
isComplete: cacheIsComplete,
isUpdated: true,
}, { }, {
w: 1, w: 1,
upsert: true, upsert: true,
}, cb); }, cb);
}); });
});
};
// items should be in reverse CHRONOLOGICAL order
// firstPosition, is the
Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, to, items, cb) {
$.shouldBeNumber(to);
$.checkArgument(to >= 0);
$.shouldBeNumber(totalItems);
$.checkArgument(totalItems >= 0);
var fwdIndex = totalItems - to;
if (fwdIndex < 0) fwdIndex = 0;
var self = this;
self._doSaveTxHistoryCache(walletId, fwdIndex, items.reverse(), function(err) {
if (err) return cb(err);
var now = Date.now();
self.db.collection(collections.CACHE).update({
walletId: walletId,
type: 'historyCacheStatus',
key: null
}, {
walletId: walletId,
type: 'historyCacheStatus',
key: null,
totalItems: totalItems,
updatedOn: now,
isUpdated: true,
}, {
w: 1,
upsert: true,
}, cb);
}); });
}; };
Storage.prototype.fetchActiveAddresses = function(walletId, cb) { Storage.prototype.fetchActiveAddresses = function(walletId, cb) {
var self = this; var self = this;

20
test/integration/server.js

@ -6219,7 +6219,7 @@ describe('Wallet service', function() {
}); });
}); });
describe('#getTxHistory cache', function() { describe.only('#getTxHistory cache', function() {
var server, wallet, mainAddresses, changeAddresses; var server, wallet, mainAddresses, changeAddresses;
var _threshold = Defaults.HISTORY_CACHE_ADDRESS_THRESOLD; var _threshold = Defaults.HISTORY_CACHE_ADDRESS_THRESOLD;
beforeEach(function(done) { beforeEach(function(done) {
@ -6259,9 +6259,6 @@ describe('Wallet service', function() {
txs.length.should.equal(limit); txs.length.should.equal(limit);
var calls = storeTxHistoryCacheSpy.getCalls(); var calls = storeTxHistoryCacheSpy.getCalls();
calls.length.should.equal(1); calls.length.should.equal(1);
calls[0].args[1].should.equal(200); // total
calls[0].args[2].should.equal(200 - skip - limit); // position
calls[0].args[3].length.should.equal(5); // 5 txs have confirmations>= 100 calls[0].args[3].length.should.equal(5); // 5 txs have confirmations>= 100
// should be reversed! // should be reversed!
@ -6309,8 +6306,6 @@ describe('Wallet service', function() {
var calls = storeTxHistoryCacheSpy.getCalls(); var calls = storeTxHistoryCacheSpy.getCalls();
calls.length.should.equal(1); calls.length.should.equal(1);
calls[0].args[1].should.equal(200); // total
calls[0].args[2].should.equal(200 - skip - limit); // position
calls[0].args[3].length.should.equal(5); calls[0].args[3].length.should.equal(5);
// should be reversed! // should be reversed!
@ -6323,9 +6318,12 @@ describe('Wallet service', function() {
describe('Downloading history', function() { describe('Downloading history', function() {
var h; var h;
beforeEach(function() { beforeEach(function(done) {
h = helpers.historyCacheTest(200); h = helpers.historyCacheTest(200);
helpers.stubHistory(h); helpers.stubHistory(h);
server.storage.clearTxHistoryCache(server.walletId, function() {
done();
});
}); });
it('from 0 to 200, two times, in order', function(done) { it('from 0 to 200, two times, in order', function(done) {
@ -6377,15 +6375,15 @@ describe('Wallet service', function() {
next(); next();
}); });
}, function() { }, function() {
async.eachSeries(_.range(0, 200, 5), function(i, next) { async.eachSeries(_.range(0, 190, 7), function(i, next) {
server.getTxHistory({ server.getTxHistory({
skip: i, skip: i,
limit: 5, limit: 7,
}, function(err, txs, fromCache) { }, function(err, txs, fromCache) {
should.not.exist(err); should.not.exist(err);
should.exist(txs); should.exist(txs);
txs.length.should.equal(5); txs.length.should.equal(7);
var s = h.slice(i, i + 5); var s = h.slice(i, i + 7);
_.pluck(txs, 'txid').should.deep.equal(_.pluck(s, 'txid')); _.pluck(txs, 'txid').should.deep.equal(_.pluck(s, 'txid'));
fromCache.should.equal(i >= Defaults.CONFIRMATIONS_TO_START_CACHING); fromCache.should.equal(i >= Defaults.CONFIRMATIONS_TO_START_CACHING);
next(); next();

Loading…
Cancel
Save