diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 995630110e..db09212645 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -53,6 +53,30 @@ impl MemoryCatalog { warehouse_location, } } + + fn new_metadata_location(&self, location: &str) -> String { + format!("{}/metadata/{}.metadata.json", location, Uuid::new_v4()) + } + + async fn commit_table( + &self, + table_ident: &TableIdent, + next_metadata: TableMetadata, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let table_metadata_dir = + root_namespace_state.get_existing_table_metadata_dir(table_ident)?; + let metadata_location = self.new_metadata_location(table_metadata_dir); + self.file_io + .new_output(&metadata_location)? + .write(serde_json::to_vec(&next_metadata)?.into()) + .await?; + + root_namespace_state.update_table(table_ident, metadata_location)?; + + Ok(()) + } } #[async_trait] @@ -197,19 +221,14 @@ impl Catalog for MemoryCatalog { let metadata = TableMetadataBuilder::from_table_creation(table_creation)? .build()? .metadata; - let metadata_location = format!( - "{}/metadata/{}-{}.metadata.json", - &location, - 0, - Uuid::new_v4() - ); + let metadata_location = self.new_metadata_location(&location); self.file_io .new_output(&metadata_location)? .write(serde_json::to_vec(&metadata)?.into()) .await?; - root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; + root_namespace_state.insert_new_table(&table_ident, location, metadata_location.clone())?; Table::builder() .file_io(self.file_io.clone()) @@ -263,19 +282,39 @@ impl Catalog for MemoryCatalog { let metadata_location = new_root_namespace_state .get_existing_table_location(src_table_ident)? .clone(); + let metadata_dir = new_root_namespace_state + .get_existing_table_metadata_dir(src_table_ident)? + .clone(); new_root_namespace_state.remove_existing_table(src_table_ident)?; - new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?; + new_root_namespace_state.insert_new_table( + dst_table_ident, + metadata_dir, + metadata_location, + )?; *root_namespace_state = new_root_namespace_state; Ok(()) } /// Update a table to the catalog. - async fn update_table(&self, _commit: TableCommit) -> Result { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "MemoryCatalog does not currently support updating tables.", - )) + async fn update_table(&self, mut commit: TableCommit) -> Result
{ + // Apply the update to get the new metadata. + let table = self.load_table(commit.identifier()).await?; + let requirements = commit.take_requirements(); + let updates = commit.take_updates(); + let metadata = table.metadata().clone(); + for requirement in requirements { + requirement.check(Some(&metadata))?; + } + let mut metadata_builder = metadata.into_builder(None); + for update in updates { + metadata_builder = update.apply(metadata_builder)?; + } + + self.commit_table(commit.identifier(), metadata_builder.build()?.metadata) + .await?; + + self.load_table(commit.identifier()).await } } @@ -287,6 +326,7 @@ mod tests { use iceberg::io::FileIOBuilder; use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::transaction::Transaction; use regex::Regex; use tempfile::TempDir; @@ -1035,7 +1075,7 @@ mod tests { let table_name = "tbl1"; let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); let expected_table_metadata_location_regex = format!( - "^{}/tbl1/metadata/0-{}.metadata.json$", + "^{}/tbl1/metadata/{}.metadata.json$", namespace_location, UUID_REGEX_STR, ); @@ -1088,7 +1128,7 @@ mod tests { let expected_table_ident = TableIdent::new(nested_namespace_ident.clone(), table_name.into()); let expected_table_metadata_location_regex = format!( - "^{}/tbl1/metadata/0-{}.metadata.json$", + "^{}/tbl1/metadata/{}.metadata.json$", nested_namespace_location, UUID_REGEX_STR, ); @@ -1129,7 +1169,7 @@ mod tests { let table_name = "tbl1"; let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); let expected_table_metadata_location_regex = format!( - "^{}/a/tbl1/metadata/0-{}.metadata.json$", + "^{}/a/tbl1/metadata/{}.metadata.json$", warehouse_location, UUID_REGEX_STR ); @@ -1177,7 +1217,7 @@ mod tests { let expected_table_ident = TableIdent::new(nested_namespace_ident.clone(), table_name.into()); let expected_table_metadata_location_regex = format!( - "^{}/a/b/tbl1/metadata/0-{}.metadata.json$", + "^{}/a/b/tbl1/metadata/{}.metadata.json$", warehouse_location, UUID_REGEX_STR ); @@ -1678,4 +1718,71 @@ mod tests { ), ); } + + #[tokio::test] + async fn test_update_table() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + let table = catalog.load_table(&table_ident).await.unwrap(); + assert!(table.metadata().properties().is_empty()); + + let transaction = Transaction::new(&table); + let transaction = transaction + .set_properties(HashMap::from_iter(vec![("k".to_string(), "v".to_string())])) + .unwrap(); + transaction.commit(&catalog).await.unwrap(); + + let table = catalog.load_table(&table_ident).await.unwrap(); + assert_eq!( + table.metadata().properties(), + &HashMap::from_iter(vec![("k".to_string(), "v".to_string())]) + ); + } + + #[tokio::test] + async fn test_update_rename_table() { + let catalog = new_memory_catalog(); + let namespace_ident = NamespaceIdent::new("n1".into()); + create_namespace(&catalog, &namespace_ident).await; + let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); + create_table(&catalog, &table_ident).await; + + let table = catalog.load_table(&table_ident).await.unwrap(); + assert!(table.metadata().properties().is_empty()); + + let transaction = Transaction::new(&table); + let transaction = transaction + .set_properties(HashMap::from_iter(vec![("k".to_string(), "v".to_string())])) + .unwrap(); + transaction.commit(&catalog).await.unwrap(); + + let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into()); + catalog + .rename_table(&table_ident, &dst_table_ident) + .await + .unwrap(); + + let table = catalog.load_table(&dst_table_ident).await.unwrap(); + let transaction = Transaction::new(&table); + let transaction = transaction + .set_properties(HashMap::from_iter(vec![( + "k1".to_string(), + "v2".to_string(), + )])) + .unwrap(); + transaction.commit(&catalog).await.unwrap(); + + let table = catalog.load_table(&dst_table_ident).await.unwrap(); + assert_eq!( + table.metadata().properties(), + &HashMap::from_iter(vec![ + ("k".to_string(), "v".to_string()), + ("k1".to_string(), "v2".to_string()) + ]) + ); + } } diff --git a/crates/catalog/memory/src/namespace_state.rs b/crates/catalog/memory/src/namespace_state.rs index 653487f3df..4f394be5a4 100644 --- a/crates/catalog/memory/src/namespace_state.rs +++ b/crates/catalog/memory/src/namespace_state.rs @@ -29,6 +29,8 @@ pub(crate) struct NamespaceState { namespaces: HashMap, // Mapping of tables to metadata locations in this namespace table_metadata_locations: HashMap, + // Mapping of tables to metadata dir locations in this namespace + table_metadata_dirs: HashMap, } fn no_such_namespace_err(namespace_ident: &NamespaceIdent) -> Result { @@ -175,6 +177,7 @@ impl NamespaceState { properties, namespaces: HashMap::new(), table_metadata_locations: HashMap::new(), + table_metadata_dirs: HashMap::new(), }); Ok(()) @@ -266,6 +269,7 @@ impl NamespaceState { pub(crate) fn insert_new_table( &mut self, table_ident: &TableIdent, + table_metadata_dir: String, metadata_location: String, ) -> Result<()> { let namespace = self.get_mut_namespace(table_ident.namespace())?; @@ -277,9 +281,45 @@ impl NamespaceState { hash_map::Entry::Occupied(_) => table_already_exists_err(table_ident), hash_map::Entry::Vacant(entry) => { let _ = entry.insert(metadata_location); + let dir = namespace + .table_metadata_dirs + .insert(table_ident.name().to_string(), table_metadata_dir); + // New table should not have a metadata dir. + assert_eq!(dir, None); + Ok(()) + } + } + } + + pub(crate) fn update_table( + &mut self, + table_ident: &TableIdent, + metadata_location: String, + ) -> Result<()> { + let namespace = self.get_mut_namespace(table_ident.namespace())?; + match namespace + .table_metadata_locations + .entry(table_ident.name().to_string()) + { + hash_map::Entry::Occupied(mut entry) => { + let _ = entry.insert(metadata_location); Ok(()) } + hash_map::Entry::Vacant(_) => no_such_table_err(table_ident), + } + } + + /// Return the metadata dir of the given table or an error if doesn't exist + pub(crate) fn get_existing_table_metadata_dir( + &self, + table_ident: &TableIdent, + ) -> Result<&String> { + let namespace = self.get_namespace(table_ident.namespace())?; + + match namespace.table_metadata_dirs.get(table_ident.name()) { + None => no_such_table_err(table_ident), + Some(table_metadata_dir) => Ok(table_metadata_dir), } } @@ -292,7 +332,10 @@ impl NamespaceState { .remove(table_ident.name()) { None => no_such_table_err(table_ident), - Some(metadata_location) => Ok(metadata_location), + Some(metadata_location) => { + let _ = namespace.table_metadata_dirs.remove(table_ident.name()); + Ok(metadata_location) + } } } }