Skip to content

Commit 42f3129

Browse files
authored
Allow plugins to validate cluster-state on join (#26595)
Today we don't have a pluggable way to validate if the cluster state is compatible with the node that joins. We already apply some checks for index compatibility that prevents nodes to join a cluster with indices it doesn't support but for plugins this isn't possible. This change adds a cluster state validator that allows plugins to prevent a join if the cluster-state is incompatible.
1 parent 3d4e28a commit 42f3129

File tree

7 files changed

+94
-17
lines changed

7 files changed

+94
-17
lines changed

core/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.elasticsearch.discovery;
2121

22+
import org.elasticsearch.cluster.ClusterState;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
2224
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2325
import org.elasticsearch.cluster.service.ClusterApplier;
2426
import org.elasticsearch.cluster.service.MasterService;
@@ -36,12 +38,15 @@
3638
import org.elasticsearch.threadpool.ThreadPool;
3739
import org.elasticsearch.transport.TransportService;
3840

41+
import java.util.ArrayList;
42+
import java.util.Collection;
3943
import java.util.Collections;
4044
import java.util.HashMap;
4145
import java.util.List;
4246
import java.util.Map;
4347
import java.util.Objects;
4448
import java.util.Optional;
49+
import java.util.function.BiConsumer;
4550
import java.util.function.Function;
4651
import java.util.function.Supplier;
4752

@@ -62,14 +67,18 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
6267
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
6368
AllocationService allocationService) {
6469
final UnicastHostsProvider hostsProvider;
65-
70+
final Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators = new ArrayList<>();
6671
Map<String, Supplier<UnicastHostsProvider>> hostProviders = new HashMap<>();
6772
for (DiscoveryPlugin plugin : plugins) {
6873
plugin.getZenHostsProviders(transportService, networkService).entrySet().forEach(entry -> {
6974
if (hostProviders.put(entry.getKey(), entry.getValue()) != null) {
7075
throw new IllegalArgumentException("Cannot register zen hosts provider [" + entry.getKey() + "] twice");
7176
}
7277
});
78+
BiConsumer<DiscoveryNode, ClusterState> joinValidator = plugin.getJoinValidator();
79+
if (joinValidator != null) {
80+
joinValidators.add(joinValidator);
81+
}
7382
}
7483
Optional<String> hostsProviderName = DISCOVERY_HOSTS_PROVIDER_SETTING.get(settings);
7584
if (hostsProviderName.isPresent()) {
@@ -85,7 +94,7 @@ public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportServic
8594
Map<String, Supplier<Discovery>> discoveryTypes = new HashMap<>();
8695
discoveryTypes.put("zen",
8796
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
88-
clusterSettings, hostsProvider, allocationService));
97+
clusterSettings, hostsProvider, allocationService, Collections.unmodifiableCollection(joinValidators)));
8998
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
9099
for (DiscoveryPlugin plugin : plugins) {
91100
plugin.getDiscoveryTypes(threadPool, transportService, namedWriteableRegistry,

core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@
3939
import org.elasticsearch.transport.TransportService;
4040

4141
import java.io.IOException;
42+
import java.util.Collection;
4243
import java.util.concurrent.TimeUnit;
44+
import java.util.function.BiConsumer;
45+
import java.util.function.Supplier;
4346

4447
public class MembershipAction extends AbstractComponent {
4548

@@ -63,7 +66,8 @@ public interface MembershipListener {
6366

6467
private final MembershipListener listener;
6568

66-
public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener) {
69+
public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener,
70+
Collection<BiConsumer<DiscoveryNode,ClusterState>> joinValidators) {
6771
super(settings);
6872
this.transportService = transportService;
6973
this.listener = listener;
@@ -73,7 +77,7 @@ public MembershipAction(Settings settings, TransportService transportService, Me
7377
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
7478
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
7579
() -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
76-
new ValidateJoinRequestRequestHandler());
80+
new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
7781
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
7882
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
7983
}
@@ -176,12 +180,20 @@ public void writeTo(StreamOutput out) throws IOException {
176180
}
177181

178182
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
183+
private final Supplier<DiscoveryNode> localNodeSupplier;
184+
private final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators;
185+
186+
ValidateJoinRequestRequestHandler(Supplier<DiscoveryNode> localNodeSupplier,
187+
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
188+
this.localNodeSupplier = localNodeSupplier;
189+
this.joinValidators = joinValidators;
190+
}
179191

180192
@Override
181193
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
182-
ensureNodesCompatibility(Version.CURRENT, request.state.getNodes());
183-
ensureIndexCompatibility(Version.CURRENT, request.state.getMetaData());
184-
// for now, the mere fact that we can serialize the cluster state acts as validation....
194+
DiscoveryNode node = localNodeSupplier.get();
195+
assert node != null : "local node is null";
196+
joinValidators.stream().forEach(action -> action.accept(node, request.state));
185197
channel.sendResponse(TransportResponse.Empty.INSTANCE);
186198
}
187199
}

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@
6969

7070
import java.io.IOException;
7171
import java.util.ArrayList;
72+
import java.util.Collection;
73+
import java.util.Collections;
7274
import java.util.List;
7375
import java.util.Locale;
7476
import java.util.Set;
@@ -78,6 +80,7 @@
7880
import java.util.concurrent.atomic.AtomicBoolean;
7981
import java.util.concurrent.atomic.AtomicInteger;
8082
import java.util.concurrent.atomic.AtomicReference;
83+
import java.util.function.BiConsumer;
8184
import java.util.function.Consumer;
8285
import java.util.stream.Collectors;
8386

@@ -146,15 +149,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
146149

147150
private final NodeJoinController nodeJoinController;
148151
private final NodeRemovalClusterStateTaskExecutor nodeRemovalExecutor;
149-
150152
private final ClusterApplier clusterApplier;
151153
private final AtomicReference<ClusterState> committedState; // last committed cluster state
152154
private final Object stateMutex = new Object();
155+
private final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators;
153156

154157
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
155158
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
156-
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService) {
159+
ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider, AllocationService allocationService,
160+
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators) {
157161
super(settings);
162+
this.onJoinValidators = addBuiltInJoinValidators(onJoinValidators);
158163
this.masterService = masterService;
159164
this.clusterApplier = clusterApplier;
160165
this.transportService = transportService;
@@ -211,7 +216,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
211216
namedWriteableRegistry,
212217
this,
213218
discoverySettings);
214-
this.membership = new MembershipAction(settings, transportService, new MembershipListener());
219+
this.membership = new MembershipAction(settings, transportService, new MembershipListener(), onJoinValidators);
215220
this.joinThreadControl = new JoinThreadControl();
216221

