You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

680 lines
30 KiB

'use strict';
var assert = require('assert');
var config = require('./lib/config');
var helper = require('./helper');
var redis = config.redis;
describe('publish/subscribe', function () {
helper.allTests(function (parser, ip, args) {
describe('using ' + parser + ' and ' + ip, function () {
var pub = null;
var sub = null;
var channel = 'test channel';
var channel2 = 'test channel 2';
var message = 'test message';
beforeEach(function (done) {
var end = helper.callFuncAfter(done, 2);
pub = redis.createClient.apply(null, args);
sub = redis.createClient.apply(null, args);
pub.once('connect', function () {
pub.flushdb(function () {
end();
});
});
sub.once('connect', function () {
end();
});
});
describe('disable resubscribe', function () {
beforeEach(function (done) {
sub.end(false);
sub = redis.createClient({
disable_resubscribing: true
});
sub.once('connect', function () {
done();
});
});
it('does not fire subscribe events after reconnecting', function (done) {
var a = false;
sub.on('subscribe', function (chnl, count) {
if (chnl === channel2) {
if (a) {
return done(new Error('Test failed'));
}
assert.equal(2, count);
sub.stream.destroy();
}
});
sub.on('reconnecting', function () {
a = true;
sub.on('ready', function () {
assert.strictEqual(sub.command_queue.length, 0);
done();
});
});
sub.subscribe(channel, channel2);
});
});
describe('string_numbers and pub sub', function () {
beforeEach(function (done) {
sub.end(false);
sub = redis.createClient({
string_numbers: true
});
sub.once('connect', function () {
done();
});
});
it('does not fire subscribe events after reconnecting', function (done) {
var i = 0;
var end = helper.callFuncAfter(done, 2);
sub.on('subscribe', function (chnl, count) {
assert.strictEqual(typeof count, 'number');
assert.strictEqual(++i, count);
});
sub.on('unsubscribe', function (chnl, count) {
assert.strictEqual(typeof count, 'number');
assert.strictEqual(--i, count);
});
sub.subscribe(channel, channel2);
sub.unsubscribe(function (err, res) { // Do not pass a channel here!
assert.strictEqual(sub.pub_sub_mode, 2);
assert.deepEqual(sub.subscription_set, {});
end();
});
sub.set('foo', 'bar', helper.isString('OK'));
sub.subscribe(channel2, end);
});
});
describe('subscribe', function () {
it('fires a subscribe event for each channel subscribed to even after reconnecting', function (done) {
var a = false;
sub.on('subscribe', function (chnl, count) {
if (chnl === channel2) {
assert.equal(2, count);
if (a) return done();
sub.stream.destroy();
}
});
sub.on('reconnecting', function () {
a = true;
});
sub.subscribe(channel, channel2);
});
it('fires a subscribe event for each channel as buffer subscribed to even after reconnecting', function (done) {
var a = false;
sub.end(true);
sub = redis.createClient({
detect_buffers: true
});
sub.on('subscribe', function (chnl, count) {
if (chnl.inspect() === new Buffer([0xAA, 0xBB, 0x00, 0xF0]).inspect()) {
assert.equal(1, count);
if (a) {
return done();
}
sub.stream.destroy();
}
});
sub.on('reconnecting', function () {
a = true;
});
sub.subscribe(new Buffer([0xAA, 0xBB, 0x00, 0xF0]), channel2);
});
it('receives messages on subscribed channel', function (done) {
var end = helper.callFuncAfter(done, 2);
sub.on('subscribe', function (chnl, count) {
pub.publish(channel, message, function (err, res) {
helper.isNumber(1)(err, res);
end();
});
});
sub.on('message', function (chnl, msg) {
assert.equal(chnl, channel);
assert.equal(msg, message);
end();
});
sub.subscribe(channel);
});
it('receives messages if subscribe is called after unsubscribe', function (done) {
var end = helper.callFuncAfter(done, 2);
sub.once('subscribe', function (chnl, count) {
pub.publish(channel, message, function (err, res) {
helper.isNumber(1)(err, res);
end();
});
});
sub.on('message', function (chnl, msg) {
assert.equal(chnl, channel);
assert.equal(msg, message);
end();
});
sub.subscribe(channel);
sub.unsubscribe(channel);
sub.subscribe(channel);
});
it('handles SUB_UNSUB_MSG_SUB', function (done) {
sub.subscribe('chan8');
sub.subscribe('chan9');
sub.unsubscribe('chan9');
pub.publish('chan8', 'something');
sub.subscribe('chan9', done);
});
it('handles SUB_UNSUB_MSG_SUB 2', function (done) {
sub.psubscribe('abc*', helper.isString('abc*'));
sub.subscribe('xyz');
sub.unsubscribe('xyz');
pub.publish('abcd', 'something');
sub.subscribe('xyz', done);
});
it('emits end event if quit is called from within subscribe', function (done) {
sub.on('end', done);
sub.on('subscribe', function (chnl, count) {
sub.quit();
});
sub.subscribe(channel);
});
it('subscribe; close; resubscribe with prototype inherited property names', function (done) {
var count = 0;
var channels = ['channel 1', 'channel 2'];
var msg = ['hi from channel 1', 'hi from channel 2'];
sub.on('message', function (channel, message) {
var n = Math.max(count - 1, 0);
assert.strictEqual(channel, channels[n]);
assert.strictEqual(message, msg[n]);
if (count === 2) return done();
sub.stream.end();
});
sub.select(3);
sub.subscribe(channels);
sub.on('ready', function (err, results) {
pub.publish(channels[count], msg[count]);
count++;
});
pub.publish(channels[count], msg[count]);
});
});
describe('multiple subscribe / unsubscribe commands', function () {
it('reconnects properly with pub sub and select command', function (done) {
var end = helper.callFuncAfter(done, 2);
sub.select(3);
sub.set('foo', 'bar');
sub.set('failure', helper.isError()); // Triggering a warning while subscribing should work
sub.mget('foo', 'bar', 'baz', 'hello', 'world', function (err, res) {
assert.deepEqual(res, ['bar', null, null, null, null]);
});
sub.subscribe('somechannel', 'another channel', function (err, res) {
end();
sub.stream.destroy();
});
assert(sub.ready);
sub.on('ready', function () {
sub.unsubscribe();
sub.del('foo');
sub.info(end);
});
});
it('should not go into pubsub mode with unsubscribe commands', function (done) {
sub.on('unsubscribe', function (msg) {
// The unsubscribe should not be triggered, as there was no corresponding channel
throw new Error('Test failed');
});
sub.set('foo', 'bar');
sub.unsubscribe(function (err, res) {
assert.strictEqual(res, null);
});
sub.del('foo', done);
});
it('handles multiple channels with the same channel name properly, even with buffers', function (done) {
var channels = ['a', 'b', 'a', new Buffer('a'), 'c', 'b'];
var subscribed_channels = [1, 2, 2, 2, 3, 3];
var i = 0;
sub.subscribe(channels);
sub.on('subscribe', function (channel, count) {
if (Buffer.isBuffer(channel)) {
assert.strictEqual(channel.inspect(), new Buffer(channels[i]).inspect());
} else {
assert.strictEqual(channel, channels[i].toString());
}
assert.strictEqual(count, subscribed_channels[i]);
i++;
});
sub.unsubscribe('a', 'c', 'b');
sub.get('foo', done);
});
it('should only resubscribe to channels not unsubscribed earlier on a reconnect', function (done) {
sub.subscribe('/foo', '/bar');
sub.batch().unsubscribe(['/bar'], function () {
pub.pubsub('channels', function (err, res) {
assert.deepEqual(res, ['/foo']);
sub.stream.destroy();
sub.once('ready', function () {
pub.pubsub('channels', function (err, res) {
assert.deepEqual(res, ['/foo']);
sub.unsubscribe('/foo', done);
});
});
});
}).exec();
});
it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed. Withouth callbacks', function (done) {
function subscribe (channels) {
sub.unsubscribe(helper.isNull);
sub.subscribe(channels, helper.isNull);
}
var all = false;
var subscribeMsg = ['1', '3', '2', '5', 'test', 'bla'];
sub.on('subscribe', function (msg, count) {
subscribeMsg.splice(subscribeMsg.indexOf(msg), 1);
if (subscribeMsg.length === 0 && all) {
assert.strictEqual(count, 3);
done();
}
});
var unsubscribeMsg = ['1', '3', '2'];
sub.on('unsubscribe', function (msg, count) {
unsubscribeMsg.splice(unsubscribeMsg.indexOf(msg), 1);
if (unsubscribeMsg.length === 0) {
assert.strictEqual(count, 0);
all = true;
}
});
subscribe(['1', '3']);
subscribe(['2']);
subscribe(['5', 'test', 'bla']);
});
it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed. Without callbacks', function (done) {
function subscribe (channels) {
sub.unsubscribe();
sub.subscribe(channels);
}
var all = false;
var subscribeMsg = ['1', '3', '2', '5', 'test', 'bla'];
sub.on('subscribe', function (msg, count) {
subscribeMsg.splice(subscribeMsg.indexOf(msg), 1);
if (subscribeMsg.length === 0 && all) {
assert.strictEqual(count, 3);
done();
}
});
var unsubscribeMsg = ['1', '3', '2'];
sub.on('unsubscribe', function (msg, count) {
unsubscribeMsg.splice(unsubscribeMsg.indexOf(msg), 1);
if (unsubscribeMsg.length === 0) {
assert.strictEqual(count, 0);
all = true;
}
});
subscribe(['1', '3']);
subscribe(['2']);
subscribe(['5', 'test', 'bla']);
});
it('unsubscribes, subscribes, unsubscribes... single and multiple entries mixed. Without callback and concret channels', function (done) {
function subscribe (channels) {
sub.unsubscribe(channels);
sub.unsubscribe(channels);
sub.subscribe(channels);
}
var all = false;
var subscribeMsg = ['1', '3', '2', '5', 'test', 'bla'];
sub.on('subscribe', function (msg, count) {
subscribeMsg.splice(subscribeMsg.indexOf(msg), 1);
if (subscribeMsg.length === 0 && all) {
assert.strictEqual(count, 6);
done();
}
});
var unsubscribeMsg = ['1', '3', '2', '5', 'test', 'bla'];
sub.on('unsubscribe', function (msg, count) {
var pos = unsubscribeMsg.indexOf(msg);
if (pos !== -1)
unsubscribeMsg.splice(pos, 1);
if (unsubscribeMsg.length === 0) {
all = true;
}
});
subscribe(['1', '3']);
subscribe(['2']);
subscribe(['5', 'test', 'bla']);
});
it('unsubscribes, subscribes, unsubscribes... with pattern matching', function (done) {
function subscribe (channels, callback) {
sub.punsubscribe('prefix:*', helper.isNull);
sub.psubscribe(channels, function (err, res) {
helper.isNull(err);
if (callback) callback(err, res);
});
}
var all = false;
var end = helper.callFuncAfter(done, 8);
var subscribeMsg = ['prefix:*', 'prefix:3', 'prefix:2', '5', 'test:a', 'bla'];
sub.on('psubscribe', function (msg, count) {
subscribeMsg.splice(subscribeMsg.indexOf(msg), 1);
if (subscribeMsg.length === 0) {
assert.strictEqual(count, 5);
all = true;
}
});
var rest = 1;
var unsubscribeMsg = ['prefix:*', 'prefix:*', 'prefix:*', '*'];
sub.on('punsubscribe', function (msg, count) {
unsubscribeMsg.splice(unsubscribeMsg.indexOf(msg), 1);
if (all) {
assert.strictEqual(unsubscribeMsg.length, 0);
assert.strictEqual(count, rest--); // Print the remaining channels
end();
} else {
assert.strictEqual(msg, 'prefix:*');
assert.strictEqual(count, rest++ - 1);
}
});
sub.on('pmessage', function (pattern, channel, msg) {
assert.strictEqual(msg, 'test');
assert.strictEqual(pattern, 'prefix:*');
assert.strictEqual(channel, 'prefix:1');
end();
});
subscribe(['prefix:*', 'prefix:3'], function () {
pub.publish('prefix:1', new Buffer('test'), function () {
subscribe(['prefix:2']);
subscribe(['5', 'test:a', 'bla'], function () {
assert(all);
});
sub.punsubscribe(function (err, res) {
assert(!err);
assert.strictEqual(res, 'bla');
assert(all);
all = false; // Make sure the callback is actually after the emit
end();
});
sub.pubsub('channels', function (err, res) {
assert.strictEqual(res.length, 0);
end();
});
});
});
});
});
describe('unsubscribe', function () {
it('fires an unsubscribe event', function (done) {
sub.on('subscribe', function (chnl, count) {
sub.unsubscribe(channel);
});
sub.subscribe(channel);
sub.on('unsubscribe', function (chnl, count) {
assert.equal(chnl, channel);
assert.strictEqual(count, 0);
return done();
});
});
it('puts client back into write mode', function (done) {
sub.on('subscribe', function (chnl, count) {
sub.unsubscribe(channel);
});
sub.subscribe(channel);
sub.on('unsubscribe', function (chnl, count) {
pub.incr('foo', helper.isNumber(1, done));
});
});
it('does not complain when unsubscribe is called and there are no subscriptions', function (done) {
sub.unsubscribe(function (err, res) {
assert.strictEqual(err, null);
assert.strictEqual(res, null);
done();
});
});
it('executes callback when unsubscribe is called and there are no subscriptions', function (done) {
pub.unsubscribe(function (err, results) {
assert.strictEqual(null, results);
done(err);
});
});
});
describe('psubscribe', function () {
it('allows all channels to be subscribed to using a * pattern', function (done) {
sub.subscribe('/foo');
var sub2 = redis.createClient({
return_buffers: true
});
sub2.on('ready', function () {
sub2.batch().psubscribe('*', helper.isString('*')).exec();
sub2.subscribe('/foo');
sub2.on('pmessage', function (pattern, channel, message) {
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect());
assert.strictEqual(message.inspect(), new Buffer('hello world').inspect());
sub2.quit(done);
});
pub.pubsub('numsub', '/foo', function (err, res) {
assert.deepEqual(res, ['/foo', 2]);
});
// sub2 is counted twice as it subscribed with psubscribe and subscribe
pub.publish('/foo', 'hello world', helper.isNumber(3));
});
});
it('allows to listen to pmessageBuffer and pmessage', function (done) {
var end = helper.callFuncAfter(done, 6);
var data = Array(10000).join('äüs^öéÉÉ`e');
sub.set('foo', data, function () {
sub.get('foo');
sub.stream.once('data', function () {
assert.strictEqual(sub.message_buffers, false);
assert.strictEqual(sub.shouldBuffer, false);
sub.on('pmessageBuffer', function (pattern, channel, message) {
if (parser !== 'javascript' && typeof pattern === 'string') {
pattern = new Buffer(pattern);
channel = new Buffer(channel);
}
assert.strictEqual(pattern.inspect(), new Buffer('*').inspect());
assert.strictEqual(channel.inspect(), new Buffer('/foo').inspect());
sub.quit(end);
});
if (parser === 'javascript') {
assert.notStrictEqual(sub.message_buffers, sub.buffers);
}
});
var batch = sub.batch();
batch.psubscribe('*');
batch.subscribe('/foo');
batch.unsubscribe('/foo');
batch.unsubscribe(helper.isNull());
batch.subscribe(['/foo'], helper.isString('/foo'));
batch.exec(function () {
pub.pubsub('numsub', '/foo', function (err, res) {
// There's one subscriber to this channel
assert.deepEqual(res, ['/foo', 1]);
end();
});
pub.pubsub('channels', function (err, res) {
// There's exactly one channel that is listened too
assert.deepEqual(res, ['/foo']);
end();
});
pub.pubsub('numpat', function (err, res) {
// One pattern is active
assert.strictEqual(res, 1);
end();
});
pub.publish('/foo', 'hello world', helper.isNumber(2));
});
// Either message_buffers or buffers has to be true, but not both at the same time
sub.on('pmessage', function (pattern, channel, message) {
assert.strictEqual(pattern, '*');
assert.strictEqual(channel, '/foo');
assert.strictEqual(message, 'hello world');
end();
});
sub.on('message', function (channel, message) {
assert.strictEqual(channel, '/foo');
assert.strictEqual(message, 'hello world');
end();
});
});
});
});
describe('punsubscribe', function () {
it('does not complain when punsubscribe is called and there are no subscriptions', function () {
sub.punsubscribe();
});
it('executes callback when punsubscribe is called and there are no subscriptions', function (done) {
pub.batch().punsubscribe(helper.isNull()).exec(done);
});
});
describe('fail for other commands while in pub sub mode', function () {
it('return error if only pub sub commands are allowed', function (done) {
sub.subscribe('channel');
// Ping is allowed even if not listed as such!
sub.ping(function (err, res) {
assert.strictEqual(err, null);
assert.strictEqual(res[0], 'pong');
});
// Get is forbidden
sub.get('foo', function (err, res) {
assert(/^ERR only \(P\)SUBSCRIBE \/ \(P\)UNSUBSCRIBE/.test(err.message));
assert.strictEqual(err.command, 'GET');
});
// Quit is allowed
sub.quit(done);
});
it('emit error if only pub sub commands are allowed without callback', function (done) {
sub.subscribe('channel');
sub.on('error', function (err) {
assert(/^ERR only \(P\)SUBSCRIBE \/ \(P\)UNSUBSCRIBE/.test(err.message));
assert.strictEqual(err.command, 'GET');
done();
});
sub.get('foo');
});
});
it('should not publish a message multiple times per command', function (done) {
var published = {};
function subscribe (message) {
sub.removeAllListeners('subscribe');
sub.removeAllListeners('message');
sub.removeAllListeners('unsubscribe');
sub.on('subscribe', function () {
pub.publish('/foo', message);
});
sub.on('message', function (channel, message) {
if (published[message]) {
done(new Error('Message published more than once.'));
}
published[message] = true;
});
sub.on('unsubscribe', function (channel, count) {
assert.strictEqual(count, 0);
});
sub.subscribe('/foo');
}
subscribe('hello');
setTimeout(function () {
sub.unsubscribe();
setTimeout(function () {
subscribe('world');
setTimeout(done, 50);
}, 40);
}, 40);
});
it('should not publish a message without any publish command', function (done) {
pub.set('foo', 'message');
pub.set('bar', 'hello');
pub.mget('foo', 'bar');
pub.subscribe('channel', function () {
setTimeout(done, 50);
});
pub.on('message', function (msg) {
done(new Error('This message should not have been published: ' + msg));
});
});
it('arguments variants', function (done) {
sub.batch()
.info(['stats'])
.info()
.client('KILL', ['type', 'pubsub'])
.client('KILL', ['type', 'pubsub'], function () {})
.unsubscribe()
.psubscribe(['pattern:*'])
.punsubscribe('unkown*')
.punsubscribe(['pattern:*'])
.exec(function (err, res) {
sub.client('kill', ['type', 'pubsub']);
sub.psubscribe('*');
sub.punsubscribe('pa*');
sub.punsubscribe(['a', '*'], done);
});
});
afterEach(function () {
// Explicitly ignore still running commands
pub.end(false);
sub.end(false);
});
});
});
});