Skip to content

Commit 6f82b0c

Browse files
authored
Allow ClusterState.Custom to be created on initial cluster states (#26144)
Today we have a `null` invariant on all `ClusterState.Custom`. This makes several code paths complicated and requires complex state handling in some cases. This change allows to register a custom supplier that is used to initialize the initial clusterstate with these transient customs.
1 parent 99ac7be commit 6f82b0c

File tree

22 files changed

+220
-37
lines changed

22 files changed

+220
-37
lines changed

core/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373

7474
import java.util.ArrayList;
7575
import java.util.Collection;
76+
import java.util.Collections;
7677
import java.util.HashMap;
7778
import java.util.LinkedHashMap;
7879
import java.util.List;
@@ -94,7 +95,6 @@ public class ClusterModule extends AbstractModule {
9495
private final IndexNameExpressionResolver indexNameExpressionResolver;
9596
private final AllocationDeciders allocationDeciders;
9697
private final AllocationService allocationService;
97-
private final Runnable onStarted;
9898
// pkg private for tests
9999
final Collection<AllocationDecider> deciderList;
100100
final ShardsAllocator shardsAllocator;
@@ -107,9 +107,24 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
107107
this.clusterService = clusterService;
108108
this.indexNameExpressionResolver = new IndexNameExpressionResolver(settings);
109109
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
110-
this.onStarted = () -> clusterPlugins.forEach(plugin -> plugin.onNodeStarted());
111110
}
112111

112+
public static Map<String, Supplier<ClusterState.Custom>> getClusterStateCustomSuppliers(List<ClusterPlugin> clusterPlugins) {
113+
final Map<String, Supplier<ClusterState.Custom>> customSupplier = new HashMap<>();
114+
customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new);
115+
customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new);
116+
customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new);
117+
for (ClusterPlugin plugin : clusterPlugins) {
118+
Map<String, Supplier<ClusterState.Custom>> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier();
119+
for (String key : initialCustomSupplier.keySet()) {
120+
if (customSupplier.containsKey(key)) {
121+
throw new IllegalStateException("custom supplier key [" + key + "] is registered more than once");
122+
}
123+
}
124+
customSupplier.putAll(initialCustomSupplier);
125+
}
126+
return Collections.unmodifiableMap(customSupplier);
127+
}
113128

114129
public static List<Entry> getNamedWriteables() {
115130
List<Entry> entries = new ArrayList<>();
@@ -243,8 +258,4 @@ protected void configure() {
243258
bind(AllocationDeciders.class).toInstance(allocationDeciders);
244259
bind(ShardsAllocator.class).toInstance(shardsAllocator);
245260
}
246-
247-
public Runnable onStarted() {
248-
return onStarted;
249-
}
250261
}

core/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,6 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
4545

4646
private final List<Entry> entries;
4747

48-
/**
49-
* Constructs new restore metadata
50-
*
51-
* @param entries list of currently running restore processes
52-
*/
53-
public RestoreInProgress(List<Entry> entries) {
54-
this.entries = entries;
55-
}
56-
5748
/**
5849
* Constructs new restore metadata
5950
*

core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
4545
// the list of snapshot deletion request entries
4646
private final List<Entry> entries;
4747

48+
public SnapshotDeletionsInProgress() {
49+
this(Collections.emptyList());
50+
}
51+
4852
private SnapshotDeletionsInProgress(List<Entry> entries) {
4953
this.entries = Collections.unmodifiableList(entries);
5054
}

core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ private Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation
6767
// Only primary shards are snapshotted
6868

6969
SnapshotsInProgress snapshotsInProgress = allocation.custom(SnapshotsInProgress.TYPE);
70-
if (snapshotsInProgress == null) {
70+
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
7171
// Snapshots are not running
7272
return allocation.decision(Decision.YES, NAME, "no snapshots are currently running");
7373
}

core/src/main/java/org/elasticsearch/cluster/service/ClusterApplier.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,10 @@ public interface ClusterApplier {
3939
* @param listener callback that is invoked after cluster state is applied
4040
*/
4141
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterStateTaskListener listener);
42+
43+
/**
44+
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
45+
*/
46+
ClusterState.Builder newClusterStateBuilder();
47+
4248
}

