@@ -466,7 +466,10 @@ public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckp
466466 updateGlobalCheckpoint (
467467 shardAllocationId ,
468468 globalCheckpoint ,
469- current -> logger .trace ("updating global checkpoint from [{}] to [{}] due to [{}]" , current , globalCheckpoint , reason ));
469+ current -> {
470+ logger .trace ("updated global checkpoint from [{}] to [{}] due to [{}]" , current , globalCheckpoint , reason );
471+ onGlobalCheckpointUpdated .accept (globalCheckpoint );
472+ });
470473 assert invariant ();
471474 }
472475
@@ -484,7 +487,7 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI
484487 allocationId ,
485488 globalCheckpoint ,
486489 current -> logger .trace (
487- "updating local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]" ,
490+ "updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]" ,
488491 allocationId ,
489492 current ,
490493 globalCheckpoint ));
@@ -495,9 +498,8 @@ private void updateGlobalCheckpoint(final String allocationId, final long global
495498 final CheckpointState cps = checkpoints .get (allocationId );
496499 assert !this .shardAllocationId .equals (allocationId ) || cps != null ;
497500 if (cps != null && globalCheckpoint > cps .globalCheckpoint ) {
498- ifUpdated .accept (cps .globalCheckpoint );
499501 cps .globalCheckpoint = globalCheckpoint ;
500- onGlobalCheckpointUpdated .accept (globalCheckpoint );
502+ ifUpdated .accept (cps . globalCheckpoint );
501503 }
502504 }
503505
@@ -748,8 +750,8 @@ private synchronized void updateGlobalCheckpointOnPrimary() {
748750 assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint [" + computedGlobalCheckpoint +
749751 "] is lower than previous one [" + globalCheckpoint + "]" ;
750752 if (globalCheckpoint != computedGlobalCheckpoint ) {
751- logger .trace ("global checkpoint updated to [{}]" , computedGlobalCheckpoint );
752753 cps .globalCheckpoint = computedGlobalCheckpoint ;
754+ logger .trace ("updated global checkpoint to [{}]" , computedGlobalCheckpoint );
753755 onGlobalCheckpointUpdated .accept (computedGlobalCheckpoint );
754756 }
755757 }
0 commit comments