@ -19,572 +19,304 @@
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var EventEmitter = require ( 'events' ) . EventEmitter ;
var assert = require ( 'assert' ) ;
var assert = require ( 'assert' ) ;
var dgram = require ( 'dgram' ) ;
var fork = require ( 'child_process' ) . fork ;
var fork = require ( 'child_process' ) . fork ;
var net = require ( 'net' ) ;
var net = require ( 'net' ) ;
var EventEmitter = require ( 'events' ) . EventEmitter ;
var util = require ( 'util' ) ;
var util = require ( 'util' ) ;
function isObject ( o ) {
var cluster = new EventEmitter ;
return ( typeof o === 'object' && o !== null ) ;
module . exports = cluster ;
}
cluster . Worker = Worker ;
cluster . isWorker = ( 'NODE_UNIQUE_ID' in process . env ) ;
cluster . isMaster = ( cluster . isWorker === false ) ;
var debug ;
if ( process . env . NODE_DEBUG && /cluster/ . test ( process . env . NODE_DEBUG ) ) {
debug = function ( x ) {
var prefix = process . pid + ',' +
( process . env . NODE_UNIQUE_ID ? 'Worker' : 'Master' ) ;
console . error ( prefix , x ) ;
} ;
} else {
debug = function ( ) { } ;
}
// cluster object:
function Worker ( ) {
function Cluster ( ) {
if ( ! ( this instanceof Worker ) ) return new Worker ;
EventEmitter . call ( this ) ;
EventEmitter . call ( this ) ;
this . suicide = undefined ;
this . state = 'none' ;
this . id = 0 ;
}
}
util . inherits ( Worker , EventEmitter ) ;
util . inherits ( Cluster , EventEmitter ) ;
Worker . prototype . kill = function ( ) {
this . destroy . apply ( this , arguments ) ;
var cluster = module . exports = new Cluster ( ) ;
} ;
// Used in the master:
var masterStarted = false ;
var ids = 0 ;
var serverHandlers = { } ;
// Used in the worker:
Worker . prototype . send = function ( ) {
var serverListeners = { } ;
this . process . send . apply ( this . process , arguments ) ;
var queryIds = 0 ;
} ;
var queryCallbacks = { } ;
// Define isWorker and isMaster
// Master/worker specific methods are defined in the *Init() functions.
cluster . isWorker = 'NODE_UNIQUE_ID' in process . env ;
cluster . isMaster = ! cluster . isWorker ;
// The worker object is only used in a worker
cluster . worker = cluster . isWorker ? { } : null ;
// The workers array is only used in the master
cluster . workers = cluster . isMaster ? { } : null ;
// Settings object
if ( cluster . isMaster )
var settings = cluster . settings = { } ;
masterInit ( ) ;
else
workerInit ( ) ;
// Simple function to call a function on each worker
function eachWorker ( cb ) {
// Go through all workers
for ( var id in cluster . workers ) {
if ( cluster . workers . hasOwnProperty ( id ) ) {
cb ( cluster . workers [ id ] ) ;
}
}
}
// Extremely simple progress tracker
function masterInit ( ) {
function ProgressTracker ( missing , callback ) {
cluster . workers = { } ;
this . missing = missing ;
this . callback = callback ;
}
ProgressTracker . prototype . done = function ( ) {
this . missing -= 1 ;
this . check ( ) ;
} ;
ProgressTracker . prototype . check = function ( ) {
if ( this . missing === 0 ) this . callback ( ) ;
} ;
cluster . setupMaster = function ( options ) {
var intercom = new EventEmitter ;
// This can only be called from the master.
var settings = {
assert ( cluster . isMaster ) ;
args : process . argv . slice ( 2 ) ,
exec : process . argv [ 1 ] ,
// Don't allow this function to run more than once
execArgv : process . execArgv ,
if ( masterStarted ) return ;
silent : false
masterStarted = true ;
} ;
cluster . settings = settings ;
// Get filename and arguments
options = options || { } ;
// Indexed by address:port:etc key. Its entries are dicts with handle and
// workers keys. That second one is a list of workers that hold a reference
// By default, V8 writes the profile data of all processes to a sing le
// to the handle. When a worker dies, we scan the dicts and close the hand le
// v8.log.
// when its reference count drops to zero. Yes, that means we're doing an
//
// O(n*m) scan but n and m are small and worker deaths are rare events anyway.
// Running that log file through a tick processor produces bogus numbers
var handles = { } ;
// because many events won't match up with the recorded memory mappings
// and you end up with graphs where 80+% of ticks is unaccounted for.
var initialized = false ;
//
cluster . setupMaster = function ( options ) {
// Fixing the tick processor to deal with multi-process output is not very
if ( initialized === true ) return ;
// useful because the processes may be running wildly disparate workloads.
initialized = true ;
//
settings = util . _ extend ( settings , options || { } ) ;
// That's why we fix up the command line arguments to include
// Tell V8 to write profile data for each process to a separate file.
// a "--logfile=v8-%p.log" argument (where %p is expanded to the PID)
// Without --logfile=v8-%p.log, everything ends up in a single, unusable
// unless it already contains a --logfile argument.
// file. (Unusable because what V8 logs are memory addresses and each
var execArgv = options . execArgv || process . execArgv ;
// process has its own memory mappings.)
if ( execArgv . some ( function ( s ) { return /^--prof/ . test ( s ) ; } ) &&
if ( settings . execArgv . some ( function ( s ) { return /^--prof/ . test ( s ) ; } ) &&
! execArgv . some ( function ( s ) { return /^--logfile=/ . test ( s ) ; } ) )
! settings . execArgv . some ( function ( s ) { return /^--logfile=/ . test ( s ) ; } ) )
{
{
execArgv = execArgv . slice ( ) ;
settings . execArgv = settings . execArgv . concat ( [ '--logfile=v8-%p.log' ] ) ;
execArgv . push ( '--logfile=v8-%p.log' ) ;
}
}
cluster . settings = settings ;
// Set settings object
settings = cluster . settings = {
exec : options . exec || process . argv [ 1 ] ,
execArgv : execArgv ,
args : options . args || process . argv . slice ( 2 ) ,
silent : options . silent || false
} ;
// emit setup event
cluster . emit ( 'setup' ) ;
cluster . emit ( 'setup' ) ;
} ;
} ;
// Check if a message is internal only
var INTERNAL_PREFIX = 'NODE_CLUSTER_' ;
function isInternalMessage ( message ) {
return isObject ( message ) &&
typeof message . cmd === 'string' &&
message . cmd . length > INTERNAL_PREFIX . length &&
message . cmd . slice ( 0 , INTERNAL_PREFIX . length ) === INTERNAL_PREFIX ;
}
// Modify message object to be internal
function internalMessage ( inMessage ) {
var outMessage = util . _ extend ( { } , inMessage ) ;
// Add internal prefix to cmd
outMessage . cmd = INTERNAL_PREFIX + ( outMessage . cmd || '' ) ;
return outMessage ;
}
// Handle callback messages
function handleResponse ( outMessage , outHandle , inMessage , inHandle , worker ) {
// The message there will be sent
var message = internalMessage ( outMessage ) ;
// callback id - will be undefined if not set
var ids = 0 ;
message . _ queryEcho = inMessage . _ requestEcho ;
cluster . fork = function ( env ) {
cluster . setupMaster ( ) ;
var worker = new Worker ;
worker . id = ++ ids ;
var workerEnv = util . _ extend ( { } , process . env ) ;
workerEnv = util . _ extend ( workerEnv , env ) ;
workerEnv . NODE_UNIQUE_ID = '' + worker . id ;
worker . process = fork ( settings . exec , settings . args , {
env : workerEnv ,
silent : settings . silent ,
execArgv : settings . execArgv
} ) ;
worker . process . once ( 'exit' , function ( exitCode , signalCode ) {
worker . suicide = ! ! worker . suicide ;
worker . state = 'dead' ;
worker . emit ( 'exit' , exitCode , signalCode ) ;
cluster . emit ( 'exit' , worker , exitCode , signalCode ) ;
delete cluster . workers [ worker . id ] ;
} ) ;
worker . process . once ( 'disconnect' , function ( ) {
worker . suicide = ! ! worker . suicide ;
worker . state = 'disconnected' ;
worker . emit ( 'disconnect' ) ;
cluster . emit ( 'disconnect' , worker ) ;
delete cluster . workers [ worker . id ] ;
} ) ;
worker . process . on ( 'error' , worker . emit . bind ( worker , 'error' ) ) ;
worker . process . on ( 'message' , worker . emit . bind ( worker , 'message' ) ) ;
worker . process . on ( 'internalMessage' , internal ( worker , onmessage ) ) ;
process . nextTick ( function ( ) {
cluster . emit ( 'fork' , worker ) ;
} ) ;
cluster . workers [ worker . id ] = worker ;
return worker ;
} ;
// Call callback if a query echo is received
cluster . disconnect = function ( cb ) {
if ( inMessage . _ queryEcho ) {
for ( var key in cluster . workers ) {
queryCallbacks [ inMessage . _ queryEcho ] ( inMessage . content , inHandle ) ;
var worker = cluster . workers [ key ] ;
delete queryCallbacks [ inMessage . _ queryEcho ] ;
worker . disconnect ( ) ;
}
}
if ( cb ) intercom . once ( 'disconnect' , cb ) ;
} ;
// Send if outWrap contains something useful
cluster . on ( 'disconnect' , function ( worker ) {
if ( ! ( outMessage === undefined && message . _ queryEcho === undefined ) ) {
delete cluster . workers [ worker . id ] ;
sendInternalMessage ( worker , message , outHandle ) ;
// O(n*m) scan but for small values of n and m.
for ( var key in handles ) {
var e = handles [ key ] ;
var i = e . workers . indexOf ( worker ) ;
if ( i === - 1 ) continue ;
e . workers . splice ( i , 1 ) ;
if ( e . workers . length !== 0 ) continue ;
e . handle . close ( ) ;
delete handles [ key ] ;
}
}
}
if ( Object . keys ( handles ) . length === 0 ) {
intercom . emit ( 'disconnect' ) ;
// Handle messages from both master and workers
var messageHandler = { } ;
function handleMessage ( worker , inMessage , inHandle ) {
// Remove internal prefix
var message = util . _ extend ( { } , inMessage ) ;
message . cmd = inMessage . cmd . substr ( INTERNAL_PREFIX . length ) ;
var respondUsed = false ;
function respond ( outMessage , outHandler ) {
respondUsed = true ;
handleResponse ( outMessage , outHandler , inMessage , inHandle , worker ) ;
}
}
} ) ;
// Run handler if it exists
Worker . prototype . disconnect = function ( ) {
if ( messageHandler [ message . cmd ] ) {
this . suicide = true ;
messageHandler [ message . cmd ] ( message , worker , respond ) ;
send ( this , { act : 'disconnect' } ) ;
}
} ;
// Send respond if it hasn't been called yet
Worker . prototype . destroy = function ( signo ) {
if ( respondUsed === false ) {
signo = signo || 'SIGTERM' ;
respond ( ) ;
var proc = this . process ;
if ( proc . connected ) {
proc . once ( 'disconnect' , proc . kill . bind ( proc , signo ) ) ;
proc . disconnect ( ) ;
return ;
}
}
}
proc . kill ( signo ) ;
} ;
// Messages to the master will be handled using these methods
function onmessage ( message , handle ) {
if ( cluster . isMaster ) {
var worker = this ;
if ( message . act === 'online' )
online ( worker ) ;
else if ( message . act === 'queryServer' )
queryServer ( worker , message ) ;
else if ( message . act === 'listening' )
listening ( worker , message ) ;
else if ( message . act === 'suicide' )
worker . suicide = true ;
}
// Handle online messages from workers
function online ( worker ) {
messageHandler . online = function ( message , worker ) {
worker . state = 'online' ;
worker . state = 'online' ;
debug ( 'Worker ' + worker . process . pid + ' online' ) ;
worker . emit ( 'online' ) ;
worker . emit ( 'online' ) ;
cluster . emit ( 'online' , worker ) ;
cluster . emit ( 'online' , worker ) ;
} ;
}
// Handle queryServer messages from workers
messageHandler . queryServer = function ( message , worker , send ) {
// This sequence of information is unique to the connection
function queryServer ( worker , message ) {
// but not to the worker
var args = [ message . address ,
var args = [ message . address ,
message . port ,
message . port ,
message . addressType ,
message . addressType ,
message . fd ] ;
message . fd ] ;
var key = args . join ( ':' ) ;
var key = args . join ( ':' ) ;
var handler ;
var e = handles [ key ] ;
if ( typeof e === 'undefined' ) {
if ( serverHandlers . hasOwnProperty ( key ) ) {
e = { workers : [ ] } ;
handler = serverHandlers [ key ] ;
if ( message . addressType === 'udp4' || message . addressType === 'udp6' )
} else if ( message . addressType === 'udp4' ||
e . handle = dgram . _ createSocketHandle . apply ( null , args ) ;
message . addressType === 'udp6' ) {
else
var dgram = require ( 'dgram' ) ;
e . handle = net . _ createServerHandle . apply ( null , args ) ;
handler = dgram . _ createSocketHandle . apply ( net , args ) ;
handles [ key ] = e ;
serverHandlers [ key ] = handler ;
}
} else {
e . workers . push ( worker ) ;
handler = net . _ createServerHandle . apply ( net , args ) ;
send ( worker , { ack : message . seq } , e . handle ) ;
serverHandlers [ key ] = handler ;
}
}
// echo callback with the fd handler associated with it
function listening ( worker , message ) {
send ( { } , handler ) ;
var info = {
} ;
// Handle listening messages from workers
messageHandler . listening = function ( message , worker ) {
worker . state = 'listening' ;
// Emit listening, now that we know the worker is listening
worker . emit ( 'listening' , {
address : message . address ,
port : message . port ,
addressType : message . addressType ,
addressType : message . addressType ,
fd : message . fd
} ) ;
cluster . emit ( 'listening' , worker , {
address : message . address ,
address : message . address ,
port : message . port ,
port : message . port ,
addressType : message . addressType ,
fd : message . fd
fd : message . fd
} ) ;
} ;
// Handle suicide messages from workers
messageHandler . suicide = function ( message , worker ) {
worker . suicide = true ;
} ;
}
// Messages to a worker will be handled using these methods
else if ( cluster . isWorker ) {
// Handle worker.disconnect from master
messageHandler . disconnect = function ( message , worker ) {
worker . disconnect ( ) ;
} ;
} ;
}
worker . state = 'listening' ;
worker . emit ( 'listening' , info ) ;
function toDecInt ( value ) {
cluster . emit ( 'listening' , worker , info ) ;
value = parseInt ( value , 10 ) ;
return isNaN ( value ) ? null : value ;
}
// Create a worker object, that works both for master and worker
function Worker ( customEnv ) {
if ( ! ( this instanceof Worker ) ) return new Worker ( ) ;
EventEmitter . call ( this ) ;
var self = this ;
var env = process . env ;
// Assign a unique id, default null
this . id = cluster . isMaster ? ++ ids : toDecInt ( env . NODE_UNIQUE_ID ) ;
// XXX: Legacy. Remove in 0.9
this . workerID = this . uniqueID = this . id ;
// Assign state
this . state = 'none' ;
// Create or get process
if ( cluster . isMaster ) {
// Create env object
// first: copy and add id property
var envCopy = util . _ extend ( { } , env ) ;
envCopy [ 'NODE_UNIQUE_ID' ] = this . id ;
// second: extend envCopy with the env argument
if ( isObject ( customEnv ) ) {
envCopy = util . _ extend ( envCopy , customEnv ) ;
}
// fork worker
this . process = fork ( settings . exec , settings . args , {
'env' : envCopy ,
'silent' : settings . silent ,
'execArgv' : settings . execArgv
} ) ;
} else {
this . process = process ;
}
if ( cluster . isMaster ) {
// Save worker in the cluster.workers array
cluster . workers [ this . id ] = this ;
// Emit a fork event, on next tick
// There is no worker.fork event since this has no real purpose
process . nextTick ( function ( ) {
cluster . emit ( 'fork' , self ) ;
} ) ;
}
// handle internalMessage, exit and disconnect event
this . process . on ( 'internalMessage' , handleMessage . bind ( null , this ) ) ;
this . process . once ( 'exit' , function ( exitCode , signalCode ) {
prepareExit ( self , 'dead' ) ;
self . emit ( 'exit' , exitCode , signalCode ) ;
cluster . emit ( 'exit' , self , exitCode , signalCode ) ;
} ) ;
this . process . once ( 'disconnect' , function ( ) {
prepareExit ( self , 'disconnected' ) ;
self . emit ( 'disconnect' ) ;
cluster . emit ( 'disconnect' , self ) ;
} ) ;
// relay message and error
this . process . on ( 'message' , this . emit . bind ( this , 'message' ) ) ;
this . process . on ( 'error' , this . emit . bind ( this , 'error' ) ) ;
}
util . inherits ( Worker , EventEmitter ) ;
cluster . Worker = Worker ;
function prepareExit ( worker , state ) {
// set state to disconnect
worker . state = state ;
// Make suicide a boolean
worker . suicide = ! ! worker . suicide ;
// Remove from workers in the master
if ( cluster . isMaster ) {
delete cluster . workers [ worker . id ] ;
}
}
// Send internal message
function sendInternalMessage ( worker , message /*, handler, callback*/ ) {
// Exist callback
var callback = arguments [ arguments . length - 1 ] ;
if ( typeof callback !== 'function' ) {
callback = undefined ;
}
// exist handler
var handler = arguments [ 2 ] !== callback ? arguments [ 2 ] : undefined ;
if ( ! isInternalMessage ( message ) ) {
message = internalMessage ( message ) ;
}
}
// Store callback for later
function send ( worker , message , handle , cb ) {
if ( callback ) {
sendHelper ( worker . process , message , handle , cb ) ;
message . _ requestEcho = worker . id + ':' + ( ++ queryIds ) ;
queryCallbacks [ message . _ requestEcho ] = callback ;
}
}
worker . send ( message , handler ) ;
}
}
// Send message to worker or master
Worker . prototype . send = function ( ) {
// You could also just use process.send in a worker
this . process . send . apply ( this . process , arguments ) ;
} ;
// Kill the worker without restarting
function workerInit ( ) {
Worker . prototype . kill = Worker . prototype . destroy = function ( signal ) {
var handles = [ ] ;
if ( ! signal )
signal = 'SIGTERM' ;
var self = this ;
this . suicide = true ;
if ( cluster . isMaster ) {
// Called from src/node.js
// Disconnect IPC channel
cluster . _ setupWorker = function ( ) {
// this way the worker won't need to propagate suicide state to master
var worker = new Worker ;
if ( self . process . connected ) {
cluster . worker = worker ;
self . process . once ( 'disconnect' , function ( ) {
worker . id = + process . env . NODE_UNIQUE_ID | 0 ;
self . process . kill ( signal ) ;
worker . state = 'online' ;
} ) ;
worker . process = process ;
self . process . disconnect ( ) ;
process . once ( 'disconnect' , process . exit . bind ( null , 0 ) ) ;
} else {
process . on ( 'internalMessage' , internal ( worker , onmessage ) ) ;
self . process . kill ( signal ) ;
send ( { act : 'online' } ) ;
function onmessage ( message , handle ) {
if ( message . act === 'disconnect' ) worker . disconnect ( ) ;
}
}
} ;
} else {
// obj is a net#Server or a dgram#Socket object.
// Channel is open
cluster . _ getServer = function ( obj , address , port , addressType , fd , cb ) {
if ( this . process . connected ) {
var message = {
addressType : addressType ,
// Inform master to suicide and then kill
address : address ,
sendInternalMessage ( this , { cmd : 'suicide' } , function ( ) {
port : port ,
process . exit ( 0 ) ;
act : 'queryServer' ,
fd : fd
} ;
send ( message , function ( _ , handle ) {
// Monkey-patch the close() method so we can keep track of when it's
// closed. Avoids resource leaks when the handle is short-lived.
var close = handle . close ;
handle . close = function ( ) {
var index = handles . indexOf ( handle ) ;
if ( index !== - 1 ) handles . splice ( index , 1 ) ;
return close . apply ( this , arguments ) ;
} ;
handles . push ( handle ) ;
cb ( handle ) ;
} ) ;
} ) ;
obj . once ( 'listening' , function ( ) {
// When channel is closed, terminate the process
cluster . worker . state = 'listening' ;
this . process . once ( 'disconnect' , function ( ) {
message . act = 'listening' ;
process . exit ( 0 ) ;
message . port = obj . address ( ) . port || port ;
send ( message ) ;
} ) ;
} ) ;
} else {
process . exit ( 0 ) ;
}
}
} ;
// The .disconnect function will close all servers
// and then disconnect the IPC channel.
if ( cluster . isMaster ) {
// Used in master
Worker . prototype . disconnect = function ( ) {
this . suicide = true ;
sendInternalMessage ( this , { cmd : 'disconnect' } ) ;
} ;
} ;
} else {
// Used in workers
Worker . prototype . disconnect = function ( ) {
Worker . prototype . disconnect = function ( ) {
var self = this ;
for ( var handle ; handle = handles . shift ( ) ; handle . close ( ) ) ;
process . disconnect ( ) ;
this . suicide = true ;
// keep track of open servers
var servers = Object . keys ( serverListeners ) . length ;
var progress = new ProgressTracker ( servers , function ( ) {
// There are no more servers open so we will close the IPC channel.
// Closing the IPC channel will emit a disconnect event
// in both master and worker on the process object.
// This event will be handled by prepareExit.
self . process . disconnect ( ) ;
} ) ;
// depending on where this function was called from (master or worker)
// The suicide state has already been set,
// but it doesn't really matter if we set it again.
sendInternalMessage ( this , { cmd : 'suicide' } , function ( ) {
// in case there are no servers
progress . check ( ) ;
// closing all servers gracefully
var server ;
for ( var key in serverListeners ) {
server = serverListeners [ key ] ;
// in case the server is closed we won't close it again
if ( server . _ handle === null ) {
progress . done ( ) ;
continue ;
}
server . on ( 'close' , progress . done . bind ( progress ) ) ;
server . close ( ) ;
}
} ) ;
} ;
} ;
}
// Fork a new worker
Worker . prototype . destroy = function ( ) {
cluster . fork = function ( env ) {
if ( ! process . connected ) process . exit ( 0 ) ;
// This can only be called from the master.
var exit = process . exit . bind ( null , 0 ) ;
assert ( cluster . isMaster ) ;
send ( { act : 'suicide' } , exit ) ;
process . once ( 'disconnect' , exit ) ;
// Make sure that the master has been initialized
process . disconnect ( ) ;
cluster . setupMaster ( ) ;
} ;
return ( new cluster . Worker ( env ) ) ;
} ;
// execute .disconnect on all workers and close handlers when done
function send ( message , cb ) {
cluster . disconnect = function ( callback ) {
sendHelper ( process , message , null , cb ) ;
// This can only be called from the master.
assert ( cluster . isMaster ) ;
// Close all TCP handlers when all workers are disconnected
var workers = Object . keys ( cluster . workers ) . length ;
var progress = new ProgressTracker ( workers , function ( ) {
for ( var key in serverHandlers ) {
serverHandlers [ key ] . close ( ) ;
delete serverHandlers [ key ] ;
}
}
}
// call callback when done
if ( callback ) callback ( ) ;
} ) ;
// begin disconnecting all workers
eachWorker ( function ( worker ) {
worker . once ( 'disconnect' , progress . done . bind ( progress ) ) ;
worker . disconnect ( ) ;
} ) ;
// in case there weren't any workers
progress . check ( ) ;
} ;
// Internal function. Called from src/node.js when worker process starts.
var seq = 0 ;
cluster . _ setupWorker = function ( ) {
var callbacks = { } ;
function sendHelper ( proc , message , handle , cb ) {
// Mark message as internal. See INTERNAL_PREFIX in lib/child_process.js
message = util . _ extend ( { cmd : 'NODE_CLUSTER' } , message ) ;
if ( cb ) callbacks [ seq ] = cb ;
message . seq = seq ;
seq += 1 ;
proc . send ( message , handle ) ;
}
// Get worker class
var worker = cluster . worker = new Worker ( ) ;
// we will terminate the worker
// Returns an internalMessage listener that hands off normal messages
// when the worker is disconnected from the parent accidentally
// to the callback but intercepts and redirects ACK messages.
process . once ( 'disconnect' , function ( ) {
function internal ( worker , cb ) {
if ( worker . suicide !== true ) {
return function ( message , handle ) {
process . exit ( 0 ) ;
if ( message . cmd !== 'NODE_CLUSTER' ) return ;
var fn = cb ;
if ( typeof message . ack !== 'undefined' ) {
fn = callbacks [ message . ack ] ;
delete callbacks [ message . ack ] ;
}
}
} ) ;
fn . apply ( worker , arguments ) ;
// Tell master that the worker is online
worker . state = 'online' ;
sendInternalMessage ( worker , { cmd : 'online' } ) ;
} ;
// Internal function. Called by net.js and dgram.js when attempting to bind a
// TCP server or UDP socket.
cluster . _ getServer = function ( tcpSelf , address , port , addressType , fd , cb ) {
// This can only be called from a worker.
assert ( cluster . isWorker ) ;
// Store tcp instance for later use
var key = [ address , port , addressType , fd ] . join ( ':' ) ;
serverListeners [ key ] = tcpSelf ;
// Send a listening message to the master
tcpSelf . once ( 'listening' , function ( ) {
cluster . worker . state = 'listening' ;
sendInternalMessage ( cluster . worker , {
cmd : 'listening' ,
address : address ,
port : tcpSelf . address ( ) . port || port ,
addressType : addressType ,
fd : fd
} ) ;
} ) ;
// Request the fd handler from the master process
var message = {
cmd : 'queryServer' ,
address : address ,
port : port ,
addressType : addressType ,
fd : fd
} ;
} ;
}
// The callback will be stored until the master has responded
sendInternalMessage ( cluster . worker , message , function ( msg , handle ) {
cb ( handle ) ;
} ) ;
} ;