Browse Source

refactoring

activeAddress
Ivan Socolsky 10 years ago
parent
commit
cce8b64260
  1. 19
      bws-mq.js
  2. 3
      config.js
  3. 23
      lib/messagequeue.js
  4. 14
      lib/server.js
  5. 41
      lib/wsapp.js
  6. 16
      messagequeue/bws-mq.js
  7. 2
      test/integration/server.js

19
bws-mq.js

@ -1,19 +0,0 @@
#!/usr/bin/env node
'use strict';
var _ = require('lodash');
var io = require('socket.io');
var DEFAULT_PORT = 3380;
var port = parseInt(process.argv[2]) || DEFAULT_PORT;
var server = io(port);
server.on('connection', function(socket) {
socket.on('notification', function(data) {
server.emit('notification', data);
});
});
console.log('Message queue server listening on port ' + port)

3
config.js

@ -23,6 +23,9 @@ var config = {
// port: 3231, // port: 3231,
// }, // },
}, },
messageQueueOpts: {
url: 'http://localhost:3380',
},
blockchainExplorerOpts: { blockchainExplorerOpts: {
livenet: { livenet: {
provider: 'insight', provider: 'insight',

23
lib/messagequeue.js

@ -0,0 +1,23 @@
'use strict';
var $ = require('preconditions').singleton();
var io = require('socket.io');
var log = require('npmlog');
log.debug = log.verbose;
var MessageQueue = function() {};
MessageQueue.start = function(opts, cb) {
opts = opts || {};
$.checkIsNumber(opts.port, 'Invalid port number');
var server = io(opts.port);
server.on('connection', function(socket) {
socket.on('notification', function(data) {
server.emit('notification', data);
});
});
return cb();
};
module.exports = MessageQueue;

14
lib/server.js

@ -87,15 +87,7 @@ WalletService.initialize = function(opts, cb) {
messageQueue = opts.messageQueue; messageQueue = opts.messageQueue;
return cb(); return cb();
} }
messageQueue = io.connect('http://localhost:3380', { return cb();
'force new connection': true,
});
messageQueue.on('connect', function() {
return cb();
});
messageQueue.on('connect_error', function(err) {
log.warn('Could not connect to message queue server');
});
}; };
async.series([ async.series([
@ -350,7 +342,9 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) {
}); });
this.storage.storeNotification(walletId, n, function() { this.storage.storeNotification(walletId, n, function() {
self._emit('notification', n); self._emit('notification', n);
self.messageQueue.emit('notification', n); if (self.messageQueue) {
self.messageQueue.emit('notification', n);
}
if (cb) return cb(); if (cb) return cb();
}); });
}; };

41
lib/wsapp.js

@ -22,37 +22,30 @@ WsApp._unauthorized = function(socket) {
socket.disconnect(); socket.disconnect();
}; };
// WsApp._handleNotification = function(notification) { WsApp._handleNotification = function(notification) {
// console.log('*** [wsapp.js ln26] notification:', notification); // TODO io.to(notification.walletId).emit('notification', notification);
// io.to(notification.walletId).emit('notification', notification);
// };
WsApp._initMessageQueue = function(cb) {
function handleNotification(notification) {
io.to(notification.walletId).emit('notification', notification);
};
messageQueue = require('socket.io-client').connect('http://localhost:3380', {
'force new connection': true,
});
messageQueue.on('connect_error', function(err) {
log.warn('Could not connect to message queue server');
});
messageQueue.on('notification', handleNotification);
messageQueue.on('connect', function() {
return cb();
});
}; };
WsApp.start = function(server, config, cb) { WsApp.start = function(server, opts, cb) {
opts = opts || {};
$.checkState(opts.messageQueueOpts);
io = require('socket.io')(server); io = require('socket.io')(server);
async.series([ async.series([
function(done) { function(done) {
WsApp._initMessageQueue(done); messageQueue = require('socket.io-client').connect(opts.messageQueueOpts.url, {
'force new connection': true,
});
messageQueue.on('connect_error', function(err) {
log.warn('Could not connect to message queue server');
});
messageQueue.on('notification', WsApp._handleNotification);
messageQueue.on('connect', function() {
done();
});
}, },
function(done) { function(done) {
io.on('connection', function(socket) { io.on('connection', function(socket) {

16
messagequeue/bws-mq.js

@ -0,0 +1,16 @@
#!/usr/bin/env node
'use strict';
var MQ = require('../lib/messagequeue');
var DEFAULT_PORT = 3380;
var opts = {
port: parseInt(process.argv[2]) || DEFAULT_PORT,
};
MQ.start(opts, function(err) {
if (err) throw err;
console.log('Message queue server listening on port ' + port)
});

2
test/integration/server.js

@ -865,7 +865,7 @@ describe('Wallet service', function() {
}); });
}); });
describe.only('#createTx', function() { describe('#createTx', function() {
var server, wallet; var server, wallet;
beforeEach(function(done) { beforeEach(function(done) {
helpers.createAndJoinWallet(2, 3, function(s, w) { helpers.createAndJoinWallet(2, 3, function(s, w) {

Loading…
Cancel
Save