From 1251891df56cb7c6a9b73250e89732c64f4d5d08 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 6 Nov 2025 00:00:10 -0800 Subject: [PATCH 1/7] initialize partition splitter in the constructor of task writer --- .../datafusion/src/task_writer.rs | 42 ++++++++----------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs index d27b2e6fb..8ff658977 100644 --- a/crates/integrations/datafusion/src/task_writer.rs +++ b/crates/integrations/datafusion/src/task_writer.rs @@ -34,7 +34,7 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter; /// /// TaskWriter coordinates writing data to Iceberg tables by: /// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered) -/// - Lazily initializing the partition splitter on first write +/// - Initializing the partition splitter in the constructor for partitioned tables /// - Routing data to the underlying writer /// - Collecting all written data files /// @@ -67,7 +67,7 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter; pub(crate) struct TaskWriter { /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter) writer: SupportedWriter, - /// Lazily initialized partition splitter for partitioned tables + /// Partition splitter for partitioned tables (initialized in constructor) partition_splitter: Option, /// Iceberg schema reference schema: SchemaRef, @@ -139,9 +139,22 @@ impl TaskWriter { SupportedWriter::Clustered(ClusteredWriter::new(writer_builder)) }; + // Initialize partition splitter in constructor for partitioned tables + let partition_splitter = if !partition_spec.is_unpartitioned() { + Some( + RecordBatchPartitionSplitter::new_with_precomputed_values( + schema.clone(), + partition_spec.clone(), + ) + .expect("Failed to create partition splitter"), + ) + } else { + None + }; + Self { writer, - partition_splitter: None, + partition_splitter, schema, partition_spec, } @@ -149,7 +162,7 @@ impl TaskWriter { /// Write a RecordBatch to the TaskWriter. /// - /// For the first write to a partitioned table, this method initializes the partition splitter. + /// For partitioned tables, the partition splitter is already initialized in the constructor. /// For unpartitioned tables, data is written directly without splitting. /// /// # Parameters @@ -163,7 +176,6 @@ impl TaskWriter { /// # Errors /// /// This method will return an error if: - /// - Partition splitter initialization fails /// - Splitting the batch by partition fails /// - Writing to the underlying writer fails /// @@ -183,29 +195,9 @@ impl TaskWriter { writer.write(batch).await } SupportedWriter::Fanout(writer) => { - // Initialize splitter on first write if needed - if self.partition_splitter.is_none() { - self.partition_splitter = - Some(RecordBatchPartitionSplitter::new_with_precomputed_values( - self.schema.clone(), - self.partition_spec.clone(), - )?); - } - - // Split and write partitioned data Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await } SupportedWriter::Clustered(writer) => { - // Initialize splitter on first write if needed - if self.partition_splitter.is_none() { - self.partition_splitter = - Some(RecordBatchPartitionSplitter::new_with_precomputed_values( - self.schema.clone(), - self.partition_spec.clone(), - )?); - } - - // Split and write partitioned data Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await } } From 8745cea8d3cc7ec97c4d9839361b944406f01fa8 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 6 Nov 2025 00:13:20 -0800 Subject: [PATCH 2/7] project and repartition for insert into --- .../src/physical_plan/repartition.rs | 1 - .../datafusion/src/physical_plan/write.rs | 45 +++++++++---------- .../integrations/datafusion/src/table/mod.rs | 43 +++++++++++------- 3 files changed, 49 insertions(+), 40 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/repartition.rs b/crates/integrations/datafusion/src/physical_plan/repartition.rs index 95cdc8472..8ad87fd1c 100644 --- a/crates/integrations/datafusion/src/physical_plan/repartition.rs +++ b/crates/integrations/datafusion/src/physical_plan/repartition.rs @@ -86,7 +86,6 @@ use iceberg::spec::{TableMetadata, TableMetadataRef, Transform}; /// NonZeroUsize::new(4).unwrap(), /// )?; /// ``` -#[allow(dead_code)] pub(crate) fn repartition( input: Arc, table_metadata: TableMetadataRef, diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index b9d1f02d1..4953182ca 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -44,12 +44,12 @@ use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; -use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Error, ErrorKind}; use parquet::file::properties::WriterProperties; use uuid::Uuid; use crate::physical_plan::DATA_FILES_COL_NAME; +use crate::task_writer::TaskWriter; use crate::to_datafusion_error; /// An execution plan node that writes data to an Iceberg table. @@ -205,18 +205,6 @@ impl ExecutionPlan for IcebergWriteExec { partition: usize, context: Arc, ) -> DFResult { - if !self - .table - .metadata() - .default_partition_spec() - .is_unpartitioned() - { - // TODO add support for partitioned tables - return Err(DataFusionError::NotImplemented( - "IcebergWriteExec does not support partitioned tables yet".to_string(), - )); - } - let partition_type = self.table.metadata().default_partition_type().clone(); let format_version = self.table.metadata().format_version(); @@ -277,31 +265,40 @@ impl ExecutionPlan for IcebergWriteExec { ); let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); + // Create TaskWriter + // TODO: Make fanout_enabled configurable via table properties + let fanout_enabled = true; + let schema = self.table.metadata().current_schema().clone(); + let partition_spec = self.table.metadata().default_partition_spec().clone(); + let task_writer = TaskWriter::new( + data_file_writer_builder, + fanout_enabled, + schema.clone(), + partition_spec, + ); + // Get input data let data = execute_input_stream( Arc::clone(&self.input), - Arc::new( - schema_to_arrow_schema(self.table.metadata().current_schema()) - .map_err(to_datafusion_error)?, - ), + Arc::new(schema_to_arrow_schema(&schema).map_err(to_datafusion_error)?), partition, Arc::clone(&context), )?; // Create write stream let stream = futures::stream::once(async move { - let mut writer = data_file_writer_builder - // todo specify partition key when partitioning writer is supported - .build(None) - .await - .map_err(to_datafusion_error)?; + let mut task_writer = task_writer; let mut input_stream = data; while let Some(batch) = input_stream.next().await { - writer.write(batch?).await.map_err(to_datafusion_error)?; + let batch = batch?; + task_writer + .write(batch) + .await + .map_err(to_datafusion_error)?; } - let data_files = writer.close().await.map_err(to_datafusion_error)?; + let data_files = task_writer.close().await.map_err(to_datafusion_error)?; // Convert builders to data files and then to JSON strings let data_files_strs: Vec = data_files diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index a8c49837c..0b5a62fab 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -19,6 +19,7 @@ pub mod metadata_table; pub mod table_provider_factory; use std::any::Any; +use std::num::NonZeroUsize; use std::sync::Arc; use async_trait::async_trait; @@ -38,6 +39,8 @@ use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use metadata_table::IcebergMetadataTableProvider; use crate::physical_plan::commit::IcebergCommitExec; +use crate::physical_plan::project::project_with_partition; +use crate::physical_plan::repartition::repartition; use crate::physical_plan::scan::IcebergTableScan; use crate::physical_plan::write::IcebergWriteExec; @@ -170,32 +173,42 @@ impl TableProvider for IcebergTableProvider { async fn insert_into( &self, - _state: &dyn Session, + state: &dyn Session, input: Arc, _insert_op: InsertOp, ) -> DFResult> { - if !self - .table - .metadata() - .default_partition_spec() - .is_unpartitioned() - { - // TODO add insert into support for partitioned tables - return Err(DataFusionError::NotImplemented( - "IcebergTableProvider::insert_into does not support partitioned tables yet" - .to_string(), - )); - } - let Some(catalog) = self.catalog.clone() else { return Err(DataFusionError::Execution( "Catalog cannot be none for insert_into".to_string(), )); }; + let metadata = self.table.metadata(); + let partition_spec = metadata.default_partition_spec(); + + // Step 1: Project partition values for partitioned tables + let plan_with_partition = if !partition_spec.is_unpartitioned() { + project_with_partition(input, &self.table)? + } else { + input + }; + + // Step 2: Repartition for parallel processing + let target_partitions = NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| { + DataFusionError::Configuration( + "target_partitions must be greater than 0".to_string(), + ) + })?; + + let repartitioned_plan = repartition( + plan_with_partition, + self.table.metadata_ref(), + target_partitions, + )?; + let write_plan = Arc::new(IcebergWriteExec::new( self.table.clone(), - input, + repartitioned_plan, self.schema.clone(), )); From 9c571649f98aef597fd21bba58d648e32807e6e1 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 6 Nov 2025 00:22:50 -0800 Subject: [PATCH 3/7] minor --- crates/integrations/datafusion/src/table/mod.rs | 11 ++++++----- crates/integrations/datafusion/src/task_writer.rs | 3 --- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 0b5a62fab..9f491d70f 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -194,11 +194,12 @@ impl TableProvider for IcebergTableProvider { }; // Step 2: Repartition for parallel processing - let target_partitions = NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| { - DataFusionError::Configuration( - "target_partitions must be greater than 0".to_string(), - ) - })?; + let target_partitions = + NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| { + DataFusionError::Configuration( + "target_partitions must be greater than 0".to_string(), + ) + })?; let repartitioned_plan = repartition( plan_with_partition, diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs index 8ff658977..b4c21a687 100644 --- a/crates/integrations/datafusion/src/task_writer.rs +++ b/crates/integrations/datafusion/src/task_writer.rs @@ -63,7 +63,6 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter; /// // Close and get data files /// let data_files = task_writer.close().await?; /// ``` -#[allow(dead_code)] pub(crate) struct TaskWriter { /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter) writer: SupportedWriter, @@ -79,7 +78,6 @@ pub(crate) struct TaskWriter { /// /// This enum allows TaskWriter to work with different partitioning strategies /// while maintaining a unified interface. -#[allow(dead_code)] enum SupportedWriter { /// Writer for unpartitioned tables Unpartitioned(UnpartitionedWriter), @@ -89,7 +87,6 @@ enum SupportedWriter { Clustered(ClusteredWriter), } -#[allow(dead_code)] impl TaskWriter { /// Create a new TaskWriter. /// From 971747367daaa63c8baedc9c88cf92e8916866cf Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 6 Nov 2025 00:30:10 -0800 Subject: [PATCH 4/7] do not store schema and par spec in task writer --- crates/integrations/datafusion/src/task_writer.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs index b4c21a687..d376766c5 100644 --- a/crates/integrations/datafusion/src/task_writer.rs +++ b/crates/integrations/datafusion/src/task_writer.rs @@ -68,10 +68,6 @@ pub(crate) struct TaskWriter { writer: SupportedWriter, /// Partition splitter for partitioned tables (initialized in constructor) partition_splitter: Option, - /// Iceberg schema reference - schema: SchemaRef, - /// Partition specification reference - partition_spec: PartitionSpecRef, } /// Internal enum to hold the different writer types. @@ -152,14 +148,13 @@ impl TaskWriter { Self { writer, partition_splitter, - schema, - partition_spec, } } /// Write a RecordBatch to the TaskWriter. /// - /// For partitioned tables, the partition splitter is already initialized in the constructor. + /// For partitioned tables, uses the partition splitter to split + /// the batch by partition key and route each partition to the underlying writer. /// For unpartitioned tables, data is written directly without splitting. /// /// # Parameters @@ -203,13 +198,13 @@ impl TaskWriter { /// Helper method to split and write partitioned data. /// /// This method handles the common logic for both FanoutWriter and ClusteredWriter: - /// - Splits the batch by partition key using the provided splitter - /// - Writes each partition to the underlying writer + /// - Splits the batch by partition key using the partition splitter + /// - Writes each partition to the underlying writer with its corresponding partition key /// /// # Parameters /// /// * `writer` - The underlying PartitioningWriter (FanoutWriter or ClusteredWriter) - /// * `partition_splitter` - The partition splitter (must be initialized) + /// * `partition_splitter` - The partition splitter /// * `batch` - The RecordBatch to write /// /// # Returns From 47f15edd03b6f379afcf072a32d017af9fa830dd Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 6 Nov 2025 07:17:44 -0800 Subject: [PATCH 5/7] add ut --- .../datafusion/src/physical_plan/write.rs | 4 +- .../tests/integration_datafusion_test.rs | 152 +++++++++++++++++- 2 files changed, 152 insertions(+), 4 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 4953182ca..c272fa8bb 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -35,7 +35,7 @@ use datafusion::physical_plan::{ execute_input_stream, }; use futures::StreamExt; -use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema}; +use iceberg::arrow::FieldMatchMode; use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json}; use iceberg::table::Table; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; @@ -280,7 +280,7 @@ impl ExecutionPlan for IcebergWriteExec { // Get input data let data = execute_input_stream( Arc::clone(&self.input), - Arc::new(schema_to_arrow_schema(&schema).map_err(to_datafusion_error)?), + self.input.schema(), // input schema may have projected column `_partition` partition, Arc::clone(&context), )?; diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index cb4987a97..fdf5b17d1 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -27,9 +27,13 @@ use datafusion::execution::context::SessionContext; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; use expect_test::expect; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; -use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; +use iceberg::spec::{ + NestedField, PrimitiveType, Schema, StructType, Transform, Type, UnboundPartitionSpec, +}; use iceberg::test_utils::check_record_batches; -use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation}; +use iceberg::{ + Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation, TableIdent, +}; use iceberg_datafusion::IcebergCatalogProvider; use tempfile::TempDir; @@ -810,3 +814,147 @@ async fn test_insert_into_nested() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_insert_into_partitioned() -> Result<()> { + let iceberg_catalog = get_iceberg_catalog().await; + let namespace = NamespaceIdent::new("test_partitioned_write".to_string()); + set_test_namespace(&iceberg_catalog, &namespace).await?; + + // Create a schema with a partition column + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "value", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + // Create partition spec with identity transform on category + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(2, "category", Transform::Identity)? + .build(); + + // Create the partitioned table + let creation = TableCreation::builder() + .name("partitioned_table".to_string()) + .location(temp_path()) + .schema(schema) + .partition_spec(partition_spec) + .properties(HashMap::new()) + .build(); + + iceberg_catalog.create_table(&namespace, creation).await?; + + let client = Arc::new(iceberg_catalog); + let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?); + + let ctx = SessionContext::new(); + ctx.register_catalog("catalog", catalog); + + // Insert data with multiple partition values in a single batch + let df = ctx + .sql( + r#" + INSERT INTO catalog.test_partitioned_write.partitioned_table + VALUES + (1, 'electronics', 'laptop'), + (2, 'electronics', 'phone'), + (3, 'books', 'novel'), + (4, 'books', 'textbook'), + (5, 'clothing', 'shirt') + "#, + ) + .await + .unwrap(); + + let batches = df.collect().await.unwrap(); + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + let rows_inserted = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(rows_inserted.value(0), 5); + + // Refresh catalog to get updated table + let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?); + ctx.register_catalog("catalog", catalog); + + // Query the table to verify data + let df = ctx + .sql("SELECT * FROM catalog.test_partitioned_write.partitioned_table ORDER BY id") + .await + .unwrap(); + + let batches = df.collect().await.unwrap(); + + // Verify the data - note that _partition column should NOT be present + check_record_batches( + batches, + expect![[r#" + Field { name: "id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, + Field { name: "category", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, + Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }"#]], + expect![[r#" + id: PrimitiveArray + [ + 1, + 2, + 3, + 4, + 5, + ], + category: StringArray + [ + "electronics", + "electronics", + "books", + "books", + "clothing", + ], + value: StringArray + [ + "laptop", + "phone", + "novel", + "textbook", + "shirt", + ]"#]], + &[], + Some("id"), + ); + + // Verify that data files exist under correct partition paths + let table_ident = TableIdent::new(namespace.clone(), "partitioned_table".to_string()); + let table = client.load_table(&table_ident).await?; + let table_location = table.metadata().location(); + let file_io = table.file_io(); + + // List files under each expected partition path + let electronics_path = format!("{}/data/category=electronics", table_location); + let books_path = format!("{}/data/category=books", table_location); + let clothing_path = format!("{}/data/category=clothing", table_location); + + // Verify partition directories exist and contain data files + assert!( + file_io.exists(&electronics_path).await?, + "Expected partition directory: {}", + electronics_path + ); + assert!( + file_io.exists(&books_path).await?, + "Expected partition directory: {}", + books_path + ); + assert!( + file_io.exists(&clothing_path).await?, + "Expected partition directory: {}", + clothing_path + ); + + Ok(()) +} From 5e53c39b846327849c7edae01bbb218388561e36 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 6 Nov 2025 07:24:49 -0800 Subject: [PATCH 6/7] minor --- crates/integrations/datafusion/src/table/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 9f491d70f..42a3baad3 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -183,8 +183,7 @@ impl TableProvider for IcebergTableProvider { )); }; - let metadata = self.table.metadata(); - let partition_spec = metadata.default_partition_spec(); + let partition_spec = self.table.metadata().default_partition_spec(); // Step 1: Project partition values for partitioned tables let plan_with_partition = if !partition_spec.is_unpartitioned() { From 73697f8dac399396574f9467c7886b0166d766d3 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 6 Nov 2025 20:42:32 -0800 Subject: [PATCH 7/7] trigger CI, rest fixture issue?