From 9adcb63442a38a3ee1f85b5360916b93beb86693 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 21 Jan 2019 16:17:18 +0000 Subject: [PATCH 1/3] [ML] Update ML results mappings on process start This change moves the update to the results index mappings from the open job action to the code that starts the autodetect process. When a rolling upgrade is performed we need to update the mappings for already-open jobs that are reassigned from an old version node to a new version node, but the open job action is not called in this case. Closes #37607 --- .../persistence/ElasticsearchMappings.java | 112 ++++++++++++++++ .../ElasticsearchMappingsTests.java | 101 +++++++++++++++ .../ml/action/TransportOpenJobAction.java | 121 +----------------- .../autodetect/AutodetectProcessManager.java | 18 ++- .../action/TransportOpenJobActionTests.java | 93 -------------- .../AutodetectProcessManagerTests.java | 1 + .../upgrades/MlMappingsUpgradeIT.java | 101 +++++++++++++++ 7 files changed, 335 insertions(+), 212 deletions(-) create mode 100644 x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index fb0db771fa581..f35e6c1029992 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -5,8 +5,23 @@ */ package org.elasticsearch.xpack.core.ml.job.persistence; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; +import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; @@ -38,10 +53,16 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; /** * Static methods to create Elasticsearch index mappings for the autodetect @@ -964,4 +985,95 @@ public static XContentBuilder auditMessageMapping() throws IOException { .endObject() .endObject(); } + + static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion, + Logger logger) throws IOException { + List indicesToUpdate = new ArrayList<>(); + + ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, + new String[] {DOC_TYPE}, MapperPlugin.NOOP_FIELD_FILTER); + + for (String index : concreteIndices) { + ImmutableOpenMap innerMap = currentMapping.get(index); + if (innerMap != null) { + MappingMetaData metaData = innerMap.get(DOC_TYPE); + try { + @SuppressWarnings("unchecked") + Map meta = (Map) metaData.sourceAsMap().get("_meta"); + if (meta != null) { + String versionString = (String) meta.get("version"); + if (versionString == null) { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + + Version mappingVersion = Version.fromString(versionString); + + if (mappingVersion.onOrAfter(minVersion)) { + continue; + } else { + logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("Version of mappings for [{}] not found, recreating", index); + indicesToUpdate.add(index); + continue; + } + } catch (Exception e) { + logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); + indicesToUpdate.add(index); + continue; + } + } else { + logger.info("No mappings found for [{}], recreating", index); + indicesToUpdate.add(index); + } + } + return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); + } + + public static void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, + Client client, Logger logger, ClusterState state, ActionListener listener) { + AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); + if (aliasOrIndex == null) { + // The index has never been created yet + listener.onResponse(true); + return; + } + String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName) + .toArray(String[]::new); + + String[] indicesThatRequireAnUpdate; + try { + indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT, logger); + } catch (IOException e) { + listener.onFailure(e); + return; + } + + if (indicesThatRequireAnUpdate.length > 0) { + try (XContentBuilder mapping = mappingSupplier.get()) { + PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); + putMappingRequest.type(DOC_TYPE); + putMappingRequest.source(mapping); + executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, + ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + listener.onResponse(true); + } else { + listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices " + + Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged")); + } + }, listener::onFailure)); + } catch (IOException e) { + listener.onFailure(e); + } + } else { + logger.trace("Mappings are up to date."); + listener.onResponse(true); + } + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java index e4ce536a3ccf6..beb47b75064a7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java @@ -9,10 +9,18 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; @@ -30,6 +38,8 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -128,6 +138,97 @@ public void testTermFieldMapping() throws IOException { assertNull(instanceMapping); } + + public void testMappingRequiresUpdateNoMapping() throws IOException { + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + ClusterState cs = csBuilder.build(); + String[] indices = new String[] { "no_index" }; + + assertArrayEquals(new String[] { "no_index" }, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateNullMapping() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); + String[] indices = new String[] { "null_index" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateNoVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); + String[] indices = new String[] { "no_version_field" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); + String[] indices = new String[] { "version_current" }; + assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData( + Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0"))); + String[] indices = new String[] { "version_nested" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); + String[] indices = new String[] { "version_bogus" }; + assertArrayEquals(indices, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); + } + + public void testMappingRequiresUpdateNewerMappingVersion() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT)); + String[] indices = new String[] { "version_newer" }; + assertArrayEquals(new String[] {}, ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion(), + logger)); + } + + public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException { + ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT)); + String[] indices = new String[] { "version_newer_minor" }; + assertArrayEquals(new String[] {}, + ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion(), logger)); + } + + + private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { + MetaData.Builder metaDataBuilder = MetaData.builder(); + + for (Map.Entry entry : namesAndVersions.entrySet()) { + + String indexName = entry.getKey(); + Object version = entry.getValue(); + + IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); + indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); + + Map mapping = new HashMap<>(); + Map properties = new HashMap<>(); + for (int i = 0; i < 10; i++) { + properties.put("field" + i, Collections.singletonMap("type", "string")); + } + mapping.put("properties", properties); + + Map meta = new HashMap<>(); + if (version != null && version.equals("NO_VERSION_FIELD") == false) { + meta.put("version", version); + } + mapping.put("_meta", meta); + + indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); + + metaDataBuilder.put(indexMetaData); + } + MetaData metaData = metaDataBuilder.build(); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + csBuilder.metaData(metaData); + return csBuilder.build(); + } + private Set collectResultsDocFieldNames() throws IOException { // Only the mappings for the results index should be added below. Do NOT add mappings for other indexes here. return collectFieldNames(ElasticsearchMappings.resultsMapping()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 2da89c359e793..79c4f56580b71 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -7,14 +7,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceAlreadyExistsException; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -23,21 +19,14 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.AliasOrIndex; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.Index; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; @@ -45,7 +34,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; -import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -69,9 +57,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; -import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -405,54 +391,6 @@ private static boolean jobHasRules(Job job) { return job.getAnalysisConfig().getDetectors().stream().anyMatch(d -> d.getRules().isEmpty() == false); } - static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion, - Logger logger) throws IOException { - List indicesToUpdate = new ArrayList<>(); - - ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, - new String[] { ElasticsearchMappings.DOC_TYPE }, MapperPlugin.NOOP_FIELD_FILTER); - - for (String index : concreteIndices) { - ImmutableOpenMap innerMap = currentMapping.get(index); - if (innerMap != null) { - MappingMetaData metaData = innerMap.get(ElasticsearchMappings.DOC_TYPE); - try { - Map meta = (Map) metaData.sourceAsMap().get("_meta"); - if (meta != null) { - String versionString = (String) meta.get("version"); - if (versionString == null) { - logger.info("Version of mappings for [{}] not found, recreating", index); - indicesToUpdate.add(index); - continue; - } - - Version mappingVersion = Version.fromString(versionString); - - if (mappingVersion.onOrAfter(minVersion)) { - continue; - } else { - logger.info("Mappings for [{}] are outdated [{}], updating it[{}].", index, mappingVersion, Version.CURRENT); - indicesToUpdate.add(index); - continue; - } - } else { - logger.info("Version of mappings for [{}] not found, recreating", index); - indicesToUpdate.add(index); - continue; - } - } catch (Exception e) { - logger.error(new ParameterizedMessage("Failed to retrieve mapping version for [{}], recreating", index), e); - indicesToUpdate.add(index); - continue; - } - } else { - logger.info("No mappings found for [{}], recreating", index); - indicesToUpdate.add(index); - } - } - return indicesToUpdate.toArray(new String[indicesToUpdate.size()]); - } - @Override protected String executor() { // This api doesn't do heavy or blocking operations (just delegates PersistentTasksService), @@ -527,25 +465,18 @@ public void onFailure(Exception e) { ); // Try adding state doc mapping - ActionListener resultsPutMappingHandler = ActionListener.wrap( + ActionListener getJobHandler = ActionListener.wrap( response -> { - addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), ElasticsearchMappings::stateMapping, - state, jobUpdateListener); + ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), + ElasticsearchMappings::stateMapping, client, logger, state, jobUpdateListener); }, listener::onFailure ); // Get the job config jobConfigProvider.getJob(jobParams.getJobId(), ActionListener.wrap( builder -> { - try { - jobParams.setJob(builder.build()); - - // Try adding results doc mapping - addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobParams.getJobId()), - ElasticsearchMappings::resultsMapping, state, resultsPutMappingHandler); - } catch (Exception e) { - listener.onFailure(e); - } + jobParams.setJob(builder.build()); + getJobHandler.onResponse(null); }, listener::onFailure )); @@ -620,48 +551,6 @@ public void onFailure(Exception e) { ); } - private void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, ClusterState state, - ActionListener listener) { - AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); - if (aliasOrIndex == null) { - // The index has never been created yet - listener.onResponse(true); - return; - } - String[] concreteIndices = aliasOrIndex.getIndices().stream().map(IndexMetaData::getIndex).map(Index::getName) - .toArray(String[]::new); - - String[] indicesThatRequireAnUpdate; - try { - indicesThatRequireAnUpdate = mappingRequiresUpdate(state, concreteIndices, Version.CURRENT, logger); - } catch (IOException e) { - listener.onFailure(e); - return; - } - - if (indicesThatRequireAnUpdate.length > 0) { - try (XContentBuilder mapping = mappingSupplier.get()) { - PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); - putMappingRequest.type(ElasticsearchMappings.DOC_TYPE); - putMappingRequest.source(mapping); - executeAsyncWithOrigin(client, ML_ORIGIN, PutMappingAction.INSTANCE, putMappingRequest, - ActionListener.wrap(response -> { - if (response.isAcknowledged()) { - listener.onResponse(true); - } else { - listener.onFailure(new ElasticsearchException("Attempt to put missing mapping in indices " - + Arrays.toString(indicesThatRequireAnUpdate) + " was not acknowledged")); - } - }, listener::onFailure)); - } catch (IOException e) { - listener.onFailure(e); - } - } else { - logger.trace("Mappings are uptodate."); - listener.onResponse(true); - } - } - public static class OpenJobPersistentTasksExecutor extends PersistentTasksExecutor { private static final Logger logger = LogManager.getLogger(OpenJobPersistentTasksExecutor.class); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 9695d73ed05c5..310e492847935 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -40,6 +40,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -417,7 +418,9 @@ public void onFailure(Exception e) { public void openJob(JobTask jobTask, ClusterState clusterState, Consumer closeHandler) { String jobId = jobTask.getJobId(); logger.info("Opening job [{}]", jobId); - AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, ActionListener.wrap( + + // Start the process + ActionListener stateAliasHandler = ActionListener.wrap( r -> { jobManager.getJob(jobId, ActionListener.wrap( job -> { @@ -427,7 +430,6 @@ public void openJob(JobTask jobTask, ClusterState clusterState, Consumer { // We need to fork, otherwise we restore model state from a network thread (several GET api calls): @@ -477,7 +479,17 @@ protected void doRun() { closeHandler )); }, - closeHandler)); + closeHandler); + + // Make sure the state index and alias exist + ActionListener resultsMappingUpdateHandler = ActionListener.wrap( + ack -> AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, stateAliasHandler), + closeHandler + ); + + // Try adding the results doc mapping - this updates to the latest version if an old mapping is present + ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), + ElasticsearchMappings::resultsMapping, client, logger, clusterState, resultsMappingUpdateHandler); } private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer handler) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 9bd32bdc9eff3..da54b33d27597 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -15,7 +15,6 @@ import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -38,7 +37,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import org.elasticsearch.xpack.core.ml.MlMetaIndex; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; @@ -53,7 +51,6 @@ import org.elasticsearch.xpack.core.ml.job.config.RuleCondition; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; -import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -61,7 +58,6 @@ import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.junit.Before; -import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; @@ -486,59 +482,6 @@ public void testVerifyIndicesPrimaryShardsAreActive() { assertEquals(indexToRemove, result.get(0)); } - public void testMappingRequiresUpdateNoMapping() throws IOException { - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - ClusterState cs = csBuilder.build(); - String[] indices = new String[] { "no_index" }; - - assertArrayEquals(new String[] { "no_index" }, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNullMapping() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("null_mapping", null)); - String[] indices = new String[] { "null_index" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNoVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("no_version_field", "NO_VERSION_FIELD")); - String[] indices = new String[] { "no_version_field" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateRecentMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_current", Version.CURRENT.toString())); - String[] indices = new String[] { "version_current" }; - assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateMaliciousMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData( - Collections.singletonMap("version_current", Collections.singletonMap("nested", "1.0"))); - String[] indices = new String[] { "version_nested" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateBogusMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_bogus", "0.0")); - String[] indices = new String[] { "version_bogus" }; - assertArrayEquals(indices, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, Version.CURRENT, logger)); - } - - public void testMappingRequiresUpdateNewerMappingVersion() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer", Version.CURRENT)); - String[] indices = new String[] { "version_newer" }; - assertArrayEquals(new String[] {}, TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousVersion(), - logger)); - } - - public void testMappingRequiresUpdateNewerMappingVersionMinor() throws IOException { - ClusterState cs = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("version_newer_minor", Version.CURRENT)); - String[] indices = new String[] { "version_newer_minor" }; - assertArrayEquals(new String[] {}, - TransportOpenJobAction.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion(), logger)); - } - public void testNodeNameAndVersion() { TransportAddress ta = new TransportAddress(InetAddress.getLoopbackAddress(), 9300); Map attributes = new HashMap<>(); @@ -641,42 +584,6 @@ private void addIndices(MetaData.Builder metaData, RoutingTable.Builder routingT } } - private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { - MetaData.Builder metaDataBuilder = MetaData.builder(); - - for (Map.Entry entry : namesAndVersions.entrySet()) { - - String indexName = entry.getKey(); - Object version = entry.getValue(); - - IndexMetaData.Builder indexMetaData = IndexMetaData.builder(indexName); - indexMetaData.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)); - - Map mapping = new HashMap<>(); - Map properties = new HashMap<>(); - for (int i = 0; i < 10; i++) { - properties.put("field" + i, Collections.singletonMap("type", "string")); - } - mapping.put("properties", properties); - - Map meta = new HashMap<>(); - if (version != null && version.equals("NO_VERSION_FIELD") == false) { - meta.put("version", version); - } - mapping.put("_meta", meta); - - indexMetaData.putMapping(new MappingMetaData(ElasticsearchMappings.DOC_TYPE, mapping)); - - metaDataBuilder.put(indexMetaData); - } - MetaData metaData = metaDataBuilder.build(); - - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); - csBuilder.metaData(metaData); - return csBuilder.build(); - } - private static Job jobWithRules(String jobId) { DetectionRule rule = new DetectionRule.Builder(Collections.singletonList( new RuleCondition(RuleCondition.AppliesTo.TYPICAL, Operator.LT, 100.0) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 9024d0edcee9c..ba319f1a90781 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -141,6 +141,7 @@ public void setup() throws Exception { when(metaData.getAliasAndIndexLookup()).thenReturn(aliasOrIndexSortedMap); clusterState = mock(ClusterState.class); when(clusterState.getMetaData()).thenReturn(metaData); + when(clusterState.metaData()).thenReturn(metaData); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java new file mode 100644 index 0000000000000..5602f14ef2267 --- /dev/null +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMappingsUpgradeIT.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.upgrades; + +import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ml.job.config.AnalysisConfig; +import org.elasticsearch.client.ml.job.config.DataDescription; +import org.elasticsearch.client.ml.job.config.Detector; +import org.elasticsearch.client.ml.job.config.Job; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MlMappingsUpgradeIT extends AbstractUpgradeTestCase { + + private static final String JOB_ID = "ml-mappings-upgrade-job"; + + @Override + protected Collection templatesToWaitFor() { + return Stream.concat(XPackRestTestHelper.ML_POST_V660_TEMPLATES.stream(), + super.templatesToWaitFor().stream()).collect(Collectors.toSet()); + } + + /** + * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results + * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade + */ + public void testMappingsUpgrade() throws Exception { + + switch (CLUSTER_TYPE) { + case OLD: + createAndOpenTestJob(); + break; + case MIXED: + // We don't know whether the job is on an old or upgraded node, so cannot assert that the mappings have been upgraded + break; + case UPGRADED: + assertUpgradedMappings(); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + } + + private void createAndOpenTestJob() throws IOException { + + Detector.Builder d = new Detector.Builder("metric", "responsetime"); + d.setByFieldName("airline"); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(TimeValue.timeValueMinutes(10)); + Job.Builder job = new Job.Builder(JOB_ID); + job.setAnalysisConfig(analysisConfig); + job.setDataDescription(new DataDescription.Builder()); + + Request putJob = new Request("PUT", "_ml/anomaly_detectors/" + JOB_ID); + putJob.setJsonEntity(Strings.toString(job.build())); + Response response = client().performRequest(putJob); + assertEquals(200, response.getStatusLine().getStatusCode()); + + Request openJob = new Request("POST", "_ml/anomaly_detectors/" + JOB_ID + "/_open"); + response = client().performRequest(openJob); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + @SuppressWarnings("unchecked") + private void assertUpgradedMappings() throws Exception { + + assertBusy(() -> { + Request getMappings = new Request("GET", AnomalyDetectorsIndex.resultsWriteAlias(JOB_ID) + "/_mappings"); + Response response = client().performRequest(getMappings); + + Map responseLevel = entityAsMap(response); + assertNotNull(responseLevel); + Map indexLevel = (Map) responseLevel.get(".ml-anomalies-shared"); + assertNotNull(indexLevel); + Map mappingsLevel = (Map) indexLevel.get("mappings"); + assertNotNull(mappingsLevel); + Map metaLevel = (Map) mappingsLevel.get("_meta"); + assertEquals(Collections.singletonMap("version", Version.CURRENT.toString()), metaLevel); + Map propertiesLevel = (Map) mappingsLevel.get("properties"); + assertNotNull(propertiesLevel); + // TODO: as the years go by, the field we assert on here should be changed + // to the most recent field we've added that is NOT of type "keyword" + Map fieldLevel = (Map) propertiesLevel.get("multi_bucket_impact"); + assertEquals(Collections.singletonMap("type", "double"), fieldLevel); + }); + } +} From 2ad255845db9962537b5161e31522605325c9a8b Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 22 Jan 2019 16:07:01 +0000 Subject: [PATCH 2/3] Uses ElasticsearchMappings's own logger --- .../job/persistence/ElasticsearchMappings.java | 10 ++++++---- .../persistence/ElasticsearchMappingsTests.java | 17 ++++++++--------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index f35e6c1029992..0eb2e666916dc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.job.persistence; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; @@ -128,6 +129,8 @@ public class ElasticsearchMappings { static final String RAW = "raw"; + private static final Logger logger = LogManager.getLogger(ElasticsearchMappings.class); + private ElasticsearchMappings() { } @@ -986,8 +989,7 @@ public static XContentBuilder auditMessageMapping() throws IOException { .endObject(); } - static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion, - Logger logger) throws IOException { + static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndices, Version minVersion) throws IOException { List indicesToUpdate = new ArrayList<>(); ImmutableOpenMap> currentMapping = state.metaData().findMappings(concreteIndices, @@ -1036,7 +1038,7 @@ static String[] mappingRequiresUpdate(ClusterState state, String[] concreteIndic } public static void addDocMappingIfMissing(String alias, CheckedSupplier mappingSupplier, - Client client, Logger logger, ClusterState state, ActionListener listener) { + Client client, ClusterState state, ActionListener listener) { AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); if (aliasOrIndex == null) { // The index has never been created yet @@ -1048,7 +1050,7 @@ public static void addDocMappingIfMissing(String alias, CheckedSupplier Date: Tue, 22 Jan 2019 16:30:39 +0000 Subject: [PATCH 3/3] Fix compilation --- .../elasticsearch/xpack/ml/action/TransportOpenJobAction.java | 2 +- .../ml/job/process/autodetect/AutodetectProcessManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 79c4f56580b71..a5aed9b5b5957 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -468,7 +468,7 @@ public void onFailure(Exception e) { ActionListener getJobHandler = ActionListener.wrap( response -> { ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), - ElasticsearchMappings::stateMapping, client, logger, state, jobUpdateListener); + ElasticsearchMappings::stateMapping, client, state, jobUpdateListener); }, listener::onFailure ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 310e492847935..dd3656ee04b67 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -489,7 +489,7 @@ protected void doRun() { // Try adding the results doc mapping - this updates to the latest version if an old mapping is present ElasticsearchMappings.addDocMappingIfMissing(AnomalyDetectorsIndex.jobResultsAliasedName(jobId), - ElasticsearchMappings::resultsMapping, client, logger, clusterState, resultsMappingUpdateHandler); + ElasticsearchMappings::resultsMapping, client, clusterState, resultsMappingUpdateHandler); } private void createProcessAndSetRunning(ProcessContext processContext, Job job, AutodetectParams params, Consumer handler) {