Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public class IndexLifecycleService extends AbstractComponent
private final SetOnce<SchedulerEngine> scheduler = new SetOnce<>();
private final Clock clock;
private final PolicyStepsRegistry policyRegistry;
private final IndexLifecycleRunner lifecycleRunner;
private Client client;
private ClusterService clusterService;
private LongSupplier nowSupplier;
private SchedulerEngine.Job scheduledJob;
private IndexLifecycleRunner lifecycleRunner;

public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier) {
super(settings);
Expand Down Expand Up @@ -121,9 +121,9 @@ public void applyClusterState(ClusterChangedEvent event) {
if (event.localNodeMaster()) { // only act if we are master, otherwise
// keep idle until elected
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
if (lifecycleMetadata != null && event.changedCustomMetaDataSet().contains(IndexLifecycleMetadata.TYPE)) {
Copy link
Contributor Author

@talevy talevy Jul 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @jasontedor,

Previously, IndexLifecycleService did not update its internal state when
a node was newly elected master and there were no changes to policies in the cluster state.

To fix this, there are a few options, but two that I see are as follows:

  1. always attempt to call policyRegistry.update, since the diff that really matters is between the internal state of the registry and the current cluster state.
  2. upon un-election, clear the registry, and re-bootstrap it on the first cluster-state listener call that is triggered once it is master. (a version of this is what exists in the code now, but should be cleaned up if this approach is taken)

(2) has the benefit of making it clear that this instance is no longer master and should therefore forget any policies it once had, since it no longer keeps them up to date when changes occur. The downside is that this can result in weird behavior when a node is un-elected and then re-elected before the next state listener callback occurs.

(1) has the benefit that there do not seem to be any edge cases to worry about. The only downside is that the registry will be stale until the node becomes master again. This should be OK because the cluster-state applier that re-updates the registry will be called before the cluster-state listener callback re-launches the scheduled job and triggers policies.

Are there aspects of re-election and state management that these thoughts are missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: this inquiry was discussed in #32181, and #32212 was opened to address it.

if (lifecycleMetadata != null) {
// update policy steps registry
policyRegistry.update(event.state(), client, nowSupplier);
policyRegistry.update(lifecycleMetadata, client, nowSupplier);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ Map<String, Map<Step.StepKey, Step>> getStepMap() {


@SuppressWarnings({ "unchecked", "rawtypes" })
public void update(ClusterState currentState, Client client, LongSupplier nowSupplier) {
IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
public void update(IndexLifecycleMetadata meta, Client client, LongSupplier nowSupplier) {
assert meta != null : "IndexLifecycleMetadata cannot be null when updating the policy steps registry";

Diff<Map<String, LifecyclePolicyMetadata>> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicyMetadatas(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ public void prepareState() {
.put(LifecycleSettings.LIFECYCLE_STEP, "init"))
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
index = indexMetadata.getIndex();

IndexLifecycleMetadata lifecycleMetadata = new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING);
MetaData metaData = MetaData.builder()
.persistentSettings(settings(Version.CURRENT).build())
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING))
.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)
.put(IndexMetaData.builder(indexMetadata))
.build();
String nodeId = randomAlphaOfLength(10);
Expand All @@ -118,7 +118,7 @@ public void prepareState() {
.metaData(metaData)
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
.build();
policyStepsRegistry.update(clusterState, client, () -> 0L);
policyStepsRegistry.update(lifecycleMetadata, client, () -> 0L);
}

public void testExecuteAllUntilEndOfPolicy() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,44 @@

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction;
import org.elasticsearch.xpack.core.indexlifecycle.ForceMergeAction;
import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.MockAction;
import org.elasticsearch.xpack.core.indexlifecycle.Phase;
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.Step;
import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.client.Requests.createIndexRequest;
Expand Down Expand Up @@ -83,7 +95,7 @@ protected Settings transportClientSettings() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class);
return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class, TestILMPlugin.class);
}

@Override
Expand All @@ -95,17 +107,16 @@ protected Collection<Class<? extends Plugin>> transportClientPlugins() {
public void init() {
settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0).put(LifecycleSettings.LIFECYCLE_NAME, "test").build();
Map<String, Phase> phases = new HashMap<>();

Map<String, LifecycleAction> warmPhaseActions = Collections.singletonMap(ForceMergeAction.NAME, new ForceMergeAction(10000));
phases.put("warm", new Phase("warm", TimeValue.timeValueSeconds(2), warmPhaseActions));

Map<String, LifecycleAction> deletePhaseActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
phases.put("delete", new Phase("delete", TimeValue.timeValueSeconds(3), deletePhaseActions));
lifecyclePolicy = new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, "test", phases);
List<Step> steps = new ArrayList<>();
Step.StepKey key = new Step.StepKey("mock", ObservableAction.NAME, ObservableClusterStateWaitStep.NAME);
steps.add(new ObservableClusterStateWaitStep(key, TerminalPolicyStep.KEY));
Map<String, LifecycleAction> actions = Collections.singletonMap(ObservableAction.NAME, new ObservableAction(steps, true));
Map<String, Phase> phases = Collections.singletonMap("mock", new Phase("mock", TimeValue.timeValueSeconds(0), actions));
lifecyclePolicy = new LifecyclePolicy(LockableLifecycleType.INSTANCE, "test", phases);
}

public void testSingleNodeCluster() throws Exception {
settings = Settings.builder().put(settings).put("index.lifecycle.test.complete", true).build();
// start master node
logger.info("Starting server1");
final String server_1 = internalCluster().startNode();
Expand All @@ -122,13 +133,17 @@ public void testSingleNodeCluster() throws Exception {
RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node1);
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1));
assertBusy(() -> {
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
assertEquals(true, client().admin().indices().prepareExists("test").get().isExists());
});
assertBusy(() -> {
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
assertThat(step, equalTo(TerminalPolicyStep.KEY.getName()));
});
}

