Skip to content

Commit 5965ab3

Browse files
committed
Add validation for supported index version on node join
Today we can easily join a cluster that holds an index we don't support since we currently allow rolling upgrades from 5.x to 6.x. This commit adds additional safety that fails cluster state validation if there is an open index with an incompatible index version created in the cluster. Realtes to elastic#21670
1 parent b7292a6 commit 5965ab3

File tree

5 files changed

+144
-11
lines changed

5 files changed

+144
-11
lines changed

core/src/main/java/org/elasticsearch/Version.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,21 @@ public Version minimumCompatibilityVersion() {
324324
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
325325
}
326326

327+
/**
328+
* Returns the minimum created index version that this version supports. Indices created with lower versions
329+
* can't be used with this version.
330+
*/
331+
public Version minimumIndexCompatibilityVersion() {
332+
final int bwcMajor;
333+
if (major == 5) {
334+
bwcMajor = 2; // we jumped from 2 to 5
335+
} else {
336+
bwcMajor = major - 1;
337+
}
338+
final int bwcMinor = 0;
339+
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
340+
}
341+
327342
/**
328343
* Returns <code>true</code> iff both version are compatible. Otherwise <code>false</code>
329344
*/

core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919

2020
package org.elasticsearch.discovery.zen;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.cluster.ClusterState;
24+
import org.elasticsearch.cluster.metadata.IndexMetaData;
25+
import org.elasticsearch.cluster.metadata.MetaData;
2326
import org.elasticsearch.cluster.node.DiscoveryNode;
2427
import org.elasticsearch.common.component.AbstractComponent;
2528
import org.elasticsearch.common.io.stream.StreamInput;
2629
import org.elasticsearch.common.io.stream.StreamOutput;
2730
import org.elasticsearch.common.settings.Settings;
2831
import org.elasticsearch.common.unit.TimeValue;
29-
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
3032
import org.elasticsearch.threadpool.ThreadPool;
3133
import org.elasticsearch.transport.EmptyTransportResponseHandler;
3234
import org.elasticsearch.transport.TransportChannel;
@@ -37,6 +39,7 @@
3739

3840
import java.io.IOException;
3941
import java.util.concurrent.TimeUnit;
42+
import java.util.function.Supplier;
4043

4144
public class MembershipAction extends AbstractComponent {
4245

@@ -58,21 +61,21 @@ public interface MembershipListener {
5861

5962
private final TransportService transportService;
6063

61-
private final DiscoveryNodesProvider nodesProvider;
6264

6365
private final MembershipListener listener;
6466

6567
public MembershipAction(Settings settings, TransportService transportService,
66-
DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
68+
Supplier<DiscoveryNode> localNodeSupplier, MembershipListener listener) {
6769
super(settings);
6870
this.transportService = transportService;
69-
this.nodesProvider = nodesProvider;
7071
this.listener = listener;
7172

73+
7274
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
7375
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
74-
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new,
75-
ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler());
76+
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
77+
() -> new ValidateJoinRequest(localNodeSupplier), ThreadPool.Names.GENERIC,
78+
new ValidateJoinRequestRequestHandler());
7679
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
7780
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
7881
}
@@ -152,20 +155,23 @@ public void onFailure(Exception e) {
152155
}
153156
}
154157

