Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit — hold Shift and click to select a range
e5ecfd0
Add system data streams to feature state snapshots
williamrandolph Jul 30, 2021
46871d1
Spotless formatting
williamrandolph Jul 30, 2021
1df9725
Don't pass system data streams through index name resolution
williamrandolph Jul 30, 2021
de6cd83
Don't add no-op features to snapshots
williamrandolph Aug 2, 2021
faeac56
Merge branch 'master' into system-data-streams-in-snapshots
williamrandolph Aug 2, 2021
9083f5d
Refactor and clean up
williamrandolph Aug 2, 2021
bb6385f
Checkstyle and spotless
williamrandolph Aug 2, 2021
8e8eb55
Add test todo
williamrandolph Aug 3, 2021
f1984b5
Hook in system data streams for snapshot restoration
williamrandolph Aug 3, 2021
6df6980
Merge branch 'master' into system-data-streams-in-snapshots
williamrandolph Aug 3, 2021
2461699
Clean up test class
williamrandolph Aug 11, 2021
7ecba14
Merge branch 'master' into system-data-streams-in-snapshots
williamrandolph Aug 11, 2021
33daff6
Built automaton in constructor
williamrandolph Aug 11, 2021
59fb331
Remove low-value comment
williamrandolph Aug 11, 2021
a44a442
server/src/main
williamrandolph Aug 12, 2021
b578ed2
Merge branch 'master' into system-data-streams-in-snapshots
williamrandolph Aug 12, 2021
231d094
Merge branch 'master' into system-data-streams-in-snapshots
williamrandolph Aug 13, 2021
f573027
Cleanup from PR review
williamrandolph Aug 13, 2021
9c09345
Apply spotless formatting
williamrandolph Aug 16, 2021
2d1bee5
Merge branch 'master' into system-data-streams-in-snapshots
williamrandolph Aug 16, 2021
2a84b43
Improve warn logging
williamrandolph Aug 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@

package org.elasticsearch.indices;

import org.apache.lucene.util.automaton.CharacterRunAutomaton;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Metadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.indices.AssociatedIndexDescriptor.buildAutomaton;

/**
* Describes a {@link DataStream} that is reserved for use by a system component. The data stream will be managed by the system and also
* protected by the system against user modification so that system features are not broken by inadvertent user operations.
Expand All @@ -29,6 +35,7 @@ public class SystemDataStreamDescriptor {
private final Map<String, ComponentTemplate> componentTemplates;
private final List<String> allowedElasticProductOrigins;
private final ExecutorNames executorNames;
private final CharacterRunAutomaton characterRunAutomaton;

/**
* Creates a new descriptor for a system data descriptor
Expand Down Expand Up @@ -70,12 +77,31 @@ public SystemDataStreamDescriptor(String dataStreamName, String description, Typ
this.executorNames = Objects.nonNull(executorNames)
? executorNames
: ExecutorNames.DEFAULT_SYSTEM_DATA_STREAM_THREAD_POOLS;

this.characterRunAutomaton = new CharacterRunAutomaton(
buildAutomaton(backingIndexPatternForDataStream(this.dataStreamName)));
}

public String getDataStreamName() {
return dataStreamName;
}

/**
* Retrieve backing indices for this system data stream
* @param metadata Metadata in which to look for indices
* @return List of names of backing indices
*/
public List<String> getBackingIndexNames(Metadata metadata) {
ArrayList<String> matchingIndices = new ArrayList<>();
metadata.indices().keysIt().forEachRemaining(indexName -> {
if (this.characterRunAutomaton.run(indexName)) {
matchingIndices.add(indexName);
}
});

return Collections.unmodifiableList(matchingIndices);
}

public String getDescription() {
return description;
}
Expand All @@ -89,7 +115,11 @@ public boolean isExternal() {
}

public String getBackingIndexPattern() {
return DataStream.BACKING_INDEX_PREFIX + getDataStreamName() + "-*";
return backingIndexPatternForDataStream(getDataStreamName());
}

private static String backingIndexPatternForDataStream(String dataStream) {
return DataStream.BACKING_INDEX_PREFIX + dataStream + "-*";
}

public List<String> getAllowedElasticProductOrigins() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.ShardLimitValidator;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -323,13 +324,40 @@ private void startRestore(
Collections.addAll(requestIndices, indicesInRequest);
}

// Determine system indices to restore from requested feature states
final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
final Set<String> featureStateIndices = featureStatesToRestore.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());

