diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 8ad9b453a9092..fb81a5c90039f 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -26,7 +26,9 @@ import java.io.IOException; import java.io.InputStream; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { @@ -78,7 +80,12 @@ public void deleteBlob(String blobName) throws IOException { blobStore.deleteBlob(buildKey(blobName)); } - protected String buildKey(String blobName) { + @Override + public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { + blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList())); + } + + private String buildKey(String blobName) { assert blobName != null; return path + blobName; } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 84184660159a4..88489c4fcb18f 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.gcs; +import com.google.cloud.BatchResult; import com.google.cloud.ReadChannel; import com.google.cloud.WriteChannel; import com.google.cloud.storage.Blob; @@ -27,10 +28,9 @@ import com.google.cloud.storage.Bucket; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -50,17 +50,18 @@ import java.nio.channels.WritableByteChannel; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; class GoogleCloudStorageBlobStore implements BlobStore { - - private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class); // The recommended maximum size of a blob that should be uploaded in a single // request. Larger files should be uploaded over multiple requests (this is @@ -105,7 +106,7 @@ public void close() { * @param bucketName name of the bucket * @return true iff the bucket exists */ - boolean doesBucketExist(String bucketName) { + private boolean doesBucketExist(String bucketName) { try { final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName)); return bucket != null; @@ -295,8 +296,8 @@ void deleteBlob(String blobName) throws IOException { * * @param prefix prefix of the blobs to delete */ - void deleteBlobsByPrefix(String prefix) throws IOException { - deleteBlobs(listBlobsByPrefix("", prefix).keySet()); + private void deleteBlobsByPrefix(String prefix) throws IOException { + deleteBlobsIgnoringIfNotExists(listBlobsByPrefix("", prefix).keySet()); } /** @@ -304,7 +305,7 @@ void deleteBlobsByPrefix(String prefix) throws IOException { * * @param blobNames names of the blobs to delete */ - void deleteBlobs(Collection blobNames) throws IOException { + void deleteBlobsIgnoringIfNotExists(Collection blobNames) throws IOException { if (blobNames.isEmpty()) { return; } @@ -314,17 +315,33 @@ void deleteBlobs(Collection blobNames) throws IOException { return; } final List blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList()); - final List deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete)); - assert blobIdsToDelete.size() == deletedStatuses.size(); - boolean failed = false; - for (int i = 0; i < blobIdsToDelete.size(); i++) { - if (deletedStatuses.get(i) == false) { - logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName); - failed = true; + final List failedBlobs = Collections.synchronizedList(new ArrayList<>()); + final StorageException e = SocketAccess.doPrivilegedIOException(() -> { + final AtomicReference ioe = new AtomicReference<>(); + final StorageBatch batch = client().batch(); + for (BlobId blob : blobIdsToDelete) { + batch.delete(blob).notify( + new BatchResult.Callback() { + @Override + public void success(Boolean result) { + } + + @Override + public void error(StorageException exception) { + if (exception.getCode() != HTTP_NOT_FOUND) { + failedBlobs.add(blob); + if (ioe.compareAndSet(null, exception) == false) { + ioe.get().addSuppressed(exception); + } + } + } + }); } - } - if (failed) { - throw new IOException("Failed to delete all [" + blobIdsToDelete.size() + "] blobs"); + batch.submit(); + return ioe.get(); + }); + if (e != null) { + throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e); } } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java index 97c7e2ab76bd2..eddf2a9f78082 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories.gcs; import com.google.api.gax.paging.Page; +import com.google.cloud.BatchResult; import com.google.cloud.Policy; import com.google.cloud.ReadChannel; import com.google.cloud.RestorableState; @@ -34,11 +35,13 @@ import com.google.cloud.storage.ServiceAccount; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageBatch; +import com.google.cloud.storage.StorageBatchResult; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRpcOptionUtils; import com.google.cloud.storage.StorageTestUtils; import org.elasticsearch.core.internal.io.IOUtils; +import org.mockito.stubbing.Answer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -57,6 +60,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyVararg; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + /** * {@link MockStorage} mocks a {@link Storage} client by storing all the blobs * in a given concurrent map. @@ -356,8 +364,25 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) { } @Override + @SuppressWarnings("unchecked") public StorageBatch batch() { - return null; + final Answer throwOnMissingMock = invocationOnMock -> { + throw new AssertionError("Did not expect call to method [" + invocationOnMock.getMethod().getName() + ']'); + }; + final StorageBatch batch = mock(StorageBatch.class, throwOnMissingMock); + StorageBatchResult result = mock(StorageBatchResult.class, throwOnMissingMock); + doAnswer(answer -> { + BatchResult.Callback callback = (BatchResult.Callback) answer.getArguments()[0]; + callback.success(true); + return null; + }).when(result).notify(any(BatchResult.Callback.class)); + doAnswer(invocation -> { + final BlobId blobId = (BlobId) invocation.getArguments()[0]; + delete(blobId); + return result; + }).when(batch).delete(any(BlobId.class), anyVararg()); + doAnswer(invocation -> null).when(batch).submit(); + return batch; } @Override