From e81c7f403cc5072064fba590295460a3f5dabafd Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 7 Feb 2024 00:12:27 +0530 Subject: [PATCH] fix for #651: removed use of env var P_STORAGE_UPLOAD_INTERVAL added const of 60 secs to be used for local to storage sync --- server/src/main.rs | 3 ++- server/src/option.rs | 30 ------------------------------ server/src/storage/staging.rs | 16 ++++++---------- 3 files changed, 8 insertions(+), 41 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 157c982f1..ef0cb2cc6 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -50,6 +50,7 @@ mod validator; use option::CONFIG; use crate::localcache::LocalCacheManager; +pub const STORAGE_UPLOAD_INTERVAL: u32 = 60; #[actix_web::main] async fn main() -> anyhow::Result<()> { @@ -129,7 +130,7 @@ fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sende rt.block_on(async { let mut scheduler = AsyncScheduler::new(); scheduler - .every((CONFIG.parseable.upload_interval as u32).seconds()) + .every(STORAGE_UPLOAD_INTERVAL.seconds()) // Extra time interval is added so that this schedular does not race with local sync. .plus(5u32.seconds()) .run(|| async { diff --git a/server/src/option.rs b/server/src/option.rs index 624e11388..5d713f28b 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -215,10 +215,6 @@ pub struct Server { /// Size for local cache pub local_cache_size: u64, - /// Interval in seconds after which uncommited data would be - /// uploaded to the storage platform. - pub upload_interval: u64, - /// Username for the basic authentication on the server pub username: String, @@ -284,10 +280,6 @@ impl FromArgMatches for Server { .get_one::(Self::CACHE_SIZE) .cloned() .expect("default value for cache size"); - self.upload_interval = m - .get_one::(Self::UPLOAD_INTERVAL) - .cloned() - .expect("default value for upload"); self.username = m .get_one::(Self::USERNAME) .cloned() @@ -381,7 +373,6 @@ impl Server { pub const STAGING: &'static str = "local-staging-path"; pub const CACHE: &'static str = "cache-path"; pub const CACHE_SIZE: &'static str = "cache-size"; - pub const UPLOAD_INTERVAL: &'static str = "upload-interval"; pub const USERNAME: &'static str = "username"; pub const PASSWORD: &'static str = "password"; pub const CHECK_UPDATE: &'static str = "check-update"; @@ -467,16 +458,6 @@ impl Server { .help("Maximum allowed cache size for all streams combined (In human readable format, e.g 1GiB, 2GiB, 100MB)") .next_line_help(true), ) - .arg( - Arg::new(Self::UPLOAD_INTERVAL) - .long(Self::UPLOAD_INTERVAL) - .env("P_STORAGE_UPLOAD_INTERVAL") - .value_name("SECONDS") - .default_value("60") - .value_parser(validation::upload_interval) - .help("Interval in seconds after which staging data would be sent to the storage") - .next_line_help(true), - ) .arg( Arg::new(Self::USERNAME) .long(Self::USERNAME) @@ -677,7 +658,6 @@ pub mod validation { use path_clean::PathClean; use crate::option::MIN_CACHE_SIZE_BYTES; - use crate::storage::LOCAL_SYNC_INTERVAL; use human_size::{multiples, SpecificSize}; pub fn file_path(s: &str) -> Result { @@ -755,14 +735,4 @@ pub mod validation { } Ok(size) } - - pub fn upload_interval(s: &str) -> Result { - let u = s - .parse::() - .map_err(|_| "invalid upload interval".to_string())?; - if u < LOCAL_SYNC_INTERVAL { - return Err("object storage upload interval must be 60 seconds or more".to_string()); - } - Ok(u) - } } diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 04d3bef9a..31c5dffed 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -134,16 +134,12 @@ impl StorageDir { .ends_with(&hot_filename) }); - //check if arrow files is not empty, fetch the parquet file path from last file from sorted arrow file list - if !(arrow_files.is_empty()) { - arrow_files.sort(); - let key = Self::arrow_path_to_parquet(arrow_files.last().unwrap()); - for arrow_file_path in arrow_files { - grouped_arrow_file - .entry(key.clone()) - .or_default() - .push(arrow_file_path); - } + for arrow_file_path in arrow_files { + let key = Self::arrow_path_to_parquet(&arrow_file_path); + grouped_arrow_file + .entry(key) + .or_default() + .push(arrow_file_path); } grouped_arrow_file