From f3565795eabff03cbd27c0d394dd79c243b99d16 Mon Sep 17 00:00:00 2001
From: Nikhil Sinha
Date: Wed, 8 May 2024 15:43:39 +0530
Subject: [PATCH 1/4] enhancement to allow custom partitions in ingestion

To enable this:
- add an optional header - X-P-Custom-Partition - to the stream creation API,
  with comma-separated values; the custom partition is stored in the
  stream.json file
- at the time of ingestion, the following validations are applied:
  1. all events in the batch must have the custom partition fields
  2. a maximum of 3 partition keys is allowed
  3. custom partition values must not contain a '.' in the log event
  4. custom partition values must not be empty in the log event

Prefixes are created in the alphabetical order of the custom partition fields.
The sort order in the parquet file is p_timestamp/time_partition followed by
the custom partition fields; the same sort order is reflected in the manifest
file.
---
 server/src/event.rs                    |  21 ++++-
 server/src/event/format/json.rs        |   2 +-
 server/src/event/writer.rs             |  21 ++++-
 server/src/event/writer/file_writer.rs |  13 ++-
 server/src/handlers.rs                 |   1 +
 server/src/handlers/http/ingest.rs     | 105 +++++++++++++++++----
 server/src/handlers/http/logstream.rs  |  31 +++++-
 server/src/metadata.rs                 |  17 +++-
 server/src/static_schema.rs            |  25 +++++
 server/src/storage.rs                  |   5 +
 server/src/storage/object_storage.rs   |  30 +++++-
 server/src/storage/staging.rs          | 126 ++++++++++++++++---------
 server/src/utils.rs                    |  11 ++-
 server/src/utils/json.rs               |  11 ++-
 server/src/utils/json/flatten.rs       | 115 +++++++++++++++++-----
 15 files changed, 424 insertions(+), 110 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index eeb95e0b0..30e33a9e6 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -23,13 +23,13 @@ mod writer;
 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
 use itertools::Itertools;
-
 use std::sync::Arc;
 
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
 use crate::metadata;
 use chrono::NaiveDateTime;
+use std::collections::HashMap;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -44,6 +44,7 @@ pub struct Event {
     pub is_first_event: bool,
     pub parsed_timestamp: NaiveDateTime,
     pub time_partition: Option<String>,
+    pub custom_partition_values: HashMap<String, String>,
 }
 
 // Events holds the schema related to a each event for a single log stream
@@ -55,6 +56,14 @@ impl Event {
             key = format!("{key}{parsed_timestamp_to_min}");
         }
 
