From d8e942ddcf251079de7d130354d4328669bea6d5 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Wed, 11 Dec 2024 23:50:41 +0000 Subject: [PATCH 1/2] fix: update retry lifecycle when attempting to decompress a gzip object If the initial response failed with a retryable error and the error should be retried, it wasn't. It would only retry for reading bytes after the initial response had been received. --- .../ApiaryUnbufferedReadableByteChannel.java | 26 +++++---- .../GapicUnbufferedReadableByteChannel.java | 17 ++++-- .../conformance/retry/RpcMethodMappings.java | 53 +++++++++++++------ 3 files changed, 65 insertions(+), 31 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java index a5dd1375e0..7c1f475553 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java @@ -23,12 +23,12 @@ import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpResponseException; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.services.storage.Storage; import com.google.api.services.storage.Storage.Objects; import com.google.api.services.storage.Storage.Objects.Get; import com.google.api.services.storage.model.StorageObject; -import com.google.cloud.BaseServiceException; import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.annotations.VisibleForTesting; @@ -84,7 +84,17 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann this.storage = storage; this.result = result; this.options = options; - this.resultRetryAlgorithm = resultRetryAlgorithm; + this.resultRetryAlgorithm = + new BasicResultRetryAlgorithm() { + @Override + public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) { + boolean shouldRetry = resultRetryAlgorithm.shouldRetry(previousThrowable, null); + if (!shouldRetry) { + result.setException(previousThrowable); + } + return shouldRetry; + } + }; this.open = true; this.returnEOF = false; this.position = apiaryReadRequest.getByteRangeSpec().beginOffset(); @@ -210,17 +220,11 @@ private ScatteringByteChannel open() { throw new StorageException(404, "Failure while trying to resume download", e); } } - StorageException translate = StorageException.translate(e); - result.setException(translate); - throw translate; + throw StorageException.translate(e); } catch (IOException e) { - StorageException translate = StorageException.translate(e); - result.setException(translate); - throw translate; + throw StorageException.translate(e); } catch (Throwable t) { - BaseServiceException coalesce = StorageException.coalesce(t); - result.setException(coalesce); - throw coalesce; + throw StorageException.coalesce(t); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 112d19c531..37f662254b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -19,6 +19,7 @@ import com.google.api.client.http.HttpStatusCodes; import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ServerStreamingCallable; @@ -87,7 +88,18 @@ final class GapicUnbufferedReadableByteChannel this.blobOffset = req.getReadOffset(); this.rclm = rclm; this.retryingDeps = retryingDependencies; - this.alg = alg; + this.alg = + new BasicResultRetryAlgorithm() { + @Override + public boolean shouldRetry( + Throwable previousThrowable, java.lang.Object previousResponse) { + boolean shouldRetry = alg.shouldRetry(previousThrowable, null); + if (!shouldRetry) { + result.setException(previousThrowable); + } + return shouldRetry; + } + }; // The reasoning for 2 elements below allow for a single response and the EOF/error signal // from onComplete or onError. Same thing com.google.api.gax.rpc.QueuingResponseObserver does. this.queue = new SimpleBlockingQueue<>(2); @@ -337,9 +349,6 @@ protected void onErrorImpl(Throwable t) { } if (!open.isDone()) { open.setException(t); - if (!alg.shouldRetry(t, null)) { - result.setException(StorageException.coalesce(t)); - } } try { queue.offer(t); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java index 7db8c2cf18..6caecc5c76 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java @@ -1283,20 +1283,18 @@ private static void get(ArrayList a) { (ctx, c) -> ctx.peek( state -> { - try { - ReadChannel reader = - ctx.getStorage().reader(ctx.getState().getBlob().getBlobId()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ReadChannel reader = + ctx.getStorage().reader(ctx.getState().getBlob().getBlobId())) { WritableByteChannel write = Channels.newChannel(baos); ByteStreams.copy(reader, write); - - assertThat(xxd(baos.toByteArray())) - .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); } catch (IOException e) { if (e.getCause() instanceof BaseServiceException) { throw e.getCause(); } } + assertThat(xxd(baos.toByteArray())) + .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); })) .build()); a.add( @@ -1305,23 +1303,46 @@ private static void get(ArrayList a) { (ctx, c) -> ctx.peek( state -> { - try { - ReadChannel reader = - ctx.getStorage() - .reader( - ctx.getState().getBlob().getBlobId().getBucket(), - ctx.getState().getBlob().getBlobId().getName()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ReadChannel reader = + ctx.getStorage() + .reader( + ctx.getState().getBlob().getBlobId().getBucket(), + ctx.getState().getBlob().getBlobId().getName())) { WritableByteChannel write = Channels.newChannel(baos); ByteStreams.copy(reader, write); + } catch (IOException e) { + if (e.getCause() instanceof BaseServiceException) { + throw e.getCause(); + } + } - assertThat(xxd(baos.toByteArray())) - .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); + assertThat(xxd(baos.toByteArray())) + .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); + })) + .build()); + a.add( + RpcMethodMapping.newBuilder(250, objects.get) + .withTest( + (ctx, c) -> + ctx.peek( + state -> { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ReadChannel reader = + ctx.getStorage() + .reader( + ctx.getState().getBlob().getBlobId(), + BlobSourceOption.shouldReturnRawInputStream(false))) { + WritableByteChannel write = Channels.newChannel(baos); + ByteStreams.copy(reader, write); } catch (IOException e) { if (e.getCause() instanceof BaseServiceException) { throw e.getCause(); } } + + assertThat(xxd(baos.toByteArray())) + .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); })) .build()); a.add( From d953a174eadbe2ba715650c207a6c9d7e1f0b9d8 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 12 Dec 2024 17:40:03 +0000 Subject: [PATCH 2/2] null is hard --- .../cloud/storage/ApiaryUnbufferedReadableByteChannel.java | 2 +- .../cloud/storage/GapicUnbufferedReadableByteChannel.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java index 7c1f475553..f61814d4f9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java @@ -89,7 +89,7 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann @Override public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) { boolean shouldRetry = resultRetryAlgorithm.shouldRetry(previousThrowable, null); - if (!shouldRetry) { + if (previousThrowable != null && !shouldRetry) { result.setException(previousThrowable); } return shouldRetry; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 37f662254b..2efc340c26 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -94,7 +94,7 @@ final class GapicUnbufferedReadableByteChannel public boolean shouldRetry( Throwable previousThrowable, java.lang.Object previousResponse) { boolean shouldRetry = alg.shouldRetry(previousThrowable, null); - if (!shouldRetry) { + if (previousThrowable != null && !shouldRetry) { result.setException(previousThrowable); } return shouldRetry;