217222
this.nodeJoinController = new NodeJoinController(masterService, allocationService, electMaster, settings);
@@ -223,6 +228,17 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
223228
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
224229
}
225230

231+
static Collection<BiConsumer<DiscoveryNode,ClusterState>> addBuiltInJoinValidators(
232+
Collection<BiConsumer<DiscoveryNode,ClusterState>> onJoinValidators) {
233+
Collection<BiConsumer<DiscoveryNode, ClusterState>> validators = new ArrayList<>();
234+
validators.add((node, state) -> {
235+
MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
236+
MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
237+
});
238+
validators.addAll(onJoinValidators);
239+
return Collections.unmodifiableCollection(validators);
240+
}
241+
226242
// protected to allow overriding in tests
227243
protected ZenPing newZenPing(Settings settings, ThreadPool threadPool, TransportService transportService,
228244
UnicastHostsProvider hostsProvider) {
@@ -885,8 +901,7 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final
885901
} else {
886902
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
887903
// to ensure we fail as fast as possible.
888-
MembershipAction.ensureNodesCompatibility(node.getVersion(), state.getNodes());
889-
MembershipAction.ensureIndexCompatibility(node.getVersion(), state.getMetaData());
904+
onJoinValidators.stream().forEach(a -> a.accept(node, state));
890905
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
891906
MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
892907
}
@@ -898,7 +913,8 @@ void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final
898913
try {
899914
membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
900915
} catch (Exception e) {
901-
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node), e);
916+
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
917+
e);
902918
callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
903919
return;
904920
}
@@ -1313,4 +1329,9 @@ public void start() {
13131329
}
13141330

13151331
}
1332+
1333+
public final Collection<BiConsumer<DiscoveryNode, ClusterState>> getOnJoinValidators() {
1334+
return onJoinValidators;
1335+
}
1336+
13161337
}

core/src/main/java/org/elasticsearch/plugins/DiscoveryPlugin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919

2020
package org.elasticsearch.plugins;
2121

22+
import java.util.Collection;
2223
import java.util.Collections;
2324
import java.util.Map;
25+
import java.util.function.BiConsumer;
2426
import java.util.function.Supplier;
2527

