Browse Source

switch to parallel processing of blocks by the tracker

umbrel
kenshin-samourai 4 years ago
parent
commit
7833af3504
  1. 50
      tracker/abstract-processor.js
  2. 173
      tracker/block-worker.js
  3. 7
      tracker/block.js
  4. 80
      tracker/blockchain-processor.js
  5. 222
      tracker/blocks-processor.js
  6. 34
      tracker/mempool-processor.js
  7. 2
      tracker/transaction.js
  8. 2
      tracker/transactions-bundle.js

50
tracker/abstract-processor.js

@ -1,50 +0,0 @@
/*!
* tracker/abstract-processor.js
* Copyright © 2019 Katana Cryptographic Ltd. All Rights Reserved.
*/
'use strict'
const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
/**
* An abstract class for tracker processors
*/
class AbstractProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
// RPC client
this.client = new RpcClient()
// ZeroMQ socket for notifications sent to others components
this.notifSock = notifSock
}
/**
* Notify a new transaction
* @param {object} tx - bitcoin transaction
*/
notifyTx(tx) {
// Real-time client updates for this transaction.
// Any address input or output present in transaction
// is a potential client to notify.
if (this.notifSock)
this.notifSock.send(['transaction', JSON.stringify(tx)])
}
/**
* Notify a new block
* @param {string} header - block header
*/
notifyBlock(header) {
// Notify clients of the block
if (this.notifSock)
this.notifSock.send(['block', JSON.stringify(header)])
}
}
module.exports = AbstractProcessor

173
tracker/block-worker.js

@ -0,0 +1,173 @@
/*!
* tracker/block-worker.js
* Copyright © 2019 Katana Cryptographic Ltd. All Rights Reserved.
*/
'use strict'
const { isMainThread, parentPort } = require('worker_threads')
const network = require('../lib/bitcoin/network')
const keys = require('../keys')[network.key]
const db = require('../lib/db/mysql-db-wrapper')
const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
const Block = require('./block')
/**
* STATUS
*/
const IDLE = 0
module.exports.IDLE = IDLE
const INITIALIZED = 1
module.exports.INITIALIZED = INITIALIZED
const OUTPUTS_PROCESSED = 2
module.exports.OUTPUTS_PROCESSED = OUTPUTS_PROCESSED
const INPUTS_PROCESSED = 3
module.exports.INPUTS_PROCESSED = INPUTS_PROCESSED
const TXS_CONFIRMED = 4
module.exports.TXS_CONFIRMED = TXS_CONFIRMED
/**
* OPS
*/
const OP_INIT = 0
module.exports.OP_INIT = OP_INIT
const OP_PROCESS_OUTPUTS = 1
module.exports.OP_PROCESS_OUTPUTS = OP_PROCESS_OUTPUTS
const OP_PROCESS_INPUTS = 2
module.exports.OP_PROCESS_INPUTS = OP_PROCESS_INPUTS
const OP_CONFIRM = 3
module.exports.OP_CONFIRM = OP_CONFIRM
const OP_RESET = 4
module.exports.OP_RESET = OP_RESET
/**
* Process message received by the worker
* @param {object} msg - message received by the worker
*/
async function processMessage(msg) {
let res = null
let success = true
try {
switch(msg.op) {
case OP_INIT:
if (status != IDLE)
throw 'Operation not allowed'
res = await initBlock(msg.header)
break
case OP_PROCESS_OUTPUTS:
if (status != INITIALIZED)
throw 'Operation not allowed'
res = await processOutputs()
break
case OP_PROCESS_INPUTS:
if (status != OUTPUTS_PROCESSED)
throw 'Operation not allowed'
res = await processInputs()
break
case OP_CONFIRM:
if (status != INPUTS_PROCESSED)
throw 'Operation not allowed'
res = await confirmTransactions(msg.blockId)
break
case OP_RESET:
res = await reset()
break
default:
throw 'Invalid Operation'
}
} catch (e) {
success = false
res = e
} finally {
parentPort.postMessage({
'op': msg.op,
'status': success,
'res': res
})
}
}
/**
* Initialize the block
* @param {object} header - block header
*/
async function initBlock(header) {
status = INITIALIZED
const hex = await rpcClient.getblock(header.hash, false)
block = new Block(hex, header)
return true
}
/**
* Process the transactions outputs
*/
async function processOutputs() {
status = OUTPUTS_PROCESSED
txsForBroadcast = await block.processOutputs()
return true
}
/**
* Process the transactions inputs
*/
async function processInputs() {
status = INPUTS_PROCESSED
const txs = await block.processInputs()
txsForBroadcast = txsForBroadcast.concat(txs)
return true
}
/**
* Confirm the transactions
* @param {integer} blockId - id of the block in db
*/
async function confirmTransactions(blockId) {
status = TXS_CONFIRMED
const aTxsForBroadcast = [...new Set(txsForBroadcast)]
await block.confirmTransactions(aTxsForBroadcast, blockId)
return aTxsForBroadcast
}
/**
* Reset
*/
function reset() {
status = IDLE
block = null
txsForBroadcast = []
return true
}
/**
* MAIN
*/
const rpcClient = new RpcClient()
let block = null
let txsForBroadcast = []
let status = IDLE
if (!isMainThread) {
db.connect({
connectionLimit: keys.db.connectionLimitTracker,
acquireTimeout: keys.db.acquireTimeout,
host: keys.db.host,
user: keys.db.user,
password: keys.db.pass,
database: keys.db.database
})
reset()
parentPort.on('message', processMessage)
}

