Skip to content

Commit 0629ad5

Browse files
Add hive metastore catalog support (part 2/2) (#285)
* fmt members
* setup basic test-infra for hms-catalog
* add license
* add hms create_namespace
* add hms get_namespace
* fix: typo
* add hms namespace_exists and drop_namespace
* add hms update_namespace
* move fns into HmsCatalog
* use `expose` in docker-compose
* add hms list_tables
* fix: clippy
* fix: cargo sort
* fix: cargo workspace
* move fns into utils + add constants
* include database name in error msg
* add pilota to cargo workspace
* add minio version
* change visibility to pub(crate); return namespace from conversion fn
* add minio version in rest-catalog docker-compose
* fix: hms test docker infrastructure
* add version to minio/mc
* fix: license header
* fix: core-site
* split utils and errors
* add fn get_default_table_location
* add fn get_metadata_location
* add docs
* add HiveSchemaBuilder
* add schema to HiveSchemaBuilder
* add convert_to_hive_table
* cargo sort
* implement table_ops without TableMetadataBuilder
* refactor: HiveSchema fn from_iceberg
* prepare table creation without metadata
* simplify HiveSchemaBuilder
* refactor: use ok_or_else()
* simplify HiveSchemaBuilder
* fix visibility of consts
* change serde metadata v2
* change default partition_specs and sort_orders
* change test
* add create table with metadata
* use FileIO::from_path
* add test_load_table
* small fixes + docs
* rename
* extract get_metadata_location from hive_table
* add integration tests
* fix: clippy
* remove whitespace
* fix: fixture names
* remove builder-prefix `with`
* capitalize error msg
* remove trait bound `Display`
* add const `OWNER`
* fix: default warehouse location
* add test-case `list_tables`
* add all primitives to test_schema
* exclude `Timestamptz` from hive conversion
* remove Self::T from schema
* remove context
* keep file_io in HmsCatalog
* use json schema repr

---------

Co-authored-by: mlanhenke <[email protected]>
1 parent 757ef4c commit 0629ad5

File tree

7 files changed

+1167
-70
lines changed

7 files changed

+1167
-70
lines changed

crates/catalog/hms/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,17 @@ keywords = ["iceberg", "hive", "catalog"]
3131
[dependencies]
3232
anyhow = { workspace = true }
3333
async-trait = { workspace = true }
34+
chrono = { workspace = true }
3435
hive_metastore = { workspace = true }
3536
iceberg = { workspace = true }
3637
log = { workspace = true }
3738
pilota = { workspace = true }
39+
serde_json = { workspace = true }
40+
tokio = { workspace = true }
3841
typed-builder = { workspace = true }
42+
uuid = { workspace = true }
3943
volo-thrift = { workspace = true }
4044

4145
[dev-dependencies]
4246
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
4347
port_scanner = { workspace = true }
44-
tokio = { workspace = true }

crates/catalog/hms/src/catalog.rs

Lines changed: 216 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,18 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::error::from_io_error;
19+
use crate::error::from_thrift_error;
20+
1821
use super::utils::*;
1922
use async_trait::async_trait;
2023
use hive_metastore::ThriftHiveMetastoreClient;
2124
use hive_metastore::ThriftHiveMetastoreClientBuilder;
2225
use hive_metastore::ThriftHiveMetastoreGetDatabaseException;
26+
use hive_metastore::ThriftHiveMetastoreGetTableException;
27+
use iceberg::io::FileIO;
28+
use iceberg::spec::TableMetadata;
29+
use iceberg::spec::TableMetadataBuilder;
2330
use iceberg::table::Table;
2431
use iceberg::{
2532
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
@@ -28,6 +35,8 @@ use iceberg::{
2835
use std::collections::HashMap;
2936
use std::fmt::{Debug, Formatter};
3037
use std::net::ToSocketAddrs;
38+
use tokio::io::AsyncReadExt;
39+
use tokio::io::AsyncWriteExt;
3140
use typed_builder::TypedBuilder;
3241
use volo_thrift::ResponseError;
3342

@@ -47,6 +56,9 @@ pub enum HmsThriftTransport {
4756
pub struct HmsCatalogConfig {
4857
address: String,
4958
thrift_transport: HmsThriftTransport,
59+
warehouse: String,
60+
#[builder(default)]
61+
props: HashMap<String, String>,
5062
}
5163

5264
struct HmsClient(ThriftHiveMetastoreClient);
@@ -55,6 +67,7 @@ struct HmsClient(ThriftHiveMetastoreClient);
5567
pub struct HmsCatalog {
5668
config: HmsCatalogConfig,
5769
client: HmsClient,
70+
file_io: FileIO,
5871
}
5972

6073
impl Debug for HmsCatalog {
@@ -92,11 +105,20 @@ impl HmsCatalog {
92105
.build(),
93106
};
94107

108+
let file_io = FileIO::from_path(&config.warehouse)?
109+
.with_props(&config.props)
110+
.build()?;
111+
95112
Ok(Self {
96113
config,
97114
client: HmsClient(client),
115+
file_io,
98116
})
99117
}
118+
/// Get the catalog's `FileIO`
119+
pub fn file_io(&self) -> FileIO {
120+
self.file_io.clone()
121+
}
100122
}
101123

102124
#[async_trait]
@@ -173,7 +195,7 @@ impl Catalog for HmsCatalog {
173195
let db = self
174196
.client
175197
.0
176-
.get_database(name.clone().into())
198+
.get_database(name.into())
177199
.await
178200
.map_err(from_thrift_error)?;
179201

@@ -197,7 +219,7 @@ impl Catalog for HmsCatalog {
197219
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
198220
let name = validate_namespace(namespace)?;
199221

200-
let resp = self.client.0.get_database(name.clone().into()).await;
222+
let resp = self.client.0.get_database(name.into()).await;
201223

202224
match resp {
203225
Ok(_) => Ok(true),
@@ -269,13 +291,22 @@ impl Catalog for HmsCatalog {
269291
Ok(())
270292
}
271293

294+
/// Asynchronously lists all tables within a specified namespace.
295+
///
296+
/// # Returns
297+
///
298+
/// A `Result<Vec<TableIdent>>`, which is:
299+
/// - `Ok(vec![...])` containing a vector of `TableIdent` instances, each
300+
/// representing a table within the specified namespace.
301+
/// - `Err(...)` if an error occurs during namespace validation or while
302+
/// querying the database.
272303
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
273304
let name = validate_namespace(namespace)?;
274305

275306
let tables = self
276307
.client
277308
.0
278-
.get_all_tables(name.clone().into())
309+
.get_all_tables(name.into())
279310
.await
280311
.map_err(from_thrift_error)?;
281312

@@ -287,31 +318,201 @@ impl Catalog for HmsCatalog {
287318
Ok(tables)
288319
}
289320

321+
/// Creates a new table within a specified namespace using the provided
322+
/// table creation settings.
323+
///
324+
/// # Returns
325+
/// A `Result` wrapping a `Table` object representing the newly created
326+
/// table.
327+
///
328+
/// # Errors
329+
/// This function may return an error in several cases, including invalid
330+
/// namespace identifiers, failure to determine a default storage location,
331+
/// issues generating or writing table metadata, and errors communicating
332+
/// with the Hive Metastore.
290333
async fn create_table(
291334
&self,
292-
_namespace: &NamespaceIdent,
293-
_creation: TableCreation,
335+
namespace: &NamespaceIdent,
336+
creation: TableCreation,
294337
) -> Result<Table> {
295-
todo!()
338+
let db_name = validate_namespace(namespace)?;
339+
let table_name = creation.name.clone();
340+
341+
let location = match &creation.location {
342+
Some(location) => location.clone(),
343+
None => {
344+
let ns = self.get_namespace(namespace).await?;
345+
get_default_table_location(&ns, &table_name, &self.config.warehouse)
346+
}
347+
};
348+
349+
let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
350+
let metadata_location = create_metadata_location(&location, 0)?;
351+
352+
let mut file = self
353+
.file_io
354+
.new_output(&metadata_location)?
355+
.writer()
356+
.await?;
357+
file.write_all(&serde_json::to_vec(&metadata)?).await?;
358+
file.shutdown().await?;
359+
360+
let hive_table = convert_to_hive_table(
361+
db_name.clone(),
362+
metadata.current_schema(),
363+
table_name.clone(),
364+
location,
365+
metadata_location.clone(),
366+
metadata.properties(),
367+
)?;
368+
369+
self.client
370+
.0
371+
.create_table(hive_table)
372+
.await
373+
.map_err(from_thrift_error)?;
374+
375+
let table = Table::builder()
376+
.file_io(self.file_io())
377+
.metadata_location(metadata_location)
378+
.metadata(metadata)
379+
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
380+
.build();
381+
382+
Ok(table)
296383
}
297384

298-
async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
299-
todo!()
385+
/// Loads a table from the Hive Metastore and constructs a `Table` object
386+
/// based on its metadata.
387+
///
388+
/// # Returns
389+
/// A `Result` wrapping a `Table` object that represents the loaded table.
390+
///
391+
/// # Errors
392+
/// This function may return an error in several scenarios, including:
393+
/// - Failure to validate the namespace.
394+
/// - Failure to retrieve the table from the Hive Metastore.
395+
/// - Absence of metadata location information in the table's properties.
396+
/// - Issues reading or deserializing the table's metadata file.
397+
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
398+
let db_name = validate_namespace(table.namespace())?;
399+
400+
let hive_table = self
401+
.client
402+
.0
403+
.get_table(db_name.clone().into(), table.name.clone().into())
404+
.await
405+
.map_err(from_thrift_error)?;
406+
407+
let metadata_location = get_metadata_location(&hive_table.parameters)?;
408+
409+
let mut reader = self.file_io.new_input(&metadata_location)?.reader().await?;
410+
let mut metadata_str = String::new();
411+
reader.read_to_string(&mut metadata_str).await?;
412+
let metadata = serde_json::from_str::<TableMetadata>(&metadata_str)?;
413+
414+
let table = Table::builder()
415+
.file_io(self.file_io())
416+
.metadata_location(metadata_location)
417+
.metadata(metadata)
418+
.identifier(TableIdent::new(
419+
NamespaceIdent::new(db_name),
420+
table.name.clone(),
421+
))
422+
.build();
423+
424+
Ok(table)
300425
}
301426

302-
async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
303-
todo!()
427+
/// Asynchronously drops a table from the database.
428+
///
429+
/// # Errors
430+
/// Returns an error if:
431+
/// - The namespace provided in `table` cannot be validated
432+
/// or does not exist.
433+
/// - The underlying database client encounters an error while
434+
/// attempting to drop the table. This includes scenarios where
435+
/// the table does not exist.
436+
/// - Any network or communication error occurs with the database backend.
437+
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
438+
let db_name = validate_namespace(table.namespace())?;
439+
440+
self.client
441+
.0
442+
.drop_table(db_name.into(), table.name.clone().into(), false)
443+
.await
444+
.map_err(from_thrift_error)?;
445+
446+
Ok(())
304447
}
305448

306-
async fn table_exists(&self, _table: &TableIdent) -> Result<bool> {
307-
todo!()
449+
/// Asynchronously checks the existence of a specified table
450+
/// in the database.
451+
///
452+
/// # Returns
453+
/// - `Ok(true)` if the table exists in the database.
454+
/// - `Ok(false)` if the table does not exist in the database.
455+
/// - `Err(...)` if an error occurs during the process.
456+
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
457+
let db_name = validate_namespace(table.namespace())?;
458+
let table_name = table.name.clone();
459+
460+
let resp = self
461+
.client
462+
.0
463+
.get_table(db_name.into(), table_name.into())
464+
.await;
465+
466+
match resp {
467+
Ok(_) => Ok(true),
468+
Err(err) => {
469+
if let ResponseError::UserException(ThriftHiveMetastoreGetTableException::O2(_)) =
470+
&err
471+
{
472+
Ok(false)
473+
} else {
474+
Err(from_thrift_error(err))
475+
}
476+
}
477+
}
308478
}
309479

310-
async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> {
311-
todo!()
480+
/// Asynchronously renames a table within the database
481+
/// or moves it between namespaces (databases).
482+
///
483+
/// # Returns
484+
/// - `Ok(())` on successful rename or move of the table.
485+
/// - `Err(...)` if an error occurs during the process.
486+
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
487+
let src_dbname = validate_namespace(src.namespace())?;
488+
let dest_dbname = validate_namespace(dest.namespace())?;
489+
490+
let src_tbl_name = src.name.clone();
491+
let dest_tbl_name = dest.name.clone();
492+
493+
let mut tbl = self
494+
.client
495+
.0
496+
.get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
497+
.await
498+
.map_err(from_thrift_error)?;
499+
500+
tbl.db_name = Some(dest_dbname.into());
501+
tbl.table_name = Some(dest_tbl_name.into());
502+
503+
self.client
504+
.0
505+
.alter_table(src_dbname.into(), src_tbl_name.into(), tbl)
506+
.await
507+
.map_err(from_thrift_error)?;
508+
509+
Ok(())
312510
}
313511

314512
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
315-
todo!()
513+
Err(Error::new(
514+
ErrorKind::FeatureUnsupported,
515+
"Updating a table is not supported yet",
516+
))
316517
}
317518
}

crates/catalog/hms/src/error.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use anyhow::anyhow;
19+
use iceberg::{Error, ErrorKind};
20+
use std::fmt::Debug;
21+
use std::io;
22+
23+
/// Format a thrift error into iceberg error.
24+
pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
25+
where
26+
T: Debug,
27+
{
28+
Error::new(
29+
ErrorKind::Unexpected,
30+
"Operation failed for hitting thrift error".to_string(),
31+
)
32+
.with_source(anyhow!("thrift error: {:?}", error))
33+
}
34+
35+
/// Format an io error into iceberg error.
36+
pub fn from_io_error(error: io::Error) -> Error {
37+
Error::new(
38+
ErrorKind::Unexpected,
39+
"Operation failed for hitting io error".to_string(),
40+
)
41+
.with_source(error)
42+
}

crates/catalog/hms/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,6 @@
2222
mod catalog;
2323
pub use catalog::*;
2424

25+
mod error;
26+
mod schema;
2527
mod utils;

0 commit comments

Comments
 (0)