Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ snafu = { version = "0.8.5", features = ["futures"] }
tracing = { version = "0.1" }

[patch.crates-io]
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" }

datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }

[workspace.lints.clippy]
all={ level="deny", priority=-1 }
Expand Down
12 changes: 6 additions & 6 deletions crates/control_plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ flatbuffers = { version = "24.3.25" }
#iceberg-rest-catalog = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" }
#datafusion_iceberg = { git = "https://github.com/JanKaul/iceberg-rust.git", rev = "836f11f" }

datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" }
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" }
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }

iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }

arrow = { version = "53" }
arrow-json = { version = "53" }
Expand Down
9 changes: 8 additions & 1 deletion crates/control_plane/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use iceberg_rest_catalog::catalog::RestCatalog;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
use runtime::datafusion::execution::SqlExecutor;
use runtime::datafusion::type_planner::CustomTypePlanner;
use rusoto_core::{HttpClient, Region};
use rusoto_credential::StaticProvider;
use rusoto_s3::{GetBucketAclRequest, S3Client, S3};
Expand Down Expand Up @@ -248,6 +249,7 @@ impl ControlService for ControlServiceImpl {
)
.with_default_features()
.with_query_planner(Arc::new(IcebergQueryPlanner {}))
.with_type_planner(Arc::new(CustomTypePlanner {}))
.build();
let ctx = SessionContext::new_with_state(state);

Expand Down Expand Up @@ -407,7 +409,12 @@ impl ControlService for ControlServiceImpl {
object_store_builder,
);
let catalog = IcebergCatalog::new(Arc::new(rest_client), None).await?;
let ctx = SessionContext::new();
let state = SessionStateBuilder::new()
.with_default_features()
.with_query_planner(Arc::new(IcebergQueryPlanner {}))
.build();

let ctx = SessionContext::new_with_state(state);
ctx.register_catalog(warehouse_name.clone(), Arc::new(catalog));

// Register CSV file as a table
Expand Down
2 changes: 1 addition & 1 deletion crates/nexus/src/http/dbt/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub struct LoginRequestQuery {
#[serde(rename = "warehouse")]
pub warehouse: String,
#[serde(rename = "roleName")]
pub role_name: String,
pub role_name: Option<String>,
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
Expand Down
12 changes: 6 additions & 6 deletions crates/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ serde_json = { workspace = true }
object_store = { workspace = true }
tracing = { workspace = true}

datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" }
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "d176c5872a93b0c5dfdd1d2bc717ad22739e9c18" }
datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "4930757e1108fbe33704b29d1aa222a3d6214584" }

iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "4114c25c46d0c7ad272031e61ece1e62a892ddfc" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "7f4c767e9f8e4398a01a37190c30be3864066e34" }

arrow = { version = "53" }
arrow-json = { version = "53" }
Expand Down
134 changes: 0 additions & 134 deletions crates/runtime/src/datafusion/context.rs

This file was deleted.

45 changes: 31 additions & 14 deletions crates/runtime/src/datafusion/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@
#![allow(clippy::missing_panics_doc)]

