Skip to content

Commit 02deb30

Browse files
fix: otel log ingestion with all available fields
Issue: ingested logs were missing several fields because the flattening of the multi-level hierarchical OTEL log structure dropped data. Change: fix the flattening logic. Tested and verified that all data in the log event is now ingested successfully.
1 parent 7c37afa commit 02deb30

File tree

1 file changed

+121
-118
lines changed

1 file changed

+121
-118
lines changed

server/src/handlers/http/otel.rs

Lines changed: 121 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818

1919
use bytes::Bytes;
20+
use proto::common::v1::KeyValue;
21+
use proto::logs::v1::LogRecord;
2022
use serde_json::Value;
2123
mod proto;
2224
use crate::handlers::http::otel::proto::logs::v1::LogRecordFlags;
@@ -127,6 +129,116 @@ fn value_to_string(value: serde_json::Value) -> String {
127129
}
128130
}
129131

132+
pub fn flatten_attributes(
133+
attributes: &Vec<KeyValue>,
134+
attribute_source_key: String,
135+
) -> BTreeMap<String, Value> {
136+
let mut attributes_json: BTreeMap<String, Value> = BTreeMap::new();
137+
for attribute in attributes {
138+
let key = &attribute.key;
139+
let value = &attribute.value;
140+
let value_json =
141+
collect_json_from_values(value, &format!("{}_{}", attribute_source_key, key));
142+
for key in value_json.keys() {
143+
attributes_json.insert(key.to_owned(), value_json[key].to_owned());
144+
}
145+
}
146+
attributes_json
147+
}
148+
149+
pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap<String, Value> {
150+
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
151+
if log_record.time_unix_nano.is_some() {
152+
log_record_json.insert(
153+
"time_unix_nano".to_string(),
154+
Value::String(log_record.time_unix_nano.as_ref().unwrap().to_string()),
155+
);
156+
}
157+
if log_record.observed_time_unix_nano.is_some() {
158+
log_record_json.insert(
159+
"observed_time_unix_nano".to_string(),
160+
Value::String(
161+
log_record
162+
.observed_time_unix_nano
163+
.as_ref()
164+
.unwrap()
165+
.to_string(),
166+
),
167+
);
168+
}
169+
if log_record.severity_number.is_some() {
170+
let severity_number: i32 = log_record.severity_number.unwrap();
171+
log_record_json.insert(
172+
"severity_number".to_string(),
173+
Value::Number(serde_json::Number::from(severity_number)),
174+
);
175+
if log_record.severity_text.is_none() {
176+
log_record_json.insert(
177+
"severity_text".to_string(),
178+
Value::String(SeverityNumber::as_str_name(severity_number).to_string()),
179+
);
180+
}
181+
}
182+
if log_record.severity_text.is_some() {
183+
log_record_json.insert(
184+
"severity_text".to_string(),
185+
Value::String(log_record.severity_text.as_ref().unwrap().to_string()),
186+
);
187+
}
188+
189+
if log_record.body.is_some() {
190+
let body = &log_record.body;
191+
let body_json = collect_json_from_values(body, &"body".to_string());
192+
for key in body_json.keys() {
193+
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
194+
}
195+
}
196+
197+
if let Some(attributes) = log_record.attributes.as_ref() {
198+
let attributes_json = flatten_attributes(attributes, "log_record".to_string());
199+
for key in attributes_json.keys() {
200+
log_record_json.insert(key.to_owned(), attributes_json[key].to_owned());
201+
}
202+
}
203+
204+
if log_record.dropped_attributes_count.is_some() {
205+
log_record_json.insert(
206+
"log_record_dropped_attributes_count".to_string(),
207+
Value::Number(serde_json::Number::from(
208+
log_record.dropped_attributes_count.unwrap(),
209+
)),
210+
);
211+
}
212+
213+
if log_record.flags.is_some() {
214+
let flags: u32 = log_record.flags.unwrap();
215+
log_record_json.insert(
216+
"flags_number".to_string(),
217+
Value::Number(serde_json::Number::from(flags)),
218+
);
219+
log_record_json.insert(
220+
"flags_string".to_string(),
221+
Value::String(LogRecordFlags::as_str_name(flags).to_string()),
222+
);
223+
}
224+
225+
if log_record.span_id.is_some() {
226+
log_record_json.insert(
227+
"span_id".to_string(),
228+
Value::String(log_record.span_id.as_ref().unwrap().to_string()),
229+
);
230+
}
231+
232+
if log_record.trace_id.is_some() {
233+
log_record_json.insert(
234+
"trace_id".to_string(),
235+
Value::String(log_record.trace_id.as_ref().unwrap().to_string()),
236+
);
237+
}
238+
239+
log_record_json
240+
}
241+
130242
pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
131243
let mut vec_otel_json: Vec<BTreeMap<String, Value>> = Vec::new();
132244
let body_str = std::str::from_utf8(body).unwrap();
@@ -139,14 +251,9 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
139251

