Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
*
*/
public class EsExecutors {

/**
Expand All @@ -62,16 +58,11 @@ public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name,

public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder);
queue.executor = executor;
return executor;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment about using the ForceQueuePolicy capturing our research? will be a shame to have to do it again :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How I wish I wrote down what we found out at the time. Now that I look at the ExecutorScalingQueue, I see:

@Override
        public boolean offer(E e) {
            if (!tryTransfer(e)) {
                int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); <-- **** 
                if (left > 0) {
                    return false;
                } else {
                    return super.offer(e);
                }
            } else {
                return true;
            }
        }

The place I marked with *** feels weird as it effectively returns false all the time, which is fine due to the fact that we want to prefer adding threads first and then, if it fails queue. Which is why we need the rejection policy. My guess is that this there for the case where people make a "fixed" thread pool out of a scaling one but having min==max (feels like an unneeded optimization to me, btw).

Do you agree, and if so, can we add a comment documenting?

Copy link
Member Author

@jasontedor jasontedor Apr 23, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you agree, and if so, can we add a comment documenting?

Yes, your explanation is basically correct, except I don't agree with the part about it being there for the case when people make a fixed thread pool out of a scaling one. It's there for the opposite case, when there could be spare capacity in the thread pool. It's because the JVM prefers queueing to creating new worker threads when the thread pool has reached core pool size that this is needed.

I added comments explaining the behavior.

public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy(), contextHolder);
}

public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) {
BlockingQueue<Runnable> queue;
if (queueCapacity < 0) {
Expand Down Expand Up @@ -114,6 +105,7 @@ public static ThreadFactory daemonThreadFactory(String namePrefix) {
}

static class EsThreadFactory implements ThreadFactory {

final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
Expand All @@ -133,6 +125,7 @@ public Thread newThread(Runnable r) {
t.setDaemon(true);
return t;
}

}

/**
Expand All @@ -141,7 +134,6 @@ public Thread newThread(Runnable r) {
private EsExecutors() {
}


static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {

ThreadPoolExecutor executor;
Expand All @@ -151,9 +143,17 @@ public ExecutorScalingQueue() {

@Override
public boolean offer(E e) {
// first try to transfer to a waiting worker thread
if (!tryTransfer(e)) {
// check if there might be spare capacity in the thread
// pool executor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is what is happening here. AFAICS the expression executor.getMaximumPoolSize() - executor.getCorePoolSize() is fixed once we have constructed the threadpool and only changed by setCorePoolSize() and setMaximumPoolSize().

What I think this code effectively does is to behave like a fixed size thread pool with unbounded queue in case max pool size is configured to be equal to core pool size (which is what @bleskes was essentially saying, I guess). If that's the case, I also concur with @bleskes that we should get rid of this case distinction. This means that we should require min < max for scaling threadpool and otherwise require to explicitly specify fixed thread pool.

Copy link
Member Author

@jasontedor jasontedor Apr 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICS the expression executor.getMaximumPoolSize() - executor.getCorePoolSize() is fixed once we have constructed the threadpool

@ywelsch This is correct and was already taken into consideration in my assessment of the situation.

From java.util.concurrent.ThreadPoolExecutor#execute:

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);

The first thing that the ThreadPoolExecutor does is checks if there are fewer than core pool size threads. If there are, it tries to add a new worker to handle the submitted task. If this fails (it's racy!), ThreadPoolExecutor then tries to place the task on the queue. This is where we turn to the ExecutorScalingQueue:

        public boolean offer(E e) {
            if (!tryTransfer(e)) {
                int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
                if (left > 0) {
                    return false;
                } else {
                    return super.offer(e);
                }
            } else {
                return true;
            }
        }

We first try to transfer the task to an available worker thread. If this fails, we come to the code in question. We check if there might be spare capacity in the thread pool, which can only be the case if the thread pool was configured with max pool size greater than core pool size. If there might be spare capacity, we return false. This causes the ThreadPoolExecutor to create a new worker thread, even if core pool size threads are already running. Again, the ThreadPoolExecutor prefers queueing to creating new workers when there are core pool size threads running, and will only resort to creating a new thread above core pool size threads if the queue is saturated which we faked by returning false. If this fails (again, it's racy), ThreadPoolExecutor will reject the task. This is where the ForceQueuePolicy rejection handler comes into play to give us a chance to just drop this on the queue.

I continue to disagree with your and @bleskes assessment of the situation. It is not an optimization, it is a trick for the common case to make the ThreadPoolExecutor spin up threads all the way up to max pool size instead of queuing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we're just missing each other's point. I wasn't arguing for getting rid of ExecutorScalingQueue. I just think that a scaling threadpool should be always defined with min < max. This would mean that the condition if (left > 0) { can be removed and we end up with:

public boolean offer(E e) {
    return tryTransfer(e);
}

Copy link
Member Author

@jasontedor jasontedor Apr 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just think that a scaling threadpool should be always defined with min < max.

I'm not sure that this is the right thing to do. Consider the refresh thread pool with its default settings of min = 1 and max = half the number of processors (with a ceiling of 10); on a two-processor system this will have min = 1, max = 1 and maybe that's the right thing? Either way, we should address this in a follow-up.

int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
if (left > 0) {
// reject queuing the task to force the thread pool
// executor to add a worker if it can; combined
// with ForceQueuePolicy, this causes the thread
// pool to always scale up to max pool size and we
// only queue when there is no spare capacity
return false;
} else {
return super.offer(e);
Expand All @@ -162,6 +162,7 @@ public boolean offer(E e) {
return true;
}
}

}

/**
Expand All @@ -184,4 +185,5 @@ public long rejected() {
return 0;
}
}

}
180 changes: 108 additions & 72 deletions core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
//Have very low pool and queue sizes to overwhelm internal pools easily
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("threadpool.generic.size", 1)
.put("threadpool.generic.queue_size", 1)
.put("threadpool.generic.max", 4)
// don't mess with this one! It's quite sensitive to a low queue size
// (see also ThreadedActionListener which is happily spawning threads even when we already got rejected)
//.put("threadpool.listener.queue_size", 1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.threadpool;

import org.elasticsearch.test.ESTestCase;

import java.util.Map;
import java.util.stream.Collectors;

public abstract class ESThreadPoolTestCase extends ESTestCase {

protected final ThreadPool.Info info(final ThreadPool threadPool, final String name) {
for (final ThreadPool.Info info : threadPool.info()) {
if (info.getName().equals(name)) {
return info;
}
}
throw new IllegalArgumentException(name);
}

protected final ThreadPoolStats.Stats stats(final ThreadPool threadPool, final String name) {
for (final ThreadPoolStats.Stats stats : threadPool.stats()) {
if (name.equals(stats.getName())) {
return stats;
}
}
throw new IllegalArgumentException(name);
}

protected final void terminateThreadPoolIfNeeded(final ThreadPool threadPool) throws InterruptedException {
if (threadPool != null) {
terminate(threadPool);
}
}

static String randomThreadPool(final ThreadPool.ThreadPoolType type) {
return randomFrom(
ThreadPool.THREAD_POOL_TYPES
.entrySet().stream()
.filter(t -> t.getValue().equals(type))
.map(Map.Entry::getKey)
.collect(Collectors.toList()));
}

}
Loading