Skip to content

Commit d78883d

Browse files
authored
Merge pull request #3 from Embucket/CP-5
Implement warehouses and profiles
2 parents bcf347e + bfa8b5b commit d78883d

File tree

10 files changed

+272
-95
lines changed

10 files changed

+272
-95
lines changed

crates/nexus/src/http/ui/handlers/databases.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ pub async fn get_database(
9696
created_at: Default::default(),
9797
updated_at: Default::default(),
9898
storage_profile: storage_profile::StorageProfile {
99-
r#type: aws::CloudProvider::S3,
99+
r#type: aws::CloudProvider::AWS,
100100
region: "".to_string(),
101101
bucket: "".to_string(),
102102
credentials: Default::default(),

crates/nexus/src/http/ui/handlers/profiles.rs

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::error::AppError;
22
use crate::http::ui::models::{aws, storage_profile};
33
use crate::state::AppState;
44
use axum::{extract::Path, extract::State, Json};
5+
use control_plane::models::{StorageProfile, StorageProfileCreateRequest};
56
use utoipa::OpenApi;
67
use uuid::Uuid;
78

@@ -44,17 +45,13 @@ pub async fn create_storage_profile(
4445
State(state): State<AppState>,
4546
Json(payload): Json<storage_profile::CreateStorageProfilePayload>,
4647
) -> Result<Json<storage_profile::StorageProfile>, AppError> {
47-
Ok(Json(storage_profile::StorageProfile {
48-
r#type: aws::CloudProvider::S3,
49-
region: "2".to_string(),
50-
bucket: "".to_string(),
51-
credentials: Default::default(),
52-
sts_role_arn: None,
53-
endpoint: None,
54-
id: Default::default(),
55-
created_at: Default::default(),
56-
updated_at: Default::default(),
57-
}))
48+
let request: StorageProfileCreateRequest = payload.into();
49+
let profile: StorageProfile = state
50+
.control_svc
51+
.create_profile(&request)
52+
.await
53+
.map_err(|e| AppError::from(e))?;
54+
Ok(Json(profile.into()))
5855
}
5956

6057
#[utoipa::path(
@@ -71,19 +68,14 @@ pub async fn create_storage_profile(
7168
)]
7269
pub async fn get_storage_profile(
7370
State(state): State<AppState>,
74-
Path(id): Path<Uuid>,
71+
Path(storage_profile_id): Path<Uuid>,
7572
) -> Result<Json<storage_profile::StorageProfile>, AppError> {
76-
Ok(Json(storage_profile::StorageProfile {
77-
r#type: aws::CloudProvider::S3,
78-
region: "1".to_string(),
79-
bucket: "".to_string(),
80-
credentials: Default::default(),
81-
sts_role_arn: None,
82-
endpoint: None,
83-
id: Default::default(),
84-
created_at: Default::default(),
85-
updated_at: Default::default(),
86-
}))
73+
let profile: StorageProfile = state
74+
.control_svc
75+
.get_profile(storage_profile_id)
76+
.await
77+
.map_err(|e| AppError::from(e))?;
78+
Ok(Json(profile.into()))
8779
}
8880

8981
#[utoipa::path(
@@ -100,8 +92,13 @@ pub async fn get_storage_profile(
10092
)]
10193
pub async fn delete_storage_profile(
10294
State(state): State<AppState>,
103-
Path(id): Path<Uuid>,
95+
Path(storage_profile_id): Path<Uuid>,
10496
) -> Result<Json<()>, AppError> {
97+
state
98+
.control_svc
99+
.delete_profile(storage_profile_id)
100+
.await
101+
.map_err(|e| AppError::from(e))?;
105102
Ok(Json(()))
106103
}
107104

