Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData.State;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -33,9 +35,11 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -54,7 +58,6 @@ public void testSimpleAwareness() throws Exception {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build();


logger.info("--> starting 2 nodes on the same rack");
internalCluster().startNodes(2, Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_1").build());

Expand All @@ -68,21 +71,33 @@ public void testSimpleAwareness() throws Exception {

ensureGreen();

final List<String> indicesToClose = randomSubsetOf(Arrays.asList("test1", "test2"));
indicesToClose.forEach(indexToClose -> assertAcked(client().admin().indices().prepareClose(indexToClose).get()));

logger.info("--> starting 1 node on a different rack");
final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_2").build());

// On slow machines the initial relocation might be delayed
assertThat(awaitBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus().setWaitForNodes("3").setWaitForNoRelocatingShards(true).get();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();
if (clusterHealth.isTimedOut()) {
return false;
}

logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
// check that closed indices are effectively closed
if (indicesToClose.stream().anyMatch(index -> clusterState.metaData().index(index).getState() != State.CLOSE)) {
return false;
}
// verify that we have all the primaries on node3
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
Expand All @@ -99,7 +114,7 @@ public void testSimpleAwareness() throws Exception {
), equalTo(true));
}

public void testAwarenessZones() throws Exception {
public void testAwarenessZones() {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
Expand All @@ -121,12 +136,20 @@ public void testAwarenessZones() throws Exception {
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1)).execute().actionGet();
createIndex("test", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test"));
}

logger.info("--> waiting for shards to be allocated");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true).execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

Expand All @@ -146,7 +169,7 @@ public void testAwarenessZones() throws Exception {
assertThat(counts.get(B_0), anyOf(equalTo(2),equalTo(3)));
}

public void testAwarenessZonesIncrementalNodes() throws Exception {
public void testAwarenessZonesIncrementalNodes() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b")
.put("cluster.routing.allocation.awareness.attributes", "zone")
Expand All @@ -159,11 +182,23 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
);
String A_0 = nodes.get(0);
String B_0 = nodes.get(1);
client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1)).execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus().setWaitForNodes("2").setWaitForNoRelocatingShards(true).execute().actionGet();

createIndex("test", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test"));
}

ClusterHealthResponse health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("2")
.setWaitForNoRelocatingShards(true)
.execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
Expand All @@ -180,12 +215,22 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
logger.info("--> starting another node in zone 'b'");

String B_1 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("3").execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
client().admin().cluster().prepareReroute().get();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("3").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForActiveShards(10)
.setWaitForNoRelocatingShards(true)
.execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand All @@ -204,12 +249,22 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
assertThat(counts.get(B_1), equalTo(2));

String noZoneNode = internalCluster().startNode();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("4").execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("4")
.execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
client().admin().cluster().prepareReroute().get();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("4").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("4")
.setWaitForActiveShards(10)
.setWaitForNoRelocatingShards(true)
.execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand All @@ -231,8 +286,14 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "").build()).get();

health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("4").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("4")
.setWaitForActiveShards(10)
.setWaitForNoRelocatingShards(true)
.execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
Expand All @@ -34,6 +33,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
Expand All @@ -48,6 +48,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -102,6 +103,10 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception {
.setSettings(Settings.builder().put("index.number_of_shards", 1))
.execute().actionGet();

if (randomBoolean()) {
client().admin().indices().prepareClose("test").get();
}

ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.getRoutingNodes().unassigned().size(), equalTo(2));

Expand All @@ -128,8 +133,11 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception {
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(),
equalTo(ShardRoutingState.INITIALIZING));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForYellowStatus().execute().actionGet();
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForYellowStatus()
.execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("--> get the state, verify shard 1 primary allocated");
Expand All @@ -149,9 +157,12 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception {
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_2).getId()).iterator().next().state(),
equalTo(ShardRoutingState.INITIALIZING));


healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true).execute().actionGet();
healthResponse = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true)
.execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
Expand Down Expand Up @@ -193,11 +204,15 @@ public void testDelayWithALargeAmountOfShards() throws Exception {

logger.info("--> create indices");
for (int i = 0; i < 25; i++) {
client().admin().indices().prepareCreate("test" + i)
.setSettings(Settings.builder()
.put("index.number_of_shards", 5).put("index.number_of_replicas", 1)
.put("index.unassigned.node_left.delayed_timeout", randomIntBetween(250, 1000) + "ms"))
.execute().actionGet();
final String indexName = "test" + i;
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), randomIntBetween(250, 1000) + "ms")
.build());
if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose(indexName));
}
}

ensureGreen(TimeValue.timeValueMinutes(1));
Expand Down Expand Up @@ -294,10 +309,14 @@ public void testRerouteExplain() {
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("--> create an index with 1 shard");
client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.execute().actionGet();
createIndex("test", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test"));
}
ensureGreen("test");

logger.info("--> disable allocation");
Expand Down Expand Up @@ -403,12 +422,18 @@ public void testMessageLogging() throws Exception{
Loggers.removeAppender(actionLogger, allocateMockLog);
}

public void testClusterRerouteWithBlocks() throws Exception {
public void testClusterRerouteWithBlocks() {
List<String> nodesIds = internalCluster().startNodes(2);

logger.info("--> create an index with 1 shard and 0 replicas");
assertAcked(prepareCreate("test-blocks").setSettings(Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)));
createIndex("test-blocks", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test-blocks"));
}
ensureGreen("test-blocks");

logger.info("--> check that the index has 1 shard");
Expand All @@ -432,11 +457,14 @@ public void testClusterRerouteWithBlocks() throws Exception {
SETTING_READ_ONLY_ALLOW_DELETE)) {
try {
enableIndexBlock("test-blocks", blockSetting);
assertAcked(client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test-blocks", 0,
nodesIds.get(toggle % 2), nodesIds.get(++toggle % 2))));
assertAcked(client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand("test-blocks", 0, nodesIds.get(toggle % 2), nodesIds.get(++toggle % 2))));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true).execute().actionGet();
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth()
.setIndices("test-blocks")
.setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true)
.execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
} finally {
disableIndexBlock("test-blocks", blockSetting);
Expand Down
Loading