diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 43c4b3ecf01f6..b130d3b217979 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -41,6 +41,11 @@ There are several thread pools, but the important ones include: keep-alive of `5m` and a max of `min(5, (`<>`) / 2)`. +`snapshot_meta`:: + For snapshot repository metadata read operations. Thread pool type is `scaling` with a + keep-alive of `5m` and a max of `min(50, (`<>` pass:[ * ]3))`. + `warmer`:: For segment warm-up operations. Thread pool type is `scaling` with a keep-alive of `5m` and a max of `min(5, (`< snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable, task))); + snapshots(snapshotsInProgress, repo, toResolve, ignoreUnavailable, task, listener); } else { final List snapshotInfos; if (repositoryData != null) { @@ -235,12 +237,16 @@ private void loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String r * @param snapshotIds snapshots for which to fetch snapshot information * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning, * if false, they will throw an error - * @return list of snapshots */ - private List snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName, - List snapshotIds, boolean ignoreUnavailable, CancellableTask task) { + private void snapshots(SnapshotsInProgress snapshotsInProgress, + String repositoryName, + Collection snapshotIds, + boolean ignoreUnavailable, + CancellableTask task, + ActionListener> listener) { if (task.isCancelled()) { - throw new TaskCancelledException("task cancelled"); + listener.onFailure(new TaskCancelledException("task cancelled")); + return; } final Set snapshotSet = new HashSet<>(); final Set snapshotIdsToIterate = new HashSet<>(snapshotIds); @@ -252,28 +258,88 @@ private List snapshots(SnapshotsInProgress snapshotsInProgress, St snapshotSet.add(new SnapshotInfo(entry)); } } - // then, look in the repository - final Repository repository = repositoriesService.repository(repositoryName); - for (SnapshotId snapshotId : snapshotIdsToIterate) { + // then, look in the repository if there's any matching snapshots left + final List snapshotInfos; + if (snapshotIdsToIterate.isEmpty()) { + snapshotInfos = Collections.emptyList(); + } else { + snapshotInfos = Collections.synchronizedList(new ArrayList<>()); + } + final ActionListener> allDoneListener = listener.delegateFailure((l, v) -> { + final ArrayList snapshotList = new ArrayList<>(snapshotInfos); + snapshotList.addAll(snapshotSet); + CollectionUtil.timSort(snapshotList); + listener.onResponse(unmodifiableList(snapshotList)); + }); + if (snapshotIdsToIterate.isEmpty()) { + allDoneListener.onResponse(Collections.emptyList()); + return; + } + // put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the + // snapshot meta pool for a single request + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size()); + final BlockingQueue queue = new LinkedBlockingQueue<>(snapshotIdsToIterate); + final ActionListener workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> { + queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response + // anyway in this case + l.onFailure(e); + }); + final Repository repository; + try { + repository = repositoriesService.repository(repositoryName); + } catch (RepositoryMissingException e) { + listener.onFailure(e); + return; + } + for (int i = 0; i < workers; i++) { + getOneSnapshotInfo( + ignoreUnavailable, + repository, + queue, + snapshotInfos, + task, + workerDoneListener + ); + } + } + + /** + * Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue, + * loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collection, then invokes itself again to + * try and poll another task from the queue. + * If the queue is empty resolves {@code} listener. + */ + private void getOneSnapshotInfo(boolean ignoreUnavailable, + Repository repository, + BlockingQueue queue, + Collection snapshotInfos, + CancellableTask task, + ActionListener listener) { + final SnapshotId snapshotId = queue.poll(); + if (snapshotId == null) { + listener.onResponse(null); + return; + } + threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> { if (task.isCancelled()) { - throw new TaskCancelledException("task cancelled"); + listener.onFailure(new TaskCancelledException("task cancelled")); + return; } try { - snapshotSet.add(repository.getSnapshotInfo(snapshotId)); + snapshotInfos.add(repository.getSnapshotInfo(snapshotId)); } catch (Exception ex) { if (ignoreUnavailable) { logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex); } else { - if (ex instanceof SnapshotException) { - throw ex; - } - throw new SnapshotException(repositoryName, snapshotId, "Snapshot could not be read", ex); + listener.onFailure( + ex instanceof SnapshotException + ? ex + : new SnapshotException(repository.getMetadata().name(), snapshotId, "Snapshot could not be read", ex) + ); } } - } - final ArrayList snapshotList = new ArrayList<>(snapshotSet); - CollectionUtil.timSort(snapshotList); - return unmodifiableList(snapshotList); + getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener); + }); } private boolean isAllSnapshots(String[] snapshots) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 38f01969619e6..7b3d994d99050 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1315,6 +1315,7 @@ public long getRestoreThrottleTimeInNanos() { protected void assertSnapshotOrGenericThread() { assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']') + || Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread [" + Thread.currentThread() + "] to be the snapshot or generic thread."; } @@ -1428,11 +1429,12 @@ public void getRepositoryData(ActionListener listener) { // Don't deduplicate repo data loading if we don't have strong consistency guarantees between the repo and the cluster state // Also, if we are not caching repository data (for tests) we assume that the contents of the repository data at a given // generation may change + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_META); if (bestEffortConsistency || cacheRepositoryData == false) { - threadPool.generic().execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); + executor.execute(ActionRunnable.wrap(listener, this::doGetRepositoryData)); } else { repoDataDeduplicator.executeOnce(metadata, listener, (metadata, l) -> - threadPool.generic().execute(ActionRunnable.wrap(l, this::doGetRepositoryData))); + executor.execute(ActionRunnable.wrap(l, this::doGetRepositoryData))); } } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index bdc011de774bd..87b21d448c62c 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -69,6 +69,7 @@ public static class Names { public static final String REFRESH = "refresh"; public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; + public static final String SNAPSHOT_META = "snapshot_meta"; public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; @@ -116,6 +117,7 @@ public static ThreadPoolType fromType(String type) { entry(Names.REFRESH, ThreadPoolType.SCALING), entry(Names.WARMER, ThreadPoolType.SCALING), entry(Names.SNAPSHOT, ThreadPoolType.SCALING), + entry(Names.SNAPSHOT_META, ThreadPoolType.SCALING), entry(Names.FORCE_MERGE, ThreadPoolType.FIXED), entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING), entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING), @@ -189,6 +191,8 @@ public ThreadPool(final Settings settings, final ExecutorBuilder... customBui builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))); builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); + builders.put(Names.SNAPSHOT_META, new ScalingExecutorBuilder(Names.SNAPSHOT_META, 1, Math.min(allocatedProcessors * 3, 50), + TimeValue.timeValueSeconds(30L))); builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * allocatedProcessors, TimeValue.timeValueMinutes(5))); builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false)); diff --git a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index 1e8a311557d20..0e70e15448065 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -62,7 +62,8 @@ public void testScalingThreadPoolConfiguration() throws InterruptedException { keepAlive = randomIntBetween(1, 300); builder.put("thread_pool." + threadPoolName + ".keep_alive", keepAlive + "s"); } else { - keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults + keepAlive = "generic".equals(threadPoolName) || ThreadPool.Names.SNAPSHOT_META.equals(threadPoolName) + ? 30 : 300; // the defaults } runScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> { @@ -96,6 +97,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen); sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfAllocatedProcessorsMaxFive); sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfAllocatedProcessorsMaxFive); + sizes.put(ThreadPool.Names.SNAPSHOT_META, n -> Math.min(n * 3, 50)); sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors); return sizes.get(threadPoolName).apply(numberOfProcessors);