diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 4d66a3f343877..60afa9292ce99 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; @@ -29,12 +28,12 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryMissingException; -import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; @@ -53,8 +52,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import static java.util.Collections.unmodifiableList; @@ -327,25 +324,16 @@ private void snapshots( } else { snapshotInfos = Collections.synchronizedList(new ArrayList<>()); } - final ActionListener> allDoneListener = listener.delegateFailure((l, v) -> { + final ActionListener allDoneListener = listener.delegateFailure((l, v) -> { final ArrayList snapshotList = new ArrayList<>(snapshotInfos); snapshotList.addAll(snapshotSet); CollectionUtil.timSort(snapshotList); listener.onResponse(unmodifiableList(snapshotList)); }); if (snapshotIdsToIterate.isEmpty()) { - allDoneListener.onResponse(Collections.emptyList()); + allDoneListener.onResponse(null); return; } - // put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the - // snapshot meta pool for a single request - final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), snapshotIdsToIterate.size()); - final BlockingQueue queue = new LinkedBlockingQueue<>(snapshotIdsToIterate); - final ActionListener workerDoneListener = new GroupedActionListener<>(allDoneListener, workers).delegateResponse((l, e) -> { - queue.clear(); // Stop fetching the remaining snapshots once we've failed fetching one since the response is an error response - // anyway in this case - l.onFailure(e); - }); final Repository repository; try { repository = repositoriesService.repository(repositoryName); @@ -353,50 +341,26 @@ private void snapshots( listener.onFailure(e); return; } - for (int i = 0; i < workers; i++) { - getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, workerDoneListener); - } - } + repository.getSnapshotInfo( + new GetSnapshotInfoContext( + snapshotIdsToIterate, + ignoreUnavailable == false, + task::isCancelled, + (context, snapshotInfo) -> snapshotInfos.add(snapshotInfo), + ignoreUnavailable ? ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.trace("done fetching snapshot infos [{}]", snapshotIdsToIterate); + } - /** - * Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. If it finds one in the queue, - * loads the snapshot info from the repository and adds it to the given {@code snapshotInfos} collection, then invokes itself again to - * try and poll another task from the queue. - * If the queue is empty resolves {@code} listener. - */ - private void getOneSnapshotInfo( - boolean ignoreUnavailable, - Repository repository, - BlockingQueue queue, - Collection snapshotInfos, - CancellableTask task, - ActionListener listener - ) { - final SnapshotId snapshotId = queue.poll(); - if (snapshotId == null) { - listener.onResponse(null); - return; - } - threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> { - if (task.isCancelled()) { - listener.onFailure(new TaskCancelledException("task cancelled")); - return; - } - try { - snapshotInfos.add(repository.getSnapshotInfo(snapshotId)); - } catch (Exception ex) { - if (ignoreUnavailable) { - logger.warn(() -> new ParameterizedMessage("failed to get snapshot [{}]", snapshotId), ex); - } else { - listener.onFailure( - ex instanceof SnapshotException - ? ex - : new SnapshotException(repository.getMetadata().name(), snapshotId, "Snapshot could not be read", ex) - ); - } - } - getOneSnapshotInfo(ignoreUnavailable, repository, queue, snapshotInfos, task, listener); - }); + @Override + public void onFailure(Exception e) { + assert false : new AssertionError("listener should always complete successfully for ignoreUnavailable=true", e); + logger.warn("failed to fetch snapshot info for some snapshots", e); + } + }, () -> allDoneListener.onResponse(null)) : allDoneListener + ) + ); } private boolean isAllSnapshots(String[] snapshots) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index f1f3547266fb6..e0bbb474804dc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -13,7 +13,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.node.NodeClient; @@ -27,10 +27,10 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -52,6 +52,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -62,6 +63,7 @@ import java.util.stream.Collectors; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.cluster.SnapshotsInProgress.ShardState.SUCCESS; public class TransportSnapshotsStatusAction extends TransportMasterNodeAction { @@ -90,7 +92,7 @@ public TransportSnapshotsStatusAction( SnapshotsStatusRequest::new, indexNameExpressionResolver, SnapshotsStatusResponse::new, - ThreadPool.Names.GENERIC + ThreadPool.Names.SAME ); this.repositoriesService = repositoriesService; this.client = client; @@ -142,13 +144,14 @@ protected void masterOperation( new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)).snapshots(snapshots) .timeout(request.masterNodeTimeout()), ActionListener.wrap( - nodeSnapshotStatuses -> threadPool.generic() - .execute( - ActionRunnable.wrap( - listener, - l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, cancellableTask, l) - ) - ), + nodeSnapshotStatuses -> buildResponse( + snapshotsInProgress, + request, + currentSnapshots, + nodeSnapshotStatuses, + cancellableTask, + listener + ), listener::onFailure ) ); @@ -192,8 +195,7 @@ private void buildResponse( SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.key); if (shardStatus != null) { // We have full information about this shard - if (shardStatus.getStage() == SnapshotIndexShardStage.DONE - && shardEntry.value.state() != SnapshotsInProgress.ShardState.SUCCESS) { + if (shardStatus.getStage() == SnapshotIndexShardStage.DONE && shardEntry.value.state() != SUCCESS) { // Unlikely edge case: // Data node has finished snapshotting the shard but the cluster state has not yet been updated // to reflect this. We adjust the status to show up as snapshot metadata being written because @@ -286,9 +288,10 @@ private void loadRepositoryData( ActionListener listener ) { final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); - final ListenableFuture repositoryDataListener = new ListenableFuture<>(); + final StepListener repositoryDataListener = new StepListener<>(); repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); - repositoryDataListener.addListener(ActionListener.wrap(repositoryData -> { + final Collection snapshotIdsToLoad = new ArrayList<>(); + repositoryDataListener.whenComplete(repositoryData -> { ensureNotCancelled(task); final Map matchedSnapshotIds = repositoryData.getSnapshotIds() .stream() @@ -314,73 +317,62 @@ private void loadRepositoryData( throw new SnapshotMissingException(repositoryName, snapshotName); } } - SnapshotInfo snapshotInfo = snapshot(snapshotsInProgress, repositoryName, snapshotId); - List shardStatusBuilder = new ArrayList<>(); - if (snapshotInfo.state().completed()) { - Map shardStatuses = snapshotShards( - repositoryName, - repositoryData, - task, - snapshotInfo - ); - for (Map.Entry shardStatus : shardStatuses.entrySet()) { - IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy(); - shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus)); - } - final SnapshotsInProgress.State state; - switch (snapshotInfo.state()) { - case FAILED: - state = SnapshotsInProgress.State.FAILED; - break; - case SUCCESS: - case PARTIAL: - // Translating both PARTIAL and SUCCESS to SUCCESS for now - // TODO: add the differentiation on the metadata level in the next major release - state = SnapshotsInProgress.State.SUCCESS; - break; - default: - throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state()); - } - final long startTime = snapshotInfo.startTime(); - final long endTime = snapshotInfo.endTime(); - assert endTime >= startTime || (endTime == 0L && snapshotInfo.state().completed() == false) - : "Inconsistent timestamps found in SnapshotInfo [" + snapshotInfo + "]"; - builder.add( - new SnapshotStatus( - new Snapshot(repositoryName, snapshotId), - state, - Collections.unmodifiableList(shardStatusBuilder), - snapshotInfo.includeGlobalState(), - startTime, - // Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0 - (endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime - ) - ); + if (snapshotsInProgress.snapshot(new Snapshot(repositoryName, snapshotId)) == null) { + snapshotIdsToLoad.add(snapshotId); } } - listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder))); - }, listener::onFailure), threadPool.generic(), null); - } - /** - * Retrieves snapshot from repository - * - * @param snapshotsInProgress snapshots in progress in the cluster state - * @param repositoryName repository name - * @param snapshotId snapshot id - * @return snapshot - * @throws SnapshotMissingException if snapshot is not found - */ - private SnapshotInfo snapshot(SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) { - List entries = SnapshotsService.currentSnapshots( - snapshotsInProgress, - repositoryName, - Collections.singletonList(snapshotId.getName()) - ); - if (entries.isEmpty() == false) { - return new SnapshotInfo(entries.iterator().next()); - } - return repositoriesService.repository(repositoryName).getSnapshotInfo(snapshotId); + if (snapshotIdsToLoad.isEmpty()) { + listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder))); + } else { + final List threadSafeBuilder = Collections.synchronizedList(builder); + repositoriesService.repository(repositoryName) + .getSnapshotInfo(new GetSnapshotInfoContext(snapshotIdsToLoad, true, task::isCancelled, (context, snapshotInfo) -> { + List shardStatusBuilder = new ArrayList<>(); + final Map shardStatuses; + try { + shardStatuses = snapshotShards(repositoryName, repositoryData, task, snapshotInfo); + } catch (Exception e) { + // stops all further fetches of snapshotInfo since context is fail-fast + context.onFailure(e); + return; + } + for (Map.Entry shardStatus : shardStatuses.entrySet()) { + IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue().asCopy(); + shardStatusBuilder.add(new SnapshotIndexShardStatus(shardStatus.getKey(), lastSnapshotStatus)); + } + final SnapshotsInProgress.State state; + switch (snapshotInfo.state()) { + case FAILED: + state = SnapshotsInProgress.State.FAILED; + break; + case SUCCESS: + case PARTIAL: + // Translating both PARTIAL and SUCCESS to SUCCESS for now + // TODO: add the differentiation on the metadata level in the next major release + state = SnapshotsInProgress.State.SUCCESS; + break; + default: + throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state()); + } + final long startTime = snapshotInfo.startTime(); + final long endTime = snapshotInfo.endTime(); + assert endTime >= startTime || (endTime == 0L && snapshotInfo.state().completed() == false) + : "Inconsistent timestamps found in SnapshotInfo [" + snapshotInfo + "]"; + threadSafeBuilder.add( + new SnapshotStatus( + new Snapshot(repositoryName, snapshotInfo.snapshotId()), + state, + Collections.unmodifiableList(shardStatusBuilder), + snapshotInfo.includeGlobalState(), + startTime, + // Use current time to calculate overall runtime for in-progress snapshots that have endTime == 0 + (endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime + ) + ); + }, listener.map(v -> new SnapshotsStatusResponse(List.copyOf(threadSafeBuilder))))); + } + }, listener::onFailure); } /** diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index b1bb3ffccf1fc..3a87611e27dfc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -49,8 +49,8 @@ public RepositoryMetadata getMetadata() { } @Override - public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { - return in.getSnapshotInfo(snapshotId); + public void getSnapshotInfo(GetSnapshotInfoContext context) { + in.getSnapshotInfo(context); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/GetSnapshotInfoContext.java b/server/src/main/java/org/elasticsearch/repositories/GetSnapshotInfoContext.java new file mode 100644 index 0000000000000..bfacdb3107e76 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/GetSnapshotInfoContext.java @@ -0,0 +1,148 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.repositories; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Collection; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.BooleanSupplier; + +/** + * Describes the context of fetching one or more {@link SnapshotInfo} via {@link Repository#getSnapshotInfo(GetSnapshotInfoContext)}. + */ +public final class GetSnapshotInfoContext implements ActionListener { + + private static final Logger logger = LogManager.getLogger(GetSnapshotInfoContext.class); + + /** + * Snapshot ids to fetch info for. + */ + private final List snapshotIds; + + /** + * Stop fetching additional {@link SnapshotInfo} if an exception is encountered. + */ + private final boolean abortOnFailure; + + /** + * If this supplier returns true, indicates that the task that initiated this context has been cancelled and that not further fetching + * of {@link SnapshotInfo} should be started. + */ + private final BooleanSupplier isCancelled; + + /** + * Listener resolved when fetching {@link SnapshotInfo} has completed. If resolved successfully, no more calls to + * {@link #consumer} will be made. Only resolves exceptionally if {@link #abortOnFailure} is true in case one or more + * {@link SnapshotInfo} failed to be fetched. + * This listener is always invoked on the {@link ThreadPool.Names#SNAPSHOT_META} pool. + */ + private final ActionListener doneListener; + + /** + * {@link BiConsumer} invoked for each {@link SnapshotInfo} that is fetched with this instance and the {@code SnapshotInfo} as + * arguments. This consumer is always invoked on the {@link ThreadPool.Names#SNAPSHOT_META} pool. + */ + private final BiConsumer consumer; + + private final CountDown counter; + + public GetSnapshotInfoContext( + Collection snapshotIds, + boolean abortOnFailure, + BooleanSupplier isCancelled, + BiConsumer consumer, + ActionListener listener + ) { + if (snapshotIds.isEmpty()) { + throw new IllegalArgumentException("no snapshot ids to fetch given"); + } + this.snapshotIds = List.copyOf(snapshotIds); + this.counter = new CountDown(snapshotIds.size()); + this.abortOnFailure = abortOnFailure; + this.isCancelled = isCancelled; + this.consumer = consumer; + this.doneListener = listener; + } + + public List snapshotIds() { + return snapshotIds; + } + + /** + * @return true if fetching {@link SnapshotInfo} should be stopped after encountering any exception + */ + public boolean abortOnFailure() { + return abortOnFailure; + } + + /** + * @return true if fetching {@link SnapshotInfo} has been cancelled + */ + public boolean isCancelled() { + return isCancelled.getAsBoolean(); + } + + /** + * @return true if fetching {@link SnapshotInfo} is either complete or should be stopped because of an error + */ + public boolean done() { + return counter.isCountedDown(); + } + + @Override + public void onResponse(SnapshotInfo snapshotInfo) { + assert Repository.assertSnapshotMetaThread(); + try { + consumer.accept(this, snapshotInfo); + } catch (Exception e) { + assert false : e; + onFailure(e); + return; + } + if (counter.countDown()) { + try { + doneListener.onResponse(null); + } catch (Exception e) { + assert false : e; + failDoneListener(e); + } + } + } + + @Override + public void onFailure(Exception e) { + assert Repository.assertSnapshotMetaThread(); + if (abortOnFailure) { + if (counter.fastForward()) { + failDoneListener(e); + } + } else { + logger.warn("failed to fetch snapshot info", e); + if (counter.countDown()) { + doneListener.onResponse(null); + } + } + } + + private void failDoneListener(Exception failure) { + try { + doneListener.onFailure(failure); + } catch (Exception ex) { + assert false : ex; + throw ex; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 6fd9ebe15a0a9..9cbc0287d6c6a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -25,9 +25,11 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Consumer; @@ -70,12 +72,34 @@ default Repository create(RepositoryMetadata metadata, Function listener) { + getSnapshotInfo(new GetSnapshotInfoContext(List.of(snapshotId), true, () -> false, (context, snapshotInfo) -> { + assert Repository.assertSnapshotMetaThread(); + listener.onResponse(snapshotInfo); + }, new ActionListener<>() { + @Override + public void onResponse(Void o) { + // ignored + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + })); + } /** * Returns global metadata associated with the snapshot. @@ -296,4 +320,11 @@ void cloneShardSnapshot( default Map adaptUserMetadata(Map userMetadata) { return userMetadata; } + + static boolean assertSnapshotMetaThread() { + final String threadName = Thread.currentThread().getName(); + assert threadName.contains('[' + ThreadPool.Names.SNAPSHOT_META + ']') || threadName.startsWith("TEST-") + : "Expected current thread [" + Thread.currentThread() + "] to be a snapshot meta thread."; + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index c76773be4bfdf..31e574ed0b53a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -90,6 +90,7 @@ import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.RepositoriesService; @@ -111,6 +112,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotsService; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.threadpool.ThreadPool; import java.io.FilterInputStream; @@ -1450,14 +1452,57 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito } @Override - public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { - try { - return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry); - } catch (NoSuchFileException ex) { - throw new SnapshotMissingException(metadata.name(), snapshotId, ex); - } catch (IOException | NotXContentException ex) { - throw new SnapshotException(metadata.name(), snapshotId, "failed to get snapshots", ex); + public void getSnapshotInfo(GetSnapshotInfoContext context) { + // put snapshot info downloads into a task queue instead of pushing them all into the queue to not completely monopolize the + // snapshot meta pool for a single request + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_META).getMax(), context.snapshotIds().size()); + final BlockingQueue queue = new LinkedBlockingQueue<>(context.snapshotIds()); + for (int i = 0; i < workers; i++) { + getOneSnapshotInfo(queue, context); + } + } + + /** + * Tries to poll a {@link SnapshotId} to load {@link SnapshotInfo} for from the given {@code queue}. + */ + private void getOneSnapshotInfo(BlockingQueue queue, GetSnapshotInfoContext context) { + final SnapshotId snapshotId = queue.poll(); + if (snapshotId == null) { + return; } + threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> { + if (context.done()) { + return; + } + if (context.isCancelled()) { + queue.clear(); + context.onFailure(new TaskCancelledException("task cancelled")); + return; + } + Exception failure = null; + SnapshotInfo snapshotInfo = null; + try { + snapshotInfo = SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry); + } catch (NoSuchFileException ex) { + failure = new SnapshotMissingException(metadata.name(), snapshotId, ex); + } catch (IOException | NotXContentException ex) { + failure = new SnapshotException(metadata.name(), snapshotId, "failed to get snapshot info" + snapshotId, ex); + } catch (Exception e) { + failure = e instanceof SnapshotException + ? e + : new SnapshotException(metadata.name(), snapshotId, "Snapshot could not be read", e); + } + if (failure != null) { + if (context.abortOnFailure()) { + queue.clear(); + } + context.onFailure(failure); + } else { + assert snapshotInfo != null; + context.onResponse(snapshotInfo); + } + getOneSnapshotInfo(queue, context); + }); } @Override @@ -2147,43 +2192,40 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS .collect(Collectors.toList()); if (snapshotIdsWithMissingDetails.isEmpty() == false) { final Map extraDetailsMap = new ConcurrentHashMap<>(); - final GroupedActionListener loadExtraDetailsListener = new GroupedActionListener<>( - ActionListener.runAfter(new ActionListener<>() { - @Override - public void onResponse(Collection voids) { - logger.info( - "Successfully loaded all snapshots' detailed information for {} from snapshot metadata", - AllocationService.firstListElementsToCommaDelimitedString( - snapshotIdsWithMissingDetails, - SnapshotId::toString, - logger.isDebugEnabled() - ) - ); - } - - @Override - public void onFailure(Exception e) { - logger.warn("Failure when trying to load missing details from snapshot metadata", e); - } - }, () -> filterRepositoryDataStep.onResponse(repositoryData.withExtraDetails(extraDetailsMap))), - snapshotIdsWithMissingDetails.size() - ); - for (SnapshotId snapshotId : snapshotIdsWithMissingDetails) { - // Just spawn all the download jobs at the same time: this is pretty important, executes only rarely (typically once - // after an upgrade) and each job is only a small download so this shouldn't block other SNAPSHOT activities for long. - threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(loadExtraDetailsListener, () -> { - final SnapshotInfo snapshotInfo = getSnapshotInfo(snapshotId); - extraDetailsMap.put( - snapshotId, + getSnapshotInfo( + new GetSnapshotInfoContext( + snapshotIdsWithMissingDetails, + false, + () -> false, + (context, snapshotInfo) -> extraDetailsMap.put( + snapshotInfo.snapshotId(), new SnapshotDetails( snapshotInfo.state(), snapshotInfo.version(), snapshotInfo.startTime(), snapshotInfo.endTime() ) - ); - })); - } + ), + ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Void aVoid) { + logger.info( + "Successfully loaded all snapshots' detailed information for {} from snapshot metadata", + AllocationService.firstListElementsToCommaDelimitedString( + snapshotIdsWithMissingDetails, + SnapshotId::toString, + logger.isDebugEnabled() + ) + ); + } + + @Override + public void onFailure(Exception e) { + logger.warn("Failure when trying to load missing details from snapshot metadata", e); + } + }, () -> filterRepositoryDataStep.onResponse(repositoryData.withExtraDetails(extraDetailsMap))) + ) + ); } else { filterRepositoryDataStep.onResponse(repositoryData); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 733a0f9391cfc..92e4321d26f70 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -16,7 +16,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.elasticsearch.action.support.GroupedActionListener; @@ -233,10 +232,7 @@ public void restoreSnapshot( final StepListener repositoryDataListener = new StepListener<>(); repository.getRepositoryData(repositoryDataListener); - repositoryDataListener.whenComplete(repositoryData -> repositoryUuidRefreshListener.whenComplete(ignored -> - // fork handling to the generic pool since it loads various pieces of metadata from the repository over a longer period - // of time - clusterService.getClusterApplierService().threadPool().generic().execute(ActionRunnable.wrap(listener, l -> { + repositoryDataListener.whenComplete(repositoryData -> repositoryUuidRefreshListener.whenComplete(ignored -> { final String snapshotName = request.snapshot(); final Optional matchingSnapshotId = repositoryData.getSnapshotIds() .stream() @@ -254,8 +250,14 @@ public void restoreSnapshot( "snapshot UUID mismatch: expected [" + request.snapshotUuid() + "] but got [" + snapshotId.getUUID() + "]" ); } - startRestore(repository.getSnapshotInfo(snapshotId), repository, request, repositoryData, updater, l); - })), listener::onFailure), listener::onFailure); + repository.getSnapshotInfo( + snapshotId, + ActionListener.wrap( + snapshotInfo -> startRestore(snapshotInfo, repository, request, repositoryData, updater, listener), + listener::onFailure + ) + ); + }, listener::onFailure), listener::onFailure); } catch (Exception e) { logger.warn( () -> new ParameterizedMessage("[{}] failed to restore snapshot", request.repository() + ":" + request.snapshot()), @@ -287,6 +289,7 @@ private void startRestore( BiConsumer updater, ActionListener listener ) throws IOException { + assert Repository.assertSnapshotMetaThread(); final SnapshotId snapshotId = snapshotInfo.snapshotId(); final String repositoryName = repository.getMetadata().name(); final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 986e0fe1a520a..d086463772ad9 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -578,7 +578,7 @@ private void startCloning(Repository repository, SnapshotsInProgress.Entry clone // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone // TODO: we could skip this step for snapshots with state SUCCESS final StepListener snapshotInfoListener = new StepListener<>(); - executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshot))); + repository.getSnapshotInfo(sourceSnapshot, snapshotInfoListener); final StepListener>> allShardCountsListener = new StepListener<>(); final GroupedActionListener> shardCountListener = new GroupedActionListener<>( diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 800bac246b50b..88ebac243fa0f 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -202,8 +202,8 @@ public RepositoryMetadata getMetadata() { } @Override - public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { - return null; + public void getSnapshotInfo(GetSnapshotInfoContext context) { + throw new UnsupportedOperationException(); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 8d5de18a8973f..057bbeea38f62 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -348,13 +348,20 @@ public void testSuccessfulSnapshotAndRestore() { Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + final SnapshotInfo snapshotInfo = getSnapshotInfo(repository, snapshotIds.iterator().next()); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); assertEquals(shards, snapshotInfo.successfulShards()); assertEquals(0, snapshotInfo.failedShards()); } + private SnapshotInfo getSnapshotInfo(Repository repository, SnapshotId snapshotId) { + final StepListener listener = new StepListener<>(); + repository.getSnapshotInfo(snapshotId, listener); + deterministicTaskQueue.runAllRunnableTasks(); + return listener.result(); + } + public void testSnapshotWithNodeDisconnects() { final int dataNodes = randomIntBetween(2, 10); final int masterNodes = randomFrom(1, 3, 5); @@ -546,7 +553,7 @@ public void clusterChanged(ClusterChangedEvent event) { Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + final SnapshotInfo snapshotInfo = getSnapshotInfo(repository, snapshotIds.iterator().next()); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); assertEquals(shards, snapshotInfo.successfulShards()); @@ -620,7 +627,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { assertThat(snapshotIds, hasSize(2)); for (SnapshotId snapshotId : snapshotIds) { - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + final SnapshotInfo snapshotInfo = getSnapshotInfo(repository, snapshotId); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); assertEquals(shards, snapshotInfo.successfulShards()); @@ -770,7 +777,7 @@ public void testConcurrentSnapshotRestoreAndDeleteOther() { assertThat(snapshotIds, contains(createOtherSnapshotResponseStepListener.result().getSnapshotInfo().snapshotId())); for (SnapshotId snapshotId : snapshotIds) { - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + final SnapshotInfo snapshotInfo = getSnapshotInfo(repository, snapshotId); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); assertEquals(shards, snapshotInfo.successfulShards()); @@ -865,7 +872,7 @@ public void onFailure(Exception e) { Collection snapshotIds = repositoryData.getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + final SnapshotInfo snapshotInfo = getSnapshotInfo(repository, snapshotIds.iterator().next()); if (partialSnapshot) { assertThat(snapshotInfo.state(), either(is(SnapshotState.SUCCESS)).or(is(SnapshotState.PARTIAL))); // Single shard for each index so we either get all indices or all except for the deleted index @@ -1143,7 +1150,7 @@ public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); assertThat(snapshotIds, hasSize(1)); - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + final SnapshotInfo snapshotInfo = getSnapshotInfo(repository, snapshotIds.iterator().next()); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); assertEquals(shards, snapshotInfo.successfulShards()); @@ -1211,7 +1218,7 @@ public void testRunConcurrentSnapshots() { assertThat(snapshotIds, hasSize(snapshotNames.size())); for (SnapshotId snapshotId : snapshotIds) { - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + final SnapshotInfo snapshotInfo = getSnapshotInfo(repository, snapshotId); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); assertEquals(shards, snapshotInfo.successfulShards()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 583c856c73e53..25c07e8b7a689 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -17,14 +17,15 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGenerations; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.ShardSnapshotResult; +import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -63,8 +64,8 @@ public RepositoryMetadata getMetadata() { } @Override - public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { - return null; + public void getSnapshotInfo(GetSnapshotInfoContext context) { + throw new UnsupportedOperationException(); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index a301bcb626879..b60e22b7ca3fb 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -9,6 +9,7 @@ import org.apache.lucene.util.SameThreadExecutorService; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterChangedEvent; @@ -33,6 +34,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; @@ -44,6 +46,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -110,9 +113,32 @@ public static PlainActionFuture assertConsistencyAsync(BlobStore repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false); } assertIndexUUIDs(repository, repositoryData); - assertSnapshotUUIDs(repository, repositoryData); - assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); - listener.onResponse(null); + assertSnapshotUUIDs(repository, repositoryData, new ActionListener<>() { + @Override + public void onResponse(AssertionError assertionError) { + if (assertionError == null) { + try { + try { + assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); + } catch (AssertionError e) { + listener.onResponse(e); + return; + } + } catch (Exception e) { + onFailure(e); + return; + } + listener.onResponse(null); + } else { + listener.onResponse(assertionError); + } + } + + @Override + public void onFailure(Exception e) { + listener.onResponse(new AssertionError(e)); + } + }); } catch (AssertionError e) { listener.onResponse(e); } @@ -179,7 +205,8 @@ private static void assertIndexUUIDs(BlobStoreRepository repository, RepositoryD } } - private static void assertSnapshotUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException { + private static void assertSnapshotUUIDs(BlobStoreRepository repository, RepositoryData repositoryData, + ActionListener listener) throws IOException { final BlobContainer repoRoot = repository.blobContainer(); final Collection snapshotIds = repositoryData.getSnapshotIds(); final List expectedSnapshotUUIDs = snapshotIds.stream().map(SnapshotId::getUUID).collect(Collectors.toList()); @@ -197,11 +224,50 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito } else { indices = indicesContainer.children(); } + if (snapshotIds.isEmpty()) { + listener.onResponse(null); + return; + } + // Assert that for each snapshot, the relevant metadata was written to index and shard folders + final List snapshotInfos = Collections.synchronizedList(new ArrayList<>()); + repository.getSnapshotInfo( + new GetSnapshotInfoContext( + List.copyOf(snapshotIds), + true, + () -> false, + (ctx, sni) -> snapshotInfos.add(sni), + new ActionListener<>() { + @Override + public void onResponse(Void unused) { + try { + assertSnapshotInfosConsistency(repository, repositoryData, indices, snapshotInfos); + } catch (Exception e) { + listener.onResponse(new AssertionError(e)); + return; + } catch (AssertionError e) { + listener.onResponse(e); + return; + } + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + listener.onResponse(new AssertionError(e)); + } + } + ) + ); + } + + private static void assertSnapshotInfosConsistency(BlobStoreRepository repository, + RepositoryData repositoryData, + Map indices, + List snapshotInfos) throws IOException { final Map maxShardCountsExpected = new HashMap<>(); final Map maxShardCountsSeen = new HashMap<>(); - // Assert that for each snapshot, the relevant metadata was written to index and shard folders - for (SnapshotId snapshotId: snapshotIds) { - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + for (SnapshotInfo snapshotInfo: snapshotInfos) { + final SnapshotId snapshotId = snapshotInfo.snapshotId(); for (String index : snapshotInfo.indices()) { final IndexId indexId = repositoryData.resolveIndexId(index); assertThat(indices, hasKey(indexId.getId())); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 9610d519589a7..a62226775bddb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -62,14 +62,15 @@ import org.elasticsearch.indices.recovery.MultiChunkTransfer; import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGenerations; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.ShardSnapshotResult; +import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.FileRestoreContext; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -172,8 +173,10 @@ private Client getRemoteClusterClient() { } @Override - public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { - assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; + public void getSnapshotInfo(GetSnapshotInfoContext context) { + final List snapshotIds = context.snapshotIds(); + assert snapshotIds.size() == 1 && SNAPSHOT_ID.equals(snapshotIds.iterator().next()) + : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId but saw " + snapshotIds; Client remoteClient = getRemoteClusterClient(); ClusterStateResponse response = remoteClient.admin().cluster().prepareState().clear().setMetadata(true).setNodes(true) .get(ccrSettings.getRecoveryActionTimeout()); @@ -182,9 +185,17 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { ArrayList indices = new ArrayList<>(indicesMap.size()); indicesMap.keysIt().forEachRemaining(indices::add); - return new SnapshotInfo(snapshotId, indices, new ArrayList<>(metadata.dataStreams().keySet()), Collections.emptyList(), - response.getState().getNodes().getMaxNodeVersion(), SnapshotState.SUCCESS - ); + // fork to the snapshot meta pool because the context expects to run on it and asserts that it does + threadPool.executor(ThreadPool.Names.SNAPSHOT_META).execute(() -> context.onResponse( + new SnapshotInfo( + SNAPSHOT_ID, + indices, + new ArrayList<>(metadata.dataStreams().keySet()), + Collections.emptyList(), + response.getState().getNodes().getMaxNodeVersion(), + SnapshotState.SUCCESS + ) + )); } @Override