From 8612b7541322c2635106035c31d913da645bae59 Mon Sep 17 00:00:00 2001 From: jimczi Date: Tue, 10 Mar 2020 23:12:12 +0100 Subject: [PATCH 1/5] Fix race condition in shard group failure callback Shard group failure callbacks should be executed before incrementing the total operations. This is required to ensure that we don't notify a shard group failure **after** the completion callback. --- .../action/search/AbstractSearchAsyncAction.java | 9 +++++---- .../xpack/search/MutableSearchResponse.java | 1 - 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index ab965e53ded77..1d2339dd9e6fe 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -375,6 +375,11 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, // we do make sure to clean it on a successful response from a shard SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId); onShardFailure(shardIndex, shardTarget, e); + final ShardRouting nextShard = shardIt.nextOrNull(); + final boolean lastShard = nextShard == null; + if (lastShard) { + onShardGroupFailure(shardIndex, shardTarget, e); + } if (totalOps.incrementAndGet() == expectedTotalOps) { if (logger.isDebugEnabled()) { @@ -385,11 +390,8 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e); } } - onShardGroupFailure(shardIndex, shardTarget, e); onPhaseDone(); } else { - final ShardRouting nextShard = shardIt.nextOrNull(); - final boolean lastShard = nextShard == null; // trace log this exception logger.trace(() -> new ParameterizedMessage( "{}: Failed to execute [{}] lastShard [{}]", @@ -405,7 +407,6 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e); } } - onShardGroupFailure(shardIndex, shardTarget, e); } } } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java index 203978edea580..1949aceb6bd91 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java @@ -83,7 +83,6 @@ synchronized void updatePartialResponse(int successfulShards, SearchResponseSect throw new IllegalStateException("received partial response out of order: " + newSections.getNumReducePhases() + " < " + sections.getNumReducePhases()); } - failIfFrozen(); ++ version; this.successfulShards = successfulShards; this.sections = newSections; From da65d6f97064ac08e7bcb7a9af6bcd8a912ca7c8 Mon Sep 17 00:00:00 2001 From: jimczi Date: Tue, 10 Mar 2020 23:34:46 +0100 Subject: [PATCH 2/5] Fix the initial response stored in async search index This change ensures that we set the isRunning flag to `false` when storing the initial response of an async search request. --- .../xpack/search/TransportSubmitAsyncSearchAction.java | 7 +++++-- .../xpack/core/search/action/AsyncSearchResponse.java | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 72e93575c977a..214f0abe0de68 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -83,7 +83,10 @@ public void onResponse(AsyncSearchResponse searchResponse) { onFatalFailure(searchTask, cause, false, submitListener); } else { final String docId = searchTask.getSearchId().getDocId(); - store.storeInitialResponse(docId, searchTask.getOriginHeaders(), searchResponse, + // creates the fallback response if the node crashes/restarts in the middle of the request + // TODO: store intermediate results ? + AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId(), false); + store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp, new ActionListener<>() { @Override public void onResponse(IndexResponse r) { @@ -114,7 +117,7 @@ public void onFailure(Exception exc) { // the task completed within the timeout so the response is sent back to the user // with a null id since nothing was stored on the cluster. taskManager.unregister(searchTask); - submitListener.onResponse(searchResponse.clone(null)); + submitListener.onResponse(searchResponse.clone(null, false)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java index ed27a0e06dee9..0966d757823e5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java @@ -99,7 +99,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(expirationTimeMillis); } - public AsyncSearchResponse clone(String id) { + public AsyncSearchResponse clone(String id, boolean isRunning) { return new AsyncSearchResponse(id, version, searchResponse, error, isPartial, isRunning, startTimeMillis, expirationTimeMillis); } From 77dd1310bc001ead7eb1266a66a273f359fdf3fd Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 11 Mar 2020 00:15:43 +0100 Subject: [PATCH 3/5] Remove the awaits fix --- .../elasticsearch/xpack/search/AsyncSearchActionTests.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionTests.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionTests.java index 7ee2282b33ced..ec592d19c6466 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionTests.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchActionTests.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.search; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.common.settings.Settings; @@ -37,7 +36,6 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; // TODO: add tests for keepAlive and expiration -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/53360") public class AsyncSearchActionTests extends AsyncSearchIntegTestCase { private String indexName; private int numShards; @@ -52,7 +50,7 @@ public class AsyncSearchActionTests extends AsyncSearchIntegTestCase { public void indexDocuments() throws InterruptedException { indexName = "test-async"; numShards = randomIntBetween(internalCluster().numDataNodes(), internalCluster().numDataNodes()*10); - numDocs = randomIntBetween(numShards, numShards*10); + numDocs = randomIntBetween(numShards, numShards*3); createIndex(indexName, Settings.builder().put("index.number_of_shards", numShards).build()); numKeywords = randomIntBetween(1, 100); keywordFreqs = new HashMap<>(); @@ -217,6 +215,7 @@ public void testCleanupOnFailure() throws Exception { } ensureTaskCompletion(initial.getId()); AsyncSearchResponse response = getAsyncSearch(initial.getId()); + assertFalse(response.isRunning()); assertNotNull(response.getFailure()); assertTrue(response.isPartial()); assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards)); From 55e59705d137999f03d9b0a9af60f447e813f2d8 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 11 Mar 2020 00:32:27 +0100 Subject: [PATCH 4/5] disable cacheability of BlockQueryBuilder --- .../elasticsearch/xpack/search/AsyncSearchIntegTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java index 5c229a5de1d80..4eca82dd8a0c9 100644 --- a/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java +++ b/x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java @@ -364,7 +364,7 @@ public int hashCode() { @Override protected boolean doEquals(BlockQueryBuilder other) { - return true; + return false; } @Override From b0afc8c17602de78633d74848f7b5f088e13d7e1 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 11 Mar 2020 12:44:13 +0100 Subject: [PATCH 5/5] address review --- .../xpack/search/TransportSubmitAsyncSearchAction.java | 4 ++-- .../xpack/core/search/action/AsyncSearchResponse.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java index 214f0abe0de68..ff7801305302c 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java @@ -85,7 +85,7 @@ public void onResponse(AsyncSearchResponse searchResponse) { final String docId = searchTask.getSearchId().getDocId(); // creates the fallback response if the node crashes/restarts in the middle of the request // TODO: store intermediate results ? - AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId(), false); + AsyncSearchResponse initialResp = searchResponse.clone(searchResponse.getId()); store.storeInitialResponse(docId, searchTask.getOriginHeaders(), initialResp, new ActionListener<>() { @Override @@ -117,7 +117,7 @@ public void onFailure(Exception exc) { // the task completed within the timeout so the response is sent back to the user // with a null id since nothing was stored on the cluster. taskManager.unregister(searchTask); - submitListener.onResponse(searchResponse.clone(null, false)); + submitListener.onResponse(searchResponse.clone(null)); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java index 0966d757823e5..9828635ae64fe 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java @@ -99,8 +99,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(expirationTimeMillis); } - public AsyncSearchResponse clone(String id, boolean isRunning) { - return new AsyncSearchResponse(id, version, searchResponse, error, isPartial, isRunning, startTimeMillis, expirationTimeMillis); + public AsyncSearchResponse clone(String id) { + return new AsyncSearchResponse(id, version, searchResponse, error, isPartial, false, startTimeMillis, expirationTimeMillis); } /**