From 3c90f950ecd5fb7cde1327ad4a679169c2f37e11 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 24 Mar 2022 14:45:52 -0400 Subject: [PATCH 1/5] Make this message a bit more helpful --- .../java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java index a60868a6777eb..fcc21948a8494 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java @@ -370,7 +370,8 @@ public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKe cachedSteps.put(indexMetadata.getIndex(), Tuple.tuple(indexMetadata, s)); // assert that the cache works as expected -- that is, if we put something into the cache, // we should get back the same thing if we were to invoke getStep again with the same arguments - assert s == getCachedStep(indexMetadata, stepKey) : "policy step registry cache failed sanity check"; + Step found = getCachedStep(indexMetadata, stepKey); + assert s == found : "policy step registry cache failed sanity check (expected [" + s + "], found [" + found + "])"; } return s; } From af7fb2b7608f31e2a243f51abfc42bf1bd552fdf Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 24 Mar 2022 14:53:01 -0400 Subject: [PATCH 2/5] Add a multithreaded stress test of the steps cache --- .../xpack/ilm/PolicyStepsRegistryTests.java | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java index 1eb5dbabf8be2..f5a59298f1198 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java @@ -50,6 +50,8 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.cluster.metadata.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY; import static org.hamcrest.Matchers.containsString; @@ -465,4 +467,71 @@ public void testUpdatePolicyButNoPhaseChangeIndexStepsDontChange() throws Except assertThat(((ShrinkStep) shrinkStep).getNumberOfShards(), equalTo(2)); assertThat(((ShrinkStep) gotStep).getNumberOfShards(), equalTo(1)); } + + public void testGetStepMultithreaded() throws Exception { + Client client = mock(Client.class); + Mockito.when(client.settings()).thenReturn(Settings.EMPTY); + + LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases("policy"); + String phaseName = randomFrom(policy.getPhases().keySet()); + Phase phase = policy.getPhases().get(phaseName); + + LifecycleExecutionState lifecycleState = LifecycleExecutionState.builder() + .setPhaseDefinition(Strings.toString(new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong()))) + .build(); + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put(LifecycleSettings.LIFECYCLE_NAME, "policy") + .build() + ) + .putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.asMap()) + .build(); + + SortedMap metas = new TreeMap<>(); + metas.put("policy", new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong())); + IndexLifecycleMetadata meta = new IndexLifecycleMetadata(metas, OperationMode.RUNNING); + + PolicyStepsRegistry registry = new PolicyStepsRegistry(REGISTRY, client, null); + registry.update(meta); + + // test a variety of getStep calls with random actions and steps + for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) { + LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values())); + Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null)); + Step actualStep = registry.getStep(indexMetadata, step.getKey()); + assertThat(actualStep.getKey(), equalTo(step.getKey())); + } + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean done = new AtomicBoolean(false); + + // now, in another thread, update the registry repeatedly as fast as possible. + // updating the registry has the side effect of clearing the cache. + new Thread(() -> { + latch.countDown(); // signal that we're starting + while (done.get() == false) { + registry.update(meta); + } + }).start(); + + try { + latch.await(); // wait until the other thread started + + // and, while the cache is being repeatedly cleared, + // test a variety of getStep calls with random actions and steps + for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) { + LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values())); + Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null)); + Step actualStep = registry.getStep(indexMetadata, step.getKey()); + assertThat(actualStep.getKey(), equalTo(step.getKey())); + } + } finally { + // tell the other thread we're finished + done.set(true); + } + } } From a2fda5a86f665d193f9c1a0fa64f34abc1ab1cce Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 24 Mar 2022 15:46:57 -0400 Subject: [PATCH 3/5] Drop this assert It doesn't work as written because it expects the cachedSteps map to be the same between the call to the put and the call to getCachedStep -- but of course another thread could have put a new policy, which results in PolicyStepsRegistry#update being called and the cache being cleared (in which case the second call returns null). --- .../java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java index fcc21948a8494..1f5c021755fe4 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistry.java @@ -368,10 +368,6 @@ public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKe final Step s = phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null); if (s != null) { cachedSteps.put(indexMetadata.getIndex(), Tuple.tuple(indexMetadata, s)); - // assert that the cache works as expected -- that is, if we put something into the cache, - // we should get back the same thing if we were to invoke getStep again with the same arguments - Step found = getCachedStep(indexMetadata, stepKey); - assert s == found : "policy step registry cache failed sanity check (expected [" + s + "], found [" + found + "])"; } return s; } From 0b4e18c247a3dc8c93f85957df7cb3eb58ad7189 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 24 Mar 2022 16:53:42 -0400 Subject: [PATCH 4/5] Wait 1s for the background thread to die --- .../elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java index f5a59298f1198..1fbd5540f533e 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java @@ -511,12 +511,13 @@ public void testGetStepMultithreaded() throws Exception { // now, in another thread, update the registry repeatedly as fast as possible. // updating the registry has the side effect of clearing the cache. - new Thread(() -> { + Thread t = new Thread(() -> { latch.countDown(); // signal that we're starting while (done.get() == false) { registry.update(meta); } - }).start(); + }); + t.start(); try { latch.await(); // wait until the other thread started @@ -530,8 +531,9 @@ public void testGetStepMultithreaded() throws Exception { assertThat(actualStep.getKey(), equalTo(step.getKey())); } } finally { - // tell the other thread we're finished + // tell the other thread we're finished and wait for it to die done.set(true); + t.join(1000); } } } From ae5df8433e045be8cde097dd851274bd04d605b1 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Mon, 28 Mar 2022 09:26:20 -0400 Subject: [PATCH 5/5] Add a comment --- .../org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java index 1fbd5540f533e..fb16414c5e752 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java @@ -502,6 +502,9 @@ public void testGetStepMultithreaded() throws Exception { for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) { LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values())); Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null)); + // if the step's key is different from the previous iteration of the loop, then the cache will be updated, and we'll + // get a non-cached response. if the step's key happens to be the same as the previous iteration of the loop, then + // we'll get a cached response. so this loop randomly tests both cached and non-cached responses. Step actualStep = registry.getStep(indexMetadata, step.getKey()); assertThat(actualStep.getKey(), equalTo(step.getKey())); }