From 5d34ec4f65415acf55969ac7618cdd9f84c2f159 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Wed, 25 Jun 2025 11:38:55 +0300 Subject: [PATCH 1/2] Add basic support for flatten join --- datafusion/physical-expr/src/planner.rs | 4 +++ datafusion/sql/src/relation/mod.rs | 42 +++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 8660bff796d5..5534c3c38dcd 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -118,6 +118,10 @@ pub fn create_physical_expr( let idx = input_dfschema.index_of_column(c)?; Ok(Arc::new(Column::new(&c.name, idx))) } + Expr::OuterReferenceColumn(_datatype, c) => { + let idx = input_dfschema.index_of_column(c)?; + Ok(Arc::new(Column::new(&c.name, idx))) + } Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), Expr::ScalarVariable(_, variable_names) => { if is_system_variables(variable_names) { diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index cf9812e1af83..8d0e101cec0b 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -400,6 +400,48 @@ impl SqlToRel<'_, S> { (unpivot_plan, alias) } + TableFactor::Function { + name, args, alias, .. + } => { + let tbl_func_name = self.object_name_to_table_reference(name)?; + let schema = planner_context + .outer_query_schema() + .cloned() + .unwrap_or_else(DFSchema::empty); + let func_args = args + .into_iter() + .map(|arg| match arg { + FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => { + let expr = self.sql_expr_to_logical_expr( + expr, + &schema, + planner_context, + )?; + Ok((expr, None)) + } + FunctionArg::Named { + name, + arg: FunctionArgExpr::Expr(expr), + .. + } => { + let expr = self.sql_expr_to_logical_expr( + expr, + &schema, + planner_context, + )?; + Ok((expr, Some(name.value.clone()))) + } + _ => plan_err!("Unsupported function argument: {arg:?}"), + }) + .collect::)>>>()?; + + let provider = self + .context_provider + .get_table_function_source(tbl_func_name.table(), func_args)?; + let plan = + LogicalPlanBuilder::scan(tbl_func_name, provider, None)?.build()?; + (plan, alias) + } // @todo: Support TableFactory::TableFunction _ => { return not_impl_err!( From a8b343db6b7fe0f285d925ba3fb8e5d3a8e7b91a Mon Sep 17 00:00:00 2001 From: osipovartem Date: Thu, 26 Jun 2025 14:38:56 +0300 Subject: [PATCH 2/2] Fix outer --- datafusion/physical-expr/src/planner.rs | 4 ---- datafusion/sql/src/relation/mod.rs | 6 +++--- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index 5534c3c38dcd..8660bff796d5 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -118,10 +118,6 @@ pub fn create_physical_expr( let idx = input_dfschema.index_of_column(c)?; Ok(Arc::new(Column::new(&c.name, idx))) } - Expr::OuterReferenceColumn(_datatype, c) => { - let idx = input_dfschema.index_of_column(c)?; - Ok(Arc::new(Column::new(&c.name, idx))) - } Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), Expr::ScalarVariable(_, variable_names) => { if is_system_variables(variable_names) { diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 8d0e101cec0b..2dbc1c920c65 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -403,7 +403,7 @@ impl SqlToRel<'_, S> { TableFactor::Function { name, args, alias, .. } => { - let tbl_func_name = self.object_name_to_table_reference(name)?; + let tbl_func_ref = self.object_name_to_table_reference(name)?; let schema = planner_context .outer_query_schema() .cloned() @@ -434,10 +434,10 @@ impl SqlToRel<'_, S> { _ => plan_err!("Unsupported function argument: {arg:?}"), }) .collect::)>>>()?; - + let tbl_func_name = tbl_func_ref.table().to_ascii_lowercase(); let provider = self .context_provider - .get_table_function_source(tbl_func_name.table(), func_args)?; + .get_table_function_source(&tbl_func_name, func_args)?; let plan = LogicalPlanBuilder::scan(tbl_func_name, provider, None)?.build()?; (plan, alias)