Skip to content

Commit 13cb0fb

Browse files
author
David Roberts
authored
Periodically try to reassign unassigned persistent tasks (#36069)
Previously persistent task assignment was checked in the following situations: - Persistent tasks are changed - A node joins or leaves the cluster - The routing table is changed - Custom metadata in the cluster state is changed - A new master node is elected However, there could be situations when a persistent task that could not be assigned to a node could become assignable due to some other change, such as memory usage on the nodes. This change adds a timed recheck of persistent task assignment to account for such situations. The timer is suspended while checks triggered by cluster state changes are in-flight to avoid adding burden to an already busy cluster. Closes #35792
1 parent 34d7cc1 commit 13cb0fb

File tree

12 files changed

+711
-120
lines changed

12 files changed

+711
-120
lines changed

docs/reference/modules/cluster/misc.asciidoc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ Plugins can create a kind of tasks called persistent tasks. Those tasks are
135135
usually long-live tasks and are stored in the cluster state, allowing the
136136
tasks to be revived after a full cluster restart.
137137

138-
Every time a persistent task is created, the master nodes takes care of
138+
Every time a persistent task is created, the master node takes care of
139139
assigning the task to a node of the cluster, and the assigned node will then
140140
pick up the task and execute it locally. The process of assigning persistent
141-
tasks to nodes is controlled by the following property, which can be updated
141+
tasks to nodes is controlled by the following properties, which can be updated
142142
dynamically:
143143

144144
`cluster.persistent_tasks.allocation.enable`::
@@ -153,3 +153,13 @@ This setting does not affect the persistent tasks that are already being execute
153153
Only newly created persistent tasks, or tasks that must be reassigned (after a node
154154
left the cluster, for example), are impacted by this setting.
155155
--
156+
157+
`cluster.persistent_tasks.allocation.recheck_interval`::
158+
159+
The master node will automatically check whether persistent tasks need to
160+
be assigned when the cluster state changes significantly. However, there
161+
may be other factors, such as memory usage, that affect whether persistent
162+
tasks can be assigned to nodes but do not cause the cluster state to change.
163+
This setting controls how often assignment checks are performed to react to
164+
these factors. The default is 30 seconds. The minimum permitted value is 10
165+
seconds.

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
import org.elasticsearch.monitor.os.OsService;
9393
import org.elasticsearch.monitor.process.ProcessService;
9494
import org.elasticsearch.node.Node;
95+
import org.elasticsearch.persistent.PersistentTasksClusterService;
9596
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
9697
import org.elasticsearch.plugins.PluginsService;
9798
import org.elasticsearch.repositories.fs.FsRepository;
@@ -456,6 +457,7 @@ public void apply(Settings value, Settings current, Settings previous) {
456457
Node.BREAKER_TYPE_KEY,
457458
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
458459
IndexGraveyard.SETTING_MAX_TOMBSTONES,
460+
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
459461
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
460462
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
461463
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.common.util.concurrent;
20+
21+
import org.apache.logging.log4j.Logger;
22+
import org.apache.logging.log4j.message.ParameterizedMessage;
23+
import org.elasticsearch.common.unit.TimeValue;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
26+
import java.io.Closeable;
27+
import java.util.Objects;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
31+
/**
32+
* A base class for tasks that need to repeat.
33+
*/
34+
public abstract class AbstractAsyncTask implements Runnable, Closeable {
35+
36+
private final Logger logger;
37+
private final ThreadPool threadPool;
38+
private final AtomicBoolean closed = new AtomicBoolean(false);
39+
private final boolean autoReschedule;
40+
private volatile ScheduledFuture<?> scheduledFuture;
41+
private volatile boolean isScheduledOrRunning;
42+
private volatile Exception lastThrownException;
43+
private volatile TimeValue interval;
44+
45+
protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) {
46+
this.logger = logger;
47+
this.threadPool = threadPool;
48+
this.interval = interval;
49+
this.autoReschedule = autoReschedule;
50+
}
51+
52+
/**
53+
* Change the interval between runs.
54+
* If a future run is scheduled then this will reschedule it.
55+
* @param interval The new interval between runs.
56+
*/
57+
public synchronized void setInterval(TimeValue interval) {
58+
this.interval = interval;
59+
if (scheduledFuture != null) {
60+
rescheduleIfNecessary();
61+
}
62+
}
63+
64+
public TimeValue getInterval() {
65+
return interval;
66+
}
67+
68+
/**
69+
* Test any external conditions that determine whether the task
70+
* should be scheduled. This method does *not* need to test if
71+
* the task is closed, as being closed automatically prevents
72+
* scheduling.
73+
* @return Should the task be scheduled to run?
74+
*/
75+
protected abstract boolean mustReschedule();
76+
77+
/**
78+
* Schedule the task to run after the configured interval if it
79+
* is not closed and any further conditions imposed by derived
80+
* classes are met. Any previously scheduled invocation is
81+
* cancelled.
82+
*/
83+
public synchronized void rescheduleIfNecessary() {
84+
if (isClosed()) {
85+
return;
86+
}
87+
if (scheduledFuture != null) {
88+
FutureUtils.cancel(scheduledFuture);
89+
}
90+
if (interval.millis() > 0 && mustReschedule()) {
91+
if (logger.isTraceEnabled()) {
92+
logger.trace("scheduling {} every {}", toString(), interval);
93+
}
94+
scheduledFuture = threadPool.schedule(interval, getThreadPool(), this);
95+
isScheduledOrRunning = true;
96+
} else {
97+
logger.trace("scheduled {} disabled", toString());
98+
scheduledFuture = null;
99+
isScheduledOrRunning = false;
100+
}
101+
}
102+
103+
public boolean isScheduled() {
104+
// Currently running counts as scheduled to avoid an oscillating return value
105+
// from this method when a task is repeatedly running and rescheduling itself.
106+
return isScheduledOrRunning;
107+
}
108+
109+
/**
110+
* Cancel any scheduled run, but do not prevent subsequent restarts.
111+
*/
112+
public synchronized void cancel() {
113+
FutureUtils.cancel(scheduledFuture);
114+
scheduledFuture = null;
115+
isScheduledOrRunning = false;
116+
}
117+
118+
/**
119+
* Cancel any scheduled run
120+
*/
121+
@Override
122+
public synchronized void close() {
123+
if (closed.compareAndSet(false, true)) {
124+
cancel();
125+
}
126+
}
127+
128+
public boolean isClosed() {
129+
return this.closed.get();
130+
}
131+
132+
@Override
133+
public final void run() {
134+
synchronized (this) {
135+
scheduledFuture = null;
136+
isScheduledOrRunning = autoReschedule;
137+
}
138+
try {
139+
runInternal();
140+
} catch (Exception ex) {
141+
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
142+
// prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
143+
logger.warn(
144+
() -> new ParameterizedMessage(
145+
"failed to run task {} - suppressing re-occurring exceptions unless the exception changes",
146+
toString()),
147+
ex);
148+
lastThrownException = ex;
149+
}
150+
} finally {
151+
if (autoReschedule) {
152+
rescheduleIfNecessary();
153+
}
154+
}
155+
}
156+
157+
private static boolean sameException(Exception left, Exception right) {
158+
if (left.getClass() == right.getClass()) {
159+
if (Objects.equals(left.getMessage(), right.getMessage())) {
160+
StackTraceElement[] stackTraceLeft = left.getStackTrace();
161+
StackTraceElement[] stackTraceRight = right.getStackTrace();
162+
if (stackTraceLeft.length == stackTraceRight.length) {
163+
for (int i = 0; i < stackTraceLeft.length; i++) {
164+
if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) {
165+
return false;
166+
}
167+
}
168+
return true;
169+
}
170+
}
171+
}
172+
return false;
173+
}
174+
175+
protected abstract void runInternal();
176+
177+
/**
178+
* Use the same threadpool by default.
179+
* Derived classes can change this if required.
180+
*/
181+
protected String getThreadPool() {
182+
return ThreadPool.Names.SAME;
183+
}
184+
}

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 6 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838
import org.elasticsearch.common.settings.Settings;
3939
import org.elasticsearch.common.unit.TimeValue;
4040
import org.elasticsearch.common.util.BigArrays;
41+
import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
4142
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
42-
import org.elasticsearch.common.util.concurrent.FutureUtils;
4343
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
4444
import org.elasticsearch.core.internal.io.IOUtils;
4545
import org.elasticsearch.env.NodeEnvironment;
@@ -87,7 +87,6 @@
8787
import java.util.Map;
8888
import java.util.Objects;
8989
import java.util.Set;
90-
import java.util.concurrent.ScheduledFuture;
9190
import java.util.concurrent.TimeUnit;
9291
import java.util.concurrent.atomic.AtomicBoolean;
9392
import java.util.function.Consumer;
@@ -805,100 +804,18 @@ private void maybeSyncGlobalCheckpoints() {
805804
}
806805
}
807806

