
Merge pull request #102 from Samourai-Wallet/imp_perfs_importer

improve performances of transactions imports
kenshin samourai authored 5 years ago, committed by GitHub
commit dfdda2e9e8
  1. lib/bitcoind-rpc/rpc-client.js (4 changes)
  2. lib/bitcoind-rpc/transactions.js (134 changes)
  3. lib/db/mysql-db-wrapper.js (58 changes)
  4. lib/remote-importer/remote-importer.js (156 changes)
  5. lib/remote-importer/sources.js (2 changes)
  6. lib/util.js (18 changes)

lib/bitcoind-rpc/rpc-client.js (4 changes)

@@ -37,7 +37,9 @@ class RpcClient {
return async function(...args) {
const result = await origMethod.apply(target.client, args)
if (result.result) {
if (Array.isArray(result)) {
return result
} else if (result.result) {
return result.result
} else if (result.error) {
throw result.error
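
With batching, the proxied client can now hand the raw response array back untouched. For reference, a minimal sketch of the unwrapping rule (the function name is illustrative, not part of the codebase), assuming the wrapped client returns either a single { result, error } object or, for batched calls, an array of { id, result, error } entries:

    function unwrapRpcResponse(response) {
      // Batched calls come back as an array of { id, result, error } entries;
      // hand the whole array back so the caller can match entries to requests.
      if (Array.isArray(response))
        return response
      // Single calls: return the payload directly, or surface the error.
      else if (response.result)
        return response.result
      else if (response.error)
        throw response.error
      return response
    }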

lib/bitcoind-rpc/transactions.js (134 changes)

@@ -46,6 +46,38 @@ class Transactions {
this.rpcClient = new RpcClient()
}
/**
* Get the transactions for a given array of txids
* @param {string[]} txids - txids of the transaction to be retrieved
* @param {boolean} fees - true if fees must be computed, false otherwise
* @returns {Promise} return an array of transactions (object[])
*/
async getTransactions(txids, fees) {
try {
const rpcCalls = txids.map(txid => {
return {
'method': 'getrawtransaction',
'params': [txid, true]
}
})
const txs = await this.rpcClient.batch(rpcCalls)
return await util.seriesCall(txs, async tx => {
if (tx.result == null) {
Logger.info(` got null for ${txids[tx.id]}`)
return null
} else {
return this._prepareTxResult(tx.result, fees)
}
})
} catch(e) {
Logger.error(e, 'Transaction.getTransactions()')
return Promise.reject(errors.generic.GEN)
}
}
/**
* Get the transaction for a given txid
* @param {string} txid - txid of the transaction to be retrieved
@@ -61,64 +93,73 @@ class Transactions {
try {
const tx = await this.rpcClient.getrawtransaction(txid, true)
const ret = await this._prepareTxResult(tx)
// Store the result in cache
if (ret.block && ret.block.hash)
this.txCache.set(txid, ret)
return ret
} catch(e) {
Logger.error(e, 'Transaction.getTransaction()')
return Promise.reject(errors.generic.GEN)
}
}
const ret = {
txid: tx.txid,
size: tx.size,
vsize: tx.vsize,
version: tx.version,
locktime: tx.locktime,
inputs: [],
outputs: []
}
/**
* Formats a transaction object returned by the RPC API
* @param {object} tx - transaction
* @param {boolean} fees - true if fees must be computed, false otherwise
* @returns {Promise} return an array of inputs (object[])
*/
async _prepareTxResult(tx, fees) {
const ret = {
txid: tx.txid,
size: tx.size,
vsize: tx.vsize,
version: tx.version,
locktime: tx.locktime,
inputs: [],
outputs: []
}
if (!ret.vsize)
delete ret.vsize
if (!ret.vsize)
delete ret.vsize
if (tx.time)
ret.created = tx.time
if (tx.time)
ret.created = tx.time
// Process block informations
if (tx.blockhash && tx.confirmations && tx.blocktime) {
ret.block = {
height: rpcLatestBlock.height - tx.confirmations + 1,
hash: tx.blockhash,
time: tx.blocktime
}
// Process block informations
if (tx.blockhash && tx.confirmations && tx.blocktime) {
ret.block = {
height: rpcLatestBlock.height - tx.confirmations + 1,
hash: tx.blockhash,
time: tx.blocktime
}
}
let inAmount = 0
let outAmount = 0
// Process the inputs
ret.inputs = await this._getInputs(tx, fees)
inAmount = ret.inputs.reduce((prev, cur) => prev + cur.outpoint.value, 0)
// Process the outputs
ret.outputs = await this._getOutputs(tx)
outAmount = ret.outputs.reduce((prev, cur) => prev + cur.value, 0)
let inAmount = 0
let outAmount = 0
// Process the fees (if needed)
if (fees) {
ret.fees = inAmount - outAmount
if (ret.fees > 0 && ret.size)
ret.feerate = Math.round(ret.fees / ret.size)
if (ret.fees > 0 && ret.vsize)
ret.vfeerate = Math.round(ret.fees / ret.vsize)
}
// Process the inputs
ret.inputs = await this._getInputs(tx, fees)
inAmount = ret.inputs.reduce((prev, cur) => prev + cur.outpoint.value, 0)
// Store in cache
if (ret.block && ret.block.hash)
this.txCache.set(keyCache, ret)
// Process the outputs
ret.outputs = await this._getOutputs(tx)
outAmount = ret.outputs.reduce((prev, cur) => prev + cur.value, 0)
return ret
} catch(e) {
Logger.error(e, 'Transaction.getTransaction()')
return Promise.reject(errors.generic.GEN)
// Process the fees (if needed)
if (fees) {
ret.fees = inAmount - outAmount
if (ret.fees > 0 && ret.size)
ret.feerate = Math.round(ret.fees / ret.size)
if (ret.fees > 0 && ret.vsize)
ret.vfeerate = Math.round(ret.fees / ret.vsize)
}
return ret
}
/**
* Extract information about the inputs of a transaction
* @param {object} tx - transaction
@@ -180,7 +221,6 @@ class Transactions {
/**
* Extract information about the outputs of a transaction
* @param {object} tx - transaction
* @param {boolean} fees - true if fees must be computed, false otherwise
* @returns {Promise} return an array of outputs (object[])
*/
async _getOutputs(tx) {
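
A hypothetical usage of the new batched getter (the txids below are placeholders); entries come back as null when bitcoind returns nothing for a txid, so callers need to filter:

    const txids = ['<txid 1>', '<txid 2>']   // placeholders, 64-char hex strings in practice
    const txs = await rpcTxns.getTransactions(txids, false)
    for (const tx of txs) {
      if (tx != null)                         // null entry: no data for that txid
        console.log(tx.txid, tx.inputs.length, tx.outputs.length)
    }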

lib/db/mysql-db-wrapper.js (58 changes)

@@ -1358,6 +1358,26 @@ class MySqlDbWrapper {
return (result.length == 0) ? null : result[0].txnID
}
/**
* Get the mysql IDs of a collection of transactions
* @param {string[]} txids - txids of the transactions
* @returns {object[]} returns an array of {txnTxid: txnId}
*/
async getTransactionsIds(txids) {
if (txids.length == 0)
return []
const sqlQuery = 'SELECT `txnID`, `txnTxid` FROM `transactions` WHERE `txnTxid` IN (?)'
const params = [txids]
const query = mysql.format(sqlQuery, params)
const results = await this._query(query)
const ret = {}
for (let r of results)
ret[r.txnTxid] = r.txnID
return ret
}
/**
* Get the mysql IDs of a set of transactions
* @param {string[]} txid - array of transactions txids
@@ -1396,6 +1416,29 @@ class MySqlDbWrapper {
return this._query(query)
}
/**
* Insert a collection of transactions in db
* @param {object[]} txs - array of {txid, version, locktime}
*/
async addTransactions(txs) {
if (txs.length == 0)
return
const sqlQuery = 'INSERT INTO `transactions` \
(txnTxid, txnCreated, txnVersion, txnLocktime) VALUES ? \
ON DUPLICATE KEY UPDATE txnVersion = VALUES(txnVersion)'
const params = [txs.map(tx => [
tx.txid,
tx.created,
tx.version,
tx.locktime
])]
const query = mysql.format(sqlQuery, params)
return this._query(query)
}
/**
* Get a transaction for a given txid
* @param {string} txid - txid of the transaction
@@ -1773,6 +1816,21 @@ class MySqlDbWrapper {
return (result.length == 1) ? result[0] : null
}
/**
* Get a collection of blocks identified by the blocks hashes
* @param {string[]} hashes - blocks hashes
* @returns {object[]} returns the blocks
*/
async getBlocksByHashes(hashes) {
if (hashes.length == 0)
return []
const sqlQuery = 'SELECT * FROM `blocks` WHERE `blockHash` IN (?)'
const params = [hashes]
const query = mysql.format(sqlQuery, params)
return await this._query(query)
}
/**
* Get details about all blocks at a given block height
* @param {integer} height - block height
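
Taken together, the three new helpers support the batched import path used by the remote importer. A sketch of how they might be combined (method names are taken from the diff, the surrounding flow is illustrative):

    // txs: array of { txid, created, version, locktime, block? } objects
    await db.addTransactions(txs)                     // single bulk INSERT ... ON DUPLICATE KEY UPDATE
    const idsByTxid = await db.getTransactionsIds(txs.map(tx => tx.txid))
    // idsByTxid maps txnTxid to txnID, e.g. { '<txid 1>': 42 }
    const hashes = txs.filter(tx => tx.block).map(tx => tx.block.hash)
    const blocks = await db.getBlocksByHashes([...new Set(hashes)])   // one SELECT for all block hashes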

lib/remote-importer/remote-importer.js (156 changes)

@@ -115,7 +115,10 @@ class RemoteImporter {
if (txMaps.txMap[txid])
aTxs.push(txMaps.txMap[txid])
return util.seriesCall(aTxs, tx => this.addTransaction(tx, addrIdMap))
// Store the transactions by batches of 200 transactions
const txsChunks = util.splitList(aTxs, 200)
for (let txsChunk of txsChunks)
await this.addTransactions(txsChunk, addrIdMap)
}
/**
@@ -260,19 +263,21 @@ class RemoteImporter {
Logger.info(` Got ${scanTx.length} transactions`)
await util.seriesCall(scanTx, async txid => {
try {
const tx = await rpcTxns.getTransaction(txid, false)
if (tx == null) {
Logger.info(` got null for ${txid}`)
return null
// Retrieve the transactions by batches of 200 transactions
const txsChunks = util.splitList(scanTx, 200)
try {
for (let txsChunk of txsChunks) {
const txs = await rpcTxns.getTransactions(txsChunk, false)
for (let tx of txs) {
if (tx != null) {
ret.transactions.push(tx)
txids[tx.txid] = true
}
}
ret.transactions.push(tx)
txids[tx.txid] = true
} catch(e) {
Logger.error(e, `RemoteImporter.xpubScan() : rawTransaction error, txid ${txid}`)
}
})
} catch(e) {
Logger.error(e, `RemoteImporter.xpubScan() : getTransactions error`)
}
if (gotTransactions) {
// We must go deeper
@@ -335,15 +340,14 @@ class RemoteImporter {
Logger.info(` Got ${scanTx.length} transactions`)
// Get transactions data from bitcoind
await util.seriesCall(scanTx, async txid => {
const tx = await rpcTxns.getTransaction(txid, false)
if (tx == null) {
Logger.info(` got null for ${txid}`)
return null
}
txns.push(tx)
})
// Retrieve the transactions by batches of 100 transactions
const txsChunks = util.splitList(scanTx, 100)
for (let txsChunk of txsChunks) {
const txs = await rpcTxns.getTransactions(txsChunk, false)
for (let tx of txs)
if (tx != null)
txns.push(tx)
}
// Import addresses and transactions into the database
await db.addAddresses(imported)
@@ -368,70 +372,86 @@ class RemoteImporter {
}
/**
* Add a transaction to the database.
* @param {object} tx - transaction object
* @params {Promise}
* Add a collection of transactions to the database.
* @param {object[]} txs - array of transaction objects
* @params {object} addrIdMap - map address => addrId
* @returns {Promise}
*/
async addTransaction(tx, addrIdMap) {
const outputs = []
async addTransactions(txs, addrIdMap) {
try {
// Store the transaction into the database
await db.addTransaction(tx)
// Confirm the transaction
if (tx.block) {
const block = await db.getBlockByHash(tx.block.hash)
if (block)
await db.confirmTransactions([tx.txid], block.blockID)
// Store the transactions into the database
await db.addTransactions(txs)
// Confirm the transactions if needed
const blocksHashes = new Set()
for (let tx of txs)
if (tx.block)
blocksHashes.add(tx.block.hash)
const blocks = await db.getBlocksByHashes(Array.from(blocksHashes))
for (let block of blocks) {
// Filter the transactions by blockHash
const filteredTxs = txs.filter(tx => (tx.block && tx.block.hash == block.blockHash))
if (filteredTxs.length > 0) {
const txids = filteredTxs.map(tx => tx.txid)
// Asynchronous confirmations
db.confirmTransactions(txids, block.blockID)
}
}
// Retrieve the database id for the transaction
let txnID = await db.ensureTransactionId(tx.txid)
// Process the outputs
for (let output of tx.outputs) {
if (addrIdMap[output.address]) {
outputs.push({
txnID,
addrID: addrIdMap[output.address],
outIndex: output.n,
outAmount: output.value,
outScript: output.scriptpubkey,
})
// Retrieve the database ids for the transactions
const txids = txs.map(tx => tx.txid)
const mapTxsIds = await db.getTransactionsIds(txids)
// Store the outputs in db
const outputs = []
for (let tx of txs) {
for (let output of tx.outputs) {
if (addrIdMap[output.address]) {
outputs.push({
txnID: mapTxsIds[tx.txid],
addrID: addrIdMap[output.address],
outIndex: output.n,
outAmount: output.value,
outScript: output.scriptpubkey,
})
}
}
}
await db.addOutputs(outputs)
// Process the inputs
// Get any outputs spent by the inputs of this transaction, add those
// database outIDs to the corresponding transaction inputs, and store.
const res = await db.getOutputIds(tx.inputs.map(input => input.outpoint))
const spent = {}
// Store the inputs in db
const inputs = []
const spent = {}
// Get any outputs spent by the inputs of this transaction,
// add those database outIDs to the corresponding inputs, and store.
let outpoints = []
for (let tx of txs)
outpoints = outpoints.concat(tx.inputs.map(input => input.outpoint))
const res = await db.getOutputIds(outpoints)
for (let r of res)
spent[`${r.txnTxid}-${r.outIndex}`] = r.outID
for (let input of tx.inputs) {
let key = `${input.outpoint.txid}-${input.outpoint.vout}`
if (spent[key]) {
inputs.push({
outID: spent[key],
txnID,
inIndex: input.n,
inSequence: input.seq
})
for (let tx of txs) {
for (let input of tx.inputs) {
const key = `${input.outpoint.txid}-${input.outpoint.vout}`
if (spent[key]) {
inputs.push({
outID: spent[key],
txnID: mapTxsIds[tx.txid],
inIndex: input.n,
inSequence: input.seq
})
}
}
}
await db.addInputs(inputs)
} catch(e) {
Logger.error(e, `RemoteImporter.addTransaction() : xpub ${tx.txid}`)
Logger.error(null, JSON.stringify(tx,null,2))
Logger.error(e, `RemoteImporter.addTransactions() :`)
}
}
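
The core of the rewrite is that confirmations, outputs and inputs are now resolved once per batch instead of once per transaction. A condensed sketch of the per-block confirmation step, with names taken from the diff:

    // Collect the distinct block hashes referenced by the batch
    const blockHashes = new Set()
    for (const tx of txs)
      if (tx.block)
        blockHashes.add(tx.block.hash)
    // Resolve them with a single query, then confirm txids grouped per block
    const blocks = await db.getBlocksByHashes(Array.from(blockHashes))
    for (const block of blocks) {
      const txids = txs
        .filter(tx => tx.block && tx.block.hash == block.blockHash)
        .map(tx => tx.txid)
      if (txids.length > 0)
        db.confirmTransactions(txids, block.blockID)   // not awaited, as in the diff
    }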

lib/remote-importer/sources.js (2 changes)

@@ -75,7 +75,7 @@ class Sources {
}
} catch(e) {
Logger.error(e, `Sources.getAddresses() : ${addresses} from ${this.source.base}`)
Logger.error(null, `Sources.getAddresses() : Error while requesting ${addresses} from ${this.source.base}`)
} finally {
return ret
}

lib/util.js (18 changes)

@@ -101,20 +101,14 @@ class Util {
* Splits a list into a list of lists each with maximum length LIMIT
*/
static splitList(list, limit) {
if (list.length <= limit) {
if (list.length <= limit)
return [list]
} else {
const lists = []
// How many lists to create?
const count = Math.ceil(list.length / limit)
// How many elements per list (max)?
const els = Math.ceil(list.length / count)
for (let i=0; i < count; i++) {
lists.push(list.slice(i * els, (i+1) * els))
}
return lists
const lists = []
while (list.length) {
lists.push(list.splice(0, limit))
}
return lists
}
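
A small usage example of the reworked helper. Note that the new version relies on splice(), so when the list is longer than the limit the input array is consumed as a side effect:

    const ids = ['a', 'b', 'c', 'd', 'e']
    const chunks = util.splitList(ids, 2)
    // chunks -> [['a', 'b'], ['c', 'd'], ['e']]
    // ids    -> [] (splice() removed every element from the original array)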
/**
