From ec545f2a9f50dcbe694b2196e04b32de24f7a411 Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Tue, 14 Oct 2025 10:11:23 +0000 Subject: [PATCH 1/2] Improve `IcebergCommitExec` to correctly populate properties/schema (#1721) ## Which issue does this PR close? PR fixes schema mismatch errors (similar to the example shown below) when using `IcebergCommitExec` with DataFusion. This occurs when `IcebergCommitExec` is not the top-level plan but is instead wrapped as the input to another plan node, for example when added by a custom optimization rule (cache invalidation step for example). >An internal error occurred. Internal error: PhysicalOptimizer rule 'OutputRequirements' failed. Schema mismatch. Expected original schema: Schema { fields: [Field { name: "count", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, got new schema: Schema { fields: [Field { name: "r_regionkey", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "r_name", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }, Field { name: "r_comment", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }], metadata: {} }. This issue was likely caused by a bug in DataFusion's code. Please help us to resolve this by filing a bug report in our issue tracker: https://github.com/apache/datafusion/issues ## What changes are included in this PR? PR updates `compute_properties` logic to use target (output) schema instead of input schema. Below is example DataFusion `DataSinkExec` implementation demonstrating that properties must be created based on target schema, not input. https://github.com/apache/datafusion/blob/4eacb6046773b759dae0b3d801fe8cb1c6b65c0f/datafusion/datasource/src/sink.rs#L101C1-L117C6 ```rust impl DataSinkExec { /// Create a plan to write to `sink` pub fn new( input: Arc, sink: Arc, sort_order: Option, ) -> Self { let count_schema = make_count_schema(); let cache = Self::create_schema(&input, count_schema); Self { input, sink, count_schema: make_count_schema(), sort_order, cache, } } .... fn properties(&self) -> &PlanProperties { &self.cache } ``` ## Are these changes tested? Tested manually, expanded existing test to verify output schema, tested as part of [Spice Iceberg write automated tests](https://github.com/spiceai/spiceai/blob/trunk/crates/runtime/tests/iceberg/write/mod.rs) --- .../integrations/datafusion/src/physical_plan/commit.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 067049c122..48373f11e2 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -57,14 +57,16 @@ impl IcebergCommitExec { input: Arc, schema: ArrowSchemaRef, ) -> Self { - let plan_properties = Self::compute_properties(schema.clone()); + let count_schema = Self::make_count_schema(); + + let plan_properties = Self::compute_properties(Arc::clone(&count_schema)); Self { table, catalog, input, schema, - count_schema: Self::make_count_schema(), + count_schema, plan_properties, } } @@ -469,6 +471,9 @@ mod tests { let commit_exec = IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, arrow_schema); + // Verify Execution Plan schema matches the count schema + assert_eq!(commit_exec.schema(), IcebergCommitExec::make_count_schema()); + // Execute the commit exec let task_ctx = Arc::new(TaskContext::default()); let stream = commit_exec.execute(0, task_ctx)?; From 36bd7eb36828be24acf08e97e2b2ff8e8e83d6f4 Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 14 Oct 2025 06:16:32 -0400 Subject: [PATCH 2/2] feat(spec): add `table_properties.rs` to spec (#1733) ## Which issue does this PR close? - Closes #1505 . ## What changes are included in this PR? - Adds `table_properties.rs` to hold and validate properties and set default values. Uses macros to simplify setting new properties. ## Are these changes tested? Yes --- Cargo.toml | 4 +- crates/iceberg/Cargo.toml | 2 +- crates/iceberg/src/spec/mod.rs | 2 + crates/iceberg/src/spec/table_metadata.rs | 85 ------ .../src/spec/table_metadata_builder.rs | 23 +- crates/iceberg/src/spec/table_properties.rs | 284 ++++++++++++++++++ crates/iceberg/src/transaction/mod.rs | 69 +---- crates/iceberg/src/transaction/snapshot.rs | 13 +- .../src/writer/file_writer/rolling_writer.rs | 4 +- .../datafusion/src/physical_plan/write.rs | 14 +- 10 files changed, 327 insertions(+), 173 deletions(-) create mode 100644 crates/iceberg/src/spec/table_properties.rs diff --git a/Cargo.toml b/Cargo.toml index 999b911753..46c99cc3d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,10 +76,10 @@ futures = "0.3" hive_metastore = "0.2.0" http = "1.2" iceberg = { version = "0.7.0", path = "./crates/iceberg" } -iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" } iceberg-catalog-glue = { version = "0.7.0", path = "./crates/catalog/glue" } -iceberg-catalog-s3tables = { version = "0.7.0", path = "./crates/catalog/s3tables" } iceberg-catalog-hms = { version = "0.7.0", path = "./crates/catalog/hms" } +iceberg-catalog-rest = { version = "0.7.0", path = "./crates/catalog/rest" } +iceberg-catalog-s3tables = { version = "0.7.0", path = "./crates/catalog/s3tables" } iceberg-datafusion = { version = "0.7.0", path = "./crates/integrations/datafusion" } indicatif = "0.17" itertools = "0.13" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index d592700b73..b831607aab 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -74,8 +74,8 @@ opendal = { workspace = true } ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } rand = { workspace = true } -reqwest = { workspace = true } reqsign = { version = "0.16.3", optional = true, default-features = false } +reqwest = { workspace = true } roaring = { workspace = true } rust_decimal = { workspace = true } serde = { workspace = true } diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 90dafcc110..44b35e5a6b 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -30,6 +30,7 @@ mod sort; mod statistic_file; mod table_metadata; mod table_metadata_builder; +mod table_properties; mod transform; mod values; mod view_metadata; @@ -48,6 +49,7 @@ pub use snapshot_summary::*; pub use sort::*; pub use statistic_file::*; pub use table_metadata::*; +pub use table_properties::*; pub use transform::*; pub use values::*; pub use view_metadata::*; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index d7347d50ee..ca298f308d 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -46,91 +46,6 @@ pub(crate) static ONE_MINUTE_MS: i64 = 60_000; pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; -/// Reserved table property for table format version. -/// -/// Iceberg will default a new table's format version to the latest stable and recommended -/// version. This reserved property keyword allows users to override the Iceberg format version of -/// the table metadata. -/// -/// If this table property exists when creating a table, the table will use the specified format -/// version. If a table updates this property, it will try to upgrade to the specified format -/// version. -pub const PROPERTY_FORMAT_VERSION: &str = "format-version"; -/// Reserved table property for table UUID. -pub const PROPERTY_UUID: &str = "uuid"; -/// Reserved table property for the total number of snapshots. -pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count"; -/// Reserved table property for current snapshot summary. -pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary"; -/// Reserved table property for current snapshot id. -pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id"; -/// Reserved table property for current snapshot timestamp. -pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms"; -/// Reserved table property for the JSON representation of current schema. -pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema"; -/// Reserved table property for the JSON representation of current(default) partition spec. -pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec"; -/// Reserved table property for the JSON representation of current(default) sort order. -pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order"; - -/// Property key for max number of previous versions to keep. -pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previous-versions-max"; -/// Default value for max number of previous versions to keep. -pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100; - -/// Property key for max number of partitions to keep summary stats for. -pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit"; -/// Default value for the max number of partitions to keep summary stats for. -pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0; - -/// Reserved Iceberg table properties list. -/// -/// Reserved table properties are only used to control behaviors when creating or updating a -/// table. The value of these properties are not persisted as a part of the table metadata. -pub const RESERVED_PROPERTIES: [&str; 9] = [ - PROPERTY_FORMAT_VERSION, - PROPERTY_UUID, - PROPERTY_SNAPSHOT_COUNT, - PROPERTY_CURRENT_SNAPSHOT_ID, - PROPERTY_CURRENT_SNAPSHOT_SUMMARY, - PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP, - PROPERTY_CURRENT_SCHEMA, - PROPERTY_DEFAULT_PARTITION_SPEC, - PROPERTY_DEFAULT_SORT_ORDER, -]; - -/// Property key for number of commit retries. -pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; -/// Default value for number of commit retries. -pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4; - -/// Property key for minimum wait time (ms) between retries. -pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; -/// Default value for minimum wait time (ms) between retries. -pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; - -/// Property key for maximum wait time (ms) between retries. -pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; -/// Default value for maximum wait time (ms) between retries. -pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute - -/// Property key for total maximum retry time (ms). -pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; -/// Default value for total maximum retry time (ms). -pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes - -/// Default file format for data files -pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default"; -/// Default file format for delete files -pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default"; -/// Default value for data file format -pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet"; - -/// Target file size for newly written files. -pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes"; -/// Default target file size -pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB - /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 068f2002f9..7881ebea40 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -22,11 +22,10 @@ use uuid::Uuid; use super::{ DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, FormatVersion, MAIN_BRANCH, MetadataLog, - ONE_MINUTE_MS, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, - PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, PartitionSpec, PartitionSpecBuilder, - PartitionStatisticsFile, RESERVED_PROPERTIES, Schema, SchemaRef, Snapshot, SnapshotLog, - SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType, - TableMetadata, UNPARTITIONED_LAST_ASSIGNED_ID, UnboundPartitionSpec, + ONE_MINUTE_MS, PartitionSpec, PartitionSpecBuilder, PartitionStatisticsFile, Schema, SchemaRef, + Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder, SortOrderRef, + StatisticsFile, StructType, TableMetadata, TableProperties, UNPARTITIONED_LAST_ASSIGNED_ID, + UnboundPartitionSpec, }; use crate::error::{Error, ErrorKind, Result}; use crate::{TableCreation, TableUpdate}; @@ -247,7 +246,7 @@ impl TableMetadataBuilder { // List of specified properties that are RESERVED and should not be persisted. let reserved_properties = properties .keys() - .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str())) + .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str())) .map(ToString::to_string) .collect::>(); @@ -285,7 +284,7 @@ impl TableMetadataBuilder { // disallow removal of reserved properties let reserved_properties = properties .iter() - .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str())) + .filter(|key| TableProperties::RESERVED_PROPERTIES.contains(&key.as_str())) .map(ToString::to_string) .collect::>(); @@ -1061,9 +1060,9 @@ impl TableMetadataBuilder { let max_size = self .metadata .properties - .get(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX) + .get(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX) .and_then(|v| v.parse::().ok()) - .unwrap_or(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT) + .unwrap_or(TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT) .max(1); if self.metadata.metadata_log.len() > max_size { @@ -1360,8 +1359,8 @@ mod tests { use crate::io::FileIOBuilder; use crate::spec::{ BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema, - SnapshotRetention, SortDirection, SortField, StructType, Summary, Transform, Type, - UnboundPartitionField, + SnapshotRetention, SortDirection, SortField, StructType, Summary, TableProperties, + Transform, Type, UnboundPartitionField, }; use crate::table::Table; @@ -2299,7 +2298,7 @@ mod tests { let builder = builder_without_changes(FormatVersion::V2); let metadata = builder .set_properties(HashMap::from_iter(vec![( - PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(), + TableProperties::PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(), "2".to_string(), )])) .unwrap() diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs new file mode 100644 index 0000000000..9aa789fed8 --- /dev/null +++ b/crates/iceberg/src/spec/table_properties.rs @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +// Helper function to parse a property from a HashMap +// If the property is not found, use the default value +fn parse_property( + properties: &HashMap, + key: &str, + default: T, +) -> Result +where + ::Err: std::fmt::Display, +{ + properties.get(key).map_or(Ok(default), |value| { + value + .parse::() + .map_err(|e| anyhow::anyhow!("Invalid value for {}: {}", key, e)) + }) +} + +/// TableProperties that contains the properties of a table. +#[derive(Debug)] +pub struct TableProperties { + /// The number of times to retry a commit. + pub commit_num_retries: usize, + /// The minimum wait time between retries. + pub commit_min_retry_wait_ms: u64, + /// The maximum wait time between retries. + pub commit_max_retry_wait_ms: u64, + /// The total timeout for commit retries. + pub commit_total_retry_timeout_ms: u64, + /// The default format for files. + pub write_format_default: String, + /// The target file size for files. + pub write_target_file_size_bytes: usize, +} + +impl TableProperties { + /// Reserved table property for table format version. + /// + /// Iceberg will default a new table's format version to the latest stable and recommended + /// version. This reserved property keyword allows users to override the Iceberg format version of + /// the table metadata. + /// + /// If this table property exists when creating a table, the table will use the specified format + /// version. If a table updates this property, it will try to upgrade to the specified format + /// version. + pub const PROPERTY_FORMAT_VERSION: &str = "format-version"; + /// Reserved table property for table UUID. + pub const PROPERTY_UUID: &str = "uuid"; + /// Reserved table property for the total number of snapshots. + pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count"; + /// Reserved table property for current snapshot summary. + pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary"; + /// Reserved table property for current snapshot id. + pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id"; + /// Reserved table property for current snapshot timestamp. + pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms"; + /// Reserved table property for the JSON representation of current schema. + pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema"; + /// Reserved table property for the JSON representation of current(default) partition spec. + pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec"; + /// Reserved table property for the JSON representation of current(default) sort order. + pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order"; + + /// Property key for max number of previous versions to keep. + pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = + "write.metadata.previous-versions-max"; + /// Default value for max number of previous versions to keep. + pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100; + + /// Property key for max number of partitions to keep summary stats for. + pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT: &str = "write.summary.partition-limit"; + /// Default value for the max number of partitions to keep summary stats for. + pub const PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT: u64 = 0; + + /// Reserved Iceberg table properties list. + /// + /// Reserved table properties are only used to control behaviors when creating or updating a + /// table. The value of these properties are not persisted as a part of the table metadata. + pub const RESERVED_PROPERTIES: [&str; 9] = [ + Self::PROPERTY_FORMAT_VERSION, + Self::PROPERTY_UUID, + Self::PROPERTY_SNAPSHOT_COUNT, + Self::PROPERTY_CURRENT_SNAPSHOT_ID, + Self::PROPERTY_CURRENT_SNAPSHOT_SUMMARY, + Self::PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP, + Self::PROPERTY_CURRENT_SCHEMA, + Self::PROPERTY_DEFAULT_PARTITION_SPEC, + Self::PROPERTY_DEFAULT_SORT_ORDER, + ]; + + /// Property key for number of commit retries. + pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries"; + /// Default value for number of commit retries. + pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4; + + /// Property key for minimum wait time (ms) between retries. + pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms"; + /// Default value for minimum wait time (ms) between retries. + pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100; + + /// Property key for maximum wait time (ms) between retries. + pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms"; + /// Default value for maximum wait time (ms) between retries. + pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute + + /// Property key for total maximum retry time (ms). + pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms"; + /// Default value for total maximum retry time (ms). + pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes + + /// Default file format for data files + pub const PROPERTY_DEFAULT_FILE_FORMAT: &str = "write.format.default"; + /// Default file format for delete files + pub const PROPERTY_DELETE_DEFAULT_FILE_FORMAT: &str = "write.delete.format.default"; + /// Default value for data file format + pub const PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT: &str = "parquet"; + + /// Target file size for newly written files. + pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes"; + /// Default target file size + pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB +} + +impl TryFrom<&HashMap> for TableProperties { + // parse by entry key or use default value + type Error = anyhow::Error; + + fn try_from(props: &HashMap) -> Result { + Ok(TableProperties { + commit_num_retries: parse_property( + props, + TableProperties::PROPERTY_COMMIT_NUM_RETRIES, + TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT, + )?, + commit_min_retry_wait_ms: parse_property( + props, + TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, + TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + )?, + commit_max_retry_wait_ms: parse_property( + props, + TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, + TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + )?, + commit_total_retry_timeout_ms: parse_property( + props, + TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, + TableProperties::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, + )?, + write_format_default: parse_property( + props, + TableProperties::PROPERTY_DEFAULT_FILE_FORMAT, + TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string(), + )?, + write_target_file_size_bytes: parse_property( + props, + TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, + TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + )?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_table_properties_default() { + let props = HashMap::new(); + let table_properties = TableProperties::try_from(&props).unwrap(); + assert_eq!( + table_properties.commit_num_retries, + TableProperties::PROPERTY_COMMIT_NUM_RETRIES_DEFAULT + ); + assert_eq!( + table_properties.commit_min_retry_wait_ms, + TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT + ); + assert_eq!( + table_properties.commit_max_retry_wait_ms, + TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT + ); + assert_eq!( + table_properties.write_format_default, + TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string() + ); + assert_eq!( + table_properties.write_target_file_size_bytes, + TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT + ); + } + + #[test] + fn test_table_properties_valid() { + let props = HashMap::from([ + ( + TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(), + "10".to_string(), + ), + ( + TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(), + "20".to_string(), + ), + ( + TableProperties::PROPERTY_DEFAULT_FILE_FORMAT.to_string(), + "avro".to_string(), + ), + ( + TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(), + "512".to_string(), + ), + ]); + let table_properties = TableProperties::try_from(&props).unwrap(); + assert_eq!(table_properties.commit_num_retries, 10); + assert_eq!(table_properties.commit_max_retry_wait_ms, 20); + assert_eq!(table_properties.write_format_default, "avro".to_string()); + assert_eq!(table_properties.write_target_file_size_bytes, 512); + } + + #[test] + fn test_table_properties_invalid() { + let invalid_retries = HashMap::from([( + TableProperties::PROPERTY_COMMIT_NUM_RETRIES.to_string(), + "abc".to_string(), + )]); + + let table_properties = TableProperties::try_from(&invalid_retries).unwrap_err(); + assert!( + table_properties.to_string().contains( + "Invalid value for commit.retry.num-retries: invalid digit found in string" + ) + ); + + let invalid_min_wait = HashMap::from([( + TableProperties::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS.to_string(), + "abc".to_string(), + )]); + let table_properties = TableProperties::try_from(&invalid_min_wait).unwrap_err(); + assert!( + table_properties.to_string().contains( + "Invalid value for commit.retry.min-wait-ms: invalid digit found in string" + ) + ); + + let invalid_max_wait = HashMap::from([( + TableProperties::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS.to_string(), + "abc".to_string(), + )]); + let table_properties = TableProperties::try_from(&invalid_max_wait).unwrap_err(); + assert!( + table_properties.to_string().contains( + "Invalid value for commit.retry.max-wait-ms: invalid digit found in string" + ) + ); + + let invalid_target_size = HashMap::from([( + TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(), + "abc".to_string(), + )]); + let table_properties = TableProperties::try_from(&invalid_target_size).unwrap_err(); + assert!(table_properties.to_string().contains( + "Invalid value for write.target-file-size-bytes: invalid digit found in string" + )); + } +} diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 06549a95c5..26bd652226 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -52,8 +52,6 @@ /// that allows users to apply a transaction action to a `Transaction`. mod action; -use std::collections::HashMap; - pub use action::*; mod append; mod snapshot; @@ -69,12 +67,7 @@ use std::time::Duration; use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext}; use crate::error::Result; -use crate::spec::{ - PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, - PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, - PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT, - PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, -}; +use crate::spec::TableProperties; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; @@ -170,7 +163,12 @@ impl Transaction { return Ok(self.table); } - let backoff = Self::build_backoff(self.table.metadata().properties())?; + let table_props = + TableProperties::try_from(self.table.metadata().properties()).map_err(|e| { + Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e) + })?; + + let backoff = Self::build_backoff(table_props)?; let tx = self; (|mut tx: Transaction| async { @@ -185,53 +183,14 @@ impl Transaction { .1 } - fn build_backoff(props: &HashMap) -> Result { - let min_delay = match props.get(PROPERTY_COMMIT_MIN_RETRY_WAIT_MS) { - Some(value_str) => value_str.parse::().map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - "Invalid value for commit.retry.min-wait-ms", - ) - .with_source(e) - })?, - None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, - }; - let max_delay = match props.get(PROPERTY_COMMIT_MAX_RETRY_WAIT_MS) { - Some(value_str) => value_str.parse::().map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - "Invalid value for commit.retry.max-wait-ms", - ) - .with_source(e) - })?, - None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, - }; - let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS) { - Some(value_str) => value_str.parse::().map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - "Invalid value for commit.retry.total-timeout-ms", - ) - .with_source(e) - })?, - None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, - }; - let max_times = match props.get(PROPERTY_COMMIT_NUM_RETRIES) { - Some(value_str) => value_str.parse::().map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - "Invalid value for commit.retry.num-retries", - ) - .with_source(e) - })?, - None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT, - }; - + fn build_backoff(props: TableProperties) -> Result { Ok(ExponentialBuilder::new() - .with_min_delay(Duration::from_millis(min_delay)) - .with_max_delay(Duration::from_millis(max_delay)) - .with_total_delay(Some(Duration::from_millis(total_delay))) - .with_max_times(max_times) + .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms)) + .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms)) + .with_total_delay(Some(Duration::from_millis( + props.commit_total_retry_timeout_ms, + ))) + .with_max_times(props.commit_num_retries) .with_factor(2.0) .build()) } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 48dc2b5b90..a03b8dc490 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -24,10 +24,9 @@ use uuid::Uuid; use crate::error::Result; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, - ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, - PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT, PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT, - Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, - Summary, update_snapshot_summaries, + ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, + SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, + TableProperties, update_snapshot_summaries, }; use crate::table::Table; use crate::transaction::ActionCommit; @@ -322,15 +321,15 @@ impl<'a> SnapshotProducer<'a> { let partition_summary_limit = if let Some(limit) = table_metadata .properties() - .get(PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT) + .get(TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT) { if let Ok(limit) = limit.parse::() { limit } else { - PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT } } else { - PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + TableProperties::PROPERTY_WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT }; summary_collector.set_partition_summary_limit(partition_summary_limit); diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 0b9b105c5e..0003617043 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -20,7 +20,7 @@ use std::fmt::{Debug, Formatter}; use arrow_array::RecordBatch; use crate::io::{FileIO, OutputFile}; -use crate::spec::{DataFileBuilder, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, PartitionKey}; +use crate::spec::{DataFileBuilder, PartitionKey, TableProperties}; use crate::writer::CurrentFileStatus; use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; @@ -95,7 +95,7 @@ where ) -> Self { Self { inner_builder, - target_file_size: PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + target_file_size: TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, file_io, location_generator, file_name_generator, diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 759f1a8de2..7bff239222 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -36,11 +36,7 @@ use datafusion::physical_plan::{ }; use futures::StreamExt; use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema}; -use iceberg::spec::{ - DataFileFormat, PROPERTY_DEFAULT_FILE_FORMAT, PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT, - PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, - serialize_data_file_to_json, -}; +use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json}; use iceberg::table::Table; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; @@ -226,8 +222,8 @@ impl ExecutionPlan for IcebergWriteExec { self.table .metadata() .properties() - .get(PROPERTY_DEFAULT_FILE_FORMAT) - .unwrap_or(&PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()), + .get(TableProperties::PROPERTY_DEFAULT_FILE_FORMAT) + .unwrap_or(&TableProperties::PROPERTY_DEFAULT_FILE_FORMAT_DEFAULT.to_string()), ) .map_err(to_datafusion_error)?; if file_format != DataFileFormat::Parquet { @@ -250,7 +246,7 @@ impl ExecutionPlan for IcebergWriteExec { .table .metadata() .properties() - .get(PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES) + .get(TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES) { Some(value_str) => value_str .parse::() @@ -262,7 +258,7 @@ impl ExecutionPlan for IcebergWriteExec { .with_source(e) }) .map_err(to_datafusion_error)?, - None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, + None => TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, }; let file_io = self.table.file_io().clone();