diff --git a/static/admin/dmt/xpubs-tools/xpubs-tools.js b/static/admin/dmt/xpubs-tools/xpubs-tools.js
index 2ada7e5..20d93cc 100644
--- a/static/admin/dmt/xpubs-tools/xpubs-tools.js
+++ b/static/admin/dmt/xpubs-tools/xpubs-tools.js
@@ -12,10 +12,13 @@ const screenXpubsToolsScript = {
$('#btn-xpub-details-reset').click(() => {this.showSearchForm()})
$('#btn-xpub-details-rescan').click(() => {this.showRescanForm()})
$('#btn-xpub-details-delete').click(() => {this.showDeletionForm()})
+ $('#btn-xpub-details-export').click(() => {this.showExportForm()})
$('#btn-xpub-rescan-go').click(() => {this.rescanXpub()})
$('#btn-xpub-rescan-cancel').click(() => {this.hideRescanForm()})
$('#btn-xpub-delete-go').click(() => {this.deleteXpub()})
$('#btn-xpub-delete-cancel').click(() => {this.hideDeletionForm()})
+ $('#btn-xpub-export-go').click(() => {this.exportXpubHistory()})
+ $('#btn-xpub-export-cancel').click(() => {this.hideExportForm()})
$('#btn-xpub-import-go').click(() => {this.importXpub()})
$('#btn-xpub-details-retype').click(() => {this.showImportForm(true)})
$('#btn-xpub-import-cancel').click(() => {this.hideImportForm(this.isReimport)})
@@ -27,8 +30,15 @@ const screenXpubsToolsScript = {
},
preparePage: function() {
+ // Disable custom lookahead if data source is a third party explorer
+ const isTPE = sessionStorage.getItem('indexerType') == 'third_party_explorer'
+ const isLRI = sessionStorage.getItem('indexerType') == 'local_rest_indexer'
+ const disableLookahead = isTPE || isLRI
+ $('#rescan-lookahead').prop('disabled', disableLookahead)
+
this.hideRescanForm()
this.hideDeletionForm()
+ this.hideExportForm()
this.showSearchForm()
$("#xpub").focus()
},
@@ -144,6 +154,35 @@ const screenXpubsToolsScript = {
})
},
+ exportXpubHistory: function() {
+ lib_msg.displayMessage('Exporting the transactional history of this xpub. Please wait...')
+
+ const args = {
+ 'active': this.currentXpub,
+ 'page': 0,
+ 'count': 1000000000
+ }
+
+ if ($('#export-type').val() == 'notNull')
+ args['excludeNullXfer'] = 1
+
+ return lib_api.getTransactions(args)
+ .then(result => {
+ if (result['txs'] && result['txs'].length > 0) {
+ let content = 'data:text/csv;charset=utf-8,'
+ content += 'height,txid,date,flow\n'
+ for (let tx of result['txs'])
+ content += `${tx['block_height']},${tx['hash']},${new Date(tx['time']*1000).toString()},${tx['result']/100000000}\n`
+ const encodedURI = encodeURI(content)
+ window.open(encodedURI)
+ }
+ this.hideExportForm()
+ lib_msg.displayInfo('Transactional history successfully exported.')
+ }).catch(e => {
+ lib_errors.processError(e)
+ })
+ },
+
checkRescanStatus: function(callback) {
this.rescanStatusTimerId = setTimeout(() => {
lib_api.getXpubRescanStatus(this.currentXpub)
@@ -308,6 +347,17 @@ const screenXpubsToolsScript = {
$('#xpubs-tool-actions').show()
},
+ showExportForm: function() {
+ $('#xpubs-tool-actions').hide()
+ $('#xpubs-export-actions').show()
+ lib_msg.cleanMessagesUi()
+ },
+
+ hideExportForm: function() {
+ $('#xpubs-export-actions').hide()
+ $('#xpubs-tool-actions').show()
+ },
+
}
screenScripts.set('#screen-xpubs-tools', screenXpubsToolsScript)
diff --git a/static/admin/lib/api-wrapper.js b/static/admin/lib/api-wrapper.js
index 2fdae1d..6dfbcb3 100644
--- a/static/admin/lib/api-wrapper.js
+++ b/static/admin/lib/api-wrapper.js
@@ -156,6 +156,14 @@ const lib_api = {
)
},
+ /**
+ * Transactions
+ */
+ getTransactions: function(arguments) {
+ let uri = this.baseUri + '/txs'
+ return this.sendGetUriEncoded(uri, arguments)
+ },
+
/**
* Rescans a range of blocks
*/
diff --git a/static/admin/lib/auth-utils.js b/static/admin/lib/auth-utils.js
index 656a3d7..e17557b 100644
--- a/static/admin/lib/auth-utils.js
+++ b/static/admin/lib/auth-utils.js
@@ -139,6 +139,7 @@ const lib_auth = {
this.setRefreshToken(null)
this.setAccessToken(null)
sessionStorage.setItem('activeTab', '')
+ sessionStorage.setItem('indexerType', '')
lib_cmn.goToHomePage()
}
diff --git a/tracker/abstract-processor.js b/tracker/abstract-processor.js
deleted file mode 100644
index 3f46325..0000000
--- a/tracker/abstract-processor.js
+++ /dev/null
@@ -1,50 +0,0 @@
-/*!
- * tracker/abstract-processor.js
- * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
- */
-'use strict'
-
-const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
-
-
-/**
- * An abstract class for tracker processors
- */
-class AbstractProcessor {
-
- /**
- * Constructor
- * @param {object} notifSock - ZMQ socket used for notifications
- */
- constructor(notifSock) {
- // RPC client
- this.client = new RpcClient()
- // ZeroMQ socket for notifications sent to others components
- this.notifSock = notifSock
- }
-
- /**
- * Notify a new transaction
- * @param {object} tx - bitcoin transaction
- */
- notifyTx(tx) {
- // Real-time client updates for this transaction.
- // Any address input or output present in transaction
- // is a potential client to notify.
- if (this.notifSock)
- this.notifSock.send(['transaction', JSON.stringify(tx)])
- }
-
- /**
- * Notify a new block
- * @param {string} header - block header
- */
- notifyBlock(header) {
- // Notify clients of the block
- if (this.notifSock)
- this.notifSock.send(['block', JSON.stringify(header)])
- }
-
-}
-
-module.exports = AbstractProcessor
diff --git a/tracker/block-worker.js b/tracker/block-worker.js
new file mode 100644
index 0000000..f19a2ef
--- /dev/null
+++ b/tracker/block-worker.js
@@ -0,0 +1,173 @@
+/*!
+ * tracker/block-worker.js
+ * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
+ */
+'use strict'
+
+const { isMainThread, parentPort } = require('worker_threads')
+const network = require('../lib/bitcoin/network')
+const keys = require('../keys')[network.key]
+const db = require('../lib/db/mysql-db-wrapper')
+const { createRpcClient } = require('../lib/bitcoind-rpc/rpc-client')
+const Block = require('./block')
+
+
+/**
+ * STATUS
+ */
+const IDLE = 0
+module.exports.IDLE = IDLE
+
+const INITIALIZED = 1
+module.exports.INITIALIZED = INITIALIZED
+
+const OUTPUTS_PROCESSED = 2
+module.exports.OUTPUTS_PROCESSED = OUTPUTS_PROCESSED
+
+const INPUTS_PROCESSED = 3
+module.exports.INPUTS_PROCESSED = INPUTS_PROCESSED
+
+const TXS_CONFIRMED = 4
+module.exports.TXS_CONFIRMED = TXS_CONFIRMED
+
+/**
+ * OPS
+ */
+const OP_INIT = 0
+module.exports.OP_INIT = OP_INIT
+
+const OP_PROCESS_OUTPUTS = 1
+module.exports.OP_PROCESS_OUTPUTS = OP_PROCESS_OUTPUTS
+
+const OP_PROCESS_INPUTS = 2
+module.exports.OP_PROCESS_INPUTS = OP_PROCESS_INPUTS
+
+const OP_CONFIRM = 3
+module.exports.OP_CONFIRM = OP_CONFIRM
+
+const OP_RESET = 4
+module.exports.OP_RESET = OP_RESET
+
+
+
+/**
+ * Process message received by the worker
+ * @param {object} msg - message received by the worker
+ */
+async function processMessage(msg) {
+ let res = null
+ let success = true
+
+ try {
+ switch(msg.op) {
+ case OP_INIT:
+ if (status != IDLE)
+ throw 'Operation not allowed'
+ res = await initBlock(msg.header)
+ break
+ case OP_PROCESS_OUTPUTS:
+ if (status != INITIALIZED)
+ throw 'Operation not allowed'
+ res = await processOutputs()
+ break
+ case OP_PROCESS_INPUTS:
+ if (status != OUTPUTS_PROCESSED)
+ throw 'Operation not allowed'
+ res = await processInputs()
+ break
+ case OP_CONFIRM:
+ if (status != INPUTS_PROCESSED)
+ throw 'Operation not allowed'
+ res = await confirmTransactions(msg.blockId)
+ break
+ case OP_RESET:
+ res = await reset()
+ break
+ default:
+ throw 'Invalid Operation'
+ }
+ } catch (e) {
+ success = false
+ res = e
+ } finally {
+ parentPort.postMessage({
+ 'op': msg.op,
+ 'status': success,
+ 'res': res
+ })
+ }
+}
+
+/**
+ * Initialize the block
+ * @param {object} header - block header
+ */
+async function initBlock(header) {
+ status = INITIALIZED
+ const hex = await rpcClient.getblock({ blockhash: header.hash, verbosity: 0 })
+ block = new Block(hex, header)
+ return true
+}
+
+/**
+ * Process the transactions outputs
+ */
+async function processOutputs() {
+ status = OUTPUTS_PROCESSED
+ txsForBroadcast = await block.processOutputs()
+ return true
+}
+
+/**
+ * Process the transactions inputs
+ */
+async function processInputs() {
+ status = INPUTS_PROCESSED
+ const txs = await block.processInputs()
+ txsForBroadcast = txsForBroadcast.concat(txs)
+ return true
+}
+
+/**
+ * Confirm the transactions
+ * @param {integer} blockId - id of the block in db
+ */
+async function confirmTransactions(blockId) {
+ status = TXS_CONFIRMED
+ const aTxsForBroadcast = [...new Set(txsForBroadcast)]
+ await block.confirmTransactions(aTxsForBroadcast, blockId)
+ return aTxsForBroadcast
+}
+
+/**
+ * Reset
+ */
+function reset() {
+ status = IDLE
+ block = null
+ txsForBroadcast = []
+ return true
+}
+
+
+/**
+ * MAIN
+ */
+const rpcClient = createRpcClient()
+let block = null
+let txsForBroadcast = []
+let status = IDLE
+
+if (!isMainThread) {
+ db.connect({
+ connectionLimit: keys.db.connectionLimitTracker,
+ acquireTimeout: keys.db.acquireTimeout,
+ host: keys.db.host,
+ user: keys.db.user,
+ password: keys.db.pass,
+ database: keys.db.database
+ })
+
+ reset()
+ parentPort.on('message', processMessage)
+}
diff --git a/tracker/block.js b/tracker/block.js
index b8cfe91..55161e2 100644
--- a/tracker/block.js
+++ b/tracker/block.js
@@ -26,44 +26,92 @@ class Block extends TransactionsBundle {
super()
this.hex = hex
this.header = header
+
+ try {
+ if (hex != null) {
+ const block = bitcoin.Block.fromHex(hex)
+ this.transactions = block.transactions
+ }
+ } catch (e) {
+ Logger.error(e, 'Tracker : Block()')
+ Logger.error(null, header)
+ return Promise.reject(e)
+ }
}
/**
* Register the block and transactions of interest in db
+ * @dev This method isn't used anymore.
+ * It has been replaced by a parallel processing of blocks.
+ * (see blocks-processor and block-worker)
* @returns {Promise - object[]} returns an array of transactions to be broadcast
*/
- async checkBlock() {
+ async processBlock() {
Logger.info('Tracker : Beginning to process new block.')
- let block
- const txsForBroadcast = []
-
- try {
- block = bitcoin.Block.fromHex(this.hex)
- this.transactions = block.transactions
- } catch (e) {
- Logger.error(e, 'Tracker : Block.checkBlock()')
- Logger.error(null, this.header)
- return Promise.reject(e)
- }
-
const t0 = Date.now()
- let ntx = 0
-
- // Filter transactions
- const filteredTxs = await this.prefilterTransactions()
-
- // Check filtered transactions
- // and broadcast notifications
- await util.seriesCall(filteredTxs, async tx => {
- const filteredTx = new Transaction(tx)
- const txCheck = await filteredTx.checkTransaction()
- if (txCheck && txCheck.broadcast)
- txsForBroadcast.push(txCheck.tx)
+
+ const txsForBroadcast = new Map()
+
+ const txsForBroadcast1 = await this.processOutputs()
+ txsForBroadcast1.map(tx => {txsForBroadcast.set(tx.getId(), tx)})
+
+ const txsForBroadcast2 = await this.processInputs()
+ txsForBroadcast2.map(tx => {txsForBroadcast.set(tx.getId(), tx)})
+
+ const aTxsForBroadcast = [...txsForBroadcast.values()]
+
+ const blockId = await this.registerBlock()
+
+ await this.confirmTransactions(aTxsForBroadcast, blockId)
+
+ // Logs and result returned
+ const ntx = this.transactions.length
+ const dt = ((Date.now()-t0)/1000).toFixed(1)
+ const per = ((Date.now()-t0)/ntx).toFixed(0)
+ Logger.info(`Tracker : Finished block ${this.header.height}, ${dt}s, ${ntx} tx, ${per}ms/tx`)
+
+ return aTxsForBroadcast
+ }
+
+
+ /**
+ * Process the transaction outputs
+ * @returns {Promise - object[]} returns an array of transactions to be broadcast
+ */
+ async processOutputs() {
+ const txsForBroadcast = new Set()
+ const filteredTxs = await this.prefilterByOutputs()
+ await util.parallelCall(filteredTxs, async filteredTx => {
+ const tx = new Transaction(filteredTx)
+ await tx.processOutputs()
+ if (tx.doBroadcast)
+ txsForBroadcast.add(tx.tx)
+ })
+ return [...txsForBroadcast]
+ }
+
+ /**
+ * Process the transaction inputs
+ * @returns {Promise - object[]} returns an array of transactions to be broadcast
+ */
+ async processInputs() {
+ const txsForBroadcast = new Set()
+ const filteredTxs = await this.prefilterByInputs()
+ await util.parallelCall(filteredTxs, async filteredTx => {
+ const tx = new Transaction(filteredTx)
+ await tx.processInputs()
+ if (tx.doBroadcast)
+ txsForBroadcast.add(tx.tx)
})
+ return [...txsForBroadcast]
+ }
- // Retrieve the previous block
- // and store the new block into the database
+ /**
+ * Store the block in db
+ * @returns {Promise - int} returns the id of the block
+ */
+ async registerBlock() {
const prevBlock = await db.getBlockByHash(this.header.previousblockhash)
const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null
@@ -76,18 +124,19 @@ class Block extends TransactionsBundle {
Logger.info(`Tracker : Added block ${this.header.height} (id=${blockId})`)
- // Confirms the transactions
- const txids = this.transactions.map(t => t.getId())
- ntx = txids.length
- const txidLists = util.splitList(txids, 100)
- await util.seriesCall(txidLists, list => db.confirmTransactions(list, blockId))
-
- // Logs and result returned
- const dt = ((Date.now()-t0)/1000).toFixed(1)
- const per = ((Date.now()-t0)/ntx).toFixed(0)
- Logger.info(`Tracker : Finished block ${this.header.height}, ${dt}s, ${ntx} tx, ${per}ms/tx`)
+ return blockId
+ }
- return txsForBroadcast
+ /**
+ * Confirm the transactions in db
+ * @param {Set} txs - set of transactions stored in db
+ * @param {int} blockId - id of the block
+ * r@returns {Promise}
+ */
+ async confirmTransactions(txs, blockId) {
+ const txids = txs.map(t => t.getId())
+ const txidLists = util.splitList(txids, 100)
+ return util.parallelCall(txidLists, list => db.confirmTransactions(list, blockId))
}
/**
diff --git a/tracker/blockchain-processor.js b/tracker/blockchain-processor.js
index f0ee571..15185de 100644
--- a/tracker/blockchain-processor.js
+++ b/tracker/blockchain-processor.js
@@ -11,29 +11,34 @@ const util = require('../lib/util')
const Logger = require('../lib/logger')
const db = require('../lib/db/mysql-db-wrapper')
const network = require('../lib/bitcoin/network')
+const { createRpcClient } = require('../lib/bitcoind-rpc/rpc-client')
const keys = require('../keys')[network.key]
-const AbstractProcessor = require('./abstract-processor')
const Block = require('./block')
-const TransactionsBundle = require('./transactions-bundle')
+const blocksProcessor = require('./blocks-processor')
/**
* A class allowing to process the blockchain
*/
-class BlockchainProcessor extends AbstractProcessor {
+class BlockchainProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
- super(notifSock)
+ // RPC client
+ this.client = createRpcClient()
// ZeroMQ socket for bitcoind blocks messages
this.blkSock = null
// Initialize a semaphor protecting the onBlockHash() method
this._onBlockHashSemaphor = new Sema(1, { capacity: 50 })
+ // Array of worker threads used for parallel processing of blocks
+ this.blockWorkers = []
// Flag tracking Initial Block Download Mode
this.isIBD = true
+ // Initialize the blocks processor
+ blocksProcessor.init(notifSock)
}
/**
@@ -55,8 +60,7 @@ class BlockchainProcessor extends AbstractProcessor {
* @returns {Promise}
*/
async catchup() {
- const highest = await db.getHighestBlock()
- const info = await this.client.getblockchaininfo()
+ const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
const daemonNbHeaders = info.headers
// Consider that we are in IBD mode if Dojo is far in the past (> 13,000 blocks)
@@ -80,12 +84,11 @@ class BlockchainProcessor extends AbstractProcessor {
try {
Logger.info('Tracker : Tracker Startup (IBD mode)')
- const info = await this.client.getblockchaininfo()
+ // Get highest block processed by the tracker
+ const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
const daemonNbBlocks = info.blocks
const daemonNbHeaders = info.headers
- // Get highest block processed by the tracker
- const highest = await db.getHighestBlock()
const dbMaxHeight = highest.blockHeight
let prevBlockId = highest.blockID
@@ -114,8 +117,8 @@ class BlockchainProcessor extends AbstractProcessor {
await util.seriesCall(blockRange, async height => {
try {
- const blockHash = await this.client.getblockhash(height)
- const header = await this.client.getblockheader(blockHash, true)
+ const blockHash = await this.client.getblockhash({ height })
+ const header = await this.client.getblockheader({ blockhash: blockHash, verbose: true })
prevBlockId = await this.processBlockHeader(header, prevBlockId)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.catchupIBDMode()')
@@ -151,30 +154,23 @@ class BlockchainProcessor extends AbstractProcessor {
try {
Logger.info('Tracker : Tracker Startup (normal mode)')
- const info = await this.client.getblockchaininfo()
+ // Get highest block processed by the tracker
+ const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
const daemonNbBlocks = info.blocks
- // Get highest block processed by the tracker
- const highest = await db.getHighestBlock()
if (highest == null) return null
if (daemonNbBlocks == highest.blockHeight) return null
- // Compute blocks range to be processed
const blockRange = _.range(highest.blockHeight, daemonNbBlocks + 1)
Logger.info(`Tracker : Sync ${blockRange.length} blocks`)
- // Process the blocks
- return util.seriesCall(blockRange, async height => {
- try {
- const hash = await this.client.getblockhash(height)
- const header = await this.client.getblockheader(hash)
- return this.processBlock(header)
- } catch(e) {
- Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
- process.exit()
- }
- }, 'Tracker syncing', true)
+ try {
+ return this.processBlockRange(blockRange)
+ } catch(e) {
+ Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
+ process.exit()
+ }
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
@@ -240,7 +236,7 @@ class BlockchainProcessor extends AbstractProcessor {
let headers = null
try {
- const header = await this.client.getblockheader(blockHash, true)
+ const header = await this.client.getblockheader({ blockhash: blockHash, verbose: true })
Logger.info(`Tracker : Block #${header.height} ${blockHash}`)
// Grab all headers between this block and last known
headers = await this.chainBacktrace([header])
@@ -262,9 +258,7 @@ class BlockchainProcessor extends AbstractProcessor {
await this.rewind(knownHeight)
// Process the blocks
- return await util.seriesCall(headers, header => {
- return this.processBlock(header)
- })
+ return await this.processBlocks(headers)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.onBlockHash()')
@@ -292,7 +286,7 @@ class BlockchainProcessor extends AbstractProcessor {
if (block == null) {
// Previous block does not exist in database. Grab from bitcoind
- const header = await this.client.getblockheader(deepest.previousblockhash, true)
+ const header = await this.client.getblockheader({ blockhash: deepest.previousblockhash, verbose: true })
headers.push(header)
return this.chainBacktrace(headers)
} else {
@@ -318,8 +312,6 @@ class BlockchainProcessor extends AbstractProcessor {
await db.unconfirmTransactions(txids)
}
- // TODO: get accounts and notify of deletion ?
-
await db.deleteBlocksAfterHeight(height)
}
@@ -342,45 +334,40 @@ class BlockchainProcessor extends AbstractProcessor {
Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`)
- // Process the blocks
- return util.seriesCall(blockRange, async height => {
- try {
- Logger.info(`Tracker : Rescanning block ${height}`)
- const hash = await this.client.getblockhash(height)
- const header = await this.client.getblockheader(hash)
- return this.processBlock(header)
- } catch(e) {
- Logger.error(e, 'Tracker : BlockchainProcessor.rescan()')
- throw e
- }
- }, 'Tracker rescan', true)
+ try {
+ return this.processBlockRange(blockRange)
+ } catch(e) {
+ Logger.error(e, 'Tracker : BlockchainProcessor.rescan()')
+ throw e
+ }
}
/**
- * Process a block
- * @param {object} header - block header
- * @returns {Promise}
+ * Process a list of blocks
+ * @param {object[]} headers - array of block headers
*/
- async processBlock(header) {
- try {
- // Get raw block hex string from bitcoind
- const hex = await this.client.getblock(header.hash, false)
-
- const block = new Block(hex, header)
-
- const txsForBroadcast = await block.checkBlock()
+ async processBlocks(headers) {
+ const chunks = util.splitList(headers, blocksProcessor.nbWorkers)
- // Send notifications
- for (let tx of txsForBroadcast)
- this.notifyTx(tx)
+ await util.seriesCall(chunks, async chunk => {
+ return blocksProcessor.processChunk(chunk)
+ })
+ }
- this.notifyBlock(header)
+ /**
+ * Process a range of blocks
+ * @param {int[]} heights - a range of block heights
+ */
+ async processBlockRange(heights) {
+ const chunks = util.splitList(heights, blocksProcessor.nbWorkers)
- } catch(e) {
- // The show must go on.
- // TODO: further notification that this block did not check out
- Logger.error(e, 'Tracker : BlockchainProcessor.processBlock()')
- }
+ return util.seriesCall(chunks, async chunk => {
+ const headers = await util.parallelCall(chunk, async height => {
+ const hash = await this.client.getblockhash({ height })
+ return await this.client.getblockheader({ blockhash: hash })
+ })
+ return this.processBlocks(headers)
+ })
}
/**
diff --git a/tracker/blocks-processor.js b/tracker/blocks-processor.js
new file mode 100644
index 0000000..c5c9f99
--- /dev/null
+++ b/tracker/blocks-processor.js
@@ -0,0 +1,222 @@
+/*!
+ * tracker/blocks-processor.js
+ * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
+ */
+'use strict'
+
+const os = require('os')
+const Sema = require('async-sema')
+const { Worker } = require('worker_threads')
+const Logger = require('../lib/logger')
+const util = require('../lib/util')
+const dbProcessor = require('../lib/db/mysql-db-wrapper')
+const blockWorker = require('./block-worker')
+
+
+let notifSock = null
+let blockWorkers = []
+let headersChunk = []
+let txsForBroadcast = []
+let t0 = null
+let currentOp = null
+let nbTasksEnqueued = 0
+let nbTasksCompleted = 0
+
+// Semaphor protecting the processBloks() method
+const _processBlocksSemaphor = new Sema(1)
+
+// Number of worker threads processing the blocks in parallel
+const nbWorkers = os.cpus().length //- 1
+module.exports.nbWorkers = nbWorkers
+
+
+/**
+ * Initialize the processor
+ * @param {object} notifSock - ZMQ socket used for notifications
+ */
+function init(notifSock) {
+ notifSock = notifSock
+
+ for (let i = 0; i < nbWorkers; i++) {
+ const worker = new Worker(
+ `${__dirname}/block-worker.js`,
+ { workerData: null }
+ )
+ worker.on('error', processWorkerError)
+ worker.on('message', processWorkerMessage)
+ blockWorkers.push(worker)
+ }
+}
+module.exports.init = init
+
+/**
+ * Process a chunk of block headers
+ * @param {object[]} chunk - array of block headers
+ */
+async function processChunk(chunk) {
+ // Acquire the semaphor (wait for previous chunk)
+ await _processBlocksSemaphor.acquire()
+
+ t0 = Date.now()
+ const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}`
+ Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}`)
+
+ // Process the chunk
+ chunk.sort((a,b) => a.height - b.height)
+ headersChunk = chunk
+ txsForBroadcast = []
+ processTask(blockWorker.OP_INIT)
+}
+module.exports.processChunk = processChunk
+
+/**
+ * Process an error returned by a worker thread
+ * @param {object} e - error
+ */
+async function processWorkerError(e) {
+ return processWorkerMessage({
+ 'op': currentOp,
+ 'status': false,
+ 'res': e
+ })
+}
+
+/**
+ * Process a message returned by a worker thread
+ * @param {object} msg - message sent by the worker thread
+ */
+async function processWorkerMessage(msg) {
+ nbTasksCompleted++
+
+ if (!msg.status) {
+ Logger.error(msg.res, 'Tracker : processWorkerMessage()')
+ } else if (msg.op == blockWorker.OP_CONFIRM) {
+ txsForBroadcast = txsForBroadcast.concat(msg.res)
+ }
+
+ if (nbTasksCompleted == nbTasksEnqueued) {
+ switch (msg.op) {
+ case blockWorker.OP_INIT:
+ // Process the transaction outputs
+ processTask(blockWorker.OP_PROCESS_OUTPUTS)
+ break
+ case blockWorker.OP_PROCESS_OUTPUTS:
+ // Process the transaction inputs
+ processTask(blockWorker.OP_PROCESS_INPUTS)
+ break
+ case blockWorker.OP_PROCESS_INPUTS:
+ // Store the blocks in db and get their id
+ const blockIds = await util.seriesCall(headersChunk, async header => {
+ return registerBlock(header)
+ })
+ // Confirm the transactions
+ processTask(blockWorker.OP_CONFIRM, blockIds)
+ break
+ case blockWorker.OP_CONFIRM:
+ // Notify new transactions and blocks
+ await Promise.all([
+ util.parallelCall(txsForBroadcast, async tx => {
+ notifyTx(tx)
+ }),
+ util.parallelCall(headersChunk, async header => {
+ notifyBlock(header)
+ })
+ ])
+ // Process complete. Reset the workers
+ processTask(blockWorker.OP_RESET)
+ break
+ case blockWorker.OP_RESET:
+ const dt = ((Date.now()-t0)/1000).toFixed(1)
+ const per = ((Date.now()-t0)/headersChunk.length).toFixed(0)
+ const sBlockRange = `${headersChunk[0].height}-${headersChunk[headersChunk.length-1].height}`
+ Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`)
+ // Release the semaphor
+ await _processBlocksSemaphor.release()
+ break
+ }
+ }
+}
+
+/**
+ * Execute an operation processing a block
+ * @param {integer} op - operation
+ * @param {*} args
+ */
+function processTask(op, args) {
+ currentOp = op
+ nbTasksEnqueued = 0
+ nbTasksCompleted = 0
+
+ switch (op) {
+ case blockWorker.OP_INIT:
+ for (let i = 0; i < headersChunk.length; i++) {
+ blockWorkers[i].postMessage({
+ 'op': op,
+ 'header': headersChunk[i]
+ })
+ nbTasksEnqueued++
+ }
+ break
+ case blockWorker.OP_PROCESS_OUTPUTS:
+ case blockWorker.OP_PROCESS_INPUTS:
+ case blockWorker.OP_RESET:
+ for (let i = 0; i < headersChunk.length; i++) {
+ blockWorkers[i].postMessage({'op': op})
+ nbTasksEnqueued++
+ }
+ break
+ case blockWorker.OP_CONFIRM:
+ for (let i = 0; i < headersChunk.length; i++) {
+ blockWorkers[i].postMessage({
+ 'op': op,
+ 'blockId': args[i]
+ })
+ nbTasksEnqueued++
+ }
+ break
+ default:
+ Logger.error(null, 'Tracker : processTask() : Unknown operation')
+ }
+}
+
+/**
+ * Notify a new transaction
+ * @param {object} tx - bitcoin transaction
+ */
+function notifyTx(tx) {
+ // Real-time client updates for this transaction.
+ // Any address input or output present in transaction
+ // is a potential client to notify.
+ if (notifSock)
+ notifSock.send(['transaction', JSON.stringify(tx)])
+}
+
+/**
+ * Notify a new block
+ * @param {string} header - block header
+ */
+function notifyBlock(header) {
+ // Notify clients of the block
+ if (notifSock)
+ notifSock.send(['block', JSON.stringify(header)])
+}
+
+/**
+ * Store a block in db
+ * @param {object} header - block header
+ * @returns {Promise - int} returns the id of the block
+ */
+async function registerBlock(header) {
+ const prevBlock = await dbProcessor.getBlockByHash(header.previousblockhash)
+ const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null
+
+ const blockId = await dbProcessor.addBlock({
+ blockHeight: header.height,
+ blockHash: header.hash,
+ blockTime: header.time,
+ blockParent: prevID
+ })
+
+ Logger.info(`Tracker : Added block ${header.height} (id=${blockId})`)
+ return blockId
+}
diff --git a/tracker/index.js b/tracker/index.js
index bbbed6a..b08887c 100644
--- a/tracker/index.js
+++ b/tracker/index.js
@@ -6,7 +6,7 @@
'use strict'
- const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
+ const { waitForBitcoindRpcApi } = require('../lib/bitcoind-rpc/rpc-client')
const network = require('../lib/bitcoin/network')
const keys = require('../keys')[network.key]
const db = require('../lib/db/mysql-db-wrapper')
@@ -21,7 +21,7 @@
// Wait for Bitcoind RPC API
// being ready to process requests
- await RpcClient.waitForBitcoindRpcApi()
+ await waitForBitcoindRpcApi()
// Initialize the db wrapper
const dbConfig = {
diff --git a/tracker/mempool-processor.js b/tracker/mempool-processor.js
index a9f4563..bd30ef3 100644
--- a/tracker/mempool-processor.js
+++ b/tracker/mempool-processor.js
@@ -11,8 +11,8 @@ const util = require('../lib/util')
const Logger = require('../lib/logger')
const db = require('../lib/db/mysql-db-wrapper')
const network = require('../lib/bitcoin/network')
+const { createRpcClient } = require('../lib/bitcoind-rpc/rpc-client')
const keys = require('../keys')[network.key]
-const AbstractProcessor = require('./abstract-processor')
const Transaction = require('./transaction')
const TransactionsBundle = require('./transactions-bundle')
@@ -20,14 +20,17 @@ const TransactionsBundle = require('./transactions-bundle')
/**
* A class managing a buffer for the mempool
*/
-class MempoolProcessor extends AbstractProcessor {
+class MempoolProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
- super(notifSock)
+ // RPC client
+ this.client = createRpcClient()
+ // ZeroMQ socket for notifications sent to others components
+ this.notifSock = notifSock
// Mempool buffer
this.mempoolBuffer = new TransactionsBundle()
// ZeroMQ socket for bitcoind Txs messages
@@ -69,15 +72,17 @@ class MempoolProcessor extends AbstractProcessor {
/**
* Stop processing
- */
+ */
async stop() {
clearInterval(this.checkUnconfirmedId)
clearInterval(this.processMempoolId)
//clearInterval(this.displayStatsId)
- resolve(this.txSock.disconnect(keys.bitcoind.zmqTx).close())
- resolve(this.pushTxSock.disconnect(keys.ports.notifpushtx).close())
- resolve(this.orchestratorSock.disconnect(keys.ports.orchestrator).close())
+ this.txSock.disconnect(keys.bitcoind.zmqTx).close()
+ this.pushTxSock.disconnect(keys.ports.notifpushtx).close()
+ this.orchestratorSock.disconnect(keys.ports.orchestrator).close()
+
+ return Promise.resolve();
}
/**
@@ -150,14 +155,27 @@ class MempoolProcessor extends AbstractProcessor {
let currentMempool = new TransactionsBundle(this.mempoolBuffer.toArray())
this.mempoolBuffer.clear()
- const filteredTxs = await currentMempool.prefilterTransactions()
+ const txsForBroadcast = new Map()
+
+ let filteredTxs = await currentMempool.prefilterByOutputs()
+ await util.parallelCall(filteredTxs, async filteredTx => {
+ const tx = new Transaction(filteredTx)
+ await tx.processOutputs()
+ if (tx.doBroadcast)
+ txsForBroadcast[tx.txid] = tx.tx
+ })
- return util.seriesCall(filteredTxs, async filteredTx => {
+ filteredTxs = await currentMempool.prefilterByInputs()
+ await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
- const txCheck = await tx.checkTransaction()
- if (txCheck && txCheck.broadcast)
- this.notifyTx(txCheck.tx)
+ await tx.processInputs()
+ if (tx.doBroadcast)
+ txsForBroadcast[tx.txid] = tx.tx
})
+
+ // Send the notifications
+ for (let tx of txsForBroadcast.values())
+ this.notifyTx(tx)
}
/**
@@ -206,6 +224,29 @@ class MempoolProcessor extends AbstractProcessor {
}
}
+ /**
+ * Notify a new transaction
+ * @param {object} tx - bitcoin transaction
+ */
+ notifyTx(tx) {
+ // Real-time client updates for this transaction.
+ // Any address input or output present in transaction
+ // is a potential client to notify.
+ if (this.notifSock)
+ this.notifSock.send(['transaction', JSON.stringify(tx)])
+ }
+
+ /**
+ * Notify a new block
+ * @param {string} header - block header
+ */
+ notifyBlock(header) {
+ // Notify clients of the block
+ if (this.notifSock)
+ this.notifSock.send(['block', JSON.stringify(header)])
+ }
+
+
/**
* Check unconfirmed transactions
* @returns {Promise}
@@ -218,11 +259,11 @@ class MempoolProcessor extends AbstractProcessor {
const unconfirmedTxs = await db.getUnconfirmedTransactions()
if (unconfirmedTxs.length > 0) {
- await util.seriesCall(unconfirmedTxs, tx => {
+ await util.parallelCall(unconfirmedTxs, tx => {
try {
- return this.client.getrawtransaction(tx.txnTxid, true)
+ return this.client.getrawtransaction( { txid: tx.txnTxid, verbose: true })
.then(async rtx => {
- if (!rtx.blockhash) return null
+ if (!rtx.blockhash) return null
// Transaction is confirmed
const block = await db.getBlockByHash(rtx.blockhash)
if (block && block.blockID) {
@@ -247,7 +288,7 @@ class MempoolProcessor extends AbstractProcessor {
const ntx = unconfirmedTxs.length
const dt = ((Date.now() - t0) / 1000).toFixed(1)
const per = (ntx == 0) ? 0 : ((Date.now() - t0) / ntx).toFixed(0)
- Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`)
+ Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`)
}
/**
@@ -255,11 +296,10 @@ class MempoolProcessor extends AbstractProcessor {
*/
async _refreshActiveStatus() {
// Get highest header in the blockchain
- const info = await this.client.getblockchaininfo()
+ // Get highest block processed by the tracker
+ const [highestBlock, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
const highestHeader = info.headers
- // Get highest block processed by the tracker
- const highestBlock = await db.getHighestBlock()
if (highestBlock == null || highestBlock.blockHeight == 0) {
this.isActive = false
return
diff --git a/tracker/tracker-rest-api.js b/tracker/tracker-rest-api.js
index 85d7c5f..5fc43cc 100644
--- a/tracker/tracker-rest-api.js
+++ b/tracker/tracker-rest-api.js
@@ -4,10 +4,7 @@
*/
'use strict'
-const qs = require('querystring')
const validator = require('validator')
-const bodyParser = require('body-parser')
-const Logger = require('../lib/logger')
const errors = require('../lib/errors')
const authMgr = require('../lib/auth/authorizations-manager')
const HttpServer = require('../lib/http-server/http-server')
@@ -29,8 +26,6 @@ class TrackerRestApi {
this.httpServer = httpServer
this.tracker = tracker
- const urlencodedParser = bodyParser.urlencoded({ extended: true })
-
// Establish routes. Proxy server strips /pushtx
this.httpServer.app.get(
`/${keys.prefixes.support}/rescan`,
diff --git a/tracker/transaction.js b/tracker/transaction.js
index f81d0b8..037e774 100644
--- a/tracker/transaction.js
+++ b/tracker/transaction.js
@@ -8,6 +8,7 @@ const _ = require('lodash')
const bitcoin = require('bitcoinjs-lib')
const util = require('../lib/util')
const Logger = require('../lib/logger')
+const addrHelper = require('../lib/bitcoin/addresses-helper')
const hdaHelper = require('../lib/bitcoin/hd-accounts-helper')
const db = require('../lib/db/mysql-db-wrapper')
const network = require('../lib/bitcoin/network')
@@ -28,9 +29,9 @@ class Transaction {
*/
constructor(tx) {
this.tx = tx
- this.txid = this.tx.getId()
+ this.txid = this.tx.getId()
// Id of transaction stored in db
- this.storedTxnID = null
+ this.storedTxnID = null
// Should this transaction be broadcast out to connected clients?
this.doBroadcast = false
}
@@ -46,10 +47,10 @@ class Transaction {
async checkTransaction() {
try {
// Process transaction inputs
- await this._processInputs()
+ await this.processInputs()
// Process transaction outputs
- await this._processOutputs()
+ await this.processOutputs()
// If this point reached with no errors,
// store the fact that this transaction was checked.
@@ -72,7 +73,7 @@ class Transaction {
* Process transaction inputs
* @returns {Promise}
*/
- async _processInputs() {
+ async processInputs() {
// Array of inputs spent
const spends = []
// Store input indices, keyed by `txid-outindex` for easy retrieval
@@ -80,7 +81,7 @@ class Transaction {
// Store database ids of double spend transactions
const doubleSpentTxnIDs = []
// Store inputs of interest
- const inputs = []
+ const inputs = []
// Extracts inputs information
let index = 0
@@ -150,16 +151,16 @@ class Transaction {
* Process transaction outputs
* @returns {Promise}
*/
- async _processOutputs() {
+ async processOutputs() {
// Store outputs, keyed by address. Values are arrays of outputs
const indexedOutputs = {}
-
+
// Extracts outputs information
let index = 0
for (let output of this.tx.outs) {
try {
- const address = bitcoin.address.fromOutputScript(output.script, activeNet)
+ const address = addrHelper.outputScript2Address(output.script)
if (!indexedOutputs[address])
indexedOutputs[address] = []
@@ -174,7 +175,7 @@ class Transaction {
// Array of addresses receiving tx outputs
const addresses = _.keys(indexedOutputs)
-
+
// Store a list of known addresses that received funds
let fundedAddresses = []
@@ -203,7 +204,7 @@ class Transaction {
for (let a of fundedAddresses) {
outputs.push({
- txnID: this.storedTxnID,
+ txnID: this.storedTxnID,
addrID: a.addrID,
outIndex: a.outIndex,
outAmount: a.outAmount,
@@ -253,9 +254,9 @@ class Transaction {
// Store a list of known addresses that received funds
const fundedAddresses = []
const xpubList = _.keys(hdAccounts)
-
+
if (xpubList.length > 0) {
- await util.seriesCall(xpubList, async xpub => {
+ await util.parallelCall(xpubList, async xpub => {
const usedNewAddresses = await this._deriveNewAddresses(
xpub,
hdAccounts[xpub],
@@ -281,7 +282,7 @@ class Transaction {
}
})
}
-
+
return fundedAddresses
}
@@ -290,7 +291,7 @@ class Transaction {
* Check if tx addresses are at or beyond the next unused
* index for the HD chain. Derive additional addresses
* to replace the gap limit and add those addresses to
- * the database. Make sure to account for tx sending to
+ * the database. Make sure to account for tx sending to
* newly-derived addresses.
*
* @param {string} xpub
@@ -389,7 +390,7 @@ class Transaction {
await db.addAddressesToHDAccount(xpub, newAddresses)
return _.keys(usedNewAddresses)
}
-
+
/**
* Store the transaction in database
@@ -405,7 +406,7 @@ class Transaction {
locktime: this.tx.locktime,
})
- Logger.info(`Tracker : Storing transaction ${this.txid}`)
+ Logger.info(`Tracker : Storing transaction ${this.txid}`)
}
}
diff --git a/tracker/transactions-bundle.js b/tracker/transactions-bundle.js
index 081eca7..a769e44 100644
--- a/tracker/transactions-bundle.js
+++ b/tracker/transactions-bundle.js
@@ -6,12 +6,9 @@
const _ = require('lodash')
const LRU = require('lru-cache')
-const bitcoin = require('bitcoinjs-lib')
const util = require('../lib/util')
const db = require('../lib/db/mysql-db-wrapper')
-const network = require('../lib/bitcoin/network')
-const keys = require('../keys')[network.key]
-const activeNet = network.network
+const addrHelper = require('../lib/bitcoin/addresses-helper')
/**
@@ -63,64 +60,54 @@ class TransactionsBundle {
/**
* Find the transactions of interest
+ * based on theirs inputs
* @returns {object[]} returns an array of transactions objects
*/
- async prefilterTransactions() {
+ async prefilterByInputs() {
// Process transactions by slices of 5000 transactions
const MAX_NB_TXS = 5000
const lists = util.splitList(this.transactions, MAX_NB_TXS)
-
- const results = await util.seriesCall(lists, list => {
- return this._prefilterTransactions(list)
- })
-
+ const results = await util.parallelCall(lists, txs => this._prefilterByInputs(txs))
return _.flatten(results)
}
/**
- * Find the transactions of interest (internal implementation)
- * @params {object[]} transactions - array of transactions objects
+ * Find the transactions of interest
+ * based on theirs outputs
* @returns {object[]} returns an array of transactions objects
*/
- async _prefilterTransactions(transactions) {
- let inputs = []
- let outputs = []
+ async prefilterByOutputs() {
+ // Process transactions by slices of 5000 transactions
+ const MAX_NB_TXS = 5000
+ const lists = util.splitList(this.transactions, MAX_NB_TXS)
+ const results = await util.parallelCall(lists, txs => this._prefilterByOutputs(txs))
+ return _.flatten(results)
+ }
- // Store indices of txs to be processed
+ /**
+ * Find the transactions of interest
+ * based on theirs outputs (internal implementation)
+ * @params {object[]} txs - array of transactions objects
+ * @returns {object[]} returns an array of transactions objects
+ */
+ async _prefilterByOutputs(txs) {
+ let addresses = []
let filteredIdxTxs = []
-
- // Store txs indices, keyed by `txid-outindex`.
- // Values are arrays of txs indices (for double spends)
- let indexedInputs = {}
-
- // Store txs indices, keyed by address.
- // Values are arrays of txs indices
let indexedOutputs = {}
- // Stores txs indices, keyed by txids
- let indexedTxs = {}
-
- //
- // Prefilter against the outputs
- //
-
// Index the transaction outputs
- for (const i in transactions) {
- const tx = transactions[i]
+ for (const i in txs) {
+ const tx = txs[i]
const txid = tx.getId()
- indexedTxs[txid] = i
-
- // If we already checked this tx
if (TransactionsBundle.cache.has(txid))
- continue
+ continue
for (const j in tx.outs) {
try {
const script = tx.outs[j].script
- const address = bitcoin.address.fromOutputScript(script, activeNet)
- outputs.push(address)
- // Index the output
+ const address = addrHelper.outputScript2Address(script)
+ addresses.push(address)
if (!indexedOutputs[address])
indexedOutputs[address] = []
indexedOutputs[address].push(i)
@@ -129,8 +116,7 @@ class TransactionsBundle {
}
// Prefilter
- const outRes = await db.getUngroupedHDAccountsByAddresses(outputs)
-
+ const outRes = await db.getUngroupedHDAccountsByAddresses(addresses)
for (const i in outRes) {
const key = outRes[i].addrAddress
const idxTxs = indexedOutputs[key]
@@ -141,41 +127,43 @@ class TransactionsBundle {
}
}
- //
- // Prefilter against the inputs
- //
+ return filteredIdxTxs.map(x => txs[x])
+ }
- // Index the transaction inputs
- for (const i in transactions) {
- const tx = transactions[i]
+ /**
+ * Find the transactions of interest
+ * based on theirs inputs (internal implementation)
+ * @params {object[]} txs - array of transactions objects
+ * @returns {object[]} returns an array of transactions objects
+ */
+ async _prefilterByInputs(txs) {
+ let inputs = []
+ let filteredIdxTxs = []
+ let indexedInputs = {}
+
+ for (const i in txs) {
+ const tx = txs[i]
const txid = tx.getId()
- // If we already checked this tx
if (TransactionsBundle.cache.has(txid))
continue
for (const j in tx.ins) {
const spendHash = tx.ins[j].hash
const spendTxid = Buffer.from(spendHash).reverse().toString('hex')
- // Check if this input consumes an output
- // generated by a transaction from this block
- if (filteredIdxTxs.indexOf(indexedTxs[spendTxid]) > -1 && filteredIdxTxs.indexOf(i) == -1) {
- filteredIdxTxs.push(i)
- } else {
- const spendIdx = tx.ins[j].index
- inputs.push({txid: spendTxid, index: spendIdx})
- // Index the input
- const key = spendTxid + '-' + spendIdx
- if (!indexedInputs[key])
- indexedInputs[key] = []
- indexedInputs[key].push(i)
- }
+ const spendIdx = tx.ins[j].index
+ inputs.push({txid: spendTxid, index: spendIdx})
+ const key = spendTxid + '-' + spendIdx
+ if (!indexedInputs[key])
+ indexedInputs[key] = []
+ indexedInputs[key].push(i)
}
}
// Prefilter
- const inRes = await db.getOutputSpends(inputs)
-
+ const lists = util.splitList(inputs, 1000)
+ const results = await util.parallelCall(lists, list => db.getOutputSpends(list))
+ const inRes = _.flatten(results)
for (const i in inRes) {
const key = inRes[i].txnTxid + '-' + inRes[i].outIndex
const idxTxs = indexedInputs[key]
@@ -186,11 +174,7 @@ class TransactionsBundle {
}
}
- //
- // Returns the matching transactions
- //
- filteredIdxTxs.sort((a, b) => a - b);
- return filteredIdxTxs.map(x => transactions[x])
+ return filteredIdxTxs.map(x => txs[x])
}
}