Skip to content

Commit 8db5a6d

Browse files
committed
Extract batch executor out of cluster service (#24102)
Refactoring that extracts the task batching functionality from ClusterService and makes it a reusable component that can be tested in isolation.
1 parent 913283a commit 8db5a6d

File tree

9 files changed

+1050
-604
lines changed

9 files changed

+1050
-604
lines changed

core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Lines changed: 66 additions & 171 deletions
Large diffs are not rendered by default.
core/src/main/java/org/elasticsearch/cluster/service/SourcePrioritizedRunnable.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.service;
21+
22+
import org.elasticsearch.common.Priority;
23+
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
24+
25+
/**
26+
* PrioritizedRunnable that also has a source string
27+
*/
28+
public abstract class SourcePrioritizedRunnable extends PrioritizedRunnable {
29+
protected final String source;
30+
31+
public SourcePrioritizedRunnable(Priority priority, String source) {
32+
super(priority);
33+
this.source = source;
34+
}
35+
36+
public String source() {
37+
return source;
38+
}
39+
40+
@Override
41+
public String toString() {
42+
return "[" + source + "]";
43+
}
44+
}
core/src/main/java/org/elasticsearch/cluster/service/TaskBatcher.java

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.service;
21+
22+
import org.apache.logging.log4j.Logger;
23+
import org.elasticsearch.common.Nullable;
24+
import org.elasticsearch.common.Priority;
25+
import org.elasticsearch.common.unit.TimeValue;
26+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
27+
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
28+
29+
import java.util.ArrayList;
30+
import java.util.Collections;
31+
import java.util.HashMap;
32+
import java.util.IdentityHashMap;
33+
import java.util.LinkedHashSet;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.function.Function;
38+
import java.util.stream.Collectors;
39+
40+
/**
41+
* Batching support for {@link PrioritizedEsThreadPoolExecutor}
42+
* Tasks that share the same batching key are batched (see {@link BatchedTask#batchingKey})
43+
*/
44+
public abstract class TaskBatcher {
45+
46+
private final Logger logger;
47+
private final PrioritizedEsThreadPoolExecutor threadExecutor;
48+
// package visible for tests
49+
final Map<Object, LinkedHashSet<BatchedTask>> tasksPerBatchingKey = new HashMap<>();
50+
51+
public TaskBatcher(Logger logger, PrioritizedEsThreadPoolExecutor threadExecutor) {
52+
this.logger = logger;
53+
this.threadExecutor = threadExecutor;
54+
}
55+
56+
public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {
57+
if (tasks.isEmpty()) {
58+
return;
59+
}
60+
final BatchedTask firstTask = tasks.get(0);
61+
assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :
62+
"tasks submitted in a batch should share the same batching key: " + tasks;
63+
// convert to an identity map to check for dups based on task identity
64+
final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(
65+
BatchedTask::getTask,
66+
Function.identity(),
67+
(a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },
68+
IdentityHashMap::new));
69+
70+
synchronized (tasksPerBatchingKey) {
71+
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,
72+
k -> new LinkedHashSet<>(tasks.size()));
73+
for (BatchedTask existing : existingTasks) {
74+
// check that there won't be two tasks with the same identity for the same batching key
75+
BatchedTask duplicateTask = tasksIdentity.get(existing.getTask());
76+
if (duplicateTask != null) {
77+
throw new IllegalStateException("task [" + duplicateTask.describeTasks(
78+
Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");
79+
}
80+
}
81+
existingTasks.addAll(tasks);
82+
}
83+
84+
if (timeout != null) {
85+
threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));
86+
} else {
87+
threadExecutor.execute(firstTask);
88+
}
89+
}
90+
91+
private void onTimeoutInternal(List<? extends BatchedTask> tasks, TimeValue timeout) {
92+
final ArrayList<BatchedTask> toRemove = new ArrayList<>();
93+
for (BatchedTask task : tasks) {
94+
if (task.processed.getAndSet(true) == false) {
95+
logger.debug("task [{}] timed out after [{}]", task.source, timeout);
96+
toRemove.add(task);
97+
}
98+
}
99+
if (toRemove.isEmpty() == false) {
100+
BatchedTask firstTask = toRemove.get(0);
101+
Object batchingKey = firstTask.batchingKey;
102+
assert tasks.stream().allMatch(t -> t.batchingKey == batchingKey) :
103+
"tasks submitted in a batch should share the same batching key: " + tasks;
104+
synchronized (tasksPerBatchingKey) {
105+
LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.get(batchingKey);
106+
if (existingTasks != null) {
107+
existingTasks.removeAll(toRemove);
108+
if (existingTasks.isEmpty()) {
109+
tasksPerBatchingKey.remove(batchingKey);
110+
}
111+
}
112+
}
113+
onTimeout(toRemove, timeout);
114+
}
115+
}
116+
117+
/**
118+
* Action to be implemented by the specific batching implementation.
119+
* All tasks have the same batching key.
120+
*/
121+
protected abstract void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout);
122+
123+
void runIfNotProcessed(BatchedTask updateTask) {
124+
// if this task is already processed, it shouldn't execute other tasks with same batching key that arrived later,
125+
// to give other tasks with different batching key a chance to execute.
126+
if (updateTask.processed.get() == false) {
127+
final List<BatchedTask> toExecute = new ArrayList<>();
128+
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
129+
synchronized (tasksPerBatchingKey) {
130+
LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);
131+
if (pending != null) {
132+
for (BatchedTask task : pending) {
133+
if (task.processed.getAndSet(true) == false) {
134+
logger.trace("will process {}", task);
135+
toExecute.add(task);
136+
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
137+
} else {
138+
logger.trace("skipping {}, already processed", task);
139+
}
140+
}
141+
}
142+
}
143+
144+
if (toExecute.isEmpty() == false) {
145+
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
146+
String tasks = updateTask.describeTasks(entry.getValue());
147+
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
148+
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
149+
150+
run(updateTask.batchingKey, toExecute, tasksSummary);
151+
}
152+
}
153+
}
154+
155+
/**
156+
* Action to be implemented by the specific batching implementation
157+
* All tasks have the given batching key.
158+
*/
159+
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary);
160+
161+
/**
162+
* Represents a runnable task that supports batching.
163+
* Implementors of TaskBatcher can subclass this to add a payload to the task.
164+
*/
165+
protected abstract class BatchedTask extends SourcePrioritizedRunnable {
166+
/**
167+
* whether the task has been processed already
168+
*/
169+
protected final AtomicBoolean processed = new AtomicBoolean();
170+
171+
/**
172+
* the object that is used as batching key
173+
*/
174+
protected final Object batchingKey;
175+
/**
176+
* the task object that is wrapped
177+
*/
178+
protected final Object task;
179+
180+
protected BatchedTask(Priority priority, String source, Object batchingKey, Object task) {
181+
super(priority, source);
182+
this.batchingKey = batchingKey;
183+
this.task = task;
184+
}
185+
186+
@Override
187+
public void run() {
188+
runIfNotProcessed(this);
189+
}
190+
191+
@Override
192+
public String toString() {
193+
String taskDescription = describeTasks(Collections.singletonList(this));
194+
if (taskDescription.isEmpty()) {
195+
return "[" + source + "]";
196+
} else {
197+
return "[" + source + "[" + taskDescription + "]]";
198+
}
199+
}
200+
201+
public abstract String describeTasks(List<? extends BatchedTask> tasks);
202+
203+
public Object getTask() {
204+
return task;
205+
}
206+
}
207+
}

