Skip to content

Commit 7d050b5

Browse files
authored
fix(core): Fix wrong async types when instrumenting anthropic's stream api (#18007)
The issue surfaced when `message.stream` was used in conjunction with the `stream: true` option which would lead to us returning async results instead of the expected MessageStream from anthropic ai. We now take this into account and tightened the types. Closes: #17977
1 parent 925a4ea commit 7d050b5

File tree

3 files changed

+111
-13
lines changed

3 files changed

+111
-13
lines changed

dev-packages/node-integration-tests/suites/tracing/anthropic/scenario-stream.mjs

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,62 @@ function createMockStreamEvents(model = 'claude-3-haiku-20240307') {
3232
return generator();
3333
}
3434

35+
// Mimics Anthropic SDK's MessageStream class
36+
class MockMessageStream {
37+
constructor(model) {
38+
this._model = model;
39+
this._eventHandlers = {};
40+
}
41+
42+
on(event, handler) {
43+
if (!this._eventHandlers[event]) {
44+
this._eventHandlers[event] = [];
45+
}
46+
this._eventHandlers[event].push(handler);
47+
48+
// Start processing events asynchronously (don't await)
49+
if (event === 'streamEvent' && !this._processing) {
50+
this._processing = true;
51+
this._processEvents();
52+
}
53+
54+
return this;
55+
}
56+
57+
async _processEvents() {
58+
try {
59+
const generator = createMockStreamEvents(this._model);
60+
for await (const event of generator) {
61+
if (this._eventHandlers['streamEvent']) {
62+
for (const handler of this._eventHandlers['streamEvent']) {
63+
handler(event);
64+
}
65+
}
66+
}
67+
68+
// Emit 'message' event when done
69+
if (this._eventHandlers['message']) {
70+
for (const handler of this._eventHandlers['message']) {
71+
handler();
72+
}
73+
}
74+
} catch (error) {
75+
if (this._eventHandlers['error']) {
76+
for (const handler of this._eventHandlers['error']) {
77+
handler(error);
78+
}
79+
}
80+
}
81+
}
82+
83+
async *[Symbol.asyncIterator]() {
84+
const generator = createMockStreamEvents(this._model);
85+
for await (const event of generator) {
86+
yield event;
87+
}
88+
}
89+
}
90+
3591
class MockAnthropic {
3692
constructor(config) {
3793
this.apiKey = config.apiKey;
@@ -68,9 +124,9 @@ class MockAnthropic {
68124
};
69125
}
70126

71-
async _messagesStream(params) {
72-
await new Promise(resolve => setTimeout(resolve, 5));
73-
return createMockStreamEvents(params?.model);
127+
// This should return synchronously (like the real Anthropic SDK)
128+
_messagesStream(params) {
129+
return new MockMessageStream(params?.model);
74130
}
75131
}
76132

@@ -90,13 +146,27 @@ async function run() {
90146
}
91147

92148
// 2) Streaming via messages.stream API
93-
const stream2 = await client.messages.stream({
149+
const stream2 = client.messages.stream({
94150
model: 'claude-3-haiku-20240307',
95151
messages: [{ role: 'user', content: 'Stream this too' }],
96152
});
97153
for await (const _ of stream2) {
98154
void _;
99155
}
156+
157+
// 3) Streaming via messages.stream API with redundant stream: true param
158+
const stream3 = client.messages.stream({
159+
model: 'claude-3-haiku-20240307',
160+
messages: [{ role: 'user', content: 'Stream with param' }],
161+
stream: true, // This param is redundant but should not break synchronous behavior
162+
});
163+
// Verify it has .on() method immediately (not a Promise)
164+
if (typeof stream3.on !== 'function') {
165+
throw new Error('BUG: messages.stream() with stream: true did not return MessageStream synchronously!');
166+
}
167+
for await (const _ of stream3) {
168+
void _;
169+
}
100170
});
101171
}
102172

dev-packages/node-integration-tests/suites/tracing/anthropic/test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,23 @@ describe('Anthropic integration', () => {
308308
'gen_ai.usage.total_tokens': 25,
309309
}),
310310
}),
311+
// messages.stream with redundant stream: true param
312+
expect.objectContaining({
313+
description: 'messages claude-3-haiku-20240307 stream-response',
314+
op: 'gen_ai.messages',
315+
data: expect.objectContaining({
316+
'gen_ai.system': 'anthropic',
317+
'gen_ai.operation.name': 'messages',
318+
'gen_ai.request.model': 'claude-3-haiku-20240307',
319+
'gen_ai.request.stream': true,
320+
'gen_ai.response.streaming': true,
321+
'gen_ai.response.model': 'claude-3-haiku-20240307',
322+
'gen_ai.response.id': 'msg_stream_1',
323+
'gen_ai.usage.input_tokens': 10,
324+
'gen_ai.usage.output_tokens': 15,
325+
'gen_ai.usage.total_tokens': 25,
326+
}),
327+
}),
311328
]),
312329
};
313330

