diff --git a/server/src/main.rs b/server/src/main.rs index d5f3320ef..3730b3b2f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -114,10 +114,6 @@ fn startup_sync() { for stream in metadata::STREAM_INFO.list_streams() { let dir = StorageDir::new(stream.clone()); - // if data.records file is not present then skip this stream - if !dir.local_data_exists() { - continue; - } if let Err(e) = dir.create_temp_dir() { log::error!( @@ -127,6 +123,12 @@ fn startup_sync() { ); continue; } + + // if data.records file is not present then skip this stream + if !dir.local_data_exists() { + continue; + } + // create prefix for this file from its last modified time let path = dir.data_path.join("data.records"); diff --git a/server/src/query.rs b/server/src/query.rs index d655dc111..318201433 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -21,6 +21,9 @@ use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::listing::ListingTable; +use datafusion::datasource::listing::ListingTableConfig; +use datafusion::datasource::listing::ListingTableUrl; use datafusion::prelude::*; use serde_json::Value; use std::sync::Arc; @@ -29,6 +32,7 @@ use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage; use crate::storage::ObjectStorage; +use crate::storage::ObjectStorageError; use crate::utils::TimePeriod; use crate::validator; use crate::Error; @@ -102,17 +106,26 @@ impl Query { None => return Ok(()), }; - ctx.register_listing_table( - &self.stream_name, - CONFIG - .parseable - .get_cache_path(&self.stream_name) - .to_str() - .unwrap(), - listing_options, - Some(schema), - ) - .await?; + let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name); + + let table_path = match ListingTableUrl::parse( + cache_path.to_str().expect("path should is valid unicode"), + ) { + Ok(table_path) => table_path, + Err(e) => { + log::warn!("could not parse local filesystem path. Maybe directory does not exist. Error {}", e); + return Ok(()); + } + }; + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(schema); + + let table = ListingTable::try_new(config)?; + + ctx.register_table(&*self.stream_name, Arc::new(table)) + .map_err(ObjectStorageError::DataFusionError)?; // execute the query and collect results let df = ctx.sql(self.query.as_str()).await?; diff --git a/server/src/s3.rs b/server/src/s3.rs index 37a4fa007..d7a7ee0c1 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -29,7 +29,6 @@ use crate::metadata::Stats; use crate::option::{StorageOpt, CONFIG}; use crate::query::Query; use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; -use crate::utils::hostname_unchecked; // Default object storage currently is DO Spaces bucket // Any user who starts the Parseable server with default configuration @@ -427,7 +426,7 @@ impl ObjectStorage for S3 { let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions { - file_extension: format!("{}.data.parquet", hostname_unchecked()), + file_extension: ".data.parquet".to_string(), format: Arc::new(file_format), table_partition_cols: vec![], collect_stat: true,