diff --git a/.gitignore b/.gitignore index 0de61d7a6..4446bbd94 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ venv __pycache__ .env +metastore.yaml alloc.log slatedb-prefix/ @@ -42,6 +43,7 @@ tests/benchmark/*.json test/errors.log test/test_statistics.csv snowplow +crates/api-snowflake-rest/traces.log *.manifest *.sst diff --git a/Cargo.lock b/Cargo.lock index 14537ff19..0fb0eaf2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2324,9 +2324,11 @@ dependencies = [ "iceberg-rust-spec", "insta", "object_store", + "parking_lot", "regex", "serde", "serde_json", + "serde_yaml", "slatedb", "snafu", "strum 0.27.2", diff --git a/crates/api-iceberg-rest/src/error.rs b/crates/api-iceberg-rest/src/error.rs index 7059adc1c..1eaa687d5 100644 --- a/crates/api-iceberg-rest/src/error.rs +++ b/crates/api-iceberg-rest/src/error.rs @@ -96,7 +96,10 @@ impl IntoResponse for Error { | core_metastore::Error::Serde { .. } | core_metastore::Error::TableMetadataBuilder { .. } | core_metastore::Error::TableObjectStoreNotFound { .. } - | core_metastore::Error::UrlParse { .. } => http::StatusCode::INTERNAL_SERVER_ERROR, + | core_metastore::Error::UrlParse { .. } + | core_metastore::Error::NotYetImplemented { .. } => { + http::StatusCode::INTERNAL_SERVER_ERROR + } }; // Record the result as part of the current span. 
diff --git a/crates/core-executor/src/error.rs b/crates/core-executor/src/error.rs index 6d32f4167..bf3257100 100644 --- a/crates/core-executor/src/error.rs +++ b/crates/core-executor/src/error.rs @@ -449,6 +449,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Unimplemented functionality: '{name}'"))] + UnimplementedFunctionality { + name: String, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("SQL parser error: {error}"))] SqlParser { #[snafu(source)] diff --git a/crates/core-metastore/Cargo.toml b/crates/core-metastore/Cargo.toml index e8e5c55e4..b5f442952 100644 --- a/crates/core-metastore/Cargo.toml +++ b/crates/core-metastore/Cargo.toml @@ -19,6 +19,7 @@ iceberg-rust-spec = { workspace = true } object_store = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +serde_yaml = { workspace = true } slatedb = { workspace = true } snafu = { workspace = true } strum = { workspace = true } @@ -30,6 +31,7 @@ utoipa = { workspace = true } uuid = { workspace = true } validator = { workspace = true } regex = { workspace = true } +parking_lot = "0.12.5" [dev-dependencies] insta = { workspace = true } diff --git a/crates/core-metastore/src/basic/config.rs b/crates/core-metastore/src/basic/config.rs new file mode 100644 index 000000000..a29ee0867 --- /dev/null +++ b/crates/core-metastore/src/basic/config.rs @@ -0,0 +1,99 @@ +use crate::models::{Database, S3TablesVolume, Volume, VolumeIdent, VolumeType}; +use serde::{Deserialize, Serialize}; + +/// Configuration for the basic metastore loaded from YAML +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BasicMetastoreConfig { + pub volumes: Vec, +} + +/// Volume configuration with optional databases +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VolumeConfig { + pub name: String, + #[serde(flatten)] + pub volume_type: S3TablesVolumeType, + #[serde(default)] + pub databases: Vec, +} + +/// Only `S3Tables` volume type is 
supported in basic metastore +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum S3TablesVolumeType { + S3Tables(S3TablesVolume), +} + +/// Database configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DatabaseConfig { + pub name: String, +} + +impl BasicMetastoreConfig { + /// Load configuration from YAML file + pub fn from_yaml_file(path: &str) -> Result { + let content = std::fs::read_to_string(path) + .map_err(|e| format!("Failed to read config file: {e}"))?; + Self::from_yaml_str(&content) + } + + /// Load configuration from YAML string + pub fn from_yaml_str(yaml: &str) -> Result { + serde_yaml::from_str(yaml).map_err(|e| format!("Failed to parse config YAML: {e}")) + } +} + +impl VolumeConfig { + /// Convert to metastore Volume + #[must_use] + pub fn to_volume(&self) -> Volume { + Volume { + ident: self.name.clone(), + volume: match &self.volume_type { + S3TablesVolumeType::S3Tables(s3tables) => VolumeType::S3Tables(s3tables.clone()), + }, + } + } +} + +impl DatabaseConfig { + /// Convert to metastore Database + #[must_use] + pub fn to_database(&self, volume: &VolumeIdent) -> Database { + Database { + ident: self.name.clone(), + volume: volume.clone(), + properties: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_config() { + let yaml = r#" +volumes: + - name: my_volume + type: s3_tables + arn: "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket" + endpoint: "https://s3tables.us-east-1.amazonaws.com" + credentials: + credential_type: access_key + aws-access-key-id: "AKIAIOSFODNN7EXAMPLE" + aws-secret-access-key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + databases: + - name: my_db + - name: another_db +"#; + + let config = BasicMetastoreConfig::from_yaml_str(yaml).expect("Failed to parse config"); + assert_eq!(config.volumes.len(), 1); + assert_eq!(config.volumes[0].name, "my_volume"); + 
assert_eq!(config.volumes[0].databases.len(), 2); + assert_eq!(config.volumes[0].databases[0].name, "my_db"); + } +} diff --git a/crates/core-metastore/src/basic/metastore.rs b/crates/core-metastore/src/basic/metastore.rs new file mode 100644 index 000000000..a493b4c92 --- /dev/null +++ b/crates/core-metastore/src/basic/metastore.rs @@ -0,0 +1,355 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use crate::{ + Metastore, VolumeType, + basic::config::BasicMetastoreConfig, + error::{self as metastore_error, Result}, + models::{ + RwObject, + database::{Database, DatabaseIdent}, + schema::{Schema, SchemaIdent}, + table::{Table, TableCreateRequest, TableIdent, TableUpdate}, + volumes::{Volume, VolumeIdent}, + }, +}; +use async_trait::async_trait; +use core_utils::scan_iterator::VecScanIterator; +use dashmap::DashMap; +use object_store::ObjectStore; +use parking_lot::RwLock; + +/// Basic metastore implementation that reads volumes and databases from config +/// and returns "Not yet implemented" for write operations and schema/table operations +#[derive(Debug)] +pub struct BasicMetastore { + volumes: RwLock>>, + databases: RwLock>>, + object_store_cache: DashMap>, +} + +impl BasicMetastore { + /// Create a new `BasicMetastore` from configuration + #[must_use] + pub fn new(config: BasicMetastoreConfig) -> Self { + let mut volumes = HashMap::new(); + let mut databases = HashMap::new(); + + for volume_config in config.volumes { + let volume = volume_config.to_volume(); + let volume_ident = volume.ident.clone(); + + // Add volume + volumes.insert(volume_ident.clone(), RwObject::new(volume)); + + // Add databases for this volume + for db_config in volume_config.databases { + let database = db_config.to_database(&volume_ident); + let db_ident = database.ident.clone(); + databases.insert(db_ident, RwObject::new(database)); + } + } + + Self { + volumes: RwLock::new(volumes), + databases: RwLock::new(databases), + object_store_cache: DashMap::new(), + } + } + + /// 
Create from YAML file + pub fn from_yaml_file(path: &str) -> Result { + let config = BasicMetastoreConfig::from_yaml_file(path).map_err(|e| { + metastore_error::NotYetImplementedSnafu { + operation: format!("Failed to load config: {e}"), + } + .build() + })?; + Ok(Self::new(config)) + } + + /// Create from YAML string + pub fn from_yaml_str(yaml: &str) -> Result { + let config = BasicMetastoreConfig::from_yaml_str(yaml).map_err(|e| { + metastore_error::NotYetImplementedSnafu { + operation: format!("Failed to parse config: {e}"), + } + .build() + })?; + Ok(Self::new(config)) + } + + fn not_implemented(operation: &str) -> Result<()> { + metastore_error::NotYetImplementedSnafu { + operation: operation.to_string(), + } + .fail() + } +} + +#[async_trait] +impl Metastore for BasicMetastore { + // Volume operations - read-only + fn iter_volumes(&self) -> VecScanIterator> { + let guard = self.volumes.read(); + VecScanIterator::from_vec(guard.values().cloned().collect()) + } + + async fn create_volume(&self, name: &VolumeIdent, volume: Volume) -> Result> { + match &volume.volume { + VolumeType::S3Tables(_) => { + let rw_object = RwObject::new(volume); + let mut guard = self.volumes.write(); + guard.insert(name.clone(), rw_object.clone()); + Ok(rw_object) + } + _ => metastore_error::NotYetImplementedSnafu { + operation: "create_volume - allowed only for s3 tables type", + } + .fail(), + } + } + + async fn get_volume(&self, name: &VolumeIdent) -> Result>> { + let guard = self.volumes.read(); + Ok(guard.get(name).cloned()) + } + + async fn update_volume( + &self, + _name: &VolumeIdent, + _volume: Volume, + ) -> Result> { + Self::not_implemented("update_volume - use config to define volumes")?; + unreachable!() + } + + async fn delete_volume(&self, _name: &VolumeIdent, _cascade: bool) -> Result<()> { + Self::not_implemented("delete_volume - use config to define volumes") + } + + async fn volume_object_store( + &self, + name: &VolumeIdent, + ) -> Result>> { + // Check cache 
first + if let Some(store) = self.object_store_cache.get(name) { + return Ok(Some(store.clone())); + } + + // Get volume and create object store + let guard = self.volumes.read(); + if let Some(volume_obj) = guard.get(name) { + let store = volume_obj.data.get_object_store()?; + + // Cache it + self.object_store_cache.insert(name.clone(), store.clone()); + + Ok(Some(store)) + } else { + Ok(None) + } + } + + // Database operations - read-only + fn iter_databases(&self) -> VecScanIterator> { + let guard = self.databases.read(); + VecScanIterator::from_vec(guard.values().cloned().collect()) + } + + async fn create_database( + &self, + name: &DatabaseIdent, + database: Database, + ) -> Result> { + let volumes_guard = self.volumes.read(); + + if let Some(volume) = volumes_guard.get(&database.volume).cloned() + && matches!(volume.volume, VolumeType::S3Tables(_)) + { + let rw_object = RwObject::new(database); + let mut guard = self.databases.write(); + guard.insert(name.clone(), rw_object.clone()); + return Ok(rw_object); + } + Self::not_implemented("create_database - use config to define databases")?; + unreachable!() + } + + async fn get_database(&self, name: &DatabaseIdent) -> Result>> { + let guard = self.databases.read(); + Ok(guard.get(name).cloned()) + } + + async fn update_database( + &self, + _name: &DatabaseIdent, + _database: Database, + ) -> Result> { + Self::not_implemented("update_database - use config to define databases")?; + unreachable!() + } + + async fn delete_database(&self, _name: &DatabaseIdent, _cascade: bool) -> Result<()> { + Self::not_implemented("delete_database - use config to define databases") + } + + // Schema operations - not implemented + fn iter_schemas(&self, _database: &DatabaseIdent) -> VecScanIterator> { + VecScanIterator::from_vec(vec![]) + } + + async fn create_schema( + &self, + _ident: &SchemaIdent, + _schema: Schema, + ) -> Result> { + Self::not_implemented("create_schema")?; + unreachable!() + } + + async fn get_schema(&self, 
_ident: &SchemaIdent) -> Result>> { + Ok(None) + } + + async fn update_schema( + &self, + _ident: &SchemaIdent, + _schema: Schema, + ) -> Result> { + Self::not_implemented("update_schema")?; + unreachable!() + } + + async fn delete_schema(&self, _ident: &SchemaIdent, _cascade: bool) -> Result<()> { + Self::not_implemented("delete_schema") + } + + // Table operations - not implemented + fn iter_tables(&self, _schema: &SchemaIdent) -> VecScanIterator> { + VecScanIterator::from_vec(vec![]) + } + + async fn create_table( + &self, + _ident: &TableIdent, + _table: TableCreateRequest, + ) -> Result> { + Self::not_implemented("create_table")?; + unreachable!() + } + + async fn get_table(&self, _ident: &TableIdent) -> Result>> { + Ok(None) + } + + async fn update_table( + &self, + _ident: &TableIdent, + _update: TableUpdate, + ) -> Result> { + Self::not_implemented("update_table")?; + unreachable!() + } + + async fn delete_table(&self, _ident: &TableIdent, _cascade: bool) -> Result<()> { + Self::not_implemented("delete_table") + } + + async fn table_object_store( + &self, + _ident: &TableIdent, + ) -> Result>> { + Ok(None) + } + + // Helper methods + async fn table_exists(&self, _ident: &TableIdent) -> Result { + Ok(false) + } + + async fn url_for_table(&self, _ident: &TableIdent) -> Result { + Self::not_implemented("url_for_table")?; + unreachable!() + } + + async fn volume_for_table(&self, _ident: &TableIdent) -> Result>> { + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use core_utils::scan_iterator::ScanIterator; + + #[tokio::test] + #[allow(clippy::unwrap_used)] + async fn test_basic_metastore_from_config() { + let yaml = r#" +volumes: + - name: my_volume + type: s3_tables + arn: "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket" + credentials: + credential_type: access_key + aws-access-key-id: "AKIAIOSFODNN7EXAMPLE" + aws-secret-access-key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" + databases: + - name: my_db + - name: another_db +"#; + + 
let metastore = BasicMetastore::from_yaml_str(yaml).expect("Failed to create metastore"); + + // Test volumes + let volumes: Vec<_> = metastore + .iter_volumes() + .collect() + .await + .expect("Failed to iter volumes"); + assert_eq!(volumes.len(), 1); + assert_eq!(volumes[0].data.ident, "my_volume"); + + // Test get_volume + let volume = metastore + .get_volume(&"my_volume".to_string()) + .await + .expect("Failed to get volume"); + assert!(volume.is_some()); + + // Test databases + let databases: Vec<_> = metastore + .iter_databases() + .collect() + .await + .expect("Failed to iter databases"); + assert_eq!(databases.len(), 2); + + // Test get_database + let db = metastore + .get_database(&"my_db".to_string()) + .await + .expect("Failed to get database"); + assert!(db.is_some()); + assert_eq!(db.unwrap().data.ident, "my_db"); + + // Test write operations return not implemented + let result = metastore + .create_volume( + &"test".to_string(), + Volume { + ident: "test".to_string(), + volume: VolumeType::Memory, + }, + ) + .await; + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Not yet implemented") + ); + } +} diff --git a/crates/core-metastore/src/basic/mod.rs b/crates/core-metastore/src/basic/mod.rs new file mode 100644 index 000000000..df9689922 --- /dev/null +++ b/crates/core-metastore/src/basic/mod.rs @@ -0,0 +1,2 @@ +pub mod config; +pub mod metastore; diff --git a/crates/core-metastore/src/error.rs b/crates/core-metastore/src/error.rs index 7aa745f19..2b1b3c8a2 100644 --- a/crates/core-metastore/src/error.rs +++ b/crates/core-metastore/src/error.rs @@ -237,4 +237,11 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Not yet implemented: {operation}"))] + NotYetImplemented { + operation: String, + #[snafu(implicit)] + location: Location, + }, } diff --git a/crates/core-metastore/src/lib.rs b/crates/core-metastore/src/lib.rs index 0760a80c3..320187a9d 100644 --- 
a/crates/core-metastore/src/lib.rs +++ b/crates/core-metastore/src/lib.rs @@ -1,3 +1,4 @@ +pub mod basic; pub mod error; pub mod metastore; pub mod models; diff --git a/crates/core-utils/src/scan_iterator.rs b/crates/core-utils/src/scan_iterator.rs index 91cab7cb0..387e40ee9 100644 --- a/crates/core-utils/src/scan_iterator.rs +++ b/crates/core-utils/src/scan_iterator.rs @@ -17,7 +17,7 @@ pub trait ScanIterator: Sized { #[derive(Clone)] pub struct VecScanIterator serde::de::Deserialize<'de>> { - db: Arc, + db: Option>, key: String, //From where to start the scan range for SlateDB // ex: if we ended on "tested2", the cursor would be "tested2" @@ -35,17 +35,34 @@ pub struct VecScanIterator serde::de::Deserialize<'de>> { // if we however had the cursor from cursor comment (line 21) // we could also go from `tested2\x00..tes\x7F` which would yield us "tested3" and "tested4" only excluding other names if any exist token: Option, + // For in-memory mode + in_memory_items: Option>, marker: PhantomData, } impl serde::de::Deserialize<'de>> VecScanIterator { pub const fn new(db: Arc, key: String) -> Self { Self { - db, + db: Some(db), key, cursor: None, limit: None, token: None, + in_memory_items: None, + marker: PhantomData, + } + } + + /// Create an in-memory iterator from a vector (used when `SlateDB` is not available) + #[must_use] + pub const fn from_vec(items: Vec) -> Self { + Self { + db: None, + key: String::new(), + cursor: None, + limit: None, + token: None, + in_memory_items: Some(items), marker: PhantomData, } } @@ -74,7 +91,18 @@ impl serde::de::Deserialize<'de>> ScanIterator for VecScanIte fields(keys_range, items_count), err )] + #[allow(clippy::expect_used)] async fn collect(self) -> Result { + // Handle in-memory mode + if let Some(items) = self.in_memory_items { + return Ok(items); + } + + // SlateDB mode + let db = self + .db + .expect("VecScanIterator requires either db or in_memory_items"); + //We can look with respect to limit // from start to end (full 
scan), // from starts_with to start_with (search), @@ -98,7 +126,7 @@ impl serde::de::Deserialize<'de>> ScanIterator for VecScanIte tracing::Span::current().record("keys_range", format!("{start}..{end}")); let range = Bytes::from(start)..Bytes::from(end); - let mut iter = self.db.scan(range).await.context(errors::ScanFailedSnafu)?; + let mut iter = db.scan(range).await.context(errors::ScanFailedSnafu)?; let mut objects = Self::Collectable::new(); while let Ok(Some(bytes)) = iter.next().await { diff --git a/crates/embucketd/src/cli.rs b/crates/embucketd/src/cli.rs index 6163fcf23..2904401ae 100644 --- a/crates/embucketd/src/cli.rs +++ b/crates/embucketd/src/cli.rs @@ -276,6 +276,13 @@ pub struct CliOpts { help = "Tracing span processor" )] pub tracing_span_processor: TracingSpanProcessor, + + #[arg( + long, + env = "METASTORE_CONFIG", + help = "Path to metastore config YAML file (only for basic metastore)" + )] + pub metastore_config: Option, } #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs index 2f8635043..a2e39dc48 100644 --- a/crates/embucketd/src/main.rs +++ b/crates/embucketd/src/main.rs @@ -36,7 +36,8 @@ use clap::Parser; use core_executor::service::CoreExecutionService; use core_executor::utils::Config as ExecutionConfig; use core_history::SlateDBHistoryStore; -use core_metastore::SlateDBMetastore; +use core_metastore::basic::config::BasicMetastoreConfig; +use core_metastore::basic::metastore::BasicMetastore; use core_utils::Db; use dotenv::dotenv; use object_store::path::Path; @@ -204,8 +206,9 @@ async fn async_main( ); let db = Db::new(slate_db); - - let metastore = Arc::new(SlateDBMetastore::new(db.clone())); + let metastore_mem_config = + BasicMetastoreConfig::from_yaml_file(opts.metastore_config.expect("--metastore-config is required when using the basic metastore").to_str().expect("metastore config path must be valid UTF-8"))?; + let metastore = 
Arc::new(BasicMetastore::new(metastore_mem_config)); let history_store = Arc::new( SlateDBHistoryStore::new( db.clone(), @@ -384,7 +388,7 @@ fn setup_tracing(opts: &cli::CliOpts) -> SdkTracerProvider { let targets_with_level = |targets: &[&'static str], level: LevelFilter| -> Vec<(&str, LevelFilter)> { // let default_log_targets: Vec<(String, LevelFilter)> = - targets.iter().map(|t| ((*t), level)).collect() + targets.iter().map(|t| (*t, level)).collect() }; // Memory allocations diff --git a/metastore.yaml_example b/metastore.yaml_example new file mode 100644 index 000000000..62dccc4ea --- /dev/null +++ b/metastore.yaml_example @@ -0,0 +1,10 @@ +volumes: + - name: s3tablesdb + type: s3_tables + arn: "arn:aws:s3tables:us-east-2:111111111:bucket/s3-table-bucket" + credentials: + credential_type: access_key + aws-access-key-id: "11111111111111111111" + aws-secret-access-key: "2222222222222222222" + databases: + - name: my_s3_tables_db