Skip to content

Commit dc6f4d3

Browse files
refactor and changes to logs and traces attributes handling
1 parent 70d8c6d commit dc6f4d3

File tree

3 files changed

+122
-103
lines changed

3 files changed

+122
-103
lines changed

src/otel/logs.rs

Lines changed: 7 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use serde_json::Value;
2525

2626
use super::otel_utils::collect_json_from_values;
2727
use super::otel_utils::convert_epoch_nano_to_timestamp;
28+
use super::otel_utils::fetch_attributes_string;
2829
use super::otel_utils::insert_attributes;
30+
use super::otel_utils::merge_attributes_in_json;
2931

3032
pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
3133
"time_unix_nano",
@@ -106,13 +108,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
106108

107109
// Add the `other_attributes` to the log record json
108110
if !other_attributes.is_empty() {
109-
let other_attributes = match serde_json::to_string(&other_attributes) {
110-
Ok(s) => s,
111-
Err(e) => {
112-
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
113-
String::default()
114-
}
115-
};
111+
let other_attributes = fetch_attributes_string(&other_attributes);
116112
log_record_json.insert(
117113
"other_attributes".to_string(),
118114
Value::String(other_attributes),
@@ -156,33 +152,8 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
156152
vec_scope_log_json.push(combined_json);
157153
}
158154

159-
if !other_attributes.is_empty() {
160-
let scope_other_attributes = match serde_json::to_string(&other_attributes) {
161-
Ok(s) => s,
162-
Err(e) => {
163-
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
164-
String::default()
165-
}
166-
};
167-
// append scope_other_attributes to each log record json
168-
for scope_log_json in &mut vec_scope_log_json {
169-
// fetch the other_attributes from the scope log json
170-
if let Some(other_attributes) = scope_log_json.get("other_attributes") {
171-
let other_attributes = other_attributes.as_str().unwrap_or_default();
172-
// append the other_attributes to the scope log json
173-
let other_attributes = format!("{other_attributes}, {scope_other_attributes}");
174-
scope_log_json.insert(
175-
"other_attributes".to_string(),
176-
Value::String(other_attributes),
177-
);
178-
} else {
179-
scope_log_json.insert(
180-
"other_attributes".to_string(),
181-
Value::String(scope_other_attributes.clone()),
182-
);
183-
}
184-
}
185-
}
155+
// Add the `other_attributes` to the scope log json
156+
merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);
186157

187158
vec_scope_log_json
188159
}
@@ -220,34 +191,8 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
220191
resource_logs_json.extend(resource_log_json.clone());
221192
}
222193

223-
if !other_attributes.is_empty() {
224-
let resource_other_attributes = match serde_json::to_string(&other_attributes) {
225-
Ok(s) => s,
226-
Err(e) => {
227-
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
228-
String::default()
229-
}
230-
};
231-
// append scope_other_attributes to each log record json
232-
for resource_logs_json in &mut vec_resource_logs_json {
233-
// fetch the other_attributes from the scope log json
234-
if let Some(other_attributes) = resource_logs_json.get("other_attributes") {
235-
let other_attributes = other_attributes.as_str().unwrap_or_default();
236-
// append the other_attributes to the scope log json
237-
let other_attributes =
238-
format!("{other_attributes}, {resource_other_attributes}");
239-
resource_logs_json.insert(
240-
"other_attributes".to_string(),
241-
Value::String(other_attributes),
242-
);
243-
} else {
244-
resource_logs_json.insert(
245-
"other_attributes".to_string(),
246-
Value::String(resource_other_attributes.clone()),
247-
);
248-
}
249-
}
250-
}
194+
// Add the `other_attributes` to the resource log json
195+
merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);
251196

252197
vec_otel_json.extend(vec_resource_logs_json);
253198
}

src/otel/otel_utils.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,51 @@ pub fn convert_epoch_nano_to_timestamp(epoch_ns: i64) -> String {
194194
let dt = DateTime::from_timestamp_nanos(epoch_ns).naive_utc();
195195
dt.format("%Y-%m-%dT%H:%M:%S%.6fZ").to_string()
196196
}
197+
198+
pub fn merge_attributes_in_json(
199+
attributes: Map<String, Value>,
200+
vec_json: &mut Vec<Map<String, Value>>,
201+
) {
202+
if !attributes.is_empty() {
203+
let attributes = fetch_attributes_string(&attributes);
204+
for json in vec_json {
205+
if json.contains_key("other_attributes") {
206+
let other_attributes = json.get_mut("other_attributes").unwrap();
207+
let other_attributes = other_attributes.as_str().unwrap_or_default();
208+
// append the other_attributes to the scope log json
209+
let other_attributes = format!("{other_attributes}, {attributes}");
210+
json.insert(
211+
"other_attributes".to_string(),
212+
Value::String(other_attributes),
213+
);
214+
} else {
215+
json.insert(
216+
"other_attributes".to_string(),
217+
Value::String(attributes.clone()),
218+
);
219+
}
220+
}
221+
}
222+
}
223+
224+
pub fn fetch_attributes_from_json(json_arr: &Vec<Map<String, Value>>) -> String {
225+
let mut combined_attributes = String::default();
226+
for json in json_arr {
227+
if let Some(other_attributes) = json.get("other_attributes") {
228+
if let Some(other_attributes) = other_attributes.as_str() {
229+
combined_attributes.push_str(other_attributes);
230+
}
231+
}
232+
}
233+
combined_attributes
234+
}
235+
236+
pub fn fetch_attributes_string(attributes: &Map<String, Value>) -> String {
237+
match serde_json::to_string(attributes) {
238+
Ok(s) => s,
239+
Err(e) => {
240+
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
241+
String::default()
242+
}
243+
}
244+
}

