From 3038c9043de88a1ca85ccb028d7f798782ef0ad5 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Thu, 14 Apr 2016 01:14:41 +0200 Subject: [PATCH] Make sure all individual handled command work in multi context the same Fix quit possibly resulting in reconnections --- index.js | 31 +++++- lib/individualCommands.js | 228 ++++++++++++++++++++++++++------------ test/auth.spec.js | 42 +++++++ test/batch.spec.js | 16 ++- test/connection.spec.js | 21 ++++ test/multi.spec.js | 34 ++++++ test/node_redis.spec.js | 61 ++++++---- 7 files changed, 334 insertions(+), 99 deletions(-) diff --git a/index.js b/index.js index 8b5d917..f9b9339 100644 --- a/index.js +++ b/index.js @@ -735,12 +735,35 @@ function return_pub_sub (self, reply) { } RedisClient.prototype.return_reply = function (reply) { - if (this.pub_sub_mode === 1 && reply instanceof Array && reply.length !== 0 && reply[0]) { + // If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands + // As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve + // the average performance of all other commands in case of no monitor mode + if (this.monitoring) { + var replyStr; + if (this.buffers && Buffer.isBuffer(reply)) { + replyStr = reply.toString(); + } else { + replyStr = reply; + } + // While reconnecting the redis server does not recognize the client as in monitor mode anymore + // Therefore the monitor command has to finish before it catches further commands + if (typeof replyStr === 'string' && utils.monitor_regex.test(replyStr)) { + var timestamp = replyStr.slice(0, replyStr.indexOf(' ')); + var args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map(function (elem) { + return elem.replace(/\\"/g, '"'); + }); + this.emit('monitor', timestamp, args, replyStr); + return; + } + } + if (this.pub_sub_mode === 0) { + normal_reply(this, reply); + } else if (this.pub_sub_mode !== 1) { + this.pub_sub_mode--; + normal_reply(this, reply); + } else if (reply instanceof Array && reply.length > 2 && reply[0]) { return_pub_sub(this, reply); } else { - if (this.pub_sub_mode !== 0 && this.pub_sub_mode !== 1) { - this.pub_sub_mode--; - } normal_reply(this, reply); } }; diff --git a/lib/individualCommands.js b/lib/individualCommands.js index 4ebd8ae..bcbbecd 100644 --- a/lib/individualCommands.js +++ b/lib/individualCommands.js @@ -31,88 +31,101 @@ RedisClient.prototype.batch = RedisClient.prototype.BATCH = function batch (args return new Multi(this, args); }; -// Store db in this.select_db to restore it on reconnect -RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) { - var self = this; - return this.internal_send_command('select', [db], function (err, res) { +function select_callback (self, db, callback) { + return function (err, res) { if (err === null) { + // Store db in this.select_db to restore it on reconnect self.selected_db = db; } utils.callback_or_emit(self, callback, err, res); - }); + }; +} + +RedisClient.prototype.select = RedisClient.prototype.SELECT = function select (db, callback) { + return this.internal_send_command('select', [db], select_callback(this, db, callback)); }; -RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function (callback) { - // Use a individual command, as this is a special case that does not has to be checked for any other command - var self = this; - return this.internal_send_command('monitor', [], function (err, res) { +Multi.prototype.select = Multi.prototype.SELECT = function select (db, callback) { + this.queue.push(['select', [db], select_callback(this._client, db, callback)]); + return this; +}; + +function monitor_callback (self, callback) { + return function (err, res) { if (err === null) { - self.reply_parser.returnReply = function (reply) { - // If in monitor mode, all normal commands are still working and we only want to emit the streamlined commands - // As this is not the average use case and monitor is expensive anyway, let's change the code here, to improve - // the average performance of all other commands in case of no monitor mode - if (self.monitoring) { - var replyStr; - if (self.buffers && Buffer.isBuffer(reply)) { - replyStr = reply.toString(); - } else { - replyStr = reply; - } - // While reconnecting the redis server does not recognize the client as in monitor mode anymore - // Therefor the monitor command has to finish before it catches further commands - if (typeof replyStr === 'string' && utils.monitor_regex.test(replyStr)) { - var timestamp = replyStr.slice(0, replyStr.indexOf(' ')); - var args = replyStr.slice(replyStr.indexOf('"') + 1, -1).split('" "').map(function (elem) { - return elem.replace(/\\"/g, '"'); - }); - self.emit('monitor', timestamp, args, replyStr); - return; - } - } - self.return_reply(reply); - }; self.monitoring = true; } utils.callback_or_emit(self, callback, err, res); - }); + }; +} + +RedisClient.prototype.monitor = RedisClient.prototype.MONITOR = function monitor (callback) { + // Use a individual command, as this is a special case that does not has to be checked for any other command + return this.internal_send_command('monitor', [], monitor_callback(this, callback)); }; -RedisClient.prototype.quit = RedisClient.prototype.QUIT = function (callback) { - var self = this; - var callback_hook = function (err, res) { - // TODO: Improve this by handling everything with coherend error codes and find out if there's anything missing - if (err && (err.code === 'NR_OFFLINE' || - err.message === 'Redis connection gone from close event.' || - err.message === 'The command can\'t be processed. The connection has already been closed.' - )) { +// Only works with batch, not in a transaction +Multi.prototype.monitor = Multi.prototype.MONITOR = function monitor (callback) { + // Use a individual command, as this is a special case that does not has to be checked for any other command + if (this.exec !== this.exec_transaction) { + this.queue.push(['monitor', [], monitor_callback(this._client, callback)]); + return this; + } + var err = new Error( + 'You used the monitor command in combination with a transaction. Due to faulty return values of ' + + 'Redis in this context, the monitor command is now executed without transaction instead and ignored ' + + 'in the multi statement.' + ); + err.command = 'MONITOR'; + utils.reply_in_order(this._client, callback, err); + this._client.monitor('monitor', callback); + return this; +}; + +function quit_callback (self, callback) { + return function (err, res) { + if (err && err.code === 'NR_OFFLINE') { // Pretent the quit command worked properly in this case. // Either the quit landed in the offline queue and was flushed at the reconnect // or the offline queue is deactivated and the command was rejected right away // or the stream is not writable - // or while sending the quit, the connection dropped + // or while sending the quit, the connection ended / closed err = null; res = 'OK'; } utils.callback_or_emit(self, callback, err, res); + if (self.stream.writable) { + // If the socket is still alive, kill it. This could happen if quit got a NR_OFFLINE error code + self.stream.destroy(); + } }; - var backpressure_indicator = this.internal_send_command('quit', [], callback_hook); +} + +RedisClient.prototype.QUIT = RedisClient.prototype.quit = function (callback) { + // TODO: Consider this for v.3 + // Allow the quit command to be fired as soon as possible to prevent it landing in the offline queue. + // this.ready = this.offline_queue.length === 0; + var backpressure_indicator = this.internal_send_command('quit', [], quit_callback(this, callback)); // Calling quit should always end the connection, no matter if there's a connection or not this.closing = true; + this.ready = false; return backpressure_indicator; }; -// Store info in this.server_info after each call -RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) { - var self = this; - var ready = this.ready; - var args = []; - if (typeof section === 'function') { - callback = section; - } else if (section !== undefined) { - args = Array.isArray(section) ? section : [section]; - } - this.ready = ready || this.offline_queue.length === 0; // keep the execution order intakt - var tmp = this.internal_send_command('info', args, function (err, res) { +// Only works with batch, not in a transaction +Multi.prototype.QUIT = Multi.prototype.quit = function (callback) { + var self = this._client; + var call_on_write = function () { + // If called in a multi context, we expect redis is available + self.closing = true; + self.ready = false; + }; + this.queue.push(['quit', [], quit_callback(self, callback), call_on_write]); + return this; +}; + +function info_callback (self, callback) { + return function (err, res) { if (res) { var obj = {}; var lines = res.toString().split('\r\n'); @@ -146,20 +159,33 @@ RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section self.server_info = {}; } utils.callback_or_emit(self, callback, err, res); - }); - this.ready = ready; - return tmp; + }; +} + +// Store info in this.server_info after each call +RedisClient.prototype.info = RedisClient.prototype.INFO = function info (section, callback) { + var args = []; + if (typeof section === 'function') { + callback = section; + } else if (section !== undefined) { + args = Array.isArray(section) ? section : [section]; + } + return this.internal_send_command('info', args, info_callback(this, callback)); }; -RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, callback) { - var self = this; - var ready = this.ready; - debug('Sending auth to ' + self.address + ' id ' + self.connection_id); +Multi.prototype.info = Multi.prototype.INFO = function info (section, callback) { + var args = []; + if (typeof section === 'function') { + callback = section; + } else if (section !== undefined) { + args = Array.isArray(section) ? section : [section]; + } + this.queue.push(['info', args, info_callback(this._client, callback)]); + return this; +}; - // Stash auth for connect and reconnect. - this.auth_pass = pass; - this.ready = ready || this.offline_queue.length === 0; // keep the execution order intakt - var tmp = this.internal_send_command('auth', [pass], function (err, res) { +function auth_callback (self, pass, callback) { + return function (err, res) { if (err) { if (no_password_is_set.test(err.message)) { self.warn('Warning: Redis server does not require a password, but a password was supplied.'); @@ -175,11 +201,31 @@ RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, c } } utils.callback_or_emit(self, callback, err, res); - }); + }; +} + +RedisClient.prototype.auth = RedisClient.prototype.AUTH = function auth (pass, callback) { + debug('Sending auth to ' + this.address + ' id ' + this.connection_id); + + // Stash auth for connect and reconnect. + this.auth_pass = pass; + var ready = this.ready; + this.ready = ready || this.offline_queue.length === 0; + var tmp = this.internal_send_command('auth', [pass], auth_callback(this, pass, callback)); this.ready = ready; return tmp; }; +// Only works with batch, not in a transaction +Multi.prototype.auth = Multi.prototype.AUTH = function auth (pass, callback) { + debug('Sending auth to ' + this.address + ' id ' + this.connection_id); + + // Stash auth for connect and reconnect. + this.auth_pass = pass; + this.queue.push(['auth', [pass], auth_callback(this._client, callback)]); + return this; +}; + RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () { var arr, len = arguments.length, @@ -198,7 +244,7 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () { for (; i < len; i += 1) { arr[i + 1] = arguments[1][i]; } - } else if (typeof arguments[1] === 'object' && (arguments.length === 2 || arguments.length === 3 && typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined')) { + } else if (typeof arguments[1] === 'object' && (arguments.length === 2 || arguments.length === 3 && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined'))) { arr = [arguments[0]]; for (var field in arguments[1]) { // jshint ignore: line arr.push(field, arguments[1][field]); @@ -219,6 +265,46 @@ RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function hmset () { return this.internal_send_command('hmset', arr, callback); }; +Multi.prototype.hmset = Multi.prototype.HMSET = function hmset () { + var arr, + len = arguments.length, + callback, + i = 0; + if (Array.isArray(arguments[0])) { + arr = arguments[0]; + callback = arguments[1]; + } else if (Array.isArray(arguments[1])) { + if (len === 3) { + callback = arguments[2]; + } + len = arguments[1].length; + arr = new Array(len + 1); + arr[0] = arguments[0]; + for (; i < len; i += 1) { + arr[i + 1] = arguments[1][i]; + } + } else if (typeof arguments[1] === 'object' && (arguments.length === 2 || arguments.length === 3 && (typeof arguments[2] === 'function' || typeof arguments[2] === 'undefined'))) { + arr = [arguments[0]]; + for (var field in arguments[1]) { // jshint ignore: line + arr.push(field, arguments[1][field]); + } + callback = arguments[2]; + } else { + len = arguments.length; + // The later should not be the average use case + if (len !== 0 && (typeof arguments[len - 1] === 'function' || typeof arguments[len - 1] === 'undefined')) { + len--; + callback = arguments[len]; + } + arr = new Array(len); + for (; i < len; i += 1) { + arr[i] = arguments[i]; + } + } + this.queue.push(['hmset', arr, callback]); + return this; +}; + RedisClient.prototype.subscribe = RedisClient.prototype.SUBSCRIBE = function subscribe () { var arr, len = arguments.length, @@ -378,7 +464,7 @@ Multi.prototype.psubscribe = Multi.prototype.PSUBSCRIBE = function psubscribe () arr[i] = arguments[i]; } } - var self = this; + var self = this._client; var call_on_write = function () { self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; }; @@ -434,7 +520,7 @@ Multi.prototype.punsubscribe = Multi.prototype.PUNSUBSCRIBE = function punsubscr arr[i] = arguments[i]; } } - var self = this; + var self = this._client; var call_on_write = function () { // Pub sub has to be activated even if not in pub sub mode, as the return value is manipulated in the callback self.pub_sub_mode = self.pub_sub_mode || self.command_queue.length + 1; diff --git a/test/auth.spec.js b/test/auth.spec.js index 4c262dc..ac6548e 100644 --- a/test/auth.spec.js +++ b/test/auth.spec.js @@ -291,6 +291,48 @@ describe('client authentication', function () { }); }); }); + + it('indivdual commands work properly with batch', function (done) { + // quit => might return an error instead of "OK" in the exec callback... (if not connected) + // auth => might return an error instead of "OK" in the exec callback... (if no password is required / still loading on Redis <= 2.4) + // This could be fixed by checking the return value of the callback in the exec callback and + // returning the manipulated [error, result] from the callback. + // There should be a better solution though + + var args = config.configureClient(parser, 'localhost', { + noReadyCheck: true + }); + client = redis.createClient.apply(redis.createClient, args); + assert.strictEqual(client.selected_db, undefined); + var end = helper.callFuncAfter(done, 8); + client.on('monitor', function () { + end(); // Should be called for each command after monitor + }); + client.batch() + .auth(auth) + .SELECT(5, function (err, res) { + assert.strictEqual(client.selected_db, 5); + assert.strictEqual(res, 'OK'); + assert.notDeepEqual(client.serverInfo.db5, { avg_ttl: 0, expires: 0, keys: 1 }); + }) + .monitor() + .set('foo', 'bar', helper.isString('OK')) + .INFO(function (err, res) { + assert.strictEqual(res.indexOf('# Server\r\nredis_version:'), 0); + assert.deepEqual(client.serverInfo.db5, { avg_ttl: 0, expires: 0, keys: 1 }); + }) + .get('foo', helper.isString('bar')) + .subscribe(['foo', 'bar']) + .unsubscribe('foo') + .SUBSCRIBE('/foo', helper.isString('/foo')) + .psubscribe('*') + .quit(helper.isString('OK')) // this might be interesting + .exec(function (err, res) { + res[4] = res[4].substr(0, 10); + assert.deepEqual(res, ['OK', 'OK', 'OK', 'OK', '# Server\r\n', 'bar', 'bar', 'foo', '/foo', '*', 'OK']); + end(); + }); + }); }); }); diff --git a/test/batch.spec.js b/test/batch.spec.js index 5f25be6..7b382ee 100644 --- a/test/batch.spec.js +++ b/test/batch.spec.js @@ -12,8 +12,6 @@ describe("The 'batch' method", function () { describe('using ' + parser + ' and ' + ip, function () { describe('when not connected', function () { - // TODO: This is somewhat broken and should be fixed in v.3 - // The commands should return an error instead of returning an empty result var client; beforeEach(function (done) { @@ -24,7 +22,7 @@ describe("The 'batch' method", function () { client.on('end', done); }); - it('returns an empty array', function (done) { + it('returns an empty array for missing commands', function (done) { var batch = client.batch(); batch.exec(function (err, res) { assert.strictEqual(err, null); @@ -33,7 +31,17 @@ describe("The 'batch' method", function () { }); }); - it('returns an empty array if promisified', function () { + it('returns an error for batch with commands', function (done) { + var batch = client.batch(); + batch.set('foo', 'bar'); + batch.exec(function (err, res) { + assert.strictEqual(err, null); + assert.strictEqual(res[0].code, 'NR_OFFLINE'); + done(); + }); + }); + + it('returns an empty array for missing commands if promisified', function () { return client.batch().execAsync().then(function (res) { assert.strictEqual(res.length, 0); }); diff --git a/test/connection.spec.js b/test/connection.spec.js index ad91730..531dce5 100644 --- a/test/connection.spec.js +++ b/test/connection.spec.js @@ -93,6 +93,27 @@ describe('connection tests', function () { assert.strictEqual(bool, false); }); + it('calling quit while connected without offline queue should end the connection when all commands have finished', function (done) { + var called = false; + client = redis.createClient({ + enable_offline_queue: false + }); + client.on('ready', function () { + client.set('foo', 'bar', function (err, res) { + assert.strictEqual(res, 'OK'); + called = true; + }); + var bool = client.quit(function (err, res) { + assert.strictEqual(res, 'OK'); + assert.strictEqual(err, null); + assert(called); + done(); + }); + // TODO: In v.3 the quit command would be fired right away, so bool should be true + assert.strictEqual(bool, true); + }); + }); + it('do not quit before connected or a connection issue is detected', function (done) { client = redis.createClient(); client.set('foo', 'bar', helper.isString('OK')); diff --git a/test/multi.spec.js b/test/multi.spec.js index 3227ecd..ec2a81a 100644 --- a/test/multi.spec.js +++ b/test/multi.spec.js @@ -628,6 +628,40 @@ describe("The 'multi' method", function () { client.stream.destroy(); }); + it('indivdual commands work properly with multi', function (done) { + // Neither of the following work properly in a transactions: + // (This is due to Redis not returning the reply as expected / resulting in undefined behavior) + // (Likely there are more commands that do not work with a transaction) + // + // auth => can't be called after a multi command + // monitor => results in faulty return values e.g. multi().monitor().set('foo', 'bar').get('foo') + // returns ['OK, 'OK', 'monitor reply'] instead of ['OK', 'OK', 'bar'] + // quit => ends the connection before the exec + // client reply skip|off => results in weird return values. Not sure what exactly happens + // subscribe => enters subscribe mode and this does not work in combination with exec (the same for psubscribe, unsubscribe...) + // + + assert.strictEqual(client.selected_db, undefined); + var multi = client.multi(); + multi.select(5, function (err, res) { + assert.strictEqual(client.selected_db, 5); + assert.strictEqual(res, 'OK'); + assert.notDeepEqual(client.server_info.db5, { avg_ttl: 0, expires: 0, keys: 1 }); + }); + // multi.client('reply', 'on', helper.isString('OK')); // Redis v.3.2 + multi.set('foo', 'bar', helper.isString('OK')); + multi.info(function (err, res) { + assert.strictEqual(res.indexOf('# Server\r\nredis_version:'), 0); + assert.deepEqual(client.server_info.db5, { avg_ttl: 0, expires: 0, keys: 1 }); + }); + multi.get('foo', helper.isString('bar')); + multi.exec(function (err, res) { + res[3] = res[3].substr(0, 10); + assert.deepEqual(res, ['OK', 'OK', '# Server\r\n', 'bar']); + done(); + }); + }); + }); }); }); diff --git a/test/node_redis.spec.js b/test/node_redis.spec.js index fdfc503..774e57a 100644 --- a/test/node_redis.spec.js +++ b/test/node_redis.spec.js @@ -618,6 +618,26 @@ describe('The node_redis client', function () { }); }); + it('monitors reconnects properly and works with the offline queue in a batch statement', function (done) { + var i = 0; + var multi = client.batch(); + multi.MONITOR(helper.isString('OK')); + multi.mget('hello', 'world'); + multi.exec(function (err, res) { + assert.deepEqual(res, ['OK', [null, null]]); + }); + client.on('monitor', function (time, args, rawOutput) { + assert(utils.monitor_regex.test(rawOutput), rawOutput); + assert.deepEqual(args, ['mget', 'hello', 'world']); + if (i++ === 2) { + // End after two reconnects + return done(); + } + client.stream.destroy(); + client.mget('hello', 'world'); + }); + }); + it('monitor does not activate if the command could not be processed properly', function (done) { client.MONITOR(function (err, res) { assert.strictEqual(err.code, 'UNCERTAIN_STATE'); @@ -748,26 +768,27 @@ describe('The node_redis client', function () { }); }); - it('should fire early', function (done) { - client = redis.createClient.apply(null, args); - var fired = false; - client.info(function (err, res) { - fired = true; - }); - client.set('foo', 'bar', function (err, res) { - assert(fired); - done(); - }); - assert.strictEqual(client.offline_queue.length, 1); - assert.strictEqual(client.command_queue.length, 1); - client.on('connect', function () { - assert.strictEqual(client.offline_queue.length, 1); - assert.strictEqual(client.command_queue.length, 1); - }); - client.on('ready', function () { - assert.strictEqual(client.offline_queue.length, 0); - }); - }); + // TODO: consider allowing loading commands in v.3 + // it('should fire early', function (done) { + // client = redis.createClient.apply(null, args); + // var fired = false; + // client.info(function (err, res) { + // fired = true; + // }); + // client.set('foo', 'bar', function (err, res) { + // assert(fired); + // done(); + // }); + // assert.strictEqual(client.offline_queue.length, 1); + // assert.strictEqual(client.command_queue.length, 1); + // client.on('connect', function () { + // assert.strictEqual(client.offline_queue.length, 1); + // assert.strictEqual(client.command_queue.length, 1); + // }); + // client.on('ready', function () { + // assert.strictEqual(client.offline_queue.length, 0); + // }); + // }); }); describe('socket_nodelay', function () {