Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;

import java.util.Arrays;
Expand Down Expand Up @@ -79,6 +80,33 @@ public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapa
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
}

/**
* Return a new executor that will automatically adjust the queue size based on queue throughput.
*
* @param size number of fixed threads to use for executing tasks
* @param initialQueueCapacity initial size of the executor queue
* @param minQueueSize minimum queue size that the queue can be adjusted to
* @param maxQueueSize maximum queue size that the queue can be adjusted to
* @param frameSize number of tasks during which stats are collected before adjusting queue size
*/
public static EsThreadPoolExecutor newAutoQueueFixed(String name, int size, int initialQueueCapacity, int minQueueSize,
int maxQueueSize, int frameSize, TimeValue targetedResponseTime,
ThreadFactory threadFactory, ThreadContext contextHolder) {
if (initialQueueCapacity == minQueueSize && initialQueueCapacity == maxQueueSize) {
return newFixed(name, size, initialQueueCapacity, threadFactory, contextHolder);
}

if (initialQueueCapacity <= 0) {
throw new IllegalArgumentException("initial queue capacity for [" + name + "] executor must be positive, got: " +
initialQueueCapacity);
}
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we return a simple EsThreadPoolExecutor if min and max queue size is the same here? I think that would be safer from a bwc perspective?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a BWC perspective it certainly would, though then a user couldn't turn on debug logging to see what kind of calculations we make about the optimal queue size.

I'll make the change though, since it's the most safe thing to do.

queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime, threadFactory,
new EsAbortPolicy(), contextHolder);
}

private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Name used in error reporting.
*/
private final String name;
protected final String name;

EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ResizableBlockingQueue;

import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

/**
* An extension to thread pool executor, which automatically adjusts the queue size of the
* {@code ResizableBlockingQueue} according to Little's Law.
*/
public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {

private static final Logger logger =
ESLoggerFactory.getLogger(QueueResizingEsThreadPoolExecutor.class);

private final Function<Runnable, Runnable> runnableWrapper;
private final ResizableBlockingQueue<Runnable> workQueue;
private final int tasksPerFrame;
private final int minQueueSize;
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
// The amount the queue size is adjusted by for each calcuation
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;

private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);

private long startNs;

QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
ResizableBlockingQueue<Runnable> workQueue, int minQueueSize, int maxQueueSize,
Function<Runnable, Runnable> runnableWrapper, final int tasksPerFrame,
TimeValue targetedResponseTime, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
ThreadContext contextHolder) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler, contextHolder);
this.runnableWrapper = runnableWrapper;
this.workQueue = workQueue;
this.tasksPerFrame = tasksPerFrame;
this.startNs = System.nanoTime();
this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
name, QUEUE_ADJUSTMENT_AMOUNT);
}

@Override
protected void doExecute(final Runnable command) {
// we are submitting a task, it has not yet started running (because super.excute() has not
// been called), but it could be immediately run, or run at a later time. We need the time
// this task entered the queue, which we get by creating a TimedRunnable, which starts the
// clock as soon as it is created.
super.doExecute(this.runnableWrapper.apply(command));
}

/**
* Calculate task rate (λ), for a fixed number of tasks and time it took those tasks to be measured
*
* @param totalNumberOfTasks total number of tasks that were measured
* @param totalFrameFrameNanos nanoseconds during which the tasks were received
* @return the rate of tasks in the system
*/
static double calculateLambda(final int totalNumberOfTasks, final long totalFrameFrameNanos) {
assert totalFrameFrameNanos > 0 : "cannot calculate for instantaneous tasks";
assert totalNumberOfTasks > 0 : "cannot calculate for no tasks";
// There is no set execution time, instead we adjust the time window based on the
// number of completed tasks, so there is no background thread required to update the
// queue size at a regular interval. This means we need to calculate our λ by the
// total runtime, rather than a fixed interval.

// λ = total tasks divided by measurement time
return (double) totalNumberOfTasks / totalFrameFrameNanos;
}

/**
* Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time.
*
* @param lambda the arrival rate of tasks in nanoseconds
* @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests
* @return the optimal queue size for the give task rate and targeted response time
*/
static int calculateL(final double lambda, final long targetedResponseTimeNanos) {
assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests";
// L = λ * W
return Math.toIntExact((long)(lambda * targetedResponseTimeNanos));
}

/**
* Returns the current queue capacity
*/
public int getCurrentCapacity() {
return workQueue.capacity();
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// A task has been completed, it has left the building. We should now be able to get the
// total time as a combination of the time in the queue and time spent running the task. We
// only want runnables that did not throw errors though, because they could be fast-failures
// that throw off our timings, so only check when t is null.
assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
final long taskNanos = ((TimedRunnable) r).getTotalNanos();
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);
if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
final long totalRuntime = endTimeNs - this.startNs;
// Reset the start time for all tasks. At first glance this appears to need to be
// volatile, since we are reading from a different thread when it is set, but it
// is protected by the taskCount memory barrier.
// See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
startNs = endTimeNs;

