Skip to content

Commit eeaddd3

Browse files
authored
Added support for day and other datetime keywords in DF (#294)
* start here * no macro, just simple vistor mut * remvoe redundant trait creation * integration test add, fixed logic, next unit tests * activated impl and cargo fmt + clippy * added unit tests + impl extension * cargo fmt + clippy * cargo clippy * cargo clippy * removed redundant check * fix * merged with Artem PR * cargo fmt + clippy * small fixes * cargo fmt + clippy
1 parent e7c0c11 commit eeaddd3

File tree

7 files changed

+367
-86
lines changed

7 files changed

+367
-86
lines changed

crates/runtime/src/datafusion/execution.rs

Lines changed: 143 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,11 @@ impl SqlExecutor {
106106
warehouse_name: &str,
107107
) -> IceBucketSQLResult<Vec<RecordBatch>> {
108108
// Update query to use custom JSON functions
109-
let query = self.preprocess_query(query);
109+
let query = Self::preprocess_query(query);
110110
let mut statement = self
111111
.parse_query(query.as_str())
112112
.context(super::error::DataFusionSnafu)?;
113113
Self::postprocess_query_statement(&mut statement);
114-
115114
// statement = self.update_statement_references(statement, warehouse_name);
116115
// query = statement.to_string();
117116

@@ -244,18 +243,15 @@ impl SqlExecutor {
244243
/// Panics if .
245244
#[must_use]
246245
#[allow(clippy::unwrap_used)]
247-
#[tracing::instrument(level = "trace", skip(self), ret)]
248-
pub fn preprocess_query(&self, query: &str) -> String {
246+
#[tracing::instrument(level = "trace", ret)]
247+
pub fn preprocess_query(query: &str) -> String {
249248
// Replace field[0].subfield -> json_get(json_get(field, 0), 'subfield')
250249
// TODO: This regex should be a static allocation
251250
let re = regex::Regex::new(r"(\w+.\w+)\[(\d+)][:\.](\w+)").unwrap();
252-
let date_add =
253-
regex::Regex::new(r"(date|time|timestamp)(_?add|_?diff)\(\s*([a-zA-Z]+),").unwrap();
254-
251+
//date_add processing moved to `postprocess_query_statement`
255252
let mut query = re
256253
.replace_all(query, "json_get(json_get($1, $2), '$3')")
257254
.to_string();
258-
query = date_add.replace_all(&query, "$1$2('$3',").to_string();
259255
let alter_iceberg_table = regex::Regex::new(r"alter\s+iceberg\s+table").unwrap();
260256
query = alter_iceberg_table
261257
.replace_all(&query, "alter table")
@@ -398,6 +394,7 @@ impl SqlExecutor {
398394
.await
399395
.context(ih_error::IcebergSnafu)?;
400396
};
397+
401398
// Create new table
402399
rest_catalog
403400
.create_table(
@@ -1336,8 +1333,144 @@ pub fn created_entity_response() -> Result<Vec<RecordBatch>, arrow::error::Arrow
13361333
}
13371334

13381335
#[cfg(test)]
1339-
mod test {
1340-
use crate::datafusion::execution::SqlExecutor;
1336+
mod tests {
1337+
use super::SqlExecutor;
1338+
use crate::datafusion::{session::SessionParams, type_planner::CustomTypePlanner};
1339+
use datafusion::sql::parser::Statement as DFStatement;
1340+
use datafusion::sql::sqlparser::ast::visit_expressions;
1341+
use datafusion::sql::sqlparser::ast::{Expr, ObjectName};
1342+
use datafusion::{
1343+
execution::SessionStateBuilder,
1344+
prelude::{SessionConfig, SessionContext},
1345+
};
1346+
use datafusion_iceberg::planner::IcebergQueryPlanner;
1347+
use sqlparser::ast::Value;
1348+
use sqlparser::ast::{
1349+
Function, FunctionArg, FunctionArgExpr, FunctionArgumentList, FunctionArguments,
1350+
};
1351+
use std::ops::ControlFlow;
1352+
use std::sync::Arc;
1353+
1354+
struct Test<'a, T> {
1355+
input: &'a str,
1356+
expected: T,
1357+
should_work: bool,
1358+
}
1359+
impl<'a, T> Test<'a, T> {
1360+
pub const fn new(input: &'a str, expected: T, should_work: bool) -> Self {
1361+
Self {
1362+
input,
1363+
expected,
1364+
should_work,
1365+
}
1366+
}
1367+
}
1368+
#[test]
1369+
#[allow(
1370+
clippy::unwrap_used,
1371+
clippy::explicit_iter_loop,
1372+
clippy::collapsible_match
1373+
)]
1374+
fn test_timestamp_keywords_postprocess() {
1375+
let state = SessionStateBuilder::new()
1376+
.with_config(
1377+
SessionConfig::new()
1378+
.with_information_schema(true)
1379+
.with_option_extension(SessionParams::default())
1380+
.set_str("datafusion.sql_parser.dialect", "SNOWFLAKE"),
1381+
)
1382+
.with_default_features()
1383+
.with_query_planner(Arc::new(IcebergQueryPlanner {}))
1384+
.with_type_planner(Arc::new(CustomTypePlanner {}))
1385+
.build();
1386+
let ctx = SessionContext::new_with_state(state);
1387+
let executor = SqlExecutor::new(ctx).unwrap();
1388+
let test = vec![
1389+
Test::new(
1390+
"SELECT dateadd(year, 5, '2025-06-01')",
1391+
Value::SingleQuotedString("year".to_owned()),
1392+
true,
1393+
),
1394+
Test::new(
1395+
"SELECT dateadd(\"year\", 5, '2025-06-01')",
1396+
Value::SingleQuotedString("year".to_owned()),
1397+
true,
1398+
),
1399+
Test::new(
1400+
"SELECT dateadd('year', 5, '2025-06-01')",
1401+
Value::SingleQuotedString("year".to_owned()),
1402+
true,
1403+
),
1404+
Test::new(
1405+
"SELECT dateadd(\"'year'\", 5, '2025-06-01')",
1406+
Value::SingleQuotedString("year".to_owned()),
1407+
false,
1408+
),
1409+
Test::new(
1410+
"SELECT dateadd(\'year\', 5, '2025-06-01')",
1411+
Value::SingleQuotedString("year".to_owned()),
1412+
true,
1413+
),
1414+
Test::new(
1415+
"SELECT datediff(day, 5, '2025-06-01')",
1416+
Value::SingleQuotedString("day".to_owned()),
1417+
true,
1418+
),
1419+
Test::new(
1420+
"SELECT datediff('week', 5, '2025-06-01')",
1421+
Value::SingleQuotedString("week".to_owned()),
1422+
true,
1423+
),
1424+
Test::new(
1425+
"SELECT datediff(nsecond, 10000000, '2025-06-01')",
1426+
Value::SingleQuotedString("nsecond".to_owned()),
1427+
true,
1428+
),
1429+
Test::new(
1430+
"SELECT date_diff(hour, 5, '2025-06-01')",
1431+
Value::SingleQuotedString("hour".to_owned()),
1432+
true,
1433+
),
1434+
Test::new(
1435+
"SELECT date_add(us, 100000, '2025-06-01')",
1436+
Value::SingleQuotedString("us".to_owned()),
1437+
true,
1438+
),
1439+
];
1440+
for test in test.iter() {
1441+
let mut statement = executor.parse_query(test.input).unwrap();
1442+
SqlExecutor::postprocess_query_statement(&mut statement);
1443+
if let DFStatement::Statement(statement) = statement {
1444+
visit_expressions(&statement, |expr| {
1445+
if let Expr::Function(Function {
1446+
name: ObjectName(idents),
1447+
args: FunctionArguments::List(FunctionArgumentList { args, .. }),
1448+
..
1449+
}) = expr
1450+
{
1451+
match idents.first().unwrap().value.as_str() {
1452+
"dateadd" | "date_add" | "datediff" | "date_diff" => {
1453+
if let FunctionArg::Unnamed(FunctionArgExpr::Expr(ident)) =
1454+
args.iter().next().unwrap()
1455+
{
1456+
if let Expr::Value(found) = ident {
1457+
if test.should_work {
1458+
assert_eq!(*found, test.expected);
1459+
} else {
1460+
assert_ne!(*found, test.expected);
1461+
}
1462+
}
1463+
}
1464+
}
1465+
_ => {}
1466+
}
1467+
}
1468+
ControlFlow::<()>::Continue(())
1469+
});
1470+
}
1471+
}
1472+
}
1473+
13411474
use datafusion::sql::parser::DFParser;
13421475

13431476
#[allow(clippy::unwrap_used)]

crates/runtime/src/datafusion/functions/convert_timezone.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -341,11 +341,10 @@ mod tests {
341341
);
342342
assert_eq!(
343343
result, expected,
344-
"convert_timezone created wrong value for {}",
345-
source_timestamp_tz_value
346-
)
344+
"convert_timezone created wrong value for {source_timestamp_tz_value}"
345+
);
347346
}
348-
_ => panic!("Conversion of {} failed", source_timestamp_tz),
347+
_ => panic!("Conversion of {source_timestamp_tz} failed"),
349348
}
350349
}
351350
#[test]
@@ -371,11 +370,10 @@ mod tests {
371370
);
372371
assert_eq!(
373372
result, expected,
374-
"convert_timezone created wrong value for {}",
375-
source_timestamp_tz_value
376-
)
373+
"convert_timezone created wrong value for {source_timestamp_tz_value}"
374+
);
377375
}
378-
_ => panic!("Conversion of {} failed", source_timestamp_tz_value),
376+
_ => panic!("Conversion of {source_timestamp_tz_value} failed"),
379377
}
380378
}
381379
#[test]
@@ -403,11 +401,10 @@ mod tests {
403401
);
404402
assert_ne!(
405403
result, expected,
406-
"convert_timezone created wrong value for {}",
407-
source_timestamp_tz_value
408-
)
404+
"convert_timezone created wrong value for {source_timestamp_tz_value}"
405+
);
409406
}
410-
_ => panic!("Conversion of {} failed", source_timestamp_tz_value),
407+
_ => panic!("Conversion of {source_timestamp_tz_value} failed"),
411408
}
412409
}
413410
}

