From 607aa3ac82cb8c07487b474e548aa205e8ee3ca5 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Mon, 31 Aug 2015 00:49:34 +0200 Subject: [PATCH] child_process: add callback parameter to .send() Add an optional callback parameter to `ChildProcess.prototype.send()` that is invoked when the message has been sent. Juggle the control channel's reference count so that in-flight messages keep the event loop (and therefore the process) alive until they have been sent. `ChildProcess.prototype.send()` and `process.send()` used to operate synchronously but became asynchronous in commit libuv/libuv@393c1c5 ("unix: set non-block mode in uv_{pipe,tcp,udp}_open"), which landed in io.js in commit 07bd05b ("deps: update libuv to 1.2.1"). Fixes: https://github.com/nodejs/node/issues/760 PR-URL: https://github.com/nodejs/node/pull/2620 Reviewed-By: trevnorris - Trevor Norris Reviewed-By: jasnell - James M Snell --- doc/api/child_process.markdown | 23 ++++-- doc/api/cluster.markdown | 4 +- lib/child_process.js | 10 +-- lib/internal/child_process.js | 91 ++++++++++++++++----- test/parallel/test-child-process-send-cb.js | 19 +++++ 5 files changed, 111 insertions(+), 36 deletions(-) create mode 100644 test/parallel/test-child-process-send-cb.js diff --git a/doc/api/child_process.markdown b/doc/api/child_process.markdown index 05308068a9..31ac72451d 100644 --- a/doc/api/child_process.markdown +++ b/doc/api/child_process.markdown @@ -214,13 +214,15 @@ to a process. See `kill(2)` -### child.send(message[, sendHandle]) +### child.send(message[, sendHandle][, callback]) * `message` {Object} * `sendHandle` {Handle object} +* `callback` {Function} +* Return: Boolean When using `child_process.fork()` you can write to the child using -`child.send(message, [sendHandle])` and messages are received by +`child.send(message[, sendHandle][, callback])` and messages are received by a `'message'` event on the child. For example: @@ -246,11 +248,6 @@ And then the child script, `'sub.js'` might look like this: In the child the `process` object will have a `send()` method, and `process` will emit objects each time it receives a message on its channel. -Please note that the `send()` method on both the parent and child are -synchronous - sending large chunks of data is not advised (pipes can be used -instead, see -[`child_process.spawn`](#child_process_child_process_spawn_command_args_options)). - There is a special case when sending a `{cmd: 'NODE_foo'}` message. All messages containing a `NODE_` prefix in its `cmd` property will not be emitted in the `message` event, since they are internal messages used by Node.js core. @@ -261,8 +258,16 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or socket object to another process. The child will receive the object as its second argument to the `message` event. -Emits an `'error'` event if the message cannot be sent, for example because -the child process has already exited. +The `callback` option is a function that is invoked after the message is +sent but before the target may have received it. It is called with a single +argument: `null` on success, or an `Error` object on failure. + +`child.send()` emits an `'error'` event if no callback was given and the message +cannot be sent, for example because the child process has already exited. + +Returns `true` under normal circumstances or `false` when the backlog of +unsent messages exceeds a threshold that makes it unwise to send more. +Use the callback mechanism to implement flow control. #### Example: sending server object diff --git a/doc/api/cluster.markdown b/doc/api/cluster.markdown index 53f6e0a284..d29d5973d8 100644 --- a/doc/api/cluster.markdown +++ b/doc/api/cluster.markdown @@ -426,10 +426,12 @@ exit, the master may choose not to respawn a worker based on this value. // kill worker worker.kill(); -### worker.send(message[, sendHandle]) +### worker.send(message[, sendHandle][, callback]) * `message` {Object} * `sendHandle` {Handle object} +* `callback` {Function} +* Return: Boolean Send a message to a worker or master, optionally with a handle. diff --git a/lib/child_process.js b/lib/child_process.js index 57a8601df5..a923477846 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -48,16 +48,12 @@ exports._forkChild = function(fd) { var p = new Pipe(true); p.open(fd); p.unref(); - setupChannel(process, p); - - var refs = 0; + const control = setupChannel(process, p); process.on('newListener', function(name) { - if (name !== 'message' && name !== 'disconnect') return; - if (++refs === 1) p.ref(); + if (name === 'message' || name === 'disconnect') control.ref(); }); process.on('removeListener', function(name) { - if (name !== 'message' && name !== 'disconnect') return; - if (--refs === 0) p.unref(); + if (name === 'message' || name === 'disconnect') control.unref(); }); }; diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index c6bb41ffde..6d1c22d03b 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -397,6 +397,25 @@ function setupChannel(target, channel) { target._channel = channel; target._handleQueue = null; + const control = new class extends EventEmitter { + constructor() { + super(); + this.channel = channel; + this.refs = 0; + } + ref() { + if (++this.refs === 1) { + this.channel.ref(); + } + } + unref() { + if (--this.refs === 0) { + this.channel.unref(); + this.emit('unref'); + } + } + }; + var decoder = new StringDecoder('utf8'); var jsonBuffer = ''; channel.buffering = false; @@ -446,7 +465,7 @@ function setupChannel(target, channel) { target._handleQueue = null; queue.forEach(function(args) { - target._send(args.message, args.handle, false); + target._send(args.message, args.handle, false, args.callback); }); // Process a pending disconnect (if any). @@ -478,14 +497,24 @@ function setupChannel(target, channel) { }); }); - target.send = function(message, handle) { - if (!this.connected) - this.emit('error', new Error('channel closed')); - else - this._send(message, handle, false); + target.send = function(message, handle, callback) { + if (typeof handle === 'function') { + callback = handle; + handle = undefined; + } + if (this.connected) { + this._send(message, handle, false, callback); + return; + } + const ex = new Error('channel closed'); + if (typeof callback === 'function') { + process.nextTick(callback, ex); + } else { + this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick. + } }; - target._send = function(message, handle, swallowErrors) { + target._send = function(message, handle, swallowErrors, callback) { assert(this.connected || this._channel); if (message === undefined) @@ -516,7 +545,11 @@ function setupChannel(target, channel) { // Queue-up message and handle if we haven't received ACK yet. if (this._handleQueue) { - this._handleQueue.push({ message: message.msg, handle: handle }); + this._handleQueue.push({ + callback: callback, + handle: handle, + message: message.msg, + }); return; } @@ -538,24 +571,43 @@ function setupChannel(target, channel) { } else if (this._handleQueue && !(message && message.cmd === 'NODE_HANDLE_ACK')) { // Queue request anyway to avoid out-of-order messages. - this._handleQueue.push({ message: message, handle: null }); + this._handleQueue.push({ + callback: callback, + handle: null, + message: message, + }); return; } var req = new WriteWrap(); - req.oncomplete = nop; + req.async = false; + var string = JSON.stringify(message) + '\n'; var err = channel.writeUtf8String(req, string, handle); - if (err) { - if (!swallowErrors) - this.emit('error', errnoException(err, 'write')); - } else if (handle && !this._handleQueue) { - this._handleQueue = []; - } - - if (obj && obj.postSend) { - req.oncomplete = obj.postSend.bind(null, handle); + if (err === 0) { + if (handle && !this._handleQueue) + this._handleQueue = []; + req.oncomplete = function() { + if (this.async === true) + control.unref(); + if (obj && obj.postSend) + obj.postSend(handle); + if (typeof callback === 'function') + callback(null); + }; + if (req.async === true) { + control.ref(); + } else { + process.nextTick(function() { req.oncomplete(); }); + } + } else if (!swallowErrors) { + const ex = errnoException(err, 'write'); + if (typeof callback === 'function') { + process.nextTick(callback, ex); + } else { + this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick. + } } /* If the master is > 2 read() calls behind, please stop sending. */ @@ -616,6 +668,7 @@ function setupChannel(target, channel) { }; channel.readStart(); + return control; } diff --git a/test/parallel/test-child-process-send-cb.js b/test/parallel/test-child-process-send-cb.js new file mode 100644 index 0000000000..d65a1abd20 --- /dev/null +++ b/test/parallel/test-child-process-send-cb.js @@ -0,0 +1,19 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const fork = require('child_process').fork; + +if (process.argv[2] === 'child') { + process.send('ok', common.mustCall(function(err) { + assert.strictEqual(err, null); + })); +} else { + const child = fork(process.argv[1], ['child']); + child.on('message', common.mustCall(function(message) { + assert.strictEqual(message, 'ok'); + })); + child.on('exit', common.mustCall(function(exitCode, signalCode) { + assert.strictEqual(exitCode, 0); + assert.strictEqual(signalCode, null); + })); +}