Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetadataMappingService;
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -132,6 +133,7 @@ public static List<Entry> getNamedWriteables() {
registerMetadataCustom(entries, ComposableIndexTemplateMetadata.TYPE, ComposableIndexTemplateMetadata::new,
ComposableIndexTemplateMetadata::readDiffFrom);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
registerMetadataCustom(entries, NodesShutdownMetadata.TYPE, NodesShutdownMetadata::new, NodesShutdownMetadata::readDiffFrom);

// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
Expand All @@ -157,6 +159,8 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
ComposableIndexTemplateMetadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(DataStreamMetadata.TYPE),
DataStreamMetadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(NodesShutdownMetadata.TYPE),
NodesShutdownMetadata::fromXContent));
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

/**
* Contains information about the status of a single component (e.g. `shard_migration`, `persistent_tasks`) of the node shutdown process.
*/
public class NodeShutdownComponentStatus extends AbstractDiffable<NodeShutdownComponentStatus> implements ToXContentFragment {
private final SingleNodeShutdownMetadata.Status status;
@Nullable private final Long startedAtMillis;
@Nullable private final String errorMessage;

private static final ParseField STATUS_FIELD = new ParseField("status");
private static final ParseField TIME_STARTED_FIELD = new ParseField("time_started_millis");
private static final ParseField ERROR_FIELD = new ParseField("error");
private static final ConstructingObjectParser<NodeShutdownComponentStatus, Void> PARSER = new ConstructingObjectParser<>(
"node_shutdown_component",
a -> new NodeShutdownComponentStatus(SingleNodeShutdownMetadata.Status.valueOf((String) a[0]), (Long) a[1], (String) a[2]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), TIME_STARTED_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ERROR_FIELD);
}

public static NodeShutdownComponentStatus parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

public NodeShutdownComponentStatus(
SingleNodeShutdownMetadata.Status status,
@Nullable Long startedAtMillis,
@Nullable String errorMessage) {
this.status = status;
this.startedAtMillis = startedAtMillis;
this.errorMessage = errorMessage;
}

public NodeShutdownComponentStatus(StreamInput in) throws IOException {
this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class);
this.startedAtMillis = in.readOptionalVLong();
this.errorMessage = in.readOptionalString();
}

/**
* @return The overall status of this component.
*/
public SingleNodeShutdownMetadata.Status getStatus() {
return status;
}

/**
* @return The timestamp this component started shutting down. Null if the component has not yet started shutting down.
*/
@Nullable public Long getStartedAtMillis() {
return startedAtMillis;
}

/**
* @return The error message this component encountered while trying to shut down, if any. Null if no errors have been encountered.
*/
@Nullable public String getErrorMessage() {
return errorMessage;
}

@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(STATUS_FIELD.getPreferredName(), status);
if (startedAtMillis != null) {
builder.timeField(TIME_STARTED_FIELD.getPreferredName(), "time_started", startedAtMillis);
}
if (errorMessage != null) {
builder.field(ERROR_FIELD.getPreferredName(), errorMessage);
}
}
builder.endObject();
return builder;
}

@Override public void writeTo(StreamOutput out) throws IOException {
out.writeEnum(status);
out.writeOptionalVLong(startedAtMillis);
out.writeOptionalString(errorMessage);
}

@Override public boolean equals(Object o) {
if (this == o)
return true;
if ((o instanceof NodeShutdownComponentStatus) == false)
return false;
NodeShutdownComponentStatus that = (NodeShutdownComponentStatus) o;
return getStatus() == that.getStatus() && Objects.equals(getStartedAtMillis(), that.getStartedAtMillis()) && Objects.equals(
getErrorMessage(),
that.getErrorMessage());
}