@@ -117,5 +114,6 @@ pub async fn delete_storage_profile(
117114
pub async fn list_storage_profiles(
118115
State(state): State<AppState>,
119116
) -> Result<Json<Vec<storage_profile::StorageProfile>>, AppError> {
120-
Ok(Json(vec![]))
117+
let profiles = state.control_svc.list_profiles().await?;
118+
Ok(Json(profiles.into_iter().map(|p| p.into()).collect()))
121119
}

crates/nexus/src/http/ui/handlers/tables.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub async fn get_table(
9090
statistics: None,
9191
compaction_summary: None,
9292
storage_profile: storage_profile::StorageProfile {
93-
r#type: aws::CloudProvider::S3,
93+
r#type: aws::CloudProvider::AWS,
9494
region: "22".to_string(),
9595
bucket: "2".to_string(),
9696
credentials: Default::default(),
@@ -155,7 +155,7 @@ pub async fn create_table(
155155
statistics: None,
156156
compaction_summary: None,
157157
storage_profile: storage_profile::StorageProfile {
158-
r#type: aws::CloudProvider::S3,
158+
r#type: aws::CloudProvider::AWS,
159159
region: "2".to_string(),
160160
bucket: "2".to_string(),
161161
credentials: Default::default(),

crates/nexus/src/http/ui/handlers/warehouses.rs

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::error::AppError;
22
use crate::http::ui::models::warehouse;
33
use crate::state::AppState;
44
use axum::{extract::Path, extract::State, Json};
5+
use control_plane::models::{Warehouse as WarehouseModel, WarehouseCreateRequest};
56
use utoipa::OpenApi;
67
use uuid::Uuid;
78

@@ -31,14 +32,33 @@ pub struct ApiDoc;
3132
path = "/ui/warehouses",
3233
operation_id = "webWarehousesDashboard",
3334
responses(
34-
(status = 200, description = "List all warehouses", body = Vec<warehouse::WarehousesDashboard>),
35+
(status = 200, description = "List all warehouses", body = warehouse::WarehousesDashboard),
3536
(status = 500, description = "Internal server error")
3637
)
3738
)]
3839
pub async fn list_warehouses(
3940
State(state): State<AppState>,
40-
) -> Result<Json<Vec<warehouse::WarehousesDashboard>>, AppError> {
41-
Ok(Json(vec![]))
41+
) -> Result<Json<warehouse::WarehousesDashboard>, AppError> {
42+
let warehouses = state.control_svc.list_warehouses().await?;
43+
let storage_profiles = state.control_svc.list_profiles().await?;
44+
let mut extended_warehouses = Vec::new();
45+
46+
for warehouse in warehouses {
47+
let mut extended_warehouse =
48+
warehouse::WarehouseExtended::new(warehouse.clone().into(), Default::default());
49+
if let Some(profile) = storage_profiles
50+
.iter()
51+
.find(|p| p.id == extended_warehouse.storage_profile_id)
52+
{
53+
extended_warehouse.storage_profile = profile.clone().into();
54+
extended_warehouses.push(extended_warehouse)
55+
}
56+
}
57+
Ok(Json(warehouse::WarehousesDashboard {
58+
warehouses: extended_warehouses,
59+
statistics: Default::default(),
60+
compaction_summary: None,
61+
}))
4262
}
4363

4464
#[utoipa::path(
@@ -49,25 +69,28 @@ pub async fn list_warehouses(
4969
("warehouseId" = Uuid, Path, description = "Warehouse ID")
5070
),
5171
responses(
52-
(status = 200, description = "Warehouse found", body = warehouse::Warehouse),
72+
(status = 200, description = "Warehouse found", body = warehouse::WarehouseExtended),
5373
(status = 404, description = "Warehouse not found")
5474
)
5575
)]
5676
pub async fn get_warehouse(
5777
State(state): State<AppState>,
5878
Path(warehouse_id): Path<Uuid>,
59-
) -> Result<Json<warehouse::Warehouse>, AppError> {
60-
// let warehouse = state.warehouse_service.get_warehouse(warehouse_id).await?;
61-
Ok(Json(warehouse::Warehouse {
62-
name: "".to_string(),
63-
storage_profile_id: Default::default(),
64-
key_prefix: "key".to_string(),
65-
id: Default::default(),
66-
external_id: Default::default(),
67-
location: "".to_string(),
68-
created_at: Default::default(),
69-
updated_at: Default::default(),
70-
}))
79+
) -> Result<Json<warehouse::WarehouseExtended>, AppError> {
80+
let warehouse = state.control_svc.get_warehouse(warehouse_id).await?;
81+
82+
let mut extended_warehouse =
83+
warehouse::WarehouseExtended::new(warehouse.into(), Default::default());
84+
85+
if let Ok(profile) = state
86+
.control_svc
87+
.get_profile(extended_warehouse.storage_profile_id)
88+
.await
89+
{
90+
extended_warehouse.storage_profile = profile.into();
91+
}
92+
93+
Ok(Json(extended_warehouse))
7194
}
7295

7396
#[utoipa::path(
@@ -85,22 +108,25 @@ pub async fn create_warehouse(
85108
State(state): State<AppState>,
86109
Json(payload): Json<warehouse::CreateWarehousePayload>,
87110
) -> Result<Json<warehouse::Warehouse>, AppError> {
88-
Ok(Json(warehouse::Warehouse {
89-
name: "".to_string(),
90-
storage_profile_id: Default::default(),
91-
key_prefix: "".to_string(),
92-
id: Default::default(),
93-
external_id: Default::default(),
94-
location: "".to_string(),
95-
created_at: Default::default(),
96-
updated_at: Default::default(),
97-
}))
111+
let request: WarehouseCreateRequest = payload.into();
112+
113+
state
114+
.control_svc
115+
.get_profile(request.storage_profile_id)
116+
.await
117+
.map_err(|e| AppError::from(e))?;
118+
let warehouse: WarehouseModel = state
119+
.control_svc
120+
.create_warehouse(&request)
121+
.await
122+
.map_err(|e| AppError::from(e))?;
123+
Ok(Json(warehouse.into()))
98124
}
99125

100126
#[utoipa::path(
101127
delete,
102128
path = "/ui/warehouses/{warehouseId}",
103-
operation_id = "webCeleteWarehouse",
129+
operation_id = "webCreteWarehouse",
104130
params(
105131
("warehouseId" = Uuid, Path, description = "Warehouse ID")
106132
),
@@ -112,6 +138,11 @@ pub async fn create_warehouse(
112138
pub async fn delete_warehouse(
113139
State(state): State<AppState>,
114140
Path(warehouse_id): Path<Uuid>,
115-
) -> Result<(), AppError> {
116-
Ok(())
141+
) -> Result<Json<()>, AppError> {
142+
state
143+
.control_svc
144+
.delete_warehouse(warehouse_id)
145+
.await
146+
.map_err(|e| AppError::from(e))?;
147+
Ok(Json(()))
117148
}

crates/nexus/src/http/ui/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
pub mod router;
2-
pub mod models;
31
pub mod handlers;
2+
pub mod models;
3+
pub mod router;

crates/nexus/src/http/ui/models/aws.rs

Lines changed: 108 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use control_plane::models;
12
use serde::{Deserialize, Serialize};
23
use utoipa::ToSchema;
34
use validator::Validate;
@@ -20,6 +21,23 @@ impl AwsAccessKeyCredential {
2021
}
2122
}
2223

24+
impl From<AwsAccessKeyCredential> for models::AwsAccessKeyCredential {
25+
fn from(credential: AwsAccessKeyCredential) -> Self {
26+
models::AwsAccessKeyCredential {
27+
aws_access_key_id: credential.aws_access_key_id,
28+
aws_secret_access_key: credential.aws_secret_access_key,
29+
}
30+
}
31+
}
32+
impl From<models::AwsAccessKeyCredential> for AwsAccessKeyCredential {
33+
fn from(credential: models::AwsAccessKeyCredential) -> Self {
34+
AwsAccessKeyCredential {
35+
aws_access_key_id: credential.aws_access_key_id,
36+
aws_secret_access_key: credential.aws_secret_access_key,
37+
}
38+
}
39+
}
40+
2341
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Validate, ToSchema)]
2442
pub struct AwsRoleCredential {
2543
#[validate(length(min = 1))]
@@ -38,26 +56,108 @@ impl AwsRoleCredential {
3856
}
3957
}
4058

