@ -5,7 +5,8 @@ var tls = require('tls');
var util = require ( 'util' ) ;
var utils = require ( './lib/utils' ) ;
var Queue = require ( 'double-ended-queue' ) ;
var Command = require ( './lib/command' ) ;
var Command = require ( './lib/command' ) . Command ;
var OfflineCommand = require ( './lib/command' ) . OfflineCommand ;
var EventEmitter = require ( 'events' ) ;
var Parser = require ( 'redis-parser' ) ;
var commands = require ( 'redis-commands' ) ;
@ -128,7 +129,7 @@ function RedisClient (options, stream) {
) ;
}
this . initialize_retry_vars ( ) ;
this . pub_sub_mode = false ;
this . pub_sub_mode = 0 ;
this . subscription_set = { } ;
this . monitoring = false ;
this . closing = false ;
@ -222,6 +223,7 @@ RedisClient.prototype.create_stream = function () {
// The buffer_from_socket.toString() has a significant impact on big chunks and therefor this should only be used if necessary
debug ( 'Net read ' + self . address + ' id ' + self . connection_id ) ; // + ': ' + buffer_from_socket.toString());
self . reply_parser . execute ( buffer_from_socket ) ;
self . emit_idle ( ) ;
} ) ;
this . stream . on ( 'error' , function ( err ) {
@ -386,7 +388,7 @@ RedisClient.prototype.on_ready = function () {
}
this . cork = cork ;
// r estore modal commands from previous connection. The order of the commands is important
// R estore modal commands from previous connection. The order of the commands is important
if ( this . selected_db !== undefined ) {
this . send_command ( 'select' , [ this . selected_db ] ) ;
}
@ -394,31 +396,29 @@ RedisClient.prototype.on_ready = function () {
this . monitoring = this . old_state . monitoring ;
this . pub_sub_mode = this . old_state . pub_sub_mode ;
}
if ( this . pub_sub_mode ) {
if ( this . monitoring ) { // Monitor has to be fired before pub sub commands
this . send_command ( 'monitor' , [ ] ) ;
}
var callback_count = Object . keys ( this . subscription_set ) . length ;
if ( ! this . options . disable_resubscribing && callback_count ) {
// only emit 'ready' when all subscriptions were made again
var callback_count = 0 ;
// TODO: Remove the countdown for ready here. This is not coherent with all other modes and should therefor not be handled special
// We know we are ready as soon as all commands were fired
var callback = function ( ) {
callback_count -- ;
if ( callback_count === 0 ) {
self . emit ( 'ready' ) ;
}
} ;
if ( this . options . disable_resubscribing ) {
this . emit ( 'ready' ) ;
return ;
debug ( 'Sending pub/sub on_ready commands' ) ;
for ( var key in this . subscription_set ) { // jshint ignore: line
var command = key . slice ( 0 , key . indexOf ( '_' ) ) ;
var args = self . subscription_set [ key ] ;
self . send_command ( command , [ args ] , callback ) ;
}
Object . keys ( this . subscription_set ) . forEach ( function ( key ) {
var space_index = key . indexOf ( ' ' ) ;
var parts = [ key . slice ( 0 , space_index ) , key . slice ( space_index + 1 ) ] ;
debug ( 'Sending pub/sub on_ready ' + parts [ 0 ] + ', ' + parts [ 1 ] ) ;
callback_count ++ ;
self . send_command ( parts [ 0 ] + 'scribe' , [ parts [ 1 ] ] , callback ) ;
} ) ;
this . send_offline_queue ( ) ;
return ;
}
if ( this . monitoring ) {
this . send_command ( 'monitor' , [ ] ) ;
}
this . send_offline_queue ( ) ;
this . emit ( 'ready' ) ;
} ;
@ -521,7 +521,7 @@ RedisClient.prototype.connection_gone = function (why, error) {
} ;
this . old_state = state ;
this . monitoring = false ;
this . pub_sub_mode = false ;
this . pub_sub_mode = 0 ;
// since we are collapsing end and close, users don't expect to be called twice
if ( ! this . emitted_end ) {
@ -603,7 +603,6 @@ RedisClient.prototype.return_error = function (err) {
err . code = match [ 1 ] ;
}
this . emit_idle ( ) ;
utils . callback_or_emit ( this , command_obj && command_obj . callback , err ) ;
} ;
@ -613,19 +612,13 @@ RedisClient.prototype.drain = function () {
} ;
RedisClient . prototype . emit_idle = function ( ) {
if ( this . command_queue . length === 0 && this . pub_sub_mode === false ) {
if ( this . command_queue . length === 0 && this . pub_sub_mode === 0 ) {
this . emit ( 'idle' ) ;
}
} ;
/* istanbul ignore next: this is a safety check that we should not be able to trigger */
function queue_state_error ( self , command_obj ) {
var err = new Error ( 'node_redis command queue state error. If you can reproduce this, please report it.' ) ;
err . command_obj = command_obj ;
self . emit ( 'error' , err ) ;
}
function normal_reply ( self , reply , command_obj ) {
function normal_reply ( self , reply ) {
var command_obj = self . command_queue . shift ( ) ;
if ( typeof command_obj . callback === 'function' ) {
if ( 'exec' !== command_obj . command ) {
reply = self . handle_reply ( reply , command_obj . command , command_obj . buffer_args ) ;
@ -636,67 +629,107 @@ function normal_reply (self, reply, command_obj) {
}
}
function return_pub_sub ( self , reply , command_obj ) {
if ( reply instanceof Array ) {
if ( ( ! command_obj || command_obj . buffer_args === false ) && ! self . options . return_buffers ) {
reply = utils . reply_to_strings ( reply ) ;
function set_subscribe ( self , type , command_obj , subscribe , reply ) {
var i = 0 ;
if ( subscribe ) {
// The channels have to be saved one after the other and the type has to be the same too,
// to make sure partly subscribe / unsubscribe works well together
for ( ; i < command_obj . args . length ; i ++ ) {
self . subscription_set [ type + '_' + command_obj . args [ i ] ] = command_obj . args [ i ] ;
}
var type = reply [ 0 ] . toString ( ) ;
// TODO: Add buffer emiters (we have to get all pubsub messages as buffers back in that case)
if ( type === 'message' ) {
self . emit ( 'message' , reply [ 1 ] , reply [ 2 ] ) ; // channcel, message
} else if ( type === 'pmessage' ) {
self . emit ( 'pmessage' , reply [ 1 ] , reply [ 2 ] , reply [ 3 ] ) ; // pattern, channcel, message
} else if ( type === 'subscribe' || type === 'unsubscribe' || type === 'psubscribe' || type === 'punsubscribe' ) {
if ( reply [ 2 ] . toString ( ) === '0' ) {
self . pub_sub_mode = false ;
debug ( 'All subscriptions removed, exiting pub/sub mode' ) ;
} else {
self . pub_sub_mode = true ;
}
// Subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
// TODO - document this or fix it so it works in a more obvious way
if ( command_obj && typeof command_obj . callback === 'function' ) {
command_obj . callback ( null , reply [ 1 ] ) ;
} else {
type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe' ; // Make types consistent
for ( ; i < command_obj . args . length ; i ++ ) {
delete self . subscription_set [ type + '_' + command_obj . args [ i ] ] ;
}
if ( reply [ 2 ] === 0 ) { // No channels left that this client is subscribed to
var running_command ;
i = 0 ;
// This should be a rare case and therefor handling it this way should be good performance wise for the general case
while ( running_command = self . command_queue . get ( i ++ ) ) {
if (
running_command . command === 'subscribe' ||
running_command . command === 'psubscribe' ||
running_command . command === 'unsubscribe' ||
running_command . command === 'punsubscribe'
) {
self . pub_sub_mode = i ;
return ;
}
}
self . emit ( type , reply [ 1 ] , reply [ 2 ] ) ; // channcel, count
} else {
self . emit ( 'error' , new Error ( 'subscriptions are active but got unknown reply type ' + type ) ) ;
self . pub_sub_mode = 0 ;
}
} else if ( ! self . closing ) {
self . emit ( 'error' , new Error ( 'subscriptions are active but got an invalid reply: ' + reply ) ) ;
}
}
RedisClient . prototype . return_reply = function ( reply ) {
var command_obj , type , queue_len ;
// If the 'reply' here is actually a message received asynchronously due to a
// pubsub subscription, don't pop the command queue as we'll only be consuming
// the head command prematurely.
if ( this . pub_sub_mode && reply instanceof Array && reply [ 0 ] ) {
type = reply [ 0 ] . toString ( ) ;
function subscribe_unsubscribe ( self , reply , type , subscribe ) {
// Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback
var command_obj = self . command_queue . get ( 0 ) ;
var buffer = self . options . return_buffers || self . options . detect_buffers && command_obj && command_obj . buffer_args || reply [ 1 ] === null ;
var channel = buffer ? reply [ 1 ] : reply [ 1 ] . toString ( ) ;
var count = reply [ 2 ] ;
debug ( 'Subscribe / unsubscribe command' ) ;
// Emit first, then return the callback
if ( channel !== null ) { // Do not emit something if there was no channel to unsubscribe from
self . emit ( type , channel , count ) ;
}
// The pub sub commands return each argument in a separate return value and have to be handled that way
if ( command_obj . sub_commands_left <= 1 ) {
if ( count !== 0 && ! subscribe && command_obj . args . length === 0 ) {
command_obj . sub_commands_left = count ;
return ;
}
self . command_queue . shift ( ) ;
set_subscribe ( self , type , command_obj , subscribe , reply ) ;
if ( typeof command_obj . callback === 'function' ) {
// TODO: The current return value is pretty useless.
// Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too
command_obj . callback ( null , channel ) ;
}
} else {
command_obj . sub_commands_left -- ;
}
}
if ( this . pub_sub_mode && ( type === 'message' || type === 'pmessage' ) ) {
debug ( 'Received pubsub message' ) ;
function return_pub_sub ( self , reply ) {
var type = reply [ 0 ] . toString ( ) ;
if ( type === 'message' ) { // channel, message
// TODO: Implement message_buffer
// if (self.buffers) {
// self.emit('message_buffer', reply[1], reply[2]);
// }
if ( ! self . options . return_buffers ) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
self . emit ( 'message' , reply [ 1 ] . toString ( ) , reply [ 2 ] . toString ( ) ) ;
} else {
self . emit ( 'message' , reply [ 1 ] , reply [ 2 ] ) ;
}
} else if ( type === 'pmessage' ) { // pattern, channel, message
// if (self.buffers) {
// self.emit('pmessage_buffer', reply[1], reply[2], reply[3]);
// }
if ( ! self . options . return_buffers ) { // backwards compatible. Refactor this in v.3 to always return a string on the normal emitter
self . emit ( 'pmessage' , reply [ 1 ] . toString ( ) , reply [ 2 ] . toString ( ) , reply [ 3 ] . toString ( ) ) ;
} else {
self . emit ( 'pmessage' , reply [ 1 ] , reply [ 2 ] , reply [ 3 ] ) ;
}
} else if ( type === 'subscribe' || type === 'psubscribe' ) {
subscribe_unsubscribe ( self , reply , type , true ) ;
} else if ( type === 'unsubscribe' || type === 'punsubscribe' ) {
subscribe_unsubscribe ( self , reply , type , false ) ;
} else {
command_obj = this . command_queue . shift ( ) ;
normal_reply ( self , reply ) ;
}
}
queue_len = this . command_queue . length ;
this . emit_idle ( ) ;
if ( command_obj && ! command_obj . sub_command ) {
normal_reply ( this , reply , command_obj ) ;
} else if ( this . pub_sub_mode || command_obj && command_obj . sub_command ) {
return_pub_sub ( this , reply , command_obj ) ;
}
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
else if ( ! this . monitoring ) {
queue_state_error ( this , command_obj ) ;
RedisClient . prototype . return_reply = function ( reply ) {
if ( this . pub_sub_mode === 1 && reply instanceof Array && reply . length !== 0 && reply [ 0 ] ) {
return_pub_sub ( this , reply ) ;
} else {
if ( this . pub_sub_mode !== 0 && this . pub_sub_mode !== 1 ) {
this . pub_sub_mode -- ;
}
normal_reply ( this , reply ) ;
}
} ;
@ -731,16 +764,15 @@ RedisClient.prototype.send_command = function (command, args, callback) {
var command_str = '' ;
var len = 0 ;
var big_data = false ;
var buffer_args = false ;
if ( process . domain && callback ) {
callback = process . domain . bind ( callback ) ;
}
var command_obj = new Command ( command , args , callback ) ;
if ( this . ready === false || this . stream . writable === false ) {
// Handle offline commands right away
handle_offline_command ( this , command_obj ) ;
handle_offline_command ( this , new OfflineCommand ( command , args , callback ) ) ;
return false ; // Indicate buffering
}
@ -776,7 +808,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
args_copy [ i ] = 'null' ; // Backwards compatible :/
} else if ( Buffer . isBuffer ( args [ i ] ) ) {
args_copy [ i ] = args [ i ] ;
command_obj . buffer_args = true ;
buffer_args = true ;
big_data = true ;
if ( this . pipeline !== 0 ) {
this . pipeline += 2 ;
@ -803,9 +835,15 @@ RedisClient.prototype.send_command = function (command, args, callback) {
}
}
args = null ;
var command_obj = new Command ( command , args_copy , callback ) ;
command_obj . buffer_args = buffer_args ;
if ( command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe' ) {
this . pub_sub_command ( command_obj ) ; // TODO: This has to be moved to the result handler
// If pub sub is already activated, keep it that way, otherwise set the number of commands to resolve until pub sub mode activates
// Deactivation of the pub sub mode happens in the result handler
if ( ! this . pub_sub_mode ) {
this . pub_sub_mode = this . command_queue . length + 1 ;
}
} else if ( command === 'quit' ) {
this . closing = true ;
}
@ -886,38 +924,6 @@ RedisClient.prototype.write = function (data) {
return ;
} ;
RedisClient . prototype . pub_sub_command = function ( command_obj ) {
var i , key , command , args ;
if ( this . pub_sub_mode === false ) {
debug ( 'Entering pub/sub mode from ' + command_obj . command ) ;
}
this . pub_sub_mode = true ;
command_obj . sub_command = true ;
command = command_obj . command ;
args = command_obj . args ;
if ( command === 'subscribe' || command === 'psubscribe' ) {
if ( command === 'subscribe' ) {
key = 'sub' ;
} else {
key = 'psub' ;
}
for ( i = 0 ; i < args . length ; i ++ ) {
this . subscription_set [ key + ' ' + args [ i ] ] = true ;
}
} else {
if ( command === 'unsubscribe' ) {
key = 'sub' ;
} else {
key = 'psub' ;
}
for ( i = 0 ; i < args . length ; i ++ ) {
delete this . subscription_set [ key + ' ' + args [ i ] ] ;
}
}
} ;
RedisClient . prototype . end = function ( flush ) {
// Flush queue if wanted
if ( flush ) {