From e5ecfd07aa1a3d13121552ae289a6db8c1002031 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Fri, 30 Jul 2021 14:56:21 -0400 Subject: [PATCH 01/15] Add system data streams to feature state snapshots --- .../indices/SystemDataStreamDescriptor.java | 21 +++++++ .../snapshots/SnapshotsService.java | 11 +++- .../SystemDataStreamSnapshotIT.java | 61 +++++++++++++++++++ 3 files changed, 91 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java index 4b0a8bc6a9200..0e2043e01b748 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java @@ -8,14 +8,20 @@ package org.elasticsearch.indices; +import org.apache.lucene.util.automaton.CharacterRunAutomaton; import org.elasticsearch.cluster.metadata.ComponentTemplate; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.Metadata; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import static org.elasticsearch.indices.AssociatedIndexDescriptor.buildAutomaton; + /** * Describes a {@link DataStream} that is reserved for use by a system component. The data stream will be managed by the system and also * protected by the system against user modification so that system features are not broken by inadvertent user operations. @@ -76,6 +82,21 @@ public String getDataStreamName() { return dataStreamName; } + // TODO[wrb]: Javadoc + // TODO[wrb]: Refactor to only build automaton once + public List getBackingIndexNames(Metadata metadata) { + CharacterRunAutomaton auto = new CharacterRunAutomaton(buildAutomaton(getBackingIndexPattern())); + + ArrayList matchingIndices = new ArrayList<>(); + metadata.indices().keysIt().forEachRemaining(indexName -> { + if (auto.run(indexName)) { + matchingIndices.add(indexName); + } + }); + + return Collections.unmodifiableList(matchingIndices); + } + public String getDescription() { return description; } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 6b9f0201cb60e..23fa2dd06e627 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -71,6 +71,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.AssociatedIndexDescriptor; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; @@ -307,6 +308,7 @@ public ClusterState execute(ClusterState currentState) { List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); final List featureStates; + final List systemDataStreamNames = new ArrayList<>(); // if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't // been requested by the request directly if (featureStatesSet.isEmpty()) { @@ -324,7 +326,6 @@ public ClusterState execute(ClusterState currentState) { .collect(Collectors.toList()) ) ) - .filter(featureInfo -> featureInfo.getIndices().isEmpty() == false) // Omit any empty featureStates .collect(Collectors.toList()); for (SnapshotFeatureInfo featureState : featureStates) { indexNames.addAll(featureState.getIndices()); @@ -335,14 +336,20 @@ public ClusterState execute(ClusterState currentState) { for (AssociatedIndexDescriptor aid : systemIndexDescriptorMap.get(feature).getAssociatedIndexDescriptors()) { indexNames.addAll(aid.getMatchingIndices(currentState.metadata())); } + for (SystemDataStreamDescriptor sdd : systemIndexDescriptorMap.get(feature).getDataStreamDescriptors()) { + systemDataStreamNames.add(sdd.getDataStreamName()); + indexNames.addAll(sdd.getBackingIndexNames(currentState.metadata())); + + } } indices = List.copyOf(indexNames); } + // need feature state data streams... final List dataStreams = indexNameExpressionResolver.dataStreamNames( currentState, request.indicesOptions(), - request.indices() + Stream.concat(Arrays.stream(request.indices()), systemDataStreamNames.stream()).distinct().toArray(String[]::new) ); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java index 40b060298e949..55c1e368a3b13 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java @@ -36,7 +36,11 @@ import static org.elasticsearch.datastreams.SystemDataStreamSnapshotIT.SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.oneOf; public class SystemDataStreamSnapshotIT extends AbstractSnapshotIntegTestCase { @@ -88,6 +92,7 @@ public void testSystemDataStreamSnapshotIT() throws Exception { .setWaitForCompletion(true) .setIncludeGlobalState(false) .get(); + assertSnapshotSuccess(createSnapshotResponse); // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet // See https://github.com/elastic/elasticsearch/issues/75818 @@ -118,6 +123,62 @@ public void testSystemDataStreamSnapshotIT() throws Exception { } } + public void testSystemDataStreamInFeatureState() throws Exception { + Path location = randomRepoPath(); + createRepository(REPO, "fs", location); + + { + CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME); + final AcknowledgedResponse response = client().execute(CreateDataStreamAction.INSTANCE, request).get(); + assertTrue(response.isAcknowledged()); + } + + // Index a doc so that a concrete backing index will be created + IndexResponse indexToDataStreamResponse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME) + .setId("42") + .setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON) + .setOpType(DocWriteRequest.OpType.CREATE) + .execute() + .actionGet(); + assertThat(indexToDataStreamResponse.status().getStatus(), oneOf(200, 201)); + + // Index a doc so that a concrete backing index will be created + IndexResponse indexResponse = client().prepareIndex("my-index") + .setId("42") + .setSource("{ \"name\": \"my-name\" }", XContentType.JSON) + .setOpType(DocWriteRequest.OpType.CREATE) + .execute() + .actionGet(); + assertThat(indexResponse.status().getStatus(), oneOf(200, 201)); + + { + GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[]{SYSTEM_DATA_STREAM_NAME}); + GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get(); + assertThat(response.getDataStreams(), hasSize(1)); + assertTrue(response.getDataStreams().get(0).getDataStream().isSystem()); + } + + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(REPO, SNAPSHOT) + .setIndices("my-index") + .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName()) + .setWaitForCompletion(true) + .setIncludeGlobalState(false) + .get(); + assertSnapshotSuccess(createSnapshotResponse); + + assertThat(createSnapshotResponse.getSnapshotInfo().dataStreams(), not(empty())); + } + + private void assertSnapshotSuccess(CreateSnapshotResponse createSnapshotResponse) { + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat( + createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) + ); + } + public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin { static final String SYSTEM_DATA_STREAM_NAME = ".test-data-stream"; From 46871d1bb22a5b3813be220c19b4e4c4e87126b5 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Fri, 30 Jul 2021 15:10:15 -0400 Subject: [PATCH 02/15] Spotless formatting --- .../elasticsearch/datastreams/SystemDataStreamSnapshotIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java index 55c1e368a3b13..89f91eb4137a4 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java @@ -152,7 +152,7 @@ public void testSystemDataStreamInFeatureState() throws Exception { assertThat(indexResponse.status().getStatus(), oneOf(200, 201)); { - GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[]{SYSTEM_DATA_STREAM_NAME}); + GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME }); GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get(); assertThat(response.getDataStreams(), hasSize(1)); assertTrue(response.getDataStreams().get(0).getDataStream().isSystem()); From 1df9725ced8c8c40bf5ea19021eb8dccbbe765ee Mon Sep 17 00:00:00 2001 From: William Brafford Date: Fri, 30 Jul 2021 16:32:35 -0400 Subject: [PATCH 03/15] Don't pass system data streams through index name resolution --- .../org/elasticsearch/snapshots/SnapshotsService.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 23fa2dd06e627..f7ba18a7c8c28 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -337,9 +337,11 @@ public ClusterState execute(ClusterState currentState) { indexNames.addAll(aid.getMatchingIndices(currentState.metadata())); } for (SystemDataStreamDescriptor sdd : systemIndexDescriptorMap.get(feature).getDataStreamDescriptors()) { - systemDataStreamNames.add(sdd.getDataStreamName()); - indexNames.addAll(sdd.getBackingIndexNames(currentState.metadata())); - + List backingIndexNames = sdd.getBackingIndexNames(currentState.metadata()); + if (backingIndexNames.size() > 0) { + indexNames.addAll(backingIndexNames); + systemDataStreamNames.add(sdd.getDataStreamName()); + } } } indices = List.copyOf(indexNames); @@ -349,8 +351,9 @@ public ClusterState execute(ClusterState currentState) { final List dataStreams = indexNameExpressionResolver.dataStreamNames( currentState, request.indicesOptions(), - Stream.concat(Arrays.stream(request.indices()), systemDataStreamNames.stream()).distinct().toArray(String[]::new) + request.indices() ); + dataStreams.addAll(systemDataStreamNames); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); From de6cd836715540ae430a8d8a30516cb83e845e00 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Mon, 2 Aug 2021 10:44:50 -0400 Subject: [PATCH 04/15] Don't add no-op features to snapshots --- .../snapshots/SnapshotsService.java | 64 ++++++++++--------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f7ba18a7c8c28..2c25e3c9e7b75 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -307,43 +307,45 @@ public ClusterState execute(ClusterState currentState) { // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); - final List featureStates; + final List featureStates = new ArrayList<>(); final List systemDataStreamNames = new ArrayList<>(); // if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't // been requested by the request directly - if (featureStatesSet.isEmpty()) { - featureStates = Collections.emptyList(); - } else { - final Set indexNames = new HashSet<>(indices); - featureStates = featureStatesSet.stream() - .map( - feature -> new SnapshotFeatureInfo( - feature, - systemIndexDescriptorMap.get(feature) - .getIndexDescriptors() - .stream() - .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) - .collect(Collectors.toList()) - ) - ) - .collect(Collectors.toList()); - for (SnapshotFeatureInfo featureState : featureStates) { - indexNames.addAll(featureState.getIndices()); - } + final Set indexNames = new HashSet<>(indices); + for (String featureName : featureStatesSet) { + SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName); - // Add all resolved indices from the feature states to the list of indices - for (String feature : featureStatesSet) { - for (AssociatedIndexDescriptor aid : systemIndexDescriptorMap.get(feature).getAssociatedIndexDescriptors()) { - indexNames.addAll(aid.getMatchingIndices(currentState.metadata())); - } - for (SystemDataStreamDescriptor sdd : systemIndexDescriptorMap.get(feature).getDataStreamDescriptors()) { - List backingIndexNames = sdd.getBackingIndexNames(currentState.metadata()); - if (backingIndexNames.size() > 0) { - indexNames.addAll(backingIndexNames); - systemDataStreamNames.add(sdd.getDataStreamName()); - } + SnapshotFeatureInfo snapshotFeatureInfo = new SnapshotFeatureInfo( + featureName, + feature.getIndexDescriptors() + .stream() + .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) + .collect(Collectors.toList()) + ); + + List featureSystemIndices = snapshotFeatureInfo.getIndices(); + List featureAssociatedIndices = new ArrayList<>(); + List featureDataStreamBackingIndices = new ArrayList<>(); + List featureSystemDataStreams = new ArrayList<>(); + for (AssociatedIndexDescriptor aid : feature.getAssociatedIndexDescriptors()) { + featureAssociatedIndices.addAll(aid.getMatchingIndices(currentState.metadata())); + } + for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) { + List backingIndexNames = sdd.getBackingIndexNames(currentState.metadata()); + if (backingIndexNames.size() > 0) { + featureDataStreamBackingIndices.addAll(backingIndexNames); + featureSystemDataStreams.add(sdd.getDataStreamName()); } } + if (featureSystemIndices.size() > 0 + || featureAssociatedIndices.size() > 0 + || featureDataStreamBackingIndices.size() > 0) { + featureStates.add(snapshotFeatureInfo); + indexNames.addAll(featureSystemIndices); + indexNames.addAll(featureAssociatedIndices); + indexNames.addAll(featureDataStreamBackingIndices); + systemDataStreamNames.addAll(featureSystemDataStreams); + } indices = List.copyOf(indexNames); } From 9083f5dd5d9ea27fefcfcd7b5faf8a107562375d Mon Sep 17 00:00:00 2001 From: William Brafford Date: Mon, 2 Aug 2021 16:59:31 -0400 Subject: [PATCH 05/15] Refactor and clean up --- .../indices/SystemDataStreamDescriptor.java | 15 +++++++--- .../snapshots/SnapshotsService.java | 29 ++++++++++--------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java index 0e2043e01b748..baaa10227cb5a 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java @@ -36,6 +36,8 @@ public class SystemDataStreamDescriptor { private final List allowedElasticProductOrigins; private final ExecutorNames executorNames; + private CharacterRunAutomaton characterRunAutomaton; + /** * Creates a new descriptor for a system data descriptor * @param dataStreamName the name of the data stream. Must not be {@code null} @@ -82,14 +84,19 @@ public String getDataStreamName() { return dataStreamName; } - // TODO[wrb]: Javadoc - // TODO[wrb]: Refactor to only build automaton once + /** + * Retrieve backing indices for this system data stream + * @param metadata Metadata in which to look for indices + * @return List of names of backing indices + */ public List getBackingIndexNames(Metadata metadata) { - CharacterRunAutomaton auto = new CharacterRunAutomaton(buildAutomaton(getBackingIndexPattern())); + if (Objects.isNull(this.characterRunAutomaton)) { + this.characterRunAutomaton = new CharacterRunAutomaton(buildAutomaton(getBackingIndexPattern())); + } ArrayList matchingIndices = new ArrayList<>(); metadata.indices().keysIt().forEachRemaining(indexName -> { - if (auto.run(indexName)) { + if (this.characterRunAutomaton.run(indexName)) { matchingIndices.add(indexName); } }); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 329d4fd02218d..a4db8ef64cc30 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -315,21 +315,17 @@ public ClusterState execute(ClusterState currentState) { for (String featureName : featureStatesSet) { SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName); - SnapshotFeatureInfo snapshotFeatureInfo = new SnapshotFeatureInfo( - featureName, - feature.getIndexDescriptors() - .stream() - .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) - .collect(Collectors.toList()) - ); + List featureSystemIndices = feature.getIndexDescriptors() + .stream() + .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) + .collect(Collectors.toList()); + List featureAssociatedIndices = feature.getAssociatedIndexDescriptors() + .stream() + .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) + .collect(Collectors.toList()); - List featureSystemIndices = snapshotFeatureInfo.getIndices(); - List featureAssociatedIndices = new ArrayList<>(); - List featureDataStreamBackingIndices = new ArrayList<>(); List featureSystemDataStreams = new ArrayList<>(); - for (AssociatedIndexDescriptor aid : feature.getAssociatedIndexDescriptors()) { - featureAssociatedIndices.addAll(aid.getMatchingIndices(currentState.metadata())); - } + List featureDataStreamBackingIndices = new ArrayList<>(); for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) { List backingIndexNames = sdd.getBackingIndexNames(currentState.metadata()); if (backingIndexNames.size() > 0) { @@ -337,10 +333,15 @@ public ClusterState execute(ClusterState currentState) { featureSystemDataStreams.add(sdd.getDataStreamName()); } } + if (featureSystemIndices.size() > 0 || featureAssociatedIndices.size() > 0 || featureDataStreamBackingIndices.size() > 0) { - featureStates.add(snapshotFeatureInfo); + + featureStates.add(new SnapshotFeatureInfo( + featureName, + featureSystemIndices + )); indexNames.addAll(featureSystemIndices); indexNames.addAll(featureAssociatedIndices); indexNames.addAll(featureDataStreamBackingIndices); From bb6385f1a3427dba0599f5828844116952f6aa41 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Mon, 2 Aug 2021 18:23:04 -0400 Subject: [PATCH 06/15] Checkstyle and spotless --- .../java/org/elasticsearch/snapshots/SnapshotsService.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index a4db8ef64cc30..18bbe73398b65 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -70,7 +70,6 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.repositories.IndexId; @@ -338,10 +337,7 @@ public ClusterState execute(ClusterState currentState) { || featureAssociatedIndices.size() > 0 || featureDataStreamBackingIndices.size() > 0) { - featureStates.add(new SnapshotFeatureInfo( - featureName, - featureSystemIndices - )); + featureStates.add(new SnapshotFeatureInfo(featureName, featureSystemIndices)); indexNames.addAll(featureSystemIndices); indexNames.addAll(featureAssociatedIndices); indexNames.addAll(featureDataStreamBackingIndices); From 8e8eb552175c3fe4f97d5f79541548f3ba9dfb1f Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 3 Aug 2021 09:13:34 -0400 Subject: [PATCH 07/15] Add test todo --- .../elasticsearch/datastreams/SystemDataStreamSnapshotIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java index 89f91eb4137a4..026459b1de173 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java @@ -169,6 +169,8 @@ public void testSystemDataStreamInFeatureState() throws Exception { assertSnapshotSuccess(createSnapshotResponse); assertThat(createSnapshotResponse.getSnapshotInfo().dataStreams(), not(empty())); + + // TODO[wrb]: continue test, check restore } private void assertSnapshotSuccess(CreateSnapshotResponse createSnapshotResponse) { From f1984b53496e3e4f3a4a5e5949d05029169d1c62 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 3 Aug 2021 16:14:58 -0400 Subject: [PATCH 08/15] Hook in system data streams for snapshot restoration --- .../snapshots/RestoreService.java | 25 +++++++++---- .../SystemDataStreamSnapshotIT.java | 37 ++++++++++++++++++- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 46f10034089c8..d43d7ec1fbd30 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -69,6 +69,7 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.ShardLimitValidator; +import org.elasticsearch.indices.SystemDataStreamDescriptor; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; @@ -323,13 +324,28 @@ private void startRestore( Collections.addAll(requestIndices, indicesInRequest); } + // Determine system indices to restore from requested feature states + final Map> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot); + final Set featureStateIndices = featureStatesToRestore.values() + .stream() + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + + final Set featureStateDataStreams = featureStatesToRestore.keySet() + .stream() + .map(name -> systemIndices.getFeatures().get(name)) + .flatMap(feature -> feature.getDataStreamDescriptors().stream()) + .map(SystemDataStreamDescriptor::getDataStreamName) + .collect(Collectors.toSet()); + // Get data stream metadata for requested data streams Tuple, Map> result = getDataStreamsToRestore( repository, snapshotId, snapshotInfo, globalMetadata, - requestIndices, + // include system data stream names in argument to this method + Stream.concat(requestIndices.stream(), featureStateDataStreams.stream()).collect(Collectors.toList()), request.includeAliases() ); Map dataStreamsToRestore = result.v1(); @@ -346,13 +362,6 @@ private void startRestore( .collect(Collectors.toSet()); requestIndices.addAll(dataStreamIndices); - // Determine system indices to restore from requested feature states - final Map> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot); - final Set featureStateIndices = featureStatesToRestore.values() - .stream() - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - // Resolve the indices that were directly requested final List requestedIndicesInSnapshot = filterIndices( snapshotInfo.indices(), diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java index 026459b1de173..343cb0cadf3f7 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java @@ -10,6 +10,8 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -170,7 +172,40 @@ public void testSystemDataStreamInFeatureState() throws Exception { assertThat(createSnapshotResponse.getSnapshotInfo().dataStreams(), not(empty())); - // TODO[wrb]: continue test, check restore + // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet + // See https://github.com/elastic/elasticsearch/issues/75818 + { + DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME }); + AcknowledgedResponse response = client().execute(DeleteDataStreamAction.INSTANCE, request).get(); + assertTrue(response.isAcknowledged()); + } + + { + DeleteIndexRequest request = new DeleteIndexRequest("my-index"); + AcknowledgedResponse response = client().execute(DeleteIndexAction.INSTANCE, request).get(); + assertTrue(response.isAcknowledged()); + } + + { + GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get(); + assertThat(indicesRemaining.indices(), arrayWithSize(0)); + } + + RestoreSnapshotResponse restoreSnapshotResponse = client().admin() + .cluster() + .prepareRestoreSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIndices("my-index") + .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName()) + .get(); + assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards()); + + { + GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME }); + GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get(); + assertThat(response.getDataStreams(), hasSize(1)); + assertTrue(response.getDataStreams().get(0).getDataStream().isSystem()); + } } private void assertSnapshotSuccess(CreateSnapshotResponse createSnapshotResponse) { From 24616999e6814894382c2380a181725f79c080c8 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 10 Aug 2021 21:06:17 -0400 Subject: [PATCH 09/15] Clean up test class --- .../SystemDataStreamSnapshotIT.java | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java index 343cb0cadf3f7..8057169532ebd 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java @@ -10,8 +10,6 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -22,6 +20,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; +import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.CreateDataStreamAction; @@ -37,6 +36,7 @@ import java.util.Map; import static org.elasticsearch.datastreams.SystemDataStreamSnapshotIT.SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -88,13 +88,14 @@ public void testSystemDataStreamSnapshotIT() throws Exception { assertTrue(response.getDataStreams().get(0).getDataStream().isSystem()); } - CreateSnapshotResponse createSnapshotResponse = client().admin() - .cluster() - .prepareCreateSnapshot(REPO, SNAPSHOT) - .setWaitForCompletion(true) - .setIncludeGlobalState(false) - .get(); - assertSnapshotSuccess(createSnapshotResponse); + assertSuccessful( + client().admin() + .cluster() + .prepareCreateSnapshot(REPO, SNAPSHOT) + .setWaitForCompletion(true) + .setIncludeGlobalState(false) + .execute() + ); // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet // See https://github.com/elastic/elasticsearch/issues/75818 @@ -150,7 +151,7 @@ public void testSystemDataStreamInFeatureState() throws Exception { .setSource("{ \"name\": \"my-name\" }", XContentType.JSON) .setOpType(DocWriteRequest.OpType.CREATE) .execute() - .actionGet(); + .get(); assertThat(indexResponse.status().getStatus(), oneOf(200, 201)); { @@ -160,17 +161,18 @@ public void testSystemDataStreamInFeatureState() throws Exception { assertTrue(response.getDataStreams().get(0).getDataStream().isSystem()); } - CreateSnapshotResponse createSnapshotResponse = client().admin() - .cluster() - .prepareCreateSnapshot(REPO, SNAPSHOT) - .setIndices("my-index") - .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName()) - .setWaitForCompletion(true) - .setIncludeGlobalState(false) - .get(); - assertSnapshotSuccess(createSnapshotResponse); + SnapshotInfo snapshotInfo = assertSuccessful( + client().admin() + .cluster() + .prepareCreateSnapshot(REPO, SNAPSHOT) + .setIndices("my-index") + .setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName()) + .setWaitForCompletion(true) + .setIncludeGlobalState(false) + .execute() + ); - assertThat(createSnapshotResponse.getSnapshotInfo().dataStreams(), not(empty())); + assertThat(snapshotInfo.dataStreams(), not(empty())); // We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet // See https://github.com/elastic/elasticsearch/issues/75818 @@ -180,11 +182,7 @@ public void testSystemDataStreamInFeatureState() throws Exception { assertTrue(response.isAcknowledged()); } - { - DeleteIndexRequest request = new DeleteIndexRequest("my-index"); - AcknowledgedResponse response = client().execute(DeleteIndexAction.INSTANCE, request).get(); - assertTrue(response.isAcknowledged()); - } + assertAcked(client().admin().indices().prepareDelete("my-index")); { GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get(); From 33daff69691110c4e373a28f8054e0c8dba70861 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 10 Aug 2021 22:28:02 -0400 Subject: [PATCH 10/15] Built automaton in constructor --- .../indices/SystemDataStreamDescriptor.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java index baaa10227cb5a..6ef0c835595a0 100644 --- a/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java +++ b/server/src/main/java/org/elasticsearch/indices/SystemDataStreamDescriptor.java @@ -35,8 +35,7 @@ public class SystemDataStreamDescriptor { private final Map componentTemplates; private final List allowedElasticProductOrigins; private final ExecutorNames executorNames; - - private CharacterRunAutomaton characterRunAutomaton; + private final CharacterRunAutomaton characterRunAutomaton; /** * Creates a new descriptor for a system data descriptor @@ -78,6 +77,9 @@ public SystemDataStreamDescriptor(String dataStreamName, String description, Typ this.executorNames = Objects.nonNull(executorNames) ? executorNames : ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS; + + this.characterRunAutomaton = new CharacterRunAutomaton( + buildAutomaton(backingIndexPatternForDataStream(this.dataStreamName))); } public String getDataStreamName() { @@ -90,10 +92,6 @@ public String getDataStreamName() { * @return List of names of backing indices */ public List getBackingIndexNames(Metadata metadata) { - if (Objects.isNull(this.characterRunAutomaton)) { - this.characterRunAutomaton = new CharacterRunAutomaton(buildAutomaton(getBackingIndexPattern())); - } - ArrayList matchingIndices = new ArrayList<>(); metadata.indices().keysIt().forEachRemaining(indexName -> { if (this.characterRunAutomaton.run(indexName)) { @@ -117,7 +115,11 @@ public boolean isExternal() { } public String getBackingIndexPattern() { - return DataStream.BACKING_INDEX_PREFIX + getDataStreamName() + "-*"; + return backingIndexPatternForDataStream(getDataStreamName()); + } + + private static String backingIndexPatternForDataStream(String dataStream) { + return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*"; } public List getAllowedElasticProductOrigins() { From 59fb3313aac9380fca6c1cf8bb0daeedbea90167 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Tue, 10 Aug 2021 22:30:23 -0400 Subject: [PATCH 11/15] Remove low-value comment --- .../main/java/org/elasticsearch/snapshots/SnapshotsService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 82522234051ef..d1f20a6f02b72 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -348,7 +348,6 @@ public ClusterState execute(ClusterState currentState) { indices = List.copyOf(indexNames); } - // need feature state data streams... final List dataStreams = indexNameExpressionResolver.dataStreamNames( currentState, request.indicesOptions(), From a44a44299bb7e153b85e3e2077e0da345cd49c44 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Thu, 12 Aug 2021 10:53:35 -0400 Subject: [PATCH 12/15] server/src/main --- .../snapshots/SnapshotsService.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index d1f20a6f02b72..370efadb0efde 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -308,25 +308,25 @@ public ClusterState execute(ClusterState currentState) { // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); - final List featureStates = new ArrayList<>(); - final List systemDataStreamNames = new ArrayList<>(); + final Set featureStates = new HashSet<>(); + final Set systemDataStreamNames = new HashSet<>(); // if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't // been requested by the request directly final Set indexNames = new HashSet<>(indices); for (String featureName : featureStatesSet) { SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName); - List featureSystemIndices = feature.getIndexDescriptors() + Set featureSystemIndices = feature.getIndexDescriptors() .stream() .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) - .collect(Collectors.toList()); - List featureAssociatedIndices = feature.getAssociatedIndexDescriptors() + .collect(Collectors.toSet()); + Set featureAssociatedIndices = feature.getAssociatedIndexDescriptors() .stream() .flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream()) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); - List featureSystemDataStreams = new ArrayList<>(); - List featureDataStreamBackingIndices = new ArrayList<>(); + Set featureSystemDataStreams = new HashSet<>(); + Set featureDataStreamBackingIndices = new HashSet<>(); for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) { List backingIndexNames = sdd.getBackingIndexNames(currentState.metadata()); if (backingIndexNames.size() > 0) { @@ -339,7 +339,7 @@ public ClusterState execute(ClusterState currentState) { || featureAssociatedIndices.size() > 0 || featureDataStreamBackingIndices.size() > 0) { - featureStates.add(new SnapshotFeatureInfo(featureName, featureSystemIndices)); + featureStates.add(new SnapshotFeatureInfo(featureName, new ArrayList<>(featureSystemIndices))); indexNames.addAll(featureSystemIndices); indexNames.addAll(featureAssociatedIndices); indexNames.addAll(featureDataStreamBackingIndices); @@ -396,7 +396,7 @@ public ClusterState execute(ClusterState currentState) { shards, userMeta, version, - featureStates + new ArrayList<>(featureStates) ); return ClusterState.builder(currentState) .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(CollectionUtils.appendToCopy(runningSnapshots, newEntry))) From f5730275baf20ba6021f035558235957f4fb2999 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Fri, 13 Aug 2021 17:37:20 -0400 Subject: [PATCH 13/15] Cleanup from PR review --- .../org/elasticsearch/snapshots/RestoreService.java | 9 +++++++++ .../org/elasticsearch/snapshots/SnapshotsService.java | 4 ++-- .../datastreams/SystemDataStreamSnapshotIT.java | 11 ----------- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index d43d7ec1fbd30..29f1db2c1cdf3 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -331,8 +331,17 @@ private void startRestore( .flatMap(Collection::stream) .collect(Collectors.toSet()); + final Map featureSet = systemIndices.getFeatures(); final Set featureStateDataStreams = featureStatesToRestore.keySet() .stream() + .filter(featureName -> { + if (featureSet.containsKey(featureName)) { + return true; + } + logger.warn("Restoring snapshot [" + snapshotInfo.snapshotId() + "]: skipping feature [" + + featureName + "] because it is not available in this cluster"); + return false; + }) .map(name -> systemIndices.getFeatures().get(name)) .flatMap(feature -> feature.getDataStreamDescriptors().stream()) .map(SystemDataStreamDescriptor::getDataStreamName) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 3a1cb35b5a4cf..2db67625506c0 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -339,7 +339,7 @@ public ClusterState execute(ClusterState currentState) { || featureAssociatedIndices.size() > 0 || featureDataStreamBackingIndices.size() > 0) { - featureStates.add(new SnapshotFeatureInfo(featureName, new ArrayList<>(featureSystemIndices))); + featureStates.add(new SnapshotFeatureInfo(featureName, List.copyOf(featureSystemIndices))); indexNames.addAll(featureSystemIndices); indexNames.addAll(featureAssociatedIndices); indexNames.addAll(featureDataStreamBackingIndices); @@ -396,7 +396,7 @@ public ClusterState execute(ClusterState currentState) { shards, userMeta, version, - new ArrayList<>(featureStates) + List.copyOf(featureStates) ); return ClusterState.builder(currentState) .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(CollectionUtils.appendToCopy(runningSnapshots, newEntry))) diff --git a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java index 8057169532ebd..c701ff5e6ed36 100644 --- a/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java +++ b/x-pack/plugin/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/SystemDataStreamSnapshotIT.java @@ -8,7 +8,6 @@ package org.elasticsearch.datastreams; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.index.IndexResponse; @@ -39,8 +38,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.oneOf; @@ -206,14 +203,6 @@ public void testSystemDataStreamInFeatureState() throws Exception { } } - private void assertSnapshotSuccess(CreateSnapshotResponse createSnapshotResponse) { - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat( - createSnapshotResponse.getSnapshotInfo().successfulShards(), - equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()) - ); - } - public static class SystemDataStreamTestPlugin extends Plugin implements SystemIndexPlugin { static final String SYSTEM_DATA_STREAM_NAME = ".test-data-stream"; From 9c09345ed8326c0900d07e8024f6d51c3d2cdf75 Mon Sep 17 00:00:00 2001 From: William Brafford Date: Mon, 16 Aug 2021 09:05:00 -0400 Subject: [PATCH 14/15] Apply spotless formatting --- .../snapshots/RestoreService.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 29f1db2c1cdf3..6aa581dd72e75 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -332,16 +332,19 @@ private void startRestore( .collect(Collectors.toSet()); final Map featureSet = systemIndices.getFeatures(); - final Set featureStateDataStreams = featureStatesToRestore.keySet() - .stream() - .filter(featureName -> { - if (featureSet.containsKey(featureName)) { - return true; - } - logger.warn("Restoring snapshot [" + snapshotInfo.snapshotId() + "]: skipping feature [" + - featureName + "] because it is not available in this cluster"); - return false; - }) + final Set featureStateDataStreams = featureStatesToRestore.keySet().stream().filter(featureName -> { + if (featureSet.containsKey(featureName)) { + return true; + } + logger.warn( + "Restoring snapshot [" + + snapshotInfo.snapshotId() + + "]: skipping feature [" + + featureName + + "] because it is not available in this cluster" + ); + return false; + }) .map(name -> systemIndices.getFeatures().get(name)) .flatMap(feature -> feature.getDataStreamDescriptors().stream()) .map(SystemDataStreamDescriptor::getDataStreamName) From 2a84b4346e3e00f11b7c2d74d6016c220437eb9c Mon Sep 17 00:00:00 2001 From: William Brafford Date: Mon, 16 Aug 2021 10:05:12 -0400 Subject: [PATCH 15/15] Improve warn logging --- .../org/elasticsearch/snapshots/RestoreService.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 6aa581dd72e75..c05b0a5d01c9b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -337,11 +337,11 @@ private void startRestore( return true; } logger.warn( - "Restoring snapshot [" - + snapshotInfo.snapshotId() - + "]: skipping feature [" - + featureName - + "] because it is not available in this cluster" + () -> new ParameterizedMessage( + "Restoring snapshot[{}] skipping feature [{}] because it is not available in this cluster", + snapshotInfo.snapshotId(), + featureName + ) ); return false; })