Skip to content

Commit c328417

Browse files
committed
add isRelocated flag to simplify code reasoning
1 parent ceb330b commit c328417

File tree

9 files changed

+41
-27
lines changed

9 files changed

+41
-27
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,7 @@ class PrimaryShardReference extends ShardReference
983983
}
984984

985985
public boolean isRelocated() {
986-
return indexShard.isPrimaryMode() == false;
986+
return indexShard.isRelocatedPrimary();
987987
}
988988

989989
@Override

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
8585
* computation from that point on.
8686
*/
8787
volatile boolean primaryMode;
88+
8889
/**
8990
* Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff}
9091
* and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the
@@ -102,6 +103,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
102103
*/
103104
boolean handoffInProgress;
104105

106+
/**
107+
* Boolean flag that indicates whether a relocation handoff completed (see {@link #completeRelocationHandoff}).
108+
*/
109+
volatile boolean relocated;
110+
105111
/**
106112
* The global checkpoint tracker relies on the property that cluster state updates are applied in-order. After transferring a primary
107113
* context from the primary relocation source to the target and initializing the target, it is possible for the target to apply a
@@ -260,6 +266,13 @@ public boolean isPrimaryMode() {
260266
return primaryMode;
261267
}
262268

269+
/**
270+
* Returns whether the replication tracker has relocated away to another shard copy.
271+
*/
272+
public boolean isRelocated() {
273+
return relocated;
274+
}
275+
263276
/**
264277
* Class invariant that should hold before and after every invocation of public methods on this class. As Java lacks implication
265278
* as a logical operator, many of the invariants are written under the form (!A || B), they should be read as (A implies B) however.
@@ -287,6 +300,9 @@ private boolean invariant() {
287300
// relocation handoff can only occur in primary mode
288301
assert !handoffInProgress || primaryMode;
289302

303+
// a relocated copy is not in primary mode
304+
assert !relocated || !primaryMode;
305+
290306
// the current shard is marked as in-sync when the global checkpoint tracker operates in primary mode
291307
assert !primaryMode || checkpoints.get(shardAllocationId).inSync;
292308

@@ -766,8 +782,10 @@ public synchronized void completeRelocationHandoff() {
766782
assert invariant();
767783
assert primaryMode;
768784
assert handoffInProgress;
785+
assert relocated == false;
769786
primaryMode = false;
770787
handoffInProgress = false;
788+
relocated = true;
771789
// forget all checkpoint information except for global checkpoint of current shard
772790
checkpoints.entrySet().stream().forEach(e -> {
773791
final CheckpointState cps = e.getValue();

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -425,18 +425,11 @@ public void updateShardState(final ShardRouting newRouting,
425425
"a primary relocation is completed by the master, but primary mode is not active " + currentRouting;
426426

427427
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
428-
} else if (currentRouting.primary() && currentRouting.relocating() &&
429-
operationPrimaryTerm == pendingPrimaryTerm &&
430-
replicationTracker.isPrimaryMode() == false &&
428+
} else if (currentRouting.primary() && currentRouting.relocating() && replicationTracker.isRelocated() &&
431429
(newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) {
432430
// if the shard is not in primary mode anymore (after primary relocation) we have to fail when any changes in shard routing occur (e.g. due to recovery
433431
// failure / cancellation). The reason is that at the moment we cannot safely reactivate primary mode without risking two
434432
// active primaries.
435-
// We check for operationPrimaryTerm to be equal to pendingPrimaryTerm, which ensures that we have a fully baked primary,
436-
i.e. a primary that has transitioned to primary mode. Assume for example an active replica, which then got a
437-
// cluster state where it became promoted to relocating primary. This means that we asynchronously start the transition to
438-
// primary in the background. A follow-up cluster state might then cancel relocation before we have completed the transition
439-
// to primary, which would result in the shard to be failed if we did not check for the operationPrimaryTerm.
440433
throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state());
441434
}
442435
assert newRouting.active() == false || state == IndexShardState.STARTED || state == IndexShardState.CLOSED :
@@ -610,7 +603,7 @@ public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer
610603
consumer.accept(primaryContext);
611604
synchronized (mutex) {
612605
verifyRelocatingState();
613-
replicationTracker.completeRelocationHandoff(); // make changes to primaryMode flag only under mutex
606+
replicationTracker.completeRelocationHandoff(); // make changes to primaryMode and relocated flag only under mutex
614607
}
615608
} catch (final Exception e) {
616609
try {
@@ -2113,10 +2106,11 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
21132106
}
21142107

21152108
/**
2116-
* Returns whether the shard is in primary mode, i.e., in charge of replicating changes (see {@link ReplicationTracker}).
2109+
* Returns whether the shard is a relocated primary, i.e. no longer in charge of replicating changes (see {@link ReplicationTracker}).
21172110
*/
2118-
public boolean isPrimaryMode() {
2119-
return replicationTracker.isPrimaryMode();
2111+
public boolean isRelocatedPrimary() {
2112+
assert shardRouting.primary() : "only call isRelocatedPrimary on primary shard";
2113+
return replicationTracker.isRelocated();
21202114
}
21212115

21222116
class ShardEventListener implements Engine.EventListener {

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public void onFailure(Exception e) {
250250
try (Releasable ignored = FutureUtils.get(permit)) {
251251
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
252252
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
253-
if (primary.isPrimaryMode() == false) {
253+
if (primary.isRelocatedPrimary()) {
254254
throw new IndexShardRelocatedException(primary.shardId());
255255
}
256256
runnable.run();

server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ public void testSeqNoIsSetOnPrimary() throws Exception {
683683
final IndexShard shard = mock(IndexShard.class);
684684
when(shard.getPendingPrimaryTerm()).thenReturn(primaryTerm);
685685
when(shard.routingEntry()).thenReturn(routingEntry);
686-
when(shard.isPrimaryMode()).thenReturn(true);
686+
when(shard.isRelocatedPrimary()).thenReturn(false);
687687
IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
688688
Set<String> inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) :
689689
clusterService.state().metaData().index(index).inSyncAllocationIds(0);
@@ -1217,7 +1217,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
12171217
}
12181218
return routing;
12191219
});
1220-
when(indexShard.isPrimaryMode()).thenAnswer(invocationOnMock -> isRelocated.get() == false);
1220+
when(indexShard.isRelocatedPrimary()).thenAnswer(invocationOnMock -> isRelocated.get());
12211221
doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
12221222
when(indexShard.getPendingPrimaryTerm()).thenAnswer(i ->
12231223
clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));

server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ private IndexShard mockIndexShard(ShardId shardId, ClusterService clusterService
472472
}
473473
return routing;
474474
});
475-
when(indexShard.isPrimaryMode()).thenAnswer(invocationOnMock -> isRelocated.get() == false);
475+
when(indexShard.isRelocatedPrimary()).thenAnswer(invocationOnMock -> isRelocated.get());
476476
doThrow(new AssertionError("failed shard is not supported")).when(indexShard).failShard(anyString(), any(Exception.class));
477477
when(indexShard.getPendingPrimaryTerm()).thenAnswer(i ->
478478
clusterService.state().metaData().getIndexSafe(shardId.getIndex()).primaryTerm(shardId.id()));

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,8 +770,10 @@ public void testPrimaryContextHandoff() throws IOException {
770770
assertThat(newPrimary.routingTable, equalTo(oldPrimary.routingTable));
771771
assertThat(newPrimary.replicationGroup, equalTo(oldPrimary.replicationGroup));
772772

773+
assertFalse(oldPrimary.relocated);
773774
oldPrimary.completeRelocationHandoff();
774775
assertFalse(oldPrimary.primaryMode);
776+
assertTrue(oldPrimary.relocated);
775777
}
776778

777779
public void testIllegalStateExceptionIfUnknownAllocationId() {

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,14 +1421,14 @@ public void testLockingBeforeAndAfterRelocated() throws Exception {
14211421
recoveryThread.start();
14221422
latch.await();
14231423
// recovery can only be finalized after we release the current primaryOperationLock
1424-
assertTrue(shard.isPrimaryMode());
1424+
assertFalse(shard.isRelocatedPrimary());
14251425
}
14261426
// recovery can be now finalized
14271427
recoveryThread.join();
1428-
assertFalse(shard.isPrimaryMode());
1428+
assertTrue(shard.isRelocatedPrimary());
14291429
try (Releasable ignored = acquirePrimaryOperationPermitBlockingly(shard)) {
14301430
// lock can again be acquired
1431-
assertFalse(shard.isPrimaryMode());
1431+
assertTrue(shard.isRelocatedPrimary());
14321432
}
14331433

14341434
closeShards(shard);
@@ -1470,7 +1470,7 @@ public void onResponse(Releasable releasable) {
14701470

14711471
public void testStressRelocated() throws Exception {
14721472
final IndexShard shard = newStartedShard(true);
1473-
assertTrue(shard.isPrimaryMode());
1473+
assertFalse(shard.isRelocatedPrimary());
14741474
IndexShardTestCase.updateRoutingEntry(shard, ShardRoutingHelper.relocate(shard.routingEntry(), "other_node"));
14751475
final int numThreads = randomIntBetween(2, 4);
14761476
Thread[] indexThreads = new Thread[numThreads];
@@ -1506,14 +1506,14 @@ public void run() {
15061506
assertThat(relocated.get(), equalTo(false));
15071507
assertThat(shard.getActiveOperationsCount(), greaterThan(0));
15081508
// ensure we only transition after pending operations completed
1509-
assertTrue(shard.isPrimaryMode());
1509+
assertFalse(shard.isRelocatedPrimary());
15101510
// complete pending operations
15111511
barrier.await();
15121512
// complete recovery/relocation
15131513
recoveryThread.join();
15141514
// ensure relocated successfully once pending operations are done
15151515
assertThat(relocated.get(), equalTo(true));
1516-
assertFalse(shard.isPrimaryMode());
1516+
assertTrue(shard.isRelocatedPrimary());
15171517
assertThat(shard.getActiveOperationsCount(), equalTo(0));
15181518

15191519
for (Thread indexThread : indexThreads) {
@@ -1577,7 +1577,7 @@ protected void doRun() throws Exception {
15771577
cyclicBarrier.await();
15781578
relocationThread.join();
15791579
cancellingThread.join();
1580-
if (shard.isPrimaryMode() == false) {
1580+
if (shard.isRelocatedPrimary()) {
15811581
logger.debug("shard was relocated successfully");
15821582
assertThat(cancellingException.get(), instanceOf(IllegalIndexShardStateException.class));
15831583
assertThat("current routing:" + shard.routingEntry(), shard.routingEntry().relocating(), equalTo(true));
@@ -1858,7 +1858,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc
18581858
ShardRouting inRecoveryRouting = ShardRoutingHelper.relocate(origRouting, "some_node");
18591859
IndexShardTestCase.updateRoutingEntry(shard, inRecoveryRouting);
18601860
shard.relocated(primaryContext -> {});
1861-
assertFalse(shard.isPrimaryMode());
1861+
assertTrue(shard.isRelocatedPrimary());
18621862
try {
18631863
IndexShardTestCase.updateRoutingEntry(shard, origRouting);
18641864
fail("Expected IndexShardRelocatedException");

server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ public void testThrowExceptionOnPrimaryRelocatedBeforePhase1Started() throws IOE
395395
final IndexShard shard = mock(IndexShard.class);
396396
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
397397
when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class));
398-
when(shard.isPrimaryMode()).thenReturn(false);
398+
when(shard.isRelocatedPrimary()).thenReturn(true);
399399
when(shard.acquireSafeIndexCommit()).thenReturn(mock(Engine.IndexCommitRef.class));
400400
doAnswer(invocation -> {
401401
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {});
@@ -444,7 +444,7 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception {
444444
final CancellableThreads cancellableThreads = new CancellableThreads();
445445
final IndexShard shard = mock(IndexShard.class);
446446
final AtomicBoolean freed = new AtomicBoolean(true);
447-
when(shard.isPrimaryMode()).thenReturn(true);
447+
when(shard.isRelocatedPrimary()).thenReturn(false);
448448
doAnswer(invocation -> {
449449
freed.set(false);
450450
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> freed.set(true));

0 commit comments

Comments
 (0)