From ef0e4ff45e398f070ac1c85ec203a5c4d954e792 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sun, 25 Sep 2022 11:55:37 +0530 Subject: [PATCH 1/3] Use register_table for execute on cache Registering table with `register_listing_table` uses the listing feature of the object store to find the files to be processed. Internally this call `ListingTable::list_files_for_scan` in datafusion which panics when table path is empty. This code change is similar to how query works for s3. --- server/src/query.rs | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/server/src/query.rs b/server/src/query.rs index d655dc111..2723d3583 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -21,6 +21,9 @@ use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::listing::ListingTable; +use datafusion::datasource::listing::ListingTableConfig; +use datafusion::datasource::listing::ListingTableUrl; use datafusion::prelude::*; use serde_json::Value; use std::sync::Arc; @@ -29,6 +32,7 @@ use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage; use crate::storage::ObjectStorage; +use crate::storage::ObjectStorageError; use crate::utils::TimePeriod; use crate::validator; use crate::Error; @@ -102,17 +106,20 @@ impl Query { None => return Ok(()), }; - ctx.register_listing_table( - &self.stream_name, - CONFIG - .parseable - .get_cache_path(&self.stream_name) - .to_str() - .unwrap(), - listing_options, - Some(schema), - ) - .await?; + let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name); + + let table_path = + ListingTableUrl::parse(cache_path.to_str().expect("path should is valid unicode")) + .expect("path should parse into valid listing url for local filesystem"); + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(schema); + + let table = ListingTable::try_new(config)?; + + ctx.register_table(&*self.stream_name, Arc::new(table)) + .map_err(ObjectStorageError::DataFusionError)?; // execute the query and collect results let df = ctx.sql(self.query.as_str()).await?; From 8fd315d929962812731065443ab9ff15accd06fb Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 26 Sep 2022 17:07:42 +0530 Subject: [PATCH 2/3] Skip unavailable directories --- server/src/main.rs | 10 ++++++---- server/src/query.rs | 12 +++++++++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index d5f3320ef..3730b3b2f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -114,10 +114,6 @@ fn startup_sync() { for stream in metadata::STREAM_INFO.list_streams() { let dir = StorageDir::new(stream.clone()); - // if data.records file is not present then skip this stream - if !dir.local_data_exists() { - continue; - } if let Err(e) = dir.create_temp_dir() { log::error!( @@ -127,6 +123,12 @@ fn startup_sync() { ); continue; } + + // if data.records file is not present then skip this stream + if !dir.local_data_exists() { + continue; + } + // create prefix for this file from its last modified time let path = dir.data_path.join("data.records"); diff --git a/server/src/query.rs b/server/src/query.rs index 2723d3583..318201433 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -108,9 +108,15 @@ impl Query { let cache_path = CONFIG.parseable.get_cache_path(&self.stream_name); - let table_path = - ListingTableUrl::parse(cache_path.to_str().expect("path should is valid unicode")) - .expect("path should parse into valid listing url for local filesystem"); + let table_path = match ListingTableUrl::parse( + cache_path.to_str().expect("path should is valid unicode"), + ) { + Ok(table_path) => table_path, + Err(e) => { + log::warn!("could not parse local filesystem path. Maybe directory does not exist. Error {}", e); + return Ok(()); + } + }; let config = ListingTableConfig::new(table_path) .with_listing_options(listing_options) From 80b8391378030402b3888b248a16d74e490e942d Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 26 Sep 2022 18:02:40 +0530 Subject: [PATCH 3/3] Remove hostname --- server/src/s3.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/s3.rs b/server/src/s3.rs index 37a4fa007..d7a7ee0c1 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -29,7 +29,6 @@ use crate::metadata::Stats; use crate::option::{StorageOpt, CONFIG}; use crate::query::Query; use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; -use crate::utils::hostname_unchecked; // Default object storage currently is DO Spaces bucket // Any user who starts the Parseable server with default configuration @@ -427,7 +426,7 @@ impl ObjectStorage for S3 { let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions { - file_extension: format!("{}.data.parquet", hostname_unchecked()), + file_extension: ".data.parquet".to_string(), format: Arc::new(file_format), table_partition_cols: vec![], collect_stat: true,