// NORELEASE re-enable when we re-visit integration testing
@AwaitsFix(bugUrl = "Fails because of timing")
public void testMasterDedicatedDataDedicated() throws Exception {
settings = Settings.builder().put(settings).put("index.lifecycle.test.complete", true).build();
// start master node
logger.info("Starting sever1");
internalCluster().startMasterOnlyNode();
Expand All @@ -151,12 +166,15 @@ public void testMasterDedicatedDataDedicated() throws Exception {
assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1));

assertBusy(() -> {
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
assertEquals(true, client().admin().indices().prepareExists("test").get().isExists());
});
assertBusy(() -> {
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
assertThat(step, equalTo(TerminalPolicyStep.KEY.getName()));
});
}

// NORELEASE re-enable when force merge action bug is fixed
@AwaitsFix(bugUrl = "Fails because force merge action expect shards to be assigned")
public void testMasterFailover() throws Exception {
// start one server
logger.info("Starting sever1");
Expand Down Expand Up @@ -189,12 +207,33 @@ public void testMasterFailover() throws Exception {
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));

// check step in progress in lifecycle
assertBusy(() -> {
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
assertThat(step, equalTo(ObservableClusterStateWaitStep.NAME));
});


logger.info("Closing server1");
// kill the first server
internalCluster().stopCurrentMasterNode();

// check that index lifecycle picked back up where it left off
assertBusy(() -> {
assertEquals(false, client().admin().indices().prepareExists("test").get().isExists());
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
assertThat(step, equalTo(ObservableClusterStateWaitStep.NAME));
});

// complete the step
client().admin().indices().prepareUpdateSettings("test")
.setSettings(Collections.singletonMap("index.lifecycle.test.complete", true)).get();

assertBusy(() -> {
GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get();
String step = settingsResponse.getSetting("test", "index.lifecycle.step");
assertThat(step, equalTo(TerminalPolicyStep.KEY.getName()));
});
}

Expand All @@ -204,4 +243,86 @@ private String getLocalNodeId(String name) {
assertThat(nodeId, not(nullValue()));
return nodeId;
}

/**
 * Test-only plugin that exposes the {@code index.lifecycle.test.complete} index setting and the
 * named-writeable entries for the mock lifecycle type, action and wait step used by this test.
 */
public static class TestILMPlugin extends Plugin {
    public TestILMPlugin() {
    }

