From 3cc9b68d871faf83548691b2a5974c75f70c5ddc Mon Sep 17 00:00:00 2001 From: Tim Fish Date: Sun, 22 Jun 2025 16:21:58 +0200 Subject: [PATCH 1/7] feat(cloudflare): Add `instrumentWorkflowWithSentry` to instrument workflows --- packages/cloudflare/package.json | 2 +- packages/cloudflare/src/client.ts | 9 +- packages/cloudflare/src/index.ts | 2 + packages/cloudflare/src/sdk.ts | 4 +- packages/cloudflare/src/workflows.ts | 207 +++++++++++++ packages/cloudflare/test/workflow.test.ts | 350 ++++++++++++++++++++++ yarn.lock | 9 +- 7 files changed, 574 insertions(+), 9 deletions(-) create mode 100644 packages/cloudflare/src/workflows.ts create mode 100644 packages/cloudflare/test/workflow.test.ts diff --git a/packages/cloudflare/package.json b/packages/cloudflare/package.json index 7df19ebc4636..16e478f6d67f 100644 --- a/packages/cloudflare/package.json +++ b/packages/cloudflare/package.json @@ -60,7 +60,7 @@ } }, "devDependencies": { - "@cloudflare/workers-types": "4.20240725.0", + "@cloudflare/workers-types": "4.20250620.0", "@types/node": "^18.19.1", "wrangler": "^3.67.1" }, diff --git a/packages/cloudflare/src/client.ts b/packages/cloudflare/src/client.ts index 224865e3731e..ec1a95630b4f 100644 --- a/packages/cloudflare/src/client.ts +++ b/packages/cloudflare/src/client.ts @@ -29,8 +29,13 @@ export class CloudflareClient extends ServerRuntimeClient( + instanceId: string, + options: CloudflareOptions, + callback: () => V, +): Promise { + setAsyncLocalStorageAsyncContextStrategy(); + + return withIsolationScope(async isolationScope => { + const client = init({ ...options, isWorkflow: true }); + isolationScope.setClient(client); + + addCloudResourceContext(isolationScope); + + return withScope(async scope => { + const propagationContext = propagationContextFromInstanceId(instanceId); + scope.setPropagationContext(propagationContext); + + // eslint-disable-next-line no-return-await + return await callback(); + }); + }); +} + +class WrappedWorkflowStep implements WorkflowStep { + public constructor( + private _instanceId: string, + private _ctx: ExecutionContext, + private _options: CloudflareOptions, + private _step: WorkflowStep, + ) {} + + public async do>(name: string, callback: () => Promise): Promise; + public async do>( + name: string, + config: WorkflowStepConfig, + callback: () => Promise, + ): Promise; + public async do>( + name: string, + configOrCallback: WorkflowStepConfig | (() => Promise), + maybeCallback?: () => Promise, + ): Promise { + if (typeof configOrCallback === 'function') { + // do(name, callback) + return this._step.do(name, async () => { + // eslint-disable-next-line no-return-await + return await workflowStepWithSentry(this._instanceId, this._options, async () => { + // eslint-disable-next-line no-return-await + return await startSpan( + { + op: 'function.step.do', + name, + attributes: { + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', + [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', + }, + }, + async span => { + try { + const result = await configOrCallback(); + span.setStatus({ code: 1 }); + return result; + } catch (error) { + captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); + throw error; + } finally { + this._ctx.waitUntil(flush(2000)); + } + }, + ); + }); + }); + } else if (typeof maybeCallback === 'function') { + // do(name, config, callback) + return this._step.do(name, configOrCallback, async () => { + // eslint-disable-next-line no-return-await + return await workflowStepWithSentry(this._instanceId, this._options, async () => { + // eslint-disable-next-line no-return-await + return await startSpan( + { + op: 'function.step.do', + name, + attributes: { + 'cloudflare.workflow.timeout': configOrCallback?.timeout, + 'cloudflare.workflow.retries.backoff': configOrCallback?.retries?.backoff, + 'cloudflare.workflow.retries.delay': configOrCallback?.retries?.delay, + 'cloudflare.workflow.retries.limit': configOrCallback?.retries?.limit, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', + [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', + }, + }, + async span => { + try { + const result = await maybeCallback(); + span.setStatus({ code: 1 }); + return result; + } catch (error) { + captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); + throw error; + } finally { + this._ctx.waitUntil(flush(2000)); + } + }, + ); + }); + }); + } else { + throw new Error( + 'Invalid arguments for `step.do` method. Expected either (name, callback) or (name, config, callback).', + ); + } + } + + public async sleep(name: string, duration: WorkflowSleepDuration): Promise { + return this._step.sleep(name, duration); + } + + public async sleepUntil(name: string, timestamp: Date | number): Promise { + return this._step.sleepUntil(name, timestamp); + } + + public async waitForEvent>( + name: string, + options: { type: string; timeout?: WorkflowTimeoutDuration | number }, + ): Promise> { + return this._step.waitForEvent(name, options); + } +} + +/** + * + * @param optionsCallback + * @param WorkFlowClass + * @returns + */ +export function instrumentWorkflowWithSentry< + E, // Environment type + P, // Payload type + T extends WorkflowEntrypoint, // WorkflowEntrypoint type + C extends new (ctx: ExecutionContext, env: E) => T, // Constructor type of the WorkflowEntrypoint class +>(optionsCallback: (env: E) => CloudflareOptions, WorkFlowClass: C): C { + return new Proxy(WorkFlowClass, { + construct(target: C, args: [ctx: ExecutionContext, env: E], newTarget) { + const [ctx, env] = args; + const options = optionsCallback(env); + const instance = Reflect.construct(target, args, newTarget) as T; + return new Proxy(instance, { + get(obj, prop, receiver) { + if (prop === 'run') { + return async function (event: WorkflowEvent

, step: WorkflowStep): Promise { + return obj.run.call(obj, event, new WrappedWorkflowStep(event.instanceId, ctx, options, step)); + }; + } + return Reflect.get(obj, prop, receiver); + }, + }); + }, + }) as C; +} diff --git a/packages/cloudflare/test/workflow.test.ts b/packages/cloudflare/test/workflow.test.ts new file mode 100644 index 000000000000..9d4dee09702e --- /dev/null +++ b/packages/cloudflare/test/workflow.test.ts @@ -0,0 +1,350 @@ +/* eslint-disable @typescript-eslint/unbound-method */ +import type { WorkflowEvent, WorkflowStep, WorkflowStepConfig } from 'cloudflare:workers'; +import { inspect } from 'util'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; +import { instrumentWorkflowWithSentry } from '../src/workflows'; + +const mockStep: WorkflowStep = { + do: vi + .fn() + .mockImplementation( + async ( + _name: string, + configOrCallback: WorkflowStepConfig | (() => Promise), + maybeCallback?: () => Promise, + ) => { + let count = 0; + + while (count <= 5) { + count += 1; + + try { + if (typeof configOrCallback === 'function') { + return await configOrCallback(); + } else { + return await (maybeCallback ? maybeCallback() : Promise.resolve()); + } + } catch (error) { + await new Promise(resolve => setTimeout(resolve, 1000)); + } + } + }, + ), + sleep: vi.fn(), + sleepUntil: vi.fn(), + waitForEvent: vi.fn(), +}; + +const mockTransport = { + send: vi.fn().mockImplementation(() => Promise.resolve({ statusCode: 200 })), + flush: vi.fn().mockImplementation(() => Promise.resolve(true)), + close: vi.fn().mockImplementation(() => Promise.resolve(true)), +}; + +const mockContext: ExecutionContext = { + waitUntil: vi.fn().mockImplementation(promise => promise), + passThroughOnException: vi.fn(), + props: {}, +}; + +function getSentryOptions() { + return { + dsn: 'https://8@ingest.sentry.io/4', + release: '1.0.0', + tracesSampleRate: 1.0, + transport: () => mockTransport, + }; +} + +type Params = { + // +}; + +const INSTANCE_ID = 'ae0ee067-61b3-4852-9219-5d62282270f0'; +const SAMPLE_RAND = '0.44116884107728693'; +const TRACE_ID = INSTANCE_ID.replace(/-/g, ''); + +describe('workflows', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + test('Calls expected functions', async () => { + class BasicTestWorkflow { + constructor(_ctx: ExecutionContext, _env: unknown) {} + + async run(_event: Readonly>, step: WorkflowStep): Promise { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const files = await step.do('first step', async () => { + return { files: ['doc_7392_rev3.pdf', 'report_x29_final.pdf'] }; + }); + } + } + + const TestWorkflowInstrumented = instrumentWorkflowWithSentry(getSentryOptions, BasicTestWorkflow as any); + const workflow = new TestWorkflowInstrumented(mockContext, {}) as BasicTestWorkflow; + const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID }; + await workflow.run(event, mockStep); + + expect(mockStep.do).toHaveBeenCalledTimes(1); + expect(mockStep.do).toHaveBeenCalledWith('first step', expect.any(Function)); + expect(mockContext.waitUntil).toHaveBeenCalledTimes(1); + expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); + expect(mockTransport.send).toHaveBeenCalledTimes(1); + expect(mockTransport.send).toHaveBeenCalledWith([ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'first step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'transaction', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + data: { + 'sentry.origin': 'auto.faas.cloudflare.workflow', + 'sentry.op': 'function.step.do', + 'sentry.source': 'task', + 'sentry.sample_rate': 1, + }, + op: 'function.step.do', + status: 'ok', + origin: 'auto.faas.cloudflare.workflow', + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + type: 'transaction', + transaction_info: { source: 'task' }, + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + }), + ], + ], + ]); + }); + + class ErrorTestWorkflow { + count = 0; + constructor(_ctx: ExecutionContext, _env: unknown) {} + + async run(_event: Readonly>, step: WorkflowStep): Promise { + await step.do('sometimes error step', async () => { + this.count += 1; + + if (this.count <= 1) { + throw new Error('Test error'); + } + + return { files: ['doc_7392_rev3.pdf', 'report_x29_final.pdf'] }; + }); + } + } + + test('Captures step errors', async () => { + const TestWorkflowInstrumented = instrumentWorkflowWithSentry(getSentryOptions, ErrorTestWorkflow as any); + const workflow = new TestWorkflowInstrumented(mockContext, {}) as ErrorTestWorkflow; + const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID }; + await workflow.run(event, mockStep); + + expect(mockStep.do).toHaveBeenCalledTimes(1); + expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function)); + expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); + expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); + expect(mockTransport.send).toHaveBeenCalledTimes(3); + + // First we should get the error event + expect(mockTransport.send).toHaveBeenNthCalledWith(1, [ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'sometimes error step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'event', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + timestamp: expect.any(Number), + exception: { + values: [ + expect.objectContaining({ + type: 'Error', + value: 'Test error', + }), + ], + }, + }), + ], + ], + ]); + + // The the failed transaction + expect(mockTransport.send).toHaveBeenNthCalledWith(2, [ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'sometimes error step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'transaction', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + data: { + 'sentry.origin': 'auto.faas.cloudflare.workflow', + 'sentry.op': 'function.step.do', + 'sentry.source': 'task', + 'sentry.sample_rate': 1, + }, + op: 'function.step.do', + status: 'internal_error', + origin: 'auto.faas.cloudflare.workflow', + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + type: 'transaction', + transaction_info: { source: 'task' }, + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + }), + ], + ], + ]); + + // The the successful transaction + expect(mockTransport.send).toHaveBeenNthCalledWith(3, [ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'sometimes error step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'transaction', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + data: { + 'sentry.origin': 'auto.faas.cloudflare.workflow', + 'sentry.op': 'function.step.do', + 'sentry.source': 'task', + 'sentry.sample_rate': 1, + }, + op: 'function.step.do', + status: 'ok', + origin: 'auto.faas.cloudflare.workflow', + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + type: 'transaction', + transaction_info: { source: 'task' }, + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + }), + ], + ], + ]); + }); + + test('Sampled random via instanceId', async () => { + const TestWorkflowInstrumented = instrumentWorkflowWithSentry( + // Override the tracesSampleRate to 0.4 to be below the sampleRand + // calculated from the instanceId + () => ({ ...getSentryOptions(), tracesSampleRate: 0.4 }), + ErrorTestWorkflow as any, + ); + const workflow = new TestWorkflowInstrumented(mockContext, {}) as ErrorTestWorkflow; + const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID }; + await workflow.run(event, mockStep); + + expect(mockStep.do).toHaveBeenCalledTimes(1); + expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function)); + expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); + expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); + + // We should get the error event and then nothing else. No transactions + // should be sent + expect(mockTransport.send).toHaveBeenCalledTimes(1); + + expect(mockTransport.send).toHaveBeenCalledWith([ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'sometimes error step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'event', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + timestamp: expect.any(Number), + exception: { + values: [ + expect.objectContaining({ + type: 'Error', + value: 'Test error', + }), + ], + }, + }), + ], + ], + ]); + }); +}); diff --git a/yarn.lock b/yarn.lock index ce3e50da8c41..58d547aea551 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2671,10 +2671,10 @@ resolved "https://registry.yarnpkg.com/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20240718.0.tgz#940893e62df7f5a8ec895572b834c95c1e256fbd" integrity sha512-YpCRvvT47XanFum7C3SedOZKK6BfVhqmwdAAVAQFyc4gsCdegZo0JkUkdloC/jwuWlbCACOG2HTADHOqyeolzQ== -"@cloudflare/workers-types@4.20240725.0": - version "4.20240725.0" - resolved "https://registry.yarnpkg.com/@cloudflare/workers-types/-/workers-types-4.20240725.0.tgz#e151e0c069c0070b4d7168c7b0d4ae6c50d3f237" - integrity sha512-L6T/Bg50zm9IIACQVQ0CdVcQL+2nLkRXdPz6BsXF3SlzgjyWR5ndVctAbfr/HLV7aKYxWnnEZsIORsTWb+FssA== +"@cloudflare/workers-types@4.20250620.0": + version "4.20250620.0" + resolved "https://registry.yarnpkg.com/@cloudflare/workers-types/-/workers-types-4.20250620.0.tgz#a22e635a631212963b84e315191614b20c4ad317" + integrity sha512-EVvRB/DJEm6jhdKg+A4Qm4y/ry1cIvylSgSO3/f/Bv161vldDRxaXM2YoQQWFhLOJOw0qtrHsKOD51KYxV1XCw== "@cnakazawa/watch@^1.0.3": version "1.0.4" @@ -27148,7 +27148,6 @@ stylus@0.59.0, stylus@^0.59.0: sucrase@^3.27.0, sucrase@^3.35.0, sucrase@getsentry/sucrase#es2020-polyfills: version "3.36.0" - uid fd682f6129e507c00bb4e6319cc5d6b767e36061 resolved "https://codeload.github.com/getsentry/sucrase/tar.gz/fd682f6129e507c00bb4e6319cc5d6b767e36061" dependencies: "@jridgewell/gen-mapping" "^0.3.2" From 332031109cfe0423aaf0530bc080a08585492ad9 Mon Sep 17 00:00:00 2001 From: Tim Fish Date: Sun, 22 Jun 2025 16:52:11 +0200 Subject: [PATCH 2/7] check mechanism --- packages/cloudflare/test/workflow.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cloudflare/test/workflow.test.ts b/packages/cloudflare/test/workflow.test.ts index 9d4dee09702e..06ef89670790 100644 --- a/packages/cloudflare/test/workflow.test.ts +++ b/packages/cloudflare/test/workflow.test.ts @@ -1,6 +1,5 @@ /* eslint-disable @typescript-eslint/unbound-method */ import type { WorkflowEvent, WorkflowStep, WorkflowStepConfig } from 'cloudflare:workers'; -import { inspect } from 'util'; import { beforeEach, describe, expect, test, vi } from 'vitest'; import { instrumentWorkflowWithSentry } from '../src/workflows'; @@ -194,6 +193,7 @@ describe('workflows', () => { expect.objectContaining({ type: 'Error', value: 'Test error', + mechanism: { type: 'cloudflare', handled: true } }), ], }, From 2d76224451a08b85455de42d88184672a9ddebc1 Mon Sep 17 00:00:00 2001 From: Tim Fish Date: Sun, 22 Jun 2025 17:25:42 +0200 Subject: [PATCH 3/7] Lint --- packages/cloudflare/src/pages-plugin.ts | 4 +++- packages/cloudflare/test/workflow.test.ts | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/cloudflare/src/pages-plugin.ts b/packages/cloudflare/src/pages-plugin.ts index 8bdc806b5693..fe784194d411 100644 --- a/packages/cloudflare/src/pages-plugin.ts +++ b/packages/cloudflare/src/pages-plugin.ts @@ -49,6 +49,8 @@ export function sentryPagesPlugin< setAsyncLocalStorageAsyncContextStrategy(); return context => { const options = typeof handlerOrOptions === 'function' ? handlerOrOptions(context) : handlerOrOptions; - return wrapRequestHandler({ options, request: context.request, context }, () => context.next()); + return wrapRequestHandler({ options, request: context.request, context: { ...context, props: {} } }, () => + context.next(), + ); }; } diff --git a/packages/cloudflare/test/workflow.test.ts b/packages/cloudflare/test/workflow.test.ts index 06ef89670790..03eee5191eb2 100644 --- a/packages/cloudflare/test/workflow.test.ts +++ b/packages/cloudflare/test/workflow.test.ts @@ -193,7 +193,7 @@ describe('workflows', () => { expect.objectContaining({ type: 'Error', value: 'Test error', - mechanism: { type: 'cloudflare', handled: true } + mechanism: { type: 'cloudflare', handled: true }, }), ], }, @@ -291,8 +291,8 @@ describe('workflows', () => { test('Sampled random via instanceId', async () => { const TestWorkflowInstrumented = instrumentWorkflowWithSentry( - // Override the tracesSampleRate to 0.4 to be below the sampleRand - // calculated from the instanceId + // Override the tracesSampleRate to 0.4 to be below the sampleRand + // calculated from the instanceId () => ({ ...getSentryOptions(), tracesSampleRate: 0.4 }), ErrorTestWorkflow as any, ); From 57baae68163b6b2422a02aa209453e459b7ec2d7 Mon Sep 17 00:00:00 2001 From: Tim Fish Date: Sun, 22 Jun 2025 17:46:46 +0200 Subject: [PATCH 4/7] Fix sveltkit cloudflare types --- packages/sveltekit/src/worker/cloudflare.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sveltekit/src/worker/cloudflare.ts b/packages/sveltekit/src/worker/cloudflare.ts index 9508c331369e..e3d93e8b8922 100644 --- a/packages/sveltekit/src/worker/cloudflare.ts +++ b/packages/sveltekit/src/worker/cloudflare.ts @@ -34,7 +34,7 @@ export function initCloudflareSentryHandle(options: CloudflareOptions): Handle { return wrapRequestHandler( { options: opts, - request: event.request, + request: event.request as Request>, // @ts-expect-error This will exist in Cloudflare context: event.platform.context, }, From 0812b6dd264f2a29a1443abed823dfbae1e806d0 Mon Sep 17 00:00:00 2001 From: Tim Fish Date: Mon, 23 Jun 2025 15:43:14 +0200 Subject: [PATCH 5/7] PR review Co-authored-by: Abhijeet Prasad --- packages/cloudflare/src/workflows.ts | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/packages/cloudflare/src/workflows.ts b/packages/cloudflare/src/workflows.ts index 0a5ff9705e1e..2e104ae8e601 100644 --- a/packages/cloudflare/src/workflows.ts +++ b/packages/cloudflare/src/workflows.ts @@ -176,10 +176,21 @@ class WrappedWorkflowStep implements WorkflowStep { } /** - * - * @param optionsCallback - * @param WorkFlowClass - * @returns + * Instruments a Cloudflare Workflow class with Sentry. + * + * @example + * ```typescript + * const InstrumentedWorkflow = instrumentWorkflowWithSentry( + * (env) => ({ dsn: env.SENTRY_DSN }), + * MyWorkflowClass + * ); + * + * export default InstrumentedWorkflow; + * ``` + * + * @param optionsCallback - Function that returns Sentry options to initialize Sentry + * @param WorkflowClass - The workflow class to instrument + * @returns Instrumented workflow class with the same interface */ export function instrumentWorkflowWithSentry< E, // Environment type From 5e869003829b931c022e393d611a9146ec511599 Mon Sep 17 00:00:00 2001 From: Tim Fish Date: Mon, 23 Jun 2025 15:57:36 +0200 Subject: [PATCH 6/7] More PR review --- packages/cloudflare/src/client.ts | 7 +- packages/cloudflare/src/sdk.ts | 2 +- packages/cloudflare/src/workflows.ts | 112 ++++++++++----------------- 3 files changed, 46 insertions(+), 75 deletions(-) diff --git a/packages/cloudflare/src/client.ts b/packages/cloudflare/src/client.ts index ec1a95630b4f..9b4f18086658 100644 --- a/packages/cloudflare/src/client.ts +++ b/packages/cloudflare/src/client.ts @@ -31,10 +31,11 @@ export class CloudflareClient extends ServerRuntimeClient( setAsyncLocalStorageAsyncContextStrategy(); return withIsolationScope(async isolationScope => { - const client = init({ ...options, isWorkflow: true }); + const client = init({ ...options, enableDedupe: false }); isolationScope.setClient(client); addCloudResourceContext(isolationScope); @@ -88,75 +88,45 @@ class WrappedWorkflowStep implements WorkflowStep { configOrCallback: WorkflowStepConfig | (() => Promise), maybeCallback?: () => Promise, ): Promise { - if (typeof configOrCallback === 'function') { - // do(name, callback) - return this._step.do(name, async () => { - // eslint-disable-next-line no-return-await - return await workflowStepWithSentry(this._instanceId, this._options, async () => { - // eslint-disable-next-line no-return-await - return await startSpan( - { - op: 'function.step.do', - name, - attributes: { - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', - [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', - }, - }, - async span => { - try { - const result = await configOrCallback(); - span.setStatus({ code: 1 }); - return result; - } catch (error) { - captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); - throw error; - } finally { - this._ctx.waitUntil(flush(2000)); - } - }, - ); - }); - }); - } else if (typeof maybeCallback === 'function') { - // do(name, config, callback) - return this._step.do(name, configOrCallback, async () => { + const userCallback = (maybeCallback || configOrCallback) as () => Promise; + const config = typeof configOrCallback === 'function' ? undefined : configOrCallback; + + const instrumentedCallback: () => Promise = async () => { + // eslint-disable-next-line no-return-await + return await workflowStepWithSentry(this._instanceId, this._options, async () => { // eslint-disable-next-line no-return-await - return await workflowStepWithSentry(this._instanceId, this._options, async () => { - // eslint-disable-next-line no-return-await - return await startSpan( - { - op: 'function.step.do', - name, - attributes: { - 'cloudflare.workflow.timeout': configOrCallback?.timeout, - 'cloudflare.workflow.retries.backoff': configOrCallback?.retries?.backoff, - 'cloudflare.workflow.retries.delay': configOrCallback?.retries?.delay, - 'cloudflare.workflow.retries.limit': configOrCallback?.retries?.limit, - [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', - [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', - }, + return await startSpan( + { + op: 'function.step.do', + name, + attributes: { + 'cloudflare.workflow.timeout': config?.timeout, + 'cloudflare.workflow.retries.backoff': config?.retries?.backoff, + 'cloudflare.workflow.retries.delay': config?.retries?.delay, + 'cloudflare.workflow.retries.limit': config?.retries?.limit, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', + [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', }, - async span => { - try { - const result = await maybeCallback(); - span.setStatus({ code: 1 }); - return result; - } catch (error) { - captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); - throw error; - } finally { - this._ctx.waitUntil(flush(2000)); - } - }, - ); - }); + }, + async span => { + try { + const result = await userCallback(); + span.setStatus({ code: 1 }); + return result; + } catch (error) { + captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); + throw error; + } finally { + this._ctx.waitUntil(flush(2000)); + } + }, + ); }); - } else { - throw new Error( - 'Invalid arguments for `step.do` method. Expected either (name, callback) or (name, config, callback).', - ); - } + }; + + return config + ? this._step.do(name, config, instrumentedCallback) + : this._step.do(name, instrumentedCallback); } public async sleep(name: string, duration: WorkflowSleepDuration): Promise { @@ -177,19 +147,19 @@ class WrappedWorkflowStep implements WorkflowStep { /** * Instruments a Cloudflare Workflow class with Sentry. - * + * * @example * ```typescript * const InstrumentedWorkflow = instrumentWorkflowWithSentry( * (env) => ({ dsn: env.SENTRY_DSN }), * MyWorkflowClass * ); - * + * * export default InstrumentedWorkflow; * ``` - * + * * @param optionsCallback - Function that returns Sentry options to initialize Sentry - * @param WorkflowClass - The workflow class to instrument + * @param WorkflowClass - The workflow class to instrument * @returns Instrumented workflow class with the same interface */ export function instrumentWorkflowWithSentry< From fff143823422d418bda6958b17ffa77537619da0 Mon Sep 17 00:00:00 2001 From: Tim Fish Date: Mon, 23 Jun 2025 16:04:14 +0200 Subject: [PATCH 7/7] Final fixes --- packages/cloudflare/src/workflows.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/packages/cloudflare/src/workflows.ts b/packages/cloudflare/src/workflows.ts index 1071f288196f..022f0040893a 100644 --- a/packages/cloudflare/src/workflows.ts +++ b/packages/cloudflare/src/workflows.ts @@ -92,10 +92,8 @@ class WrappedWorkflowStep implements WorkflowStep { const config = typeof configOrCallback === 'function' ? undefined : configOrCallback; const instrumentedCallback: () => Promise = async () => { - // eslint-disable-next-line no-return-await - return await workflowStepWithSentry(this._instanceId, this._options, async () => { - // eslint-disable-next-line no-return-await - return await startSpan( + return workflowStepWithSentry(this._instanceId, this._options, async () => { + return startSpan( { op: 'function.step.do', name, @@ -124,9 +122,7 @@ class WrappedWorkflowStep implements WorkflowStep { }); }; - return config - ? this._step.do(name, config, instrumentedCallback) - : this._step.do(name, instrumentedCallback); + return config ? this._step.do(name, config, instrumentedCallback) : this._step.do(name, instrumentedCallback); } public async sleep(name: string, duration: WorkflowSleepDuration): Promise {