diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index ebee670f45..d910b5c8fc 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -162,6 +162,10 @@ pub struct Table { } impl Table { + pub(crate) fn with_metadata(&mut self, metadata: TableMetadataRef) { + self.metadata = metadata; + } + /// Returns a TableBuilder to build a table pub fn builder() -> TableBuilder { TableBuilder::new() diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 8c7b36358e..474b32becb 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -83,7 +83,7 @@ impl<'a> FastAppendAction<'a> { if !self .snapshot_produce_action .tx - .table + .current_table .metadata() .default_spec .is_unpartitioned() @@ -94,10 +94,10 @@ impl<'a> FastAppendAction<'a> { )); } - let table_metadata = self.snapshot_produce_action.tx.table.metadata(); + let table_metadata = self.snapshot_produce_action.tx.current_table.metadata(); let data_files = ParquetWriter::parquet_files_to_data_files( - self.snapshot_produce_action.tx.table.file_io(), + self.snapshot_produce_action.tx.current_table.file_io(), file_path, table_metadata, ) @@ -122,7 +122,7 @@ impl<'a> FastAppendAction<'a> { let mut manifest_stream = self .snapshot_produce_action .tx - .table + .current_table .inspect() .manifests() .scan() @@ -184,14 +184,19 @@ impl SnapshotProduceOperation for FastAppendOperation { &self, snapshot_produce: &SnapshotProduceAction<'_>, ) -> Result> { - let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else { + let Some(snapshot) = snapshot_produce + .tx + .current_table + .metadata() + .current_snapshot() + else { return Ok(vec![]); }; let manifest_list = snapshot .load_manifest_list( - snapshot_produce.tx.table.file_io(), - &snapshot_produce.tx.table.metadata_ref(), + snapshot_produce.tx.current_table.file_io(), + &snapshot_produce.tx.current_table.metadata_ref(), ) .await?; @@ -253,11 +258,11 @@ mod tests { assert_eq!( vec![ TableRequirement::UuidMatch { - uuid: tx.table.metadata().uuid() + uuid: table.metadata().uuid() }, TableRequirement::RefSnapshotIdMatch { r#ref: MAIN_BRANCH.to_string(), - snapshot_id: tx.table.metadata().current_snapshot_id + snapshot_id: table.metadata().current_snapshot_id } ], tx.requirements diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index d3c7bc3f9f..6ae25775b4 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -24,6 +24,7 @@ mod sort_order; use std::cmp::Ordering; use std::collections::HashMap; use std::mem::discriminant; +use std::sync::Arc; use uuid::Uuid; @@ -37,7 +38,8 @@ use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdat /// Table transaction. pub struct Transaction<'a> { - table: &'a Table, + base_table: &'a Table, + current_table: Table, updates: Vec, requirements: Vec, } @@ -46,38 +48,60 @@ impl<'a> Transaction<'a> { /// Creates a new transaction. pub fn new(table: &'a Table) -> Self { Self { - table, + base_table: table, + current_table: table.clone(), updates: vec![], requirements: vec![], } } - fn append_updates(&mut self, updates: Vec) -> Result<()> { - for update in &updates { - for up in &self.updates { - if discriminant(up) == discriminant(update) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot apply update with same type at same time: {:?}", - update - ), - )); - } - } + fn update_table_metadata(&mut self, updates: &[TableUpdate]) -> Result<()> { + let mut metadata_builder = self.current_table.metadata().clone().into_builder(None); + for update in updates { + metadata_builder = update.clone().apply(metadata_builder)?; } - self.updates.extend(updates); + + self.current_table + .with_metadata(Arc::new(metadata_builder.build()?.metadata)); + Ok(()) } - fn append_requirements(&mut self, requirements: Vec) -> Result<()> { - self.requirements.extend(requirements); + fn apply( + &mut self, + updates: Vec, + requirements: Vec, + ) -> Result<()> { + for requirement in &requirements { + requirement.check(Some(self.current_table.metadata()))?; + } + + self.update_table_metadata(&updates)?; + + self.updates.extend(updates); + + // For the requirements, it does not make sense to add a requirement more than once + // For example, you cannot assert that the current schema has two different IDs + for new_requirement in requirements { + if self + .requirements + .iter() + .map(discriminant) + .all(|d| d != discriminant(&new_requirement)) + { + self.requirements.push(new_requirement); + } + } + + // # TODO + // Support auto commit later. + Ok(()) } /// Sets table to a new version. pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.table.metadata().format_version(); + let current_version = self.current_table.metadata().format_version(); match current_version.cmp(&format_version) { Ordering::Greater => { return Err(Error::new( @@ -89,7 +113,7 @@ impl<'a> Transaction<'a> { )); } Ordering::Less => { - self.append_updates(vec![UpgradeFormatVersion { format_version }])?; + self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?; } Ordering::Equal => { // Do nothing. @@ -100,7 +124,7 @@ impl<'a> Transaction<'a> { /// Update table's property. pub fn set_properties(mut self, props: HashMap) -> Result { - self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?; + self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?; Ok(self) } @@ -116,7 +140,7 @@ impl<'a> Transaction<'a> { }; let mut snapshot_id = generate_random_id(); while self - .table + .current_table .metadata() .snapshots() .any(|s| s.snapshot_id() == snapshot_id) @@ -152,14 +176,17 @@ impl<'a> Transaction<'a> { /// Remove properties in table. pub fn remove_properties(mut self, keys: Vec) -> Result { - self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?; + self.apply( + vec![TableUpdate::RemoveProperties { removals: keys }], + vec![], + )?; Ok(self) } /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { let table_commit = TableCommit::builder() - .ident(self.table.identifier().clone()) + .ident(self.base_table.identifier().clone()) .updates(self.updates) .requirements(self.requirements) .build(); @@ -308,19 +335,21 @@ mod tests { ); } - #[test] - fn test_do_same_update_in_same_transaction() { - let table = make_v2_table(); + #[tokio::test] + async fn test_transaction_apply_upgrade() { + let table = make_v1_table(); let tx = Transaction::new(&table); - let tx = tx - .remove_properties(vec!["a".to_string(), "b".to_string()]) - .unwrap(); - - let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]); - - assert!( - tx.is_err(), - "Should not allow to do same kinds update in same transaction" + // Upgrade v1 to v1, do nothing. + let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap(); + // Upgrade v1 to v2, success. + let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); + assert_eq!( + vec![TableUpdate::UpgradeFormatVersion { + format_version: FormatVersion::V2 + }], + tx.updates ); + // Upgrade v2 to v1, return error. + assert!(tx.upgrade_table_version(FormatVersion::V1).is_err()); } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index cfa6b47062..ee9721c16c 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -137,7 +137,9 @@ impl<'a> SnapshotProduceAction<'a> { )); } // Check if the data file partition spec id matches the table default partition spec id. - if self.tx.table.metadata().default_partition_spec_id() != data_file.partition_spec_id { + if self.tx.current_table.metadata().default_partition_spec_id() + != data_file.partition_spec_id + { return Err(Error::new( ErrorKind::DataInvalid, "Data file partition spec id does not match table default partition spec id", @@ -145,7 +147,7 @@ impl<'a> SnapshotProduceAction<'a> { } Self::validate_partition_value( data_file.partition(), - self.tx.table.metadata().default_partition_type(), + self.tx.current_table.metadata().default_partition_type(), )?; } self.added_data_files.extend(data_files); @@ -155,24 +157,28 @@ impl<'a> SnapshotProduceAction<'a> { fn new_manifest_output(&mut self) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", - self.tx.table.metadata().location(), + self.tx.current_table.metadata().location(), META_ROOT_PATH, self.commit_uuid, self.manifest_counter.next().unwrap(), DataFileFormat::Avro ); - self.tx.table.file_io().new_output(new_manifest_path) + self.tx + .current_table + .file_io() + .new_output(new_manifest_path) } // Write manifest file for added data files and return the ManifestFile for ManifestList. async fn write_added_manifest(&mut self) -> Result { let added_data_files = std::mem::take(&mut self.added_data_files); let snapshot_id = self.snapshot_id; + let format_version = self.tx.current_table.metadata().format_version(); let manifest_entries = added_data_files.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) .data_file(data_file); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { + if format_version == FormatVersion::V1 { builder.snapshot_id(snapshot_id).build() } else { // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when @@ -185,15 +191,15 @@ impl<'a> SnapshotProduceAction<'a> { self.new_manifest_output()?, Some(self.snapshot_id), self.key_metadata.clone(), - self.tx.table.metadata().current_schema().clone(), + self.tx.current_table.metadata().current_schema().clone(), self.tx - .table + .current_table .metadata() .default_partition_spec() .as_ref() .clone(), ); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { + if self.tx.current_table.metadata().format_version() == FormatVersion::V1 { builder.build_v1() } else { builder.build_v2_data() @@ -227,7 +233,7 @@ impl<'a> SnapshotProduceAction<'a> { snapshot_produce_operation: &OP, ) -> Result { let mut summary_collector = SnapshotSummaryCollector::default(); - let table_metadata = self.tx.table.metadata_ref(); + let table_metadata = self.tx.current_table.metadata_ref(); let partition_summary_limit = if let Some(limit) = table_metadata .properties() @@ -275,7 +281,7 @@ impl<'a> SnapshotProduceAction<'a> { fn generate_manifest_list_file_path(&self, attempt: i64) -> String { format!( "{}/{}/snap-{}-{}-{}.{}", - self.tx.table.metadata().location(), + self.tx.current_table.metadata().location(), META_ROOT_PATH, self.snapshot_id, attempt, @@ -293,7 +299,7 @@ impl<'a> SnapshotProduceAction<'a> { let new_manifests = self .manifest_file(&snapshot_produce_operation, &process) .await?; - let next_seq_num = self.tx.table.metadata().next_sequence_number(); + let next_seq_num = self.tx.current_table.metadata().next_sequence_number(); let summary = self .summary(&snapshot_produce_operation) @@ -305,22 +311,22 @@ impl<'a> SnapshotProduceAction<'a> { let manifest_list_path = self.generate_manifest_list_file_path(0); - let mut manifest_list_writer = match self.tx.table.metadata().format_version() { + let mut manifest_list_writer = match self.tx.current_table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( self.tx - .table + .current_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), + self.tx.current_table.metadata().current_snapshot_id(), ), FormatVersion::V2 => ManifestListWriter::v2( self.tx - .table + .current_table .file_io() .new_output(manifest_list_path.clone())?, self.snapshot_id, - self.tx.table.metadata().current_snapshot_id(), + self.tx.current_table.metadata().current_snapshot_id(), next_seq_num, ), }; @@ -331,34 +337,37 @@ impl<'a> SnapshotProduceAction<'a> { let new_snapshot = Snapshot::builder() .with_manifest_list(manifest_list_path) .with_snapshot_id(self.snapshot_id) - .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id()) + .with_parent_snapshot_id(self.tx.current_table.metadata().current_snapshot_id()) .with_sequence_number(next_seq_num) .with_summary(summary) - .with_schema_id(self.tx.table.metadata().current_schema_id()) + .with_schema_id(self.tx.current_table.metadata().current_schema_id()) .with_timestamp_ms(commit_ts) .build(); - self.tx.append_updates(vec![ - TableUpdate::AddSnapshot { - snapshot: new_snapshot, - }, - TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRANCH.to_string(), - reference: SnapshotReference::new( - self.snapshot_id, - SnapshotRetention::branch(None, None, None), - ), - }, - ])?; - self.tx.append_requirements(vec![ - TableRequirement::UuidMatch { - uuid: self.tx.table.metadata().uuid(), - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: self.tx.table.metadata().current_snapshot_id(), - }, - ])?; + self.tx.apply( + vec![ + TableUpdate::AddSnapshot { + snapshot: new_snapshot, + }, + TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference::new( + self.snapshot_id, + SnapshotRetention::branch(None, None, None), + ), + }, + ], + vec![ + TableRequirement::UuidMatch { + uuid: self.tx.current_table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: self.tx.current_table.metadata().current_snapshot_id(), + }, + ], + )?; + Ok(self.tx) } } diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs index 4f21eef070..51012dca10 100644 --- a/crates/iceberg/src/transaction/sort_order.rs +++ b/crates/iceberg/src/transaction/sort_order.rs @@ -52,15 +52,25 @@ impl<'a> ReplaceSortOrderAction<'a> { let requirements = vec![ TableRequirement::CurrentSchemaIdMatch { - current_schema_id: self.tx.table.metadata().current_schema().schema_id(), + current_schema_id: self + .tx + .current_table + .metadata() + .current_schema() + .schema_id(), }, TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id, + default_sort_order_id: self + .tx + .current_table + .metadata() + .default_sort_order() + .order_id, }, ]; - self.tx.append_requirements(requirements)?; - self.tx.append_updates(updates)?; + self.tx.apply(updates, requirements)?; + Ok(self.tx) } @@ -72,7 +82,7 @@ impl<'a> ReplaceSortOrderAction<'a> { ) -> Result { let field_id = self .tx - .table + .current_table .metadata() .current_schema() .field_id_by_name(name)