|
|
@ -29,14 +29,17 @@ try { |
|
|
|
require("./lib/parser/hiredis"); |
|
|
|
parsers.push(require("./lib/parser/hiredis")); |
|
|
|
} catch (err) { |
|
|
|
/* istanbul ignore next: won't be reached with tests */ |
|
|
|
debug("hiredis parser not installed."); |
|
|
|
} |
|
|
|
|
|
|
|
parsers.push(require("./lib/parser/javascript")); |
|
|
|
|
|
|
|
function RedisClient(stream, options) { |
|
|
|
options = options || {}; |
|
|
|
|
|
|
|
this.stream = stream; |
|
|
|
this.options = options = options || {}; |
|
|
|
this.options = options; |
|
|
|
|
|
|
|
this.connection_id = ++connection_id; |
|
|
|
this.connected = false; |
|
|
@ -51,26 +54,23 @@ function RedisClient(stream, options) { |
|
|
|
this.should_buffer = false; |
|
|
|
this.command_queue_high_water = this.options.command_queue_high_water || 1000; |
|
|
|
this.command_queue_low_water = this.options.command_queue_low_water || 0; |
|
|
|
this.max_attempts = null; |
|
|
|
if (options.max_attempts && !isNaN(options.max_attempts) && options.max_attempts > 0) { |
|
|
|
if (options.max_attempts && options.max_attempts > 0) { |
|
|
|
this.max_attempts = +options.max_attempts; |
|
|
|
} |
|
|
|
this.command_queue = new Queue(); // holds sent commands to de-pipeline them
|
|
|
|
this.offline_queue = new Queue(); // holds commands issued but not able to be sent
|
|
|
|
this.commands_sent = 0; |
|
|
|
this.connect_timeout = false; |
|
|
|
if (options.connect_timeout && !isNaN(options.connect_timeout) && options.connect_timeout > 0) { |
|
|
|
if (options.connect_timeout && options.connect_timeout > 0) { |
|
|
|
this.connect_timeout = +options.connect_timeout; |
|
|
|
} |
|
|
|
this.enable_offline_queue = true; |
|
|
|
if (typeof this.options.enable_offline_queue === "boolean") { |
|
|
|
this.enable_offline_queue = this.options.enable_offline_queue; |
|
|
|
if (this.options.enable_offline_queue === false) { |
|
|
|
this.enable_offline_queue = false; |
|
|
|
} |
|
|
|
this.retry_max_delay = null; |
|
|
|
if (options.retry_max_delay !== undefined && !isNaN(options.retry_max_delay) && options.retry_max_delay > 0) { |
|
|
|
this.retry_max_delay = options.retry_max_delay; |
|
|
|
if (options.retry_max_delay && options.retry_max_delay > 0) { |
|
|
|
this.retry_max_delay = +options.retry_max_delay; |
|
|
|
} |
|
|
|
|
|
|
|
this.initialize_retry_vars(); |
|
|
|
this.pub_sub_mode = false; |
|
|
|
this.subscription_set = {}; |
|
|
@ -131,12 +131,10 @@ RedisClient.prototype.initialize_retry_vars = function () { |
|
|
|
}; |
|
|
|
|
|
|
|
RedisClient.prototype.unref = function () { |
|
|
|
debug("User requesting to unref the connection"); |
|
|
|
if (this.connected) { |
|
|
|
debug("unref'ing the socket connection"); |
|
|
|
this.stream.unref(); |
|
|
|
} |
|
|
|
else { |
|
|
|
} else { |
|
|
|
debug("Not connected yet, will unref later"); |
|
|
|
this.once("connect", function () { |
|
|
|
this.unref(); |
|
|
@ -210,23 +208,26 @@ RedisClient.prototype.do_auth = function () { |
|
|
|
self.send_anyway = true; |
|
|
|
self.send_command("auth", [this.auth_pass], function (err, res) { |
|
|
|
if (err) { |
|
|
|
/* istanbul ignore if: this is almost impossible to test */ |
|
|
|
if (loading.test(err.message)) { |
|
|
|
// if redis is still loading the db, it will not authenticate and everything else will fail
|
|
|
|
console.log("Redis still loading, trying to authenticate later"); |
|
|
|
debug("Redis still loading, trying to authenticate later"); |
|
|
|
setTimeout(function () { |
|
|
|
self.do_auth(); |
|
|
|
}, 2000); // TODO - magic number alert
|
|
|
|
return; |
|
|
|
} else if (noPasswordIsSet.test(err.message)) { |
|
|
|
console.log("Warning: Redis server does not require a password, but a password was supplied."); |
|
|
|
debug("Warning: Redis server does not require a password, but a password was supplied."); |
|
|
|
err = null; |
|
|
|
res = "OK"; |
|
|
|
} else { |
|
|
|
return self.emit("error", new Error("Auth error: " + err.message)); |
|
|
|
} |
|
|
|
} |
|
|
|
if (res.toString() !== "OK") { |
|
|
|
return self.emit("error", new Error("Auth failed: " + res.toString())); |
|
|
|
|
|
|
|
res = res.toString(); |
|
|
|
if (res !== "OK") { |
|
|
|
return self.emit("error", new Error("Auth failed: " + res)); |
|
|
|
} |
|
|
|
|
|
|
|
debug("Auth succeeded " + self.address + " id " + self.connection_id); |
|
|
@ -283,7 +284,7 @@ RedisClient.prototype.init_parser = function () { |
|
|
|
var self = this; |
|
|
|
|
|
|
|
if (this.options.parser) { |
|
|
|
if (! parsers.some(function (parser) { |
|
|
|
if (!parsers.some(function (parser) { |
|
|
|
if (parser.name === self.options.parser) { |
|
|
|
self.parser_module = parser; |
|
|
|
debug("Using parser module: " + self.parser_module.name); |
|
|
@ -353,7 +354,9 @@ RedisClient.prototype.on_ready = function () { |
|
|
|
self.send_command(parts[0] + "scribe", [parts[1]], callback); |
|
|
|
}); |
|
|
|
return; |
|
|
|
} else if (this.monitoring) { |
|
|
|
} |
|
|
|
|
|
|
|
if (this.monitoring) { |
|
|
|
this.send_command("monitor", []); |
|
|
|
} else { |
|
|
|
this.send_offline_queue(); |
|
|
@ -378,7 +381,7 @@ RedisClient.prototype.on_info_cmd = function (err, res) { |
|
|
|
}); |
|
|
|
|
|
|
|
obj.versions = []; |
|
|
|
if( obj.redis_version ){ |
|
|
|
if (obj.redis_version) { |
|
|
|
obj.redis_version.split('.').forEach(function (num) { |
|
|
|
obj.versions.push(+num); |
|
|
|
}); |
|
|
@ -483,7 +486,7 @@ RedisClient.prototype.connection_gone = function (why) { |
|
|
|
this.retry_timer = null; |
|
|
|
// TODO - some people need a "Redis is Broken mode" for future commands that errors immediately, and others
|
|
|
|
// want the program to exit. Right now, we just log, which doesn't really help in either case.
|
|
|
|
console.error("node_redis: Couldn't get Redis connection after " + this.max_attempts + " attempts."); |
|
|
|
debug("node_redis: Couldn't get Redis connection after " + this.max_attempts + " attempts."); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
@ -500,7 +503,7 @@ RedisClient.prototype.connection_gone = function (why) { |
|
|
|
if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) { |
|
|
|
self.retry_timer = null; |
|
|
|
// TODO - engage Redis is Broken mode for future commands, or whatever
|
|
|
|
console.error("node_redis: Couldn't get Redis connection after " + self.retry_totaltime + "ms."); |
|
|
|
debug("node_redis: Couldn't get Redis connection after " + self.retry_totaltime + "ms."); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
@ -525,7 +528,7 @@ RedisClient.prototype.on_data = function (data) { |
|
|
|
}; |
|
|
|
|
|
|
|
RedisClient.prototype.return_error = function (err) { |
|
|
|
var command_obj = this.command_queue.shift(), queue_len = this.command_queue.getLength(); |
|
|
|
var command_obj = this.command_queue.shift(), queue_len = this.command_queue.length; |
|
|
|
|
|
|
|
if (this.pub_sub_mode === false && queue_len === 0) { |
|
|
|
this.command_queue = new Queue(); |
|
|
@ -536,7 +539,6 @@ RedisClient.prototype.return_error = function (err) { |
|
|
|
this.should_buffer = false; |
|
|
|
} |
|
|
|
|
|
|
|
if (command_obj && typeof command_obj.callback === "function") { |
|
|
|
try { |
|
|
|
command_obj.callback(err); |
|
|
|
} catch (callback_err) { |
|
|
@ -545,13 +547,6 @@ RedisClient.prototype.return_error = function (err) { |
|
|
|
throw callback_err; |
|
|
|
}); |
|
|
|
} |
|
|
|
} else { |
|
|
|
console.log("node_redis: no callback to send error: " + err.message); |
|
|
|
// this will probably not make it anywhere useful, but we might as well throw
|
|
|
|
process.nextTick(function () { |
|
|
|
throw err; |
|
|
|
}); |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
// if a callback throws an exception, re-throw it on a new stack so the parser can keep going.
|
|
|
@ -628,7 +623,7 @@ RedisClient.prototype.return_reply = function (reply) { |
|
|
|
command_obj = this.command_queue.shift(); |
|
|
|
} |
|
|
|
|
|
|
|
queue_len = this.command_queue.getLength(); |
|
|
|
queue_len = this.command_queue.length; |
|
|
|
|
|
|
|
if (this.pub_sub_mode === false && queue_len === 0) { |
|
|
|
this.command_queue = new Queue(); // explicitly reclaim storage from old Queue
|
|
|
@ -720,7 +715,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { |
|
|
|
// probably the fastest way:
|
|
|
|
// client.command([arg1, arg2], cb); (straight passthrough)
|
|
|
|
// send_command(command, [arg1, arg2], cb);
|
|
|
|
} else if (! callback) { |
|
|
|
} else if (!callback) { |
|
|
|
// most people find this variable argument length form more convenient, but it uses arguments, which is slower
|
|
|
|
// client.command(arg1, arg2, cb); (wraps up arguments into an array)
|
|
|
|
// send_command(command, [arg1, arg2, cb]);
|
|
|
@ -739,7 +734,9 @@ RedisClient.prototype.send_command = function (command, args, callback) { |
|
|
|
throw new Error("send_command: second argument must be an array"); |
|
|
|
} |
|
|
|
|
|
|
|
if (callback && process.domain) callback = process.domain.bind(callback); |
|
|
|
if (callback && process.domain) { |
|
|
|
callback = process.domain.bind(callback); |
|
|
|
} |
|
|
|
|
|
|
|
// if the last argument is an array and command is sadd or srem, expand it out:
|
|
|
|
// client.sadd(arg1, [arg2, arg3, arg4], cb);
|
|
|
@ -844,7 +841,7 @@ RedisClient.prototype.send_command = function (command, args, callback) { |
|
|
|
} |
|
|
|
} |
|
|
|
debug("send_command buffered_writes: " + buffered_writes, " should_buffer: " + this.should_buffer); |
|
|
|
if (buffered_writes || this.command_queue.getLength() >= this.command_queue_high_water) { |
|
|
|
if (buffered_writes || this.command_queue.length >= this.command_queue_high_water) { |
|
|
|
this.should_buffer = true; |
|
|
|
} |
|
|
|
return !this.should_buffer; |
|
|
@ -1141,9 +1138,7 @@ Multi.prototype.EXEC = Multi.prototype.exec; |
|
|
|
RedisClient.prototype.multi = function (args) { |
|
|
|
return new Multi(this, args); |
|
|
|
}; |
|
|
|
RedisClient.prototype.MULTI = function (args) { |
|
|
|
return new Multi(this, args); |
|
|
|
}; |
|
|
|
RedisClient.prototype.MULTI = RedisClient.prototype.multi; |
|
|
|
|
|
|
|
|
|
|
|
// stash original eval method
|
|
|
@ -1177,26 +1172,26 @@ RedisClient.prototype.eval = RedisClient.prototype.EVAL = function () { |
|
|
|
}); |
|
|
|
}; |
|
|
|
|
|
|
|
exports.createClient = function(arg0, arg1, options) { |
|
|
|
if (typeof arg0 === 'object' || arg0 === undefined) { |
|
|
|
options = arg0 || options; |
|
|
|
exports.createClient = function(port_arg, host_arg, options) { |
|
|
|
if (typeof port_arg === 'object' || port_arg === undefined) { |
|
|
|
options = port_arg || options; |
|
|
|
return createClient_tcp(default_port, default_host, options); |
|
|
|
} |
|
|
|
if (typeof arg0 === 'number' || typeof arg0 === 'string' && arg0.match(/^\d+$/)){ |
|
|
|
return createClient_tcp(arg0, arg1, options); |
|
|
|
if (typeof port_arg === 'number' || typeof port_arg === 'string' && /^\d+$/.test(port_arg)) { |
|
|
|
return createClient_tcp(port_arg, host_arg, options); |
|
|
|
} |
|
|
|
if (typeof arg0 === 'string') { |
|
|
|
options = arg1 || {}; |
|
|
|
if (typeof port_arg === 'string') { |
|
|
|
options = host_arg || {}; |
|
|
|
|
|
|
|
var parsed = URL.parse(arg0, true, true); |
|
|
|
var parsed = URL.parse(port_arg, true, true); |
|
|
|
if (parsed.hostname) { |
|
|
|
if (parsed.auth) { |
|
|
|
options.auth_pass = parsed.auth.split(':')[1]; |
|
|
|
} |
|
|
|
return createClient_tcp((parsed.port || default_port), parsed.hostname, options); |
|
|
|
return createClient_tcp(parsed.port, parsed.hostname, options); |
|
|
|
} |
|
|
|
|
|
|
|
return createClient_unix(arg0, options); |
|
|
|
return createClient_unix(port_arg, options); |
|
|
|
} |
|
|
|
throw new Error('unknown type of connection in createClient()'); |
|
|
|
}; |
|
|
@ -1206,7 +1201,7 @@ var createClient_unix = function(path, options){ |
|
|
|
path: path |
|
|
|
}; |
|
|
|
var net_client = net.createConnection(cnxOptions); |
|
|
|
var redis_client = new RedisClient(net_client, options || {}); |
|
|
|
var redis_client = new RedisClient(net_client, options); |
|
|
|
|
|
|
|
redis_client.connectionOption = cnxOptions; |
|
|
|
redis_client.address = path; |
|
|
@ -1218,10 +1213,10 @@ var createClient_tcp = function (port_arg, host_arg, options) { |
|
|
|
var cnxOptions = { |
|
|
|
'port' : port_arg || default_port, |
|
|
|
'host' : host_arg || default_host, |
|
|
|
'family' : (options && options.family === 'IPv6') ? 6 : 4 |
|
|
|
'family' : options && options.family === 'IPv6' ? 6 : 4 |
|
|
|
}; |
|
|
|
var net_client = net.createConnection(cnxOptions); |
|
|
|
var redis_client = new RedisClient(net_client, options || {}); |
|
|
|
var redis_client = new RedisClient(net_client, options); |
|
|
|
|
|
|
|
redis_client.connectionOption = cnxOptions; |
|
|
|
redis_client.address = cnxOptions.host + ':' + cnxOptions.port; |
|
|
|