3939import java .util .HashMap ;
4040import java .util .HashSet ;
4141import java .util .Map ;
42+ import java .util .Objects ;
4243import java .util .OptionalLong ;
4344import java .util .Set ;
4445import java .util .function .Function ;
@@ -127,6 +128,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
127128 */
128129 final Map <String , CheckpointState > checkpoints ;
129130
131+ /**
132+ * A callback invoked when the global checkpoint is updated. For primary mode this occurs if the computed global checkpoint advances on
133+ * the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint advances
134+ * due to an update from the primary.
135+ */
136+ private final LongConsumer onGlobalCheckpointUpdated ;
137+
130138 /**
131139 * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
132140 * current global checkpoint.
@@ -391,7 +399,8 @@ public ReplicationTracker(
391399 final ShardId shardId ,
392400 final String allocationId ,
393401 final IndexSettings indexSettings ,
394- final long globalCheckpoint ) {
402+ final long globalCheckpoint ,
403+ final LongConsumer onGlobalCheckpointUpdated ) {
395404 super (shardId , indexSettings );
396405 assert globalCheckpoint >= SequenceNumbers .UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint ;
397406 this .shardAllocationId = allocationId ;
@@ -400,6 +409,7 @@ public ReplicationTracker(
400409 this .appliedClusterStateVersion = -1L ;
401410 this .checkpoints = new HashMap <>(1 + indexSettings .getNumberOfReplicas ());
402411 checkpoints .put (allocationId , new CheckpointState (SequenceNumbers .UNASSIGNED_SEQ_NO , globalCheckpoint , false , false ));
412+ this .onGlobalCheckpointUpdated = Objects .requireNonNull (onGlobalCheckpointUpdated );
403413 this .pendingInSync = new HashSet <>();
404414 this .routingTable = null ;
405415 this .replicationGroup = null ;
@@ -456,7 +466,10 @@ public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckp
456466 updateGlobalCheckpoint (
457467 shardAllocationId ,
458468 globalCheckpoint ,
459- current -> logger .trace ("updating global checkpoint from [{}] to [{}] due to [{}]" , current , globalCheckpoint , reason ));
469+ current -> {
470+ logger .trace ("updated global checkpoint from [{}] to [{}] due to [{}]" , current , globalCheckpoint , reason );
471+ onGlobalCheckpointUpdated .accept (globalCheckpoint );
472+ });
460473 assert invariant ();
461474 }
462475
@@ -474,7 +487,7 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI
474487 allocationId ,
475488 globalCheckpoint ,
476489 current -> logger .trace (
477- "updating local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]" ,
490+ "updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]" ,
478491 allocationId ,
479492 current ,
480493 globalCheckpoint ));
@@ -485,8 +498,8 @@ private void updateGlobalCheckpoint(final String allocationId, final long global
485498 final CheckpointState cps = checkpoints .get (allocationId );
486499 assert !this .shardAllocationId .equals (allocationId ) || cps != null ;
487500 if (cps != null && globalCheckpoint > cps .globalCheckpoint ) {
488- ifUpdated .accept (cps .globalCheckpoint );
489501 cps .globalCheckpoint = globalCheckpoint ;
502+ ifUpdated .accept (cps .globalCheckpoint );
490503 }
491504 }
492505
@@ -737,8 +750,9 @@ private synchronized void updateGlobalCheckpointOnPrimary() {
737750 assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint [" + computedGlobalCheckpoint +
738751 "] is lower than previous one [" + globalCheckpoint + "]" ;
739752 if (globalCheckpoint != computedGlobalCheckpoint ) {
740- logger .trace ("global checkpoint updated to [{}]" , computedGlobalCheckpoint );
741753 cps .globalCheckpoint = computedGlobalCheckpoint ;
754+ logger .trace ("updated global checkpoint to [{}]" , computedGlobalCheckpoint );
755+ onGlobalCheckpointUpdated .accept (computedGlobalCheckpoint );
742756 }
743757 }
744758
0 commit comments