Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# will have compiled files and executables
debug/
target/
object_store/

# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Expand All @@ -12,6 +13,7 @@ Cargo.lock

# MSVC Windows builds of rustc generate these, which store debugging information
*.pdb
.DS_Store

# RustRover
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
Expand Down
10 changes: 5 additions & 5 deletions crates/control_plane/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use crate::models::{StorageProfile, StorageProfileCreateRequest};
use crate::models::{Warehouse, WarehouseCreateRequest};
use crate::repository::{StorageProfileRepository, WarehouseRepository};
use async_trait::async_trait;
use std::sync::Arc;
use uuid::Uuid;
use datafusion::catalog_common::CatalogProvider;
use datafusion::prelude::*;
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_datafusion::IcebergCatalogProvider;
use datafusion::catalog_common::CatalogProvider;
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;

#[async_trait]
pub trait ControlService: Send + Sync {
Expand All @@ -33,7 +33,7 @@ pub trait ControlService: Send + Sync {
// async fn delete_table(&self, id: Uuid) -> Result<()>;
// async fn list_tables(&self) -> Result<Vec<Table>>;

async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)>;
async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)>;
}

pub struct ControlServiceImpl {
Expand Down Expand Up @@ -102,7 +102,7 @@ impl ControlService for ControlServiceImpl {
self.warehouse_repo.list().await
}

async fn query_table(&self, warehouse_id:&Uuid, query:&String) -> Result<(&str)> {
async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)> {
let config = RestCatalogConfig::builder()
.uri("http://0.0.0.0:3000/catalog".to_string())
.warehouse(warehouse_id.to_string())
Expand Down
2 changes: 1 addition & 1 deletion crates/nexus/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn create_app(state: AppState) -> Router {
.nest("/", control_router)
.nest("/catalog", catalog_router)
.nest("/ui", ui_router)
.merge(SwaggerUi::new("/").url("/openapi.yaml", spec))
.merge(SwaggerUi::new("/").url("/openapi.json", spec))
.with_state(state)
}

Expand Down
106 changes: 99 additions & 7 deletions crates/nexus/src/http/ui/handlers/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::http::ui::models::database::Database;
use crate::http::ui::models::errors::AppError;
use crate::http::ui::models::storage_profile::StorageProfile;
use crate::http::ui::models::table::{Statistics, Table};
use crate::http::ui::models::warehouse::Warehouse;
use crate::state::AppState;
use catalog::models::{DatabaseIdent, Table};
use catalog::models::{DatabaseIdent, TableIdent, WarehouseIdent};
use control_plane::models::Warehouse as WarehouseModel;
use uuid::Uuid;

Expand All @@ -21,7 +22,9 @@ impl AppState {
})
}
pub async fn get_warehouse_by_id(&self, warehouse_id: Uuid) -> Result<Warehouse, AppError> {
self.get_warehouse_model(warehouse_id).await.map(|warehouse| warehouse.into())
self.get_warehouse_model(warehouse_id)
.await
.map(|warehouse| warehouse.into())
}

pub async fn get_profile_by_id(
Expand All @@ -34,26 +37,115 @@ impl AppState {
.map_err(|e| {
let fmt = format!("{}: failed to get profile by id {}", e, storage_profile_id);
AppError::new(e, fmt.as_str())
}).map(|profile| profile.into())
})
.map(|profile| profile.into())
}

pub async fn get_database_by_ident(&self, ident: &DatabaseIdent) -> Result<Database, AppError> {
pub async fn get_database(&self, ident: &DatabaseIdent) -> Result<Database, AppError> {
self.catalog_svc
.get_namespace(ident)
.await
.map_err(|e| {
let fmt = format!("{}: failed to get database with db ident {}", e, &ident);
AppError::new(e, fmt.as_str())
}).map(|database| database.into())
})
.map(|database| database.into())
}

pub async fn list_warehouses(&self) -> Result<Vec<Warehouse>, AppError> {
let warehouses: Vec<Warehouse> = self
.control_svc
.list_warehouses()
.await
.map_err(|e| {
let fmt = format!("{}: failed to get warehouses", e);
AppError::new(e, fmt.as_str())
})?
.into_iter()
.map(|w| w.into())
.collect();

let mut result = Vec::new();
for mut warehouse in warehouses {
let databases = self.list_databases(warehouse.id).await?;

let profile = self
.get_profile_by_id(warehouse.storage_profile_id.unwrap())
.await?;

let mut total_statistics = Statistics::default();
databases.iter().for_each(|database| {
let stats = database.clone().statistics.unwrap_or_default();
total_statistics = total_statistics.aggregate(&stats);
});

warehouse.with_details(
Option::from(profile.clone()),
Option::from(databases.clone()),
);
warehouse.statistics = Some(total_statistics);
result.push(warehouse)
}
Ok(result)
}
pub async fn list_databases(&self, warehouse_id: Uuid) -> Result<Vec<Database>, AppError> {
let ident = &WarehouseIdent::new(warehouse_id);
let databases = self
.catalog_svc
.list_namespaces(ident, None)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get warehouse databases with wh id {}",
e, warehouse_id
);
AppError::new(e, fmt.as_str())
})?;

