Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,13 +736,11 @@ pub async fn get_node_info<T: Metadata + DeserializeOwned>(
)
.await?
.iter()
.filter_map(|x| {
match serde_json::from_slice::<T>(x) {
Ok(val) => Some(val),
Err(e) => {
error!("Failed to parse node metadata: {:?}", e);
None
}
.filter_map(|x| match serde_json::from_slice::<T>(x) {
Ok(val) => Some(val),
Err(e) => {
error!("Failed to parse node metadata: {:?}", e);
None
}
})
.collect();
Expand Down
36 changes: 30 additions & 6 deletions src/otel/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ use opentelemetry_proto::tonic::logs::v1::SeverityNumber;
use serde_json::Map;
use serde_json::Value;

use super::otel_utils::add_other_attributes_if_not_empty;
use super::otel_utils::collect_json_from_values;
use super::otel_utils::convert_epoch_nano_to_timestamp;
use super::otel_utils::insert_attributes;
use super::otel_utils::merge_attributes_in_json;

pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
"time_unix_nano",
Expand Down Expand Up @@ -58,6 +60,7 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
/// this function is called recursively for each log record object in the otel logs
pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
let mut log_record_json: Map<String, Value> = Map::new();
let mut other_attributes = Map::new();
log_record_json.insert(
"time_unix_nano".to_string(),
Value::String(convert_epoch_nano_to_timestamp(
Expand All @@ -80,7 +83,11 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
}
}
insert_attributes(&mut log_record_json, &log_record.attributes);
insert_attributes(
&mut log_record_json,
&log_record.attributes,
&mut other_attributes,
);
log_record_json.insert(
"log_record_dropped_attributes_count".to_string(),
Value::Number(log_record.dropped_attributes_count.into()),
Expand All @@ -99,6 +106,9 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
Value::String(hex::encode(&log_record.trace_id)),
);

// Add the `other_attributes` to the log record json
add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes);

log_record_json
}

Expand All @@ -107,14 +117,18 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
let mut vec_scope_log_json = Vec::new();
let mut scope_log_json = Map::new();

let mut other_attributes = Map::new();
if let Some(scope) = &scope_log.scope {
scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
scope_log_json.insert(
"scope_version".to_string(),
Value::String(scope.version.clone()),
);
insert_attributes(&mut scope_log_json, &scope.attributes);
insert_attributes(
&mut scope_log_json,
&scope.attributes,
&mut other_attributes,
);
scope_log_json.insert(
"scope_dropped_attributes_count".to_string(),
Value::Number(scope.dropped_attributes_count.into()),
Expand All @@ -132,18 +146,26 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
vec_scope_log_json.push(combined_json);
}

// Add the `other_attributes` to the scope log json
merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);

vec_scope_log_json
}

/// this function performs the custom flattening of the otel logs
/// and returns a `Vec` of `Value::Object` of the flattened json
pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
let mut vec_otel_json = Vec::new();

for record in &message.resource_logs {
let mut resource_log_json = Map::new();

let mut other_attributes = Map::new();
if let Some(resource) = &record.resource {
insert_attributes(&mut resource_log_json, &resource.attributes);
insert_attributes(
&mut resource_log_json,
&resource.attributes,
&mut other_attributes,
);
resource_log_json.insert(
"resource_dropped_attributes_count".to_string(),
Value::Number(resource.dropped_attributes_count.into()),
Expand All @@ -163,8 +185,10 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
resource_logs_json.extend(resource_log_json.clone());
}

// Add the `other_attributes` to the resource log json
merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);

vec_otel_json.extend(vec_resource_logs_json);
}

vec_otel_json.into_iter().map(Value::Object).collect()
}
Loading
Loading