diff --git a/lib/cluster.js b/lib/cluster.js index cbccd02605..84d7c766cb 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -57,7 +57,7 @@ Worker.prototype.isConnected = function isConnected() { // Master/worker specific methods are defined in the *Init() functions. -function SharedHandle(key, address, port, addressType, backlog, fd) { +function SharedHandle(key, address, port, addressType, backlog, fd, flags) { this.key = key; this.workers = []; this.handle = null; @@ -66,7 +66,7 @@ function SharedHandle(key, address, port, addressType, backlog, fd) { // FIXME(bnoordhuis) Polymorphic return type for lack of a better solution. var rval; if (addressType === 'udp4' || addressType === 'udp6') - rval = dgram._createSocketHandle(address, port, addressType, fd); + rval = dgram._createSocketHandle(address, port, addressType, fd, flags); else rval = net._createServerHandle(address, port, addressType, fd); @@ -438,7 +438,8 @@ function masterInit() { var args = [message.address, message.port, message.addressType, - message.fd]; + message.fd, + message.index]; var key = args.join(':'); var handle = handles[key]; if (handle === undefined) { @@ -456,7 +457,8 @@ function masterInit() { message.port, message.addressType, message.backlog, - message.fd); + message.fd, + message.flags); } if (!handle.data) handle.data = message.data; @@ -485,7 +487,7 @@ function masterInit() { cluster.emit('listening', worker, info); } - // Round-robin only. Server in worker is closing, remove from list. + // Server in worker is closing, remove from list. function close(worker, message) { var key = message.key; var handle = handles[key]; @@ -500,6 +502,7 @@ function masterInit() { function workerInit() { var handles = {}; + var indexes = {}; // Called from src/node.js cluster._setupWorker = function() { @@ -528,15 +531,22 @@ function workerInit() { }; // obj is a net#Server or a dgram#Socket object. - cluster._getServer = function(obj, address, port, addressType, fd, cb) { - var message = { - addressType: addressType, - address: address, - port: port, + cluster._getServer = function(obj, options, cb) { + const key = [ options.address, + options.port, + options.addressType, + options.fd ].join(':'); + if (indexes[key] === undefined) + indexes[key] = 0; + else + indexes[key]++; + + const message = util._extend({ act: 'queryServer', - fd: fd, + index: indexes[key], data: null - }; + }, options); + // Set custom data on handle (i.e. tls tickets key) if (obj._getServerData) message.data = obj._getServerData(); send(message, function(reply, handle) { @@ -549,9 +559,9 @@ function workerInit() { }); obj.once('listening', function() { cluster.worker.state = 'listening'; - var address = obj.address(); + const address = obj.address(); message.act = 'listening'; - message.port = address && address.port || port; + message.port = address && address.port || options.port; send(message); }); }; @@ -563,6 +573,7 @@ function workerInit() { // closed. Avoids resource leaks when the handle is short-lived. var close = handle.close; handle.close = function() { + send({ act: 'close', key: key }); delete handles[key]; return close.apply(this, arguments); }; diff --git a/lib/dgram.js b/lib/dgram.js index b7bb7d3d70..5d207a5ef6 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -60,14 +60,14 @@ function newHandle(type) { } -exports._createSocketHandle = function(address, port, addressType, fd) { +exports._createSocketHandle = function(address, port, addressType, fd, flags) { // Opening an existing fd is not supported for UDP handles. assert(typeof fd !== 'number' || fd < 0); var handle = newHandle(addressType); if (port || address) { - var err = handle.bind(address, port || 0, 0); + var err = handle.bind(address, port || 0, flags); if (err) { handle.close(); return err; @@ -176,8 +176,12 @@ Socket.prototype.bind = function(port /*, address, callback*/) { if (!cluster) cluster = require('cluster'); + var flags = 0; + if (self._reuseAddr) + flags |= constants.UV_UDP_REUSEADDR; + if (cluster.isWorker && !exclusive) { - cluster._getServer(self, ip, port, self.type, -1, function(err, handle) { + function onHandle(err, handle) { if (err) { var ex = exceptionWithHostPort(err, 'bind', ip, port); self.emit('error', ex); @@ -191,16 +195,19 @@ Socket.prototype.bind = function(port /*, address, callback*/) { replaceHandle(self, handle); startListening(self); - }); + } + cluster._getServer(self, { + address: ip, + port: port, + addressType: self.type, + fd: -1, + flags: flags + }, onHandle); } else { if (!self._handle) return; // handle has been closed in the mean time - var flags = 0; - if (self._reuseAddr) - flags |= constants.UV_UDP_REUSEADDR; - var err = self._handle.bind(ip, port || 0, flags); if (err) { var ex = exceptionWithHostPort(err, 'bind', ip, port); diff --git a/lib/net.js b/lib/net.js index 10e7896784..8245601e51 100644 --- a/lib/net.js +++ b/lib/net.js @@ -1268,7 +1268,13 @@ function listen(self, address, port, addressType, backlog, fd, exclusive) { return; } - cluster._getServer(self, address, port, addressType, fd, cb); + cluster._getServer(self, { + address: address, + port: port, + addressType: addressType, + fd: fd, + flags: 0 + }, cb); function cb(err, handle) { // EADDRINUSE may not be reported until we call listen(). To complicate diff --git a/test/parallel/test-cluster-dgram-reuse.js b/test/parallel/test-cluster-dgram-reuse.js new file mode 100644 index 0000000000..1de6ad1e5f --- /dev/null +++ b/test/parallel/test-cluster-dgram-reuse.js @@ -0,0 +1,40 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const cluster = require('cluster'); +const dgram = require('dgram'); + +if (common.isWindows) { + console.log('1..0 # Skipped: dgram clustering is currently not supported ' + + 'on windows.'); + return; +} + +if (cluster.isMaster) { + cluster.fork().on('exit', function(code) { + assert.equal(code, 0); + }); + return; +} + +const sockets = []; +function next() { + sockets.push(this); + if (sockets.length !== 2) + return; + + // Work around health check issue + process.nextTick(function() { + for (var i = 0; i < sockets.length; i++) + sockets[i].close(close); + }); +} + +var waiting = 2; +function close() { + if (--waiting === 0) + cluster.worker.disconnect(); +} + +for (var i = 0; i < 2; i++) + dgram.createSocket({ type: 'udp4', reuseAddr: true }).bind(common.PORT, next);