crates/runtime/src/datafusion/functions/date_add.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ mod tests {
235235
)),
236236
];
237237
let fn_args = ScalarFunctionArgs {
238-
args: args,
238+
args,
239239
number_rows: 0,
240240
return_type: &arrow_schema::DataType::Timestamp(
241241
arrow_schema::TimeUnit::Microsecond,
@@ -248,7 +248,7 @@ mod tests {
248248
Some(1736600400000000i64),
249249
Some(Arc::from(String::from("+00").into_boxed_str())),
250250
);
251-
assert_eq!(&result, &expected, "date_add created a wrong value")
251+
assert_eq!(&result, &expected, "date_add created a wrong value");
252252
}
253253
_ => panic!("Conversion failed"),
254254
}
@@ -268,7 +268,7 @@ mod tests {
268268
),
269269
];
270270
let fn_args = ScalarFunctionArgs {
271-
args: args,
271+
args,
272272
number_rows: 0,
273273
return_type: &arrow_schema::DataType::Timestamp(
274274
arrow_schema::TimeUnit::Microsecond,
@@ -283,7 +283,7 @@ mod tests {
283283
)
284284
.to_array()
285285
.unwrap();
286-
assert_eq!(&result, &expected, "date_add created a wrong value")
286+
assert_eq!(&result, &expected, "date_add created a wrong value");
287287
}
288288
_ => panic!("Conversion failed"),
289289
}
@@ -303,7 +303,7 @@ mod tests {
303303
),
304304
];
305305
let fn_args = ScalarFunctionArgs {
306-
args: args,
306+
args,
307307
number_rows: 0,
308308
return_type: &arrow_schema::DataType::Timestamp(
309309
arrow_schema::TimeUnit::Microsecond,
@@ -318,7 +318,7 @@ mod tests {
318318
))
319319
.to_array(2)
320320
.unwrap();
321-
assert_eq!(&result, &expected, "date_add created a wrong value")
321+
assert_eq!(&result, &expected, "date_add created a wrong value");
322322
}
323323
_ => panic!("Conversion failed"),
324324
}