let mut database_entities = Vec::new();
for database in databases {
let tables = self.catalog_svc.list_tables(&database.ident).await?;
let mut total_statistics = Statistics::default();

for table in tables {
let table_stats = Statistics::from_table_metadata(&table.metadata);
total_statistics = total_statistics.aggregate(&table_stats);
}

total_statistics.database_count = Option::from(1);
let mut entity = Database::from(database);
entity.statistics = Option::from(total_statistics);
database_entities.push(entity);
}
Ok(database_entities)
}

pub async fn list_tables(&self, ident: &DatabaseIdent) -> Result<Vec<Table>, AppError> {
self.catalog_svc.list_tables(ident).await.map_err(|e| {
let tables = self
.catalog_svc
.list_tables(ident)
.await
.map_err(|e| {
let fmt = format!(
"{}: failed to get database tables with db ident {}",
e, &ident
);
AppError::new(e, fmt.as_str())
})?
.into_iter()
.map(|table| table.into())
.collect();
Ok(tables)
}

pub async fn get_table(&self, ident: &TableIdent) -> Result<Table, AppError> {
let table = self.catalog_svc.load_table(ident).await.map_err(|e| {
let fmt = format!(
"{}: failed to get database tables with db ident {}",
e, &ident
);
AppError::new(e, fmt.as_str())
})
})?;
Ok(table.into())
}
}
62 changes: 19 additions & 43 deletions crates/nexus/src/http/ui/handlers/databases.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::http::ui::models::database;
use crate::http::ui::models::database::{get_database_id, DatabaseDashboard};
use crate::http::ui::models::database::{CreateDatabasePayload, Database};
use crate::http::ui::models::errors::AppError;
use crate::http::ui::models::table::{get_table_id, TableEntity};
use crate::http::ui::models::warehouse::WarehouseEntity;
use crate::state::AppState;
use axum::{extract::Path, extract::State, Json};
use catalog::models::{DatabaseIdent, WarehouseIdent};
Expand All @@ -20,12 +17,8 @@ use uuid::Uuid;
),
components(
schemas(
database::CreateDatabasePayload,
database::Database,
database::DatabaseDashboard,
database::DatabaseEntity,
database::DatabaseExtended,
database::DatabaseShort,
CreateDatabasePayload,
Database,
AppError,
)
),
Expand All @@ -42,9 +35,9 @@ pub struct ApiDoc;
params(
("warehouseId" = Uuid, description = "Warehouse ID"),
),
request_body = database::CreateDatabasePayload,
request_body = CreateDatabasePayload,
responses(
(status = 200, description = "Successful Response", body = database::Database),
(status = 200, description = "Successful Response", body = Database),
(status = 400, description = "Bad request", body = AppError),
(status = 422, description = "Unprocessable entity", body = AppError),
(status = 500, description = "Internal server error", body = AppError)
Expand All @@ -53,8 +46,8 @@ pub struct ApiDoc;
pub async fn create_database(
State(state): State<AppState>,
Path(warehouse_id): Path<Uuid>,
Json(payload): Json<database::CreateDatabasePayload>,
) -> Result<Json<database::Database>, AppError> {
Json(payload): Json<CreateDatabasePayload>,
) -> Result<Json<Database>, AppError> {
let warehouse = state.get_warehouse_by_id(warehouse_id).await?;
let ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
Expand Down Expand Up @@ -114,44 +107,27 @@ pub async fn delete_database(
),
operation_id = "webDatabaseDashboard",
responses(
(status = 200, description = "Successful Response", body = DatabaseDashboard),
(status = 200, description = "Successful Response", body = Database),
(status = 404, description = "Database not found", body = AppError),
(status = 422, description = "Unprocessable entity", body = AppError),
)
)]
pub async fn get_database(
State(state): State<AppState>,
Path((warehouse_id, database_name)): Path<(Uuid, String)>,
) -> Result<Json<database::DatabaseDashboard>, AppError> {
let warehouse = state.get_warehouse_by_id(warehouse_id).await?;
let profile = state.get_profile_by_id(warehouse.storage_profile_id).await?;
) -> Result<Json<Database>, AppError> {
let mut warehouse = state.get_warehouse_by_id(warehouse_id).await?;
let profile = state
.get_profile_by_id(warehouse.storage_profile_id.unwrap())
.await?;
let ident = DatabaseIdent {
warehouse: WarehouseIdent::new(warehouse.id),
namespace: NamespaceIdent::new(database_name),
};
let database = state.get_database_by_ident(&ident).await?;
let tables = state.catalog_svc.list_tables(&ident).await?;
Ok(Json(DatabaseDashboard {
name: ident.namespace.first().unwrap().to_string(),
properties: Option::from(database.properties),
id: get_database_id(ident),
warehouse_id,
warehouse: WarehouseEntity::new(warehouse.into(), profile.into()),
tables: tables
.into_iter()
.map(|t| {
let ident = t.ident.clone();
TableEntity {
id: get_table_id(t.ident),
name: ident.table,
created_at: Default::default(),
updated_at: Default::default(),
statistics: Default::default(),
compaction_summary: None,
}
})
.collect(),
statistics: Default::default(),
compaction_summary: None,
}))
let mut database = state.get_database(&ident).await?;
let tables = state.list_tables(&ident).await?;

warehouse.with_details(Option::from(profile), None);
database.with_details(Option::from(tables));
Ok(Json(database))
}
Loading