Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.cluster.service;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;

import java.util.function.Supplier;

Expand All @@ -38,11 +37,29 @@ public interface ClusterApplier {
* @param clusterStateSupplier the cluster state supplier which provides the latest cluster state to apply
* @param listener callback that is invoked after cluster state is applied
*/
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener);
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
ClusterState.Builder newClusterStateBuilder();

/**
* Listener for results of cluster state application
*/
interface ClusterApplyListener {
/**
* Called on successful cluster state application
* @param source information where the cluster state came from
*/
default void onSuccess(String source) {
}

/**
* Called on failure during cluster state application
* @param source information where the cluster state came from
* @param e exception that occurred
*/
void onFailure(String source, Exception e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.TimeoutClusterStateListener;
Expand Down Expand Up @@ -141,10 +140,10 @@ protected synchronized void doStart() {
}

class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {
final ClusterStateTaskListener listener;
final ClusterApplyListener listener;
final Function<ClusterState, ClusterState> updateFunction;

UpdateTask(Priority priority, String source, ClusterStateTaskListener listener,
UpdateTask(Priority priority, String source, ClusterApplyListener listener,
Function<ClusterState, ClusterState> updateFunction) {
super(priority, source);
this.listener = listener;
Expand Down Expand Up @@ -301,7 +300,7 @@ public void run() {
}

public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
final ClusterStateTaskListener listener, Priority priority) {
final ClusterApplyListener listener, Priority priority) {
submitStateUpdateTask(source, ClusterStateTaskConfig.build(priority),
(clusterState) -> {
clusterStateConsumer.accept(clusterState);
Expand All @@ -311,13 +310,13 @@ public void runOnApplierThread(final String source, Consumer<ClusterState> clust
}

public void runOnApplierThread(final String source, Consumer<ClusterState> clusterStateConsumer,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
runOnApplierThread(source, clusterStateConsumer, listener, Priority.HIGH);
}

@Override
public void onNewClusterState(final String source, final Supplier<ClusterState> clusterStateSupplier,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
Function<ClusterState, ClusterState> applyFunction = currentState -> {
ClusterState nextState = clusterStateSupplier.get();
if (nextState != null) {
Expand All @@ -331,12 +330,12 @@ public void onNewClusterState(final String source, final Supplier<ClusterState>

private void submitStateUpdateTask(final String source, final ClusterStateTaskConfig config,
final Function<ClusterState, ClusterState> executor,
final ClusterStateTaskListener listener) {
final ClusterApplyListener listener) {
if (!lifecycle.started()) {
return;
}
try {
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterStateTaskListener(listener, logger), executor);
UpdateTask updateTask = new UpdateTask(config.priority(), source, new SafeClusterApplyListener(listener, logger), executor);
if (config.timeout() != null) {
threadPoolExecutor.execute(updateTask, config.timeout(),
() -> threadPool.generic().execute(
Expand Down Expand Up @@ -417,7 +416,7 @@ protected void runTask(UpdateTask task) {
}

if (previousClusterState == newClusterState) {
task.listener.clusterStateProcessed(task.source, newClusterState, newClusterState);
task.listener.onSuccess(task.source);
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
warnAboutSlowTaskIfNeeded(executionTime, task.source);
Expand Down Expand Up @@ -486,7 +485,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl

callClusterStateListeners(clusterChangedEvent);

task.listener.clusterStateProcessed(task.source, previousClusterState, newClusterState);
task.listener.onSuccess(task.source);
}

private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent) {
Expand All @@ -511,11 +510,11 @@ private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent)
});
}

private static class SafeClusterStateTaskListener implements ClusterStateTaskListener {
private final ClusterStateTaskListener listener;
private static class SafeClusterApplyListener implements ClusterApplyListener {
private final ClusterApplyListener listener;
private final Logger logger;

SafeClusterStateTaskListener(ClusterStateTaskListener listener, Logger logger) {
SafeClusterApplyListener(ClusterApplyListener listener, Logger logger) {
this.listener = listener;
this.logger = logger;
}
Expand All @@ -532,14 +531,12 @@ public void onFailure(String source, Exception e) {
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
try {
listener.clusterStateProcessed(source, oldState, newState);
listener.onSuccess(source);
} catch (Exception e) {
logger.error(new ParameterizedMessage(
"exception thrown by listener while notifying of cluster state processed from [{}], old cluster state:\n" +
"{}\nnew cluster state:\n{}",
source, oldState, newState), e);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only place where oldState and newState are used is in this log message.

"exception thrown by listener while notifying of cluster state processed from [{}]", source), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PendingClusterStateStats;
import org.elasticsearch.discovery.zen.PublishClusterStateStats;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -65,9 +63,9 @@ public synchronized void publish(final ClusterChangedEvent event,
clusterState = event.state();
CountDownLatch latch = new CountDownLatch(1);

ClusterStateTaskListener listener = new ClusterStateTaskListener() {
ClusterApplyListener listener = new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
ackListener.onNodeAck(transportService.getLocalNode(), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
Expand All @@ -34,12 +33,11 @@
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -789,9 +787,9 @@ boolean processNextCommittedClusterState(String reason) {

clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",
this::clusterState,
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
try {
pendingStatesQueue.markAsProcessed(newClusterState);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -135,9 +136,9 @@ public void testClusterStateUpdateLogging() throws Exception {
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand All @@ -151,9 +152,9 @@ public void onFailure(String source, Exception e) {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(2).nanos();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
fail();
}

Expand All @@ -166,9 +167,9 @@ public void onFailure(String source, Exception e) {
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterApplierService.runOnApplierThread("test3",
currentState -> {},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand Down Expand Up @@ -216,9 +217,9 @@ public void testLongClusterStateUpdateLogging() throws Exception {
clusterApplierService.currentTimeOverride = System.nanoTime();
clusterApplierService.runOnApplierThread("test1",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(1).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
processedFirstTask.countDown();
}
Expand All @@ -234,9 +235,9 @@ public void onFailure(String source, Exception e) {
clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(32).nanos();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
fail();
}

Expand All @@ -247,9 +248,9 @@ public void onFailure(String source, Exception e) {
});
clusterApplierService.runOnApplierThread("test3",
currentState -> clusterApplierService.currentTimeOverride += TimeValue.timeValueSeconds(34).nanos(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand All @@ -262,9 +263,9 @@ public void onFailure(String source, Exception e) {
// We don't check logging for this on since there is no guarantee that it will occur before our check
clusterApplierService.runOnApplierThread("test4",
currentState -> {},
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand Down Expand Up @@ -340,10 +341,10 @@ public void testClusterStateApplierCantSampleClusterState() throws InterruptedEx

CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand Down Expand Up @@ -390,9 +391,9 @@ public void onTimeout(TimeValue timeout) {

CountDownLatch latch = new CountDownLatch(1);
clusterApplierService.onNewClusterState("test", () -> ClusterState.builder(clusterApplierService.state()).build(),
new ClusterStateTaskListener() {
new ClusterApplyListener() {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
public void onSuccess(String source) {
latch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplier;
Expand Down Expand Up @@ -72,9 +71,9 @@ public ClusterState.Builder newClusterStateBuilder() {

@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier,
ClusterStateTaskListener listener) {
ClusterApplyListener listener) {
clusterState.set(clusterStateSupplier.get());
listener.clusterStateProcessed(source, clusterState.get(), clusterState.get());
listener.onSuccess(source);
}
});
discovery.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -314,8 +313,8 @@ public ClusterState.Builder newClusterStateBuilder() {
}

@Override
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener) {
listener.clusterStateProcessed(source, clusterStateSupplier.get(), clusterStateSupplier.get());
public void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener) {
listener.onSuccess(source);
}
};
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
Expand Down
Loading