diff --git a/Cargo.lock b/Cargo.lock index b61a08a470dd..ddea7e14b29f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2195,6 +2195,7 @@ dependencies = [ "nix", "object_store", "prost", + "serde_json", "tempfile", "test-utils", "tokio", diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index b31708a5c1cc..66d15f16d418 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -70,6 +70,7 @@ log = { workspace = true } mimalloc = { version = "0.1", default-features = false } object_store = { workspace = true, features = ["aws", "http"] } prost = { workspace = true } +serde_json = { workspace = true } tempfile = { workspace = true } test-utils = { path = "../test-utils" } tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } diff --git a/datafusion-examples/examples/default_column_values.rs b/datafusion-examples/examples/default_column_values.rs new file mode 100644 index 000000000000..b0270ffd8a3c --- /dev/null +++ b/datafusion-examples/examples/default_column_values.rs @@ -0,0 +1,398 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; +use async_trait::async_trait; + +use datafusion::assert_batches_eq; +use datafusion::catalog::memory::DataSourceExec; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion::common::DFSchema; +use datafusion::common::{Result, ScalarValue}; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource}; +use datafusion::execution::context::SessionContext; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::logical_expr::utils::conjunction; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::parquet::file::properties::WriterProperties; +use datafusion::physical_expr::expressions::{CastExpr, Column, Literal}; +use datafusion::physical_expr::schema_rewriter::{ + DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, +}; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::{lit, SessionConfig}; +use futures::StreamExt; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ObjectStore, PutPayload}; + +// Metadata key for storing default values in field metadata +const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value"; + +// Example showing how to implement custom default value handling for missing columns +// using field metadata and PhysicalExprAdapter. +// +// This example demonstrates how to: +// 1. Store default values in field metadata using a constant key +// 2. Create a custom PhysicalExprAdapter that reads these defaults +// 3. Inject default values for missing columns in filter predicates +// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation +// 5. Wrap string default values in cast expressions for proper type conversion +// +// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates +// that get pushed down to file scans. For handling missing columns in projections, +// other mechanisms in DataFusion are used (like SchemaAdapter). +// +// The metadata-based approach provides a flexible way to store default values as strings +// and cast them to the appropriate types at query time. 
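+//
+// To make the mechanism concrete, this is a sketch of the filter rewrite performed
+// below (values come from the sample schema defined in this example; the exact
+// expression tree produced at runtime may differ slightly):
+//
+//   status = 'active'        -- 'status' does not exist in the physical file
+//   -- becomes -->
+//   'active' = 'active'      -- default pulled from field metadata (already Utf8, no cast)
+//
+// For a non-Utf8 column such as 'priority' (default "1", Int32), the injected literal
+// would instead be wrapped as CAST('1' AS Int32) so the engine converts the stored
+// string default to the column's declared type.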
+ +#[tokio::main] +async fn main() -> Result<()> { + println!("=== Creating example data with missing columns and default values ==="); + + // Create sample data where the logical schema has more columns than the physical schema + let (logical_schema, physical_schema, batch) = create_sample_data_with_defaults(); + + let store = InMemory::new(); + let buf = { + let mut buf = vec![]; + + let props = WriterProperties::builder() + .set_max_row_group_size(2) + .build(); + + let mut writer = + ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props)) + .expect("creating writer"); + + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + buf + }; + let path = Path::from("example.parquet"); + let payload = PutPayload::from_bytes(buf.into()); + store.put(&path, payload).await?; + + // Create a custom table provider that handles missing columns with defaults + let table_provider = Arc::new(DefaultValueTableProvider::new(logical_schema)); + + // Set up query execution + let mut cfg = SessionConfig::new(); + cfg.options_mut().execution.parquet.pushdown_filters = true; + let ctx = SessionContext::new_with_config(cfg); + + // Register our table + ctx.register_table("example_table", table_provider)?; + + ctx.runtime_env().register_object_store( + ObjectStoreUrl::parse("memory://")?.as_ref(), + Arc::new(store), + ); + + println!("\n=== Demonstrating default value injection in filter predicates ==="); + let query = "SELECT id, name FROM example_table WHERE status = 'active' ORDER BY id"; + println!("Query: {query}"); + println!("Note: The 'status' column doesn't exist in the physical schema,"); + println!( + "but our adapter injects the default value 'active' for the filter predicate." + ); + + let batches = ctx.sql(query).await?.collect().await?; + + #[rustfmt::skip] + let expected = [ + "+----+-------+", + "| id | name |", + "+----+-------+", + "| 1 | Alice |", + "| 2 | Bob |", + "| 3 | Carol |", + "+----+-------+", + ]; + arrow::util::pretty::print_batches(&batches)?; + assert_batches_eq!(expected, &batches); + + println!("\n=== Key Insight ==="); + println!("This example demonstrates how PhysicalExprAdapter works:"); + println!("1. Physical schema only has 'id' and 'name' columns"); + println!("2. Logical schema has 'id', 'name', 'status', and 'priority' columns with defaults"); + println!("3. Our custom adapter intercepts filter expressions on missing columns"); + println!("4. Default values from metadata are injected as cast expressions"); + println!("5. 
The DefaultPhysicalExprAdapter handles other schema adaptations");
+    println!("\nNote: PhysicalExprAdapter is specifically for filter predicates.");
+    println!("For projection columns, different mechanisms handle missing columns.");
+
+    Ok(())
+}
+
+/// Create sample data with a logical schema that has default values in metadata
+/// and a physical schema that's missing some columns
+fn create_sample_data_with_defaults() -> (SchemaRef, SchemaRef, RecordBatch) {
+    // Create metadata for default values
+    let mut status_metadata = HashMap::new();
+    status_metadata.insert(DEFAULT_VALUE_METADATA_KEY.to_string(), "active".to_string());
+
+    let mut priority_metadata = HashMap::new();
+    priority_metadata.insert(DEFAULT_VALUE_METADATA_KEY.to_string(), "1".to_string());
+
+    // The logical schema includes all columns with their default values in metadata
+    // Note: We make the columns with defaults nullable to allow the default adapter to handle them
+    let logical_schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, false),
+        Field::new("status", DataType::Utf8, true).with_metadata(status_metadata),
+        Field::new("priority", DataType::Int32, true).with_metadata(priority_metadata),
+    ]);
+
+    // The physical schema only has some columns (simulating missing columns in storage)
+    let physical_schema = Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("name", DataType::Utf8, false),
+    ]);
+
+    // Create sample data for the physical schema
+    let batch = RecordBatch::try_new(
+        Arc::new(physical_schema.clone()),
+        vec![
+            Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
+            Arc::new(arrow::array::StringArray::from(vec![
+                "Alice", "Bob", "Carol",
+            ])),
+        ],
+    )
+    .unwrap();
+
+    (Arc::new(logical_schema), Arc::new(physical_schema), batch)
+}
+
+/// Custom TableProvider that uses DefaultValuePhysicalExprAdapter
+#[derive(Debug)]
+struct DefaultValueTableProvider {
+    schema: SchemaRef,
+}
+
+impl DefaultValueTableProvider {
+    fn new(schema: SchemaRef) -> Self {
+        Self { schema }
+    }
+}
+
+#[async_trait]
+impl TableProvider for DefaultValueTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
+    }
+
+    async fn scan(
+        &self,
+        state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let schema = self.schema.clone();
+        let df_schema = DFSchema::try_from(schema.clone())?;
+        let filter = state.create_physical_expr(
+            conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
+            &df_schema,
+        )?;
+
+        let parquet_source = ParquetSource::default()
+            .with_predicate(filter)
+            .with_pushdown_filters(true);
+
+        let object_store_url = ObjectStoreUrl::parse("memory://")?;
+        let store = state.runtime_env().object_store(object_store_url)?;
+
+        let mut files = vec![];
+        let mut listing = store.list(None);
+        while let Some(file) = listing.next().await {
+            if let Ok(file) = file {
+                files.push(file);
+            }
+        }
+
+        let file_group = files
+            .iter()
+            .map(|file| PartitionedFile::new(file.location.clone(), file.size))
+            .collect();
+
+        let file_scan_config = FileScanConfigBuilder::new(
+            ObjectStoreUrl::parse("memory://")?,
+            self.schema.clone(),
+            Arc::new(parquet_source),
+        )
+        .with_projection(projection.cloned())
+        .with_limit(limit)
+        .with_file_group(file_group)
+        .with_expr_adapter(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _);
+
+        Ok(Arc::new(DataSourceExec::new(Arc::new(
+            file_scan_config.build(),
+        ))))
+    }
+}
+
+/// Factory for creating DefaultValuePhysicalExprAdapter instances
+#[derive(Debug)]
+struct DefaultValuePhysicalExprAdapterFactory;
+
+impl PhysicalExprAdapterFactory for DefaultValuePhysicalExprAdapterFactory {
+    fn create(
+        &self,
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        let default_factory = DefaultPhysicalExprAdapterFactory;
+        let default_adapter = default_factory
+            .create(logical_file_schema.clone(), physical_file_schema.clone());
+
+        Arc::new(DefaultValuePhysicalExprAdapter {
+            logical_file_schema,
+            physical_file_schema,
+            default_adapter,
+            partition_values: Vec::new(),
+        })
+    }
+}
+
+/// Custom PhysicalExprAdapter that handles missing columns with default values from metadata
+/// and wraps DefaultPhysicalExprAdapter for standard schema adaptation
+#[derive(Debug)]
+struct DefaultValuePhysicalExprAdapter {
+    logical_file_schema: SchemaRef,
+    physical_file_schema: SchemaRef,
+    default_adapter: Arc<dyn PhysicalExprAdapter>,
+    partition_values: Vec<(FieldRef, ScalarValue)>,
+}
+
+impl PhysicalExprAdapter for DefaultValuePhysicalExprAdapter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+        // First try our custom default value injection for missing columns
+        let rewritten = expr
+            .transform(|expr| {
+                self.inject_default_values(
+                    expr,
+                    &self.logical_file_schema,
+                    &self.physical_file_schema,
+                )
+            })
+            .data()?;
+
+        // Then apply the default adapter as a fallback to handle standard schema differences
+        // like type casting, partition column handling, etc.
+        let default_adapter = if !self.partition_values.is_empty() {
+            self.default_adapter
+                .with_partition_values(self.partition_values.clone())
+        } else {
+            self.default_adapter.clone()
+        };
+
+        default_adapter.rewrite(rewritten)
+    }
+
+    fn with_partition_values(
+        &self,
+        partition_values: Vec<(FieldRef, ScalarValue)>,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        Arc::new(DefaultValuePhysicalExprAdapter {
+            logical_file_schema: self.logical_file_schema.clone(),
+            physical_file_schema: self.physical_file_schema.clone(),
+            default_adapter: self.default_adapter.clone(),
+            partition_values,
+        })
+    }
+}
+
+impl DefaultValuePhysicalExprAdapter {
+    fn inject_default_values(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+        logical_file_schema: &Schema,
+        physical_file_schema: &Schema,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+            let column_name = column.name();
+
+            // Check if this column exists in the physical schema
+            if physical_file_schema.index_of(column_name).is_err() {
+                // Column is missing from physical schema, check if logical schema has a default
+                if let Ok(logical_field) =
+                    logical_file_schema.field_with_name(column_name)
+                {
+                    if let Some(default_value_str) =
+                        logical_field.metadata().get(DEFAULT_VALUE_METADATA_KEY)
+                    {
+                        // Create a string literal and wrap it in a cast expression
+                        let default_literal = self.create_default_value_expr(
+                            default_value_str,
+                            logical_field.data_type(),
+                        )?;
+                        return Ok(Transformed::yes(default_literal));
+                    }
+                }
+            }
+        }
+
+        // No transformation needed
+        Ok(Transformed::no(expr))
+    }
+
+    fn create_default_value_expr(
+        &self,
+        value_str: &str,
+        data_type: &DataType,
+    ) -> Result<Arc<dyn PhysicalExpr>> {
+        // Create a string literal with the default value
+        let string_literal =
+            Arc::new(Literal::new(ScalarValue::Utf8(Some(value_str.to_string()))));
+
+        // If the target type is already Utf8, return
the string literal directly + if matches!(data_type, DataType::Utf8) { + return Ok(string_literal); + } + + // Otherwise, wrap the string literal in a cast expression + let cast_expr = Arc::new(CastExpr::new(string_literal, data_type.clone(), None)); + + Ok(cast_expr) + } +} diff --git a/datafusion-examples/examples/json_shredding.rs b/datafusion-examples/examples/json_shredding.rs new file mode 100644 index 000000000000..ba9158f6913e --- /dev/null +++ b/datafusion-examples/examples/json_shredding.rs @@ -0,0 +1,480 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{RecordBatch, StringArray}; +use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef}; +use async_trait::async_trait; + +use datafusion::assert_batches_eq; +use datafusion::catalog::memory::DataSourceExec; +use datafusion::catalog::{Session, TableProvider}; +use datafusion::common::tree_node::{ + Transformed, TransformedResult, TreeNode, TreeNodeRecursion, +}; +use datafusion::common::{assert_contains, DFSchema, Result}; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource}; +use datafusion::execution::context::SessionContext; +use datafusion::execution::object_store::ObjectStoreUrl; +use datafusion::logical_expr::utils::conjunction; +use datafusion::logical_expr::{ + ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, + TableProviderFilterPushDown, TableType, Volatility, +}; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::parquet::file::properties::WriterProperties; +use datafusion::physical_expr::schema_rewriter::{ + DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory, +}; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr::{expressions, ScalarFunctionExpr}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::{lit, SessionConfig}; +use datafusion::scalar::ScalarValue; +use futures::StreamExt; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ObjectStore, PutPayload}; + +// Example showing how to implement custom filter rewriting for JSON shredding. +// +// JSON shredding is a technique for optimizing queries on semi-structured data +// by materializing commonly accessed fields into separate columns for better +// columnar storage performance. 
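+//
+// As a sketch (using the column naming convention described below), the adapter in
+// this example rewrites a pushed-down filter roughly like this:
+//
+//   json_get_str('name', data) = 'Bob'   -- opaque UDF call: no column statistics available
+//   -- becomes -->
+//   "_data.name" = 'Bob'                 -- plain column predicate: row group statistics and
+//                                        -- pushed-down filtering can skip non-matching data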
+// +// In this example, we have a table with both: +// - Original JSON data: data: '{"age": 30}' +// - Shredded flat columns: _data.name: "Alice" (extracted from JSON) +// +// Our custom TableProvider uses a PhysicalExprAdapter to rewrite +// expressions like `json_get_str('name', data)` to use the pre-computed +// flat column `_data.name` when available. This allows the query engine to: +// 1. Push down predicates for better filtering +// 2. Avoid expensive JSON parsing at query time +// 3. Leverage columnar storage benefits for the materialized fields +#[tokio::main] +async fn main() -> Result<()> { + println!("=== Creating example data with flat columns and underscore prefixes ==="); + + // Create sample data with flat columns using underscore prefixes + let (table_schema, batch) = create_sample_data(); + + let store = InMemory::new(); + let buf = { + let mut buf = vec![]; + + let props = WriterProperties::builder() + .set_max_row_group_size(2) + .build(); + + let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)) + .expect("creating writer"); + + writer.write(&batch).expect("Writing batch"); + writer.close().unwrap(); + buf + }; + let path = Path::from("example.parquet"); + let payload = PutPayload::from_bytes(buf.into()); + store.put(&path, payload).await?; + + // Create a custom table provider that rewrites struct field access + let table_provider = Arc::new(ExampleTableProvider::new(table_schema)); + + // Set up query execution + let mut cfg = SessionConfig::new(); + cfg.options_mut().execution.parquet.pushdown_filters = true; + let ctx = SessionContext::new_with_config(cfg); + + // Register our table + ctx.register_table("structs", table_provider)?; + ctx.register_udf(ScalarUDF::new_from_impl(JsonGetStr::default())); + + ctx.runtime_env().register_object_store( + ObjectStoreUrl::parse("memory://")?.as_ref(), + Arc::new(store), + ); + + println!("\n=== Showing all data ==="); + let batches = ctx.sql("SELECT * FROM structs").await?.collect().await?; + arrow::util::pretty::print_batches(&batches)?; + + println!("\n=== Running query with flat column access and filter ==="); + let query = "SELECT json_get_str('age', data) as age FROM structs WHERE json_get_str('name', data) = 'Bob'"; + println!("Query: {query}"); + + let batches = ctx.sql(query).await?.collect().await?; + + #[rustfmt::skip] + let expected = [ + "+-----+", + "| age |", + "+-----+", + "| 25 |", + "+-----+", + ]; + arrow::util::pretty::print_batches(&batches)?; + assert_batches_eq!(expected, &batches); + + println!("\n=== Running explain analyze to confirm row group pruning ==="); + + let batches = ctx + .sql(&format!("EXPLAIN ANALYZE {query}")) + .await? + .collect() + .await?; + let plan = format!("{}", arrow::util::pretty::pretty_format_batches(&batches)?); + println!("{plan}"); + assert_contains!(&plan, "row_groups_pruned_statistics=1"); + assert_contains!(&plan, "pushdown_rows_pruned=1"); + + Ok(()) +} + +/// Create the example data with flat columns using underscore prefixes. 
+/// +/// This demonstrates the logical data structure: +/// - Table schema: What users see (just the 'data' JSON column) +/// - File schema: What's physically stored (both 'data' and materialized '_data.name') +/// +/// The naming convention uses underscore prefixes to indicate shredded columns: +/// - `data` -> original JSON column +/// - `_data.name` -> materialized field from JSON data.name +fn create_sample_data() -> (SchemaRef, RecordBatch) { + // The table schema only has the main data column - this is what users query against + let table_schema = Schema::new(vec![Field::new("data", DataType::Utf8, false)]); + + // The file schema has both the main column and the shredded flat column with underscore prefix + // This represents the actual physical storage with pre-computed columns + let file_schema = Schema::new(vec![ + Field::new("data", DataType::Utf8, false), // Original JSON data + Field::new("_data.name", DataType::Utf8, false), // Materialized name field + ]); + + let batch = create_sample_record_batch(&file_schema); + + (Arc::new(table_schema), batch) +} + +/// Create the actual RecordBatch with sample data +fn create_sample_record_batch(file_schema: &Schema) -> RecordBatch { + // Build a RecordBatch with flat columns + let data_array = StringArray::from(vec![ + r#"{"age": 30}"#, + r#"{"age": 25}"#, + r#"{"age": 35}"#, + r#"{"age": 22}"#, + ]); + let names_array = StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"]); + + RecordBatch::try_new( + Arc::new(file_schema.clone()), + vec![Arc::new(data_array), Arc::new(names_array)], + ) + .unwrap() +} + +/// Custom TableProvider that uses a StructFieldRewriter +#[derive(Debug)] +struct ExampleTableProvider { + schema: SchemaRef, +} + +impl ExampleTableProvider { + fn new(schema: SchemaRef) -> Self { + Self { schema } + } +} + +#[async_trait] +impl TableProvider for ExampleTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + // Implementers can choose to mark these filters as exact or inexact. + // If marked as exact they cannot have false positives and must always be applied. + // If marked as Inexact they can have false positives and at runtime the rewriter + // can decide to not rewrite / ignore some filters since they will be re-evaluated upstream. + // For the purposes of this example we mark them as Exact to demonstrate the rewriter is working and the filtering is not being re-evaluated upstream. 
+ Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result> { + let schema = self.schema.clone(); + let df_schema = DFSchema::try_from(schema.clone())?; + let filter = state.create_physical_expr( + conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)), + &df_schema, + )?; + + let parquet_source = ParquetSource::default() + .with_predicate(filter) + .with_pushdown_filters(true); + + let object_store_url = ObjectStoreUrl::parse("memory://")?; + + let store = state.runtime_env().object_store(object_store_url)?; + + let mut files = vec![]; + let mut listing = store.list(None); + while let Some(file) = listing.next().await { + if let Ok(file) = file { + files.push(file); + } + } + + let file_group = files + .iter() + .map(|file| PartitionedFile::new(file.location.clone(), file.size)) + .collect(); + + let file_scan_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("memory://")?, + schema, + Arc::new(parquet_source), + ) + .with_projection(projection.cloned()) + .with_limit(limit) + .with_file_group(file_group) + // if the rewriter needs a reference to the table schema you can bind self.schema() here + .with_expr_adapter(Arc::new(ShreddedJsonRewriterFactory) as _); + + Ok(Arc::new(DataSourceExec::new(Arc::new( + file_scan_config.build(), + )))) + } +} + +/// Scalar UDF that uses serde_json to access json fields +#[derive(Debug)] +pub struct JsonGetStr { + signature: Signature, + aliases: [String; 1], +} + +impl Default for JsonGetStr { + fn default() -> Self { + Self { + signature: Signature::variadic_any(Volatility::Immutable), + aliases: ["json_get_str".to_string()], + } + } +} + +impl ScalarUDFImpl for JsonGetStr { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + self.aliases[0].as_str() + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Utf8) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + assert!( + args.args.len() == 2, + "json_get_str requires exactly 2 arguments" + ); + let key = match &args.args[0] { + ColumnarValue::Scalar(ScalarValue::Utf8(Some(key))) => key, + _ => { + return Err(datafusion::error::DataFusionError::Execution( + "json_get_str first argument must be a string".to_string(), + )) + } + }; + // We expect a string array that contains JSON strings + let json_array = match &args.args[1] { + ColumnarValue::Array(array) => array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + datafusion::error::DataFusionError::Execution( + "json_get_str second argument must be a string array".to_string(), + ) + })?, + _ => { + return Err(datafusion::error::DataFusionError::Execution( + "json_get_str second argument must be a string array".to_string(), + )) + } + }; + let values = json_array + .iter() + .map(|value| { + value.and_then(|v| { + let json_value: serde_json::Value = + serde_json::from_str(v).unwrap_or_default(); + json_value.get(key).map(|v| v.to_string()) + }) + }) + .collect::(); + Ok(ColumnarValue::Array(Arc::new(values))) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } +} + +/// Factory for creating ShreddedJsonRewriter instances +#[derive(Debug)] +struct ShreddedJsonRewriterFactory; + +impl PhysicalExprAdapterFactory for ShreddedJsonRewriterFactory { + fn create( + &self, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc { + let 
default_factory = DefaultPhysicalExprAdapterFactory;
+        let default_adapter = default_factory
+            .create(logical_file_schema.clone(), physical_file_schema.clone());
+
+        Arc::new(ShreddedJsonRewriter {
+            logical_file_schema,
+            physical_file_schema,
+            default_adapter,
+            partition_values: Vec::new(),
+        })
+    }
+}
+
+/// Rewriter that converts json_get_str calls to direct flat column references
+/// and wraps DefaultPhysicalExprAdapter for standard schema adaptation
+#[derive(Debug)]
+struct ShreddedJsonRewriter {
+    logical_file_schema: SchemaRef,
+    physical_file_schema: SchemaRef,
+    default_adapter: Arc<dyn PhysicalExprAdapter>,
+    partition_values: Vec<(FieldRef, ScalarValue)>,
+}
+
+impl PhysicalExprAdapter for ShreddedJsonRewriter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+        // First try our custom JSON shredding rewrite
+        let rewritten = expr
+            .transform(|expr| self.rewrite_impl(expr, &self.physical_file_schema))
+            .data()?;
+
+        // Then apply the default adapter as a fallback to handle standard schema differences
+        // like type casting, missing columns, and partition column handling
+        let default_adapter = if !self.partition_values.is_empty() {
+            self.default_adapter
+                .with_partition_values(self.partition_values.clone())
+        } else {
+            self.default_adapter.clone()
+        };
+
+        default_adapter.rewrite(rewritten)
+    }
+
+    fn with_partition_values(
+        &self,
+        partition_values: Vec<(FieldRef, ScalarValue)>,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        Arc::new(ShreddedJsonRewriter {
+            logical_file_schema: self.logical_file_schema.clone(),
+            physical_file_schema: self.physical_file_schema.clone(),
+            default_adapter: self.default_adapter.clone(),
+            partition_values,
+        })
+    }
+}
+
+impl ShreddedJsonRewriter {
+    fn rewrite_impl(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+        physical_file_schema: &Schema,
+    ) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+        if let Some(func) = expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
+            if func.name() == "json_get_str" && func.args().len() == 2 {
+                // Get the key from the first argument
+                if let Some(literal) = func.args()[0]
+                    .as_any()
+                    .downcast_ref::<expressions::Literal>()
+                {
+                    if let ScalarValue::Utf8(Some(field_name)) = literal.value() {
+                        // Get the column from the second argument
+                        if let Some(column) = func.args()[1]
+                            .as_any()
+                            .downcast_ref::<expressions::Column>()
+                        {
+                            let column_name = column.name();
+                            // Check if there's a flat column with underscore prefix
+                            let flat_column_name = format!("_{column_name}.{field_name}");
+
+                            if let Ok(flat_field_index) =
+                                physical_file_schema.index_of(&flat_column_name)
+                            {
+                                let flat_field =
+                                    physical_file_schema.field(flat_field_index);
+
+                                if flat_field.data_type() == &DataType::Utf8 {
+                                    // Replace the whole expression with a direct column reference
+                                    let new_expr = Arc::new(expressions::Column::new(
+                                        &flat_column_name,
+                                        flat_field_index,
+                                    ))
+                                        as Arc<dyn PhysicalExpr>;
+
+                                    return Ok(Transformed {
+                                        data: new_expr,
+                                        tnr: TreeNodeRecursion::Stop,
+                                        transformed: true,
+                                    });
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        Ok(Transformed::no(expr))
+    }
+}
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index b39ec3929f97..e7cb1e061e8a 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -33,8 +33,8 @@ use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
 use arrow::error::ArrowError;
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_datasource::PartitionedFile;
+use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
-use datafusion_physical_expr::PhysicalExprSchemaRewriter;
 use
datafusion_physical_expr_common::physical_expr::{ is_dynamic_physical_expr, PhysicalExpr, }; @@ -42,6 +42,7 @@ use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBu use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate}; use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; use log::debug; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::async_reader::AsyncFileReader; @@ -92,6 +93,8 @@ pub(super) struct ParquetOpener { pub coerce_int96: Option, /// Optional parquet FileDecryptionProperties pub file_decryption_properties: Option>, + /// Rewrite expressions in the context of the file schema + pub expr_adapter: Arc, } impl FileOpener for ParquetOpener { @@ -119,7 +122,7 @@ impl FileOpener for ParquetOpener { let schema_adapter = self .schema_adapter_factory .create(projected_schema, Arc::clone(&self.logical_file_schema)); - let predicate = self.predicate.clone(); + let mut predicate = self.predicate.clone(); let logical_file_schema = Arc::clone(&self.logical_file_schema); let partition_fields = self.partition_fields.clone(); let reorder_predicates = self.reorder_filters; @@ -132,6 +135,8 @@ impl FileOpener for ParquetOpener { let predicate_creation_errors = MetricBuilder::new(&self.metrics) .global_counter("num_predicate_creation_errors"); + let expr_adapter = Arc::clone(&self.expr_adapter); + let mut enable_page_index = self.enable_page_index; let file_decryption_properties = self.file_decryption_properties.clone(); @@ -235,19 +240,20 @@ impl FileOpener for ParquetOpener { // Adapt the predicate to the physical file schema. // This evaluates missing columns and inserts any necessary casts. - let predicate = predicate + predicate = predicate .map(|p| { - PhysicalExprSchemaRewriter::new( - &physical_file_schema, - &logical_file_schema, - ) - .with_partition_columns( - partition_fields.to_vec(), - file.partition_values, - ) - .rewrite(p) - .map_err(ArrowError::from) - .map(|p| { + let partition_values = partition_fields + .iter() + .cloned() + .zip(file.partition_values) + .collect_vec(); + let adapter = expr_adapter + .create( + Arc::clone(&logical_file_schema), + Arc::clone(&physical_file_schema), + ) + .with_partition_values(partition_values); + adapter.rewrite(p).map_err(ArrowError::from).map(|p| { // After rewriting to the file schema, further simplifications may be possible. // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE` // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.). 
@@ -525,7 +531,8 @@ mod test { }; use datafusion_expr::{col, lit}; use datafusion_physical_expr::{ - expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr, + expressions::DynamicFilterPhysicalExpr, planner::logical2physical, + schema_rewriter::DefaultPhysicalExprAdapterFactory, PhysicalExpr, }; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{Stream, StreamExt}; @@ -631,6 +638,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, + expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), } }; @@ -716,6 +724,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, + expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), } }; @@ -817,6 +826,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, + expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), } }; let make_meta = || FileMeta { @@ -928,6 +938,7 @@ mod test { enable_row_group_stats_pruning: false, // note that this is false! coerce_int96: None, file_decryption_properties: None, + expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), } }; @@ -1040,6 +1051,7 @@ mod test { enable_row_group_stats_pruning: true, coerce_int96: None, file_decryption_properties: None, + expr_adapter: Arc::new(DefaultPhysicalExprAdapterFactory), } }; diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 8ca36e7cd321..67a27b5b401a 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -39,6 +39,7 @@ use datafusion_common::{DataFusionError, Statistics}; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_physical_expr::conjunction; +use datafusion_physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::{ @@ -509,6 +510,10 @@ impl FileSource for ParquetSource { schema_adapter_factory, coerce_int96, file_decryption_properties, + expr_adapter: base_config + .expr_adapter + .clone() + .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory)), }) } diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 431b6ab0bcf0..7e13f84ce0a5 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -53,6 +53,7 @@ use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::schema_rewriter::PhysicalExprAdapterFactory; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; @@ -188,6 +189,9 @@ pub struct FileScanConfig { /// Batch size while creating new batches /// Defaults to [`datafusion_common::config::ExecutionOptions`] batch_size. pub batch_size: Option, + /// Expression adapter used to adapt filters and projections that are pushed down into the scan + /// from the logical schema to the physical schema of the file. + pub expr_adapter: Option>, } /// A builder for [`FileScanConfig`]'s. 
@@ -265,6 +269,7 @@ pub struct FileScanConfigBuilder { file_compression_type: Option, new_lines_in_values: Option, batch_size: Option, + expr_adapter: Option>, } impl FileScanConfigBuilder { @@ -293,6 +298,7 @@ impl FileScanConfigBuilder { table_partition_cols: vec![], constraints: None, batch_size: None, + expr_adapter: None, } } @@ -401,6 +407,20 @@ impl FileScanConfigBuilder { self } + /// Register an expression adapter used to adapt filters and projections that are pushed down into the scan + /// from the logical schema to the physical schema of the file. + /// This can include things like: + /// - Column ordering changes + /// - Handling of missing columns + /// - Rewriting expression to use pre-computed values or file format specific optimizations + pub fn with_expr_adapter( + mut self, + expr_adapter: Arc, + ) -> Self { + self.expr_adapter = Some(expr_adapter); + self + } + /// Build the final [`FileScanConfig`] with all the configured settings. /// /// This method takes ownership of the builder and returns the constructed `FileScanConfig`. @@ -420,6 +440,7 @@ impl FileScanConfigBuilder { file_compression_type, new_lines_in_values, batch_size, + expr_adapter, } = self; let constraints = constraints.unwrap_or_default(); @@ -446,6 +467,7 @@ impl FileScanConfigBuilder { file_compression_type, new_lines_in_values, batch_size, + expr_adapter, } } } @@ -466,6 +488,7 @@ impl From for FileScanConfigBuilder { table_partition_cols: config.table_partition_cols, constraints: Some(config.constraints), batch_size: config.batch_size, + expr_adapter: config.expr_adapter, } } } @@ -679,6 +702,7 @@ impl FileScanConfig { new_lines_in_values: false, file_source: Arc::clone(&file_source), batch_size: None, + expr_adapter: None, } } diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 03fc77f156d9..845c358d7e58 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -70,7 +70,7 @@ pub use datafusion_physical_expr_common::sort_expr::{ pub use planner::{create_physical_expr, create_physical_exprs}; pub use scalar_function::ScalarFunctionExpr; -pub use schema_rewriter::PhysicalExprSchemaRewriter; +pub use schema_rewriter::DefaultPhysicalExprAdapter; pub use utils::{conjunction, conjunction_opt, split_conjunction}; // For backwards compatibility diff --git a/datafusion/physical-expr/src/schema_rewriter.rs b/datafusion/physical-expr/src/schema_rewriter.rs index b8759ea16d6e..ca3e4ffa20c4 100644 --- a/datafusion/physical-expr/src/schema_rewriter.rs +++ b/datafusion/physical-expr/src/schema_rewriter.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::compute::can_cast_types; -use arrow::datatypes::{FieldRef, Schema}; +use arrow::datatypes::{FieldRef, Schema, SchemaRef}; use datafusion_common::{ exec_err, tree_node::{Transformed, TransformedResult, TreeNode}, @@ -30,69 +30,202 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use crate::expressions::{self, CastExpr, Column}; -/// Builder for rewriting physical expressions to match different schemas. +/// Trait for adapting physical expressions to match a target schema. +/// +/// This is used in file scans to rewrite expressions so that they can be evaluated +/// against the physical schema of the file being scanned. It allows for handling +/// differences between logical and physical schemas, such as type mismatches or missing columns. +/// +/// You can create a custom implemention of this trait to handle specific rewriting logic. 
+/// For example, to fill in missing columns with default values instead of nulls:
+///
+/// ```rust
+/// use datafusion_physical_expr::schema_rewriter::{PhysicalExprAdapter, PhysicalExprAdapterFactory};
+/// use arrow::datatypes::{Schema, Field, DataType, FieldRef, SchemaRef};
+/// use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+/// use datafusion_common::{Result, ScalarValue, tree_node::{Transformed, TransformedResult, TreeNode}};
+/// use datafusion_physical_expr::expressions::{self, Column};
+/// use std::sync::Arc;
+///
+/// #[derive(Debug)]
+/// pub struct CustomPhysicalExprAdapter {
+///     logical_file_schema: SchemaRef,
+///     physical_file_schema: SchemaRef,
+/// }
+///
+/// impl PhysicalExprAdapter for CustomPhysicalExprAdapter {
+///     fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
+///         expr.transform(|expr| {
+///             if let Some(column) = expr.as_any().downcast_ref::<Column>() {
+///                 // Check if the column exists in the physical schema
+///                 if self.physical_file_schema.index_of(column.name()).is_err() {
+///                     // If the column is missing, fill it with a default value instead of null
+///                     // The default value could be stored in the table schema's column metadata for example.
+///                     let default_value = ScalarValue::Int32(Some(0));
+///                     return Ok(Transformed::yes(expressions::lit(default_value)));
+///                 }
+///             }
+///             // If the column exists, return it as is
+///             Ok(Transformed::no(expr))
+///         }).data()
+///     }
+///
+///     fn with_partition_values(
+///         &self,
+///         partition_values: Vec<(FieldRef, ScalarValue)>,
+///     ) -> Arc<dyn PhysicalExprAdapter> {
+///         // For simplicity, this example ignores partition values
+///         Arc::new(CustomPhysicalExprAdapter {
+///             logical_file_schema: self.logical_file_schema.clone(),
+///             physical_file_schema: self.physical_file_schema.clone(),
+///         })
+///     }
+/// }
+///
+/// #[derive(Debug)]
+/// pub struct CustomPhysicalExprAdapterFactory;
+///
+/// impl PhysicalExprAdapterFactory for CustomPhysicalExprAdapterFactory {
+///     fn create(
+///         &self,
+///         logical_file_schema: SchemaRef,
+///         physical_file_schema: SchemaRef,
+///     ) -> Arc<dyn PhysicalExprAdapter> {
+///         Arc::new(CustomPhysicalExprAdapter {
+///             logical_file_schema,
+///             physical_file_schema,
+///         })
+///     }
+/// }
+/// ```
+pub trait PhysicalExprAdapter: Send + Sync + std::fmt::Debug {
+    /// Rewrite a physical expression to match the target schema.
+    ///
+    /// This method should return a transformed expression that matches the target schema.
+    ///
+    /// Arguments:
+    /// - `expr`: The physical expression to rewrite.
+    /// - `logical_file_schema`: The logical schema of the table being queried, excluding any partition columns.
+    /// - `physical_file_schema`: The physical schema of the file being scanned.
+    /// - `partition_values`: Optional partition values to use for rewriting partition column references.
+    ///   These are handled as if they were columns appended onto the logical file schema.
+    ///
+    /// Returns:
+    /// - `Arc<dyn PhysicalExpr>`: The rewritten physical expression that can be evaluated against the physical schema.
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>>;
+
+    fn with_partition_values(
+        &self,
+        partition_values: Vec<(FieldRef, ScalarValue)>,
+    ) -> Arc<dyn PhysicalExprAdapter>;
+}
+
+pub trait PhysicalExprAdapterFactory: Send + Sync + std::fmt::Debug {
+    /// Create a new instance of the physical expression adapter.
+ fn create( + &self, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc; +} + +#[derive(Debug, Clone)] +pub struct DefaultPhysicalExprAdapterFactory; + +impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory { + fn create( + &self, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc { + Arc::new(DefaultPhysicalExprAdapter { + logical_file_schema, + physical_file_schema, + partition_values: Vec::new(), + }) + } +} + +/// Default implementation for rewriting physical expressions to match different schemas. /// /// # Example /// /// ```rust -/// use datafusion_physical_expr::schema_rewriter::PhysicalExprSchemaRewriter; +/// use datafusion_physical_expr::schema_rewriter::{DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory}; /// use arrow::datatypes::Schema; +/// use std::sync::Arc; /// /// # fn example( /// # predicate: std::sync::Arc, /// # physical_file_schema: &Schema, /// # logical_file_schema: &Schema, /// # ) -> datafusion_common::Result<()> { -/// let rewriter = PhysicalExprSchemaRewriter::new(physical_file_schema, logical_file_schema); -/// let adapted_predicate = rewriter.rewrite(predicate)?; +/// let factory = DefaultPhysicalExprAdapterFactory; +/// let adapter = factory.create(Arc::new(logical_file_schema.clone()), Arc::new(physical_file_schema.clone())); +/// let adapted_predicate = adapter.rewrite(predicate)?; /// # Ok(()) /// # } /// ``` -pub struct PhysicalExprSchemaRewriter<'a> { - physical_file_schema: &'a Schema, - logical_file_schema: &'a Schema, - partition_fields: Vec, - partition_values: Vec, +#[derive(Debug, Clone)] +pub struct DefaultPhysicalExprAdapter { + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + partition_values: Vec<(FieldRef, ScalarValue)>, } -impl<'a> PhysicalExprSchemaRewriter<'a> { - /// Create a new schema rewriter with the given schemas - pub fn new( - physical_file_schema: &'a Schema, - logical_file_schema: &'a Schema, - ) -> Self { - Self { - physical_file_schema, - logical_file_schema, - partition_fields: Vec::new(), - partition_values: Vec::new(), - } +// impl PhysicalExprAdapter for DefaultPhysicalExprAdapter { +// /// Rewrite the given physical expression to match the target schema +// /// +// /// This method applies the following transformations: +// /// 1. Replaces partition column references with literal values +// /// 2. Handles missing columns by inserting null literals +// /// 3. 
Casts columns when logical and physical schemas have different types +// fn rewrite_to_file_schema( +// &self, +// expr: Arc, +// logical_file_schema: &Schema, +// physical_file_schema: &Schema, +// partition_values: &[(FieldRef, ScalarValue)], +// ) -> Result> { +// let rewriter = DefaultPhysicalExprAdapterRewriter { +// logical_file_schema, +// physical_file_schema, +// partition_fields: partition_values, +// }; +// expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr))) +// .data() +// } +// } + +impl PhysicalExprAdapter for DefaultPhysicalExprAdapter { + fn rewrite(&self, expr: Arc) -> Result> { + let rewriter = DefaultPhysicalExprAdapterRewriter { + logical_file_schema: &self.logical_file_schema, + physical_file_schema: &self.physical_file_schema, + partition_fields: &self.partition_values, + }; + expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr))) + .data() } - /// Add partition columns and their corresponding values - /// - /// When a column reference matches a partition field, it will be replaced - /// with the corresponding literal value from partition_values. - pub fn with_partition_columns( - mut self, - partition_fields: Vec, - partition_values: Vec, - ) -> Self { - self.partition_fields = partition_fields; - self.partition_values = partition_values; - self + fn with_partition_values( + &self, + partition_values: Vec<(FieldRef, ScalarValue)>, + ) -> Arc { + Arc::new(DefaultPhysicalExprAdapter { + partition_values, + ..self.clone() + }) } +} - /// Rewrite the given physical expression to match the target schema - /// - /// This method applies the following transformations: - /// 1. Replaces partition column references with literal values - /// 2. Handles missing columns by inserting null literals - /// 3. Casts columns when logical and physical schemas have different types - pub fn rewrite(&self, expr: Arc) -> Result> { - expr.transform(|expr| self.rewrite_expr(expr)).data() - } +struct DefaultPhysicalExprAdapterRewriter<'a> { + logical_file_schema: &'a Schema, + physical_file_schema: &'a Schema, + partition_fields: &'a [(FieldRef, ScalarValue)], +} +impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { fn rewrite_expr( &self, expr: Arc, @@ -109,7 +242,7 @@ impl<'a> PhysicalExprSchemaRewriter<'a> { expr: Arc, column: &Column, ) -> Result>> { - // Get the logical field for this column + // Get the logical field for this column if it exists in the logical schema let logical_field = match self.logical_file_schema.field_with_name(column.name()) { Ok(field) => field, @@ -118,10 +251,22 @@ impl<'a> PhysicalExprSchemaRewriter<'a> { if let Some(partition_value) = self.get_partition_value(column.name()) { return Ok(Transformed::yes(expressions::lit(partition_value))); } - // If the column is not found in the logical schema and is not a partition value, return an error - // This should probably never be hit unless something upstream broke, but nontheless it's better - // for us to return a handleable error than to panic / do something unexpected. - return Err(e.into()); + // This can be hit if a custom rewrite injected a reference to a column that doesn't exist in the logical schema. + // For example, a pre-computed column that is kept only in the physical schema. + // If the column exists in the physical schema, we can still use it. + if let Ok(physical_field) = + self.physical_file_schema.field_with_name(column.name()) + { + // If the column exists in the physical schema, we can use it in place of the logical column. 
+ // This is nice to users because if they do a rewrite that results in something like `phyiscal_int32_col = 123u64` + // we'll at least handle the casts for them. + physical_field + } else { + // A completely unknown column that doesn't exist in either schema! + // This should probably never be hit unless something upstream broke, but nontheless it's better + // for us to return a handleable error than to panic / do something unexpected. + return Err(e.into()); + } } }; @@ -190,7 +335,6 @@ impl<'a> PhysicalExprSchemaRewriter<'a> { fn get_partition_value(&self, column_name: &str) -> Option { self.partition_fields .iter() - .zip(self.partition_values.iter()) .find(|(field, _)| field.name() == column_name) .map(|(_, value)| value.clone()) } @@ -229,10 +373,11 @@ mod tests { fn test_rewrite_column_with_type_cast() { let (physical_schema, logical_schema) = create_test_schema(); - let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema); + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); let column_expr = Arc::new(Column::new("a", 0)); - let result = rewriter.rewrite(column_expr).unwrap(); + let result = adapter.rewrite(column_expr).unwrap(); // Should be wrapped in a cast expression assert!(result.as_any().downcast_ref::().is_some()); @@ -241,7 +386,8 @@ mod tests { #[test] fn test_rewrite_mulit_column_expr_with_type_cast() { let (physical_schema, logical_schema) = create_test_schema(); - let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema); + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); // Create a complex expression: (a + 5) OR (c > 0.0) that tests the recursive case of the rewriter let column_a = Arc::new(Column::new("a", 0)) as Arc; @@ -261,7 +407,7 @@ mod tests { )), ); - let result = rewriter.rewrite(Arc::new(expr)).unwrap(); + let result = adapter.rewrite(Arc::new(expr)).unwrap(); println!("Rewritten expression: {result}"); let expected = expressions::BinaryExpr::new( @@ -294,10 +440,11 @@ mod tests { fn test_rewrite_missing_column() -> Result<()> { let (physical_schema, logical_schema) = create_test_schema(); - let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema); + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); let column_expr = Arc::new(Column::new("c", 2)); - let result = rewriter.rewrite(column_expr)?; + let result = adapter.rewrite(column_expr)?; // Should be replaced with a literal null if let Some(literal) = result.as_any().downcast_ref::() { @@ -313,15 +460,17 @@ mod tests { fn test_rewrite_partition_column() -> Result<()> { let (physical_schema, logical_schema) = create_test_schema(); - let partition_fields = - vec![Arc::new(Field::new("partition_col", DataType::Utf8, false))]; - let partition_values = vec![ScalarValue::Utf8(Some("test_value".to_string()))]; + let partition_field = + Arc::new(Field::new("partition_col", DataType::Utf8, false)); + let partition_value = ScalarValue::Utf8(Some("test_value".to_string())); + let partition_values = vec![(partition_field, partition_value)]; - let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema) - .with_partition_columns(partition_fields, partition_values); + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), 
Arc::new(physical_schema)); + let adapter = adapter.with_partition_values(partition_values); let column_expr = Arc::new(Column::new("partition_col", 0)); - let result = rewriter.rewrite(column_expr)?; + let result = adapter.rewrite(column_expr)?; // Should be replaced with the partition value if let Some(literal) = result.as_any().downcast_ref::() { @@ -340,10 +489,11 @@ mod tests { fn test_rewrite_no_change_needed() -> Result<()> { let (physical_schema, logical_schema) = create_test_schema(); - let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema); + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); let column_expr = Arc::new(Column::new("b", 1)) as Arc; - let result = rewriter.rewrite(Arc::clone(&column_expr))?; + let result = adapter.rewrite(Arc::clone(&column_expr))?; // Should be the same expression (no transformation needed) // We compare the underlying pointer through the trait object @@ -363,10 +513,11 @@ mod tests { Field::new("b", DataType::Utf8, false), // Non-nullable missing column ]); - let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema); + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema)); let column_expr = Arc::new(Column::new("b", 1)); - let result = rewriter.rewrite(column_expr); + let result = adapter.rewrite(column_expr); assert!(result.is_err()); assert!(result .unwrap_err() @@ -398,7 +549,7 @@ mod tests { } } - /// Example showing how we can use the `PhysicalExprSchemaRewriter` to adapt RecordBatches during a scan + /// Example showing how we can use the `DefaultPhysicalExprAdapter` to adapt RecordBatches during a scan /// to apply projections, type conversions and handling of missing columns all at once. #[test] fn test_adapt_batches() { @@ -420,11 +571,13 @@ mod tests { col("a", &logical_schema).unwrap(), ]; - let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema); + let factory = DefaultPhysicalExprAdapterFactory; + let adapter = + factory.create(Arc::clone(&logical_schema), Arc::clone(&physical_schema)); let adapted_projection = projection .into_iter() - .map(|expr| rewriter.rewrite(expr).unwrap()) + .map(|expr| adapter.rewrite(expr).unwrap()) .collect_vec(); let adapted_schema = Arc::new(Schema::new(