Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,136 @@ channels.asyncStart.bindStore(myStore, (data) => {
});
```

#### `tracingChannel.traceIterator(fn[, context[, thisArg[, ...args]]])`

<!-- YAML
added: REPLACEME
-->

* `fn` {Function} Iterator or async iterator returning function to wrap a trace
around
* `context` {Object} Shared object to correlate trace events through
* `thisArg` {any} The receiver to be used for the function call
* `...args` {any} Optional arguments to pass to the function
* Returns: {Iterator|AsyncIterator|Promise} The iterator returned by the given
function, or a {Promise} resolving to it if the function is async

Trace an iterator-returning function call. This will always produce a
[`start` event][] and [`end` event][] around the synchronous portion of the
function execution. If the given function returns a promise (i.e. is an async
function), it will additionally produce an [`asyncStart` event][] and
[`asyncEnd` event][] when the promise resolves to the iterator.

Each call to `next()`, `return()`, or `throw()` on the returned iterator is
also traced via a sub-channel derived from the tracing channel name by appending
`:next`. For example, if the tracing channel is named `my-channel`, the
sub-channel will be `my-channel:next`. These calls follow the same event
pattern as the outer function call: [`start` event][] and [`end` event][] for
synchronous results, plus [`asyncStart` event][] and [`asyncEnd` event][] if
the method returns a promise (e.g. when iterating an async iterator). An
[`error` event][] is produced if `next()` throws or the iterator method rejects.

To ensure only correct trace graphs are formed, events will only be published
if subscribers are present prior to starting the trace. Subscriptions which are
added after the trace begins will not receive future events from that trace,
only future traces will be seen.

```mjs
import diagnostics_channel from 'node:diagnostics_channel';

const channels = diagnostics_channel.tracingChannel('my-channel');

// Sync function returning a sync iterator.
// Fires start/end on 'my-channel'; fires start/end on 'my-channel:next'
// for each next() call.
for (const value of channels.traceIterator(function*() {
yield 1;
yield 2;
}, { some: 'thing' })) {
// consume values
}

// Sync call to an async generator function, returning an AsyncIterator.
// Fires start/end on 'my-channel'; fires start/end/asyncStart/asyncEnd on
// 'my-channel:next' for each next() call because next() returns a Promise.
for await (const value of channels.traceIterator(async function*() {
yield 1;
yield 2;
}, { some: 'thing' })) {
// consume values
}

// Async function returning a sync iterator.
// Fires start/end/asyncStart/asyncEnd on 'my-channel' when the Promise
// resolves; fires start/end on 'my-channel:next' for each next() call.
const iter = await channels.traceIterator(async function() {
return [1, 2].values();
}, { some: 'thing' });
for (const value of iter) {
// consume values
}

// Async function returning an async iterator.
// Fires start/end/asyncStart/asyncEnd on 'my-channel' when the Promise
// resolves; fires start/end/asyncStart/asyncEnd on 'my-channel:next' for each
// next() call.
const asyncIter = await channels.traceIterator(async function() {
return (async function*() { yield 1; yield 2; })();
}, { some: 'thing' });
for await (const value of asyncIter) {
// consume values
}
```

```cjs
const diagnostics_channel = require('node:diagnostics_channel');

const channels = diagnostics_channel.tracingChannel('my-channel');

// Sync function returning a sync iterator.
// Fires start/end on 'my-channel'; fires start/end on 'my-channel:next'
// for each next() call.
for (const value of channels.traceIterator(function*() {
yield 1;
yield 2;
}, { some: 'thing' })) {
// consume values
}

