Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/control_plane/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,12 @@ impl ControlService for ControlServiceImpl {
.parse_query(&query)
.context(super::error::DataFusionSnafu)?;

let table_path = executor.get_table_path(&statement);
let warehouse_name = table_path.db;
let table_ref = executor.get_table_path(&statement);
let warehouse_name = table_ref
.as_ref()
.and_then(|table_ref| table_ref.catalog())
.unwrap_or("")
.to_string();

let (catalog_name, warehouse_location): (String, String) = if warehouse_name.is_empty() {
(String::from("datafusion"), String::new())
Expand Down
2 changes: 2 additions & 0 deletions crates/nexus/src/http/dbt/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub async fn query(
return Err(DbtError::MissingDbtSession);
};

// let _ = log_query(&body_json.sql_text).await;

let (result, columns) = state
.control_svc
.query_dbt(&body_json.sql_text)
Expand Down
95 changes: 20 additions & 75 deletions crates/runtime/src/datafusion/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use url::Url;

#[derive(Debug)]
pub struct TablePath {
pub db: String,
pub schema: String,
pub table: String,
}

pub struct SqlExecutor {
// ctx made public to register_catalog after creating SqlExecutor
pub ctx: SessionContext,
Expand Down Expand Up @@ -477,16 +470,13 @@ impl SqlExecutor {
warehouse_name: &str,
) -> IcehutSQLResult<Vec<RecordBatch>> {
if let Statement::Merge {
mut table,
table,
mut source,
on,
clauses,
..
} = statement
{
self.update_tables_in_table_factor(&mut table, warehouse_name);
self.update_tables_in_table_factor(&mut source, warehouse_name);

let (target_table, target_alias) = Self::get_table_with_alias(table);
let (_source_table, source_alias) = Self::get_table_with_alias(source.clone());

Expand Down Expand Up @@ -612,14 +602,14 @@ impl SqlExecutor {
name: ObjectName,
_if_not_exists: bool,
) -> IcehutSQLResult<Vec<RecordBatch>> {
// Parse name into catalog (warehous) name and schema name
// Parse name into catalog (warehouse) name and schema name
let (warehouse_name, schema_name) = match name.0.len() {
2 => (
self.ident_normalizer.normalize(name.0[0].clone()),
self.ident_normalizer.normalize(name.0[1].clone()),
),
_ => {
return Err(super::error::IcehutSQLError::DataFusion {
return Err(IcehutSQLError::DataFusion {
source: DataFusionError::NotImplemented(
"Only two-part names are supported".to_string(),
),
Expand Down Expand Up @@ -921,81 +911,36 @@ impl SqlExecutor {
}

#[must_use]
#[allow(clippy::too_many_lines)]
pub fn get_table_path(&self, statement: &DFStatement) -> TablePath {
pub fn get_table_path(&self, statement: &DFStatement) -> Option<TableReference> {
let empty = String::new;
let table_path = |arr: &Vec<Ident>| -> TablePath {
match arr.len() {
1 => TablePath {
db: empty(),
schema: empty(),
table: arr[0].value.clone(),
},
2 => TablePath {
db: empty(),
schema: arr[0].value.clone(),
table: arr[1].value.clone(),
},
3 => TablePath {
db: arr[0].value.clone(),
schema: arr[1].value.clone(),
table: arr[2].value.clone(),
},
_ => TablePath {
db: empty(),
schema: empty(),
table: empty(),
},
}
};
let references = self.ctx.state().resolve_table_references(statement).ok()?;

match statement.clone() {
DFStatement::CreateExternalTable(create_external) => {
table_path(&create_external.name.0)
}
DFStatement::Statement(s) => match *s {
Statement::AlterTable { name, .. } => table_path(&name.0),
Statement::Insert(insert) => table_path(&insert.table_name.0),
Statement::Drop { names, .. } => table_path(&names[0].0),
Statement::Query(query) => match *query.body {
sqlparser::ast::SetExpr::Select(select) => {
if select.from.is_empty() {
table_path(&vec![])
} else {
match &select.from[0].relation {
TableFactor::Table { name, .. } => table_path(&name.0),
_ => table_path(&vec![]),
}
}
}
_ => table_path(&vec![]),
},
Statement::CreateTable(create_table) => table_path(&create_table.name.0),
Statement::Update { table, .. } => match table.relation {
TableFactor::Table { name, .. } => table_path(&name.0),
_ => table_path(&vec![]),
},
Statement::Drop { names, .. } => {
Some(TableReference::parse_str(&names[0].to_string()))
}
Statement::CreateSchema {
schema_name: SchemaName::Simple(name),
..
} => {
if name.0.len() == 2 {
TablePath {
db: name.0[0].value.clone(),
schema: name.0[1].value.clone(),
table: empty(),
}
Some(TableReference::full(
name.0[0].value.clone(),
name.0[1].value.clone(),
empty(),
))
} else {
TablePath {
db: empty(),
schema: name.0[1].value.clone(),
table: empty(),
}
Some(TableReference::full(
empty(),
name.0[0].value.clone(),
empty(),
))
}
}
_ => table_path(&vec![]),
_ => references.first().cloned(),
},
_ => table_path(&vec![]),
_ => references.first().cloned(),
}
}

Expand Down
28 changes: 11 additions & 17 deletions crates/runtime/src/datafusion/functions/convert_timezone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use arrow::array::timezone::Tz;
use arrow::datatypes::DataType;
use arrow::datatypes::DataType::{Timestamp, Utf8};
use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second};
use datafusion::common::ExprSchema;
use datafusion::common::{internal_err, plan_err, Result};
use datafusion::logical_expr::TypeSignature::Exact;
use datafusion::logical_expr::{
ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
};
use datafusion::prelude::Expr;
use datafusion::scalar::ScalarValue;
use datafusion_expr::{ReturnInfo, ReturnTypeArgs};
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -91,31 +90,26 @@ impl ScalarUDFImpl for ConvertTimezoneFunc {
&self.signature
}
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
internal_err!("return_types_from_exprs should be called")
internal_err!("return_type_from_args should be called")
}
fn return_type_from_exprs(
&self,
args: &[Expr],
_schema: &dyn ExprSchema,
arg_types: &[DataType],
) -> Result<DataType> {
match args.len() {
fn return_type_from_args(&self, args: ReturnTypeArgs) -> Result<ReturnInfo> {
match args.arg_types.len() {
2 => {
let tz = match &args[0] {
Expr::Literal(ScalarValue::Utf8(Some(part))) => part.clone(),
let tz = match &args.scalar_arguments[0] {
Some(ScalarValue::Utf8(Some(part))) => part.clone(),
_ => return internal_err!("Invalid target_tz type"),
};

match &arg_types[1] {
DataType::Timestamp(tu, _) => Ok(DataType::Timestamp(
match &args.arg_types[1] {
Timestamp(tu, _) => Ok(ReturnInfo::new_non_nullable(Timestamp(
*tu,
Some(Arc::from(tz.into_boxed_str())),
)),
))),
_ => internal_err!("Invalid source_timestamp_tz type"),
}
}
3 => match &arg_types[2] {
DataType::Timestamp(tu, None) => Ok(DataType::Timestamp(*tu, None)),
3 => match &args.arg_types[2] {
Timestamp(tu, None) => Ok(ReturnInfo::new_non_nullable(Timestamp(*tu, None))),
_ => internal_err!("Invalid source_timestamp_ntz type"),
},
other => {
Expand Down
101 changes: 35 additions & 66 deletions crates/runtime/src/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use datafusion::sql::sqlparser::ast::{
DataType as SQLDataType, Statement,
};
use datafusion::sql::statement::{calc_inline_constraints_from_columns, object_name_to_string};
use datafusion::sql::utils::normalize_ident;
use datafusion_common::{DFSchema, DFSchemaRef, SchemaReference, TableReference};
use datafusion_expr::DropCatalogSchema;
use sqlparser::ast::ObjectType;
Expand Down Expand Up @@ -239,102 +238,72 @@ where
}

fn show_objects_to_plan(&self, parent: &ObjectName) -> Result<LogicalPlan> {
if !self.inner.has_table("information_schema", "df_settings") {
return plan_err!("SHOW OBJECTS is not supported unless information_schema is enabled");
}
// Only support listing objects in schema for now
match parent.0.len() {
2 => {
let (catalog, schema) = (parent.0[0].value.clone(), parent.0[1].value.clone());
// let (catalog, schema) = (parent.0[0].value.clone(), parent.0[1].value.clone());

// Create query to list objects in schema
let columns = [
"table_catalog as 'database_name'",
"table_schema as 'schema_name'",
"table_name as 'name'",
"case when table_type='BASE TABLE' then 'TABLE' else table_type end as 'kind'",
"case when table_type='BASE TABLE' then 'Y' else 'N' end as 'is_iceberg'",
"null as 'comment'",
]
.join(", ");
// TODO: views?
// TODO: Return programmatically constructed plan
let query = format!("SELECT {columns} FROM information_schema.tables where table_schema = '{schema}' and table_catalog = '{catalog}'");
let mut statements = DFParser::parse_sql(query.as_str())?;
statements.pop_front().map_or_else(
|| plan_err!("Failed to parse SQL statement"),
|statement| {
if let DFStatement::Statement(s) = statement {
self.sql_statement_to_plan(*s)
} else {
plan_err!("Failed to parse SQL statement")
}
},
)
let query = format!("SELECT {columns} FROM information_schema.tables");
self.parse_sql(query.as_str())
}
_ => plan_err!("Unsupported show objects: {:?}", parent),
}
}

fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
//println!("SHOW variable: {:?}", variable);
if !self.inner.has_table("information_schema", "df_settings") {
return plan_err!(
"SHOW [VARIABLE] is not supported unless information_schema is enabled"
);
}

let verbose = variable
.last()
.is_some_and(|s| normalize_ident(s.to_owned()) == "verbose");
let mut variable_vec = variable.to_vec();
let mut columns: String = "name, value".to_owned();

// TODO: Fix how columns are selected. Vec instead of string
#[allow(unused_assignments)]
if verbose {
columns = format!("{columns}, description");
variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
}

let query = if variable_vec.iter().any(|ident| ident.value == "objects") {
columns = [
"table_catalog as 'database_name'",
"table_schema as 'schema_name'",
"table_name as 'name'",
"case when table_type='BASE TABLE' then 'TABLE' else table_type end as 'kind'",
"null as 'comment'",
]
.join(", ");
format!("SELECT {columns} FROM information_schema.tables")
let variable = object_name_to_string(&ObjectName(variable.to_vec()));
// let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
let base_query = "select schema_name as 'name' from information_schema.schemata";
let query = if variable == "all" {
// Add an ORDER BY so the output comes out in a consistent order
format!("{base_query} ORDER BY name")
} else if variable == "timezone" || variable == "time.zone" {
// we could introduce alias in OptionDefinition if this string matching thing grows
format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
} else {
let variable = object_name_to_string(&ObjectName(variable_vec));
// let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
let base_query = "select schema_name as 'name' from information_schema.schemata";
let query_res = if variable == "all" {
// Add an ORDER BY so the output comes out in a consistent order
format!("{base_query} ORDER BY name")
} else if variable == "timezone" || variable == "time.zone" {
// we could introduce alias in OptionDefinition if this string matching thing grows
format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
} else {
// These values are what are used to make the information_schema table, so we just
// check here, before actually planning or executing the query, if it would produce no
// results, and error preemptively if it would (for a better UX)
let is_valid_variable = self
.provider
.options()
.entries()
.iter()
.any(|opt| opt.key == variable);
// These values are what are used to make the information_schema table, so we just
// check here, before actually planning or executing the query, if it would produce no
// results, and error preemptively if it would (for a better UX)
let is_valid_variable = self
.provider
.options()
.entries()
.iter()
.any(|opt| opt.key == variable);

if is_valid_variable {
format!("{base_query} WHERE name = '{variable}'")
} else {
// skip where clause to return empty result
base_query.to_string()
}
};
query_res
if is_valid_variable {
format!("{base_query} WHERE name = '{variable}'")
} else {
// skip where clause to return empty result
base_query.to_string()
}
};
self.parse_sql(query.as_str())
}

let mut statements = DFParser::parse_sql(query.as_str())?;
fn parse_sql(&self, sql: &str) -> Result<LogicalPlan> {
let mut statements = DFParser::parse_sql(sql)?;
statements.pop_front().map_or_else(
|| plan_err!("Failed to parse SQL statement"),
|statement| {
Expand Down
Loading