From 439f30fee48b8133e00db902e971aba0be29aee3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavel=20=C5=A0ev=C4=8D=C3=ADk?= Date: Thu, 5 Aug 2021 01:19:02 +0200 Subject: [PATCH] Update zeromq usage to compat mode --- accounts/notifications-server.js | 6 +++--- lib/bitcoind-rpc/latest-block.js | 2 +- pushtx/orchestrator.js | 2 +- pushtx/pushtx-processor.js | 4 ++-- tracker/blockchain-processor.js | 2 +- tracker/mempool-processor.js | 2 +- tracker/tracker.js | 28 +++++++++++++++++----------- 7 files changed, 26 insertions(+), 20 deletions(-) diff --git a/accounts/notifications-server.js b/accounts/notifications-server.js index 1926677..94a0679 100644 --- a/accounts/notifications-server.js +++ b/accounts/notifications-server.js @@ -5,7 +5,7 @@ 'use strict' const _ = require('lodash') -const zmq = require('zeromq') +const zmq = require('zeromq/v5-compat') const WebSocket = require('websocket') const Logger = require('../lib/logger') const network = require('../lib/bitcoin/network') @@ -40,7 +40,7 @@ class NotificationsServer { this.httpServer = httpServer if (this.notifService !== null) return - + this.notifService = new NotificationsService(httpServer.server) } @@ -54,7 +54,7 @@ class NotificationsServer { this.sock.subscribe('block') this.sock.subscribe('transaction') - this.sock.on('message', (topic, message, sequence) => { + this.sock.on('message', (topic, message) => { switch(topic.toString()) { case 'block': try { diff --git a/lib/bitcoind-rpc/latest-block.js b/lib/bitcoind-rpc/latest-block.js index dfe659d..96856cd 100644 --- a/lib/bitcoind-rpc/latest-block.js +++ b/lib/bitcoind-rpc/latest-block.js @@ -4,7 +4,7 @@ */ 'use strict' -const zmq = require('zeromq') +const zmq = require('zeromq/v5-compat') const Logger = require('../logger') const util = require('../util') const network = require('../bitcoin/network') diff --git a/pushtx/orchestrator.js b/pushtx/orchestrator.js index acaa803..11b9b4b 100644 --- a/pushtx/orchestrator.js +++ b/pushtx/orchestrator.js @@ -4,7 +4,7 @@ */ 'use strict' -const zmq = require('zeromq') +const zmq = require('zeromq/v5-compat') const { Sema } = require('async-sema') const Logger = require('../lib/logger') const db = require('../lib/db/mysql-db-wrapper') diff --git a/pushtx/pushtx-processor.js b/pushtx/pushtx-processor.js index 11766e6..4112a0e 100644 --- a/pushtx/pushtx-processor.js +++ b/pushtx/pushtx-processor.js @@ -5,7 +5,7 @@ 'use strict' const bitcoin = require('bitcoinjs-lib') -const zmq = require('zeromq') +const zmq = require('zeromq/v5-compat') const Logger = require('../lib/logger') const errors = require('../lib/errors') const db = require('../lib/db/mysql-db-wrapper') @@ -46,7 +46,7 @@ class PushTxProcessor { initNotifications(config) { // Notification socket for the tracker this.notifSock = zmq.socket('pub') - this.notifSock.bindSync(config.uriSocket) + this.notifSock.bind(config.uriSocket) } /** diff --git a/tracker/blockchain-processor.js b/tracker/blockchain-processor.js index 8765ddb..06aff7f 100644 --- a/tracker/blockchain-processor.js +++ b/tracker/blockchain-processor.js @@ -5,7 +5,7 @@ 'use strict' const _ = require('lodash') -const zmq = require('zeromq') +const zmq = require('zeromq/v5-compat') const { Sema } = require('async-sema') const util = require('../lib/util') const Logger = require('../lib/logger') diff --git a/tracker/mempool-processor.js b/tracker/mempool-processor.js index bd30ef3..c913ff7 100644 --- a/tracker/mempool-processor.js +++ b/tracker/mempool-processor.js @@ -5,7 +5,7 @@ 'use strict' const _ = require('lodash') -const zmq = require('zeromq') +const zmq = require('zeromq/v5-compat') const bitcoin = require('bitcoinjs-lib') const util = require('../lib/util') const Logger = require('../lib/logger') diff --git a/tracker/tracker.js b/tracker/tracker.js index 1089faa..4c48bf3 100644 --- a/tracker/tracker.js +++ b/tracker/tracker.js @@ -4,11 +4,12 @@ */ 'use strict' -const zmq = require('zeromq') +const zmq = require('zeromq/v5-compat') const network = require('../lib/bitcoin/network') const keys = require('../keys')[network.key] const BlockchainProcessor = require('./blockchain-processor') const MempoolProcessor = require('./mempool-processor') +const util = require('../lib/util') /** @@ -22,12 +23,13 @@ class Tracker { constructor() { // Notification socket for client events this.notifSock = zmq.socket('pub') - this.notifSock.bindSync(`tcp://127.0.0.1:${keys.ports.tracker}`) - - // Initialize the blockchain processor - // and the mempool buffer - this.blockchainProcessor = new BlockchainProcessor(this.notifSock) - this.mempoolProcessor = new MempoolProcessor(this.notifSock) + this.notifSock.bind(`tcp://127.0.0.1:${keys.ports.tracker}`, () => { + // Initialize the blockchain processor + // and the mempool buffer + this.initialized = true + this.blockchainProcessor = new BlockchainProcessor(this.notifSock) + this.mempoolProcessor = new MempoolProcessor(this.notifSock) + }) } /** @@ -35,10 +37,14 @@ class Tracker { * @returns {Promise} */ async start() { - this.startupTimeout = setTimeout(async function() { - await this.blockchainProcessor.start() - await this.mempoolProcessor.start() - }.bind(this), 1500) + if (!this.initialized) { + await util.delay(1000) + + return this.start() + } + + await this.blockchainProcessor.start() + await this.mempoolProcessor.start() } /**