From 26c5905a99538f1c8f5aea131d0e24149625f254 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 7 Oct 2011 00:57:37 -0700 Subject: [PATCH] Reimplement child_process.fork Fixes test/simple/test-child-process-fork.js --- lib/child_process_uv.js | 121 ++++++++++++++++++++++++++++++++++++++-- src/node.cc | 3 + 2 files changed, 120 insertions(+), 4 deletions(-) diff --git a/lib/child_process_uv.js b/lib/child_process_uv.js index 56c3b673ed..3dbe5f11b7 100644 --- a/lib/child_process_uv.js +++ b/lib/child_process_uv.js @@ -24,11 +24,18 @@ var Process = process.binding('process_wrap').Process; var inherits = require('util').inherits; var constants; // if (!constants) constants = process.binding('constants'); +var LF = '\n'.charCodeAt(0); +var Pipe; + // constructors for lazy loading -function createPipe() { - var Pipe = process.binding('pipe_wrap').Pipe; - return new Pipe(); +function createPipe(ipc) { + // Lazy load + if (!Pipe) { + Pipe = new process.binding('pipe_wrap').Pipe; + } + + return new Pipe(ipc); } function createSocket(pipe, readable) { @@ -61,6 +68,106 @@ function mergeOptions(target, overrides) { } +function setupChannel(target, channel) { + target._channel = channel; + + var jsonBuffer = ''; + + channel.onread = function(pool, offset, length) { + if (pool) { + for (var i = 0; i < length; i++) { + if (pool[offset + i] === LF) { + jsonBuffer += pool.toString('ascii', offset, offset + i); + var message = JSON.parse(jsonBuffer); + jsonBuffer = pool.toString('ascii', i, length); + offset = i + 1; + + target.emit('message', message); + } + } + } else { + channel.close(); + target._channel = null; + } + }; + + target.send = function(message, fd) { + if (fd) throw new Error("not yet implemented"); + + if (!target._channel) throw new Error("channel closed"); + + // For overflow protection don't write if channel queue is too deep. + if (channel.writeQueueSize > 1024 * 1024) { + return false; + } + + var buffer = Buffer(JSON.stringify(message) + '\n'); + + var writeReq = channel.write(buffer); + + if (!writeReq) { + throw new Error(errno + " cannot write to IPC channel."); + } else { + writeReq.oncomplete = nop; + } + + return true; + }; + + channel.readStart(); +} + + +function nop() { } + + +exports.fork = function(modulePath, args, options) { + if (!options) options = {}; + + if (!args) args = []; + args.unshift(modulePath); + + if (options.stdinStream) { + throw new Error("stdinStream not allowed for fork()"); + } + + if (options.customFds) { + throw new Error("customFds not allowed for fork()"); + } + + // Leave stdin open for the IPC channel. stdout and stderr should be the + // same as the parent's. + options.customFds = [ -1, 1, 2 ]; + + // Just need to set this - child process won't actually use the fd. + // For backwards compat - this can be changed to 'NODE_CHANNEL' before v0.6. + options.env = { NODE_CHANNEL_FD: 42 }; + + // stdin is the IPC channel. + options.stdinStream = createPipe(true); + + var child = spawn(process.execPath, args, options); + + setupChannel(child, options.stdinStream); + + child.on('exit', function() { + if (child._channel) { + child._channel.close(); + } + }); + + return child; +}; + + +exports._forkChild = function() { + // set process.send() + var p = createPipe(true); + p.open(0); + setupChannel(process, p); +}; + + exports.exec = function(command /*, options, callback */) { var file, args, options, callback; @@ -213,7 +320,8 @@ var spawn = exports.spawn = function(file, args, options) { cwd: options ? options.cwd : null, windowsVerbatimArguments: !!(options && options.windowsVerbatimArguments), envPairs: envPairs, - customFds: options ? options.customFds : null + customFds: options ? options.customFds : null, + stdinStream: options ? options.stdinStream : null }); return child; @@ -266,6 +374,9 @@ inherits(ChildProcess, EventEmitter); function setStreamOption(name, index, options) { + // Skip if we already have options.stdinStream + if (options[name]) return; + if (options.customFds && typeof options.customFds[index] == 'number' && options.customFds[index] !== -1) { @@ -283,6 +394,8 @@ function setStreamOption(name, index, options) { ChildProcess.prototype.spawn = function(options) { var self = this; + debugger; + setStreamOption('stdinStream', 0, options); setStreamOption('stdoutStream', 1, options); setStreamOption('stderrStream', 2, options); diff --git a/src/node.cc b/src/node.cc index 61c2fcc97a..b46705943a 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1078,6 +1078,9 @@ void MakeCallback(Handle object, HandleScope scope; Local callback_v = object->Get(String::New(method)); + if (!callback_v->IsFunction()) { + fprintf(stderr, "method = %s", method); + } assert(callback_v->IsFunction()); Local callback = Local::Cast(callback_v);