Skip to content

Commit ee029e8

Browse files
committed
Add support for streaming in openai
1 parent aaedae6 commit ee029e8

File tree

4 files changed

+522
-13
lines changed

4 files changed

+522
-13
lines changed

newrelic/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2052,6 +2052,12 @@ def _process_module_builtin_defaults():
20522052
"newrelic.hooks.mlmodel_openai",
20532053
"instrument_openai_util",
20542054
)
2055+
_process_module_definition(
2056+
"openai.api_requestor",
2057+
"newrelic.hooks.mlmodel_openai",
2058+
"instrument_openai_api_requestor_api_requestor",
2059+
)
2060+
20552061
_process_module_definition(
20562062
"asyncio.base_events",
20572063
"newrelic.hooks.coroutines_asyncio",

newrelic/hooks/mlmodel_openai.py

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,129 @@ def wrap_embedding_create(wrapped, instance, args, kwargs):
9595
return response
9696

9797

98+
def wrap_interpret_response_line(wrapped, instance, args, kwargs):
    """Wrap ``APIRequestor._interpret_response_line`` to capture streamed chat completions.

    While a streamed response is consumed, each chunk's ``delta`` content/role
    is accumulated onto the active transaction. When the stream is exhausted —
    signalled by ``StopIteration`` escaping the wrapped call — summary and
    message events are recorded from the accumulated state and the exception is
    re-raised so iteration terminates normally for the caller.
    """
    # If there is no active transaction, do nothing but execute the wrapped call.
    transaction = current_transaction()
    if not transaction:
        return wrapped(*args, **kwargs)

    try:
        response = wrapped(*args, **kwargs)
    except StopIteration:
        # End of the streamed response: record events, then re-raise.
        #
        # NOTE(review): the original code referenced ``response``, ``ft``, and
        # request kwargs (``messages``, ``model``, ``temperature``, ...) in this
        # branch. None of those names are defined here — the wrapped call raised
        # before ``response`` was bound, ``ft`` belongs to
        # wrap_chat_completion_create, and this wrapper's kwargs are
        # ``_interpret_response_line``'s parameters, not the user's request —
        # so this branch raised NameError. Response/request metadata is
        # therefore reported with safe defaults until the request-side
        # instrumentation persists it on the transaction. TODO: confirm against
        # the request-side hook which attributes should be stashed.
        content = getattr(transaction, "_nr_openai_content", "")
        role = getattr(transaction, "_nr_openai_role", None)

        custom_attrs_dict = transaction._custom_params
        conversation_id = custom_attrs_dict.get("conversation_id", "")

        chat_completion_id = str(uuid.uuid4())
        available_metadata = get_trace_linking_metadata()
        span_id = available_metadata.get("span.id", "")
        trace_id = available_metadata.get("trace.id", "")

        settings = transaction.settings if transaction.settings is not None else global_settings()

        # Not recoverable in this scope (the response object is gone); keep the
        # event schema stable with empty defaults.
        response_headers = {}
        response_model = ""
        response_id = None
        request_id = ""
        response_usage = {}
        messages = []
        choices = []

        chat_completion_summary_dict = {
            "id": chat_completion_id,
            "appName": settings.app_name,
            "conversation_id": conversation_id,
            "span_id": span_id,
            "trace_id": trace_id,
            "transaction_id": transaction._transaction_id,
            "request_id": request_id,
            "api_key_last_four_digits": "",
            # FunctionTrace is not available in this scope; duration of the
            # streamed response is unknown here.
            "duration": None,
            "request.model": "",
            "response.model": response_model,
            "response.organization": "",
            "response.usage.completion_tokens": "",
            "response.usage.total_tokens": "",
            "response.usage.prompt_tokens": "",
            "request.temperature": "",
            "request.max_tokens": "",
            "response.choices.finish_reason": "",
            "response.api_type": "",
            "response.headers.llmVersion": response_headers.get("openai-version", ""),
            "response.headers.ratelimitLimitRequests": check_rate_limit_header(
                response_headers, "x-ratelimit-limit-requests", True
            ),
            "response.headers.ratelimitLimitTokens": check_rate_limit_header(
                response_headers, "x-ratelimit-limit-tokens", True
            ),
            "response.headers.ratelimitResetTokens": check_rate_limit_header(
                response_headers, "x-ratelimit-reset-tokens", False
            ),
            "response.headers.ratelimitResetRequests": check_rate_limit_header(
                response_headers, "x-ratelimit-reset-requests", False
            ),
            "response.headers.ratelimitRemainingTokens": check_rate_limit_header(
                response_headers, "x-ratelimit-remaining-tokens", True
            ),
            "response.headers.ratelimitRemainingRequests": check_rate_limit_header(
                response_headers, "x-ratelimit-remaining-requests", True
            ),
            "vendor": "openAI",
            "response.number_of_messages": len(messages) + (1 if (content or role) else 0),
        }

        transaction.record_ml_event("LlmChatCompletionSummary", chat_completion_summary_dict)

        # Report the message assembled from the accumulated streamed deltas.
        message_list = list(messages)
        if content or role:
            message_list.append({"role": role, "content": content})

        if message_list:
            create_chat_completion_message_event(
                transaction,
                settings.app_name,
                message_list,
                chat_completion_id,
                span_id,
                trace_id,
                response_model,
                response_id,
                request_id,
                conversation_id,
            )
        raise

    # Not end-of-stream: accumulate this chunk's delta onto the transaction.
    rbody, rcode, rheaders, stream = bind_interpret_response_line_params(*args, **kwargs)
    if not response or not stream:
        return response

    data = getattr(response, "data", {})
    if data:
        choices = data.get("choices", [])
        if choices:
            delta = choices[0].get("delta", {})
            if delta:
                # ``content`` may be present but null in role-only/terminal
                # chunks; coalesce to "" so concatenation never sees None.
                transaction._nr_openai_content = getattr(transaction, "_nr_openai_content", "") + (
                    delta.get("content") or ""
                )
                # Role arrives once, in the first delta; keep the first seen.
                transaction._nr_openai_role = getattr(transaction, "_nr_openai_role", None) or delta.get("role")
    return response
215+
216+
217+
def bind_interpret_response_line_params(rbody, rcode, rheaders, stream):
    """Normalize the args of ``APIRequestor._interpret_response_line``.

    Accepts the wrapped call's positional/keyword arguments and returns them
    as a fixed ``(rbody, rcode, rheaders, stream)`` tuple.
    """
    bound = (rbody, rcode, rheaders, stream)
    return bound
219+
220+
98221
def wrap_chat_completion_create(wrapped, instance, args, kwargs):
99222
transaction = current_transaction()
100223

@@ -107,7 +230,8 @@ def wrap_chat_completion_create(wrapped, instance, args, kwargs):
107230
with FunctionTrace(ft_name) as ft:
108231
response = wrapped(*args, **kwargs)
109232

110-
if not response:
233+
stream = kwargs.get("stream", False)
234+
if not response or stream:
111235
return response
112236

113237
custom_attrs_dict = transaction._custom_params
@@ -444,3 +568,8 @@ def instrument_openai_api_resources_chat_completion(module):
444568
wrap_function_wrapper(module, "ChatCompletion.create", wrap_chat_completion_create)
445569
if hasattr(module.ChatCompletion, "acreate"):
446570
wrap_function_wrapper(module, "ChatCompletion.acreate", wrap_chat_completion_acreate)
571+
572+
573+
def instrument_openai_api_requestor_api_requestor(module):
    """Instrument ``openai.api_requestor.APIRequestor`` for streaming capture."""
    requestor = module.APIRequestor
    # Older openai releases may lack the private parser; instrument only when present.
    if hasattr(requestor, "_interpret_response_line"):
        wrap_function_wrapper(module, "APIRequestor._interpret_response_line", wrap_interpret_response_line)

tests/mlmodel_openai/conftest.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -111,20 +111,21 @@ def wrap_openai_api_requestor_request(wrapped, instance, args, kwargs):
111111
# Send request
112112
result = wrapped(*args, **kwargs)
113113

114-
# Clean up data
115-
data = result[0].data
116-
headers = result[0]._headers
117-
headers = dict(
118-
filter(
119-
lambda k: k[0].lower() in RECORDED_HEADERS
120-
or k[0].lower().startswith("openai")
121-
or k[0].lower().startswith("x-ratelimit"),
122-
headers.items(),
114+
if hasattr(result[0], "data"):
115+
# Clean up data
116+
data = result[0].data
117+
headers = result[0]._headers
118+
headers = dict(
119+
filter(
120+
lambda k: k[0].lower() in RECORDED_HEADERS
121+
or k[0].lower().startswith("openai")
122+
or k[0].lower().startswith("x-ratelimit"),
123+
headers.items(),
124+
)
123125
)
124-
)
125126

126-
# Log response
127-
OPENAI_AUDIT_LOG_CONTENTS[prompt] = headers, data # Append response data to audit log
127+
# Log response
128+
OPENAI_AUDIT_LOG_CONTENTS[prompt] = headers, data # Append response data to audit log
128129
return result
129130

130131

0 commit comments

Comments
 (0)