final Map<String, SystemIndices.Feature> featureSet = systemIndices.getFeatures();
final Set<String> featureStateDataStreams = featureStatesToRestore.keySet().stream().filter(featureName -> {
if (featureSet.containsKey(featureName)) {
return true;
}
logger.warn(
() -> new ParameterizedMessage(
"Restoring snapshot[{}] skipping feature [{}] because it is not available in this cluster",
snapshotInfo.snapshotId(),
featureName
)
);
return false;
})
.map(name -> systemIndices.getFeatures().get(name))
.flatMap(feature -> feature.getDataStreamDescriptors().stream())
.map(SystemDataStreamDescriptor::getDataStreamName)
.collect(Collectors.toSet());

// Get data stream metadata for requested data streams
Tuple<Map<String, DataStream>, Map<String, DataStreamAlias>> result = getDataStreamsToRestore(
repository,
snapshotId,
snapshotInfo,
globalMetadata,
requestIndices,
// include system data stream names in argument to this method
Stream.concat(requestIndices.stream(), featureStateDataStreams.stream()).collect(Collectors.toList()),
request.includeAliases()
);
Map<String, DataStream> dataStreamsToRestore = result.v1();
Expand All @@ -346,13 +374,6 @@ private void startRestore(
.collect(Collectors.toSet());
requestIndices.addAll(dataStreamIndices);

// Determine system indices to restore from requested feature states
final Map<String, List<String>> featureStatesToRestore = getFeatureStatesToRestore(request, snapshotInfo, snapshot);
final Set<String> featureStateIndices = featureStatesToRestore.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toSet());

