From 2fd9215b039d921f3cba7ccc6dfa1fa9468e464c Mon Sep 17 00:00:00 2001 From: Andrei Borza Date: Wed, 22 Oct 2025 21:35:42 +0200 Subject: [PATCH] fix(core): Fix wrong async types when instrumenting anthropic's stream api The issue surfaced when `message.stream` was used in conjunction with the `stream: true` option which would lead to us returning async results instead of the expected MessageStream from anthropic ai. We now take this into account and tightened the types. Closes: #17977 --- .../tracing/anthropic/scenario-stream.mjs | 78 ++++++++++++++++++- .../suites/tracing/anthropic/test.ts | 25 ++++++ packages/core/src/utils/anthropic-ai/index.ts | 21 ++--- 3 files changed, 111 insertions(+), 13 deletions(-) diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs b/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs index ac5eb6019010..4e0fa74fdd0d 100644 --- a/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs @@ -32,6 +32,62 @@ function createMockStreamEvents(model = 'claude-3-haiku-20240307') { return generator(); } +// Mimics Anthropic SDK's MessageStream class +class MockMessageStream { + constructor(model) { + this._model = model; + this._eventHandlers = {}; + } + + on(event, handler) { + if (!this._eventHandlers[event]) { + this._eventHandlers[event] = []; + } + this._eventHandlers[event].push(handler); + + // Start processing events asynchronously (don't await) + if (event === 'streamEvent' && !this._processing) { + this._processing = true; + this._processEvents(); + } + + return this; + } + + async _processEvents() { + try { + const generator = createMockStreamEvents(this._model); + for await (const event of generator) { + if (this._eventHandlers['streamEvent']) { + for (const handler of this._eventHandlers['streamEvent']) { + handler(event); + } + } + } + + // Emit 'message' event when done + if (this._eventHandlers['message']) { + for (const handler of this._eventHandlers['message']) { + handler(); + } + } + } catch (error) { + if (this._eventHandlers['error']) { + for (const handler of this._eventHandlers['error']) { + handler(error); + } + } + } + } + + async *[Symbol.asyncIterator]() { + const generator = createMockStreamEvents(this._model); + for await (const event of generator) { + yield event; + } + } +} + class MockAnthropic { constructor(config) { this.apiKey = config.apiKey; @@ -68,9 +124,9 @@ class MockAnthropic { }; } - async _messagesStream(params) { - await new Promise(resolve => setTimeout(resolve, 5)); - return createMockStreamEvents(params?.model); + // This should return synchronously (like the real Anthropic SDK) + _messagesStream(params) { + return new MockMessageStream(params?.model); } } @@ -90,13 +146,27 @@ async function run() { } // 2) Streaming via messages.stream API - const stream2 = await client.messages.stream({ + const stream2 = client.messages.stream({ model: 'claude-3-haiku-20240307', messages: [{ role: 'user', content: 'Stream this too' }], }); for await (const _ of stream2) { void _; } + + // 3) Streaming via messages.stream API with redundant stream: true param + const stream3 = client.messages.stream({ + model: 'claude-3-haiku-20240307', + messages: [{ role: 'user', content: 'Stream with param' }], + stream: true, // This param is redundant but should not break synchronous behavior + }); + // Verify it has .on() method immediately (not a Promise) + if (typeof stream3.on !== 'function') { + throw new Error('BUG: messages.stream() with stream: true did not return MessageStream synchronously!'); + } + for await (const _ of stream3) { + void _; + } }); } diff --git a/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts b/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts index 57e788b721d1..2c92c6f8d233 100644 --- a/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts @@ -308,6 +308,23 @@ describe('Anthropic integration', () => { 'gen_ai.usage.total_tokens': 25, }), }), + // messages.stream with redundant stream: true param + expect.objectContaining({ + description: 'messages claude-3-haiku-20240307 stream-response', + op: 'gen_ai.messages', + data: expect.objectContaining({ + 'gen_ai.system': 'anthropic', + 'gen_ai.operation.name': 'messages', + 'gen_ai.request.model': 'claude-3-haiku-20240307', + 'gen_ai.request.stream': true, + 'gen_ai.response.streaming': true, + 'gen_ai.response.model': 'claude-3-haiku-20240307', + 'gen_ai.response.id': 'msg_stream_1', + 'gen_ai.usage.input_tokens': 10, + 'gen_ai.usage.output_tokens': 15, + 'gen_ai.usage.total_tokens': 25, + }), + }), ]), }; @@ -331,6 +348,14 @@ describe('Anthropic integration', () => { 'gen_ai.response.text': 'Hello from stream!', }), }), + expect.objectContaining({ + description: 'messages claude-3-haiku-20240307 stream-response', + op: 'gen_ai.messages', + data: expect.objectContaining({ + 'gen_ai.response.streaming': true, + 'gen_ai.response.text': 'Hello from stream!', + }), + }), ]), }; diff --git a/packages/core/src/utils/anthropic-ai/index.ts b/packages/core/src/utils/anthropic-ai/index.ts index d81741668be9..669d8a61b068 100644 --- a/packages/core/src/utils/anthropic-ai/index.ts +++ b/packages/core/src/utils/anthropic-ai/index.ts @@ -205,8 +205,8 @@ function handleStreamingError(error: unknown, span: Span, methodPath: string): n * Handle streaming cases with common logic */ function handleStreamingRequest( - originalMethod: (...args: T) => Promise, - target: (...args: T) => Promise, + originalMethod: (...args: T) => R | Promise, + target: (...args: T) => R | Promise, context: unknown, args: T, requestAttributes: Record, @@ -215,7 +215,8 @@ function handleStreamingRequest( params: Record | undefined, options: AnthropicAiOptions, isStreamRequested: boolean, -): Promise { + isStreamingMethod: boolean, +): R | Promise { const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown'; const spanConfig = { name: `${operationName} ${model} stream-response`, @@ -223,7 +224,8 @@ function handleStreamingRequest( attributes: requestAttributes as Record, }; - if (isStreamRequested) { + // messages.stream() always returns a sync MessageStream, even with stream: true param + if (isStreamRequested && !isStreamingMethod) { return startSpanManual(spanConfig, async span => { try { if (options.recordInputs && params) { @@ -260,13 +262,13 @@ function handleStreamingRequest( * @see https://docs.sentry.io/platforms/javascript/guides/node/tracing/instrumentation/ai-agents-module/#manual-instrumentation */ function instrumentMethod( - originalMethod: (...args: T) => Promise, + originalMethod: (...args: T) => R | Promise, methodPath: AnthropicAiInstrumentedMethod, context: unknown, options: AnthropicAiOptions, -): (...args: T) => Promise { +): (...args: T) => R | Promise { return new Proxy(originalMethod, { - apply(target, thisArg, args: T): Promise { + apply(target, thisArg, args: T): R | Promise { const requestAttributes = extractRequestAttributes(args, methodPath); const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown'; const operationName = getFinalOperationName(methodPath); @@ -287,6 +289,7 @@ function instrumentMethod( params, options, isStreamRequested, + isStreamingMethod, ); } @@ -320,7 +323,7 @@ function instrumentMethod( }, ); }, - }) as (...args: T) => Promise; + }) as (...args: T) => R | Promise; } /** @@ -333,7 +336,7 @@ function createDeepProxy(target: T, currentPath = '', options: const methodPath = buildMethodPath(currentPath, String(prop)); if (typeof value === 'function' && shouldInstrument(methodPath)) { - return instrumentMethod(value as (...args: unknown[]) => Promise, methodPath, obj, options); + return instrumentMethod(value as (...args: unknown[]) => unknown | Promise, methodPath, obj, options); } if (typeof value === 'function') {