|
@ -152,13 +152,7 @@ RedisClient.prototype.flush_and_error = function (message) { |
|
|
while (this.offline_queue.length > 0) { |
|
|
while (this.offline_queue.length > 0) { |
|
|
command_obj = this.offline_queue.shift(); |
|
|
command_obj = this.offline_queue.shift(); |
|
|
if (typeof command_obj.callback === "function") { |
|
|
if (typeof command_obj.callback === "function") { |
|
|
try { |
|
|
|
|
|
command_obj.callback(error); |
|
|
command_obj.callback(error); |
|
|
} catch (callback_err) { |
|
|
|
|
|
process.nextTick(function () { |
|
|
|
|
|
throw callback_err; |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
this.offline_queue = new Queue(); |
|
|
this.offline_queue = new Queue(); |
|
@ -166,13 +160,7 @@ RedisClient.prototype.flush_and_error = function (message) { |
|
|
while (this.command_queue.length > 0) { |
|
|
while (this.command_queue.length > 0) { |
|
|
command_obj = this.command_queue.shift(); |
|
|
command_obj = this.command_queue.shift(); |
|
|
if (typeof command_obj.callback === "function") { |
|
|
if (typeof command_obj.callback === "function") { |
|
|
try { |
|
|
|
|
|
command_obj.callback(error); |
|
|
command_obj.callback(error); |
|
|
} catch (callback_err) { |
|
|
|
|
|
process.nextTick(function () { |
|
|
|
|
|
throw callback_err; |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
this.command_queue = new Queue(); |
|
|
this.command_queue = new Queue(); |
|
@ -292,6 +280,8 @@ RedisClient.prototype.init_parser = function () { |
|
|
return true; |
|
|
return true; |
|
|
} |
|
|
} |
|
|
})) { |
|
|
})) { |
|
|
|
|
|
// Do not emit this error
|
|
|
|
|
|
// This should take down the app if anyone made such a huge mistake or should somehow be handled in user code
|
|
|
throw new Error("Couldn't find named parser " + self.options.parser + " on this system"); |
|
|
throw new Error("Couldn't find named parser " + self.options.parser + " on this system"); |
|
|
} |
|
|
} |
|
|
} else { |
|
|
} else { |
|
@ -529,37 +519,12 @@ RedisClient.prototype.return_error = function (err) { |
|
|
this.emit("drain"); |
|
|
this.emit("drain"); |
|
|
this.should_buffer = false; |
|
|
this.should_buffer = false; |
|
|
} |
|
|
} |
|
|
|
|
|
if (command_obj.callback) { |
|
|
try { |
|
|
|
|
|
command_obj.callback(err); |
|
|
command_obj.callback(err); |
|
|
} catch (callback_err) { |
|
|
|
|
|
// if a callback throws an exception, re-throw it on a new stack so the parser can keep going
|
|
|
|
|
|
process.nextTick(function () { |
|
|
|
|
|
throw callback_err; |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
// if a callback throws an exception, re-throw it on a new stack so the parser can keep going.
|
|
|
|
|
|
// if a domain is active, emit the error on the domain, which will serve the same function.
|
|
|
|
|
|
// put this try/catch in its own function because V8 doesn't optimize this well yet.
|
|
|
|
|
|
function try_callback(callback, reply) { |
|
|
|
|
|
try { |
|
|
|
|
|
callback(null, reply); |
|
|
|
|
|
} catch (err) { |
|
|
|
|
|
if (process.domain) { |
|
|
|
|
|
var currDomain = process.domain; |
|
|
|
|
|
currDomain.emit('error', err); |
|
|
|
|
|
if (process.domain === currDomain) { |
|
|
|
|
|
currDomain.exit(); |
|
|
|
|
|
} |
|
|
|
|
|
} else { |
|
|
} else { |
|
|
process.nextTick(function () { |
|
|
this.emit('error', err); |
|
|
throw err; |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
// hgetall converts its replies to an Object. If the reply is empty, null is returned.
|
|
|
// hgetall converts its replies to an Object. If the reply is empty, null is returned.
|
|
|
function reply_to_object(reply) { |
|
|
function reply_to_object(reply) { |
|
@ -638,7 +603,7 @@ RedisClient.prototype.return_reply = function (reply) { |
|
|
reply = reply_to_object(reply); |
|
|
reply = reply_to_object(reply); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
try_callback(command_obj.callback, reply); |
|
|
command_obj.callback(null, reply); |
|
|
} else { |
|
|
} else { |
|
|
debug("no callback for reply: " + (reply && reply.toString && reply.toString())); |
|
|
debug("no callback for reply: " + (reply && reply.toString && reply.toString())); |
|
|
} |
|
|
} |
|
@ -662,14 +627,16 @@ RedisClient.prototype.return_reply = function (reply) { |
|
|
// reply[1] can be null
|
|
|
// reply[1] can be null
|
|
|
var reply1String = (reply[1] === null) ? null : reply[1].toString(); |
|
|
var reply1String = (reply[1] === null) ? null : reply[1].toString(); |
|
|
if (command_obj && typeof command_obj.callback === "function") { |
|
|
if (command_obj && typeof command_obj.callback === "function") { |
|
|
try_callback(command_obj.callback, reply1String); |
|
|
command_obj.callback(null, reply1String); |
|
|
} |
|
|
} |
|
|
this.emit(type, reply1String, reply[2]); // channel, count
|
|
|
this.emit(type, reply1String, reply[2]); // channel, count
|
|
|
} else { |
|
|
} else { |
|
|
throw new Error("subscriptions are active but got unknown reply type " + type); |
|
|
this.emit("error", new Error("subscriptions are active but got unknown reply type " + type)); |
|
|
|
|
|
return; |
|
|
} |
|
|
} |
|
|
} else if (!this.closing) { |
|
|
} else if (!this.closing) { |
|
|
throw new Error("subscriptions are active but got an invalid reply: " + reply); |
|
|
this.emit("error", new Error("subscriptions are active but got an invalid reply: " + reply)); |
|
|
|
|
|
return; |
|
|
} |
|
|
} |
|
|
} else if (this.monitoring) { |
|
|
} else if (this.monitoring) { |
|
|
len = reply.indexOf(" "); |
|
|
len = reply.indexOf(" "); |
|
@ -680,7 +647,7 @@ RedisClient.prototype.return_reply = function (reply) { |
|
|
}); |
|
|
}); |
|
|
this.emit("monitor", timestamp, args); |
|
|
this.emit("monitor", timestamp, args); |
|
|
} else { |
|
|
} else { |
|
|
throw new Error("node_redis command queue state error. If you can reproduce this, please report it."); |
|
|
this.emit("error", new Error("node_redis command queue state error. If you can reproduce this, please report it.")); |
|
|
} |
|
|
} |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
@ -739,10 +706,17 @@ RedisClient.prototype.send_command = function (command, args, callback) { |
|
|
|
|
|
|
|
|
// if the value is undefined or null and command is set or setx, need not to send message to redis
|
|
|
// if the value is undefined or null and command is set or setx, need not to send message to redis
|
|
|
if (command === 'set' || command === 'setex') { |
|
|
if (command === 'set' || command === 'setex') { |
|
|
|
|
|
if (args.length === 0) { |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
if (args[args.length - 1] === undefined || args[args.length - 1] === null) { |
|
|
if (args[args.length - 1] === undefined || args[args.length - 1] === null) { |
|
|
var err = new Error('send_command: ' + command + ' value must not be undefined or null'); |
|
|
var err = new Error('send_command: ' + command + ' value must not be undefined or null'); |
|
|
|
|
|
if (callback) { |
|
|
return callback && callback(err); |
|
|
return callback && callback(err); |
|
|
} |
|
|
} |
|
|
|
|
|
this.emit("error", err); |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
buffer_args = false; |
|
|
buffer_args = false; |
|
@ -768,7 +742,8 @@ RedisClient.prototype.send_command = function (command, args, callback) { |
|
|
if (command_obj.callback) { |
|
|
if (command_obj.callback) { |
|
|
command_obj.callback(not_writeable_error); |
|
|
command_obj.callback(not_writeable_error); |
|
|
} else { |
|
|
} else { |
|
|
throw not_writeable_error; |
|
|
this.emit("error", not_writeable_error); |
|
|
|
|
|
return; |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -782,7 +757,8 @@ RedisClient.prototype.send_command = function (command, args, callback) { |
|
|
} else if (command === "quit") { |
|
|
} else if (command === "quit") { |
|
|
this.closing = true; |
|
|
this.closing = true; |
|
|
} else if (this.pub_sub_mode === true) { |
|
|
} else if (this.pub_sub_mode === true) { |
|
|
throw new Error("Connection in subscriber mode, only subscriber commands may be used"); |
|
|
this.emit("error", new Error("Connection in subscriber mode, only subscriber commands may be used")); |
|
|
|
|
|
return; |
|
|
} |
|
|
} |
|
|
this.command_queue.push(command_obj); |
|
|
this.command_queue.push(command_obj); |
|
|
this.commands_sent += 1; |
|
|
this.commands_sent += 1; |
|
@ -1069,7 +1045,7 @@ Multi.prototype.exec = function (callback) { |
|
|
callback(errors); |
|
|
callback(errors); |
|
|
return; |
|
|
return; |
|
|
} else { |
|
|
} else { |
|
|
throw new Error(err); |
|
|
self._client.emit('error', err); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|