Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
Expand Down Expand Up @@ -61,9 +62,9 @@ public class RollupResponseTranslator {
* Verifies a live-only search response. Essentially just checks for failure then returns
* the response since we have no work to do
*/
public static SearchResponse verifyResponse(MultiSearchResponse.Item normalResponse) {
public static SearchResponse verifyResponse(MultiSearchResponse.Item normalResponse) throws Exception {
if (normalResponse.isFailure()) {
throw new RuntimeException(normalResponse.getFailureMessage(), normalResponse.getFailure());
throw normalResponse.getFailure();
}
return normalResponse.getResponse();
}
Expand All @@ -77,16 +78,30 @@ public static SearchResponse verifyResponse(MultiSearchResponse.Item normalRespo
* on the translation conventions
*/
public static SearchResponse translateResponse(MultiSearchResponse.Item[] rolledMsearch,
InternalAggregation.ReduceContext reduceContext) {
InternalAggregation.ReduceContext reduceContext) throws Exception {

assert rolledMsearch.length > 0;
List<SearchResponse> responses = new ArrayList<>();
for (MultiSearchResponse.Item item : rolledMsearch) {
if (item.isFailure()) {
Exception e = item.getFailure();

// If an index was deleted after execution, give a hint to the user that this is a transient error
if (e instanceof IndexNotFoundException) {
throw new ResourceNotFoundException("Index [" + ((IndexNotFoundException) e).getIndex().getName()
+ "] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again.");
}

List<SearchResponse> responses = Arrays.stream(rolledMsearch)
.map(item -> {
if (item.isFailure()) {
throw new RuntimeException(item.getFailureMessage(), item.getFailure());
}
return item.getResponse();
}).collect(Collectors.toList());
// Otherwise just throw
throw e;
}

// No error, add to responses
responses.add(item.getResponse());
}

assert responses.size() > 0;
return doCombineResponse(null, responses, reduceContext);
}

Expand Down Expand Up @@ -187,48 +202,45 @@ public static SearchResponse translateResponse(MultiSearchResponse.Item[] rolled
* @param msearchResponses The responses from the msearch, where the first response is the live-index response
*/
public static SearchResponse combineResponses(MultiSearchResponse.Item[] msearchResponses,
InternalAggregation.ReduceContext reduceContext) {
boolean liveMissing = false;
InternalAggregation.ReduceContext reduceContext) throws Exception {

assert msearchResponses.length >= 2;

// The live response is always first
MultiSearchResponse.Item liveResponse = msearchResponses[0];
if (liveResponse.isFailure()) {
Exception e = liveResponse.getFailure();
// If we have a rollup response we can tolerate a missing live response
if (e instanceof IndexNotFoundException) {
logger.warn("\"Live\" index not found during rollup search.", e);
liveMissing = true;
boolean first = true;
SearchResponse liveResponse = null;
List<SearchResponse> rolledResponses = new ArrayList<>();
for (MultiSearchResponse.Item item : msearchResponses) {
if (item.isFailure()) {
Exception e = item.getFailure();

// If an index was deleted after execution, give a hint to the user that this is a transient error
if (e instanceof IndexNotFoundException) {
throw new ResourceNotFoundException("Index [" + ((IndexNotFoundException) e).getIndex() + "] was not found, " +
"likely because it was deleted while the request was in-flight. Rollup does not support partial search results, " +
"please try the request again.", e);
}

// Otherwise just throw
throw e;
}

// No error, add to responses
if (first) {
liveResponse = item.getResponse();
} else {
throw new RuntimeException(liveResponse.getFailureMessage(), liveResponse.getFailure());
rolledResponses.add(item.getResponse());
}
first = false;
}
List<SearchResponse> rolledResponses = Arrays.stream(msearchResponses)
.skip(1)
.map(item -> {
if (item.isFailure()) {
Exception e = item.getFailure();
// If we have a normal response we can tolerate a missing rollup response, although it theoretically
// should be handled by a different code path (verifyResponse)
if (e instanceof IndexNotFoundException) {
logger.warn("Rollup index not found during rollup search.", e);
} else {
throw new RuntimeException(item.getFailureMessage(), item.getFailure());
}
return null;
} else {
return item.getResponse();
}
}).filter(Objects::nonNull).collect(Collectors.toList());

// If we only have a live index left, process it directly
if (rolledResponses.isEmpty() && liveMissing == false) {
return verifyResponse(liveResponse);
} else if (rolledResponses.isEmpty() && liveMissing) {
throw new RuntimeException("No indices (live or rollup) found during rollup search");
// If we only have a live index left, just return it directly. We know it can't be an error already
if (rolledResponses.isEmpty() && liveResponse != null) {
return liveResponse;
} else if (rolledResponses.isEmpty()) {
throw new ResourceNotFoundException("No indices (live or rollup) found during rollup search");
}

return doCombineResponse(liveResponse.getResponse(), rolledResponses, reduceContext);
return doCombineResponse(liveResponse, rolledResponses, reduceContext);
}

private static SearchResponse doCombineResponse(SearchResponse liveResponse, List<SearchResponse> rolledResponses,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
}

static SearchResponse processResponses(RollupSearchContext rollupContext, MultiSearchResponse msearchResponse,
InternalAggregation.ReduceContext reduceContext) {
InternalAggregation.ReduceContext reduceContext) throws Exception {
if (rollupContext.hasLiveIndices() && rollupContext.hasRollupIndices()) {
// Both
return RollupResponseTranslator.combineResponses(msearchResponse.getResponses(), reduceContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.CheckedConsumer;
Expand Down Expand Up @@ -109,14 +110,13 @@ public void testLiveFailure() {

public void testRollupFailure() {
MultiSearchResponse.Item[] failure = new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new IndexNotFoundException("live missing")),
new MultiSearchResponse.Item(null, new RuntimeException("rollup failure"))};

BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);

Exception e = expectThrows(RuntimeException.class,
() -> RollupResponseTranslator.combineResponses(failure,
() -> RollupResponseTranslator.translateResponse(failure,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("rollup failure"));
}
Expand All @@ -129,13 +129,14 @@ public void testLiveMissingRollupMissing() {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);

Exception e = expectThrows(RuntimeException.class,
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
() -> RollupResponseTranslator.combineResponses(failure,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("No indices (live or rollup) found during rollup search"));
assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again."));
}

public void testMissingLiveIndex() {
public void testMissingLiveIndex() throws Exception {
SearchResponse responseWithout = mock(SearchResponse.class);
when(responseWithout.getTook()).thenReturn(new TimeValue(100));
List<InternalAggregation> aggTree = new ArrayList<>(1);
Expand Down Expand Up @@ -174,30 +175,26 @@ public void testMissingLiveIndex() {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);

SearchResponse response = RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
assertNotNull(response);
Aggregations responseAggs = response.getAggregations();
assertNotNull(responseAggs);
Avg avg = responseAggs.get("foo");
assertThat(avg.getValue(), equalTo(5.0));
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again."));
}

public void testRolledMissingAggs() {
public void testRolledMissingAggs() throws Exception {
SearchResponse responseWithout = mock(SearchResponse.class);
when(responseWithout.getTook()).thenReturn(new TimeValue(100));

Aggregations mockAggsWithout = new InternalAggregations(Collections.emptyList());
when(responseWithout.getAggregations()).thenReturn(mockAggsWithout);

MultiSearchResponse.Item[] msearch = new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(null, new IndexNotFoundException("foo")),
new MultiSearchResponse.Item(responseWithout, null)};

BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);

SearchResponse response = RollupResponseTranslator.combineResponses(msearch,
SearchResponse response = RollupResponseTranslator.translateResponse(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
assertNotNull(response);
Aggregations responseAggs = response.getAggregations();
Expand All @@ -214,12 +211,13 @@ public void testMissingRolledIndex() {
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
ScriptService scriptService = mock(ScriptService.class);

SearchResponse finalResponse = RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true));
assertThat(finalResponse, equalTo(response));
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, () -> RollupResponseTranslator.combineResponses(msearch,
new InternalAggregation.ReduceContext(bigArrays, scriptService, true)));
assertThat(e.getMessage(), equalTo("Index [[foo]] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again."));
}

public void testVerifyNormal() {
public void testVerifyNormal() throws Exception {
SearchResponse response = mock(SearchResponse.class);
MultiSearchResponse.Item item = new MultiSearchResponse.Item(response, null);

Expand All @@ -234,7 +232,7 @@ public void testVerifyMissingNormal() {
assertThat(e.getMessage(), equalTo("no such index [foo]"));
}

public void testTranslateRollup() {
public void testTranslateRollup() throws Exception {
SearchResponse response = mock(SearchResponse.class);
when(response.getTook()).thenReturn(new TimeValue(100));
List<InternalAggregation> aggTree = new ArrayList<>(1);
Expand Down Expand Up @@ -285,9 +283,10 @@ public void testTranslateMissingRollup() {
ScriptService scriptService = mock(ScriptService.class);
InternalAggregation.ReduceContext context = new InternalAggregation.ReduceContext(bigArrays, scriptService, true);

Exception e = expectThrows(RuntimeException.class,
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
() -> RollupResponseTranslator.translateResponse(new MultiSearchResponse.Item[]{missing}, context));
assertThat(e.getMessage(), equalTo("no such index [foo]"));
assertThat(e.getMessage(), equalTo("Index [foo] was not found, likely because it was deleted while the request was in-flight. " +
"Rollup does not support partial search results, please try the request again."));
}

public void testMissingFilter() {
Expand Down Expand Up @@ -350,7 +349,7 @@ public void testMatchingNameNotFilter() {
equalTo("Expected [filter_foo] to be a FilterAggregation, but was [InternalMax]"));
}

public void testSimpleReduction() {
public void testSimpleReduction() throws Exception {
SearchResponse protoResponse = mock(SearchResponse.class);
when(protoResponse.getTook()).thenReturn(new TimeValue(100));
List<InternalAggregation> protoAggTree = new ArrayList<>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ public void testMatchingIndexInMetadata() throws IOException {
assertThat(result.getJobCaps().size(), equalTo(1));
}

public void testLiveOnlyProcess() {
public void testLiveOnlyProcess() throws Exception {
String[] indices = new String[]{"foo"};
IndexMetaData indexMetaData = mock(IndexMetaData.class);
ImmutableOpenMap.Builder<String, IndexMetaData> meta = ImmutableOpenMap.builder(1);
Expand All @@ -601,7 +601,7 @@ public void testLiveOnlyProcess() {
assertThat(r, equalTo(response));
}

public void testRollupOnly() throws IOException {
public void testRollupOnly() throws Exception {
String[] indices = new String[]{"foo"};

String jobName = randomAlphaOfLength(5);
Expand Down Expand Up @@ -701,7 +701,7 @@ public void testEmptyMsearch() {
assertThat(e.getMessage(), equalTo("MSearch response was empty, cannot unroll RollupSearch results"));
}

public void testBoth() throws IOException {
public void testBoth() throws Exception {
String[] indices = new String[]{"foo", "bar"};

String jobName = randomAlphaOfLength(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1217,4 +1217,49 @@ setup:
- match: { aggregations.date_histogram#histo.buckets.3.doc_count: 20 }
- match: { aggregations.date_histogram#histo.buckets.3.max#the_max.value: 4 }

---
"Search error against live index":

- do:
catch: bad_request
rollup.rollup_search:
index: "foo"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "asdfasdf"


---
"Search error against rollup and live index":

- do:
catch: bad_request
rollup.rollup_search:
index: "foo*"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "asdfasdf"

---
"Search error no matching indices":

- do:
catch: /Must specify at least one concrete index/
rollup.rollup_search:
index: "bar*"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
interval: "1h"