src/otel/traces.rs

Lines changed: 67 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ use opentelemetry_proto::tonic::trace::v1::TracesData;
2525
use serde_json::{Map, Value};
2626

2727
use super::otel_utils::convert_epoch_nano_to_timestamp;
28+
use super::otel_utils::fetch_attributes_from_json;
29+
use super::otel_utils::fetch_attributes_string;
2830
use super::otel_utils::insert_attributes;
31+
use super::otel_utils::merge_attributes_in_json;
2932

3033
pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [
3134
"span_trace_id",
@@ -46,15 +49,12 @@ pub const OTEL_TRACES_KNOWN_FIELD_LIST: [&str; 15] = [
4649
];
4750
/// this function flattens the `ScopeSpans` object
4851
/// and returns a `Vec` of `Map` of the flattened json
49-
fn flatten_scope_span(
50-
scope_span: &ScopeSpans,
51-
other_attributes: &mut Map<String, Value>,
52-
) -> Vec<Map<String, Value>> {
52+
fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec<Map<String, Value>> {
5353
let mut vec_scope_span_json = Vec::new();
5454
let mut scope_span_json = Map::new();
55-
55+
let mut other_attributes = Map::new();
5656
for span in &scope_span.spans {
57-
let span_record_json = flatten_span_record(span, other_attributes);
57+
let span_record_json = flatten_span_record(span);
5858
vec_scope_span_json.extend(span_record_json);
5959
}
6060

@@ -64,7 +64,11 @@ fn flatten_scope_span(
6464
"scope_version".to_string(),
6565
Value::String(scope.version.clone()),
6666
);
67-
insert_attributes(&mut scope_span_json, &scope.attributes, other_attributes);
67+
insert_attributes(
68+
&mut scope_span_json,
69+
&scope.attributes,
70+
&mut other_attributes,
71+
);
6872
scope_span_json.insert(
6973
"scope_dropped_attributes_count".to_string(),
7074
Value::Number(scope.dropped_attributes_count.into()),
@@ -83,6 +87,8 @@ fn flatten_scope_span(
8387
Value::String(scope_span.schema_url.clone()),
8488
);
8589
}
90+
// Add the `other_attributes` to the scope span json
91+
merge_attributes_in_json(other_attributes.clone(), &mut vec_scope_span_json);
8692

8793
vec_scope_span_json
8894
}
@@ -109,7 +115,7 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec<Value> {
109115

110116
let mut vec_resource_spans_json = Vec::new();
111117
for scope_span in &record.scope_spans {
112-
let scope_span_json = flatten_scope_span(scope_span, &mut other_attributes);
118+
let scope_span_json = flatten_scope_span(scope_span);
113119
vec_resource_spans_json.extend(scope_span_json);
114120
}
115121

@@ -123,49 +129,43 @@ pub fn flatten_otel_traces(message: &TracesData) -> Vec<Value> {
123129
resource_spans_json.insert(key.clone(), value.clone());
124130
}
125131
}
126-
132+
// Add the `other_attributes` to the resource span json
133+
merge_attributes_in_json(other_attributes.clone(), &mut vec_resource_spans_json);
127134
vec_otel_json.extend(vec_resource_spans_json);
128135
}
129-
// Add common attributes as one attribute in stringified array to each span record
130-
let other_attributes = match serde_json::to_string(&other_attributes) {
131-
Ok(s) => s,
132-
Err(e) => {
133-
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
134-
String::default()
135-
}
136-
};
137-
for span_record_json in &mut vec_otel_json {
138-
span_record_json.insert(
139-
"other_attributes".to_string(),
140-
Value::String(other_attributes.clone()),
141-
);
142-
}
136+
143137
vec_otel_json.into_iter().map(Value::Object).collect()
144138
}
145139