59+
impl From<AwsRoleCredential> for models::AwsRoleCredential {
60+
fn from(credential: AwsRoleCredential) -> Self {
61+
models::AwsRoleCredential {
62+
role_arn: credential.role_arn,
63+
external_id: credential.external_id,
64+
}
65+
}
66+
}
67+
impl From<models::AwsRoleCredential> for AwsRoleCredential {
68+
fn from(credential: models::AwsRoleCredential) -> Self {
69+
AwsRoleCredential {
70+
role_arn: credential.role_arn,
71+
external_id: credential.external_id,
72+
}
73+
}
74+
}
75+
4176
#[derive(
42-
Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash, ToSchema,
77+
Debug,
78+
Clone,
79+
Copy,
80+
PartialEq,
81+
Eq,
82+
PartialOrd,
83+
Ord,
84+
Serialize,
85+
Deserialize,
86+
Hash,
87+
Default,
88+
ToSchema,
4389
)]
4490
pub enum CloudProvider {
45-
#[serde(rename = "s3")]
46-
S3,
91+
#[serde(rename = "aws")]
92+
#[default]
93+
AWS,
4794
#[serde(rename = "gcs")]
48-
Gcs,
95+
GCS,
4996
#[serde(rename = "azure")]
50-
Azure,
97+
AZURE,
98+
}
99+
100+
impl From<models::CloudProvider> for CloudProvider {
101+
fn from(provider: models::CloudProvider) -> Self {
102+
match provider {
103+
models::CloudProvider::AWS => CloudProvider::AWS,
104+
models::CloudProvider::GCS => CloudProvider::GCS,
105+
models::CloudProvider::AZURE => CloudProvider::AZURE,
106+
}
107+
}
108+
}
109+
110+
impl From<CloudProvider> for models::CloudProvider {
111+
fn from(provider: CloudProvider) -> Self {
112+
match provider {
113+
CloudProvider::AWS => models::CloudProvider::AWS,
114+
CloudProvider::GCS => models::CloudProvider::GCS,
115+
CloudProvider::AZURE => models::CloudProvider::AZURE,
116+
}
117+
}
51118
}
52119

