Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ public class ShutdownPersistentTasksStatus implements Writeable, ToXContentObjec
private final SingleNodeShutdownMetadata.Status status;

public ShutdownPersistentTasksStatus() {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
}

public ShutdownPersistentTasksStatus(StreamInput in) throws IOException {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
}

@Override
Expand All @@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException {

}

public SingleNodeShutdownMetadata.Status getStatus() {
return status;
}

@Override
public int hashCode() {
return status.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ public class ShutdownPluginsStatus implements Writeable, ToXContentObject {

private final SingleNodeShutdownMetadata.Status status;

public ShutdownPluginsStatus() {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
public ShutdownPluginsStatus(boolean safeToShutdown) {
this.status = safeToShutdown ? SingleNodeShutdownMetadata.Status.COMPLETE :
SingleNodeShutdownMetadata.Status.IN_PROGRESS;
}

public ShutdownPluginsStatus(StreamInput in) throws IOException {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class);
}

public SingleNodeShutdownMetadata.Status getStatus() {
return this.status;
}

@Override
Expand All @@ -39,7 +44,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

@Override
public void writeTo(StreamOutput out) throws IOException {

out.writeEnum(this.status);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ public class ShutdownShardMigrationStatus implements Writeable, ToXContentObject
private final SingleNodeShutdownMetadata.Status status;

public ShutdownShardMigrationStatus() {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
}

public ShutdownShardMigrationStatus(StreamInput in) throws IOException {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
}

@Override
Expand All @@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException {

}

public SingleNodeShutdownMetadata.Status getStatus() {
return status;
}

@Override
public int hashCode() {
return status.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Locale;
import java.util.Objects;

/**
Expand Down Expand Up @@ -234,16 +235,52 @@ public SingleNodeShutdownMetadata build() {
*/
public enum Type {
REMOVE,
RESTART
RESTART;

public static Type parse(String type) {
if ("remove".equals(type.toLowerCase(Locale.ROOT))) {
return REMOVE;
} else if ("restart".equals(type.toLowerCase(Locale.ROOT))) {
return RESTART;
} else {
throw new IllegalArgumentException("unknown shutdown type: " + type);
}
}
}

/**
* Describes the status of a component of shutdown.
*/
public enum Status {
// These are ordered (see #combine(...))
NOT_STARTED,
IN_PROGRESS,
STALLED,
COMPLETE
COMPLETE;

/**
* Merges multiple statuses into a single, final, status
*
* For example, if called with NOT_STARTED, IN_PROGRESS, and STALLED, the returned state is STALLED.
* Called with IN_PROGRESS, IN_PROGRESS, NOT_STARTED, the returned state is IN_PROGRESS.
* Called with IN_PROGRESS, NOT_STARTED, COMPLETE, the returned state is IN_PROGRESS
* Called with COMPLETE, COMPLETE, COMPLETE, the returned state is COMPLETE
* Called with an empty array, the returned state is COMPLETE
*/
public static Status combine(Status... statuses) {
int statusOrd = -1;
for (Status status : statuses) {
// Max the status up to, but not including, "complete"
if (status != COMPLETE) {
statusOrd = Math.max(status.ordinal(), statusOrd);
}
}
if (statusOrd == -1) {
// Either all the statuses were complete, or there were no statuses given
return COMPLETE;
} else {
return Status.values()[statusOrd];
}
Comment on lines +271 to +283
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really want to be a functional programming dork and tell you to use reduce here, but I think this is actually clearer than what you'd have to do to make reduce work.

}
}
}
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.plugins.ShutdownAwarePlugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
Expand All @@ -149,6 +150,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.shutdown.PluginShutdownService;
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -628,6 +630,10 @@ protected Node(final Environment initialEnvironment,
resourcesToClose.add(persistentTasksClusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);

final List<ShutdownAwarePlugin> shutdownAwarePlugins = pluginsService.filterPlugins(ShutdownAwarePlugin.class);
final PluginShutdownService pluginShutdownService = new PluginShutdownService(shutdownAwarePlugins);
clusterService.addListener(pluginShutdownService);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
Expand Down Expand Up @@ -689,6 +695,7 @@ protected Node(final Environment initialEnvironment,
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(PluginShutdownService.class).toInstance(pluginShutdownService);
b.bind(ExecutorSelector.class).toInstance(executorSelector);
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.plugins;

import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;

import java.util.Collection;

/**
* A {@link ShutdownAwarePlugin} is a plugin that can be made aware of a shutdown. It comprises two
* parts, one part used for telling plugins that a set of nodes are going to be shut down
* ({@link #signalShutdown(Collection)}), the other for retrieving the status of those plugins
* as to whether it is safe to shut down ({@link #safeToShutdown(String, SingleNodeShutdownMetadata.Type)}
*/
public interface ShutdownAwarePlugin {

/**
* Whether the plugin is considered safe to shut down. This method is called when the status of
* a shutdown is retrieved via the API, and it is only called on the master node.
*/
boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider adding an extra ClusterState parameter to this method. I think almost every implementation of this will involve looking for something in the cluster state.


/**
* A trigger to notify the plugin that a shutdown for the nodes has been triggered. This method
* will be called on every node for each cluster state, so it should return quickly.
*/
void signalShutdown(Collection<String> shutdownNodeIds);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider adding an extra ClusterState parameter to this method. I think almost every implementation of this will involve looking for something in the cluster state.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.shutdown;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.plugins.ShutdownAwarePlugin;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The {@link PluginShutdownService} is used for the node shutdown infrastructure to signal to
* plugins that a shutdown is occurring, and to check whether it is safe to shut down.
*/
public class PluginShutdownService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(PluginShutdownService.class);
public List<ShutdownAwarePlugin> plugins;

public PluginShutdownService(@Nullable List<ShutdownAwarePlugin> plugins) {
this.plugins = plugins == null ? Collections.emptyList() : plugins;
}

/**
* Return all nodes shutting down from the given cluster state
*/
public static Set<String> shutdownNodes(final ClusterState clusterState) {
return NodesShutdownMetadata.getShutdowns(clusterState)
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
.map(Map::keySet)
.orElse(Collections.emptySet());
}

/**
* Return all nodes shutting down with the given shutdown type from the given cluster state
*/
public static Set<String> shutdownTypeNodes(final ClusterState clusterState, final SingleNodeShutdownMetadata.Type shutdownType) {
return NodesShutdownMetadata.getShutdowns(clusterState)
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
.map(m -> m.entrySet().stream()
.filter(e -> e.getValue().getType() == shutdownType)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.map(Map::keySet)
.orElse(Collections.emptySet());
}

/**
* Check with registered plugins whether the shutdown is safe for the given node id and type
*/
public boolean readyToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's worth having this method take an extra ClusterState parameter, and pass that as an extra argument to each of the plugin.safeToShutdown calls it makes. It will make it easier for plugins that don't currently have their own ClusterService reference to implement the interface.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed adding these today, but since there's no current user, we're going to keep the cluster state out of the interface for now, and revisit it when ML (or a different plugin) has an implementation where they need these

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK cool. I am going to work on the ML PR soon, so can add the arguments to that if you don't have a fundamental objection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so can add the arguments to that if you don't have a fundamental objection.

If possible, I think I'd prefer to keep them out of the interface. Especially for the signalShutdown method, if the cluster state is added it's essentially no different than a regular ClusterStateListener call. I think it's okay to add it to the readyToShutdown because that is a one-off call (not called on every cluster state change).

Would that work for you?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since both methods are implemented by the same class, it doesn't really help to just add the current cluster state to one of them. I can instead add a reference to the ClusterService to the class that implements the interface, and then that can be used in both methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be my preference then, as long as that isn't too distasteful of a solution for you.

// TODO: consider adding debugging information (a message about why not?)
// TODO: consider adding more fine-grained status rather than true/false
for (ShutdownAwarePlugin plugin : plugins) {
try {
if (plugin.safeToShutdown(nodeId, shutdownType) == false) {
logger.trace("shutdown aware plugin [{}] is not yet ready for shutdown", plugin);
return false;
}
} catch (Exception e) {
logger.warn("uncaught exception when retrieving whether plugin is ready for node shutdown", e);
}
}
return true;
}

/**
* Signal to plugins the nodes that are currently shutting down
*/
public void signalShutdown(final ClusterState state) {
Set<String> shutdownNodes = shutdownNodes(state);
for (ShutdownAwarePlugin plugin : plugins) {
try {
plugin.signalShutdown(shutdownNodes);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, it would be nice if state was passed as an extra argument here, so that plugins that don't currently have their own reference to ClusterService can look at the current cluster state.

} catch (Exception e) {
logger.warn(new ParameterizedMessage("uncaught exception when notifying plugins of nodes {} shutdown", shutdownNodes), e);
}
}
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
signalShutdown(event.state());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.test.ESTestCase;

public class SingleNodeShutdownMetadataTests extends ESTestCase {
public void testStatusComination() {
SingleNodeShutdownMetadata.Status status;

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
SingleNodeShutdownMetadata.Status.STALLED);
assertEquals(status, SingleNodeShutdownMetadata.Status.STALLED);

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
SingleNodeShutdownMetadata.Status.NOT_STARTED);
assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS);

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
SingleNodeShutdownMetadata.Status.NOT_STARTED,
SingleNodeShutdownMetadata.Status.NOT_STARTED);
assertEquals(status, SingleNodeShutdownMetadata.Status.NOT_STARTED);

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.IN_PROGRESS,
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
SingleNodeShutdownMetadata.Status.COMPLETE);
assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS);

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.COMPLETE,
SingleNodeShutdownMetadata.Status.COMPLETE,
SingleNodeShutdownMetadata.Status.COMPLETE);
assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE);

status = SingleNodeShutdownMetadata.Status.combine();
assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE);
}
}
Loading