Skip to content

Commit 3ba968c

Browse files
committed
TemplateUpgraders should be called during rolling restart (#25263)
In #24379 we added ability to upgrade templates on full cluster startup. This PR invokes the same update procedure also when a new node first joins the cluster allowing to update templates on a rolling cluster restart as well. Closes #24680
1 parent 3f458ea commit 3ba968c

File tree

7 files changed

+934
-10
lines changed

7 files changed

+934
-10
lines changed

core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse {
3232
DeleteIndexTemplateResponse() {
3333
}
3434

35-
DeleteIndexTemplateResponse(boolean acknowledged) {
35+
protected DeleteIndexTemplateResponse(boolean acknowledged) {
3636
super(acknowledged);
3737
}
3838

core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,14 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont
377377
throws IOException {
378378
builder.startObject(indexTemplateMetaData.name());
379379

380+
toInnerXContent(indexTemplateMetaData, builder, params);
381+
382+
builder.endObject();
383+
}
384+
385+
public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, XContentBuilder builder, ToXContent.Params params)
386+
throws IOException {
387+
380388
builder.field("order", indexTemplateMetaData.order());
381389
if (indexTemplateMetaData.version() != null) {
382390
builder.field("version", indexTemplateMetaData.version());
@@ -420,8 +428,6 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont
420428
AliasMetaData.Builder.toXContent(cursor.value, builder, params);
421429
}
422430
builder.endObject();
423-
424-
builder.endObject();
425431
}
426432

427433
public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException {
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.metadata;
21+
22+
import com.carrotsearch.hppc.cursors.ObjectCursor;
23+
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
24+
import org.apache.logging.log4j.message.ParameterizedMessage;
25+
import org.elasticsearch.Version;
26+
import org.elasticsearch.action.ActionListener;
27+
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
28+
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
29+
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
30+
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
31+
import org.elasticsearch.client.Client;
32+
import org.elasticsearch.cluster.ClusterChangedEvent;
33+
import org.elasticsearch.cluster.ClusterState;
34+
import org.elasticsearch.cluster.ClusterStateListener;
35+
import org.elasticsearch.cluster.node.DiscoveryNode;
36+
import org.elasticsearch.cluster.node.DiscoveryNodes;
37+
import org.elasticsearch.cluster.service.ClusterService;
38+
import org.elasticsearch.common.bytes.BytesReference;
39+
import org.elasticsearch.common.collect.ImmutableOpenMap;
40+
import org.elasticsearch.common.collect.Tuple;
41+
import org.elasticsearch.common.component.AbstractComponent;
42+
import org.elasticsearch.common.settings.Settings;
43+
import org.elasticsearch.common.unit.TimeValue;
44+
import org.elasticsearch.common.xcontent.ToXContent;
45+
import org.elasticsearch.common.xcontent.XContentHelper;
46+
import org.elasticsearch.common.xcontent.XContentType;
47+
import org.elasticsearch.gateway.GatewayService;
48+
import org.elasticsearch.indices.IndexTemplateMissingException;
49+
import org.elasticsearch.plugins.Plugin;
50+
import org.elasticsearch.threadpool.ThreadPool;
51+
52+
import java.io.IOException;
53+
import java.util.Collection;
54+
import java.util.HashMap;
55+
import java.util.HashSet;
56+
import java.util.Map;
57+
import java.util.Optional;
58+
import java.util.Set;
59+
import java.util.concurrent.atomic.AtomicInteger;
60+
import java.util.function.UnaryOperator;
61+
62+
import static java.util.Collections.singletonMap;
63+
64+
/**
65+
* Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster
66+
*/
67+
public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener {
68+
private final UnaryOperator<Map<String, IndexTemplateMetaData>> indexTemplateMetaDataUpgraders;
69+
70+
public final ClusterService clusterService;
71+
72+
public final ThreadPool threadPool;
73+
74+
public final Client client;
75+
76+
private final AtomicInteger updatesInProgress = new AtomicInteger();
77+
78+
private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData;
79+
80+
public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
81+
Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders) {
82+
super(settings);
83+
this.client = client;
84+
this.clusterService = clusterService;
85+
this.threadPool = threadPool;
86+
this.indexTemplateMetaDataUpgraders = templates -> {
87+
Map<String, IndexTemplateMetaData> upgradedTemplates = new HashMap<>(templates);
88+
for (UnaryOperator<Map<String, IndexTemplateMetaData>> upgrader : indexTemplateMetaDataUpgraders) {
89+
upgradedTemplates = upgrader.apply(upgradedTemplates);
90+
}
91+
return upgradedTemplates;
92+
};
93+
clusterService.addListener(this);
94+
}
95+
96+
@Override
97+
public void clusterChanged(ClusterChangedEvent event) {
98+
ClusterState state = event.state();
99+
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
100+
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
101+
// while they actually do exist
102+
return;
103+
}
104+
105+
if (updatesInProgress.get() > 0) {
106+
// we are already running some updates - skip this cluster state update
107+
return;
108+
}
109+
110+
ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();
111+
112+
if (templates == lastTemplateMetaData) {
113+
// we already checked these sets of templates - no reason to check it again
114+
// we can do identity check here because due to cluster state diffs the actual map will not change
115+
// if there were no changes
116+
return;
117+
}
118+
119+
if (shouldLocalNodeUpdateTemplates(state.nodes()) == false) {
120+
return;
121+
}
122+
123+
124+
lastTemplateMetaData = templates;
125+
Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates);
126+
if (changes.isPresent()) {
127+
if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) {
128+
threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2()));
129+
}
130+
}
131+
}
132+
133+
/**
134+
* Checks if the current node should update the templates
135+
*
136+
* If the master has the newest verison in the cluster - it will be dedicated template updater.
137+
* Otherwise the node with the highest id among nodes with the highest version should update the templates
138+
*/
139+
boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) {
140+
DiscoveryNode localNode = nodes.getLocalNode();
141+
// Only data and master nodes should update the template
142+
if (localNode.isDataNode() || localNode.isMasterNode()) {
143+
Version maxVersion = nodes.getLargestNonClientNodeVersion();
144+
if (maxVersion.equals(nodes.getMasterNode().getVersion())) {
145+
// If the master has the latest version - we will allow it to handle the update
146+
return nodes.isLocalNodeElectedMaster();
147+
} else {
148+
if (maxVersion.equals(localNode.getVersion()) == false) {
149+
// The localhost node doesn't have the latest version - not going to update
150+
return false;
151+
}
152+
for (ObjectCursor<DiscoveryNode> node : nodes.getMasterAndDataNodes().values()) {
153+
if (node.value.getVersion().equals(maxVersion) && node.value.getId().compareTo(localNode.getId()) > 0) {
154+
// We have a node with higher id then mine - it should update
155+
return false;
156+
}
157+
}
158+
// We have the highest version and highest id - we should perform the update
159+
return true;
160+
}
161+
} else {
162+
return false;
163+
}
164+
}
165+
166+
void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
167+
for (Map.Entry<String, BytesReference> change : changes.entrySet()) {
168+
PutIndexTemplateRequest request =
169+
new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON);
170+
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
171+
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
172+
@Override
173+
public void onResponse(PutIndexTemplateResponse response) {
174+
updatesInProgress.decrementAndGet();
175+
if (response.isAcknowledged() == false) {
176+
logger.warn("Error updating template [{}], request was not acknowledged", change.getKey());
177+
}
178+
}
179+
180+
@Override
181+
public void onFailure(Exception e) {
182+
updatesInProgress.decrementAndGet();
183+
logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);
184+
}
185+
});
186+
}
187+
188+
for (String template : deletions) {
189+
DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template);
190+
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
191+
client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() {
192+
@Override
193+
public void onResponse(DeleteIndexTemplateResponse response) {
194+
updatesInProgress.decrementAndGet();
195+
if (response.isAcknowledged() == false) {
196+
logger.warn("Error deleting template [{}], request was not acknowledged", template);
197+
}
198+
}
199+
200+
@Override
201+
public void onFailure(Exception e) {
202+
updatesInProgress.decrementAndGet();
203+
if (e instanceof IndexTemplateMissingException == false) {
204+
// we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist
205+
// otherwise we need to warn
206+
logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e);
207+
}
208+
}
209+
});
210+
}
211+
}
212+
213+
int getUpdatesInProgress() {
214+
return updatesInProgress.get();
215+
}
216+
217+
Optional<Tuple<Map<String, BytesReference>, Set<String>>> calculateTemplateChanges(
218+
ImmutableOpenMap<String, IndexTemplateMetaData> templates) {
219+
// collect current templates
220+
Map<String, IndexTemplateMetaData> existingMap = new HashMap<>();
221+
for (ObjectObjectCursor<String, IndexTemplateMetaData> customCursor : templates) {
222+
existingMap.put(customCursor.key, customCursor.value);
223+
}
224+
// upgrade global custom meta data
225+
Map<String, IndexTemplateMetaData> upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap);
226+
if (upgradedMap.equals(existingMap) == false) {
227+
Set<String> deletes = new HashSet<>();
228+
Map<String, BytesReference> changes = new HashMap<>();
229+
// remove templates if needed
230+
existingMap.keySet().forEach(s -> {
231+
if (upgradedMap.containsKey(s) == false) {
232+
deletes.add(s);
233+
}
234+
});
235+
upgradedMap.forEach((key, value) -> {
236+
if (value.equals(existingMap.get(key)) == false) {
237+
changes.put(key, toBytesReference(value));
238+
}
239+
});
240+
return Optional.of(new Tuple<>(changes, deletes));
241+
}
242+
return Optional.empty();
243+
}
244+
245+
private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true"));
246+
247+
private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) {
248+
try {
249+
return XContentHelper.toXContent((builder, params) -> {
250+
IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params);
251+
return builder;
252+
}, XContentType.JSON, PARAMS, false);
253+
} catch (IOException ex) {
254+
throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex);
255+
}
256+
}
257+
}

