Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected ClusterHealthResponse newResponse() {
protected void masterOperation(final ClusterHealthRequest request, final ClusterState unusedState, final ActionListener<ClusterHealthResponse> listener) {
if (request.waitForEvents() != null) {
final long endTimeMS = TimeValue.nsecToMSec(System.nanoTime()) + request.timeout().millis();
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", request.waitForEvents(), new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_health (wait_for_events [" + request.waitForEvents() + "])", new ClusterStateUpdateTask(request.waitForEvents()) {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected ClusterRerouteResponse newResponse() {

@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask<ClusterRerouteResponse>(request, listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)", new AckedClusterStateUpdateTask<ClusterRerouteResponse>(Priority.IMMEDIATE, request, listener) {

private volatile ClusterState clusterStateToSend;
private volatile RoutingExplanations explanations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ protected void masterOperation(final ClusterUpdateSettingsRequest request, final
final Settings.Builder transientUpdates = Settings.settingsBuilder();
final Settings.Builder persistentUpdates = Settings.settingsBuilder();

clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(request, listener) {
clusterService.submitStateUpdateTask("cluster_update_settings",
new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(Priority.IMMEDIATE, request, listener) {

private volatile boolean changed = false;

Expand Down Expand Up @@ -132,7 +133,8 @@ private void reroute(final boolean updateSettingsAcked) {
// in the components (e.g. FilterAllocationDecider), so the changes made by the first call aren't visible
// to the components until the ClusterStateListener instances have been invoked, but are visible after
// the first update task has been completed.
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(request, listener) {
clusterService.submitStateUpdateTask("reroute_after_cluster_update_settings",
new AckedClusterStateUpdateTask<ClusterUpdateSettingsResponse>(Priority.URGENT, request, listener) {

@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;

public interface AckedClusterStateTaskListener extends ClusterStateTaskListener {

/**
* Called to determine which nodes the acknowledgement is expected from
*
* @param discoveryNode a node
* @return true if the node is expected to send ack back, false otherwise
*/
boolean mustAck(DiscoveryNode discoveryNode);

/**
* Called once all the nodes have acknowledged the cluster state update request. Must be
* very lightweight execution, since it gets executed on the cluster service thread.
*
* @param t optional error that might have been thrown
*/
void onAllNodesAcked(@Nullable Throwable t);

/**
* Called once the acknowledgement timeout defined by
* {@link AckedClusterStateUpdateTask#ackTimeout()} has expired
*/
void onAckTimeout();

/**
* Acknowledgement timeout, maximum time interval to wait for acknowledgements
*/
TimeValue ackTimeout();

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,24 @@
import org.elasticsearch.cluster.ack.AckedRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;

/**
* An extension interface to {@link ClusterStateUpdateTask} that allows to be notified when
* all the nodes have acknowledged a cluster state update request
*/
public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask {
public abstract class AckedClusterStateUpdateTask<Response> extends ClusterStateUpdateTask implements AckedClusterStateTaskListener {

private final ActionListener<Response> listener;
private final AckedRequest request;

protected AckedClusterStateUpdateTask(AckedRequest request, ActionListener<Response> listener) {
this(Priority.NORMAL, request, listener);
}

protected AckedClusterStateUpdateTask(Priority priority, AckedRequest request, ActionListener<Response> listener) {
super(priority);
this.listener = listener;
this.request = request;
}
Expand Down
36 changes: 29 additions & 7 deletions core/src/main/java/org/elasticsearch/cluster/ClusterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.unit.TimeValue;

Expand Down Expand Up @@ -101,12 +100,35 @@ public interface ClusterService extends LifecycleComponent<ClusterService> {
void add(@Nullable TimeValue timeout, TimeoutClusterStateListener listener);

/**
* Submits a task that will update the cluster state.
*/
void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask);

/**
* Submits a task that will update the cluster state (the task has a default priority of {@link Priority#NORMAL}).
* Submits a cluster state update task; submitted updates will be
* batched across the same instance of executor. The exact batching
* semantics depend on the underlying implementation but a rough
* guideline is that if the update task is submitted while there
* are pending update tasks for the same executor, these update
* tasks will all be executed on the executor in a single batch
*
* @param source the source of the cluster state update task
* @param task the state needed for the cluster state update task
* @param config the cluster state update task configuration
* @param executor the cluster state update task executor; tasks
* that share the same executor will be executed
* batches on this executor
* @param listener callback after the cluster state update task
* completes
* @param <T> the type of the cluster state update task state
*/
<T> void submitStateUpdateTask(final String source, final T task,
final ClusterStateTaskConfig config,
final ClusterStateTaskExecutor<T> executor,
final ClusterStateTaskListener listener);

/**
* Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig, ClusterStateTaskExecutor, ClusterStateTaskListener)},
* submitted updates will not be batched.
*
* @param source the source of the cluster state update task
* @param updateTask the full context for the cluster state update
* task
*/
void submitStateUpdateTask(final String source, final ClusterStateUpdateTask updateTask);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;

/**
* Cluster state update task configuration for timeout and priority
*/
public interface ClusterStateTaskConfig {
/**
* The timeout for this cluster state update task configuration. If
* the cluster state update task isn't processed within this
* timeout, the associated {@link ClusterStateTaskListener#onFailure(String, Throwable)}
* is invoked.
*
* @return the timeout, or null if one is not set
*/
@Nullable
TimeValue timeout();

/**
* The {@link Priority} for this cluster state update task configuration.
*
* @return the priority
*/
Priority priority();

/**
* Build a cluster state update task configuration with the
* specified {@link Priority} and no timeout.
*
* @param priority the priority for the associated cluster state
* update task
* @return the resulting cluster state update task configuration
*/
static ClusterStateTaskConfig build(Priority priority) {
return new Basic(priority, null);
}

/**
* Build a cluster state update task configuration with the
* specified {@link Priority} and timeout.
*
* @param priority the priority for the associated cluster state
* update task
* @param timeout the timeout for the associated cluster state
* update task
* @return the result cluster state update task configuration
*/
static ClusterStateTaskConfig build(Priority priority, TimeValue timeout) {
return new Basic(priority, timeout);
}

class Basic implements ClusterStateTaskConfig {
final TimeValue timeout;
final Priority priority;

public Basic(Priority priority, TimeValue timeout) {
this.timeout = timeout;
this.priority = priority;
}

@Override
public TimeValue timeout() {
return timeout;
}

@Override
public Priority priority() {
return priority;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public interface ClusterStateTaskExecutor<T> {
/**
* Update the cluster state based on the current state and the given tasks. Return the *same instance* if no state
* should be changed.
*/
BatchResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;

/**
* indicates whether this task should only run if current node is master
*/
default boolean runOnlyOnMaster() {
return true;
}

/**
* Represents the result of a batched execution of cluster state update tasks
* @param <T> the type of the cluster state update task
*/
class BatchResult<T> {
final public ClusterState resultingState;
final public Map<T, TaskResult> executionResults;

/**
* Construct an execution result instance with a correspondence between the tasks and their execution result
* @param resultingState the resulting cluster state
* @param executionResults the correspondence between tasks and their outcome
*/
BatchResult(ClusterState resultingState, Map<T, TaskResult> executionResults) {
this.resultingState = resultingState;
this.executionResults = executionResults;
}

public static <T> Builder<T> builder() {
return new Builder<>();
}

public static class Builder<T> {
private final Map<T, TaskResult> executionResults = new IdentityHashMap<>();

public Builder<T> success(T task) {
return result(task, TaskResult.success());
}

public Builder<T> successes(Iterable<T> tasks) {
for (T task : tasks) {
success(task);
}
return this;
}

public Builder<T> failure(T task, Throwable t) {
return result(task, TaskResult.failure(t));
}

public Builder<T> failures(Iterable<T> tasks, Throwable t) {
for (T task : tasks) {
failure(task, t);
}
return this;
}

private Builder<T> result(T task, TaskResult executionResult) {
executionResults.put(task, executionResult);
return this;
}

public BatchResult<T> build(ClusterState resultingState) {
return new BatchResult<>(resultingState, executionResults);
}
}
}

final class TaskResult {
private final Throwable failure;

private static final TaskResult SUCCESS = new TaskResult(null);

public static TaskResult success() {
return SUCCESS;
}

public static TaskResult failure(Throwable failure) {
return new TaskResult(failure);
}

private TaskResult(Throwable failure) {
this.failure = failure;
}

public boolean isSuccess() {
return failure != null;
}

/**
* Handle the execution result with the provided consumers
* @param onSuccess handler to invoke on success
* @param onFailure handler to invoke on failure; the throwable passed through will not be null
*/
public void handle(Runnable onSuccess, Consumer<Throwable> onFailure) {
if (failure == null) {
onSuccess.run();
} else {
onFailure.accept(failure);
}
}
}
}
Loading