diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java index a5dd1375e0..f61814d4f9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java @@ -23,12 +23,12 @@ import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpResponseException; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.services.storage.Storage; import com.google.api.services.storage.Storage.Objects; import com.google.api.services.storage.Storage.Objects.Get; import com.google.api.services.storage.model.StorageObject; -import com.google.cloud.BaseServiceException; import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.annotations.VisibleForTesting; @@ -84,7 +84,17 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann this.storage = storage; this.result = result; this.options = options; - this.resultRetryAlgorithm = resultRetryAlgorithm; + this.resultRetryAlgorithm = + new BasicResultRetryAlgorithm() { + @Override + public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) { + boolean shouldRetry = resultRetryAlgorithm.shouldRetry(previousThrowable, null); + if (previousThrowable != null && !shouldRetry) { + result.setException(previousThrowable); + } + return shouldRetry; + } + }; this.open = true; this.returnEOF = false; this.position = apiaryReadRequest.getByteRangeSpec().beginOffset(); @@ -210,17 +220,11 @@ private ScatteringByteChannel open() { throw new StorageException(404, "Failure while trying to resume download", e); } } - StorageException translate = StorageException.translate(e); - result.setException(translate); - throw translate; + throw StorageException.translate(e); } catch (IOException e) { - StorageException translate = StorageException.translate(e); - result.setException(translate); - throw translate; + throw StorageException.translate(e); } catch (Throwable t) { - BaseServiceException coalesce = StorageException.coalesce(t); - result.setException(coalesce); - throw coalesce; + throw StorageException.coalesce(t); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 112d19c531..2efc340c26 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -19,6 +19,7 @@ import com.google.api.client.http.HttpStatusCodes; import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.BasicResultRetryAlgorithm; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ApiExceptions; import com.google.api.gax.rpc.ServerStreamingCallable; @@ -87,7 +88,18 @@ final class GapicUnbufferedReadableByteChannel this.blobOffset = req.getReadOffset(); this.rclm = rclm; this.retryingDeps = retryingDependencies; - this.alg = alg; + this.alg = + new BasicResultRetryAlgorithm() { + @Override + public boolean shouldRetry( + Throwable previousThrowable, java.lang.Object previousResponse) { + boolean shouldRetry = alg.shouldRetry(previousThrowable, null); + if (previousThrowable != null && !shouldRetry) { + result.setException(previousThrowable); + } + return shouldRetry; + } + }; // The reasoning for 2 elements below allow for a single response and the EOF/error signal // from onComplete or onError. Same thing com.google.api.gax.rpc.QueuingResponseObserver does. this.queue = new SimpleBlockingQueue<>(2); @@ -337,9 +349,6 @@ protected void onErrorImpl(Throwable t) { } if (!open.isDone()) { open.setException(t); - if (!alg.shouldRetry(t, null)) { - result.setException(StorageException.coalesce(t)); - } } try { queue.offer(t); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java index 7db8c2cf18..6caecc5c76 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java @@ -1283,20 +1283,18 @@ private static void get(ArrayList a) { (ctx, c) -> ctx.peek( state -> { - try { - ReadChannel reader = - ctx.getStorage().reader(ctx.getState().getBlob().getBlobId()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ReadChannel reader = + ctx.getStorage().reader(ctx.getState().getBlob().getBlobId())) { WritableByteChannel write = Channels.newChannel(baos); ByteStreams.copy(reader, write); - - assertThat(xxd(baos.toByteArray())) - .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); } catch (IOException e) { if (e.getCause() instanceof BaseServiceException) { throw e.getCause(); } } + assertThat(xxd(baos.toByteArray())) + .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); })) .build()); a.add( @@ -1305,23 +1303,46 @@ private static void get(ArrayList a) { (ctx, c) -> ctx.peek( state -> { - try { - ReadChannel reader = - ctx.getStorage() - .reader( - ctx.getState().getBlob().getBlobId().getBucket(), - ctx.getState().getBlob().getBlobId().getName()); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ReadChannel reader = + ctx.getStorage() + .reader( + ctx.getState().getBlob().getBlobId().getBucket(), + ctx.getState().getBlob().getBlobId().getName())) { WritableByteChannel write = Channels.newChannel(baos); ByteStreams.copy(reader, write); + } catch (IOException e) { + if (e.getCause() instanceof BaseServiceException) { + throw e.getCause(); + } + } - assertThat(xxd(baos.toByteArray())) - .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); + assertThat(xxd(baos.toByteArray())) + .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); + })) + .build()); + a.add( + RpcMethodMapping.newBuilder(250, objects.get) + .withTest( + (ctx, c) -> + ctx.peek( + state -> { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (ReadChannel reader = + ctx.getStorage() + .reader( + ctx.getState().getBlob().getBlobId(), + BlobSourceOption.shouldReturnRawInputStream(false))) { + WritableByteChannel write = Channels.newChannel(baos); + ByteStreams.copy(reader, write); } catch (IOException e) { if (e.getCause() instanceof BaseServiceException) { throw e.getCause(); } } + + assertThat(xxd(baos.toByteArray())) + .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); })) .build()); a.add(