808-
abstract static class BaseAsyncTask implements Runnable, Closeable {
807+
abstract static class BaseAsyncTask extends AbstractAsyncTask {
809808
protected final IndexService indexService;
810-
protected final ThreadPool threadPool;
811-
private final TimeValue interval;
812-
private ScheduledFuture<?> scheduledFuture;
813-
private final AtomicBoolean closed = new AtomicBoolean(false);
814-
private volatile Exception lastThrownException;
815809

816810
BaseAsyncTask(IndexService indexService, TimeValue interval) {
811+
super(indexService.logger, indexService.threadPool, interval, true);
817812
this.indexService = indexService;
818-
this.threadPool = indexService.getThreadPool();
819-
this.interval = interval;
820-
onTaskCompletion();
813+
rescheduleIfNecessary();
821814
}
822815

823-
boolean mustReschedule() {
816+
protected boolean mustReschedule() {
824817
// don't re-schedule if its closed or if we don't have a single shard here..., we are done
825-
return indexService.closed.get() == false
826-
&& closed.get() == false && interval.millis() > 0;
827-
}
828-
829-
private synchronized void onTaskCompletion() {
830-
if (mustReschedule()) {
831-
if (indexService.logger.isTraceEnabled()) {
832-
indexService.logger.trace("scheduling {} every {}", toString(), interval);
833-
}
834-
this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this);
835-
} else {
836-
indexService.logger.trace("scheduled {} disabled", toString());
837-
this.scheduledFuture = null;
838-
}
839-
}
840-
841-
boolean isScheduled() {
842-
return scheduledFuture != null;
843-
}
844-
845-
@Override
846-
public final void run() {
847-
try {
848-
runInternal();
849-
} catch (Exception ex) {
850-
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
851-
// prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
852-
indexService.logger.warn(
853-
() -> new ParameterizedMessage(
854-
"failed to run task {} - suppressing re-occurring exceptions unless the exception changes",
855-
toString()),
856-
ex);
857-
lastThrownException = ex;
858-
}
859-
} finally {
860-
onTaskCompletion();
861-
}
862-
}
863-
864-
private static boolean sameException(Exception left, Exception right) {
865-
if (left.getClass() == right.getClass()) {
866-
if (Objects.equals(left.getMessage(), right.getMessage())) {
867-
StackTraceElement[] stackTraceLeft = left.getStackTrace();
868-
StackTraceElement[] stackTraceRight = right.getStackTrace();
869-
if (stackTraceLeft.length == stackTraceRight.length) {
870-
for (int i = 0; i < stackTraceLeft.length; i++) {
871-
if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) {
872-
return false;
873-
}
874-
}
875-
return true;
876-
}
877-
}
878-
}
879-
return false;
880-
}
881-
882-
protected abstract void runInternal();
883-
884-
protected String getThreadPool() {
885-
return ThreadPool.Names.SAME;
886-
}
887-
888-
@Override
889-
public synchronized void close() {
890-
if (closed.compareAndSet(false, true)) {
891-
FutureUtils.cancel(scheduledFuture);
892-
scheduledFuture = null;
893-
}
894-
}
895-
896-
TimeValue getInterval() {
897-
return interval;
898-
}
899-
900-
boolean isClosed() {
901-
return this.closed.get();
818+
return indexService.closed.get() == false;
902819
}
903820
}
904821

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,8 @@ protected Node(
501501

502502
final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);
503503
final PersistentTasksClusterService persistentTasksClusterService =
504-
new PersistentTasksClusterService(settings, registry, clusterService);
504+
new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
505+
resourcesToClose.add(persistentTasksClusterService);
505506
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
506507

507508
modules.add(b -> {

0 commit comments

Comments
 (0)