Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions crates/runtime/src/datafusion/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl SqlExecutor {
}
Statement::CreateStage { .. } => {
// We support only CSV uploads for now
return Box::pin(self.create_stage_query(*s, warehouse_name)).await;
return Box::pin(self.create_stage_query(*s)).await;
}
Statement::CopyIntoSnowflake { .. } => {
return Box::pin(self.copy_into_snowflake_query(*s, warehouse_name)).await;
Expand Down Expand Up @@ -374,7 +374,6 @@ impl SqlExecutor {
pub async fn create_stage_query(
&self,
statement: Statement,
warehouse_name: &str,
) -> IcehutSQLResult<Vec<RecordBatch>> {
if let Statement::CreateStage {
name,
Expand Down Expand Up @@ -412,7 +411,6 @@ impl SqlExecutor {
.unwrap_or(b'"');

let file_path = stage_params.url.unwrap_or_default();
let stage_table_name = format!("stage_{table_name}");
let url =
Url::parse(file_path.as_str()).map_err(|_| IcehutSQLError::InvalidIdentifier {
ident: file_path.clone(),
Expand Down Expand Up @@ -458,7 +456,7 @@ impl SqlExecutor {
// Register CSV file with filled missing datatype with default Utf8
self.ctx
.register_csv(
stage_table_name.clone(),
table_name.value.clone(),
file_path,
CsvReadOptions::new()
.has_header(skip_header)
Expand All @@ -467,13 +465,7 @@ impl SqlExecutor {
)
.await
.context(ih_error::DataFusionSnafu)?;

// Create stages database and create table with prepared schema
// TODO Don't create table in case we have common ctx
self.create_database(warehouse_name, ObjectName(vec![Ident::new("stages")]), true)
.await?;
let create_query = format!("CREATE TABLE {warehouse_name}.stages.{table_name} AS (SELECT * FROM {stage_table_name})");
self.query(&create_query, warehouse_name, "").await
Ok(vec![])
} else {
Err(IcehutSQLError::DataFusion {
source: DataFusionError::NotImplemented(
Expand All @@ -493,9 +485,8 @@ impl SqlExecutor {
} = statement
{
// Insert data to table
let from_query = from_stage.to_string().replace('@', "");
let insert_query =
format!("INSERT INTO {into} SELECT * FROM {warehouse_name}.stages.{from_query}");
let stage_name = from_stage.to_string().replace('@', "");
let insert_query = format!("INSERT INTO {into} SELECT * FROM {stage_name}");
self.execute_with_custom_plan(&insert_query, warehouse_name)
.await
} else {
Expand Down Expand Up @@ -961,6 +952,9 @@ impl SqlExecutor {

match statement.clone() {
DFStatement::Statement(s) => match *s {
Statement::CopyIntoSnowflake { into, .. } => {
Some(TableReference::parse_str(&into.to_string()))
}
Statement::Drop { names, .. } => {
Some(TableReference::parse_str(&names[0].to_string()))
}
Expand Down
Loading