From 22aca1454555348dfa3ad7310a96a82a2de09a88 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 7 Nov 2025 16:25:31 +0300 Subject: [PATCH 1/4] Allow only s3 tables volumes --- .gitignore | 1 + Cargo.lock | 1 + crates/api-iceberg-rest/src/error.rs | 5 +- crates/core-executor/src/error.rs | 6 + crates/core-metastore/Cargo.toml | 1 + crates/core-metastore/src/basic/config.rs | 98 ++++++ crates/core-metastore/src/basic/metastore.rs | 323 +++++++++++++++++++ crates/core-metastore/src/basic/mod.rs | 2 + crates/core-metastore/src/error.rs | 7 + crates/core-metastore/src/lib.rs | 1 + crates/core-utils/src/scan_iterator.rs | 34 +- crates/embucketd/src/cli.rs | 7 + crates/embucketd/src/main.rs | 12 +- metastore.yaml_example | 10 + 14 files changed, 500 insertions(+), 8 deletions(-) create mode 100644 crates/core-metastore/src/basic/config.rs create mode 100644 crates/core-metastore/src/basic/metastore.rs create mode 100644 crates/core-metastore/src/basic/mod.rs create mode 100644 metastore.yaml_example diff --git a/.gitignore b/.gitignore index 0de61d7a6..4ea068b04 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ venv __pycache__ .env +metastore.yaml alloc.log slatedb-prefix/ diff --git a/Cargo.lock b/Cargo.lock index 14537ff19..92b08e112 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2327,6 +2327,7 @@ dependencies = [ "regex", "serde", "serde_json", + "serde_yaml", "slatedb", "snafu", "strum 0.27.2", diff --git a/crates/api-iceberg-rest/src/error.rs b/crates/api-iceberg-rest/src/error.rs index 7059adc1c..1eaa687d5 100644 --- a/crates/api-iceberg-rest/src/error.rs +++ b/crates/api-iceberg-rest/src/error.rs @@ -96,7 +96,10 @@ impl IntoResponse for Error { | core_metastore::Error::Serde { .. } | core_metastore::Error::TableMetadataBuilder { .. } | core_metastore::Error::TableObjectStoreNotFound { .. } - | core_metastore::Error::UrlParse { .. } => http::StatusCode::INTERNAL_SERVER_ERROR, + | core_metastore::Error::UrlParse { .. } + | core_metastore::Error::NotYetImplemented { .. } => { + http::StatusCode::INTERNAL_SERVER_ERROR + } }; // Record the result as part of the current span. diff --git a/crates/core-executor/src/error.rs b/crates/core-executor/src/error.rs index 6d32f4167..bf3257100 100644 --- a/crates/core-executor/src/error.rs +++ b/crates/core-executor/src/error.rs @@ -449,6 +449,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Unimplemented functionality: '{name}'"))] + UnimplementedFunctionality { + name: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("SQL parser error: {error}"))] SqlParser { #[snafu(source)] diff --git a/crates/core-metastore/Cargo.toml b/crates/core-metastore/Cargo.toml index e8e5c55e4..3a43312cc 100644 --- a/crates/core-metastore/Cargo.toml +++ b/crates/core-metastore/Cargo.toml @@ -19,6 +19,7 @@ iceberg-rust-spec = { workspace = true } object_store = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +serde_yaml = { workspace = true } slatedb = { workspace = true } snafu = { workspace = true } strum = { workspace = true } diff --git a/crates/core-metastore/src/basic/config.rs b/crates/core-metastore/src/basic/config.rs new file mode 100644 index 000000000..42622e10a --- /dev/null +++ b/crates/core-metastore/src/basic/config.rs @@ -0,0 +1,98 @@ +use crate::models::{Database, S3TablesVolume, Volume, VolumeIdent, VolumeType}; +use serde::{Deserialize, Serialize}; + +/// Configuration for the basic metastore loaded from YAML +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BasicMetastoreConfig { + pub volumes: Vec, +} + +/// Volume configuration with optional databases +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VolumeConfig { + pub name: String, + #[serde(flatten)] + pub volume_type: S3TablesVolumeType, + #[serde(default)] + pub databases: Vec, +} + +/// Only `S3Tables` volume type is supported in basic metastore +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum S3TablesVolumeType { + S3Tables(S3TablesVolume), +} + +/// Database configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DatabaseConfig { + pub name: String, +} + +impl BasicMetastoreConfig { + /// Load configuration from YAML file + pub fn from_yaml_file(path: &str) -> Result { + let content = std::fs::read_to_string(path) + .map_err(|e| format!("Failed to read config file: {e}"))?; + Self::from_yaml_str(&content) + } + + /// Load configuration from YAML string + pub fn from_yaml_str(yaml: &str) -> Result { + serde_yaml::from_str(yaml).map_err(|e| format!("Failed to parse config YAML: {e}")) + } +} + +impl VolumeConfig { + /// Convert to metastore Volume + #[must_use] + pub fn to_volume(&self) -> Volume { + Volume { + ident: self.name.clone(), + volume: match &self.volume_type { + S3TablesVolumeType::S3Tables(s3tables) => VolumeType::S3Tables(s3tables.clone()), + }, + } + } +} + +impl DatabaseConfig { + /// Convert to metastore Database + #[must_use] + pub fn to_database(&self, volume: &VolumeIdent) -> Database { + Database { + ident: self.name.clone(), + volume: volume.clone(), + properties: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_config() { + let yaml = r#" +volumes: + - name: my_volume + type: s3_tables + arn: "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket" + endpoint: "https://s3tables.us-east-1.amazonaws.com" + credentials: !AccessKey + aws_access_key_id: "AKIAIOSFODNN7EXAMPLE" + aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + databases: + - name: my_db + - name: another_db +"#; + + let config = BasicMetastoreConfig::from_yaml_str(yaml).expect("Failed to parse config"); + assert_eq!(config.volumes.len(), 1); + assert_eq!(config.volumes[0].name, "my_volume"); + assert_eq!(config.volumes[0].databases.len(), 2); + assert_eq!(config.volumes[0].databases[0].name, "my_db"); + } +} diff --git a/crates/core-metastore/src/basic/metastore.rs b/crates/core-metastore/src/basic/metastore.rs new file mode 100644 index 000000000..976a3af99 --- /dev/null +++ b/crates/core-metastore/src/basic/metastore.rs @@ -0,0 +1,323 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use crate::{ + Metastore, + basic::config::BasicMetastoreConfig, + error::{self as metastore_error, Result}, + models::{ + RwObject, + database::{Database, DatabaseIdent}, + schema::{Schema, SchemaIdent}, + table::{Table, TableCreateRequest, TableIdent, TableUpdate}, + volumes::{Volume, VolumeIdent}, + }, +}; +use async_trait::async_trait; +use core_utils::scan_iterator::VecScanIterator; +use dashmap::DashMap; +use object_store::ObjectStore; + +/// Basic metastore implementation that reads volumes and databases from config +/// and returns "Not yet implemented" for write operations and schema/table operations +#[derive(Debug)] +pub struct BasicMetastore { + volumes: HashMap>, + databases: HashMap>, + object_store_cache: DashMap>, +} + +impl BasicMetastore { + /// Create a new `BasicMetastore` from configuration + #[must_use] + pub fn new(config: BasicMetastoreConfig) -> Self { + let mut volumes = HashMap::new(); + let mut databases = HashMap::new(); + + for volume_config in config.volumes { + let volume = volume_config.to_volume(); + let volume_ident = volume.ident.clone(); + + // Add volume + volumes.insert(volume_ident.clone(), RwObject::new(volume)); + + // Add databases for this volume + for db_config in volume_config.databases { + let database = db_config.to_database(&volume_ident); + let db_ident = database.ident.clone(); + databases.insert(db_ident, RwObject::new(database)); + } + } + + Self { + volumes, + databases, + object_store_cache: DashMap::new(), + } + } + + /// Create from YAML file + pub fn from_yaml_file(path: &str) -> Result { + let config = BasicMetastoreConfig::from_yaml_file(path).map_err(|e| { + metastore_error::NotYetImplementedSnafu { + operation: format!("Failed to load config: {e}"), + } + .build() + })?; + Ok(Self::new(config)) + } + + /// Create from YAML string + pub fn from_yaml_str(yaml: &str) -> Result { + let config = BasicMetastoreConfig::from_yaml_str(yaml).map_err(|e| { + metastore_error::NotYetImplementedSnafu { + operation: format!("Failed to parse config: {e}"), + } + .build() + })?; + Ok(Self::new(config)) + } + + fn not_implemented(operation: &str) -> Result<()> { + metastore_error::NotYetImplementedSnafu { + operation: operation.to_string(), + } + .fail() + } +} + +#[async_trait] +impl Metastore for BasicMetastore { + // Volume operations - read-only + fn iter_volumes(&self) -> VecScanIterator> { + VecScanIterator::from_vec(self.volumes.values().cloned().collect()) + } + + async fn create_volume( + &self, + _name: &VolumeIdent, + _volume: Volume, + ) -> Result> { + Self::not_implemented("create_volume - use config to define volumes")?; + unreachable!() + } + + async fn get_volume(&self, name: &VolumeIdent) -> Result>> { + Ok(self.volumes.get(name).cloned()) + } + + async fn update_volume( + &self, + _name: &VolumeIdent, + _volume: Volume, + ) -> Result> { + Self::not_implemented("update_volume - use config to define volumes")?; + unreachable!() + } + + async fn delete_volume(&self, _name: &VolumeIdent, _cascade: bool) -> Result<()> { + Self::not_implemented("delete_volume - use config to define volumes") + } + + async fn volume_object_store( + &self, + name: &VolumeIdent, + ) -> Result>> { + // Check cache first + if let Some(store) = self.object_store_cache.get(name) { + return Ok(Some(store.clone())); + } + + // Get volume and create object store + if let Some(volume_obj) = self.volumes.get(name) { + let store = volume_obj.data.get_object_store()?; + + // Cache it + self.object_store_cache.insert(name.clone(), store.clone()); + + Ok(Some(store)) + } else { + Ok(None) + } + } + + // Database operations - read-only + fn iter_databases(&self) -> VecScanIterator> { + VecScanIterator::from_vec(self.databases.values().cloned().collect()) + } + + async fn create_database( + &self, + _name: &DatabaseIdent, + _database: Database, + ) -> Result> { + Self::not_implemented("create_database - use config to define databases")?; + unreachable!() + } + + async fn get_database(&self, name: &DatabaseIdent) -> Result>> { + Ok(self.databases.get(name).cloned()) + } + + async fn update_database( + &self, + _name: &DatabaseIdent, + _database: Database, + ) -> Result> { + Self::not_implemented("update_database - use config to define databases")?; + unreachable!() + } + + async fn delete_database(&self, _name: &DatabaseIdent, _cascade: bool) -> Result<()> { + Self::not_implemented("delete_database - use config to define databases") + } + + // Schema operations - not implemented + fn iter_schemas(&self, _database: &DatabaseIdent) -> VecScanIterator> { + VecScanIterator::from_vec(vec![]) + } + + async fn create_schema( + &self, + _ident: &SchemaIdent, + _schema: Schema, + ) -> Result> { + Self::not_implemented("create_schema")?; + unreachable!() + } + + async fn get_schema(&self, _ident: &SchemaIdent) -> Result>> { + Ok(None) + } + + async fn update_schema( + &self, + _ident: &SchemaIdent, + _schema: Schema, + ) -> Result> { + Self::not_implemented("update_schema")?; + unreachable!() + } + + async fn delete_schema(&self, _ident: &SchemaIdent, _cascade: bool) -> Result<()> { + Self::not_implemented("delete_schema") + } + + // Table operations - not implemented + fn iter_tables(&self, _schema: &SchemaIdent) -> VecScanIterator> { + VecScanIterator::from_vec(vec![]) + } + + async fn create_table( + &self, + _ident: &TableIdent, + _table: TableCreateRequest, + ) -> Result> { + Self::not_implemented("create_table")?; + unreachable!() + } + + async fn get_table(&self, _ident: &TableIdent) -> Result>> { + Ok(None) + } + + async fn update_table( + &self, + _ident: &TableIdent, + _update: TableUpdate, + ) -> Result> { + Self::not_implemented("update_table")?; + unreachable!() + } + + async fn delete_table(&self, _ident: &TableIdent, _cascade: bool) -> Result<()> { + Self::not_implemented("delete_table") + } + + async fn table_object_store( + &self, + _ident: &TableIdent, + ) -> Result>> { + Ok(None) + } + + // Helper methods + async fn table_exists(&self, _ident: &TableIdent) -> Result { + Ok(false) + } + + async fn url_for_table(&self, _ident: &TableIdent) -> Result { + Self::not_implemented("url_for_table")?; + unreachable!() + } + + async fn volume_for_table(&self, _ident: &TableIdent) -> Result>> { + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_basic_metastore_from_config() { + let yaml = r#" +volumes: + - name: my_volume + type: s3_tables + arn: "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket" + endpoint: "https://s3tables.us-east-1.amazonaws.com" + credentials: !AccessKey + aws_access_key_id: "AKIAIOSFODNN7EXAMPLE" + aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + databases: + - name: my_db + - name: another_db +"#; + + let metastore = BasicMetastore::from_yaml_str(yaml).expect("Failed to create metastore"); + + // Test volumes + let volumes: Vec<_> = metastore.iter_volumes().collect(); + assert_eq!(volumes.len(), 1); + assert_eq!(volumes[0].data.ident.0, "my_volume"); + + // Test get_volume + let volume = metastore + .get_volume(&VolumeIdent("my_volume".to_string())) + .await + .expect("Failed to get volume"); + assert!(volume.is_some()); + + // Test databases + let databases: Vec<_> = metastore.iter_databases().collect(); + assert_eq!(databases.len(), 2); + + // Test get_database + let db = metastore + .get_database(&"my_db".to_string()) + .await + .expect("Failed to get database"); + assert!(db.is_some()); + assert_eq!(db.unwrap().data.ident, "my_db"); + + // Test write operations return not implemented + let result = metastore + .create_volume( + &VolumeIdent("test".to_string()), + Volume { + ident: VolumeIdent("test".to_string()), + volume: crate::models::VolumeType::Memory, + }, + ) + .await; + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Not yet implemented") + ); + } +} diff --git a/crates/core-metastore/src/basic/mod.rs b/crates/core-metastore/src/basic/mod.rs new file mode 100644 index 000000000..df9689922 --- /dev/null +++ b/crates/core-metastore/src/basic/mod.rs @@ -0,0 +1,2 @@ +pub mod config; +pub mod metastore; diff --git a/crates/core-metastore/src/error.rs b/crates/core-metastore/src/error.rs index 7aa745f19..2b1b3c8a2 100644 --- a/crates/core-metastore/src/error.rs +++ b/crates/core-metastore/src/error.rs @@ -237,4 +237,11 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Not yet implemented: {operation}"))] + NotYetImplemented { + operation: String, + #[snafu(implicit)] + location: Location, + }, } diff --git a/crates/core-metastore/src/lib.rs b/crates/core-metastore/src/lib.rs index 0760a80c3..320187a9d 100644 --- a/crates/core-metastore/src/lib.rs +++ b/crates/core-metastore/src/lib.rs @@ -1,3 +1,4 @@ +pub mod basic; pub mod error; pub mod metastore; pub mod models; diff --git a/crates/core-utils/src/scan_iterator.rs b/crates/core-utils/src/scan_iterator.rs index 91cab7cb0..387e40ee9 100644 --- a/crates/core-utils/src/scan_iterator.rs +++ b/crates/core-utils/src/scan_iterator.rs @@ -17,7 +17,7 @@ pub trait ScanIterator: Sized { #[derive(Clone)] pub struct VecScanIterator serde::de::Deserialize<'de>> { - db: Arc, + db: Option>, key: String, //From where to start the scan range for SlateDB // ex: if we ended on "tested2", the cursor would be "tested2" @@ -35,17 +35,34 @@ pub struct VecScanIterator serde::de::Deserialize<'de>> { // if we however had the cursor from cursor comment (line 21) // we could also go from `tested2\x00..tes\x7F` which would yield us "tested3" and "tested4" only excluding other names if any exist token: Option, + // For in-memory mode + in_memory_items: Option>, marker: PhantomData, } impl serde::de::Deserialize<'de>> VecScanIterator { pub const fn new(db: Arc, key: String) -> Self { Self { - db, + db: Some(db), key, cursor: None, limit: None, token: None, + in_memory_items: None, + marker: PhantomData, + } + } + + /// Create an in-memory iterator from a vector (used when `SlateDB` is not available) + #[must_use] + pub const fn from_vec(items: Vec) -> Self { + Self { + db: None, + key: String::new(), + cursor: None, + limit: None, + token: None, + in_memory_items: Some(items), marker: PhantomData, } } @@ -74,7 +91,18 @@ impl serde::de::Deserialize<'de>> ScanIterator for VecScanIte fields(keys_range, items_count), err )] + #[allow(clippy::expect_used)] async fn collect(self) -> Result { + // Handle in-memory mode + if let Some(items) = self.in_memory_items { + return Ok(items); + } + + // SlateDB mode + let db = self + .db + .expect("VecScanIterator requires either db or in_memory_items"); + //We can look with respect to limit // from start to end (full scan), // from starts_with to start_with (search), @@ -98,7 +126,7 @@ impl serde::de::Deserialize<'de>> ScanIterator for VecScanIte tracing::Span::current().record("keys_range", format!("{start}..{end}")); let range = Bytes::from(start)..Bytes::from(end); - let mut iter = self.db.scan(range).await.context(errors::ScanFailedSnafu)?; + let mut iter = db.scan(range).await.context(errors::ScanFailedSnafu)?; let mut objects = Self::Collectable::new(); while let Ok(Some(bytes)) = iter.next().await { diff --git a/crates/embucketd/src/cli.rs b/crates/embucketd/src/cli.rs index 6163fcf23..2904401ae 100644 --- a/crates/embucketd/src/cli.rs +++ b/crates/embucketd/src/cli.rs @@ -276,6 +276,13 @@ pub struct CliOpts { help = "Tracing span processor" )] pub tracing_span_processor: TracingSpanProcessor, + + #[arg( + long, + env = "METASTORE_CONFIG", + help = "Path to metastore config YAML file (only for basic metastore)" + )] + pub metastore_config: Option, } #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs index 2f8635043..a2e39dc48 100644 --- a/crates/embucketd/src/main.rs +++ b/crates/embucketd/src/main.rs @@ -36,7 +36,9 @@ use clap::Parser; use core_executor::service::CoreExecutionService; use core_executor::utils::Config as ExecutionConfig; use core_history::SlateDBHistoryStore; -use core_metastore::SlateDBMetastore; +// use core_metastore::SlateDBMetastore; +use core_metastore::basic::config::BasicMetastoreConfig; +use core_metastore::basic::metastore::BasicMetastore; use core_utils::Db; use dotenv::dotenv; use object_store::path::Path; @@ -204,8 +206,10 @@ async fn async_main( ); let db = Db::new(slate_db); - - let metastore = Arc::new(SlateDBMetastore::new(db.clone())); + // let metastore = Arc::new(SlateDBMetastore::new(db.clone())); + let metastore_mem_config = + BasicMetastoreConfig::from_yaml_file(opts.metastore_config.unwrap().to_str().unwrap())?; + let metastore = Arc::new(BasicMetastore::new(metastore_mem_config)); let history_store = Arc::new( SlateDBHistoryStore::new( db.clone(), @@ -384,7 +388,7 @@ fn setup_tracing(opts: &cli::CliOpts) -> SdkTracerProvider { let targets_with_level = |targets: &[&'static str], level: LevelFilter| -> Vec<(&str, LevelFilter)> { // let default_log_targets: Vec<(String, LevelFilter)> = - targets.iter().map(|t| ((*t), level)).collect() + targets.iter().map(|t| (*t, level)).collect() }; // Memory allocations diff --git a/metastore.yaml_example b/metastore.yaml_example new file mode 100644 index 000000000..62dccc4ea --- /dev/null +++ b/metastore.yaml_example @@ -0,0 +1,10 @@ +volumes: + - name: s3tablesdb + type: s3_tables + arn: "arn:aws:s3tables:us-east-2:111111111:bucket/s3-table-bucket" + credentials: + credential_type: access_key + aws-access-key-id: "11111111111111111111" + aws-secret-access-key: "2222222222222222222" + databases: + - name: my_s3_tables_db From 58aa51b8f9503be0108bfd29870011f2eee364ac Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 7 Nov 2025 16:41:44 +0300 Subject: [PATCH 2/4] Fix test --- .gitignore | 1 + crates/core-metastore/src/basic/metastore.rs | 30 +++++++++++++------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/.gitignore b/.gitignore index 4ea068b04..4446bbd94 100644 --- a/.gitignore +++ b/.gitignore @@ -43,6 +43,7 @@ tests/benchmark/*.json test/errors.log test/test_statistics.csv snowplow +crates/api-snowflake-rest/traces.log *.manifest *.sst diff --git a/crates/core-metastore/src/basic/metastore.rs b/crates/core-metastore/src/basic/metastore.rs index 976a3af99..e7c512a99 100644 --- a/crates/core-metastore/src/basic/metastore.rs +++ b/crates/core-metastore/src/basic/metastore.rs @@ -259,18 +259,20 @@ impl Metastore for BasicMetastore { #[cfg(test)] mod tests { use super::*; + use core_utils::scan_iterator::ScanIterator; #[tokio::test] + #[allow(clippy::unwrap_used)] async fn test_basic_metastore_from_config() { let yaml = r#" volumes: - name: my_volume type: s3_tables arn: "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket" - endpoint: "https://s3tables.us-east-1.amazonaws.com" - credentials: !AccessKey - aws_access_key_id: "AKIAIOSFODNN7EXAMPLE" - aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + credentials: + credential_type: access_key + aws-access-key-id: "AKIAIOSFODNN7EXAMPLE" + aws-secret-access-key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" databases: - name: my_db - name: another_db @@ -279,19 +281,27 @@ volumes: let metastore = BasicMetastore::from_yaml_str(yaml).expect("Failed to create metastore"); // Test volumes - let volumes: Vec<_> = metastore.iter_volumes().collect(); + let volumes: Vec<_> = metastore + .iter_volumes() + .collect() + .await + .expect("Failed to iter volumes"); assert_eq!(volumes.len(), 1); - assert_eq!(volumes[0].data.ident.0, "my_volume"); + assert_eq!(volumes[0].data.ident, "my_volume"); // Test get_volume let volume = metastore - .get_volume(&VolumeIdent("my_volume".to_string())) + .get_volume(&"my_volume".to_string()) .await .expect("Failed to get volume"); assert!(volume.is_some()); // Test databases - let databases: Vec<_> = metastore.iter_databases().collect(); + let databases: Vec<_> = metastore + .iter_databases() + .collect() + .await + .expect("Failed to iter databases"); assert_eq!(databases.len(), 2); // Test get_database @@ -305,9 +315,9 @@ volumes: // Test write operations return not implemented let result = metastore .create_volume( - &VolumeIdent("test".to_string()), + &"test".to_string(), Volume { - ident: VolumeIdent("test".to_string()), + ident: "test".to_string(), volume: crate::models::VolumeType::Memory, }, ) From 7824f7923ba615fadefbff0ec22dbaef361985c3 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 7 Nov 2025 17:07:16 +0300 Subject: [PATCH 3/4] Fix test --- Cargo.lock | 1 + crates/core-metastore/Cargo.toml | 1 + crates/core-metastore/src/basic/config.rs | 7 +-- crates/core-metastore/src/basic/metastore.rs | 48 ++++++++++++-------- 4 files changed, 36 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92b08e112..0fb0eaf2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2324,6 +2324,7 @@ dependencies = [ "iceberg-rust-spec", "insta", "object_store", + "parking_lot", "regex", "serde", "serde_json", diff --git a/crates/core-metastore/Cargo.toml b/crates/core-metastore/Cargo.toml index 3a43312cc..b5f442952 100644 --- a/crates/core-metastore/Cargo.toml +++ b/crates/core-metastore/Cargo.toml @@ -31,6 +31,7 @@ utoipa = { workspace = true } uuid = { workspace = true } validator = { workspace = true } regex = { workspace = true } +parking_lot = "0.12.5" [dev-dependencies] insta = { workspace = true } diff --git a/crates/core-metastore/src/basic/config.rs b/crates/core-metastore/src/basic/config.rs index 42622e10a..a29ee0867 100644 --- a/crates/core-metastore/src/basic/config.rs +++ b/crates/core-metastore/src/basic/config.rs @@ -81,9 +81,10 @@ volumes: type: s3_tables arn: "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket" endpoint: "https://s3tables.us-east-1.amazonaws.com" - credentials: !AccessKey - aws_access_key_id: "AKIAIOSFODNN7EXAMPLE" - aws_secret_access_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + credentials: + credential_type: access_key + aws-access-key-id: "AKIAIOSFODNN7EXAMPLE" + aws-secret-access-key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" databases: - name: my_db - name: another_db diff --git a/crates/core-metastore/src/basic/metastore.rs b/crates/core-metastore/src/basic/metastore.rs index e7c512a99..9938da431 100644 --- a/crates/core-metastore/src/basic/metastore.rs +++ b/crates/core-metastore/src/basic/metastore.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use crate::{ - Metastore, + Metastore, VolumeType, basic::config::BasicMetastoreConfig, error::{self as metastore_error, Result}, models::{ @@ -17,13 +17,14 @@ use async_trait::async_trait; use core_utils::scan_iterator::VecScanIterator; use dashmap::DashMap; use object_store::ObjectStore; +use parking_lot::RwLock; /// Basic metastore implementation that reads volumes and databases from config /// and returns "Not yet implemented" for write operations and schema/table operations #[derive(Debug)] pub struct BasicMetastore { - volumes: HashMap>, - databases: HashMap>, + volumes: RwLock>>, + databases: RwLock>>, object_store_cache: DashMap>, } @@ -50,8 +51,8 @@ impl BasicMetastore { } Self { - volumes, - databases, + volumes: RwLock::new(volumes), + databases: RwLock::new(databases), object_store_cache: DashMap::new(), } } @@ -90,20 +91,28 @@ impl BasicMetastore { impl Metastore for BasicMetastore { // Volume operations - read-only fn iter_volumes(&self) -> VecScanIterator> { - VecScanIterator::from_vec(self.volumes.values().cloned().collect()) + let guard = self.volumes.read(); + VecScanIterator::from_vec(guard.values().cloned().collect()) } - async fn create_volume( - &self, - _name: &VolumeIdent, - _volume: Volume, - ) -> Result> { - Self::not_implemented("create_volume - use config to define volumes")?; - unreachable!() + async fn create_volume(&self, name: &VolumeIdent, volume: Volume) -> Result> { + match &volume.volume { + VolumeType::S3Tables(_) => { + let rw_object = RwObject::new(volume); + let mut guard = self.volumes.write(); + guard.insert(name.clone(), rw_object.clone()); + Ok(rw_object) + } + _ => metastore_error::NotYetImplementedSnafu { + operation: "create_volume - allowed only for s3 tables type", + } + .fail(), + } } async fn get_volume(&self, name: &VolumeIdent) -> Result>> { - Ok(self.volumes.get(name).cloned()) + let guard = self.volumes.read(); + Ok(guard.get(name).cloned()) } async fn update_volume( @@ -129,7 +138,8 @@ impl Metastore for BasicMetastore { } // Get volume and create object store - if let Some(volume_obj) = self.volumes.get(name) { + let guard = self.volumes.read(); + if let Some(volume_obj) = guard.get(name) { let store = volume_obj.data.get_object_store()?; // Cache it @@ -143,7 +153,8 @@ impl Metastore for BasicMetastore { // Database operations - read-only fn iter_databases(&self) -> VecScanIterator> { - VecScanIterator::from_vec(self.databases.values().cloned().collect()) + let guard = self.databases.read(); + VecScanIterator::from_vec(guard.values().cloned().collect()) } async fn create_database( @@ -156,7 +167,8 @@ impl Metastore for BasicMetastore { } async fn get_database(&self, name: &DatabaseIdent) -> Result>> { - Ok(self.databases.get(name).cloned()) + let guard = self.databases.read(); + Ok(guard.get(name).cloned()) } async fn update_database( @@ -318,7 +330,7 @@ volumes: &"test".to_string(), Volume { ident: "test".to_string(), - volume: crate::models::VolumeType::Memory, + volume: VolumeType::Memory, }, ) .await; From 15ab44df5d93249c3f37351511ac20cc2039a596 Mon Sep 17 00:00:00 2001 From: osipovartem Date: Fri, 7 Nov 2025 20:06:55 +0300 Subject: [PATCH 4/4] Allow to create s3 tables database --- crates/core-metastore/src/basic/metastore.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/crates/core-metastore/src/basic/metastore.rs b/crates/core-metastore/src/basic/metastore.rs index 9938da431..a493b4c92 100644 --- a/crates/core-metastore/src/basic/metastore.rs +++ b/crates/core-metastore/src/basic/metastore.rs @@ -159,9 +159,19 @@ impl Metastore for BasicMetastore { async fn create_database( &self, - _name: &DatabaseIdent, - _database: Database, + name: &DatabaseIdent, + database: Database, ) -> Result> { + let volumes_guard = self.volumes.read(); + + if let Some(volume) = volumes_guard.get(&database.volume).cloned() + && matches!(volume.volume, VolumeType::S3Tables(_)) + { + let rw_object = RwObject::new(database); + let mut guard = self.databases.write(); + guard.insert(name.clone(), rw_object.clone()); + return Ok(rw_object); + } Self::not_implemented("create_database - use config to define databases")?; unreachable!() }