@@ -69,6 +69,7 @@ public final class AbfsHttpConstants {
* and should qualify for retry.
*/
public static final int HTTP_CONTINUE = 100;
public static final String EXPECT_100_JDK_ERROR = "Server rejected operation";
Contributor:
Can this error string be returned by the server only for this exception?

Contributor:
Won't the JDK bug where redundant connections are attempted be an issue for any exception case in getOutputStream()? Shouldn't the case be handled irrespective of the explicit server-rejected-request case?

Contributor Author:
For any IOException thrown by getOutputStream, no headers / inputStream will be parsed. The flow of code is such that for any IOException other than the expect-100 error, the exception is thrown back to AbfsRestOperation, which retries as per the retry policy.
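
For reference, here is a minimal standalone sketch of where this message actually originates: it is raised by the JDK's HttpURLConnection.getOutputStream() itself (per JDK-8314978) when a request carrying Expect: 100-continue receives a final non-100 status, not by the server. The localhost URL is hypothetical:

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;

public class Expect100JdkErrorDemo {
  public static void main(String[] args) throws IOException {
    // Hypothetical endpoint that rejects the request before the body is sent.
    URL url = new URL("http://localhost:8080/reject");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");
    conn.setDoOutput(true);
    conn.setRequestProperty("Expect", "100-continue");
    try {
      // The JDK sends the headers and waits for "100 Continue" here; on a
      // final non-100 reply it aborts the upload with
      // ProtocolException("Server rejected operation").
      conn.getOutputStream();
    } catch (ProtocolException e) {
      // e.getMessage() is the string captured in EXPECT_100_JDK_ERROR.
      // The response code was stored before the throw, so reading it is safe.
      System.out.println(e.getMessage() + " -> HTTP " + conn.getResponseCode());
    }
  }
}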


// Abfs generic constants
public static final String SINGLE_WHITE_SPACE = " ";
@@ -22,12 +22,14 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ProtocolException;
import java.net.URL;
import java.util.List;

import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLSocketFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

@@ -43,6 +45,7 @@
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;

@@ -83,6 +86,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
private long sendRequestTimeMs;
private long recvResponseTimeMs;
private boolean shouldMask = false;
private boolean connectionDisconnectedOnError = false;

public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
final URL url,
@@ -324,14 +328,26 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOException
*/
outputStream = getConnOutputStream();
} catch (IOException e) {
/* If getOutputStream fails with an exception and expect header
is enabled, we return back without throwing an exception to
the caller. The caller is responsible for setting the correct status code.
If expect header is not enabled, we throw back the exception.
connectionDisconnectedOnError = true;
Contributor:
Setting this field here and using it in processResponse() means that we won't process the response for any IOException. But isn't the intent to skip processing only in case of the JDK error?
So shouldn't this go inside the if (EXPECT_100_JDK_ERROR.equals(e.getMessage()...)) check?

Contributor Author:
We want to prevent any further API call on the httpUrlConnection if it throws an IOException, for the reason shared in https://github.com/apache/hadoop/pull/6022/files#r1444240890.

/* If getOutputStream fails with an expect-100 exception, we return back
without throwing an exception to the caller. Else, we throw back the exception.
*/
String expectHeader = getConnProperty(EXPECT);
if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)
&& e instanceof ProtocolException
&& EXPECT_100_JDK_ERROR.equals(e.getMessage())) {
Contributor:
As mentioned in an earlier comment, the HttpURLConnection that we currently hold is disconnected while inside this catch block.

Do we want to prevent later API calls that trigger connections irrespective of throttled failures? If so, should the setting of connectionDisconnectedOnError be at the start of the catch block?

Contributor Author:
Taken. connectionDisconnectedOnError is now set at the start of the catch block.

Contributor:
I guess the question by @snvijaya is: do we want to prevent later API calls that trigger connections irrespective of any failures?
If yes, then why?

Contributor Author:
At httpUrlConnection.getOutputStream, the error could either be a general IOException (including ConnectionTimeout and ReadTimeout) or the expect-100 error (which raises a ProtocolException, a child of IOException). Server errors, if any, are caught in processResponse and get the same treatment as for all other APIs (analyse whether a retry is needed, and then AbfsRestOperation retries).

In the JDK's implementation of getOutputStream, the connection is killed on these IOExceptions. So if further APIs were let through, they would fire an entirely new server call, and APIs like getHeaderField() would return data from that new call, which is undesirable.

Also, the implementation of HttpURLConnection is such that the other APIs (like getHeaderField()) internally call getInputStream(), which would first call getOutputStream() (if the sendData flag is true and it doesn't hold a strOutputStream object). Now, two things can happen here:

  1. Expect-100 failure: no data is captured, and again any next API on the HttpURLConnection would fire a new call.
  2. Status 100: since it is no longer in the block where data can be put into the outputStream, the stream will be closed, which raises an IOException, and from there it goes back to the retry loop. Ref: https://github.com/openjdk/jdk8/blob/master/jdk/src/share/classes/sun/net/www/protocol/http/HttpURLConnection.java#L1463-L1471

Hence, any further API is prevented on the HttpURLConnection object that got an IOException in getOutputStream.
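
To make that flow concrete, here is a consolidated, simplified sketch of how the new flag ties sendRequest() and processResponse() together (class and member names abridged and hypothetical; the real implementation is the AbfsHttpOperation diff below):

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.ProtocolException;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;

// Simplified, hypothetical consolidation of the patch's logic.
class GuardedHttpOperation {
  private final HttpURLConnection connection;
  private boolean connectionDisconnectedOnError = false;
  private int statusCode;

  GuardedHttpOperation(HttpURLConnection connection) {
    this.connection = connection;
  }

  void sendRequest(byte[] buffer) throws IOException {
    try {
      connection.getOutputStream().write(buffer);
    } catch (IOException e) {
      // Poison the connection first: the JDK has already killed the
      // underlying socket, so any later header/stream API would silently
      // fire a brand-new request.
      connectionDisconnectedOnError = true;
      if (e instanceof ProtocolException
          && EXPECT_100_JDK_ERROR.equals(e.getMessage())) {
        // Safe: the JDK stores the response code before throwing.
        statusCode = connection.getResponseCode();
        return;
      }
      throw e; // everything else goes back to the AbfsRestOperation retry loop
    }
  }

  void processResponse() throws IOException {
    if (connectionDisconnectedOnError) {
      return; // never touch getHeaderField()/getInputStream() on a dead connection
    }
    statusCode = connection.getResponseCode();
    // ... parse headers and the input stream as usual
  }
}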

LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
/*
* In case expect-100 assertion has failed, headers and inputStream should not
* be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(),
* conn.getInputStream() will lead to repeated server calls.
* ref: https://bugs.openjdk.org/browse/JDK-8314978.
* Reading conn.getResponseCode() and conn.getResponseMessage() is safe in
* case of Expect-100 error. Reason being, in JDK, it stores the responseCode
* in the HttpUrlConnection object before throwing the exception to the caller.
*/
this.statusCode = getConnResponseCode();
this.statusDescription = getConnResponseMessage();
return;
} else {
LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
@@ -364,7 +380,17 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOException
* @throws IOException if an error occurs.
*/
public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {
if (connectionDisconnectedOnError) {
LOG.debug("This connection was not successful or has been disconnected, "
+ "hence not parsing headers and inputStream");
return;
}
processConnHeadersAndInputStreams(buffer, offset, length);
Contributor:
I think the method name can remain the same as before.

Contributor Author:
Did this to make it more testable. It abstracts out the logic that actually reads the response headers and inputStream. If more logic needs to be added in the future, it can go into processConnHeadersAndInputStreams.

Contributor:
Got it.

}

void processConnHeadersAndInputStreams(final byte[] buffer,
final int offset,
final int length) throws IOException {
// get the response
long startTime = 0;
startTime = System.nanoTime();
@@ -608,6 +634,11 @@ String getConnResponseMessage() throws IOException {
return connection.getResponseMessage();
}

@VisibleForTesting
Boolean getConnectionDisconnectedOnError() {
return connectionDisconnectedOnError;
}

public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
/**
* Creates an instance to represent fixed results.
@@ -338,7 +338,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
*/
AppendRequestParameters reqParams = new AppendRequestParameters(
offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
AbfsRestOperation op = client.append(path,
AbfsRestOperation op = getClient().append(path,
blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
contextEncryptionAdapter, new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
@@ -655,7 +655,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData,
AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData,
isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
new TracingContext(tracingContext));
cachedSasToken.update(op.getSasToken());
@@ -795,4 +795,9 @@ BackReference getFsBackRef() {
ListeningExecutorService getExecutorService() {
return executorService;
}

@VisibleForTesting
AbfsClient getClient() {
return client;
}
}
@@ -48,6 +48,7 @@

import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -586,7 +587,7 @@ public void testExpectHundredContinue() throws Exception {
.getConnResponseMessage();

// Make getOutputStream throw an IOException to verify that sendRequest returns correctly.
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
.when(abfsHttpOperation)
.getConnOutputStream();

@@ -18,22 +18,28 @@

package org.apache.hadoop.fs.azurebfs.services;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED;

/**
* Test create operation.
*/
@@ -148,6 +154,61 @@ public void testAbfsOutputStreamClosingFsBeforeStream()
}
}

@Test
public void testExpect100ContinueFailureInAppend() throws Exception {
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true");
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
configuration);
Path path = new Path("/testFile");
AbfsOutputStream os = Mockito.spy(
(AbfsOutputStream) fs.create(path).getWrappedStream());
AbfsClient spiedClient = Mockito.spy(os.getClient());
AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
mockSetupForAppend(httpOpForAppendTest, spiedClient);
Mockito.doReturn(spiedClient).when(os).getClient();
fs.delete(path, true);
os.write(1);
LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
os.close();
});
Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError())
.describedAs("First try from AbfsClient will have expect-100 "
+ "header and should fail with expect-100 error.").isTrue();
Mockito.verify(httpOpForAppendTest[0], Mockito.times(0))
.processConnHeadersAndInputStreams(Mockito.any(byte[].class),
Mockito.anyInt(), Mockito.anyInt());

Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError())
.describedAs("The retried operation from AbfsClient should not "
+ "fail with expect-100 error. The retried operation does not have"
+ "expect-100 header.").isFalse();
Mockito.verify(httpOpForAppendTest[1], Mockito.times(1))
.processConnHeadersAndInputStreams(Mockito.any(byte[].class),
Mockito.anyInt(), Mockito.anyInt());
}

private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest,
final AbfsClient spiedClient) {
int[] index = new int[1];
index[0] = 0;
Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
AbfsRestOperation op = Mockito.spy(
(AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod());
Mockito.doAnswer(createHttpOpInvocation -> {
httpOpForAppendTest[index[0]] = Mockito.spy(
(AbfsHttpOperation) createHttpOpInvocation.callRealMethod());
return httpOpForAppendTest[index[0]++];
}).when(op).createHttpOperation();
return op;
})
.when(spiedClient)
.getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class),
Mockito.anyString(), Mockito.any(
URL.class), Mockito.anyList(), Mockito.any(byte[].class),
Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class));
}

/**
* Separate method to create an outputStream using a local FS instance so
* that once this method has returned, the FS instance can be eligible for GC.
@@ -49,6 +49,7 @@
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -232,7 +233,7 @@ private AbfsRestOperation getRestOperation() throws Exception {
Mockito.doReturn(responseMessage)
.when(abfsHttpOperation)
.getConnResponseMessage();
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
.when(abfsHttpOperation)
.getConnOutputStream();
break;