crates/runtime/src/datafusion/functions/mod.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
use std::sync::Arc;
1919

2020
use datafusion::{common::Result, execution::FunctionRegistry, logical_expr::ScalarUDF};
21-
use sqlparser::ast::Value::SingleQuotedString;
22-
use sqlparser::ast::{Expr, Function, FunctionArg, FunctionArgExpr, FunctionArguments};
21+
use sqlparser::ast::Value::{self, SingleQuotedString};
22+
use sqlparser::ast::{
23+
Expr, Function, FunctionArg, FunctionArgExpr, FunctionArgumentList, FunctionArguments, Ident,
24+
};
2325

2426
mod convert_timezone;
2527
mod date_add;
@@ -99,4 +101,18 @@ pub fn visit_functions_expressions(func: &mut Function) {
99101
_ => func_name,
100102
};
101103
func.name = sqlparser::ast::ObjectName(vec![sqlparser::ast::Ident::new(name)]);
104+
if let FunctionArguments::List(FunctionArgumentList { args, .. }) = &mut func.args {
105+
match func_name {
106+
"dateadd" | "date_add" | "datediff" | "date_diff" => {
107+
if let Some(FunctionArg::Unnamed(FunctionArgExpr::Expr(ident))) =
108+
args.iter_mut().next()
109+
{
110+
if let Expr::Identifier(Ident { value, .. }) = ident {
111+
*ident = Expr::Value(Value::SingleQuotedString(value.clone()));
112+
}
113+
}
114+
}
115+
_ => {}
116+
}
117+
}
102118
}

crates/runtime/src/tests/queries.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
// use crate::tests::utils::macros::test_query;
18+
use crate::tests::utils::macros::test_query;
1919

20+
test_query!(select_date_add_diff, "SELECT dateadd(day, 5, '2025-06-01')");
2021
// // SELECT
2122
// test_query!(select_star, "SELECT * FROM employee_table");
2223
// test_query!(select_ilike, "SELECT * ILIKE '%id%' FROM employee_table;");

0 commit comments

Comments
 (0)