diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 47eaab7a0..7326c82aa 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -38,6 +38,7 @@ use parquet::{ }; use crate::{ + event::DEFAULT_TIMESTAMP_KEY, metrics, option::CONFIG, storage::OBJECT_STORE_DATA_GRANULARITY, @@ -47,6 +48,9 @@ use crate::{ }, }; +const ARROW_FILE_EXTENSION: &str = "data.arrows"; +const PARQUET_FILE_EXTENSION: &str = "data.parquet"; + // in mem global that hold all the in mem buffer that are ready to convert pub static MEMORY_READ_BUFFERS: Lazy>>> = Lazy::new(RwLock::default); @@ -93,7 +97,7 @@ impl StorageDir { format!( "{}.{}", stream_hash, - Self::file_time_suffix(time, "data.arrows") + Self::file_time_suffix(time, ARROW_FILE_EXTENSION) ) } @@ -140,8 +144,8 @@ impl StorageDir { &self, exclude: NaiveDateTime, ) -> HashMap> { - let hot_filename = StorageDir::file_time_suffix(exclude, "data.arrow"); - // hashmap but exclude where hotfilename matches + let hot_filename = StorageDir::file_time_suffix(exclude, ARROW_FILE_EXTENSION); + // hashmap but exclude where hot filename matches let mut grouped_arrow_file: HashMap> = HashMap::new(); let mut arrow_files = self.arrow_files(); arrow_files.retain(|path| { @@ -185,7 +189,7 @@ impl StorageDir { pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf { let data_path = CONFIG.parseable.local_stream_data_path(stream_name); - let dir = StorageDir::file_time_suffix(time, "data.parquet"); + let dir = StorageDir::file_time_suffix(time, PARQUET_FILE_EXTENSION); data_path.join(dir) } @@ -278,7 +282,7 @@ fn parquet_writer_props() -> WriterPropertiesBuilder { .set_max_row_group_size(CONFIG.parseable.row_group_size) .set_compression(CONFIG.parseable.parquet_compression.into()) .set_column_encoding( - ColumnPath::new(vec!["p_timestamp".to_string()]), + ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]), Encoding::DELTA_BINARY_PACKED, ) }