Skip to content

Commit 52afaf2

Browse files
authored
Protect replicated data streams against local rollovers (#64710)
When a data stream is being auto followed then a rollover in a local cluster can break auto following, if the local cluster performs a rollover then it creates a new write index and if then later the remote cluster rolls over as well then that new write index can't be replicated, because it has the same name as in the write index in the local cluster, which was created earlier. If a data stream is managed by ccr, then the local cluster should not do a rollover for those data streams. The data stream should be rolled over in the remote cluster and that change should replicate to the local cluster. Performing a rollover in the local cluster is an operation that the data stream support in ccr should perform. To protect against rolling over a replicated data stream, this PR adds a replicate field to DataStream class. The rollover api will fail with an error in case a data stream is being rolled over and the targeted data stream is a replicated data stream. When the put follow api creates a data stream in the local cluster then the replicate flag is set to true. There should be a way to turn a replicated data stream into a regular data stream when for example during disaster recovery. The newly added api in this pr (promote data stream api) is doing that. After a replicated data stream is promoted to a regular data stream then the local data stream can be rolled over, so that the new write index is no longer a follower index. Also if the put follow api is attempting to update this data stream (for example to attempt to resume auto following) then that with fail, because the data stream is no longer a replicated data stream. Today with time based indices behind an alias, the is_write_index property isn't replicated from remote cluster to the local cluster, so when attempting to rollover the alias in the local cluster the rollover fails, because the alias doesn't have a write index. The added replicated field in the DataStream class and added validation achieve the same kind of protection, but in a more robust way. A followup from #61993.
1 parent c58937b commit 52afaf2

File tree

21 files changed

+762
-76
lines changed

21 files changed

+762
-76
lines changed

docs/reference/ccr/auto-follow.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ automatically followed if the data stream name matches an auto-follow
1313
pattern. If you create a data stream after creating the auto-follow pattern,
1414
all backing indices are followed automatically.
1515

16+
The data streams replicated from a remote cluster by CCR are protected from
17+
local rollovers. The <<promote-data-stream-api,promote data stream API>>
18+
can be used to turn these data streams into regular data streams.
1619

1720
Auto-follow patterns are especially useful with
1821
<<index-lifecycle-management,{ilm-cap}>>, which might continually create

docs/reference/data-streams/data-stream-apis.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The following APIs are available for managing <<data-streams,data streams>>:
99
* <<indices-get-data-stream>>
1010
* <<indices-migrate-to-data-stream>>
1111
* <<data-stream-stats-api>>
12+
* <<promote-data-stream-api>>
1213

1314
For concepts and tutorials, see <<data-streams>>.
1415

@@ -21,3 +22,5 @@ include::{es-repo-dir}/indices/get-data-stream.asciidoc[]
2122
include::{es-repo-dir}/indices/migrate-to-data-stream.asciidoc[]
2223

2324
include::{es-repo-dir}/indices/data-stream-stats.asciidoc[]
25+
26+
include::{es-repo-dir}/data-streams/promote-data-stream-api.asciidoc[]
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
[role="xpack"]
2+
[[promote-data-stream-api]]
3+
=== Promote Data Stream API
4+
++++
5+
<titleabbrev>Promote data stream api</titleabbrev>
6+
++++
7+
8+
The purpose of the promote data stream api is to turn
9+
a data stream that is replicated by CCR into a regular
10+
data stream.
11+
12+
Via CCR Auto Following, a data stream from a remote cluster
13+
can be replicated to the local cluster. These data streams
14+
can't be rolled over in the local cluster. Only if the upstream
15+
data stream rolls over then these replicated data streams roll
16+
over as well. In the event that the remote cluster is no longer
17+
available, the data stream in the local cluster can be promoted
18+
to a regular data stream, which allows these data streams to
19+
be rolled over in the local cluster.
20+
21+
[source,console]
22+
----
23+
POST /_data_stream/_promote/my-data-stream
24+
----
25+
// TEST[catch:missing]
26+
27+
[[promote-data-stream-api-request]]
28+
==== {api-request-title}
29+
30+
`POST /_data_stream/_promote/<data-stream>`
31+
32+
33+
[[promote-data-stream-api-path-params]]
34+
==== {api-path-parms-title}
35+
36+
`<data-stream>`::
37+
(Required, string)
38+
The name of the data stream to promote.

server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.Strings;
3939
import org.elasticsearch.common.inject.Inject;
4040
import org.elasticsearch.common.settings.Settings;
41+
import org.elasticsearch.index.Index;
4142
import org.elasticsearch.threadpool.ThreadPool;
4243

4344
import java.util.List;
@@ -148,6 +149,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra
148149
final DataStream ds = dataStream.getDataStream();
149150
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
150151
final String newWriteIndexName = DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1);
152+
ds.rollover(new Index(newWriteIndexName, "uuid")); // just for validation
151153
createIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists
152154
if (onlyValidate) {
153155
return new RolloverResult(newWriteIndexName, originalWriteIndex.getIndex().getName(), currentState);

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,26 +44,29 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
4444

4545
public static final String BACKING_INDEX_PREFIX = ".ds-";
4646
public static final Version HIDDEN_VERSION = Version.V_7_11_0;
47+
public static final Version REPLICATED_VERSION = Version.V_8_0_0;
4748

4849
private final String name;
4950
private final TimestampField timeStampField;
5051
private final List<Index> indices;
5152
private final long generation;
5253
private final Map<String, Object> metadata;
5354
private final boolean hidden;
55+
private final boolean replicated;
5456

5557
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
56-
this(name, timeStampField, indices, generation, metadata, false);
58+
this(name, timeStampField, indices, generation, metadata, false, false);
5759
}
5860

5961
public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
60-
boolean hidden) {
62+
boolean hidden, boolean replicated) {
6163
this.name = name;
6264
this.timeStampField = timeStampField;
6365
this.indices = Collections.unmodifiableList(indices);
6466
this.generation = generation;
6567
this.metadata = metadata;
6668
this.hidden = hidden;
69+
this.replicated = replicated;
6770
assert indices.size() > 0;
6871
}
6972

@@ -100,6 +103,16 @@ public boolean isHidden() {
100103
return hidden;
101104
}
102105

106+
/**
107+
* Determines whether this data stream is replicated from elsewhere,
108+
* for example a remote cluster.
109+
*
110+
* @return Whether this data stream is replicated.
111+
*/
112+
public boolean isReplicated() {
113+
return replicated;
114+
}
115+
103116
/**
104117
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
105118
* the updated list of backing indices and incremented generation.
@@ -110,9 +123,14 @@ public boolean isHidden() {
110123
*/
111124
public DataStream rollover(Index newWriteIndex) {
112125
assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1));
126+
if (replicated) {
127+
throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, " +
128+
"because it is a replicated data stream");
129+
}
130+
113131
List<Index> backingIndices = new ArrayList<>(indices);
114132
backingIndices.add(newWriteIndex);
115-
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden);
133+
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated);
116134
}
117135

