Skip to content

Commit 971a707

Browse files
author
David Roberts
committed
Periodically try to reassign unassigned persistent tasks (#36069)
Previously persistent task assignment was checked in the following situations:

- Persistent tasks are changed
- A node joins or leaves the cluster
- The routing table is changed
- Custom metadata in the cluster state is changed
- A new master node is elected

However, there could be situations when a persistent task that could not be assigned to a node could become assignable due to some other change, such as memory usage on the nodes. This change adds a timed recheck of persistent task assignment to account for such situations. The timer is suspended while checks triggered by cluster state changes are in-flight to avoid adding burden to an already busy cluster.

Closes #35792
1 parent ec0036b commit 971a707

File tree

12 files changed

+712
-121
lines changed

12 files changed

+712
-121
lines changed

docs/reference/modules/cluster/misc.asciidoc

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ Plugins can create a kind of tasks called persistent tasks. Those tasks are
139139
usually long-lived tasks and are stored in the cluster state, allowing the
140140
tasks to be revived after a full cluster restart.
141141

142-
Every time a persistent task is created, the master nodes takes care of
142+
Every time a persistent task is created, the master node takes care of
143143
assigning the task to a node of the cluster, and the assigned node will then
144144
pick up the task and execute it locally. The process of assigning persistent
145-
tasks to nodes is controlled by the following property, which can be updated
145+
tasks to nodes is controlled by the following properties, which can be updated
146146
dynamically:
147147

148148
`cluster.persistent_tasks.allocation.enable`::
@@ -157,3 +157,13 @@ This setting does not affect the persistent tasks that are already being execute
157157
Only newly created persistent tasks, or tasks that must be reassigned (after a node
158158
left the cluster, for example), are impacted by this setting.
159159
--
160+
161+
`cluster.persistent_tasks.allocation.recheck_interval`::
162+
163+
The master node will automatically check whether persistent tasks need to
164+
be assigned when the cluster state changes significantly. However, there
165+
may be other factors, such as memory usage, that affect whether persistent
166+
tasks can be assigned to nodes but do not cause the cluster state to change.
167+
This setting controls how often assignment checks are performed to react to
168+
these factors. The default is 30 seconds. The minimum permitted value is 10
169+
seconds.

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.elasticsearch.monitor.os.OsService;
8282
import org.elasticsearch.monitor.process.ProcessService;
8383
import org.elasticsearch.node.Node;
84+
import org.elasticsearch.persistent.PersistentTasksClusterService;
8485
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
8586
import org.elasticsearch.plugins.PluginsService;
8687
import org.elasticsearch.repositories.fs.FsRepository;
@@ -447,7 +448,8 @@ public void apply(Settings value, Settings current, Settings previous) {
447448
Node.BREAKER_TYPE_KEY,
448449
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
449450
IndexGraveyard.SETTING_MAX_TOMBSTONES,
450-
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING
451+
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
452+
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING
451453
)));
452454

