Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ public class ShutdownPersistentTasksStatus implements Writeable, ToXContentObjec
private final SingleNodeShutdownMetadata.Status status;

public ShutdownPersistentTasksStatus() {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
}

public ShutdownPersistentTasksStatus(StreamInput in) throws IOException {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
}

@Override
Expand All @@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException {

}

public SingleNodeShutdownMetadata.Status getStatus() {
return status;
}

@Override
public int hashCode() {
return status.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ public class ShutdownPluginsStatus implements Writeable, ToXContentObject {

private final SingleNodeShutdownMetadata.Status status;

public ShutdownPluginsStatus() {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
public ShutdownPluginsStatus(boolean safeToShutdown) {
this.status = safeToShutdown ? SingleNodeShutdownMetadata.Status.COMPLETE :
SingleNodeShutdownMetadata.Status.IN_PROGRESS;
}

public ShutdownPluginsStatus(StreamInput in) throws IOException {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = in.readEnum(SingleNodeShutdownMetadata.Status.class);
}

public SingleNodeShutdownMetadata.Status getStatus() {
return this.status;
}

@Override
Expand All @@ -39,7 +44,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

@Override
public void writeTo(StreamOutput out) throws IOException {

out.writeEnum(this.status);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ public class ShutdownShardMigrationStatus implements Writeable, ToXContentObject
private final SingleNodeShutdownMetadata.Status status;

public ShutdownShardMigrationStatus() {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
}

public ShutdownShardMigrationStatus(StreamInput in) throws IOException {
this.status = SingleNodeShutdownMetadata.Status.IN_PROGRESS;
this.status = SingleNodeShutdownMetadata.Status.COMPLETE;
}

@Override
Expand All @@ -42,6 +42,10 @@ public void writeTo(StreamOutput out) throws IOException {

}

public SingleNodeShutdownMetadata.Status getStatus() {
return status;
}

@Override
public int hashCode() {
return status.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Locale;
import java.util.Objects;

/**
Expand Down Expand Up @@ -234,16 +235,52 @@ public SingleNodeShutdownMetadata build() {
*/
public enum Type {
REMOVE,
RESTART
RESTART;

public static Type parse(String type) {
if ("remove".equals(type.toLowerCase(Locale.ROOT))) {
return REMOVE;
} else if ("restart".equals(type.toLowerCase(Locale.ROOT))) {
return RESTART;
} else {
throw new IllegalArgumentException("unknown shutdown type: " + type);
}
}
}

/**
* Describes the status of a component of shutdown.
*/
public enum Status {
// These are ordered (see #combine(...))
NOT_STARTED,
IN_PROGRESS,
STALLED,
COMPLETE
COMPLETE;

/**
* Merges multiple statuses into a single, final, status
*
* For example, if called with NOT_STARTED, IN_PROGRESS, and STALLED, the returned state is STALLED.
* Called with IN_PROGRESS, IN_PROGRESS, NOT_STARTED, the returned state is IN_PROGRESS.
* Called with IN_PROGRESS, NOT_STARTED, COMPLETE, the returned state is IN_PROGRESS
* Called with COMPLETE, COMPLETE, COMPLETE, the returned state is COMPLETE
* Called with an empty array, the returned state is COMPLETE
*/
public static Status combine(Status... statuses) {
int statusOrd = -1;
for (Status status : statuses) {
// Max the status up to, but not including, "complete"
if (status != COMPLETE) {
statusOrd = Math.max(status.ordinal(), statusOrd);
}
}
if (statusOrd == -1) {
// Either all the statuses were complete, or there were no statuses given
return COMPLETE;
} else {
return Status.values()[statusOrd];
}
}
}
}
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.plugins.ShutdownAwarePlugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.repositories.RepositoriesService;
Expand All @@ -152,6 +153,7 @@
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.support.AggregationUsageService;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.shutdown.PluginShutdownService;
import org.elasticsearch.snapshots.InternalSnapshotsInfoService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotShardsService;
Expand Down Expand Up @@ -697,6 +699,10 @@ protected Node(final Environment initialEnvironment,
resourcesToClose.add(persistentTasksClusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);

final List<ShutdownAwarePlugin> shutdownAwarePlugins = pluginsService.filterPlugins(ShutdownAwarePlugin.class);
final PluginShutdownService pluginShutdownService = new PluginShutdownService(shutdownAwarePlugins);
clusterService.addListener(pluginShutdownService);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
Expand Down Expand Up @@ -759,6 +765,7 @@ protected Node(final Environment initialEnvironment,
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(PluginShutdownService.class).toInstance(pluginShutdownService);
b.bind(ExecutorSelector.class).toInstance(executorSelector);
}
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.plugins;

import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;

import java.util.Collection;

/**
* A {@link ShutdownAwarePlugin} is a plugin that can be made aware of a shutdown. It comprises two
* parts, one part used for telling plugins that a set of nodes are going to be shut down
* ({@link #signalShutdown(Collection)}), the other for retrieving the status of those plugins
* as to whether it is safe to shut down ({@link #safeToShutdown(String, SingleNodeShutdownMetadata.Type)}
*/
public interface ShutdownAwarePlugin {

/**
* Whether the plugin is considered safe to shut down. This method is called when the status of
* a shutdown is retrieved via the API, and it is only called on the master node.
*/
boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType);

/**
* A trigger to notify the plugin that a shutdown for the nodes has been triggered. This method
* will be called on every node for each cluster state, so it should return quickly.
*/
void signalShutdown(Collection<String> shutdownNodeIds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.shutdown;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.plugins.ShutdownAwarePlugin;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The {@link PluginShutdownService} is used for the node shutdown infrastructure to signal to
* plugins that a shutdown is occurring, and to check whether it is safe to shut down.
*/
public class PluginShutdownService implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(PluginShutdownService.class);
public List<ShutdownAwarePlugin> plugins;

public PluginShutdownService(@Nullable List<ShutdownAwarePlugin> plugins) {
this.plugins = plugins == null ? Collections.emptyList() : plugins;
}

/**
* Return all nodes shutting down from the given cluster state
*/
public static Set<String> shutdownNodes(final ClusterState clusterState) {
return NodesShutdownMetadata.getShutdowns(clusterState)
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
.map(Map::keySet)
.orElse(Collections.emptySet());
}

/**
* Return all nodes shutting down with the given shutdown type from the given cluster state
*/
public static Set<String> shutdownTypeNodes(final ClusterState clusterState, final SingleNodeShutdownMetadata.Type shutdownType) {
return NodesShutdownMetadata.getShutdowns(clusterState)
.map(NodesShutdownMetadata::getAllNodeMetadataMap)
.map(m -> m.entrySet().stream()
.filter(e -> e.getValue().getType() == shutdownType)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.map(Map::keySet)
.orElse(Collections.emptySet());
}

/**
* Check with registered plugins whether the shutdown is safe for the given node id and type
*/
public boolean readyToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shutdownType) {
// TODO: consider adding debugging information (a message about why not?)
// TODO: consider adding more fine-grained status rather than true/false
for (ShutdownAwarePlugin plugin : plugins) {
try {
if (plugin.safeToShutdown(nodeId, shutdownType) == false) {
logger.trace("shutdown aware plugin [{}] is not yet ready for shutdown", plugin);
return false;
}
} catch (Exception e) {
logger.warn("uncaught exception when retrieving whether plugin is ready for node shutdown", e);
}
}
return true;
}

/**
* Signal to plugins the nodes that are currently shutting down
*/
public void signalShutdown(final ClusterState state) {
Set<String> shutdownNodes = shutdownNodes(state);
for (ShutdownAwarePlugin plugin : plugins) {
try {
plugin.signalShutdown(shutdownNodes);
} catch (Exception e) {
logger.warn(new ParameterizedMessage("uncaught exception when notifying plugins of nodes {} shutdown", shutdownNodes), e);
}
}
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
signalShutdown(event.state());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.test.ESTestCase;

public class SingleNodeShutdownMetadataTests extends ESTestCase {
public void testStatusComination() {
SingleNodeShutdownMetadata.Status status;

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
SingleNodeShutdownMetadata.Status.STALLED);
assertEquals(status, SingleNodeShutdownMetadata.Status.STALLED);

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
SingleNodeShutdownMetadata.Status.NOT_STARTED);
assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS);

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.NOT_STARTED,
SingleNodeShutdownMetadata.Status.NOT_STARTED,
SingleNodeShutdownMetadata.Status.NOT_STARTED);
assertEquals(status, SingleNodeShutdownMetadata.Status.NOT_STARTED);

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.IN_PROGRESS,
SingleNodeShutdownMetadata.Status.IN_PROGRESS,
SingleNodeShutdownMetadata.Status.COMPLETE);
assertEquals(status, SingleNodeShutdownMetadata.Status.IN_PROGRESS);

status = SingleNodeShutdownMetadata.Status.combine(SingleNodeShutdownMetadata.Status.COMPLETE,
SingleNodeShutdownMetadata.Status.COMPLETE,
SingleNodeShutdownMetadata.Status.COMPLETE);
assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE);

status = SingleNodeShutdownMetadata.Status.combine();
assertEquals(status, SingleNodeShutdownMetadata.Status.COMPLETE);
}
}
Loading