Skip to content

Commit eacc63b

Browse files
TESTS: Real Coordinator in SnapshotServiceTests (#37162)
* TESTS: Real Coordinator in SnapshotServiceTests * Introduce real coordinator in SnapshotServiceTests to be able to test network disruptions realistically * Make adjustments to cluster applier service so that we can pass a mocked single threaded executor for tests
1 parent ae086eb commit eacc63b

File tree

7 files changed

+273
-82
lines changed

7 files changed

+273
-82
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
import org.elasticsearch.common.unit.TimeValue;
5757
import org.elasticsearch.common.util.concurrent.EsExecutors;
5858
import org.elasticsearch.common.util.concurrent.ListenableFuture;
59+
import org.elasticsearch.common.xcontent.XContentHelper;
60+
import org.elasticsearch.common.xcontent.json.JsonXContent;
5961
import org.elasticsearch.discovery.Discovery;
6062
import org.elasticsearch.discovery.DiscoverySettings;
6163
import org.elasticsearch.discovery.DiscoveryStats;
@@ -872,12 +874,7 @@ public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void
872874
return;
873875
}
874876

875-
// there is no equals on cluster state, so we just serialize it to XContent and compare JSON representation
876-
assert clusterChangedEvent.previousState() == coordinationState.get().getLastAcceptedState() ||
877-
Strings.toString(clusterChangedEvent.previousState()).equals(
878-
Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState())))
879-
: Strings.toString(clusterChangedEvent.previousState()) + " vs "
880-
+ Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()));
877+
assert assertPreviousStateConsistency(clusterChangedEvent);
881878

882879
final ClusterState clusterState = clusterChangedEvent.state();
883880

@@ -917,6 +914,22 @@ public String toString() {
917914
}
918915
}
919916

917+
// there is no equals on cluster state, so we just serialize it to XContent and compare Maps
918+
// deserialized from the resulting JSON
919+
private boolean assertPreviousStateConsistency(ClusterChangedEvent event) {
920+
assert event.previousState() == coordinationState.get().getLastAcceptedState() ||
921+
XContentHelper.convertToMap(
922+
JsonXContent.jsonXContent, Strings.toString(event.previousState()), false
923+
).equals(
924+
XContentHelper.convertToMap(
925+
JsonXContent.jsonXContent,
926+
Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState())),
927+
false))
928+
: Strings.toString(event.previousState()) + " vs "
929+
+ Strings.toString(clusterStateWithNoMasterBlock(coordinationState.get().getLastAcceptedState()));
930+
return true;
931+
}
932+
920933
private <T> ActionListener<T> wrapWithMutex(ActionListener<T> listener) {
921934
return new ActionListener<T>() {
922935
@Override

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
8989

9090
private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
9191
private final Collection<TimeoutClusterStateListener> timeoutClusterStateListeners =
92-
Collections.newSetFromMap(new ConcurrentHashMap<TimeoutClusterStateListener, Boolean>());
92+
Collections.newSetFromMap(new ConcurrentHashMap<>());
9393

9494
private final LocalNodeMasterListeners localNodeMasterListeners;
9595

@@ -134,11 +134,15 @@ protected synchronized void doStart() {
134134
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
135135
Objects.requireNonNull(state.get(), "please set initial state before starting");
136136
addListener(localNodeMasterListeners);
137-
threadPoolExecutor = EsExecutors.newSinglePrioritizing(
138-
nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME,
139-
daemonThreadFactory(nodeName, CLUSTER_UPDATE_THREAD_NAME),
140-
threadPool.getThreadContext(),
141-
threadPool.scheduler());
137+
threadPoolExecutor = createThreadPoolExecutor();
138+
}
139+
140+
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
141+
return EsExecutors.newSinglePrioritizing(
142+
nodeName + "/" + CLUSTER_UPDATE_THREAD_NAME,
143+
daemonThreadFactory(nodeName, CLUSTER_UPDATE_THREAD_NAME),
144+
threadPool.getThreadContext(),
145+
threadPool.scheduler());
142146
}
143147

144148
class UpdateTask extends SourcePrioritizedRunnable implements Function<ClusterState, ClusterState> {

server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,13 @@ public class ClusterService extends AbstractLifecycleComponent {
7171

7272
private final String nodeName;
7373

74-
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
75-
MasterService masterService) {
74+
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
75+
this(settings, clusterSettings, new MasterService(Node.NODE_NAME_SETTING.get(settings), settings, threadPool),
76+
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool));
77+
}
78+
79+
public ClusterService(Settings settings, ClusterSettings clusterSettings, MasterService masterService,
80+
ClusterApplierService clusterApplierService) {
7681
super(settings);
7782
this.settings = settings;
7883
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
@@ -84,11 +89,7 @@ public ClusterService(Settings settings, ClusterSettings clusterSettings, Thread
8489
this::setSlowTaskLoggingThreshold);
8590
// Add a no-op update consumer so changes are logged
8691
this.clusterSettings.addAffixUpdateConsumer(USER_DEFINED_META_DATA, (first, second) -> {}, (first, second) -> {});
87-
this.clusterApplierService = new ClusterApplierService(nodeName, settings, clusterSettings, threadPool);
88-
}
89-
90-
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
91-
this(settings, clusterSettings, threadPool, new MasterService(Node.NODE_NAME_SETTING.get(settings), settings, threadPool));
92+
this.clusterApplierService = clusterApplierService;
9293
}
9394

9495
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {

0 commit comments

Comments
 (0)