From cc540dbc3c90e24098be71cf180567b8001f4ce2 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Tue, 1 Mar 2016 16:55:12 +0100 Subject: [PATCH] Implement retry_strategy and add more info to the reconnect event --- index.js | 54 ++++++++++++++++++++++++++++--------- test/connection.spec.js | 59 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 13 deletions(-) diff --git a/index.js b/index.js index 7b3db91..57d0c65 100644 --- a/index.js +++ b/index.js @@ -103,6 +103,7 @@ function RedisClient (options) { this.old_state = null; this.send_anyway = false; this.pipeline = 0; + this.times_connected = 0; this.options = options; // Init parser this.reply_parser = new Parser({ @@ -145,14 +146,15 @@ RedisClient.prototype.create_stream = function () { if (this.options.connect_timeout) { this.stream.setTimeout(this.connect_timeout, function () { self.retry_totaltime = self.connect_timeout; - self.connection_gone('timeout'); + self.connection_gone('timeout', new Error('Redis connection gone from timeout event')); }); } /* istanbul ignore next: travis does not work with stunnel atm. Therefor the tls tests are skipped on travis */ - var connect_event = this.options.tls ? "secureConnect" : "connect"; + var connect_event = this.options.tls ? 'secureConnect' : 'connect'; this.stream.once(connect_event, function () { - this.removeAllListeners("timeout"); + this.removeAllListeners('timeout'); + self.times_connected++; self.on_connect(); }); @@ -166,17 +168,18 @@ RedisClient.prototype.create_stream = function () { self.on_error(err); }); - /* istanbul ignore next: travis does not work with stunnel atm. Therefor the tls tests are skipped on travis */ + /* istanbul ignore next: difficult to test and not important as long as we keep this listener */ this.stream.on('clientError', function (err) { + debug('clientError occured'); self.on_error(err); }); this.stream.once('close', function () { - self.connection_gone('close'); + self.connection_gone('close', new Error('Stream connection closed')); }); this.stream.once('end', function () { - self.connection_gone('end'); + self.connection_gone('end', new Error('Stream connection ended')); }); this.stream.on('drain', function () { @@ -268,10 +271,14 @@ RedisClient.prototype.on_error = function (err) { this.connected = false; this.ready = false; - this.emit('error', err); + + // Only emit the error if the retry_stategy option is not set + if (!this.options.retry_strategy) { + this.emit('error', err); + } // 'error' events get turned into exceptions if they aren't listened for. If the user handled this error // then we should try to reconnect. - this.connection_gone('error'); + this.connection_gone('error', err); }; RedisClient.prototype.on_connect = function () { @@ -417,12 +424,15 @@ RedisClient.prototype.send_offline_queue = function () { this.offline_queue = new Queue(); }; -var retry_connection = function (self) { +var retry_connection = function (self, error) { debug('Retrying connection...'); self.emit('reconnecting', { delay: self.retry_delay, - attempt: self.attempts + attempt: self.attempts, + error: error, + times_connected: self.times_connected, + total_retry_time: self.retry_totaltime }); self.retry_totaltime += self.retry_delay; @@ -432,8 +442,7 @@ var retry_connection = function (self) { self.retry_timer = null; }; -RedisClient.prototype.connection_gone = function (why) { - var error; +RedisClient.prototype.connection_gone = function (why, error) { // If a retry is already in progress, just let that happen if (this.retry_timer) { return; @@ -469,6 +478,25 @@ RedisClient.prototype.connection_gone = function (why) { return; } + if (typeof this.options.retry_strategy === 'function') { + this.retry_delay = this.options.retry_strategy({ + attempt: this.attempts, + error: error, + total_retry_time: this.retry_totaltime, + times_connected: this.times_connected + }); + if (typeof this.retry_delay !== 'number') { + // Pass individual error through + if (this.retry_delay instanceof Error) { + error = this.retry_delay; + } + this.flush_and_error(error); + this.emit('error', error); + this.end(false); + return; + } + } + if (this.max_attempts !== 0 && this.attempts >= this.max_attempts || this.retry_totaltime >= this.connect_timeout) { var message = this.retry_totaltime >= this.connect_timeout ? 'connection timeout exceeded.' : @@ -502,7 +530,7 @@ RedisClient.prototype.connection_gone = function (why) { debug('Retry connection in ' + this.retry_delay + ' ms'); - this.retry_timer = setTimeout(retry_connection, this.retry_delay, this); + this.retry_timer = setTimeout(retry_connection, this.retry_delay, this, error); }; RedisClient.prototype.return_error = function (err) { diff --git a/test/connection.spec.js b/test/connection.spec.js index 906ae02..507089c 100644 --- a/test/connection.spec.js +++ b/test/connection.spec.js @@ -127,6 +127,65 @@ describe("connection tests", function () { client.stream.destroy(); }); }); + + it("retry_strategy used to reconnect with individual error", function (done) { + var text = ''; + var unhookIntercept = intercept(function (data) { + text += data; + return ''; + }); + var end = helper.callFuncAfter(done, 2); + client = redis.createClient({ + retry_strategy: function (options) { + if (options.total_retry_time > 150) { + client.set('foo', 'bar', function (err, res) { + assert.strictEqual(err.message, 'Connection timeout'); + end(); + }); + // Pass a individual error message to the error handler + return new Error('Connection timeout'); + } + return Math.min(options.attempt * 25, 200); + }, + max_attempts: 5, + retry_max_delay: 123, + port: 9999 + }); + + client.on('error', function(err) { + unhookIntercept(); + assert.strictEqual( + text, + 'node_redis: WARNING: You activated the retry_strategy and max_attempts at the same time. This is not possible and max_attempts will be ignored.\n' + + 'node_redis: WARNING: You activated the retry_strategy and retry_max_delay at the same time. This is not possible and retry_max_delay will be ignored.\n' + ); + assert.strictEqual(err.message, 'Connection timeout'); + assert(!err.code); + end(); + }); + }); + + it("retry_strategy used to reconnect", function (done) { + var end = helper.callFuncAfter(done, 2); + client = redis.createClient({ + retry_strategy: function (options) { + if (options.total_retry_time > 150) { + client.set('foo', 'bar', function (err, res) { + assert.strictEqual(err.code, 'ECONNREFUSED'); + end(); + }); + return false; + } + return Math.min(options.attempt * 25, 200); + }, + port: 9999 + }); + + client.on('error', function(err) { + assert.strictEqual(err.code, 'ECONNREFUSED'); + end(); + }); + }); }); describe("when not connected", function () {