Browse Source

Merge branch 'feat_dojo_optim_tracker2' into 'develop'

optimize tracker (parallel processing of blocks)

See merge request dojo/samourai-dojo!217
umbrel
kenshin-samourai 4 years ago
parent
commit
3b45a621f4
  1. 50
      tracker/abstract-processor.js
  2. 173
      tracker/block-worker.js
  3. 123
      tracker/block.js
  4. 96
      tracker/blockchain-processor.js
  5. 222
      tracker/blocks-processor.js
  6. 57
      tracker/mempool-processor.js
  7. 10
      tracker/transaction.js
  8. 107
      tracker/transactions-bundle.js

50
tracker/abstract-processor.js

@ -1,50 +0,0 @@
/*!
* tracker/abstract-processor.js
* Copyright © 2019 Katana Cryptographic Ltd. All Rights Reserved.
*/
'use strict'
const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
/**
* An abstract class for tracker processors
*/
class AbstractProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
// RPC client
this.client = new RpcClient()
// ZeroMQ socket for notifications sent to others components
this.notifSock = notifSock
}
/**
* Notify a new transaction
* @param {object} tx - bitcoin transaction
*/
notifyTx(tx) {
// Real-time client updates for this transaction.
// Any address input or output present in transaction
// is a potential client to notify.
if (this.notifSock)
this.notifSock.send(['transaction', JSON.stringify(tx)])
}
/**
* Notify a new block
* @param {string} header - block header
*/
notifyBlock(header) {
// Notify clients of the block
if (this.notifSock)
this.notifSock.send(['block', JSON.stringify(header)])
}
}
module.exports = AbstractProcessor

173
tracker/block-worker.js

@ -0,0 +1,173 @@
/*!
* tracker/block-worker.js
* Copyright © 2019 Katana Cryptographic Ltd. All Rights Reserved.
*/
'use strict'
const { isMainThread, parentPort } = require('worker_threads')
const network = require('../lib/bitcoin/network')
const keys = require('../keys')[network.key]
const db = require('../lib/db/mysql-db-wrapper')
const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
const Block = require('./block')
/**
* STATUS
*/
const IDLE = 0
module.exports.IDLE = IDLE
const INITIALIZED = 1
module.exports.INITIALIZED = INITIALIZED
const OUTPUTS_PROCESSED = 2
module.exports.OUTPUTS_PROCESSED = OUTPUTS_PROCESSED
const INPUTS_PROCESSED = 3
module.exports.INPUTS_PROCESSED = INPUTS_PROCESSED
const TXS_CONFIRMED = 4
module.exports.TXS_CONFIRMED = TXS_CONFIRMED
/**
* OPS
*/
const OP_INIT = 0
module.exports.OP_INIT = OP_INIT
const OP_PROCESS_OUTPUTS = 1
module.exports.OP_PROCESS_OUTPUTS = OP_PROCESS_OUTPUTS
const OP_PROCESS_INPUTS = 2
module.exports.OP_PROCESS_INPUTS = OP_PROCESS_INPUTS
const OP_CONFIRM = 3
module.exports.OP_CONFIRM = OP_CONFIRM
const OP_RESET = 4
module.exports.OP_RESET = OP_RESET
/**
* Process message received by the worker
* @param {object} msg - message received by the worker
*/
async function processMessage(msg) {
let res = null
let success = true
try {
switch(msg.op) {
case OP_INIT:
if (status != IDLE)
throw 'Operation not allowed'
res = await initBlock(msg.header)
break
case OP_PROCESS_OUTPUTS:
if (status != INITIALIZED)
throw 'Operation not allowed'
res = await processOutputs()
break
case OP_PROCESS_INPUTS:
if (status != OUTPUTS_PROCESSED)
throw 'Operation not allowed'
res = await processInputs()
break
case OP_CONFIRM:
if (status != INPUTS_PROCESSED)
throw 'Operation not allowed'
res = await confirmTransactions(msg.blockId)
break
case OP_RESET:
res = await reset()
break
default:
throw 'Invalid Operation'
}
} catch (e) {
success = false
res = e
} finally {
parentPort.postMessage({
'op': msg.op,
'status': success,
'res': res
})
}
}
/**
* Initialize the block
* @param {object} header - block header
*/
async function initBlock(header) {
status = INITIALIZED
const hex = await rpcClient.getblock(header.hash, false)
block = new Block(hex, header)
return true
}
/**
* Process the transactions outputs
*/
async function processOutputs() {
status = OUTPUTS_PROCESSED
txsForBroadcast = await block.processOutputs()
return true
}
/**
* Process the transactions inputs
*/
async function processInputs() {
status = INPUTS_PROCESSED
const txs = await block.processInputs()
txsForBroadcast = txsForBroadcast.concat(txs)
return true
}
/**
* Confirm the transactions
* @param {integer} blockId - id of the block in db
*/
async function confirmTransactions(blockId) {
status = TXS_CONFIRMED
const aTxsForBroadcast = [...new Set(txsForBroadcast)]
await block.confirmTransactions(aTxsForBroadcast, blockId)
return aTxsForBroadcast
}
/**
* Reset
*/
function reset() {
status = IDLE
block = null
txsForBroadcast = []
return true
}
/**
* MAIN
*/
const rpcClient = new RpcClient()
let block = null
let txsForBroadcast = []
let status = IDLE
if (!isMainThread) {
db.connect({
connectionLimit: keys.db.connectionLimitTracker,
acquireTimeout: keys.db.acquireTimeout,
host: keys.db.host,
user: keys.db.user,
password: keys.db.pass,
database: keys.db.database
})
reset()
parentPort.on('message', processMessage)
}

