Skip to content

Commit c41ac5f

Browse files
authored
Fix race in SLM master/cluster state listeners (#59801)
This change fixes two possible race conditions in SLM related to how local master changes and cluster state events are observed. When implementing the `LocalNodeMasterListener` interface, it is only recommended to execute on a separate threadpool if the operations are heavy and would block the cluster state thread. SLM specified that the listeners should run in the Snapshot thread pool, but the operations in the listener were lightweight. This had the side effect of causing master changes to be delayed if the Snapshot threads were all busy and could also potentially cause the `onMaster` and `offMaster` calls to race if both were queued and then executed concurrently. Additionally, the `SnapshotLifecycleService` is also a `ClusterStateListener` and there is currently no order of operations guarantee between `LocalNodeMasterListeners` and `ClusterStateListeners` so this could lead to incorrect behavior. The resolution for these two issues is that the SnapshotRetentionService now specifies the `SAME` executor for its implementation of the `LocalNodeMasterListener` interface. The `SnapshotLifecycleService` is no longer a `LocalNodeMasterListener` and instead tracks local master changes in its `ClusterStateListner`.
1 parent 9ba70a3 commit c41ac5f

File tree

6 files changed

+103
-74
lines changed

6 files changed

+103
-74
lines changed

x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -969,8 +969,7 @@ public void testMoveToInjectedStep() throws Exception {
969969
assertTrue(indexExists(shrunkenIndex));
970970
assertTrue(aliasExists(shrunkenIndex, index));
971971
assertThat(getStepKeyForIndex(client(), shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey()));
972-
973-
});
972+
}, 30, TimeUnit.SECONDS);
974973
}
975974

976975
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53612")

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,11 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
193193
clusterService));
194194
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
195195
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
196+
snapshotLifecycleService.get().init();
196197
snapshotRetentionService.set(new SnapshotRetentionService(settings,
197198
() -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, snapshotHistoryStore.get(), threadPool),
198-
clusterService, getClock()));
199+
getClock()));
200+
snapshotRetentionService.get().init(clusterService);
199201
components.addAll(Arrays.asList(snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get()));
200202

201203
return components;

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotLifecycleService.java

