Skip to content

Commit f3f1b69

Browse files
authored
Preserve final response headers in asynchronous search (#54349)
This change adds the response headers of the original search request in the stored response in order to be able to restore them when retrieving a result from the async-search index. It also ensures that response headers are preserved for users that retrieve a final response on a running search task. Partial response can eventually return response headers too but this change only ensures that they are present when the response if final. Relates #33936
1 parent 9e24558 commit f3f1b69

File tree

12 files changed

+360
-34
lines changed

12 files changed

+360
-34
lines changed

x-pack/plugin/async-search/build.gradle

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,3 @@
1-
/*
2-
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3-
* or more contributor license agreements. Licensed under the Elastic License;
4-
* you may not use this file except in compliance with the Elastic License.
5-
*/
6-
71
evaluationDependsOn(xpackModule('core'))
82

93
apply plugin: 'elasticsearch.esplugin'
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import org.elasticsearch.gradle.info.BuildParams
2+
3+
apply plugin: 'elasticsearch.testclusters'
4+
apply plugin: 'elasticsearch.esplugin'
5+
6+
esplugin {
7+
description 'Deprecated query plugin'
8+
classname 'org.elasticsearch.query.DeprecatedQueryPlugin'
9+
}
10+
11+
restResources {
12+
restApi {
13+
includeCore '_common', 'indices', 'index'
14+
includeXpack 'async_search'
15+
}
16+
}
17+
18+
testClusters.integTest {
19+
testDistribution = 'DEFAULT'
20+
// add the deprecated query plugin
21+
plugin file(project(':x-pack:plugin:async-search:qa:rest').tasks.bundlePlugin.archiveFile)
22+
setting 'xpack.security.enabled', 'false'
23+
}
24+
25+
test.enabled = false
26+
27+
check.dependsOn integTest
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.query;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.lucene.search.MatchAllDocsQuery;
11+
import org.apache.lucene.search.Query;
12+
import org.elasticsearch.common.ParsingException;
13+
import org.elasticsearch.common.io.stream.StreamInput;
14+
import org.elasticsearch.common.io.stream.StreamOutput;
15+
import org.elasticsearch.common.logging.DeprecationLogger;
16+
import org.elasticsearch.common.xcontent.ObjectParser;
17+
import org.elasticsearch.common.xcontent.XContentBuilder;
18+
import org.elasticsearch.common.xcontent.XContentParser;
19+
import org.elasticsearch.index.query.AbstractQueryBuilder;
20+
import org.elasticsearch.index.query.QueryShardContext;
21+
22+
import java.io.IOException;
23+
24+
public class DeprecatedQueryBuilder extends AbstractQueryBuilder<DeprecatedQueryBuilder> {
25+
private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger("Deprecated"));
26+
27+
public static final String NAME = "deprecated";
28+
29+
public DeprecatedQueryBuilder() {}
30+
31+
DeprecatedQueryBuilder(StreamInput in) throws IOException {
32+
super(in);
33+
}
34+
35+
@Override
36+
protected void doWriteTo(StreamOutput out) {}
37+
38+
@Override
39+
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
40+
builder.startObject(NAME);
41+
builder.endObject();
42+
}
43+
44+
private static final ObjectParser<DeprecatedQueryBuilder, Void> PARSER = new ObjectParser<>(NAME, DeprecatedQueryBuilder::new);
45+
46+
public static DeprecatedQueryBuilder fromXContent(XContentParser parser) {
47+
try {
48+
PARSER.apply(parser, null);
49+
return new DeprecatedQueryBuilder();
50+
} catch (IllegalArgumentException e) {
51+
throw new ParsingException(parser.getTokenLocation(), e.getMessage(), e);
52+
}
53+
}
54+
55+
@Override
56+
protected Query doToQuery(QueryShardContext context) {
57+
deprecationLogger.deprecated("[deprecated] query");
58+
return new MatchAllDocsQuery();
59+
}
60+
61+
@Override
62+
protected boolean doEquals(DeprecatedQueryBuilder other) {
63+
return false;
64+
}
65+
66+
@Override
67+
protected int doHashCode() {
68+
return 0;
69+
}
70+
71+
@Override
72+
public String getWriteableName() {
73+
return NAME;
74+
}
75+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.query;
8+
9+
import org.elasticsearch.plugins.Plugin;
10+
import org.elasticsearch.plugins.SearchPlugin;
11+
12+
import java.util.List;
13+
14+
import static java.util.Collections.singletonList;
15+
16+
public class DeprecatedQueryPlugin extends Plugin implements SearchPlugin {
17+
18+
public DeprecatedQueryPlugin() {}
19+
20+
@Override
21+
public List<QuerySpec<?>> getQueries() {
22+
return singletonList(new QuerySpec<>("deprecated", DeprecatedQueryBuilder::new, p -> DeprecatedQueryBuilder.fromXContent(p)));
23+
}
24+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.qa;
8+
9+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
10+
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
11+
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
12+
13+
public class AsyncSearchRestIT extends ESClientYamlSuiteTestCase {
14+
15+
public AsyncSearchRestIT(final ClientYamlTestCandidate testCandidate) {
16+
super(testCandidate);
17+
}
18+
19+
@ParametersFactory
20+
public static Iterable<Object[]> parameters() throws Exception {
21+
return ESClientYamlSuiteTestCase.createParameters();
22+
}
23+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
setup:
2+
- do:
3+
indices.create:
4+
index: test-1
5+
body:
6+
settings:
7+
number_of_shards: "2"
8+
9+
- do:
10+
indices.create:
11+
index: test-2
12+
body:
13+
settings:
14+
number_of_shards: "1"
15+
16+
- do:
17+
indices.create:
18+
index: test-3
19+
body:
20+
settings:
21+
number_of_shards: "3"
22+
23+
- do:
24+
index:
25+
index: test-2
26+
body: { max: 2 }
27+
28+
- do:
29+
index:
30+
index: test-1
31+
body: { max: 1 }
32+
33+
- do:
34+
index:
35+
index: test-3
36+
body: { max: 3 }
37+
38+
- do:
39+
indices.refresh: {}
40+
41+
---
42+
"Deprecation when retrieved from task":
43+
- skip:
44+
features: "warnings"
45+
46+
- do:
47+
warnings:
48+
- '[deprecated] query'
49+
async_search.submit:
50+
index: test-*
51+
wait_for_completion_timeout: 10s
52+
body:
53+
query:
54+
deprecated: {}
55+
56+
- is_false: id
57+
- match: { is_partial: false }
58+
- length: { response.hits.hits: 3 }
59+
60+
---
61+
"Deprecation when retrieved from store":
62+
- skip:
63+
features: "warnings"
64+
65+
- do:
66+
warnings:
67+
- '[deprecated] query'
68+
async_search.submit:
69+
index: test-*
70+
wait_for_completion_timeout: 10s
71+
keep_on_completion: true
72+
body:
73+
query:
74+
deprecated: {}
75+
76+
- set: { id: id }
77+
- match: { is_partial: false }
78+
- length: { response.hits.hits: 3 }
79+
80+
- do:
81+
warnings:
82+
- '[deprecated] query'
83+
async_search.get:
84+
id: "$id"
85+
86+
- match: { is_partial: false }
87+
- length: { response.hits.hits: 3 }
88+
89+
- do:
90+
async_search.delete:
91+
id: "$id"
92+
93+
- match: { acknowledged: true }
94+
95+
- do:
96+
catch: missing
97+
async_search.get:
98+
id: "$id"
99+
100+
- do:
101+
catch: missing
102+
async_search.delete:
103+
id: "$id"
104+

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.Base64;
4848
import java.util.Collections;
4949
import java.util.HashMap;
50+
import java.util.List;
5051
import java.util.Map;
5152

5253
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -63,6 +64,7 @@ class AsyncSearchIndexService {
6364
public static final String INDEX = ".async-search";
6465

6566
public static final String HEADERS_FIELD = "headers";
67+
public static final String RESPONSE_HEADERS_FIELD = "response_headers";
6668
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
6769
public static final String RESULT_FIELD = "result";
6870

@@ -86,6 +88,10 @@ private static XContentBuilder mappings() throws IOException {
8688
.field("type", "object")
8789
.field("enabled", "false")
8890
.endObject()
91+
.startObject(RESPONSE_HEADERS_FIELD)
92+
.field("type", "object")
93+
.field("enabled", "false")
94+
.endObject()
8995
.startObject(RESULT_FIELD)
9096
.field("type", "object")
9197
.field("enabled", "false")
@@ -172,9 +178,11 @@ void storeInitialResponse(String docId,
172178
* Stores the final response if the place-holder document is still present (update).
173179
*/
174180
void storeFinalResponse(String docId,
181+
Map<String, List<String>> responseHeaders,
175182
AsyncSearchResponse response,
176183
ActionListener<UpdateResponse> listener) throws IOException {
177184
Map<String, Object> source = new HashMap<>();
185+
source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
178186
source.put(RESULT_FIELD, encodeResponse(response));
179187
UpdateRequest request = new UpdateRequest()
180188
.index(INDEX)
@@ -249,8 +257,11 @@ AsyncSearchTask getTask(TaskManager taskManager, AsyncSearchId searchId) throws
249257
/**
250258
* Gets the response from the index if present, or delegate a {@link ResourceNotFoundException}
251259
* failure to the provided listener if not.
260+
* When the provided <code>restoreResponseHeaders</code> is <code>true</code>, this method also restores the
261+
* response headers of the original request in the current thread context.
252262
*/
253263
void getResponse(AsyncSearchId searchId,
264+
boolean restoreResponseHeaders,
254265
ActionListener<AsyncSearchResponse> listener) {
255266
final Authentication current = securityContext.getAuthentication();
256267
GetRequest internalGet = new GetRequest(INDEX)
@@ -271,6 +282,12 @@ void getResponse(AsyncSearchId searchId,
271282
return;
272283
}
273284

285+
if (restoreResponseHeaders) {
286+
@SuppressWarnings("unchecked")
287+
Map<String, List<String>> responseHeaders = (Map<String, List<String>>) get.getSource().get(RESPONSE_HEADERS_FIELD);
288+
restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders);
289+
}
290+
274291
String encoded = (String) get.getSource().get(RESULT_FIELD);
275292
listener.onResponse(encoded != null ? decodeResponse(encoded) : null);
276293
},
@@ -339,4 +356,15 @@ AsyncSearchResponse decodeResponse(String value) throws IOException {
339356
}
340357
}
341358
}
359+
360+
/**
361+
* Restores the provided <code>responseHeaders</code> to the current thread context.
362+
*/
363+
static void restoreResponseHeadersContext(ThreadContext threadContext, Map<String, List<String>> responseHeaders) {
364+
for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
365+
for (String value : entry.getValue()) {
366+
threadContext.addResponseHeader(entry.getKey(), value);
367+
}
368+
}
369+
}
342370
}

0 commit comments

Comments
 (0)