diff --git a/bcmonitor/bcmonitor.js b/bcmonitor/bcmonitor.js new file mode 100644 index 0000000..3a6e6e7 --- /dev/null +++ b/bcmonitor/bcmonitor.js @@ -0,0 +1,17 @@ +#!/usr/bin/env node + +'use strict'; + +var _ = require('lodash'); +var log = require('npmlog'); +log.debug = log.verbose; + +var config = require('../config'); +var BlockchainMonitor = require('../lib/blockchainmonitor'); + +var bcm = new BlockchainMonitor(); +bcm.start(config, function(err) { + if (err) throw err; + + console.log('Blockchain monitor started'); +}); diff --git a/bws.js b/bws.js index 9260511..7812e29 100755 --- a/bws.js +++ b/bws.js @@ -37,8 +37,10 @@ var start = function(cb) { if (err) return cb(err); var server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); - WsApp.start(server, config); - return server; + var wsApp = new WsApp(); + wsApp.start(server, config, function(err) { + return server; + }); }); }); return cb(null, server); @@ -47,8 +49,10 @@ var start = function(cb) { if (err) return cb(err); server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); - WsApp.start(server, config); - return cb(null, server); + var wsApp = new WsApp(); + wsApp.start(server, config, function(err) { + return cb(err, server); + }); }); }; }; @@ -56,6 +60,9 @@ var start = function(cb) { if (config.cluster && !config.lockOpts.lockerServer) throw 'When running in cluster mode, locker server need to be configured'; +if (config.cluster && !config.messageBrokerOpts.messageBrokerServer) + throw 'When running in cluster mode, message broker server need to be configured'; + start(function(err, server) { if (err) { console.log('Could not start BWS:', err); diff --git a/config.js b/config.js index b6d9300..e64b966 100644 --- a/config.js +++ b/config.js @@ -23,6 +23,12 @@ var config = { // port: 3231, // }, }, + messageBrokerOpts: { + // To use message broker server, uncomment this: + messageBrokerServer: { + url: 'http://localhost:3380', + }, + }, blockchainExplorerOpts: { livenet: { provider: 'insight', diff --git a/lib/blockchainexplorer.js b/lib/blockchainexplorer.js index 99586c3..8da38f2 100644 --- a/lib/blockchainexplorer.js +++ b/lib/blockchainexplorer.js @@ -62,7 +62,9 @@ function getAddressActivityInsight(url, addresses, cb) { }; function initSocketInsight(url) { - var socket = io.connect(url, {}); + var socket = io.connect(url, { + 'reconnection': true, + }); return socket; }; diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index 0e7f730..3d2a245 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -5,100 +5,119 @@ var _ = require('lodash'); var async = require('async'); var log = require('npmlog'); log.debug = log.verbose; -var Uuid = require('uuid'); -var inherits = require('inherits'); -var events = require('events'); -var nodeutil = require('util'); - -var WalletUtils = require('bitcore-wallet-utils'); -var Bitcore = WalletUtils.Bitcore; -var WalletService = require('./server'); + var BlockchainExplorer = require('./blockchainexplorer'); +var Storage = require('./storage'); +var MessageBroker = require('./messagebroker'); var Notification = require('./model/notification'); -function BlockchainMonitor(opts) { +function BlockchainMonitor() {}; + +BlockchainMonitor.prototype.start = function(opts, cb) { opts = opts || {}; + $.checkArgument(opts.blockchainExplorerOpts); + $.checkArgument(opts.storageOpts); + var self = this; - this.subscriptions = {}; - this.subscriber = {}; - _.each(['livenet', 'testnet'], function(network) { - opts[network] = opts[network] || {}; - self.subscriber[network] = self._getAddressSubscriber( - opts[network].provider, network, opts[network].url); - }); -}; -nodeutil.inherits(BlockchainMonitor, events.EventEmitter); + async.parallel([ + + function(done) { + self.explorers = _.map(['livenet', 'testnet'], function(network) { + var config = opts.blockchainExplorerOpts[network] || {}; + return self._initExplorer(config.provider, network, config.url); + }); + done(); + }, + function(done) { + self.storage = new Storage(); + self.storage.connect(opts.storageOpts, done); + }, + function(done) { + self.messageBroker = new MessageBroker(opts.messageBrokerOpts); + done(); + }, + ], cb); -BlockchainMonitor.prototype._getAddressSubscriber = function(provider, network) { +}; + +BlockchainMonitor.prototype._initExplorer = function(provider, network, url) { $.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported'); + var self = this; + var explorer = new BlockchainExplorer({ provider: provider, network: network, + url: url, }); var socket = explorer.initSocket(); - // TODO: Extract on its own class once more providers are implemented - return { - subscribe: function(address, handler) { - socket.emit('subscribe', address); - socket.on(address, handler); - }, - }; -}; + var connectionInfo = provider + ' (' + network + ') @ ' + url; + socket.on('connect', function() { + log.info('Connected to ' + connectionInfo); + socket.emit('subscribe', 'inv'); + }); + socket.on('connect_error', function() { + log.error('Error connecting to ' + connectionInfo); + }); + socket.on('tx', _.bind(self._handleIncommingTx, self)); -BlockchainMonitor.prototype.subscribeAddresses = function(walletId, addresses) { - $.checkArgument(walletId); + return explorer; +}; +BlockchainMonitor.prototype._handleIncommingTx = function(data) { var self = this; - if (!addresses || addresses.length == 0) return; + if (!data || !data.vout) return; - function handlerFor(address, txid) { - var notification = Notification.create({ - walletId: this, - type: 'NewIncomingTx', - data: { - address: address, - txid: txid, - }, - }); - self.emit('notification', notification); - }; + var outs = _.compact(_.map(data.vout, function(v) { + var addr = _.keys(v)[0]; + var startingChar = addr.charAt(0); + if (startingChar != '2' && startingChar != '3') return; - if (!self.subscriptions[walletId]) { - self.subscriptions[walletId] = { - addresses: [], + return { + address: addr, + amount: +v[addr] }; - }; - - var addresses = [].concat(addresses); - var network = Bitcore.Address.fromString(addresses[0]).network.name; - var subscriber = self.subscriber[network]; - _.each(addresses, function(address) { - self.subscriptions[walletId].addresses.push(address); - subscriber.subscribe(address, _.bind(handlerFor, walletId, address)); + })); + if (_.isEmpty(outs)) return; + + async.each(outs, function(out, next) { + self.storage.fetchAddress(out.address, function(err, address) { + if (err) { + log.error('Could not fetch addresses from the db'); + return next(err); + } + if (!address || address.isChange) return next(); + + var walletId = address.walletId; + log.info('Incoming tx for wallet ' + walletId + ' [' + out.amount + 'sat -> ' + out.address + ']'); + self._createNotification(walletId, data.txid, out.address, out.amount, next); + }); + }, function(err) { + return; }); }; -BlockchainMonitor.prototype.subscribeWallet = function(walletService, cb) { +BlockchainMonitor.prototype._createNotification = function(walletId, txid, address, amount, cb) { var self = this; - var walletId = walletService.walletId; - if (self.subscriptions[walletId]) return; - - walletService.getMainAddresses({}, function(err, addresses) { - if (err) { - delete self.subscriptions[walletId]; - return cb(new Error('Could not subscribe to addresses for wallet ' + walletId)); - } - self.subscribeAddresses(walletService.walletId, _.pluck(addresses, 'address')); + var n = Notification.create({ + type: 'NewIncomingTx', + data: { + txid: txid, + address: address, + amount: amount, + }, + walletId: walletId, + }); + self.storage.storeNotification(walletId, n, function() { + self.messageBroker.send(n) return cb(); }); }; - module.exports = BlockchainMonitor; diff --git a/lib/messagebroker.js b/lib/messagebroker.js new file mode 100644 index 0000000..e41da43 --- /dev/null +++ b/lib/messagebroker.js @@ -0,0 +1,46 @@ +var $ = require('preconditions').singleton(); +var _ = require('lodash'); +var inherits = require('inherits'); +var events = require('events'); +var nodeutil = require('util'); +var log = require('npmlog'); +log.debug = log.verbose; +log.disableColor(); + +function MessageBroker(opts) { + var self = this; + + opts = opts || {}; + if (opts.messageBrokerServer) { + var url = opts.messageBrokerServer.url; + + this.remote = true; + this.mq = require('socket.io-client').connect(url); + this.mq.on('connect', function() {}); + this.mq.on('connect_error', function() { + log.warn('Error connecting to message broker server @ ' + url); + }); + + this.mq.on('msg', function(data) { + self.emit('msg', data); + }); + + log.info('Using message broker server at ' + url); + } +}; + +nodeutil.inherits(MessageBroker, events.EventEmitter); + +MessageBroker.prototype.send = function(data) { + if (this.remote) { + this.mq.emit('msg', data); + } else { + this.emit('msg', data); + } +}; + +MessageBroker.prototype.onMessage = function(handler) { + this.on('msg', handler); +}; + +module.exports = MessageBroker; diff --git a/lib/notificationbroadcaster.js b/lib/notificationbroadcaster.js deleted file mode 100644 index 692c43e..0000000 --- a/lib/notificationbroadcaster.js +++ /dev/null @@ -1,25 +0,0 @@ -'use strict'; - -var log = require('npmlog'); -log.debug = log.verbose; -var inherits = require('inherits'); -var events = require('events'); -var nodeutil = require('util'); - -function NotificationBroadcaster() {}; - -nodeutil.inherits(NotificationBroadcaster, events.EventEmitter); - -NotificationBroadcaster.prototype.broadcast = function(eventName, notification) { - this.emit(eventName, notification); -}; - -var _instance; -NotificationBroadcaster.singleton = function() { - if (!_instance) { - _instance = new NotificationBroadcaster(); - } - return _instance; -}; - -module.exports = NotificationBroadcaster.singleton(); diff --git a/lib/server.js b/lib/server.js index 99aa833..bf71221 100644 --- a/lib/server.js +++ b/lib/server.js @@ -16,7 +16,7 @@ var ClientError = require('./clienterror'); var Utils = require('./utils'); var Lock = require('./lock'); var Storage = require('./storage'); -var NotificationBroadcaster = require('./notificationbroadcaster'); +var MessageBroker = require('./messagebroker'); var BlockchainExplorer = require('./blockchainexplorer'); var Wallet = require('./model/wallet'); @@ -27,6 +27,7 @@ var Notification = require('./model/notification'); var initialized = false; var lock, storage, blockchainExplorer, blockchainExplorerOpts; +var messageBroker; /** @@ -41,13 +42,10 @@ function WalletService() { this.storage = storage; this.blockchainExplorer = blockchainExplorer; this.blockchainExplorerOpts = blockchainExplorerOpts; + this.messageBroker = messageBroker; this.notifyTicker = 0; }; -WalletService.onNotification = function(func) { - NotificationBroadcaster.on('notification', func); -}; - /** * Initializes global settings for all instances. * @param {Object} opts @@ -58,7 +56,6 @@ WalletService.onNotification = function(func) { WalletService.initialize = function(opts, cb) { $.shouldBeFunction(cb); - opts = opts || {}; lock = opts.lock || new Lock(opts.lockOpts); blockchainExplorer = opts.blockchainExplorer; @@ -67,19 +64,44 @@ WalletService.initialize = function(opts, cb) { if (initialized) return cb(); - if (opts.storage) { - storage = opts.storage; - initialized = true; - return cb(); - } else { + function initStorage(cb) { + if (opts.storage) { + storage = opts.storage; + return cb(); + } var newStorage = new Storage(); newStorage.connect(opts.storageOpts, function(err) { if (err) return cb(err); storage = newStorage; - initialized = true; return cb(); }); - } + }; + + function initMessageBroker(cb) { + if (opts.messageBroker) { + messageBroker = opts.messageBroker; + } else { + messageBroker = new MessageBroker(opts.messageBrokerOpts); + } + return cb(); + }; + + async.series([ + + function(next) { + initStorage(next); + }, + function(next) { + initMessageBroker(next); + }, + ], function(err) { + if (err) { + log.error('Could not initialize', err); + throw err; + } + initialized = true; + return cb(); + }); }; @@ -260,11 +282,11 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) { walletId: opts.walletId, copayerId: self.copayerId, copayerName: opts.name, - }); - - return cb(null, { - copayerId: self.copayerId, - wallet: wallet + }, function() { + return cb(null, { + copayerId: self.copayerId, + wallet: wallet + }); }); }); }); @@ -281,25 +303,23 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) { return WalletUtils.verifyMessage(text, signature, pubKey); }; -/** - * _emit - * - * @param {Object} args - */ -WalletService.prototype._emit = function(eventName, args) { - NotificationBroadcaster.broadcast(eventName, args); -}; - /** * _notify * * @param {String} type * @param {Object} data - * @param {Boolean} isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false) + * @param {Object} opts + * @param {Boolean} opts.isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false) */ -WalletService.prototype._notify = function(type, data, isGlobal) { +WalletService.prototype._notify = function(type, data, opts, cb) { var self = this; + if (_.isFunction(opts)) { + cb = opts; + opts = {}; + } + opts = opts || {}; + log.debug('Notification', type, data); var walletId = self.walletId || data.walletId; @@ -311,11 +331,12 @@ WalletService.prototype._notify = function(type, data, isGlobal) { type: type, data: data, ticker: this.notifyTicker++, - creatorId: isGlobal ? null : copayerId, + creatorId: opts.isGlobal ? null : copayerId, walletId: walletId, }); this.storage.storeNotification(walletId, n, function() { - self._emit('notification', n); + self.messageBroker.send(n); + if (cb) return cb(); }); }; @@ -379,10 +400,11 @@ WalletService.prototype.joinWallet = function(opts, cb) { walletId: opts.walletId, copayerId: copayer.id, copayerName: copayer.name, - }); - return cb(null, { - copayerId: copayer.id, - wallet: wallet + }, function() { + return cb(null, { + copayerId: copayer.id, + wallet: wallet + }); }); }); }); @@ -411,8 +433,9 @@ WalletService.prototype.createAddress = function(opts, cb) { self._notify('NewAddress', { address: address.address, + }, function() { + return cb(null, address); }); - return cb(null, address); }); }); }); @@ -710,8 +733,9 @@ WalletService.prototype.createTx = function(opts, cb) { self._notify('NewTxProposal', { amount: opts.amount + }, function() { + return cb(null, txp); }); - return cb(null, txp); }); }); }); @@ -783,8 +807,9 @@ WalletService.prototype.removePendingTx = function(opts, cb) { if (actors.length > 1 || (actors.length == 1 && actors[0] !== self.copayerId)) return cb(new ClientError('TXACTIONED', 'Cannot remove a proposal signed/rejected by other copayers')); - self._notify('TxProposalRemoved'); - self.storage.removeTx(self.walletId, txp.id, cb); + self._notify('TxProposalRemoved', {}, function() { + self.storage.removeTx(self.walletId, txp.id, cb); + }); }); }); }; @@ -839,18 +864,26 @@ WalletService.prototype.signTx = function(opts, cb) { self.storage.storeTx(self.walletId, txp, function(err) { if (err) return cb(err); - self._notify('TxProposalAcceptedBy', { - txProposalId: opts.txProposalId, - copayerId: self.copayerId, + async.parallel([ + + function(done) { + self._notify('TxProposalAcceptedBy', { + txProposalId: opts.txProposalId, + copayerId: self.copayerId, + }, done); + }, + function(done) { + if (txp.isAccepted()) { + self._notify('TxProposalFinallyAccepted', { + txProposalId: opts.txProposalId, + }, done); + } else { + done(); + } + }, + ], function() { + return cb(null, txp); }); - - if (txp.isAccepted()) { - self._notify('TxProposalFinallyAccepted', { - txProposalId: opts.txProposalId, - }); - } - - return cb(null, txp); }); }); }); @@ -892,9 +925,9 @@ WalletService.prototype.broadcastTx = function(opts, cb) { self._notify('NewOutgoingTx', { txProposalId: opts.txProposalId, txid: txid + }, function() { + return cb(null, txp); }); - - return cb(null, txp); }); }); }); @@ -932,19 +965,26 @@ WalletService.prototype.rejectTx = function(opts, cb) { self.storage.storeTx(self.walletId, txp, function(err) { if (err) return cb(err); - self._notify('TxProposalRejectedBy', { - txProposalId: opts.txProposalId, - copayerId: self.copayerId, - }); - - - if (txp.status == 'rejected') { - self._notify('TxProposalFinallyRejected', { - txProposalId: opts.txProposalId, - }); - }; + async.parallel([ - return cb(null, txp); + function(done) { + self._notify('TxProposalRejectedBy', { + txProposalId: opts.txProposalId, + copayerId: self.copayerId, + }, done); + }, + function(done) { + if (txp.status == 'rejected') { + self._notify('TxProposalFinallyRejected', { + txProposalId: opts.txProposalId, + }, done); + } else { + done(); + } + }, + ], function() { + return cb(null, txp); + }); }); }); }; @@ -1286,7 +1326,9 @@ WalletService.prototype.startScan = function(opts, cb) { result: err ? 'error' : 'success', }; if (err) data.error = err; - self._notify('ScanFinished', data, true); + self._notify('ScanFinished', data, { + isGlobal: true + }); }; self.getWallet({}, function(err, wallet) { diff --git a/lib/storage.js b/lib/storage.js index c72a0c9..f964126 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -100,6 +100,9 @@ Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { + this.db.collection(collections.COPAYERS_LOOKUP).createIndex({ + copayerId: 1 + }); this.db.collection(collections.COPAYERS_LOOKUP).findOne({ copayerId: copayerId }, function(err, result) { @@ -312,6 +315,23 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { }); }; +Storage.prototype.fetchAddress = function(address, cb) { + var self = this; + + this.db.collection(collections.ADDRESSES).createIndex({ + address: 1 + }); + this.db.collection(collections.ADDRESSES).findOne({ + address: address, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + + return cb(null, Model.Address.fromObj(result)); + }); +}; + + Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; cb = cb || function() {}; diff --git a/lib/wsapp.js b/lib/wsapp.js index b2fc7e8..da325f8 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -7,63 +7,57 @@ var log = require('npmlog'); log.debug = log.verbose; var Uuid = require('uuid'); -var WalletUtils = require('bitcore-wallet-utils'); -var Bitcore = WalletUtils.Bitcore; var WalletService = require('./server'); -var BlockchainMonitor = require('./blockchainmonitor') - -var Notification = require('./model/notification'); +var MessageBroker = require('./messagebroker'); log.level = 'debug'; -var io, bcMonitor; - var WsApp = function() {}; -WsApp._unauthorized = function(socket) { +WsApp.prototype._unauthorized = function(socket) { socket.emit('unauthorized'); socket.disconnect(); }; -WsApp.handleNotification = function(service, notification) { - if (notification.type == 'NewAddress') { - self.subscribeAddresses(notification.walletId, notification.data.address); - } - io.to(notification.walletId).emit('notification', notification); +WsApp.prototype._handleNotification = function(notification) { + this.io.to(notification.walletId).emit('notification', notification); }; -WsApp.start = function(server, config) { - io = require('socket.io')(server); - bcMonitor = new BlockchainMonitor(config.blockchainExplorerOpts); +WsApp.prototype.start = function(server, opts, cb) { + opts = opts || {}; + $.checkState(opts.messageBrokerOpts); - function handleNotification(notification) { - if (notification.type == 'NewAddress') { - bcMonitor.subscribeAddresses(notification.walletId, notification.data.address); - } - io.to(notification.walletId).emit('notification', notification); - }; + var self = this; - bcMonitor.on('notification', handleNotification); - WalletService.onNotification(handleNotification); + this.io = require('socket.io')(server); - io.on('connection', function(socket) { - socket.nonce = Uuid.v4(); - socket.emit('challenge', socket.nonce); + async.series([ - socket.on('authorize', function(data) { - if (data.message != socket.nonce) return WsApp._unauthorized(socket); + function(done) { + self.messageBroker = new MessageBroker(opts.messageBrokerOpts); + self.messageBroker.onMessage(_.bind(self._handleNotification, self)); + done(); + }, + function(done) { + self.io.on('connection', function(socket) { + socket.nonce = Uuid.v4(); + socket.on('authorize', function(data) { + if (data.message != socket.nonce) return self._unauthorized(socket); - WalletService.getInstanceWithAuth(data, function(err, service) { - if (err) return WsApp._unauthorized(socket); + WalletService.getInstanceWithAuth(data, function(err, service) { + if (err) return self._unauthorized(socket); - socket.join(service.walletId); - socket.emit('authorized'); - - bcMonitor.subscribeWallet(service, function(err) { - if (err) log.warn(err.message); + socket.join(service.walletId); + socket.emit('authorized'); + }); }); + + socket.emit('challenge', socket.nonce); }); - }); + done(); + }, + ], function(err) { + if (cb) return cb(err); }); }; diff --git a/messagebroker/bws-messagebroker.js b/messagebroker/bws-messagebroker.js new file mode 100644 index 0000000..d48dc63 --- /dev/null +++ b/messagebroker/bws-messagebroker.js @@ -0,0 +1,23 @@ +#!/usr/bin/env node + +'use strict'; + +var $ = require('preconditions').singleton(); +var io = require('socket.io'); +var log = require('npmlog'); +log.debug = log.verbose; + +var DEFAULT_PORT = 3380; + +var opts = { + port: parseInt(process.argv[2]) || DEFAULT_PORT, +}; + +var server = io(opts.port); +server.on('connection', function(socket) { + socket.on('msg', function(data) { + server.emit('msg', data); + }); +}); + +console.log('Message broker server listening on port ' + opts.port) diff --git a/test/integration/server.js b/test/integration/server.js index 641204e..8828674 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -19,7 +19,6 @@ var Utils = require('../../lib/utils'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; var Storage = require('../../lib/storage'); -var BlockchainMonitor = require('../../lib/blockchainmonitor'); var Model = require('../../lib/model'); var Wallet = Model.Wallet; @@ -28,7 +27,6 @@ var Address = Model.Address; var Copayer = Model.Copayer; var WalletService = require('../../lib/server'); -var NotificationBroadcaster = require('../../lib/notificationbroadcaster'); var TestData = require('../testdata'); var helpers = {}; @@ -2050,7 +2048,6 @@ describe('Wallet service', function() { server.getPendingTxs({}, function(err, txs) { var tx = txs[2]; var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey); - sinon.spy(server, '_emit'); server.signTx({ txProposalId: tx.id, signatures: signatures, @@ -2068,11 +2065,6 @@ describe('Wallet service', function() { should.not.exist(err); var types = _.pluck(notifications, 'type'); types.should.deep.equal(['NewOutgoingTx', 'TxProposalFinallyAccepted', 'TxProposalAcceptedBy']); - // Check also events - server._emit.getCall(0).args[1].type.should.equal('TxProposalAcceptedBy'); - server._emit.getCall(1).args[1].type.should.equal('TxProposalFinallyAccepted');; - server._emit.getCall(2).args[1].type.should.equal('NewOutgoingTx'); - done(); }); }); @@ -2738,7 +2730,7 @@ describe('Wallet service', function() { }); afterEach(function() { WalletService.scanConfig = scanConfigOld; - NotificationBroadcaster.removeAllListeners(); + server.messageBroker.removeAllListeners(); }); it('should start an asynchronous scan', function(done) { @@ -2755,7 +2747,7 @@ describe('Wallet service', function() { 'm/2147483647/1/0', 'm/2147483647/1/1', ]; - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { server.getWallet({}, function(err, wallet) { should.exist(wallet.scanStatus); @@ -2781,7 +2773,7 @@ describe('Wallet service', function() { }); it('should set scan status error when unable to reach blockchain', function(done) { blockchainExplorer.getAddressActivity = sinon.stub().yields('dummy error'); - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { should.exist(n.data.error); server.getWallet({}, function(err, wallet) { @@ -2800,7 +2792,7 @@ describe('Wallet service', function() { WalletService.scanConfig.SCAN_WINDOW = 1; var scans = 0; - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { scans++; if (scans == 2) done(); @@ -3014,7 +3006,6 @@ describe('Wallet service', function() { should.not.exist(err); var copayerId2 = result.copayerId; - helpers.getAuthServer(copayerId, function(server) { server.getWallet({}, function(err, wallet) { @@ -3040,102 +3031,3 @@ describe('Wallet service', function() { }); }); }); - - -describe('Blockchain monitor', function() { - var addressSubscriber; - - before(function(done) { - openDb(function() { - storage = new Storage({ - db: db - }); - done(); - }); - }); - - beforeEach(function(done) { - addressSubscriber = sinon.stub(); - addressSubscriber.subscribe = sinon.stub(); - sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber); - - resetDb(function() { - blockchainExplorer = sinon.stub(); - WalletService.initialize({ - storage: storage, - blockchainExplorer: blockchainExplorer, - }, done); - }); - }); - afterEach(function() { - BlockchainMonitor.prototype._getAddressSubscriber.restore(); - }); - after(function(done) { - WalletService.shutDown(done); - }); - - it('should subscribe wallet', function(done) { - var monitor = new BlockchainMonitor(); - helpers.createAndJoinWallet(2, 2, function(server, wallet) { - server.createAddress({}, function(err, address1) { - should.not.exist(err); - server.createAddress({}, function(err, address2) { - should.not.exist(err); - monitor.subscribeWallet(server, function(err) { - should.not.exist(err); - addressSubscriber.subscribe.calledTwice.should.be.true; - addressSubscriber.subscribe.calledWith(address1.address).should.be.true; - addressSubscriber.subscribe.calledWith(address2.address).should.be.true; - done(); - }); - }); - }); - }); - }); - - it('should be able to subscribe new address', function(done) { - var monitor = new BlockchainMonitor(); - helpers.createAndJoinWallet(2, 2, function(server, wallet) { - server.createAddress({}, function(err, address1) { - should.not.exist(err); - monitor.subscribeWallet(server, function(err) { - should.not.exist(err); - addressSubscriber.subscribe.calledOnce.should.be.true; - addressSubscriber.subscribe.calledWith(address1.address).should.be.true; - server.createAddress({}, function(err, address2) { - should.not.exist(err); - monitor.subscribeAddresses(wallet.id, address2.address); - addressSubscriber.subscribe.calledTwice.should.be.true; - addressSubscriber.subscribe.calledWith(address2.address).should.be.true; - done(); - }); - }); - }); - }); - }); - - it('should create NewIncomingTx notification when a new tx arrives on registered address', function(done) { - var monitor = new BlockchainMonitor(); - helpers.createAndJoinWallet(2, 2, function(server, wallet) { - server.createAddress({}, function(err, address1) { - should.not.exist(err); - monitor.subscribeWallet(server, function(err) { - should.not.exist(err); - addressSubscriber.subscribe.calledOnce.should.be.true; - addressSubscriber.subscribe.getCall(0).args[0].should.equal(address1.address); - var handler = addressSubscriber.subscribe.getCall(0).args[1]; - _.isFunction(handler).should.be.true; - - monitor.on('notification', function(notification) { - notification.type.should.equal('NewIncomingTx'); - notification.data.address.should.equal(address1.address); - notification.data.txid.should.equal('txid'); - done(); - }); - - handler('txid'); - }); - }); - }); - }); -});