Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ recursive = "0.1.1"
regex = "1.11"
rstest = "0.25.0"
serde_json = "1"
sqlparser = { version = "0.58.0", default-features = false, features = ["std", "visitor"] }
sqlparser = { git = "https://github.com/Embucket/datafusion-sqlparser-rs.git", rev = "28012078c09714542c95ddbad8203a282fa37cb2", features = [
"visitor",
] }
tempfile = "3"
testcontainers = { version = "0.24", features = ["default"] }
testcontainers-modules = { version = "0.12" }
Expand Down
Empty file added FETCH_HEAD
Empty file.
12 changes: 8 additions & 4 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,14 @@ fn fixed_len_byte_array_to_string(val: Option<&FixedLenByteArray>) -> Option<Str
pub struct ParquetMetadataFunc {}

impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
if exprs.is_empty() {
return plan_err!("parquet_metadata requires string argument as its input");
}

let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
Some((Expr::Literal(ScalarValue::Utf8(Some(s)), _), _)) => s, // single quote: parquet_metadata('x.parquet')
Some((Expr::Column(Column { name, .. }), _)) => name, // double quote: parquet_metadata("x.parquet")
_ => {
return plan_err!(
"parquet_metadata requires string argument as its input"
Expand Down Expand Up @@ -510,7 +514,7 @@ impl MetadataCacheFunc {
}

impl TableFunctionImpl for MetadataCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
if !exprs.is_empty() {
return plan_err!("metadata_cache should have no arguments");
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ async fn main() -> Result<()> {
write_out(&ctx).await?;
register_aggregate_test_data("t1", &ctx).await?;
register_aggregate_test_data("t2", &ctx).await?;
where_scalar_subquery(&ctx).await?;
where_in_subquery(&ctx).await?;
Box::pin(where_scalar_subquery(&ctx)).await?;
Box::pin(where_in_subquery(&ctx)).await?;
where_exist_subquery(&ctx).await?;
Ok(())
}
Expand Down
7 changes: 4 additions & 3 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,16 @@ impl TableProvider for LocalCsvTable {
struct LocalCsvTableFunc {}

impl TableFunctionImpl for LocalCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)), _)) = exprs.first()
fn call(&self, exprs: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>> {
let Some((Expr::Literal(ScalarValue::Utf8(Some(ref path)), _), _)) =
exprs.first()
else {
return plan_err!("read_csv requires at least one string argument");
};

let limit = exprs
.get(1)
.map(|expr| {
.map(|(expr, _)| {
// try to simplify the expression, so 1+2 becomes 3, for example
let execution_props = ExecutionProps::new();
let info = SimplifyContext::new(&execution_props);
Expand Down
4 changes: 4 additions & 0 deletions datafusion/catalog/src/default_table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl TableSource for DefaultTableSource {
fn get_column_default(&self, column: &str) -> Option<&Expr> {
self.table_provider.get_column_default(column)
}

fn statistics(&self) -> Option<datafusion_common::Statistics> {
self.table_provider.statistics()
}
}

/// Wrap TableProvider in TableSource
Expand Down
7 changes: 5 additions & 2 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ pub trait TableProviderFactory: Debug + Sync + Send {
/// A trait for table function implementations
pub trait TableFunctionImpl: Debug + Sync + Send {
/// Create a table provider
fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
fn call(&self, args: &[(Expr, Option<String>)]) -> Result<Arc<dyn TableProvider>>;
}

/// A table that uses a function to generate data
Expand Down Expand Up @@ -345,7 +345,10 @@ impl TableFunction {
}

/// Get the function implementation and generate a table
pub fn create_table_provider(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
pub fn create_table_provider(
&self,
args: &[(Expr, Option<String>)],
) -> Result<Arc<dyn TableProvider>> {
self.fun.call(args)
}
}
26 changes: 18 additions & 8 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ impl Session for SessionState {
}

impl SessionState {
pub(crate) fn resolve_table_ref(
/// Resolve a [`TableReference`] into a [`ResolvedTableReference`] using
/// the session's configured default catalog and schema.
pub fn resolve_table_ref(
&self,
table_ref: impl Into<TableReference>,
) -> ResolvedTableReference {
Expand Down Expand Up @@ -474,7 +476,8 @@ impl SessionState {
query.statement_to_plan(statement)
}

fn get_parser_options(&self) -> ParserOptions {
/// Get the parser options
pub fn get_parser_options(&self) -> ParserOptions {
let sql_parser_options = &self.config.options().sql_parser;

ParserOptions {
Expand Down Expand Up @@ -1634,9 +1637,11 @@ impl From<SessionState> for SessionStateBuilder {
///
/// This is used so the SQL planner can access the state of the session without
/// having a direct dependency on the [`SessionState`] struct (and core crate)
struct SessionContextProvider<'a> {
state: &'a SessionState,
tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
pub struct SessionContextProvider<'a> {
/// The session state
pub state: &'a SessionState,
/// The tables available in the session
pub tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
}

impl ContextProvider for SessionContextProvider<'_> {
Expand Down Expand Up @@ -1666,20 +1671,25 @@ impl ContextProvider for SessionContextProvider<'_> {
fn get_table_function_source(
&self,
name: &str,
args: Vec<Expr>,
args: Vec<(Expr, Option<String>)>,
) -> datafusion_common::Result<Arc<dyn TableSource>> {
let name = name.to_ascii_lowercase();
let tbl_func = self
.state
.table_functions
.get(name)
.get(&name)
.cloned()
.ok_or_else(|| plan_datafusion_err!("table function '{name}' not found"))?;
let dummy_schema = DFSchema::empty();
let simplifier =
ExprSimplifier::new(SessionSimplifyProvider::new(self.state, &dummy_schema));

let args = args
.into_iter()
.map(|arg| simplifier.simplify(arg))
.map(|(expr, named_param)| {
// simplify returns Result<Expr>, map it into Result<(Expr, Option<String>)>
simplifier.simplify(expr).map(|e| (e, named_param))
})
.collect::<datafusion_common::Result<Vec<_>>>()?;
let provider = tbl_func.create_table_provider(&args)?;

Expand Down
Loading
Loading