Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0b3e3a0
Service to migrate indices and ILM policies to data tiers
andreidan May 28, 2021
9f9a64f
Fix line length
andreidan Jun 2, 2021
fc04682
Javadoc mention we update the cached phase definition
andreidan Jun 2, 2021
3faa8b4
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 3, 2021
e41c599
Add validation that ILM is STOPPED
andreidan Jun 3, 2021
405f1b0
Test migration doesn't delete composable templates
andreidan Jun 3, 2021
ac854b5
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 4, 2021
82baff0
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 4, 2021
58bdba7
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 7, 2021
33bd70a
Use Strings.hasText
andreidan Jun 8, 2021
fba9f61
Flip ILM metadata check
andreidan Jun 8, 2021
690808e
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 8, 2021
83ce6ac
Inspect the `include.data` node attribute routing too
andreidan Jun 8, 2021
a25a487
Extract `migrateSingleILMPolicy` method
andreidan Jun 8, 2021
90a81ed
Add test to check the require setting is migrated even if include is …
andreidan Jun 8, 2021
846e85a
Merge branch 'master' into migrate-to-data-tiers
andreidan Jun 10, 2021
13268b6
Fix master merge
andreidan Jun 10, 2021
c8cfac2
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 14, 2021
f05ef74
Oxford comma
andreidan Jun 15, 2021
0203343
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 15, 2021
9cc1198
Debug logging when the template doesn't exist
andreidan Jun 15, 2021
00659d1
Debug log how many indices had their ILM phase definition refreshed
andreidan Jun 15, 2021
ddd6aa7
Remove all attribute routing settings when indices are migrated
andreidan Jun 15, 2021
effa1db
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 15, 2021
3d3473e
Fix phase definition refresh for migrated policies where we remove th…
andreidan Jun 16, 2021
4c84c88
Merge branch 'master' into migrate-to-data-tiers
elasticmachine Jun 17, 2021
01ad3d9
Inline var
andreidan Jun 17, 2021
354b645
Move MetadataMigrateToDataTiersRoutingService to xpack.ilm
andreidan Jun 17, 2021
a5cd72e
Extract moveStateToNextActionAndUpdateCachedPhase into IndexLifecycle…
andreidan Jun 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* The {@code DataTier} class encapsulates the formalization of the "content",
Expand All @@ -40,6 +42,10 @@ public class DataTier {
public static final Set<String> ALL_DATA_TIERS =
new HashSet<>(Arrays.asList(DATA_CONTENT, DATA_HOT, DATA_WARM, DATA_COLD, DATA_FROZEN));

// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
private static final List<String> ORDERED_FROZEN_TO_HOT_TIERS =
List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);

/**
* Returns true if the given tier name is a valid tier
*/
Expand All @@ -51,6 +57,19 @@ public static boolean validTierName(String tierName) {
DATA_FROZEN.equals(tierName);
}

/**
* Based on the provided target tier it will return a comma separated list of preferred tiers.
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
*/
public static String getPreferredTiersConfiguration(String targetTier) {
int indexOfTargetTier = ORDERED_FROZEN_TO_HOT_TIERS.indexOf(targetTier);
if (indexOfTargetTier == -1) {
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
}
return ORDERED_FROZEN_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
}

/**
* Returns true iff the given settings have a data tier setting configured
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.DataTier.getPreferredTiersConfiguration;

/**
* A {@link LifecycleAction} which enables or disables the automatic migration of data between
Expand All @@ -37,9 +38,6 @@ public class MigrateAction implements LifecycleAction {

private static final Logger logger = LogManager.getLogger(MigrateAction.class);
static final String CONDITIONAL_SKIP_MIGRATE_STEP = BranchingStep.NAME + "-check-skip-action";
// Represents an ordered list of data tiers from frozen to hot (or slow to fast)
private static final List<String> FROZEN_TO_HOT_TIERS =
List.of(DataTier.DATA_FROZEN, DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);

private static final ConstructingObjectParser<MigrateAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new MigrateAction(a[0] == null ? true : (boolean) a[0]));
Expand Down Expand Up @@ -128,19 +126,6 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
}
}

/**
* Based on the provided target tier it will return a comma separated list of preferred tiers.
* ie. if `data_cold` is the target tier, it will return `data_cold,data_warm,data_hot`
* This is usually used in conjunction with {@link DataTierAllocationDecider#INDEX_ROUTING_PREFER_SETTING}
*/
static String getPreferredTiersConfiguration(String targetTier) {
int indexOfTargetTier = FROZEN_TO_HOT_TIERS.indexOf(targetTier);
if (indexOfTargetTier == -1) {
throw new IllegalArgumentException("invalid data tier [" + targetTier + "]");
}
return FROZEN_TO_HOT_TIERS.stream().skip(indexOfTargetTier).collect(Collectors.joining(","));
}

