@@ -60,6 +60,7 @@ use super::utils::{append_query_filters, establish_connection};
60
60
use crate :: checkpoint:: {
61
61
IndexCheckpointDelta , PartitionId , SourceCheckpoint , SourceCheckpointDelta ,
62
62
} ;
63
+ use crate :: file_backed:: MutationOccurred ;
63
64
use crate :: metastore:: postgres:: utils:: split_maturity_timestamp;
64
65
use crate :: metastore:: { PublishSplitsRequestExt , STREAM_SPLITS_CHUNK_SIZE } ;
65
66
use crate :: {
@@ -288,26 +289,31 @@ macro_rules! run_with_tx {
288
289
} } ;
289
290
}
290
291
291
- async fn mutate_index_metadata < E , M : FnOnce ( & mut IndexMetadata ) -> Result < bool , E > > (
292
+ async fn mutate_index_metadata <
293
+ E ,
294
+ M : FnOnce ( IndexMetadata ) -> Result < MutationOccurred < IndexMetadata > , E > ,
295
+ > (
292
296
tx : & mut Transaction < ' _ , Postgres > ,
293
297
index_uid : IndexUid ,
294
298
mutate_fn : M ,
295
- ) -> MetastoreResult < bool >
299
+ ) -> MetastoreResult < IndexMetadata >
296
300
where
297
301
MetastoreError : From < E > ,
298
302
{
299
303
let index_id = & index_uid. index_id ;
300
- let mut index_metadata = index_metadata ( tx, index_id) . await ?;
304
+ let index_metadata = index_metadata ( tx, index_id) . await ?;
301
305
if index_metadata. index_uid != index_uid {
302
306
return Err ( MetastoreError :: NotFound ( EntityKind :: Index {
303
307
index_id : index_id. to_string ( ) ,
304
308
} ) ) ;
305
309
}
306
- let mutation_occurred = mutate_fn ( & mut index_metadata) ?;
307
- if !mutation_occurred {
308
- return Ok ( mutation_occurred) ;
309
- }
310
- let index_metadata_json = serde_json:: to_string ( & index_metadata) . map_err ( |error| {
310
+
311
+ let mutated_index_metadata = match mutate_fn ( index_metadata) ? {
312
+ MutationOccurred :: Yes ( index_metadata) => index_metadata,
313
+ MutationOccurred :: No ( index_metadata) => return Ok ( index_metadata) ,
314
+ } ;
315
+
316
+ let index_metadata_json = serde_json:: to_string ( & mutated_index_metadata) . map_err ( |error| {
311
317
MetastoreError :: JsonSerializeError {
312
318
struct_name : "IndexMetadata" . to_string ( ) ,
313
319
message : error. to_string ( ) ,
@@ -329,7 +335,7 @@ where
329
335
index_id : index_id. to_string ( ) ,
330
336
} ) ) ;
331
337
}
332
- Ok ( mutation_occurred )
338
+ Ok ( mutated_index_metadata )
333
339
}
334
340
335
341
#[ async_trait]
@@ -406,32 +412,25 @@ impl MetastoreService for PostgresqlMetastore {
406
412
let retention_policy_opt = request. deserialize_retention_policy ( ) ?;
407
413
let search_settings = request. deserialize_search_settings ( ) ?;
408
414
let index_uid: IndexUid = request. index_uid ( ) . clone ( ) ;
409
- let mut mutated_metadata_opt = None ;
410
- let mutated_metadata_ref = & mut mutated_metadata_opt;
411
- run_with_tx ! ( self . connection_pool, tx, {
415
+ let updated_metadata = run_with_tx ! ( self . connection_pool, tx, {
412
416
mutate_index_metadata:: <MetastoreError , _>(
413
417
tx,
414
418
index_uid,
415
- |index_metadata: & mut IndexMetadata | {
416
- let mutated = if index_metadata. index_config. search_settings != search_settings
419
+ |mut index_metadata: IndexMetadata | {
420
+ if index_metadata. index_config. search_settings != search_settings
417
421
|| index_metadata. index_config. retention_policy_opt != retention_policy_opt
418
422
{
419
423
index_metadata. index_config. search_settings = search_settings;
420
424
index_metadata. index_config. retention_policy_opt = retention_policy_opt;
421
- true
425
+ Ok ( MutationOccurred :: Yes ( index_metadata ) )
422
426
} else {
423
- false
424
- } ;
425
- * mutated_metadata_ref = Some ( index_metadata. clone( ) ) ;
426
- Ok ( mutated)
427
+ Ok ( MutationOccurred :: No ( index_metadata) )
428
+ }
427
429
} ,
428
430
)
429
- . await ?;
430
- Ok ( ( ) )
431
+ . await
431
432
} ) ?;
432
- let mutated_metadata =
433
- mutated_metadata_opt. expect ( "Mutated IndexMetadata should be set by transaction" ) ;
434
- IndexMetadataResponse :: try_from_index_metadata ( & mutated_metadata)
433
+ IndexMetadataResponse :: try_from_index_metadata ( & updated_metadata)
435
434
}
436
435
437
436
#[ instrument( skip_all, fields( index_id=%request. index_uid( ) ) ) ]
@@ -975,9 +974,9 @@ impl MetastoreService for PostgresqlMetastore {
975
974
mutate_index_metadata:: <MetastoreError , _>(
976
975
tx,
977
976
index_uid,
978
- |index_metadata: & mut IndexMetadata | {
977
+ |mut index_metadata: IndexMetadata | {
979
978
index_metadata. add_source( source_config) ?;
980
- Ok ( true )
979
+ Ok ( MutationOccurred :: Yes ( index_metadata ) )
981
980
} ,
982
981
)
983
982
. await ?;
@@ -993,8 +992,12 @@ impl MetastoreService for PostgresqlMetastore {
993
992
) -> MetastoreResult < EmptyResponse > {
994
993
let index_uid: IndexUid = request. index_uid ( ) . clone ( ) ;
995
994
run_with_tx ! ( self . connection_pool, tx, {
996
- mutate_index_metadata( tx, index_uid, |index_metadata| {
997
- index_metadata. toggle_source( & request. source_id, request. enable)
995
+ mutate_index_metadata( tx, index_uid, |mut index_metadata| {
996
+ if index_metadata. toggle_source( & request. source_id, request. enable) ? {
997
+ Ok :: <_, MetastoreError >( MutationOccurred :: Yes ( index_metadata) )
998
+ } else {
999
+ Ok :: <_, MetastoreError >( MutationOccurred :: No ( index_metadata) )
1000
+ }
998
1001
} )
999
1002
. await ?;
1000
1003
Ok ( ( ) )
@@ -1010,8 +1013,9 @@ impl MetastoreService for PostgresqlMetastore {
1010
1013
let index_uid: IndexUid = request. index_uid ( ) . clone ( ) ;
1011
1014
let source_id = request. source_id . clone ( ) ;
1012
1015
run_with_tx ! ( self . connection_pool, tx, {
1013
- mutate_index_metadata( tx, index_uid. clone( ) , |index_metadata| {
1014
- index_metadata. delete_source( & source_id)
1016
+ mutate_index_metadata( tx, index_uid. clone( ) , |mut index_metadata| {
1017
+ index_metadata. delete_source( & source_id) ?;
1018
+ Ok :: <_, MetastoreError >( MutationOccurred :: Yes ( index_metadata) )
1015
1019
} )
1016
1020
. await ?;
1017
1021
sqlx:: query(
@@ -1038,8 +1042,12 @@ impl MetastoreService for PostgresqlMetastore {
1038
1042
) -> MetastoreResult < EmptyResponse > {
1039
1043
let index_uid: IndexUid = request. index_uid ( ) . clone ( ) ;
1040
1044
run_with_tx ! ( self . connection_pool, tx, {
1041
- mutate_index_metadata( tx, index_uid, |index_metadata| {
1042
- Ok :: <_, MetastoreError >( index_metadata. checkpoint. reset_source( & request. source_id) )
1045
+ mutate_index_metadata( tx, index_uid, |mut index_metadata| {
1046
+ if index_metadata. checkpoint. reset_source( & request. source_id) {
1047
+ Ok :: <_, MetastoreError >( MutationOccurred :: Yes ( index_metadata) )
1048
+ } else {
1049
+ Ok :: <_, MetastoreError >( MutationOccurred :: No ( index_metadata) )
1050
+ }
1043
1051
} )
1044
1052
. await ?;
1045
1053
Ok ( ( ) )
0 commit comments