Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.elasticsearch.core.bulk.UpdateOperation;
import co.elastic.clients.elasticsearch.core.mget.MultiGetOperation;
import co.elastic.clients.elasticsearch.core.msearch.MultisearchBody;
import co.elastic.clients.elasticsearch.core.search.Highlight;
import co.elastic.clients.elasticsearch.core.search.Rescore;
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
Expand Down Expand Up @@ -1047,26 +1048,120 @@ public <T> SearchRequest searchRequest(Query query, @Nullable Class<T> clazz, In
public MsearchRequest searchMsearchRequest(
List<ElasticsearchTemplate.MultiSearchQueryParameter> multiSearchQueryParameters) {

// basically the same stuff as in prepareSearchRequest, but the new Elasticsearch has different builders for a
// normal search and msearch
return MsearchRequest.of(mrb -> {
multiSearchQueryParameters.forEach(param -> {
ElasticsearchPersistentEntity<?> persistentEntity = getPersistentEntity(param.clazz);

var query = param.query;
mrb.searches(sb -> sb //
.header(h -> h //
.index(Arrays.asList(param.index.getIndexNames())) //
// todo #2156 add remaining flags for header
) //
.body(bb -> bb //
.query(getQuery(param.query, param.clazz))//
.seqNoPrimaryTerm(persistentEntity.hasSeqNoPrimaryTermProperty()).version(true)
// todo #2156 add remaining flags for body
.header(h -> {
h //
.index(Arrays.asList(param.index.getIndexNames())) //
.routing(query.getRoute()) //
.searchType(searchType(query.getSearchType())) //
.requestCache(query.getRequestCache()) //
;

if (query.getPreference() != null) {
h.preference(query.getPreference());
}

return h;
}) //
.body(bb -> {
bb //
.query(getQuery(query, param.clazz))//
.seqNoPrimaryTerm(persistentEntity != null ? persistentEntity.hasSeqNoPrimaryTermProperty() : null) //
.version(true) //
.trackScores(query.getTrackScores()) //
.source(getSourceConfig(query)) //
.timeout(timeStringMs(query.getTimeout())) //
;

if (query.getPageable().isPaged()) {
bb //
.from((int) query.getPageable().getOffset()) //
.size(query.getPageable().getPageSize());
}

if (!isEmpty(query.getFields())) {
bb.fields(fb -> {
query.getFields().forEach(fb::field);
return fb;
});
}

if (!isEmpty(query.getStoredFields())) {
bb.storedFields(query.getStoredFields());
}

if (query.isLimiting()) {
bb.size(query.getMaxResults());
}

if (query.getMinScore() > 0) {
bb.minScore((double) query.getMinScore());
}

if (query.getSort() != null) {
List<SortOptions> sortOptions = getSortOptions(query.getSort(), persistentEntity);

if (!sortOptions.isEmpty()) {
bb.sort(sortOptions);
}
}

addHighlight(query, bb);

if (query.getExplain()) {
bb.explain(true);
}

if (!isEmpty(query.getSearchAfter())) {
bb.searchAfter(query.getSearchAfter().stream().map(Object::toString).collect(Collectors.toList()));
}

query.getRescorerQueries().forEach(rescorerQuery -> {
bb.rescore(getRescore(rescorerQuery));
});

if (!query.getRuntimeFields().isEmpty()) {

Map<String, List<RuntimeField>> runtimeMappings = new HashMap<>();
query.getRuntimeFields().forEach(runtimeField -> {
runtimeMappings.put(runtimeField.getName(), Collections.singletonList(RuntimeField.of(rt -> rt //
.type(RuntimeFieldType._DESERIALIZER.parse(runtimeField.getType())) //
.script(s -> s.inline(is -> is.source(runtimeField.getScript()))))));
});
bb.runtimeMappings(runtimeMappings);
}

if (!isEmpty(query.getIndicesBoost())) {
Map<String, Double> boosts = new LinkedHashMap<>();
query.getIndicesBoost().forEach(indexBoost -> {
boosts.put(indexBoost.getIndexName(), (double) indexBoost.getBoost());
});
// noinspection unchecked
bb.indicesBoost(boosts);
}

if (query instanceof NativeQuery) {
prepareNativeSearch((NativeQuery) query, bb);
}
return bb;
} //
) //
);

});

return mrb;
});
}


private <T> void prepareSearchRequest(Query query, @Nullable Class<T> clazz, IndexCoordinates indexCoordinates,
SearchRequest.Builder builder, boolean forCount, boolean useScroll) {

Expand Down Expand Up @@ -1225,6 +1320,16 @@ private void addHighlight(Query query, SearchRequest.Builder builder) {
builder.highlight(highlight);
}

private void addHighlight(Query query, MultisearchBody.Builder builder) {

Highlight highlight = query.getHighlightQuery()
.map(highlightQuery -> new HighlightQueryBuilder(elasticsearchConverter.getMappingContext())
.getHighlight(highlightQuery.getHighlight(), highlightQuery.getType()))
.orElse(null);

builder.highlight(highlight);
}

private List<SortOptions> getSortOptions(Sort sort, @Nullable ElasticsearchPersistentEntity<?> persistentEntity) {
return sort.stream().map(order -> getSortOptions(order, persistentEntity)).collect(Collectors.toList());
}
Expand Down Expand Up @@ -1289,6 +1394,7 @@ private SortOptions getSortOptions(Sort.Order order, @Nullable ElasticsearchPers
}
}

@SuppressWarnings("DuplicatedCode")
private void prepareNativeSearch(NativeQuery query, SearchRequest.Builder builder) {

query.getScriptedFields().forEach(scriptedField -> {
Expand All @@ -1307,6 +1413,25 @@ private void prepareNativeSearch(NativeQuery query, SearchRequest.Builder builde
// todo #2150 searchExt, currently not supported by the new client
}

@SuppressWarnings("DuplicatedCode")
private void prepareNativeSearch(NativeQuery query, MultisearchBody.Builder builder) {

query.getScriptedFields().forEach(scriptedField -> {
builder.scriptFields(scriptedField.getFieldName(), sf -> sf.script(getScript(scriptedField.getScriptData())));
});

builder //
.suggest(query.getSuggester()) //
.collapse(query.getFieldCollapse()) //
;

if (!isEmpty(query.getAggregations())) {
builder.aggregations(query.getAggregations());
}

// todo #2150 searchExt, currently not supported by the new client
}

@Nullable
private co.elastic.clients.elasticsearch._types.query_dsl.Query getQuery(@Nullable Query query,
@Nullable Class<?> clazz) {
Expand Down