Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 53 additions & 51 deletions packages/cloudflare/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { PropagationContext } from '@sentry/core';
import {
captureException,
flush,
getCurrentScope,
SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN,
SEMANTIC_ATTRIBUTE_SENTRY_SOURCE,
startSpan,
Expand Down Expand Up @@ -56,29 +57,6 @@ async function propagationContextFromInstanceId(instanceId: string): Promise<Pro
};
}

async function workflowStepWithSentry<V>(
instanceId: string,
options: CloudflareOptions,
callback: () => V,
): Promise<V> {
setAsyncLocalStorageAsyncContextStrategy();

return withIsolationScope(async isolationScope => {
const client = init({ ...options, enableDedupe: false });
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);

return withScope(async scope => {
const propagationContext = await propagationContextFromInstanceId(instanceId);
scope.setPropagationContext(propagationContext);

// eslint-disable-next-line no-return-await
return await callback();
});
});
}

class WrappedWorkflowStep implements WorkflowStep {
public constructor(
private _instanceId: string,
Expand All @@ -98,38 +76,40 @@ class WrappedWorkflowStep implements WorkflowStep {
configOrCallback: WorkflowStepConfig | (() => Promise<T>),
maybeCallback?: () => Promise<T>,
): Promise<T> {
// Capture the current scope, so parent span (e.g., a startSpan surrounding step.do) is preserved
const scopeForStep = getCurrentScope();

const userCallback = (maybeCallback || configOrCallback) as () => Promise<T>;
const config = typeof configOrCallback === 'function' ? undefined : configOrCallback;

const instrumentedCallback: () => Promise<T> = async () => {
return workflowStepWithSentry(this._instanceId, this._options, async () => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it be a problem that we no longer create a sentry instance here? 🤔 is there always an existing sentry instance outside here...?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we always need to create a Sentry instance (or maybe check that one exists?) inside step.do because these can be run in a new execution context if the workflow gets suspended.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under the assumption that run is called on every new invocation of the workflow (e.g. after hibernation), we should have a client here

return startSpan(
{
op: 'function.step.do',
name,
attributes: {
'cloudflare.workflow.timeout': config?.timeout,
'cloudflare.workflow.retries.backoff': config?.retries?.backoff,
'cloudflare.workflow.retries.delay': config?.retries?.delay,
'cloudflare.workflow.retries.limit': config?.retries?.limit,
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow',
[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task',
},
},
async span => {
try {
const result = await userCallback();
span.setStatus({ code: 1 });
return result;
} catch (error) {
captureException(error, { mechanism: { handled: true, type: 'cloudflare' } });
throw error;
} finally {
this._ctx.waitUntil(flush(2000));
}
return startSpan(
{
op: 'function.step.do',
name,
scope: scopeForStep,
attributes: {
'cloudflare.workflow.timeout': config?.timeout,
'cloudflare.workflow.retries.backoff': config?.retries?.backoff,
'cloudflare.workflow.retries.delay': config?.retries?.delay,
'cloudflare.workflow.retries.limit': config?.retries?.limit,
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow',
[SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task',
},
);
});
},
async span => {
try {
const result = await userCallback();
span.setStatus({ code: 1 });
return result;
} catch (error) {
captureException(error, { mechanism: { handled: true, type: 'cloudflare' } });
throw error;
} finally {
this._ctx.waitUntil(flush(2000));
}
},
);
};

return config ? this._step.do(name, config, instrumentedCallback) : this._step.do(name, instrumentedCallback);
Expand Down Expand Up @@ -183,7 +163,29 @@ export function instrumentWorkflowWithSentry<
get(obj, prop, receiver) {
if (prop === 'run') {
return async function (event: WorkflowEvent<P>, step: WorkflowStep): Promise<unknown> {
return obj.run.call(obj, event, new WrappedWorkflowStep(event.instanceId, ctx, options, step));
setAsyncLocalStorageAsyncContextStrategy();

return withIsolationScope(async isolationScope => {
const client = init({ ...options, enableDedupe: false });
isolationScope.setClient(client);

addCloudResourceContext(isolationScope);

return withScope(async scope => {
const propagationContext = await propagationContextFromInstanceId(event.instanceId);
scope.setPropagationContext(propagationContext);

try {
return await obj.run.call(
obj,
event,
new WrappedWorkflowStep(event.instanceId, ctx, options, step),
);
} finally {
ctx.waitUntil(flush(2000));
}
});
});
};
}
return Reflect.get(obj, prop, receiver);
Expand Down
54 changes: 48 additions & 6 deletions packages/cloudflare/test/workflow.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { startSpan } from '@sentry/core';
import type { WorkflowEvent, WorkflowStep, WorkflowStepConfig } from 'cloudflare:workers';
import { beforeEach, describe, expect, test, vi } from 'vitest';
import { deterministicTraceIdFromInstanceId, instrumentWorkflowWithSentry } from '../src/workflows';
Expand Down Expand Up @@ -96,7 +97,8 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {

expect(mockStep.do).toHaveBeenCalledTimes(1);
expect(mockStep.do).toHaveBeenCalledWith('first step', expect.any(Function));
expect(mockContext.waitUntil).toHaveBeenCalledTimes(1);
// We flush after the step.do and at the end of the run
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise));
expect(mockTransport.send).toHaveBeenCalledTimes(1);
expect(mockTransport.send).toHaveBeenCalledWith([
Expand Down Expand Up @@ -161,7 +163,8 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {

expect(mockStep.do).toHaveBeenCalledTimes(1);
expect(mockStep.do).toHaveBeenCalledWith('first step', expect.any(Function));
expect(mockContext.waitUntil).toHaveBeenCalledTimes(1);
// We flush after the step.do and at the end of the run
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise));
expect(mockTransport.send).toHaveBeenCalledTimes(1);
expect(mockTransport.send).toHaveBeenCalledWith([
Expand Down Expand Up @@ -232,8 +235,10 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {

expect(mockStep.do).toHaveBeenCalledTimes(1);
expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function));
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
// One flush for the error transaction, one for the retry success, one at end of run
expect(mockContext.waitUntil).toHaveBeenCalledTimes(3);
expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise));
// error event + failed transaction + successful retry transaction
expect(mockTransport.send).toHaveBeenCalledTimes(3);

// First we should get the error event
Expand Down Expand Up @@ -376,11 +381,11 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {

expect(mockStep.do).toHaveBeenCalledTimes(1);
expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function));
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
// One flush for the error event and one at end of run
expect(mockContext.waitUntil).toHaveBeenCalledTimes(3);
expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise));

// We should get the error event and then nothing else. No transactions
// should be sent
// We should get the error event and then nothing else. No transactions should be sent
expect(mockTransport.send).toHaveBeenCalledTimes(1);

expect(mockTransport.send).toHaveBeenCalledWith([
Expand Down Expand Up @@ -421,4 +426,41 @@ describe.skipIf(NODE_MAJOR_VERSION < 20)('workflows', () => {
],
]);
});

test('Step.do span becomes child of surrounding custom span', async () => {
class ParentChildWorkflow {
constructor(_ctx: ExecutionContext, _env: unknown) {}

async run(_event: Readonly<WorkflowEvent<Params>>, step: WorkflowStep): Promise<void> {
await startSpan({ name: 'custom span' }, async () => {
await step.do('first step', async () => {
return { files: ['a'] };
});
});
}
}

const TestWorkflowInstrumented = instrumentWorkflowWithSentry(getSentryOptions, ParentChildWorkflow as any);
const workflow = new TestWorkflowInstrumented(mockContext, {}) as ParentChildWorkflow;
const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID };
await workflow.run(event, mockStep);

// Flush after step.do and at end of run
expect(mockContext.waitUntil).toHaveBeenCalledTimes(2);
expect(mockTransport.send).toHaveBeenCalledTimes(1);

const sendArg = mockTransport.send.mock.calls[0]![0] as any;
const items = sendArg[1] as any[];
const rootSpanItem = items.find(i => i[0].type === 'transaction');
expect(rootSpanItem).toBeDefined();
const rootSpan = rootSpanItem[1];

expect(rootSpan.transaction).toBe('custom span');
const rootSpanId = rootSpan.contexts.trace.span_id;

// Child span for the step.do with the custom span as parent
const stepSpan = rootSpan.spans.find((s: any) => s.description === 'first step' && s.op === 'function.step.do');
expect(stepSpan).toBeDefined();
expect(stepSpan.parent_span_id).toBe(rootSpanId);
});
});