123
tracker/block.js

@ -26,44 +26,92 @@ class Block extends TransactionsBundle {
super()
this.hex = hex
this.header = header
try {
if (hex != null) {
const block = bitcoin.Block.fromHex(hex)
this.transactions = block.transactions
}
} catch (e) {
Logger.error(e, 'Tracker : Block()')
Logger.error(null, header)
return Promise.reject(e)
}
}
/**
* Register the block and transactions of interest in db
* @dev This method isn't used anymore.
* It has been replaced by a parallel processing of blocks.
* (see blocks-processor and block-worker)
* @returns {Promise - object[]} returns an array of transactions to be broadcast
*/
async checkBlock() {
async processBlock() {
Logger.info('Tracker : Beginning to process new block.')
let block
const txsForBroadcast = []
const t0 = Date.now()
const txsForBroadcast = new Map()
try {
block = bitcoin.Block.fromHex(this.hex)
this.transactions = block.transactions
} catch (e) {
Logger.error(e, 'Tracker : Block.checkBlock()')
Logger.error(null, this.header)
return Promise.reject(e)
}
const txsForBroadcast1 = await this.processOutputs()
txsForBroadcast1.map(tx => {txsForBroadcast.set(tx.getId(), tx)})
const t0 = Date.now()
let ntx = 0
// Filter transactions
const filteredTxs = await this.prefilterTransactions()
// Check filtered transactions
// and broadcast notifications
await util.seriesCall(filteredTxs, async tx => {
const filteredTx = new Transaction(tx)
const txCheck = await filteredTx.checkTransaction()
if (txCheck && txCheck.broadcast)
txsForBroadcast.push(txCheck.tx)
const txsForBroadcast2 = await this.processInputs()
txsForBroadcast2.map(tx => {txsForBroadcast.set(tx.getId(), tx)})
const aTxsForBroadcast = [...txsForBroadcast.values()]
const blockId = await this.registerBlock()
await this.confirmTransactions(aTxsForBroadcast, blockId)
// Logs and result returned
const ntx = this.transactions.length
const dt = ((Date.now()-t0)/1000).toFixed(1)
const per = ((Date.now()-t0)/ntx).toFixed(0)
Logger.info(`Tracker : Finished block ${this.header.height}, ${dt}s, ${ntx} tx, ${per}ms/tx`)
return aTxsForBroadcast
}
/**
* Process the transaction outputs
* @returns {Promise - object[]} returns an array of transactions to be broadcast
*/
async processOutputs() {
const txsForBroadcast = new Set()
const filteredTxs = await this.prefilterByOutputs()
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
await tx.processOutputs()
if (tx.doBroadcast)
txsForBroadcast.add(tx.tx)
})
return [...txsForBroadcast]
}
/**
* Process the transaction inputs
* @returns {Promise - object[]} returns an array of transactions to be broadcast
*/
async processInputs() {
const txsForBroadcast = new Set()
const filteredTxs = await this.prefilterByInputs()
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
await tx.processInputs()
if (tx.doBroadcast)
txsForBroadcast.add(tx.tx)
})
return [...txsForBroadcast]
}
// Retrieve the previous block
// and store the new block into the database
/**
* Store the block in db
* @returns {Promise - int} returns the id of the block
*/
async registerBlock() {
const prevBlock = await db.getBlockByHash(this.header.previousblockhash)
const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null
@ -76,18 +124,19 @@ class Block extends TransactionsBundle {
Logger.info(`Tracker : Added block ${this.header.height} (id=${blockId})`)
// Confirms the transactions
const txids = this.transactions.map(t => t.getId())
ntx = txids.length
const txidLists = util.splitList(txids, 100)
await util.parallelCall(txidLists, list => db.confirmTransactions(list, blockId))
// Logs and result returned
const dt = ((Date.now()-t0)/1000).toFixed(1)
const per = ((Date.now()-t0)/ntx).toFixed(0)
Logger.info(`Tracker : Finished block ${this.header.height}, ${dt}s, ${ntx} tx, ${per}ms/tx`)
return blockId
}
return txsForBroadcast
/**
* Confirm the transactions in db
* @param {Set} txs - set of transactions stored in db
* @param {int} blockId - id of the block
* r@returns {Promise}
*/
async confirmTransactions(txs, blockId) {
const txids = txs.map(t => t.getId())
const txidLists = util.splitList(txids, 100)
return util.parallelCall(txidLists, list => db.confirmTransactions(list, blockId))
}
/**

96
tracker/blockchain-processor.js

@ -11,29 +11,34 @@ const util = require('../lib/util')
const Logger = require('../lib/logger')
const db = require('../lib/db/mysql-db-wrapper')
const network = require('../lib/bitcoin/network')
const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
const keys = require('../keys')[network.key]
const AbstractProcessor = require('./abstract-processor')
const Block = require('./block')
const TransactionsBundle = require('./transactions-bundle')
const blocksProcessor = require('./blocks-processor')
/**
* A class allowing to process the blockchain
*/
class BlockchainProcessor extends AbstractProcessor {
class BlockchainProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
super(notifSock)
// RPC client
this.client = new RpcClient()
// ZeroMQ socket for bitcoind blocks messages
this.blkSock = null
// Initialize a semaphor protecting the onBlockHash() method
this._onBlockHashSemaphor = new Sema(1, { capacity: 50 })
// Array of worker threads used for parallel processing of blocks
this.blockWorkers = []
// Flag tracking Initial Block Download Mode
this.isIBD = true
// Initialize the blocks processor
blocksProcessor.init(notifSock)
}
/**
@ -156,22 +161,16 @@ class BlockchainProcessor extends AbstractProcessor {
if (highest == null) return null
if (daemonNbBlocks == highest.blockHeight) return null
// Compute blocks range to be processed
const blockRange = _.range(highest.blockHeight, daemonNbBlocks + 1)
Logger.info(`Tracker : Sync ${blockRange.length} blocks`)
// Process the blocks
return util.seriesCall(blockRange, async height => {
try {
const hash = await this.client.getblockhash(height)
const header = await this.client.getblockheader(hash)
return this.processBlock(header)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
process.exit()
}
}, 'Tracker syncing', true)
try {
return this.processBlockRange(blockRange)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
process.exit()
}
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
@ -259,9 +258,7 @@ class BlockchainProcessor extends AbstractProcessor {
await this.rewind(knownHeight)
// Process the blocks
return await util.seriesCall(headers, header => {
return this.processBlock(header)
})
return await this.processBlocks(headers)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.onBlockHash()')
@ -315,8 +312,6 @@ class BlockchainProcessor extends AbstractProcessor {
await db.unconfirmTransactions(txids)
}
// TODO: get accounts and notify of deletion ?
await db.deleteBlocksAfterHeight(height)
}
@ -339,45 +334,40 @@ class BlockchainProcessor extends AbstractProcessor {
Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`)
// Process the blocks
return util.seriesCall(blockRange, async height => {
try {
Logger.info(`Tracker : Rescanning block ${height}`)
const hash = await this.client.getblockhash(height)
const header = await this.client.getblockheader(hash)
return this.processBlock(header)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.rescan()')
throw e
}
}, 'Tracker rescan', true)
try {
return this.processBlockRange(blockRange)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.rescan()')
throw e
}
}
/**
* Process a block
* @param {object} header - block header
* @returns {Promise}
* Process a list of blocks
* @param {object[]} headers - array of block headers
*/
async processBlock(header) {
try {
// Get raw block hex string from bitcoind
const hex = await this.client.getblock(header.hash, false)
async processBlocks(headers) {
const chunks = util.splitList(headers, blocksProcessor.nbWorkers)
const block = new Block(hex, header)
const txsForBroadcast = await block.checkBlock()
// Send notifications
for (let tx of txsForBroadcast)
this.notifyTx(tx)
await util.seriesCall(chunks, async chunk => {
return blocksProcessor.processChunk(chunk)
})
}
this.notifyBlock(header)
/**
* Process a range of blocks
* @param {int[]} heights - a range of block heights
*/
async processBlockRange(heights) {
const chunks = util.splitList(heights, blocksProcessor.nbWorkers)
} catch(e) {
// The show must go on.
// TODO: further notification that this block did not check out
Logger.error(e, 'Tracker : BlockchainProcessor.processBlock()')
}
return util.seriesCall(chunks, async chunk => {
const headers = await util.parallelCall(chunk, async height => {
const hash = await this.client.getblockhash(height)
return await this.client.getblockheader(hash)
})
return this.processBlocks(headers)
})
}
/**

222
tracker/blocks-processor.js

@ -0,0 +1,222 @@
/*!
* tracker/blocks-processor.js
* Copyright © 2019 Katana Cryptographic Ltd. All Rights Reserved.
*/
'use strict'
const os = require('os')
const Sema = require('async-sema')
const { Worker } = require('worker_threads')
const Logger = require('../lib/logger')
const util = require('../lib/util')
const dbProcessor = require('../lib/db/mysql-db-wrapper')
const blockWorker = require('./block-worker')
let notifSock = null
let blockWorkers = []
let headersChunk = []
let txsForBroadcast = []
let t0 = null
let currentOp = null
let nbTasksEnqueued = 0
let nbTasksCompleted = 0
// Semaphor protecting the processBloks() method
const _processBlocksSemaphor = new Sema(1)
// Number of worker threads processing the blocks in parallel
const nbWorkers = os.cpus().length //- 1
module.exports.nbWorkers = nbWorkers
/**
* Initialize the processor
* @param {object} notifSock - ZMQ socket used for notifications
*/
function init(notifSock) {
notifSock = notifSock
for (let i = 0; i < nbWorkers; i++) {
const worker = new Worker(
`${__dirname}/block-worker.js`,
{ workerData: null }
)
worker.on('error', processWorkerError)
worker.on('message', processWorkerMessage)
blockWorkers.push(worker)
}
}
module.exports.init = init
/**
* Process a chunk of block headers
* @param {object[]} chunk - array of block headers
*/
async function processChunk(chunk) {
// Acquire the semaphor (wait for previous chunk)
await _processBlocksSemaphor.acquire()
t0 = Date.now()
const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}`
Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}`)
// Process the chunk
chunk.sort((a,b) => a.height - b.height)
headersChunk = chunk
txsForBroadcast = []
processTask(blockWorker.OP_INIT)
}
module.exports.processChunk = processChunk
/**
* Process an error returned by a worker thread
* @param {object} e - error
*/
async function processWorkerError(e) {
return processWorkerMessage({
'op': currentOp,
'status': false,
'res': e
})
}
/**
* Process a message returned by a worker thread
* @param {object} msg - message sent by the worker thread
*/
async function processWorkerMessage(msg) {
nbTasksCompleted++
if (!msg.status) {
Logger.error(msg.res, 'Tracker : processWorkerMessage()')
} else if (msg.op == blockWorker.OP_CONFIRM) {
txsForBroadcast = txsForBroadcast.concat(msg.res)
}
if (nbTasksCompleted == nbTasksEnqueued) {
switch (msg.op) {
case blockWorker.OP_INIT:
// Process the transaction outputs
processTask(blockWorker.OP_PROCESS_OUTPUTS)
break
case blockWorker.OP_PROCESS_OUTPUTS:
// Process the transaction inputs
processTask(blockWorker.OP_PROCESS_INPUTS)
break
case blockWorker.OP_PROCESS_INPUTS:
// Store the blocks in db and get their id
const blockIds = await util.seriesCall(headersChunk, async header => {
return registerBlock(header)
})
// Confirm the transactions
processTask(blockWorker.OP_CONFIRM, blockIds)
break
case blockWorker.OP_CONFIRM:
// Notify new transactions and blocks
await Promise.all([
util.parallelCall(txsForBroadcast, async tx => {
notifyTx(tx)
}),
util.parallelCall(headersChunk, async header => {
notifyBlock(header)
})
])
// Process complete. Reset the workers
processTask(blockWorker.OP_RESET)
break
case blockWorker.OP_RESET:
const dt = ((Date.now()-t0)/1000).toFixed(1)
const per = ((Date.now()-t0)/headersChunk.length).toFixed(0)
const sBlockRange = `${headersChunk[0].height}-${headersChunk[headersChunk.length-1].height}`
Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`)
// Release the semaphor
await _processBlocksSemaphor.release()
break
}
}
}
/**
* Execute an operation processing a block
* @param {integer} op - operation
* @param {*} args
*/
function processTask(op, args) {
currentOp = op
nbTasksEnqueued = 0
nbTasksCompleted = 0
switch (op) {
case blockWorker.OP_INIT:
for (let i = 0; i < headersChunk.length; i++) {
blockWorkers[i].postMessage({
'op': op,
'header': headersChunk[i]
})
nbTasksEnqueued++
}
break
case blockWorker.OP_PROCESS_OUTPUTS:
case blockWorker.OP_PROCESS_INPUTS:
case blockWorker.OP_RESET:
for (let i = 0; i < headersChunk.length; i++) {
blockWorkers[i].postMessage({'op': op})
nbTasksEnqueued++
}
break
case blockWorker.OP_CONFIRM:
for (let i = 0; i < headersChunk.length; i++) {
blockWorkers[i].postMessage({
'op': op,
'blockId': args[i]
})
nbTasksEnqueued++
}
break
default:
Logger.error(null, 'Tracker : processTask() : Unknown operation')
}
}
/**
* Notify a new transaction
* @param {object} tx - bitcoin transaction
*/
function notifyTx(tx) {
// Real-time client updates for this transaction.
// Any address input or output present in transaction
// is a potential client to notify.
if (notifSock)
notifSock.send(['transaction', JSON.stringify(tx)])
}
/**
* Notify a new block
* @param {string} header - block header
*/
function notifyBlock(header) {
// Notify clients of the block
if (notifSock)
notifSock.send(['block', JSON.stringify(header)])
}
/**
* Store a block in db
* @param {object} header - block header
* @returns {Promise - int} returns the id of the block
*/
async function registerBlock(header) {
const prevBlock = await dbProcessor.getBlockByHash(header.previousblockhash)
const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null
const blockId = await dbProcessor.addBlock({
blockHeight: header.height,
blockHash: header.hash,
blockTime: header.time,
blockParent: prevID
})
Logger.info(`Tracker : Added block ${header.height} (id=${blockId})`)
return blockId
}

57
tracker/mempool-processor.js

@ -11,8 +11,8 @@ const util = require('../lib/util')
const Logger = require('../lib/logger')
const db = require('../lib/db/mysql-db-wrapper')
const network = require('../lib/bitcoin/network')
const RpcClient = require('../lib/bitcoind-rpc/rpc-client')
const keys = require('../keys')[network.key]
const AbstractProcessor = require('./abstract-processor')
const Transaction = require('./transaction')
const TransactionsBundle = require('./transactions-bundle')
@ -20,14 +20,17 @@ const TransactionsBundle = require('./transactions-bundle')
/**
* A class managing a buffer for the mempool
*/
class MempoolProcessor extends AbstractProcessor {
class MempoolProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
super(notifSock)
// RPC client
this.client = new RpcClient()
// ZeroMQ socket for notifications sent to others components
this.notifSock = notifSock
// Mempool buffer
this.mempoolBuffer = new TransactionsBundle()
// ZeroMQ socket for bitcoind Txs messages
@ -152,14 +155,27 @@ class MempoolProcessor extends AbstractProcessor {
let currentMempool = new TransactionsBundle(this.mempoolBuffer.toArray())
this.mempoolBuffer.clear()
const filteredTxs = await currentMempool.prefilterTransactions()
const txsForBroadcast = new Map()
return util.seriesCall(filteredTxs, async filteredTx => {
let filteredTxs = await currentMempool.prefilterByOutputs()
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
const txCheck = await tx.checkTransaction()
if (txCheck && txCheck.broadcast)
this.notifyTx(txCheck.tx)
await tx.processOutputs()
if (tx.doBroadcast)
txsForBroadcast[tx.txid] = tx.tx
})
filteredTxs = await currentMempool.prefilterByInputs()
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
await tx.processInputs()
if (tx.doBroadcast)
txsForBroadcast[tx.txid] = tx.tx
})
// Send the notifications
for (let tx of txsForBroadcast.values())
this.notifyTx(tx)
}
/**
@ -208,6 +224,29 @@ class MempoolProcessor extends AbstractProcessor {
}
}
/**
* Notify a new transaction
* @param {object} tx - bitcoin transaction
*/
notifyTx(tx) {
// Real-time client updates for this transaction.
// Any address input or output present in transaction
// is a potential client to notify.
if (this.notifSock)
this.notifSock.send(['transaction', JSON.stringify(tx)])
}
/**
* Notify a new block
* @param {string} header - block header
*/
notifyBlock(header) {
// Notify clients of the block
if (this.notifSock)
this.notifSock.send(['block', JSON.stringify(header)])
}
/**
* Check unconfirmed transactions
* @returns {Promise}
@ -249,7 +288,7 @@ class MempoolProcessor extends AbstractProcessor {
const ntx = unconfirmedTxs.length
const dt = ((Date.now() - t0) / 1000).toFixed(1)
const per = (ntx == 0) ? 0 : ((Date.now() - t0) / ntx).toFixed(0)
Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`)
Logger.info(`Tracker : Finished processing unconfirmed transactions ${dt}s, ${ntx} tx, ${per}ms/tx`)
}
/**

10
tracker/transaction.js

@ -47,10 +47,10 @@ class Transaction {
async checkTransaction() {
try {
// Process transaction inputs
await this._processInputs()
await this.processInputs()
// Process transaction outputs
await this._processOutputs()
await this.processOutputs()
// If this point reached with no errors,
// store the fact that this transaction was checked.
@ -73,7 +73,7 @@ class Transaction {
* Process transaction inputs
* @returns {Promise}
*/
async _processInputs() {
async processInputs() {
// Array of inputs spent
const spends = []
// Store input indices, keyed by `txid-outindex` for easy retrieval
@ -151,7 +151,7 @@ class Transaction {
* Process transaction outputs
* @returns {Promise}
*/
async _processOutputs() {
async processOutputs() {
// Store outputs, keyed by address. Values are arrays of outputs
const indexedOutputs = {}
@ -406,7 +406,7 @@ class Transaction {
locktime: this.tx.locktime,
})
Logger.info(`Tracker : Storing transaction ${this.txid}`)
Logger.info(`Tracker : Storing transaction ${this.txid}`)
}
}

107
tracker/transactions-bundle.js

@ -6,13 +6,9 @@
const _ = require('lodash')
const LRU = require('lru-cache')
const bitcoin = require('bitcoinjs-lib')
const util = require('../lib/util')
const db = require('../lib/db/mysql-db-wrapper')
const addrHelper = require('../lib/bitcoin/addresses-helper')
const network = require('../lib/bitcoin/network')
const keys = require('../keys')[network.key]
const activeNet = network.network
/**
@ -64,51 +60,46 @@ class TransactionsBundle {
/**
* Find the transactions of interest
* based on theirs inputs
* @returns {object[]} returns an array of transactions objects
*/
async prefilterTransactions() {
async prefilterByInputs() {
// Process transactions by slices of 5000 transactions
const MAX_NB_TXS = 5000
const lists = util.splitList(this.transactions, MAX_NB_TXS)
const results = await util.seriesCall(lists, txs => this._prefilterTransactions(txs))
const results = await util.parallelCall(lists, txs => this._prefilterByInputs(txs))
return _.flatten(results)
}
/**
* Find the transactions of interest (internal implementation)
* @params {object[]} transactions - array of transactions objects
* Find the transactions of interest
* based on theirs outputs
* @returns {object[]} returns an array of transactions objects
*/
async _prefilterTransactions(transactions) {
let inputs = []
let outputs = []
async prefilterByOutputs() {
// Process transactions by slices of 5000 transactions
const MAX_NB_TXS = 5000
const lists = util.splitList(this.transactions, MAX_NB_TXS)
const results = await util.parallelCall(lists, txs => this._prefilterByOutputs(txs))
return _.flatten(results)
}
// Store indices of txs to be processed
/**
* Find the transactions of interest
* based on theirs outputs (internal implementation)
* @params {object[]} txs - array of transactions objects
* @returns {object[]} returns an array of transactions objects
*/
async _prefilterByOutputs(txs) {
let addresses = []
let filteredIdxTxs = []
// Store txs indices, keyed by `txid-outindex`.
// Values are arrays of txs indices (for double spends)
let indexedInputs = {}
// Store txs indices, keyed by address.
// Values are arrays of txs indices
let indexedOutputs = {}
// Stores txs indices, keyed by txids
let indexedTxs = {}
//
// Prefilter against the outputs
//
// Index the transaction outputs
for (const i in transactions) {
const tx = transactions[i]
for (const i in txs) {
const tx = txs[i]
const txid = tx.getId()
indexedTxs[txid] = i
// If we already checked this tx
if (TransactionsBundle.cache.has(txid))
continue
@ -116,8 +107,7 @@ class TransactionsBundle {
try {
const script = tx.outs[j].script
const address = addrHelper.outputScript2Address(script)
outputs.push(address)
// Index the output
addresses.push(address)
if (!indexedOutputs[address])
indexedOutputs[address] = []
indexedOutputs[address].push(i)
@ -126,8 +116,7 @@ class TransactionsBundle {
}
// Prefilter
const outRes = await db.getUngroupedHDAccountsByAddresses(outputs)
const outRes = await db.getUngroupedHDAccountsByAddresses(addresses)
for (const i in outRes) {
const key = outRes[i].addrAddress
const idxTxs = indexedOutputs[key]
@ -138,35 +127,36 @@ class TransactionsBundle {
}
}
//
// Prefilter against the inputs
//
return filteredIdxTxs.map(x => txs[x])
}
/**
* Find the transactions of interest
* based on theirs inputs (internal implementation)
* @params {object[]} txs - array of transactions objects
* @returns {object[]} returns an array of transactions objects
*/
async _prefilterByInputs(txs) {
let inputs = []
let filteredIdxTxs = []
let indexedInputs = {}
// Index the transaction inputs
for (const i in transactions) {
const tx = transactions[i]
for (const i in txs) {
const tx = txs[i]
const txid = tx.getId()
// If we already checked this tx
if (TransactionsBundle.cache.has(txid))
continue
for (const j in tx.ins) {
const spendHash = tx.ins[j].hash
const spendTxid = Buffer.from(spendHash).reverse().toString('hex')
// Check if this input consumes an output
// generated by a transaction from this block
if (filteredIdxTxs.indexOf(indexedTxs[spendTxid]) > -1 && filteredIdxTxs.indexOf(i) == -1) {
filteredIdxTxs.push(i)
} else {
const spendIdx = tx.ins[j].index
inputs.push({txid: spendTxid, index: spendIdx})
// Index the input
const key = spendTxid + '-' + spendIdx
if (!indexedInputs[key])
indexedInputs[key] = []
indexedInputs[key].push(i)
}
const spendIdx = tx.ins[j].index
inputs.push({txid: spendTxid, index: spendIdx})
const key = spendTxid + '-' + spendIdx
if (!indexedInputs[key])
indexedInputs[key] = []
indexedInputs[key].push(i)
}
}
@ -174,7 +164,6 @@ class TransactionsBundle {
const lists = util.splitList(inputs, 1000)
const results = await util.parallelCall(lists, list => db.getOutputSpends(list))
const inRes = _.flatten(results)
for (const i in inRes) {
const key = inRes[i].txnTxid + '-' + inRes[i].outIndex
const idxTxs = indexedInputs[key]
@ -185,11 +174,7 @@ class TransactionsBundle {
}
}
//
// Returns the matching transactions
//
filteredIdxTxs.sort((a, b) => a - b);
return filteredIdxTxs.map(x => transactions[x])
return filteredIdxTxs.map(x => txs[x])
}
}

Loading…
Cancel
Save