Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,75 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.Objects;

public class ShutdownShardMigrationStatus implements Writeable, ToXContentObject {

private final SingleNodeShutdownMetadata.Status status;
private final long shardsRemaining;
@Nullable private final String explanation;

public ShutdownShardMigrationStatus() {
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
public ShutdownShardMigrationStatus(SingleNodeShutdownMetadata.Status status, long shardsRemaining) {
this(status, shardsRemaining, null);
}

public ShutdownShardMigrationStatus(SingleNodeShutdownMetadata.Status status, long shardsRemaining, @Nullable String explanation) {
this.status = Objects.requireNonNull(status, "status must not be null");
this.shardsRemaining = shardsRemaining;
this.explanation = explanation;
}

public ShutdownShardMigrationStatus(StreamInput in) throws IOException {
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class);
this.shardsRemaining = in.readLong();
this.explanation = in.readOptionalString();
}

public long getShardsRemaining() {
return shardsRemaining;
}

public String getExplanation() {
return explanation;
}

public SingleNodeShutdownMetadata.Status getStatus() {
return status;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("status", status);
builder.field("shard_migrations_remaining", shardsRemaining);
if (Objects.nonNull(explanation)) {
builder.field("explanation", explanation);
}
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {

}

public SingleNodeShutdownMetadata.Status getStatus() {
return status;
out.writeEnum(status);
out.writeLong(shardsRemaining);
out.writeOptionalString(explanation);
}

@Override
public int hashCode() {
return status.hashCode();
public boolean equals(Object o) {
if (this == o) return true;
if ((o instanceof ShutdownShardMigrationStatus) == false) return false;
ShutdownShardMigrationStatus that = (ShutdownShardMigrationStatus) o;
return shardsRemaining == that.shardsRemaining && status == that.status && Objects.equals(explanation, that.explanation);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ShutdownShardMigrationStatus other = (ShutdownShardMigrationStatus) obj;
return status.equals(other.status);
public int hashCode() {
return Objects.hash(status, shardsRemaining, explanation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,28 @@
package org.elasticsearch.xpack.shutdown;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.test.rest.ESRestTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class NodeShutdownIT extends ESRestTestCase {

Expand Down Expand Up @@ -98,11 +104,13 @@ public void testAllocationPreventedForRemoval() throws Exception {
}

/**
* Checks that a reroute is started immediately after registering a node shutdown, so that shards will actually start moving off of
* the node immediately, rather than waiting for something to trigger it.
* Checks that shards properly move off of a node that's marked for removal, including:
* 1) A reroute needs to be triggered automatically when the node is registered for shutdown, otherwise shards won't start moving
* immediately.
* 2) Ensures the status properly comes to rest at COMPLETE after the shards have moved.
*/
@SuppressWarnings("unchecked")
public void testRerouteStartedOnRemoval() throws Exception {
public void testShardsMoveOffRemovingNode() throws Exception {
String nodeIdToShutdown = getRandomNodeId();

final String indexName = "test-idx";
Expand Down Expand Up @@ -132,15 +140,25 @@ public void testRerouteStartedOnRemoval() throws Exception {
putNodeShutdown(nodeIdToShutdown, "REMOVE");

// assertBusy waiting for the shard to no longer be on that node
AtomicReference<List<Object>> debug = new AtomicReference<>();
assertBusy(() -> {
List<Object> shardsResponse = entityAsList(client().performRequest(checkShardsRequest));
final long shardsOnNodeToShutDown = shardsResponse.stream()
.map(shard -> (Map<String, Object>) shard)
.filter(shard -> nodeIdToShutdown.equals(shard.get("id")))
.filter(shard -> "STARTED".equals(shard.get("state")))
.filter(shard -> "STARTED".equals(shard.get("state")) || "RELOCATING".equals(shard.get("state")))
.count();
assertThat(shardsOnNodeToShutDown, is(0L));
debug.set(shardsResponse);
});

// Now check the shard migration status
Request getStatusRequest = new Request("GET", "_nodes/" + nodeIdToShutdown + "/shutdown");
Response statusResponse = client().performRequest(getStatusRequest);
Map<String, Object> status = entityAsMap(statusResponse);
assertThat(ObjectPath.eval("nodes.0.shard_migration.status", status), equalTo("COMPLETE"));
assertThat(ObjectPath.eval("nodes.0.shard_migration.shard_migrations_remaining", status), equalTo(0));
assertThat(ObjectPath.eval("nodes.0.shard_migration.explanation", status), nullValue());
}

public void testShardsCanBeAllocatedAfterShutdownDeleted() throws Exception {
Expand All @@ -165,6 +183,52 @@ public void testShardsCanBeAllocatedAfterShutdownDeleted() throws Exception {
ensureGreen(indexName);
}

public void testStalledShardMigrationProperlyDetected() throws Exception {
String nodeIdToShutdown = getRandomNodeId();
int numberOfShards = randomIntBetween(1,5);

// Create an index, pin the allocation to the node we're about to shut down
final String indexName = "test-idx";
Request createIndexRequest = new Request("PUT", indexName);
createIndexRequest.setJsonEntity(
"{\"settings\": {\"number_of_shards\": "
+ numberOfShards
+ ", \"number_of_replicas\": 0, \"index.routing.allocation.require._id\": \""
+ nodeIdToShutdown
+ "\"}}"
);
assertOK(client().performRequest(createIndexRequest));

// Mark the node for shutdown
putNodeShutdown(nodeIdToShutdown, "remove");
{
// Now check the shard migration status
Request getStatusRequest = new Request("GET", "_nodes/" + nodeIdToShutdown + "/shutdown");
Response statusResponse = client().performRequest(getStatusRequest);
Map<String, Object> status = entityAsMap(statusResponse);
assertThat(ObjectPath.eval("nodes.0.shard_migration.status", status), equalTo("STALLED"));
assertThat(ObjectPath.eval("nodes.0.shard_migration.shard_migrations_remaining", status), equalTo(numberOfShards));
assertThat(
ObjectPath.eval("nodes.0.shard_migration.explanation", status),
allOf(containsString(indexName), containsString("cannot move, see Cluster Allocation Explain API for details"))
);
}

// Now update the allocation requirements to unblock shard relocation
Request updateSettingsRequest = new Request("PUT", indexName + "/_settings");
updateSettingsRequest.setJsonEntity("{\"index.routing.allocation.require._id\": null}");
assertOK(client().performRequest(updateSettingsRequest));

assertBusy(() -> {
Request getStatusRequest = new Request("GET", "_nodes/" + nodeIdToShutdown + "/shutdown");
Response statusResponse = client().performRequest(getStatusRequest);
Map<String, Object> status = entityAsMap(statusResponse);
assertThat(ObjectPath.eval("nodes.0.shard_migration.status", status), equalTo("COMPLETE"));
assertThat(ObjectPath.eval("nodes.0.shard_migration.shard_migrations_remaining", status), equalTo(0));
assertThat(ObjectPath.eval("nodes.0.shard_migration.explanation", status), nullValue());
});
}

@SuppressWarnings("unchecked")
private void assertNoShuttingDownNodes(String nodeIdToShutdown) throws IOException {
Request getShutdownStatus = new Request("GET", "_nodes/" + nodeIdToShutdown + "/shutdown");
Expand Down
Loading