Skip to content

Commit 309a3e4

Browse files
authored
Add support for replicating closed indices (#39499)
Before this change, closed indexes were simply not replicated. It was therefore possible to close an index and then decommission a data node without knowing that this data node contained shards of the closed index, potentially leading to data loss. Shards of closed indices were not completely taken into account when balancing the shards within the cluster, or automatically replicated through shard copies, and they were not easily movable from node A to node B using APIs like Cluster Reroute without being fully reopened and closed again. This commit changes the logic executed when closing an index, so that its shards are not just removed and forgotten but are instead reinitialized and reallocated on data nodes using an engine implementation which does not allow searching or indexing, which has a low memory overhead (compared with searchable/indexable opened shards) and which allows shards to be recovered from peer or promoted as primaries when needed. This new closing logic is built on top of the new Close Index API introduced in 6.7.0 (#37359). Some pre-closing sanity checks are executed on the shards before closing them, and closing an index on a 8.0 cluster will reinitialize the index shards and therefore impact the cluster health. Some APIs have been adapted to make them work with closed indices: - Cluster Health API - Cluster Reroute API - Cluster Allocation Explain API - Recovery API - Cat Indices - Cat Shards - Cat Health - Cat Recovery This commit contains all the following changes (most recent first): * c6c42a1 Adapt NoOpEngineTests after #39006 * 3f9993d Wait for shards to be active after closing indices (#38854) * 5e7a428 Adapt the Cluster Health API to closed indices (#39364) * 3e61939 Adapt CloseFollowerIndexIT for replicated closed indices (#38767) * 71f5c34 Recover closed indices after a full cluster restart (#39249) * 4db7fd9 Adapt the Recovery API for closed indices (#38421) * 4fd1bb2 Adapt more tests suites to closed indices (#39186) * 0519016 Add replica to primary promotion test for closed indices (#39110) * b756f6c Test the Cluster Shard Allocation Explain API with closed indices (#38631) * c484c66 Remove index routing table of closed indices in mixed versions clusters (#38955) * 00f1828 Mute CloseFollowerIndexIT.testCloseAndReopenFollowerIndex() * e845b0a Do not schedule Refresh/Translog/GlobalCheckpoint tasks for closed indices (#38329) * cf9a015 Adapt testIndexCanChangeCustomDataPath for replicated closed indices (#38327) * b9becdd Adapt testPendingTasks() for replicated closed indices (#38326) * 02cc730 Allow shards of closed indices to be replicated as regular shards (#38024) * e53a9be Fix compilation error in IndexShardIT after merge with master * cae4155 Relax NoOpEngine constraints (#37413) * 54d110b [RCI] Adapt NoOpEngine to latest FrozenEngine changes * c63fd69 [RCI] Add NoOpEngine for closed indices (#33903) Relates to #33888
1 parent c075107 commit 309a3e4

File tree

75 files changed

+2790
-416
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+2790
-416
lines changed

qa/full-cluster-restart/src/test/java/org/elasticsearch/upgrades/FullClusterRestartIT.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.client.ResponseException;
2727
import org.elasticsearch.client.RestClient;
2828
import org.elasticsearch.cluster.metadata.IndexMetaData;
29+
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
2930
import org.elasticsearch.common.Booleans;
3031
import org.elasticsearch.common.CheckedFunction;
3132
import org.elasticsearch.common.Strings;
@@ -41,6 +42,7 @@
4142
import java.io.IOException;
4243
import java.util.ArrayList;
4344
import java.util.Base64;
45+
import java.util.Collection;
4446
import java.util.HashMap;
4547
import java.util.HashSet;
4648
import java.util.List;
@@ -59,8 +61,11 @@
5961
import static org.hamcrest.Matchers.containsString;
6062
import static org.hamcrest.Matchers.equalTo;
6163
import static org.hamcrest.Matchers.greaterThan;
64+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
6265
import static org.hamcrest.Matchers.hasSize;
66+
import static org.hamcrest.Matchers.is;
6367
import static org.hamcrest.Matchers.notNullValue;
68+
import static org.hamcrest.Matchers.nullValue;
6469

6570
/**
6671
* Tests to run before and after a full cluster restart. This is run twice,
@@ -951,6 +956,97 @@ public void testSoftDeletes() throws Exception {
951956
}
952957
}
953958

959+
/**
960+
* This test creates an index in the old cluster and then closes it. When the cluster is fully restarted in a newer version,
961+
* it verifies that the index exists and is replicated if the old version supports replication.
962+
*/
963+
public void testClosedIndices() throws Exception {
964+
if (isRunningAgainstOldCluster()) {
965+
createIndex(index, Settings.builder()
966+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
967+
.build());
968+
ensureGreen(index);
969+
970+
int numDocs = 0;
971+
if (randomBoolean()) {
972+
numDocs = between(1, 100);
973+
for (int i = 0; i < numDocs; i++) {
974+
final Request request = new Request("POST", "/" + index + "/_doc/" + i);
975+
request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder().startObject().field("field", "v1").endObject()));
976+
assertOK(client().performRequest(request));
977+
if (rarely()) {
978+
refresh();
979+
}
980+
}
981+
refresh();
982+
}
983+
984+
assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
985+
saveInfoDocument(index + "_doc_count", Integer.toString(numDocs));
986+
closeIndex(index);
987+
}
988+
989+
if (getOldClusterVersion().onOrAfter(Version.V_8_0_0)) {
990+
ensureGreenLongWait(index);
991+
assertClosedIndex(index, true);
992+
} else {
993+
assertClosedIndex(index, false);
994+
}
995+
996+
if (isRunningAgainstOldCluster() == false) {
997+
openIndex(index);
998+
ensureGreen(index);
999+
1000+
final int expectedNumDocs = Integer.parseInt(loadInfoDocument(index + "_doc_count"));
1001+
assertTotalHits(expectedNumDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
1002+
}
1003+
}
1004+
1005+
/**
1006+
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
1007+
* that the index has started shards.
1008+
*/
1009+
@SuppressWarnings("unchecked")
1010+
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
1011+
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));
1012+
1013+
final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
1014+
assertThat(metadata, notNullValue());
1015+
assertThat(metadata.get("state"), equalTo("close"));
1016+
1017+
final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
1018+
assertThat(blocks, notNullValue());
1019+
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));
1020+
1021+
final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
1022+
assertThat(settings, notNullValue());
1023+
1024+
final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
1025+
if (checkRoutingTable) {
1026+
assertThat(routingTable, notNullValue());
1027+
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
1028+
final String numberOfShards = (String) XContentMapValues.extractValue("index.number_of_shards", settings);
1029+
assertThat(numberOfShards, notNullValue());
1030+
final int nbShards = Integer.parseInt(numberOfShards);
1031+
assertThat(nbShards, greaterThanOrEqualTo(1));
1032+
1033+
for (int i = 0; i < nbShards; i++) {
1034+
final Collection<Map<String, ?>> shards =
1035+
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
1036+
assertThat(shards, notNullValue());
1037+
assertThat(shards.size(), equalTo(2));
1038+
for (Map<String, ?> shard : shards) {
1039+
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
1040+
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
1041+
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
1042+
}
1043+
}
1044+
} else {
1045+
assertThat(routingTable, nullValue());
1046+
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
1047+
}
1048+
}
1049+
9541050
private void checkSnapshot(final String snapshotName, final int count, final Version tookOnVersion) throws IOException {
9551051
// Check the snapshot metadata, especially the version
9561052
Request listSnapshotRequest = new Request("GET", "/_snapshot/repo/" + snapshotName);

qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@
2424
import org.elasticsearch.client.Response;
2525
import org.elasticsearch.client.ResponseException;
2626
import org.elasticsearch.cluster.metadata.IndexMetaData;
27+
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
28+
import org.elasticsearch.common.Booleans;
2729
import org.elasticsearch.common.settings.Settings;
2830
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
31+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
2932
import org.elasticsearch.index.IndexSettings;
3033
import org.elasticsearch.rest.action.document.RestIndexAction;
3134
import org.elasticsearch.test.rest.yaml.ObjectPath;
3235

3336
import java.io.IOException;
3437
import java.util.ArrayList;
38+
import java.util.Collection;
3539
import java.util.List;
40+
import java.util.Locale;
3641
import java.util.Map;
3742
import java.util.concurrent.Future;
3843
import java.util.function.Predicate;
@@ -43,7 +48,9 @@
4348
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
4449
import static org.hamcrest.Matchers.equalTo;
4550
import static org.hamcrest.Matchers.hasSize;
51+
import static org.hamcrest.Matchers.is;
4652
import static org.hamcrest.Matchers.notNullValue;
53+
import static org.hamcrest.Matchers.nullValue;
4754

4855
/**
4956
* In depth testing of the recovery mechanism during a rolling restart.
@@ -310,4 +317,144 @@ public void testRecoveryWithSoftDeletes() throws Exception {
310317
}
311318
ensureGreen(index);
312319
}
320+
321+
/**
322+
* This test creates an index in the non upgraded cluster and closes it. It then checks that the index
323+
* is effectively closed and potentially replicated (if the version the index was created on supports
324+
* the replication of closed indices) during the rolling upgrade.
325+
*/
326+
public void testRecoveryClosedIndex() throws Exception {
327+
final String indexName = "closed_index_created_on_old";
328+
if (CLUSTER_TYPE == ClusterType.OLD) {
329+
createIndex(indexName, Settings.builder()
330+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
331+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)
332+
// if the node with the replica is the first to be restarted, while a replica is still recovering
333+
// then delayed allocation will kick in. When the node comes back, the master will search for a copy
334+
// but the recovering copy will be seen as invalid and the cluster health won't return to GREEN
335+
// before timing out
336+
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms")
337+
.put(SETTING_ALLOCATION_MAX_RETRY.getKey(), "0") // fail faster
338+
.build());
339+
ensureGreen(indexName);
340+
closeIndex(indexName);
341+
}
342+
343+
final Version indexVersionCreated = indexVersionCreated(indexName);
344+
if (indexVersionCreated.onOrAfter(Version.V_8_0_0)) {
345+
// index was created on a version that supports the replication of closed indices,
346+
// so we expect the index to be closed and replicated
347+
ensureGreen(indexName);
348+
assertClosedIndex(indexName, true);
349+
} else {
350+
assertClosedIndex(indexName, false);
351+
}
352+
}
353+
354+
/**
355+
* This test creates and closes a new index at every stage of the rolling upgrade. It then checks that the index
356+
* is effectively closed and potentially replicated if the cluster supports replication of closed indices at the
357+
* time the index was closed.
358+
*/
359+
public void testCloseIndexDuringRollingUpgrade() throws Exception {
360+
final Version minimumNodeVersion = minimumNodeVersion();
361+
final String indexName =
362+
String.join("_", "index", CLUSTER_TYPE.toString(), Integer.toString(minimumNodeVersion.id)).toLowerCase(Locale.ROOT);
363+
364+
if (indexExists(indexName) == false) {
365+
createIndex(indexName, Settings.builder()
366+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
367+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
368+
.build());
369+
ensureGreen(indexName);
370+
closeIndex(indexName);
371+
}
372+
373+
if (minimumNodeVersion.onOrAfter(Version.V_8_0_0)) {
374+
// index is created on a version that supports the replication of closed indices,
375+
// so we expect the index to be closed and replicated
376+
ensureGreen(indexName);
377+
assertClosedIndex(indexName, true);
378+
} else {
379+
assertClosedIndex(indexName, false);
380+
}
381+
}
382+
383+
/**
384+
* Returns the version in which the given index has been created
385+
*/
386+
private static Version indexVersionCreated(final String indexName) throws IOException {
387+
final Request request = new Request("GET", "/" + indexName + "/_settings");
388+
final String versionCreatedSetting = indexName + ".settings.index.version.created";
389+
request.addParameter("filter_path", versionCreatedSetting);
390+
391+
final Response response = client().performRequest(request);
392+
return Version.fromId(Integer.parseInt(ObjectPath.createFromResponse(response).evaluate(versionCreatedSetting)));
393+
}
394+
395+
/**
396+
* Returns the minimum node version among all nodes of the cluster
397+
*/
398+
private static Version minimumNodeVersion() throws IOException {
399+
final Request request = new Request("GET", "_nodes");
400+
request.addParameter("filter_path", "nodes.*.version");
401+
402+
final Response response = client().performRequest(request);
403+
final Map<String, Object> nodes = ObjectPath.createFromResponse(response).evaluate("nodes");
404+
405+
Version minVersion = null;
406+
for (Map.Entry<String, Object> node : nodes.entrySet()) {
407+
@SuppressWarnings("unchecked")
408+
Version nodeVersion = Version.fromString((String) ((Map<String, Object>) node.getValue()).get("version"));
409+
if (minVersion == null || minVersion.after(nodeVersion)) {
410+
minVersion = nodeVersion;
411+
}
412+
}
413+
assertNotNull(minVersion);
414+
return minVersion;
415+
}
416+
417+
/**
418+
* Asserts that an index is closed in the cluster state. If `checkRoutingTable` is true, it also asserts
419+
* that the index has started shards.
420+
*/
421+
@SuppressWarnings("unchecked")
422+
private void assertClosedIndex(final String index, final boolean checkRoutingTable) throws IOException {
423+
final Map<String, ?> state = entityAsMap(client().performRequest(new Request("GET", "/_cluster/state")));
424+
425+
final Map<String, ?> metadata = (Map<String, Object>) XContentMapValues.extractValue("metadata.indices." + index, state);
426+
assertThat(metadata, notNullValue());
427+
assertThat(metadata.get("state"), equalTo("close"));
428+
429+
final Map<String, ?> blocks = (Map<String, Object>) XContentMapValues.extractValue("blocks.indices." + index, state);
430+
assertThat(blocks, notNullValue());
431+
assertThat(blocks.containsKey(String.valueOf(MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)), is(true));
432+
433+
final Map<String, ?> settings = (Map<String, Object>) XContentMapValues.extractValue("settings", metadata);
434+
assertThat(settings, notNullValue());
435+
436+
final int numberOfShards = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_shards", settings));
437+
final int numberOfReplicas = Integer.parseInt((String) XContentMapValues.extractValue("index.number_of_replicas", settings));
438+
439+
final Map<String, ?> routingTable = (Map<String, Object>) XContentMapValues.extractValue("routing_table.indices." + index, state);
440+
if (checkRoutingTable) {
441+
assertThat(routingTable, notNullValue());
442+
assertThat(Booleans.parseBoolean((String) XContentMapValues.extractValue("index.verified_before_close", settings)), is(true));
443+
444+
for (int i = 0; i < numberOfShards; i++) {
445+
final Collection<Map<String, ?>> shards =
446+
(Collection<Map<String, ?>>) XContentMapValues.extractValue("shards." + i, routingTable);
447+
assertThat(shards, notNullValue());
448+
assertThat(shards.size(), equalTo(numberOfReplicas + 1));
449+
for (Map<String, ?> shard : shards) {
450+
assertThat(XContentMapValues.extractValue("shard", shard), equalTo(i));
451+
assertThat(XContentMapValues.extractValue("state", shard), equalTo("STARTED"));
452+
assertThat(XContentMapValues.extractValue("index", shard), equalTo(index));
453+
}
454+
}
455+
} else {
456+
assertThat(routingTable, nullValue());
457+
assertThat(XContentMapValues.extractValue("index.verified_before_close", settings), nullValue());
458+
}
459+
}
313460
}

rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
}
1313
},
1414
"params": {
15+
"expand_wildcards": {
16+
"type" : "enum",
17+
"options" : ["open","closed","none","all"],
18+
"default" : "all",
19+
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
20+
},
1521
"level": {
1622
"type" : "enum",
1723
"options" : ["cluster","indices","shards"],

rest-api-spec/src/main/resources/rest-api-spec/api/indices.close.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
"options" : ["open","closed","none","all"],
3535
"default" : "open",
3636
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
37+
},
38+
"wait_for_active_shards": {
39+
"type" : "string",
40+
"description" : "Sets the number of active shards to wait for before the operation returns."
3741
}
3842
}
3943
},

0 commit comments

Comments
 (0)