Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2137,8 +2137,8 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
StreamSupport
.stream(globalCheckpoints.values().spliterator(), false)
.anyMatch(v -> v.value < globalCheckpoint);
// only sync if there is a shard lagging the primary
if (syncNeeded) {
// only sync if index is not closed and there is a shard lagging the primary
if (syncNeeded && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
logger.trace("syncing global checkpoint for [{}]", reason);
globalCheckpointSyncer.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,33 @@ public void testGlobalCheckpointSync() throws IOException {
closeShards(replicaShard, primaryShard);
}

public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception {
ShardId shardId = new ShardId("index", "_na_", 0);
IndexMetaData.Builder indexMetadata = IndexMetaData.builder("index")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2))
.state(IndexMetaData.State.CLOSE).primaryTerm(0, 1);
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(8), true,
ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
AtomicBoolean synced = new AtomicBoolean();
IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(),
() -> synced.set(true), RetentionLeaseSyncer.EMPTY);
recoverShardFromStore(primaryShard);
IndexShard replicaShard = newShard(shardId, false);
recoverReplica(replicaShard, primaryShard, true);
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
indexDoc(primaryShard, "_doc", Integer.toString(i));
}
assertThat(primaryShard.getLocalCheckpoint(), equalTo(numDocs - 1L));
primaryShard.updateLocalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), primaryShard.getLocalCheckpoint());
long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, primaryShard.getLocalCheckpoint());
primaryShard.updateGlobalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), globalCheckpointOnReplica);
primaryShard.maybeSyncGlobalCheckpoint("test");
assertFalse("closed indices should skip global checkpoint sync", synced.get());
closeShards(primaryShard, replicaShard);
}

public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
Expand Down Expand Up @@ -421,6 +423,31 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}
}

public void testResyncPropagatePrimaryTerm() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
final String indexName = "closed_indices_promotion";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.build());
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
ensureGreen(indexName);
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
ensureGreen(indexName);
String nodeWithPrimary = clusterService().state().nodes().get(clusterService().state()
.routingTable().index(indexName).shard(0).primaryShard().currentNodeId()).getName();
internalCluster().restartNode(nodeWithPrimary, new InternalTestCluster.RestartCallback());
ensureGreen(indexName);
long primaryTerm = clusterService().state().metaData().index(indexName).primaryTerm(0);
for (String nodeName : internalCluster().nodesInclude(indexName)) {
IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeName)
.indexService(resolveIndex(indexName)).getShard(0);
assertThat(shard.routingEntry().toString(), shard.getOperationPrimaryTerm(), equalTo(primaryTerm));
}
}

static void assertIndexIsClosed(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
Expand Down