@ -32,6 +32,9 @@ function RedisClient(stream, options) {
this . ready = false ;
this . connections = 0 ;
this . attempts = 1 ;
this . should_buffer = false ;
this . command_queue_high_water = this . options . command_queue_high_water || 1000 ;
this . command_queue_low_water = this . options . command_queue_low_water || 0 ;
this . command_queue = new Queue ( ) ; // holds sent commands to de-pipeline them
this . offline_queue = new Queue ( ) ; // holds commands issued but not able to be sent
this . commands_sent = 0 ;
@ -132,6 +135,7 @@ function RedisClient(stream, options) {
} ) ;
this . stream . on ( "drain" , function ( ) {
self . should_buffer = false ;
self . emit ( "drain" ) ;
} ) ;
@ -157,26 +161,21 @@ RedisClient.prototype.on_connect = function () {
this . stream . setNoDelay ( ) ;
this . stream . setTimeout ( 0 ) ;
if ( this . auth_pass ) {
var self = this ;
// if redis is still loading the db, it will not authenticate and everything else will fail
function cmd_do_auth ( ) {
// if redis is still loading the db, it will not authenticate and everything else will fail
function cmd_do_auth ( ) {
if ( exports . debug_mode ) {
console . log ( "Sending auth to " + self . host + ":" + self . port + " fd " + self . stream . fd ) ;
}
self . send_anyway = true ;
self . send_command ( "auth" , self . auth_pass , function ( err , res ) {
self . send_command ( "auth" , [ this . auth_pass ] , function ( err , res ) {
if ( err ) {
if ( err . toString ( ) . match ( "LOADING" ) ) {
// still loading, try to authenticate later
console . log ( "Redis still loading, trying to authenticate later" ) ;
setTimeout ( cmd_do_auth , 2000 ) ;
return ;
console . log ( "Redis still loading, trying to authenticate later" ) ;
setTimeout ( cmd_do_auth , 2000 ) ; // TODO - magic number alert
return ;
} else {
return self . emit ( "error" , "Auth error: " + err ) ;
}
else return self . emit ( "error" , "Auth error: " + err ) ;
}
if ( res . toString ( ) !== "OK" ) {
return self . emit ( "error" , "Auth failed: " + res . toString ( ) ) ;
@ -191,31 +190,28 @@ RedisClient.prototype.on_connect = function () {
// now we are really connected
self . emit ( "connect" ) ;
if ( self . options . no_ready_check ) {
self . ready = true ;
self . send_offline_queue ( ) ;
if ( self . options . no_ready_check ) {
self . ready = true ;
self . send_offline_queue ( ) ;
} else {
self . ready_check ( ) ;
self . ready_check ( ) ;
}
} ) ;
self . send_anyway = false ;
}
}
cmd_do_auth ( ) ;
if ( this . auth_pass ) {
cmd_do_auth ( ) ;
} else {
this . emit ( "connect" ) ;
this . emit ( "connect" ) ;
if ( this . options . no_ready_check ) {
this . ready = true ;
this . send_offline_queue ( ) ;
} else {
this . ready_check ( ) ;
}
if ( this . options . no_ready_check ) {
this . ready = true ;
this . send_offline_queue ( ) ;
} else {
this . ready_check ( ) ;
}
}
} ;
RedisClient . prototype . ready_check = function ( ) {
@ -275,16 +271,21 @@ RedisClient.prototype.ready_check = function () {
} ;
RedisClient . prototype . send_offline_queue = function ( ) {
var command_obj ;
var command_obj , buffered_writes = 0 ;
while ( this . offline_queue . length > 0 ) {
command_obj = this . offline_queue . shift ( ) ;
if ( exports . debug_mode ) {
console . log ( "Sending offline command: " + command_obj . command ) ;
}
this . send_command ( command_obj . command , command_obj . args , command_obj . callback ) ;
buffered_writes += ! this . send_command ( command_obj . command , command_obj . args , command_obj . callback ) ;
}
this . offline_queue = new Queue ( ) ;
// Even though items were shifted off, Queue backing store still uses memory until next add
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
if ( buffered_writes === 0 ) {
this . should_buffer = false ;
this . emit ( "drain" ) ;
}
} ;
RedisClient . prototype . connection_gone = function ( why ) {
@ -361,12 +362,16 @@ RedisClient.prototype.on_data = function (data) {
} ;
RedisClient . prototype . return_error = function ( err ) {
var command_obj = this . command_queue . shift ( ) ;
var command_obj = this . command_queue . shift ( ) , queue_len = this . command_queue . getLength ( ) ;
if ( this . subscriptions === false && this . command_queue . length === 0 ) {
if ( this . subscriptions === false && queue_len === 0 ) {
this . emit ( "idle" ) ;
this . command_queue = new Queue ( ) ;
}
if ( this . should_buffer && queue_len <= this . command_queue_low_water ) {
this . emit ( "drain" ) ;
this . should_buffer = false ;
}
if ( command_obj && typeof command_obj . callback === "function" ) {
try {
@ -388,11 +393,15 @@ RedisClient.prototype.return_error = function (err) {
RedisClient . prototype . return_reply = function ( reply ) {
var command_obj = this . command_queue . shift ( ) ,
obj , i , len , key , val , type , timestamp , args ;
obj , i , len , key , val , type , timestamp , args , queue_len = this . command_queue . getLength ( ) ;
if ( this . subscriptions === false && this . command_queue . length === 0 ) {
if ( this . subscriptions === false && queue_len === 0 ) {
this . emit ( "idle" ) ;
this . command_queue = new Queue ( ) ;
this . command_queue = new Queue ( ) ; // explicitly reclaim storage from old Queue
}
if ( this . should_buffer && queue_len <= this . command_queue_low_water ) {
this . emit ( "drain" ) ;
this . should_buffer = false ;
}
if ( command_obj && ! command_obj . sub_command ) {
@ -453,52 +462,56 @@ RedisClient.prototype.return_reply = function (reply) {
}
} ;
RedisClient . prototype . send_command = function ( ) {
var command , callback , arg , args , this_args , command_obj , i , il ,
elem_count , stream = this . stream , buffer_args , command_str = "" ;
this_args = to_array ( arguments ) ;
// This Command constructor is ever so slightly faster than using an object literal
function Command ( command , args , sub_command , callback ) {
this . command = command ;
this . args = args ;
this . sub_command = sub_command ;
this . callback = callback ;
}
if ( this_args . length === 0 ) {
throw new Error ( "send_command: not enough arguments" ) ;
}
RedisClient . prototype . send_command = function ( command , args , callback ) {
var arg , this_args , command_obj , i , il , elem_count , stream = this . stream , buffer_args , command_str = "" , buffered_writes = 0 ;
if ( typeof this_args [ 0 ] !== "string" ) {
throw new Error ( "First argument of send_command must be the command name" ) ;
if ( typeof command !== "string" ) {
throw new Error ( "First argument t o send_command must be the command name string, not " + typeof command ) ;
}
command = this_args [ 0 ] . toLowerCase ( ) ;
if ( this_args [ 1 ] && Array . isArray ( this_args [ 1 ] ) ) {
args = this_args [ 1 ] ;
if ( typeof this_args [ 2 ] === "function" ) {
callback = this_args [ 2 ] ;
}
} else {
if ( typeof this_args [ this_args . length - 1 ] === "function" ) {
callback = this_args [ this_args . length - 1 ] ;
args = this_args . slice ( 1 , this_args . length - 1 ) ;
if ( Array . isArray ( args ) ) {
if ( typeof callback === "function" ) {
// probably the fastest way:
// client.command([arg1, arg2], cb); (straight passthrough)
// send_command(command, [arg1, arg2], cb);
} else if ( typeof callback === "undefined" ) {
// most people find this variable argument length form more convenient, but it uses arguments, which is slower
// client.command(arg1, arg2, cb); (wraps up arguments into an array)
// send_command(command, [arg1, arg2, cb]);
// client.command(arg1, arg2); (callback is optional)
// send_command(command, [arg1, arg2]);
if ( typeof args [ args . length - 1 ] === "function" ) {
callback = args [ args . length - 1 ] ;
args . length -= 1 ;
}
} else {
args = this_args . slice ( 1 , this_args . length ) ;
throw new Error ( "send_command: last argument must be a callback or undefined" ) ;
}
} else {
throw new Error ( "send_command: second argument must be an array" ) ;
}
if ( args . length === 2 && Array . isArray ( args [ 1 ] ) ) {
args = [ args [ 0 ] ] . concat ( args [ 1 ] ) ;
}
command_obj = {
command : command ,
args : args ,
callback : callback ,
sub_command : false
} ;
command_obj = new Command ( command , args , false , callback ) ;
if ( ! this . ready && ! this . send_anyway ) {
if ( ( ! this . ready && ! this . send_anyway ) || ! stream . writable ) {
if ( exports . debug_mode ) {
if ( ! stream . writable ) {
console . log ( "send command: stream is not writeable." ) ;
}
console . log ( "Queueing " + command + " for next server connection." ) ;
}
this . offline_queue . push ( command_obj ) ;
return ;
this . should_buffer = true ;
return false ;
}
if ( command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe" ) {
@ -521,10 +534,6 @@ RedisClient.prototype.send_command = function () {
buffer_args = false ;
elem_count += args . length ;
// Probably should just scan this like a normal person. This is clever, but might be slow.
buffer_args = args . some ( function ( arg ) {
return arg instanceof Buffer ;
} ) ;
// Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg
// This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
@ -532,9 +541,10 @@ RedisClient.prototype.send_command = function () {
command_str = "*" + elem_count + "\r\n$" + command . length + "\r\n" + command + "\r\n" ;
if ( ! stream . writable && exports . debug_mode ) {
console . log ( "send command: stream is not writeable, should get a close event next tick." ) ;
return ;
for ( i = 0 , il = args . length , arg ; i < il ; i += 1 ) {
if ( args [ i ] instanceof Buffer ) {
buffer_args = true ;
}
}
if ( ! buffer_args ) { // Build up a string and send entire command in one write
@ -548,13 +558,12 @@ RedisClient.prototype.send_command = function () {
if ( exports . debug_mode ) {
console . log ( "send " + this . host + ":" + this . port + " fd " + this . stream . fd + ": " + command_str ) ;
}
stream . write ( command_str ) ;
buffered_writes += ! stream . write ( command_str ) ;
} else {
if ( exports . debug_mode ) {
console . log ( "send command: " + command_str ) ;
console . log ( "send command has Buffer arguments" ) ;
console . log ( "send command (" + command_str + ") has Buffer arguments" ) ;
}
stream . write ( command_str ) ;
buffered_writes += ! stream . write ( command_str ) ;
for ( i = 0 , il = args . length , arg ; i < il ; i += 1 ) {
arg = args [ i ] ;
@ -565,19 +574,32 @@ RedisClient.prototype.send_command = function () {
if ( arg instanceof Buffer ) {
if ( arg . length === 0 ) {
if ( exports . debug_mode ) {
console . log ( "U sing empty string for 0 length buffer" ) ;
console . log ( "send_command: u sing empty string for 0 length buffer" ) ;
}
stream . write ( "$0\r\n\r\n" ) ;
buffered_writes += ! stream . write ( "$0\r\n\r\n" ) ;
} else {
stream . write ( "$" + arg . length + "\r\n" ) ;
stream . write ( arg ) ;
stream . write ( "\r\n" ) ;
buffered_writes += ! stream . write ( "$" + arg . length + "\r\n" ) ;
buffered_writes += ! stream . write ( arg ) ;
buffered_writes += ! stream . write ( "\r\n" ) ;
if ( exports . debug_mode ) {
console . log ( "send_command: buffer send " + arg . length + " bytes" ) ;
}
}
} else {
stream . write ( "$" + Buffer . byteLength ( arg ) + "\r\n" + arg + "\r\n" ) ;
if ( exports . debug_mode ) {
console . log ( "send_command: string send " + Buffer . byteLength ( arg ) + " bytes: " + arg ) ;
}
buffered_writes += ! stream . write ( "$" + Buffer . byteLength ( arg ) + "\r\n" + arg + "\r\n" ) ;
}
}
}
if ( exports . debug_mode ) {
console . log ( "send_command buffered_writes: " + buffered_writes , " should_buffer: " + this . should_buffer ) ;
}
if ( buffered_writes || this . command_queue . getLength ( ) >= this . command_queue_high_water ) {
this . should_buffer = true ;
}
return ! this . should_buffer ;
} ;
RedisClient . prototype . end = function ( ) {
@ -587,7 +609,6 @@ RedisClient.prototype.end = function () {
return this . stream . end ( ) ;
} ;
function Multi ( client , args ) {
this . client = client ;
this . queue = [ [ "MULTI" ] ] ;
@ -622,17 +643,17 @@ commands = set_union(["get", "set", "setnx", "setex", "append", "strlen", "del",
"restore" , "migrate" , "dump" , "object" , "client" , "eval" , "evalsha" ] , require ( "./lib/commands" ) ) ;
commands . forEach ( function ( command ) {
RedisClient . prototype [ command ] = function ( ) {
var args = to_array ( arguments ) ;
args . unshift ( command ) ; // put command at the beginning
this . send_command . apply ( this , args ) ;
RedisClient . prototype [ command ] = function ( args , callback ) {
if ( Array . isArray ( args ) && typeof callback === "function" ) {
return this . send_command ( command , args , callback ) ;
} else {
return this . send_command ( command , to_array ( arguments ) ) ;
}
} ;
RedisClient . prototype [ command . toUpperCase ( ) ] = RedisClient . prototype [ command ] ;
Multi . prototype [ command ] = function ( ) {
var args = to_array ( arguments ) ;
args . unshift ( command ) ;
this . queue . push ( args ) ;
this . queue . push ( [ command ] . concat ( to_array ( arguments ) ) ) ;
return this ;
} ;
Multi . prototype [ command . toUpperCase ( ) ] = Multi . prototype [ command ] ;
@ -648,29 +669,50 @@ RedisClient.prototype.auth = function () {
}
if ( this . connected ) {
args . unshift ( "auth" ) ;
this . send_command . apply ( this , args ) ;
this . send_command ( "auth" , args ) ;
}
} ;
RedisClient . prototype . AUTH = RedisClient . prototype . auth ;
RedisClient . prototype . hmset = function ( ) {
var args = to_array ( arguments ) , tmp_args ;
if ( args . length >= 2 && typeof args [ 0 ] === "string" && typeof args [ 1 ] === "object" ) {
tmp_args = [ "hmset" , args [ 0 ] ] ;
Object . keys ( args [ 1 ] ) . map ( function ( key ) {
RedisClient . prototype . hmget = function ( arg1 , arg2 , arg3 ) {
if ( Array . isArray ( arg2 ) && typeof arg3 === "function" ) {
return this . send_command ( "hmget" , [ arg1 ] . concat ( arg2 ) , arg3 ) ;
} else if ( Array . isArray ( arg1 ) && typeof arg2 === "function" ) {
return this . send_command ( "hmget" , arg1 , arg2 ) ;
} else {
return this . send_command ( "hmget" , to_array ( arguments ) ) ;
}
} ;
RedisClient . prototype . HMGET = RedisClient . prototype . hmget ;
RedisClient . prototype . hmset = function ( args , callback ) {
var tmp_args , tmp_keys , i , il , key ;
if ( Array . isArray ( args ) && typeof callback === "function" ) {
return this . send_command ( "hmset" , args , callback ) ;
}
args = to_array ( arguments ) ;
if ( typeof args [ args . length - 1 ] === "function" ) {
callback = args [ args . length - 1 ] ;
args . length -= 1 ;
} else {
callback = null ;
}
if ( args . length === 2 && typeof args [ 0 ] === "string" && typeof args [ 1 ] === "object" ) {
// User does: client.hmset(key, {key1: val1, key2: val2})
tmp_args = [ args [ 0 ] ] ;
tmp_keys = Object . keys ( args [ 1 ] ) ;
for ( i = 0 , il = tmp_keys . length ; i < il ; i ++ ) {
key = tmp_keys [ i ] ;
tmp_args . push ( key ) ;
tmp_args . push ( args [ 1 ] [ key ] ) ;
} ) ;
if ( args [ 2 ] ) {
tmp_args . push ( args [ 2 ] ) ;
}
args = tmp_args ;
} else {
args . unshift ( "hmset" ) ;
}
this . send_command . apply ( this , args ) ;
return this . send_command ( "hmset" , args , callback ) ;
} ;
RedisClient . prototype . HMSET = RedisClient . prototype . hmset ;
@ -699,7 +741,7 @@ Multi.prototype.exec = function (callback) {
var self = this ;
// drain queue, callback will catch "QUEUED" or error
// Can't use a for loop here, as we need closure around the index.
// TODO - get rid of all of these anonymous functions which are elegant but slow
this . queue . forEach ( function ( args , index ) {
var command = args [ 0 ] , obj ;
if ( typeof args [ args . length - 1 ] === "function" ) {
@ -730,7 +772,8 @@ Multi.prototype.exec = function (callback) {
} ) ;
} , this ) ;
this . client . send_command ( "EXEC" , function ( err , replies ) {
// TODO - make this callback part of Multi.prototype instead of creating it each time
return this . client . send_command ( "EXEC" , [ ] , function ( err , replies ) {
if ( err ) {
if ( callback ) {
callback ( new Error ( err ) ) ;