Skip to content

Commit 3da91d5

Browse files
author
Hendrik Muhs
authored
[Transform] Rename internal indexes for transform plugin (#47788) (#47900)
rename internal indexes of transform plugin - rename audit index and create an alias for accessing it, BWC: add an alias for old indexes to keep them working, kibana UI will switch to use the read alias - rename config index and provide BWC to read from old and new ones
1 parent 5dd6bd6 commit 3da91d5

File tree

12 files changed

+177
-40
lines changed

12 files changed

+177
-40
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,20 @@ public final class TransformInternalIndexConstants {
2222
*/
2323

2424
// internal index
25-
public static final String INDEX_VERSION = "2";
26-
public static final String INDEX_PATTERN = ".data-frame-internal-";
25+
26+
// version is not a rollover pattern, however padded because sort is string based
27+
public static final String INDEX_VERSION = "003";
28+
public static final String INDEX_PATTERN = ".transform-internal-";
2729
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
2830
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;
2931
public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*";
32+
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";
3033

3134
// audit index
32-
public static final String AUDIT_TEMPLATE_VERSION = "1";
35+
public static final String AUDIT_TEMPLATE_VERSION = "000001";
3336
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
3437
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
38+
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";
3539
public static final String AUDIT_INDEX_PATTERN_DEPRECATED = ".data-frame-notifications-*";
3640

3741
public static final String AUDIT_INDEX_READ_ALIAS = ".transform-notifications-read";

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformAuditorIT.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
package org.elasticsearch.xpack.transform.integration;
88

99
import org.elasticsearch.client.Request;
10+
import org.elasticsearch.cluster.metadata.IndexMetaData;
11+
import org.elasticsearch.common.settings.Settings;
1012
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
1113
import org.junit.Before;
1214

@@ -48,7 +50,7 @@ public void createIndexes() throws IOException {
4850
createReviewsIndex();
4951
indicesCreated = true;
5052
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
51-
setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE));
53+
setupUser(TEST_USER_NAME, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));
5254
}
5355

5456
@SuppressWarnings("unchecked")
@@ -67,6 +69,7 @@ public void testAuditorWritesAudits() throws Exception {
6769
request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simple_pivot_for_audit\"}}}");
6870
assertBusy(() -> {
6971
assertTrue(indexExists(TransformInternalIndexConstants.AUDIT_INDEX));
72+
assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS));
7073
});
7174
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
7275
// finished the job (as this is a very short DF job), all without the audit being fully written.
@@ -85,4 +88,16 @@ public void testAuditorWritesAudits() throws Exception {
8588
});
8689

8790
}
91+
92+
public void testAliasCreatedforBWCIndexes() throws Exception {
93+
Settings.Builder settings = Settings.builder()
94+
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
95+
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
96+
97+
createIndex(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, settings.build());
98+
assertBusy(() -> {
99+
assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED,
100+
TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS));
101+
});
102+
}
88103
}

x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformInternalIndexIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class TransformInternalIndexIT extends ESRestTestCase {
4343

4444

4545
private static final String CURRENT_INDEX = TransformInternalIndexConstants.LATEST_INDEX_NAME;
46-
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "1";
46+
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001";
4747

4848

4949
public void testUpdateDeletesOldTransformConfig() throws Exception {

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
232232
transformConfigManager.get(),
233233
transformAuditor.get()));
234234

235-
return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get());
235+
return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get(),
236+
new TransformClusterStateListener(clusterService, client));
236237
}
237238

