Skip to content

Commit c8ef2e1

Browse files
authored
Thread safe clean up of LocalNodeModeListeners (#60007)
This commit continues on the work in #59801 and makes other implementors of the LocalNodeMasterListener interface thread safe in that they will no longer allow the callbacks to run on different threads and possibly race each other. This also helps address other issues where these events could be queued to wait for execution while the service keeps moving forward thinking it is the master even when that is not the case. In order to accomplish this, the LocalNodeMasterListener no longer has the executorName() method to prevent future uses that could encounter this surprising behavior. Each use was inspected and if the class was also a ClusterStateListener, the implementation of LocalNodeMasterListener was removed in favor of a single listener that combined the logic. A single listener is used and there is currently no guarantee on execution order between ClusterStateListeners and LocalNodeMasterListeners, so a future change there could cause undesired consequences. For other classes, the implementations of the callbacks were inspected and if the operations were lightweight, the overriden executorName method was removed to use the default, which runs on the same thread. Backport of #59932
1 parent 702c997 commit c8ef2e1

File tree

15 files changed

+156
-126
lines changed

15 files changed

+156
-126
lines changed

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,6 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
111111
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
112112
clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
113113
this::setEnabled);
114-
115-
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
116-
clusterService.addListener(this);
117114
}
118115

