From 5a02677754fd170d7ba8744e5733dde0d7f84681 Mon Sep 17 00:00:00 2001
From: jimczi
Date: Mon, 7 Sep 2020 15:49:41 +0200
Subject: [PATCH 1/3] Shard Search Scroll failures consistency

Today some uncaught shard failures, such as a RejectedExecutionException,
skip the release of the shard context and let subsequent scroll requests
access the same shard context again. Depending on how far the other shards
have advanced, this behavior can lead to missing data since scrolls always
move forward. In order to avoid hidden data loss, this commit ensures that
we always release the context of a shard search scroll request whenever a
failure occurs locally. The shard search context will no longer exist in
subsequent scroll requests, which will lead to consistent shard failures
in the responses.

This change also modifies the retry tests of the reindex feature. Reindex
used to retry a scroll search request that contained a shard failure and
move on when the failure disappeared. That is not compatible with how
scrolls work and can lead to missing data as explained above. This means
that reindex will now report scroll failures when search rejections happen
during the operation instead of silently skipping documents.

Finally, this change removes an old TODO that was fulfilled with #61062.
---
 .../index/reindex/RetryTests.java           | 43 ++++---------------
 .../search/SearchScrollAsyncAction.java     | 15 -------
 .../elasticsearch/search/SearchService.java | 13 +++---
 3 files changed, 14 insertions(+), 57 deletions(-)

diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java
index 0a08c7a2039c5..56da16acd80b0 100644
--- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java
+++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java
@@ -53,7 +53,7 @@ import static org.hamcrest.Matchers.hasSize;
 
 /**
- * Integration test for retry behavior. Useful because retrying relies on the way that the
+ * Integration test for bulk retry behavior. Useful because retrying relies on the way that the
  * rest of Elasticsearch throws exceptions and unit tests won't verify that.
  */
 public class RetryTests extends ESIntegTestCase {
@@ -77,7 +77,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
     }
 
     /**
-     * Lower the queue sizes to be small enough that both bulk and searches will time out and have to be retried.
+     * Lower the queue sizes to be small enough that bulk will time out and have to be retried.
      */
     @Override
     protected Settings nodeSettings(int nodeOrdinal) {
@@ -145,22 +145,15 @@ private void testCase(
             BulkIndexByScrollResponseMatcher matcher) throws Exception {
         /*
-         * These test cases work by stuffing the search and bulk queues of a single node and
-         * making sure that we read and write from that node. Because of some "fun" with the
-         * way that searches work, we need at least one more node to act as the coordinating
-         * node for the search request. If we didn't do this then the searches would get stuck
-         * in the queue anyway because we force queue portions of the coordinating node's
-         * actions. This is not a big deal in normal operations but a real pain when you are
-         * intentionally stuffing queues hoping for a failure.
+         * These test cases work by stuffing the bulk queue of a single node and
+         * making sure that we read and write from that node.
          */
         final Settings nodeSettings = Settings.builder()
                 // use pools of size 1 so we can block them
                 .put("thread_pool.write.size", 1)
-                .put("thread_pool.search.size", 1)
-                // use queues of size 1 because size 0 is broken and because search requests need the queue to function
+                // use queues of size 1 because size 0 is broken and because bulk requests need the queue to function
                 .put("thread_pool.write.queue_size", 1)
-                .put("thread_pool.search.queue_size", 1)
                 .put("node.attr.color", "blue")
                 .build();
         final String node = internalCluster().startDataOnlyNode(nodeSettings);
@@ -186,45 +179,25 @@ private void testCase(
         assertFalse(initialBulkResponse.buildFailureMessage(), initialBulkResponse.hasFailures());
         client().admin().indices().prepareRefresh("source").get();
 
-        logger.info("Blocking search");
-        CyclicBarrier initialSearchBlock = blockExecutor(ThreadPool.Names.SEARCH, node);
-
         AbstractBulkByScrollRequestBuilder<?, ?> builder = request.apply(internalCluster().masterClient());
         // Make sure we use more than one batch so we have to scroll
         builder.source().setSize(DOC_COUNT / randomIntBetween(2, 10));
 
+        logger.info("Blocking bulk so we start to get bulk rejections");
+        CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node);
+
         logger.info("Starting request");
         ActionFuture<BulkByScrollResponse> responseListener = builder.execute();
 
         try {
-            logger.info("Waiting for search rejections on the initial search");
-            assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(0L)));
-
-            logger.info("Blocking bulk and unblocking search so we start to get bulk rejections");
-            CyclicBarrier bulkBlock = blockExecutor(ThreadPool.Names.WRITE, node);
-            initialSearchBlock.await();
-
             logger.info("Waiting for bulk rejections");
             assertBusy(() -> assertThat(taskStatus(action).getBulkRetries(), greaterThan(0L)));
-
-            // Keep a copy of the current number of search rejections so we can assert that we get more when we block the scroll
-            long initialSearchRejections = taskStatus(action).getSearchRetries();
-
-            logger.info("Blocking search and unblocking bulk so we should get search rejections for the scroll");
-            CyclicBarrier scrollBlock = blockExecutor(ThreadPool.Names.SEARCH, node);
             bulkBlock.await();
 
-            logger.info("Waiting for search rejections for the scroll");
-            assertBusy(() -> assertThat(taskStatus(action).getSearchRetries(), greaterThan(initialSearchRejections)));
-
-            logger.info("Unblocking the scroll");
-            scrollBlock.await();
-
             logger.info("Waiting for the request to finish");
             BulkByScrollResponse response = responseListener.get();
             assertThat(response, matcher);
             assertThat(response.getBulkRetries(), greaterThan(0L));
-            assertThat(response.getSearchRetries(), greaterThan(initialSearchRejections));
         } finally {
             // Fetch the response just in case we blew up half way through. This will make sure the failure is thrown up to the top level.
             BulkByScrollResponse response = responseListener.get();
diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java
index 1db433278620d..5486ae59581a2 100644
--- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java
+++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java
@@ -53,21 +53,6 @@
  * run separate fetch phases etc.
  */
 abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements Runnable {
-    /*
-     * Some random TODO:
-     * Today we still have a dedicated executing mode for scrolls while we could simplify this by implementing
-     * scroll like functionality (mainly syntactic sugar) as an ordinary search with search_after. We could even go further and
-     * make the scroll entirely stateless and encode the state per shard in the scroll ID.
-     *
-     * Today we also hold a context per shard but maybe
-     * we want the context per coordinating node such that we route the scroll to the same coordinator all the time and hold the context
-     * here? This would have the advantage that if we loose that node the entire scroll is deal not just one shard.
-     *
-     * Additionally there is the possibility to associate the scroll with a seq. id. such that we can talk to any replica as long as
-     * the shards engine hasn't advanced that seq. id yet. Such a resume is possible and best effort, it could be even a safety net since
-     * if you rely on indices being read-only things can change in-between without notification or it's hard to detect if there where any
-     * changes while scrolling. These are all options to improve the current situation which we can look into down the road
-     */
     protected final Logger logger;
     protected final ActionListener<SearchResponse> listener;
     protected final ParsedScrollId scrollId;
diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java
index 13ef1b1259037..12956ca7640a7 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -512,10 +512,10 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
                 return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget());
             } catch (Exception e) {
                 logger.trace("Query phase failed", e);
-                processFailure(readerContext, e);
+                // we handle the failure in the failure listener below
                 throw e;
             }
-        }, ActionListener.runAfter(listener, markAsUsed::close));
+        }, wrapFailureListener(listener, readerContext, markAsUsed));
     }
 
     public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
@@ -542,7 +542,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
             } catch (Exception e) {
                 assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
                 logger.trace("Query phase failed", e);
-                processFailure(readerContext, e);
+                // we handle the failure in the failure listener below
                 throw e;
             }
         }, wrapFailureListener(listener, readerContext, markAsUsed));
@@ -583,10 +583,10 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa
             } catch (Exception e) {
                 assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
                 logger.trace("Fetch phase failed", e);
-                processFailure(readerContext, e);
+                // we handle the failure in the failure listener below
                 throw e;
             }
-        }, ActionListener.runAfter(listener, markAsUsed::close));
+        }, wrapFailureListener(listener, readerContext, markAsUsed));
     }
 
     public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
@@ -612,8 +612,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
                 return searchContext.fetchResult();
             } catch (Exception e) {
                 assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
-                logger.trace("Fetch phase failed", e);
-                processFailure(readerContext, e);
+                // we handle the failure in the failure listener below
                 throw e;
             }
         }, wrapFailureListener(listener, readerContext, markAsUsed));

From 1f12505eac0e18d8077101a807a8d0c751d0700f Mon Sep 17 00:00:00 2001
From: jimczi
Date: Mon, 7 Sep 2020 22:56:09 +0200
Subject: [PATCH 2/3] Make the searcher for legacy reader context final

This commit ensures that the searcher that we create for scrolls is
initialized only once.
---
 .../elasticsearch/index/engine/Engine.java    |   3 +-
 .../elasticsearch/search/SearchService.java   | 130 ++++++++----------
 .../search/internal/LegacyReaderContext.java  |  28 ++--
 3 files changed, 73 insertions(+), 88 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
index b79b713ccef53..f543cc7011db1 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -108,7 +108,8 @@ public abstract class Engine implements Closeable {
     public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
     public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
     public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
-    public static final String CAN_MATCH_SEARCH_SOURCE = "can_match"; // TODO: Make source of search enum?
+    public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
+    public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";
 
     protected final ShardId shardId;
     protected final String allocationId;
diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java
index 12956ca7640a7..71122868bd7b3 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -348,8 +348,8 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request,
                                             SearchShardTask task,
                                             boolean keepStatesInContext) throws IOException {
         ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
-        try (Releasable ignored = readerContext.markAsUsed();
-             SearchContext context = createContext(readerContext, request, task, true)) {
+        try (Releasable ignored = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request));
+             SearchContext context = createContext(readerContext, request, task, true)) {
             dfsPhase.execute(context);
             return context.dfsResult();
         } catch (Exception e) {
@@ -380,49 +380,24 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo
         rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
             @Override
             public void onResponse(ShardSearchRequest orig) {
-                final ReaderContext readerContext = createOrGetReaderContext(orig, keepStatesInContext);
-                final Releasable markAsUsed = readerContext.markAsUsed();
+                // check if we can shortcut the query phase entirely.
                 if (orig.canReturnNullResponseIfMatchNoDocs()) {
                     assert orig.scroll() == null;
-                    // we clone the shard request and perform a quick rewrite using a lightweight
-                    // searcher since we are outside of the search thread pool.
-                    // If the request rewrites to "match none" we can shortcut the query phase
-                    // entirely. Otherwise we fork the execution in the search thread pool.
-                    ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig);
-                    try (Engine.Searcher searcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) {
-                        QueryShardContext context = readerContext.indexService().newQueryShardContext(canMatchRequest.shardId().id(),
-                            searcher, canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias());
-                        Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true);
+                    final CanMatchResponse canMatchResp;
+                    try {
+                        ShardSearchRequest clone = new ShardSearchRequest(orig);
+                        canMatchResp = canMatch(clone, false);
                     } catch (Exception exc) {
-                        try (markAsUsed) {
-                            listener.onFailure(exc);
-                        } finally {
-                            processFailure(readerContext, exc);
-                        }
+                        listener.onFailure(exc);
                         return;
                     }
-                    if (canRewriteToMatchNone(canMatchRequest.source())
-                            && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) {
-                        try (markAsUsed) {
-                            if (orig.readerId() == null) {
-                                try {
-                                    listener.onResponse(QuerySearchResult.nullInstance());
-                                } finally {
-                                    // close and remove the ephemeral reader context
-                                    removeReaderContext(readerContext.id().getId());
-                                    Releasables.close(readerContext);
-                                }
-                            } else {
-                                listener.onResponse(QuerySearchResult.nullInstance());
-                            }
-                        }
+                    if (canMatchResp.canMatch == false) {
+                        listener.onResponse(QuerySearchResult.nullInstance());
                         return;
                     }
                 }
-                // fork the execution in the search thread pool
-                runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, readerContext),
-                    wrapFailureListener(listener, readerContext, markAsUsed));
+                runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
             }
 
             @Override
@@ -446,8 +421,10 @@ private void runAsync(Executor executor, CheckedSupplier execu
 
     private SearchPhaseResult executeQueryPhase(ShardSearchRequest request,
                                                 SearchShardTask task,
-                                                ReaderContext readerContext) throws Exception {
-        try (SearchContext context = createContext(readerContext, request, task, true)) {
+                                                boolean keepStatesInContext) throws Exception {
+        final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
+        try (Releasable ignored = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request));
+             SearchContext context = createContext(readerContext, request, task, true)) {
             final long afterQueryTime;
             try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
                 loadOrExecuteQueryPhase(request, context);
@@ -494,16 +471,11 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
                                   SearchShardTask task,
                                   ActionListener<ScrollQuerySearchResult> listener) {
         final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
-        final Releasable markAsUsed = readerContext.markAsUsed();
+        final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getScrollKeepAlive(request.scroll()));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
             final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
                  SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
-                if (request.scroll() != null && request.scroll().keepAlive() != null) {
-                    final long keepAlive = request.scroll().keepAlive().millis();
-                    checkKeepAliveLimit(keepAlive);
-                    readerContext.keepAlive(keepAlive);
-                }
                 searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
                 processScroll(request, readerContext, searchContext);
                 queryPhase.execute(searchContext);
@@ -519,10 +491,10 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
     }
 
     public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
-        final ReaderContext readerContext = findReaderContext(request.contextId(), request);
-        final Releasable markAsUsed = readerContext.markAsUsed();
+        final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
+        final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
+        final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(shardSearchRequest));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
-            final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
             readerContext.setAggregatedDfs(request.dfs());
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
                  SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
@@ -564,15 +536,11 @@ private Executor getExecutor(IndexShard indexShard) {
     public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task,
                                   ActionListener<ScrollQueryFetchSearchResult> listener) {
         final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
-        final Releasable markAsUsed = readerContext.markAsUsed();
+        final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getScrollKeepAlive(request.scroll()));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
             final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
                  SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) {
-                if (request.scroll() != null && request.scroll().keepAlive() != null) {
-                    checkKeepAliveLimit(request.scroll().keepAlive().millis());
-                    readerContext.keepAlive(request.scroll().keepAlive().millis());
-                }
                 searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
                 searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
                 processScroll(request, readerContext, searchContext);
@@ -591,9 +559,9 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa
 
     public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
         final ReaderContext readerContext = findReaderContext(request.contextId(), request);
-        final Releasable markAsUsed = readerContext.markAsUsed();
+        final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
+        final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(shardSearchRequest));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
-            final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) {
                 if (request.lastEmittedDoc() != null) {
                     searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
@@ -643,13 +611,18 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques
         return reader;
     }
 
+    private Releasable updateKeepAliveAndMarkAsUsed(ReaderContext reader, long keepAlive) {
+        if (keepAlive > 0L) {
+            checkKeepAliveLimit(keepAlive);
+            reader.keepAlive(keepAlive);
+        }
+        return reader.markAsUsed();
+    }
+
     final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean keepStatesInContext) {
         if (request.readerId() != null) {
             assert keepStatesInContext == false;
             final ReaderContext readerContext = findReaderContext(request.readerId(), request);
-            final long keepAlive = request.keepAlive().millis();
-            checkKeepAliveLimit(keepAlive);
-            readerContext.keepAlive(keepAlive);
             return readerContext;
         }
         IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
@@ -835,11 +808,20 @@ public void freeAllScrollContexts() {
     }
 
     private long getKeepAlive(ShardSearchRequest request) {
-        if (request.scroll() != null && request.scroll().keepAlive() != null) {
-            return request.scroll().keepAlive().millis();
+        if (request.scroll() != null) {
+            return getScrollKeepAlive(request.scroll());
+        } else if (request.keepAlive() != null) {
+            return request.keepAlive().getMillis();
         } else {
-            return defaultKeepAlive;
+            return request.readerId() == null ? defaultKeepAlive : -1;
+        }
+    }
+
+    private long getScrollKeepAlive(Scroll scroll) {
+        if (scroll != null && scroll.keepAlive() != null) {
+            return scroll.keepAlive().getMillis();
         }
+        return defaultKeepAlive;
     }
 
     private void checkKeepAliveLimit(long keepAlive) {
@@ -1150,29 +1132,39 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set
+    public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
+        try {
+            listener.onResponse(canMatch(request));
+        } catch (IOException e) {
+            listener.onFailure(e);
+        }
+    }
+
     /**
      * This method uses a lightweight searcher without wrapping (i.e., not open a full reader on frozen indices) to rewrite the query
      * to check if the query can match any documents. This method can have false positives while if it returns {@code false} the query
      * won't match any documents on the current shard.
     */
     public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException {
+        return canMatch(request, true);
+    }
+
+    private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException {
         assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
-        final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null;
-        final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed() : null;
+        final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null;
+        final Releasable markAsUsed = readerContext != null ? updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request)) : null;
         try (markAsUsed) {
             final IndexService indexService;
             final Engine.Searcher canMatchSearcher;
             final boolean hasRefreshPending;
             if (readerContext != null) {
-                checkKeepAliveLimit(request.keepAlive().millis());
-                readerContext.keepAlive(request.keepAlive().millis());
                 indexService = readerContext.indexService();
                 canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
                 hasRefreshPending = false;
             } else {
                 indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
                 IndexShard indexShard = indexService.getShard(request.shardId().getId());
-                hasRefreshPending = indexShard.hasRefreshPending();
+                hasRefreshPending = indexShard.hasRefreshPending() && checkRefreshPending;
                 canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE);
             }
 
@@ -1197,14 +1189,6 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException
         }
     }
 
-    public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
-        try {
-            listener.onResponse(canMatch(request));
-        } catch (IOException e) {
-            listener.onFailure(e);
-        }
-    }
-
     /**
      * Returns true iff the given search source builder can be early terminated by rewriting to a match none query. Or in other words
      * if the execution of the search request can be early terminated without executing it. This is for instance not possible if
diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java
index ab188f8ddf23f..f47d97fe33633 100644
--- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java
+++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java
@@ -30,11 +30,11 @@ public class LegacyReaderContext extends ReaderContext {
     private final ShardSearchRequest shardSearchRequest;
     private final ScrollContext scrollContext;
+    private final Engine.Searcher searcher;
+
     private AggregatedDfs aggregatedDfs;
     private RescoreDocIds rescoreDocIds;
 
-    private volatile Engine.Searcher searcher;
-
     public LegacyReaderContext(long id, IndexService indexService, IndexShard indexShard,
                                Engine.SearcherSupplier reader, ShardSearchRequest shardSearchRequest, long keepAliveInMillis) {
         super(id, indexService, indexShard, reader, keepAliveInMillis, false);
@@ -42,26 +42,26 @@ public LegacyReaderContext(long id, IndexService indexService, IndexShard indexS
         assert shardSearchRequest.keepAlive() == null;
         this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest);
         if (shardSearchRequest.scroll() != null) {
+            // Search scroll requests are special, they don't hold indices names so we have
+            // to reuse the searcher created on the request that initialized the scroll.
+            // This ensures that we wrap the searcher's reader with the user's permissions
+            // when they are available.
+            final Engine.Searcher delegate = searcherSupplier.acquireSearcher("search");
+            addOnClose(delegate);
+            // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed
+            this.searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(),
+                delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {});
             this.scrollContext = new ScrollContext();
         } else {
             this.scrollContext = null;
+            this.searcher = null;
         }
     }
 
     @Override
     public Engine.Searcher acquireSearcher(String source) {
-        if (scrollContext != null && "search".equals(source)) {
-            // Search scroll requests are special, they don't hold indices names so we have
-            // to reuse the searcher created on the request that initialized the scroll.
-            // This ensures that we wrap the searcher's reader with the user's permissions
-            // when they are available.
-            if (searcher == null) {
-                final Engine.Searcher delegate = searcherSupplier.acquireSearcher(source);
-                addOnClose(delegate);
-                // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed
-                searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(),
-                    delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {});
-            }
+        if (scrollContext != null) {
+            assert Engine.SEARCH_SOURCE.equals(source) : "scroll context should not acquire searcher for " + source;
             return searcher;
         }
         return super.acquireSearcher(source);

From 7d927699905a354353f242505aedf805019bade8 Mon Sep 17 00:00:00 2001
From: jimczi
Date: Wed, 9 Sep 2020 01:27:22 +0200
Subject: [PATCH 3/3] add keepAlive in markAsUsed

---
 .../elasticsearch/search/SearchService.java | 27 +++++++------------
 .../search/internal/ReaderContext.java      | 11 ++++----
 2 files changed, 16 insertions(+), 22 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java
index 71122868bd7b3..2de2497aacfbb 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -348,7 +348,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request,
                                             SearchShardTask task,
                                             boolean keepStatesInContext) throws IOException {
         ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
-        try (Releasable ignored = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request));
+        try (Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
              SearchContext context = createContext(readerContext, request, task, true)) {
             dfsPhase.execute(context);
             return context.dfsResult();
@@ -423,7 +423,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request,
                                                 SearchShardTask task,
                                                 boolean keepStatesInContext) throws Exception {
         final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
-        try (Releasable ignored = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request));
+        try (Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
              SearchContext context = createContext(readerContext, request, task, true)) {
             final long afterQueryTime;
             try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
@@ -471,7 +471,7 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
                                   SearchShardTask task,
                                   ActionListener<ScrollQuerySearchResult> listener) {
         final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
-        final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getScrollKeepAlive(request.scroll()));
+        final Releasable markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll()));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
             final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
@@ -493,7 +493,7 @@ public void executeQueryPhase(InternalScrollSearchRequest request,
     public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener<QuerySearchResult> listener) {
         final ReaderContext readerContext = findReaderContext(request.contextId(), request.shardSearchRequest());
         final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest());
-        final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(shardSearchRequest));
+        final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
             readerContext.setAggregatedDfs(request.dfs());
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true);
@@ -536,7 +536,7 @@ private Executor getExecutor(IndexShard indexShard) {
     public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task,
                                   ActionListener<ScrollQueryFetchSearchResult> listener) {
         final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
-        final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getScrollKeepAlive(request.scroll()));
+        final Releasable markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll()));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
             final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
@@ -560,7 +560,7 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa
     public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener<FetchSearchResult> listener) {
         final ReaderContext readerContext = findReaderContext(request.contextId(), request);
         final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest());
-        final Releasable markAsUsed = updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(shardSearchRequest));
+        final Releasable markAsUsed = readerContext.markAsUsed(getKeepAlive(shardSearchRequest));
         runAsync(getExecutor(readerContext.indexShard()), () -> {
             try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) {
                 if (request.lastEmittedDoc() != null) {
                     searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
@@ -611,14 +611,6 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques
         return reader;
     }
 
-    private Releasable updateKeepAliveAndMarkAsUsed(ReaderContext reader, long keepAlive) {
-        if (keepAlive > 0L) {
-            checkKeepAliveLimit(keepAlive);
-            reader.keepAlive(keepAlive);
-        }
-        return reader.markAsUsed();
-    }
-
     final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean keepStatesInContext) {
         if (request.readerId() != null) {
             assert keepStatesInContext == false;
@@ -648,7 +640,6 @@ final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexS
             }
         }
         final long keepAlive = getKeepAlive(request);
-        checkKeepAliveLimit(keepAlive);
         if (keepStatesInContext || request.scroll() != null) {
             readerContext = new LegacyReaderContext(idGenerator.incrementAndGet(), indexService, shard, reader, request, keepAlive);
             if (request.scroll() != null) {
@@ -749,7 +740,7 @@ public DefaultSearchContext createSearchContext(ShardSearchRequest request, Time
         final Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier();
         try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexService, indexShard, reader, -1L, true)) {
             DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout);
-            searchContext.addReleasable(readerContext.markAsUsed());
+            searchContext.addReleasable(readerContext.markAsUsed(0L));
             return searchContext;
         }
     }
@@ -811,6 +802,7 @@ private long getKeepAlive(ShardSearchRequest request) {
         if (request.scroll() != null) {
             return getScrollKeepAlive(request.scroll());
         } else if (request.keepAlive() != null) {
+            checkKeepAliveLimit(request.keepAlive().millis());
             return request.keepAlive().getMillis();
         } else {
             return request.readerId() == null ? defaultKeepAlive : -1;
@@ -819,6 +811,7 @@ private long getKeepAlive(ShardSearchRequest request) {
 
     private long getScrollKeepAlive(Scroll scroll) {
         if (scroll != null && scroll.keepAlive() != null) {
+            checkKeepAliveLimit(scroll.keepAlive().millis());
             return scroll.keepAlive().getMillis();
         }
         return defaultKeepAlive;
@@ -1152,7 +1145,7 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException
     private CanMatchResponse canMatch(ShardSearchRequest request, boolean checkRefreshPending) throws IOException {
         assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
         final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null;
-        final Releasable markAsUsed = readerContext != null ? updateKeepAliveAndMarkAsUsed(readerContext, getKeepAlive(request)) : null;
+        final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed(getKeepAlive(request)) : null;
         try (markAsUsed) {
             final IndexService indexService;
             final Engine.Searcher canMatchSearcher;
diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java
index b2a69d601965c..c00333d38d6bb 100644
--- a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java
+++ b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java
@@ -124,17 +124,18 @@ public Engine.Searcher acquireSearcher(String source) {
         return searcherSupplier.acquireSearcher(source);
     }
 
-    public void keepAlive(long keepAlive) {
+    private void tryUpdateKeepAlive(long keepAlive) {
         this.keepAlive.updateAndGet(curr -> Math.max(curr, keepAlive));
     }
 
     /**
-     * Marks this reader as being used so its time to live should not be expired.
-     *
-     * @return a releasable to indicate the caller has stopped using this reader
+     * Returns a releasable to indicate that the caller has stopped using this reader.
+     * The time to live of the reader after usage can be extended using the provided
+     * keepAliveInMillis.
      */
-    public Releasable markAsUsed() {
+    public Releasable markAsUsed(long keepAliveInMillis) {
         refCounted.incRef();
+        tryUpdateKeepAlive(keepAliveInMillis);
         return Releasables.releaseOnce(() -> {
             this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis()));
             refCounted.decRef();
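
For readers following this series outside the Elasticsearch code base, here is a minimal,
self-contained sketch of the reference-counting contract that markAsUsed(long) settles on in
PATCH 3/3: increment a ref count, opportunistically extend the keep-alive (Math.max means a
non-positive value, such as the 0L passed by createSearchContext or the -1 returned by
getKeepAlive for reader-backed requests without an explicit keep-alive, leaves the current
value untouched), and hand back a one-shot releasable that records the last access time. The
Releasable interface, the class name SketchReaderContext and the clock below are simplified
stand-ins for illustration, not the real Elasticsearch classes.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Simplified stand-in for org.elasticsearch.common.lease.Releasable. */
interface Releasable extends AutoCloseable {
    @Override
    void close(); // narrowed from AutoCloseable: releasing must not throw
}

/** Minimal sketch of the ReaderContext keep-alive bookkeeping after PATCH 3/3. */
class SketchReaderContext {
    private final AtomicInteger refCount = new AtomicInteger();
    private final AtomicLong keepAlive;
    private final AtomicLong lastAccessTime = new AtomicLong();

    SketchReaderContext(long keepAliveInMillis) {
        this.keepAlive = new AtomicLong(keepAliveInMillis);
    }

    private long nowInMillis() {
        return System.currentTimeMillis();
    }

    // mirrors tryUpdateKeepAlive: the keep-alive only ever grows
    private void tryUpdateKeepAlive(long keepAliveInMillis) {
        keepAlive.updateAndGet(curr -> Math.max(curr, keepAliveInMillis));
    }

    // mirrors markAsUsed(long): bump the ref count, extend the keep-alive, and
    // return a releasable that takes effect at most once (releaseOnce semantics)
    Releasable markAsUsed(long keepAliveInMillis) {
        refCount.incrementAndGet();
        tryUpdateKeepAlive(keepAliveInMillis);
        AtomicBoolean released = new AtomicBoolean();
        return () -> {
            if (released.compareAndSet(false, true)) {
                lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis()));
                refCount.decrementAndGet();
            }
        };
    }
}

A caller holds the releasable for the duration of a phase, the same shape the patched
SearchService uses: try (Releasable ignored = context.markAsUsed(keepAliveMillis)) { ... }.
Folding the keep-alive update into markAsUsed means every consumer that pins the reader also
extends its lease, which is what lets PATCH 3/3 delete the separate
updateKeepAliveAndMarkAsUsed helper and move the checkKeepAliveLimit calls into
getKeepAlive/getScrollKeepAlive.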