Skip to content

Commit d3e77d4

Browse files
committed
TemplateUpgraders should be called during rolling restart
In elastic#24379 we added ability to upgrade templates on full cluster startup. This PR invokes the same update procedure also when a new node first joins the cluster allowing to update templates on a full cluster restart. Closes elastic#24680
1 parent c161d90 commit d3e77d4

File tree

6 files changed

+722
-2
lines changed

6 files changed

+722
-2
lines changed

core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse {
3232
DeleteIndexTemplateResponse() {
3333
}
3434

35-
DeleteIndexTemplateResponse(boolean acknowledged) {
35+
protected DeleteIndexTemplateResponse(boolean acknowledged) {
3636
super(acknowledged);
3737
}
3838

core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,14 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont
387387
throws IOException {
388388
builder.startObject(indexTemplateMetaData.name());
389389

390+
toInnerXContent(indexTemplateMetaData, builder, params);
391+
392+
builder.endObject();
393+
}
394+
395+
public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, XContentBuilder builder, ToXContent.Params params)
396+
throws IOException {
397+
390398
builder.field("order", indexTemplateMetaData.order());
391399
if (indexTemplateMetaData.version() != null) {
392400
builder.field("version", indexTemplateMetaData.version());
@@ -431,7 +439,6 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont
431439
}
432440
builder.endObject();
433441

434-
builder.endObject();
435442
}
436443

437444
public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException {
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.metadata;
21+
22+
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
23+
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
import org.elasticsearch.action.ActionListener;
25+
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
26+
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
27+
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
28+
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
29+
import org.elasticsearch.client.Client;
30+
import org.elasticsearch.cluster.ClusterChangedEvent;
31+
import org.elasticsearch.cluster.ClusterState;
32+
import org.elasticsearch.cluster.ClusterStateListener;
33+
import org.elasticsearch.cluster.service.ClusterService;
34+
import org.elasticsearch.common.bytes.BytesReference;
35+
import org.elasticsearch.common.collect.ImmutableOpenMap;
36+
import org.elasticsearch.common.collect.Tuple;
37+
import org.elasticsearch.common.component.AbstractComponent;
38+
import org.elasticsearch.common.settings.Settings;
39+
import org.elasticsearch.common.unit.TimeValue;
40+
import org.elasticsearch.common.xcontent.ToXContent;
41+
import org.elasticsearch.common.xcontent.XContentHelper;
42+
import org.elasticsearch.common.xcontent.XContentType;
43+
import org.elasticsearch.gateway.GatewayService;
44+
import org.elasticsearch.indices.IndexTemplateMissingException;
45+
import org.elasticsearch.plugins.Plugin;
46+
import org.elasticsearch.threadpool.ThreadPool;
47+
48+
import java.io.IOException;
49+
import java.util.Collection;
50+
import java.util.Collections;
51+
import java.util.HashMap;
52+
import java.util.HashSet;
53+
import java.util.Map;
54+
import java.util.Set;
55+
import java.util.concurrent.atomic.AtomicInteger;
56+
import java.util.function.UnaryOperator;
57+
58+
import static java.util.Collections.singletonMap;
59+
60+
/**
61+
* Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster
62+
*/
63+
public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener {
64+
private final Tuple<Map<String, BytesReference>, Set<String>> EMPTY = new Tuple<>(Collections.emptyMap(), Collections.emptySet());
65+
66+
private final UnaryOperator<Map<String, IndexTemplateMetaData>> indexTemplateMetaDataUpgraders;
67+
68+
public final ClusterService clusterService;
69+
70+
public final ThreadPool threadPool;
71+
72+
public final Client client;
73+
74+
private final AtomicInteger updatesInProgress = new AtomicInteger();
75+
76+
private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData;
77+
78+
public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
79+
Collection<UnaryOperator<Map<String, IndexTemplateMetaData>>> indexTemplateMetaDataUpgraders) {
80+
super(settings);
81+
this.client = client;
82+
this.clusterService = clusterService;
83+
this.threadPool = threadPool;
84+
this.indexTemplateMetaDataUpgraders = templates -> {
85+
Map<String, IndexTemplateMetaData> upgradedTemplates = new HashMap<>(templates);
86+
for (UnaryOperator<Map<String, IndexTemplateMetaData>> upgrader : indexTemplateMetaDataUpgraders) {
87+
upgradedTemplates = upgrader.apply(upgradedTemplates);
88+
}
89+
return upgradedTemplates;
90+
};
91+
clusterService.addListener(this);
92+
}
93+
94+
@Override
95+
public void clusterChanged(ClusterChangedEvent event) {
96+
ClusterState state = event.state();
97+
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
98+
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
99+
// while they actually do exist
100+
return;
101+
}
102+
103+
if (updatesInProgress.get() > 0) {
104+
// we are already running some updates - skip this cluster state update
105+
return;
106+
}
107+
108+
ImmutableOpenMap<String, IndexTemplateMetaData> templates = state.getMetaData().getTemplates();
109+
110+
if (templates == lastTemplateMetaData) {
111+
// we already checked these sets of templates - no reason to check it again
112+
// we can do identity check here because due to cluster state diffs the actual map will not change
113+
// if there were no changes
114+
return;
115+
}
116+
117+
lastTemplateMetaData = templates;
118+
Tuple<Map<String, BytesReference>, Set<String>> changes = calculateTemplateChanges(templates);
119+
if (changes.v1().isEmpty() && changes.v2().isEmpty()) {
120+
// no changes required
121+
return;
122+
}
123+
124+
if (updatesInProgress.compareAndSet(0, changes.v1().size() + changes.v2().size())) {
125+
threadPool.generic().execute(() -> updateTemplates(changes.v1(), changes.v2()));
126+
}
127+
}
128+
129+
void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
130+
for (Map.Entry<String, BytesReference> change : changes.entrySet()) {
131+
PutIndexTemplateRequest request = new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON);
132+
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
133+
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
134+
@Override
135+
public void onResponse(PutIndexTemplateResponse response) {
136+
updatesInProgress.decrementAndGet();
137+
if (response.isAcknowledged() == false) {
138+
logger.warn("Error updating template [{}], request was not acknowledged", change.getKey());
139+
}
140+
}
141+
142+
@Override
143+
public void onFailure(Exception e) {
144+
updatesInProgress.decrementAndGet();
145+
logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);
146+
}
147+
});
148+
}
149+
150+
for (String template : deletions) {
151+
DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template);
152+
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
153+
client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() {
154+
@Override
155+
public void onResponse(DeleteIndexTemplateResponse response) {
156+
updatesInProgress.decrementAndGet();
157+
if (response.isAcknowledged() == false) {
158+
logger.warn("Error deleting template [{}], request was not acknowledged", template);
159+
}
160+
}
161+
162+
@Override
163+
public void onFailure(Exception e) {
164+
updatesInProgress.decrementAndGet();
165+
if (e instanceof IndexTemplateMissingException == false) {
166+
// we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist
167+
// otherwise we need to warn
168+
logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e);
169+
}
170+
}
171+
});
172+
}
173+
}
174+
175+
boolean hasUpdatesInProgress() {
176+
return updatesInProgress.get() > 0;
177+
}
178+
179+
Tuple<Map<String, BytesReference>, Set<String>> calculateTemplateChanges(ImmutableOpenMap<String, IndexTemplateMetaData> templates) {
180+
// collect current templates
181+
Map<String, IndexTemplateMetaData> existingMap = new HashMap<>();
182+
for (ObjectObjectCursor<String, IndexTemplateMetaData> customCursor : templates) {
183+
existingMap.put(customCursor.key, customCursor.value);
184+
}
185+
// upgrade global custom meta data
186+
Map<String, IndexTemplateMetaData> upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap);
187+
if (upgradedMap.equals(existingMap) == false) {
188+
Set<String> deletes = new HashSet<>();
189+
Map<String, BytesReference> changes = new HashMap<>();
190+
// remove templates if needed
191+
existingMap.keySet().forEach(s -> {
192+
if (upgradedMap.containsKey(s) == false) {
193+
deletes.add(s);
194+
}
195+
});
196+
upgradedMap.forEach((key, value) -> {
197+
if (value.equals(existingMap.get(key)) == false) {
198+
changes.put(key, toBytesReference(value));
199+
}
200+
});
201+
return new Tuple<>(changes, deletes);
202+
}
203+
return EMPTY;
204+
}
205+
206+
private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true"));
207+
208+
private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) {
209+
try {
210+
return XContentHelper.toXContent((builder, params) -> {
211+
IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params);
212+
return builder;
213+
}, XContentType.JSON, PARAMS, false);
214+
} catch (IOException ex) {
215+
throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex);
216+
}
217+
}
218+
}

core/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
4848
import org.elasticsearch.cluster.metadata.MetaData;
4949
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
50+
import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
5051
import org.elasticsearch.cluster.node.DiscoveryNode;
5152
import org.elasticsearch.cluster.routing.RoutingService;
5253
import org.elasticsearch.cluster.service.ClusterService;
@@ -415,6 +416,8 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
415416
Collection<UnaryOperator<IndexMetaData>> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
416417
.map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
417418
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
419+
final TemplateUpgradeService templateUpgradeService = new TemplateUpgradeService(settings, client, clusterService, threadPool,
420+
indexTemplateMetaDataUpgraders);
418421
final Transport transport = networkModule.getTransportSupplier().get();
419422
final TransportService transportService = newTransportService(settings, transport, threadPool,
420423
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
@@ -460,6 +463,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
460463
b.bind(UsageService.class).toInstance(usageService);
461464
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
462465
b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
466+
b.bind(TemplateUpgradeService.class).toInstance(templateUpgradeService);
463467
b.bind(MetaStateService.class).toInstance(metaStateService);
464468
b.bind(IndicesService.class).toInstance(indicesService);
465469
b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,

0 commit comments

Comments
 (0)