From b101c9dc55b1b69393d88242c3fe41a3cde5bb8e Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 27 Oct 2025 21:45:28 -0400
Subject: [PATCH 1/2] Add date32 support to create_column in
 RecordBatchTransformer.

---
 crates/iceberg/src/arrow/record_batch_transformer.rs | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 779f1cc625..8500942a4b 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -19,8 +19,8 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow_array::{
-    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Float32Array, Float64Array,
-    Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
+    Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array,
+    Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
 };
 use arrow_cast::cast;
 use arrow_schema::{
@@ -401,6 +401,13 @@ impl RecordBatchTransformer {
                 let vals: Vec<Option<i32>> = vec![None; num_rows];
                 Arc::new(Int32Array::from(vals))
             }
+            (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => {
+                Arc::new(Date32Array::from(vec![*value; num_rows]))
+            }
+            (DataType::Date32, None) => {
+                let vals: Vec<Option<i32>> = vec![None; num_rows];
+                Arc::new(Date32Array::from(vals))
+            }
             (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => {
                 Arc::new(Int64Array::from(vec![*value; num_rows]))
             }

From 4f9139a3b0cd56cc6d68a0ec2ad3bd0a3963e1ae Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Mon, 27 Oct 2025 21:55:26 -0400
Subject: [PATCH 2/2] Add test for date32 in RecordBatchTransformer.

---
 .../src/arrow/record_batch_transformer.rs | 71 ++++++++++++++++++-
 1 file changed, 70 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 8500942a4b..71fe59dea5 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -460,7 +460,8 @@ mod test {
     use std::sync::Arc;
 
     use arrow_array::{
-        Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
+        Array, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
+        StringArray,
     };
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
@@ -516,6 +517,74 @@ mod test {
         assert_eq!(result, expected);
     }
 
+    #[test]
+    fn schema_evolution_adds_date_column_with_nulls() {
+        // Reproduces TestSelect.readAndWriteWithBranchAfterSchemaChange from iceberg-spark.
+        // When reading old snapshots after adding a DATE column, the transformer must
+        // populate the new column with NULL values since old files lack this field.
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(3, "date_col", Type::Primitive(PrimitiveType::Date))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+        let projected_iceberg_field_ids = [1, 2, 3];
+
+        let mut transformer =
+            RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
+
+        let file_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("id", DataType::Int32, false, "1"),
+            simple_field("name", DataType::Utf8, true, "2"),
+        ]));
+
+        let file_batch = RecordBatch::try_new(file_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec![
+                Some("Alice"),
+                Some("Bob"),
+                Some("Charlie"),
+            ])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(file_batch).unwrap();
+
+        assert_eq!(result.num_columns(), 3);
+        assert_eq!(result.num_rows(), 3);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.values(), &[1, 2, 3]);
+
+        let name_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_column.value(0), "Alice");
+        assert_eq!(name_column.value(1), "Bob");
+        assert_eq!(name_column.value(2), "Charlie");
+
+        let date_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .unwrap();
+        assert!(date_column.is_null(0));
+        assert!(date_column.is_null(1));
+        assert!(date_column.is_null(2));
+    }
+
     pub fn source_record_batch() -> RecordBatch {
         RecordBatch::try_new(
             arrow_schema_promotion_addition_and_renaming_required(),
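
Note (standalone sketch, not part of the patches above): the new match arms
work because Arrow's Date32 type uses the same physical representation as
Iceberg's date type, an i32 count of days since the Unix epoch (1970-01-01),
which is why a PrimitiveLiteral::Int default value can be handed straight to
Date32Array. A minimal illustration of both arms, assuming only the
arrow-array crate; the day count 19_000 is a hypothetical example value:

    use std::sync::Arc;

    use arrow_array::{Array, ArrayRef, Date32Array};

    fn main() {
        // Mirrors (DataType::Date32, Some(PrimitiveLiteral::Int(value))):
        // every row receives the same i32 day count, 19_000 (~2022-01-08).
        let defaulted: ArrayRef = Arc::new(Date32Array::from(vec![19_000; 3]));
        assert_eq!(defaulted.len(), 3);

        // Mirrors (DataType::Date32, None): a Vec<Option<i32>> of Nones
        // produces an all-null column, the case the new test asserts.
        let vals: Vec<Option<i32>> = vec![None; 3];
        let null_col = Date32Array::from(vals);
        assert!(null_col.is_null(0) && null_col.is_null(2));
    }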