Skip to content

Commit 70d8c6d

Browse files
fixed data loss for other_attributes in logs
1 parent bf0bf15 commit 70d8c6d

File tree

1 file changed

+86
-29
lines changed

1 file changed

+86
-29
lines changed

src/otel/logs.rs

Lines changed: 86 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,9 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
5656
/// this function flattens the `LogRecord` object
5757
/// and returns a `Map` of the flattened json
5858
/// this function is called recursively for each log record object in the otel logs
59-
pub fn flatten_log_record(
60-
log_record: &LogRecord,
61-
other_attributes: &mut Map<String, Value>,
62-
) -> Map<String, Value> {
59+
pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
6360
let mut log_record_json: Map<String, Value> = Map::new();
61+
let mut other_attributes = Map::new();
6462
log_record_json.insert(
6563
"time_unix_nano".to_string(),
6664
Value::String(convert_epoch_nano_to_timestamp(
@@ -86,7 +84,7 @@ pub fn flatten_log_record(
8684
insert_attributes(
8785
&mut log_record_json,
8886
&log_record.attributes,
89-
other_attributes,
87+
&mut other_attributes,
9088
);
9189
log_record_json.insert(
9290
"log_record_dropped_attributes_count".to_string(),
@@ -106,25 +104,41 @@ pub fn flatten_log_record(
106104
Value::String(hex::encode(&log_record.trace_id)),
107105
);
108106

107+
// Add the `other_attributes` to the log record json
108+
if !other_attributes.is_empty() {
109+
let other_attributes = match serde_json::to_string(&other_attributes) {
110+
Ok(s) => s,
111+
Err(e) => {
112+
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
113+
String::default()
114+
}
115+
};
116+
log_record_json.insert(
117+
"other_attributes".to_string(),
118+
Value::String(other_attributes),
119+
);
120+
}
121+
109122
log_record_json
110123
}
111124

112125
/// this function flattens the `ScopeLogs` object
113126
/// and returns a `Vec` of `Map` of the flattened json
114-
fn flatten_scope_log(
115-
scope_log: &ScopeLogs,
116-
other_attributes: &mut Map<String, Value>,
117-
) -> Vec<Map<String, Value>> {
127+
fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
118128
let mut vec_scope_log_json = Vec::new();
119129
let mut scope_log_json = Map::new();
120-
130+
let mut other_attributes = Map::new();
121131
if let Some(scope) = &scope_log.scope {
122132
scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
123133
scope_log_json.insert(
124134
"scope_version".to_string(),
125135
Value::String(scope.version.clone()),
126136
);
127-
insert_attributes(&mut scope_log_json, &scope.attributes, other_attributes);
137+
insert_attributes(
138+
&mut scope_log_json,
139+
&scope.attributes,
140+
&mut other_attributes,
141+
);
128142
scope_log_json.insert(
129143
"scope_dropped_attributes_count".to_string(),
130144
Value::Number(scope.dropped_attributes_count.into()),
@@ -136,23 +150,51 @@ fn flatten_scope_log(
136150
);
137151

138152
for log_record in &scope_log.log_records {
139-
let log_record_json = flatten_log_record(log_record, other_attributes);
153+
let log_record_json = flatten_log_record(log_record);
140154
let mut combined_json = scope_log_json.clone();
141155
combined_json.extend(log_record_json);
142156
vec_scope_log_json.push(combined_json);
143157
}
144158

159+
if !other_attributes.is_empty() {
160+
let scope_other_attributes = match serde_json::to_string(&other_attributes) {
161+
Ok(s) => s,
162+
Err(e) => {
163+
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
164+
String::default()
165+
}
166+
};
167+
// append scope_other_attributes to each log record json
168+
for scope_log_json in &mut vec_scope_log_json {
169+
// fetch the other_attributes from the scope log json
170+
if let Some(other_attributes) = scope_log_json.get("other_attributes") {
171+
let other_attributes = other_attributes.as_str().unwrap_or_default();
172+
// append the other_attributes to the scope log json
173+
let other_attributes = format!("{other_attributes}, {scope_other_attributes}");
174+
scope_log_json.insert(
175+
"other_attributes".to_string(),
176+
Value::String(other_attributes),
177+
);
178+
} else {
179+
scope_log_json.insert(
180+
"other_attributes".to_string(),
181+
Value::String(scope_other_attributes.clone()),
182+
);
183+
}
184+
}
185+
}
186+
145187
vec_scope_log_json
146188
}
147189

148190
/// this function performs the custom flattening of the otel logs
149191
/// and returns a `Vec` of `Value::Object` of the flattened json
150192
pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
151193
let mut vec_otel_json = Vec::new();
152-
let mut other_attributes = Map::new();
194+
153195
for record in &message.resource_logs {
154196
let mut resource_log_json = Map::new();
155-
197+
let mut other_attributes = Map::new();
156198
if let Some(resource) = &record.resource {
157199
insert_attributes(
158200
&mut resource_log_json,
@@ -167,7 +209,7 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
167209

168210
let mut vec_resource_logs_json = Vec::new();
169211
for scope_log in &record.scope_logs {
170-
vec_resource_logs_json.extend(flatten_scope_log(scope_log, &mut other_attributes));
212+
vec_resource_logs_json.extend(flatten_scope_log(scope_log));
171213
}
172214
resource_log_json.insert(
173215
"schema_url".to_string(),
@@ -178,21 +220,36 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
178220
resource_logs_json.extend(resource_log_json.clone());
179221
}
180222

181-
vec_otel_json.extend(vec_resource_logs_json);
182-
}
183-
// Add common attributes as one attribute in stringified array to each log record
184-
let other_attributes = match serde_json::to_string(&other_attributes) {
185-
Ok(s) => s,
186-
Err(e) => {
187-
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
188-
String::default()
223+
if !other_attributes.is_empty() {
224+
let resource_other_attributes = match serde_json::to_string(&other_attributes) {
225+
Ok(s) => s,
226+
Err(e) => {
227+
tracing::warn!("failed to serialise OTEL other_attributes: {e}");
228+
String::default()
229+
}
230+
};
231+
// append scope_other_attributes to each log record json
232+
for resource_logs_json in &mut vec_resource_logs_json {
233+
// fetch the other_attributes from the scope log json
234+
if let Some(other_attributes) = resource_logs_json.get("other_attributes") {
235+
let other_attributes = other_attributes.as_str().unwrap_or_default();
236+
// append the other_attributes to the scope log json
237+
let other_attributes =
238+
format!("{other_attributes}, {resource_other_attributes}");
239+
resource_logs_json.insert(
240+
"other_attributes".to_string(),
241+
Value::String(other_attributes),
242+
);
243+
} else {
244+
resource_logs_json.insert(
245+
"other_attributes".to_string(),
246+
Value::String(resource_other_attributes.clone()),
247+
);
248+
}
249+
}
189250
}
190-
};
191-
for log_record_json in &mut vec_otel_json {
192-
log_record_json.insert(
193-
"other_attributes".to_string(),
194-
Value::String(other_attributes.clone()),
195-
);
251+
252+
vec_otel_json.extend(vec_resource_logs_json);
196253
}
197254
vec_otel_json.into_iter().map(Value::Object).collect()
198255
}

0 commit comments

Comments
 (0)