diff --git a/bin/icehutd/src/cli.rs b/bin/icehutd/src/cli.rs index f5129521f..10d69059a 100644 --- a/bin/icehutd/src/cli.rs +++ b/bin/icehutd/src/cli.rs @@ -66,7 +66,7 @@ pub struct IceHutOpts { #[arg(long, env="FILE_STORAGE_PATH", required_if_eq("backend", "file"), - conflicts_with_all(["access_key_id", "secret_access_key", "region", "bucket", "endpoint", "allow_http"]), + conflicts_with_all(["region", "bucket", "endpoint", "allow_http"]), help_heading="File Backend Options", help="Path to the directory where files will be stored" )] diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs index bfa9d4f5c..20feaa178 100644 --- a/crates/control_plane/src/service.rs +++ b/crates/control_plane/src/service.rs @@ -8,22 +8,18 @@ use arrow_json::writer::JsonArray; use arrow_json::WriterBuilder; use async_trait::async_trait; use bytes::Bytes; -use datafusion::execution::context::SessionContext; -use datafusion::execution::SessionStateBuilder; -use datafusion::prelude::{CsvReadOptions, SessionConfig}; +use datafusion::prelude::CsvReadOptions; use datafusion_iceberg::catalog::catalog::IcebergCatalog; -use datafusion_iceberg::planner::IcebergQueryPlanner; use iceberg_rest_catalog::apis::configuration::Configuration; use iceberg_rest_catalog::catalog::RestCatalog; use object_store::path::Path; use object_store::{ObjectStore, PutPayload}; use runtime::datafusion::execution::SqlExecutor; -use runtime::datafusion::type_planner::CustomTypePlanner; +use runtime::datafusion::session::Session; use rusoto_core::{HttpClient, Region}; use rusoto_credential::StaticProvider; use rusoto_s3::{GetBucketAclRequest, S3Client, S3}; use snafu::ResultExt; -use std::env; use std::sync::Arc; use url::Url; use uuid::Uuid; @@ -77,6 +73,7 @@ pub trait ControlService: Send + Sync { pub struct ControlServiceImpl { storage_profile_repo: Arc, warehouse_repo: Arc, + executor: SqlExecutor, } impl ControlServiceImpl { @@ -84,9 +81,13 @@ impl ControlServiceImpl { storage_profile_repo: Arc, warehouse_repo: Arc, ) -> Self { + let session = Session::default(); + #[allow(clippy::unwrap_used)] + let executor = SqlExecutor::new(session.ctx).unwrap(); Self { storage_profile_repo, warehouse_repo, + executor, } } } @@ -203,29 +204,13 @@ impl ControlService for ControlServiceImpl { #[tracing::instrument(level = "debug", skip(self))] #[allow(clippy::large_futures)] async fn query(&self, query: &str) -> ControlPlaneResult<(Vec, Vec)> { - let sql_parser_dialect = - env::var("SQL_PARSER_DIALECT").unwrap_or_else(|_| "snowflake".to_string()); - let state = SessionStateBuilder::new() - .with_config( - SessionConfig::new() - .with_information_schema(true) - .set_str("datafusion.sql_parser.dialect", &sql_parser_dialect), - ) - .with_default_features() - .with_query_planner(Arc::new(IcebergQueryPlanner {})) - .with_type_planner(Arc::new(CustomTypePlanner {})) - .build(); - let ctx = SessionContext::new_with_state(state); - - // TODO: Should be shared context - let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?; - - let query = executor.preprocess_query(query); - let statement = executor + let query = self.executor.preprocess_query(query); + let statement = self + .executor .parse_query(&query) .context(super::error::DataFusionSnafu)?; - let table_ref = executor.get_table_path(&statement); + let table_ref = self.executor.get_table_path(&statement); let warehouse_name = table_ref .as_ref() .and_then(|table_ref| table_ref.catalog()) @@ -236,7 +221,7 @@ impl ControlService for ControlServiceImpl { (String::from("datafusion"), String::new()) } else { let warehouse = self.get_warehouse_by_name(warehouse_name.clone()).await?; - if executor.ctx.catalog(warehouse.name.as_str()).is_none() { + if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() { let storage_profile = self.get_profile(warehouse.storage_profile_id).await?; let config = { @@ -254,9 +239,11 @@ impl ControlService for ControlServiceImpl { ); let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?; - executor - .ctx - .register_catalog(warehouse.name.clone(), Arc::new(catalog)); + if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() { + self.executor + .ctx + .register_catalog(warehouse.name.clone(), Arc::new(catalog)); + } let object_store = storage_profile .get_object_store() @@ -264,14 +251,15 @@ impl ControlService for ControlServiceImpl { let endpoint_url = storage_profile .get_object_store_endpoint_url() .map_err(|_| ControlPlaneError::MissingStorageEndpointURL)?; - executor + self.executor .ctx .register_object_store(&endpoint_url, Arc::from(object_store)); } (warehouse.name, warehouse.location) }; - let records: Vec = executor + let records: Vec = self + .executor .query(&query, catalog_name.as_str(), warehouse_location.as_str()) .await .context(crate::error::ExecutionSnafu)? @@ -388,28 +376,26 @@ impl ControlService for ControlServiceImpl { .await .context(crate::error::ObjectStoreSnafu)?; - // Create table from CSV - let config = { - let mut config = Configuration::new(); - config.base_path = "http://0.0.0.0:3000/catalog".to_string(); - config - }; - let object_store_builder = storage_profile - .get_object_store_builder() - .context(crate::error::InvalidStorageProfileSnafu)?; - let rest_client = RestCatalog::new( - Some(warehouse_id.to_string().as_str()), - config, - object_store_builder, - ); - let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?; - let state = SessionStateBuilder::new() - .with_default_features() - .with_query_planner(Arc::new(IcebergQueryPlanner {})) - .build(); - - let ctx = SessionContext::new_with_state(state); - ctx.register_catalog(warehouse_name.clone(), Arc::new(catalog)); + if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() { + // Create table from CSV + let config = { + let mut config = Configuration::new(); + config.base_path = "http://0.0.0.0:3000/catalog".to_string(); + config + }; + let object_store_builder = storage_profile + .get_object_store_builder() + .context(crate::error::InvalidStorageProfileSnafu)?; + let rest_client = RestCatalog::new( + Some(warehouse_id.to_string().as_str()), + config, + object_store_builder, + ); + let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?; + self.executor + .ctx + .register_catalog(warehouse.name.clone(), Arc::new(catalog)); + } // Register CSV file as a table let storage_endpoint_url = storage_profile @@ -429,15 +415,18 @@ impl ControlService for ControlServiceImpl { url: storage_endpoint_url, }, )?; - ctx.register_object_store(&endpoint_url, Arc::from(object_store)); - ctx.register_csv(table_name, path_string, CsvReadOptions::new()) + self.executor + .ctx + .register_object_store(&endpoint_url, Arc::from(object_store)); + self.executor + .ctx + .register_csv(table_name, path_string, CsvReadOptions::new()) .await?; let insert_query = format!( "INSERT INTO {warehouse_name}.{database_name}.{table_name} SELECT * FROM {table_name}" ); - let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?; - executor + self.executor .execute_with_custom_plan(&insert_query, warehouse_name.as_str()) .await .context(crate::error::ExecutionSnafu)?; @@ -553,6 +542,7 @@ mod tests { }; use crate::repository::InMemoryStorageProfileRepository; use crate::repository::InMemoryWarehouseRepository; + use std::env; #[tokio::test] async fn test_create_warehouse_failed_no_storage_profile() { diff --git a/crates/runtime/src/datafusion/mod.rs b/crates/runtime/src/datafusion/mod.rs index 4fded0a3e..2eb5bdaa4 100644 --- a/crates/runtime/src/datafusion/mod.rs +++ b/crates/runtime/src/datafusion/mod.rs @@ -4,4 +4,5 @@ pub mod functions; pub mod planner; //pub mod session; pub mod execution; +pub mod session; pub mod type_planner; diff --git a/crates/runtime/src/datafusion/session.rs b/crates/runtime/src/datafusion/session.rs index d2eeff71f..b997f98a0 100644 --- a/crates/runtime/src/datafusion/session.rs +++ b/crates/runtime/src/datafusion/session.rs @@ -1,5 +1,36 @@ +use crate::datafusion::type_planner::CustomTypePlanner; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_iceberg::planner::IcebergQueryPlanner; +use std::env; +use std::sync::Arc; +pub struct Session { + pub ctx: SessionContext, +} -pub struct IcehutSession { - pub(crate) ctx: SessionContext -} \ No newline at end of file +impl Default for Session { + fn default() -> Self { + Self::new() + } +} + +impl Session { + #[must_use] + pub fn new() -> Self { + let sql_parser_dialect = + env::var("SQL_PARSER_DIALECT").unwrap_or_else(|_| "snowflake".to_string()); + let state = SessionStateBuilder::new() + .with_config( + SessionConfig::new() + .with_information_schema(true) + .set_str("datafusion.sql_parser.dialect", &sql_parser_dialect), + ) + .with_default_features() + .with_query_planner(Arc::new(IcebergQueryPlanner {})) + .with_type_planner(Arc::new(CustomTypePlanner {})) + .build(); + let ctx = SessionContext::new_with_state(state); + Self { ctx } + } +}