155-
class ValidateJoinRequest extends TransportRequest {
158+
static class ValidateJoinRequest extends TransportRequest {
159+
private final Supplier<DiscoveryNode> localNode;
156160
private ClusterState state;
157161

158-
ValidateJoinRequest() {
162+
ValidateJoinRequest(Supplier<DiscoveryNode> localNode) {
163+
this.localNode = localNode;
159164
}
160165

161166
ValidateJoinRequest(ClusterState state) {
162167
this.state = state;
168+
this.localNode = state.nodes()::getLocalNode;
163169
}
164170

165171
@Override
166172
public void readFrom(StreamInput in) throws IOException {
167173
super.readFrom(in);
168-
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
174+
this.state = ClusterState.Builder.readFrom(in, localNode.get());
169175
}
170176

171177
@Override
@@ -175,13 +181,25 @@ public void writeTo(StreamOutput out) throws IOException {
175181
}
176182
}
177183

178-
class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
184+
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
179185

180186
@Override
181187
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
188+
MetaData metaData = request.state.getMetaData();
189+
ensureAllIndicesAreCompatible(metaData);
182190
// for now, the mere fact that we can serialize the cluster state acts as validation....
183191
channel.sendResponse(TransportResponse.Empty.INSTANCE);
184192
}
193+
194+
void ensureAllIndicesAreCompatible(MetaData metaData) {
195+
for (IndexMetaData idxMetaData : metaData) {
196+
if(idxMetaData.getState() == IndexMetaData.State.OPEN &&
197+
idxMetaData.getCreationVersion().before(Version.CURRENT.minimumIndexCompatibilityVersion())) {
198+
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
199+
+ idxMetaData.getCreationVersion());
200+
}
201+
}
202+
}
185203
}
186204

187205
public static class LeaveRequest extends TransportRequest {

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
183183
new NewPendingClusterStateListener(),
184184
discoverySettings,
185185
clusterService.getClusterName());
186-
this.membership = new MembershipAction(settings, transportService, this, new MembershipListener());
186+
this.membership = new MembershipAction(settings, transportService, this::localNode, new MembershipListener());
187187
this.joinThreadControl = new JoinThreadControl(threadPool);
188188