(async () => {
// Sync call to an async generator function, returning an AsyncIterator.
// Fires start/end on 'my-channel'; fires start/end/asyncStart/asyncEnd on
// 'my-channel:next' for each next() call because next() returns a Promise.
for await (const value of channels.traceIterator(async function*() {
yield 1;
yield 2;
}, { some: 'thing' })) {
// consume values
}

// Async function returning a sync iterator.
// Fires start/end/asyncStart/asyncEnd on 'my-channel' when the Promise
// resolves; fires start/end on 'my-channel:next' for each next() call.
const iter = await channels.traceIterator(async function() {
return [1, 2].values();
}, { some: 'thing' });
for (const value of iter) {
// consume values
}

// Async function returning an async iterator.
// Fires start/end/asyncStart/asyncEnd on 'my-channel' when the Promise
// resolves; fires start/end/asyncStart/asyncEnd on 'my-channel:next' for
// each next() call.
const asyncIter = await channels.traceIterator(async function() {
return (async function*() { yield 1; yield 2; })();
}, { some: 'thing' });
for await (const value of asyncIter) {
// consume values
}
})();
```

#### `tracingChannel.hasSubscribers`

<!-- YAML
Expand Down
80 changes: 80 additions & 0 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ function tracingChannelFrom(nameOrChannels, name) {
}

class TracingChannel {
#nextChannel;

constructor(nameOrChannels) {
for (let i = 0; i < traceEvents.length; ++i) {
const eventName = traceEvents[i];
Expand Down Expand Up @@ -428,6 +430,84 @@ class TracingChannel {
}
});
}

traceIterator(fn, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
return ReflectApply(fn, thisArg, args);
}

const { start, end, asyncStart, asyncEnd, error } = this;

const nextChannel = this.#nextChannel ||= tracingChannel({
start: channel(start.name.slice(0, -6) + ':next:start'),
end: channel(end.name.slice(0, -4) + ':next:end'),
asyncStart: channel(asyncStart.name.slice(0, -11) + ':next:asyncStart'),
asyncEnd: channel(asyncEnd.name.slice(0, -9) + ':next:asyncEnd'),
error: channel(error.name.slice(0, -6) + ':next:error'),
});

const wrapIter = (iter) => {
const { next: iterNext, return: iterReturn, throw: iterThrow } = iter;

iter.next = (...args) =>
nextChannel.#traceMaybePromise(iterNext, context, iter, ...args);
iter.return = (...args) =>
nextChannel.#traceMaybePromise(iterReturn, context, iter, ...args);
iter.throw = (...args) =>
nextChannel.#traceMaybePromise(iterThrow, context, iter, ...args);

return iter;
};

const result = this.#traceMaybePromise(fn, context, thisArg, ...args);

return result instanceof Promise ?
PromisePrototypeThen(result, wrapIter) :
wrapIter(result);
}

#traceMaybePromise(fn, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
return ReflectApply(fn, thisArg, args);
}

const { start, end, asyncStart, asyncEnd, error } = this;

function reject(err) {
context.error = err;
error.publish(context);
asyncStart.publish(context);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(context);
return PromiseReject(err);
}

function resolve(result) {
context.result = result;
asyncStart.publish(context);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(context);
return result;
}

return start.runStores(context, () => {
try {
const result = ReflectApply(fn, thisArg, args);
// TODO: Should tracePromise just always do this?
if (!(result instanceof Promise)) {
context.result = result;
return result;
}
return PromisePrototypeThen(result, resolve, reject);
} catch (err) {
context.error = err;
error.publish(context);
throw err;
} finally {
end.publish(context);
}
});
}
}

function tracingChannel(nameOrChannels) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');

const channel = dc.tracingChannel('test');
const nextChannel = dc.tracingChannel('test:next');

const expectedResult = { foo: 'bar' };
const input = { foo: 'bar' };
const thisArg = { baz: 'buz' };

function check(found) {
assert.deepStrictEqual(found, input);
}

function checkNextAsync(found) {
check(found);
assert.strictEqual(found.error, undefined);
assert.deepStrictEqual(found.result, { value: expectedResult, done: false });
}

// Async function* returns an AsyncGenerator synchronously, so no asyncStart/asyncEnd
// for the fn call itself
const handlers = {
start: common.mustCall(check),
end: common.mustCall(check),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall(),
};

// next() on an AsyncGenerator returns a Promise
const nextHandlers = {
start: common.mustCall(check),
end: common.mustCall(check),
asyncStart: common.mustCall(checkNextAsync),
asyncEnd: common.mustCall(checkNextAsync),
error: common.mustNotCall(),
};

channel.subscribe(handlers);
nextChannel.subscribe(nextHandlers);

const iter = channel.traceIterator(common.mustCall(async function*(value) {
assert.deepStrictEqual(this, thisArg);
yield value;
}), input, thisArg, expectedResult);

// next() returns a Promise since iter is an AsyncGenerator
iter.next().then(common.mustCall((result) => {
assert.deepStrictEqual(result, { value: expectedResult, done: false });
}));
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');

const channel = dc.tracingChannel('test');
const nextChannel = dc.tracingChannel('test:next');

const handlers = {
start: common.mustNotCall(),
end: common.mustNotCall(),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall(),
};

const nextHandlers = {
start: common.mustNotCall(),
end: common.mustNotCall(),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall(),
};

// Subscribe after traceIterator call - no events should fire for the iterator
// or for subsequent next() calls since the iterator was not wrapped
const iter = channel.traceIterator(function*() {
yield 1;
}, {});

channel.subscribe(handlers);
nextChannel.subscribe(nextHandlers);

iter.next();
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');

const channel = dc.tracingChannel('test');
const nextChannel = dc.tracingChannel('test:next');

const expectedError = new Error('test');
const input = { foo: 'bar' };
const thisArg = { baz: 'buz' };

function check(found) {
assert.deepStrictEqual(found, input);
}

function checkError(found) {
check(found);
assert.deepStrictEqual(found.error, expectedError);
}

// Two traceIterator calls: one for next() error, one for throw() error
const handlers = {
start: common.mustCall(check, 2),
end: common.mustCall(check, 2),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustNotCall(),
};

// iter1: next() success + next() throws = start×2, end×2, error×1
// iter2: throw() throws = start×1, end×1, error×1
const nextHandlers = {
start: common.mustCall(check, 3),
end: common.mustCall(check, 3),
asyncStart: common.mustNotCall(),
asyncEnd: common.mustNotCall(),
error: common.mustCall(checkError, 2),
};

channel.subscribe(handlers);
nextChannel.subscribe(nextHandlers);

// Test next(): generator throws after the first yield
const iter1 = channel.traceIterator(common.mustCall(function*() {
assert.deepStrictEqual(this, thisArg);
yield 1;
throw expectedError;
}), input, thisArg);

assert.deepStrictEqual(iter1.next(), { value: 1, done: false });
assert.throws(() => iter1.next(), expectedError);

// Test throw(): propagates error through the iterator
const iter2 = channel.traceIterator(common.mustCall(function*() {
yield 1;
}), input, thisArg);

assert.throws(() => iter2.throw(expectedError), expectedError);
Loading
Loading