-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ESQL: Missing enrich policies on skip_unavailable=true clusters no longer fail the query #116972
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c53f65c
ad7c1c4
65e3654
4733271
d7e4df5
10d13cc
3705eb3
ebc931e
7e4cfaf
e12d7dd
8acb25c
22961c2
c9517ac
ac92ab1
404a01a
920f1d4
d94a7c5
8117530
f503eac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 116972 | ||
| summary: "ESQL: Missing enrich policies on skip_unavailable=true clusters do not fail\ | ||
| \ the query" | ||
| area: ES|QL | ||
| type: enhancement | ||
| issues: [] |
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,12 +17,11 @@ | |
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.util.CollectionUtils; | ||
| import org.elasticsearch.common.util.Maps; | ||
| import org.elasticsearch.common.util.concurrent.ConcurrentCollections; | ||
| import org.elasticsearch.common.util.concurrent.ThreadContext; | ||
| import org.elasticsearch.common.util.iterable.Iterables; | ||
| import org.elasticsearch.core.Tuple; | ||
| import org.elasticsearch.core.Nullable; | ||
| import org.elasticsearch.tasks.Task; | ||
| import org.elasticsearch.threadpool.ThreadPool; | ||
| import org.elasticsearch.transport.RemoteClusterAware; | ||
|
|
@@ -53,7 +52,6 @@ | |
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
|
|
@@ -98,81 +96,111 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) { | |
| /** | ||
| * Resolves a set of enrich policies | ||
| * | ||
| * @param targetClusters the target clusters | ||
| * @param targetClusters the target clusters; key: cluster alias; value: skip_unavailable setting | ||
| * @param unresolvedPolicies the unresolved policies | ||
| * @param listener notified with the enrich resolution | ||
| */ | ||
| public void resolvePolicies( | ||
| Collection<String> targetClusters, | ||
| Map<String, Boolean> targetClusters, | ||
| Collection<UnresolvedPolicy> unresolvedPolicies, | ||
| ActionListener<EnrichResolution> listener | ||
| ) { | ||
| if (unresolvedPolicies.isEmpty() || targetClusters.isEmpty()) { | ||
| listener.onResponse(new EnrichResolution()); | ||
| return; | ||
| } | ||
| final Set<String> remoteClusters = new HashSet<>(targetClusters); | ||
| final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); | ||
| lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { | ||
| final Map<String, Boolean> remotes = new HashMap<>(targetClusters); | ||
| final boolean includeLocal = targetClusters.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); | ||
| remotes.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); | ||
| lookupPolicies(remotes.keySet(), includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { | ||
| final EnrichResolution enrichResolution = new EnrichResolution(); | ||
|
|
||
| Map<String, LookupResponse> lookupResponsesToProcess = new HashMap<>(); | ||
|
|
||
| for (Map.Entry<String, LookupResponse> entry : lookupResponses.entrySet()) { | ||
| String clusterAlias = entry.getKey(); | ||
| if (entry.getValue().connectionError != null) { | ||
| enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError); | ||
| enrichResolution.addUnusableRemote(clusterAlias, entry.getValue().connectionError); | ||
| // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy | ||
| remoteClusters.remove(clusterAlias); | ||
| Boolean removedVal = remotes.remove(clusterAlias); | ||
| assert removedVal != null : "Remote " + clusterAlias + " was not removed from the remotes list."; | ||
smalyshev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } else { | ||
| lookupResponsesToProcess.put(clusterAlias, entry.getValue()); | ||
| } | ||
| } | ||
|
|
||
| for (UnresolvedPolicy unresolved : unresolvedPolicies) { | ||
| Tuple<ResolvedEnrichPolicy, String> resolved = mergeLookupResults( | ||
| MergedPolicyLookupResult resolved = mergeLookupResults( | ||
| unresolved, | ||
| calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters), | ||
| calculateTargetClusters(unresolved.mode, includeLocal, remotes), | ||
| lookupResponsesToProcess | ||
| ); | ||
|
|
||
| if (resolved.v1() != null) { | ||
| enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1()); | ||
| } else { | ||
| assert resolved.v2() != null; | ||
| enrichResolution.addError(unresolved.name, unresolved.mode, resolved.v2()); | ||
| for (Map.Entry<String, Exception> entry : resolved.remotesToBeSkipped().entrySet()) { | ||
| enrichResolution.addUnusableRemote(entry.getKey(), entry.getValue()); | ||
| } | ||
| // If a remote-only CCS is done and all the remotes are "unusable" (due to missing enrich policies) | ||
| // *and* skippable (skip_unavailable=true), don't fill in either the resolved policy or the error on | ||
| // the enrichResolution object. The only field with contents will be the "unusable remotes" field. | ||
| if (resolved.resolvedPolicy() != null) { | ||
| enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.resolvedPolicy()); | ||
| } else if (resolved.error() != null) { | ||
| enrichResolution.addError(unresolved.name, unresolved.mode, resolved.error()); | ||
| } | ||
| } | ||
| return enrichResolution; | ||
| })); | ||
| } | ||
|
|
||
| private Collection<String> calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Set<String> remoteClusters) { | ||
| record MergedPolicyLookupResult( | ||
| @Nullable ResolvedEnrichPolicy resolvedPolicy, | ||
| Map<String, Exception> remotesToBeSkipped, | ||
| @Nullable String error | ||
| ) {} | ||
|
|
||
| /** | ||
| * @param mode | ||
| * @param includeLocal | ||
| * @param remoteClusters | ||
| * @return Map where key: String=clusterAlias, value: Boolean=skipUnavailable setting | ||
| */ | ||
| private Map<String, Boolean> calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Map<String, Boolean> remoteClusters) { | ||
| return switch (mode) { | ||
| case ANY -> CollectionUtils.appendToCopy(remoteClusters, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); | ||
| case COORDINATOR -> List.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); | ||
| case REMOTE -> includeLocal | ||
| ? CollectionUtils.appendToCopy(remoteClusters, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) | ||
| : remoteClusters; | ||
| case ANY -> copyAndAddLocalCluster(remoteClusters); | ||
| case COORDINATOR -> Map.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); | ||
| case REMOTE -> includeLocal ? copyAndAddLocalCluster(remoteClusters) : remoteClusters; | ||
| }; | ||
| } | ||
|
|
||
| private Map<String, Boolean> copyAndAddLocalCluster(Map<String, Boolean> remoteClusters) { | ||
| Map<String, Boolean> newMap = new HashMap<>(remoteClusters); | ||
| // technically the local cluster has no skip_unavailable setting, but for enrich policy resolution | ||
| // purposes we treat errors on the local cluster as fatal, so set it as false to simplify downstream code | ||
| newMap.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); | ||
| return newMap; | ||
| } | ||
|
|
||
| /** | ||
| * Resolve an enrich policy by merging the lookup responses from the target clusters. | ||
| * @return a resolved enrich policy or an error | ||
| */ | ||
| private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults( | ||
| private MergedPolicyLookupResult mergeLookupResults( | ||
| UnresolvedPolicy unresolved, | ||
| Collection<String> targetClusters, | ||
| Map<String, Boolean> targetClusters, | ||
| Map<String, LookupResponse> lookupResults | ||
| ) { | ||
| String policyName = unresolved.name; | ||
| if (targetClusters.isEmpty()) { | ||
| return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"); | ||
| String error = "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"; | ||
| return new MergedPolicyLookupResult(null, Collections.emptyMap(), error); | ||
| } | ||
| // key for this map is cluster alias; a cluster will be in this map only if it has resolved the enrich policy/policies needed | ||
| final Map<String, ResolvedEnrichPolicy> policies = new HashMap<>(); | ||
| final List<String> failures = new ArrayList<>(); | ||
| for (String cluster : targetClusters) { | ||
| final List<String> failures = new ArrayList<>(); // (fatal) failures on local or skip_unavailable=false remote | ||
| final Map<String, Exception> remotesToBeSkipped = new HashMap<>(); // skip_unavailable=true remotes enrich policy failures | ||
| for (Map.Entry<String, Boolean> clusterInfo : targetClusters.entrySet()) { | ||
| String cluster = clusterInfo.getKey(); | ||
| boolean skipUnavailable = clusterInfo.getValue(); | ||
| LookupResponse lookupResult = lookupResults.get(cluster); | ||
| if (lookupResult != null) { | ||
| assert lookupResult.connectionError == null : "Should never have a non-null connectionError here"; | ||
|
|
@@ -182,35 +210,52 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults( | |
| } else { | ||
| final String failure = lookupResult.failures.get(policyName); | ||
| if (failure != null) { | ||
| failures.add(failure); | ||
| if (skipUnavailable) { | ||
| remotesToBeSkipped.put(cluster, new IllegalStateException(failure)); | ||
| } else { | ||
| failures.add(failure); | ||
| } | ||
| } else if (skipUnavailable) { | ||
| // code path where there was no enrich policy found at all on the remote | ||
| remotesToBeSkipped.put(cluster, new IllegalStateException("failed to resolve enrich policy [" + policyName + "]")); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| if (targetClusters.size() != policies.size()) { | ||
| if (targetClusters.size() != policies.size() + remotesToBeSkipped.size()) { | ||
| final String reason; | ||
| if (failures.isEmpty()) { | ||
| List<String> missingClusters = targetClusters.stream().filter(c -> policies.containsKey(c) == false).sorted().toList(); | ||
| reason = missingPolicyError(policyName, targetClusters, missingClusters); | ||
| List<String> missingClusters = targetClusters.keySet() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not convinced this block (under ( |
||
| .stream() | ||
| .filter(c -> policies.containsKey(c) == false) | ||
| .sorted() | ||
| .toList(); | ||
| reason = missingPolicyError(policyName, targetClusters.keySet(), missingClusters); | ||
| } else { | ||
| reason = "failed to resolve enrich policy [" + policyName + "]; reason " + failures; | ||
| } | ||
| return Tuple.tuple(null, reason); | ||
| return new MergedPolicyLookupResult(null, remotesToBeSkipped, reason); | ||
| } else if (policies.isEmpty()) { | ||
| // no target cluster had a valid enrich policy and all are skip_unavailable=true | ||
| assert targetClusters.values().stream().allMatch(b -> b == Boolean.TRUE) : "Not all target clusters are skip_unavailable=true"; | ||
| return new MergedPolicyLookupResult(null, remotesToBeSkipped, null); | ||
| } | ||
|
|
||
| Map<String, EsField> mappings = new HashMap<>(); | ||
| Map<String, String> concreteIndices = new HashMap<>(); | ||
| ResolvedEnrichPolicy last = null; | ||
| // loop over clusters with a ResolvedEnrichPolicy - ensure no errors within the policy | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that mismatches across policies (being checked in the section below here) are still fatal errors for skip_unavailable=true clusters. I started down the road of having these be skippable errors, but that looks rather tricky to pull off. At a minimum, you'd have to partition the policies by "skip_unavailable" and build a canonical list of fields/types/etc. from skip_un=false clusters and then compare the skip_un=true clusters and then if any mismatches are found, those aren't fatal, but you pull that cluster out of the list to be resolved for field-caps. Not impossible but this section would require a significant rewrite, so I decided to only handle missing enrich policies (and policies that have errors on the remote cluster during resolution), but still fail them based on mismatches between policies. |
||
| for (Map.Entry<String, ResolvedEnrichPolicy> e : policies.entrySet()) { | ||
| ResolvedEnrichPolicy curr = e.getValue(); | ||
| if (last != null && last.matchField().equals(curr.matchField()) == false) { | ||
| String error = "enrich policy [" + policyName + "] has different match fields "; | ||
| error += "[" + last.matchField() + ", " + curr.matchField() + "] across clusters"; | ||
| return Tuple.tuple(null, error); | ||
| return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); | ||
| } | ||
| if (last != null && last.matchType().equals(curr.matchType()) == false) { | ||
| String error = "enrich policy [" + policyName + "] has different match types "; | ||
| error += "[" + last.matchType() + ", " + curr.matchType() + "] across clusters"; | ||
| return Tuple.tuple(null, error); | ||
| return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); | ||
| } | ||
| // merge mappings | ||
| for (Map.Entry<String, EsField> m : curr.mapping().entrySet()) { | ||
|
|
@@ -226,7 +271,7 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults( | |
| if (old != null && old.getDataType().equals(field.getDataType()) == false) { | ||
| String error = "field [" + m.getKey() + "] of enrich policy [" + policyName + "] has different data types "; | ||
| error += "[" + old.getDataType() + ", " + field.getDataType() + "] across clusters"; | ||
| return Tuple.tuple(null, error); | ||
| return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); | ||
| } | ||
| } | ||
| if (last != null) { | ||
|
|
@@ -237,7 +282,8 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults( | |
| var diff = counts.entrySet().stream().filter(f -> f.getValue() < 2).map(Map.Entry::getKey).limit(20).sorted().toList(); | ||
| if (diff.isEmpty() == false) { | ||
| String detailed = "these fields are missing in some policies: " + diff; | ||
| return Tuple.tuple(null, "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed); | ||
| String fullMessage = "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed; | ||
| return new MergedPolicyLookupResult(null, remotesToBeSkipped, fullMessage); | ||
| } | ||
| } | ||
| // merge concrete indices | ||
|
|
@@ -246,7 +292,7 @@ private Tuple<ResolvedEnrichPolicy, String> mergeLookupResults( | |
| } | ||
| assert last != null; | ||
| var resolved = new ResolvedEnrichPolicy(last.matchField(), last.matchType(), last.enrichFields(), concreteIndices, mappings); | ||
| return Tuple.tuple(resolved, null); | ||
| return new MergedPolicyLookupResult(resolved, remotesToBeSkipped, null); | ||
| } | ||
|
|
||
| private String missingPolicyError(String policyName, Collection<String> targetClusters, List<String> missingClusters) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.