119116
private void setEnabled(boolean enabled) {

server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,5 @@ public interface LocalNodeMasterListener {
3333
* Called when the local node used to be the master, a new master was elected and it's no longer the local node.
3434
*/
3535
void offMaster();
36-
37-
/**
38-
* The name of the executor that the implementation of the callbacks of this lister should be executed on. The thread
39-
* that is responsible for managing instances of this lister is the same thread handling the cluster state events. If
40-
* the work done is the callbacks above is inexpensive, this value may be
41-
* {@link org.elasticsearch.threadpool.ThreadPool.Names#SAME SAME} (indicating that the callbacks will run on the same thread
42-
* as the cluster state events are fired with). On the other hand, if the logic in the callbacks are heavier and take
43-
* longer to process (or perhaps involve blocking due to IO operations), prefer to execute them on a separate more appropriate
44-
* executor (eg. {@link org.elasticsearch.threadpool.ThreadPool.Names#GENERIC GENERIC}
45-
* or {@link org.elasticsearch.threadpool.ThreadPool.Names#MANAGEMENT MANAGEMENT}).
46-
*
47-
* @return The name of the executor that will run the callbacks of this listener.
48-
*/
49-
String executorName();
50-
5136
}
5237

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
114114
this.clusterSettings = clusterSettings;
115115
this.threadPool = threadPool;
116116
this.state = new AtomicReference<>();
117-
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
117+
this.localNodeMasterListeners = new LocalNodeMasterListeners();
118118
this.nodeName = nodeName;
119119

120120
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
@@ -611,29 +611,30 @@ public void run() {
611611
private static class LocalNodeMasterListeners implements ClusterStateListener {
612612

613613
private final List<LocalNodeMasterListener> listeners = new CopyOnWriteArrayList<>();
614-
private final ThreadPool threadPool;
615614
private volatile boolean master = false;
616615

617-
private LocalNodeMasterListeners(ThreadPool threadPool) {
618-
this.threadPool = threadPool;
616+
private LocalNodeMasterListeners() {
619617
}
620618

621619
@Override
622620
public void clusterChanged(ClusterChangedEvent event) {
623621
if (!master && event.localNodeMaster()) {
624622
master = true;
625623
for (LocalNodeMasterListener listener : listeners) {
626-
java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
627-
executor.execute(new OnMasterRunnable(listener));
624+
try {
625+
listener.onMaster();
626+
} catch (Exception e) {
627+
logger.warn("failed to notify LocalNodeMasterListener", e);
628+
}
628629
}
629-
return;
630-
}
631-
632-
if (master && !event.localNodeMaster()) {
630+
} else if (master && !event.localNodeMaster()) {
633631
master = false;
634632
for (LocalNodeMasterListener listener : listeners) {
635-
java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
636-
executor.execute(new OffMasterRunnable(listener));
633+
try {
634+
listener.offMaster();
635+
} catch (Exception e) {
636+
logger.warn("failed to notify LocalNodeMasterListener", e);
637+
}
637638
}
638639
}
639640
}
@@ -644,34 +645,6 @@ private void add(LocalNodeMasterListener listener) {
644645

645646
}
646647

647-
private static class OnMasterRunnable implements Runnable {
648-
649-
private final LocalNodeMasterListener listener;
650-
651-
private OnMasterRunnable(LocalNodeMasterListener listener) {
652-
this.listener = listener;
653-
}
654-
655-
@Override
656-
public void run() {
657-
listener.onMaster();
658-
}
659-
}
660-
661-
private static class OffMasterRunnable implements Runnable {
662-
663-
private final LocalNodeMasterListener listener;
664-
665-
private OffMasterRunnable(LocalNodeMasterListener listener) {
666-
this.listener = listener;
667-
}
668-
669-
@Override
670-
public void run() {
671-
listener.offMaster();
672-
}
673-
}
674-
675648
// this one is overridden in tests so we can control time
676649
protected long currentTimeInMillis() {
677650
return threadPool.relativeTimeInMillis();

server/src/main/java/org/elasticsearch/common/settings/ConsistentSettingsService.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
import org.elasticsearch.common.Priority;
3030
import org.elasticsearch.common.UUIDs;
3131
import org.elasticsearch.common.hash.MessageDigests;
32-
import org.elasticsearch.threadpool.ThreadPool;
3332

33+
import javax.crypto.SecretKey;
34+
import javax.crypto.SecretKeyFactory;
35+
import javax.crypto.spec.PBEKeySpec;
3436
import java.nio.charset.StandardCharsets;
3537
import java.security.NoSuchAlgorithmException;
3638
import java.security.spec.InvalidKeySpecException;
@@ -45,10 +47,6 @@
4547
import java.util.concurrent.atomic.AtomicBoolean;
4648
import java.util.function.Consumer;
4749

48-
import javax.crypto.SecretKey;
49-
import javax.crypto.SecretKeyFactory;
50-
import javax.crypto.spec.PBEKeySpec;
51-
5250
/**
5351
* Used to publish secure setting hashes in the cluster state and to validate those hashes against the local values of those same settings.
5452
* This is colloquially referred to as the secure setting consistency check. It will publish and verify hashes only for the collection
@@ -247,11 +245,6 @@ public void onFailure(String source, Exception e) {
247245
public void offMaster() {
248246
logger.trace("I am no longer master, nothing to do");
249247
}
250-
251-
@Override
252-
public String executorName() {
253-
return ThreadPool.Names.SAME;
254-
}
255248
}
256249

257250
}

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1137,7 +1137,10 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
11371137
/** Constructs a ClusterInfoService which may be mocked for tests. */
11381138
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
11391139
ThreadPool threadPool, NodeClient client) {
1140-
return new InternalClusterInfoService(settings, clusterService, threadPool, client);
1140+
final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client);
1141+
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
1142+
clusterService.addListener(service);
1143+
return service;
11411144
}
11421145

11431146
/** Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests. */

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
8282

8383
final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool);
8484
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client);
85+
clusterService.addListener(clusterInfoService);
8586
clusterInfoService.addListener(ignored -> {});
8687

8788
clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService());

server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -293,11 +293,6 @@ public void onMaster() {
293293
public void offMaster() {
294294
isMaster.set(false);
295295
}
296-
297-
@Override
298-
public String executorName() {
299-
return ThreadPool.Names.SAME;
300-
}
301296
});
302297

303298
ClusterState state = timedClusterApplierService.state();

test/framework/src/main/java/org/elasticsearch/node/MockNode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ protected ClusterInfoService newClusterInfoService(Settings settings, ClusterSer
174174
if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) {
175175
return super.newClusterInfoService(settings, clusterService, threadPool, client);
176176
} else {
177-
return new MockInternalClusterInfoService(settings, clusterService, threadPool, client);
177+
final MockInternalClusterInfoService service = new MockInternalClusterInfoService(settings, clusterService, threadPool, client);
178+
clusterService.addListener(service);
179+
return service;
178180
}
179181
}
180182

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,6 @@ public void offMaster() {
9494
}
9595
}
9696

