Skip to content

Commit 6908630

Browse files
fix: rebase from main
1 parent ae8f826 commit 6908630

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ pub async fn push_logs(
9898
&HashMap::new(),
9999
size,
100100
schema_version,
101+
log_source,
101102
)
102103
.await?;
103104
} else {
@@ -107,6 +108,7 @@ pub async fn push_logs(
107108
None,
108109
custom_partition.as_ref(),
109110
schema_version,
111+
log_source,
110112
)?;
111113
let custom_partition = custom_partition.unwrap();
112114
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
@@ -126,6 +128,7 @@ pub async fn push_logs(
126128
&custom_partition_values,
127129
size,
128130
schema_version,
131+
log_source,
129132
)
130133
.await?;
131134
}
@@ -137,6 +140,7 @@ pub async fn push_logs(
137140
time_partition_limit,
138141
None,
139142
schema_version,
143+
log_source,
140144
)?;
141145
for value in data {
142146
parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?;
@@ -151,6 +155,7 @@ pub async fn push_logs(
151155
&HashMap::new(),
152156
size,
153157
schema_version,
158+
log_source,
154159
)
155160
.await?;
156161
}
@@ -161,6 +166,7 @@ pub async fn push_logs(
161166
time_partition_limit,
162167
custom_partition.as_ref(),
163168
schema_version,
169+
log_source,
164170
)?;
165171
let custom_partition = custom_partition.unwrap();
166172
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
@@ -181,6 +187,7 @@ pub async fn push_logs(
181187
&custom_partition_values,
182188
size,
183189
schema_version,
190+
log_source,
184191
)
185192
.await?;
186193
}
@@ -200,6 +207,7 @@ pub async fn create_process_record_batch(
200207
custom_partition_values: &HashMap<String, String>,
201208
origin_size: u64,
202209
schema_version: SchemaVersion,
210+
log_source: &LogSource,
203211
) -> Result<(), PostError> {
204212
let (rb, is_first_event) = get_stream_schema(
205213
stream_name,
@@ -208,7 +216,7 @@ pub async fn create_process_record_batch(
208216
static_schema_flag,
209217
time_partition,
210218
schema_version,
211-
log_source,
219+
log_source,
212220
)?;
213221
event::Event {
214222
rb,
@@ -234,6 +242,7 @@ pub fn get_stream_schema(
234242
static_schema_flag: Option<&String>,
235243
time_partition: Option<&String>,
236244
schema_version: SchemaVersion,
245+
log_source: &LogSource,
237246
) -> Result<(arrow_array::RecordBatch, bool), PostError> {
238247
let hash_map = STREAM_INFO.read().unwrap();
239248
let schema = hash_map
@@ -248,6 +257,7 @@ pub fn get_stream_schema(
248257
static_schema_flag,
249258
time_partition,
250259
schema_version,
260+
log_source,
251261
)
252262
}
253263

src/utils/json/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,18 @@ pub fn flatten_json_body(
3939
validation_required: bool,
4040
log_source: &LogSource,
4141
) -> Result<Value, anyhow::Error> {
42+
// Flatten the json body only if new schema and has less than 4 levels of nesting
4243
let mut nested_value = if schema_version == SchemaVersion::V1
44+
&& !has_more_than_four_levels(&body, 1)
4345
&& matches!(
4446
log_source,
4547
LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
4648
) {
47-
flatten::generic_flattening(body)?
49+
let flattened_json = generic_flattening(&body)?;
50+
convert_to_array(flattened_json)?
4851
} else {
4952
body
5053
};
51-
5254
flatten::flatten(
5355
&mut nested_value,
5456
"_",
@@ -107,6 +109,8 @@ pub fn convert_to_string(value: &Value) -> Value {
107109

108110
#[cfg(test)]
109111
mod tests {
112+
use crate::event::format::LogSource;
113+
110114
use super::flatten_json_body;
111115
use serde_json::json;
112116

@@ -121,7 +125,8 @@ mod tests {
121125
None,
122126
None,
123127
crate::metadata::SchemaVersion::V1,
124-
false
128+
false,
129+
&LogSource::default()
125130
)
126131
.unwrap(),
127132
expected
@@ -139,7 +144,8 @@ mod tests {
139144
None,
140145
None,
141146
crate::metadata::SchemaVersion::V1,
142-
false
147+
false,
148+
&LogSource::default()
143149
)
144150
.unwrap(),
145151
expected

0 commit comments

Comments
 (0)