From d3e77d4e3015b1d6c236fc11cb93a115791e42df Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 15 Jun 2017 12:00:41 -0400 Subject: [PATCH 1/5] TemplateUpgraders should be called during rolling restart In #24379 we added ability to upgrade templates on full cluster startup. This PR invokes the same update procedure also when a new node first joins the cluster allowing to update templates on a full cluster restart. Closes #24680 --- .../delete/DeleteIndexTemplateResponse.java | 2 +- .../metadata/IndexTemplateMetaData.java | 9 +- .../metadata/TemplateUpgradeService.java | 218 ++++++++++++ .../java/org/elasticsearch/node/Node.java | 4 + .../metadata/TemplateUpgradeServiceIT.java | 161 +++++++++ .../metadata/TemplateUpgradeServiceTests.java | 330 ++++++++++++++++++ 6 files changed, 722 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java create mode 100644 core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java index 5c2a2b166bc3c..9519f0f9fcf4f 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/template/delete/DeleteIndexTemplateResponse.java @@ -32,7 +32,7 @@ public class DeleteIndexTemplateResponse extends AcknowledgedResponse { DeleteIndexTemplateResponse() { } - DeleteIndexTemplateResponse(boolean acknowledged) { + protected DeleteIndexTemplateResponse(boolean acknowledged) { super(acknowledged); } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java index b22106d97107a..838987224c3ac 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java @@ -387,6 +387,14 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont throws IOException { builder.startObject(indexTemplateMetaData.name()); + toInnerXContent(indexTemplateMetaData, builder, params); + + builder.endObject(); + } + + public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder.field("order", indexTemplateMetaData.order()); if (indexTemplateMetaData.version() != null) { builder.field("version", indexTemplateMetaData.version()); @@ -431,7 +439,6 @@ public static void toXContent(IndexTemplateMetaData indexTemplateMetaData, XCont } builder.endObject(); - builder.endObject(); } public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java new file mode 100644 index 0000000000000..496d5726dbc96 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java @@ -0,0 +1,218 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.indices.IndexTemplateMissingException; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.UnaryOperator; + +import static java.util.Collections.singletonMap; + +/** + * Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster + */ +public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener { + private final Tuple, Set> EMPTY = new Tuple<>(Collections.emptyMap(), Collections.emptySet()); + + private final UnaryOperator> indexTemplateMetaDataUpgraders; + + public final ClusterService clusterService; + + public final ThreadPool threadPool; + + public final Client client; + + private final AtomicInteger updatesInProgress = new AtomicInteger(); + + private ImmutableOpenMap lastTemplateMetaData; + + public TemplateUpgradeService(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool, + Collection>> indexTemplateMetaDataUpgraders) { + super(settings); + this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; + this.indexTemplateMetaDataUpgraders = templates -> { + Map upgradedTemplates = new HashMap<>(templates); + for (UnaryOperator> upgrader : indexTemplateMetaDataUpgraders) { + upgradedTemplates = upgrader.apply(upgradedTemplates); + } + return upgradedTemplates; + }; + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + ClusterState state = event.state(); + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // wait until the gateway has recovered from disk, otherwise we think may not have the index templates, + // while they actually do exist + return; + } + + if (updatesInProgress.get() > 0) { + // we are already running some updates - skip this cluster state update + return; + } + + ImmutableOpenMap templates = state.getMetaData().getTemplates(); + + if (templates == lastTemplateMetaData) { + // we already checked these sets of templates - no reason to check it again + // we can do identity check here because due to cluster state diffs the actual map will not change + // if there were no changes + return; + } + + lastTemplateMetaData = templates; + Tuple, Set> changes = calculateTemplateChanges(templates); + if (changes.v1().isEmpty() && changes.v2().isEmpty()) { + // no changes required + return; + } + + if (updatesInProgress.compareAndSet(0, changes.v1().size() + changes.v2().size())) { + threadPool.generic().execute(() -> updateTemplates(changes.v1(), changes.v2())); + } + } + + void updateTemplates(Map changes, Set deletions) { + for (Map.Entry change : changes.entrySet()) { + PutIndexTemplateRequest request = new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + client.admin().indices().putTemplate(request, new ActionListener() { + @Override + public void onResponse(PutIndexTemplateResponse response) { + updatesInProgress.decrementAndGet(); + if (response.isAcknowledged() == false) { + logger.warn("Error updating template [{}], request was not acknowledged", change.getKey()); + } + } + + @Override + public void onFailure(Exception e) { + updatesInProgress.decrementAndGet(); + logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e); + } + }); + } + + for (String template : deletions) { + DeleteIndexTemplateRequest request = new DeleteIndexTemplateRequest(template); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + client.admin().indices().deleteTemplate(request, new ActionListener() { + @Override + public void onResponse(DeleteIndexTemplateResponse response) { + updatesInProgress.decrementAndGet(); + if (response.isAcknowledged() == false) { + logger.warn("Error deleting template [{}], request was not acknowledged", template); + } + } + + @Override + public void onFailure(Exception e) { + updatesInProgress.decrementAndGet(); + if (e instanceof IndexTemplateMissingException == false) { + // we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist + // otherwise we need to warn + logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e); + } + } + }); + } + } + + boolean hasUpdatesInProgress() { + return updatesInProgress.get() > 0; + } + + Tuple, Set> calculateTemplateChanges(ImmutableOpenMap templates) { + // collect current templates + Map existingMap = new HashMap<>(); + for (ObjectObjectCursor customCursor : templates) { + existingMap.put(customCursor.key, customCursor.value); + } + // upgrade global custom meta data + Map upgradedMap = indexTemplateMetaDataUpgraders.apply(existingMap); + if (upgradedMap.equals(existingMap) == false) { + Set deletes = new HashSet<>(); + Map changes = new HashMap<>(); + // remove templates if needed + existingMap.keySet().forEach(s -> { + if (upgradedMap.containsKey(s) == false) { + deletes.add(s); + } + }); + upgradedMap.forEach((key, value) -> { + if (value.equals(existingMap.get(key)) == false) { + changes.put(key, toBytesReference(value)); + } + }); + return new Tuple<>(changes, deletes); + } + return EMPTY; + } + + private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true")); + + private BytesReference toBytesReference(IndexTemplateMetaData templateMetaData) { + try { + return XContentHelper.toXContent((builder, params) -> { + IndexTemplateMetaData.Builder.toInnerXContent(templateMetaData, builder, params); + return builder; + }, XContentType.JSON, PARAMS, false); + } catch (IOException ex) { + throw new IllegalStateException("Cannot serialize template [" + templateMetaData.getName() + "]", ex); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 13c829844e197..9070b1b50ece3 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -47,6 +47,7 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; +import org.elasticsearch.cluster.metadata.TemplateUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.service.ClusterService; @@ -415,6 +416,8 @@ protected Node(final Environment environment, Collection Collection> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream() .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList()); final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders); + final TemplateUpgradeService templateUpgradeService = new TemplateUpgradeService(settings, client, clusterService, threadPool, + indexTemplateMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); @@ -460,6 +463,7 @@ protected Node(final Environment environment, Collection b.bind(UsageService.class).toInstance(usageService); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader); + b.bind(TemplateUpgradeService.class).toInstance(templateUpgradeService); b.bind(MetaStateService.class).toInstance(metaStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService, diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java new file mode 100644 index 0000000000000..b4ef32caafbfa --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java @@ -0,0 +1,161 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.function.UnaryOperator; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class TemplateUpgradeServiceIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(TestPlugin.class); + } + + public static class TestPlugin extends Plugin { + static final Setting UPDATE_TEMPLATE_ENABLED = + Setting.boolSetting("tests.update_template_enabled", false, Setting.Property.NodeScope); + + protected final Logger logger; + protected final Settings settings; + + public TestPlugin(Settings settings) { + this.logger = Loggers.getLogger(getClass(), settings); + this.settings = settings; + } + + @Override + public UnaryOperator> getIndexTemplateMetaDataUpgrader() { + return templates -> { + if (UPDATE_TEMPLATE_ENABLED.get(settings)) { + templates.put("test_added_template", IndexTemplateMetaData.builder("test_added_template") + .patterns(Collections.singletonList("*")).build()); + templates.remove("test_removed_template"); + templates.put("test_changed_template", IndexTemplateMetaData.builder("test_changed_template").order(10) + .patterns(Collections.singletonList("*")).build()); + } + return templates; + }; + } + + @Override + public List> getSettings() { + return Collections.singletonList(UPDATE_TEMPLATE_ENABLED); + } + } + + + public void testTemplateUpdate() throws Exception { + assertThat(client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates(), empty()); + assertAcked(client().admin().indices().preparePutTemplate("test_dummy_template").setOrder(0) + .setPatterns(Collections.singletonList("*")).get()); + assertAcked(client().admin().indices().preparePutTemplate("test_changed_template").setOrder(0) + .setPatterns(Collections.singletonList("*")).get()); + assertAcked(client().admin().indices().preparePutTemplate("test_removed_template").setOrder(1) + .setPatterns(Collections.singletonList("*")).get()); + String upgradeNode = internalCluster().startNode(Settings.builder() + .put(nodeSettings(0)).put(TestPlugin.UPDATE_TEMPLATE_ENABLED.getKey(), true)); + // Waiting for the templates to be updated by the newly added node + + assertBusy(() -> { + List templates = client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates(); + assertThat(templates.size(), equalTo(3)); + boolean addedFound = false; + boolean changedFound = false; + boolean dummyFound = false; + for (int i = 0; i < 3; i++) { + IndexTemplateMetaData templateMetaData = templates.get(i); + switch (templateMetaData.getName()) { + case "test_added_template": + assertFalse(addedFound); + addedFound = true; + break; + case "test_changed_template": + assertFalse(changedFound); + changedFound = true; + assertThat(templateMetaData.getOrder(), equalTo(10)); + break; + case "test_dummy_template": + assertFalse(dummyFound); + dummyFound = true; + break; + default: + fail("unexpected template " + templateMetaData.getName()); + break; + } + } + + assertTrue(addedFound); + assertTrue(changedFound); + assertTrue(dummyFound); + }); + + // Wait for upgradeNode to finish all updates + TemplateUpgradeService service = internalCluster().getInstance(TemplateUpgradeService.class, upgradeNode); + assertBusy(() -> assertFalse(service.hasUpdatesInProgress())); + + // Wipe out all templates + assertAcked(client().admin().indices().prepareDeleteTemplate("test_*").get()); + + // Make sure all templates are recreated correctly + assertBusy(() -> { + List templates = client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates(); + assertThat(templates.size(), equalTo(2)); + boolean addedFound = false; + boolean changedFound = false; + for (int i = 0; i < 2; i++) { + IndexTemplateMetaData templateMetaData = templates.get(i); + switch (templateMetaData.getName()) { + case "test_added_template": + assertFalse(addedFound); + addedFound = true; + break; + case "test_changed_template": + assertFalse(changedFound); + changedFound = true; + assertThat(templateMetaData.getOrder(), equalTo(10)); + break; + default: + fail("unexpected template " + templateMetaData.getName()); + break; + } + } + + assertTrue(addedFound); + assertTrue(changedFound); + }); + } + +} diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java new file mode 100644 index 0000000000000..75d3f83eaba74 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java @@ -0,0 +1,330 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.metadata; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class TemplateUpgradeServiceTests extends ESTestCase { + + private ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings + .BUILT_IN_CLUSTER_SETTINGS), null); + + public void testCalculateChangesAddChangeAndDelete() { + + boolean shouldAdd = randomBoolean(); + boolean shouldRemove = randomBoolean(); + boolean shouldChange = randomBoolean(); + + MetaData metaData = randomMetaData( + IndexTemplateMetaData.builder("user_template").build(), + IndexTemplateMetaData.builder("removed_test_template").build(), + IndexTemplateMetaData.builder("changed_test_template").build() + ); + + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, + Arrays.asList( + templates -> { + if (shouldAdd) { + assertNull(templates.put("added_test_template", IndexTemplateMetaData.builder("added_test_template").build())); + } + return templates; + }, + templates -> { + if (shouldRemove) { + assertNotNull(templates.remove("removed_test_template")); + } + return templates; + }, + templates -> { + if (shouldChange) { + assertNotNull(templates.put("changed_test_template", + IndexTemplateMetaData.builder("changed_test_template").order(10).build())); + } + return templates; + } + )); + + Tuple, Set> changes = service.calculateTemplateChanges(metaData.templates()); + + if (shouldAdd) { + assertThat(changes.v1().get("added_test_template"), notNullValue()); + if (shouldChange) { + assertThat(changes.v1().keySet(), hasSize(2)); + assertThat(changes.v1().get("changed_test_template"), notNullValue()); + } else { + assertThat(changes.v1().keySet(), hasSize(1)); + } + } else { + if (shouldChange) { + assertThat(changes.v1().get("changed_test_template"), notNullValue()); + assertThat(changes.v1().keySet(), hasSize(1)); + } else { + assertThat(changes.v1().keySet(), empty()); + } + } + + if (shouldRemove) { + assertThat(changes.v2(), hasSize(1)); + assertThat(changes.v2().contains("removed_test_template"), equalTo(true)); + } else { + assertThat(changes.v2(), empty()); + } + } + + @SuppressWarnings("unchecked") + public void testUpdateTemplates() { + int additionsCount = randomIntBetween(0, 5); + int deletionsCount = randomIntBetween(0, 3); + + List> putTemplateListeners = new ArrayList<>(); + List> deleteTemplateListeners = new ArrayList<>(); + + Client mockClient = mock(Client.class); + AdminClient mockAdminClient = mock(AdminClient.class); + IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class); + when(mockClient.admin()).thenReturn(mockAdminClient); + when(mockAdminClient.indices()).thenReturn(mockIndicesAdminClient); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 2; + PutIndexTemplateRequest request = (PutIndexTemplateRequest) args[0]; + assertThat(request.name(), equalTo("add_template_" + request.order())); + putTemplateListeners.add((ActionListener) args[1]); + return null; + }).when(mockIndicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class)); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 2; + DeleteIndexTemplateRequest request = (DeleteIndexTemplateRequest) args[0]; + assertThat(request.name(), startsWith("remove_template_")); + deleteTemplateListeners.add((ActionListener) args[1]); + return null; + }).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class)); + + + Set deletions = new HashSet<>(deletionsCount); + for (int i = 0; i < deletionsCount; i++) { + deletions.add("remove_template_" + i); + } + Map additions = new HashMap<>(additionsCount); + for (int i = 0; i < additionsCount; i++) { + additions.put("add_template_" + i, new BytesArray("{\"index_patterns\" : \"*\", \"order\" : " + i + "}")); + } + + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, null, + Collections.emptyList()); + + service.updateTemplates(additions, deletions); + + for (int i = 0; i < additionsCount; i++) { + if (randomBoolean()) { + putTemplateListeners.get(i).onFailure(new RuntimeException()); + } else { + putTemplateListeners.get(i).onResponse(new PutIndexTemplateResponse(randomBoolean()) { + + }); + } + } + + for (int i = 0; i < deletionsCount; i++) { + if (randomBoolean()) { + deleteTemplateListeners.get(i).onFailure(new RuntimeException()); + } else { + deleteTemplateListeners.get(i).onResponse(new DeleteIndexTemplateResponse(randomBoolean()) { + + }); + } + } + } + + @SuppressWarnings("unchecked") + public void testClusterStateUpdate() { + + AtomicReference> addedListener = new AtomicReference<>(); + AtomicReference> changedListener = new AtomicReference<>(); + AtomicReference> removedListener = new AtomicReference<>(); + AtomicInteger updateInvocation = new AtomicInteger(); + + MetaData metaData = randomMetaData( + IndexTemplateMetaData.builder("user_template").build(), + IndexTemplateMetaData.builder("removed_test_template").build(), + IndexTemplateMetaData.builder("changed_test_template").build() + ); + + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + when(threadPool.generic()).thenReturn(executorService); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 1; + Runnable runnable = (Runnable) args[0]; + runnable.run(); + updateInvocation.incrementAndGet(); + return null; + }).when(executorService).execute(any(Runnable.class)); + + Client mockClient = mock(Client.class); + AdminClient mockAdminClient = mock(AdminClient.class); + IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class); + when(mockClient.admin()).thenReturn(mockAdminClient); + when(mockAdminClient.indices()).thenReturn(mockIndicesAdminClient); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 2; + PutIndexTemplateRequest request = (PutIndexTemplateRequest) args[0]; + if (request.name().equals("added_test_template")) { + assertThat(addedListener.getAndSet((ActionListener) args[1]), nullValue()); + } else if (request.name().equals("changed_test_template")) { + assertThat(changedListener.getAndSet((ActionListener) args[1]), nullValue()); + } else { + fail("unexpected put template call for " + request.name()); + } + return null; + }).when(mockIndicesAdminClient).putTemplate(any(PutIndexTemplateRequest.class), any(ActionListener.class)); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + assert args.length == 2; + DeleteIndexTemplateRequest request = (DeleteIndexTemplateRequest) args[0]; + assertThat(request.name(), startsWith("removed_test_template")); + assertThat(removedListener.getAndSet((ActionListener) args[1]), nullValue()); + return null; + }).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class)); + + + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool, + Arrays.asList( + templates -> { + assertNull(templates.put("added_test_template", IndexTemplateMetaData.builder("added_test_template") + .patterns(Collections.singletonList("*")).build())); + return templates; + }, + templates -> { + assertNotNull(templates.remove("removed_test_template")); + return templates; + }, + templates -> { + assertNotNull(templates.put("changed_test_template", IndexTemplateMetaData.builder("changed_test_template") + .patterns(Collections.singletonList("*")).order(10).build())); + return templates; + } + )); + + ClusterState prevState = ClusterState.EMPTY_STATE; + ClusterState state = ClusterState.builder(prevState).metaData(metaData).build(); + service.clusterChanged(new ClusterChangedEvent("test", state, prevState)); + + assertThat(updateInvocation.get(), equalTo(1)); + assertThat(addedListener.get(), notNullValue()); + assertThat(changedListener.get(), notNullValue()); + assertThat(removedListener.get(), notNullValue()); + + prevState = state; + state = ClusterState.builder(prevState).metaData(MetaData.builder(state.metaData()).removeTemplate("user_template")).build(); + service.clusterChanged(new ClusterChangedEvent("test 2", state, prevState)); + + // Make sure that update wasn't invoked since we are still running + assertThat(updateInvocation.get(), equalTo(1)); + + addedListener.getAndSet(null).onResponse(new PutIndexTemplateResponse(true) { + }); + changedListener.getAndSet(null).onResponse(new PutIndexTemplateResponse(true) { + }); + removedListener.getAndSet(null).onResponse(new DeleteIndexTemplateResponse(true) { + }); + + service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState)); + + // Make sure that update was called this time since we are no longer running + assertThat(updateInvocation.get(), equalTo(2)); + + addedListener.getAndSet(null).onFailure(new RuntimeException()); + changedListener.getAndSet(null).onFailure(new RuntimeException()); + removedListener.getAndSet(null).onFailure(new RuntimeException()); + + service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState)); + + // Make sure that update wasn't called this time since the index template metadata didn't change + assertThat(updateInvocation.get(), equalTo(2)); + + } + + public static MetaData randomMetaData(IndexTemplateMetaData... templates) { + MetaData.Builder builder = MetaData.builder(); + for (IndexTemplateMetaData template : templates) { + builder.put(template); + } + for (int i = 0; i < randomIntBetween(1, 5); i++) { + builder.put( + IndexMetaData.builder(randomAlphaOfLength(10)) + .settings(settings(Version.CURRENT)) + .numberOfReplicas(randomIntBetween(0, 3)) + .numberOfShards(randomIntBetween(1, 5)) + ); + } + return builder.build(); + } + +} From 8d141d0aa40a4bfac9dc46a2a46532a3f4ebf009 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 15 Jun 2017 15:57:21 -0400 Subject: [PATCH 2/5] Remove TemplateUpgradeService from guice --- .../metadata/TemplateUpgradeService.java | 4 -- .../java/org/elasticsearch/node/Node.java | 4 +- .../metadata/TemplateUpgradeServiceIT.java | 39 +++++++++++++++---- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java index 496d5726dbc96..547ab9b528997 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java @@ -172,10 +172,6 @@ public void onFailure(Exception e) { } } - boolean hasUpdatesInProgress() { - return updatesInProgress.get() > 0; - } - Tuple, Set> calculateTemplateChanges(ImmutableOpenMap templates) { // collect current templates Map existingMap = new HashMap<>(); diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 9070b1b50ece3..0945d58e4538a 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -416,8 +416,7 @@ protected Node(final Environment environment, Collection Collection> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream() .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList()); final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders); - final TemplateUpgradeService templateUpgradeService = new TemplateUpgradeService(settings, client, clusterService, threadPool, - indexTemplateMetaDataUpgraders); + new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); @@ -463,7 +462,6 @@ protected Node(final Environment environment, Collection b.bind(UsageService.class).toInstance(usageService); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader); - b.bind(TemplateUpgradeService.class).toInstance(templateUpgradeService); b.bind(MetaStateService.class).toInstance(metaStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService, diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java index b4ef32caafbfa..695eadb901393 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java @@ -20,16 +20,24 @@ package org.elasticsearch.cluster.metadata; import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.UnaryOperator; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -48,6 +56,10 @@ public static class TestPlugin extends Plugin { static final Setting UPDATE_TEMPLATE_ENABLED = Setting.boolSetting("tests.update_template_enabled", false, Setting.Property.NodeScope); + // This setting is used to simulate cluster state updates + static final Setting UPDATE_TEMPLATE_DUMMY_SETTING = + Setting.intSetting("tests.update_template_count", 0, Setting.Property.NodeScope, Setting.Property.Dynamic); + protected final Logger logger; protected final Settings settings; @@ -56,6 +68,16 @@ public TestPlugin(Settings settings) { this.settings = settings; } + @Override + public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, + NamedXContentRegistry xContentRegistry) { + clusterService.getClusterSettings().addSettingsUpdateConsumer(UPDATE_TEMPLATE_DUMMY_SETTING, integer -> { + logger.debug("the template dummy setting was updated to {}", integer); + }); + return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry); + } + @Override public UnaryOperator> getIndexTemplateMetaDataUpgrader() { return templates -> { @@ -72,7 +94,7 @@ public UnaryOperator> getIndexTemplateMetaDat @Override public List> getSettings() { - return Collections.singletonList(UPDATE_TEMPLATE_ENABLED); + return Arrays.asList(UPDATE_TEMPLATE_ENABLED, UPDATE_TEMPLATE_DUMMY_SETTING); } } @@ -85,8 +107,7 @@ public void testTemplateUpdate() throws Exception { .setPatterns(Collections.singletonList("*")).get()); assertAcked(client().admin().indices().preparePutTemplate("test_removed_template").setOrder(1) .setPatterns(Collections.singletonList("*")).get()); - String upgradeNode = internalCluster().startNode(Settings.builder() - .put(nodeSettings(0)).put(TestPlugin.UPDATE_TEMPLATE_ENABLED.getKey(), true)); + internalCluster().startNode(Settings.builder().put(nodeSettings(0)).put(TestPlugin.UPDATE_TEMPLATE_ENABLED.getKey(), true)); // Waiting for the templates to be updated by the newly added node assertBusy(() -> { @@ -122,15 +143,19 @@ public void testTemplateUpdate() throws Exception { assertTrue(dummyFound); }); - // Wait for upgradeNode to finish all updates - TemplateUpgradeService service = internalCluster().getInstance(TemplateUpgradeService.class, upgradeNode); - assertBusy(() -> assertFalse(service.hasUpdatesInProgress())); - // Wipe out all templates assertAcked(client().admin().indices().prepareDeleteTemplate("test_*").get()); + AtomicInteger updateCount = new AtomicInteger(); // Make sure all templates are recreated correctly assertBusy(() -> { + logger.info("checking...."); + // the updates only happen on cluster state updates, so we need to make sure that the cluster state updates are happening + // so we need to simulate updates to make sure the template upgrade kicks in + assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( + Settings.builder().put(TestPlugin.UPDATE_TEMPLATE_DUMMY_SETTING.getKey(), updateCount.incrementAndGet()) + ).get()); + List templates = client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates(); assertThat(templates.size(), equalTo(2)); boolean addedFound = false; From cbe8ca540fb8d97ba8f02ff7fb5f8559aa9ff7a1 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 19 Jun 2017 14:49:36 -0400 Subject: [PATCH 3/5] Address @abeyad's comments --- .../metadata/IndexTemplateMetaData.java | 1 - .../metadata/TemplateUpgradeService.java | 31 +++++----- .../metadata/TemplateUpgradeServiceIT.java | 1 - .../metadata/TemplateUpgradeServiceTests.java | 59 +++++++++++-------- 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java index 838987224c3ac..cae2042f52f0f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexTemplateMetaData.java @@ -438,7 +438,6 @@ public static void toInnerXContent(IndexTemplateMetaData indexTemplateMetaData, AliasMetaData.Builder.toXContent(cursor.value, builder, params); } builder.endObject(); - } public static IndexTemplateMetaData fromXContent(XContentParser parser, String templateName) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java index 547ab9b528997..f978769a67c88 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java @@ -47,10 +47,10 @@ import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.UnaryOperator; @@ -61,8 +61,6 @@ * Upgrades Templates on behalf of installed {@link Plugin}s when a node joins the cluster */ public class TemplateUpgradeService extends AbstractComponent implements ClusterStateListener { - private final Tuple, Set> EMPTY = new Tuple<>(Collections.emptyMap(), Collections.emptySet()); - private final UnaryOperator> indexTemplateMetaDataUpgraders; public final ClusterService clusterService; @@ -115,20 +113,18 @@ public void clusterChanged(ClusterChangedEvent event) { } lastTemplateMetaData = templates; - Tuple, Set> changes = calculateTemplateChanges(templates); - if (changes.v1().isEmpty() && changes.v2().isEmpty()) { - // no changes required - return; - } - - if (updatesInProgress.compareAndSet(0, changes.v1().size() + changes.v2().size())) { - threadPool.generic().execute(() -> updateTemplates(changes.v1(), changes.v2())); + Optional, Set>> changes = calculateTemplateChanges(templates); + if (changes.isPresent()) { + if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) { + threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2())); + } } } void updateTemplates(Map changes, Set deletions) { for (Map.Entry change : changes.entrySet()) { - PutIndexTemplateRequest request = new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON); + PutIndexTemplateRequest request = + new PutIndexTemplateRequest(change.getKey()).source(change.getValue(), XContentType.JSON); request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); client.admin().indices().putTemplate(request, new ActionListener() { @Override @@ -172,7 +168,12 @@ public void onFailure(Exception e) { } } - Tuple, Set> calculateTemplateChanges(ImmutableOpenMap templates) { + int getUpdatesInProgress() { + return updatesInProgress.get(); + } + + Optional, Set>> calculateTemplateChanges( + ImmutableOpenMap templates) { // collect current templates Map existingMap = new HashMap<>(); for (ObjectObjectCursor customCursor : templates) { @@ -194,9 +195,9 @@ Tuple, Set> calculateTemplateChanges(Immutab changes.put(key, toBytesReference(value)); } }); - return new Tuple<>(changes, deletes); + return Optional.of(new Tuple<>(changes, deletes)); } - return EMPTY; + return Optional.empty(); } private static final ToXContent.Params PARAMS = new ToXContent.MapParams(singletonMap("reduce_mappings", "true")); diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java index 695eadb901393..72a4f881b4224 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java @@ -149,7 +149,6 @@ public void testTemplateUpdate() throws Exception { AtomicInteger updateCount = new AtomicInteger(); // Make sure all templates are recreated correctly assertBusy(() -> { - logger.info("checking...."); // the updates only happen on cluster state updates, so we need to make sure that the cluster state updates are happening // so we need to simulate updates to make sure the template upgrade kicks in assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings( diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java index 75d3f83eaba74..e46beb78986ae 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java @@ -46,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -65,9 +66,8 @@ public class TemplateUpgradeServiceTests extends ESTestCase { - private ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings - .BUILT_IN_CLUSTER_SETTINGS), null); - + private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, + ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null); public void testCalculateChangesAddChangeAndDelete() { boolean shouldAdd = randomBoolean(); @@ -103,33 +103,42 @@ public void testCalculateChangesAddChangeAndDelete() { } )); - Tuple, Set> changes = service.calculateTemplateChanges(metaData.templates()); - - if (shouldAdd) { - assertThat(changes.v1().get("added_test_template"), notNullValue()); - if (shouldChange) { - assertThat(changes.v1().keySet(), hasSize(2)); - assertThat(changes.v1().get("changed_test_template"), notNullValue()); + Optional, Set>> optChanges = + service.calculateTemplateChanges(metaData.templates()); + + if (shouldAdd || shouldRemove || shouldChange) { + Tuple, Set> changes = optChanges.orElseThrow(() -> + new AssertionError("Should have non empty changes")); + if (shouldAdd) { + assertThat(changes.v1().get("added_test_template"), notNullValue()); + if (shouldChange) { + assertThat(changes.v1().keySet(), hasSize(2)); + assertThat(changes.v1().get("changed_test_template"), notNullValue()); + } else { + assertThat(changes.v1().keySet(), hasSize(1)); + } } else { - assertThat(changes.v1().keySet(), hasSize(1)); + if (shouldChange) { + assertThat(changes.v1().get("changed_test_template"), notNullValue()); + assertThat(changes.v1().keySet(), hasSize(1)); + } else { + assertThat(changes.v1().keySet(), empty()); + } } - } else { - if (shouldChange) { - assertThat(changes.v1().get("changed_test_template"), notNullValue()); - assertThat(changes.v1().keySet(), hasSize(1)); + + if (shouldRemove) { + assertThat(changes.v2(), hasSize(1)); + assertThat(changes.v2().contains("removed_test_template"), equalTo(true)); } else { - assertThat(changes.v1().keySet(), empty()); + assertThat(changes.v2(), empty()); } - } - - if (shouldRemove) { - assertThat(changes.v2(), hasSize(1)); - assertThat(changes.v2().contains("removed_test_template"), equalTo(true)); } else { - assertThat(changes.v2(), empty()); + assertThat(optChanges.isPresent(), equalTo(false)); } } + + @SuppressWarnings("unchecked") public void testUpdateTemplates() { int additionsCount = randomIntBetween(0, 5); @@ -162,7 +171,6 @@ public void testUpdateTemplates() { return null; }).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class)); - Set deletions = new HashSet<>(deletionsCount); for (int i = 0; i < deletionsCount; i++) { deletions.add("remove_template_" + i); @@ -176,6 +184,10 @@ public void testUpdateTemplates() { Collections.emptyList()); service.updateTemplates(additions, deletions); + int updatesInProgress = service.getUpdatesInProgress(); + + assertThat(putTemplateListeners.size(), equalTo(additionsCount)); + assertThat(deleteTemplateListeners.size(), equalTo(deletionsCount)); for (int i = 0; i < additionsCount; i++) { if (randomBoolean()) { @@ -196,6 +208,7 @@ public void testUpdateTemplates() { }); } } + assertThat(updatesInProgress - service.getUpdatesInProgress(), equalTo(additionsCount + deletionsCount)); } @SuppressWarnings("unchecked") From e84c0782f7a1cb45e1429cf16fddec23d1f257d7 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 19 Jun 2017 20:31:50 -0400 Subject: [PATCH 4/5] Ensure that only one node will try to upgrade the templates at a time --- .../metadata/TemplateUpgradeService.java | 44 +++++++ .../cluster/node/DiscoveryNodes.java | 40 +++++-- .../metadata/TemplateUpgradeServiceIT.java | 29 ++--- .../metadata/TemplateUpgradeServiceTests.java | 108 ++++++++++++++++-- 4 files changed, 193 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java index f978769a67c88..4b5d3b889945b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java @@ -19,8 +19,10 @@ package org.elasticsearch.cluster.metadata; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; @@ -30,6 +32,8 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -112,6 +116,11 @@ public void clusterChanged(ClusterChangedEvent event) { return; } + if (shouldLocalNodeUpdateTemplates(state.nodes()) == false) { + return; + } + + lastTemplateMetaData = templates; Optional, Set>> changes = calculateTemplateChanges(templates); if (changes.isPresent()) { @@ -121,6 +130,41 @@ public void clusterChanged(ClusterChangedEvent event) { } } + /** + * Checks if the current node should update the templates + * + * If the master has the newest verison in the cluster - it will be dedicated template updater. + * Otherwise the node with the highest id among nodes with the highest version should update the templates + */ + boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) { + DiscoveryNode localNode = nodes.getLocalNode(); + // Only data and master nodes should update the template + if (localNode.isDataNode() || localNode.isMasterNode()) { + Version maxVersion = nodes.getLargestNonClientNodeVersion(); + if (maxVersion.equals(nodes.getMasterNode().getVersion())) { + // If the master has the latest version - we will allow it to handle the update + return nodes.isLocalNodeElectedMaster(); + } else { + if (maxVersion.equals(localNode.getVersion()) == false) { + // The localhost node doesn't have the latest version - not going to update + return false; + } + for (ObjectCursor node : nodes.getMasterAndDataNodes().values()) { + if (node.value.getVersion().equals(maxVersion)) { + if (node.value.getId().compareTo(localNode.getId()) > 0) { + // We have a node with higher id then mine - it should update + return false; + } + } + } + // We have the highest version and highest id - we should perform the update + return true; + } + } else { + return false; + } + } + void updateTemplates(Map changes, Set deletions) { for (Map.Entry change : changes.entrySet()) { PutIndexTemplateRequest request = diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index c38f556a0a82e..2ed88aa1127c6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -56,13 +56,14 @@ public class DiscoveryNodes extends AbstractDiffable implements private final String masterNodeId; private final String localNodeId; private final Version minNonClientNodeVersion; + private final Version maxNonClientNodeVersion; private final Version maxNodeVersion; private final Version minNodeVersion; private DiscoveryNodes(ImmutableOpenMap nodes, ImmutableOpenMap dataNodes, ImmutableOpenMap masterNodes, ImmutableOpenMap ingestNodes, - String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion, - Version minNodeVersion) { + String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNonClientNodeVersion, + Version maxNodeVersion, Version minNodeVersion) { this.nodes = nodes; this.dataNodes = dataNodes; this.masterNodes = masterNodes; @@ -70,6 +71,7 @@ private DiscoveryNodes(ImmutableOpenMap nodes, ImmutableO this.masterNodeId = masterNodeId; this.localNodeId = localNodeId; this.minNonClientNodeVersion = minNonClientNodeVersion; + this.maxNonClientNodeVersion = maxNonClientNodeVersion; this.minNodeVersion = minNodeVersion; this.maxNodeVersion = maxNodeVersion; } @@ -234,12 +236,25 @@ public boolean isAllNodes(String... nodesIds) { /** * Returns the version of the node with the oldest version in the cluster that is not a client node * + * If there are no non-client nodes, Version.CURRENT will be returned. + * * @return the oldest version in the cluster */ public Version getSmallestNonClientNodeVersion() { return minNonClientNodeVersion; } + /** + * Returns the version of the node with the youngest version in the cluster that is not a client node. + * + * If there are no non-client nodes, Version.CURRENT will be returned. + * + * @return the youngest version in the cluster + */ + public Version getLargestNonClientNodeVersion() { + return maxNonClientNodeVersion; + } + /** * Returns the version of the node with the oldest version in the cluster. * @@ -252,7 +267,7 @@ public Version getMinNodeVersion() { /** * Returns the version of the node with the youngest version in the cluster * - * @return the oldest version in the cluster + * @return the youngest version in the cluster */ public Version getMaxNodeVersion() { return maxNodeVersion; @@ -654,15 +669,25 @@ public DiscoveryNodes build() { ImmutableOpenMap.Builder ingestNodesBuilder = ImmutableOpenMap.builder(); Version minNodeVersion = Version.CURRENT; Version maxNodeVersion = Version.CURRENT; - Version minNonClientNodeVersion = Version.CURRENT; + // The node where we are building this on might not be a master or a data node, so we cannot assume + // that there is a node with the current version as a part of the cluster. + Version minNonClientNodeVersion = null; + Version maxNonClientNodeVersion = null; for (ObjectObjectCursor nodeEntry : nodes) { if (nodeEntry.value.isDataNode()) { dataNodesBuilder.put(nodeEntry.key, nodeEntry.value); - minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); } if (nodeEntry.value.isMasterNode()) { masterNodesBuilder.put(nodeEntry.key, nodeEntry.value); - minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); + } + if (nodeEntry.value.isDataNode() || nodeEntry.value.isMasterNode()) { + if (minNonClientNodeVersion == null) { + minNonClientNodeVersion = nodeEntry.value.getVersion(); + maxNonClientNodeVersion = nodeEntry.value.getVersion(); + } else { + minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion()); + maxNonClientNodeVersion = Version.max(maxNonClientNodeVersion, nodeEntry.value.getVersion()); + } } if (nodeEntry.value.isIngestNode()) { ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value); @@ -673,7 +698,8 @@ public DiscoveryNodes build() { return new DiscoveryNodes( nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(), - masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion + masterNodeId, localNodeId, minNonClientNodeVersion == null ? Version.CURRENT : minNonClientNodeVersion, + maxNonClientNodeVersion == null ? Version.CURRENT : maxNonClientNodeVersion, maxNodeVersion, minNodeVersion ); } diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java index 72a4f881b4224..52e1971126590 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceIT.java @@ -53,9 +53,6 @@ protected Collection> nodePlugins() { } public static class TestPlugin extends Plugin { - static final Setting UPDATE_TEMPLATE_ENABLED = - Setting.boolSetting("tests.update_template_enabled", false, Setting.Property.NodeScope); - // This setting is used to simulate cluster state updates static final Setting UPDATE_TEMPLATE_DUMMY_SETTING = Setting.intSetting("tests.update_template_count", 0, Setting.Property.NodeScope, Setting.Property.Dynamic); @@ -81,35 +78,34 @@ public Collection createComponents(Client client, ClusterService cluster @Override public UnaryOperator> getIndexTemplateMetaDataUpgrader() { return templates -> { - if (UPDATE_TEMPLATE_ENABLED.get(settings)) { - templates.put("test_added_template", IndexTemplateMetaData.builder("test_added_template") - .patterns(Collections.singletonList("*")).build()); - templates.remove("test_removed_template"); - templates.put("test_changed_template", IndexTemplateMetaData.builder("test_changed_template").order(10) - .patterns(Collections.singletonList("*")).build()); - } + templates.put("test_added_template", IndexTemplateMetaData.builder("test_added_template") + .patterns(Collections.singletonList("*")).build()); + templates.remove("test_removed_template"); + templates.put("test_changed_template", IndexTemplateMetaData.builder("test_changed_template").order(10) + .patterns(Collections.singletonList("*")).build()); return templates; }; } @Override public List> getSettings() { - return Arrays.asList(UPDATE_TEMPLATE_ENABLED, UPDATE_TEMPLATE_DUMMY_SETTING); + return Collections.singletonList(UPDATE_TEMPLATE_DUMMY_SETTING); } } public void testTemplateUpdate() throws Exception { - assertThat(client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates(), empty()); + assertTemplates(); + + // Change some templates assertAcked(client().admin().indices().preparePutTemplate("test_dummy_template").setOrder(0) .setPatterns(Collections.singletonList("*")).get()); assertAcked(client().admin().indices().preparePutTemplate("test_changed_template").setOrder(0) .setPatterns(Collections.singletonList("*")).get()); assertAcked(client().admin().indices().preparePutTemplate("test_removed_template").setOrder(1) .setPatterns(Collections.singletonList("*")).get()); - internalCluster().startNode(Settings.builder().put(nodeSettings(0)).put(TestPlugin.UPDATE_TEMPLATE_ENABLED.getKey(), true)); - // Waiting for the templates to be updated by the newly added node + // Wait for the templates to be updated back to normal assertBusy(() -> { List templates = client().admin().indices().prepareGetTemplates("test_*").get().getIndexTemplates(); assertThat(templates.size(), equalTo(3)); @@ -146,6 +142,11 @@ public void testTemplateUpdate() throws Exception { // Wipe out all templates assertAcked(client().admin().indices().prepareDeleteTemplate("test_*").get()); + assertTemplates(); + + } + + private void assertTemplates() throws Exception { AtomicInteger updateCount = new AtomicInteger(); // Make sure all templates are recreated correctly assertBusy(() -> { diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java index e46beb78986ae..dab0621d509a5 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java @@ -30,6 +30,8 @@ import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -42,6 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -52,11 +55,15 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static java.util.Collections.emptyMap; +import static org.elasticsearch.test.VersionUtils.randomVersion; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -68,6 +75,7 @@ public class TemplateUpgradeServiceTests extends ESTestCase { private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null); + public void testCalculateChangesAddChangeAndDelete() { boolean shouldAdd = randomBoolean(); @@ -138,7 +146,6 @@ public void testCalculateChangesAddChangeAndDelete() { } - @SuppressWarnings("unchecked") public void testUpdateTemplates() { int additionsCount = randomIntBetween(0, 5); @@ -191,7 +198,7 @@ public void testUpdateTemplates() { for (int i = 0; i < additionsCount; i++) { if (randomBoolean()) { - putTemplateListeners.get(i).onFailure(new RuntimeException()); + putTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore")); } else { putTemplateListeners.get(i).onResponse(new PutIndexTemplateResponse(randomBoolean()) { @@ -201,7 +208,7 @@ public void testUpdateTemplates() { for (int i = 0; i < deletionsCount; i++) { if (randomBoolean()) { - deleteTemplateListeners.get(i).onFailure(new RuntimeException()); + deleteTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore")); } else { deleteTemplateListeners.get(i).onResponse(new DeleteIndexTemplateResponse(randomBoolean()) { @@ -211,6 +218,9 @@ public void testUpdateTemplates() { assertThat(updatesInProgress - service.getUpdatesInProgress(), equalTo(additionsCount + deletionsCount)); } + private static final Set MASTER_DATA_ROLES = + Collections.unmodifiableSet(EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA)); + @SuppressWarnings("unchecked") public void testClusterStateUpdate() { @@ -286,7 +296,10 @@ public void testClusterStateUpdate() { )); ClusterState prevState = ClusterState.EMPTY_STATE; - ClusterState state = ClusterState.builder(prevState).metaData(metaData).build(); + ClusterState state = ClusterState.builder(prevState).nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("node1", "node1", buildNewFakeTransportAddress(), emptyMap(), MASTER_DATA_ROLES, Version.CURRENT) + ).localNodeId("node1").masterNodeId("node1").build() + ).metaData(metaData).build(); service.clusterChanged(new ClusterChangedEvent("test", state, prevState)); assertThat(updateInvocation.get(), equalTo(1)); @@ -313,9 +326,9 @@ public void testClusterStateUpdate() { // Make sure that update was called this time since we are no longer running assertThat(updateInvocation.get(), equalTo(2)); - addedListener.getAndSet(null).onFailure(new RuntimeException()); - changedListener.getAndSet(null).onFailure(new RuntimeException()); - removedListener.getAndSet(null).onFailure(new RuntimeException()); + addedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore")); + changedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore")); + removedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore")); service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState)); @@ -324,6 +337,87 @@ public void testClusterStateUpdate() { } + private static final int NODE_TEST_ITERS = 100; + + public void testOnlyOneNodeRunsTemplateUpdates() { + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, Collections.emptyList()); + for (int i = 0; i < NODE_TEST_ITERS; i++) { + int nodesCount = randomIntBetween(1, 10); + int clientNodesCount = randomIntBetween(0, 4); + DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount); + int updaterNode = -1; + for (int j = 0; j < nodesCount; j++) { + DiscoveryNodes localNodes = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("node_" + j).getId()).build(); + if (service.shouldLocalNodeUpdateTemplates(localNodes)) { + assertThat("Expected only one node to update template, found " + updaterNode + " and " + j, updaterNode, lessThan(0)); + updaterNode = j; + } + } + assertThat("Expected one node to update template", updaterNode, greaterThanOrEqualTo(0)); + } + } + + public void testIfMasterHasTheHighestVersionItShouldRunsTemplateUpdates() { + for (int i = 0; i < NODE_TEST_ITERS; i++) { + int nodesCount = randomIntBetween(1, 10); + int clientNodesCount = randomIntBetween(0, 4); + DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount); + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("_master").getId()); + nodes = builder.build(); + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, + Collections.emptyList()); + assertThat(service.shouldLocalNodeUpdateTemplates(nodes), + equalTo(nodes.getLargestNonClientNodeVersion().equals(nodes.getMasterNode().getVersion()))); + } + } + + public void testClientNodeRunsTemplateUpdates() { + for (int i = 0; i < NODE_TEST_ITERS; i++) { + int nodesCount = randomIntBetween(1, 10); + int clientNodesCount = randomIntBetween(1, 4); + DiscoveryNodes nodes = randomNodes(nodesCount, clientNodesCount); + int testClient = randomIntBetween(0, clientNodesCount - 1); + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(nodes).localNodeId(nodes.resolveNode("client_" + testClient).getId()); + TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null, + Collections.emptyList()); + assertThat(service.shouldLocalNodeUpdateTemplates(builder.build()), equalTo(false)); + } + } + + private DiscoveryNodes randomNodes(int dataAndMasterNodes, int clientNodes) { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + String masterNodeId = null; + for (int i = 0; i < dataAndMasterNodes; i++) { + String id = randomAlphaOfLength(10) + "_" + i; + Set roles; + if (i == 0) { + masterNodeId = id; + // The first node has to be master node + if (randomBoolean()) { + roles = EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA); + } else { + roles = EnumSet.of(DiscoveryNode.Role.MASTER); + } + } else { + if (randomBoolean()) { + roles = EnumSet.of(DiscoveryNode.Role.DATA); + } else { + roles = EnumSet.of(DiscoveryNode.Role.MASTER); + } + } + String node = "node_" + i; + builder.add(new DiscoveryNode(node, id, buildNewFakeTransportAddress(), emptyMap(), roles, randomVersion(random()))); + } + builder.masterNodeId(masterNodeId); // Node 0 is always a master node + + for (int i = 0; i < clientNodes; i++) { + String node = "client_" + i; + builder.add(new DiscoveryNode(node, randomAlphaOfLength(10) + "__" + i, buildNewFakeTransportAddress(), emptyMap(), + EnumSet.noneOf(DiscoveryNode.Role.class), randomVersion(random()))); + } + return builder.build(); + } + public static MetaData randomMetaData(IndexTemplateMetaData... templates) { MetaData.Builder builder = MetaData.builder(); for (IndexTemplateMetaData template : templates) { From dba5ab10468dca5d2d7242408d18c795a926e208 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 20 Jun 2017 09:13:51 -0400 Subject: [PATCH 5/5] Address @spinscale's comments --- .../cluster/metadata/TemplateUpgradeService.java | 8 +++----- .../metadata/TemplateUpgradeServiceTests.java | 13 +++++++------ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java index 4b5d3b889945b..a22dc7252afb4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/TemplateUpgradeService.java @@ -150,11 +150,9 @@ boolean shouldLocalNodeUpdateTemplates(DiscoveryNodes nodes) { return false; } for (ObjectCursor node : nodes.getMasterAndDataNodes().values()) { - if (node.value.getVersion().equals(maxVersion)) { - if (node.value.getId().compareTo(localNode.getId()) > 0) { - // We have a node with higher id then mine - it should update - return false; - } + if (node.value.getVersion().equals(maxVersion) && node.value.getId().compareTo(localNode.getId()) > 0) { + // We have a node with higher id then mine - it should update + return false; } } // We have the highest version and highest id - we should perform the update diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java index dab0621d509a5..f4e8ba21fc06a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/TemplateUpgradeServiceTests.java @@ -193,8 +193,8 @@ public void testUpdateTemplates() { service.updateTemplates(additions, deletions); int updatesInProgress = service.getUpdatesInProgress(); - assertThat(putTemplateListeners.size(), equalTo(additionsCount)); - assertThat(deleteTemplateListeners.size(), equalTo(deletionsCount)); + assertThat(putTemplateListeners, hasSize(additionsCount)); + assertThat(deleteTemplateListeners, hasSize(deletionsCount)); for (int i = 0; i < additionsCount; i++) { if (randomBoolean()) { @@ -208,11 +208,15 @@ public void testUpdateTemplates() { for (int i = 0; i < deletionsCount; i++) { if (randomBoolean()) { + int prevUpdatesInProgress = service.getUpdatesInProgress(); deleteTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore")); + assertThat(prevUpdatesInProgress - service.getUpdatesInProgress(), equalTo(1)); } else { + int prevUpdatesInProgress = service.getUpdatesInProgress(); deleteTemplateListeners.get(i).onResponse(new DeleteIndexTemplateResponse(randomBoolean()) { }); + assertThat(prevUpdatesInProgress - service.getUpdatesInProgress(), equalTo(1)); } } assertThat(updatesInProgress - service.getUpdatesInProgress(), equalTo(additionsCount + deletionsCount)); @@ -276,7 +280,6 @@ public void testClusterStateUpdate() { return null; }).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class)); - TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool, Arrays.asList( templates -> { @@ -334,7 +337,6 @@ public void testClusterStateUpdate() { // Make sure that update wasn't called this time since the index template metadata didn't change assertThat(updateInvocation.get(), equalTo(2)); - } private static final int NODE_TEST_ITERS = 100; @@ -371,7 +373,7 @@ public void testIfMasterHasTheHighestVersionItShouldRunsTemplateUpdates() { } } - public void testClientNodeRunsTemplateUpdates() { + public void testClientNodeDontRunTemplateUpdates() { for (int i = 0; i < NODE_TEST_ITERS; i++) { int nodesCount = randomIntBetween(1, 10); int clientNodesCount = randomIntBetween(1, 4); @@ -433,5 +435,4 @@ public static MetaData randomMetaData(IndexTemplateMetaData... templates) { } return builder.build(); } - }