Merged

28 commits
f0d9076
Added test that fails now, but tests that rolling over a data stream …
martijnvg Nov 5, 2020
10d7ffc
Added replicate flag to data stream and promote data stream api.
martijnvg Nov 6, 2020
5514ae8
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Nov 9, 2020
461773d
fix precommit
martijnvg Nov 9, 2020
ef71529
nit
martijnvg Nov 9, 2020
7237930
fixed tests
martijnvg Nov 9, 2020
e9b6627
fixed tests
martijnvg Nov 9, 2020
e031d72
fixed npe
martijnvg Nov 9, 2020
b1c57d1
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Nov 10, 2020
0c22ce1
fixed test
martijnvg Nov 10, 2020
73877d6
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Nov 12, 2020
5cbaf3d
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Nov 16, 2020
c9d410f
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Nov 16, 2020
5efa943
added ccr bi-directional test with data streams
martijnvg Nov 17, 2020
312a788
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Nov 17, 2020
56074c3
added docs
martijnvg Nov 19, 2020
57941ac
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Nov 20, 2020
1b49909
Added a test, which verifies that an alias in follow cluster can't be…
martijnvg Nov 23, 2020
5700b67
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Nov 23, 2020
5528da4
fix checkstyle
martijnvg Nov 23, 2020
a21454e
added rest spec and renamed rest action
martijnvg Nov 23, 2020
ef26773
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Dec 2, 2020
5051eaf
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Dec 7, 2020
676c7a8
added TODOs
martijnvg Dec 7, 2020
8e21fc7
varify
martijnvg Dec 7, 2020
9c7a071
mark promote ds api as non operator api
martijnvg Dec 7, 2020
23c59b8
Merge remote-tracking branch 'es/master' into ccr_data_stream_support…
martijnvg Dec 8, 2020
a80f777
fixed typo
martijnvg Dec 8, 2020
3 changes: 3 additions & 0 deletions docs/reference/ccr/auto-follow.asciidoc
@@ -13,6 +13,9 @@ automatically followed if the data stream name matches an auto-follow
pattern. If you create a data stream after creating the auto-follow pattern,
all backing indices are followed automatically.

Data streams replicated from a remote cluster by CCR are protected from
local rollovers. Use the <<promote-data-stream-api,promote data stream API>>
to turn these data streams into regular data streams.

Auto-follow patterns are especially useful with
<<index-lifecycle-management,{ilm-cap}>>, which might continually create
3 changes: 3 additions & 0 deletions docs/reference/data-streams/data-stream-apis.asciidoc
@@ -9,6 +9,7 @@ The following APIs are available for managing <<data-streams,data streams>>:
* <<indices-get-data-stream>>
* <<indices-migrate-to-data-stream>>
* <<data-stream-stats-api>>
* <<promote-data-stream-api>>

For concepts and tutorials, see <<data-streams>>.

@@ -21,3 +22,5 @@ include::{es-repo-dir}/indices/get-data-stream.asciidoc[]
include::{es-repo-dir}/indices/migrate-to-data-stream.asciidoc[]

include::{es-repo-dir}/indices/data-stream-stats.asciidoc[]

include::{es-repo-dir}/data-streams/promote-data-stream-api.asciidoc[]
38 changes: 38 additions & 0 deletions docs/reference/data-streams/promote-data-stream-api.asciidoc
@@ -0,0 +1,38 @@
[role="xpack"]
[[promote-data-stream-api]]
=== Promote data stream API
++++
<titleabbrev>Promote data stream API</titleabbrev>
++++

The promote data stream API turns a data stream that is
replicated by CCR into a regular data stream.

Via CCR auto following, a data stream from a remote cluster can be
replicated to the local cluster. Such a replicated data stream can't be
rolled over in the local cluster; it rolls over only when the upstream
data stream rolls over. If the remote cluster is no longer available,
the data stream in the local cluster can be promoted to a regular data
stream, after which it can be rolled over locally.

[source,console]
----
POST /_data_stream/_promote/my-data-stream
----
// TEST[catch:missing]

[[promote-data-stream-api-request]]
==== {api-request-title}

`POST /_data_stream/_promote/<data-stream>`


[[promote-data-stream-api-path-params]]
==== {api-path-parms-title}

`<data-stream>`::
(Required, string)
The name of the data stream to promote.
@@ -38,6 +38,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
@@ -148,6 +149,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra
final DataStream ds = dataStream.getDataStream();
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
final String newWriteIndexName = DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1);
ds.rollover(new Index(newWriteIndexName, "uuid")); // just for validation
Review comment (Member):

Should we have a separate method that ensures this datastream can be rolled over instead?

Reply (Member Author):

I think that this is what happens in this rolloverDataStream(...) method. Anything before the if statement is for validating purposes and after the if statement is for actually performing the rollover (the DataStream#rollover(...) method is then also invoked). I just added the DataStream#rollover(...) invocation here, so that validation that is in this method is also performed when just validating the rollover.
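The validate-then-perform split described in this reply can be sketched as a small standalone example. `DataStreamSketch` below is a hypothetical, simplified stand-in for the actual `DataStream` class, showing why invoking the rollover method just for its side effects performs the same validation as a real rollover:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for the DataStream class; illustrates how
// rollover validation rejects replicated data streams even on a dry run.
class DataStreamSketch {
    private final String name;
    private final List<String> backingIndices;
    private final long generation;
    private final boolean replicated;

    DataStreamSketch(String name, List<String> backingIndices, long generation, boolean replicated) {
        this.name = name;
        this.backingIndices = Collections.unmodifiableList(new ArrayList<>(backingIndices));
        this.generation = generation;
        this.replicated = replicated;
    }

    long getGeneration() { return generation; }
    boolean isReplicated() { return replicated; }

    // Rolling over a replicated data stream is rejected up front, so calling
    // this method validates the rollover even if the caller discards the result.
    DataStreamSketch rollover(String newWriteIndex) {
        if (replicated) {
            throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, "
                + "because it is a replicated data stream");
        }
        List<String> updated = new ArrayList<>(backingIndices);
        updated.add(newWriteIndex);
        return new DataStreamSketch(name, updated, generation + 1, replicated);
    }

    // Promotion clears the replicated flag, re-enabling local rollovers.
    DataStreamSketch promote() {
        return new DataStreamSketch(name, backingIndices, generation, false);
    }
}
```

In the validate-only path, the return value of `rollover(...)` is simply discarded; the exception for replicated data streams still fires.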

createIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists
if (onlyValidate) {
return new RolloverResult(newWriteIndexName, originalWriteIndex.getIndex().getName(), currentState);
@@ -44,26 +44,29 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To

public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final Version HIDDEN_VERSION = Version.V_7_11_0;
public static final Version REPLICATED_VERSION = Version.V_8_0_0;

private final String name;
private final TimestampField timeStampField;
private final List<Index> indices;
private final long generation;
private final Map<String, Object> metadata;
private final boolean hidden;
private final boolean replicated;
Review comment (Member):

Should we name this "allowRollover" or "followed"? I am okay with "replicated" if you prefer.

Reply (Member Author):

Maybe something that indicates whether a data stream can be rolled over is a better name. I will think about this.


public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
this(name, timeStampField, indices, generation, metadata, false);
this(name, timeStampField, indices, generation, metadata, false, false);
}

public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
boolean hidden) {
boolean hidden, boolean replicated) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = Collections.unmodifiableList(indices);
this.generation = generation;
this.metadata = metadata;
this.hidden = hidden;
this.replicated = replicated;
assert indices.size() > 0;
}

@@ -100,6 +103,16 @@ public boolean isHidden() {
return hidden;
}

/**
* Determines whether this data stream is replicated from elsewhere,
* for example a remote cluster.
*
* @return Whether this data stream is replicated.
*/
public boolean isReplicated() {
return replicated;
}

/**
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
@@ -110,9 +123,14 @@ public boolean isHidden() {
*/
public DataStream rollover(Index newWriteIndex) {
assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1));
if (replicated) {
throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, " +
"because it is a replicated data stream");
}

List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(newWriteIndex);
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated);
}

/**
@@ -126,7 +144,7 @@ public DataStream removeBackingIndex(Index index) {
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.remove(index);
assert backingIndices.size() == indices.size() - 1;
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
}

/**
@@ -151,7 +169,11 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
"it is the write index", existingBackingIndex.getName(), name));
}
backingIndices.set(backingIndexPosition, newBackingIndex);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
}

public DataStream promoteDataStream() {
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false);
}

/**
@@ -169,7 +191,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene
public DataStream(StreamInput in) throws IOException {
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readMap(): null,
in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean());
in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean(),
in.getVersion().onOrAfter(REPLICATED_VERSION) && in.readBoolean());
}

public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@@ -188,6 +211,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(HIDDEN_VERSION)) {
out.writeBoolean(hidden);
}
if (out.getVersion().onOrAfter(REPLICATED_VERSION)) {
out.writeBoolean(replicated);
}
}
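The version-gated serialization in the diff above (write the new `replicated` flag only when the stream version is new enough, and default it to `false` when reading from an older sender) can be sketched with plain Java data streams. The integer version constant below is a stand-in for Elasticsearch's `Version` class, not the real API:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical codec illustrating version-gated serialization of a new
// boolean flag for wire compatibility across mixed-version clusters.
class VersionedFlagCodec {
    static final int REPLICATED_VERSION = 8_00_00;

    // Write the flag only when the remote side is new enough to read it, so
    // the bytes sent to an older node look exactly as they did pre-upgrade.
    static void write(DataOutputStream out, int remoteVersion, boolean replicated) throws IOException {
        if (remoteVersion >= REPLICATED_VERSION) {
            out.writeBoolean(replicated);
        }
    }

    // Read the flag only when the sender could have written it; older senders
    // never wrote the byte, so the flag defaults to false via short-circuiting.
    static boolean read(DataInputStream in, int remoteVersion) throws IOException {
        return remoteVersion >= REPLICATED_VERSION && in.readBoolean();
    }
}
```

Because both sides gate on the same version constant, the byte is either written and read, or neither, keeping the stream positions in sync.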

public static final ParseField NAME_FIELD = new ParseField("name");
@@ -196,11 +222,12 @@ public void writeTo(StreamOutput out) throws IOException {
public static final ParseField GENERATION_FIELD = new ParseField("generation");
public static final ParseField METADATA_FIELD = new ParseField("_meta");
public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
public static final ParseField REPLICATED_FIELD = new ParseField("replicated");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
(Map<String, Object>) args[4], args[5] != null && (boolean) args[5]));
(Map<String, Object>) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
@@ -209,6 +236,7 @@ public void writeTo(StreamOutput out) throws IOException {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -226,6 +254,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(METADATA_FIELD.getPreferredName(), metadata);
}
builder.field(HIDDEN_FIELD.getPreferredName(), hidden);
builder.field(REPLICATED_FIELD.getPreferredName(), replicated);
builder.endObject();
return builder;
}
@@ -239,12 +268,14 @@ public boolean equals(Object o) {
timeStampField.equals(that.timeStampField) &&
indices.equals(that.indices) &&
generation == that.generation &&
Objects.equals(metadata, that.metadata);
Objects.equals(metadata, that.metadata) &&
hidden == that.hidden &&
replicated == that.replicated;
}

@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices, generation, metadata);
return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated);
}

public static final class TimestampField implements Writeable, ToXContentObject {
@@ -185,7 +185,7 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
dsBackingIndices.add(writeIndex.getIndex());
boolean hidden = template.getDataStreamTemplate().isHidden();
DataStream newDataStream = new DataStream(dataStreamName, timestampField, dsBackingIndices, 1L,
template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden);
template.metadata() != null ? Map.copyOf(template.metadata()) : null, hidden, false);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
logger.info("adding data stream [{}] with write index [{}] and backing indices [{}]", dataStreamName,
writeIndex.getIndex().getName(),
@@ -623,7 +623,7 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
.map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
.collect(Collectors.toList());
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(),
dataStream.getMetadata(), dataStream.isHidden());
dataStream.getMetadata(), dataStream.isHidden(), dataStream.isReplicated());
}

public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {
@@ -536,7 +536,7 @@ public void testRolloverClusterState() throws Exception {
}

public void testRolloverClusterStateForDataStream() throws Exception {
final DataStream dataStream = DataStreamTestHelper.randomInstance();
final DataStream dataStream = DataStreamTestHelper.randomInstance()
// ensure no replicated data stream
.promoteDataStream();
ComposableIndexTemplate template = new ComposableIndexTemplate(List.of(dataStream.getName() + "*"), null, null, null, null, null,
new ComposableIndexTemplate.DataStreamTemplate(), null);
Metadata.Builder builder = Metadata.builder();
@@ -632,7 +634,9 @@ public void testValidation() throws Exception {
final boolean useDataStream = randomBoolean();
final Metadata.Builder builder = Metadata.builder();
if (useDataStream) {
DataStream dataStream = DataStreamTestHelper.randomInstance();
DataStream dataStream = DataStreamTestHelper.randomInstance()
// ensure no replicated data stream
.promoteDataStream();
rolloverTarget = dataStream.getName();
sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName();
defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
@@ -52,7 +52,7 @@ protected DataStream createTestInstance() {
}

public void testRollover() {
DataStream ds = DataStreamTestHelper.randomInstance();
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
Index newWriteIndex = new Index(getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random()));
DataStream rolledDs = ds.rollover(newWriteIndex);

@@ -2127,7 +2127,7 @@ public void testHiddenDataStreams() {
.put(index2, false)
.put(justAnIndex, false)
.put(new DataStream(dataStream1, createTimestampField("@timestamp"),
List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true))).build();
List.of(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true, false))).build();

Index[] result = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.strictExpandHidden(), true, "logs-*");
assertThat(result, arrayContainingInAnyOrder(index1.getIndex(), index2.getIndex(), justAnIndex.getIndex() ));
@@ -109,7 +109,8 @@ public static DataStream randomInstance() {
if (randomBoolean()) {
metadata = Map.of("key", "value");
}
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, randomBoolean());
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata,
randomBoolean(), randomBoolean());
}

/**
Expand Down
Loading