diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 95c20e6ee99a2..27fad07235e9f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.metadata.MetadataMappingService; import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; +import org.elasticsearch.cluster.metadata.NodesShutdownMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.routing.DelayedAllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -132,6 +133,7 @@ public static List getNamedWriteables() { registerMetadataCustom(entries, ComposableIndexTemplateMetadata.TYPE, ComposableIndexTemplateMetadata::new, ComposableIndexTemplateMetadata::readDiffFrom); registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom); + registerMetadataCustom(entries, NodesShutdownMetadata.TYPE, NodesShutdownMetadata::new, NodesShutdownMetadata::readDiffFrom); // Task Status (not Diffable) entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); @@ -157,6 +159,8 @@ public static List getNamedXWriteables() { ComposableIndexTemplateMetadata::fromXContent)); entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(DataStreamMetadata.TYPE), DataStreamMetadata::fromXContent)); + entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(NodesShutdownMetadata.TYPE), + NodesShutdownMetadata::fromXContent)); return entries; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/NodeShutdownComponentStatus.java b/server/src/main/java/org/elasticsearch/cluster/metadata/NodeShutdownComponentStatus.java new file mode 100644 index 0000000000000..67b6ec9048022 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/NodeShutdownComponentStatus.java @@ -0,0 +1,120 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +/** + * Contains information about the status of a single component (e.g. `shard_migration`, `persistent_tasks`) of the node shutdown process. + */ +public class NodeShutdownComponentStatus extends AbstractDiffable implements ToXContentFragment { + private final SingleNodeShutdownMetadata.Status status; + @Nullable private final Long startedAtMillis; + @Nullable private final String errorMessage; + + private static final ParseField STATUS_FIELD = new ParseField("status"); + private static final ParseField TIME_STARTED_FIELD = new ParseField("time_started_millis"); + private static final ParseField ERROR_FIELD = new ParseField("error"); + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "node_shutdown_component", + a -> new NodeShutdownComponentStatus(SingleNodeShutdownMetadata.Status.valueOf((String) a[0]), (Long) a[1], (String) a[2])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIME_STARTED_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ERROR_FIELD); + } + + public static NodeShutdownComponentStatus parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + + public NodeShutdownComponentStatus( + SingleNodeShutdownMetadata.Status status, + @Nullable Long startedAtMillis, + @Nullable String errorMessage) { + this.status = status; + this.startedAtMillis = startedAtMillis; + this.errorMessage = errorMessage; + } + + public NodeShutdownComponentStatus(StreamInput in) throws IOException { + this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class); + this.startedAtMillis = in.readOptionalVLong(); + this.errorMessage = in.readOptionalString(); + } + + /** + * @return The overall status of this component. + */ + public SingleNodeShutdownMetadata.Status getStatus() { + return status; + } + + /** + * @return The timestamp this component started shutting down. Null if the component has not yet started shutting down. + */ + @Nullable public Long getStartedAtMillis() { + return startedAtMillis; + } + + /** + * @return The error message this component encountered while trying to shut down, if any. Null if no errors have been encountered. + */ + @Nullable public String getErrorMessage() { + return errorMessage; + } + + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(STATUS_FIELD.getPreferredName(), status); + if (startedAtMillis != null) { + builder.timeField(TIME_STARTED_FIELD.getPreferredName(), "time_started", startedAtMillis); + } + if (errorMessage != null) { + builder.field(ERROR_FIELD.getPreferredName(), errorMessage); + } + } + builder.endObject(); + return builder; + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeEnum(status); + out.writeOptionalVLong(startedAtMillis); + out.writeOptionalString(errorMessage); + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if ((o instanceof NodeShutdownComponentStatus) == false) + return false; + NodeShutdownComponentStatus that = (NodeShutdownComponentStatus) o; + return getStatus() == that.getStatus() && Objects.equals(getStartedAtMillis(), that.getStartedAtMillis()) && Objects.equals( + getErrorMessage(), + that.getErrorMessage()); + } + + @Override public int hashCode() { + return Objects.hash(getStatus(), getStartedAtMillis(), getErrorMessage()); + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java new file mode 100644 index 0000000000000..81859c0b138e4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java @@ -0,0 +1,173 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.cluster.DiffableUtils; +import org.elasticsearch.cluster.NamedDiff; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Contains the data about nodes which are currently configured to shut down, either permanently or temporarily. + * + * Stored in the cluster state as custom metadata. + */ +public class NodesShutdownMetadata implements Metadata.Custom { + public static final String TYPE = "node_shutdown"; + public static final Version NODE_SHUTDOWN_VERSION = Version.V_8_0_0; + + private static final ParseField NODES_FIELD = new ParseField("nodes"); + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(TYPE, a -> { + final Map nodes = ((List) a[0]).stream() + .collect(Collectors.toMap(SingleNodeShutdownMetadata::getNodeId, Function.identity())); + return new NodesShutdownMetadata(nodes); + }); + + static { + PARSER.declareNamedObjects( + ConstructingObjectParser.constructorArg(), + (p, c, n) -> SingleNodeShutdownMetadata.parse(p), + v -> { throw new IllegalArgumentException("ordered " + NODES_FIELD.getPreferredName() + " are not supported"); }, + NODES_FIELD + ); + } + + public static NodesShutdownMetadata fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return new NodeShutdownMetadataDiff(in); + } + + private final Map nodes; + + public NodesShutdownMetadata(Map nodes) { + this.nodes = nodes; + } + + public NodesShutdownMetadata(StreamInput in) throws IOException { + this.nodes = in.readMap(StreamInput::readString, SingleNodeShutdownMetadata::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(nodes, StreamOutput::writeString, (outStream, v) -> v.writeTo(outStream)); + } + + /** + * Retrieve the data about nodes which are currently in the process of shutting down. + * @return A map of node IDs to information about the node's shutdown status. + */ + public Map getPerNodeInfo() { + return Collections.unmodifiableMap(nodes); + } + + @Override + public Diff diff(Metadata.Custom previousState) { + return new NodeShutdownMetadataDiff((NodesShutdownMetadata) previousState, this); + } + + @Override + public EnumSet context() { + return Metadata.ALL_CONTEXTS; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return NODE_SHUTDOWN_VERSION; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if ((o instanceof NodesShutdownMetadata) == false) return false; + NodesShutdownMetadata that = (NodesShutdownMetadata) o; + return getPerNodeInfo().equals(that.getPerNodeInfo()); + } + + @Override + public int hashCode() { + return Objects.hash(getPerNodeInfo()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(NODES_FIELD.getPreferredName(), nodes); + return builder; + } + + /** + * Handles diffing and appling diffs for {@link NodesShutdownMetadata} as necessary for the cluster state infrastructure. + */ + public static class NodeShutdownMetadataDiff implements NamedDiff { + + private final Diff> nodesDiff; + + NodeShutdownMetadataDiff(NodesShutdownMetadata before, NodesShutdownMetadata after) { + this.nodesDiff = DiffableUtils.diff(before.nodes, after.nodes, DiffableUtils.getStringKeySerializer()); + } + + public NodeShutdownMetadataDiff(StreamInput in) throws IOException { + this.nodesDiff = DiffableUtils.readJdkMapDiff( + in, + DiffableUtils.getStringKeySerializer(), + SingleNodeShutdownMetadata::new, + NodeShutdownMetadataDiff::readNodesDiffFrom + ); + } + + @Override + public Metadata.Custom apply(Metadata.Custom part) { + TreeMap newNodes = new TreeMap<>( + nodesDiff.apply(((NodesShutdownMetadata) part).getPerNodeInfo()) + ); + return new NodesShutdownMetadata(newNodes); + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + nodesDiff.writeTo(out); + } + + static Diff readNodesDiffFrom(StreamInput in) throws IOException { + return AbstractDiffable.readDiffFrom(SingleNodeShutdownMetadata::new, in); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java new file mode 100644 index 0000000000000..8bbe23ae73f2e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SingleNodeShutdownMetadata.java @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.AbstractDiffable; +import org.elasticsearch.cluster.Diffable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +/** + * Contains data about a single node's shutdown readiness. + */ +public class SingleNodeShutdownMetadata extends AbstractDiffable + implements + ToXContentObject, + Diffable { + + private static final ParseField NODE_ID_FIELD = new ParseField("node_id"); + private static final ParseField TYPE_FIELD = new ParseField("type"); + private static final ParseField REASON_FIELD = new ParseField("reason"); + private static final ParseField STATUS_FIELD = new ParseField("shutdown_status"); + private static final String STARTED_AT_READABLE_FIELD = "shutdown_started"; + private static final ParseField STARTED_AT_MILLIS_FIELD = new ParseField(STARTED_AT_READABLE_FIELD + "millis"); + private static final ParseField SHARD_MIGRATION_FIELD = new ParseField("shard_migration"); + private static final ParseField PERSISTENT_TASKS_FIELD = new ParseField("persistent_tasks"); + private static final ParseField PLUGINS_STATUS = new ParseField("plugins"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "node_shutdown_info", + a -> new SingleNodeShutdownMetadata( + (String) a[0], + Type.valueOf((String) a[1]), + (String) a[2], + Status.valueOf((String) a[3]), + (long) a[4], + (NodeShutdownComponentStatus) a[5], + (NodeShutdownComponentStatus) a[6], + (NodeShutdownComponentStatus) a[7] + ) + ); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), NODE_ID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), TYPE_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), REASON_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), STARTED_AT_MILLIS_FIELD); + PARSER.declareObject( + ConstructingObjectParser.constructorArg(), + (parser, context) -> NodeShutdownComponentStatus.parse(parser), + SHARD_MIGRATION_FIELD + ); + PARSER.declareObject( + ConstructingObjectParser.constructorArg(), + (parser, context) -> NodeShutdownComponentStatus.parse(parser), + PERSISTENT_TASKS_FIELD + ); + PARSER.declareObject( + ConstructingObjectParser.constructorArg(), + (parser, context) -> NodeShutdownComponentStatus.parse(parser), + PLUGINS_STATUS + ); + } + + public static SingleNodeShutdownMetadata parse(XContentParser parser) { + return PARSER.apply(parser, null); + } + + private final String nodeId; + private final Type type; + private final String reason; + private final Status status; + private final long startedAtMillis; + private final NodeShutdownComponentStatus shardMigrationStatus; + private final NodeShutdownComponentStatus persistentTasksStatus; + private final NodeShutdownComponentStatus pluginsStatus; + + + public SingleNodeShutdownMetadata( + String nodeId, + Type type, + String reason, + Status status, + long startedAtMillis, + NodeShutdownComponentStatus shardMigrationStatus, + NodeShutdownComponentStatus persistentTasksStatus, NodeShutdownComponentStatus pluginsStatus) { + this.nodeId = Objects.requireNonNull(nodeId, "node ID must not be null"); + this.type = Objects.requireNonNull(type, "shutdown type must not be null"); + this.reason = Objects.requireNonNull(reason, "shutdown reason must not be null"); + this.status = status; + this.startedAtMillis = startedAtMillis; + this.shardMigrationStatus = Objects.requireNonNull(shardMigrationStatus, "shard migration status must not be null"); + this.persistentTasksStatus = Objects.requireNonNull(persistentTasksStatus, "persistent tasks status must not be null"); + this.pluginsStatus = Objects.requireNonNull(pluginsStatus, "plugins status must not be null"); + } + + public SingleNodeShutdownMetadata(StreamInput in) throws IOException { + this.nodeId = in.readString(); + this.type = in.readEnum(Type.class); + this.reason = in.readString(); + this.status = in.readEnum(Status.class); + this.startedAtMillis = in.readVLong(); + this.shardMigrationStatus = new NodeShutdownComponentStatus(in); + this.persistentTasksStatus = new NodeShutdownComponentStatus(in); + this.pluginsStatus = new NodeShutdownComponentStatus(in); + } + + /** + * @return The ID of the node this {@link SingleNodeShutdownMetadata} concerns. + */ + public String getNodeId() { + return nodeId; + } + + /** + * @return The type of shutdown this is (shutdown vs. permanent). + */ + public Type getType() { + return type; + } + + /** + * @return The user-supplied reason this node is shutting down. + */ + public String getReason() { + return reason; + } + + /** + * @return The status of this node's shutdown. + */ + public Status isStatus() { + return status; + } + + /** + * @return The timestamp that this shutdown procedure was started. + */ + public long getStartedAtMillis() { + return startedAtMillis; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(nodeId); + out.writeEnum(type); + out.writeString(reason); + out.writeEnum(status); + out.writeVLong(startedAtMillis); + shardMigrationStatus.writeTo(out); + persistentTasksStatus.writeTo(out); + pluginsStatus.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field(NODE_ID_FIELD.getPreferredName(), nodeId); + builder.field(TYPE_FIELD.getPreferredName(), type); + builder.field(REASON_FIELD.getPreferredName(), reason); + builder.field(STATUS_FIELD.getPreferredName(), status); + builder.timeField(STARTED_AT_MILLIS_FIELD.getPreferredName(), STARTED_AT_READABLE_FIELD, startedAtMillis); + builder.field(SHARD_MIGRATION_FIELD.getPreferredName(), shardMigrationStatus); + builder.field(PERSISTENT_TASKS_FIELD.getPreferredName(), persistentTasksStatus); + builder.field(PLUGINS_STATUS.getPreferredName(), pluginsStatus); + } + builder.endObject(); + + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if ((o instanceof SingleNodeShutdownMetadata) == false) return false; + SingleNodeShutdownMetadata that = (SingleNodeShutdownMetadata) o; + return getStartedAtMillis() == that.getStartedAtMillis() + && getNodeId().equals(that.getNodeId()) + && getType() == that.getType() + && getReason().equals(that.getReason()) + && status == that.status + && shardMigrationStatus.equals(that.shardMigrationStatus) + && persistentTasksStatus.equals(that.persistentTasksStatus) + && pluginsStatus.equals(that.pluginsStatus); + } + + @Override + public int hashCode() { + return Objects.hash( + getNodeId(), + getType(), + getReason(), + status, + getStartedAtMillis(), + shardMigrationStatus, + persistentTasksStatus, + pluginsStatus + ); + } + + public enum Type { + REMOVE, + RESTART + } + + public enum Status { + NOT_STARTED, + IN_PROGRESS, + STALLED, + COMPLETE + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java new file mode 100644 index 0000000000000..e971eda0f672f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadataTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.cluster.Diff; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractDiffableSerializationTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class NodesShutdownMetadataTests extends AbstractDiffableSerializationTestCase { + + @Override + protected Writeable.Reader> diffReader() { + return NodesShutdownMetadata.NodeShutdownMetadataDiff::new; + } + + @Override + protected NodesShutdownMetadata doParseInstance(XContentParser parser) throws IOException { + return NodesShutdownMetadata.fromXContent(parser); + } + + @Override + protected Writeable.Reader instanceReader() { + return NodesShutdownMetadata::new; + } + + @Override + protected NodesShutdownMetadata createTestInstance() { + Map nodes = randomList(0, 10, this::randomNodeShutdownInfo).stream() + .collect(Collectors.toMap(SingleNodeShutdownMetadata::getNodeId, Function.identity())); + return new NodesShutdownMetadata(nodes); + } + + private SingleNodeShutdownMetadata randomNodeShutdownInfo() { + return new SingleNodeShutdownMetadata( + randomAlphaOfLength(5), + randomBoolean() ? SingleNodeShutdownMetadata.Type.REMOVE : SingleNodeShutdownMetadata.Type.RESTART, + randomAlphaOfLength(5), + randomStatus(), + randomNonNegativeLong(), + randomComponentStatus(), + randomComponentStatus(), + randomComponentStatus()); + } + + private SingleNodeShutdownMetadata.Status randomStatus() { + return randomFrom(new ArrayList<>(EnumSet.allOf(SingleNodeShutdownMetadata.Status.class))); + } + + private NodeShutdownComponentStatus randomComponentStatus() { + return new NodeShutdownComponentStatus( + randomStatus(), + randomBoolean() ? null : randomNonNegativeLong(), + randomBoolean() ? null : randomAlphaOfLengthBetween(4, 10) + ); + } + + @Override + protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) { + return randomValueOtherThan(testInstance, this::createTestInstance); + } + + @Override + protected Metadata.Custom mutateInstance(Metadata.Custom instance) throws IOException { + return makeTestChanges(instance); + } +}