core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,22 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
5757
private final String masterNodeId;
5858
private final String localNodeId;
5959
private final Version minNonClientNodeVersion;
60+
private final Version maxNonClientNodeVersion;
6061
private final Version maxNodeVersion;
6162
private final Version minNodeVersion;
6263

6364
private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes,
6465
ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes,
65-
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion,
66-
Version minNodeVersion) {
66+
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNonClientNodeVersion,
67+
Version maxNodeVersion, Version minNodeVersion) {
6768
this.nodes = nodes;
6869
this.dataNodes = dataNodes;
6970
this.masterNodes = masterNodes;
7071
this.ingestNodes = ingestNodes;
7172
this.masterNodeId = masterNodeId;
7273
this.localNodeId = localNodeId;
7374
this.minNonClientNodeVersion = minNonClientNodeVersion;
75+
this.maxNonClientNodeVersion = maxNonClientNodeVersion;
7476
this.minNodeVersion = minNodeVersion;
7577
this.maxNodeVersion = maxNodeVersion;
7678
}
@@ -235,12 +237,25 @@ public boolean isAllNodes(String... nodesIds) {
235237
/**
236238
* Returns the version of the node with the oldest version in the cluster that is not a client node
237239
*
240+
* If there are no non-client nodes, Version.CURRENT will be returned.
241+
*
238242
* @return the oldest version in the cluster
239243
*/
240244
public Version getSmallestNonClientNodeVersion() {
241245
return minNonClientNodeVersion;
242246
}
243247

248+
/**
249+
* Returns the version of the node with the youngest version in the cluster that is not a client node.
250+
*
251+
* If there are no non-client nodes, Version.CURRENT will be returned.
252+
*
253+
* @return the youngest version in the cluster
254+
*/
255+
public Version getLargestNonClientNodeVersion() {
256+
return maxNonClientNodeVersion;
257+
}
258+
244259
/**
245260
* Returns the version of the node with the oldest version in the cluster.
246261
*
@@ -253,7 +268,7 @@ public Version getMinNodeVersion() {
253268
/**
254269
* Returns the version of the node with the yougest version in the cluster
255270
*
256-
* @return the oldest version in the cluster
271+
* @return the youngest version in the cluster
257272
*/
258273
public Version getMaxNodeVersion() {
259274
return maxNodeVersion;
@@ -665,15 +680,25 @@ public DiscoveryNodes build() {
665680
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
666681
Version minNodeVersion = Version.CURRENT;
667682
Version maxNodeVersion = Version.CURRENT;
668-
Version minNonClientNodeVersion = Version.CURRENT;
683+
// The node where we are building this on might not be a master or a data node, so we cannot assume
684+
// that there is a node with the current version as a part of the cluster.
685+
Version minNonClientNodeVersion = null;
686+
Version maxNonClientNodeVersion = null;
669687
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
670688
if (nodeEntry.value.isDataNode()) {
671689
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
672-
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
673690
}
674691
if (nodeEntry.value.isMasterNode()) {
675692
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
676-
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
693+
}
694+
if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) {
695+
if (minNonClientNodeVersion == null) {
696+
minNonClientNodeVersion = nodeEntry.value.getVersion();
697+
maxNonClientNodeVersion = nodeEntry.value.getVersion();
698+
} else {
699+
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
700+
maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion());
701+
}
677702
}
678703
if (nodeEntry.value.isIngestNode()) {
679704
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
@@ -684,7 +709,8 @@ public DiscoveryNodes build() {
684709

685710
return new DiscoveryNodes(
686711
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
687-
masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion
712+
masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion,
713+
maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion
688714
);
689715
}
690716

0 commit comments

Comments
 (0)