From a02c3c343da873f583988b54e5ef465b34b40fc3 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 15 Oct 2019 03:40:28 +0200 Subject: [PATCH] Fix Bug in Azure Repo Exception Handling (#47968) We were incorrectly handling `IOExceptions` thrown by the `InputStream` side of the upload operation, resulting in a `ClassCastException` as we expected to never get `IOException` from the Azure SDK code but we do in practice. This PR also sets an assertion on `markSupported` for the streams used by the SDK as adding the test for this scenario revealed that the SDK client would retry uploads for non-mark-supporting streams on `IOException` in the `InputStream`. --- .../repositories/azure/AzureBlobContainer.java | 3 +-- .../repositories/azure/AzureBlobStore.java | 11 +++++------ .../repositories/azure/AzureStorageService.java | 9 +++++---- .../repositories/azure/SocketAccess.java | 16 ++++++++-------- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 37963648a7499..f02b7ff71dd2d 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -62,7 +62,7 @@ private boolean blobExists(String blobName) { logger.trace("blobExists({})", blobName); try { return blobStore.blobExists(buildKey(blobName)); - } catch (URISyntaxException | StorageException e) { + } catch (URISyntaxException | StorageException | IOException e) { logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage()); } return false; @@ -97,7 +97,6 @@ public InputStream readBlob(String blobName) throws IOException { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); - try { blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } catch (URISyntaxException|StorageException e) { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index e4a7e3acb6526..4717da5f22ead 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; -import java.nio.file.FileAlreadyExistsException; import java.util.Collections; import java.util.Map; import java.util.concurrent.Executor; @@ -84,11 +83,11 @@ public BlobContainer blobContainer(BlobPath path) { public void close() { } - public boolean blobExists(String blob) throws URISyntaxException, StorageException { + public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException { return service.blobExists(clientName, container, blob); } - public void deleteBlob(String blob) throws URISyntaxException, StorageException { + public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException { service.deleteBlob(clientName, container, blob); } @@ -102,17 +101,17 @@ public InputStream getInputStream(String blob) throws URISyntaxException, Storag } public Map listBlobsByPrefix(String keyPath, String prefix) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, IOException { return service.listBlobsByPrefix(clientName, container, keyPath, prefix); } - public Map children(BlobPath path) throws URISyntaxException, StorageException { + public Map children(BlobPath path) throws URISyntaxException, StorageException, IOException { return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect( Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool)))); } public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) - throws URISyntaxException, StorageException, FileAlreadyExistsException { + throws URISyntaxException, StorageException, IOException { service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index ef34c533501d2..02f3488cac04b 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -262,7 +262,7 @@ public InputStream getInputStream(String account, String container, String blob) } public Map listBlobsByPrefix(String account, String container, String keyPath, String prefix) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, IOException { // NOTE: this should be here: if (prefix == null) prefix = ""; // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix! @@ -290,7 +290,7 @@ public Map listBlobsByPrefix(String account, String contai return blobsBuilder.immutableMap(); } - public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException { + public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException { final Set blobsBuilder = new HashSet<>(); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -314,8 +314,9 @@ public Set children(String account, String container, BlobPath path) thr } public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, - boolean failIfAlreadyExists) - throws URISyntaxException, StorageException, FileAlreadyExistsException { + boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException { + assert inputStream.markSupported() + : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken"; logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize)); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java index 1400cc5b06627..18acf088cdb32 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.StorageException; +import org.apache.logging.log4j.core.util.Throwables; import org.elasticsearch.SpecialPermission; import java.io.IOException; @@ -44,7 +45,9 @@ public static T doPrivilegedIOException(PrivilegedExceptionAction operati try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); + Throwables.rethrow(e.getCause()); + assert false : "always throws"; + return null; } } @@ -53,7 +56,9 @@ public static T doPrivilegedException(PrivilegedExceptionAction operation try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - throw (StorageException) e.getCause(); + Throwables.rethrow(e.getCause()); + assert false : "always throws"; + return null; } } @@ -65,12 +70,7 @@ public static void doPrivilegedVoidException(StorageRunnable action) throws Stor return null; }); } catch (PrivilegedActionException e) { - Throwable cause = e.getCause(); - if (cause instanceof StorageException) { - throw (StorageException) cause; - } else { - throw (URISyntaxException) cause; - } + Throwables.rethrow(e.getCause()); } }