Skip to content

Commit d92c9a0

Browse files
committed
Extract RunOnce into a dedicated class (#35489)
This commit extracts the static inner class RunOnce from WorkerBulkByScrollTaskState so that it can be reused at other places.
1 parent afa4138 commit d92c9a0

File tree

4 files changed

+173
-45
lines changed

4 files changed

+173
-45
lines changed

modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
3737
import org.elasticsearch.action.bulk.BulkRequest;
3838
import org.elasticsearch.action.bulk.BulkResponse;
39-
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
40-
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
4139
import org.elasticsearch.action.delete.DeleteRequest;
4240
import org.elasticsearch.action.delete.DeleteResponse;
4341
import org.elasticsearch.action.index.IndexRequest;
@@ -66,6 +64,8 @@
6664
import org.elasticsearch.common.xcontent.XContentType;
6765
import org.elasticsearch.index.Index;
6866
import org.elasticsearch.index.engine.VersionConflictEngineException;
67+
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
68+
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
6969
import org.elasticsearch.index.shard.ShardId;
7070
import org.elasticsearch.rest.RestStatus;
7171
import org.elasticsearch.search.SearchHit;
@@ -324,8 +324,13 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
324324
// While we're here we can check that the sleep made it through
325325
assertThat(delay.nanos(), greaterThan(0L));
326326
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
327-
((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test"));
328-
return null;
327+
final EsRejectedExecutionException exception = new EsRejectedExecutionException("test");
328+
if (command instanceof AbstractRunnable) {
329+
((AbstractRunnable) command).onRejection(exception);
330+
return null;
331+
} else {
332+
throw exception;
333+
}
329334
}
330335
});
331336
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.common.util.concurrent;
20+
21+
import java.util.Objects;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
/**
25+
* Runnable that can only be run one time.
26+
*/
27+
public class RunOnce implements Runnable {
28+
29+
private final Runnable delegate;
30+
private final AtomicBoolean hasRun;
31+
32+
public RunOnce(final Runnable delegate) {
33+
this.delegate = Objects.requireNonNull(delegate);
34+
this.hasRun = new AtomicBoolean(false);
35+
}
36+
37+
@Override
38+
public void run() {
39+
if (hasRun.compareAndSet(false, true)) {
40+
delegate.run();
41+
}
42+
}
43+
44+
/**
45+
* {@code true} if the {@link RunOnce} has been executed once.
46+
*/
47+
public boolean hasRun() {
48+
return hasRun.get();
49+
}
50+
}

server/src/main/java/org/elasticsearch/index/reindex/WorkerBulkByScrollTaskState.java

Lines changed: 13 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@
2323
import org.apache.logging.log4j.Logger;
2424
import org.elasticsearch.common.unit.TimeValue;
2525
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
26+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2627
import org.elasticsearch.common.util.concurrent.FutureUtils;
28+
import org.elasticsearch.common.util.concurrent.RunOnce;
2729
import org.elasticsearch.threadpool.ThreadPool;
2830

2931
import java.util.concurrent.ScheduledFuture;
3032
import java.util.concurrent.TimeUnit;
31-
import java.util.concurrent.atomic.AtomicBoolean;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.concurrent.atomic.AtomicLong;
3435
import java.util.concurrent.atomic.AtomicReference;
@@ -188,8 +189,12 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt
188189
synchronized (delayedPrepareBulkRequestReference) {
189190
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
190191
logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
191-
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
192-
delay, new RunOnce(prepareBulkRequestRunnable)));
192+
try {
193+
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
194+
delay, new RunOnce(prepareBulkRequestRunnable)));
195+
} catch (EsRejectedExecutionException e) {
196+
prepareBulkRequestRunnable.onRejection(e);
197+
}
193198
}
194199
}
195200

