'use strict'; const StringDecoder = require('string_decoder').StringDecoder; const EventEmitter = require('events'); const net = require('net'); const dgram = require('dgram'); const util = require('util'); const constants = process.binding('constants').os.signals; const assert = require('assert'); const Process = process.binding('process_wrap').Process; const WriteWrap = process.binding('stream_wrap').WriteWrap; const uv = process.binding('uv'); const Pipe = process.binding('pipe_wrap').Pipe; const TTY = process.binding('tty_wrap').TTY; const TCP = process.binding('tcp_wrap').TCP; const UDP = process.binding('udp_wrap').UDP; const SocketList = require('internal/socket_list'); const { isUint8Array } = process.binding('util'); const errnoException = util._errnoException; const SocketListSend = SocketList.SocketListSend; const SocketListReceive = SocketList.SocketListReceive; module.exports = { ChildProcess, setupChannel, _validateStdio, getSocketList }; // This object contains functions to convert TCP objects to native handle objects and back again.
// Conversion table keyed by the `type` field of a NODE_HANDLE message.
// Each entry may provide:
//   - send(message, obj, options): called with `this` set to the sending
//     target; returns the native handle to transfer (or a falsy value when
//     there is nothing to transfer).
//   - postSend(handle, options, target): optional hook run after the write
//     was attempted.
//   - got(message, handle, emit): rebuilds the user-facing object from the
//     received native handle and passes it to `emit`.
//   - simultaneousAccepts: read by `_send` to decide whether to call
//     `net._setSimultaneousAccepts` on the handle.
const handleConversion = {
  'net.Native': {
    simultaneousAccepts: true,

    send: function(message, handle, options) {
      return handle;
    },

    got: function(message, handle, emit) {
      emit(handle);
    }
  },

  'net.Server': {
    simultaneousAccepts: true,

    send: function(message, server, options) {
      return server._handle;
    },

    got: function(message, handle, emit) {
      var server = new net.Server();
      server.listen(handle, function() {
        emit(server);
      });
    }
  },

  'net.Socket': {
    send: function(message, socket, options) {
      if (!socket._handle)
        return;

      // if the socket was created by net.Server
      if (socket.server) {
        // the receiving side should keep track of the socket
        message.key = socket.server._connectionKey;

        var firstTime = !this.channel.sockets.send[message.key];
        var socketList = getSocketList('send', this, message.key);

        // the server should no longer expose a .connection property
        // and when asked to close it should query the socket status from
        // the receiving processes
        if (firstTime) socket.server._setupSlave(socketList);

        // Act like socket is detached
        if (!options.keepOpen)
          socket.server._connections--;
      }

      var handle = socket._handle;

      // remove handle from socket object, it will be closed when the socket
      // will be sent
      if (!options.keepOpen) {
        handle.onread = nop;
        socket._handle = null;
      }

      return handle;
    },

    postSend: function(handle, options, target) {
      // Store the handle after successfully sending it, so it can be closed
      // when the NODE_HANDLE_ACK is received. If the handle could not be sent,
      // just close it.
      if (handle && !options.keepOpen) {
        if (target) {
          // There can only be one _pendingHandle as passing handles are
          // processed one at a time: handles are stored in _handleQueue while
          // waiting for the NODE_HANDLE_ACK of the current passing handle.
          assert(!target._pendingHandle);
          target._pendingHandle = handle;
        } else {
          handle.close();
        }
      }
    },

    got: function(message, handle, emit) {
      var socket = new net.Socket({handle: handle});
      socket.readable = socket.writable = true;

      // if the socket was created by net.Server we will track the socket
      if (message.key) {

        // add socket to connections list
        var socketList = getSocketList('got', this, message.key);
        socketList.add({
          socket: socket
        });
      }

      emit(socket);
    }
  },

  'dgram.Native': {
    simultaneousAccepts: false,

    send: function(message, handle, options) {
      return handle;
    },

    got: function(message, handle, emit) {
      emit(handle);
    }
  },

  'dgram.Socket': {
    simultaneousAccepts: false,

    send: function(message, socket, options) {
      message.dgramType = socket.type;

      return socket._handle;
    },

    got: function(message, handle, emit) {
      var socket = new dgram.Socket(message.dgramType);

      socket.bind(handle, function() {
        emit(socket);
      });
    }
  }
};