// Calculate the new desired queue size
try {
final double lambda = calculateLambda(tasksPerFrame, totalNanos);
final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
if (logger.isDebugEnabled()) {
final long avgTaskTime = totalNanos / tasksPerFrame;
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time: [{}], [{} tasks/s], " +
"optimal queue is [{}]",
name,
tasksPerFrame,
TimeValue.timeValueNanos(totalRuntime),
TimeValue.timeValueNanos(avgTaskTime),
String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
desiredQueueSize);
}

final int oldCapacity = workQueue.capacity();

// Adjust the queue size towards the desired capacity using an adjust of
// QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
// values the queue size can have.
final int newCapacity =
workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", name,
newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
oldCapacity, newCapacity);
}
} catch (ArithmeticException e) {
// There was an integer overflow, so just log about it, rather than adjust the queue size
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we detect this rather than catching this exception? I'd feel better about it if we could

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me make sure I understand what you're asking - we switched to using Math.toIntExact, so are you saying we should manually check if the long value is > Integer.MAX_VALUE rather than catching the exception?

"failed to calculate optimal queue size for [{}] thread pool, " +
"total frame time [{}ns], tasks [{}], task execution time [{}ns]",
name, totalRuntime, tasksPerFrame, totalNanos),
e);
} finally {
// Finally, decrement the task count and time back to their starting values. We
// do this at the end so there is no concurrent adjustments happening. We also
// decrement them instead of resetting them back to zero, as resetting them back
// to zero causes operations that came in during the adjustment to be uncounted
int tasks = taskCount.addAndGet(-this.tasksPerFrame);
assert tasks >= 0 : "tasks should never be negative, got: " + tasks;

if (tasks >= this.tasksPerFrame) {
// Start over, because we can potentially reach a "never adjusting" state,
//
// consider the following:
// - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
// - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
// - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
// - Since taskCount will now be incremented forever, it will never be 10 again,
// so there will be no further adjustments
logger.debug("[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", name);
totalTaskNanos.getAndSet(0);
taskCount.getAndSet(0);
startNs = System.nanoTime();
} else {
// Do a regular adjustment
totalTaskNanos.addAndGet(-totalNanos);
}
}
}
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append(getClass().getSimpleName()).append('[');
b.append(name).append(", ");

@SuppressWarnings("rawtypes")
ResizableBlockingQueue queue = (ResizableBlockingQueue) getQueue();

b.append("queue capacity = ").append(getCurrentCapacity()).append(", ");
b.append("min queue capacity = ").append(minQueueSize).append(", ");
b.append("max queue capacity = ").append(maxQueueSize).append(", ");
b.append("frame size = ").append(tasksPerFrame).append(", ");
b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", ");
b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
/*
* ThreadPoolExecutor has some nice information in its toString but we
* can't get at it easily without just getting the toString.
*/
b.append(super.toString()).append(']');
return b.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import java.util.concurrent.BlockingQueue;
import org.elasticsearch.common.SuppressForbidden;

/**
* Extends the {@code SizeBlockingQueue} to add the {@code adjustCapacity} method, which will adjust
* the capacity by a certain amount towards a maximum or minimum.
*/
final class ResizableBlockingQueue<E> extends SizeBlockingQueue<E> {

private volatile int capacity;

ResizableBlockingQueue(BlockingQueue<E> queue, int initialCapacity) {
super(queue, initialCapacity);
this.capacity = initialCapacity;
}

@SuppressForbidden(reason = "optimalCapacity is non-negative, therefore the difference cannot be < -Integer.MAX_VALUE")
private int getChangeAmount(int optimalCapacity) {
assert optimalCapacity >= 0 : "optimal capacity should always be positive, got: " + optimalCapacity;
return Math.abs(optimalCapacity - this.capacity);
}

@Override
public int capacity() {
return this.capacity;
}

@Override
public int remainingCapacity() {
return Math.max(0, this.capacity());
}

/** Resize the limit for the queue, returning the new size limit */
public synchronized int adjustCapacity(int optimalCapacity, int adjustmentAmount, int minCapacity, int maxCapacity) {
assert adjustmentAmount > 0 : "adjustment amount should be a positive value";
assert optimalCapacity >= 0 : "desired capacity cannot be negative";
assert minCapacity >= 0 : "cannot have min capacity smaller than 0";
assert maxCapacity >= minCapacity : "cannot have max capacity smaller than min capacity";

if (optimalCapacity == capacity) {
// Yahtzee!
return this.capacity;
}

if (optimalCapacity > capacity + adjustmentAmount) {
// adjust up
final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
this.capacity = newCapacity;
return newCapacity;
} else if (optimalCapacity < capacity - adjustmentAmount) {
// adjust down
final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
this.capacity = newCapacity;
return newCapacity;
} else {
return this.capacity;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void forcePut(E e) throws InterruptedException {
@Override
public boolean offer(E e) {
int count = size.incrementAndGet();
if (count > capacity) {
if (count > capacity()) {
size.decrementAndGet();
return false;
}
Expand Down Expand Up @@ -168,7 +168,7 @@ public E take() throws InterruptedException {

@Override
public int remainingCapacity() {
return capacity - size.get();
return capacity() - size.get();
}

@Override
Expand Down
Loading