diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java index fe9479f3c1aa4..31d933f9f59d6 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java @@ -113,12 +113,36 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re SearchRequest search = Querier.prepareRequest(client, query, cfg.pageTimeout(), indices); - client.search(search, ActionListener.wrap(r -> { - updateCompositeAfterKey(r, query); - CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, - serializeQuery(query), indices); - listener.onResponse(rowSet); - }, listener::onFailure)); + client.search(search, new ActionListener() { + @Override + public void onResponse(SearchResponse r) { + try { + // retry + if (shouldRetryDueToEmptyPage(r)) { + CompositeAggregationCursor.updateCompositeAfterKey(r, search.source()); + client.search(search, this); + return; + } + + updateCompositeAfterKey(r, query); + CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, serializeQuery(query), indices); + listener.onResponse(rowSet); + } catch (Exception ex) { + listener.onFailure(ex); + } + } + + @Override + public void onFailure(Exception ex) { + listener.onFailure(ex); + } + }); + } + + static boolean shouldRetryDueToEmptyPage(SearchResponse response) { + CompositeAggregation composite = getComposite(response); + // if there are no buckets but a next page, go fetch it instead of sending an empty response to the client + return composite != null && composite.getBuckets().isEmpty() && composite.afterKey() != null && !composite.afterKey().isEmpty(); } static CompositeAggregation getComposite(SearchResponse response) { diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index 3c10f08c53a8d..62941a5b14f07 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -206,8 +206,15 @@ static class CompositeActionListener extends BaseAggActionListener { protected void handleResponse(SearchResponse response, ActionListener listener) { // there are some results if (response.getAggregations().asList().size() > 0) { - CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); + // retry + if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { + CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); + client.search(request, this); + return; + } + + CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); byte[] nextSearch = null; try { nextSearch = CompositeAggregationCursor.serializeQuery(request.source()); diff --git a/x-pack/qa/sql/security/src/test/java/org/elasticsearch/xpack/qa/sql/security/JdbcSqlSpecIT.java b/x-pack/qa/sql/security/src/test/java/org/elasticsearch/xpack/qa/sql/security/JdbcSqlSpecIT.java index caa0613595921..609847f513e3a 100644 --- a/x-pack/qa/sql/security/src/test/java/org/elasticsearch/xpack/qa/sql/security/JdbcSqlSpecIT.java +++ b/x-pack/qa/sql/security/src/test/java/org/elasticsearch/xpack/qa/sql/security/JdbcSqlSpecIT.java @@ -5,13 +5,11 @@ */ package org.elasticsearch.xpack.qa.sql.security; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.qa.sql.jdbc.SqlSpecTestCase; import java.util.Properties; -@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30292") public class JdbcSqlSpecIT extends SqlSpecTestCase { public JdbcSqlSpecIT(String fileName, String groupName, String testName, Integer lineNumber, String query) { super(fileName, groupName, testName, lineNumber, query);