diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnable.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnable.java
new file mode 100644
index 0000000000000..bdef7f9f6bb6b
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnable.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+/**
+ * Wrapper over {@link AbstractRunnable} which carries a priority.
+ *
+ * Tasks are ordered by the natural ordering of their priority value:
+ * 0 has higher priority than 1, 1 has higher priority than 2, and so on.
+ */
+public abstract class AbstractPrioritizedRunnable extends AbstractRunnable implements Comparable<AbstractPrioritizedRunnable> {
+
+ private final Long priority;
+
+ protected AbstractPrioritizedRunnable(long priority) {
+ this.priority = priority;
+ }
+
+ public Long getPriority() {
+ return priority;
+ }
+
+ @Override
+ public int compareTo(AbstractPrioritizedRunnable other) {
+ return this.priority.compareTo(other.priority);
+ }
+
+}
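
To make the ordering contract concrete, here is a minimal sketch (illustrative, not part of the patch; the class and helper names are invented) showing how tasks built on this class sort inside a java.util.concurrent.PriorityBlockingQueue:

    import java.util.concurrent.PriorityBlockingQueue;

    import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable;

    public class PriorityOrderingSketch {

        static AbstractPrioritizedRunnable task(long priority) {
            return new AbstractPrioritizedRunnable(priority) {
                @Override
                protected void doRun() {
                    System.out.println("running task with priority " + getPriority());
                }

                @Override
                public void onFailure(Exception e) {
                    // nothing to recover in this sketch
                }
            };
        }

        public static void main(String[] args) {
            PriorityBlockingQueue<AbstractPrioritizedRunnable> queue = new PriorityBlockingQueue<>();
            queue.add(task(2));
            queue.add(task(0));
            queue.add(task(1));
            while (queue.isEmpty() == false) {
                queue.poll().run(); // prints priorities 0, 1, 2 in that order
            }
        }
    }
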
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
index 29cd7f6682a64..2ec13f7c9b358 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
@@ -36,12 +36,14 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
public class EsExecutors {
@@ -79,6 +81,16 @@ public static EsThreadPoolExecutor newScaling(String name, int min, int max, lon
return executor;
}
+
+ public static EsThreadPoolExecutor newPrioritizedScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit,
+ ThreadFactory threadFactory, ThreadContext contextHolder) {
+ ExecutorPrioritizedScalingQueue<Runnable> queue = new ExecutorPrioritizedScalingQueue<>();
+ EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory,
+ new ForceQueuePolicy(), contextHolder);
+ queue.executor = executor;
+ return executor;
+ }
+
public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity,
ThreadFactory threadFactory, ThreadContext contextHolder) {
BlockingQueue<Runnable> queue;
@@ -307,6 +319,55 @@ public boolean offer(E e) {
}
+ static class ExecutorPrioritizedScalingQueue<E> extends PriorityBlockingQueue<E> {
+
+ private final ReentrantLock lock;
+ ThreadPoolExecutor executor;
+
+ ExecutorPrioritizedScalingQueue() {
+ lock = new ReentrantLock(true);
+ }
+
+ @Override
+ public boolean offer(E e) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ // check whether any threads are idle in the pool
+ int notActive = executor.getPoolSize() - executor.getActiveCount();
+ if (notActive > 0) {
+ // there are idle threads in the pool; add the task to the queue so one of them picks it up
+ return super.offer(e);
+ }
+ // check if there might be spare capacity in the thread pool executor
+ int left = executor.getMaximumPoolSize() - executor.getPoolSize();
+ if (left > 0) {
+ // reject queuing the task to force the thread pool executor to add a worker if it can; combined
+ // with ForceQueuePolicy, this causes the thread pool to always scale up to max pool size and we
+ // only queue when there is no spare capacity
+ return false;
+ } else {
+ return super.offer(e);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void put(E e) {
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ // this is called from ForceQueuePolicy, so add the task directly to the parent queue
+ super.offer(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ }
+
/**
* A handler for rejected tasks that adds the specified element to this queue,
* waiting if necessary for space to become available.
@@ -317,7 +378,8 @@ static class ForceQueuePolicy implements XRejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// force queue policy should only be used with a scaling queue
- assert executor.getQueue() instanceof ExecutorScalingQueue;
+ assert (executor.getQueue() instanceof ExecutorScalingQueue) ||
+ (executor.getQueue() instanceof ExecutorPrioritizedScalingQueue);
executor.getQueue().put(r);
} catch (final InterruptedException e) {
// a scaling queue never blocks so a put to it can never be interrupted
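
The interplay above is what makes the pool scale before it queues: offer rejects a task while the pool can still grow, the executor then tries to add a worker, and if it cannot, ForceQueuePolicy force-queues the task via put. A usage sketch of the new factory (names and values illustrative, assuming the classes in this patch):

    import java.util.concurrent.TimeUnit;

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable;
    import org.elasticsearch.common.util.concurrent.EsExecutors;
    import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
    import org.elasticsearch.common.util.concurrent.ThreadContext;

    public class PrioritizedScalingSketch {

        public static void main(String[] args) throws InterruptedException {
            ThreadContext context = new ThreadContext(Settings.EMPTY);
            // grows from 1 to 4 workers before any task is queued
            EsThreadPoolExecutor pool = EsExecutors.newPrioritizedScaling(
                "sketch", 1, 4, 60, TimeUnit.SECONDS,
                EsExecutors.daemonThreadFactory("sketch"), context);
            for (long priority : new long[] {5, 1, 3}) {
                pool.execute(new AbstractPrioritizedRunnable(priority) {
                    @Override
                    protected void doRun() {
                        System.out.println("ran task with priority " + getPriority());
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // ignored in this sketch
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }

Tasks submitted while all workers are busy land in the PriorityBlockingQueue and are drained lowest-priority-value first.
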
diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
index 0fa0e832a0a2b..afe6b5f4acd5a 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java
@@ -345,12 +345,18 @@ public void addResponseHeader(final String key, final String value, final Functi
* command has already been passed through this method then it is returned unaltered rather than wrapped twice.
*/
public Runnable preserveContext(Runnable command) {
+ if (command instanceof ContextPreservingAbstractPrioritizedRunnable) {
+ return command;
+ }
if (command instanceof ContextPreservingAbstractRunnable) {
return command;
}
if (command instanceof ContextPreservingRunnable) {
return command;
}
+ if (command instanceof AbstractPrioritizedRunnable) {
+ return new ContextPreservingAbstractPrioritizedRunnable((AbstractPrioritizedRunnable) command);
+ }
if (command instanceof AbstractRunnable) {
return new ContextPreservingAbstractRunnable((AbstractRunnable) command);
}
@@ -361,6 +367,9 @@ public Runnable preserveContext(Runnable command) {
* Unwraps a command that was previously wrapped by {@link #preserveContext(Runnable)}.
*/
public Runnable unwrap(Runnable command) {
+ if (command instanceof ContextPreservingAbstractPrioritizedRunnable) {
+ return ((ContextPreservingAbstractPrioritizedRunnable) command).unwrap();
+ }
if (command instanceof WrappedRunnable) {
return ((WrappedRunnable) command).unwrap();
}
@@ -771,6 +780,77 @@ public AbstractRunnable unwrap() {
}
}
+ /**
+ * Wraps an AbstractPrioritizedRunnable to preserve the thread context.
+ */
+ private class ContextPreservingAbstractPrioritizedRunnable extends AbstractPrioritizedRunnable {
+ private final AbstractPrioritizedRunnable in;
+ private final ThreadContext.StoredContext creatorsContext;
+
+ private ThreadContext.StoredContext threadsOriginalContext = null;
+
+ private ContextPreservingAbstractPrioritizedRunnable(AbstractPrioritizedRunnable in) {
+ super(in.getPriority());
+ creatorsContext = newStoredContext(false);
+ this.in = in;
+ }
+
+ @Override
+ public boolean isForceExecution() {
+ return in.isForceExecution();
+ }
+
+ @Override
+ public void onAfter() {
+ try {
+ in.onAfter();
+ } finally {
+ if (threadsOriginalContext != null) {
+ threadsOriginalContext.restore();
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ in.onFailure(e);
+ }
+
+ @Override
+ public void onRejection(Exception e) {
+ in.onRejection(e);
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ boolean whileRunning = false;
+ threadsOriginalContext = stashContext();
+ try {
+ creatorsContext.restore();
+ whileRunning = true;
+ in.doRun();
+ whileRunning = false;
+ } catch (IllegalStateException ex) {
+ if (whileRunning || threadLocal.closed.get() == false) {
+ throw ex;
+ }
+ // if we hit an ISE here we have been shutting down
+ // this comes from the threadcontext and barfs if
+ // our threadpool has been shutting down
+ }
+ }
+
+ @Override
+ public String toString() {
+ return in.toString();
+ }
+
+ public AbstractPrioritizedRunnable unwrap() {
+ return in;
+ }
+
+ }
+
private static final Collector<String, Set<String>, Set<String>> LINKED_HASH_SET_COLLECTOR = new LinkedHashSetCollector<>();
private static class LinkedHashSetCollector<T> implements Collector<T, Set<T>, Set<T>> {
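
A hedged sketch of the contract these changes add (class and variable names illustrative): wrapping keeps the priority visible to the prioritized queue, and unwrap returns the original task.

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable;
    import org.elasticsearch.common.util.concurrent.ThreadContext;

    public class PreserveContextSketch {

        public static void main(String[] args) {
            ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
            AbstractPrioritizedRunnable task = new AbstractPrioritizedRunnable(7) {
                @Override
                protected void doRun() {
                }

                @Override
                public void onFailure(Exception e) {
                }
            };
            Runnable wrapped = threadContext.preserveContext(task);
            // the wrapper is itself an AbstractPrioritizedRunnable with the same priority,
            // so it still sorts correctly in ExecutorPrioritizedScalingQueue
            assert wrapped instanceof AbstractPrioritizedRunnable;
            assert ((AbstractPrioritizedRunnable) wrapped).getPriority() == 7L;
            // wrapping is idempotent and reversible
            assert threadContext.preserveContext(wrapped) == wrapped;
            assert threadContext.unwrap(wrapped) == task;
        }
    }
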
diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
index 4e8e9b6c7f569..fe33d73009b0b 100644
--- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java
@@ -37,6 +37,9 @@
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
public class FilterRepository implements Repository {
@@ -124,6 +127,13 @@ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId,
in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
}
+ @Override
+ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+ IndexShardSnapshotStatus snapshotStatus, Optional<AtomicInteger> priorityGenerator,
+ Optional<Executor> executor) {
+ in.snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, priorityGenerator, executor);
+ }
+
@Override
public void restoreShard(IndexShard shard, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java
index 1ca6f5e148510..c2b128d3762cf 100644
--- a/server/src/main/java/org/elasticsearch/repositories/Repository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java
@@ -36,6 +36,9 @@
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/**
@@ -204,6 +207,30 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long
void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus);
+ /**
+ * Creates a snapshot of the shard based on the index commit point.
+ *
+ * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method.
+ * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
+ *
+ * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
+ * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
+ * @param shard shard to be snapshotted
+ * @param store store to be snapshotted
+ * @param snapshotId snapshot id
+ * @param indexId id for the index being snapshotted
+ * @param snapshotIndexCommit commit point
+ * @param snapshotStatus snapshot status
+ * @param priorityGenerator shared per-snapshot counter used to derive this shard's priority
+ * @param executor executor used to upload segment files in parallel
+ */
+ default void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+ IndexShardSnapshotStatus snapshotStatus, Optional<AtomicInteger> priorityGenerator,
+ Optional<Executor> executor) {
+ // the default implementation ignores the priority generator and executor and snapshots sequentially, the old way
+ snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus);
+ }
+
/**
* Restores snapshot of the shard.
*
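
For callers, a sketch of the two paths through the new overload (the helper class is illustrative; the parameter types are the real ones): passing empty optionals preserves the old sequential behavior via the default method, while a shared AtomicInteger plus a segment executor opts in to prioritized parallel uploads.

    import java.util.Optional;
    import java.util.concurrent.Executor;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.lucene.index.IndexCommit;
    import org.elasticsearch.index.shard.IndexShard;
    import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
    import org.elasticsearch.index.store.Store;
    import org.elasticsearch.repositories.IndexId;
    import org.elasticsearch.repositories.Repository;
    import org.elasticsearch.snapshots.SnapshotId;

    class SnapshotShardCallers {

        // legacy behavior: the default method falls back to the sequential path
        static void sequential(Repository repository, IndexShard shard, Store store, SnapshotId snapshotId,
                               IndexId indexId, IndexCommit commit, IndexShardSnapshotStatus status) {
            repository.snapshotShard(shard, store, snapshotId, indexId, commit, status,
                Optional.empty(), Optional.empty());
        }

        // prioritized behavior: one counter per snapshot orders the shards, the
        // executor uploads segment files in parallel
        static void prioritized(Repository repository, IndexShard shard, Store store, SnapshotId snapshotId,
                                IndexId indexId, IndexCommit commit, IndexShardSnapshotStatus status,
                                AtomicInteger priorityGenerator, Executor segmentExecutor) {
            repository.snapshotShard(shard, store, snapshotId, indexId, commit, status,
                Optional.of(priorityGenerator), Optional.of(segmentExecutor));
        }
    }
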
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 4fee2fad41600..4d1f03aea4fbc 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -60,6 +60,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -108,7 +109,12 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
@@ -166,6 +172,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private static final int BUFFER_SIZE = 4096;
+ private static final int MAX_SEGMENT_PRIORITY_IN_A_SHARD = 10000;
+
private static final String SNAPSHOT_PREFIX = "snap-";
private static final String SNAPSHOT_CODEC = "snapshot";
@@ -847,7 +855,15 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b
@Override
public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus) {
- SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
+ snapshotShard(shard, store, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, Optional.empty(), Optional.empty());
+ }
+
+ @Override
+ public void snapshotShard(IndexShard shard, Store store, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
+ IndexShardSnapshotStatus snapshotStatus, Optional<AtomicInteger> priorityGenerator,
+ Optional<Executor> executor) {
+ SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis(),
+ priorityGenerator, executor);
try {
snapshotContext.snapshot(snapshotIndexCommit);
} catch (Exception e) {
@@ -1165,20 +1181,27 @@ private class SnapshotContext extends Context {
private final Store store;
private final IndexShardSnapshotStatus snapshotStatus;
private final long startTime;
+ private final Optional<AtomicInteger> priorityProvider;
+ private final Optional<Executor> executor;
/**
* Constructs new context
*
- * @param store store to be snapshotted
- * @param snapshotId snapshot id
- * @param indexId the id of the index being snapshotted
- * @param snapshotStatus snapshot status to report progress
+ * @param store store to be snapshotted
+ * @param snapshotId snapshot id
+ * @param indexId the id of the index being snapshotted
+ * @param snapshotStatus snapshot status to report progress
+ * @param priorityProvider shared per-snapshot counter used to derive this shard's priority
+ * @param executor executor used to upload segment files in parallel
*/
- SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) {
+ SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime,
+ Optional<AtomicInteger> priorityProvider, Optional<Executor> executor) {
super(snapshotId, indexId, store.shardId());
this.snapshotStatus = snapshotStatus;
this.store = store;
this.startTime = startTime;
+ this.priorityProvider = priorityProvider;
+ this.executor = executor;
}
/**
@@ -1275,13 +1298,7 @@ public void snapshot(final IndexCommit snapshotIndexCommit) {
snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
- for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
- try {
- snapshotFile(snapshotFileInfo);
- } catch (IOException e) {
- throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
- }
- }
+ snapshotFiles(filesToSnapshot);
} finally {
store.decRef();
}
@@ -1320,6 +1337,84 @@ public void snapshot(final IndexCommit snapshotIndexCommit) {
snapshotStatus.moveToDone(System.currentTimeMillis());
}
+ /**
+ * Snapshot all files that belong to a shard.
+ *
+ * @param filesToSnapshot files to be snapshotted
+ */
+ private void snapshotFiles(final List<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot) {
+ if (executor.isPresent()) {
+ // Sort the files in descending order of size so that larger segments are uploaded before smaller
+ // ones; this keeps the segment thread pool fully utilized. If the larger segments were uploaded
+ // last, only a few threads would remain active towards the end while the rest sat idle.
+ filesToSnapshot.sort((f1, f2) -> Long.compare(f2.metadata().length(), f1.metadata().length()));
+ final SetOnce<Exception> shardFailure = new SetOnce<>();
+ final CountDownLatch countDownLatch = new CountDownLatch(filesToSnapshot.size());
+ final long priority = (long) priorityProvider.get().getAndIncrement() * MAX_SEGMENT_PRIORITY_IN_A_SHARD;
+ int segmentCounter = 0;
+ for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
+ if (segmentCounter < MAX_SEGMENT_PRIORITY_IN_A_SHARD) {
+ segmentCounter++;
+ }
+ final long segmentPriority = priority + segmentCounter;
+ executor.get().execute(new AbstractPrioritizedRunnable(segmentPriority) {
+ @Override
+ public void doRun() throws Exception {
+ if (shardFailure.get() == null && snapshotStatus.isAborted() == false) {
+ // the shard failure is set as soon as any segment upload hits an exception; if it is
+ // set we skip this upload, otherwise all previous segment uploads succeeded and we
+ // are good to go
+ snapshotFile(snapshotFileInfo);
+ } else {
+ logger.info(() -> new ParameterizedMessage("Skipping segment upload {} as the shard {} already errored out",
+ snapshotFileInfo.name(), shardId));
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ logger.warn(() -> new ParameterizedMessage("Failed to perform snapshot for segment {}",
+ snapshotFileInfo.name()), e);
+ if (shardFailure.get() == null) {
+ try {
+ shardFailure.set(e);
+ } catch (SetOnce.AlreadySetException ase) {
+ // Some other thread has set it, swallow the exception
+ }
+ }
+ }
+
+ @Override
+ public void onAfter() {
+ countDownLatch.countDown();
+ }
+ });
+ }
+ while (shardFailure.get() == null && countDownLatch.getCount() > 0 && snapshotStatus.isAborted() == false) {
+ try {
+ countDownLatch.await(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // woken early; loop and re-check the failure and abort flags
+ }
+ }
+ if (snapshotStatus.isAborted()) {
+ throw new IndexShardSnapshotFailedException(shardId, "Aborted");
+ }
+ if (shardFailure.get() != null) {
+ throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", shardFailure.get());
+ }
+ } else {
+ for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
+ try {
+ snapshotFile(snapshotFileInfo);
+ } catch (IOException e) {
+ throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
+ }
+ }
+ }
+ }
+
/**
* Snapshot individual file
*
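
A worked example of the priority arithmetic above (a standalone sketch; MAX_SEGMENT_PRIORITY_IN_A_SHARD is inlined as 10000): each shard draws a base from the shared counter, segments within a shard take consecutive values, and the cap guarantees that two shards never produce overlapping priorities.

    import java.util.concurrent.atomic.AtomicInteger;

    public class SegmentPrioritySketch {

        private static final int MAX_SEGMENT_PRIORITY_IN_A_SHARD = 10000;

        public static void main(String[] args) {
            AtomicInteger priorityProvider = new AtomicInteger();

            // shard 0 draws base 0, shard 1 draws base 10000
            long shard0Base = (long) priorityProvider.getAndIncrement() * MAX_SEGMENT_PRIORITY_IN_A_SHARD;
            long shard1Base = (long) priorityProvider.getAndIncrement() * MAX_SEGMENT_PRIORITY_IN_A_SHARD;

            // the segment counter starts at 0 and stops incrementing at the cap, so a
            // shard's segments occupy [base + 1, base + 10000] and never collide with
            // the next shard's range
            System.out.println(shard0Base + 1);      // 1      (first segment of shard 0)
            System.out.println(shard0Base + 10000);  // 10000  (cap for shard 0)
            System.out.println(shard1Base + 1);      // 10001  (first segment of shard 1)
        }
    }
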
diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java
index e420dfb9843b8..8588930855dbc 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java
@@ -215,7 +215,8 @@ private Table buildTable(RestRequest req, ClusterStateResponse state, NodesInfoR
keepAlive = poolInfo.getKeepAlive().toString();
}
- if (poolInfo.getThreadPoolType() == ThreadPool.ThreadPoolType.SCALING) {
+ if (poolInfo.getThreadPoolType() == ThreadPool.ThreadPoolType.SCALING ||
+ poolInfo.getThreadPoolType() == ThreadPool.ThreadPoolType.PRIORITIZED_SCALING) {
assert poolInfo.getMin() >= 0;
core = poolInfo.getMin();
assert poolInfo.getMax() > 0;
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
index fbb0a876e8f29..0055d862bb6e3 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java
@@ -78,7 +78,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -107,6 +109,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private final Map<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> shardSnapshots = new HashMap<>();
+ private final Map<Snapshot, AtomicInteger> shardSnapshotPriorities = new HashMap<>();
+
// A map of snapshots to the shardIds that we already reported to the master as failed
private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator =
new TransportRequestDeduplicator<>();
@@ -226,6 +230,7 @@ private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) {
// running shards is missed, then the snapshot is removed in a subsequent cluster
// state update, which is being processed here
it.remove();
+ shardSnapshotPriorities.remove(snapshot);
for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().values()) {
snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting");
}
@@ -259,6 +264,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
}
if (startedShards != null && startedShards.isEmpty() == false) {
shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards);
+ shardSnapshotPriorities.computeIfAbsent(snapshot, a -> new AtomicInteger());
startNewShards(entry, startedShards);
}
} else if (entryState == State.ABORTED) {
@@ -299,6 +305,8 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexShardSnapshotStatus> startedShards) {
final Map<String, IndexId> indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()));
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+ final Executor segmentExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT_SEGMENTS);
+ final AtomicInteger shardPriorityProvider = shardSnapshotPriorities.get(snapshot);
for (final Map.Entry<ShardId, IndexShardSnapshotStatus> shardEntry : startedShards.entrySet()) {
final ShardId shardId = shardEntry.getKey();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
@@ -311,7 +319,7 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexShardSnapshotStatus> startedShards) {
diff --git a/server/src/main/java/org/elasticsearch/threadpool/PrioritizedScalingExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/PrioritizedScalingExecutorBuilder.java
new file mode 100644
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/threadpool/PrioritizedScalingExecutorBuilder.java
+/**
+ * A builder for prioritized scaling executors.
+ */
+public class PrioritizedScalingExecutorBuilder extends ExecutorBuilder<PrioritizedScalingExecutorBuilder.PrioritizedScalingExecutorSettings> {
+
+ private final Setting<Integer> coreSetting;
+ private final Setting<Integer> maxSetting;
+ private final Setting<TimeValue> keepAliveSetting;
+
+ /**
+ * Construct a scaling executor builder; the settings will have the
+ * key prefix "thread_pool." followed by the executor name.
+ *
+ * @param name the name of the executor
+ * @param core the minimum number of threads in the pool
+ * @param max the maximum number of threads in the pool
+ * @param keepAlive the time that spare threads above {@code core}
+ * threads will be kept alive
+ */
+ public PrioritizedScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive) {
+ this(name, core, max, keepAlive, "thread_pool." + name);
+ }
+
+ /**
+ * Construct a scaling executor builder; the settings will have the
+ * specified key prefix.
+ *
+ * @param name the name of the executor
+ * @param core the minimum number of threads in the pool
+ * @param max the maximum number of threads in the pool
+ * @param keepAlive the time that spare threads above {@code core}
+ * threads will be kept alive
+ * @param prefix the prefix for the settings keys
+ */
+ public PrioritizedScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive,
+ final String prefix) {
+ super(name);
+ this.coreSetting =
+ Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope);
+ this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope);
+ this.keepAliveSetting =
+ Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope);
+ }
+
+ @Override
+ public List<Setting<?>> getRegisteredSettings() {
+ return Arrays.asList(coreSetting, maxSetting, keepAliveSetting);
+ }
+
+ @Override
+ PrioritizedScalingExecutorSettings getSettings(Settings settings) {
+ final String nodeName = Node.NODE_NAME_SETTING.get(settings);
+ final int coreThreads = coreSetting.get(settings);
+ final int maxThreads = maxSetting.get(settings);
+ final TimeValue keepAlive = keepAliveSetting.get(settings);
+ return new PrioritizedScalingExecutorSettings(nodeName, coreThreads, maxThreads, keepAlive);
+ }
+
+ @Override
+ ThreadPool.ExecutorHolder build(final PrioritizedScalingExecutorBuilder.PrioritizedScalingExecutorSettings settings,
+ final ThreadContext threadContext) {
+ TimeValue keepAlive = settings.keepAlive;
+ int core = settings.core;
+ int max = settings.max;
+ final ThreadPool.Info info = new ThreadPool.Info(name(), ThreadPool.ThreadPoolType.PRIORITIZED_SCALING, core, max, keepAlive,
+ null);
+ final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
+ final ExecutorService executor =
+ EsExecutors.newPrioritizedScaling(
+ settings.nodeName + "/" + name(),
+ core,
+ max,
+ keepAlive.millis(),
+ TimeUnit.MILLISECONDS,
+ threadFactory,
+ threadContext);
+ return new ThreadPool.ExecutorHolder(executor, info);
+ }
+
+ @Override
+ String formatInfo(ThreadPool.Info info) {
+ return String.format(
+ Locale.ROOT,
+ "name [%s], core [%d], max [%d], keep alive [%s]",
+ info.getName(),
+ info.getMin(),
+ info.getMax(),
+ info.getKeepAlive());
+ }
+
+ static class PrioritizedScalingExecutorSettings extends ExecutorBuilder.ExecutorSettings {
+
+ private final int core;
+ private final int max;
+ private final TimeValue keepAlive;
+
+ PrioritizedScalingExecutorSettings(final String nodeName, final int core, final int max, final TimeValue keepAlive) {
+ super(nodeName);
+ this.core = core;
+ this.max = max;
+ this.keepAlive = keepAlive;
+ }
+ }
+
+}
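
The builder registers three node-scope settings under the thread_pool.<name> prefix; for the snapshot_segments pool wired up below they can be set like this (a sketch with illustrative values):

    import org.elasticsearch.common.settings.Settings;

    public class SnapshotSegmentsSettingsSketch {

        public static void main(String[] args) {
            Settings settings = Settings.builder()
                .put("thread_pool.snapshot_segments.core", 1)          // minimum threads
                .put("thread_pool.snapshot_segments.max", 4)           // maximum threads
                .put("thread_pool.snapshot_segments.keep_alive", "5m") // idle thread lifetime
                .build();
            System.out.println(settings.get("thread_pool.snapshot_segments.max")); // 4
        }
    }
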
diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 031514c1de0d9..7382e6f7a2132 100644
--- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -79,6 +79,7 @@ public static class Names {
public static final String REFRESH = "refresh";
public static final String WARMER = "warmer";
public static final String SNAPSHOT = "snapshot";
+ public static final String SNAPSHOT_SEGMENTS = "snapshot_segments";
public static final String FORCE_MERGE = "force_merge";
public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
@@ -88,7 +89,8 @@ public enum ThreadPoolType {
DIRECT("direct"),
FIXED("fixed"),
FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
- SCALING("scaling");
+ SCALING("scaling"),
+ PRIORITIZED_SCALING("prioritized_scaling");
private final String type;
@@ -135,6 +137,7 @@ public static ThreadPoolType fromType(String type) {
map.put(Names.REFRESH, ThreadPoolType.SCALING);
map.put(Names.WARMER, ThreadPoolType.SCALING);
map.put(Names.SNAPSHOT, ThreadPoolType.SCALING);
+ map.put(Names.SNAPSHOT_SEGMENTS, ThreadPoolType.PRIORITIZED_SCALING);
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
@@ -169,6 +172,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
final Map<String, ExecutorBuilder> builders = new HashMap<>();
final int availableProcessors = EsExecutors.numberOfProcessors(settings);
+ final int halfNumberOfProcessors = halfNumberOfProcessors(availableProcessors);
final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
@@ -188,6 +192,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
+ builders.put(Names.SNAPSHOT_SEGMENTS,
+ new PrioritizedScalingExecutorBuilder(Names.SNAPSHOT_SEGMENTS, 1, halfNumberOfProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FETCH_SHARD_STARTED,
new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
@@ -434,6 +440,10 @@ static int boundedBy(int value, int min, int max) {
return Math.min(max, Math.max(min, value));
}
+ static int halfNumberOfProcessors(int numberOfProcessors) {
+ return (numberOfProcessors + 1) / 2;
+ }
+
static int halfNumberOfProcessorsMaxFive(int numberOfProcessors) {
return boundedBy((numberOfProcessors + 1) / 2, 1, 5);
}
@@ -688,7 +698,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject(name);
builder.field("type", type.getType());
- if (type == ThreadPoolType.SCALING) {
+ if (type == ThreadPoolType.SCALING || type == ThreadPoolType.PRIORITIZED_SCALING) {
assert min != -1;
builder.field("core", min);
assert max != -1;
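
The new helper has no upper bound, unlike the halfProcMaxAt5 sizing used by the snapshot pool; a quick sketch of the resulting snapshot_segments max sizes (pure arithmetic, mirroring halfNumberOfProcessors):

    public class PoolSizingSketch {

        // mirrors ThreadPool.halfNumberOfProcessors: half the processors, rounded up, unbounded
        static int halfNumberOfProcessors(int numberOfProcessors) {
            return (numberOfProcessors + 1) / 2;
        }

        public static void main(String[] args) {
            System.out.println(halfNumberOfProcessors(1));  // 1
            System.out.println(halfNumberOfProcessors(8));  // 4
            System.out.println(halfNumberOfProcessors(32)); // 16, whereas halfProcMaxAt5 caps at 5
        }
    }
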
diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnableTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnableTests.java
new file mode 100644
index 0000000000000..41f49d575445f
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/AbstractPrioritizedRunnableTests.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.common.util.concurrent;
+
+import org.elasticsearch.test.ESTestCase;
+
+import org.mockito.InOrder;
+
+import java.util.concurrent.Callable;
+
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests {@link AbstractPrioritizedRunnable}
+ */
+public class AbstractPrioritizedRunnableTests extends ESTestCase {
+ public void testRunSuccess() throws Exception {
+ Callable<?> runCallable = mock(Callable.class);
+
+ AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) {
+ @Override
+ public void onFailure(Exception e) {
+ fail(e.toString());
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ runCallable.call();
+ }
+ };
+
+ runnable.run();
+
+ verify(runCallable).call();
+ }
+
+ public void testRunFailure() throws Exception {
+ RuntimeException exception = new RuntimeException();
+
+ AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) {
+ @Override
+ public void onFailure(Exception e) {
+ assertSame(exception, e);
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ throw exception;
+ }
+ };
+
+ runnable.run();
+ }
+
+ public void testOnAfterSuccess() throws Exception {
+ Callable<?> runCallable = mock(Callable.class);
+ Callable<?> afterCallable = mock(Callable.class);
+
+ AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) {
+ @Override
+ public void onFailure(Exception e) {
+ fail(e.toString());
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ runCallable.call();
+ }
+
+ @Override
+ public void onAfter() {
+ try {
+ afterCallable.call();
+ }
+ catch (Exception e) {
+ fail(e.toString());
+ }
+ }
+ };
+
+ runnable.run();
+
+ InOrder inOrder = inOrder(runCallable, afterCallable);
+
+ inOrder.verify(runCallable).call();
+ inOrder.verify(afterCallable).call();
+
+ }
+
+ public void testOnAfterFailure() throws Exception {
+ RuntimeException exception = new RuntimeException();
+ Callable<?> afterCallable = mock(Callable.class);
+
+ AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) {
+ @Override
+ public void onFailure(Exception e) {
+ assertSame(exception, e);
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ throw exception;
+ }
+
+ @Override
+ public void onAfter() {
+ try {
+ afterCallable.call();
+ }
+ catch (Exception e) {
+ fail(e.toString());
+ }
+ }
+ };
+
+ runnable.run();
+
+ verify(afterCallable).call();
+ }
+
+ public void testOnRejection() throws Exception {
+ RuntimeException exception = new RuntimeException();
+ Callable<?> failureCallable = mock(Callable.class);
+
+ AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) {
+ @Override
+ public void onFailure(Exception e) {
+ assertSame(exception, e);
+
+ try {
+ failureCallable.call();
+ }
+ catch (Exception inner) {
+ inner.addSuppressed(e);
+ fail(inner.toString());
+ }
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ fail("Not tested");
+ }
+ };
+
+ runnable.onRejection(exception);
+ }
+
+ public void testIsForceExecutionDefaultsFalse() {
+ AbstractPrioritizedRunnable runnable = new AbstractPrioritizedRunnable(1) {
+ @Override
+ public void onFailure(Exception e) {
+ fail(e.toString());
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ fail("Not tested");
+ }
+ };
+
+ assertFalse(runnable.isForceExecution());
+ }
+
+}
diff --git a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java
index 0f0350c48210c..639e55f2e65bc 100644
--- a/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java
+++ b/server/src/test/java/org/elasticsearch/common/util/concurrent/EsExecutorsTests.java
@@ -23,6 +23,9 @@
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matcher;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
@@ -240,6 +243,143 @@ public void testScaleDown() throws Exception {
terminate(pool);
}
+ public void testPrioritizedScaleUp() throws Exception {
+ final int min = between(1, 3);
+ final int max = between(min + 1, 6);
+ final CyclicBarrier barrier = new CyclicBarrier(max + 1);
+
+ ThreadPoolExecutor pool = EsExecutors.newPrioritizedScaling(getClass().getName() + "/" + getTestName(), min, max, between(1, 100),
+ randomTimeUnit(), EsExecutors.daemonThreadFactory("test"), threadContext);
+ assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
+ assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
+
+ for (int i = 0; i < max; ++i) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ pool.execute(new AbstractPrioritizedRunnable(i) {
+ @Override
+ public void doRun() {
+ latch.countDown();
+ try {
+ barrier.await();
+ barrier.await();
+ Thread.sleep((getPriority()+1)*100);
+ } catch (Exception e) {
+ barrier.reset();
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ }
+
+ });
+
+ //wait until thread executes this task
+ //otherwise, a task might be queued
+ latch.await();
+ }
+
+ final List<Long> expectedPriority = new ArrayList<>();
+ final List<Long> actualPriority = new Vector<>();
+ for (int i = 0; i < max; ++i) {
+ expectedPriority.add(Long.valueOf(max + i));
+ pool.execute(new AbstractPrioritizedRunnable(2 * max - i - 1) {
+ @Override
+ public void doRun() {
+ try {
+ actualPriority.add(getPriority());
+ barrier.await();
+ } catch (Exception e) {
+ barrier.reset();
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ }
+
+ });
+ }
+
+ barrier.await();
+ assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
+ assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
+ barrier.await();
+ barrier.await();
+ assertThat("wrong priority order", actualPriority, equalTo(expectedPriority));
+ terminate(pool);
+ }
+
+ public void testPrioritizedScaleDown() throws Exception {
+ final int min = between(1, 3);
+ final int max = between(min + 1, 6);
+ final CyclicBarrier barrier = new CyclicBarrier(max + 1);
+
+ final ThreadPoolExecutor pool = EsExecutors.newPrioritizedScaling(getClass().getName() + "/" + getTestName(), min, max,
+ between(1, 100), TimeUnit.MILLISECONDS, EsExecutors.daemonThreadFactory("test"), threadContext);
+ assertThat("Min property", pool.getCorePoolSize(), equalTo(min));
+ assertThat("Max property", pool.getMaximumPoolSize(), equalTo(max));
+
+ for (int i = 0; i < max; ++i) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ pool.execute(new AbstractPrioritizedRunnable(i) {
+ @Override
+ public void doRun() {
+ latch.countDown();
+ try {
+ barrier.await();
+ barrier.await();
+ Thread.sleep((getPriority()+1)*100);
+ } catch (Exception e) {
+ barrier.reset();
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ }
+ });
+
+ //wait until thread executes this task
+ //otherwise, a task might be queued
+ latch.await();
+ }
+
+ final List<Long> expectedPriority = new ArrayList<>();
+ final List<Long> actualPriority = new Vector<>();
+ for (int i = 0; i < max; ++i) {
+ expectedPriority.add(Long.valueOf(max + i));
+ pool.execute(new AbstractPrioritizedRunnable(2 * max - i - 1) {
+ @Override
+ public void doRun() {
+ try {
+ actualPriority.add(Long.valueOf(getPriority()));
+ barrier.await();
+ } catch (Exception e) {
+ barrier.reset();
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ }
+
+ });
+ }
+
+ barrier.await();
+ assertThat("wrong pool size", pool.getPoolSize(), equalTo(max));
+ assertThat("wrong active size", pool.getActiveCount(), equalTo(max));
+ barrier.await();
+ barrier.await();
+ assertThat("wrong priority order", actualPriority, equalTo(expectedPriority));
+ assertBusy(() -> {
+ assertThat("wrong active count", pool.getActiveCount(), equalTo(0));
+ assertThat("idle threads didn't shrink below max. (" + pool.getPoolSize() + ")", pool.getPoolSize(), lessThan(max));
+ });
+ terminate(pool);
+ }
+
public void testRejectionMessageAndShuttingDownFlag() throws InterruptedException {
int pool = between(1, 10);
int queue = between(0, 100);
diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index d5409821befd5..bd57000b853ed 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -159,6 +159,16 @@ public static String blockNodeWithIndex(final String repositoryName, final Strin
return null;
}
+ public static String errorNodeWithIndex(final String repositoryName, final String indexName, double errorRate) {
+ for(String node : internalCluster().nodesInclude(indexName)) {
+ ((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName))
+ .randomDataFileIOExceptionRate(errorRate);
+ return node;
+ }
+ fail("No nodes for the index " + indexName + " found");
+ return null;
+ }
+
public static void blockAllDataNodes(String repository) {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true);
diff --git a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java
index 885baa883ed63..dfb4a7df51499 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java
@@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
@@ -32,6 +33,8 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
/**
* Tests for snapshot/restore that require at least 2 threads available
@@ -45,6 +48,8 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.snapshot.core", 2)
.put("thread_pool.snapshot.max", 2)
+ .put("thread_pool.snapshot_segments.core", 1)
+ .put("thread_pool.snapshot_segments.max", 2)
.build();
}
@@ -206,4 +211,43 @@ public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception {
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size());
}
+
+ public void testSnapshotPartialDuringUploadFailure() throws Exception {
+ logger.info("--> start 2 nodes");
+ internalCluster().startNode();
+ internalCluster().startNode();
+ Client client = client();
+
+ assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_shards", 2).put("number_of_replicas", 0)));
+ ensureGreen();
+
+ logger.info("--> indexing some data");
+ for (int i = 0; i < 100; i++) {
+ index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
+ }
+ refresh();
+ assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
+
+ logger.info("--> create repository");
+ logger.info("--> creating repository");
+ assertAcked(client.admin().cluster().preparePutRepository("test-repo")
+ .setType("mock").setSettings(
+ Settings.builder()
+ .put("location", randomRepoPath())
+ .put("random", randomAlphaOfLength(10))
+ ).get());
+
+ String errorNode = errorNodeWithIndex("test-repo", "test-idx", 100);
+
+ logger.info("--> snapshot");
+ client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+
+ logger.info("--> waiting for completion");
+ SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(60));
+ logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
+ assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL));
+ assertThat(((MockRepository)internalCluster().getInstance(RepositoriesService.class, errorNode).repository("test-repo"))
+ .getFailureCount(), lessThanOrEqualTo(2L));
+ logger.info("--> done");
+ }
}
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index 8a49324757f27..caee1dcb30503 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -87,7 +87,7 @@ public long getFailureCount() {
private final double randomControlIOExceptionRate;
- private final double randomDataFileIOExceptionRate;
+ private volatile double randomDataFileIOExceptionRate;
private final boolean useLuceneCorruptionException;
@@ -175,6 +175,10 @@ public synchronized void unblock() {
this.notifyAll();
}
+ public void randomDataFileIOExceptionRate(double rate) {
+ randomDataFileIOExceptionRate = rate;
+ }
+
public void blockOnDataFiles(boolean blocked) {
blockOnDataFiles = blocked;
}
diff --git a/server/src/test/java/org/elasticsearch/threadpool/PrioritizedScalingThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/PrioritizedScalingThreadPoolTests.java
new file mode 100644
index 0000000000000..1073c245c6207
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/threadpool/PrioritizedScalingThreadPoolTests.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.threadpool;
+
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable;
+import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.equalTo;
+
+public class PrioritizedScalingThreadPoolTests extends ESThreadPoolTestCase {
+
+ public void testPrioritizedScalingThreadPoolConfiguration() throws InterruptedException {
+ final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.PRIORITIZED_SCALING);
+ final Settings.Builder builder = Settings.builder();
+
+ final int core;
+ if (randomBoolean()) {
+ core = randomIntBetween(0, 8);
+ builder.put("thread_pool." + threadPoolName + ".core", core);
+ } else {
+ core = "generic".equals(threadPoolName) ? 4 : 1; // the defaults
+ }
+
+ final int maxBasedOnNumberOfProcessors;
+ if (randomBoolean()) {
+ final int processors = randomIntBetween(1, 64);
+ maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors);
+ builder.put("processors", processors);
+ } else {
+ maxBasedOnNumberOfProcessors = expectedSize(threadPoolName, Runtime.getRuntime().availableProcessors());
+ }
+
+ final int expectedMax;
+ if (maxBasedOnNumberOfProcessors < core || randomBoolean()) {
+ expectedMax = randomIntBetween(Math.max(1, core), 16);
+ builder.put("thread_pool." + threadPoolName + ".max", expectedMax);
+ } else {
+ expectedMax = maxBasedOnNumberOfProcessors;
+ }
+
+ final long keepAlive;
+ if (randomBoolean()) {
+ keepAlive = randomIntBetween(1, 300);
+ builder.put("thread_pool." + threadPoolName + ".keep_alive", keepAlive + "s");
+ } else {
+ keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults
+ }
+
+ runPrioritizedScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> {
+ final Executor executor = threadPool.executor(threadPoolName);
+ assertThat(executor, instanceOf(EsThreadPoolExecutor.class));
+ final EsThreadPoolExecutor esThreadPoolExecutor = (EsThreadPoolExecutor)executor;
+ final ThreadPool.Info info = info(threadPool, threadPoolName);
+
+ assertThat(info.getName(), equalTo(threadPoolName));
+ assertThat(info.getThreadPoolType(), equalTo(ThreadPool.ThreadPoolType.PRIORITIZED_SCALING));
+
+ assertThat(info.getKeepAlive().seconds(), equalTo(keepAlive));
+ assertThat(esThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS), equalTo(keepAlive));
+
+ assertNull(info.getQueueSize());
+ assertThat(esThreadPoolExecutor.getQueue().remainingCapacity(), equalTo(Integer.MAX_VALUE));
+
+ assertThat(info.getMin(), equalTo(core));
+ assertThat(esThreadPoolExecutor.getCorePoolSize(), equalTo(core));
+ assertThat(info.getMax(), equalTo(expectedMax));
+ assertThat(esThreadPoolExecutor.getMaximumPoolSize(), equalTo(expectedMax));
+ });
+ }
+
+ private int expectedSize(final String threadPoolName, final int numberOfProcessors) {
+ final Map<String, Function<Integer, Integer>> sizes = new HashMap<>();
+ sizes.put(ThreadPool.Names.SNAPSHOT_SEGMENTS, ThreadPool::halfNumberOfProcessors);
+ return sizes.get(threadPoolName).apply(numberOfProcessors);
+ }
+
+ public void testPrioritizedScalingThreadPoolIsBounded() throws InterruptedException {
+ final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.PRIORITIZED_SCALING);
+ final int size = randomIntBetween(32, 512);
+ final Settings settings = Settings.builder().put("thread_pool." + threadPoolName + ".max", size).build();
+ runPrioritizedScalingThreadPoolTest(settings, (clusterSettings, threadPool) -> {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final int numberOfTasks = 2 * size;
+ final CountDownLatch taskLatch = new CountDownLatch(numberOfTasks);
+ for (int i = 0; i < numberOfTasks; i++) {
+ threadPool.executor(threadPoolName).execute(getRunnable(latch, taskLatch));
+ }
+ final ThreadPoolStats.Stats stats = stats(threadPool, threadPoolName);
+ assertThat(stats.getQueue(), equalTo(numberOfTasks - size));
+ assertThat(stats.getLargest(), equalTo(size));
+ latch.countDown();
+ try {
+ taskLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public void testPrioritizedScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws InterruptedException {
+ final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.PRIORITIZED_SCALING);
+ final int min = "generic".equals(threadPoolName) ? 4 : 1;
+ final Settings settings =
+ Settings.builder()
+ .put("thread_pool." + threadPoolName + ".max", 128)
+ .put("thread_pool." + threadPoolName + ".keep_alive", "1ms")
+ .build();
+ runPrioritizedScalingThreadPoolTest(settings, ((clusterSettings, threadPool) -> {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch taskLatch = new CountDownLatch(128);
+ for (int i = 0; i < 128; i++) {
+ threadPool.executor(threadPoolName).execute(getRunnable(latch, taskLatch));
+ }
+ int threads = stats(threadPool, threadPoolName).getThreads();
+ assertEquals(128, threads);
+ latch.countDown();
+ // this while loop is the core of this test; if threads
+ // are correctly idled down by the pool, the number of
+ // threads in the pool will drop to the min for the pool
+ // but if threads are not correctly idled down by the pool,
+ // this test will just timeout waiting for them to idle
+ // down
+ do {
+ spinForAtLeastOneMillisecond();
+ } while (stats(threadPool, threadPoolName).getThreads() > min);
+ try {
+ taskLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+
+ private void runPrioritizedScalingThreadPoolTest(
+ final Settings settings,
+ final BiConsumer consumer) throws InterruptedException {
+ ThreadPool threadPool = null;
+ try {
+ final String test = Thread.currentThread().getStackTrace()[2].getMethodName();
+ final Settings nodeSettings = Settings.builder().put(settings).put("node.name", test).build();
+ threadPool = new ThreadPool(nodeSettings);
+ final ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+ consumer.accept(clusterSettings, threadPool);
+ } finally {
+ terminateThreadPoolIfNeeded(threadPool);
+ }
+ }
+
+ private Runnable getRunnable(CountDownLatch latch, CountDownLatch taskLatch) {
+ return new AbstractPrioritizedRunnable(1) {
+ @Override
+ public void onFailure(Exception e) {
+
+ }
+
+ @Override
+ protected void doRun() throws Exception {
+ try {
+ latch.await();
+ taskLatch.countDown();
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+}
diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
index 4567b97700604..0be6b09a2a4f6 100644
--- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
+++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java
@@ -24,6 +24,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractPrioritizedRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolInfo;
import org.elasticsearch.threadpool.ThreadPoolStats;
@@ -134,6 +135,9 @@ private void runTask(final int index) {
* Schedule a task for immediate execution.
*/
public void scheduleNow(final Runnable task) {
+ if (task instanceof AbstractPrioritizedRunnable) {
+ // the deterministic queue is single-threaded and has no notion of priority, so run prioritized tasks inline
+ task.run();
+ return;
+ }
if (executionDelayVariabilityMillis > 0 && random.nextBoolean()) {
final long executionDelay = RandomNumbers.randomLongBetween(random, 1, executionDelayVariabilityMillis);
final DeferredTask deferredTask = new DeferredTask(currentTimeMillis + executionDelay, task);