From 73587b3f763ffd7e57349d7c09c8b1f1f305a32c Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 9 Oct 2019 13:32:59 +0200 Subject: [PATCH 01/10] rename audit index and create an alias for accessing it --- .../TransformInternalIndexConstants.java | 3 +- .../xpack/transform/Transform.java | 3 +- .../TransformClusterStateListener.java | 89 +++++++++++++++++++ .../persistence/TransformInternalIndex.java | 2 + 4 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java index f0828c281cedf..f27acd5c42b79 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java @@ -29,9 +29,10 @@ public final class TransformInternalIndexConstants { public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*"; // audit index - public static final String AUDIT_TEMPLATE_VERSION = "1"; + public static final String AUDIT_TEMPLATE_VERSION = "000001"; public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-"; public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*"; + public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1"; public static final String AUDIT_INDEX_PATTERN_DEPRECATED = ".data-frame-notifications-*"; public static final String AUDIT_INDEX_READ_ALIAS = ".transform-notifications-read"; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 8c1e4fdf9348b..e7d8dbe4a60e5 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -222,7 +222,8 @@ public Collection createComponents(Client client, ClusterService cluster transformConfigManager.get(), transformAuditor.get())); - return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get()); + return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get(), + new TransformClusterStateListener(clusterService, client)); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java new file mode 100644 index 0000000000000..bfc89405d6cf6 --- /dev/null +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + +class TransformClusterStateListener implements ClusterStateListener { + + private static final Logger logger = LogManager.getLogger(TransformClusterStateListener.class); + + private final Client client; + private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false); + + TransformClusterStateListener(ClusterService clusterService, Client client) { + this.client = client; + clusterService.addListener(this); + logger.debug("Created TransformClusterStateListener"); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // Wait until the gateway has recovered from disk. + return; + } + + // The atomic flag prevents multiple simultaneous attempts to run alias creation + // if there is a flurry of cluster state updates in quick succession + if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) { + createAuditAliasForDataFrameBWC(event.state(), client, ActionListener.wrap( + r -> { + isIndexCreationInProgress.set(false); + if (r) { + logger.info("Created alias for deprecated data frame notifications index"); + } else { + logger.debug("Skipped creating alias for deprecated data frame notifications index"); + } + }, + e -> { + isIndexCreationInProgress.set(false); + logger.error("Error creating alias for deprecated data frame notifications index", e); + })); + } + } + + private static void createAuditAliasForDataFrameBWC(ClusterState state, Client client, final ActionListener finalListener) { + + // check if alias already exists + if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)) { + finalListener.onResponse(false); + return; + } + + // check if old audit index exists, no need to create the alias if it does not + if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED)) { + finalListener.onResponse(false); + return; + } + + final IndicesAliasesRequest request = client.admin().indices().prepareAliases() + .addAlias(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS) + .request(); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request, + ActionListener.wrap(r -> finalListener.onResponse(r.isAcknowledged()), finalListener::onFailure), + client.admin().indices()::aliases); + } + +} diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java index 2d6437ede18d9..934c2508de3f2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.service.ClusterService; @@ -93,6 +94,7 @@ public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOExc .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")) .putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings())) + .putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)) .build(); return transformTemplate; } From 00da79b38c8283b7009a1e042d3b6ce3ee14062a Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 9 Oct 2019 14:39:53 +0200 Subject: [PATCH 02/10] rename internal index and search on old name pattern --- .../TransformInternalIndexConstants.java | 7 ++-- .../TransformInfoTransportAction.java | 5 ++- .../TransformUsageTransportAction.java | 8 +++-- .../action/TransportGetTransformAction.java | 3 +- .../persistence/TransformConfigManager.java | 33 ++++++++++++++----- 5 files changed, 40 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java index f27acd5c42b79..cb1d41d5ea21c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java @@ -22,11 +22,14 @@ public final class TransformInternalIndexConstants { */ // internal index - public static final String INDEX_VERSION = "2"; - public static final String INDEX_PATTERN = ".data-frame-internal-"; + + // version is not a rollover pattern, however padded because sort is string based + public static final String INDEX_VERSION = "003"; + public static final String INDEX_PATTERN = ".transform-internal-"; public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION; public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME; public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*"; + public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*"; // audit index public static final String AUDIT_TEMPLATE_VERSION = "000001"; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformInfoTransportAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformInfoTransportAction.java index 9250ea441f48d..b64ec7a70da3f 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformInfoTransportAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformInfoTransportAction.java @@ -110,7 +110,10 @@ static void getStatisticSummations(Client client, ActionListener listener) { - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN) + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery() .mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME)) .filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId))))) @@ -195,7 +197,8 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener< * @param listener listener to alert on completion */ public void deleteOldTransformStoredDocuments(String transformId, ActionListener listener) { - DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN) + DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery() .mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME)) .filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId))))) @@ -261,7 +264,9 @@ private void putTransformConfiguration(TransformConfig transformConfig, */ public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener resultListener) { QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformCheckpoint.documentId(transformId, checkpoint)); - SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN) + SearchRequest searchRequest = client + .prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) .setQuery(queryBuilder) // use sort to get the last .addSort("_index", SortOrder.DESC) @@ -290,7 +295,9 @@ public void getTransformCheckpoint(String transformId, long checkpoint, ActionLi */ public void getTransformConfiguration(String transformId, ActionListener resultListener) { QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)); - SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN) + SearchRequest searchRequest = client + .prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) .setQuery(queryBuilder) // use sort to get the last .addSort("_index", SortOrder.DESC) @@ -321,7 +328,9 @@ public void getTransformConfigurationForUpdate(String transformId, ActionListener> configAndVersionListener) { QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)); - SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN) + SearchRequest searchRequest = client + .prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) .setQuery(queryBuilder) // use sort to get the last .addSort("_index", SortOrder.DESC) @@ -362,7 +371,9 @@ public void expandTransformIds(String transformIdsExpression, String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression); QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, TransformConfig.NAME); - SearchRequest request = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN) + SearchRequest request = client + .prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) .addSort(TransformField.ID.getPreferredName(), SortOrder.ASC) .setFrom(pageParams.getFrom()) .setTrackTotalHits(true) @@ -413,7 +424,7 @@ public void deleteTransform(String transformId, ActionListener listener DeleteByQueryRequest request = new DeleteByQueryRequest() .setAbortOnVersionConflict(false); //since these documents are not updated, a conflict just means it was deleted previously - request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN); + request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED); QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId); request.setQuery(query); request.setRefresh(true); @@ -472,7 +483,9 @@ public void putOrUpdateTransformStoredDoc(TransformStoredDoc stats, public void getTransformStoredDoc(String transformId, ActionListener> resultListener) { QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)); - SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN) + SearchRequest searchRequest = client + .prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) .setQuery(queryBuilder) // use sort to get the last .addSort("_index", SortOrder.DESC) @@ -508,7 +521,9 @@ public void getTransformStoredDoc(Collection transformIds, ActionListene .filter(QueryBuilders.termsQuery(TransformField.ID.getPreferredName(), transformIds)) .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformStoredDoc.NAME))); - SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN) + SearchRequest searchRequest = client + .prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED) .addSort(TransformField.ID.getPreferredName(), SortOrder.ASC) .addSort("_index", SortOrder.DESC) .setQuery(builder) From 124d3eedbffbf63994a99ec3b1e072f7452b9cbc Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 10 Oct 2019 10:18:23 +0200 Subject: [PATCH 03/10] use old and new indexes in rolling upgrade IT --- .../upgrades/TransformSurvivesUpgradeIT.java | 38 +++++++++++-------- .../test/rest/XPackRestTestConstants.java | 1 + 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TransformSurvivesUpgradeIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TransformSurvivesUpgradeIT.java index 7ada6e80daa2f..82ba1830ef853 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TransformSurvivesUpgradeIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/TransformSurvivesUpgradeIT.java @@ -55,6 +55,7 @@ import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED; import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_NOTIFICATIONS_INDEX_PREFIX; import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_NOTIFICATIONS_INDEX_PREFIX_DEPRECATED; +import static org.elasticsearch.xpack.test.rest.XPackRestTestConstants.TRANSFORM_TASK_NAME; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -64,8 +65,8 @@ public class TransformSurvivesUpgradeIT extends AbstractUpgradeTestCase { private static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version")); - private static final String DATAFRAME_ENDPOINT = "/_transform/"; - private static final String DATAFRAME_ENDPOINT_DEPRECATED = "/_data_frame/transforms/"; + private static final String TRANSFORM_ENDPOINT = "/_transform/"; + private static final String TRANSFORM_ENDPOINT_DEPRECATED = "/_data_frame/transforms/"; private static final String CONTINUOUS_TRANSFORM_ID = "continuous-transform-upgrade-job"; private static final String CONTINUOUS_TRANSFORM_SOURCE = "transform-upgrade-continuous-source"; private static final List ENTITIES = Stream.iterate(1, n -> n + 1) @@ -106,8 +107,8 @@ public void waitForTemplates() throws Exception { }); } - protected static void waitForPendingDataFrameTasks() throws Exception { - waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("data_frame/transforms") == false); + protected static void waitForPendingTransformTasks() throws Exception { + waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(TRANSFORM_TASK_NAME) == false); } @Override @@ -122,8 +123,8 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE * The purpose of this test is to ensure that when a job is open through a rolling upgrade we upgrade the results * index mappings when it is assigned to an upgraded node even if no other ML endpoint is called after the upgrade */ - public void testDataFramesRollingUpgrade() throws Exception { - assumeTrue("Continuous data frames not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0)); + public void testTransformRollingUpgrade() throws Exception { + assumeTrue("Continuous transform not supported until 7.3", UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_3_0)); Request adjustLoggingLevels = new Request("PUT", "/_cluster/settings"); adjustLoggingLevels.setJsonEntity( "{\"transient\": {" + @@ -138,7 +139,7 @@ public void testDataFramesRollingUpgrade() throws Exception { switch (CLUSTER_TYPE) { case OLD: client().performRequest(waitForYellow); - createAndStartContinuousDataFrame(); + createAndStartContinuousTransform(); break; case MIXED: client().performRequest(waitForYellow); @@ -146,11 +147,11 @@ public void testDataFramesRollingUpgrade() throws Exception { if (Booleans.parseBoolean(System.getProperty("tests.first_round")) == false) { lastCheckpoint = 2; } - verifyContinuousDataFrameHandlesData(lastCheckpoint); + verifyContinuousTransformHandlesData(lastCheckpoint); break; case UPGRADED: client().performRequest(waitForYellow); - verifyContinuousDataFrameHandlesData(3); + verifyContinuousTransformHandlesData(3); cleanUpTransforms(); break; default: @@ -161,10 +162,10 @@ public void testDataFramesRollingUpgrade() throws Exception { private void cleanUpTransforms() throws Exception { stopTransform(CONTINUOUS_TRANSFORM_ID); deleteTransform(CONTINUOUS_TRANSFORM_ID); - waitForPendingDataFrameTasks(); + waitForPendingTransformTasks(); } - private void createAndStartContinuousDataFrame() throws Exception { + private void createAndStartContinuousTransform() throws Exception { createIndex(CONTINUOUS_TRANSFORM_SOURCE); long totalDocsWrittenSum = 0; for (TimeValue bucket : BUCKETS) { @@ -204,9 +205,9 @@ private void createAndStartContinuousDataFrame() throws Exception { } @SuppressWarnings("unchecked") - private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) throws Exception { + private void verifyContinuousTransformHandlesData(long expectedLastCheckpoint) throws Exception { - // A continuous data frame should automatically become started when it gets assigned to a node + // A continuous transform should automatically become started when it gets assigned to a node // if it was assigned to the node that was removed from the cluster assertBusy(() -> { TransformStats stateAndStats = getTransformStats(CONTINUOUS_TRANSFORM_ID); @@ -250,7 +251,11 @@ private void verifyContinuousDataFrameHandlesData(long expectedLastCheckpoint) t } private void awaitWrittenIndexerState(String id, Consumer> responseAssertion) throws Exception { - Request getStatsDocsRequest = new Request("GET", ".data-frame-internal-*/_search"); + Request getStatsDocsRequest = new Request("GET", + TRANSFORM_INTERNAL_INDEX_PREFIX + "*," + + TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED + "*" + + "/_search"); + getStatsDocsRequest.setJsonEntity("{\n" + " \"query\": {\n" + " \"bool\": {\n" + @@ -271,7 +276,8 @@ private void awaitWrittenIndexerState(String id, Consumer> responseAss "}"); assertBusy(() -> { // Want to make sure we get the latest docs - client().performRequest(new Request("POST", ".data-frame-internal-*/_refresh")); + client().performRequest(new Request("POST", TRANSFORM_INTERNAL_INDEX_PREFIX + "*/_refresh")); + client().performRequest(new Request("POST", TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED + "*/_refresh")); Response response = client().performRequest(getStatsDocsRequest); assertEquals(200, response.getStatusLine().getStatusCode()); Map responseBody = entityAsMap(response); @@ -290,7 +296,7 @@ private void awaitWrittenIndexerState(String id, String indexerState) throws Exc } private String getTransformEndpoint() { - return CLUSTER_TYPE == ClusterType.UPGRADED ? DATAFRAME_ENDPOINT : DATAFRAME_ENDPOINT_DEPRECATED; + return CLUSTER_TYPE == ClusterType.UPGRADED ? TRANSFORM_ENDPOINT : TRANSFORM_ENDPOINT_DEPRECATED; } private void putTransform(String id, TransformConfig config) throws IOException { diff --git a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java index 3aabcb22d6caa..64fa7ceb2f313 100644 --- a/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java +++ b/x-pack/qa/src/main/java/org/elasticsearch/xpack/test/rest/XPackRestTestConstants.java @@ -34,6 +34,7 @@ public final class XPackRestTestConstants { CONFIG_INDEX); // Transform constants: + public static final String TRANSFORM_TASK_NAME = "data_frame/transforms"; public static final String TRANSFORM_INTERNAL_INDEX_PREFIX = ".transform-internal-"; public static final String TRANSFORM_NOTIFICATIONS_INDEX_PREFIX = ".transform-notifications-"; public static final String TRANSFORM_INTERNAL_INDEX_PREFIX_DEPRECATED = ".data-frame-internal-"; From bde0d3ef1d2a8ff4728b6a16e11025bc90890987 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 10 Oct 2019 10:47:42 +0200 Subject: [PATCH 04/10] update index name --- .../test/upgraded_cluster/80_data_frame_jobs_crud.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index 9f21fda1c0fad..6e0b3eb69a5b5 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -271,6 +271,6 @@ setup: - do: indices.get_mapping: - index: .data-frame-internal-2 - - match: { \.data-frame-internal-2.mappings.dynamic: "false" } - - match: { \.data-frame-internal-2.mappings.properties.id.type: "keyword" } + index: .transform-internal-003 + - match: { \.transform-internal-003.mappings.dynamic: "false" } + - match: { \.transform-internal-003.mappings.properties.id.type: "keyword" } From 517fae77f69ac0c4c204d0a595774e635733f1b6 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 10 Oct 2019 11:02:19 +0200 Subject: [PATCH 05/10] improve doc strings --- .../xpack/transform/persistence/TransformConfigManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java index dbbb53427b472..16ff1c2bc60d7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java @@ -73,7 +73,7 @@ * * Versioned Index: * - * We wrap several indexes under 1 pattern: ".data-frame-internal-1", ".data-frame-internal-2", ".data-frame-internal-n" while + * We wrap several indexes under 1 pattern: ".data-frame-internal-1", ".data-frame-internal-2", ".transform-internal-n" while * n is the _current_ version of the index. * * - all gets/reads and dbq as well are searches on all indexes, while last-one-wins, so the result with the highest version is uses @@ -288,7 +288,7 @@ public void getTransformCheckpoint(String transformId, long checkpoint, ActionLi /** * Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET - * data_frame/transforms, see the @link{TransportGetTransformAction} + * _transform, see the @link{TransportGetTransformAction} * * @param transformId the transform id * @param resultListener listener to call after inner request has returned @@ -319,7 +319,7 @@ public void getTransformConfiguration(String transformId, ActionListener Date: Thu, 10 Oct 2019 12:49:32 +0200 Subject: [PATCH 06/10] fix alias creation for old notification index --- .../xpack/transform/TransformClusterStateListener.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java index bfc89405d6cf6..8df8cf8d2644e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformClusterStateListener.java @@ -65,14 +65,14 @@ public void clusterChanged(ClusterChangedEvent event) { private static void createAuditAliasForDataFrameBWC(ClusterState state, Client client, final ActionListener finalListener) { - // check if alias already exists - if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)) { + // check if old audit index exists, no need to create the alias if it does not + if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED) == false) { finalListener.onResponse(false); return; } - // check if old audit index exists, no need to create the alias if it does not - if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED)) { + if (state.getMetaData().getAliasAndIndexLookup().get(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED).getIndices().stream() + .anyMatch(metaData -> metaData.getAliases().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))) { finalListener.onResponse(false); return; } From c5862fdf96027875fec02e77cd06f87a684cb03b Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 10 Oct 2019 14:42:47 +0200 Subject: [PATCH 07/10] use new role for auditor IT --- .../xpack/transform/integration/TransformAuditorIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java index 01abbbae7d75d..8ac9f4922a50d 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java @@ -48,7 +48,7 @@ public void createIndexes() throws IOException { createReviewsIndex(); indicesCreated = true; setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); - setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE)); + setupUser(TEST_USER_NAME, Arrays.asList("transform_admin", DATA_ACCESS_ROLE)); } @SuppressWarnings("unchecked") From 986ac275d7580ac7babb9a0f31220b9419f337db Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 11 Oct 2019 11:02:07 +0200 Subject: [PATCH 08/10] improve code doc --- .../xpack/transform/persistence/TransformConfigManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java index 16ff1c2bc60d7..8aa25fb053e22 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java @@ -73,8 +73,8 @@ * * Versioned Index: * - * We wrap several indexes under 1 pattern: ".data-frame-internal-1", ".data-frame-internal-2", ".transform-internal-n" while - * n is the _current_ version of the index. + * We wrap several indexes under 1 pattern: ".transform-internal-001", ".transform-internal-002", ".transform-internal-n" while + * n is the _current_ version of the index. For BWC we also search in ".data-frame-internal-1", ".data-frame-internal-2" * * - all gets/reads and dbq as well are searches on all indexes, while last-one-wins, so the result with the highest version is uses * - all puts and updates go into the _current_ version of the index, in case of updates this can leave dups behind From 3419d5543afefd33c5adafe157d647f7cc1255da Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 11 Oct 2019 11:02:38 +0200 Subject: [PATCH 09/10] use proper pattern --- .../xpack/transform/integration/TransformInternalIndexIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformInternalIndexIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformInternalIndexIT.java index 9c76c0baf98a2..1e29db2c88425 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformInternalIndexIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformInternalIndexIT.java @@ -43,7 +43,7 @@ public class TransformInternalIndexIT extends ESRestTestCase { private static final String CURRENT_INDEX = TransformInternalIndexConstants.LATEST_INDEX_NAME; - private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "1"; + private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001"; public void testUpdateDeletesOldTransformConfig() throws Exception { From 3645d5db70d15851a32b3726475957f32f8cdf6d Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 11 Oct 2019 11:02:58 +0200 Subject: [PATCH 10/10] add an integration test for testing alias creation --- .../transform/integration/TransformAuditorIT.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java index 8ac9f4922a50d..43121a1478035 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.transform.integration; import org.elasticsearch.client.Request; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.junit.Before; @@ -67,6 +69,7 @@ public void testAuditorWritesAudits() throws Exception { request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simple_pivot_for_audit\"}}}"); assertBusy(() -> { assertTrue(indexExists(TransformInternalIndexConstants.AUDIT_INDEX)); + assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)); }); // Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start, // finished the job (as this is a very short DF job), all without the audit being fully written. @@ -85,4 +88,16 @@ public void testAuditorWritesAudits() throws Exception { }); } + + public void testAliasCreatedforBWCIndexes() throws Exception { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0); + + createIndex(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, settings.build()); + assertBusy(() -> { + assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, + TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)); + }); + } }