Skip to content

Commit 71b476b

Browse files
authored
[7.x] Improve data stream rollover and simplify cluster metadata validation for data streams (#70987)
1 parent d4e7ad3 commit 71b476b

File tree

42 files changed

+114
-190
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+114
-190
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetDataStreamResponseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
package org.elasticsearch.client.indices;
1010

1111
import org.elasticsearch.client.AbstractResponseTestCase;
12-
import org.elasticsearch.cluster.DataStreamTestHelper;
12+
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
1313
import org.elasticsearch.cluster.health.ClusterHealthStatus;
1414
import org.elasticsearch.cluster.metadata.DataStream;
1515
import org.elasticsearch.common.xcontent.XContentParser;

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ private NameResolution resolveDataStreamRolloverNames(ClusterState currentState,
159159
final Version minNodeVersion = currentState.nodes().getMinNodeVersion();
160160
final DataStream ds = dataStream.getDataStream();
161161
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
162-
final DataStream rolledDataStream = ds.rollover("uuid", minNodeVersion);
162+
final DataStream rolledDataStream = ds.rollover(currentState.getMetadata(), "uuid", minNodeVersion);
163163
return new NameResolution(originalWriteIndex.getIndex().getName(), null, rolledDataStream.getWriteIndex().getName());
164164
}
165165

@@ -217,7 +217,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra
217217
final Version minNodeVersion = currentState.nodes().getMinNodeVersion();
218218
final DataStream ds = dataStream.getDataStream();
219219
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
220-
DataStream rolledDataStream = ds.rollover("uuid", minNodeVersion);
220+
DataStream rolledDataStream = ds.rollover(currentState.metadata(), "uuid", minNodeVersion);
221221
createIndexService.validateIndexName(rolledDataStream.getWriteIndex().getName(), currentState); // fails if the index already exists
222222
if (onlyValidate) {
223223
return new RolloverResult(rolledDataStream.getWriteIndex().getName(), originalWriteIndex.getIndex().getName(), currentState);
@@ -226,7 +226,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra
226226
CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest =
227227
prepareDataStreamCreateIndexRequest(dataStreamName, rolledDataStream.getWriteIndex().getName(), createIndexRequest);
228228
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexClusterStateRequest, silent,
229-
(builder, indexMetadata) -> builder.put(ds.rollover(indexMetadata.getIndexUUID(), minNodeVersion)));
229+
(builder, indexMetadata) -> builder.put(ds.rollover(currentState.metadata(), indexMetadata.getIndexUUID(), minNodeVersion)));
230230

231231
RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());
232232
newState = ClusterState.builder(newState)

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Locale;
3232
import java.util.Map;
3333
import java.util.Objects;
34+
import java.util.function.LongSupplier;
3435

3536
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
3637

@@ -42,6 +43,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
4243
*/
4344
public static final Version NEW_FEATURES_VERSION = Version.V_7_11_0;
4445

46+
private final LongSupplier timeProvider;
4547
private final String name;
4648
private final TimestampField timeStampField;
4749
private final List<Index> indices;
@@ -56,13 +58,20 @@ public DataStream(String name, TimestampField timeStampField, List<Index> indice
5658

5759
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
5860
boolean hidden, boolean replicated) {
61+
this(name, timeStampField, indices, generation, metadata, hidden, replicated, System::currentTimeMillis);
62+
}
63+
64+
// visible for testing
65+
DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
66+
boolean hidden, boolean replicated, LongSupplier timeProvider) {
5967
this.name = name;
6068
this.timeStampField = timeStampField;
6169
this.indices = Collections.unmodifiableList(indices);
6270
this.generation = generation;
6371
this.metadata = metadata;
6472
this.hidden = hidden;
6573
this.replicated = replicated;
74+
this.timeProvider = timeProvider;
6675
assert indices.size() > 0;
6776
}
6877

@@ -113,21 +122,27 @@ public boolean isReplicated() {
113122
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
114123
* the updated list of backing indices and incremented generation.
115124
*
125+
* @param clusterMetadata Cluster metadata
116126
* @param writeIndexUuid UUID for the data stream's new write index
117127
* @param minNodeVersion minimum cluster node version
118128
*
119129
* @return new {@code DataStream} instance with the rollover operation applied
120130
*/
121-
public DataStream rollover(String writeIndexUuid, Version minNodeVersion) {
131+
public DataStream rollover(Metadata clusterMetadata, String writeIndexUuid, Version minNodeVersion) {
122132
if (replicated) {
123133
throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, " +
124134
"because it is a replicated data stream");
125135
}
126136

127137
List<Index> backingIndices = new ArrayList<>(indices);
128-
final String newWriteIndexName = DataStream.getDefaultBackingIndexName(getName(), getGeneration() + 1, minNodeVersion);
138+
String newWriteIndexName;
139+
long generation = this.generation;
140+
long currentTimeMillis = timeProvider.getAsLong();
141+
do {
142+
newWriteIndexName = DataStream.getDefaultBackingIndexName(getName(), ++generation, currentTimeMillis, minNodeVersion);
143+
} while (clusterMetadata.getIndicesLookup().containsKey(newWriteIndexName));
129144
backingIndices.add(new Index(newWriteIndexName, writeIndexUuid));
130-
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated);
145+
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
131146
}
132147

