diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs index 34fbe3541..b8a7db9d0 100644 --- a/crates/runtime/src/datafusion/execution.rs +++ b/crates/runtime/src/datafusion/execution.rs @@ -114,7 +114,7 @@ impl SqlExecutor { } Statement::CreateStage { .. } => { // We support only CSV uploads for now - return Box::pin(self.create_stage_query(*s, warehouse_name)).await; + return Box::pin(self.create_stage_query(*s)).await; } Statement::CopyIntoSnowflake { .. } => { return Box::pin(self.copy_into_snowflake_query(*s, warehouse_name)).await; @@ -374,7 +374,6 @@ impl SqlExecutor { pub async fn create_stage_query( &self, statement: Statement, - warehouse_name: &str, ) -> IcehutSQLResult> { if let Statement::CreateStage { name, @@ -412,7 +411,6 @@ impl SqlExecutor { .unwrap_or(b'"'); let file_path = stage_params.url.unwrap_or_default(); - let stage_table_name = format!("stage_{table_name}"); let url = Url::parse(file_path.as_str()).map_err(|_| IcehutSQLError::InvalidIdentifier { ident: file_path.clone(), @@ -458,7 +456,7 @@ impl SqlExecutor { // Register CSV file with filled missing datatype with default Utf8 self.ctx .register_csv( - stage_table_name.clone(), + table_name.value.clone(), file_path, CsvReadOptions::new() .has_header(skip_header) @@ -467,13 +465,7 @@ impl SqlExecutor { ) .await .context(ih_error::DataFusionSnafu)?; - - // Create stages database and create table with prepared schema - // TODO Don't create table in case we have common ctx - self.create_database(warehouse_name, ObjectName(vec![Ident::new("stages")]), true) - .await?; - let create_query = format!("CREATE TABLE {warehouse_name}.stages.{table_name} AS (SELECT * FROM {stage_table_name})"); - self.query(&create_query, warehouse_name, "").await + Ok(vec![]) } else { Err(IcehutSQLError::DataFusion { source: DataFusionError::NotImplemented( @@ -493,9 +485,8 @@ impl SqlExecutor { } = statement { // Insert data to table - let from_query = from_stage.to_string().replace('@', ""); - let insert_query = - format!("INSERT INTO {into} SELECT * FROM {warehouse_name}.stages.{from_query}"); + let stage_name = from_stage.to_string().replace('@', ""); + let insert_query = format!("INSERT INTO {into} SELECT * FROM {stage_name}"); self.execute_with_custom_plan(&insert_query, warehouse_name) .await } else { @@ -961,6 +952,9 @@ impl SqlExecutor { match statement.clone() { DFStatement::Statement(s) => match *s { + Statement::CopyIntoSnowflake { into, .. } => { + Some(TableReference::parse_str(&into.to_string())) + } Statement::Drop { names, .. } => { Some(TableReference::parse_str(&names[0].to_string())) }