Browse Source

add message queue service

activeAddress
Ivan Socolsky 10 years ago
parent
commit
bd45f8ef99
  1. 19
      bws-mq.js
  2. 51
      lib/server.js
  3. 70
      lib/wsapp.js
  4. 2
      test/integration/server.js

19
bws-mq.js

@ -0,0 +1,19 @@
#!/usr/bin/env node
'use strict';
var _ = require('lodash');
var io = require('socket.io');
var DEFAULT_PORT = 3380;
var port = parseInt(process.argv[2]) || DEFAULT_PORT;
var server = io(port);
server.on('connection', function(socket) {
socket.on('notification', function(data) {
server.emit('notification', data);
});
});
console.log('Message queue server listening on port ' + port)

51
lib/server.js

@ -5,6 +5,7 @@ var async = require('async');
var log = require('npmlog'); var log = require('npmlog');
log.debug = log.verbose; log.debug = log.verbose;
log.disableColor(); log.disableColor();
var io = require('socket.io-client');
var WalletUtils = require('bitcore-wallet-utils'); var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore; var Bitcore = WalletUtils.Bitcore;
@ -27,6 +28,7 @@ var Notification = require('./model/notification');
var initialized = false; var initialized = false;
var lock, storage, blockchainExplorer, blockchainExplorerOpts; var lock, storage, blockchainExplorer, blockchainExplorerOpts;
var messageQueue;
/** /**
@ -41,6 +43,7 @@ function WalletService() {
this.storage = storage; this.storage = storage;
this.blockchainExplorer = blockchainExplorer; this.blockchainExplorer = blockchainExplorer;
this.blockchainExplorerOpts = blockchainExplorerOpts; this.blockchainExplorerOpts = blockchainExplorerOpts;
this.messageQueue = messageQueue;
this.notifyTicker = 0; this.notifyTicker = 0;
}; };
@ -58,7 +61,6 @@ WalletService.onNotification = function(func) {
WalletService.initialize = function(opts, cb) { WalletService.initialize = function(opts, cb) {
$.shouldBeFunction(cb); $.shouldBeFunction(cb);
opts = opts || {}; opts = opts || {};
lock = opts.lock || new Lock(opts.lockOpts); lock = opts.lock || new Lock(opts.lockOpts);
blockchainExplorer = opts.blockchainExplorer; blockchainExplorer = opts.blockchainExplorer;
@ -67,19 +69,51 @@ WalletService.initialize = function(opts, cb) {
if (initialized) if (initialized)
return cb(); return cb();
if (opts.storage) { function initStorage(cb) {
storage = opts.storage; if (opts.storage) {
initialized = true; storage = opts.storage;
return cb(); return cb();
} else { }
var newStorage = new Storage(); var newStorage = new Storage();
newStorage.connect(opts.storageOpts, function(err) { newStorage.connect(opts.storageOpts, function(err) {
if (err) return cb(err); if (err) return cb(err);
storage = newStorage; storage = newStorage;
initialized = true;
return cb(); return cb();
}); });
} };
function initMessageQueue(cb) {
if (opts.messageQueue) {
messageQueue = opts.messageQueue;
return cb();
}
messageQueue = io.connect('http://localhost:3380', {
'force new connection': true,
});
messageQueue.on('connect', function() {
return cb();
});
messageQueue.on('connect_error', function(err) {
log.warn('Could not connect to message queue server');
});
};
async.series([
function(next) {
initStorage(next);
},
function(next) {
initMessageQueue(next);
},
], function(err) {
if (err) {
log.error('Could not initialize', err);
throw err;
}
initialized = true;
return cb();
});
}; };
@ -316,6 +350,7 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) {
}); });
this.storage.storeNotification(walletId, n, function() { this.storage.storeNotification(walletId, n, function() {
self._emit('notification', n); self._emit('notification', n);
self.messageQueue.emit('notification', n);
if (cb) return cb(); if (cb) return cb();
}); });
}; };

70
lib/wsapp.js

@ -7,16 +7,13 @@ var log = require('npmlog');
log.debug = log.verbose; log.debug = log.verbose;
var Uuid = require('uuid'); var Uuid = require('uuid');
var WalletUtils = require('bitcore-wallet-utils');
var Bitcore = WalletUtils.Bitcore;
var WalletService = require('./server'); var WalletService = require('./server');
var BlockchainMonitor = require('./blockchainmonitor')
var Notification = require('./model/notification'); var Notification = require('./model/notification');
log.level = 'debug'; log.level = 'debug';
var io, bcMonitor; var io, messageQueue;
var WsApp = function() {}; var WsApp = function() {};
@ -25,45 +22,58 @@ WsApp._unauthorized = function(socket) {
socket.disconnect(); socket.disconnect();
}; };
WsApp.handleNotification = function(service, notification) { // WsApp._handleNotification = function(notification) {
if (notification.type == 'NewAddress') { // console.log('*** [wsapp.js ln26] notification:', notification); // TODO
self.subscribeAddresses(notification.walletId, notification.data.address);
}
io.to(notification.walletId).emit('notification', notification);
};
WsApp.start = function(server, config) { // io.to(notification.walletId).emit('notification', notification);
io = require('socket.io')(server); // };
bcMonitor = new BlockchainMonitor(config.blockchainExplorerOpts);
WsApp._initMessageQueue = function(cb) {
function handleNotification(notification) { function handleNotification(notification) {
if (notification.type == 'NewAddress') {
bcMonitor.subscribeAddresses(notification.walletId, notification.data.address);
}
io.to(notification.walletId).emit('notification', notification); io.to(notification.walletId).emit('notification', notification);
}; };
bcMonitor.on('notification', handleNotification); messageQueue = require('socket.io-client').connect('http://localhost:3380', {
WalletService.onNotification(handleNotification); 'force new connection': true,
});
messageQueue.on('connect_error', function(err) {
log.warn('Could not connect to message queue server');
});
messageQueue.on('notification', handleNotification);
io.on('connection', function(socket) { messageQueue.on('connect', function() {
socket.nonce = Uuid.v4(); return cb();
socket.emit('challenge', socket.nonce); });
};
WsApp.start = function(server, config, cb) {
io = require('socket.io')(server);
socket.on('authorize', function(data) { async.series([
if (data.message != socket.nonce) return WsApp._unauthorized(socket);
WalletService.getInstanceWithAuth(data, function(err, service) { function(done) {
if (err) return WsApp._unauthorized(socket); WsApp._initMessageQueue(done);
},
function(done) {
io.on('connection', function(socket) {
socket.nonce = Uuid.v4();
socket.on('authorize', function(data) {
if (data.message != socket.nonce) return WsApp._unauthorized(socket);
socket.join(service.walletId); WalletService.getInstanceWithAuth(data, function(err, service) {
socket.emit('authorized'); if (err) return WsApp._unauthorized(socket);
bcMonitor.subscribeWallet(service, function(err) { socket.join(service.walletId);
if (err) log.warn(err.message); socket.emit('authorized');
});
}); });
socket.emit('challenge', socket.nonce);
done();
}); });
}); },
], function(err) {
if (cb) return cb(err);
}); });
}; };

2
test/integration/server.js

@ -865,7 +865,7 @@ describe('Wallet service', function() {
}); });
}); });
describe('#createTx', function() { describe.only('#createTx', function() {
var server, wallet; var server, wallet;
beforeEach(function(done) { beforeEach(function(done) {
helpers.createAndJoinWallet(2, 3, function(s, w) { helpers.createAndJoinWallet(2, 3, function(s, w) {

Loading…
Cancel
Save