From edabc14dd77ddf1f5a6223a04de3b5c17a68eae4 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Mon, 5 Aug 2024 18:57:51 +0100 Subject: [PATCH 1/3] feat: initialise SQL Catalog Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/Cargo.toml | 43 +++++ crates/catalog/sql/src/catalog.rs | 283 ++++++++++++++++++++++++++++++ crates/catalog/sql/src/error.rs | 27 +++ crates/catalog/sql/src/lib.rs | 24 +++ 4 files changed, 377 insertions(+) create mode 100644 crates/catalog/sql/Cargo.toml create mode 100644 crates/catalog/sql/src/catalog.rs create mode 100644 crates/catalog/sql/src/error.rs create mode 100644 crates/catalog/sql/src/lib.rs diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml new file mode 100644 index 0000000000..292e60171a --- /dev/null +++ b/crates/catalog/sql/Cargo.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "iceberg-catalog-sql" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +rust-version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Rust Sql Catalog" +repository = { workspace = true } +license = { workspace = true } +keywords = ["iceberg", "sql", "catalog"] + +[dependencies] +async-trait = { workspace = true } +iceberg = { workspace = true } +sqlx = { version = "0.7.4", features = ["tls-rustls", "any"], default-features = false } +typed-builder = { workspace = true } + +[dev-dependencies] +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +itertools = { workspace = true } +regex = "1.10.5" +sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", "sqlite", "migrate"], default-features = false } +tempfile = { workspace = true } +tokio = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs new file mode 100644 index 0000000000..e4cd9ebf45 --- /dev/null +++ b/crates/catalog/sql/src/catalog.rs @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::borrow::Cow; +use std::collections::HashMap; +use std::time::Duration; + +use async_trait::async_trait; +use iceberg::io::FileIO; +use iceberg::table::Table; +use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent}; +use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow}; +use sqlx::AnyPool; +use typed_builder::TypedBuilder; + +use crate::error::from_sqlx_error; + +static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables"; +static CATALOG_NAME: &str = "catalog_name"; +static TABLE_NAME: &str = "table_name"; +static TABLE_NAMESPACE: &str = "table_namespace"; +static METADATA_LOCATION_PROP: &str = "metadata_location"; +static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; +static RECORD_TYPE: &str = "iceberg_type"; + +static NAMESPACE_PROPERTIES_TABLE_NAME: &str = "iceberg_namespace_properties"; +static NAMESPACE_NAME: &str = "namespace"; +static NAMESPACE_PROPERTY_KEY: &str = "property_key"; +static NAMESPACE_PROPERTY_VALUE: &str = "property_value"; + +static MAX_CONNECTIONS: u32 = 10; +static IDLE_TIMEOUT: u64 = 10; +static TEST_BEFORE_ACQUIRE: bool = true; + +/// Sql catalog config +#[derive(Debug, TypedBuilder)] +pub struct SqlCatalogConfig { + uri: String, + name: String, + warehouse_location: String, + file_io: FileIO, + #[builder(default)] + props: HashMap, +} + +#[derive(Debug)] +/// Sql catalog implementation. 
+pub struct SqlCatalog { + _name: String, + connection: AnyPool, + _warehouse_location: String, + _fileio: FileIO, + backend: DatabaseType, +} + +#[derive(Debug, PartialEq)] +enum DatabaseType { + PostgreSQL, + MySQL, + SQLite, +} + +impl SqlCatalog { + /// Create new sql catalog instance + pub async fn new(config: SqlCatalogConfig) -> Result { + install_default_drivers(); + let max_connections: u32 = config + .props + .get("pool.max-connections") + .map(|v| v.parse().unwrap()) + .unwrap_or(MAX_CONNECTIONS); + let idle_timeout: u64 = config + .props + .get("pool.idle-timeout") + .map(|v| v.parse().unwrap()) + .unwrap_or(IDLE_TIMEOUT); + let test_before_acquire: bool = config + .props + .get("pool.test-before-acquire") + .map(|v| v.parse().unwrap()) + .unwrap_or(TEST_BEFORE_ACQUIRE); + + let pool = AnyPoolOptions::new() + .max_connections(max_connections) + .idle_timeout(Duration::from_secs(idle_timeout)) + .test_before_acquire(test_before_acquire) + .connect(&config.uri) + .await + .map_err(from_sqlx_error)?; + + let conn = pool.acquire().await.map_err(from_sqlx_error)?; + + let db_type = match conn.backend_name() { + "PostgreSQL" => DatabaseType::PostgreSQL, + "MySQL" => DatabaseType::MySQL, + "SQLite" => DatabaseType::SQLite, + _ => DatabaseType::SQLite, + }; + + sqlx::query(&format!( + "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_VIEW_NAME} ( + {CATALOG_NAME} VARCHAR(255) NOT NULL, + {TABLE_NAMESPACE} VARCHAR(255) NOT NULL, + {TABLE_NAME} VARCHAR(255) NOT NULL, + {METADATA_LOCATION_PROP} VARCHAR(1000), + {PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000), + {RECORD_TYPE} VARCHAR(5), + PRIMARY KEY ({CATALOG_NAME}, {TABLE_NAMESPACE}, {TABLE_NAME}))" + )) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; + + sqlx::query(&format!( + "CREATE TABLE IF NOT EXISTS {NAMESPACE_PROPERTIES_TABLE_NAME} ( + {CATALOG_NAME} VARCHAR(255) NOT NULL, + {NAMESPACE_NAME} VARCHAR(255) NOT NULL, + {NAMESPACE_PROPERTY_KEY} VARCHAR(255), + {NAMESPACE_PROPERTY_VALUE} VARCHAR(1000), + 
PRIMARY KEY ({CATALOG_NAME}, {NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}))" + )) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; + + Ok(SqlCatalog { + _name: config.name.to_owned(), + connection: pool, + _warehouse_location: config.warehouse_location, + _fileio: config.file_io, + backend: db_type, + }) + } + + /// SQLX Any does not implement PostgreSQL bindings, so we have to do this. + pub async fn execute_statement( + &self, + query: &String, + args: Vec>, + ) -> Result> { + let query_with_placeholders: Cow = if self.backend == DatabaseType::PostgreSQL { + let mut query = query.clone(); + for i in 0..args.len() { + query = query.replacen("?", &format!("${}", i + 1), 1); + } + Cow::Owned(query) + } else { + Cow::Borrowed(query) + }; + + let mut sqlx_query = sqlx::query(&query_with_placeholders); + for arg in args { + sqlx_query = sqlx_query.bind(arg); + } + + sqlx_query + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error) + } +} + +#[async_trait] +impl Catalog for SqlCatalog { + async fn list_namespaces( + &self, + _parent: Option<&NamespaceIdent>, + ) -> Result> { + todo!() + } + + async fn create_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result { + todo!() + } + + async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result { + todo!() + } + + async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result { + todo!() + } + + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap, + ) -> Result<()> { + todo!() + } + + async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { + todo!() + } + + async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result> { + todo!() + } + + async fn table_exists(&self, _identifier: &TableIdent) -> Result { + todo!() + } + + async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> { + todo!() + } + + async fn load_table(&self, _identifier: &TableIdent) -> Result { + todo!() + } + + 
async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> Result
{ + todo!() + } + + async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { + todo!() + } + + async fn update_table(&self, _commit: TableCommit) -> Result
{ + todo!() + } +} + +#[cfg(test)] +mod tests { + use iceberg::io::FileIOBuilder; + use iceberg::Catalog; + use sqlx::migrate::MigrateDatabase; + use tempfile::TempDir; + + use crate::{SqlCatalog, SqlCatalogConfig}; + + fn temp_path() -> String { + let temp_dir = TempDir::new().unwrap(); + temp_dir.path().to_str().unwrap().to_string() + } + + async fn new_sql_catalog(warehouse_location: String) -> impl Catalog { + let sql_lite_uri = format!("sqlite:{}", temp_path()); + sqlx::Sqlite::create_database(&sql_lite_uri).await.unwrap(); + + let config = SqlCatalogConfig::builder() + .uri(sql_lite_uri.to_string()) + .name("iceberg".to_string()) + .warehouse_location(warehouse_location) + .file_io(FileIOBuilder::new_fs_io().build().unwrap()) + .build(); + + SqlCatalog::new(config).await.unwrap() + } + + #[tokio::test] + async fn test_initialized() { + let warehouse_loc = temp_path(); + new_sql_catalog(warehouse_loc.clone()).await; + // catalog instantiation should not fail even if tables exist + new_sql_catalog(warehouse_loc.clone()).await; + new_sql_catalog(warehouse_loc.clone()).await; + } +} diff --git a/crates/catalog/sql/src/error.rs b/crates/catalog/sql/src/error.rs new file mode 100644 index 0000000000..90bba1f05d --- /dev/null +++ b/crates/catalog/sql/src/error.rs @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iceberg::{Error, ErrorKind}; + +/// Format an sqlx error into iceberg error. +pub fn from_sqlx_error(error: sqlx::Error) -> Error { + Error::new( + ErrorKind::Unexpected, + "operation failed for hitting sqlx error".to_string(), + ) + .with_source(error) +} diff --git a/crates/catalog/sql/src/lib.rs b/crates/catalog/sql/src/lib.rs new file mode 100644 index 0000000000..6861dab3f8 --- /dev/null +++ b/crates/catalog/sql/src/lib.rs @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg sql catalog implementation. 
+ +#![deny(missing_docs)] + +mod catalog; +mod error; +pub use catalog::*; From acc8b6e0773ff64443254acc3f71f9e11ecc9209 Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Tue, 6 Aug 2024 19:18:06 +0100 Subject: [PATCH 2/3] fix: remove tls-rustls Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 292e60171a..5d145351c0 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -31,7 +31,7 @@ keywords = ["iceberg", "sql", "catalog"] [dependencies] async-trait = { workspace = true } iceberg = { workspace = true } -sqlx = { version = "0.7.4", features = ["tls-rustls", "any"], default-features = false } +sqlx = { version = "0.7.4", features = ["any"], default-features = false } typed-builder = { workspace = true } [dev-dependencies] From 7343b79efb405a5c53983ce7a1a1d9fa322c858e Mon Sep 17 00:00:00 2001 From: callum-ryan <19956159+callum-ryan@users.noreply.github.com> Date: Tue, 6 Aug 2024 19:19:06 +0100 Subject: [PATCH 3/3] feat: change to SqlBindStyle and rename consts Signed-off-by: callum-ryan <19956159+callum-ryan@users.noreply.github.com> --- crates/catalog/sql/src/catalog.rs | 106 ++++++++++++++---------------- 1 file changed, 51 insertions(+), 55 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index e4cd9ebf45..078fff6903 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -29,22 +29,22 @@ use typed_builder::TypedBuilder; use crate::error::from_sqlx_error; -static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables"; -static CATALOG_NAME: &str = "catalog_name"; -static TABLE_NAME: &str = "table_name"; -static TABLE_NAMESPACE: &str = "table_namespace"; -static METADATA_LOCATION_PROP: &str = "metadata_location"; -static 
PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; -static RECORD_TYPE: &str = "iceberg_type"; - -static NAMESPACE_PROPERTIES_TABLE_NAME: &str = "iceberg_namespace_properties"; -static NAMESPACE_NAME: &str = "namespace"; -static NAMESPACE_PROPERTY_KEY: &str = "property_key"; -static NAMESPACE_PROPERTY_VALUE: &str = "property_value"; - -static MAX_CONNECTIONS: u32 = 10; -static IDLE_TIMEOUT: u64 = 10; -static TEST_BEFORE_ACQUIRE: bool = true; +static CATALOG_TABLE_NAME: &str = "iceberg_tables"; +static CATALOG_FIELD_CATALOG_NAME: &str = "catalog_name"; +static CATALOG_FIELD_TABLE_NAME: &str = "table_name"; +static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace"; +static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location"; +static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location"; +static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type"; + +static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties"; +static NAMESPACE_FIELD_NAME: &str = "namespace"; +static NAMESPACE_FIELD_PROPERTY_KEY: &str = "property_key"; +static NAMESPACE_FIELD_PROPERTY_VALUE: &str = "property_value"; + +static MAX_CONNECTIONS: u32 = 10; // Default the SQL pool to 10 connections if not provided +static IDLE_TIMEOUT: u64 = 10; // Default the maximum idle timeout per connection to 10s before it is closed +static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each connection to enabled prior to returning /// Sql catalog config #[derive(Debug, TypedBuilder)] @@ -53,6 +53,7 @@ pub struct SqlCatalogConfig { name: String, warehouse_location: String, file_io: FileIO, + sql_bind_style: SqlBindStyle, #[builder(default)] props: HashMap, } @@ -64,14 +65,16 @@ pub struct SqlCatalog { connection: AnyPool, _warehouse_location: String, _fileio: FileIO, - backend: DatabaseType, + sql_bind_style: SqlBindStyle, } #[derive(Debug, PartialEq)] -enum DatabaseType { - PostgreSQL, - MySQL, - SQLite, +/// Set the SQL 
parameter bind style to either $1..$N (Postgres style) or ? (SQLite/MySQL/MariaDB) +pub enum SqlBindStyle { + /// DollarNumeric uses parameters of the form `$1..$N`, which is the Postgres style + DollarNumeric, + /// QMark uses parameters of the form `?` which is the style for other dialects (SQLite/MySQL/MariaDB) + QMark, } impl SqlCatalog { @@ -102,36 +105,27 @@ impl SqlCatalog { .await .map_err(from_sqlx_error)?; - let conn = pool.acquire().await.map_err(from_sqlx_error)?; - - let db_type = match conn.backend_name() { - "PostgreSQL" => DatabaseType::PostgreSQL, - "MySQL" => DatabaseType::MySQL, - "SQLite" => DatabaseType::SQLite, - _ => DatabaseType::SQLite, - }; - sqlx::query(&format!( - "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_VIEW_NAME} ( - {CATALOG_NAME} VARCHAR(255) NOT NULL, - {TABLE_NAMESPACE} VARCHAR(255) NOT NULL, - {TABLE_NAME} VARCHAR(255) NOT NULL, - {METADATA_LOCATION_PROP} VARCHAR(1000), - {PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000), - {RECORD_TYPE} VARCHAR(5), - PRIMARY KEY ({CATALOG_NAME}, {TABLE_NAMESPACE}, {TABLE_NAME}))" + "CREATE TABLE IF NOT EXISTS {CATALOG_TABLE_NAME} ( + {CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL, + {CATALOG_FIELD_TABLE_NAMESPACE} VARCHAR(255) NOT NULL, + {CATALOG_FIELD_TABLE_NAME} VARCHAR(255) NOT NULL, + {CATALOG_FIELD_METADATA_LOCATION_PROP} VARCHAR(1000), + {CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP} VARCHAR(1000), + {CATALOG_FIELD_RECORD_TYPE} VARCHAR(5), + PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}))" )) .execute(&pool) .await .map_err(from_sqlx_error)?; sqlx::query(&format!( - "CREATE TABLE IF NOT EXISTS {NAMESPACE_PROPERTIES_TABLE_NAME} ( - {CATALOG_NAME} VARCHAR(255) NOT NULL, - {NAMESPACE_NAME} VARCHAR(255) NOT NULL, - {NAMESPACE_PROPERTY_KEY} VARCHAR(255), - {NAMESPACE_PROPERTY_VALUE} VARCHAR(1000), - PRIMARY KEY ({CATALOG_NAME}, {NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}))" + "CREATE TABLE IF NOT EXISTS {NAMESPACE_TABLE_NAME} ( + 
{CATALOG_FIELD_CATALOG_NAME} VARCHAR(255) NOT NULL, + {NAMESPACE_FIELD_NAME} VARCHAR(255) NOT NULL, + {NAMESPACE_FIELD_PROPERTY_KEY} VARCHAR(255), + {NAMESPACE_FIELD_PROPERTY_VALUE} VARCHAR(1000), + PRIMARY KEY ({CATALOG_FIELD_CATALOG_NAME}, {NAMESPACE_FIELD_NAME}, {NAMESPACE_FIELD_PROPERTY_KEY}))" )) .execute(&pool) .await @@ -142,7 +136,7 @@ impl SqlCatalog { connection: pool, _warehouse_location: config.warehouse_location, _fileio: config.file_io, - backend: db_type, + sql_bind_style: config.sql_bind_style, }) } @@ -152,15 +146,16 @@ impl SqlCatalog { query: &String, args: Vec>, ) -> Result> { - let query_with_placeholders: Cow = if self.backend == DatabaseType::PostgreSQL { - let mut query = query.clone(); - for i in 0..args.len() { - query = query.replacen("?", &format!("${}", i + 1), 1); - } - Cow::Owned(query) - } else { - Cow::Borrowed(query) - }; + let query_with_placeholders: Cow = + if self.sql_bind_style == SqlBindStyle::DollarNumeric { + let mut query = query.clone(); + for i in 0..args.len() { + query = query.replacen("?", &format!("${}", i + 1), 1); + } + Cow::Owned(query) + } else { + Cow::Borrowed(query) + }; let mut sqlx_query = sqlx::query(&query_with_placeholders); for arg in args { @@ -251,7 +246,7 @@ mod tests { use sqlx::migrate::MigrateDatabase; use tempfile::TempDir; - use crate::{SqlCatalog, SqlCatalogConfig}; + use crate::{SqlBindStyle, SqlCatalog, SqlCatalogConfig}; fn temp_path() -> String { let temp_dir = TempDir::new().unwrap(); @@ -267,6 +262,7 @@ mod tests { .name("iceberg".to_string()) .warehouse_location(warehouse_location) .file_io(FileIOBuilder::new_fs_io().build().unwrap()) + .sql_bind_style(SqlBindStyle::QMark) .build(); SqlCatalog::new(config).await.unwrap()