Browse Source

Merge pull request #1 from isocolsky/ref/cache

Ref/cache
activeAddress
Matias Alejo Garcia 9 years ago
committed by GitHub
parent
commit
8c032c9da8
  1. 14
      lib/blockchainexplorers/insight.js
  2. 142
      lib/server.js
  3. 230
      lib/storage.js
  4. 6
      test/integration/helpers.js
  5. 114
      test/integration/server.js

14
lib/blockchainexplorers/insight.js

@ -168,6 +168,20 @@ Insight.prototype.estimateFee = function(nbBlocks, cb) {
});
};
Insight.prototype.getBlockchainHeight = function(cb) {
var path = this.apiPrefix + '/sync';
var args = {
method: 'GET',
path: path,
json: true,
};
this._doRequest(args, function(err, res, body) {
if (err || res.statusCode !== 200) return cb(_parseErr(err, res));
return cb(null, body.blockChainHeight);
});
};
Insight.prototype.initSocket = function() {
// sockets always use the first server on the pull

142
lib/server.js

@ -2617,6 +2617,7 @@ WalletService.prototype._normalizeTxHistory = function(txs) {
return {
txid: tx.txid,
confirmations: tx.confirmations,
blockheight: tx.blockheight,
fees: parseInt((tx.fees * 1e8).toFixed(0)),
time: t,
inputs: inputs,
@ -2625,6 +2626,19 @@ WalletService.prototype._normalizeTxHistory = function(txs) {
});
};
var _lastKnownBlockchainHeight;
WalletService.prototype._getLastKnownBlockchainHeight = function(network, cb) {
var self = this;
var bc = self._getBlockchainExplorer(network);
bc.getBlockchainHeight(function(err, height) {
if (!err && height > 0) {
_lastKnownBlockchainHeight = height;
}
return cb(null, _lastKnownBlockchainHeight);
});
};
/**
* Retrieves all transactions (incoming & outgoing)
* Times are in UNIX EPOCH
@ -2771,70 +2785,90 @@ WalletService.prototype.getTxHistory = function(opts, cb) {
});
};
// Get addresses for this wallet
self.storage.fetchAddresses(self.walletId, function(err, addresses) {
if (err) return cb(err);
if (addresses.length == 0) return cb(null, []);
var addressStrs = _.pluck(addresses, 'address');
var networkName = Bitcore.Address(addressStrs[0]).toObject().network;
function getNormalizedTxs(addresses, from, to, cb) {
var txs, fromCache, totalItems;
var useCache = addresses.length >= Defaults.HISTORY_CACHE_ADDRESS_THRESOLD;
var network = Bitcore.Address(addresses[0].address).toObject().network;
var bc = self._getBlockchainExplorer(networkName);
var from = opts.skip || 0;
var to = from + opts.limit;
var normalizedTxs, fromCache;
async.series([
async.parallel([
function(next) {
if (!useCache) return next();
self.storage.getTxHistoryCache(self.walletId, from, to, function(err, res) {
if (err) return next(err);
if (!res || !res[0]) return next();
txs = res;
fromCache = true;
return next()
});
},
function(next) {
self.storage.fetchTxs(self.walletId, {}, next);
if (txs) return next();
var addressStrs = _.pluck(addresses, 'address');
var bc = self._getBlockchainExplorer(network);
bc.getTransactions(addressStrs, from, to, function(err, rawTxs, total) {
if (err) return next(err);
txs = self._normalizeTxHistory(rawTxs);
totalItems = total;
return next();
});
},
function(next) {
var totalItems;
if (!useCache || fromCache) return next();
async.series([
var txsToCache = _.filter(txs, function(i) {
return i.confirmations >= Defaults.CONFIRMATIONS_TO_START_CACHING;
}).reverse();
function(nextSerie) {
if (!useCache) return nextSerie();
if (!txsToCache.length) return next();
self.storage.getTxHistoryCache(self.walletId, from, to, function(err, res) {
if (err) return nextSerie(err);
if (!res || !res[0]) return nextSerie();
var fwdIndex = totalItems - to;
if (fwdIndex < 0) fwdIndex = 0;
self.storage.storeTxHistoryCache(self.walletId, totalItems, fwdIndex, txsToCache, next);
},
function(next) {
if (!txs) return next();
// Fix tx confirmations
self._getLastKnownBlockchainHeight(network, function(err, height) {
if (err || !height) return next(err);
_.each(txs, function(tx) {
if (tx.blockheight >= 0) {
tx.confirmations = height - tx.blockheight;
}
});
next();
});
},
], function(err) {
if (err) return cb(err);
normalizedTxs = res;
fromCache = true;
return cb(null, {
items: txs,
fromCache: fromCache
});
});
};
return nextSerie()
});
},
function(nextSerie) {
if (normalizedTxs) return nextSerie();
bc.getTransactions(addressStrs, from, to, function(err, rawTxs, total) {
if (err) return cb(err);
normalizedTxs = self._normalizeTxHistory(rawTxs);
totalItems = total;
return nextSerie();
});
},
function(nextSerie) {
if (!useCache || fromCache) return nextSerie();
// Get addresses for this wallet
self.storage.fetchAddresses(self.walletId, function(err, addresses) {
if (err) return cb(err);
if (addresses.length == 0) return cb(null, []);
var txsToCache = _.filter(normalizedTxs, function(i) {
return i.confirmations >= Defaults.CONFIRMATIONS_TO_START_CACHING;
});
var from = opts.skip || 0;
var to = from + opts.limit;
if (!txsToCache.length)
return nextSerie(err);
self.storage.storeTxHistoryCache(self.walletId, totalItems, to, txsToCache, function(err) {
nextSerie(err);
})
}
],
function(err) {
if (err) return next(err);
return next();
});
async.parallel([
function(next) {
getNormalizedTxs(addresses, from, to, next);
},
function(next) {
self.storage.fetchTxs(self.walletId, {}, next);
},
function(next) {
self.storage.fetchTxNotes(self.walletId, {}, next);
@ -2842,14 +2876,12 @@ WalletService.prototype.getTxHistory = function(opts, cb) {
], function(err, res) {
if (err) return cb(err);
var proposals = res[0];
var notes = res[2];
var finalTxs = decorate(normalizedTxs, addresses, proposals, notes);
var finalTxs = decorate(res[0].items, addresses, res[1], res[2]);
if (fromCache)
if (res[0].fromCache)
log.debug("History from cache for:", self.walletId, from, to);
return cb(null, finalTxs, !!fromCache);
return cb(null, finalTxs, !!res[0].fromCache);
});
});
};

230
lib/storage.js

@ -609,6 +609,62 @@ Storage.prototype.storeActiveAddresses = function(walletId, addresses, cb) {
}, cb);
};
// -------- --------------------------- Total
// > Time >
// ^to <= ^from
// ^fwdIndex => ^end
Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) {
var self = this;
$.checkArgument(from >= 0);
$.checkArgument(from <= to);
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
type: 'historyCacheStatus',
key: null
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
if (!result.isUpdated) return cb();
// Reverse indexes
var fwdIndex = result.totalItems - to;
if (fwdIndex < 0) {
fwdIndex = 0;
}
var end = result.totalItems - from;
// nothing to return
if (end <= 0) return cb(null, []);
// Cache is OK.
self.db.collection(collections.CACHE).find({
walletId: walletId,
type: 'historyCache',
key: {
$gte: fwdIndex,
$lt: end
},
}).sort({
key: -1,
}).toArray(function(err, result) {
if (err) return cb(err);
if (!result) return cb();
if (result.length < end - fwdIndex) {
// some items are not yet defined.
return cb();
}
var txs = _.pluck(result, 'tx');
return cb(null, txs);
});
})
};
Storage.prototype.softResetAllTxHistoryCache = function(cb) {
this.db.collection(collections.CACHE).update({
type: 'historyCacheStatus',
@ -619,8 +675,6 @@ Storage.prototype.softResetAllTxHistoryCache = function(cb) {
}, cb);
};
Storage.prototype.softResetTxHistoryCache = function(walletId, cb) {
this.db.collection(collections.CACHE).update({
walletId: walletId,
@ -634,15 +688,15 @@ Storage.prototype.softResetTxHistoryCache = function(walletId, cb) {
}, cb);
};
Storage.prototype.clearTxHistoryCache = function(walletId, cb) {
var self = this;
self.db.collection(collections.CACHE).remove({
walletId: walletId,
type: 'historyCache'
type: 'historyCache',
}, {
multi: 1
}, function(err) {
if (err) return cb(err);
self.db.collection(collections.CACHE).remove({
walletId: walletId,
type: 'historyCacheStatus',
@ -653,169 +707,40 @@ Storage.prototype.clearTxHistoryCache = function(walletId, cb) {
});
};
var bucketKey = function(bucket, size) {
return bucket + ':' + size;
};
var BUCKET_SIZE = 100;
// items should be in CHRONOLOGICAL order
Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, firstPosition, items, cb) {
$.shouldBeNumber(firstPosition);
$.checkArgument(firstPosition >= 0);
$.shouldBeNumber(totalItems);
$.checkArgument(totalItems >= 0);
Storage.prototype._doGetTxHistoryCacheBucket = function(walletId, fwdIndex, cb) {
var self = this;
var bucket = Math.floor(fwdIndex / BUCKET_SIZE);
var bucketStart = bucket * BUCKET_SIZE;
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
type: 'historyCache',
key: bucketKey(bucket, BUCKET_SIZE),
}, function(err, res1) {
if (err) return cb(err);
var h1 = res1 ? res1.history || [] : [];
if (h1.length < BUCKET_SIZE)
h1[BUCKET_SIZE-1] = null;
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
type: 'historyCache',
key: bucketKey(bucket + 1, BUCKET_SIZE),
}, function(err, res2) {
if (err) return cb(err);
var h2 = res2 ? res2.history || [] : [];
if (h2.length < BUCKET_SIZE)
h2[BUCKET_SIZE-1] = null;
var h = (new Array(bucketStart)).concat(h1).concat(h2);
return cb(null, h, bucket);
});
_.each(items, function(item, i) {
item.position = firstPosition + i;
});
};
var cacheIsComplete = (firstPosition == 0);
Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) {
var self = this;
$.checkArgument(from >= 0);
$.checkArgument(from <= to);
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
type: 'historyCacheStatus',
key: null
}, function(err, result) {
if (err) return cb(err);
if (!result) return cb();
if (!result.isUpdated) return cb();
// Reverse indexes
var fwdIndex = result.totalItems - to;
if (fwdIndex < 0) {
fwdIndex = 0;
}
var end = result.totalItems - from;
// nothing to return
if (end <= 0)
return cb(null, []);
self._doGetTxHistoryCacheBucket(walletId, fwdIndex, function(err, h) {
if (err) return cb(err);
if (!h) //|| result.history.length < end)
return cb();
var res = h.slice(fwdIndex, end);
if (_.any(res, function(i) {
return !i;
})) {
// some items are not yet defined.
return cb();
}
return cb(null, res.reverse());
});
})
};
Storage.prototype._doSaveTxHistoryCache = function(walletId, fwdIndex, items, cb) {
$.checkArgument(items.length < BUCKET_SIZE);
var self = this;
var bucket = Math.floor(fwdIndex / BUCKET_SIZE);
var bucketStart = bucket * BUCKET_SIZE;
self._doGetTxHistoryCacheBucket(walletId, fwdIndex, function(err, h, bucket) {
// Add new items
_.each(items, function(i) {
h[fwdIndex++] = i;
});
var toSave = h.slice(bucketStart, bucketStart + BUCKET_SIZE);
// TODO: check txid uniqness?
async.each(items, function(item, next) {
var pos = item.position;
delete item.position;
self.db.collection(collections.CACHE).update({
walletId: walletId,
type: 'historyCache',
key: bucketKey(bucket, BUCKET_SIZE),
key: pos,
}, {
walletId: walletId,
type: 'historyCache',
key: bucketKey(bucket, BUCKET_SIZE),
history: toSave,
key: pos,
tx: item,
}, {
w: 1,
upsert: true,
}, function(err) {
if (err) return cb(err);
bucket++;
bucketStart += BUCKET_SIZE;
toSave = h.slice(bucketStart, bucketStart + BUCKET_SIZE);
self.db.collection(collections.CACHE).update({
walletId: walletId,
type: 'historyCache',
key: bucketKey(bucket, BUCKET_SIZE),
}, {
walletId: walletId,
type: 'historyCache',
key: bucketKey(bucket, BUCKET_SIZE),
history: toSave,
}, {
w: 1,
upsert: true,
}, cb);
});
});
};
// items should be in reverse CHRONOLOGICAL order
// firstPosition, is the
Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, to, items, cb) {
$.shouldBeNumber(to);
$.checkArgument(to >= 0);
$.shouldBeNumber(totalItems);
$.checkArgument(totalItems >= 0);
var fwdIndex = totalItems - to;
if (fwdIndex < 0) fwdIndex = 0;
var self = this;
self._doSaveTxHistoryCache(walletId, fwdIndex, items.reverse(), function(err) {
}, next);
}, function(err) {
if (err) return cb(err);
var now = Date.now();
self.db.collection(collections.CACHE).update({
walletId: walletId,
type: 'historyCacheStatus',
@ -825,7 +750,8 @@ Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, to, items
type: 'historyCacheStatus',
key: null,
totalItems: totalItems,
updatedOn: now,
updatedOn: Date.now(),
isComplete: cacheIsComplete,
isUpdated: true,
}, {
w: 1,

6
test/integration/helpers.js

@ -233,9 +233,9 @@ helpers._parseAmount = function(str) {
switch (match[3]) {
default:
case 'btc':
case 'btc':
result.amount = Utils.strip(+match[2] * 1e8);
break;
break;
case 'bit':
result.amount = Utils.strip(+match[2] * 1e2);
break
@ -541,6 +541,7 @@ helpers.historyCacheTest = function(items) {
}
}],
confirmations: 1,
blockheight: 423499,
time: 1424472242,
blocktime: 1424472242,
valueOut: 0.00031454,
@ -553,6 +554,7 @@ helpers.historyCacheTest = function(items) {
var t = _.clone(template);
t.txid = 'txid:' + i;
t.confirmations = items - i - 1;
t.blockheight = i;
t.time = t.blocktime = i;
ret.unshift(t);
});

114
test/integration/server.js

@ -4077,6 +4077,7 @@ describe('Wallet service', function() {
});
it('should include the note in tx history listing', function(done) {
helpers.createAddresses(server, wallet, 1, 1, function(mainAddresses, changeAddress) {
blockchainExplorer.getBlockchainHeight = sinon.stub().callsArgWith(0, null, 1000);
server._normalizeTxHistory = sinon.stub().returnsArg(0);
var txs = [{
txid: '123',
@ -4092,7 +4093,7 @@ describe('Wallet service', function() {
amount: 200,
}],
}];
helpers.stubHistory(txs, 100);
helpers.stubHistory(txs);
server.editTxNote({
txid: '123',
body: 'just some note'
@ -5898,6 +5899,7 @@ describe('Wallet service', function() {
describe('#getTxHistory', function() {
var server, wallet, mainAddresses, changeAddresses;
beforeEach(function(done) {
blockchainExplorer.getBlockchainHeight = sinon.stub().callsArgWith(0, null, 1000);
helpers.createAndJoinWallet(1, 1, function(s, w) {
server = s;
wallet = w;
@ -6219,7 +6221,7 @@ describe('Wallet service', function() {
});
});
describe.only('#getTxHistory cache', function() {
describe('#getTxHistory cache', function() {
var server, wallet, mainAddresses, changeAddresses;
var _threshold = Defaults.HISTORY_CACHE_ADDRESS_THRESOLD;
beforeEach(function(done) {
@ -6239,12 +6241,16 @@ describe('Wallet service', function() {
});
it('should store partial cache tx history from insight', function(done) {
var h = helpers.historyCacheTest(200);
var skip = 31;
var limit = 10;
var totalItems = 200;
var currentHeight = 1000;
var h = helpers.historyCacheTest(totalItems);
helpers.stubHistory(h);
blockchainExplorer.getBlockchainHeight = sinon.stub().callsArgWith(0, null, currentHeight);
var storeTxHistoryCacheSpy = sinon.spy(server.storage, 'storeTxHistoryCache');
var skip = 31;
var limit = 10;
server.getTxHistory({
skip: skip,
@ -6259,10 +6265,13 @@ describe('Wallet service', function() {
txs.length.should.equal(limit);
var calls = storeTxHistoryCacheSpy.getCalls();
calls.length.should.equal(1);
calls[0].args[1].should.equal(totalItems); // total
calls[0].args[2].should.equal(totalItems - skip - limit); // position
calls[0].args[3].length.should.equal(5); // 5 txs have confirmations>= 100
// should be reversed!
calls[0].args[3][0].confirmations.should.equal(skip + limit - 1);
calls[0].args[3][0].confirmations.should.equal(currentHeight - (totalItems - (skip + limit)));
calls[0].args[3][0].txid.should.equal(h[skip + limit - 1].txid);
server.storage.storeTxHistoryCache.restore();
done();
@ -6270,9 +6279,10 @@ describe('Wallet service', function() {
});
it('should not cache tx history from insight', function(done) {
it('should not cache tx history when requesting txs with low # of confirmations', function(done) {
var h = helpers.historyCacheTest(200);
helpers.stubHistory(h);
blockchainExplorer.getBlockchainHeight = sinon.stub().callsArgWith(0, null, 1000);
var storeTxHistoryCacheSpy = sinon.spy(server.storage, 'storeTxHistoryCache');
server.getTxHistory({
skip: 0,
@ -6289,11 +6299,15 @@ describe('Wallet service', function() {
it('should store cache all tx history from insight', function(done) {
var h = helpers.historyCacheTest(200);
helpers.stubHistory(h);
var storeTxHistoryCacheSpy = sinon.spy(server.storage, 'storeTxHistoryCache');
var skip = 195;
var limit = 5;
var totalItems = 200;
var currentHeight = 1000;
var h = helpers.historyCacheTest(totalItems);
helpers.stubHistory(h);
blockchainExplorer.getBlockchainHeight = sinon.stub().callsArgWith(0, null, currentHeight);
var storeTxHistoryCacheSpy = sinon.spy(server.storage, 'storeTxHistoryCache');
server.getTxHistory({
skip: skip,
@ -6306,19 +6320,86 @@ describe('Wallet service', function() {
var calls = storeTxHistoryCacheSpy.getCalls();
calls.length.should.equal(1);
calls[0].args[1].should.equal(totalItems); // total
calls[0].args[2].should.equal(totalItems - skip - limit); // position
calls[0].args[3].length.should.equal(5);
// should be reversed!
calls[0].args[3][0].confirmations.should.equal(199);
calls[0].args[3][0].txid.should.equal(h[199].txid);
calls[0].args[3][0].confirmations.should.equal(currentHeight);
calls[0].args[3][0].txid.should.equal(h[totalItems - 1].txid);
server.storage.storeTxHistoryCache.restore();
done();
});
});
it('should get real # of confirmations based on current block height', function(done) {
var _confirmations = Defaults.CONFIRMATIONS_TO_START_CACHING;
Defaults.CONFIRMATIONS_TO_START_CACHING = 6;
var h = helpers.historyCacheTest(20);
helpers.stubHistory(h);
var storeTxHistoryCacheSpy = sinon.spy(server.storage, 'storeTxHistoryCache');
var skip = 0;
var limit = 20;
blockchainExplorer.getBlockchainHeight = sinon.stub().callsArgWith(0, null, 100);
server.getTxHistory({
skip: skip,
limit: limit,
}, function(err, txs) {
should.not.exist(err);
should.exist(txs);
txs.length.should.equal(limit);
var calls = storeTxHistoryCacheSpy.getCalls();
calls.length.should.equal(1);
_.first(txs).confirmations.should.equal(81);
_.last(txs).confirmations.should.equal(100);
server.storage.storeTxHistoryCache.restore();
Defaults.CONFIRMATIONS_TO_START_CACHING = _confirmations;
done();
});
});
it('should get cached # of confirmations if current height unknown', function(done) {
var _confirmations = Defaults.CONFIRMATIONS_TO_START_CACHING;
Defaults.CONFIRMATIONS_TO_START_CACHING = 6;
var h = helpers.historyCacheTest(20);
helpers.stubHistory(h);
var storeTxHistoryCacheSpy = sinon.spy(server.storage, 'storeTxHistoryCache');
var skip = 0;
var limit = 20;
var _getLastKnownBlockchainHeight = server._getLastKnownBlockchainHeight;
server._getLastKnownBlockchainHeight = sinon.stub().callsArgWith(1, null, null);
server.getTxHistory({
skip: skip,
limit: limit,
}, function(err, txs) {
should.not.exist(err);
should.exist(txs);
txs.length.should.equal(limit);
var calls = storeTxHistoryCacheSpy.getCalls();
calls.length.should.equal(1);
_.first(txs).confirmations.should.equal(0);
_.last(txs).confirmations.should.equal(19);
server.storage.storeTxHistoryCache.restore();
Defaults.CONFIRMATIONS_TO_START_CACHING = _confirmations;
server._getLastKnownBlockchainHeight = _getLastKnownBlockchainHeight;
done();
});
});
describe('Downloading history', function() {
var h;
beforeEach(function(done) {
blockchainExplorer.getBlockchainHeight = sinon.stub().callsArgWith(0, null, 1000);
h = helpers.historyCacheTest(200);
helpers.stubHistory(h);
server.storage.clearTxHistoryCache(server.walletId, function() {
@ -6332,7 +6413,6 @@ describe('Wallet service', function() {
skip: i,
limit: 5,
}, function(err, txs, fromCache) {
should.not.exist(err);
should.exist(txs);
txs.length.should.equal(5);
@ -6342,15 +6422,15 @@ describe('Wallet service', function() {
next();
});
}, function() {
// Ask more that cached.
async.eachSeries(_.range(0, 210, 7), function(i, next) {
async.eachSeries(_.range(0, 200, 5), function(i, next) {
server.getTxHistory({
skip: i,
limit: 7,
limit: 5,
}, function(err, txs, fromCache) {
should.not.exist(err);
should.exist(txs);
var s = h.slice(i, i + 7);
txs.length.should.equal(5);
var s = h.slice(i, i + 5);
_.pluck(txs, 'txid').should.deep.equal(_.pluck(s, 'txid'));
fromCache.should.equal(i >= Defaults.CONFIRMATIONS_TO_START_CACHING && i < 200);
next();

Loading…
Cancel
Save