Skip to content

Commit 5785941

Browse files
authored
Skip global checkpoint sync for closed indices (#41874)
The verifying-before-close step ensures the global checkpoints on all shard copies are in sync; thus, we don' t need to sync global checkpoints for closed indices. Relate #33888
1 parent c1aef4b commit 5785941

File tree

3 files changed

+56
-2
lines changed

3 files changed

+56
-2
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2136,8 +2136,8 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
21362136
StreamSupport
21372137
.stream(globalCheckpoints.values().spliterator(), false)
21382138
.anyMatch(v -> v.value < globalCheckpoint);
2139-
// only sync if there is a shard lagging the primary
2140-
if (syncNeeded) {
2139+
// only sync if index is not closed and there is a shard lagging the primary
2140+
if (syncNeeded && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
21412141
logger.trace("syncing global checkpoint for [{}]", reason);
21422142
globalCheckpointSyncer.run();
21432143
}

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,6 +1140,33 @@ public void testGlobalCheckpointSync() throws IOException {
11401140
closeShards(replicaShard, primaryShard);
11411141
}
11421142

1143+
public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception {
1144+
ShardId shardId = new ShardId("index", "_na_", 0);
1145+
IndexMetaData.Builder indexMetadata = IndexMetaData.builder("index")
1146+
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
1147+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2))
1148+
.state(IndexMetaData.State.CLOSE).primaryTerm(0, 1);
1149+
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(8), true,
1150+
ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
1151+
AtomicBoolean synced = new AtomicBoolean();
1152+
IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(),
1153+
() -> synced.set(true), RetentionLeaseSyncer.EMPTY);
1154+
recoverShardFromStore(primaryShard);
1155+
IndexShard replicaShard = newShard(shardId, false);
1156+
recoverReplica(replicaShard, primaryShard, true);
1157+
int numDocs = between(1, 10);
1158+
for (int i = 0; i < numDocs; i++) {
1159+
indexDoc(primaryShard, "_doc", Integer.toString(i));
1160+
}
1161+
assertThat(primaryShard.getLocalCheckpoint(), equalTo(numDocs - 1L));
1162+
primaryShard.updateLocalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), primaryShard.getLocalCheckpoint());
1163+
long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, primaryShard.getLocalCheckpoint());
1164+
primaryShard.updateGlobalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), globalCheckpointOnReplica);
1165+
primaryShard.maybeSyncGlobalCheckpoint("test");
1166+
assertFalse("closed indices should skip global checkpoint sync", synced.get());
1167+
closeShards(primaryShard, replicaShard);
1168+
}
1169+
11431170
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
11441171
final IndexShard indexShard = newStartedShard(false);
11451172
final int operations = 1024 - scaledRandomIntBetween(0, 1024);

server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
import org.elasticsearch.common.util.set.Sets;
3737
import org.elasticsearch.index.IndexNotFoundException;
3838
import org.elasticsearch.index.IndexSettings;
39+
import org.elasticsearch.index.shard.IndexShard;
3940
import org.elasticsearch.indices.IndexClosedException;
41+
import org.elasticsearch.indices.IndicesService;
4042
import org.elasticsearch.indices.recovery.RecoveryState;
4143
import org.elasticsearch.test.BackgroundIndexer;
4244
import org.elasticsearch.test.ESIntegTestCase;
@@ -421,6 +423,31 @@ public Settings onNodeStopped(String nodeName) throws Exception {
421423
}
422424
}
423425

426+
public void testResyncPropagatePrimaryTerm() throws Exception {
427+
internalCluster().ensureAtLeastNumDataNodes(3);
428+
final String indexName = "closed_indices_promotion";
429+
createIndex(indexName, Settings.builder()
430+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
431+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
432+
.build());
433+
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
434+
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
435+
ensureGreen(indexName);
436+
assertAcked(client().admin().indices().prepareClose(indexName));
437+
assertIndexIsClosed(indexName);
438+
ensureGreen(indexName);
439+
String nodeWithPrimary = clusterService().state().nodes().get(clusterService().state()
440+
.routingTable().index(indexName).shard(0).primaryShard().currentNodeId()).getName();
441+
internalCluster().restartNode(nodeWithPrimary, new InternalTestCluster.RestartCallback());
442+
ensureGreen(indexName);
443+
long primaryTerm = clusterService().state().metaData().index(indexName).primaryTerm(0);
444+
for (String nodeName : internalCluster().nodesInclude(indexName)) {
445+
IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeName)
446+
.indexService(resolveIndex(indexName)).getShard(0);
447+
assertThat(shard.routingEntry().toString(), shard.getOperationPrimaryTerm(), equalTo(primaryTerm));
448+
}
449+
}
450+
424451
static void assertIndexIsClosed(final String... indices) {
425452
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
426453
for (String index : indices) {

0 commit comments

Comments
 (0)