Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::TableProvider;
use datafusion::prelude::*;
use serde_json::Value;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -86,7 +87,7 @@ impl Query {
.filter(|path| path_intersects_query(path, self.start, self.end))
.collect();

let possible_parquet_files = arrow_files.clone().into_iter().map(|mut path| {
let possible_parquet_files = arrow_files.iter().cloned().map(|mut path| {
path.set_extension("parquet");
path
});
Expand All @@ -96,12 +97,14 @@ impl Query {
.into_iter()
.filter(|path| path_intersects_query(path, self.start, self.end));

let parquet_files: Vec<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
let parquet_files: HashSet<PathBuf> = possible_parquet_files.chain(parquet_files).collect();
let parquet_files = Vec::from_iter(parquet_files.into_iter());

let ctx = SessionContext::with_config_rt(
SessionConfig::default(),
CONFIG.storage().get_datafusion_runtime(),
);

let table = Arc::new(QueryTableProvider::new(
arrow_files,
parquet_files,
Expand All @@ -116,7 +119,6 @@ impl Query {
// execute the query and collect results
let df = ctx.sql(self.query.as_str()).await?;
let results = df.collect().await?;
table.remove_preserve();
Ok(results)
}
}
Expand Down
16 changes: 9 additions & 7 deletions server/src/query/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,6 @@ impl QueryTableProvider {
}
}

pub fn remove_preserve(&self) {
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
for file in &self.parquet_files {
parquet_cached.remove(file)
}
}

async fn create_physical_plan(
&self,
ctx: &SessionState,
Expand Down Expand Up @@ -111,6 +104,15 @@ impl QueryTableProvider {
}
}

impl Drop for QueryTableProvider {
fn drop(&mut self) {
let mut parquet_cached = crate::storage::CACHED_FILES.lock().expect("no poisoning");
for file in &self.parquet_files {
parquet_cached.remove(file)
}
}
}

#[async_trait]
impl TableProvider for QueryTableProvider {
fn as_any(&self) -> &dyn Any {
Expand Down
4 changes: 2 additions & 2 deletions server/src/storage/file_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<L: Link + Default> FileTable<L> {
}
}

pub fn get_mut(&mut self, path: &Path) -> &mut L {
self.inner.get_mut(path).unwrap()
pub fn get_mut(&mut self, path: &Path) -> Option<&mut L> {
self.inner.get_mut(path)
}
}
35 changes: 28 additions & 7 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,12 @@ pub trait ObjectStorage: Sync + 'static {
}

for file in dir.parquet_files() {
let metadata = CACHED_FILES.lock().unwrap().get_mut(&file).metadata;
let Some(metadata) = CACHED_FILES
.lock()
.unwrap()
.get_mut(&file)
.map(|fl| fl.metadata) else { continue };

if metadata != CacheState::Idle {
continue;
}
Expand All @@ -241,20 +246,36 @@ pub trait ObjectStorage: Sync + 'static {
.expect("filename is valid string");
let file_suffix = str::replacen(filename, ".", "/", 3);
let objectstore_path = format!("{}/{}", stream, file_suffix);

CACHED_FILES
.lock()
.unwrap()
.get_mut(&file)
.expect("entry checked at the start")
.set_metadata(CacheState::Uploading);

let compressed_size = file.metadata().map_or(0, |meta| meta.len());

let _put_parquet_file = self.upload_file(&objectstore_path, &file).await?;
CACHED_FILES
.lock()
.unwrap()
.get_mut(&file)
.set_metadata(CacheState::Uploaded);
match self.upload_file(&objectstore_path, &file).await {
Ok(()) => {
CACHED_FILES
.lock()
.unwrap()
.get_mut(&file)
.expect("entry checked at the start")
.set_metadata(CacheState::Uploaded);
}
Err(e) => {
CACHED_FILES
.lock()
.unwrap()
.get_mut(&file)
.expect("entry checked at the start")
.set_metadata(CacheState::Idle);

return Err(e.into());
}
}

stream_stats
.entry(stream)
Expand Down