use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult};
use crate::datafusion::context::CustomContextProvider;
use crate::datafusion::functions::register_udfs;
use crate::datafusion::planner::ExtendedSqlToRel;
use arrow::array::{RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::common::tree_node::{TransformedResult, TreeNode};
use datafusion::datasource::default_table_source::provider_as_source;
use datafusion::execution::context::SessionContext;
use datafusion::execution::session_state::SessionContextProvider;
use datafusion::logical_expr::sqlparser::ast::Insert;
use datafusion::logical_expr::LogicalPlan;
use datafusion::sql::parser::{CreateExternalTable, DFParser, Statement as DFStatement};
use datafusion::sql::planner::IdentNormalizer;
use datafusion::sql::sqlparser::ast::{
CreateTable as CreateTableStatement, Expr, Ident, ObjectName, Query, SchemaName, Statement,
TableFactor, TableWithJoins,
};
use datafusion_common::DataFusionError;
use datafusion_common::{DataFusionError, TableReference};
use datafusion_functions_json::register_all;
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
use datafusion_iceberg::planner::iceberg_transform;
Expand All @@ -40,13 +41,18 @@ use std::sync::Arc;

pub struct SqlExecutor {
ctx: SessionContext,
ident_normalizer: IdentNormalizer,
}

impl SqlExecutor {
pub fn new(mut ctx: SessionContext) -> IcehutSQLResult<Self> {
register_udfs(&mut ctx).context(ih_error::RegisterUDFSnafu)?;
register_all(&mut ctx).context(ih_error::RegisterUDFSnafu)?;
Ok(Self { ctx })
let enable_ident_normalization = ctx.enable_ident_normalization();
Ok(Self {
ctx,
ident_normalizer: IdentNormalizer::new(enable_ident_normalization),
})
}

#[tracing::instrument(level = "debug", skip(self), err, ret(level = tracing::Level::TRACE))]
Expand Down Expand Up @@ -233,12 +239,13 @@ impl SqlExecutor {
},
)?;
let rest_catalog = iceberg_catalog.catalog();
let new_table_name = self.ident_normalizer.normalize(new_table_name.clone());
let new_table_ident = Identifier::new(
&new_table_db
.iter()
.map(|v| v.value.clone())
.map(|v| self.ident_normalizer.normalize(v.clone()))
.collect::<Vec<String>>(),
&new_table_name.value,
&new_table_name.clone(),
);
if matches!(
rest_catalog.tabular_exists(&new_table_ident).await,
Expand All @@ -255,7 +262,7 @@ impl SqlExecutor {
.create_table(
new_table_ident.clone(),
CreateTableCatalog {
name: new_table_name.value.clone(),
name: new_table_name.clone(),
location,
schema,
partition_spec: None,
Expand Down Expand Up @@ -448,7 +455,11 @@ impl SqlExecutor {
},
)?;
let rest_catalog = iceberg_catalog.catalog();
let namespace_vec: Vec<String> = name.0.iter().map(|ident| ident.value.clone()).collect();
let namespace_vec: Vec<String> = name
.0
.iter()
.map(|ident| self.ident_normalizer.normalize(ident.clone()))
.collect();
let single_layer_namespace = vec![namespace_vec.join(".")];

let namespace =
Expand Down Expand Up @@ -479,17 +490,18 @@ impl SqlExecutor {
//println!("modified query: {:?}", statement.to_string());

if let DFStatement::Statement(s) = statement.clone() {
let mut ctx_provider = CustomContextProvider {
let mut ctx_provider = SessionContextProvider {
state: &state,
tables: HashMap::new(),
};

let references = state
.resolve_table_references(&statement)
.context(super::error::DataFusionSnafu)?;
//println!("References: {:?}", references);
for reference in references {
let resolved = state.resolve_table_ref(reference);
if let Entry::Vacant(v) = ctx_provider.tables.entry(resolved.to_string()) {
if let Entry::Vacant(v) = ctx_provider.tables.entry(resolved.clone()) {
if let Ok(schema) = state.schema_for_ref(resolved.clone()) {
if let Some(table) = schema
.table(&resolved.table)
Expand All @@ -516,15 +528,20 @@ impl SqlExecutor {
.ok_or(IcehutSQLError::TableProviderNotFound {
table_name: table.clone(),
})?;
ctx_provider.tables.insert(
format!("{catalog}.{schema}.{table}"),
provider_as_source(table_source),
);
let resolved = state.resolve_table_ref(TableReference::full(
catalog.to_string(),
schema.to_string(),
table,
));
ctx_provider
.tables
.insert(resolved, provider_as_source(table_source));
}
}
}

let planner = ExtendedSqlToRel::new(&ctx_provider);
let planner =
ExtendedSqlToRel::new(&ctx_provider, self.ctx.state().get_parser_options());
planner
.sql_statement_to_plan(*s)
.context(super::error::DataFusionSnafu)
Expand Down
2 changes: 1 addition & 1 deletion crates/runtime/src/datafusion/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//pub mod analyzer;
pub mod context;
pub mod error;
pub mod functions;
pub mod planner;
//pub mod session;
pub mod execution;
pub mod type_planner;
Loading
Loading