From 8f1c1e6fc2eb45b66005dc020de2fa7be6be11f4 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 23 Jun 2025 15:59:40 -0700 Subject: [PATCH 1/5] Add TableCommit::apply to help update metadata --- crates/iceberg/src/catalog/mod.rs | 27 ++++++++++++++++++++++++++- crates/iceberg/src/error.rs | 2 +- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 5cfbae2056..daac830a91 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -22,6 +22,7 @@ use std::fmt::{Debug, Display}; use std::future::Future; use std::mem::take; use std::ops::Deref; +use std::sync::Arc; use _serde::deserialize_snapshot; use async_trait::async_trait; @@ -313,6 +314,30 @@ impl TableCommit { pub fn take_updates(&mut self) -> Vec { take(&mut self.updates) } + + /// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update. + /// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates. + /// + /// Returns a new [`Table`] with updated metadata, + /// or an error if validation or application fails. + #[allow(dead_code)] + pub fn apply(&mut self, table: Table) -> Result { + // check requirements + let requirements = self.take_requirements(); + for requirement in requirements { + requirement.check(Some(table.metadata()))?; + } + + // apply updates to metadata builder + let mut metadata_builder = table.metadata().clone().into_builder(None); + + let updates = self.take_updates(); + for update in updates { + metadata_builder = update.apply(metadata_builder)?; + } + + Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata))) + } } /// TableRequirement represents a requirement for a table in the catalog. @@ -898,7 +923,7 @@ mod tests { UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations, ViewVersion, }; - use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate}; + use crate::{NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate}; #[test] fn test_parent_namespace() { diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 37529ee6f3..07fd26413d 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -39,7 +39,7 @@ pub enum ErrorKind { /// Iceberg data is invalid. /// /// This error is returned when we try to read a table from iceberg but - /// failed to parse it's metadata or data file correctly. + /// failed to parse its metadata or data file correctly. /// /// The table could be invalid or corrupted. DataInvalid, From 7667b91af581121125681e09e043db475bb8890f Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 23 Jun 2025 16:44:35 -0700 Subject: [PATCH 2/5] added unit tests --- crates/iceberg/src/catalog/mod.rs | 77 ++++++++++++++++++++++++++++--- 1 file changed, 71 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index daac830a91..c130a8e0c8 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -321,18 +321,16 @@ impl TableCommit { /// Returns a new [`Table`] with updated metadata, /// or an error if validation or application fails. #[allow(dead_code)] - pub fn apply(&mut self, table: Table) -> Result
{ + pub fn apply(self, table: Table) -> Result
{ // check requirements - let requirements = self.take_requirements(); - for requirement in requirements { + for requirement in self.requirements { requirement.check(Some(table.metadata()))?; } // apply updates to metadata builder let mut metadata_builder = table.metadata().clone().into_builder(None); - let updates = self.take_updates(); - for update in updates { + for update in self.updates { metadata_builder = update.apply(metadata_builder)?; } @@ -909,7 +907,8 @@ mod _serde_set_statistics { mod tests { use std::collections::HashMap; use std::fmt::Debug; - + use std::fs::File; + use std::io::BufReader; use serde::Serialize; use serde::de::DeserializeOwned; use uuid::uuid; @@ -924,6 +923,8 @@ mod tests { ViewVersion, }; use crate::{NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate}; + use crate::io::FileIOBuilder; + use crate::table::Table; #[test] fn test_parent_namespace() { @@ -2136,4 +2137,68 @@ mod tests { }, ); } + + #[test] + fn test_table_commit() { + let table = { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2Valid.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v2.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) + .build() + .unwrap() + }; + + let updates = vec![ + TableUpdate::SetLocation { + location: "s3://bucket/test/new_location/metadata/v2.json".to_string(), + }, + TableUpdate::SetProperties { + updates: vec![ + ("prop1".to_string(), "v1".to_string()), + ("prop2".to_string(), "v2".to_string()), + ] + .into_iter() + .collect(), + } + ]; + + let requirements = vec![ + TableRequirement::UuidMatch { + uuid: table.metadata().table_uuid + } + ]; + + let table_commit = TableCommit::builder() + .ident(table.identifier().to_owned()) + .updates(updates) + .requirements(requirements) + .build(); + + let updated_table = table_commit.apply(table).unwrap(); + + assert_eq!( + updated_table.metadata().properties.get("prop1").unwrap(), + "v1" + ); + assert_eq!( + updated_table.metadata().properties.get("prop2").unwrap(), + "v2" + ); + + assert_eq!( + updated_table.metadata().location, + "s3://bucket/test/new_location/metadata/v2.json".to_string() + ) + } } From 7719d3b0a88726a160755e16d8a1563a393f7e8f Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 23 Jun 2025 16:47:02 -0700 Subject: [PATCH 3/5] why not fmt --- crates/iceberg/src/catalog/mod.rs | 37 ++++++++++++++++--------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index c130a8e0c8..0b93447efa 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -909,11 +909,13 @@ mod tests { use std::fmt::Debug; use std::fs::File; use std::io::BufReader; + use serde::Serialize; use serde::de::DeserializeOwned; use uuid::uuid; use super::ViewUpdate; + use crate::io::FileIOBuilder; use crate::spec::{ BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation, PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference, @@ -922,9 +924,10 @@ mod tests { UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations, ViewVersion, }; - use crate::{NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate}; - use crate::io::FileIOBuilder; use crate::table::Table; + use crate::{ + NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate, + }; #[test] fn test_parent_namespace() { @@ -2137,7 +2140,7 @@ mod tests { }, ); } - + #[test] fn test_table_commit() { let table = { @@ -2146,7 +2149,7 @@ mod tests { env!("CARGO_MANIFEST_DIR"), "TableMetadataV2Valid.json" )) - .unwrap(); + .unwrap(); let reader = BufReader::new(file); let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); @@ -2158,35 +2161,33 @@ mod tests { .build() .unwrap() }; - + let updates = vec![ - TableUpdate::SetLocation { - location: "s3://bucket/test/new_location/metadata/v2.json".to_string(), + TableUpdate::SetLocation { + location: "s3://bucket/test/new_location/metadata/v2.json".to_string(), }, TableUpdate::SetProperties { updates: vec![ ("prop1".to_string(), "v1".to_string()), ("prop2".to_string(), "v2".to_string()), ] - .into_iter() - .collect(), - } - ]; - - let requirements = vec![ - TableRequirement::UuidMatch { - uuid: table.metadata().table_uuid - } + .into_iter() + .collect(), + }, ]; + let requirements = vec![TableRequirement::UuidMatch { + uuid: table.metadata().table_uuid, + }]; + let table_commit = TableCommit::builder() .ident(table.identifier().to_owned()) .updates(updates) .requirements(requirements) .build(); - + let updated_table = table_commit.apply(table).unwrap(); - + assert_eq!( updated_table.metadata().properties.get("prop1").unwrap(), "v1" From 992a86efd376ae434591f830bf6c731c71d91298 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 23 Jun 2025 17:06:32 -0700 Subject: [PATCH 4/5] flaky test? From 90796bc701a63a71ba98be8d3d81cbea52391ebc Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 24 Jun 2025 11:36:59 -0700 Subject: [PATCH 5/5] undead code --- crates/iceberg/src/catalog/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 0b93447efa..ebb9a66ce8 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -320,7 +320,6 @@ impl TableCommit { /// /// Returns a new [`Table`] with updated metadata, /// or an error if validation or application fails. - #[allow(dead_code)] pub fn apply(self, table: Table) -> Result
{ // check requirements for requirement in self.requirements {