// Event emitter wrapping a native Process handle.
// Emits 'exit' (or 'error' when the exit code is negative) from the handle's
// onexit callback, and 'close' once maybeClose() has been called
// `_closesNeeded` times.
function ChildProcess() {
  EventEmitter.call(this);

  var self = this;

  // One close is always needed for the process exit itself; spawn() adds
  // one per IPC channel and per output pipe.
  this._closesNeeded = 1;
  this._closesGot = 0;
  this.connected = false;

  this.signalCode = null;
  this.exitCode = null;
  this.killed = false;
  this.spawnfile = null;

  this._handle = new Process();
  this._handle.owner = this;

  this._handle.onexit = function(exitCode, signalCode) {
    //
    // follow 0.4.x behaviour:
    //
    // - normally terminated processes don't touch this.signalCode
    // - signaled processes don't touch this.exitCode
    //
    // new in 0.9.x:
    //
    // - spawn failures are reported with exitCode < 0
    //
    var syscall = self.spawnfile ? 'spawn ' + self.spawnfile : 'spawn';
    var err = (exitCode < 0) ? errnoException(exitCode, syscall) : null;

    if (signalCode) {
      self.signalCode = signalCode;
    } else {
      self.exitCode = exitCode;
    }

    if (self.stdin) {
      self.stdin.destroy();
    }

    self._handle.close();
    self._handle = null;

    if (exitCode < 0) {
      if (self.spawnfile)
        err.path = self.spawnfile;

      err.spawnargs = self.spawnargs.slice(1);
      self.emit('error', err);
    } else {
      self.emit('exit', self.exitCode, self.signalCode);
    }

    // if any of the stdio streams have not been touched,
    // then pull all the data through so that it can get the
    // eof and emit a 'close' event.
    // Do it on nextTick so that the user has one last chance
    // to consume the output, if for example they only want to
    // start reading the data once the process exits.
    process.nextTick(flushStdio, self);

    maybeClose(self);
  };
}
util.inherits(ChildProcess, EventEmitter);


// Resumes every readable stdio stream of `subprocess` that has no
// 'readable' listener attached, so pending data is drained.
function flushStdio(subprocess) {
  const stdio = subprocess.stdio;

  if (stdio == null) return;

  for (var i = 0; i < stdio.length; i++) {
    const stream = stdio[i];
    if (!stream || !stream.readable || stream._readableState.readableListening)
      continue;
    stream.resume();
  }
}


// Wraps `pipe` in a net.Socket that is either read-only (`readable` truthy)
// or write-only.
function createSocket(pipe, readable) {
  var s = new net.Socket({ handle: pipe });

  if (readable) {
    s.writable = false;
    s.readable = true;
  } else {
    s.writable = true;
    s.readable = false;
  }

  return s;
}


// Returns 'pipe', 'tty', 'tcp' or 'udp' when `stream` is an instance of the
// matching native wrap class, and false otherwise.
function getHandleWrapType(stream) {
  if (stream instanceof Pipe) return 'pipe';
  if (stream instanceof TTY) return 'tty';
  if (stream instanceof TCP) return 'tcp';
  if (stream instanceof UDP) return 'udp';

  return false;
}


