Skip to content

Commit 8241aaa

Browse files
authored
Merge pull request #11 from Embucket/query_endpoint_works
Query endpoint works
2 parents 75b10e2 + 09adeae commit 8241aaa

File tree

12 files changed

+107
-52
lines changed

12 files changed

+107
-52
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async-trait = { version = "0.1" }
1818
serde = { version = "1.0", features = ["derive"] }
1919
slatedb = { version = "*" }
2020
bytes = { version = "1" }
21-
object_store = { version = "0.11" }
21+
object_store = { version = "0.10.1" }
2222
serde_json = "1.0"
2323
serde_yaml = "0.8"
2424
tower = { version = "0.4", features = ["util"] }

crates/catalog/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ futures = { workspace = true }
1414
slatedb = { workspace = true }
1515
bytes = { workspace = true }
1616
tokio = { workspace = true }
17-
object_store = { workspace = true }
1817
serde = { workspace = true }
1918
serde_json = { workspace = true }
2019
uuid = { workspace = true }

crates/catalog/src/service.rs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1-
use object_store::path::Path;
21
use async_trait::async_trait;
3-
use object_store::{CredentialProvider, ObjectStore, PutPayload};
42
use std::collections::HashMap;
53
use std::fmt::Debug;
64
use std::sync::Arc;
7-
use bytes::Bytes;
85
use control_plane::models::Warehouse;
96
use iceberg::{spec::TableMetadataBuilder, TableCreation};
10-
use object_store::local::LocalFileSystem;
11-
use tokio::fs;
127
use uuid::Uuid;
138
use crate::error::{Error, Result}; // TODO: Replace this with this crate error and result
149
use crate::models::{
@@ -222,14 +217,14 @@ impl Catalog for CatalogImpl {
222217
&self,
223218
namespace: &DatabaseIdent,
224219
warehouse: &Warehouse,
225-
creation: TableCreation,
220+
table_creation: TableCreation,
226221
) -> Result<Table> {
227222
// Check if namespace exists
228223
_ = self.get_namespace(namespace).await?;
229224
// Check if table exists
230225
let ident = TableIdent {
231226
database: namespace.clone(),
232-
table: creation.name.clone(),
227+
table: table_creation.name.clone(),
233228
};
234229
let res = self.load_table(&ident).await;
235230
if res.is_ok() {
@@ -240,40 +235,44 @@ impl Catalog for CatalogImpl {
240235
// Take into account namespace location property if present
241236
// Take into account provided location if present
242237
// If none, generate location based on warehouse location
243-
let table_location = format!("{}/{}", warehouse.location, creation.name);
238+
// TODO: un-hardcode "file://" and make the scheme dynamic - filesystem or S3 (at least)
239+
let working_dir_abs_path = std::env::current_dir().unwrap().to_str().unwrap().to_string();
240+
let table_location = format!("file://{}/{}/{}", working_dir_abs_path, warehouse.location, table_creation.name);
244241
let creation = {
245-
let mut creation = creation;
242+
let mut creation = table_creation;
246243
creation.location = Some(table_location.clone());
247244
creation
248245
};
249246
// TODO: Add checks
250-
// - Check if storage profile is valid (writtable)
247+
// - Check if storage profile is valid (writable)
251248

252249
let name = creation.name.to_string();
253250
let result = TableMetadataBuilder::from_table_creation(creation)?.build()?;
254251
let metadata = result.metadata.clone();
255252
let metadata_file_id = Uuid::new_v4().to_string();
256-
let metadata_relative_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json");
257-
// TODO un-hardcode "file://" and make it dynamic - filesystem or s3 (at least)
258-
let metadata_full_location = format!("file://object_store/{metadata_relative_location}");
253+
let metadata_location = format!("{table_location}/metadata/{metadata_file_id}.metadata.json");
259254

260255
let table = Table {
261256
metadata: metadata.clone(),
262-
metadata_location: metadata_full_location,
257+
metadata_location: metadata_location.clone(),
263258
ident: TableIdent {
264259
database: namespace.clone(),
265260
table: name.clone(),
266261
},
267262
};
268263
self.table_repo.put(&table).await?;
269264

270-
let local_dir = "object_store";
271-
fs::create_dir_all(local_dir).await.unwrap();
272-
let store = LocalFileSystem::new_with_prefix(local_dir).expect("Failed to initialize filesystem object store");
273-
let path = Path::from(metadata_relative_location);
274-
let json_data = serde_json::to_string(&table.metadata).unwrap();
275-
let content = Bytes::from(json_data);
276-
store.put(&path, PutPayload::from(content)).await.expect("Failed to write file");
265+
let file_io = iceberg::io::FileIOBuilder::new("file").build()?;
266+
let metadata_file = file_io
267+
.new_output(metadata_location)?;
268+
let mut writer = metadata_file
269+
.writer()
270+
.await?;
271+
let buf = serde_json::to_vec(&table.metadata).unwrap();
272+
writer
273+
.write(buf.into())
274+
.await?;
275+
writer.close().await?;
277276

278277
Ok(table)
279278
}

crates/control_plane/Cargo.toml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ thiserror = { workspace = true }
1212
utils = { path = "../utils" }
1313
futures = { workspace = true }
1414
serde = { workspace = true }
15-
datafusion = { version = "41" }
16-
iceberg-catalog-rest = { version = "0.3" }
17-
iceberg-datafusion = { version = "0.3" }
15+
datafusion = { version = "40" }
16+
iceberg-rust = { version = "0.5.8" }
17+
iceberg-rest-catalog = { version = "0.5.8" }
18+
datafusion_iceberg = { version = "0.5.8" }
1819
arrow = { version = "52" }
20+
arrow-json = { version = "52" }
21+
object_store = { workspace = true }
1922

2023
[dev-dependencies]
2124
slatedb = {workspace = true }
22-
object_store = { workspace = true }

crates/control_plane/src/service.rs

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ use crate::models::{StorageProfile, StorageProfileCreateRequest};
33
use crate::models::{Warehouse, WarehouseCreateRequest};
44
use crate::repository::{StorageProfileRepository, WarehouseRepository};
55
use async_trait::async_trait;
6-
use datafusion::catalog_common::CatalogProvider;
7-
use datafusion::prelude::*;
8-
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
9-
use iceberg_datafusion::IcebergCatalogProvider;
10-
use std::collections::HashMap;
116
use std::sync::Arc;
127
use uuid::Uuid;
8+
use datafusion::prelude::*;
9+
use iceberg_rest_catalog::apis::configuration::Configuration;
10+
use iceberg_rust::catalog::bucket::ObjectStoreBuilder;
11+
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
12+
use iceberg_rest_catalog::catalog::RestCatalog;
13+
use arrow::record_batch::RecordBatch;
14+
use object_store::local::LocalFileSystem;
1315

1416
#[async_trait]
1517
pub trait ControlService: Send + Sync {
@@ -33,7 +35,8 @@ pub trait ControlService: Send + Sync {
3335
// async fn delete_table(&self, id: Uuid) -> Result<()>;
3436
// async fn list_tables(&self) -> Result<Vec<Table>>;
3537

36-
async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)>;
38+
async fn query_table(&self, warehouse_id:&Uuid, database_name:&String, table_name:&String, query:&String) -> Result<
39+
(String)>;
3740
}
3841

3942
pub struct ControlServiceImpl {
@@ -102,25 +105,58 @@ impl ControlService for ControlServiceImpl {
102105
self.warehouse_repo.list().await
103106
}
104107

105-
async fn query_table(&self, warehouse_id: &Uuid, query: &String) -> Result<(&str)> {
106-
let config = RestCatalogConfig::builder()
107-
.uri("http://0.0.0.0:3000/catalog".to_string())
108-
.warehouse(warehouse_id.to_string())
109-
.props(HashMap::default())
110-
.build();
108+
async fn query_table(&self, warehouse_id:&Uuid, database_name:&String, table_name:&String, query:&String) -> Result<
109+
(String)> {
110+
let config = {
111+
let mut config = Configuration::new();
112+
config.base_path = "http://0.0.0.0:3000/catalog".to_string();
113+
config
114+
};
115+
let builder = {
116+
Arc::new(LocalFileSystem::new())
117+
};
118+
let rest_client = RestCatalog::new(
119+
Some(warehouse_id.to_string().as_str()),
120+
config,
121+
ObjectStoreBuilder::Filesystem(builder),
122+
);
123+
let catalog = IcebergCatalog::new(Arc::new(rest_client), None)
124+
.await
125+
.unwrap();
126+
127+
let ctx = SessionContext::new();
128+
ctx.register_catalog("catalog", Arc::new(catalog));
129+
130+
let provider = ctx.catalog("catalog").unwrap();
131+
let schemas = provider.schema_names();
132+
println!("{schemas:?}");
111133

112-
let catalog = RestCatalog::new(config);
134+
let tables = provider.schema(database_name).unwrap().table_names();
135+
println!("{tables:?}");
113136

114-
let catalog = IcebergCatalogProvider::try_new(Arc::new(catalog))
137+
println!("{}", query);
138+
let records = ctx
139+
.sql(query)
140+
.await
141+
.unwrap()
142+
.collect()
115143
.await
116144
.unwrap();
145+
println!("{records:?}");
146+
147+
let df = ctx.sql(query,).await.unwrap();
148+
df.show().await.unwrap();
117149

118-
// Test that catalog loaded successfully
119-
println!("SCHEMAS: {:?}", catalog.schema_names());
150+
let buf = Vec::new();
151+
let mut writer = arrow_json::ArrayWriter::new(buf);
152+
let record_refs: Vec<&RecordBatch> = records.iter().collect();
153+
writer.write_batches(&record_refs).unwrap();
154+
writer.finish().unwrap();
120155

121-
// TODO rest of the query code
156+
// Get the underlying buffer back.
157+
let buf = writer.into_inner();
122158

123-
Ok(("OK"))
159+
Ok((String::from_utf8(buf).unwrap()))
124160
}
125161
}
126162

crates/nexus/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ serde = { workspace = true }
2020
iceberg = { workspace = true }
2121
slatedb = { workspace = true }
2222
object_store = { workspace = true }
23+
object_store_for_slatedb = { package = "object_store", version = "0.11.1" }
2324
utils = { path = "../utils" }
2425
utoipa = { workspace = true }
2526
utoipa-axum = { workspace = true }

crates/nexus/src/http/catalog/handlers.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,15 @@ pub async fn get_config(
182182

183183
Ok(Json(config.into()))
184184
}
185+
186+
// only one endpoint is defined for the catalog implementation to work
187+
// we don't actually have functionality for views yet
188+
pub async fn list_views(
189+
State(state): State<AppState>,
190+
Path((id, namespace_id)): Path<(Uuid, String)>,
191+
) -> Result<Json<schemas::TableListResponse>, AppError> {
192+
193+
Ok(Json(schemas::TableListResponse {
194+
identifiers: vec![],
195+
}))
196+
}

crates/nexus/src/http/catalog/router.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use axum::Router;
44

55
use crate::http::catalog::handlers::{
66
commit_table, create_namespace, create_table, delete_namespace, delete_table, get_config,
7-
get_namespace, get_table, list_namespaces, list_tables,
7+
get_namespace, get_table, list_namespaces, list_tables, list_views
88
};
99

1010
pub fn create_router() -> Router<AppState> {
@@ -15,12 +15,18 @@ pub fn create_router() -> Router<AppState> {
1515
.route("/:table", delete(delete_table))
1616
.route("/:table", post(commit_table));
1717

18+
// only one endpoint is defined for the catalog implementation to work
19+
// we don't actually have functionality for views yet
20+
let view_router: Router<AppState> = Router::new()
21+
.route("/", get(list_views));
22+
1823
let ns_router = Router::new()
1924
.route("/", get(list_namespaces))
2025
.route("/", post(create_namespace))
2126
.route("/:namespace", get(get_namespace))
2227
.route("/:namespace", delete(delete_namespace))
23-
.nest("/:namespace/tables", table_router);
28+
.nest("/:namespace/tables", table_router)
29+
.nest("/:namespace/views", view_router);
2430

2531
// Iceberg clients do not prefix config fetch RPC call
2632
// and do prefix (with whatever prefix returned by config fetch) all other RPC calls

crates/nexus/src/http/ui/handlers/tables.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ pub async fn query_table(
165165
let request: TableQueryRequest = payload.into();
166166
let result = state
167167
.control_svc
168-
.query_table(&warehouse_id, &request.query)
168+
.query_table(&warehouse_id, &database_name, &table_name, &request.query)
169169
.await?;
170170
Ok(Json(TableQueryResponse {
171171
id: Default::default(),

crates/nexus/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use catalog::repository::{DatabaseRepositoryDb, TableRepositoryDb};
22
use catalog::service::CatalogImpl;
33
use control_plane::repository::{StorageProfileRepositoryDb, WarehouseRepositoryDb};
44
use control_plane::service::ControlServiceImpl;
5-
use object_store::{memory::InMemory, path::Path, ObjectStore};
5+
use object_store_for_slatedb::{memory::InMemory, path::Path, ObjectStore};
66
use slatedb::config::DbOptions;
77
use slatedb::db::Db as SlateDb;
88
use std::sync::Arc;

0 commit comments

Comments
 (0)