diff --git a/lib/blockchainexplorers/insight.js b/lib/blockchainexplorers/insight.js index bde2818..fdc69f4 100644 --- a/lib/blockchainexplorers/insight.js +++ b/lib/blockchainexplorers/insight.js @@ -75,7 +75,10 @@ Insight.prototype.getTransaction = function(txid, cb) { }; request(args, function(err, res, tx) { - if (err || res.statusCode !== 200) return cb(_parseErr(err,res)); + if (res && res.statusCode == 404 ) return cb(); + if (err || res.statusCode !== 200) + return cb(_parseErr(err,res)); + return cb(null, tx); }); }; diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index ea59a30..f85213b 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -86,7 +86,50 @@ BlockchainMonitor.prototype._initExplorer = function(explorer) { socket.on('block', _.bind(self._handleNewBlock, self)); }; -BlockchainMonitor.prototype._handleIncommingTx = function(data) { +BlockchainMonitor.prototype._handleTxId = function(data, processIt) { + var self = this; + if (!data || !data.txid) return; + + self.storage.fetchTxByHash(data.txid, function(err, txp) { + if (err) { + log.error('Could not fetch tx the db'); + return; + } + if (!txp || txp.status != 'accepted') return; + + var walletId = txp.walletId; + + if (!processIt) { + log.info('Detected broadcast ' + data.txid + ' of an accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]'); + return setTimeout(self._handleTxId.bind(self, data, true), 20 * 1000); + } + + log.info('Processing accepted txp [' + txp.id + '] for wallet ' + walletId + ' [' + txp.amount + 'sat ]'); + + txp.setBroadcasted(); + self.storage.storeTx(self.walletId, txp, function(err) { + if (err) + log.error('Could not save TX'); + + var args = { + txProposalId: txp.id, + txid: data.txid, + amount: txp.getTotalAmount(), + }; + + var notification = Notification.create({ + type: 'NewOutgoingTxByThirdParty', + data: args, + walletId: walletId, + }); + self._storeAndBroadcastNotification(notification); + }); + }); +}; + + + +BlockchainMonitor.prototype._handleTxOuts = function(data) { var self = this; if (!data || !data.vout) return; @@ -130,6 +173,12 @@ BlockchainMonitor.prototype._handleIncommingTx = function(data) { }); }; + +BlockchainMonitor.prototype._handleIncommingTx = function(data) { + this._handleTxId(data); + this._handleTxOuts(data); +}; + BlockchainMonitor.prototype._handleNewBlock = function(hash) { var self = this; @@ -150,7 +199,7 @@ BlockchainMonitor.prototype._storeAndBroadcastNotification = function(notificati self.storage.storeNotification(notification.walletId, notification, function() { self.messageBroker.send(notification) - return cb(); + if (cb) return cb(); }); }; diff --git a/lib/model/txproposal.js b/lib/model/txproposal.js index 668e671..a917358 100644 --- a/lib/model/txproposal.js +++ b/lib/model/txproposal.js @@ -1,6 +1,7 @@ 'use strict'; var _ = require('lodash'); +var $ = require('preconditions').singleton(); var Uuid = require('uuid'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; @@ -300,8 +301,10 @@ TxProposal.prototype.sign = function(copayerId, signatures, xpub) { this.addAction(copayerId, 'accept', null, signatures, xpub); - if (this.status == 'accepted') + if (this.status == 'accepted') { this.raw = tx.uncheckedSerialize(); + this.txid = tx.id; + } return true; } catch (e) { @@ -331,8 +334,8 @@ TxProposal.prototype.isBroadcasted = function() { return this.status == 'broadcasted'; }; -TxProposal.prototype.setBroadcasted = function(txid) { - this.txid = txid; +TxProposal.prototype.setBroadcasted = function() { + $.checkState(this.txid); this.status = 'broadcasted'; this.broadcastedOn = Math.floor(Date.now() / 1000); }; diff --git a/lib/server.js b/lib/server.js index a4bdbdf..64882d4 100644 --- a/lib/server.js +++ b/lib/server.js @@ -1362,11 +1362,11 @@ WalletService.prototype.broadcastRawTx = function(opts, cb) { WalletService.prototype._checkTxInBlockchain = function(txp, cb) { - var tx = txp.getBitcoreTx(); + if (!txp.txid) return cb(); var bc = this._getBlockchainExplorer(txp.getNetworkName()); - bc.getTransaction(tx.id, function(err, tx) { + bc.getTransaction(txp.txid, function(err, tx) { if (err) return cb(err); - return cb(null, tx); + return cb(null, !!tx); }) }; @@ -1435,6 +1435,31 @@ WalletService.prototype.signTx = function(opts, cb) { }); }; +WalletService.prototype._processBroadcast = function(txp, opts, cb) { + var self = this; + $.checkState(txp.txid); + opts = opts || {}; + + txp.setBroadcasted(); + self.storage.storeTx(self.walletId, txp, function(err) { + if (err) return cb(err); + + var args = { + txProposalId: txp.id, + txid: txp.txid, + amount: txp.getTotalAmount(), + }; + + if (opts.byThirdParty) { + self._notify('NewOutgoingTxByThirdParty', args); + } else { + self._notify('NewOutgoingTx', args); + } + + return cb(err, txp); + }); +}; + /** * Broadcast a transaction proposal. @@ -1447,21 +1472,6 @@ WalletService.prototype.broadcastTx = function(opts, cb) { if (!Utils.checkRequired(opts, ['txProposalId'])) return cb(new ClientError('Required argument missing')); - function setBroadcasted(txp, txid, cb) { - txp.setBroadcasted(txid); - self.storage.storeTx(self.walletId, txp, function(err) { - if (err) return cb(err); - - self._notify('NewOutgoingTx', { - txProposalId: opts.txProposalId, - txid: txid, - amount: txp.getTotalAmount(), - }, function() { - return cb(null, txp); - }); - }); - }; - self.getWallet({}, function(err, wallet) { if (err) return cb(err); @@ -1483,14 +1493,21 @@ WalletService.prototype.broadcastTx = function(opts, cb) { if (err) { var broadcastErr = err; // Check if tx already in blockchain - self._checkTxInBlockchain(txp, function(err, tx) { + self._checkTxInBlockchain(txp, function(err, isInBlockchain) { if (err) return cb(err); - if (!tx) return cb(broadcastErr); + if (!isInBlockchain) return cb(broadcastErr); - setBroadcasted(txp, tx.txid, cb); + self._processBroadcast(txp, { + byThirdParty: true + }, cb); }); } else { - setBroadcasted(txp, txid, cb); + self._processBroadcast(txp, { + byThirdParty: false + }, function(err) { + if (err) return cb(err); + return cb(null, txp); + }); } }); }); @@ -1570,7 +1587,20 @@ WalletService.prototype.getPendingTxs = function(opts, cb) { txp.deleteLockTime = self.getRemainingDeleteLockTime(txp); }); - return cb(null, txps); + async.each(txps, function(txp, a_cb) { + if (txp.status != 'accepted') return a_cb(); + + self._checkTxInBlockchain(txp, function(err, isInBlockchain) { + if (err || !isInBlockchain) return a_cb(err); + self._processBroadcast(txp, { + byThirdParty: true + }, a_cb); + }); + }, function(err) { + return cb(err, _.reject(txps, function(txp) { + return txp.status == 'broadcasted'; + })); + }); }); }; diff --git a/lib/storage.js b/lib/storage.js index a952494..836af12 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -41,6 +41,7 @@ Storage.prototype._createIndexes = function() { this.db.collection(collections.TXS).createIndex({ walletId: 1, isPending: 1, + txid: 1, }); this.db.collection(collections.NOTIFICATIONS).createIndex({ walletId: 1, @@ -171,6 +172,7 @@ Storage.prototype._completeTxData = function(walletId, txs, cb) { }); }; +// TODO: remove walletId from signature Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { var self = this; @@ -184,7 +186,18 @@ Storage.prototype.fetchTx = function(walletId, txProposalId, cb) { }); }; +Storage.prototype.fetchTxByHash = function(hash, cb) { + var self = this; + this.db.collection(collections.TXS).findOne({ + txid: hash, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + + return self._completeTxData(result.walletId, Model.TxProposal.fromObj(result), cb); + }); +}; Storage.prototype.fetchLastTxs = function(walletId, creatorId, limit, cb) { var self = this; diff --git a/test/integration/server.js b/test/integration/server.js index ce4fb63..57c5431 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -181,8 +181,9 @@ helpers.stubUtxos = function(server, wallet, amounts, cb) { }); }; -helpers.stubBroadcast = function(txid) { - blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, null, txid); +helpers.stubBroadcast = function(thirdPartyBroadcast) { + blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, null, '112233'); + blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, null); }; helpers.stubHistory = function(txs) { @@ -485,14 +486,14 @@ describe('Wallet service', function() { message: 'some message' }); - var txpId; + var txp; async.waterfall([ function(next) { server.createTx(txOpts, next); }, - function(txp, next) { - txpId = txp.id; + function(t, next) { + txp = t; async.eachSeries(_.range(2), function(i, next) { var copayer = TestData.copayers[i]; helpers.getAuthServer(copayer.id45, function(server) { @@ -500,14 +501,17 @@ describe('Wallet service', function() { server.signTx({ txProposalId: txp.id, signatures: signatures, - }, next); + }, function(err, t) { + txp = t; + next(); + }); }); }, next); }, function(next) { - helpers.stubBroadcast('999'); + helpers.stubBroadcast(); server.broadcastTx({ - txProposalId: txpId, + txProposalId: txp.id, }, next); }, ], function(err) { @@ -525,7 +529,7 @@ describe('Wallet service', function() { one.text.should.contain(wallet.name); one.text.should.contain('800,000'); should.exist(one.html); - one.html.should.contain('https://insight.bitpay.com/tx/999'); + one.html.should.contain('https://insight.bitpay.com/tx/' + txp.txid); server.storage.fetchUnsentEmails(function(err, unsent) { should.not.exist(err); unsent.should.be.empty; @@ -2817,6 +2821,7 @@ describe('Wallet service', function() { }); it('should sign a TX with multiple inputs, different paths, and return raw', function(done) { + blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, null); server.getPendingTxs({}, function(err, txs) { var tx = txs[0]; tx.id.should.equal(txid); @@ -3053,7 +3058,7 @@ describe('Wallet service', function() { }); describe('#broadcastTx & #broadcastRawTx', function() { - var server, wallet, txpid; + var server, wallet, txpid, txid; beforeEach(function(done) { helpers.createAndJoinWallet(1, 1, function(s, w) { server = s; @@ -3074,6 +3079,7 @@ describe('Wallet service', function() { should.exist(txp); txp.isAccepted().should.be.true; txp.isBroadcasted().should.be.false; + txid = txp.txid; txpid = txp.id; done(); }); @@ -3084,7 +3090,7 @@ describe('Wallet service', function() { it('should broadcast a tx', function(done) { var clock = sinon.useFakeTimers(1234000, 'Date'); - helpers.stubBroadcast('999'); + helpers.stubBroadcast(); server.broadcastTx({ txProposalId: txpid }, function(err) { @@ -3094,7 +3100,7 @@ describe('Wallet service', function() { }, function(err, txp) { should.not.exist(err); should.not.exist(txp.raw); - txp.txid.should.equal('999'); + txp.txid.should.equal(txid); txp.isBroadcasted().should.be.true; txp.broadcastedOn.should.equal(1234); clock.restore(); @@ -3104,19 +3110,19 @@ describe('Wallet service', function() { }); it('should broadcast a raw tx', function(done) { - helpers.stubBroadcast('999'); + helpers.stubBroadcast(); server.broadcastRawTx({ network: 'testnet', rawTx: 'raw tx', }, function(err, txid) { should.not.exist(err); - txid.should.equal('999'); + should.exist(txid); done(); }); }); it('should fail to brodcast a tx already marked as broadcasted', function(done) { - helpers.stubBroadcast('999'); + helpers.stubBroadcast(); server.broadcastTx({ txProposalId: txpid }, function(err) { @@ -3131,8 +3137,52 @@ describe('Wallet service', function() { }); }); + it('should auto process already broadcasted txs', function(done) { + helpers.stubBroadcast(); + server.getPendingTxs({}, function(err, txs) { + should.not.exist(err); + txs.length.should.equal(1); + blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, { + txid: 999 + }); + server.getPendingTxs({}, function(err, txs) { + should.not.exist(err); + txs.length.should.equal(0); + done(); + }); + }); + }); + + it('should process only broadcasted txs', function(done) { + helpers.stubBroadcast(); + var txOpts = helpers.createSimpleProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 9, TestData.copayers[0].privKey_1H_0, { + message: 'some message 2' + }); + server.createTx(txOpts, function(err, txp) { + should.not.exist(err); + server.getPendingTxs({}, function(err, txs) { + should.not.exist(err); + txs.length.should.equal(2); + blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, { + txid: 999 + }); + server.getPendingTxs({}, function(err, txs) { + should.not.exist(err); + txs.length.should.equal(1); + txs[0].status.should.equal('pending'); + should.not.exist(txs[0].txid); + done(); + }); + }); + }); + }); + + + + + it('should fail to brodcast a not yet accepted tx', function(done) { - helpers.stubBroadcast('999'); + helpers.stubBroadcast(); var txOpts = helpers.createSimpleProposalOpts('18PzpUFkFZE8zKWUPvfykkTxmB9oMR8qP7', 9, TestData.copayers[0].privKey_1H_0, { message: 'some message' }); @@ -3161,7 +3211,7 @@ describe('Wallet service', function() { txProposalId: txpid }, function(err, txp) { should.not.exist(err); - should.not.exist(txp.txid); + should.exist(txp.txid); txp.isBroadcasted().should.be.false; should.not.exist(txp.broadcastedOn); txp.isAccepted().should.be.true; @@ -3184,7 +3234,6 @@ describe('Wallet service', function() { }, function(err, txp) { should.not.exist(err); should.exist(txp.txid); - txp.txid.should.equal('999'); txp.isBroadcasted().should.be.true; should.exist(txp.broadcastedOn); done(); @@ -3204,7 +3253,7 @@ describe('Wallet service', function() { txProposalId: txpid }, function(err, txp) { should.not.exist(err); - should.not.exist(txp.txid); + should.exist(txp.txid); txp.isBroadcasted().should.be.false; should.not.exist(txp.broadcastedOn); txp.isAccepted().should.be.true; @@ -3221,7 +3270,7 @@ describe('Wallet service', function() { server = s; wallet = w; helpers.stubUtxos(server, wallet, _.range(1, 9), function() { - helpers.stubBroadcast('999'); + helpers.stubBroadcast(); done(); }); }); @@ -3320,7 +3369,7 @@ describe('Wallet service', function() { txp.isPending().should.be.true; txp.isAccepted().should.be.true; txp.isBroadcasted().should.be.false; - should.not.exist(txp.txid); + should.exist(txp.txid); txp.actions.length.should.equal(2); server.getNotifications({}, function(err, notifications) { should.not.exist(err); @@ -3692,7 +3741,7 @@ describe('Wallet service', function() { signatures: signatures, }, function(err) { should.not.exist(err); - helpers.stubBroadcast('1122334455'); + helpers.stubBroadcast(); server.broadcastTx({ txProposalId: tx.id }, function(err, txp) { @@ -3710,6 +3759,38 @@ describe('Wallet service', function() { }); }); }); + + + it('should notify sign, acceptance, and broadcast, and emit (with 3rd party broadcast', function(done) { + server.getPendingTxs({}, function(err, txs) { + var tx = txs[2]; + var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey); + server.signTx({ + txProposalId: tx.id, + signatures: signatures, + }, function(err) { + should.not.exist(err); + blockchainExplorer.broadcast = sinon.stub().callsArgWith(1, 'err'); + blockchainExplorer.getTransaction = sinon.stub().callsArgWith(1, null, { + txid: 11 + }); + server.broadcastTx({ + txProposalId: tx.id + }, function(err, txp) { + should.not.exist(err); + server.getNotifications({ + limit: 3, + reverse: true, + }, function(err, notifications) { + should.not.exist(err); + var types = _.pluck(notifications, 'type'); + types.should.deep.equal(['NewOutgoingTxByThirdParty', 'TxProposalFinallyAccepted', 'TxProposalAcceptedBy']); + done(); + }); + }); + }); + }); + }); }); describe('#removeWallet', function() { @@ -4192,13 +4273,13 @@ describe('Wallet service', function() { }, function(err, tx) { should.not.exist(err); - helpers.stubBroadcast('1122334455'); + helpers.stubBroadcast(); server.broadcastTx({ txProposalId: tx.id }, function(err, txp) { should.not.exist(err); var txs = [{ - txid: '1122334455', + txid: txp.txid, confirmations: 1, fees: 5460, time: 1, diff --git a/test/storage.js b/test/storage.js index c5d4ed9..921ac2e 100644 --- a/test/storage.js +++ b/test/storage.js @@ -151,6 +151,7 @@ describe('Storage', function() { tx.status = 'rejected'; tx.isPending().should.be.false; } + tx.txid = 'txid' + i; return tx; }); async.each(proposals, function(tx, next) { @@ -171,6 +172,17 @@ describe('Storage', function() { done(); }); }); + it('should fetch tx by hash', function(done) { + storage.fetchTxByHash('txid0', function(err, tx) { + should.not.exist(err); + should.exist(tx); + tx.id.should.equal(proposals[0].id); + tx.walletId.should.equal(proposals[0].walletId); + tx.creatorName.should.equal('copayer 0'); + done(); + }); + }); + it('should fetch all pending txs', function(done) { storage.fetchPendingTxs('123', function(err, txs) { should.not.exist(err);