@Override public int hashCode() {
return Objects.hash(getStatus(), getStartedAtMillis(), getErrorMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Contains the data about nodes which are currently configured to shut down, either permanently or temporarily.
*
* Stored in the cluster state as custom metadata.
*/
public class NodesShutdownMetadata implements Metadata.Custom {
public static final String TYPE = "node_shutdown";
public static final Version NODE_SHUTDOWN_VERSION = Version.V_8_0_0;

private static final ParseField NODES_FIELD = new ParseField("nodes");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<NodesShutdownMetadata, Void> PARSER = new ConstructingObjectParser<>(TYPE, a -> {
final Map<String, SingleNodeShutdownMetadata> nodes = ((List<SingleNodeShutdownMetadata>) a[0]).stream()
.collect(Collectors.toMap(SingleNodeShutdownMetadata::getNodeId, Function.identity()));
return new NodesShutdownMetadata(nodes);
});

static {
PARSER.declareNamedObjects(
ConstructingObjectParser.constructorArg(),
(p, c, n) -> SingleNodeShutdownMetadata.parse(p),
v -> { throw new IllegalArgumentException("ordered " + NODES_FIELD.getPreferredName() + " are not supported"); },
NODES_FIELD
);
}

public static NodesShutdownMetadata fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

public static NamedDiff<Metadata.Custom> readDiffFrom(StreamInput in) throws IOException {
return new NodeShutdownMetadataDiff(in);
}

private final Map<String, SingleNodeShutdownMetadata> nodes;

public NodesShutdownMetadata(Map<String, SingleNodeShutdownMetadata> nodes) {
this.nodes = nodes;
}

public NodesShutdownMetadata(StreamInput in) throws IOException {
this.nodes = in.readMap(StreamInput::readString, SingleNodeShutdownMetadata::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(nodes, StreamOutput::writeString, (outStream, v) -> v.writeTo(outStream));
}

/**
* Retrieve the data about nodes which are currently in the process of shutting down.
* @return A map of node IDs to information about the node's shutdown status.
*/
public Map<String, SingleNodeShutdownMetadata> getPerNodeInfo() {
return Collections.unmodifiableMap(nodes);
}

@Override
public Diff<Metadata.Custom> diff(Metadata.Custom previousState) {
return new NodeShutdownMetadataDiff((NodesShutdownMetadata) previousState, this);
}

@Override
public EnumSet<Metadata.XContentContext> context() {
return Metadata.ALL_CONTEXTS;
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public Version getMinimalSupportedVersion() {
return NODE_SHUTDOWN_VERSION;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if ((o instanceof NodesShutdownMetadata) == false) return false;
NodesShutdownMetadata that = (NodesShutdownMetadata) o;
return getPerNodeInfo().equals(that.getPerNodeInfo());
}

@Override
public int hashCode() {
return Objects.hash(getPerNodeInfo());
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(NODES_FIELD.getPreferredName(), nodes);
return builder;
}

/**
* Handles diffing and appling diffs for {@link NodesShutdownMetadata} as necessary for the cluster state infrastructure.
*/
public static class NodeShutdownMetadataDiff implements NamedDiff<Metadata.Custom> {

private final Diff<Map<String, SingleNodeShutdownMetadata>> nodesDiff;

NodeShutdownMetadataDiff(NodesShutdownMetadata before, NodesShutdownMetadata after) {
this.nodesDiff = DiffableUtils.diff(before.nodes, after.nodes, DiffableUtils.getStringKeySerializer());
}

public NodeShutdownMetadataDiff(StreamInput in) throws IOException {
this.nodesDiff = DiffableUtils.readJdkMapDiff(
in,
DiffableUtils.getStringKeySerializer(),
SingleNodeShutdownMetadata::new,
NodeShutdownMetadataDiff::readNodesDiffFrom
);
}

@Override
public Metadata.Custom apply(Metadata.Custom part) {
TreeMap<String, SingleNodeShutdownMetadata> newNodes = new TreeMap<>(
nodesDiff.apply(((NodesShutdownMetadata) part).getPerNodeInfo())
);
return new NodesShutdownMetadata(newNodes);
}

@Override
public String getWriteableName() {
return TYPE;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
nodesDiff.writeTo(out);
}

static Diff<SingleNodeShutdownMetadata> readNodesDiffFrom(StreamInput in) throws IOException {
return AbstractDiffable.readDiffFrom(SingleNodeShutdownMetadata::new, in);
}
}

}
Loading