@@ -331,6 +348,14 @@ describe('Anthropic integration', () => {
331348
'gen_ai.response.text': 'Hello from stream!',
332349
}),
333350
}),
351+
expect.objectContaining({
352+
description: 'messages claude-3-haiku-20240307 stream-response',
353+
op: 'gen_ai.messages',
354+
data: expect.objectContaining({
355+
'gen_ai.response.streaming': true,
356+
'gen_ai.response.text': 'Hello from stream!',
357+
}),
358+
}),
334359
]),
335360
};
336361

packages/core/src/utils/anthropic-ai/index.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,8 @@ function handleStreamingError(error: unknown, span: Span, methodPath: string): n
205205
* Handle streaming cases with common logic
206206
*/
207207
function handleStreamingRequest<T extends unknown[], R>(
208-
originalMethod: (...args: T) => Promise<R>,
209-
target: (...args: T) => Promise<R>,
208+
originalMethod: (...args: T) => R | Promise<R>,
209+
target: (...args: T) => R | Promise<R>,
210210
context: unknown,
211211
args: T,
212212
requestAttributes: Record<string, unknown>,
@@ -215,15 +215,17 @@ function handleStreamingRequest<T extends unknown[], R>(
215215
params: Record<string, unknown> | undefined,
216216
options: AnthropicAiOptions,
217217
isStreamRequested: boolean,
218-
): Promise<R> {
218+
isStreamingMethod: boolean,
219+
): R | Promise<R> {
219220
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
220221
const spanConfig = {
221222
name: `${operationName} ${model} stream-response`,
222223
op: getSpanOperation(methodPath),
223224
attributes: requestAttributes as Record<string, SpanAttributeValue>,
224225
};
225226

226-
if (isStreamRequested) {
227+
// messages.stream() always returns a sync MessageStream, even with stream: true param
228+
if (isStreamRequested && !isStreamingMethod) {
227229
return startSpanManual(spanConfig, async span => {
228230
try {
229231
if (options.recordInputs && params) {
@@ -260,13 +262,13 @@ function handleStreamingRequest<T extends unknown[], R>(
260262
* @see https://docs.sentry.io/platforms/javascript/guides/node/tracing/instrumentation/ai-agents-module/#manual-instrumentation
261263
*/
262264
function instrumentMethod<T extends unknown[], R>(
263-
originalMethod: (...args: T) => Promise<R>,
265+
originalMethod: (...args: T) => R | Promise<R>,
264266
methodPath: AnthropicAiInstrumentedMethod,
265267
context: unknown,
266268
options: AnthropicAiOptions,
267-
): (...args: T) => Promise<R> {
269+
): (...args: T) => R | Promise<R> {
268270
return new Proxy(originalMethod, {
269-
apply(target, thisArg, args: T): Promise<R> {
271+
apply(target, thisArg, args: T): R | Promise<R> {
270272
const requestAttributes = extractRequestAttributes(args, methodPath);
271273
const model = requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] ?? 'unknown';
272274
const operationName = getFinalOperationName(methodPath);
@@ -287,6 +289,7 @@ function instrumentMethod<T extends unknown[], R>(
287289
params,
288290
options,
289291
isStreamRequested,
292+
isStreamingMethod,
290293
);
291294
}
292295

@@ -320,7 +323,7 @@ function instrumentMethod<T extends unknown[], R>(
320323
},
321324
);
322325
},
323-
}) as (...args: T) => Promise<R>;
326+
}) as (...args: T) => R | Promise<R>;
324327
}
325328

326329
/**
@@ -333,7 +336,7 @@ function createDeepProxy<T extends object>(target: T, currentPath = '', options:
333336
const methodPath = buildMethodPath(currentPath, String(prop));
334337

335338
if (typeof value === 'function' && shouldInstrument(methodPath)) {
336-
return instrumentMethod(value as (...args: unknown[]) => Promise<unknown>, methodPath, obj, options);
339+
return instrumentMethod(value as (...args: unknown[]) => unknown | Promise<unknown>, methodPath, obj, options);
337340
}
338341

339342
if (typeof value === 'function') {

0 commit comments

Comments
 (0)