core/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,17 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
9797
private final AtomicReference<ClusterState> state; // last applied state
9898

9999
private NodeConnectionsService nodeConnectionsService;
100+
private Supplier<ClusterState.Builder> stateBuilderSupplier;
100101

101-
public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
102+
public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier<ClusterState
103+
.Builder> stateBuilderSupplier) {
102104
super(settings);
103105
this.clusterSettings = clusterSettings;
104106
this.threadPool = threadPool;
105107
this.state = new AtomicReference<>();
106108
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
107109
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
110+
this.stateBuilderSupplier = stateBuilderSupplier;
108111
}
109112

110113
public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
@@ -653,4 +656,9 @@ public void run() {
653656
protected long currentTimeInNanos() {
654657
return System.nanoTime();
655658
}
659+
660+
@Override
661+
public ClusterState.Builder newClusterStateBuilder() {
662+
return stateBuilderSupplier.get();
663+
}
656664
}

core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
import java.util.Collections;
4444
import java.util.Map;
45+
import java.util.function.Supplier;
4546

4647
public class ClusterService extends AbstractLifecycleComponent {
4748

@@ -58,16 +59,30 @@ public class ClusterService extends AbstractLifecycleComponent {
5859
private final OperationRouting operationRouting;
5960

6061
private final ClusterSettings clusterSettings;
62+
private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;
6163

62-
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
64+
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
65+
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
6366
super(settings);
64-
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
6567
this.masterService = new MasterService(settings, threadPool);
6668
this.operationRouting = new OperationRouting(settings, clusterSettings);
6769
this.clusterSettings = clusterSettings;
6870
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
6971
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
7072
this::setSlowTaskLoggingThreshold);
73+
this.initialClusterStateCustoms = initialClusterStateCustoms;
74+
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
75+
}
76+
77+
/**
78+
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
79+
*/
80+
public ClusterState.Builder newClusterStateBuilder() {
81+
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
82+
for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
83+
builder.putCustom(entry.getKey(), entry.getValue().get());
84+
}
85+
return builder;
7186
}
7287

7388
private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {

core/src/main/java/org/elasticsearch/discovery/single/SingleNodeDiscovery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ protected synchronized void doStart() {
117117
}
118118

119119
protected ClusterState createInitialState(DiscoveryNode localNode) {
120-
return ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
121-
.nodes(DiscoveryNodes.builder().add(localNode)
120+
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
121+
return builder.nodes(DiscoveryNodes.builder().add(localNode)
122122
.localNodeId(localNode.getId())
123123
.masterNodeId(localNode.getId())
124124
.build())

core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
113113

114114
private final TransportService transportService;
115115
private final MasterService masterService;
116-
private final ClusterName clusterName;
117116
private final DiscoverySettings discoverySettings;
118117
protected final ZenPing zenPing; // protected to allow tests access
119118
private final MasterFaultDetection masterFD;
@@ -169,7 +168,7 @@ public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService t
169168
this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
170169
this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
171170
this.threadPool = threadPool;
172-
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
171+
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
173172
this.committedState = new AtomicReference<>();
174173

175174
this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
@@ -238,7 +237,8 @@ protected void doStart() {
238237
// set initial state
239238
assert committedState.get() == null;
240239
assert localNode != null;
241-
ClusterState initialState = ClusterState.builder(clusterName)
240+
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
241+
ClusterState initialState = builder
242242
.blocks(ClusterBlocks.builder()
243243
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
244244
.addGlobalBlock(discoverySettings.getNoMasterBlock()))

core/src/main/java/org/elasticsearch/gateway/Gateway.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
155155
metaDataBuilder.transientSettings(),
156156
e -> logUnknownSetting("transient", e),
157157
(e, ex) -> logInvalidSetting("transient", e, ex)));
158-
ClusterState.Builder builder = ClusterState.builder(clusterService.getClusterName());
158+
ClusterState.Builder builder = clusterService.newClusterStateBuilder();
159159
builder.metaData(metaDataBuilder);
160160
listener.onSuccess(builder.build());
161161
}

0 commit comments

Comments
 (0)