|
110 | 110 | import java.util.Optional; |
111 | 111 | import java.util.Set; |
112 | 112 | import java.util.concurrent.Executor; |
| 113 | +import java.util.concurrent.atomic.AtomicBoolean; |
113 | 114 | import java.util.stream.Collectors; |
114 | 115 |
|
115 | 116 | import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; |
@@ -1048,17 +1049,27 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s |
1048 | 1049 | final GroupedActionListener<Void> filesListener = |
1049 | 1050 | new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); |
1050 | 1051 | final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); |
| 1052 | + // Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting |
| 1053 | + final AtomicBoolean alreadyFailed = new AtomicBoolean(); |
1051 | 1054 | for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { |
1052 | 1055 | executor.execute(new ActionRunnable<>(filesListener) { |
1053 | 1056 | @Override |
1054 | 1057 | protected void doRun() { |
1055 | 1058 | try { |
1056 | | - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); |
| 1059 | + if (alreadyFailed.get() == false) { |
| 1060 | + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); |
| 1061 | + } |
1057 | 1062 | filesListener.onResponse(null); |
1058 | 1063 | } catch (IOException e) { |
1059 | 1064 | throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); |
1060 | 1065 | } |
1061 | 1066 | } |
| 1067 | + |
| 1068 | + @Override |
| 1069 | + public void onFailure(Exception e) { |
| 1070 | + alreadyFailed.set(true); |
| 1071 | + super.onFailure(e); |
| 1072 | + } |
1062 | 1073 | }); |
1063 | 1074 | } |
1064 | 1075 | } catch (Exception e) { |
|
0 commit comments