Browse Source

child_process.spawnNode

For making easy worker processes.
v0.7.4-release
Ryan Dahl 14 years ago
parent
commit
9e26dab150
  1. 33
      doc/api/child_processes.markdown
  2. 60
      lib/child_process.js
  3. 15
      src/node.js
  4. 65
      src/node_child_process.cc
  5. 3
      src/node_child_process.h
  6. 9
      test/fixtures/child-process-channel.js
  7. 10
      test/fixtures/child-process-spawn-node.js
  8. 25
      test/simple/test-child-process-channel.js
  9. 24
      test/simple/test-child-process-spawn-node.js

33
doc/api/child_processes.markdown

@ -179,6 +179,39 @@ amount of data allowed on stdout or stderr - if this value is exceeded then
the child process is killed. the child process is killed.
### child_process.spawnNode(modulePath, arguments, options)
This is a special case of the `spawn()` functionality for spawning Node
processes. In addition to having all the methods in a normal ChildProcess
instance, the returned object, has a communication channel built-in. The
channel is written to with `child.send(message)` and messages are recieved
by a `'message'` event on the child.
For example:
var n = spawnNode(__dirname + '/sub.js');
n.on('message', function(m) {
console.log('PARENT got message:', m);
});
n.send({ hello: 'world' });
And then the child script, `'sub.js'` would might look like this:
process.on('message', function(m) {
console.log('CHILD got message:', m);
});
process.send({ foo: 'bar' });
In the child the `process` object will have a `send()` method, and `process`
will emit objects each time it receives a message on its channel.
By default the spawned Node process will have the stdin, stdout, stderr associated
with the parent's. This can be overridden by using the `customFds` option.
### child.kill(signal='SIGTERM') ### child.kill(signal='SIGTERM')
Send a signal to the child process. If no argument is given, the process will Send a signal to the child process. If no argument is given, the process will

60
lib/child_process.js

