Browse Source

Reimplement child_process.fork

Fixes test/simple/test-child-process-fork.js
v0.7.4-release
Ryan Dahl 13 years ago
parent
commit
26c5905a99
  1. 121
      lib/child_process_uv.js
  2. 3
      src/node.cc

121
lib/child_process_uv.js

@ -24,11 +24,18 @@ var Process = process.binding('process_wrap').Process;
var inherits = require('util').inherits;
var constants; // if (!constants) constants = process.binding('constants');
var LF = '\n'.charCodeAt(0);
var Pipe;
// constructors for lazy loading
function createPipe() {
var Pipe = process.binding('pipe_wrap').Pipe;
return new Pipe();
function createPipe(ipc) {
// Lazy load
if (!Pipe) {
Pipe = new process.binding('pipe_wrap').Pipe;
}
return new Pipe(ipc);
}
function createSocket(pipe, readable) {
@ -61,6 +68,106 @@ function mergeOptions(target, overrides) {
}
function setupChannel(target, channel) {
target._channel = channel;
var jsonBuffer = '';
channel.onread = function(pool, offset, length) {
if (pool) {
for (var i = 0; i < length; i++) {
if (pool[offset + i] === LF) {
jsonBuffer += pool.toString('ascii', offset, offset + i);
var message = JSON.parse(jsonBuffer);
jsonBuffer = pool.toString('ascii', i, length);
offset = i + 1;
target.emit('message', message);
}
}
} else {
channel.close();
target._channel = null;
}
};
target.send = function(message, fd) {
if (fd) throw new Error("not yet implemented");
if (!target._channel) throw new Error("channel closed");
// For overflow protection don't write if channel queue is too deep.
if (channel.writeQueueSize > 1024 * 1024) {
return false;
}
var buffer = Buffer(JSON.stringify(message) + '\n');
var writeReq = channel.write(buffer);
if (!writeReq) {
throw new Error(errno + " cannot write to IPC channel.");
} else {
writeReq.oncomplete = nop;
}
return true;
};
channel.readStart();
}
function nop() { }
exports.fork = function(modulePath, args, options) {
if (!options) options = {};
if (!args) args = [];
args.unshift(modulePath);
if (options.stdinStream) {
throw new Error("stdinStream not allowed for fork()");
}
if (options.customFds) {
throw new Error("customFds not allowed for fork()");
}
// Leave stdin open for the IPC channel. stdout and stderr should be the
// same as the parent's.
options.customFds = [ -1, 1, 2 ];
// Just need to set this - child process won't actually use the fd.
// For backwards compat - this can be changed to 'NODE_CHANNEL' before v0.6.
options.env = { NODE_CHANNEL_FD: 42 };
// stdin is the IPC channel.
options.stdinStream = createPipe(true);
var child = spawn(process.execPath, args, options);
setupChannel(child, options.stdinStream);
child.on('exit', function() {
if (child._channel) {
child._channel.close();
}
});
return child;
};
exports._forkChild = function() {
// set process.send()
var p = createPipe(true);
p.open(0);
setupChannel(process, p);
};
exports.exec = function(command /*, options, callback */) {
var file, args, options, callback;
@ -213,7 +320,8 @@ var spawn = exports.spawn = function(file, args, options) {
cwd: options ? options.cwd : null,
windowsVerbatimArguments: !!(options && options.windowsVerbatimArguments),
envPairs: envPairs,
customFds: options ? options.customFds : null
customFds: options ? options.customFds : null,
stdinStream: options ? options.stdinStream : null
});
return child;
@ -266,6 +374,9 @@ inherits(ChildProcess, EventEmitter);
function setStreamOption(name, index, options) {
// Skip if we already have options.stdinStream
if (options[name]) return;
if (options.customFds &&
typeof options.customFds[index] == 'number' &&
options.customFds[index] !== -1) {
@ -283,6 +394,8 @@ function setStreamOption(name, index, options) {
ChildProcess.prototype.spawn = function(options) {
var self = this;
debugger;
setStreamOption('stdinStream', 0, options);
setStreamOption('stdoutStream', 1, options);
setStreamOption('stderrStream', 2, options);

3
src/node.cc

@ -1078,6 +1078,9 @@ void MakeCallback(Handle<Object> object,
HandleScope scope;
Local<Value> callback_v = object->Get(String::New(method));
if (!callback_v->IsFunction()) {
fprintf(stderr, "method = %s", method);
}
assert(callback_v->IsFunction());
Local<Function> callback = Local<Function>::Cast(callback_v);

Loading…
Cancel
Save