diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2dfdcf9482afb..8811083baa9cb 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -162,6 +162,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -2135,6 +2136,7 @@ public void testConcurrentWritesAndCommits() throws Exception { final int numDocsPerThread = randomIntBetween(500, 1000); final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1); final List indexingThreads = new ArrayList<>(); + final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads); // create N indexing threads to index documents simultaneously for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) { final int threadIdx = threadNum; @@ -2149,7 +2151,10 @@ public void testConcurrentWritesAndCommits() throws Exception { } } catch (Exception e) { throw new RuntimeException(e); + } finally { + doneLatch.countDown(); } + }); indexingThreads.add(indexingThread); } @@ -2159,12 +2164,19 @@ public void testConcurrentWritesAndCommits() throws Exception { thread.start(); } barrier.await(); // wait for indexing threads to all be ready to start - + int commitLimit = randomIntBetween(10, 20); + long sleepTime = 1; // create random commit points boolean doneIndexing; do { - doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0; + doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS); commits.add(engine.acquireIndexCommit(true)); + if (commits.size() > commitLimit) { // don't keep on piling up too many commits + IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1))); + // we increase the wait time to make sure we eventually if things are slow wait for threads to finish. + // this will reduce pressure on disks and will allow threads to make progress without piling up too many commits + sleepTime = sleepTime * 2; + } } while (doneIndexing == false); // now, verify all the commits have the correct docs according to the user commit data