+        if !self.custom_partition_values.is_empty() {
+            let mut custom_partition_key = String::new();
+            for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) {
+                custom_partition_key = format!("{custom_partition_key}&{k}={v}");
+            }
+            key = format!("{key}{custom_partition_key}");
+        }
+
         let num_rows = self.rb.num_rows() as u64;
         if self.is_first_event {
             commit_schema(&self.stream_name, self.rb.schema())?;
@@ -65,6 +74,7 @@
             &key,
             self.rb.clone(),
             self.parsed_timestamp,
+            self.custom_partition_values,
         )?;
 
         metadata::STREAM_INFO.update_stats(
@@ -93,8 +103,15 @@
         schema_key: &str,
         rb: RecordBatch,
         parsed_timestamp: NaiveDateTime,
+        custom_partition_values: HashMap<String, String>,
     ) -> Result<(), EventError> {
-        STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
+        STREAM_WRITERS.append_to_local(
+            stream_name,
+            schema_key,
+            rb,
+            parsed_timestamp,
+            custom_partition_values,
+        )?;
         Ok(())
     }
 }
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index b2b9f88c9..ed697af00 100644
--- a/server/src/event/format/json.rs
+++ 
b/server/src/event/format/json.rs @@ -48,7 +48,7 @@ impl EventFormat for Event { static_schema_flag: Option, time_partition: Option, ) -> Result<(Self::Data, Vec>, bool, Tags, Metadata), anyhow::Error> { - let data = flatten_json_body(self.data, None, None, false)?; + let data = flatten_json_body(self.data, None, None, None, false)?; let stream_schema = schema; // incoming event may be a single json or a json array diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index 737a0a514..b508197b1 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -49,6 +49,7 @@ impl Writer { schema_key: &str, rb: RecordBatch, parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, ) -> Result<(), StreamWriterError> { let rb = utils::arrow::replace_columns( rb.schema(), @@ -57,8 +58,13 @@ impl Writer { &[Arc::new(get_timestamp_array(rb.num_rows()))], ); - self.disk - .push(stream_name, schema_key, &rb, parsed_timestamp)?; + self.disk.push( + stream_name, + schema_key, + &rb, + parsed_timestamp, + custom_partition_values, + )?; self.mem.push(schema_key, rb); Ok(()) } @@ -75,6 +81,7 @@ impl WriterTable { schema_key: &str, record: RecordBatch, parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, ) -> Result<(), StreamWriterError> { let hashmap_guard = self.read().unwrap(); @@ -85,6 +92,7 @@ impl WriterTable { schema_key, record, parsed_timestamp, + custom_partition_values, )?; } None => { @@ -98,10 +106,17 @@ impl WriterTable { schema_key, record, parsed_timestamp, + custom_partition_values, )?; } else { let mut writer = Writer::default(); - writer.push(stream_name, schema_key, record, parsed_timestamp)?; + writer.push( + stream_name, + schema_key, + record, + parsed_timestamp, + custom_partition_values, + )?; map.insert(stream_name.to_owned(), Mutex::new(writer)); } } diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs index 1b193eb4c..60639015e 100644 --- a/server/src/event/writer/file_writer.rs +++ b/server/src/event/writer/file_writer.rs @@ -44,6 +44,7 @@ impl FileWriter { schema_key: &str, record: &RecordBatch, parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, ) -> Result<(), StreamWriterError> { match self.get_mut(schema_key) { Some(writer) => { @@ -55,8 +56,13 @@ impl FileWriter { // entry is not present thus we create it None => { // this requires mutable borrow of the map so we drop this read lock and wait for write lock - let (path, writer) = - init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?; + let (path, writer) = init_new_stream_writer_file( + stream_name, + schema_key, + record, + parsed_timestamp, + custom_partition_values, + )?; self.insert( schema_key.to_owned(), ArrowWriter { @@ -82,9 +88,10 @@ fn init_new_stream_writer_file( schema_key: &str, record: &RecordBatch, parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, ) -> Result<(PathBuf, StreamWriter), StreamWriterError> { let dir = StorageDir::new(stream_name); - let path = dir.path_by_current_time(schema_key, parsed_timestamp); + let path = dir.path_by_current_time(schema_key, parsed_timestamp, custom_partition_values); std::fs::create_dir_all(dir.data_path)?; let file = OpenOptions::new().create(true).append(true).open(&path)?; diff --git a/server/src/handlers.rs b/server/src/handlers.rs index d610011cf..883eceb65 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -25,6 +25,7 @@ const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const LOG_SOURCE_KEY: 
&str = "x-p-log-source"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; +const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition"; const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag"; const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 7532ad59e..be7d26322 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -106,36 +106,103 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result let time_partition = object_store_format.time_partition; let time_partition_limit = object_store_format.time_partition_limit; let static_schema_flag = object_store_format.static_schema_flag; + let custom_partition = object_store_format.custom_partition; let body_val: Value = serde_json::from_slice(&body)?; let size: usize = body.len(); let mut parsed_timestamp = Utc::now().naive_utc(); + let mut custom_partition_values: HashMap = HashMap::new(); + if time_partition.is_none() { - let stream = stream_name.clone(); - let (rb, is_first_event) = get_stream_schema( - stream.clone(), - req, - body_val, - static_schema_flag, - time_partition.clone(), - )?; - event::Event { - rb, - stream_name: stream, - origin_format: "json", - origin_size: size as u64, - is_first_event, - parsed_timestamp, - time_partition, + if custom_partition.is_none() { + let stream = stream_name.clone(); + let (rb, is_first_event) = get_stream_schema( + stream.clone(), + req, + body_val, + static_schema_flag, + time_partition.clone(), + )?; + event::Event { + rb: rb.clone(), + stream_name: stream, + origin_format: "json", + origin_size: size as u64, + is_first_event, + parsed_timestamp, + time_partition, + custom_partition_values, + } + .process() + .await?; + } else { + let data = convert_array_to_object( + body_val.clone(), + time_partition.clone(), + time_partition_limit, + custom_partition.clone(), + )?; + let custom_partition = custom_partition.unwrap(); + let custom_partition_list = custom_partition.split(',').collect::>(); + + for value in data { + for custom_partition_field in custom_partition_list.clone() { + let custom_partition_value = + value.get(custom_partition_field.trim()).unwrap().to_owned(); + let custom_partition_value = match custom_partition_value.clone() { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + }; + custom_partition_values.insert( + custom_partition_field.trim().to_string(), + custom_partition_value, + ); + } + let (rb, is_first_event) = get_stream_schema( + stream_name.clone(), + req.clone(), + value.clone(), + static_schema_flag.clone(), + time_partition.clone(), + )?; + event::Event { + rb, + stream_name: stream_name.clone(), + origin_format: "json", + origin_size: value.to_string().into_bytes().len() as u64, + is_first_event, + parsed_timestamp, + time_partition: time_partition.clone(), + custom_partition_values: custom_partition_values.clone(), + } + .process() + .await?; + } } - .process() - .await?; } else { let data = convert_array_to_object( body_val.clone(), time_partition.clone(), time_partition_limit, + custom_partition.clone(), )?; + let custom_partition = custom_partition.unwrap(); + let custom_partition_list = custom_partition.split(',').collect::>(); + for value in data { + for custom_partition_field in custom_partition_list.clone() { + let custom_partition_value = + 
value.get(custom_partition_field.trim()).unwrap().to_owned(); + let custom_partition_value = match custom_partition_value.clone() { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + }; + custom_partition_values.insert( + custom_partition_field.trim().to_string(), + custom_partition_value, + ); + } let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); parsed_timestamp = body_timestamp .unwrap() @@ -161,6 +228,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result is_first_event, parsed_timestamp, time_partition: time_partition.clone(), + custom_partition_values: custom_partition_values.clone(), } .process() .await?; @@ -216,6 +284,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr "", "", "", + "", Arc::new(Schema::empty()), ) .await?; diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 1fa5a10be..5910982f6 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -21,7 +21,9 @@ use super::base_path_without_preceding_slash; use super::cluster::fetch_stats_from_ingestors; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; use crate::alerts::Alerts; -use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY}; +use crate::handlers::{ + CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, +}; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}; @@ -224,6 +226,21 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result>(); + if custom_partition_list.len() > 3 { + return Err(StreamError::Custom { + msg: "maximum 3 custom partition keys are supported".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + } let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let mut schema = Arc::new(Schema::empty()); @@ -240,8 +257,11 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result, ) -> Result<(), CreateStreamError> { @@ -561,6 +583,7 @@ pub async fn create_stream( &stream_name, time_partition, time_partition_limit, + custom_partition, static_schema_flag, schema.clone(), ) @@ -591,6 +614,7 @@ pub async fn create_stream( created_at, time_partition.to_string(), time_partition_limit.to_string(), + custom_partition.to_string(), static_schema_flag.to_string(), static_schema, ); @@ -630,6 +654,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, pub time_partition: Option, pub time_partition_limit: Option, + pub custom_partition: Option, pub static_schema_flag: Option, } @@ -100,6 +101,13 @@ impl StreamInfo { .map(|metadata| metadata.time_partition.clone()) } + pub fn get_custom_partition(&self, stream_name: &str) -> Result, MetadataError> { + let map = self.read().expect(LOCK_EXPECT); + map.get(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| metadata.custom_partition.clone()) + } + pub fn get_static_schema_flag( &self, stream_name: &str, @@ -160,13 +168,14 @@ impl StreamInfo { metadata.first_event_at = first_event_at; }) } - + #[allow(clippy::too_many_arguments)] pub fn add_stream( &self, stream_name: String, created_at: String, time_partition: String, time_partition_limit: String, + custom_partition: String, static_schema_flag: String, 
static_schema: HashMap>, ) { @@ -187,6 +196,11 @@ impl StreamInfo { } else { Some(time_partition_limit) }, + custom_partition: if custom_partition.is_empty() { + None + } else { + Some(custom_partition) + }, static_schema_flag: if static_schema_flag != "true" { None } else { @@ -244,6 +258,7 @@ impl StreamInfo { first_event_at: meta.first_event_at, time_partition: meta.time_partition, time_partition_limit: meta.time_partition_limit, + custom_partition: meta.custom_partition, static_schema_flag: meta.static_schema_flag, }; diff --git a/server/src/static_schema.rs b/server/src/static_schema.rs index b7305da88..488f88f88 100644 --- a/server/src/static_schema.rs +++ b/server/src/static_schema.rs @@ -41,17 +41,42 @@ pub struct Metadata {} pub fn convert_static_schema_to_arrow_schema( static_schema: StaticSchema, time_partition: &str, + custom_partition: &str, ) -> Result, AnyError> { let mut parsed_schema = ParsedSchema { fields: Vec::new(), metadata: HashMap::new(), }; let mut time_partition_exists: bool = false; + + if !custom_partition.is_empty() { + let custom_partition_list = custom_partition.split(',').collect::>(); + let mut custom_partition_exists: HashMap = + HashMap::with_capacity(custom_partition_list.len()); + + for partition in &custom_partition_list { + for field in &static_schema.fields { + if &field.name == partition { + custom_partition_exists.insert(partition.to_string(), true); + } + } + } + for partition in custom_partition_list { + if !custom_partition_exists.contains_key(partition) { + return Err(anyhow! { + format!( + "custom partition field {partition} does not exist in the schema for the static schema logstream" + ), + }); + } + } + } for mut field in static_schema.fields { if !time_partition.is_empty() && field.name == time_partition { time_partition_exists = true; field.data_type = "datetime".to_string(); } + let parsed_field = Fields { name: field.name.clone(), diff --git a/server/src/storage.rs b/server/src/storage.rs index 3de62ef4f..d6fb2fb7b 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -95,6 +95,8 @@ pub struct ObjectStoreFormat { #[serde(skip_serializing_if = "Option::is_none")] pub time_partition_limit: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub custom_partition: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, } @@ -112,6 +114,8 @@ pub struct StreamInfo { #[serde(skip_serializing_if = "Option::is_none")] pub time_partition_limit: Option, #[serde(skip_serializing_if = "Option::is_none")] + pub custom_partition: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub static_schema_flag: Option, } @@ -159,6 +163,7 @@ impl Default for ObjectStoreFormat { retention: None, time_partition: None, time_partition_limit: None, + custom_partition: None, static_schema_flag: None, } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index b0846d0d8..5244feb04 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -27,7 +27,6 @@ use super::{ use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::option::Mode; - use crate::{ alerts::Alerts, catalog::{self, manifest::Manifest, snapshot::Snapshot}, @@ -128,6 +127,7 @@ pub trait ObjectStorage: Sync + 'static { stream_name: &str, time_partition: &str, time_partition_limit: &str, + custom_partition: &str, static_schema_flag: &str, schema: Arc, ) -> Result<(), ObjectStorageError> { @@ -145,6 +145,11 @@ pub trait ObjectStorage: Sync + 'static 
{ } else { format.time_partition_limit = Some(time_partition_limit.to_string()); } + if custom_partition.is_empty() { + format.custom_partition = None; + } else { + format.custom_partition = Some(custom_partition.to_string()); + } if static_schema_flag != "true" { format.static_schema_flag = None; } else { @@ -439,9 +444,17 @@ pub trait ObjectStorage: Sync + 'static { let time_partition = STREAM_INFO .get_time_partition(stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; - let dir = StorageDir::new(stream); - let schema = convert_disk_files_to_parquet(stream, &dir, time_partition) + let custom_partition = STREAM_INFO + .get_custom_partition(stream) .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; + let dir = StorageDir::new(stream); + let schema = convert_disk_files_to_parquet( + stream, + &dir, + time_partition, + custom_partition.clone(), + ) + .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?; if let Some(schema) = schema { let static_schema_flag = STREAM_INFO @@ -467,7 +480,16 @@ pub trait ObjectStorage: Sync + 'static { .expect("only parquet files are returned by iterator") .to_str() .expect("filename is valid string"); - let file_suffix = str::replacen(filename, ".", "/", 3); + let mut file_suffix = str::replacen(filename, ".", "/", 3); + + let custom_partition_clone = custom_partition.clone(); + if custom_partition_clone.is_some() { + let custom_partition_fields = custom_partition_clone.unwrap(); + let custom_partition_list = + custom_partition_fields.split(',').collect::>(); + file_suffix = + str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + } let stream_relative_path = format!("{stream}/{file_suffix}"); self.upload_file(&stream_relative_path, &file).await?; let absolute_path = self diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index abeac7062..4d770986a 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -38,7 +38,8 @@ use crate::{ }; use arrow_schema::{ArrowError, Schema}; use base64::Engine; -use chrono::{NaiveDateTime, Timelike}; +use chrono::{NaiveDateTime, Timelike, Utc}; +use itertools::Itertools; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -64,10 +65,17 @@ impl StorageDir { Self { data_path } } - pub fn file_time_suffix(time: NaiveDateTime, extention: &str) -> String { - let uri = utils::date_to_prefix(time.date()) + pub fn file_time_suffix( + time: NaiveDateTime, + custom_partition_values: HashMap, + extention: &str, + ) -> String { + let mut uri = utils::date_to_prefix(time.date()) + &utils::hour_to_prefix(time.hour()) + &utils::minute_to_prefix(time.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(); + if !custom_partition_values.is_empty() { + uri = uri + &utils::custom_partition_to_prefix(custom_partition_values); + } let local_uri = str::replace(&uri, "/", "."); let hostname = hostname_unchecked(); if CONFIG.parseable.mode == Mode::Ingest { @@ -78,27 +86,37 @@ impl StorageDir { } } - fn filename_by_time(stream_hash: &str, time: NaiveDateTime) -> String { + fn filename_by_time( + stream_hash: &str, + time: NaiveDateTime, + custom_partition_values: HashMap, + ) -> String { format!( "{}.{}", stream_hash, - Self::file_time_suffix(time, ARROW_FILE_EXTENSION) + Self::file_time_suffix(time, custom_partition_values, ARROW_FILE_EXTENSION) ) } - fn filename_by_current_time(stream_hash: &str, parsed_timestamp: NaiveDateTime) -> String { - Self::filename_by_time(stream_hash, parsed_timestamp) + fn filename_by_current_time( + 
stream_hash: &str, + parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, + ) -> String { + Self::filename_by_time(stream_hash, parsed_timestamp, custom_partition_values) } pub fn path_by_current_time( &self, stream_hash: &str, parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, ) -> PathBuf { - self.data_path.join(Self::filename_by_current_time( - stream_hash, - parsed_timestamp, - )) + let server_time_in_min = Utc::now().format("%Y%m%dT%H%M").to_string(); + let mut filename = + Self::filename_by_current_time(stream_hash, parsed_timestamp, custom_partition_values); + filename = format!("{}{}", server_time_in_min, filename); + self.data_path.join(filename) } pub fn arrow_files(&self) -> Vec { @@ -106,10 +124,11 @@ impl StorageDir { return vec![]; }; - let paths: Vec = dir + let paths = dir .flatten() .map(|file| file.path()) .filter(|file| file.extension().map_or(false, |ext| ext.eq("arrows"))) + .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); paths @@ -135,20 +154,16 @@ impl StorageDir { &self, exclude: NaiveDateTime, ) -> HashMap> { - let hot_filename = StorageDir::file_time_suffix(exclude, ARROW_FILE_EXTENSION); - // hashmap but exclude where hot filename matches let mut grouped_arrow_file: HashMap> = HashMap::new(); let mut arrow_files = self.arrow_files(); - arrow_files.retain(|path| { !path .file_name() .unwrap() .to_str() .unwrap() - .ends_with(&hot_filename) + .starts_with(&exclude.format("%Y%m%dT%H%M").to_string()) }); - for arrow_file_path in arrow_files { let key = Self::arrow_path_to_parquet(&arrow_file_path); grouped_arrow_file @@ -156,7 +171,6 @@ impl StorageDir { .or_default() .push(arrow_file_path); } - grouped_arrow_file } @@ -176,6 +190,7 @@ impl StorageDir { let (_, filename) = filename.split_once('.').unwrap(); let filename = filename.rsplit_once('.').expect("contains the delim `.`"); let filename = format!("{}.{}", filename.0, filename.1); + let random_string = rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); let filename_with_random_number = format!("{}.{}.{}", filename, random_string, "arrows"); @@ -186,7 +201,6 @@ impl StorageDir { let (_, filename) = file_stem.split_once('.').unwrap(); let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows"); */ - let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); parquet_path.set_extension("parquet"); @@ -197,7 +211,7 @@ impl StorageDir { #[allow(unused)] pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { let data_path = CONFIG.parseable.local_stream_data_path(stream_name); - let dir = StorageDir::file_time_suffix(time, PARQUET_FILE_EXTENSION); + let dir = StorageDir::file_time_suffix(time, HashMap::new(), PARQUET_FILE_EXTENSION); data_path.join(dir) } @@ -206,6 +220,7 @@ pub fn convert_disk_files_to_parquet( stream: &str, dir: &StorageDir, time_partition: Option, + custom_partition: Option, ) -> Result, MoveDataError> { let mut schemas = Vec::new(); @@ -241,8 +256,20 @@ pub fn convert_disk_files_to_parquet( if let Some(time_partition) = time_partition.as_ref() { index_time_partition = merged_schema.index_of(time_partition).unwrap(); } + let mut custom_partition_fields: HashMap = HashMap::new(); + if let Some(custom_partition) = custom_partition.as_ref() { + for custom_partition_field in custom_partition.split(',') { + let index = merged_schema.index_of(custom_partition_field).unwrap(); + 
custom_partition_fields.insert(custom_partition_field.to_string(), index); + } + } let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; - let props = parquet_writer_props(time_partition.clone(), index_time_partition).build(); + let props = parquet_writer_props( + time_partition.clone(), + index_time_partition, + custom_partition_fields, + ) + .build(); schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); @@ -278,36 +305,41 @@ pub fn convert_disk_files_to_parquet( fn parquet_writer_props( time_partition: Option, index_time_partition: usize, + custom_partition_fields: HashMap, ) -> WriterPropertiesBuilder { let index_time_partition: i32 = index_time_partition as i32; - + let mut time_partition_field = DEFAULT_TIMESTAMP_KEY.to_string(); if let Some(time_partition) = time_partition { - WriterProperties::builder() - .set_max_row_group_size(CONFIG.parseable.row_group_size) - .set_compression(CONFIG.parseable.parquet_compression.into()) - .set_column_encoding( - ColumnPath::new(vec![time_partition]), - Encoding::DELTA_BYTE_ARRAY, - ) - .set_sorting_columns(Some(vec![SortingColumn { - column_idx: index_time_partition, - descending: true, - nulls_first: true, - }])) - } else { - WriterProperties::builder() - .set_max_row_group_size(CONFIG.parseable.row_group_size) - .set_compression(CONFIG.parseable.parquet_compression.into()) - .set_column_encoding( - ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]), - Encoding::DELTA_BINARY_PACKED, - ) - .set_sorting_columns(Some(vec![SortingColumn { - column_idx: index_time_partition, - descending: true, - nulls_first: true, - }])) + time_partition_field = time_partition; + } + let mut sorting_column_vec: Vec = Vec::new(); + sorting_column_vec.push(SortingColumn { + column_idx: index_time_partition, + descending: true, + nulls_first: true, + }); + let mut props = WriterProperties::builder() + .set_max_row_group_size(CONFIG.parseable.row_group_size) + .set_compression(CONFIG.parseable.parquet_compression.into()) + .set_column_encoding( + ColumnPath::new(vec![time_partition_field]), + Encoding::DELTA_BINARY_PACKED, + ); + + for (field, index) in custom_partition_fields { + let field = ColumnPath::new(vec![field]); + let encoding = Encoding::DELTA_BYTE_ARRAY; + props = props.set_column_encoding(field, encoding); + let sorting_column = SortingColumn { + column_idx: index as i32, + descending: true, + nulls_first: true, + }; + sorting_column_vec.push(sorting_column); } + props = props.set_sorting_columns(Some(sorting_column_vec)); + + props } pub fn get_ingestor_info() -> anyhow::Result { diff --git a/server/src/utils.rs b/server/src/utils.rs index ec60f115a..a713cf6f0 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -24,8 +24,9 @@ pub mod uid; pub mod update; use crate::option::CONFIG; use chrono::{DateTime, NaiveDate, Timelike, Utc}; +use itertools::Itertools; use sha2::{Digest, Sha256}; - +use std::collections::HashMap; use std::env; use url::Url; @@ -62,6 +63,14 @@ pub fn date_to_prefix(date: NaiveDate) -> String { date.replace("UTC", "") } +pub fn custom_partition_to_prefix(custom_partition: HashMap) -> String { + let mut prefix = String::new(); + for (key, value) in custom_partition.iter().sorted_by_key(|v| v.0) { + prefix.push_str(&format!("{key}={value}/", key = key, value = value)); + } + prefix +} + pub fn hour_to_prefix(hour: u32) -> String { format!("hour={hour:02}/") } diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs index ad1572c72..526fb532f 100644 --- 
a/server/src/utils/json.rs +++ b/server/src/utils/json.rs @@ -25,6 +25,7 @@ pub fn flatten_json_body( body: serde_json::Value, time_partition: Option, time_partition_limit: Option, + custom_partition: Option, validation_required: bool, ) -> Result { flatten::flatten( @@ -32,6 +33,7 @@ pub fn flatten_json_body( "_", time_partition, time_partition_limit, + custom_partition, validation_required, ) } @@ -40,8 +42,15 @@ pub fn convert_array_to_object( body: Value, time_partition: Option, time_partition_limit: Option, + custom_partition: Option, ) -> Result, anyhow::Error> { - let data = flatten_json_body(body, time_partition, time_partition_limit, true)?; + let data = flatten_json_body( + body, + time_partition, + time_partition_limit, + custom_partition, + true, + )?; let value_arr = match data { Value::Array(arr) => arr, value @ Value::Object(_) => vec![value], diff --git a/server/src/utils/json/flatten.rs b/server/src/utils/json/flatten.rs index 82f74a532..574671321 100644 --- a/server/src/utils/json/flatten.rs +++ b/server/src/utils/json/flatten.rs @@ -27,20 +27,30 @@ pub fn flatten( separator: &str, time_partition: Option, time_partition_limit: Option, + custom_partition: Option, validation_required: bool, ) -> Result { match nested_value { Value::Object(nested_dict) => { if validation_required { let validate_time_partition_result = validate_time_partition( - Value::Object(nested_dict.clone()), + &Value::Object(nested_dict.clone()), time_partition.clone(), time_partition_limit.clone(), ); + + let validate_custom_partition_result = validate_custom_partition( + &Value::Object(nested_dict.clone()), + custom_partition.clone(), + ); if validate_time_partition_result.is_ok() { - let mut map = Map::new(); - flatten_object(&mut map, None, nested_dict, separator)?; - Ok(Value::Object(map)) + if validate_custom_partition_result.is_ok() { + let mut map = Map::new(); + flatten_object(&mut map, None, nested_dict, separator)?; + Ok(Value::Object(map)) + } else { + Err(anyhow!(validate_custom_partition_result.unwrap_err())) + } } else { Err(anyhow!(validate_time_partition_result.unwrap_err())) } @@ -55,19 +65,24 @@ pub fn flatten( let value: Value = _value.clone(); if validation_required { let validate_time_partition_result = validate_time_partition( - value, + &value, time_partition.clone(), time_partition_limit.clone(), ); - + let validate_custom_partition_result = + validate_custom_partition(&value, custom_partition.clone()); if validate_time_partition_result.is_ok() { - let value = std::mem::replace(_value, Value::Null); - let mut map = Map::new(); - let Value::Object(obj) = value else { - return Err(anyhow!("Expected object in array of objects")); - }; - flatten_object(&mut map, None, obj, separator)?; - *_value = Value::Object(map); + if validate_custom_partition_result.is_ok() { + let value = std::mem::replace(_value, Value::Null); + let mut map = Map::new(); + let Value::Object(obj) = value else { + return Err(anyhow!("Expected object in array of objects")); + }; + flatten_object(&mut map, None, obj, separator)?; + *_value = Value::Object(map); + } else { + return Err(anyhow!(validate_custom_partition_result.unwrap_err())); + } } else { return Err(anyhow!(validate_time_partition_result.unwrap_err())); } @@ -87,8 +102,47 @@ pub fn flatten( } } +pub fn validate_custom_partition( + value: &Value, + custom_partition: Option, +) -> Result { + if custom_partition.is_none() { + return Ok(true); + } else { + let custom_partition = custom_partition.unwrap(); + let custom_partition_list = 
custom_partition.split(',').collect::>(); + for custom_partition_field in &custom_partition_list { + if value.get(custom_partition_field.trim()).is_none() { + return Err(anyhow!(format!( + "ingestion failed as field {} is not part of the log", + custom_partition_field + ))); + } else { + let custom_partition_value = value + .get(custom_partition_field.trim()) + .unwrap() + .to_string(); + if custom_partition_value.is_empty() { + return Err(anyhow!(format!( + "ingestion failed as field {} is empty", + custom_partition_field + ))); + } + if custom_partition_value.contains('.') { + return Err(anyhow!(format!( + "ingestion failed as field {} contains a period", + custom_partition_field + ))); + } + } + } + } + + Ok(true) +} + pub fn validate_time_partition( - value: Value, + value: &Value, time_partition: Option, time_partition_limit: Option, ) -> Result { @@ -258,19 +312,28 @@ mod tests { #[test] fn flatten_single_key_string() { let obj = json!({"key": "value"}); - assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); + assert_eq!( + obj.clone(), + flatten(obj, "_", None, None, None, false).unwrap() + ); } #[test] fn flatten_single_key_int() { let obj = json!({"key": 1}); - assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); + assert_eq!( + obj.clone(), + flatten(obj, "_", None, None, None, false).unwrap() + ); } #[test] fn flatten_multiple_key_value() { let obj = json!({"key1": 1, "key2": "value2"}); - assert_eq!(obj.clone(), flatten(obj, "_", None, None, false).unwrap()); + assert_eq!( + obj.clone(), + flatten(obj, "_", None, None, None, false).unwrap() + ); } #[test] @@ -278,7 +341,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key":"value"}}); assert_eq!( json!({"key": "value", "nested_key.key": "value"}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -287,7 +350,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); assert_eq!( json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -296,7 +359,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); assert_eq!( json!({"key": "value", "nested_key.key1": [1,2,3]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -305,7 +368,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -314,7 +377,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -323,7 +386,7 @@ mod tests { let obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -332,7 +395,7 @@ mod tests { let obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None, None, 
false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } @@ -341,14 +404,14 @@ mod tests { let obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None, None, false).unwrap() + flatten(obj, ".", None, None, None, false).unwrap() ); } #[test] fn flatten_mixed_object() { let obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); - assert!(flatten(obj, ".", None, None, false).is_err()); + assert!(flatten(obj, ".", None, None, None, false).is_err()); } #[test] From 5199c1eef303a32514cda3eeff9e859dd1b9b9e3 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 8 May 2024 16:27:19 +0530 Subject: [PATCH 2/4] use String::default() instead of String::new() --- server/src/event.rs | 2 +- server/src/utils.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index 30e33a9e6..03c4f26e5 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -57,7 +57,7 @@ impl Event { } if !self.custom_partition_values.is_empty() { - let mut custom_partition_key = String::new(); + let mut custom_partition_key = String::default(); for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) { custom_partition_key = format!("{custom_partition_key}&{k}={v}"); } diff --git a/server/src/utils.rs b/server/src/utils.rs index a713cf6f0..df2efdaa9 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -64,7 +64,7 @@ pub fn date_to_prefix(date: NaiveDate) -> String { } pub fn custom_partition_to_prefix(custom_partition: HashMap) -> String { - let mut prefix = String::new(); + let mut prefix = String::default(); for (key, value) in custom_partition.iter().sorted_by_key(|v| v.0) { prefix.push_str(&format!("{key}={value}/", key = key, value = value)); } From 01dad22f216ba326749a4e5112e8aa8150a6cea4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 15 May 2024 15:00:49 +0530 Subject: [PATCH 3/4] fix for grouping multiple arrows to 1 parquet fix for time-partition with/without custom-partition --- server/src/handlers/http/ingest.rs | 38 +++++++++++++++++++++++++ server/src/storage/staging.rs | 21 ++++---------- server/src/utils/arrow/merged_reader.rs | 31 ++++++++++++++++---- 3 files changed, 70 insertions(+), 20 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index be7d26322..4c857c9c3 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -179,6 +179,44 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result .await?; } } + } else if custom_partition.is_none() { + let data = convert_array_to_object( + body_val.clone(), + time_partition.clone(), + time_partition_limit, + custom_partition.clone(), + )?; + for value in data { + let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); + parsed_timestamp = body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .unwrap() + .naive_utc(); + + let (rb, is_first_event) = get_stream_schema( + stream_name.clone(), + req.clone(), + value.clone(), + static_schema_flag.clone(), + time_partition.clone(), + )?; + event::Event { + rb, + stream_name: stream_name.clone(), + origin_format: "json", + origin_size: value.to_string().into_bytes().len() as u64, + is_first_event, + parsed_timestamp, + 
time_partition: time_partition.clone(), + custom_partition_values: HashMap::new(), + } + .process() + .await?; + } } else { let data = convert_array_to_object( body_val.clone(), diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 4d770986a..9e6e724a0 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -140,7 +140,7 @@ impl StorageDir { let mut grouped_arrow_file: HashMap> = HashMap::new(); let arrow_files = self.arrow_files(); for arrow_file_path in arrow_files { - let key = Self::arrow_path_to_parquet(&arrow_file_path); + let key = Self::arrow_path_to_parquet(&arrow_file_path, String::default()); grouped_arrow_file .entry(key) .or_default() @@ -164,8 +164,10 @@ impl StorageDir { .unwrap() .starts_with(&exclude.format("%Y%m%dT%H%M").to_string()) }); + let random_string = + rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); for arrow_file_path in arrow_files { - let key = Self::arrow_path_to_parquet(&arrow_file_path); + let key = Self::arrow_path_to_parquet(&arrow_file_path, random_string.clone()); grouped_arrow_file .entry(key) .or_default() @@ -185,22 +187,12 @@ impl StorageDir { .collect() } - fn arrow_path_to_parquet(path: &Path) -> PathBuf { + fn arrow_path_to_parquet(path: &Path, random_string: String) -> PathBuf { let filename = path.file_stem().unwrap().to_str().unwrap(); let (_, filename) = filename.split_once('.').unwrap(); let filename = filename.rsplit_once('.').expect("contains the delim `.`"); let filename = format!("{}.{}", filename.0, filename.1); - - let random_string = - rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); let filename_with_random_number = format!("{}.{}.{}", filename, random_string, "arrows"); - /* - let file_stem = path.file_stem().unwrap().to_str().unwrap(); - let random_string = - rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 20); - let (_, filename) = file_stem.split_once('.').unwrap(); - let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows"); - */ let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); parquet_path.set_extension("parquet"); @@ -274,8 +266,7 @@ pub fn convert_disk_files_to_parquet( schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?; - - for ref record in record_reader.merged_iter(schema) { + for ref record in record_reader.merged_iter(schema, time_partition.clone()) { writer.write(record)?; } diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs index ef76ddf3f..b39349c7c 100644 --- a/server/src/utils/arrow/merged_reader.rs +++ b/server/src/utils/arrow/merged_reader.rs @@ -74,11 +74,16 @@ impl MergedReverseRecordReader { Ok(Self { readers }) } - pub fn merged_iter(self, schema: Arc) -> impl Iterator { + pub fn merged_iter( + self, + schema: Arc, + time_partition: Option, + ) -> impl Iterator { let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten()); - kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| { - let a_time = get_timestamp_millis(a); - let b_time = get_timestamp_millis(b); + kmerge_by(adapted_readers, move |a: &RecordBatch, b: &RecordBatch| { + // Capture time_partition by value + let a_time = get_timestamp_millis(a, time_partition.clone()); + let b_time = get_timestamp_millis(b, time_partition.clone()); a_time > b_time }) .map(|batch| 
reverse(&batch)) @@ -95,7 +100,23 @@ impl MergedReverseRecordReader { } } -fn get_timestamp_millis(batch: &RecordBatch) -> i64 { +fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option) -> i64 { + match time_partition { + Some(time_partition) => { + let time_partition = time_partition.as_str(); + match batch.column_by_name(time_partition) { + Some(column) => column + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + None => get_default_timestamp_millis(batch), + } + } + None => get_default_timestamp_millis(batch), + } +} +fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 { match batch .column(0) .as_any() From b4851ccdd30af98b80949cd545cb083adf4ede42 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 15 May 2024 23:39:18 +0530 Subject: [PATCH 4/4] fix for deepsource analysis --- server/src/handlers/http/ingest.rs | 214 ++++++++++++++--------------- 1 file changed, 104 insertions(+), 110 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 4c857c9c3..954fd4a8f 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -35,7 +35,7 @@ use crate::utils::json::convert_array_to_object; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_schema::{Field, Schema}; use bytes::Bytes; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; use http::StatusCode; use serde_json::Value; use std::collections::{BTreeMap, HashMap}; @@ -110,72 +110,41 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result let body_val: Value = serde_json::from_slice(&body)?; let size: usize = body.len(); let mut parsed_timestamp = Utc::now().naive_utc(); - let mut custom_partition_values: HashMap = HashMap::new(); - if time_partition.is_none() { if custom_partition.is_none() { - let stream = stream_name.clone(); - let (rb, is_first_event) = get_stream_schema( - stream.clone(), - req, - body_val, - static_schema_flag, - time_partition.clone(), - )?; - event::Event { - rb: rb.clone(), - stream_name: stream, - origin_format: "json", - origin_size: size as u64, - is_first_event, + let size = size as u64; + create_process_record_batch( + stream_name.clone(), + req.clone(), + body_val.clone(), + static_schema_flag.clone(), + None, parsed_timestamp, - time_partition, - custom_partition_values, - } - .process() + HashMap::new(), + size, + ) .await?; } else { - let data = convert_array_to_object( - body_val.clone(), - time_partition.clone(), - time_partition_limit, - custom_partition.clone(), - )?; + let data = + convert_array_to_object(body_val.clone(), None, None, custom_partition.clone())?; let custom_partition = custom_partition.unwrap(); let custom_partition_list = custom_partition.split(',').collect::>(); for value in data { - for custom_partition_field in custom_partition_list.clone() { - let custom_partition_value = - value.get(custom_partition_field.trim()).unwrap().to_owned(); - let custom_partition_value = match custom_partition_value.clone() { - e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), - Value::String(s) => s, - _ => "".to_string(), - }; - custom_partition_values.insert( - custom_partition_field.trim().to_string(), - custom_partition_value, - ); - } - let (rb, is_first_event) = get_stream_schema( + let custom_partition_values = + get_custom_partition_values(&value, &custom_partition_list); + + let size = value.to_string().into_bytes().len() as u64; + create_process_record_batch( stream_name.clone(), req.clone(), 
value.clone(), static_schema_flag.clone(), - time_partition.clone(), - )?; - event::Event { - rb, - stream_name: stream_name.clone(), - origin_format: "json", - origin_size: value.to_string().into_bytes().len() as u64, - is_first_event, + None, parsed_timestamp, - time_partition: time_partition.clone(), - custom_partition_values: custom_partition_values.clone(), - } - .process() + custom_partition_values.clone(), + size, + ) .await?; } } @@ -184,37 +153,21 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result body_val.clone(), time_partition.clone(), time_partition_limit, - custom_partition.clone(), + None, )?; for value in data { - let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); - parsed_timestamp = body_timestamp - .unwrap() - .to_owned() - .as_str() - .unwrap() - .parse::>() - .unwrap() - .naive_utc(); - - let (rb, is_first_event) = get_stream_schema( + parsed_timestamp = get_parsed_timestamp(&value, &time_partition); + let size = value.to_string().into_bytes().len() as u64; + create_process_record_batch( stream_name.clone(), req.clone(), value.clone(), static_schema_flag.clone(), time_partition.clone(), - )?; - event::Event { - rb, - stream_name: stream_name.clone(), - origin_format: "json", - origin_size: value.to_string().into_bytes().len() as u64, - is_first_event, parsed_timestamp, - time_partition: time_partition.clone(), - custom_partition_values: HashMap::new(), - } - .process() + HashMap::new(), + size, + ) .await?; } } else { @@ -228,47 +181,21 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result let custom_partition_list = custom_partition.split(',').collect::>(); for value in data { - for custom_partition_field in custom_partition_list.clone() { - let custom_partition_value = - value.get(custom_partition_field.trim()).unwrap().to_owned(); - let custom_partition_value = match custom_partition_value.clone() { - e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), - Value::String(s) => s, - _ => "".to_string(), - }; - custom_partition_values.insert( - custom_partition_field.trim().to_string(), - custom_partition_value, - ); - } - let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); - parsed_timestamp = body_timestamp - .unwrap() - .to_owned() - .as_str() - .unwrap() - .parse::>() - .unwrap() - .naive_utc(); + let custom_partition_values = + get_custom_partition_values(&value, &custom_partition_list); - let (rb, is_first_event) = get_stream_schema( + parsed_timestamp = get_parsed_timestamp(&value, &time_partition); + let size = value.to_string().into_bytes().len() as u64; + create_process_record_batch( stream_name.clone(), req.clone(), value.clone(), static_schema_flag.clone(), time_partition.clone(), - )?; - event::Event { - rb, - stream_name: stream_name.clone(), - origin_format: "json", - origin_size: value.to_string().into_bytes().len() as u64, - is_first_event, parsed_timestamp, - time_partition: time_partition.clone(), - custom_partition_values: custom_partition_values.clone(), - } - .process() + custom_partition_values.clone(), + size, + ) .await?; } } @@ -276,6 +203,73 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result Ok(()) } +fn get_parsed_timestamp(body: &Value, time_partition: &Option) -> NaiveDateTime { + let body_timestamp = body.get(&time_partition.clone().unwrap().to_string()); + let parsed_timestamp = body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .unwrap() + .naive_utc(); + 
parsed_timestamp +} + +fn get_custom_partition_values( + body: &Value, + custom_partition_list: &[&str], +) -> HashMap { + let mut custom_partition_values: HashMap = HashMap::new(); + for custom_partition_field in custom_partition_list { + let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned(); + let custom_partition_value = match custom_partition_value.clone() { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + }; + custom_partition_values.insert( + custom_partition_field.trim().to_string(), + custom_partition_value, + ); + } + custom_partition_values +} + +#[allow(clippy::too_many_arguments)] +async fn create_process_record_batch( + stream_name: String, + req: HttpRequest, + value: Value, + static_schema_flag: Option, + time_partition: Option, + parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, + origin_size: u64, +) -> Result<(), PostError> { + let (rb, is_first_event) = get_stream_schema( + stream_name.clone(), + req.clone(), + value.clone(), + static_schema_flag.clone(), + time_partition.clone(), + )?; + event::Event { + rb, + stream_name: stream_name.clone(), + origin_format: "json", + origin_size, + is_first_event, + parsed_timestamp, + time_partition: time_partition.clone(), + custom_partition_values: custom_partition_values.clone(), + } + .process() + .await?; + + Ok(()) +} + fn get_stream_schema( stream_name: String, req: HttpRequest,