diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index cf4de829d559c..600770ea5f163 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.CoordinationMetadata; @@ -58,7 +59,7 @@ import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.metadata.AliasMetadata.newAliasMetadataBuilder; import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomChange; -import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomReason; +import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo; import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.elasticsearch.test.XContentTestUtils.convertToMap; import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; @@ -270,7 +271,7 @@ private IndexRoutingTable randomIndexRoutingTable(String index, String[] nodeIds for (int j = 0; j < replicaCount; j++) { UnassignedInfo unassignedInfo = null; if (randomInt(5) == 1) { - unassignedInfo = new UnassignedInfo(randomReason(), randomAlphaOfLength(10)); + unassignedInfo = randomUnassignedInfo(randomAlphaOfLength(10)); } if (availableNodeIds.isEmpty()) { break; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index 8d559824966a0..267db7de99ced 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -719,6 +719,12 @@ public Map dataStreamAliases() { .orElse(Collections.emptyMap()); } + public Map nodeShutdowns() { + return Optional.ofNullable((NodesShutdownMetadata) this.custom(NodesShutdownMetadata.TYPE)) + .map(NodesShutdownMetadata::getAllNodeMetadataMap) + .orElse(Collections.emptyMap()); + } + public ImmutableOpenMap customs() { return this.customs; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java index a52d27f56a325..59d2743a189be 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java @@ -14,10 +14,10 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.NamedDiff; -import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java index bfe73c4d409d9..200878577b411 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java @@ -10,13 +10,16 @@ import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diffable; -import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import java.io.IOException; import java.util.Locale; @@ -35,6 +38,7 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable PARSER = new ConstructingObjectParser<>( "node_shutdown_info", @@ -42,7 +46,8 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable TimeValue.parseTimeValue(p.textOrNull(), ALLOCATION_DELAY_FIELD.getPreferredName()), ALLOCATION_DELAY_FIELD, + ObjectParser.ValueType.STRING_OR_NULL + ); } public static SingleNodeShutdownMetadata parse(XContentParser parser) { return PARSER.apply(parser, null); } + public static final TimeValue DEFAULT_RESTART_SHARD_ALLOCATION_DELAY = TimeValue.timeValueMinutes(5); + private final String nodeId; private final Type type; private final String reason; private final long startedAtMillis; + @Nullable private final TimeValue allocationDelay; /** * @param nodeId The node ID that this shutdown metadata refers to. @@ -72,12 +85,17 @@ private SingleNodeShutdownMetadata( String nodeId, Type type, String reason, - long startedAtMillis + long startedAtMillis, + @Nullable TimeValue allocationDelay ) { this.nodeId = Objects.requireNonNull(nodeId, "node ID must not be null"); this.type = Objects.requireNonNull(type, "shutdown type must not be null"); this.reason = Objects.requireNonNull(reason, "shutdown reason must not be null"); this.startedAtMillis = startedAtMillis; + if (allocationDelay != null && Type.RESTART.equals(type) == false) { + throw new IllegalArgumentException("shard allocation delay is only valid for RESTART-type shutdowns"); + } + this.allocationDelay = allocationDelay; } public SingleNodeShutdownMetadata(StreamInput in) throws IOException { @@ -85,6 +103,7 @@ public SingleNodeShutdownMetadata(StreamInput in) throws IOException { this.type = in.readEnum(Type.class); this.reason = in.readString(); this.startedAtMillis = in.readVLong(); + this.allocationDelay = in.readOptionalTimeValue(); } /** @@ -115,12 +134,27 @@ public long getStartedAtMillis() { return startedAtMillis; } + /** + * @return The amount of time shard reallocation should be delayed for shards on this node, so that they will not be automatically + * reassigned while the node is restarting. Will be {@code null} for non-restart shutdowns. + */ + @Nullable + public TimeValue getAllocationDelay() { + if (allocationDelay != null) { + return allocationDelay; + } else if (Type.RESTART.equals(type)) { + return DEFAULT_RESTART_SHARD_ALLOCATION_DELAY; + } + return null; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(nodeId); out.writeEnum(type); out.writeString(reason); out.writeVLong(startedAtMillis); + out.writeOptionalTimeValue(allocationDelay); } @Override @@ -131,6 +165,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(TYPE_FIELD.getPreferredName(), type); builder.field(REASON_FIELD.getPreferredName(), reason); builder.timeField(STARTED_AT_MILLIS_FIELD.getPreferredName(), STARTED_AT_READABLE_FIELD, startedAtMillis); + if (allocationDelay != null) { + builder.field(ALLOCATION_DELAY_FIELD.getPreferredName(), allocationDelay.getStringRep()); + } } builder.endObject(); @@ -145,7 +182,8 @@ public boolean equals(Object o) { return getStartedAtMillis() == that.getStartedAtMillis() && getNodeId().equals(that.getNodeId()) && getType() == that.getType() - && getReason().equals(that.getReason()); + && getReason().equals(that.getReason()) + && Objects.equals(allocationDelay, that.allocationDelay); } @Override @@ -154,7 +192,8 @@ public int hashCode() { getNodeId(), getType(), getReason(), - getStartedAtMillis() + getStartedAtMillis(), + allocationDelay ); } @@ -178,6 +217,7 @@ public static class Builder { private Type type; private String reason; private long startedAtMillis = -1; + private TimeValue allocationDelay; private Builder() {} @@ -217,15 +257,25 @@ public Builder setStartedAtMillis(long startedAtMillis) { return this; } + /** + * @param allocationDelay The amount of time shard reallocation should be delayed while this node is offline. + * @return This builder. + */ + public Builder setAllocationDelay(TimeValue allocationDelay) { + this.allocationDelay = allocationDelay; + return this; + } + public SingleNodeShutdownMetadata build() { if (startedAtMillis == -1) { throw new IllegalArgumentException("start timestamp must be set"); } + return new SingleNodeShutdownMetadata( nodeId, type, reason, - startedAtMillis + startedAtMillis, allocationDelay ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 78bade7493875..b207776d49260 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -9,18 +9,20 @@ package org.elasticsearch.cluster.routing; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.apache.logging.log4j.Logger; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.Assertions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator; import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.Randomness; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -65,6 +67,8 @@ public class RoutingNodes implements Iterable { private final Map> assignedShards = new HashMap<>(); + private final Map nodeShutdowns; + private final boolean readOnly; private int inactivePrimaryCount = 0; @@ -83,6 +87,7 @@ public RoutingNodes(ClusterState clusterState) { public RoutingNodes(ClusterState clusterState, boolean readOnly) { this.readOnly = readOnly; final RoutingTable routingTable = clusterState.routingTable(); + nodeShutdowns = clusterState.metadata().nodeShutdowns(); Map> nodesToShards = new HashMap<>(); // fill in the nodeToShards with the "live" nodes @@ -533,9 +538,17 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId // re-resolve replica as earlier iteration could have changed source/target of replica relocation ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId()); assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas"; - UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, - "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet()); + UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.PRIMARY_FAILED, + "primary failed while replica initializing", + null, + 0, + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + false, + AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + routing.currentNodeId()); failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetadata, routingChangesObserver); } } @@ -858,10 +871,17 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R UnassignedInfo currInfo = shard.unassignedInfo(); assert currInfo != null; if (allocationStatus.equals(currInfo.getLastAllocationStatus()) == false) { - UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(), - currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), - currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(), - allocationStatus, currInfo.getFailedNodeIds()); + UnassignedInfo newInfo = new UnassignedInfo( + currInfo.getReason(), + currInfo.getMessage(), + currInfo.getFailure(), + currInfo.getNumFailedAllocations(), + currInfo.getUnassignedTimeInNanos(), + currInfo.getUnassignedTimeInMillis(), + currInfo.isDelayed(), + allocationStatus, + currInfo.getFailedNodeIds(), + currInfo.getLastAllocatedNodeId()); ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource()); changes.unassignedInfoUpdated(shard, newInfo); shard = updatedShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index 7ef47eee9e7de..f8ed1f8444350 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -9,11 +9,12 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -21,9 +22,10 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import java.io.IOException; import java.time.Instant; @@ -31,7 +33,9 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; /** @@ -39,6 +43,12 @@ */ public final class UnassignedInfo implements ToXContentFragment, Writeable { + /** + * The version that the {@code lastAllocatedNode} field was added in. Used to adapt streaming of this class as appropriate for the + * version of the node sending/receiving it. Should be removed once wire compatibility with this version is no longer necessary. + */ + private static final Version VERSION_LAST_ALLOCATED_NODE_ADDED = Version.V_8_0_0; + public static final DateFormatter DATE_TIME_FORMATTER = DateFormatter.forPattern("date_optional_time").withZone(ZoneOffset.UTC); public static final Setting INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING = @@ -114,7 +124,12 @@ public enum Reason { /** * Unassigned as a result of closing an index. */ - INDEX_CLOSED + INDEX_CLOSED, + /** + * Similar to NODE_LEFT, but at the time the node left, it had been registered for a restart via the Node Shutdown API. Note that + * there is no verification that it was ready to be restarted, so this may be an intentional restart or a node crash. + */ + NODE_RESTARTING } /** @@ -208,6 +223,7 @@ public String value() { private final int failedAllocations; private final Set failedNodeIds; private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard + private final String lastAllocatedNodeId; /** * creates an UnassignedInfo object based on **current** time @@ -217,22 +233,32 @@ public String value() { **/ public UnassignedInfo(Reason reason, String message) { this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT, Collections.emptySet()); + AllocationStatus.NO_ATTEMPT, Collections.emptySet(), null); } /** - * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. - * @param message more information about cause. - * @param failure the shard level failure that caused this shard to be unassigned, if exists. - * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation - * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. - * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. - * @param lastAllocationStatus the result of the last allocation attempt for this shard - * @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + * @param failure the shard level failure that caused this shard to be unassigned, if exists. + * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation + * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. + * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. + * @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard + * @param lastAllocatedNodeId the ID of the node this shard was last allocated to */ - public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations, - long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus, - Set failedNodeIds) { + public UnassignedInfo( + Reason reason, + @Nullable String message, + @Nullable Exception failure, + int failedAllocations, + long unassignedTimeNanos, + long unassignedTimeMillis, + boolean delayed, + AllocationStatus lastAllocationStatus, + Set failedNodeIds, + @Nullable String lastAllocatedNodeId + ) { this.reason = Objects.requireNonNull(reason); this.unassignedTimeMillis = unassignedTimeMillis; this.unassignedTimeNanos = unassignedTimeNanos; @@ -242,13 +268,23 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti this.failedAllocations = failedAllocations; this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus); this.failedNodeIds = Set.copyOf(failedNodeIds); - assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : - "failedAllocations: " + failedAllocations + " for reason " + reason; + this.lastAllocatedNodeId = lastAllocatedNodeId; + assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : "failedAllocations: " + + failedAllocations + + " for reason " + + reason; assert (message == null && failure != null) == false : "provide a message if a failure exception is provided"; - assert (delayed && reason != Reason.NODE_LEFT) == false : "shard can only be delayed if it is unassigned due to a node leaving"; + assert (delayed + && reason != Reason.NODE_LEFT + && reason != Reason.NODE_RESTARTING) == false : "shard can only be delayed if it is unassigned due to a node leaving"; + // The below check should be expanded to require `lastAllocatedNodeId` for `NODE_LEFT` as well, once we no longer have to consider + // BWC with versions prior to `VERSION_LAST_ALLOCATED_NODE_ADDED`. + assert (reason == Reason.NODE_RESTARTING && lastAllocatedNodeId == null) == false + : "last allocated node ID must be set if the shard is unassigned due to a node restarting"; } public UnassignedInfo(StreamInput in) throws IOException { + // Because Reason.NODE_RESTARTING is new and can't be sent by older versions, there's no need to vary the deserialization behavior this.reason = Reason.values()[(int) in.readByte()]; this.unassignedTimeMillis = in.readLong(); // As System.nanoTime() cannot be compared across different JVMs, reset it to now. @@ -260,10 +296,19 @@ public UnassignedInfo(StreamInput in) throws IOException { this.failedAllocations = in.readVInt(); this.lastAllocationStatus = AllocationStatus.readFrom(in); this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); + if (in.getVersion().onOrAfter(VERSION_LAST_ALLOCATED_NODE_ADDED)) { + this.lastAllocatedNodeId = in.readOptionalString(); + } else { + this.lastAllocatedNodeId = null; + } } public void writeTo(StreamOutput out) throws IOException { - out.writeByte((byte) reason.ordinal()); + if (reason.equals(Reason.NODE_RESTARTING) && out.getVersion().before(VERSION_LAST_ALLOCATED_NODE_ADDED)) { + out.writeByte((byte) Reason.NODE_LEFT.ordinal()); + } else { + out.writeByte((byte) reason.ordinal()); + } out.writeLong(unassignedTimeMillis); // Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs out.writeBoolean(delayed); @@ -272,6 +317,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(failedAllocations); lastAllocationStatus.writeTo(out); out.writeCollection(failedNodeIds, StreamOutput::writeString); + if (out.getVersion().onOrAfter(VERSION_LAST_ALLOCATED_NODE_ADDED)) { + out.writeOptionalString(lastAllocatedNodeId); + } } /** @@ -339,6 +387,14 @@ public String getDetails() { return message + (failure == null ? "" : ", failure " + ExceptionsHelper.stackTrace(failure)); } + /** + * Gets the ID of the node this shard was last allocated to, or null if unavailable. + */ + @Nullable + public String getLastAllocatedNodeId() { + return lastAllocatedNodeId; + } + /** * Get the status for the last allocation attempt for this shard. */ @@ -366,8 +422,21 @@ public Set getFailedNodeIds() { * * @return calculated delay in nanoseconds */ - public long getRemainingDelay(final long nanoTimeNow, final Settings indexSettings) { - long delayTimeoutNanos = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings).nanos(); + public long getRemainingDelay( + final long nanoTimeNow, + final Settings indexSettings, + final Map nodesShutdownMap + ) { + final long indexLevelDelay = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings).nanos(); + long delayTimeoutNanos = Optional.ofNullable(lastAllocatedNodeId) + // If the node wasn't restarting when this became unassigned, use default delay + .filter(nodeId -> reason.equals(Reason.NODE_RESTARTING)) + .map(nodesShutdownMap::get) + .filter(shutdownMetadata -> SingleNodeShutdownMetadata.Type.RESTART.equals(shutdownMetadata.getType())) + .map(SingleNodeShutdownMetadata::getAllocationDelay) + .map(TimeValue::nanos) + .map(knownRestartDelay -> Math.max(indexLevelDelay, knownRestartDelay)) + .orElse(indexLevelDelay); assert nanoTimeNow >= unassignedTimeNanos; return Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos)); } @@ -399,7 +468,11 @@ public static long findNextDelayedAllocation(long currentNanoTime, ClusterState if (unassignedInfo.isDelayed()) { Settings indexSettings = metadata.index(shard.index()).getSettings(); // calculate next time to schedule - final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(currentNanoTime, indexSettings); + final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay( + currentNanoTime, + indexSettings, + metadata.nodeShutdowns() + ); if (newComputedLeftDelayNanos < nextDelayNanos) { nextDelayNanos = newComputedLeftDelayNanos; } @@ -486,6 +559,11 @@ public boolean equals(Object o) { if (Objects.equals(failure, that.failure) == false) { return false; } + + if (Objects.equals(lastAllocatedNodeId, that.lastAllocatedNodeId) == false) { + return false; + } + return failedNodeIds.equals(that.failedNodeIds); } @@ -499,6 +577,7 @@ public int hashCode() { result = 31 * result + (failure != null ? failure.hashCode() : 0); result = 31 * result + lastAllocationStatus.hashCode(); result = 31 * result + failedNodeIds.hashCode(); + result = 31 * result + (lastAllocatedNodeId != null ? lastAllocatedNodeId.hashCode() : 0); return result; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 5e7d008f1e6f2..948ec14bea434 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -19,6 +19,8 @@ import org.elasticsearch.cluster.metadata.AutoExpandReplicas; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RoutingNode; @@ -45,6 +47,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -209,7 +212,7 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT, failedNodeIds); + AllocationStatus.NO_ATTEMPT, failedNodeIds, shardToFail.currentNodeId()); if (failedShardEntry.markAsStale()) { allocation.removeAllocationId(failedShard); } @@ -300,13 +303,27 @@ private void removeDelayMarkers(RoutingAllocation allocation) { ShardRouting shardRouting = unassignedIterator.next(); UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); if (unassignedInfo.isDelayed()) { - final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(allocation.getCurrentNanoTime(), - metadata.getIndexSafe(shardRouting.index()).getSettings()); + final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay( + allocation.getCurrentNanoTime(), + metadata.getIndexSafe(shardRouting.index()).getSettings(), + metadata.nodeShutdowns() + ); if (newComputedLeftDelayNanos == 0) { - unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), - unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), - unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + unassignedInfo.getNumFailedAllocations(), + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + false, + unassignedInfo.getLastAllocationStatus(), + unassignedInfo.getFailedNodeIds(), + unassignedInfo.getLastAllocatedNodeId()), + shardRouting.recoverySource(), + allocation.changes() + ); } } } @@ -320,11 +337,21 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { while (unassignedIterator.hasNext()) { ShardRouting shardRouting = unassignedIterator.next(); UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); - unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getNumFailedAllocations() > 0 ? - UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), - unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes()); + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getNumFailedAllocations() > 0 ? UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + 0, + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + unassignedInfo.isDelayed(), + unassignedInfo.getLastAllocationStatus(), + Collections.emptySet(), + unassignedInfo.getLastAllocatedNodeId()), + shardRouting.recoverySource(), + allocation.changes() + ); } } @@ -460,19 +487,40 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { } private void disassociateDeadNodes(RoutingAllocation allocation) { + Map nodesShutdownMetadata = allocation.metadata().nodeShutdowns(); + for (Iterator it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) { RoutingNode node = it.next(); if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) { // its a live node, continue continue; } + final UnassignedInfo.Reason + unassignedReason = + nodesShutdownMetadata.containsKey(node.nodeId()) ? + UnassignedInfo.Reason.NODE_RESTARTING : + UnassignedInfo.Reason.NODE_LEFT; // now, go over all the shards routing on the node, and fail them for (ShardRouting shardRouting : node.copyShards()) { final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); - boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).nanos() > 0; - UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT, - Collections.emptySet()); + boolean delayedDueToKnownRestart = Optional.ofNullable(nodesShutdownMetadata.get(node.nodeId())) + .map(shutdown -> Type.RESTART.equals(shutdown.getType()) && shutdown.getAllocationDelay().nanos() > 0) + .orElse(false); + boolean delayed = delayedDueToKnownRestart + || INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).nanos() > 0; + + UnassignedInfo unassignedInfo = new UnassignedInfo( + unassignedReason, + "node_left [" + node.nodeId() + "]", + null, + 0, + allocation.getCurrentNanoTime(), + System.currentTimeMillis(), + delayed, + AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + shardRouting.currentNodeId() + ); allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetadata, allocation.changes()); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 59a332d6ce5bf..0b223d0e69c3d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -138,11 +138,22 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { final ShardRouting shardRouting = unassignedIterator.next(); final UnassignedInfo unassignedInfo = shardRouting.unassignedInfo(); if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) { - unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), - unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, - unassignedInfo.getFailedNodeIds()), - shardRouting.recoverySource(), allocation.changes()); + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + unassignedInfo.getNumFailedAllocations(), + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + unassignedInfo.isDelayed(), + AllocationStatus.DECIDERS_NO, + unassignedInfo.getFailedNodeIds(), + unassignedInfo.getLastAllocatedNodeId() + ), + shardRouting.recoverySource(), + allocation.changes() + ); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java index eeb76f336bd59..980d88d3c42a8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java @@ -129,7 +129,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) ", " + shardRouting.unassignedInfo().getMessage(); unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage, shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false, - shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet()); + shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet(), null); } initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2108585a54c5b..43adcf9ef70e1 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -9,6 +9,7 @@ package org.elasticsearch.gateway; import com.carrotsearch.hppc.cursors.ObjectCursor; + import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -23,10 +24,10 @@ import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult.ShardStoreInfo; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata; @@ -103,7 +104,7 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds, null); // don't cancel shard in the loop as it will cause a ConcurrentModificationException shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metadata.getIndexSafe(shard.index()), allocation.changes())); @@ -213,7 +214,11 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas Metadata metadata = allocation.metadata(); IndexMetadata indexMetadata = metadata.index(unassignedShard.index()); totalDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).getMillis(); - long remainingDelayNanos = unassignedInfo.getRemainingDelay(System.nanoTime(), indexMetadata.getSettings()); + long remainingDelayNanos = unassignedInfo.getRemainingDelay( + System.nanoTime(), + indexMetadata.getSettings(), + metadata.nodeShutdowns() + ); remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis(); } return AllocateUnassignedDecision.delayed(remainingDelayMillis, totalDelayMillis, nodeDecisions); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java index 363659fbacec0..cfa3909c4db5b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.Diff; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.AbstractDiffableSerializationTestCase; import java.io.IOException; @@ -78,11 +79,16 @@ protected NodesShutdownMetadata createTestInstance() { } private SingleNodeShutdownMetadata randomNodeShutdownInfo() { - return SingleNodeShutdownMetadata.builder().setNodeId(randomAlphaOfLength(5)) - .setType(randomBoolean() ? SingleNodeShutdownMetadata.Type.REMOVE : SingleNodeShutdownMetadata.Type.RESTART) + final SingleNodeShutdownMetadata.Type type = randomFrom(SingleNodeShutdownMetadata.Type.values()); + final SingleNodeShutdownMetadata.Builder builder = SingleNodeShutdownMetadata.builder() + .setNodeId(randomAlphaOfLength(5)) + .setType(type) .setReason(randomAlphaOfLength(5)) - .setStartedAtMillis(randomNonNegativeLong()) - .build(); + .setStartedAtMillis(randomNonNegativeLong()); + if (type.equals(SingleNodeShutdownMetadata.Type.RESTART) && randomBoolean()) { + builder.setAllocationDelay(TimeValue.parseTimeValue(randomTimeValue(), this.getTestName())); + } + return builder.build(); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java index 0b5e08c269ca0..ab7b27dfa6bf4 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.Collections; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -98,7 +99,20 @@ public void testMoveToUnassigned() { shard = shard.moveToStarted(); logger.info("-- move to unassigned"); - shard = shard.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null)); + shard = shard.moveToUnassigned( + new UnassignedInfo( + UnassignedInfo.Reason.NODE_LEFT, + this.getTestName(), + null, + 0, + System.nanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + randomAlphaOfLength(10) + ) + ); assertThat(shard.allocationId(), nullValue()); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java b/server/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java index 14ae849dee906..de6de339e6ada 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java @@ -10,6 +10,7 @@ import java.util.Set; +import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomInt; @@ -26,9 +27,9 @@ public static ShardRouting randomChange(ShardRouting shardRouting, Set n switch (randomInt(2)) { case 0: if (shardRouting.unassigned() == false && shardRouting.primary() == false) { - shardRouting = shardRouting.moveToUnassigned(new UnassignedInfo(randomReason(), randomAlphaOfLength(10))); + shardRouting = shardRouting.moveToUnassigned(randomUnassignedInfo(randomAlphaOfLength(10), false)); } else if (shardRouting.unassignedInfo() != null) { - shardRouting = shardRouting.updateUnassigned(new UnassignedInfo(randomReason(), randomAlphaOfLength(10)), + shardRouting = shardRouting.updateUnassigned(randomUnassignedInfo(randomAlphaOfLength(10), false), shardRouting.recoverySource()); } break; @@ -45,30 +46,4 @@ public static ShardRouting randomChange(ShardRouting shardRouting, Set n } return shardRouting; } - - - public static UnassignedInfo.Reason randomReason() { - switch (randomInt(9)) { - case 0: - return UnassignedInfo.Reason.INDEX_CREATED; - case 1: - return UnassignedInfo.Reason.CLUSTER_RECOVERED; - case 2: - return UnassignedInfo.Reason.INDEX_REOPENED; - case 3: - return UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED; - case 4: - return UnassignedInfo.Reason.NEW_INDEX_RESTORED; - case 5: - return UnassignedInfo.Reason.EXISTING_INDEX_RESTORED; - case 6: - return UnassignedInfo.Reason.REPLICA_ADDED; - case 7: - return UnassignedInfo.Reason.ALLOCATION_FAILED; - case 8: - return UnassignedInfo.Reason.NODE_LEFT; - default: - return UnassignedInfo.Reason.REROUTE_CANCELLED; - } - } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 092f7c651b075..21c1e8b3484fd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -10,12 +10,14 @@ import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.randomizedtesting.generators.RandomPicks; + import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus; @@ -25,6 +27,7 @@ import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.repositories.IndexId; @@ -34,6 +37,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -64,7 +70,8 @@ public void testReasonOrdinalOrder() { UnassignedInfo.Reason.PRIMARY_FAILED, UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, UnassignedInfo.Reason.MANUAL_ALLOCATION, - UnassignedInfo.Reason.INDEX_CLOSED,}; + UnassignedInfo.Reason.INDEX_CLOSED, + UnassignedInfo.Reason.NODE_RESTARTING}; for (int i = 0; i < order.length; i++) { assertThat(order[i].ordinal(), equalTo(i)); } @@ -76,10 +83,41 @@ public void testSerialization() throws Exception { int failedAllocations = randomIntBetween(1, 100); Set failedNodes = IntStream.range(0, between(0, failedAllocations)) .mapToObj(n -> "failed-node-" + n).collect(Collectors.toSet()); - UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? - new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, - failedAllocations, System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT, failedNodes): - new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); + + UnassignedInfo meta; + if (reason == UnassignedInfo.Reason.ALLOCATION_FAILED) { + meta = new UnassignedInfo( + reason, + randomBoolean() ? randomAlphaOfLength(4) : null, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + false, + AllocationStatus.NO_ATTEMPT, + failedNodes, + null); + } else if (reason == UnassignedInfo.Reason.NODE_LEFT || reason == UnassignedInfo.Reason.NODE_RESTARTING) { + String lastAssignedNodeId = randomAlphaOfLength(10); + if (reason == UnassignedInfo.Reason.NODE_LEFT && randomBoolean()) { + // If the reason is `NODE_LEFT`, sometimes we'll have an empty lastAllocatedNodeId due to BWC + lastAssignedNodeId = null; + } + meta = new UnassignedInfo( + reason, + randomBoolean() ? randomAlphaOfLength(4) : null, + null, + 0, + System.nanoTime(), + System.currentTimeMillis(), + false, + AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastAssignedNodeId + ); + } else { + meta = new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); + } BytesStreamOutput out = new BytesStreamOutput(); meta.writeTo(out); out.close(); @@ -91,6 +129,7 @@ public void testSerialization() throws Exception { assertThat(read.getDetails(), equalTo(meta.getDetails())); assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); assertThat(read.getFailedNodeIds(), equalTo(meta.getFailedNodeIds())); + assertThat(read.getLastAllocatedNodeId(), equalTo(meta.getLastAllocatedNodeId())); } public void testIndexCreated() { @@ -290,23 +329,168 @@ public void testFailedShard() { } /** - * Verifies that delayed allocation calculation are correct. + * Verifies that delayed allocation calculation are correct when there are no registered node shutdowns. + */ + public void testRemainingDelayCalculationWithNoShutdowns() throws Exception { + checkRemainingDelayCalculation( + "bogusNodeId", + TimeValue.timeValueNanos(10), + Collections.emptyMap(), + TimeValue.timeValueNanos(10), + false + ); + } + + /** + * Verifies that delayed allocation calculations are correct when there are registered node shutdowns for nodes which are not relevant + * to the shard currently being evaluated. + */ + public void testRemainingDelayCalculationsWithUnrelatedShutdowns() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + int numberOfShutdowns = randomIntBetween(1,15); + for (int i = 0; i <= numberOfShutdowns; i++) { + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(randomValueOtherThan(lastNodeId, () -> randomAlphaOfLengthBetween(5,10))) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(randomFrom(EnumSet.allOf(SingleNodeShutdownMetadata.Type.class))) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + } + checkRemainingDelayCalculation(lastNodeId, TimeValue.timeValueNanos(10), shutdowns, TimeValue.timeValueNanos(10), false); + } + + /** + * Verifies that delay calculation is not impacted when the node the shard was last assigned to was registered for removal. + */ + public void testRemainingDelayCalculationWhenNodeIsShuttingDownForRemoval() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(lastNodeId) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.REMOVE) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + + checkRemainingDelayCalculation(lastNodeId, TimeValue.timeValueNanos(10), shutdowns, TimeValue.timeValueNanos(10), false); + } + + /** + * Verifies that the delay calculation uses the configured delay value for nodes known to be restarting, because they are registered for + * a `RESTART`-type shutdown, rather than the default global delay. */ - public void testRemainingDelayCalculation() throws Exception { + public void testRemainingDelayCalculationWhenNodeIsKnownToBeRestartingWithCustomDelay() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(lastNodeId) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.RESTART) + .setAllocationDelay(TimeValue.timeValueMinutes(1)) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + + // Use a different index-level delay so this test will fail if that one gets used instead of the one from the shutdown metadata + checkRemainingDelayCalculation(lastNodeId, TimeValue.timeValueNanos(10), shutdowns, TimeValue.timeValueMinutes(1), true); + } + + /** + * Verifies that the delay calculation uses the default delay value for nodes known to be restarting, because they are registered for + * a `RESTART`-type shutdown, rather than the default global delay. + */ + public void testRemainingDelayCalculationWhenNodeIsKnownToBeRestartingWithDefaultDelay() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + + // Note that we do not explicitly configure the reallocation delay here. + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(lastNodeId) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.RESTART) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + + // Use a different index-level delay so this test will fail if that one gets used instead of the one from the shutdown metadata + checkRemainingDelayCalculation( + lastNodeId, + TimeValue.timeValueNanos(10), + shutdowns, + SingleNodeShutdownMetadata.DEFAULT_RESTART_SHARD_ALLOCATION_DELAY, + true + ); + } + + public void testRemainingDelayUsesIndexLevelDelayIfNodeWasNotRestartingWhenShardBecameUnassigned() throws Exception { + String lastNodeId = "bogusNodeId"; + Map shutdowns = new HashMap<>(); + + // Generate a random time value - but don't use nanos as extremely small values of nanos can break assertion calculations + final TimeValue shutdownDelay = TimeValue.parseTimeValue( + randomTimeValue(100, 1000, "d", "h", "ms", "s", "m", "micros"), + this.getTestName() + ); + SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder() + .setNodeId(lastNodeId) + .setReason(this.getTestName()) + .setStartedAtMillis(randomNonNegativeLong()) + .setType(SingleNodeShutdownMetadata.Type.RESTART) + .setAllocationDelay(shutdownDelay) + .build(); + shutdowns.put(shutdown.getNodeId(), shutdown); + + // We want an index level delay that's less than the shutdown delay to avoid picking the index-level delay because it's larger + final TimeValue indexLevelDelay = randomValueOtherThanMany( + tv -> shutdownDelay.compareTo(tv) < 0, + () -> TimeValue.parseTimeValue(randomTimeValue(1, 1000, "d", "h", "ms", "s", "m", "micros"), this.getTestName()) + ); + + logger.info("index level delay: {}, shutdown delay: {}", indexLevelDelay, shutdownDelay); + checkRemainingDelayCalculation( + lastNodeId, + indexLevelDelay, + shutdowns, + indexLevelDelay, + false + ); + } + + private void checkRemainingDelayCalculation( + String lastNodeId, + TimeValue indexLevelTimeoutSetting, + Map nodeShutdowns, + TimeValue expectedTotalDelay, + boolean nodeRestarting + ) throws Exception { final long baseTime = System.nanoTime(); - UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, - System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT, Collections.emptySet()); - final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos(); + UnassignedInfo unassignedInfo = new UnassignedInfo( + nodeRestarting ? UnassignedInfo.Reason.NODE_RESTARTING : UnassignedInfo.Reason.NODE_LEFT, + "test", + null, + 0, + baseTime, + System.currentTimeMillis(), + randomBoolean(), + AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastNodeId + ); + final long totalDelayNanos = expectedTotalDelay.nanos(); final Settings indexSettings = Settings.builder() - .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build(); - long delay = unassignedInfo.getRemainingDelay(baseTime, indexSettings); + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), indexLevelTimeoutSetting) + .build(); + long delay = unassignedInfo.getRemainingDelay(baseTime, indexSettings, nodeShutdowns); assertThat(delay, equalTo(totalDelayNanos)); - long delta1 = randomIntBetween(1, (int) (totalDelayNanos - 1)); - delay = unassignedInfo.getRemainingDelay(baseTime + delta1, indexSettings); + long delta1 = randomLongBetween(1, (totalDelayNanos - 1)); + delay = unassignedInfo.getRemainingDelay(baseTime + delta1, indexSettings, nodeShutdowns); assertThat(delay, equalTo(totalDelayNanos - delta1)); - delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos, indexSettings); + delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos, indexSettings, nodeShutdowns); assertThat(delay, equalTo(0L)); - delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), indexSettings); + delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), indexSettings, nodeShutdowns); assertThat(delay, equalTo(0L)); } @@ -386,4 +570,39 @@ public void testAllocationStatusSerialization() throws IOException { assertThat(readStatus, equalTo(allocationStatus)); } } + + public static UnassignedInfo randomUnassignedInfo(String message) { + return randomUnassignedInfo(message, null); + } + + /** + * Randomly generates an UnassignedInfo. + * @param message The message to be used. + * @param delayed Used for the `delayed` flag if provided. + * @return A randomly-generated UnassignedInfo with the given message and delayed value (if any) + */ + public static UnassignedInfo randomUnassignedInfo(String message, @Nullable Boolean delayed) { + UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.values()); + String lastAllocatedNodeId = null; + boolean delayedFlag = delayed == null ? false : delayed; + if (reason == UnassignedInfo.Reason.NODE_LEFT || reason == UnassignedInfo.Reason.NODE_RESTARTING) { + if (randomBoolean() && delayed == null) { + delayedFlag = true; + } + lastAllocatedNodeId = randomAlphaOfLength(10); + } + int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0; + return new UnassignedInfo( + reason, + message, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + delayedFlag, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastAllocatedNodeId + ); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index c25ca14028481..fc3b66bd34a70 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -108,9 +108,17 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() { shardState = RestoreInProgress.State.FAILURE; UnassignedInfo currentInfo = primary.unassignedInfo(); - UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"), - currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), currentInfo.getUnassignedTimeInMillis(), - currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNodeIds()); + UnassignedInfo newInfo = new UnassignedInfo( + currentInfo.getReason(), + currentInfo.getMessage(), + new IOException("i/o failure"), + currentInfo.getNumFailedAllocations(), + currentInfo.getUnassignedTimeInNanos(), + currentInfo.getUnassignedTimeInMillis(), + currentInfo.isDelayed(), + currentInfo.getLastAllocationStatus(), + currentInfo.getFailedNodeIds(), + currentInfo.getLastAllocatedNodeId()); primary = primary.updateUnassigned(newInfo, primary.recoverySource()); IndexRoutingTable indexRoutingTable = routingTable.index("test"); diff --git a/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java b/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java index 7477849c4288c..0b41e4d626032 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PriorityComparatorTests.java @@ -13,7 +13,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; @@ -25,6 +24,7 @@ import java.util.Locale; import java.util.Map; +import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo; import static org.hamcrest.Matchers.greaterThan; import static org.mockito.Mockito.mock; @@ -34,9 +34,9 @@ public void testPreferNewIndices() { RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class)); List shardRoutings = Arrays.asList( TestShardRouting.newShardRouting("oldest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar"))); Collections.shuffle(shardRoutings, random()); for (ShardRouting routing : shardRoutings) { shards.add(routing); @@ -68,9 +68,9 @@ public void testPreferPriorityIndices() { RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class)); List shardRoutings = Arrays.asList( TestShardRouting.newShardRouting("oldest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar"))); Collections.shuffle(shardRoutings, random()); for (ShardRouting routing : shardRoutings) { shards.add(routing); @@ -102,9 +102,9 @@ public void testPreferSystemIndices() { RoutingNodes.UnassignedShards shards = new RoutingNodes.UnassignedShards(mock(RoutingNodes.class)); List shardRoutings = Arrays.asList( TestShardRouting.newShardRouting("oldest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar")), + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar")), TestShardRouting.newShardRouting("newest", 0, null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar"))); Collections.shuffle(shardRoutings, random()); for (ShardRouting routing : shardRoutings) { shards.add(routing); @@ -165,8 +165,7 @@ public void testPriorityComparatorSort() { for (int i = 0; i < numShards; i++) { IndexMetadata indexMeta = randomFrom(indices); shards.add(TestShardRouting.newShardRouting(indexMeta.getIndex().getName(), randomIntBetween(1, 5), null, null, - randomBoolean(), ShardRoutingState.UNASSIGNED, new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), - "foobar"))); + randomBoolean(), ShardRoutingState.UNASSIGNED, randomUnassignedInfo("foobar"))); } shards.sort(new PriorityComparator() { @Override diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 0b5a289a32f5e..75369c0c89fd2 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -190,8 +190,18 @@ public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); } else { failedNodeIds = new HashSet<>(randomSubsetOf(Set.of("node-4", "node-5", "node-6", "node-7"))); - unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), - System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds); + unassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.ALLOCATION_FAILED, + null, + null, + randomIntBetween(1, 10), + System.nanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + failedNodeIds, + null + ); } RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); @@ -375,8 +385,18 @@ public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { if (randomBoolean()) { unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); } else { - unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), - System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, Set.of("node-4")); + unassignedInfo = new UnassignedInfo( + UnassignedInfo.Reason.ALLOCATION_FAILED, + null, + null, + randomIntBetween(1, 10), + System.nanoTime(), + System.currentTimeMillis(), + false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Set.of("node-4"), + null + ); } RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); List retentionLeases = new ArrayList<>(); @@ -417,7 +437,7 @@ public void testDoNotCancelForBrokenNode() { } UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(failedNodes.size(), 10), System.nanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes, null); RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); List retentionLeases = Arrays.asList( @@ -446,6 +466,9 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT && UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0; int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0; + final String lastAllocatedNodeId = reason == UnassignedInfo.Reason.NODE_RESTARTING || randomBoolean() + ? randomAlphaOfLength(10) + : null; RoutingTable routingTable = RoutingTable.builder() .add(IndexRoutingTable.builder(shardId.getIndex()) .addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -454,7 +477,7 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(reason, null, null, failedAllocations, System.nanoTime(), System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT, - Collections.emptySet()))) + Collections.emptySet(), lastAllocatedNodeId))) .build()) ) .build(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 14e1ac5fb86d3..79e4cb4f76f8a 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -15,10 +15,13 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.test.ESTestCase; + +import java.util.Collections; import static org.apache.lucene.util.LuceneTestCase.random; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; +import static org.elasticsearch.test.ESTestCase.randomBoolean; +import static org.elasticsearch.test.ESTestCase.randomFrom; /** * A helper that allows to create shard routing instances within tests, while not requiring to expose @@ -88,7 +91,7 @@ private static RecoverySource buildRecoveryTarget(boolean primary, ShardRoutingS case UNASSIGNED: case INITIALIZING: if (primary) { - return ESTestCase.randomFrom(RecoverySource.EmptyStoreRecoverySource.INSTANCE, + return randomFrom(RecoverySource.EmptyStoreRecoverySource.INSTANCE, RecoverySource.ExistingStoreRecoverySource.INSTANCE); } else { return RecoverySource.PeerRecoverySource.INSTANCE; @@ -120,7 +123,7 @@ private static UnassignedInfo buildUnassignedInfo(ShardRoutingState state) { switch (state) { case UNASSIGNED: case INITIALIZING: - return new UnassignedInfo(ESTestCase.randomFrom(UnassignedInfo.Reason.values()), "auto generated for test"); + return randomUnassignedInfo("auto generated for test"); case STARTED: case RELOCATING: return null; @@ -129,8 +132,33 @@ private static UnassignedInfo buildUnassignedInfo(ShardRoutingState state) { } } + public static UnassignedInfo randomUnassignedInfo(String message) { + UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.values()); + String lastAllocatedNodeId = null; + boolean delayed = false; + if (reason == UnassignedInfo.Reason.NODE_LEFT || reason == UnassignedInfo.Reason.NODE_RESTARTING) { + if (randomBoolean()) { + delayed = true; + } + lastAllocatedNodeId = randomAlphaOfLength(10); + } + int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0; + return new UnassignedInfo( + reason, + message, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + delayed, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastAllocatedNodeId + ); + } + public static RecoverySource randomRecoverySource() { - return ESTestCase.randomFrom(RecoverySource.EmptyStoreRecoverySource.INSTANCE, + return randomFrom(RecoverySource.EmptyStoreRecoverySource.INSTANCE, RecoverySource.ExistingStoreRecoverySource.INSTANCE, RecoverySource.PeerRecoverySource.INSTANCE, RecoverySource.LocalShardsRecoverySource.INSTANCE, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java index 1c27a4f9ea0dd..00fb39ac1b682 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/CheckShrinkReadyStepTests.java @@ -244,7 +244,7 @@ public void testExecuteAllocateReplicaUnassigned() { IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)) .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), null, null, false, ShardRoutingState.UNASSIGNED, - new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), "the shard is intentionally unassigned"))); + randomUnassignedInfo("the shard is intentionally unassigned"))); CheckShrinkReadyStep step = createRandomInstance(); assertAllocateStatus(index, 1, 1, step, existingSettings, node1Settings, node2Settings, indexRoutingTable, @@ -458,4 +458,29 @@ private void assertAllocateStatus(Index index, int shards, int replicas, CheckSh assertEquals(expectedResult.isComplete(), actualResult.isComplete()); assertEquals(expectedResult.getInfomationContext(), actualResult.getInfomationContext()); } + + public static UnassignedInfo randomUnassignedInfo(String message) { + UnassignedInfo.Reason reason = randomFrom(UnassignedInfo.Reason.values()); + String lastAllocatedNodeId = null; + boolean delayed = false; + if (reason == UnassignedInfo.Reason.NODE_LEFT || reason == UnassignedInfo.Reason.NODE_RESTARTING) { + if (randomBoolean()) { + delayed = true; + } + lastAllocatedNodeId = randomAlphaOfLength(10); + } + int failedAllocations = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? 1 : 0; + return new UnassignedInfo( + reason, + message, + null, + failedAllocations, + System.nanoTime(), + System.currentTimeMillis(), + delayed, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet(), + lastAllocatedNodeId + ); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java index 6c321a990c219..57349053e9e11 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStepTests.java @@ -6,7 +6,6 @@ */ package org.elasticsearch.xpack.core.ilm; - import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -18,8 +17,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.cluster.routing.UnassignedInfo.Reason; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.xpack.core.DataTier; @@ -32,6 +29,7 @@ import static java.util.Collections.emptyMap; import static org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider.INDEX_ROUTING_PREFER; +import static org.elasticsearch.xpack.core.ilm.CheckShrinkReadyStepTests.randomUnassignedInfo; import static org.elasticsearch.xpack.core.ilm.step.info.AllocationInfo.waitingForActiveShardsAllocationInfo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -77,7 +75,7 @@ public void testExecuteWithUnassignedShard() { IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(index) .addShard(TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, ShardRoutingState.STARTED)) .addShard(TestShardRouting.newShardRouting(new ShardId(index, 1), null, null, true, ShardRoutingState.UNASSIGNED, - new UnassignedInfo(randomFrom(Reason.values()), "the shard is intentionally unassigned"))); + randomUnassignedInfo("the shard is intentionally unassigned"))); ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(Metadata.builder().put(indexMetadata, true).build()) diff --git a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java index bbc5520dbb595..aebfa48381911 100644 --- a/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java +++ b/x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlNodeShutdownIT.java @@ -78,9 +78,15 @@ public void testJobsVacateShuttingDownNode() throws Exception { }); // Call the shutdown API for the chosen node. - client().execute(PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(nodeIdToShutdown.get(), randomFrom(SingleNodeShutdownMetadata.Type.values()), "just testing")) - .actionGet(); + client().execute( + PutShutdownNodeAction.INSTANCE, + new PutShutdownNodeAction.Request( + nodeIdToShutdown.get(), + randomFrom(SingleNodeShutdownMetadata.Type.values()), + "just testing", + null + ) + ).actionGet(); // Wait for the desired end state of all 6 jobs running on nodes that are not shutting down. assertBusy(() -> { @@ -145,8 +151,15 @@ public void testCloseJobVacatingShuttingDownNode() throws Exception { }); // Call the shutdown API for the chosen node. - client().execute(PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(nodeIdToShutdown.get(), randomFrom(SingleNodeShutdownMetadata.Type.values()), "just testing")) + client().execute( + PutShutdownNodeAction.INSTANCE, + new PutShutdownNodeAction.Request( + nodeIdToShutdown.get(), + randomFrom(SingleNodeShutdownMetadata.Type.values()), + "just testing", + null + ) + ) .actionGet(); if (randomBoolean()) { diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index 199fb3f3adc96..2320a154c2a95 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -47,6 +47,7 @@ import static org.elasticsearch.common.xcontent.XContentHelper.convertToJson; import static org.elasticsearch.common.xcontent.XContentHelper.stripWhitespace; +import static org.elasticsearch.xpack.core.ilm.CheckShrinkReadyStepTests.randomUnassignedInfo; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -424,8 +425,7 @@ private static IndexRoutingTable mockIndexRoutingTable(final Index index, --unassignedTotal; --unassignedPrimaries; - final UnassignedInfo unassignedInfo = - new UnassignedInfo(randomFrom(UnassignedInfo.Reason.values()), randomAlphaOfLength(3)); + final UnassignedInfo unassignedInfo = randomUnassignedInfo(randomAlphaOfLength(3)); final String nodeId; final ShardRoutingState state; diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownDelayedAllocationIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownDelayedAllocationIT.java new file mode 100644 index 0000000000000..c89c95d1cf332 --- /dev/null +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownDelayedAllocationIT.java @@ -0,0 +1,235 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.shutdown; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class NodeShutdownDelayedAllocationIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(ShutdownPlugin.class); + } + + public void testShardAllocationIsDelayedForRestartingNode() throws Exception { + internalCluster().startNodes(3); + prepareCreate("test").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0) // Disable "normal" delayed allocation + ).get(); + ensureGreen("test"); + indexRandomData(); + + final String nodeToRestartId = findIdOfNodeWithShard(); + final String nodeToRestartName = findNodeNameFromId(nodeToRestartId); + + // Mark the node for shutdown + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + null // Make sure it works with the default - we'll check this override in other tests + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + internalCluster().restartNode(nodeToRestartName, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertBusy( + () -> { assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); } + ); + return super.onNodeStopped(nodeName); + } + }); + + // And the index should turn green again + ensureGreen("test"); + } + + public void testShardAllocationWillProceedAfterTimeout() throws Exception { + internalCluster().startNodes(3); + prepareCreate("test").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0) // Disable "normal" delayed allocation + ).get(); + ensureGreen("test"); + indexRandomData(); + + final String nodeToRestartId = findIdOfNodeWithShard(); + final String nodeToRestartName = findNodeNameFromId(nodeToRestartId); + + // Mark the node for shutdown + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + TimeValue.timeValueMillis(randomIntBetween(10, 1000)) + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + // Actually stop the node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeToRestartName)); + + // And the index should turn green again well within the 30-second timeout + ensureGreen("test"); + } + + public void testIndexLevelAllocationDelayWillBeUsedIfLongerThanShutdownDelay() throws Exception { + internalCluster().startNodes(3); + prepareCreate("test").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "3h") // Use a long timeout we definitely won't hit + ).get(); + ensureGreen("test"); + indexRandomData(); + + final String nodeToRestartId = findIdOfNodeWithShard(); + final String nodeToRestartName = findNodeNameFromId(nodeToRestartId); + + // Mark the node for shutdown + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + TimeValue.timeValueMillis(0) // No delay for reallocating these shards, IF this timeout is used. + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + internalCluster().restartNode(nodeToRestartName, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + assertBusy( + () -> { assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); } + ); + return super.onNodeStopped(nodeName); + } + }); + + // And the index should turn green again + ensureGreen("test"); + } + + public void testShardAllocationTimeoutCanBeChanged() throws Exception { + String nodeToRestartId = setupLongTimeoutTestCase(); + + // Update the timeout on the shutdown request to something shorter + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + TimeValue.timeValueMillis(1) + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + + // And the index should turn green again + ensureGreen("test"); + } + + public void testShardAllocationStartsImmediatelyIfShutdownDeleted() throws Exception { + String nodeToRestartId = setupLongTimeoutTestCase(); + + DeleteShutdownNodeAction.Request deleteShutdownRequest = new DeleteShutdownNodeAction.Request(nodeToRestartId); + AcknowledgedResponse deleteShutdownResponse = client().execute(DeleteShutdownNodeAction.INSTANCE, deleteShutdownRequest).get(); + assertTrue(deleteShutdownResponse.isAcknowledged()); + + // And the index should turn green again + ensureGreen("test"); + } + + /** + * Sets up a cluster and an index, picks a random node that has a shard, marks it for shutdown with a long timeout, and then stops the + * node. + * + * @return The ID of the node that was randomly chosen to be marked for shutdown and stopped. + */ + private String setupLongTimeoutTestCase() throws Exception { + internalCluster().startNodes(3); + prepareCreate("test").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0) // Disable "normal" delayed allocation + ).get(); + ensureGreen("test"); + indexRandomData(); + + final String nodeToRestartId = findIdOfNodeWithShard(); + final String nodeToRestartName = findNodeNameFromId(nodeToRestartId); + + { + // Mark the node for shutdown with a delay that we'll never reach in the test + PutShutdownNodeAction.Request putShutdownRequest = new PutShutdownNodeAction.Request( + nodeToRestartId, + SingleNodeShutdownMetadata.Type.RESTART, + this.getTestName(), + TimeValue.timeValueHours(3) + ); + AcknowledgedResponse putShutdownResponse = client().execute(PutShutdownNodeAction.INSTANCE, putShutdownRequest).get(); + assertTrue(putShutdownResponse.isAcknowledged()); + } + + // Actually stop the node + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeToRestartName)); + + // Verify that the shard's allocation is delayed + assertBusy(() -> { assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1)); }); + + return nodeToRestartId; + } + + private void indexRandomData() throws Exception { + int numDocs = scaledRandomIntBetween(100, 1000); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test").setSource("field", "value"); + } + indexRandom(true, builders); + } + + private String findIdOfNodeWithShard() { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + List startedShards = state.routingTable().shardsWithState(ShardRoutingState.STARTED); + Collections.shuffle(startedShards, random()); + return startedShards.get(0).currentNodeId(); + } + + private String findNodeNameFromId(String id) { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + return state.nodes().get(id).getName(); + } +} diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java index 29325c3626277..f216dcd7562ac 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownPluginsIT.java @@ -80,7 +80,7 @@ public void testShutdownAwarePlugin() throws Exception { // Mark the node as shutting down client().execute( PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing") + new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing", null) ).get(); GetShutdownStatusAction.Response getResp = client().execute( diff --git a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java index a55c3dab3ad38..dd50014d6a84d 100644 --- a/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java +++ b/x-pack/plugin/shutdown/src/internalClusterTest/java/org/elasticsearch/xpack/shutdown/NodeShutdownTasksIT.java @@ -113,7 +113,7 @@ public void testTasksAreNotAssignedToShuttingDownNode() throws Exception { // Mark the node as shutting down client().execute( PutShutdownNodeAction.INSTANCE, - new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing") + new PutShutdownNodeAction.Request(shutdownNode, SingleNodeShutdownMetadata.Type.REMOVE, "removal for testing", null) ).get(); // Tell the persistent task executor it can start allocating the task diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java index c3f7081185060..388227e5c6328 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/PutShutdownNodeAction.java @@ -18,6 +18,8 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; import java.io.IOException; @@ -35,42 +37,49 @@ public static class Request extends AcknowledgedRequest { private final String nodeId; private final SingleNodeShutdownMetadata.Type type; private final String reason; + @Nullable + private final TimeValue allocationDelay; private static final ParseField TYPE_FIELD = new ParseField("type"); - public static final ParseField REASON_FIELD = new ParseField("reason"); + private static final ParseField REASON_FIELD = new ParseField("reason"); + private static final ParseField ALLOCATION_DELAY_FIELD = new ParseField("allocation_delay"); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "put_node_shutdown_request", false, - (a, nodeId) -> new Request(nodeId, SingleNodeShutdownMetadata.Type.parse((String) a[0]), (String) a[1]) + (a, nodeId) -> new Request(nodeId, SingleNodeShutdownMetadata.Type.parse((String) a[0]), (String) a[1], (TimeValue) a[2]) ); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), REASON_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ALLOCATION_DELAY_FIELD); } public static Request parseRequest(String nodeId, XContentParser parser) { return PARSER.apply(parser, nodeId); } - public Request(String nodeId, SingleNodeShutdownMetadata.Type type, String reason) { + public Request(String nodeId, SingleNodeShutdownMetadata.Type type, String reason, @Nullable TimeValue allocationDelay) { this.nodeId = nodeId; this.type = type; this.reason = reason; + this.allocationDelay = allocationDelay; } public Request(StreamInput in) throws IOException { this.nodeId = in.readString(); this.type = in.readEnum(SingleNodeShutdownMetadata.Type.class); this.reason = in.readString(); + this.allocationDelay = in.readOptionalTimeValue(); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(this.nodeId); + out.writeString(nodeId); out.writeEnum(type); out.writeString(reason); + out.writeOptionalTimeValue(allocationDelay); } public String getNodeId() { @@ -85,6 +94,10 @@ public String getReason() { return reason; } + public TimeValue getAllocationDelay() { + return allocationDelay; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException arve = new ActionRequestValidationException(); @@ -101,6 +114,10 @@ public ActionRequestValidationException validate() { arve.addValidationError("the reason for shutdown is required"); } + if (allocationDelay != null && SingleNodeShutdownMetadata.Type.RESTART.equals(type) == false) { + arve.addValidationError(ALLOCATION_DELAY_FIELD + " is only allowed for RESTART-type shutdown requests"); + } + if (arve.validationErrors().isEmpty() == false) { return arve; } else { diff --git a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java index 19c06341b9003..034c9d0c4e154 100644 --- a/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java +++ b/x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java @@ -87,6 +87,7 @@ public ClusterState execute(ClusterState currentState) { .setType(request.getType()) .setReason(request.getReason()) .setStartedAtMillis(System.currentTimeMillis()) + .setAllocationDelay(request.getAllocationDelay()) .build(); return ClusterState.builder(currentState)