@ -32,6 +32,60 @@ var spawn = exports.spawn = function(path, args /*, options, customFds */) {
return child; return child;
}; };
function setupChannel(target, fd) {
target._channel = new Stream(fd);
target._channel.writable = true;
target._channel.readable = true;
target._channel.resume();
target._channel.setEncoding('ascii');
var buffer = '';
target._channel.on('data', function(d) {
buffer += d;
var i;
while ((i = buffer.indexOf('\n')) >= 0) {
var json = buffer.slice(0, i);
buffer = buffer.slice(i + 1);
var m = JSON.parse(json);
target.emit('message', m);
}
});
target.send = function(m) {
target._channel.write(JSON.stringify(m) + '\n');
};
}
exports.spawnNode = function(modulePath, args, options) {
if (!options) options = {};
options.wantChannel = true;
if (!args) args = [];
args.unshift(modulePath);
// Unless they gave up customFds, just use the parent process
if (!options.customFds) options.customFds = [0, 1, 2];
var child = spawn(process.execPath, args, options);
setupChannel(child, child.fds[3]);
child.on('exit', function() {
child._channel.destroy();
});
return child;
};
exports._spawnNodeChild = function(fd) {
setupChannel(process, fd);
};
exports.exec = function(command /*, options, callback */) { exports.exec = function(command /*, options, callback */) {
var _slice = Array.prototype.slice; var _slice = Array.prototype.slice;
var args = ['/bin/sh', ['-c', command]].concat(_slice.call(arguments, 1)); var args = ['/bin/sh', ['-c', command]].concat(_slice.call(arguments, 1));
@ -240,6 +294,12 @@ ChildProcess.prototype.spawn = function(path, args, options, customFds) {
envPairs.push(key + '=' + env[key]); envPairs.push(key + '=' + env[key]);
} }
if (options && options.wantChannel) {
// The FILLMEIN will be replaced in C land with an integer!
// AWFUL! :D
envPairs.push('NODE_CHANNEL_FD=FILLMEIN');
}
var fds = this._internal.spawn(path, var fds = this._internal.spawn(path,
args, args,
cwd, cwd,

15
src/node.js

@ -38,6 +38,8 @@
startup.processKillAndExit(); startup.processKillAndExit();
startup.processSignalHandlers(); startup.processSignalHandlers();
startup.processChannel();
startup.removedMethods(); startup.removedMethods();
startup.resolveArgv0(); startup.resolveArgv0();
@ -307,6 +309,19 @@
}; };
}; };
startup.processChannel = function() {
// If we were spawned with env NODE_CHANNEL_FD then load that up and
// start parsing data from that stream.
if (process.env.NODE_CHANNEL_FD) {
var fd = parseInt(process.env.NODE_CHANNEL_FD);
assert(fd >= 0);
var cp = NativeModule.require('child_process');
cp._spawnNodeChild(fd);
assert(process.send);
}
}
startup._removedProcessMethods = { startup._removedProcessMethods = {
'assert': 'process.assert() use require("assert").ok() instead', 'assert': 'process.assert() use require("assert").ok() instead',
'debug': 'process.debug() use console.error() instead', 'debug': 'process.debug() use console.error() instead',

65
src/node_child_process.cc

@ -35,6 +35,9 @@
#include <sys/wait.h> #include <sys/wait.h>
#endif #endif
#include <sys/socket.h> /* socketpair */
#include <sys/un.h>
# ifdef __APPLE__ # ifdef __APPLE__
# include <crt_externs.h> # include <crt_externs.h>
# define environ (*_NSGetEnviron()) # define environ (*_NSGetEnviron())
@ -206,7 +209,7 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
String::New("setgid argument must be a number or a string"))); String::New("setgid argument must be a number or a string")));
} }
int channel_fd = -1;
int r = child->Spawn(argv[0], int r = child->Spawn(argv[0],
argv, argv,
@ -218,7 +221,8 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
custom_uid, custom_uid,
custom_uname, custom_uname,
custom_gid, custom_gid,
custom_gname); custom_gname,
&channel_fd);
if (custom_uname != NULL) free(custom_uname); if (custom_uname != NULL) free(custom_uname);
if (custom_gname != NULL) free(custom_gname); if (custom_gname != NULL) free(custom_gname);
@ -235,7 +239,8 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
return ThrowException(Exception::Error(String::New("Error spawning"))); return ThrowException(Exception::Error(String::New("Error spawning")));
} }
Local<Array> a = Array::New(3);
Local<Array> a = Array::New(channel_fd >= 0 ? 4 : 3);
assert(fds[0] >= 0); assert(fds[0] >= 0);
a->Set(0, Integer::New(fds[0])); // stdin a->Set(0, Integer::New(fds[0])); // stdin
@ -244,6 +249,10 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
assert(fds[2] >= 0); assert(fds[2] >= 0);
a->Set(2, Integer::New(fds[2])); // stderr a->Set(2, Integer::New(fds[2])); // stderr
if (channel_fd >= 0) {
a->Set(3, Integer::New(channel_fd));
}
return scope.Close(a); return scope.Close(a);
} }
@ -291,6 +300,8 @@ void ChildProcess::Stop() {
// Note that args[0] must be the same as the "file" param. This is an // Note that args[0] must be the same as the "file" param. This is an
// execvp() requirement. // execvp() requirement.
// //
// TODO: The arguments are rediculously long. Needs to be put into a struct.
//
int ChildProcess::Spawn(const char *file, int ChildProcess::Spawn(const char *file,
char *const args[], char *const args[],
const char *cwd, const char *cwd,
@ -301,7 +312,8 @@ int ChildProcess::Spawn(const char *file,
int custom_uid, int custom_uid,
char *custom_uname, char *custom_uname,
int custom_gid, int custom_gid,
char *custom_gname) { char *custom_gname,
int* channel) {
HandleScope scope; HandleScope scope;
assert(pid_ == -1); assert(pid_ == -1);
assert(!ev_is_active(&child_watcher_)); assert(!ev_is_active(&child_watcher_));
@ -332,11 +344,37 @@ int ChildProcess::Spawn(const char *file,
SetCloseOnExec(stderr_pipe[1]); SetCloseOnExec(stderr_pipe[1]);
} }
// The channel will be used by spawnNode() for a little JSON channel.
// The pointer is used to pass one end of the socket pair back to the
// parent.
// channel_fds[0] is for the parent
// channel_fds[1] is for the child
int channel_fds[2] = { -1, -1 };
#define NODE_CHANNEL_FD "NODE_CHANNEL_FD"
for (int i = 0; env[i]; i++) {
if (!strncmp(env[i], NODE_CHANNEL_FD, sizeof NODE_CHANNEL_FD - 1)) {
if (socketpair(AF_UNIX, SOCK_STREAM, 0, channel_fds)) {
perror("socketpair()");
return -1;
}
assert(channel_fds[0] >= 0 && channel_fds[1] >= 0);
SetNonBlocking(channel_fds[0]);
SetNonBlocking(channel_fds[1]);
// Write over the FILLMEIN :D
sprintf(env[i], NODE_CHANNEL_FD "=%d", channel_fds[1]);
}
}
// Save environ in the case that we get it clobbered // Save environ in the case that we get it clobbered
// by the child process. // by the child process.
char **save_our_env = environ; char **save_our_env = environ;
switch (pid_ = vfork()) { switch (pid_ = fork()) {
case -1: // Error. case -1: // Error.
Stop(); Stop();
return -4; return -4;
@ -429,7 +467,11 @@ int ChildProcess::Spawn(const char *file,
_exit(127); _exit(127);
} }
// Close the parent's end of the channel.
if (channel_fds[0] >= 0) {
close(channel_fds[0]);
channel_fds[0] = -1;
}
environ = env; environ = env;
@ -472,6 +514,17 @@ int ChildProcess::Spawn(const char *file,
stdio_fds[2] = custom_fds[2]; stdio_fds[2] = custom_fds[2];
} }
// Close the child's end of the channel.
if (channel_fds[1] >= 0) {
close(channel_fds[1]);
channel_fds[1] = -1;
assert(channel_fds[0] >= 0);
assert(channel);
*channel = channel_fds[0];
} else {
*channel = -1;
}
return 0; return 0;
} }

3
src/node_child_process.h

@ -89,7 +89,8 @@ class ChildProcess : ObjectWrap {
int custom_uid, int custom_uid,
char *custom_uname, char *custom_uname,
int custom_gid, int custom_gid,
char *custom_gname); char *custom_gname,
int* channel);
// Simple syscall wrapper. Does not disable the watcher. onexit will be // Simple syscall wrapper. Does not disable the watcher. onexit will be
// called still. // called still.

