Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions newrelic/core/otlp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import logging

from newrelic.api.time_trace import get_service_linking_metadata
from newrelic.common.encoding_utils import json_encode
from newrelic.core.config import global_settings
from newrelic.core.stats_engine import CountStats, TimeStats
Expand Down Expand Up @@ -124,8 +125,11 @@ def create_key_values_from_iterable(iterable):
)


def create_resource(attributes=None):
def create_resource(attributes=None, attach_apm_entity=True):
attributes = attributes or {"instrumentation.provider": "newrelic-opentelemetry-python-ml"}
if attach_apm_entity:
metadata = get_service_linking_metadata()
attributes.update(metadata)
return Resource(attributes=create_key_values_from_iterable(attributes))


Expand Down Expand Up @@ -203,7 +207,7 @@ def stats_to_otlp_metrics(metric_data, start_time, end_time):


def encode_metric_data(metric_data, start_time, end_time, resource=None, scope=None):
resource = resource or create_resource()
resource = resource or create_resource(attach_apm_entity=False)
return MetricsData(
resource_metrics=[
ResourceMetrics(
Expand All @@ -220,24 +224,45 @@ def encode_metric_data(metric_data, start_time, end_time, resource=None, scope=N


def encode_ml_event_data(custom_event_data, agent_run_id):
resource = create_resource()
ml_events = []
# An InferenceEvent is attached to a separate ML Model entity instead
# of the APM entity.
ml_inference_events = []
ml_apm_events = []
for event in custom_event_data:
event_info, event_attrs = event
event_type = event_info["type"]
event_attrs.update(
{
"real_agent_id": agent_run_id,
"event.domain": "newrelic.ml_events",
"event.name": event_info["type"],
"event.name": event_type,
}
)
ml_attrs = create_key_values_from_iterable(event_attrs)
unix_nano_timestamp = event_info["timestamp"] * 1e6
ml_events.append(
{
"time_unix_nano": int(unix_nano_timestamp),
"attributes": ml_attrs,
}
if event_type == "InferenceEvent":
ml_inference_events.append(
{
"time_unix_nano": int(unix_nano_timestamp),
"attributes": ml_attrs,
}
)
else:
ml_apm_events.append(
{
"time_unix_nano": int(unix_nano_timestamp),
"attributes": ml_attrs,
}
)

resource_logs = []
if ml_inference_events:
inference_resource = create_resource(attach_apm_entity=False)
resource_logs.append(
ResourceLogs(resource=inference_resource, scope_logs=[ScopeLogs(log_records=ml_inference_events)])
)
if ml_apm_events:
apm_resource = create_resource()
resource_logs.append(ResourceLogs(resource=apm_resource, scope_logs=[ScopeLogs(log_records=ml_apm_events)]))

return LogsData(resource_logs=[ResourceLogs(resource=resource, scope_logs=[ScopeLogs(log_records=ml_events)])])
return LogsData(resource_logs=resource_logs)
133 changes: 129 additions & 4 deletions tests/agent_features/test_ml_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,94 @@ def core_app(collector_agent_registration):


@validate_ml_event_payload(
[{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}]
{
"apm": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "MyCustomEvent",
}
]
}
)
@reset_core_stats_engine()
def test_ml_event_payload_inside_transaction(core_app):
def test_ml_event_payload_noninference_event_inside_transaction(core_app):
@background_task(name="test_ml_event_payload_inside_transaction")
def _test():
record_ml_event("MyCustomEvent", {"foo": "bar"})

_test()
core_app.harvest()


@validate_ml_event_payload(
{
"inference": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "InferenceEvent",
}
]
}
)
@reset_core_stats_engine()
def test_ml_event_payload_inference_event_inside_transaction(core_app):
@background_task(name="test_ml_event_payload_inside_transaction")
def _test():
record_ml_event("InferenceEvent", {"foo": "bar"})

_test()
core_app.harvest()


@validate_ml_event_payload(
{
"apm": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "MyCustomEvent",
}
],
"inference": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "InferenceEvent",
}
],
}
)
@reset_core_stats_engine()
def test_ml_event_payload_both_events_inside_transaction(core_app):
@background_task(name="test_ml_event_payload_inside_transaction")
def _test():
record_ml_event("InferenceEvent", {"foo": "bar"})
record_ml_event("MyCustomEvent", {"foo": "bar"})

_test()
core_app.harvest()


@validate_ml_event_payload(
[{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}]
{
"inference": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "InferenceEvent",
}
]
}
)
@reset_core_stats_engine()
def test_ml_event_payload_outside_transaction(core_app):
def test_ml_event_payload_inference_event_outside_transaction(core_app):
def _test():
app = application()
record_ml_event("InferenceEvent", {"foo": "bar"}, application=app)
Expand All @@ -83,6 +154,59 @@ def _test():
core_app.harvest()