// Resolve the indices that were directly requested
final List<String> requestedIndicesInSnapshot = filterIndices(
snapshotInfo.indices(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.AssociatedIndexDescriptor;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.IndexId;
Expand Down Expand Up @@ -308,36 +308,43 @@ public ClusterState execute(ClusterState currentState) {
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));

final List<SnapshotFeatureInfo> featureStates;
final Set<SnapshotFeatureInfo> featureStates = new HashSet<>();
final Set<String> systemDataStreamNames = new HashSet<>();
// if we have any feature states in the snapshot, we add their required indices to the snapshot indices if they haven't
// been requested by the request directly
if (featureStatesSet.isEmpty()) {
featureStates = Collections.emptyList();
} else {
final Set<String> indexNames = new HashSet<>(indices);
featureStates = featureStatesSet.stream()
.map(
feature -> new SnapshotFeatureInfo(
feature,
systemIndexDescriptorMap.get(feature)
.getIndexDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
.collect(Collectors.toList())
)
)
.filter(featureInfo -> featureInfo.getIndices().isEmpty() == false) // Omit any empty featureStates
.collect(Collectors.toList());
for (SnapshotFeatureInfo featureState : featureStates) {
indexNames.addAll(featureState.getIndices());
}
final Set<String> indexNames = new HashSet<>(indices);
for (String featureName : featureStatesSet) {
SystemIndices.Feature feature = systemIndexDescriptorMap.get(featureName);

// Add all resolved indices from the feature states to the list of indices
for (String feature : featureStatesSet) {
for (AssociatedIndexDescriptor aid : systemIndexDescriptorMap.get(feature).getAssociatedIndexDescriptors()) {
indexNames.addAll(aid.getMatchingIndices(currentState.metadata()));
Set<String> featureSystemIndices = feature.getIndexDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
.collect(Collectors.toSet());
Set<String> featureAssociatedIndices = feature.getAssociatedIndexDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getMatchingIndices(currentState.metadata()).stream())
.collect(Collectors.toSet());

Set<String> featureSystemDataStreams = new HashSet<>();
Set<String> featureDataStreamBackingIndices = new HashSet<>();
for (SystemDataStreamDescriptor sdd : feature.getDataStreamDescriptors()) {
List<String> backingIndexNames = sdd.getBackingIndexNames(currentState.metadata());
if (backingIndexNames.size() > 0) {
featureDataStreamBackingIndices.addAll(backingIndexNames);
featureSystemDataStreams.add(sdd.getDataStreamName());
}
}

if (featureSystemIndices.size() > 0
|| featureAssociatedIndices.size() > 0
|| featureDataStreamBackingIndices.size() > 0) {

featureStates.add(new SnapshotFeatureInfo(featureName, List.copyOf(featureSystemIndices)));
indexNames.addAll(featureSystemIndices);
indexNames.addAll(featureAssociatedIndices);
indexNames.addAll(featureDataStreamBackingIndices);
systemDataStreamNames.addAll(featureSystemDataStreams);
}
indices = List.copyOf(indexNames);
}

Expand All @@ -346,6 +353,7 @@ public ClusterState execute(ClusterState currentState) {
request.indicesOptions(),
request.indices()
);
dataStreams.addAll(systemDataStreamNames);

logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);

Expand Down Expand Up @@ -388,7 +396,7 @@ public ClusterState execute(ClusterState currentState) {
shards,
userMeta,
version,
featureStates
List.copyOf(featureStates)
);
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(CollectionUtils.appendToCopy(runningSnapshots, newEntry)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.index.IndexResponse;
Expand All @@ -20,6 +19,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
Expand All @@ -35,8 +35,11 @@
import java.util.Map;

import static org.elasticsearch.datastreams.SystemDataStreamSnapshotIT.SystemDataStreamTestPlugin.SYSTEM_DATA_STREAM_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;

public class SystemDataStreamSnapshotIT extends AbstractSnapshotIntegTestCase {
Expand Down Expand Up @@ -82,12 +85,91 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
}

CreateSnapshotResponse createSnapshotResponse = client().admin()
assertSuccessful(
client().admin()
.cluster()
.prepareCreateSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIncludeGlobalState(false)
.execute()
);

// We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
// See https://github.com/elastic/elasticsearch/issues/75818
{
DeleteDataStreamAction.Request request = new DeleteDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
AcknowledgedResponse response = client().execute(DeleteDataStreamAction.INSTANCE, request).get();
assertTrue(response.isAcknowledged());
}

{
GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
assertThat(indicesRemaining.indices(), arrayWithSize(0));
}

RestoreSnapshotResponse restoreSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(REPO, SNAPSHOT)
.prepareRestoreSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setIncludeGlobalState(false)
.setRestoreGlobalState(false)
.get();
assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());

{
GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
assertThat(response.getDataStreams(), hasSize(1));
assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
}
}

public void testSystemDataStreamInFeatureState() throws Exception {
Path location = randomRepoPath();
createRepository(REPO, "fs", location);

{
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(SYSTEM_DATA_STREAM_NAME);
final AcknowledgedResponse response = client().execute(CreateDataStreamAction.INSTANCE, request).get();
assertTrue(response.isAcknowledged());
}

// Index a doc so that a concrete backing index will be created
IndexResponse indexToDataStreamResponse = client().prepareIndex(SYSTEM_DATA_STREAM_NAME)
.setId("42")
.setSource("{ \"@timestamp\": \"2099-03-08T11:06:07.000Z\", \"name\": \"my-name\" }", XContentType.JSON)
.setOpType(DocWriteRequest.OpType.CREATE)
.execute()
.actionGet();
assertThat(indexToDataStreamResponse.status().getStatus(), oneOf(200, 201));

// Index a doc so that a concrete backing index will be created
IndexResponse indexResponse = client().prepareIndex("my-index")
.setId("42")
.setSource("{ \"name\": \"my-name\" }", XContentType.JSON)
.setOpType(DocWriteRequest.OpType.CREATE)
.execute()
.get();
assertThat(indexResponse.status().getStatus(), oneOf(200, 201));

{
GetDataStreamAction.Request request = new GetDataStreamAction.Request(new String[] { SYSTEM_DATA_STREAM_NAME });
GetDataStreamAction.Response response = client().execute(GetDataStreamAction.INSTANCE, request).get();
assertThat(response.getDataStreams(), hasSize(1));
assertTrue(response.getDataStreams().get(0).getDataStream().isSystem());
}

SnapshotInfo snapshotInfo = assertSuccessful(
client().admin()
.cluster()
.prepareCreateSnapshot(REPO, SNAPSHOT)
.setIndices("my-index")
.setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
.setWaitForCompletion(true)
.setIncludeGlobalState(false)
.execute()
);

assertThat(snapshotInfo.dataStreams(), not(empty()));

// We have to delete the data stream directly, as the feature reset API doesn't clean up system data streams yet
// See https://github.com/elastic/elasticsearch/issues/75818
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment to the linked issue so we remember to clean up the places we have to work around this in tests. No action required here, just noting.

Expand All @@ -97,6 +179,8 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
assertTrue(response.isAcknowledged());
}

assertAcked(client().admin().indices().prepareDelete("my-index"));

{
GetIndexResponse indicesRemaining = client().admin().indices().prepareGetIndex().addIndices("_all").get();
assertThat(indicesRemaining.indices(), arrayWithSize(0));
Expand All @@ -106,7 +190,8 @@ public void testSystemDataStreamSnapshotIT() throws Exception {
.cluster()
.prepareRestoreSnapshot(REPO, SNAPSHOT)
.setWaitForCompletion(true)
.setRestoreGlobalState(false)
.setIndices("my-index")
.setFeatureStates(SystemDataStreamTestPlugin.class.getSimpleName())
.get();
assertEquals(restoreSnapshotResponse.getRestoreInfo().totalShards(), restoreSnapshotResponse.getRestoreInfo().successfulShards());

Expand Down