@Override
public int hashCode() {
return Objects.hash(enabled);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.core.Nullable;

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.xpack.core.ilm.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;

/**
* We cache the currently executing ILM phase in the index metadata so the ILM execution for managed indices is not irrecoverably
* interrupted by a concurrent update policy that, say, would remove the current execution phase altogether.
* <p>
* This contains class contains a series of methods that help manage the cached ILM phase.
*/
public final class PhaseCacheManagement {

private static final Logger logger = LogManager.getLogger(PhaseCacheManagement.class);

private PhaseCacheManagement() {
}

/**
* Rereads the phase JSON for the given index, returning a new cluster state.
*/
public static ClusterState refreshPhaseDefinition(final ClusterState state, final String index,
final LifecyclePolicyMetadata updatedPolicy) {
final IndexMetadata idxMeta = state.metadata().index(index);
Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
refreshPhaseDefinition(metadataBuilder, idxMeta, updatedPolicy);
return ClusterState.builder(state).metadata(metadataBuilder).build();
}

/**
* Rereads the phase JSON for the given index, and updates the provided metadata.
*/
public static void refreshPhaseDefinition(final Metadata.Builder metadataBuilder, final IndexMetadata idxMeta,
final LifecyclePolicyMetadata updatedPolicy) {
String index = idxMeta.getIndex().getName();
assert eligibleToCheckForRefresh(idxMeta) : "index " + index + " is missing crucial information needed to refresh phase definition";

logger.trace("[{}] updating cached phase definition for policy [{}]", index, updatedPolicy.getName());
LifecycleExecutionState currentExState = LifecycleExecutionState.fromIndexMetadata(idxMeta);

String currentPhase = currentExState.getPhase();
PhaseExecutionInfo pei = new PhaseExecutionInfo(updatedPolicy.getName(),
updatedPolicy.getPolicy().getPhases().get(currentPhase), updatedPolicy.getVersion(), updatedPolicy.getModifiedDate());

LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState)
.setPhaseDefinition(Strings.toString(pei, false, false))
.build();

metadataBuilder.put(IndexMetadata.builder(idxMeta)
.putCustom(ILM_CUSTOM_METADATA_KEY, newExState.asMap()));
}


/**
* Ensure that we have the minimum amount of metadata necessary to check for cache phase
* refresh. This includes:
* - An execution state
* - Existing phase definition JSON
* - A current step key
* - A current phase in the step key
* - Not currently in the ERROR step
*/
public static boolean eligibleToCheckForRefresh(final IndexMetadata metadata) {
LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metadata);
if (executionState == null || executionState.getPhaseDefinition() == null) {
return false;
}

Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
if (currentStepKey == null || currentStepKey.getPhase() == null) {
return false;
}

return ErrorStep.NAME.equals(currentStepKey.getName()) == false;
}

/**
* For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed.
*/
public static ClusterState updateIndicesForPolicy(final ClusterState state, final NamedXContentRegistry xContentRegistry,
final Client client, final LifecyclePolicy oldPolicy,
final LifecyclePolicyMetadata newPolicy) {
Metadata.Builder mb = Metadata.builder(state.metadata());
if (updateIndicesForPolicy(mb, state, xContentRegistry, client, oldPolicy, newPolicy)) {
return ClusterState.builder(state).metadata(mb).build();
}
return state;
}

/**
* For the given new policy, update the provided metadata to reflect the refreshed phase JSON for all updateable indices.
* Returns true if any indices were updated and false otherwise.
* Users of this API should consider the returned value and only create a new {@link ClusterState} if `true` is returned.
*/
public static boolean updateIndicesForPolicy(final Metadata.Builder mb, final ClusterState currentState,
final NamedXContentRegistry xContentRegistry, final Client client,
final LifecyclePolicy oldPolicy, final LifecyclePolicyMetadata newPolicy) {
assert oldPolicy.getName().equals(newPolicy.getName()) : "expected both policies to have the same id but they were: [" +
oldPolicy.getName() + "] vs. [" + newPolicy.getName() + "]";

// No need to update anything if the policies are identical in contents
if (oldPolicy.equals(newPolicy.getPolicy())) {
logger.debug("policy [{}] is unchanged and no phase definition refresh is needed", oldPolicy.getName());
return false;
}

final List<IndexMetadata> indicesThatCanBeUpdated =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(currentState.metadata().indices().valuesIt(), 0), false)
.filter(meta -> newPolicy.getName().equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(meta.getSettings())))
.filter(meta -> isIndexPhaseDefinitionUpdatable(xContentRegistry, client, meta, newPolicy.getPolicy()))
.collect(Collectors.toList());

final List<String> refreshedIndices = new ArrayList<>(indicesThatCanBeUpdated.size());
for (IndexMetadata index : indicesThatCanBeUpdated) {
try {
refreshPhaseDefinition(mb, index, newPolicy);
refreshedIndices.add(index.getIndex().getName());
} catch (Exception e) {
logger.warn(new ParameterizedMessage("[{}] unable to refresh phase definition for updated policy [{}]",
index, newPolicy.getName()), e);
}
}
logger.debug("refreshed policy [{}] phase definition for [{}] indices", newPolicy.getName(), refreshedIndices.size());
return refreshedIndices.size() > 0;
}

