From bda4dc2a73b7f94f9ec44d688dacec6c0c25fac3 Mon Sep 17 00:00:00 2001 From: Maxim Bogdanov Date: Thu, 24 Apr 2025 19:49:30 +0200 Subject: [PATCH 1/3] named arguments for table functions --- datafusion/catalog/src/table.rs | 4 ++-- .../core/src/execution/session_state.rs | 4 ++-- .../user_defined_table_functions.rs | 5 ++-- datafusion/expr/src/planner.rs | 2 +- .../functions-table/src/generate_series.rs | 8 +++---- datafusion/sql/src/relation/mod.rs | 24 +++++++++++++++++-- 6 files changed, 34 insertions(+), 13 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index ecc792f73d30..adabb3a7eea6 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -316,7 +316,7 @@ pub trait TableProviderFactory: Debug + Sync + Send { /// A trait for table function implementations pub trait TableFunctionImpl: Debug + Sync + Send { /// Create a table provider - fn call(&self, args: &[Expr]) -> Result>; + fn call(&self, args: &[(Expr, Option)]) -> Result>; } /// A table that uses a function to generate data @@ -345,7 +345,7 @@ impl TableFunction { } /// Get the function implementation and generate a table - pub fn create_table_provider(&self, args: &[Expr]) -> Result> { + pub fn create_table_provider(&self, args: &[(Expr, Option)]) -> Result> { self.fun.call(args) } } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index d640f8e37a29..f2f2ce087839 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -33,7 +33,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use datafusion_catalog::information_schema::{ InformationSchemaProvider, INFORMATION_SCHEMA, }; -use datafusion_catalog::MemoryCatalogProviderList; +use datafusion_catalog::{MemoryCatalogProviderList}; use arrow::datatypes::{DataType, SchemaRef}; use datafusion_catalog::{Session, TableFunction, TableFunctionImpl}; @@ -1655,7 +1655,7 @@ impl ContextProvider for SessionContextProvider<'_> { fn get_table_function_source( &self, name: &str, - args: Vec, + args: Vec<(Expr, Option)>, ) -> datafusion_common::Result> { let tbl_func = self .state diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs index e4aff0b00705..c6c9e78d88e1 100644 --- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs @@ -200,10 +200,11 @@ impl SimpleCsvTable { struct SimpleCsvTableFunc {} impl TableFunctionImpl for SimpleCsvTableFunc { - fn call(&self, exprs: &[Expr]) -> Result> { + fn call(&self, args: &[(Expr, Option)]) -> Result> { + dbg!(args); let mut new_exprs = vec![]; let mut filepath = String::new(); - for expr in exprs { + for (expr,_) in args { match expr { Expr::Literal(ScalarValue::Utf8(Some(ref path))) => { filepath.clone_from(path); diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index a2ed0592efdb..6c8a3f864f25 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -52,7 +52,7 @@ pub trait ContextProvider { fn get_table_function_source( &self, _name: &str, - _args: Vec, + _args: Vec<(Expr, Option)>, ) -> Result> { not_impl_err!("Table Functions are not supported") } diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index 5bb56f28bc8d..485454d9acab 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -188,13 +188,13 @@ struct GenerateSeriesFuncImpl { } impl TableFunctionImpl for GenerateSeriesFuncImpl { - fn call(&self, exprs: &[Expr]) -> Result> { + fn call(&self, exprs: &[(Expr, Option)]) -> Result> { if exprs.is_empty() || exprs.len() > 3 { return plan_err!("{} function requires 1 to 3 arguments", self.name); } let mut normalize_args = Vec::new(); - for expr in exprs { + for (expr,_) in exprs { match expr { Expr::Literal(ScalarValue::Null) => {} Expr::Literal(ScalarValue::Int64(Some(n))) => normalize_args.push(*n), @@ -257,7 +257,7 @@ impl TableFunctionImpl for GenerateSeriesFuncImpl { pub struct GenerateSeriesFunc {} impl TableFunctionImpl for GenerateSeriesFunc { - fn call(&self, exprs: &[Expr]) -> Result> { + fn call(&self, exprs: &[(Expr, Option)]) -> Result> { let impl_func = GenerateSeriesFuncImpl { name: "generate_series", include_end: true, @@ -270,7 +270,7 @@ impl TableFunctionImpl for GenerateSeriesFunc { pub struct RangeFunc {} impl TableFunctionImpl for RangeFunc { - fn call(&self, exprs: &[Expr]) -> Result> { + fn call(&self, exprs: &[(Expr, Option)]) -> Result> { let impl_func = GenerateSeriesFuncImpl { name: "range", include_end: false, diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 8078261d9152..f9d8989c08bb 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -47,7 +47,7 @@ impl SqlToRel<'_, S> { let args = func_args .args .into_iter() - .flat_map(|arg| { + .map(|arg| { if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) = arg { self.sql_expr_to_logical_expr( @@ -55,11 +55,31 @@ impl SqlToRel<'_, S> { &DFSchema::empty(), planner_context, ) + .map(|expr| (expr, None)) + } else if let FunctionArg::Named { + name, + arg, + .. + } = arg + { + if let FunctionArgExpr::Expr(expr) = arg { + self.sql_expr_to_logical_expr( + expr, + &DFSchema::empty(), + planner_context, + ) + .map(|expr| (expr, Some(name.to_string()))) + } else { + plan_err!( + "Unsupported function argument type: {:?}", + arg + ) + } } else { plan_err!("Unsupported function argument type: {:?}", arg) } }) - .collect::>(); + .collect::>>()?; let provider = self .context_provider .get_table_function_source(&tbl_func_name, args)?; From 3940e44d4b665d28a96bf9052f99b948f915bdd9 Mon Sep 17 00:00:00 2001 From: Maxim Bogdanov Date: Fri, 25 Apr 2025 14:46:37 +0200 Subject: [PATCH 2/3] clippy --- datafusion-cli/src/functions.rs | 6 +++--- datafusion-examples/examples/simple_udtf.rs | 6 +++--- datafusion/catalog/src/table.rs | 5 ++++- datafusion/core/src/execution/session_state.rs | 2 +- .../tests/user_defined/user_defined_table_functions.rs | 2 +- datafusion/functions-table/src/generate_series.rs | 2 +- datafusion/sql/src/relation/mod.rs | 7 +------ 7 files changed, 14 insertions(+), 16 deletions(-) diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs index 13d2d5fd3547..7c057f60e337 100644 --- a/datafusion-cli/src/functions.rs +++ b/datafusion-cli/src/functions.rs @@ -320,10 +320,10 @@ fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option Result> { + fn call(&self, exprs: &[(Expr, Option)]) -> Result> { let filename = match exprs.first() { - Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet') - Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet") + Some((Expr::Literal(ScalarValue::Utf8(Some(s))), _)) => s, // single quote: parquet_metadata('x.parquet') + Some((Expr::Column(Column { name, .. }), _)) => name, // double quote: parquet_metadata("x.parquet") _ => { return plan_err!( "parquet_metadata requires string argument as its input" diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index d2b2d1bf9655..4426fd2e5154 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -132,14 +132,14 @@ impl TableProvider for LocalCsvTable { struct LocalCsvTableFunc {} impl TableFunctionImpl for LocalCsvTableFunc { - fn call(&self, exprs: &[Expr]) -> Result> { - let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)))) = exprs.first() else { + fn call(&self, exprs: &[(Expr, Option)]) -> Result> { + let Some((Expr::Literal(ScalarValue::Utf8(Some(ref path))),_)) = exprs.first() else { return plan_err!("read_csv requires at least one string argument"); }; let limit = exprs .get(1) - .map(|expr| { + .map(|(expr,_)| { // try to simplify the expression, so 1+2 becomes 3, for example let execution_props = ExecutionProps::new(); let info = SimplifyContext::new(&execution_props); diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index adabb3a7eea6..ad1381ccc3b3 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -345,7 +345,10 @@ impl TableFunction { } /// Get the function implementation and generate a table - pub fn create_table_provider(&self, args: &[(Expr, Option)]) -> Result> { + pub fn create_table_provider( + &self, + args: &[(Expr, Option)], + ) -> Result> { self.fun.call(args) } } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index f2f2ce087839..fef08e9fe51f 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -33,7 +33,7 @@ use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use datafusion_catalog::information_schema::{ InformationSchemaProvider, INFORMATION_SCHEMA, }; -use datafusion_catalog::{MemoryCatalogProviderList}; +use datafusion_catalog::MemoryCatalogProviderList; use arrow::datatypes::{DataType, SchemaRef}; use datafusion_catalog::{Session, TableFunction, TableFunctionImpl}; diff --git a/datafusion/core/tests/user_defined/user_defined_table_functions.rs b/datafusion/core/tests/user_defined/user_defined_table_functions.rs index c6c9e78d88e1..e6da549f6463 100644 --- a/datafusion/core/tests/user_defined/user_defined_table_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_table_functions.rs @@ -204,7 +204,7 @@ impl TableFunctionImpl for SimpleCsvTableFunc { dbg!(args); let mut new_exprs = vec![]; let mut filepath = String::new(); - for (expr,_) in args { + for (expr, _) in args { match expr { Expr::Literal(ScalarValue::Utf8(Some(ref path))) => { filepath.clone_from(path); diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index 485454d9acab..4bc21e373351 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -194,7 +194,7 @@ impl TableFunctionImpl for GenerateSeriesFuncImpl { } let mut normalize_args = Vec::new(); - for (expr,_) in exprs { + for (expr, _) in exprs { match expr { Expr::Literal(ScalarValue::Null) => {} Expr::Literal(ScalarValue::Int64(Some(n))) => normalize_args.push(*n), diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index f9d8989c08bb..ca744bb67696 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -56,12 +56,7 @@ impl SqlToRel<'_, S> { planner_context, ) .map(|expr| (expr, None)) - } else if let FunctionArg::Named { - name, - arg, - .. - } = arg - { + } else if let FunctionArg::Named { name, arg, .. } = arg { if let FunctionArgExpr::Expr(expr) = arg { self.sql_expr_to_logical_expr( expr, From 79edbd8e901ee53a8c7bc82c78797ce6127a8651 Mon Sep 17 00:00:00 2001 From: Maxim Bogdanov Date: Tue, 29 Apr 2025 13:40:34 +0200 Subject: [PATCH 3/3] fmt --- datafusion-examples/examples/simple_udtf.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/examples/simple_udtf.rs b/datafusion-examples/examples/simple_udtf.rs index 4426fd2e5154..4c174a687f3e 100644 --- a/datafusion-examples/examples/simple_udtf.rs +++ b/datafusion-examples/examples/simple_udtf.rs @@ -133,13 +133,14 @@ struct LocalCsvTableFunc {} impl TableFunctionImpl for LocalCsvTableFunc { fn call(&self, exprs: &[(Expr, Option)]) -> Result> { - let Some((Expr::Literal(ScalarValue::Utf8(Some(ref path))),_)) = exprs.first() else { + let Some((Expr::Literal(ScalarValue::Utf8(Some(ref path))), _)) = exprs.first() + else { return plan_err!("read_csv requires at least one string argument"); }; let limit = exprs .get(1) - .map(|(expr,_)| { + .map(|(expr, _)| { // try to simplify the expression, so 1+2 becomes 3, for example let execution_props = ExecutionProps::new(); let info = SimplifyContext::new(&execution_props);