Skip to content

Commit 77a1afc

Browse files
authored
Fix expiration time in async search response (#55435)
This change ensures that we return the latest expiration time when retrieving the response from the index. This commit also fixes a bug that stops the garbage collection of saved responses if the async search index is deleted.
1 parent 9233438 commit 77a1afc

File tree

15 files changed

+218
-97
lines changed

15 files changed

+218
-97
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequest.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ public class GetAsyncSearchRequest implements Validatable {
3232
private TimeValue waitForCompletion;
3333
private TimeValue keepAlive;
3434

35-
public static final long MIN_KEEPALIVE = TimeValue.timeValueMinutes(1).millis();
36-
3735
private final String id;
3836

3937
public GetAsyncSearchRequest(String id) {
@@ -62,14 +60,7 @@ public void setKeepAlive(TimeValue keepAlive) {
6260

6361
@Override
6462
public Optional<ValidationException> validate() {
65-
final ValidationException validationException = new ValidationException();
66-
if (keepAlive != null && keepAlive.getMillis() < MIN_KEEPALIVE) {
67-
validationException.addValidationError("keep_alive must be greater than 1 minute, got: " + keepAlive.toString());
68-
}
69-
if (validationException.validationErrors().isEmpty()) {
70-
return Optional.empty();
71-
}
72-
return Optional.of(validationException);
63+
return Optional.empty();
7364
}
7465

7566
@Override

client/rest-high-level/src/main/java/org/elasticsearch/client/asyncsearch/SubmitAsyncSearchRequest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
*/
3737
public class SubmitAsyncSearchRequest implements Validatable {
3838

39-
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5;
40-
4139
public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();
4240

4341
private TimeValue waitForCompletionTimeout;

client/rest-high-level/src/test/java/org/elasticsearch/client/asyncsearch/GetAsyncSearchRequestTests.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/search/SearchShardTarget.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public SearchShardTarget(String nodeId, ShardId shardId, @Nullable String cluste
6363

6464
@Nullable
6565
public String getNodeId() {
66-
return nodeId.string();
66+
return nodeId != null ? nodeId.string() : null;
6767
}
6868

6969
public Text getNodeIdText() {

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1616
import org.elasticsearch.common.settings.ClusterSettings;
1717
import org.elasticsearch.common.settings.IndexScopedSettings;
18+
import org.elasticsearch.common.settings.Setting;
1819
import org.elasticsearch.common.settings.Settings;
1920
import org.elasticsearch.common.settings.SettingsFilter;
20-
import org.elasticsearch.common.unit.TimeValue;
2121
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2222
import org.elasticsearch.env.Environment;
2323
import org.elasticsearch.env.NodeEnvironment;
@@ -39,6 +39,8 @@
3939
import java.util.List;
4040
import java.util.function.Supplier;
4141

42+
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
43+
4244
public final class AsyncSearch extends Plugin implements ActionPlugin {
4345
private final Settings settings;
4446

@@ -84,11 +86,16 @@ public Collection<Object> createComponents(Client client,
8486
AsyncSearchIndexService indexService =
8587
new AsyncSearchIndexService(clusterService, threadPool.getThreadContext(), client, namedWriteableRegistry);
8688
AsyncSearchMaintenanceService maintenanceService =
87-
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), threadPool, indexService, TimeValue.timeValueHours(1));
89+
new AsyncSearchMaintenanceService(nodeEnvironment.nodeId(), settings, threadPool, indexService);
8890
clusterService.addListener(maintenanceService);
8991
return Collections.singletonList(maintenanceService);
9092
} else {
9193
return Collections.emptyList();
9294
}
9395
}
96+
97+
@Override
98+
public List<Setting<?>> getSettings() {
99+
return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING);
100+
}
94101
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,14 +265,16 @@ void getResponse(AsyncSearchId searchId,
265265
return;
266266
}
267267

268-
if (restoreResponseHeaders) {
268+
if (restoreResponseHeaders && get.getSource().containsKey(RESPONSE_HEADERS_FIELD)) {
269269
@SuppressWarnings("unchecked")
270270
Map<String, List<String>> responseHeaders = (Map<String, List<String>>) get.getSource().get(RESPONSE_HEADERS_FIELD);
271271
restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders);
272272
}
273273

274+
long expirationTime = (long) get.getSource().get(EXPIRATION_TIME_FIELD);
274275
String encoded = (String) get.getSource().get(RESULT_FIELD);
275-
listener.onResponse(encoded != null ? decodeResponse(encoded) : null);
276+
AsyncSearchResponse response = decodeResponse(encoded, expirationTime);
277+
listener.onResponse(encoded != null ? response : null);
276278
},
277279
listener::onFailure
278280
));
@@ -331,11 +333,11 @@ String encodeResponse(AsyncSearchResponse response) throws IOException {
331333
/**
332334
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
333335
*/
334-
AsyncSearchResponse decodeResponse(String value) throws IOException {
336+
AsyncSearchResponse decodeResponse(String value, long expirationTime) throws IOException {
335337
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
336338
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
337339
in.setVersion(Version.readVersion(in));
338-
return new AsyncSearchResponse(in);
340+
return new AsyncSearchResponse(in, expirationTime);
339341
}
340342
}
341343
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchMaintenanceService.java

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.elasticsearch.cluster.ClusterStateListener;
1515
import org.elasticsearch.cluster.routing.IndexRoutingTable;
1616
import org.elasticsearch.common.lease.Releasable;
17+
import org.elasticsearch.common.settings.Setting;
18+
import org.elasticsearch.common.settings.Settings;
1719
import org.elasticsearch.common.unit.TimeValue;
1820
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1921
import org.elasticsearch.gateway.GatewayService;
@@ -26,30 +28,40 @@
2628
import java.util.concurrent.atomic.AtomicBoolean;
2729

2830
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.EXPIRATION_TIME_FIELD;
31+
import static org.elasticsearch.xpack.search.AsyncSearchIndexService.INDEX;
2932

3033
/**
3134
* A service that runs a periodic cleanup over the async-search index.
3235
*/
3336
class AsyncSearchMaintenanceService implements Releasable, ClusterStateListener {
3437
private static final Logger logger = LogManager.getLogger(AsyncSearchMaintenanceService.class);
3538

39+
/**
40+
* Controls the interval at which the cleanup is scheduled.
41+
* Defaults to 1h. It is an undocumented/expert setting that
42+
* is mainly used by integration tests to make the garbage
43+
* collection of search responses more reactive.
44+
*/
45+
public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING =
46+
Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope);
47+
3648
private final String localNodeId;
3749
private final ThreadPool threadPool;
3850
private final AsyncSearchIndexService indexService;
3951
private final TimeValue delay;
4052

41-
private final AtomicBoolean isCleanupRunning = new AtomicBoolean(false);
53+
private boolean isCleanupRunning;
4254
private final AtomicBoolean isClosed = new AtomicBoolean(false);
4355
private volatile Scheduler.Cancellable cancellable;
4456

4557
AsyncSearchMaintenanceService(String localNodeId,
58+
Settings nodeSettings,
4659
ThreadPool threadPool,
47-
AsyncSearchIndexService indexService,
48-
TimeValue delay) {
60+
AsyncSearchIndexService indexService) {
4961
this.localNodeId = localNodeId;
5062
this.threadPool = threadPool;
5163
this.indexService = indexService;
52-
this.delay = delay;
64+
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
5365
}
5466

5567
@Override
@@ -62,39 +74,38 @@ public void clusterChanged(ClusterChangedEvent event) {
6274
tryStartCleanup(state);
6375
}
6476

65-
void tryStartCleanup(ClusterState state) {
77+
synchronized void tryStartCleanup(ClusterState state) {
6678
if (isClosed.get()) {
6779
return;
6880
}
6981
IndexRoutingTable indexRouting = state.routingTable().index(AsyncSearchIndexService.INDEX);
7082
if (indexRouting == null) {
71-
if (isCleanupRunning.compareAndSet(true, false)) {
72-
close();
73-
}
83+
stop();
7484
return;
7585
}
7686
String primaryNodeId = indexRouting.shard(0).primaryShard().currentNodeId();
7787
if (localNodeId.equals(primaryNodeId)) {
78-
if (isCleanupRunning.compareAndSet(false, true)) {
88+
if (isCleanupRunning == false) {
89+
isCleanupRunning = true;
7990
executeNextCleanup();
8091
}
81-
} else if (isCleanupRunning.compareAndSet(true, false)) {
82-
close();
92+
} else {
93+
stop();
8394
}
8495
}
8596

8697
synchronized void executeNextCleanup() {
87-
if (isClosed.get() == false && isCleanupRunning.get()) {
98+
if (isClosed.get() == false && isCleanupRunning) {
8899
long nowInMillis = System.currentTimeMillis();
89-
DeleteByQueryRequest toDelete = new DeleteByQueryRequest()
100+
DeleteByQueryRequest toDelete = new DeleteByQueryRequest(INDEX)
90101
.setQuery(QueryBuilders.rangeQuery(EXPIRATION_TIME_FIELD).lte(nowInMillis));
91102
indexService.getClient()
92103
.execute(DeleteByQueryAction.INSTANCE, toDelete, ActionListener.wrap(() -> scheduleNextCleanup()));
93104
}
94105
}
95106

96107
synchronized void scheduleNextCleanup() {
97-
if (isClosed.get() == false && isCleanupRunning.get()) {
108+
if (isClosed.get() == false && isCleanupRunning) {
98109
try {
99110
cancellable = threadPool.schedule(this::executeNextCleanup, delay, ThreadPool.Names.GENERIC);
100111
} catch (EsRejectedExecutionException e) {
@@ -107,11 +118,18 @@ synchronized void scheduleNextCleanup() {
107118
}
108119
}
109120

121+
synchronized void stop() {
122+
if (isCleanupRunning) {
123+
if (cancellable != null && cancellable.isCancelled() == false) {
124+
cancellable.cancel();
125+
}
126+
isCleanupRunning = false;
127+
}
128+
}
129+
110130
@Override
111131
public void close() {
112-
if (cancellable != null && cancellable.isCancelled() == false) {
113-
cancellable.cancel();
114-
}
132+
stop();
115133
isClosed.compareAndSet(false, true);
116134
}
117135
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ private void executeCompletionListeners() {
297297
*/
298298
private AsyncSearchResponse getResponse() {
299299
assert searchResponse.get() != null;
300+
checkCancellation();
300301
return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis);
301302
}
302303

@@ -306,15 +307,17 @@ private AsyncSearchResponse getResponse() {
306307
*/
307308
private AsyncSearchResponse getResponseWithHeaders() {
308309
assert searchResponse.get() != null;
310+
checkCancellation();
309311
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
310312
}
311313

312314

313315

314316
// checks if the search task should be cancelled
315-
private void checkCancellation() {
317+
private synchronized void checkCancellation() {
316318
long now = System.currentTimeMillis();
317-
if (expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
319+
if (hasCompleted == false &&
320+
expirationTimeMillis < now || checkSubmitCancellation.getAsBoolean()) {
318321
// we cancel the search task if the initial submit task was cancelled,
319322
// this is needed because the task cancellation mechanism doesn't
320323
// handle the cancellation of grand-children.

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
1010
import org.apache.logging.log4j.message.ParameterizedMessage;
11+
import org.elasticsearch.ExceptionsHelper;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.index.IndexResponse;
1314
import org.elasticsearch.action.search.SearchAction;
@@ -25,6 +26,7 @@
2526
import org.elasticsearch.common.unit.TimeValue;
2627
import org.elasticsearch.common.util.concurrent.ThreadContext;
2728
import org.elasticsearch.index.engine.DocumentMissingException;
29+
import org.elasticsearch.index.engine.VersionConflictEngineException;
2830
import org.elasticsearch.search.SearchService;
2931
import org.elasticsearch.search.aggregations.InternalAggregation;
3032
import org.elasticsearch.tasks.CancellableTask;
@@ -187,7 +189,9 @@ private void onFinalResponse(CancellableTask submitTask,
187189
store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(),response,
188190
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
189191
exc -> {
190-
if (exc.getCause() instanceof DocumentMissingException == false) {
192+
Throwable cause = ExceptionsHelper.unwrapCause(exc);
193+
if (cause instanceof DocumentMissingException == false &&
194+
cause instanceof VersionConflictEngineException == false) {
191195
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
192196
searchTask.getSearchId().getEncoded()), exc);
193197
}

0 commit comments

Comments
 (0)