Browse Source

message broker

activeAddress
Ivan Socolsky 10 years ago
parent
commit
a0e21ed8e1
  1. 7
      config.js
  2. 46
      lib/messagebroker.js
  3. 23
      lib/messagequeue.js
  4. 25
      lib/notificationbroadcaster.js
  5. 31
      lib/server.js
  6. 23
      messagebroker/bws-messagebroker.js
  7. 16
      messagequeue/bws-mq.js
  8. 14
      test/integration/server.js

7
config.js

@ -23,8 +23,11 @@ var config = {
// port: 3231,
// },
},
messageQueueOpts: {
url: 'http://localhost:3380',
messageBrokerOpts: {
// To use message broker server, uncomment this:
messageBrokerServer: {
url: 'http://localhost:3380',
},
},
blockchainExplorerOpts: {
livenet: {

46
lib/messagebroker.js

@ -0,0 +1,46 @@
var $ = require('preconditions').singleton();
var _ = require('lodash');
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
var log = require('npmlog');
log.debug = log.verbose;
log.disableColor();
function MessageBroker(opts) {
var self = this;
opts = opts || {};
if (opts.messageBrokerServer) {
var url = opts.messageBrokerServer.url;
this.remote = true;
this.mq = require('socket.io-client').connect(url);
this.mq.on('connect', function() {});
this.mq.on('connect_error', function() {
log.warn('Message queue server connection error');
});
this.mq.on('msg', function(data) {
self.emit('msg', data);
});
log.info('Using message queue server at ' + url);
}
};
nodeutil.inherits(MessageBroker, events.EventEmitter);
MessageBroker.prototype.send = function(data) {
if (this.remote) {
this.mq.emit('msg', data);
} else {
this.emit('msg', data);
}
};
MessageBroker.prototype.onMessage = function(handler) {
this.on('msg', handler);
};
module.exports = MessageBroker;

23
lib/messagequeue.js

@ -1,23 +0,0 @@
'use strict';
var $ = require('preconditions').singleton();
var io = require('socket.io');
var log = require('npmlog');
log.debug = log.verbose;
var MessageQueue = function() {};
MessageQueue.start = function(opts, cb) {
opts = opts || {};
$.checkIsNumber(opts.port, 'Invalid port number');
var server = io(opts.port);
server.on('connection', function(socket) {
socket.on('notification', function(data) {
server.emit('notification', data);
});
});
return cb();
};
module.exports = MessageQueue;

25
lib/notificationbroadcaster.js

@ -1,25 +0,0 @@
'use strict';
var log = require('npmlog');
log.debug = log.verbose;
var inherits = require('inherits');
var events = require('events');
var nodeutil = require('util');
function NotificationBroadcaster() {};
nodeutil.inherits(NotificationBroadcaster, events.EventEmitter);
NotificationBroadcaster.prototype.broadcast = function(eventName, notification) {
this.emit(eventName, notification);
};
var _instance;
NotificationBroadcaster.singleton = function() {
if (!_instance) {
_instance = new NotificationBroadcaster();
}
return _instance;
};
module.exports = NotificationBroadcaster.singleton();

31
lib/server.js

@ -5,7 +5,6 @@ var async = require('async');
var log = require('npmlog');
log.debug = log.verbose;
log.disableColor();
var io = require('socket.io-client');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
@ -17,6 +16,7 @@ var ClientError = require('./clienterror');
var Utils = require('./utils');
var Lock = require('./lock');
var Storage = require('./storage');
var MessageBroker = require('./messagebroker');
var NotificationBroadcaster = require('./notificationbroadcaster');
var BlockchainExplorer = require('./blockchainexplorer');
@ -28,7 +28,7 @@ var Notification = require('./model/notification');
var initialized = false;
var lock, storage, blockchainExplorer, blockchainExplorerOpts;
var messageQueue;
var messageBroker;
/**
@ -43,7 +43,7 @@ function WalletService() {
this.storage = storage;
this.blockchainExplorer = blockchainExplorer;
this.blockchainExplorerOpts = blockchainExplorerOpts;
this.messageQueue = messageQueue;
this.messageBroker = messageBroker;
this.notifyTicker = 0;
};
@ -82,10 +82,11 @@ WalletService.initialize = function(opts, cb) {
});
};
function initMessageQueue(cb) {
if (opts.messageQueue) {
messageQueue = opts.messageQueue;
return cb();
function initMessageBroker(cb) {
if (opts.messageBroker) {
messageBroker = opts.messageBroker;
} else {
messageBroker = new MessageBroker(opts.messageBrokerOpts);
}
return cb();
};
@ -96,7 +97,7 @@ WalletService.initialize = function(opts, cb) {
initStorage(next);
},
function(next) {
initMessageQueue(next);
initMessageBroker(next);
},
], function(err) {
if (err) {
@ -307,15 +308,6 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) {
return WalletUtils.verifyMessage(text, signature, pubKey);
};
/**
* _emit
*
* @param {Object} args
*/
WalletService.prototype._emit = function(eventName, args) {
NotificationBroadcaster.broadcast(eventName, args);
};
/**
* _notify
*
@ -341,10 +333,7 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) {
walletId: walletId,
});
this.storage.storeNotification(walletId, n, function() {
self._emit('notification', n);
if (self.messageQueue) {
self.messageQueue.emit('notification', n);
}
self.messageBroker.send(n);
if (cb) return cb();
});
};

23
messagebroker/bws-messagebroker.js

@ -0,0 +1,23 @@
#!/usr/bin/env node
'use strict';
var $ = require('preconditions').singleton();
var io = require('socket.io');
var log = require('npmlog');
log.debug = log.verbose;
var DEFAULT_PORT = 3380;
var opts = {
port: parseInt(process.argv[2]) || DEFAULT_PORT,
};
var server = io(opts.port);
server.on('connection', function(socket) {
socket.on('msg', function(data) {
server.emit('msg', data);
});
});
console.log('Message queue server listening on port ' + opts.port)

16
messagequeue/bws-mq.js

@ -1,16 +0,0 @@
#!/usr/bin/env node
'use strict';
var MQ = require('../lib/messagequeue');
var DEFAULT_PORT = 3380;
var opts = {
port: parseInt(process.argv[2]) || DEFAULT_PORT,
};
MQ.start(opts, function(err) {
if (err) throw err;
console.log('Message queue server listening on port ' + port)
});

14
test/integration/server.js

@ -2050,7 +2050,6 @@ describe('Wallet service', function() {
server.getPendingTxs({}, function(err, txs) {
var tx = txs[2];
var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey);
sinon.spy(server, '_emit');
server.signTx({
txProposalId: tx.id,
signatures: signatures,
@ -2068,11 +2067,6 @@ describe('Wallet service', function() {
should.not.exist(err);
var types = _.pluck(notifications, 'type');
types.should.deep.equal(['NewOutgoingTx', 'TxProposalFinallyAccepted', 'TxProposalAcceptedBy']);
// Check also events
server._emit.getCall(0).args[1].type.should.equal('TxProposalAcceptedBy');
server._emit.getCall(1).args[1].type.should.equal('TxProposalFinallyAccepted');;
server._emit.getCall(2).args[1].type.should.equal('NewOutgoingTx');
done();
});
});
@ -2738,7 +2732,7 @@ describe('Wallet service', function() {
});
afterEach(function() {
WalletService.scanConfig = scanConfigOld;
NotificationBroadcaster.removeAllListeners();
server.messageBroker.removeAllListeners();
});
it('should start an asynchronous scan', function(done) {
@ -2755,7 +2749,7 @@ describe('Wallet service', function() {
'm/2147483647/1/0',
'm/2147483647/1/1',
];
WalletService.onNotification(function(n) {
server.messageBroker.onMessage(function(n) {
if (n.type == 'ScanFinished') {
server.getWallet({}, function(err, wallet) {
should.exist(wallet.scanStatus);
@ -2781,7 +2775,7 @@ describe('Wallet service', function() {
});
it('should set scan status error when unable to reach blockchain', function(done) {
blockchainExplorer.getAddressActivity = sinon.stub().yields('dummy error');
WalletService.onNotification(function(n) {
server.messageBroker.onMessage(function(n) {
if (n.type == 'ScanFinished') {
should.exist(n.data.error);
server.getWallet({}, function(err, wallet) {
@ -2800,7 +2794,7 @@ describe('Wallet service', function() {
WalletService.scanConfig.SCAN_WINDOW = 1;
var scans = 0;
WalletService.onNotification(function(n) {
server.messageBroker.onMessage(function(n) {
if (n.type == 'ScanFinished') {
scans++;
if (scans == 2) done();

Loading…
Cancel
Save