diff --git a/config.js b/config.js index 8b36020..e64b966 100644 --- a/config.js +++ b/config.js @@ -23,8 +23,11 @@ var config = { // port: 3231, // }, }, - messageQueueOpts: { - url: 'http://localhost:3380', + messageBrokerOpts: { + // To use message broker server, uncomment this: + messageBrokerServer: { + url: 'http://localhost:3380', + }, }, blockchainExplorerOpts: { livenet: { diff --git a/lib/messagebroker.js b/lib/messagebroker.js new file mode 100644 index 0000000..028bdcb --- /dev/null +++ b/lib/messagebroker.js @@ -0,0 +1,46 @@ +var $ = require('preconditions').singleton(); +var _ = require('lodash'); +var inherits = require('inherits'); +var events = require('events'); +var nodeutil = require('util'); +var log = require('npmlog'); +log.debug = log.verbose; +log.disableColor(); + +function MessageBroker(opts) { + var self = this; + + opts = opts || {}; + if (opts.messageBrokerServer) { + var url = opts.messageBrokerServer.url; + + this.remote = true; + this.mq = require('socket.io-client').connect(url); + this.mq.on('connect', function() {}); + this.mq.on('connect_error', function() { + log.warn('Message queue server connection error'); + }); + + this.mq.on('msg', function(data) { + self.emit('msg', data); + }); + + log.info('Using message queue server at ' + url); + } +}; + +nodeutil.inherits(MessageBroker, events.EventEmitter); + +MessageBroker.prototype.send = function(data) { + if (this.remote) { + this.mq.emit('msg', data); + } else { + this.emit('msg', data); + } +}; + +MessageBroker.prototype.onMessage = function(handler) { + this.on('msg', handler); +}; + +module.exports = MessageBroker; diff --git a/lib/messagequeue.js b/lib/messagequeue.js deleted file mode 100644 index fa31297..0000000 --- a/lib/messagequeue.js +++ /dev/null @@ -1,23 +0,0 @@ -'use strict'; - -var $ = require('preconditions').singleton(); -var io = require('socket.io'); -var log = require('npmlog'); -log.debug = log.verbose; - -var MessageQueue = function() {}; - -MessageQueue.start = function(opts, cb) { - opts = opts || {}; - $.checkIsNumber(opts.port, 'Invalid port number'); - - var server = io(opts.port); - server.on('connection', function(socket) { - socket.on('notification', function(data) { - server.emit('notification', data); - }); - }); - return cb(); -}; - -module.exports = MessageQueue; diff --git a/lib/notificationbroadcaster.js b/lib/notificationbroadcaster.js deleted file mode 100644 index 692c43e..0000000 --- a/lib/notificationbroadcaster.js +++ /dev/null @@ -1,25 +0,0 @@ -'use strict'; - -var log = require('npmlog'); -log.debug = log.verbose; -var inherits = require('inherits'); -var events = require('events'); -var nodeutil = require('util'); - -function NotificationBroadcaster() {}; - -nodeutil.inherits(NotificationBroadcaster, events.EventEmitter); - -NotificationBroadcaster.prototype.broadcast = function(eventName, notification) { - this.emit(eventName, notification); -}; - -var _instance; -NotificationBroadcaster.singleton = function() { - if (!_instance) { - _instance = new NotificationBroadcaster(); - } - return _instance; -}; - -module.exports = NotificationBroadcaster.singleton(); diff --git a/lib/server.js b/lib/server.js index 1e79ae5..1fcd977 100644 --- a/lib/server.js +++ b/lib/server.js @@ -5,7 +5,6 @@ var async = require('async'); var log = require('npmlog'); log.debug = log.verbose; log.disableColor(); -var io = require('socket.io-client'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; @@ -17,6 +16,7 @@ var ClientError = require('./clienterror'); var Utils = require('./utils'); var Lock = require('./lock'); var Storage = require('./storage'); +var MessageBroker = require('./messagebroker'); var NotificationBroadcaster = require('./notificationbroadcaster'); var BlockchainExplorer = require('./blockchainexplorer'); @@ -28,7 +28,7 @@ var Notification = require('./model/notification'); var initialized = false; var lock, storage, blockchainExplorer, blockchainExplorerOpts; -var messageQueue; +var messageBroker; /** @@ -43,7 +43,7 @@ function WalletService() { this.storage = storage; this.blockchainExplorer = blockchainExplorer; this.blockchainExplorerOpts = blockchainExplorerOpts; - this.messageQueue = messageQueue; + this.messageBroker = messageBroker; this.notifyTicker = 0; }; @@ -82,10 +82,11 @@ WalletService.initialize = function(opts, cb) { }); }; - function initMessageQueue(cb) { - if (opts.messageQueue) { - messageQueue = opts.messageQueue; - return cb(); + function initMessageBroker(cb) { + if (opts.messageBroker) { + messageBroker = opts.messageBroker; + } else { + messageBroker = new MessageBroker(opts.messageBrokerOpts); } return cb(); }; @@ -96,7 +97,7 @@ WalletService.initialize = function(opts, cb) { initStorage(next); }, function(next) { - initMessageQueue(next); + initMessageBroker(next); }, ], function(err) { if (err) { @@ -307,15 +308,6 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) { return WalletUtils.verifyMessage(text, signature, pubKey); }; -/** - * _emit - * - * @param {Object} args - */ -WalletService.prototype._emit = function(eventName, args) { - NotificationBroadcaster.broadcast(eventName, args); -}; - /** * _notify * @@ -341,10 +333,7 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) { walletId: walletId, }); this.storage.storeNotification(walletId, n, function() { - self._emit('notification', n); - if (self.messageQueue) { - self.messageQueue.emit('notification', n); - } + self.messageBroker.send(n); if (cb) return cb(); }); }; diff --git a/messagebroker/bws-messagebroker.js b/messagebroker/bws-messagebroker.js new file mode 100644 index 0000000..68f43a7 --- /dev/null +++ b/messagebroker/bws-messagebroker.js @@ -0,0 +1,23 @@ +#!/usr/bin/env node + +'use strict'; + +var $ = require('preconditions').singleton(); +var io = require('socket.io'); +var log = require('npmlog'); +log.debug = log.verbose; + +var DEFAULT_PORT = 3380; + +var opts = { + port: parseInt(process.argv[2]) || DEFAULT_PORT, +}; + +var server = io(opts.port); +server.on('connection', function(socket) { + socket.on('msg', function(data) { + server.emit('msg', data); + }); +}); + +console.log('Message queue server listening on port ' + opts.port) diff --git a/messagequeue/bws-mq.js b/messagequeue/bws-mq.js deleted file mode 100644 index c466902..0000000 --- a/messagequeue/bws-mq.js +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env node - -'use strict'; - -var MQ = require('../lib/messagequeue'); - -var DEFAULT_PORT = 3380; - -var opts = { - port: parseInt(process.argv[2]) || DEFAULT_PORT, -}; - -MQ.start(opts, function(err) { - if (err) throw err; - console.log('Message queue server listening on port ' + port) -}); diff --git a/test/integration/server.js b/test/integration/server.js index ea2f16d..5d05111 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -2050,7 +2050,6 @@ describe('Wallet service', function() { server.getPendingTxs({}, function(err, txs) { var tx = txs[2]; var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey); - sinon.spy(server, '_emit'); server.signTx({ txProposalId: tx.id, signatures: signatures, @@ -2068,11 +2067,6 @@ describe('Wallet service', function() { should.not.exist(err); var types = _.pluck(notifications, 'type'); types.should.deep.equal(['NewOutgoingTx', 'TxProposalFinallyAccepted', 'TxProposalAcceptedBy']); - // Check also events - server._emit.getCall(0).args[1].type.should.equal('TxProposalAcceptedBy'); - server._emit.getCall(1).args[1].type.should.equal('TxProposalFinallyAccepted');; - server._emit.getCall(2).args[1].type.should.equal('NewOutgoingTx'); - done(); }); }); @@ -2738,7 +2732,7 @@ describe('Wallet service', function() { }); afterEach(function() { WalletService.scanConfig = scanConfigOld; - NotificationBroadcaster.removeAllListeners(); + server.messageBroker.removeAllListeners(); }); it('should start an asynchronous scan', function(done) { @@ -2755,7 +2749,7 @@ describe('Wallet service', function() { 'm/2147483647/1/0', 'm/2147483647/1/1', ]; - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { server.getWallet({}, function(err, wallet) { should.exist(wallet.scanStatus); @@ -2781,7 +2775,7 @@ describe('Wallet service', function() { }); it('should set scan status error when unable to reach blockchain', function(done) { blockchainExplorer.getAddressActivity = sinon.stub().yields('dummy error'); - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { should.exist(n.data.error); server.getWallet({}, function(err, wallet) { @@ -2800,7 +2794,7 @@ describe('Wallet service', function() { WalletService.scanConfig.SCAN_WINDOW = 1; var scans = 0; - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { scans++; if (scans == 2) done();