Skip to content

Commit da64e2e

Browse files
authored
Merge pull request apache#14 from passaro/HADOOP-18073-v2/select
Upgrade select to SDK v2
2 parents d429906 + 4d2413a commit da64e2e

17 files changed

+745
-255
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientConfig.java

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import software.amazon.awssdk.core.retry.RetryPolicy;
3232
import software.amazon.awssdk.http.apache.ApacheHttpClient;
3333
import software.amazon.awssdk.http.apache.ProxyConfiguration;
34+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
3435
import software.amazon.awssdk.thirdparty.org.apache.http.client.utils.URIBuilder;
3536

3637
import org.apache.hadoop.conf.Configuration;
@@ -111,6 +112,33 @@ public static ApacheHttpClient.Builder createHttpClientBuilder(Configuration con
111112
return httpClientBuilder;
112113
}
113114

115+
/**
116+
* Configures the async http client.
117+
*
118+
* @param conf The Hadoop configuration
119+
* @return Http client builder
120+
*/
121+
public static NettyNioAsyncHttpClient.Builder createAsyncHttpClientBuilder(Configuration conf) {
122+
NettyNioAsyncHttpClient.Builder httpClientBuilder =
123+
NettyNioAsyncHttpClient.builder();
124+
125+
httpClientBuilder.maxConcurrency(S3AUtils.intOption(conf, MAXIMUM_CONNECTIONS,
126+
DEFAULT_MAXIMUM_CONNECTIONS, 1));
127+
128+
int connectionEstablishTimeout =
129+
S3AUtils.intOption(conf, ESTABLISH_TIMEOUT, DEFAULT_ESTABLISH_TIMEOUT, 0);
130+
int socketTimeout = S3AUtils.intOption(conf, SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT, 0);
131+
132+
httpClientBuilder.connectionTimeout(Duration.ofSeconds(connectionEstablishTimeout));
133+
httpClientBuilder.readTimeout(Duration.ofSeconds(socketTimeout));
134+
httpClientBuilder.writeTimeout(Duration.ofSeconds(socketTimeout));
135+
136+
// TODO: Need to set ssl socket factory, as done in
137+
// NetworkBinding.bindSSLChannelMode(conf, awsConf);
138+
139+
return httpClientBuilder;
140+
}
141+
114142
/**
115143
* Configures the retry policy.
116144
*
@@ -132,10 +160,10 @@ public static RetryPolicy.Builder createRetryPolicyBuilder(Configuration conf) {
132160
*
133161
* @param conf The Hadoop configuration
134162
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
135-
* @return Proxy configuration builder
163+
* @return Proxy configuration
136164
* @throws IOException on any IO problem
137165
*/
138-
public static ProxyConfiguration.Builder createProxyConfigurationBuilder(Configuration conf,
166+
public static ProxyConfiguration createProxyConfiguration(Configuration conf,
139167
String bucket) throws IOException {
140168

141169
ProxyConfiguration.Builder proxyConfigBuilder = ProxyConfiguration.builder();
@@ -181,7 +209,72 @@ public static ProxyConfiguration.Builder createProxyConfigurationBuilder(Configu
181209
throw new IllegalArgumentException(msg);
182210
}
183211

184-
return proxyConfigBuilder;
212+
return proxyConfigBuilder.build();
213+
}
214+
215+
/**
216+
* Configures the proxy for the async http client.
217+
*
218+
* @param conf The Hadoop configuration
219+
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
220+
* @return Proxy configuration
221+
* @throws IOException on any IO problem
222+
*/
223+
public static software.amazon.awssdk.http.nio.netty.ProxyConfiguration
224+
createAsyncProxyConfiguration(Configuration conf,
225+
String bucket) throws IOException {
226+
227+
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.Builder proxyConfigBuilder =
228+
software.amazon.awssdk.http.nio.netty.ProxyConfiguration.builder();
229+
230+
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
231+
int proxyPort = conf.getInt(PROXY_PORT, -1);
232+
233+
if (!proxyHost.isEmpty()) {
234+
if (proxyPort >= 0) {
235+
proxyConfigBuilder.host(proxyHost);
236+
proxyConfigBuilder.port(proxyPort);
237+
} else {
238+
if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
239+
LOG.warn("Proxy host set without port. Using HTTPS default 443");
240+
proxyConfigBuilder.host(proxyHost);
241+
proxyConfigBuilder.port(443);
242+
} else {
243+
LOG.warn("Proxy host set without port. Using HTTP default 80");
244+
proxyConfigBuilder.host(proxyHost);
245+
proxyConfigBuilder.port(80);
246+
}
247+
}
248+
final String proxyUsername = S3AUtils.lookupPassword(bucket, conf, PROXY_USERNAME,
249+
null, null);
250+
final String proxyPassword = S3AUtils.lookupPassword(bucket, conf, PROXY_PASSWORD,
251+
null, null);
252+
if ((proxyUsername == null) != (proxyPassword == null)) {
253+
String msg = "Proxy error: " + PROXY_USERNAME + " or " +
254+
PROXY_PASSWORD + " set without the other.";
255+
LOG.error(msg);
256+
throw new IllegalArgumentException(msg);
257+
}
258+
proxyConfigBuilder.username(proxyUsername);
259+
proxyConfigBuilder.password(proxyPassword);
260+
// TODO: check NTLM support
261+
// proxyConfigBuilder.ntlmDomain(conf.getTrimmed(PROXY_DOMAIN));
262+
// proxyConfigBuilder.ntlmWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
263+
if (LOG.isDebugEnabled()) {
264+
LOG.debug("Using proxy server {}:{} as user {} with password {} on "
265+
+ "domain {} as workstation {}", proxyHost, proxyPort, proxyUsername, proxyPassword,
266+
PROXY_DOMAIN, PROXY_WORKSTATION);
267+
}
268+
} else if (proxyPort >= 0) {
269+
String msg =
270+
"Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
271+
LOG.error(msg);
272+
throw new IllegalArgumentException(msg);
273+
} else {
274+
return null;
275+
}
276+
277+
return proxyConfigBuilder.build();
185278
}
186279

