diff --git a/index.js b/index.js index 43d2b7c..96cf624 100644 --- a/index.js +++ b/index.js @@ -12,6 +12,12 @@ var Parser = require('redis-parser'); var commands = require('redis-commands'); var debug = require('./lib/debug'); var unifyOptions = require('./lib/createClient'); +var SUBSCRIBE_COMMANDS = { + subscribe: true, + unsubscribe: true, + psubscribe: true, + punsubscribe: true +}; // Newer Node.js versions > 0.10 return the EventEmitter right away and using .EventEmitter was deprecated if (typeof EventEmitter !== 'function') { @@ -615,59 +621,52 @@ function normal_reply (self, reply) { } } -function set_subscribe (self, type, command_obj, subscribe, reply) { - var i = 0; +function set_subscribe (self, type, subscribe, channel) { + // Every channel has to be saved / removed one after the other and the type has to be the same too, + // to make sure partly subscribe / unsubscribe works well together if (subscribe) { - // The channels have to be saved one after the other and the type has to be the same too, - // to make sure partly subscribe / unsubscribe works well together - for (; i < command_obj.args.length; i++) { - self.subscription_set[type + '_' + command_obj.args[i]] = command_obj.args[i]; - } + self.subscription_set[type + '_' + channel] = channel; } else { type = type === 'unsubscribe' ? 'subscribe' : 'psubscribe'; // Make types consistent - for (; i < command_obj.args.length; i++) { - delete self.subscription_set[type + '_' + command_obj.args[i]]; - } - if (reply[2] === 0) { // No channels left that this client is subscribed to - var running_command; - i = 0; - // This should be a rare case and therefor handling it this way should be good performance wise for the general case - while (running_command = self.command_queue.get(i++)) { - if ( - running_command.command === 'subscribe' || - running_command.command === 'psubscribe' || - running_command.command === 'unsubscribe' || - running_command.command === 'punsubscribe' - ) { - self.pub_sub_mode = i; - return; - } - } - self.pub_sub_mode = 0; - } + delete self.subscription_set[type + '_' + channel]; } } function subscribe_unsubscribe (self, reply, type, subscribe) { // Subscribe commands take an optional callback and also emit an event, but only the _last_ response is included in the callback + // The pub sub commands return each argument in a separate return value and have to be handled that way var command_obj = self.command_queue.get(0); - var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj && command_obj.buffer_args || reply[1] === null; - var channel = buffer ? reply[1] : reply[1].toString(); - var count = reply[2]; + var buffer = self.options.return_buffers || self.options.detect_buffers && command_obj.buffer_args; + var channel = (buffer || reply[1] === null) ? reply[1] : reply[1].toString(); + var count = +reply[2]; // Return the channel counter as number no matter if `string_numbers` is activated or not debug('Subscribe / unsubscribe command'); // Emit first, then return the callback - if (channel !== null) { // Do not emit something if there was no channel to unsubscribe from + if (channel !== null) { // Do not emit or "unsubscribe" something if there was no channel to unsubscribe from self.emit(type, channel, count); + set_subscribe(self, type, subscribe, channel); } - // The pub sub commands return each argument in a separate return value and have to be handled that way if (command_obj.sub_commands_left <= 1) { - if (count !== 0 && !subscribe && command_obj.args.length === 0) { - command_obj.sub_commands_left = count; - return; + if (count !== 0) { + if (!subscribe && command_obj.args.length === 0) { // Unsubscribe from all channels + command_obj.sub_commands_left = count; + return; + } + } else { + var running_command; + var i = 1; + // This should be a rare case and therefor handling it this way should be good performance wise for the general case + while (running_command = self.command_queue.get(i)) { + if (SUBSCRIBE_COMMANDS[running_command.command]) { + self.command_queue.shift(); + self.pub_sub_mode = i; + return; + } + i++; + } + self.pub_sub_mode = 0; } self.command_queue.shift(); - set_subscribe(self, type, command_obj, subscribe, reply); if (typeof command_obj.callback === 'function') { // TODO: The current return value is pretty useless. // Evaluate to change this in v.3 to return all subscribed / unsubscribed channels in an array including the number of channels subscribed too @@ -819,12 +818,10 @@ RedisClient.prototype.internal_send_command = function (command, args, callback) var command_obj = new Command(command, args_copy, callback); command_obj.buffer_args = buffer_args; - if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') { + if (SUBSCRIBE_COMMANDS[command] && this.pub_sub_mode === 0) { // If pub sub is already activated, keep it that way, otherwise set the number of commands to resolve until pub sub mode activates // Deactivation of the pub sub mode happens in the result handler - if (!this.pub_sub_mode) { - this.pub_sub_mode = this.command_queue.length + 1; - } + this.pub_sub_mode = this.command_queue.length + 1; } this.command_queue.push(command_obj); diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 63d61e0..1651746 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -65,15 +65,44 @@ describe('publish/subscribe', function () { }); }); + describe('string_numbers and pub sub', function () { + beforeEach(function (done) { + sub.end(false); + sub = redis.createClient({ + string_numbers: true + }); + sub.once('connect', function () { + done(); + }); + }); + + it('does not fire subscribe events after reconnecting', function (done) { + var i = 0; + sub.on('subscribe', function (chnl, count) { + assert.strictEqual(typeof count, 'number'); + assert.strictEqual(++i, count); + }); + sub.on('unsubscribe', function (chnl, count) { + assert.strictEqual(typeof count, 'number'); + assert.strictEqual(--i, count); + }); + sub.subscribe(channel, channel2); + sub.unsubscribe(function (err, res) { // Do not pass a channel here! + assert.strictEqual(sub.pub_sub_mode, 2); + assert.deepEqual(sub.subscription_set, {}); + }); + sub.set('foo', 'bar', helper.isString('OK')); + sub.subscribe(channel2, done); + }); + }); + describe('subscribe', function () { it('fires a subscribe event for each channel subscribed to even after reconnecting', function (done) { var a = false; sub.on('subscribe', function (chnl, count) { if (chnl === channel2) { assert.equal(2, count); - if (a) { - return done(); - } + if (a) return done(); sub.stream.destroy(); } });