Skip to content

Commit 73587b3

Browse files
author
Hendrik Muhs
committed
rename audit index and create an alias for accessing it
1 parent a8a7477 commit 73587b3

File tree

4 files changed

+95
-2
lines changed

4 files changed

+95
-2
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/persistence/TransformInternalIndexConstants.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ public final class TransformInternalIndexConstants {
2929
public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*";
3030

3131
// audit index
32-
public static final String AUDIT_TEMPLATE_VERSION = "1";
32+
public static final String AUDIT_TEMPLATE_VERSION = "000001";
3333
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
3434
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
35+
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";
3536
public static final String AUDIT_INDEX_PATTERN_DEPRECATED = ".data-frame-notifications-*";
3637

3738
public static final String AUDIT_INDEX_READ_ALIAS = ".transform-notifications-read";

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
222222
transformConfigManager.get(),
223223
transformAuditor.get()));
224224

225-
return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get());
225+
return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get(),
226+
new TransformClusterStateListener(clusterService, client));
226227
}
227228

228229
@Override
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.transform;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.client.Client;
15+
import org.elasticsearch.cluster.ClusterChangedEvent;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.ClusterStateListener;
18+
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.gateway.GatewayService;
20+
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
21+
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
25+
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
26+
27+
class TransformClusterStateListener implements ClusterStateListener {
28+
29+
private static final Logger logger = LogManager.getLogger(TransformClusterStateListener.class);
30+
31+
private final Client client;
32+
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
33+
34+
TransformClusterStateListener(ClusterService clusterService, Client client) {
35+
this.client = client;
36+
clusterService.addListener(this);
37+
logger.debug("Created TransformClusterStateListener");
38+
}
39+
40+
@Override
41+
public void clusterChanged(ClusterChangedEvent event) {
42+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
43+
// Wait until the gateway has recovered from disk.
44+
return;
45+
}
46+
47+
// The atomic flag prevents multiple simultaneous attempts to run alias creation
48+
// if there is a flurry of cluster state updates in quick succession
49+
if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) {
50+
createAuditAliasForDataFrameBWC(event.state(), client, ActionListener.wrap(
51+
r -> {
52+
isIndexCreationInProgress.set(false);
53+
if (r) {
54+
logger.info("Created alias for deprecated data frame notifications index");
55+
} else {
56+
logger.debug("Skipped creating alias for deprecated data frame notifications index");
57+
}
58+
},
59+
e -> {
60+
isIndexCreationInProgress.set(false);
61+
logger.error("Error creating alias for deprecated data frame notifications index", e);
62+
}));
63+
}
64+
}
65+
66+
private static void createAuditAliasForDataFrameBWC(ClusterState state, Client client, final ActionListener<Boolean> finalListener) {
67+
68+
// check if alias already exists
69+
if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)) {
70+
finalListener.onResponse(false);
71+
return;
72+
}
73+
74+
// check if old audit index exists, no need to create the alias if it does not
75+
if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED)) {
76+
finalListener.onResponse(false);
77+
return;
78+
}
79+
80+
final IndicesAliasesRequest request = client.admin().indices().prepareAliases()
81+
.addAlias(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)
82+
.request();
83+
84+
executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
85+
ActionListener.<AcknowledgedResponse>wrap(r -> finalListener.onResponse(r.isAcknowledged()), finalListener::onFailure),
86+
client.admin().indices()::aliases);
87+
}
88+
89+
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/TransformInternalIndex.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1313
import org.elasticsearch.client.Client;
1414
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.AliasMetaData;
1516
import org.elasticsearch.cluster.metadata.IndexMetaData;
1617
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
1718
import org.elasticsearch.cluster.service.ClusterService;
@@ -93,6 +94,7 @@ public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOExc
9394
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
9495
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
9596
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
97+
.putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))
9698
.build();
9799
return transformTemplate;
98100
}

0 commit comments

Comments
 (0)