From 2952adb24907e40b591566fd55e124f4395de514 Mon Sep 17 00:00:00 2001 From: Lionel Herbet Date: Wed, 20 Sep 2023 18:05:31 +0200 Subject: [PATCH 1/6] Add builder to TableMetadata --- crates/iceberg/src/spec/table_metadata.rs | 88 ++++++++++++++++++++++- 1 file changed, 87 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index f40b63e2ec..0303131fcf 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -25,6 +25,7 @@ use std::{collections::HashMap, sync::Arc}; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; +use std::time::Instant; use crate::{Error, ErrorKind}; @@ -41,19 +42,24 @@ static MAIN_BRANCH: &str = "main"; static DEFAULT_SPEC_ID: i32 = 0; static DEFAULT_SORT_ORDER_ID: i64 = 0; -#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone, Builder)] #[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")] +#[builder(setter(prefix = "with"))] /// Fields for the version 2 of the table metadata. pub struct TableMetadata { /// Integer Version for the format. + #[builder(default = "FormatVersion::V2")] format_version: FormatVersion, /// A UUID that identifies the table + #[builder(default)] table_uuid: Uuid, /// Location tables base location + #[builder(setter(into))] location: String, /// The tables highest sequence number last_sequence_number: i64, /// Timestamp in milliseconds from the unix epoch when the table was last updated. + #[builder(default = "Instant::now().elapsed().as_millis().try_into().unwrap()")] last_updated_ms: i64, /// An integer; the highest assigned column ID for the table. last_column_id: i32, @@ -70,14 +76,17 @@ pub struct TableMetadata { ///A string to string map of table properties. 
This is used to control settings that /// affect reading and writing and is not intended to be used for arbitrary metadata. /// For example, commit.retry.num-retries is used to control the number of commit retries. + #[builder(default)] properties: HashMap, /// long ID of the current table snapshot; must be the same as the current /// ID of the main branch in refs. + #[builder(setter(strip_option), default = "None")] current_snapshot_id: Option, ///A list of valid snapshots. Valid snapshots are snapshots for which all /// data files exist in the file system. A data file must not be deleted /// from the file system until the last snapshot in which it was listed is /// garbage collected. + #[builder(setter(strip_option), default = "None")] snapshots: Option>>, /// A list (optional) of timestamp and snapshot ID pairs that encodes changes /// to the current snapshot for the table. Each time the current-snapshot-id @@ -85,6 +94,7 @@ pub struct TableMetadata { /// and the new current-snapshot-id. When snapshots are expired from /// the list of valid snapshots, all entries before a snapshot that has /// expired should be removed. + #[builder(default)] snapshot_log: Vec, /// A list (optional) of timestamp and metadata file location pairs @@ -93,22 +103,31 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. + #[builder(default)] metadata_log: Vec, /// A list of sort orders, stored as full sort order objects. + #[builder(default)] sort_orders: HashMap, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. + #[builder(default)] default_sort_order_id: i64, ///A map of snapshot references. 
The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. + #[builder(default)] refs: HashMap, } impl TableMetadata { + /// Create partition spec builer + pub fn builder() -> TableMetadataBuilder { + TableMetadataBuilder::default() + } + /// Get current schema #[inline] pub fn current_schema(&self) -> Result, Error> { @@ -1367,4 +1386,71 @@ mod tests { "data did not match any variant of untagged enum TableMetadataEnum" ) } + + #[test] + fn test_metadata_builder() { + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "x", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new( + NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)) + .with_doc("comment"), + ), + Arc::new(NestedField::required( + 3, + "z", + Type::Primitive(PrimitiveType::Long), + )), + ]) + .build() + .unwrap(); + + let partition_spec = PartitionSpec::builder() + .with_spec_id(0) + .with_partition_field(PartitionField { + name: "x".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: 1000, + }) + .build() + .unwrap(); + + + let built_table_metadata = TableMetadata::builder() + .with_location("s3://bucket/test/location") + .with_last_sequence_number(0) + .with_last_column_id(3) + .with_schemas(HashMap::from_iter(vec![(0, Arc::new(schema))])) + .with_current_schema_id(0) + .with_partition_specs(HashMap::from_iter(vec![(0, partition_spec)])) + .with_default_spec_id(0) + .with_last_partition_id(0) + .with_refs(HashMap::from_iter(vec![( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )])) + .build().unwrap(); + + assert_eq!(built_table_metadata.format_version, FormatVersion::V2); 
+ assert_eq!(built_table_metadata.location, "s3://bucket/test/location".to_string()); + assert_eq!(built_table_metadata.last_column_id,3); + assert_eq!(built_table_metadata.current_schema_id,0); + assert_eq!(built_table_metadata.default_spec_id,0); + assert_eq!(built_table_metadata.last_partition_id,0); + assert_eq!(built_table_metadata.refs.get("main").unwrap().snapshot_id, -1); + assert_eq!(built_table_metadata.snapshots, None); + } } From 14ff235835cbe633490e45f5ca318ad3d11b46d4 Mon Sep 17 00:00:00 2001 From: Lionel Herbet Date: Fri, 29 Sep 2023 17:42:15 +0200 Subject: [PATCH 2/6] Add a validation function Update builder default values Cast Builder Error to Iceberg Error --- crates/iceberg/Cargo.toml | 2 +- crates/iceberg/src/spec/table_metadata.rs | 186 +++++++++++++++++++--- 2 files changed, 161 insertions(+), 27 deletions(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 195f1949c9..f56a2fc414 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -52,7 +52,7 @@ serde_derive = "^1.0" serde_json = "^1.0" serde_repr = "0.1.16" url = "2" -uuid = "1.4.1" +uuid = { version = "1.4.1", features = ["v4"] } [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 0303131fcf..8509545062 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -20,14 +20,14 @@ Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata). The main struct here is [TableMetadataV2] which defines the data for a table. 
*/ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH}; +use derive_builder::UninitializedFieldError; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; -use std::time::Instant; -use crate::{Error, ErrorKind}; +use crate::{Error, ErrorKind, TableCreation}; use super::{ partition::PartitionSpec, @@ -44,14 +44,14 @@ static DEFAULT_SORT_ORDER_ID: i64 = 0; #[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone, Builder)] #[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")] -#[builder(setter(prefix = "with"))] +#[builder(setter(prefix = "with"), build_fn(validate = "Self::validate", error = "Error"))] /// Fields for the version 2 of the table metadata. pub struct TableMetadata { /// Integer Version for the format. #[builder(default = "FormatVersion::V2")] format_version: FormatVersion, /// A UUID that identifies the table - #[builder(default)] + #[builder(default = "Uuid::new_v4()")] table_uuid: Uuid, /// Location tables base location #[builder(setter(into))] @@ -59,7 +59,7 @@ pub struct TableMetadata { /// The tables highest sequence number last_sequence_number: i64, /// Timestamp in milliseconds from the unix epoch when the table was last updated. - #[builder(default = "Instant::now().elapsed().as_millis().try_into().unwrap()")] + #[builder(default = "UNIX_EPOCH.elapsed().unwrap().as_millis().try_into().unwrap()")] last_updated_ms: i64, /// An integer; the highest assigned column ID for the table. last_column_id: i32, @@ -68,10 +68,13 @@ pub struct TableMetadata { /// ID of the table’s current schema. current_schema_id: i32, /// A list of partition specs, stored as full partition spec objects. + #[builder(default = "HashMap::from([(DEFAULT_SPEC_ID, PartitionSpec::builder().build().unwrap())])")] partition_specs: HashMap, /// ID of the “current” spec that writers should use by default. 
+ #[builder(default = "DEFAULT_SPEC_ID")] default_spec_id: i32, /// An integer; the highest assigned partition field ID across all partition specs for the table. + #[builder(default = "-1")] last_partition_id: i32, ///A string to string map of table properties. This is used to control settings that /// affect reading and writing and is not intended to be used for arbitrary metadata. @@ -112,7 +115,7 @@ pub struct TableMetadata { /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. - #[builder(default)] + #[builder(default = "DEFAULT_SORT_ORDER_ID")] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. @@ -122,6 +125,131 @@ pub struct TableMetadata { refs: HashMap, } +// We define a from implementation from builder Error to Iceberg Error +impl From for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new( + ErrorKind::DataInvalid, + ufe.to_string(), + ) + } +} + +impl TableMetadataBuilder { + /// Create setter with TableCreation + pub fn with_table_creation(&mut self, tc: TableCreation) { + self.with_location(tc.location) + .with_properties(tc.properties) + .with_default_sort_order_id(tc.sort_order.order_id) + .with_sort_orders(HashMap::from([(tc.sort_order.order_id, tc.sort_order)])) + .with_current_schema_id(tc.schema.schema_id()) + .with_last_column_id(tc.schema.highest_field_id()) + .with_schemas(HashMap::from([(tc.schema.schema_id(), Arc::new(tc.schema))])); + + if tc.partition_spec.is_some() { + let partition_spec = tc.partition_spec.unwrap(); + self.with_default_spec_id(partition_spec.spec_id) + .with_last_partition_id(partition_spec.fields.iter().map(|field| field.field_id).max().unwrap_or(-1)) + .with_partition_specs(HashMap::from([(partition_spec.spec_id, partition_spec)])); + } + } + + /// 
validate the content of the TableMetada Struct + fn validate(&self) -> Result<(),Error> { + // default_spec_id should match an entry inside the partition_specs HashMap if set + if self.partition_specs.is_some() { + if self.default_spec_id.is_some() { + // partition_specs map should contain an entry for the default_spec_id + let partition_spec = self.partition_specs.as_ref().unwrap().get(&self.default_spec_id.unwrap()); + if partition_spec.is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Default spec id {:?} is provided but there are no corresponding partition",self.default_spec_id.unwrap()), + )); + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Partitions are defined but there are no default partition spec id set"), + )); + } + } else { + if self.default_spec_id.is_some_and(|x| x != -1) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Default spec id {:?} is provided but there are no partition defined",self.default_spec_id.unwrap()), + )); + } + } + + // current_schema_id should match an entry inside schemas HashMap + if self.schemas.is_some() && self.current_schema_id.is_some() { + let current_schema = self.schemas.as_ref().unwrap().get(&self.current_schema_id.unwrap()); + if current_schema.is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Current schema id {:?} is provided but there are no corresponding schemas",self.current_schema_id.unwrap()), + )); + } + // As current_schema_id and schemas are mandatory builder itself will throw an error if not set. 
+ } + + // default_sort_order_id should match and entry inside sort_orders HashMap if set + if self.sort_orders.is_some() { + if self.default_sort_order_id.is_some() { + let default_sort_order = self.sort_orders.as_ref().unwrap().get(&self.default_sort_order_id.unwrap()); + if default_sort_order.is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Default sort order id {:?} is provided but there are no corresponding sort order",self.default_sort_order_id.unwrap()), + )); + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Sort order is defined but there are no default sort order id set"), + )); + } + } else { + if self.default_sort_order_id.is_some() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Default sort order id {:?} is provided but there are no sort order defined",self.default_sort_order_id.unwrap()), + )); + } + // sort_orders and default_sort_order_id are not set, so we have default value. + } + + // current_snapshot_id should match and entry inside current_snapshot HashMap if set + if self.snapshots.as_ref().is_some_and(|x| x.is_some()) { + if self.current_snapshot_id.is_some_and(|x| x.is_some()) { + let inner_snapshots = self.snapshots.as_ref().unwrap(); + let default_snapshot = inner_snapshots.as_ref().unwrap().get(&self.current_snapshot_id.unwrap().unwrap()); + if default_snapshot.is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Current snapshot id {} is provided but there are no corresponding snapshot",self.current_snapshot_id.unwrap().unwrap()), + )); + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("snapshot is defined but there are no default current snapshot id set"), + )); + } + } else { + if self.current_snapshot_id.is_some_and(|x| x.is_some()) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Current snapshot id {} is provided but there are no snapshot set",self.current_snapshot_id.unwrap().unwrap()), + )); + } + // 
current snapshot id and snapshots are not set, so we have default value. + } + Ok(()) + } +} + impl TableMetadata { /// Create partition spec builer pub fn builder() -> TableMetadataBuilder { @@ -411,7 +539,7 @@ pub(super) mod _serde { snapshot_log: value.snapshot_log.unwrap_or_default(), metadata_log: value.metadata_log.unwrap_or_default(), sort_orders: HashMap::from_iter( - value.sort_orders.into_iter().map(|x| (x.order_id, x)), + value.sort_orders.into_iter().map(|x: SortOrder| (x.order_id, x)), ), default_sort_order_id: value.default_sort_order_id, refs: value.refs.unwrap_or_else(|| { @@ -1421,7 +1549,6 @@ mod tests { .build() .unwrap(); - let built_table_metadata = TableMetadata::builder() .with_location("s3://bucket/test/location") .with_last_sequence_number(0) @@ -1432,25 +1559,32 @@ mod tests { .with_default_spec_id(0) .with_last_partition_id(0) .with_refs(HashMap::from_iter(vec![( - "main".to_string(), - SnapshotReference { - snapshot_id: -1, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, - }, - )])) - .build().unwrap(); + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )])) + .build() + .unwrap(); assert_eq!(built_table_metadata.format_version, FormatVersion::V2); - assert_eq!(built_table_metadata.location, "s3://bucket/test/location".to_string()); - assert_eq!(built_table_metadata.last_column_id,3); - assert_eq!(built_table_metadata.current_schema_id,0); - assert_eq!(built_table_metadata.default_spec_id,0); - assert_eq!(built_table_metadata.last_partition_id,0); - assert_eq!(built_table_metadata.refs.get("main").unwrap().snapshot_id, -1); + assert_eq!( + built_table_metadata.location, + "s3://bucket/test/location".to_string() + ); + assert_eq!(built_table_metadata.last_column_id, 3); + 
assert_eq!(built_table_metadata.current_schema_id, 0); + assert_eq!(built_table_metadata.default_spec_id, 0); + assert_eq!(built_table_metadata.last_partition_id, 0); + assert_eq!( + built_table_metadata.refs.get("main").unwrap().snapshot_id, + -1 + ); assert_eq!(built_table_metadata.snapshots, None); } } From c6cca841bc28263356decfa7257dc4d283fe7246 Mon Sep 17 00:00:00 2001 From: Lionel Herbet Date: Fri, 29 Sep 2023 17:47:22 +0200 Subject: [PATCH 3/6] clippy fix --- crates/iceberg/src/spec/table_metadata.rs | 123 ++++++++++++++-------- 1 file changed, 79 insertions(+), 44 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 8509545062..66f425e538 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -44,7 +44,10 @@ static DEFAULT_SORT_ORDER_ID: i64 = 0; #[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone, Builder)] #[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")] -#[builder(setter(prefix = "with"), build_fn(validate = "Self::validate", error = "Error"))] +#[builder( + setter(prefix = "with"), + build_fn(validate = "Self::validate", error = "Error") +)] /// Fields for the version 2 of the table metadata. pub struct TableMetadata { /// Integer Version for the format. @@ -68,7 +71,9 @@ pub struct TableMetadata { /// ID of the table’s current schema. current_schema_id: i32, /// A list of partition specs, stored as full partition spec objects. - #[builder(default = "HashMap::from([(DEFAULT_SPEC_ID, PartitionSpec::builder().build().unwrap())])")] + #[builder( + default = "HashMap::from([(DEFAULT_SPEC_ID, PartitionSpec::builder().build().unwrap())])" + )] partition_specs: HashMap, /// ID of the “current” spec that writers should use by default. 
#[builder(default = "DEFAULT_SPEC_ID")] @@ -128,39 +133,50 @@ pub struct TableMetadata { // We define a from implementation from builder Error to Iceberg Error impl From for Error { fn from(ufe: UninitializedFieldError) -> Error { - Error::new( - ErrorKind::DataInvalid, - ufe.to_string(), - ) + Error::new(ErrorKind::DataInvalid, ufe.to_string()) } } impl TableMetadataBuilder { /// Create setter with TableCreation - pub fn with_table_creation(&mut self, tc: TableCreation) { + pub fn with_table_creation(&mut self, tc: TableCreation) { self.with_location(tc.location) .with_properties(tc.properties) .with_default_sort_order_id(tc.sort_order.order_id) .with_sort_orders(HashMap::from([(tc.sort_order.order_id, tc.sort_order)])) .with_current_schema_id(tc.schema.schema_id()) .with_last_column_id(tc.schema.highest_field_id()) - .with_schemas(HashMap::from([(tc.schema.schema_id(), Arc::new(tc.schema))])); - + .with_schemas(HashMap::from([( + tc.schema.schema_id(), + Arc::new(tc.schema), + )])); + if tc.partition_spec.is_some() { let partition_spec = tc.partition_spec.unwrap(); self.with_default_spec_id(partition_spec.spec_id) - .with_last_partition_id(partition_spec.fields.iter().map(|field| field.field_id).max().unwrap_or(-1)) + .with_last_partition_id( + partition_spec + .fields + .iter() + .map(|field| field.field_id) + .max() + .unwrap_or(-1), + ) .with_partition_specs(HashMap::from([(partition_spec.spec_id, partition_spec)])); } } /// validate the content of the TableMetada Struct - fn validate(&self) -> Result<(),Error> { + fn validate(&self) -> Result<(), Error> { // default_spec_id should match an entry inside the partition_specs HashMap if set if self.partition_specs.is_some() { if self.default_spec_id.is_some() { // partition_specs map should contain an entry for the default_spec_id - let partition_spec = self.partition_specs.as_ref().unwrap().get(&self.default_spec_id.unwrap()); + let partition_spec = self + .partition_specs + .as_ref() + .unwrap() + 
.get(&self.default_spec_id.unwrap()); if partition_spec.is_none() { return Err(Error::new( ErrorKind::DataInvalid, @@ -170,34 +186,46 @@ impl TableMetadataBuilder { } else { return Err(Error::new( ErrorKind::DataInvalid, - format!("Partitions are defined but there are no default partition spec id set"), - )); - } - } else { - if self.default_spec_id.is_some_and(|x| x != -1) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Default spec id {:?} is provided but there are no partition defined",self.default_spec_id.unwrap()), + "Partitions are defined but there are no default partition spec id set", )); } + } else if self.default_spec_id.is_some_and(|x| x != -1) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Default spec id {:?} is provided but there are no partition defined", + self.default_spec_id.unwrap() + ), + )); } // current_schema_id should match an entry inside schemas HashMap if self.schemas.is_some() && self.current_schema_id.is_some() { - let current_schema = self.schemas.as_ref().unwrap().get(&self.current_schema_id.unwrap()); + let current_schema = self + .schemas + .as_ref() + .unwrap() + .get(&self.current_schema_id.unwrap()); if current_schema.is_none() { return Err(Error::new( ErrorKind::DataInvalid, - format!("Current schema id {:?} is provided but there are no corresponding schemas",self.current_schema_id.unwrap()), + format!( + "Current schema id {:?} is provided but there are no corresponding schemas", + self.current_schema_id.unwrap() + ), )); } - // As current_schema_id and schemas are mandatory builder itself will throw an error if not set. + // As current_schema_id and schemas are mandatory builder itself will throw an error if not set. 
} // default_sort_order_id should match and entry inside sort_orders HashMap if set if self.sort_orders.is_some() { if self.default_sort_order_id.is_some() { - let default_sort_order = self.sort_orders.as_ref().unwrap().get(&self.default_sort_order_id.unwrap()); + let default_sort_order = self + .sort_orders + .as_ref() + .unwrap() + .get(&self.default_sort_order_id.unwrap()); if default_sort_order.is_none() { return Err(Error::new( ErrorKind::DataInvalid, @@ -207,24 +235,27 @@ impl TableMetadataBuilder { } else { return Err(Error::new( ErrorKind::DataInvalid, - format!("Sort order is defined but there are no default sort order id set"), - )); - } - } else { - if self.default_sort_order_id.is_some() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Default sort order id {:?} is provided but there are no sort order defined",self.default_sort_order_id.unwrap()), + "Sort order is defined but there are no default sort order id set", )); } - // sort_orders and default_sort_order_id are not set, so we have default value. 
+ } else if self.default_sort_order_id.is_some() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Default sort order id {:?} is provided but there are no sort order defined", + self.default_sort_order_id.unwrap() + ), + )); } // current_snapshot_id should match and entry inside current_snapshot HashMap if set if self.snapshots.as_ref().is_some_and(|x| x.is_some()) { if self.current_snapshot_id.is_some_and(|x| x.is_some()) { let inner_snapshots = self.snapshots.as_ref().unwrap(); - let default_snapshot = inner_snapshots.as_ref().unwrap().get(&self.current_snapshot_id.unwrap().unwrap()); + let default_snapshot = inner_snapshots + .as_ref() + .unwrap() + .get(&self.current_snapshot_id.unwrap().unwrap()); if default_snapshot.is_none() { return Err(Error::new( ErrorKind::DataInvalid, @@ -234,18 +265,19 @@ impl TableMetadataBuilder { } else { return Err(Error::new( ErrorKind::DataInvalid, - format!("snapshot is defined but there are no default current snapshot id set"), + "snapshot is defined but there are no default current snapshot id set", )); } - } else { - if self.current_snapshot_id.is_some_and(|x| x.is_some()) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Current snapshot id {} is provided but there are no snapshot set",self.current_snapshot_id.unwrap().unwrap()), - )); - } - // current snapshot id and snapshots are not set, so we have default value. 
+ } else if self.current_snapshot_id.is_some_and(|x| x.is_some()) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Current snapshot id {} is provided but there are no snapshot set", + self.current_snapshot_id.unwrap().unwrap() + ), + )); } + + Ok(()) } } @@ -539,7 +571,10 @@ pub(super) mod _serde { snapshot_log: value.snapshot_log.unwrap_or_default(), metadata_log: value.metadata_log.unwrap_or_default(), sort_orders: HashMap::from_iter( - value.sort_orders.into_iter().map(|x| (x.order_id, x)), + value + .sort_orders + .into_iter() + .map(|x: SortOrder| (x.order_id, x)), ), default_sort_order_id: value.default_sort_order_id, refs: value.refs.unwrap_or_else(|| { From 6f78bd389cd8f10235416699b5f439252fa820ca Mon Sep 17 00:00:00 2001 From: Lionel Herbet Date: Thu, 5 Oct 2023 17:55:14 +0200 Subject: [PATCH 4/6] Add single element for Map Strengthen builder validation --- crates/iceberg/src/spec/table_metadata.rs | 571 ++++++++++++---------- 1 file changed, 312 insertions(+), 259 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 66f425e538..dd137c808c 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -60,6 +60,7 @@ pub struct TableMetadata { #[builder(setter(into))] location: String, /// The tables highest sequence number + #[builder(default)] last_sequence_number: i64, /// Timestamp in milliseconds from the unix epoch when the table was last updated.
#[builder(default = "UNIX_EPOCH.elapsed().unwrap().as_millis().try_into().unwrap()")] @@ -138,147 +139,255 @@ impl From for Error { } impl TableMetadataBuilder { - /// Create setter with TableCreation - pub fn with_table_creation(&mut self, tc: TableCreation) { + /// Initialize a TableMetadata with a TableCreation struct + /// the Schema, sortOrder and PartitionSpec will be set as current + pub fn with_table_creation(&mut self, tc: TableCreation) -> &mut Self { self.with_location(tc.location) .with_properties(tc.properties) - .with_default_sort_order_id(tc.sort_order.order_id) - .with_sort_orders(HashMap::from([(tc.sort_order.order_id, tc.sort_order)])) - .with_current_schema_id(tc.schema.schema_id()) - .with_last_column_id(tc.schema.highest_field_id()) - .with_schemas(HashMap::from([( - tc.schema.schema_id(), - Arc::new(tc.schema), - )])); + .with_sort_order(tc.sort_order, true) + .with_schema(tc.schema, true); - if tc.partition_spec.is_some() { - let partition_spec = tc.partition_spec.unwrap(); - self.with_default_spec_id(partition_spec.spec_id) - .with_last_partition_id( - partition_spec - .fields - .iter() - .map(|field| field.field_id) - .max() - .unwrap_or(-1), - ) - .with_partition_specs(HashMap::from([(partition_spec.spec_id, partition_spec)])); + if let Some(partition_spec) = tc.partition_spec { + self.with_partition_spec(partition_spec, true); } + self } - /// validate the content of the TableMetada Struct - fn validate(&self) -> Result<(), Error> { - // default_spec_id should match an entry inside the partition_specs HashMap if set - if self.partition_specs.is_some() { - if self.default_spec_id.is_some() { + /// Add a schema to the TableMetadata + /// schema : Schema to be added or replaced + /// current : True if the schema is the current one + pub fn with_schema(&mut self, schema: Schema, current: bool) -> &mut Self { + if current { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + } + if 
let Some(map) = self.schemas.as_mut() { + map.insert(schema.schema_id(), Arc::new(schema)); + } else { + self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); + }; + self + } + + /// Add a partition_spec to the TableMetadata and update the last_partition_id accordingly + /// partition_spec : PartitionSpec to be added or replaced + /// default: True if this PartitionSpec is the default one + pub fn with_partition_spec( + &mut self, + partition_spec: PartitionSpec, + default: bool, + ) -> &mut Self { + if default { + self.default_spec_id = Some(partition_spec.spec_id); + } + let max_id = partition_spec + .fields + .iter() + .map(|field| field.field_id) + .max(); + if max_id > self.last_partition_id { + self.last_partition_id = max_id; + } + if let Some(map) = self.partition_specs.as_mut() { + map.insert(partition_spec.spec_id, partition_spec); + } else { + self.partition_specs = Some(HashMap::from([(partition_spec.spec_id, partition_spec)])); + }; + self + } + + /// Add a snapshot to the TableMetadata and update last_sequence_number + /// snapshot : Snapshot to be added or replaced + /// current : True if the snapshot is the current one + pub fn with_snapshot(&mut self, snapshot: Snapshot, current: bool) -> &mut Self { + if current { + self.current_snapshot_id = Some(Some(snapshot.snapshot_id())) + } + if Some(snapshot.sequence_number()) > self.last_sequence_number { + self.last_sequence_number = Some(snapshot.sequence_number()) + } + if let Some(Some(map)) = self.snapshots.as_mut() { + map.insert(snapshot.snapshot_id(), Arc::new(snapshot)); + } else { + self.snapshots = Some(Some(HashMap::from([( + snapshot.snapshot_id(), + Arc::new(snapshot), + )]))); + }; + + self + } + + /// Add a sort_order to the TableMetadata + /// sort_order : SortOrder to be added or replaced + /// default: True if this SortOrder is the default one + pub fn with_sort_order(&mut self, sort_order: SortOrder, default: bool) -> &mut Self { + if default { +
self.default_sort_order_id = Some(sort_order.order_id) + } + if let Some(map) = self.sort_orders.as_mut() { + map.insert(sort_order.order_id, sort_order); + } else { + self.sort_orders = Some(HashMap::from([(sort_order.order_id, sort_order)])); + }; + self + } + + /// Add a snapshot_reference to the TableMetadata + /// key_ref : reference id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + pub fn with_ref(&mut self, key_ref: String, snapshot_ref: SnapshotReference) -> &mut Self { + if let Some(map) = self.refs.as_mut() { + map.insert(key_ref, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(key_ref, snapshot_ref)])); + }; + self + } + + /// Check if the default key exists in the map. + /// Verify incoherent behavior and throw Error if : + /// - the map is not defined but the key exists + /// - the default key exists but there is no map + /// except if key is a default value + /// Params : + /// - key : the key to look for in the map + /// - map : the map to scan + /// - default : default value for the key if any + /// - field : map name for throwing a better error + fn check_id_in_map( + key: Option, + map: &Option>, + default: Option, + field: &str, + ) -> Result<(), Error> { + if map.is_some() { + if let Some(k) = key { // partition_specs map should contain an entry for the default_spec_id - let partition_spec = self - .partition_specs - .as_ref() - .unwrap() - .get(&self.default_spec_id.unwrap()); - if partition_spec.is_none() { - return Err(Error::new( + let entry = map.as_ref().unwrap().get(&k); + if entry.is_none() { + Err(Error::new( ErrorKind::DataInvalid, - format!("Default spec id {:?} is provided but there are no corresponding partition",self.default_spec_id.unwrap()), - )); + format!( + "Default id {} is provided but there are no corresponding entry in {}", + k, field + ), + )) + } else { + Ok(()) } } else { - return Err(Error::new( + Err(Error::new( ErrorKind::DataInvalid, - "Partitions are defined but there are no 
default partition spec id set", - )); + format!( + "{} are defined but there are no default partition spec id set", + field + ), + )) } - } else if self.default_spec_id.is_some_and(|x| x != -1) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Default spec id {:?} is provided but there are no partition defined", - self.default_spec_id.unwrap() - ), - )); - } - - // current_schema_id should match an entry inside schemas HashMap - if self.schemas.is_some() && self.current_schema_id.is_some() { - let current_schema = self - .schemas - .as_ref() - .unwrap() - .get(&self.current_schema_id.unwrap()); - if current_schema.is_none() { - return Err(Error::new( + } else if let Some(k) = key { + if default.is_some_and(|def| k == def) { + Ok(()) + } else { + Err(Error::new( ErrorKind::DataInvalid, format!( - "Current schema id {:?} is provided but there are no corresponding schemas", - self.current_schema_id.unwrap() + "Default spec id {} is provided but there are no {} defined", + k, field ), - )); + )) } - // As current_schema_id and schemas are mandatory builder itself will throw an error if not set. 
+ } else { + Ok(()) } + } - // default_sort_order_id should match and entry inside sort_orders HashMap if set - if self.sort_orders.is_some() { - if self.default_sort_order_id.is_some() { - let default_sort_order = self - .sort_orders - .as_ref() - .unwrap() - .get(&self.default_sort_order_id.unwrap()); - if default_sort_order.is_none() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Default sort order id {:?} is provided but there are no corresponding sort order",self.default_sort_order_id.unwrap()), - )); - } - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - "Sort order is defined but there are no default sort order id set", - )); - } - } else if self.default_sort_order_id.is_some() { - return Err(Error::new( + /// Check if last_column_id is coherent with the default schema fields ids + fn check_schema_last_column_id( + schemas: &Option>>, + schema_id: Option, + last_column_id: Option, + ) -> Result<(), Error> { + let expected_id = schemas + .as_ref() + .unwrap() + .get(&schema_id.unwrap()) + .unwrap() + .highest_field_id(); + + if expected_id == last_column_id.unwrap() { + Ok(()) + } else { + Err(Error::new( ErrorKind::DataInvalid, - format!( - "Default sort order id {:?} is provided but there are no sort order defined", - self.default_sort_order_id.unwrap() - ), - )); + "last_column_id and default schema highest field id does not match", + )) } + } - // current_snapshot_id should match and entry inside current_snapshot HashMap if set - if self.snapshots.as_ref().is_some_and(|x| x.is_some()) { - if self.current_snapshot_id.is_some_and(|x| x.is_some()) { - let inner_snapshots = self.snapshots.as_ref().unwrap(); - let default_snapshot = inner_snapshots - .as_ref() - .unwrap() - .get(&self.current_snapshot_id.unwrap().unwrap()); - if default_snapshot.is_none() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Current snapshot id {} is provided but there are no corresponding 
snapshot",self.current_snapshot_id.unwrap().unwrap()), - )); - } - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - "snapshot is defined but there are no default current snapshot id set", - )); - } - } else if self.current_snapshot_id.is_some_and(|x| x.is_some()) { - return Err(Error::new( + /// Check if last_partition_id is coherent with the all partition spec field ids + fn check_last_partition_id( + partition_specs: &Option>, + last_partition_id: Option, + ) -> Result<(), Error> { + let expected_id = partition_specs + .as_ref() + .map(|specs| { + specs + .values() + .map(|spec: &PartitionSpec| { + spec.fields.iter().map(|field| field.field_id).max() + }) + .max() + }) + .unwrap_or(None) + .unwrap_or(None); + if expected_id == last_partition_id { + Ok(()) + } else { + Err(Error::new( ErrorKind::DataInvalid, - format!( - "Current snapshot id {} is provided but there are no snapshot set", - self.current_snapshot_id.unwrap().unwrap() - ), - )); + "last_partittion_id and default partition highest field id does not match", + )) } + } - Ok(()) + /// validate the content of the TableMetada Struct + fn validate(&self) -> Result<(), Error> { + // check default key and maps are coherents + Self::check_id_in_map( + self.default_spec_id, + &self.partition_specs, + None, + "partitions", + ) + .and(Self::check_id_in_map( + self.current_schema_id, + &self.schemas, + None, + "schemas", + )) + .and(Self::check_id_in_map( + self.default_sort_order_id, + &self.sort_orders, + None, + "sort_order", + )) + .and(Self::check_id_in_map( + self.current_snapshot_id.unwrap_or(None), + self.snapshots.as_ref().unwrap_or(&None), + Some(-1), + "snapshots", + )) + .and(Self::check_schema_last_column_id( + &self.schemas, + self.current_schema_id, + self.last_column_id, + )) + .and(Self::check_last_partition_id( + &self.partition_specs, + self.last_partition_id, + )) } } @@ -859,10 +968,13 @@ mod tests { use pretty_assertions::assert_eq; - use crate::spec::{ - 
table_metadata::TableMetadata, ManifestList, NestedField, NullOrder, Operation, - PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, - SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type, + use crate::{ + spec::{ + table_metadata::TableMetadata, ManifestList, NestedField, NullOrder, Operation, + PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, + SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type, + }, + TableCreation, }; use super::{FormatVersion, MetadataLog, SnapshotLog}; @@ -1177,48 +1289,10 @@ mod tests { let metadata = fs::read_to_string("testdata/table_metadata/TableMetadataV2Valid.json").unwrap(); - let schema1 = Schema::builder() - .with_schema_id(0) - .with_fields(vec![Arc::new(NestedField::required( - 1, - "x", - Type::Primitive(PrimitiveType::Long), - ))]) - .build() - .unwrap(); - - let schema2 = Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - Arc::new(NestedField::required( - 1, - "x", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new( - NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)) - .with_doc("comment"), - ), - Arc::new(NestedField::required( - 3, - "z", - Type::Primitive(PrimitiveType::Long), - )), - ]) - .with_identifier_field_ids(vec![1, 2]) - .build() - .unwrap(); + let schema1 = generate_schema(0, 1, None); + let schema2 = generate_schema(1, 3, Some(vec![1, 2])); - let partition_spec = PartitionSpec::builder() - .with_spec_id(0) - .with_partition_field(PartitionField { - name: "x".to_string(), - transform: Transform::Identity, - source_id: 1, - field_id: 1000, - }) - .build() - .unwrap(); + let partition_spec = generate_partition_spec(0, 1); let sort_order = SortOrder::builder() .with_order_id(3) @@ -1319,37 +1393,9 @@ mod tests { let metadata = fs::read_to_string("testdata/table_metadata/TableMetadataV2ValidMinimal.json").unwrap(); - let schema = Schema::builder() - 
.with_schema_id(0) - .with_fields(vec![ - Arc::new(NestedField::required( - 1, - "x", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new( - NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)) - .with_doc("comment"), - ), - Arc::new(NestedField::required( - 3, - "z", - Type::Primitive(PrimitiveType::Long), - )), - ]) - .build() - .unwrap(); + let schema = generate_schema(0, 3, None); - let partition_spec = PartitionSpec::builder() - .with_spec_id(0) - .with_partition_field(PartitionField { - name: "x".to_string(), - transform: Transform::Identity, - source_id: 1, - field_id: 1000, - }) - .build() - .unwrap(); + let partition_spec = generate_partition_spec(0, 1); let sort_order = SortOrder::builder() .with_order_id(3) @@ -1398,37 +1444,8 @@ mod tests { let metadata = fs::read_to_string("testdata/table_metadata/TableMetadataV1Valid.json").unwrap(); - let schema = Schema::builder() - .with_schema_id(0) - .with_fields(vec![ - Arc::new(NestedField::required( - 1, - "x", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new( - NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)) - .with_doc("comment"), - ), - Arc::new(NestedField::required( - 3, - "z", - Type::Primitive(PrimitiveType::Long), - )), - ]) - .build() - .unwrap(); - - let partition_spec = PartitionSpec::builder() - .with_spec_id(0) - .with_partition_field(PartitionField { - name: "x".to_string(), - transform: Transform::Identity, - source_id: 1, - field_id: 1000, - }) - .build() - .unwrap(); + let schema = generate_schema(0, 3, None); + let partition_spec = generate_partition_spec(0, 1); let expected = TableMetadata { format_version: FormatVersion::V1, @@ -1550,49 +1567,71 @@ mod tests { ) } - #[test] - fn test_metadata_builder() { - let schema = Schema::builder() - .with_schema_id(0) - .with_fields(vec![ - Arc::new(NestedField::required( - 1, - "x", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new( - NestedField::required(2, "y", 
Type::Primitive(PrimitiveType::Long)) - .with_doc("comment"), - ), - Arc::new(NestedField::required( - 3, - "z", - Type::Primitive(PrimitiveType::Long), - )), - ]) + fn generate_schema(id: i32, length: usize, identifier_fields_id: Option>) -> Schema { + let mut test_data: Vec<(i32, &str, PrimitiveType, Option<&str>)> = vec![ + (1, "x", PrimitiveType::Long, None), + (2, "y", PrimitiveType::Long, Some("comment")), + (3, "z", PrimitiveType::Long, None), + ]; + test_data.truncate(length); + let data: Vec> = test_data + .iter() + .map(|x| { + ( + NestedField::required(x.0, x.1, Type::Primitive(x.2.clone())), + x.3, + ) + }) + .map(|x| { + if x.1.is_some() { + x.0.with_doc(x.1.unwrap()) + } else { + x.0 + } + }) + .map(|x| Arc::new(x)) + .collect(); + Schema::builder() + .with_schema_id(id) + .with_fields(data) + .with_identifier_field_ids(identifier_fields_id.unwrap_or_default()) .build() - .unwrap(); + .unwrap() + } - let partition_spec = PartitionSpec::builder() - .with_spec_id(0) - .with_partition_field(PartitionField { - name: "x".to_string(), - transform: Transform::Identity, - source_id: 1, - field_id: 1000, + fn generate_partition_spec(id: i32, length: usize) -> PartitionSpec { + let mut test_data = vec![ + ("x", Transform::Identity, 1, 1000), + ("y", Transform::Identity, 2, 1001), + ("z", Transform::Identity, 3, 1002), + ]; + test_data.truncate(length); + let data: Vec = test_data + .iter() + .map(|x| PartitionField { + name: x.0.to_string(), + transform: x.1, + source_id: x.2, + field_id: x.3, }) + .collect(); + PartitionSpec::builder() + .with_spec_id(id) + .with_fields(data) .build() - .unwrap(); + .unwrap() + } + + #[test] + fn test_metadata_builder() { + let schema = generate_schema(0, 3, None); + let partition_spec = generate_partition_spec(0, 1); let built_table_metadata = TableMetadata::builder() .with_location("s3://bucket/test/location") .with_last_sequence_number(0) - .with_last_column_id(3) - .with_schemas(HashMap::from_iter(vec![(0, 
Arc::new(schema))])) - .with_current_schema_id(0) - .with_partition_specs(HashMap::from_iter(vec![(0, partition_spec)])) - .with_default_spec_id(0) - .with_last_partition_id(0) + .with_schema(schema, true) + .with_partition_spec(partition_spec, true) .with_refs(HashMap::from_iter(vec![( "main".to_string(), SnapshotReference { @@ -1615,11 +1654,25 @@ mod tests { assert_eq!(built_table_metadata.last_column_id, 3); assert_eq!(built_table_metadata.current_schema_id, 0); assert_eq!(built_table_metadata.default_spec_id, 0); - assert_eq!(built_table_metadata.last_partition_id, 0); + assert_eq!(built_table_metadata.last_partition_id, 1000); assert_eq!( built_table_metadata.refs.get("main").unwrap().snapshot_id, -1 ); assert_eq!(built_table_metadata.snapshots, None); + + let table_creation = TableCreation { + name: "test".to_string(), + location: "s3://bucket/test/location".to_string(), + schema: generate_schema(0, 3, None), + partition_spec: Some(generate_partition_spec(0, 1)), + sort_order: SortOrder::builder().with_order_id(0).build().unwrap(), + properties: HashMap::new(), + }; + let built_table_metadata = TableMetadata::builder() + .with_table_creation(table_creation) + .build(); + + assert!(built_table_metadata.is_ok()) } } From 850e17dbd0755a7b6c7b10856ef6810d9d51e21c Mon Sep 17 00:00:00 2001 From: Lionel Herbet Date: Wed, 11 Oct 2023 18:28:27 +0200 Subject: [PATCH 5/6] add only needed method on builder do not allow generation by builder for other fields except few --- crates/iceberg/src/spec/table_metadata.rs | 333 ++++++++++------------ 1 file changed, 156 insertions(+), 177 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index dd137c808c..d86bebc658 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -60,27 +60,31 @@ pub struct TableMetadata { #[builder(setter(into))] location: String, /// The tables highest sequence number - #[builder(default)] + 
#[builder(default, setter(custom))] last_sequence_number: i64, /// Timestamp in milliseconds from the unix epoch when the table was last updated. - #[builder(default = "UNIX_EPOCH.elapsed().unwrap().as_millis().try_into().unwrap()")] + #[builder(default = "Self::current_time_ms()", setter(custom))] last_updated_ms: i64, /// An integer; the highest assigned column ID for the table. + #[builder(setter(custom))] last_column_id: i32, /// A list of schemas, stored as objects with schema-id. + #[builder(setter(custom))] schemas: HashMap>, /// ID of the table’s current schema. + #[builder(setter(custom))] current_schema_id: i32, /// A list of partition specs, stored as full partition spec objects. #[builder( - default = "HashMap::from([(DEFAULT_SPEC_ID, PartitionSpec::builder().build().unwrap())])" + default = "HashMap::from([(DEFAULT_SPEC_ID, PartitionSpec::builder().build().unwrap())])", + setter(custom) )] partition_specs: HashMap, /// ID of the “current” spec that writers should use by default. - #[builder(default = "DEFAULT_SPEC_ID")] + #[builder(default = "DEFAULT_SPEC_ID", setter(custom))] default_spec_id: i32, /// An integer; the highest assigned partition field ID across all partition specs for the table. - #[builder(default = "-1")] + #[builder(default = "-1", setter(custom))] last_partition_id: i32, ///A string to string map of table properties. This is used to control settings that /// affect reading and writing and is not intended to be used for arbitrary metadata. @@ -89,13 +93,13 @@ pub struct TableMetadata { properties: HashMap, /// long ID of the current table snapshot; must be the same as the current /// ID of the main branch in refs. - #[builder(setter(strip_option), default = "None")] + #[builder(default = "None", setter(custom))] current_snapshot_id: Option, ///A list of valid snapshots. Valid snapshots are snapshots for which all /// data files exist in the file system. 
A data file must not be deleted /// from the file system until the last snapshot in which it was listed is /// garbage collected. - #[builder(setter(strip_option), default = "None")] + #[builder(default = "None", setter(custom))] snapshots: Option>>, /// A list (optional) of timestamp and snapshot ID pairs that encodes changes /// to the current snapshot for the table. Each time the current-snapshot-id @@ -103,7 +107,7 @@ pub struct TableMetadata { /// and the new current-snapshot-id. When snapshots are expired from /// the list of valid snapshots, all entries before a snapshot that has /// expired should be removed. - #[builder(default)] + #[builder(default, setter(custom))] snapshot_log: Vec, /// A list (optional) of timestamp and metadata file location pairs @@ -112,22 +116,22 @@ pub struct TableMetadata { /// previous metadata file location should be added to the list. /// Tables can be configured to remove oldest metadata log entries and /// keep a fixed-size log of the most recent entries after a commit. - #[builder(default)] + #[builder(default, setter(custom))] metadata_log: Vec, /// A list of sort orders, stored as full sort order objects. - #[builder(default)] + #[builder(default, setter(custom))] sort_orders: HashMap, /// Default sort order id of the table. Note that this could be used by /// writers, but is not used when reading because reads use the specs /// stored in manifest files. - #[builder(default = "DEFAULT_SORT_ORDER_ID")] + #[builder(default = "DEFAULT_SORT_ORDER_ID", setter(custom))] default_sort_order_id: i64, ///A map of snapshot references. The map keys are the unique snapshot reference /// names in the table, and the map values are snapshot reference objects. /// There is always a main branch reference pointing to the current-snapshot-id /// even if the refs map is null. 
- #[builder(default)] + #[builder(default = "Self::default_ref()", setter(custom))] refs: HashMap, } @@ -139,47 +143,92 @@ impl From for Error { } impl TableMetadataBuilder { + /// Get current time in ms + fn current_time_ms() -> i64 { + UNIX_EPOCH + .elapsed() + .unwrap() + .as_millis() + .try_into() + .unwrap() + } + + fn default_ref() -> HashMap { + HashMap::from([( + "main".to_string(), + SnapshotReference { + snapshot_id: -1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + )]) + } + + /// Add or replace a snapshot_reference + /// branch : branch id of the snapshot + /// snapshot_ref : SnapshotReference to add or update + fn with_ref(&mut self, branch: String, snapshot_ref: SnapshotReference) -> &mut Self { + if branch == "main" { + self.current_snapshot_id = Some(Some(snapshot_ref.snapshot_id)); + if let Some(vec) = self.snapshot_log.as_mut() { + vec.push(SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }) + } else { + self.snapshot_log = Some(vec![SnapshotLog { + snapshot_id: snapshot_ref.snapshot_id, + timestamp_ms: self.last_updated_ms.unwrap_or(Self::current_time_ms()), + }]) + } + } + if let Some(map) = self.refs.as_mut() { + map.insert(branch, snapshot_ref); + } else { + self.refs = Some(HashMap::from([(branch, snapshot_ref)])); + } + self + } + /// Initialize a TableMetadata with a TableCreation struct /// the Schema, sortOrder and PartitionSpec will be set as current - pub fn with_table_creation(&mut self, tc: TableCreation) -> &mut Self { + pub fn from_table_creation(&mut self, tc: TableCreation) -> &mut Self { self.with_location(tc.location) .with_properties(tc.properties) - .with_sort_order(tc.sort_order, true) - .with_schema(tc.schema, true); + .with_default_sort_order(tc.sort_order) + .with_current_schema(tc.schema); if let Some(partition_spec) = tc.partition_spec { - 
self.with_partition_spec(partition_spec, true); + self.with_default_partition_spec(partition_spec); } self } - /// Add a schema to the TableMetadata + /// Add or replace a schema /// schema : Schema to be added or replaced - /// current : True if the schema is the current one - pub fn with_schema(&mut self, schema: Schema, current: bool) -> &mut Self { - if current { - self.current_schema_id = Some(schema.schema_id()); - self.last_column_id = Some(schema.highest_field_id()); - } + pub fn with_schema(&mut self, schema: Schema) -> &mut Self { if let Some(map) = self.schemas.as_mut() { map.insert(schema.schema_id(), Arc::new(schema)); } else { self.schemas = Some(HashMap::from([(schema.schema_id(), Arc::new(schema))])); - }; + } self } - /// Add a partition_spec to the TableMetadata and update the last_partition_id accordinlgy + /// Add or replace a schema and set current schema to this one + /// schema : Schema to be added or replaced + pub fn with_current_schema(&mut self, schema: Schema) -> &mut Self { + self.current_schema_id = Some(schema.schema_id()); + self.last_column_id = Some(schema.highest_field_id()); + self.with_schema(schema) + } + + /// Add or replace a partition_spec and update the last_partition_id accordingly /// partition_spec : PartitionSpec to be added or replaced - /// default: True if this PartitionSpec is the default one - pub fn with_partition_spec( - &mut self, - partition_spec: PartitionSpec, - default: bool, - ) -> &mut Self { - if default { - self.default_spec_id = Some(partition_spec.spec_id); - } + pub fn with_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { let max_id = partition_spec .fields .iter() @@ -192,20 +241,38 @@ impl TableMetadataBuilder { map.insert(partition_spec.spec_id, partition_spec); } else { self.partition_specs = Some(HashMap::from([(partition_spec.spec_id, partition_spec)])); - }; + } self } - /// Add a snapshot to the TableMetadata and update last_sequence_number + /// Add or replace a 
partition_spec, update the last_partition_id accordingly + /// and set the default spec id to the partition_spec id + /// partition_spec : PartitionSpec to be added or replaced + pub fn with_default_partition_spec(&mut self, partition_spec: PartitionSpec) -> &mut Self { + self.default_spec_id = Some(partition_spec.spec_id); + self.with_partition_spec(partition_spec) + } + + /// Add or replace a snapshot to the main branch, update last_sequence_number /// snapshot : Snapshot to be added or replaced - /// current : True if the snapshot is the current one - pub fn with_snapshot(&mut self, snapshot: Snapshot, current: bool) -> &mut Self { - if current { - self.current_snapshot_id = Some(Some(snapshot.snapshot_id())) - } + pub fn with_branch_snapshot(&mut self, branch: String, snapshot: Snapshot) -> &mut Self { if Some(snapshot.sequence_number()) > self.last_sequence_number { - self.last_sequence_number = Some(snapshot.sequence_number()) + self.last_sequence_number = Some(snapshot.sequence_number()); + } + if self.last_updated_ms < Some(snapshot.timestamp()) { + self.last_updated_ms = Some(snapshot.timestamp()); } + self.with_ref( + branch, + SnapshotReference::new( + snapshot.snapshot_id(), + SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + ), + ); if let Some(Some(map)) = self.snapshots.as_mut() { map.insert(snapshot.snapshot_id(), Arc::new(snapshot)); } else { @@ -213,18 +280,19 @@ impl TableMetadataBuilder { snapshot.snapshot_id(), Arc::new(snapshot), )]))); - }; - + } self } - /// Add a sort_order to the TableMetadata + /// Add or replace a snapshot to the main branch, update last_sequence_number + /// snapshot : Snapshot to be added or replaced + pub fn with_snapshot(&mut self, snapshot: Snapshot) -> &mut Self { + self.with_branch_snapshot("main".to_string(), snapshot) + } + + /// Add or replace a sort_order /// sort_order : SortOrder to be added or replaced - /// default: True if this SortOrder 
is the default one - pub fn with_sort_order(&mut self, sort_order: SortOrder, default: bool) -> &mut Self { - if default { - self.default_sort_order_id = Some(sort_order.order_id) - } + pub fn with_sort_order(&mut self, sort_order: SortOrder) -> &mut Self { if let Some(map) = self.sort_orders.as_mut() { map.insert(sort_order.order_id, sort_order); } else { @@ -233,16 +301,11 @@ impl TableMetadataBuilder { self } - /// Add a snapshot_reference to the TableMetadata - /// key_ref : reference id of the snapshot - /// snapshot_ref : SnapshotReference to add or update - pub fn with_ref(&mut self, key_ref: String, snapshot_ref: SnapshotReference) -> &mut Self { - if let Some(map) = self.refs.as_mut() { - map.insert(key_ref, snapshot_ref); - } else { - self.refs = Some(HashMap::from([(key_ref, snapshot_ref)])); - }; - self + /// Add or replace a sort_order and set the default sort order to this one. + /// sort_order : SortOrder to be added or replaced + pub fn with_default_sort_order(&mut self, sort_order: SortOrder) -> &mut Self { + self.default_sort_order_id = Some(sort_order.order_id); + self.with_sort_order(sort_order) } /// Check if the default key exists in the map. 
@@ -258,136 +321,64 @@ impl TableMetadataBuilder { fn check_id_in_map( key: Option, map: &Option>, - default: Option, field: &str, ) -> Result<(), Error> { - if map.is_some() { - if let Some(k) = key { - // partition_specs map should contain an entry for the default_spec_id - let entry = map.as_ref().unwrap().get(&k); + if let Some(k) = key { + if let Some(m) = map { + // Key and map exists, let check if entry for the given key exists in map + let entry = m.get(&k); if entry.is_none() { Err(Error::new( ErrorKind::DataInvalid, format!( - "Default id {} is provided but there are no corresponding entry in {}", - k, field + "Default {} id {} is provided but there are no corresponding entry in {}", + field, k, field ), )) } else { Ok(()) } } else { + // Key exist and map does not Err(Error::new( ErrorKind::DataInvalid, format!( - "{} are defined but there are no default partition spec id set", - field - ), - )) - } - } else if let Some(k) = key { - if default.is_some_and(|def| k == def) { - Ok(()) - } else { - Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Default spec id {} is provided but there are no {} defined", - k, field + "Default {} id {} is provided but there are no {} defined", + field, k, field ), )) } - } else { - Ok(()) - } - } - - /// Check if last_column_id is coherent with the default schema fields ids - fn check_schema_last_column_id( - schemas: &Option>>, - schema_id: Option, - last_column_id: Option, - ) -> Result<(), Error> { - let expected_id = schemas - .as_ref() - .unwrap() - .get(&schema_id.unwrap()) - .unwrap() - .highest_field_id(); - - if expected_id == last_column_id.unwrap() { - Ok(()) - } else { + } else if map.is_some() { + // Key does not exist but map does Err(Error::new( ErrorKind::DataInvalid, - "last_column_id and default schema highest field id does not match", + format!("Default id is not provided for the field {}", field), )) - } - } - - /// Check if last_partition_id is coherent with the all partition spec field ids - fn 
check_last_partition_id( - partition_specs: &Option>, - last_partition_id: Option, - ) -> Result<(), Error> { - let expected_id = partition_specs - .as_ref() - .map(|specs| { - specs - .values() - .map(|spec: &PartitionSpec| { - spec.fields.iter().map(|field| field.field_id).max() - }) - .max() - }) - .unwrap_or(None) - .unwrap_or(None); - if expected_id == last_partition_id { - Ok(()) } else { - Err(Error::new( - ErrorKind::DataInvalid, - "last_partittion_id and default partition highest field id does not match", - )) + // Key and map does not exist, builder will handle required fields + Ok(()) } } /// validate the content of the TableMetada Struct fn validate(&self) -> Result<(), Error> { // check default key and maps are coherents - Self::check_id_in_map( - self.default_spec_id, - &self.partition_specs, - None, - "partitions", - ) - .and(Self::check_id_in_map( - self.current_schema_id, - &self.schemas, - None, - "schemas", - )) - .and(Self::check_id_in_map( - self.default_sort_order_id, - &self.sort_orders, - None, - "sort_order", - )) - .and(Self::check_id_in_map( - self.current_snapshot_id.unwrap_or(None), - self.snapshots.as_ref().unwrap_or(&None), - Some(-1), - "snapshots", - )) - .and(Self::check_schema_last_column_id( - &self.schemas, - self.current_schema_id, - self.last_column_id, - )) - .and(Self::check_last_partition_id( - &self.partition_specs, - self.last_partition_id, - )) + Self::check_id_in_map(self.default_spec_id, &self.partition_specs, "partitions") + .and(Self::check_id_in_map( + self.current_schema_id, + &self.schemas, + "schemas", + )) + .and(Self::check_id_in_map( + self.default_sort_order_id, + &self.sort_orders, + "sort_order", + )) + .and(Self::check_id_in_map( + self.current_snapshot_id.unwrap_or(None), + self.snapshots.as_ref().unwrap_or(&None), + "snapshots", + )) } } @@ -1629,20 +1620,8 @@ mod tests { let built_table_metadata = TableMetadata::builder() .with_location("s3://bucket/test/location") - .with_last_sequence_number(0) - 
.with_schema(schema, true) - .with_partition_spec(partition_spec, true) - .with_refs(HashMap::from_iter(vec![( - "main".to_string(), - SnapshotReference { - snapshot_id: -1, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, - }, - )])) + .with_current_schema(schema) + .with_default_partition_spec(partition_spec) .build() .unwrap(); @@ -1670,7 +1649,7 @@ mod tests { properties: HashMap::new(), }; let built_table_metadata = TableMetadata::builder() - .with_table_creation(table_creation) + .from_table_creation(table_creation) .build(); assert!(built_table_metadata.is_ok()) From c81c9de20a7eecaf93e82af04f80aa8d1601b9a4 Mon Sep 17 00:00:00 2001 From: Lionel Herbet Date: Wed, 18 Oct 2023 17:18:52 +0200 Subject: [PATCH 6/6] move builder error from impl to error module --- crates/iceberg/src/error.rs | 10 ++++++++++ crates/iceberg/src/spec/table_metadata.rs | 8 -------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index e4ae576d82..4d822e5a15 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -21,6 +21,8 @@ use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; +use derive_builder::UninitializedFieldError; + /// Result that is a wrapper of `Result` pub type Result = std::result::Result; @@ -192,6 +194,14 @@ impl std::error::Error for Error { } } +// We define a from implementation from builder Error to Iceberg Error +impl From for Error { + fn from(ufe: UninitializedFieldError) -> Error { + Error::new(ErrorKind::DataInvalid, "Some fields of table metadata not inited") + .with_source(ufe) + } +} + impl Error { /// Create a new Error with error kind and message. 
pub fn new(kind: ErrorKind, message: impl Into) -> Self { diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index d86bebc658..dd97ab2027 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -22,7 +22,6 @@ The main struct here is [TableMetadataV2] which defines the data for a table. use std::{collections::HashMap, sync::Arc, time::UNIX_EPOCH}; -use derive_builder::UninitializedFieldError; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; @@ -135,13 +134,6 @@ pub struct TableMetadata { refs: HashMap, } -// We define a from implementation from builder Error to Iceberg Error -impl From for Error { - fn from(ufe: UninitializedFieldError) -> Error { - Error::new(ErrorKind::DataInvalid, ufe.to_string()) - } -} - impl TableMetadataBuilder { /// Get current time in ms fn current_time_ms() -> i64 {