From 2d89185b0f9eedd2404afea681c3b3df43301f58 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 18 Feb 2020 09:59:08 +0100 Subject: [PATCH] Fix Failure to Drain Stream in GCS Repo Tests (#52431) Same as #51933 but for the custom handler just used in this test. Closes #52430 --- ...eCloudStorageBlobContainerRetriesTests.java | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index aa8583c3d5ada..1d72e9c36c395 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; @@ -52,7 +53,6 @@ import org.junit.Before; import org.threeten.bp.Duration; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; @@ -323,6 +323,8 @@ public void testWriteLargeBlob() throws IOException { logger.debug("starting with resumable upload id [{}]", sessionUploadId.get()); httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> { + final BytesReference requestBody = Streams.readFully(exchange.getRequestBody()); + final Map params = new HashMap<>(); RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); assertThat(params.get("uploadType"), equalTo("resumable")); @@ -330,7 +332,7 @@ public void testWriteLargeBlob() throws IOException { if ("POST".equals(exchange.getRequestMethod())) { assertThat(params.get("name"), equalTo("write_large_blob")); if (countInits.decrementAndGet() <= 0) { - byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8); + byte[] response = requestBody.utf8ToString().getBytes(UTF_8); exchange.getResponseHeaders().add("Content-Type", "application/json"); exchange.getResponseHeaders().add("Location", httpServerUrl() + "/upload/storage/v1/b/bucket/o?uploadType=resumable&upload_id=" + sessionUploadId.get()); @@ -348,7 +350,6 @@ public void testWriteLargeBlob() throws IOException { if (uploadId.equals(sessionUploadId.get()) == false) { logger.debug("session id [{}] is gone", uploadId); assertThat(wrongChunk, greaterThan(0)); - Streams.readFully(exchange.getRequestBody()); exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1); return; } @@ -367,7 +368,6 @@ public void testWriteLargeBlob() throws IOException { countInits.set(nbErrors); countUploads.set(nbErrors * totalChunks); - Streams.readFully(exchange.getRequestBody()); exchange.sendResponseHeaders(HttpStatus.SC_GONE, -1); return; } @@ -377,14 +377,12 @@ public void testWriteLargeBlob() throws IOException { assertTrue(Strings.hasLength(range)); if (countUploads.decrementAndGet() % 2 == 0) { - final ByteArrayOutputStream requestBody = new ByteArrayOutputStream(); - final long bytesRead = Streams.copy(exchange.getRequestBody(), requestBody); - assertThat(Math.toIntExact(bytesRead), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize))); + assertThat(Math.toIntExact(requestBody.length()), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize))); final int rangeStart = getContentRangeStart(range); final int rangeEnd = getContentRangeEnd(range); - assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(bytesRead))); - assertArrayEquals(Arrays.copyOfRange(data, rangeStart, rangeEnd + 1), requestBody.toByteArray()); + assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(requestBody.length()))); + assertThat(new BytesArray(data, rangeStart, rangeEnd - rangeStart + 1), is(requestBody)); final Integer limit = getContentRangeLimit(range); if (limit != null) { @@ -399,8 +397,6 @@ public void testWriteLargeBlob() throws IOException { } } - // read all the request body, otherwise the SDK client throws a non-retryable StorageException - Streams.readFully(exchange.getRequestBody()); if (randomBoolean()) { exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1); }