TESTS: Real Coordinator in SnapshotServiceTests #37162
original-brownbear
commented
Jan 5, 2019
- Introduce real coordinator in SnapshotServiceTests to be able to test network disruptions realistically
- Make adjustments to cluster applier service so that we can pass a mocked single threaded executor for tests
Pinging @elastic/es-distributed

// there is no equals on cluster state, so we just serialize it to XContent and compare Maps
// deserialized from the resulting JSON
private boolean assertPreviousStateConsistency(ClusterChangedEvent event) {
I was able to reproduce very rare failures on this assertion that were the result of the routing nodes being ordered differently across the previousState and the getLastAcceptedState state.
With the serialization round trip and using the equals from the resulting map it seems completely stable.
Weird. Can you dig deeper into why the routing nodes are ordered differently?
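To illustrate why comparing deserialized Maps can be insensitive to ordering, here is a minimal sketch. It does not use the Elasticsearch XContent API: `ToyState`, `toMap()` and `StateComparisonSketch` are hypothetical stand-ins, and plain `java.util` maps take the place of the JSON round trip. It assumes the ordering difference sits in map-like structures, where `Map.equals` compares entries and ignores iteration order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a state object that, like ClusterState, defines no equals().
final class ToyState {
    private final Map<String, List<String>> shardsByNode;

    ToyState(Map<String, List<String>> shardsByNode) {
        this.shardsByNode = shardsByNode;
    }

    // Stand-in for the serialization round trip: render the state as nested Maps and Lists.
    Map<String, Object> toMap() {
        Map<String, Object> nodes = new LinkedHashMap<>();
        shardsByNode.forEach((node, shards) -> nodes.put(node, new ArrayList<>(shards)));
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("routing_nodes", nodes);
        return root;
    }
}

final class StateComparisonSketch {
    // Builds a state whose nodes are inserted in the given order.
    static ToyState state(String... nodeOrder) {
        Map<String, List<String>> byNode = new LinkedHashMap<>();
        for (String node : nodeOrder) {
            byNode.put(node, List.of(node + "-shard-0"));
        }
        return new ToyState(byNode);
    }

    public static void main(String[] args) {
        ToyState previous = state("node-a", "node-b");
        ToyState accepted = state("node-b", "node-a"); // same content, different order
        System.out.println(previous.equals(accepted));                 // false: identity comparison
        System.out.println(previous.toMap().equals(accepted.toMap())); // true: entry-wise comparison
    }
}
```

The sketch only shows the comparison semantics; it says nothing about where the differing order in the real routing nodes comes from.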
threadPool.scheduler()));
}

public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Allow passing in a factory for the PrioritizedEsThreadPoolExecutor to prevent using any other threads outside the deterministic task queue.
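The factory idea can be sketched in isolation as follows. This is a toy, not the `ClusterApplierService` code: `ToyApplierService` and `ExecutorFactorySketch` are hypothetical names, and a plain `Executor` stands in for `PrioritizedEsThreadPoolExecutor`. The point is only that a service which obtains its executor from a `Supplier` lets a test hand in one that never starts a thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical service: it asks a factory for its executor instead of constructing one itself.
final class ToyApplierService {
    private final Executor executor;

    ToyApplierService(Supplier<Executor> executorFactory) {
        this.executor = executorFactory.get();
    }

    void submit(Runnable task) {
        executor.execute(task);
    }
}

final class ExecutorFactorySketch {
    static List<String> run() {
        Queue<Runnable> pending = new ArrayDeque<>(); // test-owned queue, drained explicitly
        List<String> log = new ArrayList<>();
        // The test's factory returns an executor that only enqueues; no thread is started.
        ToyApplierService service = new ToyApplierService(() -> pending::add);
        service.submit(() -> log.add("first"));
        service.submit(() -> log.add("second"));
        log.add("submitted"); // nothing has run yet at this point
        while (!pending.isEmpty()) {
            pending.remove().run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [submitted, first, second]
    }
}
```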
startCluster();
}

private static <T> ActionListener<T> assertNoFailureListener(Runnable r) {
Renamed this to something more appropriate.
@Override
protected Optional<DisruptableMockTransport> getDisruptedCapturingTransport(DiscoveryNode node, String action) {
return Optional.ofNullable(testClusterNodes.nodes.get(node.getName()).mockTransport);
final Predicate<TestClusterNode> matchesDestination;
The logic in this override of the disruptable mock transport is now very similar to what CoordinatorTests uses, so we can probably DRY things up here in a next step.
++
@ywelsch This would be the infrastructure additions/changes needed to build a test that simulates network failures (without a real I figured it's easier to make this change first since it has a few prod. code changes here and there and then bring in the actual tests for disruptions. Either way this is an improvement making the existing test more realistic I hope :)
private final String nodeName;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
this constructor is not used anymore
}

public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
    Supplier<PrioritizedEsThreadPoolExecutor> threadPoolExecutorFactory) {
In MasterService, this was mocked out by having a protected method createThreadPoolExecutor(). Can you align it so that both classes allow the mocking in the same way?
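For comparison with the `Supplier` approach, the protected-method style mentioned here can be sketched like this. The classes are hypothetical toys: only the method name `createThreadPoolExecutor()` comes from the comment above, and its real signature and return type in `MasterService` are not reproduced (a plain `Executor` is assumed instead).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Hypothetical service that creates its executor through an overridable hook.
class ToyMasterService {
    private Executor executor;

    void start() {
        executor = createThreadPoolExecutor();
    }

    // Production default: a real single-threaded executor (assumed shape, for illustration).
    protected Executor createThreadPoolExecutor() {
        return Executors.newSingleThreadExecutor();
    }

    void submit(Runnable task) {
        executor.execute(task);
    }
}

final class ProtectedFactorySketch {
    static List<String> run() {
        Queue<Runnable> pending = new ArrayDeque<>();
        List<String> log = new ArrayList<>();
        // The test subclasses the service and swaps in an executor that only enqueues.
        ToyMasterService service = new ToyMasterService() {
            @Override
            protected Executor createThreadPoolExecutor() {
                return pending::add;
            }
        };
        service.start();
        service.submit(() -> log.add("task"));
        log.add("pending:" + pending.size());
        pending.remove().run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [pending:1, task]
    }
}
```

Both styles give the test the same control; using one of them in both classes keeps the mocking consistent, which is what the comment asks for.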
threadPool = deterministicTaskQueue.getThreadPool();
final Deque<Runnable> orderedTasks = new ArrayDeque<>();
clusterService = new ClusterService(settings, clusterSettings, threadPool, masterService,
    () -> new PrioritizedEsThreadPoolExecutor(node.getName(), 1, 1, 1, TimeUnit.SECONDS,
I wonder if it's possible to just mock out the thread factory but otherwise reuse the full ESThreadPoolExecutor?
Can't do that, unfortunately. I just tried it: the normal execute delegates to java.util.concurrent.ThreadPoolExecutor#execute, which eventually runs java.util.concurrent.ThreadPoolExecutor#runWorker on the thread created by the mocked-out factory, and that method blocks => tests get stuck :(
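The blocking described here can be reproduced with a plain `java.util.concurrent.ThreadPoolExecutor`. The sketch below is an assumption-laden reconstruction, not the code that was tried in the PR: it uses the same pool sizes as the snippet above (core 1, max 1) and a thread factory whose `start()` runs the worker inline. After the first task finishes, the worker loop waits on the queue for the next task, so `execute` never returns to its caller. The class name and the timeouts are made up for the demo, and `execute` is called from a daemon thread so that the program can still exit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class InlineWorkerBlocksSketch {
    // Returns true if the task ran but execute() had still not returned shortly afterwards.
    static boolean executeBlocks() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            r -> new Thread() {
                @Override
                public void start() {
                    r.run(); // run the worker loop on the calling thread instead of a new one
                }
            });
        CountDownLatch taskRan = new CountDownLatch(1);
        CountDownLatch executeReturned = new CountDownLatch(1);
        Thread caller = new Thread(() -> {
            executor.execute(taskRan::countDown);
            executeReturned.countDown(); // not reached while the worker loop waits for more tasks
        });
        caller.setDaemon(true); // the caller stays blocked; daemon status lets the JVM exit
        caller.start();
        try {
            boolean ran = taskRan.await(5, TimeUnit.SECONDS);
            boolean returned = executeReturned.await(500, TimeUnit.MILLISECONDS);
            return ran && !returned;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("execute() blocked: " + executeBlocks());
    }
}
```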
Try the following patch, which will do the trick I think (I added a basic test in DeterministicTaskQueueTests as well):
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
index d7dc4c6541c..2efa2ca9083 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java
@@ -66,7 +66,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
@@ -104,10 +103,8 @@ import org.junit.Before;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
-import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
-import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -384,29 +381,30 @@ public class SnapshotsServiceTests extends ESTestCase {
final Settings settings = environment.settings();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool = deterministicTaskQueue.getThreadPool();
- final Deque<Runnable> orderedTasks = new ArrayDeque<>();
+ class KillWorkerError extends Error {}
clusterService = new ClusterService(settings, clusterSettings, threadPool, masterService,
- () -> new PrioritizedEsThreadPoolExecutor(node.getName(), 1, 1, 1, TimeUnit.SECONDS,
+ () -> new PrioritizedEsThreadPoolExecutor(node.getName(), 0, 1, 0, TimeUnit.SECONDS,
r -> new Thread() {
@Override
public void start() {
- throw new UnsupportedOperationException();
+ deterministicTaskQueue.scheduleNow(() -> {
+ try {
+ r.run();
+ } catch (KillWorkerError kwe) {
+ // hacks everywhere
+ }
+ });
}
},
- null, null) {
+ threadPool.getThreadContext(), threadPool.scheduler()) {
@Override
- public void execute(Runnable command, TimeValue timeout, Runnable timeoutCallback) {
- throw new UnsupportedOperationException();
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ // kill worker so that next one will be scheduled
+ throw new KillWorkerError();
}
- @Override
- public void execute(Runnable command) {
- // Ensure ordered execution of the tasks here since the threadpool we're
- // mocking out is single threaded
- orderedTasks.addLast(command);
- deterministicTaskQueue.scheduleNow(() -> orderedTasks.pollFirst().run());
- }
});
mockTransport = new DisruptableMockTransport(logger) {
@Override
diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
index 5238cfe8ecd..29e03f37b2e 100644
--- a/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
+++ b/test/framework/src/test/java/org/elasticsearch/cluster/coordination/DeterministicTaskQueueTests.java
@@ -19,8 +19,11 @@
package org.elasticsearch.cluster.coordination;
+import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
+import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
@@ -30,6 +33,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -422,6 +426,60 @@ public class DeterministicTaskQueueTests extends ESTestCase {
assertThat(strings, contains("periodic-0", "periodic-1", "periodic-2"));
}
+ public void testPrioritizedEsThreadPoolExecutor() {
+ final DeterministicTaskQueue taskQueue = newTaskQueue();
+ class KillWorkerError extends Error {}
+ final PrioritizedEsThreadPoolExecutor executor = new PrioritizedEsThreadPoolExecutor("test", 0, 1, 0, TimeUnit.MILLISECONDS,
+ r -> new Thread() {
+ @Override
+ public void start() {
+ taskQueue.scheduleNow(() -> {
+ try {
+ r.run();
+ } catch (KillWorkerError kwe) {
+ // hacks everywhere
+ }
+ });
+ }
+ },
+ taskQueue.getThreadPool().getThreadContext(), taskQueue.getThreadPool().scheduler()) {
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ // kill worker so that next one will be scheduled
+ throw new KillWorkerError();
+ }
+
+ };
+ final AtomicBoolean called1 = new AtomicBoolean();
+ final AtomicBoolean called2 = new AtomicBoolean();
+ executor.execute(new PrioritizedRunnable(Priority.NORMAL) {
+ @Override
+ public void run() {
+ assertFalse(called1.get()); // check that this is only called once
+ called1.set(true);
+ }
+ });
+ executor.execute(new PrioritizedRunnable(Priority.HIGH) {
+ @Override
+ public void run() {
+ assertFalse(called2.get()); // check that this is only called once
+ called2.set(true);
+ }
+ });
+ assertFalse(called1.get());
+ assertFalse(called2.get());
+ taskQueue.runRandomTask();
+ assertFalse(called1.get());
+ assertTrue(called2.get());
+ taskQueue.runRandomTask();
+ assertTrue(called1.get());
+ assertTrue(called2.get());
+ taskQueue.runRandomTask();
+ assertFalse(taskQueue.hasRunnableTasks());
+ }
+
private static DeterministicTaskQueue newTaskQueue() {
return newTaskQueue(random());
}
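The mechanism in the patch above can be tried outside Elasticsearch with a plain `ThreadPoolExecutor`. In this sketch an `ArrayDeque` stands in for `DeterministicTaskQueue`, a FIFO `LinkedBlockingQueue` replaces the prioritized queue, and `KillWorkerSketch` is a hypothetical name; only the `KillWorkerError` idea and the pool parameters (core 0, max 1, keep-alive 0) are taken from the patch. Each worker is handed to the deterministic queue instead of a real thread, runs exactly one task, and is then killed from `afterExecute`, which makes the pool schedule a replacement worker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KillWorkerSketch {
    // Thrown from afterExecute to end the worker loop after one task.
    static final class KillWorkerError extends Error {}

    static List<String> run() {
        Queue<Runnable> deterministicQueue = new ArrayDeque<>(); // stand-in for DeterministicTaskQueue
        List<String> log = new ArrayList<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(0, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            r -> new Thread() {
                @Override
                public void start() {
                    // Instead of starting a thread, schedule the worker loop as one queue step.
                    deterministicQueue.add(() -> {
                        try {
                            r.run();
                        } catch (KillWorkerError e) {
                            // expected: the worker was killed after its task
                        }
                    });
                }
            }) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                throw new KillWorkerError(); // kill the worker so the next one is scheduled
            }
        };
        executor.execute(() -> log.add("first"));
        executor.execute(() -> log.add("second"));
        log.add("queued:" + deterministicQueue.size()); // one worker step, nothing has run yet
        int steps = 0;
        while (!deterministicQueue.isEmpty()) {
            deterministicQueue.remove().run();
            steps++;
        }
        log.add("steps:" + steps);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

As in the test in the patch, two tasks take three steps: the third worker finds the queue empty and exits normally without scheduling a successor.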
@ywelsch hmm two points here:
- Less important: Is this really making things easier/closer to the real thing?
- More important: How does this ensure correct order of task execution? It seems we only get correct order of task-enqueuing here?
Can you give an example of what you mean by 2? i.e. expected behavior vs actual behavior.
@ywelsch never mind, that was just me seeing ghosts (I could swear I saw the r in Thread#start being a TieBreakingRunnable randomly, but I can't reproduce that anymore). => Sorry for the noise :)
Should I extract that implementation to its own class so that we have short tests, or do you want to?
you can do that please
Jenkins test this

Jenkins run Gradle build tests 1

@ywelsch all done, extracted your version of the executor and a test for it to their own classes (kept it in the package of the

Jenkins run Gradle build tests 2

deterministicTaskQueue.advanceTime();
if (deterministicTaskQueue.hasRunnableTasks()) {
deterministicTaskQueue.runAllRunnableTasks();
The if (deterministicTaskQueue.hasRunnableTasks()) condition is unnecessary.
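A small sketch of why such a guard is redundant, under the assumption that `runAllRunnableTasks()` loops while `hasRunnableTasks()` is true and therefore does nothing on an empty queue. `ToyTaskQueue` is a hypothetical stand-in that borrows the method names from the snippet above; it is not the `DeterministicTaskQueue` implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical queue with the assumed loop-until-empty behaviour.
final class ToyTaskQueue {
    private final Deque<Runnable> runnable = new ArrayDeque<>();

    void scheduleNow(Runnable task) {
        runnable.add(task);
    }

    boolean hasRunnableTasks() {
        return !runnable.isEmpty();
    }

    // Already a no-op when nothing is runnable, so callers need no extra guard.
    void runAllRunnableTasks() {
        while (hasRunnableTasks()) {
            runnable.remove().run();
        }
    }
}

final class RedundantGuardSketch {
    static int runUnguarded(int taskCount) {
        ToyTaskQueue queue = new ToyTaskQueue();
        int[] executed = {0};
        for (int i = 0; i < taskCount; i++) {
            queue.scheduleNow(() -> executed[0]++);
        }
        queue.runAllRunnableTasks(); // no hasRunnableTasks() check around the call
        return executed[0];
    }

    public static void main(String[] args) {
        System.out.println(runUnguarded(0)); // 0: safe on an empty queue
        System.out.println(runUnguarded(3)); // 3
    }
}
```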
Jenkins run Gradle build tests 1