From 85d5ef7d57ee65dc41451e2592507da8e42a0c8a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 7 May 2018 22:16:09 +0200 Subject: [PATCH 1/3] Use stronger write-once semantics for Azure repository --- .../repositories/azure/AzureBlobContainer.java | 4 +--- .../repositories/azure/AzureBlobStore.java | 4 +++- .../repositories/azure/AzureStorageService.java | 3 ++- .../azure/AzureStorageServiceImpl.java | 14 ++++++++++++-- .../azure/AzureStorageServiceMock.java | 6 +++++- .../repositories/ESBlobStoreContainerTestCase.java | 3 ++- 6 files changed, 25 insertions(+), 9 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 8f7671697db56..0b067ae9da0b0 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -88,10 +88,8 @@ public InputStream readBlob(String blobName) throws IOException { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { - if (blobExists(blobName)) { - throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); - } logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); + try { blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); } catch (URISyntaxException|StorageException e) { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 7e8987ae94576..5e8678bc19afc 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; import java.util.Locale; import java.util.Map; @@ -122,7 +123,8 @@ public void moveBlob(String sourceBlob, String targetBlob) throws URISyntaxExcep this.client.moveBlob(this.clientName, this.locMode, container, sourceBlob, targetBlob); } - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException, + FileAlreadyExistsException { this.client.writeBlob(this.clientName, this.locMode, container, blobName, inputStream, blobSize); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 3337c07e6eece..1f5c7c2806350 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; import java.util.Map; /** @@ -61,7 +62,7 @@ void moveBlob(String account, LocationMode mode, String container, String source throws URISyntaxException, StorageException; void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) throws - URISyntaxException, StorageException; + URISyntaxException, StorageException, FileAlreadyExistsException; static InputStream giveSocketPermissionsToStream(InputStream stream) { return new InputStream() { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java index f21dbdfd269f4..7cdb2fe2ee460 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.azure; +import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.OperationContext; @@ -43,8 +44,10 @@ import org.elasticsearch.repositories.RepositoryException; import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; import java.util.EnumSet; import java.util.HashMap; import java.util.Map; @@ -312,12 +315,19 @@ public void moveBlob(String account, LocationMode mode, String container, String @Override public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, FileAlreadyExistsException { logger.trace("writeBlob({}, stream, {})", blobName, blobSize); CloudBlobClient client = this.getSelectedClient(account, mode); CloudBlobContainer blobContainer = client.getContainerReference(container); CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName); - SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, null, null, generateOperationContext(account))); + try { + SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), + null, generateOperationContext(account))); + } catch (StorageException se) { + if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + throw new FileAlreadyExistsException(blobName, null, se.getMessage()); + } + } logger.trace("writeBlob({}, stream, {}) - done", blobName, blobSize); } } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java index 80035d8f78840..e76eb707e2181 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.net.SocketPermission; import java.net.URISyntaxException; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.security.AccessController; import java.util.Locale; @@ -120,7 +121,10 @@ public void moveBlob(String account, LocationMode mode, String container, String @Override public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, FileAlreadyExistsException { + if (blobs.containsKey(blobName)) { + throw new FileAlreadyExistsException(blobName); + } try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { blobs.put(blobName, outputStream); Streams.copy(inputStream, outputStream); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java index 8aff12edc8a53..743be6d1bcb01 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.HashMap; @@ -149,7 +150,7 @@ public void testVerifyOverwriteFails() throws IOException { final BytesArray bytesArray = new BytesArray(data); writeBlob(container, blobName, bytesArray); // should not be able to overwrite existing blob - expectThrows(IOException.class, () -> writeBlob(container, blobName, bytesArray)); + expectThrows(FileAlreadyExistsException.class, () -> writeBlob(container, blobName, bytesArray)); container.deleteBlob(blobName); writeBlob(container, blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again } From 158d5f706bb054346f59f926183b8628f4c2a2c4 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 14 May 2018 14:36:28 +0200 Subject: [PATCH 2/3] rely on StorageErrorCode string instead --- .../repositories/azure/AzureStorageServiceImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java index 7cdb2fe2ee460..a58d3131ee3df 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java @@ -25,6 +25,7 @@ import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicy; +import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; @@ -44,7 +45,6 @@ import org.elasticsearch.repositories.RepositoryException; import java.io.InputStream; -import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.FileAlreadyExistsException; @@ -324,9 +324,10 @@ public void writeBlob(String account, LocationMode mode, String container, Strin SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, generateOperationContext(account))); } catch (StorageException se) { - if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT) { + if (StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) { throw new FileAlreadyExistsException(blobName, null, se.getMessage()); } + throw se; } logger.trace("writeBlob({}, stream, {}) - done", blobName, blobSize); } From 903a07afd8901b765e2e06ed72eaf877bd13a20d Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 14 May 2018 15:10:59 +0200 Subject: [PATCH 3/3] extra checks --- .../repositories/azure/AzureStorageTestServer.java | 14 ++++++++++++-- .../azure/AzureStorageServiceImpl.java | 4 +++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java index 584428f9a45b0..65b18f9867bc0 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java +++ b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageTestServer.java @@ -188,13 +188,19 @@ private static PathTrie defaultHandlers(final String endpoint, f byte[] bytes = srcContainer.objects.get(srcBlobName); if (bytes != null) { - destContainer.objects.put(destBlobName, bytes); + byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, bytes); + if (existingBytes != null) { + return newBlobAlreadyExistsError(requestId); + } return new Response(RestStatus.ACCEPTED, singletonMap("x-ms-copy-status", "success"), "text/plain", EMPTY_BYTE); } else { return newBlobNotFoundError(requestId); } } else { - destContainer.objects.put(destBlobName, body); + byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, body); + if (existingBytes != null) { + return newBlobAlreadyExistsError(requestId); + } } return new Response(RestStatus.CREATED, emptyMap(), "text/plain", EMPTY_BYTE); @@ -395,6 +401,10 @@ private static Response newBlobNotFoundError(final long requestId) { return newError(requestId, RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist"); } + private static Response newBlobAlreadyExistsError(final long requestId) { + return newError(requestId, RestStatus.CONFLICT, "BlobAlreadyExists", "The specified blob already exists"); + } + private static Response newInternalError(final long requestId) { return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "The server encountered an internal error"); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java index a58d3131ee3df..681ace40a20ba 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageServiceImpl.java @@ -45,6 +45,7 @@ import org.elasticsearch.repositories.RepositoryException; import java.io.InputStream; +import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.FileAlreadyExistsException; @@ -324,7 +325,8 @@ public void writeBlob(String account, LocationMode mode, String container, Strin SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, generateOperationContext(account))); } catch (StorageException se) { - if (StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) { + if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT && + StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) { throw new FileAlreadyExistsException(blobName, null, se.getMessage()); } throw se;