28+
import org.elasticsearch.cluster.ClusterState;
29+
import org.elasticsearch.cluster.node.DiscoveryNode;
2630
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2731
import org.elasticsearch.cluster.service.ClusterApplier;
2832
import org.elasticsearch.cluster.service.MasterService;
@@ -106,4 +110,11 @@ default Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(Transpo
106110
NetworkService networkService) {
107111
return Collections.emptyMap();
108112
}
113+
114+
/**
115+
* Returns a consumer that validate the initial join cluster state. The validator, unless <code>null</code> is called exactly once per
116+
* join attempt but might be called multiple times during the lifetime of a node. Validators are expected to throw a
117+
* {@link IllegalStateException} if the node and the cluster-state are incompatible.
118+
*/
119+
default BiConsumer<DiscoveryNode,ClusterState> getJoinValidator() { return null; }
109120
}

core/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import org.apache.lucene.util.IOUtils;
2222
import org.elasticsearch.Version;
23+
import org.elasticsearch.cluster.ClusterState;
24+
import org.elasticsearch.cluster.node.DiscoveryNode;
2325
import org.elasticsearch.cluster.routing.allocation.AllocationService;
2426
import org.elasticsearch.cluster.service.ClusterApplier;
2527
import org.elasticsearch.cluster.service.MasterService;
@@ -40,10 +42,12 @@
4042

4143
import java.io.IOException;
4244
import java.util.Arrays;
45+
import java.util.Collection;
4346
import java.util.Collections;
4447
import java.util.List;
4548
import java.util.Map;
4649
import java.util.concurrent.atomic.AtomicBoolean;
50+
import java.util.function.BiConsumer;
4751
import java.util.function.Supplier;
4852

4953
import static org.mockito.Mockito.mock;
@@ -160,7 +164,23 @@ public void testDuplicateHostsProvider() {
160164

161165
public void testLazyConstructionHostsProvider() {
162166
DummyHostsProviderPlugin plugin = () -> Collections.singletonMap("custom",
163-
() -> { throw new AssertionError("created hosts provider which was not selected"); });
167+
() -> {
168+
throw new AssertionError("created hosts provider which was not selected");
169+
});
164170
newModule(Settings.EMPTY, Collections.singletonList(plugin));
165171
}
172+
173+
public void testJoinValidator() {
174+
BiConsumer<DiscoveryNode, ClusterState> consumer = (a, b) -> {};
175+
DiscoveryModule module = newModule(Settings.EMPTY, Collections.singletonList(new DiscoveryPlugin() {
176+
@Override
177+
public BiConsumer<DiscoveryNode, ClusterState> getJoinValidator() {
178+
return consumer;
179+
}
180+
}));
181+
ZenDiscovery discovery = (ZenDiscovery) module.getDiscovery();
182+
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators = discovery.getOnJoinValidators();
183+
assertEquals(2, onJoinValidators.size());
184+
assertTrue(onJoinValidators.contains(consumer));
185+
}
166186
}

core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,8 @@ public void onNewClusterState(String source, Supplier<ClusterState> clusterState
320320
}
321321
};
322322
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, service, new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
323-
masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService());
323+
masterService, clusterApplier, clusterSettings, Collections::emptyList, ESAllocationTestCase.createAllocationService(),
324+
Collections.emptyList());
324325
zenDiscovery.start();
325326
return zenDiscovery;
326327
}
@@ -342,7 +343,10 @@ public void testValidateOnUnsupportedIndexVersionCreated() throws Exception {
342343
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
343344
final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
344345
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
345-
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler();
346+
final DiscoveryNode localNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
347+
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
348+
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler
349+
(() -> localNode, ZenDiscovery.addBuiltInJoinValidators(Collections.emptyList()));
346350
final boolean incompatible = randomBoolean();
347351
IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
348352
.put(SETTING_VERSION_CREATED, incompatible ? VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion())

test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private TestZenDiscovery(Settings settings, ThreadPool threadPool, TransportServ
8383
ClusterApplier clusterApplier, ClusterSettings clusterSettings, UnicastHostsProvider hostsProvider,
8484
AllocationService allocationService) {
8585
super(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier, clusterSettings,
86-
hostsProvider, allocationService);
86+
hostsProvider, allocationService, Collections.emptyList());
8787
}
8888

8989
@Override

0 commit comments

Comments
 (0)