2525import com .amazonaws .services .s3 .model .CompleteMultipartUploadRequest ;
2626import com .amazonaws .services .s3 .model .DeleteObjectsRequest ;
2727import com .amazonaws .services .s3 .model .InitiateMultipartUploadRequest ;
28+ import com .amazonaws .services .s3 .model .MultiObjectDeleteException ;
2829import com .amazonaws .services .s3 .model .ObjectListing ;
2930import com .amazonaws .services .s3 .model .ObjectMetadata ;
3031import com .amazonaws .services .s3 .model .PartETag ;
3435import com .amazonaws .services .s3 .model .UploadPartRequest ;
3536import com .amazonaws .services .s3 .model .UploadPartResult ;
3637import org .apache .lucene .util .SetOnce ;
38+ import org .elasticsearch .ExceptionsHelper ;
3739import org .elasticsearch .common .Nullable ;
3840import org .elasticsearch .common .Strings ;
3941import org .elasticsearch .common .blobstore .BlobMetaData ;
5052import java .util .ArrayList ;
5153import java .util .List ;
5254import java .util .Map ;
55+ import java .util .Set ;
56+ import java .util .stream .Collectors ;
5357
5458import static org .elasticsearch .repositories .s3 .S3Repository .MAX_FILE_SIZE ;
5559import static org .elasticsearch .repositories .s3 .S3Repository .MAX_FILE_SIZE_USING_MULTIPART ;
@@ -127,12 +131,13 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
127131 if (blobNames .isEmpty ()) {
128132 return ;
129133 }
134+ final Set <String > outstanding = blobNames .stream ().map (this ::buildKey ).collect (Collectors .toSet ());
130135 try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
131136 // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
132137 final List <DeleteObjectsRequest > deleteRequests = new ArrayList <>();
133138 final List <String > partition = new ArrayList <>();
134- for (String blob : blobNames ) {
135- partition .add (buildKey ( blob ) );
139+ for (String key : outstanding ) {
140+ partition .add (key );
136141 if (partition .size () == MAX_BULK_DELETES ) {
137142 deleteRequests .add (bulkDelete (blobStore .bucket (), partition ));
138143 partition .clear ();
@@ -144,23 +149,32 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
144149 SocketAccess .doPrivilegedVoid (() -> {
145150 AmazonClientException aex = null ;
146151 for (DeleteObjectsRequest deleteRequest : deleteRequests ) {
152+ List <String > keysInRequest =
153+ deleteRequest .getKeys ().stream ().map (DeleteObjectsRequest .KeyVersion ::getKey ).collect (Collectors .toList ());
147154 try {
148155 clientReference .client ().deleteObjects (deleteRequest );
156+ outstanding .removeAll (keysInRequest );
157+ } catch (MultiObjectDeleteException e ) {
158+ // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
159+ // first remove all keys that were sent in the request and then add back those that ran into an exception.
160+ outstanding .removeAll (keysInRequest );
161+ outstanding .addAll (
162+ e .getErrors ().stream ().map (MultiObjectDeleteException .DeleteError ::getKey ).collect (Collectors .toSet ()));
163+ aex = ExceptionsHelper .useOrSuppress (aex , e );
149164 } catch (AmazonClientException e ) {
150- if (aex == null ) {
151- aex = e ;
152- } else {
153- aex .addSuppressed (e );
154- }
165+ // The AWS client threw any unexpected exception and did not execute the request at all so we do not
166+ // remove any keys from the outstanding deletes set.
167+ aex = ExceptionsHelper .useOrSuppress (aex , e );
155168 }
156169 }
157170 if (aex != null ) {
158171 throw aex ;
159172 }
160173 });
161- } catch (final AmazonClientException e ) {
162- throw new IOException ("Exception when deleting blobs [" + blobNames + "]" , e );
174+ } catch (Exception e ) {
175+ throw new IOException ("Failed to delete blobs [" + outstanding + "]" , e );
163176 }
177+ assert outstanding .isEmpty ();
164178 }
165179
166180 private static DeleteObjectsRequest bulkDelete (String bucket , List <String > blobs ) {
0 commit comments