Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common;


import java.util.concurrent.atomic.AtomicLong;

/**
* Implements exponentially weighted moving averages (commonly abbreviated EWMA) for a single value.
* This class is safe to share between threads.
*/
public class ExponentiallyWeightedMovingAverage {

private final double alpha;
private final AtomicLong averageBits;

/**
* Create a new EWMA with a given {@code alpha} and {@code initialAvg}. A smaller alpha means
* that new data points will have less weight, where a high alpha means older data points will
* have a lower influence.
*/
public ExponentiallyWeightedMovingAverage(double alpha, double initialAvg) {
if (alpha < 0 || alpha > 1) {
throw new IllegalArgumentException("alpha must be greater or equal to 0 and less than or equal to 1");
}
this.alpha = alpha;
this.averageBits = new AtomicLong(Double.doubleToLongBits(initialAvg));
}

public double getAverage() {
return Double.longBitsToDouble(this.averageBits.get());
}

public void addValue(double newValue) {
boolean successful = false;
do {
final long currentBits = this.averageBits.get();
final double currentAvg = getAverage();
final double newAvg = (alpha * newValue) + ((1 - alpha) * currentAvg);
final long newBits = Double.doubleToLongBits(newAvg);
successful = averageBits.compareAndSet(currentBits, newBits);
} while (successful == false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.ExponentiallyWeightedMovingAverage;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -43,17 +44,21 @@
*/
public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecutor {

// This is a random starting point alpha. TODO: revisit this with actual testing and/or make it configurable
public static double EWMA_ALPHA = 0.3;

private static final Logger logger =
ESLoggerFactory.getLogger(QueueResizingEsThreadPoolExecutor.class);
// The amount the queue size is adjusted by for each calcuation
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;

private final Function<Runnable, Runnable> runnableWrapper;
private final ResizableBlockingQueue<Runnable> workQueue;
private final int tasksPerFrame;
private final int minQueueSize;
private final int maxQueueSize;
private final long targetedResponseTimeNanos;
// The amount the queue size is adjusted by for each calcuation
private static final int QUEUE_ADJUSTMENT_AMOUNT = 50;
private final ExponentiallyWeightedMovingAverage executionEWMA;

private final AtomicLong totalTaskNanos = new AtomicLong(0);
private final AtomicInteger taskCount = new AtomicInteger(0);
Expand All @@ -74,6 +79,9 @@ public final class QueueResizingEsThreadPoolExecutor extends EsThreadPoolExecuto
this.minQueueSize = minQueueSize;
this.maxQueueSize = maxQueueSize;
this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
// We choose to start the EWMA with the targeted response time, reasoning that it is a
// better start point for a realistic task execution time than starting at 0
this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, targetedResponseTimeNanos);
logger.debug("thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
name, QUEUE_ADJUSTMENT_AMOUNT);
}
Expand Down Expand Up @@ -126,6 +134,13 @@ public int getCurrentCapacity() {
return workQueue.capacity();
}

/**
* Returns the exponentially weighted moving average of the task execution time
*/
public double getTaskExecutionEWMA() {
return executionEWMA.getAverage();
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Expand All @@ -136,6 +151,11 @@ protected void afterExecute(Runnable r, Throwable t) {
assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
final long taskNanos = ((TimedRunnable) r).getTotalNanos();
final long totalNanos = totalTaskNanos.addAndGet(taskNanos);

final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos();
assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
executionEWMA.addValue(taskExecutionNanos);

if (taskCount.incrementAndGet() == this.tasksPerFrame) {
final long endTimeNs = System.nanoTime();
final long totalRuntime = endTimeNs - this.startNs;
Expand All @@ -149,20 +169,22 @@ protected void afterExecute(Runnable r, Throwable t) {
try {
final double lambda = calculateLambda(tasksPerFrame, totalNanos);
final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
final int oldCapacity = workQueue.capacity();

if (logger.isDebugEnabled()) {
final long avgTaskTime = totalNanos / tasksPerFrame;
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time: [{}], [{} tasks/s], " +
"optimal queue is [{}]",
logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
"[{} tasks/s], optimal queue is [{}], current capacity [{}]",
name,
tasksPerFrame,
TimeValue.timeValueNanos(totalRuntime),
TimeValue.timeValueNanos(avgTaskTime),
TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
desiredQueueSize);
desiredQueueSize,
oldCapacity);
}

final int oldCapacity = workQueue.capacity();

// Adjust the queue size towards the desired capacity using an adjust of
// QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
// values the queue size can have.
Expand Down Expand Up @@ -223,6 +245,7 @@ public String toString() {
b.append("max queue capacity = ").append(maxQueueSize).append(", ");
b.append("frame size = ").append(tasksPerFrame).append(", ");
b.append("targeted response rate = ").append(TimeValue.timeValueNanos(targetedResponseTimeNanos)).append(", ");
b.append("task execution EWMA = ").append(TimeValue.timeValueNanos((long)executionEWMA.getAverage())).append(", ");
b.append("adjustment amount = ").append(QUEUE_ADJUSTMENT_AMOUNT).append(", ");
/*
* ThreadPoolExecutor has some nice information in its toString but we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
package org.elasticsearch.common.util.concurrent;

/**
* A class used to wrap a {@code Runnable} that allows capturing the time the task since creation
* through execution.
* A class used to wrap a {@code Runnable} that allows capturing the time of the task since creation
* through execution as well as only execution time.
*/
class TimedRunnable implements Runnable {
private final Runnable original;
private final long creationTimeNanos;
private long startTimeNanos;
private long finishTimeNanos = -1;

TimedRunnable(Runnable original) {
Expand All @@ -36,6 +37,7 @@ class TimedRunnable implements Runnable {
@Override
public void run() {
try {
startTimeNanos = System.nanoTime();
original.run();
} finally {
finishTimeNanos = System.nanoTime();
Expand All @@ -53,4 +55,16 @@ long getTotalNanos() {
}
return finishTimeNanos - creationTimeNanos;
}

/**
* Return the time this task spent being run.
* If the task is still running or has not yet been run, returns -1.
*/
long getTotalExecutionNanos() {
if (startTimeNanos == -1 || finishTimeNanos == -1) {
// There must have been an exception thrown, the total time is unknown (-1)
return -1;
}
return finishTimeNanos - startTimeNanos;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common;

import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;

/**
* Implements exponentially weighted moving averages (commonly abbreviated EWMA) for a single value.
*/
public class ExponentiallyWeightedMovingAverageTests extends ESTestCase {

public void testEWMA() {
final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10);
ewma.addValue(12);
assertThat(ewma.getAverage(), equalTo(11.0));
ewma.addValue(10);
ewma.addValue(15);
ewma.addValue(13);
assertThat(ewma.getAverage(), equalTo(12.875));
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice to have a test that starts with 0 and continually adds the same number and asserts that eventually the average comes near enough to the number. Or something that asserts that the changes get smaller with additional data points.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I've added this test, thanks for the suggestion

public void testInvalidAlpha() {
try {
ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(-0.5, 10);
fail("should have failed to create EWMA");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1"));
}

try {
ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(1.5, 10);
fail("should have failed to create EWMA");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1"));
}
}

public void testConvergingToValue() {
final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10000);
for (int i = 0; i < 100000; i++) {
ewma.addValue(1);
}
assertThat(ewma.getAverage(), lessThan(2.0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,47 @@ public void testAutoQueueSizingWithMax() throws Exception {
context.close();
}

public void testExecutionEWMACalculation() throws Exception {
ThreadContext context = new ThreadContext(Settings.EMPTY);
ResizableBlockingQueue<Runnable> queue =
new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(),
100);

QueueResizingEsThreadPoolExecutor executor =
new QueueResizingEsThreadPoolExecutor(
"test-threadpool", 1, 1, 1000,
TimeUnit.MILLISECONDS, queue, 10, 200, fastWrapper(), 10, TimeValue.timeValueMillis(1),
EsExecutors.daemonThreadFactory("queuetest"), new EsAbortPolicy(), context);
executor.prestartAllCoreThreads();
logger.info("--> executor: {}", executor);

assertThat((long)executor.getTaskExecutionEWMA(), equalTo(1000000L));
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(700030L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(490050L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(343065L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(240175L));
});
executeTask(executor, 1);
assertBusy(() -> {
assertThat((long)executor.getTaskExecutionEWMA(), equalTo(168153L));
});

executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
context.close();
}

private Function<Runnable, Runnable> randomBetweenLimitsWrapper(final int minNs, final int maxNs) {
return (runnable) -> {
return new SettableTimedRunnable(randomIntBetween(minNs, maxNs));
Expand Down Expand Up @@ -222,5 +263,10 @@ public SettableTimedRunnable(long timeTaken) {
public long getTotalNanos() {
return timeTaken;
}

@Override
public long getTotalExecutionNanos() {
return timeTaken;
}
}
}