Skip to content

Commit 71880da

Browse files
authored
fix(cloudflare): Initialize once per workflow run and preserve scope for step.do (#17582)
Previously, our Cloudflare Workflows instrumentation created a new Sentry client inside every `step.do`. This reset the tracing context: a custom span started with `startSpan` around multiple `step.do` calls would finish under a different client/scope than its children. This change initializes Sentry once per workflow run and preserves the active scope across steps: we capture the current scope in `step.do` and pass it to `startSpan`. A new test was added to check that a `step.do` span becomes a child of a surrounding custom span. Fixes #17419. Ref (needed to be able to test this): cloudflare/workerd#5030. Multiple `step.do` calls with a surrounding `startSpan` call will now result in this: <img width="851" height="265" alt="image" src="https://github.com/user-attachments/assets/298dc085-6555-46cb-93da-a22dd9f7ca02" />
1 parent 2cc1ef6 commit 71880da

File tree

2 files changed

+101
-57
lines changed

2 files changed

+101
-57
lines changed

packages/cloudflare/src/workflows.ts

Lines changed: 53 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { PropagationContext } from '@sentry/core';
22
import {
33
captureException,
44
flush,
5+
getCurrentScope,
56
SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN,
67
SEMANTIC_ATTRIBUTE_SENTRY_SOURCE,
78
startSpan,
@@ -56,29 +57,6 @@ async function propagationContextFromInstanceId(instanceId: string): Promise<Pro
5657
};
5758
}
5859

