Skip to content

Commit 2006f0c

Browse files
authored
Fix minor issues (#205)
1 parent 9c657ff commit 2006f0c

File tree

5 files changed

+74
-160
lines changed

5 files changed

+74
-160
lines changed

crates/control_plane/src/service.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,12 @@ impl ControlService for ControlServiceImpl {
225225
.parse_query(&query)
226226
.context(super::error::DataFusionSnafu)?;
227227

228-
let table_path = executor.get_table_path(&statement);
229-
let warehouse_name = table_path.db;
228+
let table_ref = executor.get_table_path(&statement);
229+
let warehouse_name = table_ref
230+
.as_ref()
231+
.and_then(|table_ref| table_ref.catalog())
232+
.unwrap_or("")
233+
.to_string();
230234

231235
let (catalog_name, warehouse_location): (String, String) = if warehouse_name.is_empty() {
232236
(String::from("datafusion"), String::new())

crates/nexus/src/http/dbt/handlers.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ pub async fn query(
8282
return Err(DbtError::MissingDbtSession);
8383
};
8484

85+
// let _ = log_query(&body_json.sql_text).await;
86+
8587
let (result, columns) = state
8688
.control_svc
8789
.query_dbt(&body_json.sql_text)

crates/runtime/src/datafusion/execution.rs

Lines changed: 20 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,6 @@ use std::collections::HashMap;
4242
use std::sync::Arc;
4343
use url::Url;
4444

45-
#[derive(Debug)]
46-
pub struct TablePath {
47-
pub db: String,
48-
pub schema: String,
49-
pub table: String,
50-
}
51-
5245
pub struct SqlExecutor {
5346
// ctx made public to register_catalog after creating SqlExecutor
5447
pub ctx: SessionContext,
@@ -477,16 +470,13 @@ impl SqlExecutor {
477470
warehouse_name: &str,
478471
) -> IcehutSQLResult<Vec<RecordBatch>> {
479472
if let Statement::Merge {
480-
mut table,
473+
table,
481474
mut source,
482475
on,
483476
clauses,
484477
..
485478
} = statement
486479
{
487-
self.update_tables_in_table_factor(&mut table, warehouse_name);
488-
self.update_tables_in_table_factor(&mut source, warehouse_name);
489-
490480
let (target_table, target_alias) = Self::get_table_with_alias(table);
491481
let (_source_table, source_alias) = Self::get_table_with_alias(source.clone());
492482

@@ -612,14 +602,14 @@ impl SqlExecutor {
612602
name: ObjectName,
613603
_if_not_exists: bool,
614604
) -> IcehutSQLResult<Vec<RecordBatch>> {
615-
// Parse name into catalog (warehous) name and schema name
605+
// Parse name into catalog (warehouse) name and schema name
616606
let (warehouse_name, schema_name) = match name.0.len() {
617607
2 => (
618608
self.ident_normalizer.normalize(name.0[0].clone()),
619609
self.ident_normalizer.normalize(name.0[1].clone()),
620610
),
621611
_ => {
622-
return Err(super::error::IcehutSQLError::DataFusion {
612+
return Err(IcehutSQLError::DataFusion {
623613
source: DataFusionError::NotImplemented(
624614
"Only two-part names are supported".to_string(),
625615
),
@@ -921,81 +911,36 @@ impl SqlExecutor {
921911
}
922912

923913
#[must_use]
924-
#[allow(clippy::too_many_lines)]
925-
pub fn get_table_path(&self, statement: &DFStatement) -> TablePath {
914+
pub fn get_table_path(&self, statement: &DFStatement) -> Option<TableReference> {
926915
let empty = String::new;
927-
let table_path = |arr: &Vec<Ident>| -> TablePath {
928-
match arr.len() {
929-
1 => TablePath {
930-
db: empty(),
931-
schema: empty(),
932-
table: arr[0].value.clone(),
933-
},
934-
2 => TablePath {
935-
db: empty(),
936-
schema: arr[0].value.clone(),
937-
table: arr[1].value.clone(),
938-
},
939-
3 => TablePath {
940-
db: arr[0].value.clone(),
941-
schema: arr[1].value.clone(),
942-
table: arr[2].value.clone(),
943-
},
944-
_ => TablePath {
945-
db: empty(),
946-
schema: empty(),
947-
table: empty(),
948-
},
949-
}
950-
};
916+
let references = self.ctx.state().resolve_table_references(statement).ok()?;
951917

952918
match statement.clone() {
953-
DFStatement::CreateExternalTable(create_external) => {
954-
table_path(&create_external.name.0)
955-
}
956919
DFStatement::Statement(s) => match *s {
957-
Statement::AlterTable { name, .. } => table_path(&name.0),
958-
Statement::Insert(insert) => table_path(&insert.table_name.0),
959-
Statement::Drop { names, .. } => table_path(&names[0].0),
960-
Statement::Query(query) => match *query.body {
961-
sqlparser::ast::SetExpr::Select(select) => {
962-
if select.from.is_empty() {
963-
table_path(&vec![])
964-
} else {
965-
match &select.from[0].relation {
966-
TableFactor::Table { name, .. } => table_path(&name.0),
967-
_ => table_path(&vec![]),
968-
}
969-
}
970-
}
971-
_ => table_path(&vec![]),
972-
},
973-
Statement::CreateTable(create_table) => table_path(&create_table.name.0),
974-
Statement::Update { table, .. } => match table.relation {
975-
TableFactor::Table { name, .. } => table_path(&name.0),
976-
_ => table_path(&vec![]),
977-
},
920+
Statement::Drop { names, .. } => {
921+
Some(TableReference::parse_str(&names[0].to_string()))
922+
}
978923
Statement::CreateSchema {
979924
schema_name: SchemaName::Simple(name),
980925
..
981926
} => {
982927
if name.0.len() == 2 {
983-
TablePath {
984-
db: name.0[0].value.clone(),
985-
schema: name.0[1].value.clone(),
986-
table: empty(),
987-
}
928+
Some(TableReference::full(
929+
name.0[0].value.clone(),
930+
name.0[1].value.clone(),
931+
empty(),
932+
))
988933
} else {
989-
TablePath {
990-
db: empty(),
991-
schema: name.0[1].value.clone(),
992-
table: empty(),
993-
}
934+
Some(TableReference::full(
935+
empty(),
936+
name.0[0].value.clone(),
937+
empty(),
938+
))
994939
}
995940
}
996-
_ => table_path(&vec![]),
941+
_ => references.first().cloned(),
997942
},
998-
_ => table_path(&vec![]),
943+
_ => references.first().cloned(),
999944
}
1000945
}
1001946

crates/runtime/src/datafusion/functions/convert_timezone.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@ use arrow::array::timezone::Tz;
22
use arrow::datatypes::DataType;
33
use arrow::datatypes::DataType::{Timestamp, Utf8};
44
use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Second};
5-
use datafusion::common::ExprSchema;
65
use datafusion::common::{internal_err, plan_err, Result};
76
use datafusion::logical_expr::TypeSignature::Exact;
87
use datafusion::logical_expr::{
98
ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
109
};
11-
use datafusion::prelude::Expr;
1210
use datafusion::scalar::ScalarValue;
11+
use datafusion_expr::{ReturnInfo, ReturnTypeArgs};
1312
use std::any::Any;
1413
use std::sync::Arc;
1514

@@ -91,31 +90,26 @@ impl ScalarUDFImpl for ConvertTimezoneFunc {
9190
&self.signature
9291
}
9392
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
94-
internal_err!("return_types_from_exprs should be called")
93+
internal_err!("return_type_from_args should be called")
9594
}
96-
fn return_type_from_exprs(
97-
&self,
98-
args: &[Expr],
99-
_schema: &dyn ExprSchema,
100-
arg_types: &[DataType],
101-
) -> Result<DataType> {
102-
match args.len() {
95+
fn return_type_from_args(&self, args: ReturnTypeArgs) -> Result<ReturnInfo> {
96+
match args.arg_types.len() {
10397
2 => {
104-
let tz = match &args[0] {
105-
Expr::Literal(ScalarValue::Utf8(Some(part))) => part.clone(),
98+
let tz = match &args.scalar_arguments[0] {
99+
Some(ScalarValue::Utf8(Some(part))) => part.clone(),
106100
_ => return internal_err!("Invalid target_tz type"),
107101
};
108102

109-
match &arg_types[1] {
110-
DataType::Timestamp(tu, _) => Ok(DataType::Timestamp(
103+
match &args.arg_types[1] {
104+
Timestamp(tu, _) => Ok(ReturnInfo::new_non_nullable(Timestamp(
111105
*tu,
112106
Some(Arc::from(tz.into_boxed_str())),
113-
)),
107+
))),
114108
_ => internal_err!("Invalid source_timestamp_tz type"),
115109
}
116110
}
117-
3 => match &arg_types[2] {
118-
DataType::Timestamp(tu, None) => Ok(DataType::Timestamp(*tu, None)),
111+
3 => match &args.arg_types[2] {
112+
Timestamp(tu, None) => Ok(ReturnInfo::new_non_nullable(Timestamp(*tu, None))),
119113
_ => internal_err!("Invalid source_timestamp_ntz type"),
120114
},
121115
other => {

crates/runtime/src/datafusion/planner.rs

Lines changed: 35 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use datafusion::sql::sqlparser::ast::{
3030
DataType as SQLDataType, Statement,
3131
};
3232
use datafusion::sql::statement::{calc_inline_constraints_from_columns, object_name_to_string};
33-
use datafusion::sql::utils::normalize_ident;
3433
use datafusion_common::{DFSchema, DFSchemaRef, SchemaReference, TableReference};
3534
use datafusion_expr::DropCatalogSchema;
3635
use sqlparser::ast::ObjectType;
@@ -239,102 +238,72 @@ where
239238
}
240239

241240
fn show_objects_to_plan(&self, parent: &ObjectName) -> Result<LogicalPlan> {
241+
if !self.inner.has_table("information_schema", "df_settings") {
242+
return plan_err!("SHOW OBJECTS is not supported unless information_schema is enabled");
243+
}
242244
// Only support listing objects in schema for now
243245
match parent.0.len() {
244246
2 => {
245-
let (catalog, schema) = (parent.0[0].value.clone(), parent.0[1].value.clone());
247+
// let (catalog, schema) = (parent.0[0].value.clone(), parent.0[1].value.clone());
246248

247249
// Create query to list objects in schema
248250
let columns = [
249251
"table_catalog as 'database_name'",
250252
"table_schema as 'schema_name'",
251253
"table_name as 'name'",
252254
"case when table_type='BASE TABLE' then 'TABLE' else table_type end as 'kind'",
255+
"case when table_type='BASE TABLE' then 'Y' else 'N' end as 'is_iceberg'",
253256
"null as 'comment'",
254257
]
255258
.join(", ");
256259
// TODO: views?
257260
// TODO: Return programmatically constructed plan
258-
let query = format!("SELECT {columns} FROM information_schema.tables where table_schema = '{schema}' and table_catalog = '{catalog}'");
259-
let mut statements = DFParser::parse_sql(query.as_str())?;
260-
statements.pop_front().map_or_else(
261-
|| plan_err!("Failed to parse SQL statement"),
262-
|statement| {
263-
if let DFStatement::Statement(s) = statement {
264-
self.sql_statement_to_plan(*s)
265-
} else {
266-
plan_err!("Failed to parse SQL statement")
267-
}
268-
},
269-
)
261+
let query = format!("SELECT {columns} FROM information_schema.tables");
262+
self.parse_sql(query.as_str())
270263
}
271264
_ => plan_err!("Unsupported show objects: {:?}", parent),
272265
}
273266
}
274267

275268
fn show_variable_to_plan(&self, variable: &[Ident]) -> Result<LogicalPlan> {
276-
//println!("SHOW variable: {:?}", variable);
277269
if !self.inner.has_table("information_schema", "df_settings") {
278270
return plan_err!(
279271
"SHOW [VARIABLE] is not supported unless information_schema is enabled"
280272
);
281273
}
282274

283-
let verbose = variable
284-
.last()
285-
.is_some_and(|s| normalize_ident(s.to_owned()) == "verbose");
286-
let mut variable_vec = variable.to_vec();
287-
let mut columns: String = "name, value".to_owned();
288-
289-
// TODO: Fix how columns are selected. Vec instead of string
290-
#[allow(unused_assignments)]
291-
if verbose {
292-
columns = format!("{columns}, description");
293-
variable_vec = variable_vec.split_at(variable_vec.len() - 1).0.to_vec();
294-
}
295-
296-
let query = if variable_vec.iter().any(|ident| ident.value == "objects") {
297-
columns = [
298-
"table_catalog as 'database_name'",
299-
"table_schema as 'schema_name'",
300-
"table_name as 'name'",
301-
"case when table_type='BASE TABLE' then 'TABLE' else table_type end as 'kind'",
302-
"null as 'comment'",
303-
]
304-
.join(", ");
305-
format!("SELECT {columns} FROM information_schema.tables")
275+
let variable = object_name_to_string(&ObjectName(variable.to_vec()));
276+
// let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
277+
let base_query = "select schema_name as 'name' from information_schema.schemata";
278+
let query = if variable == "all" {
279+
// Add an ORDER BY so the output comes out in a consistent order
280+
format!("{base_query} ORDER BY name")
281+
} else if variable == "timezone" || variable == "time.zone" {
282+
// we could introduce alias in OptionDefinition if this string matching thing grows
283+
format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
306284
} else {
307-
let variable = object_name_to_string(&ObjectName(variable_vec));
308-
// let base_query = format!("SELECT {columns} FROM information_schema.df_settings");
309-
let base_query = "select schema_name as 'name' from information_schema.schemata";
310-
let query_res = if variable == "all" {
311-
// Add an ORDER BY so the output comes out in a consistent order
312-
format!("{base_query} ORDER BY name")
313-
} else if variable == "timezone" || variable == "time.zone" {
314-
// we could introduce alias in OptionDefinition if this string matching thing grows
315-
format!("{base_query} WHERE name = 'datafusion.execution.time_zone'")
316-
} else {
317-
// These values are what are used to make the information_schema table, so we just
318-
// check here, before actually planning or executing the query, if it would produce no
319-
// results, and error preemptively if it would (for a better UX)
320-
let is_valid_variable = self
321-
.provider
322-
.options()
323-
.entries()
324-
.iter()
325-
.any(|opt| opt.key == variable);
285+
// These values are what are used to make the information_schema table, so we just
286+
// check here, before actually planning or executing the query, if it would produce no
287+
// results, and error preemptively if it would (for a better UX)
288+
let is_valid_variable = self
289+
.provider
290+
.options()
291+
.entries()
292+
.iter()
293+
.any(|opt| opt.key == variable);
326294

327-
if is_valid_variable {
328-
format!("{base_query} WHERE name = '{variable}'")
329-
} else {
330-
// skip where clause to return empty result
331-
base_query.to_string()
332-
}
333-
};
334-
query_res
295+
if is_valid_variable {
296+
format!("{base_query} WHERE name = '{variable}'")
297+
} else {
298+
// skip where clause to return empty result
299+
base_query.to_string()
300+
}
335301
};
302+
self.parse_sql(query.as_str())
303+
}
336304

337-
let mut statements = DFParser::parse_sql(query.as_str())?;
305+
fn parse_sql(&self, sql: &str) -> Result<LogicalPlan> {
306+
let mut statements = DFParser::parse_sql(sql)?;
338307
statements.pop_front().map_or_else(
339308
|| plan_err!("Failed to parse SQL statement"),
340309
|statement| {

0 commit comments

Comments
 (0)