
Commit b830cb2

feat(core): Support stream responses and tool calls for Google GenAI (#17664)
✅ This PR depends on #17625 and should be reviewed only after that one is merged.

This PR adds:

1. Streaming support for the Google GenAI SDK, instrumenting the following methods:
   - Models API streaming: `models.generateContentStream()` - stream content generation with real-time chunks
   - Chat API streaming (`chats.create`): `chat.sendMessageStream()` - stream chat responses with conversation context
2. New tool call attributes (a usage sketch follows below):
   - `GEN_AI_REQUEST_AVAILABLE_TOOLS_ATTRIBUTE` - captures available tools from requests
   - `GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE` - captures function calls from responses

Co-authored-by: Andrei <[email protected]>
1 parent 985873e · commit b830cb2
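For context on item 2 above: a minimal sketch (not part of this diff) of a tool-enabled streaming request, assuming the `config.tools` / `functionDeclarations` request shape and the `Type` enum exported by `@google/genai`; the `get_weather` declaration and `demo` wrapper are purely illustrative. With the instrumentation from this PR enabled, the declared tools are the data `GEN_AI_REQUEST_AVAILABLE_TOOLS_ATTRIBUTE` captures from the request, and `functionCall` parts emitted in the streamed response are the data `GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE` captures.

```js
import { GoogleGenAI, Type } from '@google/genai';

async function demo() {
  const client = new GoogleGenAI({ apiKey: process.env.GOOGLE_GENAI_API_KEY });

  // Hypothetical tool declaration; the declared tools are the kind of data
  // GEN_AI_REQUEST_AVAILABLE_TOOLS_ATTRIBUTE is meant to capture.
  const getWeather = {
    name: 'get_weather',
    description: 'Returns the current weather for a given city',
    parameters: {
      type: Type.OBJECT,
      properties: {
        city: { type: Type.STRING },
      },
      required: ['city'],
    },
  };

  const stream = await client.models.generateContentStream({
    model: 'gemini-1.5-flash',
    contents: [{ role: 'user', parts: [{ text: 'What is the weather in Boston?' }] }],
    config: {
      tools: [{ functionDeclarations: [getWeather] }],
    },
  });

  for await (const chunk of stream) {
    // Any functionCall parts in the streamed candidates are the kind of data
    // GEN_AI_RESPONSE_TOOL_CALLS_ATTRIBUTE is meant to capture.
    void chunk;
  }
}

demo();
```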

File tree: 8 files changed, +1105 −41 lines changed

Lines changed: 237 additions & 0 deletions
@@ -0,0 +1,237 @@
import { GoogleGenAI } from '@google/genai';
import * as Sentry from '@sentry/node';
import express from 'express';

function startMockGoogleGenAIServer() {
  const app = express();
  app.use(express.json());

  // Streaming endpoint for models.generateContentStream and chat.sendMessageStream
  app.post('/v1beta/models/:model\\:streamGenerateContent', (req, res) => {
    const model = req.params.model;

    if (model === 'error-model') {
      res.status(404).set('x-request-id', 'mock-request-123').end('Model not found');
      return;
    }

    // Set headers for streaming response
    res.setHeader('Content-Type', 'application/json');
    res.setHeader('Transfer-Encoding', 'chunked');

    // Create a mock stream
    const mockStream = createMockStream(model);

    // Send chunks
    const sendChunk = async () => {
      const { value, done } = await mockStream.next();
      if (done) {
        res.end();
        return;
      }

      res.write(`data: ${JSON.stringify(value)}\n\n`);
      setTimeout(sendChunk, 10); // Small delay between chunks
    };

    sendChunk();
  });

  return new Promise(resolve => {
    const server = app.listen(0, () => {
      resolve(server);
    });
  });
}

// Helper function to create mock stream
async function* createMockStream(model) {
  if (model === 'blocked-model') {
    // First chunk: Contains promptFeedback with blockReason
    yield {
      promptFeedback: {
        blockReason: 'SAFETY',
        blockReasonMessage: 'The prompt was blocked due to safety concerns',
      },
      responseId: 'mock-blocked-response-streaming-id',
      modelVersion: 'gemini-1.5-pro',
    };

    // Note: In a real blocked scenario, there would typically be no more chunks
    // But we'll add one more to test that processing stops after the error
    yield {
      candidates: [
        {
          content: {
            parts: [{ text: 'This should not be processed' }],
            role: 'model',
          },
          index: 0,
        },
      ],
    };
    return;
  }

  // First chunk: Start of response with initial text
  yield {
    candidates: [
      {
        content: {
          parts: [{ text: 'Hello! ' }],
          role: 'model',
        },
        index: 0,
      },
    ],
    responseId: 'mock-response-streaming-id',
    modelVersion: 'gemini-1.5-pro',
  };

  // Second chunk: More text content
  yield {
    candidates: [
      {
        content: {
          parts: [{ text: 'This is a streaming ' }],
          role: 'model',
        },
        index: 0,
      },
    ],
  };

  // Third chunk: Final text content
  yield {
    candidates: [
      {
        content: {
          parts: [{ text: 'response from Google GenAI!' }],
          role: 'model',
        },
        index: 0,
      },
    ],
  };

  // Final chunk: End with finish reason and usage metadata
  yield {
    candidates: [
      {
        content: {
          parts: [{ text: '' }], // Empty text in final chunk
          role: 'model',
        },
        finishReason: 'STOP',
        index: 0,
      },
    ],
    usageMetadata: {
      promptTokenCount: 10,
      candidatesTokenCount: 12,
      totalTokenCount: 22,
    },
  };
}

async function run() {
  const server = await startMockGoogleGenAIServer();

  await Sentry.startSpan({ op: 'function', name: 'main' }, async () => {
    const client = new GoogleGenAI({
      apiKey: 'mock-api-key',
      httpOptions: { baseUrl: `http://localhost:${server.address().port}` },
    });

    // Test 1: models.generateContentStream (streaming)
    const streamResponse = await client.models.generateContentStream({
      model: 'gemini-1.5-flash',
      config: {
        temperature: 0.7,
        topP: 0.9,
        maxOutputTokens: 100,
      },
      contents: [
        {
          role: 'user',
          parts: [{ text: 'Tell me about streaming' }],
        },
      ],
    });

    // Consume the stream
    for await (const _ of streamResponse) {
      void _;
    }

    // Test 2: chat.sendMessageStream (streaming)
    const streamingChat = client.chats.create({
      model: 'gemini-1.5-pro',
      config: {
        temperature: 0.8,
        topP: 0.9,
        maxOutputTokens: 150,
      },
    });

    const chatStreamResponse = await streamingChat.sendMessageStream({
      message: 'Tell me a streaming joke',
    });

    // Consume the chat stream
    for await (const _ of chatStreamResponse) {
      void _;
    }

    // Test 3: Blocked content streaming (should trigger error handling)
    try {
      const blockedStreamResponse = await client.models.generateContentStream({
        model: 'blocked-model',
        config: {
          temperature: 0.7,
        },
        contents: [
          {
            role: 'user',
            parts: [{ text: 'This should be blocked' }],
          },
        ],
      });

      // Consume the blocked stream
      for await (const _ of blockedStreamResponse) {
        void _;
      }
    } catch {
      // Expected: The stream should be processed, but the span should be marked with error status
      // The error handling happens in the streaming instrumentation, not as a thrown error
    }

    // Test 4: Error handling for streaming
    try {
      const errorStreamResponse = await client.models.generateContentStream({
        model: 'error-model',
        config: {
          temperature: 0.7,
        },
        contents: [
          {
            role: 'user',
            parts: [{ text: 'This will fail' }],
          },
        ],
      });

      // Consume the error stream
      for await (const _ of errorStreamResponse) {
        void _;
      }
    } catch {
      // Expected error
    }
  });

  server.close();
}

run();
