From 3292d51d0c70b30df5b25aeefcf17f1053c665cf Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sat, 15 Feb 2025 23:34:23 -0500 Subject: [PATCH 1/2] feat: Added `UpdatePartitionSpec` --- crates/iceberg/src/spec/partition.rs | 10 + crates/iceberg/src/spec/schema.rs | 7 + crates/iceberg/src/spec/transform.rs | 23 +- crates/iceberg/src/transaction.rs | 584 ++++++++++++++++++++++++++- 4 files changed, 619 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index e6405be4c0..452bc721e9 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -46,6 +46,16 @@ pub struct PartitionField { } impl PartitionField { + /// Creates new `PartitionField`` instance + pub fn new(source_id: i32, field_id: i32, name: String, transform: Transform) -> Self { + Self { + source_id, + field_id, + name, + transform, + } + } + /// To unbound partition field pub fn into_unbound(self) -> UnboundPartitionField { self.into() diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index f290441aa7..4e3b3bd462 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -60,6 +60,13 @@ pub struct Schema { field_id_to_accessor: HashMap>, } +impl Schema { + /// Returns id to field mapping + pub fn id_to_field(&self) -> HashMap { + self.id_to_field.clone() + } +} + impl PartialEq for Schema { fn eq(&self, other: &Self) -> bool { self.r#struct == other.r#struct diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 8e9783a6c8..7e73f99af1 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -45,7 +45,7 @@ use crate::ErrorKind; /// predicates and partition predicates. /// /// All transforms must return `null` for a `null` input value. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum Transform { /// Source value, unmodified /// @@ -355,11 +355,30 @@ impl Transform { } /// Check if `Transform` is applicable on datum's `PrimitiveType` - fn can_transform(&self, datum: &Datum) -> bool { + pub fn can_transform(&self, datum: &Datum) -> bool { let input_type = datum.data_type().clone(); self.result_type(&Type::Primitive(input_type)).is_ok() } + /// Checks if can transform from type + pub fn can_transform_from_primitive_type(&self, primitive_type: &PrimitiveType) -> bool { + self.result_type(&Type::Primitive(primitive_type.clone())) + .is_ok() + } + + /// Checks if time transform + pub fn is_time_transform(&self) -> bool { + matches!( + self, + Transform::Hour | Transform::Day | Transform::Month | Transform::Year + ) + } + + /// Checks if void transform + pub fn is_void_transform(&self) -> bool { + matches!(self, Transform::Void) + } + /// Creates a unary predicate from a given operator and a reference name. fn project_unary(op: PredicateOperator, name: &str) -> Result> { Ok(Some(Predicate::Unary(UnaryExpression::new( diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index c27a107daa..b790fa6952 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -18,7 +18,7 @@ //! This module contains transaction api. use std::cmp::Ordering; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::future::Future; use std::mem::discriminant; use std::ops::RangeFrom; @@ -26,17 +26,22 @@ use std::ops::RangeFrom; use uuid::Uuid; use crate::error::Result; +use crate::expr::{Bind, Reference}; use crate::io::OutputFile; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter, - ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention, - SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH, + ManifestWriterBuilder, NullOrder, Operation, PartitionField, PartitionSpec, Schema, Snapshot, + SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, + Summary, Transform, MAIN_BRANCH, }; use crate::table::Table; +use crate::transaction::FormatVersion::V2; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; +#[allow(dead_code)] +const PARTITION_FIELD_ID_START: u32 = 1000; /// Table transaction. pub struct Transaction<'a> { @@ -600,12 +605,448 @@ impl<'a> ReplaceSortOrderAction<'a> { } } +/// Only used for v2 table metadata +#[allow(dead_code)] +struct UpdatePartitionSpec<'a> { + transaction: &'a Transaction<'a>, + name_to_field: HashMap, + name_to_added_field: HashMap, + transform_to_field: HashMap<(i32, Transform), PartitionField>, + transform_to_added_field: HashMap<(i32, Transform), PartitionField>, + renames: HashMap, + added_time_fields: HashMap, + case_sensitive: bool, + adds: Vec, + deletes: HashSet, + last_assigned_partition_id: i32, +} + +#[allow(dead_code)] +impl<'a> UpdatePartitionSpec<'a> { + pub fn new(transaction: &'a Transaction<'a>, case_sensitive: bool) -> Self { + let name_to_field = transaction + .table + .metadata() + .partition_specs + .values() + .flat_map(|spec| spec.fields().iter()) + .map(|field| (field.name.clone(), field.clone())) + .collect(); + + let transform_to_field = transaction + .table + .metadata() + .partition_specs + .values() + .flat_map(|spec| spec.fields().iter()) + .map(|field| { + let key = (field.source_id, field.transform); + (key, field.clone()) + }) + .collect(); + + Self { + transaction, + name_to_field, + name_to_added_field: HashMap::new(), + transform_to_field, + transform_to_added_field: HashMap::new(), + renames: HashMap::new(), + added_time_fields: HashMap::new(), + case_sensitive, + adds: Vec::new(), + deletes: HashSet::new(), + last_assigned_partition_id: transaction.table.metadata().last_partition_id, + } + } + + pub fn add_field( + &mut self, + source_column: String, + transform: Transform, + partition_field_name: Option, + ) -> Result<&mut Self> { + let reference = Reference::new(source_column); + let bound_reference = reference.bind( + self.transaction.table.metadata().current_schema().clone(), + self.case_sensitive, + )?; + + let primitive_type = bound_reference + .field() + .field_type + .as_primitive_type() + .unwrap(); + if !transform.can_transform_from_primitive_type(primitive_type) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot apply transform output on {:?}", primitive_type), + )); + } + + // Check if field already exists with the same transformation + let transform_key = (bound_reference.field().id, transform); + + if let Some(existing_partition_field) = self.transform_to_field.get(&transform_key) { + if !self.deletes.contains(&existing_partition_field.field_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Duplicate partition field for {}, {} already exists", + reference.name(), + existing_partition_field.name + ), + )); + } + } + + if let Some(added) = self.transform_to_added_field.get(&transform_key) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Already added partition: {}", added.name), + )); + } + + let new_field = self.create_partition_field( + (bound_reference.field().id, transform), + partition_field_name, + )?; + + if self.name_to_added_field.contains_key(&new_field.name) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Already added partition field with name: {}", + new_field.name + ), + )); + } + + if new_field.transform.is_time_transform() { + if let Some(existing_time_field) = self.added_time_fields.get(&new_field.source_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add time partition field: {} conflicts with {}", + new_field.name, existing_time_field.name + ), + )); + } + self.added_time_fields + .insert(new_field.source_id, new_field.clone()); + } + + // Record the new field in the "added" map + self.transform_to_added_field + .insert(transform_key, new_field.clone()); + + if let Some(existing_partition_field) = self.name_to_field.get(&new_field.name) { + if !self.deletes.contains(&new_field.field_id) { + if existing_partition_field.transform.is_void_transform() { + let new_name = format!( + "{}_{}", + existing_partition_field.name, existing_partition_field.field_id + ); + let _ = self.rename_field(existing_partition_field.name.clone(), new_name); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add duplicate partition field name: {}", + existing_partition_field.name + ), + )); + } + } + } + + // Add new fields + self.name_to_added_field + .insert(new_field.name.clone(), new_field.clone()); + self.adds.push(new_field); + + Ok(self) + } + + pub fn add_identity(&mut self, source_column_name: String) -> Result<&mut Self> { + self.add_field(source_column_name, Transform::Identity, None) + } + + pub fn remove_field(&mut self, name: &str) -> Result<&mut Self> { + // Cannot remove name that was added in the update + if self.name_to_added_field.contains_key(name) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot delete newly added field {}", name), + )); + } + + // Cannot remove a field that has been renamed + if self.renames.contains_key(name) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot rename and delete field {}", name), + )); + } + + let field = self.name_to_field.get(name).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("No such partition field: {}", name), + ) + })?; + + self.deletes.insert(field.field_id); + + Ok(self) + } + + pub fn rename_field(&mut self, name: String, new_name: String) -> Result<&mut Self> { + if let Some(existing_field) = self.name_to_field.get(&new_name) { + if existing_field.transform.is_void_transform() { + let new_new_name = format!("{}_{}", name, existing_field.field_id); + return self.rename_field(name, new_new_name); + } + } + + // do not allow renaming of recently added fields. + if self.name_to_added_field.contains_key(&name) { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot rename recently added partitions".to_string(), + )); + } + + let field = self.name_to_field.get(&name).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Cannot find partition field {}", name), + ) + })?; + + if self.deletes.contains(&field.field_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot delete and rename partition field {}", name), + )); + } + + self.renames.insert(name, new_name); + Ok(self) + } + + /// Commit the changes and produce updates and requirements + pub fn commit(&mut self) -> (Vec, Vec) { + let new_spec = self.apply(); + let mut updates = Vec::new(); + let mut requirements = Vec::new(); + + // if default spec id changed + if self + .transaction + .table + .metadata() + .default_partition_spec() + .spec_id() + != new_spec.spec_id() + { + if !self + .transaction + .table + .metadata() + .partition_specs + .contains_key(&new_spec.spec_id()) + { + updates.push(TableUpdate::AddSpec { + spec: new_spec.clone().into_unbound(), + }); + updates.push(TableUpdate::SetDefaultSpec { spec_id: -1 }); + } else { + // Otherwise, simply update the default spec to the new spec's id + updates.push(TableUpdate::SetDefaultSpec { + spec_id: new_spec.spec_id(), + }); + } + let required_last_assigned_partition_id = + self.transaction.table.metadata().last_partition_id; + requirements.push(TableRequirement::LastAssignedPartitionIdMatch { + last_assigned_partition_id: required_last_assigned_partition_id, + }); + } + (updates, requirements) + } + + /// Apply the updates and produce a new PartitionSpec. + pub fn apply(&self) -> PartitionSpec { + let mut partition_fields = Vec::new(); + let mut partition_names = HashSet::new(); + let schema = self.transaction.table.metadata().current_schema(); + + // Iterate over existing partition fields from the metadata spec. + for spec in self.transaction.table.metadata().partition_specs.values() { + for field in spec.fields().iter() { + if !self.deletes.contains(&field.field_id) { + if let Some(renamed) = self.renames.get(&field.name) { + // Create a new field with the renamed name. + let new_field = self + .add_new_field( + schema, + field.source_id, + field.field_id, + renamed.clone(), + field.transform, + &mut partition_names, + ) + .expect("Error adding renamed field"); + partition_fields.push(new_field); + } else { + partition_fields.push(field.clone()); + } + } + } + } + + // Append newly added fields. + for field in &self.adds { + partition_fields.push(field.clone()); + } + + let mut builder = PartitionSpec::builder(schema.clone()) + .with_spec_id(self.transaction.table.metadata().default_spec.spec_id()); + + for field in partition_fields { + // Retrieve the source column name from the schema + let source_name = schema + .field_by_id(field.source_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Source id {} not found in schema", field.source_id), + ) + }) + .unwrap() + .name + .clone(); + + // Add the field using the builder. + builder = builder + .add_partition_field(source_name, field.name, field.transform) + .unwrap(); + } + + builder.build().unwrap() + } + + pub fn create_partition_field( + &mut self, + transform_key: (i32, Transform), + partition_field_name: Option, + ) -> Result { + let (source_id, transform) = transform_key; + + if self.transaction.table.metadata().format_version == V2 { + let historical_fields: Vec<&PartitionField> = self + .transaction + .table + .metadata() + .partition_specs + .values() + .flat_map(|spec| spec.fields().iter()) + .collect(); + + for field in historical_fields { + if field.source_id == source_id + && field.transform == transform + && partition_field_name + .as_ref() + .map_or(true, |name| name == &field.name) + { + return Ok(field.clone()); + } + } + } + + let new_field_id = self.new_field_id(); + let field_name = if let Some(name) = partition_field_name { + name + } else { + // TODO: create partition visitor for naming + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Partition Visitor not implemented", + )); + }; + + Ok(PartitionField::new( + source_id, + new_field_id, + field_name, + transform, + )) + } + + fn check_and_add_partition_name( + &self, + schema: &Schema, + name: &str, + source_id: i32, + partition_names: &mut HashSet, + ) -> Result<()> { + // Attempt to find a field by name in the schema. + let field = schema.field_by_name(name).unwrap(); + if field.id != source_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot create identity partition from a different field in the schema {}", + name + ), + )); + } + + if name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Undefined name".to_string(), + )); + } + if partition_names.contains(name) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Partition name has to be unique: {}", name), + )); + } + partition_names.insert(name.to_string()); + Ok(()) + } + + // Create partition field helper + fn add_new_field( + &self, + schema: &Schema, + source_id: i32, + field_id: i32, + name: String, + transform: Transform, + partition_names: &mut std::collections::HashSet, + ) -> Result { + self.check_and_add_partition_name(schema, &name, source_id, partition_names)?; + Ok(PartitionField::new(source_id, field_id, name, transform)) + } + + fn new_field_id(&mut self) -> i32 { + self.last_assigned_partition_id += 1; + self.last_assigned_partition_id + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; use std::fs::File; use std::io::BufReader; + use super::*; use crate::io::FileIOBuilder; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct, @@ -869,4 +1310,141 @@ mod tests { "Should not allow to do same kinds update in same transaction" ); } + + fn find_unused_column(schema: &Schema, ups: &UpdatePartitionSpec) -> Option { + schema.id_to_field().values().find_map(|f| { + if ups + .name_to_field + .values() + .any(|pf| pf.source_id == f.id && pf.transform == Transform::Identity) + { + None + } else { + Some(f.name.clone()) + } + }) + } + + #[test] + fn test_add_field_success() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let mut ups = UpdatePartitionSpec::new(&tx, true); + + let schema = tx.table.metadata().current_schema(); + + let new_col = find_unused_column(schema, &ups) + .expect("No available column for a new partition field"); + + // Attempt to add a new identity partition field + let res = ups.add_field( + new_col.clone(), + Transform::Identity, + Some("new_identity".to_string()), + ); + assert!(res.is_ok()); + + // Check that the new field was recorded in the 'added' maps + assert!(ups.name_to_added_field.contains_key("new_identity")); + assert!(ups + .transform_to_added_field + .values() + .any(|pf| pf.name == "new_identity")); + assert!(ups.adds.iter().any(|pf| pf.name == "new_identity")); + } + + #[test] + fn test_add_field_duplicate() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let mut ups = UpdatePartitionSpec::new(&tx, true); + + let existing_field = ups + .name_to_field + .values() + .next() + .expect("No partition field in metadata") + .clone(); + + let source_name = tx + .table + .metadata() + .current_schema() + .field_by_id(existing_field.source_id) + .unwrap() + .name + .clone(); + + // Try to add the same partition field (same source and transform). + let res = ups.add_field( + source_name, + existing_field.transform.clone(), + Some(existing_field.name.clone()), + ); + + // Shouldn't allow duplicate + assert!(res.is_err()); + } + + #[test] + fn test_remove_field_success() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let mut ups = UpdatePartitionSpec::new(&tx, true); + + let field_name = ups.name_to_field.keys().next().unwrap().clone(); + let field_id = ups.name_to_field.get(&field_name).unwrap().field_id; + + // removing existing field should succeed + let res = ups.remove_field(&field_name); + assert!(res.is_ok()); + assert!(ups.deletes.contains(&field_id)); + } + + #[test] + fn test_remove_field_newly_added_failure() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let mut ups = UpdatePartitionSpec::new(&tx, true); + + let schema = tx.table.metadata().current_schema(); + let new_col = find_unused_column(schema, &ups).expect("No available column"); + let res = ups.add_field(new_col, Transform::Identity, Some("new_field".to_string())); + assert!(res.is_ok()); + + // Attempting to remove a field that was just added should fail. + let res_remove = ups.remove_field("new_field"); + assert!(res_remove.is_err()); + } + + #[test] + fn test_rename_field_success() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let mut ups = UpdatePartitionSpec::new(&tx, true); + + // Rename an existing partition field. + let field_name = ups.name_to_field.keys().next().unwrap().clone(); + let new_name = format!("renamed_{}", field_name); + let res = ups.rename_field(field_name.clone(), new_name.clone()); + + assert!(res.is_ok()); + assert_eq!(ups.renames.get(&field_name).unwrap(), &new_name); + } + + #[test] + fn test_rename_field_newly_added_failure() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let mut ups = UpdatePartitionSpec::new(&tx, true); + + let schema = tx.table.metadata().current_schema(); + let new_col = find_unused_column(schema, &ups).expect("No available column"); + let res = ups.add_field(new_col, Transform::Identity, Some("new_field".to_string())); + assert!(res.is_ok()); + + // Renaming a newly added field is not allowed. + let res_rename = ups.rename_field("new_field".to_string(), "renamed_new_field".to_string()); + assert!(res_rename.is_err()); + } } From bb7fcc753c3b7148ec5c8bdf1ee6441846d4de65 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Sat, 15 Feb 2025 23:49:38 -0500 Subject: [PATCH 2/2] clippy fix --- crates/iceberg/src/transaction.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index b790fa6952..427d19d2bc 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -1378,7 +1378,7 @@ mod tests { // Try to add the same partition field (same source and transform). let res = ups.add_field( source_name, - existing_field.transform.clone(), + existing_field.transform, Some(existing_field.name.clone()), );