187280
/***

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java

Lines changed: 75 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,11 @@
5555
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
5656
import software.amazon.awssdk.core.retry.RetryPolicy;
5757
import software.amazon.awssdk.http.apache.ApacheHttpClient;
58-
import software.amazon.awssdk.http.apache.ProxyConfiguration;
58+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
5959
import software.amazon.awssdk.regions.Region;
60+
import software.amazon.awssdk.services.s3.S3AsyncClient;
61+
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
6062
import software.amazon.awssdk.services.s3.S3Client;
61-
import software.amazon.awssdk.services.s3.S3ClientBuilder;
6263
import software.amazon.awssdk.services.s3.S3Configuration;
6364
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
6465
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
@@ -184,34 +185,85 @@ public AmazonS3 createS3Client(
184185
}
185186
}
186187

187-
/**
188-
* Creates a new {@link S3Client}.
189-
*
190-
* @param uri S3A file system URI
191-
* @param parameters parameter object
192-
* @return S3 client
193-
* @throws IOException on any IO problem
194-
*/
195188
@Override
196189
public S3Client createS3ClientV2(
197190
final URI uri,
198191
final S3ClientCreationParameters parameters) throws IOException {
199192

200193
Configuration conf = getConf();
201194
bucket = uri.getHost();
195+
ApacheHttpClient.Builder httpClientBuilder = AWSClientConfig
196+
.createHttpClientBuilder(conf)
197+
.proxyConfiguration(AWSClientConfig.createProxyConfiguration(conf, bucket));
198+
return configureClientBuilder(S3Client.builder(), parameters, conf, bucket)
199+
.httpClientBuilder(httpClientBuilder)
200+
.build();
201+
}
202202

203-
final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder =
204-
AWSClientConfig.createClientConfigBuilder(conf);
203+
@Override
204+
public S3AsyncClient createS3AsyncClient(
205+
final URI uri,
206+
final S3ClientCreationParameters parameters) throws IOException {
205207

206-
final ApacheHttpClient.Builder httpClientBuilder =
207-
AWSClientConfig.createHttpClientBuilder(conf);
208+
Configuration conf = getConf();
209+
bucket = uri.getHost();
210+
NettyNioAsyncHttpClient.Builder httpClientBuilder = AWSClientConfig
211+
.createAsyncHttpClientBuilder(conf)
212+
.proxyConfiguration(AWSClientConfig.createAsyncProxyConfiguration(conf, bucket));
213+
return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
214+
.httpClientBuilder(httpClientBuilder)
215+
.build();
216+
}
208217

209-
final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
218+
/**
219+
* Configure a sync or async S3 client builder.
220+
* This method handles all shared configuration.
221+
* @param builder S3 client builder
222+
* @param parameters parameter object
223+
* @param conf configuration object
224+
* @param bucket bucket name
225+
* @return the builder object
226+
* @param <BuilderT> S3 client builder type
227+
* @param <ClientT> S3 client type
228+
*/
229+
private static <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT>
230+
BuilderT configureClientBuilder(
231+
BuilderT builder,
232+
S3ClientCreationParameters parameters,
233+
Configuration conf,
234+
String bucket) {
210235

211-
final ProxyConfiguration.Builder proxyConfigBuilder =
212-
AWSClientConfig.createProxyConfigurationBuilder(conf, bucket);
236+
URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
237+
Region region = getS3Region(conf.getTrimmed(AWS_REGION), bucket,
238+
parameters.getCredentialSet());
239+
LOG.debug("Using endpoint {}; and region {}", endpoint, region);
213240

214-
S3ClientBuilder s3ClientBuilder = S3Client.builder();
241+
// TODO: Some configuration done in configureBasicParams is not done yet.
242+
S3Configuration serviceConfiguration = S3Configuration.builder()
243+
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
244+
.build();
245+
246+
return builder
247+
.overrideConfiguration(createClientOverrideConfiguration(parameters, conf))
248+
.credentialsProvider(
249+
// use adapter classes so V1 credential providers continue to work. This will
250+
// be moved to AWSCredentialProviderList.add() when that class is updated.
251+
V1V2AwsCredentialProviderAdapter.adapt(parameters.getCredentialSet()))
252+
.endpointOverride(endpoint)
253+
.region(region)
254+
.serviceConfiguration(serviceConfiguration);
255+
}
256+
257+
/**
258+
* Create an override configuration for an S3 client.
259+
* @param parameters parameter object
260+
* @param conf configuration object
261+
* @return the override configuration
262+
*/
263+
private static ClientOverrideConfiguration createClientOverrideConfiguration(
264+
S3ClientCreationParameters parameters, Configuration conf) {
265+
final ClientOverrideConfiguration.Builder clientOverrideConfigBuilder =
266+
AWSClientConfig.createClientConfigBuilder(conf);
215267

216268
// add any headers
217269
parameters.getHeaders().forEach((h, v) -> clientOverrideConfigBuilder.putHeader(h, v));
@@ -232,40 +284,12 @@ public S3Client createS3ClientV2(
232284
}
233285
}
234286

287+
final RetryPolicy.Builder retryPolicyBuilder = AWSClientConfig.createRetryPolicyBuilder(conf);
235288
clientOverrideConfigBuilder.retryPolicy(retryPolicyBuilder.build());
236-
httpClientBuilder.proxyConfiguration(proxyConfigBuilder.build());
237-
238-
s3ClientBuilder.httpClientBuilder(httpClientBuilder)
239-
.overrideConfiguration(clientOverrideConfigBuilder.build());
240-
241-
// use adapter classes so V1 credential providers continue to work. This will be moved to
242-
// AWSCredentialProviderList.add() when that class is updated.
243-
s3ClientBuilder.credentialsProvider(
244-
V1V2AwsCredentialProviderAdapter.adapt(parameters.getCredentialSet()));
245-
246-
URI endpoint = getS3Endpoint(parameters.getEndpoint(), conf);
247-
248-
Region region =
249-
getS3Region(conf.getTrimmed(AWS_REGION), parameters.getCredentialSet());
250-
251-
LOG.debug("Using endpoint {}; and region {}", endpoint, region);
252-
253-
s3ClientBuilder.endpointOverride(endpoint).region(region);
254-
255-
S3Configuration s3Configuration = S3Configuration.builder()
256-
.pathStyleAccessEnabled(parameters.isPathStyleAccess())
257-
.build();
258-
259-
s3ClientBuilder.serviceConfiguration(s3Configuration);
260289

