Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -65,6 +63,8 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.reindex.ScrollableHitSource.Hit;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
Expand Down Expand Up @@ -323,8 +323,13 @@ public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable comman
// While we're here we can check that the sleep made it through
assertThat(delay.nanos(), greaterThan(0L));
assertThat(delay.seconds(), lessThanOrEqualTo(10L));
((AbstractRunnable) command).onRejection(new EsRejectedExecutionException("test"));
return null;
final EsRejectedExecutionException exception = new EsRejectedExecutionException("test");
if (command instanceof AbstractRunnable) {
((AbstractRunnable) command).onRejection(exception);
return null;
} else {
throw exception;
}
}
});
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Runnable that can only be run one time.
*/
public class RunOnce implements Runnable {

private final Runnable delegate;
private final AtomicBoolean hasRun;

public RunOnce(final Runnable delegate) {
this.delegate = Objects.requireNonNull(delegate);
this.hasRun = new AtomicBoolean(false);
}

@Override
public void run() {
if (hasRun.compareAndSet(false, true)) {
delegate.run();
}
}

/**
* {@code true} if the {@link RunOnce} has been executed once.
*/
public boolean hasRun() {
return hasRun.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -188,8 +189,12 @@ public void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue lastBatchSt
synchronized (delayedPrepareBulkRequestReference) {
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
delay, new RunOnce(prepareBulkRequestRunnable)));
try {
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
delay, new RunOnce(prepareBulkRequestRunnable)));
} catch (EsRejectedExecutionException e) {
prepareBulkRequestRunnable.onRejection(e);
}
}
}

Expand Down Expand Up @@ -242,25 +247,17 @@ public void rethrottle(float newRequestsPerSecond) {

class DelayedPrepareBulkRequest {
private final ThreadPool threadPool;
private final AbstractRunnable command;
private final Runnable command;
private final float requestsPerSecond;
private final ScheduledFuture<?> future;

DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, AbstractRunnable command) {
DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, Runnable command) {
this.threadPool = threadPool;
this.requestsPerSecond = requestsPerSecond;
this.command = command;
this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
throttledNanos.addAndGet(delay.nanos());
command.run();
}

@Override
public void onFailure(Exception e) {
command.onFailure(e);
}
this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, () -> {
throttledNanos.addAndGet(delay.nanos());
command.run();
});
}

Expand Down Expand Up @@ -302,29 +299,4 @@ TimeValue newDelay(long remainingDelay, float newRequestsPerSecond) {
return timeValueNanos(round(remainingDelay * requestsPerSecond / newRequestsPerSecond));
}
}

/**
* Runnable that can only be run one time. This is paranoia to prevent furiously rethrottling from running the command multiple times.
* Without it the command would be run multiple times.
*/
private static class RunOnce extends AbstractRunnable {
private final AtomicBoolean hasRun = new AtomicBoolean(false);
private final AbstractRunnable delegate;

RunOnce(AbstractRunnable delegate) {
this.delegate = delegate;
}

@Override
protected void doRun() throws Exception {
if (hasRun.compareAndSet(false, true)) {
delegate.run();
}
}

@Override
public void onFailure(Exception e) {
delegate.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.test.ESTestCase;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class RunOnceTests extends ESTestCase {

public void testRunOnce() {
final AtomicInteger counter = new AtomicInteger(0);
final RunOnce runOnce = new RunOnce(counter::incrementAndGet);
assertFalse(runOnce.hasRun());

runOnce.run();
assertTrue(runOnce.hasRun());
assertEquals(1, counter.get());

runOnce.run();
assertTrue(runOnce.hasRun());
assertEquals(1, counter.get());
}

public void testRunOnceConcurrently() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger(0);
final RunOnce runOnce = new RunOnce(counter::incrementAndGet);

final Thread[] threads = new Thread[between(3, 10)];
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
runOnce.run();
});
threads[i].start();
}

latch.countDown();
for (Thread thread : threads) {
thread.join();
}
assertTrue(runOnce.hasRun());
assertEquals(1, counter.get());
}

public void testRunOnceWithAbstractRunnable() {
final AtomicInteger onRun = new AtomicInteger(0);
final AtomicInteger onFailure = new AtomicInteger(0);
final AtomicInteger onAfter = new AtomicInteger(0);

final RunOnce runOnce = new RunOnce(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
onRun.incrementAndGet();
throw new RuntimeException("failure");
}

@Override
public void onFailure(Exception e) {
onFailure.incrementAndGet();
}

@Override
public void onAfter() {
onAfter.incrementAndGet();
}
});

final int iterations = randomIntBetween(1, 10);
for (int i = 0; i < iterations; i++) {
runOnce.run();
assertEquals(1, onRun.get());
assertEquals(1, onFailure.get());
assertEquals(1, onAfter.get());
assertTrue(runOnce.hasRun());
}
}
}