// Spawns the process described by `options` on the native handle and wires
// up stdin/stdout/stderr/stdio (and the IPC channel, if one was requested).
// Note that `options.stdio` is replaced by its validated form and
// `options.envPairs` gains a NODE_CHANNEL_FD entry when IPC is used.
// Returns the error code reported by the native spawn call; throws an errno
// exception for errors other than EAGAIN/EMFILE/ENFILE/ENOENT.
ChildProcess.prototype.spawn = function(options) {
  const self = this;
  var ipc;
  var ipcFd;
  var i;
  // If no `stdio` option was given - use default
  var stdio = options.stdio || 'pipe';

  stdio = _validateStdio(stdio, false);

  ipc = stdio.ipc;
  ipcFd = stdio.ipcFd;
  stdio = options.stdio = stdio.stdio;

  if (ipc !== undefined) {
    // Let child process know about opened IPC channel
    options.envPairs = options.envPairs || [];
    options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd);
  }

  this.spawnfile = options.file;
  this.spawnargs = options.args;

  var err = this._handle.spawn(options);

  // Run-time errors should emit an error, not throw an exception.
  if (err === uv.UV_EAGAIN ||
      err === uv.UV_EMFILE ||
      err === uv.UV_ENFILE ||
      err === uv.UV_ENOENT) {
    process.nextTick(onErrorNT, self, err);
    // There is no point in continuing when we've hit EMFILE or ENFILE
    // because we won't be able to set up the stdio file descriptors.
    // It's kind of silly that the de facto spec for ENOENT (the test suite)
    // mandates that stdio _is_ set up, even if there is no process on the
    // receiving end, but it is what it is.
    if (err !== uv.UV_ENOENT) return err;
  } else if (err) {
    // Close all opened fds on error
    for (i = 0; i < stdio.length; i++) {
      const stream = stdio[i];
      if (stream.type === 'pipe') {
        stream.handle.close();
      }
    }

    this._handle.close();
    this._handle = null;
    throw errnoException(err, 'spawn');
  }

  this.pid = this._handle.pid;

  for (i = 0; i < stdio.length; i++) {
    const stream = stdio[i];
    if (stream.type === 'ignore') continue;

    if (stream.ipc) {
      self._closesNeeded++;
      continue;
    }

    if (stream.handle) {
      // when i === 0 - we're dealing with stdin
      // (which is the only one writable pipe)
      stream.socket = createSocket(self.pid !== 0 ?
        stream.handle : null, i > 0);

      if (i > 0 && self.pid !== 0) {
        self._closesNeeded++;
        stream.socket.on('close', function() {
          maybeClose(self);
        });
      }
    }
  }

  this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ?
    stdio[0].socket : null;
  this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ?
    stdio[1].socket : null;
  this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ?
    stdio[2].socket : null;

  this.stdio = stdio.map(function(stdio) {
    return stdio.socket === undefined ? null : stdio.socket;
  });

  // Add .send() method and start listening for IPC data
  if (ipc !== undefined) setupChannel(this, ipc);

  return err;
};


// Deferred error report for spawn(): feeds the error code to the handle's
// onexit callback as if it were an exit code.
function onErrorNT(self, err) {
  self._handle.onexit(err);
}


// Sends `sig` (a signal name looked up in the signals table, 0, or a falsy
// value meaning SIGTERM) to the process.
// Returns true when the native kill call reports success, false otherwise.
// Throws on an unknown signal name and on EINVAL/ENOSYS; other failures
// except ESRCH are emitted as 'error'.
ChildProcess.prototype.kill = function(sig) {
  var signal;

  if (sig === 0) {
    signal = 0;
  } else if (!sig) {
    signal = constants['SIGTERM'];
  } else {
    signal = constants[sig];
  }

  if (signal === undefined) {
    throw new Error('Unknown signal: ' + sig);
  }

  if (this._handle) {
    var err = this._handle.kill(signal);
    if (err === 0) {
      /* Success. */
      this.killed = true;
      return true;
    }
    if (err === uv.UV_ESRCH) {
      /* Already dead. */
    } else if (err === uv.UV_EINVAL || err === uv.UV_ENOSYS) {
      /* The underlying platform doesn't support this signal. */
      throw errnoException(err, 'kill');
    } else {
      /* Other error, almost certainly EPERM. */
      this.emit('error', errnoException(err, 'kill'));
    }
  }

  /* Kill didn't succeed. */
  return false;
};


