Skip to content

Commit 97b40f0

Browse files
committed
Use RunOnce when appropriate (#35553)
This pull request replaces some blocks of code that must be run once and that are currently based on AtomicBoolean by the convient RunOnce class added in #35489.
1 parent 96f3e5e commit 97b40f0

File tree

4 files changed

+25
-36
lines changed

4 files changed

+25
-36
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.common.collect.Tuple;
2929
import org.elasticsearch.common.lease.Releasable;
3030
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
31+
import org.elasticsearch.common.util.concurrent.RunOnce;
3132
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
3233
import org.elasticsearch.core.internal.io.IOUtils;
3334
import org.elasticsearch.threadpool.ThreadPool;
@@ -128,12 +129,12 @@ public void asyncBlockOperations(final ActionListener<Releasable> onAcquired, fi
128129
delayOperations();
129130
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
130131

131-
final AtomicBoolean released = new AtomicBoolean(false);
132+
final RunOnce released = new RunOnce(() -> releaseDelayedOperations());
132133

133134
@Override
134135
public void onFailure(final Exception e) {
135136
try {
136-
releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible
137+
released.run(); // resume delayed operations as soon as possible
137138
} finally {
138139
onAcquired.onFailure(e);
139140
}
@@ -146,16 +147,10 @@ protected void doRun() throws Exception {
146147
try {
147148
releasable.close();
148149
} finally {
149-
releaseDelayedOperationsIfNeeded();
150+
released.run();
150151
}
151152
});
152153
}
153-
154-
private void releaseDelayedOperationsIfNeeded() {
155-
if (released.compareAndSet(false, true)) {
156-
releaseDelayedOperations();
157-
}
158-
}
159154
});
160155
}
161156

@@ -177,13 +172,11 @@ private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throw
177172
}
178173
}
179174
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
180-
final AtomicBoolean closed = new AtomicBoolean();
181-
return () -> {
182-
if (closed.compareAndSet(false, true)) {
183-
assert semaphore.availablePermits() == 0;
184-
semaphore.release(TOTAL_PERMITS);
185-
}
186-
};
175+
final RunOnce release = new RunOnce(() -> {
176+
assert semaphore.availablePermits() == 0;
177+
semaphore.release(TOTAL_PERMITS);
178+
});
179+
return release::run;
187180
} else {
188181
throw new TimeoutException("timeout while blocking operations");
189182
}

test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.common.unit.TimeValue;
3838
import org.elasticsearch.common.util.BigArrays;
3939
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
40+
import org.elasticsearch.common.util.concurrent.RunOnce;
4041
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
4142
import org.elasticsearch.node.Node;
4243
import org.elasticsearch.plugins.Plugin;
@@ -66,7 +67,6 @@
6667
import java.util.Set;
6768
import java.util.concurrent.CopyOnWriteArrayList;
6869
import java.util.concurrent.LinkedBlockingDeque;
69-
import java.util.concurrent.atomic.AtomicBoolean;
7070
import java.util.function.Function;
7171
import java.util.function.Supplier;
7272

@@ -348,21 +348,17 @@ public void sendRequest(Transport.Connection connection, long requestId, String
348348
request.writeTo(bStream);
349349
final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput());
350350

351-
Runnable runnable = new AbstractRunnable() {
352-
AtomicBoolean requestSent = new AtomicBoolean();
353-
351+
final RunOnce runnable = new RunOnce(new AbstractRunnable() {
354352
@Override
355353
public void onFailure(Exception e) {
356354
logger.debug("failed to send delayed request", e);
357355
}
358356

359357
@Override
360358
protected void doRun() throws IOException {
361-
if (requestSent.compareAndSet(false, true)) {
362-
connection.sendRequest(requestId, action, clonedRequest, options);
363-
}
359+
connection.sendRequest(requestId, action, clonedRequest, options);
364360
}
365-
};
361+
});
366362

367363
// store the request to send it once the rule is cleared.
368364
synchronized (this) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListener.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ml.job.process.autodetect.output;
77

88
import org.elasticsearch.common.Nullable;
9+
import org.elasticsearch.common.util.concurrent.RunOnce;
910
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
1011

1112
import java.time.Duration;
@@ -14,16 +15,21 @@
1415
import java.util.concurrent.ConcurrentMap;
1516
import java.util.concurrent.CountDownLatch;
1617
import java.util.concurrent.TimeUnit;
17-
import java.util.concurrent.atomic.AtomicBoolean;
1818

1919
class FlushListener {
2020

2121
final ConcurrentMap<String, FlushAcknowledgementHolder> awaitingFlushed = new ConcurrentHashMap<>();
22-
final AtomicBoolean cleared = new AtomicBoolean(false);
22+
final RunOnce onClear = new RunOnce(() -> {
23+
Iterator<ConcurrentMap.Entry<String, FlushAcknowledgementHolder>> latches = awaitingFlushed.entrySet().iterator();
24+
while (latches.hasNext()) {
25+
latches.next().getValue().latch.countDown();
26+
latches.remove();
27+
}
28+
});
2329

2430
@Nullable
2531
FlushAcknowledgement waitForFlush(String flushId, Duration timeout) throws InterruptedException {
26-
if (cleared.get()) {
32+
if (onClear.hasRun()) {
2733
return null;
2834
}
2935

@@ -49,13 +55,7 @@ void clear(String flushId) {
4955
}
5056

5157
void clear() {
52-
if (cleared.compareAndSet(false, true)) {
53-
Iterator<ConcurrentMap.Entry<String, FlushAcknowledgementHolder>> latches = awaitingFlushed.entrySet().iterator();
54-
while (latches.hasNext()) {
55-
latches.next().getValue().latch.countDown();
56-
latches.remove();
57-
}
58-
}
58+
onClear.run();
5959
}
6060

6161
private static class FlushAcknowledgementHolder {

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/FlushListenerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@ public void testClear() throws Exception {
6060
}
6161
assertBusy(() -> assertEquals(numWaits, listener.awaitingFlushed.size()));
6262
assertThat(flushAcknowledgementHolders.stream().map(f -> f.get()).filter(f -> f != null).findAny().isPresent(), is(false));
63-
assertFalse(listener.cleared.get());
63+
assertFalse(listener.onClear.hasRun());
6464

6565
listener.clear();
6666

6767
for (AtomicReference<FlushAcknowledgement> f : flushAcknowledgementHolders) {
6868
assertBusy(() -> assertNotNull(f.get()));
6969
}
7070
assertTrue(listener.awaitingFlushed.isEmpty());
71-
assertTrue(listener.cleared.get());
71+
assertTrue(listener.onClear.hasRun());
7272
}
7373
}

0 commit comments

Comments
 (0)