Skip to content

Commit 7c63f54

Browse files
authored
Use a threadsafe map in SearchAsyncActionTests (#33700)
Today `SearchAsyncActionTests#testFanOutAndCollect` uses a simple `HashMap` for the `nodeToContextMap` variable, which is then accessed from multiple threads without, apparently, explicit synchronisation. This provides an explanation for the test failure identified in #29242 in which `.toString()` returns `"[]"` just before `.isEmpty` returns `false`, without any concurrent modifications. This change converts `nodeToContextMap` to a `newConcurrentMap()` so that this cannot occur. It also fixes a race condition in the detection of double-calling the subsequent search phase. Closes #29242.
1 parent 5166dd0 commit 7c63f54

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,13 @@
5252
import java.util.concurrent.CountDownLatch;
5353
import java.util.concurrent.ExecutorService;
5454
import java.util.concurrent.Executors;
55+
import java.util.concurrent.atomic.AtomicBoolean;
5556
import java.util.concurrent.atomic.AtomicInteger;
5657
import java.util.concurrent.atomic.AtomicReference;
5758

59+
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
60+
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
61+
5862
public class SearchAsyncActionTests extends ESTestCase {
5963

6064
public void testSkipSearchShards() throws InterruptedException {
@@ -139,7 +143,7 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
139143
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
140144
return new SearchPhase("test") {
141145
@Override
142-
public void run() throws IOException {
146+
public void run() {
143147
latch.countDown();
144148
}
145149
};
@@ -260,7 +264,6 @@ public void testFanOutAndCollect() throws InterruptedException {
260264
SearchRequest request = new SearchRequest();
261265
request.allowPartialSearchResults(true);
262266
request.setMaxConcurrentShardRequests(randomIntBetween(1, 100));
263-
CountDownLatch latch = new CountDownLatch(1);
264267
AtomicReference<TestSearchResponse> response = new AtomicReference<>();
265268
ActionListener<SearchResponse> responseListener = new ActionListener<SearchResponse>() {
266269
@Override
@@ -277,7 +280,7 @@ public void onFailure(Exception e) {
277280
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
278281
DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
279282

280-
Map<DiscoveryNode, Set<Long>> nodeToContextMap = new HashMap<>();
283+
Map<DiscoveryNode, Set<Long>> nodeToContextMap = newConcurrentMap();
281284
AtomicInteger contextIdGenerator = new AtomicInteger(0);
282285
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
283286
new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed()),
@@ -296,6 +299,8 @@ public void sendFreeContext(Transport.Connection connection, long contextId, Ori
296299
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
297300
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
298301
final ExecutorService executor = Executors.newFixedThreadPool(randomIntBetween(1, Runtime.getRuntime().availableProcessors()));
302+
final CountDownLatch latch = new CountDownLatch(1);
303+
final AtomicBoolean latchTriggered = new AtomicBoolean();
299304
AbstractSearchAsyncAction<TestSearchPhaseResult> asyncAction =
300305
new AbstractSearchAsyncAction<TestSearchPhaseResult>(
301306
"test",
@@ -326,7 +331,7 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
326331
Transport.Connection connection = getConnection(null, shard.currentNodeId());
327332
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
328333
connection.getNode());
329-
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> new HashSet<>());
334+
Set<Long> ids = nodeToContextMap.computeIfAbsent(connection.getNode(), (n) -> newConcurrentSet());
330335
ids.add(testSearchPhaseResult.getRequestId());
331336
if (randomBoolean()) {
332337
listener.onResponse(testSearchPhaseResult);
@@ -339,15 +344,15 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
339344
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
340345
return new SearchPhase("test") {
341346
@Override
342-
public void run() throws IOException {
347+
public void run() {
343348
for (int i = 0; i < results.getNumShards(); i++) {
344349
TestSearchPhaseResult result = results.getAtomicArray().get(i);
345350
assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId());
346351
sendReleaseSearchContext(result.getRequestId(), new MockConnection(result.node), OriginalIndices.NONE);
347352
}
348353
responseListener.onResponse(response);
349-
if (latch.getCount() == 0) {
350-
throw new AssertionError("Running a search phase after the latch has reached 0 !!!!");
354+
if (latchTriggered.compareAndSet(false, true) == false) {
355+
throw new AssertionError("latch triggered twice");
351356
}
352357
latch.countDown();
353358
}

0 commit comments

Comments
 (0)