
Commit d09e643

Add ability to automatically adjust search threadpool queue_size
This PR adds a new thread pool type: `fixed_auto_queue_size`. This thread pool behaves like a regular `fixed` threadpool, except that after every `auto_queue_frame_size` operations (default: 10,000) in the thread pool, [Little's Law](https://en.wikipedia.org/wiki/Little's_law) is evaluated and used to adjust the pool's `queue_size` either up or down by 50. A configured minimum and maximum are also respected; when the min and max are the same value, a regular fixed executor is used instead. The `SEARCH` threadpool is changed to use this new type of thread pool, but its min and max are both set to 1000, meaning auto adjustment is opt-in rather than opt-out. Resolves #3890
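For orientation, the per-frame adjustment is a direct application of Little's Law (L = λ × W): measure the task arrival rate λ over a frame, multiply by the targeted response time W to get the "optimal" queue length, and step the current capacity toward it by 50, clamped to the configured min/max. The following standalone Java sketch condenses that calculation with simplified names; it is not the committed code, which lives in QueueResizingEsThreadPoolExecutor and ResizableBlockingQueue below.

// Minimal sketch of the per-frame adjustment, with simplified (hypothetical) names;
// the committed logic is in QueueResizingEsThreadPoolExecutor and ResizableBlockingQueue.
final class QueueSizeSketch {
    static int nextCapacity(int tasksInFrame, long frameNanos, long targetResponseNanos,
                            int currentCapacity, int minCapacity, int maxCapacity) {
        // Little's Law: L = lambda * W
        double lambda = (double) tasksInFrame / frameNanos;            // arrival rate per nanosecond
        int optimal = Math.toIntExact((long) (lambda * targetResponseNanos));
        // Step toward the optimal size by at most 50, staying within [min, max]
        if (optimal > currentCapacity + 50) {
            return Math.min(maxCapacity, currentCapacity + 50);
        } else if (optimal < currentCapacity - 50) {
            return Math.max(minCapacity, currentCapacity - 50);
        }
        return currentCapacity;
    }

    public static void main(String[] args) {
        // 10,000 tasks arrived over 2 seconds with a 1s target response time:
        // lambda = 5,000 tasks/s, the optimal queue is ~5,000, so the capacity
        // steps up from 1,000 to 1,050 this frame.
        System.out.println(nextCapacity(10_000, 2_000_000_000L, 1_000_000_000L, 1_000, 10, 2_000));
    }
}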

File tree

14 files changed: +955 -17 lines changed
core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

Lines changed: 28 additions & 0 deletions
@@ -22,6 +22,7 @@
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.node.Node;
 
 import java.util.Arrays;
@@ -79,6 +80,33 @@ public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapa
         return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
     }
 
+    /**
+     * Return a new executor that will automatically adjust the queue size based on queue throughput.
+     *
+     * @param size number of fixed threads to use for executing tasks
+     * @param initialQueueCapacity initial size of the executor queue
+     * @param minQueueSize minimum queue size that the queue can be adjusted to
+     * @param maxQueueSize maximum queue size that the queue can be adjusted to
+     * @param frameSize number of tasks during which stats are collected before adjusting queue size
+     */
+    public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
+                                                         int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
+                                                         ThreadFactory threadFactory, ThreadContext contextHolder) {
+        if (initialQueueCapacity == minQueueSize && initialQueueCapacity == maxQueueSize) {
+            return newFixed(name, size, initialQueueCapacity, threadFactory, contextHolder);
+        }
+
+        if (initialQueueCapacity <= 0) {
+            throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
+                    initialQueueCapacity);
+        }
+        ResizableBlockingQueue<Runnable> queue =
+                new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
+        return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
+                queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
+                new EsAbortPolicy(), contextHolder);
+    }
+
     private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService() {
 
         @Override
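For reference, constructing one of these executors directly might look like the following sketch. The pool name, thread count, queue sizes, response-time target, thread factory, and ThreadContext here are illustrative assumptions, not values taken from this commit.

// Hypothetical usage sketch; all values and helper choices below are illustrative assumptions.
ThreadContext contextHolder = new ThreadContext(Settings.EMPTY);
EsThreadPoolExecutor executor = EsExecutors.newAutoQueueFixed(
        "search",                              // pool name
        4,                                     // fixed number of threads
        1000,                                  // initial queue capacity
        10,                                    // minimum queue size
        2000,                                  // maximum queue size
        10_000,                                // frame size (tasks per measurement window)
        TimeValue.timeValueSeconds(1),         // targeted average response time
        EsExecutors.daemonThreadFactory("search"),
        contextHolder);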

core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
     /**
      * Name used in error reporting.
      */
-    private final String name;
+    protected final String name;
 
     EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
core/src/main/java/org/elasticsearch/common/util/concurrent/QueueResizingEsThreadPoolExecutor.java

Lines changed: 234 additions & 0 deletions

@@ -0,0 +1,234 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.logging.ESLoggerFactory;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ResizableBlockingQueue;
+
+import java.util.Locale;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+/**
+ * An extension to thread pool executor, which automatically adjusts the queue size of the
+ * {@code ResizableBlockingQueue} according to Little's Law.
+ */
+public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {
+
+    private static final Logger logger =
+        ESLoggerFactory.getLogger(QueueResizingEsThreadPoolExecutor.class);
+
+    private final Function<Runnable, Runnable> runnableWrapper;
+    private final ResizableBlockingQueue<Runnable> workQueue;
+    private final int tasksPerFrame;
+    private final int minQueueSize;
+    private final int maxQueueSize;
+    private final long targetedResponseTimeNanos;
+    // The amount the queue size is adjusted by for each calculation
+    private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;
+
+    private final AtomicLong totalTaskNanos = new AtomicLong(0);
+    private final AtomicInteger taskCount = new AtomicInteger(0);
+
+    private long startNs;
+
+    QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+                                      ResizableBlockingQueue<Runnable> workQueue, int minQueueSize, int maxQueueSize,
+                                      Function<Runnable, Runnable> runnableWrapper, final int tasksPerFrame,
+                                      TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
+                                      ThreadContext contextHolder) {
+        super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
+              workQueue, threadFactory, handler, contextHolder);
+        this.runnableWrapper = runnableWrapper;
+        this.workQueue = workQueue;
+        this.tasksPerFrame = tasksPerFrame;
+        this.startNs = System.nanoTime();
+        this.minQueueSize = minQueueSize;
+        this.maxQueueSize = maxQueueSize;
+        this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
+        logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
+                name, QUEUE_ADJUSTMENT_AMOUNT);
+    }
+
+    @Override
+    protected void doExecute(final Runnable command) {
+        // we are submitting a task, it has not yet started running (because super.execute() has not
+        // been called), but it could be immediately run, or run at a later time. We need the time
+        // this task entered the queue, which we get by creating a TimedRunnable, which starts the
+        // clock as soon as it is created.
+        super.doExecute(this.runnableWrapper.apply(command));
+    }
+
+    /**
+     * Calculate task rate (λ), for a fixed number of tasks and the time it took those tasks to be measured
+     *
+     * @param totalNumberOfTasks total number of tasks that were measured
+     * @param totalFrameFrameNanos nanoseconds during which the tasks were received
+     * @return the rate of tasks in the system
+     */
+    static double calculateLambda(final int totalNumberOfTasks, final long totalFrameFrameNanos) {
+        assert totalFrameFrameNanos > 0 : "cannot calculate for instantaneous tasks";
+        assert totalNumberOfTasks > 0 : "cannot calculate for no tasks";
+        // There is no set execution time, instead we adjust the time window based on the
+        // number of completed tasks, so there is no background thread required to update the
+        // queue size at a regular interval. This means we need to calculate our λ by the
+        // total runtime, rather than a fixed interval.
+
+        // λ = total tasks divided by measurement time
+        return (double) totalNumberOfTasks / totalFrameFrameNanos;
+    }
+
+    /**
+     * Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time.
+     *
+     * @param lambda the arrival rate of tasks in nanoseconds
+     * @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests
+     * @return the optimal queue size for the given task rate and targeted response time
+     */
+    static int calculateL(final double lambda, final long targetedResponseTimeNanos) {
+        assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests";
+        // L = λ * W
+        return Math.toIntExact((long) (lambda * targetedResponseTimeNanos));
+    }
+
+    /**
+     * Returns the current queue capacity
+     */
+    public int getCurrentCapacity() {
+        return workQueue.capacity();
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+        // A task has been completed, it has left the building. We should now be able to get the
+        // total time as a combination of the time in the queue and time spent running the task. We
+        // only want runnables that did not throw errors though, because they could be fast-failures
+        // that throw off our timings, so only check when t is null.
+        assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
+        final long taskNanos = ((TimedRunnable) r).getTotalNanos();
+        final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
+        if (taskCount.incrementAndGet() == this.tasksPerFrame) {
+            final long endTimeNs = System.nanoTime();
+            final long totalRuntime = endTimeNs - this.startNs;
+            // Reset the start time for all tasks. At first glance this appears to need to be
+            // volatile, since we are reading from a different thread when it is set, but it
+            // is protected by the taskCount memory barrier.
+            // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
+            startNs = endTimeNs;
+
+            // Calculate the new desired queue size
+            try {
+                final double lambda = calculateLambda(tasksPerFrame, totalNanos);
+                final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
+                if (logger.isDebugEnabled()) {
+                    final long avgTaskTime = totalNanos / tasksPerFrame;
+                    logger.debug("[{}]: there were [{}] tasks in [{}], avg task time: [{}], [{} tasks/s], " +
+                                    "optimal queue is [{}]",
+                            name,
+                            tasksPerFrame,
+                            TimeValue.timeValueNanos(totalRuntime),
+                            TimeValue.timeValueNanos(avgTaskTime),
+                            String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
+                            desiredQueueSize);
+                }
+
+                final int oldCapacity = workQueue.capacity();
+
+                // Adjust the queue size towards the desired capacity using an adjustment of
+                // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
+                // values the queue size can have.
+                final int newCapacity =
+                        workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
+                if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
+                    logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", name,
+                            newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
+                            oldCapacity, newCapacity);
+                }
+            } catch (ArithmeticException e) {
+                // There was an integer overflow, so just log about it, rather than adjust the queue size
+                logger.warn((Supplier<?>) () -> new ParameterizedMessage(
+                                "failed to calculate optimal queue size for [{}] thread pool, " +
+                                        "total frame time [{}ns], tasks [{}], task execution time [{}ns]",
+                                name, totalRuntime, tasksPerFrame, totalNanos),
+                        e);
+            } finally {
+                // Finally, decrement the task count and time back to their starting values. We
+                // do this at the end so there are no concurrent adjustments happening. We also
+                // decrement them instead of resetting them back to zero, as resetting them back
+                // to zero causes operations that came in during the adjustment to be uncounted
+                int tasks = taskCount.addAndGet(-this.tasksPerFrame);
+                assert tasks >= 0 : "tasks should never be negative, got: " + tasks;
+
+                if (tasks >= this.tasksPerFrame) {
+                    // Start over, because we can potentially reach a "never adjusting" state,
+                    //
+                    // consider the following:
+                    // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
+                    // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
+                    // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
+                    // - Since taskCount will now be incremented forever, it will never be 10 again,
+                    //   so there will be no further adjustments
+                    logger.debug("[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", name);
+                    totalTaskNanos.getAndSet(0);
+                    taskCount.getAndSet(0);
+                    startNs = System.nanoTime();
+                } else {
+                    // Do a regular adjustment
+                    totalTaskNanos.addAndGet(-totalNanos);
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append(getClass().getSimpleName()).append('[');
+        b.append(name).append(", ");
+
+        @SuppressWarnings("rawtypes")
+        ResizableBlockingQueue queue = (ResizableBlockingQueue) getQueue();
+
+        b.append("queue capacity = ").append(getCurrentCapacity()).append(", ");
+        b.append("min queue capacity = ").append(minQueueSize).append(", ");
+        b.append("max queue capacity = ").append(maxQueueSize).append(", ");
+        b.append("frame size = ").append(tasksPerFrame).append(", ");
+        b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", ");
+        b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
+        /*
+         * ThreadPoolExecutor has some nice information in its toString but we
+         * can't get at it easily without just getting the toString.
+         */
+        b.append(super.toString()).append(']');
+        return b.toString();
+    }
+}
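Because calculateLambda and calculateL are package-private static helpers, they are easy to check in isolation. Below is a hypothetical sanity test, not among this commit's test files; it assumes it lives in the org.elasticsearch.common.util.concurrent package and in a JUnit-style test class.

// Hypothetical sanity check for the Little's Law helpers (assumes same-package, JUnit-style test).
public void testOptimalQueueSizeFromLittlesLaw() {
    // 100 tasks measured over one second -> lambda = 100 tasks/s (expressed per nanosecond here)
    double lambda = QueueResizingEsThreadPoolExecutor.calculateLambda(100, TimeValue.timeValueSeconds(1).getNanos());
    // With a 100ms targeted response time, L = lambda * W = 10
    int optimalQueueSize = QueueResizingEsThreadPoolExecutor.calculateL(lambda, TimeValue.timeValueMillis(100).getNanos());
    assertEquals(10, optimalQueueSize);
}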
core/src/main/java/org/elasticsearch/common/util/concurrent/ResizableBlockingQueue.java

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.util.concurrent;
+
+import java.util.concurrent.BlockingQueue;
+import org.elasticsearch.common.SuppressForbidden;
+
+/**
+ * Extends the {@code SizeBlockingQueue} to add the {@code adjustCapacity} method, which will adjust
+ * the capacity by a certain amount towards a maximum or minimum.
+ */
+final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> {
+
+    private volatile int capacity;
+
+    ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) {
+        super(queue, initialCapacity);
+        this.capacity = initialCapacity;
+    }
+
+    @SuppressForbidden(reason = "optimalCapacity is non-negative, therefore the difference cannot be < -Integer.MAX_VALUE")
+    private int getChangeAmount(int optimalCapacity) {
+        assert optimalCapacity >= 0 : "optimal capacity should always be positive, got: " + optimalCapacity;
+        return Math.abs(optimalCapacity - this.capacity);
+    }
+
+    @Override
+    public int capacity() {
+        return this.capacity;
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return Math.max(0, this.capacity());
+    }
+
+    /** Resize the limit for the queue, returning the new size limit */
+    public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) {
+        assert adjustmentAmount > 0 : "adjustment amount should be a positive value";
+        assert optimalCapacity >= 0 : "desired capacity cannot be negative";
+        assert minCapacity >= 0 : "cannot have min capacity smaller than 0";
+        assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity";
+
+        if (optimalCapacity == capacity) {
+            // Yahtzee!
+            return this.capacity;
+        }
+
+        if (optimalCapacity > capacity + adjustmentAmount) {
+            // adjust up
+            final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
+            this.capacity = newCapacity;
+            return newCapacity;
+        } else if (optimalCapacity < capacity - adjustmentAmount) {
+            // adjust down
+            final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
+            this.capacity = newCapacity;
+            return newCapacity;
+        } else {
+            return this.capacity;
+        }
+    }
+}
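The clamped stepping behaviour of adjustCapacity can be illustrated with a short, hypothetical same-package sketch; note how the capacity only ever moves by the adjustment amount per call and stays within the min/max bounds.

// Hypothetical same-package sketch; the wrapped queue comes from ConcurrentCollections,
// as in EsExecutors.newAutoQueueFixed above.
ResizableBlockingQueue<Runnable> queue =
        new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), 1000);

// Desired size far above the current capacity: move up by only the adjustment amount (50).
int capacity = queue.adjustCapacity(5000, 50, 10, 2000);   // 1000 -> 1050

// Desired size far below the current capacity: move down by 50, never below the minimum.
capacity = queue.adjustCapacity(10, 50, 10, 2000);         // 1050 -> 1000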

core/src/main/java/org/elasticsearch/common/util/concurrent/SizeBlockingQueue.java

Lines changed: 2 additions & 2 deletions
@@ -131,7 +131,7 @@ public void forcePut(E e) throws InterruptedException {
     @Override
     public boolean offer(E e) {
         int count = size.incrementAndGet();
-        if (count > capacity) {
+        if (count > capacity()) {
            size.decrementAndGet();
            return false;
        }
@@ -168,7 +168,7 @@ public E take() throws InterruptedException {
 
     @Override
     public int remainingCapacity() {
-        return capacity - size.get();
+        return capacity() - size.get();
     }
 
     @Override
