From a99dcf97046ec2a59c799d5f0ba22198b5f623b8 Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Wed, 2 May 2018 14:59:22 +0300 Subject: [PATCH 1/2] SQL: Fix bug caused by empty composites When dealing with filtering, a composite aggregation might return empty buckets (which have been filtered) which gets sent as is to the client. Unfortunately this interprets the response as no more data instead of retrying. This now has changed and the listener keeps retrying until either the query has ended or data passes the filter. Fix #30292 SQL: Fix bug related to HAVING filtering --- .../search/CompositeAggregationCursor.java | 41 +++++++++++++++---- .../xpack/sql/execution/search/Querier.java | 9 +++- .../xpack/qa/sql/security/JdbcSqlSpecIT.java | 2 - 3 files changed, 42 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java index fe9479f3c1aa4..810bbc4f8185f 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java @@ -113,12 +113,36 @@ public void nextPage(Configuration cfg, Client client, NamedWriteableRegistry re SearchRequest search = Querier.prepareRequest(client, query, cfg.pageTimeout(), indices); - client.search(search, ActionListener.wrap(r -> { - updateCompositeAfterKey(r, query); - CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, - serializeQuery(query), indices); - listener.onResponse(rowSet); - }, listener::onFailure)); + client.search(search, new ActionListener() { + @Override + public void onResponse(SearchResponse r) { + try { + // retry + if (shouldRetryDueToEmptyPage(r)) { + CompositeAggregationCursor.updateCompositeAfterKey(r, search.source()); + client.search(search, this); + return; + } + + updateCompositeAfterKey(r, query); + CompositeAggsRowSet rowSet = new CompositeAggsRowSet(extractors, r, limit, serializeQuery(query), indices); + listener.onResponse(rowSet); + } catch (Exception ex) { + listener.onFailure(ex); + } + } + + @Override + public void onFailure(Exception ex) { + listener.onFailure(ex); + } + }); + } + + static boolean shouldRetryDueToEmptyPage(SearchResponse response) { + CompositeAggregation composite = getComposite(response); + // if there are no buckets but a next page, go fetch it instead of sending an empty response to the client + return composite != null && composite.getBuckets().isEmpty() && composite.afterKey() != null && !composite.afterKey().isEmpty(); } static CompositeAggregation getComposite(SearchResponse response) { @@ -134,7 +158,7 @@ static CompositeAggregation getComposite(SearchResponse response) { throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass()); } - static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) { + static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) { CompositeAggregation composite = getComposite(r); if (composite == null) { @@ -152,7 +176,10 @@ static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) } else { throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder); } + + return true; } + return false; } /** diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java index 3c10f08c53a8d..62941a5b14f07 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/Querier.java @@ -206,8 +206,15 @@ static class CompositeActionListener extends BaseAggActionListener { protected void handleResponse(SearchResponse response, ActionListener listener) { // there are some results if (response.getAggregations().asList().size() > 0) { - CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); + // retry + if (CompositeAggregationCursor.shouldRetryDueToEmptyPage(response)) { + CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); + client.search(request, this); + return; + } + + CompositeAggregationCursor.updateCompositeAfterKey(response, request.source()); byte[] nextSearch = null; try { nextSearch = CompositeAggregationCursor.serializeQuery(request.source()); diff --git a/x-pack/qa/sql/security/src/test/java/org/elasticsearch/xpack/qa/sql/security/JdbcSqlSpecIT.java b/x-pack/qa/sql/security/src/test/java/org/elasticsearch/xpack/qa/sql/security/JdbcSqlSpecIT.java index caa0613595921..609847f513e3a 100644 --- a/x-pack/qa/sql/security/src/test/java/org/elasticsearch/xpack/qa/sql/security/JdbcSqlSpecIT.java +++ b/x-pack/qa/sql/security/src/test/java/org/elasticsearch/xpack/qa/sql/security/JdbcSqlSpecIT.java @@ -5,13 +5,11 @@ */ package org.elasticsearch.xpack.qa.sql.security; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.qa.sql.jdbc.SqlSpecTestCase; import java.util.Properties; -@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30292") public class JdbcSqlSpecIT extends SqlSpecTestCase { public JdbcSqlSpecIT(String fileName, String groupName, String testName, Integer lineNumber, String query) { super(fileName, groupName, testName, lineNumber, query); From fb1ce3fd15061fedf2d3b427f9feadf28235ed0d Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Wed, 2 May 2018 20:06:01 +0300 Subject: [PATCH 2/2] Eliminate some unneeded changes --- .../sql/execution/search/CompositeAggregationCursor.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java index 810bbc4f8185f..31d933f9f59d6 100644 --- a/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java +++ b/x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursor.java @@ -158,7 +158,7 @@ static CompositeAggregation getComposite(SearchResponse response) { throw new SqlIllegalArgumentException("Unrecognized root group found; {}", agg.getClass()); } - static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) { + static void updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder next) { CompositeAggregation composite = getComposite(r); if (composite == null) { @@ -176,10 +176,7 @@ static boolean updateCompositeAfterKey(SearchResponse r, SearchSourceBuilder nex } else { throw new SqlIllegalArgumentException("Invalid client request; expected a group-by but instead got {}", aggBuilder); } - - return true; } - return false; } /**