Browse Source

Merge pull request #878 from NodeRedis/pipeline

node_redis on speed
greenkeeper-update-all
Ruben Bridgewater 9 years ago
parent
commit
352f8a5be3
  1. 110
      README.md
  2. 62
      benchmarks/multi_bench.js
  3. 50
      changelog.md
  4. 302
      index.js
  5. 14
      lib/parsers/javascript.js
  6. 61
      lib/queue.js
  7. 7
      lib/utils.js
  8. 3
      package.json
  9. 3
      test/auth.spec.js
  10. 323
      test/batch.spec.js
  11. 57
      test/commands/multi.spec.js
  12. 8
      test/commands/select.spec.js
  13. 3
      test/commands/setex.spec.js
  14. 60
      test/detect_buffers.spec.js
  15. 4
      test/parser/javascript.spec.js
  16. 39
      test/queue.spec.js
  17. 49
      test/rename.spec.js
  18. 232
      test/return_buffers.spec.js

110
README.md

@ -152,8 +152,13 @@ So please attach the error listener to node_redis.
### "drain"
`client` will emit `drain` when the TCP connection to the Redis server has been buffering, but is now
writable. This event can be used to stream commands in to Redis and adapt to backpressure. Right now,
you need to check `client.command_queue.length` to decide when to reduce your send rate and resume sending commands when you get `drain`.
writable. This event can be used to stream commands in to Redis and adapt to backpressure.
All commands return a boolean if the stream had to buffer or not. If false is returned the stream had to buffer.
That way you can decide when to reduce your send rate and resume sending commands when you get `drain`.
You can manually control the low water and high water marks by passing ommand_queue_high_water` and `command_queue_low_water` to the client options.
Check the [Node.js streams API](https://nodejs.org/api/stream.html) for further info.
### "idle"
@ -403,6 +408,7 @@ channel name as `channel` and the new count of subscriptions for this client as
`MULTI` commands are queued up until an `EXEC` is issued, and then all commands are run atomically by
Redis. The interface in `node_redis` is to return an individual `Multi` object by calling `client.multi()`.
If any command fails to queue, all commands are rolled back and none is going to be executed (For further information look at [transactions](http://redis.io/topics/transactions)).
```js
var redis = require("./index"),
@ -486,6 +492,21 @@ client.multi([
});
```
### Multi.exec_atomic( callback )
Identical to Multi.exec but with the difference that executing a single command will not use transactions.
## client.batch([commands])
Identical to .multi without transactions. This is recommended if you want to execute many commands at once but don't have to rely on transactions.
`BATCH` commands are queued up until an `EXEC` is issued, and then all commands are run atomically by
Redis. The interface in `node_redis` is to return an individual `Batch` object by calling `client.batch()`.
The only difference between .batch and .multi is that no transaction is going to be used.
Be aware that the errors are - just like in multi statements - in the result. Otherwise both, errors and results could be returned at the same time.
If you fire many commands at once this is going to **boost the execution speed by up to 400%** [sic!] compared to fireing the same commands in a loop without waiting for the result! See the benchmarks for further comparison. Please remember that all commands are kept in memory until they are fired.
## Monitor mode
Redis supports the `MONITOR` command, which lets you see all commands received by the Redis server
@ -628,42 +649,59 @@ Here are results of `multi_bench.js` which is similar to `redis-benchmark` from
hiredis parser (Lenovo T450s i7-5600U):
Client count: 5, node version: 2.5.0, server version: 3.0.3, parser: hiredis
PING, 1/5 min/max/avg/p95: 0/ 3/ 0.03/ 0.00 540ms total, 37037.04 ops/sec
PING, 50/5 min/max/avg/p95: 0/ 4/ 0.65/ 1.00 260ms total, 76923.08 ops/sec
SET 4B str, 1/5 min/max/avg/p95: 0/ 20/ 0.04/ 0.00 816ms total, 24509.80 ops/sec
SET 4B str, 50/5 min/max/avg/p95: 0/ 13/ 1.00/ 2.00 401ms total, 49875.31 ops/sec
SET 4B buf, 1/5 min/max/avg/p95: 0/ 4/ 0.06/ 1.00 1293ms total, 15467.90 ops/sec
SET 4B buf, 50/5 min/max/avg/p95: 0/ 5/ 2.58/ 4.00 1033ms total, 19361.08 ops/sec
GET 4B str, 1/5 min/max/avg/p95: 0/ 14/ 0.03/ 0.00 717ms total, 27894.00 ops/sec
GET 4B str, 50/5 min/max/avg/p95: 0/ 3/ 0.62/ 1.00 249ms total, 80321.29 ops/sec
GET 4B buf, 1/5 min/max/avg/p95: 0/ 6/ 0.03/ 0.00 561ms total, 35650.62 ops/sec
GET 4B buf, 50/5 min/max/avg/p95: 0/ 3/ 0.64/ 1.00 259ms total, 77220.08 ops/sec
SET 4KiB str, 1/5 min/max/avg/p95: 0/ 4/ 0.03/ 0.00 678ms total, 29498.53 ops/sec
SET 4KiB str, 50/5 min/max/avg/p95: 0/ 3/ 0.91/ 2.00 364ms total, 54945.05 ops/sec
SET 4KiB buf, 1/5 min/max/avg/p95: 0/ 20/ 0.07/ 1.00 1354ms total, 14771.05 ops/sec
SET 4KiB buf, 50/5 min/max/avg/p95: 0/ 5/ 1.86/ 3.00 744ms total, 26881.72 ops/sec
GET 4KiB str, 1/5 min/max/avg/p95: 0/ 3/ 0.03/ 0.00 575ms total, 34782.61 ops/sec
GET 4KiB str, 50/5 min/max/avg/p95: 0/ 5/ 0.82/ 2.00 327ms total, 61162.08 ops/sec
GET 4KiB buf, 1/5 min/max/avg/p95: 0/ 25/ 0.04/ 0.00 808ms total, 24752.48 ops/sec
GET 4KiB buf, 50/5 min/max/avg/p95: 0/ 4/ 0.92/ 2.00 371ms total, 53908.36 ops/sec
INCR, 1/5 min/max/avg/p95: 0/ 28/ 0.03/ 0.00 556ms total, 35971.22 ops/sec
INCR, 50/5 min/max/avg/p95: 0/ 4/ 0.67/ 1.00 269ms total, 74349.44 ops/sec
LPUSH, 1/5 min/max/avg/p95: 0/ 2/ 0.03/ 0.00 534ms total, 37453.18 ops/sec
LPUSH, 50/5 min/max/avg/p95: 0/ 2/ 0.89/ 2.00 357ms total, 56022.41 ops/sec
LRANGE 10, 1/5 min/max/avg/p95: 0/ 12/ 0.04/ 0.00 829ms total, 24125.45 ops/sec
LRANGE 10, 50/5 min/max/avg/p95: 0/ 3/ 1.04/ 2.00 415ms total, 48192.77 ops/sec
LRANGE 100, 1/5 min/max/avg/p95: 0/ 16/ 0.06/ 1.00 1212ms total, 16501.65 ops/sec
LRANGE 100, 50/5 min/max/avg/p95: 0/ 5/ 1.76/ 3.00 707ms total, 28288.54 ops/sec
SET 4MiB buf, 1/5 min/max/avg/p95: 1/ 22/ 2.66/ 4.00 1335ms total, 374.53 ops/sec
SET 4MiB buf, 50/5 min/max/avg/p95: 13/ 122/ 101.33/ 114.00 1062ms total, 470.81 ops/sec
GET 4MiB str, 1/5 min/max/avg/p95: 3/ 14/ 6.07/ 12.00 607ms total, 164.74 ops/sec
GET 4MiB str, 50/5 min/max/avg/p95: 17/ 431/ 286.75/ 418.00 686ms total, 145.77 ops/sec
GET 4MiB buf, 1/5 min/max/avg/p95: 3/ 38/ 6.83/ 12.95 684ms total, 146.20 ops/sec
GET 4MiB buf, 50/5 min/max/avg/p95: 10/ 273/ 194.07/ 253.90 495ms total, 202.02 ops/sec
Client count: 5, node version: 4.1.2, server version: 3.0.3, parser: hiredis
PING, 1/5 min/max/avg/p95: 0/ 4/ 0.02/ 0.00 1223ms total, 40883.07 ops/sec
PING, 50/5 min/max/avg/p95: 0/ 3/ 0.50/ 1.00 497ms total, 100603.62 ops/sec
PING, batch 50/5 min/max/avg/p95: 0/ 1/ 0.15/ 1.00 308ms total, 324675.32 ops/sec
SET 4B str, 1/5 min/max/avg/p95: 0/ 2/ 0.03/ 0.00 1402ms total, 35663.34 ops/sec
SET 4B str, 50/5 min/max/avg/p95: 0/ 2/ 0.53/ 1.00 534ms total, 93632.96 ops/sec
SET 4B str, batch 50/5 min/max/avg/p95: 0/ 1/ 0.19/ 1.00 392ms total, 255102.04 ops/sec
SET 4B buf, 1/5 min/max/avg/p95: 0/ 2/ 0.05/ 1.00 2433ms total, 20550.76 ops/sec
SET 4B buf, 50/5 min/max/avg/p95: 0/ 5/ 1.65/ 3.00 1652ms total, 30266.34 ops/sec
SET 4B buf, batch 50/5 min/max/avg/p95: 0/ 3/ 0.36/ 1.00 726ms total, 137741.05 ops/sec
GET 4B str, 1/5 min/max/avg/p95: 0/ 1/ 0.03/ 0.00 1314ms total, 38051.75 ops/sec
GET 4B str, 50/5 min/max/avg/p95: 0/ 3/ 0.53/ 1.00 529ms total, 94517.96 ops/sec
GET 4B str, batch 50/5 min/max/avg/p95: 0/ 1/ 0.16/ 1.00 328ms total, 304878.05 ops/sec
GET 4B buf, 1/5 min/max/avg/p95: 0/ 2/ 0.03/ 0.00 1389ms total, 35997.12 ops/sec
GET 4B buf, 50/5 min/max/avg/p95: 0/ 2/ 0.52/ 1.00 519ms total, 96339.11 ops/sec
GET 4B buf, batch 50/5 min/max/avg/p95: 0/ 1/ 0.16/ 1.00 168ms total, 297619.05 ops/sec
SET 4KiB str, 1/5 min/max/avg/p95: 0/ 3/ 0.03/ 0.00 1670ms total, 29940.12 ops/sec
SET 4KiB str, 50/5 min/max/avg/p95: 0/ 5/ 0.94/ 2.00 941ms total, 53134.96 ops/sec
SET 4KiB str, batch 50/5 min/max/avg/p95: 0/ 2/ 0.49/ 1.00 984ms total, 101626.02 ops/sec
SET 4KiB buf, 1/5 min/max/avg/p95: 0/ 1/ 0.05/ 0.00 2423ms total, 20635.58 ops/sec
SET 4KiB buf, 50/5 min/max/avg/p95: 0/ 5/ 1.60/ 3.00 1598ms total, 31289.11 ops/sec
SET 4KiB buf, batch 50/5 min/max/avg/p95: 0/ 1/ 0.41/ 1.00 825ms total, 121212.12 ops/sec
GET 4KiB str, 1/5 min/max/avg/p95: 0/ 1/ 0.03/ 0.00 1483ms total, 33715.44 ops/sec
GET 4KiB str, 50/5 min/max/avg/p95: 0/ 3/ 0.69/ 1.00 691ms total, 72358.90 ops/sec
GET 4KiB str, batch 50/5 min/max/avg/p95: 0/ 2/ 0.38/ 1.00 759ms total, 131752.31 ops/sec
GET 4KiB buf, 1/5 min/max/avg/p95: 0/ 3/ 0.03/ 0.00 1485ms total, 33670.03 ops/sec
GET 4KiB buf, 50/5 min/max/avg/p95: 0/ 3/ 0.80/ 2.00 797ms total, 62735.26 ops/sec
GET 4KiB buf, batch 50/5 min/max/avg/p95: 0/ 2/ 0.39/ 1.00 396ms total, 126262.63 ops/sec
INCR, 1/5 min/max/avg/p95: 0/ 2/ 0.03/ 0.00 1376ms total, 36337.21 ops/sec
INCR, 50/5 min/max/avg/p95: 0/ 3/ 0.53/ 1.00 529ms total, 94517.96 ops/sec
INCR, batch 50/5 min/max/avg/p95: 0/ 1/ 0.17/ 1.00 339ms total, 294985.25 ops/sec
LPUSH, 1/5 min/max/avg/p95: 0/ 3/ 0.03/ 0.00 1394ms total, 35868.01 ops/sec
LPUSH, 50/5 min/max/avg/p95: 0/ 3/ 0.58/ 1.00 584ms total, 85616.44 ops/sec
LPUSH, batch 50/5 min/max/avg/p95: 0/ 1/ 0.19/ 1.00 383ms total, 261096.61 ops/sec
LRANGE 10, 1/5 min/max/avg/p95: 0/ 4/ 0.03/ 0.00 1706ms total, 29308.32 ops/sec
LRANGE 10, 50/5 min/max/avg/p95: 0/ 3/ 0.71/ 1.00 712ms total, 70224.72 ops/sec
LRANGE 10, batch 50/5 min/max/avg/p95: 0/ 1/ 0.38/ 1.00 772ms total, 129533.68 ops/sec
LRANGE 100, 1/5 min/max/avg/p95: 0/ 1/ 0.06/ 1.00 3026ms total, 16523.46 ops/sec
LRANGE 100, 50/5 min/max/avg/p95: 0/ 5/ 1.88/ 3.00 1882ms total, 26567.48 ops/sec
LRANGE 100, batch 50/5 min/max/avg/p95: 2/ 4/ 2.09/ 3.00 4189ms total, 23872.05 ops/sec
SET 4MiB buf, 1/5 min/max/avg/p95: 1/ 7/ 2.08/ 3.00 1044ms total, 478.93 ops/sec
SET 4MiB buf, 20/5 min/max/avg/p95: 17/ 50/ 40.02/ 46.00 1022ms total, 489.24 ops/sec
SET 4MiB buf, batch 20/5 min/max/avg/p95: 37/ 45/ 39.00/ 44.40 975ms total, 512.82 ops/sec
GET 4MiB str, 1/5 min/max/avg/p95: 4/ 15/ 6.31/ 10.00 634ms total, 157.73 ops/sec
GET 4MiB str, 20/5 min/max/avg/p95: 7/ 124/ 88.27/ 110.80 476ms total, 210.08 ops/sec
GET 4MiB str, batch 20/5 min/max/avg/p95: 76/ 99/ 89.00/ 99.00 446ms total, 224.22 ops/sec
GET 4MiB buf, 1/5 min/max/avg/p95: 4/ 12/ 5.67/ 10.00 568ms total, 176.06 ops/sec
GET 4MiB buf, 20/5 min/max/avg/p95: 14/ 133/ 85.34/ 107.95 458ms total, 218.34 ops/sec
GET 4MiB buf, batch 20/5 min/max/avg/p95: 78/ 96/ 88.00/ 96.00 440ms total, 227.27 ops/sec
End of tests. Total time elapsed: 50421 ms
The hiredis and js parser should most of the time be on the same level. The js parser lacks speed for large responses though.
Therefor the hiredis parser is the default used in node_redis. To use `hiredis`, do:
Therefor the hiredis parser is the default used in node_redis and we recommend using the hiredis parser. To use `hiredis`, do:
npm install hiredis redis

62
benchmarks/multi_bench.js

@ -3,6 +3,7 @@
var path = require('path');
var RedisProcess = require('../test/lib/redis-process');
var rp;
var client_nr = 0;
var redis = require('../index');
var totalTime = 0;
var metrics = require('metrics');
@ -42,6 +43,7 @@ function Test(args) {
this.commands_sent = 0;
this.commands_completed = 0;
this.max_pipeline = this.args.pipeline || num_requests;
this.batch_pipeline = this.args.batch || 0;
this.client_options = args.client_options || client_options;
this.num_requests = args.reqs || num_requests;
@ -105,7 +107,7 @@ Test.prototype.new_client = function (id) {
};
Test.prototype.on_clients_ready = function () {
process.stdout.write(lpad(this.args.descr, 13) + ', ' + lpad(this.args.pipeline, 5) + '/' + this.clients_ready + ' ');
process.stdout.write(lpad(this.args.descr, 13) + ', ' + (this.args.batch ? lpad('batch ' + this.args.batch, 9) : lpad(this.args.pipeline, 9)) + '/' + this.clients_ready + ' ');
this.test_start = Date.now();
this.fill_pipeline();
@ -114,10 +116,14 @@ Test.prototype.on_clients_ready = function () {
Test.prototype.fill_pipeline = function () {
var pipeline = this.commands_sent - this.commands_completed;
while (this.commands_sent < this.num_requests && pipeline < this.max_pipeline) {
this.commands_sent++;
pipeline++;
this.send_next();
if (this.batch_pipeline && this.commands_sent < this.num_requests) {
this.batch();
} else {
while (this.commands_sent < this.num_requests && pipeline < this.max_pipeline) {
this.commands_sent++;
pipeline++;
this.send_next();
}
}
if (this.commands_completed === this.num_requests) {
@ -126,6 +132,28 @@ Test.prototype.fill_pipeline = function () {
}
};
Test.prototype.batch = function () {
var self = this,
cur_client = client_nr++ % this.clients.length,
start = Date.now(),
i = 0,
batch = this.clients[cur_client].batch();
while (i++ < this.batch_pipeline) {
this.commands_sent++;
batch[this.args.command](this.args.args);
}
batch.exec(function (err, res) {
if (err) {
throw err;
}
self.commands_completed += res.length;
self.command_latency.update(Date.now() - start);
self.fill_pipeline();
});
};
Test.prototype.stop_clients = function () {
var self = this;
@ -160,7 +188,7 @@ Test.prototype.print_stats = function () {
totalTime += duration;
console.log('min/max/avg/p95: ' + this.command_latency.print_line() + ' ' + lpad(duration, 6) + 'ms total, ' +
lpad((this.num_requests / (duration / 1000)).toFixed(2), 8) + ' ops/sec');
lpad((this.num_requests / (duration / 1000)).toFixed(2), 9) + ' ops/sec');
};
small_str = '1234';
@ -172,51 +200,67 @@ very_large_buf = new Buffer(very_large_str);
tests.push(new Test({descr: 'PING', command: 'ping', args: [], pipeline: 1}));
tests.push(new Test({descr: 'PING', command: 'ping', args: [], pipeline: 50}));
tests.push(new Test({descr: 'PING', command: 'ping', args: [], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'SET 4B str', command: 'set', args: ['foo_rand000000000000', small_str], pipeline: 1}));
tests.push(new Test({descr: 'SET 4B str', command: 'set', args: ['foo_rand000000000000', small_str], pipeline: 50}));
tests.push(new Test({descr: 'SET 4B str', command: 'set', args: ['foo_rand000000000000', small_str], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'SET 4B buf', command: 'set', args: ['foo_rand000000000000', small_buf], pipeline: 1}));
tests.push(new Test({descr: 'SET 4B buf', command: 'set', args: ['foo_rand000000000000', small_buf], pipeline: 50}));
tests.push(new Test({descr: 'SET 4B buf', command: 'set', args: ['foo_rand000000000000', small_buf], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'GET 4B str', command: 'get', args: ['foo_rand000000000000'], pipeline: 1}));
tests.push(new Test({descr: 'GET 4B str', command: 'get', args: ['foo_rand000000000000'], pipeline: 50}));
tests.push(new Test({descr: 'GET 4B str', command: 'get', args: ['foo_rand000000000000'], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'GET 4B buf', command: 'get', args: ['foo_rand000000000000'], pipeline: 1, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'GET 4B buf', command: 'get', args: ['foo_rand000000000000'], pipeline: 50, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'GET 4B buf', command: 'get', args: ['foo_rand000000000000'], batch: 50, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'SET 4KiB str', command: 'set', args: ['foo_rand000000000001', large_str], pipeline: 1}));
tests.push(new Test({descr: 'SET 4KiB str', command: 'set', args: ['foo_rand000000000001', large_str], pipeline: 50}));
tests.push(new Test({descr: 'SET 4KiB str', command: 'set', args: ['foo_rand000000000001', large_str], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'SET 4KiB buf', command: 'set', args: ['foo_rand000000000001', large_buf], pipeline: 1}));
tests.push(new Test({descr: 'SET 4KiB buf', command: 'set', args: ['foo_rand000000000001', large_buf], pipeline: 50}));
tests.push(new Test({descr: 'SET 4KiB buf', command: 'set', args: ['foo_rand000000000001', large_buf], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'GET 4KiB str', command: 'get', args: ['foo_rand000000000001'], pipeline: 1}));
tests.push(new Test({descr: 'GET 4KiB str', command: 'get', args: ['foo_rand000000000001'], pipeline: 50}));
tests.push(new Test({descr: 'GET 4KiB str', command: 'get', args: ['foo_rand000000000001'], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'GET 4KiB buf', command: 'get', args: ['foo_rand000000000001'], pipeline: 1, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'GET 4KiB buf', command: 'get', args: ['foo_rand000000000001'], pipeline: 50, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'GET 4KiB buf', command: 'get', args: ['foo_rand000000000001'], batch: 50, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'INCR', command: 'incr', args: ['counter_rand000000000000'], pipeline: 1}));
tests.push(new Test({descr: 'INCR', command: 'incr', args: ['counter_rand000000000000'], pipeline: 50}));
tests.push(new Test({descr: 'INCR', command: 'incr', args: ['counter_rand000000000000'], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'LPUSH', command: 'lpush', args: ['mylist', small_str], pipeline: 1}));
tests.push(new Test({descr: 'LPUSH', command: 'lpush', args: ['mylist', small_str], pipeline: 50}));
tests.push(new Test({descr: 'LPUSH', command: 'lpush', args: ['mylist', small_str], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'LRANGE 10', command: 'lrange', args: ['mylist', '0', '9'], pipeline: 1}));
tests.push(new Test({descr: 'LRANGE 10', command: 'lrange', args: ['mylist', '0', '9'], pipeline: 50}));
tests.push(new Test({descr: 'LRANGE 10', command: 'lrange', args: ['mylist', '0', '9'], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'LRANGE 100', command: 'lrange', args: ['mylist', '0', '99'], pipeline: 1}));
tests.push(new Test({descr: 'LRANGE 100', command: 'lrange', args: ['mylist', '0', '99'], pipeline: 50}));
tests.push(new Test({descr: 'LRANGE 100', command: 'lrange', args: ['mylist', '0', '99'], batch: 50, reqs: num_requests * 2}));
tests.push(new Test({descr: 'SET 4MiB buf', command: 'set', args: ['foo_rand000000000002', very_large_buf], pipeline: 1, reqs: 500}));
tests.push(new Test({descr: 'SET 4MiB buf', command: 'set', args: ['foo_rand000000000002', very_large_buf], pipeline: 50, reqs: 500}));
tests.push(new Test({descr: 'SET 4MiB buf', command: 'set', args: ['foo_rand000000000002', very_large_buf], pipeline: 20, reqs: 500}));
tests.push(new Test({descr: 'SET 4MiB buf', command: 'set', args: ['foo_rand000000000002', very_large_buf], batch: 20, reqs: 500}));
tests.push(new Test({descr: 'GET 4MiB str', command: 'get', args: ['foo_rand000000000002'], pipeline: 1, reqs: 100}));
tests.push(new Test({descr: 'GET 4MiB str', command: 'get', args: ['foo_rand000000000002'], pipeline: 50, reqs: 100}));
tests.push(new Test({descr: 'GET 4MiB str', command: 'get', args: ['foo_rand000000000002'], pipeline: 20, reqs: 100}));
tests.push(new Test({descr: 'GET 4MiB str', command: 'get', args: ['foo_rand000000000002'], batch: 20, reqs: 100}));
tests.push(new Test({descr: 'GET 4MiB buf', command: 'get', args: ['foo_rand000000000002'], pipeline: 1, reqs: 100, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'GET 4MiB buf', command: 'get', args: ['foo_rand000000000002'], pipeline: 50, reqs: 100, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'GET 4MiB buf', command: 'get', args: ['foo_rand000000000002'], pipeline: 20, reqs: 100, client_opts: { return_buffers: true} }));
tests.push(new Test({descr: 'GET 4MiB buf', command: 'get', args: ['foo_rand000000000002'], batch: 20, reqs: 100, client_opts: { return_buffers: true} }));
function next() {
var test = tests.shift();

50
changelog.md

@ -1,16 +1,58 @@
Changelog
=========
## v.2.x.x - xx, 2015
## v.2.2.0 - xx Oct, 2015 - The peregrino falcon
The peregrino falcon is the fasted bird on earth and this is what this release is all about: We increased performance for heavy usage by up to **400%** [sic!] and increased overall performance for any command as well. Please check the benchmarks in the [README.md](README.md) for further details.
Features
- Added disable_resubscribing option to prevent a client from resubscribing after reconnecting (@BridgeAR)
- Added rename_commands options to handle renamed commands from the redis config (@digmxl & @BridgeAR)
- Added rename_commands options to handle renamed commands from the redis config ([@digmxl](https://github.com/digmxl) & [@BridgeAR](https://github.com/BridgeAR))
- Added disable_resubscribing option to prevent a client from resubscribing after reconnecting ([@BridgeAR](https://github.com/BridgeAR))
- Increased performance ([@BridgeAR](https://github.com/BridgeAR))
- exchanging built in queue with [@petkaantonov](https://github.com/petkaantonov)'s [double-ended queue](https://github.com/petkaantonov/deque)
- prevent polymorphism
- optimize statements
- Added *.batch* command, similar to .multi but without transaction ([@BridgeAR](https://github.com/BridgeAR))
- Improved pipelining to minimize the [RTT](http://redis.io/topics/pipelining) further ([@BridgeAR](https://github.com/BridgeAR))
Bugfixes
- Fix a javascript parser regression introduced in 2.0 that could result in timeouts on high load. (@BridgeAR)
- Fix a javascript parser regression introduced in 2.0 that could result in timeouts on high load. ([@BridgeAR](https://github.com/BridgeAR))
- Fixed should_buffer boolean for .exec, .select and .auth commands not being returned and fix a couple special conditions ([@BridgeAR](https://github.com/BridgeAR))
If you do not rely on transactions but want to reduce the RTT you can use .batch from now on. It'll behave just the same as .multi but it does not have any transaction and therefor won't roll back any failed commands.<br>
Both .multi and .batch are from now on going to cache the commands and release them while calling .exec.
Please consider using .batch instead of looping through a lot of commands one by one. This will significantly improve your performance.
Here are some stats compared to ioredis 1.9.1:
simple set
82,496 op/s » ioredis
112,617 op/s » node_redis
simple get
82,015 op/s » ioredis
105,701 op/s » node_redis
simple get with pipeline
10,233 op/s » ioredis
26,541 op/s » node_redis
lrange 100
7,321 op/s » ioredis
26,155 op/s » node_redis
publish
90,524 op/s » ioredis
112,823 op/s » node_redis
subscribe
43,783 op/s » ioredis
61,889 op/s » node_redis
To conclude: we can proudly say that node_redis is very likely outperforming any other node redis client.
## v2.1.0 - Oct 02, 2015

302
index.js

@ -4,7 +4,7 @@ var net = require('net');
var URL = require('url');
var util = require('util');
var utils = require('./lib/utils');
var Queue = require('./lib/queue');
var Queue = require('double-ended-queue');
var Command = require('./lib/command');
var events = require('events');
var parsers = [];
@ -34,10 +34,19 @@ parsers.push(require('./lib/parsers/javascript'));
function RedisClient(stream, options) {
options = options || {};
var self = this;
this.pipeline = 0;
if (!stream.cork) {
stream.cork = function noop() {
self.pipeline_queue = new Queue();
};
stream.uncork = function noop() {};
this.write = this.writeStream;
}
this.stream = stream;
this.options = options;
this.connection_id = ++connection_id;
this.connected = false;
this.ready = false;
@ -55,18 +64,22 @@ function RedisClient(stream, options) {
}
}
}
this.options.return_buffers = !!this.options.return_buffers;
this.options.detect_buffers = !!this.options.detect_buffers;
// Override the detect_buffers setting if return_buffers is active and print a warning
if (this.options.return_buffers && this.options.detect_buffers) {
console.warn('>> WARNING: You activated return_buffers and detect_buffers at the same time. The return value is always going to be a buffer.');
this.options.detect_buffers = false;
}
this.should_buffer = false;
this.command_queue_high_water = options.command_queue_high_water || 1000;
this.command_queue_low_water = options.command_queue_low_water || 0;
this.max_attempts = +options.max_attempts || 0;
this.command_queue = new Queue(); // holds sent commands to de-pipeline them
this.offline_queue = new Queue(); // holds commands issued but not able to be sent
this.command_queue = new Queue(); // Holds sent commands to de-pipeline them
this.offline_queue = new Queue(); // Holds commands issued but not able to be sent
this.commands_sent = 0;
this.connect_timeout = +options.connect_timeout || 86400000; // 24 * 60 * 60 * 1000 ms
this.enable_offline_queue = true;
if (options.enable_offline_queue === false) {
this.enable_offline_queue = false;
}
this.enable_offline_queue = options.enable_offline_queue === false ? false : true;
this.retry_max_delay = +options.retry_max_delay || null;
this.initialize_retry_vars();
this.pub_sub_mode = false;
@ -76,7 +89,7 @@ function RedisClient(stream, options) {
this.server_info = {};
this.auth_pass = options.auth_pass;
this.parser_module = null;
this.selected_db = null; // save the selected db here, used when reconnecting
this.selected_db = null; // Save the selected db here, used when reconnecting
this.old_state = null;
this.install_stream_listeners();
@ -138,21 +151,19 @@ RedisClient.prototype.unref = function () {
// flush offline_queue and command_queue, erroring any items with a callback first
RedisClient.prototype.flush_and_error = function (error) {
var command_obj;
while (command_obj = this.offline_queue.shift()) {
if (typeof command_obj.callback === 'function') {
error.command = command_obj.command.toUpperCase();
command_obj.callback(error);
}
}
this.offline_queue = new Queue();
while (command_obj = this.command_queue.shift()) {
if (typeof command_obj.callback === 'function') {
error.command = command_obj.command.toUpperCase();
command_obj.callback(error);
}
}
this.offline_queue = new Queue();
this.command_queue = new Queue();
};
@ -233,7 +244,6 @@ RedisClient.prototype.on_connect = function () {
this.connected = true;
this.ready = false;
this.connections += 1;
this.command_queue = new Queue();
this.emitted_end = false;
if (this.options.socket_nodelay) {
this.stream.setNoDelay();
@ -430,10 +440,10 @@ RedisClient.prototype.send_offline_queue = function () {
debug('Sending offline command: ' + command_obj.command);
buffered_writes += !this.send_command(command_obj.command, command_obj.args, command_obj.callback);
}
this.offline_queue = new Queue();
// Even though items were shifted off, Queue backing store still uses memory until next add, so just get a new Queue
this.offline_queue = new Queue();
if (!buffered_writes) {
if (buffered_writes === 0) {
this.should_buffer = false;
this.emit('drain');
}
@ -511,7 +521,7 @@ RedisClient.prototype.connection_gone = function (why) {
this.retry_delay = this.connect_timeout - this.retry_totaltime;
}
debug("Retry connection in " + this.retry_delay + " ms");
debug('Retry connection in ' + this.retry_delay + ' ms');
this.retry_timer = setTimeout(retry_connection, this.retry_delay, this);
};
@ -531,7 +541,18 @@ RedisClient.prototype.return_error = function (err) {
err.code = match[1];
}
this.emit_drain_idle(queue_len);
if (command_obj.callback) {
command_obj.callback(err);
} else {
this.emit('error', err);
}
};
RedisClient.prototype.emit_drain_idle = function (queue_len) {
if (this.pub_sub_mode === false && queue_len === 0) {
// Free the queue capacity memory by using a new queue
this.command_queue = new Queue();
this.emit('idle');
}
@ -540,12 +561,6 @@ RedisClient.prototype.return_error = function (err) {
this.emit('drain');
this.should_buffer = false;
}
if (command_obj.callback) {
command_obj.callback(err);
} else {
this.emit('error', err);
}
};
RedisClient.prototype.return_reply = function (reply) {
@ -566,37 +581,29 @@ RedisClient.prototype.return_reply = function (reply) {
queue_len = this.command_queue.length;
if (this.pub_sub_mode === false && queue_len === 0) {
this.command_queue = new Queue(); // explicitly reclaim storage from old Queue
this.emit('idle');
}
if (this.should_buffer && queue_len <= this.command_queue_low_water) {
this.emit('drain');
this.should_buffer = false;
}
this.emit_drain_idle(queue_len);
if (command_obj && !command_obj.sub_command) {
if (typeof command_obj.callback === 'function') {
if ('exec' !== command_obj.command) {
if (this.options.detect_buffers && command_obj.buffer_args === false) {
if (command_obj.buffer_args === false) {
// If detect_buffers option was specified, then the reply from the parser will be Buffers.
// If this command did not use Buffer arguments, then convert the reply to Strings here.
reply = utils.reply_to_strings(reply);
}
// TODO - confusing and error-prone that hgetall is special cased in two places
if (reply && 'hgetall' === command_obj.command) {
if ('hgetall' === command_obj.command) {
reply = utils.reply_to_object(reply);
}
}
command_obj.callback(null, reply);
} else {
debug('No callback for reply');
}
} else if (this.pub_sub_mode || command_obj && command_obj.sub_command) {
if (Array.isArray(reply)) {
if (!this.options.return_buffers && (!command_obj || this.options.detect_buffers && command_obj.buffer_args === false)) {
if (!command_obj || command_obj.buffer_args === false) {
reply = utils.reply_to_strings(reply);
}
type = reply[0].toString();
@ -620,11 +627,9 @@ RedisClient.prototype.return_reply = function (reply) {
this.emit(type, reply[1], reply[2]); // channel, count
} else {
this.emit('error', new Error('subscriptions are active but got unknown reply type ' + type));
return;
}
} else if (!this.closing) {
this.emit('error', new Error('subscriptions are active but got an invalid reply: ' + reply));
return;
}
}
/* istanbul ignore else: this is a safety check that we should not be able to trigger */
@ -648,7 +653,12 @@ RedisClient.prototype.return_reply = function (reply) {
};
RedisClient.prototype.send_command = function (command, args, callback) {
var arg, command_obj, i, elem_count, buffer_args, stream = this.stream, command_str = '', buffered_writes = 0, err;
var arg, command_obj, i, err,
stream = this.stream,
command_str = '',
buffered_writes = 0,
buffer_args = false,
buffer = this.options.return_buffers;
if (args === undefined) {
args = [];
@ -660,7 +670,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
}
}
if (process.domain && callback) {
if (callback && process.domain) {
callback = process.domain.bind(callback);
}
@ -671,22 +681,26 @@ RedisClient.prototype.send_command = function (command, args, callback) {
err = new Error('send_command: ' + command + ' value must not be undefined or null');
err.command = command;
if (callback) {
return callback && callback(err);
callback(err);
} else {
this.emit('error', err);
}
this.emit('error', err);
return;
// Singal no buffering
return true;
}
}
buffer_args = false;
for (i = 0; i < args.length; i += 1) {
if (Buffer.isBuffer(args[i])) {
buffer_args = true;
break;
}
}
if (this.options.detect_buffers) {
buffer = buffer_args;
}
command_obj = new Command(command, args, false, buffer_args, callback);
command_obj = new Command(command, args, false, buffer, callback);
if (!this.ready && !this.send_anyway || !stream.writable) {
if (this.closing || !this.enable_offline_queue) {
@ -707,7 +721,8 @@ RedisClient.prototype.send_command = function (command, args, callback) {
this.offline_queue.push(command_obj);
this.should_buffer = true;
}
return;
// Return false to signal buffering
return false;
}
if (command === 'subscribe' || command === 'psubscribe' || command === 'unsubscribe' || command === 'punsubscribe') {
@ -720,13 +735,11 @@ RedisClient.prototype.send_command = function (command, args, callback) {
err = new Error('Connection in subscriber mode, only subscriber commands may be used');
err.command = command.toUpperCase();
this.emit('error', err);
return;
return true;
}
this.command_queue.push(command_obj);
this.commands_sent += 1;
elem_count = args.length + 1;
if (typeof this.options.rename_commands !== 'undefined' && this.options.rename_commands[command]) {
command = this.options.rename_commands[command];
}
@ -734,7 +747,7 @@ RedisClient.prototype.send_command = function (command, args, callback) {
// Always use 'Multi bulk commands', but if passed any Buffer args, then do multiple writes, one for each arg.
// This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
command_str = '*' + elem_count + '\r\n$' + command.length + '\r\n' + command + '\r\n';
command_str = '*' + (args.length + 1) + '\r\n$' + command.length + '\r\n' + command + '\r\n';
if (!buffer_args) { // Build up a string and send entire command in one write
for (i = 0; i < args.length; i += 1) {
@ -745,40 +758,64 @@ RedisClient.prototype.send_command = function (command, args, callback) {
command_str += '$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n';
}
debug('Send ' + this.address + ' id ' + this.connection_id + ': ' + command_str);
buffered_writes += !stream.write(command_str);
buffered_writes += !this.write(command_str);
} else {
debug('Send command (' + command_str + ') has Buffer arguments');
buffered_writes += !stream.write(command_str);
buffered_writes += !this.write(command_str);
for (i = 0; i < args.length; i += 1) {
arg = args[i];
if (!(Buffer.isBuffer(arg) || typeof arg === 'string')) {
arg = String(arg);
}
if (Buffer.isBuffer(arg)) {
if (arg.length === 0) {
debug('send_command: using empty string for 0 length buffer');
buffered_writes += !stream.write('$0\r\n\r\n');
buffered_writes += !this.write('$0\r\n\r\n');
} else {
buffered_writes += !stream.write('$' + arg.length + '\r\n');
buffered_writes += !stream.write(arg);
buffered_writes += !stream.write('\r\n');
buffered_writes += !this.write('$' + arg.length + '\r\n');
buffered_writes += !this.write(arg);
buffered_writes += !this.write('\r\n');
debug('send_command: buffer send ' + arg.length + ' bytes');
}
} else {
if (typeof arg !== 'string') {
arg = String(arg);
}
debug('send_command: string send ' + Buffer.byteLength(arg) + ' bytes: ' + arg);
buffered_writes += !stream.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
buffered_writes += !this.write('$' + Buffer.byteLength(arg) + '\r\n' + arg + '\r\n');
}
}
}
debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer);
if (buffered_writes || this.command_queue.length >= this.command_queue_high_water) {
if (buffered_writes !== 0 || this.command_queue.length >= this.command_queue_high_water) {
debug('send_command buffered_writes: ' + buffered_writes, ' should_buffer: ' + this.should_buffer);
this.should_buffer = true;
}
return !this.should_buffer;
};
RedisClient.prototype.write = function (data) {
return this.stream.write(data);
};
RedisClient.prototype.writeStream = function (data) {
var nr = 0;
if (this.pipeline === 0) {
return this.stream.write(data);
}
this.pipeline--;
if (this.pipeline === 0) {
var command;
while (command = this.pipeline_queue.shift()) {
nr += !this.stream.write(command);
}
nr += !this.stream.write(data);
return !nr;
}
this.pipeline_queue.push(data);
return true;
};
RedisClient.prototype.pub_sub_command = function (command_obj) {
var i, key, command, args;
@ -832,9 +869,14 @@ RedisClient.prototype.end = function (flush) {
return this.stream.destroySoon();
};
function Multi(client, args) {
function Multi(client, args, transaction) {
client.stream.cork();
this._client = client;
this.queue = [['multi']];
this.queue = new Queue();
if (transaction) {
this.exec = this.exec_transaction;
this.EXEC = this.exec_transaction;
}
var command, tmp_args;
if (Array.isArray(args)) {
while (tmp_args = args.shift()) {
@ -850,7 +892,11 @@ function Multi(client, args) {
}
RedisClient.prototype.multi = RedisClient.prototype.MULTI = function (args) {
return new Multi(this, args);
return new Multi(this, args, true);
};
RedisClient.prototype.batch = RedisClient.prototype.BATCH = function (args) {
return new Multi(this, args, false);
};
commands.forEach(function (fullCommand) {
@ -861,7 +907,7 @@ commands.forEach(function (fullCommand) {
return;
}
RedisClient.prototype[command] = function (key, arg, callback) {
RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command] = function (key, arg, callback) {
if (Array.isArray(key)) {
return this.send_command(command, key, arg);
}
@ -879,9 +925,8 @@ commands.forEach(function (fullCommand) {
}
return this.send_command(command, utils.to_array(arguments));
};
RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command];
Multi.prototype[command] = function (key, arg, callback) {
Multi.prototype[command.toUpperCase()] = Multi.prototype[command] = function (key, arg, callback) {
if (Array.isArray(key)) {
if (arg) {
key = key.concat([arg]);
@ -905,14 +950,12 @@ commands.forEach(function (fullCommand) {
}
return this;
};
Multi.prototype[command.toUpperCase()] = Multi.prototype[command];
});
// store db in this.select_db to restore it on reconnect
RedisClient.prototype.select = RedisClient.prototype.SELECT = function (db, callback) {
var self = this;
this.send_command('select', [db], function (err, res) {
return this.send_command('select', [db], function (err, res) {
if (err === null) {
self.selected_db = db;
}
@ -930,20 +973,22 @@ RedisClient.prototype.auth = RedisClient.prototype.AUTH = function (pass, callba
var err = new Error('The password has to be of type "string"');
err.command = 'AUTH';
if (callback) {
callback(err);
setImmediate(function () {
callback(err);
});
} else {
this.emit('error', err);
}
return;
return true;
}
this.auth_pass = pass;
debug('Saving auth as ' + this.auth_pass);
// Only run the callback once. So do not safe it if already connected
if (this.connected) {
this.send_command('auth', [this.auth_pass], callback);
} else {
this.auth_callback = callback;
return this.send_command('auth', [this.auth_pass], callback);
}
this.auth_callback = callback;
return true;
};
RedisClient.prototype.hmset = RedisClient.prototype.HMSET = function (key, args, callback) {
@ -1013,43 +1058,62 @@ Multi.prototype.send_command = function (command, args, index, cb) {
if (cb) {
cb(err);
}
err.position = index - 1;
err.position = index;
self.errors.push(err);
}
});
};
Multi.prototype.exec = Multi.prototype.EXEC = function (callback) {
Multi.prototype.exec_atomic = function (callback) {
if (this.queue.length < 2) {
return this.exec_batch(callback);
}
return this.exec(callback);
};
Multi.prototype.exec_transaction = function (callback) {
var self = this;
var len = this.queue.length;
var cb;
this.errors = [];
this.callback = callback;
this.wants_buffers = new Array(this.queue.length);
this._client.pipeline = len + 2;
this.wants_buffers = new Array(len);
this.send_command('multi', []);
// drain queue, callback will catch 'QUEUED' or error
for (var index = 0; index < this.queue.length; index++) {
var args = this.queue[index].slice(0);
for (var index = 0; index < len; index++) {
var args = this.queue.get(index).slice(0);
var command = args.shift();
var cb;
if (typeof args[args.length - 1] === 'function') {
cb = args.pop();
} else {
cb = undefined;
}
// Keep track of who wants buffer responses:
this.wants_buffers[index] = false;
for (var i = 0; i < args.length; i += 1) {
if (Buffer.isBuffer(args[i])) {
this.wants_buffers[index] = true;
break;
if (this._client.options.return_buffers) {
this.wants_buffers[index] = true;
} else if (!this._client.options.detect_buffers) {
this.wants_buffers[index] = false;
} else {
this.wants_buffers[index] = false;
for (var i = 0; i < args.length; i += 1) {
if (Buffer.isBuffer(args[i])) {
this.wants_buffers[index] = true;
break;
}
}
}
this.send_command(command, args, index, cb);
}
this._client.send_command('exec', [], function(err, replies) {
this._client.stream.uncork();
return this._client.send_command('exec', [], function(err, replies) {
self.execute_callback(err, replies);
});
};
Multi.prototype.execute_callback = function (err, replies) {
var i, args;
var i = 0, args;
if (err) {
if (err.code !== 'CONNECTION_BROKEN') {
@ -1065,9 +1129,7 @@ Multi.prototype.execute_callback = function (err, replies) {
}
if (replies) {
for (i = 0; i < this.queue.length - 1; i += 1) {
args = this.queue[i + 1];
while (args = this.queue.shift()) {
// If we asked for strings, even in detect_buffers mode, then return strings:
if (replies[i] instanceof Error) {
var match = replies[i].message.match(utils.errCode);
@ -1077,7 +1139,7 @@ Multi.prototype.execute_callback = function (err, replies) {
}
replies[i].command = args[0].toUpperCase();
} else if (replies[i]) {
if (this._client.options.detect_buffers && this.wants_buffers[i + 1] === false) {
if (this.wants_buffers[i] === false) {
replies[i] = utils.reply_to_strings(replies[i]);
}
if (args[0] === 'hgetall') {
@ -1093,6 +1155,7 @@ Multi.prototype.execute_callback = function (err, replies) {
args[args.length - 1](null, replies[i]);
}
}
i++;
}
}
@ -1101,6 +1164,61 @@ Multi.prototype.execute_callback = function (err, replies) {
}
};
Multi.prototype.callback = function (cb, command, i) {
var self = this;
return function (err, res) {
if (err) {
self.results[i] = err;
} else {
self.results[i] = res;
}
if (cb) {
cb(err, res);
}
// Do not emit an error here. Otherwise each error would result in one emit.
// The errors will be returned in the result anyway
};
};
Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function (callback) {
var len = this.queue.length;
var self = this;
var index = 0;
var args;
if (len === 0) {
if (callback) {
setImmediate(function () {
callback(null, []);
});
}
return true;
}
this.results = new Array(len);
this._client.pipeline = len;
var lastCallback = function (cb) {
return function (err, res) {
cb(err, res);
callback(null, self.results);
};
};
while (args = this.queue.shift()) {
var command = args.shift();
var cb;
if (typeof args[args.length - 1] === 'function') {
cb = this.callback(args.pop(), command, index);
} else {
cb = this.callback(undefined, command, index);
}
if (callback && index === len - 1) {
cb = lastCallback(cb);
}
this._client.send_command(command, args, cb);
index++;
}
this._client.stream.uncork();
return this._client.should_buffer;
};
var createClient_unix = function (path, options){
var cnxOptions = {
path: path

14
lib/parsers/javascript.js

@ -2,9 +2,8 @@
var util = require('util');
function ReplyParser(return_buffers) {
function ReplyParser() {
this.name = exports.name;
this.return_buffers = return_buffers;
this._buffer = new Buffer(0);
this._offset = 0;
@ -37,10 +36,8 @@ ReplyParser.prototype._parseResult = function (type) {
if (type === 45) {
return new Error(this._buffer.toString(this._encoding, start, end));
} else if (this.return_buffers) {
return this._buffer.slice(start, end);
}
return this._buffer.toString(this._encoding, start, end);
return this._buffer.slice(start, end);
} else if (type === 58) { // :
// up to the delimiter
end = this._packetEndOffset() - 1;
@ -77,15 +74,12 @@ ReplyParser.prototype._parseResult = function (type) {
// set the offset to after the delimiter
this._offset = end + 2;
if (this.return_buffers) {
return this._buffer.slice(start, end);
}
return this._buffer.toString(this._encoding, start, end);
return this._buffer.slice(start, end);
} else if (type === 42) { // *
offset = this._offset;
packetHeader = this.parseHeader();
if (packetHeader < 0) {
if (packetHeader === -1) {
return null;
}

61
lib/queue.js

@ -1,61 +0,0 @@
'use strict';
// Queue class adapted from Tim Caswell's pattern library
// http://github.com/creationix/pattern/blob/master/lib/pattern/queue.js
function Queue() {
this.tail = [];
this.head = [];
this.offset = 0;
}
Queue.prototype.shift = function () {
if (this.offset === this.head.length) {
var tmp = this.head;
tmp.length = 0;
this.head = this.tail;
this.tail = tmp;
this.offset = 0;
if (this.head.length === 0) {
return;
}
}
var item = this.head[this.offset];
this.head[this.offset] = null;
this.offset++;
return item;
};
Queue.prototype.push = function (item) {
return this.tail.push(item);
};
Queue.prototype.forEach = function (fn, thisv) {
var array = this.head.slice(this.offset), i, il;
array.push.apply(array, this.tail);
if (thisv) {
for (i = 0, il = array.length; i < il; i += 1) {
fn.call(thisv, array[i], i, array);
}
} else {
for (i = 0, il = array.length; i < il; i += 1) {
fn(array[i], i, array);
}
}
return array;
};
Queue.prototype.getLength = function () {
return this.head.length - this.offset + this.tail.length;
};
Object.defineProperty(Queue.prototype, 'length', {
get: function () {
return this.getLength();
}
});
module.exports = Queue;

7
lib/utils.js

@ -1,6 +1,6 @@
'use strict';
// hgetall converts its replies to an Object. If the reply is empty, null is returned.
// hgetall converts its replies to an Object. If the reply is empty, null is returned.
function replyToObject(reply) {
var obj = {}, j, jl, key, val;
@ -25,11 +25,12 @@ function replyToStrings(reply) {
}
if (Array.isArray(reply)) {
var res = new Array(reply.length);
for (i = 0; i < reply.length; i++) {
// Recusivly call the function as slowlog returns deep nested replies
reply[i] = replyToStrings(reply[i]);
res[i] = replyToStrings(reply[i]);
}
return reply;
return res;
}
return reply;

3
package.json

@ -17,6 +17,9 @@
"pretest": "optional-dev-dependency hiredis",
"posttest": "jshint ."
},
"dependencies": {
"double-ended-queue": "^2.1.0-0"
},
"devDependencies": {
"coveralls": "^2.11.2",
"jshint": "^2.8.0",

3
test/auth.spec.js

@ -128,12 +128,15 @@ describe("client authentication", function () {
if (helper.redisProcess().spawnFailed()) this.skip();
client = redis.createClient.apply(redis.createClient, args);
var async = true;
client.auth(undefined, function(err, res) {
assert.strictEqual(err.message, 'The password has to be of type "string"');
assert.strictEqual(err.command, 'AUTH');
assert.strictEqual(res, undefined);
async = false;
done();
});
assert(async);
});
it('should emit an error if the password is not of type string and no callback has been provided', function (done) {

323
test/batch.spec.js

@ -0,0 +1,323 @@
'use strict';
var assert = require('assert');
var config = require("./lib/config");
var helper = require('./helper');
var redis = config.redis;
var uuid = require('uuid');
describe("The 'batch' method", function () {
helper.allTests(function(parser, ip, args) {
describe("using " + parser + " and " + ip, function () {
var key, value;
beforeEach(function () {
key = uuid.v4();
value = uuid.v4();
});
describe("when not connected", function () {
var client;
beforeEach(function (done) {
client = redis.createClient.apply(redis.createClient, args);
client.once("connect", function () {
client.quit();
});
client.on('end', function () {
return done();
});
});
it("returns an empty array", function (done) {
var batch = client.batch();
batch.exec(function (err, res) {
assert.strictEqual(err, null);
assert.strictEqual(res.length, 0);
done();
});
});
it("returns an empty array if promisified", function () {
return client.batch().execAsync().then(function(res) {
assert.strictEqual(res.length, 0);
});
});
});
describe("when connected", function () {
var client;
beforeEach(function (done) {
client = redis.createClient.apply(redis.createClient, args);
client.once("ready", function () {
client.flushdb(function (err) {
return done(err);
});
});
});
afterEach(function () {
client.end();
});
it("returns an empty result array", function (done) {
var batch = client.batch();
var async = true;
var notBuffering = batch.exec(function (err, res) {
assert.strictEqual(err, null);
assert.strictEqual(res.length, 0);
async = false;
done();
});
assert(async);
assert.strictEqual(notBuffering, true);
});
it('fail individually when one command fails using chaining notation', function (done) {
var batch1, batch2;
batch1 = client.batch();
batch1.mset("batchfoo", "10", "batchbar", "20", helper.isString("OK"));
// Provoke an error at queue time
batch1.set("foo2", helper.isError());
batch1.incr("batchfoo");
batch1.incr("batchbar");
batch1.exec(function () {
// Confirm that the previous command, while containing an error, still worked.
batch2 = client.batch();
batch2.get('foo2', helper.isNull());
batch2.incr("batchbar", helper.isNumber(22));
batch2.incr("batchfoo", helper.isNumber(12));
batch2.exec(function (err, replies) {
assert.strictEqual(null, replies[0]);
assert.strictEqual(22, replies[1]);
assert.strictEqual(12, replies[2]);
return done();
});
});
});
it('fail individually when one command fails and emit the error if no callback has been provided', function (done) {
var batch1;
client.on('error', function (err) {
done(err);
});
batch1 = client.batch();
batch1.mset("batchfoo", "10", "batchbar", "20", helper.isString("OK"));
// Provoke an error at queue time
batch1.set("foo2");
batch1.incr("batchfoo");
batch1.incr("batchbar");
batch1.exec(function (err, res) {
assert.strictEqual(res[1].command, 'SET');
assert.strictEqual(res[1].code, 'ERR');
done();
});
});
it('fail individually when one command in an array of commands fails', function (done) {
// test nested batch-bulk replies
client.batch([
["mget", "batchfoo", "batchbar", function (err, res) {
assert.strictEqual(2, res.length);
assert.strictEqual(0, +res[0]);
assert.strictEqual(0, +res[1]);
}],
["set", "foo2", helper.isError()],
["incr", "batchfoo"],
["incr", "batchbar"]
]).exec(function (err, replies) {
assert.strictEqual(2, replies[0].length);
assert.strictEqual(null, replies[0][0]);
assert.strictEqual(null, replies[0][1]);
assert.strictEqual('SET', replies[1].command);
assert.strictEqual("1", replies[2].toString());
assert.strictEqual("1", replies[3].toString());
return done();
});
});
it('handles batchple operations being applied to a set', function (done) {
client.sadd("some set", "mem 1");
client.sadd(["some set", "mem 2"]);
client.sadd("some set", "mem 3");
client.sadd("some set", "mem 4");
// make sure empty mb reply works
client.del("some missing set");
client.smembers("some missing set", function (err, reply) {
// make sure empty mb reply works
assert.strictEqual(0, reply.length);
});
// test nested batch-bulk replies with empty mb elements.
client.BATCH([
["smembers", ["some set"]],
["del", "some set"],
["smembers", "some set"]
])
.scard("some set")
.exec(function (err, replies) {
assert.strictEqual(4, replies[0].length);
assert.strictEqual(0, replies[2].length);
return done();
});
});
it('allows batchple operations to be performed using constructor with all kinds of syntax', function (done) {
var now = Date.now();
var arr = ["batchhmset", "batchbar", "batchbaz"];
var arr2 = ['some manner of key', 'otherTypes'];
var arr3 = [5768, "batchbarx", "batchfoox"];
var arr4 = ["mset", [578, "batchbar"], helper.isString('OK')];
client.batch([
arr4,
[["mset", "batchfoo2", "batchbar2", "batchfoo3", "batchbar3"], helper.isString('OK')],
["hmset", arr],
[["hmset", "batchhmset2", "batchbar2", "batchfoo3", "batchbar3", "test", helper.isString('OK')]],
["hmset", ["batchhmset", "batchbar", "batchfoo", helper.isString('OK')]],
["hmset", arr3, helper.isString('OK')],
['hmset', now, {123456789: "abcdefghij", "some manner of key": "a type of value", "otherTypes": 555}],
['hmset', 'key2', {"0123456789": "abcdefghij", "some manner of key": "a type of value", "otherTypes": 999}, helper.isString('OK')],
["HMSET", "batchhmset", ["batchbar", "batchbaz"]],
["hmset", "batchhmset", ["batchbar", "batchbaz"], helper.isString('OK')],
])
.hmget(now, 123456789, 'otherTypes')
.hmget('key2', arr2, function noop() {})
.hmget(['batchhmset2', 'some manner of key', 'batchbar3'])
.mget('batchfoo2', ['batchfoo3', 'batchfoo'], function(err, res) {
assert(res[0], 'batchfoo3');
assert(res[1], 'batchfoo');
})
.exec(function (err, replies) {
assert.equal(arr.length, 3);
assert.equal(arr2.length, 2);
assert.equal(arr3.length, 3);
assert.equal(arr4.length, 3);
assert.strictEqual(null, err);
assert.equal(replies[10][1], '555');
assert.equal(replies[11][0], 'a type of value');
assert.strictEqual(replies[12][0], null);
assert.equal(replies[12][1], 'test');
assert.equal(replies[13][0], 'batchbar2');
assert.equal(replies[13].length, 3);
assert.equal(replies.length, 14);
return done();
});
});
it('converts a non string key to a string', function(done) {
// TODO: Converting the key might change soon again.
client.batch().hmset(true, {
test: 123,
bar: 'baz'
}).exec(done);
});
it('runs a batch without any further commands', function(done) {
var buffering = client.batch().exec(function(err, res) {
assert.strictEqual(err, null);
assert.strictEqual(res.length, 0);
done();
});
assert(typeof buffering === 'boolean');
});
it('runs a batch without any further commands and without callback', function() {
var buffering = client.batch().exec();
assert.strictEqual(buffering, true);
});
it('allows batchple operations to be performed using a chaining API', function (done) {
client.batch()
.mset('some', '10', 'keys', '20')
.incr('some')
.incr('keys')
.mget('some', 'keys')
.exec(function (err, replies) {
assert.strictEqual(null, err);
assert.equal('OK', replies[0]);
assert.equal(11, replies[1]);
assert.equal(21, replies[2]);
assert.equal(11, replies[3][0].toString());
assert.equal(21, replies[3][1].toString());
return done();
});
});
it('allows batchple commands to work the same as normal to be performed using a chaining API', function (done) {
client.batch()
.mset(['some', '10', 'keys', '20'])
.incr(['some', helper.isNumber(11)])
.incr(['keys'], helper.isNumber(21))
.mget('some', 'keys')
.exec(function (err, replies) {
assert.strictEqual(null, err);
assert.equal('OK', replies[0]);
assert.equal(11, replies[1]);
assert.equal(21, replies[2]);
assert.equal(11, replies[3][0].toString());
assert.equal(21, replies[3][1].toString());
return done();
});
});
it('allows batchple commands to work the same as normal to be performed using a chaining API promisified', function () {
return client.batch()
.mset(['some', '10', 'keys', '20'])
.incr(['some', helper.isNumber(11)])
.incr(['keys'], helper.isNumber(21))
.mget('some', 'keys')
.execAsync()
.then(function (replies) {
assert.equal('OK', replies[0]);
assert.equal(11, replies[1]);
assert.equal(21, replies[2]);
assert.equal(11, replies[3][0].toString());
assert.equal(21, replies[3][1].toString());
});
});
it('allows an array to be provided indicating batchple operations to perform', function (done) {
// test nested batch-bulk replies with nulls.
client.batch([
["mget", ["batchfoo", "some", "random value", "keys"]],
["incr", "batchfoo"]
])
.exec(function (err, replies) {
assert.strictEqual(replies.length, 2);
assert.strictEqual(replies[0].length, 4);
return done();
});
});
it('allows multiple operations to be performed on a hash', function (done) {
client.batch()
.hmset("batchhash", "a", "foo", "b", 1)
.hmset("batchhash", {
extra: "fancy",
things: "here"
})
.hgetall("batchhash")
.exec(done);
});
it("should work without any callback", function (done) {
helper.serverVersionAtLeast.call(this, client, [2, 6, 5]);
var batch = client.batch();
batch.set("baz", "binary");
batch.set("foo", "bar");
batch.exec();
client.get('foo', helper.isString('bar', done));
});
});
});
});
});

57
test/commands/multi.spec.js

@ -32,11 +32,12 @@ describe("The 'multi' method", function () {
});
it("reports an error", function (done) {
client.multi();
client.exec(function (err, res) {
var multi = client.multi();
var notBuffering = multi.exec(function (err, res) {
assert(err.message.match(/The connection has already been closed/));
done();
});
assert.strictEqual(notBuffering, false);
});
it("reports an error if promisified", function () {
@ -51,7 +52,7 @@ describe("The 'multi' method", function () {
beforeEach(function (done) {
client = redis.createClient.apply(redis.createClient, args);
client.once("connect", function () {
client.once("ready", function () {
client.flushdb(function (err) {
return done(err);
});
@ -62,6 +63,16 @@ describe("The 'multi' method", function () {
client.end();
});
it("returns an empty result array", function (done) {
var multi = client.multi();
var notBuffering = multi.exec(function (err, res) {
assert.strictEqual(err, null);
assert.strictEqual(res.length, 0);
done();
});
assert.strictEqual(notBuffering, true);
});
it('roles back a transaction when one command in a sequence of commands fails', function (done) {
var multi1, multi2;
var expected = helper.serverVersionAtLeast(client, [2, 6, 5]) ? helper.isError() : function () {};
@ -203,11 +214,12 @@ describe("The 'multi' method", function () {
});
it('runs a multi without any further commands', function(done) {
client.multi().exec(function(err, res) {
var buffering = client.multi().exec(function(err, res) {
assert.strictEqual(err, null);
assert.strictEqual(res.length, 0);
done();
});
assert(typeof buffering === 'boolean');
});
it('allows multiple operations to be performed using a chaining API', function (done) {
@ -215,7 +227,7 @@ describe("The 'multi' method", function () {
.mset('some', '10', 'keys', '20')
.incr('some')
.incr('keys')
.mget('some', 'keys')
.mget('some', ['keys'])
.exec(function (err, replies) {
assert.strictEqual(null, err);
assert.equal('OK', replies[0]);
@ -373,6 +385,41 @@ describe("The 'multi' method", function () {
client.get('foo', helper.isString('bar', done));
});
it("should not use a transaction with exec_atomic if only no command is used", function () {
var multi = client.multi();
var test = false;
multi.exec_batch = function () {
test = true;
};
multi.exec_atomic();
assert(test);
});
it("should not use a transaction with exec_atomic if only one command is used", function () {
var multi = client.multi();
var test = false;
multi.exec_batch = function () {
test = true;
};
multi.set("baz", "binary");
multi.exec_atomic();
assert(test);
});
it("should use transaction with exec_atomic and more than one command used", function (done) {
helper.serverVersionAtLeast.call(this, client, [2, 6, 5]);
var multi = client.multi();
var test = false;
multi.exec_batch = function () {
test = true;
};
multi.set("baz", "binary");
multi.get('baz');
multi.exec_atomic(done);
assert(!test);
});
});
});
});

8
test/commands/select.spec.js

@ -24,10 +24,11 @@ describe("The 'select' method", function () {
});
it("returns an error if redis is not connected", function (done) {
client.select(1, function (err, res) {
var buffering = client.select(1, function (err, res) {
assert(err.message.match(/The connection has already been closed/));
done();
});
assert(typeof buffering === 'boolean');
});
});
@ -36,7 +37,7 @@ describe("The 'select' method", function () {
beforeEach(function (done) {
client = redis.createClient.apply(redis.createClient, args);
client.once("connect", function () { done(); });
client.once("ready", function () { done(); });
});
afterEach(function () {
@ -46,11 +47,12 @@ describe("The 'select' method", function () {
it("changes the database and calls the callback", function (done) {
// default value of null means database 0 will be used.
assert.strictEqual(client.selected_db, null, "default db should be null");
client.SELECT(1, function (err, res) {
var buffering = client.SELECT(1, function (err, res) {
helper.isNotError()(err, res);
assert.strictEqual(client.selected_db, 1, "db should be 1 after select");
done();
});
assert(typeof buffering === 'boolean');
});
describe("and a callback is specified", function () {

3
test/commands/setex.spec.js

@ -29,7 +29,8 @@ describe("The 'setex' method", function () {
});
it('returns an error if no value is provided', function (done) {
client.SETEX(["setex key", "100", undefined], helper.isError(done));
var buffering = client.SETEX(["setex key", "100", undefined], helper.isError(done));
assert(typeof buffering === 'boolean');
});
afterEach(function () {

60
test/detect_buffers.spec.js

@ -79,6 +79,27 @@ describe("detect_buffers", function () {
});
});
describe('batch.hget', function () {
it('can interleave string and buffer results', function (done) {
client.batch()
.hget("hash key 2", "key 1")
.hget(new Buffer("hash key 2"), "key 1")
.hget("hash key 2", new Buffer("key 2"))
.hget("hash key 2", "key 2")
.exec(function (err, reply) {
assert.strictEqual(true, Array.isArray(reply));
assert.strictEqual(4, reply.length);
assert.strictEqual("val 1", reply[0]);
assert.strictEqual(true, Buffer.isBuffer(reply[1]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[1].inspect());
assert.strictEqual(true, Buffer.isBuffer(reply[2]));
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[2].inspect());
assert.strictEqual("val 2", reply[3]);
return done(err);
});
});
});
describe('hmget', function () {
describe('first argument is a string', function () {
it('returns strings for keys requested', function (done) {
@ -149,6 +170,19 @@ describe("detect_buffers", function () {
return done(err);
});
});
it("returns buffers for keys requested in .batch", function (done) {
client.batch().hmget(new Buffer("hash key 2"), "key 1", "key 2").exec(function (err, reply) {
assert.strictEqual(true, Array.isArray(reply));
assert.strictEqual(1, reply.length);
assert.strictEqual(2, reply[0].length);
assert.strictEqual(true, Buffer.isBuffer(reply[0][0]));
assert.strictEqual(true, Buffer.isBuffer(reply[0][1]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0][0].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0][1].inspect());
return done(err);
});
});
});
});
@ -174,6 +208,17 @@ describe("detect_buffers", function () {
return done(err);
});
});
it('returns string values when executed in .batch', function (done) {
client.batch().hgetall("hash key 2").exec(function (err, reply) {
assert.strictEqual(1, reply.length);
assert.strictEqual("object", typeof reply[0]);
assert.strictEqual(2, Object.keys(reply[0]).length);
assert.strictEqual("val 1", reply[0]["key 1"]);
assert.strictEqual("val 2", reply[0]["key 2"]);
return done(err);
});
});
});
describe('first argument is a buffer', function () {
@ -193,7 +238,20 @@ describe("detect_buffers", function () {
it('returns buffer values when executed in transaction', function (done) {
client.multi().hgetall(new Buffer("hash key 2")).exec(function (err, reply) {
assert.strictEqual(1, reply.length);
assert.strictEqual("object", typeof reply);
assert.strictEqual("object", typeof reply[0]);
assert.strictEqual(2, Object.keys(reply[0]).length);
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 1"]));
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 2"]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0]["key 1"].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0]["key 2"].inspect());
return done(err);
});
});
it('returns buffer values when executed in .batch', function (done) {
client.batch().hgetall(new Buffer("hash key 2")).exec(function (err, reply) {
assert.strictEqual(1, reply.length);
assert.strictEqual("object", typeof reply[0]);
assert.strictEqual(2, Object.keys(reply[0]).length);
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 1"]));
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 2"]));

4
test/parser/javascript.spec.js

@ -3,13 +3,15 @@
var assert = require('assert');
var Parser = require("../../lib/parsers/javascript").Parser;
var config = require("../lib/config");
var utils = require("../../lib/utils");
var redis = config.redis;
describe('javascript parser', function () {
it('handles multi-bulk reply', function (done) {
var parser = new Parser(false);
var parser = new Parser();
var reply_count = 0;
function check_reply(reply) {
reply = utils.reply_to_strings(reply);
assert.deepEqual(reply, [['a']], "Expecting multi-bulk reply of [['a']]");
reply_count++;
}

39
test/queue.spec.js

@ -1,39 +0,0 @@
'use strict';
var assert = require("assert");
var Queue = require('../lib/queue');
describe('queue', function () {
var q = new Queue();
describe('push', function () {
it('places values on end of queue', function () {
q.push('a');
q.push(3);
assert.equal(q.length, 2);
});
});
describe('shift', function () {
it('removes values from front of queue', function () {
assert.equal(q.shift(), 'a');
});
});
describe('forEach', function () {
it('iterates over values in queue', function () {
q.forEach(function (v) {
assert.equal(v, 3);
});
});
});
describe('forEachWithScope', function () {
it('provides a scope to the iteration function', function () {
q.forEach(function (v) {
assert.equal(this.foo, 'bar');
assert.equal(v, 3);
}, {foo: 'bar'});
});
});
});

49
test/rename.spec.js

@ -17,13 +17,7 @@ describe("rename commands", function () {
describe("using " + parser + " and " + ip, function () {
var client = null;
afterEach(function () {
client.end();
});
it("allows to use renamed functions", function (done) {
if (helper.redisProcess().spawnFailed()) this.skip();
beforeEach(function(done) {
client = redis.createClient({
rename_commands: {
set: '807081f5afa96845a02816a28b7258c3',
@ -31,6 +25,18 @@ describe("rename commands", function () {
}
});
client.on('ready', function () {
done();
});
});
afterEach(function () {
client.end();
});
it("allows to use renamed functions", function (done) {
if (helper.redisProcess().spawnFailed()) this.skip();
client.set('key', 'value', function(err, reply) {
assert.strictEqual(reply, 'OK');
});
@ -48,16 +54,26 @@ describe("rename commands", function () {
});
});
it("should also work with multi", function (done) {
it("should also work with batch", function (done) {
if (helper.redisProcess().spawnFailed()) this.skip();
client = redis.createClient({
rename_commands: {
SET: '807081f5afa96845a02816a28b7258c3',
getrange: '9e3102b15cf231c4e9e940f284744fe0'
}
client.batch([['set', 'key', 'value']]).exec(function (err, res) {
assert.strictEqual(res[0], 'OK');
});
var batch = client.batch();
batch.getrange('key', 1, -1);
batch.exec(function (err, res) {
assert(!err);
assert.strictEqual(res.length, 1);
assert.strictEqual(res[0], 'alue');
done();
});
});
it("should also work with multi", function (done) {
if (helper.redisProcess().spawnFailed()) this.skip();
client.multi([['set', 'key', 'value']]).exec(function (err, res) {
assert.strictEqual(res[0], 'OK');
});
@ -75,13 +91,6 @@ describe("rename commands", function () {
it("should also work with multi and abort transaction", function (done) {
if (helper.redisProcess().spawnFailed()) this.skip();
client = redis.createClient({
rename_commands: {
SET: '807081f5afa96845a02816a28b7258c3',
getrange: '9e3102b15cf231c4e9e940f284744fe0'
}
});
var multi = client.multi();
multi.get('key');
multi.getrange('key', 1, -1, function(err, reply) {

232
test/return_buffers.spec.js

@ -0,0 +1,232 @@
'use strict';
var assert = require("assert");
var config = require("./lib/config");
var helper = require('./helper');
var redis = config.redis;
describe("return_buffers", function () {
helper.allTests(function(parser, ip) {
describe("using " + parser + " and " + ip, function () {
var client;
var args = config.configureClient(parser, ip, {
return_buffers: true,
detect_buffers: true
});
beforeEach(function (done) {
client = redis.createClient.apply(redis.createClient, args);
if (args[2].detect_buffers) {
args[2].detect_buffers = false;
}
client.once("error", done);
client.once("connect", function () {
client.flushdb(function (err) {
client.hmset("hash key 2", "key 1", "val 1", "key 2", "val 2");
client.set("string key 1", "string value");
return done(err);
});
});
});
describe('get', function () {
describe('first argument is a string', function () {
it('returns a buffer', function (done) {
client.get("string key 1", function (err, reply) {
assert.strictEqual(true, Buffer.isBuffer(reply));
assert.strictEqual("<Buffer 73 74 72 69 6e 67 20 76 61 6c 75 65>", reply.inspect());
return done(err);
});
});
it('returns a bufffer when executed as part of transaction', function (done) {
client.multi().get("string key 1").exec(function (err, reply) {
assert.strictEqual(1, reply.length);
assert.strictEqual(true, Buffer.isBuffer(reply[0]));
assert.strictEqual("<Buffer 73 74 72 69 6e 67 20 76 61 6c 75 65>", reply[0].inspect());
return done(err);
});
});
});
});
describe('multi.hget', function () {
it('returns buffers', function (done) {
client.multi()
.hget("hash key 2", "key 1")
.hget(new Buffer("hash key 2"), "key 1")
.hget("hash key 2", new Buffer("key 2"))
.hget("hash key 2", "key 2")
.exec(function (err, reply) {
assert.strictEqual(true, Array.isArray(reply));
assert.strictEqual(4, reply.length);
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0].inspect());
assert.strictEqual(true, Buffer.isBuffer(reply[1]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[1].inspect());
assert.strictEqual(true, Buffer.isBuffer(reply[2]));
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[2].inspect());
assert.strictEqual(true, Buffer.isBuffer(reply[3]));
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[3].inspect());
return done(err);
});
});
});
describe('batch.hget', function () {
it('returns buffers', function (done) {
client.batch()
.hget("hash key 2", "key 1")
.hget(new Buffer("hash key 2"), "key 1")
.hget("hash key 2", new Buffer("key 2"))
.hget("hash key 2", "key 2")
.exec(function (err, reply) {
assert.strictEqual(true, Array.isArray(reply));
assert.strictEqual(4, reply.length);
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0].inspect());
assert.strictEqual(true, Buffer.isBuffer(reply[1]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[1].inspect());
assert.strictEqual(true, Buffer.isBuffer(reply[2]));
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[2].inspect());
assert.strictEqual(true, Buffer.isBuffer(reply[3]));
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[3].inspect());
return done(err);
});
});
});
describe('hmget', function () {
describe('first argument is a string', function () {
it('handles array of strings with undefined values in transaction (repro #344)', function (done) {
client.multi().hmget("hash key 2", "key 3", "key 4").exec(function(err, reply) {
assert.strictEqual(true, Array.isArray(reply));
assert.strictEqual(1, reply.length);
assert.strictEqual(2, reply[0].length);
assert.equal(null, reply[0][0]);
assert.equal(null, reply[0][1]);
return done(err);
});
});
});
describe('first argument is a buffer', function () {
it('returns buffers for keys requested', function (done) {
client.hmget(new Buffer("hash key 2"), "key 1", "key 2", function (err, reply) {
assert.strictEqual(true, Array.isArray(reply));
assert.strictEqual(2, reply.length);
assert.strictEqual(true, Buffer.isBuffer(reply[0]));
assert.strictEqual(true, Buffer.isBuffer(reply[1]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[1].inspect());
return done(err);
});
});
it("returns buffers for keys requested in transaction", function (done) {
client.multi().hmget(new Buffer("hash key 2"), "key 1", "key 2").exec(function (err, reply) {
assert.strictEqual(true, Array.isArray(reply));
assert.strictEqual(1, reply.length);
assert.strictEqual(2, reply[0].length);
assert.strictEqual(true, Buffer.isBuffer(reply[0][0]));
assert.strictEqual(true, Buffer.isBuffer(reply[0][1]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0][0].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0][1].inspect());
return done(err);
});
});
it("returns buffers for keys requested in .batch", function (done) {
client.batch().hmget(new Buffer("hash key 2"), "key 1", "key 2").exec(function (err, reply) {
assert.strictEqual(true, Array.isArray(reply));
assert.strictEqual(1, reply.length);
assert.strictEqual(2, reply[0].length);
assert.strictEqual(true, Buffer.isBuffer(reply[0][0]));
assert.strictEqual(true, Buffer.isBuffer(reply[0][1]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0][0].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0][1].inspect());
return done(err);
});
});
});
});
describe('hgetall', function (done) {
describe('first argument is a string', function () {
it('returns buffer values', function (done) {
client.hgetall("hash key 2", function (err, reply) {
assert.strictEqual("object", typeof reply);
assert.strictEqual(2, Object.keys(reply).length);
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply["key 1"].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply["key 2"].inspect());
return done(err);
});
});
it('returns buffer values when executed in transaction', function (done) {
client.multi().hgetall("hash key 2").exec(function (err, reply) {
assert.strictEqual(1, reply.length);
assert.strictEqual("object", typeof reply[0]);
assert.strictEqual(2, Object.keys(reply[0]).length);
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0]["key 1"].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0]["key 2"].inspect());
return done(err);
});
});
it('returns buffer values when executed in .batch', function (done) {
client.batch().hgetall("hash key 2").exec(function (err, reply) {
assert.strictEqual(1, reply.length);
assert.strictEqual("object", typeof reply[0]);
assert.strictEqual(2, Object.keys(reply[0]).length);
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0]["key 1"].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0]["key 2"].inspect());
return done(err);
});
});
});
describe('first argument is a buffer', function () {
it('returns buffer values', function (done) {
client.hgetall(new Buffer("hash key 2"), function (err, reply) {
assert.strictEqual(null, err);
assert.strictEqual("object", typeof reply);
assert.strictEqual(2, Object.keys(reply).length);
assert.strictEqual(true, Buffer.isBuffer(reply["key 1"]));
assert.strictEqual(true, Buffer.isBuffer(reply["key 2"]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply["key 1"].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply["key 2"].inspect());
return done(err);
});
});
it('returns buffer values when executed in transaction', function (done) {
client.multi().hgetall(new Buffer("hash key 2")).exec(function (err, reply) {
assert.strictEqual(1, reply.length);
assert.strictEqual("object", typeof reply[0]);
assert.strictEqual(2, Object.keys(reply[0]).length);
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 1"]));
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 2"]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0]["key 1"].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0]["key 2"].inspect());
return done(err);
});
});
it('returns buffer values when executed in .batch', function (done) {
client.batch().hgetall(new Buffer("hash key 2")).exec(function (err, reply) {
assert.strictEqual(1, reply.length);
assert.strictEqual("object", typeof reply[0]);
assert.strictEqual(2, Object.keys(reply[0]).length);
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 1"]));
assert.strictEqual(true, Buffer.isBuffer(reply[0]["key 2"]));
assert.strictEqual("<Buffer 76 61 6c 20 31>", reply[0]["key 1"].inspect());
assert.strictEqual("<Buffer 76 61 6c 20 32>", reply[0]["key 2"].inspect());
return done(err);
});
});
});
});
});
});
});
Loading…
Cancel
Save