140252
if let Some(resource) = record.resource.as_ref() {
141253
if let Some(attributes) = resource.attributes.as_ref() {
142-
for attribute in attributes {
143-
let key = &attribute.key;
144-
let value = &attribute.value;
145-
let value_json =
146-
collect_json_from_values(value, &format!("resource_{}", key));
147-
for key in value_json.keys() {
148-
resource_log_json.insert(key.to_owned(), value_json[key].to_owned());
149-
}
254+
let attributes_json = flatten_attributes(attributes, "resource".to_string());
255+
for key in attributes_json.keys() {
256+
resource_log_json.insert(key.to_owned(), attributes_json[key].to_owned());
150257
}
151258
}
152259

@@ -184,17 +291,11 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
184291
}
185292

186293
if let Some(attributes) = instrumentation_scope.attributes.as_ref() {
187-
for attribute in attributes.iter() {
188-
let key = &attribute.key;
189-
let value = &attribute.value;
190-
let value_json = collect_json_from_values(
191-
value,
192-
&format!("instrumentation_scope_{}", key),
193-
);
194-
for key in value_json.keys() {
195-
scope_log_json
196-
.insert(key.to_owned(), value_json[key].to_owned());
197-
}
294+
let attributes_json =
295+
flatten_attributes(attributes, "instrumentation_scope".to_string());
296+
for key in attributes_json.keys() {
297+
scope_log_json
298+
.insert(key.to_owned(), attributes_json[key].to_owned());
198299
}
199300
}
200301

@@ -215,106 +316,8 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
215316
}
216317

217318
for log_record in scope_log.log_records.iter() {
218-
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
219-
if log_record.time_unix_nano.is_some() {
220-
log_record_json.insert(
221-
"time_unix_nano".to_string(),
222-
Value::String(
223-
log_record.time_unix_nano.as_ref().unwrap().to_string(),
224-
),
225-
);
226-
}
227-
if log_record.observed_time_unix_nano.is_some() {
228-
log_record_json.insert(
229-
"observed_time_unix_nano".to_string(),
230-
Value::String(
231-
log_record
232-
.observed_time_unix_nano
233-
.as_ref()
234-
.unwrap()
235-
.to_string(),
236-
),
237-
);
238-
}
239-
if log_record.severity_number.is_some() {
240-
let severity_number: i32 = log_record.severity_number.unwrap();
241-
log_record_json.insert(
242-
"severity_number".to_string(),
243-
Value::Number(serde_json::Number::from(severity_number)),
244-
);
245-
if log_record.severity_text.is_none() {
246-
log_record_json.insert(
247-
"severity_text".to_string(),
248-
Value::String(
249-
SeverityNumber::as_str_name(severity_number).to_string(),
250-
),
251-
);
252-
}
253-
}
254-
if log_record.severity_text.is_some() {
255-
log_record_json.insert(
256-
"severity_text".to_string(),
257-
Value::String(
258-
log_record.severity_text.as_ref().unwrap().to_string(),
259-
),
260-
);
261-
}
262-
263-
if log_record.body.is_some() {
264-
let body = &log_record.body;
265-
let body_json = collect_json_from_values(body, &"body".to_string());
266-
for key in body_json.keys() {
267-
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
268-
}
269-
}
270-
271-
if let Some(attributes) = log_record.attributes.as_ref() {
272-
for attribute in attributes {
273-
let key = &attribute.key;
274-
let value = &attribute.value;
275-
let value_json =
276-
collect_json_from_values(value, &format!("log_record_{}", key));
277-
for key in value_json.keys() {
278-
log_record_json
279-
.insert(key.to_owned(), value_json[key].to_owned());
280-
}
281-
}
282-
}
283-
284-
if log_record.dropped_attributes_count.is_some() {
285-
log_record_json.insert(
286-
"log_record_dropped_attributes_count".to_string(),
287-
Value::Number(serde_json::Number::from(
288-
log_record.dropped_attributes_count.unwrap(),
289-
)),
290-
);
291-
}
319+
let log_record_json = flatten_log_record(log_record);
292320

293-
if log_record.flags.is_some() {
294-
let flags: u32 = log_record.flags.unwrap();
295-
log_record_json.insert(
296-
"flags_number".to_string(),
297-
Value::Number(serde_json::Number::from(flags)),
298-
);
299-
log_record_json.insert(
300-
"flags_string".to_string(),
301-
Value::String(LogRecordFlags::as_str_name(flags).to_string()),
302-
);
303-
}
304-
305-
if log_record.span_id.is_some() {
306-
log_record_json.insert(
307-
"span_id".to_string(),
308-
Value::String(log_record.span_id.as_ref().unwrap().to_string()),
309-
);
310-
}
311-
312-
if log_record.trace_id.is_some() {
313-
log_record_json.insert(
314-
"trace_id".to_string(),
315-
Value::String(log_record.trace_id.as_ref().unwrap().to_string()),
316-
);
317-
}
318321
for key in log_record_json.keys() {
319322
scope_log_json.insert(key.to_owned(), log_record_json[key].to_owned());
320323
}

0 commit comments

Comments
 (0)