118136
/**
@@ -126,7 +144,7 @@ public DataStream removeBackingIndex(Index index) {
126144
List<Index> backingIndices = new ArrayList<>(indices);
127145
backingIndices.remove(index);
128146
assert backingIndices.size() == indices.size() - 1;
129-
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden);
147+
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
130148
}
131149

132150
/**
@@ -151,7 +169,11 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
151169
"it is the write index", existingBackingIndex.getName(), name));
152170
}
153171
backingIndices.set(backingIndexPosition, newBackingIndex);
154-
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden);
172+
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
173+
}
174+
175+
public DataStream promoteDataStream() {
176+
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false);
155177
}
156178

157179
/**
@@ -169,7 +191,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene
169191
public DataStream(StreamInput in) throws IOException {
170192
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
171193
in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readMap(): null,
172-
in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean());
194+
in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean(),
195+
in.getVersion().onOrAfter(REPLICATED_VERSION) && in.readBoolean());
173196
}
174197

175198
public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@@ -188,6 +211,9 @@ public void writeTo(StreamOutput out) throws IOException {
188211
if (out.getVersion().onOrAfter(HIDDEN_VERSION)) {
189212
out.writeBoolean(hidden);
190213
}
214+
if (out.getVersion().onOrAfter(REPLICATED_VERSION)) {
215+
out.writeBoolean(replicated);
216+
}
191217
}
192218

193219
public static final ParseField NAME_FIELD = new ParseField("name");
@@ -196,11 +222,12 @@ public void writeTo(StreamOutput out) throws IOException {
196222
public static final ParseField GENERATION_FIELD = new ParseField("generation");
197223
public static final ParseField METADATA_FIELD = new ParseField("_meta");
198224
public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
225+
public static final ParseField REPLICATED_FIELD = new ParseField("replicated");
199226

200227
@SuppressWarnings("unchecked")
201228
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
202229
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
203-
(Map<String, Object>) args[4], args[5] != null && (boolean) args[5]));
230+
(Map<String, Object>) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6]));
204231

205232
static {
206233
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
@@ -209,6 +236,7 @@ public void writeTo(StreamOutput out) throws IOException {
209236
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
210237
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
211238
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
239+
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD);
212240
}
213241

214242
public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -226,6 +254,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
226254
builder.field(METADATA_FIELD.getPreferredName(), metadata);
227255
}
228256
builder.field(HIDDEN_FIELD.getPreferredName(), hidden);
257+
builder.field(REPLICATED_FIELD.getPreferredName(), replicated);
229258
builder.endObject();
230259
return builder;
231260
}
@@ -239,12 +268,14 @@ public boolean equals(Object o) {
239268
timeStampField.equals(that.timeStampField) &&
240269
indices.equals(that.indices) &&
241270
generation == that.generation &&
242-
Objects.equals(metadata, that.metadata);
271+
Objects.equals(metadata, that.metadata) &&
272+
hidden == that.hidden &&
273+
replicated == that.replicated;
243274
}
244275

245276
@Override
246277
public int hashCode() {
247-
return Objects.hash(name, timeStampField, indices, generation, metadata);
278+
return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated);
248279
}
249280

250281
public static final class TimestampField implements Writeable, ToXContentObject {

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
185185
dsBackingIndices.add(writeIndex.getIndex());
186186
boolean hidden = template.getDataStreamTemplate().isHidden();
187187
DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L,
188-
template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden);
188+
template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden, false);
189189
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
190190
logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName,
191191
writeIndex.getIndex().getName(),

server/src/main/java/org/elasticsearch/snapshots/RestoreService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
623623
.map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
624624
.collect(Collectors.toList());
625625
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(),
626-
dataStream.getMetadata(), dataStream.isHidden());
626+
dataStream.getMetadata(), dataStream.isHidden(), dataStream.isReplicated());
627627
}
628628

629629
public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {

server/src/test/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverServiceTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,9 @@ public void testRolloverClusterState() throws Exception {
536536
}
537537

538538
public void testRolloverClusterStateForDataStream() throws Exception {
539-
final DataStream dataStream = DataStreamTestHelper.randomInstance();
539+
final DataStream dataStream = DataStreamTestHelper.randomInstance()
540+
// ensure no replicate data stream
541+
.promoteDataStream();
540542
ComposableIndexTemplate template = new ComposableIndexTemplate(List.of(dataStream.getName() + "*"), null, null, null, null, null,
541543
new ComposableIndexTemplate.DataStreamTemplate(), null);
542544
Metadata.Builder builder = Metadata.builder();
@@ -632,7 +634,9 @@ public void testValidation() throws Exception {
632634
final boolean useDataStream = randomBoolean();
633635
final Metadata.Builder builder = Metadata.builder();
634636
if (useDataStream) {
635-
DataStream dataStream = DataStreamTestHelper.randomInstance();
637+
DataStream dataStream = DataStreamTestHelper.randomInstance()
638+
// ensure no replicate data stream
639+
.promoteDataStream();
636640
rolloverTarget = dataStream.getName();
637641
sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName();
638642
defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ protected DataStream createTestInstance() {
5252
}
5353

5454
public void testRollover() {
55-
DataStream ds = DataStreamTestHelper.randomInstance();
55+
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
5656
Index newWriteIndex = new Index(getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random()));
5757
DataStream rolledDs = ds.rollover(newWriteIndex);
5858

server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2127,7 +2127,7 @@ public void testHiddenDataStreams() {
21272127
.put(index2, false)
21282128
.put(justAnIndex, false)
21292129
.put(new DataStream(dataStream1, createTimestampField("@timestamp"),
2130-
List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true))).build();
2130+
List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true, false))).build();
21312131

21322132
Index[] result = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.strictExpandHidden(), true, "logs-*");
21332133
assertThat(result, arrayContainingInAnyOrder(index1.getIndex(), index2.getIndex(), justAnIndex.getIndex() ));

0 commit comments

Comments
 (0)