Browse Source

concurrency + simplicity improvements for rpcApi:

- support for configurable RPC concurrency level (default 10, to be under bitcoind default "rpcworkqueue=16")
- queue up requests to prevent overloading RPC work queue of bitcoind
- simplify / cleanup rpcApi
fix-133-memory-crash
Dan Janosik 6 years ago
parent
commit
c1082c0b4f
No known key found for this signature in database GPG Key ID: C6F8CE9FFDB2CED2
  1. 4
      .env-sample
  2. 232
      app/api/rpcApi.js
  3. 2
      app/config.js
  4. 8
      npm-shrinkwrap.json
  5. 1
      package.json

4
.env-sample

@ -26,6 +26,10 @@
#BTCEXP_INFLUXDB_USER=dbuser
#BTCEXP_INFLUXDB_PASS=dbpassword
# Set number of concurrent RPC requests. Should be lower than your node's "rpcworkqueue" value.
# The default for this value is 10, aiming to be less than Bitcoin Core's default rpcworkqueue=16.
#BTCEXP_RPC_CONCURRENCY=10
# Disable app's in-memory RPC caching to reduce memory usage
#BTCEXP_NO_INMEMORY_RPC_CACHE=true

232
app/api/rpcApi.js

@ -1,9 +1,23 @@
var debug = require('debug')('rpcApi');
var debug = require('debug')('btcexp:rpcApi');
var async = require("async");
var utils = require("../utils.js");
var config = require("../config.js");
var coins = require("../coins.js");
var rpcQueue = async.queue(function(task, callback) {
task.rpcCall();
if (callback != null) {
callback();
}
}, config.rpcConcurrency);
function getBlockchainInfo() {
return getRpcData("getblockchaininfo");
}
@ -33,51 +47,22 @@ function getPeerInfo() {
}
function getRawMempool() {
return getRpcDataWithParams("getrawmempool", true);
return getRpcDataWithParams({method:"getrawmempool", parameters:[true]});
}
function getChainTxStats(blockCount) {
return getRpcDataWithParams("getchaintxstats", blockCount);
return getRpcDataWithParams({method:"getchaintxstats", parameters:[blockCount]});
}
function getBlockByHeight(blockHeight) {
return new Promise(function(resolve, reject) {
getBlocksByHeight([blockHeight]).then(function(results) {
if (results && results.length > 0) {
resolve(results[0]);
getRpcDataWithParams({method:"getblockhash", parameters:[blockHeight]}).then(function(blockhash) {
getBlockByHash(blockhash).then(function(block) {
resolve(block);
} else {
resolve(null);
}
}).catch(function(err) {
reject(err);
});
});
}
function getBlocksByHeight(blockHeights) {
//debug("getBlocksByHeight: " + blockHeights);
return new Promise(function(resolve, reject) {
var batch = [];
for (var i = 0; i < blockHeights.length; i++) {
batch.push({
method: 'getblockhash',
parameters: [ blockHeights[i] ]
});
}
var blockHashes = [];
client.command(batch).then((responses) => {
responses.forEach((item) => {
blockHashes.push(item);
});
if (blockHashes.length == batch.length) {
getBlocksByHash(blockHashes).then(function(blocks) {
resolve(blocks);
});
}
}).catch(function(err) {
reject(err);
});
@ -85,73 +70,20 @@ function getBlocksByHeight(blockHeights) {
}
function getBlockByHash(blockHash) {
return new Promise(function(resolve, reject) {
getBlocksByHash([blockHash]).then(function(results) {
if (results && results.length > 0) {
resolve(results[0]);
} else {
resolve(null);
}
}).catch(function(err) {
reject(err);
});
});
}
function getBlocksByHash(blockHashes) {
debug("rpc.getBlocksByHash: " + blockHashes);
debug("getBlockByHash: %s", blockHash);
return new Promise(function(resolve, reject) {
var batch = [];
for (var i = 0; i < blockHashes.length; i++) {
batch.push({
method: 'getblock',
parameters: [ blockHashes[i] ]
});
}
var blocks = [];
client.command(batch).then((responses) => {
responses.forEach((item) => {
if (item.tx) {
blocks.push(item);
}
});
getRpcDataWithParams({method:"getblock", parameters:[blockHash]}).then(function(block) {
getRawTransaction(block.tx[0]).then(function(tx) {
block.coinbaseTx = tx;
block.totalFees = utils.getBlockTotalFeesFromCoinbaseTxAndBlockHeight(tx, block.height);
block.miner = utils.getMinerFromCoinbaseTx(tx);
var coinbaseTxids = [];
for (var i = 0; i < blocks.length; i++) {
coinbaseTxids.push(blocks[i].tx[0])
}
getRawTransactions(coinbaseTxids).then(function(coinbaseTxs) {
for (var i = 0; i < blocks.length; i++) {
blocks[i].coinbaseTx = coinbaseTxs[i];
blocks[i].totalFees = utils.getBlockTotalFeesFromCoinbaseTxAndBlockHeight(coinbaseTxs[i], blocks[i].height);
blocks[i].miner = utils.getMinerFromCoinbaseTx(coinbaseTxs[i]);
}
resolve(block);
resolve(blocks);
});
}).catch(function(err) {
reject(err);
});
});
}
function getRawTransaction(txid) {
return new Promise(function(resolve, reject) {
getRawTransactions([txid]).then(function(results) {
if (results && results.length > 0) {
if (results[0].txid) {
resolve(results[0]);
} else {
resolve(null);
}
} else {
resolve(null);
}
}).catch(function(err) {
reject(err);
});
@ -159,25 +91,13 @@ function getRawTransaction(txid) {
}
function getAddress(address) {
return getRpcDataWithParams("validateaddress", address);
return getRpcDataWithParams({method:"validateaddress", parameters:[address]});
}
function getRawTransactions(txids) {
//debug("getRawTransactions: " + txids);
function getRawTransaction(txid) {
debug("getRawTransaction: %s", txid);
return new Promise(function(resolve, reject) {
if (!txids || txids.length == 0) {
resolve([]);
return;
}
var requests = [];
var promises = [];
for (var i = 0; i < txids.length; i++) {
var txid = txids[i];
if (txid) {
if (coins[config.coin].genesisCoinbaseTransactionId && txid == coins[config.coin].genesisCoinbaseTransactionId) {
// copy the "confirmations" field from genesis block to the genesis-coinbase tx
promises.push(new Promise(function(resolve2, reject2) {
@ -185,43 +105,21 @@ function getRawTransactions(txids) {
var result = coins[config.coin].genesisCoinbaseTransaction;
result.confirmations = blockchainInfoResult.blocks;
resolve2([result]);
resolve([result]);
}).catch(function(err) {
reject2(err);
reject(err);
});
}));
} else {
requests.push({
method: 'getrawtransaction',
parameters: [ txid, 1 ]
});
}
}
}
var requestBatches = utils.splitArrayIntoChunks(requests, 100);
promises.push(new Promise(function(resolve2, reject2) {
executeBatchesSequentially(requestBatches, function(results) {
resolve2(results);
});
}));
Promise.all(promises).then(function(results) {
var finalResults = [];
for (var i = 0; i < results.length; i++) {
for (var j = 0; j < results[i].length; j++) {
finalResults.push(results[i][j]);
}
}
resolve(finalResults);
getRpcDataWithParams({method:"getrawtransaction", parameters:[txid, 1]}).then(function(result) {
resolve(result);
}).catch(function(err) {
reject(err);
});
}
});
}
@ -356,9 +254,11 @@ function getRpcMethodHelp(methodName) {
function getRpcData(cmd) {
return new Promise(function(resolve, reject) {
debug(`RPC: ${cmd}`);
rpcCall = function() {
client.command(cmd, function(err, result, resHeaders) {
if (err) {
console.log("Error for RPC command '" + cmd + "': " + err);
console.log(`Error for RPC command '${cmd}': ${err}`);
reject(err);
@ -368,61 +268,31 @@ function getRpcData(cmd) {
}).catch(function(err) {
reject(err);
});
};
rpcQueue.push({rpcCall:rpcCall});
});
}
function getRpcDataWithParams(cmd, params) {
function getRpcDataWithParams(request) {
return new Promise(function(resolve, reject) {
debug(`RPC: ${cmd}(${params})`);
client.command(cmd, params, function(err, result, resHeaders) {
if (err) {
console.log("Error for RPC command '" + cmd + "': " + err);
debug(`RPC: ${request}`);
reject(err);
rpcCall = function() {
client.command([request], function(err, result, resHeaders) {
if (err != null) {
console.log(`Error for RPC command ${request}: ${err}`);
} else {
resolve(result);
}
}).catch(function(err) {
reject(err);
});
});
}
function executeBatchesSequentially(batches, resultFunc) {
var batchId = utils.getRandomString(20, 'aA#');
debug(`Starting ${batches.length}-item batch ${batchId}...`);
executeBatchesSequentiallyInternal(batchId, batches, 0, [], resultFunc);
}
function executeBatchesSequentiallyInternal(batchId, batches, currentIndex, accumulatedResults, resultFunc) {
if (currentIndex == batches.length) {
debug(`Finishing batch ${batchId}...`);
resultFunc(accumulatedResults);
return;
}
debug(`Executing item #${(currentIndex + 1)} (of ${batches.length}) for batch ${batchId}`);
var count = batches[currentIndex].length;
debug(`RPC: ${JSON.stringify(batches[currentIndex])}`);
client.command(batches[currentIndex]).then(function(results) {
results.forEach((item) => {
accumulatedResults.push(item);
count--;
resolve(result[0]);
});
};
if (count == 0) {
executeBatchesSequentiallyInternal(batchId, batches, currentIndex + 1, accumulatedResults, resultFunc);
}
}).catch(function(err) {
throw err;
rpcQueue.push({rpcCall:rpcCall});
});
}
@ -434,10 +304,8 @@ module.exports = {
getMempoolInfo: getMempoolInfo,
getMiningInfo: getMiningInfo,
getBlockByHeight: getBlockByHeight,
getBlocksByHeight: getBlocksByHeight,
getBlockByHash: getBlockByHash,
getRawTransaction: getRawTransaction,
getRawTransactions: getRawTransactions,
getRawMempool: getRawMempool,
getUptimeSeconds: getUptimeSeconds,
getHelp: getHelp,

2
app/config.js

@ -53,6 +53,8 @@ module.exports = {
noInmemoryRpcCache: (process.env.BTCEXP_NO_INMEMORY_RPC_CACHE.toLowerCase() == "true"),
coin: currentCoin,
rpcConcurrency: (process.env.BTCEXP_RPC_CONCURRENCY || 10),
rpcBlacklist:
process.env.BTCEXP_RPC_ALLOWALL ? []
: process.env.BTCEXP_RPC_BLACKLIST ? process.env.BTCEXP_RPC_BLACKLIST.split(',').filter(Boolean)

8
npm-shrinkwrap.json

@ -148,6 +148,14 @@
"resolved": "https://registry.npmjs.org/assert-plus/-/assert-plus-1.0.0.tgz",
"integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU="
},
"async": {
"version": "2.6.2",
"resolved": "https://registry.npmjs.org/async/-/async-2.6.2.tgz",
"integrity": "sha512-H1qVYh1MYhEEFLsP97cVKqCGo7KfCyTt6uEWqsTBr9SO84oK9Uwbyd/yCW+6rKJLHksBNUVWZDAjfS+Ccx0Bbg==",
"requires": {
"lodash": "^4.17.11"
}
},
"asynckit": {
"version": "0.4.0",
"resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz",

1
package.json

@ -23,6 +23,7 @@
"url": "git+https://github.com/janoside/btc-rpc-explorer.git"
},
"dependencies": {
"async": "2.6.2",
"basic-auth": "^2.0.1",
"bitcoin-core": "2.0.0",
"bitcoinjs-lib": "3.3.2",

Loading…
Cancel
Save