diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java index 55c730799c0db..486bc8cce8a94 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleService.java @@ -45,11 +45,11 @@ public class IndexLifecycleService extends AbstractComponent private final SetOnce scheduler = new SetOnce<>(); private final Clock clock; private final PolicyStepsRegistry policyRegistry; + private final IndexLifecycleRunner lifecycleRunner; private Client client; private ClusterService clusterService; private LongSupplier nowSupplier; private SchedulerEngine.Job scheduledJob; - private IndexLifecycleRunner lifecycleRunner; public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier) { super(settings); @@ -121,9 +121,9 @@ public void applyClusterState(ClusterChangedEvent event) { if (event.localNodeMaster()) { // only act if we are master, otherwise // keep idle until elected IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE); - if (lifecycleMetadata != null && event.changedCustomMetaDataSet().contains(IndexLifecycleMetadata.TYPE)) { + if (lifecycleMetadata != null) { // update policy steps registry - policyRegistry.update(event.state(), client, nowSupplier); + policyRegistry.update(lifecycleMetadata, client, nowSupplier); } } } diff --git a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java index 0db59c9fc5ffe..f62364c39f8a3 100644 --- a/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java +++ b/x-pack/plugin/index-lifecycle/src/main/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistry.java @@ -61,8 +61,7 @@ Map> getStepMap() { @SuppressWarnings({ "unchecked", "rawtypes" }) - public void update(ClusterState currentState, Client client, LongSupplier nowSupplier) { - IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE); + public void update(IndexLifecycleMetadata meta, Client client, LongSupplier nowSupplier) { assert meta != null : "IndexLifecycleMetadata cannot be null when updating the policy steps registry"; Diff> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicyMetadatas(), diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java index 233356e743e84..e72b6029450a0 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/ExecuteStepsUpdateTaskTests.java @@ -104,10 +104,10 @@ public void prepareState() { .put(LifecycleSettings.LIFECYCLE_STEP, "init")) .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); index = indexMetadata.getIndex(); - + IndexLifecycleMetadata lifecycleMetadata = new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING); MetaData metaData = MetaData.builder() .persistentSettings(settings(Version.CURRENT).build()) - .putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING)) + .putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata) .put(IndexMetaData.builder(indexMetadata)) .build(); String nodeId = randomAlphaOfLength(10); @@ -118,7 +118,7 @@ public void prepareState() { .metaData(metaData) .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()) .build(); - policyStepsRegistry.update(clusterState, client, () -> 0L); + policyStepsRegistry.update(lifecycleMetadata, client, () -> 0L); } public void testExecuteAllUntilEndOfPolicy() throws IOException { diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java index 248a2b317e562..a29137d357768 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleInitialisationIT.java @@ -7,32 +7,44 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction; -import org.elasticsearch.xpack.core.indexlifecycle.ForceMergeAction; +import org.elasticsearch.xpack.core.indexlifecycle.ClusterStateWaitStep; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy; import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.MockAction; import org.elasticsearch.xpack.core.indexlifecycle.Phase; -import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.Step; +import org.elasticsearch.xpack.core.indexlifecycle.TerminalPolicyStep; import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction; import org.junit.Before; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.elasticsearch.client.Requests.clusterHealthRequest; import static org.elasticsearch.client.Requests.createIndexRequest; @@ -83,7 +95,7 @@ protected Settings transportClientSettings() { @Override protected Collection> nodePlugins() { - return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class); + return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class, TestILMPlugin.class); } @Override @@ -95,17 +107,16 @@ protected Collection> transportClientPlugins() { public void init() { settings = Settings.builder().put(indexSettings()).put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 0).put(LifecycleSettings.LIFECYCLE_NAME, "test").build(); - Map phases = new HashMap<>(); - - Map warmPhaseActions = Collections.singletonMap(ForceMergeAction.NAME, new ForceMergeAction(10000)); - phases.put("warm", new Phase("warm", TimeValue.timeValueSeconds(2), warmPhaseActions)); - - Map deletePhaseActions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction()); - phases.put("delete", new Phase("delete", TimeValue.timeValueSeconds(3), deletePhaseActions)); - lifecyclePolicy = new LifecyclePolicy(TimeseriesLifecycleType.INSTANCE, "test", phases); + List steps = new ArrayList<>(); + Step.StepKey key = new Step.StepKey("mock", ObservableAction.NAME, ObservableClusterStateWaitStep.NAME); + steps.add(new ObservableClusterStateWaitStep(key, TerminalPolicyStep.KEY)); + Map actions = Collections.singletonMap(ObservableAction.NAME, new ObservableAction(steps, true)); + Map phases = Collections.singletonMap("mock", new Phase("mock", TimeValue.timeValueSeconds(0), actions)); + lifecyclePolicy = new LifecyclePolicy(LockableLifecycleType.INSTANCE, "test", phases); } public void testSingleNodeCluster() throws Exception { + settings = Settings.builder().put(settings).put("index.lifecycle.test.complete", true).build(); // start master node logger.info("Starting server1"); final String server_1 = internalCluster().startNode(); @@ -122,13 +133,17 @@ public void testSingleNodeCluster() throws Exception { RoutingNode routingNodeEntry1 = clusterState.getRoutingNodes().node(node1); assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1)); assertBusy(() -> { - assertEquals(false, client().admin().indices().prepareExists("test").get().isExists()); + assertEquals(true, client().admin().indices().prepareExists("test").get().isExists()); + }); + assertBusy(() -> { + GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get(); + String step = settingsResponse.getSetting("test", "index.lifecycle.step"); + assertThat(step, equalTo(TerminalPolicyStep.KEY.getName())); }); } - // NORELEASE re-enable when we re-visit integration testing - @AwaitsFix(bugUrl = "Fails because of timing") public void testMasterDedicatedDataDedicated() throws Exception { + settings = Settings.builder().put(settings).put("index.lifecycle.test.complete", true).build(); // start master node logger.info("Starting sever1"); internalCluster().startMasterOnlyNode(); @@ -151,12 +166,15 @@ public void testMasterDedicatedDataDedicated() throws Exception { assertThat(routingNodeEntry1.numberOfShardsWithState(STARTED), equalTo(1)); assertBusy(() -> { - assertEquals(false, client().admin().indices().prepareExists("test").get().isExists()); + assertEquals(true, client().admin().indices().prepareExists("test").get().isExists()); + }); + assertBusy(() -> { + GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get(); + String step = settingsResponse.getSetting("test", "index.lifecycle.step"); + assertThat(step, equalTo(TerminalPolicyStep.KEY.getName())); }); } - // NORELEASE re-enable when force merge action bug is fixed - @AwaitsFix(bugUrl = "Fails because force merge action expect shards to be assigned") public void testMasterFailover() throws Exception { // start one server logger.info("Starting sever1"); @@ -189,12 +207,33 @@ public void testMasterFailover() throws Exception { assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + // check step in progress in lifecycle + assertBusy(() -> { + GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get(); + String step = settingsResponse.getSetting("test", "index.lifecycle.step"); + assertThat(step, equalTo(ObservableClusterStateWaitStep.NAME)); + }); + + logger.info("Closing server1"); // kill the first server internalCluster().stopCurrentMasterNode(); + // check that index lifecycle picked back up where it assertBusy(() -> { - assertEquals(false, client().admin().indices().prepareExists("test").get().isExists()); + GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get(); + String step = settingsResponse.getSetting("test", "index.lifecycle.step"); + assertThat(step, equalTo(ObservableClusterStateWaitStep.NAME)); + }); + + // complete the step + client().admin().indices().prepareUpdateSettings("test") + .setSettings(Collections.singletonMap("index.lifecycle.test.complete", true)).get(); + + assertBusy(() -> { + GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("test").get(); + String step = settingsResponse.getSetting("test", "index.lifecycle.step"); + assertThat(step, equalTo(TerminalPolicyStep.KEY.getName())); }); } @@ -204,4 +243,86 @@ private String getLocalNodeId(String name) { assertThat(nodeId, not(nullValue())); return nodeId; } + + public static class TestILMPlugin extends Plugin { + public TestILMPlugin() { + } + + public List> getSettings() { + final Setting COMPLETE_SETTING = Setting.boolSetting("index.lifecycle.test.complete", false, + Setting.Property.Dynamic, Setting.Property.IndexScope); + return Collections.singletonList(COMPLETE_SETTING); + } + public List getNamedWriteables() { + return Arrays.asList(new NamedWriteableRegistry.Entry(LifecycleType.class, LockableLifecycleType.TYPE, + (in) -> LockableLifecycleType.INSTANCE), + new NamedWriteableRegistry.Entry(LifecycleAction.class, ObservableAction.NAME, ObservableAction::readObservableAction), + new NamedWriteableRegistry.Entry(ObservableClusterStateWaitStep.class, ObservableClusterStateWaitStep.NAME, + ObservableClusterStateWaitStep::new)); + } + } + + public static class ObservableClusterStateWaitStep extends ClusterStateWaitStep implements NamedWriteable { + public static final String NAME = "observable_cluster_state_action"; + + public ObservableClusterStateWaitStep(StepKey current, StepKey next) { + super(current, next); + } + + public ObservableClusterStateWaitStep(StreamInput in) throws IOException { + this(new StepKey(in.readString(), in.readString(), in.readString()), readOptionalNextStepKey(in)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(getKey().getPhase()); + out.writeString(getKey().getAction()); + out.writeString(getKey().getName()); + boolean hasNextStep = getNextStepKey() != null; + out.writeBoolean(hasNextStep); + if (hasNextStep) { + out.writeString(getNextStepKey().getPhase()); + out.writeString(getNextStepKey().getAction()); + out.writeString(getNextStepKey().getName()); + } + } + + private static StepKey readOptionalNextStepKey(StreamInput in) throws IOException { + if (in.readBoolean()) { + return new StepKey(in.readString(), in.readString(), in.readString()); + } + return null; + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + public Result isConditionMet(Index index, ClusterState clusterState) { + boolean complete = clusterState.metaData().index("test").getSettings() + .getAsBoolean("index.lifecycle.test.complete", false); + return new Result(complete, null); + } + } + + public static class ObservableAction extends MockAction { + + ObservableAction(List steps, boolean safe) { + super(steps, safe); + } + + public static ObservableAction readObservableAction(StreamInput in) throws IOException { + List steps = in.readList(ObservableClusterStateWaitStep::new); + boolean safe = in.readBoolean(); + return new ObservableAction(steps, safe); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(getSteps().stream().map(s -> (ObservableClusterStateWaitStep) s).collect(Collectors.toList())); + out.writeBoolean(isSafeAction()); + } + } } diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java index b577aa6714254..eaf34c0cc4f45 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleMetadataTests.java @@ -42,25 +42,7 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe @Override protected IndexLifecycleMetadata createTestInstance() { - int numPolicies = randomInt(5); - SortedMap policies = new TreeMap<>(); - for (int i = 0; i < numPolicies; i++) { - int numberPhases = randomInt(5); - Map phases = new HashMap<>(numberPhases); - for (int j = 0; j < numberPhases; j++) { - TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after"); - Map actions = Collections.emptyMap(); - if (randomBoolean()) { - actions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction()); - } - String phaseName = randomAlphaOfLength(10); - phases.put(phaseName, new Phase(phaseName, after, actions)); - } - String policyName = randomAlphaOfLength(10); - policies.put(policyName, new LifecyclePolicyMetadata(new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, phases), - Collections.emptyMap())); - } - return new IndexLifecycleMetadata(policies, randomFrom(OperationMode.values())); + return createTestInstance(randomInt(5), randomFrom(OperationMode.values())); } @Override @@ -123,4 +105,24 @@ public void testcontext() { assertEquals(MetaData.ALL_CONTEXTS, createTestInstance().context()); } + public static IndexLifecycleMetadata createTestInstance(int numPolicies, OperationMode mode) { + SortedMap policies = new TreeMap<>(); + for (int i = 0; i < numPolicies; i++) { + int numberPhases = randomInt(5); + Map phases = new HashMap<>(numberPhases); + for (int j = 0; j < numberPhases; j++) { + TimeValue after = TimeValue.parseTimeValue(randomTimeValue(0, 1000000000, "s", "m", "h", "d"), "test_after"); + Map actions = Collections.emptyMap(); + if (randomBoolean()) { + actions = Collections.singletonMap(DeleteAction.NAME, new DeleteAction()); + } + String phaseName = randomAlphaOfLength(10); + phases.put(phaseName, new Phase(phaseName, after, actions)); + } + String policyName = randomAlphaOfLength(10); + policies.put(policyName, new LifecyclePolicyMetadata(new LifecyclePolicy(TestLifecycleType.INSTANCE, policyName, phases), + Collections.emptyMap())); + } + return new IndexLifecycleMetadata(policies, mode); + } } diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java index 73228fbec76bc..8d3eca2322ff8 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycleServiceTests.java @@ -44,6 +44,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.Collections; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ExecutorService; @@ -127,11 +128,48 @@ public void testOnlyChangesStateOnMasterAndMetadataExists() { assertNull(indexLifecycleService.getScheduler()); } - public void testElectUnElectMaster() throws Exception { + public void testOnlyChangesStateOnMasterWhenMetadataChanges() { + int numPolicies = randomIntBetween(1, 5); + IndexLifecycleMetadata lifecycleMetadata = IndexLifecycleMetadataTests.createTestInstance(numPolicies, OperationMode.RUNNING); + IndexLifecycleMetadata newLifecycleMetadata = randomValueOtherThan(lifecycleMetadata, + () -> IndexLifecycleMetadataTests.createTestInstance(numPolicies, OperationMode.RUNNING)); + MetaData previousMetadata = MetaData.builder() + .persistentSettings(settings(Version.CURRENT) + .put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3)).build()) + .putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata) + .build(); + MetaData newMetaData = MetaData.builder(previousMetadata).putCustom(IndexLifecycleMetadata.TYPE, newLifecycleMetadata).build(); + + ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(previousMetadata) + .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()) + .build(); + ClusterState newState = ClusterState.builder(previousState).metaData(newMetaData).build(); + ClusterChangedEvent event = new ClusterChangedEvent("_source", previousState, previousState); + + Mockito.reset(clusterService); + PolicyStepsRegistry policyStepsRegistry = indexLifecycleService.getPolicyRegistry(); + indexLifecycleService.applyClusterState(event); + indexLifecycleService.clusterChanged(event); + Mockito.verifyZeroInteractions(clusterService); + assertNotNull(indexLifecycleService.getScheduler()); + assertEquals(1, indexLifecycleService.getScheduler().jobCount()); + assertNotNull(indexLifecycleService.getScheduledJob()); + assertThat(policyStepsRegistry.getLifecyclePolicyMap().keySet(), equalTo(lifecycleMetadata.getPolicyMetadatas().keySet())); + + event = new ClusterChangedEvent("_source", newState, previousState); + indexLifecycleService.applyClusterState(event); + assertThat(policyStepsRegistry.getLifecyclePolicyMap().keySet(), equalTo(newLifecycleMetadata.getPolicyMetadatas().keySet())); + } + + public void testElectUnElectMaster() { + int numberOfPolicies = randomIntBetween(1, 5); + IndexLifecycleMetadata lifecycleMetadata = IndexLifecycleMetadataTests.createTestInstance(numberOfPolicies, OperationMode.RUNNING); + Map expectedPolicyMap = lifecycleMetadata.getPolicyMetadatas(); MetaData metaData = MetaData.builder() .persistentSettings(settings(Version.CURRENT) .put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(3)).build()) - .putCustom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY) + .putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata) .build(); // First check that when the node has never been master the scheduler @@ -158,13 +196,14 @@ public void testElectUnElectMaster() throws Exception { event = new ClusterChangedEvent("_source", state, state); // Check that when the node is first elected as master it sets up - // the scheduler and job + // the scheduler job and steps registry indexLifecycleService.applyClusterState(event); indexLifecycleService.clusterChanged(event); Mockito.verifyZeroInteractions(clusterService); assertNotNull(indexLifecycleService.getScheduler()); assertEquals(1, indexLifecycleService.getScheduler().jobCount()); assertNotNull(indexLifecycleService.getScheduledJob()); + assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap)); Mockito.reset(clusterService); state = ClusterState.builder(ClusterName.DEFAULT) @@ -173,13 +212,14 @@ public void testElectUnElectMaster() throws Exception { .build(); event = new ClusterChangedEvent("_source", state, state); - // Check that when the node is un-elected as master it cancels the job indexLifecycleService.applyClusterState(event); + // Check that when the node is un-elected as master it cancels the job and cleans up steps registry indexLifecycleService.clusterChanged(event); Mockito.verifyZeroInteractions(clusterService); assertNotNull(indexLifecycleService.getScheduler()); assertEquals(0, indexLifecycleService.getScheduler().jobCount()); assertNull(indexLifecycleService.getScheduledJob()); + assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap)); Mockito.reset(clusterService); state = ClusterState.builder(ClusterName.DEFAULT) @@ -188,13 +228,14 @@ public void testElectUnElectMaster() throws Exception { .build(); event = new ClusterChangedEvent("_source", state, state); - // Check that when the node is re-elected as master it cancels the job + // Check that when the node is re-elected as master it re-starts the job and populates the registry indexLifecycleService.applyClusterState(event); indexLifecycleService.clusterChanged(event); Mockito.verifyZeroInteractions(clusterService); assertNotNull(indexLifecycleService.getScheduler()); assertEquals(1, indexLifecycleService.getScheduler().jobCount()); assertNotNull(indexLifecycleService.getScheduledJob()); + assertThat(indexLifecycleService.getPolicyRegistry().getLifecyclePolicyMap(), equalTo(expectedPolicyMap)); } public void testSchedulerInitializationAndUpdate() { diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LockableLifecycleType.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LockableLifecycleType.java new file mode 100644 index 0000000000000..3e09133c435a8 --- /dev/null +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/LockableLifecycleType.java @@ -0,0 +1,64 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.indexlifecycle; + +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction; +import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType; +import org.elasticsearch.xpack.core.indexlifecycle.Phase; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * This {@link LifecycleType} is used for encapsulating test policies + * used in integration tests where the underlying {@link LifecycleAction}s are + * able to communicate with the test + */ +public class LockableLifecycleType implements LifecycleType { + public static final String TYPE = "lockable"; + public static final LockableLifecycleType INSTANCE = new LockableLifecycleType(); + + @Override + public List getOrderedPhases(Map phases) { + return new ArrayList<>(phases.values()); + } + + @Override + public String getNextPhaseName(String currentPhaseName, Map phases) { + return null; + } + + @Override + public String getPreviousPhaseName(String currentPhaseName, Map phases) { + return null; + } + + @Override + public List getOrderedActions(Phase phase) { + return new ArrayList<>(phase.getActions().values()); + } + + @Override + public String getNextActionName(String currentActionName, Phase phase) { + return null; + } + + @Override + public void validate(Collection phases) { + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public void writeTo(StreamOutput out) { + } +} diff --git a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistryTests.java b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistryTests.java index 89d447e737259..b8ac644eb7380 100644 --- a/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistryTests.java +++ b/x-pack/plugin/index-lifecycle/src/test/java/org/elasticsearch/xpack/indexlifecycle/PolicyStepsRegistryTests.java @@ -105,9 +105,10 @@ public void testUpdateFromNothingToSomethingToNothing() { } Map policyMap = Collections.singletonMap(newPolicy.getName(), new LifecyclePolicyMetadata(newPolicy, headers)); + IndexLifecycleMetadata lifecycleMetadata = new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING); MetaData metaData = MetaData.builder() .persistentSettings(settings(Version.CURRENT).build()) - .putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING)) + .putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata) .build(); String nodeId = randomAlphaOfLength(10); DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT) @@ -122,7 +123,7 @@ public void testUpdateFromNothingToSomethingToNothing() { PolicyStepsRegistry registry = new PolicyStepsRegistry(); // add new policy - registry.update(currentState, client, () -> 0L); + registry.update(lifecycleMetadata, client, () -> 0L); assertThat(registry.getFirstStep(newPolicy.getName()), equalTo(policySteps.get(0))); assertThat(registry.getLifecyclePolicyMap().size(), equalTo(1)); @@ -140,18 +141,18 @@ public void testUpdateFromNothingToSomethingToNothing() { Map registryPolicyMap = registry.getLifecyclePolicyMap(); Map registryFirstStepMap = registry.getFirstStepMap(); Map> registryStepMap = registry.getStepMap(); - registry.update(currentState, client, () -> 0L); + registry.update(lifecycleMetadata, client, () -> 0L); assertThat(registry.getLifecyclePolicyMap(), equalTo(registryPolicyMap)); assertThat(registry.getFirstStepMap(), equalTo(registryFirstStepMap)); assertThat(registry.getStepMap(), equalTo(registryStepMap)); // remove policy + lifecycleMetadata = new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING); currentState = ClusterState.builder(currentState) .metaData( MetaData.builder(metaData) - .putCustom(IndexLifecycleMetadata.TYPE, - new IndexLifecycleMetadata(Collections.emptyMap(), OperationMode.RUNNING))).build(); - registry.update(currentState, client, () -> 0L); + .putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)).build(); + registry.update(lifecycleMetadata, client, () -> 0L); assertTrue(registry.getLifecyclePolicyMap().isEmpty()); assertTrue(registry.getFirstStepMap().isEmpty()); assertTrue(registry.getStepMap().isEmpty()); @@ -169,9 +170,10 @@ public void testUpdateChangedPolicy() { } Map policyMap = Collections.singletonMap(newPolicy.getName(), new LifecyclePolicyMetadata(newPolicy, headers)); + IndexLifecycleMetadata lifecycleMetadata = new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING); MetaData metaData = MetaData.builder() .persistentSettings(settings(Version.CURRENT).build()) - .putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING)) + .putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata) .build(); String nodeId = randomAlphaOfLength(10); DiscoveryNode masterNode = DiscoveryNode.createLocal(settings(Version.CURRENT) @@ -183,18 +185,15 @@ public void testUpdateChangedPolicy() { .build(); PolicyStepsRegistry registry = new PolicyStepsRegistry(); // add new policy - registry.update(currentState, client, () -> 0L); + registry.update(lifecycleMetadata, client, () -> 0L); // swap out policy newPolicy = LifecyclePolicyTests.randomLifecyclePolicy(policyName); + lifecycleMetadata = new IndexLifecycleMetadata(Collections.singletonMap(policyName, + new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap())), OperationMode.RUNNING); currentState = ClusterState.builder(currentState) - .metaData( - MetaData.builder(metaData) - .putCustom(IndexLifecycleMetadata.TYPE, - new IndexLifecycleMetadata(Collections.singletonMap(policyName, - new LifecyclePolicyMetadata(newPolicy, Collections.emptyMap())), OperationMode.RUNNING))) - .build(); - registry.update(currentState, client, () -> 0L); + .metaData(MetaData.builder(metaData).putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata)).build(); + registry.update(lifecycleMetadata, client, () -> 0L); // TODO(talevy): assert changes... right now we do not support updates to policies. will require internal cleanup } }