From de9b82b3bb705bfcca79efb69d05152d438b2319 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 6 Aug 2018 16:45:12 -0400 Subject: [PATCH 1/6] Expose whether or not the global checkpoint updated It will be useful for future efforts to know if the global checkpoint was updated. To this end, we need to expose whether or not the global checkpoint was updated when the state of the replication tracker updates. For this, we add to the tracker a callback that is invoked whenever the global checkpoint is updated. For primaries this will be invoked when the computed global checkpoint is updated based on state changes to the tracker. For replicas this will be invoked when the local knowledge of the global checkpoint is advanced from the primary. --- .../index/seqno/ReplicationTracker.java | 8 +- .../elasticsearch/index/shard/IndexShard.java | 5 +- .../index/seqno/ReplicationTrackerTests.java | 123 +++++++++++++++--- .../index/engine/EngineTestCase.java | 2 +- 4 files changed, 114 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index e868da5e82ac6..b8e8c0cf5c3b4 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -127,6 +127,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ final Map checkpoints; + final LongConsumer onGlobalCheckpointUpdated; + /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the * current global checkpoint. 
@@ -391,7 +393,8 @@ public ReplicationTracker( final ShardId shardId, final String allocationId, final IndexSettings indexSettings, - final long globalCheckpoint) { + final long globalCheckpoint, + final LongConsumer onGlobalCheckpointUpdated) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -400,6 +403,7 @@ public ReplicationTracker( this.appliedClusterStateVersion = -1L; this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); + this.onGlobalCheckpointUpdated = onGlobalCheckpointUpdated; this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; @@ -487,6 +491,7 @@ private void updateGlobalCheckpoint(final String allocationId, final long global if (cps != null && globalCheckpoint > cps.globalCheckpoint) { ifUpdated.accept(cps.globalCheckpoint); cps.globalCheckpoint = globalCheckpoint; + onGlobalCheckpointUpdated.accept(globalCheckpoint); } } @@ -739,6 +744,7 @@ private synchronized void updateGlobalCheckpointOnPrimary() { if (globalCheckpoint != computedGlobalCheckpoint) { logger.trace("global checkpoint updated to [{}]", computedGlobalCheckpoint); cps.globalCheckpoint = computedGlobalCheckpoint; + onGlobalCheckpointUpdated.accept(computedGlobalCheckpoint); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9131055bcd928..17b39274c0a3d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -297,8 +297,9 @@ public IndexShard( this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new 
TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); - this.replicationTracker = new ReplicationTracker(shardId, shardRouting.allocationId().getId(), indexSettings, - SequenceNumbers.UNASSIGNED_SEQ_NO); + final String aId = shardRouting.allocationId().getId(); + this.replicationTracker = + new ReplicationTracker(shardId, aId, indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint -> {}); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index e001f82809b07..c2945f8c1bc2d 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -47,7 +47,9 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -60,7 +62,7 @@ import static org.hamcrest.Matchers.not; public class ReplicationTrackerTests extends ESTestCase { - + public void testEmptyShards() { final ReplicationTracker tracker = newTracker(AllocationId.newInitializing()); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -99,6 +101,17 @@ private static Set ids(Set allocationIds) { return allocationIds.stream().map(AllocationId::getId).collect(Collectors.toSet()); } + private void updateLocalCheckpoint(final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) { + final long 
globalCheckpoint = tracker.getGlobalCheckpoint(); + updatedGlobalCheckpoint.set(globalCheckpoint); + tracker.updateLocalCheckpoint(allocationId, localCheckpoint); + if (globalCheckpoint == tracker.getGlobalCheckpoint()) { + assertThat(updatedGlobalCheckpoint.get(), equalTo(globalCheckpoint)); + } else { + assertThat(updatedGlobalCheckpoint.get(), equalTo(tracker.getGlobalCheckpoint())); + } + } + public void testGlobalCheckpointUpdate() { final long initialClusterStateVersion = randomNonNegativeLong(); Map allocations = new HashMap<>(); @@ -137,14 +150,14 @@ public void testGlobalCheckpointUpdate() { assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1)); initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1 + initializing.size())); - allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId))); + allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId))); assertThat(tracker.getGlobalCheckpoint(), equalTo(minLocalCheckpoint)); // increment checkpoints active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); - allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId))); + allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId))); final long minLocalCheckpointAfterUpdates = allocations.entrySet().stream().map(Map.Entry::getValue).min(Long::compareTo).orElse(UNASSIGNED_SEQ_NO); @@ -153,7 +166,7 @@ public void testGlobalCheckpointUpdate() { final AllocationId extraId = AllocationId.newInitializing(); // first check that adding it without the master blessing doesn't change anything. 
- tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + updateLocalCheckpoint(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); assertNull(tracker.checkpoints.get(extraId)); expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(extraId.getId())); @@ -165,7 +178,7 @@ public void testGlobalCheckpointUpdate() { // now notify for the new id if (randomBoolean()) { - tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + updateLocalCheckpoint(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), randomInt((int) minLocalCheckpointAfterUpdates)); } else { markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); @@ -175,6 +188,64 @@ public void testGlobalCheckpointUpdate() { assertThat(tracker.getGlobalCheckpoint(), greaterThan(minLocalCheckpoint)); } + public void testUpdateGlobalCheckpointOnReplica() { + final AllocationId active = AllocationId.newInitializing(); + final ReplicationTracker tracker = newTracker(active); + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE - 1); + tracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(globalCheckpoint)); + final long nonUpdate = randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint); + updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); + tracker.updateGlobalCheckpointOnReplica(nonUpdate, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + final long update = randomLongBetween(globalCheckpoint, Long.MAX_VALUE); + tracker.updateGlobalCheckpointOnReplica(update, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(update)); + } + + public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, InterruptedException { + 
final long initialClusterStateVersion = randomNonNegativeLong(); + Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1); + Set active = new HashSet<>(activeWithCheckpoints.keySet()); + Map initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1); + Set initializing = new HashSet<>(initializingWithCheckpoints.keySet()); + final AllocationId primaryId = active.iterator().next(); + final AllocationId replicaId = initializing.iterator().next(); + final ReplicationTracker tracker = newTracker(primaryId); + tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet()); + final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1); + tracker.activatePrimaryMode(localCheckpoint); + tracker.initiateTracking(replicaId.getId()); + final CyclicBarrier barrier = new CyclicBarrier(2); + final Thread thread = new Thread(() -> { + try { + barrier.await(); + tracker.markAllocationIdAsInSync( + replicaId.getId(), + randomLongBetween(NO_OPS_PERFORMED, localCheckpoint - 1)); + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); + } + }); + thread.start(); + barrier.await(); + awaitBusy(tracker::pendingInSync); + final long updatedLocalCheckpoint = randomLongBetween(1 + localCheckpoint, Long.MAX_VALUE); + // there is a shard copy pending in sync, the global checkpoint can not advance + updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); + tracker.updateLocalCheckpoint(primaryId.getId(), updatedLocalCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + // we are implicitly marking the pending in sync copy as in sync with the current global checkpoint, no advancement should occur + tracker.updateLocalCheckpoint(replicaId.getId(), localCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + barrier.await(); + thread.join(); + // now we expect that the global checkpoint 
would advance + tracker.markAllocationIdAsInSync(replicaId.getId(), updatedLocalCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(updatedLocalCheckpoint)); + } + public void testMissingActiveIdsPreventAdvance() { final Map active = randomAllocationsWithLocalCheckpoints(2, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(0, 5); @@ -182,6 +253,7 @@ public void testMissingActiveIdsPreventAdvance() { assigned.putAll(active); assigned.putAll(initializing); AllocationId primaryId = active.keySet().iterator().next(); + updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); final ReplicationTracker tracker = newTracker(primaryId); tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); @@ -191,14 +263,16 @@ public void testMissingActiveIdsPreventAdvance() { .entrySet() .stream() .filter(e -> !e.getKey().equals(missingActiveID)) - .forEach(e -> tracker.updateLocalCheckpoint(e.getKey().getId(), e.getValue())); + .forEach(e -> updateLocalCheckpoint(tracker, e.getKey().getId(), e.getValue())); if (missingActiveID.equals(primaryId) == false) { assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); } // now update all knowledge of all shards - assigned.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + assigned.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + assertThat(updatedGlobalCheckpoint.get(), not(equalTo(UNASSIGNED_SEQ_NO))); } public void testMissingInSyncIdsPreventAdvance() { @@ -207,19 +281,22 @@ public void testMissingInSyncIdsPreventAdvance() { logger.info("active: {}, initializing: {}", active, initializing); AllocationId primaryId = active.keySet().iterator().next(); + 
updatedGlobalCheckpoint.set(NO_OPS_PERFORMED); final ReplicationTracker tracker = newTracker(primaryId); tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); randomSubsetOf(randomIntBetween(1, initializing.size() - 1), initializing.keySet()).forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); - active.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + active.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), equalTo(NO_OPS_PERFORMED)); + assertThat(updatedGlobalCheckpoint.get(), equalTo(NO_OPS_PERFORMED)); // update again - initializing.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + initializing.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + assertThat(updatedGlobalCheckpoint.get(), not(equalTo(UNASSIGNED_SEQ_NO))); } public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { @@ -236,7 +313,7 @@ public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { List> allocations = Arrays.asList(active, initializing, nonApproved); Collections.shuffle(allocations, random()); - allocations.forEach(a -> a.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP))); + allocations.forEach(a -> a.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP))); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); } @@ -271,7 +348,7 @@ public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { initializing.forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); } if (randomBoolean()) { - allocations.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + 
allocations.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); } // now remove shards @@ -281,9 +358,9 @@ public void testInSyncIdsAreRemovedIfNotValidatedByMaster() { ids(activeToStay.keySet()), routingTable(initializingToStay.keySet(), primaryId), emptySet()); - allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid.getId(), ckp + 10L)); + allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L)); } else { - allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid.getId(), ckp + 10L)); + allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L)); tracker.updateFromMaster( initialClusterStateVersion + 2, ids(activeToStay.keySet()), @@ -331,7 +408,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { final List elements = IntStream.rangeClosed(0, globalCheckpoint - 1).boxed().collect(Collectors.toList()); Randomness.shuffle(elements); for (int i = 0; i < elements.size(); i++) { - tracker.updateLocalCheckpoint(trackingAllocationId.getId(), elements.get(i)); + updateLocalCheckpoint(tracker, trackingAllocationId.getId(), elements.get(i)); assertFalse(complete.get()); assertFalse(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync); assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId.getId()))); @@ -339,7 +416,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { if (randomBoolean()) { // normal path, shard catches up - tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64)); + updateLocalCheckpoint(tracker, trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64)); // synchronize with the waiting thread to mark that it is complete barrier.await(); assertTrue(complete.get()); @@ -355,13 +432,16 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { 
assertFalse(tracker.pendingInSync.contains(trackingAllocationId.getId())); thread.join(); } + + private AtomicLong updatedGlobalCheckpoint = new AtomicLong(); private ReplicationTracker newTracker(final AllocationId allocationId) { return new ReplicationTracker( new ShardId("test", "_na_", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), - UNASSIGNED_SEQ_NO); + UNASSIGNED_SEQ_NO, + updatedGlobalCheckpoint::set); } public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException { @@ -486,12 +566,13 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { == SequenceNumbers.UNASSIGNED_SEQ_NO)); // the tracking allocation IDs should play no role in determining the global checkpoint + updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); final Map activeLocalCheckpoints = newActiveAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024))); - activeLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a.getId(), l)); + activeLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l)); final Map initializingLocalCheckpoints = newInitializingAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024))); - initializingLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a.getId(), l)); + initializingLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l)); assertTrue( activeLocalCheckpoints .entrySet() @@ -504,6 +585,7 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { .allMatch(e -> tracker.getTrackedLocalCheckpointForShard(e.getKey().getId()).getLocalCheckpoint() == e.getValue())); final long minimumActiveLocalCheckpoint = (long) activeLocalCheckpoints.values().stream().min(Integer::compareTo).get(); assertThat(tracker.getGlobalCheckpoint(), equalTo(minimumActiveLocalCheckpoint)); + 
assertThat(updatedGlobalCheckpoint.get(), equalTo(minimumActiveLocalCheckpoint)); final long minimumInitailizingLocalCheckpoint = (long) initializingLocalCheckpoints.values().stream().min(Integer::compareTo).get(); // now we are going to add a new allocation ID and bring it in sync which should move it to the in-sync allocation IDs @@ -635,10 +717,11 @@ public void testPrimaryContextHandoff() throws IOException { FakeClusterState clusterState = initialState(); final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); + final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; ReplicationTracker oldPrimary = - new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO); + new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); ReplicationTracker newPrimary = - new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO); + new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f26522245493f..2a84a8f424616 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -460,7 +460,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler, new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? 
- new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) : + new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) : globalCheckpointSupplier, primaryTerm::get); return config; } From 2cccab0db2439ed5a7405d538f1429d55c08ebf0 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 6 Aug 2018 18:01:50 -0400 Subject: [PATCH 2/6] Add Javadoc --- .../org/elasticsearch/index/seqno/ReplicationTracker.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index b8e8c0cf5c3b4..254dfa5833aa6 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -127,7 +127,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ final Map checkpoints; - final LongConsumer onGlobalCheckpointUpdated; + /** + * A callback invoked when the global checkpoint is updated. For primary mode this occurs if the computed global checkpoint advances on + * the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint advances + * due to an update from the primary. 
+ */ + private final LongConsumer onGlobalCheckpointUpdated; /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the From b0c4e1872cb4f6777b0a26302aa479b4bb90575c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 6 Aug 2018 21:57:15 -0400 Subject: [PATCH 3/6] Add null check --- .../java/org/elasticsearch/index/seqno/ReplicationTracker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 254dfa5833aa6..f3701cdfe70e3 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -39,6 +39,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.OptionalLong; import java.util.Set; import java.util.function.Function; @@ -408,7 +409,7 @@ public ReplicationTracker( this.appliedClusterStateVersion = -1L; this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); - this.onGlobalCheckpointUpdated = onGlobalCheckpointUpdated; + this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; From a477ff4f27a91d1a7eec2c6840983ebd8a89e5dc Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 06:43:03 -0400 Subject: [PATCH 4/6] Only update for replica update on replica --- .../index/seqno/ReplicationTracker.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index f3701cdfe70e3..b406621e978da 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -466,7 +466,10 @@ public synchronized void updateGlobalCheckpointOnReplica(final long globalCheckp updateGlobalCheckpoint( shardAllocationId, globalCheckpoint, - current -> logger.trace("updating global checkpoint from [{}] to [{}] due to [{}]", current, globalCheckpoint, reason)); + current -> { + logger.trace("updated global checkpoint from [{}] to [{}] due to [{}]", current, globalCheckpoint, reason); + onGlobalCheckpointUpdated.accept(globalCheckpoint); + }); assert invariant(); } @@ -484,7 +487,7 @@ public synchronized void updateGlobalCheckpointForShard(final String allocationI allocationId, globalCheckpoint, current -> logger.trace( - "updating local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]", + "updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]", allocationId, current, globalCheckpoint)); @@ -495,9 +498,9 @@ private void updateGlobalCheckpoint(final String allocationId, final long global final CheckpointState cps = checkpoints.get(allocationId); assert !this.shardAllocationId.equals(allocationId) || cps != null; if (cps != null && globalCheckpoint > cps.globalCheckpoint) { - ifUpdated.accept(cps.globalCheckpoint); + final long previousGlobalCheckpoint = cps.globalCheckpoint; cps.globalCheckpoint = globalCheckpoint; - onGlobalCheckpointUpdated.accept(globalCheckpoint); + ifUpdated.accept(previousGlobalCheckpoint); } } @@ -748,8 +751,8 @@ private synchronized void updateGlobalCheckpointOnPrimary() { assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint [" + computedGlobalCheckpoint + "] is lower than previous one [" + globalCheckpoint + "]"; if (globalCheckpoint != computedGlobalCheckpoint) { - 
logger.trace("global checkpoint updated to [{}]", 
computedGlobalCheckpoint); cps.globalCheckpoint = computedGlobalCheckpoint; + logger.trace("updated global checkpoint to [{}]", computedGlobalCheckpoint); onGlobalCheckpointUpdated.accept(computedGlobalCheckpoint); } } From 614a350a5891a1d4bd5f4d7675c7827dbf0d3f05 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 09:37:22 -0400 Subject: [PATCH 5/6] Simplify test assertions --- .../index/seqno/ReplicationTrackerTests.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index c2945f8c1bc2d..af5d80ace96d3 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -102,14 +102,8 @@ private static Set ids(Set allocationIds) { } private void updateLocalCheckpoint(final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) { - final long globalCheckpoint = tracker.getGlobalCheckpoint(); - updatedGlobalCheckpoint.set(globalCheckpoint); tracker.updateLocalCheckpoint(allocationId, localCheckpoint); - if (globalCheckpoint == tracker.getGlobalCheckpoint()) { - assertThat(updatedGlobalCheckpoint.get(), equalTo(globalCheckpoint)); - } else { - assertThat(updatedGlobalCheckpoint.get(), equalTo(tracker.getGlobalCheckpoint())); - } + assertThat(updatedGlobalCheckpoint.get(), equalTo(tracker.getGlobalCheckpoint())); } public void testGlobalCheckpointUpdate() { @@ -433,7 +427,7 @@ public void testWaitForAllocationIdToBeInSync() throws Exception { thread.join(); } - private AtomicLong updatedGlobalCheckpoint = new AtomicLong(); + private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO); private ReplicationTracker newTracker(final AllocationId allocationId) { return new ReplicationTracker( From 
c5204c559d2f3e943f6500044a664ea79fdb9308 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 11:43:30 -0400 Subject: [PATCH 6/6] Fix test failure --- .../org/elasticsearch/index/seqno/ReplicationTrackerTests.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index af5d80ace96d3..3948da9c1119c 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -247,7 +247,6 @@ public void testMissingActiveIdsPreventAdvance() { assigned.putAll(active); assigned.putAll(initializing); AllocationId primaryId = active.keySet().iterator().next(); - updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); final ReplicationTracker tracker = newTracker(primaryId); tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); @@ -275,7 +274,6 @@ public void testMissingInSyncIdsPreventAdvance() { logger.info("active: {}, initializing: {}", active, initializing); AllocationId primaryId = active.keySet().iterator().next(); - updatedGlobalCheckpoint.set(NO_OPS_PERFORMED); final ReplicationTracker tracker = newTracker(primaryId); tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); @@ -560,7 +558,6 @@ public void testUpdateAllocationIdsFromMaster() throws Exception { == SequenceNumbers.UNASSIGNED_SEQ_NO)); // the tracking allocation IDs should play no role in determining the global checkpoint - updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); final Map activeLocalCheckpoints = newActiveAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> 
randomIntBetween(1, 1024))); activeLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l));