From 63a62e9ce6d1fdb6266d7a0db816467525238926 Mon Sep 17 00:00:00 2001 From: kenshin-samourai Date: Wed, 7 Apr 2021 14:25:30 +0200 Subject: [PATCH 1/3] refactoring of tracker prior to parallel processing of blocks --- tracker/block.js | 122 ++++++++++++++++++++--------- tracker/blockchain-processor.js | 135 +++++++++++++++++++++----------- tracker/mempool-processor.js | 23 ++++-- tracker/transaction.js | 8 +- tracker/transactions-bundle.js | 109 ++++++++++++-------------- 5 files changed, 244 insertions(+), 153 deletions(-) diff --git a/tracker/block.js b/tracker/block.js index 20c6354..22ff15b 100644 --- a/tracker/block.js +++ b/tracker/block.js @@ -26,44 +26,91 @@ class Block extends TransactionsBundle { super() this.hex = hex this.header = header + + try { + const block = bitcoin.Block.fromHex(hex) + this.transactions = block.transactions + } catch (e) { + Logger.error(e, 'Tracker : Block()') + Logger.error(null, header) + return Promise.reject(e) + } } /** * Register the block and transactions of interest in db * @returns {Promise - object[]} returns an array of transactions to be broadcast */ - async checkBlock() { + async processBlock() { Logger.info('Tracker : Beginning to process new block.') - let block - const txsForBroadcast = [] + const t0 = Date.now() - try { - block = bitcoin.Block.fromHex(this.hex) - this.transactions = block.transactions - } catch (e) { - Logger.error(e, 'Tracker : Block.checkBlock()') - Logger.error(null, this.header) - return Promise.reject(e) - } + const txsForBroadcast = new Map() - const t0 = Date.now() - let ntx = 0 - - // Filter transactions - const filteredTxs = await this.prefilterTransactions() - - // Check filtered transactions - // and broadcast notifications - await util.seriesCall(filteredTxs, async tx => { - const filteredTx = new Transaction(tx) - const txCheck = await filteredTx.checkTransaction() - if (txCheck && txCheck.broadcast) - txsForBroadcast.push(txCheck.tx) + const txsForBroadcast1 = await this.processOutputs() + txsForBroadcast1.map(tx => {txsForBroadcast.set(tx.getId(), tx)}) + + const txsForBroadcast2 = await this.processInputs() + txsForBroadcast2.map(tx => {txsForBroadcast.set(tx.getId(), tx)}) + + const aTxsForBroadcast = [...txsForBroadcast.values()] + + const blockId = await this.registerBlock() + + await this.confirmTransactions(aTxsForBroadcast, blockId) + + // Logs and result returned + const ntx = this.transactions.length + const dt = ((Date.now()-t0)/1000).toFixed(1) + const per = ((Date.now()-t0)/ntx).toFixed(0) + Logger.info(`Tracker : Finished block ${this.header.height}, ${dt}s, ${ntx} tx, ${per}ms/tx`) + + return aTxsForBroadcast + } + + + /** + * Process the transaction outputs + * @returns {Promise - object[]} returns an array of transactions to be broadcast + */ + async processOutputs() { + const txsForBroadcast = new Set() + console.time('prefilterByOutputs') + const filteredTxs = await this.prefilterByOutputs() + console.timeEnd('prefilterByOutputs') + console.time('processOutputs') + await util.parallelCall(filteredTxs, async filteredTx => { + const tx = new Transaction(filteredTx) + await tx.processOutputs() + if (tx.doBroadcast) + txsForBroadcast.add(tx.tx) + }) + console.timeEnd('processOutputs') + return [...txsForBroadcast] + } + + /** + * Process the transaction inputs + * @returns {Promise - object[]} returns an array of transactions to be broadcast + */ + async processInputs() { + const txsForBroadcast = new Set() + const filteredTxs = await this.prefilterByInputs() + await util.parallelCall(filteredTxs, async filteredTx => { + const tx = new Transaction(filteredTx) + await tx.processInputs() + if (tx.doBroadcast) + txsForBroadcast.add(tx.tx) }) + return [...txsForBroadcast] + } - // Retrieve the previous block - // and store the new block into the database + /** + * Store the block in db + * @returns {Promise - int} returns the id of the block + */ + async registerBlock() { const prevBlock = await db.getBlockByHash(this.header.previousblockhash) const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null @@ -76,18 +123,19 @@ class Block extends TransactionsBundle { Logger.info(`Tracker : Added block ${this.header.height} (id=${blockId})`) - // Confirms the transactions - const txids = this.transactions.map(t => t.getId()) - ntx = txids.length - const txidLists = util.splitList(txids, 100) - await util.parallelCall(txidLists, list => db.confirmTransactions(list, blockId)) - - // Logs and result returned - const dt = ((Date.now()-t0)/1000).toFixed(1) - const per = ((Date.now()-t0)/ntx).toFixed(0) - Logger.info(`Tracker : Finished block ${this.header.height}, ${dt}s, ${ntx} tx, ${per}ms/tx`) + return blockId + } - return txsForBroadcast + /** + * Confirm the transactions in db + * @param {Set} txs - set of transactions stored in db + * @param {int} blockId - id of the block + * r@returns {Promise} + */ + async confirmTransactions(txs, blockId) { + const txids = txs.map(t => t.getId()) + const txidLists = util.splitList(txids, 100) + return util.parallelCall(txidLists, list => db.confirmTransactions(list, blockId)) } /** diff --git a/tracker/blockchain-processor.js b/tracker/blockchain-processor.js index 78e0d50..8f729c5 100644 --- a/tracker/blockchain-processor.js +++ b/tracker/blockchain-processor.js @@ -161,17 +161,12 @@ class BlockchainProcessor extends AbstractProcessor { Logger.info(`Tracker : Sync ${blockRange.length} blocks`) - // Process the blocks - return util.seriesCall(blockRange, async height => { - try { - const hash = await this.client.getblockhash(height) - const header = await this.client.getblockheader(hash) - return this.processBlock(header) - } catch(e) { - Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()') - process.exit() - } - }, 'Tracker syncing', true) + try { + return this.processBlockRange(blockRange) + } catch(e) { + Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()') + process.exit() + } } catch(e) { Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()') @@ -259,9 +254,7 @@ class BlockchainProcessor extends AbstractProcessor { await this.rewind(knownHeight) // Process the blocks - return await util.seriesCall(headers, header => { - return this.processBlock(header) - }) + return await this.processBlocks(headers) } catch(e) { Logger.error(e, 'Tracker : BlockchainProcessor.onBlockHash()') @@ -340,44 +333,94 @@ class BlockchainProcessor extends AbstractProcessor { Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`) // Process the blocks - return util.seriesCall(blockRange, async height => { - try { - Logger.info(`Tracker : Rescanning block ${height}`) - const hash = await this.client.getblockhash(height) - const header = await this.client.getblockheader(hash) - return this.processBlock(header) - } catch(e) { - Logger.error(e, 'Tracker : BlockchainProcessor.rescan()') - throw e - } - }, 'Tracker rescan', true) + try { + return this.processBlockRange(blockRange) + } catch(e) { + Logger.error(e, 'Tracker : BlockchainProcessor.rescan()') + throw e + } } /** - * Process a block - * @param {object} header - block header - * @returns {Promise} + * Process a series of blocks + * @param {object[]} headers - array of block headers */ - async processBlock(header) { - try { - // Get raw block hex string from bitcoind - const hex = await this.client.getblock(header.hash, false) - - const block = new Block(hex, header) - - const txsForBroadcast = await block.checkBlock() - - // Send notifications - for (let tx of txsForBroadcast) - this.notifyTx(tx) + async processBlocks(headers) { + const MAX_NB_BLOCKS = 5 + const chunks = util.splitList(headers, MAX_NB_BLOCKS) + + return util.seriesCall(chunks, async chunk => { + const t0 = Date.now() + const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}` + Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}.`) + + const txsForBroadcast = new Map() + // Initialize the blocks and process the transaction outputs + console.time('outputs') + const blocks = await util.parallelCall(chunk, async header => { + // Get raw block hex string from bitcoind + const hex = await this.client.getblock(header.hash, false) + const block = new Block(hex, header) + // Process the transaction outputs + const txs = await block.processOutputs() + txsForBroadcast.set(block.header.hash, txs) + return block + }) + console.timeEnd('outputs') + // Process the transaction inputs + console.time('inputs') + await util.parallelCall(blocks, async block => { + const txs = await block.processInputs() + txsForBroadcast.set( + block.header.hash, + txs.concat(txsForBroadcast.get(block.header.hash)) + ) + }) + console.timeEnd('inputs') + // Sort the blocks by ascending height + blocks.sort((a,b) => a.header.height - b.header.height) + // Register the block and confirm the transactions + console.time('confirm') + await util.seriesCall(blocks, async block => { + const blockId = await block.registerBlock() + const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))] + await block.confirmTransactions(aTxsForBroadcast, blockId) + }) + console.timeEnd('confirm') + + console.time('notify') + await util.parallelCall(blocks, async block => { + const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))] + for (let tx of aTxsForBroadcast) + this.notifyTx(tx) + this.notifyBlock(block.header) + }) + console.timeEnd('notify') - this.notifyBlock(header) + const dt = ((Date.now()-t0)/1000).toFixed(1) + const per = ((Date.now()-t0)/MAX_NB_BLOCKS).toFixed(0) + Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`) + }) + } - } catch(e) { - // The show must go on. - // TODO: further notification that this block did not check out - Logger.error(e, 'Tracker : BlockchainProcessor.processBlock()') - } + /** + * Process a range of blocks + * @param {int[]} heights - a range of block heights + */ + async processBlockRange(heights) { + const MAX_NB_BLOCKS = 5 + const chunks = util.splitList(heights, MAX_NB_BLOCKS) + + return util.seriesCall(chunks, async chunk => { + console.time('headers') + const headers = await util.parallelCall(chunk, async height => { + // Get raw block hex string from bitcoind + const hash = await this.client.getblockhash(height) + return await this.client.getblockheader(hash) + }) + console.timeEnd('headers') + return this.processBlocks(headers) + }) } /** diff --git a/tracker/mempool-processor.js b/tracker/mempool-processor.js index 17f8bf2..cdfb513 100644 --- a/tracker/mempool-processor.js +++ b/tracker/mempool-processor.js @@ -152,14 +152,27 @@ class MempoolProcessor extends AbstractProcessor { let currentMempool = new TransactionsBundle(this.mempoolBuffer.toArray()) this.mempoolBuffer.clear() - const filteredTxs = await currentMempool.prefilterTransactions() + const txsForBroadcast = new Map() - return util.seriesCall(filteredTxs, async filteredTx => { + let filteredTxs = await currentMempool.prefilterByOutputs() + await util.parallelCall(filteredTxs, async filteredTx => { const tx = new Transaction(filteredTx) - const txCheck = await tx.checkTransaction() - if (txCheck && txCheck.broadcast) - this.notifyTx(txCheck.tx) + await tx.processOutputs() + if (tx.doBroadcast) + txsForBroadcast[tx.txid] = tx.tx }) + + filteredTxs = await currentMempool.prefilterByInputs() + await util.parallelCall(filteredTxs, async filteredTx => { + const tx = new Transaction(filteredTx) + await tx.processInputs() + if (tx.doBroadcast) + txsForBroadcast[tx.txid] = tx.tx + }) + + // Send the notifications + for (let tx of txsForBroadcast.values()) + this.notifyTx(tx) } /** diff --git a/tracker/transaction.js b/tracker/transaction.js index ee51450..89ff08e 100644 --- a/tracker/transaction.js +++ b/tracker/transaction.js @@ -47,10 +47,10 @@ class Transaction { async checkTransaction() { try { // Process transaction inputs - await this._processInputs() + await this.processInputs() // Process transaction outputs - await this._processOutputs() + await this.processOutputs() // If this point reached with no errors, // store the fact that this transaction was checked. @@ -73,7 +73,7 @@ class Transaction { * Process transaction inputs * @returns {Promise} */ - async _processInputs() { + async processInputs() { // Array of inputs spent const spends = [] // Store input indices, keyed by `txid-outindex` for easy retrieval @@ -151,7 +151,7 @@ class Transaction { * Process transaction outputs * @returns {Promise} */ - async _processOutputs() { + async processOutputs() { // Store outputs, keyed by address. Values are arrays of outputs const indexedOutputs = {} diff --git a/tracker/transactions-bundle.js b/tracker/transactions-bundle.js index 3eea200..ef1b19b 100644 --- a/tracker/transactions-bundle.js +++ b/tracker/transactions-bundle.js @@ -6,13 +6,9 @@ const _ = require('lodash') const LRU = require('lru-cache') -const bitcoin = require('bitcoinjs-lib') const util = require('../lib/util') const db = require('../lib/db/mysql-db-wrapper') const addrHelper = require('../lib/bitcoin/addresses-helper') -const network = require('../lib/bitcoin/network') -const keys = require('../keys')[network.key] -const activeNet = network.network /** @@ -64,51 +60,47 @@ class TransactionsBundle { /** * Find the transactions of interest + * based on theirs inputs * @returns {object[]} returns an array of transactions objects */ - async prefilterTransactions() { + async prefilterByInputs() { // Process transactions by slices of 5000 transactions const MAX_NB_TXS = 5000 const lists = util.splitList(this.transactions, MAX_NB_TXS) - const results = await util.seriesCall(lists, txs => this._prefilterTransactions(txs)) + const results = await util.parallelCall(lists, txs => this._prefilterByInputs(txs)) return _.flatten(results) } /** - * Find the transactions of interest (internal implementation) - * @params {object[]} transactions - array of transactions objects + * Find the transactions of interest + * based on theirs outputs * @returns {object[]} returns an array of transactions objects */ - async _prefilterTransactions(transactions) { - let inputs = [] - let outputs = [] + async prefilterByOutputs() { + // Process transactions by slices of 5000 transactions + const MAX_NB_TXS = 5000 + const lists = util.splitList(this.transactions, MAX_NB_TXS) + const results = await util.parallelCall(lists, txs => this._prefilterByOutputs(txs)) + return _.flatten(results) + } - // Store indices of txs to be processed + /** + * Find the transactions of interest + * based on theirs outputs (internal implementation) + * @params {object[]} txs - array of transactions objects + * @returns {object[]} returns an array of transactions objects + */ + async _prefilterByOutputs(txs) { + let addresses = [] let filteredIdxTxs = [] - - // Store txs indices, keyed by `txid-outindex`. - // Values are arrays of txs indices (for double spends) - let indexedInputs = {} - - // Store txs indices, keyed by address. - // Values are arrays of txs indices let indexedOutputs = {} - // Stores txs indices, keyed by txids - let indexedTxs = {} - - // - // Prefilter against the outputs - // - // Index the transaction outputs - for (const i in transactions) { - const tx = transactions[i] + console.time('outputScript2Address') + for (const i in txs) { + const tx = txs[i] const txid = tx.getId() - indexedTxs[txid] = i - - // If we already checked this tx if (TransactionsBundle.cache.has(txid)) continue @@ -116,18 +108,17 @@ class TransactionsBundle { try { const script = tx.outs[j].script const address = addrHelper.outputScript2Address(script) - outputs.push(address) - // Index the output + addresses.push(address) if (!indexedOutputs[address]) indexedOutputs[address] = [] indexedOutputs[address].push(i) } catch (e) {} } } + console.timeEnd('outputScript2Address') // Prefilter - const outRes = await db.getUngroupedHDAccountsByAddresses(outputs) - + const outRes = await db.getUngroupedHDAccountsByAddresses(addresses) for (const i in outRes) { const key = outRes[i].addrAddress const idxTxs = indexedOutputs[key] @@ -138,35 +129,36 @@ class TransactionsBundle { } } - // - // Prefilter against the inputs - // + return filteredIdxTxs.map(x => txs[x]) + } + + /** + * Find the transactions of interest + * based on theirs inputs (internal implementation) + * @params {object[]} txs - array of transactions objects + * @returns {object[]} returns an array of transactions objects + */ + async _prefilterByInputs(txs) { + let inputs = [] + let filteredIdxTxs = [] + let indexedInputs = {} - // Index the transaction inputs - for (const i in transactions) { - const tx = transactions[i] + for (const i in txs) { + const tx = txs[i] const txid = tx.getId() - // If we already checked this tx if (TransactionsBundle.cache.has(txid)) continue for (const j in tx.ins) { const spendHash = tx.ins[j].hash const spendTxid = Buffer.from(spendHash).reverse().toString('hex') - // Check if this input consumes an output - // generated by a transaction from this block - if (filteredIdxTxs.indexOf(indexedTxs[spendTxid]) > -1 && filteredIdxTxs.indexOf(i) == -1) { - filteredIdxTxs.push(i) - } else { - const spendIdx = tx.ins[j].index - inputs.push({txid: spendTxid, index: spendIdx}) - // Index the input - const key = spendTxid + '-' + spendIdx - if (!indexedInputs[key]) - indexedInputs[key] = [] - indexedInputs[key].push(i) - } + const spendIdx = tx.ins[j].index + inputs.push({txid: spendTxid, index: spendIdx}) + const key = spendTxid + '-' + spendIdx + if (!indexedInputs[key]) + indexedInputs[key] = [] + indexedInputs[key].push(i) } } @@ -174,7 +166,6 @@ class TransactionsBundle { const lists = util.splitList(inputs, 1000) const results = await util.parallelCall(lists, list => db.getOutputSpends(list)) const inRes = _.flatten(results) - for (const i in inRes) { const key = inRes[i].txnTxid + '-' + inRes[i].outIndex const idxTxs = indexedInputs[key] @@ -185,11 +176,7 @@ class TransactionsBundle { } } - // - // Returns the matching transactions - // - filteredIdxTxs.sort((a, b) => a - b); - return filteredIdxTxs.map(x => transactions[x]) + return filteredIdxTxs.map(x => txs[x]) } } From 46d1debf5fcf567f0127efdf01828d0e42d30d0f Mon Sep 17 00:00:00 2001 From: kenshin-samourai Date: Wed, 7 Apr 2021 14:25:30 +0200 Subject: [PATCH 2/3] clean code --- tracker/blockchain-processor.js | 1 - 1 file changed, 1 deletion(-) diff --git a/tracker/blockchain-processor.js b/tracker/blockchain-processor.js index 8f729c5..b06a7fd 100644 --- a/tracker/blockchain-processor.js +++ b/tracker/blockchain-processor.js @@ -14,7 +14,6 @@ const network = require('../lib/bitcoin/network') const keys = require('../keys')[network.key] const AbstractProcessor = require('./abstract-processor') const Block = require('./block') -const TransactionsBundle = require('./transactions-bundle') /** From 7833af350499bac4172bf341d093282e7e3c36a9 Mon Sep 17 00:00:00 2001 From: kenshin-samourai Date: Wed, 7 Apr 2021 14:10:35 +0200 Subject: [PATCH 3/3] switch to parallel processing of blocks by the tracker --- tracker/abstract-processor.js | 50 ------- tracker/block-worker.js | 173 +++++++++++++++++++++++++ tracker/block.js | 7 +- tracker/blockchain-processor.js | 80 ++---------- tracker/blocks-processor.js | 222 ++++++++++++++++++++++++++++++++ tracker/mempool-processor.js | 34 ++++- tracker/transaction.js | 2 +- tracker/transactions-bundle.js | 2 - 8 files changed, 443 insertions(+), 127 deletions(-) delete mode 100644 tracker/abstract-processor.js create mode 100644 tracker/block-worker.js create mode 100644 tracker/blocks-processor.js diff --git a/tracker/abstract-processor.js b/tracker/abstract-processor.js deleted file mode 100644 index 3f46325..0000000 --- a/tracker/abstract-processor.js +++ /dev/null @@ -1,50 +0,0 @@ -/*! - * tracker/abstract-processor.js - * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved. - */ -'use strict' - -const RpcClient = require('../lib/bitcoind-rpc/rpc-client') - - -/** - * An abstract class for tracker processors - */ -class AbstractProcessor { - - /** - * Constructor - * @param {object} notifSock - ZMQ socket used for notifications - */ - constructor(notifSock) { - // RPC client - this.client = new RpcClient() - // ZeroMQ socket for notifications sent to others components - this.notifSock = notifSock - } - - /** - * Notify a new transaction - * @param {object} tx - bitcoin transaction - */ - notifyTx(tx) { - // Real-time client updates for this transaction. - // Any address input or output present in transaction - // is a potential client to notify. - if (this.notifSock) - this.notifSock.send(['transaction', JSON.stringify(tx)]) - } - - /** - * Notify a new block - * @param {string} header - block header - */ - notifyBlock(header) { - // Notify clients of the block - if (this.notifSock) - this.notifSock.send(['block', JSON.stringify(header)]) - } - -} - -module.exports = AbstractProcessor diff --git a/tracker/block-worker.js b/tracker/block-worker.js new file mode 100644 index 0000000..9601aa8 --- /dev/null +++ b/tracker/block-worker.js @@ -0,0 +1,173 @@ +/*! + * tracker/block-worker.js + * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved. + */ +'use strict' + +const { isMainThread, parentPort } = require('worker_threads') +const network = require('../lib/bitcoin/network') +const keys = require('../keys')[network.key] +const db = require('../lib/db/mysql-db-wrapper') +const RpcClient = require('../lib/bitcoind-rpc/rpc-client') +const Block = require('./block') + + +/** + * STATUS + */ +const IDLE = 0 +module.exports.IDLE = IDLE + +const INITIALIZED = 1 +module.exports.INITIALIZED = INITIALIZED + +const OUTPUTS_PROCESSED = 2 +module.exports.OUTPUTS_PROCESSED = OUTPUTS_PROCESSED + +const INPUTS_PROCESSED = 3 +module.exports.INPUTS_PROCESSED = INPUTS_PROCESSED + +const TXS_CONFIRMED = 4 +module.exports.TXS_CONFIRMED = TXS_CONFIRMED + +/** + * OPS + */ +const OP_INIT = 0 +module.exports.OP_INIT = OP_INIT + +const OP_PROCESS_OUTPUTS = 1 +module.exports.OP_PROCESS_OUTPUTS = OP_PROCESS_OUTPUTS + +const OP_PROCESS_INPUTS = 2 +module.exports.OP_PROCESS_INPUTS = OP_PROCESS_INPUTS + +const OP_CONFIRM = 3 +module.exports.OP_CONFIRM = OP_CONFIRM + +const OP_RESET = 4 +module.exports.OP_RESET = OP_RESET + + + +/** + * Process message received by the worker + * @param {object} msg - message received by the worker + */ +async function processMessage(msg) { + let res = null + let success = true + + try { + switch(msg.op) { + case OP_INIT: + if (status != IDLE) + throw 'Operation not allowed' + res = await initBlock(msg.header) + break + case OP_PROCESS_OUTPUTS: + if (status != INITIALIZED) + throw 'Operation not allowed' + res = await processOutputs() + break + case OP_PROCESS_INPUTS: + if (status != OUTPUTS_PROCESSED) + throw 'Operation not allowed' + res = await processInputs() + break + case OP_CONFIRM: + if (status != INPUTS_PROCESSED) + throw 'Operation not allowed' + res = await confirmTransactions(msg.blockId) + break + case OP_RESET: + res = await reset() + break + default: + throw 'Invalid Operation' + } + } catch (e) { + success = false + res = e + } finally { + parentPort.postMessage({ + 'op': msg.op, + 'status': success, + 'res': res + }) + } +} + +/** + * Initialize the block + * @param {object} header - block header + */ +async function initBlock(header) { + status = INITIALIZED + const hex = await rpcClient.getblock(header.hash, false) + block = new Block(hex, header) + return true +} + +/** + * Process the transactions outputs + */ +async function processOutputs() { + status = OUTPUTS_PROCESSED + txsForBroadcast = await block.processOutputs() + return true +} + +/** + * Process the transactions inputs + */ +async function processInputs() { + status = INPUTS_PROCESSED + const txs = await block.processInputs() + txsForBroadcast = txsForBroadcast.concat(txs) + return true +} + +/** + * Confirm the transactions + * @param {integer} blockId - id of the block in db + */ +async function confirmTransactions(blockId) { + status = TXS_CONFIRMED + const aTxsForBroadcast = [...new Set(txsForBroadcast)] + await block.confirmTransactions(aTxsForBroadcast, blockId) + return aTxsForBroadcast +} + +/** + * Reset + */ +function reset() { + status = IDLE + block = null + txsForBroadcast = [] + return true +} + + +/** + * MAIN + */ +const rpcClient = new RpcClient() +let block = null +let txsForBroadcast = [] +let status = IDLE + +if (!isMainThread) { + db.connect({ + connectionLimit: keys.db.connectionLimitTracker, + acquireTimeout: keys.db.acquireTimeout, + host: keys.db.host, + user: keys.db.user, + password: keys.db.pass, + database: keys.db.database + }) + + reset() + parentPort.on('message', processMessage) +} diff --git a/tracker/block.js b/tracker/block.js index 22ff15b..a1fbf2a 100644 --- a/tracker/block.js +++ b/tracker/block.js @@ -39,6 +39,9 @@ class Block extends TransactionsBundle { /** * Register the block and transactions of interest in db + * @dev This method isn't used anymore. + * It has been replaced by a parallel processing of blocks. + * (see blocks-processor and block-worker) * @returns {Promise - object[]} returns an array of transactions to be broadcast */ async processBlock() { @@ -76,17 +79,13 @@ class Block extends TransactionsBundle { */ async processOutputs() { const txsForBroadcast = new Set() - console.time('prefilterByOutputs') const filteredTxs = await this.prefilterByOutputs() - console.timeEnd('prefilterByOutputs') - console.time('processOutputs') await util.parallelCall(filteredTxs, async filteredTx => { const tx = new Transaction(filteredTx) await tx.processOutputs() if (tx.doBroadcast) txsForBroadcast.add(tx.tx) }) - console.timeEnd('processOutputs') return [...txsForBroadcast] } diff --git a/tracker/blockchain-processor.js b/tracker/blockchain-processor.js index b06a7fd..695685d 100644 --- a/tracker/blockchain-processor.js +++ b/tracker/blockchain-processor.js @@ -11,28 +11,34 @@ const util = require('../lib/util') const Logger = require('../lib/logger') const db = require('../lib/db/mysql-db-wrapper') const network = require('../lib/bitcoin/network') +const RpcClient = require('../lib/bitcoind-rpc/rpc-client') const keys = require('../keys')[network.key] -const AbstractProcessor = require('./abstract-processor') const Block = require('./block') +const blocksProcessor = require('./blocks-processor') /** * A class allowing to process the blockchain */ -class BlockchainProcessor extends AbstractProcessor { +class BlockchainProcessor { /** * Constructor * @param {object} notifSock - ZMQ socket used for notifications */ constructor(notifSock) { - super(notifSock) + // RPC client + this.client = new RpcClient() // ZeroMQ socket for bitcoind blocks messages this.blkSock = null // Initialize a semaphor protecting the onBlockHash() method this._onBlockHashSemaphor = new Sema(1, { capacity: 50 }) + // Array of worker threads used for parallel processing of blocks + this.blockWorkers = [] // Flag tracking Initial Block Download Mode this.isIBD = true + // Initialize the blocks processor + blocksProcessor.init(notifSock) } /** @@ -155,7 +161,6 @@ class BlockchainProcessor extends AbstractProcessor { if (highest == null) return null if (daemonNbBlocks == highest.blockHeight) return null - // Compute blocks range to be processed const blockRange = _.range(highest.blockHeight, daemonNbBlocks + 1) Logger.info(`Tracker : Sync ${blockRange.length} blocks`) @@ -307,8 +312,6 @@ class BlockchainProcessor extends AbstractProcessor { await db.unconfirmTransactions(txids) } - // TODO: get accounts and notify of deletion ? - await db.deleteBlocksAfterHeight(height) } @@ -331,7 +334,6 @@ class BlockchainProcessor extends AbstractProcessor { Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`) - // Process the blocks try { return this.processBlockRange(blockRange) } catch(e) { @@ -341,64 +343,14 @@ class BlockchainProcessor extends AbstractProcessor { } /** - * Process a series of blocks + * Process a list of blocks * @param {object[]} headers - array of block headers */ async processBlocks(headers) { - const MAX_NB_BLOCKS = 5 - const chunks = util.splitList(headers, MAX_NB_BLOCKS) - - return util.seriesCall(chunks, async chunk => { - const t0 = Date.now() - const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}` - Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}.`) - - const txsForBroadcast = new Map() - // Initialize the blocks and process the transaction outputs - console.time('outputs') - const blocks = await util.parallelCall(chunk, async header => { - // Get raw block hex string from bitcoind - const hex = await this.client.getblock(header.hash, false) - const block = new Block(hex, header) - // Process the transaction outputs - const txs = await block.processOutputs() - txsForBroadcast.set(block.header.hash, txs) - return block - }) - console.timeEnd('outputs') - // Process the transaction inputs - console.time('inputs') - await util.parallelCall(blocks, async block => { - const txs = await block.processInputs() - txsForBroadcast.set( - block.header.hash, - txs.concat(txsForBroadcast.get(block.header.hash)) - ) - }) - console.timeEnd('inputs') - // Sort the blocks by ascending height - blocks.sort((a,b) => a.header.height - b.header.height) - // Register the block and confirm the transactions - console.time('confirm') - await util.seriesCall(blocks, async block => { - const blockId = await block.registerBlock() - const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))] - await block.confirmTransactions(aTxsForBroadcast, blockId) - }) - console.timeEnd('confirm') - - console.time('notify') - await util.parallelCall(blocks, async block => { - const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))] - for (let tx of aTxsForBroadcast) - this.notifyTx(tx) - this.notifyBlock(block.header) - }) - console.timeEnd('notify') + const chunks = util.splitList(headers, blocksProcessor.nbWorkers) - const dt = ((Date.now()-t0)/1000).toFixed(1) - const per = ((Date.now()-t0)/MAX_NB_BLOCKS).toFixed(0) - Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`) + await util.seriesCall(chunks, async chunk => { + return blocksProcessor.processChunk(chunk) }) } @@ -407,17 +359,13 @@ class BlockchainProcessor extends AbstractProcessor { * @param {int[]} heights - a range of block heights */ async processBlockRange(heights) { - const MAX_NB_BLOCKS = 5 - const chunks = util.splitList(heights, MAX_NB_BLOCKS) + const chunks = util.splitList(heights, blocksProcessor.nbWorkers) return util.seriesCall(chunks, async chunk => { - console.time('headers') const headers = await util.parallelCall(chunk, async height => { - // Get raw block hex string from bitcoind const hash = await this.client.getblockhash(height) return await this.client.getblockheader(hash) }) - console.timeEnd('headers') return this.processBlocks(headers) }) } diff --git a/tracker/blocks-processor.js b/tracker/blocks-processor.js new file mode 100644 index 0000000..c5c9f99 --- /dev/null +++ b/tracker/blocks-processor.js @@ -0,0 +1,222 @@ +/*! + * tracker/blocks-processor.js + * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved. + */ +'use strict' + +const os = require('os') +const Sema = require('async-sema') +const { Worker } = require('worker_threads') +const Logger = require('../lib/logger') +const util = require('../lib/util') +const dbProcessor = require('../lib/db/mysql-db-wrapper') +const blockWorker = require('./block-worker') + + +let notifSock = null +let blockWorkers = [] +let headersChunk = [] +let txsForBroadcast = [] +let t0 = null +let currentOp = null +let nbTasksEnqueued = 0 +let nbTasksCompleted = 0 + +// Semaphor protecting the processBloks() method +const _processBlocksSemaphor = new Sema(1) + +// Number of worker threads processing the blocks in parallel +const nbWorkers = os.cpus().length //- 1 +module.exports.nbWorkers = nbWorkers + + +/** + * Initialize the processor + * @param {object} notifSock - ZMQ socket used for notifications + */ +function init(notifSock) { + notifSock = notifSock + + for (let i = 0; i < nbWorkers; i++) { + const worker = new Worker( + `${__dirname}/block-worker.js`, + { workerData: null } + ) + worker.on('error', processWorkerError) + worker.on('message', processWorkerMessage) + blockWorkers.push(worker) + } +} +module.exports.init = init + +/** + * Process a chunk of block headers + * @param {object[]} chunk - array of block headers + */ +async function processChunk(chunk) { + // Acquire the semaphor (wait for previous chunk) + await _processBlocksSemaphor.acquire() + + t0 = Date.now() + const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}` + Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}`) + + // Process the chunk + chunk.sort((a,b) => a.height - b.height) + headersChunk = chunk + txsForBroadcast = [] + processTask(blockWorker.OP_INIT) +} +module.exports.processChunk = processChunk + +/** + * Process an error returned by a worker thread + * @param {object} e - error + */ +async function processWorkerError(e) { + return processWorkerMessage({ + 'op': currentOp, + 'status': false, + 'res': e + }) +} + +/** + * Process a message returned by a worker thread + * @param {object} msg - message sent by the worker thread + */ +async function processWorkerMessage(msg) { + nbTasksCompleted++ + + if (!msg.status) { + Logger.error(msg.res, 'Tracker : processWorkerMessage()') + } else if (msg.op == blockWorker.OP_CONFIRM) { + txsForBroadcast = txsForBroadcast.concat(msg.res) + } + + if (nbTasksCompleted == nbTasksEnqueued) { + switch (msg.op) { + case blockWorker.OP_INIT: + // Process the transaction outputs + processTask(blockWorker.OP_PROCESS_OUTPUTS) + break + case blockWorker.OP_PROCESS_OUTPUTS: + // Process the transaction inputs + processTask(blockWorker.OP_PROCESS_INPUTS) + break + case blockWorker.OP_PROCESS_INPUTS: + // Store the blocks in db and get their id + const blockIds = await util.seriesCall(headersChunk, async header => { + return registerBlock(header) + }) + // Confirm the transactions + processTask(blockWorker.OP_CONFIRM, blockIds) + break + case blockWorker.OP_CONFIRM: + // Notify new transactions and blocks + await Promise.all([ + util.parallelCall(txsForBroadcast, async tx => { + notifyTx(tx) + }), + util.parallelCall(headersChunk, async header => { + notifyBlock(header) + }) + ]) + // Process complete. Reset the workers + processTask(blockWorker.OP_RESET) + break + case blockWorker.OP_RESET: + const dt = ((Date.now()-t0)/1000).toFixed(1) + const per = ((Date.now()-t0)/headersChunk.length).toFixed(0) + const sBlockRange = `${headersChunk[0].height}-${headersChunk[headersChunk.length-1].height}` + Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`) + // Release the semaphor + await _processBlocksSemaphor.release() + break + } + } +} + +/** + * Execute an operation processing a block + * @param {integer} op - operation + * @param {*} args + */ +function processTask(op, args) { + currentOp = op + nbTasksEnqueued = 0 + nbTasksCompleted = 0 + + switch (op) { + case blockWorker.OP_INIT: + for (let i = 0; i < headersChunk.length; i++) { + blockWorkers[i].postMessage({ + 'op': op, + 'header': headersChunk[i] + }) + nbTasksEnqueued++ + } + break + case blockWorker.OP_PROCESS_OUTPUTS: + case blockWorker.OP_PROCESS_INPUTS: + case blockWorker.OP_RESET: + for (let i = 0; i < headersChunk.length; i++) { + blockWorkers[i].postMessage({'op': op}) + nbTasksEnqueued++ + } + break + case blockWorker.OP_CONFIRM: + for (let i = 0; i < headersChunk.length; i++) { + blockWorkers[i].postMessage({ + 'op': op, + 'blockId': args[i] + }) + nbTasksEnqueued++ + } + break + default: + Logger.error(null, 'Tracker : processTask() : Unknown operation') + } +} + +/** + * Notify a new transaction + * @param {object} tx - bitcoin transaction + */ +function notifyTx(tx) { + // Real-time client updates for this transaction. + // Any address input or output present in transaction + // is a potential client to notify. + if (notifSock) + notifSock.send(['transaction', JSON.stringify(tx)]) +} + +/** + * Notify a new block + * @param {string} header - block header + */ +function notifyBlock(header) { + // Notify clients of the block + if (notifSock) + notifSock.send(['block', JSON.stringify(header)]) +} + +/** + * Store a block in db + * @param {object} header - block header + * @returns {Promise - int} returns the id of the block + */ +async function registerBlock(header) { + const prevBlock = await dbProcessor.getBlockByHash(header.previousblockhash) + const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null + + const blockId = await dbProcessor.addBlock({ + blockHeight: header.height, + blockHash: header.hash, + blockTime: header.time, + blockParent: prevID + }) + + Logger.info(`Tracker : Added block ${header.height} (id=${blockId})`) + return blockId +} diff --git a/tracker/mempool-processor.js b/tracker/mempool-processor.js index cdfb513..4420482 100644 --- a/tracker/mempool-processor.js +++ b/tracker/mempool-processor.js @@ -11,8 +11,8 @@ const util = require('../lib/util') const Logger = require('../lib/logger') const db = require('../lib/db/mysql-db-wrapper') const network = require('../lib/bitcoin/network') +const RpcClient = require('../lib/bitcoind-rpc/rpc-client') const keys = require('../keys')[network.key] -const AbstractProcessor = require('./abstract-processor') const Transaction = require('./transaction') const TransactionsBundle = require('./transactions-bundle') @@ -20,14 +20,17 @@ const TransactionsBundle = require('./transactions-bundle') /** * A class managing a buffer for the mempool */ -class MempoolProcessor extends AbstractProcessor { +class MempoolProcessor { /** * Constructor * @param {object} notifSock - ZMQ socket used for notifications */ constructor(notifSock) { - super(notifSock) + // RPC client + this.client = new RpcClient() + // ZeroMQ socket for notifications sent to others components + this.notifSock = notifSock // Mempool buffer this.mempoolBuffer = new TransactionsBundle() // ZeroMQ socket for bitcoind Txs messages @@ -221,6 +224,29 @@ class MempoolProcessor extends AbstractProcessor { } } + /** + * Notify a new transaction + * @param {object} tx - bitcoin transaction + */ + notifyTx(tx) { + // Real-time client updates for this transaction. + // Any address input or output present in transaction + // is a potential client to notify. + if (this.notifSock) + this.notifSock.send(['transaction', JSON.stringify(tx)]) + } + + /** + * Notify a new block + * @param {string} header - block header + */ + notifyBlock(header) { + // Notify clients of the block + if (this.notifSock) + this.notifSock.send(['block', JSON.stringify(header)]) + } + + /** * Check unconfirmed transactions * @returns {Promise} @@ -262,7 +288,7 @@ class MempoolProcessor extends AbstractProcessor { const ntx = unconfirmedTxs.length const dt = ((Date.now() - t0) / 1000).toFixed(1) const per = (ntx == 0) ? 0 : ((Date.now() - t0) / ntx).toFixed(0) - Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`) + Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`) } /** diff --git a/tracker/transaction.js b/tracker/transaction.js index 89ff08e..037e774 100644 --- a/tracker/transaction.js +++ b/tracker/transaction.js @@ -406,7 +406,7 @@ class Transaction { locktime: this.tx.locktime, }) - Logger.info(`Tracker : Storing transaction ${this.txid}`) + Logger.info(`Tracker : Storing transaction ${this.txid}`) } } diff --git a/tracker/transactions-bundle.js b/tracker/transactions-bundle.js index ef1b19b..a769e44 100644 --- a/tracker/transactions-bundle.js +++ b/tracker/transactions-bundle.js @@ -96,7 +96,6 @@ class TransactionsBundle { let indexedOutputs = {} // Index the transaction outputs - console.time('outputScript2Address') for (const i in txs) { const tx = txs[i] const txid = tx.getId() @@ -115,7 +114,6 @@ class TransactionsBundle { } catch (e) {} } } - console.timeEnd('outputScript2Address') // Prefilter const outRes = await db.getUngroupedHDAccountsByAddresses(addresses)