@@ -242,25 +247,17 @@ public void rethrottle(float newRequestsPerSecond) {
242247

243248
class DelayedPrepareBulkRequest {
244249
private final ThreadPool threadPool;
245-
private final AbstractRunnable command;
250+
private final Runnable command;
246251
private final float requestsPerSecond;
247252
private final ScheduledFuture<?> future;
248253

249-
DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, AbstractRunnable command) {
254+
DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) {
250255
this.threadPool = threadPool;
251256
this.requestsPerSecond = requestsPerSecond;
252257
this.command = command;
253-
this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() {
254-
@Override
255-
protected void doRun() throws Exception {
256-
throttledNanos.addAndGet(delay.nanos());
257-
command.run();
258-
}
259-
260-
@Override
261-
public void onFailure(Exception e) {
262-
command.onFailure(e);
263-
}
258+
this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, () -> {
259+
throttledNanos.addAndGet(delay.nanos());
260+
command.run();
264261
});
265262
}
266263

@@ -302,29 +299,4 @@ TimeValue newDelay(long remainingDelay, float newRequestsPerSecond) {
302299
return timeValueNanos(round(remainingDelay * requestsPerSecond / newRequestsPerSecond));
303300
}
304301
}
305-
306-
/**
307-
* Runnable that can only be run one time. This is paranoia to prevent furiously rethrottling from running the command multiple times.
308-
* Without it the command would be run multiple times.
309-
*/
310-
private static class RunOnce extends AbstractRunnable {
311-
private final AtomicBoolean hasRun = new AtomicBoolean(false);
312-
private final AbstractRunnable delegate;
313-
314-
RunOnce(AbstractRunnable delegate) {
315-
this.delegate = delegate;
316-
}
317-
318-
@Override
319-
protected void doRun() throws Exception {
320-
if (hasRun.compareAndSet(false, true)) {
321-
delegate.run();
322-
}
323-
}
324-
325-
@Override
326-
public void onFailure(Exception e) {
327-
delegate.onFailure(e);
328-
}
329-
}
330302
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.util.concurrent;
21+
22+
import org.elasticsearch.test.ESTestCase;
23+
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
27+
public class RunOnceTests extends ESTestCase {
28+
29+
public void testRunOnce() {
30+
final AtomicInteger counter = new AtomicInteger(0);
31+
final RunOnce runOnce = new RunOnce(counter::incrementAndGet);
32+
assertFalse(runOnce.hasRun());
33+
34+
runOnce.run();
35+
assertTrue(runOnce.hasRun());
36+
assertEquals(1, counter.get());
37+
38+
runOnce.run();
39+
assertTrue(runOnce.hasRun());
40+
assertEquals(1, counter.get());
41+
}
42+
43+
public void testRunOnceConcurrently() throws InterruptedException {
44+
final AtomicInteger counter = new AtomicInteger(0);
45+
final RunOnce runOnce = new RunOnce(counter::incrementAndGet);
46+
47+
final Thread[] threads = new Thread[between(3, 10)];
48+
final CountDownLatch latch = new CountDownLatch(1);
49+
for (int i = 0; i < threads.length; i++) {
50+
threads[i] = new Thread(() -> {
51+
try {
52+
latch.await();
53+
} catch (InterruptedException e) {
54+
throw new RuntimeException(e);
55+
}
56+
runOnce.run();
57+
});
58+
threads[i].start();
59+
}
60+
61+
latch.countDown();
62+
for (Thread thread : threads) {
63+
thread.join();
64+
}
65+
assertTrue(runOnce.hasRun());
66+
assertEquals(1, counter.get());
67+
}
68+
69+
public void testRunOnceWithAbstractRunnable() {
70+
final AtomicInteger onRun = new AtomicInteger(0);
71+
final AtomicInteger onFailure = new AtomicInteger(0);
72+
final AtomicInteger onAfter = new AtomicInteger(0);
73+
74+
final RunOnce runOnce = new RunOnce(new AbstractRunnable() {
75+
@Override
76+
protected void doRun() throws Exception {
77+
onRun.incrementAndGet();
78+
throw new RuntimeException("failure");
79+
}
80+
81+
@Override
82+
public void onFailure(Exception e) {
83+
onFailure.incrementAndGet();
84+
}
85+
86+
@Override
87+
public void onAfter() {
88+
onAfter.incrementAndGet();
89+
}
90+
});
91+
92+
final int iterations = randomIntBetween(1, 10);
93+
for (int i = 0; i < iterations; i++) {
94+
runOnce.run();
95+
assertEquals(1, onRun.get());
96+
assertEquals(1, onFailure.get());
97+
assertEquals(1, onAfter.get());
98+
assertTrue(runOnce.hasRun());
99+
}
100+
}
101+
}

0 commit comments

Comments
 (0)