Skip to content

Commit f405c2c

Browse files
author
David Roberts
authored
[ML] Update ML results mappings on process start (#37758)
This change moves the update to the results index mappings from the open job action to the code that starts the autodetect process. When a rolling upgrade is performed we need to update the mappings for already-open jobs that are reassigned from an old version node to a new version node, but the open job action is not called in this case. Closes #37607
1 parent 0cb696a commit f405c2c

File tree

7 files changed

+455
-301
lines changed

7 files changed

+455
-301
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,24 @@
55
*/
66
package org.elasticsearch.xpack.core.ml.job.persistence;
77

8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
10+
import org.apache.logging.log4j.message.ParameterizedMessage;
11+
import org.elasticsearch.ElasticsearchException;
812
import org.elasticsearch.Version;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
15+
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
16+
import org.elasticsearch.client.Client;
17+
import org.elasticsearch.cluster.ClusterState;
18+
import org.elasticsearch.cluster.metadata.AliasOrIndex;
19+
import org.elasticsearch.cluster.metadata.IndexMetaData;
20+
import org.elasticsearch.cluster.metadata.MappingMetaData;
21+
import org.elasticsearch.common.CheckedSupplier;
22+
import org.elasticsearch.common.collect.ImmutableOpenMap;
923
import org.elasticsearch.common.xcontent.XContentBuilder;
24+
import org.elasticsearch.index.Index;
25+
import org.elasticsearch.plugins.MapperPlugin;
1026
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
1127
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
1228
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
@@ -38,10 +54,16 @@
3854
import org.elasticsearch.xpack.core.ml.notifications.AuditMessage;
3955

4056
import java.io.IOException;
57+
import java.util.ArrayList;
58+
import java.util.Arrays;
4159
import java.util.Collection;
4260
import java.util.Collections;
61+
import java.util.List;
62+
import java.util.Map;
4363

4464
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
65+
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
66+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
4567

4668
/**
4769
* Static methods to create Elasticsearch index mappings for the autodetect
@@ -107,6 +129,8 @@ public class ElasticsearchMappings {
107129

108130
static final String RAW = "raw";
109131

132+
private static final Logger logger = LogManager.getLogger(ElasticsearchMappings.class);
133+
110134
private ElasticsearchMappings() {
111135
}
112136

@@ -968,4 +992,94 @@ public static XContentBuilder auditMessageMapping() throws IOException {
968992
.endObject()
969993
.endObject();
970994
}
995+
996+
static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion) throws IOException {
997+
List<String> indicesToUpdate = new ArrayList<>();
998+
999+
ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> currentMapping = state.metaData().findMappings(concreteIndices,
1000+
new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER);
1001+
1002+
for (String index : concreteIndices) {
1003+
ImmutableOpenMap<String, MappingMetaData> innerMap = currentMapping.get(index);
1004+
if (innerMap != null) {
1005+
MappingMetaData metaData = innerMap.get(DOC_TYPE);
1006+
try {
1007+
@SuppressWarnings("unchecked")
1008+
Map<String, Object> meta = (Map<String, Object>) metaData.sourceAsMap().get("_meta");
1009+
if (meta != null) {
1010+
String versionString = (String) meta.get("version");
1011+
if (versionString == null) {
1012+
logger.info("Version of mappings for [{}] not found, recreating", index);
1013+
indicesToUpdate.add(index);
1014+
continue;
1015+
}
1016+
1017+
Version mappingVersion = Version.fromString(versionString);
1018+
1019+
if (mappingVersion.onOrAfter(minVersion)) {
1020+
continue;
1021+
} else {
1022+
logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT);
1023+
indicesToUpdate.add(index);
1024+
continue;
1025+
}
1026+
} else {
1027+
logger.info("Version of mappings for [{}] not found, recreating", index);
1028+
indicesToUpdate.add(index);
1029+
continue;
1030+
}
1031+
} catch (Exception e) {
1032+
logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e);
1033+
indicesToUpdate.add(index);
1034+
continue;
1035+
}
1036+
} else {
1037+
logger.info("No mappings found for [{}], recreating", index);
1038+
indicesToUpdate.add(index);
1039+
}
1040+
}
1041+
return indicesToUpdate.toArray(new String[indicesToUpdate.size()]);
1042+
}
1043+
1044+
public static void addDocMappingIfMissing(String alias, CheckedSupplier<XContentBuilder, IOException> mappingSupplier,
1045+
Client client, ClusterState state, ActionListener<Boolean> listener) {
1046+
AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias);
1047+
if (aliasOrIndex == null) {
1048+
// The index has never been created yet
1049+
listener.onResponse(true);
1050+
return;
1051+
}
1052+
String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName)
1053+
.toArray(String[]::new);
1054+
1055+
String[] indicesThatRequireAnUpdate;
1056+
try {
1057+
indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT);
1058+
} catch (IOException e) {
1059+
listener.onFailure(e);
1060+
return;
1061+
}
1062+
1063+
if (indicesThatRequireAnUpdate.length > 0) {
1064+
try (XContentBuilder mapping = mappingSupplier.get()) {
1065+
PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate);
1066+
putMappingRequest.type(DOC_TYPE);
1067+
putMappingRequest.source(mapping);
1068+
executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest,
1069+
ActionListener.wrap(response -> {
1070+
if (response.isAcknowledged()) {
1071+
listener.onResponse(true);
1072+
} else {
1073+
listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices "
1074+
+ Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged"));
1075+
}
1076+
}, listener::onFailure));
1077+
} catch (IOException e) {
1078+
listener.onFailure(e);
1079+
}
1080+
} else {
1081+
logger.trace("Mappings are up to date.");
1082+
listener.onResponse(true);
1083+
}
1084+
}
9711085
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,18 @@
99
import com.fasterxml.jackson.core.JsonParseException;
1010
import com.fasterxml.jackson.core.JsonParser;
1111
import com.fasterxml.jackson.core.JsonToken;
12+
import org.elasticsearch.Version;
13+
import org.elasticsearch.cluster.ClusterName;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.IndexMetaData;
16+
import org.elasticsearch.cluster.metadata.MappingMetaData;
17+
import org.elasticsearch.cluster.metadata.MetaData;
1218
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.common.settings.Settings;
1320
import org.elasticsearch.common.xcontent.XContentBuilder;
1421
import org.elasticsearch.common.xcontent.XContentParser;
1522
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.test.VersionUtils;
1624
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
1725
import org.elasticsearch.xpack.core.ml.job.config.Job;
1826
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
@@ -30,6 +38,8 @@
3038
import java.io.IOException;
3139
import java.nio.charset.StandardCharsets;
3240
import java.util.Arrays;
41+
import java.util.Collections;
42+
import java.util.HashMap;
3343
import java.util.HashSet;
3444
import java.util.List;
3545
import java.util.Map;
@@ -128,6 +138,110 @@ public void testTermFieldMapping() throws IOException {
128138
assertNull(instanceMapping);
129139
}
130140

141+
142+
public void testMappingRequiresUpdateNoMapping() throws IOException {
143+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
144+
ClusterState cs = csBuilder.build();
145+
String[] indices = new String[] { "no_index" };
146+
147+
assertArrayEquals(new String[] { "no_index" }, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
148+
}
149+
150+
public void testMappingRequiresUpdateNullMapping() throws IOException {
151+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null));
152+
String[] indices = new String[] { "null_index" };
153+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
154+
}
155+
156+
public void testMappingRequiresUpdateNoVersion() throws IOException {
157+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD"));
158+
String[] indices = new String[] { "no_version_field" };
159+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
160+
}
161+
162+
public void testMappingRequiresUpdateRecentMappingVersion() throws IOException {
163+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString()));
164+
String[] indices = new String[] { "version_current" };
165+
assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
166+
}
167+
168+
public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException {
169+
ClusterState cs = getClusterStateWithMappingsWithMetaData(
170+
Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0")));
171+
String[] indices = new String[] { "version_nested" };
172+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
173+
}
174+
175+
public void testMappingRequiresUpdateBogusMappingVersion() throws IOException {
176+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0"));
177+
String[] indices = new String[] { "version_bogus" };
178+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
179+
}
180+
181+
public void testMappingRequiresUpdateNewerMappingVersion() throws IOException {
182+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT));
183+
String[] indices = new String[] { "version_newer" };
184+
assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion()));
185+
}
186+
187+
public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException {
188+
ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT));
189+
String[] indices = new String[] { "version_newer_minor" };
190+
assertArrayEquals(new String[] {},
191+
ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion()));
192+
}
193+
194+
public void testMappingRequiresUpdateSomeVersionMix() throws IOException {
195+
Map<String, Object> versionMix = new HashMap<>();
196+
versionMix.put("version_54", Version.V_5_4_0);
197+
versionMix.put("version_current", Version.CURRENT);
198+
versionMix.put("version_null", null);
199+
versionMix.put("version_current2", Version.CURRENT);
200+
versionMix.put("version_bogus", "0.0.0");
201+
versionMix.put("version_current3", Version.CURRENT);
202+
versionMix.put("version_bogus2", "0.0.0");
203+
204+
ClusterState cs = getClusterStateWithMappingsWithMetaData(versionMix);
205+
String[] indices = new String[] { "version_54", "version_null", "version_bogus", "version_bogus2" };
206+
assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT));
207+
}
208+
209+
private ClusterState getClusterStateWithMappingsWithMetaData(Map<String, Object> namesAndVersions) throws IOException {
210+
MetaData.Builder metaDataBuilder = MetaData.builder();
211+
212+
for (Map.Entry<String, Object> entry : namesAndVersions.entrySet()) {
213+
214+
String indexName = entry.getKey();
215+
Object version = entry.getValue();
216+
217+
IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName);
218+
indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
219+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
220+
221+
Map<String, Object> mapping = new HashMap<>();
222+
Map<String, Object> properties = new HashMap<>();
223+
for (int i = 0; i < 10; i++) {
224+
properties.put("field" + i, Collections.singletonMap("type", "string"));
225+
}
226+
mapping.put("properties", properties);
227+
228+
Map<String, Object> meta = new HashMap<>();
229+
if (version != null && version.equals("NO_VERSION_FIELD") == false) {
230+
meta.put("version", version);
231+
}
232+
mapping.put("_meta", meta);
233+
234+
indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping));
235+
236+
metaDataBuilder.put(indexMetaData);
237+
}
238+
MetaData metaData = metaDataBuilder.build();
239+
240+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"));
241+
csBuilder.metaData(metaData);
242+
return csBuilder.build();
243+
}
244+
131245
private Set<String> collectResultsDocFieldNames() throws IOException {
132246
// Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here.
133247
return collectFieldNames(ElasticsearchMappings.resultsMapping());

0 commit comments

Comments
 (0)