// Forwards ref() to the native handle, if it still exists.
ChildProcess.prototype.ref = function() {
  if (this._handle) this._handle.ref();
};


// Forwards unref() to the native handle, if it still exists.
ChildProcess.prototype.unref = function() {
  if (this._handle) this._handle.unref();
};


// Reference counter around a channel: the channel is ref()'d when the count
// goes from 0 to 1 and unref()'d (emitting 'unref') when it returns to 0.
class Control extends EventEmitter {
  constructor(channel) {
    super();
    this.channel = channel;
    this.refs = 0;
  }
  ref() {
    if (++this.refs === 1) {
      this.channel.ref();
    }
  }
  unref() {
    if (--this.refs === 0) {
      this.channel.unref();
      this.emit('unref');
    }
  }
}


// Installs the IPC machinery on `target`: the channel reader, the
// 'internalMessage' handler for handle passing, and the send(), _send(),
// disconnect() and _disconnect() methods. Messages are newline-delimited
// JSON. Starts reading from `channel` and returns the Control object that
// ref-counts it.
function setupChannel(target, channel) {
  target.channel = channel;

  // _channel can be deprecated in version 8
  Object.defineProperty(target, '_channel', {
    get() { return target.channel; },
    set(val) { target.channel = val; },
    enumerable: true
  });

  target._handleQueue = null;
  target._pendingHandle = null;

  const control = new Control(channel);

  var decoder = new StringDecoder('utf8');
  var jsonBuffer = '';
  channel.buffering = false;
  channel.onread = function(nread, pool, recvHandle) {
    // TODO(bnoordhuis) Check that nread > 0.
    if (pool) {
      // Linebreak is used as a message end sign
      var chunks = decoder.write(pool).split('\n');
      var numCompleteChunks = chunks.length - 1;
      // Last line does not have trailing linebreak
      var incompleteChunk = chunks[numCompleteChunks];
      if (numCompleteChunks === 0) {
        jsonBuffer += incompleteChunk;
        this.buffering = jsonBuffer.length !== 0;
        return;
      }
      chunks[0] = jsonBuffer + chunks[0];

      for (var i = 0; i < numCompleteChunks; i++) {
        var message = JSON.parse(chunks[i]);

        // There will be at most one NODE_HANDLE message in every chunk we
        // read because SCM_RIGHTS messages don't get coalesced. Make sure
        // that we deliver the handle with the right message however.
        if (message && message.cmd === 'NODE_HANDLE')
          handleMessage(target, message, recvHandle);
        else
          handleMessage(target, message, undefined);
      }
      jsonBuffer = incompleteChunk;
      this.buffering = jsonBuffer.length !== 0;

    } else {
      // No data: tear the channel down.
      this.buffering = false;
      target.disconnect();
      channel.onread = nop;
      channel.close();
      target.channel = null;
      maybeClose(target);
    }
  };

  // object where socket lists will live
  channel.sockets = { got: {}, send: {} };

  // handlers will go through this
  target.on('internalMessage', function(message, handle) {
    // Once acknowledged - continue sending handles.
    if (message.cmd === 'NODE_HANDLE_ACK') {
      if (target._pendingHandle) {
        target._pendingHandle.close();
        target._pendingHandle = null;
      }

      assert(Array.isArray(target._handleQueue));
      var queue = target._handleQueue;
      target._handleQueue = null;

      for (var i = 0; i < queue.length; i++) {
        var args = queue[i];
        target._send(args.message, args.handle, args.options, args.callback);
      }

      // Process a pending disconnect (if any).
      if (!target.connected && target.channel && !target._handleQueue)
        target._disconnect();

      return;
    }

    if (message.cmd !== 'NODE_HANDLE') return;

    // Acknowledge handle receival. Don't emit error events (for example if
    // the other side has disconnected) because this call to send() is not
    // initiated by the user and it shouldn't be fatal to be unable to ACK
    // a message.
    target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true);

    var obj = handleConversion[message.type];

    // Update simultaneous accepts on Windows
    if (process.platform === 'win32') {
      handle._simultaneousAccepts = false;
      net._setSimultaneousAccepts(handle);
    }

    // Convert handle object
    obj.got.call(this, message, handle, function(handle) {
      handleMessage(target, message.msg, handle);
    });
  });

  // Public send(message[, handle[, options]][, callback]).
  // Throws TypeError when `options` is given but is not an object. When the
  // channel is no longer connected, reports 'channel closed' through the
  // callback (or an 'error' event) and returns false.
  target.send = function(message, handle, options, callback) {
    if (typeof handle === 'function') {
      callback = handle;
      handle = undefined;
      options = undefined;
    } else if (typeof options === 'function') {
      callback = options;
      options = undefined;
    } else if (options !== undefined &&
               (options === null || typeof options !== 'object')) {
      throw new TypeError('"options" argument must be an object');
    }

    options = Object.assign({swallowErrors: false}, options);

    if (this.connected) {
      return this._send(message, handle, options, callback);
    }
    const ex = new Error('channel closed');
    if (typeof callback === 'function') {
      process.nextTick(callback, ex);
    } else {
      this.emit('error', ex);  // FIXME(bnoordhuis) Defer to next tick.
    }
    return false;
  };

  // Internal send. Serializes `message` as one JSON line, optionally
  // attaching a native handle. While a handle is awaiting its
  // NODE_HANDLE_ACK, further messages are queued in _handleQueue.
  target._send = function(message, handle, options, callback) {
    assert(this.connected || this.channel);

    if (message === undefined)
      throw new TypeError('"message" argument cannot be undefined');

    // Support legacy function signature
    if (typeof options === 'boolean') {
      options = {swallowErrors: options};
    }

    // package messages with a handle object
    if (handle) {
      // this message will be handled by an internalMessage event handler
      message = {
        cmd: 'NODE_HANDLE',
        type: null,
        msg: message
      };

      if (handle instanceof net.Socket) {
        message.type = 'net.Socket';
      } else if (handle instanceof net.Server) {
        message.type = 'net.Server';
      } else if (handle instanceof TCP || handle instanceof Pipe) {
        message.type = 'net.Native';
      } else if (handle instanceof dgram.Socket) {
        message.type = 'dgram.Socket';
      } else if (handle instanceof UDP) {
        message.type = 'dgram.Native';
      } else {
        throw new TypeError('This handle type can\'t be sent');
      }

      // Queue-up message and handle if we haven't received ACK yet.
      if (this._handleQueue) {
        this._handleQueue.push({
          callback: callback,
          handle: handle,
          options: options,
          message: message.msg,
        });
        return this._handleQueue.length === 1;
      }

      // `var` on purpose: `obj` is read again after this branch.
      var obj = handleConversion[message.type];

      // convert TCP object to native handle object
      handle =
          handleConversion[message.type].send.call(target,
                                                   message,
                                                   handle,
                                                   options);

      // If handle was sent twice, or it is impossible to get native handle
      // out of it - just send a text without the handle.
      if (!handle)
        message = message.msg;

      // Update simultaneous accepts on Windows.
      // The conversion can yield no native handle (for example a net.Server
      // whose `_handle` is unset), so only touch the handle when there is one.
      if (handle && obj.simultaneousAccepts) {
        net._setSimultaneousAccepts(handle);
      }
    } else if (this._handleQueue &&
               !(message && message.cmd === 'NODE_HANDLE_ACK')) {
      // Queue request anyway to avoid out-of-order messages.
      this._handleQueue.push({
        callback: callback,
        handle: null,
        options: options,
        message: message,
      });
      return this._handleQueue.length === 1;
    }

    var req = new WriteWrap();
    req.async = false;

    var string = JSON.stringify(message) + '\n';
    var err = channel.writeUtf8String(req, string, handle);

    if (err === 0) {
      if (handle) {
        if (!this._handleQueue)
          this._handleQueue = [];
        if (obj && obj.postSend)
          obj.postSend(handle, options, target);
      }

      req.oncomplete = function() {
        if (this.async === true)
          control.unref();
        if (typeof callback === 'function')
          callback(null);
      };
      if (req.async === true) {
        control.ref();
      } else {
        process.nextTick(function() { req.oncomplete(); });
      }
    } else {
      // Cleanup handle on error
      if (obj && obj.postSend)
        obj.postSend(handle, options);

      if (!options.swallowErrors) {
        const ex = errnoException(err, 'write');
        if (typeof callback === 'function') {
          process.nextTick(callback, ex);
        } else {
          this.emit('error', ex);  // FIXME(bnoordhuis) Defer to next tick.
        }
      }
    }

    /* If the master is > 2 read() calls behind, please stop sending. */
    return channel.writeQueueSize < (65536 * 2);
  };

  // connected will be set to false immediately when a disconnect() is
  // requested, even though the channel might still be alive internally to
  // process queued messages. The three states are distinguished as follows:
  // - disconnect() never requested: channel is not null and connected
  //   is true
  // - disconnect() requested, messages in the queue: channel is not null
  //   and connected is false
  // - disconnect() requested, channel actually disconnected: channel is
  //   null and connected is false
  target.connected = true;

  target.disconnect = function() {
    if (!this.connected) {
      this.emit('error', new Error('IPC channel is already disconnected'));
      return;
    }

    // Do not allow any new messages to be written.
    this.connected = false;

    // If there are no queued messages, disconnect immediately. Otherwise,
    // postpone the disconnect so that it happens internally after the
    // queue is flushed.
    if (!this._handleQueue)
      this._disconnect();
  };

  target._disconnect = function() {
    assert(this.channel);

    // This marks the fact that the channel is actually disconnected.
    this.channel = null;

    if (this._pendingHandle) {
      this._pendingHandle.close();
      this._pendingHandle = null;
    }

    var fired = false;
    function finish() {
      if (fired) return;
      fired = true;

      channel.close();
      target.emit('disconnect');
    }

    // If a message is being read, then wait for it to complete.
    if (channel.buffering) {
      this.once('message', finish);
      this.once('internalMessage', finish);

      return;
    }

    process.nextTick(finish);
  };

  channel.readStart();
  return control;
}


