Skip to content

Commit 32b54fe

Browse files
committed
feat: add load table
Signed-off-by: callum-ryan <[email protected]>
1 parent 0e2bf7b commit 32b54fe

File tree

1 file changed

+47
-3
lines changed

1 file changed

+47
-3
lines changed

crates/catalog/sql/src/catalog.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::time::Duration;
2020

2121
use async_trait::async_trait;
2222
use iceberg::io::FileIO;
23-
use iceberg::spec::TableMetadataBuilder;
23+
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
2424
use iceberg::table::Table;
2525
use iceberg::{
2626
Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
@@ -41,6 +41,7 @@ static CATALOG_FIELD_TABLE_NAMESPACE: &str = "table_namespace";
4141
static CATALOG_FIELD_METADATA_LOCATION_PROP: &str = "metadata_location";
4242
static CATALOG_FIELD_PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
4343
static CATALOG_FIELD_RECORD_TYPE: &str = "iceberg_type";
44+
static CATALOG_FIELD_TABLE_RECORD_TYPE: &str = "TABLE";
4445

4546
static NAMESPACE_TABLE_NAME: &str = "iceberg_namespace_properties";
4647
static NAMESPACE_FIELD_NAME: &str = "namespace";
@@ -569,8 +570,51 @@ impl Catalog for SqlCatalog {
569570
todo!()
570571
}
571572

572-
async fn load_table(&self, _identifier: &TableIdent) -> Result<Table> {
573-
todo!()
573+
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
574+
if !self.table_exists(identifier).await? {
575+
return no_such_table_err(identifier);
576+
}
577+
578+
let rows = self
579+
.fetch_rows(
580+
&format!(
581+
"SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP}
582+
FROM {CATALOG_TABLE_NAME}
583+
WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
584+
AND {CATALOG_FIELD_TABLE_NAME} = ?
585+
AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
586+
AND (
587+
{CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
588+
OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
589+
)"
590+
),
591+
vec![
592+
Some(&self.name),
593+
Some(identifier.name()),
594+
Some(&identifier.namespace().join(".")),
595+
],
596+
)
597+
.await?;
598+
599+
if rows.is_empty() {
600+
return no_such_table_err(identifier);
601+
}
602+
603+
let row = &rows[0];
604+
let tbl_metadata_location = row
605+
.try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP)
606+
.map_err(from_sqlx_error)?;
607+
608+
let file = self.fileio.new_input(&tbl_metadata_location)?;
609+
let metadata_content = file.read().await?;
610+
let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
611+
612+
Ok(Table::builder()
613+
.file_io(self.fileio.clone())
614+
.identifier(identifier.clone())
615+
.metadata_location(tbl_metadata_location)
616+
.metadata(metadata)
617+
.build()?)
574618
}
575619

576620
async fn create_table(

0 commit comments

Comments
 (0)