@@ -1210,6 +1210,37 @@ impl TableMetadataBuilder {
12101210 fn highest_sort_order_id ( & self ) -> Option < i64 > {
12111211 self . metadata . sort_orders . keys ( ) . max ( ) . copied ( )
12121212 }
1213+
1214+ /// Remove schemas by their ids from the table metadata.
1215+ /// Does nothing if a schema id is not present. Active schemas should not be removed.
1216+ pub fn remove_schemas ( mut self , schema_id_to_remove : & [ i32 ] ) -> Result < Self > {
1217+ if schema_id_to_remove. contains ( & self . metadata . current_schema_id ) {
1218+ return Err ( Error :: new (
1219+ ErrorKind :: DataInvalid ,
1220+ "Cannot remove current schema" ,
1221+ ) ) ;
1222+ }
1223+
1224+ if schema_id_to_remove. is_empty ( ) {
1225+ return Ok ( self ) ;
1226+ }
1227+
1228+ let mut removed_schemas = Vec :: with_capacity ( schema_id_to_remove. len ( ) ) ;
1229+ self . metadata . schemas . retain ( |id, _schema| {
1230+ if schema_id_to_remove. contains ( id) {
1231+ removed_schemas. push ( * id) ;
1232+ false
1233+ } else {
1234+ true
1235+ }
1236+ } ) ;
1237+
1238+ self . changes . push ( TableUpdate :: RemoveSchemas {
1239+ schema_ids : removed_schemas,
1240+ } ) ;
1241+
1242+ Ok ( self )
1243+ }
12131244}
12141245
12151246impl From < TableMetadataBuildResult > for TableMetadata {
@@ -2412,4 +2443,54 @@ mod tests {
24122443 table. metadata( ) . current_snapshot_id( ) . unwrap( )
24132444 ) ;
24142445 }
2446+
2447+ #[ test]
2448+ fn test_active_schema_cannot_be_removed ( ) {
2449+ let builder = builder_without_changes ( FormatVersion :: V2 ) ;
2450+ builder. remove_schemas ( & [ 0 ] ) . unwrap_err ( ) ;
2451+ }
2452+
2453+ #[ test]
2454+ fn test_remove_schemas ( ) {
2455+ let file = File :: open ( format ! (
2456+ "{}/testdata/table_metadata/{}" ,
2457+ env!( "CARGO_MANIFEST_DIR" ) ,
2458+ "TableMetadataV2Valid.json"
2459+ ) )
2460+ . unwrap ( ) ;
2461+ let reader = BufReader :: new ( file) ;
2462+ let resp = serde_json:: from_reader :: < _ , TableMetadata > ( reader) . unwrap ( ) ;
2463+
2464+ let table = Table :: builder ( )
2465+ . metadata ( resp)
2466+ . metadata_location ( "s3://bucket/test/location/metadata/v1.json" . to_string ( ) )
2467+ . identifier ( TableIdent :: from_strs ( [ "ns1" , "test1" ] ) . unwrap ( ) )
2468+ . file_io ( FileIOBuilder :: new ( "memory" ) . build ( ) . unwrap ( ) )
2469+ . build ( )
2470+ . unwrap ( ) ;
2471+
2472+ assert_eq ! ( 2 , table. metadata( ) . schemas. len( ) ) ;
2473+
2474+ {
2475+ // can not remove active schema
2476+ let meta_data_builder = table. metadata ( ) . clone ( ) . into_builder ( None ) ;
2477+ meta_data_builder. remove_schemas ( & [ 1 ] ) . unwrap_err ( ) ;
2478+ }
2479+
2480+ let mut meta_data_builder = table. metadata ( ) . clone ( ) . into_builder ( None ) ;
2481+ meta_data_builder = meta_data_builder. remove_schemas ( & [ 0 ] ) . unwrap ( ) ;
2482+ let build_result = meta_data_builder. build ( ) . unwrap ( ) ;
2483+ assert_eq ! ( 1 , build_result. metadata. schemas. len( ) ) ;
2484+ assert_eq ! ( 1 , build_result. metadata. current_schema_id) ;
2485+ assert_eq ! ( 1 , build_result. metadata. current_schema( ) . schema_id( ) ) ;
2486+ assert_eq ! ( 1 , build_result. changes. len( ) ) ;
2487+
2488+ let remove_schema_ids =
2489+ if let TableUpdate :: RemoveSchemas { schema_ids } = & build_result. changes [ 0 ] {
2490+ schema_ids
2491+ } else {
2492+ unreachable ! ( "Expected RemoveSchema change" )
2493+ } ;
2494+ assert_eq ! ( remove_schema_ids, & [ 0 ] ) ;
2495+ }
24152496}
0 commit comments