Browse Source

concurrency fixes for using "async/queue" improperly

fix-133-memory-crash
Dan Janosik 6 years ago
parent
commit
ff87228094
No known key found for this signature in database GPG Key ID: C6F8CE9FFDB2CED2
  1. 2
      app.js
  2. 30
      app/api/rpcApi.js

2
app.js

@ -75,7 +75,7 @@ app.use(session({
app.use(express.static(path.join(__dirname, 'public'))); app.use(express.static(path.join(__dirname, 'public')));
process.on("unhandledRejection", (reason, p) => { process.on("unhandledRejection", (reason, p) => {
console.log("Unhandled Rejection at: Promise", p, "reason:", reason, "stack:", reason.stack); console.log("Unhandled Rejection at: Promise", p, "reason:", reason, "stack:", (reason != null ? reason.stack : "null"));
if (global.influxdb) { if (global.influxdb) {
var points = []; var points = [];

30
app/api/rpcApi.js

@ -6,13 +6,18 @@ var utils = require("../utils.js");
var config = require("../config.js"); var config = require("../config.js");
var coins = require("../coins.js"); var coins = require("../coins.js");
var activeQueueTasks = 0;
var rpcQueue = async.queue(function(task, callback) { var rpcQueue = async.queue(function(task, callback) {
task.rpcCall(); activeQueueTasks++;
//console.log("activeQueueTasks: " + activeQueueTasks);
if (callback != null) { task.rpcCall(function() {
callback(); callback();
}
activeQueueTasks--;
//console.log("activeQueueTasks: " + activeQueueTasks);
});
}, config.rpcConcurrency); }, config.rpcConcurrency);
@ -255,18 +260,21 @@ function getRpcData(cmd) {
return new Promise(function(resolve, reject) { return new Promise(function(resolve, reject) {
debug(`RPC: ${cmd}`); debug(`RPC: ${cmd}`);
rpcCall = function() { rpcCall = function(callback) {
client.command(cmd, function(err, result, resHeaders) { client.command(cmd, function(err, result, resHeaders) {
if (err) { if (err) {
console.log(`Error for RPC command '${cmd}': ${err}`); console.log(`Error for RPC command '${cmd}': ${err}`);
reject(err); reject(err);
} else { callback();
resolve(result);
return;
} }
}).catch(function(err) {
reject(err); resolve(result);
callback();
}); });
}; };
@ -278,17 +286,21 @@ function getRpcDataWithParams(request) {
return new Promise(function(resolve, reject) { return new Promise(function(resolve, reject) {
debug(`RPC: ${request}`); debug(`RPC: ${request}`);
rpcCall = function() { rpcCall = function(callback) {
client.command([request], function(err, result, resHeaders) { client.command([request], function(err, result, resHeaders) {
if (err != null) { if (err != null) {
console.log(`Error for RPC command ${JSON.stringify(request)}: ${err}, headers=${resHeaders}`); console.log(`Error for RPC command ${JSON.stringify(request)}: ${err}, headers=${resHeaders}`);
reject(err); reject(err);
callback();
return; return;
} }
resolve(result[0]); resolve(result[0]);
callback();
}); });
}; };

Loading…
Cancel
Save