261-
// TODO: Some configuration done in configureBasicParams is not done yet.
262-
// Need to verify how metrics collection can be done, as SDK V2 only
263-
// seems to have a metrics publisher.
264-
265-
return s3ClientBuilder.build();
290+
return clientOverrideConfigBuilder.build();
266291
}
267292

268-
269293
/**
270294
* Create an {@link AmazonS3} client of type
271295
* {@link AmazonS3EncryptionV2} if CSE is enabled.
@@ -527,10 +551,12 @@ private static URI getS3Endpoint(String endpoint, final Configuration conf) {
527551
*
528552
* @param region AWS S3 Region set in the config. This property may not be set, in which case
529553
* ask S3 for the region.
554+
* @param bucket Bucket name.
530555
* @param credentialsProvider Credentials provider to be used with the default s3 client.
531556
* @return region of the bucket.
532557
*/
533-
private Region getS3Region(String region, AWSCredentialsProvider credentialsProvider) {
558+
private static Region getS3Region(String region, String bucket,
559+
AWSCredentialsProvider credentialsProvider) {
534560

535561
if (!StringUtils.isBlank(region)) {
536562
return Region.of(region);

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@
5454
import com.amazonaws.SdkBaseException;
5555
import com.amazonaws.services.s3.AmazonS3;
5656
import com.amazonaws.services.s3.Headers;
57-
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
58-
import com.amazonaws.services.s3.model.SelectObjectContentResult;
5957
import com.amazonaws.services.s3.transfer.TransferManager;
6058
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
6159

@@ -64,6 +62,8 @@
6462
import org.slf4j.LoggerFactory;
6563

6664
import software.amazon.awssdk.core.ResponseInputStream;
65+
import software.amazon.awssdk.core.async.SdkPublisher;
66+
import software.amazon.awssdk.services.s3.S3AsyncClient;
6767
import software.amazon.awssdk.services.s3.S3Client;
6868
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
6969
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
@@ -88,6 +88,10 @@
8888
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
8989
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
9090
import software.amazon.awssdk.services.s3.model.S3Error;
91+
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream;
92+
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
93+
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponse;
94+
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
9195
import software.amazon.awssdk.services.s3.model.StorageClass;
9296
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
9397
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
@@ -289,6 +293,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
289293
private String username;
290294
private AmazonS3 s3;
291295
private S3Client s3V2;
296+
private S3AsyncClient s3AsyncClient;
292297
// initial callback policy is fail-once; it's there just to assist
293298
// some mock tests and other codepaths trying to call the low level
294299
// APIs on an uninitialized filesystem.
@@ -973,6 +978,9 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
973978
.createS3ClientV2(getUri(),
974979
parameters);
975980

981+
s3AsyncClient = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
982+
.createS3AsyncClient(getUri(),
983+
parameters);
976984
}
977985

978986
/**
@@ -1689,8 +1697,10 @@ private final class WriteOperationHelperCallbacksImpl
16891697
implements WriteOperationHelper.WriteOperationHelperCallbacks {
16901698

16911699
@Override
1692-
public SelectObjectContentResult selectObjectContent(SelectObjectContentRequest request) {
1693-
return s3.selectObjectContent(request);
1700+
public CompletableFuture<Void> selectObjectContent(
1701+
SelectObjectContentRequest request,
1702+
SelectObjectContentResponseHandler responseHandler) {
1703+
return s3AsyncClient.selectObjectContent(request, responseHandler);
16941704
}
16951705

16961706
@Override
@@ -1700,7 +1710,6 @@ public CompleteMultipartUploadResponse completeMultipartUpload(
17001710
}
17011711
}
17021712

1703-
17041713
/**
17051714
* Create the read context for reading from the referenced file,
17061715
* using FS state as well as the status.

0 commit comments

Comments
 (0)