Skip to content

Commit 6a0fc57

Browse files
attributes in otel metrics
1 parent dc6f4d3 commit 6a0fc57

File tree

3 files changed

+105
-87
lines changed

3 files changed

+105
-87
lines changed

src/otel/metrics.rs

Lines changed: 83 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use opentelemetry_proto::tonic::metrics::v1::{
2424
use serde_json::{Map, Value};
2525

2626
use super::otel_utils::{
27-
convert_epoch_nano_to_timestamp, insert_attributes, insert_number_if_some,
27+
convert_epoch_nano_to_timestamp, fetch_attributes_string, insert_attributes,
28+
insert_number_if_some, merge_attributes_in_json,
2829
};
2930

3031
pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [
@@ -39,18 +40,16 @@ pub const OTEL_METRICS_KNOWN_FIELD_LIST: [&str; 5] = [
3940
/// this function flatten the exemplar json array
4041
/// and returns a `Map` of the exemplar json
4142
/// this function is reused in all json objects that have exemplar
42-
fn flatten_exemplar(
43-
exemplars: &[Exemplar],
44-
other_attributes: &mut Map<String, Value>,
45-
) -> Vec<Map<String, Value>> {
43+
fn flatten_exemplar(exemplars: &[Exemplar]) -> Vec<Map<String, Value>> {
4644
exemplars
4745
.iter()
4846
.map(|exemplar| {
4947
let mut exemplar_json = Map::new();
48+
let mut other_attributes = Map::new();
5049
insert_attributes(
5150
&mut exemplar_json,
5251
&exemplar.filtered_attributes,
53-
other_attributes,
52+
&mut other_attributes,
5453
);
5554
exemplar_json.insert(
5655
"exemplar_time_unix_nano".to_string(),
@@ -84,6 +83,13 @@ fn flatten_exemplar(
8483
}
8584
}
8685
}
86+
if !other_attributes.is_empty() {
87+
let other_attributes = fetch_attributes_string(&other_attributes);
88+
exemplar_json.insert(
89+
"other_attributes".to_string(),
90+
Value::String(other_attributes),
91+
);
92+
}
8793
exemplar_json
8894
})
8995
.collect()
@@ -93,18 +99,16 @@ fn flatten_exemplar(
9399
/// this function flatten the number data points json array
94100
/// and returns a `Vec` of `Map` of the flattened json
95101
/// this function is reused in all json objects that have number data points
96-
fn flatten_number_data_points(
97-
data_points: &[NumberDataPoint],
98-
other_attributes: &mut Map<String, Value>,
99-
) -> Vec<Map<String, Value>> {
102+
fn flatten_number_data_points(data_points: &[NumberDataPoint]) -> Vec<Map<String, Value>> {
100103
data_points
101104
.iter()
102105
.map(|data_point| {
103106
let mut data_point_json = Map::new();
107+
let mut other_attributes = Map::new();
104108
insert_attributes(
105109
&mut data_point_json,
106110
&data_point.attributes,
107-
other_attributes,
111+
&mut other_attributes,
108112
);
109113
data_point_json.insert(
110114
"start_time_unix_nano".to_string(),
@@ -119,11 +123,19 @@ fn flatten_number_data_points(
119123
)),
120124
);
121125

122-
let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes);
123-
for exemplar in exemplar_json {
124-
for (key, value) in exemplar {
125-
data_point_json.insert(key, value);
126+
let mut exemplar_json = flatten_exemplar(&data_point.exemplars);
127+
if !exemplar_json.is_empty() {
128+
merge_attributes_in_json(other_attributes, &mut exemplar_json);
129+
for exemplar in exemplar_json {
130+
for (key, value) in exemplar {
131+
data_point_json.insert(key, value);
132+
}
126133
}
134+
} else {
135+
data_point_json.insert(
136+
"other_attributes".to_string(),
137+
Value::String(fetch_attributes_string(&other_attributes)),
138+
);
127139
}
128140

129141
data_point_json.extend(flatten_data_point_flags(data_point.flags));
@@ -154,12 +166,10 @@ fn flatten_number_data_points(
154166
/// each gauge object has json array for data points
155167
/// this function flatten the gauge json object
156168
/// and returns a `Vec` of `Map` for each data point
157-
fn flatten_gauge(
158-
gauge: &Gauge,
159-
other_attributes: &mut Map<String, Value>,
160-
) -> Vec<Map<String, Value>> {
169+
fn flatten_gauge(gauge: &Gauge) -> Vec<Map<String, Value>> {
161170
let mut vec_gauge_json = Vec::new();
162-
let data_points_json = flatten_number_data_points(&gauge.data_points, other_attributes);
171+
let data_points_json = flatten_number_data_points(&gauge.data_points);
172+
163173
for data_point_json in data_points_json {
164174
let mut gauge_json = Map::new();
165175
for (key, value) in &data_point_json {
@@ -174,9 +184,9 @@ fn flatten_gauge(
174184
/// each sum object has json array for data points
175185
/// this function flatten the sum json object
176186
/// and returns a `Vec` of `Map` for each data point
177-
fn flatten_sum(sum: &Sum, other_attributes: &mut Map<String, Value>) -> Vec<Map<String, Value>> {
187+
fn flatten_sum(sum: &Sum) -> Vec<Map<String, Value>> {
178188
let mut vec_sum_json = Vec::new();
179-
let data_points_json = flatten_number_data_points(&sum.data_points, other_attributes);
189+
let data_points_json = flatten_number_data_points(&sum.data_points);
180190
for data_point_json in data_points_json {
181191
let mut sum_json = Map::new();
182192
for (key, value) in &data_point_json {
@@ -199,17 +209,15 @@ fn flatten_sum(sum: &Sum, other_attributes: &mut Map<String, Value>) -> Vec<Map<
199209
/// each histogram object has json array for data points
200210
/// this function flatten the histogram json object
201211
/// and returns a `Vec` of `Map` for each data point
202-
fn flatten_histogram(
203-
histogram: &Histogram,
204-
other_attributes: &mut Map<String, Value>,
205-
) -> Vec<Map<String, Value>> {
212+
fn flatten_histogram(histogram: &Histogram) -> Vec<Map<String, Value>> {
206213
let mut data_points_json = Vec::new();
207214
for data_point in &histogram.data_points {
208215
let mut data_point_json = Map::new();
216+
let mut other_attributes = Map::new();
209217
insert_attributes(
210218
&mut data_point_json,
211219
&data_point.attributes,
212-
other_attributes,
220+
&mut other_attributes,
213221
);
214222
data_point_json.insert(
215223
"start_time_unix_nano".to_string(),
@@ -254,11 +262,19 @@ fn flatten_histogram(
254262
"data_point_explicit_bounds".to_string(),
255263
data_point_explicit_bounds,
256264
);
257-
let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes);
258-
for exemplar in exemplar_json {
259-
for (key, value) in exemplar {
260-
data_point_json.insert(key, value);
265+
let mut exemplar_json = flatten_exemplar(&data_point.exemplars);
266+
if !exemplar_json.is_empty() {
267+
merge_attributes_in_json(other_attributes, &mut exemplar_json);
268+
for exemplar in exemplar_json {
269+
for (key, value) in exemplar {
270+
data_point_json.insert(key, value);
271+
}
261272
}
273+
} else {
274+
data_point_json.insert(
275+
"other_attributes".to_string(),
276+
Value::String(fetch_attributes_string(&other_attributes)),
277+
);
262278
}
263279

264280
data_point_json.extend(flatten_data_point_flags(data_point.flags));
@@ -301,17 +317,15 @@ fn flatten_buckets(bucket: &Buckets) -> Map<String, Value> {
301317
/// each exponential histogram object has json array for data points
302318
/// this function flatten the exponential histogram json object
303319
/// and returns a `Vec` of `Map` for each data point
304-
fn flatten_exp_histogram(
305-
exp_histogram: &ExponentialHistogram,
306-
other_attributes: &mut Map<String, Value>,
307-
) -> Vec<Map<String, Value>> {
320+
fn flatten_exp_histogram(exp_histogram: &ExponentialHistogram) -> Vec<Map<String, Value>> {
308321
let mut data_points_json = Vec::new();
309322
for data_point in &exp_histogram.data_points {
310323
let mut data_point_json = Map::new();
324+
let mut other_attributes = Map::new();
311325
insert_attributes(
312326
&mut data_point_json,
313327
&data_point.attributes,
314-
other_attributes,
328+
&mut other_attributes,
315329
);
316330
data_point_json.insert(
317331
"start_time_unix_nano".to_string(),
@@ -350,11 +364,19 @@ fn flatten_exp_histogram(
350364
data_point_json.insert(format!("negative_{}", key), value);
351365
}
352366
}
353-
let exemplar_json = flatten_exemplar(&data_point.exemplars, other_attributes);
354-
for exemplar in exemplar_json {
355-
for (key, value) in exemplar {
356-
data_point_json.insert(key, value);
367+
let mut exemplar_json = flatten_exemplar(&data_point.exemplars);
368+
if !exemplar_json.is_empty() {
369+
merge_attributes_in_json(other_attributes, &mut exemplar_json);
370+
for exemplar in exemplar_json {
371+
for (key, value) in exemplar {
372+
data_point_json.insert(key, value);
373+
}
357374
}
375+
} else {
376+
data_point_json.insert(
377+
"other_attributes".to_string(),
378+
Value::String(fetch_attributes_string(&other_attributes)),
379+
);
358380
}
359381

360382
data_points_json.push(data_point_json);
@@ -375,17 +397,15 @@ fn flatten_exp_histogram(
375397
/// each summary object has json array for data points
376398
/// this function flatten the summary json object
377399
/// and returns a `Vec` of `Map` for each data point
378-
fn flatten_summary(
379-
summary: &Summary,
380-
other_attributes: &mut Map<String, Value>,
381-
) -> Vec<Map<String, Value>> {
400+
fn flatten_summary(summary: &Summary) -> Vec<Map<String, Value>> {
382401
let mut data_points_json = Vec::new();
383402
for data_point in &summary.data_points {
384403
let mut data_point_json = Map::new();
404+
let mut other_attributes = Map::new();
385405
insert_attributes(
386406
&mut data_point_json,
387407
&data_point.attributes,
388-
other_attributes,
408+
&mut other_attributes,
389409
);
390410
data_point_json.insert(
391411
"start_time_unix_nano".to_string(),
@@ -449,33 +469,31 @@ fn flatten_summary(
449469
/// this function flatten the metric json object
450470
/// and returns a `Vec` of `Map` of the flattened json
451471
/// this function is called recursively for each metric record object in the otel metrics event
452-
pub fn flatten_metrics_record(
453-
metrics_record: &Metric,
454-
other_attributes: &mut Map<String, Value>,
455-
) -> Vec<Map<String, Value>> {
472+
pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec<Map<String, Value>> {
456473
let mut data_points_json = Vec::new();
457474
let mut metric_json = Map::new();
475+
let mut other_attributes = Map::new();
458476
let mut metric_type = String::default();
459477
match &metrics_record.data {
460478
Some(metric::Data::Gauge(gauge)) => {
461479
metric_type = "gauge".to_string();
462-
data_points_json.extend(flatten_gauge(gauge, other_attributes));
480+
data_points_json.extend(flatten_gauge(gauge));
463481
}
464482
Some(metric::Data::Sum(sum)) => {
465483
metric_type = "sum".to_string();
466-
data_points_json.extend(flatten_sum(sum, other_attributes));
484+
data_points_json.extend(flatten_sum(sum));
467485
}
468486
Some(metric::Data::Histogram(histogram)) => {
469487
metric_type = "histogram".to_string();
470-
data_points_json.extend(flatten_histogram(histogram, other_attributes));
488+
data_points_json.extend(flatten_histogram(histogram));
471489
}
472490
Some(metric::Data::ExponentialHistogram(exp_histogram)) => {
473491
metric_type = "exponential_histogram".to_string();
474-
data_points_json.extend(flatten_exp_histogram(exp_histogram, other_attributes));
492+
data_points_json.extend(flatten_exp_histogram(exp_histogram));
475493
}
476494
Some(metric::Data::Summary(summary)) => {
477495
metric_type = "summary".to_string();
478-
data_points_json.extend(flatten_summary(summary, other_attributes));
496+
data_points_json.extend(flatten_summary(summary));
479497
}
480498
None => {}
481499
}
@@ -492,7 +510,11 @@ pub fn flatten_metrics_record(
492510
Value::String(metrics_record.unit.clone()),
493511
);
494512
metric_json.insert("metric_type".to_string(), Value::String(metric_type));
495-
insert_attributes(&mut metric_json, &metrics_record.metadata, other_attributes);
513+
insert_attributes(
514+
&mut metric_json,
515+
&metrics_record.metadata,
516+
&mut other_attributes,
517+
);
496518
for data_point_json in &mut data_points_json {
497519
for (key, value) in &metric_json {
498520
data_point_json.insert(key.clone(), value.clone());
@@ -501,16 +523,17 @@ pub fn flatten_metrics_record(
501523
if data_points_json.is_empty() {
502524
data_points_json.push(metric_json);
503525
}
526+
merge_attributes_in_json(other_attributes, &mut data_points_json);
504527
data_points_json
505528
}
506529

507530
/// this function performs the custom flattening of the otel metrics
508531
/// and returns a `Vec` of `Value::Object` of the flattened json
509532
pub fn flatten_otel_metrics(message: MetricsData) -> Vec<Value> {
510533
let mut vec_otel_json = Vec::new();
511-
let mut other_attributes = Map::new();
512534
for record in &message.resource_metrics {
513535
let mut resource_metrics_json = Map::new();
536+
let mut other_attributes = Map::new();
514537
if let Some(resource) = &record.resource {
515538
insert_attributes(
516539
&mut resource_metrics_json,
@@ -525,11 +548,9 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec<Value> {
525548
let mut vec_scope_metrics_json = Vec::new();
526549
for scope_metric in &record.scope_metrics {
527550
let mut scope_metrics_json = Map::new();
551+
let mut other_attributes = Map::new();
528552
for metrics_record in &scope_metric.metrics {
529-
vec_scope_metrics_json.extend(flatten_metrics_record(
530-
metrics_record,
531-
&mut other_attributes,
532-
));
553+
vec_scope_metrics_json.extend(flatten_metrics_record(metrics_record));
533554
}
534555
if let Some(scope) = &scope_metric.scope {
535556
scope_metrics_json
@@ -558,6 +579,7 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec<Value> {
558579
scope_metric_json.insert(key.clone(), value.clone());
559580
}
560581
}
582+
merge_attributes_in_json(other_attributes, &mut vec_scope_metrics_json);
561583
}
562584
resource_metrics_json.insert(
563585
"resource_metrics_schema_url".to_string(),
@@ -568,22 +590,9 @@ pub fn flatten_otel_metrics(message: MetricsData) -> Vec<Value> {
568590
resource_metric_json.insert(key.clone(), value.clone());
569591
}
570592
}
593+
merge_attributes_in_json(other_attributes, &mut vec_scope_metrics_json);
571594
vec_otel_json.extend(vec_scope_metrics_json);
572595
}
573-
// Add common attributes as one attribute in stringified array to each metric record
574-
let other_attributes = match serde_json::to_string(&other_attributes) {
575-
Ok(s) => s,
576-
Err(e) => {
577-
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
578-
String::default()
579-
}
580-
};
581-
for metric_record_json in &mut vec_otel_json {
582-
metric_record_json.insert(
583-
"other_attributes".to_string(),
584-
Value::String(other_attributes.clone()),
585-
);
586-
}
587596
vec_otel_json.into_iter().map(Value::Object).collect()
588597
}
589598

src/otel/otel_utils.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,13 @@ pub fn collect_json_from_value(key: &String, value: OtelValue) -> Map<String, Va
4545
OtelValue::ArrayValue(array_val) => {
4646
let json_array_value = collect_json_from_array_value(array_val);
4747
// Convert the array to a JSON string
48-
let json_array_string = serde_json::to_string(&json_array_value).unwrap();
48+
let json_array_string = match serde_json::to_string(&json_array_value) {
49+
Ok(s) => s,
50+
Err(e) => {
51+
tracing::warn!("failed to serialise array value: {e}");
52+
String::default()
53+
}
54+
};
4955
// Insert the array into the result map
5056
value_json.insert(key.to_string(), Value::String(json_array_string));
5157
}
@@ -105,9 +111,12 @@ fn collect_json_from_key_value_list(key_value_list: KeyValueList) -> Map<String,
105111
let mut kv_list_json: Map<String, Value> = Map::new();
106112
for key_value in key_value_list.values {
107113
if let Some(val) = key_value.value {
108-
let val = val.value.unwrap();
109-
let json_value = collect_json_from_value(&key_value.key, val);
110-
kv_list_json.extend(json_value);
114+
if let Some(val) = val.value {
115+
let json_value = collect_json_from_value(&key_value.key, val);
116+
kv_list_json.extend(json_value);
117+
} else {
118+
tracing::warn!("Key '{}' has no value in key-value list", key_value.key);
119+
}
111120
}
112121
}
113122
kv_list_json
@@ -206,7 +215,7 @@ pub fn merge_attributes_in_json(
206215
let other_attributes = json.get_mut("other_attributes").unwrap();
207216
let other_attributes = other_attributes.as_str().unwrap_or_default();
208217
// append the other_attributes to the scope log json
209-
let other_attributes = format!("{other_attributes}, {attributes}");
218+
let other_attributes = format!("{other_attributes} {attributes}");
210219
json.insert(
211220
"other_attributes".to_string(),
212221
Value::String(other_attributes),

0 commit comments

Comments
 (0)