Async IO Processor release before notify #43682

Conversation
This commit changes async IO processor to release the promiseSemaphore before notifying consumers. This ensures that a bad consumer that sometimes does blocking (or otherwise slow) operations does not halt the processor. This should slightly increase the concurrency for shard fsync, but primarily improves safety so that one bad piece of code has less effect on overall system performance.
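For illustration, here is a minimal, self-contained Java sketch of the ordering this change describes. It is not the actual AsyncIOProcessor code; the class and method names (ReleaseBeforeNotifySketch, processBatch, doWrite) are made up. The point is simply that the batched I/O runs while holding promiseSemaphore, but the per-item listeners are invoked only after the semaphore has been released, so a slow or blocking listener cannot stall the next batch.

import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical sketch only; the real AsyncIOProcessor is structured differently.
class ReleaseBeforeNotifySketch<Item> {
    private final Semaphore promiseSemaphore = new Semaphore(1);

    // doWrite stands in for the batched, fsync-like I/O; listeners are the callers'
    // completion callbacks for the items in this batch.
    void processBatch(List<Item> batch, List<Consumer<Exception>> listeners,
                      Consumer<List<Item>> doWrite) throws InterruptedException {
        Exception failure = null;
        promiseSemaphore.acquire();
        try {
            doWrite.accept(batch);        // the actual I/O happens under the semaphore
        } catch (Exception e) {
            failure = e;
        } finally {
            promiseSemaphore.release();   // release first ...
        }
        for (Consumer<Exception> listener : listeners) {
            try {
                listener.accept(failure); // ... then notify, outside the critical section
            } catch (Exception e) {
                // a misbehaving listener must not affect the processor or the other listeners
            }
        }
    }
}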
Pinging @elastic/es-distributed

tough class :) - I first checked who wrote it and why am I reviewing 🤦‍♂️ I do wonder if it would be enough to just do the draining under the lock and do all the processing and notifying outside, like this:

diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java
index ad68471041b..ca9edae13f1 100644
--- a/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java
+++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/AsyncIOProcessor.java
@@ -78,24 +78,26 @@ public abstract class AsyncIOProcessor<Item> {
                     // we are responsible for processing we don't need to add the tuple to the queue we can just add it to the candidates
                     candidates.add(itemTuple);
                 }
-                // since we made the promise to process we gotta do it here at least once
-                drainAndProcess(candidates);
+                queue.drainTo(candidates);
             } finally {
                 promiseSemaphore.release(); // now to ensure we are passing it on we release the promise so another thread can take over
             }
+            // since we made the promise to process we gotta do it here at least once
+            process(candidates);
             while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
                 // yet if the queue is not empty AND nobody else has yet made the promise to take over we continue processing
                 try {
-                    drainAndProcess(candidates);
+                    queue.drainTo(candidates);
                 } finally {
                     promiseSemaphore.release();
                 }
+                process(candidates);
             }
+
         }
     }

-    private void drainAndProcess(List<Tuple<Item, Consumer<Exception>>> candidates) {
-        queue.drainTo(candidates);
+    private void process(List<Tuple<Item, Consumer<Exception>>> candidates) {
         processList(candidates);
         candidates.clear();
     }
I think that would then mean that we have parallel threads doing fsync. I think it would be hard to imagine the async IO processor then having any effect 95% of the time, since we would put and drain under the semaphore but then immediately release the semaphore before fsync'ing, allowing the next thread to arrive and do the same. We have to hold the semaphore during the fsync to get the queuing effect we are after?
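To illustrate the queuing effect referred to here, a toy Java sketch (hypothetical names again, not the Elasticsearch class): while one thread holds promiseSemaphore and performs the slow write, other callers only enqueue their items, so many put() calls collapse into a few batched writes. If the semaphore were released before the write, each arriving thread could start its own write and that batching would largely disappear.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only; items and listeners are collapsed into a single Runnable for brevity,
// and notification happens inline (the change in this PR moves it outside the semaphore).
class BatchingSketch {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final Semaphore promiseSemaphore = new Semaphore(1);
    final AtomicInteger writes = new AtomicInteger(); // counts simulated fsyncs

    void put(Runnable listener) {
        queue.add(listener);
        if (promiseSemaphore.tryAcquire() == false) {
            return; // another thread holds the promise and will write our item for us
        }
        try {
            drainAndWrite(); // semaphore is held across the write: new arrivals pile up in the queue
        } finally {
            promiseSemaphore.release();
        }
        // pick up anything that arrived after we released, if nobody else has made the promise
        while (queue.isEmpty() == false && promiseSemaphore.tryAcquire()) {
            try {
                drainAndWrite();
            } finally {
                promiseSemaphore.release();
            }
        }
    }

    private void drainAndWrite() {
        List<Runnable> batch = new ArrayList<>();
        Runnable r;
        while ((r = queue.poll()) != null) {
            batch.add(r);
        }
        if (batch.isEmpty()) {
            return;
        }
        writes.incrementAndGet();     // one simulated fsync covers the whole batch
        batch.forEach(Runnable::run); // notify the callers whose items were in this batch
    }
}

With many concurrent put() calls, writes stays well below the number of items; that batching is what would be lost if the semaphore were released before the write.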
ywelsch left a comment:

LGTM
s1monw left a comment:

yeah good point. LGTM 2
This commit changes AsyncIOProcessor to release the promiseSemaphore
before notifying consumers. This ensures that a bad consumer that
sometimes does blocking (or otherwise slow) operations does not halt the
processor. This should slightly increase the concurrency for translog
fsync, but primarily improves safety so that one bad piece of code has
less effect on overall write performance.