238239
@Override
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.client.Client;
15+
import org.elasticsearch.cluster.ClusterChangedEvent;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.ClusterStateListener;
18+
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.gateway.GatewayService;
20+
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
21+
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
25+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
26+
27+
class TransformClusterStateListener implements ClusterStateListener {
28+
29+
private static final Logger logger = LogManager.getLogger(TransformClusterStateListener.class);
30+
31+
private final Client client;
32+
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
33+
34+
TransformClusterStateListener(ClusterService clusterService, Client client) {
35+
this.client = client;
36+
clusterService.addListener(this);
37+
logger.debug("Created TransformClusterStateListener");
38+
}
39+
40+
@Override
41+
public void clusterChanged(ClusterChangedEvent event) {
42+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
43+
// Wait until the gateway has recovered from disk.
44+
return;
45+
}
46+
47+
// The atomic flag prevents multiple simultaneous attempts to run alias creation
48+
// if there is a flurry of cluster state updates in quick succession
49+
if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) {
50+
createAuditAliasForDataFrameBWC(event.state(), client, ActionListener.wrap(
51+
r -> {
52+
isIndexCreationInProgress.set(false);
53+
if (r) {
54+
logger.info("Created alias for deprecated data frame notifications index");
55+
} else {
56+
logger.debug("Skipped creating alias for deprecated data frame notifications index");
57+
}
58+
},
59+
e -> {
60+
isIndexCreationInProgress.set(false);
61+
logger.error("Error creating alias for deprecated data frame notifications index", e);
62+
}));
63+
}
64+
}
65+
66+
private static void createAuditAliasForDataFrameBWC(ClusterState state, Client client, final ActionListener<Boolean> finalListener) {
67+
68+
// check if old audit index exists, no need to create the alias if it does not
69+
if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED) == false) {
70+
finalListener.onResponse(false);
71+
return;
72+
}
73+
74+
if (state.getMetaData().getAliasAndIndexLookup().get(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED).getIndices().stream()
75+
.anyMatch(metaData -> metaData.getAliases().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))) {
76+
finalListener.onResponse(false);
77+
return;
78+
}
79+
80+
final IndicesAliasesRequest request = client.admin().indices().prepareAliases()
81+
.addAlias(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)
82+
.request();
83+
84+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
85+
ActionListener.<AcknowledgedResponse>wrap(r -> finalListener.onResponse(r.isAcknowledged()), finalListener::onFailure),
86+
client.admin().indices()::aliases);
87+
}
88+
89+
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformFeatureSet.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,10 @@ static void getStatisticSummations(Client client, ActionListener<TransformIndexe
196196
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(),
197197
TransformStoredDoc.NAME)));
198198

199-
SearchRequestBuilder requestBuilder = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
199+
SearchRequestBuilder requestBuilder = client
200+
.prepareSearch(
201+
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
202+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
200203
.setSize(0)
201204
.setQuery(queryBuilder);
202205

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ protected ParseField getResultsField() {
6363

6464
@Override
6565
protected String[] getIndices() {
66-
return new String[]{TransformInternalIndexConstants.INDEX_NAME_PATTERN};
66+
return new String[] { TransformInternalIndexConstants.INDEX_NAME_PATTERN,
67+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED };
6768
}
6869

6970
@Override

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformConfigManager.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@
7373
*
7474
* Versioned Index:
7575
*
76-
* We wrap several indexes under 1 pattern: ".data-frame-internal-1", ".data-frame-internal-2", ".data-frame-internal-n" while
77-
* n is the _current_ version of the index.
76+
* We wrap several indexes under 1 pattern: ".transform-internal-001", ".transform-internal-002", ".transform-internal-n" while
77+
* n is the _current_ version of the index. For BWC we also search in ".data-frame-internal-1", ".data-frame-internal-2"
7878
*
7979
* - all gets/reads and dbq as well are searches on all indexes, while last-one-wins, so the result with the highest version is uses
8080
* - all puts and updates go into the _current_ version of the index, in case of updates this can leave dups behind
@@ -168,7 +168,9 @@ public void updateTransformConfiguration(TransformConfig transformConfig,
168168
* @param listener listener to alert on completion
169169
*/
170170
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
171-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
171+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
172+
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
173+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
172174
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
173175
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
174176
.filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)))))
@@ -195,7 +197,8 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener<
195197
* @param listener listener to alert on completion
196198
*/
197199
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
198-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
200+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
201+
TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
199202
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
200203
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
201204
.filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))))
@@ -261,7 +264,9 @@ private void putTransformConfiguration(TransformConfig transformConfig,
261264
*/
262265
public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<TransformCheckpoint> resultListener) {
263266
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformCheckpoint.documentId(transformId, checkpoint));
264-
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
267+
SearchRequest searchRequest = client
268+
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
269+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
265270
.setQuery(queryBuilder)
266271
// use sort to get the last
267272
.addSort("_index", SortOrder.DESC)
@@ -283,14 +288,16 @@ public void getTransformCheckpoint(String transformId, long checkpoint, ActionLi
283288

284289
/**
285290
* Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET
286-
* data_frame/transforms, see the @link{TransportGetTransformAction}
291+
* _transform, see the @link{TransportGetTransformAction}
287292
*
288293
* @param transformId the transform id
289294
* @param resultListener listener to call after inner request has returned
290295
*/
291296
public void getTransformConfiguration(String transformId, ActionListener<TransformConfig> resultListener) {
292297
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId));
293-
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
298+
SearchRequest searchRequest = client
299+
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
300+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
294301
.setQuery(queryBuilder)
295302
// use sort to get the last
296303
.addSort("_index", SortOrder.DESC)
@@ -312,7 +319,7 @@ public void getTransformConfiguration(String transformId, ActionListener<Transfo
312319

313320
/**
314321
* Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET
315-
* data_frame/transforms, see the @link{TransportGetTransformAction}
322+
* _transform, see the @link{TransportGetTransformAction}
316323
*
317324
* @param transformId the transform id
318325
* @param configAndVersionListener listener to call after inner request has returned
@@ -321,7 +328,9 @@ public void getTransformConfigurationForUpdate(String transformId,
321328
ActionListener<Tuple<TransformConfig,
322329
SeqNoPrimaryTermAndIndex>> configAndVersionListener) {
323330
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId));
324-
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
331+
SearchRequest searchRequest = client
332+
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
333+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
325334
.setQuery(queryBuilder)
326335
// use sort to get the last
327336
.addSort("_index", SortOrder.DESC)
@@ -362,7 +371,9 @@ public void expandTransformIds(String transformIdsExpression,
362371
String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression);
363372
QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, TransformConfig.NAME);
364373

365-
SearchRequest request = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
374+
SearchRequest request = client
375+
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
376+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
366377
.addSort(TransformField.ID.getPreferredName(), SortOrder.ASC)
367378
.setFrom(pageParams.getFrom())
368379
.setTrackTotalHits(true)
@@ -413,7 +424,7 @@ public void deleteTransform(String transformId, ActionListener<Boolean> listener
413424
DeleteByQueryRequest request = new DeleteByQueryRequest()
414425
.setAbortOnVersionConflict(false); //since these documents are not updated, a conflict just means it was deleted previously
415426

416-
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN);
427+
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
417428
QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
418429
request.setQuery(query);
419430
request.setRefresh(true);
@@ -472,7 +483,9 @@ public void putOrUpdateTransformStoredDoc(TransformStoredDoc stats,
472483
public void getTransformStoredDoc(String transformId,
473484
ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> resultListener) {
474485
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId));
475-
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
486+
SearchRequest searchRequest = client
487+
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
488+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
476489
.setQuery(queryBuilder)
477490
// use sort to get the last
478491
.addSort("_index", SortOrder.DESC)
@@ -508,7 +521,9 @@ public void getTransformStoredDoc(Collection<String> transformIds, ActionListene
508521
.filter(QueryBuilders.termsQuery(TransformField.ID.getPreferredName(), transformIds))
509522
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformStoredDoc.NAME)));
510523

511-
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
524+
SearchRequest searchRequest = client
525+
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
526+
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
512527
.addSort(TransformField.ID.getPreferredName(), SortOrder.ASC)
513528
.addSort("_index", SortOrder.DESC)
514529
.setQuery(builder)

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1313
import org.elasticsearch.client.Client;
1414
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.AliasMetaData;
1516
import org.elasticsearch.cluster.metadata.IndexMetaData;
1617
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
1718
import org.elasticsearch.cluster.service.ClusterService;
@@ -94,6 +95,7 @@ public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOExc
9495
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
9596
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
9697
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
98+
.putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))
9799
.build();
98100
return transformTemplate;
99101
}

0 commit comments

Comments
 (0)