diff --git a/docs/changelog/116972.yaml b/docs/changelog/116972.yaml new file mode 100644 index 0000000000000..0387b3e9619cc --- /dev/null +++ b/docs/changelog/116972.yaml @@ -0,0 +1,6 @@ +pr: 116972 +summary: "ESQL: Missing enrich policies on skip_unavailable=true clusters do not fail\ + \ the query" +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java index e8e9f45694e9c..475076fd0af63 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.protocol.xpack.XPackInfoResponse; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; @@ -606,6 +607,321 @@ public void testEnrichCoordinatorThenEnrichRemote() { ); } + public void testWithHostsPolicyWithMissingEnrichPolicyOnSkipUnavailableTrueCluster() { + // delete enrich policies from cluster c1 + for (String policy : List.of("hosts", "vendors")) { + assertAcked( + client("c1").execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)) + ); + } + for (var mode : Enrich.Mode.values()) { + String query = "FROM events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, null)) { + List> rows = getValuesList(resp); + assertThat( + rows, + equalTo( + List.of( + List.of(2L, "Android"), + List.of(1L, "Linux"), + List.of(1L, "MacOS"), + List.of(4L, "Windows"), + Arrays.asList(1L, (String) null) + ) + ) + ); + assertFalse(resp.getExecutionInfo().isCrossClusterSearch()); + } + } + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + { + // since these two modes require the enrich policy on the remote cluster - c1 will be skipped + Enrich.Mode mode = randomFrom(Enrich.Mode.REMOTE, Enrich.Mode.ANY); + String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("c1", "c2"))); + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertTrue(executionInfo.isCrossClusterSearch()); + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + // this test assumes (but does not set) skip_unavailable=true + assertThat(remote1.isSkipUnavailable(), equalTo(true)); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getIndexExpression(), equalTo("events")); + assertThat(remote1.getTotalShards(), equalTo(0)); + assertThat(remote1.getSuccessfulShards(), equalTo(0)); + assertThat(remote1.getSkippedShards(), equalTo(0)); + assertThat(remote1.getFailedShards(), equalTo(0)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat(remote1.getFailures().get(0).getCause().getMessage(), containsString("failed to resolve enrich policy [hosts]")); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remote2.getIndexExpression(), equalTo("events")); + assertThat(remote2.getTotalShards(), equalTo(1)); + assertThat(remote2.getSuccessfulShards(), equalTo(1)); + assertThat(remote2.getSkippedShards(), equalTo(0)); + assertThat(remote2.getFailedShards(), equalTo(0)); + assertThat(remote2.getFailures().size(), equalTo(0)); + } + } + { + Enrich.Mode mode = randomFrom(Enrich.Mode.REMOTE, Enrich.Mode.ANY); + // c1 queried by itself - should be skipped with "" column result + String query = "FROM c1:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), equalTo(0)); + assertThat(resp.columns().size(), equalTo(1)); + assertThat(resp.columns().get(0).name(), equalTo("")); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("c1"))); + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertTrue(executionInfo.isCrossClusterSearch()); + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getIndexExpression(), equalTo("events")); + assertThat(remote1.getTotalShards(), equalTo(0)); + assertThat(remote1.getSuccessfulShards(), equalTo(0)); + assertThat(remote1.getSkippedShards(), equalTo(0)); + assertThat(remote1.getFailedShards(), equalTo(0)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat(remote1.getFailures().get(0).getCause().getMessage(), containsString("failed to resolve enrich policy [hosts]")); + } + } + { + // COORDINATOR modes requires the enrich policy only on the coordinator, so c1 should not be skipped + Enrich.Mode mode = Enrich.Mode.COORDINATOR; + String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + assertThat( + rows, + equalTo( + List.of( + List.of(1L, "Android"), + List.of(2L, "Linux"), + List.of(4L, "MacOS"), + List.of(3L, "Windows"), + List.of(1L, "iOS"), + Arrays.asList(2L, (String) null) + ) + ) + ); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("c1", "c2"))); + assertCCSExecutionInfoDetails(executionInfo); + } + } + for (var mode : Enrich.Mode.values()) { + String query = "FROM *:events,events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, "c1", "c2"))); + if (mode == Enrich.Mode.COORDINATOR) { + // COORDINATOR mode only requires the enrich policy on the local cluster + assertCCSExecutionInfoDetails(executionInfo); + } else { + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remote2.getFailures().size(), equalTo(0)); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getFailures().size(), equalTo(0)); + } + } + } + // delete enrich policies from cluster c2 also + for (String policy : List.of("hosts", "vendors")) { + assertAcked( + client("c2").execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)) + ); + } + for (var mode : Enrich.Mode.values()) { + String query = "FROM *:events,events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, "c1", "c2"))); + if (mode == Enrich.Mode.COORDINATOR) { + // COORDINATOR mode only requires the enrich policy on the local cluster + assertCCSExecutionInfoDetails(executionInfo); + } else { + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote2.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getFailures().size(), equalTo(0)); + } + } + } + } + + public void testRemoteHasOnePolicyButNotTheOther() { + // delete the hosts enrich policy from c1, but not the vendors policy + assertAcked( + client("c1").execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) + ); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + { + for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { + var query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE)); + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, "c1", "c2"))); + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + // this test assumes (but does not set) skip_unavailable=true + assertThat(remote1.isSkipUnavailable(), equalTo(true)); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getIndexExpression(), equalTo("events")); + assertThat(remote1.getTotalShards(), equalTo(0)); + assertThat(remote1.getSuccessfulShards(), equalTo(0)); + assertThat(remote1.getSkippedShards(), equalTo(0)); + assertThat(remote1.getFailedShards(), equalTo(0)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remote2.getIndexExpression(), equalTo("events")); + assertThat(remote2.getTotalShards(), equalTo(1)); + assertThat(remote2.getSuccessfulShards(), equalTo(1)); + assertThat(remote2.getSkippedShards(), equalTo(0)); + assertThat(remote2.getFailedShards(), equalTo(0)); + assertThat(remote2.getFailures().size(), equalTo(0)); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getIndexExpression(), equalTo("events")); + assertThat(localCluster.getTotalShards(), equalTo(1)); + assertThat(localCluster.getSuccessfulShards(), equalTo(1)); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + assertThat(localCluster.getFailures().size(), equalTo(0)); + } + } + // delete the vendors policy but not hosts from c2 + assertAcked( + client("c2").execute( + DeleteEnrichPolicyAction.INSTANCE, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors") + ) + ); + { + for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { + var query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE)); + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, "c1", "c2"))); + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + // this test assumes (but does not set) skip_unavailable=true + assertThat(remote1.isSkipUnavailable(), equalTo(true)); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getIndexExpression(), equalTo("events")); + assertThat(remote1.getTotalShards(), equalTo(0)); + assertThat(remote1.getSuccessfulShards(), equalTo(0)); + assertThat(remote1.getSkippedShards(), equalTo(0)); + assertThat(remote1.getFailedShards(), equalTo(0)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + // this test assumes (but does not set) skip_unavailable=true + assertThat(remote2.isSkipUnavailable(), equalTo(true)); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote2.getIndexExpression(), equalTo("events")); + assertThat(remote2.getTotalShards(), equalTo(0)); + assertThat(remote2.getSuccessfulShards(), equalTo(0)); + assertThat(remote2.getSkippedShards(), equalTo(0)); + assertThat(remote2.getFailedShards(), equalTo(0)); + assertThat(remote2.getFailures().size(), equalTo(1)); + assertThat( + remote2.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [vendors]") + ); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getIndexExpression(), equalTo("events")); + assertThat(localCluster.getTotalShards(), equalTo(1)); + assertThat(localCluster.getSuccessfulShards(), equalTo(1)); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + assertThat(localCluster.getFailures().size(), equalTo(0)); + } + } + } + } + } + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); request.query(query); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java index 4f6886edc5fbc..3e34f5dea95a8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java @@ -23,7 +23,8 @@ public final class EnrichResolution { private final Map resolvedPolicies = ConcurrentCollections.newConcurrentMap(); private final Map errors = ConcurrentCollections.newConcurrentMap(); - private final Map unavailableClusters = ConcurrentCollections.newConcurrentMap(); + // skip_unavailable=true remote clusters that are unavailable or had errors when resolving the enrich policy + private final Map unusableRemotes = ConcurrentCollections.newConcurrentMap(); public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) { return resolvedPolicies.get(new Key(policyName, mode)); @@ -34,11 +35,17 @@ public Collection resolvedEnrichPolicies() { } + // created for testing + public boolean hasErrors() { + return errors.size() > 0; + } + public String getError(String policyName, Enrich.Mode mode) { final String error = errors.get(new Key(policyName, mode)); if (error != null) { return error; } else { + // TODO: I don't understand this code - why not just return null? Why is it wrong to call this when there's no errors? assert false : "unresolved enrich policy [" + policyName + "] mode [" + mode + "]"; return "unresolved enrich policy [" + policyName + "] mode [" + mode + "]"; } @@ -52,12 +59,16 @@ public void addError(String policyName, Enrich.Mode mode, String reason) { errors.putIfAbsent(new Key(policyName, mode), reason); } - public void addUnavailableCluster(String clusterAlias, Exception e) { - unavailableClusters.put(clusterAlias, e); + public void addUnusableRemote(String clusterAlias, Exception e) { + unusableRemotes.put(clusterAlias, e); } - public Map getUnavailableClusters() { - return unavailableClusters; + /** + * @return Remote clusters that are either unavailable (disconnected) or had a failure when resolving the enrich policy. + * Map key is cluster alias. Map value is the Exception showing the particular error encountered. + */ + public Map unusableRemotes() { + return unusableRemotes; } private record Key(String policyName, Enrich.Mode mode) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index c8a7a6bcc4e98..bf1ba4bc4c975 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -17,12 +17,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.iterable.Iterables; -import org.elasticsearch.core.Tuple; +import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; @@ -53,7 +52,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -98,12 +96,12 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) { /** * Resolves a set of enrich policies * - * @param targetClusters the target clusters + * @param targetClusters the target clusters; key: cluster alias; value: skip_unavailable setting * @param unresolvedPolicies the unresolved policies * @param listener notified with the enrich resolution */ public void resolvePolicies( - Collection targetClusters, + Map targetClusters, Collection unresolvedPolicies, ActionListener listener ) { @@ -111,9 +109,10 @@ public void resolvePolicies( listener.onResponse(new EnrichResolution()); return; } - final Set remoteClusters = new HashSet<>(targetClusters); - final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { + final Map remotes = new HashMap<>(targetClusters); + final boolean includeLocal = targetClusters.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + remotes.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + lookupPolicies(remotes.keySet(), includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); Map lookupResponsesToProcess = new HashMap<>(); @@ -121,58 +120,87 @@ public void resolvePolicies( for (Map.Entry entry : lookupResponses.entrySet()) { String clusterAlias = entry.getKey(); if (entry.getValue().connectionError != null) { - enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError); + enrichResolution.addUnusableRemote(clusterAlias, entry.getValue().connectionError); // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy - remoteClusters.remove(clusterAlias); + Boolean removedVal = remotes.remove(clusterAlias); + assert removedVal != null : "Remote " + clusterAlias + " was not removed from the remotes list."; } else { lookupResponsesToProcess.put(clusterAlias, entry.getValue()); } } for (UnresolvedPolicy unresolved : unresolvedPolicies) { - Tuple resolved = mergeLookupResults( + MergedPolicyLookupResult resolved = mergeLookupResults( unresolved, - calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters), + calculateTargetClusters(unresolved.mode, includeLocal, remotes), lookupResponsesToProcess ); - if (resolved.v1() != null) { - enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1()); - } else { - assert resolved.v2() != null; - enrichResolution.addError(unresolved.name, unresolved.mode, resolved.v2()); + for (Map.Entry entry : resolved.remotesToBeSkipped().entrySet()) { + enrichResolution.addUnusableRemote(entry.getKey(), entry.getValue()); + } + // If a remote-only CCS is done and all the remotes are "unusable" (due to missing enrich policies) + // *and* skippable (skip_unavailable=true), don't fill in either the resolved policy or the error on + // the enrichResolution object. The only field with contents will be the "unusable remotes" field. + if (resolved.resolvedPolicy() != null) { + enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.resolvedPolicy()); + } else if (resolved.error() != null) { + enrichResolution.addError(unresolved.name, unresolved.mode, resolved.error()); } } return enrichResolution; })); } - private Collection calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Set remoteClusters) { + record MergedPolicyLookupResult( + @Nullable ResolvedEnrichPolicy resolvedPolicy, + Map remotesToBeSkipped, + @Nullable String error + ) {} + + /** + * @param mode + * @param includeLocal + * @param remoteClusters + * @return Map where key: String=clusterAlias, value: Boolean=skipUnavailable setting + */ + private Map calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Map remoteClusters) { return switch (mode) { - case ANY -> CollectionUtils.appendToCopy(remoteClusters, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - case COORDINATOR -> List.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - case REMOTE -> includeLocal - ? CollectionUtils.appendToCopy(remoteClusters, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) - : remoteClusters; + case ANY -> copyAndAddLocalCluster(remoteClusters); + case COORDINATOR -> Map.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); + case REMOTE -> includeLocal ? copyAndAddLocalCluster(remoteClusters) : remoteClusters; }; } + private Map copyAndAddLocalCluster(Map remoteClusters) { + Map newMap = new HashMap<>(remoteClusters); + // technically the local cluster has no skip_unavailable setting, but for enrich policy resolution + // purposes we treat errors on the local cluster as fatal, so set it as false to simplify downstream code + newMap.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); + return newMap; + } + /** * Resolve an enrich policy by merging the lookup responses from the target clusters. * @return a resolved enrich policy or an error */ - private Tuple mergeLookupResults( + private MergedPolicyLookupResult mergeLookupResults( UnresolvedPolicy unresolved, - Collection targetClusters, + Map targetClusters, Map lookupResults ) { String policyName = unresolved.name; if (targetClusters.isEmpty()) { - return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"); + String error = "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"; + return new MergedPolicyLookupResult(null, Collections.emptyMap(), error); } + // key for this map is cluster alias; a cluster will be in this map only if it has resolved the enrich policy/policies needed final Map policies = new HashMap<>(); - final List failures = new ArrayList<>(); - for (String cluster : targetClusters) { + final List failures = new ArrayList<>(); // (fatal) failures on local or skip_unavailable=false remote + final Map remotesToBeSkipped = new HashMap<>(); // skip_unavailable=true remotes enrich policy failures + for (Map.Entry clusterInfo : targetClusters.entrySet()) { + String cluster = clusterInfo.getKey(); + boolean skipUnavailable = clusterInfo.getValue(); LookupResponse lookupResult = lookupResults.get(cluster); if (lookupResult != null) { assert lookupResult.connectionError == null : "Should never have a non-null connectionError here"; @@ -182,35 +210,52 @@ private Tuple mergeLookupResults( } else { final String failure = lookupResult.failures.get(policyName); if (failure != null) { - failures.add(failure); + if (skipUnavailable) { + remotesToBeSkipped.put(cluster, new IllegalStateException(failure)); + } else { + failures.add(failure); + } + } else if (skipUnavailable) { + // code path where there was no enrich policy found at all on the remote + remotesToBeSkipped.put(cluster, new IllegalStateException("failed to resolve enrich policy [" + policyName + "]")); } } } } - if (targetClusters.size() != policies.size()) { + if (targetClusters.size() != policies.size() + remotesToBeSkipped.size()) { final String reason; if (failures.isEmpty()) { - List missingClusters = targetClusters.stream().filter(c -> policies.containsKey(c) == false).sorted().toList(); - reason = missingPolicyError(policyName, targetClusters, missingClusters); + List missingClusters = targetClusters.keySet() + .stream() + .filter(c -> policies.containsKey(c) == false) + .sorted() + .toList(); + reason = missingPolicyError(policyName, targetClusters.keySet(), missingClusters); } else { reason = "failed to resolve enrich policy [" + policyName + "]; reason " + failures; } - return Tuple.tuple(null, reason); + return new MergedPolicyLookupResult(null, remotesToBeSkipped, reason); + } else if (policies.isEmpty()) { + // no target cluster had a valid enrich policy and all are skip_unavailable=true + assert targetClusters.values().stream().allMatch(b -> b == Boolean.TRUE) : "Not all target clusters are skip_unavailable=true"; + return new MergedPolicyLookupResult(null, remotesToBeSkipped, null); } + Map mappings = new HashMap<>(); Map concreteIndices = new HashMap<>(); ResolvedEnrichPolicy last = null; + // loop over clusters with a ResolvedEnrichPolicy - ensure no errors within the policy for (Map.Entry e : policies.entrySet()) { ResolvedEnrichPolicy curr = e.getValue(); if (last != null && last.matchField().equals(curr.matchField()) == false) { String error = "enrich policy [" + policyName + "] has different match fields "; error += "[" + last.matchField() + ", " + curr.matchField() + "] across clusters"; - return Tuple.tuple(null, error); + return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } if (last != null && last.matchType().equals(curr.matchType()) == false) { String error = "enrich policy [" + policyName + "] has different match types "; error += "[" + last.matchType() + ", " + curr.matchType() + "] across clusters"; - return Tuple.tuple(null, error); + return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } // merge mappings for (Map.Entry m : curr.mapping().entrySet()) { @@ -226,7 +271,7 @@ private Tuple mergeLookupResults( if (old != null && old.getDataType().equals(field.getDataType()) == false) { String error = "field [" + m.getKey() + "] of enrich policy [" + policyName + "] has different data types "; error += "[" + old.getDataType() + ", " + field.getDataType() + "] across clusters"; - return Tuple.tuple(null, error); + return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } } if (last != null) { @@ -237,7 +282,8 @@ private Tuple mergeLookupResults( var diff = counts.entrySet().stream().filter(f -> f.getValue() < 2).map(Map.Entry::getKey).limit(20).sorted().toList(); if (diff.isEmpty() == false) { String detailed = "these fields are missing in some policies: " + diff; - return Tuple.tuple(null, "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed); + String fullMessage = "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed; + return new MergedPolicyLookupResult(null, remotesToBeSkipped, fullMessage); } } // merge concrete indices @@ -246,7 +292,7 @@ private Tuple mergeLookupResults( } assert last != null; var resolved = new ResolvedEnrichPolicy(last.matchField(), last.matchType(), last.enrichFields(), concreteIndices, mappings); - return Tuple.tuple(resolved, null); + return new MergedPolicyLookupResult(resolved, remotesToBeSkipped, null); } private String missingPolicyError(String policyName, Collection targetClusters, List missingClusters) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 9630a520e8654..78945f4d9bc5c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverProfile; @@ -72,6 +73,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -294,19 +296,22 @@ private void preAnalyze( .collect(Collectors.toSet()); final List indices = preAnalysis.indices; // TODO: make a separate call for lookup indices - final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( + final Set clusters = enrichPolicyResolver.groupIndicesPerCluster( indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new) ).keySet(); + // key: cluster alias; value = skip_unavailable setting + Map targetClusters = Maps.newHashMapWithExpectedSize(clusters.size()); + for (String alias : clusters) { + targetClusters.put(alias, executionInfo.isSkipUnavailable(alias)); + } enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, listener.delegateFailureAndWrap((l, enrichResolution) -> { // first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API var matchFields = enrichResolution.resolvedEnrichPolicies() .stream() .map(ResolvedEnrichPolicy::matchField) .collect(Collectors.toSet()); - Map unavailableClusters = enrichResolution.getUnavailableClusters(); - preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> { - // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index - // resolution to updateExecutionInfo + Map unusableRemotes = enrichResolution.unusableRemotes(); + preAnalyzeIndices(parsed, executionInfo, unusableRemotes, l.delegateFailureAndWrap((ll, indexResolution) -> { if (indexResolution.isValid()) { EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); @@ -324,11 +329,15 @@ private void preAnalyze( // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again. // TODO: add a test for this - if (targetClusters.containsAll(newClusters) == false + if (clusters.containsAll(newClusters) == false // do not bother with a re-resolution if only remotes were requested and all were offline && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { + Map newClusterInfo = new HashMap<>(); + for (String newCluster : newClusters) { + newClusterInfo.put(newCluster, executionInfo.isSkipUnavailable(newCluster)); + } enrichPolicyResolver.resolvePolicies( - newClusters, + newClusterInfo, unresolvedPolicies, ll.map(newEnrichResolution -> action.apply(indexResolution, newEnrichResolution)) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 4fe2fef7e3f45..e1c758fd6dbac 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -204,6 +204,10 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn * Mark it as SKIPPED with 0 shards searched and took=0. */ for (String c : clustersWithNoMatchingIndices) { + if (executionInfo.getCluster(c).getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + // if cluster was already marked SKIPPED during enrich policy resolution, do not overwrite + continue; + } final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); if (missingIndicesIsFatal(c, executionInfo)) { String error = Strings.format( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 39170f1a305df..5b3a26c4af7ce 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -55,6 +55,7 @@ import static org.elasticsearch.transport.RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -164,8 +165,11 @@ private void assertHostPolicies(ResolvedEnrichPolicy resolved) { public void testLocalHosts() { for (Enrich.Mode mode : Enrich.Mode.values()) { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY); - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode))); + Map clusters = Map.of(LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("hosts", mode); assertHostPolicies(resolved); assertThat(resolved.concreteIndices(), equalTo(Map.of("", ".enrich-hosts-123"))); @@ -173,9 +177,12 @@ public void testLocalHosts() { } public void testRemoteHosts() { - Set clusters = Set.of("cluster_a", "cluster_b"); + Map clusters = Map.of("cluster_a", randomBoolean(), "cluster_b", randomBoolean()); for (Enrich.Mode mode : Enrich.Mode.values()) { - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("hosts", mode); assertHostPolicies(resolved); var expectedIndices = switch (mode) { @@ -188,9 +195,19 @@ public void testRemoteHosts() { } public void testMixedHosts() { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY, "cluster_a", "cluster_b"); + Map clusters = Map.of( + LOCAL_CLUSTER_GROUP_KEY, + Boolean.TRUE, + "cluster_a", + randomBoolean(), + "cluster_b", + randomBoolean() + ); for (Enrich.Mode mode : Enrich.Mode.values()) { - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("hosts", mode); assertHostPolicies(resolved); var expectedIndices = switch (mode) { @@ -203,8 +220,11 @@ public void testMixedHosts() { public void testLocalAddress() { for (Enrich.Mode mode : Enrich.Mode.values()) { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY); - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode))); + Map clusters = Map.of(LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("address", mode); assertNotNull(resolved); assertThat(resolved.matchField(), equalTo("emp_id")); @@ -214,8 +234,15 @@ public void testLocalAddress() { } { List clusters = randomSubsetOf(between(1, 3), List.of("", "cluster_a", "cluster_a")); + Map clusterInfo = new HashMap<>(); + for (String cluster : clusters) { + clusterInfo.put(cluster, cluster.equals("") ? Boolean.FALSE : randomBoolean()); + } var mode = Enrich.Mode.COORDINATOR; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusterInfo, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("address", mode); assertNotNull(resolved); assertThat(resolved.matchField(), equalTo("emp_id")); @@ -226,9 +253,12 @@ public void testLocalAddress() { } public void testRemoteAddress() { - Set clusters = Set.of("cluster_a", "cluster_b"); + Map clusters = Map.of("cluster_a", randomBoolean(), "cluster_b", randomBoolean()); for (Enrich.Mode mode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode)) + ); assertNull(resolution.getResolvedPolicy("address", mode)); var msg = "enrich policy [address] has different enrich fields across clusters; " + "these fields are missing in some policies: [state]"; @@ -237,9 +267,19 @@ public void testRemoteAddress() { } public void testMixedAddress() { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY, "cluster_a", "cluster_b"); + Map clusters = Map.of( + LOCAL_CLUSTER_GROUP_KEY, + Boolean.FALSE, + "cluster_a", + randomBoolean(), + "cluster_b", + randomBoolean() + ); for (Enrich.Mode mode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("hosts", mode); assertHostPolicies(resolved); assertThat( @@ -252,8 +292,11 @@ public void testMixedAddress() { public void testLocalAuthor() { for (Enrich.Mode mode : Enrich.Mode.values()) { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY); - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + Map clusters = Map.of(LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("author", mode); assertNotNull(resolved); assertThat(resolved.matchField(), equalTo("author")); @@ -264,7 +307,14 @@ public void testLocalAuthor() { { var mode = Enrich.Mode.COORDINATOR; var clusters = randomSubsetOf(between(1, 3), Set.of("", "cluster_a", "cluster_b")); - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + Map clusterInfo = new HashMap<>(); + for (String cluster : clusters) { + clusterInfo.put(cluster, cluster.equals("") ? Boolean.FALSE : randomBoolean()); + } + var resolution = localCluster.resolvePoliciesWithRandomization( + clusterInfo, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("author", mode); assertNotNull(resolved); assertThat(resolved.matchField(), equalTo("author")); @@ -276,10 +326,13 @@ public void testLocalAuthor() { } public void testAuthorClusterA() { - Set clusters = Set.of("cluster_a"); + Map clusters = Map.of("cluster_a", randomBoolean()); { var mode = Enrich.Mode.ANY; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -288,7 +341,10 @@ public void testAuthorClusterA() { } { var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("author", mode); assertNotNull(resolved); assertThat(resolved.matchType(), equalTo("range")); @@ -300,10 +356,13 @@ public void testAuthorClusterA() { } public void testAuthorClusterB() { - Set clusters = Set.of("cluster_b"); + Map clusters = Map.of("cluster_b", randomBoolean()); { var mode = Enrich.Mode.ANY; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -312,7 +371,10 @@ public void testAuthorClusterB() { } { var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("author", mode); assertNotNull(resolved); assertThat(resolved.matchType(), equalTo("match")); @@ -324,10 +386,13 @@ public void testAuthorClusterB() { } public void testAuthorClusterAAndClusterB() { - Set clusters = Set.of("cluster_a", "cluster_b"); + Map clusters = Map.of("cluster_a", randomBoolean(), "cluster_b", randomBoolean()); { var mode = Enrich.Mode.ANY; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -336,7 +401,10 @@ public void testAuthorClusterAAndClusterB() { } { var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -346,10 +414,13 @@ public void testAuthorClusterAAndClusterB() { } public void testLocalAndClusterBAuthor() { - Set clusters = Set.of("", "cluster_b"); + Map clusters = Map.of("", Boolean.FALSE, "cluster_b", randomBoolean()); { var mode = Enrich.Mode.ANY; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -358,7 +429,10 @@ public void testLocalAndClusterBAuthor() { } { var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -369,31 +443,76 @@ public void testLocalAndClusterBAuthor() { public void testMissingLocalPolicy() { for (Enrich.Mode mode : Enrich.Mode.values()) { - var resolution = localCluster.resolvePolicies(Set.of(""), List.of(new EnrichPolicyResolver.UnresolvedPolicy("authoz", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + Map.of("", Boolean.FALSE), + List.of(new EnrichPolicyResolver.UnresolvedPolicy("authoz", mode)) + ); assertNull(resolution.getResolvedPolicy("authoz", mode)); assertThat(resolution.getError("authoz", mode), equalTo("cannot find enrich policy [authoz], did you mean [author]?")); } } public void testMissingRemotePolicy() { + String missingPolicyName = "addrezz"; + + // missing policy on skip_unavailable=true is not a (fatal) error - the error is simply recorded in metadata { - var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies( - Set.of("cluster_a"), + boolean skipUnavailable = true; + var mode = Enrich.Mode.REMOTE; // only requires policy on remote cluster + EnrichResolution resolution = localCluster.resolvePolicies( + Map.of("cluster_a", skipUnavailable), List.of(new EnrichPolicyResolver.UnresolvedPolicy("addrezz", mode)) ); - assertNull(resolution.getResolvedPolicy("addrezz", mode)); - assertThat(resolution.getError("addrezz", mode), equalTo("cannot find enrich policy [addrezz] on clusters [cluster_a]")); + assertThat(resolution.unusableRemotes().size(), equalTo(1)); + Exception exception = resolution.unusableRemotes().get("cluster_a"); + assertNotNull(exception); + assertThat(exception.getMessage(), containsString("failed to resolve enrich policy [addrezz]")); + assertNull(resolution.getResolvedPolicy(missingPolicyName, mode)); + assertFalse(resolution.hasErrors()); } + + // missing policy on skip_unavailable=true is not a (fatal) error, but the missing policy on local is a fatal error { - var mode = Enrich.Mode.ANY; + boolean skipUnavailable = true; + var mode = Enrich.Mode.ANY; // requires policy to be on the local cluster as well var resolution = localCluster.resolvePolicies( - Set.of("cluster_a"), - List.of(new EnrichPolicyResolver.UnresolvedPolicy("addrezz", mode)) + Map.of("cluster_a", skipUnavailable), + List.of(new EnrichPolicyResolver.UnresolvedPolicy(missingPolicyName, mode)) + ); + assertThat(resolution.unusableRemotes().size(), equalTo(1)); + assertNull(resolution.getResolvedPolicy(missingPolicyName, mode)); + assertThat( + resolution.getError(missingPolicyName, mode), + equalTo("cannot find enrich policy [addrezz] on clusters [_local, cluster_a]") + ); + } + + // missing policy on skip_unavailable=false is an error + { + boolean skipUnavailable = false; + var mode = Enrich.Mode.REMOTE; // only requires policy on remote cluster + var resolution = localCluster.resolvePolicies( + Map.of("cluster_a", skipUnavailable), + List.of(new EnrichPolicyResolver.UnresolvedPolicy(missingPolicyName, mode)) ); - assertNull(resolution.getResolvedPolicy("addrezz", mode)); + assertThat(resolution.unusableRemotes().size(), equalTo(0)); + assertNull(resolution.getResolvedPolicy(missingPolicyName, mode)); assertThat( - resolution.getError("addrezz", mode), + resolution.getError(missingPolicyName, mode), + equalTo("cannot find enrich policy [addrezz] on clusters [cluster_a]") + ); + } + { + boolean skipUnavailable = false; + var mode = Enrich.Mode.ANY; // requires policy to be on the local cluster as well + var resolution = localCluster.resolvePolicies( + Map.of("cluster_a", skipUnavailable), + List.of(new EnrichPolicyResolver.UnresolvedPolicy(missingPolicyName, mode)) + ); + assertThat(resolution.unusableRemotes().size(), equalTo(0)); + assertNull(resolution.getResolvedPolicy(missingPolicyName, mode)); + assertThat( + resolution.getError(missingPolicyName, mode), equalTo("cannot find enrich policy [addrezz] on clusters [_local, cluster_a]") ); } @@ -426,21 +545,36 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver { this.mappings = mappings; } - EnrichResolution resolvePolicies(Collection clusters, Collection unresolvedPolicies) { - PlainActionFuture future = new PlainActionFuture<>(); + /** + * @param clusters target clusters map; key=cluster alias; value=skip_unavailable setting (should be false for local) + * @param unresolvedPolicies + * @return + */ + EnrichResolution resolvePoliciesWithRandomization(Map clusters, Collection unresolvedPolicies) { + return resolvePolicies(clusters, randomAdditionsToUnresolvedPolicyList(unresolvedPolicies)); + } + + Collection randomAdditionsToUnresolvedPolicyList(Collection unresolvedPolicies) { + Collection revisedList = unresolvedPolicies; if (randomBoolean()) { - unresolvedPolicies = new ArrayList<>(unresolvedPolicies); + revisedList = new ArrayList<>(revisedList); for (Enrich.Mode mode : Enrich.Mode.values()) { for (String policy : List.of("hosts", "address", "author")) { if (randomBoolean()) { - unresolvedPolicies.add(new UnresolvedPolicy(policy, mode)); + revisedList.add(new UnresolvedPolicy(policy, mode)); } } } if (randomBoolean()) { - unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values()))); + // legacy-policy-1 does not exist on any cluster in this test + revisedList.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values()))); } } + return revisedList; + } + + EnrichResolution resolvePolicies(Map clusters, Collection unresolvedPolicies) { + PlainActionFuture future = new PlainActionFuture<>(); super.resolvePolicies(clusters, unresolvedPolicies, future); return future.actionGet(30, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 60b632c443f8e..da85d1810e860 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.Strings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.ConnectTransportException; @@ -251,7 +252,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "x*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); EsIndex esIndex = new EsIndex( @@ -277,13 +278,15 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getIndexExpression(), equalTo("x*")); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remote1Cluster.getTook().millis(), equalTo(0L)); assertThat(remote1Cluster.getTotalShards(), equalTo(0)); assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailures().size(), equalTo(1)); + assertThat(remote1Cluster.getFailures().get(0).getCause().getMessage(), containsString("Unknown index [x*]")); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); @@ -350,6 +353,63 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); assertThat(ve.getDetailedMessage(), containsString("Unknown index [remote2:mylogs1,mylogs2,logs*]")); } + + // remotes already marked as skipped should not be modified + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + // remote1 has already been marked as SKIPPED with a previous error message, so should not be overwritten + // by updateExecutionInfoWithClustersWithNoMatchingIndices + executionInfo.swapCluster( + remote1Alias, + (k, v) -> new EsqlExecutionInfo.Cluster( + remote1Alias, + "*", + true, + EsqlExecutionInfo.Cluster.Status.SKIPPED, + 0, + 0, + 0, + 0, + List.of(new ShardSearchFailure(new IllegalStateException("previous error message"))), + TimeValue.timeValueMillis(22) + ) + ); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + EsIndex esIndex = new EsIndex( + "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", + randomMapping(), + Map.of( + "logs-a", + IndexMode.STANDARD, + "remote2:mylogs1", + IndexMode.STANDARD, + "remote2:mylogs2", + IndexMode.STANDARD, + "remote2:logs-b", + IndexMode.STANDARD + ) + ); + // remote1 is missing from the concrete indices so would normally be marked as skipped and set with failure of "Unknown index" + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of()); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTook().millis(), equalTo(22L)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailures().size(), equalTo(1)); + assertThat(remote1Cluster.getFailures().get(0).getCause().getMessage(), containsString("previous error message")); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + } } public void testDetermineUnavailableRemoteClusters() { diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java new file mode 100644 index 0000000000000..47c7ac8241fcd --- /dev/null +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java @@ -0,0 +1,353 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.remotecluster; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CrossClusterEsqlRCS1EnrichUnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean(); + + static { + fulfillingCluster = ElasticsearchCluster.local() + .name("fulfilling-cluster") + .nodes(1) + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("remote_cluster.port", "0") + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get())) + .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key") + .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password") + .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true")) + .build(); + + queryCluster = ElasticsearchCluster.local() + .name("query-cluster") + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get())) + .build(); + } + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); + + private final String[] modes = { "_coordinator", "_remote" }; + + @Before + public void setupPreRequisites() throws IOException { + setupRolesAndPrivileges(); + setSourceData(); + + var policy = createPolicy("employees-policy", "employees", "email", new String[] { "id", "designation" }); + // Create the enrich policy on both clusters. + assertOK(client().performRequest(policy)); + assertOK(performRequestAgainstFulfillingCluster(policy)); + + // Execute the enrich policy on both clusters. + var exec = executePolicy("employees-policy"); + assertOK(client().performRequest(exec)); + assertOK(performRequestAgainstFulfillingCluster(exec)); + } + + public void testEsqlEnrichWithSkipUnavailable() throws Exception { + esqlEnrichWithRandomSkipUnavailable(); + esqlEnrichWithSkipUnavailableTrue(); + esqlEnrichWithSkipUnavailableFalse(); + } + + private void esqlEnrichWithRandomSkipUnavailable() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), randomBoolean()); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + Response response = client().performRequest(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(6)); + for (int i = 0; i < 6; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(2)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(0)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("successful")); + } + + @SuppressWarnings("unchecked") + private void esqlEnrichWithSkipUnavailableTrue() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), true); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enriched,my_remote_cluster:to-be-enriched | ENRICH employees-policy | LIMIT 10"; + Response response = client().performRequest(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(3)); + + // We only have 3 values since the remote cluster is turned off. + for (int i = 0; i < 3; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(1)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(1)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("skipped")); + + ArrayList remoteClusterFailures = (ArrayList) remoteClusterDetails.get("failures"); + assertThat(remoteClusterFailures.size(), equalTo(1)); + Map failuresMap = (Map) remoteClusterFailures.get(0); + + Map reason = (Map) failuresMap.get("reason"); + assertThat(reason.get("type").toString(), equalTo("connect_transport_exception")); + assertThat(reason.get("reason").toString(), containsString("Unable to connect to [my_remote_cluster]")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void esqlEnrichWithSkipUnavailableFalse() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), false); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(query))); + assertThat(ex.getMessage(), containsString("connect_transport_exception")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void setupRolesAndPrivileges() throws IOException { + var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER); + putUserRequest.setJsonEntity(""" + { + "password": "x-pack-test-password", + "roles" : ["remote_search"] + }"""); + assertOK(adminClient().performRequest(putUserRequest)); + + var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE); + putRoleOnRemoteClusterRequest.setJsonEntity(""" + { + "indices": [ + { + "names": ["*"], + "privileges": ["read"] + } + ], + "cluster": [ "monitor_enrich", "manage_own_api_key" ], + "remote_indices": [ + { + "names": ["*"], + "privileges": ["read"], + "clusters": ["my_remote_cluster"] + } + ], + "remote_cluster": [ + { + "privileges": ["monitor_enrich"], + "clusters": ["my_remote_cluster"] + } + ] + }"""); + assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest)); + } + + private void setSourceData() throws IOException { + Request createIndex = new Request("PUT", "employees"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "id": { "type": "integer" }, + "email": { "type": "text" }, + "designation": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "employees" } } + { "id": 1, "email": "a@corp.co", "designation": "SDE intern"} + { "index": { "_index": "employees" } } + { "id": 2, "email": "b@corp.co", "designation": "SDE 1"} + { "index": { "_index": "employees" } } + { "id": 3, "email": "c@corp.co", "designation": "SDE 2"} + { "index": { "_index": "employees" } } + { "id": 4, "email": "d@corp.co", "designation": "SSE"} + { "index": { "_index": "employees" } } + { "id": 5, "email": "e@corp.co", "designation": "PSE 1"} + { "index": { "_index": "employees" } } + { "id": 6, "email": "f@corp.co", "designation": "PSE 2"} + """); + assertOK(client().performRequest(bulkRequest)); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + + createIndex = new Request("PUT", "to-be-enriched"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "email": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "a@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "b@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "c@corp.co"} + """); + assertOK(client().performRequest(bulkRequest)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "d@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "e@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "f@corp.co"} + """); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + } + + private Request createPolicy(String policyName, String matchIndex, String matchField, String[] enrichFields) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + body.startObject(); + body.startObject("match"); + body.field("indices", matchIndex); + body.field("match_field", matchField); + body.field("enrich_fields", enrichFields); + + body.endObject(); + body.endObject(); + + return makeRequest("PUT", "_enrich/policy/" + policyName, body); + } + + private Request executePolicy(String policyName) { + return new Request("PUT", "_enrich/policy/employees-policy/_execute"); + } + + private Request esqlRequest(String query) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + + body.startObject(); + body.field("query", query); + body.field("include_ccs_metadata", true); + body.endObject(); + + return makeRequest("POST", "_query", body); + } + + private Request makeRequest(String method, String endpoint, XContentBuilder requestBody) { + Request request = new Request(method, endpoint); + request.setJsonEntity(Strings.toString(requestBody)); + return request; + } +} diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java new file mode 100644 index 0000000000000..da59e3c772736 --- /dev/null +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java @@ -0,0 +1,380 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.remotecluster; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.util.resource.Resource; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CrossClusterEsqlRCS2EnrichUnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); + + static { + fulfillingCluster = ElasticsearchCluster.local() + .name("fulfilling-cluster") + .nodes(1) + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("remote_cluster.port", "0") + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_server.ssl.enabled", "true") + .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key") + .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password") + .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true")) + .build(); + + queryCluster = ElasticsearchCluster.local() + .name("query-cluster") + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_client.ssl.enabled", "true") + .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("cluster.remote.my_remote_cluster.credentials", () -> { + if (API_KEY_MAP_REF.get() == null) { + final Map apiKeyMap = createCrossClusterAccessApiKey(""" + { + "search": [ + { + "names": ["*"] + } + ] + }"""); + API_KEY_MAP_REF.set(apiKeyMap); + } + return (String) API_KEY_MAP_REF.get().get("encoded"); + }) + .rolesFile(Resource.fromClasspath("roles.yml")) + .user(REMOTE_METRIC_USER, PASS.toString(), "read_remote_shared_metrics", false) + .build(); + } + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); + + private String[] modes = { "_coordinator", "_remote" }; + + @Before + public void setupPreRequisites() throws IOException { + setupRolesAndPrivileges(); + setSourceData(); + + var policy = createPolicy("employees-policy", "employees", "email", new String[] { "id", "designation" }); + // Create the enrich policy on both clusters. + assertOK(client().performRequest(policy)); + assertOK(performRequestAgainstFulfillingCluster(policy)); + + // Execute the enrich policy on both clusters. + var exec = executePolicy("employees-policy"); + assertOK(client().performRequest(exec)); + assertOK(performRequestAgainstFulfillingCluster(exec)); + } + + public void testEsqlEnrichWithSkipUnavailable() throws Exception { + esqlEnrichWithRandomSkipUnavailable(); + esqlEnrichWithSkipUnavailableTrue(); + esqlEnrichWithSkipUnavailableFalse(); + } + + private void esqlEnrichWithRandomSkipUnavailable() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), randomBoolean()); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + Response response = performRequestWithRemoteSearchUser(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(6)); + for (int i = 0; i < 6; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(2)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(0)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("successful")); + } + + @SuppressWarnings("unchecked") + private void esqlEnrichWithSkipUnavailableTrue() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), true); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enriched,my_remote_cluster:to-be-enriched | ENRICH employees-policy | LIMIT 10"; + Response response = performRequestWithRemoteSearchUser(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(3)); + + // We only have 3 values since the remote cluster is turned off. + for (int i = 0; i < 3; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(1)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(1)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("skipped")); + + ArrayList remoteClusterFailures = (ArrayList) remoteClusterDetails.get("failures"); + assertThat(remoteClusterFailures.size(), equalTo(1)); + Map failuresMap = (Map) remoteClusterFailures.get(0); + + Map reason = (Map) failuresMap.get("reason"); + assertThat(reason.get("type").toString(), equalTo("connect_transport_exception")); + assertThat(reason.get("reason").toString(), containsString("Unable to connect to [my_remote_cluster]")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void esqlEnrichWithSkipUnavailableFalse() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), false); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + ResponseException ex = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(esqlRequest(query))); + assertThat(ex.getMessage(), containsString("connect_transport_exception")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void setupRolesAndPrivileges() throws IOException { + var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER); + putUserRequest.setJsonEntity(""" + { + "password": "x-pack-test-password", + "roles" : ["remote_search"] + }"""); + assertOK(adminClient().performRequest(putUserRequest)); + + var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE); + putRoleOnRemoteClusterRequest.setJsonEntity(""" + { + "indices": [ + { + "names": ["*"], + "privileges": ["read"] + } + ], + "cluster": [ "monitor_enrich", "manage_own_api_key" ], + "remote_indices": [ + { + "names": ["*"], + "privileges": ["read"], + "clusters": ["my_remote_cluster"] + } + ], + "remote_cluster": [ + { + "privileges": ["monitor_enrich"], + "clusters": ["my_remote_cluster"] + } + ] + }"""); + assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest)); + } + + private void setSourceData() throws IOException { + Request createIndex = new Request("PUT", "employees"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "id": { "type": "integer" }, + "email": { "type": "text" }, + "designation": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "employees" } } + { "id": 1, "email": "a@corp.co", "designation": "SDE intern"} + { "index": { "_index": "employees" } } + { "id": 2, "email": "b@corp.co", "designation": "SDE 1"} + { "index": { "_index": "employees" } } + { "id": 3, "email": "c@corp.co", "designation": "SDE 2"} + { "index": { "_index": "employees" } } + { "id": 4, "email": "d@corp.co", "designation": "SSE"} + { "index": { "_index": "employees" } } + { "id": 5, "email": "e@corp.co", "designation": "PSE 1"} + { "index": { "_index": "employees" } } + { "id": 6, "email": "f@corp.co", "designation": "PSE 2"} + """); + assertOK(client().performRequest(bulkRequest)); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + + createIndex = new Request("PUT", "to-be-enriched"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "email": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "a@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "b@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "c@corp.co"} + """); + assertOK(client().performRequest(bulkRequest)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "d@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "e@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "f@corp.co"} + """); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + } + + private Request createPolicy(String policyName, String matchIndex, String matchField, String[] enrichFields) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + body.startObject(); + body.startObject("match"); + body.field("indices", matchIndex); + body.field("match_field", matchField); + body.field("enrich_fields", enrichFields); + + body.endObject(); + body.endObject(); + + return makeRequest("PUT", "_enrich/policy/" + policyName, body); + } + + private Request executePolicy(String policyName) { + return new Request("PUT", "_enrich/policy/employees-policy/_execute"); + } + + private Request esqlRequest(String query) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + + body.startObject(); + body.field("query", query); + body.field("include_ccs_metadata", true); + body.endObject(); + + return makeRequest("POST", "_query", body); + } + + private Request makeRequest(String method, String endpoint, XContentBuilder requestBody) { + Request request = new Request(method, endpoint); + request.setJsonEntity(Strings.toString(requestBody)); + return request; + } + + private Response performRequestWithRemoteSearchUser(final Request request) throws IOException { + request.setOptions( + RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(REMOTE_SEARCH_USER, PASS)) + ); + return client().performRequest(request); + } +} diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java index 09449f81121fd..21868e5b4d294 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java @@ -158,6 +158,10 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe })).around(fulfillingCluster).around(queryCluster); public void populateData() throws Exception { + populateData(true); + } + + public void populateData(boolean includeEnrichPolicyOnRemoteCluster) throws Exception { CheckedConsumer setupEnrich = client -> { Request createIndex = new Request("PUT", "countries"); createIndex.setJsonEntity(""" @@ -204,8 +208,10 @@ public void populateData() throws Exception { } } """); - assertOK(performRequestWithAdminUser(client, createEnrich)); - assertOK(performRequestWithAdminUser(client, new Request("PUT", "_enrich/policy/countries/_execute"))); + if (includeEnrichPolicyOnRemoteCluster) { + assertOK(performRequestWithAdminUser(client, createEnrich)); + assertOK(performRequestWithAdminUser(client, new Request("PUT", "_enrich/policy/countries/_execute"))); + } performRequestWithAdminUser(client, new Request("DELETE", "/countries")); }; // Fulfilling cluster @@ -331,7 +337,12 @@ public void wipeData() throws Exception { performRequestWithAdminUser(client, new Request("DELETE", "/employees")); performRequestWithAdminUser(client, new Request("DELETE", "/employees2")); performRequestWithAdminUser(client, new Request("DELETE", "/employees3")); - performRequestWithAdminUser(client, new Request("DELETE", "/_enrich/policy/countries")); + final Response response = performRequestWithAdminUser(client, new Request("GET", "/_enrich/policy/countries")); + final Map getResponseMap = responseAsMap(response); + List policies = (List) getResponseMap.get("policies"); + if (policies.isEmpty() == false) { + performRequestWithAdminUser(client, new Request("DELETE", "/_enrich/policy/countries")); + } }; wipe.accept(fulfillingClusterClient); wipe.accept(client()); @@ -811,6 +822,78 @@ public void testCrossClusterEnrichWithOnlyRemotePrivs() throws Exception { assertThat(flatList, containsInAnyOrder(1, 3, "usa", "germany")); } + public void testCrossClusterEnrichWithMissingEnrichPolicySkipUnavailableFalse() throws Exception { + configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, false, randomBoolean(), false); + populateData(false); + { + Request request = esqlRequest(""" + FROM my_remote_cluster:employees + | ENRICH countries + | STATS size=count(*) by country + | SORT size DESC + | LIMIT 2"""); + + ResponseException e = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(request)); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("cannot find enrich policy [countries]")); + } + } + + @SuppressWarnings("unchecked") + public void testCrossClusterEnrichWithMissingEnrichPolicySkipUnavailableTrue() throws Exception { + configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, false, randomBoolean(), true); + populateData(false); + { + Request request = esqlRequest(""" + FROM my_remote_cluster:employees + | ENRICH countries + | STATS size=count(*) by country + | SORT size DESC + | LIMIT 2"""); + + Response response = performRequestWithRemoteSearchUser(request); + assertOK(response); + + /** + * Expected response + * took=109, + * columns=[{name=, type=null}], + * values=[], + * _clusters={running=0, total=1, failed=0, partial=0, successful=0, skipped=1, + * details={my_remote_cluster= + * {_shards={total=0, failed=0, successful=0, skipped=0}, + * took=109, indices=employees, + * failures=[{reason={reason=failed to resolve enrich policy [countries], + * type=illegal_state_exception}, index=null, shard=-1}], status=skipped}}, + * }} + */ + + Map responseAsMap = entityAsMap(response); + List columns = (List) responseAsMap.get("columns"); + List values = (List) responseAsMap.get("values"); + assertThat(columns.size(), equalTo(1)); + Map column1 = (Map) columns.get(0); + assertThat(column1.get("name").toString(), equalTo("")); + assertThat(values.size(), equalTo(0)); + Map clusters = (Map) responseAsMap.get("_clusters"); + + assertThat((int) clusters.get("total"), is(1)); + assertThat((int) clusters.get("successful"), is(0)); + assertThat((int) clusters.get("skipped"), is(1)); + assertThat((int) clusters.get("failed"), is(0)); + + Map details = (Map) clusters.get("details"); + Map invalidRemoteEntry = (Map) details.get("my_remote_cluster"); + assertThat(invalidRemoteEntry.get("status").toString(), equalTo("skipped")); + List failures = (List) invalidRemoteEntry.get("failures"); + assertThat(failures.size(), equalTo(1)); + Map failuresMap = (Map) failures.get(0); + Map reason = (Map) failuresMap.get("reason"); + assertThat(reason.get("type").toString(), equalTo("illegal_state_exception")); + assertThat(reason.get("reason").toString(), containsString("failed to resolve enrich policy [countries]")); + } + } + private void createAliases() throws Exception { Request createAlias = new Request("POST", "_aliases"); createAlias.setJsonEntity("""