Skip to content

Commit c21d385

Browse files
committed
Make primary terms fields private in index shard (#38036)
This commit encapsulates the primary terms fields in index shard. This is a precursor to pushing the operation primary term down to the replication tracker.
1 parent 1787394 commit c21d385

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
200200

201201
protected volatile ShardRouting shardRouting;
202202
protected volatile IndexShardState state;
203-
protected volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
204-
protected volatile long operationPrimaryTerm;
203+
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
204+
private volatile long operationPrimaryTerm;
205205
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
206206
final EngineFactory engineFactory;
207207

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -954,18 +954,18 @@ private void finish() {
954954
// our operation should be blocked until the previous operations complete
955955
assertFalse(onResponse.get());
956956
assertNull(onFailure.get());
957-
assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm));
957+
assertThat(indexShard.getOperationPrimaryTerm(), equalTo(primaryTerm));
958958
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm));
959959
Releasables.close(operation1);
960960
// our operation should still be blocked
961961
assertFalse(onResponse.get());
962962
assertNull(onFailure.get());
963-
assertThat(indexShard.operationPrimaryTerm, equalTo(primaryTerm));
963+
assertThat(indexShard.getOperationPrimaryTerm(), equalTo(primaryTerm));
964964
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(primaryTerm));
965965
Releasables.close(operation2);
966966
barrier.await();
967967
// now lock acquisition should have succeeded
968-
assertThat(indexShard.operationPrimaryTerm, equalTo(newPrimaryTerm));
968+
assertThat(indexShard.getOperationPrimaryTerm(), equalTo(newPrimaryTerm));
969969
assertThat(indexShard.getPendingPrimaryTerm(), equalTo(newPrimaryTerm));
970970
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
971971
if (engineClosed) {
@@ -1006,7 +1006,7 @@ public void onFailure(Exception e) {
10061006
}
10071007
};
10081008

1009-
final long oldPrimaryTerm = indexShard.pendingPrimaryTerm - 1;
1009+
final long oldPrimaryTerm = indexShard.getPendingPrimaryTerm() - 1;
10101010
randomReplicaOperationPermitAcquisition(indexShard, oldPrimaryTerm, indexShard.getGlobalCheckpoint(),
10111011
randomNonNegativeLong(), onLockAcquired, "");
10121012
latch.await();
@@ -1028,7 +1028,7 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception
10281028

10291029
long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
10301030
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
1031-
randomReplicaOperationPermitAcquisition(replica, replica.operationPrimaryTerm, replica.getGlobalCheckpoint(),
1031+
randomReplicaOperationPermitAcquisition(replica, replica.getOperationPrimaryTerm(), replica.getGlobalCheckpoint(),
10321032
newMaxSeqNoOfUpdates, fut, "");
10331033
try (Releasable ignored = fut.actionGet()) {
10341034
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates)));
@@ -1179,7 +1179,7 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt
11791179
final Engine beforeRollbackEngine = indexShard.getEngine();
11801180
final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE);
11811181
randomReplicaOperationPermitAcquisition(indexShard,
1182-
indexShard.pendingPrimaryTerm + 1,
1182+
indexShard.getPendingPrimaryTerm() + 1,
11831183
globalCheckpoint,
11841184
newMaxSeqNoOfUpdates,
11851185
new ActionListener<Releasable>() {
@@ -2103,10 +2103,6 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
21032103
SourceToParse.source(indexName, "_doc", "doc-1", new BytesArray("{}"), XContentType.JSON));
21042104
flushShard(shard);
21052105
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
2106-
// Here we try to increase term (i.e. a new primary is promoted) without rolling back a replica so we can keep stale operations
2107-
// in the index commit; then verify that a recovery from store (started with the safe commit) will remove all stale operations.
2108-
shard.pendingPrimaryTerm++;
2109-
shard.operationPrimaryTerm++;
21102106
shard.getEngine().rollTranslogGeneration();
21112107
shard.markSeqNoAsNoop(1, "test");
21122108
shard.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
@@ -2116,11 +2112,20 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
21162112
closeShard(shard, false);
21172113
// Recovering from store should discard doc #1
21182114
final ShardRouting replicaRouting = shard.routingEntry();
2119-
IndexShard newShard = reinitShard(shard,
2120-
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
2121-
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
2122-
newShard.pendingPrimaryTerm++;
2123-
newShard.operationPrimaryTerm++;
2115+
final IndexMetaData newShardIndexMetadata = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
2116+
.primaryTerm(replicaRouting.shardId().id(), shard.getOperationPrimaryTerm() + 1)
2117+
.build();
2118+
closeShards(shard);
2119+
IndexShard newShard = newShard(
2120+
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
2121+
RecoverySource.ExistingStoreRecoverySource.INSTANCE),
2122+
shard.shardPath(),
2123+
newShardIndexMetadata,
2124+
null,
2125+
null,
2126+
shard.getEngineFactory(),
2127+
shard.getGlobalCheckpointSyncer(),
2128+
EMPTY_EVENT_LISTENER);
21242129
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
21252130
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
21262131
assertTrue(newShard.recoverFromStore());

0 commit comments

Comments
 (0)