diff --git a/tracker/abstract-processor.js b/tracker/abstract-processor.js deleted file mode 100644 index 3f46325..0000000 --- a/tracker/abstract-processor.js +++ /dev/null @@ -1,50 +0,0 @@ -/*! - * tracker/abstract-processor.js - * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved. - */ -'use strict' - -const RpcClient = require('../lib/bitcoind-rpc/rpc-client') - - -/** - * An abstract class for tracker processors - */ -class AbstractProcessor { - - /** - * Constructor - * @param {object} notifSock - ZMQ socket used for notifications - */ - constructor(notifSock) { - // RPC client - this.client = new RpcClient() - // ZeroMQ socket for notifications sent to others components - this.notifSock = notifSock - } - - /** - * Notify a new transaction - * @param {object} tx - bitcoin transaction - */ - notifyTx(tx) { - // Real-time client updates for this transaction. - // Any address input or output present in transaction - // is a potential client to notify. - if (this.notifSock) - this.notifSock.send(['transaction', JSON.stringify(tx)]) - } - - /** - * Notify a new block - * @param {string} header - block header - */ - notifyBlock(header) { - // Notify clients of the block - if (this.notifSock) - this.notifSock.send(['block', JSON.stringify(header)]) - } - -} - -module.exports = AbstractProcessor diff --git a/tracker/block-worker.js b/tracker/block-worker.js new file mode 100644 index 0000000..9601aa8 --- /dev/null +++ b/tracker/block-worker.js @@ -0,0 +1,173 @@ +/*! + * tracker/block-worker.js + * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved. + */ +'use strict' + +const { isMainThread, parentPort } = require('worker_threads') +const network = require('../lib/bitcoin/network') +const keys = require('../keys')[network.key] +const db = require('../lib/db/mysql-db-wrapper') +const RpcClient = require('../lib/bitcoind-rpc/rpc-client') +const Block = require('./block') + + +/** + * STATUS + */ +const IDLE = 0 +module.exports.IDLE = IDLE + +const INITIALIZED = 1 +module.exports.INITIALIZED = INITIALIZED + +const OUTPUTS_PROCESSED = 2 +module.exports.OUTPUTS_PROCESSED = OUTPUTS_PROCESSED + +const INPUTS_PROCESSED = 3 +module.exports.INPUTS_PROCESSED = INPUTS_PROCESSED + +const TXS_CONFIRMED = 4 +module.exports.TXS_CONFIRMED = TXS_CONFIRMED + +/** + * OPS + */ +const OP_INIT = 0 +module.exports.OP_INIT = OP_INIT + +const OP_PROCESS_OUTPUTS = 1 +module.exports.OP_PROCESS_OUTPUTS = OP_PROCESS_OUTPUTS + +const OP_PROCESS_INPUTS = 2 +module.exports.OP_PROCESS_INPUTS = OP_PROCESS_INPUTS + +const OP_CONFIRM = 3 +module.exports.OP_CONFIRM = OP_CONFIRM + +const OP_RESET = 4 +module.exports.OP_RESET = OP_RESET + + + +/** + * Process message received by the worker + * @param {object} msg - message received by the worker + */ +async function processMessage(msg) { + let res = null + let success = true + + try { + switch(msg.op) { + case OP_INIT: + if (status != IDLE) + throw 'Operation not allowed' + res = await initBlock(msg.header) + break + case OP_PROCESS_OUTPUTS: + if (status != INITIALIZED) + throw 'Operation not allowed' + res = await processOutputs() + break + case OP_PROCESS_INPUTS: + if (status != OUTPUTS_PROCESSED) + throw 'Operation not allowed' + res = await processInputs() + break + case OP_CONFIRM: + if (status != INPUTS_PROCESSED) + throw 'Operation not allowed' + res = await confirmTransactions(msg.blockId) + break + case OP_RESET: + res = await reset() + break + default: + throw 'Invalid Operation' + } + } catch (e) { + success = false + res = e + } finally { + parentPort.postMessage({ + 'op': msg.op, + 'status': success, + 'res': res + }) + } +} + +/** + * Initialize the block + * @param {object} header - block header + */ +async function initBlock(header) { + status = INITIALIZED + const hex = await rpcClient.getblock(header.hash, false) + block = new Block(hex, header) + return true +} + +/** + * Process the transactions outputs + */ +async function processOutputs() { + status = OUTPUTS_PROCESSED + txsForBroadcast = await block.processOutputs() + return true +} + +/** + * Process the transactions inputs + */ +async function processInputs() { + status = INPUTS_PROCESSED + const txs = await block.processInputs() + txsForBroadcast = txsForBroadcast.concat(txs) + return true +} + +/** + * Confirm the transactions + * @param {integer} blockId - id of the block in db + */ +async function confirmTransactions(blockId) { + status = TXS_CONFIRMED + const aTxsForBroadcast = [...new Set(txsForBroadcast)] + await block.confirmTransactions(aTxsForBroadcast, blockId) + return aTxsForBroadcast +} + +/** + * Reset + */ +function reset() { + status = IDLE + block = null + txsForBroadcast = [] + return true +} + + +/** + * MAIN + */ +const rpcClient = new RpcClient() +let block = null +let txsForBroadcast = [] +let status = IDLE + +if (!isMainThread) { + db.connect({ + connectionLimit: keys.db.connectionLimitTracker, + acquireTimeout: keys.db.acquireTimeout, + host: keys.db.host, + user: keys.db.user, + password: keys.db.pass, + database: keys.db.database + }) + + reset() + parentPort.on('message', processMessage) +} diff --git a/tracker/block.js b/tracker/block.js index 22ff15b..a1fbf2a 100644 --- a/tracker/block.js +++ b/tracker/block.js @@ -39,6 +39,9 @@ class Block extends TransactionsBundle { /** * Register the block and transactions of interest in db + * @dev This method isn't used anymore. + * It has been replaced by a parallel processing of blocks. + * (see blocks-processor and block-worker) * @returns {Promise - object[]} returns an array of transactions to be broadcast */ async processBlock() { @@ -76,17 +79,13 @@ class Block extends TransactionsBundle { */ async processOutputs() { const txsForBroadcast = new Set() - console.time('prefilterByOutputs') const filteredTxs = await this.prefilterByOutputs() - console.timeEnd('prefilterByOutputs') - console.time('processOutputs') await util.parallelCall(filteredTxs, async filteredTx => { const tx = new Transaction(filteredTx) await tx.processOutputs() if (tx.doBroadcast) txsForBroadcast.add(tx.tx) }) - console.timeEnd('processOutputs') return [...txsForBroadcast] } diff --git a/tracker/blockchain-processor.js b/tracker/blockchain-processor.js index b06a7fd..695685d 100644 --- a/tracker/blockchain-processor.js +++ b/tracker/blockchain-processor.js @@ -11,28 +11,34 @@ const util = require('../lib/util') const Logger = require('../lib/logger') const db = require('../lib/db/mysql-db-wrapper') const network = require('../lib/bitcoin/network') +const RpcClient = require('../lib/bitcoind-rpc/rpc-client') const keys = require('../keys')[network.key] -const AbstractProcessor = require('./abstract-processor') const Block = require('./block') +const blocksProcessor = require('./blocks-processor') /** * A class allowing to process the blockchain */ -class BlockchainProcessor extends AbstractProcessor { +class BlockchainProcessor { /** * Constructor * @param {object} notifSock - ZMQ socket used for notifications */ constructor(notifSock) { - super(notifSock) + // RPC client + this.client = new RpcClient() // ZeroMQ socket for bitcoind blocks messages this.blkSock = null // Initialize a semaphor protecting the onBlockHash() method this._onBlockHashSemaphor = new Sema(1, { capacity: 50 }) + // Array of worker threads used for parallel processing of blocks + this.blockWorkers = [] // Flag tracking Initial Block Download Mode this.isIBD = true + // Initialize the blocks processor + blocksProcessor.init(notifSock) } /** @@ -155,7 +161,6 @@ class BlockchainProcessor extends AbstractProcessor { if (highest == null) return null if (daemonNbBlocks == highest.blockHeight) return null - // Compute blocks range to be processed const blockRange = _.range(highest.blockHeight, daemonNbBlocks + 1) Logger.info(`Tracker : Sync ${blockRange.length} blocks`) @@ -307,8 +312,6 @@ class BlockchainProcessor extends AbstractProcessor { await db.unconfirmTransactions(txids) } - // TODO: get accounts and notify of deletion ? - await db.deleteBlocksAfterHeight(height) } @@ -331,7 +334,6 @@ class BlockchainProcessor extends AbstractProcessor { Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`) - // Process the blocks try { return this.processBlockRange(blockRange) } catch(e) { @@ -341,64 +343,14 @@ class BlockchainProcessor extends AbstractProcessor { } /** - * Process a series of blocks + * Process a list of blocks * @param {object[]} headers - array of block headers */ async processBlocks(headers) { - const MAX_NB_BLOCKS = 5 - const chunks = util.splitList(headers, MAX_NB_BLOCKS) - - return util.seriesCall(chunks, async chunk => { - const t0 = Date.now() - const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}` - Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}.`) - - const txsForBroadcast = new Map() - // Initialize the blocks and process the transaction outputs - console.time('outputs') - const blocks = await util.parallelCall(chunk, async header => { - // Get raw block hex string from bitcoind - const hex = await this.client.getblock(header.hash, false) - const block = new Block(hex, header) - // Process the transaction outputs - const txs = await block.processOutputs() - txsForBroadcast.set(block.header.hash, txs) - return block - }) - console.timeEnd('outputs') - // Process the transaction inputs - console.time('inputs') - await util.parallelCall(blocks, async block => { - const txs = await block.processInputs() - txsForBroadcast.set( - block.header.hash, - txs.concat(txsForBroadcast.get(block.header.hash)) - ) - }) - console.timeEnd('inputs') - // Sort the blocks by ascending height - blocks.sort((a,b) => a.header.height - b.header.height) - // Register the block and confirm the transactions - console.time('confirm') - await util.seriesCall(blocks, async block => { - const blockId = await block.registerBlock() - const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))] - await block.confirmTransactions(aTxsForBroadcast, blockId) - }) - console.timeEnd('confirm') - - console.time('notify') - await util.parallelCall(blocks, async block => { - const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))] - for (let tx of aTxsForBroadcast) - this.notifyTx(tx) - this.notifyBlock(block.header) - }) - console.timeEnd('notify') + const chunks = util.splitList(headers, blocksProcessor.nbWorkers) - const dt = ((Date.now()-t0)/1000).toFixed(1) - const per = ((Date.now()-t0)/MAX_NB_BLOCKS).toFixed(0) - Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`) + await util.seriesCall(chunks, async chunk => { + return blocksProcessor.processChunk(chunk) }) } @@ -407,17 +359,13 @@ class BlockchainProcessor extends AbstractProcessor { * @param {int[]} heights - a range of block heights */ async processBlockRange(heights) { - const MAX_NB_BLOCKS = 5 - const chunks = util.splitList(heights, MAX_NB_BLOCKS) + const chunks = util.splitList(heights, blocksProcessor.nbWorkers) return util.seriesCall(chunks, async chunk => { - console.time('headers') const headers = await util.parallelCall(chunk, async height => { - // Get raw block hex string from bitcoind const hash = await this.client.getblockhash(height) return await this.client.getblockheader(hash) }) - console.timeEnd('headers') return this.processBlocks(headers) }) } diff --git a/tracker/blocks-processor.js b/tracker/blocks-processor.js new file mode 100644 index 0000000..c5c9f99 --- /dev/null +++ b/tracker/blocks-processor.js @@ -0,0 +1,222 @@ +/*! + * tracker/blocks-processor.js + * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved. + */ +'use strict' + +const os = require('os') +const Sema = require('async-sema') +const { Worker } = require('worker_threads') +const Logger = require('../lib/logger') +const util = require('../lib/util') +const dbProcessor = require('../lib/db/mysql-db-wrapper') +const blockWorker = require('./block-worker') + + +let notifSock = null +let blockWorkers = [] +let headersChunk = [] +let txsForBroadcast = [] +let t0 = null +let currentOp = null +let nbTasksEnqueued = 0 +let nbTasksCompleted = 0 + +// Semaphor protecting the processBloks() method +const _processBlocksSemaphor = new Sema(1) + +// Number of worker threads processing the blocks in parallel +const nbWorkers = os.cpus().length //- 1 +module.exports.nbWorkers = nbWorkers + + +/** + * Initialize the processor + * @param {object} notifSock - ZMQ socket used for notifications + */ +function init(notifSock) { + notifSock = notifSock + + for (let i = 0; i < nbWorkers; i++) { + const worker = new Worker( + `${__dirname}/block-worker.js`, + { workerData: null } + ) + worker.on('error', processWorkerError) + worker.on('message', processWorkerMessage) + blockWorkers.push(worker) + } +} +module.exports.init = init + +/** + * Process a chunk of block headers + * @param {object[]} chunk - array of block headers + */ +async function processChunk(chunk) { + // Acquire the semaphor (wait for previous chunk) + await _processBlocksSemaphor.acquire() + + t0 = Date.now() + const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}` + Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}`) + + // Process the chunk + chunk.sort((a,b) => a.height - b.height) + headersChunk = chunk + txsForBroadcast = [] + processTask(blockWorker.OP_INIT) +} +module.exports.processChunk = processChunk + +/** + * Process an error returned by a worker thread + * @param {object} e - error + */ +async function processWorkerError(e) { + return processWorkerMessage({ + 'op': currentOp, + 'status': false, + 'res': e + }) +} + +/** + * Process a message returned by a worker thread + * @param {object} msg - message sent by the worker thread + */ +async function processWorkerMessage(msg) { + nbTasksCompleted++ + + if (!msg.status) { + Logger.error(msg.res, 'Tracker : processWorkerMessage()') + } else if (msg.op == blockWorker.OP_CONFIRM) { + txsForBroadcast = txsForBroadcast.concat(msg.res) + } + + if (nbTasksCompleted == nbTasksEnqueued) { + switch (msg.op) { + case blockWorker.OP_INIT: + // Process the transaction outputs + processTask(blockWorker.OP_PROCESS_OUTPUTS) + break + case blockWorker.OP_PROCESS_OUTPUTS: + // Process the transaction inputs + processTask(blockWorker.OP_PROCESS_INPUTS) + break + case blockWorker.OP_PROCESS_INPUTS: + // Store the blocks in db and get their id + const blockIds = await util.seriesCall(headersChunk, async header => { + return registerBlock(header) + }) + // Confirm the transactions + processTask(blockWorker.OP_CONFIRM, blockIds) + break + case blockWorker.OP_CONFIRM: + // Notify new transactions and blocks + await Promise.all([ + util.parallelCall(txsForBroadcast, async tx => { + notifyTx(tx) + }), + util.parallelCall(headersChunk, async header => { + notifyBlock(header) + }) + ]) + // Process complete. Reset the workers + processTask(blockWorker.OP_RESET) + break + case blockWorker.OP_RESET: + const dt = ((Date.now()-t0)/1000).toFixed(1) + const per = ((Date.now()-t0)/headersChunk.length).toFixed(0) + const sBlockRange = `${headersChunk[0].height}-${headersChunk[headersChunk.length-1].height}` + Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`) + // Release the semaphor + await _processBlocksSemaphor.release() + break + } + } +} + +/** + * Execute an operation processing a block + * @param {integer} op - operation + * @param {*} args + */ +function processTask(op, args) { + currentOp = op + nbTasksEnqueued = 0 + nbTasksCompleted = 0 + + switch (op) { + case blockWorker.OP_INIT: + for (let i = 0; i < headersChunk.length; i++) { + blockWorkers[i].postMessage({ + 'op': op, + 'header': headersChunk[i] + }) + nbTasksEnqueued++ + } + break + case blockWorker.OP_PROCESS_OUTPUTS: + case blockWorker.OP_PROCESS_INPUTS: + case blockWorker.OP_RESET: + for (let i = 0; i < headersChunk.length; i++) { + blockWorkers[i].postMessage({'op': op}) + nbTasksEnqueued++ + } + break + case blockWorker.OP_CONFIRM: + for (let i = 0; i < headersChunk.length; i++) { + blockWorkers[i].postMessage({ + 'op': op, + 'blockId': args[i] + }) + nbTasksEnqueued++ + } + break + default: + Logger.error(null, 'Tracker : processTask() : Unknown operation') + } +} + +/** + * Notify a new transaction + * @param {object} tx - bitcoin transaction + */ +function notifyTx(tx) { + // Real-time client updates for this transaction. + // Any address input or output present in transaction + // is a potential client to notify. + if (notifSock) + notifSock.send(['transaction', JSON.stringify(tx)]) +} + +/** + * Notify a new block + * @param {string} header - block header + */ +function notifyBlock(header) { + // Notify clients of the block + if (notifSock) + notifSock.send(['block', JSON.stringify(header)]) +} + +/** + * Store a block in db + * @param {object} header - block header + * @returns {Promise - int} returns the id of the block + */ +async function registerBlock(header) { + const prevBlock = await dbProcessor.getBlockByHash(header.previousblockhash) + const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null + + const blockId = await dbProcessor.addBlock({ + blockHeight: header.height, + blockHash: header.hash, + blockTime: header.time, + blockParent: prevID + }) + + Logger.info(`Tracker : Added block ${header.height} (id=${blockId})`) + return blockId +} diff --git a/tracker/mempool-processor.js b/tracker/mempool-processor.js index cdfb513..4420482 100644 --- a/tracker/mempool-processor.js +++ b/tracker/mempool-processor.js @@ -11,8 +11,8 @@ const util = require('../lib/util') const Logger = require('../lib/logger') const db = require('../lib/db/mysql-db-wrapper') const network = require('../lib/bitcoin/network') +const RpcClient = require('../lib/bitcoind-rpc/rpc-client') const keys = require('../keys')[network.key] -const AbstractProcessor = require('./abstract-processor') const Transaction = require('./transaction') const TransactionsBundle = require('./transactions-bundle') @@ -20,14 +20,17 @@ const TransactionsBundle = require('./transactions-bundle') /** * A class managing a buffer for the mempool */ -class MempoolProcessor extends AbstractProcessor { +class MempoolProcessor { /** * Constructor * @param {object} notifSock - ZMQ socket used for notifications */ constructor(notifSock) { - super(notifSock) + // RPC client + this.client = new RpcClient() + // ZeroMQ socket for notifications sent to others components + this.notifSock = notifSock // Mempool buffer this.mempoolBuffer = new TransactionsBundle() // ZeroMQ socket for bitcoind Txs messages @@ -221,6 +224,29 @@ class MempoolProcessor extends AbstractProcessor { } } + /** + * Notify a new transaction + * @param {object} tx - bitcoin transaction + */ + notifyTx(tx) { + // Real-time client updates for this transaction. + // Any address input or output present in transaction + // is a potential client to notify. + if (this.notifSock) + this.notifSock.send(['transaction', JSON.stringify(tx)]) + } + + /** + * Notify a new block + * @param {string} header - block header + */ + notifyBlock(header) { + // Notify clients of the block + if (this.notifSock) + this.notifSock.send(['block', JSON.stringify(header)]) + } + + /** * Check unconfirmed transactions * @returns {Promise} @@ -262,7 +288,7 @@ class MempoolProcessor extends AbstractProcessor { const ntx = unconfirmedTxs.length const dt = ((Date.now() - t0) / 1000).toFixed(1) const per = (ntx == 0) ? 0 : ((Date.now() - t0) / ntx).toFixed(0) - Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`) + Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`) } /** diff --git a/tracker/transaction.js b/tracker/transaction.js index 89ff08e..037e774 100644 --- a/tracker/transaction.js +++ b/tracker/transaction.js @@ -406,7 +406,7 @@ class Transaction { locktime: this.tx.locktime, }) - Logger.info(`Tracker : Storing transaction ${this.txid}`) + Logger.info(`Tracker : Storing transaction ${this.txid}`) } } diff --git a/tracker/transactions-bundle.js b/tracker/transactions-bundle.js index ef1b19b..a769e44 100644 --- a/tracker/transactions-bundle.js +++ b/tracker/transactions-bundle.js @@ -96,7 +96,6 @@ class TransactionsBundle { let indexedOutputs = {} // Index the transaction outputs - console.time('outputScript2Address') for (const i in txs) { const tx = txs[i] const txid = tx.getId() @@ -115,7 +114,6 @@ class TransactionsBundle { } catch (e) {} } } - console.timeEnd('outputScript2Address') // Prefilter const outRes = await db.getUngroupedHDAccountsByAddresses(addresses)