From 5f85abcb45d719a1f4daa65ea46f0222528d119b Mon Sep 17 00:00:00 2001 From: kenshin-samourai Date: Mon, 19 Apr 2021 19:13:39 +0200 Subject: [PATCH] switch parallel addresses derivation to worker threads --- lib/bitcoin/hd-accounts-helper.js | 18 ++--- lib/bitcoin/parallel-address-derivation.js | 22 ++++-- lib/fork-pool.js | 85 ---------------------- package-lock.json | 11 +-- package.json | 2 +- 5 files changed, 29 insertions(+), 109 deletions(-) delete mode 100644 lib/fork-pool.js diff --git a/lib/bitcoin/hd-accounts-helper.js b/lib/bitcoin/hd-accounts-helper.js index 728196a..51dfe0f 100644 --- a/lib/bitcoin/hd-accounts-helper.js +++ b/lib/bitcoin/hd-accounts-helper.js @@ -4,14 +4,13 @@ */ 'use strict' -const cp = require('child_process') const LRU = require('lru-cache') +const workerPool = require('workerpool') const bitcoin = require('bitcoinjs-lib') const bs58check = require('bs58check') const bs58 = require('bs58') const errors = require('../errors') const Logger = require('../logger') -const ForkPool = require('../fork-pool') const network = require('./network') const activeNet = network.network const keys = require('../../keys/')[network.key] @@ -64,17 +63,16 @@ class HDAccountsHelper { // Pool of child processes used for derivation of addresses const poolKeys = keys.addrDerivationPool - this.derivationPool = new ForkPool( + this.derivationPool = workerPool.pool( `${__dirname}/parallel-address-derivation.js`, { - networkKey: network.key, - max: poolKeys.maxNbChildren, - min: poolKeys.minNbChildren, - acquireTimeoutMillis: poolKeys.acquireTimeoutMillis + maxWorkers: poolKeys.maxNbChildren, + minWorkers: poolKeys.minNbChildren, + workerType: 'thread' } ) - this.externalDerivationActivated = true + Logger.info(`Created ${poolKeys.minNbChildren} worker threads for addresses derivation (max = ${poolKeys.maxNbChildren})`) } /** @@ -374,7 +372,7 @@ class HDAccountsHelper { type: info.type } - const msg = await this.derivationPool.enqueue(data) + const msg = await this.derivationPool.exec('deriveAddresses', [data]) if (msg.status = 'ok') { resolve(msg.addresses) @@ -384,7 +382,7 @@ class HDAccountsHelper { } } catch(e) { - Logger.error(e, 'HdAccountsHelper : A problem was met during parallel addresses derivation') + Logger.error(null, 'HdAccountsHelper : A problem was met during parallel addresses derivation') reject(e) } }) diff --git a/lib/bitcoin/parallel-address-derivation.js b/lib/bitcoin/parallel-address-derivation.js index cae5240..38e1ce3 100644 --- a/lib/bitcoin/parallel-address-derivation.js +++ b/lib/bitcoin/parallel-address-derivation.js @@ -5,6 +5,7 @@ 'use strict' const bitcoin = require('bitcoinjs-lib') +const workerPool = require('workerpool') const errors = require('../errors') const activeNet = require('./network').network const addrHelper = require('./addresses-helper') @@ -26,7 +27,7 @@ const BIP84 = 2 * @param {int} type - type of derivation * @returns {Promise - object} returns an object {address: '...', chain: , index: } */ -const deriveAddress = async function(chain, chainNode, index, type) { +async function deriveAddress(chain, chainNode, index, type) { // Derive M/chain/index const indexNode = chainNode.derive(index) @@ -51,9 +52,11 @@ const deriveAddress = async function(chain, chainNode, index, type) { } /** - * Receive message from parent process + * Derives a set of addresses for an hd account + * @param {object} msg - parameters used for the derivation + * @returns {Promise - object[]} */ -process.on('message', async (msg) => { +async function deriveAddresses(msg) { try { const xpub = msg.xpub const chain = msg.chain @@ -76,17 +79,20 @@ process.on('message', async (msg) => { const addresses = await Promise.all(promises) // Send response to parent process - process.send({ + return { status: 'ok', addresses: addresses - }) + } } catch(e) { - process.send({ + return { status: 'error', addresses: [], - error: e - }) + error: JSON.stringify(e) + } } +} +workerPool.worker({ + deriveAddresses: deriveAddresses }) diff --git a/lib/fork-pool.js b/lib/fork-pool.js deleted file mode 100644 index 16df8e5..0000000 --- a/lib/fork-pool.js +++ /dev/null @@ -1,85 +0,0 @@ -/*! - * lib/fork-pool.js - * Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved. - */ -'use strict' - -const os = require('os') -const childProcess = require('child_process') -const genericPool = require('generic-pool') -const Logger = require('./logger') - - -/** - * A class managing a pool of child processes - * Inspired from fork-pool by Andrew Sliwinski - * https://github.com/thisandagain/fork-pool/ - */ -class ForkPool { - - /** - * Constructor - */ - constructor(path, options) { - if (!options) { - this._networkKey = '' - this._options = { - max: os.cpus().length / 2, - min: os.cpus().length / 2, - acquireTimeoutMillis: 60000 - } - } else { - this._networkKey = options.networkKey - this._options = options - } - - const factory = { - create: () => { - return childProcess.fork(path, [this._networkKey]) - }, - destroy: (cp) => { - cp.kill() - } - } - - this.pool = genericPool.createPool(factory, this._options) - Logger.info(`Created ${this._options.min} child processes for addresses derivation (max = ${this._options.max})`) - } - - /** - * Enqueue a new task to be processed by a child process - * @param {object} data - data to be passed to the child process - * @returns {Promise} - */ - async enqueue(data) { - let cp - const pool = this.pool - - return new Promise(async (resolve, reject) => { - try { - cp = await pool.acquire() - - cp.send(data) - - cp.once('message', async msg => { - pool.release(cp) - resolve(msg) - }) - - } catch(e) { - reject(e) - } - }) - } - - /** - * Drain the pool - */ - async drain() { - await this.pool.drain() - await this.pool.clear() - } - -} - -module.exports = ForkPool diff --git a/package-lock.json b/package-lock.json index 63349df..4c1eaf9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -508,6 +508,7 @@ "requires": { "anymatch": "~3.1.1", "braces": "~3.0.2", + "fsevents": "~2.1.1", "glob-parent": "~5.1.0", "is-binary-path": "~2.1.0", "is-glob": "~4.0.1", @@ -1121,11 +1122,6 @@ "wide-align": "^1.1.0" } }, - "generic-pool": { - "version": "3.4.2", - "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.4.2.tgz", - "integrity": "sha512-H7cUpwCQSiJmAHM4c/aFu6fUfrhWXW1ncyh8ftxEPMu6AiYkHw9K8br720TGPZJbk5eOH2bynjZD1yPvdDAmag==" - }, "get-caller-file": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/get-caller-file/-/get-caller-file-2.0.5.tgz", @@ -2540,6 +2536,11 @@ "bs58check": "<3.0.0" } }, + "workerpool": { + "version": "6.1.4", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.1.4.tgz", + "integrity": "sha512-jGWPzsUqzkow8HoAvqaPWTUPCrlPJaJ5tY8Iz7n1uCz3tTp6s3CDG0FF1NsX42WNlkRSW6Mr+CDZGnNoSsKa7g==" + }, "wrap-ansi": { "version": "5.1.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-5.1.0.tgz", diff --git a/package.json b/package.json index feb7aac..a6ec254 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,6 @@ "body-parser": "1.18.3", "express": "4.16.3", "express-jwt": "5.3.1", - "generic-pool": "3.4.2", "helmet": "3.23.3", "lodash": "4.17.19", "lru-cache": "4.0.2", @@ -34,6 +33,7 @@ "socks-proxy-agent": "4.0.1", "validator": "10.8.0", "websocket": "1.0.28", + "workerpool": "6.1.4", "zeromq": "4.2.0" }, "devDependencies": {