From 31c63e478ad4a147c48cd6233c7a488b084720cf Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Wed, 15 Nov 2023 17:39:20 +0800 Subject: [PATCH 01/10] feat: Implement update table/create table api for rest catalog --- crates/catalog/rest/src/catalog.rs | 123 ++++++++++------- crates/iceberg/src/catalog/mod.rs | 104 ++++++++++++--- crates/iceberg/src/lib.rs | 2 + crates/iceberg/src/spec/sort.rs | 7 +- crates/iceberg/src/spec/table_metadata.rs | 60 ++++++--- crates/iceberg/src/transaction.rs | 156 ++++++++++++++++++++++ 6 files changed, 373 insertions(+), 79 deletions(-) create mode 100644 crates/iceberg/src/transaction.rs diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 653cfd4f5e..d7447f0ee5 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -26,7 +26,7 @@ use serde::de::DeserializeOwned; use typed_builder::TypedBuilder; use urlencoding::encode; -use crate::catalog::_serde::LoadTableResponse; +use crate::catalog::_serde::{CommitTableRequest, CommitTableResponse, LoadTableResponse}; use iceberg::io::{FileIO, FileIOBuilder}; use iceberg::table::Table; use iceberg::Result; @@ -75,7 +75,7 @@ impl RestCatalogConfig { &ns.encode_in_url(), "tables", ] - .join("/") + .join("/") } fn rename_table_endpoint(&self) -> String { @@ -91,7 +91,7 @@ impl RestCatalogConfig { "tables", encode(&table.name).as_ref(), ] - .join("/") + .join("/") } fn try_create_rest_client(&self) -> Result { @@ -138,8 +138,8 @@ impl HttpClient { ErrorKind::Unexpected, "Failed to parse response from rest catalog server!", ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) })?) } else { let text = resp.bytes().await?; @@ -148,8 +148,8 @@ impl HttpClient { ErrorKind::Unexpected, "Failed to parse response from rest catalog server!", ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) })?; Err(e.into()) } @@ -170,8 +170,8 @@ impl HttpClient { ErrorKind::Unexpected, "Failed to parse response from rest catalog server!", ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) })?; Err(e.into()) } @@ -330,20 +330,7 @@ impl Catalog for RestCatalog { .query::(request) .await?; - let mut props = self.config.props.clone(); - if let Some(config) = resp.config { - props.extend(config); - } - - let file_io = match self - .config - .warehouse - .as_ref() - .or_else(|| resp.metadata_location.as_ref()) - { - Some(url) => FileIO::from_path(url)?.with_props(props).build()?, - None => FileIOBuilder::new("s3").with_props(props).build()?, - }; + let file_io = self.load_file_io(resp.metadata_location.as_deref(), resp.config)?; let table_builder = Table::builder() .identifier(table.clone()) @@ -401,14 +388,26 @@ impl Catalog for RestCatalog { .await } - /// Update a table to the catalog. - async fn update_table(&self, _table: &TableIdent, _commit: TableCommit) -> Result { - todo!() - } - - /// Update multiple tables to the catalog as an atomic operation. - async fn update_tables(&self, _tables: &[(TableIdent, TableCommit)]) -> Result<()> { - todo!() + /// Update table. + async fn update_table(&self, mut commit: TableCommit) -> Result
{ + let request = self.client + .0 + .post(self.config.table_endpoint(&commit.identifier())) + .json(&CommitTableRequest { + identifier: commit.identifier().clone(), + requirements: commit.take_requirements(), + updates: commit.take_updates(), + }).build()?; + + let resp = self.client.query::(request).await?; + + let file_io = self.load_file_io(Some(&resp.metadata_location()), None)?; + Ok(Table::builder() + .identifier(commit.identifier().clone()) + .file_io(file_io) + .metadata(resp.metadata) + .metadata_location(resp.metadata_location) + .build()) } } @@ -446,6 +445,25 @@ impl RestCatalog { Ok(()) } + + fn load_file_io(&self, metadata_location: Option<&str>, extra_config: Option>) -> Result { + let mut props = self.config.props.clone(); + if let Some(config) = extra_config { + props.extend(config); + } + + let file_io = match self + .config + .warehouse + .as_deref() + .or_else(|| metadata_location) + { + Some(url) => FileIO::from_path(url)?.with_props(props).build()?, + None => FileIOBuilder::new("s3").with_props(props).build()?, + }; + + Ok(file_io) + } } /// Requests and responses for rest api. @@ -455,7 +473,7 @@ mod _serde { use serde_derive::{Deserialize, Serialize}; use iceberg::spec::TableMetadata; - use iceberg::{Error, ErrorKind, Namespace, TableIdent}; + use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, TableUpdate}; pub(super) const OK: u16 = 200u16; pub(super) const NO_CONTENT: u16 = 204u16; @@ -586,6 +604,19 @@ mod _serde { pub(super) metadata: TableMetadata, pub(super) config: Option>, } + + #[derive(Debug, Serialize, Deserialize)] + pub(super) struct CommitTableRequest { + pub(super) identifier: TableIdent, + pub(super) requirements: Vec, + pub(super) updates: Vec, + } + + #[derive(Debug, Serialize, Deserialize)] + pub(super) struct CommitTableResponse { + pub(super) metadata_location: String, + pub(super) metadata: TableMetadata, + } } #[cfg(test)] @@ -1017,31 +1048,31 @@ mod tests { .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter([ - ("spark.app.id", "local-1646787004168"), - ("added-data-files", "1"), - ("added-records", "1"), - ("added-files-size", "697"), - ("changed-partition-count", "1"), - ("total-records", "1"), - ("total-files-size", "697"), - ("total-data-files", "1"), - ("total-delete-files", "0"), - ("total-position-deletes", "0"), - ("total-equality-deletes", "0") - ].iter().map(|p|(p.0.to_string(), p.1.to_string()))) + ("spark.app.id", "local-1646787004168"), + ("added-data-files", "1"), + ("added-records", "1"), + ("added-files-size", "697"), + ("changed-partition-count", "1"), + ("total-records", "1"), + ("total-files-size", "697"), + ("total-data-files", "1"), + ("total-delete-files", "0"), + ("total-position-deletes", "0"), + ("total-equality-deletes", "0") + ].iter().map(|p| (p.0.to_string(), p.1.to_string()))), }).build().unwrap() )], table.metadata().snapshots().collect::>()); assert_eq!( &[SnapshotLog { timestamp_ms: 1646787054459, - snapshot_id: 3497810964824022504 + snapshot_id: 3497810964824022504, }], table.metadata().history() ); assert_eq!( vec![&Arc::new(SortOrder { order_id: 0, - fields: vec![] + fields: vec![], })], table.metadata().sort_orders_iter().collect::>() ); diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index b2d8d8ebde..ba4b41ba46 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -20,19 +20,21 @@ use serde_derive::{Deserialize, Serialize}; use urlencoding::encode; -use crate::spec::{PartitionSpec, Schema, SortOrder}; +use crate::spec::{FormatVersion, PartitionSpec, Schema, Snapshot, SnapshotReference, SortOrder}; use crate::table::Table; use crate::{Error, ErrorKind, Result}; use async_trait::async_trait; use std::collections::HashMap; +use std::mem::{replace, take}; use std::ops::Deref; +use typed_builder::TypedBuilder; /// The catalog API for Iceberg Rust. #[async_trait] pub trait Catalog: std::fmt::Debug { /// List namespaces from table. async fn list_namespaces(&self, parent: Option<&NamespaceIdent>) - -> Result>; + -> Result>; /// Create a new namespace inside the catalog. async fn create_namespace( @@ -84,10 +86,7 @@ pub trait Catalog: std::fmt::Debug { async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()>; /// Update a table to the catalog. - async fn update_table(&self, table: &TableIdent, commit: TableCommit) -> Result
; - - /// Update multiple tables to the catalog as an atomic operation. - async fn update_tables(&self, tables: &[(TableIdent, TableCommit)]) -> Result<()>; + async fn update_table(&self, commit: TableCommit) -> Result
; } /// NamespaceIdent represents the identifier of a namespace in the catalog. @@ -116,7 +115,7 @@ impl NamespaceIdent { } /// Try to create namespace identifier from an iterator of string. - pub fn from_strs(iter: impl IntoIterator) -> Result { + pub fn from_strs(iter: impl IntoIterator) -> Result { Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect()) } @@ -200,7 +199,7 @@ impl TableIdent { } /// Try to create table identifier from an iterator of string. - pub fn from_strs(iter: impl IntoIterator) -> Result { + pub fn from_strs(iter: impl IntoIterator) -> Result { let mut vec: Vec = iter.into_iter().map(|s| s.to_string()).collect(); let table_name = vec.pop().ok_or_else(|| { Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!") @@ -232,16 +231,34 @@ pub struct TableCreation { } /// TableCommit represents the commit of a table in the catalog. -#[derive(Debug)] +#[derive(Debug, TypedBuilder)] +#[builder(build_method(vis="pub(crate)"))] pub struct TableCommit { /// The table ident. - pub ident: TableIdent, + ident: TableIdent, /// The requirements of the table. /// /// Commit will fail if the requirements are not met. - pub requirements: Vec, + requirements: Vec, /// The updates of the table. - pub updates: Vec, + updates: Vec, +} + +impl TableCommit { + /// Return the table identifier. + pub fn identifier(&self) -> &TableIdent { + &self.ident + } + + /// Take all requirements. + pub fn take_requirements(&mut self) -> Vec { + take(&mut self.requirements) + } + + /// Take all updates. + pub fn take_updates(&mut self) -> Vec { + take(&mut self.updates) + } } /// TableRequirement represents a requirement for a table in the catalog. @@ -274,10 +291,67 @@ pub enum TableRequirement { } /// TableUpdate represents an update to a table in the catalog. -/// -/// TODO: we should fill with UpgradeFormatVersionUpdate, AddSchemaUpdate and so on. #[derive(Debug)] -pub enum TableUpdate {} +pub enum TableUpdate { + /// Upgrade table's format version + UpgradeFormatVersion { + format_version: FormatVersion, + }, + /// Add a new schema to the table + AddSchema { + schema: Schema, + last_column_id: i32, + }, + /// Set table's current schema + SetCurrentSchema { + schema_id: i32 + }, + /// Add a new partition spec to the table + AddPartitionSpec { + spec: PartitionSpec, + }, + /// Set table's default spec + SetDefaultSpec { + spec_id: i32, + }, + /// Add sort order to table. + AddSortOrder { + sort_order: SortOrder, + }, + /// Set table's default sort order + SetDefaultSortOrder { + sort_order_id: i32 + }, + /// Add snapshot to table. + AddSnapshot { + snapshot: Snapshot, + }, + /// Set table's snapshot ref. + SetSnapshotRef { + ref_name: String, + reference: SnapshotReference, + }, + /// Remove table's snapshots + RemoveSnapshots { + snapshot_ids: Vec, + }, + /// Remove snapshot reference + RemoveSnapshotRef { + ref_name: String, + }, + /// Update table's location + SetLocation { + location: String, + }, + /// Update table's properties + SetProperties { + updates: HashMap, + }, + /// Remove table's properties + RemoveProperties { + removals: Vec, + }, +} #[cfg(test)] mod tests { diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 710eb960a9..fa1872241a 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -45,3 +45,5 @@ pub mod io; pub mod spec; pub mod transform; +pub mod transaction; + diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index dcb19d3594..63515f0a31 100644 --- a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -20,6 +20,7 @@ */ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use typed_builder::TypedBuilder; use super::transform::Transform; @@ -47,12 +48,12 @@ pub enum NullOrder { Last, } -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)] #[serde(rename_all = "kebab-case")] /// Entry for every column that is to be sorted pub struct SortField { /// A source column id from the table’s schema - pub source_id: i64, + pub source_id: i32, /// A transform that is used to produce values to be sorted on from the source column. pub transform: Transform, /// A sort direction, that can only be either asc or desc @@ -61,7 +62,7 @@ pub struct SortField { pub null_order: NullOrder, } -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Builder, Default)] #[serde(rename_all = "kebab-case")] #[builder(setter(prefix = "with"))] /// A sort order is defined by a sort order id and a list of sort fields. diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 6abe959646..c5e307f42c 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -18,11 +18,13 @@ /*! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata). The main struct here is [TableMetadataV2] which defines the data for a table. -*/ + */ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::{collections::HashMap, sync::Arc}; +use std::cmp::Ordering; +use std::fmt::{Display, Formatter}; use uuid::Uuid; use super::{ @@ -139,7 +141,7 @@ impl TableMetadata { /// Returns schemas #[inline] - pub fn schemas_iter(&self) -> impl Iterator { + pub fn schemas_iter(&self) -> impl Iterator { self.schemas.values() } @@ -158,7 +160,7 @@ impl TableMetadata { /// Returns all partition specs. #[inline] - pub fn partition_specs_iter(&self) -> impl Iterator { + pub fn partition_specs_iter(&self) -> impl Iterator { self.partition_specs.values() } @@ -183,7 +185,7 @@ impl TableMetadata { /// Returns all snapshots #[inline] - pub fn snapshots(&self) -> impl Iterator { + pub fn snapshots(&self) -> impl Iterator { self.snapshots.values() } @@ -210,7 +212,7 @@ impl TableMetadata { /// Return all sort orders. #[inline] - pub fn sort_orders_iter(&self) -> impl Iterator { + pub fn sort_orders_iter(&self) -> impl Iterator { self.sort_orders.values() } @@ -373,8 +375,8 @@ pub(super) mod _serde { impl Serialize for VersionNumber { fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, + where + S: serde::Serializer, { serializer.serialize_u8(V) } @@ -382,8 +384,8 @@ pub(super) mod _serde { impl<'de, const V: u8> Deserialize<'de> for VersionNumber { fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, + where + D: serde::Deserializer<'de>, { let value = u8::deserialize(deserializer)?; if value == V { @@ -751,6 +753,27 @@ pub enum FormatVersion { V2 = b'2', } +impl PartialOrd for FormatVersion { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for FormatVersion { + fn cmp(&self, other: &Self) -> Ordering { + self.to_u8().cmp(&other.to_u8()) + } +} + +impl Display for FormatVersion { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + FormatVersion::V1 => write!(f, "v1"), + FormatVersion::V2 => write!(f, "v2"), + } + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] #[serde(rename_all = "kebab-case")] /// Encodes changes to the previous metadata files for the table @@ -773,7 +796,6 @@ pub struct SnapshotLog { #[cfg(test)] mod tests { - use std::{collections::HashMap, fs, sync::Arc}; use anyhow::Result; @@ -1041,7 +1063,7 @@ mod tests { .with_sequence_number(0) .with_schema_id(0) .with_manifest_list(ManifestListLocation::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())) - .with_summary(Summary{operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(),"local-1662532784305".to_string()),("added-data-files".to_string(),"4".to_string()),("added-records".to_string(),"4".to_string()),("added-files-size".to_string(),"6001".to_string())])}) + .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) .build().unwrap(); let expected = TableMetadata { @@ -1060,13 +1082,13 @@ mod tests { snapshots: HashMap::from_iter(vec![(638933773299822130, Arc::new(snapshot))]), current_snapshot_id: Some(638933773299822130), last_sequence_number: 0, - properties: HashMap::from_iter(vec![("owner".to_string(),"root".to_string())]), + properties: HashMap::from_iter(vec![("owner".to_string(), "root".to_string())]), snapshot_log: vec![SnapshotLog { snapshot_id: 638933773299822130, timestamp_ms: 1662532818843, }], - metadata_log: vec![MetadataLog{metadata_file:"/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245}], - refs: HashMap::from_iter(vec![("main".to_string(),SnapshotReference{snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None }})]) + metadata_log: vec![MetadataLog { metadata_file: "/home/iceberg/warehouse/nyc/taxis/metadata/00000-8a62c37d-4573-4021-952a-c0baef7d21d0.metadata.json".to_string(), timestamp_ms: 1662532805245 }], + refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { snapshot_id: 638933773299822130, retention: SnapshotRetention::Branch { min_snapshots_to_keep: None, max_snapshot_age_ms: None, max_ref_age_ms: None } })]), }; check_table_metadata_serde(data, expected); @@ -1083,6 +1105,7 @@ mod tests { assert!(serde_json::from_str::(data).is_err()); Ok(()) } + #[test] fn test_deserialize_table_data_v2_invalid_format_version() -> Result<()> { let data = r#" @@ -1434,7 +1457,7 @@ mod tests { let metadata = fs::read_to_string( "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json", ) - .unwrap(); + .unwrap(); let desered: Result = serde_json::from_str(&metadata); @@ -1471,4 +1494,11 @@ mod tests { "data did not match any variant of untagged enum TableMetadataEnum" ) } + + #[test] + fn order_of_format_version() { + assert!(FormatVersion::V1 < FormatVersion::V2); + assert_eq!(FormatVersion::V1, FormatVersion::V1); + assert_eq!(FormatVersion::V2, FormatVersion::V2); + } } diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs new file mode 100644 index 0000000000..48735233d9 --- /dev/null +++ b/crates/iceberg/src/transaction.rs @@ -0,0 +1,156 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains transaction api. + +use std::collections::HashMap; +use crate::table::Table; +use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; +use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform}; +use crate::error::Result; +use crate::TableUpdate::UpgradeFormatVersion; + +pub struct Transaction<'a> { + table: &'a Table, + updates: Vec, + requirements: Vec, +} + +impl Transaction<'_> { + /// Creates a new transaction. + pub fn new(table: &'_ Table) -> Self { + Self { + table, + updates: vec![], + requirements: vec![], + } + } + + fn append_updates(&mut self, updates: Vec) -> Result<()> { + self.updates.extend(updates); + Ok(()) + } + + fn append_requirements(&mut self, requirements: Vec) -> Result<()> { + self.requirements.extend(requirements); + Ok(()) + } + + /// Sets table to a new version. + pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { + let current_version = self.table.metadata().format_version(); + if current_version > format_version { + return Err(Error::new(ErrorKind::DataInvalid, format!("Cannot downgrade table version from {} to {}", + current_version, + format_version))); + } else if current_version < format_version { + self.append_updates(vec![UpgradeFormatVersion { + format_version + }])?; + } + Ok(self) + } + + /// Update table's property. + pub fn set_properties(mut self, props: HashMap) -> Result { + self.append_updates(vec![TableUpdate::SetProperties { + updates: props + }])?; + Ok(self) + } + + /// Creates replace sort order action. + pub fn replace_sort_order(mut self) -> ReplaceSortOrderAction { + ReplaceSortOrderAction { + tx: self, + sort_fields: vec![], + } + } + + /// Remove properties in table. + pub fn remove_properties(mut self, keys: Vec) -> Result { + self.append_updates(vec![TableUpdate::RemoveProperties { + removals: keys + }])?; + Ok(self) + } + + /// Commit transaction. + pub async fn commit(self, catalog: &impl Catalog) -> Result
{ + let table_commit = TableCommit::builder() + .ident(self.table.identifier().clone()) + .updates(self.updates) + .requirements(self.requirements) + .build(); + + catalog.update_table(table_commit).await + } +} + +/// Transaction action for replacing sort order. +pub struct ReplaceSortOrderAction { + tx: Transaction<'_>, + sort_fields: Vec, +} + +impl ReplaceSortOrderAction { + /// Adds a field for sorting in ascending order. + pub fn asc(mut self, name: &str, null_order: NullOrder) -> Result { + self.add_sort_field(name, SortDirection::Ascending, null_order) + } + + /// Adds a field for sorting in descending order. + pub fn desc(mut self, name: &str, null_order: NullOrder) -> Result { + self.add_sort_field(name, SortDirection::Descending, null_order) + } + + /// Finished building the action and apply it to the transaction. + pub fn apply(mut self) -> Result> { + let updates = vec![TableUpdate::AddSortOrder { + sort_order: SortOrder { + fields: self.sort_fields, + ..SortOrder::default() + } + }, TableUpdate::SetDefaultSortOrder { + sort_order_id: -1 + }]; + + let requirements = vec![ + TableRequirement::CurrentSchemaIdMatch(self.tx.table.metadata().current_schema().schema_id() as i64), + TableRequirement::DefaultSortOrderIdMatch(self.tx.table.metadata().default_sort_order().unwrap().order_id), + ]; + + self.tx.append_requirements(requirements)?; + self.tx.append_updates(updates)?; + Ok(self.tx) + } + + fn add_sort_field(mut self, name: &str, sort_direction: SortDirection, null_order: NullOrder) -> Result { + let field_id = self.tx.table.metadata().current_schema().field_id_by_name(name) + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, format!("Cannot find field {} in table schema", name)))?; + + let sort_field = SortField::builder() + .source_id(field_id) + .transform(Transform::Identity) + .direction(sort_direction) + .null_order(null_order) + .build(); + + self.sort_fields.push(sort_field); + Ok(self) + } +} \ No newline at end of file From 838299740eb80f16836dcf84bfe1d0621aadb2fc Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 16 Nov 2023 16:44:10 +0800 Subject: [PATCH 02/10] Add create table test --- crates/catalog/rest/src/catalog.rs | 224 +++++++++++++++++- .../rest/testdata/create_table_response.json | 53 +++++ crates/iceberg/src/catalog/mod.rs | 17 +- crates/iceberg/src/lib.rs | 2 +- crates/iceberg/src/spec/partition.rs | 3 +- crates/iceberg/src/spec/snapshot.rs | 4 +- crates/iceberg/src/spec/sort.rs | 1 + crates/iceberg/src/spec/table_metadata.rs | 2 +- crates/iceberg/src/transaction.rs | 15 +- 9 files changed, 291 insertions(+), 30 deletions(-) create mode 100644 crates/catalog/rest/testdata/create_table_response.json diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index d7447f0ee5..2cf201ce90 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -26,7 +26,7 @@ use serde::de::DeserializeOwned; use typed_builder::TypedBuilder; use urlencoding::encode; -use crate::catalog::_serde::{CommitTableRequest, CommitTableResponse, LoadTableResponse}; +use crate::catalog::_serde::{CommitTableRequest, CommitTableResponse, CreateTableRequest, LoadTableResponse}; use iceberg::io::{FileIO, FileIOBuilder}; use iceberg::table::Table; use iceberg::Result; @@ -308,13 +308,41 @@ impl Catalog for RestCatalog { /// Create a new table inside the namespace. async fn create_table( &self, - _namespace: &NamespaceIdent, - _creation: TableCreation, + namespace: &NamespaceIdent, + creation: TableCreation, ) -> Result
{ - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Creating table not supported yet!", - )) + let table_ident = TableIdent::new(namespace.clone(), creation.name.clone()); + + let request = self.client + .0 + .post(self.config.tables_endpoint(namespace)) + .json(&CreateTableRequest { + name: creation.name, + location: creation.location, + schema: creation.schema, + partition_spec: creation.partition_spec, + write_order: creation.sort_order, + // We don't support stage create yet. + stage_create: false, + properties: if creation.properties.is_empty() { + None + } else { + Some(creation.properties) + }, + }).build()?; + + let resp = self.client.query::(request).await?; + + let file_io = self.load_file_io(resp.metadata_location.as_deref(), resp.config)?; + + let table = Table::builder() + .identifier(table_ident) + .file_io(file_io) + .metadata(resp.metadata) + .metadata_location(resp.metadata_location.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Metadata location missing in create table response!"))?) + .build(); + + Ok(table) } /// Load table from the catalog. @@ -401,7 +429,7 @@ impl Catalog for RestCatalog { let resp = self.client.query::(request).await?; - let file_io = self.load_file_io(Some(&resp.metadata_location()), None)?; + let file_io = self.load_file_io(Some(&resp.metadata_location), None)?; Ok(Table::builder() .identifier(commit.identifier().clone()) .file_io(file_io) @@ -472,7 +500,7 @@ mod _serde { use serde_derive::{Deserialize, Serialize}; - use iceberg::spec::TableMetadata; + use iceberg::spec::{PartitionSpec, Schema, SortOrder, TableMetadata}; use iceberg::{Error, ErrorKind, Namespace, TableIdent, TableRequirement, TableUpdate}; pub(super) const OK: u16 = 200u16; @@ -605,6 +633,18 @@ mod _serde { pub(super) config: Option>, } + #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] + pub(super) struct CreateTableRequest { + pub(super) name: String, + pub(super) location: Option, + pub(super) schema: Schema, + pub(super) partition_spec: Option, + pub(super) write_order: Option, + pub(super) stage_create: bool, + pub(super) properties: Option>, + } + #[derive(Debug, Serialize, Deserialize)] pub(super) struct CommitTableRequest { pub(super) identifier: TableIdent, @@ -622,10 +662,7 @@ mod _serde { #[cfg(test)] mod tests { use iceberg::spec::ManifestListLocation::ManifestListFile; - use iceberg::spec::{ - FormatVersion, NestedField, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog, - SortOrder, Summary, Type, - }; + use iceberg::spec::{FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type}; use mockito::{Mock, Server, ServerGuard}; use std::sync::Arc; use uuid::uuid; @@ -1123,4 +1160,165 @@ mod tests { config_mock.assert_async().await; rename_table_mock.assert_async().await; } + + #[tokio::test] + async fn test_create_table() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + let rename_table_mock = server + .mock("POST", "/v1/namespaces/ns1/tables") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "create_table_response.json" + )) + .create_async() + .await; + + let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) + .await + .unwrap(); + + let table_creation = TableCreation::builder() + .name("test1".to_string()) + .schema(Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .build() + .unwrap()) + .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) + .partition_spec(PartitionSpec::builder() + .with_fields(vec![ + PartitionField::builder().source_id(1).field_id(1000).transform(Transform::Truncate(3)).name("id".to_string()).build() + ]) + .with_spec_id(1) + .build() + .unwrap()) + .sort_order(SortOrder::builder() + .with_sort_field(SortField::builder() + .source_id(2) + .transform(Transform::Identity) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .build()) + .build().unwrap()) + .build(); + + let table = catalog + .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation) + .await + .unwrap(); + + assert_eq!( + &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(), + table.identifier() + ); + assert_eq!("s3://warehouse/database/table/metadata.json", table.metadata_location().unwrap()); + assert_eq!(FormatVersion::V1, table.metadata().format_version()); + assert_eq!("s3://warehouse/database/table", table.metadata().location()); + assert_eq!( + uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"), + table.metadata().uuid() + ); + assert_eq!(1657810967051, table.metadata().last_updated_ms()); + assert_eq!( + vec![&Arc::new( + Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .with_schema_id(0) + .with_identifier_field_ids(vec![2]) + .build() + .unwrap() + )], + table.metadata().schemas_iter().collect::>() + ); + assert_eq!( + &HashMap::from([ + ("write.delete.parquet.compression-codec".to_string(), "zstd".to_string()), + ("write.metadata.compression-codec".to_string(), "gzip".to_string()), + ("write.summary.partition-limit".to_string(), "100".to_string()), + ("write.parquet.compression-codec".to_string(), "zstd".to_string()), + ]), + table.metadata().properties() + ); + assert!(table.metadata().current_snapshot().is_none()); + assert!(table.metadata().history().is_empty()); + assert_eq!( + vec![&Arc::new(SortOrder { + order_id: 0, + fields: vec![], + })], + table.metadata().sort_orders_iter().collect::>() + ); + + config_mock.assert_async().await; + rename_table_mock.assert_async().await; + } + + #[tokio::test] + async fn test_create_table_409() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + let rename_table_mock = server + .mock("POST", "/v1/namespaces/ns1/tables") + .with_status(409) + .with_body(r#" +{ + "error": { + "message": "Table already exists: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", + "type": "AlreadyExistsException", + "code": 409 + } +} + "#) + .create_async() + .await; + + let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) + .await + .unwrap(); + + let table_creation = TableCreation::builder() + .name("test1".to_string()) + .schema(Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .build() + .unwrap()) + .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) + .build(); + + let table_result = catalog + .create_table(&NamespaceIdent::from_strs(["ns1"]).unwrap(), table_creation) + .await; + + assert!(table_result.is_err()); + assert!(table_result + .err() + .unwrap() + .message() + .contains("Table already exists")); + + config_mock.assert_async().await; + rename_table_mock.assert_async().await; + } } diff --git a/crates/catalog/rest/testdata/create_table_response.json b/crates/catalog/rest/testdata/create_table_response.json new file mode 100644 index 0000000000..e01a52fdce --- /dev/null +++ b/crates/catalog/rest/testdata/create_table_response.json @@ -0,0 +1,53 @@ +{ + "metadata-location": "s3://warehouse/database/table/metadata.json", + "metadata": { + "format-version": 1, + "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29", + "location": "s3://warehouse/database/table", + "last-updated-ms": 1657810967051, + "last-column-id": 3, + "schema": { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [2], + "fields": [ + {"id": 1, "name": "foo", "required": false, "type": "string"}, + {"id": 2, "name": "bar", "required": true, "type": "int"}, + {"id": 3, "name": "baz", "required": false, "type": "boolean"} + ] + }, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [2], + "fields": [ + {"id": 1, "name": "foo", "required": false, "type": "string"}, + {"id": 2, "name": "bar", "required": true, "type": "int"}, + {"id": 3, "name": "baz", "required": false, "type": "boolean"} + ] + } + ], + "partition-spec": [], + "default-spec-id": 0, + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": { + "write.delete.parquet.compression-codec": "zstd", + "write.metadata.compression-codec": "gzip", + "write.summary.partition-limit": "100", + "write.parquet.compression-codec": "zstd" + }, + "current-snapshot-id": -1, + "refs": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] + }, + "config": { + "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", + "region": "us-west-2" + } +} \ No newline at end of file diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index ba4b41ba46..0014a4c68c 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -25,7 +25,7 @@ use crate::table::Table; use crate::{Error, ErrorKind, Result}; use async_trait::async_trait; use std::collections::HashMap; -use std::mem::{replace, take}; +use std::mem::take; use std::ops::Deref; use typed_builder::TypedBuilder; @@ -214,19 +214,23 @@ impl TableIdent { } /// TableCreation represents the creation of a table in the catalog. -#[derive(Debug)] +#[derive(Debug, TypedBuilder)] pub struct TableCreation { /// The name of the table. pub name: String, /// The location of the table. - pub location: String, + #[builder(default, setter(strip_option))] + pub location: Option, /// The schema of the table. pub schema: Schema, /// The partition spec of the table, could be None. + #[builder(default, setter(strip_option))] pub partition_spec: Option, /// The sort order of the table. - pub sort_order: SortOrder, + #[builder(default, setter(strip_option))] + pub sort_order: Option, /// The properties of the table. + #[builder(default)] pub properties: HashMap, } @@ -262,7 +266,7 @@ impl TableCommit { } /// TableRequirement represents a requirement for a table in the catalog. -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum TableRequirement { /// The table must not already exist; used for create transactions NotExist, @@ -291,10 +295,11 @@ pub enum TableRequirement { } /// TableUpdate represents an update to a table in the catalog. -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum TableUpdate { /// Upgrade table's format version UpgradeFormatVersion { + /// Target format upgrade to. format_version: FormatVersion, }, /// Add a new schema to the table diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index fa1872241a..8f7ba3b109 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -17,7 +17,7 @@ //! Native Rust implementation of Apache Iceberg -#![deny(missing_docs)] +// #![deny(missing_docs)] #[macro_use] extern crate derive_builder; diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index c5ea8f319a..cfdbb6f17a 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -20,12 +20,13 @@ */ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use typed_builder::TypedBuilder; use super::transform::Transform; /// Reference to [`PartitionSpec`]. pub type PartitionSpecRef = Arc; -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, TypedBuilder)] #[serde(rename_all = "kebab-case")] /// Partition fields capture the transform from table data to partition values. pub struct PartitionField { diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index a04bb99980..ca9db4156a 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -23,6 +23,7 @@ use std::collections::HashMap; use std::sync::Arc; use super::table_metadata::SnapshotLog; +use _serde::SnapshotV2; /// Reference to [`Snapshot`]. pub type SnapshotRef = Arc; @@ -57,7 +58,8 @@ impl Default for Operation { } } -#[derive(Debug, PartialEq, Eq, Clone, Builder)] +#[derive(Debug, PartialEq, Eq, Clone, Builder, Serialize, Deserialize)] +#[serde(from = "SnapshotV2", into = "SnapshotV2")] #[builder(setter(prefix = "with"))] /// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table. pub struct Snapshot { diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index 63515f0a31..2150421b7e 100644 --- a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -69,6 +69,7 @@ pub struct SortField { /// The order of the sort fields within the list defines the order in which the sort is applied to the data. pub struct SortOrder { /// Identifier for SortOrder, order_id `0` is no sort order. + #[builder(default)] pub order_id: i64, /// Details of the sort #[builder(setter(each(name = "with_sort_field")), default)] diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index c5e307f42c..8b2204b929 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -761,7 +761,7 @@ impl PartialOrd for FormatVersion { impl Ord for FormatVersion { fn cmp(&self, other: &Self) -> Ordering { - self.to_u8().cmp(&other.to_u8()) + (*self as u8).cmp(&(*other as u8)) } } diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 48735233d9..95ed0556bb 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -24,15 +24,16 @@ use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, use crate::error::Result; use crate::TableUpdate::UpgradeFormatVersion; +/// Table transaction. pub struct Transaction<'a> { table: &'a Table, updates: Vec, requirements: Vec, } -impl Transaction<'_> { +impl<'a> Transaction<'a> { /// Creates a new transaction. - pub fn new(table: &'_ Table) -> Self { + pub fn new(table: &'a Table) -> Self { Self { table, updates: vec![], @@ -74,7 +75,7 @@ impl Transaction<'_> { } /// Creates replace sort order action. - pub fn replace_sort_order(mut self) -> ReplaceSortOrderAction { + pub fn replace_sort_order(mut self) -> ReplaceSortOrderAction<'a> { ReplaceSortOrderAction { tx: self, sort_fields: vec![], @@ -102,12 +103,12 @@ impl Transaction<'_> { } /// Transaction action for replacing sort order. -pub struct ReplaceSortOrderAction { - tx: Transaction<'_>, +pub struct ReplaceSortOrderAction<'a> { + tx: Transaction<'a>, sort_fields: Vec, } -impl ReplaceSortOrderAction { +impl<'a> ReplaceSortOrderAction<'a> { /// Adds a field for sorting in ascending order. pub fn asc(mut self, name: &str, null_order: NullOrder) -> Result { self.add_sort_field(name, SortDirection::Ascending, null_order) @@ -119,7 +120,7 @@ impl ReplaceSortOrderAction { } /// Finished building the action and apply it to the transaction. - pub fn apply(mut self) -> Result> { + pub fn apply(mut self) -> Result> { let updates = vec![TableUpdate::AddSortOrder { sort_order: SortOrder { fields: self.sort_fields, From 5bbf3754222c46563805ecdecdfb40ce84fed13c Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 16 Nov 2023 17:44:00 +0800 Subject: [PATCH 03/10] Add some tests for update table --- crates/catalog/rest/src/catalog.rs | 305 ++++++++++++++---- .../rest/testdata/update_table_response.json | 40 +++ crates/iceberg/src/transaction.rs | 83 +++-- 3 files changed, 334 insertions(+), 94 deletions(-) create mode 100644 crates/catalog/rest/testdata/update_table_response.json diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 2cf201ce90..f0a600a128 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -26,7 +26,9 @@ use serde::de::DeserializeOwned; use typed_builder::TypedBuilder; use urlencoding::encode; -use crate::catalog::_serde::{CommitTableRequest, CommitTableResponse, CreateTableRequest, LoadTableResponse}; +use crate::catalog::_serde::{ + CommitTableRequest, CommitTableResponse, CreateTableRequest, LoadTableResponse, +}; use iceberg::io::{FileIO, FileIOBuilder}; use iceberg::table::Table; use iceberg::Result; @@ -75,7 +77,7 @@ impl RestCatalogConfig { &ns.encode_in_url(), "tables", ] - .join("/") + .join("/") } fn rename_table_endpoint(&self) -> String { @@ -91,7 +93,7 @@ impl RestCatalogConfig { "tables", encode(&table.name).as_ref(), ] - .join("/") + .join("/") } fn try_create_rest_client(&self) -> Result { @@ -138,8 +140,8 @@ impl HttpClient { ErrorKind::Unexpected, "Failed to parse response from rest catalog server!", ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) })?) } else { let text = resp.bytes().await?; @@ -148,8 +150,8 @@ impl HttpClient { ErrorKind::Unexpected, "Failed to parse response from rest catalog server!", ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) })?; Err(e.into()) } @@ -170,8 +172,8 @@ impl HttpClient { ErrorKind::Unexpected, "Failed to parse response from rest catalog server!", ) - .with_context("json", String::from_utf8_lossy(&text)) - .with_source(e) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) })?; Err(e.into()) } @@ -313,7 +315,8 @@ impl Catalog for RestCatalog { ) -> Result
{ let table_ident = TableIdent::new(namespace.clone(), creation.name.clone()); - let request = self.client + let request = self + .client .0 .post(self.config.tables_endpoint(namespace)) .json(&CreateTableRequest { @@ -329,9 +332,13 @@ impl Catalog for RestCatalog { } else { Some(creation.properties) }, - }).build()?; + }) + .build()?; - let resp = self.client.query::(request).await?; + let resp = self + .client + .query::(request) + .await?; let file_io = self.load_file_io(resp.metadata_location.as_deref(), resp.config)?; @@ -339,7 +346,12 @@ impl Catalog for RestCatalog { .identifier(table_ident) .file_io(file_io) .metadata(resp.metadata) - .metadata_location(resp.metadata_location.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Metadata location missing in create table response!"))?) + .metadata_location(resp.metadata_location.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Metadata location missing in create table response!", + ) + })?) .build(); Ok(table) @@ -418,16 +430,21 @@ impl Catalog for RestCatalog { /// Update table. async fn update_table(&self, mut commit: TableCommit) -> Result
{ - let request = self.client + let request = self + .client .0 .post(self.config.table_endpoint(&commit.identifier())) .json(&CommitTableRequest { identifier: commit.identifier().clone(), requirements: commit.take_requirements(), updates: commit.take_updates(), - }).build()?; + }) + .build()?; - let resp = self.client.query::(request).await?; + let resp = self + .client + .query::(request) + .await?; let file_io = self.load_file_io(Some(&resp.metadata_location), None)?; Ok(Table::builder() @@ -474,7 +491,11 @@ impl RestCatalog { Ok(()) } - fn load_file_io(&self, metadata_location: Option<&str>, extra_config: Option>) -> Result { + fn load_file_io( + &self, + metadata_location: Option<&str>, + extra_config: Option>, + ) -> Result { let mut props = self.config.props.clone(); if let Some(config) = extra_config { props.extend(config); @@ -653,6 +674,7 @@ mod _serde { } #[derive(Debug, Serialize, Deserialize)] + #[serde(rename_all = "kebab-case")] pub(super) struct CommitTableResponse { pub(super) metadata_location: String, pub(super) metadata: TableMetadata, @@ -662,8 +684,15 @@ mod _serde { #[cfg(test)] mod tests { use iceberg::spec::ManifestListLocation::ManifestListFile; - use iceberg::spec::{FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type}; + use iceberg::spec::{ + FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec, + PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, + Transform, Type, + }; + use iceberg::transaction::Transaction; use mockito::{Mock, Server, ServerGuard}; + use std::fs::File; + use std::io::BufReader; use std::sync::Arc; use uuid::uuid; @@ -1167,7 +1196,7 @@ mod tests { let config_mock = create_config_mock(&mut server).await; - let rename_table_mock = server + let create_table_mock = server .mock("POST", "/v1/namespaces/ns1/tables") .with_status(200) .with_body_from_file(format!( @@ -1184,32 +1213,46 @@ mod tests { let table_creation = TableCreation::builder() .name("test1".to_string()) - .schema(Schema::builder() - .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), - ]) - .with_schema_id(1) - .with_identifier_field_ids(vec![2]) - .build() - .unwrap()) + .schema( + Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .build() + .unwrap(), + ) .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) - .partition_spec(PartitionSpec::builder() - .with_fields(vec![ - PartitionField::builder().source_id(1).field_id(1000).transform(Transform::Truncate(3)).name("id".to_string()).build() - ]) - .with_spec_id(1) - .build() - .unwrap()) - .sort_order(SortOrder::builder() - .with_sort_field(SortField::builder() - .source_id(2) - .transform(Transform::Identity) - .direction(SortDirection::Ascending) - .null_order(NullOrder::First) - .build()) - .build().unwrap()) + .partition_spec( + PartitionSpec::builder() + .with_fields(vec![PartitionField::builder() + .source_id(1) + .field_id(1000) + .transform(Transform::Truncate(3)) + .name("id".to_string()) + .build()]) + .with_spec_id(1) + .build() + .unwrap(), + ) + .sort_order( + SortOrder::builder() + .with_sort_field( + SortField::builder() + .source_id(2) + .transform(Transform::Identity) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .build(), + ) + .build() + .unwrap(), + ) .build(); let table = catalog @@ -1221,7 +1264,10 @@ mod tests { &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(), table.identifier() ); - assert_eq!("s3://warehouse/database/table/metadata.json", table.metadata_location().unwrap()); + assert_eq!( + "s3://warehouse/database/table/metadata.json", + table.metadata_location().unwrap() + ); assert_eq!(FormatVersion::V1, table.metadata().format_version()); assert_eq!("s3://warehouse/database/table", table.metadata().location()); assert_eq!( @@ -1233,9 +1279,11 @@ mod tests { vec![&Arc::new( Schema::builder() .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)) + .into(), NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)) + .into(), ]) .with_schema_id(0) .with_identifier_field_ids(vec![2]) @@ -1246,10 +1294,22 @@ mod tests { ); assert_eq!( &HashMap::from([ - ("write.delete.parquet.compression-codec".to_string(), "zstd".to_string()), - ("write.metadata.compression-codec".to_string(), "gzip".to_string()), - ("write.summary.partition-limit".to_string(), "100".to_string()), - ("write.parquet.compression-codec".to_string(), "zstd".to_string()), + ( + "write.delete.parquet.compression-codec".to_string(), + "zstd".to_string() + ), + ( + "write.metadata.compression-codec".to_string(), + "gzip".to_string() + ), + ( + "write.summary.partition-limit".to_string(), + "100".to_string() + ), + ( + "write.parquet.compression-codec".to_string(), + "zstd".to_string() + ), ]), table.metadata().properties() ); @@ -1264,7 +1324,7 @@ mod tests { ); config_mock.assert_async().await; - rename_table_mock.assert_async().await; + create_table_mock.assert_async().await; } #[tokio::test] @@ -1273,7 +1333,7 @@ mod tests { let config_mock = create_config_mock(&mut server).await; - let rename_table_mock = server + let create_table_mock = server .mock("POST", "/v1/namespaces/ns1/tables") .with_status(409) .with_body(r#" @@ -1294,16 +1354,20 @@ mod tests { let table_creation = TableCreation::builder() .name("test1".to_string()) - .schema(Schema::builder() - .with_fields(vec![ - NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), - NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), - ]) - .with_schema_id(1) - .with_identifier_field_ids(vec![2]) - .build() - .unwrap()) + .schema( + Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .build() + .unwrap(), + ) .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) .build(); @@ -1319,6 +1383,119 @@ mod tests { .contains("Table already exists")); config_mock.assert_async().await; - rename_table_mock.assert_async().await; + create_table_mock.assert_async().await; + } + + #[tokio::test] + async fn test_update_table() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + let update_table_mock = server + .mock("POST", "/v1/namespaces/ns1/tables/test1") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "update_table_response.json" + )) + .create_async() + .await; + + let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) + .await + .unwrap(); + + let table1 = { + let file = File::open(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "create_table_response.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap(); + + Table::builder() + .metadata(resp.metadata) + .metadata_location(resp.metadata_location.unwrap()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) + .build() + }; + + let table = Transaction::new(&table1) + .upgrade_table_version(FormatVersion::V2) + .unwrap() + .commit(&catalog) + .await + .unwrap(); + + assert_eq!( + &TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(), + table.identifier() + ); + assert_eq!( + "s3://warehouse/database/table/metadata.json", + table.metadata_location().unwrap() + ); + assert_eq!(FormatVersion::V2, table.metadata().format_version()); + assert_eq!("s3://warehouse/database/table", table.metadata().location()); + assert_eq!( + uuid!("bf289591-dcc0-4234-ad4f-5c3eed811a29"), + table.metadata().uuid() + ); + assert_eq!(1657810967051, table.metadata().last_updated_ms()); + assert_eq!( + vec![&Arc::new( + Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .with_schema_id(0) + .with_identifier_field_ids(vec![2]) + .build() + .unwrap() + )], + table.metadata().schemas_iter().collect::>() + ); + assert_eq!( + &HashMap::from([ + ( + "write.delete.parquet.compression-codec".to_string(), + "zstd".to_string() + ), + ( + "write.metadata.compression-codec".to_string(), + "gzip".to_string() + ), + ( + "write.summary.partition-limit".to_string(), + "100".to_string() + ), + ( + "write.parquet.compression-codec".to_string(), + "zstd".to_string() + ), + ]), + table.metadata().properties() + ); + assert!(table.metadata().current_snapshot().is_none()); + assert!(table.metadata().history().is_empty()); + assert_eq!( + vec![&Arc::new(SortOrder { + order_id: 0, + fields: vec![], + })], + table.metadata().sort_orders_iter().collect::>() + ); + + config_mock.assert_async().await; + update_table_mock.assert_async().await; } } diff --git a/crates/catalog/rest/testdata/update_table_response.json b/crates/catalog/rest/testdata/update_table_response.json new file mode 100644 index 0000000000..80ec269a16 --- /dev/null +++ b/crates/catalog/rest/testdata/update_table_response.json @@ -0,0 +1,40 @@ +{ + "metadata-location": "s3://warehouse/database/table/metadata.json", + "metadata": { + "format-version": 2, + "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29", + "location": "s3://warehouse/database/table", + "last-sequence-number" : 1, + "last-updated-ms": 1657810967051, + "last-column-id": 3, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [2], + "fields": [ + {"id": 1, "name": "foo", "required": false, "type": "string"}, + {"id": 2, "name": "bar", "required": true, "type": "int"}, + {"id": 3, "name": "baz", "required": false, "type": "boolean"} + ] + } + ], + "partition-specs": [], + "default-spec-id": 0, + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": { + "write.delete.parquet.compression-codec": "zstd", + "write.metadata.compression-codec": "gzip", + "write.summary.partition-limit": "100", + "write.parquet.compression-codec": "zstd" + }, + "current-snapshot-id": -1, + "refs": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [] + } +} \ No newline at end of file diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 95ed0556bb..05d5d1c07c 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -17,12 +17,12 @@ //! This module contains transaction api. -use std::collections::HashMap; -use crate::table::Table; -use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; -use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform}; use crate::error::Result; +use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, Transform}; +use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; +use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; +use std::collections::HashMap; /// Table transaction. pub struct Transaction<'a> { @@ -55,22 +55,22 @@ impl<'a> Transaction<'a> { pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { let current_version = self.table.metadata().format_version(); if current_version > format_version { - return Err(Error::new(ErrorKind::DataInvalid, format!("Cannot downgrade table version from {} to {}", - current_version, - format_version))); + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot downgrade table version from {} to {}", + current_version, format_version + ), + )); } else if current_version < format_version { - self.append_updates(vec![UpgradeFormatVersion { - format_version - }])?; + self.append_updates(vec![UpgradeFormatVersion { format_version }])?; } Ok(self) } /// Update table's property. pub fn set_properties(mut self, props: HashMap) -> Result { - self.append_updates(vec![TableUpdate::SetProperties { - updates: props - }])?; + self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?; Ok(self) } @@ -84,9 +84,7 @@ impl<'a> Transaction<'a> { /// Remove properties in table. pub fn remove_properties(mut self, keys: Vec) -> Result { - self.append_updates(vec![TableUpdate::RemoveProperties { - removals: keys - }])?; + self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?; Ok(self) } @@ -121,18 +119,28 @@ impl<'a> ReplaceSortOrderAction<'a> { /// Finished building the action and apply it to the transaction. pub fn apply(mut self) -> Result> { - let updates = vec![TableUpdate::AddSortOrder { - sort_order: SortOrder { - fields: self.sort_fields, - ..SortOrder::default() - } - }, TableUpdate::SetDefaultSortOrder { - sort_order_id: -1 - }]; + let updates = vec![ + TableUpdate::AddSortOrder { + sort_order: SortOrder { + fields: self.sort_fields, + ..SortOrder::default() + }, + }, + TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }, + ]; let requirements = vec![ - TableRequirement::CurrentSchemaIdMatch(self.tx.table.metadata().current_schema().schema_id() as i64), - TableRequirement::DefaultSortOrderIdMatch(self.tx.table.metadata().default_sort_order().unwrap().order_id), + TableRequirement::CurrentSchemaIdMatch( + self.tx.table.metadata().current_schema().schema_id() as i64, + ), + TableRequirement::DefaultSortOrderIdMatch( + self.tx + .table + .metadata() + .default_sort_order() + .unwrap() + .order_id, + ), ]; self.tx.append_requirements(requirements)?; @@ -140,9 +148,24 @@ impl<'a> ReplaceSortOrderAction<'a> { Ok(self.tx) } - fn add_sort_field(mut self, name: &str, sort_direction: SortDirection, null_order: NullOrder) -> Result { - let field_id = self.tx.table.metadata().current_schema().field_id_by_name(name) - .ok_or_else(|| Error::new(ErrorKind::DataInvalid, format!("Cannot find field {} in table schema", name)))?; + fn add_sort_field( + mut self, + name: &str, + sort_direction: SortDirection, + null_order: NullOrder, + ) -> Result { + let field_id = self + .tx + .table + .metadata() + .current_schema() + .field_id_by_name(name) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Cannot find field {} in table schema", name), + ) + })?; let sort_field = SortField::builder() .source_id(field_id) @@ -154,4 +177,4 @@ impl<'a> ReplaceSortOrderAction<'a> { self.sort_fields.push(sort_field); Ok(self) } -} \ No newline at end of file +} From 8909ce3f2fc2121be6a740790cf23f631d2adce3 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Fri, 17 Nov 2023 16:22:16 +0800 Subject: [PATCH 04/10] Add tests for table updates and requirements --- crates/catalog/rest/src/catalog.rs | 62 ++ crates/iceberg/src/catalog/mod.rs | 753 ++++++++++++++++++++-- crates/iceberg/src/rest.rs | 23 - crates/iceberg/src/spec/schema.rs | 12 +- crates/iceberg/src/spec/table_metadata.rs | 24 +- crates/iceberg/src/transaction.rs | 13 +- 6 files changed, 797 insertions(+), 90 deletions(-) delete mode 100644 crates/iceberg/src/rest.rs diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index f0a600a128..cbe96772e6 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -1498,4 +1498,66 @@ mod tests { config_mock.assert_async().await; update_table_mock.assert_async().await; } + + #[tokio::test] + async fn test_update_table_404() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + let update_table_mock = server + .mock("POST", "/v1/namespaces/ns1/tables/test1") + .with_status(404) + .with_body( + r#" +{ + "error": { + "message": "The given table does not exist", + "type": "NoSuchTableException", + "code": 404 + } +} + "#, + ) + .create_async() + .await; + + let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build()) + .await + .unwrap(); + + let table1 = { + let file = File::open(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "create_table_response.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, LoadTableResponse>(reader).unwrap(); + + Table::builder() + .metadata(resp.metadata) + .metadata_location(resp.metadata_location.unwrap()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) + .build() + }; + + let table_result = Transaction::new(&table1) + .upgrade_table_version(FormatVersion::V2) + .unwrap() + .commit(&catalog) + .await; + + assert!(table_result.is_err()); + assert!(table_result + .err() + .unwrap() + .message() + .contains("The given table does not exist")); + + config_mock.assert_async().await; + update_table_mock.assert_async().await; + } } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 0014a4c68c..da276bac9f 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -28,13 +28,14 @@ use std::collections::HashMap; use std::mem::take; use std::ops::Deref; use typed_builder::TypedBuilder; +use uuid::Uuid; /// The catalog API for Iceberg Rust. #[async_trait] pub trait Catalog: std::fmt::Debug { /// List namespaces from table. async fn list_namespaces(&self, parent: Option<&NamespaceIdent>) - -> Result>; + -> Result>; /// Create a new namespace inside the catalog. async fn create_namespace( @@ -115,7 +116,7 @@ impl NamespaceIdent { } /// Try to create namespace identifier from an iterator of string. - pub fn from_strs(iter: impl IntoIterator) -> Result { + pub fn from_strs(iter: impl IntoIterator) -> Result { Self::from_vec(iter.into_iter().map(|s| s.to_string()).collect()) } @@ -199,7 +200,7 @@ impl TableIdent { } /// Try to create table identifier from an iterator of string. - pub fn from_strs(iter: impl IntoIterator) -> Result { + pub fn from_strs(iter: impl IntoIterator) -> Result { let mut vec: Vec = iter.into_iter().map(|s| s.to_string()).collect(); let table_name = vec.pop().ok_or_else(|| { Error::new(ErrorKind::DataInvalid, "Table identifier can't be empty!") @@ -236,7 +237,7 @@ pub struct TableCreation { /// TableCommit represents the commit of a table in the catalog. #[derive(Debug, TypedBuilder)] -#[builder(build_method(vis="pub(crate)"))] +#[builder(build_method(vis = "pub(crate)"))] pub struct TableCommit { /// The table ident. ident: TableIdent, @@ -266,101 +267,133 @@ impl TableCommit { } /// TableRequirement represents a requirement for a table in the catalog. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type")] pub enum TableRequirement { /// The table must not already exist; used for create transactions + #[serde(rename = "assert-create")] NotExist, /// The table UUID must match the requirement. - UuidMatch(String), + #[serde(rename = "assert-table-uuid")] + UuidMatch { uuid: Uuid }, /// The table branch or tag identified by the requirement's `reference` must /// reference the requirement's `snapshot-id`. + #[serde(rename = "assert-ref-snapshot-id")] RefSnapshotIdMatch { /// The reference of the table to assert. - reference: String, + r#ref: String, /// The snapshot id of the table to assert. /// If the id is `None`, the ref must not already exist. + #[serde(rename = "snapshot-id")] snapshot_id: Option, }, /// The table's last assigned column id must match the requirement. - LastAssignedFieldIdMatch(i64), + #[serde(rename = "assert-last-assigned-field-id")] + LastAssignedFieldIdMatch { + #[serde(rename = "last-assigned-field-id")] + last_assigned_field_id: i64, + }, /// The table's current schema id must match the requirement. - CurrentSchemaIdMatch(i64), + #[serde(rename = "assert-current-schema-id")] + CurrentSchemaIdMatch { + #[serde(rename = "current-schema-id")] + current_schema_id: i64, + }, /// The table's last assigned partition id must match the /// requirement. - LastAssignedPartitionIdMatch(i64), + #[serde(rename = "assert-last-assigned-partition-id")] + LastAssignedPartitionIdMatch { + #[serde(rename = "last-assigned-partition-id")] + last_assigned_partition_id: i64, + }, /// The table's default spec id must match the requirement. - DefaultSpecIdMatch(i64), + #[serde(rename = "assert-default-spec-id")] + DefaultSpecIdMatch { + #[serde(rename = "default-spec-id")] + default_spec_id: i64, + }, /// The table's default sort order id must match the requirement. - DefaultSortOrderIdMatch(i64), + #[serde(rename = "assert-default-sort-order-id")] + DefaultSortOrderIdMatch { + #[serde(rename = "default-sort-order-id")] + default_sort_order_id: i64, + }, } /// TableUpdate represents an update to a table in the catalog. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[serde(tag = "action", rename_all = "kebab-case")] pub enum TableUpdate { /// Upgrade table's format version + #[serde(rename_all = "kebab-case")] UpgradeFormatVersion { /// Target format upgrade to. format_version: FormatVersion, }, + /// Assign a new UUID to the table + #[serde(rename_all = "kebab-case")] + AssignUuid { + /// The new UUID to assign. + uuid: Uuid, + }, /// Add a new schema to the table + #[serde(rename_all = "kebab-case")] AddSchema { schema: Schema, - last_column_id: i32, + last_column_id: Option, }, /// Set table's current schema - SetCurrentSchema { - schema_id: i32 - }, + #[serde(rename_all = "kebab-case")] + SetCurrentSchema { schema_id: i32 }, /// Add a new partition spec to the table - AddPartitionSpec { - spec: PartitionSpec, - }, + AddSpec { spec: PartitionSpec }, /// Set table's default spec - SetDefaultSpec { - spec_id: i32, - }, + #[serde(rename_all = "kebab-case")] + SetDefaultSpec { spec_id: i32 }, /// Add sort order to table. - AddSortOrder { - sort_order: SortOrder, - }, + #[serde(rename_all = "kebab-case")] + AddSortOrder { sort_order: SortOrder }, /// Set table's default sort order - SetDefaultSortOrder { - sort_order_id: i32 - }, + #[serde(rename_all = "kebab-case")] + SetDefaultSortOrder { sort_order_id: i32 }, /// Add snapshot to table. - AddSnapshot { - snapshot: Snapshot, - }, + #[serde(rename_all = "kebab-case")] + AddSnapshot { snapshot: Snapshot }, /// Set table's snapshot ref. + #[serde(rename_all = "kebab-case")] SetSnapshotRef { ref_name: String, + #[serde(flatten)] reference: SnapshotReference, }, /// Remove table's snapshots - RemoveSnapshots { - snapshot_ids: Vec, - }, + #[serde(rename_all = "kebab-case")] + RemoveSnapshots { snapshot_ids: Vec }, /// Remove snapshot reference - RemoveSnapshotRef { - ref_name: String, - }, + #[serde(rename_all = "kebab-case")] + RemoveSnapshotRef { ref_name: String }, /// Update table's location - SetLocation { - location: String, - }, + SetLocation { location: String }, /// Update table's properties - SetProperties { - updates: HashMap, - }, + SetProperties { updates: HashMap }, /// Remove table's properties - RemoveProperties { - removals: Vec, - }, + RemoveProperties { removals: Vec }, } #[cfg(test)] mod tests { - use crate::{NamespaceIdent, TableIdent}; + use crate::spec::ManifestListLocation::ManifestListFile; + use crate::spec::{ + FormatVersion, NestedField, NullOrder, Operation, PartitionField, PartitionSpec, + PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, + SortField, SortOrder, Summary, Transform, Type, + }; + use crate::{NamespaceIdent, TableIdent, TableRequirement, TableUpdate}; + use serde::de::DeserializeOwned; + use serde::Serialize; + use std::collections::HashMap; + use std::fmt::{Debug, Display}; + use uuid::uuid; #[test] fn test_create_table_id() { @@ -371,4 +404,628 @@ mod tests { assert_eq!(table_id, TableIdent::from_strs(vec!["ns1", "t1"]).unwrap()); } + + fn test_serde_json( + json: impl ToString, + expected: T, + ) { + let json_str = json.to_string(); + let actual: T = serde_json::from_str(&json_str).expect("Failed to parse from json"); + assert_eq!(actual, expected, "Parsed value is not equal to expected"); + + let restored: T = serde_json::from_str( + &serde_json::to_string(&actual).expect("Failed to serialize to json"), + ) + .expect("Failed to parse from serialized json"); + + assert_eq!( + restored, expected, + "Parsed restored value is not equal to expected" + ); + } + + #[test] + fn test_table_uuid() { + test_serde_json( + r#" +{ + "type": "assert-table-uuid", + "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151" +} + "#, + TableRequirement::UuidMatch { + uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"), + }, + ); + } + + #[test] + fn test_assert_table_not_exists() { + test_serde_json( + r#" +{ + "type": "assert-create" +} + "#, + TableRequirement::NotExist, + ); + } + + #[test] + fn test_assert_ref_snapshot_id() { + test_serde_json( + r#" +{ + "type": "assert-ref-snapshot-id", + "ref": "snapshot-name", + "snapshot-id": null +} + "#, + TableRequirement::RefSnapshotIdMatch { + r#ref: "snapshot-name".to_string(), + snapshot_id: None, + }, + ); + + test_serde_json( + r#" +{ + "type": "assert-ref-snapshot-id", + "ref": "snapshot-name", + "snapshot-id": 1 +} + "#, + TableRequirement::RefSnapshotIdMatch { + r#ref: "snapshot-name".to_string(), + snapshot_id: Some(1), + }, + ); + } + + #[test] + fn test_assert_last_assigned_field_id() { + test_serde_json( + r#" +{ + "type": "assert-last-assigned-field-id", + "last-assigned-field-id": 12 +} + "#, + TableRequirement::LastAssignedFieldIdMatch { + last_assigned_field_id: 12, + }, + ); + } + + #[test] + fn test_assert_current_schema_id() { + test_serde_json( + r#" +{ + "type": "assert-current-schema-id", + "current-schema-id": 4 +} + "#, + TableRequirement::CurrentSchemaIdMatch { + current_schema_id: 4, + }, + ); + } + + #[test] + fn test_assert_last_assigned_partition_id() { + test_serde_json( + r#" +{ + "type": "assert-last-assigned-partition-id", + "last-assigned-partition-id": 1004 +} + "#, + TableRequirement::LastAssignedPartitionIdMatch { + last_assigned_partition_id: 1004, + }, + ); + } + + #[test] + fn test_assert_default_spec_id() { + test_serde_json( + r#" +{ + "type": "assert-default-spec-id", + "default-spec-id": 5 +} + "#, + TableRequirement::DefaultSpecIdMatch { default_spec_id: 5 }, + ); + } + + #[test] + fn test_assert_default_sort_order() { + let json = r#" +{ + "type": "assert-default-sort-order-id", + "default-sort-order-id": 10 +} + "#; + + let update = TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id: 10, + }; + + test_serde_json(json, update); + } + + #[test] + fn test_parse_assert_invalid() { + assert!( + serde_json::from_str::( + r#" +{ + "default-sort-order-id": 10 +} +"# + ) + .is_err(), + "Table requirements should not be parsed without type." + ); + } + + #[test] + fn test_assign_uuid() { + test_serde_json( + r#" +{ + "action": "assign-uuid", + "uuid": "2cc52516-5e73-41f2-b139-545d41a4e151" +} + "#, + TableUpdate::AssignUuid { + uuid: uuid!("2cc52516-5e73-41f2-b139-545d41a4e151"), + }, + ); + } + + #[test] + fn test_upgrade_format_version() { + test_serde_json( + r#" +{ + "action": "upgrade-format-version", + "format-version": 2 +} + "#, + TableUpdate::UpgradeFormatVersion { + format_version: FormatVersion::V2, + }, + ); + } + + #[test] + fn test_add_schema() { + let test_schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![2]) + .with_fields(vec![ + NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + test_serde_json( + r#" +{ + "action": "add-schema", + "schema": { + "type": "struct", + "schema-id": 1, + "fields": [ + { + "id": 1, + "name": "foo", + "required": false, + "type": "string" + }, + { + "id": 2, + "name": "bar", + "required": true, + "type": "int" + }, + { + "id": 3, + "name": "baz", + "required": false, + "type": "boolean" + } + ], + "identifier-field-ids": [ + 2 + ] + }, + "last-column-id": 3 +} + "#, + TableUpdate::AddSchema { + schema: test_schema.clone(), + last_column_id: Some(3), + }, + ); + + test_serde_json( + r#" +{ + "action": "add-schema", + "schema": { + "type": "struct", + "schema-id": 1, + "fields": [ + { + "id": 1, + "name": "foo", + "required": false, + "type": "string" + }, + { + "id": 2, + "name": "bar", + "required": true, + "type": "int" + }, + { + "id": 3, + "name": "baz", + "required": false, + "type": "boolean" + } + ], + "identifier-field-ids": [ + 2 + ] + } +} + "#, + TableUpdate::AddSchema { + schema: test_schema.clone(), + last_column_id: None, + }, + ); + } + + #[test] + fn test_set_current_schema() { + test_serde_json( + r#" +{ + "action": "set-current-schema", + "schema-id": 23 +} + "#, + TableUpdate::SetCurrentSchema { schema_id: 23 }, + ); + } + + #[test] + fn test_add_spec() { + test_serde_json( + r#" +{ + "action": "add-spec", + "spec": { + "spec-id": 1, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + }, + { + "source-id": 1, + "field-id": 1001, + "name": "id_bucket", + "transform": "bucket[16]" + }, + { + "source-id": 2, + "field-id": 1002, + "name": "id_truncate", + "transform": "truncate[4]" + } + ] + } +} + "#, + TableUpdate::AddSpec { + spec: PartitionSpec::builder() + .with_spec_id(1) + .with_partition_field( + PartitionField::builder() + .source_id(4) + .field_id(1000) + .name("ts_day".to_string()) + .transform(Transform::Day) + .build(), + ) + .with_partition_field( + PartitionField::builder() + .source_id(1) + .field_id(1001) + .name("id_bucket".to_string()) + .transform(Transform::Bucket(16)) + .build(), + ) + .with_partition_field( + PartitionField::builder() + .source_id(2) + .field_id(1002) + .name("id_truncate".to_string()) + .transform(Transform::Truncate(4)) + .build(), + ) + .build() + .unwrap(), + }, + ); + } + + #[test] + fn test_set_default_spec() { + test_serde_json( + r#" +{ + "action": "set-default-spec", + "spec-id": 1 +} + "#, + TableUpdate::SetDefaultSpec { spec_id: 1 }, + ) + } + + #[test] + fn test_add_sort_order() { + let json = r#" +{ + "action": "add-sort-order", + "sort-order": { + "order-id": 1, + "fields": [ + { + "transform": "identity", + "source-id": 2, + "direction": "asc", + "null-order": "nulls-first" + }, + { + "transform": "bucket[4]", + "source-id": 3, + "direction": "desc", + "null-order": "nulls-last" + } + ] + } +} + "#; + + let update = TableUpdate::AddSortOrder { + sort_order: SortOrder::builder() + .with_order_id(1) + .with_sort_field( + SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(), + ) + .with_sort_field( + SortField::builder() + .source_id(3) + .direction(SortDirection::Descending) + .null_order(NullOrder::Last) + .transform(Transform::Bucket(4)) + .build(), + ) + .build() + .unwrap(), + }; + + test_serde_json(json, update); + } + + #[test] + fn test_set_default_order() { + let json = r#" +{ + "action": "set-default-sort-order", + "sort-order-id": 2 +} + "#; + let update = TableUpdate::SetDefaultSortOrder { sort_order_id: 2 }; + + test_serde_json(json, update); + } + + #[test] + fn test_add_snapshot() { + let json = r#" +{ + "action": "add-snapshot", + "snapshot": { + "snapshot-id": 3055729675574597000, + "parent-snapshot-id": 3051729675574597000, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 1 + } +} + "#; + + let update = TableUpdate::AddSnapshot { + snapshot: Snapshot::builder() + .with_snapshot_id(3055729675574597000) + .with_parent_snapshot_id(Some(3051729675574597000)) + .with_timestamp_ms(1555100955770) + .with_sequence_number(1) + .with_manifest_list(ManifestListFile("s3://a/b/2.avro".to_string())) + .with_schema_id(1) + .with_summary(Summary { + operation: Operation::Append, + other: HashMap::default(), + }) + .build() + .unwrap(), + }; + + test_serde_json(json, update); + } + + #[test] + fn test_remove_snapshots() { + let json = r#" +{ + "action": "remove-snapshots", + "snapshot-ids": [ + 1, + 2 + ] +} + "#; + + let update = TableUpdate::RemoveSnapshots { + snapshot_ids: vec![1, 2], + }; + test_serde_json(json, update); + } + + #[test] + fn test_remove_snapshot_ref() { + let json = r#" +{ + "action": "remove-snapshot-ref", + "ref-name": "snapshot-ref" +} + "#; + + let update = TableUpdate::RemoveSnapshotRef { + ref_name: "snapshot-ref".to_string(), + }; + test_serde_json(json, update); + } + + #[test] + fn test_set_snapshot_ref_tag() { + let json = r#" +{ + "action": "set-snapshot-ref", + "type": "tag", + "ref-name": "hank", + "snapshot-id": 1, + "max-ref-age-ms": 1 +} + "#; + + let update = TableUpdate::SetSnapshotRef { + ref_name: "hank".to_string(), + reference: SnapshotReference { + snapshot_id: 1, + retention: SnapshotRetention::Tag { max_ref_age_ms: 1 }, + }, + }; + + test_serde_json(json, update); + } + + #[test] + fn test_set_snapshot_ref_branch() { + let json = r#" +{ + "action": "set-snapshot-ref", + "type": "branch", + "ref-name": "hank", + "snapshot-id": 1, + "min-snapshots-to-keep": 2, + "max-snapshot-age-ms": 3, + "max-ref-age-ms": 4 +} + "#; + + let update = TableUpdate::SetSnapshotRef { + ref_name: "hank".to_string(), + reference: SnapshotReference { + snapshot_id: 1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: Some(2), + max_snapshot_age_ms: Some(3), + max_ref_age_ms: Some(4), + }, + }, + }; + + test_serde_json(json, update); + } + + #[test] + fn test_set_properties() { + let json = r#" +{ + "action": "set-properties", + "updates": { + "prop1": "v1", + "prop2": "v2" + } +} + "#; + + let update = TableUpdate::SetProperties { + updates: vec![ + ("prop1".to_string(), "v1".to_string()), + ("prop2".to_string(), "v2".to_string()), + ] + .into_iter() + .collect(), + }; + + test_serde_json(json, update); + } + + #[test] + fn test_remove_properties() { + let json = r#" +{ + "action": "remove-properties", + "removals": [ + "prop1", + "prop2" + ] +} + "#; + + let update = TableUpdate::RemoveProperties { + removals: vec!["prop1".to_string(), "prop2".to_string()], + }; + + test_serde_json(json, update); + } + + #[test] + fn test_set_location() { + let json = r#" +{ + "action": "set-location", + "location": "s3://bucket/warehouse/tbl_location" +} + "#; + + let update = TableUpdate::SetLocation { + location: "s3://bucket/warehouse/tbl_location".to_string(), + }; + + test_serde_json(json, update); + } } diff --git a/crates/iceberg/src/rest.rs b/crates/iceberg/src/rest.rs deleted file mode 100644 index 31d4da8b67..0000000000 --- a/crates/iceberg/src/rest.rs +++ /dev/null @@ -1,23 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This module provide rest catalog implementation. - -#[derive(Debug)] -pub struct RestCatalog { - url: String, -} diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 3679445e62..d690c2d22d 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -37,7 +37,7 @@ pub type SchemaRef = Arc; const DEFAULT_SCHEMA_ID: i32 = 0; /// Defines schema in iceberg. -#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(try_from = "SchemaEnum", into = "SchemaEnum")] pub struct Schema { r#struct: StructType, @@ -53,6 +53,16 @@ pub struct Schema { lower_case_name_to_id: OnceLock>, } +impl PartialEq for Schema { + fn eq(&self, other: &Self) -> bool { + self.r#struct == other.r#struct + && self.schema_id == other.schema_id + && self.identifier_field_ids == other.identifier_field_ids + } +} + +impl Eq for Schema {} + /// Schema builder. #[derive(Debug)] pub struct SchemaBuilder { diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 8b2204b929..2c482b032d 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -22,9 +22,9 @@ The main struct here is [TableMetadataV2] which defines the data for a table. use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; -use std::{collections::HashMap, sync::Arc}; use std::cmp::Ordering; use std::fmt::{Display, Formatter}; +use std::{collections::HashMap, sync::Arc}; use uuid::Uuid; use super::{ @@ -141,7 +141,7 @@ impl TableMetadata { /// Returns schemas #[inline] - pub fn schemas_iter(&self) -> impl Iterator { + pub fn schemas_iter(&self) -> impl Iterator { self.schemas.values() } @@ -160,7 +160,7 @@ impl TableMetadata { /// Returns all partition specs. #[inline] - pub fn partition_specs_iter(&self) -> impl Iterator { + pub fn partition_specs_iter(&self) -> impl Iterator { self.partition_specs.values() } @@ -185,7 +185,7 @@ impl TableMetadata { /// Returns all snapshots #[inline] - pub fn snapshots(&self) -> impl Iterator { + pub fn snapshots(&self) -> impl Iterator { self.snapshots.values() } @@ -212,7 +212,7 @@ impl TableMetadata { /// Return all sort orders. #[inline] - pub fn sort_orders_iter(&self) -> impl Iterator { + pub fn sort_orders_iter(&self) -> impl Iterator { self.sort_orders.values() } @@ -375,8 +375,8 @@ pub(super) mod _serde { impl Serialize for VersionNumber { fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, + where + S: serde::Serializer, { serializer.serialize_u8(V) } @@ -384,8 +384,8 @@ pub(super) mod _serde { impl<'de, const V: u8> Deserialize<'de> for VersionNumber { fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, + where + D: serde::Deserializer<'de>, { let value = u8::deserialize(deserializer)?; if value == V { @@ -748,9 +748,9 @@ pub(super) mod _serde { /// Iceberg format version pub enum FormatVersion { /// Iceberg spec version 1 - V1 = b'1', + V1 = 1u8, /// Iceberg spec version 2 - V2 = b'2', + V2 = 2u8, } impl PartialOrd for FormatVersion { @@ -1457,7 +1457,7 @@ mod tests { let metadata = fs::read_to_string( "testdata/table_metadata/TableMetadataV2MissingLastPartitionId.json", ) - .unwrap(); + .unwrap(); let desered: Result = serde_json::from_str(&metadata); diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 05d5d1c07c..545c95d9a9 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -130,17 +130,18 @@ impl<'a> ReplaceSortOrderAction<'a> { ]; let requirements = vec![ - TableRequirement::CurrentSchemaIdMatch( - self.tx.table.metadata().current_schema().schema_id() as i64, - ), - TableRequirement::DefaultSortOrderIdMatch( - self.tx + TableRequirement::CurrentSchemaIdMatch { + current_schema_id: self.tx.table.metadata().current_schema().schema_id() as i64, + }, + TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id: self + .tx .table .metadata() .default_sort_order() .unwrap() .order_id, - ), + }, ]; self.tx.append_requirements(requirements)?; From b0fe15db276eea9fa9481019034a13bd57f11151 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Fri, 17 Nov 2023 16:27:54 +0800 Subject: [PATCH 05/10] Add some comments --- crates/iceberg/src/catalog/mod.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index da276bac9f..7e5cfff9da 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -344,18 +344,27 @@ pub enum TableUpdate { }, /// Set table's current schema #[serde(rename_all = "kebab-case")] - SetCurrentSchema { schema_id: i32 }, + SetCurrentSchema { + /// Schema ID to set as current, or -1 to set last added schema + schema_id: i32, + }, /// Add a new partition spec to the table AddSpec { spec: PartitionSpec }, /// Set table's default spec #[serde(rename_all = "kebab-case")] - SetDefaultSpec { spec_id: i32 }, + SetDefaultSpec { + /// Partition spec ID to set as the default, or -1 to set last added spec + spec_id: i32, + }, /// Add sort order to table. #[serde(rename_all = "kebab-case")] AddSortOrder { sort_order: SortOrder }, /// Set table's default sort order #[serde(rename_all = "kebab-case")] - SetDefaultSortOrder { sort_order_id: i32 }, + SetDefaultSortOrder { + /// Sort order ID to set as the default, or -1 to set last added sort order + sort_order_id: i32, + }, /// Add snapshot to table. #[serde(rename_all = "kebab-case")] AddSnapshot { snapshot: Snapshot }, From 8624626b3b238e2615316220b21eaaf882d60b66 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Mon, 20 Nov 2023 11:43:05 +0800 Subject: [PATCH 06/10] Add tests for transaction --- crates/iceberg/src/transaction.rs | 178 ++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 545c95d9a9..93ec8ee294 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -23,6 +23,7 @@ use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; use std::collections::HashMap; +use std::mem::discriminant; /// Table transaction. pub struct Transaction<'a> { @@ -42,6 +43,19 @@ impl<'a> Transaction<'a> { } fn append_updates(&mut self, updates: Vec) -> Result<()> { + for update in &updates { + for up in &self.updates { + if discriminant(up) == discriminant(update) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot apply update with same type at same time: {:?}", + update + ), + )); + } + } + } self.updates.extend(updates); Ok(()) } @@ -179,3 +193,167 @@ impl<'a> ReplaceSortOrderAction<'a> { Ok(self) } } + +#[cfg(test)] +mod tests { + use crate::io::FileIO; + use crate::spec::{FormatVersion, TableMetadata}; + use crate::table::Table; + use crate::transaction::Transaction; + use crate::{TableIdent, TableRequirement, TableUpdate}; + use std::collections::HashMap; + use std::fs::{metadata, File}; + use std::io::BufReader; + + fn make_v1_table() -> Table { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV1Valid.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) + .build() + } + + fn make_v2_table() -> Table { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2Valid.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIO::from_path("/tmp").unwrap().build().unwrap()) + .build() + } + + #[test] + fn test_upgrade_table_version_v1_to_v2() { + let table = make_v1_table(); + let tx = Transaction::new(&table); + let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); + + assert_eq!( + vec![TableUpdate::UpgradeFormatVersion { + format_version: FormatVersion::V2 + }], + tx.updates + ); + } + + #[test] + fn test_upgrade_table_version_v2_to_v2() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); + + assert!( + tx.updates.is_empty(), + "Upgrade table to same version should not generate any updates" + ); + assert!( + tx.requirements.is_empty(), + "Upgrade table to same version should not generate any requirements" + ); + } + + #[test] + fn test_downgrade_table_version() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx.upgrade_table_version(FormatVersion::V1); + + assert!(tx.is_err(), "Downgrade table version should fail!"); + } + + #[test] + fn test_set_table_property() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx + .set_properties(HashMap::from([("a".to_string(), "b".to_string())])) + .unwrap(); + + assert_eq!( + vec![TableUpdate::SetProperties { + updates: HashMap::from([("a".to_string(), "b".to_string())]) + }], + tx.updates + ); + } + + #[test] + fn test_remove_property() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx + .remove_properties(vec!["a".to_string(), "b".to_string()]) + .unwrap(); + + assert_eq!( + vec![TableUpdate::RemoveProperties { + removals: vec!["a".to_string(), "b".to_string()] + }], + tx.updates + ); + } + + #[test] + fn test_replace_sort_order() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx.replace_sort_order().apply().unwrap(); + + assert_eq!( + vec![ + TableUpdate::AddSortOrder { + sort_order: Default::default() + }, + TableUpdate::SetDefaultSortOrder { sort_order_id: -1 } + ], + tx.updates + ); + + assert_eq!( + vec![ + TableRequirement::CurrentSchemaIdMatch { + current_schema_id: 1 + }, + TableRequirement::DefaultSortOrderIdMatch { + default_sort_order_id: 3 + } + ], + tx.requirements + ); + } + + #[test] + fn test_do_same_update_in_same_transaction() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx + .remove_properties(vec!["a".to_string(), "b".to_string()]) + .unwrap(); + + let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]); + + assert!( + tx.is_err(), + "Should not allow to do same kinds update in same transaction" + ); + } +} From 39a7387ab9d5e20132207fd1e2c21b688fa9ba18 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Mon, 20 Nov 2023 11:43:33 +0800 Subject: [PATCH 07/10] Fix format --- crates/iceberg/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 8f7ba3b109..5bde2a3f23 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -44,6 +44,5 @@ mod avro; pub mod io; pub mod spec; -pub mod transform; pub mod transaction; - +pub mod transform; From 4db89a813392fae97d7f8d2c5678bc8fb26913d4 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Mon, 20 Nov 2023 11:55:21 +0800 Subject: [PATCH 08/10] Fix linters --- crates/catalog/rest/src/catalog.rs | 9 ++--- crates/iceberg/src/catalog/mod.rs | 56 ++++++++++++++++++++++++------ crates/iceberg/src/lib.rs | 2 +- crates/iceberg/src/spec/schema.rs | 4 +-- crates/iceberg/src/transaction.rs | 35 +++++++++++-------- 5 files changed, 71 insertions(+), 35 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index cbe96772e6..1805dab680 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -433,7 +433,7 @@ impl Catalog for RestCatalog { let request = self .client .0 - .post(self.config.table_endpoint(&commit.identifier())) + .post(self.config.table_endpoint(commit.identifier())) .json(&CommitTableRequest { identifier: commit.identifier().clone(), requirements: commit.take_requirements(), @@ -501,12 +501,7 @@ impl RestCatalog { props.extend(config); } - let file_io = match self - .config - .warehouse - .as_deref() - .or_else(|| metadata_location) - { + let file_io = match self.config.warehouse.as_deref().or(metadata_location) { Some(url) => FileIO::from_path(url)?.with_props(props).build()?, None => FileIOBuilder::new("s3").with_props(props).build()?, }; diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 7e5cfff9da..ce3c5e7a0e 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -275,7 +275,10 @@ pub enum TableRequirement { NotExist, /// The table UUID must match the requirement. #[serde(rename = "assert-table-uuid")] - UuidMatch { uuid: Uuid }, + UuidMatch { + /// Uuid of original table. + uuid: Uuid, + }, /// The table branch or tag identified by the requirement's `reference` must /// reference the requirement's `snapshot-id`. #[serde(rename = "assert-ref-snapshot-id")] @@ -290,12 +293,14 @@ pub enum TableRequirement { /// The table's last assigned column id must match the requirement. #[serde(rename = "assert-last-assigned-field-id")] LastAssignedFieldIdMatch { + /// The last assigned field id of the table to assert. #[serde(rename = "last-assigned-field-id")] last_assigned_field_id: i64, }, /// The table's current schema id must match the requirement. #[serde(rename = "assert-current-schema-id")] CurrentSchemaIdMatch { + /// Current schema id of the table to assert. #[serde(rename = "current-schema-id")] current_schema_id: i64, }, @@ -303,18 +308,21 @@ pub enum TableRequirement { /// requirement. #[serde(rename = "assert-last-assigned-partition-id")] LastAssignedPartitionIdMatch { + /// Last assigned partition id of the table to assert. #[serde(rename = "last-assigned-partition-id")] last_assigned_partition_id: i64, }, /// The table's default spec id must match the requirement. #[serde(rename = "assert-default-spec-id")] DefaultSpecIdMatch { + /// Default spec id of the table to assert. #[serde(rename = "default-spec-id")] default_spec_id: i64, }, /// The table's default sort order id must match the requirement. #[serde(rename = "assert-default-sort-order-id")] DefaultSortOrderIdMatch { + /// Default sort order id of the table to assert. #[serde(rename = "default-sort-order-id")] default_sort_order_id: i64, }, @@ -339,7 +347,9 @@ pub enum TableUpdate { /// Add a new schema to the table #[serde(rename_all = "kebab-case")] AddSchema { + /// The schema to add. schema: Schema, + /// The last column id of the table. last_column_id: Option, }, /// Set table's current schema @@ -349,7 +359,10 @@ pub enum TableUpdate { schema_id: i32, }, /// Add a new partition spec to the table - AddSpec { spec: PartitionSpec }, + AddSpec { + /// The partition spec to add. + spec: PartitionSpec, + }, /// Set table's default spec #[serde(rename_all = "kebab-case")] SetDefaultSpec { @@ -358,7 +371,10 @@ pub enum TableUpdate { }, /// Add sort order to table. #[serde(rename_all = "kebab-case")] - AddSortOrder { sort_order: SortOrder }, + AddSortOrder { + /// Sort order to add. + sort_order: SortOrder, + }, /// Set table's default sort order #[serde(rename_all = "kebab-case")] SetDefaultSortOrder { @@ -367,26 +383,46 @@ pub enum TableUpdate { }, /// Add snapshot to table. #[serde(rename_all = "kebab-case")] - AddSnapshot { snapshot: Snapshot }, + AddSnapshot { + /// Snapshot to add. + snapshot: Snapshot, + }, /// Set table's snapshot ref. #[serde(rename_all = "kebab-case")] SetSnapshotRef { + /// Name of snapshot reference to set. ref_name: String, + /// Snapshot reference to set. #[serde(flatten)] reference: SnapshotReference, }, /// Remove table's snapshots #[serde(rename_all = "kebab-case")] - RemoveSnapshots { snapshot_ids: Vec }, + RemoveSnapshots { + /// Snapshot ids to remove. + snapshot_ids: Vec, + }, /// Remove snapshot reference #[serde(rename_all = "kebab-case")] - RemoveSnapshotRef { ref_name: String }, + RemoveSnapshotRef { + /// Name of snapshot reference to remove. + ref_name: String, + }, /// Update table's location - SetLocation { location: String }, + SetLocation { + /// New location for table. + location: String, + }, /// Update table's properties - SetProperties { updates: HashMap }, + SetProperties { + /// Properties to update for table. + updates: HashMap, + }, /// Remove table's properties - RemoveProperties { removals: Vec }, + RemoveProperties { + /// Properties to remove + removals: Vec, + }, } #[cfg(test)] @@ -401,7 +437,7 @@ mod tests { use serde::de::DeserializeOwned; use serde::Serialize; use std::collections::HashMap; - use std::fmt::{Debug, Display}; + use std::fmt::Debug; use uuid::uuid; #[test] diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 5bde2a3f23..378164838e 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -17,7 +17,7 @@ //! Native Rust implementation of Apache Iceberg -// #![deny(missing_docs)] +#![deny(missing_docs)] #[macro_use] extern crate derive_builder; diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index d690c2d22d..3aa1f8b5b4 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -28,7 +28,7 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; -use std::sync::{Arc, OnceLock}; +use std::sync::Arc; use _serde::SchemaEnum; @@ -50,7 +50,6 @@ pub struct Schema { name_to_id: HashMap, id_to_name: HashMap, - lower_case_name_to_id: OnceLock>, } impl PartialEq for Schema { @@ -127,7 +126,6 @@ impl SchemaBuilder { name_to_id, id_to_name, - lower_case_name_to_id: OnceLock::default(), }) } diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 93ec8ee294..4ea89a2977 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -22,6 +22,7 @@ use crate::spec::{FormatVersion, NullOrder, SortDirection, SortField, SortOrder, use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; +use std::cmp::Ordering; use std::collections::HashMap; use std::mem::discriminant; @@ -68,16 +69,22 @@ impl<'a> Transaction<'a> { /// Sets table to a new version. pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { let current_version = self.table.metadata().format_version(); - if current_version > format_version { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot downgrade table version from {} to {}", - current_version, format_version - ), - )); - } else if current_version < format_version { - self.append_updates(vec![UpgradeFormatVersion { format_version }])?; + match current_version.cmp(&format_version) { + Ordering::Greater => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot downgrade table version from {} to {}", + current_version, format_version + ), + )); + } + Ordering::Less => { + self.append_updates(vec![UpgradeFormatVersion { format_version }])?; + } + Ordering::Equal => { + // Do nothing. + } } Ok(self) } @@ -89,7 +96,7 @@ impl<'a> Transaction<'a> { } /// Creates replace sort order action. - pub fn replace_sort_order(mut self) -> ReplaceSortOrderAction<'a> { + pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> { ReplaceSortOrderAction { tx: self, sort_fields: vec![], @@ -122,12 +129,12 @@ pub struct ReplaceSortOrderAction<'a> { impl<'a> ReplaceSortOrderAction<'a> { /// Adds a field for sorting in ascending order. - pub fn asc(mut self, name: &str, null_order: NullOrder) -> Result { + pub fn asc(self, name: &str, null_order: NullOrder) -> Result { self.add_sort_field(name, SortDirection::Ascending, null_order) } /// Adds a field for sorting in descending order. - pub fn desc(mut self, name: &str, null_order: NullOrder) -> Result { + pub fn desc(self, name: &str, null_order: NullOrder) -> Result { self.add_sort_field(name, SortDirection::Descending, null_order) } @@ -202,7 +209,7 @@ mod tests { use crate::transaction::Transaction; use crate::{TableIdent, TableRequirement, TableUpdate}; use std::collections::HashMap; - use std::fs::{metadata, File}; + use std::fs::File; use std::io::BufReader; fn make_v1_table() -> Table { From 733b339eba44c371ccc48069aab637e5a688ad17 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Mon, 20 Nov 2023 12:27:12 +0800 Subject: [PATCH 09/10] Stop upgrading to uuid 1.6.0 --- Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index dab988441c..5aaef032ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,4 +55,5 @@ tokio = { version = "1", features = ["macros"] } typed-builder = "^0.18" url = "2" urlencoding = "2" -uuid = "1.5.0" +# We pin uuid's version to 1.5.0 because this bug: https://github.com/uuid-rs/uuid/issues/720 +uuid = "~1.5.0" From 42a5435a35204e512801b6398014a16ea091d659 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 21 Nov 2023 10:22:53 +0800 Subject: [PATCH 10/10] Fix comments --- crates/catalog/rest/src/catalog.rs | 13 +++++++++---- crates/iceberg/src/spec/table_metadata.rs | 6 ++---- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 1805dab680..59ccca0541 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -29,7 +29,7 @@ use urlencoding::encode; use crate::catalog::_serde::{ CommitTableRequest, CommitTableResponse, CreateTableRequest, LoadTableResponse, }; -use iceberg::io::{FileIO, FileIOBuilder}; +use iceberg::io::FileIO; use iceberg::table::Table; use iceberg::Result; use iceberg::{ @@ -326,7 +326,7 @@ impl Catalog for RestCatalog { partition_spec: creation.partition_spec, write_order: creation.sort_order, // We don't support stage create yet. - stage_create: false, + stage_create: None, properties: if creation.properties.is_empty() { None } else { @@ -503,7 +503,12 @@ impl RestCatalog { let file_io = match self.config.warehouse.as_deref().or(metadata_location) { Some(url) => FileIO::from_path(url)?.with_props(props).build()?, - None => FileIOBuilder::new("s3").with_props(props).build()?, + None => { + return Err(Error::new( + ErrorKind::Unexpected, + "Unable to load file io, neither warehouse nor metadata location is set!", + ))? + } }; Ok(file_io) @@ -657,7 +662,7 @@ mod _serde { pub(super) schema: Schema, pub(super) partition_spec: Option, pub(super) write_order: Option, - pub(super) stage_create: bool, + pub(super) stage_create: Option, pub(super) properties: Option>, } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 2c482b032d..a28a4f117b 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -/*! -Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata). -The main struct here is [TableMetadataV2] which defines the data for a table. - */ +//! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata). +//! The main struct here is [TableMetadataV2] which defines the data for a table. use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr};