From c53f65cdf900b047dc62cb9eee2f3e5c38f60c35 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Mon, 18 Nov 2024 09:24:34 -0500 Subject: [PATCH 01/12] Init commit EnrichResolution changed --- .../xpack/esql/analysis/EnrichResolution.java | 15 +++-- .../esql/enrich/EnrichPolicyResolver.java | 2 +- .../xpack/esql/session/EsqlSession.java | 2 +- .../esql/session/EsqlSessionCCSUtils.java | 4 ++ .../session/EsqlSessionCCSUtilsTests.java | 64 ++++++++++++++++++- 5 files changed, 78 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java index 4f6886edc5fbc..9f409fe57934d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java @@ -23,7 +23,8 @@ public final class EnrichResolution { private final Map resolvedPolicies = ConcurrentCollections.newConcurrentMap(); private final Map errors = ConcurrentCollections.newConcurrentMap(); - private final Map unavailableClusters = ConcurrentCollections.newConcurrentMap(); + // skip_unavailable=true remote clusters that are unavailable or had errors when resolving the enrich policy + private final Map unusableRemotes = ConcurrentCollections.newConcurrentMap(); public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) { return resolvedPolicies.get(new Key(policyName, mode)); @@ -52,12 +53,16 @@ public void addError(String policyName, Enrich.Mode mode, String reason) { errors.putIfAbsent(new Key(policyName, mode), reason); } - public void addUnavailableCluster(String clusterAlias, Exception e) { - unavailableClusters.put(clusterAlias, e); + public void addUnusableRemote(String clusterAlias, Exception e) { + unusableRemotes.put(clusterAlias, e); } - public Map getUnavailableClusters() { - return unavailableClusters; + /** + * @return Remote clusters that are either unavailable (disconnected) or had a failure when resolving the enrich policy. + * Map key is cluster alias. Map value is the Exception showing the particular error encountered. 
+ */ + public Map unusableRemotes() { + return unusableRemotes; } private record Key(String policyName, Enrich.Mode mode) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index c8a7a6bcc4e98..0016828d12109 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -121,7 +121,7 @@ public void resolvePolicies( for (Map.Entry entry : lookupResponses.entrySet()) { String clusterAlias = entry.getKey(); if (entry.getValue().connectionError != null) { - enrichResolution.addUnavailableCluster(clusterAlias, entry.getValue().connectionError); + enrichResolution.addUnusableRemote(clusterAlias, entry.getValue().connectionError); // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy remoteClusters.remove(clusterAlias); } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index c576d15f92608..11420d5cfae63 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -303,7 +303,7 @@ private void preAnalyze( .stream() .map(ResolvedEnrichPolicy::matchField) .collect(Collectors.toSet()); - Map unavailableClusters = enrichResolution.getUnavailableClusters(); + Map unavailableClusters = enrichResolution.unusableRemotes(); preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> { // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index // resolution to updateExecutionInfo diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 4fe2fef7e3f45..e1c758fd6dbac 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -204,6 +204,10 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn * Mark it as SKIPPED with 0 shards searched and took=0. 
*/ for (String c : clustersWithNoMatchingIndices) { + if (executionInfo.getCluster(c).getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + // if cluster was already marked SKIPPED during enrich policy resolution, do not overwrite + continue; + } final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); if (missingIndicesIsFatal(c, executionInfo)) { String error = Strings.format( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 60b632c443f8e..da85d1810e860 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.Strings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.ConnectTransportException; @@ -251,7 +252,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "x*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); EsIndex esIndex = new EsIndex( @@ -277,13 +278,15 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getIndexExpression(), equalTo("x*")); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remote1Cluster.getTook().millis(), equalTo(0L)); assertThat(remote1Cluster.getTotalShards(), equalTo(0)); assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailures().size(), equalTo(1)); + assertThat(remote1Cluster.getFailures().get(0).getCause().getMessage(), containsString("Unknown index [x*]")); EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); @@ -350,6 +353,63 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); assertThat(ve.getDetailedMessage(), containsString("Unknown index [remote2:mylogs1,mylogs2,logs*]")); } + + // remotes already marked as skipped should not be modified + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + // remote1 has already been marked as SKIPPED with 
a previous error message, so should not be overwritten + // by updateExecutionInfoWithClustersWithNoMatchingIndices + executionInfo.swapCluster( + remote1Alias, + (k, v) -> new EsqlExecutionInfo.Cluster( + remote1Alias, + "*", + true, + EsqlExecutionInfo.Cluster.Status.SKIPPED, + 0, + 0, + 0, + 0, + List.of(new ShardSearchFailure(new IllegalStateException("previous error message"))), + TimeValue.timeValueMillis(22) + ) + ); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + EsIndex esIndex = new EsIndex( + "logs*,remote2:mylogs1,remote2:mylogs2,remote2:logs*", + randomMapping(), + Map.of( + "logs-a", + IndexMode.STANDARD, + "remote2:mylogs1", + IndexMode.STANDARD, + "remote2:mylogs2", + IndexMode.STANDARD, + "remote2:logs-b", + IndexMode.STANDARD + ) + ); + // remote1 is missing from the concrete indices so would normally be marked as skipped and set with failure of "Unknown index" + IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of()); + EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1Cluster.getTook().millis(), equalTo(22L)); + assertThat(remote1Cluster.getTotalShards(), equalTo(0)); + assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); + assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailedShards(), equalTo(0)); + assertThat(remote1Cluster.getFailures().size(), equalTo(1)); + assertThat(remote1Cluster.getFailures().get(0).getCause().getMessage(), containsString("previous error message")); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + } } public void testDetermineUnavailableRemoteClusters() { From ad7c1c44793300e0472bead39119904a7f879323 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Mon, 18 Nov 2024 12:13:59 -0500 Subject: [PATCH 02/12] Sympatico with the original branch - new changes start after this --- .../esql/action/CrossClustersEnrichIT.java | 316 ++++++++++++++++++ .../esql/enrich/EnrichPolicyResolver.java | 111 ++++-- .../xpack/esql/session/EsqlSession.java | 30 +- .../enrich/EnrichPolicyResolverTests.java | 4 +- 4 files changed, 417 insertions(+), 44 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java index e8e9f45694e9c..475076fd0af63 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java @@ -24,6 +24,7 @@ import 
org.elasticsearch.protocol.xpack.XPackInfoResponse; import org.elasticsearch.reindex.ReindexPlugin; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; @@ -606,6 +607,321 @@ public void testEnrichCoordinatorThenEnrichRemote() { ); } + public void testWithHostsPolicyWithMissingEnrichPolicyOnSkipUnavailableTrueCluster() { + // delete enrich policies from cluster c1 + for (String policy : List.of("hosts", "vendors")) { + assertAcked( + client("c1").execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)) + ); + } + for (var mode : Enrich.Mode.values()) { + String query = "FROM events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, null)) { + List> rows = getValuesList(resp); + assertThat( + rows, + equalTo( + List.of( + List.of(2L, "Android"), + List.of(1L, "Linux"), + List.of(1L, "MacOS"), + List.of(4L, "Windows"), + Arrays.asList(1L, (String) null) + ) + ) + ); + assertFalse(resp.getExecutionInfo().isCrossClusterSearch()); + } + } + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + { + // since these two modes require the enrich policy on the remote cluster - c1 will be skipped + Enrich.Mode mode = randomFrom(Enrich.Mode.REMOTE, Enrich.Mode.ANY); + String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("c1", "c2"))); + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertTrue(executionInfo.isCrossClusterSearch()); + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + // this test assumes (but does not set) skip_unavailable=true + assertThat(remote1.isSkipUnavailable(), equalTo(true)); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getIndexExpression(), equalTo("events")); + assertThat(remote1.getTotalShards(), equalTo(0)); + assertThat(remote1.getSuccessfulShards(), equalTo(0)); + assertThat(remote1.getSkippedShards(), equalTo(0)); + assertThat(remote1.getFailedShards(), equalTo(0)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat(remote1.getFailures().get(0).getCause().getMessage(), containsString("failed to resolve enrich policy [hosts]")); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remote2.getIndexExpression(), equalTo("events")); + assertThat(remote2.getTotalShards(), equalTo(1)); + assertThat(remote2.getSuccessfulShards(), equalTo(1)); + assertThat(remote2.getSkippedShards(), equalTo(0)); + 
assertThat(remote2.getFailedShards(), equalTo(0)); + assertThat(remote2.getFailures().size(), equalTo(0)); + } + } + { + Enrich.Mode mode = randomFrom(Enrich.Mode.REMOTE, Enrich.Mode.ANY); + // c1 queried by itself - should be skipped with "" column result + String query = "FROM c1:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), equalTo(0)); + assertThat(resp.columns().size(), equalTo(1)); + assertThat(resp.columns().get(0).name(), equalTo("")); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("c1"))); + assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertTrue(executionInfo.isCrossClusterSearch()); + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getIndexExpression(), equalTo("events")); + assertThat(remote1.getTotalShards(), equalTo(0)); + assertThat(remote1.getSuccessfulShards(), equalTo(0)); + assertThat(remote1.getSkippedShards(), equalTo(0)); + assertThat(remote1.getFailedShards(), equalTo(0)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat(remote1.getFailures().get(0).getCause().getMessage(), containsString("failed to resolve enrich policy [hosts]")); + } + } + { + // COORDINATOR modes requires the enrich policy only on the coordinator, so c1 should not be skipped + Enrich.Mode mode = Enrich.Mode.COORDINATOR; + String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + assertThat( + rows, + equalTo( + List.of( + List.of(1L, "Android"), + List.of(2L, "Linux"), + List.of(4L, "MacOS"), + List.of(3L, "Windows"), + List.of(1L, "iOS"), + Arrays.asList(2L, (String) null) + ) + ) + ); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of("c1", "c2"))); + assertCCSExecutionInfoDetails(executionInfo); + } + } + for (var mode : Enrich.Mode.values()) { + String query = "FROM *:events,events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, "c1", "c2"))); + if (mode == Enrich.Mode.COORDINATOR) { + // COORDINATOR mode only requires the enrich policy on the local cluster + assertCCSExecutionInfoDetails(executionInfo); + } else { + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), 
equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remote2.getFailures().size(), equalTo(0)); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getFailures().size(), equalTo(0)); + } + } + } + // delete enrich policies from cluster c2 also + for (String policy : List.of("hosts", "vendors")) { + assertAcked( + client("c2").execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)) + ); + } + for (var mode : Enrich.Mode.values()) { + String query = "FROM *:events,events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + List> rows = getValuesList(resp); + assertThat(rows.size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, "c1", "c2"))); + if (mode == Enrich.Mode.COORDINATOR) { + // COORDINATOR mode only requires the enrich policy on the local cluster + assertCCSExecutionInfoDetails(executionInfo); + } else { + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote2.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getFailures().size(), equalTo(0)); + } + } + } + } + + public void testRemoteHasOnePolicyButNotTheOther() { + // delete the hosts enrich policy from c1, but not the vendors policy + assertAcked( + client("c1").execute(DeleteEnrichPolicyAction.INSTANCE, new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts")) + ); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + { + for (Enrich.Mode 
hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { + var query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE)); + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, "c1", "c2"))); + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + // this test assumes (but does not set) skip_unavailable=true + assertThat(remote1.isSkipUnavailable(), equalTo(true)); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getIndexExpression(), equalTo("events")); + assertThat(remote1.getTotalShards(), equalTo(0)); + assertThat(remote1.getSuccessfulShards(), equalTo(0)); + assertThat(remote1.getSkippedShards(), equalTo(0)); + assertThat(remote1.getFailedShards(), equalTo(0)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(remote2.getIndexExpression(), equalTo("events")); + assertThat(remote2.getTotalShards(), equalTo(1)); + assertThat(remote2.getSuccessfulShards(), equalTo(1)); + assertThat(remote2.getSkippedShards(), equalTo(0)); + assertThat(remote2.getFailedShards(), equalTo(0)); + assertThat(remote2.getFailures().size(), equalTo(0)); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getIndexExpression(), equalTo("events")); + assertThat(localCluster.getTotalShards(), equalTo(1)); + assertThat(localCluster.getSuccessfulShards(), equalTo(1)); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + assertThat(localCluster.getFailures().size(), equalTo(0)); + } + } + // delete the vendors policy but not hosts from c2 + assertAcked( + client("c2").execute( + DeleteEnrichPolicyAction.INSTANCE, + new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors") + ) + ); + { + for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { + var query = String.format(Locale.ROOT, """ + FROM *:events,events + | eval ip= TO_STR(host) + | %s + | %s + | stats c = COUNT(*) by vendor + | sort vendor + """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE)); + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.clusterAliases(), 
equalTo(Set.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, "c1", "c2"))); + EsqlExecutionInfo.Cluster remote1 = executionInfo.getCluster("c1"); + // this test assumes (but does not set) skip_unavailable=true + assertThat(remote1.isSkipUnavailable(), equalTo(true)); + assertThat(remote1.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote1.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote1.getIndexExpression(), equalTo("events")); + assertThat(remote1.getTotalShards(), equalTo(0)); + assertThat(remote1.getSuccessfulShards(), equalTo(0)); + assertThat(remote1.getSkippedShards(), equalTo(0)); + assertThat(remote1.getFailedShards(), equalTo(0)); + assertThat(remote1.getFailures().size(), equalTo(1)); + assertThat( + remote1.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [hosts]") + ); + EsqlExecutionInfo.Cluster remote2 = executionInfo.getCluster("c2"); + // this test assumes (but does not set) skip_unavailable=true + assertThat(remote2.isSkipUnavailable(), equalTo(true)); + assertThat(remote2.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(remote2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(remote2.getIndexExpression(), equalTo("events")); + assertThat(remote2.getTotalShards(), equalTo(0)); + assertThat(remote2.getSuccessfulShards(), equalTo(0)); + assertThat(remote2.getSkippedShards(), equalTo(0)); + assertThat(remote2.getFailedShards(), equalTo(0)); + assertThat(remote2.getFailures().size(), equalTo(1)); + assertThat( + remote2.getFailures().get(0).getCause().getMessage(), + containsString("failed to resolve enrich policy [vendors]") + ); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getIndexExpression(), equalTo("events")); + assertThat(localCluster.getTotalShards(), equalTo(1)); + assertThat(localCluster.getSuccessfulShards(), equalTo(1)); + assertThat(localCluster.getSkippedShards(), equalTo(0)); + assertThat(localCluster.getFailedShards(), equalTo(0)); + assertThat(localCluster.getFailures().size(), equalTo(0)); + } + } + } + } + } + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); request.query(query); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 0016828d12109..f48a957ca5c17 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -98,12 +99,12 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) { /** * Resolves a set of enrich policies * - * @param targetClusters the target clusters + * @param 
targetClusters the target clusters; v1: cluster alias; v2: skip_unavailable setting (null for local cluster) * @param unresolvedPolicies the unresolved policies * @param listener notified with the enrich resolution */ public void resolvePolicies( - Collection targetClusters, + List> targetClusters, Collection unresolvedPolicies, ActionListener listener ) { @@ -111,9 +112,9 @@ public void resolvePolicies( listener.onResponse(new EnrichResolution()); return; } - final Set remoteClusters = new HashSet<>(targetClusters); - final boolean includeLocal = remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { + final Set> remotes = new HashSet<>(targetClusters); + final boolean includeLocal = remotes.removeIf(t -> t.v1().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)); + lookupPolicies(remotes.stream().map(t -> t.v1()).toList(), includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); Map lookupResponsesToProcess = new HashMap<>(); @@ -123,36 +124,59 @@ public void resolvePolicies( if (entry.getValue().connectionError != null) { enrichResolution.addUnusableRemote(clusterAlias, entry.getValue().connectionError); // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy - remoteClusters.remove(clusterAlias); + boolean removed = remotes.removeIf(t -> t.v1().equals(clusterAlias)); + assert removed : "Remote " + clusterAlias + " was not removed from the remotes list."; } else { lookupResponsesToProcess.put(clusterAlias, entry.getValue()); } } for (UnresolvedPolicy unresolved : unresolvedPolicies) { - Tuple resolved = mergeLookupResults( + // MP TODO: change mergeLookupResults to return MergedPolicyLookupResult + MergedPolicyLookupResult resolved = mergeLookupResults( unresolved, - calculateTargetClusters(unresolved.mode, includeLocal, remoteClusters), + calculateTargetClusters(unresolved.mode, includeLocal, remotes), lookupResponsesToProcess ); - if (resolved.v1() != null) { - enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.v1()); - } else { - assert resolved.v2() != null; - enrichResolution.addError(unresolved.name, unresolved.mode, resolved.v2()); + for (Map.Entry entry : resolved.remotesToBeSkipped().entrySet()) { + enrichResolution.addUnusableRemote(entry.getKey(), entry.getValue()); + } + // If a remote-only CCS is done and all the remotes are "unusable" (due to missing enrich policies) + // *and* skippable (skip_unavailable=true), don't fill in either the resolved policy or the error on + // the enrichResolution object. The only field with contents will be the "unusable remotes" field. 
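A minimal standalone sketch of how such a merged result is folded into the resolution state, assuming stand-in record and map types rather than the production ResolvedEnrichPolicy, MergedPolicyLookupResult, and EnrichResolution classes:

import java.util.HashMap;
import java.util.Map;

class EnrichMergeSketch {
    record ResolvedPolicy(String matchField) {}   // stand-in for ResolvedEnrichPolicy
    record MergedResult(ResolvedPolicy policy, Map<String, Exception> remotesToBeSkipped, String error) {}

    // Skippable-remote failures are only recorded as "unusable"; a non-null policy or a non-null
    // error is promoted, and if both are null the unusable-remotes map is all that gets populated.
    static void apply(String policyName, MergedResult merged,
                      Map<String, ResolvedPolicy> resolved, Map<String, String> errors,
                      Map<String, Exception> unusableRemotes) {
        unusableRemotes.putAll(merged.remotesToBeSkipped());
        if (merged.policy() != null) {
            resolved.put(policyName, merged.policy());
        } else if (merged.error() != null) {
            errors.put(policyName, merged.error());
        }
    }

    public static void main(String[] args) {
        Map<String, ResolvedPolicy> resolved = new HashMap<>();
        Map<String, String> errors = new HashMap<>();
        Map<String, Exception> unusable = new HashMap<>();
        // remote-only CCS where the single remote is skip_unavailable=true and lacks the policy:
        MergedResult allSkipped = new MergedResult(
            null,
            Map.<String, Exception>of("c1", new IllegalStateException("failed to resolve enrich policy [hosts]")),
            null);
        apply("hosts", allSkipped, resolved, errors, unusable);
        System.out.println(resolved + " " + errors + " " + unusable.keySet()); // {} {} [c1]
    }
}

The point of the sketch: a skippable remote's enrich failure lands only in the unusable-remotes map, so a remote-only query over skippable clusters surfaces neither a resolved policy nor a fatal error.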
+ if (resolved.resolvedPolicy() != null) { + enrichResolution.addResolvedPolicy(unresolved.name, unresolved.mode, resolved.resolvedPolicy()); + } else if (resolved.error() != null) { + enrichResolution.addError(unresolved.name, unresolved.mode, resolved.error()); } } return enrichResolution; })); } - private Collection calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Set remoteClusters) { + record MergedPolicyLookupResult( + @Nullable ResolvedEnrichPolicy resolvedPolicy, + Map remotesToBeSkipped, + @Nullable String error + ) {} + + /** + * @param mode + * @param includeLocal + * @param remoteClusters + * @return Collection of Tuples where v1: String=clusterAlias, v2: Boolean=skipUnavailable setting + */ + private Collection> calculateTargetClusters( + Enrich.Mode mode, + boolean includeLocal, + Set> remoteClusters + ) { return switch (mode) { - case ANY -> CollectionUtils.appendToCopy(remoteClusters, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - case COORDINATOR -> List.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + case ANY -> CollectionUtils.appendToCopy(remoteClusters, new Tuple<>(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, null)); + case COORDINATOR -> List.of(new Tuple<>(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, null)); case REMOTE -> includeLocal - ? CollectionUtils.appendToCopy(remoteClusters, RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) + ? CollectionUtils.appendToCopy(remoteClusters, new Tuple<>(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, null)) : remoteClusters; }; } @@ -161,18 +185,22 @@ private Collection calculateTargetClusters(Enrich.Mode mode, boolean inc * Resolve an enrich policy by merging the lookup responses from the target clusters. * @return a resolved enrich policy or an error */ - private Tuple mergeLookupResults( + private MergedPolicyLookupResult mergeLookupResults( UnresolvedPolicy unresolved, - Collection targetClusters, + Collection> targetClusters, Map lookupResults ) { String policyName = unresolved.name; if (targetClusters.isEmpty()) { - return Tuple.tuple(null, "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"); + String error = "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"; + return new MergedPolicyLookupResult(null, Collections.emptyMap(), error); } final Map policies = new HashMap<>(); - final List failures = new ArrayList<>(); - for (String cluster : targetClusters) { + final List failures = new ArrayList<>(); // (fatal) failures on local or skip_unavailable=false remote + final Map remotesToBeSkipped = new HashMap<>(); // skip_unavailable=true remotes enrich policy failures + for (Tuple clusterInfo : targetClusters) { + String cluster = clusterInfo.v1(); + boolean skipUnavailable = clusterInfo.v2() == null ? 
false : clusterInfo.v2(); LookupResponse lookupResult = lookupResults.get(cluster); if (lookupResult != null) { assert lookupResult.connectionError == null : "Should never have a non-null connectionError here"; @@ -182,35 +210,51 @@ private Tuple mergeLookupResults( } else { final String failure = lookupResult.failures.get(policyName); if (failure != null) { - failures.add(failure); + if (skipUnavailable) { + remotesToBeSkipped.put(cluster, new IllegalStateException(failure)); + } else { + failures.add(failure); + } + } else if (skipUnavailable) { + // code path where there was no enrich policy found at all on the remote + remotesToBeSkipped.put(cluster, new IllegalStateException("failed to resolve enrich policy [" + policyName + "]")); } } } } - if (targetClusters.size() != policies.size()) { + if (targetClusters.size() != policies.size() + remotesToBeSkipped.size()) { final String reason; if (failures.isEmpty()) { - List missingClusters = targetClusters.stream().filter(c -> policies.containsKey(c) == false).sorted().toList(); - reason = missingPolicyError(policyName, targetClusters, missingClusters); + List missingClusters = targetClusters.stream() + .filter(t -> policies.containsKey(t.v1()) == false) + .map(t -> t.v1()) + .sorted() + .toList(); + reason = missingPolicyError(policyName, targetClusters.stream().map(t -> t.v1()).toList(), missingClusters); } else { reason = "failed to resolve enrich policy [" + policyName + "]; reason " + failures; } - return Tuple.tuple(null, reason); + // return Tuple.tuple(null, reason); + return new MergedPolicyLookupResult(null, remotesToBeSkipped, reason); + } else if (targetClusters.size() == remotesToBeSkipped.size()) { + return new MergedPolicyLookupResult(null, remotesToBeSkipped, null); } + Map mappings = new HashMap<>(); Map concreteIndices = new HashMap<>(); ResolvedEnrichPolicy last = null; for (Map.Entry e : policies.entrySet()) { ResolvedEnrichPolicy curr = e.getValue(); + // MP TODO: I'm not sure this handling is right - do we need to NOT error if skip_un=true? if (last != null && last.matchField().equals(curr.matchField()) == false) { String error = "enrich policy [" + policyName + "] has different match fields "; error += "[" + last.matchField() + ", " + curr.matchField() + "] across clusters"; - return Tuple.tuple(null, error); + return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } if (last != null && last.matchType().equals(curr.matchType()) == false) { String error = "enrich policy [" + policyName + "] has different match types "; error += "[" + last.matchType() + ", " + curr.matchType() + "] across clusters"; - return Tuple.tuple(null, error); + return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } // merge mappings for (Map.Entry m : curr.mapping().entrySet()) { @@ -226,7 +270,8 @@ private Tuple mergeLookupResults( if (old != null && old.getDataType().equals(field.getDataType()) == false) { String error = "field [" + m.getKey() + "] of enrich policy [" + policyName + "] has different data types "; error += "[" + old.getDataType() + ", " + field.getDataType() + "] across clusters"; - return Tuple.tuple(null, error); + // TODO: we should return remotesToBeSkipped here (and other error paths), right? 
+ return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } } if (last != null) { @@ -237,7 +282,9 @@ private Tuple mergeLookupResults( var diff = counts.entrySet().stream().filter(f -> f.getValue() < 2).map(Map.Entry::getKey).limit(20).sorted().toList(); if (diff.isEmpty() == false) { String detailed = "these fields are missing in some policies: " + diff; - return Tuple.tuple(null, "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed); + String fullMessage = "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed; + // TODO: we should return remotesToBeSkipped here (and other error paths), right? + return new MergedPolicyLookupResult(null, remotesToBeSkipped, fullMessage); } } // merge concrete indices @@ -246,7 +293,7 @@ private Tuple mergeLookupResults( } assert last != null; var resolved = new ResolvedEnrichPolicy(last.matchField(), last.matchType(), last.enrichFields(), concreteIndices, mappings); - return Tuple.tuple(resolved, null); + return new MergedPolicyLookupResult(resolved, remotesToBeSkipped, null); } private String missingPolicyError(String policyName, Collection targetClusters, List missingClusters) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 11420d5cfae63..e35b89b3bd296 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -19,6 +19,7 @@ import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; @@ -292,11 +293,17 @@ private void preAnalyze( var unresolvedPolicies = preAnalysis.enriches.stream() .map(e -> new EnrichPolicyResolver.UnresolvedPolicy((String) e.policyName().fold(), e.mode())) .collect(Collectors.toSet()); - final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( + final Set clusters = enrichPolicyResolver.groupIndicesPerCluster( preAnalysis.indices.stream() .flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))) .toArray(String[]::new) ).keySet(); + + List> targetClusters = new ArrayList<>(); + for (String cluster : clusters) { + targetClusters.add(new Tuple<>(cluster, executionInfo.isSkipUnavailable(cluster))); + } + enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, listener.delegateFailureAndWrap((l, enrichResolution) -> { // first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API var matchFields = enrichResolution.resolvedEnrichPolicies() @@ -324,16 +331,17 @@ private void preAnalyze( // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again. 
// TODO: add a test for this - if (targetClusters.containsAll(newClusters) == false - // do not bother with a re-resolution if only remotes were requested and all were offline - && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { - enrichPolicyResolver.resolvePolicies( - newClusters, - unresolvedPolicies, - ll.map(newEnrichResolution -> action.apply(indexResolution, newEnrichResolution)) - ); - return; - } + // MP TODO: deal with this later in this PR + // if (clusters.containsAll(newClusters) == false + // // do not bother with a re-resolution if only remotes were requested and all were offline + // && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { + // enrichPolicyResolver.resolvePolicies( + // newClusters, + // unresolvedPolicies, + // ll.map(newEnrichResolution -> action.apply(indexResolution, newEnrichResolution)) + // ); + // return; + // } } ll.onResponse(action.apply(indexResolution, enrichResolution)); }), matchFields); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 39170f1a305df..35f6de43e2473 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; @@ -427,6 +428,7 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver { } EnrichResolution resolvePolicies(Collection clusters, Collection unresolvedPolicies) { + List> clusterInfo = clusters.stream().map(c -> new Tuple<>(c, false)).toList(); PlainActionFuture future = new PlainActionFuture<>(); if (randomBoolean()) { unresolvedPolicies = new ArrayList<>(unresolvedPolicies); @@ -441,7 +443,7 @@ EnrichResolution resolvePolicies(Collection clusters, Collection Date: Mon, 18 Nov 2024 13:47:56 -0500 Subject: [PATCH 03/12] Changed away from Tuple to Map targetClusters in EnrichPolicyResolver. 
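The intended shape, sketched with plain JDK types (the cluster aliases and the empty-string local key are assumptions for illustration): target clusters become a map from cluster alias to its skip_unavailable setting, so the local entry can be split off and remotes looked up or removed by alias instead of scanning tuples.

import java.util.HashMap;
import java.util.Map;

class TargetClustersSketch {
    public static void main(String[] args) {
        // alias -> skip_unavailable; the local cluster is keyed by its (empty) alias and mapped to null
        // at this point in the series (a later commit switches the local entry to Boolean.FALSE)
        Map<String, Boolean> targetClusters = new HashMap<>();
        targetClusters.put("", null);            // local cluster
        targetClusters.put("c1", Boolean.TRUE);  // skip_unavailable=true remote
        targetClusters.put("c2", Boolean.FALSE); // skip_unavailable=false remote

        boolean includeLocal = targetClusters.containsKey("");
        Map<String, Boolean> remotes = new HashMap<>(targetClusters);
        remotes.remove("");
        System.out.println(includeLocal);        // true
        System.out.println(remotes.keySet());    // c1 and c2, in no particular order
    }
}

Keying by alias also turns the later removal of an unavailable remote into a plain Map.remove rather than a removeIf over tuples.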
--- .../esql/enrich/EnrichPolicyResolver.java | 64 ++++++++++--------- .../xpack/esql/session/EsqlSession.java | 15 +++-- .../enrich/EnrichPolicyResolverTests.java | 6 +- 3 files changed, 48 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index f48a957ca5c17..53b0fadd79c01 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -17,13 +17,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.Tuple; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; @@ -54,7 +52,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -99,12 +96,12 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) { /** * Resolves a set of enrich policies * - * @param targetClusters the target clusters; v1: cluster alias; v2: skip_unavailable setting (null for local cluster) + * @param targetClusters the target clusters; key: cluster alias; value: skip_unavailable setting (null for local cluster) * @param unresolvedPolicies the unresolved policies * @param listener notified with the enrich resolution */ public void resolvePolicies( - List> targetClusters, + Map targetClusters, Collection unresolvedPolicies, ActionListener listener ) { @@ -112,9 +109,10 @@ public void resolvePolicies( listener.onResponse(new EnrichResolution()); return; } - final Set> remotes = new HashSet<>(targetClusters); - final boolean includeLocal = remotes.removeIf(t -> t.v1().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)); - lookupPolicies(remotes.stream().map(t -> t.v1()).toList(), includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { + final Map remotes = new HashMap<>(targetClusters); + final boolean includeLocal = targetClusters.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + remotes.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + lookupPolicies(remotes.keySet(), includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); Map lookupResponsesToProcess = new HashMap<>(); @@ -124,8 +122,8 @@ public void resolvePolicies( if (entry.getValue().connectionError != null) { enrichResolution.addUnusableRemote(clusterAlias, entry.getValue().connectionError); // remove unavailable cluster from the list of clusters which is used below to create the ResolvedEnrichPolicy - boolean removed = remotes.removeIf(t -> t.v1().equals(clusterAlias)); - assert removed : "Remote " + clusterAlias + " was not removed from the remotes list."; + Boolean removedVal = remotes.remove(clusterAlias); + assert removedVal != null : 
"Remote " + clusterAlias + " was not removed from the remotes list."; } else { lookupResponsesToProcess.put(clusterAlias, entry.getValue()); } @@ -165,29 +163,29 @@ record MergedPolicyLookupResult( * @param mode * @param includeLocal * @param remoteClusters - * @return Collection of Tuples where v1: String=clusterAlias, v2: Boolean=skipUnavailable setting + * @return Map where key: String=clusterAlias, value: Boolean=skipUnavailable setting */ - private Collection> calculateTargetClusters( - Enrich.Mode mode, - boolean includeLocal, - Set> remoteClusters - ) { + private Map calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Map remoteClusters) { return switch (mode) { - case ANY -> CollectionUtils.appendToCopy(remoteClusters, new Tuple<>(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, null)); - case COORDINATOR -> List.of(new Tuple<>(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, null)); - case REMOTE -> includeLocal - ? CollectionUtils.appendToCopy(remoteClusters, new Tuple<>(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, null)) - : remoteClusters; + case ANY -> copyAndAddLocalCluster(remoteClusters); + case COORDINATOR -> copyAndAddLocalCluster(Collections.emptyMap()); + case REMOTE -> includeLocal ? copyAndAddLocalCluster(remoteClusters) : remoteClusters; }; } + private Map copyAndAddLocalCluster(Map remoteClusters) { + Map newMap = new HashMap<>(remoteClusters); + newMap.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, null); + return newMap; + } + /** * Resolve an enrich policy by merging the lookup responses from the target clusters. * @return a resolved enrich policy or an error */ private MergedPolicyLookupResult mergeLookupResults( UnresolvedPolicy unresolved, - Collection> targetClusters, + Map targetClusters, Map lookupResults ) { String policyName = unresolved.name; @@ -195,12 +193,14 @@ private MergedPolicyLookupResult mergeLookupResults( String error = "enrich policy [" + policyName + "] cannot be resolved since remote clusters are unavailable"; return new MergedPolicyLookupResult(null, Collections.emptyMap(), error); } + // key for this map is cluster alias; a cluster will be in this map only if it has resolved the enrich policy/policies needed final Map policies = new HashMap<>(); final List failures = new ArrayList<>(); // (fatal) failures on local or skip_unavailable=false remote final Map remotesToBeSkipped = new HashMap<>(); // skip_unavailable=true remotes enrich policy failures - for (Tuple clusterInfo : targetClusters) { - String cluster = clusterInfo.v1(); - boolean skipUnavailable = clusterInfo.v2() == null ? false : clusterInfo.v2(); + for (Map.Entry clusterInfo : targetClusters.entrySet()) { + String cluster = clusterInfo.getKey(); + // MP TODO: HMM: do we really need local cluster mapped to null then? Or can we just use "false"? + boolean skipUnavailable = clusterInfo.getValue() == null ? 
false : clusterInfo.getValue(); LookupResponse lookupResult = lookupResults.get(cluster); if (lookupResult != null) { assert lookupResult.connectionError == null : "Should never have a non-null connectionError here"; @@ -225,30 +225,34 @@ private MergedPolicyLookupResult mergeLookupResults( if (targetClusters.size() != policies.size() + remotesToBeSkipped.size()) { final String reason; if (failures.isEmpty()) { - List missingClusters = targetClusters.stream() - .filter(t -> policies.containsKey(t.v1()) == false) - .map(t -> t.v1()) + List missingClusters = targetClusters.keySet() + .stream() + .filter(c -> policies.containsKey(c) == false) .sorted() .toList(); - reason = missingPolicyError(policyName, targetClusters.stream().map(t -> t.v1()).toList(), missingClusters); + // MP TODO: add assert that all of these are skip_un=false clusters, thus is fatal + reason = missingPolicyError(policyName, targetClusters.keySet(), missingClusters); } else { reason = "failed to resolve enrich policy [" + policyName + "]; reason " + failures; } // return Tuple.tuple(null, reason); return new MergedPolicyLookupResult(null, remotesToBeSkipped, reason); } else if (targetClusters.size() == remotesToBeSkipped.size()) { + // MP TODO: this doesn't feel right to me - are we ever seeing this block get hit? + // no target cluster had an enrich policy and all are skip_unavailable=true return new MergedPolicyLookupResult(null, remotesToBeSkipped, null); } Map mappings = new HashMap<>(); Map concreteIndices = new HashMap<>(); ResolvedEnrichPolicy last = null; + // loop over clusters with a ResolvedEnrichPolicy - ensure no errors within the policy for (Map.Entry e : policies.entrySet()) { ResolvedEnrichPolicy curr = e.getValue(); - // MP TODO: I'm not sure this handling is right - do we need to NOT error if skip_un=true? 
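A small standalone sketch of the consistency pass this loop performs, assuming stand-in types in place of ResolvedEnrichPolicy:

import java.util.Map;

class PolicyConsistencySketch {
    record PolicyView(String matchField, String matchType) {}   // stand-in for ResolvedEnrichPolicy

    // Returns an error message if two clusters resolved the same policy with different
    // match fields or match types; returns null when the policies agree.
    static String checkConsistent(String policyName, Map<String, PolicyView> byCluster) {
        PolicyView last = null;
        for (PolicyView curr : byCluster.values()) {
            if (last != null && last.matchField().equals(curr.matchField()) == false) {
                return "enrich policy [" + policyName + "] has different match fields ["
                    + last.matchField() + ", " + curr.matchField() + "] across clusters";
            }
            if (last != null && last.matchType().equals(curr.matchType()) == false) {
                return "enrich policy [" + policyName + "] has different match types ["
                    + last.matchType() + ", " + curr.matchType() + "] across clusters";
            }
            last = curr;
        }
        return null;
    }

    public static void main(String[] args) {
        // local cluster and c2 disagree on the match field, so an error message is returned
        System.out.println(checkConsistent("hosts", Map.of(
            "", new PolicyView("ip", "match"),
            "c2", new PolicyView("host", "match"))));
    }
}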
if (last != null && last.matchField().equals(curr.matchField()) == false) { String error = "enrich policy [" + policyName + "] has different match fields "; error += "[" + last.matchField() + ", " + curr.matchField() + "] across clusters"; + // MP TODO: handle skip_un here return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } if (last != null && last.matchType().equals(curr.matchType()) == false) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index e35b89b3bd296..dbaac4bf62b67 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -19,12 +19,12 @@ import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.analysis.Analyzer; @@ -73,6 +73,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -299,11 +300,15 @@ private void preAnalyze( .toArray(String[]::new) ).keySet(); - List> targetClusters = new ArrayList<>(); - for (String cluster : clusters) { - targetClusters.add(new Tuple<>(cluster, executionInfo.isSkipUnavailable(cluster))); + // key: cluster alias; value = skip_unavailable setting (null for local cluster) + Map targetClusters = new HashMap<>(); + for (String alias : clusters) { + if (alias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { + targetClusters.put(alias, null); + } else { + targetClusters.put(alias, executionInfo.isSkipUnavailable(alias)); + } } - enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, listener.delegateFailureAndWrap((l, enrichResolution) -> { // first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API var matchFields = enrichResolution.resolvedEnrichPolicies() diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 35f6de43e2473..02a4fa422f4e9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexMode; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; @@ -428,7 +427,10 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver { } EnrichResolution resolvePolicies(Collection 
clusters, Collection unresolvedPolicies) { - List> clusterInfo = clusters.stream().map(c -> new Tuple<>(c, false)).toList(); + Map clusterInfo = new HashMap<>(); + for (String alias : clusters) { + clusterInfo.put(alias, Boolean.FALSE); + } PlainActionFuture future = new PlainActionFuture<>(); if (randomBoolean()) { unresolvedPolicies = new ArrayList<>(unresolvedPolicies); From 4733271098de40beebbd8f629ddb08a73bfea4bf Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Tue, 19 Nov 2024 09:17:36 -0500 Subject: [PATCH 04/12] Minor cleanup after deciding to not handle errors for skip_un=true clusters when looking for inconsistencies between enrich policy mappings --- .../xpack/esql/enrich/EnrichPolicyResolver.java | 12 +++--------- .../xpack/esql/session/EsqlSession.java | 6 ++---- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 53b0fadd79c01..b9a871580ecd8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -130,7 +130,6 @@ public void resolvePolicies( } for (UnresolvedPolicy unresolved : unresolvedPolicies) { - // MP TODO: change mergeLookupResults to return MergedPolicyLookupResult MergedPolicyLookupResult resolved = mergeLookupResults( unresolved, calculateTargetClusters(unresolved.mode, includeLocal, remotes), @@ -230,16 +229,14 @@ private MergedPolicyLookupResult mergeLookupResults( .filter(c -> policies.containsKey(c) == false) .sorted() .toList(); - // MP TODO: add assert that all of these are skip_un=false clusters, thus is fatal reason = missingPolicyError(policyName, targetClusters.keySet(), missingClusters); } else { reason = "failed to resolve enrich policy [" + policyName + "]; reason " + failures; } - // return Tuple.tuple(null, reason); return new MergedPolicyLookupResult(null, remotesToBeSkipped, reason); - } else if (targetClusters.size() == remotesToBeSkipped.size()) { - // MP TODO: this doesn't feel right to me - are we ever seeing this block get hit? 
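The bookkeeping behind this branch, as a standalone sketch with plain maps (illustrative only): each target cluster lands in exactly one of three buckets, so an empty resolved-policies bucket with no fatal failures means everything left over is a skippable remote.

import java.util.List;
import java.util.Map;

class SkipBranchSketch {
    public static void main(String[] args) {
        // target clusters for a REMOTE-mode enrich: alias -> skip_unavailable
        Map<String, Boolean> targetClusters = Map.of("c1", true, "c2", true);
        // neither remote has the policy and both are skippable
        Map<String, String> policies = Map.of();          // clusters that resolved the policy
        List<String> fatalFailures = List.of();           // failures on local or skip_unavailable=false remotes
        Map<String, Exception> remotesToBeSkipped = Map.<String, Exception>of(
            "c1", new IllegalStateException("failed to resolve enrich policy [hosts]"),
            "c2", new IllegalStateException("failed to resolve enrich policy [hosts]"));

        // invariant: every target cluster is accounted for by exactly one bucket
        assert targetClusters.size() == policies.size() + remotesToBeSkipped.size();
        boolean allSkipped = policies.isEmpty() && fatalFailures.isEmpty();
        System.out.println(allSkipped);   // true -> neither a resolved policy nor a fatal error is recorded
    }
}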
- // no target cluster had an enrich policy and all are skip_unavailable=true + } else if (policies.isEmpty()) { + // no target cluster had a valid enrich policy and all are skip_unavailable=true + assert targetClusters.values().stream().allMatch(b -> b == Boolean.TRUE) : "Not all target clusters are skip_unavailable=true"; return new MergedPolicyLookupResult(null, remotesToBeSkipped, null); } @@ -252,7 +249,6 @@ private MergedPolicyLookupResult mergeLookupResults( if (last != null && last.matchField().equals(curr.matchField()) == false) { String error = "enrich policy [" + policyName + "] has different match fields "; error += "[" + last.matchField() + ", " + curr.matchField() + "] across clusters"; - // MP TODO: handle skip_un here return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } if (last != null && last.matchType().equals(curr.matchType()) == false) { @@ -274,7 +270,6 @@ private MergedPolicyLookupResult mergeLookupResults( if (old != null && old.getDataType().equals(field.getDataType()) == false) { String error = "field [" + m.getKey() + "] of enrich policy [" + policyName + "] has different data types "; error += "[" + old.getDataType() + ", " + field.getDataType() + "] across clusters"; - // TODO: we should return remotesToBeSkipped here (and other error paths), right? return new MergedPolicyLookupResult(null, remotesToBeSkipped, error); } } @@ -287,7 +282,6 @@ private MergedPolicyLookupResult mergeLookupResults( if (diff.isEmpty() == false) { String detailed = "these fields are missing in some policies: " + diff; String fullMessage = "enrich policy [" + policyName + "] has different enrich fields across clusters; " + detailed; - // TODO: we should return remotesToBeSkipped here (and other error paths), right? return new MergedPolicyLookupResult(null, remotesToBeSkipped, fullMessage); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index dbaac4bf62b67..a1e44d3451b03 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -315,10 +315,8 @@ private void preAnalyze( .stream() .map(ResolvedEnrichPolicy::matchField) .collect(Collectors.toSet()); - Map unavailableClusters = enrichResolution.unusableRemotes(); - preAnalyzeIndices(parsed, executionInfo, unavailableClusters, l.delegateFailureAndWrap((ll, indexResolution) -> { - // TODO in follow-PR (for skip_unavailble handling of missing concrete indexes) add some tests for invalid index - // resolution to updateExecutionInfo + Map unusableRemotes = enrichResolution.unusableRemotes(); + preAnalyzeIndices(parsed, executionInfo, unusableRemotes, l.delegateFailureAndWrap((ll, indexResolution) -> { if (indexResolution.isValid()) { EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); From d7e4df59cca1e0b27e88bf1e594bde02955ce6a5 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Tue, 19 Nov 2024 10:05:06 -0500 Subject: [PATCH 05/12] Changed local cluster to map to skip_un=false for purposes of enrich policy resolution error handling --- .../xpack/esql/enrich/EnrichPolicyResolver.java | 7 ++++--- .../elasticsearch/xpack/esql/session/EsqlSession.java | 9 ++------- 2 files 
changed, 6 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index b9a871580ecd8..3f8b252282396 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -174,7 +174,9 @@ private Map calculateTargetClusters(Enrich.Mode mode, boolean i private Map copyAndAddLocalCluster(Map remoteClusters) { Map newMap = new HashMap<>(remoteClusters); - newMap.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, null); + // technically the local cluster has no skip_unavailable setting, but for enrich policy resolution + // purposes we treat errors on the local cluster as fatal, so set it as false to simplify downstream code + newMap.put(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); return newMap; } @@ -198,8 +200,7 @@ private MergedPolicyLookupResult mergeLookupResults( final Map remotesToBeSkipped = new HashMap<>(); // skip_unavailable=true remotes enrich policy failures for (Map.Entry clusterInfo : targetClusters.entrySet()) { String cluster = clusterInfo.getKey(); - // MP TODO: HMM: do we really need local cluster mapped to null then? Or can we just use "false"? - boolean skipUnavailable = clusterInfo.getValue() == null ? false : clusterInfo.getValue(); + boolean skipUnavailable = clusterInfo.getValue(); LookupResponse lookupResult = lookupResults.get(cluster); if (lookupResult != null) { assert lookupResult.connectionError == null : "Should never have a non-null connectionError here"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a1e44d3451b03..f67a43fc23dfb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -24,7 +24,6 @@ import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; -import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.analysis.Analyzer; @@ -300,14 +299,10 @@ private void preAnalyze( .toArray(String[]::new) ).keySet(); - // key: cluster alias; value = skip_unavailable setting (null for local cluster) + // key: cluster alias; value = skip_unavailable setting (false for local cluster) Map targetClusters = new HashMap<>(); for (String alias : clusters) { - if (alias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { - targetClusters.put(alias, null); - } else { - targetClusters.put(alias, executionInfo.isSkipUnavailable(alias)); - } + targetClusters.put(alias, executionInfo.isSkipUnavailable(alias)); } enrichPolicyResolver.resolvePolicies(targetClusters, unresolvedPolicies, listener.delegateFailureAndWrap((l, enrichResolution) -> { // first we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API From 10d13cc4f2240ee5425c4e1659f2b577197d2386 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Tue, 19 Nov 2024 13:59:51 -0500 Subject: [PATCH 06/12] Modified 
EnrichPolicyResolverTests to match new skip_unavailable behaviors --- .../xpack/esql/analysis/EnrichResolution.java | 6 + .../enrich/EnrichPolicyResolverTests.java | 228 ++++++++++++++---- 2 files changed, 185 insertions(+), 49 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java index 9f409fe57934d..3e34f5dea95a8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java @@ -35,11 +35,17 @@ public Collection resolvedEnrichPolicies() { } + // created for testing + public boolean hasErrors() { + return errors.size() > 0; + } + public String getError(String policyName, Enrich.Mode mode) { final String error = errors.get(new Key(policyName, mode)); if (error != null) { return error; } else { + // TODO: I don't understand this code - why not just return null? Why is it wrong to call this when there's no errors? assert false : "unresolved enrich policy [" + policyName + "] mode [" + mode + "]"; return "unresolved enrich policy [" + policyName + "] mode [" + mode + "]"; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index 02a4fa422f4e9..5b3a26c4af7ce 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -55,6 +55,7 @@ import static org.elasticsearch.transport.RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Mockito.mock; @@ -164,8 +165,11 @@ private void assertHostPolicies(ResolvedEnrichPolicy resolved) { public void testLocalHosts() { for (Enrich.Mode mode : Enrich.Mode.values()) { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY); - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode))); + Map clusters = Map.of(LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("hosts", mode); assertHostPolicies(resolved); assertThat(resolved.concreteIndices(), equalTo(Map.of("", ".enrich-hosts-123"))); @@ -173,9 +177,12 @@ public void testLocalHosts() { } public void testRemoteHosts() { - Set clusters = Set.of("cluster_a", "cluster_b"); + Map clusters = Map.of("cluster_a", randomBoolean(), "cluster_b", randomBoolean()); for (Enrich.Mode mode : Enrich.Mode.values()) { - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("hosts", mode); assertHostPolicies(resolved); 
var expectedIndices = switch (mode) { @@ -188,9 +195,19 @@ public void testRemoteHosts() { } public void testMixedHosts() { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY, "cluster_a", "cluster_b"); + Map clusters = Map.of( + LOCAL_CLUSTER_GROUP_KEY, + Boolean.TRUE, + "cluster_a", + randomBoolean(), + "cluster_b", + randomBoolean() + ); for (Enrich.Mode mode : Enrich.Mode.values()) { - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("hosts", mode); assertHostPolicies(resolved); var expectedIndices = switch (mode) { @@ -203,8 +220,11 @@ public void testMixedHosts() { public void testLocalAddress() { for (Enrich.Mode mode : Enrich.Mode.values()) { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY); - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode))); + Map clusters = Map.of(LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("address", mode); assertNotNull(resolved); assertThat(resolved.matchField(), equalTo("emp_id")); @@ -214,8 +234,15 @@ public void testLocalAddress() { } { List clusters = randomSubsetOf(between(1, 3), List.of("", "cluster_a", "cluster_a")); + Map clusterInfo = new HashMap<>(); + for (String cluster : clusters) { + clusterInfo.put(cluster, cluster.equals("") ? Boolean.FALSE : randomBoolean()); + } var mode = Enrich.Mode.COORDINATOR; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusterInfo, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("address", mode); assertNotNull(resolved); assertThat(resolved.matchField(), equalTo("emp_id")); @@ -226,9 +253,12 @@ public void testLocalAddress() { } public void testRemoteAddress() { - Set clusters = Set.of("cluster_a", "cluster_b"); + Map clusters = Map.of("cluster_a", randomBoolean(), "cluster_b", randomBoolean()); for (Enrich.Mode mode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("address", mode)) + ); assertNull(resolution.getResolvedPolicy("address", mode)); var msg = "enrich policy [address] has different enrich fields across clusters; " + "these fields are missing in some policies: [state]"; @@ -237,9 +267,19 @@ public void testRemoteAddress() { } public void testMixedAddress() { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY, "cluster_a", "cluster_b"); + Map clusters = Map.of( + LOCAL_CLUSTER_GROUP_KEY, + Boolean.FALSE, + "cluster_a", + randomBoolean(), + "cluster_b", + randomBoolean() + ); for (Enrich.Mode mode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", 
mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("hosts", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("hosts", mode); assertHostPolicies(resolved); assertThat( @@ -252,8 +292,11 @@ public void testMixedAddress() { public void testLocalAuthor() { for (Enrich.Mode mode : Enrich.Mode.values()) { - Set clusters = Set.of(LOCAL_CLUSTER_GROUP_KEY); - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + Map clusters = Map.of(LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("author", mode); assertNotNull(resolved); assertThat(resolved.matchField(), equalTo("author")); @@ -264,7 +307,14 @@ public void testLocalAuthor() { { var mode = Enrich.Mode.COORDINATOR; var clusters = randomSubsetOf(between(1, 3), Set.of("", "cluster_a", "cluster_b")); - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + Map clusterInfo = new HashMap<>(); + for (String cluster : clusters) { + clusterInfo.put(cluster, cluster.equals("") ? Boolean.FALSE : randomBoolean()); + } + var resolution = localCluster.resolvePoliciesWithRandomization( + clusterInfo, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("author", mode); assertNotNull(resolved); assertThat(resolved.matchField(), equalTo("author")); @@ -276,10 +326,13 @@ public void testLocalAuthor() { } public void testAuthorClusterA() { - Set clusters = Set.of("cluster_a"); + Map clusters = Map.of("cluster_a", randomBoolean()); { var mode = Enrich.Mode.ANY; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -288,7 +341,10 @@ public void testAuthorClusterA() { } { var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("author", mode); assertNotNull(resolved); assertThat(resolved.matchType(), equalTo("range")); @@ -300,10 +356,13 @@ public void testAuthorClusterA() { } public void testAuthorClusterB() { - Set clusters = Set.of("cluster_b"); + Map clusters = Map.of("cluster_b", randomBoolean()); { var mode = Enrich.Mode.ANY; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -312,7 +371,10 @@ public void testAuthorClusterB() { } { var mode = 
Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); ResolvedEnrichPolicy resolved = resolution.getResolvedPolicy("author", mode); assertNotNull(resolved); assertThat(resolved.matchType(), equalTo("match")); @@ -324,10 +386,13 @@ public void testAuthorClusterB() { } public void testAuthorClusterAAndClusterB() { - Set clusters = Set.of("cluster_a", "cluster_b"); + Map clusters = Map.of("cluster_a", randomBoolean(), "cluster_b", randomBoolean()); { var mode = Enrich.Mode.ANY; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -336,7 +401,10 @@ public void testAuthorClusterAAndClusterB() { } { var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -346,10 +414,13 @@ public void testAuthorClusterAAndClusterB() { } public void testLocalAndClusterBAuthor() { - Set clusters = Set.of("", "cluster_b"); + Map clusters = Map.of("", Boolean.FALSE, "cluster_b", randomBoolean()); { var mode = Enrich.Mode.ANY; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -358,7 +429,10 @@ public void testLocalAndClusterBAuthor() { } { var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies(clusters, List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + clusters, + List.of(new EnrichPolicyResolver.UnresolvedPolicy("author", mode)) + ); assertNull(resolution.getResolvedPolicy("author", mode)); assertThat( resolution.getError("author", mode), @@ -369,31 +443,76 @@ public void testLocalAndClusterBAuthor() { public void testMissingLocalPolicy() { for (Enrich.Mode mode : Enrich.Mode.values()) { - var resolution = localCluster.resolvePolicies(Set.of(""), List.of(new EnrichPolicyResolver.UnresolvedPolicy("authoz", mode))); + var resolution = localCluster.resolvePoliciesWithRandomization( + Map.of("", Boolean.FALSE), + List.of(new EnrichPolicyResolver.UnresolvedPolicy("authoz", mode)) + ); assertNull(resolution.getResolvedPolicy("authoz", mode)); assertThat(resolution.getError("authoz", mode), equalTo("cannot find enrich policy [authoz], did you mean [author]?")); } } public void testMissingRemotePolicy() { + String missingPolicyName = "addrezz"; + + // missing policy on skip_unavailable=true is not a (fatal) error - the error is simply recorded in metadata 
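    // Rough sketch of the resolution behavior this test exercises (hedged: the method names below follow
    // identifiers referenced elsewhere in this change, but the exact call sites are approximate and
    // policyLookupFailedFor(...) is only a placeholder predicate, not a real method):
    //
    //   if (policyLookupFailedFor(clusterAlias)) {
    //       if (skipUnavailable) {
    //           // expendable remote: record the failure and let the query continue;
    //           // the cluster is later reported as "skipped" in the _clusters response metadata
    //           enrichResolution.addUnusableRemote(clusterAlias, failure);
    //       } else {
    //           // required remote: the failure is surfaced as a query-level (fatal) error
    //           enrichResolution.addError(policyName, mode, failure.getMessage());
    //       }
    //   }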
{ - var mode = Enrich.Mode.REMOTE; - var resolution = localCluster.resolvePolicies( - Set.of("cluster_a"), + boolean skipUnavailable = true; + var mode = Enrich.Mode.REMOTE; // only requires policy on remote cluster + EnrichResolution resolution = localCluster.resolvePolicies( + Map.of("cluster_a", skipUnavailable), List.of(new EnrichPolicyResolver.UnresolvedPolicy("addrezz", mode)) ); - assertNull(resolution.getResolvedPolicy("addrezz", mode)); - assertThat(resolution.getError("addrezz", mode), equalTo("cannot find enrich policy [addrezz] on clusters [cluster_a]")); + assertThat(resolution.unusableRemotes().size(), equalTo(1)); + Exception exception = resolution.unusableRemotes().get("cluster_a"); + assertNotNull(exception); + assertThat(exception.getMessage(), containsString("failed to resolve enrich policy [addrezz]")); + assertNull(resolution.getResolvedPolicy(missingPolicyName, mode)); + assertFalse(resolution.hasErrors()); } + + // missing policy on skip_unavailable=true is not a (fatal) error, but the missing policy on local is a fatal error { - var mode = Enrich.Mode.ANY; + boolean skipUnavailable = true; + var mode = Enrich.Mode.ANY; // requires policy to be on the local cluster as well var resolution = localCluster.resolvePolicies( - Set.of("cluster_a"), - List.of(new EnrichPolicyResolver.UnresolvedPolicy("addrezz", mode)) + Map.of("cluster_a", skipUnavailable), + List.of(new EnrichPolicyResolver.UnresolvedPolicy(missingPolicyName, mode)) + ); + assertThat(resolution.unusableRemotes().size(), equalTo(1)); + assertNull(resolution.getResolvedPolicy(missingPolicyName, mode)); + assertThat( + resolution.getError(missingPolicyName, mode), + equalTo("cannot find enrich policy [addrezz] on clusters [_local, cluster_a]") + ); + } + + // missing policy on skip_unavailable=false is an error + { + boolean skipUnavailable = false; + var mode = Enrich.Mode.REMOTE; // only requires policy on remote cluster + var resolution = localCluster.resolvePolicies( + Map.of("cluster_a", skipUnavailable), + List.of(new EnrichPolicyResolver.UnresolvedPolicy(missingPolicyName, mode)) + ); + assertThat(resolution.unusableRemotes().size(), equalTo(0)); + assertNull(resolution.getResolvedPolicy(missingPolicyName, mode)); + assertThat( + resolution.getError(missingPolicyName, mode), + equalTo("cannot find enrich policy [addrezz] on clusters [cluster_a]") ); - assertNull(resolution.getResolvedPolicy("addrezz", mode)); + } + { + boolean skipUnavailable = false; + var mode = Enrich.Mode.ANY; // requires policy to be on the local cluster as well + var resolution = localCluster.resolvePolicies( + Map.of("cluster_a", skipUnavailable), + List.of(new EnrichPolicyResolver.UnresolvedPolicy(missingPolicyName, mode)) + ); + assertThat(resolution.unusableRemotes().size(), equalTo(0)); + assertNull(resolution.getResolvedPolicy(missingPolicyName, mode)); assertThat( - resolution.getError("addrezz", mode), + resolution.getError(missingPolicyName, mode), equalTo("cannot find enrich policy [addrezz] on clusters [_local, cluster_a]") ); } @@ -426,26 +545,37 @@ class TestEnrichPolicyResolver extends EnrichPolicyResolver { this.mappings = mappings; } - EnrichResolution resolvePolicies(Collection clusters, Collection unresolvedPolicies) { - Map clusterInfo = new HashMap<>(); - for (String alias : clusters) { - clusterInfo.put(alias, Boolean.FALSE); - } - PlainActionFuture future = new PlainActionFuture<>(); + /** + * @param clusters target clusters map; key=cluster alias; value=skip_unavailable setting (should be false for local) + 
* @param unresolvedPolicies + * @return + */ + EnrichResolution resolvePoliciesWithRandomization(Map clusters, Collection unresolvedPolicies) { + return resolvePolicies(clusters, randomAdditionsToUnresolvedPolicyList(unresolvedPolicies)); + } + + Collection randomAdditionsToUnresolvedPolicyList(Collection unresolvedPolicies) { + Collection revisedList = unresolvedPolicies; if (randomBoolean()) { - unresolvedPolicies = new ArrayList<>(unresolvedPolicies); + revisedList = new ArrayList<>(revisedList); for (Enrich.Mode mode : Enrich.Mode.values()) { for (String policy : List.of("hosts", "address", "author")) { if (randomBoolean()) { - unresolvedPolicies.add(new UnresolvedPolicy(policy, mode)); + revisedList.add(new UnresolvedPolicy(policy, mode)); } } } if (randomBoolean()) { - unresolvedPolicies.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values()))); + // legacy-policy-1 does not exist on any cluster in this test + revisedList.add(new UnresolvedPolicy("legacy-policy-1", randomFrom(Enrich.Mode.values()))); } } - super.resolvePolicies(clusterInfo, unresolvedPolicies, future); + return revisedList; + } + + EnrichResolution resolvePolicies(Map clusters, Collection unresolvedPolicies) { + PlainActionFuture future = new PlainActionFuture<>(); + super.resolvePolicies(clusters, unresolvedPolicies, future); return future.actionGet(30, TimeUnit.SECONDS); } From 3705eb33631ac29adf991df08ff453b199b04a6a Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Tue, 19 Nov 2024 15:24:43 -0500 Subject: [PATCH 07/12] The enrich policy re-resolution now also passes in the map of clusterAlias-to-skipUn-setting --- .../xpack/esql/session/EsqlSession.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index f67a43fc23dfb..f8a440420a0db 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -329,17 +329,20 @@ private void preAnalyze( // If new clusters appear when resolving the main indices, we need to resolve the enrich policies again // or exclude main concrete indices. Since this is rare, it's simpler to resolve the enrich policies again. 
// TODO: add a test for this - // MP TODO: deal with this later in this PR - // if (clusters.containsAll(newClusters) == false - // // do not bother with a re-resolution if only remotes were requested and all were offline - // && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { - // enrichPolicyResolver.resolvePolicies( - // newClusters, - // unresolvedPolicies, - // ll.map(newEnrichResolution -> action.apply(indexResolution, newEnrichResolution)) - // ); - // return; - // } + if (clusters.containsAll(newClusters) == false + // do not bother with a re-resolution if only remotes were requested and all were offline + && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) > 0) { + Map newClusterInfo = new HashMap<>(); + for (String newCluster : newClusters) { + newClusterInfo.put(newCluster, executionInfo.isSkipUnavailable(newCluster)); + } + enrichPolicyResolver.resolvePolicies( + newClusterInfo, + unresolvedPolicies, + ll.map(newEnrichResolution -> action.apply(indexResolution, newEnrichResolution)) + ); + return; + } } ll.onResponse(action.apply(indexResolution, enrichResolution)); }), matchFields); From ebc931e74928a185d86b23f5a1728f7e73232a77 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Tue, 19 Nov 2024 16:50:08 -0500 Subject: [PATCH 08/12] Added missing enrich policy tests to RemoteClusterSecurityEsqlIT --- .../RemoteClusterSecurityEsqlIT.java | 89 ++++++++++++++++++- 1 file changed, 86 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java index 09449f81121fd..21868e5b4d294 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java @@ -158,6 +158,10 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe })).around(fulfillingCluster).around(queryCluster); public void populateData() throws Exception { + populateData(true); + } + + public void populateData(boolean includeEnrichPolicyOnRemoteCluster) throws Exception { CheckedConsumer setupEnrich = client -> { Request createIndex = new Request("PUT", "countries"); createIndex.setJsonEntity(""" @@ -204,8 +208,10 @@ public void populateData() throws Exception { } } """); - assertOK(performRequestWithAdminUser(client, createEnrich)); - assertOK(performRequestWithAdminUser(client, new Request("PUT", "_enrich/policy/countries/_execute"))); + if (includeEnrichPolicyOnRemoteCluster) { + assertOK(performRequestWithAdminUser(client, createEnrich)); + assertOK(performRequestWithAdminUser(client, new Request("PUT", "_enrich/policy/countries/_execute"))); + } performRequestWithAdminUser(client, new Request("DELETE", "/countries")); }; // Fulfilling cluster @@ -331,7 +337,12 @@ public void wipeData() throws Exception { performRequestWithAdminUser(client, new Request("DELETE", "/employees")); performRequestWithAdminUser(client, new Request("DELETE", "/employees2")); performRequestWithAdminUser(client, new Request("DELETE", "/employees3")); - performRequestWithAdminUser(client, new Request("DELETE", "/_enrich/policy/countries")); + final Response response = 
performRequestWithAdminUser(client, new Request("GET", "/_enrich/policy/countries")); + final Map getResponseMap = responseAsMap(response); + List policies = (List) getResponseMap.get("policies"); + if (policies.isEmpty() == false) { + performRequestWithAdminUser(client, new Request("DELETE", "/_enrich/policy/countries")); + } }; wipe.accept(fulfillingClusterClient); wipe.accept(client()); @@ -811,6 +822,78 @@ public void testCrossClusterEnrichWithOnlyRemotePrivs() throws Exception { assertThat(flatList, containsInAnyOrder(1, 3, "usa", "germany")); } + public void testCrossClusterEnrichWithMissingEnrichPolicySkipUnavailableFalse() throws Exception { + configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, false, randomBoolean(), false); + populateData(false); + { + Request request = esqlRequest(""" + FROM my_remote_cluster:employees + | ENRICH countries + | STATS size=count(*) by country + | SORT size DESC + | LIMIT 2"""); + + ResponseException e = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(request)); + assertThat(e.getMessage(), containsString("verification_exception")); + assertThat(e.getMessage(), containsString("cannot find enrich policy [countries]")); + } + } + + @SuppressWarnings("unchecked") + public void testCrossClusterEnrichWithMissingEnrichPolicySkipUnavailableTrue() throws Exception { + configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, false, randomBoolean(), true); + populateData(false); + { + Request request = esqlRequest(""" + FROM my_remote_cluster:employees + | ENRICH countries + | STATS size=count(*) by country + | SORT size DESC + | LIMIT 2"""); + + Response response = performRequestWithRemoteSearchUser(request); + assertOK(response); + + /** + * Expected response + * took=109, + * columns=[{name=, type=null}], + * values=[], + * _clusters={running=0, total=1, failed=0, partial=0, successful=0, skipped=1, + * details={my_remote_cluster= + * {_shards={total=0, failed=0, successful=0, skipped=0}, + * took=109, indices=employees, + * failures=[{reason={reason=failed to resolve enrich policy [countries], + * type=illegal_state_exception}, index=null, shard=-1}], status=skipped}}, + * }} + */ + + Map responseAsMap = entityAsMap(response); + List columns = (List) responseAsMap.get("columns"); + List values = (List) responseAsMap.get("values"); + assertThat(columns.size(), equalTo(1)); + Map column1 = (Map) columns.get(0); + assertThat(column1.get("name").toString(), equalTo("")); + assertThat(values.size(), equalTo(0)); + Map clusters = (Map) responseAsMap.get("_clusters"); + + assertThat((int) clusters.get("total"), is(1)); + assertThat((int) clusters.get("successful"), is(0)); + assertThat((int) clusters.get("skipped"), is(1)); + assertThat((int) clusters.get("failed"), is(0)); + + Map details = (Map) clusters.get("details"); + Map invalidRemoteEntry = (Map) details.get("my_remote_cluster"); + assertThat(invalidRemoteEntry.get("status").toString(), equalTo("skipped")); + List failures = (List) invalidRemoteEntry.get("failures"); + assertThat(failures.size(), equalTo(1)); + Map failuresMap = (Map) failures.get(0); + Map reason = (Map) failuresMap.get("reason"); + assertThat(reason.get("type").toString(), equalTo("illegal_state_exception")); + assertThat(reason.get("reason").toString(), containsString("failed to resolve enrich policy [countries]")); + } + } + private void createAliases() throws Exception { Request createAlias = new Request("POST", "_aliases"); createAlias.setJsonEntity(""" From 
7e4cfaf1665f5b669ff6f403bad0a77080f464a3 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Tue, 19 Nov 2024 16:56:51 -0500 Subject: [PATCH 09/12] Update docs/changelog/116972.yaml --- docs/changelog/116972.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/116972.yaml diff --git a/docs/changelog/116972.yaml b/docs/changelog/116972.yaml new file mode 100644 index 0000000000000..0387b3e9619cc --- /dev/null +++ b/docs/changelog/116972.yaml @@ -0,0 +1,6 @@ +pr: 116972 +summary: "ESQL: Missing enrich policies on skip_unavailable=true clusters do not fail\ + \ the query" +area: ES|QL +type: enhancement +issues: [] From 8acb25c2623a863eaabd9da904ce77c83f0e9cd4 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 20 Nov 2024 11:09:52 -0500 Subject: [PATCH 10/12] Minor changes based on PR feedback --- .../elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 3f8b252282396..bf1ba4bc4c975 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -96,7 +96,7 @@ public record UnresolvedPolicy(String name, Enrich.Mode mode) { /** * Resolves a set of enrich policies * - * @param targetClusters the target clusters; key: cluster alias; value: skip_unavailable setting (null for local cluster) + * @param targetClusters the target clusters; key: cluster alias; value: skip_unavailable setting * @param unresolvedPolicies the unresolved policies * @param listener notified with the enrich resolution */ @@ -167,7 +167,7 @@ record MergedPolicyLookupResult( private Map calculateTargetClusters(Enrich.Mode mode, boolean includeLocal, Map remoteClusters) { return switch (mode) { case ANY -> copyAndAddLocalCluster(remoteClusters); - case COORDINATOR -> copyAndAddLocalCluster(Collections.emptyMap()); + case COORDINATOR -> Map.of(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Boolean.FALSE); case REMOTE -> includeLocal ? copyAndAddLocalCluster(remoteClusters) : remoteClusters; }; } From c9517acd8ec8e0ba35602152d90dba90f1f32756 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Wed, 20 Nov 2024 14:50:05 -0500 Subject: [PATCH 11/12] Added test from Pawan Karthik for RCS2 testing of unavailable remotes during enrich CrossClusterEsqlRCS2EnrichUnavailableRemotesIT --- ...terEsqlRCS2EnrichUnavailableRemotesIT.java | 380 ++++++++++++++++++ 1 file changed, 380 insertions(+) create mode 100644 x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java new file mode 100644 index 0000000000000..da59e3c772736 --- /dev/null +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2EnrichUnavailableRemotesIT.java @@ -0,0 +1,380 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.remotecluster; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.util.resource.Resource; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CrossClusterEsqlRCS2EnrichUnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicReference> API_KEY_MAP_REF = new AtomicReference<>(); + + static { + fulfillingCluster = ElasticsearchCluster.local() + .name("fulfilling-cluster") + .nodes(1) + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("remote_cluster.port", "0") + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_server.ssl.enabled", "true") + .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key") + .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password") + .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true")) + .build(); + + queryCluster = ElasticsearchCluster.local() + .name("query-cluster") + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_client.ssl.enabled", "true") + .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("cluster.remote.my_remote_cluster.credentials", () -> { + if (API_KEY_MAP_REF.get() == null) { + final Map apiKeyMap = createCrossClusterAccessApiKey(""" + { + "search": [ + { + "names": ["*"] + } + ] + }"""); + API_KEY_MAP_REF.set(apiKeyMap); + } + return (String) API_KEY_MAP_REF.get().get("encoded"); + }) + .rolesFile(Resource.fromClasspath("roles.yml")) + .user(REMOTE_METRIC_USER, PASS.toString(), "read_remote_shared_metrics", false) + .build(); + } + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); + + private String[] modes = { "_coordinator", "_remote" }; + + @Before + public void setupPreRequisites() throws IOException { + setupRolesAndPrivileges(); + setSourceData(); + + var policy = createPolicy("employees-policy", 
"employees", "email", new String[] { "id", "designation" }); + // Create the enrich policy on both clusters. + assertOK(client().performRequest(policy)); + assertOK(performRequestAgainstFulfillingCluster(policy)); + + // Execute the enrich policy on both clusters. + var exec = executePolicy("employees-policy"); + assertOK(client().performRequest(exec)); + assertOK(performRequestAgainstFulfillingCluster(exec)); + } + + public void testEsqlEnrichWithSkipUnavailable() throws Exception { + esqlEnrichWithRandomSkipUnavailable(); + esqlEnrichWithSkipUnavailableTrue(); + esqlEnrichWithSkipUnavailableFalse(); + } + + private void esqlEnrichWithRandomSkipUnavailable() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), randomBoolean()); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + Response response = performRequestWithRemoteSearchUser(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(6)); + for (int i = 0; i < 6; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. + assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(2)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(0)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("successful")); + } + + @SuppressWarnings("unchecked") + private void esqlEnrichWithSkipUnavailableTrue() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), true); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enriched,my_remote_cluster:to-be-enriched | ENRICH employees-policy | LIMIT 10"; + Response response = performRequestWithRemoteSearchUser(esqlRequest(query)); + + Map map = responseAsMap(response); + ArrayList values = (ArrayList) map.get("values"); + Map clusters = (Map) map.get("_clusters"); + Map clusterDetails = (Map) clusters.get("details"); + Map localClusterDetails = (Map) clusterDetails.get("(local)"); + Map remoteClusterDetails = (Map) clusterDetails.get("my_remote_cluster"); + + assertOK(response); + assertThat((int) map.get("took"), greaterThan(0)); + assertThat(values.size(), is(3)); + + // We only have 3 values since the remote cluster is turned off. + for (int i = 0; i < 3; i++) { + ArrayList value = (ArrayList) values.get(i); + // Size is 3: ID, Email, Designation. 
+ assertThat(value.size(), is(3)); + // Email + assertThat((String) value.get(0), endsWith("@corp.co")); + // ID + assertThat(value.get(1), is(i + 1)); + } + + assertThat((int) clusters.get("total"), is(2)); + assertThat((int) clusters.get("successful"), is(1)); + assertThat((int) clusters.get("running"), is(0)); + assertThat((int) clusters.get("skipped"), is(1)); + assertThat((int) clusters.get("partial"), is(0)); + assertThat((int) clusters.get("failed"), is(0)); + + assertThat(clusterDetails.size(), is(2)); + assertThat((int) localClusterDetails.get("took"), greaterThan(0)); + assertThat(localClusterDetails.get("status"), is("successful")); + + assertThat((int) remoteClusterDetails.get("took"), greaterThan(0)); + assertThat(remoteClusterDetails.get("status"), is("skipped")); + + ArrayList remoteClusterFailures = (ArrayList) remoteClusterDetails.get("failures"); + assertThat(remoteClusterFailures.size(), equalTo(1)); + Map failuresMap = (Map) remoteClusterFailures.get(0); + + Map reason = (Map) failuresMap.get("reason"); + assertThat(reason.get("type").toString(), equalTo("connect_transport_exception")); + assertThat(reason.get("reason").toString(), containsString("Unable to connect to [my_remote_cluster]")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void esqlEnrichWithSkipUnavailableFalse() throws Exception { + configureRemoteCluster("my_remote_cluster", fulfillingCluster, false, randomBoolean(), false); + + try { + fulfillingCluster.stop(true); + + String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10"; + ResponseException ex = expectThrows(ResponseException.class, () -> performRequestWithRemoteSearchUser(esqlRequest(query))); + assertThat(ex.getMessage(), containsString("connect_transport_exception")); + } finally { + fulfillingCluster.start(); + closeFulfillingClusterClient(); + initFulfillingClusterClient(); + } + } + + private void setupRolesAndPrivileges() throws IOException { + var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER); + putUserRequest.setJsonEntity(""" + { + "password": "x-pack-test-password", + "roles" : ["remote_search"] + }"""); + assertOK(adminClient().performRequest(putUserRequest)); + + var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE); + putRoleOnRemoteClusterRequest.setJsonEntity(""" + { + "indices": [ + { + "names": ["*"], + "privileges": ["read"] + } + ], + "cluster": [ "monitor_enrich", "manage_own_api_key" ], + "remote_indices": [ + { + "names": ["*"], + "privileges": ["read"], + "clusters": ["my_remote_cluster"] + } + ], + "remote_cluster": [ + { + "privileges": ["monitor_enrich"], + "clusters": ["my_remote_cluster"] + } + ] + }"""); + assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest)); + } + + private void setSourceData() throws IOException { + Request createIndex = new Request("PUT", "employees"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "id": { "type": "integer" }, + "email": { "type": "text" }, + "designation": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + Request bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "employees" } } + { "id": 1, "email": "a@corp.co", "designation": "SDE intern"} + { 
"index": { "_index": "employees" } } + { "id": 2, "email": "b@corp.co", "designation": "SDE 1"} + { "index": { "_index": "employees" } } + { "id": 3, "email": "c@corp.co", "designation": "SDE 2"} + { "index": { "_index": "employees" } } + { "id": 4, "email": "d@corp.co", "designation": "SSE"} + { "index": { "_index": "employees" } } + { "id": 5, "email": "e@corp.co", "designation": "PSE 1"} + { "index": { "_index": "employees" } } + { "id": 6, "email": "f@corp.co", "designation": "PSE 2"} + """); + assertOK(client().performRequest(bulkRequest)); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + + createIndex = new Request("PUT", "to-be-enriched"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "email": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "a@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "b@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "c@corp.co"} + """); + assertOK(client().performRequest(bulkRequest)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "d@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "e@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "f@corp.co"} + """); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + } + + private Request createPolicy(String policyName, String matchIndex, String matchField, String[] enrichFields) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + body.startObject(); + body.startObject("match"); + body.field("indices", matchIndex); + body.field("match_field", matchField); + body.field("enrich_fields", enrichFields); + + body.endObject(); + body.endObject(); + + return makeRequest("PUT", "_enrich/policy/" + policyName, body); + } + + private Request executePolicy(String policyName) { + return new Request("PUT", "_enrich/policy/employees-policy/_execute"); + } + + private Request esqlRequest(String query) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + + body.startObject(); + body.field("query", query); + body.field("include_ccs_metadata", true); + body.endObject(); + + return makeRequest("POST", "_query", body); + } + + private Request makeRequest(String method, String endpoint, XContentBuilder requestBody) { + Request request = new Request(method, endpoint); + request.setJsonEntity(Strings.toString(requestBody)); + return request; + } + + private Response performRequestWithRemoteSearchUser(final Request request) throws IOException { + request.setOptions( + RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(REMOTE_SEARCH_USER, PASS)) + ); + return client().performRequest(request); + } +} From d94a7c579eb9ade6702e8fd56e9bd5f4ca9436f8 Mon Sep 17 00:00:00 2001 From: Michael Peterson Date: Thu, 21 Nov 2024 08:56:10 -0500 Subject: [PATCH 12/12] PR feedback changes and added Pawan's CrossClusterEsqlRCS1EnrichUnavailableRemotesIT --- .../xpack/esql/session/EsqlSession.java | 5 +- ...terEsqlRCS1EnrichUnavailableRemotesIT.java | 353 ++++++++++++++++++ 2 files changed, 356 insertions(+), 2 deletions(-) create mode 100644 
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index c1fd8458ca846..78945f4d9bc5c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.common.util.Maps; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverProfile; @@ -298,8 +299,8 @@ private void preAnalyze( final Set clusters = enrichPolicyResolver.groupIndicesPerCluster( indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new) ).keySet(); - // key: cluster alias; value = skip_unavailable setting (false for local cluster) - Map targetClusters = new HashMap<>(); + // key: cluster alias; value = skip_unavailable setting + Map targetClusters = Maps.newHashMapWithExpectedSize(clusters.size()); for (String alias : clusters) { targetClusters.put(alias, executionInfo.isSkipUnavailable(alias)); } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java new file mode 100644 index 0000000000000..47c7ac8241fcd --- /dev/null +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1EnrichUnavailableRemotesIT.java @@ -0,0 +1,353 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.remotecluster; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CrossClusterEsqlRCS1EnrichUnavailableRemotesIT extends AbstractRemoteClusterSecurityTestCase { + private static final AtomicBoolean SSL_ENABLED_REF = new AtomicBoolean(); + + static { + fulfillingCluster = ElasticsearchCluster.local() + .name("fulfilling-cluster") + .nodes(1) + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("remote_cluster.port", "0") + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_server.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get())) + .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key") + .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt") + .setting("xpack.security.authc.token.enabled", "true") + .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password") + .node(0, spec -> spec.setting("remote_cluster_server.enabled", "true")) + .build(); + + queryCluster = ElasticsearchCluster.local() + .name("query-cluster") + .module("x-pack-autoscaling") + .module("x-pack-esql") + .module("x-pack-enrich") + .module("x-pack-ml") + .module("ingest-common") + .apply(commonClusterConfig) + .setting("xpack.ml.enabled", "false") + .setting("xpack.security.remote_cluster_client.ssl.enabled", () -> String.valueOf(SSL_ENABLED_REF.get())) + .build(); + } + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster); + + private final String[] modes = { "_coordinator", "_remote" }; + + @Before + public void setupPreRequisites() throws IOException { + setupRolesAndPrivileges(); + setSourceData(); + + var policy = createPolicy("employees-policy", "employees", "email", new String[] { "id", "designation" }); + // Create the enrich policy on both clusters. + assertOK(client().performRequest(policy)); + assertOK(performRequestAgainstFulfillingCluster(policy)); + + // Execute the enrich policy on both clusters. 
+        var exec = executePolicy("employees-policy");
+        assertOK(client().performRequest(exec));
+        assertOK(performRequestAgainstFulfillingCluster(exec));
+    }
+
+    public void testEsqlEnrichWithSkipUnavailable() throws Exception {
+        esqlEnrichWithRandomSkipUnavailable();
+        esqlEnrichWithSkipUnavailableTrue();
+        esqlEnrichWithSkipUnavailableFalse();
+    }
+
+    private void esqlEnrichWithRandomSkipUnavailable() throws Exception {
+        configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), randomBoolean());
+
+        String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10";
+        Response response = client().performRequest(esqlRequest(query));
+
+        Map<String, Object> map = responseAsMap(response);
+        ArrayList<?> values = (ArrayList<?>) map.get("values");
+        Map<?, ?> clusters = (Map<?, ?>) map.get("_clusters");
+        Map<?, ?> clusterDetails = (Map<?, ?>) clusters.get("details");
+        Map<?, ?> localClusterDetails = (Map<?, ?>) clusterDetails.get("(local)");
+        Map<?, ?> remoteClusterDetails = (Map<?, ?>) clusterDetails.get("my_remote_cluster");
+
+        assertOK(response);
+        assertThat((int) map.get("took"), greaterThan(0));
+        assertThat(values.size(), is(6));
+        for (int i = 0; i < 6; i++) {
+            ArrayList<?> value = (ArrayList<?>) values.get(i);
+            // Size is 3: Email, ID, Designation.
+            assertThat(value.size(), is(3));
+            // Email
+            assertThat((String) value.get(0), endsWith("@corp.co"));
+            // ID
+            assertThat(value.get(1), is(i + 1));
+        }
+
+        assertThat((int) clusters.get("total"), is(2));
+        assertThat((int) clusters.get("successful"), is(2));
+        assertThat((int) clusters.get("running"), is(0));
+        assertThat((int) clusters.get("skipped"), is(0));
+        assertThat((int) clusters.get("partial"), is(0));
+        assertThat((int) clusters.get("failed"), is(0));
+
+        assertThat(clusterDetails.size(), is(2));
+        assertThat((int) localClusterDetails.get("took"), greaterThan(0));
+        assertThat(localClusterDetails.get("status"), is("successful"));
+
+        assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
+        assertThat(remoteClusterDetails.get("status"), is("successful"));
+    }
+
+    @SuppressWarnings("unchecked")
+    private void esqlEnrichWithSkipUnavailableTrue() throws Exception {
+        configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), true);
+
+        try {
+            fulfillingCluster.stop(true);
+
+            String query = "FROM to-be-enriched,my_remote_cluster:to-be-enriched | ENRICH employees-policy | LIMIT 10";
+            Response response = client().performRequest(esqlRequest(query));
+
+            Map<String, Object> map = responseAsMap(response);
+            ArrayList<?> values = (ArrayList<?>) map.get("values");
+            Map<?, ?> clusters = (Map<?, ?>) map.get("_clusters");
+            Map<?, ?> clusterDetails = (Map<?, ?>) clusters.get("details");
+            Map<?, ?> localClusterDetails = (Map<?, ?>) clusterDetails.get("(local)");
+            Map<?, ?> remoteClusterDetails = (Map<?, ?>) clusterDetails.get("my_remote_cluster");
+
+            assertOK(response);
+            assertThat((int) map.get("took"), greaterThan(0));
+            assertThat(values.size(), is(3));
+
+            // We only have 3 values since the remote cluster is turned off.
+            for (int i = 0; i < 3; i++) {
+                ArrayList<?> value = (ArrayList<?>) values.get(i);
+                // Size is 3: Email, ID, Designation.
+                assertThat(value.size(), is(3));
+                // Email
+                assertThat((String) value.get(0), endsWith("@corp.co"));
+                // ID
+                assertThat(value.get(1), is(i + 1));
+            }
+
+            assertThat((int) clusters.get("total"), is(2));
+            assertThat((int) clusters.get("successful"), is(1));
+            assertThat((int) clusters.get("running"), is(0));
+            assertThat((int) clusters.get("skipped"), is(1));
+            assertThat((int) clusters.get("partial"), is(0));
+            assertThat((int) clusters.get("failed"), is(0));
+
+            assertThat(clusterDetails.size(), is(2));
+            assertThat((int) localClusterDetails.get("took"), greaterThan(0));
+            assertThat(localClusterDetails.get("status"), is("successful"));
+
+            assertThat((int) remoteClusterDetails.get("took"), greaterThan(0));
+            assertThat(remoteClusterDetails.get("status"), is("skipped"));
+
+            ArrayList<?> remoteClusterFailures = (ArrayList<?>) remoteClusterDetails.get("failures");
+            assertThat(remoteClusterFailures.size(), equalTo(1));
+            Map<?, ?> failuresMap = (Map<?, ?>) remoteClusterFailures.get(0);
+
+            Map<?, ?> reason = (Map<?, ?>) failuresMap.get("reason");
+            assertThat(reason.get("type").toString(), equalTo("connect_transport_exception"));
+            assertThat(reason.get("reason").toString(), containsString("Unable to connect to [my_remote_cluster]"));
+        } finally {
+            fulfillingCluster.start();
+            closeFulfillingClusterClient();
+            initFulfillingClusterClient();
+        }
+    }
+
+    private void esqlEnrichWithSkipUnavailableFalse() throws Exception {
+        configureRemoteCluster("my_remote_cluster", fulfillingCluster, true, randomBoolean(), false);
+
+        try {
+            fulfillingCluster.stop(true);
+
+            String query = "FROM to-be-enr*,my_remote_cluster:to-be-enr* | ENRICH " + randomFrom(modes) + ":employees-policy | LIMIT 10";
+            ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(esqlRequest(query)));
+            assertThat(ex.getMessage(), containsString("connect_transport_exception"));
+        } finally {
+            fulfillingCluster.start();
+            closeFulfillingClusterClient();
+            initFulfillingClusterClient();
+        }
+    }
+
+    private void setupRolesAndPrivileges() throws IOException {
+        var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
+        putUserRequest.setJsonEntity("""
+            {
+              "password": "x-pack-test-password",
+              "roles" : ["remote_search"]
+            }""");
+        assertOK(adminClient().performRequest(putUserRequest));
+
+        var putRoleOnRemoteClusterRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
+        putRoleOnRemoteClusterRequest.setJsonEntity("""
+            {
+              "indices": [
+                {
+                  "names": ["*"],
+                  "privileges": ["read"]
+                }
+              ],
+              "cluster": [ "monitor_enrich", "manage_own_api_key" ],
+              "remote_indices": [
+                {
+                  "names": ["*"],
+                  "privileges": ["read"],
+                  "clusters": ["my_remote_cluster"]
+                }
+              ],
+              "remote_cluster": [
+                {
+                  "privileges": ["monitor_enrich"],
+                  "clusters": ["my_remote_cluster"]
+                }
+              ]
+            }""");
+        assertOK(adminClient().performRequest(putRoleOnRemoteClusterRequest));
+    }
+
+    private void setSourceData() throws IOException {
+        Request createIndex = new Request("PUT", "employees");
+        createIndex.setJsonEntity("""
+            {
+              "mappings": {
+                "properties": {
+                  "id": { "type": "integer" },
+                  "email": { "type": "text" },
+                  "designation": { "type": "text" }
+                }
+              }
+            }
+            """);
+        assertOK(client().performRequest(createIndex));
+        assertOK(performRequestAgainstFulfillingCluster(createIndex));
+
+        Request bulkRequest = new Request("POST", "/_bulk?refresh=true");
+        bulkRequest.setJsonEntity("""
+            { "index": { "_index": "employees" } }
+            { "id": 1, "email": "a@corp.co", "designation": "SDE intern"}
+            { "index": { "_index": "employees" } }
"_index": "employees" } } + { "id": 2, "email": "b@corp.co", "designation": "SDE 1"} + { "index": { "_index": "employees" } } + { "id": 3, "email": "c@corp.co", "designation": "SDE 2"} + { "index": { "_index": "employees" } } + { "id": 4, "email": "d@corp.co", "designation": "SSE"} + { "index": { "_index": "employees" } } + { "id": 5, "email": "e@corp.co", "designation": "PSE 1"} + { "index": { "_index": "employees" } } + { "id": 6, "email": "f@corp.co", "designation": "PSE 2"} + """); + assertOK(client().performRequest(bulkRequest)); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + + createIndex = new Request("PUT", "to-be-enriched"); + createIndex.setJsonEntity(""" + { + "mappings": { + "properties": { + "email": { "type": "text" } + } + } + } + """); + assertOK(client().performRequest(createIndex)); + assertOK(performRequestAgainstFulfillingCluster(createIndex)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "a@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "b@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "c@corp.co"} + """); + assertOK(client().performRequest(bulkRequest)); + + bulkRequest = new Request("POST", "/_bulk?refresh=true"); + bulkRequest.setJsonEntity(""" + { "index": { "_index": "to-be-enriched" } } + { "email": "d@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "e@corp.co"} + { "index": { "_index": "to-be-enriched" } } + { "email": "f@corp.co"} + """); + assertOK(performRequestAgainstFulfillingCluster(bulkRequest)); + } + + private Request createPolicy(String policyName, String matchIndex, String matchField, String[] enrichFields) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + body.startObject(); + body.startObject("match"); + body.field("indices", matchIndex); + body.field("match_field", matchField); + body.field("enrich_fields", enrichFields); + + body.endObject(); + body.endObject(); + + return makeRequest("PUT", "_enrich/policy/" + policyName, body); + } + + private Request executePolicy(String policyName) { + return new Request("PUT", "_enrich/policy/employees-policy/_execute"); + } + + private Request esqlRequest(String query) throws IOException { + XContentBuilder body = JsonXContent.contentBuilder(); + + body.startObject(); + body.field("query", query); + body.field("include_ccs_metadata", true); + body.endObject(); + + return makeRequest("POST", "_query", body); + } + + private Request makeRequest(String method, String endpoint, XContentBuilder requestBody) { + Request request = new Request(method, endpoint); + request.setJsonEntity(Strings.toString(requestBody)); + return request; + } +}