@ -25,6 +25,8 @@ var dgram = require('dgram');
var fork = require ( 'child_process' ) . fork ;
var net = require ( 'net' ) ;
var util = require ( 'util' ) ;
var SCHED_NONE = 1 ;
var SCHED_RR = 2 ;
var cluster = new EventEmitter ;
module . exports = cluster ;
@ -52,6 +54,121 @@ Worker.prototype.send = function() {
// Master/worker specific methods are defined in the *Init() functions.
function SharedHandle ( key , address , port , addressType , backlog , fd ) {
this . key = key ;
this . errno = '' ;
this . workers = [ ] ;
if ( addressType === 'udp4' || addressType === 'udp6' )
this . handle = dgram . _ createSocketHandle ( address , port , addressType , fd ) ;
else
this . handle = net . _ createServerHandle ( address , port , addressType , fd ) ;
this . errno = this . handle ? '' : process . _ errno ;
}
SharedHandle . prototype . add = function ( worker , send ) {
assert ( this . workers . indexOf ( worker ) === - 1 ) ;
this . workers . push ( worker ) ;
send ( this . errno , null , this . handle ) ;
} ;
SharedHandle . prototype . remove = function ( worker ) {
var index = this . workers . indexOf ( worker ) ;
assert ( index !== - 1 ) ;
this . workers . splice ( index , 1 ) ;
if ( this . workers . length !== 0 ) return false ;
this . handle . close ( ) ;
this . handle = null ;
return true ;
} ;
// Start a round-robin server. Master accepts connections and distributes
// them over the workers.
function RoundRobinHandle ( key , address , port , addressType , backlog , fd ) {
this . key = key ;
this . all = { } ;
this . free = [ ] ;
this . handles = [ ] ;
this . handle = null ;
this . server = net . createServer ( assert . fail ) ;
if ( fd >= 0 )
this . server . listen ( { fd : fd } ) ;
else if ( port >= 0 )
this . server . listen ( port , address ) ;
else
this . server . listen ( address ) ; // UNIX socket path.
var self = this ;
this . server . once ( 'listening' , function ( ) {
self . handle = self . server . _ handle ;
self . handle . onconnection = self . distribute . bind ( self ) ;
self . server . _ handle = null ;
self . server = null ;
} ) ;
}
RoundRobinHandle . prototype . add = function ( worker , send ) {
assert ( worker . id in this . all === false ) ;
this . all [ worker . id ] = worker ;
var self = this ;
function done ( ) {
if ( self . handle . getsockname )
send ( null , { sockname : self . handle . getsockname ( ) } , null ) ;
else
send ( null , null , null ) ; // UNIX socket.
self . handoff ( worker ) ; // In case there are connections pending.
}
if ( this . server === null ) return done ( ) ;
// Still busy binding.
this . server . once ( 'listening' , done ) ;
this . server . once ( 'error' , function ( err ) {
send ( err . errno , null ) ;
} ) ;
} ;
RoundRobinHandle . prototype . remove = function ( worker ) {
if ( worker . id in this . all === false ) return false ;
delete this . all [ worker . id ] ;
var index = this . free . indexOf ( worker ) ;
if ( index !== - 1 ) this . free . splice ( index , 1 ) ;
if ( Object . getOwnPropertyNames ( this . all ) . length !== 0 ) return false ;
for ( var handle ; handle = this . handles . shift ( ) ; handle . close ( ) ) ;
this . handle . close ( ) ;
this . handle = null ;
return true ;
} ;
RoundRobinHandle . prototype . distribute = function ( handle ) {
this . handles . push ( handle ) ;
var worker = this . free . shift ( ) ;
if ( worker ) this . handoff ( worker ) ;
} ;
RoundRobinHandle . prototype . handoff = function ( worker ) {
if ( worker . id in this . all === false ) {
return ; // Worker is closing (or has closed) the server.
}
var handle = this . handles . shift ( ) ;
if ( typeof handle === 'undefined' ) {
this . free . push ( worker ) ; // Add to ready queue again.
return ;
}
var message = { act : 'newconn' , key : this . key } ;
var self = this ;
sendHelper ( worker . process , message , handle , function ( reply ) {
if ( reply . accepted )
handle . close ( ) ;
else
self . distribute ( handle ) ; // Worker is shutting down. Send to another.
self . handoff ( worker ) ;
} ) ;
} ;
if ( cluster . isMaster )
masterInit ( ) ;
@ -90,11 +207,25 @@ function masterInit() {
} ;
cluster . settings = settings ;
// Indexed by address:port:etc key. Its entries are dicts with handle and
// workers keys. That second one is a list of workers that hold a reference
// to the handle. When a worker dies, we scan the dicts and close the handle
// when its reference count drops to zero. Yes, that means we're doing an
// O(n*m) scan but n and m are small and worker deaths are rare events anyway.
// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
var schedulingPolicy = {
'none' : SCHED_NONE ,
'rr' : SCHED_RR
} [ process . env . NODE_CLUSTER_SCHED_POLICY ] ;
if ( typeof schedulingPolicy === 'undefined' ) {
// FIXME Round-robin doesn't perform well on Windows right now due to the
// way IOCP is wired up. Bert is going to fix that, eventually.
schedulingPolicy = ( process . platform === 'win32' ) ? SCHED_NONE : SCHED_RR ;
}
cluster . schedulingPolicy = schedulingPolicy ;
cluster . SCHED_NONE = SCHED_NONE ; // Leave it to the operating system.
cluster . SCHED_RR = SCHED_RR ; // Master distributes connections.
// Keyed on address:port:etc. When a worker dies, we walk over the handles
// and remove() the worker from each one. remove() may do a linear scan
// itself so we might end up with an O(n*m) operation. Ergo, FIXME.
var handles = { } ;
var initialized = false ;
@ -111,6 +242,9 @@ function masterInit() {
{
settings . execArgv = settings . execArgv . concat ( [ '--logfile=v8-%p.log' ] ) ;
}
schedulingPolicy = cluster . schedulingPolicy ; // Freeze policy.
assert ( schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR ,
'Bad cluster.schedulingPolicy: ' + schedulingPolicy ) ;
cluster . settings = settings ;
process . on ( 'internalMessage' , function ( message ) {
@ -169,15 +303,9 @@ function masterInit() {
cluster . on ( 'disconnect' , function ( worker ) {
delete cluster . workers [ worker . id ] ;
// O(n*m) scan but for small values of n and m.
for ( var key in handles ) {
var e = handles [ key ] ;
var i = e . workers . indexOf ( worker ) ;
if ( i === - 1 ) continue ;
e . workers . splice ( i , 1 ) ;
if ( e . workers . length !== 0 ) continue ;
e . handle . close ( ) ;
delete handles [ key ] ;
var handle = handles [ key ] ;
if ( handle . remove ( worker ) ) delete handles [ key ] ;
}
if ( Object . keys ( handles ) . length === 0 ) {
intercom . emit ( 'disconnect' ) ;
@ -210,6 +338,8 @@ function masterInit() {
listening ( worker , message ) ;
else if ( message . act === 'suicide' )
worker . suicide = true ;
else if ( message . act === 'close' )
close ( worker , message ) ;
}
function online ( worker ) {
@ -224,17 +354,32 @@ function masterInit() {
message . addressType ,
message . fd ] ;
var key = args . join ( ':' ) ;
var e = handles [ key ] ;
if ( typeof e === 'undefined' ) {
e = { workers : [ ] } ;
if ( message . addressType === 'udp4' || message . addressType === 'udp6' )
e . handle = dgram . _ createSocketHandle . apply ( null , args ) ;
else
e . handle = net . _ createServerHandle . apply ( null , args ) ;
handles [ key ] = e ;
var handle = handles [ key ] ;
if ( typeof handle === 'undefined' ) {
var constructor = RoundRobinHandle ;
// UDP is exempt from round-robin connection balancing for what should
// be obvious reasons: it's connectionless. There is nothing to send to
// the workers except raw datagrams and that's pointless.
if ( schedulingPolicy !== SCHED_RR ||
message . addressType === 'udp4' ||
message . addressType === 'udp6' ) {
constructor = SharedHandle ;
}
handles [ key ] = handle = new constructor ( key ,
message . address ,
message . port ,
message . addressType ,
message . backlog ,
message . fd ) ;
}
e . workers . push ( worker ) ;
send ( worker , { ack : message . seq } , e . handle ) ;
handle . add ( worker , function ( errno , reply , handle ) {
reply = util . _ extend ( { ack : message . seq , key : key } , reply ) ;
if ( errno ) {
reply . errno = errno ;
delete handles [ key ] ; // Gives other workers a chance to retry.
}
send ( worker , reply , handle ) ;
} ) ;
}
function listening ( worker , message ) {
@ -249,6 +394,13 @@ function masterInit() {
cluster . emit ( 'listening' , worker , info ) ;
}
// Round-robin only. Server in worker is closing, remove from list.
function close ( worker , message ) {
var key = message . key ;
var handle = handles [ key ] ;
if ( handle . remove ( worker ) ) delete handles [ key ] ;
}
function send ( worker , message , handle , cb ) {
sendHelper ( worker . process , message , handle , cb ) ;
}
@ -256,7 +408,7 @@ function masterInit() {
function workerInit ( ) {
var handles = [ ] ;
var handles = { } ;
// Called from src/node.js
cluster . _ setupWorker = function ( ) {
@ -269,7 +421,10 @@ function workerInit() {
process . on ( 'internalMessage' , internal ( worker , onmessage ) ) ;
send ( { act : 'online' } ) ;
function onmessage ( message , handle ) {
if ( message . act === 'disconnect' ) worker . disconnect ( ) ;
if ( message . act === 'newconn' )
onconnection ( message , handle ) ;
else if ( message . act === 'disconnect' )
worker . disconnect ( ) ;
}
} ;
@ -282,28 +437,108 @@ function workerInit() {
act : 'queryServer' ,
fd : fd
} ;
send ( message , function ( _ , handle ) {
// Monkey-patch the close() method so we can keep track of when it's
// closed. Avoids resource leaks when the handle is short-lived.
var close = handle . close ;
handle . close = function ( ) {
var index = handles . indexOf ( handle ) ;
if ( index !== - 1 ) handles . splice ( index , 1 ) ;
return close . apply ( this , arguments ) ;
} ;
handles . push ( handle ) ;
cb ( handle ) ;
send ( message , function ( reply , handle ) {
if ( handle )
shared ( reply , handle , cb ) ; // Shared listen socket.
else
rr ( reply , cb ) ; // Round-robin.
} ) ;
obj . once ( 'listening' , function ( ) {
cluster . worker . state = 'listening' ;
var address = obj . address ( ) ;
message . act = 'listening' ;
message . port = obj . address ( ) . port || port ;
message . port = address && address . port || port ,
send ( message ) ;
} ) ;
} ;
// Shared listen socket.
function shared ( message , handle , cb ) {
var key = message . key ;
// Monkey-patch the close() method so we can keep track of when it's
// closed. Avoids resource leaks when the handle is short-lived.
var close = handle . close ;
handle . close = function ( ) {
delete handles [ key ] ;
return close . apply ( this , arguments ) ;
} ;
assert ( typeof handles [ key ] === 'undefined' ) ;
handles [ key ] = handle ;
cb ( handle ) ;
}
// Round-robin. Master distributes handles across workers.
function rr ( message , cb ) {
if ( message . errno )
onerror ( message , cb ) ;
else
onsuccess ( message , cb ) ;
function onerror ( message , cb ) {
function listen ( backlog ) {
process . _ errno = message . errno ;
return - 1 ;
}
function close ( ) {
}
cb ( { close : close , listen : listen } ) ;
}
function onsuccess ( message , cb ) {
var key = message . key ;
function listen ( backlog ) {
// TODO(bnoordhuis) Send a message to the master that tells it to
// update the backlog size. The actual backlog should probably be
// the largest requested size by any worker.
return 0 ;
}
function close ( ) {
// lib/net.js treats server._handle.close() as effectively synchronous.
// That means there is a time window between the call to close() and
// the ack by the master process in which we can still receive handles.
// onconnection() below handles that by sending those handles back to
// the master.
if ( typeof key === 'undefined' ) return ;
send ( { act : 'close' , key : key } ) ;
delete handles [ key ] ;
key = undefined ;
}
function getsockname ( ) {
var rv = { } ;
if ( key ) return util . _ extend ( rv , message . sockname ) ;
return rv ;
}
// Faux handle. Mimics a TCPWrap with just enough fidelity to get away
// with it. Fools net.Server into thinking that it's backed by a real
// handle.
var handle = {
close : close ,
listen : listen
} ;
if ( message . sockname ) {
handle . getsockname = getsockname ; // TCP handles only.
}
assert ( typeof handles [ key ] === 'undefined' ) ;
handles [ key ] = handle ;
cb ( handle ) ;
}
}
// Round-robin connection.
function onconnection ( message , handle ) {
var key = message . key ;
var server = handles [ key ] ;
var accepted = ( typeof server !== 'undefined' ) ;
send ( { ack : message . seq , accepted : accepted } ) ;
if ( accepted ) server . onconnection ( handle ) ;
}
Worker . prototype . disconnect = function ( ) {
for ( var handle ; handle = handles . shift ( ) ; handle . close ( ) ) ;
for ( var key in handles ) {
var handle = handles [ key ] ;
delete handles [ key ] ;
handle . close ( ) ;
}
process . disconnect ( ) ;
} ;