You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

115 lines
2.9 KiB

'use strict';
const assert = require('assert');
const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const getOwnPropertyNames = Object.getOwnPropertyNames;
const uv = process.binding('uv');
module.exports = RoundRobinHandle;
function RoundRobinHandle(key, address, port, addressType, fd) {
this.key = key;
this.all = {};
this.free = [];
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0)
this.server.listen(port, address);
else
this.server.listen(address); // UNIX socket path.
this.server.once('listening', () => {
this.handle = this.server._handle;
this.handle.onconnection = (err, handle) => this.distribute(err, handle);
this.server._handle = null;
this.server = null;
});
}
RoundRobinHandle.prototype.add = function(worker, send) {
assert(worker.id in this.all === false);
this.all[worker.id] = worker;
const done = () => {
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
// TODO(bnoordhuis) Check err.
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker); // In case there are connections pending.
};
if (this.server === null)
return done();
// Still busy binding.
this.server.once('listening', done);
this.server.once('error', (err) => {
// Hack: translate 'EADDRINUSE' error string back to numeric error code.
// It works but ideally we'd have some backchannel between the net and
// cluster modules for stuff like this.
send(uv[`UV_${err.errno}`], null);
});
};
RoundRobinHandle.prototype.remove = function(worker) {
if (worker.id in this.all === false)
return false;
delete this.all[worker.id];
const index = this.free.indexOf(worker);
if (index !== -1)
this.free.splice(index, 1);
if (getOwnPropertyNames(this.all).length !== 0)
return false;
for (var handle; handle = this.handles.shift(); handle.close())
;
this.handle.close();
this.handle = null;
return true;
};
RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
const worker = this.free.shift();
if (worker)
this.handoff(worker);
};
RoundRobinHandle.prototype.handoff = function(worker) {
if (worker.id in this.all === false) {
return; // Worker is closing (or has closed) the server.
}
const handle = this.handles.shift();
if (handle === undefined) {
this.free.push(worker); // Add to ready queue again.
return;
}
const message = { act: 'newconn', key: this.key };
sendHelper(worker.process, message, handle, (reply) => {
if (reply.accepted)
handle.close();
else
this.distribute(0, handle); // Worker is shutting down. Send to another.
this.handoff(worker);
});
};