From 4a2b92bcb00a2d5b193f42c6e71a9031268ada87 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Wed, 23 Nov 2016 12:17:28 -0300 Subject: [PATCH 1/6] rm websockets --- bitcorenode/index.js | 12 +---- bws.js | 111 +++++++++++++++++++++++-------------------- config.js | 2 +- lib/expressapp.js | 10 +++- lib/storage.js | 4 +- lib/wsapp.js | 65 ------------------------- test/bitcorenode.js | 66 ------------------------- 7 files changed, 74 insertions(+), 196 deletions(-) delete mode 100644 lib/wsapp.js diff --git a/bitcorenode/index.js b/bitcorenode/index.js index 11dead7..be77775 100644 --- a/bitcorenode/index.js +++ b/bitcorenode/index.js @@ -13,7 +13,6 @@ var Locker = require('locker-server'); var BlockchainMonitor = require('../lib/blockchainmonitor'); var EmailService = require('../lib/emailservice'); var ExpressApp = require('../lib/expressapp'); -var WsApp = require('../lib/wsapp'); var child_process = require('child_process'); var spawn = child_process.spawn; var EventEmitter = require('events').EventEmitter; @@ -110,7 +109,6 @@ Service.prototype._getConfiguration = function() { Service.prototype._startWalletService = function(config, next) { var self = this; var expressApp = new ExpressApp(); - var wsApp = new WsApp(); if (self.https) { var serverOpts = self._readHttpsOptions(); @@ -119,15 +117,7 @@ Service.prototype._startWalletService = function(config, next) { self.server = http.Server(expressApp.app); } - async.parallel([ - - function(done) { - expressApp.start(config, done); - }, - function(done) { - wsApp.start(self.server, config, done); - }, - ], function(err) { + expressApp.start(config, function(err){ if (err) { return next(err); } diff --git a/bws.js b/bws.js index fe0be8f..ed541d1 100755 --- a/bws.js +++ b/bws.js @@ -4,9 +4,7 @@ var async = require('async'); var fs = require('fs'); var ExpressApp = require('./lib/expressapp'); -var WsApp = require('./lib/wsapp'); var config = require('./config'); -var sticky = require('sticky-session'); var log = require('npmlog'); log.debug = log.verbose; log.disableColor(); @@ -41,60 +39,71 @@ if (config.https) { }; } -var start = function(cb) { - var expressApp = new ExpressApp(); - var wsApp = new WsApp(); - - function doStart(cb) { - var server = config.https ? serverModule.createServer(serverOpts, expressApp.app) : serverModule.Server(expressApp.app); - - server.on('connection', function(socket) { - socket.setTimeout(300 * 1000); - }) - - async.parallel([ - - function(done) { - expressApp.start(config, done); - }, - function(done) { - wsApp.start(server, config, done); - }, - ], function(err) { - if (err) { - log.error('Could not start BWS instance', err); - } - if (cb) return cb(err); - }); - - return server; - }; - - if (config.cluster) { - var server = sticky(clusterInstances, function() { - return doStart(); - }); - return cb(null, server); - } else { - var server = doStart(function(err) { - return cb(err, server); - }); - } -}; - if (config.cluster && !config.lockOpts.lockerServer) throw 'When running in cluster mode, locker server need to be configured'; if (config.cluster && !config.messageBrokerOpts.messageBrokerServer) throw 'When running in cluster mode, message broker server need to be configured'; -start(function(err, server) { +var expressApp = new ExpressApp(); + +function startInstance(cb) { + var server = config.https ? serverModule.createServer(serverOpts, expressApp.app) : serverModule.Server(expressApp.app); + + server.on('connection', function(socket) { + socket.setTimeout(300 * 1000); + }) + + expressApp.start(config, function(err) { + if (err) { + log.error('Could not start BWS instance', err); + return cb(err); + } + + server.listen(port); + return cb(); + }); +}; + + +var logStart = function(err) { if (err) { - console.log('Could not start BWS:', err); - process.exit(0); + log.error('Error:' + err); + return; } - server.listen(port, function(err) { - if (err) console.log('ERROR: ', err); - log.info('Bitcore Wallet Service running on port ' + port); - }); -}); + + if (cluster.worker) + log.info('BWS Instance ' + cluster.worker.id + ' running'); + else + log.info('BWS running'); +}; + + +if (config.cluster) { + + if (cluster.isMaster) { + + // Count the machine's CPUs + var instances = config.clusterInstances || require('os').cpus().length; + + log.info('Starting ' + instances + ' instances on port:' + port); + + // Create a worker for each CPU + for (var i = 0; i < instances; i += 1) { + cluster.fork(); + + // Listen for dying workers + cluster.on('exit', function(worker) { + // Replace the dead worker, + log.error('Worker ' + worker.id + ' died :('); + cluster.fork(); + }); + } + // Code to run if we're in a worker process + } else { + startInstance(logStart); + } +} else { + log.info('Starting on port: ' + port); + startInstance(logStart); +}; diff --git a/config.js b/config.js index 3e6a902..373fcf8 100644 --- a/config.js +++ b/config.js @@ -3,7 +3,7 @@ var config = { disableLogs: false, port: 3232, // Uncomment to make BWS a forking server - // cluster: true, + cluster: true, // Uncomment to use the nr of availalbe CPUs // clusterInstances: 4, diff --git a/lib/expressapp.js b/lib/expressapp.js index 1c50e53..6b1e51a 100644 --- a/lib/expressapp.js +++ b/lib/expressapp.js @@ -56,7 +56,15 @@ ExpressApp.prototype.start = function(opts, cb) { var POST_LIMIT = 1024 * 100 /* Max POST 100 kb */ ; this.app.use(bodyParser.json({ - limit: POST_LIMIT + limit: POST_LIMIT, + verify: function(req, res, buf, encoding) { + + // get rawBody + req.rawBody = buf.toString(); + console.log("rawBody", req.rawBody); + console.log("headers", req.headers); + + } })); if (opts.disableLogs) { diff --git a/lib/storage.js b/lib/storage.js index f63f968..ca0ee2b 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -49,7 +49,8 @@ Storage.prototype._createIndexes = function() { }); this.db.collection(collections.TXS).createIndex({ walletId: 1, - createdOn: -1, + isPending: 1, + createdOn: 1 }); this.db.collection(collections.NOTIFICATIONS).createIndex({ walletId: 1, @@ -74,6 +75,7 @@ Storage.prototype._createIndexes = function() { walletId: 1, txid: 1, }); + }; Storage.prototype.connect = function(opts, cb) { diff --git a/lib/wsapp.js b/lib/wsapp.js deleted file mode 100644 index 2fd2193..0000000 --- a/lib/wsapp.js +++ /dev/null @@ -1,65 +0,0 @@ -'use strict'; - -var $ = require('preconditions').singleton(); -var _ = require('lodash'); -var async = require('async'); -var log = require('npmlog'); -log.debug = log.verbose; -var Uuid = require('uuid'); - -var WalletService = require('./server'); -var MessageBroker = require('./messagebroker'); - -log.level = 'debug'; - -var WsApp = function() {}; - -WsApp.prototype._unauthorized = function(socket) { - socket.emit('unauthorized'); - socket.disconnect(); -}; - -WsApp.prototype._handleNotification = function(notification) { - var room = notification.walletId ? this.io.to(notification.walletId) : this.io; - room.emit('notification', notification); -}; - -WsApp.prototype.start = function(server, opts, cb) { - opts = opts || {}; - $.checkState(opts.messageBrokerOpts); - - var self = this; - - this.io = require('socket.io')(server); - - async.series([ - - function(done) { - self.messageBroker = new MessageBroker(opts.messageBrokerOpts); - self.messageBroker.onMessage(_.bind(self._handleNotification, self)); - done(); - }, - function(done) { - self.io.on('connection', function(socket) { - socket.nonce = Uuid.v4(); - socket.on('authorize', function(data) { - if (data.message != socket.nonce) return self._unauthorized(socket); - - WalletService.getInstanceWithAuth(data, function(err, service) { - if (err) return self._unauthorized(socket); - - socket.join(service.walletId); - socket.emit('authorized'); - }); - }); - - socket.emit('challenge', socket.nonce); - }); - done(); - }, - ], function(err) { - if (cb) return cb(err); - }); -}; - -module.exports = WsApp; diff --git a/test/bitcorenode.js b/test/bitcorenode.js index f1e9806..18f1c02 100644 --- a/test/bitcorenode.js +++ b/test/bitcorenode.js @@ -138,45 +138,6 @@ describe('Bitcore Node Service', function() { }); }); describe('#_startWalletService', function() { - it('will start express and web socket servers', function(done) { - function TestExpressApp() {} - TestExpressApp.prototype.start = sinon.stub().callsArg(1); - function TestWSApp() {} - TestWSApp.prototype.start = sinon.stub().callsArg(2); - var listen = sinon.stub().callsArg(1); - var TestService = proxyquire('../bitcorenode', { - '../lib/expressapp': TestExpressApp, - '../lib/wsapp': TestWSApp, - 'http': { - Server: sinon.stub().returns({ - listen: listen - }) - } - }); - var options = { - node: { - bwsPort: 3232 - } - }; - var service = new TestService(options); - var config = {}; - service._startWalletService(config, function(err) { - if (err) { - throw err; - } - TestExpressApp.prototype.start.callCount.should.equal(1); - TestExpressApp.prototype.start.args[0][0].should.equal(config); - TestExpressApp.prototype.start.args[0][1].should.be.a('function'); - TestWSApp.prototype.start.callCount.should.equal(1); - TestWSApp.prototype.start.args[0][0].should.equal(service.server); - TestWSApp.prototype.start.args[0][1].should.equal(config); - TestWSApp.prototype.start.args[0][2].should.be.a('function'); - listen.callCount.should.equal(1); - listen.args[0][0].should.equal(3232); - listen.args[0][1].should.be.a('function'); - done(); - }); - }); it('error from express', function(done) { function TestExpressApp() {} TestExpressApp.prototype.start = sinon.stub().callsArgWith(1, new Error('test')); @@ -204,33 +165,6 @@ describe('Bitcore Node Service', function() { done(); }); }); - it('error from web socket', function(done) { - function TestExpressApp() {} - TestExpressApp.prototype.start = sinon.stub().callsArg(1); - function TestWSApp() {} - TestWSApp.prototype.start = sinon.stub().callsArgWith(2, new Error('test')); - var listen = sinon.stub().callsArg(1); - var TestService = proxyquire('../bitcorenode', { - '../lib/expressapp': TestExpressApp, - '../lib/wsapp': TestWSApp, - 'http': { - Server: sinon.stub().returns({ - listen: listen - }) - } - }); - var options = { - node: { - bwsPort: 3232 - } - }; - var service = new TestService(options); - var config = {}; - service._startWalletService(config, function(err) { - err.message.should.equal('test'); - done(); - }); - }); it('error from server.listen', function(done) { var app = {}; function TestExpressApp() { From 210b73d1988672dd43e2a462d81f1245fbee6712 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Wed, 23 Nov 2016 13:05:04 -0300 Subject: [PATCH 2/6] fix dying workers handle --- bws.js | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/bws.js b/bws.js index ed541d1..7adc32d 100755 --- a/bws.js +++ b/bws.js @@ -91,14 +91,14 @@ if (config.cluster) { // Create a worker for each CPU for (var i = 0; i < instances; i += 1) { cluster.fork(); - - // Listen for dying workers - cluster.on('exit', function(worker) { - // Replace the dead worker, - log.error('Worker ' + worker.id + ' died :('); - cluster.fork(); - }); } + + // Listen for dying workers + cluster.on('exit', function(worker) { + // Replace the dead worker, + log.error('Worker ' + worker.id + ' died :('); + cluster.fork(); + }); // Code to run if we're in a worker process } else { startInstance(logStart); From 598f6f1361efc8512d07c98f4a7704d89fb797ae Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Thu, 24 Nov 2016 10:01:59 -0300 Subject: [PATCH 3/6] rm old changes --- lib/expressapp.js | 10 +--------- lib/storage.js | 4 +--- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/lib/expressapp.js b/lib/expressapp.js index 6b1e51a..1c50e53 100644 --- a/lib/expressapp.js +++ b/lib/expressapp.js @@ -56,15 +56,7 @@ ExpressApp.prototype.start = function(opts, cb) { var POST_LIMIT = 1024 * 100 /* Max POST 100 kb */ ; this.app.use(bodyParser.json({ - limit: POST_LIMIT, - verify: function(req, res, buf, encoding) { - - // get rawBody - req.rawBody = buf.toString(); - console.log("rawBody", req.rawBody); - console.log("headers", req.headers); - - } + limit: POST_LIMIT })); if (opts.disableLogs) { diff --git a/lib/storage.js b/lib/storage.js index ca0ee2b..f63f968 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -49,8 +49,7 @@ Storage.prototype._createIndexes = function() { }); this.db.collection(collections.TXS).createIndex({ walletId: 1, - isPending: 1, - createdOn: 1 + createdOn: -1, }); this.db.collection(collections.NOTIFICATIONS).createIndex({ walletId: 1, @@ -75,7 +74,6 @@ Storage.prototype._createIndexes = function() { walletId: 1, txid: 1, }); - }; Storage.prototype.connect = function(opts, cb) { From 313a8d667a30ed68de62d3d7b8b57bbb31652133 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Thu, 24 Nov 2016 10:03:00 -0300 Subject: [PATCH 4/6] revert dflt config --- config.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.js b/config.js index 373fcf8..3e6a902 100644 --- a/config.js +++ b/config.js @@ -3,7 +3,7 @@ var config = { disableLogs: false, port: 3232, // Uncomment to make BWS a forking server - cluster: true, + // cluster: true, // Uncomment to use the nr of availalbe CPUs // clusterInstances: 4, From f7e049c7adc8139f6bc8e0edee5699798c2a9240 Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Thu, 24 Nov 2016 10:04:36 -0300 Subject: [PATCH 5/6] better config --- config.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/config.js b/config.js index 3e6a902..77355bb 100644 --- a/config.js +++ b/config.js @@ -2,9 +2,11 @@ var config = { basePath: '/bws/api', disableLogs: false, port: 3232, - // Uncomment to make BWS a forking server - // cluster: true, - // Uncomment to use the nr of availalbe CPUs + + // comment this to use a single process + cluster: true, + + // Uncomment to set the number or process (will use the nr of availalbe CPUs by default) // clusterInstances: 4, // https: true, From fbe7dc4fa0917430a650054a46c4d72509593afe Mon Sep 17 00:00:00 2001 From: Matias Alejo Garcia Date: Thu, 24 Nov 2016 10:15:46 -0300 Subject: [PATCH 6/6] ref start --- bws.js | 58 ++++++++++++++++++++----------------------------------- config.js | 4 ++-- 2 files changed, 23 insertions(+), 39 deletions(-) diff --git a/bws.js b/bws.js index 7adc32d..db61478 100755 --- a/bws.js +++ b/bws.js @@ -57,53 +57,37 @@ function startInstance(cb) { expressApp.start(config, function(err) { if (err) { log.error('Could not start BWS instance', err); - return cb(err); + return; } server.listen(port); - return cb(); - }); -}; - -var logStart = function(err) { - if (err) { - log.error('Error:' + err); + var instanceInfo = cluster.worker ? ' [Instance:' + cluster.worker.id + ']' : ''; + log.info('BWS running ' + instanceInfo); return; - } - - if (cluster.worker) - log.info('BWS Instance ' + cluster.worker.id + ' running'); - else - log.info('BWS running'); + }); }; +if (config.cluster && cluster.isMaster) { -if (config.cluster) { + // Count the machine's CPUs + var instances = config.clusterInstances || require('os').cpus().length; - if (cluster.isMaster) { + log.info('Starting ' + instances + ' instances'); - // Count the machine's CPUs - var instances = config.clusterInstances || require('os').cpus().length; - - log.info('Starting ' + instances + ' instances on port:' + port); - - // Create a worker for each CPU - for (var i = 0; i < instances; i += 1) { - cluster.fork(); - } - - // Listen for dying workers - cluster.on('exit', function(worker) { - // Replace the dead worker, - log.error('Worker ' + worker.id + ' died :('); - cluster.fork(); - }); - // Code to run if we're in a worker process - } else { - startInstance(logStart); + // Create a worker for each CPU + for (var i = 0; i < instances; i += 1) { + cluster.fork(); } + + // Listen for dying workers + cluster.on('exit', function(worker) { + // Replace the dead worker, + log.error('Worker ' + worker.id + ' died :('); + cluster.fork(); + }); + // Code to run if we're in a worker process } else { - log.info('Starting on port: ' + port); - startInstance(logStart); + log.info('Listening on port: ' + port); + startInstance(); }; diff --git a/config.js b/config.js index 77355bb..6c0bc4e 100644 --- a/config.js +++ b/config.js @@ -3,8 +3,8 @@ var config = { disableLogs: false, port: 3232, - // comment this to use a single process - cluster: true, + // Uncomment to make BWS a forking server + // cluster: true, // Uncomment to set the number or process (will use the nr of availalbe CPUs by default) // clusterInstances: 4,