From 0f62a62bbf93548bcc16874de9baacd376b92fa9 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 6 Aug 2025 15:24:06 -0700 Subject: [PATCH 1/9] starting on work node --- Cargo.lock | 1 + crates/iceberg/src/spec/table_metadata.rs | 7 + crates/integrations/datafusion/Cargo.toml | 2 + .../datafusion/src/physical_plan/mod.rs | 4 + .../datafusion/src/physical_plan/write.rs | 261 ++++++++++++++++++ 5 files changed, 275 insertions(+) create mode 100644 crates/integrations/datafusion/src/physical_plan/write.rs diff --git a/Cargo.lock b/Cargo.lock index 92f5408c9b..aa77f14308 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3673,6 +3673,7 @@ dependencies = [ "parquet", "tempfile", "tokio", + "uuid", ] [[package]] diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 3b89f54674..e5b96ae000 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -119,6 +119,13 @@ pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeou /// Default value for total maximum retry time (ms). pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes +/// Default file format for data files +pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default"; +/// Default file format for delete files +pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default"; +/// Default value for data file format +pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet"; + /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 6954950b06..0ee1738b4f 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -34,7 +34,9 @@ async-trait = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } +parquet = { workspace = true } tokio = { workspace = true } +uuid = { workspace = true } [dev-dependencies] expect-test = { workspace = true } diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index e424b690bc..4f0874fbd0 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -18,4 +18,8 @@ pub(crate) mod expr_to_predicate; pub(crate) mod metadata_scan; pub(crate) mod scan; +pub(crate) mod write; + +pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; + pub use scan::IcebergTableScan; diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs new file mode 100644 index 0000000000..2b93b3db12 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::str::FromStr; +use std::sync::Arc; + +use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray}; +use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, +}; +use datafusion::common::Result as DFResult; +use datafusion::error::DataFusionError; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + execute_input_stream, +}; +use futures::StreamExt; +use iceberg::arrow::schema_to_arrow_schema; +use iceberg::spec::{ + DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, + serialize_data_file_to_json, +}; +use iceberg::table::Table; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Error, ErrorKind}; +use parquet::file::properties::WriterProperties; +use uuid::Uuid; + +use crate::physical_plan::DATA_FILES_COL_NAME; +use crate::to_datafusion_error; + +#[derive(Debug)] +pub(crate) struct IcebergWriteExec { + table: Table, + input: Arc, + result_schema: ArrowSchemaRef, + plan_properties: PlanProperties, +} + +impl IcebergWriteExec { + pub fn new(table: Table, input: Arc, schema: ArrowSchemaRef) -> Self { + let plan_properties = Self::compute_properties(&input, schema); + + Self { + table, + input, + result_schema: Self::make_result_schema(), + plan_properties, + } + } + + fn compute_properties( + input: &Arc, + schema: ArrowSchemaRef, + ) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()), + EmissionType::Final, + Boundedness::Bounded, + ) + } + + // Create a record batch with serialized data files + fn make_result_batch(data_files: Vec) -> DFResult { + let files_array = Arc::new(StringArray::from(data_files)) as ArrayRef; + + RecordBatch::try_new(Self::make_result_schema(), vec![files_array]).map_err(|e| { + DataFusionError::ArrowError(e, Some("Failed to make result batch".to_string())) + }) + } + + fn make_result_schema() -> ArrowSchemaRef { + // Define a schema. 
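+        // One non-nullable Utf8 column; each row holds a single JSON-serialized data file entry.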
+ Arc::new(ArrowSchema::new(vec![Field::new( + DATA_FILES_COL_NAME, + DataType::Utf8, + false, + )])) + } +} + +impl DisplayAs for IcebergWriteExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "IcebergWriteExec: table={}", self.table.identifier()) + } + DisplayFormatType::Verbose => { + write!( + f, + "IcebergWriteExec: table={}, result_schema={:?}", + self.table.identifier(), + self.result_schema + ) + } + DisplayFormatType::TreeRender => { + write!(f, "IcebergWriteExec: table={}", self.table.identifier()) + } + } + } +} + +impl ExecutionPlan for IcebergWriteExec { + fn name(&self) -> &str { + "IcebergWriteExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> DFResult> { + if children.len() != 1 { + return Err(DataFusionError::Internal(format!( + "IcebergWriteExec expects exactly one child, but provided {}", + children.len() + ))); + } + + Ok(Arc::new(Self::new( + self.table.clone(), + Arc::clone(&children[0]), + self.schema(), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DFResult { + let spec_id = self.table.metadata().default_partition_spec_id(); + let partition_type = self.table.metadata().default_partition_type().clone(); + let format_version = self.table.metadata().format_version(); + + // Check data file format + let file_format = DataFileFormat::from_str( + self.table + .metadata() + .properties() + .get(PROPERTY_DEFAULT_FILE_FORMAT) + .unwrap_or(&PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()), + ) + .map_err(to_datafusion_error)?; + if file_format != DataFileFormat::Parquet { + return Err(to_datafusion_error(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "File format {} is not supported for insert_into yet!", + file_format + ), + ))); + } + + // Create data file writer builder + let parquet_file_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + self.table.metadata().current_schema().clone(), + self.table.file_io().clone(), + DefaultLocationGenerator::new(self.table.metadata().clone()) + .map_err(to_datafusion_error)?, + // todo filename prefix/suffix should be configurable + DefaultFileNameGenerator::new( + "datafusion".to_string(), + Some(Uuid::now_v7().to_string()), + file_format, + ), + ); + let rolling_writer_builder = + RollingFileWriterBuilder::new(parquet_file_writer_builder, 100 * 1024 * 1024); + + let data_file_writer_builder = + DataFileWriterBuilder::new(rolling_writer_builder, None, spec_id); + + // Get input data + let data = execute_input_stream( + Arc::clone(&self.input), + Arc::new( + schema_to_arrow_schema(self.table.metadata().current_schema()) + .map_err(to_datafusion_error)?, + ), + partition, + Arc::clone(&context), + )?; + + // Create write stream + let stream = futures::stream::once(async move { + let mut writer = data_file_writer_builder + .build() + .await + .map_err(to_datafusion_error)?; + let mut input_stream = data; + + while let Some(batch) = input_stream.next().await { + writer.write(batch?).await.map_err(to_datafusion_error)?; + } + + let data_files = writer.close().await.map_err(to_datafusion_error)?; + + // Convert builders to data files and then to JSON strings + let data_files_strs: Vec = data_files + .into_iter() + .map(|data_file| -> DFResult { + // Serialize to JSON + let json 
= + serialize_data_file_to_json(data_file, &partition_type, format_version) + .map_err(to_datafusion_error)?; + + println!("Serialized data file: {}", json); // todo remove log + Ok(json) + }) + .collect::>>()?; + + Self::make_result_batch(data_files_strs) + }) + .boxed(); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.result_schema), + stream, + ))) + } +} From fa74d9879f4ca75c65826f55d5345baddeefabf9 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 6 Aug 2025 15:25:06 -0700 Subject: [PATCH 2/9] clean write --- crates/integrations/datafusion/src/physical_plan/write.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 2b93b3db12..ec5ed89a2e 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -240,12 +240,10 @@ impl ExecutionPlan for IcebergWriteExec { .into_iter() .map(|data_file| -> DFResult { // Serialize to JSON - let json = + Ok( serialize_data_file_to_json(data_file, &partition_type, format_version) - .map_err(to_datafusion_error)?; - - println!("Serialized data file: {}", json); // todo remove log - Ok(json) + .map_err(to_datafusion_error)?, + ) }) .collect::>>()?; From f05d57221cc7b31a75e9f7f884a64d4d1901003e Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 6 Aug 2025 15:53:20 -0700 Subject: [PATCH 3/9] working writing --- .../datafusion/src/physical_plan/write.rs | 303 ++++++++++++++++++ 1 file changed, 303 insertions(+) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index ec5ed89a2e..3493b38ae8 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -257,3 +257,306 @@ impl ExecutionPlan for IcebergWriteExec { ))) } } + +#[cfg(test)] +mod tests { + use std::any::Any; + use std::collections::HashMap; + use std::fmt::{Debug, Formatter}; + use std::sync::Arc; + + use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray}; + use datafusion::arrow::datatypes::{ + DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, + }; + use datafusion::common::Result as DFResult; + use datafusion::execution::{SendableRecordBatchStream, TaskContext}; + use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; + use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; + use datafusion::physical_plan::stream::RecordBatchStreamAdapter; + use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; + use futures::{StreamExt, stream}; + use iceberg::io::FileIOBuilder; + use iceberg::spec::{ + DataFileFormat, NestedField, PrimitiveType, Schema, Type, deserialize_data_file_from_json, + }; + use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, Result, TableCreation}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use tempfile::TempDir; + + use super::*; + + /// A simple execution plan that returns a predefined set of record batches + struct MockExecutionPlan { + schema: ArrowSchemaRef, + batches: Vec, + properties: PlanProperties, + } + + impl MockExecutionPlan { + fn new(schema: ArrowSchemaRef, batches: Vec) -> Self { + let properties = PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + Partitioning::UnknownPartitioning(1), + EmissionType::Final, + 
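+                // The mock source emits a fixed, finite set of batches.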
Boundedness::Bounded, + ); + + Self { + schema, + batches, + properties, + } + } + } + + impl Debug for MockExecutionPlan { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "MockExecutionPlan") + } + } + + impl DisplayAs for MockExecutionPlan { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default + | DisplayFormatType::Verbose + | DisplayFormatType::TreeRender => { + write!(f, "MockExecutionPlan") + } + } + } + } + + impl ExecutionPlan for MockExecutionPlan { + fn name(&self) -> &str { + "MockExecutionPlan" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> DFResult { + let batches = self.batches.clone(); + let stream = stream::iter(batches.into_iter().map(Ok)); + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + stream.boxed(), + ))) + } + } + + /// Helper function to create a temporary directory and return its path + fn temp_path() -> String { + let temp_dir = TempDir::new().unwrap(); + temp_dir.path().to_str().unwrap().to_string() + } + + /// Helper function to create a memory catalog + fn get_iceberg_catalog() -> MemoryCatalog { + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + MemoryCatalog::new(file_io, Some(temp_path())) + } + + /// Helper function to create a test table schema + fn get_test_schema() -> Result { + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + } + + /// Helper function to create a table creation + fn get_table_creation( + location: impl ToString, + name: impl ToString, + schema: Schema, + ) -> TableCreation { + TableCreation::builder() + .location(location.to_string()) + .name(name.to_string()) + .properties(HashMap::new()) + .schema(schema) + .build() + } + + #[tokio::test] + async fn test_iceberg_write_exec() -> Result<()> { + // 1. Set up test environment + let iceberg_catalog = get_iceberg_catalog(); + let namespace = NamespaceIdent::new("test_namespace".to_string()); + + // Create namespace + iceberg_catalog + .create_namespace(&namespace, HashMap::new()) + .await?; + + // Create schema + let schema = get_test_schema()?; + + // Create table + let table_name = "test_table"; + let table_location = temp_path(); + let creation = get_table_creation(table_location, table_name, schema); + let table = iceberg_catalog.create_table(&namespace, creation).await?; + + // 2. 
Create test data + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + + let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let name_array = Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])) as ArrayRef; + + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![id_array, name_array]) + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Failed to create record batch: {}", e), + ) + })?; + + // 3. Create mock input execution plan + let input_plan = Arc::new(MockExecutionPlan::new(arrow_schema.clone(), vec![ + batch.clone(), + ])); + + // 4. Create IcebergWriteExec + let write_exec = IcebergWriteExec::new(table.clone(), input_plan, arrow_schema); + + // 5. Execute the plan + let task_ctx = Arc::new(TaskContext::default()); + let stream = write_exec.execute(0, task_ctx).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Failed to execute plan: {}", e), + ) + })?; + + // Collect the results + let mut results = vec![]; + let mut stream = stream; + while let Some(batch) = stream.next().await { + results.push(batch.map_err(|e| { + Error::new(ErrorKind::Unexpected, format!("Failed to get batch: {}", e)) + })?); + } + + // 6. Verify the results + assert_eq!(results.len(), 1, "Expected one result batch"); + let result_batch = &results[0]; + + // Check schema + assert_eq!( + result_batch.schema().as_ref(), + &ArrowSchema::new(vec![Field::new(DATA_FILES_COL_NAME, DataType::Utf8, false)]) + ); + + // Check data + assert_eq!(result_batch.num_rows(), 1, "Expected one data file"); + + // Get the data file JSON + let data_file_json = result_batch + .column(0) + .as_any() + .downcast_ref::() + .expect("Expected StringArray") + .value(0); + + // Deserialize the data file JSON + let partition_type = table.metadata().default_partition_type(); + let spec_id = table.metadata().default_partition_spec_id(); + let schema = table.metadata().current_schema(); + + let data_file = + deserialize_data_file_from_json(data_file_json, spec_id, partition_type, schema) + .expect("Failed to deserialize data file JSON"); + + // Verify data file properties + assert_eq!( + data_file.record_count(), + 3, + "Expected 3 records in the data file" + ); + assert!( + data_file.file_size_in_bytes() > 0, + "File size should be greater than 0" + ); + assert_eq!( + data_file.file_format(), + DataFileFormat::Parquet, + "Expected Parquet file format" + ); + + // Verify column statistics + assert!( + data_file.column_sizes().get(&1).unwrap() > &0, + "Column 1 size should be greater than 0" + ); + assert!( + data_file.column_sizes().get(&2).unwrap() > &0, + "Column 2 size should be greater than 0" + ); + + assert_eq!( + *data_file.value_counts().get(&1).unwrap(), + 3, + "Expected 3 values for column 1" + ); + assert_eq!( + *data_file.value_counts().get(&2).unwrap(), + 3, + "Expected 3 values for column 2" + ); + + // Verify lower and upper bounds + assert!( + data_file.lower_bounds().contains_key(&1) || data_file.lower_bounds().contains_key(&2), + "Expected lower bounds to contain at least one column" + ); + assert!( + data_file.upper_bounds().contains_key(&1) || data_file.upper_bounds().contains_key(&2), + "Expected upper bounds to contain at least one column" + ); + + // Check that the file 
path exists + let file_path = data_file.file_path(); + assert!(!file_path.is_empty(), "File path should not be empty"); + + // 7. Verify the file exists + let file_io = table.file_io(); + assert!(file_io.exists(file_path).await?, "Data file should exist"); + + Ok(()) + } +} From 7e6e8d51a77a52580e610c7f55da0efe6deae4d6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 6 Aug 2025 16:48:14 -0700 Subject: [PATCH 4/9] working on clippy --- .../integrations/datafusion/src/physical_plan/write.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 3493b38ae8..f1bd0b1789 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -238,12 +238,9 @@ impl ExecutionPlan for IcebergWriteExec { // Convert builders to data files and then to JSON strings let data_files_strs: Vec = data_files .into_iter() - .map(|data_file| -> DFResult { - // Serialize to JSON - Ok( - serialize_data_file_to_json(data_file, &partition_type, format_version) - .map_err(to_datafusion_error)?, - ) + .map(|data_file| { + serialize_data_file_to_json(data_file, &partition_type, format_version) + .map_err(to_datafusion_error) }) .collect::>>()?; From bb721b485d30c0e119c74b33e454b7a3e71bb8ca Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 6 Aug 2025 17:08:55 -0700 Subject: [PATCH 5/9] use property to control target file size --- crates/iceberg/src/spec/table_metadata.rs | 5 +++++ .../datafusion/src/physical_plan/write.rs | 22 +++++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index e5b96ae000..1f62afb31f 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -126,6 +126,11 @@ pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.defau /// Default value for data file format pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet"; +/// Target file size for newly written files. +pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes"; +/// Default target file size +pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB + /// Reference to [`TableMetadata`]. 
pub type TableMetadataRef = Arc; diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index f1bd0b1789..a56e709ab7 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -38,6 +38,7 @@ use futures::StreamExt; use iceberg::arrow::schema_to_arrow_schema; use iceberg::spec::{ DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, + PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, serialize_data_file_to_json, }; use iceberg::table::Table; @@ -204,9 +205,26 @@ impl ExecutionPlan for IcebergWriteExec { file_format, ), ); + let target_file_size = match self + .table + .metadata() + .properties() + .get(PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES) + { + Some(value_str) => value_str + .parse::() + .map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid value for commit.retry.min-wait-ms", + ) + .with_source(e) + }) + .map_err(to_datafusion_error)?, + None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + }; let rolling_writer_builder = - RollingFileWriterBuilder::new(parquet_file_writer_builder, 100 * 1024 * 1024); - + RollingFileWriterBuilder::new(parquet_file_writer_builder, target_file_size); let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, spec_id); From 7c153e4a2fdb357579afb237003d92ff4a59e1a8 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 6 Aug 2025 17:16:33 -0700 Subject: [PATCH 6/9] minor write --- crates/integrations/datafusion/src/physical_plan/write.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index a56e709ab7..6742f13912 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -199,11 +199,7 @@ impl ExecutionPlan for IcebergWriteExec { DefaultLocationGenerator::new(self.table.metadata().clone()) .map_err(to_datafusion_error)?, // todo filename prefix/suffix should be configurable - DefaultFileNameGenerator::new( - "datafusion".to_string(), - Some(Uuid::now_v7().to_string()), - file_format, - ), + DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format), ); let target_file_size = match self .table From 1e87bd4cfc462a2c9f666180590919aa52402275 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 6 Aug 2025 17:23:26 -0700 Subject: [PATCH 7/9] write clean --- .../datafusion/src/physical_plan/write.rs | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 6742f13912..8ccb940172 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -56,6 +56,13 @@ use uuid::Uuid; use crate::physical_plan::DATA_FILES_COL_NAME; use crate::to_datafusion_error; +/// An execution plan node that writes data to an Iceberg table. +/// +/// This execution plan takes input data from a child execution plan and writes it to an Iceberg table. +/// It handles the creation of data files in the appropriate format and returns information about the written files as its output. 
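+/// Note: only the Parquet data file format is supported at this point; other formats fail with a `FeatureUnsupported` error.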
+/// +/// The output of this execution plan is a record batch containing a single column with serialized +/// data file information that can be used for committing the write operation to the table. #[derive(Debug)] pub(crate) struct IcebergWriteExec { table: Table, @@ -163,6 +170,27 @@ impl ExecutionPlan for IcebergWriteExec { ))) } + /// Executes the write operation for the given partition. + /// + /// This function: + /// 1. Sets up a data file writer based on the table's configuration + /// 2. Processes input data from the child execution plan + /// 3. Writes the data to files using the configured writer + /// 4. Returns a stream containing information about the written data files + /// + /// The output of this function is a stream of record batches with the following structure: + /// + /// ```text + /// +------------------+ + /// | data_files | + /// +------------------+ + /// | "{"file_path":.. | <- JSON string representing a data file + /// +------------------+ + /// ``` + /// + /// Each row in the output contains a JSON string representing a data file that was written. + /// + /// This output can be used by a subsequent operation to commit the added files to the table. fn execute( &self, partition: usize, From 24a34ef988f9bddde4b4393fed4bad81a7ce2b32 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 8 Aug 2025 10:23:31 -0700 Subject: [PATCH 8/9] fail for partitioned tables --- .../datafusion/src/physical_plan/write.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 8ccb940172..e10dcf8d47 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -196,6 +196,18 @@ impl ExecutionPlan for IcebergWriteExec { partition: usize, context: Arc, ) -> DFResult { + if self + .table + .metadata() + .default_partition_spec() + .is_unpartitioned() + { + // TODO add support for partitioned tables + return Err(DataFusionError::NotImplemented( + "IcebergWriteExec does not support partitioned tables yet".to_string(), + )); + } + let spec_id = self.table.metadata().default_partition_spec_id(); let partition_type = self.table.metadata().default_partition_type().clone(); let format_version = self.table.metadata().format_version(); @@ -240,7 +252,7 @@ impl ExecutionPlan for IcebergWriteExec { .map_err(|e| { Error::new( ErrorKind::DataInvalid, - "Invalid value for commit.retry.min-wait-ms", + "Invalid value for write.target-file-size-bytes", ) .with_source(e) }) From 6aa1d9592ebbc1d90779ac1f3fa2ae81c8c1ada1 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Fri, 8 Aug 2025 10:40:10 -0700 Subject: [PATCH 9/9] a bug a day, keep bs away --- crates/integrations/datafusion/src/physical_plan/write.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index e10dcf8d47..a8d0b110af 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -196,7 +196,7 @@ impl ExecutionPlan for IcebergWriteExec { partition: usize, context: Arc, ) -> DFResult { - if self + if !self .table .metadata() .default_partition_spec()