@ -60,79 +60,6 @@ class RemoteImporter {
return this . importing [ xpub ] ? this . importing [ xpub ] : null
}
/ * *
* Process the relations between a list of transactions
* @ param { object [ ] } txs - array of transaction objects
* @ returns { object } returns a object with 3 mappings
* { txMap : { ] , txChildren : { } , txParents : { } }
* /
_ processTxsRelations ( txs ) {
const txMap = { }
const txChildren = { }
const txParents = { }
for ( let tx of txs ) {
let txid = tx . txid
// Populate txMap
txMap [ txid ] = tx
// Create parent-child transaction associations
if ( ! txChildren [ txid ] )
txChildren [ txid ] = [ ]
if ( ! txParents [ txid ] )
txParents [ txid ] = [ ]
for ( let i in tx . inputs ) {
const input = tx . inputs [ i ]
let prev = input . outpoint . txid
if ( ! txMap [ prev ] ) continue
if ( txParents [ txid ] . indexOf ( prev ) == - 1 )
txParents [ txid ] . push ( prev )
if ( ! txChildren [ prev ] )
txChildren [ prev ] = [ ]
if ( txChildren [ prev ] . indexOf ( txid ) == - 1 )
txChildren [ prev ] . push ( txid )
}
}
return {
txMap : txMap ,
txChildren : txChildren ,
txParents : txParents
}
}
/ * *
* Import a list of transactions associated to a list of addresses
* @ param { object [ ] } addresses - array of addresses objects
* @ param { object [ ] } txns - array of transaction objects
* @ returns { Promise }
* /
async _ importTransactions ( addresses , txns ) {
const addrIdMap = await db . getAddressesIds ( addresses )
// The transactions array must be topologically ordered, such that
// entries earlier in the array MUST NOT depend upon any entry later
// in the array.
const txMaps = this . _ processTxsRelations ( txns )
const txOrdered = util . topologicalOrdering ( txMaps . txParents , txMaps . txChildren )
const aTxs = [ ]
for ( let txid of txOrdered )
if ( txMaps . txMap [ txid ] )
aTxs . push ( txMaps . txMap [ txid ] )
// Store the transactions by batches of 200 transactions
const txsChunks = util . splitList ( aTxs , 200 )
for ( let txsChunk of txsChunks )
await this . addTransactions ( txsChunk , addrIdMap )
}
/ * *
* Import an HD account from remote sources
* @ param { string } xpub - HD Account
@ -161,32 +88,22 @@ class RemoteImporter {
const t0 = Date . now ( )
const chains = [ 0 , 1 ]
let gaps = [ gap . external , gap . internal ]
// Allow custom higher gap limits
// for local scans relying on bitcoind or on a local indexer
if ( gapLimit
&& ( ( keys . indexer . active == 'local_bitcoind' )
|| ( keys . indexer . active == 'local_indexer' ) )
) {
gaps = [ gapLimit , gapLimit ]
}
const isLocal = [ 'local_bitcoind' , 'local_indexer' ] . includes ( keys . indexer . active )
const gaps = ( gapLimit && isLocal ) ? [ gapLimit , gapLimit ] : [ gap . external , gap . internal ]
startIndex = ( startIndex == null ) ? - 1 : startIndex - 1
const addrIdMap = { }
let txns = [ ]
let addresses = [ ]
try {
const results = await util . series Call( chains , chain => {
const results = await util . parallelCall ( chains , chain => {
return this . xpubScan ( xpub , chain , startIndex , startIndex , gaps [ chain ] , type )
} )
// Accumulate addresses and transactions from all chains
for ( let result of results ) {
txns = txns . concat ( result . transactions )
addresses = addresses . concat ( result . addresses )
}
const txns = results . map ( r => r . transactions ) . flat ( )
const addresses = results . map ( r => r . addresses ) . flat ( )
const aAddresses = addresses . map ( a => a . address )
this . importing [ xpub ] = {
'status' : this . STATUS_IMPORT ,
@ -195,10 +112,13 @@ class RemoteImporter {
// Store the hdaccount and the addresses into the database
await db . ensureHDAccountId ( xpub , type )
await db . addAddressesToHDAccount ( xpub , addresses )
const addrChunks = util . splitList ( addresses , 1000 )
await util . parallelCall ( addrChunks , chunk => {
return db . addAddressesToHDAccount ( xpub , chunk )
} )
// Store the transaction into the database
const aAddresses = addresses . map ( a => a . address )
await this . _ importTransactions ( aAddresses , txns )
} catch ( e ) {
@ -256,62 +176,39 @@ class RemoteImporter {
d = u + G
Logger . info ( ` Importer : derived M/ ${ c } / ${ A . join ( ',' ) } ` )
const addrMap = { }
for ( let a of ret . addresses )
addrMap [ a . address ] = a
const addrMap = ret . addresses . reduce ( ( m , a ) => ( m [ a . address ] = a , m ) , { } )
const aAddresses = ret . addresses . map ( a => a . address )
try {
const results = await this . sources . getAddresses ( aAddresses )
let gotTransactions = false
const scanTx = [ ]
for ( let r of results ) {
if ( r . ntx == 0 ) continue
// Address is used. Update used parameter
u = Math . max ( u , addrMap [ r . address ] . index )
gotTransactions = true
// TODO: Handle pathological case of many address transactions
while ( r . txids . length > 0 ) {
let txid = r . txids . pop ( )
if ( ! txids [ txid ] )
scanTx . push ( txid )
}
}
const filteredResults = results . flat ( ) . filter ( r => r . ntx > 0 )
const gotTransactions = filteredResults . length > 0
const scanTx = filteredResults . map ( r => r . txids ) . flat ( ) . filter ( t => ! txids [ t ] )
u = filteredResults . reduce ( ( m , r ) => Math . max ( m , addrMap [ r . address ] . index ) , u )
Logger . info ( ` Importer : Got ${ scanTx . length } transactions ` )
// Retrieve the transactions by batches of 200 transactions
const txsChunks = util . splitList ( scanTx , 200 )
try {
for ( let txsChunk of txsChunks ) {
const txs = await rpcTxns . getTransactions ( txsChunk , false )
for ( let tx of txs ) {
if ( tx != null ) {
ret . transactions . push ( tx )
txids [ tx . txid ] = true
}
}
}
const txsChunks = util . splitList ( scanTx , 200 )
const txs = await util . seriesCall ( txsChunks , chunk => {
return rpcTxns . getTransactions ( chunk , false )
} )
const filteredTxs = txs . flat ( ) . filter ( tx => tx != null )
ret . transactions = ret . transactions . concat ( filteredTxs )
txids = filteredTxs . reduce ( ( m , tx ) => ( m [ tx . txid ] = true , m ) , txids )
} catch ( e ) {
Logger . error ( e , ` Importer : RemoteImporter.xpubScan() : getTransactions error ` )
}
if ( gotTransactions ) {
if ( c == 0 )
this . importing [ xpub ] [ 'txs_ext' ] = Object . keys ( txids ) . length
else
this . importing [ xpub ] [ 'txs_int' ] = Object . keys ( txids ) . length
const keyStatus = ( c == 0 ) ? 'txs_ext' : 'txs_int'
this . importing [ xpub ] [ keyStatus ] = Object . keys ( txids ) . length
// We must go deeper
const result = await this . xpubScan ( xpub , c , d , u , G , type , txids )
// Accumulate results from further down the rabbit hole
for ( let a of result . addresses )
ret . addresses . push ( a )
for ( let t of result . transactions )
ret . transactions . push ( t )
ret . addresses = ret . addresses . concat ( result . addresses )
ret . transactions = ret . transactions . concat ( result . transactions )
}
} catch ( e ) {
@ -329,50 +226,34 @@ class RemoteImporter {
* /
async importAddresses ( candidates , filterAddr ) {
const t0 = Date . now ( )
const txns = [ ]
const addresses = [ ]
const imported = [ ]
for ( let address of candidates ) {
if ( ! this . importing [ address ] ) {
addresses . push ( address )
this . importing [ address ] = true
} else {
Logger . info ( ` Importer : Import overlap for ${ address } . Skipping ` )
}
}
if ( addresses . length == 0 )
return true
// Check if some addresses are currently processed
const overlap = candidates . filter ( c => this . importing [ c ] )
for ( let a of overlap )
Logger . info ( ` Importer : Import overlap for ${ a } . Skipping ` )
// List addresses that need to be processed
const addresses = candidates . filter ( c => ! this . importing [ c ] )
this . importing = addresses . reduce ( ( m , a ) => ( m [ a ] = true , m ) , this . importing )
if ( addresses . length == 0 ) return true
Logger . info ( ` Importer : Importing ${ addresses . join ( ',' ) } ` )
try {
const scanTx = [ ]
const results = await this . sources . getAddresses ( addresses , filterAddr )
for ( let r of results ) {
// Mark the address as imported
imported . push ( r . address )
if ( r . ntx == 0 ) continue
// TODO: Handle pathological case of many address transactions
while ( r . txids . length > 0 ) {
let txid = r . txids . pop ( )
if ( scanTx . indexOf ( txid ) == - 1 )
scanTx . push ( txid )
}
}
const imported = results . map ( r => r . address )
const filteredResults = results . filter ( r => r . ntx > 0 )
const scanTx = [ ... new Set ( filteredResults . map ( r => r . txids ) . flat ( ) ) ]
Logger . info ( ` Importer : Got ${ scanTx . length } transactions ` )
// Retrieve the transactions by batches of 100 transactions
const txsChunks = util . splitList ( scanTx , 100 )
for ( let txsChunk of txsChunks ) {
const txs = await rpcTxns . getTransactions ( txsChunk , false )
for ( let tx of txs )
if ( tx != null )
txns . push ( tx )
}
const txs = await util . seriesCall ( txsChunks , chunk => {
return rpcTxns . getTransactions ( chunk , false )
} )
const txns = txs . flat ( ) . filter ( tx => tx != null )
// Import addresses and transactions into the database
await db . addAddresses ( imported )
@ -396,84 +277,116 @@ class RemoteImporter {
}
}
/ * *
* Import a list of transactions associated to a list of addresses
* @ param { object [ ] } addresses - array of addresses objects
* @ param { object [ ] } txs - array of transaction objects
* @ returns { Promise }
* /
async _ importTransactions ( addresses , txs ) {
const addrChunks = util . splitList ( addresses , 1000 )
const addrIdMaps = await util . parallelCall ( addrChunks , chunk => {
return db . getAddressesIds ( chunk )
} )
const addrIdMap = Object . assign ( { } , ... addrIdMaps )
// Process the transactions by batches of 200 transactions
const txsChunks = util . splitList ( txs , 200 )
await util . parallelCall ( txsChunks , chunk => {
return this . _ addTransactions ( chunk )
} )
await util . parallelCall ( txsChunks , chunk => {
return this . _ addOutputs ( chunk , addrIdMap )
} )
await util . parallelCall ( txsChunks , chunk => {
return this . _ addInputs ( chunk )
} )
}
/ * *
* Add a collection of transactions to the database .
* @ param { object [ ] } txs - array of transaction objects
* @ params { object } addrIdMap - map address => addrId
* @ returns { Promise }
* /
async addTransactions ( txs , addrIdMap ) {
async _ addTransactions ( txs ) {
try {
// Store the transactions into the database
await db . addTransactions ( txs )
// Confirm the transactions if needed
const blocksHashes = new Set ( )
for ( let tx of txs )
if ( tx . block )
blocksHashes . add ( tx . block . hash )
const blocks = await db . getBlocksByHashes ( Array . from ( blocksHashes ) )
const blocksHashes = txs . filter ( tx => tx . block ) . map ( tx => tx . block . hash )
const blocks = await db . getBlocksByHashes ( blocksHashes )
for ( let block of blocks ) {
// Filter the transactions by blockHash
return util . parallelCall ( blocks , block => {
const filteredTxs = txs . filter ( tx => ( tx . block && tx . block . hash == block . blockHash ) )
if ( filteredTxs . length > 0 ) {
if ( filteredTxs . length == 0 ) return
const txids = filteredTxs . map ( tx => tx . txid )
// Asynchronous confirmations
db . confirmTransactions ( txids , block . blockID )
return db . confirmTransactions ( txids , block . blockID )
} )
} catch ( e ) {
Logger . error ( e , ` Importer : RemoteImporter.addTransactions() : ` )
}
}
// Retrieve the database ids for the transactions
/ * *
* Add a collection of transaction outputs to the database .
* @ param { object [ ] } txs - array of transaction objects
* @ params { object } addrIdMap - map address => addrId
* @ returns { Promise }
* /
async _ addOutputs ( txs , addrIdMap ) {
try {
const txids = txs . map ( tx => tx . txid )
const mapTxsIds = await db . getTransactionsIds ( txids )
// Store the outputs in db
const outputs = [ ]
for ( let tx of txs ) {
for ( let output of tx . outputs ) {
if ( addrIdMap [ output . address ] ) {
outputs . push ( {
txnID : mapTxsIds [ tx . txid ] ,
addrID : addrIdMap [ output . address ] ,
outIndex : output . n ,
outAmount : output . value ,
outScript : output . scriptpubkey ,
} )
}
const outputs = txs
. map ( tx => tx . outputs . map ( o => ( o . txnID = mapTxsIds [ tx . txid ] , o ) ) )
. flat ( )
. filter ( o => addrIdMap [ o . address ] )
. map ( o => { return {
txnID : o . txnID ,
addrID : addrIdMap [ o . address ] ,
outIndex : o . n ,
outAmount : o . value ,
outScript : o . scriptpubkey ,
} } )
return db . addOutputs ( outputs )
} catch ( e ) {
Logger . error ( e , ` Importer : RemoteImporter._addOutputs() : ` )
}
}
await db . addOutputs ( outputs )
// Store the inputs in db
const inputs = [ ]
const spent = { }
/ * *
* Add a collection of transaction inputs to the database .
* @ param { object [ ] } txs - array of transaction objects
* @ returns { Promise }
* /
async _ addInputs ( txs ) {
try {
// Retrieve the database ids for the transactions
const txids = txs . map ( tx => tx . txid )
const mapTxsIds = await db . getTransactionsIds ( txids )
// Get any outputs spent by the inputs of this transaction,
// add those database outIDs to the corresponding inputs, and store.
let outpoints = [ ]
for ( let tx of txs )
outpoints = outpoints . concat ( tx . inputs . map ( input => input . outpoint ) )
const outpoints = txs . map ( tx => tx . inputs ) . flat ( ) . map ( input => input . outpoint )
const res = await db . getOutputIds ( outpoints )
for ( let r of res )
spent [ ` ${ r . txnTxid } - ${ r . outIndex } ` ] = r . outID
for ( let tx of txs ) {
for ( let input of tx . inputs ) {
const key = ` ${ input . outpoint . txid } - ${ input . outpoint . vout } `
if ( spent [ key ] ) {
inputs . push ( {
outID : spent [ key ] ,
txnID : mapTxsIds [ tx . txid ] ,
inIndex : input . n ,
inSequence : input . seq
} )
}
}
}
await db . addInputs ( inputs )
const spent = res . reduce ( ( m , r ) => ( m [ ` ${ r . txnTxid } - ${ r . outIndex } ` ] = r . outID , m ) , { } )
const inputs = txs
. map ( tx => tx . inputs . map ( i => ( i . txnID = mapTxsIds [ tx . txid ] , i ) ) )
. flat ( )
. filter ( i => spent [ ` ${ i . outpoint . txid } - ${ i . outpoint . vout } ` ] )
. map ( i => { return {
outID : spent [ ` ${ i . outpoint . txid } - ${ i . outpoint . vout } ` ] ,
txnID : i . txnID ,
inIndex : i . n ,
inSequence : i . seq
} } )
return db . addInputs ( inputs )
} catch ( e ) {
Logger . error ( e , ` Importer : RemoteImporter.addTransactions() : ` )