/**
* Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise.
*/
public static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistry xContentRegistry, final Client client,
final IndexMetadata metadata, final LifecyclePolicy newPolicy) {
final String index = metadata.getIndex().getName();
if (eligibleToCheckForRefresh(metadata) == false) {
logger.debug("[{}] does not contain enough information to check for eligibility of refreshing phase", index);
return false;
}
final String policyId = newPolicy.getName();

final LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metadata);
final Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
final String currentPhase = currentStepKey.getPhase();

final Set<Step.StepKey> newStepKeys = newPolicy.toSteps(client).stream()
.map(Step::getKey)
.collect(Collectors.toCollection(LinkedHashSet::new));

if (newStepKeys.contains(currentStepKey) == false) {
// The index is on a step that doesn't exist in the new policy, we
// can't safely re-read the JSON
logger.debug("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed",
index, policyId, currentStepKey);
return false;
}

final String phaseDef = executionState.getPhaseDefinition();
final Set<Step.StepKey> oldStepKeys = readStepKeys(xContentRegistry, client, phaseDef, currentPhase);
if (oldStepKeys == null) {
logger.debug("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed",
index, policyId);
return false;
}

final Set<Step.StepKey> oldPhaseStepKeys = oldStepKeys.stream()
.filter(sk -> currentPhase.equals(sk.getPhase()))
.collect(Collectors.toCollection(LinkedHashSet::new));

final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyId, newPolicy.getPhases().get(currentPhase), 1L, 1L);
final String peiJson = Strings.toString(phaseExecutionInfo);

final Set<Step.StepKey> newPhaseStepKeys = readStepKeys(xContentRegistry, client, peiJson, currentPhase);
if (newPhaseStepKeys == null) {
logger.debug(new ParameterizedMessage("[{}] unable to parse phase definition for policy [{}] " +
"to determine if it could be refreshed", index, policyId));
return false;
}

if (newPhaseStepKeys.equals(oldPhaseStepKeys)) {
// The new and old phase have the same stepkeys for this current phase, so we can
// refresh the definition because we know it won't change the execution flow.
logger.debug("[{}] updated policy [{}] contains the same phase step keys and can be refreshed", index, policyId);
return true;
} else {
logger.debug("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " +
"definition as it differs too greatly. old: {}, new: {}",
index, policyId, oldPhaseStepKeys, newPhaseStepKeys);
return false;
}
}

/**
* Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase.
* If there is an error parsing or if the phase definition is missing the required
* information, returns null.
*/
@Nullable
static Set<Step.StepKey> readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client,
final String phaseDef, final String currentPhase) {
final PhaseExecutionInfo phaseExecutionInfo;
try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
} catch (Exception e) {
logger.trace(new ParameterizedMessage("exception reading step keys checking for refreshability, phase definition: {}",
phaseDef), e);
return null;
}

if (phaseExecutionInfo == null || phaseExecutionInfo.getPhase() == null) {
return null;
}

return phaseExecutionInfo.getPhase().getActions().values().stream()
.flatMap(a -> a.toSteps(client, phaseExecutionInfo.getPhase().getName(), null).stream())
.map(Step::getKey)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;

import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
import static org.elasticsearch.xpack.core.DataTier.getPreferredTiersConfiguration;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -110,6 +115,14 @@ public void testDataRoleDoesNotImplyTieredDataRoles() {
assertThat(node.getRoles(), not(hasItem(DiscoveryNodeRole.DATA_COLD_NODE_ROLE)));
}

public void testGetPreferredTiersConfiguration() {
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
}

private static DiscoveryNodes buildDiscoveryNodes() {
int numNodes = randomIntBetween(3, 15);
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.elasticsearch.xpack.core.DataTier.DATA_COLD;
import static org.elasticsearch.xpack.core.DataTier.DATA_HOT;
import static org.elasticsearch.xpack.core.DataTier.DATA_WARM;
import static org.elasticsearch.xpack.core.ilm.MigrateAction.getPreferredTiersConfiguration;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.COLD_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.DELETE_PHASE;
import static org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType.HOT_PHASE;
Expand Down Expand Up @@ -83,14 +82,6 @@ public void testToSteps() {
}
}

public void testGetPreferredTiersConfiguration() {
assertThat(getPreferredTiersConfiguration(DATA_HOT), is(DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_WARM), is(DATA_WARM + "," + DATA_HOT));
assertThat(getPreferredTiersConfiguration(DATA_COLD), is(DATA_COLD + "," + DATA_WARM + "," + DATA_HOT));
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> getPreferredTiersConfiguration("no_tier"));
assertThat(exception.getMessage(), is("invalid data tier [no_tier]"));
}

public void testMigrateActionsConfiguresTierPreference() {
StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
randomAlphaOfLengthBetween(1, 10));
Expand Down
Loading