Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ public final class TransformInternalIndexConstants {
*/

// internal index
public static final String INDEX_VERSION = "2";
public static final String INDEX_PATTERN = ".data-frame-internal-";

// version is not a rollover pattern, however padded because sort is string based
public static final String INDEX_VERSION = "003";
public static final String INDEX_PATTERN = ".transform-internal-";
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;
public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*";
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";

// audit index
public static final String AUDIT_TEMPLATE_VERSION = "1";
public static final String AUDIT_TEMPLATE_VERSION = "000001";
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";
public static final String AUDIT_INDEX_PATTERN_DEPRECATED = ".data-frame-notifications-*";

public static final String AUDIT_INDEX_READ_ALIAS = ".transform-notifications-read";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.junit.Before;

Expand Down Expand Up @@ -48,7 +50,7 @@ public void createIndexes() throws IOException {
createReviewsIndex();
indicesCreated = true;
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE));
setupUser(TEST_USER_NAME, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));
}

@SuppressWarnings("unchecked")
Expand All @@ -67,6 +69,7 @@ public void testAuditorWritesAudits() throws Exception {
request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simple_pivot_for_audit\"}}}");
assertBusy(() -> {
assertTrue(indexExists(TransformInternalIndexConstants.AUDIT_INDEX));
assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS));
});
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
// finished the job (as this is a very short DF job), all without the audit being fully written.
Expand All @@ -85,4 +88,16 @@ public void testAuditorWritesAudits() throws Exception {
});

}

public void testAliasCreatedforBWCIndexes() throws Exception {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);

createIndex(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, settings.build());
assertBusy(() -> {
assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED,
TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class TransformInternalIndexIT extends ESRestTestCase {


private static final String CURRENT_INDEX = TransformInternalIndexConstants.LATEST_INDEX_NAME;
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "1";
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001";


public void testUpdateDeletesOldTransformConfig() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
transformConfigManager.get(),
transformAuditor.get()));

return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get());
return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get(),
new TransformClusterStateListener(clusterService, client));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

class TransformClusterStateListener implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(TransformClusterStateListener.class);

private final Client client;
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);

TransformClusterStateListener(ClusterService clusterService, Client client) {
this.client = client;
clusterService.addListener(this);
logger.debug("Created TransformClusterStateListener");
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// Wait until the gateway has recovered from disk.
return;
}

// The atomic flag prevents multiple simultaneous attempts to run alias creation
// if there is a flurry of cluster state updates in quick succession
if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) {
createAuditAliasForDataFrameBWC(event.state(), client, ActionListener.wrap(
r -> {
isIndexCreationInProgress.set(false);
if (r) {
logger.info("Created alias for deprecated data frame notifications index");
} else {
logger.debug("Skipped creating alias for deprecated data frame notifications index");
}
},
e -> {
isIndexCreationInProgress.set(false);
logger.error("Error creating alias for deprecated data frame notifications index", e);
}));
}
}

private static void createAuditAliasForDataFrameBWC(ClusterState state, Client client, final ActionListener<Boolean> finalListener) {

// check if old audit index exists, no need to create the alias if it does not
if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED) == false) {
finalListener.onResponse(false);
return;
}

if (state.getMetaData().getAliasAndIndexLookup().get(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED).getIndices().stream()
.anyMatch(metaData -> metaData.getAliases().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))) {
finalListener.onResponse(false);
return;
}

final IndicesAliasesRequest request = client.admin().indices().prepareAliases()
.addAlias(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)
.request();

executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
ActionListener.<AcknowledgedResponse>wrap(r -> finalListener.onResponse(r.isAcknowledged()), finalListener::onFailure),
client.admin().indices()::aliases);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ static void getStatisticSummations(Client client, ActionListener<TransformIndexe
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(),
TransformStoredDoc.NAME)));

SearchRequestBuilder requestBuilder = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequestBuilder requestBuilder = client
.prepareSearch(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setSize(0)
.setQuery(queryBuilder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
import org.elasticsearch.xpack.core.action.XPackUsageFeatureTransportAction;
import org.elasticsearch.xpack.core.transform.TransformFeatureSetUsage;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

Expand Down Expand Up @@ -118,7 +118,9 @@ protected void masterOperation(Task task, XPackUsageRequest request, ClusterStat
}
);

SearchRequest totalTransformCount = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequest totalTransformCount = client
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setTrackTotalHits(true)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ protected ParseField getResultsField() {

@Override
protected String[] getIndices() {
return new String[]{TransformInternalIndexConstants.INDEX_NAME_PATTERN};
return new String[] { TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED };
}

@Override
Expand Down
Loading