Skip to content

Commit cacc3f7

Browse files
Async IO Processor release before notify (#43682)
This commit changes the async IO processor to release the promiseSemaphore before notifying consumers. This ensures that a misbehaving consumer that sometimes performs blocking (or otherwise slow) operations does not halt the processor. The change should slightly increase the concurrency of shard fsync, but it primarily improves safety, so that one bad piece of code has less effect on overall system performance.
1 parent c593085 commit cacc3f7

File tree

2 files changed

+74
-19
lines changed

2 files changed

+74
-19
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -75,35 +75,33 @@ public final void put(Item item, Consumer<Exception> listener) {
7575
// while we are draining that means we might exit below too early in the while loop if the drainAndSync call is fast.
7676
if (promised || promiseSemaphore.tryAcquire()) {
7777
final List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>();
78-
try {
79-
if (promised) {
80-
// we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates
81-
// no need to preserve context for listener since it runs in current thread.
82-
candidates.add(new Tuple<>(item, listener));
83-
}
84-
// since we made the promise to process we gotta do it here at least once
85-
drainAndProcess(candidates);
86-
} finally {
87-
promiseSemaphore.release(); // now to ensure we are passing it on we release the promise so another thread can take over
78+
if (promised) {
79+
// we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates
80+
// no need to preserve context for listener since it runs in current thread.
81+
candidates.add(new Tuple<>(item, listener));
8882
}
83+
// since we made the promise to process we gotta do it here at least once
84+
drainAndProcessAndRelease(candidates);
8985
while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
9086
// yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing
91-
try {
92-
drainAndProcess(candidates);
93-
} finally {
94-
promiseSemaphore.release();
95-
}
87+
drainAndProcessAndRelease(candidates);
9688
}
9789
}
9890
}
9991

100-
private void drainAndProcess(List<Tuple<Item, Consumer<Exception>>> candidates) {
101-
queue.drainTo(candidates);
102-
processList(candidates);
92+
private void drainAndProcessAndRelease(List<Tuple<Item, Consumer<Exception>>> candidates) {
93+
Exception exception;
94+
try {
95+
queue.drainTo(candidates);
96+
exception = processList(candidates);
97+
} finally {
98+
promiseSemaphore.release();
99+
}
100+
notifyList(candidates, exception);
103101
candidates.clear();
104102
}
105103

106-
private void processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
104+
private Exception processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
107105
Exception exception = null;
108106
if (candidates.isEmpty() == false) {
109107
try {
@@ -114,6 +112,10 @@ private void processList(List<Tuple<Item, Consumer<Exception>>> candidates) {
114112
exception = ex;
115113
}
116114
}
115+
return exception;
116+
}
117+
118+
private void notifyList(List<Tuple<Item, Consumer<Exception>>> candidates, Exception exception) {
117119
for (Tuple<Item, Consumer<Exception>> tuple : candidates) {
118120
Consumer<Exception> consumer = tuple.v2();
119121
try {

server/src/test/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessorTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@
2727
import java.io.IOException;
2828
import java.util.Collections;
2929
import java.util.List;
30+
import java.util.concurrent.BrokenBarrierException;
3031
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.CyclicBarrier;
3133
import java.util.concurrent.Semaphore;
3234
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
3336
import java.util.concurrent.atomic.AtomicInteger;
3437
import java.util.function.Consumer;
3538
import java.util.stream.Collectors;
@@ -239,4 +242,54 @@ public void run() {
239242
assertEquals(threadCount, received.get());
240243
threads.forEach(t -> assertFalse(t.isAlive()));
241244
}
245+
246+
public void testSlowConsumer() {
247+
AtomicInteger received = new AtomicInteger(0);
248+
AtomicInteger notified = new AtomicInteger(0);
249+
250+
AsyncIOProcessor<Object> processor = new AsyncIOProcessor<Object>(logger, scaledRandomIntBetween(1, 2024), threadContext) {
251+
@Override
252+
protected void write(List<Tuple<Object, Consumer<Exception>>> candidates) throws IOException {
253+
received.addAndGet(candidates.size());
254+
}
255+
};
256+
257+
int threadCount = randomIntBetween(2, 10);
258+
CyclicBarrier barrier = new CyclicBarrier(threadCount);
259+
Semaphore serializePutSemaphore = new Semaphore(1);
260+
List<Thread> threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) {
261+
{
262+
setDaemon(true);
263+
}
264+
265+
@Override
266+
public void run() {
267+
try {
268+
assertTrue(serializePutSemaphore.tryAcquire(10, TimeUnit.SECONDS));
269+
} catch (InterruptedException e) {
270+
throw new RuntimeException(e);
271+
}
272+
processor.put(new Object(), (e) -> {
273+
serializePutSemaphore.release();
274+
try {
275+
barrier.await(10, TimeUnit.SECONDS);
276+
} catch (InterruptedException | BrokenBarrierException | TimeoutException ex) {
277+
throw new RuntimeException(ex);
278+
}
279+
notified.incrementAndGet();
280+
});
281+
}
282+
}).collect(Collectors.toList());
283+
threads.forEach(Thread::start);
284+
threads.forEach(t -> {
285+
try {
286+
t.join(20000);
287+
} catch (InterruptedException e) {
288+
throw new RuntimeException(e);
289+
}
290+
});
291+
assertEquals(threadCount, notified.get());
292+
assertEquals(threadCount, received.get());
293+
threads.forEach(t -> assertFalse(t.isAlive()));
294+
}
242295
}

0 commit comments

Comments
 (0)