From e143b35e7b298fb0ade448c9d5e7690d1fa6e68a Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Fri, 7 Feb 2025 12:02:44 +0300
Subject: [PATCH 1/2] Make common SQL executor

Squashed from five commits:

* Make common sql executor (#209): share one SqlExecutor and apply
  ALTER SESSION parameters to the current context
* Fix deployment (#211): fix deployment and the binary name
* Temporary EOF fix until the upstream sqlparser fix is released
* Register CSV within the current context (two follow-up commits)
---
 Dockerfile                                 |   4 +-
 bin/icehutd/src/cli.rs                     |   2 +-
 crates/control_plane/src/service.rs        | 106 +++++++++---------
 crates/runtime/src/datafusion/execution.rs |  80 ++++++++++----
 crates/runtime/src/datafusion/mod.rs       |   1 +
 crates/runtime/src/datafusion/session.rs   | 120 ++++++++++++++++++++-
 6 files changed, 228 insertions(+), 85 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 1b5b1f505..3d38a7b7d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -25,7 +25,7 @@ WORKDIR /app
 
 RUN apt-get update && apt-get install -y openssl && rm -rf /var/lib/apt/lists/*
 
-COPY --from=builder /app/target/release/nexus ./
+COPY --from=builder /app/target/release/icehutd ./
 COPY --from=builder /app/rest-catalog-open-api.yaml rest-catalog-open-api.yaml
 
-CMD ["./nexus"]
+CMD ["./icehutd"]
diff --git a/bin/icehutd/src/cli.rs b/bin/icehutd/src/cli.rs
index f5129521f..10d69059a 100644
--- a/bin/icehutd/src/cli.rs
+++ b/bin/icehutd/src/cli.rs
@@ -66,7 +66,7 @@ pub struct IceHutOpts {
     #[arg(long,
         env="FILE_STORAGE_PATH",
         required_if_eq("backend", "file"),
-        conflicts_with_all(["access_key_id", "secret_access_key", "region", "bucket", "endpoint", "allow_http"]),
+        conflicts_with_all(["region", "bucket", "endpoint", "allow_http"]),
         help_heading="File Backend Options",
         help="Path to the directory where files will be stored"
     )]
diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs
index bfa9d4f5c..20feaa178 100644
--- a/crates/control_plane/src/service.rs
+++ b/crates/control_plane/src/service.rs
@@ -8,22 +8,18 @@ use arrow_json::writer::JsonArray;
 use arrow_json::WriterBuilder;
 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion::execution::context::SessionContext;
-use datafusion::execution::SessionStateBuilder;
-use datafusion::prelude::{CsvReadOptions, SessionConfig};
+use datafusion::prelude::CsvReadOptions;
 use datafusion_iceberg::catalog::catalog::IcebergCatalog;
-use datafusion_iceberg::planner::IcebergQueryPlanner;
 use iceberg_rest_catalog::apis::configuration::Configuration;
 use iceberg_rest_catalog::catalog::RestCatalog;
 use object_store::path::Path;
 use object_store::{ObjectStore, PutPayload};
 use runtime::datafusion::execution::SqlExecutor;
-use runtime::datafusion::type_planner::CustomTypePlanner;
+use runtime::datafusion::session::Session;
 use rusoto_core::{HttpClient, Region};
 use rusoto_credential::StaticProvider;
 use rusoto_s3::{GetBucketAclRequest, S3Client, S3};
 use snafu::ResultExt;
-use std::env;
 use std::sync::Arc;
 use url::Url;
 use uuid::Uuid;
@@ -77,6 +73,7 @@ pub trait ControlService: Send + Sync {
 pub struct ControlServiceImpl {
     storage_profile_repo: Arc,
     warehouse_repo: Arc,
+    executor: SqlExecutor,
 }
 
 impl ControlServiceImpl {
@@ -84,9 +81,13 @@ impl ControlServiceImpl {
         storage_profile_repo: Arc,
         warehouse_repo: Arc,
     ) -> Self {
+        let session = Session::default();
+        #[allow(clippy::unwrap_used)]
+        let executor = SqlExecutor::new(session.ctx).unwrap();
         Self {
             storage_profile_repo,
             warehouse_repo,
+            executor,
         }
     }
 }
@@ -203,29 +204,13 @@ impl ControlService for ControlServiceImpl {
     #[tracing::instrument(level = "debug", skip(self))]
     #[allow(clippy::large_futures)]
     async fn query(&self, query: &str) -> ControlPlaneResult<(Vec, Vec)> {
-        let sql_parser_dialect =
-            env::var("SQL_PARSER_DIALECT").unwrap_or_else(|_| "snowflake".to_string());
-        let state = SessionStateBuilder::new()
-            .with_config(
-                SessionConfig::new()
-                    .with_information_schema(true)
-                    .set_str("datafusion.sql_parser.dialect", &sql_parser_dialect),
-            )
-            .with_default_features()
-            .with_query_planner(Arc::new(IcebergQueryPlanner {}))
-            .with_type_planner(Arc::new(CustomTypePlanner {}))
-            .build();
-        let ctx = SessionContext::new_with_state(state);
-
-        // TODO: Should be shared context
-        let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?;
-
-        let query = executor.preprocess_query(query);
-        let statement = executor
+        let query = self.executor.preprocess_query(query);
+        let statement = self
+            .executor
             .parse_query(&query)
             .context(super::error::DataFusionSnafu)?;
-        let table_ref = executor.get_table_path(&statement);
+        let table_ref = self.executor.get_table_path(&statement);
         let warehouse_name = table_ref
             .as_ref()
             .and_then(|table_ref| table_ref.catalog())
@@ -236,7 +221,7 @@ impl ControlService for ControlServiceImpl {
             (String::from("datafusion"), String::new())
         } else {
             let warehouse = self.get_warehouse_by_name(warehouse_name.clone()).await?;
-            if executor.ctx.catalog(warehouse.name.as_str()).is_none() {
+            if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() {
                 let storage_profile = self.get_profile(warehouse.storage_profile_id).await?;
 
                 let config = {
@@ -254,9 +239,11 @@ impl ControlService for ControlServiceImpl {
                 );
                 let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?;
 
-                executor
-                    .ctx
-                    .register_catalog(warehouse.name.clone(), Arc::new(catalog));
+                if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() {
+                    self.executor
+                        .ctx
+                        .register_catalog(warehouse.name.clone(), Arc::new(catalog));
+                }
 
                 let object_store = storage_profile
                     .get_object_store()
@@ -264,14 +251,15 @@ impl ControlService for ControlServiceImpl {
                 let endpoint_url = storage_profile
                     .get_object_store_endpoint_url()
                     .map_err(|_| ControlPlaneError::MissingStorageEndpointURL)?;
-                executor
+                self.executor
                     .ctx
                     .register_object_store(&endpoint_url, Arc::from(object_store));
             }
             (warehouse.name, warehouse.location)
         };
 
-        let records: Vec<RecordBatch> = executor
+        let records: Vec<RecordBatch> = self
+            .executor
            .query(&query, catalog_name.as_str(), warehouse_location.as_str())
            .await
            .context(crate::error::ExecutionSnafu)?
@@ -388,28 +376,26 @@ impl ControlService for ControlServiceImpl { .await .context(crate::error::ObjectStoreSnafu)?; - // Create table from CSV - let config = { - let mut config = Configuration::new(); - config.base_path = "http://0.0.0.0:3000/catalog".to_string(); - config - }; - let object_store_builder = storage_profile - .get_object_store_builder() - .context(crate::error::InvalidStorageProfileSnafu)?; - let rest_client = RestCatalog::new( - Some(warehouse_id.to_string().as_str()), - config, - object_store_builder, - ); - let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?; - let state = SessionStateBuilder::new() - .with_default_features() - .with_query_planner(Arc::new(IcebergQueryPlanner {})) - .build(); - - let ctx = SessionContext::new_with_state(state); - ctx.register_catalog(warehouse_name.clone(), Arc::new(catalog)); + if self.executor.ctx.catalog(warehouse.name.as_str()).is_none() { + // Create table from CSV + let config = { + let mut config = Configuration::new(); + config.base_path = "http://0.0.0.0:3000/catalog".to_string(); + config + }; + let object_store_builder = storage_profile + .get_object_store_builder() + .context(crate::error::InvalidStorageProfileSnafu)?; + let rest_client = RestCatalog::new( + Some(warehouse_id.to_string().as_str()), + config, + object_store_builder, + ); + let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?; + self.executor + .ctx + .register_catalog(warehouse.name.clone(), Arc::new(catalog)); + } // Register CSV file as a table let storage_endpoint_url = storage_profile @@ -429,15 +415,18 @@ impl ControlService for ControlServiceImpl { url: storage_endpoint_url, }, )?; - ctx.register_object_store(&endpoint_url, Arc::from(object_store)); - ctx.register_csv(table_name, path_string, CsvReadOptions::new()) + self.executor + .ctx + .register_object_store(&endpoint_url, Arc::from(object_store)); + self.executor + .ctx + .register_csv(table_name, path_string, CsvReadOptions::new()) .await?; let insert_query = format!( "INSERT INTO {warehouse_name}.{database_name}.{table_name} SELECT * FROM {table_name}" ); - let executor = SqlExecutor::new(ctx).context(crate::error::ExecutionSnafu)?; - executor + self.executor .execute_with_custom_plan(&insert_query, warehouse_name.as_str()) .await .context(crate::error::ExecutionSnafu)?; @@ -553,6 +542,7 @@ mod tests { }; use crate::repository::InMemoryStorageProfileRepository; use crate::repository::InMemoryWarehouseRepository; + use std::env; #[tokio::test] async fn test_create_warehouse_failed_no_storage_profile() { diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs index 8edaa2e38..b8a7db9d0 100644 --- a/crates/runtime/src/datafusion/execution.rs +++ b/crates/runtime/src/datafusion/execution.rs @@ -4,6 +4,7 @@ use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult}; use crate::datafusion::functions::register_udfs; use crate::datafusion::planner::ExtendedSqlToRel; +use crate::datafusion::session::SessionParams; use arrow::array::{RecordBatch, UInt64Array}; use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datafusion::common::tree_node::{TransformedResult, TreeNode}; @@ -88,6 +89,9 @@ impl SqlExecutor { // etc if let DFStatement::Statement(s) = statement { match *s { + Statement::AlterSession { .. } => { + return Box::pin(self.alter_session_query(*s)).await; + } Statement::CreateTable { .. 
} => {
                    return Box::pin(self.create_table_query(*s, warehouse_name)).await;
                }
@@ -110,7 +114,7 @@ impl SqlExecutor {
                 }
                 Statement::CreateStage { .. } => {
                     // We support only CSV uploads for now
-                    return Box::pin(self.create_stage_query(*s, warehouse_name)).await;
+                    return Box::pin(self.create_stage_query(*s)).await;
                 }
                 Statement::CopyIntoSnowflake { .. } => {
                     return Box::pin(self.copy_into_snowflake_query(*s, warehouse_name)).await;
@@ -166,20 +170,60 @@ impl SqlExecutor {
         let date_add =
             regex::Regex::new(r"(date|time|timestamp)(_?add|_?diff)\(\s*([a-zA-Z]+),").unwrap();
-        let query = re
+        let mut query = re
             .replace_all(query, "json_get(json_get($1, $2), '$3')")
             .to_string();
-        let query = date_add.replace_all(&query, "$1$2('$3',").to_string();
-        // TODO implement alter session logic
+        query = date_add.replace_all(&query, "$1$2('$3',").to_string();
+        // TODO remove this check after release of https://github.com/Embucket/datafusion-sqlparser-rs/pull/8
+        if query.to_lowercase().contains("alter session") {
+            query = query.replace(';', "");
+        }
         query
-            .replace(
-                "alter session set query_tag = 'snowplow_dbt'",
-                "SHOW session",
-            )
             .replace("skip_header=1", "skip_header=TRUE")
             .replace("FROM @~/", "FROM ")
     }
 
+    #[allow(clippy::unused_async)]
+    pub async fn alter_session_query(
+        &self,
+        statement: Statement,
+    ) -> IcehutSQLResult<Vec<RecordBatch>> {
+        if let Statement::AlterSession {
+            set,
+            session_params,
+        } = statement
+        {
+            let state = self.ctx.state_ref();
+            let mut write = state.write();
+            let config = write
+                .config_mut()
+                .options_mut()
+                .extensions
+                .get_mut::<SessionParams>();
+            if let Some(cfg) = config {
+                let params = session_params
+                    .options
+                    .iter()
+                    .map(|v| (v.option_name.clone(), v.value.clone()))
+                    .collect::<HashMap<String, String>>();
+                if set {
+                    cfg.set_properties(params)
+                        .context(ih_error::DataFusionSnafu)?;
+                } else {
+                    cfg.remove_properties(params)
+                        .context(ih_error::DataFusionSnafu)?;
+                }
+            }
+            Ok(vec![])
+        } else {
+            Err(IcehutSQLError::DataFusion {
+                source: DataFusionError::NotImplemented(
+                    "Only ALTER SESSION statements are supported".to_string(),
+                ),
+            })
+        }
+    }
+
     #[allow(clippy::redundant_else, clippy::too_many_lines)]
     #[tracing::instrument(level = "trace", skip(self), err, ret)]
     pub async fn create_table_query(
@@ -330,7 +374,6 @@ impl SqlExecutor {
     pub async fn create_stage_query(
         &self,
         statement: Statement,
-        warehouse_name: &str,
     ) -> IcehutSQLResult<Vec<RecordBatch>> {
         if let Statement::CreateStage {
             name,
@@ -368,7 +411,6 @@ impl SqlExecutor {
             .unwrap_or(b'"');
 
         let file_path = stage_params.url.unwrap_or_default();
-        let stage_table_name = format!("stage_{table_name}");
         let url =
             Url::parse(file_path.as_str()).map_err(|_| IcehutSQLError::InvalidIdentifier {
                 ident: file_path.clone(),
@@ -414,7 +456,7 @@ impl SqlExecutor {
         // Register CSV file with filled missing datatype with default Utf8
         self.ctx
             .register_csv(
-                stage_table_name.clone(),
+                table_name.value.clone(),
                 file_path,
                 CsvReadOptions::new()
                     .has_header(skip_header)
@@ -423,13 +465,7 @@ impl SqlExecutor {
             )
             .await
             .context(ih_error::DataFusionSnafu)?;
-
-        // Create stages database and create table with prepared schema
-        // TODO Don't create table in case we have common ctx
-        self.create_database(warehouse_name, ObjectName(vec![Ident::new("stages")]), true)
-            .await?;
-        let create_query = format!("CREATE TABLE {warehouse_name}.stages.{table_name} AS (SELECT * FROM {stage_table_name})");
-        self.query(&create_query, warehouse_name, "").await
+        Ok(vec![])
     } else {
         Err(IcehutSQLError::DataFusion {
             source: DataFusionError::NotImplemented(
@@ -449,9 +485,8 @@ impl SqlExecutor {
         } = statement
         {
             // Insert data to table
-            let from_query = from_stage.to_string().replace('@', "");
-            let insert_query =
-                format!("INSERT INTO {into} SELECT * FROM {warehouse_name}.stages.{from_query}");
+            let stage_name = from_stage.to_string().replace('@', "");
+            let insert_query = format!("INSERT INTO {into} SELECT * FROM {stage_name}");
             self.execute_with_custom_plan(&insert_query, warehouse_name)
                 .await
         } else {
@@ -917,6 +952,9 @@ impl SqlExecutor {
 
         match statement.clone() {
             DFStatement::Statement(s) => match *s {
+                Statement::CopyIntoSnowflake { into, .. } => {
+                    Some(TableReference::parse_str(&into.to_string()))
+                }
                 Statement::Drop { names, .. } => {
                     Some(TableReference::parse_str(&names[0].to_string()))
                 }
diff --git a/crates/runtime/src/datafusion/mod.rs b/crates/runtime/src/datafusion/mod.rs
index 4fded0a3e..2eb5bdaa4 100644
--- a/crates/runtime/src/datafusion/mod.rs
+++ b/crates/runtime/src/datafusion/mod.rs
@@ -4,4 +4,5 @@ pub mod functions;
 pub mod planner;
 //pub mod session;
 pub mod execution;
+pub mod session;
 pub mod type_planner;
diff --git a/crates/runtime/src/datafusion/session.rs b/crates/runtime/src/datafusion/session.rs
index d2eeff71f..3ca3104ee 100644
--- a/crates/runtime/src/datafusion/session.rs
+++ b/crates/runtime/src/datafusion/session.rs
@@ -1,5 +1,119 @@
+use crate::datafusion::type_planner::CustomTypePlanner;
+<<<<<<< HEAD
+use datafusion::common::error::Result as DFResult;
+use datafusion::execution::SessionStateBuilder;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
+use datafusion_iceberg::planner::IcebergQueryPlanner;
+use std::any::Any;
+use std::collections::HashMap;
+use std::env;
+use std::sync::Arc;
 
+pub struct Session {
+    pub ctx: SessionContext,
+}
 
+impl Default for Session {
+    fn default() -> Self {
+        Self::new()
+    }
+}
-pub struct IcehutSession {
-    pub(crate) ctx: SessionContext
-}
\ No newline at end of file
+=======
+use datafusion::execution::SessionStateBuilder;
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_iceberg::planner::IcebergQueryPlanner;
+use std::env;
+use std::sync::Arc;
+
+pub struct Session {
+    pub ctx: SessionContext,
+}
+
+impl Default for Session {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+>>>>>>> d972fcb (Make common sql executor (#209))
+impl Session {
+    #[must_use]
+    pub fn new() -> Self {
+        let sql_parser_dialect =
+            env::var("SQL_PARSER_DIALECT").unwrap_or_else(|_| "snowflake".to_string());
+        let state = SessionStateBuilder::new()
+            .with_config(
+                SessionConfig::new()
+<<<<<<< HEAD
+                    .with_option_extension(SessionParams::default())
+=======
+>>>>>>> d972fcb (Make common sql executor (#209))
+                    .with_information_schema(true)
+                    .set_str("datafusion.sql_parser.dialect", &sql_parser_dialect),
+            )
+            .with_default_features()
+            .with_query_planner(Arc::new(IcebergQueryPlanner {}))
+            .with_type_planner(Arc::new(CustomTypePlanner {}))
+            .build();
+        let ctx = SessionContext::new_with_state(state);
+        Self { ctx }
+    }
+}
+<<<<<<< HEAD
+
+#[derive(Default, Debug, Clone)]
+pub struct SessionParams {
+    pub properties: HashMap<String, String>,
+}
+
+impl SessionParams {
+    pub fn set_properties(&mut self, properties: HashMap<String, String>) -> DFResult<()> {
+        for (key, value) in properties {
+            self.properties
+                .insert(format!("session_params.{key}"), value);
+        }
+        Ok(())
+    }
+
+    pub fn remove_properties(&mut self, properties: HashMap<String, String>) -> DFResult<()> {
+        for (key, ..) in properties {
+            self.properties.remove(&key);
+        }
+        Ok(())
+    }
+}
+
+impl ConfigExtension for SessionParams {
+    const PREFIX: &'static str = "session_params";
+}
+
+impl ExtensionOptions for SessionParams {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn cloned(&self) -> Box<dyn ExtensionOptions> {
+        Box::new(self.clone())
+    }
+
+    fn set(&mut self, key: &str, value: &str) -> DFResult<()> {
+        self.properties.insert(key.to_owned(), value.to_owned());
+        Ok(())
+    }
+
+    fn entries(&self) -> Vec<ConfigEntry> {
+        self.properties
+            .iter()
+            .map(|(k, v)| ConfigEntry {
+                key: k.into(),
+                value: Some(v.into()),
+                description: "",
+            })
+            .collect()
+    }
+}
+=======
+>>>>>>> d972fcb (Make common sql executor (#209))

From 7761e369e92e9c70411fd70dc5028283f78cb8ff Mon Sep 17 00:00:00 2001
From: osipovartem
Date: Fri, 7 Feb 2025 17:19:22 +0300
Subject: [PATCH 2/2] Register CSV within current context

Clean up the leftover merge conflict markers in session.rs.
---
 crates/runtime/src/datafusion/session.rs | 25 ------------------------
 1 file changed, 25 deletions(-)

diff --git a/crates/runtime/src/datafusion/session.rs b/crates/runtime/src/datafusion/session.rs
index 3ca3104ee..131542f3b 100644
--- a/crates/runtime/src/datafusion/session.rs
+++ b/crates/runtime/src/datafusion/session.rs
@@ -1,5 +1,4 @@
 use crate::datafusion::type_planner::CustomTypePlanner;
-<<<<<<< HEAD
 use datafusion::common::error::Result as DFResult;
 use datafusion::execution::SessionStateBuilder;
 use datafusion::prelude::{SessionConfig, SessionContext};
@@ -19,24 +18,6 @@ impl Default for Session {
     }
 }
 
-=======
-use datafusion::execution::SessionStateBuilder;
-use datafusion::prelude::{SessionConfig, SessionContext};
-use datafusion_iceberg::planner::IcebergQueryPlanner;
-use std::env;
-use std::sync::Arc;
-
-pub struct Session {
-    pub ctx: SessionContext,
-}
-
-impl Default for Session {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
->>>>>>> d972fcb (Make common sql executor (#209))
 impl Session {
     #[must_use]
     pub fn new() -> Self {
@@ -45,10 +26,7 @@ impl Session {
         let state = SessionStateBuilder::new()
             .with_config(
                 SessionConfig::new()
-<<<<<<< HEAD
                     .with_option_extension(SessionParams::default())
-=======
->>>>>>> d972fcb (Make common sql executor (#209))
                     .with_information_schema(true)
                     .set_str("datafusion.sql_parser.dialect", &sql_parser_dialect),
             )
@@ -60,7 +38,6 @@ impl Session {
         let ctx = SessionContext::new_with_state(state);
         Self { ctx }
     }
 }
-<<<<<<< HEAD
 
 #[derive(Default, Debug, Clone)]
 pub struct SessionParams {
@@ -115,5 +92,3 @@ impl ExtensionOptions for SessionParams {
             .collect()
     }
 }
-=======
->>>>>>> d972fcb (Make common sql executor (#209))
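
Reviewer note: a minimal sketch, not part of the series, of how the new
SessionParams flow is expected to behave once patch 1 applies. It assumes the
crate paths from the diffs above (runtime::datafusion::session::{Session,
SessionParams}, runtime::datafusion::execution::SqlExecutor) and the
three-argument query(sql, catalog, warehouse_location) signature seen at the
service call sites; the readback path is DataFusion's standard ConfigExtension
registry.

    use runtime::datafusion::execution::SqlExecutor;
    use runtime::datafusion::session::{Session, SessionParams};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Session::new() wires IcebergQueryPlanner, CustomTypePlanner and the
        // SessionParams option extension into one shared SessionContext.
        let session = Session::default();
        let executor = SqlExecutor::new(session.ctx)?;

        // ALTER SESSION is dispatched to alter_session_query(), which writes
        // the key/value pairs into the SessionParams config extension.
        executor
            .query("ALTER SESSION SET query_tag = 'etl'", "datafusion", "")
            .await?;

        // Read the value back through the extension registry; set_properties()
        // stores keys under the "session_params." prefix. The exact key casing
        // follows the parser's option_name, so this assertion is illustrative.
        let state = executor.ctx.state();
        let params = state
            .config()
            .options()
            .extensions
            .get::<SessionParams>()
            .expect("SessionParams is registered in Session::new()");
        assert!(params.properties.contains_key("session_params.query_tag"));
        Ok(())
    }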
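
A companion sketch, equally hypothetical, pinning down the preprocess_query()
rewrites from the execution.rs hunk: the date-part quoting for
dateadd/datediff-style calls, and the temporary stripping of semicolons from
ALTER SESSION statements (the EOF workaround that the TODO ties to
Embucket/datafusion-sqlparser-rs#8). The test name and input strings are
illustrative only.

    #[cfg(test)]
    mod preprocess_tests {
        use runtime::datafusion::execution::SqlExecutor;
        use runtime::datafusion::session::Session;

        #[test]
        fn quotes_date_part_and_strips_semicolons() {
            #[allow(clippy::unwrap_used)]
            let executor = SqlExecutor::new(Session::default().ctx).unwrap();

            // (date|time|timestamp)(_?add|_?diff)(part, ...) gets the date
            // part quoted: dateadd(day, 5, x) -> dateadd('day', 5, x).
            let q = executor.preprocess_query("SELECT dateadd(day, 5, created_at) FROM t");
            assert!(q.contains("dateadd('day',"));

            // Semicolons are stripped from ALTER SESSION statements until the
            // upstream sqlparser fix is released.
            let q = executor.preprocess_query("ALTER SESSION SET query_tag = 'etl';");
            assert!(!q.contains(';'));
        }
    }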