@validate_ml_event_payload(
{
"apm": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "MyCustomEvent",
}
],
"inference": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "InferenceEvent",
}
],
}
)
@reset_core_stats_engine()
def test_ml_event_payload_both_events_outside_transaction(core_app):
def _test():
app = application()
record_ml_event("InferenceEvent", {"foo": "bar"}, application=app)
record_ml_event("MyCustomEvent", {"foo": "bar"}, application=app)

_test()
core_app.harvest()


@validate_ml_event_payload(
{
"apm": [
{
"foo": "bar",
"real_agent_id": "1234567",
"event.domain": "newrelic.ml_events",
"event.name": "MyCustomEvent",
}
]
}
)
@reset_core_stats_engine()
def test_ml_event_payload_noninference_event_outside_transaction(core_app):
def _test():
app = application()
record_ml_event("MyCustomEvent", {"foo": "bar"}, application=app)

_test()
core_app.harvest()


@pytest.mark.parametrize(
"params,expected",
[
Expand Down Expand Up @@ -151,6 +275,7 @@ def test_record_ml_event_outside_transaction_params_not_a_dict():

# Tests for ML Events configuration settings


@override_application_settings({"ml_insights_events.enabled": False})
@reset_core_stats_engine()
@validate_ml_event_count(count=0)
Expand Down
82 changes: 55 additions & 27 deletions tests/testing_support/validators/validate_ml_event_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,36 @@ def payload_to_ml_events(payload):
else:
message = payload

resource_logs = message.get("resource_logs")
assert len(resource_logs) == 1
resource_logs = resource_logs[0]
resource = resource_logs.get("resource")
assert resource and resource.get("attributes")[0] == {
"key": "instrumentation.provider",
"value": {"string_value": "newrelic-opentelemetry-python-ml"},
}
scope_logs = resource_logs.get("scope_logs")
assert len(scope_logs) == 1
scope_logs = scope_logs[0]

scope = scope_logs.get("scope")
assert scope is None
logs = scope_logs.get("log_records")

return logs
inference_logs = []
apm_logs = []
resource_log_records = message.get("resource_logs")
for resource_logs in resource_log_records:
resource = resource_logs.get("resource")
assert resource
resource_attrs = resource.get("attributes")
assert {
"key": "instrumentation.provider",
"value": {"string_value": "newrelic-opentelemetry-python-ml"},
} in resource_attrs
scope_logs = resource_logs.get("scope_logs")
assert len(scope_logs) == 1
scope_logs = scope_logs[0]

scope = scope_logs.get("scope")
assert scope is None
logs = scope_logs.get("log_records")
event_name = get_event_name(logs)
if event_name == "InferenceEvent":
inference_logs = logs
else:
# Make sure apm entity attrs are present on the resource.
expected_apm_keys = ("entity.type", "entity.name", "entity.guid", "hostname", "instrumentation.provider")
assert all(attr["key"] in expected_apm_keys for attr in resource_attrs)
assert all(attr["value"] not in ("", None) for attr in resource_attrs)

apm_logs = logs

return inference_logs, apm_logs


def validate_ml_event_payload(ml_events=None):
Expand Down Expand Up @@ -86,19 +99,34 @@ def _bind_params(method, payload=(), *args, **kwargs):
assert recorded_ml_events

decoded_payloads = [payload_to_ml_events(payload) for payload in recorded_ml_events]
all_logs = []
for sent_logs in decoded_payloads:
for data_point in sent_logs:
for key in ("time_unix_nano",):
assert key in data_point, "Invalid log format. Missing key: %s" % key
decoded_inference_payloads = [payload[0] for payload in decoded_payloads]
decoded_apm_payloads = [payload[1] for payload in decoded_payloads]
all_apm_logs = normalize_logs(decoded_apm_payloads)
all_inference_logs = normalize_logs(decoded_inference_payloads)

for expected_event in ml_events.get("inference", []):
assert expected_event in all_inference_logs, "%s Not Found. Got: %s" % (expected_event, all_inference_logs)

for expected_event in ml_events.get("apm", []):
assert expected_event in all_apm_logs, "%s Not Found. Got: %s" % (expected_event, all_apm_logs)
return val

return _validate_wrapper


def normalize_logs(decoded_payloads):
all_logs = []
for sent_logs in decoded_payloads:
for data_point in sent_logs:
for key in ("time_unix_nano",):
assert key in data_point, "Invalid log format. Missing key: %s" % key
all_logs.append(
{attr["key"]: attribute_to_value(attr["value"]) for attr in (data_point.get("attributes") or [])}
)
return all_logs

for expected_event in ml_events:
assert expected_event in all_logs, "%s Not Found. Got: %s" % (expected_event, all_logs)

return val

return _validate_wrapper
def get_event_name(logs):
for attr in logs[0]["attributes"]:
if attr["key"] == "event.name":
return attr["value"]["string_value"]