|
| 1 | +use object_store::path::Path; |
1 | 2 | use async_trait::async_trait; |
2 | | -use object_store::CredentialProvider; |
| 3 | +use object_store::{CredentialProvider, ObjectStore, PutPayload}; |
3 | 4 | use std::collections::HashMap; |
4 | 5 | use std::fmt::Debug; |
5 | 6 | use std::sync::Arc; |
6 | | - |
| 7 | +use bytes::Bytes; |
7 | 8 | use control_plane::models::Warehouse; |
8 | 9 | use iceberg::{spec::TableMetadataBuilder, TableCreation}; |
9 | | - |
| 10 | +use object_store::local::LocalFileSystem; |
| 11 | +use tokio::fs; |
| 12 | +use uuid::Uuid; |
10 | 13 | use crate::error::{Error, Result}; // TODO: Replace this with this crate error and result |
11 | 14 | use crate::models::{ |
12 | 15 | Config, Database, DatabaseIdent, Table, TableCommit, TableIdent, TableRequirementExt, |
@@ -108,14 +111,17 @@ impl Catalog for CatalogImpl { |
108 | 111 | .map(TableRequirementExt::new) |
109 | 112 | .try_for_each(|req| req.assert(&table.metadata, true))?; |
110 | 113 |
|
| 114 | + // TODO rewrite metadata file? need to research when metadata rewrite is needed |
| 115 | + // Currently the metadata file is only written once - during table creation |
| 116 | + |
111 | 117 | let mut builder = |
112 | | - TableMetadataBuilder::new_from_metadata(table.metadata, Some(table.metadata_location)); |
| 118 | + TableMetadataBuilder::new_from_metadata(table.metadata, Some(table.metadata_location.clone())); |
113 | 119 |
|
114 | 120 | for update in commit.updates { |
115 | 121 | builder = update.apply(builder)?; |
116 | 122 | } |
117 | 123 | let result = builder.build()?; |
118 | | - let metadata_location = result.metadata.location.clone(); |
| 124 | + let metadata_location = table.metadata_location.clone(); |
119 | 125 | let metadata = result.metadata; |
120 | 126 |
|
121 | 127 | let table: Table = Table { |
@@ -234,28 +240,40 @@ impl Catalog for CatalogImpl { |
234 | 240 | // Take into account namespace location property if present |
235 | 241 | // Take into account provided location if present |
236 | 242 | // If none, generate location based on warehouse location |
| 243 | + let table_location = format!("{}/{}", warehouse.location, creation.name); |
237 | 244 | let creation = { |
238 | 245 | let mut creation = creation; |
239 | | - creation.location = Some(format!("{}/{}", warehouse.location, creation.name)); |
| 246 | + creation.location = Some(table_location.clone()); |
240 | 247 | creation |
241 | 248 | }; |
242 | 249 | // TODO: Add checks |
243 | 250 | // - Check if storage profile is valid (writtable) |
244 | 251 |
|
245 | 252 | let name = creation.name.to_string(); |
246 | 253 | let result = TableMetadataBuilder::from_table_creation(creation)?.build()?; |
247 | | - let location = result.metadata.location.clone(); |
| 254 | + let metadata = result.metadata.clone(); |
| 255 | + let metadata_file_id = Uuid::new_v4().to_string(); |
| 256 | + let metadata_relative_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json"); |
| 257 | + // TODO un-hardcode "file://" and make it dynamic - filesystem or s3 (at least) |
| 258 | + let metadata_full_location = format!("file://object_store/{metadata_relative_location}"); |
248 | 259 |
|
249 | 260 | let table = Table { |
250 | | - metadata: result.metadata, |
251 | | - metadata_location: location, |
| 261 | + metadata: metadata.clone(), |
| 262 | + metadata_location: metadata_full_location, |
252 | 263 | ident: TableIdent { |
253 | 264 | database: namespace.clone(), |
254 | 265 | table: name.clone(), |
255 | 266 | }, |
256 | 267 | }; |
257 | 268 | self.table_repo.put(&table).await?; |
258 | | - // TODO: Write metadata contents to metadata_location |
| 269 | + |
| 270 | + let local_dir = "object_store"; |
| 271 | + fs::create_dir_all(local_dir).await.unwrap(); |
| 272 | + let store = LocalFileSystem::new_with_prefix(local_dir).expect("Failed to initialize filesystem object store"); |
| 273 | + let path = Path::from(metadata_relative_location); |
| 274 | + let json_data = serde_json::to_string(&table.metadata).unwrap(); |
| 275 | + let content = Bytes::from(json_data); |
| 276 | + store.put(&path, PutPayload::from(content)).await.expect("Failed to write file"); |
259 | 277 |
|
260 | 278 | Ok(table) |
261 | 279 | } |
|
0 commit comments