146140
/// otel traces has json array of events
147141
/// this function flattens the `Event` object
148142
/// and returns a `Vec` of `Map` of the flattened json
149-
fn flatten_events(
150-
events: &[Event],
151-
other_attributes: &mut Map<String, Value>,
152-
) -> Vec<Map<String, Value>> {
143+
fn flatten_events(events: &[Event]) -> Vec<Map<String, Value>> {
153144
events
154145
.iter()
155146
.map(|event| {
156147
let mut event_json = Map::new();
148+
let mut other_attributes = Map::new();
157149
event_json.insert(
158150
"event_time_unix_nano".to_string(),
159151
Value::String(
160152
convert_epoch_nano_to_timestamp(event.time_unix_nano as i64).to_string(),
161153
),
162154
);
163155
event_json.insert("event_name".to_string(), Value::String(event.name.clone()));
164-
insert_attributes(&mut event_json, &event.attributes, other_attributes);
156+
insert_attributes(&mut event_json, &event.attributes, &mut other_attributes);
165157
event_json.insert(
166158
"event_dropped_attributes_count".to_string(),
167159
Value::Number(event.dropped_attributes_count.into()),
168160
);
161+
162+
if !other_attributes.is_empty() {
163+
let other_attributes = fetch_attributes_string(&other_attributes);
164+
event_json.insert(
165+
"other_attributes".to_string(),
166+
Value::String(other_attributes),
167+
);
168+
}
169169
event_json
170170
})
171171
.collect()
@@ -174,14 +174,12 @@ fn flatten_events(
174174
/// otel traces has json array of links
175175
/// this function flattens the `Link` object
176176
/// and returns a `Vec` of `Map` of the flattened json
177-
fn flatten_links(
178-
links: &[Link],
179-
other_attributes: &mut Map<String, Value>,
180-
) -> Vec<Map<String, Value>> {
177+
fn flatten_links(links: &[Link]) -> Vec<Map<String, Value>> {
181178
links
182179
.iter()
183180
.map(|link| {
184181
let mut link_json = Map::new();
182+
let mut other_attributes = Map::new();
185183
link_json.insert(
186184
"link_span_id".to_string(),
187185
Value::String(hex::encode(&link.span_id)),
@@ -191,11 +189,19 @@ fn flatten_links(
191189
Value::String(hex::encode(&link.trace_id)),
192190
);
193191

194-
insert_attributes(&mut link_json, &link.attributes, other_attributes);
192+
insert_attributes(&mut link_json, &link.attributes, &mut other_attributes);
195193
link_json.insert(
196194
"link_dropped_attributes_count".to_string(),
197195
Value::Number(link.dropped_attributes_count.into()),
198196
);
197+
198+
if !other_attributes.is_empty() {
199+
let other_attributes = fetch_attributes_string(&other_attributes);
200+
link_json.insert(
201+
"other_attributes".to_string(),
202+
Value::String(other_attributes),
203+
);
204+
}
199205
link_json
200206
})
201207
.collect()
@@ -278,12 +284,9 @@ fn flatten_kind(kind: i32) -> Map<String, Value> {
278284
/// this function flattens the `Span` object
279285
/// and returns a `Vec` of `Map` of the flattened json
280286
/// this function is called recursively for each span record object in the otel traces event
281-
fn flatten_span_record(
282-
span_record: &Span,
283-
other_attributes: &mut Map<String, Value>,
284-
) -> Vec<Map<String, Value>> {
287+
fn flatten_span_record(span_record: &Span) -> Vec<Map<String, Value>> {
285288
let mut span_records_json = Vec::new();
286-
289+
let mut other_attributes = Map::new();
287290
let mut span_record_json = Map::new();
288291
span_record_json.insert(
289292
"span_trace_id".to_string(),
@@ -322,18 +325,41 @@ fn flatten_span_record(
322325
insert_attributes(
323326
&mut span_record_json,
324327
&span_record.attributes,
325-
other_attributes,
328+
&mut other_attributes,
326329
);
327330
span_record_json.insert(
328331
"span_dropped_attributes_count".to_string(),
329332
Value::Number(span_record.dropped_attributes_count.into()),
330333
);
331-
span_records_json.extend(flatten_events(&span_record.events, other_attributes));
334+
let events_json = flatten_events(&span_record.events);
335+
// fetch all other_attributes from the events_json
336+
let events_other_attributes = fetch_attributes_from_json(&events_json);
337+
span_records_json.extend(events_json);
332338
span_record_json.insert(
333339
"span_dropped_events_count".to_string(),
334340
Value::Number(span_record.dropped_events_count.into()),
335341
);
336-
span_records_json.extend(flatten_links(&span_record.links, other_attributes));
342+
let links_json = flatten_links(&span_record.links);
343+
// fetch all other_attributes from the links_json
344+
let links_other_attributes = fetch_attributes_from_json(&links_json);
345+
span_records_json.extend(links_json);
346+
if !other_attributes.is_empty() {
347+
let other_attributes = fetch_attributes_string(&other_attributes);
348+
span_record_json.insert(
349+
"other_attributes".to_string(),
350+
Value::String(format!(
351+
"{other_attributes} {events_other_attributes} {links_other_attributes}"
352+
)),
353+
);
354+
} else {
355+
span_record_json.insert(
356+
"other_attributes".to_string(),
357+
Value::String(format!(
358+
"{events_other_attributes} {links_other_attributes}"
359+
)),
360+
);
361+
}
362+
337363
span_record_json.insert(
338364
"span_dropped_links_count".to_string(),
339365
Value::Number(span_record.dropped_links_count.into()),

0 commit comments

Comments
 (0)