def create_resource(attributes=None, attach_apm_entity=True):
    """Build the OTLP ``Resource`` describing the entity that emitted the data.

    :param attributes: Optional mapping of resource attributes. When falsy,
        a single ``instrumentation.provider`` attribute is used instead.
    :param attach_apm_entity: When true (the default), the mapping returned
        by ``get_service_linking_metadata()`` is merged into the attributes.
    :return: A ``Resource`` whose attributes are produced by
        ``create_key_values_from_iterable``.
    """
    attributes = attributes or {"instrumentation.provider": "newrelic-opentelemetry-python-ml"}
    if attach_apm_entity:
        # Merge into a copy so a caller-supplied mapping is never mutated.
        attributes = dict(attributes)
        attributes.update(get_service_linking_metadata())
    return Resource(attributes=create_key_values_from_iterable(attributes))


def encode_ml_event_data(custom_event_data, agent_run_id):
    """Encode recorded ML events as an OTLP ``LogsData`` payload.

    Events are split into two groups, each emitted under its own resource:
    events whose type is ``"InferenceEvent"`` use a resource without the
    service linking metadata, all other events use the default resource.
    A group that has no events produces no ``ResourceLogs`` entry, so an
    empty input yields ``LogsData(resource_logs=[])``.

    :param custom_event_data: Iterable of ``(event_info, event_attrs)``
        pairs. ``event_info`` must provide ``"type"`` and ``"timestamp"``;
        ``event_attrs`` is the mapping of user supplied attributes.
    :param agent_run_id: Value stored on every record as ``real_agent_id``.
    :return: ``LogsData`` with the inference group (if any) first, followed
        by the remaining events (if any).
    """
    # An InferenceEvent is attached to a separate ML Model entity instead
    # of the APM entity.
    ml_inference_events = []
    ml_apm_events = []

    for event_info, event_attrs in custom_event_data:
        event_type = event_info["type"]

        # Decorate a copy so the recorded event is left untouched.
        record_attrs = dict(event_attrs)
        record_attrs.update(
            {
                "real_agent_id": agent_run_id,
                "event.domain": "newrelic.ml_events",
                "event.name": event_type,
            }
        )

        timestamp = event_info["timestamp"]
        if isinstance(timestamp, int):
            # Epoch nanoseconds exceed 2**53, so a float multiplication
            # would round the result; integer arithmetic stays exact.
            unix_nano_timestamp = timestamp * 1000000
        else:
            unix_nano_timestamp = int(timestamp * 1e6)

        log_record = {
            "time_unix_nano": unix_nano_timestamp,
            "attributes": create_key_values_from_iterable(record_attrs),
        }
        if event_type == "InferenceEvent":
            ml_inference_events.append(log_record)
        else:
            ml_apm_events.append(log_record)

    resource_logs = []
    if ml_inference_events:
        inference_resource = create_resource(attach_apm_entity=False)
        resource_logs.append(
            ResourceLogs(resource=inference_resource, scope_logs=[ScopeLogs(log_records=ml_inference_events)])
        )
    if ml_apm_events:
        apm_resource = create_resource()
        resource_logs.append(ResourceLogs(resource=apm_resource, scope_logs=[ScopeLogs(log_records=ml_apm_events)]))

    return LogsData(resource_logs=resource_logs)