133148
/**
@@ -170,7 +185,7 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
170185
}
171186

172187
public DataStream promoteDataStream() {
173-
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false);
188+
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false, timeProvider);
174189
}
175190

176191
/**

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import java.util.TreeMap;
7070
import java.util.function.Function;
7171
import java.util.function.Predicate;
72-
import java.util.regex.Pattern;
7372
import java.util.stream.Collectors;
7473
import java.util.stream.StreamSupport;
7574

@@ -82,7 +81,6 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
8281

8382
public static final String ALL = "_all";
8483
public static final String UNKNOWN_CLUSTER_UUID = "_na_";
85-
public static final Pattern BACKING_INDEX_SUFFIX = Pattern.compile("(\\d{4}\\.\\d{2}\\.\\d{2}-)?[0-9]+$");
8684

8785
public enum XContentContext {
8886
/* Custom metadata should be returns as part of API call */
@@ -1568,51 +1566,8 @@ private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
15681566
return indicesLookup;
15691567
}
15701568

1571-
/**
1572-
* Validates there isn't any index with a name that could clash with the future backing indices of the existing data streams.
1573-
*
1574-
* E.g., if data stream `foo` has backing indices [`.ds-foo-yyyy.MM.dd-000001`, `.ds-foo-yyyy.MM.dd-000002`] and the indices lookup
1575-
* contains indices `.ds-foo-yyyy-MM.dd.000001`, `.ds-foo-yyyy.MM.dd-000002` and `.ds-foo-yyyy.MM.dd-000006` this will throw an
1576-
* IllegalStateException as attempting to rollover the `foo` data stream from generation 5 to 6 may not be possible
1577-
*
1578-
* @param indicesLookup the indices in the system including the data stream backing indices
1579-
* @param dsMetadata the data streams in the system
1580-
*/
15811569
static void validateDataStreams(SortedMap<String, IndexAbstraction> indicesLookup, @Nullable DataStreamMetadata dsMetadata) {
15821570
if (dsMetadata != null) {
1583-
for (DataStream ds : dsMetadata.dataStreams().values()) {
1584-
String prefix = DataStream.BACKING_INDEX_PREFIX + ds.getName() + "-";
1585-
Set<String> conflicts =
1586-
indicesLookup.subMap(prefix, DataStream.BACKING_INDEX_PREFIX + ds.getName() + ".") // '.' is the char after '-'
1587-
.keySet().stream()
1588-
.filter(s -> BACKING_INDEX_SUFFIX.matcher(s.substring(prefix.length())).matches())
1589-
.filter(s -> IndexMetadata.parseIndexNameCounter(s) > ds.getGeneration())
1590-
.filter(indexName -> {
1591-
// Logic to avoid marking backing indices of other data streams as conflict:
1592-
1593-
// Backing index pattern is either .ds-[ds-name]-[date]-[generation] for 7.11 and up or
1594-
// .ds-[ds-name]-[generation] for 7.9 to 7.10.2. So two step process to capture the data stream name:
1595-
String dataStreamName =
1596-
indexName.substring(DataStream.BACKING_INDEX_PREFIX.length(), indexName.lastIndexOf('-'));
1597-
if (dsMetadata.dataStreams().containsKey(dataStreamName)) {
1598-
return false;
1599-
}
1600-
dataStreamName = indexName.substring(0, indexName.lastIndexOf('-'));
1601-
if (dsMetadata.dataStreams().containsKey(dataStreamName)) {
1602-
return false;
1603-
} else {
1604-
return true;
1605-
}
1606-
})
1607-
.collect(Collectors.toSet());
1608-
1609-
if (conflicts.size() > 0) {
1610-
throw new IllegalStateException("data stream [" + ds.getName() +
1611-
"] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" +
1612-
" including '" + conflicts.iterator().next() + "'");
1613-
}
1614-
}
1615-
16161571
// Sanity check, because elsewhere a more user friendly error should have occurred:
16171572
List<String> conflictingAliases = indicesLookup.values().stream()
16181573
.filter(ia -> ia.getType() == IndexAbstraction.Type.ALIAS)

server/src/test/java/org/elasticsearch/action/admin/indices/mapping/put/PutMappingRequestTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
import org.elasticsearch.action.ActionRequestValidationException;
1212
import org.elasticsearch.cluster.ClusterState;
13-
import org.elasticsearch.cluster.DataStreamTestHelper;
13+
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
1414
import org.elasticsearch.cluster.metadata.AliasMetadata;
1515
import org.elasticsearch.cluster.metadata.IndexAbstraction;
1616
import org.elasticsearch.cluster.metadata.IndexMetadata;

server/src/test/java/org/elasticsearch/action/admin/indices/resolve/ResolveIndexTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
import static java.util.Arrays.asList;
4343
import static java.util.Collections.singletonList;
44-
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
44+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
4545
import static org.hamcrest.Matchers.arrayContaining;
4646
import static org.hamcrest.Matchers.equalTo;
4747
import static org.hamcrest.Matchers.is;

server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.elasticsearch.action.support.ActiveShardCount;
1616
import org.elasticsearch.cluster.ClusterName;
1717
import org.elasticsearch.cluster.ClusterState;
18-
import org.elasticsearch.cluster.DataStreamTestHelper;
18+
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
1919
import org.elasticsearch.cluster.metadata.AliasAction;
2020
import org.elasticsearch.cluster.metadata.AliasMetadata;
2121
import org.elasticsearch.cluster.metadata.AliasValidator;
@@ -71,7 +71,7 @@
7171
import java.util.Locale;
7272
import java.util.Map;
7373

74-
import static org.elasticsearch.cluster.DataStreamTestHelper.generateMapping;
74+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.generateMapping;
7575
import static org.hamcrest.Matchers.containsString;
7676
import static org.hamcrest.Matchers.equalTo;
7777
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

server/src/test/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommandTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import java.util.stream.Collectors;
3131
import java.util.stream.Stream;
3232

33-
import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
34-
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
33+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createFirstBackingIndex;
34+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
3535
import static org.hamcrest.Matchers.equalTo;
3636
import static org.hamcrest.Matchers.instanceOf;
3737
import static org.hamcrest.Matchers.not;

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamMetadataTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
package org.elasticsearch.cluster.metadata;
1010

11-
import org.elasticsearch.cluster.DataStreamTestHelper;
1211
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1312
import org.elasticsearch.test.AbstractNamedWriteableTestCase;
1413

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.cluster.metadata;
99

1010
import org.elasticsearch.Version;
11-
import org.elasticsearch.cluster.DataStreamTestHelper;
1211
import org.elasticsearch.common.UUIDs;
1312
import org.elasticsearch.common.io.stream.Writeable;
1413
import org.elasticsearch.common.xcontent.XContentParser;
@@ -23,7 +22,7 @@
2322
import java.util.Locale;
2423
import java.util.stream.Collectors;
2524

26-
import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
25+
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createTimestampField;
2726
import static org.hamcrest.Matchers.equalTo;
2827
import static org.hamcrest.Matchers.everyItem;
2928
import static org.hamcrest.Matchers.hasItems;
@@ -49,7 +48,7 @@ protected DataStream createTestInstance() {
4948

5049
public void testRollover() {
5150
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
52-
DataStream rolledDs = ds.rollover(UUIDs.randomBase64UUID(), DataStream.NEW_FEATURES_VERSION);
51+
DataStream rolledDs = ds.rollover(Metadata.EMPTY_METADATA, UUIDs.randomBase64UUID(), DataStream.NEW_FEATURES_VERSION);
5352

5453
assertThat(rolledDs.getName(), equalTo(ds.getName()));
5554
assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
@@ -61,7 +60,7 @@ public void testRollover() {
6160

6261
public void testRolloverWithLegacyBackingIndexNames() {
6362
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
64-
DataStream rolledDs = ds.rollover(UUIDs.randomBase64UUID(), Version.V_7_10_0);
63+
DataStream rolledDs = ds.rollover(Metadata.EMPTY_METADATA, UUIDs.randomBase64UUID(), Version.V_7_10_0);
6564

6665
assertThat(rolledDs.getName(), equalTo(ds.getName()));
6766
assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
@@ -72,6 +71,31 @@ public void testRolloverWithLegacyBackingIndexNames() {
7271
equalTo(DataStream.getLegacyDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1)));
7372
}
7473

74+
public void testRolloverWithConflictingBackingIndexName() {
75+
// used a fixed time provider to guarantee name conflicts
76+
DataStream ds = DataStreamTestHelper.randomInstance(() -> 0L).promoteDataStream();
77+
78+
// create some indices with names that conflict with the names of the data stream's backing indices
79+
int numConflictingIndices = randomIntBetween(1, 10);
80+
Metadata.Builder builder = Metadata.builder();
81+
for (int k = 1; k <= numConflictingIndices; k++) {
82+
IndexMetadata im = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + k, 0L))
83+
.settings(settings(Version.CURRENT))
84+
.numberOfShards(1)
85+
.numberOfReplicas(1)
86+
.build();
87+
builder.put(im, false);
88+
}
89+
90+
DataStream rolledDs = ds.rollover(builder.build(), UUIDs.randomBase64UUID(), DataStream.NEW_FEATURES_VERSION);
91+
assertThat(rolledDs.getName(), equalTo(ds.getName()));
92+
assertThat(rolledDs.getTimeStampField(), equalTo(ds.getTimeStampField()));
93+
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1));
94+
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
95+
assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
96+
assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
97+
}
98+
7599
public void testRemoveBackingIndex() {
76100
int numBackingIndices = randomIntBetween(2, 32);
77101
int indexToRemove = randomIntBetween(1, numBackingIndices - 1);

0 commit comments

Comments
 (0)