Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ public final class TransformInternalIndexConstants {
*/

// internal index
public static final String INDEX_VERSION = "2";
public static final String INDEX_PATTERN = ".data-frame-internal-";

// version is not a rollover pattern, however padded because sort is string based
public static final String INDEX_VERSION = "003";
public static final String INDEX_PATTERN = ".transform-internal-";
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;
public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*";
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";

// audit index
public static final String AUDIT_TEMPLATE_VERSION = "1";
public static final String AUDIT_TEMPLATE_VERSION = "000001";
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";
public static final String AUDIT_INDEX_PATTERN_DEPRECATED = ".data-frame-notifications-*";

public static final String AUDIT_INDEX_READ_ALIAS = ".transform-notifications-read";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.junit.Before;

Expand Down Expand Up @@ -48,7 +50,7 @@ public void createIndexes() throws IOException {
createReviewsIndex();
indicesCreated = true;
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE));
setupUser(TEST_USER_NAME, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));
}

@SuppressWarnings("unchecked")
Expand All @@ -67,6 +69,7 @@ public void testAuditorWritesAudits() throws Exception {
request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simple_pivot_for_audit\"}}}");
assertBusy(() -> {
assertTrue(indexExists(TransformInternalIndexConstants.AUDIT_INDEX));
assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS));
});
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
// finished the job (as this is a very short DF job), all without the audit being fully written.
Expand All @@ -85,4 +88,16 @@ public void testAuditorWritesAudits() throws Exception {
});

}

public void testAliasCreatedforBWCIndexes() throws Exception {
Settings.Builder settings = Settings.builder()
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);

createIndex(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, settings.build());
assertBusy(() -> {
assertTrue(aliasExists(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED,
TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class TransformInternalIndexIT extends ESRestTestCase {


private static final String CURRENT_INDEX = TransformInternalIndexConstants.LATEST_INDEX_NAME;
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "1";
private static final String OLD_INDEX = TransformInternalIndexConstants.INDEX_PATTERN + "001";


public void testUpdateDeletesOldTransformConfig() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
transformConfigManager.get(),
transformAuditor.get()));

return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get());
return Arrays.asList(transformConfigManager.get(), transformAuditor.get(), transformCheckpointService.get(),
new TransformClusterStateListener(clusterService, client));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

class TransformClusterStateListener implements ClusterStateListener {

private static final Logger logger = LogManager.getLogger(TransformClusterStateListener.class);

private final Client client;
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);

TransformClusterStateListener(ClusterService clusterService, Client client) {
this.client = client;
clusterService.addListener(this);
logger.debug("Created TransformClusterStateListener");
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// Wait until the gateway has recovered from disk.
return;
}

// The atomic flag prevents multiple simultaneous attempts to run alias creation
// if there is a flurry of cluster state updates in quick succession
if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) {
createAuditAliasForDataFrameBWC(event.state(), client, ActionListener.wrap(
r -> {
isIndexCreationInProgress.set(false);
if (r) {
logger.info("Created alias for deprecated data frame notifications index");
} else {
logger.debug("Skipped creating alias for deprecated data frame notifications index");
}
},
e -> {
isIndexCreationInProgress.set(false);
logger.error("Error creating alias for deprecated data frame notifications index", e);
}));
}
}

private static void createAuditAliasForDataFrameBWC(ClusterState state, Client client, final ActionListener<Boolean> finalListener) {

// check if old audit index exists, no need to create the alias if it does not
if (state.getMetaData().getAliasAndIndexLookup().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED) == false) {
finalListener.onResponse(false);
return;
}

if (state.getMetaData().getAliasAndIndexLookup().get(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED).getIndices().stream()
.anyMatch(metaData -> metaData.getAliases().containsKey(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))) {
finalListener.onResponse(false);
return;
}

final IndicesAliasesRequest request = client.admin().indices().prepareAliases()
.addAlias(TransformInternalIndexConstants.AUDIT_INDEX_DEPRECATED, TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS)
.request();

executeAsyncWithOrigin(client.threadPool().getThreadContext(), TRANSFORM_ORIGIN, request,
ActionListener.<AcknowledgedResponse>wrap(r -> finalListener.onResponse(r.isAcknowledged()), finalListener::onFailure),
client.admin().indices()::aliases);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,10 @@ static void getStatisticSummations(Client client, ActionListener<TransformIndexe
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(),
TransformStoredDoc.NAME)));