53120
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
54121
pub enum Credentials {
55-
AwsAccessKeyCredential(AwsAccessKeyCredential),
56-
AwsRoleCredential(AwsRoleCredential),
122+
#[serde(rename = "access_key")]
123+
AccessKey(AwsAccessKeyCredential),
124+
#[serde(rename = "role")]
125+
Role(AwsRoleCredential),
57126
}
58127

59128
impl Default for Credentials {
60129
fn default() -> Self {
61-
Credentials::AwsAccessKeyCredential(AwsAccessKeyCredential::default())
130+
Credentials::AccessKey(AwsAccessKeyCredential::default())
131+
}
132+
}
133+
impl From<models::Credentials> for Credentials {
134+
fn from(credentials: models::Credentials) -> Self {
135+
match credentials {
136+
models::Credentials::AccessKey(creds) => Credentials::AccessKey(
137+
AwsAccessKeyCredential::new(creds.aws_access_key_id, creds.aws_secret_access_key),
138+
),
139+
models::Credentials::Role(creds) => {
140+
Credentials::Role(AwsRoleCredential::new(creds.role_arn, creds.external_id))
141+
}
142+
}
143+
}
144+
}
145+
146+
impl From<Credentials> for models::Credentials {
147+
fn from(credentials: Credentials) -> Self {
148+
match credentials {
149+
Credentials::AccessKey(creds) => models::Credentials::AccessKey(
150+
models::AwsAccessKeyCredential {
151+
aws_access_key_id: creds.aws_access_key_id,
152+
aws_secret_access_key: creds.aws_secret_access_key,
153+
},
154+
),
155+
Credentials::Role(creds) => {
156+
models::Credentials::Role(models::AwsRoleCredential {
157+
role_arn: creds.role_arn,
158+
external_id: creds.external_id,
159+
})
160+
}
161+
}
62162
}
63163
}

0 commit comments

Comments
 (0)