Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
f2d6666
First cut of an integration test (fails, obviously)
gwbrown Jul 20, 2021
29641f4
WIP - test passes!
gwbrown Jul 21, 2021
b87dfc4
Clean up getRemainingDelay params + reinstate unit test
gwbrown Jul 21, 2021
4f8f0bd
Back to taking a `Metadata` since we need a setting...
gwbrown Jul 21, 2021
a8fde87
Add a bunch of unit tests
gwbrown Jul 21, 2021
8eadf32
Add shard allocation delay to `SingleNodeShutdownMetadata`
gwbrown Jul 21, 2021
917bde5
Value plumbed into getRemainingDelay + more tests
gwbrown Jul 21, 2021
4e9c299
spotless
gwbrown Jul 21, 2021
cd2a67a
Imports
gwbrown Jul 21, 2021
1b2df0a
EMPTY_MAP -> emptyMap()
gwbrown Jul 21, 2021
acb871a
More integration tests
gwbrown Jul 22, 2021
edf9afa
spotless
gwbrown Jul 22, 2021
28fea44
Fix compilation in ML tests
gwbrown Jul 22, 2021
34e983e
Change version constant name & add comment per review
gwbrown Jul 26, 2021
a933857
Remove unnecessary null check per review
gwbrown Jul 26, 2021
219045a
Remove potentially-flaky assert per review.
gwbrown Jul 26, 2021
1159b86
Remove timeout override on assert per review
gwbrown Jul 26, 2021
bcab443
Move default reallocation delay to getting per review
gwbrown Jul 26, 2021
47324db
Remove unnecessary unit test
gwbrown Jul 26, 2021
523977e
Rename + change incorrect comment on unit test
gwbrown Jul 26, 2021
0630dd6
Use raw allocation delay value in equals/hashcode
gwbrown Aug 2, 2021
4906783
Adjust `delayed` calculation per review to...
gwbrown Aug 2, 2021
9769679
Use randomFrom(Type.value()) to generate test shutdown metadata per r…
gwbrown Aug 2, 2021
2d27b21
Use `restartNode()` instead of explicitly stopping/starting per review
gwbrown Aug 2, 2021
b0437d9
Clean up timeouts per review
gwbrown Aug 2, 2021
0c157f0
Factor out shared test logic per review
gwbrown Aug 2, 2021
b57f7cd
Add type & allocation delay validation to PutShutdownNodeAction.Reque…
gwbrown Aug 2, 2021
a2e4b07
Change delay field to `allocation_delay`
gwbrown Aug 2, 2021
1de72bc
spotless
gwbrown Aug 2, 2021
4c1b1aa
Merge branch 'master' into decom/delayed-shard-reassignment-on-restart
gwbrown Aug 4, 2021
30c1c3f
Change default delay when restarting to 5m
gwbrown Aug 4, 2021
9a309ac
Use maximum of `index-level delay` and `restart delay`.
gwbrown Aug 4, 2021
b34b2b0
Determine delay based on shutdown state at time shard became unassigned
gwbrown Aug 4, 2021
1621069
Merge branch 'master' into decom/delayed-shard-reassignment-on-restart
gwbrown Aug 4, 2021
0e23e09
Switch to NODE_RESTARTING enum value instead of separate boolean
gwbrown Aug 5, 2021
44113f5
Merge branch 'master' into decom/delayed-shard-reassignment-on-restart
gwbrown Aug 5, 2021
9df8ac2
Rename to `allocationDelay`
gwbrown Aug 5, 2021
c2bb729
Add assert per review
gwbrown Aug 5, 2021
010c5f6
Add space to error message
gwbrown Aug 6, 2021
49d1545
Merge branch 'master' into decom/delayed-shard-reassignment-on-restart
gwbrown Aug 11, 2021
8bc7442
Ensure we don't try to set lastNodeId if the reason isn't NODE_LEFT o…
gwbrown Aug 11, 2021
ca51467
Expand comment per review
gwbrown Aug 13, 2021
0491493
Modify assert to require lastAllocatedNodeId for NODE_RESTARTING
gwbrown Aug 13, 2021
dc5d851
Revert "Ensure we don't try to set lastNodeId if the reason isn't NOD…
gwbrown Aug 13, 2021
3269afd
Compute reason once per node when disassociating dead nodes, instead …
gwbrown Aug 13, 2021
6b2f2d1
Remove unused code block per review
gwbrown Aug 13, 2021
1a45d0f
Adjust UnassignedInfo test instance creation per review
gwbrown Aug 13, 2021
8066104
Adjust test delays per review
gwbrown Aug 13, 2021
cdb3f6e
imports
gwbrown Aug 13, 2021
b9f98f0
Assert that NODE_RESTARTING requires lastAllocatedNodeId + fix tests
gwbrown Aug 13, 2021
65f5650
Merge branch 'master' into decom/delayed-shard-reassignment-on-restart
gwbrown Aug 16, 2021
b090715
Alter test to generated `lastAllocatedNodeId` if necessary
gwbrown Aug 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
Expand Down Expand Up @@ -58,7 +59,7 @@
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.metadata.AliasMetadata.newAliasMetadataBuilder;
import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomChange;
import static org.elasticsearch.cluster.routing.RandomShardRoutingMutator.randomReason;
import static org.elasticsearch.cluster.routing.UnassignedInfoTests.randomUnassignedInfo;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
Expand Down Expand Up @@ -270,7 +271,7 @@ private IndexRoutingTable randomIndexRoutingTable(String index, String[] nodeIds
for (int j = 0; j < replicaCount; j++) {
UnassignedInfo unassignedInfo = null;
if (randomInt(5) == 1) {
unassignedInfo = new UnassignedInfo(randomReason(), randomAlphaOfLength(10));
unassignedInfo = randomUnassignedInfo(randomAlphaOfLength(10));
}
if (availableNodeIds.isEmpty()) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,12 @@ public Map<String, DataStreamAlias> dataStreamAliases() {
.orElse(Collections.emptyMap());
}

public Map<String, SingleNodeShutdownMetadata> nodeShutdowns() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really part of this review, but I wonder if we could not risk seeing multiple shutdown indications for the same node, for instance both a RESTART and REMOVE or REPLACE? I think of ECK in particular here, but might also be relevant in cloud.

Copy link
Contributor Author

@gwbrown gwbrown Aug 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there's a couple things that prevent this:

  1. In TransportPutShutdownNodeAction when we get a PUT for a node that already has a record, it's updated rather than added to, and
  2. The data structure used to store the SingleNodeShutdownMetadata (the Map in the line you're commenting on) is keyed by node UUID, so it should be impossible to have multiple records for the same key/nodeId.

Since the node id is duplicated in the SingleNodeShutdownMetadata as well, it's conceivable that in the case of a bug we could end up with a mismatch between the id used for keying and the id used in the object, but I don't think that's likely.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, sorry if this was unclear, what I meant was whether it could be a reasonable use case to have both a RESTART and one of the two others at the same time. Not that there is anything wrong in this PR or anything, more wanted to bring this to your attention for possible discussion (and maybe you discussed it already and discarded the use case?).

return Optional.ofNullable((NodesShutdownMetadata) this.custom(NodesShutdownMetadata.TYPE))
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
.orElse(Collections.emptyMap());
}

public ImmutableOpenMap<String, Custom> customs() {
return this.customs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diffable;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Locale;
Expand All @@ -35,14 +38,16 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable<SingleNodeShutd
public static final ParseField REASON_FIELD = new ParseField("reason");
public static final String STARTED_AT_READABLE_FIELD = "shutdown_started";
public static final ParseField STARTED_AT_MILLIS_FIELD = new ParseField(STARTED_AT_READABLE_FIELD + "millis");
public static final ParseField ALLOCATION_DELAY_FIELD = new ParseField("allocation_delay");

public static final ConstructingObjectParser<SingleNodeShutdownMetadata, Void> PARSER = new ConstructingObjectParser<>(
"node_shutdown_info",
a -> new SingleNodeShutdownMetadata(
(String) a[0],
Type.valueOf((String) a[1]),
(String) a[2],
(long) a[3]
(long) a[3],
(TimeValue) a[4]
)
);

Expand All @@ -51,16 +56,24 @@ public class SingleNodeShutdownMetadata extends AbstractDiffable<SingleNodeShutd
PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), REASON_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), STARTED_AT_MILLIS_FIELD);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.textOrNull(), ALLOCATION_DELAY_FIELD.getPreferredName()), ALLOCATION_DELAY_FIELD,
ObjectParser.ValueType.STRING_OR_NULL
);
}

