Browse Source

dgram: support dgram.send with multiple buffers

Added ability to dgram.send to send multiple buffers, _writev style.
The offset and length parameters in dgram.send are now optional.
Refactored the dgram benchmarks, and seperated them from net.
Added docs for the new signature.

Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Saúl Ibarra Corretgé <saghul@gmail.com>
Fixes: https://github.com/nodejs/node/issues/4302
PR-URL: https://github.com/nodejs/node/pull/4374
process-exit-stdio-flushing
Matteo Collina 9 years ago
parent
commit
137f53c7b7
  1. 5
      Makefile
  2. 80
      benchmark/dgram/array-vs-concat.js
  3. 70
      benchmark/dgram/multi-buffer.js
  4. 7
      benchmark/dgram/offset-length.js
  5. 62
      benchmark/dgram/single-buffer.js
  6. 26
      doc/api/dgram.markdown
  7. 170
      lib/dgram.js
  8. 64
      src/udp_wrap.cc
  9. 20
      test/parallel/test-dgram-oob-buffer.js
  10. 11
      test/parallel/test-dgram-send-bad-arguments.js
  11. 21
      test/parallel/test-dgram-send-callback-buffer.js
  12. 36
      test/parallel/test-dgram-send-callback-multi-buffer.js

5
Makefile

@ -501,7 +501,10 @@ bench-events: all
bench-util: all
@$(NODE) benchmark/common.js util
bench-all: bench bench-misc bench-array bench-buffer bench-url bench-events
bench-dgram: all
@$(NODE) benchmark/common.js dgram
bench-all: bench bench-misc bench-array bench-buffer bench-url bench-events bench-dgram bench-util
bench: bench-net bench-http bench-fs bench-tls

80
benchmark/dgram/array-vs-concat.js

@ -0,0 +1,80 @@
// test UDP send throughput with the multi buffer API against Buffer.concat
'use strict';
const common = require('../common.js');
const PORT = common.PORT;
// `num` is the number of send requests to queue up each time.
// Keep it reasonably high (>10) otherwise you're benchmarking the speed of
// event loop cycles more than anything else.
var bench = common.createBenchmark(main, {
len: [64, 256, 512, 1024],
num: [100],
chunks: [1, 2, 4, 8],
type: ['concat', 'multi'],
dur: [5]
});
var dur;
var len;
var num;
var type;
var chunk;
var chunks;
var encoding;
function main(conf) {
dur = +conf.dur;
len = +conf.len;
num = +conf.num;
type = conf.type;
chunks = +conf.chunks;
chunk = []
for (var i = 0; i < chunks; i++) {
chunk.push(new Buffer(Math.round(len / chunks)));
}
server();
}
var dgram = require('dgram');
function server() {
var sent = 0;
var received = 0;
var socket = dgram.createSocket('udp4');
var onsend = type === 'concat' ? onsendConcat : onsendMulti;
function onsendConcat() {
if (sent++ % num == 0)
for (var i = 0; i < num; i++) {
socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend);
}
}
function onsendMulti() {
if (sent++ % num == 0)
for (var i = 0; i < num; i++) {
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
}
socket.on('listening', function() {
bench.start();
onsend();
setTimeout(function() {
var bytes = sent * len;
var gbits = (bytes * 8) / (1024 * 1024 * 1024);
bench.end(gbits);
}, dur * 1000);
});
socket.on('message', function(buf, rinfo) {
received++;
});
socket.bind(PORT);
}

70
benchmark/dgram/multi-buffer.js

