diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java new file mode 100644 index 0000000000000..b1e9a14d7f198 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FutureExecutor.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.unit.TimeValue; + +/** + * Device which supports running a task after some delay has elapsed. + */ +public interface FutureExecutor { + /** + * Schedule the given task for execution after the given delay has elapsed. + */ + void schedule(Runnable task, TimeValue delay); +} + diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 51a4adec8d16d..18f57b3a99380 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -362,9 +362,13 @@ public Runnable preserveContext(Runnable command) { return getThreadContext().preserveContext(command); } - public void shutdown() { + protected final void stopCachedTimeThread() { cachedTimeThread.running = false; cachedTimeThread.interrupt(); + } + + public void shutdown() { + stopCachedTimeThread(); scheduler.shutdown(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { @@ -374,8 +378,7 @@ public void shutdown() { } public void shutdownNow() { - cachedTimeThread.running = false; - cachedTimeThread.interrupt(); + stopCachedTimeThread(); scheduler.shutdownNow(); for (ExecutorHolder executor : executors.values()) { if (executor.executor() instanceof ThreadPoolExecutor) { diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java new file mode 100644 index 0000000000000..6afdf81a8b45f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueue.java @@ -0,0 +1,360 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import com.carrotsearch.randomizedtesting.generators.RandomNumbers; +import org.apache.lucene.util.Counter; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolInfo; +import org.elasticsearch.threadpool.ThreadPoolStats; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class DeterministicTaskQueue extends AbstractComponent { + + private final List runnableTasks = new ArrayList<>(); + private List deferredTasks = new ArrayList<>(); + private long currentTimeMillis; + private long nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; + + public DeterministicTaskQueue(Settings settings) { + super(settings); + } + + /** + * @return whether there are any runnable tasks. + */ + public boolean hasRunnableTasks() { + return runnableTasks.isEmpty() == false; + } + + /** + * @return whether there are any deferred tasks, i.e. tasks that are scheduled for the future. + */ + public boolean hasDeferredTasks() { + return deferredTasks.isEmpty() == false; + } + + /** + * @return the current (simulated) time, in milliseconds. + */ + public long getCurrentTimeMillis() { + return currentTimeMillis; + } + + /** + * Runs the first runnable task. + */ + public void runNextTask() { + assert hasRunnableTasks(); + runTask(0); + } + + /** + * Runs an arbitrary runnable task. + */ + public void runRandomTask(final Random random) { + assert hasRunnableTasks(); + runTask(RandomNumbers.randomIntBetween(random, 0, runnableTasks.size() - 1)); + } + + private void runTask(final int index) { + final Runnable task = runnableTasks.remove(index); + logger.trace("running task {} of {}: {}", index, runnableTasks.size() + 1, task); + task.run(); + } + + /** + * Schedule a task for immediate execution. + */ + public void scheduleNow(final Runnable task) { + logger.trace("scheduleNow: adding runnable {}", task); + runnableTasks.add(task); + } + + /** + * Schedule a task for future execution. + */ + public void scheduleAt(final long executionTimeMillis, final Runnable task) { + if (executionTimeMillis <= currentTimeMillis) { + logger.trace("scheduleAt: [{}ms] is not in the future, adding runnable {}", executionTimeMillis, task); + runnableTasks.add(task); + } else { + final DeferredTask deferredTask = new DeferredTask(executionTimeMillis, task); + logger.trace("scheduleAt: adding {}", deferredTask); + nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, executionTimeMillis); + deferredTasks.add(deferredTask); + } + } + + /** + * Advance the current time to the time of the next deferred task, and update the sets of deferred and runnable tasks accordingly. + */ + public void advanceTime() { + assert hasDeferredTasks(); + assert currentTimeMillis < nextDeferredTaskExecutionTimeMillis; + + logger.trace("advanceTime: from [{}ms] to [{}ms]", currentTimeMillis, nextDeferredTaskExecutionTimeMillis); + currentTimeMillis = nextDeferredTaskExecutionTimeMillis; + + nextDeferredTaskExecutionTimeMillis = Long.MAX_VALUE; + List remainingDeferredTasks = new ArrayList<>(); + for (final DeferredTask deferredTask : deferredTasks) { + assert currentTimeMillis <= deferredTask.getExecutionTimeMillis(); + if (deferredTask.getExecutionTimeMillis() == currentTimeMillis) { + logger.trace("advanceTime: no longer deferred: {}", deferredTask); + runnableTasks.add(deferredTask.getTask()); + } else { + remainingDeferredTasks.add(deferredTask); + nextDeferredTaskExecutionTimeMillis = Math.min(nextDeferredTaskExecutionTimeMillis, deferredTask.getExecutionTimeMillis()); + } + } + deferredTasks = remainingDeferredTasks; + + assert deferredTasks.isEmpty() == (nextDeferredTaskExecutionTimeMillis == Long.MAX_VALUE); + } + + /** + * @return A FutureExecutor that uses this task queue. + */ + public FutureExecutor getFutureExecutor() { + return (task, delay) -> scheduleAt(currentTimeMillis + delay.millis(), task); + } + + /** + * @return A ExecutorService that uses this task queue. + */ + public ExecutorService getExecutorService() { + return new ExecutorService() { + + @Override + public void shutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public List shutdownNow() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isShutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTerminated() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public Future submit(Callable task) { + throw new UnsupportedOperationException(); + } + + @Override + public Future submit(Runnable task, T result) { + throw new UnsupportedOperationException(); + } + + @Override + public Future submit(Runnable task) { + throw new UnsupportedOperationException(); + } + + @Override + public List> invokeAll(Collection> tasks) { + throw new UnsupportedOperationException(); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks) { + throw new UnsupportedOperationException(); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public void execute(Runnable command) { + scheduleNow(command); + } + }; + } + + /** + * @return A ThreadPool that uses this task queue. + */ + public ThreadPool getThreadPool() { + return new ThreadPool(settings) { + + { + stopCachedTimeThread(); + } + + @Override + public long relativeTimeInMillis() { + return currentTimeMillis; + } + + @Override + public long absoluteTimeInMillis() { + return currentTimeMillis; + } + + @Override + public Counter estimatedTimeInMillisCounter() { + return new Counter() { + @Override + public long addAndGet(long delta) { + throw new UnsupportedOperationException(); + } + + @Override + public long get() { + return currentTimeMillis; + } + }; + } + + @Override + public ThreadPoolInfo info() { + throw new UnsupportedOperationException(); + } + + @Override + public Info info(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public ThreadPoolStats stats() { + throw new UnsupportedOperationException(); + } + + @Override + public ExecutorService generic() { + return getExecutorService(); + } + + @Override + public ExecutorService executor(String name) { + return getExecutorService(); + } + + @Override + public ScheduledFuture schedule(TimeValue delay, String executor, Runnable command) { + throw new UnsupportedOperationException(); + } + + @Override + public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) { + throw new UnsupportedOperationException(); + } + + @Override + public Runnable preserveContext(Runnable command) { + throw new UnsupportedOperationException(); + } + + @Override + public void shutdown() { + throw new UnsupportedOperationException(); + } + + @Override + public void shutdownNow() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledExecutorService scheduler() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } + + @Override + public ThreadContext getThreadContext() { + throw new UnsupportedOperationException(); + } + }; + } + + private static class DeferredTask { + private final long executionTimeMillis; + private final Runnable task; + + DeferredTask(long executionTimeMillis, Runnable task) { + this.executionTimeMillis = executionTimeMillis; + this.task = task; + assert executionTimeMillis < Long.MAX_VALUE : "Long.MAX_VALUE is special, cannot be an execution time"; + } + + long getExecutionTimeMillis() { + return executionTimeMillis; + } + + Runnable getTask() { + return task; + } + + @Override + public String toString() { + return "DeferredTask{" + + "executionTimeMillis=" + executionTimeMillis + + ", task=" + task + + '}'; + } + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java new file mode 100644 index 0000000000000..adba2bcc8a0e8 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java @@ -0,0 +1,319 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.coordination; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; + +import static org.elasticsearch.node.Node.NODE_NAME_SETTING; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.core.Is.is; + +public class DeterministicTaskQueueTests extends ESTestCase { + + public void testRunNextTask() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(2); + + taskQueue.scheduleNow(() -> strings.add("foo")); + taskQueue.scheduleNow(() -> strings.add("bar")); + + assertThat(strings, empty()); + + assertTrue(taskQueue.hasRunnableTasks()); + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + + assertTrue(taskQueue.hasRunnableTasks()); + taskQueue.runNextTask(); + assertThat(strings, contains("foo", "bar")); + + assertFalse(taskQueue.hasRunnableTasks()); + } + + public void testRunRandomTask() { + final List strings1 = getResultsOfRunningRandomly(new Random(4520795446362137264L)); + final List strings2 = getResultsOfRunningRandomly(new Random(266504691902226821L)); + assertThat(strings1, not(equalTo(strings2))); + } + + private List getResultsOfRunningRandomly(Random random) { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(4); + + taskQueue.scheduleNow(() -> strings.add("foo")); + taskQueue.scheduleNow(() -> strings.add("bar")); + taskQueue.scheduleNow(() -> strings.add("baz")); + taskQueue.scheduleNow(() -> strings.add("quux")); + + assertThat(strings, empty()); + + while (taskQueue.hasRunnableTasks()) { + taskQueue.runRandomTask(random); + } + + assertThat(strings, containsInAnyOrder("foo", "bar", "baz", "quux")); + return strings; + } + + public void testStartsAtTimeZero() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); + } + + private void advanceToRandomTime(DeterministicTaskQueue taskQueue) { + taskQueue.scheduleAt(randomLongBetween(1, 100), () -> { + }); + taskQueue.advanceTime(); + taskQueue.runNextTask(); + assertFalse(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + } + + public void testDoesNotDeferTasksForImmediateExecution() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + advanceToRandomTime(taskQueue); + + final List strings = new ArrayList<>(1); + + taskQueue.scheduleAt(taskQueue.getCurrentTimeMillis(), () -> strings.add("foo")); + assertTrue(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + + assertFalse(taskQueue.hasRunnableTasks()); + } + + public void testDoesNotDeferTasksScheduledInThePast() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + advanceToRandomTime(taskQueue); + + final List strings = new ArrayList<>(1); + + taskQueue.scheduleAt(taskQueue.getCurrentTimeMillis() - randomInt(200), () -> strings.add("foo")); + assertTrue(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + + assertFalse(taskQueue.hasRunnableTasks()); + } + + public void testDefersTasksWithPositiveDelays() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(1); + + final long executionTimeMillis = randomLongBetween(1, 100); + taskQueue.scheduleAt(executionTimeMillis, () -> strings.add("foo")); + assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis)); + assertTrue(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + + assertFalse(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + } + + public void testKeepsFutureTasksDeferred() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(2); + + final long executionTimeMillis1 = randomLongBetween(1, 100); + final long executionTimeMillis2 = randomLongBetween(executionTimeMillis1 + 1, 200); + + taskQueue.scheduleAt(executionTimeMillis1, () -> strings.add("foo")); + taskQueue.scheduleAt(executionTimeMillis2, () -> strings.add("bar")); + + assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis1)); + assertTrue(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis2)); + assertTrue(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + assertThat(strings, contains("foo", "bar")); + } + + public void testExecutesTasksInTimeOrder() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(3); + + final long executionTimeMillis1 = randomLongBetween(1, 100); + final long executionTimeMillis2 = randomLongBetween(executionTimeMillis1 + 100, 300); + + taskQueue.scheduleAt(executionTimeMillis1, () -> strings.add("foo")); + taskQueue.scheduleAt(executionTimeMillis2, () -> strings.add("bar")); + + assertThat(taskQueue.getCurrentTimeMillis(), is(0L)); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis1)); + assertTrue(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + assertThat(strings, contains("foo")); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + final long executionTimeMillis3 = randomLongBetween(executionTimeMillis1 + 1, executionTimeMillis2 - 1); + taskQueue.scheduleAt(executionTimeMillis3, () -> strings.add("baz")); + + taskQueue.advanceTime(); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis3)); + assertTrue(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + taskQueue.runNextTask(); + taskQueue.advanceTime(); + taskQueue.runNextTask(); + assertThat(strings, contains("foo", "baz", "bar")); + assertThat(taskQueue.getCurrentTimeMillis(), is(executionTimeMillis2)); + assertFalse(taskQueue.hasRunnableTasks()); + assertFalse(taskQueue.hasDeferredTasks()); + } + + public void testExecutorServiceEnqueuesTasks() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(2); + + final ExecutorService executorService = taskQueue.getExecutorService(); + assertFalse(taskQueue.hasRunnableTasks()); + executorService.execute(() -> strings.add("foo")); + assertTrue(taskQueue.hasRunnableTasks()); + executorService.execute(() -> strings.add("bar")); + + assertThat(strings, empty()); + + while (taskQueue.hasRunnableTasks()) { + taskQueue.runRandomTask(random()); + } + + assertThat(strings, containsInAnyOrder("foo", "bar")); + } + + public void testThreadPoolEnqueuesTasks() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + final List strings = new ArrayList<>(2); + + final ThreadPool threadPool = taskQueue.getThreadPool(); + assertFalse(taskQueue.hasRunnableTasks()); + threadPool.generic().execute(() -> strings.add("foo")); + assertTrue(taskQueue.hasRunnableTasks()); + threadPool.executor("anything").execute(() -> strings.add("bar")); + + assertThat(strings, empty()); + + while (taskQueue.hasRunnableTasks()) { + taskQueue.runRandomTask(random()); + } + + assertThat(strings, containsInAnyOrder("foo", "bar")); + } + + public void testFutureExecutorSchedulesTasks() { + final DeterministicTaskQueue taskQueue = newTaskQueue(); + advanceToRandomTime(taskQueue); + final long startTime = taskQueue.getCurrentTimeMillis(); + + final List strings = new ArrayList<>(5); + + final FutureExecutor futureExecutor = taskQueue.getFutureExecutor(); + final long delayMillis = randomLongBetween(1, 100); + + futureExecutor.schedule(() -> strings.add("deferred"), TimeValue.timeValueMillis(delayMillis)); + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + futureExecutor.schedule(() -> strings.add("runnable"), TimeValue.ZERO); + assertTrue(taskQueue.hasRunnableTasks()); + + futureExecutor.schedule(() -> strings.add("also runnable"), TimeValue.MINUS_ONE); + + runAllTasks(taskQueue); + + assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis)); + assertThat(strings, contains("runnable", "also runnable", "deferred")); + + final long delayMillis1 = randomLongBetween(2, 100); + final long delayMillis2 = randomLongBetween(1, delayMillis1 - 1); + + futureExecutor.schedule(() -> strings.add("further deferred"), TimeValue.timeValueMillis(delayMillis1)); + futureExecutor.schedule(() -> strings.add("not quite so deferred"), TimeValue.timeValueMillis(delayMillis2)); + + assertFalse(taskQueue.hasRunnableTasks()); + assertTrue(taskQueue.hasDeferredTasks()); + + runAllTasks(taskQueue); + assertThat(taskQueue.getCurrentTimeMillis(), is(startTime + delayMillis + delayMillis1)); + assertThat(strings, contains("runnable", "also runnable", "deferred", "not quite so deferred", "further deferred")); + } + + private static void runAllTasks(DeterministicTaskQueue taskQueue) { + while (true) { + while (taskQueue.hasRunnableTasks()) { + taskQueue.runNextTask(); + } + if (taskQueue.hasDeferredTasks()) { + taskQueue.advanceTime(); + } else { + break; + } + } + } + + private static DeterministicTaskQueue newTaskQueue() { + return new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build()); + } +}