Skip to content

Commit 0eefe77

Browse files
remove other_attributes from otel logs/traces/metrics
keep all attributes as individual columns in the ingested event expose env `P_OTEL_ATTRIBUTES_ALLOWED_LIMIT` to configure the allowed limit for attributes count if attributes count in flattened event > the allowed limit log the error, and reject the event Fixes: #1310
1 parent 3d844b3 commit 0eefe77

File tree

7 files changed

+201
-295
lines changed

7 files changed

+201
-295
lines changed

src/cli.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,14 @@ pub struct Options {
368368

369369
#[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
370370
pub ms_clarity_tag: Option<String>,
371+
372+
#[arg(
373+
long,
374+
env = "P_OTEL_ATTRIBUTES_ALLOWED_LIMIT",
375+
default_value = "200",
376+
help = "allowed limit for otel attributes"
377+
)]
378+
pub otel_attributes_allowed_limit: usize,
371379
}
372380

373381
#[derive(Parser, Debug)]

src/handlers/http/ingest.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::metadata::SchemaVersion;
3535
use crate::option::Mode;
3636
use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
3737
use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
38+
use crate::otel::otel_utils::OtelError;
3839
use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
3940
use crate::parseable::{StreamNotFound, PARSEABLE};
4041
use crate::storage::{ObjectStorageError, StreamType};
@@ -467,6 +468,8 @@ pub enum PostError {
467468
KnownFormat(#[from] known_schema::Error),
468469
#[error("Ingestion is not allowed to stream {0} as it is already associated with a different OTEL format")]
469470
IncorrectLogFormat(String),
471+
#[error("OtelError: {0}")]
472+
OtelError(#[from] OtelError),
470473
}
471474

472475
impl actix_web::ResponseError for PostError {
@@ -495,6 +498,7 @@ impl actix_web::ResponseError for PostError {
495498
PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
496499
PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
497500
PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
501+
PostError::OtelError(_) => StatusCode::EXPECTATION_FAILED,
498502
}
499503
}
500504

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,24 @@ pub async fn flatten_and_push_logs(
6565
LogSource::OtelLogs => {
6666
//custom flattening required for otel logs
6767
let logs: LogsData = serde_json::from_value(json)?;
68-
for record in flatten_otel_logs(&logs) {
68+
let records = flatten_otel_logs(&logs)?;
69+
for record in records {
6970
push_logs(stream_name, record, log_source, p_custom_fields).await?;
7071
}
7172
}
7273
LogSource::OtelTraces => {
7374
//custom flattening required for otel traces
7475
let traces: TracesData = serde_json::from_value(json)?;
75-
for record in flatten_otel_traces(&traces) {
76+
let records = flatten_otel_traces(&traces)?;
77+
for record in records {
7678
push_logs(stream_name, record, log_source, p_custom_fields).await?;
7779
}
7880
}
7981
LogSource::OtelMetrics => {
8082
//custom flattening required for otel metrics
8183
let metrics: MetricsData = serde_json::from_value(json)?;
82-
for record in flatten_otel_metrics(metrics) {
84+
let records = flatten_otel_metrics(metrics)?;
85+
for record in records {
8386
push_logs(stream_name, record, log_source, p_custom_fields).await?;
8487
}
8588
}

src/otel/logs.rs

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,38 @@
1616
*
1717
*/
1818

19+
use std::collections::HashSet;
20+
21+
use crate::parseable::PARSEABLE;
22+
23+
use super::otel_utils::collect_json_from_values;
24+
use super::otel_utils::convert_epoch_nano_to_timestamp;
25+
use super::otel_utils::insert_attributes;
26+
use super::otel_utils::OtelError;
1927
use opentelemetry_proto::tonic::logs::v1::LogRecord;
2028
use opentelemetry_proto::tonic::logs::v1::LogsData;
2129
use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
2230
use opentelemetry_proto::tonic::logs::v1::SeverityNumber;
2331
use serde_json::Map;
2432
use serde_json::Value;
2533

26-
use super::otel_utils::add_other_attributes_if_not_empty;
27-
use super::otel_utils::collect_json_from_values;
28-
use super::otel_utils::convert_epoch_nano_to_timestamp;
29-
use super::otel_utils::insert_attributes;
30-
use super::otel_utils::merge_attributes_in_json;
31-
32-
pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 6] = [
34+
pub const OTEL_LOG_KNOWN_FIELD_LIST: [&str; 16] = [
35+
"scope_name",
36+
"scope_version",
37+
"scope_log_schema_url",
38+
"scope_dropped_attributes_count",
39+
"resource_dropped_attributes_count",
40+
"resource_schema_url",
3341
"time_unix_nano",
42+
"observed_time_unix_nano",
3443
"severity_number",
3544
"severity_text",
3645
"body",
46+
"flags",
47+
"log_record_dropped_attributes_count",
3748
"span_id",
3849
"trace_id",
50+
"event_name",
3951
];
4052
/// otel log event has severity number
4153
/// there is a mapping of severity number to severity text provided in proto
@@ -60,7 +72,6 @@ fn flatten_severity(severity_number: i32) -> Map<String, Value> {
6072
/// this function is called recursively for each log record object in the otel logs
6173
pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
6274
let mut log_record_json: Map<String, Value> = Map::new();
63-
let mut other_attributes = Map::new();
6475
log_record_json.insert(
6576
"time_unix_nano".to_string(),
6677
Value::String(convert_epoch_nano_to_timestamp(
@@ -83,11 +94,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
8394
log_record_json.insert(key.to_owned(), body_json[key].to_owned());
8495
}
8596
}
86-
insert_attributes(
87-
&mut log_record_json,
88-
&log_record.attributes,
89-
&mut other_attributes,
90-
);
97+
insert_attributes(&mut log_record_json, &log_record.attributes);
9198
log_record_json.insert(
9299
"log_record_dropped_attributes_count".to_string(),
93100
Value::Number(log_record.dropped_attributes_count.into()),
@@ -106,9 +113,6 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
106113
Value::String(hex::encode(&log_record.trace_id)),
107114
);
108115

109-
// Add the `other_attributes` to the log record json
110-
add_other_attributes_if_not_empty(&mut log_record_json, &other_attributes);
111-
112116
log_record_json
113117
}
114118

@@ -117,18 +121,13 @@ pub fn flatten_log_record(log_record: &LogRecord) -> Map<String, Value> {
117121
fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
118122
let mut vec_scope_log_json = Vec::new();
119123
let mut scope_log_json = Map::new();
120-
let mut other_attributes = Map::new();
121124
if let Some(scope) = &scope_log.scope {
122125
scope_log_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
123126
scope_log_json.insert(
124127
"scope_version".to_string(),
125128
Value::String(scope.version.clone()),
126129
);
127-
insert_attributes(
128-
&mut scope_log_json,
129-
&scope.attributes,
130-
&mut other_attributes,
131-
);
130+
insert_attributes(&mut scope_log_json, &scope.attributes);
132131
scope_log_json.insert(
133132
"scope_dropped_attributes_count".to_string(),
134133
Value::Number(scope.dropped_attributes_count.into()),
@@ -146,26 +145,19 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<Map<String, Value>> {
146145
vec_scope_log_json.push(combined_json);
147146
}
148147

149-
// Add the `other_attributes` to the scope log json
150-
merge_attributes_in_json(other_attributes, &mut vec_scope_log_json);
151-
152148
vec_scope_log_json
153149
}
154150

155151
/// this function performs the custom flattening of the otel logs
156152
/// and returns a `Vec` of `Value::Object` of the flattened json
157-
pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
153+
pub fn flatten_otel_logs(message: &LogsData) -> Result<Vec<Value>, OtelError> {
158154
let mut vec_otel_json = Vec::new();
155+
let known_fields: HashSet<&str> = OTEL_LOG_KNOWN_FIELD_LIST.iter().cloned().collect();
159156

160157
for record in &message.resource_logs {
161158
let mut resource_log_json = Map::new();
162-
let mut other_attributes = Map::new();
163159
if let Some(resource) = &record.resource {
164-
insert_attributes(
165-
&mut resource_log_json,
166-
&resource.attributes,
167-
&mut other_attributes,
168-
);
160+
insert_attributes(&mut resource_log_json, &resource.attributes);
169161
resource_log_json.insert(
170162
"resource_dropped_attributes_count".to_string(),
171163
Value::Number(resource.dropped_attributes_count.into()),
@@ -176,19 +168,35 @@ pub fn flatten_otel_logs(message: &LogsData) -> Vec<Value> {
176168
for scope_log in &record.scope_logs {
177169
vec_resource_logs_json.extend(flatten_scope_log(scope_log));
178170
}
171+
179172
resource_log_json.insert(
180173
"schema_url".to_string(),
181174
Value::String(record.schema_url.clone()),
182175
);
183176

184177
for resource_logs_json in &mut vec_resource_logs_json {
185178
resource_logs_json.extend(resource_log_json.clone());
186-
}
187-
188-
// Add the `other_attributes` to the resource log json
189-
merge_attributes_in_json(other_attributes, &mut vec_resource_logs_json);
190179

191-
vec_otel_json.extend(vec_resource_logs_json);
180+
let attribute_count = resource_logs_json
181+
.keys()
182+
.filter(|key| !known_fields.contains(key.as_str()))
183+
.count();
184+
// Check if the number of attributes exceeds the allowed limit
185+
if attribute_count > PARSEABLE.options.otel_attributes_allowed_limit {
186+
tracing::error!(
187+
"OTEL logs ingestion failed because the number of attributes ({}) exceeded the threshold of {}",
188+
attribute_count,
189+
PARSEABLE.options.otel_attributes_allowed_limit
190+
);
191+
return Err(OtelError::AttributeCountExceeded(
192+
attribute_count,
193+
PARSEABLE.options.otel_attributes_allowed_limit,
194+
));
195+
}
196+
197+
vec_otel_json.push(Value::Object(resource_logs_json.clone()));
198+
}
192199
}
193-
vec_otel_json.into_iter().map(Value::Object).collect()
200+
201+
Ok(vec_otel_json)
194202
}

0 commit comments

Comments
 (0)