const INTERNAL_PREFIX = 'NODE_';

// Emits `message` on `target` on the next tick, as 'internalMessage' when
// `message.cmd` is a string longer than and starting with INTERNAL_PREFIX,
// and as 'message' otherwise. Does nothing once `target.channel` is unset.
function handleMessage(target, message, handle) {
  if (!target.channel)
    return;

  var eventName = 'message';
  if (message !== null &&
      typeof message === 'object' &&
      typeof message.cmd === 'string' &&
      message.cmd.length > INTERNAL_PREFIX.length &&
      message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) {
    eventName = 'internalMessage';
  }
  process.nextTick(() => {
    target.emit(eventName, message, handle);
  });
}

// Shared no-op callback.
function nop() { }

// Normalizes a `stdio` option into descriptor objects.
//   stdio: 'ignore' | 'pipe' | 'inherit', or an array of per-fd entries.
//          A caller-supplied array is never modified.
//   sync:  truthy for synchronous spawns; no Pipe handles are created for
//          'pipe' entries and 'ipc' entries are rejected.
// Returns { stdio, ipc, ipcFd }, where `ipc` is the IPC Pipe (if any) and
// `ipcFd` the index it was requested at.
// Throws TypeError for malformed values and Error for invalid IPC usage.
function _validateStdio(stdio, sync) {
  var ipc;
  var ipcFd;

  // Replace shortcut with an array
  if (typeof stdio === 'string') {
    switch (stdio) {
      case 'ignore': stdio = ['ignore', 'ignore', 'ignore']; break;
      case 'pipe': stdio = ['pipe', 'pipe', 'pipe']; break;
      case 'inherit': stdio = [0, 1, 2]; break;
      default:
        throw new TypeError('Incorrect value of stdio option: ' + stdio);
    }
  } else if (!Array.isArray(stdio)) {
    throw new TypeError('Incorrect value of stdio option: ' +
        util.inspect(stdio));
  } else {
    // Work on a shallow copy so the padding below does not grow the
    // caller's array.
    stdio = stdio.slice();
  }

  // At least 3 stdio will be created
  // Don't concat() a new Array() because it would be sparse, and
  // stdio.reduce() would skip the sparse elements of stdio.
  // See http://stackoverflow.com/a/5501711/3561
  while (stdio.length < 3) stdio.push(undefined);

  // Translate stdio into C++-readable form
  // (i.e. PipeWraps or fds)
  stdio = stdio.reduce(function(acc, stdio, i) {
    // Closes the handles of the pipe entries collected so far.
    function cleanup() {
      acc.filter(function(stdio) {
        return stdio.type === 'pipe' || stdio.type === 'ipc';
      }).forEach(function(stdio) {
        if (stdio.handle)
          stdio.handle.close();
      });
    }

    // Defaults
    if (stdio == null) {
      stdio = i < 3 ? 'pipe' : 'ignore';
    }

    if (stdio === 'ignore') {
      acc.push({type: 'ignore'});
    } else if (stdio === 'pipe' || typeof stdio === 'number' && stdio < 0) {
      var a = {
        type: 'pipe',
        readable: i === 0,
        writable: i !== 0
      };

      if (!sync)
        a.handle = new Pipe();

      acc.push(a);
    } else if (stdio === 'ipc') {
      if (sync || ipc !== undefined) {
        // Cleanup previously created pipes
        cleanup();
        if (!sync)
          throw new Error('Child process can have only one IPC pipe');
        else
          throw new Error('You cannot use IPC with synchronous forks');
      }

      ipc = new Pipe(true);
      ipcFd = i;

      acc.push({
        type: 'pipe',
        handle: ipc,
        ipc: true
      });
    } else if (stdio === 'inherit') {
      acc.push({
        type: 'inherit',
        fd: i
      });
    } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') {
      acc.push({
        type: 'fd',
        fd: typeof stdio === 'number' ? stdio : stdio.fd
      });
    } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) ||
               getHandleWrapType(stdio._handle)) {
      var handle = getHandleWrapType(stdio) ?
          stdio :
          getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle;

      acc.push({
        type: 'wrap',
        wrapType: getHandleWrapType(handle),
        handle: handle
      });
    } else if (isUint8Array(stdio) || typeof stdio === 'string') {
      // In sync mode such an entry is accepted but adds no descriptor.
      if (!sync) {
        cleanup();
        throw new TypeError('Asynchronous forks do not support ' +
                            'Buffer, Uint8Array or string input: ' +
                            util.inspect(stdio));
      }
    } else {
      // Cleanup
      cleanup();
      throw new TypeError('Incorrect value for stdio stream: ' +
          util.inspect(stdio));
    }

    return acc;
  }, []);

  return {stdio: stdio, ipc: ipc, ipcFd: ipcFd};
}


// Returns the socket list stored under `key` in
// `slave.channel.sockets[type]`, creating it on first use
// (SocketListSend for type 'send', SocketListReceive otherwise).
function getSocketList(type, slave, key) {
  var sockets = slave.channel.sockets[type];
  var socketList = sockets[key];
  if (!socketList) {
    var Construct = type === 'send' ? SocketListSend : SocketListReceive;
    socketList = sockets[key] = new Construct(slave, key);
  }
  return socketList;
}


// Counts one more close and emits 'close' with the exit code and signal
// code once the count reaches `_closesNeeded`.
function maybeClose(subprocess) {
  subprocess._closesGot++;

  if (subprocess._closesGot === subprocess._closesNeeded) {
    subprocess.emit('close', subprocess.exitCode, subprocess.signalCode);
  }
}