From 29d08d5c9ba165bcac80b68bb90ef5b12f5150e3 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Fri, 1 Sep 2023 03:50:45 -0700 Subject: [PATCH 01/10] expect100 src change --- .../fs/azurebfs/services/AbfsHttpOperation.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 67ac0c31665d6..cab9ff7e34346 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; +import java.net.ProtocolException; import java.net.URL; import java.util.List; @@ -85,6 +86,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { private long sendRequestTimeMs; private long recvResponseTimeMs; private boolean shouldMask = false; + private boolean expect100failureReceived = false; public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult( final URL url, @@ -339,9 +341,17 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio the caller. The caller is responsible for setting the correct status code. If expect header is not enabled, we throw back the exception. */ + if (!"Server rejected operation".equals(e.getMessage())) { + throw e; + } String expectHeader = getConnProperty(EXPECT); - if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) { - LOG.debug("Getting output stream failed with expect header enabled, returning back ", e); + if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE) + && e instanceof ProtocolException + && !"Server rejected operation".equals(e.getMessage())) { + LOG.debug( + "Getting output stream failed with expect header enabled, returning back ", + e); + expect100failureReceived = true; return; } else { LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e); @@ -391,6 +401,9 @@ public void processResponse(final byte[] buffer, final int offset, final int len this.statusDescription = getConnResponseMessage(); + if(expect100failureReceived) { + return; + } this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID); if (this.requestId == null) { this.requestId = AbfsHttpConstants.EMPTY_STRING; From 4847c6a015e3d7df671358e27b0f911b23475225 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Sun, 3 Sep 2023 23:29:24 -0700 Subject: [PATCH 02/10] expect100 error msg refactor; test basics; --- .../azurebfs/services/AbfsHttpOperation.java | 2 +- .../azurebfs/services/AbfsOutputStream.java | 9 +++- .../services/ITestAbfsOutputStream.java | 46 +++++++++++++++++++ 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index cab9ff7e34346..be086908841e2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -347,7 +347,7 @@ public void sendRequest(byte[] buffer, int offset, 
int length) throws IOExceptio String expectHeader = getConnProperty(EXPECT); if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE) && e instanceof ProtocolException - && !"Server rejected operation".equals(e.getMessage())) { + && "Server rejected operation".equals(e.getMessage())) { LOG.debug( "Getting output stream failed with expect header enabled, returning back ", e); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 4268dc3f918a1..55b7f003f0564 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -336,7 +336,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload, AppendRequestParameters reqParams = new AppendRequestParameters( offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled); AbfsRestOperation op = - client.append(path, blockUploadData.toByteArray(), reqParams, + getClient().append(path, blockUploadData.toByteArray(), reqParams, cachedSasToken.get(), new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()); @@ -648,7 +648,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset, AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush")) { - AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, + AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get(), leaseId, new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); perfInfo.registerResult(op.getResult()).registerSuccess(true); @@ -787,4 +787,9 @@ BackReference getFsBackRef() { ListeningExecutorService getExecutorService() { return executorService; } + + @VisibleForTesting + AbfsClient getClient() { + return client; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index eee0c177c33b3..b5b21b3cfc51c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -18,15 +18,19 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; import org.assertj.core.api.Assertions; import org.junit.Test; +import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; @@ -34,6 +38,8 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED; + /** * Test create operation. 
*/ @@ -148,6 +154,46 @@ public void testAbfsOutputStreamClosingFsBeforeStream() } } + @Test + public void testExpect100ContinueFailureInAppend() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true"); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + Path path = new Path("/testFile"); + AbfsOutputStream os = Mockito.spy((AbfsOutputStream) fs.create(path).getWrappedStream()); + AbfsClient spiedClient = Mockito.spy(os.getClient()); + AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[1]; + readyMocksForAppendTest(httpOpForAppendTest, spiedClient); + Mockito.doReturn(spiedClient).when(os).getClient(); + fs.delete(path, true); + os.write(1); + LambdaTestUtils.intercept(FileNotFoundException.class, () -> { + os.close(); + }); + + } + + private void readyMocksForAppendTest(final AbfsHttpOperation[] httpOpForAppendTest, + final AbfsClient spiedClient) { + Mockito.doAnswer(abfsRestOpAppendGetInvocation -> { + AbfsRestOperation op = Mockito.spy( + (AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod()); + if (httpOpForAppendTest[0] == null) { + Mockito.doAnswer(createHttpOpInvocation -> { + httpOpForAppendTest[0] = Mockito.spy( + (AbfsHttpOperation) createHttpOpInvocation.callRealMethod()); + return httpOpForAppendTest[0]; + }).when(op).createHttpOperation(); + } + return op; + }) + .when(spiedClient) + .getAbfsRestOperationForAppend(Mockito.any(AbfsRestOperationType.class), + Mockito.anyString(), Mockito.any( + URL.class), Mockito.anyList(), Mockito.any(byte[].class), + Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class)); + } + /** * Separate method to create an outputStream using a local FS instance so * that once this method has returned, the FS instance can be eligible for GC. From 11e4c2d78b8e66327793727e42466f94a329f45b Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 4 Sep 2023 00:04:52 -0700 Subject: [PATCH 03/10] method readiness for test in abfsHttpOp; improve the expect100Test --- .../azurebfs/services/AbfsHttpOperation.java | 26 ++++++++++++-- .../services/ITestAbfsOutputStream.java | 34 +++++++++++++------ 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index be086908841e2..96e8fc3509e89 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -29,6 +29,7 @@ import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLSocketFactory; +import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; @@ -401,9 +402,23 @@ public void processResponse(final byte[] buffer, final int offset, final int len this.statusDescription = getConnResponseMessage(); - if(expect100failureReceived) { + /* + * In case expect-100 assertion has failed, headers and inputStream should not + * be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(), + * conn.getInputStream() will lead to repeated server call. 
+ * ref: https://bugs.openjdk.org/browse/JDK-8314978 + */ + if (expect100failureReceived) { + LOG.debug("Expect-100 assertion has failed and hence not parsing headers" + + "and inputStream."); return; } + processConnHeadersAndInputStreams(buffer, offset, length); + } + + void processConnHeadersAndInputStreams(final byte[] buffer, + final int offset, + final int length) throws IOException { this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID); if (this.requestId == null) { this.requestId = AbfsHttpConstants.EMPTY_STRING; @@ -417,6 +432,7 @@ public void processResponse(final byte[] buffer, final int offset, final int len return; } + long startTime = 0; if (this.isTraceEnabled) { startTime = System.nanoTime(); } @@ -444,7 +460,8 @@ public void processResponse(final byte[] buffer, final int offset, final int len } else { if (buffer != null) { while (totalBytesRead < length) { - int bytesRead = stream.read(buffer, offset + totalBytesRead, length - totalBytesRead); + int bytesRead = stream.read(buffer, offset + totalBytesRead, length + - totalBytesRead); if (bytesRead == -1) { endOfStream = true; break; @@ -647,6 +664,11 @@ String getConnResponseMessage() throws IOException { return connection.getResponseMessage(); } + @VisibleForTesting + Boolean getExpect100failureReceived() { + return expect100failureReceived; + } + public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation { /** * Creates an instance to represent fixed results. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index b5b21b3cfc51c..db526e01b7d65 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -158,11 +158,13 @@ public void testAbfsOutputStreamClosingFsBeforeStream() public void testExpect100ContinueFailureInAppend() throws Exception { Configuration configuration = new Configuration(getRawConfiguration()); configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true"); - AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration); + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + configuration); Path path = new Path("/testFile"); - AbfsOutputStream os = Mockito.spy((AbfsOutputStream) fs.create(path).getWrappedStream()); + AbfsOutputStream os = Mockito.spy( + (AbfsOutputStream) fs.create(path).getWrappedStream()); AbfsClient spiedClient = Mockito.spy(os.getClient()); - AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[1]; + AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2]; readyMocksForAppendTest(httpOpForAppendTest, spiedClient); Mockito.doReturn(spiedClient).when(os).getClient(); fs.delete(path, true); @@ -170,21 +172,31 @@ public void testExpect100ContinueFailureInAppend() throws Exception { LambdaTestUtils.intercept(FileNotFoundException.class, () -> { os.close(); }); - + Assertions.assertThat(httpOpForAppendTest[0].getExpect100failureReceived()) + .isTrue(); + Mockito.verify(httpOpForAppendTest[0], Mockito.times(0)) + .processConnHeadersAndInputStreams(Mockito.any(byte[].class), + Mockito.anyInt(), Mockito.anyInt()); + + Assertions.assertThat(httpOpForAppendTest[1].getExpect100failureReceived()) + .isFalse(); + 
Mockito.verify(httpOpForAppendTest[1], Mockito.times(1)) + .processConnHeadersAndInputStreams(Mockito.any(byte[].class), + Mockito.anyInt(), Mockito.anyInt()); } private void readyMocksForAppendTest(final AbfsHttpOperation[] httpOpForAppendTest, final AbfsClient spiedClient) { + int[] index = new int[1]; + index[0] = 0; Mockito.doAnswer(abfsRestOpAppendGetInvocation -> { AbfsRestOperation op = Mockito.spy( (AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod()); - if (httpOpForAppendTest[0] == null) { - Mockito.doAnswer(createHttpOpInvocation -> { - httpOpForAppendTest[0] = Mockito.spy( - (AbfsHttpOperation) createHttpOpInvocation.callRealMethod()); - return httpOpForAppendTest[0]; - }).when(op).createHttpOperation(); - } + Mockito.doAnswer(createHttpOpInvocation -> { + httpOpForAppendTest[index[0]] = Mockito.spy( + (AbfsHttpOperation) createHttpOpInvocation.callRealMethod()); + return httpOpForAppendTest[index[0]++]; + }).when(op).createHttpOperation(); return op; }) .when(spiedClient) From afb586067d9408e3b1e38ed87cf14e09932eb164 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 4 Sep 2023 05:16:13 -0700 Subject: [PATCH 04/10] EXPECT_100_JDK_ERROR error message --- .../apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java | 1 + .../apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java | 3 ++- .../apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index 7e4ddfa675a4c..d68164ec80c31 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -69,6 +69,7 @@ public final class AbfsHttpConstants { * and should qualify for retry. 
*/ public static final int HTTP_CONTINUE = 100; + public static final String EXPECT_100_JDK_ERROR = "Server rejected operation"; // Abfs generic constants public static final String SINGLE_WHITE_SPACE = " "; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 96e8fc3509e89..4f1efdb916d1d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; @@ -348,7 +349,7 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio String expectHeader = getConnProperty(EXPECT); if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE) && e instanceof ProtocolException - && "Server rejected operation".equals(e.getMessage())) { + && EXPECT_100_JDK_ERROR.equals(e.getMessage())) { LOG.debug( "Getting output stream failed with expect header enabled, returning back ", e); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 18d1e3917f24e..3d42c834ccb62 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -48,6 +48,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; @@ -580,7 +581,7 @@ public void testExpectHundredContinue() throws Exception { .getConnResponseMessage(); // Make the getOutputStream throw IOException to see it returns from the sendRequest correctly. 
- Mockito.doThrow(new ProtocolException("Server rejected Operation")) + Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR)) .when(abfsHttpOperation) .getConnOutputStream(); From 2835269815ddcbaf326ed1348fe089b7329a7e3d Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Mon, 4 Sep 2023 20:16:17 -0700 Subject: [PATCH 05/10] JDK protocol exception message in constant --- .../hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java index 6ffe2e2773bbf..1532e74ac10d1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java @@ -49,6 +49,7 @@ import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE; @@ -232,7 +233,7 @@ private AbfsRestOperation getRestOperation() throws Exception { Mockito.doReturn(responseMessage) .when(abfsHttpOperation) .getConnResponseMessage(); - Mockito.doThrow(new ProtocolException("Server rejected Operation")) + Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR)) .when(abfsHttpOperation) .getConnOutputStream(); break; From b15468f26f07acc70fb58e115d303ad9b95b0e99 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Tue, 5 Sep 2023 03:18:01 -0700 Subject: [PATCH 06/10] remove redundant check --- .../apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 4f1efdb916d1d..f35e0754d10ba 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -343,9 +343,6 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio the caller. The caller is responsible for setting the correct status code. If expect header is not enabled, we throw back the exception. */ - if (!"Server rejected operation".equals(e.getMessage())) { - throw e; - } String expectHeader = getConnProperty(EXPECT); if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE) && e instanceof ProtocolException From ef871b316cbbcc2fd721e1f3b32cfa29b85832a0 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Wed, 6 Sep 2023 02:43:44 -0700 Subject: [PATCH 07/10] review comment refactor. 
--- .../fs/azurebfs/services/AbfsHttpOperation.java | 4 +--- .../fs/azurebfs/services/ITestAbfsOutputStream.java | 11 +++++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index f35e0754d10ba..86c42c2755266 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -347,9 +347,7 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE) && e instanceof ProtocolException && EXPECT_100_JDK_ERROR.equals(e.getMessage())) { - LOG.debug( - "Getting output stream failed with expect header enabled, returning back ", - e); + LOG.debug("Getting output stream failed with expect header enabled, returning back ", e); expect100failureReceived = true; return; } else { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index db526e01b7d65..f1164035b0eb3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -165,7 +165,7 @@ public void testExpect100ContinueFailureInAppend() throws Exception { (AbfsOutputStream) fs.create(path).getWrappedStream()); AbfsClient spiedClient = Mockito.spy(os.getClient()); AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2]; - readyMocksForAppendTest(httpOpForAppendTest, spiedClient); + mockSetupForAppend(httpOpForAppendTest, spiedClient); Mockito.doReturn(spiedClient).when(os).getClient(); fs.delete(path, true); os.write(1); @@ -173,19 +173,22 @@ public void testExpect100ContinueFailureInAppend() throws Exception { os.close(); }); Assertions.assertThat(httpOpForAppendTest[0].getExpect100failureReceived()) - .isTrue(); + .describedAs("First try from AbfsClient will have expect-100 " + + "header and should fail with expect-100 error.").isTrue(); Mockito.verify(httpOpForAppendTest[0], Mockito.times(0)) .processConnHeadersAndInputStreams(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); Assertions.assertThat(httpOpForAppendTest[1].getExpect100failureReceived()) - .isFalse(); + .describedAs("The retried operation from AbfsClient should not " + + "fail with expect-100 error. 
The retried operation does not have" + + "expect-100 header.").isFalse(); Mockito.verify(httpOpForAppendTest[1], Mockito.times(1)) .processConnHeadersAndInputStreams(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); } - private void readyMocksForAppendTest(final AbfsHttpOperation[] httpOpForAppendTest, + private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest, final AbfsClient spiedClient) { int[] index = new int[1]; index[0] = 0; From 01d509ebf14fe26dacd5ce7d6911539b4257067a Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 7 Sep 2023 02:48:58 -0700 Subject: [PATCH 08/10] connectionDisconnectedOnError to be enabled for any getOutputStream IOException --- .../azurebfs/services/AbfsHttpOperation.java | 53 ++++++++++--------- .../services/ITestAbfsOutputStream.java | 4 +- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 86c42c2755266..9317384ebb8d1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -88,7 +88,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { private long sendRequestTimeMs; private long recvResponseTimeMs; private boolean shouldMask = false; - private boolean expect100failureReceived = false; + private boolean connectionDisconnectedOnError = false; public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult( final URL url, @@ -338,17 +338,26 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio */ outputStream = getConnOutputStream(); } catch (IOException e) { - /* If getOutputStream fails with an exception and expect header - is enabled, we return back without throwing an exception to - the caller. The caller is responsible for setting the correct status code. - If expect header is not enabled, we throw back the exception. + connectionDisconnectedOnError = true; + /* If getOutputStream fails with an expect-100 exception , we return back + without throwing an exception to the caller. Else, we throw back the exception. */ String expectHeader = getConnProperty(EXPECT); if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE) && e instanceof ProtocolException && EXPECT_100_JDK_ERROR.equals(e.getMessage())) { LOG.debug("Getting output stream failed with expect header enabled, returning back ", e); - expect100failureReceived = true; + /* + * In case expect-100 assertion has failed, headers and inputStream should not + * be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(), + * conn.getInputStream() will lead to repeated server call. + * ref: https://bugs.openjdk.org/browse/JDK-8314978. + * Reading conn.responseCode() and conn.getResponseMessage() is safe in + * case of Expect-100 error. Reason being, in JDK, it stores the responseCode + * in the HttpUrlConnection object before throwing exception to the caller. + */ + this.statusCode = getConnResponseCode(); + this.statusDescription = getConnResponseMessage(); return; } else { LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e); @@ -383,7 +392,17 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio * @throws IOException if an error occurs. 
*/ public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException { + if (connectionDisconnectedOnError) { + LOG.debug("This connection was not successful or has been disconnected, " + + "hence not parsing headers and inputStream"); + return; + } + processConnHeadersAndInputStreams(buffer, offset, length); + } + void processConnHeadersAndInputStreams(final byte[] buffer, + final int offset, + final int length) throws IOException { // get the response long startTime = 0; if (this.isTraceEnabled) { @@ -398,23 +417,6 @@ public void processResponse(final byte[] buffer, final int offset, final int len this.statusDescription = getConnResponseMessage(); - /* - * In case expect-100 assertion has failed, headers and inputStream should not - * be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(), - * conn.getInputStream() will lead to repeated server call. - * ref: https://bugs.openjdk.org/browse/JDK-8314978 - */ - if (expect100failureReceived) { - LOG.debug("Expect-100 assertion has failed and hence not parsing headers" - + "and inputStream."); - return; - } - processConnHeadersAndInputStreams(buffer, offset, length); - } - - void processConnHeadersAndInputStreams(final byte[] buffer, - final int offset, - final int length) throws IOException { this.requestId = this.connection.getHeaderField(HttpHeaderConfigurations.X_MS_REQUEST_ID); if (this.requestId == null) { this.requestId = AbfsHttpConstants.EMPTY_STRING; @@ -428,7 +430,6 @@ void processConnHeadersAndInputStreams(final byte[] buffer, return; } - long startTime = 0; if (this.isTraceEnabled) { startTime = System.nanoTime(); } @@ -661,8 +662,8 @@ String getConnResponseMessage() throws IOException { } @VisibleForTesting - Boolean getExpect100failureReceived() { - return expect100failureReceived; + Boolean getConnectionDisconnectedOnError() { + return connectionDisconnectedOnError; } public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index f1164035b0eb3..de804245da7c3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -172,14 +172,14 @@ public void testExpect100ContinueFailureInAppend() throws Exception { LambdaTestUtils.intercept(FileNotFoundException.class, () -> { os.close(); }); - Assertions.assertThat(httpOpForAppendTest[0].getExpect100failureReceived()) + Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError()) .describedAs("First try from AbfsClient will have expect-100 " + "header and should fail with expect-100 error.").isTrue(); Mockito.verify(httpOpForAppendTest[0], Mockito.times(0)) .processConnHeadersAndInputStreams(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()); - Assertions.assertThat(httpOpForAppendTest[1].getExpect100failureReceived()) + Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError()) .describedAs("The retried operation from AbfsClient should not " + "fail with expect-100 error. 
The retried operation does not have" + "expect-100 header.").isFalse(); From 75c722adfa86288f813be37c8fd2760d017da2c3 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 7 Sep 2023 02:50:45 -0700 Subject: [PATCH 09/10] refactor undo --- .../apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 9317384ebb8d1..9e07a0520cd1c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -457,8 +457,7 @@ void processConnHeadersAndInputStreams(final byte[] buffer, } else { if (buffer != null) { while (totalBytesRead < length) { - int bytesRead = stream.read(buffer, offset + totalBytesRead, length - - totalBytesRead); + int bytesRead = stream.read(buffer, offset + totalBytesRead, length - totalBytesRead); if (bytesRead == -1) { endOfStream = true; break; From 01cc8ebf12cc95e0cfc87e76fc6bbf746e0fa266 Mon Sep 17 00:00:00 2001 From: Pranav Saxena Date: Thu, 4 Jan 2024 03:44:43 -0800 Subject: [PATCH 10/10] compile error fix. --- .../apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java | 4 ++-- .../hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index a30a011e9a855..74657c718a1b6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -338,7 +338,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload, */ AppendRequestParameters reqParams = new AppendRequestParameters( offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled); - AbfsRestOperation op = client.append(path, + AbfsRestOperation op = getClient().append(path, blockUploadData.toByteArray(), reqParams, cachedSasToken.get(), contextEncryptionAdapter, new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); @@ -655,7 +655,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset, AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "flushWrittenBytesToServiceInternal", "flush")) { - AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, + AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter, new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index de804245da7c3..359846ce14dae 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -203,7 +203,7 @@ private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest, return op; }) .when(spiedClient) - .getAbfsRestOperationForAppend(Mockito.any(AbfsRestOperationType.class), + .getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class), Mockito.anyString(), Mockito.any( URL.class), Mockito.anyList(), Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class));
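
The sketch below condenses the expect-100 handling that this series converges on: patch 01 introduces the catch around getOutputStream(), patch 03 adds the skip in processResponse(), patch 04 hoists the JDK message into EXPECT_100_JDK_ERROR, and patch 08 generalizes the flag to connectionDisconnectedOnError and moves the status/reason reads into the catch block. It is a standalone illustration, not the real AbfsHttpOperation: the class name SimpleHttpSender, the send()/processResponse() signatures and the println calls are invented for the example, while the constant value, the flag name, the "100-continue" header value and the JDK-8314978 caveat come from the patches above.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;

public class SimpleHttpSender {

  /**
   * Message of the ProtocolException the JDK's HttpURLConnection throws when a
   * server answers an "Expect: 100-continue" request with a non-2xx status
   * before the body is sent (see EXPECT_100_JDK_ERROR in patch 04).
   */
  private static final String EXPECT_100_JDK_ERROR = "Server rejected operation";
  private static final String HUNDRED_CONTINUE = "100-continue";

  private boolean connectionDisconnectedOnError = false;

  public void send(HttpURLConnection conn, byte[] body) throws IOException {
    try (OutputStream out = conn.getOutputStream()) {
      out.write(body);
    } catch (IOException e) {
      connectionDisconnectedOnError = true;
      // Mirrors getConnProperty(EXPECT) in the patch.
      String expect = conn.getRequestProperty("Expect");
      boolean expect100Rejected = HUNDRED_CONTINUE.equals(expect)
          && e instanceof ProtocolException
          && EXPECT_100_JDK_ERROR.equals(e.getMessage());
      if (!expect100Rejected) {
        throw e; // anything other than an expect-100 rejection is surfaced
      }
      // Safe even after the failure: the JDK stores the status code and
      // reason phrase in the connection before throwing (see the comment
      // added in patch 08).
      int status = conn.getResponseCode();
      String reason = conn.getResponseMessage();
      System.out.println("Expect-100 rejected: " + status + " " + reason);
      // Deliberately do not touch conn.getHeaderField()/getHeaderFields()/
      // getInputStream() here: per https://bugs.openjdk.org/browse/JDK-8314978
      // that would silently re-send the request.
    }
  }

  public void processResponse(HttpURLConnection conn) throws IOException {
    if (connectionDisconnectedOnError) {
      // The connection already failed (for example an expect-100 rejection),
      // so header and input-stream parsing is skipped, as in patch 08.
      return;
    }
    // Normal path only: parse headers and the response body.
    System.out.println("request id: " + conn.getHeaderField("x-ms-request-id"));
    try (InputStream in = conn.getInputStream()) {
      // ... consume the response body ...
    }
  }
}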