Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 178 additions & 30 deletions hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
package org.apache.hadoop.hbase.rest.client;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
Expand All @@ -44,9 +46,14 @@
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.ssl.SSLFactory.Mode;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpDelete;
Expand All @@ -55,9 +62,12 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.cookie.BasicClientCookie;
import org.apache.http.message.BasicHeader;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
Expand Down Expand Up @@ -86,8 +96,11 @@ public class Client {
private boolean sslEnabled;
private HttpResponse resp;
private HttpGet httpGet = null;

private HttpClientContext stickyContext = null;
private BasicCredentialsProvider provider;
private Optional<KeyStore> trustStore;
private Map<String, String> extraHeaders;
private KerberosAuthenticator authenticator;

private static final String AUTH_COOKIE = "hadoop.auth";
private static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";
Expand All @@ -100,11 +113,13 @@ public Client() {
this(null);
}

private void initialize(Cluster cluster, Configuration conf, boolean sslEnabled,
Optional<KeyStore> trustStore) {
private void initialize(Cluster cluster, Configuration conf, boolean sslEnabled, boolean sticky,
Optional<KeyStore> trustStore, Optional<String> userName, Optional<String> password,
Optional<String> bearerToken) {
this.cluster = cluster;
this.conf = conf;
this.sslEnabled = sslEnabled;
this.trustStore = trustStore;
extraHeaders = new ConcurrentHashMap<>();
String clspath = System.getProperty("java.class.path");
LOG.debug("classpath " + clspath);
Expand Down Expand Up @@ -136,38 +151,77 @@ private void initialize(Cluster cluster, Configuration conf, boolean sslEnabled,
}
}

if (userName.isPresent() && password.isPresent()) {
// We want to stick to the old very limited authentication and session handling when sticky is
// not set
// to preserve backwards compatibility
if (!sticky) {
throw new IllegalArgumentException("BASIC auth is only implemented when sticky is set");
}
provider = new BasicCredentialsProvider();
// AuthScope.ANY is required for pre-emptive auth. We only ever use a single auth method
// anyway.
AuthScope anyAuthScope = AuthScope.ANY;
this.provider.setCredentials(anyAuthScope,
new UsernamePasswordCredentials(userName.get(), password.get()));
}

if (bearerToken.isPresent()) {
// We want to stick to the old very limited authentication and session handling when sticky is
// not set
// to preserve backwards compatibility
if (!sticky) {
throw new IllegalArgumentException("BEARER auth is only implemented when sticky is set");
}
// We could also put the header into the context or connection, but that would have the same
// effect.
extraHeaders.put(HttpHeaders.AUTHORIZATION, "Bearer " + bearerToken.get());
}

this.httpClient = httpClientBuilder.build();
setSticky(sticky);
}

/**
 * Constructor.
 * <p>
 * Creates a client using the old faulty load balancing logic (requests may be sent to a random
 * host). When specifying multiple servers in the cluster object, it is highly recommended to
 * call setSticky() on the created client, or to use one of the preferred constructors instead.
 * @param cluster the cluster definition
 */
public Client(Cluster cluster) {
this(cluster, false);
}

/**
 * Constructor.
 * <p>
 * Creates a client using the old faulty load balancing logic. When specifying multiple servers
 * in the cluster object, it is highly recommended to call setSticky() on the created client, or
 * to use one of the preferred constructors instead.
 * @param cluster the cluster definition
 * @param sslEnabled enable SSL or not
 */
public Client(Cluster cluster, boolean sslEnabled) {
  // Non-sticky, no custom trust store, no BASIC/BEARER credentials: preserves the legacy
  // behaviour (per-request SPNEGO negotiation when the server demands it).
  initialize(cluster, HBaseConfiguration.create(), sslEnabled, false, Optional.empty(),
    Optional.empty(), Optional.empty(), Optional.empty());
}

/**
 * Constructor.
 * <p>
 * Creates a client using the old faulty load balancing logic. When specifying multiple servers
 * in the cluster object, it is highly recommended to call setSticky() on the created client, or
 * to use one of the preferred constructors instead.
 * @param cluster the cluster definition
 * @param conf HBase/Hadoop Configuration
 * @param sslEnabled enable SSL or not
 */
public Client(Cluster cluster, Configuration conf, boolean sslEnabled) {
  // Non-sticky, no custom trust store, no BASIC/BEARER credentials.
  initialize(cluster, conf, sslEnabled, false, Optional.empty(), Optional.empty(),
    Optional.empty(), Optional.empty());
}

/**
* Constructor, allowing to define custom trust store (only for SSL connections)
* Constructor, allowing to define custom trust store (only for SSL connections) This constructor
* will create an object using the old faulty load balancing logic. When specifying multiple
* servers in the cluster object, it is highly recommended to call setSticky() on the created
* client, or use one of the preferred constructors instead.
* @param cluster the cluster definition
* @param trustStorePath custom trust store to use for SSL connections
* @param trustStorePassword password to use for custom trust store
Expand All @@ -176,22 +230,56 @@ public Client(Cluster cluster, Configuration conf, boolean sslEnabled) {
*/
public Client(Cluster cluster, String trustStorePath, Optional<String> trustStorePassword,
  Optional<String> trustStoreType) {
  // A custom trust store only makes sense over HTTPS, so sslEnabled is hard-wired to true.
  this(cluster, HBaseConfiguration.create(), true, trustStorePath, trustStorePassword,
    trustStoreType);
}

/**
 * Constructor that accepts an optional trustStore and authentication information for either BASIC
 * or BEARER authentication in sticky mode, which does not use the old faulty load balancing
 * logic, and enables correct session handling. If neither userName/password, nor the bearer token
 * is specified, the client falls back to SPNEGO auth. The loadTruststore static method can be
 * used to load a local trustStore file. This is the preferred constructor to use.
 * @param cluster the cluster definition
 * @param conf HBase/Hadoop configuration
 * @param sslEnabled use HTTPS
 * @param trustStore the optional trustStore object
 * @param userName for BASIC auth
 * @param password for BASIC auth
 * @param bearerToken for BEARER auth
 */
public Client(Cluster cluster, Configuration conf, boolean sslEnabled,
Optional<KeyStore> trustStore, Optional<String> userName, Optional<String> password,
Optional<String> bearerToken) {
initialize(cluster, conf, sslEnabled, true, trustStore, userName, password, bearerToken);
}

/**
 * Constructor, allowing to define a custom trust store (only for SSL connections).
 * <p>
 * Clients created by this constructor are NOT sticky: they use the old faulty load balancing
 * logic. When specifying multiple servers in the cluster object, it is highly recommended to
 * call setSticky() on the created client, or to use the preferred constructor instead.
 * @param cluster the cluster definition
 * @param conf HBase/Hadoop Configuration
 * @param sslEnabled enable SSL or not
 * @param trustStorePath custom trust store to use for SSL connections
 * @param trustStorePassword password to use for custom trust store
 * @param trustStoreType type of custom trust store
 * @throws ClientTrustStoreInitializationException if the trust store file can not be loaded
 */
public Client(Cluster cluster, Configuration conf, boolean sslEnabled, String trustStorePath,
  Optional<String> trustStorePassword, Optional<String> trustStoreType) {
  KeyStore trustStore = loadTruststore(trustStorePath, trustStorePassword, trustStoreType);
  // sticky is false here for backwards compatibility with the older trust store constructors.
  initialize(cluster, conf, sslEnabled, false, Optional.of(trustStore), Optional.empty(),
    Optional.empty(), Optional.empty());
}

/**
* Loads a trustStore from the local fileSystem. Can be used to load the trustStore for the
* preferred constructor.
*/
public static KeyStore loadTruststore(String trustStorePath, Optional<String> trustStorePassword,
Optional<String> trustStoreType) {

char[] password = trustStorePassword.map(String::toCharArray).orElse(null);
char[] truststorePassword = trustStorePassword.map(String::toCharArray).orElse(null);
String type = trustStoreType.orElse(KeyStore.getDefaultType());

KeyStore trustStore;
Expand All @@ -202,13 +290,12 @@ public Client(Cluster cluster, Configuration conf, String trustStorePath,
}
try (InputStream inputStream =
new BufferedInputStream(Files.newInputStream(new File(trustStorePath).toPath()))) {
trustStore.load(inputStream, password);
trustStore.load(inputStream, truststorePassword);
} catch (CertificateException | NoSuchAlgorithmException | IOException e) {
throw new ClientTrustStoreInitializationException("Trust store load error: " + trustStorePath,
e);
}

initialize(cluster, conf, true, Optional.of(trustStore));
return trustStore;
}

/**
Expand Down Expand Up @@ -337,12 +424,24 @@ public HttpResponse executeURI(HttpUriRequest method, Header[] headers, String u
}
long startTime = EnvironmentEdgeManager.currentTime();
if (resp != null) EntityUtils.consumeQuietly(resp.getEntity());
resp = httpClient.execute(method);
if (stickyContext != null) {
resp = httpClient.execute(method, stickyContext);
} else {
resp = httpClient.execute(method);
}
if (resp.getStatusLine().getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
// Authentication error
LOG.debug("Performing negotiation with the server.");
negotiate(method, uri);
resp = httpClient.execute(method);
try {
negotiate(method, uri);
} catch (GeneralSecurityException e) {
throw new IOException(e);
}
if (stickyContext != null) {
resp = httpClient.execute(method, stickyContext);
} else {
resp = httpClient.execute(method);
}
}

long endTime = EnvironmentEdgeManager.currentTime();
Expand Down Expand Up @@ -377,19 +476,58 @@ public HttpResponse execute(Cluster cluster, HttpUriRequest method, Header[] hea
* @param uri the String to parse as a URL.
* @throws IOException if unknown protocol is found.
*/
private void negotiate(HttpUriRequest method, String uri) throws IOException {
private void negotiate(HttpUriRequest method, String uri)
throws IOException, GeneralSecurityException {
try {
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
KerberosAuthenticator authenticator = new KerberosAuthenticator();
authenticator.authenticate(new URL(uri), token);
// Inject the obtained negotiated token in the method cookie
injectToken(method, token);
if (authenticator == null) {
authenticator = new KerberosAuthenticator();
if (trustStore.isPresent()) {
// The authenticator does not use Apache HttpClient, so we need to
// configure it separately to use the specified trustStore
Configuration sslConf = setupTrustStoreForHadoop(trustStore.get());
SSLFactory sslFactory = new SSLFactory(Mode.CLIENT, sslConf);
sslFactory.init();
authenticator.setConnectionConfigurator(sslFactory);
}
}
URL url = new URL(uri);
authenticator.authenticate(url, token);
if (sticky) {
BasicClientCookie authCookie = new BasicClientCookie("hadoop.auth", token.toString());
// Hadoop eats the domain even if set by server
authCookie.setDomain(url.getHost());
stickyContext.getCookieStore().addCookie(authCookie);
} else {
// session cookie is NOT set for backwards compatibility for non-sticky mode
// Inject the obtained negotiated token in the method cookie
// This is only done for this single request, the next one will trigger a new SPENGO
// handshake
injectToken(method, token);
}
} catch (AuthenticationException e) {
LOG.error("Failed to negotiate with the server.", e);
throw new IOException(e);
}
}

/**
 * Persists the in-memory trustStore to a temporary JKS file so that Hadoop's SSLFactory (which
 * only reads key material from files referenced by a Configuration) can use it.
 * @param trustStore the trust store to write out
 * @return a Configuration pointing the Hadoop SSL client settings at the temporary file
 */
private Configuration setupTrustStoreForHadoop(KeyStore trustStore)
  throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
  Path tmpDirPath = Files.createTempDirectory("hbase_rest_client_truststore");
  File trustStoreFile = tmpDirPath.resolve("truststore.jks").toFile();
  // Best-effort cleanup on JVM exit. deleteOnExit runs in reverse registration order, so the
  // directory is registered first and gets removed after the file it contains.
  tmpDirPath.toFile().deleteOnExit();
  trustStoreFile.deleteOnExit();
  // Shouldn't be needed with the secure temp dir, but generate a password anyway.
  // Math.random() is not suitable for secrets; use a cryptographically strong source.
  byte[] passwordBytes = new byte[16];
  new java.security.SecureRandom().nextBytes(passwordBytes);
  String password = java.util.Base64.getEncoder().encodeToString(passwordBytes);
  try (FileOutputStream fos = new FileOutputStream(trustStoreFile)) {
    trustStore.store(fos, password.toCharArray());
  }
  Configuration sslConf = new Configuration();
  // Type is the Java default, we use the same JVM to read this back.
  // NOTE(review): these keys configure the client *keystore*; confirm that SSLFactory really
  // reads trust material from ssl.client.keystore.* rather than ssl.client.truststore.*.
  sslConf.set("ssl.client.keystore.location", trustStoreFile.getAbsolutePath());
  sslConf.set("ssl.client.keystore.password", password);
  return sslConf;
}

/**
* Helper method that injects an authentication token to send with the method.
* @param method method to inject the authentication token into.
Expand Down Expand Up @@ -431,11 +569,21 @@ public boolean isSticky() {
* The default behaviour is load balancing by sending each request to a random host. This DOES NOT
* work with scans, which have state on the REST servers. Set sticky to true before attempting
* Scan related operations if more than one host is defined in the cluster. Nodes must not be
* added or removed from the Cluster object while sticky is true.
* added or removed from the Cluster object while sticky is true. Setting the sticky flag also
* enables session handling, which eliminates the need to re-authenticate each request, and lets
* the client handle any other cookies (like the sticky cookie set by load balancers) correctly.
* @param sticky whether subsequent requests will use the same host
*/
public void setSticky(boolean sticky) {
  // Drop any pinned node; the next request picks one afresh.
  lastNodeId = null;
  if (!sticky) {
    // Non-sticky mode carries no state between requests.
    stickyContext = null;
  } else {
    // Sticky mode keeps a single context so cookies (sessions, load balancer affinity)
    // survive across requests; attach the BASIC credentials provider when one was configured.
    HttpClientContext context = new HttpClientContext();
    if (provider != null) {
      context.setCredentialsProvider(provider);
    }
    stickyContext = context;
  }
  this.sticky = sticky;
}

Expand Down Expand Up @@ -654,7 +802,7 @@ public Response put(Cluster cluster, String path, Header[] headers, byte[] conte
throws IOException {
HttpPut method = new HttpPut(path);
try {
method.setEntity(new InputStreamEntity(new ByteArrayInputStream(content), content.length));
method.setEntity(new ByteArrayEntity(content));
HttpResponse resp = execute(cluster, method, headers, path);
headers = resp.getAllHeaders();
content = getResponseBody(resp);
Expand Down Expand Up @@ -748,7 +896,7 @@ public Response post(Cluster cluster, String path, Header[] headers, byte[] cont
throws IOException {
HttpPost method = new HttpPost(path);
try {
method.setEntity(new InputStreamEntity(new ByteArrayInputStream(content), content.length));
method.setEntity(new ByteArrayEntity(content));
HttpResponse resp = execute(cluster, method, headers, path);
headers = resp.getAllHeaders();
content = getResponseBody(resp);
Expand Down