From 4a7233c1788334c171d2280026333242df7d37af Mon Sep 17 00:00:00 2001 From: Trevor Norris Date: Fri, 10 Mar 2017 06:17:42 -0700 Subject: [PATCH] lib: implement async_hooks API in core Implement async_hooks support in the following: * fatalException handler * process.nextTick * Timers * net/dgram/http PR-URL: https://github.com/nodejs/node/pull/12892 Ref: https://github.com/nodejs/node/pull/11883 Ref: https://github.com/nodejs/node/pull/8531 Reviewed-By: Andreas Madsen Reviewed-By: Anna Henningsen Reviewed-By: Sam Roberts Reviewed-By: Matteo Collina Reviewed-By: Refael Ackermann Reviewed-By: James M Snell Reviewed-By: Jeremiah Senkpiel --- lib/_http_agent.js | 9 +- lib/_http_client.js | 6 +- lib/_http_common.js | 8 +- lib/_http_outgoing.js | 12 +- lib/async_hooks.js | 6 +- lib/dgram.js | 12 +- lib/internal/bootstrap_node.js | 24 ++- lib/internal/process/next_tick.js | 153 +++++++++++++++--- lib/net.js | 45 +++++- lib/timers.js | 107 +++++++++++- src/async-wrap.cc | 12 ++ .../unhandled_promise_trace_warnings.out | 2 +- .../test-async-wrap-uncaughtexception.js | 43 +++++ 13 files changed, 399 insertions(+), 40 deletions(-) create mode 100644 test/parallel/test-async-wrap-uncaughtexception.js diff --git a/lib/_http_agent.js b/lib/_http_agent.js index 88f4df402a..d791a961c7 100644 --- a/lib/_http_agent.js +++ b/lib/_http_agent.js @@ -25,6 +25,8 @@ const net = require('net'); const util = require('util'); const EventEmitter = require('events'); const debug = util.debuglog('http'); +const async_id_symbol = process.binding('async_wrap').async_id_symbol; +const nextTick = require('internal/process/next_tick').nextTick; // New Agent code. @@ -93,6 +95,7 @@ function Agent(options) { self.freeSockets[name] = freeSockets; socket.setKeepAlive(true, self.keepAliveMsecs); socket.unref(); + socket[async_id_symbol] = -1; socket._httpMessage = null; self.removeSocket(socket, options); freeSockets.push(socket); @@ -163,6 +166,8 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/, if (freeLen) { // we have a free socket, so use that. var socket = this.freeSockets[name].shift(); + // Assign the handle a new asyncId and run any init() hooks. + socket._handle.asyncReset(); debug('have free socket'); // don't leak @@ -177,7 +182,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/, // If we are under maxSockets create a new one. this.createSocket(req, options, function(err, newSocket) { if (err) { - process.nextTick(function() { + nextTick(newSocket._handle.getAsyncId(), function() { req.emit('error', err); }); return; @@ -290,7 +295,7 @@ Agent.prototype.removeSocket = function removeSocket(s, options) { // If we have pending requests and a socket gets closed make a new one this.createSocket(req, options, function(err, newSocket) { if (err) { - process.nextTick(function() { + nextTick(newSocket._handle.getAsyncId(), function() { req.emit('error', err); }); return; diff --git a/lib/_http_client.js b/lib/_http_client.js index e1f064e832..a238a4f14c 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -36,6 +36,7 @@ const Agent = require('_http_agent'); const Buffer = require('buffer').Buffer; const urlToOptions = require('internal/url').urlToOptions; const outHeadersKey = require('internal/http').outHeadersKey; +const nextTick = require('internal/process/next_tick').nextTick; // The actual list of disallowed characters in regexp form is more like: // /[^A-Za-z0-9\-._~!$&'()*+,;=/:@]/ @@ -587,9 +588,12 @@ function responseKeepAlive(res, req) { socket.removeListener('close', socketCloseListener); socket.removeListener('error', socketErrorListener); socket.once('error', freeSocketErrorListener); + // There are cases where _handle === null. Avoid those. Passing null to + // nextTick() will call initTriggerId() to retrieve the id. + const asyncId = socket._handle ? socket._handle.getAsyncId() : null; // Mark this socket as available, AFTER user-added end // handlers have a chance to run. - process.nextTick(emitFreeNT, socket); + nextTick(asyncId, emitFreeNT, socket); } } diff --git a/lib/_http_common.js b/lib/_http_common.js index b33e38e08d..59eaec5811 100644 --- a/lib/_http_common.js +++ b/lib/_http_common.js @@ -28,6 +28,7 @@ const HTTPParser = binding.HTTPParser; const FreeList = require('internal/freelist'); const ondrain = require('internal/http').ondrain; const incoming = require('_http_incoming'); +const emitDestroy = require('async_hooks').emitDestroy; const IncomingMessage = incoming.IncomingMessage; const readStart = incoming.readStart; const readStop = incoming.readStop; @@ -211,8 +212,13 @@ function freeParser(parser, req, socket) { parser.incoming = null; parser.outgoing = null; parser[kOnExecute] = null; - if (parsers.free(parser) === false) + if (parsers.free(parser) === false) { parser.close(); + } else { + // Since the Parser destructor isn't going to run the destroy() callbacks + // it needs to be triggered manually. + emitDestroy(parser.getAsyncId()); + } } if (req) { req.parser = null; diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 4ff84aea7d..9b492df85a 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -31,6 +31,8 @@ const common = require('_http_common'); const checkIsHttpToken = common._checkIsHttpToken; const checkInvalidHeaderChar = common._checkInvalidHeaderChar; const outHeadersKey = require('internal/http').outHeadersKey; +const async_id_symbol = process.binding('async_wrap').async_id_symbol; +const nextTick = require('internal/process/next_tick').nextTick; const CRLF = common.CRLF; const debug = common.debug; @@ -264,8 +266,9 @@ function _writeRaw(data, encoding, callback) { if (this.output.length) { this._flushOutput(conn); } else if (!data.length) { - if (typeof callback === 'function') - process.nextTick(callback); + if (typeof callback === 'function') { + nextTick(this.socket[async_id_symbol], callback); + } return true; } // Directly write to socket. @@ -623,7 +626,10 @@ const crlf_buf = Buffer.from('\r\n'); OutgoingMessage.prototype.write = function write(chunk, encoding, callback) { if (this.finished) { var err = new Error('write after end'); - process.nextTick(writeAfterEndNT.bind(this), err, callback); + nextTick(this.socket[async_id_symbol], + writeAfterEndNT.bind(this), + err, + callback); return true; } diff --git a/lib/async_hooks.js b/lib/async_hooks.js index 736b189097..867b5eb52d 100644 --- a/lib/async_hooks.js +++ b/lib/async_hooks.js @@ -32,7 +32,7 @@ var processing_hook = false; // Use to temporarily store and updated active_hooks_array if the user enables // or disables a hook while hooks are being processed. var tmp_active_hooks_array = null; -// Keep track of the field counds held in tmp_active_hooks_array. +// Keep track of the field counts held in tmp_active_hooks_array. var tmp_async_hook_fields = null; // Each constant tracks how many callbacks there are for any given step of @@ -41,9 +41,9 @@ var tmp_async_hook_fields = null; const { kInit, kBefore, kAfter, kDestroy, kCurrentAsyncId, kCurrentTriggerId, kAsyncUidCntr, kInitTriggerId } = async_wrap.constants; +const { async_id_symbol, trigger_id_symbol } = async_wrap; + // Used in AsyncHook and AsyncEvent. -const async_id_symbol = Symbol('_asyncId'); -const trigger_id_symbol = Symbol('_triggerId'); const init_symbol = Symbol('init'); const before_symbol = Symbol('before'); const after_symbol = Symbol('after'); diff --git a/lib/dgram.js b/lib/dgram.js index f2da1da1cc..4d9f7eb29f 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -25,7 +25,10 @@ const assert = require('assert'); const Buffer = require('buffer').Buffer; const util = require('util'); const EventEmitter = require('events'); +const setInitTriggerId = require('async_hooks').setInitTriggerId; const UV_UDP_REUSEADDR = process.binding('constants').os.UV_UDP_REUSEADDR; +const async_id_symbol = process.binding('async_wrap').async_id_symbol; +const nextTick = require('internal/process/next_tick').nextTick; const UDP = process.binding('udp_wrap').UDP; const SendWrap = process.binding('udp_wrap').SendWrap; @@ -111,6 +114,7 @@ function Socket(type, listener) { this._handle = handle; this._receiving = false; this._bindState = BIND_STATE_UNBOUND; + this[async_id_symbol] = this._handle.getAsyncId(); this.type = type; this.fd = null; // compatibility hack @@ -432,6 +436,10 @@ function doSend(ex, self, ip, list, address, port, callback) { req.callback = callback; req.oncomplete = afterSend; } + // node::SendWrap isn't instantiated and attached to the JS instance of + // SendWrap above until send() is called. So don't set the init trigger id + // until now. + setInitTriggerId(self[async_id_symbol]); var err = self._handle.send(req, list, list.length, @@ -441,7 +449,7 @@ function doSend(ex, self, ip, list, address, port, callback) { if (err && callback) { // don't emit as error, dgram_legacy.js compatibility const ex = exceptionWithHostPort(err, 'send', address, port); - process.nextTick(callback, ex); + nextTick(self[async_id_symbol], callback, ex); } } @@ -468,7 +476,7 @@ Socket.prototype.close = function(callback) { this._stopReceiving(); this._handle.close(); this._handle = null; - process.nextTick(socketCloseNT, this); + nextTick(this[async_id_symbol], socketCloseNT, this); return this; }; diff --git a/lib/internal/bootstrap_node.js b/lib/internal/bootstrap_node.js index 67a7c10919..b93b817aba 100644 --- a/lib/internal/bootstrap_node.js +++ b/lib/internal/bootstrap_node.js @@ -292,10 +292,20 @@ } function setupProcessFatal() { + const async_wrap = process.binding('async_wrap'); + // Arrays containing hook flags and ids for async_hook calls. + const { async_hook_fields, async_uid_fields } = async_wrap; + // Internal functions needed to manipulate the stack. + const { clearIdStack, popAsyncIds } = async_wrap; + const { kAfter, kCurrentAsyncId, kInitTriggerId } = async_wrap.constants; process._fatalException = function(er) { var caught; + // It's possible that kInitTriggerId was set for a constructor call that + // threw and was never cleared. So clear it now. + async_uid_fields[kInitTriggerId] = 0; + if (process.domain && process.domain._errorHandler) caught = process.domain._errorHandler(er); @@ -314,9 +324,21 @@ // nothing to be done about it at this point. } - // if we handled an error, then make sure any ticks get processed } else { + // If we handled an error, then make sure any ticks get processed NativeModule.require('timers').setImmediate(process._tickCallback); + + // Emit the after() hooks now that the exception has been handled. + if (async_hook_fields[kAfter] > 0) { + do { + NativeModule.require('async_hooks').emitAfter( + async_uid_fields[kCurrentAsyncId]); + // popAsyncIds() returns true if there are more ids on the stack. + } while (popAsyncIds(async_uid_fields[kCurrentAsyncId])); + // Or completely empty the id stack. + } else { + clearIdStack(); + } } return caught; diff --git a/lib/internal/process/next_tick.js b/lib/internal/process/next_tick.js index c834ffc2e3..0ba26ce033 100644 --- a/lib/internal/process/next_tick.js +++ b/lib/internal/process/next_tick.js @@ -7,11 +7,26 @@ const kMaxCallbacksPerLoop = 1e4; exports.setup = setupNextTick; +// Will be overwritten when setupNextTick() is called. +exports.nextTick = null; function setupNextTick() { + const async_wrap = process.binding('async_wrap'); + const async_hooks = require('async_hooks'); const promises = require('internal/process/promises'); const errors = require('internal/errors'); const emitPendingUnhandledRejections = promises.setup(scheduleMicrotasks); + const initTriggerId = async_hooks.initTriggerId; + // Two arrays that share state between C++ and JS. + const { async_hook_fields, async_uid_fields } = async_wrap; + // Used to change the state of the async id stack. + const { pushAsyncIds, popAsyncIds } = async_wrap; + // The needed emit*() functions. + const { emitInit, emitBefore, emitAfter, emitDestroy } = async_hooks; + // Grab the constants necessary for working with internal arrays. + const { kInit, kBefore, kAfter, kDestroy, kAsyncUidCntr, kInitTriggerId } = + async_wrap.constants; + const { async_id_symbol, trigger_id_symbol } = async_wrap; var nextTickQueue = []; var microtasksScheduled = false; @@ -27,6 +42,9 @@ function setupNextTick() { process._tickCallback = _tickCallback; process._tickDomainCallback = _tickDomainCallback; + // Set the nextTick() function for internal usage. + exports.nextTick = internalNextTick; + // This tickInfo thing is used so that the C++ code in src/node.cc // can have easy access to our nextTick state, and avoid unnecessary // calls into JS land. @@ -51,10 +69,13 @@ function setupNextTick() { if (microtasksScheduled) return; - nextTickQueue.push({ - callback: runMicrotasksCallback, - domain: null - }); + const tickObject = + new TickObject(runMicrotasksCallback, undefined, null); + // For the moment all microtasks come from the void until the PromiseHook + // API is implemented. + tickObject[async_id_symbol] = 0; + tickObject[trigger_id_symbol] = 0; + nextTickQueue.push(tickObject); tickInfo[kLength]++; microtasksScheduled = true; @@ -89,20 +110,58 @@ function setupNextTick() { } } + // TODO(trevnorris): Using std::stack of Environment::AsyncHooks::ids_stack_ + // is much slower here than was the Float64Array stack used in a previous + // implementation. Problem is the Float64Array stack was a bit brittle. + // Investigate how to harden that implementation and possibly reintroduce it. + function nextTickEmitBefore(asyncId, triggerId) { + if (async_hook_fields[kBefore] > 0) + emitBefore(asyncId, triggerId); + else + pushAsyncIds(asyncId, triggerId); + } + + function nextTickEmitAfter(asyncId) { + if (async_hook_fields[kAfter] > 0) + emitAfter(asyncId); + else + popAsyncIds(asyncId); + } + // Run callbacks that have no domain. // Using domains will cause this to be overridden. function _tickCallback() { - var callback, args, tock; - do { while (tickInfo[kIndex] < tickInfo[kLength]) { - tock = nextTickQueue[tickInfo[kIndex]++]; - callback = tock.callback; - args = tock.args; + const tock = nextTickQueue[tickInfo[kIndex]++]; + const callback = tock.callback; + const args = tock.args; + + // CHECK(Number.isSafeInteger(tock[async_id_symbol])) + // CHECK(tock[async_id_symbol] > 0) + // CHECK(Number.isSafeInteger(tock[trigger_id_symbol])) + // CHECK(tock[trigger_id_symbol] > 0) + + nextTickEmitBefore(tock[async_id_symbol], tock[trigger_id_symbol]); + // emitDestroy() places the async_id_symbol into an asynchronous queue + // that calls the destroy callback in the future. It's called before + // calling tock.callback so destroy will be called even if the callback + // throws an exception that is handles by 'uncaughtException' or a + // domain. + // TODO(trevnorris): This is a bit of a hack. It relies on the fact + // that nextTick() doesn't allow the event loop to proceed, but if + // any async hooks are enabled during the callback's execution then + // this tock's after hook will be called, but not its destroy hook. + if (async_hook_fields[kDestroy] > 0) + emitDestroy(tock[async_id_symbol]); + // Using separate callback execution functions allows direct // callback invocation with small numbers of arguments to avoid the // performance hit associated with using `fn.apply()` _combinedTickCallback(args, callback); + + nextTickEmitAfter(tock[async_id_symbol]); + if (kMaxCallbacksPerLoop < tickInfo[kIndex]) tickDone(); } @@ -113,20 +172,33 @@ function setupNextTick() { } function _tickDomainCallback() { - var callback, domain, args, tock; - do { while (tickInfo[kIndex] < tickInfo[kLength]) { - tock = nextTickQueue[tickInfo[kIndex]++]; - callback = tock.callback; - domain = tock.domain; - args = tock.args; + const tock = nextTickQueue[tickInfo[kIndex]++]; + const callback = tock.callback; + const domain = tock.domain; + const args = tock.args; if (domain) domain.enter(); + + // CHECK(Number.isSafeInteger(tock[async_id_symbol])) + // CHECK(tock[async_id_symbol] > 0) + // CHECK(Number.isSafeInteger(tock[trigger_id_symbol])) + // CHECK(tock[trigger_id_symbol] > 0) + + nextTickEmitBefore(tock[async_id_symbol], tock[trigger_id_symbol]); + // TODO(trevnorris): See comment in _tickCallback() as to why this + // isn't a good solution. + if (async_hook_fields[kDestroy] > 0) + emitDestroy(tock[async_id_symbol]); + // Using separate callback execution functions allows direct // callback invocation with small numbers of arguments to avoid the // performance hit associated with using `fn.apply()` _combinedTickCallback(args, callback); + + nextTickEmitAfter(tock[async_id_symbol]); + if (kMaxCallbacksPerLoop < tickInfo[kIndex]) tickDone(); if (domain) @@ -138,6 +210,25 @@ function setupNextTick() { } while (tickInfo[kLength] !== 0); } + function TickObject(callback, args, domain) { + this.callback = callback; + this.domain = domain; + this.args = args; + this[async_id_symbol] = -1; + this[trigger_id_symbol] = -1; + } + + function setupInit(tickObject, triggerId) { + tickObject[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr]; + tickObject[trigger_id_symbol] = triggerId || initTriggerId(); + if (async_hook_fields[kInit] > 0) { + emitInit(tickObject[async_id_symbol], + 'TickObject', + tickObject[trigger_id_symbol], + tickObject); + } + } + function nextTick(callback) { if (typeof callback !== 'function') throw new errors.TypeError('ERR_INVALID_CALLBACK'); @@ -152,11 +243,33 @@ function setupNextTick() { args[i - 1] = arguments[i]; } - nextTickQueue.push({ - callback, - domain: process.domain || null, - args - }); + var obj = new TickObject(callback, args, process.domain || null); + setupInit(obj, null); + nextTickQueue.push(obj); + tickInfo[kLength]++; + } + + function internalNextTick(triggerId, callback) { + if (typeof callback !== 'function') + throw new TypeError('callback is not a function'); + // CHECK(Number.isSafeInteger(triggerId) || triggerId === null) + // CHECK(triggerId > 0 || triggerId === null) + + if (process._exiting) + return; + + var args; + if (arguments.length > 2) { + args = new Array(arguments.length - 2); + for (var i = 2; i < arguments.length; i++) + args[i - 2] = arguments[i]; + } + + var obj = new TickObject(callback, args, process.domain || null); + setupInit(obj, triggerId); + // The call to initTriggerId() was skipped, so clear kInitTriggerId. + async_uid_fields[kInitTriggerId] = 0; + nextTickQueue.push(obj); tickInfo[kLength]++; } } diff --git a/lib/net.js b/lib/net.js index 2da278a32d..2431820783 100644 --- a/lib/net.js +++ b/lib/net.js @@ -39,6 +39,9 @@ const TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap; const PipeConnectWrap = process.binding('pipe_wrap').PipeConnectWrap; const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap; const WriteWrap = process.binding('stream_wrap').WriteWrap; +const async_id_symbol = process.binding('async_wrap').async_id_symbol; +const { newUid, setInitTriggerId } = require('async_hooks'); +const nextTick = require('internal/process/next_tick').nextTick; var cluster; var dns; @@ -57,6 +60,12 @@ function createHandle(fd) { } +function getNewAsyncId(handle) { + return (!handle || typeof handle.getAsyncId !== 'function') ? + newUid() : handle.getAsyncId(); +} + + const debug = util.debuglog('net'); function isPipeName(s) { @@ -147,6 +156,7 @@ function initSocketHandle(self) { if (self._handle) { self._handle.owner = self; self._handle.onread = onread; + self[async_id_symbol] = getNewAsyncId(self._handle); // If handle doesn't support writev - neither do we if (!self._handle.writev) @@ -162,6 +172,10 @@ function Socket(options) { if (!(this instanceof Socket)) return new Socket(options); this.connecting = false; + // Problem with this is that users can supply their own handle, that may not + // have _handle.getAsyncId(). In this case an[async_id_symbol] should + // probably be supplied by async_hooks. + this[async_id_symbol] = -1; this._hadError = false; this._handle = null; this._parent = null; @@ -176,9 +190,11 @@ function Socket(options) { if (options.handle) { this._handle = options.handle; // private + this[async_id_symbol] = getNewAsyncId(this._handle); } else if (options.fd !== undefined) { this._handle = createHandle(options.fd); this._handle.open(options.fd); + this[async_id_symbol] = this._handle.getAsyncId(); // options.fd can be string (since it is user-defined), // so changing this to === would be semver-major // See: https://github.com/nodejs/node/pull/11513 @@ -264,6 +280,10 @@ function onSocketFinish() { var req = new ShutdownWrap(); req.oncomplete = afterShutdown; req.handle = this._handle; + // node::ShutdownWrap isn't instantiated and attached to the JS instance of + // ShutdownWrap above until shutdown() is called. So don't set the init + // trigger id until now. + setInitTriggerId(this[async_id_symbol]); var err = this._handle.shutdown(req); if (err) @@ -329,7 +349,7 @@ function writeAfterFIN(chunk, encoding, cb) { // TODO: defer error events consistently everywhere, not just the cb this.emit('error', er); if (typeof cb === 'function') { - process.nextTick(cb, er); + nextTick(this[async_id_symbol], cb, er); } } @@ -887,6 +907,10 @@ function internalConnect( req.localAddress = localAddress; req.localPort = localPort; + // node::TCPConnectWrap isn't instantiated and attached to the JS instance + // of TCPConnectWrap above until connect() is called. So don't set the init + // trigger id until now. + setInitTriggerId(self[async_id_symbol]); if (addressType === 4) err = self._handle.connect(req, address, port); else @@ -896,6 +920,10 @@ function internalConnect( const req = new PipeConnectWrap(); req.address = address; req.oncomplete = afterConnect; + // node::PipeConnectWrap isn't instantiated and attached to the JS instance + // of PipeConnectWrap above until connect() is called. So don't set the + // init trigger id until now. + setInitTriggerId(self[async_id_symbol]); err = self._handle.connect(req, address, afterConnect); } @@ -1020,6 +1048,7 @@ function lookupAndConnect(self, options) { debug('connect: dns options', dnsopts); self._host = host; var lookup = options.lookup || dns.lookup; + setInitTriggerId(self[async_id_symbol]); lookup(host, dnsopts, function emitLookup(err, ip, addressType) { self.emit('lookup', err, ip, addressType, host); @@ -1167,6 +1196,7 @@ function Server(options, connectionListener) { configurable: true, enumerable: false }); + this[async_id_symbol] = -1; this._handle = null; this._usingSlaves = false; this._slaves = []; @@ -1274,6 +1304,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) { this._handle = rval; } + this[async_id_symbol] = getNewAsyncId(this._handle); this._handle.onconnection = onconnection; this._handle.owner = this; @@ -1286,7 +1317,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) { var ex = exceptionWithHostPort(err, 'listen', address, port); this._handle.close(); this._handle = null; - process.nextTick(emitErrorNT, this, ex); + nextTick(this[async_id_symbol], emitErrorNT, this, ex); return; } @@ -1297,7 +1328,7 @@ function setupListenHandle(address, port, addressType, backlog, fd) { if (this._unref) this.unref(); - process.nextTick(emitListeningNT, this); + nextTick(this[async_id_symbol], emitListeningNT, this); } Server.prototype._listen2 = setupListenHandle; // legacy alias @@ -1398,6 +1429,7 @@ Server.prototype.listen = function() { // (handle[, backlog][, cb]) where handle is an object with a handle if (options instanceof TCP) { this._handle = options; + this[async_id_symbol] = this._handle.getAsyncId(); listenInCluster(this, null, -1, -1, backlogFromArgs); return this; } @@ -1521,8 +1553,10 @@ function onconnection(err, clientHandle) { Server.prototype.getConnections = function(cb) { + const self = this; + function end(err, connections) { - process.nextTick(cb, err, connections); + nextTick(self[async_id_symbol], cb, err, connections); } if (!this._usingSlaves) { @@ -1597,7 +1631,8 @@ Server.prototype._emitCloseIfDrained = function() { return; } - process.nextTick(emitCloseNT, this); + const asyncId = this._handle ? this[async_id_symbol] : null; + nextTick(asyncId, emitCloseNT, this); }; diff --git a/lib/timers.js b/lib/timers.js index 5d21227b7b..fb9984abf4 100644 --- a/lib/timers.js +++ b/lib/timers.js @@ -21,14 +21,29 @@ 'use strict'; +const async_wrap = process.binding('async_wrap'); const TimerWrap = process.binding('timer_wrap').Timer; const L = require('internal/linkedlist'); const internalUtil = require('internal/util'); const { createPromise, promiseResolve } = process.binding('util'); +const async_hooks = require('async_hooks'); const assert = require('assert'); const util = require('util'); const debug = util.debuglog('timer'); const kOnTimeout = TimerWrap.kOnTimeout | 0; +const initTriggerId = async_hooks.initTriggerId; +// Two arrays that share state between C++ and JS. +const { async_hook_fields, async_uid_fields } = async_wrap; +// Used to change the state of the async id stack. +const { pushAsyncIds, popAsyncIds } = async_wrap; +// The needed emit*() functions. +const { emitInit, emitBefore, emitAfter, emitDestroy } = async_hooks; +// Grab the constants necessary for working with internal arrays. +const { kInit, kBefore, kAfter, kDestroy, kAsyncUidCntr } = + async_wrap.constants; +// Symbols for storing async id state. +const async_id_symbol = Symbol('asyncId'); +const trigger_id_symbol = Symbol('triggerId'); // Timeout values > TIMEOUT_MAX are set to 1. const TIMEOUT_MAX = 2147483647; // 2^31-1 @@ -134,6 +149,22 @@ exports._unrefActive = function(item) { }; +function timerEmitBefore(asyncId, triggerId) { + if (async_hook_fields[kBefore] > 0) + emitBefore(asyncId, triggerId); + else + pushAsyncIds(asyncId, triggerId); +} + + +function timerEmitAfter(asyncId) { + if (async_hook_fields[kAfter] > 0) + emitAfter(asyncId); + else + popAsyncIds(asyncId); +} + + // The underlying logic for scheduling or re-scheduling a timer. // // Appends a timer onto the end of an existing timers list, or creates a new @@ -154,6 +185,14 @@ function insert(item, unrefed) { lists[msecs] = list = createTimersList(msecs, unrefed); } + if (!item[async_id_symbol] || item._destroyed) { + item._destroyed = false; + item[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr]; + item[trigger_id_symbol] = initTriggerId(); + if (async_hook_fields[kInit] > 0) + emitInit(item[async_id_symbol], 'Timeout', item[trigger_id_symbol], item); + } + L.append(list, item); assert(!L.isEmpty(list)); // list is not empty } @@ -218,7 +257,14 @@ function listOnTimeout() { L.remove(timer); assert(timer !== L.peek(list)); - if (!timer._onTimeout) continue; + if (!timer._onTimeout) { + if (async_hook_fields[kDestroy] > 0 && !timer._destroyed && + typeof timer[async_id_symbol] === 'number') { + emitDestroy(timer[async_id_symbol]); + timer._destroyed = true; + } + continue; + } var domain = timer.domain; if (domain) { @@ -268,11 +314,25 @@ function listOnTimeout() { // 4.7) what is in this smaller function. function tryOnTimeout(timer, list) { timer._called = true; + const timerAsyncId = (typeof timer[async_id_symbol] === 'number') ? + timer[async_id_symbol] : null; var threw = true; + if (timerAsyncId !== null) + timerEmitBefore(timerAsyncId, timer[trigger_id_symbol]); try { ontimeout(timer); threw = false; } finally { + if (timerAsyncId !== null) { + if (!threw) + timerEmitAfter(timerAsyncId); + if (!timer._repeat && async_hook_fields[kDestroy] > 0 && + !timer._destroyed) { + emitDestroy(timerAsyncId); + timer._destroyed = true; + } + } + if (!threw) return; // Postpone all later list events to next tick. We need to do this @@ -324,6 +384,15 @@ function reuse(item) { // Remove a timer. Cancels the timeout and resets the relevant timer properties. const unenroll = exports.unenroll = function(item) { + // Fewer checks may be possible, but these cover everything. + if (async_hook_fields[kDestroy] > 0 && + item && + typeof item[async_id_symbol] === 'number' && + !item._destroyed) { + emitDestroy(item[async_id_symbol]); + item._destroyed = true; + } + var handle = reuse(item); if (handle) { debug('unenroll: list empty'); @@ -516,6 +585,11 @@ function Timeout(after, callback, args) { this._onTimeout = callback; this._timerArgs = args; this._repeat = null; + this._destroyed = false; + this[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr]; + this[trigger_id_symbol] = initTriggerId(); + if (async_hook_fields[kInit] > 0) + emitInit(this[async_id_symbol], 'Timeout', this[trigger_id_symbol], this); } @@ -570,6 +644,15 @@ Timeout.prototype.ref = function() { Timeout.prototype.close = function() { this._onTimeout = null; if (this._handle) { + // Fewer checks may be possible, but these cover everything. + if (async_hook_fields[kDestroy] > 0 && + this && + typeof this[async_id_symbol] === 'number' && + !this._destroyed) { + emitDestroy(this[async_id_symbol]); + this._destroyed = true; + } + this._idleTimeout = -1; this._handle[kOnTimeout] = null; this._handle.close(); @@ -673,11 +756,21 @@ function processImmediate() { // 4.7) what is in this smaller function. function tryOnImmediate(immediate, oldTail) { var threw = true; + timerEmitBefore(immediate[async_id_symbol], immediate[trigger_id_symbol]); try { // make the actual call outside the try/catch to allow it to be optimized runCallback(immediate); threw = false; } finally { + // clearImmediate checks _callback === null for kDestroy hooks. + immediate._callback = null; + if (!threw) + timerEmitAfter(immediate[async_id_symbol]); + if (async_hook_fields[kDestroy] > 0 && !immediate._destroyed) { + emitDestroy(immediate[async_id_symbol]); + immediate._destroyed = true; + } + if (threw && immediate._idleNext) { // Handle any remaining on next tick, assuming we're still alive to do so. const curHead = immediateQueue.head; @@ -726,7 +819,12 @@ function Immediate() { this._callback = null; this._argv = null; this._onImmediate = null; + this._destroyed = false; this.domain = process.domain; + this[async_id_symbol] = ++async_uid_fields[kAsyncUidCntr]; + this[trigger_id_symbol] = initTriggerId(); + if (async_hook_fields[kInit] > 0) + emitInit(this[async_id_symbol], 'Immediate', this[trigger_id_symbol], this); } function setImmediate(callback, arg1, arg2, arg3) { @@ -785,6 +883,13 @@ function createImmediate(args, callback) { exports.clearImmediate = function(immediate) { if (!immediate) return; + if (async_hook_fields[kDestroy] > 0 && + immediate._callback !== null && + !immediate._destroyed) { + emitDestroy(immediate[async_id_symbol]); + immediate._destroyed = true; + } + immediate._onImmediate = null; immediateQueue.remove(immediate); diff --git a/src/async-wrap.cc b/src/async-wrap.cc index 6ccccfcb65..06567d6f7c 100644 --- a/src/async-wrap.cc +++ b/src/async-wrap.cc @@ -45,6 +45,7 @@ using v8::MaybeLocal; using v8::Number; using v8::Object; using v8::RetainedObjectInfo; +using v8::Symbol; using v8::TryCatch; using v8::Uint32Array; using v8::Value; @@ -325,6 +326,17 @@ void AsyncWrap::Initialize(Local target, #undef V FORCE_SET_TARGET_FIELD(target, "Providers", async_providers); + // These Symbols are used throughout node so the stored values on each object + // can be accessed easily across files. + FORCE_SET_TARGET_FIELD( + target, + "async_id_symbol", + Symbol::New(isolate, FIXED_ONE_BYTE_STRING(isolate, "asyncId"))); + FORCE_SET_TARGET_FIELD( + target, + "trigger_id_symbol", + Symbol::New(isolate, FIXED_ONE_BYTE_STRING(isolate, "triggerId"))); + #undef FORCE_SET_TARGET_FIELD env->set_async_hooks_init_function(Local()); diff --git a/test/message/unhandled_promise_trace_warnings.out b/test/message/unhandled_promise_trace_warnings.out index 4372efdeed..0cf65dcdbb 100644 --- a/test/message/unhandled_promise_trace_warnings.out +++ b/test/message/unhandled_promise_trace_warnings.out @@ -23,7 +23,7 @@ at * at Promise.then * at Promise.catch * - at Immediate.setImmediate (*test*message*unhandled_promise_trace_warnings.js:*) + at Immediate.setImmediate [as _onImmediate] (*test*message*unhandled_promise_trace_warnings.js:*) at * at * at * diff --git a/test/parallel/test-async-wrap-uncaughtexception.js b/test/parallel/test-async-wrap-uncaughtexception.js new file mode 100644 index 0000000000..099bdb70dd --- /dev/null +++ b/test/parallel/test-async-wrap-uncaughtexception.js @@ -0,0 +1,43 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const async_hooks = require('async_hooks'); +const call_log = [0, 0, 0, 0]; // [before, callback, exception, after]; +let call_id = null; +let hooks = null; + + +process.on('beforeExit', common.mustCall(() => { + process.removeAllListeners('uncaughtException'); + hooks.disable(); + assert.strictEqual(typeof call_id, 'number'); + assert.deepStrictEqual(call_log, [1, 1, 1, 1]); +})); + + +hooks = async_hooks.createHook({ + init(id, type) { + if (type === 'RANDOMBYTESREQUEST') + call_id = id; + }, + before(id) { + if (id === call_id) call_log[0]++; + }, + after(id) { + if (id === call_id) call_log[3]++; + }, +}).enable(); + + +process.on('uncaughtException', common.mustCall(() => { + assert.strictEqual(call_id, async_hooks.currentId()); + call_log[2]++; +})); + + +require('crypto').randomBytes(1, common.mustCall(() => { + assert.strictEqual(call_id, async_hooks.currentId()); + call_log[1]++; + throw new Error('ah crap'); +}));