From d526b14708d94d402888e8f587e0027f05768c33 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 16 Apr 2025 07:16:26 -0400 Subject: [PATCH 01/18] fix: stringify otel attributes of array types --- src/otel/metrics.rs | 34 +++++++------ src/otel/otel_utils.rs | 105 ++++++++++++++++++++++++++++------------- 2 files changed, 91 insertions(+), 48 deletions(-) diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index f35a7f88a..686b95423 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -193,25 +193,31 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { Value::Number(data_point.count.into()), ); insert_number_if_some(&mut data_point_json, "data_point_sum", &data_point.sum); + let data_point_bucket_counts = Value::Array( + data_point + .bucket_counts + .iter() + .map(|&count| Value::Number(count.into())) + .collect(), + ); + let data_point_bucket_counts_string = + serde_json::to_string(&data_point_bucket_counts).unwrap(); data_point_json.insert( "data_point_bucket_counts".to_string(), - Value::Array( - data_point - .bucket_counts - .iter() - .map(|&count| Value::Number(count.into())) - .collect(), - ), + Value::String(data_point_bucket_counts_string), ); + let data_point_explicit_bounds = Value::Array( + data_point + .explicit_bounds + .iter() + .map(|bound| Value::Number(serde_json::Number::from_f64(*bound).unwrap())) + .collect(), + ); + let data_point_explicit_bounds_string = + serde_json::to_string(&data_point_explicit_bounds).unwrap(); data_point_json.insert( "data_point_explicit_bounds".to_string(), - Value::Array( - data_point - .explicit_bounds - .iter() - .map(|bound| Value::String(bound.to_string())) - .collect(), - ), + Value::String(data_point_explicit_bounds_string), ); let exemplar_json = flatten_exemplar(&data_point.exemplars); for (key, value) in exemplar_json { diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 4eb1fa2a6..99e27f303 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -17,7 +17,9 @@ */ use chrono::DateTime; -use opentelemetry_proto::tonic::common::v1::{any_value::Value as OtelValue, AnyValue, KeyValue}; +use opentelemetry_proto::tonic::common::v1::{ + any_value::Value as OtelValue, AnyValue, ArrayValue, KeyValue, KeyValueList, +}; use serde_json::{Map, Value}; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte @@ -39,40 +41,17 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { - let values = &array_val.values; - for value in values { - let array_value_json = collect_json_from_anyvalue(key, value.clone()); - for key in array_value_json.keys() { - value_json.insert( - format!( - "{}_{}", - key.to_owned(), - value_to_string(array_value_json[key].to_owned()) - ), - array_value_json[key].to_owned(), - ); - } - } + let json_array_value = collect_json_from_array_value(array_val); + // Convert the array to a JSON string + let json_array_string = serde_json::to_string(&json_array_value).unwrap(); + // Insert the array into the result map + value_json.insert(key.to_string(), Value::String(json_array_string)); } OtelValue::KvlistValue(kv_list_val) => { - for key_value in kv_list_val.values { - let value = key_value.value; - if value.is_some() { - let value = value.unwrap(); - let key_value_json = collect_json_from_anyvalue(key, value.clone()); - - for key in key_value_json.keys() { - value_json.insert( - format!( - "{}_{}_{}", - key.to_owned(), - key_value.key, - value_to_string(key_value_json[key].to_owned()) - ), - key_value_json[key].to_owned(), - ); - } - } + // Create a JSON object to store the key-value list + let kv_object = collect_json_from_key_value_list(kv_list_val); + for (key, value) in kv_object.iter() { + value_json.insert(key.clone(), value.clone()); } } OtelValue::BytesValue(bytes_val) => { @@ -86,6 +65,52 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map Value { + let mut json_array = Vec::new(); + for value in array_value.values { + if let Some(val) = &value.value { + match val { + OtelValue::StringValue(s) => json_array.push(Value::String(s.clone())), + OtelValue::BoolValue(b) => json_array.push(Value::Bool(*b)), + OtelValue::IntValue(i) => { + json_array.push(Value::Number(serde_json::Number::from(*i))) + } + OtelValue::DoubleValue(d) => { + if let Some(n) = serde_json::Number::from_f64(*d) { + json_array.push(Value::Number(n)); + } + } + OtelValue::BytesValue(b) => { + json_array.push(Value::String(String::from_utf8_lossy(b).to_string())); + } + OtelValue::ArrayValue(arr) => { + // Recursively collect JSON from nested array values + let nested_json = collect_json_from_array_value(arr.clone()); + json_array.push(nested_json); + } + OtelValue::KvlistValue(kv_list) => { + // Recursively collect JSON from nested key-value lists + let nested_json = collect_json_from_key_value_list(kv_list.clone()); + json_array.push(Value::Object(nested_json)); + } + } + } + } + Value::Array(json_array) +} + +fn collect_json_from_key_value_list(key_value_list: KeyValueList) -> Map { + let mut kv_list_json: Map = Map::new(); + for key_value in key_value_list.values { + if let Some(val) = key_value.value { + let val = val.value.unwrap(); + let json_value = collect_json_from_value(&key_value.key, val); + kv_list_json.extend(json_value); + } + } + kv_list_json +} + pub fn collect_json_from_anyvalue(key: &String, value: AnyValue) -> Map { collect_json_from_value(key, value.value.unwrap()) } @@ -142,11 +167,23 @@ pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Opt } } -pub fn insert_attributes(map: &mut Map, attributes: &Vec) { +pub fn insert_attributes( + map: &mut Map, + attributes: &Vec, +) -> Map { let attributes_json = flatten_attributes(attributes); for (key, value) in attributes_json { map.insert(key, value); } + + let attributes_map = map.clone(); + if attributes_map.contains_key("process.command_args") { + println!( + "attributes value in attributes_map: {:?}", + attributes_map["process.command_args"] + ); + } + attributes_map } pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { From f81c59eb2c6d75c097ef9dd063789e6dbaa1f2d1 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 16 Apr 2025 07:33:34 -0400 Subject: [PATCH 02/18] refactor --- src/handlers/http/cluster/mod.rs | 12 +++++------- src/otel/otel_utils.rs | 14 +------------- 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 038a1654b..c1cd46b6c 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -736,13 +736,11 @@ pub async fn get_node_info( ) .await? .iter() - .filter_map(|x| { - match serde_json::from_slice::(x) { - Ok(val) => Some(val), - Err(e) => { - error!("Failed to parse node metadata: {:?}", e); - None - } + .filter_map(|x| match serde_json::from_slice::(x) { + Ok(val) => Some(val), + Err(e) => { + error!("Failed to parse node metadata: {:?}", e); + None } }) .collect(); diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 99e27f303..0c678ff7f 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -167,23 +167,11 @@ pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Opt } } -pub fn insert_attributes( - map: &mut Map, - attributes: &Vec, -) -> Map { +pub fn insert_attributes(map: &mut Map, attributes: &Vec) { let attributes_json = flatten_attributes(attributes); for (key, value) in attributes_json { map.insert(key, value); } - - let attributes_map = map.clone(); - if attributes_map.contains_key("process.command_args") { - println!( - "attributes value in attributes_map: {:?}", - attributes_map["process.command_args"] - ); - } - attributes_map } pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { From fefcd34422fbb3048b1366e003b7db89bdc5cab6 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 17 Apr 2025 02:37:42 -0400 Subject: [PATCH 03/18] revert converting array to string for otel metrics --- src/otel/metrics.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 686b95423..519fb7279 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -200,11 +200,9 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { .map(|&count| Value::Number(count.into())) .collect(), ); - let data_point_bucket_counts_string = - serde_json::to_string(&data_point_bucket_counts).unwrap(); data_point_json.insert( "data_point_bucket_counts".to_string(), - Value::String(data_point_bucket_counts_string), + data_point_bucket_counts, ); let data_point_explicit_bounds = Value::Array( data_point @@ -213,11 +211,9 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { .map(|bound| Value::Number(serde_json::Number::from_f64(*bound).unwrap())) .collect(), ); - let data_point_explicit_bounds_string = - serde_json::to_string(&data_point_explicit_bounds).unwrap(); data_point_json.insert( "data_point_explicit_bounds".to_string(), - Value::String(data_point_explicit_bounds_string), + data_point_explicit_bounds, ); let exemplar_json = flatten_exemplar(&data_point.exemplars); for (key, value) in exemplar_json { From fa3dee32d636b6fb377d69a33dc71afd4d88d2b0 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 17 Apr 2025 08:19:38 -0400 Subject: [PATCH 04/18] add metric type --- src/otel/metrics.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 519fb7279..c21055e4d 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -82,6 +82,7 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> Map { /// and returns a `Vec` of `Map` of the flattened json /// this function is reused in all json objects that have number data points fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { + println!("data points: {:?}", data_points); data_points .iter() .map(|data_point| { @@ -392,21 +393,26 @@ fn flatten_summary(summary: &Summary) -> Vec> { pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { let mut data_points_json = Vec::new(); let mut metric_json = Map::new(); - + let mut metric_type = String::default(); match &metrics_record.data { Some(metric::Data::Gauge(gauge)) => { + metric_type = "gauge".to_string(); data_points_json.extend(flatten_gauge(gauge)); } Some(metric::Data::Sum(sum)) => { + metric_type = "sum".to_string(); data_points_json.extend(flatten_sum(sum)); } Some(metric::Data::Histogram(histogram)) => { + metric_type = "histogram".to_string(); data_points_json.extend(flatten_histogram(histogram)); } Some(metric::Data::ExponentialHistogram(exp_histogram)) => { + metric_type = "exponential_histogram".to_string(); data_points_json.extend(flatten_exp_histogram(exp_histogram)); } Some(metric::Data::Summary(summary)) => { + metric_type = "summary".to_string(); data_points_json.extend(flatten_summary(summary)); } None => {} @@ -423,6 +429,7 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec "metric_unit".to_string(), Value::String(metrics_record.unit.clone()), ); + metric_json.insert("metric_type".to_string(), Value::String(metric_type)); insert_attributes(&mut metric_json, &metrics_record.metadata); for data_point_json in &mut data_points_json { for (key, value) in &metric_json { From ff80dd8b5828f71cd30066b1250ac7e37422651d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 17 Apr 2025 08:22:36 -0400 Subject: [PATCH 05/18] remove print --- src/otel/metrics.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index c21055e4d..82939d28f 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -82,7 +82,6 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> Map { /// and returns a `Vec` of `Map` of the flattened json /// this function is reused in all json objects that have number data points fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { - println!("data points: {:?}", data_points); data_points .iter() .map(|data_point| { From dcce86f8fec00c423308dd421568b8ad33f30429 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 19 Apr 2025 05:17:31 -0400 Subject: [PATCH 06/18] combine attributes to one field --- src/otel/logs.rs | 39 +++++++++++--- src/otel/metrics.rs | 115 +++++++++++++++++++++++++++++++---------- src/otel/otel_utils.rs | 24 +++++++-- src/otel/traces.rs | 57 ++++++++++++++------ 4 files changed, 181 insertions(+), 54 deletions(-) diff --git a/src/otel/logs.rs b/src/otel/logs.rs index cc7fe327f..b43bc859f 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -35,6 +35,7 @@ pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [ "span_id", "trace_id", ]; + /// otel log event has severity number /// there is a mapping of severity number to severity text provided in proto /// this function fetches the severity text from the severity number @@ -56,7 +57,10 @@ fn flatten_severity(severity_number: i32) -> Map { /// this function flattens the `LogRecord` object /// and returns a `Map` of the flattened json /// this function is called recursively for each log record object in the otel logs -pub fn flatten_log_record(log_record: &LogRecord) -> Map { +pub fn flatten_log_record( + log_record: &LogRecord, + other_attributes: &mut Map, +) -> Map { let mut log_record_json: Map = Map::new(); log_record_json.insert( "time_unix_nano".to_string(), @@ -80,7 +84,11 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { log_record_json.insert(key.to_owned(), body_json[key].to_owned()); } } - insert_attributes(&mut log_record_json, &log_record.attributes); + insert_attributes( + &mut log_record_json, + &log_record.attributes, + other_attributes, + ); log_record_json.insert( "log_record_dropped_attributes_count".to_string(), Value::Number(log_record.dropped_attributes_count.into()), @@ -104,7 +112,10 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { /// this function flattens the `ScopeLogs` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { +fn flatten_scope_log( + scope_log: &ScopeLogs, + other_attributes: &mut Map, +) -> Vec> { let mut vec_scope_log_json = Vec::new(); let mut scope_log_json = Map::new(); @@ -114,7 +125,7 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes(&mut scope_log_json, &scope.attributes); + insert_attributes(&mut scope_log_json, &scope.attributes, other_attributes); scope_log_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -126,7 +137,7 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { ); for log_record in &scope_log.log_records { - let log_record_json = flatten_log_record(log_record); + let log_record_json = flatten_log_record(log_record, other_attributes); let mut combined_json = scope_log_json.clone(); combined_json.extend(log_record_json); vec_scope_log_json.push(combined_json); @@ -139,11 +150,16 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_logs(message: &LogsData) -> Vec { let mut vec_otel_json = Vec::new(); + let mut other_attributes = Map::new(); for record in &message.resource_logs { let mut resource_log_json = Map::new(); if let Some(resource) = &record.resource { - insert_attributes(&mut resource_log_json, &resource.attributes); + insert_attributes( + &mut resource_log_json, + &resource.attributes, + &mut other_attributes, + ); resource_log_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), @@ -152,7 +168,7 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { let mut vec_resource_logs_json = Vec::new(); for scope_log in &record.scope_logs { - vec_resource_logs_json.extend(flatten_scope_log(scope_log)); + vec_resource_logs_json.extend(flatten_scope_log(scope_log, &mut other_attributes)); } resource_log_json.insert( "schema_url".to_string(), @@ -165,6 +181,13 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { vec_otel_json.extend(vec_resource_logs_json); } - + // Add common attributes as one attribute in stringified array to each log record + for log_record_json in &mut vec_otel_json { + let other_attributes = serde_json::to_string(&other_attributes).unwrap(); + log_record_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } vec_otel_json.into_iter().map(Value::Object).collect() } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 82939d28f..94aef5430 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -39,10 +39,17 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [ /// this function flatten the exemplar json array /// and returns a `Map` of the exemplar json /// this function is reused in all json objects that have exemplar -fn flatten_exemplar(exemplars: &[Exemplar]) -> Map { +fn flatten_exemplar( + exemplars: &[Exemplar], + other_attributes: &mut Map, +) -> Map { let mut exemplar_json = Map::new(); for exemplar in exemplars { - insert_attributes(&mut exemplar_json, &exemplar.filtered_attributes); + insert_attributes( + &mut exemplar_json, + &exemplar.filtered_attributes, + other_attributes, + ); exemplar_json.insert( "exemplar_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -81,12 +88,19 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> Map { /// this function flatten the number data points json array /// and returns a `Vec` of `Map` of the flattened json /// this function is reused in all json objects that have number data points -fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { +fn flatten_number_data_points( + data_points: &[NumberDataPoint], + other_attributes: &mut Map, +) -> Vec> { data_points .iter() .map(|data_point| { let mut data_point_json = Map::new(); - insert_attributes(&mut data_point_json, &data_point.attributes); + insert_attributes( + &mut data_point_json, + &data_point.attributes, + other_attributes, + ); data_point_json.insert( "start_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -99,7 +113,7 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec Vec> { +fn flatten_gauge( + gauge: &Gauge, + other_attributes: &mut Map, +) -> Vec> { let mut vec_gauge_json = Vec::new(); - let data_points_json = flatten_number_data_points(&gauge.data_points); + let data_points_json = flatten_number_data_points(&gauge.data_points, other_attributes); for data_point_json in data_points_json { let mut gauge_json = Map::new(); for (key, value) in &data_point_json { @@ -146,9 +163,9 @@ fn flatten_gauge(gauge: &Gauge) -> Vec> { /// each sum object has json array for data points /// this function flatten the sum json object /// and returns a `Vec` of `Map` for each data point -fn flatten_sum(sum: &Sum) -> Vec> { +fn flatten_sum(sum: &Sum, other_attributes: &mut Map) -> Vec> { let mut vec_sum_json = Vec::new(); - let data_points_json = flatten_number_data_points(&sum.data_points); + let data_points_json = flatten_number_data_points(&sum.data_points, other_attributes); for data_point_json in data_points_json { let mut sum_json = Map::new(); for (key, value) in &data_point_json { @@ -171,11 +188,18 @@ fn flatten_sum(sum: &Sum) -> Vec> { /// each histogram object has json array for data points /// this function flatten the histogram json object /// and returns a `Vec` of `Map` for each data point -fn flatten_histogram(histogram: &Histogram) -> Vec> { +fn flatten_histogram( + histogram: &Histogram, + other_attributes: &mut Map, +) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &histogram.data_points { let mut data_point_json = Map::new(); - insert_attributes(&mut data_point_json, &data_point.attributes); + insert_attributes( + &mut data_point_json, + &data_point.attributes, + other_attributes, + ); data_point_json.insert( "start_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -215,7 +239,7 @@ fn flatten_histogram(histogram: &Histogram) -> Vec> { "data_point_explicit_bounds".to_string(), data_point_explicit_bounds, ); - let exemplar_json = flatten_exemplar(&data_point.exemplars); + let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); for (key, value) in exemplar_json { data_point_json.insert(key.to_string(), value); } @@ -259,11 +283,18 @@ fn flatten_buckets(bucket: &Buckets) -> Map { /// each exponential histogram object has json array for data points /// this function flatten the exponential histogram json object /// and returns a `Vec` of `Map` for each data point -fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { +fn flatten_exp_histogram( + exp_histogram: &ExponentialHistogram, + other_attributes: &mut Map, +) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &exp_histogram.data_points { let mut data_point_json = Map::new(); - insert_attributes(&mut data_point_json, &data_point.attributes); + insert_attributes( + &mut data_point_json, + &data_point.attributes, + other_attributes, + ); data_point_json.insert( "start_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -301,7 +332,7 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec Vec Vec> { +fn flatten_summary( + summary: &Summary, + other_attributes: &mut Map, +) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &summary.data_points { let mut data_point_json = Map::new(); - insert_attributes(&mut data_point_json, &data_point.attributes); + insert_attributes( + &mut data_point_json, + &data_point.attributes, + other_attributes, + ); data_point_json.insert( "start_time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -389,30 +427,33 @@ fn flatten_summary(summary: &Summary) -> Vec> { /// this function flatten the metric json object /// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each metric record object in the otel metrics event -pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { +pub fn flatten_metrics_record( + metrics_record: &Metric, + other_attributes: &mut Map, +) -> Vec> { let mut data_points_json = Vec::new(); let mut metric_json = Map::new(); let mut metric_type = String::default(); match &metrics_record.data { Some(metric::Data::Gauge(gauge)) => { metric_type = "gauge".to_string(); - data_points_json.extend(flatten_gauge(gauge)); + data_points_json.extend(flatten_gauge(gauge, other_attributes)); } Some(metric::Data::Sum(sum)) => { metric_type = "sum".to_string(); - data_points_json.extend(flatten_sum(sum)); + data_points_json.extend(flatten_sum(sum, other_attributes)); } Some(metric::Data::Histogram(histogram)) => { metric_type = "histogram".to_string(); - data_points_json.extend(flatten_histogram(histogram)); + data_points_json.extend(flatten_histogram(histogram, other_attributes)); } Some(metric::Data::ExponentialHistogram(exp_histogram)) => { metric_type = "exponential_histogram".to_string(); - data_points_json.extend(flatten_exp_histogram(exp_histogram)); + data_points_json.extend(flatten_exp_histogram(exp_histogram, other_attributes)); } Some(metric::Data::Summary(summary)) => { metric_type = "summary".to_string(); - data_points_json.extend(flatten_summary(summary)); + data_points_json.extend(flatten_summary(summary, other_attributes)); } None => {} } @@ -429,7 +470,7 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec Value::String(metrics_record.unit.clone()), ); metric_json.insert("metric_type".to_string(), Value::String(metric_type)); - insert_attributes(&mut metric_json, &metrics_record.metadata); + insert_attributes(&mut metric_json, &metrics_record.metadata, other_attributes); for data_point_json in &mut data_points_json { for (key, value) in &metric_json { data_point_json.insert(key.clone(), value.clone()); @@ -445,10 +486,15 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_metrics(message: MetricsData) -> Vec { let mut vec_otel_json = Vec::new(); + let mut other_attributes = Map::new(); for record in &message.resource_metrics { let mut resource_metrics_json = Map::new(); if let Some(resource) = &record.resource { - insert_attributes(&mut resource_metrics_json, &resource.attributes); + insert_attributes( + &mut resource_metrics_json, + &resource.attributes, + &mut other_attributes, + ); resource_metrics_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), @@ -458,7 +504,10 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { for scope_metric in &record.scope_metrics { let mut scope_metrics_json = Map::new(); for metrics_record in &scope_metric.metrics { - vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); + vec_scope_metrics_json.extend(flatten_metrics_record( + metrics_record, + &mut other_attributes, + )); } if let Some(scope) = &scope_metric.scope { scope_metrics_json @@ -467,7 +516,11 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes(&mut scope_metrics_json, &scope.attributes); + insert_attributes( + &mut scope_metrics_json, + &scope.attributes, + &mut other_attributes, + ); scope_metrics_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -495,6 +548,14 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { } vec_otel_json.extend(vec_scope_metrics_json); } + // Add common attributes as one attribute in stringified array to each log record + for log_record_json in &mut vec_otel_json { + let other_attributes_json_string = serde_json::to_string(&other_attributes).unwrap(); + log_record_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes_json_string), + ); + } vec_otel_json.into_iter().map(Value::Object).collect() } diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 0c678ff7f..f547689c5 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -22,6 +22,8 @@ use opentelemetry_proto::tonic::common::v1::{ }; use serde_json::{Map, Value}; +const KNOWN_ATTRIBUTES_PREFIX: [&str; 3] = ["http", "url", "service"]; + // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { let mut value_json: Map = Map::new(); @@ -134,14 +136,24 @@ pub fn value_to_string(value: serde_json::Value) -> String { } } -pub fn flatten_attributes(attributes: &Vec) -> Map { +pub fn flatten_attributes( + attributes: &Vec, + other_attributes_json: &mut Map, +) -> Map { let mut attributes_json: Map = Map::new(); for attribute in attributes { let key = &attribute.key; let value = &attribute.value; let value_json = collect_json_from_values(value, &key.to_string()); for key in value_json.keys() { - attributes_json.insert(key.to_owned(), value_json[key].to_owned()); + if KNOWN_ATTRIBUTES_PREFIX + .iter() + .any(|prefix| key.starts_with(prefix)) + { + attributes_json.insert(key.to_owned(), value_json[key].to_owned()); + } else { + other_attributes_json.insert(key.to_owned(), value_json[key].to_owned()); + } } } attributes_json @@ -167,8 +179,12 @@ pub fn insert_bool_if_some(map: &mut Map, key: &str, option: &Opt } } -pub fn insert_attributes(map: &mut Map, attributes: &Vec) { - let attributes_json = flatten_attributes(attributes); +pub fn insert_attributes( + map: &mut Map, + attributes: &Vec, + other_attributes_json: &mut Map, +) { + let attributes_json = flatten_attributes(attributes, other_attributes_json); for (key, value) in attributes_json { map.insert(key, value); } diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 596e788fc..ebd116eea 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -46,12 +46,15 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ ]; /// this function flattens the `ScopeSpans` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { +fn flatten_scope_span( + scope_span: &ScopeSpans, + other_attributes: &mut Map, +) -> Vec> { let mut vec_scope_span_json = Vec::new(); let mut scope_span_json = Map::new(); for span in &scope_span.spans { - let span_record_json = flatten_span_record(span); + let span_record_json = flatten_span_record(span, other_attributes); vec_scope_span_json.extend(span_record_json); } @@ -61,7 +64,7 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes(&mut scope_span_json, &scope.attributes); + insert_attributes(&mut scope_span_json, &scope.attributes, other_attributes); scope_span_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -88,12 +91,16 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_traces(message: &TracesData) -> Vec { let mut vec_otel_json = Vec::new(); - + let mut other_attributes = Map::new(); for record in &message.resource_spans { let mut resource_span_json = Map::new(); if let Some(resource) = &record.resource { - insert_attributes(&mut resource_span_json, &resource.attributes); + insert_attributes( + &mut resource_span_json, + &resource.attributes, + &mut other_attributes, + ); resource_span_json.insert( "resource_dropped_attributes_count".to_string(), Value::Number(resource.dropped_attributes_count.into()), @@ -102,7 +109,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { let mut vec_resource_spans_json = Vec::new(); for scope_span in &record.scope_spans { - let scope_span_json = flatten_scope_span(scope_span); + let scope_span_json = flatten_scope_span(scope_span, &mut other_attributes); vec_resource_spans_json.extend(scope_span_json); } @@ -119,14 +126,24 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { vec_otel_json.extend(vec_resource_spans_json); } - + // Add common attributes as one attribute in stringified array to each log record + for log_record_json in &mut vec_otel_json { + let other_attributes = serde_json::to_string(&other_attributes).unwrap(); + log_record_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } vec_otel_json.into_iter().map(Value::Object).collect() } /// otel traces has json array of events /// this function flattens the `Event` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_events(events: &[Event]) -> Vec> { +fn flatten_events( + events: &[Event], + other_attributes: &mut Map, +) -> Vec> { events .iter() .map(|event| { @@ -138,7 +155,7 @@ fn flatten_events(events: &[Event]) -> Vec> { ), ); event_json.insert("event_name".to_string(), Value::String(event.name.clone())); - insert_attributes(&mut event_json, &event.attributes); + insert_attributes(&mut event_json, &event.attributes, other_attributes); event_json.insert( "event_dropped_attributes_count".to_string(), Value::Number(event.dropped_attributes_count.into()), @@ -151,7 +168,10 @@ fn flatten_events(events: &[Event]) -> Vec> { /// otel traces has json array of links /// this function flattens the `Link` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_links(links: &[Link]) -> Vec> { +fn flatten_links( + links: &[Link], + other_attributes: &mut Map, +) -> Vec> { links .iter() .map(|link| { @@ -165,7 +185,7 @@ fn flatten_links(links: &[Link]) -> Vec> { Value::String(hex::encode(&link.trace_id)), ); - insert_attributes(&mut link_json, &link.attributes); + insert_attributes(&mut link_json, &link.attributes, other_attributes); link_json.insert( "link_dropped_attributes_count".to_string(), Value::Number(link.dropped_attributes_count.into()), @@ -252,7 +272,10 @@ fn flatten_kind(kind: i32) -> Map { /// this function flattens the `Span` object /// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each span record object in the otel traces event -fn flatten_span_record(span_record: &Span) -> Vec> { +fn flatten_span_record( + span_record: &Span, + other_attributes: &mut Map, +) -> Vec> { let mut span_records_json = Vec::new(); let mut span_record_json = Map::new(); @@ -290,17 +313,21 @@ fn flatten_span_record(span_record: &Span) -> Vec> { span_record.end_time_unix_nano as i64, )), ); - insert_attributes(&mut span_record_json, &span_record.attributes); + insert_attributes( + &mut span_record_json, + &span_record.attributes, + other_attributes, + ); span_record_json.insert( "span_dropped_attributes_count".to_string(), Value::Number(span_record.dropped_attributes_count.into()), ); - span_records_json.extend(flatten_events(&span_record.events)); + span_records_json.extend(flatten_events(&span_record.events, other_attributes)); span_record_json.insert( "span_dropped_events_count".to_string(), Value::Number(span_record.dropped_events_count.into()), ); - span_records_json.extend(flatten_links(&span_record.links)); + span_records_json.extend(flatten_links(&span_record.links, other_attributes)); span_record_json.insert( "span_dropped_links_count".to_string(), Value::Number(span_record.dropped_links_count.into()), From 2e2d5e1feb07be91c4531cdb95f82aed2d116c86 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 19 Apr 2025 08:30:55 -0400 Subject: [PATCH 07/18] update known attribute list --- src/otel/logs.rs | 11 ++++++++--- src/otel/metrics.rs | 16 +++++++++++----- src/otel/otel_utils.rs | 2 +- src/otel/traces.rs | 16 +++++++++++----- 4 files changed, 31 insertions(+), 14 deletions(-) diff --git a/src/otel/logs.rs b/src/otel/logs.rs index b43bc859f..6a25c4435 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -35,7 +35,6 @@ pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [ "span_id", "trace_id", ]; - /// otel log event has severity number /// there is a mapping of severity number to severity text provided in proto /// this function fetches the severity text from the severity number @@ -182,11 +181,17 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { vec_otel_json.extend(vec_resource_logs_json); } // Add common attributes as one attribute in stringified array to each log record + let other_attributes = match serde_json::to_string(&other_attributes) { + Ok(s) => s, + Err(e) => { + tracing::warn!("failed to serialise OTEL other_attributes: {e}"); + String::new() + } + }; for log_record_json in &mut vec_otel_json { - let other_attributes = serde_json::to_string(&other_attributes).unwrap(); log_record_json.insert( "other_attributes".to_string(), - Value::String(other_attributes), + Value::String(other_attributes.clone()), ); } vec_otel_json.into_iter().map(Value::Object).collect() diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 94aef5430..d382ff216 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -548,12 +548,18 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { } vec_otel_json.extend(vec_scope_metrics_json); } - // Add common attributes as one attribute in stringified array to each log record - for log_record_json in &mut vec_otel_json { - let other_attributes_json_string = serde_json::to_string(&other_attributes).unwrap(); - log_record_json.insert( + // Add common attributes as one attribute in stringified array to each metric record + let other_attributes = match serde_json::to_string(&other_attributes) { + Ok(s) => s, + Err(e) => { + tracing::warn!("failed to serialise OTEL other_attributes: {e}"); + String::new() + } + }; + for metric_record_json in &mut vec_otel_json { + metric_record_json.insert( "other_attributes".to_string(), - Value::String(other_attributes_json_string), + Value::String(other_attributes.clone()), ); } vec_otel_json.into_iter().map(Value::Object).collect() diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index f547689c5..3dc7f43b5 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -22,7 +22,7 @@ use opentelemetry_proto::tonic::common::v1::{ }; use serde_json::{Map, Value}; -const KNOWN_ATTRIBUTES_PREFIX: [&str; 3] = ["http", "url", "service"]; +const KNOWN_ATTRIBUTES_PREFIX: [&str; 6] = ["http", "url", "service", "os", "host", "telemetry"]; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { diff --git a/src/otel/traces.rs b/src/otel/traces.rs index ebd116eea..3d38a8186 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -126,12 +126,18 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { vec_otel_json.extend(vec_resource_spans_json); } - // Add common attributes as one attribute in stringified array to each log record - for log_record_json in &mut vec_otel_json { - let other_attributes = serde_json::to_string(&other_attributes).unwrap(); - log_record_json.insert( + // Add common attributes as one attribute in stringified array to each span record + let other_attributes = match serde_json::to_string(&other_attributes) { + Ok(s) => s, + Err(e) => { + tracing::warn!("failed to serialise OTEL other_attributes: {e}"); + String::new() + } + }; + for span_record_json in &mut vec_otel_json { + span_record_json.insert( "other_attributes".to_string(), - Value::String(other_attributes), + Value::String(other_attributes.clone()), ); } vec_otel_json.into_iter().map(Value::Object).collect() From 1f30052e36535b5bb4889230503898d3fc4a5ba0 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 19 Apr 2025 08:35:35 -0400 Subject: [PATCH 08/18] new to default --- src/otel/logs.rs | 2 +- src/otel/metrics.rs | 2 +- src/otel/traces.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/otel/logs.rs b/src/otel/logs.rs index 6a25c4435..9d758b320 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -185,7 +185,7 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { Ok(s) => s, Err(e) => { tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::new() + String::default() } }; for log_record_json in &mut vec_otel_json { diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index d382ff216..0a91a8bce 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -553,7 +553,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { Ok(s) => s, Err(e) => { tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::new() + String::default() } }; for metric_record_json in &mut vec_otel_json { diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 3d38a8186..c77ea6ad8 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -131,7 +131,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { Ok(s) => s, Err(e) => { tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::new() + String::default() } }; for span_record_json in &mut vec_otel_json { From f0fc4f21291c5fc276b7fe0db06230619416ec65 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 19 Apr 2025 09:18:47 -0400 Subject: [PATCH 09/18] refactor --- src/otel/metrics.rs | 137 +++++++++++++++++++++++++------------------- 1 file changed, 77 insertions(+), 60 deletions(-) diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 0a91a8bce..c044fb592 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -42,46 +42,51 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [ fn flatten_exemplar( exemplars: &[Exemplar], other_attributes: &mut Map, -) -> Map { - let mut exemplar_json = Map::new(); - for exemplar in exemplars { - insert_attributes( - &mut exemplar_json, - &exemplar.filtered_attributes, - other_attributes, - ); - exemplar_json.insert( - "exemplar_time_unix_nano".to_string(), - Value::String(convert_epoch_nano_to_timestamp( - exemplar.time_unix_nano as i64, - )), - ); - exemplar_json.insert( - "exemplar_span_id".to_string(), - Value::String(hex::encode(&exemplar.span_id)), - ); - exemplar_json.insert( - "exemplar_trace_id".to_string(), - Value::String(hex::encode(&exemplar.trace_id)), - ); - if let Some(value) = &exemplar.value { - match value { - ExemplarValue::AsDouble(double_val) => { - exemplar_json.insert( - "exemplar_value".to_string(), - Value::Number(serde_json::Number::from_f64(*double_val).unwrap()), - ); - } - ExemplarValue::AsInt(int_val) => { - exemplar_json.insert( - "exemplar_value".to_string(), - Value::Number(serde_json::Number::from(*int_val)), - ); +) -> Vec> { + exemplars + .iter() + .map(|exemplar| { + let mut exemplar_json = Map::new(); + insert_attributes( + &mut exemplar_json, + &exemplar.filtered_attributes, + other_attributes, + ); + exemplar_json.insert( + "exemplar_time_unix_nano".to_string(), + Value::String(convert_epoch_nano_to_timestamp( + exemplar.time_unix_nano as i64, + )), + ); + exemplar_json.insert( + "exemplar_span_id".to_string(), + Value::String(hex::encode(&exemplar.span_id)), + ); + exemplar_json.insert( + "exemplar_trace_id".to_string(), + Value::String(hex::encode(&exemplar.trace_id)), + ); + if let Some(value) = &exemplar.value { + match value { + ExemplarValue::AsDouble(double_val) => { + exemplar_json.insert( + "exemplar_value".to_string(), + serde_json::Number::from_f64(*double_val) + .map(Value::Number) + .unwrap_or(Value::Null), + ); + } + ExemplarValue::AsInt(int_val) => { + exemplar_json.insert( + "exemplar_value".to_string(), + Value::Number(serde_json::Number::from(*int_val)), + ); + } } } - } - } - exemplar_json + exemplar_json + }) + .collect() } /// otel metrics event has json array for number data points @@ -113,17 +118,20 @@ fn flatten_number_data_points( data_point.time_unix_nano as i64, )), ); - let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); - for (key, value) in exemplar_json { - data_point_json.insert(key, value); - } + data_point_json.extend( + flatten_exemplar(&data_point.exemplars, other_attributes) + .into_iter() + .flatten(), + ); data_point_json.extend(flatten_data_point_flags(data_point.flags)); if let Some(value) = &data_point.value { match value { NumberDataPointValue::AsDouble(double_val) => { data_point_json.insert( "data_point_value".to_string(), - Value::Number(serde_json::Number::from_f64(*double_val).unwrap()), + serde_json::Number::from_f64(*double_val) + .map(Value::Number) + .unwrap_or(Value::Null), ); } NumberDataPointValue::AsInt(int_val) => { @@ -232,17 +240,23 @@ fn flatten_histogram( data_point .explicit_bounds .iter() - .map(|bound| Value::Number(serde_json::Number::from_f64(*bound).unwrap())) + .map(|bound| { + serde_json::Number::from_f64(*bound) + .map(Value::Number) + .unwrap_or(Value::Null) + }) .collect(), ); data_point_json.insert( "data_point_explicit_bounds".to_string(), data_point_explicit_bounds, ); - let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); - for (key, value) in exemplar_json { - data_point_json.insert(key.to_string(), value); - } + data_point_json.extend( + flatten_exemplar(&data_point.exemplars, other_attributes) + .into_iter() + .flatten(), + ); + data_point_json.extend(flatten_data_point_flags(data_point.flags)); insert_number_if_some(&mut data_point_json, "min", &data_point.min); insert_number_if_some(&mut data_point_json, "max", &data_point.max); @@ -332,10 +346,12 @@ fn flatten_exp_histogram( data_point_json.insert(format!("negative_{}", key), value); } } - let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); - for (key, value) in exemplar_json { - data_point_json.insert(key, value); - } + data_point_json.extend( + flatten_exemplar(&data_point.exemplars, other_attributes) + .into_iter() + .flatten(), + ); + data_points_json.push(data_point_json); } let mut exp_histogram_json = Map::new(); @@ -384,7 +400,9 @@ fn flatten_summary( ); data_point_json.insert( "data_point_sum".to_string(), - Value::Number(serde_json::Number::from_f64(data_point.sum).unwrap()), + serde_json::Number::from_f64(data_point.sum) + .map(Value::Number) + .unwrap_or(Value::Null), ); data_point_json.insert( "data_point_quantile_values".to_string(), @@ -397,16 +415,15 @@ fn flatten_summary( vec![ ( "quantile", - Value::Number( - serde_json::Number::from_f64(quantile_value.quantile) - .unwrap(), - ), + serde_json::Number::from_f64(quantile_value.quantile) + .map(Value::Number) + .unwrap_or(Value::Null), ), ( "value", - Value::Number( - serde_json::Number::from_f64(quantile_value.value).unwrap(), - ), + serde_json::Number::from_f64(quantile_value.value) + .map(Value::Number) + .unwrap_or(Value::Null), ), ] .into_iter() From bf0bf15c2289bcfcdbf158d7102f28e702445125 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 19 Apr 2025 10:33:40 -0400 Subject: [PATCH 10/18] update for exemplar --- src/otel/metrics.rs | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index c044fb592..c9ffb5c1d 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -118,11 +118,14 @@ fn flatten_number_data_points( data_point.time_unix_nano as i64, )), ); - data_point_json.extend( - flatten_exemplar(&data_point.exemplars, other_attributes) - .into_iter() - .flatten(), - ); + + let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); + for exemplar in exemplar_json { + for (key, value) in exemplar { + data_point_json.insert(key, value); + } + } + data_point_json.extend(flatten_data_point_flags(data_point.flags)); if let Some(value) = &data_point.value { match value { @@ -251,11 +254,12 @@ fn flatten_histogram( "data_point_explicit_bounds".to_string(), data_point_explicit_bounds, ); - data_point_json.extend( - flatten_exemplar(&data_point.exemplars, other_attributes) - .into_iter() - .flatten(), - ); + let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); + for exemplar in exemplar_json { + for (key, value) in exemplar { + data_point_json.insert(key, value); + } + } data_point_json.extend(flatten_data_point_flags(data_point.flags)); insert_number_if_some(&mut data_point_json, "min", &data_point.min); @@ -346,11 +350,12 @@ fn flatten_exp_histogram( data_point_json.insert(format!("negative_{}", key), value); } } - data_point_json.extend( - flatten_exemplar(&data_point.exemplars, other_attributes) - .into_iter() - .flatten(), - ); + let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); + for exemplar in exemplar_json { + for (key, value) in exemplar { + data_point_json.insert(key, value); + } + } data_points_json.push(data_point_json); } From 70d8c6d39bdbb98e2ac6f447c5a7dabd9391cfc5 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Apr 2025 02:32:41 -0400 Subject: [PATCH 11/18] fixed data loss for other_attributes in logs --- src/otel/logs.rs | 115 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 86 insertions(+), 29 deletions(-) diff --git a/src/otel/logs.rs b/src/otel/logs.rs index 9d758b320..ea546eceb 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -56,11 +56,9 @@ fn flatten_severity(severity_number: i32) -> Map { /// this function flattens the `LogRecord` object /// and returns a `Map` of the flattened json /// this function is called recursively for each log record object in the otel logs -pub fn flatten_log_record( - log_record: &LogRecord, - other_attributes: &mut Map, -) -> Map { +pub fn flatten_log_record(log_record: &LogRecord) -> Map { let mut log_record_json: Map = Map::new(); + let mut other_attributes = Map::new(); log_record_json.insert( "time_unix_nano".to_string(), Value::String(convert_epoch_nano_to_timestamp( @@ -86,7 +84,7 @@ pub fn flatten_log_record( insert_attributes( &mut log_record_json, &log_record.attributes, - other_attributes, + &mut other_attributes, ); log_record_json.insert( "log_record_dropped_attributes_count".to_string(), @@ -106,25 +104,41 @@ pub fn flatten_log_record( Value::String(hex::encode(&log_record.trace_id)), ); + // Add the `other_attributes` to the log record json + if !other_attributes.is_empty() { + let other_attributes = match serde_json::to_string(&other_attributes) { + Ok(s) => s, + Err(e) => { + tracing::warn!("failed to serialise OTEL other_attributes: {e}"); + String::default() + } + }; + log_record_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } + log_record_json } /// this function flattens the `ScopeLogs` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_scope_log( - scope_log: &ScopeLogs, - other_attributes: &mut Map, -) -> Vec> { +fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { let mut vec_scope_log_json = Vec::new(); let mut scope_log_json = Map::new(); - + let mut other_attributes = Map::new(); if let Some(scope) = &scope_log.scope { scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone())); scope_log_json.insert( "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes(&mut scope_log_json, &scope.attributes, other_attributes); + insert_attributes( + &mut scope_log_json, + &scope.attributes, + &mut other_attributes, + ); scope_log_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -136,12 +150,40 @@ fn flatten_scope_log( ); for log_record in &scope_log.log_records { - let log_record_json = flatten_log_record(log_record, other_attributes); + let log_record_json = flatten_log_record(log_record); let mut combined_json = scope_log_json.clone(); combined_json.extend(log_record_json); vec_scope_log_json.push(combined_json); } + if !other_attributes.is_empty() { + let scope_other_attributes = match serde_json::to_string(&other_attributes) { + Ok(s) => s, + Err(e) => { + tracing::warn!("failed to serialise OTEL other_attributes: {e}"); + String::default() + } + }; + // append scope_other_attributes to each log record json + for scope_log_json in &mut vec_scope_log_json { + // fetch the other_attributes from the scope log json + if let Some(other_attributes) = scope_log_json.get("other_attributes") { + let other_attributes = other_attributes.as_str().unwrap_or_default(); + // append the other_attributes to the scope log json + let other_attributes = format!("{other_attributes}, {scope_other_attributes}"); + scope_log_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } else { + scope_log_json.insert( + "other_attributes".to_string(), + Value::String(scope_other_attributes.clone()), + ); + } + } + } + vec_scope_log_json } @@ -149,10 +191,10 @@ fn flatten_scope_log( /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_logs(message: &LogsData) -> Vec { let mut vec_otel_json = Vec::new(); - let mut other_attributes = Map::new(); + for record in &message.resource_logs { let mut resource_log_json = Map::new(); - + let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { insert_attributes( &mut resource_log_json, @@ -167,7 +209,7 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { let mut vec_resource_logs_json = Vec::new(); for scope_log in &record.scope_logs { - vec_resource_logs_json.extend(flatten_scope_log(scope_log, &mut other_attributes)); + vec_resource_logs_json.extend(flatten_scope_log(scope_log)); } resource_log_json.insert( "schema_url".to_string(), @@ -178,21 +220,36 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { resource_logs_json.extend(resource_log_json.clone()); } - vec_otel_json.extend(vec_resource_logs_json); - } - // Add common attributes as one attribute in stringified array to each log record - let other_attributes = match serde_json::to_string(&other_attributes) { - Ok(s) => s, - Err(e) => { - tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::default() + if !other_attributes.is_empty() { + let resource_other_attributes = match serde_json::to_string(&other_attributes) { + Ok(s) => s, + Err(e) => { + tracing::warn!("failed to serialise OTEL other_attributes: {e}"); + String::default() + } + }; + // append scope_other_attributes to each log record json + for resource_logs_json in &mut vec_resource_logs_json { + // fetch the other_attributes from the scope log json + if let Some(other_attributes) = resource_logs_json.get("other_attributes") { + let other_attributes = other_attributes.as_str().unwrap_or_default(); + // append the other_attributes to the scope log json + let other_attributes = + format!("{other_attributes}, {resource_other_attributes}"); + resource_logs_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } else { + resource_logs_json.insert( + "other_attributes".to_string(), + Value::String(resource_other_attributes.clone()), + ); + } + } } - }; - for log_record_json in &mut vec_otel_json { - log_record_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes.clone()), - ); + + vec_otel_json.extend(vec_resource_logs_json); } vec_otel_json.into_iter().map(Value::Object).collect() } From dc6f4d32bc26911c1f9f4ff394994f6ee5f11a0f Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Apr 2025 06:26:37 -0400 Subject: [PATCH 12/18] refactor and changes to logs and traces attributes handling --- src/otel/logs.rs | 69 +++----------------------- src/otel/otel_utils.rs | 48 ++++++++++++++++++ src/otel/traces.rs | 108 +++++++++++++++++++++++++---------------- 3 files changed, 122 insertions(+), 103 deletions(-) diff --git a/src/otel/logs.rs b/src/otel/logs.rs index ea546eceb..e4f095200 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -25,7 +25,9 @@ use serde_json::Value; use super::otel_utils::collect_json_from_values; use super::otel_utils::convert_epoch_nano_to_timestamp; +use super::otel_utils::fetch_attributes_string; use super::otel_utils::insert_attributes; +use super::otel_utils::merge_attributes_in_json; pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [ "time_unix_nano", @@ -106,13 +108,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { // Add the `other_attributes` to the log record json if !other_attributes.is_empty() { - let other_attributes = match serde_json::to_string(&other_attributes) { - Ok(s) => s, - Err(e) => { - tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::default() - } - }; + let other_attributes = fetch_attributes_string(&other_attributes); log_record_json.insert( "other_attributes".to_string(), Value::String(other_attributes), @@ -156,33 +152,8 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec> { vec_scope_log_json.push(combined_json); } - if !other_attributes.is_empty() { - let scope_other_attributes = match serde_json::to_string(&other_attributes) { - Ok(s) => s, - Err(e) => { - tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::default() - } - }; - // append scope_other_attributes to each log record json - for scope_log_json in &mut vec_scope_log_json { - // fetch the other_attributes from the scope log json - if let Some(other_attributes) = scope_log_json.get("other_attributes") { - let other_attributes = other_attributes.as_str().unwrap_or_default(); - // append the other_attributes to the scope log json - let other_attributes = format!("{other_attributes}, {scope_other_attributes}"); - scope_log_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes), - ); - } else { - scope_log_json.insert( - "other_attributes".to_string(), - Value::String(scope_other_attributes.clone()), - ); - } - } - } + // Add the `other_attributes` to the scope log json + merge_attributes_in_json(other_attributes, &mut vec_scope_log_json); vec_scope_log_json } @@ -220,34 +191,8 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec { resource_logs_json.extend(resource_log_json.clone()); } - if !other_attributes.is_empty() { - let resource_other_attributes = match serde_json::to_string(&other_attributes) { - Ok(s) => s, - Err(e) => { - tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::default() - } - }; - // append scope_other_attributes to each log record json - for resource_logs_json in &mut vec_resource_logs_json { - // fetch the other_attributes from the scope log json - if let Some(other_attributes) = resource_logs_json.get("other_attributes") { - let other_attributes = other_attributes.as_str().unwrap_or_default(); - // append the other_attributes to the scope log json - let other_attributes = - format!("{other_attributes}, {resource_other_attributes}"); - resource_logs_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes), - ); - } else { - resource_logs_json.insert( - "other_attributes".to_string(), - Value::String(resource_other_attributes.clone()), - ); - } - } - } + // Add the `other_attributes` to the resource log json + merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json); vec_otel_json.extend(vec_resource_logs_json); } diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 3dc7f43b5..ff5d65dbb 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -194,3 +194,51 @@ pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { let dt = DateTime::from_timestamp_nanos(epoch_ns).naive_utc(); dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() } + +pub fn merge_attributes_in_json( + attributes: Map, + vec_json: &mut Vec>, +) { + if !attributes.is_empty() { + let attributes = fetch_attributes_string(&attributes); + for json in vec_json { + if json.contains_key("other_attributes") { + let other_attributes = json.get_mut("other_attributes").unwrap(); + let other_attributes = other_attributes.as_str().unwrap_or_default(); + // append the other_attributes to the scope log json + let other_attributes = format!("{other_attributes}, {attributes}"); + json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } else { + json.insert( + "other_attributes".to_string(), + Value::String(attributes.clone()), + ); + } + } + } +} + +pub fn fetch_attributes_from_json(json_arr: &Vec>) -> String { + let mut combined_attributes = String::default(); + for json in json_arr { + if let Some(other_attributes) = json.get("other_attributes") { + if let Some(other_attributes) = other_attributes.as_str() { + combined_attributes.push_str(other_attributes); + } + } + } + combined_attributes +} + +pub fn fetch_attributes_string(attributes: &Map) -> String { + match serde_json::to_string(attributes) { + Ok(s) => s, + Err(e) => { + tracing::warn!("failed to serialise OTEL other_attributes: {e}"); + String::default() + } + } +} diff --git a/src/otel/traces.rs b/src/otel/traces.rs index c77ea6ad8..5ffcf9308 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -25,7 +25,10 @@ use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::{Map, Value}; use super::otel_utils::convert_epoch_nano_to_timestamp; +use super::otel_utils::fetch_attributes_from_json; +use super::otel_utils::fetch_attributes_string; use super::otel_utils::insert_attributes; +use super::otel_utils::merge_attributes_in_json; pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ "span_trace_id", @@ -46,15 +49,12 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [ ]; /// this function flattens the `ScopeSpans` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_scope_span( - scope_span: &ScopeSpans, - other_attributes: &mut Map, -) -> Vec> { +fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { let mut vec_scope_span_json = Vec::new(); let mut scope_span_json = Map::new(); - + let mut other_attributes = Map::new(); for span in &scope_span.spans { - let span_record_json = flatten_span_record(span, other_attributes); + let span_record_json = flatten_span_record(span); vec_scope_span_json.extend(span_record_json); } @@ -64,7 +64,11 @@ fn flatten_scope_span( "scope_version".to_string(), Value::String(scope.version.clone()), ); - insert_attributes(&mut scope_span_json, &scope.attributes, other_attributes); + insert_attributes( + &mut scope_span_json, + &scope.attributes, + &mut other_attributes, + ); scope_span_json.insert( "scope_dropped_attributes_count".to_string(), Value::Number(scope.dropped_attributes_count.into()), @@ -83,6 +87,8 @@ fn flatten_scope_span( Value::String(scope_span.schema_url.clone()), ); } + // Add the `other_attributes` to the scope span json + merge_attributes_in_json(other_attributes.clone(), &mut vec_scope_span_json); vec_scope_span_json } @@ -109,7 +115,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { let mut vec_resource_spans_json = Vec::new(); for scope_span in &record.scope_spans { - let scope_span_json = flatten_scope_span(scope_span, &mut other_attributes); + let scope_span_json = flatten_scope_span(scope_span); vec_resource_spans_json.extend(scope_span_json); } @@ -123,37 +129,23 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { resource_spans_json.insert(key.clone(), value.clone()); } } - + // Add the `other_attributes` to the resource span json + merge_attributes_in_json(other_attributes.clone(), &mut vec_resource_spans_json); vec_otel_json.extend(vec_resource_spans_json); } - // Add common attributes as one attribute in stringified array to each span record - let other_attributes = match serde_json::to_string(&other_attributes) { - Ok(s) => s, - Err(e) => { - tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::default() - } - }; - for span_record_json in &mut vec_otel_json { - span_record_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes.clone()), - ); - } + vec_otel_json.into_iter().map(Value::Object).collect() } /// otel traces has json array of events /// this function flattens the `Event` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_events( - events: &[Event], - other_attributes: &mut Map, -) -> Vec> { +fn flatten_events(events: &[Event]) -> Vec> { events .iter() .map(|event| { let mut event_json = Map::new(); + let mut other_attributes = Map::new(); event_json.insert( "event_time_unix_nano".to_string(), Value::String( @@ -161,11 +153,19 @@ fn flatten_events( ), ); event_json.insert("event_name".to_string(), Value::String(event.name.clone())); - insert_attributes(&mut event_json, &event.attributes, other_attributes); + insert_attributes(&mut event_json, &event.attributes, &mut other_attributes); event_json.insert( "event_dropped_attributes_count".to_string(), Value::Number(event.dropped_attributes_count.into()), ); + + if !other_attributes.is_empty() { + let other_attributes = fetch_attributes_string(&other_attributes); + event_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } event_json }) .collect() @@ -174,14 +174,12 @@ fn flatten_events( /// otel traces has json array of links /// this function flattens the `Link` object /// and returns a `Vec` of `Map` of the flattened json -fn flatten_links( - links: &[Link], - other_attributes: &mut Map, -) -> Vec> { +fn flatten_links(links: &[Link]) -> Vec> { links .iter() .map(|link| { let mut link_json = Map::new(); + let mut other_attributes = Map::new(); link_json.insert( "link_span_id".to_string(), Value::String(hex::encode(&link.span_id)), @@ -191,11 +189,19 @@ fn flatten_links( Value::String(hex::encode(&link.trace_id)), ); - insert_attributes(&mut link_json, &link.attributes, other_attributes); + insert_attributes(&mut link_json, &link.attributes, &mut other_attributes); link_json.insert( "link_dropped_attributes_count".to_string(), Value::Number(link.dropped_attributes_count.into()), ); + + if !other_attributes.is_empty() { + let other_attributes = fetch_attributes_string(&other_attributes); + link_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } link_json }) .collect() @@ -278,12 +284,9 @@ fn flatten_kind(kind: i32) -> Map { /// this function flattens the `Span` object /// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each span record object in the otel traces event -fn flatten_span_record( - span_record: &Span, - other_attributes: &mut Map, -) -> Vec> { +fn flatten_span_record(span_record: &Span) -> Vec> { let mut span_records_json = Vec::new(); - + let mut other_attributes = Map::new(); let mut span_record_json = Map::new(); span_record_json.insert( "span_trace_id".to_string(), @@ -322,18 +325,41 @@ fn flatten_span_record( insert_attributes( &mut span_record_json, &span_record.attributes, - other_attributes, + &mut other_attributes, ); span_record_json.insert( "span_dropped_attributes_count".to_string(), Value::Number(span_record.dropped_attributes_count.into()), ); - span_records_json.extend(flatten_events(&span_record.events, other_attributes)); + let events_json = flatten_events(&span_record.events); + // fetch all other_attributes from the events_json + let events_other_attributes = fetch_attributes_from_json(&events_json); + span_records_json.extend(events_json); span_record_json.insert( "span_dropped_events_count".to_string(), Value::Number(span_record.dropped_events_count.into()), ); - span_records_json.extend(flatten_links(&span_record.links, other_attributes)); + let links_json = flatten_links(&span_record.links); + // fetch all other_attributes from the links_json + let links_other_attributes = fetch_attributes_from_json(&links_json); + span_records_json.extend(links_json); + if !other_attributes.is_empty() { + let other_attributes = fetch_attributes_string(&other_attributes); + span_record_json.insert( + "other_attributes".to_string(), + Value::String(format!( + "{other_attributes} {events_other_attributes} {links_other_attributes}" + )), + ); + } else { + span_record_json.insert( + "other_attributes".to_string(), + Value::String(format!( + "{events_other_attributes} {links_other_attributes}" + )), + ); + } + span_record_json.insert( "span_dropped_links_count".to_string(), Value::Number(span_record.dropped_links_count.into()), From 6a0fc576665d6864bbb17985b3d54a6b6bf80947 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Apr 2025 08:36:35 -0400 Subject: [PATCH 13/18] attributes in otel metrics --- src/otel/metrics.rs | 157 ++++++++++++++++++++++------------------- src/otel/otel_utils.rs | 19 +++-- src/otel/traces.rs | 16 ++--- 3 files changed, 105 insertions(+), 87 deletions(-) diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index c9ffb5c1d..9935c1157 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -24,7 +24,8 @@ use opentelemetry_proto::tonic::metrics::v1::{ use serde_json::{Map, Value}; use super::otel_utils::{ - convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, + convert_epoch_nano_to_timestamp, fetch_attributes_string, insert_attributes, + insert_number_if_some, merge_attributes_in_json, }; pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [ @@ -39,18 +40,16 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [ /// this function flatten the exemplar json array /// and returns a `Map` of the exemplar json /// this function is reused in all json objects that have exemplar -fn flatten_exemplar( - exemplars: &[Exemplar], - other_attributes: &mut Map, -) -> Vec> { +fn flatten_exemplar(exemplars: &[Exemplar]) -> Vec> { exemplars .iter() .map(|exemplar| { let mut exemplar_json = Map::new(); + let mut other_attributes = Map::new(); insert_attributes( &mut exemplar_json, &exemplar.filtered_attributes, - other_attributes, + &mut other_attributes, ); exemplar_json.insert( "exemplar_time_unix_nano".to_string(), @@ -84,6 +83,13 @@ fn flatten_exemplar( } } } + if !other_attributes.is_empty() { + let other_attributes = fetch_attributes_string(&other_attributes); + exemplar_json.insert( + "other_attributes".to_string(), + Value::String(other_attributes), + ); + } exemplar_json }) .collect() @@ -93,18 +99,16 @@ fn flatten_exemplar( /// this function flatten the number data points json array /// and returns a `Vec` of `Map` of the flattened json /// this function is reused in all json objects that have number data points -fn flatten_number_data_points( - data_points: &[NumberDataPoint], - other_attributes: &mut Map, -) -> Vec> { +fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec> { data_points .iter() .map(|data_point| { let mut data_point_json = Map::new(); + let mut other_attributes = Map::new(); insert_attributes( &mut data_point_json, &data_point.attributes, - other_attributes, + &mut other_attributes, ); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -119,11 +123,19 @@ fn flatten_number_data_points( )), ); - let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); - for exemplar in exemplar_json { - for (key, value) in exemplar { - data_point_json.insert(key, value); + let mut exemplar_json = flatten_exemplar(&data_point.exemplars); + if !exemplar_json.is_empty() { + merge_attributes_in_json(other_attributes, &mut exemplar_json); + for exemplar in exemplar_json { + for (key, value) in exemplar { + data_point_json.insert(key, value); + } } + } else { + data_point_json.insert( + "other_attributes".to_string(), + Value::String(fetch_attributes_string(&other_attributes)), + ); } data_point_json.extend(flatten_data_point_flags(data_point.flags)); @@ -154,12 +166,10 @@ fn flatten_number_data_points( /// each gauge object has json array for data points /// this function flatten the gauge json object /// and returns a `Vec` of `Map` for each data point -fn flatten_gauge( - gauge: &Gauge, - other_attributes: &mut Map, -) -> Vec> { +fn flatten_gauge(gauge: &Gauge) -> Vec> { let mut vec_gauge_json = Vec::new(); - let data_points_json = flatten_number_data_points(&gauge.data_points, other_attributes); + let data_points_json = flatten_number_data_points(&gauge.data_points); + for data_point_json in data_points_json { let mut gauge_json = Map::new(); for (key, value) in &data_point_json { @@ -174,9 +184,9 @@ fn flatten_gauge( /// each sum object has json array for data points /// this function flatten the sum json object /// and returns a `Vec` of `Map` for each data point -fn flatten_sum(sum: &Sum, other_attributes: &mut Map) -> Vec> { +fn flatten_sum(sum: &Sum) -> Vec> { let mut vec_sum_json = Vec::new(); - let data_points_json = flatten_number_data_points(&sum.data_points, other_attributes); + let data_points_json = flatten_number_data_points(&sum.data_points); for data_point_json in data_points_json { let mut sum_json = Map::new(); for (key, value) in &data_point_json { @@ -199,17 +209,15 @@ fn flatten_sum(sum: &Sum, other_attributes: &mut Map) -> Vec, -) -> Vec> { +fn flatten_histogram(histogram: &Histogram) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &histogram.data_points { let mut data_point_json = Map::new(); + let mut other_attributes = Map::new(); insert_attributes( &mut data_point_json, &data_point.attributes, - other_attributes, + &mut other_attributes, ); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -254,11 +262,19 @@ fn flatten_histogram( "data_point_explicit_bounds".to_string(), data_point_explicit_bounds, ); - let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); - for exemplar in exemplar_json { - for (key, value) in exemplar { - data_point_json.insert(key, value); + let mut exemplar_json = flatten_exemplar(&data_point.exemplars); + if !exemplar_json.is_empty() { + merge_attributes_in_json(other_attributes, &mut exemplar_json); + for exemplar in exemplar_json { + for (key, value) in exemplar { + data_point_json.insert(key, value); + } } + } else { + data_point_json.insert( + "other_attributes".to_string(), + Value::String(fetch_attributes_string(&other_attributes)), + ); } data_point_json.extend(flatten_data_point_flags(data_point.flags)); @@ -301,17 +317,15 @@ fn flatten_buckets(bucket: &Buckets) -> Map { /// each exponential histogram object has json array for data points /// this function flatten the exponential histogram json object /// and returns a `Vec` of `Map` for each data point -fn flatten_exp_histogram( - exp_histogram: &ExponentialHistogram, - other_attributes: &mut Map, -) -> Vec> { +fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &exp_histogram.data_points { let mut data_point_json = Map::new(); + let mut other_attributes = Map::new(); insert_attributes( &mut data_point_json, &data_point.attributes, - other_attributes, + &mut other_attributes, ); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -350,11 +364,19 @@ fn flatten_exp_histogram( data_point_json.insert(format!("negative_{}", key), value); } } - let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes); - for exemplar in exemplar_json { - for (key, value) in exemplar { - data_point_json.insert(key, value); + let mut exemplar_json = flatten_exemplar(&data_point.exemplars); + if !exemplar_json.is_empty() { + merge_attributes_in_json(other_attributes, &mut exemplar_json); + for exemplar in exemplar_json { + for (key, value) in exemplar { + data_point_json.insert(key, value); + } } + } else { + data_point_json.insert( + "other_attributes".to_string(), + Value::String(fetch_attributes_string(&other_attributes)), + ); } data_points_json.push(data_point_json); @@ -375,17 +397,15 @@ fn flatten_exp_histogram( /// each summary object has json array for data points /// this function flatten the summary json object /// and returns a `Vec` of `Map` for each data point -fn flatten_summary( - summary: &Summary, - other_attributes: &mut Map, -) -> Vec> { +fn flatten_summary(summary: &Summary) -> Vec> { let mut data_points_json = Vec::new(); for data_point in &summary.data_points { let mut data_point_json = Map::new(); + let mut other_attributes = Map::new(); insert_attributes( &mut data_point_json, &data_point.attributes, - other_attributes, + &mut other_attributes, ); data_point_json.insert( "start_time_unix_nano".to_string(), @@ -449,33 +469,31 @@ fn flatten_summary( /// this function flatten the metric json object /// and returns a `Vec` of `Map` of the flattened json /// this function is called recursively for each metric record object in the otel metrics event -pub fn flatten_metrics_record( - metrics_record: &Metric, - other_attributes: &mut Map, -) -> Vec> { +pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec> { let mut data_points_json = Vec::new(); let mut metric_json = Map::new(); + let mut other_attributes = Map::new(); let mut metric_type = String::default(); match &metrics_record.data { Some(metric::Data::Gauge(gauge)) => { metric_type = "gauge".to_string(); - data_points_json.extend(flatten_gauge(gauge, other_attributes)); + data_points_json.extend(flatten_gauge(gauge)); } Some(metric::Data::Sum(sum)) => { metric_type = "sum".to_string(); - data_points_json.extend(flatten_sum(sum, other_attributes)); + data_points_json.extend(flatten_sum(sum)); } Some(metric::Data::Histogram(histogram)) => { metric_type = "histogram".to_string(); - data_points_json.extend(flatten_histogram(histogram, other_attributes)); + data_points_json.extend(flatten_histogram(histogram)); } Some(metric::Data::ExponentialHistogram(exp_histogram)) => { metric_type = "exponential_histogram".to_string(); - data_points_json.extend(flatten_exp_histogram(exp_histogram, other_attributes)); + data_points_json.extend(flatten_exp_histogram(exp_histogram)); } Some(metric::Data::Summary(summary)) => { metric_type = "summary".to_string(); - data_points_json.extend(flatten_summary(summary, other_attributes)); + data_points_json.extend(flatten_summary(summary)); } None => {} } @@ -492,7 +510,11 @@ pub fn flatten_metrics_record( Value::String(metrics_record.unit.clone()), ); metric_json.insert("metric_type".to_string(), Value::String(metric_type)); - insert_attributes(&mut metric_json, &metrics_record.metadata, other_attributes); + insert_attributes( + &mut metric_json, + &metrics_record.metadata, + &mut other_attributes, + ); for data_point_json in &mut data_points_json { for (key, value) in &metric_json { data_point_json.insert(key.clone(), value.clone()); @@ -501,6 +523,7 @@ pub fn flatten_metrics_record( if data_points_json.is_empty() { data_points_json.push(metric_json); } + merge_attributes_in_json(other_attributes, &mut data_points_json); data_points_json } @@ -508,9 +531,9 @@ pub fn flatten_metrics_record( /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_metrics(message: MetricsData) -> Vec { let mut vec_otel_json = Vec::new(); - let mut other_attributes = Map::new(); for record in &message.resource_metrics { let mut resource_metrics_json = Map::new(); + let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { insert_attributes( &mut resource_metrics_json, @@ -525,11 +548,9 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { let mut vec_scope_metrics_json = Vec::new(); for scope_metric in &record.scope_metrics { let mut scope_metrics_json = Map::new(); + let mut other_attributes = Map::new(); for metrics_record in &scope_metric.metrics { - vec_scope_metrics_json.extend(flatten_metrics_record( - metrics_record, - &mut other_attributes, - )); + vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); } if let Some(scope) = &scope_metric.scope { scope_metrics_json @@ -558,6 +579,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { scope_metric_json.insert(key.clone(), value.clone()); } } + merge_attributes_in_json(other_attributes, &mut vec_scope_metrics_json); } resource_metrics_json.insert( "resource_metrics_schema_url".to_string(), @@ -568,22 +590,9 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { resource_metric_json.insert(key.clone(), value.clone()); } } + merge_attributes_in_json(other_attributes, &mut vec_scope_metrics_json); vec_otel_json.extend(vec_scope_metrics_json); } - // Add common attributes as one attribute in stringified array to each metric record - let other_attributes = match serde_json::to_string(&other_attributes) { - Ok(s) => s, - Err(e) => { - tracing::warn!("failed to serialise OTEL other_attributes: {e}"); - String::default() - } - }; - for metric_record_json in &mut vec_otel_json { - metric_record_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes.clone()), - ); - } vec_otel_json.into_iter().map(Value::Object).collect() } diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index ff5d65dbb..59e432899 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -45,7 +45,13 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { let json_array_value = collect_json_from_array_value(array_val); // Convert the array to a JSON string - let json_array_string = serde_json::to_string(&json_array_value).unwrap(); + let json_array_string = match serde_json::to_string(&json_array_value) { + Ok(s) => s, + Err(e) => { + tracing::warn!("failed to serialise array value: {e}"); + String::default() + } + }; // Insert the array into the result map value_json.insert(key.to_string(), Value::String(json_array_string)); } @@ -105,9 +111,12 @@ fn collect_json_from_key_value_list(key_value_list: KeyValueList) -> Map = Map::new(); for key_value in key_value_list.values { if let Some(val) = key_value.value { - let val = val.value.unwrap(); - let json_value = collect_json_from_value(&key_value.key, val); - kv_list_json.extend(json_value); + if let Some(val) = val.value { + let json_value = collect_json_from_value(&key_value.key, val); + kv_list_json.extend(json_value); + } else { + tracing::warn!("Key '{}' has no value in key-value list", key_value.key); + } } } kv_list_json @@ -206,7 +215,7 @@ pub fn merge_attributes_in_json( let other_attributes = json.get_mut("other_attributes").unwrap(); let other_attributes = other_attributes.as_str().unwrap_or_default(); // append the other_attributes to the scope log json - let other_attributes = format!("{other_attributes}, {attributes}"); + let other_attributes = format!("{other_attributes} {attributes}"); json.insert( "other_attributes".to_string(), Value::String(other_attributes), diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 5ffcf9308..e3b61a44f 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -344,12 +344,16 @@ fn flatten_span_record(span_record: &Span) -> Vec> { let links_other_attributes = fetch_attributes_from_json(&links_json); span_records_json.extend(links_json); if !other_attributes.is_empty() { - let other_attributes = fetch_attributes_string(&other_attributes); + let mut other_attributes = fetch_attributes_string(&other_attributes); + if !events_other_attributes.is_empty() { + other_attributes.push_str(&events_other_attributes); + } + if !links_other_attributes.is_empty() { + other_attributes.push_str(&links_other_attributes); + } span_record_json.insert( "other_attributes".to_string(), - Value::String(format!( - "{other_attributes} {events_other_attributes} {links_other_attributes}" - )), + Value::String(other_attributes), ); } else { span_record_json.insert( @@ -359,16 +363,13 @@ fn flatten_span_record(span_record: &Span) -> Vec> { )), ); } - span_record_json.insert( "span_dropped_links_count".to_string(), Value::Number(span_record.dropped_links_count.into()), ); - if let Some(status) = &span_record.status { span_record_json.extend(flatten_status(status)); } - // if span_record.events is null, code should still flatten other elements in the span record - this is handled in the if block // else block handles the flattening the span record that includes events and links records in each span record if span_records_json.is_empty() { @@ -380,6 +381,5 @@ fn flatten_span_record(span_record: &Span) -> Vec> { } } } - span_records_json } From 217e96876a5a8d9aa7ab10cd581040d51f19bc8d Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Apr 2025 22:48:48 -0400 Subject: [PATCH 14/18] refactor and store other attributes as map --- src/otel/metrics.rs | 20 +++++++++---- src/otel/otel_utils.rs | 64 ++++++++++++++++++++++++++++-------------- src/otel/traces.rs | 28 ++++++++---------- 3 files changed, 68 insertions(+), 44 deletions(-) diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 9935c1157..902b84cc7 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -131,7 +131,7 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec> { data_point_json.insert(key, value); } } - } else { + } else if !other_attributes.is_empty() { data_point_json.insert( "other_attributes".to_string(), Value::String(fetch_attributes_string(&other_attributes)), @@ -372,7 +372,7 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec Vec> { .collect(), ), ); + + if !other_attributes.is_empty() { + data_point_json.insert( + "other_attributes".to_string(), + Value::String(fetch_attributes_string(&other_attributes)), + ); + } + data_points_json.push(data_point_json); } data_points_json @@ -548,7 +556,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { let mut vec_scope_metrics_json = Vec::new(); for scope_metric in &record.scope_metrics { let mut scope_metrics_json = Map::new(); - let mut other_attributes = Map::new(); + let mut scope_other_attributes = Map::new(); for metrics_record in &scope_metric.metrics { vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record)); } @@ -562,7 +570,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { insert_attributes( &mut scope_metrics_json, &scope.attributes, - &mut other_attributes, + &mut scope_other_attributes, ); scope_metrics_json.insert( "scope_dropped_attributes_count".to_string(), @@ -579,7 +587,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec { scope_metric_json.insert(key.clone(), value.clone()); } } - merge_attributes_in_json(other_attributes, &mut vec_scope_metrics_json); + merge_attributes_in_json(scope_other_attributes, &mut vec_scope_metrics_json); } resource_metrics_json.insert( "resource_metrics_schema_url".to_string(), diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 59e432899..b733471b4 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -204,44 +204,66 @@ pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String { dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string() } +/// fetch `other_attributes` from array of JSON objects +/// merge them with the provided attributes +/// and return the merged array of JSON object pub fn merge_attributes_in_json( attributes: Map, vec_json: &mut Vec>, ) { if !attributes.is_empty() { - let attributes = fetch_attributes_string(&attributes); for json in vec_json { - if json.contains_key("other_attributes") { - let other_attributes = json.get_mut("other_attributes").unwrap(); - let other_attributes = other_attributes.as_str().unwrap_or_default(); - // append the other_attributes to the scope log json - let other_attributes = format!("{other_attributes} {attributes}"); - json.insert( - "other_attributes".to_string(), - Value::String(other_attributes), - ); - } else { - json.insert( - "other_attributes".to_string(), - Value::String(attributes.clone()), - ); + if let Some(other_attrs) = json.get("other_attributes") { + if let Value::String(attrs_str) = other_attrs { + if let Ok(mut existing_attrs) = + serde_json::from_str::>(attrs_str) + { + for (key, value) in attributes.clone() { + existing_attrs.insert(key, value); + } + if let Ok(merged_str) = serde_json::to_string(&existing_attrs) { + json.insert("other_attributes".to_string(), Value::String(merged_str)); + } + } else if let Ok(attrs_str) = serde_json::to_string(&attributes) { + json.insert("other_attributes".to_string(), Value::String(attrs_str)); + } + } else if let Value::Object(existing_attrs) = other_attrs { + let mut merged_attrs = existing_attrs.clone(); + for (key, value) in attributes.clone() { + merged_attrs.insert(key, value); + } + if let Ok(merged_str) = serde_json::to_string(&merged_attrs) { + json.insert("other_attributes".to_string(), Value::String(merged_str)); + } + } + } else if let Ok(attrs_str) = serde_json::to_string(&attributes) { + json.insert("other_attributes".to_string(), Value::String(attrs_str)); } } } } -pub fn fetch_attributes_from_json(json_arr: &Vec>) -> String { - let mut combined_attributes = String::default(); +/// fetch `other_attributes` from array of JSON objects +/// and merge them into a single map +/// and return the merged map +pub fn fetch_attributes_from_json(json_arr: &Vec>) -> Map { + let mut merged_attributes = Map::new(); + for json in json_arr { - if let Some(other_attributes) = json.get("other_attributes") { - if let Some(other_attributes) = other_attributes.as_str() { - combined_attributes.push_str(other_attributes); + if let Some(Value::String(attrs_str)) = json.get("other_attributes") { + if let Ok(attrs) = serde_json::from_str::>(attrs_str) { + for (key, value) in attrs { + merged_attributes.insert(key, value); + } } } } - combined_attributes + merged_attributes } +/// convert attributes map to a string +/// and return the string +/// if serialisation fails, return an empty string pub fn fetch_attributes_string(attributes: &Map) -> String { match serde_json::to_string(attributes) { Ok(s) => s, diff --git a/src/otel/traces.rs b/src/otel/traces.rs index e3b61a44f..b85f104e2 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -343,25 +343,19 @@ fn flatten_span_record(span_record: &Span) -> Vec> { // fetch all other_attributes from the links_json let links_other_attributes = fetch_attributes_from_json(&links_json); span_records_json.extend(links_json); - if !other_attributes.is_empty() { - let mut other_attributes = fetch_attributes_string(&other_attributes); - if !events_other_attributes.is_empty() { - other_attributes.push_str(&events_other_attributes); + // merge all other_attributes from the events_json and links_json + if !other_attributes.is_empty() + || !events_other_attributes.is_empty() + || !links_other_attributes.is_empty() + { + for (key, value) in &events_other_attributes { + other_attributes.insert(key.clone(), value.clone()); } - if !links_other_attributes.is_empty() { - other_attributes.push_str(&links_other_attributes); + for (key, value) in &links_other_attributes { + other_attributes.insert(key.clone(), value.clone()); } - span_record_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes), - ); - } else { - span_record_json.insert( - "other_attributes".to_string(), - Value::String(format!( - "{events_other_attributes} {links_other_attributes}" - )), - ); + let attrs_str = fetch_attributes_string(&other_attributes); + span_record_json.insert("other_attributes".to_string(), Value::String(attrs_str)); } span_record_json.insert( "span_dropped_links_count".to_string(), From b628649ce8854632d717e8a5f2b32ef779cfa20b Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Apr 2025 23:05:02 -0400 Subject: [PATCH 15/18] add comments and refactor --- src/otel/logs.rs | 10 ++-------- src/otel/metrics.rs | 38 +++++++++----------------------------- src/otel/otel_utils.rs | 21 +++++++++++++++++++++ src/otel/traces.rs | 21 ++++----------------- 4 files changed, 36 insertions(+), 54 deletions(-) diff --git a/src/otel/logs.rs b/src/otel/logs.rs index e4f095200..dfd60b8ec 100644 --- a/src/otel/logs.rs +++ b/src/otel/logs.rs @@ -23,9 +23,9 @@ use opentelemetry_proto::tonic::logs::v1::SeverityNumber; use serde_json::Map; use serde_json::Value; +use super::otel_utils::add_other_attributes_if_not_empty; use super::otel_utils::collect_json_from_values; use super::otel_utils::convert_epoch_nano_to_timestamp; -use super::otel_utils::fetch_attributes_string; use super::otel_utils::insert_attributes; use super::otel_utils::merge_attributes_in_json; @@ -107,13 +107,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map { ); // Add the `other_attributes` to the log record json - if !other_attributes.is_empty() { - let other_attributes = fetch_attributes_string(&other_attributes); - log_record_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes), - ); - } + add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes); log_record_json } diff --git a/src/otel/metrics.rs b/src/otel/metrics.rs index 902b84cc7..97a27ad74 100644 --- a/src/otel/metrics.rs +++ b/src/otel/metrics.rs @@ -24,7 +24,7 @@ use opentelemetry_proto::tonic::metrics::v1::{ use serde_json::{Map, Value}; use super::otel_utils::{ - convert_epoch_nano_to_timestamp, fetch_attributes_string, insert_attributes, + add_other_attributes_if_not_empty, convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some, merge_attributes_in_json, }; @@ -83,13 +83,7 @@ fn flatten_exemplar(exemplars: &[Exemplar]) -> Vec> { } } } - if !other_attributes.is_empty() { - let other_attributes = fetch_attributes_string(&other_attributes); - exemplar_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes), - ); - } + add_other_attributes_if_not_empty(&mut exemplar_json, &other_attributes); exemplar_json }) .collect() @@ -131,11 +125,8 @@ fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec Vec> { data_point_json.insert(key, value); } } - } else if !other_attributes.is_empty() { - data_point_json.insert( - "other_attributes".to_string(), - Value::String(fetch_attributes_string(&other_attributes)), - ); + } else { + add_other_attributes_if_not_empty(&mut data_point_json, &other_attributes); } data_point_json.extend(flatten_data_point_flags(data_point.flags)); @@ -372,11 +360,8 @@ fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec Vec> { ), ); - if !other_attributes.is_empty() { - data_point_json.insert( - "other_attributes".to_string(), - Value::String(fetch_attributes_string(&other_attributes)), - ); - } + add_other_attributes_if_not_empty(&mut data_point_json, &other_attributes); data_points_json.push(data_point_json); } diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index b733471b4..18ec64f08 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -22,6 +22,8 @@ use opentelemetry_proto::tonic::common::v1::{ }; use serde_json::{Map, Value}; +/// Prefixes of attribute keys that should be preserved as individual fields in flattened output. +/// Other attributes will be collected in a separate JSON object under `other_attributes`. const KNOWN_ATTRIBUTES_PREFIX: [&str; 6] = ["http", "url", "service", "os", "host", "telemetry"]; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte @@ -73,6 +75,9 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map Value { let mut json_array = Vec::new(); for value in array_value.values { @@ -107,6 +112,9 @@ fn collect_json_from_array_value(array_value: ArrayValue) -> Value { Value::Array(json_array) } +/// Recursively converts an OpenTelemetry KeyValueList into a JSON Map +/// The function iterates through the key-value pairs in the list +/// and collects their JSON representations into a single Map fn collect_json_from_key_value_list(key_value_list: KeyValueList) -> Map { let mut kv_list_json: Map = Map::new(); for key_value in key_value_list.values { @@ -273,3 +281,16 @@ pub fn fetch_attributes_string(attributes: &Map) -> String { } } } + +/// add `other_attributes` to the JSON object +/// if `other_attributes` is not empty +/// and return the JSON object +pub fn add_other_attributes_if_not_empty( + json: &mut Map, + other_attributes: &Map, +) { + if !other_attributes.is_empty() { + let attrs_str = fetch_attributes_string(other_attributes); + json.insert("other_attributes".to_string(), Value::String(attrs_str)); + } +} diff --git a/src/otel/traces.rs b/src/otel/traces.rs index b85f104e2..bfb6908e3 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -24,9 +24,9 @@ use opentelemetry_proto::tonic::trace::v1::Status; use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::{Map, Value}; +use super::otel_utils::add_other_attributes_if_not_empty; use super::otel_utils::convert_epoch_nano_to_timestamp; use super::otel_utils::fetch_attributes_from_json; -use super::otel_utils::fetch_attributes_string; use super::otel_utils::insert_attributes; use super::otel_utils::merge_attributes_in_json; @@ -159,13 +159,7 @@ fn flatten_events(events: &[Event]) -> Vec> { Value::Number(event.dropped_attributes_count.into()), ); - if !other_attributes.is_empty() { - let other_attributes = fetch_attributes_string(&other_attributes); - event_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes), - ); - } + add_other_attributes_if_not_empty(&mut event_json, &other_attributes); event_json }) .collect() @@ -195,13 +189,7 @@ fn flatten_links(links: &[Link]) -> Vec> { Value::Number(link.dropped_attributes_count.into()), ); - if !other_attributes.is_empty() { - let other_attributes = fetch_attributes_string(&other_attributes); - link_json.insert( - "other_attributes".to_string(), - Value::String(other_attributes), - ); - } + add_other_attributes_if_not_empty(&mut link_json, &other_attributes); link_json }) .collect() @@ -354,8 +342,7 @@ fn flatten_span_record(span_record: &Span) -> Vec> { for (key, value) in &links_other_attributes { other_attributes.insert(key.clone(), value.clone()); } - let attrs_str = fetch_attributes_string(&other_attributes); - span_record_json.insert("other_attributes".to_string(), Value::String(attrs_str)); + add_other_attributes_if_not_empty(&mut span_record_json, &other_attributes); } span_record_json.insert( "span_dropped_links_count".to_string(), From e9a56d0e4714900197e3c9afa534e0e4ec960679 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Apr 2025 23:25:04 -0400 Subject: [PATCH 16/18] suggestions from coderabbitai --- src/otel/otel_utils.rs | 96 +++++++++++++++++++++++++----------------- src/otel/traces.rs | 6 +-- 2 files changed, 61 insertions(+), 41 deletions(-) diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 18ec64f08..7fab3c2d7 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -25,7 +25,7 @@ use serde_json::{Map, Value}; /// Prefixes of attribute keys that should be preserved as individual fields in flattened output. /// Other attributes will be collected in a separate JSON object under `other_attributes`. const KNOWN_ATTRIBUTES_PREFIX: [&str; 6] = ["http", "url", "service", "os", "host", "telemetry"]; - +pub const OTHER_ATTRIBUTES_KEY: &str = "other_attributes"; // Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { let mut value_json: Map = Map::new(); @@ -45,7 +45,7 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map { - let json_array_value = collect_json_from_array_value(array_val); + let json_array_value = collect_json_from_array_value(&array_val); // Convert the array to a JSON string let json_array_string = match serde_json::to_string(&json_array_value) { Ok(s) => s, @@ -78,9 +78,9 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map Value { +fn collect_json_from_array_value(array_value: &ArrayValue) -> Value { let mut json_array = Vec::new(); - for value in array_value.values { + for value in &array_value.values { if let Some(val) = &value.value { match val { OtelValue::StringValue(s) => json_array.push(Value::String(s.clone())), @@ -98,7 +98,7 @@ fn collect_json_from_array_value(array_value: ArrayValue) -> Value { } OtelValue::ArrayValue(arr) => { // Recursively collect JSON from nested array values - let nested_json = collect_json_from_array_value(arr.clone()); + let nested_json = collect_json_from_array_value(arr); json_array.push(nested_json); } OtelValue::KvlistValue(kv_list) => { @@ -162,14 +162,14 @@ pub fn flatten_attributes( let key = &attribute.key; let value = &attribute.value; let value_json = collect_json_from_values(value, &key.to_string()); - for key in value_json.keys() { + for (attr_key, attr_val) in &value_json { if KNOWN_ATTRIBUTES_PREFIX .iter() - .any(|prefix| key.starts_with(prefix)) + .any(|prefix| attr_key.starts_with(prefix)) { - attributes_json.insert(key.to_owned(), value_json[key].to_owned()); + attributes_json.insert(attr_key.clone(), attr_val.clone()); } else { - other_attributes_json.insert(key.to_owned(), value_json[key].to_owned()); + other_attributes_json.insert(attr_key.clone(), attr_val.clone()); } } } @@ -219,38 +219,58 @@ pub fn merge_attributes_in_json( attributes: Map, vec_json: &mut Vec>, ) { - if !attributes.is_empty() { - for json in vec_json { - if let Some(other_attrs) = json.get("other_attributes") { - if let Value::String(attrs_str) = other_attrs { - if let Ok(mut existing_attrs) = - serde_json::from_str::>(attrs_str) - { - for (key, value) in attributes.clone() { - existing_attrs.insert(key, value); - } - if let Ok(merged_str) = serde_json::to_string(&existing_attrs) { - json.insert("other_attributes".to_string(), Value::String(merged_str)); - } - } else if let Ok(attrs_str) = serde_json::to_string(&attributes) { - json.insert("other_attributes".to_string(), Value::String(attrs_str)); - } - } else if let Value::Object(existing_attrs) = other_attrs { - let mut merged_attrs = existing_attrs.clone(); - for (key, value) in attributes.clone() { - merged_attrs.insert(key, value); - } - if let Ok(merged_str) = serde_json::to_string(&merged_attrs) { - json.insert("other_attributes".to_string(), Value::String(merged_str)); - } - } - } else if let Ok(attrs_str) = serde_json::to_string(&attributes) { - json.insert("other_attributes".to_string(), Value::String(attrs_str)); + if attributes.is_empty() { + return; + } + + for json in vec_json { + let merged_attributes = match json.get(OTHER_ATTRIBUTES_KEY) { + Some(Value::String(attrs_str)) => { + merge_with_existing_attributes(&attributes, attrs_str) } + Some(Value::Object(existing_attrs)) => { + merge_with_existing_object(&attributes, existing_attrs) + } + _ => serialize_attributes(&attributes), + }; + + if let Some(merged_str) = merged_attributes { + json.insert(OTHER_ATTRIBUTES_KEY.to_string(), Value::String(merged_str)); } } } +/// Merge attributes with an existing JSON string of attributes +fn merge_with_existing_attributes( + attributes: &Map, + attrs_str: &str, +) -> Option { + if let Ok(mut existing_attrs) = serde_json::from_str::>(attrs_str) { + for (key, value) in attributes { + existing_attrs.insert(key.clone(), value.clone()); + } + return serde_json::to_string(&existing_attrs).ok(); + } + None +} + +/// Merge attributes with an existing JSON object of attributes +fn merge_with_existing_object( + attributes: &Map, + existing_attrs: &Map, +) -> Option { + let mut merged_attrs = existing_attrs.clone(); + for (key, value) in attributes { + merged_attrs.insert(key.clone(), value.clone()); + } + serde_json::to_string(&merged_attrs).ok() +} + +/// Serialize attributes into a JSON string +fn serialize_attributes(attributes: &Map) -> Option { + serde_json::to_string(attributes).ok() +} + /// fetch `other_attributes` from array of JSON objects /// and merge them into a single map /// and return the merged map @@ -258,7 +278,7 @@ pub fn fetch_attributes_from_json(json_arr: &Vec>) -> Map>(attrs_str) { for (key, value) in attrs { merged_attributes.insert(key, value); @@ -291,6 +311,6 @@ pub fn add_other_attributes_if_not_empty( ) { if !other_attributes.is_empty() { let attrs_str = fetch_attributes_string(other_attributes); - json.insert("other_attributes".to_string(), Value::String(attrs_str)); + json.insert(OTHER_ATTRIBUTES_KEY.to_string(), Value::String(attrs_str)); } } diff --git a/src/otel/traces.rs b/src/otel/traces.rs index bfb6908e3..560e8ba11 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -88,7 +88,7 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { ); } // Add the `other_attributes` to the scope span json - merge_attributes_in_json(other_attributes.clone(), &mut vec_scope_span_json); + merge_attributes_in_json(other_attributes, &mut vec_scope_span_json); vec_scope_span_json } @@ -97,10 +97,10 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec> { /// and returns a `Vec` of `Value::Object` of the flattened json pub fn flatten_otel_traces(message: &TracesData) -> Vec { let mut vec_otel_json = Vec::new(); - let mut other_attributes = Map::new(); + for record in &message.resource_spans { let mut resource_span_json = Map::new(); - + let mut other_attributes = Map::new(); if let Some(resource) = &record.resource { insert_attributes( &mut resource_span_json, From 1fe1f355bc23f8dc462417fcbcb670afdcfeb696 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Apr 2025 23:32:33 -0400 Subject: [PATCH 17/18] remove unused cloning --- src/otel/traces.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/otel/traces.rs b/src/otel/traces.rs index 560e8ba11..87e4556a8 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -130,7 +130,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec { } } // Add the `other_attributes` to the resource span json - merge_attributes_in_json(other_attributes.clone(), &mut vec_resource_spans_json); + merge_attributes_in_json(other_attributes, &mut vec_resource_spans_json); vec_otel_json.extend(vec_resource_spans_json); } From bba60f594768f48a645c3e7eaaa6ebf8f29b6fc7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 21 Apr 2025 23:35:59 -0400 Subject: [PATCH 18/18] error handling --- src/otel/otel_utils.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/otel/otel_utils.rs b/src/otel/otel_utils.rs index 7fab3c2d7..3c37c0e0a 100644 --- a/src/otel/otel_utils.rs +++ b/src/otel/otel_utils.rs @@ -245,13 +245,18 @@ fn merge_with_existing_attributes( attributes: &Map, attrs_str: &str, ) -> Option { - if let Ok(mut existing_attrs) = serde_json::from_str::>(attrs_str) { - for (key, value) in attributes { - existing_attrs.insert(key.clone(), value.clone()); + match serde_json::from_str::>(attrs_str) { + Ok(mut existing_attrs) => { + for (key, value) in attributes { + existing_attrs.insert(key.clone(), value.clone()); + } + serde_json::to_string(&existing_attrs).ok() + } + Err(e) => { + tracing::warn!("failed to deserialize existing attributes: {e}"); + None } - return serde_json::to_string(&existing_attrs).ok(); } - None } /// Merge attributes with an existing JSON object of attributes