189189
transportService.registerRequestHandler(

core/src/test/java/org/elasticsearch/VersionTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,13 @@ public void testVersionComparison() throws Exception {
6464
assertTrue(Version.fromString("5.0.0").onOrAfter(Version.fromString("5.0.0-beta2")));
6565
assertTrue(Version.fromString("5.0.0-rc1").onOrAfter(Version.fromString("5.0.0-beta24")));
6666
assertTrue(Version.fromString("5.0.0-alpha24").before(Version.fromString("5.0.0-beta0")));
67+
}
6768

69+
public void testMinimumIndexCompatibilityVersion() {
70+
assertEquals(Version.V_5_0_0, Version.V_6_0_0_alpha1_UNRELEASED.minimumIndexCompatibilityVersion());
71+
assertEquals(Version.V_2_0_0, Version.V_5_0_0.minimumIndexCompatibilityVersion());
72+
assertEquals(Version.V_2_0_0, Version.V_5_1_0_UNRELEASED.minimumIndexCompatibilityVersion());
73+
assertEquals(Version.V_2_0_0, Version.V_5_0_0_alpha1.minimumIndexCompatibilityVersion());
6874
}
6975

7076
public void testVersionConstantPresent() {

core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
package org.elasticsearch.discovery.zen;
2121

2222
import java.io.Closeable;
23+
import java.io.IOException;
2324
import java.util.ArrayList;
2425
import java.util.Arrays;
2526
import java.util.Collections;
27+
import java.util.EnumSet;
2628
import java.util.HashSet;
2729
import java.util.List;
2830
import java.util.Set;
2931
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicBoolean;
3033
import java.util.stream.Collectors;
3134

3235
import org.apache.lucene.util.IOUtils;
@@ -35,22 +38,40 @@
3538
import org.elasticsearch.cluster.ClusterChangedEvent;
3639
import org.elasticsearch.cluster.ClusterName;
3740
import org.elasticsearch.cluster.ClusterState;
41+
import org.elasticsearch.cluster.metadata.IndexMetaData;
42+
import org.elasticsearch.cluster.metadata.MetaData;
3843
import org.elasticsearch.cluster.node.DiscoveryNode;
3944
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
4045
import org.elasticsearch.cluster.node.DiscoveryNodes;
46+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
47+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
48+
import org.elasticsearch.cluster.routing.RoutingTable;
49+
import org.elasticsearch.cluster.routing.ShardRoutingState;
50+
import org.elasticsearch.cluster.routing.TestShardRouting;
51+
import org.elasticsearch.cluster.routing.UnassignedInfo;
4152
import org.elasticsearch.cluster.service.ClusterService;
4253
import org.elasticsearch.common.settings.ClusterSettings;
4354
import org.elasticsearch.common.settings.Settings;
4455
import org.elasticsearch.discovery.Discovery;
4556
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
57+
import org.elasticsearch.index.shard.ShardId;
4658
import org.elasticsearch.test.ESTestCase;
59+
import org.elasticsearch.test.VersionUtils;
4760
import org.elasticsearch.test.transport.MockTransportService;
4861
import org.elasticsearch.threadpool.TestThreadPool;
4962
import org.elasticsearch.threadpool.ThreadPool;
63+
import org.elasticsearch.transport.TransportChannel;
64+
import org.elasticsearch.transport.TransportResponse;
65+
import org.elasticsearch.transport.TransportResponseOptions;
5066
import org.elasticsearch.transport.TransportService;
5167

5268
import static java.util.Collections.emptyMap;
5369
import static java.util.Collections.emptySet;
70+
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
71+
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
72+
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
73+
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
74+
import static org.elasticsearch.cluster.routing.RoutingTableTests.updateActiveAllocations;
5475
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
5576
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
5677
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
@@ -283,4 +304,77 @@ private Set<DiscoveryNode> fdNodesForState(ClusterState clusterState, DiscoveryN
283304
});
284305
return discoveryNodes;
285306
}
307+
308+
public void testValidateOnUnsupportedIndexVersionCreated() throws Exception {
309+
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT);
310+
final DiscoveryNode otherNode = new DiscoveryNode("other_node", buildNewFakeTransportAddress(), emptyMap(),
311+
EnumSet.allOf(DiscoveryNode.Role.class), Version.CURRENT);
312+
MembershipAction.ValidateJoinRequestRequestHandler request = new MembershipAction.ValidateJoinRequestRequestHandler();
313+
final boolean closed = randomBoolean();
314+
IndexMetaData indexMetaData = IndexMetaData.builder("test").settings(Settings.builder()
315+
.put(SETTING_VERSION_CREATED, VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()))
316+
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)
317+
.put(SETTING_CREATION_DATE, System.currentTimeMillis()))
318+
.state(closed ? IndexMetaData.State.CLOSE : IndexMetaData.State.OPEN)
319+
.build();
320+
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetaData.getIndex());
321+
RoutingTable.Builder routing = new RoutingTable.Builder();
322+
routing.addAsNew(indexMetaData);
323+
final ShardId shardId = new ShardId("test", "_na_", 0);
324+
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
325+
326+
final DiscoveryNode primaryNode = otherNode;
327+
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting("test", 0, primaryNode.getId(), null, true,
328+
ShardRoutingState.INITIALIZING, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, "getting there")));
329+
indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
330+
IndexRoutingTable indexRoutingTable = indexRoutingTableBuilder.build();
331+
IndexMetaData updatedIndexMetaData = updateActiveAllocations(indexRoutingTable, indexMetaData);
332+
stateBuilder.metaData(MetaData.builder().put(updatedIndexMetaData, false).generateClusterUuidIfNeeded())
333+
.routingTable(RoutingTable.builder().add(indexRoutingTable).build());
334+
if (closed == false) {
335+
IllegalStateException ex = expectThrows(IllegalStateException.class, () ->
336+
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), null));
337+
assertEquals("index [test] version not supported: "
338+
+ VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion()), ex.getMessage());
339+
} else {
340+
AtomicBoolean sendResponse = new AtomicBoolean(false);
341+
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), new TransportChannel() {
342+
@Override
343+
public String action() {
344+
return null;
345+
}
346+
347+
@Override
348+
public String getProfileName() {
349+
return null;
350+
}
351+
352+
@Override
353+
public long getRequestId() {
354+
return 0;
355+
}
356+
357+
@Override
358+
public String getChannelType() {
359+
return null;
360+
}
361+
362+
@Override
363+
public void sendResponse(TransportResponse response) throws IOException {
364+
sendResponse.set(true);
365+
}
366+
367+
@Override
368+
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
369+
370+
}
371+
372+
@Override
373+
public void sendResponse(Exception exception) throws IOException {
374+
375+
}
376+
});
377+
assertTrue(sendResponse.get());
378+
}
379+
}
286380
}

0 commit comments

Comments
 (0)