9
test/fixtures/child-process-channel.js

@ -0,0 +1,9 @@
var assert = require('assert');
console.log("NODE_CHANNEL_FD", process.env.NODE_CHANNEL_FD);
assert.ok(process.env.NODE_CHANNEL_FD);
var fd = parseInt(process.env.NODE_CHANNEL_FD);
assert.ok(fd >= 0);
process.exit(0);

10
test/fixtures/child-process-spawn-node.js

@ -0,0 +1,10 @@
var assert = require('assert');
process.on('message', function(m) {
console.log('CHILD got message:', m);
assert.ok(m.hello);
// Note that we have to force exit.
process.exit();
});
process.send({ foo: 'bar' });

25
test/simple/test-child-process-channel.js

@ -0,0 +1,25 @@
var assert = require('assert');
var spawn = require('child_process').spawn;
var common = require('../common');
var sub = common.fixturesDir + '/child-process-channel.js';
var child = spawn(process.execPath, [ sub ], {
customFds: [0, 1, 2],
wantChannel: true
});
console.log("fds", child.fds);
assert.ok(child.fds.length == 4);
assert.ok(child.fds[3] >= 0);
var childExitCode = -1;
child.on('exit', function(code) {
childExitCode = code;
});
process.on('exit', function() {
assert.ok(childExitCode == 0);
});

24
test/simple/test-child-process-spawn-node.js

@ -0,0 +1,24 @@
var assert = require('assert');
var common = require('../common');
var spawnNode = require('child_process').spawnNode;
var n = spawnNode(common.fixturesDir + '/child-process-spawn-node.js');
var messageCount = 0;
n.on('message', function(m) {
console.log('PARENT got message:', m);
assert.ok(m.foo);
messageCount++;
});
n.send({ hello: 'world' });
var childExitCode = -1;
n.on('exit', function(c) {
childExitCode = c;
});
process.on('exit', function() {
assert.ok(childExitCode == 0);
});
Loading…
Cancel
Save