diff --git a/packages/cloudflare/package.json b/packages/cloudflare/package.json index 7df19ebc4636..16e478f6d67f 100644 --- a/packages/cloudflare/package.json +++ b/packages/cloudflare/package.json @@ -60,7 +60,7 @@ } }, "devDependencies": { - "@cloudflare/workers-types": "4.20240725.0", + "@cloudflare/workers-types": "4.20250620.0", "@types/node": "^18.19.1", "wrangler": "^3.67.1" }, diff --git a/packages/cloudflare/src/client.ts b/packages/cloudflare/src/client.ts index 224865e3731e..9b4f18086658 100644 --- a/packages/cloudflare/src/client.ts +++ b/packages/cloudflare/src/client.ts @@ -29,8 +29,14 @@ export class CloudflareClient extends ServerRuntimeClient { const options = typeof handlerOrOptions === 'function' ? handlerOrOptions(context) : handlerOrOptions; - return wrapRequestHandler({ options, request: context.request, context }, () => context.next()); + return wrapRequestHandler({ options, request: context.request, context: { ...context, props: {} } }, () => + context.next(), + ); }; } diff --git a/packages/cloudflare/src/sdk.ts b/packages/cloudflare/src/sdk.ts index dee32b856eb0..90ef3c0bedf9 100644 --- a/packages/cloudflare/src/sdk.ts +++ b/packages/cloudflare/src/sdk.ts @@ -20,7 +20,9 @@ import { defaultStackParser } from './vendor/stacktrace'; export function getDefaultIntegrations(options: CloudflareOptions): Integration[] { const sendDefaultPii = options.sendDefaultPii ?? false; return [ - dedupeIntegration(), + // The Dedupe integration should not be used in workflows because we want to + // capture all step failures, even if they are the same error. + ...(options.enableDedupe === false ? [] : [dedupeIntegration()]), // TODO(v10): Replace with `eventFiltersIntegration` once we remove the deprecated `inboundFiltersIntegration` // eslint-disable-next-line deprecation/deprecation inboundFiltersIntegration(), diff --git a/packages/cloudflare/src/workflows.ts b/packages/cloudflare/src/workflows.ts new file mode 100644 index 000000000000..022f0040893a --- /dev/null +++ b/packages/cloudflare/src/workflows.ts @@ -0,0 +1,184 @@ +import type { PropagationContext } from '@sentry/core'; +import { + captureException, + flush, + SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, + SEMANTIC_ATTRIBUTE_SENTRY_SOURCE, + startSpan, + withIsolationScope, + withScope, +} from '@sentry/core'; +import type { + WorkflowEntrypoint, + WorkflowEvent, + WorkflowSleepDuration, + WorkflowStep, + WorkflowStepConfig, + WorkflowStepEvent, + WorkflowTimeoutDuration, +} from 'cloudflare:workers'; +import { setAsyncLocalStorageAsyncContextStrategy } from './async'; +import type { CloudflareOptions } from './client'; +import { addCloudResourceContext } from './scope-utils'; +import { init } from './sdk'; + +const UUID_REGEX = /^[0-9a-f]{8}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{4}-?[0-9a-f]{12}$/i; + +function propagationContextFromInstanceId(instanceId: string): PropagationContext { + // Validate and normalize traceId - should be a valid UUID with or without hyphens + if (!UUID_REGEX.test(instanceId)) { + throw new Error("Invalid 'instanceId' for workflow: Sentry requires random UUIDs for instanceId."); + } + + // Remove hyphens to get UUID without hyphens + const traceId = instanceId.replace(/-/g, ''); + + // Derive sampleRand from last 4 characters of the random UUID + // + // We cannot store any state between workflow steps, so we derive the + // sampleRand from the traceId itself. This ensures that the sampling is + // consistent across all steps in the same workflow instance. + const sampleRand = parseInt(traceId.slice(-4), 16) / 0xffff; + + return { + traceId, + sampleRand, + }; +} + +async function workflowStepWithSentry( + instanceId: string, + options: CloudflareOptions, + callback: () => V, +): Promise { + setAsyncLocalStorageAsyncContextStrategy(); + + return withIsolationScope(async isolationScope => { + const client = init({ ...options, enableDedupe: false }); + isolationScope.setClient(client); + + addCloudResourceContext(isolationScope); + + return withScope(async scope => { + const propagationContext = propagationContextFromInstanceId(instanceId); + scope.setPropagationContext(propagationContext); + + // eslint-disable-next-line no-return-await + return await callback(); + }); + }); +} + +class WrappedWorkflowStep implements WorkflowStep { + public constructor( + private _instanceId: string, + private _ctx: ExecutionContext, + private _options: CloudflareOptions, + private _step: WorkflowStep, + ) {} + + public async do>(name: string, callback: () => Promise): Promise; + public async do>( + name: string, + config: WorkflowStepConfig, + callback: () => Promise, + ): Promise; + public async do>( + name: string, + configOrCallback: WorkflowStepConfig | (() => Promise), + maybeCallback?: () => Promise, + ): Promise { + const userCallback = (maybeCallback || configOrCallback) as () => Promise; + const config = typeof configOrCallback === 'function' ? undefined : configOrCallback; + + const instrumentedCallback: () => Promise = async () => { + return workflowStepWithSentry(this._instanceId, this._options, async () => { + return startSpan( + { + op: 'function.step.do', + name, + attributes: { + 'cloudflare.workflow.timeout': config?.timeout, + 'cloudflare.workflow.retries.backoff': config?.retries?.backoff, + 'cloudflare.workflow.retries.delay': config?.retries?.delay, + 'cloudflare.workflow.retries.limit': config?.retries?.limit, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.faas.cloudflare.workflow', + [SEMANTIC_ATTRIBUTE_SENTRY_SOURCE]: 'task', + }, + }, + async span => { + try { + const result = await userCallback(); + span.setStatus({ code: 1 }); + return result; + } catch (error) { + captureException(error, { mechanism: { handled: true, type: 'cloudflare' } }); + throw error; + } finally { + this._ctx.waitUntil(flush(2000)); + } + }, + ); + }); + }; + + return config ? this._step.do(name, config, instrumentedCallback) : this._step.do(name, instrumentedCallback); + } + + public async sleep(name: string, duration: WorkflowSleepDuration): Promise { + return this._step.sleep(name, duration); + } + + public async sleepUntil(name: string, timestamp: Date | number): Promise { + return this._step.sleepUntil(name, timestamp); + } + + public async waitForEvent>( + name: string, + options: { type: string; timeout?: WorkflowTimeoutDuration | number }, + ): Promise> { + return this._step.waitForEvent(name, options); + } +} + +/** + * Instruments a Cloudflare Workflow class with Sentry. + * + * @example + * ```typescript + * const InstrumentedWorkflow = instrumentWorkflowWithSentry( + * (env) => ({ dsn: env.SENTRY_DSN }), + * MyWorkflowClass + * ); + * + * export default InstrumentedWorkflow; + * ``` + * + * @param optionsCallback - Function that returns Sentry options to initialize Sentry + * @param WorkflowClass - The workflow class to instrument + * @returns Instrumented workflow class with the same interface + */ +export function instrumentWorkflowWithSentry< + E, // Environment type + P, // Payload type + T extends WorkflowEntrypoint, // WorkflowEntrypoint type + C extends new (ctx: ExecutionContext, env: E) => T, // Constructor type of the WorkflowEntrypoint class +>(optionsCallback: (env: E) => CloudflareOptions, WorkFlowClass: C): C { + return new Proxy(WorkFlowClass, { + construct(target: C, args: [ctx: ExecutionContext, env: E], newTarget) { + const [ctx, env] = args; + const options = optionsCallback(env); + const instance = Reflect.construct(target, args, newTarget) as T; + return new Proxy(instance, { + get(obj, prop, receiver) { + if (prop === 'run') { + return async function (event: WorkflowEvent

, step: WorkflowStep): Promise { + return obj.run.call(obj, event, new WrappedWorkflowStep(event.instanceId, ctx, options, step)); + }; + } + return Reflect.get(obj, prop, receiver); + }, + }); + }, + }) as C; +} diff --git a/packages/cloudflare/test/workflow.test.ts b/packages/cloudflare/test/workflow.test.ts new file mode 100644 index 000000000000..03eee5191eb2 --- /dev/null +++ b/packages/cloudflare/test/workflow.test.ts @@ -0,0 +1,350 @@ +/* eslint-disable @typescript-eslint/unbound-method */ +import type { WorkflowEvent, WorkflowStep, WorkflowStepConfig } from 'cloudflare:workers'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; +import { instrumentWorkflowWithSentry } from '../src/workflows'; + +const mockStep: WorkflowStep = { + do: vi + .fn() + .mockImplementation( + async ( + _name: string, + configOrCallback: WorkflowStepConfig | (() => Promise), + maybeCallback?: () => Promise, + ) => { + let count = 0; + + while (count <= 5) { + count += 1; + + try { + if (typeof configOrCallback === 'function') { + return await configOrCallback(); + } else { + return await (maybeCallback ? maybeCallback() : Promise.resolve()); + } + } catch (error) { + await new Promise(resolve => setTimeout(resolve, 1000)); + } + } + }, + ), + sleep: vi.fn(), + sleepUntil: vi.fn(), + waitForEvent: vi.fn(), +}; + +const mockTransport = { + send: vi.fn().mockImplementation(() => Promise.resolve({ statusCode: 200 })), + flush: vi.fn().mockImplementation(() => Promise.resolve(true)), + close: vi.fn().mockImplementation(() => Promise.resolve(true)), +}; + +const mockContext: ExecutionContext = { + waitUntil: vi.fn().mockImplementation(promise => promise), + passThroughOnException: vi.fn(), + props: {}, +}; + +function getSentryOptions() { + return { + dsn: 'https://8@ingest.sentry.io/4', + release: '1.0.0', + tracesSampleRate: 1.0, + transport: () => mockTransport, + }; +} + +type Params = { + // +}; + +const INSTANCE_ID = 'ae0ee067-61b3-4852-9219-5d62282270f0'; +const SAMPLE_RAND = '0.44116884107728693'; +const TRACE_ID = INSTANCE_ID.replace(/-/g, ''); + +describe('workflows', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + test('Calls expected functions', async () => { + class BasicTestWorkflow { + constructor(_ctx: ExecutionContext, _env: unknown) {} + + async run(_event: Readonly>, step: WorkflowStep): Promise { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const files = await step.do('first step', async () => { + return { files: ['doc_7392_rev3.pdf', 'report_x29_final.pdf'] }; + }); + } + } + + const TestWorkflowInstrumented = instrumentWorkflowWithSentry(getSentryOptions, BasicTestWorkflow as any); + const workflow = new TestWorkflowInstrumented(mockContext, {}) as BasicTestWorkflow; + const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID }; + await workflow.run(event, mockStep); + + expect(mockStep.do).toHaveBeenCalledTimes(1); + expect(mockStep.do).toHaveBeenCalledWith('first step', expect.any(Function)); + expect(mockContext.waitUntil).toHaveBeenCalledTimes(1); + expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); + expect(mockTransport.send).toHaveBeenCalledTimes(1); + expect(mockTransport.send).toHaveBeenCalledWith([ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'first step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'transaction', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + data: { + 'sentry.origin': 'auto.faas.cloudflare.workflow', + 'sentry.op': 'function.step.do', + 'sentry.source': 'task', + 'sentry.sample_rate': 1, + }, + op: 'function.step.do', + status: 'ok', + origin: 'auto.faas.cloudflare.workflow', + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + type: 'transaction', + transaction_info: { source: 'task' }, + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + }), + ], + ], + ]); + }); + + class ErrorTestWorkflow { + count = 0; + constructor(_ctx: ExecutionContext, _env: unknown) {} + + async run(_event: Readonly>, step: WorkflowStep): Promise { + await step.do('sometimes error step', async () => { + this.count += 1; + + if (this.count <= 1) { + throw new Error('Test error'); + } + + return { files: ['doc_7392_rev3.pdf', 'report_x29_final.pdf'] }; + }); + } + } + + test('Captures step errors', async () => { + const TestWorkflowInstrumented = instrumentWorkflowWithSentry(getSentryOptions, ErrorTestWorkflow as any); + const workflow = new TestWorkflowInstrumented(mockContext, {}) as ErrorTestWorkflow; + const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID }; + await workflow.run(event, mockStep); + + expect(mockStep.do).toHaveBeenCalledTimes(1); + expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function)); + expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); + expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); + expect(mockTransport.send).toHaveBeenCalledTimes(3); + + // First we should get the error event + expect(mockTransport.send).toHaveBeenNthCalledWith(1, [ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'sometimes error step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'event', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + timestamp: expect.any(Number), + exception: { + values: [ + expect.objectContaining({ + type: 'Error', + value: 'Test error', + mechanism: { type: 'cloudflare', handled: true }, + }), + ], + }, + }), + ], + ], + ]); + + // The the failed transaction + expect(mockTransport.send).toHaveBeenNthCalledWith(2, [ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'sometimes error step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'transaction', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + data: { + 'sentry.origin': 'auto.faas.cloudflare.workflow', + 'sentry.op': 'function.step.do', + 'sentry.source': 'task', + 'sentry.sample_rate': 1, + }, + op: 'function.step.do', + status: 'internal_error', + origin: 'auto.faas.cloudflare.workflow', + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + type: 'transaction', + transaction_info: { source: 'task' }, + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + }), + ], + ], + ]); + + // The the successful transaction + expect(mockTransport.send).toHaveBeenNthCalledWith(3, [ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'sometimes error step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'transaction', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + data: { + 'sentry.origin': 'auto.faas.cloudflare.workflow', + 'sentry.op': 'function.step.do', + 'sentry.source': 'task', + 'sentry.sample_rate': 1, + }, + op: 'function.step.do', + status: 'ok', + origin: 'auto.faas.cloudflare.workflow', + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + type: 'transaction', + transaction_info: { source: 'task' }, + start_timestamp: expect.any(Number), + timestamp: expect.any(Number), + }), + ], + ], + ]); + }); + + test('Sampled random via instanceId', async () => { + const TestWorkflowInstrumented = instrumentWorkflowWithSentry( + // Override the tracesSampleRate to 0.4 to be below the sampleRand + // calculated from the instanceId + () => ({ ...getSentryOptions(), tracesSampleRate: 0.4 }), + ErrorTestWorkflow as any, + ); + const workflow = new TestWorkflowInstrumented(mockContext, {}) as ErrorTestWorkflow; + const event = { payload: {}, timestamp: new Date(), instanceId: INSTANCE_ID }; + await workflow.run(event, mockStep); + + expect(mockStep.do).toHaveBeenCalledTimes(1); + expect(mockStep.do).toHaveBeenCalledWith('sometimes error step', expect.any(Function)); + expect(mockContext.waitUntil).toHaveBeenCalledTimes(2); + expect(mockContext.waitUntil).toHaveBeenCalledWith(expect.any(Promise)); + + // We should get the error event and then nothing else. No transactions + // should be sent + expect(mockTransport.send).toHaveBeenCalledTimes(1); + + expect(mockTransport.send).toHaveBeenCalledWith([ + expect.objectContaining({ + trace: expect.objectContaining({ + transaction: 'sometimes error step', + trace_id: TRACE_ID, + sample_rand: SAMPLE_RAND, + }), + }), + [ + [ + { + type: 'event', + }, + expect.objectContaining({ + event_id: expect.any(String), + contexts: { + trace: { + parent_span_id: undefined, + span_id: expect.any(String), + trace_id: TRACE_ID, + }, + cloud_resource: { 'cloud.provider': 'cloudflare' }, + runtime: { name: 'cloudflare' }, + }, + timestamp: expect.any(Number), + exception: { + values: [ + expect.objectContaining({ + type: 'Error', + value: 'Test error', + }), + ], + }, + }), + ], + ], + ]); + }); +}); diff --git a/packages/sveltekit/src/worker/cloudflare.ts b/packages/sveltekit/src/worker/cloudflare.ts index 9508c331369e..e3d93e8b8922 100644 --- a/packages/sveltekit/src/worker/cloudflare.ts +++ b/packages/sveltekit/src/worker/cloudflare.ts @@ -34,7 +34,7 @@ export function initCloudflareSentryHandle(options: CloudflareOptions): Handle { return wrapRequestHandler( { options: opts, - request: event.request, + request: event.request as Request>, // @ts-expect-error This will exist in Cloudflare context: event.platform.context, }, diff --git a/yarn.lock b/yarn.lock index 5c958751e59d..36d4a3baa246 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2671,10 +2671,10 @@ resolved "https://registry.yarnpkg.com/@cloudflare/workerd-windows-64/-/workerd-windows-64-1.20240718.0.tgz#940893e62df7f5a8ec895572b834c95c1e256fbd" integrity sha512-YpCRvvT47XanFum7C3SedOZKK6BfVhqmwdAAVAQFyc4gsCdegZo0JkUkdloC/jwuWlbCACOG2HTADHOqyeolzQ== -"@cloudflare/workers-types@4.20240725.0": - version "4.20240725.0" - resolved "https://registry.yarnpkg.com/@cloudflare/workers-types/-/workers-types-4.20240725.0.tgz#e151e0c069c0070b4d7168c7b0d4ae6c50d3f237" - integrity sha512-L6T/Bg50zm9IIACQVQ0CdVcQL+2nLkRXdPz6BsXF3SlzgjyWR5ndVctAbfr/HLV7aKYxWnnEZsIORsTWb+FssA== +"@cloudflare/workers-types@4.20250620.0": + version "4.20250620.0" + resolved "https://registry.yarnpkg.com/@cloudflare/workers-types/-/workers-types-4.20250620.0.tgz#a22e635a631212963b84e315191614b20c4ad317" + integrity sha512-EVvRB/DJEm6jhdKg+A4Qm4y/ry1cIvylSgSO3/f/Bv161vldDRxaXM2YoQQWFhLOJOw0qtrHsKOD51KYxV1XCw== "@cnakazawa/watch@^1.0.3": version "1.0.4"