From d089bbaf2f1257168795ec34eec199b70772e3d7 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 22 Jun 2017 21:00:04 +0200 Subject: [PATCH] Ensure `InternalEngineTests.testConcurrentWritesAndCommits` doesn't pile up commits `InternalEngineTests.testConcurrentWritesAndCommits` can be very heavy on disks if threads are slow and the main thread keeps on pulling commit points holding on to many many segments. This commit adds some quadratic backoff to not pile up too many commits and to make sure indexing threads can make progress. This also now doesn't do busy waiting but waits on a latch with a timeout. Closes #25110 --- .../index/engine/InternalEngineTests.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2dfdcf9482afb..8811083baa9cb 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -162,6 +162,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -2135,6 +2136,7 @@ public void testConcurrentWritesAndCommits() throws Exception { final int numDocsPerThread = randomIntBetween(500, 1000); final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1); final List indexingThreads = new ArrayList<>(); + final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads); // create N indexing threads to index documents simultaneously for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) { final int threadIdx = threadNum; @@ -2149,7 +2151,10 @@ public void testConcurrentWritesAndCommits() throws Exception { } } catch (Exception e) { throw new RuntimeException(e); + } finally { + doneLatch.countDown(); } + }); indexingThreads.add(indexingThread); } @@ -2159,12 +2164,19 @@ public void testConcurrentWritesAndCommits() throws Exception { thread.start(); } barrier.await(); // wait for indexing threads to all be ready to start - + int commitLimit = randomIntBetween(10, 20); + long sleepTime = 1; // create random commit points boolean doneIndexing; do { - doneIndexing = indexingThreads.stream().filter(Thread::isAlive).count() == 0; + doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS); commits.add(engine.acquireIndexCommit(true)); + if (commits.size() > commitLimit) { // don't keep on piling up too many commits + IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1))); + // we increase the wait time to make sure we eventually if things are slow wait for threads to finish. + // this will reduce pressure on disks and will allow threads to make progress without piling up too many commits + sleepTime = sleepTime * 2; + } } while (doneIndexing == false); // now, verify all the commits have the correct docs according to the user commit data