@ -1,36 +1,36 @@
'use strict' ;
var net = require ( "net" ) ,
URL = require ( "url" ) ,
util = require ( "util" ) ,
utils = require ( "./lib/utils" ) ,
Queue = require ( "./lib/queue" ) ,
Command = require ( "./lib/command" ) ,
events = require ( "events" ) ,
parsers = [ ] ,
// This static list of commands is updated from time to time.
// ./lib/commands.js can be updated with generate_commands.js
commands = require ( "./lib/commands" ) ,
connection_id = 0 ,
default_port = 6379 ,
default_host = "127.0.0.1" ,
debug = function ( msg ) {
if ( exports . debug_mode ) {
console . error ( msg ) ;
}
} ;
var net = require ( 'net' ) ;
var URL = require ( 'url' ) ;
var util = require ( 'util' ) ;
var utils = require ( './lib/utils' ) ;
var Queue = require ( './lib/queue' ) ;
var Command = require ( './lib/command' ) ;
var events = require ( 'events' ) ;
var parsers = [ ] ;
// This static list of commands is updated from time to time.
// ./lib/commands.js can be updated with generate_commands.js
var commands = require ( './lib/commands' ) ;
var connection_id = 0 ;
var default_port = 6379 ;
var default_host = '127.0.0.1' ;
var debug = function ( msg ) {
if ( exports . debug_mode ) {
console . error ( msg ) ;
}
} ;
exports . debug_mode = /\bredis\b/i . test ( process . env . NODE_DEBUG ) ;
// hiredis might not be installed
try {
parsers . push ( require ( "./lib/parsers/hiredis" ) ) ;
parsers . push ( require ( './lib/parsers/hiredis' ) ) ;
} catch ( err ) {
/* istanbul ignore next: won't be reached with tests */
debug ( "Hiredis parser not installed." ) ;
debug ( 'Hiredis parser not installed.' ) ;
}
parsers . push ( require ( "./lib/parsers/javascript" ) ) ;
parsers . push ( require ( './lib/parsers/javascript' ) ) ;
function RedisClient ( stream , options ) {
options = options || { } ;
@ -76,36 +76,35 @@ function RedisClient(stream, options) {
events . EventEmitter . call ( this ) ;
}
util . inherits ( RedisClient , events . EventEmitter ) ;
exports . RedisClient = RedisClient ;
RedisClient . prototype . install_stream_listeners = function ( ) {
var self = this ;
this . stream . on ( "connect" , function ( ) {
this . stream . on ( 'connect' , function ( ) {
self . on_connect ( ) ;
} ) ;
this . stream . on ( "data" , function ( buffer_from_socket ) {
this . stream . on ( 'data' , function ( buffer_from_socket ) {
// The data.toString() has a significant impact on big chunks and therefor this should only be used if necessary
// debug("Net read " + this.address + " id " + this.connection_id + ": " + data.toString());
// debug('Net read ' + this.address + ' id ' + this.connection_id + ': ' + data.toString());
self . reply_parser . execute ( buffer_from_socket ) ;
} ) ;
this . stream . on ( "error" , function ( err ) {
this . stream . on ( 'error' , function ( err ) {
self . on_error ( err ) ;
} ) ;
this . stream . on ( "close" , function ( ) {
self . connection_gone ( "close" ) ;
this . stream . on ( 'close' , function ( ) {
self . connection_gone ( 'close' ) ;
} ) ;
this . stream . on ( "end" , function ( ) {
self . connection_gone ( "end" ) ;
this . stream . on ( 'end' , function ( ) {
self . connection_gone ( 'end' ) ;
} ) ;
this . stream . on ( "drain" , function ( ) {
this . stream . on ( 'drain' , function ( ) {
self . should_buffer = false ;
self . emit ( "drain" ) ;
self . emit ( 'drain' ) ;
} ) ;
} ;
@ -122,8 +121,8 @@ RedisClient.prototype.unref = function () {
debug ( "Unref'ing the socket connection" ) ;
this . stream . unref ( ) ;
} else {
debug ( "Not connected yet, will unref later" ) ;
this . once ( "connect" , function ( ) {
debug ( 'Not connected yet, will unref later' ) ;
this . once ( 'connect' , function ( ) {
this . unref ( ) ;
} ) ;
}
@ -134,7 +133,7 @@ RedisClient.prototype.flush_and_error = function (error) {
var command_obj ;
while ( command_obj = this . offline_queue . shift ( ) ) {
if ( typeof command_obj . callback === "function" ) {
if ( typeof command_obj . callback === 'function' ) {
error . command = command_obj . command . toUpperCase ( ) ;
command_obj . callback ( error ) ;
}
@ -142,7 +141,7 @@ RedisClient.prototype.flush_and_error = function (error) {
this . offline_queue = new Queue ( ) ;
while ( command_obj = this . command_queue . shift ( ) ) {
if ( typeof command_obj . callback === "function" ) {
if ( typeof command_obj . callback === 'function' ) {
error . command = command_obj . command . toUpperCase ( ) ;
command_obj . callback ( error ) ;
}
@ -155,16 +154,16 @@ RedisClient.prototype.on_error = function (err) {
return ;
}
err . message = "Redis connection to " + this . address + " failed - " + err . message ;
err . message = 'Redis connection to ' + this . address + ' failed - ' + err . message ;
debug ( err . message ) ;
this . connected = false ;
this . ready = false ;
this . emit ( "error" , err ) ;
// "error" events get turned into exceptions if they aren't listened for. If the user handled this error
this . emit ( 'error' , err ) ;
// 'error' events get turned into exceptions if they aren't listened for. If the user handled this error
// then we should try to reconnect.
this . connection_gone ( "error" ) ;
this . connection_gone ( 'error' ) ;
} ;
var noPasswordIsSet = /no password is set/ ;
@ -173,35 +172,35 @@ var loading = /LOADING/;
RedisClient . prototype . do_auth = function ( ) {
var self = this ;
debug ( "Sending auth to " + self . address + " id " + self . connection_id ) ;
debug ( 'Sending auth to ' + self . address + ' id ' + self . connection_id ) ;
self . send_anyway = true ;
self . send_command ( "auth" , [ this . auth_pass ] , function ( err , res ) {
self . send_command ( 'auth' , [ this . auth_pass ] , function ( err , res ) {
if ( err ) {
/* istanbul ignore if: this is almost impossible to test */
if ( loading . test ( err . message ) ) {
// If redis is still loading the db, it will not authenticate and everything else will fail
debug ( "Redis still loading, trying to authenticate later" ) ;
debug ( 'Redis still loading, trying to authenticate later' ) ;
setTimeout ( function ( ) {
self . do_auth ( ) ;
} , 333 ) ;
return ;
} else if ( noPasswordIsSet . test ( err . message ) ) {
debug ( "Warning: Redis server does not require a password, but a password was supplied." ) ;
debug ( 'Warning: Redis server does not require a password, but a password was supplied.' ) ;
err = null ;
res = "OK" ;
res = 'OK' ;
} else if ( self . auth_callback ) {
self . auth_callback ( err ) ;
self . auth_callback = null ;
return ;
} else {
self . emit ( "error" , err ) ;
self . emit ( 'error' , err ) ;
return ;
}
}
res = res . toString ( ) ;
debug ( "Auth succeeded " + self . address + " id " + self . connection_id ) ;
debug ( 'Auth succeeded ' + self . address + ' id ' + self . connection_id ) ;
if ( self . auth_callback ) {
self . auth_callback ( null , res ) ;
@ -209,7 +208,7 @@ RedisClient.prototype.do_auth = function () {
}
// Now we are really connected
self . emit ( "connect" ) ;
self . emit ( 'connect' ) ;
self . initialize_retry_vars ( ) ;
if ( self . options . no_ready_check ) {
@ -222,7 +221,7 @@ RedisClient.prototype.do_auth = function () {
} ;
RedisClient . prototype . on_connect = function ( ) {
debug ( "Stream connected " + this . address + " id " + this . connection_id ) ;
debug ( 'Stream connected ' + this . address + ' id ' + this . connection_id ) ;
this . connected = true ;
this . ready = false ;
@ -240,7 +239,7 @@ RedisClient.prototype.on_connect = function () {
if ( typeof this . auth_pass === 'string' ) {
this . do_auth ( ) ;
} else {
this . emit ( "connect" ) ;
this . emit ( 'connect' ) ;
this . initialize_retry_vars ( ) ;
if ( this . options . no_ready_check ) {
@ -258,7 +257,7 @@ RedisClient.prototype.init_parser = function () {
if ( ! parsers . some ( function ( parser ) {
if ( parser . name === self . options . parser ) {
self . parser_module = parser ;
debug ( "Using parser module: " + self . parser_module . name ) ;
debug ( 'Using parser module: ' + self . parser_module . name ) ;
return true ;
}
} ) ) {
@ -267,7 +266,7 @@ RedisClient.prototype.init_parser = function () {
throw new Error ( "Couldn't find named parser " + self . options . parser + " on this system" ) ;
}
} else {
debug ( "Using default parser module: " + parsers [ 0 ] . name ) ;
debug ( 'Using default parser module: ' + parsers [ 0 ] . name ) ;
this . parser_module = parsers [ 0 ] ;
}
@ -310,39 +309,39 @@ RedisClient.prototype.on_ready = function () {
this . pub_sub_mode = pub_sub_mode ;
}
if ( this . pub_sub_mode === true ) {
// only emit "ready" when all subscriptions were made again
// only emit 'ready' when all subscriptions were made again
var callback_count = 0 ;
var callback = function ( ) {
callback_count -- ;
if ( callback_count === 0 ) {
self . emit ( "ready" ) ;
self . emit ( 'ready' ) ;
}
} ;
if ( this . options . disable_resubscribing ) {
return ;
}
Object . keys ( this . subscription_set ) . forEach ( function ( key ) {
var space_index = key . indexOf ( " " ) ;
var space_index = key . indexOf ( ' ' ) ;
var parts = [ key . slice ( 0 , space_index ) , key . slice ( space_index + 1 ) ] ;
debug ( "Sending pub/sub on_ready " + parts [ 0 ] + ", " + parts [ 1 ] ) ;
debug ( 'Sending pub/sub on_ready ' + parts [ 0 ] + ', ' + parts [ 1 ] ) ;
callback_count ++ ;
self . send_command ( parts [ 0 ] + "scribe" , [ parts [ 1 ] ] , callback ) ;
self . send_command ( parts [ 0 ] + 'scribe' , [ parts [ 1 ] ] , callback ) ;
} ) ;
return ;
}
if ( this . monitoring ) {
this . send_command ( "monitor" , [ ] ) ;
this . send_command ( 'monitor' , [ ] ) ;
} else {
this . send_offline_queue ( ) ;
}
this . emit ( "ready" ) ;
this . emit ( 'ready' ) ;
} ;
RedisClient . prototype . on_info_cmd = function ( err , res ) {
if ( err ) {
err . message = "Ready check failed: " + err . message ;
this . emit ( "error" , err ) ;
err . message = 'Ready check failed: ' + err . message ;
this . emit ( 'error' , err ) ;
return ;
}
@ -356,7 +355,7 @@ RedisClient.prototype.on_info_cmd = function (err, res) {
var self = this ;
var obj = { } ;
var lines = res . toString ( ) . split ( "\r\n" ) ;
var lines = res . toString ( ) . split ( '\r\n' ) ;
var i = 0 ;
var key = 'db' + i ;
var line , retry_time , parts , sub_parts ;
@ -390,15 +389,15 @@ RedisClient.prototype.on_info_cmd = function (err, res) {
// expose info key/vals to users
this . server_info = obj ;
if ( ! obj . loading || obj . loading === "0" ) {
debug ( "Redis server ready." ) ;
if ( ! obj . loading || obj . loading === '0' ) {
debug ( 'Redis server ready.' ) ;
this . on_ready ( ) ;
} else {
retry_time = obj . loading_eta_seconds * 1000 ;
if ( retry_time > 1000 ) {
retry_time = 1000 ;
}
debug ( "Redis server still loading, trying again in " + retry_time ) ;
debug ( 'Redis server still loading, trying again in ' + retry_time ) ;
setTimeout ( function ( ) {
self . ready_check ( ) ;
} , retry_time ) ;
@ -408,9 +407,9 @@ RedisClient.prototype.on_info_cmd = function (err, res) {
RedisClient . prototype . ready_check = function ( ) {
var self = this ;
debug ( "Checking server ready state..." ) ;
debug ( 'Checking server ready state...' ) ;
this . send_anyway = true ; // secret flag to send_command to send something even if not "ready"
this . send_anyway = true ; // secret flag to send_command to send something even if not 'ready'
this . info ( function ( err , res ) {
self . on_info_cmd ( err , res ) ;
} ) ;
@ -421,7 +420,7 @@ RedisClient.prototype.send_offline_queue = function () {
var command_obj , buffered_writes = 0 ;
while ( command_obj = this . offline_queue . shift ( ) ) {
debug ( "Sending offline command: " + command_obj . command ) ;
debug ( 'Sending offline command: ' + command_obj . command ) ;
buffered_writes += ! this . send_command ( command_obj . command , command_obj . args , command_obj . callback ) ;
}
this . offline_queue = new Queue ( ) ;
@ -429,14 +428,14 @@ RedisClient.prototype.send_offline_queue = function () {
if ( ! buffered_writes ) {
this . should_buffer = false ;
this . emit ( "drain" ) ;
this . emit ( 'drain' ) ;
}
} ;
var retry_connection = function ( self ) {
debug ( "Retrying connection..." ) ;
debug ( 'Retrying connection...' ) ;
self . emit ( "reconnecting" , {
self . emit ( 'reconnecting' , {
delay : self . retry_delay ,
attempt : self . attempts
} ) ;
@ -457,7 +456,7 @@ RedisClient.prototype.connection_gone = function (why) {
return ;
}
debug ( "Redis connection is gone from " + why + " event." ) ;
debug ( 'Redis connection is gone from ' + why + ' event.' ) ;
this . connected = false ;
this . ready = false ;
@ -475,14 +474,14 @@ RedisClient.prototype.connection_gone = function (why) {
// since we are collapsing end and close, users don't expect to be called twice
if ( ! this . emitted_end ) {
this . emit ( "end" ) ;
this . emit ( 'end' ) ;
this . emitted_end = true ;
}
// If this is a requested shutdown, then don't retry
if ( this . closing ) {
debug ( "Connection ended from quit command, not retrying." ) ;
this . flush_and_error ( new Error ( "Redis connection gone from " + why + " event." ) ) ;
debug ( 'Connection ended from quit command, not retrying.' ) ;
this . flush_and_error ( new Error ( 'Redis connection gone from ' + why + ' event.' ) ) ;
return ;
}
@ -490,7 +489,7 @@ RedisClient.prototype.connection_gone = function (why) {
var message = this . retry_totaltime >= this . connect_timeout ?
'connection timeout exceeded.' :
'maximum connection attempts exceeded.' ;
var error = new Error ( "Redis connection in broken state: " + message ) ;
var error = new Error ( 'Redis connection in broken state: ' + message ) ;
error . code = 'CONNECTION_BROKEN' ;
this . flush_and_error ( error ) ;
this . emit ( 'error' , error ) ;
@ -527,11 +526,11 @@ RedisClient.prototype.return_error = function (err) {
if ( this . pub_sub_mode === false && queue_len === 0 ) {
this . command_queue = new Queue ( ) ;
this . emit ( "idle" ) ;
this . emit ( 'idle' ) ;
}
if ( this . should_buffer && queue_len <= this . command_queue_low_water ) {
this . emit ( "drain" ) ;
this . emit ( 'drain' ) ;
this . should_buffer = false ;
}
@ -545,7 +544,7 @@ RedisClient.prototype.return_error = function (err) {
RedisClient . prototype . return_reply = function ( reply ) {
var command_obj , len , type , timestamp , argindex , args , queue_len ;
// If the "reply" here is actually a message received asynchronously due to a
// If the 'reply' here is actually a message received asynchronously due to a
// pubsub subscription, don't pop the command queue as we'll only be consuming
// the head command prematurely.
if ( this . pub_sub_mode && Array . isArray ( reply ) && reply [ 0 ] ) {
@ -553,7 +552,7 @@ RedisClient.prototype.return_reply = function (reply) {
}
if ( this . pub_sub_mode && ( type === 'message' || type === 'pmessage' ) ) {
debug ( "Received pubsub message" ) ;
debug ( 'Received pubsub message' ) ;
} else {
command_obj = this . command_queue . shift ( ) ;
}
@ -562,15 +561,15 @@ RedisClient.prototype.return_reply = function (reply) {
if ( this . pub_sub_mode === false && queue_len === 0 ) {
this . command_queue = new Queue ( ) ; // explicitly reclaim storage from old Queue
this . emit ( "idle" ) ;
this . emit ( 'idle' ) ;
}
if ( this . should_buffer && queue_len <= this . command_queue_low_water ) {
this . emit ( "drain" ) ;
this . emit ( 'drain' ) ;
this . should_buffer = false ;
}
if ( command_obj && ! command_obj . sub_command ) {
if ( typeof command_obj . callback === "function" ) {
if ( typeof command_obj . callback === 'function' ) {
if ( 'exec' !== command_obj . command ) {
if ( this . options . detect_buffers && command_obj . buffer_args === false ) {
// If detect_buffers option was specified, then the reply from the parser will be Buffers.
@ -586,7 +585,7 @@ RedisClient.prototype.return_reply = function (reply) {
command_obj . callback ( null , reply ) ;
} else {
debug ( "No callback for reply" ) ;
debug ( 'No callback for reply' ) ;
}
} else if ( this . pub_sub_mode || command_obj && command_obj . sub_command ) {
if ( Array . isArray ( reply ) ) {
@ -595,29 +594,29 @@ RedisClient.prototype.return_reply = function (reply) {
}
type = reply [ 0 ] . toString ( ) ;
if ( type === "message" ) {
this . emit ( "message" , reply [ 1 ] , reply [ 2 ] ) ; // channel, message
} else if ( type === "pmessage" ) {
this . emit ( "pmessage" , reply [ 1 ] , reply [ 2 ] , reply [ 3 ] ) ; // pattern, channel, message
} else if ( type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe" ) {
if ( type === 'message' ) {
this . emit ( 'message' , reply [ 1 ] , reply [ 2 ] ) ; // channel, message
} else if ( type === 'pmessage' ) {
this . emit ( 'pmessage' , reply [ 1 ] , reply [ 2 ] , reply [ 3 ] ) ; // pattern, channel, message
} else if ( type === 'subscribe' || type === 'unsubscribe' || type === 'psubscribe' || type === 'punsubscribe' ) {
if ( reply [ 2 ] === 0 ) {
this . pub_sub_mode = false ;
debug ( "All subscriptions removed, exiting pub/sub mode" ) ;
debug ( 'All subscriptions removed, exiting pub/sub mode' ) ;
} else {
this . pub_sub_mode = true ;
}
// subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
// TODO - document this or fix it so it works in a more obvious way
if ( command_obj && typeof command_obj . callback === "function" ) {
if ( command_obj && typeof command_obj . callback === 'function' ) {
command_obj . callback ( null , reply [ 1 ] ) ;
}
this . emit ( type , reply [ 1 ] , reply [ 2 ] ) ; // channel, count
} else {
this . emit ( "error" , new Error ( "subscriptions are active but got unknown reply type " + type ) ) ;
this . emit ( 'error' , new Error ( 'subscriptions are active but got unknown reply type ' + type ) ) ;
return ;
}
} else if ( ! this . closing ) {
this . emit ( "error" , new Error ( "subscriptions are active but got an invalid reply: " + reply ) ) ;
this . emit ( 'error' , new Error ( 'subscriptions are active but got an invalid reply: ' + reply ) ) ;
return ;
}
}
@ -627,29 +626,29 @@ RedisClient.prototype.return_reply = function (reply) {
reply = reply . toString ( ) ;
}
// If in monitoring mode only two commands are valid ones: AUTH and MONITOR wich reply with OK
len = reply . indexOf ( " " ) ;
len = reply . indexOf ( ' ' ) ;
timestamp = reply . slice ( 0 , len ) ;
argindex = reply . indexOf ( '"' ) ;
args = reply . slice ( argindex + 1 , - 1 ) . split ( '" "' ) . map ( function ( elem ) {
return elem . replace ( /\\"/g , '"' ) ;
} ) ;
this . emit ( "monitor" , timestamp , args ) ;
this . emit ( 'monitor' , timestamp , args ) ;
} else {
var err = new Error ( "node_redis command queue state error. If you can reproduce this, please report it." ) ;
var err = new Error ( 'node_redis command queue state error. If you can reproduce this, please report it.' ) ;
err . command_obj = command_obj ;
this . emit ( "error" , err ) ;
this . emit ( 'error' , err ) ;
}
} ;
RedisClient . prototype . send_command = function ( command , args , callback ) {
var arg , command_obj , i , elem_count , buffer_args , stream = this . stream , command_str = "" , buffered_writes = 0 , err ;
var arg , command_obj , i , elem_count , buffer_args , stream = this . stream , command_str = '' , buffered_writes = 0 , err ;
if ( args === undefined ) {
args = [ ] ;
} else if ( ! callback ) {
if ( typeof args [ args . length - 1 ] === "function" ) {
if ( typeof args [ args . length - 1 ] === 'function' ) {
callback = args . pop ( ) ;
} else if ( typeof args [ args . length - 1 ] === "undefined" ) {
} else if ( typeof args [ args . length - 1 ] === 'undefined' ) {
args . pop ( ) ;
}
}
@ -667,7 +666,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
if ( callback ) {
return callback && callback ( err ) ;
}
this . emit ( "error" , err ) ;
this . emit ( 'error' , err ) ;
return ;
}
}
@ -686,9 +685,9 @@ RedisClient.prototype.send_command = function (command, args, callback) {
if ( this . closing || ! this . enable_offline_queue ) {
command = command . toUpperCase ( ) ;
if ( ! this . closing ) {
err = new Error ( command + ' can\'t be processed. Stream not writeable and enable_offline_queue is deactivated.' ) ;
err = new Error ( command + " can't be processed. Stream not writeable and enable_offline_queue is deactivated." ) ;
} else {
err = new Error ( command + ' can\'t be processed. The connection has already been closed.' ) ;
err = new Error ( command + " can't be processed. The connection has already been closed." ) ;
}
err . command = command ;
if ( callback ) {
@ -697,23 +696,23 @@ RedisClient.prototype.send_command = function (command, args, callback) {
this . emit ( 'error' , err ) ;
}
} else {
debug ( "Queueing " + command + " for next server connection." ) ;
debug ( 'Queueing ' + command + ' for next server connection.' ) ;
this . offline_queue . push ( command_obj ) ;
this . should_buffer = true ;
}
return ;
}
if ( command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe" ) {
if ( command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe' ) {
this . pub_sub_command ( command_obj ) ;
} else if ( command === "monitor" ) {
} else if ( command === 'monitor' ) {
this . monitoring = true ;
} else if ( command === "quit" ) {
} else if ( command === 'quit' ) {
this . closing = true ;
} else if ( this . pub_sub_mode === true ) {
err = new Error ( "Connection in subscriber mode, only subscriber commands may be used" ) ;
err = new Error ( 'Connection in subscriber mode, only subscriber commands may be used' ) ;
err . command = command . toUpperCase ( ) ;
this . emit ( "error" , err ) ;
this . emit ( 'error' , err ) ;
return ;
}
this . command_queue . push ( command_obj ) ;
@ -721,23 +720,23 @@ RedisClient.prototype.send_command = function (command, args, callback) {
elem_count = args . length + 1 ;
// Always use "Multi bulk commands" , but if passed any Buffer args, then do multiple writes, one for each arg.
// Always use 'Multi bulk commands' , but if passed any Buffer args, then do multiple writes, one for each arg.
// This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
command_str = "*" + elem_count + "\r\n$" + command . length + "\r\n" + command + "\r\n" ;
command_str = '*' + elem_count + '\r\n$' + command . length + '\r\n' + command + '\r\n' ;
if ( ! buffer_args ) { // Build up a string and send entire command in one write
for ( i = 0 ; i < args . length ; i += 1 ) {
arg = args [ i ] ;
if ( typeof arg !== "string" ) {
if ( typeof arg !== 'string' ) {
arg = String ( arg ) ;
}
command_str += "$" + Buffer . byteLength ( arg ) + "\r\n" + arg + "\r\n" ;
command_str += '$' + Buffer . byteLength ( arg ) + '\r\n' + arg + '\r\n' ;
}
debug ( "Send " + this . address + " id " + this . connection_id + ": " + command_str ) ;
debug ( 'Send ' + this . address + ' id ' + this . connection_id + ': ' + command_str ) ;
buffered_writes += ! stream . write ( command_str ) ;
} else {
debug ( "Send command (" + command_str + ") has Buffer arguments" ) ;
debug ( 'Send command (' + command_str + ') has Buffer arguments' ) ;
buffered_writes += ! stream . write ( command_str ) ;
for ( i = 0 ; i < args . length ; i += 1 ) {
@ -748,21 +747,21 @@ RedisClient.prototype.send_command = function (command, args, callback) {
if ( Buffer . isBuffer ( arg ) ) {
if ( arg . length === 0 ) {
debug ( "send_command: using empty string for 0 length buffer" ) ;
buffered_writes += ! stream . write ( "$0\r\n\r\n" ) ;
debug ( 'send_command: using empty string for 0 length buffer' ) ;
buffered_writes += ! stream . write ( '$0\r\n\r\n' ) ;
} else {
buffered_writes += ! stream . write ( "$" + arg . length + "\r\n" ) ;
buffered_writes += ! stream . write ( '$' + arg . length + '\r\n' ) ;
buffered_writes += ! stream . write ( arg ) ;
buffered_writes += ! stream . write ( "\r\n" ) ;
debug ( "send_command: buffer send " + arg . length + " bytes" ) ;
buffered_writes += ! stream . write ( '\r\n' ) ;
debug ( 'send_command: buffer send ' + arg . length + ' bytes' ) ;
}
} else {
debug ( "send_command: string send " + Buffer . byteLength ( arg ) + " bytes: " + arg ) ;
buffered_writes += ! stream . write ( "$" + Buffer . byteLength ( arg ) + "\r\n" + arg + "\r\n" ) ;
debug ( 'send_command: string send ' + Buffer . byteLength ( arg ) + ' bytes: ' + arg ) ;
buffered_writes += ! stream . write ( '$' + Buffer . byteLength ( arg ) + '\r\n' + arg + '\r\n' ) ;
}
}
}
debug ( "send_command buffered_writes: " + buffered_writes , " should_buffer: " + this . should_buffer ) ;
debug ( 'send_command buffered_writes: ' + buffered_writes , ' should_buffer: ' + this . should_buffer ) ;
if ( buffered_writes || this . command_queue . length >= this . command_queue_high_water ) {
this . should_buffer = true ;
}
@ -773,30 +772,30 @@ RedisClient.prototype.pub_sub_command = function (command_obj) {
var i , key , command , args ;
if ( this . pub_sub_mode === false ) {
debug ( "Entering pub/sub mode from " + command_obj . command ) ;
debug ( 'Entering pub/sub mode from ' + command_obj . command ) ;
}
this . pub_sub_mode = true ;
command_obj . sub_command = true ;
command = command_obj . command ;
args = command_obj . args ;
if ( command === "subscribe" || command === "psubscribe" ) {
if ( command === "subscribe" ) {
key = "sub" ;
if ( command === 'subscribe' || command === 'psubscribe' ) {
if ( command === 'subscribe' ) {
key = 'sub' ;
} else {
key = "psub" ;
key = 'psub' ;
}
for ( i = 0 ; i < args . length ; i ++ ) {
this . subscription_set [ key + " " + args [ i ] ] = true ;
this . subscription_set [ key + ' ' + args [ i ] ] = true ;
}
} else {
if ( command === "unsubscribe" ) {
key = "sub" ;
if ( command === 'unsubscribe' ) {
key = 'sub' ;
} else {
key = "psub" ;
key = 'psub' ;
}
for ( i = 0 ; i < args . length ; i ++ ) {
delete this . subscription_set [ key + " " + args [ i ] ] ;
delete this . subscription_set [ key + ' ' + args [ i ] ] ;
}
}
} ;
@ -809,7 +808,7 @@ RedisClient.prototype.end = function (flush) {
clearTimeout ( this . retry_timer ) ;
this . retry_timer = null ;
}
this . stream . on ( "error" , function noop ( ) { } ) ;
this . stream . on ( 'error' , function noop ( ) { } ) ;
// Flush queue if wanted
if ( flush ) {
@ -824,7 +823,7 @@ RedisClient.prototype.end = function (flush) {
function Multi ( client , args ) {
this . _ client = client ;
this . queue = [ [ "multi" ] ] ;
this . queue = [ [ 'multi' ] ] ;
var command , tmp_args ;
if ( Array . isArray ( args ) ) {
while ( tmp_args = args . shift ( ) ) {
@ -839,6 +838,10 @@ function Multi(client, args) {
}
}
RedisClient . prototype . multi = RedisClient . prototype . MULTI = function ( args ) {
return new Multi ( this , args ) ;
} ;
commands . forEach ( function ( fullCommand ) {
var command = fullCommand . split ( ' ' ) [ 0 ] ;
@ -923,10 +926,10 @@ RedisClient.prototype.auth = RedisClient.prototype.AUTH = function (pass, callba
return ;
}
this . auth_pass = pass ;
debug ( "Saving auth as " + this . auth_pass ) ;
debug ( 'Saving auth as ' + this . auth_pass ) ;
// Only run the callback once. So do not safe it if already connected
if ( this . connected ) {
this . send_command ( "auth" , [ this . auth_pass ] , callback ) ;
this . send_command ( 'auth' , [ this . auth_pass ] , callback ) ;
} else {
this . auth_callback = callback ;
}
@ -935,18 +938,18 @@ RedisClient.prototype.auth = RedisClient.prototype.AUTH = function (pass, callba
RedisClient . prototype . hmset = RedisClient . prototype . HMSET = function ( key , args , callback ) {
var field , tmp_args ;
if ( Array . isArray ( key ) ) {
return this . send_command ( "hmset" , key , args ) ;
return this . send_command ( 'hmset' , key , args ) ;
}
if ( Array . isArray ( args ) ) {
return this . send_command ( "hmset" , [ key ] . concat ( args ) , callback ) ;
return this . send_command ( 'hmset' , [ key ] . concat ( args ) , callback ) ;
}
if ( typeof args === "object" ) {
if ( typeof args === 'object' ) {
// User does: client.hmset(key, {key1: val1, key2: val2})
// assuming key is a string, i.e. email address
// if key is a number, i.e. timestamp, convert to string
// TODO: This seems random and no other command get's the key converted => either all or none should behave like this
if ( typeof key !== "string" ) {
if ( typeof key !== 'string' ) {
key = key . toString ( ) ;
}
tmp_args = [ key ] ;
@ -954,9 +957,9 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function (key, args,
while ( field = fields . shift ( ) ) {
tmp_args . push ( field , args [ field ] ) ;
}
return this . send_command ( "hmset" , tmp_args , callback ) ;
return this . send_command ( 'hmset' , tmp_args , callback ) ;
}
return this . send_command ( "hmset" , utils . to_array ( arguments ) ) ;
return this . send_command ( 'hmset' , utils . to_array ( arguments ) ) ;
} ;
Multi . prototype . hmset = Multi . prototype . HMSET = function ( key , args , callback ) {
@ -971,11 +974,11 @@ Multi.prototype.hmset = Multi.prototype.HMSET = function (key, args, callback) {
args = args . concat ( [ callback ] ) ;
}
tmp_args = [ 'hmset' , key ] . concat ( args ) ;
} else if ( typeof args === "object" ) {
if ( typeof key !== "string" ) {
} else if ( typeof args === 'object' ) {
if ( typeof key !== 'string' ) {
key = key . toString ( ) ;
}
tmp_args = [ "hmset" , key ] ;
tmp_args = [ 'hmset' , key ] ;
var fields = Object . keys ( args ) ;
while ( field = fields . shift ( ) ) {
tmp_args . push ( field ) ;
@ -986,7 +989,7 @@ Multi.prototype.hmset = Multi.prototype.HMSET = function (key, args, callback) {
}
} else {
tmp_args = utils . to_array ( arguments ) ;
tmp_args . unshift ( "hmset" ) ;
tmp_args . unshift ( 'hmset' ) ;
}
this . queue . push ( tmp_args ) ;
return this ;
@ -1010,12 +1013,12 @@ Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
this . errors = [ ] ;
this . callback = callback ;
this . wants_buffers = new Array ( this . queue . length ) ;
// drain queue, callback will catch "QUEUED" or error
// drain queue, callback will catch 'QUEUED' or error
for ( var index = 0 ; index < this . queue . length ; index ++ ) {
var args = this . queue [ index ] . slice ( 0 ) ;
var command = args . shift ( ) ;
var cb ;
if ( typeof args [ args . length - 1 ] === "function" ) {
if ( typeof args [ args . length - 1 ] === 'function' ) {
cb = args . pop ( ) ;
}
// Keep track of who wants buffer responses:
@ -1066,13 +1069,13 @@ Multi.prototype.execute_callback = function (err, replies) {
if ( this . _ client . options . detect_buffers && this . wants_buffers [ i + 1 ] === false ) {
replies [ i ] = utils . reply_to_strings ( replies [ i ] ) ;
}
if ( args [ 0 ] === "hgetall" ) {
if ( args [ 0 ] === 'hgetall' ) {
// TODO - confusing and error-prone that hgetall is special cased in two places
replies [ i ] = utils . reply_to_object ( replies [ i ] ) ;
}
}
if ( typeof args [ args . length - 1 ] === "function" ) {
if ( typeof args [ args . length - 1 ] === 'function' ) {
if ( replies [ i ] instanceof Error ) {
args [ args . length - 1 ] ( replies [ i ] ) ;
} else {
@ -1087,11 +1090,7 @@ Multi.prototype.execute_callback = function (err, replies) {
}
} ;
RedisClient . prototype . multi = RedisClient . prototype . MULTI = function ( args ) {
return new Multi ( this , args ) ;
} ;
var createClient_unix = function ( path , options ) {
var createClient_unix = function ( path , options ) {
var cnxOptions = {
path : path
} ;
@ -1106,9 +1105,9 @@ var createClient_unix = function(path, options){
var createClient_tcp = function ( port_arg , host_arg , options ) {
var cnxOptions = {
'port' : port_arg || default_port ,
'host' : host_arg || default_host ,
'family' : options && options . family === 'IPv6' ? 6 : 4
port : port_arg || default_port ,
host : host_arg || default_host ,
family : options && options . family === 'IPv6' ? 6 : 4
} ;
var net_client = net . createConnection ( cnxOptions ) ;
var redis_client = new RedisClient ( net_client , options ) ;
@ -1119,7 +1118,7 @@ var createClient_tcp = function (port_arg, host_arg, options) {
return redis_client ;
} ;
exports . createClient = function ( port_arg , host_arg , options ) {
var createClient = function ( port_arg , host_arg , options ) {
if ( typeof port_arg === 'object' || port_arg === undefined ) {
options = port_arg || options || { } ;
return createClient_tcp ( + options . port , options . host , options ) ;
@ -1143,5 +1142,7 @@ exports.createClient = function(port_arg, host_arg, options) {
throw new Error ( 'Unknown type of connection in createClient()' ) ;
} ;
exports . createClient = createClient ;
exports . RedisClient = RedisClient ;
exports . print = utils . print ;
exports . Multi = Multi ;