From c1082c0b4f4e8fbd493b3d5c34957cea4737e26c Mon Sep 17 00:00:00 2001 From: Dan Janosik Date: Sun, 3 Mar 2019 16:30:12 -0500 Subject: [PATCH] concurrency + simplicity improvements for rpcApi: - support for configurable RPC concurrency level (default 10, to be under bitcoind default "rpcworkqueue=16") - queue up requests to prevent overloading RPC work queue of bitcoind - simplify / cleanup rpcApi --- .env-sample | 4 + app/api/rpcApi.js | 290 ++++++++++++-------------------------------- app/config.js | 2 + npm-shrinkwrap.json | 8 ++ package.json | 1 + 5 files changed, 94 insertions(+), 211 deletions(-) diff --git a/.env-sample b/.env-sample index c1cd452..98f70bc 100644 --- a/.env-sample +++ b/.env-sample @@ -26,6 +26,10 @@ #BTCEXP_INFLUXDB_USER=dbuser #BTCEXP_INFLUXDB_PASS=dbpassword +# Set number of concurrent RPC requests. Should be lower than your node's "rpcworkqueue" value. +# The default for this value is 10, aiming to be less than Bitcoin Core's default rpcworkqueue=16. +#BTCEXP_RPC_CONCURRENCY=10 + # Disable app's in-memory RPC caching to reduce memory usage #BTCEXP_NO_INMEMORY_RPC_CACHE=true diff --git a/app/api/rpcApi.js b/app/api/rpcApi.js index 60fe25f..cdc6baf 100644 --- a/app/api/rpcApi.js +++ b/app/api/rpcApi.js @@ -1,9 +1,23 @@ -var debug = require('debug')('rpcApi'); +var debug = require('debug')('btcexp:rpcApi'); + +var async = require("async"); + var utils = require("../utils.js"); var config = require("../config.js"); var coins = require("../coins.js"); +var rpcQueue = async.queue(function(task, callback) { + task.rpcCall(); + + if (callback != null) { + callback(); + } + +}, config.rpcConcurrency); + + + function getBlockchainInfo() { return getRpcData("getblockchaininfo"); } @@ -33,51 +47,22 @@ function getPeerInfo() { } function getRawMempool() { - return getRpcDataWithParams("getrawmempool", true); + return getRpcDataWithParams({method:"getrawmempool", parameters:[true]}); } function getChainTxStats(blockCount) { - return getRpcDataWithParams("getchaintxstats", blockCount); + return getRpcDataWithParams({method:"getchaintxstats", parameters:[blockCount]}); } function getBlockByHeight(blockHeight) { return new Promise(function(resolve, reject) { - getBlocksByHeight([blockHeight]).then(function(results) { - if (results && results.length > 0) { - resolve(results[0]); - - } else { - resolve(null); - } - }).catch(function(err) { - reject(err); - }); - }); -} - -function getBlocksByHeight(blockHeights) { - //debug("getBlocksByHeight: " + blockHeights); - - return new Promise(function(resolve, reject) { - var batch = []; - for (var i = 0; i < blockHeights.length; i++) { - batch.push({ - method: 'getblockhash', - parameters: [ blockHeights[i] ] - }); - } + getRpcDataWithParams({method:"getblockhash", parameters:[blockHeight]}).then(function(blockhash) { + getBlockByHash(blockhash).then(function(block) { + resolve(block); - var blockHashes = []; - client.command(batch).then((responses) => { - responses.forEach((item) => { - blockHashes.push(item); + }).catch(function(err) { + reject(err); }); - - if (blockHashes.length == batch.length) { - getBlocksByHash(blockHashes).then(function(blocks) { - resolve(blocks); - }); - } }).catch(function(err) { reject(err); }); @@ -85,53 +70,19 @@ function getBlocksByHeight(blockHeights) { } function getBlockByHash(blockHash) { - return new Promise(function(resolve, reject) { - getBlocksByHash([blockHash]).then(function(results) { - if (results && results.length > 0) { - resolve(results[0]); - - } else { - resolve(null); - } - }).catch(function(err) { - reject(err); - }); - }); -} - -function getBlocksByHash(blockHashes) { - debug("rpc.getBlocksByHash: " + blockHashes); + debug("getBlockByHash: %s", blockHash); return new Promise(function(resolve, reject) { - var batch = []; - for (var i = 0; i < blockHashes.length; i++) { - batch.push({ - method: 'getblock', - parameters: [ blockHashes[i] ] - }); - } + getRpcDataWithParams({method:"getblock", parameters:[blockHash]}).then(function(block) { + getRawTransaction(block.tx[0]).then(function(tx) { + block.coinbaseTx = tx; + block.totalFees = utils.getBlockTotalFeesFromCoinbaseTxAndBlockHeight(tx, block.height); + block.miner = utils.getMinerFromCoinbaseTx(tx); - var blocks = []; - client.command(batch).then((responses) => { - responses.forEach((item) => { - if (item.tx) { - blocks.push(item); - } - }); - - var coinbaseTxids = []; - for (var i = 0; i < blocks.length; i++) { - coinbaseTxids.push(blocks[i].tx[0]) - } + resolve(block); - getRawTransactions(coinbaseTxids).then(function(coinbaseTxs) { - for (var i = 0; i < blocks.length; i++) { - blocks[i].coinbaseTx = coinbaseTxs[i]; - blocks[i].totalFees = utils.getBlockTotalFeesFromCoinbaseTxAndBlockHeight(coinbaseTxs[i], blocks[i].height); - blocks[i].miner = utils.getMinerFromCoinbaseTx(coinbaseTxs[i]); - } - - resolve(blocks); + }).catch(function(err) { + reject(err); }); }).catch(function(err) { reject(err); @@ -139,89 +90,36 @@ function getBlocksByHash(blockHashes) { }); } -function getRawTransaction(txid) { - return new Promise(function(resolve, reject) { - getRawTransactions([txid]).then(function(results) { - if (results && results.length > 0) { - if (results[0].txid) { - resolve(results[0]); - - } else { - resolve(null); - } - } else { - resolve(null); - } - }).catch(function(err) { - reject(err); - }); - }); -} - function getAddress(address) { - return getRpcDataWithParams("validateaddress", address); + return getRpcDataWithParams({method:"validateaddress", parameters:[address]}); } -function getRawTransactions(txids) { - //debug("getRawTransactions: " + txids); +function getRawTransaction(txid) { + debug("getRawTransaction: %s", txid); return new Promise(function(resolve, reject) { - if (!txids || txids.length == 0) { - resolve([]); + if (coins[config.coin].genesisCoinbaseTransactionId && txid == coins[config.coin].genesisCoinbaseTransactionId) { + // copy the "confirmations" field from genesis block to the genesis-coinbase tx + promises.push(new Promise(function(resolve2, reject2) { + getBlockchainInfo().then(function(blockchainInfoResult) { + var result = coins[config.coin].genesisCoinbaseTransaction; + result.confirmations = blockchainInfoResult.blocks; - return; - } - - var requests = []; - var promises = []; - for (var i = 0; i < txids.length; i++) { - var txid = txids[i]; - - if (txid) { - if (coins[config.coin].genesisCoinbaseTransactionId && txid == coins[config.coin].genesisCoinbaseTransactionId) { - // copy the "confirmations" field from genesis block to the genesis-coinbase tx - promises.push(new Promise(function(resolve2, reject2) { - getBlockchainInfo().then(function(blockchainInfoResult) { - var result = coins[config.coin].genesisCoinbaseTransaction; - result.confirmations = blockchainInfoResult.blocks; - - resolve2([result]); - - }).catch(function(err) { - reject2(err); - }); - })); + resolve([result]); - } else { - requests.push({ - method: 'getrawtransaction', - parameters: [ txid, 1 ] - }); - } - } - } + }).catch(function(err) { + reject(err); + }); + })); - var requestBatches = utils.splitArrayIntoChunks(requests, 100); + } else { + getRpcDataWithParams({method:"getrawtransaction", parameters:[txid, 1]}).then(function(result) { + resolve(result); - promises.push(new Promise(function(resolve2, reject2) { - executeBatchesSequentially(requestBatches, function(results) { - resolve2(results); + }).catch(function(err) { + reject(err); }); - })); - - Promise.all(promises).then(function(results) { - var finalResults = []; - for (var i = 0; i < results.length; i++) { - for (var j = 0; j < results[i].length; j++) { - finalResults.push(results[i][j]); - } - } - - resolve(finalResults); - - }).catch(function(err) { - reject(err); - }); + } }); } @@ -356,73 +254,45 @@ function getRpcMethodHelp(methodName) { function getRpcData(cmd) { return new Promise(function(resolve, reject) { debug(`RPC: ${cmd}`); - client.command(cmd, function(err, result, resHeaders) { - if (err) { - console.log("Error for RPC command '" + cmd + "': " + err); - - reject(err); - } else { - resolve(result); - } - }).catch(function(err) { - reject(err); - }); - }); -} + rpcCall = function() { + client.command(cmd, function(err, result, resHeaders) { + if (err) { + console.log(`Error for RPC command '${cmd}': ${err}`); -function getRpcDataWithParams(cmd, params) { - return new Promise(function(resolve, reject) { - debug(`RPC: ${cmd}(${params})`); - client.command(cmd, params, function(err, result, resHeaders) { - if (err) { - console.log("Error for RPC command '" + cmd + "': " + err); + reject(err); + } else { + resolve(result); + } + }).catch(function(err) { reject(err); - - } else { - resolve(result); - } - }).catch(function(err) { - reject(err); - }); + }); + }; + + rpcQueue.push({rpcCall:rpcCall}); }); } -function executeBatchesSequentially(batches, resultFunc) { - var batchId = utils.getRandomString(20, 'aA#'); - - debug(`Starting ${batches.length}-item batch ${batchId}...`); - - executeBatchesSequentiallyInternal(batchId, batches, 0, [], resultFunc); -} - -function executeBatchesSequentiallyInternal(batchId, batches, currentIndex, accumulatedResults, resultFunc) { - if (currentIndex == batches.length) { - debug(`Finishing batch ${batchId}...`); - - resultFunc(accumulatedResults); - - return; - } - - debug(`Executing item #${(currentIndex + 1)} (of ${batches.length}) for batch ${batchId}`); +function getRpcDataWithParams(request) { + return new Promise(function(resolve, reject) { + debug(`RPC: ${request}`); - var count = batches[currentIndex].length; + rpcCall = function() { + client.command([request], function(err, result, resHeaders) { + if (err != null) { + console.log(`Error for RPC command ${request}: ${err}`); - debug(`RPC: ${JSON.stringify(batches[currentIndex])}`); - client.command(batches[currentIndex]).then(function(results) { - results.forEach((item) => { - accumulatedResults.push(item); + reject(err); - count--; - }); + return; + } - if (count == 0) { - executeBatchesSequentiallyInternal(batchId, batches, currentIndex + 1, accumulatedResults, resultFunc); - } - }).catch(function(err) { - throw err; + resolve(result[0]); + }); + }; + + rpcQueue.push({rpcCall:rpcCall}); }); } @@ -434,10 +304,8 @@ module.exports = { getMempoolInfo: getMempoolInfo, getMiningInfo: getMiningInfo, getBlockByHeight: getBlockByHeight, - getBlocksByHeight: getBlocksByHeight, getBlockByHash: getBlockByHash, getRawTransaction: getRawTransaction, - getRawTransactions: getRawTransactions, getRawMempool: getRawMempool, getUptimeSeconds: getUptimeSeconds, getHelp: getHelp, diff --git a/app/config.js b/app/config.js index 6419c02..4271ac2 100644 --- a/app/config.js +++ b/app/config.js @@ -52,6 +52,8 @@ module.exports = { queryExchangeRates: (process.env.BTCEXP_NO_RATES.toLowerCase() != "true"), noInmemoryRpcCache: (process.env.BTCEXP_NO_INMEMORY_RPC_CACHE.toLowerCase() == "true"), coin: currentCoin, + + rpcConcurrency: (process.env.BTCEXP_RPC_CONCURRENCY || 10), rpcBlacklist: process.env.BTCEXP_RPC_ALLOWALL ? [] diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 5a83a58..8ee195f 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -148,6 +148,14 @@ "resolved": "https://registry.npmjs.org/assert-plus/-/assert-plus-1.0.0.tgz", "integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=" }, + "async": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/async/-/async-2.6.2.tgz", + "integrity": "sha512-H1qVYh1MYhEEFLsP97cVKqCGo7KfCyTt6uEWqsTBr9SO84oK9Uwbyd/yCW+6rKJLHksBNUVWZDAjfS+Ccx0Bbg==", + "requires": { + "lodash": "^4.17.11" + } + }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", diff --git a/package.json b/package.json index 71909ce..b227cf9 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,7 @@ "url": "git+https://github.com/janoside/btc-rpc-explorer.git" }, "dependencies": { + "async": "2.6.2", "basic-auth": "^2.0.1", "bitcoin-core": "2.0.0", "bitcoinjs-lib": "3.3.2",