Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ final class DefaultSearchContext extends SearchContext {
private final QuerySearchResult queryResult;
private final FetchSearchResult fetchResult;
private final float queryBoost;
private final boolean lowLevelCancellation;
private TimeValue timeout;
// terminate after count
private int terminateAfter = DEFAULT_TERMINATE_AFTER;
Expand All @@ -118,7 +119,6 @@ final class DefaultSearchContext extends SearchContext {
private int trackTotalHitsUpTo = SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO;
private FieldDoc searchAfter;
private CollapseContext collapse;
private boolean lowLevelCancellation;
// filter for sliced scroll
private SliceBuilder sliceBuilder;
private SearchShardTask task;
Expand Down Expand Up @@ -156,7 +156,7 @@ final class DefaultSearchContext extends SearchContext {
DefaultSearchContext(SearchContextId id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
IndexShard indexShard, BigArrays bigArrays, LongSupplier relativeTimeSupplier, TimeValue timeout,
FetchPhase fetchPhase) throws IOException {
FetchPhase fetchPhase, boolean lowLevelCancellation) throws IOException {
this.id = id;
this.request = request;
this.fetchPhase = fetchPhase;
Expand All @@ -172,12 +172,13 @@ final class DefaultSearchContext extends SearchContext {
this.indexService = indexService;
this.clusterService = clusterService;
this.searcher = new ContextIndexSearcher(engineSearcher.getIndexReader(), engineSearcher.getSimilarity(),
engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy());
engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy(), lowLevelCancellation);
this.relativeTimeSupplier = relativeTimeSupplier;
this.timeout = timeout;
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher,
request::nowInMillis, shardTarget.getClusterAlias());
queryBoost = request.indexBoost();
this.lowLevelCancellation = lowLevelCancellation;
}

@Override
Expand Down Expand Up @@ -563,10 +564,6 @@ public boolean lowLevelCancellation() {
return lowLevelCancellation;
}

public void lowLevelCancellation(boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}

@Override
public FieldDoc searchAfter() {
return searchAfter;
Expand Down
16 changes: 7 additions & 9 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,9 @@ public void executeDfsPhase(ShardSearchRequest request, SearchShardTask task, Ac
}

private DfsSearchResult executeDfsPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws IOException {
final SearchContext context = createAndPutContext(rewriteContext);
final SearchContext context = createAndPutContext(rewriteContext, task);
context.incRef();
try {
context.setTask(task);
contextProcessing(context);
dfsPhase.execute(context);
contextProcessedSuccessfully(context);
Expand Down Expand Up @@ -422,11 +421,10 @@ private <T> void runAsync(SearchContextId contextId, Supplier<T> executable, Act
}

private SearchPhaseResult executeQueryPhase(SearchRewriteContext rewriteContext, SearchShardTask task) throws Exception {
final SearchContext context = createAndPutContext(rewriteContext);
final SearchContext context = createAndPutContext(rewriteContext, task);
final ShardSearchRequest request = rewriteContext.request;
context.incRef();
try {
context.setTask(task);
final long afterQueryTime;
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
contextProcessing(context);
Expand Down Expand Up @@ -626,8 +624,8 @@ private SearchContext findContext(SearchContextId contextId, TransportRequest re
}
}

final SearchContext createAndPutContext(SearchRewriteContext rewriteContext) throws IOException {
SearchContext context = createContext(rewriteContext);
final SearchContext createAndPutContext(SearchRewriteContext rewriteContext, SearchShardTask task) throws IOException {
SearchContext context = createContext(rewriteContext, task);
onNewContext(context);
boolean success = false;
try {
Expand Down Expand Up @@ -660,7 +658,7 @@ private void onNewContext(SearchContext context) {
}
}

final SearchContext createContext(SearchRewriteContext rewriteContext) throws IOException {
final SearchContext createContext(SearchRewriteContext rewriteContext, SearchShardTask searchTask) throws IOException {
final DefaultSearchContext context = createSearchContext(rewriteContext, defaultSearchTimeout);
try {
final ShardSearchRequest request = rewriteContext.request;
Expand All @@ -684,6 +682,7 @@ final SearchContext createContext(SearchRewriteContext rewriteContext) throws IO
if (context.size() == -1) {
context.size(DEFAULT_SIZE);
}
context.setTask(searchTask);

// pre process
dfsPhase.preProcess(context);
Expand All @@ -696,7 +695,6 @@ final SearchContext createContext(SearchRewriteContext rewriteContext) throws IO
keepAlive = request.scroll().keepAlive().millis();
}
contextScrollKeepAlive(context, keepAlive);
context.lowLevelCancellation(lowLevelCancellation);
} catch (Exception e) {
context.close();
throw e;
Expand Down Expand Up @@ -731,7 +729,7 @@ private DefaultSearchContext createSearchContext(SearchRewriteContext rewriteCon
DefaultSearchContext searchContext = new DefaultSearchContext(
new SearchContextId(readerId, idGenerator.incrementAndGet()),
request, shardTarget, searcher, clusterService, indexService, indexShard, bigArrays,
threadPool::relativeTimeInMillis, timeout, fetchPhase);
threadPool::relativeTimeInMillis, timeout, fetchPhase, lowLevelCancellation);
success = true;
return searchContext;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,20 @@ public class ContextIndexSearcher extends IndexSearcher {
private MutableQueryTimeout cancellable;

public ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy) throws IOException {
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout());
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
boolean wrapWithExitableDirectoryReader) throws IOException {
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout(), wrapWithExitableDirectoryReader);
}

// TODO: Make the 2nd constructor private so that the IndexReader is always wrapped.
// Some issues must be fixed:
// - regarding tests deriving from AggregatorTestCase and more specifically the use of searchAndReduce and
// the ShardSearcher sub-searchers.
// - tests that use a MultiReader
public ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
MutableQueryTimeout cancellable) throws IOException {
super(cancellable != null ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader);
private ContextIndexSearcher(IndexReader reader, Similarity similarity,
QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
MutableQueryTimeout cancellable,
boolean wrapWithExitableDirectoryReader) throws IOException {
super(wrapWithExitableDirectoryReader ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader);
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
this.cancellable = cancellable != null ? cancellable : new MutableQueryTimeout();
this.cancellable = cancellable;
}

public void setProfiler(QueryProfiler profiler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public int getDocCount() {

private static class ExitableIntersectVisitor implements PointValues.IntersectVisitor {

private static final int MAX_CALLS_BEFORE_QUERY_TIMEOUT_CHECK = (1 << 10) - 1; // 1023
private static final int MAX_CALLS_BEFORE_QUERY_TIMEOUT_CHECK = (1 << 13) - 1; // 8191

private final PointValues.IntersectVisitor in;
private final QueryCancellation queryCancellation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ public QueryPhase() {
public void preProcess(SearchContext context) {
final Runnable cancellation;
if (context.lowLevelCancellation()) {
SearchShardTask task = context.getTask();
cancellation = context.searcher().addQueryCancellation(() -> {
if (task.isCancelled()) {
SearchShardTask task = context.getTask();
if (task != null && task.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
});
Expand Down Expand Up @@ -282,9 +282,9 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe
}

if (searchContext.lowLevelCancellation()) {
SearchShardTask task = searchContext.getTask();
searcher.addQueryCancellation(() -> {
if (task.isCancelled()) {
SearchShardTask task = searchContext.getTask();
if (task != null && task.isCancelled()) {
throw new TaskCancelledException("cancelled");
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testPreProcess() throws Exception {
SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);

DefaultSearchContext context1 = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 1L),
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null);
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, false);
context1.from(300);

// resultWindow greater than maxResultWindow and scrollContext is null
Expand Down Expand Up @@ -162,7 +162,7 @@ public void testPreProcess() throws Exception {

// rescore is null but sliceBuilder is not null
DefaultSearchContext context2 = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 2L),
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null);
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, false);

SliceBuilder sliceBuilder = mock(SliceBuilder.class);
int numSlices = maxSlicesPerScroll + randomIntBetween(1, 100);
Expand All @@ -179,7 +179,7 @@ public void testPreProcess() throws Exception {
when(shardSearchRequest.indexBoost()).thenReturn(AbstractQueryBuilder.DEFAULT_BOOST);

DefaultSearchContext context3 = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 3L),
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null);
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, false);
ParsedQuery parsedQuery = ParsedQuery.parsedMatchAllQuery();
context3.sliceBuilder(null).parsedQuery(parsedQuery).preProcess(false);
assertEquals(context3.query(), context3.buildFilteredQuery(parsedQuery.query()));
Expand All @@ -189,7 +189,7 @@ public void testPreProcess() throws Exception {
when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]);

DefaultSearchContext context4 = new DefaultSearchContext(new SearchContextId(UUIDs.randomBase64UUID(), 4L),
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null);
shardSearchRequest, target, searcher, null, indexService, indexShard, bigArrays, null, timeout, null, false);
context4.sliceBuilder(new SliceBuilder(1,2)).parsedQuery(parsedQuery).preProcess(false);
Query query1 = context4.query();
context4.sliceBuilder(new SliceBuilder(0,2)).parsedQuery(parsedQuery).preProcess(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public static void cleanup() throws IOException {
}

public void testAddingCancellationActions() throws IOException {
ContextIndexSearcher searcher = new ContextIndexSearcher(reader,
IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy());
ContextIndexSearcher searcher = new ContextIndexSearcher(reader, IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), true);
NullPointerException npe = expectThrows(NullPointerException.class, () -> searcher.addQueryCancellation(null));
assertEquals("cancellation runnable should not be null", npe.getMessage());

Expand All @@ -100,8 +100,8 @@ public void testAddingCancellationActions() throws IOException {
public void testCancellableCollector() throws IOException {
TotalHitCountCollector collector1 = new TotalHitCountCollector();
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
ContextIndexSearcher searcher = new ContextIndexSearcher(reader,
IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy());
ContextIndexSearcher searcher = new ContextIndexSearcher(reader, IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), true);

searcher.search(new MatchAllDocsQuery(), collector1);
assertThat(collector1.getTotalHits(), equalTo(reader.numDocs()));
Expand All @@ -116,14 +116,14 @@ public void testCancellableCollector() throws IOException {
assertThat(collector2.getTotalHits(), equalTo(reader.numDocs()));
}

public void testCancellableDirectoryReader() throws IOException {
public void testExitableDirectoryReader() throws IOException {
AtomicBoolean cancelled = new AtomicBoolean(true);
Runnable cancellation = () -> {
if (cancelled.get()) {
throw new TaskCancelledException("cancelled");
}};
ContextIndexSearcher searcher = new ContextIndexSearcher(reader,
IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy());
ContextIndexSearcher searcher = new ContextIndexSearcher(reader, IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), true);
searcher.addQueryCancellation(cancellation);
CompiledAutomaton automaton = new CompiledAutomaton(new RegExp("a.*").toAutomaton());

Expand Down
Loading