From 81c221d478fe4658d8722f4208dac7c17744f7b7 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 10 Aug 2018 17:46:43 -0400 Subject: [PATCH 1/3] [Rollup] Return empty response when aggs are missing If a search request doesn't contain aggs (or an empty agg object), we should just retun an empty response. This is how the normal search API works if you specify zero hits and empty aggs. The existing behavior throws an exception because it tries to send an empty msearch. Closes #32256 --- .../rollup/RollupResponseTranslator.java | 38 +++++++++++++++++-- .../action/TransportRollupSearchAction.java | 17 ++++++--- .../RollupResponseTranslationTests.java | 9 +++-- .../rollup/action/SearchActionTests.java | 13 ++++--- .../test/rollup/rollup_search.yml | 14 +++++++ 5 files changed, 72 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java index 4042e98ef93fb..3f0ddd2f88e35 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java @@ -238,11 +238,41 @@ private static SearchResponse doCombineResponse(SearchResponse liveResponse, Lis ? (InternalAggregations)liveResponse.getAggregations() : InternalAggregations.EMPTY; - rolledResponses.forEach(r -> { - if (r == null || r.getAggregations() == null || r.getAggregations().asList().size() == 0) { - throw new RuntimeException("Expected to find aggregations in rollup response, but none found."); + int missingRollupAggs = rolledResponses.stream().mapToInt(searchResponse -> { + if (searchResponse == null + || searchResponse.getAggregations() == null + || searchResponse.getAggregations().asList().size() == 0) { + return 1; } - }); + return 0; + }).sum(); + + // We had no rollup aggs, so there is nothing to process + if (missingRollupAggs == rolledResponses.size()) { + // If we had a live response (even if it was empty) just return that + if (liveResponse != null) { + return liveResponse; + } else { + // Otherwise we just had rollup, so build an empty response + InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), + InternalAggregations.EMPTY, null, null, + rolledResponses.stream().anyMatch(SearchResponse::isTimedOut), + rolledResponses.stream().anyMatch(SearchResponse::isTimedOut), + rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum()); + + int totalShards = rolledResponses.stream().mapToInt(SearchResponse::getTotalShards).sum(); + int sucessfulShards = rolledResponses.stream().mapToInt(SearchResponse::getSuccessfulShards).sum(); + int skippedShards = rolledResponses.stream().mapToInt(SearchResponse::getSkippedShards).sum(); + long took = rolledResponses.stream().mapToLong(r -> r.getTook().getMillis()).sum() ; + + // Shard failures are ignored atm, so returning an empty array is fine + return new SearchResponse(combinedInternal, null, totalShards, sucessfulShards, skippedShards, + took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters()); + } + } else if (missingRollupAggs > 0 && missingRollupAggs != rolledResponses.size()) { + // We were missing some but not all the aggs, unclear how to handle this. Bail. + throw new RuntimeException("Expected to find aggregations in rollup response, but none found."); + } // The combination process returns a tree that is identical to the non-rolled // which means we can use aggregation's reduce method to combine, just as if diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java index c63ab96fa2595..ea0319c34328b 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java @@ -155,6 +155,18 @@ static MultiSearchRequest createMSearchRequest(SearchRequest request, NamedWrite rolledSearchSource.size(0); AggregatorFactories.Builder sourceAgg = request.source().aggregations(); + // If there are no aggs in the request, our translation won't create any msearch. + // So just add an dummy request to the msearch and return. This is a bit silly + // but maintains how the regular search API behaves + if (sourceAgg == null || sourceAgg.count() == 0) { + + // Note: we can't apply any query rewriting or filtering on the query because there + // are no validated caps, so we have no idea what job is intended here. The only thing + // this affects is doc count, since hits and aggs will both be empty it doesn't really matter. + msearch.add(new SearchRequest(context.getRollupIndices(), request.source()).types(request.types())); + return msearch; + } + // Find our list of "best" job caps Set validatedCaps = new HashSet<>(); sourceAgg.getAggregatorFactories() @@ -248,11 +260,6 @@ static void validateSearchRequest(SearchRequest request) { if (request.source().explain() != null && request.source().explain()) { throw new IllegalArgumentException("Rollup search does not support explaining."); } - - // Rollup is only useful if aggregations are set, throw an exception otherwise - if (request.source().aggregations() == null) { - throw new IllegalArgumentException("Rollup requires at least one aggregation to be set."); - } } static QueryBuilder rewriteQuery(QueryBuilder builder, Set jobCaps) { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java index 35d9f0d133a3d..73a4d0665c4e1 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java @@ -198,10 +198,11 @@ public void testRolledMissingAggs() { BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); ScriptService scriptService = mock(ScriptService.class); - Exception e = expectThrows(RuntimeException.class, - () -> RollupResponseTranslator.combineResponses(msearch, - new InternalAggregation.ReduceContext(bigArrays, scriptService, true))); - assertThat(e.getMessage(), equalTo("Expected to find aggregations in rollup response, but none found.")); + SearchResponse response = RollupResponseTranslator.combineResponses(msearch, + new InternalAggregation.ReduceContext(bigArrays, scriptService, true)); + assertNotNull(response); + Aggregations responseAggs = response.getAggregations(); + assertThat(responseAggs.asList().size(), equalTo(0)); } public void testMissingRolledIndex() { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java index 069e23e4093de..3cc6190db30d5 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java @@ -307,21 +307,22 @@ public void testExplain() { assertThat(e.getMessage(), equalTo("Rollup search does not support explaining.")); } - public void testNoAgg() { - String[] normalIndices = new String[]{randomAlphaOfLength(10)}; + public void testNoRollupAgg() { + String[] normalIndices = new String[]{}; String[] rollupIndices = new String[]{randomAlphaOfLength(10)}; TransportRollupSearchAction.RollupSearchContext ctx = new TransportRollupSearchAction.RollupSearchContext(normalIndices, rollupIndices, Collections.emptySet()); SearchSourceBuilder source = new SearchSourceBuilder(); source.query(new MatchAllQueryBuilder()); source.size(0); - SearchRequest request = new SearchRequest(normalIndices, source); + SearchRequest request = new SearchRequest(rollupIndices, source); NamedWriteableRegistry registry = mock(NamedWriteableRegistry.class); - Exception e = expectThrows(IllegalArgumentException.class, - () -> TransportRollupSearchAction.createMSearchRequest(request, registry, ctx)); - assertThat(e.getMessage(), equalTo("Rollup requires at least one aggregation to be set.")); + MultiSearchRequest msearch = TransportRollupSearchAction.createMSearchRequest(request, registry, ctx); + assertThat(msearch.requests().size(), equalTo(1)); + assertThat(msearch.requests().get(0), equalTo(request)); } + public void testNoLiveNoRollup() { String[] normalIndices = new String[0]; String[] rollupIndices = new String[0]; diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml index d401d5c69bacb..e2f1174665ea6 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/rollup_search.yml @@ -152,6 +152,20 @@ setup: - match: { aggregations.histo.buckets.3.key_as_string: "2017-01-01T08:00:00.000Z" } - match: { aggregations.histo.buckets.3.doc_count: 20 } +--- +"Empty aggregation": + + - do: + xpack.rollup.rollup_search: + index: "foo_rollup" + body: + size: 0 + aggs: {} + + - length: { hits.hits: 0 } + - match: { hits.total: 0 } + - is_false: aggregations + --- "Search with Metric": From f25d427956ea80da582195eeb9b552f5a42de0ad Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 13 Aug 2018 12:51:47 -0400 Subject: [PATCH 2/3] Tweak response to include shard, failure, timeout, etc stats Also noticed a bug where isTimedOut was used for isTerminatedEarly --- .../rollup/RollupResponseTranslator.java | 46 ++++++++----------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java index 3f0ddd2f88e35..a38adf5d9de3a 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java @@ -249,26 +249,8 @@ private static SearchResponse doCombineResponse(SearchResponse liveResponse, Lis // We had no rollup aggs, so there is nothing to process if (missingRollupAggs == rolledResponses.size()) { - // If we had a live response (even if it was empty) just return that - if (liveResponse != null) { - return liveResponse; - } else { - // Otherwise we just had rollup, so build an empty response - InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), - InternalAggregations.EMPTY, null, null, - rolledResponses.stream().anyMatch(SearchResponse::isTimedOut), - rolledResponses.stream().anyMatch(SearchResponse::isTimedOut), - rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum()); - - int totalShards = rolledResponses.stream().mapToInt(SearchResponse::getTotalShards).sum(); - int sucessfulShards = rolledResponses.stream().mapToInt(SearchResponse::getSuccessfulShards).sum(); - int skippedShards = rolledResponses.stream().mapToInt(SearchResponse::getSkippedShards).sum(); - long took = rolledResponses.stream().mapToLong(r -> r.getTook().getMillis()).sum() ; - - // Shard failures are ignored atm, so returning an empty array is fine - return new SearchResponse(combinedInternal, null, totalShards, sucessfulShards, skippedShards, - took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters()); - } + // Return an empty response, but make sure we include all the shard, failure, etc stats + return mergeFinalResponse(liveResponse, rolledResponses, InternalAggregations.EMPTY); } else if (missingRollupAggs > 0 && missingRollupAggs != rolledResponses.size()) { // We were missing some but not all the aggs, unclear how to handle this. Bail. throw new RuntimeException("Expected to find aggregations in rollup response, but none found."); @@ -305,27 +287,39 @@ private static SearchResponse doCombineResponse(SearchResponse liveResponse, Lis new InternalAggregation.ReduceContext(reduceContext.bigArrays(), reduceContext.scriptService(), true)); } - // TODO allow profiling in the future - InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), currentTree, null, null, - rolledResponses.stream().anyMatch(SearchResponse::isTimedOut), - rolledResponses.stream().anyMatch(SearchResponse::isTimedOut), - rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum()); + return mergeFinalResponse(liveResponse, rolledResponses, currentTree); + } + + private static SearchResponse mergeFinalResponse(SearchResponse liveResponse, List rolledResponses, + InternalAggregations aggs) { int totalShards = rolledResponses.stream().mapToInt(SearchResponse::getTotalShards).sum(); int sucessfulShards = rolledResponses.stream().mapToInt(SearchResponse::getSuccessfulShards).sum(); int skippedShards = rolledResponses.stream().mapToInt(SearchResponse::getSkippedShards).sum(); long took = rolledResponses.stream().mapToLong(r -> r.getTook().getMillis()).sum() ; + boolean isTimedOut = rolledResponses.stream().anyMatch(SearchResponse::isTimedOut); + boolean isTerminatedEarly = rolledResponses.stream() + .filter(r -> r.isTerminatedEarly() != null) + .anyMatch(SearchResponse::isTerminatedEarly); + int numReducePhases = rolledResponses.stream().mapToInt(SearchResponse::getNumReducePhases).sum(); + if (liveResponse != null) { totalShards += liveResponse.getTotalShards(); sucessfulShards += liveResponse.getSuccessfulShards(); skippedShards += liveResponse.getSkippedShards(); took = Math.max(took, liveResponse.getTook().getMillis()); + isTimedOut = isTimedOut && liveResponse.isTimedOut(); + isTerminatedEarly = isTerminatedEarly && liveResponse.isTerminatedEarly(); + numReducePhases += liveResponse.getNumReducePhases(); } + InternalSearchResponse combinedInternal = new InternalSearchResponse(SearchHits.empty(), aggs, null, null, + isTimedOut, isTerminatedEarly, numReducePhases); + // Shard failures are ignored atm, so returning an empty array is fine return new SearchResponse(combinedInternal, null, totalShards, sucessfulShards, skippedShards, - took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters()); + took, ShardSearchFailure.EMPTY_ARRAY, rolledResponses.get(0).getClusters()); } /** From 81991fb0a8d50f736e4a1880f42d2ab095dc3440 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Tue, 14 Aug 2018 12:34:44 -0400 Subject: [PATCH 3/3] Tweak doc tests --- x-pack/docs/en/rest-api/rollup/rollup-search.asciidoc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/docs/en/rest-api/rollup/rollup-search.asciidoc b/x-pack/docs/en/rest-api/rollup/rollup-search.asciidoc index 470cbc4eaf57d..2fb5aea2a8e6a 100644 --- a/x-pack/docs/en/rest-api/rollup/rollup-search.asciidoc +++ b/x-pack/docs/en/rest-api/rollup/rollup-search.asciidoc @@ -101,6 +101,7 @@ GET /sensor_rollup/_rollup_search -------------------------------------------------- // CONSOLE // TEST[setup:sensor_prefab_data] +// TEST[s/_rollup_search/_rollup_search?filter_path=took,timed_out,terminated_early,_shards,hits,aggregations/] The query is targeting the `sensor_rollup` data, since this contains the rollup data as configured in the job. A `max` aggregation has been used on the `temperature` field, yielding the following response: @@ -194,6 +195,7 @@ GET sensor-1,sensor_rollup/_rollup_search <1> -------------------------------------------------- // CONSOLE // TEST[continued] +// TEST[s/_rollup_search/_rollup_search?filter_path=took,timed_out,terminated_early,_shards,hits,aggregations/] <1> Note the URI now searches `sensor-1` and `sensor_rollup` at the same time When the search is executed, the Rollup Search endpoint will do two things: