Skip to content

Commit b25c070

Browse files
author
Devdutt Shenoi
committed
refactor: specialized flatten together
1 parent ca4b25a commit b25c070

File tree

2 files changed

+30
-27
lines changed

2 files changed

+30
-27
lines changed

src/handlers/http/ingest.rs

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use super::logstream::error::{CreateStreamError, StreamError};
20-
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
20+
use super::modal::utils::ingest_utils::flatten_and_push_logs;
2121
use super::users::dashboards::DashboardError;
2222
use super::users::filters::FiltersError;
2323
use crate::event::format::LogSource;
@@ -31,9 +31,6 @@ use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3131
use crate::metadata::error::stream_info::MetadataError;
3232
use crate::metadata::{SchemaVersion, STREAM_INFO};
3333
use crate::option::{Mode, CONFIG};
34-
use crate::otel::logs::flatten_otel_logs;
35-
use crate::otel::metrics::flatten_otel_metrics;
36-
use crate::otel::traces::flatten_otel_traces;
3734
use crate::storage::{ObjectStorageError, StreamType};
3835
use crate::utils::header_parsing::ParseHeaderError;
3936
use crate::utils::json::flatten::JsonFlattenError;
@@ -44,9 +41,6 @@ use arrow_schema::Schema;
4441
use bytes::Bytes;
4542
use chrono::Utc;
4643
use http::StatusCode;
47-
use opentelemetry_proto::tonic::logs::v1::LogsData;
48-
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
49-
use opentelemetry_proto::tonic::trace::v1::TracesData;
5044
use serde_json::Value;
5145
use std::collections::HashMap;
5246
use std::sync::Arc;
@@ -130,11 +124,7 @@ pub async fn handle_otel_logs_ingestion(
130124
let stream_name = stream_name.to_str().unwrap().to_owned();
131125
create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs).await?;
132126

133-
//custom flattening required for otel logs
134-
let logs: LogsData = serde_json::from_value(json)?;
135-
for record in flatten_otel_logs(&logs) {
136-
push_logs(&stream_name, record, &log_source).await?;
137-
}
127+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
138128

139129
Ok(HttpResponse::Ok().finish())
140130
}
@@ -164,11 +154,7 @@ pub async fn handle_otel_metrics_ingestion(
164154
)
165155
.await?;
166156

167-
//custom flattening required for otel metrics
168-
let metrics: MetricsData = serde_json::from_value(json)?;
169-
for record in flatten_otel_metrics(metrics) {
170-
push_logs(&stream_name, record, &log_source).await?;
171-
}
157+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
172158

173159
Ok(HttpResponse::Ok().finish())
174160
}
@@ -195,11 +181,7 @@ pub async fn handle_otel_traces_ingestion(
195181
create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
196182
.await?;
197183

198-
//custom flattening required for otel traces
199-
let traces: TracesData = serde_json::from_value(json)?;
200-
for record in flatten_otel_traces(&traces) {
201-
push_logs(&stream_name, record, &log_source).await?;
202-
}
184+
flatten_and_push_logs(json, &stream_name, &log_source).await?;
203185

204186
Ok(HttpResponse::Ok().finish())
205187
}

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
use arrow_schema::Field;
2020
use chrono::{DateTime, NaiveDateTime, Utc};
2121
use itertools::Itertools;
22+
use opentelemetry_proto::tonic::{
23+
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
24+
};
2225
use serde_json::Value;
2326
use std::{collections::HashMap, sync::Arc};
2427

@@ -32,6 +35,7 @@ use crate::{
3235
kinesis::{flatten_kinesis_logs, Message},
3336
},
3437
metadata::{SchemaVersion, STREAM_INFO},
38+
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
3539
storage::StreamType,
3640
utils::json::{convert_array_to_object, flatten::convert_to_array},
3741
};
@@ -44,13 +48,30 @@ pub async fn flatten_and_push_logs(
4448
match log_source {
4549
LogSource::Kinesis => {
4650
let message: Message = serde_json::from_value(json)?;
47-
let json = flatten_kinesis_logs(message);
48-
for record in json {
51+
for record in flatten_kinesis_logs(message) {
4952
push_logs(stream_name, record, &LogSource::default()).await?;
5053
}
5154
}
52-
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
53-
return Err(PostError::OtelNotSupported);
55+
LogSource::OtelLogs => {
56+
//custom flattening required for otel logs
57+
let logs: LogsData = serde_json::from_value(json)?;
58+
for record in flatten_otel_logs(&logs) {
59+
push_logs(&stream_name, record, &log_source).await?;
60+
}
61+
}
62+
LogSource::OtelTraces => {
63+
//custom flattening required for otel traces
64+
let traces: TracesData = serde_json::from_value(json)?;
65+
for record in flatten_otel_traces(&traces) {
66+
push_logs(&stream_name, record, &log_source).await?;
67+
}
68+
}
69+
LogSource::OtelMetrics => {
70+
//custom flattening required for otel metrics
71+
let metrics: MetricsData = serde_json::from_value(json)?;
72+
for record in flatten_otel_metrics(metrics) {
73+
push_logs(&stream_name, record, &log_source).await?;
74+
}
5475
}
5576
_ => {
5677
push_logs(stream_name, json, log_source).await?;
@@ -59,7 +80,7 @@ pub async fn flatten_and_push_logs(
5980
Ok(())
6081
}
6182

62-
pub async fn push_logs(
83+
async fn push_logs(
6384
stream_name: &str,
6485
json: Value,
6586
log_source: &LogSource,

0 commit comments

Comments
 (0)