From e7dbe12d829235ba8bea0c873cef7ab66b5b59c7 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 7 May 2018 22:19:31 +0200 Subject: [PATCH 1/3] User proper write-once semantics for GCS repository --- .../gcs/GoogleCloudStorageBlobContainer.java | 3 --- .../gcs/GoogleCloudStorageBlobStore.java | 14 ++++++++++++-- .../repositories/gcs/MockStorage.java | 15 +++++++++++++++ .../ESBlobStoreContainerTestCase.java | 3 ++- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 331e2dadca2da..833539905103a 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -66,9 +66,6 @@ public InputStream readBlob(String blobName) throws IOException { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { - if (blobExists(blobName)) { - throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite"); - } blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 225411f86dc49..aae5812369b90 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Collection; @@ -56,6 +57,7 @@ import java.util.stream.StreamSupport; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; class GoogleCloudStorageBlobStore extends AbstractComponent implements BlobStore { @@ -191,7 +193,7 @@ InputStream readBlob(String blobName) throws IOException { } catch (GoogleJsonResponseException e) { GoogleJsonError error = e.getDetails(); if ((e.getStatusCode() == HTTP_NOT_FOUND) || ((error != null) && (error.getCode() == HTTP_NOT_FOUND))) { - throw new NoSuchFileException(e.getMessage()); + throw new NoSuchFileException(blobName, null, e.getMessage()); } throw e; } @@ -209,8 +211,16 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I stream.setLength(blobSize); Storage.Objects.Insert insert = client.objects().insert(bucket, null, stream); + insert.setIfGenerationMatch(0L); // ensures that the file does not already exist insert.setName(blobName); - insert.execute(); + try { + insert.execute(); + } catch (GoogleJsonResponseException e) { + GoogleJsonError error = e.getDetails(); + if ((e.getStatusCode() == HTTP_PRECON_FAILED) || ((error != null) && (error.getCode() == HTTP_PRECON_FAILED))) { + throw new FileAlreadyExistsException(blobName, null, e.getMessage()); + } + } }); } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java index 325cea132beb6..5515b6ede37ac 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java @@ -138,6 +138,16 @@ public StorageObject execute() throws IOException { throw newBucketNotFoundException(getBucket()); } + if (getIfGenerationMatch() != null) { + if (getIfGenerationMatch() == 0L) { + if (blobs.containsKey(getName())) { + throw newPreconditionFailedException(getName()); + } + } else { + throw new AssertionError("not implemented"); + } + } + ByteArrayOutputStream out = new ByteArrayOutputStream(); Streams.copy(insertStream.getInputStream(), out); blobs.put(getName(), out.toByteArray()); @@ -236,6 +246,11 @@ private static GoogleJsonResponseException newObjectNotFoundException(final Stri return new GoogleJsonResponseException(builder, new GoogleJsonError()); } + private static GoogleJsonResponseException newPreconditionFailedException(final String object) { + HttpResponseException.Builder builder = new HttpResponseException.Builder(412, "Precondition Failed: " + object, new HttpHeaders()); + return new GoogleJsonResponseException(builder, new GoogleJsonError()); + } + /** * {@link MockedHttpTransport} extends the existing testing transport to analyze the content * of {@link com.google.api.client.googleapis.batch.BatchRequest} and delete the appropriates diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java index 8aff12edc8a53..743be6d1bcb01 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.HashMap; @@ -149,7 +150,7 @@ public void testVerifyOverwriteFails() throws IOException { final BytesArray bytesArray = new BytesArray(data); writeBlob(container, blobName, bytesArray); // should not be able to overwrite existing blob - expectThrows(IOException.class, () -> writeBlob(container, blobName, bytesArray)); + expectThrows(FileAlreadyExistsException.class, () -> writeBlob(container, blobName, bytesArray)); container.deleteBlob(blobName); writeBlob(container, blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again } From de9163a1200a1605d26f616b1f8c25e497b09ccb Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 15 May 2018 15:15:54 +0200 Subject: [PATCH 2/3] rethrow exception --- .../repositories/gcs/GoogleCloudStorageBlobStore.java | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index aae5812369b90..4c240d11ec049 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -220,6 +220,7 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I if ((e.getStatusCode() == HTTP_PRECON_FAILED) || ((error != null) && (error.getCode() == HTTP_PRECON_FAILED))) { throw new FileAlreadyExistsException(blobName, null, e.getMessage()); } + throw e; } }); } From 92c10a5f57ddd49b5029f8f0c83fe092dcc077e3 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Wed, 16 May 2018 13:49:39 +0200 Subject: [PATCH 3/3] wrap try catch around whole call --- .../gcs/GoogleCloudStorageBlobStore.java | 39 +++++++++++-------- .../repositories/gcs/MockStorage.java | 11 +++++- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 31247dc0e8fc5..83aafdde2b1ab 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -208,25 +208,32 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I * @param inputStream the stream containing the blob data */ private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException { - final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException( - () -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist())); - Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { - @Override - public boolean isOpen() { - return writeChannel.isOpen(); - } + try { + final WriteChannel writeChannel = SocketAccess.doPrivilegedIOException( + () -> storage.writer(blobInfo, Storage.BlobWriteOption.doesNotExist())); + Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { + @Override + public boolean isOpen() { + return writeChannel.isOpen(); + } - @Override - public void close() throws IOException { - SocketAccess.doPrivilegedVoidIOException(writeChannel::close); - } + @Override + public void close() throws IOException { + SocketAccess.doPrivilegedVoidIOException(writeChannel::close); + } - @SuppressForbidden(reason = "Channel is based of a socket not a file") - @Override - public int write(ByteBuffer src) throws IOException { - return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src)); + @SuppressForbidden(reason = "Channel is based of a socket not a file") + @Override + public int write(ByteBuffer src) throws IOException { + return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src)); + } + })); + } catch (StorageException se) { + if (se.getCode() == HTTP_PRECON_FAILED) { + throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); } - })); + throw se; + } } /** diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java index 54c1b26296e38..1b31b3018e48a 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/MockStorage.java @@ -251,9 +251,16 @@ public boolean isOpen() { } @Override - public void close() throws IOException { + public void close() { IOUtils.closeWhileHandlingException(writableByteChannel); - blobs.put(blobInfo.getName(), output.toByteArray()); + if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) { + byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray()); + if (existingBytes != null) { + throw new StorageException(412, "Blob already exists"); + } + } else { + blobs.put(blobInfo.getName(), output.toByteArray()); + } } }; }