Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/reference/ccr/auto-follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ automatically followed if the data stream name matches an auto-follow
pattern. If you create a data stream after creating the auto-follow pattern,
all backing indices are followed automatically.

The data streams replicated from a remote cluster by CCR are protected from
local rollovers. The <<promote-data-stream-api,promote data stream API>>
can be used to turn these data streams into regular data streams.

Auto-follow patterns are especially useful with
<<index-lifecycle-management,{ilm-cap}>>, which might continually create
Expand Down
5 changes: 4 additions & 1 deletion docs/reference/data-streams/data-stream-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The following APIs are available for managing <<data-streams,data streams>>:
* <<indices-delete-data-stream>>
* <<indices-get-data-stream>>
* <<data-stream-stats-api>>
* <<promote-data-stream-api>>

For concepts and tutorials, see <<data-streams>>.

Expand All @@ -17,4 +18,6 @@ include::{es-repo-dir}/indices/delete-data-stream.asciidoc[]

include::{es-repo-dir}/indices/get-data-stream.asciidoc[]

include::{es-repo-dir}/indices/data-stream-stats.asciidoc[]
include::{es-repo-dir}/indices/data-stream-stats.asciidoc[]

include::{es-repo-dir}/data-streams/promote-data-stream-api.asciidoc[]
38 changes: 38 additions & 0 deletions docs/reference/data-streams/promote-data-stream-api.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[role="xpack"]
[[promote-data-stream-api]]
=== Promote Data Stream API
++++
<titleabbrev>Promote data stream api</titleabbrev>
++++

The purpose of the promote data stream api is to turn
a data stream that is replicated by CCR into a regular
data stream.

Via CCR Auto Following, a data stream from a remote cluster
can be replicated to the local cluster. These data streams
can't be rolled over in the local cluster. Only if the upstream
data stream rolls over then these replicated data streams roll
over as well. In the event that the remote cluster is no longer
available, the data stream in the local cluster can be promoted
to a regular data stream, which allows these data streams to
be rolled over in the local cluster.

[source,console]
----
POST /_data_stream/_promote/my-data-stream
----
// TEST[catch:missing]

[[promote-data-stream-api-request]]
==== {api-request-title}

`POST /_data_stream/_promote/<data-stream>`


[[promote-data-stream-api-path-params]]
==== {api-path-parms-title}

`<data-stream>`::
(Required, string)
The name of the data stream to promote.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
Expand Down Expand Up @@ -150,6 +151,7 @@ private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstra
final DataStream ds = dataStream.getDataStream();
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
final String newWriteIndexName = DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1);
ds.rollover(new Index(newWriteIndexName, "uuid")); // just for validation
createIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists
if (onlyValidate) {
return new RolloverResult(newWriteIndexName, originalWriteIndex.getIndex().getName(), currentState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,29 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To

public static final String BACKING_INDEX_PREFIX = ".ds-";
public static final Version HIDDEN_VERSION = Version.V_7_11_0;
public static final Version REPLICATED_VERSION = Version.V_7_11_0;

private final String name;
private final TimestampField timeStampField;
private final List<Index> indices;
private final long generation;
private final Map<String, Object> metadata;
private final boolean hidden;
private final boolean replicated;

public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata) {
this(name, timeStampField, indices, generation, metadata, false);
this(name, timeStampField, indices, generation, metadata, false, false);
}

public DataStream(String name, TimestampField timeStampField, List<Index> indices, long generation, Map<String, Object> metadata,
boolean hidden) {
boolean hidden, boolean replicated) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = Collections.unmodifiableList(indices);
this.generation = generation;
this.metadata = metadata;
this.hidden = hidden;
this.replicated = replicated;
assert indices.size() > 0;
assert indices.get(indices.size() - 1).getName().equals(getDefaultBackingIndexName(name, generation));
}
Expand Down Expand Up @@ -97,6 +100,16 @@ public boolean isHidden() {
return hidden;
}

/**
* Determines whether this data stream is replicated from elsewhere,
* for example a remote cluster.
*
* @return Whether this data stream is replicated.
*/
public boolean isReplicated() {
return replicated;
}

/**
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
Expand All @@ -107,9 +120,14 @@ public boolean isHidden() {
*/
public DataStream rollover(Index newWriteIndex) {
assert newWriteIndex.getName().equals(getDefaultBackingIndexName(name, generation + 1));
if (replicated) {
throw new IllegalArgumentException("data stream [" + name + "] cannot be rolled over, " +
"because it is a replicated data stream");
}

List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.add(newWriteIndex);
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation + 1, metadata, hidden, replicated);
}