Lines changed: 21 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,10 @@
1111
import org.elasticsearch.cluster.ClusterChangedEvent;
1212
import org.elasticsearch.cluster.ClusterState;
1313
import org.elasticsearch.cluster.ClusterStateListener;
14-
import org.elasticsearch.cluster.LocalNodeMasterListener;
1514
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
1615
import org.elasticsearch.cluster.service.ClusterService;
1716
import org.elasticsearch.common.settings.Settings;
1817
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
19-
import org.elasticsearch.threadpool.ThreadPool;
2018
import org.elasticsearch.xpack.core.ilm.OperationMode;
2119
import org.elasticsearch.xpack.core.scheduler.CronSchedule;
2220
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@@ -40,7 +38,7 @@
4038
* {@link SnapshotLifecycleTask}. It reacts to new policies in the cluster state by scheduling a
4139
* task according to the policy's schedule.
4240
*/
43-
public class SnapshotLifecycleService implements LocalNodeMasterListener, Closeable, ClusterStateListener {
41+
public class SnapshotLifecycleService implements Closeable, ClusterStateListener {
4442

4543
private static final Logger logger = LogManager.getLogger(SnapshotLifecycleService.class);
4644
private static final String JOB_PATTERN_SUFFIX = "-\\d+$";
@@ -59,12 +57,31 @@ public SnapshotLifecycleService(Settings settings,
5957
this.scheduler = new SchedulerEngine(settings, clock);
6058
this.clusterService = clusterService;
6159
this.snapshotTask = taskSupplier.get();
62-
clusterService.addLocalNodeMasterListener(this); // TODO: change this not to use 'this'
60+
}
61+
62+
/**
63+
* Initializer method to avoid the publication of a self reference in the constructor.
64+
*/
65+
public void init() {
6366
clusterService.addListener(this);
6467
}
6568

6669
@Override
6770
public void clusterChanged(final ClusterChangedEvent event) {
71+
// Instead of using a LocalNodeMasterListener to track master changes, this service will
72+
// track them here to avoid conditions where master listener events run after other
73+
// listeners that depend on what happened in the master listener
74+
final boolean prevIsMaster = this.isMaster;
75+
if (prevIsMaster != event.localNodeMaster()) {
76+
this.isMaster = event.localNodeMaster();
77+
if (this.isMaster) {
78+
scheduler.register(snapshotTask);
79+
} else {
80+
scheduler.unregister(snapshotTask);
81+
cancelSnapshotJobs();
82+
}
83+
}
84+
6885
if (this.isMaster) {
6986
final ClusterState state = event.state();
7087

@@ -83,25 +100,6 @@ public void clusterChanged(final ClusterChangedEvent event) {
83100
}
84101
}
85102

86-
@Override
87-
public void onMaster() {
88-
this.isMaster = true;
89-
scheduler.register(snapshotTask);
90-
final ClusterState state = clusterService.state();
91-
if (slmStoppedOrStopping(state)) {
92-
// SLM is currently stopped, so don't schedule jobs
93-
return;
94-
}
95-
scheduleSnapshotJobs(state);
96-
}
97-
98-
@Override
99-
public void offMaster() {
100-
this.isMaster = false;
101-
scheduler.unregister(snapshotTask);
102-
cancelSnapshotJobs();
103-
}
104-
105103
// Only used for testing
106104
SchedulerEngine getScheduler() {
107105
return this.scheduler;
@@ -236,11 +234,6 @@ public static void validateRepositoryExists(final String repository, final Clust
236234
.orElseThrow(() -> new IllegalArgumentException("no such repository [" + repository + "]"));
237235
}
238236

239-
@Override
240-
public String executorName() {
241-
return ThreadPool.Names.SNAPSHOT;
242-
}
243-
244237
@Override
245238
public void close() {
246239
if (this.running.compareAndSet(true, false)) {

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,18 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
4646

4747
public SnapshotRetentionService(Settings settings,
4848
Supplier<SnapshotRetentionTask> taskSupplier,
49-
ClusterService clusterService,
5049
Clock clock) {
5150
this.clock = clock;
5251
this.scheduler = new SchedulerEngine(settings, clock);
5352
this.retentionTask = taskSupplier.get();
5453
this.scheduler.register(this.retentionTask);
5554
this.slmRetentionSchedule = LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING.get(settings);
55+
}
56+
57+
/**
58+
* Initializer method to avoid the publication of a self reference in the constructor.
59+
*/
60+
public void init(ClusterService clusterService) {
5661
clusterService.addLocalNodeMasterListener(this);
5762
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.SLM_RETENTION_SCHEDULE_SETTING,
5863
this::setUpdateSchedule);
@@ -110,7 +115,7 @@ public void triggerRetention() {
110115

111116
@Override
112117
public String executorName() {
113-
return ThreadPool.Names.SNAPSHOT;
118+
return ThreadPool.Names.SAME;
114119
}
115120

116121
@Override

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java

Lines changed: 64 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,18 @@
66

77
package org.elasticsearch.xpack.slm;
88

9+
import org.elasticsearch.Version;
910
import org.elasticsearch.cluster.ClusterChangedEvent;
1011
import org.elasticsearch.cluster.ClusterName;
1112
import org.elasticsearch.cluster.ClusterState;
1213
import org.elasticsearch.cluster.metadata.Metadata;
1314
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
1415
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
16+
import org.elasticsearch.cluster.node.DiscoveryNode;
17+
import org.elasticsearch.cluster.node.DiscoveryNodes;
1518
import org.elasticsearch.cluster.service.ClusterService;
1619
import org.elasticsearch.common.settings.Settings;
20+
import org.elasticsearch.common.transport.TransportAddress;
1721
import org.elasticsearch.test.ClusterServiceUtils;
1822
import org.elasticsearch.test.ESTestCase;
1923
import org.elasticsearch.threadpool.TestThreadPool;
@@ -100,8 +104,7 @@ public void testNothingScheduledWhenNotRunning() throws InterruptedException {
100104
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(initialState, threadPool);
101105
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
102106
() -> new FakeSnapshotTask(e -> logger.info("triggered")), clusterService, clock)) {
103-
104-
sls.offMaster();
107+
sls.init();
105108

106109
SnapshotLifecyclePolicyMetadata newPolicy = SnapshotLifecyclePolicyMetadata.builder()
107110
.setPolicy(createPolicy("foo", "*/1 * * * * ?"))
@@ -121,26 +124,30 @@ public void testNothingScheduledWhenNotRunning() throws InterruptedException {
121124
// Since the service does not think it is master, it should not be triggered or scheduled
122125
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
123126

124-
sls.onMaster();
125-
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("initial-1")));
127+
ClusterState prevState = state;
128+
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
129+
sls.clusterChanged(new ClusterChangedEvent("2", state, prevState));
130+
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2")));
126131

127-
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPING, new SnapshotLifecycleStats()));
128-
sls.clusterChanged(new ClusterChangedEvent("2", state, emptyState));
132+
prevState = state;
133+
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPING, new SnapshotLifecycleStats()), true);
134+
sls.clusterChanged(new ClusterChangedEvent("3", state, prevState));
129135

130136
// Since the service is stopping, jobs should have been cancelled
131137
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
132138

133-
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPED, new SnapshotLifecycleStats()));
134-
sls.clusterChanged(new ClusterChangedEvent("3", state, emptyState));
139+
prevState = state;
140+
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.STOPPED, new SnapshotLifecycleStats()), true);
141+
sls.clusterChanged(new ClusterChangedEvent("4", state, prevState));
135142

