Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/116348.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116348
summary: "ESQL: Honor skip_unavailable setting for nonmatching indices errors at planning time"
area: ES|QL
type: enhancement
issues: [ 114531 ]

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,37 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public final class IndexResolution {

public static IndexResolution valid(EsIndex index, Map<String, FieldCapabilitiesFailure> unavailableClusters) {
/**
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
* @param unavailableClusters Remote clusters that could not be contacted during planning
* @return valid IndexResolution
*/
public static IndexResolution valid(
EsIndex index,
Set<String> resolvedIndices,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
Objects.requireNonNull(index, "index must not be null if it was found");
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
return new IndexResolution(index, null, unavailableClusters);
return new IndexResolution(index, null, resolvedIndices, unavailableClusters);
}

/**
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
*/
public static IndexResolution valid(EsIndex index) {
return valid(index, Collections.emptyMap());
return valid(index, index.concreteIndices(), Collections.emptyMap());
}

public static IndexResolution invalid(String invalid) {
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
return new IndexResolution(null, invalid, Collections.emptyMap());
return new IndexResolution(null, invalid, Collections.emptySet(), Collections.emptyMap());
}

public static IndexResolution notFound(String name) {
Expand All @@ -39,12 +54,20 @@ public static IndexResolution notFound(String name) {
@Nullable
private final String invalid;

// all indices found by field-caps
private final Set<String> resolvedIndices;
// remote clusters included in the user's index expression that could not be connected to
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;

private IndexResolution(EsIndex index, @Nullable String invalid, Map<String, FieldCapabilitiesFailure> unavailableClusters) {
private IndexResolution(
EsIndex index,
@Nullable String invalid,
Set<String> resolvedIndices,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
this.index = index;
this.invalid = invalid;
this.resolvedIndices = resolvedIndices;
this.unavailableClusters = unavailableClusters;
}

Expand All @@ -64,8 +87,8 @@ public EsIndex get() {
}

/**
* Is the index valid for use with ql? Returns {@code false} if the
* index wasn't found.
* Is the index valid for use with ql?
* @return {@code false} if the index wasn't found.
*/
public boolean isValid() {
return invalid == null;
Expand All @@ -75,10 +98,17 @@ public boolean isValid() {
* @return Map of unavailable clusters (could not be connected to during field-caps query). Key of map is cluster alias,
* value is the {@link FieldCapabilitiesFailure} describing the issue.
*/
public Map<String, FieldCapabilitiesFailure> getUnavailableClusters() {
public Map<String, FieldCapabilitiesFailure> unavailableClusters() {
return unavailableClusters;
}

/**
* @return all indices found by field-caps (regardless of whether they had any mappings)
*/
public Set<String> resolvedIndices() {
return resolvedIndices;
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
Expand All @@ -87,16 +117,29 @@ public boolean equals(Object obj) {
IndexResolution other = (IndexResolution) obj;
return Objects.equals(index, other.index)
&& Objects.equals(invalid, other.invalid)
&& Objects.equals(resolvedIndices, other.resolvedIndices)
&& Objects.equals(unavailableClusters, other.unavailableClusters);
}

@Override
public int hashCode() {
return Objects.hash(index, invalid, unavailableClusters);
return Objects.hash(index, invalid, resolvedIndices, unavailableClusters);
}

@Override
public String toString() {
return invalid != null ? invalid : index.name();
return invalid != null
? invalid
: "IndexResolution{"
+ "index="
+ index
+ ", invalid='"
+ invalid
+ '\''
+ ", resolvedIndices="
+ resolvedIndices
+ ", unavailableClusters="
+ unavailableClusters
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,17 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn
if (execInfo.isCrossClusterSearch()) {
assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
for (String clusterAlias : execInfo.clusterAliases()) {
// took time and shard counts for SKIPPED clusters were added at end of planning, so only update other cases here
if (execInfo.getCluster(clusterAlias).getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
execInfo.swapCluster(
clusterAlias,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.build()
);
}
execInfo.swapCluster(clusterAlias, (k, v) -> {
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0);
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
}
return builder.build();
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private <T> void preAnalyze(
// resolution to updateExecutionInfo
if (indexResolution.isValid()) {
EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
import org.elasticsearch.xpack.esql.index.IndexResolution;
Expand All @@ -33,7 +34,6 @@ class EsqlSessionCCSUtils {

private EsqlSessionCCSUtils() {}

// visible for testing
static Map<String, FieldCapabilitiesFailure> determineUnavailableRemoteClusters(List<FieldCapabilitiesFailure> failures) {
Map<String, FieldCapabilitiesFailure> unavailableRemotes = new HashMap<>();
for (FieldCapabilitiesFailure failure : failures) {
Expand Down Expand Up @@ -75,10 +75,10 @@ public void onFailure(Exception e) {

/**
* Whether to return an empty result (HTTP status 200) for a CCS rather than a top level 4xx/5xx error.
*
* <p>
* For cases where field-caps had no indices to search and the remotes were unavailable, we
* return an empty successful response (200) if all remotes are marked with skip_unavailable=true.
*
* <p>
* Note: a follow-on PR will expand this logic to handle cases where no indices could be found to match
* on any of the requested clusters.
*/
Expand Down Expand Up @@ -132,7 +132,6 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn
}
}

// visible for testing
static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
StringBuilder sb = new StringBuilder();
for (String clusterAlias : executionInfo.clusterAliases()) {
Expand Down Expand Up @@ -181,39 +180,91 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf
}
}

// visible for testing
static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) {
Set<String> clustersWithResolvedIndices = new HashSet<>();
// determine missing clusters
for (String indexName : indexResolution.get().indexNameWithModes().keySet()) {
for (String indexName : indexResolution.resolvedIndices()) {
clustersWithResolvedIndices.add(RemoteClusterAware.parseClusterAlias(indexName));
}
Set<String> clustersRequested = executionInfo.clusterAliases();
Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters().keySet());
clustersWithNoMatchingIndices.removeAll(indexResolution.unavailableClusters().keySet());

/**
* Rules enforced at planning time around non-matching indices
* P1. fail query if no matching indices on any cluster (VerificationException) - that is handled elsewhere (TODO: document where)
* P2. fail query if a skip_unavailable:false cluster has no matching indices (the local cluster already has this rule
* enforced at planning time)
* P3. fail query if the local cluster has no matching indices and a concrete index was specified
*/
String fatalErrorMessage = null;
/*
* These are clusters in the original request that are not present in the field-caps response. They were
* specified with an index or indices that do not exist, so the search on that cluster is done.
* specified with an index expression matched no indices, so the search on that cluster is done.
* Mark it as SKIPPED with 0 shards searched and took=0.
*/
for (String c : clustersWithNoMatchingIndices) {
// TODO: in a follow-on PR, throw a Verification(400 status code) for local and remotes with skip_unavailable=false if
// they were requested with one or more concrete indices
// for now we never mark the local cluster as SKIPPED
final var status = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(c)
? EsqlExecutionInfo.Cluster.Status.SUCCESSFUL
: EsqlExecutionInfo.Cluster.Status.SKIPPED;
executionInfo.swapCluster(
c,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status)
.setTook(new TimeValue(0))
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0)
.build()
);
final String indexExpression = executionInfo.getCluster(c).getIndexExpression();
if (missingIndicesIsFatal(c, executionInfo)) {
String error = Strings.format(
"Unknown index [%s]",
(c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression)
);
if (fatalErrorMessage == null) {
fatalErrorMessage = error;
} else {
fatalErrorMessage += "; " + error;
}
} else {
// handles local cluster (when no concrete indices requested) and skip_unavailable=true clusters
EsqlExecutionInfo.Cluster.Status status;
ShardSearchFailure failure;
if (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
status = EsqlExecutionInfo.Cluster.Status.SUCCESSFUL;
failure = null;
} else {
status = EsqlExecutionInfo.Cluster.Status.SKIPPED;
failure = new ShardSearchFailure(new VerificationException("Unknown index [" + indexExpression + "]"));
}
executionInfo.swapCluster(c, (k, v) -> {
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status)
.setTook(new TimeValue(0))
.setTotalShards(0)
.setSuccessfulShards(0)
.setSkippedShards(0)
.setFailedShards(0);
if (failure != null) {
builder.setFailures(List.of(failure));
}
return builder.build();
});
}
}
if (fatalErrorMessage != null) {
throw new VerificationException(fatalErrorMessage);
}
}

// visible for testing
static boolean missingIndicesIsFatal(String clusterAlias, EsqlExecutionInfo executionInfo) {
// missing indices on local cluster is fatal only if a concrete index requested
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
return concreteIndexRequested(executionInfo.getCluster(clusterAlias).getIndexExpression());
}
return executionInfo.getCluster(clusterAlias).isSkipUnavailable() == false;
}

private static boolean concreteIndexRequested(String indexExpression) {
for (String expr : indexExpression.split(",")) {
if (expr.charAt(0) == '<' || expr.startsWith("-<")) {
// skip date math expressions
continue;
}
if (expr.indexOf('*') < 0) {
return true;
}
}
return false;
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
Expand Down Expand Up @@ -143,21 +144,24 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
fields.put(name, field);
}

Map<String, FieldCapabilitiesFailure> unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(
fieldCapsResponse.getFailures()
);

Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
}

boolean allEmpty = true;
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
allEmpty &= ir.get().isEmpty();
}
if (allEmpty) {
// If all the mappings are empty we return an empty set of resolved indices to line up with QL
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, Map.of()));
}

Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, Map.of()), concreteIndices.keySet(), unavailableRemotes);
}
EsIndex esIndex = new EsIndex(indexPattern, rootFields, concreteIndices);
return IndexResolution.valid(esIndex, EsqlSessionCCSUtils.determineUnavailableRemoteClusters(fieldCapsResponse.getFailures()));
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), concreteIndices.keySet(), unavailableRemotes);
}

private boolean allNested(List<IndexFieldCapabilities> caps) {
Expand Down
Loading