lib: add tracing channel to diagnostics_channel

PR-URL: https://github.com/nodejs/node/pull/44943
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Chengzhong Wu <legendecas@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
Reviewed-By: Rafael Gonzaga <rafael.nunu@hotmail.com>
Reviewed-By: Bryan English <bryan@bryanenglish.com>
This commit is contained in:
Stephen Belanger 2023-03-31 10:40:30 -07:00 committed by Michaël Zasso
parent 3552afb904
commit 2e9f7284a1
No known key found for this signature in database
GPG key ID: 770F7A9A5AE15600
13 changed files with 1308 additions and 36 deletions

View file

@ -227,6 +227,56 @@ diagnostics_channel.subscribe('my-channel', onMessage);
diagnostics_channel.unsubscribe('my-channel', onMessage);
```
#### `diagnostics_channel.tracingChannel(nameOrChannels)`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `nameOrChannels` {string|TracingChannel} Channel name or
object containing all the [TracingChannel Channels][]
* Returns: {TracingChannel} Collection of channels to trace with
Creates a [`TracingChannel`][] wrapper for the given
[TracingChannel Channels][]. If a name is given, the corresponding tracing
channels will be created in the form of `tracing:${name}:${eventType}` where
`eventType` corresponds to the types of [TracingChannel Channels][].
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
const channelsByName = diagnostics_channel.tracingChannel('my-channel');
// or...
const channelsByCollection = diagnostics_channel.tracingChannel({
start: diagnostics_channel.channel('tracing:my-channel:start'),
end: diagnostics_channel.channel('tracing:my-channel:end'),
asyncStart: diagnostics_channel.channel('tracing:my-channel:asyncStart'),
asyncEnd: diagnostics_channel.channel('tracing:my-channel:asyncEnd'),
error: diagnostics_channel.channel('tracing:my-channel:error'),
});
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const channelsByName = diagnostics_channel.tracingChannel('my-channel');
// or...
const channelsByCollection = diagnostics_channel.tracingChannel({
start: diagnostics_channel.channel('tracing:my-channel:start'),
end: diagnostics_channel.channel('tracing:my-channel:end'),
asyncStart: diagnostics_channel.channel('tracing:my-channel:asyncStart'),
asyncEnd: diagnostics_channel.channel('tracing:my-channel:asyncEnd'),
error: diagnostics_channel.channel('tracing:my-channel:error'),
});
```
### Class: `Channel`
<!-- YAML
@ -403,6 +453,591 @@ channel.subscribe(onMessage);
channel.unsubscribe(onMessage);
```
#### `channel.bindStore(store[, transform])`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `store` {AsyncLocalStorage} The store to which to bind the context data
* `transform` {Function} Transform context data before setting the store context
When [`channel.runStores(context, ...)`][] is called, the given context data
will be applied to any store bound to the channel. If the store has already been
bound the previous `transform` function will be replaced with the new one.
The `transform` function may be omitted to set the given context data as the
context directly.
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
import { AsyncLocalStorage } from 'node:async_hooks';
const store = new AsyncLocalStorage();
const channel = diagnostics_channel.channel('my-channel');
channel.bindStore(store, (data) => {
return { data };
});
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const { AsyncLocalStorage } = require('node:async_hooks');
const store = new AsyncLocalStorage();
const channel = diagnostics_channel.channel('my-channel');
channel.bindStore(store, (data) => {
return { data };
});
```
#### `channel.unbindStore(store)`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `store` {AsyncLocalStorage} The store to unbind from the channel.
* Returns: {boolean} `true` if the store was found, `false` otherwise.
Remove a message handler previously registered to this channel with
[`channel.bindStore(store)`][].
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
import { AsyncLocalStorage } from 'node:async_hooks';
const store = new AsyncLocalStorage();
const channel = diagnostics_channel.channel('my-channel');
channel.bindStore(store);
channel.unbindStore(store);
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const { AsyncLocalStorage } = require('node:async_hooks');
const store = new AsyncLocalStorage();
const channel = diagnostics_channel.channel('my-channel');
channel.bindStore(store);
channel.unbindStore(store);
```
#### `channel.runStores(context, fn[, thisArg[, ...args]])`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `context` {any} Message to send to subscribers and bind to stores
* `fn` {Function} Handler to run within the entered storage context
* `thisArg` {any} The receiver to be used for the function call.
* `...args` {any} Optional arguments to pass to the function.
Applies the given data to any AsyncLocalStorage instances bound to the channel
for the duration of the given function, then publishes to the channel within
the scope of that data is applied to the stores.
If a transform function was given to [`channel.bindStore(store)`][] it will be
applied to transform the message data before it becomes the context value for
the store. The prior storage context is accessible from within the transform
function in cases where context linking is required.
The context applied to the store should be accesible in any async code which
continues from execution which began during the given function, however
there are some situations in which [context loss][] may occur.
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
import { AsyncLocalStorage } from 'node:async_hooks';
const store = new AsyncLocalStorage();
const channel = diagnostics_channel.channel('my-channel');
channel.bindStore(store, (message) => {
const parent = store.getStore();
return new Span(message, parent);
});
channel.runStores({ some: 'message' }, () => {
store.getStore(); // Span({ some: 'message' })
});
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const { AsyncLocalStorage } = require('node:async_hooks');
const store = new AsyncLocalStorage();
const channel = diagnostics_channel.channel('my-channel');
channel.bindStore(store, (message) => {
const parent = store.getStore();
return new Span(message, parent);
});
channel.runStores({ some: 'message' }, () => {
store.getStore(); // Span({ some: 'message' })
});
```
### Class: `TracingChannel`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
The class `TracingChannel` is a collection of [TracingChannel Channels][] which
together express a single traceable action. It is used to formalize and
simplify the process of producing events for tracing application flow.
[`diagnostics_channel.tracingChannel()`][] is used to construct a
`TracingChannel`. As with `Channel` it is recommended to create and reuse a
single `TracingChannel` at the top-level of the file rather than creating them
dynamically.
#### `tracingChannel.subscribe(subscribers)`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `subscribers` {Object} Set of [TracingChannel Channels][] subscribers
* `start` {Function} The [`start` event][] subscriber
* `end` {Function} The [`end` event][] subscriber
* `asyncStart` {Function} The [`asyncStart` event][] subscriber
* `asyncEnd` {Function} The [`asyncEnd` event][] subscriber
* `error` {Function} The [`error` event][] subscriber
Helper to subscribe a collection of functions to the corresponding channels.
This is the same as calling [`channel.subscribe(onMessage)`][] on each channel
individually.
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.subscribe({
start(message) {
// Handle start message
},
end(message) {
// Handle end message
},
asyncStart(message) {
// Handle asyncStart message
},
asyncEnd(message) {
// Handle asyncEnd message
},
error(message) {
// Handle error message
},
});
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.subscribe({
start(message) {
// Handle start message
},
end(message) {
// Handle end message
},
asyncStart(message) {
// Handle asyncStart message
},
asyncEnd(message) {
// Handle asyncEnd message
},
error(message) {
// Handle error message
},
});
```
#### `tracingChannel.unsubscribe(subscribers)`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `subscribers` {Object} Set of [TracingChannel Channels][] subscribers
* `start` {Function} The [`start` event][] subscriber
* `end` {Function} The [`end` event][] subscriber
* `asyncStart` {Function} The [`asyncStart` event][] subscriber
* `asyncEnd` {Function} The [`asyncEnd` event][] subscriber
* `error` {Function} The [`error` event][] subscriber
* Returns: {boolean} `true` if all handlers were successfully unsubscribed,
and `false` otherwise.
Helper to unsubscribe a collection of functions from the corresponding channels.
This is the same as calling [`channel.unsubscribe(onMessage)`][] on each channel
individually.
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.unsubscribe({
start(message) {
// Handle start message
},
end(message) {
// Handle end message
},
asyncStart(message) {
// Handle asyncStart message
},
asyncEnd(message) {
// Handle asyncEnd message
},
error(message) {
// Handle error message
},
});
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.unsubscribe({
start(message) {
// Handle start message
},
end(message) {
// Handle end message
},
asyncStart(message) {
// Handle asyncStart message
},
asyncEnd(message) {
// Handle asyncEnd message
},
error(message) {
// Handle error message
},
});
```
#### `tracingChannel.traceSync(fn[, context[, thisArg[, ...args]]])`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `fn` {Function} Function to wrap a trace around
* `context` {Object} Shared object to correlate events through
* `thisArg` {any} The receiver to be used for the function call
* `...args` {any} Optional arguments to pass to the function
* Returns: {any} The return value of the given function
Trace a synchronous function call. This will always produce a [`start` event][]
and [`end` event][] around the execution and may produce an [`error` event][]
if the given function throws an error. This will run the given function using
[`channel.runStores(context, ...)`][] on the `start` channel which ensures all
events should have any bound stores set to match this trace context.
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.traceSync(() => {
// Do something
}, {
some: 'thing',
});
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.traceSync(() => {
// Do something
}, {
some: 'thing',
});
```
#### `tracingChannel.tracePromise(fn[, context[, thisArg[, ...args]]])`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `fn` {Function} Promise-returning function to wrap a trace around
* `context` {Object} Shared object to correlate trace events through
* `thisArg` {any} The receiver to be used for the function call
* `...args` {any} Optional arguments to pass to the function
* Returns: {Promise} Chained from promise returned by the given function
Trace a promise-returning function call. This will always produce a
[`start` event][] and [`end` event][] around the synchronous portion of the
function execution, and will produce an [`asyncStart` event][] and
[`asyncEnd` event][] when a promise continuation is reached. It may also
produce an [`error` event][] if the given function throws an error or the
returned promise rejects. This will run the given function using
[`channel.runStores(context, ...)`][] on the `start` channel which ensures all
events should have any bound stores set to match this trace context.
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.tracePromise(async () => {
// Do something
}, {
some: 'thing',
});
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.tracePromise(async () => {
// Do something
}, {
some: 'thing',
});
```
#### `tracingChannel.traceCallback(fn[, position[, context[, thisArg[, ...args]]]])`
<!-- YAML
added:
- REPLACEME
-->
> Stability: 1 - Experimental
* `fn` {Function} callback using function to wrap a trace around
* `position` {number} Zero-indexed argument position of expected callback
* `context` {Object} Shared object to correlate trace events through
* `thisArg` {any} The receiver to be used for the function call
* `...args` {any} Optional arguments to pass to the function
* Returns: {any} The return value of the given function
Trace a callback-receiving function call. This will always produce a
[`start` event][] and [`end` event][] around the synchronous portion of the
function execution, and will produce a [`asyncStart` event][] and
[`asyncEnd` event][] around the callback execution. It may also produce an
[`error` event][] if the given function throws an error or the returned
promise rejects. This will run the given function using
[`channel.runStores(context, ...)`][] on the `start` channel which ensures all
events should have any bound stores set to match this trace context.
The `position` will be -1 by default to indicate the final argument should
be used as the callback.
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.traceCallback((arg1, callback) => {
// Do something
callback(null, 'result');
}, 1, {
some: 'thing',
}, thisArg, arg1, callback);
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const channels = diagnostics_channel.tracingChannel('my-channel');
channels.traceCallback((arg1, callback) => {
// Do something
callback(null, 'result');
}, {
some: 'thing',
}, thisArg, arg1, callback);
```
The callback will also be run with [`channel.runStores(context, ...)`][] which
enables context loss recovery in some cases.
```mjs
import diagnostics_channel from 'node:diagnostics_channel';
import { AsyncLocalStorage } from 'node:async_hooks';
const channels = diagnostics_channel.tracingChannel('my-channel');
const myStore = new AsyncLocalStorage();
// The start channel sets the initial store data to something
// and stores that store data value on the trace context object
channels.start.bindStore(myStore, (data) => {
const span = new Span(data);
data.span = span;
return span;
});
// Then asyncStart can restore from that data it stored previously
channels.asyncStart.bindStore(myStore, (data) => {
return data.span;
});
```
```cjs
const diagnostics_channel = require('node:diagnostics_channel');
const { AsyncLocalStorage } = require('node:async_hooks');
const channels = diagnostics_channel.tracingChannel('my-channel');
const myStore = new AsyncLocalStorage();
// The start channel sets the initial store data to something
// and stores that store data value on the trace context object
channels.start.bindStore(myStore, (data) => {
const span = new Span(data);
data.span = span;
return span;
});
// Then asyncStart can restore from that data it stored previously
channels.asyncStart.bindStore(myStore, (data) => {
return data.span;
});
```
### TracingChannel Channels
A TracingChannel is a collection of several diagnostics\_channels representing
specific points in the execution lifecycle of a single traceable action. The
behaviour is split into five diagnostics\_channels consisting of `start`,
`end`, `asyncStart`, `asyncEnd`, and `error`. A single traceable action will
share the same event object between all events, this can be helpful for
managing correlation through a weakmap.
These event objects will be extended with `result` or `error` values when
the task "completes". In the case of a synchronous task the `result` will be
the return value and the `error` will be anything thrown from the function.
With callback-based async functions the `result` will be the second argument
of the callback while the `error` will either be a thrown error visible in the
`end` event or the first callback argument in either of the `asyncStart` or
`asyncEnd` events.
Tracing channels should follow a naming pattern of:
* `tracing:module.class.method:start` or `tracing:module.function:start`
* `tracing:module.class.method:end` or `tracing:module.function:end`
* `tracing:module.class.method:asyncStart` or `tracing:module.function:asyncStart`
* `tracing:module.class.method:asyncEnd` or `tracing:module.function:asyncEnd`
* `tracing:module.class.method:error` or `tracing:module.function:error`
#### `start(event)`
* Name: `tracing:${name}:start`
The `start` event represents the point at which a function is called. At this
point the event data may contain function arguments or anything else available
at the very start of the execution of the function.
#### `end(event)`
* Name: `tracing:${name}:end`
The `end` event represents the point at which a function call returns a value.
In the case of an async function this is when the promise returned not when the
function itself makes a return statement internally. At this point, if the
traced function was synchronous the `result` field will be set to the return
value of the function. Alternatively, the `error` field may be present to
represent any thrown errors.
It is recommended to listen specifically to the `error` event to track errors
as it may be possible for a traceable action to produce multiple errors. For
example, an async task which fails may be started internally before the sync
part of the task then throws an error.
#### `asyncStart(event)`
* Name: `tracing:${name}:asyncStart`
The `asyncStart` event represents the callback or continuation of a traceable
function being reached. At this point things like callback arguments may be
available, or anything else expressing the "result" of the action.
For callbacks-based functions, the first argument of the callback will be
assigned to the `error` field, if not `undefined` or `null`, and the second
argument will be assigned to the `result` field.
For promises, the argument to the `resolve` path will be assigned to `result`
or the argument to the `reject` path will be assign to `error`.
It is recommended to listen specifically to the `error` event to track errors
as it may be possible for a traceable action to produce multiple errors. For
example, an async task which fails may be started internally before the sync
part of the task then throws an error.
#### `asyncEnd(event)`
* Name: `tracing:${name}:asyncEnd`
The `asyncEnd` event represents the callback of an asynchronous function
returning. It's not likely event data will change after the `asyncStart` event,
however it may be useful to see the point where the callback completes.
#### `error(event)`
* Name: `tracing:${name}:error`
The `error` event represents any error produced by the traceable function
either synchronously or asynchronously. If an error is thrown in the
synchronous portion of the traced function the error will be assigned to the
`error` field of the event and the `error` event will be triggered. If an error
is received asynchronously through a callback or promise rejection it will also
be assigned to the `error` field of the event and trigger the `error` event.
It is possible for a single traceable function call to produce errors multiple
times so this should be considered when consuming this event. For example, if
another async task is triggered internally which fails and then the sync part
of the function then throws and error two `error` events will be emitted, one
for the sync error and one for the async error.
### Built-in Channels
> Stability: 1 - Experimental
@ -462,8 +1097,20 @@ Emitted when a new TCP or pipe connection is received.
Emitted when a new UDP socket is created.
[TracingChannel Channels]: #tracingchannel-channels
[`'uncaughtException'`]: process.md#event-uncaughtexception
[`TracingChannel`]: #class-tracingchannel
[`asyncEnd` event]: #asyncendevent
[`asyncStart` event]: #asyncstartevent
[`channel.bindStore(store)`]: #channelbindstorestore-transform
[`channel.runStores(context, ...)`]: #channelrunstorescontext-fn-thisarg-args
[`channel.subscribe(onMessage)`]: #channelsubscribeonmessage
[`channel.unsubscribe(onMessage)`]: #channelunsubscribeonmessage
[`diagnostics_channel.channel(name)`]: #diagnostics_channelchannelname
[`diagnostics_channel.subscribe(name, onMessage)`]: #diagnostics_channelsubscribename-onmessage
[`diagnostics_channel.tracingChannel()`]: #diagnostics_channeltracingchannelnameorchannels
[`diagnostics_channel.unsubscribe(name, onMessage)`]: #diagnostics_channelunsubscribename-onmessage
[`end` event]: #endevent
[`error` event]: #errorevent
[`start` event]: #startevent
[context loss]: async_context.md#troubleshooting-context-loss

View file

@ -4,9 +4,14 @@ const {
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypeSplice,
ObjectCreate,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
PromisePrototypeThen,
PromiseResolve,
PromiseReject,
ReflectApply,
SafeMap,
SymbolHasInstance,
} = primordials;
@ -23,11 +28,59 @@ const { triggerUncaughtException } = internalBinding('errors');
const { WeakReference } = internalBinding('util');
function decRef(channel) {
if (channels.get(channel.name).decRef() === 0) {
channels.delete(channel.name);
}
}
function incRef(channel) {
channels.get(channel.name).incRef();
}
function markActive(channel) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
channel._subscribers = [];
channel._stores = new SafeMap();
}
function maybeMarkInactive(channel) {
// When there are no more active subscribers or bound, restore to fast prototype.
if (!channel._subscribers.length && !channel._stores.size) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(channel, Channel.prototype);
channel._subscribers = undefined;
channel._stores = undefined;
}
}
function defaultTransform(data) {
return data;
}
function wrapStoreRun(store, data, next, transform = defaultTransform) {
return () => {
let context;
try {
context = transform(data);
} catch (err) {
process.nextTick(() => {
triggerUncaughtException(err, false);
});
return next();
}
return store.run(context, next);
};
}
// TODO(qard): should there be a C++ channel interface?
class ActiveChannel {
subscribe(subscription) {
validateFunction(subscription, 'subscription');
ArrayPrototypePush(this._subscribers, subscription);
incRef(this);
}
unsubscribe(subscription) {
@ -36,12 +89,28 @@ class ActiveChannel {
ArrayPrototypeSplice(this._subscribers, index, 1);
// When there are no more active subscribers, restore to fast prototype.
if (!this._subscribers.length) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(this, Channel.prototype);
decRef(this);
maybeMarkInactive(this);
return true;
}
bindStore(store, transform) {
const replacing = this._stores.has(store);
if (!replacing) incRef(this);
this._stores.set(store, transform);
}
unbindStore(store) {
if (!this._stores.has(store)) {
return false;
}
this._stores.delete(store);
decRef(this);
maybeMarkInactive(this);
return true;
}
@ -61,12 +130,30 @@ class ActiveChannel {
}
}
}
runStores(data, fn, thisArg, ...args) {
let run = () => {
this.publish(data);
return ReflectApply(fn, thisArg, args);
};
for (const entry of this._stores.entries()) {
const store = entry[0];
const transform = entry[1];
run = wrapStoreRun(store, data, run, transform);
}
return run();
}
}
class Channel {
constructor(name) {
this._subscribers = undefined;
this._stores = undefined;
this.name = name;
channels.set(name, new WeakReference(this));
}
static [SymbolHasInstance](instance) {
@ -76,8 +163,7 @@ class Channel {
}
subscribe(subscription) {
ObjectSetPrototypeOf(this, ActiveChannel.prototype);
this._subscribers = [];
markActive(this);
this.subscribe(subscription);
}
@ -85,18 +171,31 @@ class Channel {
return false;
}
bindStore(store, transform) {
markActive(this);
this.bindStore(store, transform);
}
unbindStore() {
return false;
}
get hasSubscribers() {
return false;
}
publish() {}
runStores(data, fn, thisArg, ...args) {
return ReflectApply(fn, thisArg, args);
}
}
const channels = ObjectCreate(null);
const channels = new SafeMap();
function channel(name) {
let channel;
const ref = channels[name];
const ref = channels.get(name);
if (ref) channel = ref.get();
if (channel) return channel;
@ -104,33 +203,20 @@ function channel(name) {
throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name);
}
channel = new Channel(name);
channels[name] = new WeakReference(channel);
return channel;
return new Channel(name);
}
function subscribe(name, subscription) {
const chan = channel(name);
channels[name].incRef();
chan.subscribe(subscription);
return channel(name).subscribe(subscription);
}
function unsubscribe(name, subscription) {
const chan = channel(name);
if (!chan.unsubscribe(subscription)) {
return false;
}
channels[name].decRef();
if (channels[name].getRef() === 0) {
delete channels[name];
}
return true;
return channel(name).unsubscribe(subscription);
}
function hasSubscribers(name) {
let channel;
const ref = channels[name];
const ref = channels.get(name);
if (ref) channel = ref.get();
if (!channel) {
return false;
@ -139,10 +225,179 @@ function hasSubscribers(name) {
return channel.hasSubscribers;
}
const traceEvents = [
'start',
'end',
'asyncStart',
'asyncEnd',
'error',
];
function assertChannel(value, name) {
if (!(value instanceof Channel)) {
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
}
}
class TracingChannel {
constructor(nameOrChannels) {
if (typeof nameOrChannels === 'string') {
this.start = channel(`tracing:${nameOrChannels}:start`);
this.end = channel(`tracing:${nameOrChannels}:end`);
this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
this.error = channel(`tracing:${nameOrChannels}:error`);
} else if (typeof nameOrChannels === 'object') {
const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;
assertChannel(start, 'nameOrChannels.start');
assertChannel(end, 'nameOrChannels.end');
assertChannel(asyncStart, 'nameOrChannels.asyncStart');
assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
assertChannel(error, 'nameOrChannels.error');
this.start = start;
this.end = end;
this.asyncStart = asyncStart;
this.asyncEnd = asyncEnd;
this.error = error;
} else {
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
['string', 'object', 'Channel'],
nameOrChannels);
}
}
subscribe(handlers) {
for (const name of traceEvents) {
if (!handlers[name]) continue;
this[name]?.subscribe(handlers[name]);
}
}
unsubscribe(handlers) {
let done = true;
for (const name of traceEvents) {
if (!handlers[name]) continue;
if (!this[name]?.unsubscribe(handlers[name])) {
done = false;
}
}
return done;
}
traceSync(fn, context = {}, thisArg, ...args) {
const { start, end, error } = this;
return start.runStores(context, () => {
try {
const result = ReflectApply(fn, thisArg, args);
context.result = result;
return result;
} catch (err) {
context.error = err;
error.publish(context);
throw err;
} finally {
end.publish(context);
}
});
}
tracePromise(fn, context = {}, thisArg, ...args) {
const { start, end, asyncStart, asyncEnd, error } = this;
function reject(err) {
context.error = err;
error.publish(context);
asyncStart.publish(context);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(context);
return PromiseReject(err);
}
function resolve(result) {
context.result = result;
asyncStart.publish(context);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(context);
return result;
}
return start.runStores(context, () => {
try {
let promise = ReflectApply(fn, thisArg, args);
// Convert thenables to native promises
if (!(promise instanceof Promise)) {
promise = PromiseResolve(promise);
}
return PromisePrototypeThen(promise, resolve, reject);
} catch (err) {
context.error = err;
error.publish(context);
throw err;
} finally {
end.publish(context);
}
});
}
traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
const { start, end, asyncStart, asyncEnd, error } = this;
function wrappedCallback(err, res) {
if (err) {
context.error = err;
error.publish(context);
} else {
context.result = res;
}
// Using runStores here enables manual context failure recovery
asyncStart.runStores(context, () => {
try {
if (callback) {
return ReflectApply(callback, this, arguments);
}
} finally {
asyncEnd.publish(context);
}
});
}
const callback = args.at(position);
if (typeof callback !== 'function') {
throw new ERR_INVALID_ARG_TYPE('callback', ['function'], callback);
}
ArrayPrototypeSplice(args, position, 1, wrappedCallback);
return start.runStores(context, () => {
try {
return ReflectApply(fn, thisArg, args);
} catch (err) {
context.error = err;
error.publish(context);
throw err;
} finally {
end.publish(context);
}
});
}
}
function tracingChannel(nameOrChannels) {
return new TracingChannel(nameOrChannels);
}
module.exports = {
channel,
hasSubscribers,
subscribe,
tracingChannel,
unsubscribe,
Channel,
};

View file

@ -262,18 +262,13 @@ void WeakReference::Get(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(weak_ref->target_.Get(isolate));
}
void WeakReference::GetRef(const FunctionCallbackInfo<Value>& args) {
WeakReference* weak_ref = Unwrap<WeakReference>(args.Holder());
Isolate* isolate = args.GetIsolate();
args.GetReturnValue().Set(
v8::Number::New(isolate, weak_ref->reference_count_));
}
void WeakReference::IncRef(const FunctionCallbackInfo<Value>& args) {
WeakReference* weak_ref = Unwrap<WeakReference>(args.Holder());
weak_ref->reference_count_++;
if (weak_ref->target_.IsEmpty()) return;
if (weak_ref->reference_count_ == 1) weak_ref->target_.ClearWeak();
args.GetReturnValue().Set(
v8::Number::New(args.GetIsolate(), weak_ref->reference_count_));
}
void WeakReference::DecRef(const FunctionCallbackInfo<Value>& args) {
@ -282,6 +277,8 @@ void WeakReference::DecRef(const FunctionCallbackInfo<Value>& args) {
weak_ref->reference_count_--;
if (weak_ref->target_.IsEmpty()) return;
if (weak_ref->reference_count_ == 0) weak_ref->target_.SetWeak();
args.GetReturnValue().Set(
v8::Number::New(args.GetIsolate(), weak_ref->reference_count_));
}
static void GuessHandleType(const FunctionCallbackInfo<Value>& args) {
@ -365,7 +362,6 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(ArrayBufferViewHasBuffer);
registry->Register(WeakReference::New);
registry->Register(WeakReference::Get);
registry->Register(WeakReference::GetRef);
registry->Register(WeakReference::IncRef);
registry->Register(WeakReference::DecRef);
registry->Register(GuessHandleType);
@ -457,7 +453,6 @@ void Initialize(Local<Object> target,
WeakReference::kInternalFieldCount);
weak_ref->Inherit(BaseObject::GetConstructorTemplate(env));
SetProtoMethod(isolate, weak_ref, "get", WeakReference::Get);
SetProtoMethod(isolate, weak_ref, "getRef", WeakReference::GetRef);
SetProtoMethod(isolate, weak_ref, "incRef", WeakReference::IncRef);
SetProtoMethod(isolate, weak_ref, "decRef", WeakReference::DecRef);
SetConstructorFunction(context, target, "WeakReference", weak_ref);

View file

@ -21,7 +21,6 @@ class WeakReference : public SnapshotableObject {
v8::Local<v8::Object> target);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Get(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetRef(const v8::FunctionCallbackInfo<v8::Value>& args);
static void IncRef(const v8::FunctionCallbackInfo<v8::Value>& args);
static void DecRef(const v8::FunctionCallbackInfo<v8::Value>& args);

View file

@ -0,0 +1,108 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const dc = require('diagnostics_channel');
const { AsyncLocalStorage } = require('async_hooks');
let n = 0;
const thisArg = new Date();
const inputs = [
{ foo: 'bar' },
{ baz: 'buz' },
];
const channel = dc.channel('test');
// Bind a storage directly to published data
const store1 = new AsyncLocalStorage();
channel.bindStore(store1);
let store1bound = true;
// Bind a store with transformation of published data
const store2 = new AsyncLocalStorage();
channel.bindStore(store2, common.mustCall((data) => {
assert.strictEqual(data, inputs[n]);
return { data };
}, 4));
// Regular subscribers should see publishes from runStores calls
channel.subscribe(common.mustCall((data) => {
if (store1bound) {
assert.deepStrictEqual(data, store1.getStore());
}
assert.deepStrictEqual({ data }, store2.getStore());
assert.strictEqual(data, inputs[n]);
}, 4));
// Verify stores are empty before run
assert.strictEqual(store1.getStore(), undefined);
assert.strictEqual(store2.getStore(), undefined);
channel.runStores(inputs[n], common.mustCall(function(a, b) {
// Verify this and argument forwarding
assert.strictEqual(this, thisArg);
assert.strictEqual(a, 1);
assert.strictEqual(b, 2);
// Verify store 1 state matches input
assert.strictEqual(store1.getStore(), inputs[n]);
// Verify store 2 state has expected transformation
assert.deepStrictEqual(store2.getStore(), { data: inputs[n] });
// Should support nested contexts
n++;
channel.runStores(inputs[n], common.mustCall(function() {
// Verify this and argument forwarding
assert.strictEqual(this, undefined);
// Verify store 1 state matches input
assert.strictEqual(store1.getStore(), inputs[n]);
// Verify store 2 state has expected transformation
assert.deepStrictEqual(store2.getStore(), { data: inputs[n] });
}));
n--;
// Verify store 1 state matches input
assert.strictEqual(store1.getStore(), inputs[n]);
// Verify store 2 state has expected transformation
assert.deepStrictEqual(store2.getStore(), { data: inputs[n] });
}), thisArg, 1, 2);
// Verify stores are empty after run
assert.strictEqual(store1.getStore(), undefined);
assert.strictEqual(store2.getStore(), undefined);
// Verify unbinding works
assert.ok(channel.unbindStore(store1));
store1bound = false;
// Verify unbinding a store that is not bound returns false
assert.ok(!channel.unbindStore(store1));
n++;
channel.runStores(inputs[n], common.mustCall(() => {
// Verify after unbinding store 1 will remain undefined
assert.strictEqual(store1.getStore(), undefined);
// Verify still bound store 2 receives expected data
assert.deepStrictEqual(store2.getStore(), { data: inputs[n] });
}));
// Contain transformer errors and emit on next tick
const fail = new Error('fail');
channel.bindStore(store1, () => {
throw fail;
});
let calledRunStores = false;
process.once('uncaughtException', common.mustCall((err) => {
assert.strictEqual(calledRunStores, true);
assert.strictEqual(err, fail);
}));
channel.runStores(inputs[n], common.mustCall());
calledRunStores = true;

View file

@ -0,0 +1,46 @@
'use strict';
const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');
const channel = dc.tracingChannel('test');
const expectedError = new Error('test');
const input = { foo: 'bar' };
const thisArg = { baz: 'buz' };
function check(found) {
assert.deepStrictEqual(found, input);
}
const handlers = {
start: common.mustCall(check, 2),
end: common.mustCall(check, 2),
asyncStart: common.mustCall(check, 2),
asyncEnd: common.mustCall(check, 2),
error: common.mustCall((found) => {
check(found);
assert.deepStrictEqual(found.error, expectedError);
}, 2)
};
channel.subscribe(handlers);
channel.traceCallback(function(cb, err) {
assert.deepStrictEqual(this, thisArg);
setImmediate(cb, err);
}, 0, input, thisArg, common.mustCall((err, res) => {
assert.strictEqual(err, expectedError);
assert.strictEqual(res, undefined);
}), expectedError);
channel.tracePromise(function(value) {
assert.deepStrictEqual(this, thisArg);
return Promise.reject(value);
}, input, thisArg, expectedError).then(
common.mustNotCall(),
common.mustCall((value) => {
assert.deepStrictEqual(value, expectedError);
})
);

View file

@ -0,0 +1,60 @@
'use strict';
const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');
const channel = dc.tracingChannel('test');
const expectedResult = { foo: 'bar' };
const input = { foo: 'bar' };
const thisArg = { baz: 'buz' };
function check(found) {
assert.deepStrictEqual(found, input);
}
const handlers = {
start: common.mustCall(check, 2),
end: common.mustCall(check, 2),
asyncStart: common.mustCall((found) => {
check(found);
assert.strictEqual(found.error, undefined);
assert.deepStrictEqual(found.result, expectedResult);
}, 2),
asyncEnd: common.mustCall((found) => {
check(found);
assert.strictEqual(found.error, undefined);
assert.deepStrictEqual(found.result, expectedResult);
}, 2),
error: common.mustNotCall()
};
channel.subscribe(handlers);
channel.traceCallback(function(cb, err, res) {
assert.deepStrictEqual(this, thisArg);
setImmediate(cb, err, res);
}, 0, input, thisArg, common.mustCall((err, res) => {
assert.strictEqual(err, null);
assert.deepStrictEqual(res, expectedResult);
}), null, expectedResult);
channel.tracePromise(function(value) {
assert.deepStrictEqual(this, thisArg);
return Promise.resolve(value);
}, input, thisArg, expectedResult).then(
common.mustCall((value) => {
assert.deepStrictEqual(value, expectedResult);
}),
common.mustNotCall()
);
let failed = false;
try {
channel.traceCallback(common.mustNotCall(), 0, input, thisArg, 1, 2, 3);
} catch (err) {
assert.ok(/"callback" argument must be of type function/.test(err.message));
failed = true;
}
assert.strictEqual(failed, true);

View file

@ -0,0 +1,29 @@
'use strict';
const common = require('../common');
const { AsyncLocalStorage } = require('async_hooks');
const dc = require('diagnostics_channel');
const assert = require('assert');
const channel = dc.tracingChannel('test');
const store = new AsyncLocalStorage();
const firstContext = { foo: 'bar' };
const secondContext = { baz: 'buz' };
channel.start.bindStore(store, common.mustCall(() => {
return firstContext;
}));
channel.asyncStart.bindStore(store, common.mustCall(() => {
return secondContext;
}));
assert.strictEqual(store.getStore(), undefined);
channel.traceCallback(common.mustCall((cb) => {
assert.deepStrictEqual(store.getStore(), firstContext);
setImmediate(cb);
}), 0, {}, null, common.mustCall(() => {
assert.deepStrictEqual(store.getStore(), secondContext);
}));
assert.strictEqual(store.getStore(), undefined);

View file

@ -0,0 +1,24 @@
'use strict';
const common = require('../common');
const { setTimeout } = require('node:timers/promises');
const { AsyncLocalStorage } = require('async_hooks');
const dc = require('diagnostics_channel');
const assert = require('assert');
const channel = dc.tracingChannel('test');
const store = new AsyncLocalStorage();
const context = { foo: 'bar' };
channel.start.bindStore(store, common.mustCall(() => {
return context;
}));
assert.strictEqual(store.getStore(), undefined);
channel.tracePromise(common.mustCall(async () => {
assert.deepStrictEqual(store.getStore(), context);
await setTimeout(1);
assert.deepStrictEqual(store.getStore(), context);
}));
assert.strictEqual(store.getStore(), undefined);

View file

@ -0,0 +1,21 @@
'use strict';
const common = require('../common');
const { AsyncLocalStorage } = require('async_hooks');
const dc = require('diagnostics_channel');
const assert = require('assert');
const channel = dc.tracingChannel('test');
const store = new AsyncLocalStorage();
const context = { foo: 'bar' };
channel.start.bindStore(store, common.mustCall(() => {
return context;
}));
assert.strictEqual(store.getStore(), undefined);
channel.traceSync(common.mustCall(() => {
assert.deepStrictEqual(store.getStore(), context);
}));
assert.strictEqual(store.getStore(), undefined);

View file

@ -0,0 +1,39 @@
'use strict';
const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');
const channel = dc.tracingChannel('test');
const expectedError = new Error('test');
const input = { foo: 'bar' };
const thisArg = { baz: 'buz' };
function check(found) {
assert.deepStrictEqual(found, input);
}
const handlers = {
start: common.mustCall(check),
end: common.mustCall(check),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustCall((found) => {
check(found);
assert.deepStrictEqual(found.error, expectedError);
})
};
channel.subscribe(handlers);
try {
channel.traceSync(function(err) {
assert.deepStrictEqual(this, thisArg);
assert.strictEqual(err, expectedError);
throw err;
}, input, thisArg, expectedError);
throw new Error('It should not reach this error');
} catch (error) {
assert.deepStrictEqual(error, expectedError);
}

View file

@ -0,0 +1,46 @@
'use strict';
const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');
const channel = dc.tracingChannel('test');
const expectedResult = { foo: 'bar' };
const input = { foo: 'bar' };
const thisArg = { baz: 'buz' };
const arg = { baz: 'buz' };
function check(found) {
assert.strictEqual(found, input);
}
const handlers = {
start: common.mustCall(check),
end: common.mustCall((found) => {
check(found);
assert.strictEqual(found.result, expectedResult);
}),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall()
};
assert.strictEqual(channel.start.hasSubscribers, false);
channel.subscribe(handlers);
assert.strictEqual(channel.start.hasSubscribers, true);
const result1 = channel.traceSync(function(arg1) {
assert.strictEqual(arg1, arg);
assert.strictEqual(this, thisArg);
return expectedResult;
}, input, thisArg, arg);
assert.strictEqual(result1, expectedResult);
channel.unsubscribe(handlers);
assert.strictEqual(channel.start.hasSubscribers, false);
const result2 = channel.traceSync(function(arg1) {
assert.strictEqual(arg1, arg);
assert.strictEqual(this, thisArg);
return expectedResult;
}, input, thisArg, arg);
assert.strictEqual(result2, expectedResult);

View file

@ -57,6 +57,8 @@ const customTypesMap = {
'Module Namespace Object':
'https://tc39.github.io/ecma262/#sec-module-namespace-exotic-objects',
'AsyncLocalStorage': 'async_context.html#class-asynclocalstorage',
'AsyncHook': 'async_hooks.html#async_hookscreatehookcallbacks',
'AsyncResource': 'async_hooks.html#class-asyncresource',
@ -108,6 +110,7 @@ const customTypesMap = {
'dgram.Socket': 'dgram.html#class-dgramsocket',
'Channel': 'diagnostics_channel.html#class-channel',
'TracingChannel': 'diagnostics_channel.html#class-tracingchannel',
'Domain': 'domain.html#class-domain',