From 07b60643975c8efac0df4f02232e93a3cf8328ee Mon Sep 17 00:00:00 2001 From: kenshin-samourai Date: Wed, 28 Apr 2021 19:25:06 +0200 Subject: [PATCH] parallelize remote importer --- lib/remote-importer/remote-importer.js | 358 +++++++++---------------- lib/util.js | 53 ---- 2 files changed, 130 insertions(+), 281 deletions(-) diff --git a/lib/remote-importer/remote-importer.js b/lib/remote-importer/remote-importer.js index 7e6a504..d56b319 100644 --- a/lib/remote-importer/remote-importer.js +++ b/lib/remote-importer/remote-importer.js @@ -60,86 +60,6 @@ class RemoteImporter { return this.importing[xpub] ? this.importing[xpub] : null } - /** - * Process the relations between a list of transactions - * @param {object[]} txs - array of transaction objects - * @returns {object} returns a object with 3 mappings - * {txMap: {], txChildren: {}, txParents: {}} - */ - _processTxsRelations(txs) { - const txMap = {} - const txChildren = {} - const txParents = {} - - for (let tx of txs) { - let txid = tx.txid - - // Populate txMap - txMap[txid] = tx - - // Create parent-child transaction associations - if (!txChildren[txid]) - txChildren[txid] = [] - - if (!txParents[txid]) - txParents[txid] = [] - - for (let i in tx.inputs) { - const input = tx.inputs[i] - let prev = input.outpoint.txid - if (!txMap[prev]) continue - - if (txParents[txid].indexOf(prev) == -1) - txParents[txid].push(prev) - - if (!txChildren[prev]) - txChildren[prev] = [] - - if (txChildren[prev].indexOf(txid) == -1) - txChildren[prev].push(txid) - } - } - - return { - txMap: txMap, - txChildren: txChildren, - txParents: txParents - } - } - - /** - * Import a list of transactions associated to a list of addresses - * @param {object[]} addresses - array of addresses objects - * @param {object[]} txns - array of transaction objects - * @returns {Promise} - */ - async _importTransactions(addresses, txns) { - let addrIdMap = {} - const addrChunks = util.splitList(addresses, 1000) - const addrIdMaps = await util.parallelCall(addrChunks, addrChunk => { - return db.getAddressesIds(addrChunk) - }) - for (let map of addrIdMaps) - addrIdMap = Object.assign(addrIdMap, map) - - // The transactions array must be topologically ordered, such that - // entries earlier in the array MUST NOT depend upon any entry later - // in the array. - const txMaps = this._processTxsRelations(txns) - const txOrdered = util.topologicalOrdering(txMaps.txParents, txMaps.txChildren) - const aTxs = [] - - for (let txid of txOrdered) - if (txMaps.txMap[txid]) - aTxs.push(txMaps.txMap[txid]) - - // Store the transactions by batches of 200 transactions - const txsChunks = util.splitList(aTxs, 200) - await util.seriesCall(txsChunks, txsChunk => { - return this.addTransactions(txsChunk, addrIdMap) - }) - } - /** * Import an HD account from remote sources * @param {string} xpub - HD Account @@ -168,33 +88,21 @@ class RemoteImporter { const t0 = Date.now() const chains = [0,1] - let gaps = [gap.external, gap.internal] // Allow custom higher gap limits // for local scans relying on bitcoind or on a local indexer - if (gapLimit - && ((keys.indexer.active == 'local_bitcoind') - || (keys.indexer.active == 'local_indexer')) - ) { - gaps = [gapLimit, gapLimit] - } + const isLocal = ['local_bitcoind', 'local_indexer'].includes(keys.indexer.active) + const gaps = (gapLimit && isLocal) ? [gapLimit, gapLimit] : [gap.external, gap.internal] startIndex = (startIndex == null) ? 
-1 : startIndex - 1 - const addrIdMap = {} - let txns = [] - let addresses = [] - try { const results = await util.parallelCall(chains, chain => { return this.xpubScan(xpub, chain, startIndex, startIndex, gaps[chain], type) }) // Accumulate addresses and transactions from all chains - for (let result of results) { - txns = txns.concat(result.transactions) - addresses = addresses.concat(result.addresses) - } - + const txns = results.map(r => r.transactions).flat() + const addresses = results.map(r => r.addresses).flat() const aAddresses = addresses.map(a => a.address) this.importing[xpub] = { @@ -204,9 +112,10 @@ class RemoteImporter { // Store the hdaccount and the addresses into the database await db.ensureHDAccountId(xpub, type) + const addrChunks = util.splitList(addresses, 1000) - await util.parallelCall(addrChunks, addrChunk => { - return db.addAddressesToHDAccount(xpub, addrChunk) + await util.parallelCall(addrChunks, chunk => { + return db.addAddressesToHDAccount(xpub, chunk) }) // Store the transaction into the database @@ -267,62 +176,39 @@ class RemoteImporter { d = u + G Logger.info(`Importer : derived M/${c}/${A.join(',')}`) - const addrMap = {} - for (let a of ret.addresses) - addrMap[a.address] = a - + const addrMap = ret.addresses.reduce((m,a) => (m[a.address] = a, m), {}) const aAddresses = ret.addresses.map(a => a.address) try { const results = await this.sources.getAddresses(aAddresses) - - let gotTransactions = false - const scanTx = [] - - for (let r of results) { - if (r.ntx == 0) continue - - // Address is used. Update used parameter - u = Math.max(u, addrMap[r.address].index) - gotTransactions = true - // TODO: Handle pathological case of many address transactions - while (r.txids.length > 0) { - let txid = r.txids.pop() - if (!txids[txid]) - scanTx.push(txid) - } - } + const filteredResults = results.flat().filter(r => r.ntx > 0) + const gotTransactions = filteredResults.length > 0 + const scanTx = filteredResults.map(r => r.txids).flat().filter(t => !txids[t]) + u = filteredResults.reduce((m,r) => Math.max(m, addrMap[r.address].index), u) Logger.info(`Importer : Got ${scanTx.length} transactions`) // Retrieve the transactions by batches of 200 transactions - const txsChunks = util.splitList(scanTx, 200) try { - for (let txsChunk of txsChunks) { - const txs = await rpcTxns.getTransactions(txsChunk, false) - for (let tx of txs) { - if (tx != null) { - ret.transactions.push(tx) - txids[tx.txid] = true - } - } - } + const txsChunks = util.splitList(scanTx, 200) + const txs = await util.seriesCall(txsChunks, chunk => { + return rpcTxns.getTransactions(chunk, false) + }) + const filteredTxs = txs.flat().filter(tx => tx != null) + ret.transactions = ret.transactions.concat(filteredTxs) + txids = filteredTxs.reduce((m,tx) => (m[tx.txid] = true, m), txids) } catch(e) { Logger.error(e, `Importer : RemoteImporter.xpubScan() : getTransactions error`) } if (gotTransactions) { - if (c == 0) - this.importing[xpub]['txs_ext'] = Object.keys(txids).length - else - this.importing[xpub]['txs_int'] = Object.keys(txids).length + const keyStatus = (c == 0) ? 
'txs_ext' : 'txs_int' + this.importing[xpub][keyStatus] = Object.keys(txids).length // We must go deeper const result = await this.xpubScan(xpub, c, d, u, G, type, txids) // Accumulate results from further down the rabbit hole - for (let a of result.addresses) - ret.addresses.push(a) - for (let t of result.transactions) - ret.transactions.push(t) + ret.addresses = ret.addresses.concat(result.addresses) + ret.transactions = ret.transactions.concat(result.transactions) } } catch(e) { @@ -340,50 +226,34 @@ class RemoteImporter { */ async importAddresses(candidates, filterAddr) { const t0 = Date.now() - const txns = [] - const addresses = [] - const imported = [] - - for (let address of candidates) { - if (!this.importing[address]) { - addresses.push(address) - this.importing[address] = true - } else { - Logger.info(`Importer : Import overlap for ${address}. Skipping`) - } - } - if (addresses.length == 0) - return true + // Check if some addresses are currently processed + const overlap = candidates.filter(c => this.importing[c]) + for (let a of overlap) + Logger.info(`Importer : Import overlap for ${a}. Skipping`) + + // List addresses that need to be processed + const addresses = candidates.filter(c => !this.importing[c]) + this.importing = addresses.reduce((m,a) => (m[a] = true, m), this.importing) + + if (addresses.length == 0) return true Logger.info(`Importer : Importing ${addresses.join(',')}`) try { - const scanTx = [] const results = await this.sources.getAddresses(addresses, filterAddr) - - for (let r of results) { - // Mark the address as imported - imported.push(r.address) - if (r.ntx == 0) continue - // TODO: Handle pathological case of many address transactions - while (r.txids.length > 0) { - let txid = r.txids.pop() - if (scanTx.indexOf(txid) == -1) - scanTx.push(txid) - } - } + const imported = results.map(r => r.address) + const filteredResults = results.filter(r => r.ntx > 0) + const scanTx = [...new Set(filteredResults.map(r => r.txids).flat())] Logger.info(`Importer : Got ${scanTx.length} transactions`) // Retrieve the transactions by batches of 100 transactions const txsChunks = util.splitList(scanTx, 100) - for (let txsChunk of txsChunks) { - const txs = await rpcTxns.getTransactions(txsChunk, false) - for (let tx of txs) - if (tx != null) - txns.push(tx) - } + const txs = await util.seriesCall(txsChunks, chunk => { + return rpcTxns.getTransactions(chunk, false) + }) + const txns = txs.flat().filter(tx => tx != null) // Import addresses and transactions into the database await db.addAddresses(imported) @@ -398,7 +268,7 @@ class RemoteImporter { const N = addresses.length if (N > 0) - Logger.info(`Importer : Imported ${N} addresses in ${ts}s (${(dt/N).toFixed(0)} ms/addr)`) + Logger.info(`Importer : Imported ${N} addresses in ${ts}s (${(dt/N).toFixed(0)} ms/addr)`) for (let address of addresses) delete this.importing[address] @@ -407,84 +277,116 @@ class RemoteImporter { } } + /** + * Import a list of transactions associated to a list of addresses + * @param {object[]} addresses - array of addresses objects + * @param {object[]} txs - array of transaction objects + * @returns {Promise} + */ + async _importTransactions(addresses, txs) { + const addrChunks = util.splitList(addresses, 1000) + const addrIdMaps = await util.parallelCall(addrChunks, chunk => { + return db.getAddressesIds(chunk) + }) + const addrIdMap = Object.assign({}, ...addrIdMaps) + + // Process the transactions by batches of 200 transactions + const txsChunks = util.splitList(txs, 200) + await 
util.parallelCall(txsChunks, chunk => {
+      return this._addTransactions(chunk)
+    })
+    await util.parallelCall(txsChunks, chunk => {
+      return this._addOutputs(chunk, addrIdMap)
+    })
+    await util.parallelCall(txsChunks, chunk => {
+      return this._addInputs(chunk)
+    })
+  }
+
   /**
    * Add a collection of transactions to the database.
    * @param {object[]} txs - array of transaction objects
-   * @params {object} addrIdMap - map address => addrId
    * @returns {Promise}
    */
-  async addTransactions(txs, addrIdMap) {
+  async _addTransactions(txs) {
     try {
       // Store the transactions into the database
       await db.addTransactions(txs)

       // Confirm the transactions if needed
-      const blocksHashes = new Set()
-      for (let tx of txs)
-        if (tx.block)
-          blocksHashes.add(tx.block.hash)
+      const blocksHashes = txs.filter(tx => tx.block).map(tx => tx.block.hash)
+      const blocks = await db.getBlocksByHashes(blocksHashes)

-      const blocks = await db.getBlocksByHashes(Array.from(blocksHashes))
-
-      for (let block of blocks) {
-        // Filter the transactions by blockHash
+      return util.parallelCall(blocks, block => {
         const filteredTxs = txs.filter(tx => (tx.block && tx.block.hash == block.blockHash))
-        if (filteredTxs.length > 0) {
-          const txids = filteredTxs.map(tx => tx.txid)
-          // Asynchronous confirmations
-          db.confirmTransactions(txids, block.blockID)
-        }
-      }
+        if (filteredTxs.length == 0) return
+        const txids = filteredTxs.map(tx => tx.txid)
+        return db.confirmTransactions(txids, block.blockID)
+      })
+    } catch(e) {
+      Logger.error(e, `Importer : RemoteImporter._addTransactions() :`)
+    }
+  }

-      // Retrieve the database ids for the transactions
+  /**
+   * Add a collection of transaction outputs to the database.
+   * @param {object[]} txs - array of transaction objects
+   * @param {object} addrIdMap - map address => addrId
+   * @returns {Promise}
+   */
+  async _addOutputs(txs, addrIdMap) {
+    try {
       const txids = txs.map(tx => tx.txid)
       const mapTxsIds = await db.getTransactionsIds(txids)

-      // Store the outputs in db
-      const outputs = []
-      for (let tx of txs) {
-        for (let output of tx.outputs) {
-          if (addrIdMap[output.address]) {
-            outputs.push({
-              txnID: mapTxsIds[tx.txid],
-              addrID: addrIdMap[output.address],
-              outIndex: output.n,
-              outAmount: output.value,
-              outScript: output.scriptpubkey,
-            })
-          }
-        }
-      }
-      await db.addOutputs(outputs)
+      const outputs = txs
+        .map(tx => tx.outputs.map(o => (o.txnID = mapTxsIds[tx.txid], o)))
+        .flat()
+        .filter(o => addrIdMap[o.address])
+        .map(o => { return {
+          txnID: o.txnID,
+          addrID: addrIdMap[o.address],
+          outIndex: o.n,
+          outAmount: o.value,
+          outScript: o.scriptpubkey,
+        }})
+
+      return db.addOutputs(outputs)
+
+    } catch(e) {
+      Logger.error(e, `Importer : RemoteImporter._addOutputs() :`)
+    }
+  }

-      // Store the inputs in db
-      const inputs = []
-      const spent = {}
+  /**
+   * Add a collection of transaction inputs to the database.
+   * @param {object[]} txs - array of transaction objects
+   * @returns {Promise}
+   */
+  async _addInputs(txs) {
+    try {
+      // Retrieve the database ids for the transactions
+      const txids = txs.map(tx => tx.txid)
+      const mapTxsIds = await db.getTransactionsIds(txids)

       // Get any outputs spent by the inputs of this transaction,
       // add those database outIDs to the corresponding inputs, and store.
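+      // Note: outputs funded by transactions in this same batch have already
+      // been stored by _addOutputs(), which _importTransactions() awaits before
+      // running _addInputs(). Inputs can therefore be linked to their outIDs
+      // here without topologically ordering the transactions first.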
- let outpoints = [] - for (let tx of txs) - outpoints = outpoints.concat(tx.inputs.map(input => input.outpoint)) - + const outpoints = txs.map(tx => tx.inputs).flat().map(input => input.outpoint) const res = await db.getOutputIds(outpoints) - for (let r of res) - spent[`${r.txnTxid}-${r.outIndex}`] = r.outID - - for (let tx of txs) { - for (let input of tx.inputs) { - const key = `${input.outpoint.txid}-${input.outpoint.vout}` - if (spent[key]) { - inputs.push({ - outID: spent[key], - txnID: mapTxsIds[tx.txid], - inIndex: input.n, - inSequence: input.seq - }) - } - } - } - await db.addInputs(inputs) + const spent = res.reduce((m,r) => (m[`${r.txnTxid}-${r.outIndex}`] = r.outID, m), {}) + + const inputs = txs + .map(tx => tx.inputs.map(i => (i.txnID = mapTxsIds[tx.txid], i))) + .flat() + .filter(i => spent[`${i.outpoint.txid}-${i.outpoint.vout}`]) + .map(i => { return { + outID: spent[`${i.outpoint.txid}-${i.outpoint.vout}`], + txnID: i.txnID, + inIndex: i.n, + inSequence: i.seq + }}) + + return db.addInputs(inputs) } catch(e) { Logger.error(e, `Importer : RemoteImporter.addTransactions() :`) diff --git a/lib/util.js b/lib/util.js index 8dc625e..8bd5871 100644 --- a/lib/util.js +++ b/lib/util.js @@ -14,59 +14,6 @@ class Util { */ constructor() {} - /** - * Topological ordering of DAG - * https://en.wikipedia.org/wiki/Topological_sorting - * - * Kahn's algorithm - * - * L ← Empty list that will contain the sorted elements - * S ← Set of all nodes with no incoming edge - * while S is non-empty do - * remove a node n from S - * add n to tail of L - * for each node m with an edge e from n to m do - * remove edge e from the graph - * if m has no other incoming edges then - * insert m into S - * - * @param {object} parents - map of {[key]: [incoming edge keys]} - * @param {object} children - a map of {[key]: [outgoing edge keys]} - * @returns {object} - * if graph has edges then - * return error (graph has at least one cycle) - * else - * return L (a topologically sorted order) - */ - static topologicalOrdering(parents, children) { - const S = [] - - for (let node in parents) { - if (parents[node].length == 0) { - // Node has no parent (incoming edges) - S.push(node) - } - } - - const L = [] - - while (S.length > 0) { - const node = S.pop() - L.push(node) - - // Loop over nodes that depend on node - for (let child of children[node]) { - let i = parents[child].indexOf(node) - if (i > -1) - parents[child].splice(i, 1) - - if (parents[child].length == 0) - S.push(child) - } - } - return L - } - /** * Serialize a series of asynchronous calls to a function * over a list of objects