Skip to content

Commit 7dc166d

Browse files
authored
HADOOP-18883. [ABFS]: Expect-100 JDK bug resolution: prevent multiple server calls (#6022)
Address JDK bug JDK-8314978 related to handling of HTTP 100 responses. https://bugs.openjdk.org/browse/JDK-8314978 In the AbfsHttpOperation, after sendRequest() we call processResponse() method from AbfsRestOperation. Even if the conn.getOutputStream() fails due to expect-100 error, we consume the exception and let the code go ahead. This may call getHeaderField() / getHeaderFields() / getHeaderFieldLong() after getOutputStream() has failed. These invocation all lead to server calls. This commit aims to prevent this. If connection.getOutputStream() fails due to an Expect-100 error, the ABFS client does not invoke getHeaderField(), getHeaderFields(), getHeaderFieldLong() or getInputStream(). getResponseCode() is safe as on the failure it sets the responseCode variable in HttpUrlConnection object. Contributed by Pranav Saxena
1 parent d274f77 commit 7dc166d

File tree

6 files changed

+109
-9
lines changed

6 files changed

+109
-9
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public final class AbfsHttpConstants {
6969
* and should qualify for retry.
7070
*/
7171
public static final int HTTP_CONTINUE = 100;
72+
public static final String EXPECT_100_JDK_ERROR = "Server rejected operation";
7273

7374
// Abfs generic constants
7475
public static final String SINGLE_WHITE_SPACE = " ";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import java.io.InputStream;
2323
import java.io.OutputStream;
2424
import java.net.HttpURLConnection;
25+
import java.net.ProtocolException;
2526
import java.net.URL;
2627
import java.util.List;
2728

2829
import javax.net.ssl.HttpsURLConnection;
2930
import javax.net.ssl.SSLSocketFactory;
3031

32+
import org.apache.hadoop.classification.VisibleForTesting;
3133
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
3234
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
3335

@@ -43,6 +45,7 @@
4345
import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsPerfLoggable;
4446
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
4547

48+
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
4649
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
4750
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT;
4851

@@ -83,6 +86,7 @@ public class AbfsHttpOperation implements AbfsPerfLoggable {
8386
private long sendRequestTimeMs;
8487
private long recvResponseTimeMs;
8588
private boolean shouldMask = false;
89+
private boolean connectionDisconnectedOnError = false;
8690

8791
public static AbfsHttpOperation getAbfsHttpOperationWithFixedResult(
8892
final URL url,
@@ -324,14 +328,26 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
324328
*/
325329
outputStream = getConnOutputStream();
326330
} catch (IOException e) {
327-
/* If getOutputStream fails with an exception and expect header
328-
is enabled, we return back without throwing an exception to
329-
the caller. The caller is responsible for setting the correct status code.
330-
If expect header is not enabled, we throw back the exception.
331+
connectionDisconnectedOnError = true;
332+
/* If getOutputStream fails with an expect-100 exception , we return back
333+
without throwing an exception to the caller. Else, we throw back the exception.
331334
*/
332335
String expectHeader = getConnProperty(EXPECT);
333-
if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)) {
336+
if (expectHeader != null && expectHeader.equals(HUNDRED_CONTINUE)
337+
&& e instanceof ProtocolException
338+
&& EXPECT_100_JDK_ERROR.equals(e.getMessage())) {
334339
LOG.debug("Getting output stream failed with expect header enabled, returning back ", e);
340+
/*
341+
* In case expect-100 assertion has failed, headers and inputStream should not
342+
* be parsed. Reason being, conn.getHeaderField(), conn.getHeaderFields(),
343+
* conn.getInputStream() will lead to repeated server call.
344+
* ref: https://bugs.openjdk.org/browse/JDK-8314978.
345+
* Reading conn.responseCode() and conn.getResponseMessage() is safe in
346+
* case of Expect-100 error. Reason being, in JDK, it stores the responseCode
347+
* in the HttpUrlConnection object before throwing exception to the caller.
348+
*/
349+
this.statusCode = getConnResponseCode();
350+
this.statusDescription = getConnResponseMessage();
335351
return;
336352
} else {
337353
LOG.debug("Getting output stream failed without expect header enabled, throwing exception ", e);
@@ -364,7 +380,17 @@ public void sendRequest(byte[] buffer, int offset, int length) throws IOExceptio
364380
* @throws IOException if an error occurs.
365381
*/
366382
public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException {
383+
if (connectionDisconnectedOnError) {
384+
LOG.debug("This connection was not successful or has been disconnected, "
385+
+ "hence not parsing headers and inputStream");
386+
return;
387+
}
388+
processConnHeadersAndInputStreams(buffer, offset, length);
389+
}
367390

391+
void processConnHeadersAndInputStreams(final byte[] buffer,
392+
final int offset,
393+
final int length) throws IOException {
368394
// get the response
369395
long startTime = 0;
370396
startTime = System.nanoTime();
@@ -608,6 +634,11 @@ String getConnResponseMessage() throws IOException {
608634
return connection.getResponseMessage();
609635
}
610636

637+
@VisibleForTesting
638+
Boolean getConnectionDisconnectedOnError() {
639+
return connectionDisconnectedOnError;
640+
}
641+
611642
public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation {
612643
/**
613644
* Creates an instance to represent fixed results.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
338338
*/
339339
AppendRequestParameters reqParams = new AppendRequestParameters(
340340
offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
341-
AbfsRestOperation op = client.append(path,
341+
AbfsRestOperation op = getClient().append(path,
342342
blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
343343
contextEncryptionAdapter, new TracingContext(tracingContext));
344344
cachedSasToken.update(op.getSasToken());
@@ -655,7 +655,7 @@ private synchronized void flushWrittenBytesToServiceInternal(final long offset,
655655
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
656656
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
657657
"flushWrittenBytesToServiceInternal", "flush")) {
658-
AbfsRestOperation op = client.flush(path, offset, retainUncommitedData,
658+
AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData,
659659
isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
660660
new TracingContext(tracingContext));
661661
cachedSasToken.update(op.getSasToken());
@@ -795,4 +795,9 @@ BackReference getFsBackRef() {
795795
ListeningExecutorService getExecutorService() {
796796
return executorService;
797797
}
798+
799+
@VisibleForTesting
800+
AbfsClient getClient() {
801+
return client;
802+
}
798803
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848

4949
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
5050
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
51+
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
5152
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
5253
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
5354
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -586,7 +587,7 @@ public void testExpectHundredContinue() throws Exception {
586587
.getConnResponseMessage();
587588

588589
// Make the getOutputStream throw IOException to see it returns from the sendRequest correctly.
589-
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
590+
Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
590591
.when(abfsHttpOperation)
591592
.getConnOutputStream();
592593

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,28 @@
1818

1919
package org.apache.hadoop.fs.azurebfs.services;
2020

21+
import java.io.FileNotFoundException;
2122
import java.io.IOException;
2223
import java.net.URI;
2324
import java.net.URISyntaxException;
25+
import java.net.URL;
2426

2527
import org.assertj.core.api.Assertions;
2628
import org.junit.Test;
29+
import org.mockito.Mockito;
2730

2831
import org.apache.hadoop.conf.Configuration;
2932
import org.apache.hadoop.fs.FSDataOutputStream;
33+
import org.apache.hadoop.fs.FileSystem;
3034
import org.apache.hadoop.fs.Path;
3135
import org.apache.hadoop.fs.PathIOException;
3236
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
3337
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
3438
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
3539
import org.apache.hadoop.test.LambdaTestUtils;
3640

41+
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED;
42+
3743
/**
3844
* Test create operation.
3945
*/
@@ -148,6 +154,61 @@ public void testAbfsOutputStreamClosingFsBeforeStream()
148154
}
149155
}
150156

157+
@Test
158+
public void testExpect100ContinueFailureInAppend() throws Exception {
159+
Configuration configuration = new Configuration(getRawConfiguration());
160+
configuration.set(FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED, "true");
161+
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
162+
configuration);
163+
Path path = new Path("/testFile");
164+
AbfsOutputStream os = Mockito.spy(
165+
(AbfsOutputStream) fs.create(path).getWrappedStream());
166+
AbfsClient spiedClient = Mockito.spy(os.getClient());
167+
AbfsHttpOperation[] httpOpForAppendTest = new AbfsHttpOperation[2];
168+
mockSetupForAppend(httpOpForAppendTest, spiedClient);
169+
Mockito.doReturn(spiedClient).when(os).getClient();
170+
fs.delete(path, true);
171+
os.write(1);
172+
LambdaTestUtils.intercept(FileNotFoundException.class, () -> {
173+
os.close();
174+
});
175+
Assertions.assertThat(httpOpForAppendTest[0].getConnectionDisconnectedOnError())
176+
.describedAs("First try from AbfsClient will have expect-100 "
177+
+ "header and should fail with expect-100 error.").isTrue();
178+
Mockito.verify(httpOpForAppendTest[0], Mockito.times(0))
179+
.processConnHeadersAndInputStreams(Mockito.any(byte[].class),
180+
Mockito.anyInt(), Mockito.anyInt());
181+
182+
Assertions.assertThat(httpOpForAppendTest[1].getConnectionDisconnectedOnError())
183+
.describedAs("The retried operation from AbfsClient should not "
184+
+ "fail with expect-100 error. The retried operation does not have"
185+
+ "expect-100 header.").isFalse();
186+
Mockito.verify(httpOpForAppendTest[1], Mockito.times(1))
187+
.processConnHeadersAndInputStreams(Mockito.any(byte[].class),
188+
Mockito.anyInt(), Mockito.anyInt());
189+
}
190+
191+
private void mockSetupForAppend(final AbfsHttpOperation[] httpOpForAppendTest,
192+
final AbfsClient spiedClient) {
193+
int[] index = new int[1];
194+
index[0] = 0;
195+
Mockito.doAnswer(abfsRestOpAppendGetInvocation -> {
196+
AbfsRestOperation op = Mockito.spy(
197+
(AbfsRestOperation) abfsRestOpAppendGetInvocation.callRealMethod());
198+
Mockito.doAnswer(createHttpOpInvocation -> {
199+
httpOpForAppendTest[index[0]] = Mockito.spy(
200+
(AbfsHttpOperation) createHttpOpInvocation.callRealMethod());
201+
return httpOpForAppendTest[index[0]++];
202+
}).when(op).createHttpOperation();
203+
return op;
204+
})
205+
.when(spiedClient)
206+
.getAbfsRestOperation(Mockito.any(AbfsRestOperationType.class),
207+
Mockito.anyString(), Mockito.any(
208+
URL.class), Mockito.anyList(), Mockito.any(byte[].class),
209+
Mockito.anyInt(), Mockito.anyInt(), Mockito.nullable(String.class));
210+
}
211+
151212
/**
152213
* Separate method to create an outputStream using a local FS instance so
153214
* that once this method has returned, the FS instance can be eligible for GC.

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import static java.net.HttpURLConnection.HTTP_OK;
5050
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
5151
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
52+
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
5253
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
5354
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
5455
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HUNDRED_CONTINUE;
@@ -232,7 +233,7 @@ private AbfsRestOperation getRestOperation() throws Exception {
232233
Mockito.doReturn(responseMessage)
233234
.when(abfsHttpOperation)
234235
.getConnResponseMessage();
235-
Mockito.doThrow(new ProtocolException("Server rejected Operation"))
236+
Mockito.doThrow(new ProtocolException(EXPECT_100_JDK_ERROR))
236237
.when(abfsHttpOperation)
237238
.getConnOutputStream();
238239
break;

0 commit comments

Comments
 (0)