test_ml_event_payload_inside_transaction(core_app): +def test_ml_event_payload_noninference_event_inside_transaction(core_app): + @background_task(name="test_ml_event_payload_inside_transaction") + def _test(): + record_ml_event("MyCustomEvent", {"foo": "bar"}) + + _test() + core_app.harvest() + + +@validate_ml_event_payload( + { + "inference": [ + { + "foo": "bar", + "real_agent_id": "1234567", + "event.domain": "newrelic.ml_events", + "event.name": "InferenceEvent", + } + ] + } +) +@reset_core_stats_engine() +def test_ml_event_payload_inference_event_inside_transaction(core_app): + @background_task(name="test_ml_event_payload_inside_transaction") + def _test(): + record_ml_event("InferenceEvent", {"foo": "bar"}) + + _test() + core_app.harvest() + + +@validate_ml_event_payload( + { + "apm": [ + { + "foo": "bar", + "real_agent_id": "1234567", + "event.domain": "newrelic.ml_events", + "event.name": "MyCustomEvent", + } + ], + "inference": [ + { + "foo": "bar", + "real_agent_id": "1234567", + "event.domain": "newrelic.ml_events", + "event.name": "InferenceEvent", + } + ], + } +) +@reset_core_stats_engine() +def test_ml_event_payload_both_events_inside_transaction(core_app): @background_task(name="test_ml_event_payload_inside_transaction") def _test(): record_ml_event("InferenceEvent", {"foo": "bar"}) + record_ml_event("MyCustomEvent", {"foo": "bar"}) _test() core_app.harvest() @validate_ml_event_payload( - [{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}] + { + "inference": [ + { + "foo": "bar", + "real_agent_id": "1234567", + "event.domain": "newrelic.ml_events", + "event.name": "InferenceEvent", + } + ] + } ) @reset_core_stats_engine() -def test_ml_event_payload_outside_transaction(core_app): +def test_ml_event_payload_inference_event_outside_transaction(core_app): def _test(): app = application() record_ml_event("InferenceEvent", {"foo": "bar"}, application=app) @@ -83,6 +154,59 @@ def _test(): 
core_app.harvest() +@validate_ml_event_payload( + { + "apm": [ + { + "foo": "bar", + "real_agent_id": "1234567", + "event.domain": "newrelic.ml_events", + "event.name": "MyCustomEvent", + } + ], + "inference": [ + { + "foo": "bar", + "real_agent_id": "1234567", + "event.domain": "newrelic.ml_events", + "event.name": "InferenceEvent", + } + ], + } +) +@reset_core_stats_engine() +def test_ml_event_payload_both_events_outside_transaction(core_app): + def _test(): + app = application() + record_ml_event("InferenceEvent", {"foo": "bar"}, application=app) + record_ml_event("MyCustomEvent", {"foo": "bar"}, application=app) + + _test() + core_app.harvest() + + +@validate_ml_event_payload( + { + "apm": [ + { + "foo": "bar", + "real_agent_id": "1234567", + "event.domain": "newrelic.ml_events", + "event.name": "MyCustomEvent", + } + ] + } +) +@reset_core_stats_engine() +def test_ml_event_payload_noninference_event_outside_transaction(core_app): + def _test(): + app = application() + record_ml_event("MyCustomEvent", {"foo": "bar"}, application=app) + + _test() + core_app.harvest() + + @pytest.mark.parametrize( "params,expected", [ @@ -151,6 +275,7 @@ def test_record_ml_event_outside_transaction_params_not_a_dict(): # Tests for ML Events configuration settings + @override_application_settings({"ml_insights_events.enabled": False}) @reset_core_stats_engine() @validate_ml_event_count(count=0) diff --git a/tests/testing_support/validators/validate_ml_event_payload.py b/tests/testing_support/validators/validate_ml_event_payload.py index 4d43cbb22e..9933b85f6d 100644 --- a/tests/testing_support/validators/validate_ml_event_payload.py +++ b/tests/testing_support/validators/validate_ml_event_payload.py @@ -41,23 +41,36 @@ def payload_to_ml_events(payload): else: message = payload - resource_logs = message.get("resource_logs") - assert len(resource_logs) == 1 - resource_logs = resource_logs[0] - resource = resource_logs.get("resource") - assert resource and 
resource.get("attributes")[0] == { - "key": "instrumentation.provider", - "value": {"string_value": "newrelic-opentelemetry-python-ml"}, - } - scope_logs = resource_logs.get("scope_logs") - assert len(scope_logs) == 1 - scope_logs = scope_logs[0] - - scope = scope_logs.get("scope") - assert scope is None - logs = scope_logs.get("log_records") - - return logs + inference_logs = [] + apm_logs = [] + resource_log_records = message.get("resource_logs") + for resource_logs in resource_log_records: + resource = resource_logs.get("resource") + assert resource + resource_attrs = resource.get("attributes") + assert { + "key": "instrumentation.provider", + "value": {"string_value": "newrelic-opentelemetry-python-ml"}, + } in resource_attrs + scope_logs = resource_logs.get("scope_logs") + assert len(scope_logs) == 1 + scope_logs = scope_logs[0] + + scope = scope_logs.get("scope") + assert scope is None + logs = scope_logs.get("log_records") + event_name = get_event_name(logs) + if event_name == "InferenceEvent": + inference_logs = logs + else: + # Make sure apm entity attrs are present on the resource. + expected_apm_keys = ("entity.type", "entity.name", "entity.guid", "hostname", "instrumentation.provider") + assert all(attr["key"] in expected_apm_keys for attr in resource_attrs) + assert all(attr["value"] not in ("", None) for attr in resource_attrs) + + apm_logs = logs + + return inference_logs, apm_logs def validate_ml_event_payload(ml_events=None): @@ -86,19 +99,34 @@ def _bind_params(method, payload=(), *args, **kwargs): assert recorded_ml_events decoded_payloads = [payload_to_ml_events(payload) for payload in recorded_ml_events] - all_logs = [] - for sent_logs in decoded_payloads: - for data_point in sent_logs: - for key in ("time_unix_nano",): - assert key in data_point, "Invalid log format. 
def normalize_logs(decoded_payloads):
    """Flatten decoded payloads into a list of plain attribute dicts.

    :param decoded_payloads: Iterable of log record lists. Every record
        must contain the ``time_unix_nano`` key.
    :return: One ``{key: value}`` dict per log record, in input order, with
        each attribute value converted by ``attribute_to_value``. A record
        with missing or empty ``attributes`` yields an empty dict.
    """
    required_keys = ("time_unix_nano",)
    all_logs = []
    for sent_logs in decoded_payloads:
        for data_point in sent_logs:
            for key in required_keys:
                assert key in data_point, "Invalid log format. Missing key: %s" % key

            attributes = data_point.get("attributes") or []
            all_logs.append({attr["key"]: attribute_to_value(attr["value"]) for attr in attributes})
    return all_logs


def get_event_name(logs):
    """Return the ``event.name`` string value of the first record in *logs*.

    Only the first record is inspected. ``None`` is returned when *logs* is
    empty, when the first record has no attributes, or when none of its
    attributes has the key ``event.name``.
    """
    if not logs:
        return None
    # Tolerate a missing/empty attribute list the same way normalize_logs does.
    for attr in logs[0].get("attributes") or []:
        if attr["key"] == "event.name":
            return attr["value"]["string_value"]
    return None