453455
public static List<SettingUpgrader<?>> BUILT_IN_SETTING_UPGRADERS = Collections.unmodifiableList(Arrays.asList(
server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.common.util.concurrent;
20+
21+
import org.apache.logging.log4j.Logger;
22+
import org.apache.logging.log4j.message.ParameterizedMessage;
23+
import org.elasticsearch.common.unit.TimeValue;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
26+
import java.io.Closeable;
27+
import java.util.Objects;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
31+
/**
32+
* A base class for tasks that need to repeat.
33+
*/
34+
public abstract class AbstractAsyncTask implements Runnable, Closeable {
35+
36+
private final Logger logger;
37+
private final ThreadPool threadPool;
38+
private final AtomicBoolean closed = new AtomicBoolean(false);
39+
private final boolean autoReschedule;
40+
private volatile ScheduledFuture<?> scheduledFuture;
41+
private volatile boolean isScheduledOrRunning;
42+
private volatile Exception lastThrownException;
43+
private volatile TimeValue interval;
44+
45+
protected AbstractAsyncTask(Logger logger, ThreadPool threadPool, TimeValue interval, boolean autoReschedule) {
46+
this.logger = logger;
47+
this.threadPool = threadPool;
48+
this.interval = interval;
49+
this.autoReschedule = autoReschedule;
50+
}
51+
52+
/**
53+
* Change the interval between runs.
54+
* If a future run is scheduled then this will reschedule it.
55+
* @param interval The new interval between runs.
56+
*/
57+
public synchronized void setInterval(TimeValue interval) {
58+
this.interval = interval;
59+
if (scheduledFuture != null) {
60+
rescheduleIfNecessary();
61+
}
62+
}
63+
64+
public TimeValue getInterval() {
65+
return interval;
66+
}
67+
68+
/**
69+
* Test any external conditions that determine whether the task
70+
* should be scheduled. This method does *not* need to test if
71+
* the task is closed, as being closed automatically prevents
72+
* scheduling.
73+
* @return Should the task be scheduled to run?
74+
*/
75+
protected abstract boolean mustReschedule();
76+
77+
/**
78+
* Schedule the task to run after the configured interval if it
79+
* is not closed and any further conditions imposed by derived
80+
* classes are met. Any previously scheduled invocation is
81+
* cancelled.
82+
*/
83+
public synchronized void rescheduleIfNecessary() {
84+
if (isClosed()) {
85+
return;
86+
}
87+
if (scheduledFuture != null) {
88+
FutureUtils.cancel(scheduledFuture);
89+
}
90+
if (interval.millis() > 0 && mustReschedule()) {
91+
if (logger.isTraceEnabled()) {
92+
logger.trace("scheduling {} every {}", toString(), interval);
93+
}
94+
scheduledFuture = threadPool.schedule(interval, getThreadPool(), this);
95+
isScheduledOrRunning = true;
96+
} else {
97+
logger.trace("scheduled {} disabled", toString());
98+
scheduledFuture = null;
99+
isScheduledOrRunning = false;
100+
}
101+
}
102+
103+
public boolean isScheduled() {
104+
// Currently running counts as scheduled to avoid an oscillating return value
105+
// from this method when a task is repeatedly running and rescheduling itself.
106+
return isScheduledOrRunning;
107+
}
108+
109+
/**
110+
* Cancel any scheduled run, but do not prevent subsequent restarts.
111+
*/
112+
public synchronized void cancel() {
113+
FutureUtils.cancel(scheduledFuture);
114+
scheduledFuture = null;
115+
isScheduledOrRunning = false;
116+
}
117+
118+
/**
119+
* Cancel any scheduled run
120+
*/
121+
@Override
122+
public synchronized void close() {
123+
if (closed.compareAndSet(false, true)) {
124+
cancel();
125+
}
126+
}
127+
128+
public boolean isClosed() {
129+
return this.closed.get();
130+
}
131+
132+
@Override
133+
public final void run() {
134+
synchronized (this) {
135+
scheduledFuture = null;
136+
isScheduledOrRunning = autoReschedule;
137+
}
138+
try {
139+
runInternal();
140+
} catch (Exception ex) {
141+
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
142+
// prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
143+
logger.warn(
144+
() -> new ParameterizedMessage(
145+
"failed to run task {} - suppressing re-occurring exceptions unless the exception changes",
146+
toString()),
147+
ex);
148+
lastThrownException = ex;
149+
}
150+
} finally {
151+
if (autoReschedule) {
152+
rescheduleIfNecessary();
153+
}
154+
}
155+
}
156+
157+
private static boolean sameException(Exception left, Exception right) {
158+
if (left.getClass() == right.getClass()) {
159+
if (Objects.equals(left.getMessage(), right.getMessage())) {
160+
StackTraceElement[] stackTraceLeft = left.getStackTrace();
161+
StackTraceElement[] stackTraceRight = right.getStackTrace();
162+
if (stackTraceLeft.length == stackTraceRight.length) {
163+
for (int i = 0; i < stackTraceLeft.length; i++) {
164+
if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) {
165+
return false;
166+
}
167+
}
168+
return true;
169+
}
170+
}
171+
}
172+
return false;
173+
}
174+
175+
protected abstract void runInternal();
176+
177+
/**
178+
* Use the same threadpool by default.
179+
* Derived classes can change this if required.
180+
*/
181+
protected String getThreadPool() {
182+
return ThreadPool.Names.SAME;
183+
}
184+
}

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 6 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import org.elasticsearch.common.settings.Settings;
3939
import org.elasticsearch.common.unit.TimeValue;
4040
import org.elasticsearch.common.util.BigArrays;
41-
import org.elasticsearch.common.util.concurrent.FutureUtils;
41+
import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
4242
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
4343
import org.elasticsearch.core.internal.io.IOUtils;
4444
import org.elasticsearch.env.NodeEnvironment;
@@ -86,7 +86,6 @@
8686
import java.util.Map;
8787
import java.util.Objects;
8888
import java.util.Set;
89-
import java.util.concurrent.ScheduledFuture;
9089
import java.util.concurrent.TimeUnit;
9190
import java.util.concurrent.atomic.AtomicBoolean;
9291
import java.util.function.Consumer;
@@ -787,100 +786,18 @@ private void maybeSyncGlobalCheckpoints() {
787786
}
788787
}
789788

