Skip to content

Commit 369a8fd

Browse files
hmstepanekumaannamalaimergify[bot]
committed
Attach ml_event to APM entity by default (#940)
* Attach non InferenceEvents to APM entity * Validate both resource payloads * Add tests for non-inference events * Add OpenAI sync embedding instrumentation (#938) * Add sync instrumentation for OpenAI embeddings. * Remove comments. * Clean up embedding event dictionary. * Update response_time to duration. * Linting fixes. * [Mega-Linter] Apply linters fixes * Trigger tests --------- Co-authored-by: umaannamalai <[email protected]> Co-authored-by: Hannah Stepanek <[email protected]> * Fixup: test names --------- Co-authored-by: Uma Annamalai <[email protected]> Co-authored-by: umaannamalai <[email protected]> Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent 732fedb commit 369a8fd

File tree

3 files changed

+220
-42
lines changed

3 files changed

+220
-42
lines changed

newrelic/core/otlp_utils.py

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import logging
2323

24+
from newrelic.api.time_trace import get_service_linking_metadata
2425
from newrelic.common.encoding_utils import json_encode
2526
from newrelic.core.config import global_settings
2627
from newrelic.core.stats_engine import CountStats, TimeStats
@@ -124,8 +125,11 @@ def create_key_values_from_iterable(iterable):
124125
)
125126

126127

127-
def create_resource(attributes=None):
128+
def create_resource(attributes=None, attach_apm_entity=True):
    """Build the OTLP ``Resource`` that accompanies exported telemetry.

    :param attributes: Resource attributes to encode. When omitted or empty,
        a single ``instrumentation.provider`` attribute identifying the
        Python ML instrumentation is used.
    :param attach_apm_entity: When true (the default), the result of
        ``get_service_linking_metadata()`` is merged into the attributes
        before encoding.
    :return: A ``Resource`` whose attributes are the key/value encoding of
        the final attribute mapping.
    """
    attributes = attributes or {"instrumentation.provider": "newrelic-opentelemetry-python-ml"}
    if attach_apm_entity:
        # Merge into a copy: a dict supplied by the caller must not pick up
        # the linking metadata as a side effect of building a resource.
        attributes = dict(attributes)
        # NOTE(review): presumably this adds the entity.type / entity.name /
        # entity.guid / hostname keys that the ML event payload validator
        # checks for -- confirm against get_service_linking_metadata.
        attributes.update(get_service_linking_metadata())
    return Resource(attributes=create_key_values_from_iterable(attributes))
130134

131135

@@ -203,7 +207,7 @@ def stats_to_otlp_metrics(metric_data, start_time, end_time):
203207

204208

205209
def encode_metric_data(metric_data, start_time, end_time, resource=None, scope=None):
206-
resource = resource or create_resource()
210+
resource = resource or create_resource(attach_apm_entity=False)
207211
return MetricsData(
208212
resource_metrics=[
209213
ResourceMetrics(
@@ -220,24 +224,45 @@ def encode_metric_data(metric_data, start_time, end_time, resource=None, scope=N
220224

221225

222226
def encode_ml_event_data(custom_event_data, agent_run_id):
    """Encode recorded ML events as an OTLP ``LogsData`` payload.

    Events are split into two groups, each sent under its own resource:
    ``InferenceEvent`` records use a resource without APM entity metadata,
    every other event type uses the default resource (APM entity attached).
    A group with no events produces no ``ResourceLogs`` entry.

    :param custom_event_data: Iterable of ``(event_info, event_attrs)``
        pairs. ``event_info`` must provide ``"type"`` and ``"timestamp"``;
        ``event_attrs`` is the mapping of user supplied attributes.
    :param agent_run_id: Value reported as the ``real_agent_id`` attribute
        on every event.
    :return: ``LogsData`` holding zero, one or two ``ResourceLogs`` entries.
    """
    # An InferenceEvent is attached to a separate ML Model entity instead
    # of the APM entity.
    ml_inference_events = []
    ml_apm_events = []
    for event_info, event_attrs in custom_event_data:
        event_type = event_info["type"]

        # Work on a copy so the caller's event attributes are not modified
        # by encoding them.
        attrs = dict(event_attrs)
        attrs.update(
            {
                "real_agent_id": agent_run_id,
                "event.domain": "newrelic.ml_events",
                "event.name": event_type,
            }
        )

        # NOTE(review): the 1e6 factor implies the event timestamp is in
        # milliseconds (ms -> ns) -- confirm against the event producer.
        unix_nano_timestamp = event_info["timestamp"] * 1e6
        log_record = {
            "time_unix_nano": int(unix_nano_timestamp),
            "attributes": create_key_values_from_iterable(attrs),
        }

        if event_type == "InferenceEvent":
            ml_inference_events.append(log_record)
        else:
            ml_apm_events.append(log_record)

    resource_logs = []
    if ml_inference_events:
        inference_resource = create_resource(attach_apm_entity=False)
        resource_logs.append(
            ResourceLogs(resource=inference_resource, scope_logs=[ScopeLogs(log_records=ml_inference_events)])
        )
    if ml_apm_events:
        apm_resource = create_resource()
        resource_logs.append(ResourceLogs(resource=apm_resource, scope_logs=[ScopeLogs(log_records=ml_apm_events)]))

    return LogsData(resource_logs=resource_logs)

tests/agent_features/test_ml_events.py

Lines changed: 129 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,23 +58,94 @@ def core_app(collector_agent_registration):
5858

5959

6060
@validate_ml_event_payload(
61-
[{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}]
61+
{
62+
"apm": [
63+
{
64+
"foo": "bar",
65+
"real_agent_id": "1234567",
66+
"event.domain": "newrelic.ml_events",
67+
"event.name": "MyCustomEvent",
68+
}
69+
]
70+
}
6271
)
6372
@reset_core_stats_engine()
64-
def test_ml_event_payload_inside_transaction(core_app):
73+
def test_ml_event_payload_noninference_event_inside_transaction(core_app):
74+
@background_task(name="test_ml_event_payload_inside_transaction")
75+
def _test():
76+
record_ml_event("MyCustomEvent", {"foo": "bar"})
77+
78+
_test()
79+
core_app.harvest()
80+
81+
82+
@validate_ml_event_payload(
83+
{
84+
"inference": [
85+
{
86+
"foo": "bar",
87+
"real_agent_id": "1234567",
88+
"event.domain": "newrelic.ml_events",
89+
"event.name": "InferenceEvent",
90+
}
91+
]
92+
}
93+
)
94+
@reset_core_stats_engine()
95+
def test_ml_event_payload_inference_event_inside_transaction(core_app):
96+
@background_task(name="test_ml_event_payload_inside_transaction")
97+
def _test():
98+
record_ml_event("InferenceEvent", {"foo": "bar"})
99+
100+
_test()
101+
core_app.harvest()
102+
103+
104+
@validate_ml_event_payload(
105+
{
106+
"apm": [
107+
{
108+
"foo": "bar",
109+
"real_agent_id": "1234567",
110+
"event.domain": "newrelic.ml_events",
111+
"event.name": "MyCustomEvent",
112+
}
113+
],
114+
"inference": [
115+
{
116+
"foo": "bar",
117+
"real_agent_id": "1234567",
118+
"event.domain": "newrelic.ml_events",
119+
"event.name": "InferenceEvent",
120+
}
121+
],
122+
}
123+
)
124+
@reset_core_stats_engine()
125+
def test_ml_event_payload_both_events_inside_transaction(core_app):
65126
@background_task(name="test_ml_event_payload_inside_transaction")
66127
def _test():
67128
record_ml_event("InferenceEvent", {"foo": "bar"})
129+
record_ml_event("MyCustomEvent", {"foo": "bar"})
68130

69131
_test()
70132
core_app.harvest()
71133

72134

73135
@validate_ml_event_payload(
74-
[{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}]
136+
{
137+
"inference": [
138+
{
139+
"foo": "bar",
140+
"real_agent_id": "1234567",
141+
"event.domain": "newrelic.ml_events",
142+
"event.name": "InferenceEvent",
143+
}
144+
]
145+
}
75146
)
76147
@reset_core_stats_engine()
77-
def test_ml_event_payload_outside_transaction(core_app):
148+
def test_ml_event_payload_inference_event_outside_transaction(core_app):
78149
def _test():
79150
app = application()
80151
record_ml_event("InferenceEvent", {"foo": "bar"}, application=app)
@@ -83,6 +154,59 @@ def _test():
83154
core_app.harvest()
84155

85156

157+
@validate_ml_event_payload(
158+
{
159+
"apm": [
160+
{
161+
"foo": "bar",
162+
"real_agent_id": "1234567",
163+
"event.domain": "newrelic.ml_events",
164+
"event.name": "MyCustomEvent",
165+
}
166+
],
167+
"inference": [
168+
{
169+
"foo": "bar",
170+
"real_agent_id": "1234567",
171+
"event.domain": "newrelic.ml_events",
172+
"event.name": "InferenceEvent",
173+
}
174+
],
175+
}
176+
)
177+
@reset_core_stats_engine()
178+
def test_ml_event_payload_both_events_outside_transaction(core_app):
179+
def _test():
180+
app = application()
181+
record_ml_event("InferenceEvent", {"foo": "bar"}, application=app)
182+
record_ml_event("MyCustomEvent", {"foo": "bar"}, application=app)
183+
184+
_test()
185+
core_app.harvest()
186+
187+
188+
@validate_ml_event_payload(
189+
{
190+
"apm": [
191+
{
192+
"foo": "bar",
193+
"real_agent_id": "1234567",
194+
"event.domain": "newrelic.ml_events",
195+
"event.name": "MyCustomEvent",
196+
}
197+
]
198+
}
199+
)
200+
@reset_core_stats_engine()
201+
def test_ml_event_payload_noninference_event_outside_transaction(core_app):
202+
def _test():
203+
app = application()
204+
record_ml_event("MyCustomEvent", {"foo": "bar"}, application=app)
205+
206+
_test()
207+
core_app.harvest()
208+
209+
86210
@pytest.mark.parametrize(
87211
"params,expected",
88212
[
@@ -178,6 +302,7 @@ def test_record_ml_event_outside_transaction_params_not_a_dict():
178302

179303
# Tests for ML Events configuration settings
180304

305+
181306
@override_application_settings({"ml_insights_events.enabled": False})
182307
@reset_core_stats_engine()
183308
@validate_ml_event_count(count=0)

tests/testing_support/validators/validate_ml_event_payload.py

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,36 @@ def payload_to_ml_events(payload):
4141
else:
4242
message = payload
4343

44-
resource_logs = message.get("resource_logs")
45-
assert len(resource_logs) == 1
46-
resource_logs = resource_logs[0]
47-
resource = resource_logs.get("resource")
48-
assert resource and resource.get("attributes")[0] == {
49-
"key": "instrumentation.provider",
50-
"value": {"string_value": "newrelic-opentelemetry-python-ml"},
51-
}
52-
scope_logs = resource_logs.get("scope_logs")
53-
assert len(scope_logs) == 1
54-
scope_logs = scope_logs[0]
55-
56-
scope = scope_logs.get("scope")
57-
assert scope is None
58-
logs = scope_logs.get("log_records")
59-
60-
return logs
44+
inference_logs = []
45+
apm_logs = []
46+
resource_log_records = message.get("resource_logs")
47+
for resource_logs in resource_log_records:
48+
resource = resource_logs.get("resource")
49+
assert resource
50+
resource_attrs = resource.get("attributes")
51+
assert {
52+
"key": "instrumentation.provider",
53+
"value": {"string_value": "newrelic-opentelemetry-python-ml"},
54+
} in resource_attrs
55+
scope_logs = resource_logs.get("scope_logs")
56+
assert len(scope_logs) == 1
57+
scope_logs = scope_logs[0]
58+
59+
scope = scope_logs.get("scope")
60+
assert scope is None
61+
logs = scope_logs.get("log_records")
62+
event_name = get_event_name(logs)
63+
if event_name == "InferenceEvent":
64+
inference_logs = logs
65+
else:
66+
# Make sure apm entity attrs are present on the resource.
67+
expected_apm_keys = ("entity.type", "entity.name", "entity.guid", "hostname", "instrumentation.provider")
68+
assert all(attr["key"] in expected_apm_keys for attr in resource_attrs)
69+
assert all(attr["value"] not in ("", None) for attr in resource_attrs)
70+
71+
apm_logs = logs
72+
73+
return inference_logs, apm_logs
6174

6275

6376
def validate_ml_event_payload(ml_events=None):
@@ -86,19 +99,34 @@ def _bind_params(method, payload=(), *args, **kwargs):
8699
assert recorded_ml_events
87100

88101
decoded_payloads = [payload_to_ml_events(payload) for payload in recorded_ml_events]
89-
all_logs = []
90-
for sent_logs in decoded_payloads:
91-
for data_point in sent_logs:
92-
for key in ("time_unix_nano",):
93-
assert key in data_point, "Invalid log format. Missing key: %s" % key
102+
decoded_inference_payloads = [payload[0] for payload in decoded_payloads]
103+
decoded_apm_payloads = [payload[1] for payload in decoded_payloads]
104+
all_apm_logs = normalize_logs(decoded_apm_payloads)
105+
all_inference_logs = normalize_logs(decoded_inference_payloads)
106+
107+
for expected_event in ml_events.get("inference", []):
108+
assert expected_event in all_inference_logs, "%s Not Found. Got: %s" % (expected_event, all_inference_logs)
94109

110+
for expected_event in ml_events.get("apm", []):
111+
assert expected_event in all_apm_logs, "%s Not Found. Got: %s" % (expected_event, all_apm_logs)
112+
return val
113+
114+
return _validate_wrapper
115+
116+
117+
def normalize_logs(decoded_payloads):
    """Flatten decoded payloads into a list of plain attribute dicts.

    :param decoded_payloads: Iterable of log-record lists, one per captured
        payload. An entry may be empty or ``None`` when a payload carried no
        records of the kind being normalized.
    :return: One ``{attribute key: decoded value}`` dict per log record, in
        payload order. A record without attributes yields an empty dict.
    :raises AssertionError: If a log record lacks ``time_unix_nano``.
    """
    all_logs = []
    for sent_logs in decoded_payloads:
        # Treat a missing record list the same as an empty one.
        for data_point in sent_logs or ():
            assert "time_unix_nano" in data_point, "Invalid log format. Missing key: %s" % "time_unix_nano"
            all_logs.append(
                {attr["key"]: attribute_to_value(attr["value"]) for attr in (data_point.get("attributes") or [])}
            )
    return all_logs
98127

99-
for expected_event in ml_events:
100-
assert expected_event in all_logs, "%s Not Found. Got: %s" % (expected_event, all_logs)
101128

102-
return val
103-
104-
return _validate_wrapper
129+
def get_event_name(logs):
    """Return the ``event.name`` attribute of the first log record.

    Only the first record is inspected.

    :param logs: List of log records, each a dict with an ``"attributes"``
        list of ``{"key": ..., "value": {"string_value": ...}}`` entries.
    :return: The string value of the first record's ``event.name``
        attribute, or ``None`` when ``logs`` is empty/``None`` or the first
        record has no such attribute.
    """
    if not logs:
        return None
    for attr in logs[0].get("attributes") or ():
        if attr["key"] == "event.name":
            return attr["value"]["string_value"]
    return None

0 commit comments

Comments
 (0)