From 9e26dab150e15cdbc7fbec76dcadeafed7d85646 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 11 May 2011 00:41:16 -0700 Subject: [PATCH] child_process.spawnNode For making easy worker processes. --- doc/api/child_processes.markdown | 33 ++++++++++ lib/child_process.js | 60 ++++++++++++++++++ src/node.js | 15 +++++ src/node_child_process.cc | 67 ++++++++++++++++++-- src/node_child_process.h | 3 +- test/fixtures/child-process-channel.js | 9 +++ test/fixtures/child-process-spawn-node.js | 10 +++ test/simple/test-child-process-channel.js | 25 ++++++++ test/simple/test-child-process-spawn-node.js | 24 +++++++ 9 files changed, 238 insertions(+), 8 deletions(-) create mode 100644 test/fixtures/child-process-channel.js create mode 100644 test/fixtures/child-process-spawn-node.js create mode 100644 test/simple/test-child-process-channel.js create mode 100644 test/simple/test-child-process-spawn-node.js diff --git a/doc/api/child_processes.markdown b/doc/api/child_processes.markdown index b30de4bf27..3aa2a106a9 100644 --- a/doc/api/child_processes.markdown +++ b/doc/api/child_processes.markdown @@ -179,6 +179,39 @@ amount of data allowed on stdout or stderr - if this value is exceeded then the child process is killed. +### child_process.spawnNode(modulePath, arguments, options) + +This is a special case of the `spawn()` functionality for spawning Node +processes. In addition to having all the methods in a normal ChildProcess +instance, the returned object, has a communication channel built-in. The +channel is written to with `child.send(message)` and messages are recieved +by a `'message'` event on the child. + +For example: + + var n = spawnNode(__dirname + '/sub.js'); + + n.on('message', function(m) { + console.log('PARENT got message:', m); + }); + + n.send({ hello: 'world' }); + +And then the child script, `'sub.js'` would might look like this: + + process.on('message', function(m) { + console.log('CHILD got message:', m); + }); + + process.send({ foo: 'bar' }); + +In the child the `process` object will have a `send()` method, and `process` +will emit objects each time it receives a message on its channel. + +By default the spawned Node process will have the stdin, stdout, stderr associated +with the parent's. This can be overridden by using the `customFds` option. + + ### child.kill(signal='SIGTERM') Send a signal to the child process. If no argument is given, the process will diff --git a/lib/child_process.js b/lib/child_process.js index 2b2fc41352..44732c82a3 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -32,6 +32,60 @@ var spawn = exports.spawn = function(path, args /*, options, customFds */) { return child; }; + +function setupChannel(target, fd) { + target._channel = new Stream(fd); + target._channel.writable = true; + target._channel.readable = true; + + target._channel.resume(); + target._channel.setEncoding('ascii'); + + var buffer = ''; + target._channel.on('data', function(d) { + buffer += d; + var i; + while ((i = buffer.indexOf('\n')) >= 0) { + var json = buffer.slice(0, i); + buffer = buffer.slice(i + 1); + var m = JSON.parse(json); + target.emit('message', m); + } + }); + + target.send = function(m) { + target._channel.write(JSON.stringify(m) + '\n'); + }; +} + + +exports.spawnNode = function(modulePath, args, options) { + if (!options) options = {}; + options.wantChannel = true; + + if (!args) args = []; + args.unshift(modulePath); + + // Unless they gave up customFds, just use the parent process + if (!options.customFds) options.customFds = [0, 1, 2]; + + var child = spawn(process.execPath, args, options); + + setupChannel(child, child.fds[3]); + + child.on('exit', function() { + child._channel.destroy(); + }); + + return child; +}; + + +exports._spawnNodeChild = function(fd) { + setupChannel(process, fd); +}; + + exports.exec = function(command /*, options, callback */) { var _slice = Array.prototype.slice; var args = ['/bin/sh', ['-c', command]].concat(_slice.call(arguments, 1)); @@ -240,6 +294,12 @@ ChildProcess.prototype.spawn = function(path, args, options, customFds) { envPairs.push(key + '=' + env[key]); } + if (options && options.wantChannel) { + // The FILLMEIN will be replaced in C land with an integer! + // AWFUL! :D + envPairs.push('NODE_CHANNEL_FD=FILLMEIN'); + } + var fds = this._internal.spawn(path, args, cwd, diff --git a/src/node.js b/src/node.js index d03cbd1a63..ee8d184d1c 100644 --- a/src/node.js +++ b/src/node.js @@ -38,6 +38,8 @@ startup.processKillAndExit(); startup.processSignalHandlers(); + startup.processChannel(); + startup.removedMethods(); startup.resolveArgv0(); @@ -307,6 +309,19 @@ }; }; + + startup.processChannel = function() { + // If we were spawned with env NODE_CHANNEL_FD then load that up and + // start parsing data from that stream. + if (process.env.NODE_CHANNEL_FD) { + var fd = parseInt(process.env.NODE_CHANNEL_FD); + assert(fd >= 0); + var cp = NativeModule.require('child_process'); + cp._spawnNodeChild(fd); + assert(process.send); + } + } + startup._removedProcessMethods = { 'assert': 'process.assert() use require("assert").ok() instead', 'debug': 'process.debug() use console.error() instead', diff --git a/src/node_child_process.cc b/src/node_child_process.cc index 759c901203..a0a465e76b 100644 --- a/src/node_child_process.cc +++ b/src/node_child_process.cc @@ -35,6 +35,9 @@ #include #endif +#include /* socketpair */ +#include + # ifdef __APPLE__ # include # define environ (*_NSGetEnviron()) @@ -153,7 +156,7 @@ Handle ChildProcess::Spawn(const Arguments& args) { // Copy fourth argument, args[3], into a c-string array called env. Local env_handle = Local::Cast(args[3]); int envc = env_handle->Length(); - char **env = new char*[envc+1]; // heap allocated to detect errors + char **env = new char*[envc + 1]; // heap allocated to detect errors env[envc] = NULL; for (int i = 0; i < envc; i++) { String::Utf8Value pair(env_handle->Get(Integer::New(i))->ToString()); @@ -206,7 +209,7 @@ Handle ChildProcess::Spawn(const Arguments& args) { String::New("setgid argument must be a number or a string"))); } - + int channel_fd = -1; int r = child->Spawn(argv[0], argv, @@ -218,7 +221,8 @@ Handle ChildProcess::Spawn(const Arguments& args) { custom_uid, custom_uname, custom_gid, - custom_gname); + custom_gname, + &channel_fd); if (custom_uname != NULL) free(custom_uname); if (custom_gname != NULL) free(custom_gname); @@ -235,7 +239,8 @@ Handle ChildProcess::Spawn(const Arguments& args) { return ThrowException(Exception::Error(String::New("Error spawning"))); } - Local a = Array::New(3); + + Local a = Array::New(channel_fd >= 0 ? 4 : 3); assert(fds[0] >= 0); a->Set(0, Integer::New(fds[0])); // stdin @@ -244,6 +249,10 @@ Handle ChildProcess::Spawn(const Arguments& args) { assert(fds[2] >= 0); a->Set(2, Integer::New(fds[2])); // stderr + if (channel_fd >= 0) { + a->Set(3, Integer::New(channel_fd)); + } + return scope.Close(a); } @@ -291,6 +300,8 @@ void ChildProcess::Stop() { // Note that args[0] must be the same as the "file" param. This is an // execvp() requirement. // +// TODO: The arguments are rediculously long. Needs to be put into a struct. +// int ChildProcess::Spawn(const char *file, char *const args[], const char *cwd, @@ -301,7 +312,8 @@ int ChildProcess::Spawn(const char *file, int custom_uid, char *custom_uname, int custom_gid, - char *custom_gname) { + char *custom_gname, + int* channel) { HandleScope scope; assert(pid_ == -1); assert(!ev_is_active(&child_watcher_)); @@ -332,11 +344,37 @@ int ChildProcess::Spawn(const char *file, SetCloseOnExec(stderr_pipe[1]); } + + // The channel will be used by spawnNode() for a little JSON channel. + // The pointer is used to pass one end of the socket pair back to the + // parent. + // channel_fds[0] is for the parent + // channel_fds[1] is for the child + int channel_fds[2] = { -1, -1 }; + +#define NODE_CHANNEL_FD "NODE_CHANNEL_FD" + + for (int i = 0; env[i]; i++) { + if (!strncmp(env[i], NODE_CHANNEL_FD, sizeof NODE_CHANNEL_FD - 1)) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, channel_fds)) { + perror("socketpair()"); + return -1; + } + + assert(channel_fds[0] >= 0 && channel_fds[1] >= 0); + + SetNonBlocking(channel_fds[0]); + SetNonBlocking(channel_fds[1]); + // Write over the FILLMEIN :D + sprintf(env[i], NODE_CHANNEL_FD "=%d", channel_fds[1]); + } + } + // Save environ in the case that we get it clobbered // by the child process. char **save_our_env = environ; - switch (pid_ = vfork()) { + switch (pid_ = fork()) { case -1: // Error. Stop(); return -4; @@ -429,7 +467,11 @@ int ChildProcess::Spawn(const char *file, _exit(127); } - + // Close the parent's end of the channel. + if (channel_fds[0] >= 0) { + close(channel_fds[0]); + channel_fds[0] = -1; + } environ = env; @@ -472,6 +514,17 @@ int ChildProcess::Spawn(const char *file, stdio_fds[2] = custom_fds[2]; } + // Close the child's end of the channel. + if (channel_fds[1] >= 0) { + close(channel_fds[1]); + channel_fds[1] = -1; + assert(channel_fds[0] >= 0); + assert(channel); + *channel = channel_fds[0]; + } else { + *channel = -1; + } + return 0; } diff --git a/src/node_child_process.h b/src/node_child_process.h index 6090c47ff9..9665e291bf 100644 --- a/src/node_child_process.h +++ b/src/node_child_process.h @@ -89,7 +89,8 @@ class ChildProcess : ObjectWrap { int custom_uid, char *custom_uname, int custom_gid, - char *custom_gname); + char *custom_gname, + int* channel); // Simple syscall wrapper. Does not disable the watcher. onexit will be // called still. diff --git a/test/fixtures/child-process-channel.js b/test/fixtures/child-process-channel.js new file mode 100644 index 0000000000..802d046d32 --- /dev/null +++ b/test/fixtures/child-process-channel.js @@ -0,0 +1,9 @@ +var assert = require('assert'); + +console.log("NODE_CHANNEL_FD", process.env.NODE_CHANNEL_FD); +assert.ok(process.env.NODE_CHANNEL_FD); + +var fd = parseInt(process.env.NODE_CHANNEL_FD); +assert.ok(fd >= 0); + +process.exit(0); diff --git a/test/fixtures/child-process-spawn-node.js b/test/fixtures/child-process-spawn-node.js new file mode 100644 index 0000000000..9bc6c19873 --- /dev/null +++ b/test/fixtures/child-process-spawn-node.js @@ -0,0 +1,10 @@ +var assert = require('assert'); + +process.on('message', function(m) { + console.log('CHILD got message:', m); + assert.ok(m.hello); + // Note that we have to force exit. + process.exit(); +}); + +process.send({ foo: 'bar' }); diff --git a/test/simple/test-child-process-channel.js b/test/simple/test-child-process-channel.js new file mode 100644 index 0000000000..47d09d8e17 --- /dev/null +++ b/test/simple/test-child-process-channel.js @@ -0,0 +1,25 @@ +var assert = require('assert'); +var spawn = require('child_process').spawn; +var common = require('../common'); + +var sub = common.fixturesDir + '/child-process-channel.js'; + +var child = spawn(process.execPath, [ sub ], { + customFds: [0, 1, 2], + wantChannel: true +}); + +console.log("fds", child.fds); + +assert.ok(child.fds.length == 4); +assert.ok(child.fds[3] >= 0); + +var childExitCode = -1; + +child.on('exit', function(code) { + childExitCode = code; +}); + +process.on('exit', function() { + assert.ok(childExitCode == 0); +}); diff --git a/test/simple/test-child-process-spawn-node.js b/test/simple/test-child-process-spawn-node.js new file mode 100644 index 0000000000..7c895c3e9d --- /dev/null +++ b/test/simple/test-child-process-spawn-node.js @@ -0,0 +1,24 @@ +var assert = require('assert'); +var common = require('../common'); +var spawnNode = require('child_process').spawnNode; + +var n = spawnNode(common.fixturesDir + '/child-process-spawn-node.js'); + +var messageCount = 0; + +n.on('message', function(m) { + console.log('PARENT got message:', m); + assert.ok(m.foo); + messageCount++; +}); + +n.send({ hello: 'world' }); + +var childExitCode = -1; +n.on('exit', function(c) { + childExitCode = c; +}); + +process.on('exit', function() { + assert.ok(childExitCode == 0); +});