SearchRequestBuilder requestBuilder = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequestBuilder requestBuilder = client
.prepareSearch(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setSize(0)
.setQuery(queryBuilder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ protected ParseField getResultsField() {

@Override
protected String[] getIndices() {
return new String[]{TransformInternalIndexConstants.INDEX_NAME_PATTERN};
return new String[] { TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED };
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@
*
* Versioned Index:
*
* We wrap several indexes under 1 pattern: ".data-frame-internal-1", ".data-frame-internal-2", ".data-frame-internal-n" while
* n is the _current_ version of the index.
* We wrap several indexes under 1 pattern: ".transform-internal-001", ".transform-internal-002", ".transform-internal-n" while
* n is the _current_ version of the index. For BWC we also search in ".data-frame-internal-1", ".data-frame-internal-2"
*
* - all gets/reads and dbq as well are searches on all indexes, while last-one-wins, so the result with the highest version is uses
* - all puts and updates go into the _current_ version of the index, in case of updates this can leave dups behind
Expand Down Expand Up @@ -168,7 +168,9 @@ public void updateTransformConfiguration(TransformConfig transformConfig,
* @param listener listener to alert on completion
*/
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
.filter(QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId)))))
Expand All @@ -195,7 +197,8 @@ public void deleteOldTransformConfigurations(String transformId, ActionListener<
* @param listener listener to alert on completion
*/
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(
TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("_index", TransformInternalIndexConstants.LATEST_INDEX_NAME))
.filter(QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId)))))
Expand Down Expand Up @@ -261,7 +264,9 @@ private void putTransformConfiguration(TransformConfig transformConfig,
*/
public void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<TransformCheckpoint> resultListener) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformCheckpoint.documentId(transformId, checkpoint));
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequest searchRequest = client
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
Expand All @@ -283,14 +288,16 @@ public void getTransformCheckpoint(String transformId, long checkpoint, ActionLi

/**
* Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET
* data_frame/transforms, see the @link{TransportGetTransformAction}
* _transform, see the @link{TransportGetTransformAction}
*
* @param transformId the transform id
* @param resultListener listener to call after inner request has returned
*/
public void getTransformConfiguration(String transformId, ActionListener<TransformConfig> resultListener) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId));
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequest searchRequest = client
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
Expand All @@ -312,7 +319,7 @@ public void getTransformConfiguration(String transformId, ActionListener<Transfo

/**
* Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET
* data_frame/transforms, see the @link{TransportGetTransformAction}
* _transform, see the @link{TransportGetTransformAction}
*
* @param transformId the transform id
* @param configAndVersionListener listener to call after inner request has returned
Expand All @@ -321,7 +328,9 @@ public void getTransformConfigurationForUpdate(String transformId,
ActionListener<Tuple<TransformConfig,
SeqNoPrimaryTermAndIndex>> configAndVersionListener) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId));
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequest searchRequest = client
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
Expand Down Expand Up @@ -362,7 +371,9 @@ public void expandTransformIds(String transformIdsExpression,
String[] idTokens = ExpandedIdsMatcher.tokenizeExpression(transformIdsExpression);
QueryBuilder queryBuilder = buildQueryFromTokenizedIds(idTokens, TransformConfig.NAME);

SearchRequest request = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequest request = client
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.addSort(TransformField.ID.getPreferredName(), SortOrder.ASC)
.setFrom(pageParams.getFrom())
.setTrackTotalHits(true)
Expand Down Expand Up @@ -413,7 +424,7 @@ public void deleteTransform(String transformId, ActionListener<Boolean> listener
DeleteByQueryRequest request = new DeleteByQueryRequest()
.setAbortOnVersionConflict(false); //since these documents are not updated, a conflict just means it was deleted previously

request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN);
request.indices(TransformInternalIndexConstants.INDEX_NAME_PATTERN, TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED);
QueryBuilder query = QueryBuilders.termQuery(TransformField.ID.getPreferredName(), transformId);
request.setQuery(query);
request.setRefresh(true);
Expand Down Expand Up @@ -472,7 +483,9 @@ public void putOrUpdateTransformStoredDoc(TransformStoredDoc stats,
public void getTransformStoredDoc(String transformId,
ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> resultListener) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId));
SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequest searchRequest = client
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
Expand Down Expand Up @@ -508,7 +521,9 @@ public void getTransformStoredDoc(Collection<String> transformIds, ActionListene
.filter(QueryBuilders.termsQuery(TransformField.ID.getPreferredName(), transformIds))
.filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformStoredDoc.NAME)));

SearchRequest searchRequest = client.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN)
SearchRequest searchRequest = client
.prepareSearch(TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
.addSort(TransformField.ID.getPreferredName(), SortOrder.ASC)
.addSort("_index", SortOrder.DESC)
.setQuery(builder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -94,6 +95,7 @@ public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOExc
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
.putAlias(AliasMetaData.builder(TransformInternalIndexConstants.AUDIT_INDEX_READ_ALIAS))
.build();
return transformTemplate;
}
Expand Down
Loading