Browse Source

refactoring of tracker prior to parallel processing of blocks

umbrel
kenshin-samourai 4 years ago
parent
commit
63a62e9ce6
  1. 122
      tracker/block.js
  2. 135
      tracker/blockchain-processor.js
  3. 23
      tracker/mempool-processor.js
  4. 8
      tracker/transaction.js
  5. 109
      tracker/transactions-bundle.js

122
tracker/block.js

@ -26,44 +26,91 @@ class Block extends TransactionsBundle {
super()
this.hex = hex
this.header = header
try {
const block = bitcoin.Block.fromHex(hex)
this.transactions = block.transactions
} catch (e) {
Logger.error(e, 'Tracker : Block()')
Logger.error(null, header)
return Promise.reject(e)
}
}
/**
* Register the block and transactions of interest in db
* @returns {Promise - object[]} returns an array of transactions to be broadcast
*/
async checkBlock() {
async processBlock() {
Logger.info('Tracker : Beginning to process new block.')
let block
const txsForBroadcast = []
const t0 = Date.now()
try {
block = bitcoin.Block.fromHex(this.hex)
this.transactions = block.transactions
} catch (e) {
Logger.error(e, 'Tracker : Block.checkBlock()')
Logger.error(null, this.header)
return Promise.reject(e)
}
const txsForBroadcast = new Map()
const t0 = Date.now()
let ntx = 0
// Filter transactions
const filteredTxs = await this.prefilterTransactions()
// Check filtered transactions
// and broadcast notifications
await util.seriesCall(filteredTxs, async tx => {
const filteredTx = new Transaction(tx)
const txCheck = await filteredTx.checkTransaction()
if (txCheck && txCheck.broadcast)
txsForBroadcast.push(txCheck.tx)
const txsForBroadcast1 = await this.processOutputs()
txsForBroadcast1.map(tx => {txsForBroadcast.set(tx.getId(), tx)})
const txsForBroadcast2 = await this.processInputs()
txsForBroadcast2.map(tx => {txsForBroadcast.set(tx.getId(), tx)})
const aTxsForBroadcast = [...txsForBroadcast.values()]
const blockId = await this.registerBlock()
await this.confirmTransactions(aTxsForBroadcast, blockId)
// Logs and result returned
const ntx = this.transactions.length
const dt = ((Date.now()-t0)/1000).toFixed(1)
const per = ((Date.now()-t0)/ntx).toFixed(0)
Logger.info(`Tracker : Finished block ${this.header.height}, ${dt}s, ${ntx} tx, ${per}ms/tx`)
return aTxsForBroadcast
}
/**
* Process the transaction outputs
* @returns {Promise - object[]} returns an array of transactions to be broadcast
*/
async processOutputs() {
const txsForBroadcast = new Set()
console.time('prefilterByOutputs')
const filteredTxs = await this.prefilterByOutputs()
console.timeEnd('prefilterByOutputs')
console.time('processOutputs')
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
await tx.processOutputs()
if (tx.doBroadcast)
txsForBroadcast.add(tx.tx)
})
console.timeEnd('processOutputs')
return [...txsForBroadcast]
}
/**
* Process the transaction inputs
* @returns {Promise - object[]} returns an array of transactions to be broadcast
*/
async processInputs() {
const txsForBroadcast = new Set()
const filteredTxs = await this.prefilterByInputs()
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
await tx.processInputs()
if (tx.doBroadcast)
txsForBroadcast.add(tx.tx)
})
return [...txsForBroadcast]
}
// Retrieve the previous block
// and store the new block into the database
/**
* Store the block in db
* @returns {Promise - int} returns the id of the block
*/
async registerBlock() {
const prevBlock = await db.getBlockByHash(this.header.previousblockhash)
const prevID = (prevBlock && prevBlock.blockID) ? prevBlock.blockID : null
@ -76,18 +123,19 @@ class Block extends TransactionsBundle {
Logger.info(`Tracker : Added block ${this.header.height} (id=${blockId})`)
// Confirms the transactions
const txids = this.transactions.map(t => t.getId())
ntx = txids.length
const txidLists = util.splitList(txids, 100)
await util.parallelCall(txidLists, list => db.confirmTransactions(list, blockId))
// Logs and result returned
const dt = ((Date.now()-t0)/1000).toFixed(1)
const per = ((Date.now()-t0)/ntx).toFixed(0)
Logger.info(`Tracker : Finished block ${this.header.height}, ${dt}s, ${ntx} tx, ${per}ms/tx`)
return blockId
}
return txsForBroadcast
/**
 * Confirm the transactions in db
 * @param {object[]} txs - array of transactions to be confirmed (each must support getId())
 * @param {int} blockId - id of the block
 * @returns {Promise}
 */
async confirmTransactions(txs, blockId) {
const txids = txs.map(t => t.getId())
// Confirm by batches of 100 txids to keep each db query bounded
const txidLists = util.splitList(txids, 100)
return util.parallelCall(txidLists, list => db.confirmTransactions(list, blockId))
}
/**

135
tracker/blockchain-processor.js

@ -161,17 +161,12 @@ class BlockchainProcessor extends AbstractProcessor {
Logger.info(`Tracker : Sync ${blockRange.length} blocks`)
// Process the blocks
return util.seriesCall(blockRange, async height => {
try {
const hash = await this.client.getblockhash(height)
const header = await this.client.getblockheader(hash)
return this.processBlock(header)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
process.exit()
}
}, 'Tracker syncing', true)
try {
return this.processBlockRange(blockRange)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
process.exit()
}
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.catchupNormalMode()')
@ -259,9 +254,7 @@ class BlockchainProcessor extends AbstractProcessor {
await this.rewind(knownHeight)
// Process the blocks
return await util.seriesCall(headers, header => {
return this.processBlock(header)
})
return await this.processBlocks(headers)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.onBlockHash()')
@ -340,44 +333,94 @@ class BlockchainProcessor extends AbstractProcessor {
Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`)
// Process the blocks
return util.seriesCall(blockRange, async height => {
try {
Logger.info(`Tracker : Rescanning block ${height}`)
const hash = await this.client.getblockhash(height)
const header = await this.client.getblockheader(hash)
return this.processBlock(header)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.rescan()')
throw e
}
}, 'Tracker rescan', true)
try {
return this.processBlockRange(blockRange)
} catch(e) {
Logger.error(e, 'Tracker : BlockchainProcessor.rescan()')
throw e
}
}
/**
* Process a block
* @param {object} header - block header
* @returns {Promise}
* Process a series of blocks
* @param {object[]} headers - array of block headers
*/
async processBlock(header) {
try {
// Get raw block hex string from bitcoind
const hex = await this.client.getblock(header.hash, false)
const block = new Block(hex, header)
const txsForBroadcast = await block.checkBlock()
// Send notifications
for (let tx of txsForBroadcast)
this.notifyTx(tx)
async processBlocks(headers) {
const MAX_NB_BLOCKS = 5
const chunks = util.splitList(headers, MAX_NB_BLOCKS)
return util.seriesCall(chunks, async chunk => {
const t0 = Date.now()
const sBlockRange = `${chunk[0].height}-${chunk[chunk.length-1].height}`
Logger.info(`Tracker : Beginning to process blocks ${sBlockRange}.`)
const txsForBroadcast = new Map()
// Initialize the blocks and process the transaction outputs
console.time('outputs')
const blocks = await util.parallelCall(chunk, async header => {
// Get raw block hex string from bitcoind
const hex = await this.client.getblock(header.hash, false)
const block = new Block(hex, header)
// Process the transaction outputs
const txs = await block.processOutputs()
txsForBroadcast.set(block.header.hash, txs)
return block
})
console.timeEnd('outputs')
// Process the transaction inputs
console.time('inputs')
await util.parallelCall(blocks, async block => {
const txs = await block.processInputs()
txsForBroadcast.set(
block.header.hash,
txs.concat(txsForBroadcast.get(block.header.hash))
)
})
console.timeEnd('inputs')
// Sort the blocks by ascending height
blocks.sort((a,b) => a.header.height - b.header.height)
// Register the block and confirm the transactions
console.time('confirm')
await util.seriesCall(blocks, async block => {
const blockId = await block.registerBlock()
const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))]
await block.confirmTransactions(aTxsForBroadcast, blockId)
})
console.timeEnd('confirm')
console.time('notify')
await util.parallelCall(blocks, async block => {
const aTxsForBroadcast = [...new Set(txsForBroadcast.get(block.header.hash))]
for (let tx of aTxsForBroadcast)
this.notifyTx(tx)
this.notifyBlock(block.header)
})
console.timeEnd('notify')
this.notifyBlock(header)
const dt = ((Date.now()-t0)/1000).toFixed(1)
const per = ((Date.now()-t0)/MAX_NB_BLOCKS).toFixed(0)
Logger.info(`Tracker : Finished processing blocks ${sBlockRange}, ${dt}s, ${per}ms/block`)
})
}
} catch(e) {
// The show must go on.
// TODO: further notification that this block did not check out
Logger.error(e, 'Tracker : BlockchainProcessor.processBlock()')
}
/**
* Process a range of blocks
* @param {int[]} heights - a range of block heights
*/
async processBlockRange(heights) {
const MAX_NB_BLOCKS = 5
const chunks = util.splitList(heights, MAX_NB_BLOCKS)
return util.seriesCall(chunks, async chunk => {
console.time('headers')
const headers = await util.parallelCall(chunk, async height => {
// Get raw block hex string from bitcoind
const hash = await this.client.getblockhash(height)
return await this.client.getblockheader(hash)
})
console.timeEnd('headers')
return this.processBlocks(headers)
})
}
/**

23
tracker/mempool-processor.js

@ -152,14 +152,27 @@ class MempoolProcessor extends AbstractProcessor {
let currentMempool = new TransactionsBundle(this.mempoolBuffer.toArray())
this.mempoolBuffer.clear()
const filteredTxs = await currentMempool.prefilterTransactions()
const txsForBroadcast = new Map()
return util.seriesCall(filteredTxs, async filteredTx => {
let filteredTxs = await currentMempool.prefilterByOutputs()
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
const txCheck = await tx.checkTransaction()
if (txCheck && txCheck.broadcast)
this.notifyTx(txCheck.tx)
await tx.processOutputs()
if (tx.doBroadcast)
txsForBroadcast[tx.txid] = tx.tx
})
filteredTxs = await currentMempool.prefilterByInputs()
await util.parallelCall(filteredTxs, async filteredTx => {
const tx = new Transaction(filteredTx)
await tx.processInputs()
if (tx.doBroadcast)
txsForBroadcast[tx.txid] = tx.tx
})
// Send the notifications
for (let tx of txsForBroadcast.values())
this.notifyTx(tx)
}
/**

8
tracker/transaction.js

@ -47,10 +47,10 @@ class Transaction {
async checkTransaction() {
try {
// Process transaction inputs
await this._processInputs()
await this.processInputs()
// Process transaction outputs
await this._processOutputs()
await this.processOutputs()
// If this point reached with no errors,
// store the fact that this transaction was checked.
@ -73,7 +73,7 @@ class Transaction {
* Process transaction inputs
* @returns {Promise}
*/
async _processInputs() {
async processInputs() {
// Array of inputs spent
const spends = []
// Store input indices, keyed by `txid-outindex` for easy retrieval
@ -151,7 +151,7 @@ class Transaction {
* Process transaction outputs
* @returns {Promise}
*/
async _processOutputs() {
async processOutputs() {
// Store outputs, keyed by address. Values are arrays of outputs
const indexedOutputs = {}

109
tracker/transactions-bundle.js

@ -6,13 +6,9 @@
const _ = require('lodash')
const LRU = require('lru-cache')
const bitcoin = require('bitcoinjs-lib')
const util = require('../lib/util')
const db = require('../lib/db/mysql-db-wrapper')
const addrHelper = require('../lib/bitcoin/addresses-helper')
const network = require('../lib/bitcoin/network')
const keys = require('../keys')[network.key]
const activeNet = network.network
/**
@ -64,51 +60,47 @@ class TransactionsBundle {
/**
* Find the transactions of interest
* based on their inputs
* @returns {object[]} returns an array of transactions objects
*/
async prefilterTransactions() {
async prefilterByInputs() {
// Process transactions by slices of 5000 transactions
const MAX_NB_TXS = 5000
const lists = util.splitList(this.transactions, MAX_NB_TXS)
const results = await util.seriesCall(lists, txs => this._prefilterTransactions(txs))
const results = await util.parallelCall(lists, txs => this._prefilterByInputs(txs))
return _.flatten(results)
}
/**
* Find the transactions of interest (internal implementation)
* @params {object[]} transactions - array of transactions objects
* Find the transactions of interest
* based on their outputs
* @returns {object[]} returns an array of transactions objects
*/
async _prefilterTransactions(transactions) {
let inputs = []
let outputs = []
async prefilterByOutputs() {
// Process transactions by slices of 5000 transactions
const MAX_NB_TXS = 5000
const lists = util.splitList(this.transactions, MAX_NB_TXS)
const results = await util.parallelCall(lists, txs => this._prefilterByOutputs(txs))
return _.flatten(results)
}
// Store indices of txs to be processed
/**
* Find the transactions of interest
* based on their outputs (internal implementation)
* @params {object[]} txs - array of transactions objects
* @returns {object[]} returns an array of transactions objects
*/
async _prefilterByOutputs(txs) {
let addresses = []
let filteredIdxTxs = []
// Store txs indices, keyed by `txid-outindex`.
// Values are arrays of txs indices (for double spends)
let indexedInputs = {}
// Store txs indices, keyed by address.
// Values are arrays of txs indices
let indexedOutputs = {}
// Stores txs indices, keyed by txids
let indexedTxs = {}
//
// Prefilter against the outputs
//
// Index the transaction outputs
for (const i in transactions) {
const tx = transactions[i]
console.time('outputScript2Address')
for (const i in txs) {
const tx = txs[i]
const txid = tx.getId()
indexedTxs[txid] = i
// If we already checked this tx
if (TransactionsBundle.cache.has(txid))
continue
@ -116,18 +108,17 @@ class TransactionsBundle {
try {
const script = tx.outs[j].script
const address = addrHelper.outputScript2Address(script)
outputs.push(address)
// Index the output
addresses.push(address)
if (!indexedOutputs[address])
indexedOutputs[address] = []
indexedOutputs[address].push(i)
} catch (e) {}
}
}
console.timeEnd('outputScript2Address')
// Prefilter
const outRes = await db.getUngroupedHDAccountsByAddresses(outputs)
const outRes = await db.getUngroupedHDAccountsByAddresses(addresses)
for (const i in outRes) {
const key = outRes[i].addrAddress
const idxTxs = indexedOutputs[key]
@ -138,35 +129,36 @@ class TransactionsBundle {
}
}
//
// Prefilter against the inputs
//
return filteredIdxTxs.map(x => txs[x])
}
/**
* Find the transactions of interest
* based on their inputs (internal implementation)
* @params {object[]} txs - array of transactions objects
* @returns {object[]} returns an array of transactions objects
*/
async _prefilterByInputs(txs) {
let inputs = []
let filteredIdxTxs = []
let indexedInputs = {}
// Index the transaction inputs
for (const i in transactions) {
const tx = transactions[i]
for (const i in txs) {
const tx = txs[i]
const txid = tx.getId()
// If we already checked this tx
if (TransactionsBundle.cache.has(txid))
continue
for (const j in tx.ins) {
const spendHash = tx.ins[j].hash
const spendTxid = Buffer.from(spendHash).reverse().toString('hex')
// Check if this input consumes an output
// generated by a transaction from this block
if (filteredIdxTxs.indexOf(indexedTxs[spendTxid]) > -1 && filteredIdxTxs.indexOf(i) == -1) {
filteredIdxTxs.push(i)
} else {
const spendIdx = tx.ins[j].index
inputs.push({txid: spendTxid, index: spendIdx})
// Index the input
const key = spendTxid + '-' + spendIdx
if (!indexedInputs[key])
indexedInputs[key] = []
indexedInputs[key].push(i)
}
const spendIdx = tx.ins[j].index
inputs.push({txid: spendTxid, index: spendIdx})
const key = spendTxid + '-' + spendIdx
if (!indexedInputs[key])
indexedInputs[key] = []
indexedInputs[key].push(i)
}
}
@ -174,7 +166,6 @@ class TransactionsBundle {
const lists = util.splitList(inputs, 1000)
const results = await util.parallelCall(lists, list => db.getOutputSpends(list))
const inRes = _.flatten(results)
for (const i in inRes) {
const key = inRes[i].txnTxid + '-' + inRes[i].outIndex
const idxTxs = indexedInputs[key]
@ -185,11 +176,7 @@ class TransactionsBundle {
}
}
//
// Returns the matching transactions
//
filteredIdxTxs.sort((a, b) => a - b);
return filteredIdxTxs.map(x => transactions[x])
return filteredIdxTxs.map(x => txs[x])
}
}

Loading…
Cancel
Save