Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
import org.elasticsearch.search.internal.InternalSearchResponse;

import java.io.IOException;
import java.util.HashMap;
Expand All @@ -42,11 +43,11 @@
*/
final class ExpandSearchPhase extends SearchPhase {
private final SearchPhaseContext context;
private final SearchResponse searchResponse;
private final Function<SearchResponse, SearchPhase> nextPhaseFactory;
private final InternalSearchResponse searchResponse;
private final Function<InternalSearchResponse, SearchPhase> nextPhaseFactory;

ExpandSearchPhase(SearchPhaseContext context, SearchResponse searchResponse,
Function<SearchResponse, SearchPhase> nextPhaseFactory) {
ExpandSearchPhase(SearchPhaseContext context, InternalSearchResponse searchResponse,
Function<InternalSearchResponse, SearchPhase> nextPhaseFactory) {
super("expand");
this.context = context;
this.searchResponse = searchResponse;
Expand All @@ -65,15 +66,15 @@ private boolean isCollapseRequest() {

@Override
public void run() throws IOException {
if (isCollapseRequest() && searchResponse.getHits().getHits().length > 0) {
if (isCollapseRequest() && searchResponse.hits().getHits().length > 0) {
SearchRequest searchRequest = context.getRequest();
CollapseBuilder collapseBuilder = searchRequest.source().collapse();
final List<InnerHitBuilder> innerHitBuilders = collapseBuilder.getInnerHits();
MultiSearchRequest multiRequest = new MultiSearchRequest();
if (collapseBuilder.getMaxConcurrentGroupRequests() > 0) {
multiRequest.maxConcurrentSearchRequests(collapseBuilder.getMaxConcurrentGroupRequests());
}
for (SearchHit hit : searchResponse.getHits()) {
for (SearchHit hit : searchResponse.hits().getHits()) {
BoolQueryBuilder groupQuery = new BoolQueryBuilder();
Object collapseValue = hit.field(collapseBuilder.getField()).getValue();
if (collapseValue != null) {
Expand All @@ -97,7 +98,7 @@ public void run() throws IOException {
context.getSearchTransport().sendExecuteMultiSearch(multiRequest, context.getTask(),
ActionListener.wrap(response -> {
Iterator<MultiSearchResponse.Item> it = response.iterator();
for (SearchHit hit : searchResponse.getHits()) {
for (SearchHit hit : searchResponse.hits.getHits()) {
for (InnerHitBuilder innerHitBuilder : innerHitBuilders) {
MultiSearchResponse.Item item = it.next();
if (item.isFailure()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.function.BiFunction;

/**
* This search phase merges the query results from the previous phase together and calculates the topN hits for this search.
Expand All @@ -46,7 +46,7 @@ final class FetchSearchPhase extends SearchPhase {
private final AtomicArray<FetchSearchResult> fetchResults;
private final SearchPhaseController searchPhaseController;
private final AtomicArray<SearchPhaseResult> queryResults;
private final Function<SearchResponse, SearchPhase> nextPhaseFactory;
private final BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory;
private final SearchPhaseContext context;
private final Logger logger;
private final InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer;
Expand All @@ -55,13 +55,13 @@ final class FetchSearchPhase extends SearchPhase {
SearchPhaseController searchPhaseController,
SearchPhaseContext context) {
this(resultConsumer, searchPhaseController, context,
(response) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
(finalResponse) -> sendResponsePhase(finalResponse, context)));
(response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
(finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));
}

FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
SearchPhaseController searchPhaseController,
SearchPhaseContext context, Function<SearchResponse, SearchPhase> nextPhaseFactory) {
SearchPhaseContext context, BiFunction<InternalSearchResponse, String, SearchPhase> nextPhaseFactory) {
super("fetch");
if (context.getNumShards() != resultConsumer.getNumShards()) {
throw new IllegalStateException("number of shards must match the length of the query results but doesn't:"
Expand Down Expand Up @@ -205,14 +205,14 @@ private void moveToNextPhase(SearchPhaseController searchPhaseController,
AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
context.executeNextPhase(this, nextPhaseFactory.apply(context.buildSearchResponse(internalResponse, scrollId)));
context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));
}

private static SearchPhase sendResponsePhase(SearchResponse response, SearchPhaseContext context) {
private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
return new SearchPhase("response") {
@Override
public void run() throws IOException {
context.onResponse(response);
context.onResponse(context.buildSearchResponse(response, scrollId));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,12 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))},
1, 1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, (r) ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(r);
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
}
}
);
Expand All @@ -123,7 +122,6 @@ public void run() throws IOException {
mockSearchPhaseContext.assertNoFailure();
assertNotNull(reference.get());
SearchResponse theResponse = reference.get();
assertSame(theResponse, response);
assertEquals(numInnerHits, theResponse.getHits().getHits()[0].getInnerHits().size());

for (int innerHitNum = 0; innerHitNum < numInnerHits; innerHitNum++) {
Expand Down Expand Up @@ -167,13 +165,12 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(collapseValue))))}, 1,
1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(r);
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
}
}
);
Expand Down Expand Up @@ -201,13 +198,12 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL
new SearchHit(2, "ID2", new Text("type"),
Collections.singletonMap("someField", new SearchHitField("someField", Collections.singletonList(null))))}, 1, 1.0F);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(r);
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
}
}
);
Expand All @@ -232,13 +228,12 @@ void sendExecuteMultiSearch(MultiSearchRequest request, SearchTask task, ActionL

SearchHits hits = new SearchHits(new SearchHit[0], 1, 1.0f);
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(hits, null, null, null, false, null, 1);
SearchResponse response = mockSearchPhaseContext.buildSearchResponse(internalSearchResponse, null);
AtomicReference<SearchResponse> reference = new AtomicReference<>();
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, response, r ->
ExpandSearchPhase phase = new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, r ->
new SearchPhase("test") {
@Override
public void run() throws IOException {
reference.set(r);
reference.set(mockSearchPhaseContext.buildSearchResponse(r, null));
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -66,10 +67,10 @@ public void testShortcutQueryAndFetchOptimization() throws IOException {
}

FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
}
});
assertEquals("fetch", phase.getName());
Expand Down Expand Up @@ -119,10 +120,10 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe
};
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
}
});
assertEquals("fetch", phase.getName());
Expand Down Expand Up @@ -173,10 +174,10 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe
};
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
}
});
assertEquals("fetch", phase.getName());
Expand Down Expand Up @@ -224,10 +225,10 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe
mockSearchPhaseContext.searchTransport = searchTransportService;
CountDownLatch latch = new CountDownLatch(1);
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
latch.countDown();
}
});
Expand Down Expand Up @@ -290,10 +291,10 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe
};
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
}
});
assertEquals("fetch", phase.getName());
Expand Down Expand Up @@ -339,10 +340,10 @@ public void sendExecuteFetch(Transport.Connection connection, ShardFetchSearchRe
};
mockSearchPhaseContext.searchTransport = searchTransportService;
FetchSearchPhase phase = new FetchSearchPhase(results, controller, mockSearchPhaseContext,
(searchResponse) -> new SearchPhase("test") {
(searchResponse, scrollId) -> new SearchPhase("test") {
@Override
public void run() throws IOException {
responseRef.set(searchResponse);
responseRef.set(mockSearchPhaseContext.buildSearchResponse(searchResponse, null));
}
});
assertEquals("fetch", phase.getName());
Expand All @@ -357,5 +358,4 @@ public void run() throws IOException {
assertEquals(1, mockSearchPhaseContext.releasedSearchContexts.size());
assertTrue(mockSearchPhaseContext.releasedSearchContexts.contains(123L));
}

}