Skip to content

Commit a66e888

Browse files
authored
Implement QUALIFY (#151)
* # This is a combination of 14 commits. # This is the 1st commit message: Upload CSV to table # This is the commit message #2: Upload CSV to table # This is the commit message #3: Fix linter # This is the commit message #4: Remove unused error # This is the commit message #5: Fix clippy # This is the commit message #6: Fix clippy # This is the commit message #7: Fix linter # This is the commit message #8: Fix clippy # This is the commit message #9: Fix clippy # This is the commit message #10: Fix clippy # This is the commit message #11: Fix fmt # This is the commit message #12: Add pre-commit # This is the commit message #13: Parse merge into # This is the commit message #14: Parse merge into * Merge * Add full message text into the error * Tmp * Fix * Implement qualify * Update deps * Update MERGE to insert only not matched records * Update deps * Update MERGE to insert only not matched records * Add lt lteq
1 parent 7e1b096 commit a66e888

File tree

7 files changed

+171
-52
lines changed

7 files changed

+171
-52
lines changed

crates/control_plane/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c592
2222
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
2323
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
2424

25-
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
26-
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
27-
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
25+
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
26+
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
27+
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
2828

2929
arrow = { version = "53" }
3030
arrow-json = { version = "53" }

crates/runtime/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ datafusion = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c592
1818
datafusion-common = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
1919
datafusion-expr = { git="https://github.com/Embucket/datafusion.git", rev = "cc37c5920463b0cb0b224fc7f567fd4ae0368ffe" }
2020

21-
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
22-
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
23-
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "68577441273eda894f1eb6d87b1c3e87dee0fdf6" }
21+
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
22+
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
23+
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "09090f0e9b11153a549ed447581a5b595484b245" }
2424

2525
arrow = { version = "53" }
2626
arrow-json = { version = "53" }

crates/runtime/src/datafusion/execution.rs

Lines changed: 115 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@ use iceberg_rust::spec::namespace::Namespace;
2828
use iceberg_rust::spec::schema::Schema;
2929
use iceberg_rust::spec::types::StructType;
3030
use snafu::ResultExt;
31-
use sqlparser::ast::{MergeAction, MergeClauseKind, MergeInsertKind, Query as AstQuery};
31+
use sqlparser::ast::helpers::attached_token::AttachedToken;
32+
use sqlparser::ast::{
33+
BinaryOperator, GroupByExpr, MergeAction, MergeClauseKind, MergeInsertKind, Query as AstQuery,
34+
Select, SelectItem,
35+
};
36+
use sqlparser::tokenizer::Span;
3237
use std::collections::hash_map::Entry;
3338
use std::collections::HashMap;
3439
use std::sync::Arc;
@@ -77,11 +82,17 @@ impl SqlExecutor {
7782
| Statement::StartTransaction { .. }
7883
| Statement::Commit { .. }
7984
| Statement::Insert { .. }
80-
| Statement::Query { .. }
8185
| Statement::ShowSchemas { .. }
8286
| Statement::ShowVariable { .. } => {
8387
return Box::pin(self.execute_with_custom_plan(&query, warehouse_name)).await;
8488
}
89+
Statement::Query(mut subquery) => {
90+
self.update_qualify_in_query(subquery.as_mut());
91+
return Box::pin(
92+
self.execute_with_custom_plan(&subquery.to_string(), warehouse_name),
93+
)
94+
.await;
95+
}
8596
Statement::Drop { .. } => {
8697
return Box::pin(self.drop_table_query(&query, warehouse_name)).await;
8798
}
@@ -110,8 +121,9 @@ impl SqlExecutor {
110121
pub fn preprocess_query(&self, query: &str) -> String {
111122
// Replace field[0].subfield -> json_get(json_get(field, 0), 'subfield')
112123
// TODO: This regex should be a static allocation
113-
let re = regex::Regex::new(r"(\w+)\[(\d+)][:\.](\w+)").unwrap();
114-
let date_add = regex::Regex::new(r"(date|time|timestamp)(_?add|_?diff)\(\s*([a-zA-Z]+),").unwrap();
124+
let re = regex::Regex::new(r"(\w+.\w+)\[(\d+)][:\.](\w+)").unwrap();
125+
let date_add =
126+
regex::Regex::new(r"(date|time|timestamp)(_?add|_?diff)\(\s*([a-zA-Z]+),").unwrap();
115127

116128
let query = re
117129
.replace_all(query, "json_get(json_get($1, $2), '$3')")
@@ -150,13 +162,19 @@ impl SqlExecutor {
150162

151163
// Replace the name of the table to be created with its bare name (e.g. "warehouse"."database"."table" -> "table")
152164
// And run the query - this will create an InMemory table
153-
let modified_statement = CreateTableStatement {
165+
let mut modified_statement = CreateTableStatement {
154166
name: ObjectName(vec![new_table_name.clone()]),
155167
transient: false,
156168
..create_table_statement
157169
};
170+
171+
// Replace qualify with nested select
172+
if let Some(ref mut query) = modified_statement.query {
173+
self.update_qualify_in_query(query);
174+
}
158175
// Create InMemory table since external tables with "AS SELECT" are not supported
159176
let updated_query = modified_statement.to_string();
177+
160178
let plan = self
161179
.get_custom_logical_plan(&updated_query, warehouse_name)
162180
.await?;
@@ -481,7 +499,7 @@ impl SqlExecutor {
481499
}
482500
}
483501
}
484-
// println!("Tables: {:?}", ctx_provider.tables.keys());
502+
485503
let planner = ExtendedSqlToRel::new(&ctx_provider);
486504
planner
487505
.sql_statement_to_plan(*s)
@@ -509,6 +527,97 @@ impl SqlExecutor {
509527
.context(super::error::DataFusionSnafu)
510528
}
511529

530+
#[allow(clippy::only_used_in_recursion)]
531+
fn update_qualify_in_query(&self, query: &mut Query) {
532+
if let Some(with) = query.with.as_mut() {
533+
for cte in &mut with.cte_tables {
534+
self.update_qualify_in_query(&mut cte.query);
535+
}
536+
}
537+
538+
match query.body.as_mut() {
539+
sqlparser::ast::SetExpr::Select(select) => {
540+
if let Some(Expr::BinaryOp { left, op, right }) = select.qualify.as_ref() {
541+
if matches!(
542+
op,
543+
BinaryOperator::Eq | BinaryOperator::Lt | BinaryOperator::LtEq
544+
) {
545+
let mut inner_select = select.clone();
546+
inner_select.qualify = None;
547+
inner_select.projection.push(SelectItem::ExprWithAlias {
548+
expr: *(left.clone()),
549+
alias: Ident {
550+
value: "qualify_alias".to_string(),
551+
quote_style: None,
552+
span: Span::empty(),
553+
},
554+
});
555+
let subquery = Query {
556+
with: None,
557+
body: Box::new(sqlparser::ast::SetExpr::Select(inner_select)),
558+
order_by: None,
559+
limit: None,
560+
limit_by: vec![],
561+
offset: None,
562+
fetch: None,
563+
locks: vec![],
564+
for_clause: None,
565+
settings: None,
566+
format_clause: None,
567+
};
568+
let outer_select = Select {
569+
select_token: AttachedToken::empty(),
570+
distinct: None,
571+
top: None,
572+
top_before_distinct: false,
573+
projection: vec![SelectItem::UnnamedExpr(Expr::Identifier(Ident {
574+
value: "*".to_string(),
575+
quote_style: None,
576+
span: Span::empty(),
577+
}))],
578+
into: None,
579+
from: vec![TableWithJoins {
580+
relation: TableFactor::Derived {
581+
lateral: false,
582+
subquery: Box::new(subquery),
583+
alias: None,
584+
},
585+
joins: vec![],
586+
}],
587+
lateral_views: vec![],
588+
prewhere: None,
589+
selection: Some(Expr::BinaryOp {
590+
left: Box::new(Expr::Identifier(Ident {
591+
value: "qualify_alias".to_string(),
592+
quote_style: None,
593+
span: Span::empty(),
594+
})),
595+
op: op.clone(),
596+
right: Box::new(*right.clone()),
597+
}),
598+
group_by: GroupByExpr::Expressions(vec![], vec![]),
599+
cluster_by: vec![],
600+
distribute_by: vec![],
601+
sort_by: vec![],
602+
having: None,
603+
named_window: vec![],
604+
qualify: None,
605+
window_before_qualify: false,
606+
value_table_mode: None,
607+
connect_by: None,
608+
};
609+
610+
*query.body = sqlparser::ast::SetExpr::Select(Box::new(outer_select));
611+
}
612+
}
613+
}
614+
sqlparser::ast::SetExpr::Query(q) => {
615+
self.update_qualify_in_query(q);
616+
}
617+
_ => {}
618+
}
619+
}
620+
512621
#[allow(clippy::only_used_in_recursion)]
513622
fn get_expr_where_clause(&self, expr: Expr, target_alias: &str) -> Vec<String> {
514623
match expr {

crates/runtime/src/datafusion/functions/convert_timezone.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ impl ScalarUDFImpl for ConvertTimezoneFunc {
130130
//should use local session time
131131
//TODO: select convert_timezone('America/New_York', 'UTC', v3) with v3 a timestamp with value = '2025-01-06 08:00:00 America/New_York',
132132
//should be parsed as the timezone None variant timestamp
133+
#[allow(clippy::too_many_lines)]
133134
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
134135
match args.len() {
135136
2 => {

crates/runtime/src/datafusion/functions/date_diff.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
use arrow::array::Array;
22
use arrow::compute::{date_part, DatePart};
3-
use arrow::datatypes::DataType::Int64;
43
use arrow::datatypes::DataType;
4+
use arrow::datatypes::DataType::Int64;
55
use datafusion::common::{plan_err, Result};
66
use datafusion::logical_expr::TypeSignature::Coercible;
77
use datafusion::logical_expr::TypeSignatureClass;
8-
use datafusion::logical_expr::{
9-
ColumnarValue, ScalarUDFImpl, Signature, Volatility,
10-
};
8+
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
119
use datafusion::scalar::ScalarValue;
1210
use datafusion_common::{internal_err, types::logical_string};
1311
use std::any::Any;
14-
use std::vec;
1512
use std::sync::Arc;
13+
use std::vec;
1614

1715
#[derive(Debug)]
1816
pub struct DateDiffFunc {
@@ -60,7 +58,11 @@ impl DateDiffFunc {
6058
],
6159
}
6260
}
63-
fn date_diff_func(date_or_time_expr1: &Arc<dyn Array>, date_or_time_expr2: &Arc<dyn Array>, unit_type: DatePart) -> Result<ColumnarValue> {
61+
fn date_diff_func(
62+
date_or_time_expr1: &Arc<dyn Array>,
63+
date_or_time_expr2: &Arc<dyn Array>,
64+
unit_type: DatePart,
65+
) -> Result<ColumnarValue> {
6466
let unit2 = date_part(date_or_time_expr2, unit_type)?;
6567
let unit1 = date_part(date_or_time_expr1, unit_type)?;
6668
Ok(ColumnarValue::Scalar(
@@ -159,14 +161,22 @@ impl ScalarUDFImpl for DateDiffFunc {
159161
"second" | "s" | "sec" | "seconds" | "secs" => {
160162
Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Second)
161163
}
162-
"millisecond" | "ms" | "msec" | "milliseconds" => {
163-
Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Millisecond)
164-
}
165-
"microsecond" | "us" | "usec" | "microseconds" => {
166-
Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Microsecond)
167-
}
164+
"millisecond" | "ms" | "msec" | "milliseconds" => Self::date_diff_func(
165+
&date_or_time_expr1,
166+
&date_or_time_expr2,
167+
DatePart::Millisecond,
168+
),
169+
"microsecond" | "us" | "usec" | "microseconds" => Self::date_diff_func(
170+
&date_or_time_expr1,
171+
&date_or_time_expr2,
172+
DatePart::Microsecond,
173+
),
168174
"nanosecond" | "ns" | "nsec" | "nanosec" | "nsecond" | "nanoseconds" | "nanosecs" => {
169-
Self::date_diff_func(&date_or_time_expr1, &date_or_time_expr2, DatePart::Nanosecond)
175+
Self::date_diff_func(
176+
&date_or_time_expr1,
177+
&date_or_time_expr2,
178+
DatePart::Nanosecond,
179+
)
170180
}
171181
_ => plan_err!("Invalid date_or_time_part type")?,
172182
}
@@ -176,4 +186,4 @@ impl ScalarUDFImpl for DateDiffFunc {
176186
}
177187
}
178188

179-
super::macros::make_udf_function!(DateDiffFunc);
189+
super::macros::make_udf_function!(DateDiffFunc);

crates/runtime/src/datafusion/planner.rs

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -231,30 +231,38 @@ where
231231
match sql_type {
232232
SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
233233
SQLDataType::TinyInt(_) => Ok(DataType::Int8),
234-
SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
235-
SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
236-
Ok(DataType::Int32)
237-
}
238-
SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
239-
SQLDataType::UnsignedTinyInt(_) => Ok(DataType::UInt8),
240-
SQLDataType::UnsignedSmallInt(_) | SQLDataType::UnsignedInt2(_) => Ok(DataType::UInt16),
234+
SQLDataType::SmallInt(_) | SQLDataType::Int2(_)| SQLDataType::Int16 => Ok(DataType::Int16),
235+
SQLDataType::Int(_)
236+
| SQLDataType::Integer(_)
237+
| SQLDataType::Int4(_)
238+
| SQLDataType::Int32 => Ok(DataType::Int32) ,
239+
SQLDataType::BigInt(_) | SQLDataType::Int8(_) | SQLDataType::Int64 => Ok(DataType::Int64),
240+
SQLDataType::UnsignedTinyInt(_) | SQLDataType::UInt8 => Ok(DataType::UInt8),
241+
SQLDataType::UnsignedSmallInt(_)
242+
| SQLDataType::UnsignedInt2(_)
243+
| SQLDataType::UInt16 => Ok(DataType::UInt16),
241244
SQLDataType::UnsignedInt(_)
242245
| SQLDataType::UnsignedInteger(_)
243-
| SQLDataType::UnsignedInt4(_) => Ok(DataType::UInt32),
246+
| SQLDataType::UnsignedInt4(_)
247+
| SQLDataType::UInt32 => Ok(DataType::UInt32),
244248
SQLDataType::Varchar(length) => match (length, true) {
245249
(Some(_), false) => plan_err!(
246250
"does not support Varchar with length, please set `support_varchar_with_length` to be true"
247251
),
248252
_ => Ok(DataType::Utf8),
249253
},
250254
SQLDataType::Blob(_) => Ok(DataType::Binary),
251-
SQLDataType::UnsignedBigInt(_) | SQLDataType::UnsignedInt8(_) => Ok(DataType::UInt64),
252-
SQLDataType::Real | SQLDataType::Float4 | SQLDataType::Float(_) => {
253-
Ok(DataType::Float32)
254-
}
255-
SQLDataType::Double | SQLDataType::DoublePrecision | SQLDataType::Float8 => {
256-
Ok(DataType::Float64)
257-
}
255+
SQLDataType::UnsignedBigInt(_)
256+
| SQLDataType::UnsignedInt8(_)
257+
| SQLDataType::UInt64 => Ok(DataType::UInt64),
258+
SQLDataType::Real
259+
| SQLDataType::Float4
260+
| SQLDataType::Float(_)
261+
| SQLDataType::Float32=> Ok(DataType::Float32),
262+
SQLDataType::Double
263+
| SQLDataType::DoublePrecision
264+
| SQLDataType::Float8
265+
| SQLDataType::Float64 => Ok(DataType::Float64),
258266
SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => Ok(DataType::Utf8),
259267
SQLDataType::Timestamp(precision, tz_info) => {
260268
let tz = if matches!(tz_info, TimezoneInfo::Tz)
@@ -386,19 +394,10 @@ where
386394
| SQLDataType::BigDecimal(_)
387395
| SQLDataType::Clob(_)
388396
| SQLDataType::Bytes(_)
389-
| SQLDataType::Int16
390-
| SQLDataType::Int32
391-
| SQLDataType::Int64
392397
| SQLDataType::Int128
393398
| SQLDataType::Int256
394-
| SQLDataType::UInt8
395-
| SQLDataType::UInt16
396-
| SQLDataType::UInt32
397-
| SQLDataType::UInt64
398399
| SQLDataType::UInt128
399400
| SQLDataType::UInt256
400-
| SQLDataType::Float32
401-
| SQLDataType::Float64
402401
| SQLDataType::Date32
403402
| SQLDataType::Datetime64(_, _)
404403
| SQLDataType::FixedString(_)

crates/utils/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,9 @@ impl Db {
134134
///
135135
/// Returns a `DbError` if the database operations fail, or
136136
/// `SerializeError`/`DeserializeError` if the value cannot be serialized or deserialized.
137-
pub async fn modify<T>(&self, key: &str, f: impl Fn(&mut T)) -> Result<()>
137+
pub async fn modify<T>(&self, key: &str, f: impl Fn(&mut T) + Send) -> Result<()>
138138
where
139-
T: serde::Serialize + DeserializeOwned + Default + Sync,
139+
T: serde::Serialize + DeserializeOwned + Default + Sync + Send,
140140
{
141141
let mut value: T = self.get(key).await?.unwrap_or_default();
142142

0 commit comments

Comments
 (0)