From c4e1a43f9cd2a77698ab9f656d4272cac3050cb5 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 17 May 2021 16:10:27 +0200 Subject: [PATCH 1/6] Introduce SNAPSHOT_META Threadpool for Fetching Repository Metadata Adds new snapshot meta pool that is used to speed up the get snapshots API by making `SnapshotInfo` load in parallel. Also use this pool to load `RepositoryData`. A follow-up to this would expand the use of this pool to the snapshot status API and make it run in parallel as well. --- .../get/TransportGetSnapshotsAction.java | 95 +++++++++++++++---- .../blobstore/BlobStoreRepository.java | 6 +- .../elasticsearch/threadpool/ThreadPool.java | 4 + .../threadpool/ScalingThreadPoolTests.java | 1 + 4 files changed, 85 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 53b091abf89f3..e9cc1b3eaca54 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -14,7 +14,6 @@ import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.repositories.get.TransportGetRepositoriesAction; import org.elasticsearch.action.support.ActionFilters; @@ -46,12 +45,16 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import static java.util.Collections.unmodifiableList; @@ -211,8 +214,7 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r } if (verbose) { - threadPool.generic().execute(ActionRunnable.supply( - listener, () -> snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable, task))); + snapshots(snapshotsInProgress, repo, toResolve, ignoreUnavailable, task, listener); } else { final List snapshotInfos; if (repositoryData != null) { @@ -235,12 +237,16 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r * @param snapshotIds snapshots for which to fetch snapshot information * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning, * if false, they will throw an error - * @return list of snapshots */ - private List snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName, - List snapshotIds, boolean ignoreUnavailable, CancellableTask task) { + private void snapshots(SnapshotsInProgress snapshotsInProgress, + String repositoryName, + Collection snapshotIds, + boolean ignoreUnavailable, + CancellableTask task, + ActionListener> listener) { if (task.isCancelled()) { - throw new TaskCancelledException("task cancelled"); + listener.onFailure(new TaskCancelledException("task cancelled")); + return; } final Set snapshotSet = new HashSet<>(); final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); @@ -252,28 +258,79 @@ private List snapshots(SnapshotsInProgress snapshotsInProgress, St snapshotSet.add(new SnapshotInfo(entry)); } } - // then, look in the repository + // then, look in the repository if there's any matching snapshots left + final List snapshotInfos; + if (snapshotIdsToIterate.isEmpty()) { + snapshotInfos = Collections.emptyList(); + } else { + snapshotInfos = Collections.synchronizedList(new ArrayList<>()); + } + final ActionListener> allDoneListener = listener.delegateFailure((l, v) -> { + final ArrayList snapshotList = new ArrayList<>(snapshotInfos); + snapshotList.addAll(snapshotSet); + CollectionUtil.timSort(snapshotList); + listener.onResponse(unmodifiableList(snapshotList)); + }); + if (snapshotIdsToIterate.isEmpty()) { + allDoneListener.onResponse(Collections.emptyList()); + return; + } + // put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the + // snapshot meta pool for a single request + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size()); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META); + final BlockingQueue queue = new LinkedBlockingQueue<>(snapshotIdsToIterate); + final ActionListener workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> { + queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response + // anyway in this case + l.onFailure(e); + }); final Repository repository = repositoriesService.repository(repositoryName); - for (SnapshotId snapshotId : snapshotIdsToIterate) { + for (int i = 0; i < workers; i++) { + getOneSnapshotInfo( + ignoreUnavailable, + repository, + queue, + snapshotInfos, + task, + executor, + workerDoneListener + ); + } + } + + private void getOneSnapshotInfo(boolean ignoreUnavailable, + Repository repository, + BlockingQueue queue, + Collection snapshotInfos, + CancellableTask task, + Executor executor, + ActionListener listener) { + final SnapshotId snapshotId = queue.poll(); + if (snapshotId == null) { + listener.onResponse(null); + return; + } + executor.execute(() -> { if (task.isCancelled()) { - throw new TaskCancelledException("task cancelled"); + listener.onFailure(new TaskCancelledException("task cancelled")); + return; } try { - snapshotSet.add(repository.getSnapshotInfo(snapshotId)); + snapshotInfos.add(repository.getSnapshotInfo(snapshotId)); } catch (Exception ex) { if (ignoreUnavailable) { logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex); } else { - if (ex instanceof SnapshotException) { - throw ex; - } - throw new SnapshotException(repositoryName, snapshotId, "Snapshot could not be read", ex); + listener.onFailure( + ex instanceof SnapshotException + ? ex + : new SnapshotException(repository.getMetadata().name(), snapshotId, "Snapshot could not be read", ex) + ); } } - } - final ArrayList snapshotList = new ArrayList<>(snapshotSet); - CollectionUtil.timSort(snapshotList); - return unmodifiableList(snapshotList); + getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, executor, listener); + }); } private boolean isAllSnapshots(String[] snapshots) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 38f01969619e6..7b3d994d99050 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1315,6 +1315,7 @@ public long getRestoreThrottleTimeInNanos() { protected void assertSnapshotOrGenericThread() { assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']') + || Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread."; } @@ -1428,11 +1429,12 @@ public void getRepositoryData(ActionListener listener) { // Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state // Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given // generation may change + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META); if (bestEffortConsistency || cacheRepositoryData == false) { - threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); + executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); } else { repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> - threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData))); + executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))); } } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index bdc011de774bd..365cc99ace86b 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -69,6 +69,7 @@ public static class Names { public static final String REFRESH = "refresh"; public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; + public static final String SNAPSHOT_META = "snapshot_meta"; public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; @@ -116,6 +117,7 @@ public static ThreadPoolType fromType(String type) { entry(Names.REFRESH, ThreadPoolType.SCALING), entry(Names.WARMER, ThreadPoolType.SCALING), entry(Names.SNAPSHOT, ThreadPoolType.SCALING), + entry(Names.SNAPSHOT_META, ThreadPoolType.SCALING), entry(Names.FORCE_MERGE, ThreadPoolType.FIXED), entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING), entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING), @@ -189,6 +191,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); + builders.put(Names.SNAPSHOT_META, new ScalingExecutorBuilder(Names.SNAPSHOT_META, 1, 2 * allocatedProcessors, + TimeValue.timeValueMinutes(5))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false)); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index 1e8a311557d20..f3a4f984f6d43 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -96,6 +96,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen); sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive); sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive); + sizes.put(ThreadPool.Names.SNAPSHOT_META, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors); From b4ba77e25948d1187d954c58f8f07d27bbc54b8a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 12:21:50 +0200 Subject: [PATCH 2/6] CR: docs --- docs/reference/modules/threadpool.asciidoc | 6 ++++++ .../cluster/snapshots/get/TransportGetSnapshotsAction.java | 6 ++++++ .../main/java/org/elasticsearch/threadpool/ThreadPool.java | 4 ++-- .../elasticsearch/threadpool/ScalingThreadPoolTests.java | 2 +- 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 43c4b3ecf01f6..97156b068f038 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -1,3 +1,4 @@ + [[modules-threadpool]] === Thread pools @@ -41,6 +42,11 @@ There are several thread pools, but the important ones include: keep-alive of `5m` and a max of `min(5, (`<>`) / 2)`. +`snapshot_meta`:: + For snapshot repository metadata read operations. Thread pool type is `scaling` with a + keep-alive of `5m` and a max of `min(50, (`<>` pass:[ * ]3))`. + `warmer`:: For segment warm-up operations. Thread pool type is `scaling` with a keep-alive of `5m` and a max of `min(5, (`< queue, diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 365cc99ace86b..87b21d448c62c 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -191,8 +191,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); - builders.put(Names.SNAPSHOT_META, new ScalingExecutorBuilder(Names.SNAPSHOT_META, 1, 2 * allocatedProcessors, - TimeValue.timeValueMinutes(5))); + builders.put(Names.SNAPSHOT_META, new ScalingExecutorBuilder(Names.SNAPSHOT_META, 1, Math.min(allocatedProcessors * 3, 50), + TimeValue.timeValueSeconds(30L))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false)); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index f3a4f984f6d43..c58924e550b46 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -96,7 +96,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen); sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive); sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive); - sizes.put(ThreadPool.Names.SNAPSHOT_META, ThreadPool::twiceAllocatedProcessors); + sizes.put(ThreadPool.Names.SNAPSHOT_META, n -> Math.min(n * 3, 50)); sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors); From fb55daa956249c41ac8034f9ba789c2344b366c1 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 12:58:01 +0200 Subject: [PATCH 3/6] fix repo missing case --- docs/reference/modules/threadpool.asciidoc | 1 - .../snapshots/get/TransportGetSnapshotsAction.java | 9 ++++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 97156b068f038..b130d3b217979 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -1,4 +1,3 @@ - [[modules-threadpool]] === Thread pools diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index f88a668c83ea0..d82613d3ebc17 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -33,6 +33,7 @@ import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -285,7 +286,13 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress, // anyway in this case l.onFailure(e); }); - final Repository repository = repositoriesService.repository(repositoryName); + final Repository repository; + try { + repository = repositoriesService.repository(repositoryName); + } catch (RepositoryMissingException e) { + listener.onFailure(e); + return; + } for (int i = 0; i < workers; i++) { getOneSnapshotInfo( ignoreUnavailable, From 24cf149f94e6b68eac231b79cca4f5eda3f52fe4 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 13:23:07 +0200 Subject: [PATCH 4/6] don't pass executor around --- .../snapshots/get/TransportGetSnapshotsAction.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index d82613d3ebc17..e2e89b3d5fd42 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -54,7 +54,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @@ -279,7 +278,6 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress, // put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the // snapshot meta pool for a single request final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size()); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META); final BlockingQueue queue = new LinkedBlockingQueue<>(snapshotIdsToIterate); final ActionListener workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> { queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response @@ -300,7 +298,6 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress, queue, snapshotInfos, task, - executor, workerDoneListener ); } @@ -308,7 +305,7 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress, /** * Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue, - * loads the snapshot info from the repository on the given {@code executor} and adds it to the given {@code snapshotInfos} collect, + * loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collect, * then invokes itself again to try and poll another task from the queue. * If the queue is empty resolves {@code} listener. */ @@ -317,14 +314,13 @@ private void getOneSnapshotInfo(boolean ignoreUnavailable, BlockingQueue queue, Collection snapshotInfos, CancellableTask task, - Executor executor, ActionListener listener) { final SnapshotId snapshotId = queue.poll(); if (snapshotId == null) { listener.onResponse(null); return; } - executor.execute(() -> { + threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> { if (task.isCancelled()) { listener.onFailure(new TaskCancelledException("task cancelled")); return; @@ -342,7 +338,7 @@ private void getOneSnapshotInfo(boolean ignoreUnavailable, ); } } - getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, executor, listener); + getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener); }); } From bde33f68f8354e16237ab8d98dad11347cc701af Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 13:25:07 +0200 Subject: [PATCH 5/6] typos --- .../cluster/snapshots/get/TransportGetSnapshotsAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index e2e89b3d5fd42..f38aa3dbc28dc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -305,8 +305,8 @@ private void snapshots(SnapshotsInProgress snapshotsInProgress, /** * Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue, - * loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collect, - * then invokes itself again to try and poll another task from the queue. + * loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collection, then invokes itself again to + * try and poll another task from the queue. * If the queue is empty resolves {@code} listener. */ private void getOneSnapshotInfo(boolean ignoreUnavailable, From 9f465ac2c7f81409e0a0274765fe6c07a4bd2ed9 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 May 2021 13:48:06 +0200 Subject: [PATCH 6/6] fix test --- .../org/elasticsearch/threadpool/ScalingThreadPoolTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index c58924e550b46..0e70e15448065 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -62,7 +62,8 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException { keepAlive = randomIntBetween(1, 300); builder.put("thread_pool." + threadPoolName + ".keep_alive", keepAlive + "s"); } else { - keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults + keepAlive = "generic".equals(threadPoolName) || ThreadPool.Names.SNAPSHOT_META.equals(threadPoolName) + ? 30 : 300; // the defaults } runScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> {