    /**
     * Returns the single setting contributed by this plugin: a dynamic, index-scoped boolean
     * named {@code index.lifecycle.test.complete} that defaults to {@code false}.
     */
    public List<Setting<?>> getSettings() {
        return Collections.singletonList(
            Setting.boolSetting("index.lifecycle.test.complete", false, Setting.Property.Dynamic, Setting.Property.IndexScope));
    }

    /**
     * Returns the named-writeable entries for the lifecycle type, the observable action and the
     * observable cluster-state wait step, in that order.
     */
    public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
        // The lifecycle type is a singleton, so its reader ignores the stream and hands back the instance.
        NamedWriteableRegistry.Entry lifecycleTypeEntry = new NamedWriteableRegistry.Entry(
            LifecycleType.class, LockableLifecycleType.TYPE, (in) -> LockableLifecycleType.INSTANCE);
        NamedWriteableRegistry.Entry actionEntry = new NamedWriteableRegistry.Entry(
            LifecycleAction.class, ObservableAction.NAME, ObservableAction::readObservableAction);
        NamedWriteableRegistry.Entry waitStepEntry = new NamedWriteableRegistry.Entry(
            ObservableClusterStateWaitStep.class, ObservableClusterStateWaitStep.NAME, ObservableClusterStateWaitStep::new);
        return Arrays.asList(lifecycleTypeEntry, actionEntry, waitStepEntry);
    }
}

/**
 * Cluster-state wait step whose completion is driven from outside the lifecycle: the condition is met
 * once the {@code index.lifecycle.test.complete} setting of the index being evaluated is {@code true}.
 *
 * <p>Wire format: the three strings of the current step key (phase, action, name), a boolean telling
 * whether a next step key follows, and, if so, the three strings of the next step key.
 */
public static class ObservableClusterStateWaitStep extends ClusterStateWaitStep implements NamedWriteable {
    public static final String NAME = "observable_cluster_state_action";

    public ObservableClusterStateWaitStep(StepKey current, StepKey next) {
        super(current, next);
    }

    /**
     * Reads a step from the stream in the format produced by {@link #writeTo(StreamOutput)}.
     *
     * @param in stream positioned at the start of a serialized step
     * @throws IOException if reading from the stream fails
     */
    public ObservableClusterStateWaitStep(StreamInput in) throws IOException {
        // Arguments are evaluated left to right, so the current key is read before the optional next key.
        this(readStepKey(in), readOptionalNextStepKey(in));
    }

    /**
     * Writes the current step key followed by the optional next step key.
     *
     * @param out stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        writeStepKey(out, getKey());
        StepKey next = getNextStepKey();
        boolean hasNextStep = next != null;
        out.writeBoolean(hasNextStep);
        if (hasNextStep) {
            writeStepKey(out, next);
        }
    }

    /** Writes the phase, action and name of {@code key} as three consecutive strings. */
    private static void writeStepKey(StreamOutput out, StepKey key) throws IOException {
        out.writeString(key.getPhase());
        out.writeString(key.getAction());
        out.writeString(key.getName());
    }

    /** Reads a step key written as three consecutive strings: phase, action, name. */
    private static StepKey readStepKey(StreamInput in) throws IOException {
        String phase = in.readString();
        String action = in.readString();
        String name = in.readString();
        return new StepKey(phase, action, name);
    }

    /** Reads the presence flag and, when set, the next step key; returns {@code null} otherwise. */
    private static StepKey readOptionalNextStepKey(StreamInput in) throws IOException {
        if (in.readBoolean()) {
            return readStepKey(in);
        }
        return null;
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    /**
     * Reports whether the step is complete for the given index.
     *
     * @param index        the index the step is being evaluated for
     * @param clusterState the cluster state to read the index settings from
     * @return a result that is complete when {@code index.lifecycle.test.complete} is {@code true}
     *         on that index; the setting is treated as {@code false} when absent
     */
    @Override
    public Result isConditionMet(Index index, ClusterState clusterState) {
        // Look up the index being evaluated rather than a hard-coded index name, so the step
        // behaves correctly for any index the policy is attached to.
        boolean complete = clusterState.metaData().index(index.getName()).getSettings()
            .getAsBoolean("index.lifecycle.test.complete", false);
        return new Result(complete, null);
    }
}

/**
 * {@link MockAction} variant whose steps are all {@link ObservableClusterStateWaitStep}s, serialized
 * as the list of steps followed by the "safe" flag.
 */
public static class ObservableAction extends MockAction {

