Browse Source

add cache resets

activeAddress
Matias Alejo Garcia 9 years ago
parent
commit
926506c394
No known key found for this signature in database GPG Key ID: 2470DB551277AB3
  1. 42
      lib/blockchainmonitor.js
  2. 79
      lib/server.js
  3. 20
      lib/storage.js
  4. 16
      test/integration/server.js

42
lib/blockchainmonitor.js

@ -109,22 +109,25 @@ BlockchainMonitor.prototype._handleTxId = function(data, processIt) {
log.info('Processing accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]');
txp.setBroadcasted();
self.storage.storeTx(self.walletId, txp, function(err) {
if (err)
log.error('Could not save TX');
var args = {
txProposalId: txp.id,
txid: data.txid,
amount: txp.getTotalAmount(),
};
self.storage.softResetTxHistoryCache(walletId, function() {
self.storage.storeTx(self.walletId, txp, function(err) {
if (err)
log.error('Could not save TX');
var notification = Notification.create({
type: 'NewOutgoingTxByThirdParty',
data: args,
walletId: walletId,
var args = {
txProposalId: txp.id,
txid: data.txid,
amount: txp.getTotalAmount(),
};
var notification = Notification.create({
type: 'NewOutgoingTxByThirdParty',
data: args,
walletId: walletId,
});
self._storeAndBroadcastNotification(notification);
});
self._storeAndBroadcastNotification(notification);
});
});
};
@ -166,8 +169,10 @@ BlockchainMonitor.prototype._handleTxOuts = function(data) {
},
walletId: walletId,
});
self._updateActiveAddresses(address, function() {
self._storeAndBroadcastNotification(notification, next);
self.storage.softResetTxHistoryCache(walletId, function() {
self._updateActiveAddresses(address, function() {
self._storeAndBroadcastNotification(notification, next);
});
});
});
}, function(err) {
@ -202,8 +207,11 @@ BlockchainMonitor.prototype._handleNewBlock = function(network, hash) {
hash: hash,
},
});
self._storeAndBroadcastNotification(notification, function(err) {
return;
self.storage.softResetAllTxHistoryCache(function() {
self._storeAndBroadcastNotification(notification, function(err) {
return;
});
});
};

79
lib/server.js

@ -2383,7 +2383,9 @@ WalletService.prototype._processBroadcast = function(txp, opts, cb) {
self._notify('NewOutgoingTx', args);
}
return cb(err, txp);
self.storage.softResetTxHistoryCache(self.walletId, function() {
return cb(err, txp);
});
});
};
@ -2788,15 +2790,16 @@ WalletService.prototype.getTxHistory = function(opts, cb) {
self.storage.fetchTxs(self.walletId, {}, next);
},
function(next) {
var totalItems;
var totalItems;
async.series([
function(nextSerie) {
if (!useCache) return nextSerie();
self.storage.getTxHistoryCache(self.walletId, from, to, function(err, res) {
self.storage.getTxHistoryCache(self.walletId, from, to, function(err, res) {
if (err) return nextSerie(err);
if (!res || !res[0] ) return nextSerie();
if (!res || !res[0]) return nextSerie();
normalizedTxs = res;
fromCache = true;
@ -2842,10 +2845,13 @@ WalletService.prototype.getTxHistory = function(opts, cb) {
if (err) return cb(err);
var proposals = res[0];
var notes = res[3];
var notes = res[2];
var finalTxs = decorate(normalizedTxs, addresses, proposals, notes);
if (fromCache)
log.debug("History from cache for:", self.walletId, from, to);
return cb(null, finalTxs, !!fromCache);
});
});
@ -2897,40 +2903,43 @@ WalletService.prototype.scan = function(opts, cb) {
if (!wallet.isComplete()) return cb(Errors.WALLET_NOT_COMPLETE);
wallet.scanStatus = 'running';
self.storage.storeWallet(wallet, function(err) {
if (err) return cb(err);
var derivators = [];
_.each([false, true], function(isChange) {
derivators.push({
derive: _.bind(wallet.createAddress, wallet, isChange),
rewind: _.bind(wallet.addressManager.rewindIndex, wallet.addressManager, isChange),
});
if (opts.includeCopayerBranches) {
_.each(wallet.copayers, function(copayer) {
if (copayer.addressManager) {
derivators.push({
derive: _.bind(copayer.createAddress, copayer, wallet, isChange),
rewind: _.bind(copayer.addressManager.rewindIndex, copayer.addressManager, isChange),
});
}
});
}
});
self.storage.clearTxHistoryCache(self.walletId, function() {
self.storage.storeWallet(wallet, function(err) {
if (err) return cb(err);
async.eachSeries(derivators, function(derivator, next) {
scanBranch(derivator, function(err, addresses) {
if (err) return next(err);
self.storage.storeAddressAndWallet(wallet, addresses, next);
var derivators = [];
_.each([false, true], function(isChange) {
derivators.push({
derive: _.bind(wallet.createAddress, wallet, isChange),
rewind: _.bind(wallet.addressManager.rewindIndex, wallet.addressManager, isChange),
});
if (opts.includeCopayerBranches) {
_.each(wallet.copayers, function(copayer) {
if (copayer.addressManager) {
derivators.push({
derive: _.bind(copayer.createAddress, copayer, wallet, isChange),
rewind: _.bind(copayer.addressManager.rewindIndex, copayer.addressManager, isChange),
});
}
});
}
});
}, function(error) {
self.storage.fetchWallet(wallet.id, function(err, wallet) {
if (err) return cb(err);
wallet.scanStatus = error ? 'error' : 'success';
self.storage.storeWallet(wallet, function() {
return cb(error);
async.eachSeries(derivators, function(derivator, next) {
scanBranch(derivator, function(err, addresses) {
if (err) return next(err);
self.storage.storeAddressAndWallet(wallet, addresses, next);
});
})
}, function(error) {
self.storage.fetchWallet(wallet.id, function(err, wallet) {
if (err) return cb(err);
wallet.scanStatus = error ? 'error' : 'success';
self.storage.storeWallet(wallet, function() {
return cb(error);
});
})
});
});
});
});

20
lib/storage.js

@ -632,9 +632,6 @@ Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) {
var end = fwdIndex + to - from;
console.log('[storage.js.632] from,to:', from, to); //TODO
console.log('[storage.js.632] fwsIndex,end:', fwdIndex, end); //TODO
// Cache is OK.
self.db.collection(collections.CACHE).findOne({
walletId: walletId,
@ -651,14 +648,26 @@ Storage.prototype.getTxHistoryCache = function(walletId, from, to, cb) {
if (!_.any(ret, function(i) {
return !!i;
})) {
console.log('[storage.js.650] history has holes. not using cache'); //TODO
return cb(); // some items are not yet defined.
// some items are not yet defined.
return cb();
}
return cb(null, ret.reverse());
});
})
};
Storage.prototype.softResetAllTxHistoryCache = function(cb) {
this.db.collection(collections.CACHE).update({
type: 'historyCacheStatus',
}, {
isUpdated: false,
}, {
multi: true,
}, cb);
};
Storage.prototype.softResetTxHistoryCache = function(walletId, cb) {
this.db.collection(collections.CACHE).update({
walletId: walletId,
@ -712,7 +721,6 @@ Storage.prototype.storeTxHistoryCache = function(walletId, totalItems, firstPosi
result = result || {};
var h = result.history || [];
console.log('[storage.js.723] STORING FROM', firstPosition); //TODO
//create a sparce array, from the input
_.each(items, function(i) {
h[firstPosition++] = i;

16
test/integration/server.js

@ -6284,8 +6284,7 @@ describe('Wallet service', function() {
should.not.exist(err);
should.exist(txs);
var calls = storeTxHistoryCacheSpy.getCalls();
calls.length.should.equal(1);
calls[0].args[3].length.should.equal(0);
calls.length.should.equal(0);
server.storage.storeTxHistoryCache.restore();
done();
});
@ -6322,7 +6321,7 @@ describe('Wallet service', function() {
});
});
describe.only('Downloading history', function() {
describe('Downloading history', function() {
var h;
beforeEach(function() {
h = helpers.historyCacheTest(200);
@ -6331,7 +6330,6 @@ describe('Wallet service', function() {
it('from 0 to 200, two times, in order', function(done) {
async.eachSeries(_.range(0, 200, 5), function(i, next) {
console.log('FIRST ', i); //TODO
server.getTxHistory({
skip: i,
limit: 5,
@ -6346,7 +6344,6 @@ describe('Wallet service', function() {
});
}, function() {
async.eachSeries(_.range(0, 200, 5), function(i, next) {
console.log('SECOND ', i); //TODO
server.getTxHistory({
skip: i,
limit: 5,
@ -6366,7 +6363,6 @@ describe('Wallet service', function() {
it('from 0 to 200, two times, random', function(done) {
var indexes = _.range(0, 200, 5);
async.eachSeries(_.shuffle(indexes), function(i, next) {
console.log('FIRST ', i); //TODO
server.getTxHistory({
skip: i,
limit: 5,
@ -6381,7 +6377,6 @@ describe('Wallet service', function() {
});
}, function() {
async.eachSeries(_.range(0, 200, 5), function(i, next) {
console.log('SECOND ', i); //TODO
server.getTxHistory({
skip: i,
limit: 5,
@ -6402,7 +6397,6 @@ describe('Wallet service', function() {
it('from 0 to 200, two times, random, with resets', function(done) {
var indexes = _.range(0, 200, 5);
async.eachSeries(_.shuffle(indexes), function(i, next) {
console.log('FIRST ', i); //TODO
server.getTxHistory({
skip: i,
limit: 5,
@ -6417,11 +6411,9 @@ describe('Wallet service', function() {
});
}, function() {
async.eachSeries(_.range(0, 200, 5), function(i, next) {
console.log('SECOND ', i); //TODO
function resetCache(cb) {
if (!(i%25)) {
console.log('[server.js.6424] RESET CACHE!!!!!!!!!!!!!!!!'); //TODO
storage.softResetTxHistoryCache(server.walletId, function() {
return cb(true);
});
@ -6443,8 +6435,8 @@ console.log('[server.js.6424] RESET CACHE!!!!!!!!!!!!!!!!'); //TODO
fromCache.should.equal(i >= 100 && !reset);
next();
});
}, done);
});
});
}, done);
});
});

Loading…
Cancel
Save