diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index b6bff78963..21ac81285f 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -23,8 +23,7 @@ use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
- Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
- TableIdent,
+ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
};
use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow};
use sqlx::{Any, AnyPool, Row, Transaction};
@@ -702,12 +701,7 @@ impl Catalog for SqlCatalog {
let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
.build()?
.metadata;
- let tbl_metadata_location = format!(
- "{}/metadata/0-{}.metadata.json",
- location.clone(),
- Uuid::new_v4()
- );
-
+ let tbl_metadata_location = metadata_path(&location, Uuid::new_v4());
let file = self.fileio.new_output(&tbl_metadata_location)?;
file.write(serde_json::to_vec(&tbl_metadata)?.into())
.await?;
@@ -769,23 +763,94 @@ impl Catalog for SqlCatalog {
Ok(())
}
- async fn update_table(&self, _commit: TableCommit) -> Result
{
- Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Updating a table is not supported yet",
- ))
+ async fn update_table(&self, mut commit: TableCommit) -> Result {
+ let identifier = commit.identifier().clone();
+ if !self.table_exists(&identifier).await? {
+ return no_such_table_err(&identifier);
+ }
+
+ let requirements = commit.take_requirements();
+ let table_updates = commit.take_updates();
+
+ let table = self.load_table(&identifier).await?;
+ let mut update_table_metadata_builder =
+ TableMetadataBuilder::new_from_metadata(table.metadata().clone(), None);
+
+ for table_update in table_updates {
+ update_table_metadata_builder = table_update.apply(update_table_metadata_builder)?;
+ }
+
+ for table_requirement in requirements {
+ table_requirement.check(Some(table.metadata()))?;
+ }
+
+ let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4());
+ let file = self.fileio.new_output(&new_table_meta_location)?;
+ let update_table_metadata = update_table_metadata_builder.build()?;
+ file.write(serde_json::to_vec(&update_table_metadata.metadata)?.into())
+ .await?;
+
+ let update = format!(
+ "UPDATE {CATALOG_TABLE_NAME}
+ SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?
+ WHERE {CATALOG_FIELD_CATALOG_NAME} = ?
+ AND {CATALOG_FIELD_TABLE_NAMESPACE} = ?
+ AND {CATALOG_FIELD_TABLE_NAME} = ?
+ AND (
+ {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}'
+ OR {CATALOG_FIELD_RECORD_TYPE} IS NULL
+ )"
+ );
+
+ let namespace_name = identifier.namespace().join(".");
+ let args: Vec