7
tracker/block.js

@ -39,6 +39,9 @@ class Block extends TransactionsBundle {
/**
* Register the block and transactions of interest in db
* @dev This method isn't used anymore.
* It has been replaced by a parallel processing of blocks.
* (see blocks-processor and block-worker)
* @returns {Promise - object[]} returns an array of transactions to be broadcast
*/
async processBlock() {
@ -76,17 +79,13 @@ class Block extends TransactionsBundle {
*/
async processOutputs() {
const txsForBroadcast = new Set()
console.time('prefilterByOutputs')
const filteredTxs = await this.prefilterByOutputs()
console.timeEnd('prefilterByOutputs')
console.time('processOutputs')
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
await tx.processOutputs()
if (tx.doBroadcast)
txsForBroadcast.add(tx.tx)
})
console.timeEnd('processOutputs')
return [...txsForBroadcast]
}

80
tracker/blockchain-processor.js

@ -11,28 +11,34 @@ const util = require('../lib/util')
const Logger = require('../lib/logger')
const db = require('../lib/db/mysql-db-wrapper')
const network = require('../lib/bitcoin/network')
const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
const keys = require('../keys')[network.key]
const AbstractProcessor = require('./abstract-processor')
const Block = require('./block')
const blocksProcessor = require('./blocks-processor')
/**
* A class allowing to process the blockchain
*/
class BlockchainProcessor extends AbstractProcessor {
class BlockchainProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
super(notifSock)
// RPC client
this.client = new RpcClient()
// ZeroMQ socket for bitcoind blocks messages
this.blkSock = null
// Initialize a semaphor protecting the onBlockHash() method
this._onBlockHashSemaphor = new Sema(1, { capacity: 50 })
// Array of worker threads used for parallel processing of blocks
this.blockWorkers = []
// Flag tracking Initial Block Download Mode
this.isIBD = true
// Initialize the blocks processor
blocksProcessor.init(notifSock)
}
/**
@ -155,7 +161,6 @@ class BlockchainProcessor extends AbstractProcessor {
if (highest == null) return null
if (daemonNbBlocks == highest.blockHeight) return null
// Compute blocks range to be processed
const blockRange = _.range(highest.blockHeight, daemonNbBlocks + 1)
Logger.info(`Tracker : Sync ${blockRange.length} blocks`)
@ -307,8 +312,6 @@ class BlockchainProcessor extends AbstractProcessor {
await db.unconfirmTransactions(txids)
}
// TODO: get accounts and notify of deletion ?
await db.deleteBlocksAfterHeight(height)
}
@ -331,7 +334,6 @@ class BlockchainProcessor extends AbstractProcessor {
Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`)
// Process the blocks
try {
return this.processBlockRange(blockRange)
} catch(e) {
@ -341,64 +343,14 @@ class BlockchainProcessor extends AbstractProcessor {
}
/**
* Process a series of blocks
* Process a list of blocks
* @param {object[]} headers - array of block headers
*/
async processBlocks(headers) {
const MAX_NB_BLOCKS = 5
const chunks = util.splitList(headers, MAX_NB_BLOCKS)
return util.seriesCall(chunks, async chunk => {
const t0 = Date.now()
const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}`
Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}.`)
const txsForBroadcast = new Map()
// Initialize the blocks and process the transaction outputs
console.time('outputs')
const blocks = await util.parallelCall(chunk, async header => {
// Get raw block hex string from bitcoind
const hex = await this.client.getblock(header.hash, false)
const block = new Block(hex, header)
// Process the transaction outputs
const txs = await block.processOutputs()
txsForBroadcast.set(block.header.hash, txs)
return block
})
console.timeEnd('outputs')
// Process the transaction inputs
console.time('inputs')
await util.parallelCall(blocks, async block => {
const txs = await block.processInputs()
txsForBroadcast.set(
block.header.hash,
txs.concat(txsForBroadcast.get(block.header.hash))
)
})
console.timeEnd('inputs')
// Sort the blocks by ascending height
blocks.sort((a,b) => a.header.height - b.header.height)
// Register the block and confirm the transactions
console.time('confirm')
await util.seriesCall(blocks, async block => {
const blockId = await block.registerBlock()
const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))]
await block.confirmTransactions(aTxsForBroadcast, blockId)
})
console.timeEnd('confirm')
console.time('notify')
await util.parallelCall(blocks, async block => {
const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))]
for (let tx of aTxsForBroadcast)
this.notifyTx(tx)
this.notifyBlock(block.header)
})
console.timeEnd('notify')
const chunks = util.splitList(headers, blocksProcessor.nbWorkers)
const dt = ((Date.now()-t0)/1000).toFixed(1)
const per = ((Date.now()-t0)/MAX_NB_BLOCKS).toFixed(0)
Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`)
await util.seriesCall(chunks, async chunk => {
return blocksProcessor.processChunk(chunk)
})
}
@ -407,17 +359,13 @@ class BlockchainProcessor extends AbstractProcessor {
* @param {int[]} heights - a range of block heights
*/
async processBlockRange(heights) {
const MAX_NB_BLOCKS = 5
const chunks = util.splitList(heights, MAX_NB_BLOCKS)
const chunks = util.splitList(heights, blocksProcessor.nbWorkers)
return util.seriesCall(chunks, async chunk => {
console.time('headers')
const headers = await util.parallelCall(chunk, async height => {
// Get raw block hex string from bitcoind
const hash = await this.client.getblockhash(height)
return await this.client.getblockheader(hash)
})
console.timeEnd('headers')
return this.processBlocks(headers)
})
}

222
tracker/blocks-processor.js

@ -0,0 +1,222 @@
/*!
* tracker/blocks-processor.js
* Copyright © 2019 Katana Cryptographic Ltd. All Rights Reserved.
*/
'use strict'
const os = require('os')
const Sema = require('async-sema')
const { Worker } = require('worker_threads')
const Logger = require('../lib/logger')
const util = require('../lib/util')
const dbProcessor = require('../lib/db/mysql-db-wrapper')
const blockWorker = require('./block-worker')
let notifSock = null
let blockWorkers = []
let headersChunk = []
let txsForBroadcast = []
let t0 = null
let currentOp = null
let nbTasksEnqueued = 0
let nbTasksCompleted = 0
// Semaphor protecting the processBloks() method
const _processBlocksSemaphor = new Sema(1)
// Number of worker threads processing the blocks in parallel
const nbWorkers = os.cpus().length //- 1
module.exports.nbWorkers = nbWorkers
/**
* Initialize the processor
* @param {object} notifSock - ZMQ socket used for notifications
*/
function init(notifSock) {
notifSock = notifSock
for (let i = 0; i < nbWorkers; i++) {
const worker = new Worker(
`${__dirname}/block-worker.js`,
{ workerData: null }
)
worker.on('error', processWorkerError)
worker.on('message', processWorkerMessage)
blockWorkers.push(worker)
}
}
module.exports.init = init
/**
* Process a chunk of block headers
* @param {object[]} chunk - array of block headers
*/
async function processChunk(chunk) {
// Acquire the semaphor (wait for previous chunk)
await _processBlocksSemaphor.acquire()
t0 = Date.now()
const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}`
Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}`)
// Process the chunk
chunk.sort((a,b) => a.height - b.height)
headersChunk = chunk
txsForBroadcast = []
processTask(blockWorker.OP_INIT)
}
module.exports.processChunk = processChunk
/**
* Process an error returned by a worker thread
* @param {object} e - error
*/
async function processWorkerError(e) {
return processWorkerMessage({
'op': currentOp,
'status': false,
'res': e
})
}
/**
* Process a message returned by a worker thread
* @param {object} msg - message sent by the worker thread
*/
async function processWorkerMessage(msg) {
nbTasksCompleted++
if (!msg.status) {
Logger.error(msg.res, 'Tracker : processWorkerMessage()')
} else if (msg.op == blockWorker.OP_CONFIRM) {
txsForBroadcast = txsForBroadcast.concat(msg.res)
}
if (nbTasksCompleted == nbTasksEnqueued) {
switch (msg.op) {
case blockWorker.OP_INIT:
// Process the transaction outputs
processTask(blockWorker.OP_PROCESS_OUTPUTS)
break
case blockWorker.OP_PROCESS_OUTPUTS:
// Process the transaction inputs
processTask(blockWorker.OP_PROCESS_INPUTS)
break
case blockWorker.OP_PROCESS_INPUTS:
// Store the blocks in db and get their id
const blockIds = await util.seriesCall(headersChunk, async header => {
return registerBlock(header)
})
// Confirm the transactions
processTask(blockWorker.OP_CONFIRM, blockIds)
break
case blockWorker.OP_CONFIRM:
// Notify new transactions and blocks
await Promise.all([
util.parallelCall(txsForBroadcast, async tx => {
notifyTx(tx)
}),
util.parallelCall(headersChunk, async header => {
notifyBlock(header)
})
])
// Process complete. Reset the workers
processTask(blockWorker.OP_RESET)
break
case blockWorker.OP_RESET:
const dt = ((Date.now()-t0)/1000).toFixed(1)
const per = ((Date.now()-t0)/headersChunk.length).toFixed(0)
const sBlockRange = `${headersChunk[0].height}-${headersChunk[headersChunk.length-1].height}`
Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`)
// Release the semaphor
await _processBlocksSemaphor.release()
break
}
}
}
/**
* Execute an operation processing a block
* @param {integer} op - operation
* @param {*} args
*/
function processTask(op, args) {
currentOp = op
nbTasksEnqueued = 0
nbTasksCompleted = 0
switch (op) {
case blockWorker.OP_INIT:
for (let i = 0; i < headersChunk.length; i++) {
blockWorkers[i].postMessage({
'op': op,
'header': headersChunk[i]
})
nbTasksEnqueued++
}
break
case blockWorker.OP_PROCESS_OUTPUTS:
case blockWorker.OP_PROCESS_INPUTS:
case blockWorker.OP_RESET:
for (let i = 0; i < headersChunk.length; i++) {
blockWorkers[i].postMessage({'op': op})
nbTasksEnqueued++
}
break
case blockWorker.OP_CONFIRM:
for (let i = 0; i < headersChunk.length; i++) {
blockWorkers[i].postMessage({
'op': op,
'blockId': args[i]
})
nbTasksEnqueued++
}
break
default:
Logger.error(null, 'Tracker : processTask() : Unknown operation')
}
}
/**
* Notify a new transaction
* @param {object} tx - bitcoin transaction
*/
function notifyTx(tx) {
// Real-time client updates for this transaction.
// Any address input or output present in transaction
// is a potential client to notify.
if (notifSock)
notifSock.send(['transaction', JSON.stringify(tx)])
}
/**
* Notify a new block
* @param {string} header - block header
*/
function notifyBlock(header) {
// Notify clients of the block
if (notifSock)
notifSock.send(['block', JSON.stringify(header)])
}
/**
* Store a block in db
* @param {object} header - block header
* @returns {Promise - int} returns the id of the block
*/
async function registerBlock(header) {
const prevBlock = await dbProcessor.getBlockByHash(header.previousblockhash)
const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null
const blockId = await dbProcessor.addBlock({
blockHeight: header.height,
blockHash: header.hash,
blockTime: header.time,
blockParent: prevID
})
Logger.info(`Tracker : Added block ${header.height} (id=${blockId})`)
return blockId
}

