Skip to content

Commit 39e59b4

Browse files
authored
Extract a common base class for scroll executions (#24979)
Today there is a lot of code duplication and different handling of errors in the two different scroll modes. Yet, it's not clear if we will keep both of them, but this simplification will help to further refactor this code to also add cross-cluster search capabilities. This refactoring also fixes bugs where shards failed because a node dropped out of the cluster in between scroll requests, and failures during the fetch phase of the scroll. Both places were simply ignoring the failure and logging it at debug level. This can cause issues like #16555
1 parent a301bbb commit 39e59b4

File tree

6 files changed

+707
-350
lines changed

6 files changed

+707
-350
lines changed

core/src/main/java/org/elasticsearch/action/search/AbstractAsyncAction.java

Lines changed: 0 additions & 52 deletions
This file was deleted.
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.search;
21+
22+
import org.apache.logging.log4j.Logger;
23+
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.apache.logging.log4j.util.Supplier;
25+
import org.elasticsearch.action.ActionListener;
26+
import org.elasticsearch.cluster.node.DiscoveryNode;
27+
import org.elasticsearch.cluster.node.DiscoveryNodes;
28+
import org.elasticsearch.common.Nullable;
29+
import org.elasticsearch.common.util.concurrent.AtomicArray;
30+
import org.elasticsearch.common.util.concurrent.CountDown;
31+
import org.elasticsearch.search.SearchPhaseResult;
32+
import org.elasticsearch.search.SearchShardTarget;
33+
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
34+
import org.elasticsearch.search.internal.InternalSearchResponse;
35+
36+
import java.io.IOException;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.concurrent.atomic.AtomicInteger;
40+
41+
import static org.elasticsearch.action.search.TransportSearchHelper.internalScrollSearchRequest;
42+
43+
/**
 * Abstract base class for scroll execution modes. This class encapsulates the basic logic to
 * fan out to nodes and execute the query part of the scroll request. Subclasses can for instance
 * run separate fetch phases etc.
 */
abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements Runnable {
    /*
     * Some random TODO:
     * Today we still have a dedicated executing mode for scrolls while we could simplify this by implementing
     * scroll like functionality (mainly syntactic sugar) as an ordinary search with search_after. We could even go further and
     * make the scroll entirely stateless and encode the state per shard in the scroll ID.
     *
     * Today we also hold a context per shard but maybe
     * we want the context per coordinating node such that we route the scroll to the same coordinator all the time and hold the context
     * here? This would have the advantage that if we lose that node the entire scroll is dead, not just one shard.
     *
     * Additionally there is the possibility to associate the scroll with a seq. id. such that we can talk to any replica as long as
     * the shards engine hasn't advanced that seq. id yet. Such a resume is possible and best effort, it could be even a safety net since
     * if you rely on indices being read-only things can change in-between without notification or it's hard to detect if there were any
     * changes while scrolling. These are all options to improve the current situation which we can look into down the road
     */
    protected final Logger logger;
    // Listener notified with either the final merged SearchResponse or a terminal failure.
    protected final ActionListener<SearchResponse> listener;
    // Parsed scroll ID; its context holds one (node, search-context-id) entry per shard.
    protected final ParsedScrollId scrollId;
    // Current cluster nodes, used to resolve the node each scroll context lives on.
    protected final DiscoveryNodes nodes;
    protected final SearchPhaseController searchPhaseController;
    protected final SearchScrollRequest request;
    // Wall-clock start time captured at construction; used by buildTookInMillis().
    private final long startTime;
    // Accumulated per-shard failures; guarded by synchronized add/build methods below.
    private final List<ShardSearchFailure> shardFailures = new ArrayList<>();
    // Starts at the number of shard contexts and is decremented on each shard failure.
    private final AtomicInteger successfulOps;

    protected SearchScrollAsyncAction(ParsedScrollId scrollId, Logger logger, DiscoveryNodes nodes,
                                      ActionListener<SearchResponse> listener, SearchPhaseController searchPhaseController,
                                      SearchScrollRequest request) {
        this.startTime = System.currentTimeMillis();
        this.scrollId = scrollId;
        this.successfulOps = new AtomicInteger(scrollId.getContext().length);
        this.logger = logger;
        this.listener = listener;
        this.nodes = nodes;
        this.searchPhaseController = searchPhaseController;
        this.request = request;
    }

    /**
     * Builds how long it took to execute the search.
     */
    private long buildTookInMillis() {
        // protect ourselves against time going backwards
        // negative values don't make sense and we want to be able to serialize that thing as a vLong
        return Math.max(1, System.currentTimeMillis() - startTime);
    }

    /**
     * Fans the scroll request out to every shard context referenced by the scroll ID and runs the
     * initial (query) phase on each target node. Once all shards have responded or failed
     * (tracked via the {@link CountDown}), the action either moves to the next phase or, if every
     * shard failed, notifies the listener with a {@link SearchPhaseExecutionException}.
     */
    public final void run() {
        final ScrollIdForNode[] context = scrollId.getContext();
        if (context.length == 0) {
            listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", ShardSearchFailure.EMPTY_ARRAY));
            return;
        }
        final CountDown counter = new CountDown(scrollId.getContext().length);
        for (int i = 0; i < context.length; i++) {
            ScrollIdForNode target = context[i];
            DiscoveryNode node = nodes.get(target.getNode());
            final int shardIndex = i;
            if (node != null) { // it might happen that a node is going down in-between scrolls...
                InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(target.getScrollId(), request);
                // we can't create a SearchShardTarget here since we don't know the index and shard ID we are talking to
                // we only know the node and the search context ID. Yet, the response will contain the SearchShardTarget
                // from the target node instead...that's why we pass null here
                SearchActionListener<T> searchActionListener = new SearchActionListener<T>(null, shardIndex) {

                    @Override
                    protected void setSearchShardTarget(T response) {
                        // don't do this - it's part of the response...
                        assert response.getSearchShardTarget() != null : "search shard target must not be null";
                    }

                    @Override
                    protected void innerOnResponse(T result) {
                        assert shardIndex == result.getShardIndex() : "shard index mismatch: " + shardIndex + " but got: "
                            + result.getShardIndex();
                        onFirstPhaseResult(shardIndex, result);
                        if (counter.countDown()) {
                            SearchPhase phase = moveToNextPhase();
                            try {
                                phase.run();
                            } catch (Exception e) {
                                // we need to fail the entire request here - the entire phase just blew up
                                // don't call onShardFailure or onFailure here since otherwise we'd countDown the counter
                                // again which would result in an exception
                                listener.onFailure(new SearchPhaseExecutionException(phase.getName(), "Phase failed", e,
                                    ShardSearchFailure.EMPTY_ARRAY));
                            }
                        }
                    }

                    @Override
                    public void onFailure(Exception t) {
                        onShardFailure("query", shardIndex, counter, target.getScrollId(), t, null,
                            SearchScrollAsyncAction.this::moveToNextPhase);
                    }
                };
                executeInitialPhase(node, internalRequest, searchActionListener);
            } else { // the node is not available we treat this as a shard failure here
                onShardFailure("query", shardIndex, counter, target.getScrollId(),
                    new IllegalStateException("node [" + target.getNode() + "] is not available"), null,
                    SearchScrollAsyncAction.this::moveToNextPhase);
            }
        }
    }

    // Snapshot of the failures recorded so far; returns the shared empty array when there are none.
    synchronized ShardSearchFailure[] buildShardFailures() { // pkg private for testing
        if (shardFailures.isEmpty()) {
            return ShardSearchFailure.EMPTY_ARRAY;
        }
        return shardFailures.toArray(new ShardSearchFailure[shardFailures.size()]);
    }

    // we do our best to return the shard failures, but its ok if its not fully concurrently safe
    // we simply try and return as much as possible
    private synchronized void addShardFailure(ShardSearchFailure failure) {
        shardFailures.add(failure);
    }

    /**
     * Executes the initial (query) phase of the scroll against the given node.
     * Implementations must eventually invoke the given listener with the result or a failure.
     */
    protected abstract void executeInitialPhase(DiscoveryNode node, InternalScrollSearchRequest internalRequest,
                                                SearchActionListener<T> searchActionListener);

    /**
     * Returns the phase to run once all initial-phase responses (or failures) have been collected,
     * e.g. a fetch phase or a phase that directly sends the response.
     */
    protected abstract SearchPhase moveToNextPhase();

    /**
     * Invoked with each successful initial-phase result; {@code shardId} is the index of the shard
     * within the scroll ID's context array.
     */
    protected abstract void onFirstPhaseResult(int shardId, T result);

    /**
     * Convenience factory for a terminal phase that merges the given reduced query phase and fetch
     * results and sends the response to the listener via {@link #sendResponse}.
     */
    protected SearchPhase sendResponsePhase(SearchPhaseController.ReducedQueryPhase queryPhase,
                                            final AtomicArray<? extends SearchPhaseResult> fetchResults) {
        return new SearchPhase("fetch") {
            @Override
            public void run() throws IOException {
                sendResponse(queryPhase, fetchResults);
            }
        };
    }

    /**
     * Merges the reduced query phase with the fetch results and notifies the listener with the
     * final {@link SearchResponse}. Any merge failure is reported as a
     * {@link ReduceSearchPhaseException} carrying the shard failures collected so far.
     */
    protected final void sendResponse(SearchPhaseController.ReducedQueryPhase queryPhase,
                                      final AtomicArray<? extends SearchPhaseResult> fetchResults) {
        try {
            final InternalSearchResponse internalResponse = searchPhaseController.merge(true, queryPhase, fetchResults.asList(),
                fetchResults::get);
            // the scroll ID never changes we always return the same ID. This ID contains all the shards and their context ids
            // such that we can talk to them again in the next roundtrip.
            String scrollId = null;
            if (request.scroll() != null) {
                scrollId = request.scrollId();
            }
            listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.getContext().length, successfulOps.get(),
                buildTookInMillis(), buildShardFailures()));
        } catch (Exception e) {
            listener.onFailure(new ReduceSearchPhaseException("fetch", "inner finish failed", e, buildShardFailures()));
        }
    }

    /**
     * Records a per-shard failure and counts the shard down. When this was the last outstanding
     * shard: if no shard succeeded at all the whole request fails; otherwise execution proceeds
     * with the phase obtained from {@code nextPhaseSupplier}.
     *
     * @param phaseName         name of the phase the failure occurred in (used in logging/exceptions)
     * @param shardIndex        index of the failed shard in the scroll ID's context array
     * @param counter           count-down over all shards of this scroll round-trip
     * @param searchId          the search context ID of the failed shard
     * @param failure           the cause of the failure
     * @param searchShardTarget the shard target if known, otherwise {@code null}
     * @param nextPhaseSupplier supplies the phase to run if at least one shard succeeded
     */
    protected void onShardFailure(String phaseName, final int shardIndex, final CountDown counter, final long searchId, Exception failure,
                                  @Nullable SearchShardTarget searchShardTarget,
                                  Supplier<SearchPhase> nextPhaseSupplier) {
        if (logger.isDebugEnabled()) {
            logger.debug((Supplier<?>) () -> new ParameterizedMessage("[{}] Failed to execute {} phase", searchId, phaseName), failure);
        }
        addShardFailure(new ShardSearchFailure(failure, searchShardTarget));
        int successfulOperations = successfulOps.decrementAndGet();
        assert successfulOperations >= 0 : "successfulOperations must be >= 0 but was: " + successfulOperations;
        if (counter.countDown()) {
            if (successfulOps.get() == 0) {
                // every single shard failed - fail the whole request with the collected failures
                listener.onFailure(new SearchPhaseExecutionException(phaseName, "all shards failed", failure, buildShardFailures()));
            } else {
                SearchPhase phase = nextPhaseSupplier.get();
                try {
                    phase.run();
                } catch (Exception e) {
                    // preserve the original shard failure as suppressed so neither cause is lost
                    e.addSuppressed(failure);
                    listener.onFailure(new SearchPhaseExecutionException(phase.getName(), "Phase failed", e,
                        ShardSearchFailure.EMPTY_ARRAY));
                }
            }
        }
    }
}

0 commit comments

Comments
 (0)