/**
Expand All @@ -123,7 +141,7 @@ public DataStream removeBackingIndex(Index index) {
List<Index> backingIndices = new ArrayList<>(indices);
backingIndices.remove(index);
assert backingIndices.size() == indices.size() - 1;
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
}

/**
Expand All @@ -148,7 +166,11 @@ public DataStream replaceBackingIndex(Index existingBackingIndex, Index newBacki
"it is the write index", existingBackingIndex.getName(), name));
}
backingIndices.set(backingIndexPosition, newBackingIndex);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden);
return new DataStream(name, timeStampField, backingIndices, generation, metadata, hidden, replicated);
}

public DataStream promoteDataStream() {
return new DataStream(name, timeStampField, indices, getGeneration(), metadata, hidden, false);
}

/**
Expand All @@ -166,7 +188,8 @@ public static String getDefaultBackingIndexName(String dataStreamName, long gene
public DataStream(StreamInput in) throws IOException {
this(in.readString(), new TimestampField(in), in.readList(Index::new), in.readVLong(),
in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readMap(): null,
in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean());
in.getVersion().onOrAfter(HIDDEN_VERSION) && in.readBoolean(),
in.getVersion().onOrAfter(REPLICATED_VERSION) && in.readBoolean());
}

public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -185,6 +208,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(HIDDEN_VERSION)) {
out.writeBoolean(hidden);
}
if (out.getVersion().onOrAfter(REPLICATED_VERSION)) {
out.writeBoolean(replicated);
}
}

public static final ParseField NAME_FIELD = new ParseField("name");
Expand All @@ -193,11 +219,12 @@ public void writeTo(StreamOutput out) throws IOException {
public static final ParseField GENERATION_FIELD = new ParseField("generation");
public static final ParseField METADATA_FIELD = new ParseField("_meta");
public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
public static final ParseField REPLICATED_FIELD = new ParseField("replicated");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
args -> new DataStream((String) args[0], (TimestampField) args[1], (List<Index>) args[2], (Long) args[3],
(Map<String, Object>) args[4], args[5] != null && (boolean) args[5]));
(Map<String, Object>) args[4], args[5] != null && (boolean) args[5], args[6] != null && (boolean) args[6]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
Expand All @@ -206,6 +233,7 @@ public void writeTo(StreamOutput out) throws IOException {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), REPLICATED_FIELD);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
Expand All @@ -223,6 +251,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(METADATA_FIELD.getPreferredName(), metadata);
}
builder.field(HIDDEN_FIELD.getPreferredName(), hidden);
builder.field(REPLICATED_FIELD.getPreferredName(), replicated);
builder.endObject();
return builder;
}
Expand All @@ -236,12 +265,14 @@ public boolean equals(Object o) {
timeStampField.equals(that.timeStampField) &&
indices.equals(that.indices) &&
generation == that.generation &&
Objects.equals(metadata, that.metadata);
Objects.equals(metadata, that.metadata) &&
hidden == that.hidden &&
replicated == that.replicated;
}

@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices, generation, metadata);
return Objects.hash(name, timeStampField, indices, generation, metadata, hidden, replicated);
}

public static final class TimestampField implements Writeable, ToXContentObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
DataStream newDataStream =
new DataStream(request.name, timestampField,
Collections.singletonList(firstBackingIndex.getIndex()), 1L,
template.metadata() != null ? Collections.unmodifiableMap(new HashMap<>(template.metadata())) : null, hidden);
template.metadata() != null ? Collections.unmodifiableMap(new HashMap<>(template.metadata())) : null, hidden, false);
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(newDataStream);
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metadata(builder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ static DataStream updateDataStream(DataStream dataStream, Metadata.Builder metad
.map(i -> metadata.get(renameIndex(i.getName(), request, true)).getIndex())
.collect(Collectors.toList());
return new DataStream(dataStreamName, dataStream.getTimeStampField(), updatedIndices, dataStream.getGeneration(),
dataStream.getMetadata(), dataStream.isHidden());
dataStream.getMetadata(), dataStream.isHidden(), dataStream.isReplicated());
}

public static RestoreInProgress updateRestoreStateWithDeletedIndices(RestoreInProgress oldRestore, Set<Index> deletedIndices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,9 @@ public void testRolloverClusterState() throws Exception {
}

public void testRolloverClusterStateForDataStream() throws Exception {
final DataStream dataStream = DataStreamTestHelper.randomInstance();
final DataStream dataStream = DataStreamTestHelper.randomInstance()
// ensure no replicate data stream
.promoteDataStream();
ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStream.getName() + "*"),
null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate(), null);
Metadata.Builder builder = Metadata.builder();
Expand Down Expand Up @@ -633,7 +635,9 @@ public void testValidation() throws Exception {
final boolean useDataStream = randomBoolean();
final Metadata.Builder builder = Metadata.builder();
if (useDataStream) {
DataStream dataStream = DataStreamTestHelper.randomInstance();
DataStream dataStream = DataStreamTestHelper.randomInstance()
// ensure no replicate data stream
.promoteDataStream();
rolloverTarget = dataStream.getName();
sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName();
defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected DataStream createTestInstance() {
}

public void testRollover() {
DataStream ds = DataStreamTestHelper.randomInstance();
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
Index newWriteIndex = new Index(getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1), UUIDs.randomBase64UUID(random()));
DataStream rolledDs = ds.rollover(newWriteIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2134,7 +2134,7 @@ public void testHiddenDataStreams() {
.put(index2, false)
.put(justAnIndex, false)
.put(new DataStream(dataStream1, createTimestampField("@timestamp"),
Arrays.asList(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true))).build();
Arrays.asList(index1.getIndex(), index2.getIndex()), 2, Collections.emptyMap(), true, false))).build();

Index[] result = indexNameExpressionResolver.concreteIndices(state, IndicesOptions.strictExpandHidden(), true, "logs-*");
assertThat(result, arrayContainingInAnyOrder(index1.getIndex(), index2.getIndex(), justAnIndex.getIndex() ));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public static DataStream randomInstance() {
metadata = new HashMap<>();
metadata.put("key", "value");
}
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata, randomBoolean());
return new DataStream(dataStreamName, createTimestampField("@timestamp"), indices, generation, metadata,
randomBoolean(), randomBoolean());
}

/**
Expand Down
Loading