From 6f81d6de161c6ff2d376d51555c349579ed81c8f Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 19 Dec 2019 12:06:37 +0100 Subject: [PATCH 1/3] Only auto-expand replicas with allocation filtering when all nodes upgraded --- .../elasticsearch/upgrades/RecoveryIT.java | 28 ++++++++ .../cluster/metadata/AutoExpandReplicas.java | 14 ++-- .../metadata/AutoExpandReplicasTests.java | 65 ++++++++++++++++++- 3 files changed, 100 insertions(+), 7 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index cd4a07aab3ec0..1ef4dcaa159d3 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -729,4 +729,32 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception { assertEmptyTranslog(index); } } + + public void testAutoExpandIndicesDuringRollingUpgrade() throws Exception { + final String indexName = "test-auto-expand-filtering"; + final Version minimumNodeVersion = minimumNodeVersion(); + + Response response = client().performRequest(new Request("GET", "_nodes")); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Map nodeMap = objectPath.evaluate("nodes"); + List nodes = new ArrayList<>(nodeMap.keySet()); + + if (CLUSTER_TYPE == ClusterType.OLD) { + createIndex(indexName, Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all") + .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._id", nodes.get(randomInt(2))) + .build()); + } + + ensureGreen(indexName); + + final int numberOfReplicas = Integer.parseInt( + getIndexSettings(indexName).get(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey()).toString()); + if (minimumNodeVersion.onOrAfter(Version.V_7_6_0)) { + assertEquals(nodes.size() - 1, numberOfReplicas); + } else { + assertEquals(nodes.size(), numberOfReplicas); + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java b/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java index 346d755c37916..ffd1c2a8263c6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/AutoExpandReplicas.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.metadata; import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -105,11 +106,16 @@ int getMaxReplicas(int numDataNodes) { private OptionalInt getDesiredNumberOfReplicas(IndexMetaData indexMetaData, RoutingAllocation allocation) { if (enabled) { int numMatchingDataNodes = 0; - for (ObjectCursor cursor : allocation.nodes().getDataNodes().values()) { - Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation); - if (decision.type() != Decision.Type.NO) { - numMatchingDataNodes ++; + // Only start using new logic once all nodes are migrated to 7.6.0, avoiding disruption during an upgrade + if (allocation.nodes().getMinNodeVersion().onOrAfter(Version.V_7_6_0)) { + for (ObjectCursor cursor : allocation.nodes().getDataNodes().values()) { + Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetaData, cursor.value, allocation); + if (decision.type() != Decision.Type.NO) { + numMatchingDataNodes ++; + } } + } else { + numMatchingDataNodes = allocation.nodes().getDataNodes().size(); } final int min = getMinReplicas(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java index f78104201c9fb..de13e5f3e7e31 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterState; @@ -29,9 +30,11 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indices.cluster.ClusterStateChanges; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -46,6 +49,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.isIn; @@ -104,12 +108,15 @@ public void testInvalidValues() { private static final AtomicInteger nodeIdGenerator = new AtomicInteger(); - protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) { + protected DiscoveryNode createNode(Version version, DiscoveryNodeRole... mustHaveRoles) { Set roles = new HashSet<>(randomSubsetOf(DiscoveryNodeRole.BUILT_IN_ROLES)); Collections.addAll(roles, mustHaveRoles); final String id = String.format(Locale.ROOT, "node_%03d", nodeIdGenerator.incrementAndGet()); - return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, - Version.CURRENT); + return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), roles, version); + } + + protected DiscoveryNode createNode(DiscoveryNodeRole... mustHaveRoles) { + return createNode(Version.CURRENT, mustHaveRoles); } /** @@ -200,4 +207,56 @@ public void testAutoExpandWhenNodeLeavesAndPossiblyRejoins() throws InterruptedE terminate(threadPool); } } + + public void testOnlyAutoExpandAllocationFilteringAfterAllNodesUpgraded() { + final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + + try { + List allNodes = new ArrayList<>(); + DiscoveryNode oldNode = createNode(VersionUtils.randomVersionBetween(random(), Version.V_7_0_0, Version.V_7_5_1), + DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master + allNodes.add(oldNode); + ClusterState state = ClusterStateCreationUtils.state(oldNode, oldNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest("index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_AUTO_EXPAND_REPLICAS, "0-all").build()) + .waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metaData().hasIndex("index")); + while (state.routingTable().index("index").shard(0).allShardsStarted() == false) { + logger.info(state); + state = cluster.applyStartedShards(state, + state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING)); + state = cluster.reroute(state, new ClusterRerouteRequest()); + } + + DiscoveryNode newNode = createNode(Version.V_7_6_0, + DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE); // local node is the master + + state = cluster.addNodes(state, Collections.singletonList(newNode)); + + // use allocation filtering + state = cluster.updateSettings(state, new UpdateSettingsRequest("index").settings(Settings.builder() + .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", oldNode.getName()).build())); + + while (state.routingTable().index("index").shard(0).allShardsStarted() == false) { + logger.info(state); + state = cluster.applyStartedShards(state, + state.routingTable().index("index").shard(0).shardsWithState(ShardRoutingState.INITIALIZING)); + state = cluster.reroute(state, new ClusterRerouteRequest()); + } + + // check that presence of old node means that auto-expansion does not take allocation filtering into account + assertThat(state.routingTable().index("index").shard(0).size(), equalTo(2)); + + // remove old node and check that auto-expansion takes allocation filtering into account + state = cluster.removeNodes(state, Collections.singletonList(oldNode)); + assertThat(state.routingTable().index("index").shard(0).size(), equalTo(1)); + } finally { + terminate(threadPool); + } + } } From 63fc0a04bba545cdaebb055762ef93d4a6e911e1 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 19 Dec 2019 14:13:15 +0100 Subject: [PATCH 2/3] fix test --- .../org/elasticsearch/upgrades/RecoveryIT.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java index 1ef4dcaa159d3..ed1988bbd1aa5 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -741,7 +741,8 @@ public void testAutoExpandIndicesDuringRollingUpgrade() throws Exception { if (CLUSTER_TYPE == ClusterType.OLD) { createIndex(indexName, Settings.builder() - .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all") .put(IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._id", nodes.get(randomInt(2))) .build()); @@ -750,11 +751,17 @@ public void testAutoExpandIndicesDuringRollingUpgrade() throws Exception { ensureGreen(indexName); final int numberOfReplicas = Integer.parseInt( - getIndexSettings(indexName).get(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey()).toString()); + getIndexSettingsAsMap(indexName).get(IndexMetaData.SETTING_NUMBER_OF_REPLICAS).toString()); if (minimumNodeVersion.onOrAfter(Version.V_7_6_0)) { - assertEquals(nodes.size() - 1, numberOfReplicas); + assertEquals(nodes.size() - 2, numberOfReplicas); } else { - assertEquals(nodes.size(), numberOfReplicas); + assertEquals(nodes.size() - 1, numberOfReplicas); } } + + @SuppressWarnings("unchecked") + private Map getIndexSettingsAsMap(String index) throws IOException { + Map indexSettings = getIndexSettings(index); + return (Map)((Map) indexSettings.get(index)).get("settings"); + } } From cf03f10ffeb71d2a4a1e7dd5e0bc566b04246f98 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 19 Dec 2019 15:09:34 +0100 Subject: [PATCH 3/3] checkstyle --- .../elasticsearch/cluster/metadata/AutoExpandReplicasTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java index de13e5f3e7e31..32d25b09fa3c0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.indices.cluster.ClusterStateChanges; import org.elasticsearch.test.ESTestCase;