From 4d59f8799a551f04189a4a85ff8fe2d64a494b3b Mon Sep 17 00:00:00 2001 From: Florian Valeye Date: Wed, 13 Aug 2025 15:11:42 +0200 Subject: [PATCH 1/7] feat(datafusion): implement the project node to add the partition columns defined in Iceberg. Implement physical execution plan node that projects Iceberg partition columns from source data, supporting nested fields and all Iceberg transforms. --- .../datafusion/src/physical_plan/mod.rs | 1 + .../datafusion/src/physical_plan/project.rs | 661 ++++++++++++++++++ 2 files changed, 662 insertions(+) create mode 100644 crates/integrations/datafusion/src/physical_plan/project.rs diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index fcfd11a453..aa3535bd8a 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -18,6 +18,7 @@ pub(crate) mod commit; pub(crate) mod expr_to_predicate; pub(crate) mod metadata_scan; +pub(crate) mod project; pub(crate) mod scan; pub(crate) mod write; diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs new file mode 100644 index 0000000000..25b0f1a0e4 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -0,0 +1,661 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use datafusion::arrow::array::{ArrayRef, RecordBatch}; +use datafusion::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use datafusion::common::Result as DFResult; +use datafusion::error::DataFusionError; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + execute_input_stream, +}; +use futures::StreamExt; +use iceberg::spec::{PartitionSpec, Schema}; + +use crate::to_datafusion_error; + +/// Prefix for partition column names to avoid collisions with regular columns +const PARTITION_COLUMN_PREFIX: &str = "__partition_"; + +/// An execution plan node that calculates partition values for Iceberg tables. +/// +/// This execution plan takes input data from a child execution plan and adds partition columns +/// based on the table's partition specification. The partition values are computed by applying +/// the appropriate transforms to the source columns. 
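+///
+/// As an illustration (the column names and aliases below are hypothetical), a spec
+/// with `identity(region)` aliased as `region` and `day(ts)` aliased as `ts_day`
+/// turns an input batch with columns `id | region | ts` into
+/// `id | region | ts | __partition_region | __partition_ts_day`, where each appended
+/// column holds the transform result computed row by row from its source column.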
+/// +/// The output schema includes all original columns plus additional partition columns. +#[derive(Debug, Clone)] +pub(crate) struct IcebergProjectExec { + input: Arc, + partition_spec: Arc, + table_schema: Arc, + output_schema: ArrowSchemaRef, + plan_properties: PlanProperties, +} + +/// IcebergProjectExec is responsible for calculating partition values for Iceberg tables. +/// It takes input data from a child execution plan and adds partition columns based on the table's +/// partition specification. The partition values are computed by applying the appropriate transforms +/// to the source columns. The output schema includes all original columns plus additional partition +/// columns. +impl IcebergProjectExec { + pub fn new( + input: Arc, + partition_spec: Arc, + table_schema: Arc, + ) -> DFResult { + let output_schema = + Self::create_output_schema(&input.schema(), &partition_spec, &table_schema)?; + let plan_properties = Self::compute_properties(&input, output_schema.clone()); + + Ok(Self { + input, + partition_spec, + table_schema, + output_schema, + plan_properties, + }) + } + + /// Compute the plan properties for this execution plan + fn compute_properties( + input: &Arc, + schema: ArrowSchemaRef, + ) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + input.output_partitioning().clone(), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } + + /// Create the output schema by adding partition columns to the input schema + fn create_output_schema( + input_schema: &ArrowSchema, + partition_spec: &PartitionSpec, + table_schema: &Schema, + ) -> DFResult { + if partition_spec.is_unpartitioned() { + return Ok(Arc::new(input_schema.clone())); + } + + let mut fields: Vec> = input_schema.fields().to_vec(); + + let partition_struct = partition_spec + .partition_type(table_schema) + .map_err(to_datafusion_error)?; + + for (idx, pf) in partition_spec.fields().iter().enumerate() { + let struct_field = partition_struct.fields().get(idx).ok_or_else(|| { + DataFusionError::Internal( + "Partition field index out of bounds when creating output schema".to_string(), + ) + })?; + let arrow_type = iceberg::arrow::type_to_arrow_type(&struct_field.field_type) + .map_err(to_datafusion_error)?; + let partition_column_name = Self::create_partition_column_name(&pf.name); + let nullable = !struct_field.required; + fields.push(Arc::new(Field::new( + &partition_column_name, + arrow_type, + nullable, + ))); + } + Ok(Arc::new(ArrowSchema::new(fields))) + } + + /// Calculate partition values for a record batch + fn calculate_partition_values(&self, batch: &RecordBatch) -> DFResult> { + if self.partition_spec.is_unpartitioned() { + return Ok(vec![]); + } + + let batch_schema = batch.schema(); + let mut partition_values = Vec::with_capacity(self.partition_spec.fields().len()); + + for pf in self.partition_spec.fields() { + // Find the source field in the table schema + let source_field = self.table_schema.field_by_id(pf.source_id).ok_or_else(|| { + DataFusionError::Internal(format!( + "Source field not found with id {} when calculating partition values", + pf.source_id + )) + })?; + + let field_path = Self::find_field_path(&self.table_schema, source_field.id)?; + let index_path = Self::resolve_arrow_index_path(batch_schema.as_ref(), &field_path)?; + + let source_column = Self::extract_column_by_index_path(batch, &index_path)?; + + let transform_fn = iceberg::transform::create_transform_function(&pf.transform) + .map_err(to_datafusion_error)?; + let partition_value = 
transform_fn + .transform(source_column) + .map_err(to_datafusion_error)?; + + partition_values.push(partition_value); + } + Ok(partition_values) + } + + /// Extract a column by an index path + fn extract_column_by_index_path( + batch: &RecordBatch, + index_path: &[usize], + ) -> DFResult { + if index_path.is_empty() { + return Err(DataFusionError::Internal( + "Empty index path when extracting partition column".to_string(), + )); + } + + let mut current_column = batch.column(*index_path.first().unwrap()).clone(); + for child_index in &index_path[1..] { + // We only support traversing nested Structs. Provide explicit errors for unsupported + // nested container types to fail early and clearly. + let dt = current_column.data_type(); + match dt { + datafusion::arrow::datatypes::DataType::Struct(_) => { + let struct_array = current_column + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Failed to downcast to StructArray while traversing index path {:?} for partition column extraction", + index_path + )) + })?; + current_column = struct_array.column(*child_index).clone(); + } + datafusion::arrow::datatypes::DataType::List(_) + | datafusion::arrow::datatypes::DataType::LargeList(_) + | datafusion::arrow::datatypes::DataType::FixedSizeList(_, _) + | datafusion::arrow::datatypes::DataType::Map(_, _) => { + return Err(DataFusionError::NotImplemented(format!( + "Partitioning on nested list/map types is not supported (encountered {:?}) while traversing index path {:?}", + dt, index_path + ))); + } + other => { + return Err(DataFusionError::Internal(format!( + "Expected struct array while traversing index path {:?} for partition column, got {:?}", + index_path, other + ))); + } + } + } + Ok(current_column) + } + + /// Find the path to a field by its ID (e.g., ["address", "city"]) in the Iceberg schema + fn find_field_path(table_schema: &Schema, field_id: i32) -> DFResult> { + let dotted = table_schema.name_by_field_id(field_id).ok_or_else(|| { + DataFusionError::Internal(format!( + "Field with ID {} not found in schema when building field path for partition column", + field_id + )) + })?; + Ok(dotted.split('.').map(|s| s.to_string()).collect()) + } + + fn resolve_arrow_index_path( + input_schema: &ArrowSchema, + field_path: &[String], + ) -> DFResult> { + if field_path.is_empty() { + return Err(DataFusionError::Internal( + "Empty field path when resolving arrow index path for partition column".to_string(), + )); + } + + let mut indices = Vec::with_capacity(field_path.len()); + let mut current_field = input_schema.field_with_name(&field_path[0]).map_err(|_| { + DataFusionError::Internal(format!( + "Top-level column '{}' not found in schema when resolving partition column path", + field_path[0] + )) + })?; + let top_index = input_schema.index_of(&field_path[0]).map_err(|_| { + DataFusionError::Internal(format!( + "Failed to get index of top-level column '{}' when resolving partition column path", + field_path[0] + )) + })?; + indices.push(top_index); + + for name in &field_path[1..] 
{ + let dt = current_field.data_type(); + let struct_fields = match dt { + datafusion::arrow::datatypes::DataType::Struct(fields) => fields, + datafusion::arrow::datatypes::DataType::List(_) + | datafusion::arrow::datatypes::DataType::LargeList(_) + | datafusion::arrow::datatypes::DataType::FixedSizeList(_, _) => { + return Err(DataFusionError::NotImplemented(format!( + "Partitioning on nested list types is not supported while resolving nested field '{}' for partition column", + name + ))); + } + datafusion::arrow::datatypes::DataType::Map(_, _) => { + return Err(DataFusionError::NotImplemented(format!( + "Partitioning on nested map types is not supported while resolving nested field '{}' for partition column", + name + ))); + } + other => { + return Err(DataFusionError::Internal(format!( + "Expected struct type while resolving nested field '{}' for partition column, got {:?}", + name, other + ))); + } + }; + let child_index = struct_fields + .iter() + .position(|f| f.name() == name) + .ok_or_else(|| { + DataFusionError::Internal(format!( + "Field '{}' not found in struct when resolving partition column path", + name + )) + })?; + indices.push(child_index); + current_field = &struct_fields[child_index]; + } + + Ok(indices) + } + + /// Apply a naming convention for partition columns using spec alias, prefixed to avoid collisions + fn create_partition_column_name(partition_field_alias: &str) -> String { + format!("{}{}", PARTITION_COLUMN_PREFIX, partition_field_alias) + } + + /// Process a single batch by adding partition columns + fn process_batch(&self, batch: RecordBatch) -> DFResult { + if self.partition_spec.is_unpartitioned() { + return Ok(batch); + } + + let partition_arrays = self.calculate_partition_values(&batch)?; + + let mut all_columns = batch.columns().to_vec(); + all_columns.extend(partition_arrays); + + RecordBatch::try_new(Arc::clone(&self.output_schema), all_columns) + .map_err(|e| DataFusionError::ArrowError(e, None)) + } +} + +impl DisplayAs for IcebergProjectExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!( + f, + "IcebergProjectExec: partition_fields=[{}]", + self.partition_spec + .fields() + .iter() + .map(|pf| format!("{}({})", pf.transform, pf.name)) + .collect::>() + .join(", ") + ) + } + DisplayFormatType::Verbose => { + write!( + f, + "IcebergProjectExec: partition_fields=[{}], output_schema={:?}", + self.partition_spec + .fields() + .iter() + .map(|pf| format!("{}({})", pf.transform, pf.name)) + .collect::>() + .join(", "), + self.output_schema + ) + } + DisplayFormatType::TreeRender => { + write!( + f, + "IcebergProjectExec: partition_fields=[{}]", + self.partition_spec + .fields() + .iter() + .map(|pf| format!("{}({})", pf.transform, pf.name)) + .collect::>() + .join(", ") + ) + } + } + } +} + +impl ExecutionPlan for IcebergProjectExec { + fn name(&self) -> &str { + "IcebergProjectExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DFResult> { + if children.len() != 1 { + return Err(DataFusionError::Internal(format!( + "IcebergProjectExec expects exactly one child, but provided {}", + children.len() + ))); + } + + Ok(Arc::new(Self::new( + Arc::clone(&children[0]), + Arc::clone(&self.partition_spec), + Arc::clone(&self.table_schema), + )?)) + } + + /// Executes the 
partition value calculation for the given partition. + /// + /// This processes input data from the child execution plan and adds calculated + /// partition columns based on the partition specification. + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + // Get input data stream + let input_stream = execute_input_stream( + Arc::clone(&self.input), + self.input.schema(), + partition, + Arc::clone(&context), + )?; + + if self.partition_spec.is_unpartitioned() { + return Ok(input_stream); + } + + let output_schema = Arc::clone(&self.output_schema); + let project_exec = Arc::new(self.clone()); + + let stream = input_stream.map(move |batch_result| { + let batch = batch_result?; + + project_exec.process_batch(batch) + }); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + output_schema, + stream.boxed(), + ))) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use datafusion::arrow::array::{Int32Array, RecordBatch, StructArray}; + use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema as ArrowSchema}; + use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Transform, Type}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + + use super::*; + + #[test] + fn test_create_output_schema_unpartitioned() { + // Create test schema + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ]); + + let table_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + // Create unpartitioned spec + let partition_spec = iceberg::spec::PartitionSpec::unpartition_spec(); + + // Test schema creation + let output_schema = + IcebergProjectExec::create_output_schema(&arrow_schema, &partition_spec, &table_schema) + .unwrap(); + + // For now, should be identical to input schema (pass-through) + assert_eq!(output_schema.fields().len(), 2); + assert_eq!(output_schema.field(0).name(), "id"); + assert_eq!(output_schema.field(1).name(), "name"); + } + + #[test] + fn test_create_output_schema_partitioned() { + // Create test schema + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ]); + + let table_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + // Create partitioned spec + let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone())) + .add_partition_field("id", "id_partition", Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // Test schema creation + let output_schema = + IcebergProjectExec::create_output_schema(&arrow_schema, &partition_spec, &table_schema) + .unwrap(); + + // Should have 3 fields: original 2 + 1 partition field + assert_eq!(output_schema.fields().len(), 3); + assert_eq!(output_schema.field(0).name(), "id"); + assert_eq!(output_schema.field(1).name(), "name"); + assert_eq!(output_schema.field(2).name(), 
"__partition_id_partition"); + } + + #[test] + fn test_partition_on_struct_field() { + // Test partitioning on a nested field within a struct (e.g., address.city) + let struct_fields = Fields::from(vec![ + Field::new("street", DataType::Utf8, false), + Field::new("city", DataType::Utf8, false), + ]); + + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("address", DataType::Struct(struct_fields.clone()), false), + ]); + + // Create Iceberg schema with struct type and nested field IDs + let address_struct = StructType::new(vec![ + NestedField::required(3, "street", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(4, "city", Type::Primitive(PrimitiveType::String)).into(), + ]); + + let table_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "address", Type::Struct(address_struct)).into(), + ]) + .build() + .unwrap(); + + // Create partitioned spec on the nested city field using dot notation + let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone())) + .add_partition_field("address.city", "city_partition", Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // Test schema creation - should add partition column for the nested field + let output_schema = + IcebergProjectExec::create_output_schema(&arrow_schema, &partition_spec, &table_schema) + .unwrap(); + + // Should have 3 fields: id, address, and partition field for city + assert_eq!(output_schema.fields().len(), 3); + assert_eq!(output_schema.field(0).name(), "id"); + assert_eq!(output_schema.field(1).name(), "address"); + assert_eq!(output_schema.field(2).name(), "__partition_city_partition"); + } + + #[test] + fn test_process_batch_with_nested_struct_partition() { + // Test processing actual data with partitioning on nested struct field + let struct_fields = Fields::from(vec![ + Field::new("street", DataType::Utf8, false), + Field::new("city", DataType::Utf8, false), + ]); + + let arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("address", DataType::Struct(struct_fields.clone()), false), + ]); + + // Create Iceberg schema with struct type and nested field IDs + let address_struct = StructType::new(vec![ + NestedField::required(3, "street", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(4, "city", Type::Primitive(PrimitiveType::String)).into(), + ]); + + let table_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "address", Type::Struct(address_struct)).into(), + ]) + .build() + .unwrap(); + + // Create partitioned spec on the nested city field using dot notation + let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone())) + .add_partition_field("address.city", "city_partition", Transform::Identity) + .unwrap() + .build() + .unwrap(); + + // Create test data + let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])); + + // Create struct array for addresses + let street_array = Arc::new(datafusion::arrow::array::StringArray::from(vec![ + "123 Main St", + "456 Oak Ave", + "789 Pine Rd", + ])); + let city_array = Arc::new(datafusion::arrow::array::StringArray::from(vec![ + "New York", + "Los Angeles", + "Chicago", + ])); + + let struct_array = StructArray::from(vec![ + ( + 
Arc::new(Field::new("street", DataType::Utf8, false)), + street_array as datafusion::arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("city", DataType::Utf8, false)), + city_array as datafusion::arrow::array::ArrayRef, + ), + ]); + + let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ + id_array, + Arc::new(struct_array), + ]) + .unwrap(); + + // Create project exec + let project_exec = IcebergProjectExec::new( + Arc::new(datafusion::physical_plan::empty::EmptyExec::new(Arc::new( + arrow_schema, + ))), + Arc::new(partition_spec), + Arc::new(table_schema), + ) + .unwrap(); + + // Test processing the batch - this should extract city values from the struct + let result_batch = project_exec.process_batch(batch).unwrap(); + + // Verify the result + assert_eq!(result_batch.num_columns(), 3); // id, address, partition + assert_eq!(result_batch.num_rows(), 3); + + // Verify column names + assert_eq!(result_batch.schema().field(0).name(), "id"); + assert_eq!(result_batch.schema().field(1).name(), "address"); + assert_eq!( + result_batch.schema().field(2).name(), + "__partition_city_partition" + ); + + // Verify that the partition column contains the city values extracted from the struct + let partition_column = result_batch.column(2); + let partition_array = partition_column + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(partition_array.value(0), "New York"); + assert_eq!(partition_array.value(1), "Los Angeles"); + assert_eq!(partition_array.value(2), "Chicago"); + } +} From 803199a86885b23932614c39ba70a182cdbf9ed0 Mon Sep 17 00:00:00 2001 From: Florian Valeye Date: Fri, 22 Aug 2025 13:55:26 +0200 Subject: [PATCH 2/7] feat(datafusion): adapt IcebergProjectExec to use one partition column containing all the partitions values --- .../datafusion/src/physical_plan/project.rs | 135 +++++++++++------- 1 file changed, 83 insertions(+), 52 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs index 25b0f1a0e4..0ae2aaa99c 100644 --- a/crates/integrations/datafusion/src/physical_plan/project.rs +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -19,8 +19,10 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use datafusion::arrow::array::{ArrayRef, RecordBatch}; -use datafusion::arrow::datatypes::{Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use datafusion::arrow::array::{ArrayRef, RecordBatch, StructArray}; +use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, +}; use datafusion::common::Result as DFResult; use datafusion::error::DataFusionError; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; @@ -36,16 +38,16 @@ use iceberg::spec::{PartitionSpec, Schema}; use crate::to_datafusion_error; -/// Prefix for partition column names to avoid collisions with regular columns -const PARTITION_COLUMN_PREFIX: &str = "__partition_"; +/// Column name for the combined partition values struct +const PARTITION_VALUES_COLUMN: &str = "_iceberg_partition_values"; /// An execution plan node that calculates partition values for Iceberg tables. /// -/// This execution plan takes input data from a child execution plan and adds partition columns -/// based on the table's partition specification. The partition values are computed by applying -/// the appropriate transforms to the source columns. 
+/// This execution plan takes input data from a child execution plan and adds a single struct column +/// containing all partition values based on the table's partition specification. The partition values +/// are computed by applying the appropriate transforms to the source columns. /// -/// The output schema includes all original columns plus additional partition columns. +/// The output schema includes all original columns plus a single `_iceberg_partition_values` struct column. #[derive(Debug, Clone)] pub(crate) struct IcebergProjectExec { input: Arc, @@ -56,10 +58,12 @@ pub(crate) struct IcebergProjectExec { } /// IcebergProjectExec is responsible for calculating partition values for Iceberg tables. -/// It takes input data from a child execution plan and adds partition columns based on the table's -/// partition specification. The partition values are computed by applying the appropriate transforms -/// to the source columns. The output schema includes all original columns plus additional partition -/// columns. +/// It takes input data from a child execution plan and adds a single struct column containing +/// all partition values based on the table's partition specification. The partition values are +/// computed by applying the appropriate transforms to the source columns. The output schema +/// includes all original columns plus a single `_iceberg_partition_values` struct column. +/// This approach simplifies downstream repartitioning operations by providing a single column +/// that can be directly used for sorting and repartitioning. impl IcebergProjectExec { pub fn new( input: Arc, @@ -92,7 +96,7 @@ impl IcebergProjectExec { ) } - /// Create the output schema by adding partition columns to the input schema + /// Create the output schema by adding a single partition values struct column to the input schema fn create_output_schema( input_schema: &ArrowSchema, partition_spec: &PartitionSpec, @@ -104,38 +108,51 @@ impl IcebergProjectExec { let mut fields: Vec> = input_schema.fields().to_vec(); - let partition_struct = partition_spec + let partition_struct_type = partition_spec .partition_type(table_schema) .map_err(to_datafusion_error)?; - for (idx, pf) in partition_spec.fields().iter().enumerate() { - let struct_field = partition_struct.fields().get(idx).ok_or_else(|| { - DataFusionError::Internal( - "Partition field index out of bounds when creating output schema".to_string(), - ) - })?; - let arrow_type = iceberg::arrow::type_to_arrow_type(&struct_field.field_type) + // Convert the Iceberg struct type to Arrow struct type + let arrow_struct_type = + iceberg::arrow::type_to_arrow_type(&iceberg::spec::Type::Struct(partition_struct_type)) .map_err(to_datafusion_error)?; - let partition_column_name = Self::create_partition_column_name(&pf.name); - let nullable = !struct_field.required; - fields.push(Arc::new(Field::new( - &partition_column_name, - arrow_type, - nullable, - ))); - } + + // Add a single struct column containing all partition values + fields.push(Arc::new(Field::new( + PARTITION_VALUES_COLUMN, + arrow_struct_type, + false, // Partition values are generally not null + ))); + Ok(Arc::new(ArrowSchema::new(fields))) } - /// Calculate partition values for a record batch - fn calculate_partition_values(&self, batch: &RecordBatch) -> DFResult> { + /// Calculate partition values for a record batch and return as a single struct array + fn calculate_partition_values(&self, batch: &RecordBatch) -> DFResult> { if self.partition_spec.is_unpartitioned() { - return Ok(vec![]); + 
return Ok(None); } let batch_schema = batch.schema(); let mut partition_values = Vec::with_capacity(self.partition_spec.fields().len()); + // Get the expected struct fields from our output schema + let partition_column_field = self + .output_schema + .field_with_name(PARTITION_VALUES_COLUMN) + .map_err(|e| { + DataFusionError::Internal(format!("Partition column not found in schema: {}", e)) + })?; + + let expected_struct_fields = match partition_column_field.data_type() { + DataType::Struct(fields) => fields.clone(), + _ => { + return Err(DataFusionError::Internal( + "Partition column is not a struct type".to_string(), + )); + } + }; + for pf in self.partition_spec.fields() { // Find the source field in the table schema let source_field = self.table_schema.field_by_id(pf.source_id).ok_or_else(|| { @@ -158,7 +175,16 @@ impl IcebergProjectExec { partition_values.push(partition_value); } - Ok(partition_values) + + // Create struct array using the expected fields from the schema + let struct_array = StructArray::try_new( + expected_struct_fields, + partition_values, + None, // No null buffer for the struct array itself + ) + .map_err(|e| DataFusionError::ArrowError(e, None))?; + + Ok(Some(Arc::new(struct_array))) } /// Extract a column by an index path @@ -287,21 +313,18 @@ impl IcebergProjectExec { Ok(indices) } - /// Apply a naming convention for partition columns using spec alias, prefixed to avoid collisions - fn create_partition_column_name(partition_field_alias: &str) -> String { - format!("{}{}", PARTITION_COLUMN_PREFIX, partition_field_alias) - } - - /// Process a single batch by adding partition columns + /// Process a single batch by adding a partition values struct column fn process_batch(&self, batch: RecordBatch) -> DFResult { if self.partition_spec.is_unpartitioned() { return Ok(batch); } - let partition_arrays = self.calculate_partition_values(&batch)?; + let partition_array = self.calculate_partition_values(&batch)?; let mut all_columns = batch.columns().to_vec(); - all_columns.extend(partition_arrays); + if let Some(partition_array) = partition_array { + all_columns.push(partition_array); + } RecordBatch::try_new(Arc::clone(&self.output_schema), all_columns) .map_err(|e| DataFusionError::ArrowError(e, None)) @@ -501,11 +524,11 @@ mod tests { IcebergProjectExec::create_output_schema(&arrow_schema, &partition_spec, &table_schema) .unwrap(); - // Should have 3 fields: original 2 + 1 partition field + // Should have 3 fields: original 2 + 1 partition values struct assert_eq!(output_schema.fields().len(), 3); assert_eq!(output_schema.field(0).name(), "id"); assert_eq!(output_schema.field(1).name(), "name"); - assert_eq!(output_schema.field(2).name(), "__partition_id_partition"); + assert_eq!(output_schema.field(2).name(), "_iceberg_partition_values"); } #[test] @@ -548,11 +571,11 @@ mod tests { IcebergProjectExec::create_output_schema(&arrow_schema, &partition_spec, &table_schema) .unwrap(); - // Should have 3 fields: id, address, and partition field for city + // Should have 3 fields: id, address, and partition values struct assert_eq!(output_schema.fields().len(), 3); assert_eq!(output_schema.field(0).name(), "id"); assert_eq!(output_schema.field(1).name(), "address"); - assert_eq!(output_schema.field(2).name(), "__partition_city_partition"); + assert_eq!(output_schema.field(2).name(), "_iceberg_partition_values"); } #[test] @@ -636,7 +659,7 @@ mod tests { let result_batch = project_exec.process_batch(batch).unwrap(); // Verify the result - assert_eq!(result_batch.num_columns(), 
3); // id, address, partition + assert_eq!(result_batch.num_columns(), 3); // id, address, partition_values assert_eq!(result_batch.num_rows(), 3); // Verify column names @@ -644,18 +667,26 @@ mod tests { assert_eq!(result_batch.schema().field(1).name(), "address"); assert_eq!( result_batch.schema().field(2).name(), - "__partition_city_partition" + "_iceberg_partition_values" ); - // Verify that the partition column contains the city values extracted from the struct + // Verify that the partition values struct contains the city values extracted from the struct let partition_column = result_batch.column(2); - let partition_array = partition_column + let partition_struct_array = partition_column + .as_any() + .downcast_ref::() + .unwrap(); + + // Get the city_partition field from the struct + let city_partition_array = partition_struct_array + .column_by_name("city_partition") + .unwrap() .as_any() .downcast_ref::() .unwrap(); - assert_eq!(partition_array.value(0), "New York"); - assert_eq!(partition_array.value(1), "Los Angeles"); - assert_eq!(partition_array.value(2), "Chicago"); + assert_eq!(city_partition_array.value(0), "New York"); + assert_eq!(city_partition_array.value(1), "Los Angeles"); + assert_eq!(city_partition_array.value(2), "Chicago"); } } From 531388f6e0807b87cf024308d975daa136e5be95 Mon Sep 17 00:00:00 2001 From: Florian Valeye Date: Wed, 24 Sep 2025 15:29:41 +0200 Subject: [PATCH 3/7] feat(datafusion): add the project_with_partition main entrypoint and use PhysicalExpr for partitions values calculation. Signed-off-by: Florian Valeye --- .../datafusion/src/physical_plan/mod.rs | 1 + .../datafusion/src/physical_plan/project.rs | 483 ++++++++++-------- 2 files changed, 266 insertions(+), 218 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index aa3535bd8a..ce923b8662 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -24,4 +24,5 @@ pub(crate) mod write; pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; +pub use project::project_with_partition; pub use scan::IcebergTableScan; diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs index b357c88811..e5e3df61bd 100644 --- a/crates/integrations/datafusion/src/physical_plan/project.rs +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -15,125 +15,203 @@ // specific language governing permissions and limitations // under the License. -//! Utilities for calculating partition values for Iceberg tables. -//! -//! This module provides functions to calculate partition values from record batches -//! based on Iceberg partition specifications. These utilities are used when writing -//! data to partitioned Iceberg tables. +//! Partition value projection for Iceberg tables. 
use std::sync::Arc; use datafusion::arrow::array::{ArrayRef, RecordBatch, StructArray}; -use datafusion::arrow::datatypes::{ - DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, -}; +use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema}; use datafusion::common::Result as DFResult; use datafusion::error::DataFusionError; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_plan::projection::ProjectionExec; +use datafusion::physical_plan::{ColumnarValue, ExecutionPlan}; use iceberg::spec::{PartitionSpec, Schema}; +use iceberg::table::Table; use crate::to_datafusion_error; /// Column name for the combined partition values struct -#[allow(dead_code)] -pub(crate) const PARTITION_VALUES_COLUMN: &str = "_iceberg_partition_values"; +const PARTITION_VALUES_COLUMN: &str = "_partition"; -/// Create an output schema by adding a single partition values struct column to the input schema. -/// Returns the original schema unchanged if the table is unpartitioned. +/// Extends an ExecutionPlan with partition value calculations for Iceberg tables. +/// +/// This function takes an input ExecutionPlan and extends it with an additional column +/// containing calculated partition values based on the table's partition specification. +/// For unpartitioned tables, returns the original plan unchanged. +/// +/// # Arguments +/// * `input` - The input ExecutionPlan to extend +/// * `table` - The Iceberg table with partition specification +/// +/// # Returns +/// * `Ok(Arc)` - Extended plan with partition values column +/// * `Err` - If partition spec is not found or transformation fails #[allow(dead_code)] -pub(crate) fn create_schema_with_partition_columns( - input_schema: &ArrowSchema, - partition_spec: &PartitionSpec, - table_schema: &Schema, -) -> DFResult { +pub fn project_with_partition( + input: Arc, + table: &Table, +) -> DFResult> { + let metadata = table.metadata(); + let partition_spec = metadata + .partition_spec_by_id(metadata.default_partition_spec_id()) + .ok_or_else(|| DataFusionError::Internal("Default partition spec not found".to_string()))?; + let table_schema = metadata.current_schema(); + if partition_spec.is_unpartitioned() { - return Ok(Arc::new(input_schema.clone())); + return Ok(input); } - let mut fields: Vec> = input_schema.fields().to_vec(); + let input_schema = input.schema(); + let partition_type = build_partition_type(partition_spec, table_schema.as_ref())?; + let calculator = PartitionValueCalculator::new( + partition_spec.as_ref().clone(), + table_schema.as_ref().clone(), + partition_type, + ); - let partition_struct_type = partition_spec - .partition_type(table_schema) - .map_err(to_datafusion_error)?; + let mut projection_exprs: Vec<(Arc, String)> = Vec::new(); - let arrow_struct_type = - iceberg::arrow::type_to_arrow_type(&iceberg::spec::Type::Struct(partition_struct_type)) - .map_err(to_datafusion_error)?; + for (index, field) in input_schema.fields().iter().enumerate() { + let column_expr = Arc::new(Column::new(field.name(), index)); + projection_exprs.push((column_expr, field.name().clone())); + } - fields.push(Arc::new(Field::new( - PARTITION_VALUES_COLUMN, - arrow_struct_type, - false, // Partition values are generally not null - ))); + let partition_expr = Arc::new(PartitionExpr::new(calculator)); + projection_exprs.push((partition_expr, PARTITION_VALUES_COLUMN.to_string())); - Ok(Arc::new(ArrowSchema::new(fields))) + let projection = 
ProjectionExec::try_new(projection_exprs, input)?; + Ok(Arc::new(projection)) } -/// Calculate partition values for a record batch and return as a single struct array. -/// Returns None if the table is unpartitioned. -/// -/// # Arguments -/// * `batch` - The record batch to calculate partition values for -/// * `partition_spec` - The partition specification defining the partition fields -/// * `table_schema` - The Iceberg table schema -/// * `expected_partition_type` - The expected Arrow struct type for the partition values -#[allow(dead_code)] -pub(crate) fn calculate_partition_values( - batch: &RecordBatch, - partition_spec: &PartitionSpec, - table_schema: &Schema, - expected_partition_type: &DataType, -) -> DFResult> { - if partition_spec.is_unpartitioned() { - return Ok(None); +/// PhysicalExpr implementation for partition value calculation +#[derive(Debug, Clone, PartialEq, Eq)] +struct PartitionExpr { + calculator: PartitionValueCalculator, +} + +impl PartitionExpr { + fn new(calculator: PartitionValueCalculator) -> Self { + Self { calculator } + } +} + +impl PhysicalExpr for PartitionExpr { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn data_type(&self, _input_schema: &ArrowSchema) -> DFResult { + Ok(self.calculator.partition_type.clone()) + } + + fn nullable(&self, _input_schema: &ArrowSchema) -> DFResult { + Ok(false) + } + + fn evaluate(&self, batch: &RecordBatch) -> DFResult { + let array = self.calculator.calculate(batch)?; + Ok(ColumnarValue::Array(array)) + } + + fn children(&self) -> Vec<&Arc> { + vec![] } - let batch_schema = batch.schema(); - let mut partition_values = Vec::with_capacity(partition_spec.fields().len()); + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "partition_values") + } +} + +impl std::fmt::Display for PartitionExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "partition_values") + } +} + +impl std::hash::Hash for PartitionExpr { + fn hash(&self, state: &mut H) { + std::any::TypeId::of::().hash(state); + } +} + +/// Calculator for partition values in Iceberg tables +#[derive(Debug, Clone, PartialEq, Eq)] +struct PartitionValueCalculator { + partition_spec: PartitionSpec, + table_schema: Schema, + partition_type: DataType, +} - let expected_struct_fields = match expected_partition_type { - DataType::Struct(fields) => fields.clone(), - _ => { +impl PartitionValueCalculator { + fn new(partition_spec: PartitionSpec, table_schema: Schema, partition_type: DataType) -> Self { + Self { + partition_spec, + table_schema, + partition_type, + } + } + + fn calculate(&self, batch: &RecordBatch) -> DFResult { + if self.partition_spec.is_unpartitioned() { return Err(DataFusionError::Internal( - "Expected partition type must be a struct".to_string(), + "Cannot calculate partition values for unpartitioned table".to_string(), )); } - }; - for pf in partition_spec.fields() { - let source_field = table_schema.field_by_id(pf.source_id).ok_or_else(|| { - DataFusionError::Internal(format!( - "Source field not found with id {} when calculating partition values", - pf.source_id - )) - })?; + let batch_schema = batch.schema(); + let mut partition_values = Vec::with_capacity(self.partition_spec.fields().len()); + + let expected_struct_fields = match &self.partition_type { + DataType::Struct(fields) => fields.clone(), + _ => { + return Err(DataFusionError::Internal( + "Expected partition type must 
be a struct".to_string(), + )); + } + }; + + for pf in self.partition_spec.fields() { + let source_field = self.table_schema.field_by_id(pf.source_id).ok_or_else(|| { + DataFusionError::Internal(format!( + "Source field not found with id {} when calculating partition values", + pf.source_id + )) + })?; - let field_path = find_field_path(table_schema, source_field.id)?; - let index_path = resolve_arrow_index_path(batch_schema.as_ref(), &field_path)?; + let field_path = find_field_path(&self.table_schema, source_field.id)?; + let index_path = resolve_arrow_index_path(batch_schema.as_ref(), &field_path)?; - let source_column = extract_column_by_index_path(batch, &index_path)?; + let source_column = extract_column_by_index_path(batch, &index_path)?; - let transform_fn = iceberg::transform::create_transform_function(&pf.transform) - .map_err(to_datafusion_error)?; - let partition_value = transform_fn - .transform(source_column) - .map_err(to_datafusion_error)?; + let transform_fn = iceberg::transform::create_transform_function(&pf.transform) + .map_err(to_datafusion_error)?; + let partition_value = transform_fn + .transform(source_column) + .map_err(to_datafusion_error)?; - partition_values.push(partition_value); - } + partition_values.push(partition_value); + } - let struct_array = StructArray::try_new( - expected_struct_fields, - partition_values, - None, // No null buffer for the struct array itself - ) - .map_err(|e| DataFusionError::ArrowError(e, None))?; + let struct_array = StructArray::try_new( + expected_struct_fields, + partition_values, + None, // No null buffer for the struct array itself + ) + .map_err(|e| DataFusionError::ArrowError(e, None))?; - Ok(Some(Arc::new(struct_array))) + Ok(Arc::new(struct_array)) + } } -/// Extract a column from a record batch by following an index path. -/// The index path specifies the column indices to traverse for nested structures. -#[allow(dead_code)] fn extract_column_by_index_path(batch: &RecordBatch, index_path: &[usize]) -> DFResult { if index_path.is_empty() { return Err(DataFusionError::Internal( @@ -143,8 +221,6 @@ fn extract_column_by_index_path(batch: &RecordBatch, index_path: &[usize]) -> DF let mut current_column = batch.column(*index_path.first().unwrap()).clone(); for child_index in &index_path[1..] { - // We only support traversing nested Structs. Provide explicit errors for unsupported - // nested container types to fail early and clearly. let dt = current_column.data_type(); match dt { datafusion::arrow::datatypes::DataType::Struct(_) => { @@ -179,8 +255,6 @@ fn extract_column_by_index_path(batch: &RecordBatch, index_path: &[usize]) -> DF Ok(current_column) } -/// Find the path to a field by its ID (e.g., ["address", "city"]) in the Iceberg schema -#[allow(dead_code)] fn find_field_path(table_schema: &Schema, field_id: i32) -> DFResult> { let dotted = table_schema.name_by_field_id(field_id).ok_or_else(|| { DataFusionError::Internal(format!( @@ -191,8 +265,6 @@ fn find_field_path(table_schema: &Schema, field_id: i32) -> DFResult Ok(dotted.split('.').map(|s| s.to_string()).collect()) } -/// Resolve a field path to an index path in an Arrow schema -#[allow(dead_code)] fn resolve_arrow_index_path( input_schema: &ArrowSchema, field_path: &[String], @@ -259,27 +331,7 @@ fn resolve_arrow_index_path( Ok(indices) } -/// Add partition values struct column to a record batch. -/// Returns the original batch unchanged if the partition spec is unpartitioned. 
-#[allow(dead_code)] -pub(crate) fn add_partition_columns_to_batch( - batch: RecordBatch, - partition_values: Option, - output_schema: ArrowSchemaRef, -) -> DFResult { - if let Some(partition_array) = partition_values { - let mut all_columns = batch.columns().to_vec(); - all_columns.push(partition_array); - RecordBatch::try_new(output_schema, all_columns) - .map_err(|e| DataFusionError::ArrowError(e, None)) - } else { - Ok(batch) - } -} - -/// Get the expected Arrow DataType for partition values based on the partition spec -#[allow(dead_code)] -pub(crate) fn get_partition_struct_type( +fn build_partition_type( partition_spec: &PartitionSpec, table_schema: &Schema, ) -> DFResult { @@ -293,28 +345,15 @@ pub(crate) fn get_partition_struct_type( #[cfg(test)] mod tests { - use std::collections::HashMap; - - use datafusion::arrow::array::{Int32Array, RecordBatch, StructArray}; - use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema as ArrowSchema}; - use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Transform, Type}; - use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use datafusion::arrow::array::Int32Array; + use datafusion::arrow::datatypes::{Field, Fields}; + use datafusion::physical_plan::empty::EmptyExec; + use iceberg::spec::{NestedField, PrimitiveType, StructType, Transform, Type}; use super::*; #[test] - fn test_create_output_schema_unpartitioned() { - let arrow_schema = ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "1".to_string(), - )])), - Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "2".to_string(), - )])), - ]); - + fn test_partition_calculator_basic() { let table_schema = Schema::builder() .with_schema_id(0) .with_fields(vec![ @@ -324,24 +363,24 @@ mod tests { .build() .unwrap(); - let partition_spec = iceberg::spec::PartitionSpec::unpartition_spec(); + let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone())) + .add_partition_field("id", "id_partition", Transform::Identity) + .unwrap() + .build() + .unwrap(); - let output_schema = - create_schema_with_partition_columns(&arrow_schema, &partition_spec, &table_schema) - .unwrap(); + let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); + let calculator = PartitionValueCalculator::new( + partition_spec.clone(), + table_schema, + partition_type.clone(), + ); - assert_eq!(output_schema.fields().len(), 2); - assert_eq!(output_schema.field(0).name(), "id"); - assert_eq!(output_schema.field(1).name(), "name"); + assert_eq!(calculator.partition_type, partition_type); } #[test] - fn test_create_output_schema_partitioned() { - let arrow_schema = ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("name", DataType::Utf8, false), - ]); - + fn test_partition_expr_with_projection() { let table_schema = Schema::builder() .with_schema_id(0) .with_fields(vec![ @@ -357,70 +396,93 @@ mod tests { .build() .unwrap(); - let output_schema = - create_schema_with_partition_columns(&arrow_schema, &partition_spec, &table_schema) - .unwrap(); + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + + let input = Arc::new(EmptyExec::new(arrow_schema.clone())); - assert_eq!(output_schema.fields().len(), 3); - assert_eq!(output_schema.field(0).name(), "id"); - 
assert_eq!(output_schema.field(1).name(), "name"); - assert_eq!(output_schema.field(2).name(), "_iceberg_partition_values"); - } + let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); + let calculator = + PartitionValueCalculator::new(partition_spec, table_schema, partition_type); - #[test] - fn test_partition_on_struct_field() { - let struct_fields = Fields::from(vec![ - Field::new("street", DataType::Utf8, false), - Field::new("city", DataType::Utf8, false), - ]); + let mut projection_exprs: Vec<(Arc, String)> = Vec::new(); + for (i, field) in arrow_schema.fields().iter().enumerate() { + let column_expr = Arc::new(Column::new(field.name(), i)); + projection_exprs.push((column_expr, field.name().clone())); + } - let arrow_schema = ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("address", DataType::Struct(struct_fields.clone()), false), - ]); + let partition_expr = Arc::new(PartitionExpr::new(calculator)); + projection_exprs.push((partition_expr, PARTITION_VALUES_COLUMN.to_string())); - let address_struct = StructType::new(vec![ - NestedField::required(3, "street", Type::Primitive(PrimitiveType::String)).into(), - NestedField::required(4, "city", Type::Primitive(PrimitiveType::String)).into(), - ]); + let projection = ProjectionExec::try_new(projection_exprs, input).unwrap(); + let result = Arc::new(projection); + assert_eq!(result.schema().fields().len(), 3); + assert_eq!(result.schema().field(0).name(), "id"); + assert_eq!(result.schema().field(1).name(), "name"); + assert_eq!(result.schema().field(2).name(), "_partition"); + } + + #[test] + fn test_partition_expr_evaluate() { let table_schema = Schema::builder() .with_schema_id(0) .with_fields(vec![ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "address", Type::Struct(address_struct)).into(), + NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(), ]) .build() .unwrap(); let partition_spec = iceberg::spec::PartitionSpec::builder(Arc::new(table_schema.clone())) - .add_partition_field("address.city", "city_partition", Transform::Identity) + .add_partition_field("id", "id_partition", Transform::Identity) .unwrap() .build() .unwrap(); - let output_schema = - create_schema_with_partition_columns(&arrow_schema, &partition_spec, &table_schema) - .unwrap(); + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("data", DataType::Utf8, false), + ])); + + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(Int32Array::from(vec![10, 20, 30])), + Arc::new(datafusion::arrow::array::StringArray::from(vec![ + "a", "b", "c", + ])), + ]) + .unwrap(); - assert_eq!(output_schema.fields().len(), 3); - assert_eq!(output_schema.field(0).name(), "id"); - assert_eq!(output_schema.field(1).name(), "address"); - assert_eq!(output_schema.field(2).name(), "_iceberg_partition_values"); + let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); + let calculator = + PartitionValueCalculator::new(partition_spec, table_schema, partition_type.clone()); + let expr = PartitionExpr::new(calculator); + + assert_eq!(expr.data_type(&arrow_schema).unwrap(), partition_type); + assert!(!expr.nullable(&arrow_schema).unwrap()); + + let result = expr.evaluate(&batch).unwrap(); + match result { + ColumnarValue::Array(array) => { + let struct_array = array.as_any().downcast_ref::().unwrap(); + let id_partition = struct_array + 
.column_by_name("id_partition") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_partition.value(0), 10); + assert_eq!(id_partition.value(1), 20); + assert_eq!(id_partition.value(2), 30); + } + _ => panic!("Expected array result"), + } } #[test] - fn test_process_batch_with_nested_struct_partition() { - let struct_fields = Fields::from(vec![ - Field::new("street", DataType::Utf8, false), - Field::new("city", DataType::Utf8, false), - ]); - - let arrow_schema = ArrowSchema::new(vec![ - Field::new("id", DataType::Int32, false), - Field::new("address", DataType::Struct(struct_fields.clone()), false), - ]); - + fn test_nested_partition() { let address_struct = StructType::new(vec![ NestedField::required(3, "street", Type::Primitive(PrimitiveType::String)).into(), NestedField::required(4, "city", Type::Primitive(PrimitiveType::String)).into(), @@ -441,71 +503,56 @@ mod tests { .build() .unwrap(); - let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])); + let struct_fields = Fields::from(vec![ + Field::new("street", DataType::Utf8, false), + Field::new("city", DataType::Utf8, false), + ]); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("address", DataType::Struct(struct_fields), false), + ])); let street_array = Arc::new(datafusion::arrow::array::StringArray::from(vec![ "123 Main St", "456 Oak Ave", - "789 Pine Rd", ])); let city_array = Arc::new(datafusion::arrow::array::StringArray::from(vec![ "New York", "Los Angeles", - "Chicago", ])); let struct_array = StructArray::from(vec![ ( Arc::new(Field::new("street", DataType::Utf8, false)), - street_array as datafusion::arrow::array::ArrayRef, + street_array as ArrayRef, ), ( Arc::new(Field::new("city", DataType::Utf8, false)), - city_array as datafusion::arrow::array::ArrayRef, + city_array as ArrayRef, ), ]); - let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![ - id_array, + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(Int32Array::from(vec![1, 2])), Arc::new(struct_array), ]) .unwrap(); - let output_schema = - create_schema_with_partition_columns(&arrow_schema, &partition_spec, &table_schema) - .unwrap(); - - let partition_type = get_partition_struct_type(&partition_spec, &table_schema).unwrap(); - let partition_values = - calculate_partition_values(&batch, &partition_spec, &table_schema, &partition_type) - .unwrap(); - let result_batch = - add_partition_columns_to_batch(batch, partition_values, output_schema).unwrap(); - - assert_eq!(result_batch.num_columns(), 3); // id, address, partition_values - assert_eq!(result_batch.num_rows(), 3); - assert_eq!(result_batch.schema().field(0).name(), "id"); - assert_eq!(result_batch.schema().field(1).name(), "address"); - assert_eq!( - result_batch.schema().field(2).name(), - "_iceberg_partition_values" - ); - - let partition_column = result_batch.column(2); - let partition_struct_array = partition_column - .as_any() - .downcast_ref::() - .unwrap(); + let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); + let calculator = + PartitionValueCalculator::new(partition_spec, table_schema, partition_type); + let array = calculator.calculate(&batch).unwrap(); - let city_partition_array = partition_struct_array + let struct_array = array.as_any().downcast_ref::().unwrap(); + let city_partition = struct_array .column_by_name("city_partition") .unwrap() .as_any() .downcast_ref::() .unwrap(); - assert_eq!(city_partition_array.value(0), "New York"); - 
assert_eq!(city_partition_array.value(1), "Los Angeles"); - assert_eq!(city_partition_array.value(2), "Chicago"); + assert_eq!(city_partition.value(0), "New York"); + assert_eq!(city_partition.value(1), "Los Angeles"); } } From bdc6aaea15a070b5759bbcaa074fd132ed49abba Mon Sep 17 00:00:00 2001 From: Florian Valeye Date: Thu, 25 Sep 2025 15:45:48 +0200 Subject: [PATCH 4/7] feat(datafusion): reuse RecordBatchProjector for project. Signed-off-by: Florian Valeye --- crates/iceberg/src/arrow/mod.rs | 3 +- .../src/arrow/record_batch_projector.rs | 82 +++++- .../datafusion/src/physical_plan/project.rs | 274 +++++++----------- 3 files changed, 186 insertions(+), 173 deletions(-) diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index d32cbeb8f9..28116a4b5e 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -28,7 +28,8 @@ pub mod delete_file_loader; pub(crate) mod delete_filter; mod reader; -pub(crate) mod record_batch_projector; +/// RecordBatch projection utilities +pub mod record_batch_projector; pub(crate) mod record_batch_transformer; mod value; diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index 7ca28c25c7..ced1d6df51 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -20,13 +20,15 @@ use std::sync::Arc; use arrow_array::{ArrayRef, RecordBatch, StructArray, make_array}; use arrow_buffer::NullBuffer; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::error::Result; +use crate::spec::Schema as IcebergSchema; use crate::{Error, ErrorKind}; /// Help to project specific field from `RecordBatch`` according to the fields id. -#[derive(Clone, Debug)] -pub(crate) struct RecordBatchProjector { +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RecordBatchProjector { // A vector of vectors, where each inner vector represents the index path to access a specific field in a nested structure. // E.g. [[0], [1, 2]] means the first field is accessed directly from the first column, // while the second field is accessed from the second column and then from its third subcolumn (second column must be a struct column). @@ -77,6 +79,80 @@ impl RecordBatchProjector { }) } + /// Create RecordBatchProjector using Iceberg schema for field mapping. + /// + /// This constructor is more flexible and works with any Arrow schema by using + /// the Iceberg schema to map field names to field IDs. 
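+    ///
+    /// Field IDs are resolved in order: from the `PARQUET:field_id` metadata key on the
+    /// Arrow field when present, then by name against the top-level Iceberg schema, and
+    /// finally via a recursive name lookup inside nested struct fields.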
+ /// + /// # Arguments + /// * `original_schema` - The original Arrow schema (doesn't need field ID metadata) + /// * `iceberg_schema` - The Iceberg schema for field ID mapping + /// * `target_field_ids` - The field IDs to project + pub fn from_iceberg_schema_mapping( + original_schema: SchemaRef, + iceberg_schema: Arc, + target_field_ids: &[i32], + ) -> Result { + let field_id_fetch_func = |field: &Field| -> Result> { + // First try to get field ID from metadata (Parquet case) + if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) { + let field_id = value.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Failed to parse field id".to_string(), + ) + .with_context("value", value) + .with_source(e) + })?; + return Ok(Some(field_id as i64)); + } + + // Fallback: use Iceberg schema's built-in field lookup + if let Some(iceberg_field) = iceberg_schema.field_by_name(field.name()) { + return Ok(Some(iceberg_field.id as i64)); + } + + // Additional fallback: for nested fields, we need to search recursively + fn find_field_id_in_struct( + struct_type: &crate::spec::StructType, + field_name: &str, + ) -> Option { + for field in struct_type.fields() { + if field.name == field_name { + return Some(field.id); + } + if let crate::spec::Type::Struct(nested_struct) = &*field.field_type { + if let Some(nested_id) = find_field_id_in_struct(nested_struct, field_name) + { + return Some(nested_id); + } + } + } + None + } + + // Search in nested structs + for iceberg_field in iceberg_schema.as_struct().fields() { + if let crate::spec::Type::Struct(struct_type) = &*iceberg_field.field_type { + if let Some(nested_id) = find_field_id_in_struct(struct_type, field.name()) { + return Ok(Some(nested_id as i64)); + } + } + } + + Ok(None) + }; + + let searchable_field_func = |_field: &Field| -> bool { true }; + + Self::new( + original_schema, + target_field_ids, + field_id_fetch_func, + searchable_field_func, + ) + } + fn fetch_field_index( fields: &Fields, index_vec: &mut Vec, @@ -129,7 +205,7 @@ impl RecordBatchProjector { } /// Do projection with columns - pub(crate) fn project_column(&self, batch: &[ArrayRef]) -> Result> { + pub fn project_column(&self, batch: &[ArrayRef]) -> Result> { self.field_indices .iter() .map(|index_vec| Self::get_column_by_field_index(batch, index_vec)) diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs index e5e3df61bd..c8ef8381a4 100644 --- a/crates/integrations/datafusion/src/physical_plan/project.rs +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -17,7 +17,7 @@ //! Partition value projection for Iceberg tables. 
-use std::sync::Arc; +use std::sync::{Arc, Mutex}; use datafusion::arrow::array::{ArrayRef, RecordBatch, StructArray}; use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema}; @@ -27,6 +27,7 @@ use datafusion::physical_expr::PhysicalExpr; use datafusion::physical_expr::expressions::Column; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::{ColumnarValue, ExecutionPlan}; +use iceberg::arrow::record_batch_projector::RecordBatchProjector; use iceberg::spec::{PartitionSpec, Schema}; use iceberg::table::Table; @@ -48,15 +49,12 @@ const PARTITION_VALUES_COLUMN: &str = "_partition"; /// # Returns /// * `Ok(Arc)` - Extended plan with partition values column /// * `Err` - If partition spec is not found or transformation fails -#[allow(dead_code)] pub fn project_with_partition( input: Arc, table: &Table, ) -> DFResult> { let metadata = table.metadata(); - let partition_spec = metadata - .partition_spec_by_id(metadata.default_partition_spec_id()) - .ok_or_else(|| DataFusionError::Internal("Default partition spec not found".to_string()))?; + let partition_spec = metadata.default_partition_spec(); let table_schema = metadata.current_schema(); if partition_spec.is_unpartitioned() { @@ -69,9 +67,10 @@ pub fn project_with_partition( partition_spec.as_ref().clone(), table_schema.as_ref().clone(), partition_type, - ); + )?; - let mut projection_exprs: Vec<(Arc, String)> = Vec::new(); + let mut projection_exprs: Vec<(Arc, String)> = + Vec::with_capacity(input_schema.fields().len() + 1); for (index, field) in input_schema.fields().iter().enumerate() { let column_expr = Arc::new(Column::new(field.name(), index)); @@ -86,24 +85,40 @@ pub fn project_with_partition( } /// PhysicalExpr implementation for partition value calculation -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] struct PartitionExpr { - calculator: PartitionValueCalculator, + calculator: Arc>, } impl PartitionExpr { fn new(calculator: PartitionValueCalculator) -> Self { - Self { calculator } + Self { + calculator: Arc::new(Mutex::new(calculator)), + } + } +} + +// Manual PartialEq/Eq implementations for pointer-based equality +// (two PartitionExpr are equal if they share the same calculator instance) +impl PartialEq for PartitionExpr { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.calculator, &other.calculator) } } +impl Eq for PartitionExpr {} + impl PhysicalExpr for PartitionExpr { fn as_any(&self) -> &dyn std::any::Any { self } fn data_type(&self, _input_schema: &ArrowSchema) -> DFResult { - Ok(self.calculator.partition_type.clone()) + let calculator = self + .calculator + .lock() + .map_err(|e| DataFusionError::Internal(format!("Failed to lock calculator: {}", e)))?; + Ok(calculator.partition_type.clone()) } fn nullable(&self, _input_schema: &ArrowSchema) -> DFResult { @@ -111,7 +126,11 @@ impl PhysicalExpr for PartitionExpr { } fn evaluate(&self, batch: &RecordBatch) -> DFResult { - let array = self.calculator.calculate(batch)?; + let mut calculator = self + .calculator + .lock() + .map_err(|e| DataFusionError::Internal(format!("Failed to lock calculator: {}", e)))?; + let array = calculator.calculate(batch)?; Ok(ColumnarValue::Array(array)) } @@ -127,13 +146,33 @@ impl PhysicalExpr for PartitionExpr { } fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "partition_values") + if let Ok(calculator) = self.calculator.lock() { + let field_names: Vec = calculator + .partition_spec + .fields() + .iter() + .map(|pf| format!("{}({})", 
pf.transform, pf.name)) + .collect(); + write!(f, "iceberg_partition_values[{}]", field_names.join(", ")) + } else { + write!(f, "iceberg_partition_values") + } } } impl std::fmt::Display for PartitionExpr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "partition_values") + if let Ok(calculator) = self.calculator.lock() { + let field_names: Vec<&str> = calculator + .partition_spec + .fields() + .iter() + .map(|pf| pf.name.as_str()) + .collect(); + write!(f, "iceberg_partition_values({})", field_names.join(", ")) + } else { + write!(f, "iceberg_partition_values") + } } } @@ -144,31 +183,57 @@ impl std::hash::Hash for PartitionExpr { } /// Calculator for partition values in Iceberg tables -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] struct PartitionValueCalculator { partition_spec: PartitionSpec, table_schema: Schema, partition_type: DataType, + projector: Option, } impl PartitionValueCalculator { - fn new(partition_spec: PartitionSpec, table_schema: Schema, partition_type: DataType) -> Self { - Self { + fn new( + partition_spec: PartitionSpec, + table_schema: Schema, + partition_type: DataType, + ) -> DFResult { + if partition_spec.is_unpartitioned() { + return Err(DataFusionError::Internal( + "Cannot create partition calculator for unpartitioned table".to_string(), + )); + } + + Ok(Self { partition_spec, table_schema, partition_type, - } + projector: None, + }) } - fn calculate(&self, batch: &RecordBatch) -> DFResult { - if self.partition_spec.is_unpartitioned() { - return Err(DataFusionError::Internal( - "Cannot calculate partition values for unpartitioned table".to_string(), - )); + fn calculate(&mut self, batch: &RecordBatch) -> DFResult { + if self.projector.is_none() { + let source_field_ids: Vec = self + .partition_spec + .fields() + .iter() + .map(|pf| pf.source_id) + .collect(); + + let projector = RecordBatchProjector::from_iceberg_schema_mapping( + batch.schema(), + Arc::new(self.table_schema.clone()), + &source_field_ids, + ) + .map_err(to_datafusion_error)?; + + self.projector = Some(projector); } - let batch_schema = batch.schema(); - let mut partition_values = Vec::with_capacity(self.partition_spec.fields().len()); + let projector = self.projector.as_ref().unwrap(); + let source_columns = projector + .project_column(batch.columns()) + .map_err(to_datafusion_error)?; let expected_struct_fields = match &self.partition_type { DataType::Struct(fields) => fields.clone(), @@ -179,158 +244,26 @@ impl PartitionValueCalculator { } }; - for pf in self.partition_spec.fields() { - let source_field = self.table_schema.field_by_id(pf.source_id).ok_or_else(|| { - DataFusionError::Internal(format!( - "Source field not found with id {} when calculating partition values", - pf.source_id - )) - })?; - - let field_path = find_field_path(&self.table_schema, source_field.id)?; - let index_path = resolve_arrow_index_path(batch_schema.as_ref(), &field_path)?; - - let source_column = extract_column_by_index_path(batch, &index_path)?; + let mut partition_values = Vec::with_capacity(self.partition_spec.fields().len()); + for (source_column, pf) in source_columns.iter().zip(self.partition_spec.fields()) { let transform_fn = iceberg::transform::create_transform_function(&pf.transform) .map_err(to_datafusion_error)?; + let partition_value = transform_fn - .transform(source_column) + .transform(source_column.clone()) .map_err(to_datafusion_error)?; partition_values.push(partition_value); } - let struct_array = StructArray::try_new( - 
expected_struct_fields, - partition_values, - None, // No null buffer for the struct array itself - ) - .map_err(|e| DataFusionError::ArrowError(e, None))?; + let struct_array = StructArray::try_new(expected_struct_fields, partition_values, None) + .map_err(|e| DataFusionError::ArrowError(e, None))?; Ok(Arc::new(struct_array)) } } -fn extract_column_by_index_path(batch: &RecordBatch, index_path: &[usize]) -> DFResult { - if index_path.is_empty() { - return Err(DataFusionError::Internal( - "Empty index path when extracting partition column".to_string(), - )); - } - - let mut current_column = batch.column(*index_path.first().unwrap()).clone(); - for child_index in &index_path[1..] { - let dt = current_column.data_type(); - match dt { - datafusion::arrow::datatypes::DataType::Struct(_) => { - let struct_array = current_column - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Internal(format!( - "Failed to downcast to StructArray while traversing index path {:?} for partition column extraction", - index_path - )) - })?; - current_column = struct_array.column(*child_index).clone(); - } - datafusion::arrow::datatypes::DataType::List(_) - | datafusion::arrow::datatypes::DataType::LargeList(_) - | datafusion::arrow::datatypes::DataType::FixedSizeList(_, _) - | datafusion::arrow::datatypes::DataType::Map(_, _) => { - return Err(DataFusionError::NotImplemented(format!( - "Partitioning on nested list/map types is not supported (encountered {:?}) while traversing index path {:?}", - dt, index_path - ))); - } - other => { - return Err(DataFusionError::Internal(format!( - "Expected struct array while traversing index path {:?} for partition column, got {:?}", - index_path, other - ))); - } - } - } - Ok(current_column) -} - -fn find_field_path(table_schema: &Schema, field_id: i32) -> DFResult> { - let dotted = table_schema.name_by_field_id(field_id).ok_or_else(|| { - DataFusionError::Internal(format!( - "Field with ID {} not found in schema when building field path for partition column", - field_id - )) - })?; - Ok(dotted.split('.').map(|s| s.to_string()).collect()) -} - -fn resolve_arrow_index_path( - input_schema: &ArrowSchema, - field_path: &[String], -) -> DFResult> { - if field_path.is_empty() { - return Err(DataFusionError::Internal( - "Empty field path when resolving arrow index path for partition column".to_string(), - )); - } - - let mut indices = Vec::with_capacity(field_path.len()); - let mut current_field = input_schema.field_with_name(&field_path[0]).map_err(|_| { - DataFusionError::Internal(format!( - "Top-level column '{}' not found in schema when resolving partition column path", - field_path[0] - )) - })?; - let top_index = input_schema.index_of(&field_path[0]).map_err(|_| { - DataFusionError::Internal(format!( - "Failed to get index of top-level column '{}' when resolving partition column path", - field_path[0] - )) - })?; - indices.push(top_index); - - for name in &field_path[1..] 
{ - let dt = current_field.data_type(); - let struct_fields = match dt { - datafusion::arrow::datatypes::DataType::Struct(fields) => fields, - datafusion::arrow::datatypes::DataType::List(_) - | datafusion::arrow::datatypes::DataType::LargeList(_) - | datafusion::arrow::datatypes::DataType::FixedSizeList(_, _) => { - return Err(DataFusionError::NotImplemented(format!( - "Partitioning on nested list types is not supported while resolving nested field '{}' for partition column", - name - ))); - } - datafusion::arrow::datatypes::DataType::Map(_, _) => { - return Err(DataFusionError::NotImplemented(format!( - "Partitioning on nested map types is not supported while resolving nested field '{}' for partition column", - name - ))); - } - other => { - return Err(DataFusionError::Internal(format!( - "Expected struct type while resolving nested field '{}' for partition column, got {:?}", - name, other - ))); - } - }; - let child_index = struct_fields - .iter() - .position(|f| f.name() == name) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "Field '{}' not found in struct when resolving partition column path", - name - )) - })?; - indices.push(child_index); - current_field = &struct_fields[child_index]; - } - - Ok(indices) -} - fn build_partition_type( partition_spec: &PartitionSpec, table_schema: &Schema, @@ -374,7 +307,8 @@ mod tests { partition_spec.clone(), table_schema, partition_type.clone(), - ); + ) + .unwrap(); assert_eq!(calculator.partition_type, partition_type); } @@ -405,9 +339,10 @@ mod tests { let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); let calculator = - PartitionValueCalculator::new(partition_spec, table_schema, partition_type); + PartitionValueCalculator::new(partition_spec, table_schema, partition_type).unwrap(); - let mut projection_exprs: Vec<(Arc, String)> = Vec::new(); + let mut projection_exprs: Vec<(Arc, String)> = + Vec::with_capacity(arrow_schema.fields().len() + 1); for (i, field) in arrow_schema.fields().iter().enumerate() { let column_expr = Arc::new(Column::new(field.name(), i)); projection_exprs.push((column_expr, field.name().clone())); @@ -457,7 +392,8 @@ mod tests { let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); let calculator = - PartitionValueCalculator::new(partition_spec, table_schema, partition_type.clone()); + PartitionValueCalculator::new(partition_spec, table_schema, partition_type.clone()) + .unwrap(); let expr = PartitionExpr::new(calculator); assert_eq!(expr.data_type(&arrow_schema).unwrap(), partition_type); @@ -540,8 +476,8 @@ mod tests { .unwrap(); let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); - let calculator = - PartitionValueCalculator::new(partition_spec, table_schema, partition_type); + let mut calculator = + PartitionValueCalculator::new(partition_spec, table_schema, partition_type).unwrap(); let array = calculator.calculate(&batch).unwrap(); let struct_array = array.as_any().downcast_ref::().unwrap(); From d4fd33611fb84da8f8211cb90f49c25072e61e8d Mon Sep 17 00:00:00 2001 From: Florian Valeye Date: Mon, 29 Sep 2025 11:59:43 +0200 Subject: [PATCH 5/7] refactor: simplify RecordBatchProjector and optimize partition calculation Signed-off-by: Florian Valeye --- .../src/arrow/record_batch_projector.rs | 79 ++++++++----------- crates/iceberg/src/transform/mod.rs | 2 +- .../datafusion/src/physical_plan/project.rs | 69 ++++++++-------- 3 files changed, 72 insertions(+), 78 deletions(-) diff --git 
a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index ced1d6df51..45de0212e8 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -22,6 +22,7 @@ use arrow_buffer::NullBuffer; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use crate::arrow::schema::schema_to_arrow_schema; use crate::error::Result; use crate::spec::Schema as IcebergSchema; use crate::{Error, ErrorKind}; @@ -79,22 +80,21 @@ impl RecordBatchProjector { }) } - /// Create RecordBatchProjector using Iceberg schema for field mapping. + /// Create RecordBatchProjector using Iceberg schema. /// - /// This constructor is more flexible and works with any Arrow schema by using - /// the Iceberg schema to map field names to field IDs. + /// This constructor converts the Iceberg schema to Arrow schema with field ID metadata, + /// then uses the standard field ID lookup for projection. /// /// # Arguments - /// * `original_schema` - The original Arrow schema (doesn't need field ID metadata) - /// * `iceberg_schema` - The Iceberg schema for field ID mapping + /// * `iceberg_schema` - The Iceberg schema for field ID mapping /// * `target_field_ids` - The field IDs to project - pub fn from_iceberg_schema_mapping( - original_schema: SchemaRef, + pub fn from_iceberg_schema( iceberg_schema: Arc, target_field_ids: &[i32], ) -> Result { + let arrow_schema_with_ids = Arc::new(schema_to_arrow_schema(&iceberg_schema)?); + let field_id_fetch_func = |field: &Field| -> Result> { - // First try to get field ID from metadata (Parquet case) if let Some(value) = field.metadata().get(PARQUET_FIELD_ID_META_KEY) { let field_id = value.parse::().map_err(|e| { Error::new( @@ -104,49 +104,16 @@ impl RecordBatchProjector { .with_context("value", value) .with_source(e) })?; - return Ok(Some(field_id as i64)); - } - - // Fallback: use Iceberg schema's built-in field lookup - if let Some(iceberg_field) = iceberg_schema.field_by_name(field.name()) { - return Ok(Some(iceberg_field.id as i64)); - } - - // Additional fallback: for nested fields, we need to search recursively - fn find_field_id_in_struct( - struct_type: &crate::spec::StructType, - field_name: &str, - ) -> Option { - for field in struct_type.fields() { - if field.name == field_name { - return Some(field.id); - } - if let crate::spec::Type::Struct(nested_struct) = &*field.field_type { - if let Some(nested_id) = find_field_id_in_struct(nested_struct, field_name) - { - return Some(nested_id); - } - } - } - None - } - - // Search in nested structs - for iceberg_field in iceberg_schema.as_struct().fields() { - if let crate::spec::Type::Struct(struct_type) = &*iceberg_field.field_type { - if let Some(nested_id) = find_field_id_in_struct(struct_type, field.name()) { - return Ok(Some(nested_id as i64)); - } - } + Ok(Some(field_id as i64)) + } else { + Ok(None) } - - Ok(None) }; let searchable_field_func = |_field: &Field| -> bool { true }; Self::new( - original_schema, + arrow_schema_with_ids, target_field_ids, field_id_fetch_func, searchable_field_func, @@ -242,6 +209,7 @@ mod test { use arrow_schema::{DataType, Field, Fields, Schema}; use crate::arrow::record_batch_projector::RecordBatchProjector; + use crate::spec::{NestedField, PrimitiveType, Schema as IcebergSchema, Type}; use crate::{Error, ErrorKind}; #[test] @@ -369,4 +337,25 @@ mod test { RecordBatchProjector::new(schema.clone(), &[3], 
field_id_fetch_func, |_| true); assert!(projector.is_ok()); } + + #[test] + fn test_from_iceberg_schema() { + let iceberg_schema = IcebergSchema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "age", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let projector = + RecordBatchProjector::from_iceberg_schema(Arc::new(iceberg_schema), &[1, 3]).unwrap(); + + assert_eq!(projector.field_indices.len(), 2); + assert_eq!(projector.projected_schema_ref().fields().len(), 2); + assert_eq!(projector.projected_schema_ref().field(0).name(), "id"); + assert_eq!(projector.projected_schema_ref().field(1).name(), "age"); + } } diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs index 4cc0d1fe8f..28722023a4 100644 --- a/crates/iceberg/src/transform/mod.rs +++ b/crates/iceberg/src/transform/mod.rs @@ -29,7 +29,7 @@ mod truncate; mod void; /// TransformFunction is a trait that defines the interface for all transform functions. -pub trait TransformFunction: Send + Sync { +pub trait TransformFunction: Send + Sync + std::fmt::Debug { /// transform will take an input array and transform it into a new array. /// The implementation of this function will need to check and downcast the input to specific /// type. diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs index c8ef8381a4..c12da1efdc 100644 --- a/crates/integrations/datafusion/src/physical_plan/project.rs +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -30,6 +30,7 @@ use datafusion::physical_plan::{ColumnarValue, ExecutionPlan}; use iceberg::arrow::record_batch_projector::RecordBatchProjector; use iceberg::spec::{PartitionSpec, Schema}; use iceberg::table::Table; +use iceberg::transform::BoxedTransformFunction; use crate::to_datafusion_error; @@ -126,7 +127,7 @@ impl PhysicalExpr for PartitionExpr { } fn evaluate(&self, batch: &RecordBatch) -> DFResult { - let mut calculator = self + let calculator = self .calculator .lock() .map_err(|e| DataFusionError::Internal(format!("Failed to lock calculator: {}", e)))?; @@ -183,12 +184,12 @@ impl std::hash::Hash for PartitionExpr { } /// Calculator for partition values in Iceberg tables -#[derive(Debug, Clone)] +#[derive(Debug)] struct PartitionValueCalculator { partition_spec: PartitionSpec, - table_schema: Schema, partition_type: DataType, - projector: Option, + projector: RecordBatchProjector, + transform_functions: Vec, } impl PartitionValueCalculator { @@ -203,35 +204,37 @@ impl PartitionValueCalculator { )); } + let transform_functions: Result, _> = partition_spec + .fields() + .iter() + .map(|pf| iceberg::transform::create_transform_function(&pf.transform)) + .collect(); + + let transform_functions = transform_functions.map_err(to_datafusion_error)?; + + let source_field_ids: Vec = partition_spec + .fields() + .iter() + .map(|pf| pf.source_id) + .collect(); + + let projector = RecordBatchProjector::from_iceberg_schema( + Arc::new(table_schema.clone()), + &source_field_ids, + ) + .map_err(to_datafusion_error)?; + Ok(Self { partition_spec, - table_schema, partition_type, - projector: None, + projector, + transform_functions, }) } - fn calculate(&mut self, batch: &RecordBatch) -> DFResult { - if self.projector.is_none() { - let source_field_ids: Vec = self - 
.partition_spec - .fields() - .iter() - .map(|pf| pf.source_id) - .collect(); - - let projector = RecordBatchProjector::from_iceberg_schema_mapping( - batch.schema(), - Arc::new(self.table_schema.clone()), - &source_field_ids, - ) - .map_err(to_datafusion_error)?; - - self.projector = Some(projector); - } - - let projector = self.projector.as_ref().unwrap(); - let source_columns = projector + fn calculate(&self, batch: &RecordBatch) -> DFResult { + let source_columns = self + .projector .project_column(batch.columns()) .map_err(to_datafusion_error)?; @@ -246,10 +249,7 @@ impl PartitionValueCalculator { let mut partition_values = Vec::with_capacity(self.partition_spec.fields().len()); - for (source_column, pf) in source_columns.iter().zip(self.partition_spec.fields()) { - let transform_fn = iceberg::transform::create_transform_function(&pf.transform) - .map_err(to_datafusion_error)?; - + for (source_column, transform_fn) in source_columns.iter().zip(&self.transform_functions) { let partition_value = transform_fn .transform(source_column.clone()) .map_err(to_datafusion_error)?; @@ -302,6 +302,11 @@ mod tests { .build() .unwrap(); + let _arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); let calculator = PartitionValueCalculator::new( partition_spec.clone(), @@ -476,7 +481,7 @@ mod tests { .unwrap(); let partition_type = build_partition_type(&partition_spec, &table_schema).unwrap(); - let mut calculator = + let calculator = PartitionValueCalculator::new(partition_spec, table_schema, partition_type).unwrap(); let array = calculator.calculate(&batch).unwrap(); From 37ac5d5542e547c22e4fc272bed162307b530d62 Mon Sep 17 00:00:00 2001 From: Florian Valeye Date: Sat, 11 Oct 2025 15:07:13 +0200 Subject: [PATCH 6/7] feat(project): add a validation method between Arrow and Iceberg table schemas Signed-off-by: Florian Valeye --- crates/iceberg/src/transform/mod.rs | 4 +- .../datafusion/src/physical_plan/project.rs | 130 +++++++++++++----- 2 files changed, 97 insertions(+), 37 deletions(-) diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs index 28722023a4..809d2dafe0 100644 --- a/crates/iceberg/src/transform/mod.rs +++ b/crates/iceberg/src/transform/mod.rs @@ -17,6 +17,8 @@ //! Transform function used to compute partition values. +use std::fmt::Debug; + use arrow_array::ArrayRef; use crate::spec::{Datum, Transform}; @@ -29,7 +31,7 @@ mod truncate; mod void; /// TransformFunction is a trait that defines the interface for all transform functions. -pub trait TransformFunction: Send + Sync + std::fmt::Debug { +pub trait TransformFunction: Send + Sync + Debug { /// transform will take an input array and transform it into a new array. /// The implementation of this function will need to check and downcast the input to specific /// type. diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs index c12da1efdc..5d56cda75f 100644 --- a/crates/integrations/datafusion/src/physical_plan/project.rs +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -17,7 +17,7 @@ //! Partition value projection for Iceberg tables. 
-use std::sync::{Arc, Mutex}; +use std::sync::Arc; use datafusion::arrow::array::{ArrayRef, RecordBatch, StructArray}; use datafusion::arrow::datatypes::{DataType, Schema as ArrowSchema}; @@ -28,6 +28,7 @@ use datafusion::physical_expr::expressions::Column; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::{ColumnarValue, ExecutionPlan}; use iceberg::arrow::record_batch_projector::RecordBatchProjector; +use iceberg::arrow::schema_to_arrow_schema; use iceberg::spec::{PartitionSpec, Schema}; use iceberg::table::Table; use iceberg::transform::BoxedTransformFunction; @@ -63,6 +64,10 @@ pub fn project_with_partition( } let input_schema = input.schema(); + + // Validate that input schema matches the table schema + validate_schema_compatibility(&input_schema, table_schema.as_ref())?; + let partition_type = build_partition_type(partition_spec, table_schema.as_ref())?; let calculator = PartitionValueCalculator::new( partition_spec.as_ref().clone(), @@ -88,13 +93,13 @@ pub fn project_with_partition( /// PhysicalExpr implementation for partition value calculation #[derive(Debug, Clone)] struct PartitionExpr { - calculator: Arc>, + calculator: Arc, } impl PartitionExpr { fn new(calculator: PartitionValueCalculator) -> Self { Self { - calculator: Arc::new(Mutex::new(calculator)), + calculator: Arc::new(calculator), } } } @@ -115,11 +120,7 @@ impl PhysicalExpr for PartitionExpr { } fn data_type(&self, _input_schema: &ArrowSchema) -> DFResult { - let calculator = self - .calculator - .lock() - .map_err(|e| DataFusionError::Internal(format!("Failed to lock calculator: {}", e)))?; - Ok(calculator.partition_type.clone()) + Ok(self.calculator.partition_type.clone()) } fn nullable(&self, _input_schema: &ArrowSchema) -> DFResult { @@ -127,11 +128,7 @@ impl PhysicalExpr for PartitionExpr { } fn evaluate(&self, batch: &RecordBatch) -> DFResult { - let calculator = self - .calculator - .lock() - .map_err(|e| DataFusionError::Internal(format!("Failed to lock calculator: {}", e)))?; - let array = calculator.calculate(batch)?; + let array = self.calculator.calculate(batch)?; Ok(ColumnarValue::Array(array)) } @@ -147,39 +144,34 @@ impl PhysicalExpr for PartitionExpr { } fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Ok(calculator) = self.calculator.lock() { - let field_names: Vec = calculator - .partition_spec - .fields() - .iter() - .map(|pf| format!("{}({})", pf.transform, pf.name)) - .collect(); - write!(f, "iceberg_partition_values[{}]", field_names.join(", ")) - } else { - write!(f, "iceberg_partition_values") - } + let field_names: Vec = self + .calculator + .partition_spec + .fields() + .iter() + .map(|pf| format!("{}({})", pf.transform, pf.name)) + .collect(); + write!(f, "iceberg_partition_values[{}]", field_names.join(", ")) } } impl std::fmt::Display for PartitionExpr { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if let Ok(calculator) = self.calculator.lock() { - let field_names: Vec<&str> = calculator - .partition_spec - .fields() - .iter() - .map(|pf| pf.name.as_str()) - .collect(); - write!(f, "iceberg_partition_values({})", field_names.join(", ")) - } else { - write!(f, "iceberg_partition_values") - } + let field_names: Vec<&str> = self + .calculator + .partition_spec + .fields() + .iter() + .map(|pf| pf.name.as_str()) + .collect(); + write!(f, "iceberg_partition_values({})", field_names.join(", ")) } } impl std::hash::Hash for PartitionExpr { fn hash(&self, state: &mut H) { - 
std::any::TypeId::of::().hash(state); + // Two PartitionExpr are equal if they share the same calculator Arc + Arc::as_ptr(&self.calculator).hash(state); } } @@ -264,6 +256,46 @@ impl PartitionValueCalculator { } } +/// Validates that the input Arrow schema is compatible with the Iceberg table schema. +/// +/// This ensures that: +/// - All fields in the input schema have matching names in the table schema +/// - The Arrow data types are compatible with the corresponding Iceberg types +fn validate_schema_compatibility( + arrow_schema: &ArrowSchema, + table_schema: &Schema, +) -> DFResult<()> { + // Convert Iceberg schema to Arrow schema for comparison + let expected_arrow_schema = + schema_to_arrow_schema(table_schema).map_err(to_datafusion_error)?; + + // Check that all fields in the input schema exist in the table schema with compatible types + for arrow_field in arrow_schema.fields() { + let field_name = arrow_field.name(); + + let expected_field = expected_arrow_schema + .field_with_name(field_name) + .map_err(|_| { + DataFusionError::Internal(format!( + "Input schema field '{}' not found in Iceberg table schema", + field_name + )) + })?; + + // Compare data types (metadata like field_id can differ) + if arrow_field.data_type() != expected_field.data_type() { + return Err(DataFusionError::Internal(format!( + "Input schema field '{}' has type {:?}, but Iceberg table schema expects {:?}", + field_name, + arrow_field.data_type(), + expected_field.data_type() + ))); + } + } + + Ok(()) +} + fn build_partition_type( partition_spec: &PartitionSpec, table_schema: &Schema, @@ -496,4 +528,30 @@ mod tests { assert_eq!(city_partition.value(0), "New York"); assert_eq!(city_partition.value(1), "Los Angeles"); } + + #[test] + fn test_validate_schema_compatibility() { + let table_schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let valid_arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ]); + assert!(super::validate_schema_compatibility(&valid_arrow_schema, &table_schema).is_ok()); + + let invalid_arrow_schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("unknown_field", DataType::Int32, false), + ]); + assert!( + super::validate_schema_compatibility(&invalid_arrow_schema, &table_schema).is_err() + ); + } } From 8dc3c2cdcad126e952d3c74534a30adecdd5f135 Mon Sep 17 00:00:00 2001 From: Florian Valeye Date: Fri, 17 Oct 2025 17:31:39 +0200 Subject: [PATCH 7/7] feat(datafusion): remove schema validation method and reference the related issue Signed-off-by: Florian Valeye --- crates/integrations/datafusion/src/physical_plan/project.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs index 8c0a2d8b63..4bfe8192b0 100644 --- a/crates/integrations/datafusion/src/physical_plan/project.rs +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -63,7 +63,8 @@ pub fn project_with_partition( } let input_schema = input.schema(); - + // TODO: Validate that input_schema matches the Iceberg table schema. 
+ // See: https://github.com/apache/iceberg-rust/issues/1752 let partition_type = build_partition_type(partition_spec, table_schema.as_ref())?; let calculator = PartitionValueCalculator::new( partition_spec.as_ref().clone(),
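
A minimal end-to-end sketch of the public pieces these patches expose: `RecordBatchProjector::from_iceberg_schema` pulls the partition source columns out of a `RecordBatch` by Iceberg field id, and `create_transform_function` turns them into partition values, which is what `PartitionValueCalculator::calculate` does per batch. The schema, field ids, data, and the choice of `Transform::Identity` below are invented for illustration only; this is not the patch's own test code.

    use std::sync::Arc;

    use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use iceberg::arrow::record_batch_projector::RecordBatchProjector;
    use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type};

    fn partition_values_sketch() -> Result<(), Box<dyn std::error::Error>> {
        // Iceberg table schema: "id" (field id 1) is the partition source column.
        let iceberg_schema = Arc::new(
            Schema::builder()
                .with_schema_id(0)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()?,
        );

        // Input batch whose column order matches the Iceberg schema.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let batch = RecordBatch::try_new(arrow_schema, vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ])?;

        // Project the partition source column (field id 1) by index path...
        let projector = RecordBatchProjector::from_iceberg_schema(iceberg_schema, &[1])?;
        let source_columns = projector.project_column(batch.columns())?;

        // ...then apply the partition transform to obtain the partition values.
        let transform_fn = iceberg::transform::create_transform_function(&Transform::Identity)?;
        let partition_values = transform_fn.transform(source_columns[0].clone())?;
        assert_eq!(partition_values.len(), 3);
        Ok(())
    }

On the DataFusion side, `project_with_partition(input, &table)` performs the same wiring at plan-construction time, appending the computed struct as the `_partition` column via a `ProjectionExec`, and returns the input plan unchanged when the table is unpartitioned.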