97-
@Override
98-
public String executorName() {
99-
return ThreadPool.Names.GENERIC;
100-
}
101-
10297
private void scheduleNext() {
10398
if (isMaster) {
10499
try {

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.cluster.ClusterState;
1616
import org.elasticsearch.cluster.ClusterStateApplier;
1717
import org.elasticsearch.cluster.ClusterStateListener;
18-
import org.elasticsearch.cluster.LocalNodeMasterListener;
1918
import org.elasticsearch.cluster.metadata.IndexMetadata;
2019
import org.elasticsearch.cluster.service.ClusterService;
2120
import org.elasticsearch.common.Priority;
@@ -51,7 +50,7 @@
5150
* A service which runs the {@link LifecyclePolicy}s associated with indexes.
5251
*/
5352
public class IndexLifecycleService
54-
implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, LocalNodeMasterListener, IndexEventListener {
53+
implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, IndexEventListener {
5554
private static final Logger logger = LogManager.getLogger(IndexLifecycleService.class);
5655
private static final Set<String> IGNORE_STEPS_MAINTENANCE_REQUESTED = Collections.singleton(ShrinkStep.NAME);
5756
private volatile boolean isMaster = false;
@@ -82,7 +81,6 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl
8281
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
8382
clusterService.addStateApplier(this);
8483
clusterService.addListener(this);
85-
clusterService.addLocalNodeMasterListener(this);
8684
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING,
8785
this::updatePollInterval);
8886
}
@@ -121,13 +119,11 @@ public ClusterState moveClusterStateToPreviouslyFailedStep(ClusterState currentS
121119
return newState;
122120
}
123121

124-
@Override
125-
public void onMaster() {
126-
this.isMaster = true;
122+
// package private for testing
123+
void onMaster(ClusterState clusterState) {
127124
maybeScheduleJob();
128125

129-
ClusterState clusterState = clusterService.state();
130-
IndexLifecycleMetadata currentMetadata = clusterState.metadata().custom(IndexLifecycleMetadata.TYPE);
126+
final IndexLifecycleMetadata currentMetadata = clusterState.metadata().custom(IndexLifecycleMetadata.TYPE);
131127
if (currentMetadata != null) {
132128
OperationMode currentMode = currentMetadata.getOperationMode();
133129
if (OperationMode.STOPPED.equals(currentMode)) {
@@ -184,17 +180,6 @@ public void onMaster() {
184180
}
185181
}
186182

187-
@Override
188-
public void offMaster() {
189-
this.isMaster = false;
190-
cancelJob();
191-
}
192-
193-
@Override
194-
public String executorName() {
195-
return ThreadPool.Names.MANAGEMENT;
196-
}
197-
198183
@Override
199184
public void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
200185
if (shouldParseIndexName(indexSettings)) {
@@ -237,7 +222,20 @@ private synchronized void maybeScheduleJob() {
237222

238223
@Override
239224
public void clusterChanged(ClusterChangedEvent event) {
240-
IndexLifecycleMetadata lifecycleMetadata = event.state().metadata().custom(IndexLifecycleMetadata.TYPE);
225+
// Instead of using a LocalNodeMasterListener to track master changes, this service will
226+
// track them here to avoid conditions where master listener events run after other
227+
// listeners that depend on what happened in the master listener
228+
final boolean prevIsMaster = this.isMaster;
229+
if (prevIsMaster != event.localNodeMaster()) {
230+
this.isMaster = event.localNodeMaster();
231+
if (this.isMaster) {
232+
onMaster(event.state());
233+
} else {
234+
cancelJob();
235+
}
236+
}
237+
238+
final IndexLifecycleMetadata lifecycleMetadata = event.state().metadata().custom(IndexLifecycleMetadata.TYPE);
241239
if (this.isMaster && lifecycleMetadata != null) {
242240
triggerPolicies(event.state(), true);
243241
}

0 commit comments

Comments
 (0)