From f9a39ed7e6ac3cbc49d6417428ec60b662b82846 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 29 Aug 2019 16:06:48 +0200 Subject: [PATCH 1/2] More Efficient Ordering of Shard Upload Execution (#42791) * Change the upload order of of snapshots to work file by file in parallel on the snapshot pool instead of merely shard-by-shard * Inspired by #39657 --- .../elasticsearch/action/ActionListener.java | 32 +++ .../repositories/FilterRepository.java | 6 +- .../repositories/Repository.java | 26 +- .../blobstore/BlobStoreRepository.java | 230 ++++++++++-------- .../snapshots/SnapshotShardsService.java | 122 ++++------ .../action/ActionListenerTests.java | 17 ++ .../RepositoriesServiceTests.java | 2 +- .../repositories/fs/FsRepositoryTests.java | 10 +- .../index/shard/IndexShardTestCase.java | 4 +- .../index/shard/RestoreOnlyRepository.java | 2 +- .../xpack/ccr/repository/CcrRepository.java | 2 +- .../SourceOnlySnapshotRepository.java | 39 ++- .../SourceOnlySnapshotShardTests.java | 29 ++- 13 files changed, 293 insertions(+), 228 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index c21aa3b9d4b8f..6379d37d5cd6e 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -22,6 +22,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.CheckedSupplier; import java.util.ArrayList; @@ -226,6 +227,37 @@ public void onFailure(Exception e) { }; } + /** + * Wraps a given listener and returns a new listener which executes the provided {@code runBefore} + * callback before the listener is notified via either {@code #onResponse} or {@code #onFailure}. + * If the callback throws an exception then it will be passed to the listener's {@code #onFailure} and its {@code #onResponse} will + * not be executed. + */ + static ActionListener runBefore(ActionListener delegate, CheckedRunnable runBefore) { + return new ActionListener() { + @Override + public void onResponse(Response response) { + try { + runBefore.run(); + } catch (Exception ex) { + delegate.onFailure(ex); + return; + } + delegate.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + try { + runBefore.run(); + } catch (Exception ex) { + e.addSuppressed(ex); + } + delegate.onFailure(e); + } + }; + } + /** * Wraps a given listener and returns a new listener which makes sure {@link #onResponse(Object)} * and {@link #onFailure(Exception)} of the provided listener will be called at most once. diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 8c9eff0698835..39fd92f9eaae9 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -121,13 +121,11 @@ public boolean isReadOnly() { return in.isReadOnly(); } - @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { - in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus); + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener); } - @Override public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 1d828da344bb6..a5072293e36b2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -51,7 +51,7 @@ *
    *
  • Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)} * with list of indices that will be included into the snapshot
  • - *
  • Data nodes call {@link Repository#snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} + *
  • Data nodes call {@link Repository#snapshotShard} * for each shard
  • *
  • When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures
  • *
@@ -191,27 +191,6 @@ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long */ boolean isReadOnly(); - /** - * Creates a snapshot of the shard based on the index commit point. - *

- * The index commit point can be obtained by using {@link org.elasticsearch.index.engine.Engine#acquireLastIndexCommit} method. - * Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller. - *

- * As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check - * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. - * @param indexShard the shard to be snapshotted - * @param snapshotId snapshot id - * @param indexId id for the index being snapshotted - * @param snapshotIndexCommit commit point - * @param snapshotStatus snapshot status - * @deprecated use {@link #snapshotShard(Store, MapperService, SnapshotId, IndexId, IndexCommit, IndexShardSnapshotStatus)} instead - */ - @Deprecated - default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus) { - snapshotShard(indexShard.store(), indexShard.mapperService(), snapshotId, indexId, snapshotIndexCommit, snapshotStatus); - } - /** * Creates a snapshot of the shard based on the index commit point. *

@@ -226,9 +205,10 @@ default void snapshotShard(IndexShard indexShard, SnapshotId snapshotId, IndexId * @param indexId id for the index being snapshotted * @param snapshotIndexCommit commit point * @param snapshotStatus snapshot status + * @param listener listener invoked on completion */ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus); + IndexShardSnapshotStatus snapshotStatus, ActionListener listener); /** * Restores snapshot of the shard. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 8a9c12f9f4c74..036e1ceea4331 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -32,6 +32,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -108,6 +109,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; import java.util.stream.Collectors; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -909,9 +911,15 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); + final StepListener snapshotDoneListener = new StepListener<>(); + snapshotDoneListener.whenComplete(listener::onResponse, e -> { + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e)); + listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e + : new IndexShardSnapshotFailedException(store.shardId(), e)); + }); try { logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); @@ -933,132 +941,145 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s } final List indexCommitPointFiles = new ArrayList<>(); + ArrayList filesToSnapshot = new ArrayList<>(); store.incRef(); + final Collection fileNames; + final Store.MetadataSnapshot metadataFromStore; try { - ArrayList filesToSnapshot = new ArrayList<>(); - final Store.MetadataSnapshot metadata; // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should - final Collection fileNames; try { logger.trace( "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); - metadata = store.getMetadata(snapshotIndexCommit); + metadataFromStore = store.getMetadata(snapshotIndexCommit); fileNames = snapshotIndexCommit.getFileNames(); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); } - int indexIncrementalFileCount = 0; - int indexTotalNumberOfFiles = 0; - long indexIncrementalSize = 0; - long indexTotalFileCount = 0; - for (String fileName : fileNames) { - if (snapshotStatus.isAborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } + } finally { + store.decRef(); + } + int indexIncrementalFileCount = 0; + int indexTotalNumberOfFiles = 0; + long indexIncrementalSize = 0; + long indexTotalFileCount = 0; + for (String fileName : fileNames) { + if (snapshotStatus.isAborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } - logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); - final StoreFileMetaData md = metadata.get(fileName); - BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null; - List filesInfo = snapshots.findPhysicalIndexFiles(fileName); - if (filesInfo != null) { - for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) { - if (fileInfo.isSame(md)) { - // a commit point file with the same name, size and checksum was already copied to repository - // we will reuse it for this snapshot - existingFileInfo = fileInfo; - break; - } + logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); + final StoreFileMetaData md = metadataFromStore.get(fileName); + BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null; + List filesInfo = snapshots.findPhysicalIndexFiles(fileName); + if (filesInfo != null) { + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) { + if (fileInfo.isSame(md)) { + // a commit point file with the same name, size and checksum was already copied to repository + // we will reuse it for this snapshot + existingFileInfo = fileInfo; + break; } } - - indexTotalFileCount += md.length(); - indexTotalNumberOfFiles++; - - if (existingFileInfo == null) { - indexIncrementalFileCount++; - indexIncrementalSize += md.length(); - // create a new FileInfo - BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = - new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize()); - indexCommitPointFiles.add(snapshotFileInfo); - filesToSnapshot.add(snapshotFileInfo); - } else { - indexCommitPointFiles.add(existingFileInfo); - } } - snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount, - indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount); - - for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { - try { - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); - } + indexTotalFileCount += md.length(); + indexTotalNumberOfFiles++; + + if (existingFileInfo == null) { + indexIncrementalFileCount++; + indexIncrementalSize += md.length(); + // create a new FileInfo + BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = + new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize()); + indexCommitPointFiles.add(snapshotFileInfo); + filesToSnapshot.add(snapshotFileInfo); + } else { + indexCommitPointFiles.add(existingFileInfo); } - } finally { - store.decRef(); } - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); + snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount, + indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount); - // now create and write the commit point - final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), - lastSnapshotStatus.getIndexVersion(), - indexCommitPointFiles, - lastSnapshotStatus.getStartTime(), - threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(), - lastSnapshotStatus.getIncrementalFileCount(), - lastSnapshotStatus.getIncrementalSize() - ); + assert indexIncrementalFileCount == filesToSnapshot.size(); - logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); - try { - indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID()); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); - } + final StepListener> allFilesUploadedListener = new StepListener<>(); + allFilesUploadedListener.whenComplete(v -> { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = + snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration()); - // delete all files that are not referenced by any commit point - // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones - List newSnapshotsList = new ArrayList<>(); - newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); - for (SnapshotFiles point : snapshots) { - newSnapshotsList.add(point); - } - final String indexGeneration = Long.toString(fileListGeneration + 1); - final List blobsToDelete; - try { - final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); - indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration); - // Delete all previous index-N blobs - blobsToDelete = - blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList()); - assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, ""))) - .max().orElse(-1L) < Long.parseLong(indexGeneration) - : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration + "] when deleting index-N" + - " blobs " + blobsToDelete; - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, - "Failed to finalize snapshot creation [" + snapshotId + "] with shard index [" - + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e); + // now create and write the commit point + final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(), + lastSnapshotStatus.getIndexVersion(), + indexCommitPointFiles, + lastSnapshotStatus.getStartTime(), + threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(), + lastSnapshotStatus.getIncrementalFileCount(), + lastSnapshotStatus.getIncrementalSize() + ); + + logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); + try { + indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID()); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); + } + // delete all files that are not referenced by any commit point + // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones + List newSnapshotsList = new ArrayList<>(); + newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + for (SnapshotFiles point : snapshots) { + newSnapshotsList.add(point); + } + final String indexGeneration = Long.toString(fileListGeneration + 1); + final List blobsToDelete; + try { + final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); + indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration); + // Delete all previous index-N blobs + blobsToDelete = + blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList()); + assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, ""))) + .max().orElse(-1L) < Long.parseLong(indexGeneration) + : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration + + "] when deleting index-N blobs " + blobsToDelete; + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, + "Failed to finalize snapshot creation [" + snapshotId + "] with shard index [" + + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e); + } + try { + shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization", + snapshotId, shardId), e); + } + snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis()); + snapshotDoneListener.onResponse(null); + }, snapshotDoneListener::onFailure); + if (indexIncrementalFileCount == 0) { + allFilesUploadedListener.onResponse(Collections.emptyList()); + return; } - try { - shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete); - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization", - snapshotId, shardId), e); + final GroupedActionListener filesListener = + new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { + executor.execute(new ActionRunnable(filesListener) { + @Override + protected void doRun() { + try { + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + filesListener.onResponse(null); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); + } + } + }); } - snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis()); } catch (Exception e) { - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e)); - if (e instanceof IndexShardSnapshotFailedException) { - throw (IndexShardSnapshotFailedException) e; - } else { - throw new IndexShardSnapshotFailedException(store.shardId(), e); - } + snapshotDoneListener.onFailure(e); } } @@ -1245,6 +1266,7 @@ private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId IndexShardSnapshotStatus snapshotStatus, Store store) throws IOException { final BlobContainer shardContainer = shardContainer(indexId, shardId); final String file = fileInfo.physicalName(); + store.incRef(); try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { for (int i = 0; i < fileInfo.numberOfParts(); i++) { final long partBytes = fileInfo.partBytes(i); @@ -1284,6 +1306,8 @@ private void checkAborted() { failStoreIfCorrupted(store, t); snapshotStatus.addProcessedFile(0); throw t; + } finally { + store.decRef(); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index f313aff03b977..964db78f5662c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -23,7 +23,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; @@ -53,9 +52,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.SnapshotFailedEngineException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; @@ -79,7 +77,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collectors; @@ -297,46 +294,33 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { } private void startNewShards(SnapshotsInProgress.Entry entry, Map startedShards) { - final Snapshot snapshot = entry.snapshot(); - final Map indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - for (final Map.Entry shardEntry : startedShards.entrySet()) { - final ShardId shardId = shardEntry.getKey(); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - assert indexId != null; - executor.execute(new AbstractRunnable() { - - private final SetOnce failure = new SetOnce<>(); - - @Override - public void doRun() { - final IndexShard indexShard = - indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); - } - - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - failure.set(e); - } - - @Override - public void onRejection(Exception e) { - failure.set(e); - } - - @Override - public void onAfter() { - final Exception exception = failure.get(); - if (exception != null) { - notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(exception)); - } else { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + final Snapshot snapshot = entry.snapshot(); + final Map indicesMap = + entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); + for (final Map.Entry shardEntry : startedShards.entrySet()) { + final ShardId shardId = shardEntry.getKey(); + final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue(); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + assert indexId != null; + snapshot(shardId, snapshot, indexId, snapshotStatus, new ActionListener() { + @Override + public void onResponse(final Void aVoid) { + if (logger.isDebugEnabled()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug("snapshot ({}) completed to {} with {}", snapshot, snapshot.getRepository(), lastSnapshotStatus); + } notifySuccessfulSnapshotShard(snapshot, shardId); } - } - }); - } + + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(e)); + } + }); + } + }); } /** @@ -345,37 +329,37 @@ public void onAfter() { * @param snapshot snapshot * @param snapshotStatus snapshot status */ - private void snapshot(final IndexShard indexShard, final Snapshot snapshot, final IndexId indexId, - final IndexShardSnapshotStatus snapshotStatus) { - final ShardId shardId = indexShard.shardId(); - if (indexShard.routingEntry().primary() == false) { - throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); - } - if (indexShard.routingEntry().relocating()) { - // do not snapshot when in the process of relocation of primaries so we won't get conflicts - throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating"); - } + private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, + final IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + try { + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + if (indexShard.routingEntry().primary() == false) { + throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary"); + } + if (indexShard.routingEntry().relocating()) { + // do not snapshot when in the process of relocation of primaries so we won't get conflicts + throw new IndexShardSnapshotFailedException(shardId, "cannot snapshot while relocating"); + } - final IndexShardState indexShardState = indexShard.state(); - if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) { - // shard has just been created, or still recovering - throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); - } + final IndexShardState indexShardState = indexShard.state(); + if (indexShardState == IndexShardState.CREATED || indexShardState == IndexShardState.RECOVERING) { + // shard has just been created, or still recovering + throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); + } - final Repository repository = repositoriesService.repository(snapshot.getRepository()); - try { - // we flush first to make sure we get the latest writes snapshotted - try (Engine.IndexCommitRef snapshotRef = indexShard.acquireLastIndexCommit(true)) { - repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); - if (logger.isDebugEnabled()) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug("snapshot ({}) completed to {} with {}", snapshot, repository, lastSnapshotStatus); - } + final Repository repository = repositoriesService.repository(snapshot.getRepository()); + Engine.IndexCommitRef snapshotRef = null; + try { + // we flush first to make sure we get the latest writes snapshotted + snapshotRef = indexShard.acquireLastIndexCommit(true); + repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, + snapshotRef.getIndexCommit(), snapshotStatus, ActionListener.runBefore(listener, snapshotRef::close)); + } catch (Exception e) { + IOUtils.close(snapshotRef); + throw e; } - } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e) { - throw e; } catch (Exception e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to snapshot", e); + listener.onFailure(e); } } diff --git a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java index cd3735b4843e6..4f9b63fb75e6c 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionListenerTests.java @@ -171,6 +171,23 @@ public void testRunAfter() { } } + public void testRunBefore() { + { + AtomicBoolean afterSuccess = new AtomicBoolean(); + ActionListener listener = + ActionListener.runBefore(ActionListener.wrap(r -> {}, e -> {}), () -> afterSuccess.set(true)); + listener.onResponse(null); + assertThat(afterSuccess.get(), equalTo(true)); + } + { + AtomicBoolean afterFailure = new AtomicBoolean(); + ActionListener listener = + ActionListener.runBefore(ActionListener.wrap(r -> {}, e -> {}), () -> afterFailure.set(true)); + listener.onFailure(null); + assertThat(afterFailure.get(), equalTo(true)); + } + } + public void testNotifyOnce() { AtomicInteger onResponseTimes = new AtomicInteger(); AtomicInteger onFailureTimes = new AtomicInteger(); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index d79763a9f6eab..55a365af5d5ff 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -202,7 +202,7 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit - snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { } diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 1dc7a6263d37b..6c48a19cbb5e6 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -35,6 +35,7 @@ import org.apache.lucene.util.IOSupplier; import org.apache.lucene.util.TestUtil; import org.elasticsearch.Version; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -99,10 +100,12 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { IndexId indexId = new IndexId(idxSettings.getIndex().getName(), idxSettings.getUUID()); IndexCommit indexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory()); + final PlainActionFuture future1 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, - snapshotStatus); + snapshotStatus, future1); + future1.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); }); @@ -124,9 +127,11 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { SnapshotId incSnapshotId = new SnapshotId("test1", "test1"); IndexCommit incIndexCommit = Lucene.getIndexCommit(Lucene.readSegmentInfos(store.directory()), store.directory()); Collection commitFileNames = incIndexCommit.getFileNames(); + final PlainActionFuture future2 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); - repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus); + repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, future2); + future2.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(2, copy.getIncrementalFileCount()); assertEquals(commitFileNames.size(), copy.getTotalFileCount()); @@ -198,4 +203,5 @@ private int indexDocs(Directory directory) throws IOException { return docs; } } + } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index bc5a368c47daa..cce9780b09223 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -832,12 +832,14 @@ protected void snapshotShard(final IndexShard shard, final Snapshot snapshot, final Repository repository) throws IOException { final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); + final PlainActionFuture future = PlainActionFuture.newFuture(); try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { Index index = shard.shardId().getIndex(); IndexId indexId = new IndexId(index.getName(), index.getUUID()); repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId, - indexCommitRef.getIndexCommit(), snapshotStatus); + indexCommitRef.getIndexCommit(), snapshotStatus, future); + future.actionGet(); } final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 418cee00c0d21..313bf7c5daaa1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -135,7 +135,7 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 0231681666f5a..e8e7567091342 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -296,7 +296,7 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index f6c3124c9be9f..8e0f7d04c3056 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -15,6 +15,7 @@ import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.SimpleFSDirectory; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -24,6 +25,7 @@ import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.ReadOnlyEngine; @@ -35,9 +37,11 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -108,11 +112,13 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { if (mapperService.documentMapper() != null // if there is no mapping this is null && mapperService.documentMapper().sourceMapper().isComplete() == false) { - throw new IllegalStateException("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled " + - "or filters the source"); + listener.onFailure( + new IllegalStateException("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled " + + "or filters the source")); + return; } Directory unwrap = FilterDirectory.unwrap(store.directory()); if (unwrap instanceof FSDirectory == false) { @@ -121,7 +127,10 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s Path dataPath = ((FSDirectory) unwrap).getDirectory().getParent(); // TODO should we have a snapshot tmp directory per shard that is maintained by the system? Path snapPath = dataPath.resolve(SNAPSHOT_DIR_NAME); - try (FSDirectory directory = new SimpleFSDirectory(snapPath)) { + final List toClose = new ArrayList<>(3); + try { + FSDirectory directory = new SimpleFSDirectory(snapPath); + toClose.add(directory); Store tempStore = new Store(store.shardId(), store.indexSettings(), directory, new ShardLock(store.shardId()) { @Override protected void closeInternal() { @@ -137,16 +146,20 @@ protected void closeInternal() { final long maxDoc = segmentInfos.totalMaxDoc(); tempStore.bootstrapNewHistory(maxDoc, maxDoc); store.incRef(); - try (DirectoryReader reader = DirectoryReader.open(tempStore.directory(), - Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name()))) { - IndexCommit indexCommit = reader.getIndexCommit(); - super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus); - } finally { - store.decRef(); - } + toClose.add(store::decRef); + DirectoryReader reader = DirectoryReader.open(tempStore.directory(), + Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name())); + toClose.add(reader); + IndexCommit indexCommit = reader.getIndexCommit(); + super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, + ActionListener.runBefore(listener, () -> IOUtils.close(toClose))); } catch (IOException e) { - // why on earth does this super method not declare IOException - throw new UncheckedIOException(e); + try { + IOUtils.close(toClose); + } catch (IOException ex) { + e.addSuppressed(ex); + } + listener.onFailure(e); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 14aae50b3b1cd..b875f76ac59cf 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -96,12 +96,13 @@ public void testSourceIncomplete() throws IOException { repository.start(); try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); - IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, () -> - runAsSnapshot(shard.getThreadPool(), - () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus))); - assertEquals("Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source" - , illegalStateException.getMessage()); + final PlainActionFuture future = PlainActionFuture.newFuture(); + runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); + IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet); + assertEquals( + "Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source", + illegalStateException.getMessage()); } closeShards(shard); } @@ -120,8 +121,10 @@ public void testIncrementalSnapshot() throws IOException { try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); SnapshotId snapshotId = new SnapshotId("test", "test"); + final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); + future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); totalFileCount = copy.getTotalFileCount(); @@ -134,8 +137,10 @@ public void testIncrementalSnapshot() throws IOException { SnapshotId snapshotId = new SnapshotId("test_1", "test_1"); IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); + final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); + future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt assertEquals(5, copy.getIncrementalFileCount()); @@ -148,8 +153,10 @@ public void testIncrementalSnapshot() throws IOException { SnapshotId snapshotId = new SnapshotId("test_2", "test_2"); IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); + final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); + future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1_1.liv assertEquals(2, copy.getIncrementalFileCount()); @@ -197,8 +204,10 @@ public void testRestoreMinmal() throws IOException { repository.initializeSnapshot(snapshotId, Arrays.asList(indexId), MetaData.builder().put(shard.indexSettings() .getIndexMetaData(), false).build()); + final PlainActionFuture future = PlainActionFuture.newFuture(); repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), - indexShardSnapshotStatus); + indexShardSnapshotStatus, future); + future.actionGet(); }); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); From dcd114613b26e11e38fbeb48e98d82afea713c61 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 2 Sep 2019 11:26:43 +0200 Subject: [PATCH 2/2] Cleanup BlobStoreRepository Abort and Failure Handling (#46208) Aborts and failures were handled in a somewhat unfortunate way in #42791: Since the tasks for all files are generated before uploading they are all executed when a snapshot is aborted and lead to a massive number of failures added to the original aborted exception. In the case of failures the situation was not very reasonable as well. If one blob fails uploading the snapshot logic would upload all the remaining files as well and then fail (when previously it would just fail all following files). I fixed both of the above issues, by just short-circuiting all remaining tasks for a shard in case of an exception in any one upload. --- .../repositories/blobstore/BlobStoreRepository.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 036e1ceea4331..e312acaf83c1d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -110,6 +110,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -1065,17 +1066,27 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final GroupedActionListener filesListener = new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + // Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting + final AtomicBoolean alreadyFailed = new AtomicBoolean(); for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { executor.execute(new ActionRunnable(filesListener) { @Override protected void doRun() { try { - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + if (alreadyFailed.get() == false) { + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + } filesListener.onResponse(null); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); } } + + @Override + public void onFailure(Exception e) { + alreadyFailed.set(true); + super.onFailure(e); + } }); } } catch (Exception e) {