59-
async function workflowStepWithSentry<V>(
60-
instanceId: string,
61-
options: CloudflareOptions,
62-
callback: () => V,
63-
): Promise<V> {
64-
setAsyncLocalStorageAsyncContextStrategy();
65-
66-
return withIsolationScope(async isolationScope => {
67-
const client = init({ ...options, enableDedupe: false });
68-
isolationScope.setClient(client);
69-
70-
addCloudResourceContext(isolationScope);
71-
72-
return withScope(async scope => {
73-
const propagationContext = await propagationContextFromInstanceId(instanceId);
74-
scope.setPropagationContext(propagationContext);
75-
76-
// eslint-disable-next-line no-return-await
77-
return await callback();
78-
});
79-
});
80-
}
81-
8260
class WrappedWorkflowStep implements WorkflowStep {
8361
public constructor(
8462
private _instanceId: string,
@@ -98,38 +76,40 @@ class WrappedWorkflowStep implements WorkflowStep {
9876
configOrCallback: WorkflowStepConfig | (() => Promise<T>),
9977
maybeCallback?: () => Promise<T>,
10078
): Promise<T> {
79+
// Capture the current scope, so parent span (e.g., a startSpan surrounding step.do) is preserved
80+
const scopeForStep = getCurrentScope();
81+
10182
const userCallback = (maybeCallback || configOrCallback) as () => Promise<T>;
10283
const config = typeof configOrCallback === 'function' ? undefined : configOrCallback;
10384

10485
const instrumentedCallback: () => Promise<T> = async () => {
105-
return workflowStepWithSentry(this._instanceId, this._options, async () => {
106-
return startSpan(
107-
{
108-
op: 'function.step.do',
109-
name,
110-
attributes: {
111-
'cloudflare.workflow.timeout': config?.timeout,
112-
'cloudflare.workflow.retries.backoff': config?.retries?.backoff,
113-
'cloudflare.workflow.retries.delay': config?.retries?.delay,
114-
'cloudflare.workflow.retries.limit': config?.retries?.limit,
115-
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow',
116-
[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task',
117-
},
118-
},
119-
async span => {
120-
try {
121-
const result = await userCallback();
122-
span.setStatus({ code: 1 });
123-
return result;
124-
} catch (error) {
125-
captureException(error, { mechanism: { handled: true, type: 'cloudflare' } });
126-
throw error;
127-
} finally {
128-
this._ctx.waitUntil(flush(2000));
129-
}
86+
return startSpan(
87+
{
88+
op: 'function.step.do',
89+
name,
90+
scope: scopeForStep,
91+
attributes: {
92+
'cloudflare.workflow.timeout': config?.timeout,
93+
'cloudflare.workflow.retries.backoff': config?.retries?.backoff,
94+
'cloudflare.workflow.retries.delay': config?.retries?.delay,
95+
'cloudflare.workflow.retries.limit': config?.retries?.limit,
96+
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow',
97+
[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task',
13098
},
131-
);
132-
});
99+
},
100+
async span => {
101+
try {
102+
const result = await userCallback();
103+
span.setStatus({ code: 1 });
104+
return result;
105+
} catch (error) {
106+
captureException(error, { mechanism: { handled: true, type: 'cloudflare' } });
107+
throw error;
108+
} finally {
109+
this._ctx.waitUntil(flush(2000));
110+
}
111+
},
112+
);
133113
};
134114

135115
return config ? this._step.do(name, config, instrumentedCallback) : this._step.do(name, instrumentedCallback);
@@ -183,7 +163,29 @@ export function instrumentWorkflowWithSentry<
183163
get(obj, prop, receiver) {
184164
if (prop === 'run') {
185165
return async function (event: WorkflowEvent<P>, step: WorkflowStep): Promise<unknown> {
186-
return obj.run.call(obj, event, new WrappedWorkflowStep(event.instanceId, ctx, options, step));
166+
setAsyncLocalStorageAsyncContextStrategy();
167+
168+
return withIsolationScope(async isolationScope => {
169+
const client = init({ ...options, enableDedupe: false });
170+
isolationScope.setClient(client);
171+
172+
addCloudResourceContext(isolationScope);
173+
174+
return withScope(async scope => {
175+
const propagationContext = await propagationContextFromInstanceId(event.instanceId);
176+
scope.setPropagationContext(propagationContext);
177+
178+
try {
179+
return await obj.run.call(
180+
obj,
181+
event,
182+
new WrappedWorkflowStep(event.instanceId, ctx, options, step),
183+
);
184+
} finally {
185+
ctx.waitUntil(flush(2000));
186+
}
187+
});
188+
});
187189
};
188190
}
189191
return Reflect.get(obj, prop, receiver);

packages/cloudflare/test/workflow.test.ts

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/* eslint-disable @typescript-eslint/unbound-method */
2+
import { startSpan } from '@sentry/core';
23
import type { WorkflowEvent, WorkflowStep, WorkflowStepConfig } from 'cloudflare:workers';
34
import { beforeEach, describe, expect, test, vi } from 'vitest';
45
import { deterministicTraceIdFromInstanceId, instrumentWorkflowWithSentry } from '../src/workflows';
@@ -96,7 +97,8 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {
9697

9798
expect(mockStep.do).toHaveBeenCalledTimes(1);
9899
expect(mockStep.do).toHaveBeenCalledWith('first step', expect.any(Function));
99-
expect(mockContext.waitUntil).toHaveBeenCalledTimes(1);
100+
// We flush after the step.do and at the end of the run
101+
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
100102
expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise));
101103
expect(mockTransport.send).toHaveBeenCalledTimes(1);
102104
expect(mockTransport.send).toHaveBeenCalledWith([
@@ -161,7 +163,8 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {
161163

162164
expect(mockStep.do).toHaveBeenCalledTimes(1);
163165
expect(mockStep.do).toHaveBeenCalledWith('first step', expect.any(Function));
164-
expect(mockContext.waitUntil).toHaveBeenCalledTimes(1);
166+
// We flush after the step.do and at the end of the run
167+
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
165168
expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise));
166169
expect(mockTransport.send).toHaveBeenCalledTimes(1);
167170
expect(mockTransport.send).toHaveBeenCalledWith([
@@ -232,8 +235,10 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {
232235

233236
expect(mockStep.do).toHaveBeenCalledTimes(1);
234237
expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function));
235-
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
238+
// One flush for the error transaction, one for the retry success, one at end of run
239+
expect(mockContext.waitUntil).toHaveBeenCalledTimes(3);
236240
expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise));
241+
// error event + failed transaction + successful retry transaction
237242
expect(mockTransport.send).toHaveBeenCalledTimes(3);
238243

239244
// First we should get the error event
@@ -376,11 +381,11 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {
376381

377382
expect(mockStep.do).toHaveBeenCalledTimes(1);
378383
expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function));
379-
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
384+
// One flush for the failed attempt, one for the successful retry, and one at end of run
385+
expect(mockContext.waitUntil).toHaveBeenCalledTimes(3);
380386
expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise));
381387

382-
// We should get the error event and then nothing else. No transactions
383-
// should be sent
388+
// We should get the error event and then nothing else. No transactions should be sent
384389
expect(mockTransport.send).toHaveBeenCalledTimes(1);
385390

386391
expect(mockTransport.send).toHaveBeenCalledWith([
@@ -421,4 +426,41 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {
421426
],
422427
]);
423428
});
429+
430+
test('Step.do span becomes child of surrounding custom span', async () => {
431+
class ParentChildWorkflow {
432+
constructor(_ctx: ExecutionContext, _env: unknown) {}
433+
434+
async run(_event: Readonly<WorkflowEvent<Params>>, step: WorkflowStep): Promise<void> {
435+
await startSpan({ name: 'custom span' }, async () => {
436+
await step.do('first step', async () => {
437+
return { files: ['a'] };
438+
});
439+
});
440+
}
441+
}
442+
443+
const TestWorkflowInstrumented = instrumentWorkflowWithSentry(getSentryOptions, ParentChildWorkflow as any);
444+
const workflow = new TestWorkflowInstrumented(mockContext, {}) as ParentChildWorkflow;
445+
const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID };
446+
await workflow.run(event, mockStep);
447+
448+
// Flush after step.do and at end of run
449+
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
450+
expect(mockTransport.send).toHaveBeenCalledTimes(1);
451+
452+
const sendArg = mockTransport.send.mock.calls[0]![0] as any;
453+
const items = sendArg[1] as any[];
454+
const rootSpanItem = items.find(i => i[0].type === 'transaction');
455+
expect(rootSpanItem).toBeDefined();
456+
const rootSpan = rootSpanItem[1];
457+
458+
expect(rootSpan.transaction).toBe('custom span');
459+
const rootSpanId = rootSpan.contexts.trace.span_id;
460+
461+
// Child span for the step.do with the custom span as parent
462+
const stepSpan = rootSpan.spans.find((s: any) => s.description === 'first step' && s.op === 'function.step.do');
463+
expect(stepSpan).toBeDefined();
464+
expect(stepSpan.parent_span_id).toBe(rootSpanId);
465+
});
424466
});

0 commit comments

Comments
 (0)