Browse Source

Update zeromq usage to compat mode

Pavel Ševčík 3 years ago
parent
commit
439f30fee4
No known key found for this signature in database GPG Key ID: CFA54E4C0CD58DF0
  1. 6
      accounts/notifications-server.js
  2. 2
      lib/bitcoind-rpc/latest-block.js
  3. 2
      pushtx/orchestrator.js
  4. 4
      pushtx/pushtx-processor.js
  5. 2
      tracker/blockchain-processor.js
  6. 2
      tracker/mempool-processor.js
  7. 28
      tracker/tracker.js

6
accounts/notifications-server.js

@ -5,7 +5,7 @@
'use strict'
const _ = require('lodash')
const zmq = require('zeromq')
const zmq = require('zeromq/v5-compat')
const WebSocket = require('websocket')
const Logger = require('../lib/logger')
const network = require('../lib/bitcoin/network')
@ -40,7 +40,7 @@ class NotificationsServer {
this.httpServer = httpServer
if (this.notifService !== null) return
this.notifService = new NotificationsService(httpServer.server)
}
@ -54,7 +54,7 @@ class NotificationsServer {
this.sock.subscribe('block')
this.sock.subscribe('transaction')
this.sock.on('message', (topic, message, sequence) => {
this.sock.on('message', (topic, message) => {
switch(topic.toString()) {
case 'block':
try {

2
lib/bitcoind-rpc/latest-block.js

@ -4,7 +4,7 @@
*/
'use strict'
const zmq = require('zeromq')
const zmq = require('zeromq/v5-compat')
const Logger = require('../logger')
const util = require('../util')
const network = require('../bitcoin/network')

2
pushtx/orchestrator.js

@ -4,7 +4,7 @@
*/
'use strict'
const zmq = require('zeromq')
const zmq = require('zeromq/v5-compat')
const { Sema } = require('async-sema')
const Logger = require('../lib/logger')
const db = require('../lib/db/mysql-db-wrapper')

4
pushtx/pushtx-processor.js

@ -5,7 +5,7 @@
'use strict'
const bitcoin = require('bitcoinjs-lib')
const zmq = require('zeromq')
const zmq = require('zeromq/v5-compat')
const Logger = require('../lib/logger')
const errors = require('../lib/errors')
const db = require('../lib/db/mysql-db-wrapper')
@ -46,7 +46,7 @@ class PushTxProcessor {
initNotifications(config) {
// Notification socket for the tracker
this.notifSock = zmq.socket('pub')
this.notifSock.bindSync(config.uriSocket)
this.notifSock.bind(config.uriSocket)
}
/**

2
tracker/blockchain-processor.js

@ -5,7 +5,7 @@
'use strict'
const _ = require('lodash')
const zmq = require('zeromq')
const zmq = require('zeromq/v5-compat')
const { Sema } = require('async-sema')
const util = require('../lib/util')
const Logger = require('../lib/logger')

2
tracker/mempool-processor.js

@ -5,7 +5,7 @@
'use strict'
const _ = require('lodash')
const zmq = require('zeromq')
const zmq = require('zeromq/v5-compat')
const bitcoin = require('bitcoinjs-lib')
const util = require('../lib/util')
const Logger = require('../lib/logger')

28
tracker/tracker.js

@ -4,11 +4,12 @@
*/
'use strict'
const zmq = require('zeromq')
const zmq = require('zeromq/v5-compat')
const network = require('../lib/bitcoin/network')
const keys = require('../keys')[network.key]
const BlockchainProcessor = require('./blockchain-processor')
const MempoolProcessor = require('./mempool-processor')
const util = require('../lib/util')
/**
@ -22,12 +23,13 @@ class Tracker {
constructor() {
// Notification socket for client events
this.notifSock = zmq.socket('pub')
this.notifSock.bindSync(`tcp://127.0.0.1:${keys.ports.tracker}`)
// Initialize the blockchain processor
// and the mempool buffer
this.blockchainProcessor = new BlockchainProcessor(this.notifSock)
this.mempoolProcessor = new MempoolProcessor(this.notifSock)
this.notifSock.bind(`tcp://127.0.0.1:${keys.ports.tracker}`, () => {
// Initialize the blockchain processor
// and the mempool buffer
this.initialized = true
this.blockchainProcessor = new BlockchainProcessor(this.notifSock)
this.mempoolProcessor = new MempoolProcessor(this.notifSock)
})
}
/**
@ -35,10 +37,14 @@ class Tracker {
* @returns {Promise}
*/
async start() {
this.startupTimeout = setTimeout(async function() {
await this.blockchainProcessor.start()
await this.mempoolProcessor.start()
}.bind(this), 1500)
if (!this.initialized) {
await util.delay(1000)
return this.start()
}
await this.blockchainProcessor.start()
await this.mempoolProcessor.start()
}
/**

Loading…
Cancel
Save