From 94cf7d874b0db329d49c2416396cdbb920d5e0bd Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Tue, 3 Sep 2024 01:08:48 +0800 Subject: [PATCH 01/15] suppport remove all for file io --- crates/iceberg/src/io/file_io.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 9af398270c..d98ab5780d 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -66,6 +66,12 @@ impl FileIO { let (op, relative_path) = self.inner.create_operator(&path)?; Ok(op.delete(relative_path).await?) } + + /// Remove all + pub async fn remove_all(&self, path: impl AsRef) -> Result<()> { + let (op, relative_path) = self.inner.create_operator(&path)?; + Ok(op.remove_all(relative_path).await?) + } /// Check file exists. pub async fn is_exist(&self, path: impl AsRef) -> Result { From e3e0cd1ac16d6a97a0406e5ffe04e6efd9ca819d Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 5 Sep 2024 22:12:34 +0800 Subject: [PATCH 02/15] resolve conflict --- crates/iceberg/src/arrow/reader.rs | 16 +++++----- crates/iceberg/src/scan.rs | 44 ++++++++++++++-------------- crates/iceberg/src/spec/datatypes.rs | 2 +- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 5929455449..c1ed1aaf27 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -32,6 +32,7 @@ use fnv::FnvHashSet; use futures::channel::mpsc::{channel, Sender}; use futures::future::BoxFuture; use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; +use itertools::Itertools; use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter}; use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; @@ -270,35 +271,34 @@ impl ArrowReader { let fields = arrow_schema.fields(); let iceberg_schema = arrow_schema_to_schema(arrow_schema)?; - fields.filter_leaves(|idx, field| { + fields.into_iter().enumerate().for_each(|(idx, field)| { let field_id = field.metadata().get(PARQUET_FIELD_ID_META_KEY); if field_id.is_none() { - return false; + return; } let field_id = i32::from_str(field_id.unwrap()); if field_id.is_err() { - return false; + return; } let field_id = field_id.unwrap(); if !field_ids.contains(&field_id) { - return false; + return; } let iceberg_field = iceberg_schema_of_task.field_by_id(field_id); let parquet_iceberg_field = iceberg_schema.field_by_id(field_id); if iceberg_field.is_none() || parquet_iceberg_field.is_none() { - return false; + return; } if iceberg_field.unwrap().field_type != parquet_iceberg_field.unwrap().field_type { - return false; + return; } column_map.insert(field_id, idx); - true }); if column_map.len() != field_ids.len() { @@ -322,7 +322,7 @@ impl ArrowReader { )); } } - Ok(ProjectionMask::leaves(parquet_schema, indices)) + Ok(ProjectionMask::roots(parquet_schema, indices)) } } diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index f1cb86ab38..69209b75ec 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -213,28 +213,28 @@ impl<'a> TableScanBuilder<'a> { ) })?; - let field = schema - .as_struct() - .field_by_id(field_id) - .ok_or_else(|| { - Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Column {} is not a direct child of schema but a nested field, which is not supported now. 
Schema: {}", - column_name, schema - ), - ) - })?; - - if !field.field_type.is_primitive() { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!( - "Column {} is not a primitive type. Schema: {}", - column_name, schema - ), - )); - } + // let field = schema + // .as_struct() + // .field_by_id(field_id) + // .ok_or_else(|| { + // Error::new( + // ErrorKind::FeatureUnsupported, + // format!( + // "Column {} is not a direct child of schema but a nested field, which is not supported now. Schema: {}", + // column_name, schema + // ), + // ) + // })?; + + // if !field.field_type.is_primitive() { + // return Err(Error::new( + // ErrorKind::FeatureUnsupported, + // format!( + // "Column {} is not a primitive type. Schema: {}", + // column_name, schema + // ), + // )); + // } field_ids.push(field_id); } diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index d382459600..ec9a6c07d0 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -36,7 +36,7 @@ use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH}; use crate::spec::PrimitiveLiteral; /// Field name for list type. -pub(crate) const LIST_FILED_NAME: &str = "element"; +pub(crate) const LIST_FILED_NAME: &str = "item"; pub(crate) const MAP_KEY_FIELD_NAME: &str = "key"; pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value"; From 9c97cb6a818ed4f5213ee07b8755890fcf0bb1b9 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 12 Sep 2024 14:17:53 +0800 Subject: [PATCH 03/15] reorder record batch --- crates/iceberg/src/arrow/reader.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c1ed1aaf27..5ddb7ec763 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -187,7 +187,8 @@ impl ArrowReader { // Create a projection mask for the batch stream to select which columns in the // Parquet file that we want in the response - let projection_mask = Self::get_arrow_projection_mask( + // Since Parquet projection mask will lose the order of the columns, we need to reorder. + let (projection_mask, reorder) = Self::get_arrow_projection_mask( &task.project_field_ids, &task.schema, record_batch_stream_builder.parquet_schema(), @@ -235,6 +236,11 @@ impl ArrowReader { // to the requester. let mut record_batch_stream = record_batch_stream_builder.build()?; while let Some(batch) = record_batch_stream.try_next().await? { + let batch = if let Some(reorder) = reorder.as_ref() { + batch.project(&reorder).expect("must be able to reorder") + } else { + batch + }; tx.send(Ok(batch)).await? } @@ -262,9 +268,9 @@ impl ArrowReader { iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, arrow_schema: &ArrowSchemaRef, - ) -> Result { + ) -> Result<(ProjectionMask, Option>)> { if field_ids.is_empty() { - Ok(ProjectionMask::all()) + Ok((ProjectionMask::all(), None)) } else { // Build the map between field id and column index in Parquet schema. 
let mut column_map = HashMap::new(); @@ -322,7 +328,17 @@ impl ArrowReader { )); } } - Ok(ProjectionMask::roots(parquet_schema, indices)) + + // projection mask is order by indices + let mut mask_indices = indices.clone(); + mask_indices.sort_by_key(|&x| x); + // try to reorder the mask_indices to indices + let reorder = indices + .iter() + .map(|idx| mask_indices.iter().position(|&i| i == *idx).unwrap()) + .collect::>(); + + Ok((ProjectionMask::roots(parquet_schema, indices), Some(reorder))) } } From 05f7491c7118072dd0d6fc81b1c8e2522de6734d Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Tue, 10 Sep 2024 11:30:24 +0800 Subject: [PATCH 04/15] fix scan --- crates/iceberg/src/scan.rs | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 69209b75ec..0642ef7b3a 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::runtime::spawn; use crate::spec::{ - DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile, - ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef, + DataContentType, DataFileFormat, ManifestEntryRef, ManifestFile, ManifestList, Schema, + SchemaRef, SnapshotRef, TableMetadataRef, }; use crate::table::Table; use crate::utils::available_parallelism; @@ -405,15 +405,6 @@ impl TableScan { return Ok(()); } - // abort the plan if we encounter a manifest entry whose data file's - // content type is currently unsupported - if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Only Data files currently supported", - )); - } - if let Some(ref bound_predicates) = manifest_entry_context.bound_predicates { let BoundPredicates { ref snapshot_bound_predicate, @@ -542,6 +533,8 @@ impl ManifestEntryContext { predicate: self .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), + sequence_number: self.manifest_entry.sequence_number().unwrap_or(0), + equality_ids: self.manifest_entry.data_file().equality_ids().to_vec(), } } } @@ -580,10 +573,7 @@ impl PlanContext { manifest_list: Arc, sender: Sender, ) -> Result>>> { - let filtered_entries = manifest_list - .entries() - .iter() - .filter(|manifest_file| manifest_file.content == ManifestContentType::Data); + let filtered_entries = manifest_list.entries().iter(); // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; @@ -883,6 +873,10 @@ pub struct FileScanTask { /// The predicate to filter. #[serde(skip_serializing_if = "Option::is_none")] pub predicate: Option, + /// The `sequence_number` of the task. + pub sequence_number: i64, + /// The `equality_ids` of the task. 
+ pub equality_ids: Vec, } #[cfg(test)] From e274e1e45ff4ac5bf68d6804ad19ee7878fd5ba8 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 11 Sep 2024 14:18:04 +0800 Subject: [PATCH 05/15] fix filtered_entries --- crates/iceberg/src/scan.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 0642ef7b3a..fdb65d7ef8 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -573,12 +573,10 @@ impl PlanContext { manifest_list: Arc, sender: Sender, ) -> Result>>> { - let filtered_entries = manifest_list.entries().iter(); - // TODO: Ideally we could ditch this intermediate Vec as we return an iterator. let mut filtered_mfcs = vec![]; if self.predicate.is_some() { - for manifest_file in filtered_entries { + for manifest_file in manifest_list.entries().iter() { let partition_bound_predicate = self.get_partition_filter(manifest_file)?; // evaluate the ManifestFile against the partition filter. Skip @@ -600,7 +598,7 @@ impl PlanContext { } } } else { - for manifest_file in filtered_entries { + for manifest_file in manifest_list.entries().iter() { let mfc = self.create_manifest_file_context(manifest_file, None, sender.clone()); filtered_mfcs.push(Ok(mfc)); } From 2fefaaa105813a56e86fa9c2c9cb12582c5baa14 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Wed, 11 Sep 2024 14:39:05 +0800 Subject: [PATCH 06/15] fix ci ut --- crates/iceberg/src/scan.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index fdb65d7ef8..0243a9775b 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -1582,6 +1582,8 @@ mod tests { schema: schema.clone(), record_count: Some(100), data_file_format: DataFileFormat::Parquet, + sequence_number: 0, + equality_ids: vec![], }; test_fn(task); @@ -1596,6 +1598,8 @@ mod tests { schema, record_count: None, data_file_format: DataFileFormat::Avro, + sequence_number: 0, + equality_ids: vec![], }; test_fn(task); } From bf97b9de51c5c86394e4df7aaff82cc3ed8bc928 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 12 Sep 2024 15:00:28 +0800 Subject: [PATCH 07/15] change list item name back to list element --- crates/iceberg/src/arrow/reader.rs | 1 - crates/iceberg/src/spec/datatypes.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 5ddb7ec763..5328e04914 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -32,7 +32,6 @@ use fnv::FnvHashSet; use futures::channel::mpsc::{channel, Sender}; use futures::future::BoxFuture; use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt}; -use itertools::Itertools; use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter}; use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader}; use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY}; diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index ec9a6c07d0..d382459600 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -36,7 +36,7 @@ use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH}; use crate::spec::PrimitiveLiteral; /// Field name for list type. 
-pub(crate) const LIST_FILED_NAME: &str = "item"; +pub(crate) const LIST_FILED_NAME: &str = "element"; pub(crate) const MAP_KEY_FIELD_NAME: &str = "key"; pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value"; From 82142ea07044203becf72ea6d067d619d0cf46d9 Mon Sep 17 00:00:00 2001 From: xxhZs <1060434431@qq.com> Date: Thu, 12 Sep 2024 19:27:55 +0800 Subject: [PATCH 08/15] fix --- crates/iceberg/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 6166d360d1..7759782b93 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -60,7 +60,7 @@ derive_builder = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } -moka = { version = "0.12.8", features = ["future"] } +moka = { version = "0.12.0", features = ["future"] } murmur3 = { workspace = true } once_cell = { workspace = true } opendal = { workspace = true } From 50adf7a188ae7b9faf07d8f62a9e13b6547bad00 Mon Sep 17 00:00:00 2001 From: Li0k Date: Sat, 14 Sep 2024 23:31:29 +0800 Subject: [PATCH 09/15] feat(iceberg): support sql catalog interface --- crates/catalog/sql/Cargo.toml | 2 + crates/catalog/sql/src/catalog.rs | 899 +++++++++++++++++++++- crates/catalog/sql/src/error.rs | 19 +- crates/iceberg/src/catalog/mod.rs | 2 +- crates/iceberg/src/spec/table_metadata.rs | 12 + 5 files changed, 913 insertions(+), 21 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 4a88e75b4d..a516716501 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -31,8 +31,10 @@ keywords = ["iceberg", "sql", "catalog"] [dependencies] async-trait = { workspace = true } iceberg = { workspace = true } +serde_json = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } typed-builder = { workspace = true } +uuid = { workspace = true, features = ["v4"] } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c6a524cea1..402278a4c1 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -20,15 +20,20 @@ use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, + TableUpdate, }; use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; use sqlx::{Any, AnyPool, Row, Transaction}; use typed_builder::TypedBuilder; +use uuid::Uuid; -use crate::error::{from_sqlx_error, no_such_namespace_err}; +use crate::error::{ + from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err, +}; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; @@ -37,12 +42,15 @@ static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace"; static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location"; static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type"; +static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE"; static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties"; static NAMESPACE_FIELD_NAME: &str = "namespace"; static NAMESPACE_FIELD_PROPERTY_KEY: &str = 
"property_key"; static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value"; +static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location"; + static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if not provided static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning @@ -71,8 +79,8 @@ pub struct SqlCatalogConfig { pub struct SqlCatalog { name: String, connection: AnyPool, - _warehouse_location: String, - _fileio: FileIO, + warehouse_location: String, + fileio: FileIO, sql_bind_style: SqlBindStyle, } @@ -142,8 +150,8 @@ impl SqlCatalog { Ok(SqlCatalog { name: config.name.to_owned(), connection: pool, - _warehouse_location: config.warehouse_location, - _fileio: config.file_io, + warehouse_location: config.warehouse_location, + fileio: config.file_io, sql_bind_style: config.sql_bind_style, }) } @@ -476,51 +484,311 @@ impl Catalog for SqlCatalog { todo!() } - async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { - todo!() + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { + if !self.namespace_exists(namespace).await? { + return no_such_namespace_err(namespace); + } + + let query = format!( + "SELECT {CATALOG_FIELD_TABLE_NAME} FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )", + ); + + let namespace_name = namespace.join("."); + let args: Vec> = vec![Some(&self.name), Some(&namespace_name)]; + let query_result_rows = self.fetch_rows(&query, args).await?; + if query_result_rows.is_empty() { + return Ok(vec![]); + } + + let mut table_idents = Vec::with_capacity(query_result_rows.len()); + for row in query_result_rows { + let table_name = row + .try_get::(CATALOG_FIELD_TABLE_NAME) + .map_err(from_sqlx_error)?; + table_idents.push(TableIdent::new(namespace.clone(), table_name)); + } + + Ok(table_idents) } - async fn table_exists(&self, _identifier: &TableIdent) -> Result { - todo!() + async fn table_exists(&self, identifier: &TableIdent) -> Result { + let namespace = identifier.namespace().join("."); + let table_name = identifier.name(); + let catalog_name = self.name.as_str(); + let query = format!("SELECT 1 FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? AND ({CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' OR {CATALOG_FIELD_RECORD_TYPE} IS NULL) LIMIT 1"); + let args = vec![ + Some(catalog_name), + Some(namespace.as_str()), + Some(table_name), + ]; + + let table_counts = self.fetch_rows(&query, args).await?; + + Ok(!table_counts.is_empty()) } - async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> { - todo!() + async fn drop_table(&self, identifier: &TableIdent) -> Result<()> { + if !self.table_exists(identifier).await? { + return no_such_table_err(identifier); + } + + let namespace = identifier.namespace().join("."); + let table_name = identifier.name(); + let catalog_name = self.name.as_str(); + + let delete = format!( + "DELETE FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? 
+ AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ); + + let args = vec![ + Some(catalog_name), + Some(namespace.as_str()), + Some(table_name), + ]; + + self.execute(&delete, args, None).await?; + + Ok(()) } - async fn load_table(&self, _identifier: &TableIdent) -> Result { - todo!() + async fn load_table(&self, identifier: &TableIdent) -> Result
{ + if !self.table_exists(identifier).await? { + return no_such_table_err(identifier); + } + + let namespace = identifier.namespace().join("."); + let table_name = identifier.name(); + let catalog_name = self.name.as_str(); + + let query = format!( + "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP} FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL) + LIMIT 1 + " + ); + + let args = vec![ + Some(catalog_name), + Some(namespace.as_str()), + Some(table_name), + ]; + + let query_result_rows = self.fetch_rows(&query, args).await?; + + if query_result_rows.is_empty() { + return no_such_table_err(identifier); + } + + let row = query_result_rows.first().unwrap(); + let metadata_location = row + .try_get::(CATALOG_FIELD_METADATA_LOCATION_PROP) + .map_err(from_sqlx_error)?; + + let file = self.fileio.new_input(&metadata_location)?; + let metadata: TableMetadata = serde_json::from_slice(file.read().await?.as_ref())?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .identifier(identifier.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .build()?) } async fn create_table( &self, - _namespace: &NamespaceIdent, - _creation: TableCreation, + namespace: &NamespaceIdent, + creation: TableCreation, ) -> Result
{ - todo!() + if !self.namespace_exists(namespace).await? { + return no_such_namespace_err(namespace); + } + + let identifier = TableIdent::new(namespace.clone(), creation.name.clone()); + if self.table_exists(&identifier).await? { + return table_already_exists_err(&identifier); + } + + let new_table_name = creation.name.clone(); + + // build table location + let table_creation_localtion = match creation.location { + Some(location) => location, + None => { + let namespace_properties = + self.get_namespace(namespace).await?.properties().clone(); + match namespace_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) { + Some(location) => { + format!("{}/{}", location.clone(), new_table_name,) + } + None => { + format!( + "{}/{}/{}", + self.warehouse_location.clone(), + namespace.join("/"), + new_table_name, + ) + } + } + } + }; + + // build table metadata + let table_metadata = TableMetadataBuilder::from_table_creation(TableCreation { + location: Some(table_creation_localtion.clone()), + ..creation + })? + .build()?; + + // serde table to json + let new_table_meta_localtion = metadata_path(&table_creation_localtion, Uuid::new_v4()); + let file = self.fileio.new_output(&new_table_meta_localtion)?; + file.write(serde_json::to_vec(&table_metadata)?.into()) + .await?; + + let insert = format!( + "INSERT INTO {CATALOG_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE}) VALUES (?, ?, ?, ?, ?)" + ); + + let namespace_name = namespace.join("."); + let args: Vec> = vec![ + Some(&self.name), + Some(&namespace_name), + Some(&new_table_name), + Some(&new_table_meta_localtion), + Some(CATALOG_FIELD_TABLE_RECORD_TYPE), + ]; + + self.execute(&insert, args, None).await?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .identifier(identifier) + .metadata_location(new_table_meta_localtion) + .metadata(table_metadata) + .build()?) } async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { todo!() } - async fn update_table(&self, _commit: TableCommit) -> Result
 {
-        todo!()
+    async fn update_table(&self, mut commit: TableCommit) -> Result<Table>
{ + let identifier = commit.identifier().clone(); + if !self.table_exists(&identifier).await? { + return no_such_table_err(&identifier); + } + + // ReplaceSortOrder is currently not supported, so ignore the requirement here. + let _requirements = commit.take_requirements(); + let table_updates = commit.take_updates(); + + let table = self.load_table(&identifier).await?; + let mut update_table_metadata = table.metadata().clone(); + + for table_update in table_updates { + match table_update { + TableUpdate::AddSnapshot { snapshot } => { + update_table_metadata.append_snapshot(snapshot); + } + + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => { + update_table_metadata.update_snapshot_ref(ref_name, reference); + } + + _ => { + unreachable!() + } + } + } + + let new_table_meta_localtion = metadata_path(table.metadata().location(), Uuid::new_v4()); + let file = self.fileio.new_output(&new_table_meta_localtion)?; + file.write(serde_json::to_vec(&update_table_metadata)?.into()) + .await?; + + let update = format!( + "UPDATE {CATALOG_TABLE_NAME} + SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ); + + let namespace_name = identifier.namespace().join("."); + let args: Vec> = vec![ + Some(&new_table_meta_localtion), + Some(&self.name), + Some(&namespace_name), + Some(identifier.name()), + ]; + + self.execute(&update, args, None).await?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .identifier(identifier) + .metadata_location(new_table_meta_localtion) + .metadata(update_table_metadata) + .build()?) 
} } +/// Generate the metadata path for a table +#[inline] +pub fn metadata_path(meta_data_location: &str, uuid: Uuid) -> String { + format!("{}/metadata/0-{}.metadata.json", meta_data_location, uuid) +} + #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; use std::hash::Hash; use iceberg::io::FileIOBuilder; - use iceberg::{Catalog, Namespace, NamespaceIdent}; + use iceberg::spec::{ + NestedField, Operation, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, + SnapshotRetention, SortOrder, Summary, Type, MAIN_BRANCH, + }; + use iceberg::table::Table; + use iceberg::{ + Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableUpdate, + }; + use itertools::Itertools; + use regex::Regex; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; + use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY; use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; + const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; + fn temp_path() -> String { let temp_dir = TempDir::new().unwrap(); temp_dir.path().to_str().unwrap().to_string() @@ -562,6 +830,77 @@ mod tests { } } + fn simple_table_schema() -> Schema { + Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "foo", + Type::Primitive(PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap() + } + + fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) { + assert_eq!(table.identifier(), expected_table_ident); + + let metadata = table.metadata(); + + assert_eq!(metadata.current_schema().as_ref(), expected_schema); + + let expected_partition_spec = PartitionSpec::builder(expected_schema) + .with_spec_id(0) + .build() + .unwrap(); + + assert_eq!( + metadata + .partition_specs_iter() + .map(|p| p.as_ref()) + .collect_vec(), + vec![&expected_partition_spec] + ); + + let expected_sorted_order = SortOrder::builder() + .with_order_id(0) + .with_fields(vec![]) + .build(expected_schema) + .unwrap(); + + assert_eq!( + metadata + .sort_orders_iter() + .map(|s| s.as_ref()) + .collect_vec(), + vec![&expected_sorted_order] + ); + + assert_eq!(metadata.properties(), &HashMap::new()); + + assert!(!table.readonly()); + } + + fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) { + let actual = table.metadata_location().unwrap().to_string(); + let regex = Regex::new(regex_str).unwrap(); + assert!(regex.is_match(&actual)) + } + + async fn create_table(catalog: &C, table_ident: &TableIdent) { + let _ = catalog + .create_table( + &table_ident.namespace, + TableCreation::builder() + .name(table_ident.name().into()) + .schema(simple_table_schema()) + .location(temp_path()) + .build(), + ) + .await + .unwrap(); + } + #[tokio::test] async fn test_initialized() { let warehouse_loc = temp_path(); @@ -934,4 +1273,526 @@ mod tests { .await .unwrap()); } + + #[tokio::test] + async fn test_create_table_with_location() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + 
.await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + assert!(table + .metadata_location() + .unwrap() + .to_string() + .starts_with(&location)) + } + + #[tokio::test] + async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + let mut namespace_properties = HashMap::new(); + let namespace_location = temp_path(); + namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + namespace_location.to_string(), + ); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/tbl1/metadata/0-{}.metadata.json$", + namespace_location, UUID_REGEX_STR, + ); + + let table = catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing( + ) { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + let mut namespace_properties = HashMap::new(); + let namespace_location = temp_path(); + namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + namespace_location.to_string(), + ); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let mut nested_namespace_properties = HashMap::new(); + let nested_namespace_location = temp_path(); + nested_namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + nested_namespace_location.to_string(), + ); + catalog + .create_namespace(&nested_namespace_ident, nested_namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = + TableIdent::new(nested_namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/tbl1/metadata/0-{}.metadata.json$", + nested_namespace_location, UUID_REGEX_STR, + ); + + let table = catalog + .create_table( + &nested_namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + 
assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing( + ) { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + // note: no location specified in namespace_properties + let namespace_properties = HashMap::new(); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/a/tbl1/metadata/0-{}.metadata.json$", + warehouse_loc, UUID_REGEX_STR + ); + + let table = catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing( + ) { + let warehouse_loc = temp_path(); + let namespace_location = warehouse_loc.clone(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_properties = HashMap::new(); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/{}/{}/{}/metadata/0-{}.metadata.json$", + namespace_location, "a", "b", table_name, UUID_REGEX_STR, + ); + + let table = catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_table(&catalog, &table_ident).await; + + let tmp_dir = TempDir::new().unwrap(); + let location = 
tmp_dir.path().to_str().unwrap().to_string(); + + assert_eq!( + catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + .location(location) + .build() + ) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create table {:?}. Table already exists.", + &table_ident + ) + ); + } + + #[tokio::test] + async fn test_drop_table_throws_error_if_table_not_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_namespace(&catalog, &namespace_ident).await; + + let err = catalog + .drop_table(&table_ident) + .await + .unwrap_err() + .to_string(); + assert_eq!( + err, + "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }" + ); + } + + #[tokio::test] + async fn test_drop_table() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_namespace(&catalog, &namespace_ident).await; + + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(); + + let table = catalog.load_table(&table_ident).await.unwrap(); + assert_table_eq(&table, &table_ident, &simple_table_schema()); + + catalog.drop_table(&table_ident).await.unwrap(); + let err = catalog + .load_table(&table_ident) + .await + .unwrap_err() + .to_string(); + assert_eq!( + err, + "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }" + ); + } + + #[tokio::test] + async fn test_update_table_throws_error_if_table_not_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_namespace(&catalog, &namespace_ident).await; + let table_commit = TableCommit::builder() + .ident(table_ident.clone()) + .updates(vec![]) + .requirements(vec![]) + .build(); + let err = catalog + .update_table(table_commit) + .await + .unwrap_err() + .to_string(); + assert_eq!( + err, + "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }" + ); + } + + #[tokio::test] + async fn test_update_table_add_snapshot() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = 
catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + let table_snapshots_iter = table.metadata().snapshots(); + assert_eq!(0, table_snapshots_iter.count()); + + let add_snapshot = Snapshot::builder() + .with_snapshot_id(638933773299822130) + .with_timestamp_ms(1662532818843) + .with_sequence_number(1) + .with_schema_id(1) + .with_manifest_list("/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") + .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) + .build(); + + let table_update = TableUpdate::AddSnapshot { + snapshot: add_snapshot, + }; + let requirements = vec![]; + let table_commit = TableCommit::builder() + .ident(expected_table_ident.clone()) + .updates(vec![table_update]) + .requirements(requirements) + .build(); + let table = catalog.update_table(table_commit).await.unwrap(); + let snapshot_vec = table.metadata().snapshots().collect_vec(); + assert_eq!(1, snapshot_vec.len()); + let snapshot = &snapshot_vec[0]; + assert_eq!(snapshot.snapshot_id(), 638933773299822130); + assert_eq!(snapshot.timestamp_ms(), 1662532818843); + assert_eq!(snapshot.sequence_number(), 1); + assert_eq!(snapshot.schema_id().unwrap(), 1); + assert_eq!(snapshot.manifest_list(), "/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro"); + assert_eq!(snapshot.summary().operation, Operation::Append); + assert_eq!( + snapshot.summary().other, + HashMap::from_iter(vec![ + ( + "spark.app.id".to_string(), + "local-1662532784305".to_string() + ), + ("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()) + ]) + ); + + let table_reload = catalog.load_table(&expected_table_ident).await.unwrap(); + let snapshot_reload_vec = table_reload.metadata().snapshots().collect_vec(); + assert_eq!(1, snapshot_reload_vec.len()); + let snapshot_reload = &snapshot_reload_vec[0]; + assert_eq!(snapshot, snapshot_reload); + } + + #[tokio::test] + async fn test_update_table_set_snapshot_ref() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + let table_snapshots_iter = table.metadata().snapshots(); + assert_eq!(0, table_snapshots_iter.count()); + + let snapshot_id = 638933773299822130; + let reference = SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + 
max_snapshot_age_ms: Some(100), + max_ref_age_ms: Some(200), + }, + }; + let table_update_set_snapshot_ref = TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: reference.clone(), + }; + + let add_snapshot = Snapshot::builder() + .with_snapshot_id(638933773299822130) + .with_timestamp_ms(1662532818843) + .with_sequence_number(1) + .with_schema_id(1) + .with_manifest_list("/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") + .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) + .build(); + let table_update_add_snapshot = TableUpdate::AddSnapshot { + snapshot: add_snapshot, + }; + let table_commit = TableCommit::builder() + .ident(expected_table_ident.clone()) + .updates(vec![table_update_add_snapshot]) + .requirements(vec![]) + .build(); + catalog.update_table(table_commit).await.unwrap(); + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + let snapshot_vec = table.metadata().snapshots().collect_vec(); + assert_eq!(1, snapshot_vec.len()); + let snapshot = &snapshot_vec[0]; + assert_eq!(snapshot.snapshot_id(), 638933773299822130); + assert_eq!(snapshot.timestamp_ms(), 1662532818843); + assert_eq!(snapshot.sequence_number(), 1); + assert_eq!(snapshot.schema_id().unwrap(), 1); + assert_eq!(snapshot.manifest_list(), "/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro"); + assert_eq!(snapshot.summary().operation, Operation::Append); + assert_eq!( + snapshot.summary().other, + HashMap::from_iter(vec![ + ( + "spark.app.id".to_string(), + "local-1662532784305".to_string() + ), + ("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()) + ]) + ); + + let snapshot_refs_map = table.metadata().snapshot_refs(); + assert_eq!(1, snapshot_refs_map.len()); + let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap(); + let basic_snapshot_ref = SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }; + assert_eq!(snapshot_ref, &basic_snapshot_ref); + + let table_commit = TableCommit::builder() + .ident(expected_table_ident.clone()) + .updates(vec![table_update_set_snapshot_ref]) + .requirements(vec![]) + .build(); + catalog.update_table(table_commit).await.unwrap(); + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + let snapshot_refs_map = table.metadata().snapshot_refs(); + assert_eq!(1, snapshot_refs_map.len()); + let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap(); + assert_eq!(snapshot_ref, &reference); + } } diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index cfefcc26a9..0bb8ba0c89 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use iceberg::{Error, ErrorKind, NamespaceIdent, Result}; +use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent}; /// Format an sqlx error into iceberg error. 
pub fn from_sqlx_error(error: sqlx::Error) -> Error { @@ -32,3 +32,20 @@ pub fn no_such_namespace_err(namespace: &NamespaceIdent) -> Result { format!("No such namespace: {:?}", namespace), )) } + +pub fn no_such_table_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such table: {:?}", table_ident), + )) +} + +pub fn table_already_exists_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create table {:?}. Table already exists.", + table_ident + ), + )) +} diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index aa2311b4ad..c3e8b5ef50 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -241,7 +241,7 @@ pub struct TableCreation { /// TableCommit represents the commit of a table in the catalog. #[derive(Debug, TypedBuilder)] -#[builder(build_method(vis = "pub(crate)"))] +#[builder(build_method(vis = "pub"))] pub struct TableCommit { /// The table ident. ident: TableIdent, diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 16deaac222..78e86ea52f 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -278,6 +278,18 @@ impl TableMetadata { self.snapshots .insert(snapshot.snapshot_id(), Arc::new(snapshot)); } + + /// update snapshot ref of table + #[inline] + pub fn update_snapshot_ref(&mut self, ref_name: String, reference: SnapshotReference) { + self.refs.insert(ref_name, reference); + } + + /// Returns snapshot references. + #[inline] + pub fn snapshot_refs(&self) -> &HashMap { + &self.refs + } } /// Manipulating table metadata. From 0f1c7d37e14618984f6b5da26d4b3a879db0b7ef Mon Sep 17 00:00:00 2001 From: Li0k Date: Sat, 14 Sep 2024 23:43:52 +0800 Subject: [PATCH 10/15] typo --- crates/catalog/sql/src/catalog.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 402278a4c1..f18d8d4376 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -630,7 +630,7 @@ impl Catalog for SqlCatalog { let new_table_name = creation.name.clone(); // build table location - let table_creation_localtion = match creation.location { + let table_creation_location = match creation.location { Some(location) => location, None => { let namespace_properties = @@ -653,14 +653,14 @@ impl Catalog for SqlCatalog { // build table metadata let table_metadata = TableMetadataBuilder::from_table_creation(TableCreation { - location: Some(table_creation_localtion.clone()), + location: Some(table_creation_location.clone()), ..creation })? 
.build()?; // serde table to json - let new_table_meta_localtion = metadata_path(&table_creation_localtion, Uuid::new_v4()); - let file = self.fileio.new_output(&new_table_meta_localtion)?; + let new_table_meta_location = metadata_path(&table_creation_location, Uuid::new_v4()); + let file = self.fileio.new_output(&new_table_meta_location)?; file.write(serde_json::to_vec(&table_metadata)?.into()) .await?; @@ -673,7 +673,7 @@ impl Catalog for SqlCatalog { Some(&self.name), Some(&namespace_name), Some(&new_table_name), - Some(&new_table_meta_localtion), + Some(&new_table_meta_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE), ]; @@ -682,7 +682,7 @@ impl Catalog for SqlCatalog { Ok(Table::builder() .file_io(self.fileio.clone()) .identifier(identifier) - .metadata_location(new_table_meta_localtion) + .metadata_location(new_table_meta_location) .metadata(table_metadata) .build()?) } @@ -723,8 +723,8 @@ impl Catalog for SqlCatalog { } } - let new_table_meta_localtion = metadata_path(table.metadata().location(), Uuid::new_v4()); - let file = self.fileio.new_output(&new_table_meta_localtion)?; + let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4()); + let file = self.fileio.new_output(&new_table_meta_location)?; file.write(serde_json::to_vec(&update_table_metadata)?.into()) .await?; @@ -742,7 +742,7 @@ impl Catalog for SqlCatalog { let namespace_name = identifier.namespace().join("."); let args: Vec> = vec![ - Some(&new_table_meta_localtion), + Some(&new_table_meta_location), Some(&self.name), Some(&namespace_name), Some(identifier.name()), @@ -753,7 +753,7 @@ impl Catalog for SqlCatalog { Ok(Table::builder() .file_io(self.fileio.clone()) .identifier(identifier) - .metadata_location(new_table_meta_localtion) + .metadata_location(new_table_meta_location) .metadata(update_table_metadata) .build()?) 
} From 82763f8fbc12be1509bb08a569ceb31b5ee53691 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 19 Sep 2024 18:13:54 +0800 Subject: [PATCH 11/15] chore(cargo): Downgrading sqlx from 0.8.0 to 0.7.3 --- crates/catalog/sql/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index a516716501..8d280ecb95 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -32,7 +32,7 @@ keywords = ["iceberg", "sql", "catalog"] async-trait = { workspace = true } iceberg = { workspace = true } serde_json = { workspace = true } -sqlx = { version = "0.8.1", features = ["any"], default-features = false } +sqlx = { version = "0.7.3", features = ["any"], default-features = false } typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } @@ -40,7 +40,7 @@ uuid = { workspace = true, features = ["v4"] } iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } itertools = { workspace = true } regex = "1.10.5" -sqlx = { version = "0.8.0", features = [ +sqlx = { version = "0.7.3", features = [ "tls-rustls", "runtime-tokio", "any", From 8e6e80a388edb23bbf34957c301dc08a4e8b495c Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 25 Sep 2024 17:14:58 +0800 Subject: [PATCH 12/15] fix(iceberg): remove namespace_exists for sql catalogs --- crates/catalog/sql/src/catalog.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index f18d8d4376..0494658961 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -485,9 +485,9 @@ impl Catalog for SqlCatalog { } async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { - if !self.namespace_exists(namespace).await? { - return no_such_namespace_err(namespace); - } + // if !self.namespace_exists(namespace).await? { + // return no_such_namespace_err(namespace); + // } let query = format!( "SELECT {CATALOG_FIELD_TABLE_NAME} FROM {CATALOG_TABLE_NAME} @@ -618,9 +618,9 @@ impl Catalog for SqlCatalog { namespace: &NamespaceIdent, creation: TableCreation, ) -> Result
{ - if !self.namespace_exists(namespace).await? { - return no_such_namespace_err(namespace); - } + // if !self.namespace_exists(namespace).await? { + // return no_such_namespace_err(namespace); + // } let identifier = TableIdent::new(namespace.clone(), creation.name.clone()); if self.table_exists(&identifier).await? { From ff8ba5a6b785539a9e2390eea28c72bbbb921c50 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 26 Sep 2024 14:56:10 +0800 Subject: [PATCH 13/15] fix(iceberg): fix set snapshot ref --- crates/catalog/sql/src/catalog.rs | 54 +++++++---------------- crates/iceberg/src/spec/table_metadata.rs | 49 +++++++++++++++++--- 2 files changed, 60 insertions(+), 43 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 0494658961..e57189b5bb 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -485,10 +485,6 @@ impl Catalog for SqlCatalog { } async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { - // if !self.namespace_exists(namespace).await? { - // return no_such_namespace_err(namespace); - // } - let query = format!( "SELECT {CATALOG_FIELD_TABLE_NAME} FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_CATALOG_NAME} = ? @@ -618,10 +614,6 @@ impl Catalog for SqlCatalog { namespace: &NamespaceIdent, creation: TableCreation, ) -> Result
{ - // if !self.namespace_exists(namespace).await? { - // return no_such_namespace_err(namespace); - // } - let identifier = TableIdent::new(namespace.clone(), creation.name.clone()); if self.table_exists(&identifier).await? { return table_already_exists_err(&identifier); @@ -702,19 +694,21 @@ impl Catalog for SqlCatalog { let table_updates = commit.take_updates(); let table = self.load_table(&identifier).await?; - let mut update_table_metadata = table.metadata().clone(); + let mut update_table_metadata_builder = TableMetadataBuilder::new(table.metadata().clone()); for table_update in table_updates { match table_update { TableUpdate::AddSnapshot { snapshot } => { - update_table_metadata.append_snapshot(snapshot); + update_table_metadata_builder = + update_table_metadata_builder.append_snapshot(snapshot)?; } TableUpdate::SetSnapshotRef { ref_name, reference, } => { - update_table_metadata.update_snapshot_ref(ref_name, reference); + update_table_metadata_builder = + update_table_metadata_builder.set_snapshot_ref(ref_name, reference)?; } _ => { @@ -725,6 +719,7 @@ impl Catalog for SqlCatalog { let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4()); let file = self.fileio.new_output(&new_table_meta_location)?; + let update_table_metadata = update_table_metadata_builder.build()?; file.write(serde_json::to_vec(&update_table_metadata)?.into()) .await?; @@ -1676,12 +1671,6 @@ mod tests { ("added-files-size".to_string(), "6001".to_string()) ]) ); - - let table_reload = catalog.load_table(&expected_table_ident).await.unwrap(); - let snapshot_reload_vec = table_reload.metadata().snapshots().collect_vec(); - assert_eq!(1, snapshot_reload_vec.len()); - let snapshot_reload = &snapshot_reload_vec[0]; - assert_eq!(snapshot, snapshot_reload); } #[tokio::test] @@ -1738,16 +1727,19 @@ mod tests { .with_manifest_list("/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) .build(); + let table_update_add_snapshot = TableUpdate::AddSnapshot { snapshot: add_snapshot, }; + + let table_update_opers = vec![table_update_add_snapshot, table_update_set_snapshot_ref]; + let table_commit = TableCommit::builder() .ident(expected_table_ident.clone()) - .updates(vec![table_update_add_snapshot]) + .updates(table_update_opers) .requirements(vec![]) .build(); - catalog.update_table(table_commit).await.unwrap(); - let table = catalog.load_table(&expected_table_ident).await.unwrap(); + let table = catalog.update_table(table_commit).await.unwrap(); let snapshot_vec = table.metadata().snapshots().collect_vec(); assert_eq!(1, snapshot_vec.len()); let snapshot = &snapshot_vec[0]; @@ -1773,26 +1765,14 @@ mod tests { let snapshot_refs_map = table.metadata().snapshot_refs(); assert_eq!(1, snapshot_refs_map.len()); let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap(); - let basic_snapshot_ref = SnapshotReference { + let expected_snapshot_ref = SnapshotReference { snapshot_id, retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: Some(100), + max_ref_age_ms: Some(200), }, }; - assert_eq!(snapshot_ref, 
&basic_snapshot_ref); - - let table_commit = TableCommit::builder() - .ident(expected_table_ident.clone()) - .updates(vec![table_update_set_snapshot_ref]) - .requirements(vec![]) - .build(); - catalog.update_table(table_commit).await.unwrap(); - let table = catalog.load_table(&expected_table_ident).await.unwrap(); - let snapshot_refs_map = table.metadata().snapshot_refs(); - assert_eq!(1, snapshot_refs_map.len()); - let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap(); - assert_eq!(snapshot_ref, &reference); + assert_eq!(snapshot_ref, &expected_snapshot_ref); } } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index f5dd8b06ef..3d9bc057c4 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -331,12 +331,6 @@ impl TableMetadata { .insert(snapshot.snapshot_id(), Arc::new(snapshot)); } - /// update snapshot ref of table - #[inline] - pub fn update_snapshot_ref(&mut self, ref_name: String, reference: SnapshotReference) { - self.refs.insert(ref_name, reference); - } - /// Returns snapshot references. #[inline] pub fn snapshot_refs(&self) -> &HashMap { @@ -644,6 +638,49 @@ impl TableMetadataBuilder { Ok(self) } + /// Append snapshot to table + pub fn append_snapshot(mut self, snapshot: Snapshot) -> Result { + self.0.append_snapshot(snapshot); + + Ok(self) + } + + /// Set current snapshot ref of table + pub fn set_snapshot_ref( + mut self, + ref_name: String, + reference: SnapshotReference, + ) -> Result { + if let Some(existing_snapshot_ref) = self.0.refs.get(&ref_name) { + if (*existing_snapshot_ref) == reference { + return Ok(self); + } + } + + let snapshot_id = reference.snapshot_id; + let snapshot = self + .0 + .snapshot_by_id(snapshot_id) + .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Snapshot not found"))?; + if snapshot.snapshot_id() == reference.snapshot_id { + self.0.last_updated_ms = snapshot.timestamp_ms(); + } + + if ref_name == MAIN_BRANCH { + self.0.current_snapshot_id = Some(snapshot_id); + assert_ne!(0, self.0.last_updated_ms); + + self.0.snapshot_log.push(SnapshotLog { + timestamp_ms: self.0.last_updated_ms, + snapshot_id, + }); + } + + self.0.refs.insert(ref_name, reference); + + Ok(self) + } + /// Returns the new table metadata after changes. pub fn build(self) -> Result { Ok(self.0) From e0c8dd258bffcb24ba30a90aedabd553d0af2566 Mon Sep 17 00:00:00 2001 From: Li0k Date: Fri, 27 Sep 2024 13:47:10 +0800 Subject: [PATCH 14/15] feat(iceberg): introduce disable_config_load for storage_s3 --- crates/iceberg/src/io/storage_s3.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs index 60e97ab457..13b2007215 100644 --- a/crates/iceberg/src/io/storage_s3.rs +++ b/crates/iceberg/src/io/storage_s3.rs @@ -58,6 +58,8 @@ pub const S3_ASSUME_ROLE_ARN: &str = "client.assume-role.arn"; pub const S3_ASSUME_ROLE_EXTERNAL_ID: &str = "client.assume-role.external-id"; /// Optional session name used to assume an IAM role. pub const S3_ASSUME_ROLE_SESSION_NAME: &str = "client.assume-role.session-name"; +/// If set to true, the S3 configuration will not be loaded from the default locations. +pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable_config_load"; /// Parse iceberg props to s3 config. 
 pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
@@ -126,6 +128,12 @@ pub(crate) fn s3_config_parse(mut m: HashMap<String, String>) -> Result<S3Config> {
Date: Fri, 27 Sep 2024 17:26:16 +0800
Subject: [PATCH 15/15] chore(cargo): upgrade sqlx from 0.7.3 to 0.7.4

---
 crates/catalog/sql/Cargo.toml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml
index 8d280ecb95..770417ac36 100644
--- a/crates/catalog/sql/Cargo.toml
+++ b/crates/catalog/sql/Cargo.toml
@@ -32,7 +32,7 @@ keywords = ["iceberg", "sql", "catalog"]
 async-trait = { workspace = true }
 iceberg = { workspace = true }
 serde_json = { workspace = true }
-sqlx = { version = "0.7.3", features = ["any"], default-features = false }
+sqlx = { version = "0.7.4", features = ["any"], default-features = false }
 typed-builder = { workspace = true }
 uuid = { workspace = true, features = ["v4"] }

 [dev-dependencies]
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 itertools = { workspace = true }
 regex = "1.10.5"
-sqlx = { version = "0.7.3", features = [
+sqlx = { version = "0.7.4", features = [
 "tls-rustls",
 "runtime-tokio",
 "any",
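
The series adds table create/load/update support to the SQL catalog and snapshot-ref handling to the table metadata builder. As a reader aid, the sketch below strings those pieces together the same way the tests in crates/catalog/sql/src/catalog.rs do: create a table through SqlCatalog, then commit an AddSnapshot plus a SetSnapshotRef in a single TableCommit. It is illustrative only and not part of the patches: the crate import path iceberg_catalog_sql, the SqlCatalogConfig builder fields (notably uri), the SqlBindStyle::QMark variant, the sqlite URI, and FileIOBuilder::new_fs_io are assumptions inferred from context rather than taken verbatim from the diffs above.

// Hedged sketch of the commit flow introduced by this series; names marked as
// assumptions in the note above are not confirmed by the patches themselves.
use std::collections::HashMap;

use iceberg::io::FileIOBuilder;
use iceberg::spec::{
    NestedField, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference,
    SnapshotRetention, Summary, Type, MAIN_BRANCH,
};
use iceberg::{Catalog, NamespaceIdent, TableCommit, TableCreation, TableUpdate};
use iceberg_catalog_sql::{SqlBindStyle, SqlCatalog, SqlCatalogConfig};

async fn demo() -> iceberg::Result<()> {
    // Assumed: an in-memory sqlite catalog database and a local warehouse path.
    let config = SqlCatalogConfig::builder()
        .uri("sqlite::memory:".to_string())
        .name("demo".to_string())
        .warehouse_location("/tmp/warehouse".to_string())
        .file_io(FileIOBuilder::new_fs_io().build()?)
        .sql_bind_style(SqlBindStyle::QMark)
        .build();
    let catalog = SqlCatalog::new(config).await?;

    // Namespace and table creation, mirroring the tests added in PATCH 09.
    let ns = NamespaceIdent::new("a".into());
    catalog.create_namespace(&ns, HashMap::new()).await?;

    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
        ])
        .build()?;
    let table = catalog
        .create_table(
            &ns,
            TableCreation::builder().name("tbl1".into()).schema(schema).build(),
        )
        .await?;

    // Commit a new snapshot and point the main branch at it in one TableCommit,
    // which exercises the AddSnapshot / SetSnapshotRef handling added to
    // update_table and TableMetadataBuilder in PATCH 09 and PATCH 13.
    let snapshot = Snapshot::builder()
        .with_snapshot_id(1)
        .with_timestamp_ms(1_662_532_818_843)
        .with_sequence_number(1)
        .with_schema_id(0)
        .with_manifest_list("/tmp/warehouse/a/tbl1/metadata/snap-1.avro")
        .with_summary(Summary {
            operation: Operation::Append,
            other: HashMap::new(),
        })
        .build();
    let commit = TableCommit::builder()
        .ident(table.identifier().clone())
        .updates(vec![
            TableUpdate::AddSnapshot { snapshot },
            TableUpdate::SetSnapshotRef {
                ref_name: MAIN_BRANCH.to_string(),
                reference: SnapshotReference {
                    snapshot_id: 1,
                    retention: SnapshotRetention::Branch {
                        min_snapshots_to_keep: None,
                        max_snapshot_age_ms: None,
                        max_ref_age_ms: None,
                    },
                },
            },
        ])
        .requirements(vec![])
        .build();
    let _updated = catalog.update_table(commit).await?;
    Ok(())
}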