Browse Source

switch parallel addresses derivation to worker threads

umbrel
kenshin-samourai 4 years ago
parent
commit
5f85abcb45
  1. 18
      lib/bitcoin/hd-accounts-helper.js
  2. 22
      lib/bitcoin/parallel-address-derivation.js
  3. 85
      lib/fork-pool.js
  4. 11
      package-lock.json
  5. 2
      package.json

18
lib/bitcoin/hd-accounts-helper.js

@ -4,14 +4,13 @@
*/ */
'use strict' 'use strict'
const cp = require('child_process')
const LRU = require('lru-cache') const LRU = require('lru-cache')
const workerPool = require('workerpool')
const bitcoin = require('bitcoinjs-lib') const bitcoin = require('bitcoinjs-lib')
const bs58check = require('bs58check') const bs58check = require('bs58check')
const bs58 = require('bs58') const bs58 = require('bs58')
const errors = require('../errors') const errors = require('../errors')
const Logger = require('../logger') const Logger = require('../logger')
const ForkPool = require('../fork-pool')
const network = require('./network') const network = require('./network')
const activeNet = network.network const activeNet = network.network
const keys = require('../../keys/')[network.key] const keys = require('../../keys/')[network.key]
@ -64,17 +63,16 @@ class HDAccountsHelper {
// Pool of child processes used for derivation of addresses // Pool of child processes used for derivation of addresses
const poolKeys = keys.addrDerivationPool const poolKeys = keys.addrDerivationPool
this.derivationPool = new ForkPool( this.derivationPool = workerPool.pool(
`${__dirname}/parallel-address-derivation.js`, `${__dirname}/parallel-address-derivation.js`,
{ {
networkKey: network.key, maxWorkers: poolKeys.maxNbChildren,
max: poolKeys.maxNbChildren, minWorkers: poolKeys.minNbChildren,
min: poolKeys.minNbChildren, workerType: 'thread'
acquireTimeoutMillis: poolKeys.acquireTimeoutMillis
} }
) )
this.externalDerivationActivated = true this.externalDerivationActivated = true
Logger.info(`Created ${poolKeys.minNbChildren} worker threads for addresses derivation (max = ${poolKeys.maxNbChildren})`)
} }
/** /**
@ -374,7 +372,7 @@ class HDAccountsHelper {
type: info.type type: info.type
} }
const msg = await this.derivationPool.enqueue(data) const msg = await this.derivationPool.exec('deriveAddresses', [data])
if (msg.status = 'ok') { if (msg.status = 'ok') {
resolve(msg.addresses) resolve(msg.addresses)
@ -384,7 +382,7 @@ class HDAccountsHelper {
} }
} catch(e) { } catch(e) {
Logger.error(e, 'HdAccountsHelper : A problem was met during parallel addresses derivation') Logger.error(null, 'HdAccountsHelper : A problem was met during parallel addresses derivation')
reject(e) reject(e)
} }
}) })

22
lib/bitcoin/parallel-address-derivation.js

@ -5,6 +5,7 @@
'use strict' 'use strict'
const bitcoin = require('bitcoinjs-lib') const bitcoin = require('bitcoinjs-lib')
const workerPool = require('workerpool')
const errors = require('../errors') const errors = require('../errors')
const activeNet = require('./network').network const activeNet = require('./network').network
const addrHelper = require('./addresses-helper') const addrHelper = require('./addresses-helper')
@ -26,7 +27,7 @@ const BIP84 = 2
* @param {int} type - type of derivation * @param {int} type - type of derivation
* @returns {Promise - object} returns an object {address: '...', chain: <int>, index: <int>} * @returns {Promise - object} returns an object {address: '...', chain: <int>, index: <int>}
*/ */
const deriveAddress = async function(chain, chainNode, index, type) { async function deriveAddress(chain, chainNode, index, type) {
// Derive M/chain/index // Derive M/chain/index
const indexNode = chainNode.derive(index) const indexNode = chainNode.derive(index)
@ -51,9 +52,11 @@ const deriveAddress = async function(chain, chainNode, index, type) {
} }
/** /**
* Receive message from parent process * Derives a set of addresses for an hd account
* @param {object} msg - parameters used for the derivation
* @returns {Promise - object[]}
*/ */
process.on('message', async (msg) => { async function deriveAddresses(msg) {
try { try {
const xpub = msg.xpub const xpub = msg.xpub
const chain = msg.chain const chain = msg.chain
@ -76,17 +79,20 @@ process.on('message', async (msg) => {
const addresses = await Promise.all(promises) const addresses = await Promise.all(promises)
// Send response to parent process // Send response to parent process
process.send({ return {
status: 'ok', status: 'ok',
addresses: addresses addresses: addresses
}) }
} catch(e) { } catch(e) {
process.send({ return {
status: 'error', status: 'error',
addresses: [], addresses: [],
error: e error: JSON.stringify(e)
}) }
} }
}
workerPool.worker({
deriveAddresses: deriveAddresses
}) })

85
lib/fork-pool.js

@ -1,85 +0,0 @@
/*!
* lib/fork-pool.js
* Copyright © 2019 Katana Cryptographic Ltd. All Rights Reserved.
*/
'use strict'
const os = require('os')
const childProcess = require('child_process')
const genericPool = require('generic-pool')
const Logger = require('./logger')
/**
* A class managing a pool of child processes
* Inspired from fork-pool by Andrew Sliwinski
* https://github.com/thisandagain/fork-pool/
*/
class ForkPool {
/**
* Constructor
*/
constructor(path, options) {
if (!options) {
this._networkKey = ''
this._options = {
max: os.cpus().length / 2,
min: os.cpus().length / 2,
acquireTimeoutMillis: 60000
}
} else {
this._networkKey = options.networkKey
this._options = options
}
const factory = {
create: () => {
return childProcess.fork(path, [this._networkKey])
},
destroy: (cp) => {
cp.kill()
}
}
this.pool = genericPool.createPool(factory, this._options)
Logger.info(`Created ${this._options.min} child processes for addresses derivation (max = ${this._options.max})`)
}
/**
* Enqueue a new task to be processed by a child process
* @param {object} data - data to be passed to the child process
* @returns {Promise}
*/
async enqueue(data) {
let cp
const pool = this.pool
return new Promise(async (resolve, reject) => {
try {
cp = await pool.acquire()
cp.send(data)
cp.once('message', async msg => {
pool.release(cp)
resolve(msg)
})
} catch(e) {
reject(e)
}
})
}
/**
* Drain the pool
*/
async drain() {
await this.pool.drain()
await this.pool.clear()
}
}
module.exports = ForkPool

11
package-lock.json

@ -508,6 +508,7 @@
"requires": { "requires": {
"anymatch": "~3.1.1", "anymatch": "~3.1.1",
"braces": "~3.0.2", "braces": "~3.0.2",
"fsevents": "~2.1.1",
"glob-parent": "~5.1.0", "glob-parent": "~5.1.0",
"is-binary-path": "~2.1.0", "is-binary-path": "~2.1.0",
"is-glob": "~4.0.1", "is-glob": "~4.0.1",
@ -1121,11 +1122,6 @@
"wide-align": "^1.1.0" "wide-align": "^1.1.0"
} }
}, },
"generic-pool": {
"version": "3.4.2",
"resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.4.2.tgz",
"integrity": "sha512-H7cUpwCQSiJmAHM4c/aFu6fUfrhWXW1ncyh8ftxEPMu6AiYkHw9K8br720TGPZJbk5eOH2bynjZD1yPvdDAmag=="
},
"get-caller-file": { "get-caller-file": {
"version": "2.0.5", "version": "2.0.5",
"resolved": "https://registry.npmjs.org/get-caller-file/-/get-caller-file-2.0.5.tgz", "resolved": "https://registry.npmjs.org/get-caller-file/-/get-caller-file-2.0.5.tgz",
@ -2540,6 +2536,11 @@
"bs58check": "<3.0.0" "bs58check": "<3.0.0"
} }
}, },
"workerpool": {
"version": "6.1.4",
"resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.1.4.tgz",
"integrity": "sha512-jGWPzsUqzkow8HoAvqaPWTUPCrlPJaJ5tY8Iz7n1uCz3tTp6s3CDG0FF1NsX42WNlkRSW6Mr+CDZGnNoSsKa7g=="
},
"wrap-ansi": { "wrap-ansi": {
"version": "5.1.0", "version": "5.1.0",
"resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-5.1.0.tgz", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-5.1.0.tgz",

2
package.json

@ -23,7 +23,6 @@
"body-parser": "1.18.3", "body-parser": "1.18.3",
"express": "4.16.3", "express": "4.16.3",
"express-jwt": "5.3.1", "express-jwt": "5.3.1",
"generic-pool": "3.4.2",
"helmet": "3.23.3", "helmet": "3.23.3",
"lodash": "4.17.19", "lodash": "4.17.19",
"lru-cache": "4.0.2", "lru-cache": "4.0.2",
@ -34,6 +33,7 @@
"socks-proxy-agent": "4.0.1", "socks-proxy-agent": "4.0.1",
"validator": "10.8.0", "validator": "10.8.0",
"websocket": "1.0.28", "websocket": "1.0.28",
"workerpool": "6.1.4",
"zeromq": "4.2.0" "zeromq": "4.2.0"
}, },
"devDependencies": { "devDependencies": {

Loading…
Cancel
Save