diff --git a/.gitignore b/.gitignore
index e6f8fcd2b..ef2c9d6fc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 # will have compiled files and executables
 debug/
 target/
+object_store/
 
 # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
 # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
@@ -12,6 +13,7 @@ Cargo.lock
 
 # MSVC Windows builds of rustc generate these, which store debugging information
 *.pdb
+.DS_Store
 
 # RustRover
 # JetBrains specific template is maintained in a separate JetBrains.gitignore that can
diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs
index c8dc02cb2..6412503b4 100644
--- a/crates/control_plane/src/service.rs
+++ b/crates/control_plane/src/service.rs
@@ -3,13 +3,13 @@ use crate::models::{StorageProfile, StorageProfileCreateRequest};
 use crate::models::{Warehouse, WarehouseCreateRequest};
 use crate::repository::{StorageProfileRepository, WarehouseRepository};
 use async_trait::async_trait;
-use std::sync::Arc;
-use uuid::Uuid;
+use datafusion::catalog_common::CatalogProvider;
 use datafusion::prelude::*;
 use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
 use iceberg_datafusion::IcebergCatalogProvider;
-use datafusion::catalog_common::CatalogProvider;
 use std::collections::HashMap;
+use std::sync::Arc;
+use uuid::Uuid;
 
 #[async_trait]
 pub trait ControlService: Send + Sync {
@@ -33,7 +33,7 @@ pub trait ControlService: Send + Sync {
     // async fn delete_table(&self, id: Uuid) -> Result<()>;
     // async fn list_tables(&self) -> Result<Vec<Table>>;
 
-    async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)>;
+    async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)>;
 }
 
 pub struct ControlServiceImpl {
@@ -102,7 +102,7 @@ impl ControlService for ControlServiceImpl {
         self.warehouse_repo.list().await
     }
 
-    async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)> {
+    async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)> {
         let config = RestCatalogConfig::builder()
             .uri("http://0.0.0.0:3000/catalog".to_string())
             .warehouse(warehouse_id.to_string())
diff --git a/crates/nexus/src/http/router.rs b/crates/nexus/src/http/router.rs
index e046be21d..7a107a56d 100644
--- a/crates/nexus/src/http/router.rs
+++ b/crates/nexus/src/http/router.rs
@@ -48,7 +48,7 @@ pub fn create_app(state: AppState) -> Router {
         .nest("/", control_router)
         .nest("/catalog", catalog_router)
         .nest("/ui", ui_router)
-        .merge(SwaggerUi::new("/").url("/openapi.yaml", spec))
+        .merge(SwaggerUi::new("/").url("/openapi.json", spec))
         .with_state(state)
 }
 
diff --git a/crates/nexus/src/http/ui/handlers/common.rs b/crates/nexus/src/http/ui/handlers/common.rs
index 60727d745..1bff2aa7e 100644
--- a/crates/nexus/src/http/ui/handlers/common.rs
+++ b/crates/nexus/src/http/ui/handlers/common.rs
@@ -1,9 +1,10 @@
 use crate::http::ui::models::database::Database;
 use crate::http::ui::models::errors::AppError;
 use crate::http::ui::models::storage_profile::StorageProfile;
+use crate::http::ui::models::table::{Statistics, Table};
 use crate::http::ui::models::warehouse::Warehouse;
 use crate::state::AppState;
-use catalog::models::{DatabaseIdent, Table};
+use catalog::models::{DatabaseIdent, TableIdent, WarehouseIdent};
 use control_plane::models::Warehouse as WarehouseModel;
 use uuid::Uuid;
 
@@ -21,7 +22,9 @@ impl AppState {
             })
     }
     pub async fn get_warehouse_by_id(&self, warehouse_id: Uuid) -> Result<Warehouse, AppError> {
-
self.get_warehouse_model(warehouse_id).await.map(|warehouse| warehouse.into()) + self.get_warehouse_model(warehouse_id) + .await + .map(|warehouse| warehouse.into()) } pub async fn get_profile_by_id( @@ -34,26 +37,115 @@ impl AppState { .map_err(|e| { let fmt = format!("{}: failed to get profile by id {}", e, storage_profile_id); AppError::new(e, fmt.as_str()) - }).map(|profile| profile.into()) + }) + .map(|profile| profile.into()) } - pub async fn get_database_by_ident(&self, ident: &DatabaseIdent) -> Result { + pub async fn get_database(&self, ident: &DatabaseIdent) -> Result { self.catalog_svc .get_namespace(ident) .await .map_err(|e| { let fmt = format!("{}: failed to get database with db ident {}", e, &ident); AppError::new(e, fmt.as_str()) - }).map(|database| database.into()) + }) + .map(|database| database.into()) + } + + pub async fn list_warehouses(&self) -> Result, AppError> { + let warehouses: Vec = self + .control_svc + .list_warehouses() + .await + .map_err(|e| { + let fmt = format!("{}: failed to get warehouses", e); + AppError::new(e, fmt.as_str()) + })? + .into_iter() + .map(|w| w.into()) + .collect(); + + let mut result = Vec::new(); + for mut warehouse in warehouses { + let databases = self.list_databases(warehouse.id).await?; + + let profile = self + .get_profile_by_id(warehouse.storage_profile_id.unwrap()) + .await?; + + let mut total_statistics = Statistics::default(); + databases.iter().for_each(|database| { + let stats = database.clone().statistics.unwrap_or_default(); + total_statistics = total_statistics.aggregate(&stats); + }); + + warehouse.with_details( + Option::from(profile.clone()), + Option::from(databases.clone()), + ); + warehouse.statistics = Some(total_statistics); + result.push(warehouse) + } + Ok(result) + } + pub async fn list_databases(&self, warehouse_id: Uuid) -> Result, AppError> { + let ident = &WarehouseIdent::new(warehouse_id); + let databases = self + .catalog_svc + .list_namespaces(ident, None) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get warehouse databases with wh id {}", + e, warehouse_id + ); + AppError::new(e, fmt.as_str()) + })?; + + let mut database_entities = Vec::new(); + for database in databases { + let tables = self.catalog_svc.list_tables(&database.ident).await?; + let mut total_statistics = Statistics::default(); + + for table in tables { + let table_stats = Statistics::from_table_metadata(&table.metadata); + total_statistics = total_statistics.aggregate(&table_stats); + } + + total_statistics.database_count = Option::from(1); + let mut entity = Database::from(database); + entity.statistics = Option::from(total_statistics); + database_entities.push(entity); + } + Ok(database_entities) } pub async fn list_tables(&self, ident: &DatabaseIdent) -> Result, AppError> { - self.catalog_svc.list_tables(ident).await.map_err(|e| { + let tables = self + .catalog_svc + .list_tables(ident) + .await + .map_err(|e| { + let fmt = format!( + "{}: failed to get database tables with db ident {}", + e, &ident + ); + AppError::new(e, fmt.as_str()) + })? 
+ .into_iter() + .map(|table| table.into()) + .collect(); + Ok(tables) + } + + pub async fn get_table(&self, ident: &TableIdent) -> Result { + let table = self.catalog_svc.load_table(ident).await.map_err(|e| { let fmt = format!( "{}: failed to get database tables with db ident {}", e, &ident ); AppError::new(e, fmt.as_str()) - }) + })?; + Ok(table.into()) } } diff --git a/crates/nexus/src/http/ui/handlers/databases.rs b/crates/nexus/src/http/ui/handlers/databases.rs index 447519f0c..c4c16f510 100644 --- a/crates/nexus/src/http/ui/handlers/databases.rs +++ b/crates/nexus/src/http/ui/handlers/databases.rs @@ -1,8 +1,5 @@ -use crate::http::ui::models::database; -use crate::http::ui::models::database::{get_database_id, DatabaseDashboard}; +use crate::http::ui::models::database::{CreateDatabasePayload, Database}; use crate::http::ui::models::errors::AppError; -use crate::http::ui::models::table::{get_table_id, TableEntity}; -use crate::http::ui::models::warehouse::WarehouseEntity; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; use catalog::models::{DatabaseIdent, WarehouseIdent}; @@ -20,12 +17,8 @@ use uuid::Uuid; ), components( schemas( - database::CreateDatabasePayload, - database::Database, - database::DatabaseDashboard, - database::DatabaseEntity, - database::DatabaseExtended, - database::DatabaseShort, + CreateDatabasePayload, + Database, AppError, ) ), @@ -42,9 +35,9 @@ pub struct ApiDoc; params( ("warehouseId" = Uuid, description = "Warehouse ID"), ), - request_body = database::CreateDatabasePayload, + request_body = CreateDatabasePayload, responses( - (status = 200, description = "Successful Response", body = database::Database), + (status = 200, description = "Successful Response", body = Database), (status = 400, description = "Bad request", body = AppError), (status = 422, description = "Unprocessable entity", body = AppError), (status = 500, description = "Internal server error", body = AppError) @@ -53,8 +46,8 @@ pub struct ApiDoc; pub async fn create_database( State(state): State, Path(warehouse_id): Path, - Json(payload): Json, -) -> Result, AppError> { + Json(payload): Json, +) -> Result, AppError> { let warehouse = state.get_warehouse_by_id(warehouse_id).await?; let ident = DatabaseIdent { warehouse: WarehouseIdent::new(warehouse.id), @@ -114,7 +107,7 @@ pub async fn delete_database( ), operation_id = "webDatabaseDashboard", responses( - (status = 200, description = "Successful Response", body = DatabaseDashboard), + (status = 200, description = "Successful Response", body = Database), (status = 404, description = "Database not found", body = AppError), (status = 422, description = "Unprocessable entity", body = AppError), ) @@ -122,36 +115,19 @@ pub async fn delete_database( pub async fn get_database( State(state): State, Path((warehouse_id, database_name)): Path<(Uuid, String)>, -) -> Result, AppError> { - let warehouse = state.get_warehouse_by_id(warehouse_id).await?; - let profile = state.get_profile_by_id(warehouse.storage_profile_id).await?; +) -> Result, AppError> { + let mut warehouse = state.get_warehouse_by_id(warehouse_id).await?; + let profile = state + .get_profile_by_id(warehouse.storage_profile_id.unwrap()) + .await?; let ident = DatabaseIdent { warehouse: WarehouseIdent::new(warehouse.id), namespace: NamespaceIdent::new(database_name), }; - let database = state.get_database_by_ident(&ident).await?; - let tables = state.catalog_svc.list_tables(&ident).await?; - Ok(Json(DatabaseDashboard { - name: 
ident.namespace.first().unwrap().to_string(), - properties: Option::from(database.properties), - id: get_database_id(ident), - warehouse_id, - warehouse: WarehouseEntity::new(warehouse.into(), profile.into()), - tables: tables - .into_iter() - .map(|t| { - let ident = t.ident.clone(); - TableEntity { - id: get_table_id(t.ident), - name: ident.table, - created_at: Default::default(), - updated_at: Default::default(), - statistics: Default::default(), - compaction_summary: None, - } - }) - .collect(), - statistics: Default::default(), - compaction_summary: None, - })) + let mut database = state.get_database(&ident).await?; + let tables = state.list_tables(&ident).await?; + + warehouse.with_details(Option::from(profile), None); + database.with_details(Option::from(tables)); + Ok(Json(database)) } diff --git a/crates/nexus/src/http/ui/handlers/tables.rs b/crates/nexus/src/http/ui/handlers/tables.rs index bba17b8bd..ce37a5c50 100644 --- a/crates/nexus/src/http/ui/handlers/tables.rs +++ b/crates/nexus/src/http/ui/handlers/tables.rs @@ -1,8 +1,6 @@ use crate::http::ui::models::errors::AppError; -use crate::http::ui::models::table; -use crate::http::ui::models::table::TableQueryRequest; use crate::http::ui::models::table::{ - Table, TableCreateRequest, TableExtended, TableQueryResponse, + Table, TableCreatePayload, TableQueryRequest, TableQueryResponse, }; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; @@ -23,8 +21,7 @@ use uuid::Uuid; schemas( TableQueryResponse, TableQueryRequest, - TableExtended, - TableCreateRequest, + TableCreatePayload, Table, AppError, ) @@ -45,7 +42,7 @@ pub struct ApiDoc; ("tableName" = String, description = "Table name") ), responses( - (status = 200, description = "List all warehouses"), + (status = 200, description = "Get table"), (status = 404, description = "Not found", body = AppError), (status = 422, description = "Unprocessable entity", body = AppError), (status = 500, description = "Internal server error", body = AppError) @@ -54,35 +51,23 @@ pub struct ApiDoc; pub async fn get_table( State(state): State, Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>, -) -> Result, AppError> { - let warehouse = state.get_warehouse_by_id(warehouse_id).await?; +) -> Result, AppError> { + let mut warehouse = state.get_warehouse_by_id(warehouse_id).await?; let profile = state - .get_profile_by_id(warehouse.storage_profile_id) + .get_profile_by_id(warehouse.storage_profile_id.unwrap()) .await?; let ident = DatabaseIdent { warehouse: WarehouseIdent::new(warehouse.id), namespace: NamespaceIdent::new(database_name), }; - let database = state.get_database_by_ident(&ident).await?; + let mut database = state.get_database(&ident).await?; let table_ident = TableIdent { database: ident, table: table_name, }; - let table = state - .catalog_svc - .load_table(&table_ident) - .await - .map_err(|e| { - let fmt = format!("{}: failed to get table with ident {}", e, &table_ident); - AppError::new(e, fmt.as_str()) - })?; - - Ok(Json(TableExtended::new( - profile.into(), - warehouse.into(), - database.into(), - table, - ))) + warehouse.with_details(Option::from(profile), None); + let mut table = state.get_table(&table_ident).await?; + Ok(Json(table)) } #[utoipa::path( @@ -101,7 +86,7 @@ pub async fn get_table( pub async fn create_table( State(state): State, Path((warehouse_id, database_name)): Path<(Uuid, String)>, - Json(payload): Json, + Json(payload): Json, ) -> Result, AppError> { let warehouse = 
state.get_warehouse_model(warehouse_id).await?; let db_ident = DatabaseIdent { @@ -175,14 +160,14 @@ pub async fn delete_table( pub async fn query_table( State(state): State, Path((warehouse_id, database_name, table_name)): Path<(Uuid, String, String)>, - Json(payload): Json, -) -> Result, AppError> { + Json(payload): Json, +) -> Result, AppError> { let request: TableQueryRequest = payload.into(); let result = state .control_svc .query_table(&warehouse_id, &request.query) .await?; - Ok(Json(table::TableQueryResponse { + Ok(Json(TableQueryResponse { id: Default::default(), query: request.query.clone(), result: result.to_string(), diff --git a/crates/nexus/src/http/ui/handlers/warehouses.rs b/crates/nexus/src/http/ui/handlers/warehouses.rs index 236b4b4d8..1090207e1 100644 --- a/crates/nexus/src/http/ui/handlers/warehouses.rs +++ b/crates/nexus/src/http/ui/handlers/warehouses.rs @@ -1,10 +1,10 @@ -use crate::http::ui::models::database::{get_database_id, DatabaseShort}; use crate::http::ui::models::errors::AppError; -use crate::http::ui::models::table::{get_table_id, TableShort}; -use crate::http::ui::models::warehouse; +use crate::http::ui::models::table::Statistics; +use crate::http::ui::models::warehouse::{ + CreateWarehousePayload, Navigation, Warehouse, WarehousesDashboard, +}; use crate::state::AppState; use axum::{extract::Path, extract::State, Json}; -use catalog::models::WarehouseIdent; use control_plane::models::{Warehouse as WarehouseModel, WarehouseCreateRequest}; use utoipa::OpenApi; use uuid::Uuid; @@ -20,12 +20,9 @@ use uuid::Uuid; ), components( schemas( - warehouse::CreateWarehousePayload, - warehouse::Warehouse, - warehouse::WarehouseExtended, - warehouse::WarehousesDashboard, - warehouse::WarehouseEntity, - warehouse::WarehouseShort, + CreateWarehousePayload, + Warehouse, + Navigation, AppError, ) ), @@ -40,58 +37,13 @@ pub struct ApiDoc; path = "/ui/navigation", operation_id = "webWarehousesNavigation", responses( - (status = 200, description = "List all warehouses fot navigation", body = warehouse::Navigation), + (status = 200, description = "List all warehouses fot navigation", body = Navigation), ) )] -pub async fn navigation( - State(state): State, -) -> Result, AppError> { - let warehouses = state.control_svc.list_warehouses().await.map_err(|e| { - let fmt = format!("{}: failed to get warehouses", e); - AppError::new(e, fmt.as_str()) - })?; - let mut warehouses_short = Vec::new(); - - for warehouse in warehouses { - let databases = state - .catalog_svc - .list_namespaces(&WarehouseIdent::new(warehouse.id), None) - .await - .map_err(|e| { - let fmt = format!( - "{}: failed to get warehouse databases with wh id {}", - e, warehouse.id - ); - AppError::new(e, fmt.as_str()) - })?; - let mut databases_short = Vec::new(); - - for database in databases { - let tables = state.catalog_svc.list_tables(&database.ident).await?; - let ident = database.ident.clone(); - databases_short.push(DatabaseShort { - id: get_database_id(database.ident), - name: ident.to_string(), - tables: tables - .into_iter() - .map(|t| { - let ident = t.ident.clone(); - TableShort { - id: get_table_id(t.ident), - name: ident.table, - } - }) - .collect(), - }); - } - warehouses_short.push(warehouse::WarehouseShort { - id: warehouse.id, - name: warehouse.name, - databases: databases_short, - }); - } - Ok(Json(warehouse::Navigation { - warehouses: warehouses_short, +pub async fn navigation(State(state): State) -> Result, AppError> { + let warehouses = state.list_warehouses().await?; + Ok(Json(Navigation { + 
warehouses, })) } #[utoipa::path( @@ -99,38 +51,30 @@ pub async fn navigation( path = "/ui/warehouses", operation_id = "webWarehousesDashboard", responses( - (status = 200, description = "List all warehouses", body = warehouse::WarehousesDashboard), + (status = 200, description = "List all warehouses", body = WarehousesDashboard), (status = 500, description = "List all warehouses error", body = AppError), ) )] pub async fn list_warehouses( State(state): State, -) -> Result, AppError> { - let warehouses = state.control_svc.list_warehouses().await.map_err(|e| { - let fmt = format!("{}: failed to get warehouses", e); - AppError::new(e, fmt.as_str()) - })?; - let storage_profiles = state.control_svc.list_profiles().await.map_err(|e| { - let fmt = format!("{}: failed to get profiles", e); - AppError::new(e, fmt.as_str()) - })?; - let mut extended_warehouses = Vec::new(); +) -> Result, AppError> { + let warehouses = state.list_warehouses().await?; + + let mut total_statistics = Statistics::default(); + let dashboards = warehouses + .into_iter() + .map(|warehouse| { + if let Some(stats) = &warehouse.statistics { + total_statistics = total_statistics.aggregate(stats); + } + warehouse.into() + }) + .collect(); - for warehouse in warehouses { - let mut extended_warehouse = - warehouse::WarehouseExtended::new(warehouse.clone().into(), Default::default()); - if let Some(profile) = storage_profiles - .iter() - .find(|p| p.id == extended_warehouse.storage_profile_id) - { - extended_warehouse.storage_profile = profile.clone().into(); - extended_warehouses.push(extended_warehouse) - } - } - Ok(Json(warehouse::WarehousesDashboard { - warehouses: extended_warehouses, - statistics: Default::default(), + Ok(Json(WarehousesDashboard { + warehouses: dashboards, + statistics: total_statistics, compaction_summary: None, })) } @@ -143,61 +87,38 @@ pub async fn list_warehouses( ("warehouseId" = Uuid, Path, description = "Warehouse ID") ), responses( - (status = 200, description = "Warehouse found", body = warehouse::WarehouseDashboard), + (status = 200, description = "Warehouse found", body = Warehouse), (status = 404, description = "Warehouse not found", body = AppError) ) )] pub async fn get_warehouse( State(state): State, Path(warehouse_id): Path, -) -> Result, AppError> { - let warehouse = state.get_warehouse_by_id(warehouse_id).await?; +) -> Result, AppError> { + let mut warehouse = state.get_warehouse_by_id(warehouse_id).await?; let profile = state - .get_profile_by_id(warehouse.storage_profile_id) + .get_profile_by_id(warehouse.storage_profile_id.unwrap()) .await?; - let databases = state - .catalog_svc - .list_namespaces(&WarehouseIdent::new(warehouse.id), None) - .await - .map_err(|e| { - let fmt = format!( - "{}: failed to get warehouse databases with wh id {}", - e, warehouse.id - ); - AppError::new(e, fmt.as_str()) - })?; - let mut dashboard = warehouse::WarehouseDashboard::new( - warehouse.into(), - profile.into(), - databases.into_iter().map(|d| d.into()).collect(), - ); - - if let Ok(profile) = state - .control_svc - .get_profile(dashboard.storage_profile_id) - .await - { - dashboard.storage_profile = profile.into(); - } - - Ok(Json(dashboard)) + let databases = state.list_databases(warehouse_id).await?; + warehouse.with_details(Option::from(profile), Option::from(databases)); + Ok(Json(warehouse)) } #[utoipa::path( post, path = "/ui/warehouses", - request_body = warehouse::CreateWarehousePayload, + request_body = CreateWarehousePayload, operation_id = "webCreateWarehouse", responses( - (status = 
201, description = "Warehouse created", body = warehouse::Warehouse), + (status = 201, description = "Warehouse created", body = Warehouse), (status = 422, description = "Unprocessable Entity", body = AppError), (status = 500, description = "Internal server error", body = AppError) ) )] pub async fn create_warehouse( State(state): State, - Json(payload): Json, -) -> Result, AppError> { + Json(payload): Json, +) -> Result, AppError> { let request: WarehouseCreateRequest = payload.into(); state.get_profile_by_id(request.storage_profile_id).await?; let warehouse: WarehouseModel = diff --git a/crates/nexus/src/http/ui/models/database.rs b/crates/nexus/src/http/ui/models/database.rs index 1e2130a64..e911ca53a 100644 --- a/crates/nexus/src/http/ui/models/database.rs +++ b/crates/nexus/src/http/ui/models/database.rs @@ -1,6 +1,4 @@ -use crate::http::ui::models::storage_profile::StorageProfile; -use crate::http::ui::models::table::{Statistics, TableEntity, TableShort}; -use crate::http::ui::models::warehouse::{Warehouse, WarehouseEntity, WarehouseExtended}; +use crate::http::ui::models::table::{Statistics, Table}; use catalog::models; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -8,8 +6,6 @@ use utoipa::ToSchema; use uuid::Uuid; use validator::Validate; -pub const DATABASE_NAME: &str = "database_name"; - #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct CreateDatabasePayload { pub name: String, @@ -29,160 +25,54 @@ impl CreateDatabasePayload { #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct Database { - pub name: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option>, pub id: Uuid, - pub warehouse_id: Uuid, -} - -impl Database { - #[allow(clippy::new_without_default)] - pub fn new(name: String, id: Uuid, warehouse_id: Uuid) -> Database { - Database { - name, - properties: None, - id, - warehouse_id, - } - } -} - -impl From for Database { - fn from(db: models::Database) -> Self { - Database { - id: get_database_id(db.ident.clone()), - name: db.ident.namespace.first().unwrap().to_string(), - properties: db.properties.into(), - warehouse_id: *db.ident.warehouse.id(), - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct DatabaseDashboard { pub name: String, + pub tables: Vec, #[serde(skip_serializing_if = "Option::is_none")] pub properties: Option>, - pub id: Uuid, - pub warehouse_id: Uuid, - pub warehouse: WarehouseEntity, - pub tables: Vec, - pub statistics: Statistics, #[serde(skip_serializing_if = "Option::is_none")] - pub compaction_summary: Option, -} - -impl DatabaseDashboard { - #[allow(clippy::new_without_default)] - pub fn new( - name: String, - id: Uuid, - warehouse_id: Uuid, - warehouse: WarehouseEntity, - tables: Vec, - statistics: Statistics, - ) -> DatabaseDashboard { - DatabaseDashboard { - name, - properties: None, - id, - warehouse_id, - warehouse, - tables, - statistics, - compaction_summary: None, - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct DatabaseEntity { - pub id: Uuid, - pub name: String, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, - pub statistics: Statistics, + pub warehouse_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub statistics: Option, #[serde(skip_serializing_if = "Option::is_none")] pub compaction_summary: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub 
created_at: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub updated_at: Option>, } -impl DatabaseEntity { - #[allow(clippy::new_without_default)] - pub fn new( - id: Uuid, - name: String, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, - statistics: Statistics, - ) -> DatabaseEntity { - DatabaseEntity { - id, - name, - created_at, - updated_at, - statistics, - compaction_summary: None, +impl Database { + pub fn with_details(&mut self, tables: Option>) { + if tables.is_some() { + let mut total_statistics = Statistics::default(); + + for t in tables.clone().unwrap() { + total_statistics = total_statistics.aggregate(&t.statistics.unwrap_or_default()); + } + total_statistics.database_count = Some(1); + self.statistics = Option::from(total_statistics); + self.tables = tables.unwrap(); } } } -impl From for DatabaseEntity { +impl From for Database { fn from(db: models::Database) -> Self { - DatabaseEntity { + Database { id: get_database_id(db.ident.clone()), name: db.ident.namespace.first().unwrap().to_string(), + tables: vec![], + warehouse_id: None, created_at: Default::default(), updated_at: Default::default(), statistics: Default::default(), compaction_summary: None, + properties: None, } } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct DatabaseExtended { - pub name: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub properties: Option>, - pub id: Uuid, - pub warehouse_id: Uuid, - #[serde(skip_serializing_if = "Option::is_none")] - pub statistics: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub compaction_summary: Option, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, - pub warehouse: WarehouseExtended, -} - -impl DatabaseExtended { - pub fn new( - profile: StorageProfile, - warehouse: Warehouse, - database: Database, - ) -> DatabaseExtended { - DatabaseExtended { - id: database.id, - name: database.name, - warehouse_id: database.warehouse_id, - properties: database.properties, - statistics: None, - compaction_summary: None, - created_at: Default::default(), - updated_at: Default::default(), - warehouse: WarehouseExtended::new(warehouse, profile), - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct DatabaseShort { - pub id: Uuid, - pub name: String, - pub tables: Vec, -} - pub fn get_database_id(ident: models::DatabaseIdent) -> Uuid { Uuid::new_v5( &Uuid::NAMESPACE_DNS, @@ -190,13 +80,6 @@ pub fn get_database_id(ident: models::DatabaseIdent) -> Uuid { ) } -impl DatabaseShort { - #[allow(clippy::new_without_default)] - pub fn new(id: Uuid, name: String, tables: Vec) -> DatabaseShort { - DatabaseShort { id, name, tables } - } -} - #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct CompactionSummary { pub compactions: i32, diff --git a/crates/nexus/src/http/ui/models/table.rs b/crates/nexus/src/http/ui/models/table.rs index 9610d835a..41ed02a8f 100644 --- a/crates/nexus/src/http/ui/models/table.rs +++ b/crates/nexus/src/http/ui/models/table.rs @@ -1,8 +1,7 @@ -use crate::http::ui::models::database::{CompactionSummary, Database, DatabaseExtended}; +use crate::http::ui::models::database::CompactionSummary; use crate::http::ui::models::metadata::TableMetadataWrapper; -use crate::http::ui::models::storage_profile::StorageProfile; -use crate::http::ui::models::warehouse::Warehouse; use catalog::models as CatalogModels; +use iceberg::spec::TableMetadata; use 
iceberg::spec::{Schema, SortOrder, UnboundPartitionSpec}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -11,8 +10,12 @@ use utoipa::{PartialSchema, ToSchema}; use uuid::Uuid; use validator::Validate; +pub fn get_table_id(ident: CatalogModels::TableIdent) -> Uuid { + Uuid::new_v5(&Uuid::NAMESPACE_DNS, ident.table.to_string().as_bytes()) +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct TableCreateRequest { +pub struct TableCreatePayload { pub name: String, pub location: Option, pub schema: SchemaWrapper, @@ -22,8 +25,8 @@ pub struct TableCreateRequest { pub properties: Option>, } -impl From for catalog::models::TableCreation { - fn from(schema: TableCreateRequest) -> Self { +impl From for catalog::models::TableCreation { + fn from(schema: TableCreatePayload) -> Self { catalog::models::TableCreation { name: schema.name, location: schema.location, @@ -35,101 +38,52 @@ impl From for catalog::models::TableCreation { } } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, ToSchema)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] pub struct Table { pub id: Uuid, pub name: String, - pub metadata: TableMetadataWrapper, - pub metadata_location: String, -} - -impl From for Table { - fn from(table: catalog::models::Table) -> Self { - Self { - id: get_table_id(table.clone().ident), - name: table.ident.table, - metadata_location: table.metadata_location, - metadata: TableMetadataWrapper(table.metadata), - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct TableShort { - pub id: Uuid, - pub name: String, -} -pub fn get_table_id(ident: CatalogModels::TableIdent) -> Uuid { - Uuid::new_v5(&Uuid::NAMESPACE_DNS, ident.table.to_string().as_bytes()) -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct TableEntity { - pub id: Uuid, - pub name: String, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, - pub statistics: Statistics, #[serde(skip_serializing_if = "Option::is_none")] - pub compaction_summary: Option, -} - -impl TableEntity { - #[allow(clippy::new_without_default)] - pub fn new( - id: Uuid, - name: String, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, - statistics: Statistics, - ) -> TableEntity { - TableEntity { - id, - name, - created_at, - updated_at, - statistics, - compaction_summary: None, - } - } -} -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)] -pub struct TableExtended { - pub id: Uuid, - pub name: String, - pub database_name: String, - pub warehouse_id: Uuid, + pub database_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub warehouse_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub properties: Option>, - pub metadata: TableMetadataWrapper, - pub metadata_location: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata_location: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub statistics: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub compaction_summary: Option, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, - pub database: DatabaseExtended, + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub updated_at: Option>, } -impl TableExtended { - 
#[allow(clippy::new_without_default)] - pub fn new( - profile: StorageProfile, - warehouse: Warehouse, - database: Database, - table: CatalogModels::Table, - ) -> TableExtended { - TableExtended { +// impl Table { +// pub fn with_details(&mut self, database: Option) { +// if database.is_some() { +// self.database = database; +// } +// } +// } + +impl From for Table { + fn from(table: catalog::models::Table) -> Self { + Self { id: get_table_id(table.clone().ident), name: table.ident.table, - database_name: table.ident.database.to_string(), - warehouse_id: warehouse.id, + database_name: None, + warehouse_id: None, properties: None, - metadata: TableMetadataWrapper(table.metadata), - metadata_location: table.metadata_location, - statistics: None, - compaction_summary: None, + metadata: None, created_at: Default::default(), updated_at: Default::default(), - database: DatabaseExtended::new(profile, warehouse, database), + statistics: Option::from(Statistics::from_table_metadata(&table.metadata)), + compaction_summary: None, + metadata_location: None, } } } @@ -154,20 +108,65 @@ pub struct Statistics { } impl Statistics { - #[allow(clippy::new_without_default)] - pub fn new( - commit_count: i32, - op_append_count: i32, - op_overwrite_count: i32, - op_delete_count: i32, - op_replace_count: i32, - total_bytes: i32, - bytes_added: i32, - bytes_removed: i32, - total_rows: i32, - rows_added: i32, - rows_deleted: i32, - ) -> Statistics { + pub fn from_table_metadata(metadata: &TableMetadata) -> Statistics { + let mut total_bytes = 0; + let mut total_rows = 0; + let mut rows_deleted = 0; + let mut commit_count = 0; + let mut op_append_count = 0; + let mut op_overwrite_count = 0; + let mut op_delete_count = 0; + let mut op_replace_count = 0; + let mut bytes_added = 0; + let mut rows_added = 0; + + if let Some(latest_snapshot) = metadata.current_snapshot() { + total_bytes = latest_snapshot + .summary() + .other + .get("total-files-size") + .unwrap() + .parse::() + .unwrap(); + total_rows = latest_snapshot + .summary() + .other + .get("total-records") + .unwrap() + .parse::() + .unwrap(); + rows_deleted = latest_snapshot + .summary() + .other + .get("removed-records") + .unwrap() + .parse::() + .unwrap(); + }; + + metadata.snapshots().for_each(|snapshot| { + let summary = snapshot.summary(); + commit_count += 1; + bytes_added += summary + .other + .get("added-files-size") + .unwrap() + .parse::() + .unwrap(); + rows_added += summary + .other + .get("added-records") + .unwrap() + .parse::() + .unwrap(); + match summary.operation { + iceberg::spec::Operation::Append => op_append_count += 1, + iceberg::spec::Operation::Overwrite => op_overwrite_count += 1, + iceberg::spec::Operation::Delete => op_delete_count += 1, + iceberg::spec::Operation::Replace => op_replace_count += 1, + } + }); + Statistics { commit_count, op_append_count, @@ -176,14 +175,36 @@ impl Statistics { op_replace_count, total_bytes, bytes_added, - bytes_removed, + bytes_removed: 0, total_rows, rows_added, rows_deleted, - table_count: None, + table_count: Option::from(1), database_count: None, } } + + pub fn aggregate(&self, other: &Statistics) -> Statistics { + Statistics { + commit_count: self.commit_count + other.commit_count, + op_append_count: self.op_append_count + other.op_append_count, + op_overwrite_count: self.op_overwrite_count + other.op_overwrite_count, + op_delete_count: self.op_delete_count + other.op_delete_count, + op_replace_count: self.op_replace_count + other.op_replace_count, + total_bytes: self.total_bytes + 
other.total_bytes, + bytes_added: self.bytes_added + other.bytes_added, + bytes_removed: self.bytes_removed + other.bytes_removed, + total_rows: self.total_rows + other.total_rows, + rows_added: self.rows_added + other.rows_added, + rows_deleted: self.rows_deleted + other.rows_deleted, + table_count: self.table_count.map_or(other.table_count, |count| { + Some(count + other.table_count.unwrap_or(0)) + }), + database_count: self.database_count.map_or(other.database_count, |count| { + Some(count + other.database_count.unwrap_or(0)) + }), + } + } } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] diff --git a/crates/nexus/src/http/ui/models/warehouse.rs b/crates/nexus/src/http/ui/models/warehouse.rs index c152e6f21..a34c35ed7 100644 --- a/crates/nexus/src/http/ui/models/warehouse.rs +++ b/crates/nexus/src/http/ui/models/warehouse.rs @@ -1,6 +1,6 @@ #![allow(unused_qualifications)] -use crate::http::ui::models::database::{CompactionSummary, DatabaseEntity, DatabaseShort}; +use crate::http::ui::models::database::{CompactionSummary, Database}; use crate::http::ui::models::storage_profile::StorageProfile; use crate::http::ui::models::table::Statistics; use chrono::{DateTime, Utc}; @@ -11,12 +11,12 @@ use validator::Validate; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct Navigation { - pub warehouses: Vec, + pub warehouses: Vec, } impl Navigation { #[allow(clippy::new_without_default)] - pub fn new(warehouses: Vec) -> Navigation { + pub fn new(warehouses: Vec) -> Navigation { Navigation { warehouses } } } @@ -55,196 +55,67 @@ impl From for models::WarehouseCreateRequest { } } + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] pub struct Warehouse { - #[validate(length(min = 1))] - pub name: String, - pub storage_profile_id: uuid::Uuid, - #[validate(length(min = 1))] - #[serde(rename = "prefix")] - pub key_prefix: String, pub id: uuid::Uuid, - pub external_id: uuid::Uuid, - pub location: String, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, -} - -impl Warehouse { - #[allow(clippy::new_without_default)] - pub fn new( - name: String, - storage_profile_id: uuid::Uuid, - key_prefix: String, - id: uuid::Uuid, - external_id: uuid::Uuid, - location: String, - created_at: chrono::DateTime, - updated_at: chrono::DateTime, - ) -> Warehouse { - Warehouse { - name, - storage_profile_id, - key_prefix, - id, - external_id, - location, - created_at, - updated_at, - } - } -} - -impl From for Warehouse { - fn from(warehouse: control_plane::models::Warehouse) -> Self { - Warehouse { - id: warehouse.id, - key_prefix: warehouse.prefix, - name: warehouse.name, - location: warehouse.location, - storage_profile_id: warehouse.storage_profile_id, - created_at: DateTime::from_naive_utc_and_offset(warehouse.created_at, Utc), - updated_at: DateTime::from_naive_utc_and_offset(warehouse.updated_at, Utc), - external_id: Default::default(), - } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct WarehouseDashboard { #[validate(length(min = 1))] pub name: String, - pub storage_profile_id: uuid::Uuid, + pub databases: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub storage_profile_id: Option, #[validate(length(min = 1))] - pub key_prefix: String, - pub id: uuid::Uuid, - pub external_id: uuid::Uuid, - pub location: String, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, - pub storage_profile: StorageProfile, - pub 
databases: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub key_prefix: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub external_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub location: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub created_at: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub updated_at: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub storage_profile: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub statistics: Option, #[serde(skip_serializing_if = "Option::is_none")] pub compaction_summary: Option, } -impl WarehouseDashboard { - #[allow(clippy::new_without_default)] - pub fn new( - warehouse: Warehouse, - storage_profile: StorageProfile, - databases: Vec, - ) -> WarehouseDashboard { - WarehouseDashboard { - id: warehouse.id, - key_prefix: warehouse.key_prefix, - name: warehouse.name, - location: warehouse.location, - storage_profile_id: warehouse.storage_profile_id, - created_at: warehouse.created_at, - updated_at: warehouse.updated_at, - external_id: warehouse.external_id, - statistics: None, - compaction_summary: None, - storage_profile, - databases: databases, +impl Warehouse { + pub fn with_details(&mut self, profile: Option, databases: Option>) { + if profile.is_some() { + self.storage_profile = profile; } - } -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct WarehouseEntity { - #[validate(length(min = 1))] - pub name: String, - pub storage_profile_id: uuid::Uuid, - #[validate(length(min = 1))] - pub key_prefix: String, - pub id: uuid::Uuid, - pub external_id: uuid::Uuid, - pub location: String, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, - pub storage_profile: StorageProfile, -} - -impl WarehouseEntity { - #[allow(clippy::new_without_default)] - pub fn new(warehouse: Warehouse, storage_profile: StorageProfile) -> WarehouseEntity { - WarehouseEntity { - id: warehouse.id, - key_prefix: warehouse.key_prefix, - name: warehouse.name, - location: warehouse.location, - storage_profile_id: warehouse.storage_profile_id, - created_at: warehouse.created_at, - updated_at: warehouse.updated_at, - external_id: warehouse.external_id, - storage_profile, + if databases.is_some() { + self.databases = databases; } } } - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)] -pub struct WarehouseExtended { - #[validate(length(min = 1))] - pub name: String, - pub storage_profile_id: uuid::Uuid, - #[validate(length(min = 1))] - pub key_prefix: String, - pub id: uuid::Uuid, - pub external_id: uuid::Uuid, - pub location: String, - pub created_at: chrono::DateTime, - pub updated_at: chrono::DateTime, - #[serde(skip_serializing_if = "Option::is_none")] - pub statistics: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub compaction_summary: Option, - pub storage_profile: StorageProfile, -} - -impl WarehouseExtended { - #[allow(clippy::new_without_default)] - pub fn new(warehouse: Warehouse, storage_profile: StorageProfile) -> WarehouseExtended { - WarehouseExtended { +impl From for Warehouse { + fn from(warehouse: control_plane::models::Warehouse) -> Self { + Warehouse { id: warehouse.id, - key_prefix: warehouse.key_prefix, + key_prefix: Option::from(warehouse.prefix), name: warehouse.name, - location: warehouse.location, - storage_profile_id: warehouse.storage_profile_id, - created_at: warehouse.created_at, - updated_at: warehouse.updated_at, - 
external_id: warehouse.external_id,
+            location: Option::from(warehouse.location),
+            storage_profile_id: Option::from(warehouse.storage_profile_id),
+            created_at: Option::from(DateTime::from_naive_utc_and_offset(warehouse.created_at, Utc)),
+            updated_at: Option::from(DateTime::from_naive_utc_and_offset(warehouse.updated_at, Utc)),
+            storage_profile: None,
             statistics: None,
+            external_id: Default::default(),
+            databases: None,
             compaction_summary: None,
-            storage_profile,
-        }
-    }
-}
-
-#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)]
-pub struct WarehouseShort {
-    pub id: uuid::Uuid,
-    pub name: String,
-    pub databases: Vec<DatabaseShort>,
-}
-
-impl WarehouseShort {
-    #[allow(clippy::new_without_default)]
-    pub fn new(id: uuid::Uuid, name: String, databases: Vec<DatabaseShort>) -> WarehouseShort {
-        WarehouseShort {
-            id,
-            name,
-            databases,
         }
     }
 }
 
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, Default, ToSchema)]
 pub struct WarehousesDashboard {
-    pub warehouses: Vec<WarehouseExtended>,
+    pub warehouses: Vec<Warehouse>,
     pub statistics: Statistics,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub compaction_summary: Option<CompactionSummary>,
@@ -252,11 +123,11 @@ pub struct WarehousesDashboard {
 
 impl WarehousesDashboard {
     #[allow(clippy::new_without_default)]
-    pub fn new(warehouses: Vec<WarehouseExtended>, statistics: Statistics) -> WarehousesDashboard {
+    pub fn new(warehouses: Vec<Warehouse>, statistics: Statistics) -> WarehousesDashboard {
         WarehousesDashboard {
             warehouses,
             statistics,
             compaction_summary: None,
         }
     }
-}
+}
\ No newline at end of file