Skip to content

Commit 450d301

Browse files
committed
[TEST] Fix testLimitConcurrentShardRequests failure
With #36221 we introduced shards counting to address a rare failure. This caused a worse problem in this test when replicas were allocated and shards failures were randomly returned. The latch has to take into account additional attempts caused by the shard failures, which means that in order for run to be called, performPhaseOnShard will be called (numShards + numFailures) times. To address this, we need to decide upfront which shard is going to fail, making sure that at least one shards is successful otherwise the whole request fails. Closes #37074
1 parent b10a330 commit 450d301

File tree

1 file changed

+20
-10
lines changed

1 file changed

+20
-10
lines changed

server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,27 @@ protected void executeNext(Runnable runnable, Thread originalThread) {
154154
assertEquals(shardsIter.size(), searchResponse.getSuccessfulShards());
155155
}
156156

157-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37074")
158157
public void testLimitConcurrentShardRequests() throws InterruptedException {
159158
SearchRequest request = new SearchRequest();
160159
request.allowPartialSearchResults(true);
161160
int numConcurrent = randomIntBetween(1, 5);
162161
request.setMaxConcurrentShardRequests(numConcurrent);
163-
int numShards = 10;
164-
CountDownLatch latch = new CountDownLatch(numShards);
162+
boolean doReplicas = randomBoolean();
163+
int numShards = randomIntBetween(5, 10);
164+
int numShardAttempts = numShards;
165+
Boolean[] shardFailures = new Boolean[numShards];
166+
// at least one response otherwise the entire request fails
167+
shardFailures[randomIntBetween(0, shardFailures.length - 1)] = false;
168+
for (int i = 0; i < shardFailures.length; i++) {
169+
if (shardFailures[i] == null) {
170+
boolean failure = randomBoolean();
171+
shardFailures[i] = failure;
172+
if (failure && doReplicas) {
173+
numShardAttempts++;
174+
}
175+
}
176+
}
177+
CountDownLatch latch = new CountDownLatch(numShardAttempts);
165178
AtomicBoolean searchPhaseDidRun = new AtomicBoolean(false);
166179
ActionListener<SearchResponse> responseListener = ActionListener.wrap(response -> {},
167180
(e) -> { throw new AssertionError("unexpected", e);});
@@ -172,7 +185,7 @@ public void testLimitConcurrentShardRequests() throws InterruptedException {
172185
AtomicInteger contextIdGenerator = new AtomicInteger(0);
173186
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
174187
new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
175-
numShards, randomBoolean(), primaryNode, replicaNode);
188+
numShards, doReplicas, primaryNode, replicaNode);
176189
SearchTransportService transportService = new SearchTransportService(null, null);
177190
Map<String, Transport.Connection> lookup = new HashMap<>();
178191
Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
@@ -181,7 +194,6 @@ public void testLimitConcurrentShardRequests() throws InterruptedException {
181194
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
182195
CountDownLatch awaitInitialRequests = new CountDownLatch(1);
183196
AtomicInteger numRequests = new AtomicInteger(0);
184-
AtomicInteger numResponses = new AtomicInteger(0);
185197
AbstractSearchAsyncAction<TestSearchPhaseResult> asyncAction =
186198
new AbstractSearchAsyncAction<TestSearchPhaseResult>(
187199
"test",
@@ -208,7 +220,7 @@ public void testLimitConcurrentShardRequests() throws InterruptedException {
208220
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
209221
SearchActionListener<TestSearchPhaseResult> listener) {
210222
seenShard.computeIfAbsent(shard.shardId(), (i) -> {
211-
numRequests.incrementAndGet(); // only count this once per replica
223+
numRequests.incrementAndGet(); // only count this once per shard copy
212224
return Boolean.TRUE;
213225
});
214226

@@ -221,13 +233,11 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
221233
Transport.Connection connection = getConnection(null, shard.currentNodeId());
222234
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
223235
connection.getNode());
224-
if (numResponses.getAndIncrement() > 0 && randomBoolean()) { // at least one response otherwise the entire
225-
// request fails
236+
if (shardFailures[shard.shardId().id()]) {
226237
listener.onFailure(new RuntimeException());
227238
} else {
228239
listener.onResponse(testSearchPhaseResult);
229240
}
230-
231241
}).start();
232242
}
233243

@@ -252,7 +262,7 @@ protected void executeNext(Runnable runnable, Thread originalThread) {
252262
awaitInitialRequests.countDown();
253263
latch.await();
254264
assertTrue(searchPhaseDidRun.get());
255-
assertEquals(10, numRequests.get());
265+
assertEquals(numShards, numRequests.get());
256266
}
257267

258268
public void testFanOutAndCollect() throws InterruptedException {

0 commit comments

Comments
 (0)