From e83157a37dccb178d03cd59f01e175f494678eb1 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Sat, 7 Sep 2024 14:40:01 +0100 Subject: [PATCH 1/8] feat: add list/exist table + drop namespace Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 140 +++++++++++++++++++++++++++--- crates/catalog/sql/src/error.rs | 9 +- 2 files changed, 134 insertions(+), 15 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c6a524cea1..483fe3a6df 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -28,7 +28,7 @@ use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow} use sqlx::{Any, AnyPool, Row, Transaction}; use typed_builder::TypedBuilder; -use crate::error::{from_sqlx_error, no_such_namespace_err}; +use crate::error::{from_sqlx_error, no_such_namespace_err, no_such_table_err}; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; @@ -472,16 +472,91 @@ impl Catalog for SqlCatalog { } } - async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { - todo!() + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let exists = self.namespace_exists(namespace).await?; + if exists { + // if there are tables in the namespace, don't allow drop. + let tables = self.list_tables(namespace).await?; + if !tables.is_empty() { + return Err(Error::new( + iceberg::ErrorKind::Unexpected, + format!( + "Namespace {:?} is not empty. {} tables exist.", + namespace, + tables.len() + ), + )); + } + + self.execute( + &format!("DELETE FROM {NAMESPACE_TABLE_NAME} WHERE {NAMESPACE_FIELD_NAME} = ?"), + vec![Some(&namespace.join("."))], + None, + ) + .await?; + + Ok(()) + } else { + no_such_namespace_err(namespace) + } } - async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { - todo!() + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { + let exists = self.namespace_exists(namespace).await?; + if exists { + let rows = self + .fetch_rows( + &format!( + "SELECT {CATALOG_FIELD_TABLE_NAME}, + {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ?" + ), + vec![Some(&namespace.join(".")), Some(&self.name)], + ) + .await?; + + let mut tables = HashSet::::with_capacity(rows.len()); + + for row in rows.iter() { + let tbl = row + .try_get::(CATALOG_FIELD_TABLE_NAME) + .map_err(from_sqlx_error)?; + let ns_strs = row + .try_get::(CATALOG_FIELD_TABLE_NAMESPACE) + .map_err(from_sqlx_error)?; + let ns = NamespaceIdent::from_strs(ns_strs.split("."))?; + tables.insert(TableIdent::new(ns, tbl)); + } + + Ok(tables.into_iter().collect::>()) + } else { + no_such_namespace_err(namespace) + } } - async fn table_exists(&self, _identifier: &TableIdent) -> Result { - todo!() + async fn table_exists(&self, identifier: &TableIdent) -> Result { + let namespace = identifier.namespace().join("."); + let table_name = identifier.name(); + let table_counts = self + .fetch_rows( + &format!( + "SELECT 1 + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ?" 
+ ), + vec![Some(&namespace), Some(&self.name), Some(&table_name)], + ) + .await?; + + if !table_counts.is_empty() { + Ok(true) + } else { + Ok(false) + } } async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> { @@ -515,7 +590,8 @@ mod tests { use std::hash::Hash; use iceberg::io::FileIOBuilder; - use iceberg::{Catalog, Namespace, NamespaceIdent}; + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; + use iceberg::{Catalog, Namespace, NamespaceIdent, TableIdent}; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; @@ -562,6 +638,18 @@ mod tests { } } + fn simple_table_schema() -> Schema { + Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "foo", + Type::Primitive(PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap() + } + #[tokio::test] async fn test_initialized() { let warehouse_loc = temp_path(); @@ -810,7 +898,6 @@ mod tests { } #[tokio::test] - #[ignore = "drop_namespace not implemented"] async fn test_drop_namespace() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -823,7 +910,6 @@ mod tests { } #[tokio::test] - #[ignore = "drop_namespace not implemented"] async fn test_drop_nested_namespace() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -842,7 +928,6 @@ mod tests { } #[tokio::test] - #[ignore = "drop_namespace not implemented"] async fn test_drop_deeply_nested_namespace() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -875,7 +960,6 @@ mod tests { } #[tokio::test] - #[ignore = "drop_namespace not implemented"] async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -895,7 +979,6 @@ mod tests { } #[tokio::test] - #[ignore = "drop_namespace not implemented"] async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -917,7 +1000,6 @@ mod tests { } #[tokio::test] - #[ignore = "drop_namespace not implemented"] async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -934,4 +1016,34 @@ mod tests { .await .unwrap()); } + + #[tokio::test] + async fn test_list_tables_returns_empty_vector() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_list_tables_throws_error_if_namespace_doesnt_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); + + assert_eq!( + catalog + .list_tables(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ), + ); + } } diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index cfefcc26a9..1487cc812f 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use iceberg::{Error, ErrorKind, NamespaceIdent, Result}; +use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent}; /// Format an sqlx error into iceberg error. pub fn from_sqlx_error(error: sqlx::Error) -> Error { @@ -32,3 +32,10 @@ pub fn no_such_namespace_err(namespace: &NamespaceIdent) -> Result { format!("No such namespace: {:?}", namespace), )) } + +pub fn no_such_table_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such table: {:?}", table_ident), + )) +} From 1d9ea5ae1e20595fe019b84f666021d70036dc86 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:01:38 +0100 Subject: [PATCH 2/8] feat: create table Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/Cargo.toml | 2 + crates/catalog/sql/src/catalog.rs | 402 +++++++++++++++++++++++++++++- crates/catalog/sql/src/error.rs | 10 + 3 files changed, 403 insertions(+), 11 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 4a88e75b4d..16465610b5 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -33,6 +33,8 @@ async-trait = { workspace = true } iceberg = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } typed-builder = { workspace = true } +uuid = { workspace = true, features = ["v4"] } +serde_json = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 483fe3a6df..15a48ac6a3 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -20,6 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; +use iceberg::spec::TableMetadataBuilder; use iceberg::table::Table; use iceberg::{ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, @@ -27,8 +28,11 @@ use iceberg::{ use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; use sqlx::{Any, AnyPool, Row, Transaction}; use typed_builder::TypedBuilder; +use uuid::Uuid; -use crate::error::{from_sqlx_error, no_such_namespace_err, no_such_table_err}; +use crate::error::{ + from_sqlx_error, no_such_namespace_err, no_such_table_err, table_already_exists_err, +}; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; @@ -43,6 +47,8 @@ static NAMESPACE_FIELD_NAME: &str = "namespace"; static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key"; static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value"; +static NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location"; + static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if not provided static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning @@ -71,8 +77,8 @@ pub struct SqlCatalogConfig { pub struct SqlCatalog { name: String, connection: AnyPool, - _warehouse_location: String, - _fileio: FileIO, + warehouse_location: String, + fileio: FileIO, sql_bind_style: SqlBindStyle, } @@ -142,8 +148,8 @@ impl SqlCatalog { Ok(SqlCatalog { name: config.name.to_owned(), connection: pool, - _warehouse_location: config.warehouse_location, - _fileio: config.file_io, + 
warehouse_location: config.warehouse_location, + fileio: config.file_io, sql_bind_style: config.sql_bind_style, }) } @@ -548,7 +554,7 @@ impl Catalog for SqlCatalog { AND {CATALOG_FIELD_CATALOG_NAME} = ? AND {CATALOG_FIELD_TABLE_NAME} = ?" ), - vec![Some(&namespace), Some(&self.name), Some(&table_name)], + vec![Some(&namespace), Some(&self.name), Some(table_name)], ) .await?; @@ -569,10 +575,70 @@ impl Catalog for SqlCatalog { async fn create_table( &self, - _namespace: &NamespaceIdent, - _creation: TableCreation, + namespace: &NamespaceIdent, + creation: TableCreation, ) -> Result { - todo!() + if !self.namespace_exists(namespace).await? { + return no_such_namespace_err(namespace); + } + + let tbl_name = creation.name.clone(); + let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone()); + + if self.table_exists(&tbl_ident).await? { + return table_already_exists_err(&tbl_ident); + } + + let (tbl_creation, location) = match creation.location.clone() { + Some(location) => (creation, location), + None => { + // fall back to namespace-specific location + // and then to warehouse location + let nsp_properties = self.get_namespace(namespace).await?.properties().clone(); + let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) { + Some(location) => location.clone(), + None => { + format!( + "{}/{}", + self.warehouse_location.clone(), + namespace.join("/") + ) + } + }; + + let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name()); + + ( + TableCreation { + location: Some(tbl_location.clone()), + ..creation + }, + tbl_location, + ) + } + }; + + let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?; + let tbl_metadata_location = format!( + "{}/metadata/0-{}.metadata.json", + location.clone(), + Uuid::new_v4() + ); + + let file = self.fileio.new_output(&tbl_metadata_location)?; + file.write(serde_json::to_vec(&tbl_metadata)?.into()) + .await?; + + self.execute(&format!( + "INSERT INTO {CATALOG_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}) + VALUES (?, ?, ?, ?)"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location)], None).await?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .metadata_location(tbl_metadata_location) + .identifier(tbl_ident) + .metadata(tbl_metadata) + .build()?) 
} async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { @@ -590,11 +656,15 @@ mod tests { use std::hash::Hash; use iceberg::io::FileIOBuilder; - use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; - use iceberg::{Catalog, Namespace, NamespaceIdent, TableIdent}; + use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::table::Table; + use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; + use itertools::Itertools; + use regex::Regex; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; + use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY; use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; fn temp_path() -> String { @@ -650,6 +720,67 @@ mod tests { .unwrap() } + async fn create_table(catalog: &C, table_ident: &TableIdent) { + let _ = catalog + .create_table( + &table_ident.namespace, + TableCreation::builder() + .name(table_ident.name().into()) + .schema(simple_table_schema()) + .location(temp_path()) + .build(), + ) + .await + .unwrap(); + } + + const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; + + fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) { + assert_eq!(table.identifier(), expected_table_ident); + + let metadata = table.metadata(); + + assert_eq!(metadata.current_schema().as_ref(), expected_schema); + + let expected_partition_spec = PartitionSpec::builder(expected_schema) + .with_spec_id(0) + .build() + .unwrap(); + + assert_eq!( + metadata + .partition_specs_iter() + .map(|p| p.as_ref()) + .collect_vec(), + vec![&expected_partition_spec] + ); + + let expected_sorted_order = SortOrder::builder() + .with_order_id(0) + .with_fields(vec![]) + .build(expected_schema.clone()) + .unwrap(); + + assert_eq!( + metadata + .sort_orders_iter() + .map(|s| s.as_ref()) + .collect_vec(), + vec![&expected_sorted_order] + ); + + assert_eq!(metadata.properties(), &HashMap::new()); + + assert!(!table.readonly()); + } + + fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) { + let actual = table.metadata_location().unwrap().to_string(); + let regex = Regex::new(regex_str).unwrap(); + assert!(regex.is_match(&actual)) + } + #[tokio::test] async fn test_initialized() { let warehouse_loc = temp_path(); @@ -1046,4 +1177,253 @@ mod tests { ), ); } + + #[tokio::test] + async fn test_create_table_with_location() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + assert!(table + .metadata_location() + .unwrap() + .to_string() + .starts_with(&location)) + } + + #[tokio::test] + async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() { + let warehouse_loc = temp_path(); + let 
catalog = new_sql_catalog(warehouse_loc).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + let mut namespace_properties = HashMap::new(); + let namespace_location = temp_path(); + namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + namespace_location.to_string(), + ); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/tbl1/metadata/0-{}.metadata.json$", + namespace_location, UUID_REGEX_STR, + ); + + let table = catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing( + ) { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + let mut namespace_properties = HashMap::new(); + let namespace_location = temp_path(); + namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + namespace_location.to_string(), + ); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let mut nested_namespace_properties = HashMap::new(); + let nested_namespace_location = temp_path(); + nested_namespace_properties.insert( + NAMESPACE_LOCATION_PROPERTY_KEY.to_string(), + nested_namespace_location.to_string(), + ); + catalog + .create_namespace(&nested_namespace_ident, nested_namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = + TableIdent::new(nested_namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/tbl1/metadata/0-{}.metadata.json$", + nested_namespace_location, UUID_REGEX_STR, + ); + + let table = catalog + .create_table( + &nested_namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing( + ) { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + // note: no location 
specified in namespace_properties + let namespace_properties = HashMap::new(); + catalog + .create_namespace(&namespace_ident, namespace_properties) + .await + .unwrap(); + + let table_name = "tbl1"; + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/a/tbl1/metadata/0-{}.metadata.json$", + warehouse_loc, UUID_REGEX_STR + ); + + let table = catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing( + ) { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespace(&catalog, &nested_namespace_ident).await; + + let table_name = "tbl1"; + let expected_table_ident = + TableIdent::new(nested_namespace_ident.clone(), table_name.into()); + let expected_table_metadata_location_regex = format!( + "^{}/a/b/tbl1/metadata/0-{}.metadata.json$", + warehouse_loc, UUID_REGEX_STR + ); + + let table = catalog + .create_table( + &nested_namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + // no location specified for table + .build(), + ) + .await + .unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex); + } + + #[tokio::test] + async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_table(&catalog, &table_ident).await; + + let tmp_dir = TempDir::new().unwrap(); + let location = tmp_dir.path().to_str().unwrap().to_string(); + + assert_eq!( + catalog + .create_table( + &namespace_ident, + TableCreation::builder() + .name(table_name.into()) + .schema(simple_table_schema()) + .location(location) + .build() + ) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create table {:?}. 
Table already exists.", + &table_ident + ) + ); + } } diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index 1487cc812f..0bb8ba0c89 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -39,3 +39,13 @@ pub fn no_such_table_err(table_ident: &TableIdent) -> Result { format!("No such table: {:?}", table_ident), )) } + +pub fn table_already_exists_err(table_ident: &TableIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create table {:?}. Table already exists.", + table_ident + ), + )) +} From 445633eb3cf263aa220452217f932b616ab008c2 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:33:13 +0100 Subject: [PATCH 3/8] feat: add load table Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 50 +++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 15a48ac6a3..78b27d700a 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -20,7 +20,7 @@ use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; -use iceberg::spec::TableMetadataBuilder; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, @@ -41,6 +41,7 @@ static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace"; static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location"; static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type"; +static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE"; static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties"; static NAMESPACE_FIELD_NAME: &str = "namespace"; @@ -569,8 +570,51 @@ impl Catalog for SqlCatalog { todo!() } - async fn load_table(&self, _identifier: &TableIdent) -> Result
{
-        todo!()
+    async fn load_table(&self, identifier: &TableIdent) -> Result<Table>
{ + if !self.table_exists(identifier).await? { + return no_such_table_err(identifier); + } + + let rows = self + .fetch_rows( + &format!( + "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + vec![ + Some(&self.name), + Some(identifier.name()), + Some(&identifier.namespace().join(".")), + ], + ) + .await?; + + if rows.is_empty() { + return no_such_table_err(identifier); + } + + let row = &rows[0]; + let tbl_metadata_location = row + .try_get::(CATALOG_FIELD_METADATA_LOCATION_PROP) + .map_err(from_sqlx_error)?; + + let file = self.fileio.new_input(&tbl_metadata_location)?; + let metadata_content = file.read().await?; + let metadata = serde_json::from_slice::(&metadata_content)?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .identifier(identifier.clone()) + .metadata_location(tbl_metadata_location) + .metadata(metadata) + .build()?) } async fn create_table( From 7553605ac5f6664b8e8b279c16aea47113a1eae3 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Wed, 11 Sep 2024 21:00:25 +0100 Subject: [PATCH 4/8] feat: add the rest of table ops Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 258 ++++++++++++++++++++++++++++-- 1 file changed, 249 insertions(+), 9 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 78b27d700a..262ed6a656 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -518,7 +518,11 @@ impl Catalog for SqlCatalog { {CATALOG_FIELD_TABLE_NAMESPACE} FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? - AND {CATALOG_FIELD_CATALOG_NAME} = ?" + AND {CATALOG_FIELD_CATALOG_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )", ), vec![Some(&namespace.join(".")), Some(&self.name)], ) @@ -553,7 +557,11 @@ impl Catalog for SqlCatalog { FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_CATALOG_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAME} = ?" + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" ), vec![Some(&namespace), Some(&self.name), Some(table_name)], ) @@ -566,8 +574,32 @@ impl Catalog for SqlCatalog { } } - async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> { - todo!() + async fn drop_table(&self, identifier: &TableIdent) -> Result<()> { + if !self.table_exists(identifier).await? { + return no_such_table_err(identifier); + } + + self.execute( + &format!( + "DELETE FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + vec![ + Some(&self.name), + Some(identifier.name()), + Some(&identifier.namespace().join(".")), + ], + None, + ) + .await?; + + Ok(()) } async fn load_table(&self, identifier: &TableIdent) -> Result
{ @@ -674,8 +706,10 @@ impl Catalog for SqlCatalog { .await?; self.execute(&format!( - "INSERT INTO {CATALOG_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}) - VALUES (?, ?, ?, ?)"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location)], None).await?; + "INSERT INTO {CATALOG_TABLE_NAME} + ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE}) + VALUES (?, ?, ?, ?, ?) + "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?; Ok(Table::builder() .file_io(self.fileio.clone()) @@ -685,8 +719,47 @@ impl Catalog for SqlCatalog { .build()?) } - async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { - todo!() + async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { + if src == dest { + return Ok(()); + } + + if !self.table_exists(src).await? { + return no_such_table_err(src); + } + + if !self.namespace_exists(dest.namespace()).await? { + return no_such_namespace_err(dest.namespace()); + } + + if self.table_exists(dest).await? { + return table_already_exists_err(dest); + } + + self.execute( + &format!( + "UPDATE {CATALOG_TABLE_NAME} + SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + vec![ + Some(dest.name()), + Some(&dest.namespace().join(".")), + Some(&self.name), + Some(src.name()), + Some(&src.namespace().join(".")), + ], + None, + ) + .await?; + + Ok(()) } async fn update_table(&self, _commit: TableCommit) -> Result
{ @@ -711,6 +784,8 @@ mod tests { use crate::catalog::NAMESPACE_LOCATION_PROPERTY_KEY; use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; + const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; + fn temp_path() -> String { let temp_dir = TempDir::new().unwrap(); temp_dir.path().to_str().unwrap().to_string() @@ -778,7 +853,11 @@ mod tests { .unwrap(); } - const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; + async fn create_tables(catalog: &C, table_idents: Vec<&TableIdent>) { + for table_ident in table_idents { + create_table(catalog, table_ident).await; + } + } fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) { assert_eq!(table.identifier(), expected_table_ident); @@ -1470,4 +1549,165 @@ mod tests { ) ); } + + #[tokio::test] + async fn test_rename_table_in_same_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + create_table(&catalog, &src_table_ident).await; + + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); + + assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![ + dst_table_ident + ],); + } + + #[tokio::test] + async fn test_rename_table_across_namespaces() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let src_namespace_ident = NamespaceIdent::new("a".into()); + let dst_namespace_ident = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await; + let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into()); + create_table(&catalog, &src_table_ident).await; + + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); + + assert_eq!( + catalog.list_tables(&src_namespace_ident).await.unwrap(), + vec![], + ); + + assert_eq!( + catalog.list_tables(&dst_namespace_ident).await.unwrap(), + vec![dst_table_ident], + ); + } + + #[tokio::test] + async fn test_rename_table_src_table_is_same_as_dst_table() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into()); + create_table(&catalog, &table_ident).await; + + catalog + .rename_table(&table_ident, &table_ident) + .await + .unwrap(); + + assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![ + table_ident + ],); + } + + #[tokio::test] + async fn test_rename_table_across_nested_namespaces() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces(&catalog, &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ]) + .await; + + let src_table_ident = 
TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into()); + create_tables(&catalog, vec![&src_table_ident]).await; + + let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into()); + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap(); + + assert!(!catalog.table_exists(&src_table_ident).await.unwrap()); + + assert!(catalog.table_exists(&dst_table_ident).await.unwrap()); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let src_namespace_ident = NamespaceIdent::new("n1".into()); + let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); + create_namespace(&catalog, &src_namespace_ident).await; + create_table(&catalog, &src_table_ident).await; + + let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into()); + let dst_table_ident = + TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into()); + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_dst_namespace_ident + ), + ); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_src_table_doesnt_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!("Unexpected => No such table: {:?}", src_table_ident), + ); + } + + #[tokio::test] + async fn test_rename_table_throws_error_if_dst_table_already_exists() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await; + + assert_eq!( + catalog + .rename_table(&src_table_ident, &dst_table_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Cannot create table {:? }. 
Table already exists.", + &dst_table_ident + ), + ); + } } From 8229ea86446ccaa2fa1953b35d0a55642d8fd4bf Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Wed, 11 Sep 2024 21:07:48 +0100 Subject: [PATCH 5/8] fix: sort order on test Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 262ed6a656..5e405a4d1d 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -882,7 +882,7 @@ mod tests { let expected_sorted_order = SortOrder::builder() .with_order_id(0) .with_fields(vec![]) - .build(expected_schema.clone()) + .build(expected_schema) .unwrap(); assert_eq!( From 61598af4774abaa53034c2b8e7c808c144f68552 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Wed, 11 Sep 2024 21:13:23 +0100 Subject: [PATCH 6/8] fix: sort Cargo.toml Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 16465610b5..a516716501 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -31,10 +31,10 @@ keywords = ["iceberg", "sql", "catalog"] [dependencies] async-trait = { workspace = true } iceberg = { workspace = true } +serde_json = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } -serde_json = { workspace = true } [dev-dependencies] iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } From 589447520a7884a9086484c3976a06f7a3fcd17e Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Mon, 16 Sep 2024 19:02:38 +0100 Subject: [PATCH 7/8] fix: Adjust error message for existence of table Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 10 ++-------- crates/catalog/sql/src/error.rs | 5 +---- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 5e405a4d1d..e65a77624a 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -1543,10 +1543,7 @@ mod tests { .await .unwrap_err() .to_string(), - format!( - "Unexpected => Cannot create table {:?}. Table already exists.", - &table_ident - ) + format!("Unexpected => Table {:?} already exists.", &table_ident) ); } @@ -1704,10 +1701,7 @@ mod tests { .await .unwrap_err() .to_string(), - format!( - "Unexpected => Cannot create table {:? }. Table already exists.", - &dst_table_ident - ), + format!("Unexpected => Table {:?} already exists.", &dst_table_ident), ); } } diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index 0bb8ba0c89..15b56e8e23 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -43,9 +43,6 @@ pub fn no_such_table_err(table_ident: &TableIdent) -> Result { pub fn table_already_exists_err(table_ident: &TableIdent) -> Result { Err(Error::new( ErrorKind::Unexpected, - format!( - "Cannot create table {:?}. 
Table already exists.", - table_ident - ), + format!("Table {:?} already exists.", table_ident), )) } From 07a989ea2852554c3ee70ffaab9bee0ae6fb2268 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Mon, 30 Sep 2024 19:05:31 +0100 Subject: [PATCH 8/8] fix: update_table throws Unsupported, add catalog filter to drop_nsp Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index e65a77624a..b7976d9d56 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -23,7 +23,8 @@ use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ - Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, + TableIdent, }; use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; use sqlx::{Any, AnyPool, Row, Transaction}; @@ -496,8 +497,12 @@ impl Catalog for SqlCatalog { } self.execute( - &format!("DELETE FROM {NAMESPACE_TABLE_NAME} WHERE {NAMESPACE_FIELD_NAME} = ?"), - vec![Some(&namespace.join("."))], + &format!( + "DELETE FROM {NAMESPACE_TABLE_NAME} + WHERE {NAMESPACE_FIELD_NAME} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ?" + ), + vec![Some(&namespace.join(".")), Some(&self.name)], None, ) .await?; @@ -763,7 +768,10 @@ impl Catalog for SqlCatalog { } async fn update_table(&self, _commit: TableCommit) -> Result
{ - todo!() + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating a table is not supported yet", + )) } }
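
For reference, a minimal end-to-end sketch of how the operations implemented across these patches fit together, written as one more test in the same module as the tests above. It assumes the existing test-module helpers (`new_sql_catalog`, `temp_path`, `simple_table_schema`) and imports shown or referenced in the diffs; it is an illustrative sketch, not part of the patch series itself.

    #[tokio::test]
    async fn test_full_table_lifecycle_sketch() {
        // Sketch only: exercises the catalog operations added in this series,
        // reusing the test-module helpers new_sql_catalog, temp_path and
        // simple_table_schema referenced above.
        let catalog = new_sql_catalog(temp_path()).await;

        // A namespace must exist before create_table/list_tables will succeed.
        let namespace_ident = NamespaceIdent::new("a".into());
        catalog
            .create_namespace(&namespace_ident, HashMap::new())
            .await
            .unwrap();

        // No explicit location: create_table falls back to the namespace
        // location property, then to the warehouse location.
        let src = TableIdent::new(namespace_ident.clone(), "tbl1".into());
        catalog
            .create_table(
                &namespace_ident,
                TableCreation::builder()
                    .name(src.name().into())
                    .schema(simple_table_schema())
                    .build(),
            )
            .await
            .unwrap();

        // load_table reads the table metadata JSON back through FileIO.
        let table = catalog.load_table(&src).await.unwrap();
        assert!(table.metadata_location().is_some());

        // Rename within the same namespace, then clean up.
        let dst = TableIdent::new(namespace_ident.clone(), "tbl2".into());
        catalog.rename_table(&src, &dst).await.unwrap();
        assert!(catalog.table_exists(&dst).await.unwrap());

        catalog.drop_table(&dst).await.unwrap();
        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);

        // drop_namespace only succeeds once the namespace is empty.
        catalog.drop_namespace(&namespace_ident).await.unwrap();
    }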