Skip to content

Commit 4c9e75d

Browse files
simplified method implementation
1 parent cff4520 commit 4c9e75d

File tree

1 file changed

+12
-18
lines changed

1 file changed

+12
-18
lines changed

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
*
1717
*/
1818

19-
use std::{
20-
collections::{BTreeMap, HashMap},
21-
sync::Arc,
22-
};
19+
use std::{collections::HashMap, sync::Arc};
2320

2421
use actix_web::HttpRequest;
2522
use arrow_schema::Field;
@@ -46,23 +43,20 @@ pub async fn flatten_and_push_logs(
4643
body: Bytes,
4744
stream_name: String,
4845
) -> Result<(), PostError> {
49-
//flatten logs
50-
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
51-
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
52-
let log_source: String = log_source.to_str().unwrap().to_owned();
53-
match log_source.as_str() {
54-
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
55-
_ => {
56-
log::warn!("Unknown log source: {}", log_source);
57-
push_logs(stream_name.to_string(), req.clone(), body).await?;
58-
}
59-
}
60-
for record in json.iter_mut() {
46+
let log_source = req
47+
.headers()
48+
.get(LOG_SOURCE_KEY)
49+
.unwrap()
50+
.to_str()
51+
.unwrap_or_default();
52+
if log_source == LOG_SOURCE_KINESIS {
53+
let json = kinesis::flatten_kinesis_logs(&body);
54+
for record in json.iter() {
6155
let body: Bytes = serde_json::to_vec(record).unwrap().into();
62-
push_logs(stream_name.to_string(), req.clone(), body).await?;
56+
push_logs(stream_name.clone(), req.clone(), body.clone()).await?;
6357
}
6458
} else {
65-
push_logs(stream_name.to_string(), req, body).await?;
59+
push_logs(stream_name, req, body).await?;
6660
}
6761
Ok(())
6862
}

0 commit comments

Comments
 (0)