public static SingleNodeShutdownMetadata parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

public static final TimeValue DEFAULT_RESTART_SHARD_ALLOCATION_DELAY = TimeValue.timeValueMinutes(5);

private final String nodeId;
private final Type type;
private final String reason;
private final long startedAtMillis;
@Nullable private final TimeValue allocationDelay;

/**
* @param nodeId The node ID that this shutdown metadata refers to.
Expand All @@ -72,19 +85,25 @@ private SingleNodeShutdownMetadata(
String nodeId,
Type type,
String reason,
long startedAtMillis
long startedAtMillis,
@Nullable TimeValue allocationDelay
) {
this.nodeId = Objects.requireNonNull(nodeId, "node ID must not be null");
this.type = Objects.requireNonNull(type, "shutdown type must not be null");
this.reason = Objects.requireNonNull(reason, "shutdown reason must not be null");
this.startedAtMillis = startedAtMillis;
if (allocationDelay != null && Type.RESTART.equals(type) == false) {
throw new IllegalArgumentException("shard allocation delay is only valid for RESTART-type shutdowns");
}
this.allocationDelay = allocationDelay;
}

public SingleNodeShutdownMetadata(StreamInput in) throws IOException {
this.nodeId = in.readString();
this.type = in.readEnum(Type.class);
this.reason = in.readString();
this.startedAtMillis = in.readVLong();
this.allocationDelay = in.readOptionalTimeValue();
}

/**
Expand Down Expand Up @@ -115,12 +134,27 @@ public long getStartedAtMillis() {
return startedAtMillis;
}

/**
* @return The amount of time shard reallocation should be delayed for shards on this node, so that they will not be automatically
* reassigned while the node is restarting. Will be {@code null} for non-restart shutdowns.
*/
@Nullable
public TimeValue getAllocationDelay() {
if (allocationDelay != null) {
return allocationDelay;
} else if (Type.RESTART.equals(type)) {
return DEFAULT_RESTART_SHARD_ALLOCATION_DELAY;
}
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
out.writeEnum(type);
out.writeString(reason);
out.writeVLong(startedAtMillis);
out.writeOptionalTimeValue(allocationDelay);
}

