diff --git a/bws-mq.js b/bws-mq.js new file mode 100644 index 0000000..033a6e8 --- /dev/null +++ b/bws-mq.js @@ -0,0 +1,19 @@ +#!/usr/bin/env node + +'use strict'; + +var _ = require('lodash'); +var io = require('socket.io'); + +var DEFAULT_PORT = 3380; + +var port = parseInt(process.argv[2]) || DEFAULT_PORT; + +var server = io(port); +server.on('connection', function(socket) { + socket.on('notification', function(data) { + server.emit('notification', data); + }); +}); + +console.log('Message queue server listening on port ' + port) diff --git a/lib/server.js b/lib/server.js index 84f922e..3b30cdf 100644 --- a/lib/server.js +++ b/lib/server.js @@ -5,6 +5,7 @@ var async = require('async'); var log = require('npmlog'); log.debug = log.verbose; log.disableColor(); +var io = require('socket.io-client'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; @@ -27,6 +28,7 @@ var Notification = require('./model/notification'); var initialized = false; var lock, storage, blockchainExplorer, blockchainExplorerOpts; +var messageQueue; /** @@ -41,6 +43,7 @@ function WalletService() { this.storage = storage; this.blockchainExplorer = blockchainExplorer; this.blockchainExplorerOpts = blockchainExplorerOpts; + this.messageQueue = messageQueue; this.notifyTicker = 0; }; @@ -58,7 +61,6 @@ WalletService.onNotification = function(func) { WalletService.initialize = function(opts, cb) { $.shouldBeFunction(cb); - opts = opts || {}; lock = opts.lock || new Lock(opts.lockOpts); blockchainExplorer = opts.blockchainExplorer; @@ -67,19 +69,51 @@ WalletService.initialize = function(opts, cb) { if (initialized) return cb(); - if (opts.storage) { - storage = opts.storage; - initialized = true; - return cb(); - } else { + function initStorage(cb) { + if (opts.storage) { + storage = opts.storage; + return cb(); + } var newStorage = new Storage(); newStorage.connect(opts.storageOpts, function(err) { if (err) return cb(err); storage = newStorage; - initialized = true; return cb(); }); - } + }; + + function initMessageQueue(cb) { + if (opts.messageQueue) { + messageQueue = opts.messageQueue; + return cb(); + } + messageQueue = io.connect('http://localhost:3380', { + 'force new connection': true, + }); + messageQueue.on('connect', function() { + return cb(); + }); + messageQueue.on('connect_error', function(err) { + log.warn('Could not connect to message queue server'); + }); + }; + + async.series([ + + function(next) { + initStorage(next); + }, + function(next) { + initMessageQueue(next); + }, + ], function(err) { + if (err) { + log.error('Could not initialize', err); + throw err; + } + initialized = true; + return cb(); + }); }; @@ -316,6 +350,7 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) { }); this.storage.storeNotification(walletId, n, function() { self._emit('notification', n); + self.messageQueue.emit('notification', n); if (cb) return cb(); }); }; diff --git a/lib/wsapp.js b/lib/wsapp.js index b2fc7e8..8d9597c 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -7,16 +7,13 @@ var log = require('npmlog'); log.debug = log.verbose; var Uuid = require('uuid'); -var WalletUtils = require('bitcore-wallet-utils'); -var Bitcore = WalletUtils.Bitcore; var WalletService = require('./server'); -var BlockchainMonitor = require('./blockchainmonitor') var Notification = require('./model/notification'); log.level = 'debug'; -var io, bcMonitor; +var io, messageQueue; var WsApp = function() {}; @@ -25,45 +22,58 @@ WsApp._unauthorized = function(socket) { socket.disconnect(); }; -WsApp.handleNotification = function(service, notification) { - if (notification.type == 'NewAddress') { - self.subscribeAddresses(notification.walletId, notification.data.address); - } - io.to(notification.walletId).emit('notification', notification); -}; +// WsApp._handleNotification = function(notification) { +// console.log('*** [wsapp.js ln26] notification:', notification); // TODO -WsApp.start = function(server, config) { - io = require('socket.io')(server); - bcMonitor = new BlockchainMonitor(config.blockchainExplorerOpts); +// io.to(notification.walletId).emit('notification', notification); +// }; +WsApp._initMessageQueue = function(cb) { function handleNotification(notification) { - if (notification.type == 'NewAddress') { - bcMonitor.subscribeAddresses(notification.walletId, notification.data.address); - } io.to(notification.walletId).emit('notification', notification); }; - bcMonitor.on('notification', handleNotification); - WalletService.onNotification(handleNotification); + messageQueue = require('socket.io-client').connect('http://localhost:3380', { + 'force new connection': true, + }); + messageQueue.on('connect_error', function(err) { + log.warn('Could not connect to message queue server'); + }); + messageQueue.on('notification', handleNotification); - io.on('connection', function(socket) { - socket.nonce = Uuid.v4(); - socket.emit('challenge', socket.nonce); + messageQueue.on('connect', function() { + return cb(); + }); +}; + +WsApp.start = function(server, config, cb) { + io = require('socket.io')(server); - socket.on('authorize', function(data) { - if (data.message != socket.nonce) return WsApp._unauthorized(socket); + async.series([ - WalletService.getInstanceWithAuth(data, function(err, service) { - if (err) return WsApp._unauthorized(socket); + function(done) { + WsApp._initMessageQueue(done); + }, + function(done) { + io.on('connection', function(socket) { + socket.nonce = Uuid.v4(); + socket.on('authorize', function(data) { + if (data.message != socket.nonce) return WsApp._unauthorized(socket); - socket.join(service.walletId); - socket.emit('authorized'); + WalletService.getInstanceWithAuth(data, function(err, service) { + if (err) return WsApp._unauthorized(socket); - bcMonitor.subscribeWallet(service, function(err) { - if (err) log.warn(err.message); + socket.join(service.walletId); + socket.emit('authorized'); + }); }); + + socket.emit('challenge', socket.nonce); + done(); }); - }); + }, + ], function(err) { + if (cb) return cb(err); }); }; diff --git a/test/integration/server.js b/test/integration/server.js index ea2f16d..3f9a12d 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -865,7 +865,7 @@ describe('Wallet service', function() { }); }); - describe('#createTx', function() { + describe.only('#createTx', function() { var server, wallet; beforeEach(function(done) { helpers.createAndJoinWallet(2, 3, function(s, w) {