    ObservableAction(List<Step> steps, boolean safe) {
        super(steps, safe);
    }

    /**
     * Deserializes an action: first the list of wait steps, then the boolean "safe" flag.
     *
     * @param in stream positioned at the start of a serialized action
     * @return the action rebuilt from the stream
     * @throws IOException if reading from the stream fails
     */
    public static ObservableAction readObservableAction(StreamInput in) throws IOException {
        final List<Step> waitSteps = in.readList(ObservableClusterStateWaitStep::new);
        final boolean safeFlag = in.readBoolean();
        return new ObservableAction(waitSteps, safeFlag);
    }

    /**
     * Serializes the steps (each cast to {@link ObservableClusterStateWaitStep}) followed by the
     * "safe" flag, mirroring {@link #readObservableAction(StreamInput)}.
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        final List<ObservableClusterStateWaitStep> waitSteps = getSteps().stream()
            .map(ObservableClusterStateWaitStep.class::cast)
            .collect(Collectors.toList());
        out.writeList(waitSteps);
        out.writeBoolean(isSafeAction());
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe

@Override
protected IndexLifecycleMetadata createTestInstance() {
int numPolicies = randomInt(5);
SortedMap<String, LifecyclePolicyMetadata> policies = new TreeMap<>();
for (int i = 0; i < numPolicies; i++) {
int numberPhases = randomInt(5);
Map<String, Phase> phases = new HashMap<>(numberPhases);
for (int j = 0; j < numberPhases; j++) {
TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after");
Map<String, LifecycleAction> actions = Collections.emptyMap();
if (randomBoolean()) {
actions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction());
}
String phaseName = randomAlphaOfLength(10);
phases.put(phaseName, new Phase(phaseName, after, actions));
}
String policyName = randomAlphaOfLength(10);
policies.put(policyName, new LifecyclePolicyMetadata(new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, phases),
Collections.emptyMap()));
}
return new IndexLifecycleMetadata(policies, randomFrom(OperationMode.values()));
return createTestInstance(randomInt(5), randomFrom(OperationMode.values()));
}

@Override
Expand Down Expand Up @@ -123,4 +105,24 @@ public void testcontext() {
assertEquals(MetaData.ALL_CONTEXTS, createTestInstance().context());
}

/**
 * Builds a randomized {@link IndexLifecycleMetadata} for tests.
 *
 * <p>One policy with a random 10-character name is generated per iteration; each policy gets
 * {@code randomInt(5)} phase iterations, every phase having a random name, a random "after" time
 * value and either no actions or a single {@link DeleteAction}. Entries are keyed by name, so a
 * repeated random name replaces the earlier entry.
 *
 * @param numPolicies number of policy iterations to run
 * @param mode        operation mode stored in the returned metadata
 * @return the generated metadata
 */
public static IndexLifecycleMetadata createTestInstance(int numPolicies, OperationMode mode) {
    final SortedMap<String, LifecyclePolicyMetadata> policiesByName = new TreeMap<>();
    for (int policyIdx = 0; policyIdx < numPolicies; policyIdx++) {
        final int phaseCount = randomInt(5);
        final Map<String, Phase> phasesByName = new HashMap<>(phaseCount);
        for (int phaseIdx = 0; phaseIdx < phaseCount; phaseIdx++) {
            final String rawAfter = randomTimeValue(0, 1000000000, "s", "m", "h", "d");
            final TimeValue after = TimeValue.parseTimeValue(rawAfter, "test_after");
            final Map<String, LifecycleAction> phaseActions = randomBoolean()
                ? Collections.singletonMap(DeleteAction.NAME, new DeleteAction())
                : Collections.emptyMap();
            final String phaseName = randomAlphaOfLength(10);
            phasesByName.put(phaseName, new Phase(phaseName, after, phaseActions));
        }
        final String policyName = randomAlphaOfLength(10);
        final LifecyclePolicy policy = new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, phasesByName);
        policiesByName.put(policyName, new LifecyclePolicyMetadata(policy, Collections.emptyMap()));
    }
    return new IndexLifecycleMetadata(policiesByName, mode);
}
}
Loading