From 2c3d4b76bdeba3da1338f3de3704d1ac255c88c8 Mon Sep 17 00:00:00 2001
From: Prabhakar Sithanandam
Date: Mon, 4 Mar 2019 11:16:01 +0000
Subject: [PATCH] Introducing a new snapshot segments threadpool to upload
 segments of shards in parallel.

Current Behavior: The maximum size of the snapshot thread pool is half the
number of processors, capped at 5. Each snapshot thread works on snapshotting
one shard, so at any point in time at most 5 shards are uploaded in parallel.
This works well when the number of shards to be snapshotted is greater than 5,
but when there are fewer shards the snapshot threads are underutilized. The
problem is most visible towards the end of a snapshot, when the snapshot is
complete for most of the shards and still in progress for 1 or 2 shards.

Proposed Behavior: Introduce a new prioritized thread pool that uploads the
segments of various shards in parallel while prioritizing the segments of one
shard at a time. The snapshot processing capacity is thus fully utilized
throughout the entire span of the snapshot. Prioritizing one shard also
results in releasing the shard lock (held by the snapshot) sooner.
---
 .../AbstractPrioritizedRunnable.java          |  43 ++++
 .../common/util/concurrent/EsExecutors.java   |  64 +++++-
 .../common/util/concurrent/ThreadContext.java |  80 +++++++
 .../repositories/FilterRepository.java        |  10 +
 .../repositories/Repository.java              |  27 +++
 .../blobstore/BlobStoreRepository.java        | 121 +++++++++--
 .../rest/action/cat/RestThreadPoolAction.java |   3 +-
 .../snapshots/SnapshotShardsService.java      |  14 +-
 .../PrioritizedScalingExecutorBuilder.java    | 141 +++++++++++++
 .../elasticsearch/threadpool/ThreadPool.java  |  14 +-
 .../AbstractPrioritizedRunnableTests.java     | 182 ++++++++++++++++
 .../util/concurrent/EsExecutorsTests.java     | 140 +++++++++++++
 .../AbstractSnapshotIntegTestCase.java        |  10 +
 .../MinThreadsSnapshotRestoreIT.java          |  44 ++++
 .../snapshots/mockstore/MockRepository.java   |   6 +-
 .../PrioritizedScalingThreadPoolTests.java    | 195 ++++++++++++++++++
 .../coordination/DeterministicTaskQueue.java  |   4 +
 17 files changed, 1077 insertions(+), 21 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnable.java
 create mode 100644 server/src/main/java/org/elasticsearch/threadpool/PrioritizedScalingExecutorBuilder.java
 create mode 100644 server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnableTests.java
 create mode 100644 server/src/test/java/org/elasticsearch/threadpool/PrioritizedScalingThreadPoolTests.java

diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnable.java
new file mode 100644
index 0000000000000..bdef7f9f6bb6b
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+/**
+ * Wrapper over {@link AbstractRunnable} which provides priority.
+ * <p>
+ * Priority follows natural ordering:
+ * 0 has higher priority than 1, 1 has higher priority than 2, and so on.
+ */
+public abstract class AbstractPrioritizedRunnable extends AbstractRunnable implements Comparable<AbstractPrioritizedRunnable> {
+
+    private final Long priority;
+
+    protected AbstractPrioritizedRunnable(long priority) {
+        this.priority = priority;
+    }
+
+    public Long getPriority() {
+        return priority;
+    }
+
+    public int compareTo(AbstractPrioritizedRunnable other) {
+        return this.priority.compareTo(other.priority);
+    }
+
+}
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
index 29cd7f6682a64..2ec13f7c9b358 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
@@ -36,12 +36,14 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 public class EsExecutors {
@@ -79,6 +81,16 @@ public static EsThreadPoolExecutor newScaling(String name, int min, int max, lon
         return executor;
     }
 
+    public static EsThreadPoolExecutor newPrioritizedScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit,
+                                                              ThreadFactory threadFactory, ThreadContext contextHolder) {
+        ExecutorPrioritizedScalingQueue<Runnable> queue = new ExecutorPrioritizedScalingQueue<>();
+        EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory,
+            new ForceQueuePolicy(), contextHolder);
+        queue.executor = executor;
+        return executor;
+    }
+
     public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
                                                 ThreadFactory threadFactory, ThreadContext contextHolder) {
         BlockingQueue<Runnable> queue;
@@ -307,6 +319,55 @@ public boolean offer(E e) {
 
     }
 
+    static class ExecutorPrioritizedScalingQueue<E> extends PriorityBlockingQueue<E> {
+
+        private final ReentrantLock lock;
+        ThreadPoolExecutor executor;
+
+        ExecutorPrioritizedScalingQueue() {
+            lock = new ReentrantLock(true);
+        }
+
+        @Override
+        public boolean offer(E e) {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                // check if any thread is idle in the pool
+                int notActive = executor.getPoolSize() - executor.getActiveCount();
+                if (notActive > 0) {
+                    // there are idle threads in the pool; add the task to the queue so that one of them picks it up
+                    return super.offer(e);
+                }
+                // check if there might be spare capacity in the thread pool executor
+                int left = executor.getMaximumPoolSize() - executor.getPoolSize();
+                if (left > 0) {
+                    // reject queuing the task to force the thread pool executor to add a worker if it can; combined
+                    // with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size and we
+                    // only queue when there is no spare capacity
+                    return false;
+                } else {
+                    return super.offer(e);
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        @Override
+        public void put(E e) {
+            final ReentrantLock lock = this.lock;
+            lock.lock();
+            try {
+                // this is only called from ForceQueuePolicy, so add directly to the parent queue
+                super.offer(e);
+            } finally {
+                lock.unlock();
+            }
+        }
+
+    }
+
     /**
      * A handler for rejected tasks that adds the specified element to this queue,
      * waiting if necessary for space to become available.
@@ -317,7 +378,8 @@ static class ForceQueuePolicy implements XRejectedExecutionHandler {
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
             try {
                 // force queue policy should only be used with a scaling queue
-                assert executor.getQueue() instanceof ExecutorScalingQueue;
+                assert (executor.getQueue() instanceof ExecutorScalingQueue) ||
+                    (executor.getQueue() instanceof ExecutorPrioritizedScalingQueue);
                 executor.getQueue().put(r);
             } catch (final InterruptedException e) {
                 // a scaling queue never blocks so a put to it can never be interrupted
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
index 0fa0e832a0a2b..afe6b5f4acd5a 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
@@ -345,12 +345,18 @@ public void addResponseHeader(final String key, final String value, final Functi
      * command has already been passed through this method then it is returned unaltered rather than wrapped twice.
      */
     public Runnable preserveContext(Runnable command) {
+        if (command instanceof ContextPreservingAbstractPrioritizedRunnable) {
+            return command;
+        }
         if (command instanceof ContextPreservingAbstractRunnable) {
             return command;
         }
         if (command instanceof ContextPreservingRunnable) {
             return command;
         }
+        if (command instanceof AbstractPrioritizedRunnable) {
+            return new ContextPreservingAbstractPrioritizedRunnable((AbstractPrioritizedRunnable) command);
+        }
         if (command instanceof AbstractRunnable) {
             return new ContextPreservingAbstractRunnable((AbstractRunnable) command);
         }
@@ -361,6 +367,9 @@ public Runnable preserveContext(Runnable command) {
      * Unwraps a command that was previously wrapped by {@link #preserveContext(Runnable)}.
      */
     public Runnable unwrap(Runnable command) {
+        if (command instanceof ContextPreservingAbstractPrioritizedRunnable) {
+            return ((ContextPreservingAbstractPrioritizedRunnable) command).unwrap();
+        }
         if (command instanceof WrappedRunnable) {
             return ((WrappedRunnable) command).unwrap();
         }
@@ -771,6 +780,77 @@ public AbstractRunnable unwrap() {
         }
     }
 
+    /**
+     * Wraps an AbstractPrioritizedRunnable to preserve the thread context.
+     */
+    private class ContextPreservingAbstractPrioritizedRunnable extends AbstractPrioritizedRunnable {
+        private final AbstractPrioritizedRunnable in;
+        private final ThreadContext.StoredContext creatorsContext;
+
+        private ThreadContext.StoredContext threadsOriginalContext = null;
+
+        private ContextPreservingAbstractPrioritizedRunnable(AbstractPrioritizedRunnable in) {
+            super(in.getPriority());
+            creatorsContext = newStoredContext(false);
+            this.in = in;
+        }
+
+        @Override
+        public boolean isForceExecution() {
+            return in.isForceExecution();
+        }
+
+        @Override
+        public void onAfter() {
+            try {
+                in.onAfter();
+            } finally {
+                if (threadsOriginalContext != null) {
+                    threadsOriginalContext.restore();
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            in.onFailure(e);
+        }
+
+        @Override
+        public void onRejection(Exception e) {
+            in.onRejection(e);
+        }
+
+        @Override
+        protected void doRun() throws Exception {
+            boolean whileRunning = false;
+            threadsOriginalContext = stashContext();
+            try {
+                creatorsContext.restore();
+                whileRunning = true;
+                in.doRun();
+                whileRunning = false;
+            } catch (IllegalStateException ex) {
+                if (whileRunning || threadLocal.closed.get() == false) {
+                    throw ex;
+                }
+                // if we hit an ISE here we have been shutting down
+                // this comes from the threadcontext and barfs if
+                // our threadpool has been shutting down
+            }
+        }
+
+        @Override
+        public String toString() {
+            return in.toString();
+        }
+
+        public AbstractPrioritizedRunnable unwrap() {
+            return in;
+        }
+
+    }
+
     private static final Collector<String, Set<String>, Set<String>> LINKED_HASH_SET_COLLECTOR = new LinkedHashSetCollector<>();
 
     private static class LinkedHashSetCollector<T> implements Collector<T, Set<T>, Set<T>> {
diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
index 4e8e9b6c7f569..fe33d73009b0b 100644
--- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
@@ -37,6 +37,9 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class FilterRepository implements Repository {
 
@@ -124,6 +127,13 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
         in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
     }
 
+    @Override
+    public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+                              IndexShardSnapshotStatus snapshotStatus, Optional<AtomicInteger> priorityGenerator,
+                              Optional<Executor> executor) {
+        in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, priorityGenerator, executor);
+    }
+
     @Override
     public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
                              RecoveryState recoveryState) {
diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java
index 1ca6f5e148510..c2b128d3762cf 100644
--- a/server/src/main/java/org/elasticsearch/repositories/Repository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java
@@ -36,6 +36,9 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
 /**
@@ -204,6 +207,30 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
     void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
                        IndexShardSnapshotStatus snapshotStatus);
 
+    /**
+     * Creates a snapshot of the shard based on the index commit point.
+     * <p>
+     * The index commit point can be obtained by using the {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
+     * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
+     * <p>
+     * As the snapshot process progresses, implementations of this method should update the {@link IndexShardSnapshotStatus} object and
+     * check {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
+     * @param shard               shard to be snapshotted
+     * @param store               store to be snapshotted
+     * @param snapshotId          snapshot id
+     * @param indexId             id for the index being snapshotted
+     * @param snapshotIndexCommit commit point
+     * @param snapshotStatus      snapshot status
+     * @param priorityGenerator   priority generator for this shard
+     * @param executor            executor to upload files in parallel
+     */
+    default void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+                               IndexShardSnapshotStatus snapshotStatus, Optional<AtomicInteger> priorityGenerator,
+                               Optional<Executor> executor) {
+        // the default implementation ignores the priority generator and the executor and snapshots the older way
+        snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
+    }
+
     /**
      * Restores snapshot of the shard.
      * <p>
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 4fee2fad41600..4d1f03aea4fbc 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -60,6 +60,7 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -108,7 +109,12 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
@@ -166,6 +172,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     private static final int BUFFER_SIZE = 4096;
 
+    private static final int MAX_SEGMENT_PRIORITY_IN_A_SHARD = 10000;
+
     private static final String SNAPSHOT_PREFIX = "snap-";
 
     private static final String SNAPSHOT_CODEC = "snapshot";
@@ -847,7 +855,15 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
     @Override
     public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
                               IndexShardSnapshotStatus snapshotStatus) {
-        SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
+        snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, Optional.empty(), Optional.empty());
+    }
+
+    @Override
+    public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+                              IndexShardSnapshotStatus snapshotStatus, Optional<AtomicInteger> priorityGenerator,
+                              Optional<Executor> executor) {
+        SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis(),
+            priorityGenerator, executor);
         try {
             snapshotContext.snapshot(snapshotIndexCommit);
         } catch (Exception e) {
@@ -1165,20 +1181,27 @@ private class SnapshotContext extends Context {
         private final Store store;
         private final IndexShardSnapshotStatus snapshotStatus;
         private final long startTime;
+        private final Optional<AtomicInteger> priorityProvider;
+        private final Optional<Executor> executor;
 
         /**
          * Constructs new context
          *
-         * @param store          store to be snapshotted
-         * @param snapshotId     snapshot id
-         * @param indexId        the id of the index being snapshotted
-         * @param snapshotStatus snapshot status to report progress
+         * @param store            store to be snapshotted
+         * @param snapshotId       snapshot id
+         * @param indexId          the id of the index being snapshotted
+         * @param snapshotStatus   snapshot status to report progress
+         * @param priorityProvider priority provider for this shard
+         * @param executor         executor to upload the files
          */
-        SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) {
+        SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime,
+                        Optional<AtomicInteger> priorityProvider, Optional<Executor> executor) {
             super(snapshotId, indexId, store.shardId());
             this.snapshotStatus = snapshotStatus;
             this.store = store;
             this.startTime = startTime;
+            this.priorityProvider = priorityProvider;
+            this.executor = executor;
         }
 
         /**
@@ -1275,13 +1298,7 @@ public void snapshot(final IndexCommit snapshotIndexCommit) {
                 snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
                     indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
 
-                for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
-                    try {
-                        snapshotFile(snapshotFileInfo);
-                    } catch (IOException e) {
-                        throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
-                    }
-                }
+                snapshotFiles(filesToSnapshot);
             } finally {
                 store.decRef();
             }
@@ -1320,6 +1337,84 @@ public void snapshot(final IndexCommit snapshotIndexCommit) {
             snapshotStatus.moveToDone(System.currentTimeMillis());
         }
 
+        /**
+         * Snapshot all files that belong to a shard.
+         *
+         * @param filesToSnapshot files to be snapshotted
+         */
+        private void snapshotFiles(final List<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot) {
+            if (executor.isPresent()) {
+                // Sort the files in descending order of size so that bigger segments are uploaded before smaller
+                // segments; this helps maximize utilization of the segment thread pool. If bigger segments were
+                // uploaded last, only a few threads would be active towards the end while the rest sit idle.
+                filesToSnapshot.sort((f1, f2) -> Long.compare(f2.metadata().length(), f1.metadata().length()));
+                final SetOnce<Exception> shardFailure = new SetOnce<>();
+                final CountDownLatch countDownLatch = new CountDownLatch(filesToSnapshot.size());
+                final long priority = priorityProvider.get().getAndIncrement() * (long) MAX_SEGMENT_PRIORITY_IN_A_SHARD;
+                int segmentCounter = 0;
+                for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
+                    if (segmentCounter < MAX_SEGMENT_PRIORITY_IN_A_SHARD) {
+                        segmentCounter++;
+                    }
+                    final long segmentPriority = priority + segmentCounter;
+                    executor.get().execute(new AbstractPrioritizedRunnable(segmentPriority) {
+                        @Override
+                        public void doRun() throws Exception {
+                            if (shardFailure.get() == null && snapshotStatus.isAborted() == false) {
+                                // shard failure is set if any of the previous segments hit an exception; if it is set
+                                // we can return immediately, and if it is not set the previous segment uploads were
+                                // fine, so we are good to go
+                                snapshotFile(snapshotFileInfo);
+                            } else {
+                                logger.info(() -> new ParameterizedMessage("Skipping segment upload {} as the shard {} already errored out",
+                                    snapshotFileInfo.name(), shardId));
+                            }
+                        }
+
+                        @Override
+                        public void onFailure(Exception e) {
+                            logger.warn(() -> new ParameterizedMessage("Failed to perform snapshot for segment {}",
+                                snapshotFileInfo.name()), e);
+                            if (shardFailure.get() == null) {
+                                try {
+                                    shardFailure.set(e);
+                                } catch (SetOnce.AlreadySetException ase) {
+                                    // some other thread set it first; swallow the exception
+                                }
+                            }
+                        }
+
+                        @Override
+                        public void onAfter() {
+                            countDownLatch.countDown();
+                        }
+                    });
+                }
+                while (shardFailure.get() == null && countDownLatch.getCount() > 0 && snapshotStatus.isAborted() == false) {
+                    try {
+                        countDownLatch.await(1, TimeUnit.SECONDS);
+                    } catch (InterruptedException e) {
+                        // ignore; the loop re-checks the abort and failure conditions
+                    }
+                }
+                if (snapshotStatus.isAborted()) {
+                    throw new IndexShardSnapshotFailedException(shardId, "Aborted");
+                }
+                if (shardFailure.get() != null) {
+                    throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", shardFailure.get());
+                }
+            } else {
+                for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
+                    try {
+                        snapshotFile(snapshotFileInfo);
+                    } catch (IOException e) {
+                        throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
+                    }
+                }
+            }
+        }
+
         /**
          * Snapshot individual file
          *
diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java
index e420dfb9843b8..8588930855dbc 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java
@@ -215,7 +215,8 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
                 keepAlive = poolInfo.getKeepAlive().toString();
             }
 
-            if (poolInfo.getThreadPoolType() == ThreadPool.ThreadPoolType.SCALING) {
+            if (poolInfo.getThreadPoolType() == ThreadPool.ThreadPoolType.SCALING ||
+                poolInfo.getThreadPoolType() == ThreadPool.ThreadPoolType.PRIORITIZED_SCALING) {
                 assert poolInfo.getMin() >= 0;
                 core = poolInfo.getMin();
                 assert poolInfo.getMax() > 0;
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index fbb0a876e8f29..0055d862bb6e3 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -78,7 +78,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -107,6 +109,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
 
     private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<>();
 
+    private final Map<Snapshot, AtomicInteger> shardSnapshotPriorities = new HashMap<>();
+
     // A map of snapshots to the shardIds that we already reported to the master as failed
     private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator =
         new TransportRequestDeduplicator<>();
@@ -226,6 +230,7 @@ private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) {
                 // running shards is missed, then the snapshot is removed is a subsequent
cluster // state update, which is being processed here it.remove(); + shardSnapshotPriorities.remove(snapshot); for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().values()) { snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); } @@ -259,6 +264,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { } if (startedShards != null && startedShards.isEmpty() == false) { shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards); + shardSnapshotPriorities.computeIfAbsent(snapshot, a -> new AtomicInteger()); startNewShards(entry, startedShards); } } else if (entryState == State.ABORTED) { @@ -299,6 +305,8 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final Executor segmentExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT_SEGMENTS); + final AtomicInteger shardPriorityProvider = shardSnapshotPriorities.get(snapshot); for (final Map.Entry shardEntry : startedShards.entrySet()) { final ShardId shardId = shardEntry.getKey(); final IndexId indexId = indicesMap.get(shardId.getIndexName()); @@ -311,7 +319,7 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map { + + private final Setting coreSetting; + private final Setting maxSetting; + private final Setting keepAliveSetting; + + /** + * Construct a scaling executor builder; the settings will have the + * key prefix "thread_pool." followed by the executor name. + * + * @param name the name of the executor + * @param core the minimum number of threads in the pool + * @param max the maximum number of threads in the pool + * @param keepAlive the time that spare threads above {@code core} + * threads will be kept alive + */ + public PrioritizedScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive) { + this(name, core, max, keepAlive, "thread_pool." + name); + } + + /** + * Construct a scaling executor builder; the settings will have the + * specified key prefix. 
+ * + * @param name the name of the executor + * @param core the minimum number of threads in the pool + * @param max the maximum number of threads in the pool + * @param keepAlive the time that spare threads above {@code core} + * threads will be kept alive + * @param prefix the prefix for the settings keys + */ + public PrioritizedScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive, + final String prefix) { + super(name); + this.coreSetting = + Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope); + this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope); + this.keepAliveSetting = + Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope); + } + + @Override + public List> getRegisteredSettings() { + return Arrays.asList(coreSetting, maxSetting, keepAliveSetting); + } + + @Override + PrioritizedScalingExecutorSettings getSettings(Settings settings) { + final String nodeName = Node.NODE_NAME_SETTING.get(settings); + final int coreThreads = coreSetting.get(settings); + final int maxThreads = maxSetting.get(settings); + final TimeValue keepAlive = keepAliveSetting.get(settings); + return new PrioritizedScalingExecutorSettings(nodeName, coreThreads, maxThreads, keepAlive); + } + + ThreadPool.ExecutorHolder build(final PrioritizedScalingExecutorBuilder.PrioritizedScalingExecutorSettings settings, + final ThreadContext threadContext) { + TimeValue keepAlive = settings.keepAlive; + int core = settings.core; + int max = settings.max; + final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.PRIORITIZED_SCALING, core, max, keepAlive, + null); + final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); + final ExecutorService executor = + EsExecutors.newPrioritizedScaling( + settings.nodeName + "/" + name(), + core, + max, + keepAlive.millis(), + TimeUnit.MILLISECONDS, + threadFactory, + threadContext); + return new ThreadPool.ExecutorHolder(executor, info); + } + + @Override + String formatInfo(ThreadPool.Info info) { + return String.format( + Locale.ROOT, + "name [%s], core [%d], max [%d], keep alive [%s]", + info.getName(), + info.getMin(), + info.getMax(), + info.getKeepAlive()); + } + + static class PrioritizedScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings { + + private final int core; + private final int max; + private final TimeValue keepAlive; + + PrioritizedScalingExecutorSettings(final String nodeName, final int core, final int max, final TimeValue keepAlive) { + super(nodeName); + this.core = core; + this.max = max; + this.keepAlive = keepAlive; + } + } + + +} diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 031514c1de0d9..7382e6f7a2132 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -79,6 +79,7 @@ public static class Names { public static final String REFRESH = "refresh"; public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; + public static final String SNAPSHOT_SEGMENTS = "snapshot_segments"; public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = 
"fetch_shard_store"; @@ -88,7 +89,8 @@ public enum ThreadPoolType { DIRECT("direct"), FIXED("fixed"), FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), - SCALING("scaling"); + SCALING("scaling"), + PRIORITIZED_SCALING("prioritized_scaling"); private final String type; @@ -135,6 +137,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.REFRESH, ThreadPoolType.SCALING); map.put(Names.WARMER, ThreadPoolType.SCALING); map.put(Names.SNAPSHOT, ThreadPoolType.SCALING); + map.put(Names.SNAPSHOT_SEGMENTS, ThreadPoolType.PRIORITIZED_SCALING); map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); @@ -169,6 +172,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui final Map builders = new HashMap<>(); final int availableProcessors = EsExecutors.numberOfProcessors(settings); + final int halfNumberOfProcessors = halfNumberOfProcessors(availableProcessors); final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors); final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); @@ -188,6 +192,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); + builders.put(Names.SNAPSHOT_SEGMENTS, + new PrioritizedScalingExecutorBuilder(Names.SNAPSHOT_SEGMENTS, 1, halfNumberOfProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1)); @@ -434,6 +440,10 @@ static int boundedBy(int value, int min, int max) { return Math.min(max, Math.max(min, value)); } + static int halfNumberOfProcessors(int numberOfProcessors) { + return (numberOfProcessors + 1) / 2; + } + static int halfNumberOfProcessorsMaxFive(int numberOfProcessors) { return boundedBy((numberOfProcessors + 1) / 2, 1, 5); } @@ -688,7 +698,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject(name); builder.field("type", type.getType()); - if (type == ThreadPoolType.SCALING) { + if (type == ThreadPoolType.SCALING || type == ThreadPoolType.PRIORITIZED_SCALING) { assert min != -1; builder.field("core", min); assert max != -1; diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnableTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnableTests.java new file mode 100644 index 0000000000000..41f49d575445f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnableTests.java @@ -0,0 +1,182 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.util.concurrent; + +import org.elasticsearch.test.ESTestCase; + +import org.mockito.InOrder; + +import java.util.concurrent.Callable; + +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Tests {@link AbstractPrioritizedRunnable} + */ +public class AbstractPrioritizedRunnableTests extends ESTestCase { + public void testRunSuccess() throws Exception { + Callable runCallable = mock(Callable.class); + + AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) { + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + + @Override + protected void doRun() throws Exception { + runCallable.call(); + } + }; + + runnable.run(); + + verify(runCallable).call(); + } + + public void testRunFailure() throws Exception { + RuntimeException exception = new RuntimeException(); + + AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) { + @Override + public void onFailure(Exception e) { + assertSame(exception, e); + } + + @Override + protected void doRun() throws Exception { + throw exception; + } + }; + + runnable.run(); + } + + public void testOnAfterSuccess() throws Exception { + Callable runCallable = mock(Callable.class); + Callable afterCallable = mock(Callable.class); + + AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) { + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + + @Override + protected void doRun() throws Exception { + runCallable.call(); + } + + @Override + public void onAfter() { + try { + afterCallable.call(); + } + catch (Exception e) { + fail(e.toString()); + } + } + }; + + runnable.run(); + + InOrder inOrder = inOrder(runCallable, afterCallable); + + inOrder.verify(runCallable).call(); + inOrder.verify(afterCallable).call(); + + } + + public void testOnAfterFailure() throws Exception { + RuntimeException exception = new RuntimeException(); + Callable afterCallable = mock(Callable.class); + + AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) { + @Override + public void onFailure(Exception e) { + assertSame(exception, e); + } + + @Override + protected void doRun() throws Exception { + throw exception; + } + + @Override + public void onAfter() { + try { + afterCallable.call(); + } + catch (Exception e) { + fail(e.toString()); + } + } + }; + + runnable.run(); + + verify(afterCallable).call(); + } + + public void testOnRejection() throws Exception { + RuntimeException exception = new RuntimeException(); + Callable failureCallable = mock(Callable.class); + + AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) { + @Override + public void onFailure(Exception e) { + assertSame(exception, e); + + try { + failureCallable.call(); + } + catch (Exception inner) { + inner.addSuppressed(e); + fail(inner.toString()); + } + } + + @Override 
+ protected void doRun() throws Exception { + fail("Not tested"); + } + }; + + runnable.onRejection(exception); + } + + public void testIsForceExecutuonDefaultsFalse() { + AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) { + @Override + public void onFailure(Exception e) { + fail(e.toString()); + } + + @Override + protected void doRun() throws Exception { + fail("Not tested"); + } + }; + + assertFalse(runnable.isForceExecution()); + } + +} diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java index 0f0350c48210c..639e55f2e65bc 100644 --- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java +++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java @@ -23,6 +23,9 @@ import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matcher; +import java.util.ArrayList; +import java.util.List; +import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadPoolExecutor; @@ -240,6 +243,143 @@ public void testScaleDown() throws Exception { terminate(pool); } + public void testPrioritizedScaleUp() throws Exception { + final int min = between(1, 3); + final int max = between(min + 1, 6); + final CyclicBarrier barrier = new CyclicBarrier(max + 1); + + ThreadPoolExecutor pool = EsExecutors.newPrioritizedScaling(getClass().getName() + "/" + getTestName(), min, max, between(1, 100), + randomTimeUnit(), EsExecutors.daemonThreadFactory("test"), threadContext); + assertThat("Min property", pool.getCorePoolSize(), equalTo(min)); + assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); + + for (int i = 0; i < max; ++i) { + final CountDownLatch latch = new CountDownLatch(1); + pool.execute(new AbstractPrioritizedRunnable(i) { + @Override + public void doRun() { + latch.countDown(); + try { + barrier.await(); + barrier.await(); + Thread.sleep((getPriority()+1)*100); + } catch (Exception e) { + barrier.reset(); + } + } + + @Override + public void onFailure(Exception e) { + } + + }); + + //wait until thread executes this task + //otherwise, a task might be queued + latch.await(); + } + + final List expectedPriority = new ArrayList<>(); + final List actualPriority = new Vector<>(); + for (int i = 0; i < max; ++i) { + expectedPriority.add(Long.valueOf(max+i)); + pool.execute(new AbstractPrioritizedRunnable(2 * max - i - 1) { + @Override + public void doRun() { + try { + actualPriority.add(getPriority()); + barrier.await(); + } catch (Exception e) { + barrier.reset(); + } + } + + @Override + public void onFailure(Exception e) { + } + + }); + } + + barrier.await(); + assertThat("wrong pool size", pool.getPoolSize(), equalTo(max)); + assertThat("wrong active size", pool.getActiveCount(), equalTo(max)); + barrier.await(); + barrier.await(); + assertThat("wrong priority order", actualPriority, equalTo(expectedPriority)); + terminate(pool); + } + + public void testPrioritizedScaleDown() throws Exception { + final int min = between(1, 3); + final int max = between(min + 1, 6); + final CyclicBarrier barrier = new CyclicBarrier(max + 1); + + final ThreadPoolExecutor pool = EsExecutors.newPrioritizedScaling(getClass().getName() + "/" + getTestName(), min, max, + between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"), threadContext); + assertThat("Min property", pool.getCorePoolSize(), 
equalTo(min)); + assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max)); + + for (int i = 0; i < max; ++i) { + final CountDownLatch latch = new CountDownLatch(1); + pool.execute(new AbstractPrioritizedRunnable(i) { + @Override + public void doRun() { + latch.countDown(); + try { + barrier.await(); + barrier.await(); + Thread.sleep((getPriority()+1)*100); + } catch (Exception e) { + barrier.reset(); + } + } + + @Override + public void onFailure(Exception e) { + } + }); + + //wait until thread executes this task + //otherwise, a task might be queued + latch.await(); + } + + final List expectedPriority = new ArrayList<>(); + final List actualPriority = new Vector<>(); + for (int i = 0; i < max; ++i) { + expectedPriority.add(Long.valueOf(max+i)); + pool.execute(new AbstractPrioritizedRunnable(2 * max - i - 1) { + @Override + public void doRun() { + try { + actualPriority.add(Long.valueOf(getPriority())); + barrier.await(); + } catch (Exception e) { + barrier.reset(); + } + } + + @Override + public void onFailure(Exception e) { + } + + }); + } + + barrier.await(); + assertThat("wrong pool size", pool.getPoolSize(), equalTo(max)); + assertThat("wrong active size", pool.getActiveCount(), equalTo(max)); + barrier.await(); + barrier.await(); + assertThat("wrong priority order", actualPriority, equalTo(expectedPriority)); + assertBusy(() -> { + assertThat("wrong active count", pool.getActiveCount(), equalTo(0)); + assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max)); + }); + terminate(pool); + } + public void testRejectionMessageAndShuttingDownFlag() throws InterruptedException { int pool = between(1, 10); int queue = between(0, 100); diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index d5409821befd5..bd57000b853ed 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -159,6 +159,16 @@ public static String blockNodeWithIndex(final String repositoryName, final Strin return null; } + public static String errorNodeWithIndex(final String repositoryName, final String indexName, double errorRate) { + for(String node : internalCluster().nodesInclude(indexName)) { + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName)) + .randomDataFileIOExceptionRate(errorRate); + return node; + } + fail("No nodes for the index " + indexName + " found"); + return null; + } + public static void blockAllDataNodes(String repository) { for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { ((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true); diff --git a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java index 885baa883ed63..dfb4a7df51499 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; import 
org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; @@ -32,6 +33,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; /** * Tests for snapshot/restore that require at least 2 threads available @@ -45,6 +48,8 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)) .put("thread_pool.snapshot.core", 2) .put("thread_pool.snapshot.max", 2) + .put("thread_pool.snapshot_segments.core", 1) + .put("thread_pool.snapshot_segments.max", 2) .build(); } @@ -206,4 +211,43 @@ public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception { client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size()); } + + public void testSnapshotPartialDuringUploadFailure() throws Exception { + logger.info("--> start 2 nodes"); + internalCluster().startNode(); + internalCluster().startNode(); + Client client = client(); + + assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0))); + ensureGreen(); + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); + + logger.info("--> create repository"); + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings( + Settings.builder() + .put("location", randomRepoPath()) + .put("random", randomAlphaOfLength(10)) + ).get()); + + String errorNode = errorNodeWithIndex("test-repo", "test-idx", 100); + + logger.info("--> snapshot"); + client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); + + logger.info("--> waiting for completion"); + SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60)); + logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size()); + assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL)); + assertThat(((MockRepository)internalCluster().getInstance(RepositoriesService.class, errorNode).repository("test-repo")) + .getFailureCount(), lessThanOrEqualTo(2L)); + logger.info("--> done"); + } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 8a49324757f27..caee1dcb30503 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -87,7 +87,7 @@ public long getFailureCount() { private final double randomControlIOExceptionRate; - private final double randomDataFileIOExceptionRate; + private volatile double randomDataFileIOExceptionRate; private final boolean useLuceneCorruptionException; @@ -175,6 +175,10 @@ public synchronized void unblock() { this.notifyAll(); } + public void randomDataFileIOExceptionRate(double rate) { + 
randomDataFileIOExceptionRate = rate; + } + public void blockOnDataFiles(boolean blocked) { blockOnDataFiles = blocked; } diff --git a/server/src/test/java/org/elasticsearch/threadpool/PrioritizedScalingThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/PrioritizedScalingThreadPoolTests.java new file mode 100644 index 0000000000000..1073c245c6207 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/threadpool/PrioritizedScalingThreadPoolTests.java @@ -0,0 +1,195 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.equalTo; + +public class PrioritizedScalingThreadPoolTests extends ESThreadPoolTestCase { + + public void testPrioritizedScalingThreadPoolConfiguration() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.PRIORITIZED_SCALING); + final Settings.Builder builder = Settings.builder(); + + final int core; + if (randomBoolean()) { + core = randomIntBetween(0, 8); + builder.put("thread_pool." + threadPoolName + ".core", core); + } else { + core = "generic".equals(threadPoolName) ? 4 : 1; // the defaults + } + + final int maxBasedOnNumberOfProcessors; + if (randomBoolean()) { + final int processors = randomIntBetween(1, 64); + maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors); + builder.put("processors", processors); + } else { + maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, Runtime.getRuntime().availableProcessors()); + } + + final int expectedMax; + if (maxBasedOnNumberOfProcessors < core || randomBoolean()) { + expectedMax = randomIntBetween(Math.max(1, core), 16); + builder.put("thread_pool." + threadPoolName + ".max", expectedMax); + } else { + expectedMax = maxBasedOnNumberOfProcessors; + } + + final long keepAlive; + if (randomBoolean()) { + keepAlive = randomIntBetween(1, 300); + builder.put("thread_pool." + threadPoolName + ".keep_alive", keepAlive + "s"); + } else { + keepAlive = "generic".equals(threadPoolName) ? 
30 : 300; // the defaults + } + + runPrioritizedScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> { + final Executor executor = threadPool.executor(threadPoolName); + assertThat(executor, instanceOf(EsThreadPoolExecutor.class)); + final EsThreadPoolExecutor esThreadPoolExecutor = (EsThreadPoolExecutor)executor; + final ThreadPool.Info info = info(threadPool, threadPoolName); + + assertThat(info.getName(), equalTo(threadPoolName)); + assertThat(info.getThreadPoolType(), equalTo(ThreadPool.ThreadPoolType.PRIORITIZED_SCALING)); + + assertThat(info.getKeepAlive().seconds(), equalTo(keepAlive)); + assertThat(esThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS), equalTo(keepAlive)); + + assertNull(info.getQueueSize()); + assertThat(esThreadPoolExecutor.getQueue().remainingCapacity(), equalTo(Integer.MAX_VALUE)); + + assertThat(info.getMin(), equalTo(core)); + assertThat(esThreadPoolExecutor.getCorePoolSize(), equalTo(core)); + assertThat(info.getMax(), equalTo(expectedMax)); + assertThat(esThreadPoolExecutor.getMaximumPoolSize(), equalTo(expectedMax)); + }); + } + + private int expectedSize(final String threadPoolName, final int numberOfProcessors) { + final Map> sizes = new HashMap<>(); + sizes.put(ThreadPool.Names.SNAPSHOT_SEGMENTS, ThreadPool::halfNumberOfProcessors); + return sizes.get(threadPoolName).apply(numberOfProcessors); + } + + public void testPrioritizedScalingThreadPoolIsBounded() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.PRIORITIZED_SCALING); + final int size = randomIntBetween(32, 512); + final Settings settings = Settings.builder().put("thread_pool." + threadPoolName + ".max", size).build(); + runPrioritizedScalingThreadPoolTest(settings, (clusterSettings, threadPool) -> { + final CountDownLatch latch = new CountDownLatch(1); + final int numberOfTasks = 2 * size; + final CountDownLatch taskLatch = new CountDownLatch(numberOfTasks); + for (int i = 0; i < numberOfTasks; i++) { + threadPool.executor(threadPoolName).execute(getRunnable(latch, taskLatch)); + } + final ThreadPoolStats.Stats stats = stats(threadPool, threadPoolName); + assertThat(stats.getQueue(), equalTo(numberOfTasks - size)); + assertThat(stats.getLargest(), equalTo(size)); + latch.countDown(); + try { + taskLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + public void testPrioritizedScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.PRIORITIZED_SCALING); + final int min = "generic".equals(threadPoolName) ? 4 : 1; + final Settings settings = + Settings.builder() + .put("thread_pool." + threadPoolName + ".max", 128) + .put("thread_pool." 
+ threadPoolName + ".keep_alive", "1ms") + .build(); + runPrioritizedScalingThreadPoolTest(settings, ((clusterSettings, threadPool) -> { + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch taskLatch = new CountDownLatch(128); + for (int i = 0; i < 128; i++) { + threadPool.executor(threadPoolName).execute(getRunnable(latch, taskLatch)); + } + int threads = stats(threadPool, threadPoolName).getThreads(); + assertEquals(128, threads); + latch.countDown(); + // this while loop is the core of this test; if threads + // are correctly idled down by the pool, the number of + // threads in the pool will drop to the min for the pool + // but if threads are not correctly idled down by the pool, + // this test will just timeout waiting for them to idle + // down + do { + spinForAtLeastOneMillisecond(); + } while (stats(threadPool, threadPoolName).getThreads() > min); + try { + taskLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + })); + } + + private void runPrioritizedScalingThreadPoolTest( + final Settings settings, + final BiConsumer consumer) throws InterruptedException { + ThreadPool threadPool = null; + try { + final String test = Thread.currentThread().getStackTrace()[2].getMethodName(); + final Settings nodeSettings = Settings.builder().put(settings).put("node.name", test).build(); + threadPool = new ThreadPool(nodeSettings); + final ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + consumer.accept(clusterSettings, threadPool); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + private Runnable getRunnable(CountDownLatch latch, CountDownLatch taskLatch) { + return new AbstractPrioritizedRunnable(1) { + @Override + public void onFailure(Exception e) { + + } + + @Override + protected void doRun() throws Exception { + try { + latch.await(); + taskLatch.countDown(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java index 4567b97700604..0be6b09a2a4f6 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolInfo; import org.elasticsearch.threadpool.ThreadPoolStats; @@ -134,6 +135,9 @@ private void runTask(final int index) { * Schedule a task for immediate execution. */ public void scheduleNow(final Runnable task) { + if (task instanceof AbstractPrioritizedRunnable) { + task.run(); + } if (executionDelayVariabilityMillis > 0 && random.nextBoolean()) { final long executionDelay = RandomNumbers.randomLongBetween(random, 1, executionDelayVariabilityMillis); final DeferredTask deferredTask = new DeferredTask(currentTimeMillis + executionDelay, task);