Browse Source

optimize imports and rescans with sql batch requests

umbrel
kenshin-samourai 5 years ago
parent
commit
2abec10c6a
  1. 119
      lib/remote-importer/remote-importer.js

119
lib/remote-importer/remote-importer.js

@ -115,7 +115,10 @@ class RemoteImporter {
if (txMaps.txMap[txid])
aTxs.push(txMaps.txMap[txid])
return util.seriesCall(aTxs, tx => this.addTransaction(tx, addrIdMap))
// Store the transactions by batches of 200 transactions
const txsChunks = util.splitList(aTxs, 200)
for (let txsChunk of txsChunks)
await this.addTransactions(txsChunk, addrIdMap)
}
/**
@ -260,8 +263,8 @@ class RemoteImporter {
Logger.info(` Got ${scanTx.length} transactions`)
// Retrieve the transactions by batches of 100 transactions
const txsChunks = util.splitList(scanTx, 100)
// Retrieve the transactions by batches of 200 transactions
const txsChunks = util.splitList(scanTx, 200)
try {
for (let txsChunk of txsChunks) {
const txs = await rpcTxns.getTransactions(txsChunk, false)
@ -369,70 +372,86 @@ class RemoteImporter {
}
/**
* Add a transaction to the database.
* @param {object} tx - transaction object
* @params {Promise}
* Add a collection of transactions to the database.
* @param {object[]} txs - array of transaction objects
* @params {object} addrIdMap - map address => addrId
* @returns {Promise}
*/
async addTransaction(tx, addrIdMap) {
const outputs = []
async addTransactions(txs, addrIdMap) {
try {
// Store the transaction into the database
await db.addTransaction(tx)
// Confirm the transaction
if (tx.block) {
const block = await db.getBlockByHash(tx.block.hash)
if (block)
await db.confirmTransactions([tx.txid], block.blockID)
// Store the transactions into the database
await db.addTransactions(txs)
// Confirm the transactions if needed
const blocksHashes = new Set()
for (let tx of txs)
if (tx.block)
blocksHashes.add(tx.block.hash)
const blocks = await db.getBlocksByHashes(Array.from(blocksHashes))
for (let block of blocks) {
// Filter the transactions by blockHash
const filteredTxs = txs.filter(tx => (tx.block && tx.block.hash == block.blockHash))
if (filteredTxs.length > 0) {
const txids = filteredTxs.map(tx => tx.txid)
// Asynchronous confirmations
db.confirmTransactions(txids, block.blockID)
}
}
// Retrieve the database id for the transaction
let txnID = await db.ensureTransactionId(tx.txid)
// Process the outputs
for (let output of tx.outputs) {
if (addrIdMap[output.address]) {
outputs.push({
txnID,
addrID: addrIdMap[output.address],
outIndex: output.n,
outAmount: output.value,
outScript: output.scriptpubkey,
})
// Retrieve the database ids for the transactions
const txids = txs.map(tx => tx.txid)
const mapTxsIds = await db.getTransactionsIds(txids)
// Store the outputs in db
const outputs = []
for (let tx of txs) {
for (let output of tx.outputs) {
if (addrIdMap[output.address]) {
outputs.push({
txnID: mapTxsIds[tx.txid],
addrID: addrIdMap[output.address],
outIndex: output.n,
outAmount: output.value,
outScript: output.scriptpubkey,
})
}
}
}
await db.addOutputs(outputs)
// Process the inputs
// Get any outputs spent by the inputs of this transaction, add those
// database outIDs to the corresponding transaction inputs, and store.
const res = await db.getOutputIds(tx.inputs.map(input => input.outpoint))
const spent = {}
// Store the inputs in db
const inputs = []
const spent = {}
// Get any outputs spent by the inputs of this transaction,
// add those database outIDs to the corresponding inputs, and store.
let outpoints = []
for (let tx of txs)
outpoints = outpoints.concat(tx.inputs.map(input => input.outpoint))
const res = await db.getOutputIds(outpoints)
for (let r of res)
spent[`${r.txnTxid}-${r.outIndex}`] = r.outID
for (let input of tx.inputs) {
let key = `${input.outpoint.txid}-${input.outpoint.vout}`
if (spent[key]) {
inputs.push({
outID: spent[key],
txnID,
inIndex: input.n,
inSequence: input.seq
})
for (let tx of txs) {
for (let input of tx.inputs) {
const key = `${input.outpoint.txid}-${input.outpoint.vout}`
if (spent[key]) {
inputs.push({
outID: spent[key],
txnID: mapTxsIds[tx.txid],
inIndex: input.n,
inSequence: input.seq
})
}
}
}
await db.addInputs(inputs)
} catch(e) {
Logger.error(e, `RemoteImporter.addTransaction() : xpub ${tx.txid}`)
Logger.error(null, JSON.stringify(tx,null,2))
Logger.error(e, `RemoteImporter.addTransactions() :`)
}
}

Loading…
Cancel
Save