Skip to content

Commit 8927921

Browse files
0xbad0c0d3cod1k
andauthored
feat(cloudflare): Introduce lock instrumentation for context.waitUntil to prevent multiple instrumentation (#17539)
fixes #17514 P.S.: It would be better to avoid "side effects" of instrumentation process. But it will require more changes and deeply tested. --------- Co-authored-by: cod1k <[email protected]>
1 parent 702cc87 commit 8927921

14 files changed

+300
-158
lines changed

packages/cloudflare/src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { ClientOptions, Options, ServerRuntimeClientOptions } from '@sentry/core';
22
import { applySdkMetadata, ServerRuntimeClient } from '@sentry/core';
3-
import type { makeFlushLock } from './flush';
43
import type { CloudflareTransportOptions } from './transport';
4+
import type { makeFlushLock } from './utils/flushLock';
55

66
/**
77
* The Sentry Cloudflare SDK Client.

packages/cloudflare/src/durableobject.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { isInstrumented, markAsInstrumented } from './instrument';
1818
import { getFinalOptions } from './options';
1919
import { wrapRequestHandler } from './request';
2020
import { init } from './sdk';
21+
import { copyExecutionContext } from './utils/copyExecutionContext';
2122

2223
type MethodWrapperOptions = {
2324
spanName?: string;
@@ -192,9 +193,11 @@ export function instrumentDurableObjectWithSentry<
192193
C extends new (state: DurableObjectState, env: E) => T,
193194
>(optionsCallback: (env: E) => CloudflareOptions, DurableObjectClass: C): C {
194195
return new Proxy(DurableObjectClass, {
195-
construct(target, [context, env]) {
196+
construct(target, [ctx, env]) {
196197
setAsyncLocalStorageAsyncContextStrategy();
197198

199+
const context = copyExecutionContext(ctx);
200+
198201
const options = getFinalOptions(optionsCallback(env), env);
199202

200203
const obj = new target(context, env);

packages/cloudflare/src/flush.ts

Lines changed: 0 additions & 38 deletions
This file was deleted.

packages/cloudflare/src/handler.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { getFinalOptions } from './options';
1414
import { wrapRequestHandler } from './request';
1515
import { addCloudResourceContext } from './scope-utils';
1616
import { init } from './sdk';
17+
import { copyExecutionContext } from './utils/copyExecutionContext';
1718

1819
/**
1920
* Wrapper for Cloudflare handlers.
@@ -37,9 +38,11 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
3738
if ('fetch' in handler && typeof handler.fetch === 'function' && !isInstrumented(handler.fetch)) {
3839
handler.fetch = new Proxy(handler.fetch, {
3940
apply(target, thisArg, args: Parameters<ExportedHandlerFetchHandler<Env, CfHostMetadata>>) {
40-
const [request, env, context] = args;
41+
const [request, env, ctx] = args;
4142

4243
const options = getFinalOptions(optionsCallback(env), env);
44+
const context = copyExecutionContext(ctx);
45+
args[2] = context;
4346

4447
return wrapRequestHandler({ options, request, context }, () => target.apply(thisArg, args));
4548
},
@@ -71,7 +74,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
7174
if ('scheduled' in handler && typeof handler.scheduled === 'function' && !isInstrumented(handler.scheduled)) {
7275
handler.scheduled = new Proxy(handler.scheduled, {
7376
apply(target, thisArg, args: Parameters<ExportedHandlerScheduledHandler<Env>>) {
74-
const [event, env, context] = args;
77+
const [event, env, ctx] = args;
78+
const context = copyExecutionContext(ctx);
79+
args[2] = context;
7580
return withIsolationScope(isolationScope => {
7681
const options = getFinalOptions(optionsCallback(env), env);
7782
const waitUntil = context.waitUntil.bind(context);
@@ -114,7 +119,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
114119
if ('email' in handler && typeof handler.email === 'function' && !isInstrumented(handler.email)) {
115120
handler.email = new Proxy(handler.email, {
116121
apply(target, thisArg, args: Parameters<EmailExportedHandler<Env>>) {
117-
const [emailMessage, env, context] = args;
122+
const [emailMessage, env, ctx] = args;
123+
const context = copyExecutionContext(ctx);
124+
args[2] = context;
118125
return withIsolationScope(isolationScope => {
119126
const options = getFinalOptions(optionsCallback(env), env);
120127
const waitUntil = context.waitUntil.bind(context);
@@ -155,7 +162,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
155162
if ('queue' in handler && typeof handler.queue === 'function' && !isInstrumented(handler.queue)) {
156163
handler.queue = new Proxy(handler.queue, {
157164
apply(target, thisArg, args: Parameters<ExportedHandlerQueueHandler<Env, QueueHandlerMessage>>) {
158-
const [batch, env, context] = args;
165+
const [batch, env, ctx] = args;
166+
const context = copyExecutionContext(ctx);
167+
args[2] = context;
159168

160169
return withIsolationScope(isolationScope => {
161170
const options = getFinalOptions(optionsCallback(env), env);
@@ -205,7 +214,9 @@ export function withSentry<Env = unknown, QueueHandlerMessage = unknown, CfHostM
205214
if ('tail' in handler && typeof handler.tail === 'function' && !isInstrumented(handler.tail)) {
206215
handler.tail = new Proxy(handler.tail, {
207216
apply(target, thisArg, args: Parameters<ExportedHandlerTailHandler<Env>>) {
208-
const [, env, context] = args;
217+
const [, env, ctx] = args;
218+
const context = copyExecutionContext(ctx);
219+
args[2] = context;
209220

210221
return withIsolationScope(async isolationScope => {
211222
const options = getFinalOptions(optionsCallback(env), env);

packages/cloudflare/src/sdk.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import {
1212
} from '@sentry/core';
1313
import type { CloudflareClientOptions, CloudflareOptions } from './client';
1414
import { CloudflareClient } from './client';
15-
import { makeFlushLock } from './flush';
1615
import { fetchIntegration } from './integrations/fetch';
1716
import { setupOpenTelemetryTracer } from './opentelemetry/tracer';
1817
import { makeCloudflareTransport } from './transport';
18+
import { makeFlushLock } from './utils/flushLock';
1919
import { defaultStackParser } from './vendor/stacktrace';
2020

2121
/** Get the default integrations for the Cloudflare SDK. */
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { type DurableObjectState, type ExecutionContext } from '@cloudflare/workers-types';
2+
3+
const kBound = Symbol.for('kBound');
4+
5+
const defaultPropertyOptions: PropertyDescriptor = {
6+
enumerable: true,
7+
configurable: true,
8+
writable: true,
9+
};
10+
11+
/**
12+
* Clones the given execution context by creating a shallow copy while ensuring the binding of specific methods.
13+
*
14+
* @param {ExecutionContext|DurableObjectState|void} ctx - The execution context to clone. Can be void.
15+
* @return {ExecutionContext|DurableObjectState|void} A cloned execution context with bound methods, or the original void value if no context was provided.
16+
*/
17+
export function copyExecutionContext<T extends ExecutionContext | DurableObjectState>(ctx: T): T {
18+
if (!ctx) return ctx;
19+
return Object.create(ctx, {
20+
waitUntil: { ...defaultPropertyOptions, value: copyAndBindMethod(ctx, 'waitUntil') },
21+
...('passThroughOnException' in ctx && {
22+
passThroughOnException: { ...defaultPropertyOptions, value: copyAndBindMethod(ctx, 'passThroughOnException') },
23+
}),
24+
});
25+
}
26+
27+
/**
28+
* Copies a method from the given object and ensures the copied method remains bound to the original object's context.
29+
*
30+
* @param {object} obj - The object containing the method to be copied and bound.
31+
* @param {string|symbol} method - The key of the method within the object to be copied and bound.
32+
* @return {Function} - The copied and bound method, or the original property if it is not a function.
33+
*/
34+
function copyAndBindMethod<T, K extends keyof T>(obj: T, method: K): T[K] {
35+
const methodImpl = obj[method];
36+
if (typeof methodImpl !== 'function') return methodImpl;
37+
if ((methodImpl as T[K] & { [kBound]?: boolean })[kBound]) return methodImpl;
38+
const bound = methodImpl.bind(obj);
39+
40+
return new Proxy(bound, {
41+
get: (target, prop, receiver) => {
42+
if (kBound === prop) return true;
43+
if ('bind' === prop) return () => receiver;
44+
return Reflect.get(target, prop, receiver);
45+
},
46+
});
47+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import type { ExecutionContext } from '@cloudflare/workers-types';
2+
import { createPromiseResolver } from './makePromiseResolver';
3+
4+
type FlushLock = {
5+
readonly ready: Promise<void>;
6+
readonly finalize: () => Promise<void>;
7+
};
8+
type MaybeLockable<T extends object> = T & { [kFlushLock]?: FlushLock };
9+
10+
const kFlushLock = Symbol.for('kFlushLock');
11+
12+
function getInstrumentedLock<T extends object>(o: MaybeLockable<T>): FlushLock | undefined {
13+
return o[kFlushLock];
14+
}
15+
16+
function storeInstrumentedLock<T extends object>(o: MaybeLockable<T>, lock: FlushLock): void {
17+
o[kFlushLock] = lock;
18+
}
19+
20+
/**
21+
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
22+
* to monitor pending tasks and provides a flusher function to ensure all tasks
23+
* have been completed before executing any subsequent logic.
24+
*
25+
* @param {ExecutionContext} context - The execution context to be enhanced. If no context is provided, the function returns undefined.
26+
* @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined.
27+
*/
28+
export function makeFlushLock(context: ExecutionContext): FlushLock {
29+
// eslint-disable-next-line @typescript-eslint/unbound-method
30+
let lock = getInstrumentedLock(context.waitUntil);
31+
if (lock) {
32+
// It is fine to return the same lock multiple times because this means the context has already been instrumented.
33+
return lock;
34+
}
35+
let pending = 0;
36+
const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil;
37+
const { promise, resolve } = createPromiseResolver();
38+
const hijackedWaitUntil: typeof originalWaitUntil = promise => {
39+
pending++;
40+
return originalWaitUntil(
41+
promise.finally(() => {
42+
if (--pending === 0) resolve();
43+
}),
44+
);
45+
};
46+
lock = Object.freeze({
47+
ready: promise,
48+
finalize: () => {
49+
if (pending === 0) resolve();
50+
return promise;
51+
},
52+
}) as FlushLock;
53+
storeInstrumentedLock(hijackedWaitUntil, lock);
54+
context.waitUntil = hijackedWaitUntil;
55+
56+
return lock;
57+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
type PromiseWithResolvers<T, E = unknown> = {
2+
readonly promise: Promise<T>;
3+
readonly resolve: (value?: T | PromiseLike<T>) => void;
4+
readonly reject: (reason?: E) => void;
5+
};
6+
/**
7+
* Creates an object containing a promise, along with its corresponding resolve and reject functions.
8+
*
9+
* This method provides a convenient way to create a promise and access its resolvers externally.
10+
*
11+
* @template T - The type of the resolved value of the promise.
12+
* @template E - The type of the rejected value of the promise. Defaults to `unknown`.
13+
* @return {PromiseWithResolvers<T, E>} An object containing the promise and its resolve and reject functions.
14+
*/
15+
export function createPromiseResolver<T, E = unknown>(): PromiseWithResolvers<T, E> {
16+
if ('withResolvers' in Promise && typeof Promise.withResolvers === 'function') {
17+
return Promise.withResolvers();
18+
}
19+
let resolve;
20+
let reject;
21+
const promise = new Promise<T>((res, rej) => {
22+
resolve = res;
23+
reject = rej;
24+
});
25+
return { promise, resolve, reject } as unknown as PromiseWithResolvers<T, E>;
26+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { type ExecutionContext } from '@cloudflare/workers-types';
2+
import { type Mocked, describe, expect, it, vi } from 'vitest';
3+
import { copyExecutionContext } from '../src/utils/copyExecutionContext';
4+
5+
describe('Copy of the execution context', () => {
6+
describe.for<keyof ExecutionContext>(['waitUntil', 'passThroughOnException'])('%s', method => {
7+
it('Was not bound more than once', async () => {
8+
const context = makeExecutionContextMock();
9+
const copy = copyExecutionContext(context);
10+
const copy_of_copy = copyExecutionContext(copy);
11+
12+
expect(copy[method]).toBe(copy_of_copy[method]);
13+
});
14+
it('Copied method is bound to the original', async () => {
15+
const context = makeExecutionContextMock();
16+
const copy = copyExecutionContext(context);
17+
18+
expect(copy[method]()).toBe(context);
19+
});
20+
it('Copied method "rebind" prevention', async () => {
21+
const context = makeExecutionContextMock();
22+
const copy = copyExecutionContext(context);
23+
expect(copy[method].bind('test')).toBe(copy[method]);
24+
});
25+
});
26+
27+
it('No side effects', async () => {
28+
const context = makeExecutionContextMock();
29+
expect(() => copyExecutionContext(Object.freeze(context))).not.toThrow(
30+
/Cannot define property \w+, object is not extensible/,
31+
);
32+
});
33+
it('Respects symbols', async () => {
34+
const s = Symbol('test');
35+
const context = makeExecutionContextMock<ExecutionContext & { [s]: unknown }>();
36+
context[s] = {};
37+
const copy = copyExecutionContext(context);
38+
expect(copy[s]).toBe(context[s]);
39+
});
40+
});
41+
42+
function makeExecutionContextMock<T extends ExecutionContext>() {
43+
return {
44+
waitUntil: vi.fn().mockReturnThis(),
45+
passThroughOnException: vi.fn().mockReturnThis(),
46+
} as unknown as Mocked<T>;
47+
}

packages/cloudflare/test/durableobject.test.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import type { ExecutionContext } from '@cloudflare/workers-types';
22
import * as SentryCore from '@sentry/core';
3-
import { afterEach, describe, expect, it, onTestFinished, vi } from 'vitest';
3+
import { afterEach, describe, expect, it, vi } from 'vitest';
44
import { instrumentDurableObjectWithSentry } from '../src';
55
import { isInstrumented } from '../src/instrument';
6+
import { createPromiseResolver } from '../src/utils/makePromiseResolver';
67

78
describe('instrumentDurableObjectWithSentry', () => {
89
afterEach(() => {
@@ -122,15 +123,13 @@ describe('instrumentDurableObjectWithSentry', () => {
122123
});
123124

124125
it('flush performs after all waitUntil promises are finished', async () => {
125-
vi.useFakeTimers();
126-
onTestFinished(() => {
127-
vi.useRealTimers();
128-
});
129126
const flush = vi.spyOn(SentryCore.Client.prototype, 'flush');
130127
const waitUntil = vi.fn();
128+
const { promise, resolve } = createPromiseResolver();
129+
process.nextTick(resolve);
131130
const testClass = vi.fn(context => ({
132131
fetch: () => {
133-
context.waitUntil(new Promise(res => setTimeout(res)));
132+
context.waitUntil(promise);
134133
return new Response('test');
135134
},
136135
}));
@@ -142,8 +141,7 @@ describe('instrumentDurableObjectWithSentry', () => {
142141
expect(() => dObject.fetch(new Request('https://example.com'))).not.toThrow();
143142
expect(flush).not.toBeCalled();
144143
expect(waitUntil).toHaveBeenCalledOnce();
145-
vi.advanceTimersToNextTimer();
146-
await Promise.all(waitUntil.mock.calls.map(([p]) => p));
144+
await Promise.all(waitUntil.mock.calls.map(call => call[0]));
147145
expect(flush).toBeCalled();
148146
});
149147

0 commit comments

Comments
 (0)