Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<Str
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
Some((Expr::Literal(ScalarValue::Utf8(Some(s))), _)) => s, // single quote: parquet_metadata('x.parquet')
Some((Expr::Column(Column { name, .. }), _)) => name, // double quote: parquet_metadata("x.parquet")
_ => {
return plan_err!(
"parquet_metadata requires string argument as its input"
Expand Down
7 changes: 4 additions & 3 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,15 @@ impl TableProvider for LocalCsvTable {
struct LocalCsvTableFunc {}

impl TableFunctionImpl for LocalCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)))) = exprs.first() else {
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
let Some((Expr::Literal(ScalarValue::Utf8(Some(ref path))), _)) = exprs.first()
else {
return plan_err!("read_csv requires at least one string argument");
};

let limit = exprs
.get(1)
.map(|expr| {
.map(|(expr, _)| {
// try to simplify the expression, so 1+2 becomes 3, for example
let execution_props = ExecutionProps::new();
let info = SimplifyContext::new(&execution_props);
Expand Down
7 changes: 5 additions & 2 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ pub trait TableProviderFactory: Debug + Sync + Send {
/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send {
/// Create a table provider
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
fn call(&self, args: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>>;
}

/// A table that uses a function to generate data
Expand Down Expand Up @@ -345,7 +345,10 @@ impl TableFunction {
}

/// Get the function implementation and generate a table
pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
pub fn create_table_provider(
&self,
args: &[(Expr, Option<String>)],
) -> Result<Arc<dyn TableProvider>> {
self.fun.call(args)
}
}
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ impl ContextProvider for SessionContextProvider<'_> {
fn get_table_function_source(
&self,
name: &str,
args: Vec<Expr>,
args: Vec<(Expr, Option<String>)>,
) -> datafusion_common::Result<Arc<dyn TableSource>> {
let tbl_func = self
.state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,11 @@ impl SimpleCsvTable {
struct SimpleCsvTableFunc {}

impl TableFunctionImpl for SimpleCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call(&self, args: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
dbg!(args);
let mut new_exprs = vec![];
let mut filepath = String::new();
for expr in exprs {
for (expr, _) in args {
match expr {
Expr::Literal(ScalarValue::Utf8(Some(ref path))) => {
filepath.clone_from(path);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub trait ContextProvider {
fn get_table_function_source(
&self,
_name: &str,
_args: Vec<Expr>,
_args: Vec<(Expr, Option<String>)>,
) -> Result<Arc<dyn TableSource>> {
not_impl_err!("Table Functions are not supported")
}
Expand Down
8 changes: 4 additions & 4 deletions datafusion/functions-table/src/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ struct GenerateSeriesFuncImpl {
}

impl TableFunctionImpl for GenerateSeriesFuncImpl {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
if exprs.is_empty() || exprs.len() > 3 {
return plan_err!("{} function requires 1 to 3 arguments", self.name);
}

let mut normalize_args = Vec::new();
for expr in exprs {
for (expr, _) in exprs {
match expr {
Expr::Literal(ScalarValue::Null) => {}
Expr::Literal(ScalarValue::Int64(Some(n))) => normalize_args.push(*n),
Expand Down Expand Up @@ -257,7 +257,7 @@ impl TableFunctionImpl for GenerateSeriesFuncImpl {
pub struct GenerateSeriesFunc {}

impl TableFunctionImpl for GenerateSeriesFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
let impl_func = GenerateSeriesFuncImpl {
name: "generate_series",
include_end: true,
Expand All @@ -270,7 +270,7 @@ impl TableFunctionImpl for GenerateSeriesFunc {
pub struct RangeFunc {}

impl TableFunctionImpl for RangeFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
let impl_func = GenerateSeriesFuncImpl {
name: "range",
include_end: false,
Expand Down
19 changes: 17 additions & 2 deletions datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,34 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let args = func_args
.args
.into_iter()
.flat_map(|arg| {
.map(|arg| {
if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) = arg
{
self.sql_expr_to_logical_expr(
expr,
&DFSchema::empty(),
planner_context,
)
.map(|expr| (expr, None))
} else if let FunctionArg::Named { name, arg, .. } = arg {
if let FunctionArgExpr::Expr(expr) = arg {
self.sql_expr_to_logical_expr(
expr,
&DFSchema::empty(),
planner_context,
)
.map(|expr| (expr, Some(name.to_string())))
} else {
plan_err!(
"Unsupported function argument type: {:?}",
arg
)
}
} else {
plan_err!("Unsupported function argument type: {:?}", arg)
}
})
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;
let provider = self
.context_provider
.get_table_function_source(&tbl_func_name, args)?;
Expand Down
Loading