core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.BlockingQueue;
3131
import java.util.concurrent.ExecutorService;
3232
import java.util.concurrent.LinkedTransferQueue;
33+
import java.util.concurrent.ScheduledExecutorService;
3334
import java.util.concurrent.ThreadFactory;
3435
import java.util.concurrent.ThreadPoolExecutor;
3536
import java.util.concurrent.TimeUnit;
@@ -56,8 +57,8 @@ public static int boundedNumberOfProcessors(Settings settings) {
5657
return PROCESSORS_SETTING.get(settings);
5758
}
5859

59-
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder) {
60-
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder);
60+
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
61+
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder, timer);
6162
}
6263

6364
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {

core/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,14 @@
4444
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
4545

4646
private static final TimeValue NO_WAIT_TIME_VALUE = TimeValue.timeValueMillis(0);
47-
private AtomicLong insertionOrder = new AtomicLong();
48-
private Queue<Runnable> current = ConcurrentCollections.newQueue();
47+
private final AtomicLong insertionOrder = new AtomicLong();
48+
private final Queue<Runnable> current = ConcurrentCollections.newQueue();
49+
private final ScheduledExecutorService timer;
4950

50-
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
51+
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
52+
ThreadFactory threadFactory, ThreadContext contextHolder, ScheduledExecutorService timer) {
5153
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
54+
this.timer = timer;
5255
}
5356

5457
public Pending[] getPending() {
@@ -111,7 +114,7 @@ protected void afterExecute(Runnable r, Throwable t) {
111114
current.remove(r);
112115
}
113116

114-
public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
117+
public void execute(Runnable command, final TimeValue timeout, final Runnable timeoutCallback) {
115118
command = wrapRunnable(command);
116119
doExecute(command);
117120
if (timeout.nanos() >= 0) {

0 commit comments

Comments
 (0)