Browse Source

Merge branch 'feat_dojo_optim_importer2' into 'develop'

optimize remote importer

See merge request dojo/samourai-dojo!219
umbrel
kenshin-samourai 4 years ago
parent
commit
b4b17360eb
  1. 353
      lib/remote-importer/remote-importer.js
  2. 53
      lib/util.js

353
lib/remote-importer/remote-importer.js

@ -60,79 +60,6 @@ class RemoteImporter {
return this.importing[xpub] ? this.importing[xpub] : null return this.importing[xpub] ? this.importing[xpub] : null
} }
/**
* Process the relations between a list of transactions
* @param {object[]} txs - array of transaction objects
* @returns {object} returns a object with 3 mappings
* {txMap: {], txChildren: {}, txParents: {}}
*/
_processTxsRelations(txs) {
const txMap = {}
const txChildren = {}
const txParents = {}
for (let tx of txs) {
let txid = tx.txid
// Populate txMap
txMap[txid] = tx
// Create parent-child transaction associations
if (!txChildren[txid])
txChildren[txid] = []
if (!txParents[txid])
txParents[txid] = []
for (let i in tx.inputs) {
const input = tx.inputs[i]
let prev = input.outpoint.txid
if (!txMap[prev]) continue
if (txParents[txid].indexOf(prev) == -1)
txParents[txid].push(prev)
if (!txChildren[prev])
txChildren[prev] = []
if (txChildren[prev].indexOf(txid) == -1)
txChildren[prev].push(txid)
}
}
return {
txMap: txMap,
txChildren: txChildren,
txParents: txParents
}
}
/**
* Import a list of transactions associated to a list of addresses
* @param {object[]} addresses - array of addresses objects
* @param {object[]} txns - array of transaction objects
* @returns {Promise}
*/
async _importTransactions(addresses, txns) {
const addrIdMap = await db.getAddressesIds(addresses)
// The transactions array must be topologically ordered, such that
// entries earlier in the array MUST NOT depend upon any entry later
// in the array.
const txMaps = this._processTxsRelations(txns)
const txOrdered = util.topologicalOrdering(txMaps.txParents, txMaps.txChildren)
const aTxs = []
for (let txid of txOrdered)
if (txMaps.txMap[txid])
aTxs.push(txMaps.txMap[txid])
// Store the transactions by batches of 200 transactions
const txsChunks = util.splitList(aTxs, 200)
for (let txsChunk of txsChunks)
await this.addTransactions(txsChunk, addrIdMap)
}
/** /**
* Import an HD account from remote sources * Import an HD account from remote sources
* @param {string} xpub - HD Account * @param {string} xpub - HD Account
@ -161,32 +88,22 @@ class RemoteImporter {
const t0 = Date.now() const t0 = Date.now()
const chains = [0,1] const chains = [0,1]
let gaps = [gap.external, gap.internal]
// Allow custom higher gap limits // Allow custom higher gap limits
// for local scans relying on bitcoind or on a local indexer // for local scans relying on bitcoind or on a local indexer
if (gapLimit const isLocal = ['local_bitcoind', 'local_indexer'].includes(keys.indexer.active)
&& ((keys.indexer.active == 'local_bitcoind') const gaps = (gapLimit && isLocal) ? [gapLimit, gapLimit] : [gap.external, gap.internal]
|| (keys.indexer.active == 'local_indexer'))
) {
gaps = [gapLimit, gapLimit]
}
startIndex = (startIndex == null) ? -1 : startIndex - 1 startIndex = (startIndex == null) ? -1 : startIndex - 1
const addrIdMap = {}
let txns = []
let addresses = []
try { try {
const results = await util.parallelCall(chains, chain => { const results = await util.parallelCall(chains, chain => {
return this.xpubScan(xpub, chain, startIndex, startIndex, gaps[chain], type) return this.xpubScan(xpub, chain, startIndex, startIndex, gaps[chain], type)
}) })
// Accumulate addresses and transactions from all chains // Accumulate addresses and transactions from all chains
for (let result of results) { const txns = results.map(r => r.transactions).flat()
txns = txns.concat(result.transactions) const addresses = results.map(r => r.addresses).flat()
addresses = addresses.concat(result.addresses) const aAddresses = addresses.map(a => a.address)
}
this.importing[xpub] = { this.importing[xpub] = {
'status': this.STATUS_IMPORT, 'status': this.STATUS_IMPORT,
@ -195,10 +112,13 @@ class RemoteImporter {
// Store the hdaccount and the addresses into the database // Store the hdaccount and the addresses into the database
await db.ensureHDAccountId(xpub, type) await db.ensureHDAccountId(xpub, type)
await db.addAddressesToHDAccount(xpub, addresses)
const addrChunks = util.splitList(addresses, 1000)
await util.parallelCall(addrChunks, chunk => {
return db.addAddressesToHDAccount(xpub, chunk)
})
// Store the transaction into the database // Store the transaction into the database
const aAddresses = addresses.map(a => a.address)
await this._importTransactions(aAddresses, txns) await this._importTransactions(aAddresses, txns)
} catch(e) { } catch(e) {
@ -256,62 +176,39 @@ class RemoteImporter {
d = u + G d = u + G
Logger.info(`Importer : derived M/${c}/${A.join(',')}`) Logger.info(`Importer : derived M/${c}/${A.join(',')}`)
const addrMap = {} const addrMap = ret.addresses.reduce((m,a) => (m[a.address] = a, m), {})
for (let a of ret.addresses)
addrMap[a.address] = a
const aAddresses = ret.addresses.map(a => a.address) const aAddresses = ret.addresses.map(a => a.address)
try { try {
const results = await this.sources.getAddresses(aAddresses) const results = await this.sources.getAddresses(aAddresses)
const filteredResults = results.flat().filter(r => r.ntx > 0)
let gotTransactions = false const gotTransactions = filteredResults.length > 0
const scanTx = [] const scanTx = filteredResults.map(r => r.txids).flat().filter(t => !txids[t])
u = filteredResults.reduce((m,r) => Math.max(m, addrMap[r.address].index), u)
for (let r of results) {
if (r.ntx == 0) continue
// Address is used. Update used parameter
u = Math.max(u, addrMap[r.address].index)
gotTransactions = true
// TODO: Handle pathological case of many address transactions
while (r.txids.length > 0) {
let txid = r.txids.pop()
if (!txids[txid])
scanTx.push(txid)
}
}
Logger.info(`Importer : Got ${scanTx.length} transactions`) Logger.info(`Importer : Got ${scanTx.length} transactions`)
// Retrieve the transactions by batches of 200 transactions // Retrieve the transactions by batches of 200 transactions
const txsChunks = util.splitList(scanTx, 200)
try { try {
for (let txsChunk of txsChunks) { const txsChunks = util.splitList(scanTx, 200)
const txs = await rpcTxns.getTransactions(txsChunk, false) const txs = await util.seriesCall(txsChunks, chunk => {
for (let tx of txs) { return rpcTxns.getTransactions(chunk, false)
if (tx != null) { })
ret.transactions.push(tx) const filteredTxs = txs.flat().filter(tx => tx != null)
txids[tx.txid] = true ret.transactions = ret.transactions.concat(filteredTxs)
} txids = filteredTxs.reduce((m,tx) => (m[tx.txid] = true, m), txids)
}
}
} catch(e) { } catch(e) {
Logger.error(e, `Importer : RemoteImporter.xpubScan() : getTransactions error`) Logger.error(e, `Importer : RemoteImporter.xpubScan() : getTransactions error`)
} }
if (gotTransactions) { if (gotTransactions) {
if (c == 0) const keyStatus = (c == 0) ? 'txs_ext' : 'txs_int'
this.importing[xpub]['txs_ext'] = Object.keys(txids).length this.importing[xpub][keyStatus] = Object.keys(txids).length
else
this.importing[xpub]['txs_int'] = Object.keys(txids).length
// We must go deeper // We must go deeper
const result = await this.xpubScan(xpub, c, d, u, G, type, txids) const result = await this.xpubScan(xpub, c, d, u, G, type, txids)
// Accumulate results from further down the rabbit hole // Accumulate results from further down the rabbit hole
for (let a of result.addresses) ret.addresses = ret.addresses.concat(result.addresses)
ret.addresses.push(a) ret.transactions = ret.transactions.concat(result.transactions)
for (let t of result.transactions)
ret.transactions.push(t)
} }
} catch(e) { } catch(e) {
@ -329,50 +226,34 @@ class RemoteImporter {
*/ */
async importAddresses(candidates, filterAddr) { async importAddresses(candidates, filterAddr) {
const t0 = Date.now() const t0 = Date.now()
const txns = []
const addresses = []
const imported = []
for (let address of candidates) {
if (!this.importing[address]) {
addresses.push(address)
this.importing[address] = true
} else {
Logger.info(`Importer : Import overlap for ${address}. Skipping`)
}
}
if (addresses.length == 0) // Check if some addresses are currently processed
return true const overlap = candidates.filter(c => this.importing[c])
for (let a of overlap)
Logger.info(`Importer : Import overlap for ${a}. Skipping`)
// List addresses that need to be processed
const addresses = candidates.filter(c => !this.importing[c])
this.importing = addresses.reduce((m,a) => (m[a] = true, m), this.importing)
if (addresses.length == 0) return true
Logger.info(`Importer : Importing ${addresses.join(',')}`) Logger.info(`Importer : Importing ${addresses.join(',')}`)
try { try {
const scanTx = []
const results = await this.sources.getAddresses(addresses, filterAddr) const results = await this.sources.getAddresses(addresses, filterAddr)
const imported = results.map(r => r.address)
for (let r of results) { const filteredResults = results.filter(r => r.ntx > 0)
// Mark the address as imported const scanTx = [...new Set(filteredResults.map(r => r.txids).flat())]
imported.push(r.address)
if (r.ntx == 0) continue
// TODO: Handle pathological case of many address transactions
while (r.txids.length > 0) {
let txid = r.txids.pop()
if (scanTx.indexOf(txid) == -1)
scanTx.push(txid)
}
}
Logger.info(`Importer : Got ${scanTx.length} transactions`) Logger.info(`Importer : Got ${scanTx.length} transactions`)
// Retrieve the transactions by batches of 100 transactions // Retrieve the transactions by batches of 100 transactions
const txsChunks = util.splitList(scanTx, 100) const txsChunks = util.splitList(scanTx, 100)
for (let txsChunk of txsChunks) { const txs = await util.seriesCall(txsChunks, chunk => {
const txs = await rpcTxns.getTransactions(txsChunk, false) return rpcTxns.getTransactions(chunk, false)
for (let tx of txs) })
if (tx != null) const txns = txs.flat().filter(tx => tx != null)
txns.push(tx)
}
// Import addresses and transactions into the database // Import addresses and transactions into the database
await db.addAddresses(imported) await db.addAddresses(imported)
@ -387,7 +268,7 @@ class RemoteImporter {
const N = addresses.length const N = addresses.length
if (N > 0) if (N > 0)
Logger.info(`Importer : Imported ${N} addresses in ${ts}s (${(dt/N).toFixed(0)} ms/addr)`) Logger.info(`Importer : Imported ${N} addresses in ${ts}s (${(dt/N).toFixed(0)} ms/addr)`)
for (let address of addresses) for (let address of addresses)
delete this.importing[address] delete this.importing[address]
@ -396,84 +277,116 @@ class RemoteImporter {
} }
} }
/**
* Import a list of transactions associated to a list of addresses
* @param {object[]} addresses - array of addresses objects
* @param {object[]} txs - array of transaction objects
* @returns {Promise}
*/
async _importTransactions(addresses, txs) {
const addrChunks = util.splitList(addresses, 1000)
const addrIdMaps = await util.parallelCall(addrChunks, chunk => {
return db.getAddressesIds(chunk)
})
const addrIdMap = Object.assign({}, ...addrIdMaps)
// Process the transactions by batches of 200 transactions
const txsChunks = util.splitList(txs, 200)
await util.parallelCall(txsChunks, chunk => {
return this._addTransactions(chunk)
})
await util.parallelCall(txsChunks, chunk => {
return this._addOutputs(chunk, addrIdMap)
})
await util.parallelCall(txsChunks, chunk => {
return this._addInputs(chunk)
})
}
/** /**
* Add a collection of transactions to the database. * Add a collection of transactions to the database.
* @param {object[]} txs - array of transaction objects * @param {object[]} txs - array of transaction objects
* @params {object} addrIdMap - map address => addrId
* @returns {Promise} * @returns {Promise}
*/ */
async addTransactions(txs, addrIdMap) { async _addTransactions(txs) {
try { try {
// Store the transactions into the database // Store the transactions into the database
await db.addTransactions(txs) await db.addTransactions(txs)
// Confirm the transactions if needed // Confirm the transactions if needed
const blocksHashes = new Set() const blocksHashes = txs.filter(tx => tx.block).map(tx => tx.block.hash)
for (let tx of txs) const blocks = await db.getBlocksByHashes(blocksHashes)
if (tx.block)
blocksHashes.add(tx.block.hash)
const blocks = await db.getBlocksByHashes(Array.from(blocksHashes)) return util.parallelCall(blocks, block => {
for (let block of blocks) {
// Filter the transactions by blockHash
const filteredTxs = txs.filter(tx => (tx.block && tx.block.hash == block.blockHash)) const filteredTxs = txs.filter(tx => (tx.block && tx.block.hash == block.blockHash))
if (filteredTxs.length > 0) { if (filteredTxs.length == 0) return
const txids = filteredTxs.map(tx => tx.txid) const txids = filteredTxs.map(tx => tx.txid)
// Asynchronous confirmations return db.confirmTransactions(txids, block.blockID)
db.confirmTransactions(txids, block.blockID) })
} } catch(e) {
} Logger.error(e, `Importer : RemoteImporter.addTransactions() :`)
}
}
// Retrieve the database ids for the transactions /**
* Add a collection of transaction outputs to the database.
* @param {object[]} txs - array of transaction objects
* @params {object} addrIdMap - map address => addrId
* @returns {Promise}
*/
async _addOutputs(txs, addrIdMap) {
try {
const txids = txs.map(tx => tx.txid) const txids = txs.map(tx => tx.txid)
const mapTxsIds = await db.getTransactionsIds(txids) const mapTxsIds = await db.getTransactionsIds(txids)
// Store the outputs in db const outputs = txs
const outputs = [] .map(tx => tx.outputs.map(o => (o.txnID = mapTxsIds[tx.txid], o)))
for (let tx of txs) { .flat()
for (let output of tx.outputs) { .filter(o => addrIdMap[o.address])
if (addrIdMap[output.address]) { .map(o => { return {
outputs.push({ txnID: o.txnID,
txnID: mapTxsIds[tx.txid], addrID: addrIdMap[o.address],
addrID: addrIdMap[output.address], outIndex: o.n,
outIndex: output.n, outAmount: o.value,
outAmount: output.value, outScript: o.scriptpubkey,
outScript: output.scriptpubkey, }})
})
} return db.addOutputs(outputs)
}
} } catch(e) {
await db.addOutputs(outputs) Logger.error(e, `Importer : RemoteImporter._addOutputs() :`)
}
}
// Store the inputs in db /**
const inputs = [] * Add a collection of transaction inputs to the database.
const spent = {} * @param {object[]} txs - array of transaction objects
* @returns {Promise}
*/
async _addInputs(txs) {
try {
// Retrieve the database ids for the transactions
const txids = txs.map(tx => tx.txid)
const mapTxsIds = await db.getTransactionsIds(txids)
// Get any outputs spent by the inputs of this transaction, // Get any outputs spent by the inputs of this transaction,
// add those database outIDs to the corresponding inputs, and store. // add those database outIDs to the corresponding inputs, and store.
let outpoints = [] const outpoints = txs.map(tx => tx.inputs).flat().map(input => input.outpoint)
for (let tx of txs)
outpoints = outpoints.concat(tx.inputs.map(input => input.outpoint))
const res = await db.getOutputIds(outpoints) const res = await db.getOutputIds(outpoints)
for (let r of res) const spent = res.reduce((m,r) => (m[`${r.txnTxid}-${r.outIndex}`] = r.outID, m), {})
spent[`${r.txnTxid}-${r.outIndex}`] = r.outID
const inputs = txs
for (let tx of txs) { .map(tx => tx.inputs.map(i => (i.txnID = mapTxsIds[tx.txid], i)))
for (let input of tx.inputs) { .flat()
const key = `${input.outpoint.txid}-${input.outpoint.vout}` .filter(i => spent[`${i.outpoint.txid}-${i.outpoint.vout}`])
if (spent[key]) { .map(i => { return {
inputs.push({ outID: spent[`${i.outpoint.txid}-${i.outpoint.vout}`],
outID: spent[key], txnID: i.txnID,
txnID: mapTxsIds[tx.txid], inIndex: i.n,
inIndex: input.n, inSequence: i.seq
inSequence: input.seq }})
})
} return db.addInputs(inputs)
}
}
await db.addInputs(inputs)
} catch(e) { } catch(e) {
Logger.error(e, `Importer : RemoteImporter.addTransactions() :`) Logger.error(e, `Importer : RemoteImporter.addTransactions() :`)

53
lib/util.js

@ -14,59 +14,6 @@ class Util {
*/ */
constructor() {} constructor() {}
/**
* Topological ordering of DAG
* https://en.wikipedia.org/wiki/Topological_sorting
*
* Kahn's algorithm
*
* L Empty list that will contain the sorted elements
* S Set of all nodes with no incoming edge
* while S is non-empty do
* remove a node n from S
* add n to tail of L
* for each node m with an edge e from n to m do
* remove edge e from the graph
* if m has no other incoming edges then
* insert m into S
*
* @param {object} parents - map of {[key]: [incoming edge keys]}
* @param {object} children - a map of {[key]: [outgoing edge keys]}
* @returns {object}
* if graph has edges then
* return error (graph has at least one cycle)
* else
* return L (a topologically sorted order)
*/
static topologicalOrdering(parents, children) {
const S = []
for (let node in parents) {
if (parents[node].length == 0) {
// Node has no parent (incoming edges)
S.push(node)
}
}
const L = []
while (S.length > 0) {
const node = S.pop()
L.push(node)
// Loop over nodes that depend on node
for (let child of children[node]) {
let i = parents[child].indexOf(node)
if (i > -1)
parents[child].splice(i, 1)
if (parents[child].length == 0)
S.push(child)
}
}
return L
}
/** /**
* Serialize a series of asynchronous calls to a function * Serialize a series of asynchronous calls to a function
* over a list of objects * over a list of objects

Loading…
Cancel
Save