Skip to content

Commit 920bcae

Browse files
author
Devdutt Shenoi
committed
Merge remote-tracking branch 'origin/main' into refactor
2 parents b7dd383 + bd07a47 commit 920bcae

File tree

4 files changed

+59
-40
lines changed

4 files changed

+59
-40
lines changed

src/handlers/http/ingest.rs

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,6 @@ use arrow_array::RecordBatch;
2424
use bytes::Bytes;
2525
use chrono::Utc;
2626
use http::StatusCode;
27-
use opentelemetry_proto::tonic::logs::v1::LogsData;
28-
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
29-
use opentelemetry_proto::tonic::trace::v1::TracesData;
3027
use serde_json::Value;
3128

3229
use crate::event;
@@ -35,9 +32,6 @@ use crate::event::format::{json, EventFormat, LogSource};
3532
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3633
use crate::metadata::SchemaVersion;
3734
use crate::option::Mode;
38-
use crate::otel::logs::flatten_otel_logs;
39-
use crate::otel::metrics::flatten_otel_metrics;
40-
use crate::otel::traces::flatten_otel_traces;
4135
use crate::parseable::{StreamNotFound, PARSEABLE};
4236
use crate::storage::{ObjectStorageError, StreamType};
4337
use crate::utils::header_parsing::ParseHeaderError;
@@ -69,6 +63,14 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
6963
.get(LOG_SOURCE_KEY)
7064
.and_then(|h| h.to_str().ok())
7165
.map_or(LogSource::default(), LogSource::from);
66+
67+
if matches!(
68+
log_source,
69+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
70+
) {
71+
return Err(PostError::OtelNotSupported);
72+
}
73+
7274
PARSEABLE
7375
.get_stream(&stream_name)?
7476
.flatten_and_push_logs(json, &log_source)
@@ -125,13 +127,11 @@ pub async fn handle_otel_logs_ingestion(
125127
PARSEABLE
126128
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
127129
.await?;
128-
let stream = PARSEABLE.get_stream(&stream_name)?;
129130

130-
//custom flattening required for otel logs
131-
let logs: LogsData = serde_json::from_value(json)?;
132-
for record in flatten_otel_logs(&logs) {
133-
stream.push_logs(record, &log_source).await?;
134-
}
131+
PARSEABLE
132+
.get_stream(&stream_name)?
133+
.flatten_and_push_logs(json, &log_source)
134+
.await?;
135135

136136
Ok(HttpResponse::Ok().finish())
137137
}
@@ -161,13 +161,11 @@ pub async fn handle_otel_metrics_ingestion(
161161
LogSource::OtelMetrics,
162162
)
163163
.await?;
164-
let stream = PARSEABLE.get_stream(&stream_name)?;
165164

166-
//custom flattening required for otel metrics
167-
let metrics: MetricsData = serde_json::from_value(json)?;
168-
for record in flatten_otel_metrics(metrics) {
169-
stream.push_logs(record, &log_source).await?;
170-
}
165+
PARSEABLE
166+
.get_stream(&stream_name)?
167+
.flatten_and_push_logs(json, &log_source)
168+
.await?;
171169

172170
Ok(HttpResponse::Ok().finish())
173171
}
@@ -194,13 +192,11 @@ pub async fn handle_otel_traces_ingestion(
194192
PARSEABLE
195193
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
196194
.await?;
197-
let stream = PARSEABLE.get_stream(&stream_name)?;
198195

199-
//custom flattening required for otel traces
200-
let traces: TracesData = serde_json::from_value(json)?;
201-
for record in flatten_otel_traces(&traces) {
202-
stream.push_logs(record, &log_source).await?;
203-
}
196+
PARSEABLE
197+
.get_stream(&stream_name)?
198+
.flatten_and_push_logs(json, &log_source)
199+
.await?;
204200

205201
Ok(HttpResponse::Ok().finish())
206202
}
@@ -241,6 +237,14 @@ pub async fn post_event(
241237
.get(LOG_SOURCE_KEY)
242238
.and_then(|h| h.to_str().ok())
243239
.map_or(LogSource::default(), LogSource::from);
240+
241+
if matches!(
242+
log_source,
243+
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
244+
) {
245+
return Err(PostError::OtelNotSupported);
246+
}
247+
244248
PARSEABLE
245249
.get_stream(&stream_name)?
246250
.flatten_and_push_logs(json, &log_source)

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
pub mod about;
20-
mod alerts;
20+
pub mod alerts;
2121
pub mod analytics;
2222
pub mod audit;
2323
pub mod banner;

src/parseable/streams.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ use arrow_schema::{ArrowError, Field, Fields, Schema};
3232
use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
3333
use derive_more::{Deref, DerefMut};
3434
use itertools::Itertools;
35+
use opentelemetry_proto::tonic::{
36+
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
37+
};
3538
use parquet::{
3639
arrow::ArrowWriter,
3740
basic::Encoding,
@@ -55,6 +58,7 @@ use crate::{
5558
metadata::{LogStreamMetadata, SchemaVersion},
5659
metrics,
5760
option::Mode,
61+
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
5862
storage::{
5963
object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
6064
},
@@ -119,14 +123,32 @@ impl Stream {
119123
) -> Result<(), PostError> {
120124
match log_source {
121125
LogSource::Kinesis => {
126+
//custom flattening required for Amazon Kinesis
122127
let message: Message = serde_json::from_value(json)?;
123-
let json = flatten_kinesis_logs(message);
124-
for record in json {
128+
for record in flatten_kinesis_logs(message) {
125129
self.push_logs(record, &LogSource::default()).await?;
126130
}
127131
}
128-
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
129-
return Err(PostError::OtelNotSupported);
132+
LogSource::OtelLogs => {
133+
//custom flattening required for otel logs
134+
let logs: LogsData = serde_json::from_value(json)?;
135+
for record in flatten_otel_logs(&logs) {
136+
self.push_logs(record, log_source).await?;
137+
}
138+
}
139+
LogSource::OtelTraces => {
140+
//custom flattening required for otel traces
141+
let traces: TracesData = serde_json::from_value(json)?;
142+
for record in flatten_otel_traces(&traces) {
143+
self.push_logs(record, log_source).await?;
144+
}
145+
}
146+
LogSource::OtelMetrics => {
147+
//custom flattening required for otel metrics
148+
let metrics: MetricsData = serde_json::from_value(json)?;
149+
for record in flatten_otel_metrics(metrics) {
150+
self.push_logs(record, log_source).await?;
151+
}
130152
}
131153
_ => {
132154
self.push_logs(json, log_source).await?;

src/utils/json/flatten.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -335,18 +335,11 @@ pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
335335

336336
// Converts a Vector of values into a `Value::Array`, as long as all of them are objects
337337
pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError> {
338-
let mut result = Vec::new();
339-
for item in flattened {
340-
let mut map = Map::new();
341-
let Some(item) = item.as_object() else {
342-
return Err(JsonFlattenError::ExpectedObjectInArray);
343-
};
344-
for (key, value) in item {
345-
map.insert(key.clone(), value.clone());
346-
}
347-
result.push(Value::Object(map));
338+
if flattened.iter().any(|item| !item.is_object()) {
339+
return Err(JsonFlattenError::ExpectedObjectInArray);
348340
}
349-
Ok(Value::Array(result))
341+
342+
Ok(Value::Array(flattened))
350343
}
351344

352345
#[cfg(test)]

0 commit comments

Comments
 (0)