34
tracker/mempool-processor.js

@ -11,8 +11,8 @@ const util = require('../lib/util')
const Logger = require('../lib/logger')
const db = require('../lib/db/mysql-db-wrapper')
const network = require('../lib/bitcoin/network')
const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
const keys = require('../keys')[network.key]
const AbstractProcessor = require('./abstract-processor')
const Transaction = require('./transaction')
const TransactionsBundle = require('./transactions-bundle')
@ -20,14 +20,17 @@ const TransactionsBundle = require('./transactions-bundle')
/**
* A class managing a buffer for the mempool
*/
class MempoolProcessor extends AbstractProcessor {
class MempoolProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
super(notifSock)
// RPC client
this.client = new RpcClient()
// ZeroMQ socket for notifications sent to others components
this.notifSock = notifSock
// Mempool buffer
this.mempoolBuffer = new TransactionsBundle()
// ZeroMQ socket for bitcoind Txs messages
@ -221,6 +224,29 @@ class MempoolProcessor extends AbstractProcessor {
}
}
/**
* Notify a new transaction
* @param {object} tx - bitcoin transaction
*/
notifyTx(tx) {
// Real-time client updates for this transaction.
// Any address input or output present in transaction
// is a potential client to notify.
if (this.notifSock)
this.notifSock.send(['transaction', JSON.stringify(tx)])
}
/**
* Notify a new block
* @param {string} header - block header
*/
notifyBlock(header) {
// Notify clients of the block
if (this.notifSock)
this.notifSock.send(['block', JSON.stringify(header)])
}
/**
* Check unconfirmed transactions
* @returns {Promise}
@ -262,7 +288,7 @@ class MempoolProcessor extends AbstractProcessor {
const ntx = unconfirmedTxs.length
const dt = ((Date.now() - t0) / 1000).toFixed(1)
const per = (ntx == 0) ? 0 : ((Date.now() - t0) / ntx).toFixed(0)
Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`)
Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`)
}
/**

2
tracker/transaction.js

@ -406,7 +406,7 @@ class Transaction {
locktime: this.tx.locktime,
})
Logger.info(`Tracker : Storing transaction ${this.txid}`)
Logger.info(`Tracker : Storing transaction ${this.txid}`)
}
}

2
tracker/transactions-bundle.js

@ -96,7 +96,6 @@ class TransactionsBundle {
let indexedOutputs = {}
// Index the transaction outputs
console.time('outputScript2Address')
for (const i in txs) {
const tx = txs[i]
const txid = tx.getId()
@ -115,7 +114,6 @@ class TransactionsBundle {
} catch (e) {}
}
}
console.timeEnd('outputScript2Address')
// Prefilter
const outRes = await db.getUngroupedHDAccountsByAddresses(addresses)

Loading…
Cancel
Save