@Override
Expand All @@ -131,6 +165,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(TYPE_FIELD.getPreferredName(), type);
builder.field(REASON_FIELD.getPreferredName(), reason);
builder.timeField(STARTED_AT_MILLIS_FIELD.getPreferredName(), STARTED_AT_READABLE_FIELD, startedAtMillis);
if (allocationDelay != null) {
builder.field(ALLOCATION_DELAY_FIELD.getPreferredName(), allocationDelay.getStringRep());
}
}
builder.endObject();

Expand All @@ -145,7 +182,8 @@ public boolean equals(Object o) {
return getStartedAtMillis() == that.getStartedAtMillis()
&& getNodeId().equals(that.getNodeId())
&& getType() == that.getType()
&& getReason().equals(that.getReason());
&& getReason().equals(that.getReason())
&& Objects.equals(allocationDelay, that.allocationDelay);
}

@Override
Expand All @@ -154,7 +192,8 @@ public int hashCode() {
getNodeId(),
getType(),
getReason(),
getStartedAtMillis()
getStartedAtMillis(),
allocationDelay
);
}

Expand All @@ -178,6 +217,7 @@ public static class Builder {
private Type type;
private String reason;
private long startedAtMillis = -1;
private TimeValue allocationDelay;

private Builder() {}

Expand Down Expand Up @@ -217,15 +257,25 @@ public Builder setStartedAtMillis(long startedAtMillis) {
return this;
}

/**
* @param allocationDelay The amount of time shard reallocation should be delayed while this node is offline.
* @return This builder.
*/
public Builder setAllocationDelay(TimeValue allocationDelay) {
this.allocationDelay = allocationDelay;
return this;
}

public SingleNodeShutdownMetadata build() {
if (startedAtMillis == -1) {
throw new IllegalArgumentException("start timestamp must be set");
}

return new SingleNodeShutdownMetadata(
nodeId,
type,
reason,
startedAtMillis
startedAtMillis, allocationDelay
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@
package org.elasticsearch.cluster.routing;

import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.Assertions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -65,6 +67,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {

private final Map<ShardId, List<ShardRouting>> assignedShards = new HashMap<>();

private final Map<String, SingleNodeShutdownMetadata> nodeShutdowns;

private final boolean readOnly;

private int inactivePrimaryCount = 0;
Expand All @@ -83,6 +87,7 @@ public RoutingNodes(ClusterState clusterState) {
public RoutingNodes(ClusterState clusterState, boolean readOnly) {
this.readOnly = readOnly;
final RoutingTable routingTable = clusterState.routingTable();
nodeShutdowns = clusterState.metadata().nodeShutdowns();

Map<String, LinkedHashMap<ShardId, ShardRouting>> nodesToShards = new HashMap<>();
// fill in the nodeToShards with the "live" nodes
Expand Down Expand Up @@ -533,9 +538,17 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId
// re-resolve replica as earlier iteration could have changed source/target of replica relocation
ShardRouting replicaShard = getByAllocationId(routing.shardId(), routing.allocationId().getId());
assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas";
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet());
UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(
UnassignedInfo.Reason.PRIMARY_FAILED,
"primary failed while replica initializing",
null,
0,
unassignedInfo.getUnassignedTimeInNanos(),
unassignedInfo.getUnassignedTimeInMillis(),
false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit annoying that we have to calculate nodeIsRestarting when we pass in false to delayed anyway. I wonder if we were better off by making delayed an enum with values "NORMAL", "SHUTDOWN" (and possibly use "NO" instead of null)? That would avoid calculating nodeIsRestarting above for no good reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This no longer needs to be calculated here, as the reason is PRIMARY_FAILED.

AllocationStatus.NO_ATTEMPT,
Collections.emptySet(),
routing.currentNodeId());
failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetadata, routingChangesObserver);
}
}
Expand Down Expand Up @@ -858,10 +871,17 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R
UnassignedInfo currInfo = shard.unassignedInfo();
assert currInfo != null;
if (allocationStatus.equals(currInfo.getLastAllocationStatus()) == false) {
UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(),
currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(),
currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(),
allocationStatus, currInfo.getFailedNodeIds());
UnassignedInfo newInfo = new UnassignedInfo(
currInfo.getReason(),
currInfo.getMessage(),
currInfo.getFailure(),
currInfo.getNumFailedAllocations(),
currInfo.getUnassignedTimeInNanos(),
currInfo.getUnassignedTimeInMillis(),
currInfo.isDelayed(),
allocationStatus,
currInfo.getFailedNodeIds(),
currInfo.getLastAllocatedNodeId());
ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource());
changes.unassignedInfoUpdated(shard, newInfo);
shard = updatedShard;
Expand Down
Loading