From 1832184528fe772b0ddf29b59ef864ddb56dd5c6 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Wed, 13 Mar 2024 23:44:44 +0800 Subject: [PATCH 1/3] init TableMetadataBuilder --- crates/iceberg/src/catalog/mod.rs | 43 ++++++- crates/iceberg/src/spec/partition.rs | 19 +++ crates/iceberg/src/spec/table_metadata.rs | 135 +++++++++++++++++++++- 3 files changed, 189 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 708e6bf3c4..8291a6cf38 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -18,7 +18,8 @@ //! Catalog API for Apache Iceberg use crate::spec::{ - FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, UnboundPartitionSpec, + FormatVersion, Schema, Snapshot, SnapshotReference, SortOrder, TableMetadataBuilder, + UnboundPartitionSpec, }; use crate::table::Table; use crate::{Error, ErrorKind, Result}; @@ -427,14 +428,24 @@ pub enum TableUpdate { }, } +impl TableUpdate { + /// Applies the update to the table metadata builder. + pub fn apply(self, builder: TableMetadataBuilder) -> Result { + match self { + TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid), + _ => unimplemented!(), + } + } +} + #[cfg(test)] mod tests { use crate::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, - Transform, Type, UnboundPartitionField, UnboundPartitionSpec, + TableMetadataBuilder, Transform, Type, UnboundPartitionField, UnboundPartitionSpec, }; - use crate::{NamespaceIdent, TableIdent, TableRequirement, TableUpdate}; + use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate}; use serde::de::DeserializeOwned; use serde::Serialize; use std::collections::HashMap; @@ -1065,4 +1076,30 @@ mod tests { test_serde_json(json, update); } + + #[test] + fn test_table_update_apply() { + let table_creation = TableCreation::builder() + .location("s3://db/table".to_string()) + .name("table".to_string()) + .partition_spec(UnboundPartitionSpec::default()) + .properties(HashMap::new()) + .schema(Schema::builder().build().unwrap()) + .sort_order(SortOrder::default()) + .build(); + let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) + .unwrap() + .build() + .unwrap(); + let table_metadata_builder = TableMetadataBuilder::new(table_metadata); + + let uuid = uuid::Uuid::new_v4(); + let update = TableUpdate::AssignUuid { uuid }; + let updated_metadata = update + .apply(table_metadata_builder) + .unwrap() + .build() + .unwrap(); + assert_eq!(updated_metadata.uuid(), uuid); + } } diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 9388820a25..db30d8c7df 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -133,6 +133,25 @@ impl UnboundPartitionSpec { pub fn builder() -> UnboundPartitionSpecBuilder { UnboundPartitionSpecBuilder::default() } + + /// Create a [`PartitionSpec`] for a new table. So it don't need + /// to specify the partition id for each field. + pub fn create_new(self) -> PartitionSpec { + PartitionSpec { + spec_id: self.spec_id.unwrap_or(0), + fields: self + .fields + .into_iter() + .enumerate() + .map(|(field_id, f)| PartitionField { + source_id: f.source_id, + field_id: f.partition_id.unwrap_or(field_id as i32), + name: f.name, + transform: f.transform, + }) + .collect(), + } + } } #[cfg(test)] diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 0ce3e742b1..cf90c0e5f6 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -32,6 +32,8 @@ use super::{ use _serde::TableMetadataEnum; +use crate::error::Result; +use crate::{Error, ErrorKind, TableCreation}; use chrono::{DateTime, TimeZone, Utc}; static MAIN_BRANCH: &str = "main"; @@ -275,6 +277,78 @@ impl TableMetadata { } } +/// Manipulating table metadata. +pub struct TableMetadataBuilder(TableMetadata); + +impl TableMetadataBuilder { + /// Creates a new table metadata builder from the given table metadata. + pub fn new(origin: TableMetadata) -> Self { + Self(origin) + } + + /// Creates a new table metadata builder from the given table creation. + pub fn from_table_creation(table_creation: TableCreation) -> Result { + let TableCreation { + name: _, + location, + schema, + partition_spec, + sort_order, + properties, + } = table_creation; + + let table_metadata = TableMetadata { + format_version: FormatVersion::V2, + table_uuid: Uuid::new_v4(), + location: location.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't create table without location", + ) + })?, + last_sequence_number: 0, + last_updated_ms: 0, + last_column_id: 0, + schemas: HashMap::from([(0, Arc::new(schema))]), + current_schema_id: 0, + partition_specs: partition_spec + .map(|x| { + let partition_spec = PartitionSpecRef::new(x.create_new()); + HashMap::from([(partition_spec.spec_id, partition_spec)]) + }) + .unwrap_or_default(), + default_spec_id: 0, + last_partition_id: 0, + properties, + current_snapshot_id: None, + snapshots: Default::default(), + snapshot_log: vec![], + metadata_log: vec![], + sort_orders: sort_order + .map(|x| { + let sort_order = SortOrderRef::new(x); + HashMap::from([(sort_order.order_id, sort_order)]) + }) + .unwrap_or_default(), + default_sort_order_id: 0, + refs: Default::default(), + }; + + Ok(Self(table_metadata)) + } + + /// Changes uuid of table metadata. + pub fn assign_uuid(mut self, uuid: Uuid) -> Result { + self.0.table_uuid = uuid; + Ok(self) + } + + /// Returns the new table metadata after changes. + pub fn build(self) -> Result { + Ok(self.0) + } +} + pub(super) mod _serde { /// This is a helper module that defines types to help with serialization/deserialization. /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct @@ -838,13 +912,16 @@ mod tests { use pretty_assertions::assert_eq; - use crate::spec::{ - table_metadata::TableMetadata, NestedField, NullOrder, Operation, PartitionField, - PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, - SortDirection, SortField, SortOrder, Summary, Transform, Type, + use crate::{ + spec::{ + table_metadata::TableMetadata, NestedField, NullOrder, Operation, PartitionField, + PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, + SortDirection, SortField, SortOrder, Summary, Transform, Type, UnboundPartitionSpec, + }, + TableCreation, }; - use super::{FormatVersion, MetadataLog, SnapshotLog}; + use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder}; fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) { let desered_type: TableMetadata = serde_json::from_str(json).unwrap(); @@ -1569,4 +1646,52 @@ mod tests { table_meta_data.sort_orders.get(&default_sort_order_id) ) } + + #[test] + fn test_table_metadata_builder_from_table_creation() { + let table_creation = TableCreation::builder() + .location("s3://db/table".to_string()) + .name("table".to_string()) + .partition_spec(UnboundPartitionSpec::default()) + .properties(HashMap::new()) + .schema(Schema::builder().build().unwrap()) + .sort_order(SortOrder::default()) + .build(); + let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) + .unwrap() + .build() + .unwrap(); + assert_eq!(table_metadata.location, "s3://db/table"); + assert_eq!(table_metadata.schemas.len(), 1); + assert_eq!( + table_metadata + .schemas + .get(&0) + .unwrap() + .as_struct() + .fields() + .len(), + 0 + ); + assert_eq!(table_metadata.partition_specs.len(), 1); + assert_eq!( + table_metadata.partition_specs.get(&0).unwrap().fields.len(), + 0 + ); + assert_eq!(table_metadata.properties.len(), 0); + assert_eq!(table_metadata.sort_orders.len(), 1); + } + + #[test] + fn test_table_builder_from_table_metadata() { + let table_metadata = get_test_table_metadata("TableMetadataV2Valid.json"); + let table_metadata_builder = TableMetadataBuilder::new(table_metadata); + let uuid = Uuid::new_v4(); + let table_metadata = table_metadata_builder + .assign_uuid(uuid) + .unwrap() + .build() + .unwrap(); + assert_eq!(table_metadata.uuid(), uuid); + } } From bc3bca73f47707fd44ae37f5631b8f060b1a7361 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 14 Mar 2024 11:16:19 +0800 Subject: [PATCH 2/3] fix --- crates/iceberg/src/catalog/mod.rs | 2 - crates/iceberg/src/spec/table_metadata.rs | 48 +++++++++++------------ 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 8291a6cf38..ad7acf4a27 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -1082,10 +1082,8 @@ mod tests { let table_creation = TableCreation::builder() .location("s3://db/table".to_string()) .name("table".to_string()) - .partition_spec(UnboundPartitionSpec::default()) .properties(HashMap::new()) .schema(Schema::builder().build().unwrap()) - .sort_order(SortOrder::default()) .build(); let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) .unwrap() diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index cf90c0e5f6..9893e9eeae 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -297,6 +297,20 @@ impl TableMetadataBuilder { properties, } = table_creation; + if partition_spec.is_some() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Can't create table with partition spec now", + )); + } + + if sort_order.is_some() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Can't create table with sort order now", + )); + } + let table_metadata = TableMetadata { format_version: FormatVersion::V2, table_uuid: Uuid::new_v4(), @@ -307,29 +321,19 @@ impl TableMetadataBuilder { ) })?, last_sequence_number: 0, - last_updated_ms: 0, - last_column_id: 0, - schemas: HashMap::from([(0, Arc::new(schema))]), - current_schema_id: 0, - partition_specs: partition_spec - .map(|x| { - let partition_spec = PartitionSpecRef::new(x.create_new()); - HashMap::from([(partition_spec.spec_id, partition_spec)]) - }) - .unwrap_or_default(), + last_updated_ms: Utc::now().timestamp_millis(), + last_column_id: schema.highest_field_id(), + current_schema_id: schema.schema_id(), + schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]), + partition_specs: Default::default(), default_spec_id: 0, last_partition_id: 0, properties, current_snapshot_id: None, snapshots: Default::default(), snapshot_log: vec![], + sort_orders: Default::default(), metadata_log: vec![], - sort_orders: sort_order - .map(|x| { - let sort_order = SortOrderRef::new(x); - HashMap::from([(sort_order.order_id, sort_order)]) - }) - .unwrap_or_default(), default_sort_order_id: 0, refs: Default::default(), }; @@ -916,7 +920,7 @@ mod tests { spec::{ table_metadata::TableMetadata, NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, - SortDirection, SortField, SortOrder, Summary, Transform, Type, UnboundPartitionSpec, + SortDirection, SortField, SortOrder, Summary, Transform, Type, }, TableCreation, }; @@ -1652,10 +1656,8 @@ mod tests { let table_creation = TableCreation::builder() .location("s3://db/table".to_string()) .name("table".to_string()) - .partition_spec(UnboundPartitionSpec::default()) .properties(HashMap::new()) .schema(Schema::builder().build().unwrap()) - .sort_order(SortOrder::default()) .build(); let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) .unwrap() @@ -1673,13 +1675,9 @@ mod tests { .len(), 0 ); - assert_eq!(table_metadata.partition_specs.len(), 1); - assert_eq!( - table_metadata.partition_specs.get(&0).unwrap().fields.len(), - 0 - ); + assert_eq!(table_metadata.partition_specs.len(), 0); assert_eq!(table_metadata.properties.len(), 0); - assert_eq!(table_metadata.sort_orders.len(), 1); + assert_eq!(table_metadata.sort_orders.len(), 0); } #[test] From cae83cbd83e5743376f85789af3a47a00bf8647b Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 14 Mar 2024 11:34:10 +0800 Subject: [PATCH 3/3] remove unused function --- crates/iceberg/src/spec/partition.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index db30d8c7df..9388820a25 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -133,25 +133,6 @@ impl UnboundPartitionSpec { pub fn builder() -> UnboundPartitionSpecBuilder { UnboundPartitionSpecBuilder::default() } - - /// Create a [`PartitionSpec`] for a new table. So it don't need - /// to specify the partition id for each field. - pub fn create_new(self) -> PartitionSpec { - PartitionSpec { - spec_id: self.spec_id.unwrap_or(0), - fields: self - .fields - .into_iter() - .enumerate() - .map(|(field_id, f)| PartitionField { - source_id: f.source_id, - field_id: f.partition_id.unwrap_or(field_id as i32), - name: f.name, - transform: f.transform, - }) - .collect(), - } - } } #[cfg(test)]