@ -0,0 +1,70 @@
// test UDP send/recv throughput with the multi buffer API
'use strict';
const common = require('../common.js');
const PORT = common.PORT;
// `num` is the number of send requests to queue up each time.
// Keep it reasonably high (>10) otherwise you're benchmarking the speed of
// event loop cycles more than anything else.
var bench = common.createBenchmark(main, {
len: [64, 256, 1024],
num: [100],
chunks: [1, 2, 4, 8],
type: ['send', 'recv'],
dur: [5]
});
var dur;
var len;
var num;
var type;
var chunk;
var chunks;
var encoding;
function main(conf) {
dur = +conf.dur;
len = +conf.len;
num = +conf.num;
type = conf.type;
chunks = +conf.chunks;
chunk = []
for (var i = 0; i < chunks; i++) {
chunk.push(new Buffer(Math.round(len / chunks)));
}
server();
}
var dgram = require('dgram');
function server() {
var sent = 0;
var received = 0;
var socket = dgram.createSocket('udp4');
function onsend() {
if (sent++ % num == 0)
for (var i = 0; i < num; i++)
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
socket.on('listening', function() {
bench.start();
onsend();
setTimeout(function() {
var bytes = (type === 'send' ? sent : received) * len;
var gbits = (bytes * 8) / (1024 * 1024 * 1024);
bench.end(gbits);
}, dur * 1000);
});
socket.on('message', function(buf, rinfo) {
received++;
});
socket.bind(PORT);
}

7
benchmark/net/dgram.js → benchmark/dgram/offset-length.js

@ -1,7 +1,8 @@
// test UDP send/recv throughput
// test UDP send/recv throughput with the "old" offset/length API
'use strict';
var common = require('../common.js');
var PORT = common.PORT;
const common = require('../common.js');
const PORT = common.PORT;
// `num` is the number of send requests to queue up each time.
// Keep it reasonably high (>10) otherwise you're benchmarking the speed of

62
benchmark/dgram/single-buffer.js

@ -0,0 +1,62 @@
// test UDP send/recv throughput with the new single Buffer API
'use strict';
const common = require('../common.js');
const PORT = common.PORT;
// `num` is the number of send requests to queue up each time.
// Keep it reasonably high (>10) otherwise you're benchmarking the speed of
// event loop cycles more than anything else.
var bench = common.createBenchmark(main, {
len: [1, 64, 256, 1024],
num: [100],
type: ['send', 'recv'],
dur: [5]
});
var dur;
var len;
var num;
var type;
var chunk;
var encoding;
function main(conf) {
dur = +conf.dur;
len = +conf.len;
num = +conf.num;
type = conf.type;
chunk = new Buffer(len);
server();
}
var dgram = require('dgram');
function server() {
var sent = 0;
var received = 0;
var socket = dgram.createSocket('udp4');
function onsend() {
if (sent++ % num == 0)
for (var i = 0; i < num; i++)
socket.send(chunk, PORT, '127.0.0.1', onsend);
}
socket.on('listening', function() {
bench.start();
onsend();
setTimeout(function() {
var bytes = (type === 'send' ? sent : received) * chunk.length;
var gbits = (bytes * 8) / (1024 * 1024 * 1024);
bench.end(gbits);
}, dur * 1000);
});
socket.on('message', function(buf, rinfo) {
received++;
});
socket.bind(PORT);
}

26
doc/api/dgram.markdown

@ -185,9 +185,10 @@ never have reason to call this.
If `multicastInterface` is not specified, the operating system will attempt to
drop membership on all valid interfaces.
### socket.send(buf, offset, length, port, address[, callback])
### socket.send(buf, [offset, length,] port, address[, callback])
* `buf` Buffer object or string. Message to be sent
* `buf` Buffer object, string, or an array of either. Message to be
sent.
* `offset` Integer. Offset in the buffer where the message starts.
* `length` Integer. Number of bytes in the message.
* `port` Integer. Destination port.
@ -224,17 +225,36 @@ The only way to know for sure that the datagram has been sent is by using a
passed as the first argument to the `callback`. If a `callback` is not given,
the error is emitted as an `'error'` event on the `socket` object.
Offset and length are optional, but if you specify one you would need to
specify the other. Also, they are supported only when the first
argument is a `Buffer`.
Example of sending a UDP packet to a random port on `localhost`;
```js
const dgram = require('dgram');
const message = new Buffer('Some bytes');
const client = dgram.createSocket('udp4');
client.send(message, 0, message.length, 41234, 'localhost', (err) => {
client.send(message, 41234, 'localhost', (err) => {
client.close();
});
```
Example of sending a UDP packet composed of multiple buffers to a random port on `localhost`;
```js
const dgram = require('dgram');
const buf1 = new Buffer('Some ');
const buf2 = new Buffer('bytes');
const client = dgram.createSocket('udp4');
client.send([buf1, buf2], 41234, 'localhost', (err) => {
client.close();
});
```
Sending multiple buffers might be faster or slower depending on your
application and operating system: benchmark it. Usually it is faster.
**A Note about UDP datagram size**
The maximum size of an `IPv4/v6` datagram depends on the `MTU`

170
lib/dgram.js

@ -243,6 +243,49 @@ Socket.prototype.sendto = function(buffer,
};
function sliceBuffer(buffer, offset, length) {
if (typeof buffer === 'string')
buffer = new Buffer(buffer);
else if (!(buffer instanceof Buffer))
throw new TypeError('First argument must be a buffer or string');
offset = offset >>> 0;
length = length >>> 0;
return buffer.slice(offset, offset + length);
}
function fixBuffer(buffer) {
for (var i = 0, l = buffer.length; i < l; i++) {
var buf = buffer[i];
if (typeof buf === 'string')
buffer[i] = new Buffer(buf);
else if (!(buf instanceof Buffer))
return false;
}
return true;
}
function enqueue(self, toEnqueue) {
// If the send queue hasn't been initialized yet, do it, and install an
// event handler that flushes the send queue after binding is done.
if (!self._sendQueue) {
self._sendQueue = [];
self.once('listening', function() {
// Flush the send queue.
for (var i = 0; i < this._sendQueue.length; i++)
this.send.apply(self, this._sendQueue[i]);
this._sendQueue = undefined;
});
}
self._sendQueue.push(toEnqueue);
return;
}
Socket.prototype.send = function(buffer,
offset,
length,
@ -251,30 +294,29 @@ Socket.prototype.send = function(buffer,
callback) {
var self = this;
if (typeof buffer === 'string')
buffer = new Buffer(buffer);
else if (!(buffer instanceof Buffer))
throw new TypeError('First argument must be a buffer or string');
offset = offset | 0;
if (offset < 0)
throw new RangeError('Offset should be >= 0');
if ((length == 0 && offset > buffer.length) ||
(length > 0 && offset >= buffer.length))
throw new RangeError('Offset into buffer is too large');
// Sending a zero-length datagram is kind of pointless but it _is_
// allowed, hence check that length >= 0 rather than > 0.
length = length | 0;
if (length < 0)
throw new RangeError('Length should be >= 0');
// same as arguments.length === 5 || arguments.length === 6
if (address) {
buffer = sliceBuffer(buffer, offset, length);
} else {
callback = port;
port = offset;
address = length;
}
if (offset + length > buffer.length)
throw new RangeError('Offset + length beyond buffer length');
if (!Array.isArray(buffer)) {
if (typeof buffer === 'string') {
buffer = [ new Buffer(buffer) ];
} else if (!(buffer instanceof Buffer)) {
throw new TypeError('First argument must be a buffer or a string');
} else {
buffer = [ buffer ];
}
} else if (!fixBuffer(buffer)) {
throw new TypeError('Buffer list arguments must be buffers or strings');
}
port = port | 0;
if (port <= 0 || port > 65535)
port = port >>> 0;
if (port === 0 || port > 65535)
throw new RangeError('Port should be > 0 and < 65536');
// Normalize callback so it's either a function or undefined but not anything
@ -290,61 +332,55 @@ Socket.prototype.send = function(buffer,
// If the socket hasn't been bound yet, push the outbound packet onto the
// send queue and send after binding is complete.
if (self._bindState != BIND_STATE_BOUND) {
// If the send queue hasn't been initialized yet, do it, and install an
// event handler that flushes the send queue after binding is done.
if (!self._sendQueue) {
self._sendQueue = [];
self.once('listening', function() {
// Flush the send queue.
for (var i = 0; i < self._sendQueue.length; i++)
self.send.apply(self, self._sendQueue[i]);
self._sendQueue = undefined;
});
}
self._sendQueue.push([buffer, offset, length, port, address, callback]);
enqueue(self, [buffer, port, address, callback]);
return;
}
self._handle.lookup(address, function(ex, ip) {
if (ex) {
if (typeof callback === 'function') {
callback(ex);
return;
}
self.emit('error', ex);
} else if (self._handle) {
var req = new SendWrap();
req.buffer = buffer; // Keep reference alive.
req.length = length;
req.address = address;
req.port = port;
if (callback) {
req.callback = callback;
req.oncomplete = afterSend;
}
var err = self._handle.send(req,
buffer,
offset,
length,
port,
ip,
!!callback);
if (err && callback) {
// don't emit as error, dgram_legacy.js compatibility
var ex = exceptionWithHostPort(err, 'send', address, port);
process.nextTick(callback, ex);
}
}
self._handle.lookup(address, function afterDns(ex, ip) {
doSend(ex, self, ip, buffer, address, port, callback);
});
};
function afterSend(err) {
function doSend(ex, self, ip, buffer, address, port, callback) {
if (ex) {
if (typeof callback === 'function') {
callback(ex);
return;
}
self.emit('error', ex);
return;
} else if (!self._handle) {
return;
}
var req = new SendWrap();
req.buffer = buffer; // Keep reference alive.
req.address = address;
req.port = port;
if (callback) {
req.callback = callback;
req.oncomplete = afterSend;
}
var err = self._handle.send(req,
buffer,
buffer.length,
port,
ip,
!!callback);
if (err && callback) {
// don't emit as error, dgram_legacy.js compatibility
var ex = exceptionWithHostPort(err, 'send', address, port);
process.nextTick(callback, ex);
}
}
function afterSend(err, sent) {
if (err) {
err = exceptionWithHostPort(err, 'send', this.address, this.port);
}
this.callback(err, this.length);
this.callback(err, sent);
}

64
src/udp_wrap.cc

@ -13,6 +13,7 @@
namespace node {
using v8::Array;
using v8::Context;
using v8::EscapableHandleScope;
using v8::External;
@ -35,6 +36,7 @@ class SendWrap : public ReqWrap<uv_udp_send_t> {
public:
SendWrap(Environment* env, Local<Object> req_wrap_obj, bool have_callback);
inline bool have_callback() const;
size_t msg_size;
size_t self_size() const override { return sizeof(*this); }
private:
const bool have_callback_;
@ -243,29 +245,46 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
UDPWrap* wrap = Unwrap<UDPWrap>(args.Holder());
// send(req, buffer, offset, length, port, address)
// send(req, buffer, port, address, hasCallback)
CHECK(args[0]->IsObject());
CHECK(Buffer::HasInstance(args[1]));
CHECK(args[1]->IsArray());
CHECK(args[2]->IsUint32());
CHECK(args[3]->IsUint32());
CHECK(args[4]->IsUint32());
CHECK(args[5]->IsString());
CHECK(args[6]->IsBoolean());
CHECK(args[4]->IsString());
CHECK(args[5]->IsBoolean());
Local<Object> req_wrap_obj = args[0].As<Object>();
Local<Object> buffer_obj = args[1].As<Object>();
size_t offset = args[2]->Uint32Value();
size_t length = args[3]->Uint32Value();
const unsigned short port = args[4]->Uint32Value();
node::Utf8Value address(env->isolate(), args[5]);
const bool have_callback = args[6]->IsTrue();
CHECK_LE(length, Buffer::Length(buffer_obj) - offset);
Local<Array> chunks = args[1].As<Array>();
// it is faster to fetch the length of the
// array in js-land
size_t count = args[2]->Uint32Value();
const unsigned short port = args[3]->Uint32Value();
node::Utf8Value address(env->isolate(), args[4]);
const bool have_callback = args[5]->IsTrue();
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
size_t msg_size = 0;
// allocate uv_buf_t of the correct size
// if bigger than 16 elements
uv_buf_t bufs_[16];
uv_buf_t* bufs = bufs_;
if (ARRAY_SIZE(bufs_) < count)
bufs = new uv_buf_t[count];
// construct uv_buf_t array
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(i);
size_t length = Buffer::Length(chunk);
bufs[i] = uv_buf_init(Buffer::Data(chunk), length);
msg_size += length;
}
req_wrap->msg_size = msg_size;
uv_buf_t buf = uv_buf_init(Buffer::Data(buffer_obj) + offset,
length);
char addr[sizeof(sockaddr_in6)];
int err;
@ -284,12 +303,16 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
if (err == 0) {
err = uv_udp_send(&req_wrap->req_,
&wrap->handle_,
&buf,
1,
bufs,
count,
reinterpret_cast<const sockaddr*>(&addr),
OnSend);
}
// Deallocate space
if (bufs != bufs_)
delete[] bufs;
req_wrap->Dispatched();
if (err)
delete req_wrap;
@ -332,8 +355,11 @@ void UDPWrap::OnSend(uv_udp_send_t* req, int status) {
Environment* env = req_wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Local<Value> arg = Integer::New(env->isolate(), status);
req_wrap->MakeCallback(env->oncomplete_string(), 1, &arg);
Local<Value> arg[] = {
Integer::New(env->isolate(), status),
Integer::New(env->isolate(), req_wrap->msg_size),
};
req_wrap->MakeCallback(env->oncomplete_string(), 2, arg);
}
delete req_wrap;
}

20
test/parallel/test-dgram-oob-buffer.js

@ -4,7 +4,6 @@
// recvfrom(). Node should not propagate this error to the user.
var common = require('../common');
var assert = require('assert');
var dgram = require('dgram');
var socket = dgram.createSocket('udp4');
@ -18,23 +17,4 @@ socket.send(buf, 3, 1, common.PORT, '127.0.0.1', ok);
// Since length of zero means nothing, don't error despite OOB.
socket.send(buf, 4, 0, common.PORT, '127.0.0.1', ok);
assert.throws(function() {
socket.send(buf, 0, 5, common.PORT, '127.0.0.1', common.fail);
});
assert.throws(function() {
socket.send(buf, 2, 3, common.PORT, '127.0.0.1', common.fail);
});
assert.throws(function() {
socket.send(buf, 4, 4, common.PORT, '127.0.0.1', common.fail);
});
assert.throws(function() {
socket.send('abc', 4, 1, common.PORT, '127.0.0.1', common.fail);
});
assert.throws(function() {
socket.send('abc', 0, 4, common.PORT, '127.0.0.1', common.fail);
});
assert.throws(function() {
socket.send('abc', -1, 2, common.PORT, '127.0.0.1', common.fail);
});
socket.close(); // FIXME should not be necessary

11
test/parallel/test-dgram-send-bad-arguments.js

@ -11,10 +11,13 @@ assert.throws(function() {
sock.send();
}, TypeError); // First argument should be a buffer.
assert.throws(function() { sock.send(buf, -1, 1, 1, host); }, RangeError);
assert.throws(function() { sock.send(buf, 1, -1, 1, host); }, RangeError);
// send(buf, offset, length, port, host)
assert.throws(function() { sock.send(buf, 1, 1, -1, host); }, RangeError);
assert.throws(function() { sock.send(buf, 5, 1, 1, host); }, RangeError);
assert.throws(function() { sock.send(buf, 1, 5, 1, host); }, RangeError);
assert.throws(function() { sock.send(buf, 1, 1, 0, host); }, RangeError);
assert.throws(function() { sock.send(buf, 1, 1, 65536, host); }, RangeError);
// send(buf, port, host)
assert.throws(function() { sock.send(23, 12345, host); }, TypeError);
// send([buf1, ..], port, host)
assert.throws(function() { sock.send([buf, 23], 12345, host); }, TypeError);

21
test/parallel/test-dgram-send-callback-buffer.js

@ -0,0 +1,21 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const dgram = require('dgram');
const client = dgram.createSocket('udp4');
const buf = new Buffer(256);
const onMessage = common.mustCall(function(err, bytes) {
assert.equal(bytes, buf.length);
clearTimeout(timer);
client.close();
});
const timer = setTimeout(function() {
throw new Error('Timeout');
}, common.platformTimeout(200));
client.send(buf, common.PORT, common.localhostIPv4, onMessage);

36
test/parallel/test-dgram-send-callback-multi-buffer.js

@ -0,0 +1,36 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const dgram = require('dgram');
const client = dgram.createSocket('udp4');
const timer = setTimeout(function() {
throw new Error('Timeout');
}, common.platformTimeout(200));
const onMessage = common.mustCall(function(err, bytes) {
assert.equal(bytes, buf1.length + buf2.length);
clearTimeout(timer);
client.close();
});
const buf1 = new Buffer(256);
const buf2 = new Buffer(256);
buf1.fill('x');
buf2.fill('y');
client.on('listening', function() {
client.send([buf1, buf2], common.PORT, common.localhostIPv4, onMessage);
});
client.on('message', function(buf, info) {
const expected = Buffer.concat([buf1, buf2]);
assert.ok(buf.equals(expected), 'message was received correctly');
client.close();
});
client.bind(common.PORT);
Loading…
Cancel
Save