136143
// Since the service is stopped, jobs should have been cancelled
137144
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
138145

139146
// No jobs should be scheduled when service is closed
140-
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
147+
prevState = state;
148+
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
141149
sls.close();
142-
sls.onMaster();
143-
sls.clusterChanged(new ClusterChangedEvent("1", state, emptyState));
150+
sls.clusterChanged(new ClusterChangedEvent("5", state, prevState));
144151
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
145152
} finally {
146153
threadPool.shutdownNow();
@@ -160,11 +167,12 @@ public void testPolicyCRUD() throws Exception {
160167
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
161168
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
162169
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
163-
164-
sls.offMaster();
170+
sls.init();
165171
SnapshotLifecycleMetadata snapMeta =
166172
new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats());
167-
ClusterState previousState = createState(snapMeta);
173+
ClusterState state = createState(snapMeta, false);
174+
sls.clusterChanged(new ClusterChangedEvent("1", state, ClusterState.EMPTY_STATE));
175+
168176
Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
169177

170178
SnapshotLifecyclePolicyMetadata policy = SnapshotLifecyclePolicyMetadata.builder()
@@ -174,8 +182,9 @@ public void testPolicyCRUD() throws Exception {
174182
.build();
175183
policies.put(policy.getPolicy().getId(), policy);
176184
snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
177-
ClusterState state = createState(snapMeta);
178-
ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
185+
ClusterState previousState = state;
186+
state = createState(snapMeta, false);
187+
ClusterChangedEvent event = new ClusterChangedEvent("2", state, previousState);
179188
trigger.set(e -> {
180189
fail("trigger should not be invoked");
181190
});
@@ -185,8 +194,10 @@ public void testPolicyCRUD() throws Exception {
185194
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
186195

187196
// Change the service to think it's on the master node, events should be scheduled now
188-
sls.onMaster();
189197
trigger.set(e -> triggerCount.incrementAndGet());
198+
previousState = state;
199+
state = createState(snapMeta, true);
200+
event = new ClusterChangedEvent("3", state, previousState);
190201
sls.clusterChanged(event);
191202
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-1")));
192203

@@ -202,8 +213,8 @@ public void testPolicyCRUD() throws Exception {
202213
.setModifiedDate(2)
203214
.build();
204215
policies.put(policy.getPolicy().getId(), newPolicy);
205-
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()));
206-
event = new ClusterChangedEvent("2", state, previousState);
216+
state = createState(new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
217+
event = new ClusterChangedEvent("4", state, previousState);
207218
sls.clusterChanged(event);
208219
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2")));
209220

@@ -226,9 +237,9 @@ public void testPolicyCRUD() throws Exception {
226237
final int currentCount2 = triggerCount.get();
227238
previousState = state;
228239
// Create a state simulating the policy being deleted
229-
state =
230-
createState(new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats()));
231-
event = new ClusterChangedEvent("2", state, previousState);
240+
state = createState(
241+
new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats()), true);
242+
event = new ClusterChangedEvent("5", state, previousState);
232243
sls.clusterChanged(event);
233244
clock.fastForwardSeconds(2);
234245

@@ -246,8 +257,8 @@ public void testPolicyCRUD() throws Exception {
246257
policies.put(policy.getPolicy().getId(), policy);
247258
snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
248259
previousState = state;
249-
state = createState(snapMeta);
250-
event = new ClusterChangedEvent("1", state, previousState);
260+
state = createState(snapMeta, true);
261+
event = new ClusterChangedEvent("6", state, previousState);
251262
trigger.set(e -> triggerCount.incrementAndGet());
252263
sls.clusterChanged(event);
253264
clock.fastForwardSeconds(2);
@@ -257,7 +268,10 @@ public void testPolicyCRUD() throws Exception {
257268
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-3")));
258269

259270
// Signify becoming non-master, the jobs should all be cancelled
260-
sls.offMaster();
271+
previousState = state;
272+
state = createState(snapMeta, false);
273+
event = new ClusterChangedEvent("7", state, previousState);
274+
sls.clusterChanged(event);
261275
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
262276
} finally {
263277
threadPool.shutdownNow();
@@ -276,11 +290,13 @@ public void testPolicyNamesEndingInNumbers() throws Exception {
276290
try (ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
277291
SnapshotLifecycleService sls = new SnapshotLifecycleService(Settings.EMPTY,
278292
() -> new FakeSnapshotTask(e -> trigger.get().accept(e)), clusterService, clock)) {
279-
sls.onMaster();
280-
293+
sls.init();
281294
SnapshotLifecycleMetadata snapMeta =
282295
new SnapshotLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING, new SnapshotLifecycleStats());
283-
ClusterState previousState = createState(snapMeta);
296+
ClusterState state = createState(snapMeta, true);
297+
ClusterChangedEvent event = new ClusterChangedEvent("1", state, ClusterState.EMPTY_STATE);
298+
sls.clusterChanged(event);
299+
284300
Map<String, SnapshotLifecyclePolicyMetadata> policies = new HashMap<>();
285301

286302
SnapshotLifecyclePolicyMetadata policy = SnapshotLifecyclePolicyMetadata.builder()
@@ -291,13 +307,13 @@ public void testPolicyNamesEndingInNumbers() throws Exception {
291307
.build();
292308
policies.put(policy.getPolicy().getId(), policy);
293309
snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
294-
ClusterState state = createState(snapMeta);
295-
ClusterChangedEvent event = new ClusterChangedEvent("1", state, previousState);
310+
ClusterState previousState = state;
311+
state = createState(snapMeta, true);
312+
event = new ClusterChangedEvent("2", state, previousState);
296313
sls.clusterChanged(event);
297314

298315
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.singleton("foo-2-1")));
299316

300-
previousState = state;
301317
SnapshotLifecyclePolicyMetadata secondPolicy = SnapshotLifecyclePolicyMetadata.builder()
302318
.setPolicy(createPolicy("foo-1", "45 * * * * ?"))
303319
.setHeaders(Collections.emptyMap())
@@ -306,13 +322,17 @@ public void testPolicyNamesEndingInNumbers() throws Exception {
306322
.build();
307323
policies.put(secondPolicy.getPolicy().getId(), secondPolicy);
308324
snapMeta = new SnapshotLifecycleMetadata(policies, OperationMode.RUNNING, new SnapshotLifecycleStats());
309-
state = createState(snapMeta);
310-
event = new ClusterChangedEvent("2", state, previousState);
325+
previousState = state;
326+
state = createState(snapMeta, true);
327+
event = new ClusterChangedEvent("3", state, previousState);
311328
sls.clusterChanged(event);
312329

313330
assertThat(sls.getScheduler().scheduledJobIds(), containsInAnyOrder("foo-2-1", "foo-1-2"));
314331

315-
sls.offMaster();
332+
previousState = state;
333+
state = createState(snapMeta, false);
334+
event = new ClusterChangedEvent("4", state, previousState);
335+
sls.clusterChanged(event);
316336
assertThat(sls.getScheduler().scheduledJobIds(), equalTo(Collections.emptySet()));
317337
} finally {
318338
threadPool.shutdownNow();
@@ -336,10 +356,20 @@ public void triggered(SchedulerEngine.Event event) {
336356
}
337357

338358
public ClusterState createState(SnapshotLifecycleMetadata snapMeta) {
359+
return createState(snapMeta, false);
360+
}
361+
362+
public ClusterState createState(SnapshotLifecycleMetadata snapMeta, boolean localNodeMaster) {
339363
Metadata metadata = Metadata.builder()
340364
.putCustom(SnapshotLifecycleMetadata.TYPE, snapMeta)
341365
.build();
366+
final DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder()
367+
.add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9300), "local"))
368+
.add(new DiscoveryNode("remote", new TransportAddress(TransportAddress.META_ADDRESS, 9301), Version.CURRENT))
369+
.localNodeId("local")
370+
.masterNodeId(localNodeMaster ? "local" : "remote");
342371
return ClusterState.builder(new ClusterName("cluster"))
372+
.nodes(discoveryNodesBuilder)
343373
.metadata(metadata)
344374
.build();
345375
}

0 commit comments

Comments
 (0)