From 82be6c6d2b67e7d3c37761f858361ff8b8f8b95d Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Fri, 9 Aug 2024 20:34:29 +0100 Subject: [PATCH 01/12] feat: SQL Catalog - namespaces Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 199 +++++++++++++++++++++++++++--- crates/catalog/sql/src/error.rs | 9 +- 2 files changed, 193 insertions(+), 15 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 078fff6903..2f114f2480 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -22,12 +22,14 @@ use std::time::Duration; use async_trait::async_trait; use iceberg::io::FileIO; use iceberg::table::Table; -use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent}; +use iceberg::{ + Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, +}; use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow}; -use sqlx::AnyPool; +use sqlx::{AnyPool, Row}; use typed_builder::TypedBuilder; -use crate::error::from_sqlx_error; +use crate::error::{from_sqlx_error, no_such_namespace_err}; static CATALOG_TABLE_NAME: &str = "iceberg_tables"; static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; @@ -61,7 +63,7 @@ pub struct SqlCatalogConfig { #[derive(Debug)] /// Sql catalog implementation. pub struct SqlCatalog { - _name: String, + name: String, connection: AnyPool, _warehouse_location: String, _fileio: FileIO, @@ -132,7 +134,7 @@ impl SqlCatalog { .map_err(from_sqlx_error)?; Ok(SqlCatalog { - _name: config.name.to_owned(), + name: config.name.to_owned(), connection: pool, _warehouse_location: config.warehouse_location, _fileio: config.file_io, @@ -173,25 +175,194 @@ impl SqlCatalog { impl Catalog for SqlCatalog { async fn list_namespaces( &self, - _parent: Option<&NamespaceIdent>, + parent: Option<&NamespaceIdent>, ) -> Result> { - todo!() + let table_namespaces_stmt = format!( + "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + let namespaces_stmt = format!( + "SELECT {NAMESPACE_FIELD_NAME} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" + ); + + match parent { + Some(parent) => match self.namespace_exists(parent).await? { + true => { + let parent_table_namespaces_stmt = format!( + "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" + ); + let parent_namespaces_stmt = + format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')"); + + let namespace_rows = self + .execute_statement( + &format!( + "{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}" + ), + vec![ + Some(&self.name), + Some(&parent.join(".")), + Some(&self.name), + Some(&parent.join(".")), + ], + ) + .await?; + + Ok(namespace_rows + .iter() + .filter_map(|r| { + let nsp = r.try_get::(0).ok(); + nsp.and_then(|n| NamespaceIdent::from_strs(n.split('.')).ok()) + }) + .collect()) + } + false => no_such_namespace_err(parent), + }, + None => { + let namespace_rows = self + .execute_statement( + &format!("{namespaces_stmt} UNION {table_namespaces_stmt}"), + vec![Some(&self.name), Some(&self.name)], + ) + .await?; + + Ok(namespace_rows + .iter() + .filter_map(|r| { + let nsp = r.try_get::(0).ok(); + nsp.and_then(|n| NamespaceIdent::from_strs(n.split('.')).ok()) + }) + .collect()) + } + } } async fn create_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result { - todo!() + let exists = self.namespace_exists(namespace).await?; + if exists { + Err(Error::new( + iceberg::ErrorKind::Unexpected, + format!("Namespace {:?} already exists", namespace), + )) + } else { + let namespace_str = namespace.join("."); + let insert = format!( + "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) + VALUES (?, ?, ?, ?)"); + if !properties.is_empty() { + let mut query_args = vec![]; + let mut properties_insert = insert.clone(); + for (index, (key, value)) in properties.iter().enumerate() { + query_args.extend( + [ + Some(&self.name), + Some(&namespace_str), + Some(key), + Some(value), + ] + .iter(), + ); + if index > 0 { + properties_insert = format!("{properties_insert}, (?, ?, ?, ?)"); + } + } + + self.execute_statement(&properties_insert, query_args) + .await?; + + Ok(Namespace::with_properties(namespace.clone(), properties)) + } else { + // set a default property of exists = true + self.execute_statement(&insert, vec![ + Some(&self.name), + Some(&namespace_str), + Some(&"exists".to_string()), + Some(&"true".to_string()), + ]) + .await?; + Ok(Namespace::with_properties(namespace.clone(), properties)) + } + } } - async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { - todo!() + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result { + let exists = self.namespace_exists(namespace).await?; + if exists { + let namespace_props = self + .execute_statement( + &format!( + "SELECT + {NAMESPACE_FIELD_NAME}, + {NAMESPACE_FIELD_PROPERTY_KEY}, + {NAMESPACE_FIELD_PROPERTY_VALUE} + FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {NAMESPACE_FIELD_NAME} = ?" + ), + vec![Some(&self.name), Some(&namespace.join("."))], + ) + .await?; + + let properties: HashMap = namespace_props + .iter() + .filter_map(|r| { + let key = r.try_get(NAMESPACE_FIELD_PROPERTY_KEY).ok(); + let value = r.try_get(NAMESPACE_FIELD_PROPERTY_VALUE).ok(); + match (key, value) { + (Some(k), Some(v)) => Some((k, v)), + _ => None, + } + }) + .collect(); + + Ok(Namespace::with_properties(namespace.clone(), properties)) + } else { + no_such_namespace_err(namespace) + } } - async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { - todo!() + async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result { + let namespace_str = namespace.join("."); + + let table_namespaces = self + .execute_statement( + &format!( + "SELECT 1 FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + LIMIT 1" + ), + vec![Some(&self.name), Some(&namespace_str)], + ) + .await?; + + if !table_namespaces.is_empty() { + Ok(true) + } else { + let namespaces = self + .execute_statement( + &format!( + "SELECT 1 FROM {NAMESPACE_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {NAMESPACE_FIELD_NAME} = ? + LIMIT 1" + ), + vec![Some(&self.name), Some(&namespace_str)], + ) + .await?; + if !namespaces.is_empty() { + Ok(true) + } else { + Ok(false) + } + } } async fn update_namespace( diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs index 90bba1f05d..cfefcc26a9 100644 --- a/crates/catalog/sql/src/error.rs +++ b/crates/catalog/sql/src/error.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use iceberg::{Error, ErrorKind}; +use iceberg::{Error, ErrorKind, NamespaceIdent, Result}; /// Format an sqlx error into iceberg error. pub fn from_sqlx_error(error: sqlx::Error) -> Error { @@ -25,3 +25,10 @@ pub fn from_sqlx_error(error: sqlx::Error) -> Error { ) .with_source(error) } + +pub fn no_such_namespace_err(namespace: &NamespaceIdent) -> Result { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such namespace: {:?}", namespace), + )) +} From 5a09b182caa4d2fbb3cee8c1f12646b4e683f7bb Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Fri, 9 Aug 2024 23:03:33 +0100 Subject: [PATCH 02/12] feat: use transaction for updates and creates Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 140 ++++++++++++++++++++++++++---- 1 file changed, 121 insertions(+), 19 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 2f114f2480..67fa407a78 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -25,8 +25,8 @@ use iceberg::table::Table; use iceberg::{ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; -use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow}; -use sqlx::{AnyPool, Row}; +use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; +use sqlx::{Any, AnyConnection, AnyPool, Executor, Pool, Row, Transaction}; use typed_builder::TypedBuilder; use crate::error::{from_sqlx_error, no_such_namespace_err}; @@ -143,11 +143,12 @@ impl SqlCatalog { } /// SQLX Any does not implement PostgresSQL bindings, so we have to do this. - pub async fn execute_statement( + pub async fn fetch_rows( &self, query: &String, args: Vec>, ) -> Result> { + // TODO: move this out to a function let query_with_placeholders: Cow = if self.sql_bind_style == SqlBindStyle::DollarNumeric { let mut query = query.clone(); @@ -169,6 +170,40 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + /// Execute statements in a transaction, provided or not + pub async fn execute( + &self, + query: &String, + args: Vec>, + transaction: Option<&mut Transaction<'_, Any>>, + ) -> Result { + // TODO: move this out to a function + let query_with_placeholders: Cow = + if self.sql_bind_style == SqlBindStyle::DollarNumeric { + let mut query = query.clone(); + for i in 0..args.len() { + query = query.replacen("?", &format!("${}", i + 1), 1); + } + Cow::Owned(query) + } else { + Cow::Borrowed(query) + }; + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + match transaction { + Some(t) => sqlx_query.execute(&mut **t).await.map_err(from_sqlx_error), + None => { + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let result = sqlx_query.execute(&mut *tx).await.map_err(from_sqlx_error); + let _ = tx.commit().await.map_err(from_sqlx_error); + result + } + } + } } #[async_trait] @@ -198,7 +233,7 @@ impl Catalog for SqlCatalog { format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')"); let namespace_rows = self - .execute_statement( + .fetch_rows( &format!( "{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}" ), @@ -223,7 +258,7 @@ impl Catalog for SqlCatalog { }, None => { let namespace_rows = self - .execute_statement( + .fetch_rows( &format!("{namespaces_stmt} UNION {table_namespaces_stmt}"), vec![Some(&self.name), Some(&self.name)], ) @@ -274,18 +309,21 @@ impl Catalog for SqlCatalog { } } - self.execute_statement(&properties_insert, query_args) - .await?; + self.execute(&properties_insert, query_args, None).await?; Ok(Namespace::with_properties(namespace.clone(), properties)) } else { // set a default property of exists = true - self.execute_statement(&insert, vec![ - Some(&self.name), - Some(&namespace_str), - Some(&"exists".to_string()), - Some(&"true".to_string()), - ]) + self.execute( + &insert, + vec![ + Some(&self.name), + Some(&namespace_str), + Some(&"exists".to_string()), + Some(&"true".to_string()), + ], + None, + ) .await?; Ok(Namespace::with_properties(namespace.clone(), properties)) } @@ -296,7 +334,7 @@ impl Catalog for SqlCatalog { let exists = self.namespace_exists(namespace).await?; if exists { let namespace_props = self - .execute_statement( + .fetch_rows( &format!( "SELECT {NAMESPACE_FIELD_NAME}, @@ -332,7 +370,7 @@ impl Catalog for SqlCatalog { let namespace_str = namespace.join("."); let table_namespaces = self - .execute_statement( + .fetch_rows( &format!( "SELECT 1 FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_CATALOG_NAME} = ? @@ -347,7 +385,7 @@ impl Catalog for SqlCatalog { Ok(true) } else { let namespaces = self - .execute_statement( + .fetch_rows( &format!( "SELECT 1 FROM {NAMESPACE_TABLE_NAME} WHERE {CATALOG_FIELD_CATALOG_NAME} = ? @@ -367,10 +405,74 @@ impl Catalog for SqlCatalog { async fn update_namespace( &self, - _namespace: &NamespaceIdent, - _properties: HashMap, + namespace: &NamespaceIdent, + properties: HashMap, ) -> Result<()> { - todo!() + let exists = self.namespace_exists(namespace).await?; + if exists { + let existing_properties = self.get_namespace(namespace).await?.properties().clone(); + let namespace_str = namespace.join("."); + + let mut updates = vec![]; + let mut inserts = vec![]; + + for (key, value) in properties.iter() { + if existing_properties.contains_key(key) { + if existing_properties.get(key) != Some(value) { + updates.push((key, value)); + } + } else { + inserts.push((key, value)); + } + } + + let mut tx = self.connection.begin().await.map_err(from_sqlx_error)?; + let update_stmt = format!( + "UPDATE {NAMESPACE_TABLE_NAME} SET {NAMESPACE_FIELD_PROPERTY_VALUE} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {NAMESPACE_FIELD_NAME} = ? + AND {NAMESPACE_FIELD_PROPERTY_KEY} = ?" + ); + + let insert_stmt = format!( + "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) + VALUES (?, ?, ?, ?)" + ); + + for (key, value) in updates { + self.execute( + &update_stmt, + vec![ + Some(value), + Some(&self.name), + Some(&namespace_str), + Some(key), + ], + Some(&mut tx), + ) + .await?; + } + + for (key, value) in inserts { + self.execute( + &insert_stmt, + vec![ + Some(&self.name), + Some(&namespace_str), + Some(key), + Some(value), + ], + Some(&mut tx), + ) + .await?; + } + + let _ = tx.commit().await.map_err(from_sqlx_error)?; + + Ok(()) + } else { + no_such_namespace_err(namespace) + } } async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { From fabc12c4e13e4288919939875f0c1bac520e19a2 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Fri, 9 Aug 2024 23:26:27 +0100 Subject: [PATCH 03/12] fix: pull out query param builder to fn Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 49 +++++++++++++------------------ 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 67fa407a78..0ea505132e 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::borrow::Cow; use std::collections::HashMap; use std::time::Duration; @@ -26,7 +25,7 @@ use iceberg::{ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; -use sqlx::{Any, AnyConnection, AnyPool, Executor, Pool, Row, Transaction}; +use sqlx::{Any, AnyPool, Row, Transaction}; use typed_builder::TypedBuilder; use crate::error::{from_sqlx_error, no_such_namespace_err}; @@ -143,22 +142,24 @@ impl SqlCatalog { } /// SQLX Any does not implement PostgresSQL bindings, so we have to do this. - pub async fn fetch_rows( - &self, - query: &String, - args: Vec>, - ) -> Result> { - // TODO: move this out to a function - let query_with_placeholders: Cow = - if self.sql_bind_style == SqlBindStyle::DollarNumeric { - let mut query = query.clone(); - for i in 0..args.len() { - query = query.replacen("?", &format!("${}", i + 1), 1); + pub fn build_query(&self, query: &str) -> String { + match self.sql_bind_style { + SqlBindStyle::DollarNumeric => { + let mut query = query.to_owned(); + let mut i = 1; + while let Some(pos) = query.find('?') { + query.replace_range(pos..pos + 1, &format!("${}", i)); + i += 1; } - Cow::Owned(query) - } else { - Cow::Borrowed(query) - }; + query + } + _ => query.to_owned(), + } + } + + /// Fetch a vec of AnyRows from a given query + pub async fn fetch_rows(&self, query: &str, args: Vec>) -> Result> { + let query_with_placeholders = self.build_query(query); let mut sqlx_query = sqlx::query(&query_with_placeholders); for arg in args { @@ -173,21 +174,11 @@ impl SqlCatalog { /// Execute statements in a transaction, provided or not pub async fn execute( &self, - query: &String, + query: &str, args: Vec>, transaction: Option<&mut Transaction<'_, Any>>, ) -> Result { - // TODO: move this out to a function - let query_with_placeholders: Cow = - if self.sql_bind_style == SqlBindStyle::DollarNumeric { - let mut query = query.clone(); - for i in 0..args.len() { - query = query.replacen("?", &format!("${}", i + 1), 1); - } - Cow::Owned(query) - } else { - Cow::Borrowed(query) - }; + let query_with_placeholders = self.build_query(query); let mut sqlx_query = sqlx::query(&query_with_placeholders); for arg in args { From f7e66e4accaad8cb4056f46fad0e47be8f842ed4 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Sun, 11 Aug 2024 14:37:31 +0100 Subject: [PATCH 04/12] feat: add drop and tests Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 542 +++++++++++++++++++++++++++--- 1 file changed, 495 insertions(+), 47 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 0ea505132e..94552d8fbf 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use async_trait::async_trait; @@ -217,6 +217,7 @@ impl Catalog for SqlCatalog { match parent { Some(parent) => match self.namespace_exists(parent).await? { true => { + let parent_str = parent.join("."); let parent_table_namespaces_stmt = format!( "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" ); @@ -230,9 +231,9 @@ impl Catalog for SqlCatalog { ), vec![ Some(&self.name), - Some(&parent.join(".")), + Some(&parent_str), Some(&self.name), - Some(&parent.join(".")), + Some(&parent_str), ], ) .await?; @@ -241,7 +242,14 @@ impl Catalog for SqlCatalog { .iter() .filter_map(|r| { let nsp = r.try_get::(0).ok(); - nsp.and_then(|n| NamespaceIdent::from_strs(n.split('.')).ok()) + nsp.and_then(|n| { + if n == parent_str { + // Filter out itself + None + } else { + NamespaceIdent::from_strs(n.split(".")).ok() + } + }) }) .collect()) } @@ -259,9 +267,21 @@ impl Catalog for SqlCatalog { .iter() .filter_map(|r| { let nsp = r.try_get::(0).ok(); - nsp.and_then(|n| NamespaceIdent::from_strs(n.split('.')).ok()) + nsp.and_then(|n| { + // for each row, split a.b.c into a, b, c levels + let mut levels = n.split(".").collect::>(); + if !levels.is_empty() { + // only return first-level idents + let first_level = levels.drain(..1).collect::>(); + NamespaceIdent::from_strs(first_level).ok() + } else { + None + } + }) }) - .collect()) + .collect::>() + .into_iter() + .collect::>()) } } } @@ -272,52 +292,62 @@ impl Catalog for SqlCatalog { properties: HashMap, ) -> Result { let exists = self.namespace_exists(namespace).await?; + if exists { - Err(Error::new( + return Err(Error::new( iceberg::ErrorKind::Unexpected, format!("Namespace {:?} already exists", namespace), - )) - } else { - let namespace_str = namespace.join("."); - let insert = format!( - "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) - VALUES (?, ?, ?, ?)"); - if !properties.is_empty() { - let mut query_args = vec![]; - let mut properties_insert = insert.clone(); - for (index, (key, value)) in properties.iter().enumerate() { - query_args.extend( - [ - Some(&self.name), - Some(&namespace_str), - Some(key), - Some(value), - ] - .iter(), - ); - if index > 0 { - properties_insert = format!("{properties_insert}, (?, ?, ?, ?)"); - } - } + )); + } - self.execute(&properties_insert, query_args, None).await?; + for i in 1..namespace.len() { + let parent_namespace = NamespaceIdent::from_vec(namespace[..i].to_vec())?; + let parent_exists = self.namespace_exists(&parent_namespace).await?; + if !parent_exists { + return no_such_namespace_err(&parent_namespace); + } + } - Ok(Namespace::with_properties(namespace.clone(), properties)) - } else { - // set a default property of exists = true - self.execute( - &insert, - vec![ + let namespace_str = namespace.join("."); + let insert = format!( + "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) + VALUES (?, ?, ?, ?)"); + if !properties.is_empty() { + let mut query_args = vec![]; + let mut properties_insert = insert.clone(); + for (index, (key, value)) in properties.iter().enumerate() { + query_args.extend( + [ Some(&self.name), Some(&namespace_str), - Some(&"exists".to_string()), - Some(&"true".to_string()), - ], - None, - ) - .await?; - Ok(Namespace::with_properties(namespace.clone(), properties)) + Some(key), + Some(value), + ] + .iter(), + ); + if index > 0 { + properties_insert = format!("{properties_insert}, (?, ?, ?, ?)"); + } } + + self.execute(&properties_insert, query_args, None).await?; + + Ok(Namespace::with_properties(namespace.clone(), properties)) + } else { + // set a default property of exists = true + // up for debate if this is worthwhile + self.execute( + &insert, + vec![ + Some(&self.name), + Some(&namespace_str), + Some(&"exists".to_string()), + Some(&"true".to_string()), + ], + None, + ) + .await?; + Ok(Namespace::with_properties(namespace.clone(), properties)) } } @@ -466,8 +496,21 @@ impl Catalog for SqlCatalog { } } - async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { - todo!() + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let exists = self.namespace_exists(namespace).await?; + if exists { + // TODO: check that the namespace is empty + self.execute( + &format!("DELETE FROM {NAMESPACE_TABLE_NAME} WHERE {NAMESPACE_FIELD_NAME} = ?"), + vec![Some(&namespace.join("."))], + None, + ) + .await?; + + Ok(()) + } else { + no_such_namespace_err(namespace) + } } async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { @@ -505,8 +548,11 @@ impl Catalog for SqlCatalog { #[cfg(test)] mod tests { + use std::collections::{HashMap, HashSet}; + use std::hash::Hash; + use iceberg::io::FileIOBuilder; - use iceberg::Catalog; + use iceberg::{Catalog, Namespace, NamespaceIdent}; use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; @@ -517,6 +563,14 @@ mod tests { temp_dir.path().to_str().unwrap().to_string() } + fn to_set(vec: Vec) -> HashSet { + HashSet::from_iter(vec) + } + + fn default_properties() -> HashMap { + HashMap::from([("exists".to_string(), "true".to_string())]) + } + async fn new_sql_catalog(warehouse_location: String) -> impl Catalog { let sql_lite_uri = format!("sqlite:{}", temp_path()); sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); @@ -532,6 +586,19 @@ mod tests { SqlCatalog::new(config).await.unwrap() } + async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { + let _ = catalog + .create_namespace(namespace_ident, HashMap::new()) + .await + .unwrap(); + } + + async fn create_namespaces(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) { + for namespace_ident in namespace_idents { + let _ = create_namespace(catalog, namespace_ident).await; + } + } + #[tokio::test] async fn test_initialized() { let warehouse_loc = temp_path(); @@ -540,4 +607,385 @@ mod tests { new_sql_catalog(warehouse_loc.clone()).await; new_sql_catalog(warehouse_loc.clone()).await; } + + #[tokio::test] + async fn test_list_namespaces_returns_empty_vector() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_list_namespaces_returns_multiple_namespaces() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_2]) + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_only_top_level_namespaces() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![ + &namespace_ident_1, + &namespace_ident_2, + &namespace_ident_3, + ]) + .await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1, namespace_ident_3]) + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_no_namespaces_under_parent() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap(), + vec![] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_namespace_under_parent() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".into()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::new("c".into()); + create_namespaces(&catalog, &vec![ + &namespace_ident_1, + &namespace_ident_2, + &namespace_ident_3, + ]) + .await; + + assert_eq!( + to_set(catalog.list_namespaces(None).await.unwrap()), + to_set(vec![namespace_ident_1.clone(), namespace_ident_3]) + ); + + assert_eq!( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap(), + vec![NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()] + ); + } + + #[tokio::test] + async fn test_list_namespaces_returns_multiple_namespaces_under_parent() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_1 = NamespaceIdent::new("a".to_string()); + let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(); + let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(); + let namespace_ident_5 = NamespaceIdent::new("b".into()); + create_namespaces(&catalog, &vec![ + &namespace_ident_1, + &namespace_ident_2, + &namespace_ident_3, + &namespace_ident_4, + &namespace_ident_5, + ]) + .await; + + assert_eq!( + to_set( + catalog + .list_namespaces(Some(&namespace_ident_1)) + .await + .unwrap() + ), + to_set(vec![ + NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(), + NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(), + NamespaceIdent::from_strs(vec!["a", "c"]).unwrap(), + ]) + ); + } + + #[tokio::test] + async fn test_namespace_exists_returns_false() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert!(!catalog + .namespace_exists(&NamespaceIdent::new("b".into())) + .await + .unwrap()); + } + + #[tokio::test] + async fn test_namespace_exists_returns_true() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert!(catalog.namespace_exists(&namespace_ident).await.unwrap()); + } + + #[tokio::test] + async fn test_create_namespace_with_properties() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("abc".into()); + + let mut properties: HashMap = HashMap::new(); + properties.insert("k".into(), "v".into()); + + assert_eq!( + catalog + .create_namespace(&namespace_ident, properties.clone()) + .await + .unwrap(), + Namespace::with_properties(namespace_ident.clone(), properties.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, properties) + ); + } + + #[tokio::test] + async fn test_create_namespace_throws_error_if_namespace_already_exists() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + assert_eq!( + catalog + .create_namespace(&namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => Namespace {:?} already exists", + &namespace_ident + ) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident).await.unwrap(), + Namespace::with_properties(namespace_ident, default_properties()) + ); + } + + #[tokio::test] + async fn test_create_nested_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let parent_namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &parent_namespace_ident).await; + + let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&child_namespace_ident, HashMap::new()) + .await + .unwrap(), + Namespace::new(child_namespace_ident.clone()) + ); + + assert_eq!( + catalog.get_namespace(&child_namespace_ident).await.unwrap(), + Namespace::with_properties(child_namespace_ident, default_properties()) + ); + } + + #[tokio::test] + async fn test_create_deeply_nested_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&namespace_ident_a_b_c, HashMap::new()) + .await + .unwrap(), + Namespace::new(namespace_ident_a_b_c.clone()) + ); + + assert_eq!( + catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(), + Namespace::with_properties(namespace_ident_a_b_c, default_properties()) + ); + } + + #[tokio::test] + async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + + assert_eq!( + catalog + .create_namespace(&nested_namespace_ident, HashMap::new()) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + NamespaceIdent::new("a".into()) + ) + ); + + assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); + } + + #[tokio::test] + async fn test_drop_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident = NamespaceIdent::new("abc".into()); + create_namespace(&catalog, &namespace_ident).await; + + catalog.drop_namespace(&namespace_ident).await.unwrap(); + + assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap()) + } + + #[tokio::test] + async fn test_drop_nested_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + catalog.drop_namespace(&namespace_ident_a_b).await.unwrap(); + + assert!(!catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + + assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + } + + #[tokio::test] + async fn test_drop_deeply_nested_namespace() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); + create_namespaces(&catalog, &vec![ + &namespace_ident_a, + &namespace_ident_a_b, + &namespace_ident_a_b_c, + ]) + .await; + + catalog + .drop_namespace(&namespace_ident_a_b_c) + .await + .unwrap(); + + assert!(!catalog + .namespace_exists(&namespace_ident_a_b_c) + .await + .unwrap()); + + assert!(catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + + assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + } + + #[tokio::test] + async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + + let non_existent_namespace_ident = NamespaceIdent::new("abc".into()); + assert_eq!( + catalog + .drop_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; + + let non_existent_namespace_ident = + NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap(); + assert_eq!( + catalog + .drop_namespace(&non_existent_namespace_ident) + .await + .unwrap_err() + .to_string(), + format!( + "Unexpected => No such namespace: {:?}", + non_existent_namespace_ident + ) + ) + } + + #[tokio::test] + #[ignore = "Java/Python do not drop nested namespaces?"] + async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc).await; + let namespace_ident_a = NamespaceIdent::new("a".into()); + let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); + create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; + + catalog.drop_namespace(&namespace_ident_a).await.unwrap(); + + assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap()); + + assert!(!catalog + .namespace_exists(&namespace_ident_a_b) + .await + .unwrap()); + } } From 4719a5069f949b129d52b1da1c238da4d545d9cf Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Mon, 12 Aug 2024 18:30:31 +0100 Subject: [PATCH 05/12] fix: String to str, remove pub and optimise query builder Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 50 +++++++++++++++++-------------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 94552d8fbf..9c1dc79f34 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -142,24 +142,30 @@ impl SqlCatalog { } /// SQLX Any does not implement PostgresSQL bindings, so we have to do this. - pub fn build_query(&self, query: &str) -> String { + fn replace_placeholders(&self, query: &str) -> String { match self.sql_bind_style { SqlBindStyle::DollarNumeric => { - let mut query = query.to_owned(); - let mut i = 1; - while let Some(pos) = query.find('?') { - query.replace_range(pos..pos + 1, &format!("${}", i)); - i += 1; - } + let mut count = 1; query + .chars() + .fold(String::with_capacity(query.len()), |mut acc, c| { + if c == '?' { + acc.push('$'); + acc.push_str(&count.to_string()); + count += 1; + } else { + acc.push(c); + } + acc + }) } _ => query.to_owned(), } } /// Fetch a vec of AnyRows from a given query - pub async fn fetch_rows(&self, query: &str, args: Vec>) -> Result> { - let query_with_placeholders = self.build_query(query); + pub async fn fetch_rows(&self, query: &str, args: Vec>) -> Result> { + let query_with_placeholders = self.replace_placeholders(query); let mut sqlx_query = sqlx::query(&query_with_placeholders); for arg in args { @@ -171,14 +177,15 @@ impl SqlCatalog { .await .map_err(from_sqlx_error) } + /// Execute statements in a transaction, provided or not pub async fn execute( &self, query: &str, - args: Vec>, + args: Vec>, transaction: Option<&mut Transaction<'_, Any>>, ) -> Result { - let query_with_placeholders = self.build_query(query); + let query_with_placeholders = self.replace_placeholders(query); let mut sqlx_query = sqlx::query(&query_with_placeholders); for arg in args { @@ -313,18 +320,15 @@ impl Catalog for SqlCatalog { "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) VALUES (?, ?, ?, ?)"); if !properties.is_empty() { - let mut query_args = vec![]; + let mut query_args = Vec::with_capacity(properties.len() * 4); let mut properties_insert = insert.clone(); for (index, (key, value)) in properties.iter().enumerate() { - query_args.extend( - [ - Some(&self.name), - Some(&namespace_str), - Some(key), - Some(value), - ] - .iter(), - ); + query_args.extend_from_slice(&[ + Some(self.name.as_str()), + Some(namespace_str.as_str()), + Some(key.as_str()), + Some(value.as_str()), + ]); if index > 0 { properties_insert = format!("{properties_insert}, (?, ?, ?, ?)"); } @@ -341,8 +345,8 @@ impl Catalog for SqlCatalog { vec![ Some(&self.name), Some(&namespace_str), - Some(&"exists".to_string()), - Some(&"true".to_string()), + Some("exists"), + Some("true"), ], None, ) From 1da80b842c85b4fe77e890b1e4ae05e32d8faa8f Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Tue, 13 Aug 2024 19:24:18 +0100 Subject: [PATCH 06/12] fix: nested match, remove ok() Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 87 ++++++++++++++----------------- 1 file changed, 39 insertions(+), 48 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 9c1dc79f34..916b86fb9b 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -222,8 +222,8 @@ impl Catalog for SqlCatalog { ); match parent { - Some(parent) => match self.namespace_exists(parent).await? { - true => { + Some(parent) => { + if self.namespace_exists(parent).await? { let parent_str = parent.join("."); let parent_table_namespaces_stmt = format!( "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" @@ -245,23 +245,20 @@ impl Catalog for SqlCatalog { ) .await?; - Ok(namespace_rows - .iter() - .filter_map(|r| { - let nsp = r.try_get::(0).ok(); - nsp.and_then(|n| { - if n == parent_str { - // Filter out itself - None - } else { - NamespaceIdent::from_strs(n.split(".")).ok() - } - }) - }) - .collect()) + let mut namespaces = Vec::::with_capacity(namespace_rows.len()); + + for row in namespace_rows { + let nsp = row.try_get::(0).map_err(from_sqlx_error)?; + if nsp != parent_str { + namespaces.push(NamespaceIdent::from_strs(nsp.split("."))?); + } + } + + Ok(namespaces) + } else { + no_such_namespace_err(parent) } - false => no_such_namespace_err(parent), - }, + } None => { let namespace_rows = self .fetch_rows( @@ -270,25 +267,18 @@ impl Catalog for SqlCatalog { ) .await?; - Ok(namespace_rows - .iter() - .filter_map(|r| { - let nsp = r.try_get::(0).ok(); - nsp.and_then(|n| { - // for each row, split a.b.c into a, b, c levels - let mut levels = n.split(".").collect::>(); - if !levels.is_empty() { - // only return first-level idents - let first_level = levels.drain(..1).collect::>(); - NamespaceIdent::from_strs(first_level).ok() - } else { - None - } - }) - }) - .collect::>() - .into_iter() - .collect::>()) + let mut namespaces = HashSet::::with_capacity(namespace_rows.len()); + + for row in namespace_rows.iter() { + let nsp = row.try_get::(0).map_err(from_sqlx_error)?; + let mut levels = nsp.split(".").collect::>(); + if !levels.is_empty() { + let first_level = levels.drain(..1).collect::>(); + namespaces.insert(NamespaceIdent::from_strs(first_level)?); + } + } + + Ok(namespaces.into_iter().collect::>()) } } } @@ -373,17 +363,18 @@ impl Catalog for SqlCatalog { ) .await?; - let properties: HashMap = namespace_props - .iter() - .filter_map(|r| { - let key = r.try_get(NAMESPACE_FIELD_PROPERTY_KEY).ok(); - let value = r.try_get(NAMESPACE_FIELD_PROPERTY_VALUE).ok(); - match (key, value) { - (Some(k), Some(v)) => Some((k, v)), - _ => None, - } - }) - .collect(); + let mut properties = HashMap::with_capacity(namespace_props.len()); + + for row in namespace_props { + let key = row + .try_get::(NAMESPACE_FIELD_PROPERTY_KEY) + .map_err(from_sqlx_error)?; + let value = row + .try_get::(NAMESPACE_FIELD_PROPERTY_VALUE) + .map_err(from_sqlx_error)?; + + properties.insert(key, value); + } Ok(Namespace::with_properties(namespace.clone(), properties)) } else { From 328a6e5a984f3c243a560f86cebbf5b0fd9baf3a Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Mon, 19 Aug 2024 19:09:49 +0100 Subject: [PATCH 07/12] fix: remove pub, add set, add comments Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 916b86fb9b..8ee77d1113 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -164,7 +164,7 @@ impl SqlCatalog { } /// Fetch a vec of AnyRows from a given query - pub async fn fetch_rows(&self, query: &str, args: Vec>) -> Result> { + async fn fetch_rows(&self, query: &str, args: Vec>) -> Result> { let query_with_placeholders = self.replace_placeholders(query); let mut sqlx_query = sqlx::query(&query_with_placeholders); @@ -179,7 +179,7 @@ impl SqlCatalog { } /// Execute statements in a transaction, provided or not - pub async fn execute( + async fn execute( &self, query: &str, args: Vec>, @@ -245,16 +245,18 @@ impl Catalog for SqlCatalog { ) .await?; - let mut namespaces = Vec::::with_capacity(namespace_rows.len()); + let mut namespaces = + HashSet::::with_capacity(namespace_rows.len()); for row in namespace_rows { let nsp = row.try_get::(0).map_err(from_sqlx_error)?; + // if parent = a, then we only want to see a.b, a.c returned. if nsp != parent_str { - namespaces.push(NamespaceIdent::from_strs(nsp.split("."))?); + namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?); } } - Ok(namespaces) + Ok(namespaces.into_iter().collect::>()) } else { no_such_namespace_err(parent) } From b0b731c2ed2cab8ffd3df381b17f989e8b8f6380 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Mon, 19 Aug 2024 20:34:06 +0100 Subject: [PATCH 08/12] fix: refactor list_namespaces slightly Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 87 +++++++++++-------------------- 1 file changed, 31 insertions(+), 56 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 8ee77d1113..9065672da0 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -210,78 +210,53 @@ impl Catalog for SqlCatalog { &self, parent: Option<&NamespaceIdent>, ) -> Result> { - let table_namespaces_stmt = format!( + // UNION will remove duplicates. + let all_namespaces_stmt = format!( "SELECT {CATALOG_FIELD_TABLE_NAMESPACE} FROM {CATALOG_TABLE_NAME} - WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" - ); - let namespaces_stmt = format!( - "SELECT {NAMESPACE_FIELD_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + UNION + SELECT {NAMESPACE_FIELD_NAME} FROM {NAMESPACE_TABLE_NAME} WHERE {CATALOG_FIELD_CATALOG_NAME} = ?" ); - match parent { - Some(parent) => { - if self.namespace_exists(parent).await? { - let parent_str = parent.join("."); - let parent_table_namespaces_stmt = format!( - "{table_namespaces_stmt} AND {CATALOG_FIELD_TABLE_NAMESPACE} LIKE CONCAT(?, '%')" - ); - let parent_namespaces_stmt = - format!("{namespaces_stmt} AND {NAMESPACE_FIELD_NAME} LIKE CONCAT(?, '%')"); - - let namespace_rows = self - .fetch_rows( - &format!( - "{parent_namespaces_stmt} UNION {parent_table_namespaces_stmt}" - ), - vec![ - Some(&self.name), - Some(&parent_str), - Some(&self.name), - Some(&parent_str), - ], - ) - .await?; - - let mut namespaces = - HashSet::::with_capacity(namespace_rows.len()); - - for row in namespace_rows { - let nsp = row.try_get::(0).map_err(from_sqlx_error)?; - // if parent = a, then we only want to see a.b, a.c returned. - if nsp != parent_str { - namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?); - } - } + let namespace_rows = self + .fetch_rows(&all_namespaces_stmt, vec![ + Some(&self.name), + Some(&self.name), + ]) + .await?; - Ok(namespaces.into_iter().collect::>()) - } else { - no_such_namespace_err(parent) - } - } - None => { - let namespace_rows = self - .fetch_rows( - &format!("{namespaces_stmt} UNION {table_namespaces_stmt}"), - vec![Some(&self.name), Some(&self.name)], - ) - .await?; + let mut namespaces = HashSet::::with_capacity(namespace_rows.len()); - let mut namespaces = HashSet::::with_capacity(namespace_rows.len()); + if let Some(parent) = parent { + if self.namespace_exists(parent).await? { + let parent_str = parent.join("."); for row in namespace_rows.iter() { let nsp = row.try_get::(0).map_err(from_sqlx_error)?; - let mut levels = nsp.split(".").collect::>(); - if !levels.is_empty() { - let first_level = levels.drain(..1).collect::>(); - namespaces.insert(NamespaceIdent::from_strs(first_level)?); + // if parent = a, then we only want to see a.b, a.c returned. + if nsp != parent_str && nsp.starts_with(&parent_str) { + namespaces.insert(NamespaceIdent::from_strs(nsp.split("."))?); } } Ok(namespaces.into_iter().collect::>()) + } else { + no_such_namespace_err(parent) } + } else { + for row in namespace_rows.iter() { + let nsp = row.try_get::(0).map_err(from_sqlx_error)?; + let mut levels = nsp.split(".").collect::>(); + if !levels.is_empty() { + let first_level = levels.drain(..1).collect::>(); + namespaces.insert(NamespaceIdent::from_strs(first_level)?); + } + } + + Ok(namespaces.into_iter().collect::>()) } } From b63ecd54c20013fa7c5cc18649f44553b6468f90 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Wed, 21 Aug 2024 19:48:33 +0100 Subject: [PATCH 09/12] fix: add default properties to all new namespaces Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 43 ++++++++++++++----------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 9065672da0..4608e45635 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -287,9 +287,12 @@ impl Catalog for SqlCatalog { "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) VALUES (?, ?, ?, ?)"); if !properties.is_empty() { - let mut query_args = Vec::with_capacity(properties.len() * 4); - let mut properties_insert = insert.clone(); - for (index, (key, value)) in properties.iter().enumerate() { + let mut insert_properties = properties.clone(); + insert_properties.insert("exists".to_string(), "true".to_string()); + + let mut query_args = Vec::with_capacity(insert_properties.len() * 4); + let mut insert_stmt = insert.clone(); + for (index, (key, value)) in insert_properties.iter().enumerate() { query_args.extend_from_slice(&[ Some(self.name.as_str()), Some(namespace_str.as_str()), @@ -297,16 +300,18 @@ impl Catalog for SqlCatalog { Some(value.as_str()), ]); if index > 0 { - properties_insert = format!("{properties_insert}, (?, ?, ?, ?)"); + insert_stmt.push_str(", (?, ?, ?, ?)"); } } - self.execute(&properties_insert, query_args, None).await?; + self.execute(&insert_stmt, query_args, None).await?; - Ok(Namespace::with_properties(namespace.clone(), properties)) + Ok(Namespace::with_properties( + namespace.clone(), + insert_properties, + )) } else { // set a default property of exists = true - // up for debate if this is worthwhile self.execute( &insert, vec![ @@ -468,21 +473,8 @@ impl Catalog for SqlCatalog { } } - async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { - let exists = self.namespace_exists(namespace).await?; - if exists { - // TODO: check that the namespace is empty - self.execute( - &format!("DELETE FROM {NAMESPACE_TABLE_NAME} WHERE {NAMESPACE_FIELD_NAME} = ?"), - vec![Some(&namespace.join("."))], - None, - ) - .await?; - - Ok(()) - } else { - no_such_namespace_err(namespace) - } + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { + todo!() } async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { @@ -729,7 +721,7 @@ mod tests { let catalog = new_sql_catalog(warehouse_loc).await; let namespace_ident = NamespaceIdent::new("abc".into()); - let mut properties: HashMap = HashMap::new(); + let mut properties = default_properties(); properties.insert("k".into(), "v".into()); assert_eq!( @@ -841,6 +833,7 @@ mod tests { } #[tokio::test] + #[ignore = "drop_namespace not implemented"] async fn test_drop_namespace() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -853,6 +846,7 @@ mod tests { } #[tokio::test] + #[ignore = "drop_namespace not implemented"] async fn test_drop_nested_namespace() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -871,6 +865,7 @@ mod tests { } #[tokio::test] + #[ignore = "drop_namespace not implemented"] async fn test_drop_deeply_nested_namespace() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -903,6 +898,7 @@ mod tests { } #[tokio::test] + #[ignore = "drop_namespace not implemented"] async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; @@ -922,6 +918,7 @@ mod tests { } #[tokio::test] + #[ignore = "drop_namespace not implemented"] async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; From e4c8bdfeafe195f1bbb1c4d16d34696de8bdbeec Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Sat, 31 Aug 2024 15:50:21 +0100 Subject: [PATCH 10/12] fix: remove check for nested namespace Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 4608e45635..d2b13d39df 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -274,14 +274,6 @@ impl Catalog for SqlCatalog { )); } - for i in 1..namespace.len() { - let parent_namespace = NamespaceIdent::from_vec(namespace[..i].to_vec())?; - let parent_exists = self.namespace_exists(&parent_namespace).await?; - if !parent_exists { - return no_such_namespace_err(&parent_namespace); - } - } - let namespace_str = namespace.join("."); let insert = format!( "INSERT INTO {NAMESPACE_TABLE_NAME} ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}, {NAMESPACE_FIELD_PROPERTY_VALUE}) @@ -810,28 +802,6 @@ mod tests { ); } - #[tokio::test] - async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() { - let warehouse_loc = temp_path(); - let catalog = new_sql_catalog(warehouse_loc).await; - - let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); - - assert_eq!( - catalog - .create_namespace(&nested_namespace_ident, HashMap::new()) - .await - .unwrap_err() - .to_string(), - format!( - "Unexpected => No such namespace: {:?}", - NamespaceIdent::new("a".into()) - ) - ); - - assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); - } - #[tokio::test] #[ignore = "drop_namespace not implemented"] async fn test_drop_namespace() { From f4ff9bd11bcc0d33c4b2ca9e48c96240431bfe48 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Sun, 1 Sep 2024 11:52:12 +0100 Subject: [PATCH 11/12] chore: add more comments to the CatalogConfig to explain bind styles Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index d2b13d39df..b93cf13bec 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -47,7 +47,14 @@ static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if n static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning -/// Sql catalog config +/// A struct representing the SQL catalog configuration. +/// +/// This struct contains various parameters that are used to configure a SQL catalog, +/// such as the database URI, warehouse location, and file I/O settings. +/// You are required to provide a `SqlBindStyle`, which determines how SQL statements will be bound to values in the catalog. +/// The options available for this parameter include: +/// - `SqlBindStyle::DollarNumeric`: Binds SQL statements using `$1`, `$2`, etc., as placeholders. This is for PostgreSQL databases. +/// - `SqlBindStyle::QuestionMark`: Binds SQL statements using `?` as a placeholder. This is for MySQL and SQLite databases. #[derive(Debug, TypedBuilder)] pub struct SqlCatalogConfig { uri: String, From 2611fe2bacdb8162631c2e5fa627f53df1ec2cbc Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Mon, 2 Sep 2024 18:42:02 +0100 Subject: [PATCH 12/12] fix: edit test for nested namespaces Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index b93cf13bec..c6a524cea1 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -917,8 +917,8 @@ mod tests { } #[tokio::test] - #[ignore = "Java/Python do not drop nested namespaces?"] - async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() { + #[ignore = "drop_namespace not implemented"] + async fn test_dropping_a_namespace_does_not_drop_namespaces_nested_under_that_one() { let warehouse_loc = temp_path(); let catalog = new_sql_catalog(warehouse_loc).await; let namespace_ident_a = NamespaceIdent::new("a".into()); @@ -929,7 +929,7 @@ mod tests { assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap()); - assert!(!catalog + assert!(catalog .namespace_exists(&namespace_ident_a_b) .await .unwrap());