790-
abstract static class BaseAsyncTask implements Runnable, Closeable {
789+
abstract static class BaseAsyncTask extends AbstractAsyncTask {
791790
protected final IndexService indexService;
792-
protected final ThreadPool threadPool;
793-
private final TimeValue interval;
794-
private ScheduledFuture<?> scheduledFuture;
795-
private final AtomicBoolean closed = new AtomicBoolean(false);
796-
private volatile Exception lastThrownException;
797791

798792
BaseAsyncTask(IndexService indexService, TimeValue interval) {
793+
super(indexService.logger, indexService.threadPool, interval, true);
799794
this.indexService = indexService;
800-
this.threadPool = indexService.getThreadPool();
801-
this.interval = interval;
802-
onTaskCompletion();
795+
rescheduleIfNecessary();
803796
}
804797

805-
boolean mustReschedule() {
798+
protected boolean mustReschedule() {
806799
// don't re-schedule if its closed or if we don't have a single shard here..., we are done
807-
return indexService.closed.get() == false
808-
&& closed.get() == false && interval.millis() > 0;
809-
}
810-
811-
private synchronized void onTaskCompletion() {
812-
if (mustReschedule()) {
813-
if (indexService.logger.isTraceEnabled()) {
814-
indexService.logger.trace("scheduling {} every {}", toString(), interval);
815-
}
816-
this.scheduledFuture = threadPool.schedule(interval, getThreadPool(), BaseAsyncTask.this);
817-
} else {
818-
indexService.logger.trace("scheduled {} disabled", toString());
819-
this.scheduledFuture = null;
820-
}
821-
}
822-
823-
boolean isScheduled() {
824-
return scheduledFuture != null;
825-
}
826-
827-
@Override
828-
public final void run() {
829-
try {
830-
runInternal();
831-
} catch (Exception ex) {
832-
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
833-
// prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
834-
indexService.logger.warn(
835-
() -> new ParameterizedMessage(
836-
"failed to run task {} - suppressing re-occurring exceptions unless the exception changes",
837-
toString()),
838-
ex);
839-
lastThrownException = ex;
840-
}
841-
} finally {
842-
onTaskCompletion();
843-
}
844-
}
845-
846-
private static boolean sameException(Exception left, Exception right) {
847-
if (left.getClass() == right.getClass()) {
848-
if (Objects.equals(left.getMessage(), right.getMessage())) {
849-
StackTraceElement[] stackTraceLeft = left.getStackTrace();
850-
StackTraceElement[] stackTraceRight = right.getStackTrace();
851-
if (stackTraceLeft.length == stackTraceRight.length) {
852-
for (int i = 0; i < stackTraceLeft.length; i++) {
853-
if (stackTraceLeft[i].equals(stackTraceRight[i]) == false) {
854-
return false;
855-
}
856-
}
857-
return true;
858-
}
859-
}
860-
}
861-
return false;
862-
}
863-
864-
protected abstract void runInternal();
865-
866-
protected String getThreadPool() {
867-
return ThreadPool.Names.SAME;
868-
}
869-
870-
@Override
871-
public synchronized void close() {
872-
if (closed.compareAndSet(false, true)) {
873-
FutureUtils.cancel(scheduledFuture);
874-
scheduledFuture = null;
875-
}
876-
}
877-
878-
TimeValue getInterval() {
879-
return interval;
880-
}
881-
882-
boolean isClosed() {
883-
return this.closed.get();
800+
return indexService.closed.get() == false;
884801
}
885802
}
886803

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,8 @@ protected Node(
548548

549549
final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(tasksExecutors);
550550
final PersistentTasksClusterService persistentTasksClusterService =
551-
new PersistentTasksClusterService(settings, registry, clusterService);
551+
new PersistentTasksClusterService(settings, registry, clusterService, threadPool);
552+
resourcesToClose.add(persistentTasksClusterService);
552553
final PersistentTasksService persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
553554

554555
modules.add(b -> {

0 commit comments

Comments
 (0)