-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Verify that main info response returns correct product headers #73910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
283230e
657c7d9
64c5684
8f9cf5d
ceb527a
115e9d1
94d88f7
b87302d
6aee0ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -9,6 +9,9 @@ | |||||
| package org.elasticsearch.client; | ||||||
|
|
||||||
| import org.apache.http.HttpEntity; | ||||||
| import org.apache.logging.log4j.LogManager; | ||||||
| import org.apache.logging.log4j.Logger; | ||||||
| import org.elasticsearch.Build; | ||||||
| import org.elasticsearch.ElasticsearchException; | ||||||
| import org.elasticsearch.ElasticsearchStatusException; | ||||||
| import org.elasticsearch.action.ActionListener; | ||||||
|
|
@@ -64,9 +67,12 @@ | |||||
| import org.elasticsearch.client.core.TermVectorsRequest; | ||||||
| import org.elasticsearch.client.core.TermVectorsResponse; | ||||||
| import org.elasticsearch.client.tasks.TaskSubmissionResponse; | ||||||
| import org.elasticsearch.common.util.concurrent.FutureUtils; | ||||||
| import org.elasticsearch.core.CheckedConsumer; | ||||||
| import org.elasticsearch.core.CheckedFunction; | ||||||
| import org.elasticsearch.common.xcontent.ParseField; | ||||||
| import org.elasticsearch.common.Strings; | ||||||
| import org.elasticsearch.common.util.concurrent.ListenableFuture; | ||||||
| import org.elasticsearch.common.xcontent.ContextParser; | ||||||
| import org.elasticsearch.common.xcontent.DeprecationHandler; | ||||||
| import org.elasticsearch.common.xcontent.NamedXContentRegistry; | ||||||
|
|
@@ -205,6 +211,8 @@ | |||||
| import java.util.Optional; | ||||||
| import java.util.ServiceLoader; | ||||||
| import java.util.Set; | ||||||
| import java.util.concurrent.CompletableFuture; | ||||||
| import java.util.concurrent.ExecutionException; | ||||||
| import java.util.function.Function; | ||||||
| import java.util.stream.Collectors; | ||||||
| import java.util.stream.Stream; | ||||||
|
|
@@ -244,10 +252,16 @@ | |||||
| */ | ||||||
| public class RestHighLevelClient implements Closeable { | ||||||
|
|
||||||
| private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class); | ||||||
|
|
||||||
| // To be called using performClientRequest and performClientRequestAsync to ensure version compatibility check | ||||||
| private final RestClient client; | ||||||
| private final NamedXContentRegistry registry; | ||||||
| private final CheckedConsumer<RestClient, IOException> doClose; | ||||||
|
|
||||||
| /** Do not access directly but through getVersionValidationFuture() */ | ||||||
| private volatile ListenableFuture<Optional<String>> versionValidationFuture; | ||||||
|
|
||||||
| private final IndicesClient indicesClient = new IndicesClient(this); | ||||||
| private final ClusterClient clusterClient = new ClusterClient(this); | ||||||
| private final IngestClient ingestClient = new IngestClient(this); | ||||||
|
|
@@ -1715,7 +1729,7 @@ private <Req, Resp> Resp internalPerformRequest(Req request, | |||||
| req.setOptions(options); | ||||||
| Response response; | ||||||
| try { | ||||||
| response = client.performRequest(req); | ||||||
| response = performClientRequest(req); | ||||||
| } catch (ResponseException e) { | ||||||
| if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) { | ||||||
| try { | ||||||
|
|
@@ -1755,7 +1769,7 @@ protected final <Req extends Validatable, Resp> Optional<Resp> performRequestAnd | |||||
| req.setOptions(options); | ||||||
| Response response; | ||||||
| try { | ||||||
| response = client.performRequest(req); | ||||||
| response = performClientRequest(req); | ||||||
| } catch (ResponseException e) { | ||||||
| if (RestStatus.NOT_FOUND.getStatus() == e.getResponse().getStatusLine().getStatusCode()) { | ||||||
| return Optional.empty(); | ||||||
|
|
@@ -1854,7 +1868,7 @@ private <Req, Resp> Cancellable internalPerformRequestAsync(Req request, | |||||
| req.setOptions(options); | ||||||
|
|
||||||
| ResponseListener responseListener = wrapResponseListener(responseConverter, listener, ignores); | ||||||
| return client.performRequestAsync(req, responseListener); | ||||||
| return performClientRequestAsync(req, responseListener); | ||||||
| } | ||||||
|
|
||||||
|
|
||||||
|
|
@@ -1920,7 +1934,7 @@ protected final <Req extends Validatable, Resp> Cancellable performRequestAsyncA | |||||
| req.setOptions(options); | ||||||
| ResponseListener responseListener = wrapResponseListener404sOptional(response -> parseEntity(response.getEntity(), | ||||||
| entityParser), listener); | ||||||
| return client.performRequestAsync(req, responseListener); | ||||||
| return performClientRequestAsync(req, responseListener); | ||||||
| } | ||||||
|
|
||||||
| final <Resp> ResponseListener wrapResponseListener404sOptional(CheckedFunction<Response, Resp, IOException> responseConverter, | ||||||
|
|
@@ -2002,6 +2016,204 @@ protected static boolean convertExistsResponse(Response response) { | |||||
| return response.getStatusLine().getStatusCode() == 200; | ||||||
| } | ||||||
|
|
||||||
| private Cancellable performClientRequestAsync(Request request, ResponseListener listener) { | ||||||
|
|
||||||
| ListenableFuture<Optional<String>> versionCheck = getVersionValidationFuture(); | ||||||
|
||||||
|
|
||||||
| // Create a future that tracks cancellation of this method's result and forwards cancellation to the actual LLRC request. | ||||||
| CompletableFuture<Void> cancellationForwarder = new CompletableFuture<Void>(); | ||||||
| Cancellable result = new Cancellable() { | ||||||
| @Override | ||||||
| public void cancel() { | ||||||
| // Raise the flag by completing the future | ||||||
| FutureUtils.cancel(cancellationForwarder); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| void runIfNotCancelled(Runnable runnable) { | ||||||
| if (cancellationForwarder.isCancelled()) { | ||||||
| throw newCancellationException(); | ||||||
| } | ||||||
| runnable.run(); | ||||||
| } | ||||||
| }; | ||||||
|
|
||||||
| // Send the request after we have done the version compatibility check. Note that if it has already happened, the listener will | ||||||
| // be called immediately on the same thread with no asynchronous scheduling overhead. | ||||||
| versionCheck.addListener(new ActionListener<Optional<String>>() { | ||||||
| @Override | ||||||
| public void onResponse(Optional<String> validation) { | ||||||
| if (validation.isEmpty()) { | ||||||
| // Send the request and propagate cancellation | ||||||
| Cancellable call = client.performRequestAsync(request, listener); | ||||||
| cancellationForwarder.whenComplete((r, t) -> | ||||||
| // Forward cancellation to the actual request (no need to check parameters as the | ||||||
| // only way for cancellationForwarder to be completed is by being cancelled). | ||||||
| call.cancel() | ||||||
| ); | ||||||
| } else { | ||||||
| // Version validation wasn't successful, fail the request with the validation result. | ||||||
| listener.onFailure(new ElasticsearchException(validation.get())); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void onFailure(Exception e) { | ||||||
| // Propagate validation request failure. This will be transient since `getVersionValidationFuture` clears the validation | ||||||
| // future if the request fails, leading to retries at the next HLRC request (see comments below). | ||||||
| listener.onFailure(e); | ||||||
| } | ||||||
| }); | ||||||
|
|
||||||
| return result; | ||||||
| }; | ||||||
|
|
||||||
| private Response performClientRequest(Request request) throws IOException { | ||||||
|
|
||||||
| Optional<String> versionValidation; | ||||||
| try { | ||||||
| versionValidation = getVersionValidationFuture().get(); | ||||||
| } catch (InterruptedException | ExecutionException e) { | ||||||
| // Unlikely to happen | ||||||
| throw new ElasticsearchException(e); | ||||||
| } | ||||||
|
|
||||||
| if (versionValidation.isEmpty()) { | ||||||
| return client.performRequest(request); | ||||||
| } else { | ||||||
| throw new ElasticsearchException(versionValidation.get()); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Returns a future that asynchronously validates the Elasticsearch product version. Its result is an optional string: if empty then | ||||||
| * validation was successful, if present it contains the validation error. API requests should be chained to this future and check | ||||||
| * the validation result before going further. | ||||||
| * <p> | ||||||
| * This future is a memoization of the first successful request to the "/" endpoint and the subsequent compatibility check | ||||||
| * ({@see #versionValidationFuture}). Further client requests reuse its result. | ||||||
| * <p> | ||||||
| * If the version check request fails (e.g. network error), {@link #versionValidationFuture} is cleared so that a new validation | ||||||
| * request is sent at the next HLRC request. This allows retries to happen while avoiding a busy retry loop (LLRC retries on the node | ||||||
| * pool still happen). | ||||||
| */ | ||||||
| private ListenableFuture<Optional<String>> getVersionValidationFuture() { | ||||||
| ListenableFuture<Optional<String>> currentFuture = this.versionValidationFuture; | ||||||
| if (currentFuture != null) { | ||||||
| return currentFuture; | ||||||
| } else { | ||||||
| synchronized (this) { | ||||||
| // Re-check in synchronized block | ||||||
| currentFuture = this.versionValidationFuture; | ||||||
| if (currentFuture != null) { | ||||||
| return currentFuture; | ||||||
| } | ||||||
| ListenableFuture<Optional<String>> future = new ListenableFuture<>(); | ||||||
| this.versionValidationFuture = future; | ||||||
|
|
||||||
| // Asynchronously call the info endpoint and complete the future with the version validation result. | ||||||
| Request req = new Request("GET", "/"); | ||||||
| // These status codes are nominal in the context of product version verification | ||||||
| req.addParameter("ignore", "401,403"); | ||||||
| client.performRequestAsync(req, new ResponseListener() { | ||||||
| @Override | ||||||
| public void onSuccess(Response response) { | ||||||
| Optional<String> validation; | ||||||
| try { | ||||||
| validation = getVersionValidation(response); | ||||||
| } catch (Exception e) { | ||||||
| logger.error("Failed to parse info response", e); | ||||||
| validation = Optional.of("Failed to parse info response. Check logs for detailed information - " + | ||||||
| e.getMessage()); | ||||||
| } | ||||||
| future.onResponse(validation); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public void onFailure(Exception exception) { | ||||||
|
|
||||||
| // Fail the requests (this one and the ones waiting for it) and clear the future | ||||||
| // so that we retry the next time the client executes a request. | ||||||
| versionValidationFuture = null; | ||||||
| future.onFailure(exception); | ||||||
dakrone marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
| } | ||||||
| }); | ||||||
|
|
||||||
| return future; | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Validates that the response info() is a compatible Elasticsearch version. | ||||||
| * | ||||||
| * @return an optional string. If empty, version is compatible. Otherwise, it's the message to return to the application. | ||||||
| */ | ||||||
| private Optional<String> getVersionValidation(Response response) throws IOException { | ||||||
| // Let requests go through if the client doesn't have permissions for the info endpoint. | ||||||
| int statusCode = response.getStatusLine().getStatusCode(); | ||||||
| if (statusCode == 401 || statusCode == 403) { | ||||||
| return Optional.empty(); | ||||||
| } | ||||||
|
|
||||||
| MainResponse mainResponse; | ||||||
| try { | ||||||
| mainResponse = parseEntity(response.getEntity(), MainResponse::fromXContent); | ||||||
| } catch (ResponseException e) { | ||||||
| throw parseResponseException(e); | ||||||
| } | ||||||
|
|
||||||
| String version = mainResponse.getVersion().getNumber(); | ||||||
| if (Strings.hasLength(version) == false) { | ||||||
| return Optional.of("Missing version.number in info response"); | ||||||
| } | ||||||
|
|
||||||
| String[] parts = version.split("\\."); | ||||||
| if (parts.length < 2) { | ||||||
| return Optional.of("Wrong version.number format in info response"); | ||||||
| } | ||||||
|
|
||||||
| int major = Integer.parseInt(parts[0]); | ||||||
| int minor = Integer.parseInt(parts[1]); | ||||||
|
|
||||||
| if (major < 6) { | ||||||
| return Optional.of("Elasticsearch version 6 or more is required"); | ||||||
|
||||||
| return Optional.of("Elasticsearch version 6 or more is required"); | |
| return Optional.of("Elasticsearch version 6 or higher is required"); |
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should perform additional validation here of build_flavor and taglines.
Uh oh!
There was an error while loading. Please reload this page.