From c87350d7efae81c5570c626649c3ad38cec8e91d Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 17 May 2017 15:10:16 -0300 Subject: [PATCH] send notification --- lib/blockchainmonitor.js | 56 ++++++++++++++++++++++++++++------ lib/model/txconfirmationsub.js | 4 +-- lib/server.js | 3 +- lib/storage.js | 23 ++++++++------ test/integration/bcmonitor.js | 41 +++++++++++++++++++++++++ test/integration/server.js | 6 ---- 6 files changed, 103 insertions(+), 30 deletions(-) diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index 0f901ca..60a76fe 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -25,7 +25,8 @@ BlockchainMonitor.prototype.start = function(opts, cb) { async.parallel([ function(done) { - self.explorers = _.indexBy(_.map(['livenet', 'testnet'], function(network) { + self.explorers = {}; + _.map(['livenet', 'testnet'], function(network) { var explorer; if (opts.blockchainExplorers) { explorer = opts.blockchainExplorers[network]; @@ -42,9 +43,9 @@ BlockchainMonitor.prototype.start = function(opts, cb) { }); } $.checkState(explorer); - self._initExplorer(explorer); - return explorer; - }), 'network'); + self._initExplorer(network, explorer); + self.explorers[network] = explorer; + }); done(); }, function(done) { @@ -72,7 +73,7 @@ BlockchainMonitor.prototype.start = function(opts, cb) { }); }; -BlockchainMonitor.prototype._initExplorer = function(explorer) { +BlockchainMonitor.prototype._initExplorer = function(network, explorer) { var self = this; var socket = explorer.initSocket(); @@ -85,7 +86,7 @@ BlockchainMonitor.prototype._initExplorer = function(explorer) { log.error('Error connecting to ' + explorer.getConnectionInfo()); }); socket.on('tx', _.bind(self._handleIncomingTx, self)); - socket.on('block', _.bind(self._handleNewBlock, self, explorer.network)); + socket.on('block', _.bind(self._handleNewBlock, self, network)); }; BlockchainMonitor.prototype._handleThirdPartyBroadcasts = function(data, processIt) { @@ -209,7 +210,7 @@ BlockchainMonitor.prototype._handleIncomingTx = function(data) { BlockchainMonitor.prototype._notifyNewBlock = function(network, hash) { var self = this; - log.info('New ' + network + ' block: ', hash); + log.info('New ' + network + ' block: ' + hash); var notification = Notification.create({ type: 'NewBlock', walletId: network, // use network name as wallet id for global notifications @@ -229,17 +230,52 @@ BlockchainMonitor.prototype._notifyNewBlock = function(network, hash) { BlockchainMonitor.prototype._handleTxConfirmations = function(network, hash) { var self = this; + function processTriggeredSubs(subs, cb) { + async.each(subs, function(sub) { + log.info('New tx confirmation ' + sub.txid); + sub.isActive = false; + self.storage.storeTxConfirmationSub(sub, function(err) { + if (err) return cb(err); + + var notification = Notification.create({ + type: 'TxConfirmation', + walletId: sub.walletId, + creatorId: sub.copayerId, + data: { + txid: sub.txid, + network: network, + // TODO: amount + }, + }); + self._storeAndBroadcastNotification(notification, cb); + }); + }); + }; + var explorer = self.explorers[network]; if (!explorer) return; explorer.getTxidsInBlock(hash, function(err, txids) { if (err) { - log.error('Could not fetch txids from block ' + hash); + log.error('Could not fetch txids from block ' + hash, err); return; } - // self.storage.fetchTx - + self.storage.fetchActiveTxConfirmationSubs(null, function(err, subs) { + if (err) return; + if (_.isEmpty(subs)) return; + var indexedSubs = _.indexBy(subs, 'txid'); + var triggered = []; + _.each(txids, function(txid) { + if (indexedSubs[txid]) triggered.push(indexedSubs[txid]); + }); + processTriggeredSubs(triggered, function(err) { + if (err) { + log.error('Could not process tx confirmations', err); + } + return; + }); + }); }); }; diff --git a/lib/model/txconfirmationsub.js b/lib/model/txconfirmationsub.js index 0d01530..ed07f82 100644 --- a/lib/model/txconfirmationsub.js +++ b/lib/model/txconfirmationsub.js @@ -9,9 +9,9 @@ TxConfirmationSub.create = function(opts) { x.version = 1; x.createdOn = Math.floor(Date.now() / 1000); + x.walletId = opts.walletId; x.copayerId = opts.copayerId; x.txid = opts.txid; - x.nbConfirmations = opts.nbConfirmations || 1; x.isActive = true; return x; }; @@ -21,9 +21,9 @@ TxConfirmationSub.fromObj = function(obj) { x.version = obj.version; x.createdOn = obj.createdOn; + x.walletId = obj.walletId; x.copayerId = obj.copayerId; x.txid = obj.txid; - x.nbConfirmations = obj.nbConfirmations; x.isActive = obj.isActive; return x; }; diff --git a/lib/server.js b/lib/server.js index 94690c6..5a0fdf8 100644 --- a/lib/server.js +++ b/lib/server.js @@ -3140,7 +3140,6 @@ WalletService.prototype.pushNotificationsUnsubscribe = function(opts, cb) { * Subscribe this copayer to the specified tx to get a notification when the tx confirms. * @param {Object} opts * @param {string} opts.txid - The txid of the tx to be notified of. - * @param {string} [opts.nbConfirmations=1] - The number of confirmations to wait before sending the notification. */ WalletService.prototype.txConfirmationSubscribe = function(opts, cb) { if (!checkRequired(opts, ['txid'], cb)) return; @@ -3149,8 +3148,8 @@ WalletService.prototype.txConfirmationSubscribe = function(opts, cb) { var sub = Model.TxConfirmationSub.create({ copayerId: self.copayerId, + walletId: self.walletId, txid: opts.txid, - nbConfirmations: +opts.nbConfirmations || 1, }); self.storage.storeTxConfirmationSub(sub, cb); diff --git a/lib/storage.js b/lib/storage.js index fc7b474..36dc224 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -941,19 +941,22 @@ Storage.prototype.removePushNotificationSub = function(copayerId, token, cb) { }; Storage.prototype.fetchActiveTxConfirmationSubs = function(copayerId, cb) { - this.db.collection(collections.TX_CONFIRMATION_SUBS).find({ - copayerId: copayerId, - isActive: true, - }).toArray(function(err, result) { - if (err) return cb(err); + var filter = { + isActive: true + }; + if (copayerId) filter.copayerId = copayerId; - if (!result) return cb(); + this.db.collection(collections.TX_CONFIRMATION_SUBS).find(filter) + .toArray(function(err, result) { + if (err) return cb(err); + + if (!result) return cb(); - var subs = _.map([].concat(result), function(r) { - return Model.TxConfirmationSub.fromObj(r); + var subs = _.map([].concat(result), function(r) { + return Model.TxConfirmationSub.fromObj(r); + }); + return cb(null, subs); }); - return cb(null, subs); - }); }; Storage.prototype.storeTxConfirmationSub = function(txConfirmationSub, cb) { diff --git a/test/integration/bcmonitor.js b/test/integration/bcmonitor.js index 507b616..37020e4 100644 --- a/test/integration/bcmonitor.js +++ b/test/integration/bcmonitor.js @@ -114,4 +114,45 @@ describe('Blockchain monitor', function() { }, 50); }); }); + + it('should notify copayers of tx confirmation', function(done) { + server.createAddress({}, function(err, address) { + should.not.exist(err); + + var incoming = { + txid: '123', + vout: [{}], + }; + incoming.vout[0][address.address] = 1500; + + server.txConfirmationSubscribe({ + txid: '123' + }, function(err) { + should.not.exist(err); + + blockchainExplorer.getTxidsInBlock = sinon.stub().callsArgWith(1, null, ['123', '456']); + socket.handlers['block']('block1'); + + setTimeout(function() { + blockchainExplorer.getTxidsInBlock = sinon.stub().callsArgWith(1, null, ['123', '456']); + socket.handlers['block']('block2'); + + setTimeout(function() { + server.getNotifications({}, function(err, notifications) { + should.not.exist(err); + var notifications = _.filter(notifications, { + type: 'TxConfirmation' + }); + notifications.length.should.equal(1); + var n = notifications[0]; + n.walletId.should.equal(wallet.id); + n.creatorId.should.equal(server.copayerId); + n.data.txid.should.equal('123'); + done(); + }); + }, 50); + }, 50); + }); + }); + }); }); diff --git a/test/integration/server.js b/test/integration/server.js index 8122ffc..1e0d09c 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -7365,7 +7365,6 @@ describe('Wallet service', function() { subs.length.should.equal(1); var s = subs[0]; s.txid.should.equal('123'); - s.nbConfirmations.should.equal(1); s.isActive.should.be.true; done(); }); @@ -7380,17 +7379,12 @@ describe('Wallet service', function() { }, function(err) { server.txConfirmationSubscribe({ txid: '123', - nbConfirmations: 6, }, function(err) { should.not.exist(err); server.storage.fetchActiveTxConfirmationSubs(wallet.copayers[0].id, function(err, subs) { should.not.exist(err); should.exist(subs); subs.length.should.equal(1); - var s = subs[0]; - s.txid.should.equal('123'); - s.nbConfirmations.should.equal(6); - s.isActive.should.be.true; done(); }); });