diff --git a/server/src/query.rs b/server/src/query.rs index f4285da83..b6c5ae20b 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -25,6 +25,7 @@ use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::TableProvider; use datafusion::prelude::*; use serde_json::Value; +use std::collections::HashSet; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; @@ -86,7 +87,7 @@ impl Query { .filter(|path| path_intersects_query(path, self.start, self.end)) .collect(); - let possible_parquet_files = arrow_files.clone().into_iter().map(|mut path| { + let possible_parquet_files = arrow_files.iter().cloned().map(|mut path| { path.set_extension("parquet"); path }); @@ -96,12 +97,14 @@ impl Query { .into_iter() .filter(|path| path_intersects_query(path, self.start, self.end)); - let parquet_files: Vec = possible_parquet_files.chain(parquet_files).collect(); + let parquet_files: HashSet = possible_parquet_files.chain(parquet_files).collect(); + let parquet_files = Vec::from_iter(parquet_files.into_iter()); let ctx = SessionContext::with_config_rt( SessionConfig::default(), CONFIG.storage().get_datafusion_runtime(), ); + let table = Arc::new(QueryTableProvider::new( arrow_files, parquet_files, @@ -116,7 +119,6 @@ impl Query { // execute the query and collect results let df = ctx.sql(self.query.as_str()).await?; let results = df.collect().await?; - table.remove_preserve(); Ok(results) } } diff --git a/server/src/query/table_provider.rs b/server/src/query/table_provider.rs index 17707bef9..e6aff729b 100644 --- a/server/src/query/table_provider.rs +++ b/server/src/query/table_provider.rs @@ -67,13 +67,6 @@ impl QueryTableProvider { } } - pub fn remove_preserve(&self) { - let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning"); - for file in &self.parquet_files { - parquet_cached.remove(file) - } - } - async fn create_physical_plan( &self, ctx: &SessionState, @@ -111,6 +104,15 @@ impl QueryTableProvider { } } +impl Drop for QueryTableProvider { + fn drop(&mut self) { + let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning"); + for file in &self.parquet_files { + parquet_cached.remove(file) + } + } +} + #[async_trait] impl TableProvider for QueryTableProvider { fn as_any(&self) -> &dyn Any { diff --git a/server/src/storage/file_link.rs b/server/src/storage/file_link.rs index 70af28016..90d2176a5 100644 --- a/server/src/storage/file_link.rs +++ b/server/src/storage/file_link.rs @@ -97,7 +97,7 @@ impl FileTable { } } - pub fn get_mut(&mut self, path: &Path) -> &mut L { - self.inner.get_mut(path).unwrap() + pub fn get_mut(&mut self, path: &Path) -> Option<&mut L> { + self.inner.get_mut(path) } } diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index f1c0a0f20..0b70085e6 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -229,7 +229,12 @@ pub trait ObjectStorage: Sync + 'static { } for file in dir.parquet_files() { - let metadata = CACHED_FILES.lock().unwrap().get_mut(&file).metadata; + let Some(metadata) = CACHED_FILES + .lock() + .unwrap() + .get_mut(&file) + .map(|fl| fl.metadata) else { continue }; + if metadata != CacheState::Idle { continue; } @@ -241,20 +246,36 @@ pub trait ObjectStorage: Sync + 'static { .expect("filename is valid string"); let file_suffix = str::replacen(filename, ".", "/", 3); let objectstore_path = format!("{}/{}", stream, file_suffix); + CACHED_FILES .lock() .unwrap() .get_mut(&file) + .expect("entry checked at the start") .set_metadata(CacheState::Uploading); let compressed_size = file.metadata().map_or(0, |meta| meta.len()); - let _put_parquet_file = self.upload_file(&objectstore_path, &file).await?; - CACHED_FILES - .lock() - .unwrap() - .get_mut(&file) - .set_metadata(CacheState::Uploaded); + match self.upload_file(&objectstore_path, &file).await { + Ok(()) => { + CACHED_FILES + .lock() + .unwrap() + .get_mut(&file) + .expect("entry checked at the start") + .set_metadata(CacheState::Uploaded); + } + Err(e) => { + CACHED_FILES + .lock() + .unwrap() + .get_mut(&file) + .expect("entry checked at the start") + .set_metadata(CacheState::Idle); + + return Err(e.into()); + } + } stream_stats .entry(stream)