From 1422107c6e91230780ffadf63adaac1288a45c19 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Thu, 30 Apr 2015 20:31:45 -0300 Subject: [PATCH 01/12] return after generating notification --- lib/server.js | 98 +++++++++++++++++++++++--------------- test/integration/server.js | 1 - 2 files changed, 59 insertions(+), 40 deletions(-) diff --git a/lib/server.js b/lib/server.js index 99aa833..84f922e 100644 --- a/lib/server.js +++ b/lib/server.js @@ -260,11 +260,11 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) { walletId: opts.walletId, copayerId: self.copayerId, copayerName: opts.name, - }); - - return cb(null, { - copayerId: self.copayerId, - wallet: wallet + }, false, function() { + return cb(null, { + copayerId: self.copayerId, + wallet: wallet + }); }); }); }); @@ -297,7 +297,7 @@ WalletService.prototype._emit = function(eventName, args) { * @param {Object} data * @param {Boolean} isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false) */ -WalletService.prototype._notify = function(type, data, isGlobal) { +WalletService.prototype._notify = function(type, data, isGlobal, cb) { var self = this; log.debug('Notification', type, data); @@ -316,6 +316,7 @@ WalletService.prototype._notify = function(type, data, isGlobal) { }); this.storage.storeNotification(walletId, n, function() { self._emit('notification', n); + if (cb) return cb(); }); }; @@ -379,10 +380,11 @@ WalletService.prototype.joinWallet = function(opts, cb) { walletId: opts.walletId, copayerId: copayer.id, copayerName: copayer.name, - }); - return cb(null, { - copayerId: copayer.id, - wallet: wallet + }, false, function() { + return cb(null, { + copayerId: copayer.id, + wallet: wallet + }); }); }); }); @@ -411,8 +413,9 @@ WalletService.prototype.createAddress = function(opts, cb) { self._notify('NewAddress', { address: address.address, + }, false, function() { + return cb(null, address); }); - return cb(null, address); }); }); }); @@ -710,8 +713,9 @@ WalletService.prototype.createTx = function(opts, cb) { self._notify('NewTxProposal', { amount: opts.amount + }, false, function() { + return cb(null, txp); }); - return cb(null, txp); }); }); }); @@ -783,8 +787,9 @@ WalletService.prototype.removePendingTx = function(opts, cb) { if (actors.length > 1 || (actors.length == 1 && actors[0] !== self.copayerId)) return cb(new ClientError('TXACTIONED', 'Cannot remove a proposal signed/rejected by other copayers')); - self._notify('TxProposalRemoved'); - self.storage.removeTx(self.walletId, txp.id, cb); + self._notify('TxProposalRemoved', {}, false, function() { + self.storage.removeTx(self.walletId, txp.id, cb); + }); }); }); }; @@ -839,18 +844,26 @@ WalletService.prototype.signTx = function(opts, cb) { self.storage.storeTx(self.walletId, txp, function(err) { if (err) return cb(err); - self._notify('TxProposalAcceptedBy', { - txProposalId: opts.txProposalId, - copayerId: self.copayerId, + async.parallel([ + + function(done) { + self._notify('TxProposalAcceptedBy', { + txProposalId: opts.txProposalId, + copayerId: self.copayerId, + }, false, done); + }, + function(done) { + if (txp.isAccepted()) { + self._notify('TxProposalFinallyAccepted', { + txProposalId: opts.txProposalId, + }, false, done); + } else { + done(); + } + }, + ], function() { + return cb(null, txp); }); - - if (txp.isAccepted()) { - self._notify('TxProposalFinallyAccepted', { - txProposalId: opts.txProposalId, - }); - } - - return cb(null, txp); }); }); }); @@ -892,9 +905,9 @@ WalletService.prototype.broadcastTx = function(opts, cb) { self._notify('NewOutgoingTx', { txProposalId: opts.txProposalId, txid: txid + }, false, function() { + return cb(null, txp); }); - - return cb(null, txp); }); }); }); @@ -932,19 +945,26 @@ WalletService.prototype.rejectTx = function(opts, cb) { self.storage.storeTx(self.walletId, txp, function(err) { if (err) return cb(err); - self._notify('TxProposalRejectedBy', { - txProposalId: opts.txProposalId, - copayerId: self.copayerId, - }); - + async.parallel([ - if (txp.status == 'rejected') { - self._notify('TxProposalFinallyRejected', { - txProposalId: opts.txProposalId, - }); - }; - - return cb(null, txp); + function(done) { + self._notify('TxProposalRejectedBy', { + txProposalId: opts.txProposalId, + copayerId: self.copayerId, + }, false, done); + }, + function(done) { + if (txp.status == 'rejected') { + self._notify('TxProposalFinallyRejected', { + txProposalId: opts.txProposalId, + }, false, done); + } else { + done(); + } + }, + ], function() { + return cb(null, txp); + }); }); }); }; diff --git a/test/integration/server.js b/test/integration/server.js index 641204e..ea2f16d 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -3014,7 +3014,6 @@ describe('Wallet service', function() { should.not.exist(err); var copayerId2 = result.copayerId; - helpers.getAuthServer(copayerId, function(server) { server.getWallet({}, function(err, wallet) { From bd45f8ef990115bad77bdbed3873217bdadf56fe Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Mon, 4 May 2015 18:23:56 -0300 Subject: [PATCH 02/12] add message queue service --- bws-mq.js | 19 +++++++++++ lib/server.js | 51 ++++++++++++++++++++++----- lib/wsapp.js | 70 ++++++++++++++++++++++---------------- test/integration/server.js | 2 +- 4 files changed, 103 insertions(+), 39 deletions(-) create mode 100644 bws-mq.js diff --git a/bws-mq.js b/bws-mq.js new file mode 100644 index 0000000..033a6e8 --- /dev/null +++ b/bws-mq.js @@ -0,0 +1,19 @@ +#!/usr/bin/env node + +'use strict'; + +var _ = require('lodash'); +var io = require('socket.io'); + +var DEFAULT_PORT = 3380; + +var port = parseInt(process.argv[2]) || DEFAULT_PORT; + +var server = io(port); +server.on('connection', function(socket) { + socket.on('notification', function(data) { + server.emit('notification', data); + }); +}); + +console.log('Message queue server listening on port ' + port) diff --git a/lib/server.js b/lib/server.js index 84f922e..3b30cdf 100644 --- a/lib/server.js +++ b/lib/server.js @@ -5,6 +5,7 @@ var async = require('async'); var log = require('npmlog'); log.debug = log.verbose; log.disableColor(); +var io = require('socket.io-client'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; @@ -27,6 +28,7 @@ var Notification = require('./model/notification'); var initialized = false; var lock, storage, blockchainExplorer, blockchainExplorerOpts; +var messageQueue; /** @@ -41,6 +43,7 @@ function WalletService() { this.storage = storage; this.blockchainExplorer = blockchainExplorer; this.blockchainExplorerOpts = blockchainExplorerOpts; + this.messageQueue = messageQueue; this.notifyTicker = 0; }; @@ -58,7 +61,6 @@ WalletService.onNotification = function(func) { WalletService.initialize = function(opts, cb) { $.shouldBeFunction(cb); - opts = opts || {}; lock = opts.lock || new Lock(opts.lockOpts); blockchainExplorer = opts.blockchainExplorer; @@ -67,19 +69,51 @@ WalletService.initialize = function(opts, cb) { if (initialized) return cb(); - if (opts.storage) { - storage = opts.storage; - initialized = true; - return cb(); - } else { + function initStorage(cb) { + if (opts.storage) { + storage = opts.storage; + return cb(); + } var newStorage = new Storage(); newStorage.connect(opts.storageOpts, function(err) { if (err) return cb(err); storage = newStorage; - initialized = true; return cb(); }); - } + }; + + function initMessageQueue(cb) { + if (opts.messageQueue) { + messageQueue = opts.messageQueue; + return cb(); + } + messageQueue = io.connect('http://localhost:3380', { + 'force new connection': true, + }); + messageQueue.on('connect', function() { + return cb(); + }); + messageQueue.on('connect_error', function(err) { + log.warn('Could not connect to message queue server'); + }); + }; + + async.series([ + + function(next) { + initStorage(next); + }, + function(next) { + initMessageQueue(next); + }, + ], function(err) { + if (err) { + log.error('Could not initialize', err); + throw err; + } + initialized = true; + return cb(); + }); }; @@ -316,6 +350,7 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) { }); this.storage.storeNotification(walletId, n, function() { self._emit('notification', n); + self.messageQueue.emit('notification', n); if (cb) return cb(); }); }; diff --git a/lib/wsapp.js b/lib/wsapp.js index b2fc7e8..8d9597c 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -7,16 +7,13 @@ var log = require('npmlog'); log.debug = log.verbose; var Uuid = require('uuid'); -var WalletUtils = require('bitcore-wallet-utils'); -var Bitcore = WalletUtils.Bitcore; var WalletService = require('./server'); -var BlockchainMonitor = require('./blockchainmonitor') var Notification = require('./model/notification'); log.level = 'debug'; -var io, bcMonitor; +var io, messageQueue; var WsApp = function() {}; @@ -25,45 +22,58 @@ WsApp._unauthorized = function(socket) { socket.disconnect(); }; -WsApp.handleNotification = function(service, notification) { - if (notification.type == 'NewAddress') { - self.subscribeAddresses(notification.walletId, notification.data.address); - } - io.to(notification.walletId).emit('notification', notification); -}; +// WsApp._handleNotification = function(notification) { +// console.log('*** [wsapp.js ln26] notification:', notification); // TODO -WsApp.start = function(server, config) { - io = require('socket.io')(server); - bcMonitor = new BlockchainMonitor(config.blockchainExplorerOpts); +// io.to(notification.walletId).emit('notification', notification); +// }; +WsApp._initMessageQueue = function(cb) { function handleNotification(notification) { - if (notification.type == 'NewAddress') { - bcMonitor.subscribeAddresses(notification.walletId, notification.data.address); - } io.to(notification.walletId).emit('notification', notification); }; - bcMonitor.on('notification', handleNotification); - WalletService.onNotification(handleNotification); + messageQueue = require('socket.io-client').connect('http://localhost:3380', { + 'force new connection': true, + }); + messageQueue.on('connect_error', function(err) { + log.warn('Could not connect to message queue server'); + }); + messageQueue.on('notification', handleNotification); - io.on('connection', function(socket) { - socket.nonce = Uuid.v4(); - socket.emit('challenge', socket.nonce); + messageQueue.on('connect', function() { + return cb(); + }); +}; + +WsApp.start = function(server, config, cb) { + io = require('socket.io')(server); - socket.on('authorize', function(data) { - if (data.message != socket.nonce) return WsApp._unauthorized(socket); + async.series([ - WalletService.getInstanceWithAuth(data, function(err, service) { - if (err) return WsApp._unauthorized(socket); + function(done) { + WsApp._initMessageQueue(done); + }, + function(done) { + io.on('connection', function(socket) { + socket.nonce = Uuid.v4(); + socket.on('authorize', function(data) { + if (data.message != socket.nonce) return WsApp._unauthorized(socket); - socket.join(service.walletId); - socket.emit('authorized'); + WalletService.getInstanceWithAuth(data, function(err, service) { + if (err) return WsApp._unauthorized(socket); - bcMonitor.subscribeWallet(service, function(err) { - if (err) log.warn(err.message); + socket.join(service.walletId); + socket.emit('authorized'); + }); }); + + socket.emit('challenge', socket.nonce); + done(); }); - }); + }, + ], function(err) { + if (cb) return cb(err); }); }; diff --git a/test/integration/server.js b/test/integration/server.js index ea2f16d..3f9a12d 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -865,7 +865,7 @@ describe('Wallet service', function() { }); }); - describe('#createTx', function() { + describe.only('#createTx', function() { var server, wallet; beforeEach(function(done) { helpers.createAndJoinWallet(2, 3, function(s, w) { From cce8b642603335e9e52e810af7e6ccfd5c73ce6b Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Tue, 5 May 2015 13:04:29 -0300 Subject: [PATCH 03/12] refactoring --- bws-mq.js | 19 ------------------ config.js | 3 +++ lib/messagequeue.js | 23 +++++++++++++++++++++ lib/server.js | 14 ++++--------- lib/wsapp.js | 41 ++++++++++++++++---------------------- messagequeue/bws-mq.js | 16 +++++++++++++++ test/integration/server.js | 2 +- 7 files changed, 64 insertions(+), 54 deletions(-) delete mode 100644 bws-mq.js create mode 100644 lib/messagequeue.js create mode 100644 messagequeue/bws-mq.js diff --git a/bws-mq.js b/bws-mq.js deleted file mode 100644 index 033a6e8..0000000 --- a/bws-mq.js +++ /dev/null @@ -1,19 +0,0 @@ -#!/usr/bin/env node - -'use strict'; - -var _ = require('lodash'); -var io = require('socket.io'); - -var DEFAULT_PORT = 3380; - -var port = parseInt(process.argv[2]) || DEFAULT_PORT; - -var server = io(port); -server.on('connection', function(socket) { - socket.on('notification', function(data) { - server.emit('notification', data); - }); -}); - -console.log('Message queue server listening on port ' + port) diff --git a/config.js b/config.js index b6d9300..8b36020 100644 --- a/config.js +++ b/config.js @@ -23,6 +23,9 @@ var config = { // port: 3231, // }, }, + messageQueueOpts: { + url: 'http://localhost:3380', + }, blockchainExplorerOpts: { livenet: { provider: 'insight', diff --git a/lib/messagequeue.js b/lib/messagequeue.js new file mode 100644 index 0000000..fa31297 --- /dev/null +++ b/lib/messagequeue.js @@ -0,0 +1,23 @@ +'use strict'; + +var $ = require('preconditions').singleton(); +var io = require('socket.io'); +var log = require('npmlog'); +log.debug = log.verbose; + +var MessageQueue = function() {}; + +MessageQueue.start = function(opts, cb) { + opts = opts || {}; + $.checkIsNumber(opts.port, 'Invalid port number'); + + var server = io(opts.port); + server.on('connection', function(socket) { + socket.on('notification', function(data) { + server.emit('notification', data); + }); + }); + return cb(); +}; + +module.exports = MessageQueue; diff --git a/lib/server.js b/lib/server.js index 3b30cdf..1e79ae5 100644 --- a/lib/server.js +++ b/lib/server.js @@ -87,15 +87,7 @@ WalletService.initialize = function(opts, cb) { messageQueue = opts.messageQueue; return cb(); } - messageQueue = io.connect('http://localhost:3380', { - 'force new connection': true, - }); - messageQueue.on('connect', function() { - return cb(); - }); - messageQueue.on('connect_error', function(err) { - log.warn('Could not connect to message queue server'); - }); + return cb(); }; async.series([ @@ -350,7 +342,9 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) { }); this.storage.storeNotification(walletId, n, function() { self._emit('notification', n); - self.messageQueue.emit('notification', n); + if (self.messageQueue) { + self.messageQueue.emit('notification', n); + } if (cb) return cb(); }); }; diff --git a/lib/wsapp.js b/lib/wsapp.js index 8d9597c..d424932 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -22,37 +22,30 @@ WsApp._unauthorized = function(socket) { socket.disconnect(); }; -// WsApp._handleNotification = function(notification) { -// console.log('*** [wsapp.js ln26] notification:', notification); // TODO - -// io.to(notification.walletId).emit('notification', notification); -// }; - -WsApp._initMessageQueue = function(cb) { - function handleNotification(notification) { - io.to(notification.walletId).emit('notification', notification); - }; - - messageQueue = require('socket.io-client').connect('http://localhost:3380', { - 'force new connection': true, - }); - messageQueue.on('connect_error', function(err) { - log.warn('Could not connect to message queue server'); - }); - messageQueue.on('notification', handleNotification); - - messageQueue.on('connect', function() { - return cb(); - }); +WsApp._handleNotification = function(notification) { + io.to(notification.walletId).emit('notification', notification); }; -WsApp.start = function(server, config, cb) { +WsApp.start = function(server, opts, cb) { + opts = opts || {}; + $.checkState(opts.messageQueueOpts); + io = require('socket.io')(server); async.series([ function(done) { - WsApp._initMessageQueue(done); + messageQueue = require('socket.io-client').connect(opts.messageQueueOpts.url, { + 'force new connection': true, + }); + messageQueue.on('connect_error', function(err) { + log.warn('Could not connect to message queue server'); + }); + messageQueue.on('notification', WsApp._handleNotification); + + messageQueue.on('connect', function() { + done(); + }); }, function(done) { io.on('connection', function(socket) { diff --git a/messagequeue/bws-mq.js b/messagequeue/bws-mq.js new file mode 100644 index 0000000..c466902 --- /dev/null +++ b/messagequeue/bws-mq.js @@ -0,0 +1,16 @@ +#!/usr/bin/env node + +'use strict'; + +var MQ = require('../lib/messagequeue'); + +var DEFAULT_PORT = 3380; + +var opts = { + port: parseInt(process.argv[2]) || DEFAULT_PORT, +}; + +MQ.start(opts, function(err) { + if (err) throw err; + console.log('Message queue server listening on port ' + port) +}); diff --git a/test/integration/server.js b/test/integration/server.js index 3f9a12d..ea2f16d 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -865,7 +865,7 @@ describe('Wallet service', function() { }); }); - describe.only('#createTx', function() { + describe('#createTx', function() { var server, wallet; beforeEach(function(done) { helpers.createAndJoinWallet(2, 3, function(s, w) { From a0e21ed8e1a3cc62d61e4056a586611926e9d63b Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 6 May 2015 10:00:09 -0300 Subject: [PATCH 04/12] message broker --- config.js | 7 +++-- lib/messagebroker.js | 46 ++++++++++++++++++++++++++++++ lib/messagequeue.js | 23 --------------- lib/notificationbroadcaster.js | 25 ---------------- lib/server.js | 31 +++++++------------- messagebroker/bws-messagebroker.js | 23 +++++++++++++++ messagequeue/bws-mq.js | 16 ----------- test/integration/server.js | 14 +++------ 8 files changed, 88 insertions(+), 97 deletions(-) create mode 100644 lib/messagebroker.js delete mode 100644 lib/messagequeue.js delete mode 100644 lib/notificationbroadcaster.js create mode 100644 messagebroker/bws-messagebroker.js delete mode 100644 messagequeue/bws-mq.js diff --git a/config.js b/config.js index 8b36020..e64b966 100644 --- a/config.js +++ b/config.js @@ -23,8 +23,11 @@ var config = { // port: 3231, // }, }, - messageQueueOpts: { - url: 'http://localhost:3380', + messageBrokerOpts: { + // To use message broker server, uncomment this: + messageBrokerServer: { + url: 'http://localhost:3380', + }, }, blockchainExplorerOpts: { livenet: { diff --git a/lib/messagebroker.js b/lib/messagebroker.js new file mode 100644 index 0000000..028bdcb --- /dev/null +++ b/lib/messagebroker.js @@ -0,0 +1,46 @@ +var $ = require('preconditions').singleton(); +var _ = require('lodash'); +var inherits = require('inherits'); +var events = require('events'); +var nodeutil = require('util'); +var log = require('npmlog'); +log.debug = log.verbose; +log.disableColor(); + +function MessageBroker(opts) { + var self = this; + + opts = opts || {}; + if (opts.messageBrokerServer) { + var url = opts.messageBrokerServer.url; + + this.remote = true; + this.mq = require('socket.io-client').connect(url); + this.mq.on('connect', function() {}); + this.mq.on('connect_error', function() { + log.warn('Message queue server connection error'); + }); + + this.mq.on('msg', function(data) { + self.emit('msg', data); + }); + + log.info('Using message queue server at ' + url); + } +}; + +nodeutil.inherits(MessageBroker, events.EventEmitter); + +MessageBroker.prototype.send = function(data) { + if (this.remote) { + this.mq.emit('msg', data); + } else { + this.emit('msg', data); + } +}; + +MessageBroker.prototype.onMessage = function(handler) { + this.on('msg', handler); +}; + +module.exports = MessageBroker; diff --git a/lib/messagequeue.js b/lib/messagequeue.js deleted file mode 100644 index fa31297..0000000 --- a/lib/messagequeue.js +++ /dev/null @@ -1,23 +0,0 @@ -'use strict'; - -var $ = require('preconditions').singleton(); -var io = require('socket.io'); -var log = require('npmlog'); -log.debug = log.verbose; - -var MessageQueue = function() {}; - -MessageQueue.start = function(opts, cb) { - opts = opts || {}; - $.checkIsNumber(opts.port, 'Invalid port number'); - - var server = io(opts.port); - server.on('connection', function(socket) { - socket.on('notification', function(data) { - server.emit('notification', data); - }); - }); - return cb(); -}; - -module.exports = MessageQueue; diff --git a/lib/notificationbroadcaster.js b/lib/notificationbroadcaster.js deleted file mode 100644 index 692c43e..0000000 --- a/lib/notificationbroadcaster.js +++ /dev/null @@ -1,25 +0,0 @@ -'use strict'; - -var log = require('npmlog'); -log.debug = log.verbose; -var inherits = require('inherits'); -var events = require('events'); -var nodeutil = require('util'); - -function NotificationBroadcaster() {}; - -nodeutil.inherits(NotificationBroadcaster, events.EventEmitter); - -NotificationBroadcaster.prototype.broadcast = function(eventName, notification) { - this.emit(eventName, notification); -}; - -var _instance; -NotificationBroadcaster.singleton = function() { - if (!_instance) { - _instance = new NotificationBroadcaster(); - } - return _instance; -}; - -module.exports = NotificationBroadcaster.singleton(); diff --git a/lib/server.js b/lib/server.js index 1e79ae5..1fcd977 100644 --- a/lib/server.js +++ b/lib/server.js @@ -5,7 +5,6 @@ var async = require('async'); var log = require('npmlog'); log.debug = log.verbose; log.disableColor(); -var io = require('socket.io-client'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; @@ -17,6 +16,7 @@ var ClientError = require('./clienterror'); var Utils = require('./utils'); var Lock = require('./lock'); var Storage = require('./storage'); +var MessageBroker = require('./messagebroker'); var NotificationBroadcaster = require('./notificationbroadcaster'); var BlockchainExplorer = require('./blockchainexplorer'); @@ -28,7 +28,7 @@ var Notification = require('./model/notification'); var initialized = false; var lock, storage, blockchainExplorer, blockchainExplorerOpts; -var messageQueue; +var messageBroker; /** @@ -43,7 +43,7 @@ function WalletService() { this.storage = storage; this.blockchainExplorer = blockchainExplorer; this.blockchainExplorerOpts = blockchainExplorerOpts; - this.messageQueue = messageQueue; + this.messageBroker = messageBroker; this.notifyTicker = 0; }; @@ -82,10 +82,11 @@ WalletService.initialize = function(opts, cb) { }); }; - function initMessageQueue(cb) { - if (opts.messageQueue) { - messageQueue = opts.messageQueue; - return cb(); + function initMessageBroker(cb) { + if (opts.messageBroker) { + messageBroker = opts.messageBroker; + } else { + messageBroker = new MessageBroker(opts.messageBrokerOpts); } return cb(); }; @@ -96,7 +97,7 @@ WalletService.initialize = function(opts, cb) { initStorage(next); }, function(next) { - initMessageQueue(next); + initMessageBroker(next); }, ], function(err) { if (err) { @@ -307,15 +308,6 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) { return WalletUtils.verifyMessage(text, signature, pubKey); }; -/** - * _emit - * - * @param {Object} args - */ -WalletService.prototype._emit = function(eventName, args) { - NotificationBroadcaster.broadcast(eventName, args); -}; - /** * _notify * @@ -341,10 +333,7 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) { walletId: walletId, }); this.storage.storeNotification(walletId, n, function() { - self._emit('notification', n); - if (self.messageQueue) { - self.messageQueue.emit('notification', n); - } + self.messageBroker.send(n); if (cb) return cb(); }); }; diff --git a/messagebroker/bws-messagebroker.js b/messagebroker/bws-messagebroker.js new file mode 100644 index 0000000..68f43a7 --- /dev/null +++ b/messagebroker/bws-messagebroker.js @@ -0,0 +1,23 @@ +#!/usr/bin/env node + +'use strict'; + +var $ = require('preconditions').singleton(); +var io = require('socket.io'); +var log = require('npmlog'); +log.debug = log.verbose; + +var DEFAULT_PORT = 3380; + +var opts = { + port: parseInt(process.argv[2]) || DEFAULT_PORT, +}; + +var server = io(opts.port); +server.on('connection', function(socket) { + socket.on('msg', function(data) { + server.emit('msg', data); + }); +}); + +console.log('Message queue server listening on port ' + opts.port) diff --git a/messagequeue/bws-mq.js b/messagequeue/bws-mq.js deleted file mode 100644 index c466902..0000000 --- a/messagequeue/bws-mq.js +++ /dev/null @@ -1,16 +0,0 @@ -#!/usr/bin/env node - -'use strict'; - -var MQ = require('../lib/messagequeue'); - -var DEFAULT_PORT = 3380; - -var opts = { - port: parseInt(process.argv[2]) || DEFAULT_PORT, -}; - -MQ.start(opts, function(err) { - if (err) throw err; - console.log('Message queue server listening on port ' + port) -}); diff --git a/test/integration/server.js b/test/integration/server.js index ea2f16d..5d05111 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -2050,7 +2050,6 @@ describe('Wallet service', function() { server.getPendingTxs({}, function(err, txs) { var tx = txs[2]; var signatures = helpers.clientSign(tx, TestData.copayers[0].xPrivKey); - sinon.spy(server, '_emit'); server.signTx({ txProposalId: tx.id, signatures: signatures, @@ -2068,11 +2067,6 @@ describe('Wallet service', function() { should.not.exist(err); var types = _.pluck(notifications, 'type'); types.should.deep.equal(['NewOutgoingTx', 'TxProposalFinallyAccepted', 'TxProposalAcceptedBy']); - // Check also events - server._emit.getCall(0).args[1].type.should.equal('TxProposalAcceptedBy'); - server._emit.getCall(1).args[1].type.should.equal('TxProposalFinallyAccepted');; - server._emit.getCall(2).args[1].type.should.equal('NewOutgoingTx'); - done(); }); }); @@ -2738,7 +2732,7 @@ describe('Wallet service', function() { }); afterEach(function() { WalletService.scanConfig = scanConfigOld; - NotificationBroadcaster.removeAllListeners(); + server.messageBroker.removeAllListeners(); }); it('should start an asynchronous scan', function(done) { @@ -2755,7 +2749,7 @@ describe('Wallet service', function() { 'm/2147483647/1/0', 'm/2147483647/1/1', ]; - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { server.getWallet({}, function(err, wallet) { should.exist(wallet.scanStatus); @@ -2781,7 +2775,7 @@ describe('Wallet service', function() { }); it('should set scan status error when unable to reach blockchain', function(done) { blockchainExplorer.getAddressActivity = sinon.stub().yields('dummy error'); - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { should.exist(n.data.error); server.getWallet({}, function(err, wallet) { @@ -2800,7 +2794,7 @@ describe('Wallet service', function() { WalletService.scanConfig.SCAN_WINDOW = 1; var scans = 0; - WalletService.onNotification(function(n) { + server.messageBroker.onMessage(function(n) { if (n.type == 'ScanFinished') { scans++; if (scans == 2) done(); From 756b82b370276ed165d803d191effb54fce04458 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 6 May 2015 10:28:47 -0300 Subject: [PATCH 05/12] update websocket app --- lib/server.js | 5 ----- lib/wsapp.js | 21 ++++++--------------- test/integration/server.js | 1 - 3 files changed, 6 insertions(+), 21 deletions(-) diff --git a/lib/server.js b/lib/server.js index 1fcd977..36c6130 100644 --- a/lib/server.js +++ b/lib/server.js @@ -17,7 +17,6 @@ var Utils = require('./utils'); var Lock = require('./lock'); var Storage = require('./storage'); var MessageBroker = require('./messagebroker'); -var NotificationBroadcaster = require('./notificationbroadcaster'); var BlockchainExplorer = require('./blockchainexplorer'); var Wallet = require('./model/wallet'); @@ -47,10 +46,6 @@ function WalletService() { this.notifyTicker = 0; }; -WalletService.onNotification = function(func) { - NotificationBroadcaster.on('notification', func); -}; - /** * Initializes global settings for all instances. * @param {Object} opts diff --git a/lib/wsapp.js b/lib/wsapp.js index d424932..d4fb026 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -8,12 +8,11 @@ log.debug = log.verbose; var Uuid = require('uuid'); var WalletService = require('./server'); - -var Notification = require('./model/notification'); +var MessageBroker = require('./messagebroker'); log.level = 'debug'; -var io, messageQueue; +var io, messageBroker; var WsApp = function() {}; @@ -28,24 +27,16 @@ WsApp._handleNotification = function(notification) { WsApp.start = function(server, opts, cb) { opts = opts || {}; - $.checkState(opts.messageQueueOpts); + $.checkState(opts.messageBrokerOpts); io = require('socket.io')(server); async.series([ function(done) { - messageQueue = require('socket.io-client').connect(opts.messageQueueOpts.url, { - 'force new connection': true, - }); - messageQueue.on('connect_error', function(err) { - log.warn('Could not connect to message queue server'); - }); - messageQueue.on('notification', WsApp._handleNotification); - - messageQueue.on('connect', function() { - done(); - }); + messageBroker = new MessageBroker(opts.messageBrokerOpts); + messageBroker.onMessage(WsApp._handleNotification); + done(); }, function(done) { io.on('connection', function(socket) { diff --git a/test/integration/server.js b/test/integration/server.js index 5d05111..bb8506a 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -28,7 +28,6 @@ var Address = Model.Address; var Copayer = Model.Copayer; var WalletService = require('../../lib/server'); -var NotificationBroadcaster = require('../../lib/notificationbroadcaster'); var TestData = require('../testdata'); var helpers = {}; From ae0114b17b95a307b31dcc8c4ebcce617cf096ca Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 6 May 2015 12:48:43 -0300 Subject: [PATCH 06/12] broadcast blockchain events --- bcmonitor/bcmonitor.js | 15 ++++ lib/blockchainmonitor.js | 133 ++++++++++++++--------------- lib/storage.js | 14 +++ lib/wsapp.js | 3 +- messagebroker/bws-messagebroker.js | 2 +- 5 files changed, 97 insertions(+), 70 deletions(-) create mode 100644 bcmonitor/bcmonitor.js diff --git a/bcmonitor/bcmonitor.js b/bcmonitor/bcmonitor.js new file mode 100644 index 0000000..cfd85f6 --- /dev/null +++ b/bcmonitor/bcmonitor.js @@ -0,0 +1,15 @@ +#!/usr/bin/env node + +'use strict'; + +var log = require('npmlog'); +log.debug = log.verbose; + +var config = require('../config'); +var BlockchainExplorer = require('../lib/blockchainmonitor'); + +BlockchainExplorer.start(config, function(err) { + if (err) throw err; + + console.log('Blockchain monitor started'); +}); diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index 0e7f730..481dcf0 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -5,100 +5,97 @@ var _ = require('lodash'); var async = require('async'); var log = require('npmlog'); log.debug = log.verbose; -var Uuid = require('uuid'); -var inherits = require('inherits'); -var events = require('events'); -var nodeutil = require('util'); - -var WalletUtils = require('bitcore-wallet-utils'); -var Bitcore = WalletUtils.Bitcore; -var WalletService = require('./server'); + var BlockchainExplorer = require('./blockchainexplorer'); +var Storage = require('./storage'); +var MessageBroker = require('./messagebroker'); var Notification = require('./model/notification'); -function BlockchainMonitor(opts) { +var storage, messageBroker; + +function BlockchainMonitor() {}; + +BlockchainMonitor.start = function(opts, cb) { opts = opts || {}; - var self = this; - this.subscriptions = {}; - this.subscriber = {}; - _.each(['livenet', 'testnet'], function(network) { - opts[network] = opts[network] || {}; - self.subscriber[network] = self._getAddressSubscriber( - opts[network].provider, network, opts[network].url); - }); -}; + $.checkArgument(opts.blockchainExplorerOpts); + $.checkArgument(opts.storageOpts); + + async.parallel([ -nodeutil.inherits(BlockchainMonitor, events.EventEmitter); + function(done) { + _.each(['livenet', 'testnet'], function(network) { + var config = opts.blockchainExplorerOpts[network] || {}; + BlockchainMonitor._initExplorer(config.provider, network, config.url); + }); + done(); + }, + function(done) { + storage = new Storage(); + storage.connect(opts.storageOpts, done); + }, + function(done) { + messageBroker = new MessageBroker(opts.messageBrokerOpts); + done(); + }, + ], cb); + +}; -BlockchainMonitor.prototype._getAddressSubscriber = function(provider, network) { +BlockchainMonitor._initExplorer = function(provider, network, url) { $.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported'); var explorer = new BlockchainExplorer({ provider: provider, network: network, + url: url, }); var socket = explorer.initSocket(); - - // TODO: Extract on its own class once more providers are implemented - return { - subscribe: function(address, handler) { - socket.emit('subscribe', address); - socket.on(address, handler); - }, - }; + socket.emit('subscribe', 'inv'); + socket.on('tx', BlockchainMonitor._handleIncommingTx); }; -BlockchainMonitor.prototype.subscribeAddresses = function(walletId, addresses) { - $.checkArgument(walletId); +BlockchainMonitor._handleIncommingTx = function(data) { + if (!data || !data.vout) return; + + var outs = _.compact(_.map(data.vout, function(v) { + var addr = _.keys(v)[0]; + if (addr.indexOf('3') != 0 && addr.indexOf('2') != 0) return; - var self = this; + return { + address: addr, + amount: +v[addr] + }; + })); + if (_.isEmpty(outs)) return; - if (!addresses || addresses.length == 0) return; + async.each(outs, function(out, next) { + storage.fetchWalletIdByAddress(out.address, function(err, walletId) { + if (err || !walletId) return next(err); - function handlerFor(address, txid) { - var notification = Notification.create({ - walletId: this, - type: 'NewIncomingTx', - data: { - address: address, - txid: txid, - }, + log.info('Incoming tx for wallet ' + walletId + ' (' + out.address + ' -> ' + out.amount + ')'); + BlockchainMonitor._createNotification(walletId, data.txid, out.address, out.amount, next); }); - self.emit('notification', notification); - }; - - if (!self.subscriptions[walletId]) { - self.subscriptions[walletId] = { - addresses: [], - }; - }; - - var addresses = [].concat(addresses); - var network = Bitcore.Address.fromString(addresses[0]).network.name; - var subscriber = self.subscriber[network]; - _.each(addresses, function(address) { - self.subscriptions[walletId].addresses.push(address); - subscriber.subscribe(address, _.bind(handlerFor, walletId, address)); + }, function(err) { + return; }); }; -BlockchainMonitor.prototype.subscribeWallet = function(walletService, cb) { - var self = this; - - var walletId = walletService.walletId; - if (self.subscriptions[walletId]) return; - - walletService.getMainAddresses({}, function(err, addresses) { - if (err) { - delete self.subscriptions[walletId]; - return cb(new Error('Could not subscribe to addresses for wallet ' + walletId)); - } - self.subscribeAddresses(walletService.walletId, _.pluck(addresses, 'address')); +BlockchainMonitor._createNotification = function(walletId, txid, address, amount, cb) { + var n = Notification.create({ + type: 'NewIncomingTx', + data: { + txid: txid, + address: address, + amount: amount, + }, + walletId: walletId, + }); + storage.storeNotification(walletId, n, function() { + messageBroker.send(n) return cb(); }); }; - module.exports = BlockchainMonitor; diff --git a/lib/storage.js b/lib/storage.js index c72a0c9..e7b1160 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -312,6 +312,20 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { }); }; +Storage.prototype.fetchWalletIdByAddress = function(address, cb) { + var self = this; + + this.db.collection(collections.ADDRESSES).findOne({ + address: address, + }, function(err, result) { + if (err) return cb(err); + if (!result) return cb(); + + return cb(null, result.walletId); + }); +}; + + Storage.prototype._dump = function(cb, fn) { fn = fn || console.log; cb = cb || function() {}; diff --git a/lib/wsapp.js b/lib/wsapp.js index d4fb026..3f44471 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -9,10 +9,11 @@ var Uuid = require('uuid'); var WalletService = require('./server'); var MessageBroker = require('./messagebroker'); +var BlockchainMonitor = require('./blockchainmonitor'); log.level = 'debug'; -var io, messageBroker; +var io, messageBroker, blockchainMonitor; var WsApp = function() {}; diff --git a/messagebroker/bws-messagebroker.js b/messagebroker/bws-messagebroker.js index 68f43a7..d48dc63 100644 --- a/messagebroker/bws-messagebroker.js +++ b/messagebroker/bws-messagebroker.js @@ -20,4 +20,4 @@ server.on('connection', function(socket) { }); }); -console.log('Message queue server listening on port ' + opts.port) +console.log('Message broker server listening on port ' + opts.port) From 117dddc138d0f86c9917ddae94fe88d21daf6b0f Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 6 May 2015 12:52:18 -0300 Subject: [PATCH 07/12] remove tests for blockchain monitor --- test/integration/server.js | 100 ------------------------------------- 1 file changed, 100 deletions(-) diff --git a/test/integration/server.js b/test/integration/server.js index bb8506a..8828674 100644 --- a/test/integration/server.js +++ b/test/integration/server.js @@ -19,7 +19,6 @@ var Utils = require('../../lib/utils'); var WalletUtils = require('bitcore-wallet-utils'); var Bitcore = WalletUtils.Bitcore; var Storage = require('../../lib/storage'); -var BlockchainMonitor = require('../../lib/blockchainmonitor'); var Model = require('../../lib/model'); var Wallet = Model.Wallet; @@ -3032,102 +3031,3 @@ describe('Wallet service', function() { }); }); }); - - -describe('Blockchain monitor', function() { - var addressSubscriber; - - before(function(done) { - openDb(function() { - storage = new Storage({ - db: db - }); - done(); - }); - }); - - beforeEach(function(done) { - addressSubscriber = sinon.stub(); - addressSubscriber.subscribe = sinon.stub(); - sinon.stub(BlockchainMonitor.prototype, '_getAddressSubscriber').onFirstCall().returns(addressSubscriber); - - resetDb(function() { - blockchainExplorer = sinon.stub(); - WalletService.initialize({ - storage: storage, - blockchainExplorer: blockchainExplorer, - }, done); - }); - }); - afterEach(function() { - BlockchainMonitor.prototype._getAddressSubscriber.restore(); - }); - after(function(done) { - WalletService.shutDown(done); - }); - - it('should subscribe wallet', function(done) { - var monitor = new BlockchainMonitor(); - helpers.createAndJoinWallet(2, 2, function(server, wallet) { - server.createAddress({}, function(err, address1) { - should.not.exist(err); - server.createAddress({}, function(err, address2) { - should.not.exist(err); - monitor.subscribeWallet(server, function(err) { - should.not.exist(err); - addressSubscriber.subscribe.calledTwice.should.be.true; - addressSubscriber.subscribe.calledWith(address1.address).should.be.true; - addressSubscriber.subscribe.calledWith(address2.address).should.be.true; - done(); - }); - }); - }); - }); - }); - - it('should be able to subscribe new address', function(done) { - var monitor = new BlockchainMonitor(); - helpers.createAndJoinWallet(2, 2, function(server, wallet) { - server.createAddress({}, function(err, address1) { - should.not.exist(err); - monitor.subscribeWallet(server, function(err) { - should.not.exist(err); - addressSubscriber.subscribe.calledOnce.should.be.true; - addressSubscriber.subscribe.calledWith(address1.address).should.be.true; - server.createAddress({}, function(err, address2) { - should.not.exist(err); - monitor.subscribeAddresses(wallet.id, address2.address); - addressSubscriber.subscribe.calledTwice.should.be.true; - addressSubscriber.subscribe.calledWith(address2.address).should.be.true; - done(); - }); - }); - }); - }); - }); - - it('should create NewIncomingTx notification when a new tx arrives on registered address', function(done) { - var monitor = new BlockchainMonitor(); - helpers.createAndJoinWallet(2, 2, function(server, wallet) { - server.createAddress({}, function(err, address1) { - should.not.exist(err); - monitor.subscribeWallet(server, function(err) { - should.not.exist(err); - addressSubscriber.subscribe.calledOnce.should.be.true; - addressSubscriber.subscribe.getCall(0).args[0].should.equal(address1.address); - var handler = addressSubscriber.subscribe.getCall(0).args[1]; - _.isFunction(handler).should.be.true; - - monitor.on('notification', function(notification) { - notification.type.should.equal('NewIncomingTx'); - notification.data.address.should.equal(address1.address); - notification.data.txid.should.equal('txid'); - done(); - }); - - handler('txid'); - }); - }); - }); - }); -}); From cf8106eb3f3ceae82e6289b61a0bfce83aa8f3ed Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 6 May 2015 14:32:01 -0300 Subject: [PATCH 08/12] ignore change addresses --- lib/blockchainmonitor.js | 6 ++++-- lib/storage.js | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index 481dcf0..9496578 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -71,9 +71,11 @@ BlockchainMonitor._handleIncommingTx = function(data) { if (_.isEmpty(outs)) return; async.each(outs, function(out, next) { - storage.fetchWalletIdByAddress(out.address, function(err, walletId) { - if (err || !walletId) return next(err); + storage.fetchAddress(out.address, function(err, address) { + if (err || !address) return next(err); + if (address.isChange) return next(); + var walletId = address.walletId; log.info('Incoming tx for wallet ' + walletId + ' (' + out.address + ' -> ' + out.amount + ')'); BlockchainMonitor._createNotification(walletId, data.txid, out.address, out.amount, next); }); diff --git a/lib/storage.js b/lib/storage.js index e7b1160..11740d5 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -312,7 +312,7 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { }); }; -Storage.prototype.fetchWalletIdByAddress = function(address, cb) { +Storage.prototype.fetchAddress = function(address, cb) { var self = this; this.db.collection(collections.ADDRESSES).findOne({ @@ -321,7 +321,7 @@ Storage.prototype.fetchWalletIdByAddress = function(address, cb) { if (err) return cb(err); if (!result) return cb(); - return cb(null, result.walletId); + return cb(null, Model.Address.fromObj(result)); }); }; From 86525ce6ef5d21fd2e051050a7e11d526408457e Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 6 May 2015 16:03:58 -0300 Subject: [PATCH 09/12] refactor bcmonitor --- bcmonitor/bcmonitor.js | 9 +++++++-- lib/blockchainmonitor.js | 42 ++++++++++++++++++++++++---------------- 2 files changed, 32 insertions(+), 19 deletions(-) diff --git a/bcmonitor/bcmonitor.js b/bcmonitor/bcmonitor.js index cfd85f6..3a62e69 100644 --- a/bcmonitor/bcmonitor.js +++ b/bcmonitor/bcmonitor.js @@ -2,14 +2,19 @@ 'use strict'; +var _ = require('lodash'); var log = require('npmlog'); log.debug = log.verbose; var config = require('../config'); -var BlockchainExplorer = require('../lib/blockchainmonitor'); +var BlockchainMonitor = require('../lib/blockchainmonitor'); -BlockchainExplorer.start(config, function(err) { +var bcm = new BlockchainMonitor(); +bcm.start(config, function(err) { if (err) throw err; console.log('Blockchain monitor started'); + _.each(bcm.explorers, function(explorer) { + console.log('\t' + explorer.network.name + ': ' + explorer.url); + }); }); diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index 9496578..e1b782e 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -12,39 +12,41 @@ var MessageBroker = require('./messagebroker'); var Notification = require('./model/notification'); -var storage, messageBroker; - function BlockchainMonitor() {}; -BlockchainMonitor.start = function(opts, cb) { +BlockchainMonitor.prototype.start = function(opts, cb) { opts = opts || {}; $.checkArgument(opts.blockchainExplorerOpts); $.checkArgument(opts.storageOpts); + var self = this; + async.parallel([ function(done) { - _.each(['livenet', 'testnet'], function(network) { + self.explorers = _.map(['livenet', 'testnet'], function(network) { var config = opts.blockchainExplorerOpts[network] || {}; - BlockchainMonitor._initExplorer(config.provider, network, config.url); + return self._initExplorer(config.provider, network, config.url); }); done(); }, function(done) { - storage = new Storage(); - storage.connect(opts.storageOpts, done); + self.storage = new Storage(); + self.storage.connect(opts.storageOpts, done); }, function(done) { - messageBroker = new MessageBroker(opts.messageBrokerOpts); + self.messageBroker = new MessageBroker(opts.messageBrokerOpts); done(); }, ], cb); }; -BlockchainMonitor._initExplorer = function(provider, network, url) { +BlockchainMonitor.prototype._initExplorer = function(provider, network, url) { $.checkArgument(provider == 'insight', 'Blockchain monitor ' + provider + ' not supported'); + var self = this; + var explorer = new BlockchainExplorer({ provider: provider, network: network, @@ -53,10 +55,14 @@ BlockchainMonitor._initExplorer = function(provider, network, url) { var socket = explorer.initSocket(); socket.emit('subscribe', 'inv'); - socket.on('tx', BlockchainMonitor._handleIncommingTx); + socket.on('tx', _.bind(self._handleIncommingTx, self)); + + return explorer; }; -BlockchainMonitor._handleIncommingTx = function(data) { +BlockchainMonitor.prototype._handleIncommingTx = function(data) { + var self = this; + if (!data || !data.vout) return; var outs = _.compact(_.map(data.vout, function(v) { @@ -71,20 +77,22 @@ BlockchainMonitor._handleIncommingTx = function(data) { if (_.isEmpty(outs)) return; async.each(outs, function(out, next) { - storage.fetchAddress(out.address, function(err, address) { + self.storage.fetchAddress(out.address, function(err, address) { if (err || !address) return next(err); if (address.isChange) return next(); var walletId = address.walletId; - log.info('Incoming tx for wallet ' + walletId + ' (' + out.address + ' -> ' + out.amount + ')'); - BlockchainMonitor._createNotification(walletId, data.txid, out.address, out.amount, next); + log.info('Incoming tx for wallet ' + walletId + ' [' + out.amount + 'sat -> ' + out.address + ']'); + self._createNotification(walletId, data.txid, out.address, out.amount, next); }); }, function(err) { return; }); }; -BlockchainMonitor._createNotification = function(walletId, txid, address, amount, cb) { +BlockchainMonitor.prototype._createNotification = function(walletId, txid, address, amount, cb) { + var self = this; + var n = Notification.create({ type: 'NewIncomingTx', data: { @@ -94,8 +102,8 @@ BlockchainMonitor._createNotification = function(walletId, txid, address, amount }, walletId: walletId, }); - storage.storeNotification(walletId, n, function() { - messageBroker.send(n) + self.storage.storeNotification(walletId, n, function() { + self.messageBroker.send(n) return cb(); }); }; From bfaf7ff5c4af931bd5997ae5a3778a112380849e Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Wed, 6 May 2015 16:26:43 -0300 Subject: [PATCH 10/12] refactor websockets app --- bws.js | 4 ++-- lib/wsapp.js | 25 ++++++++++++------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/bws.js b/bws.js index 9260511..3080144 100755 --- a/bws.js +++ b/bws.js @@ -37,7 +37,7 @@ var start = function(cb) { if (err) return cb(err); var server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); - WsApp.start(server, config); + new WsApp().start(server, config); return server; }); }); @@ -47,7 +47,7 @@ var start = function(cb) { if (err) return cb(err); server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); - WsApp.start(server, config); + new WsApp().start(server, config); return cb(null, server); }); }; diff --git a/lib/wsapp.js b/lib/wsapp.js index 3f44471..9bc8ca8 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -9,44 +9,43 @@ var Uuid = require('uuid'); var WalletService = require('./server'); var MessageBroker = require('./messagebroker'); -var BlockchainMonitor = require('./blockchainmonitor'); log.level = 'debug'; -var io, messageBroker, blockchainMonitor; - var WsApp = function() {}; -WsApp._unauthorized = function(socket) { +WsApp.prototype._unauthorized = function(socket) { socket.emit('unauthorized'); socket.disconnect(); }; -WsApp._handleNotification = function(notification) { - io.to(notification.walletId).emit('notification', notification); +WsApp.prototype._handleNotification = function(notification) { + this.io.to(notification.walletId).emit('notification', notification); }; -WsApp.start = function(server, opts, cb) { +WsApp.prototype.start = function(server, opts, cb) { opts = opts || {}; $.checkState(opts.messageBrokerOpts); - io = require('socket.io')(server); + var self = this; + + this.io = require('socket.io')(server); async.series([ function(done) { - messageBroker = new MessageBroker(opts.messageBrokerOpts); - messageBroker.onMessage(WsApp._handleNotification); + self.messageBroker = new MessageBroker(opts.messageBrokerOpts); + self.messageBroker.onMessage(_.bind(self._handleNotification, self)); done(); }, function(done) { - io.on('connection', function(socket) { + self.io.on('connection', function(socket) { socket.nonce = Uuid.v4(); socket.on('authorize', function(data) { - if (data.message != socket.nonce) return WsApp._unauthorized(socket); + if (data.message != socket.nonce) return self._unauthorized(socket); WalletService.getInstanceWithAuth(data, function(err, service) { - if (err) return WsApp._unauthorized(socket); + if (err) return self._unauthorized(socket); socket.join(service.walletId); socket.emit('authorized'); From 75a834c4cf76efbe9b901fd85b91c3dd81da7891 Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Thu, 7 May 2015 14:16:24 -0300 Subject: [PATCH 11/12] various fixes --- bcmonitor/bcmonitor.js | 3 --- bws.js | 15 +++++++++++---- lib/blockchainmonitor.js | 18 +++++++++++++++--- lib/messagebroker.js | 4 ++-- lib/server.js | 37 +++++++++++++++++++++++-------------- lib/storage.js | 6 ++++++ lib/wsapp.js | 2 +- 7 files changed, 58 insertions(+), 27 deletions(-) diff --git a/bcmonitor/bcmonitor.js b/bcmonitor/bcmonitor.js index 3a62e69..3a6e6e7 100644 --- a/bcmonitor/bcmonitor.js +++ b/bcmonitor/bcmonitor.js @@ -14,7 +14,4 @@ bcm.start(config, function(err) { if (err) throw err; console.log('Blockchain monitor started'); - _.each(bcm.explorers, function(explorer) { - console.log('\t' + explorer.network.name + ': ' + explorer.url); - }); }); diff --git a/bws.js b/bws.js index 3080144..7812e29 100755 --- a/bws.js +++ b/bws.js @@ -37,8 +37,10 @@ var start = function(cb) { if (err) return cb(err); var server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); - new WsApp().start(server, config); - return server; + var wsApp = new WsApp(); + wsApp.start(server, config, function(err) { + return server; + }); }); }); return cb(null, server); @@ -47,8 +49,10 @@ var start = function(cb) { if (err) return cb(err); server = config.https ? serverModule.createServer(serverOpts, app) : serverModule.Server(app); - new WsApp().start(server, config); - return cb(null, server); + var wsApp = new WsApp(); + wsApp.start(server, config, function(err) { + return cb(err, server); + }); }); }; }; @@ -56,6 +60,9 @@ var start = function(cb) { if (config.cluster && !config.lockOpts.lockerServer) throw 'When running in cluster mode, locker server need to be configured'; +if (config.cluster && !config.messageBrokerOpts.messageBrokerServer) + throw 'When running in cluster mode, message broker server need to be configured'; + start(function(err, server) { if (err) { console.log('Could not start BWS:', err); diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index e1b782e..e2ff233 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -54,6 +54,14 @@ BlockchainMonitor.prototype._initExplorer = function(provider, network, url) { }); var socket = explorer.initSocket(); + + var connectionInfo = provider + ' (' + network + ') @ ' + url; + socket.on('connect', function() { + log.info('Connected to ' + connectionInfo); + }); + socket.on('connect_error', function() { + log.error('Error connecting to ' + connectionInfo); + }); socket.emit('subscribe', 'inv'); socket.on('tx', _.bind(self._handleIncommingTx, self)); @@ -67,7 +75,8 @@ BlockchainMonitor.prototype._handleIncommingTx = function(data) { var outs = _.compact(_.map(data.vout, function(v) { var addr = _.keys(v)[0]; - if (addr.indexOf('3') != 0 && addr.indexOf('2') != 0) return; + var startingChar = addr.charAt(0); + if (startingChar != '2' && startingChar != '3') return; return { address: addr, @@ -78,8 +87,11 @@ BlockchainMonitor.prototype._handleIncommingTx = function(data) { async.each(outs, function(out, next) { self.storage.fetchAddress(out.address, function(err, address) { - if (err || !address) return next(err); - if (address.isChange) return next(); + if (err) { + log.error('Could not fetch addresses from the db'); + return next(err); + } + if (!address || address.isChange) return next(); var walletId = address.walletId; log.info('Incoming tx for wallet ' + walletId + ' [' + out.amount + 'sat -> ' + out.address + ']'); diff --git a/lib/messagebroker.js b/lib/messagebroker.js index 028bdcb..e41da43 100644 --- a/lib/messagebroker.js +++ b/lib/messagebroker.js @@ -18,14 +18,14 @@ function MessageBroker(opts) { this.mq = require('socket.io-client').connect(url); this.mq.on('connect', function() {}); this.mq.on('connect_error', function() { - log.warn('Message queue server connection error'); + log.warn('Error connecting to message broker server @ ' + url); }); this.mq.on('msg', function(data) { self.emit('msg', data); }); - log.info('Using message queue server at ' + url); + log.info('Using message broker server at ' + url); } }; diff --git a/lib/server.js b/lib/server.js index 36c6130..bf71221 100644 --- a/lib/server.js +++ b/lib/server.js @@ -282,7 +282,7 @@ WalletService.prototype.replaceTemporaryRequestKey = function(opts, cb) { walletId: opts.walletId, copayerId: self.copayerId, copayerName: opts.name, - }, false, function() { + }, function() { return cb(null, { copayerId: self.copayerId, wallet: wallet @@ -308,11 +308,18 @@ WalletService.prototype._verifySignature = function(text, signature, pubKey) { * * @param {String} type * @param {Object} data - * @param {Boolean} isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false) + * @param {Object} opts + * @param {Boolean} opts.isGlobal - If true, the notification is not issued on behalf of any particular copayer (defaults to false) */ -WalletService.prototype._notify = function(type, data, isGlobal, cb) { +WalletService.prototype._notify = function(type, data, opts, cb) { var self = this; + if (_.isFunction(opts)) { + cb = opts; + opts = {}; + } + opts = opts || {}; + log.debug('Notification', type, data); var walletId = self.walletId || data.walletId; @@ -324,7 +331,7 @@ WalletService.prototype._notify = function(type, data, isGlobal, cb) { type: type, data: data, ticker: this.notifyTicker++, - creatorId: isGlobal ? null : copayerId, + creatorId: opts.isGlobal ? null : copayerId, walletId: walletId, }); this.storage.storeNotification(walletId, n, function() { @@ -393,7 +400,7 @@ WalletService.prototype.joinWallet = function(opts, cb) { walletId: opts.walletId, copayerId: copayer.id, copayerName: copayer.name, - }, false, function() { + }, function() { return cb(null, { copayerId: copayer.id, wallet: wallet @@ -426,7 +433,7 @@ WalletService.prototype.createAddress = function(opts, cb) { self._notify('NewAddress', { address: address.address, - }, false, function() { + }, function() { return cb(null, address); }); }); @@ -726,7 +733,7 @@ WalletService.prototype.createTx = function(opts, cb) { self._notify('NewTxProposal', { amount: opts.amount - }, false, function() { + }, function() { return cb(null, txp); }); }); @@ -800,7 +807,7 @@ WalletService.prototype.removePendingTx = function(opts, cb) { if (actors.length > 1 || (actors.length == 1 && actors[0] !== self.copayerId)) return cb(new ClientError('TXACTIONED', 'Cannot remove a proposal signed/rejected by other copayers')); - self._notify('TxProposalRemoved', {}, false, function() { + self._notify('TxProposalRemoved', {}, function() { self.storage.removeTx(self.walletId, txp.id, cb); }); }); @@ -863,13 +870,13 @@ WalletService.prototype.signTx = function(opts, cb) { self._notify('TxProposalAcceptedBy', { txProposalId: opts.txProposalId, copayerId: self.copayerId, - }, false, done); + }, done); }, function(done) { if (txp.isAccepted()) { self._notify('TxProposalFinallyAccepted', { txProposalId: opts.txProposalId, - }, false, done); + }, done); } else { done(); } @@ -918,7 +925,7 @@ WalletService.prototype.broadcastTx = function(opts, cb) { self._notify('NewOutgoingTx', { txProposalId: opts.txProposalId, txid: txid - }, false, function() { + }, function() { return cb(null, txp); }); }); @@ -964,13 +971,13 @@ WalletService.prototype.rejectTx = function(opts, cb) { self._notify('TxProposalRejectedBy', { txProposalId: opts.txProposalId, copayerId: self.copayerId, - }, false, done); + }, done); }, function(done) { if (txp.status == 'rejected') { self._notify('TxProposalFinallyRejected', { txProposalId: opts.txProposalId, - }, false, done); + }, done); } else { done(); } @@ -1319,7 +1326,9 @@ WalletService.prototype.startScan = function(opts, cb) { result: err ? 'error' : 'success', }; if (err) data.error = err; - self._notify('ScanFinished', data, true); + self._notify('ScanFinished', data, { + isGlobal: true + }); }; self.getWallet({}, function(err, wallet) { diff --git a/lib/storage.js b/lib/storage.js index 11740d5..f964126 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -100,6 +100,9 @@ Storage.prototype.storeWalletAndUpdateCopayersLookup = function(wallet, cb) { }; Storage.prototype.fetchCopayerLookup = function(copayerId, cb) { + this.db.collection(collections.COPAYERS_LOOKUP).createIndex({ + copayerId: 1 + }); this.db.collection(collections.COPAYERS_LOOKUP).findOne({ copayerId: copayerId }, function(err, result) { @@ -315,6 +318,9 @@ Storage.prototype.storeAddressAndWallet = function(wallet, addresses, cb) { Storage.prototype.fetchAddress = function(address, cb) { var self = this; + this.db.collection(collections.ADDRESSES).createIndex({ + address: 1 + }); this.db.collection(collections.ADDRESSES).findOne({ address: address, }, function(err, result) { diff --git a/lib/wsapp.js b/lib/wsapp.js index 9bc8ca8..da325f8 100644 --- a/lib/wsapp.js +++ b/lib/wsapp.js @@ -53,8 +53,8 @@ WsApp.prototype.start = function(server, opts, cb) { }); socket.emit('challenge', socket.nonce); - done(); }); + done(); }, ], function(err) { if (cb) return cb(err); From 02755dbdd65a075de5c630004a470893f70c2dcf Mon Sep 17 00:00:00 2001 From: Ivan Socolsky Date: Thu, 7 May 2015 15:04:32 -0300 Subject: [PATCH 12/12] resubscribe when disconnected --- lib/blockchainexplorer.js | 4 +++- lib/blockchainmonitor.js | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/blockchainexplorer.js b/lib/blockchainexplorer.js index 99586c3..8da38f2 100644 --- a/lib/blockchainexplorer.js +++ b/lib/blockchainexplorer.js @@ -62,7 +62,9 @@ function getAddressActivityInsight(url, addresses, cb) { }; function initSocketInsight(url) { - var socket = io.connect(url, {}); + var socket = io.connect(url, { + 'reconnection': true, + }); return socket; }; diff --git a/lib/blockchainmonitor.js b/lib/blockchainmonitor.js index e2ff233..3d2a245 100644 --- a/lib/blockchainmonitor.js +++ b/lib/blockchainmonitor.js @@ -58,11 +58,11 @@ BlockchainMonitor.prototype._initExplorer = function(provider, network, url) { var connectionInfo = provider + ' (' + network + ') @ ' + url; socket.on('connect', function() { log.info('Connected to ' + connectionInfo); + socket.emit('subscribe', 'inv'); }); socket.on('connect_error', function() { log.error('Error connecting to ' + connectionInfo); }); - socket.emit('subscribe', 'inv'); socket.on('tx', _.bind(self._handleIncommingTx, self)); return explorer;