Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
.setAllowCustomRouting(true)
.setIndexMode(IndexMode.STANDARD)
.setLifecycle(new DataStreamLifecycle())
.setFailureStoreEnabled(true)
.setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
.build();

Expand Down Expand Up @@ -186,7 +187,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
.setAllowCustomRouting(true)
.setIndexMode(IndexMode.STANDARD)
.setLifecycle(new DataStreamLifecycle(null, null, false))
.setFailureStoreEnabled(true)
.setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
Expand Down Expand Up @@ -1495,6 +1496,13 @@ public void testTargetIndices() {
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
int numBackingIndices = 3;
int numFailureIndices = 2;
int mutationBranch = randomIntBetween(0, 2);
DataStreamOptions dataStreamOptions = switch (mutationBranch) {
case 0 -> DataStreamOptions.EMPTY;
case 1 -> DataStreamOptions.FAILURE_STORE_ENABLED;
case 2 -> DataStreamOptions.FAILURE_STORE_DISABLED;
default -> throw new IllegalStateException("Unexpected value: " + mutationBranch);
};
Metadata.Builder builder = Metadata.builder();
DataStream dataStream = createDataStream(
builder,
Expand All @@ -1504,7 +1512,7 @@ public void testTargetIndices() {
settings(IndexVersion.current()),
new DataStreamLifecycle(),
now
).copy().setFailureStoreEnabled(randomBoolean()).build(); // failure store is managed even when disabled
).copy().setDataStreamOptions(dataStreamOptions).build(); // failure store is managed even when disabled
builder.put(dataStream);
Metadata metadata = builder.build();
Set<Index> indicesToExclude = Set.of(dataStream.getIndices().get(0), dataStream.getFailureIndices().getIndices().get(0));
Expand Down Expand Up @@ -1536,7 +1544,7 @@ public void testFailureStoreIsManagedEvenWhenDisabled() {
settings(IndexVersion.current()),
DataStreamLifecycle.newBuilder().dataRetention(0).build(),
now
).copy().setFailureStoreEnabled(false).build(); // failure store is managed even when it is disabled
).copy().setDataStreamOptions(DataStreamOptions.FAILURE_STORE_DISABLED).build(); // failure store is managed even when disabled
builder.put(dataStream);

ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_CHUNKING_SETTINGS = def(8_751_00_0);
public static final TransportVersion SEMANTIC_QUERY_INNER_HITS = def(8_752_00_0);
public static final TransportVersion RETAIN_ILM_STEP_INFO = def(8_753_00_0);
public static final TransportVersion ADD_DATA_STREAM_OPTIONS = def(8_754_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static boolean isFailureStoreFeatureFlagEnabled() {
private final IndexMode indexMode;
@Nullable
private final DataStreamLifecycle lifecycle;
private final boolean failureStoreEnabled;
private final DataStreamOptions dataStreamOptions;
Copy link
Contributor Author

@gmarouli gmarouli Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it would be easier to refer to an empty constant than always perform the null check before accessing this object. Only on serialisation we use null to avoid serialising empty configurations.


private final DataStreamIndices backingIndices;
private final DataStreamIndices failureIndices;
Expand All @@ -128,7 +128,7 @@ public DataStream(
boolean allowCustomRouting,
IndexMode indexMode,
DataStreamLifecycle lifecycle,
boolean failureStoreEnabled,
@Nullable DataStreamOptions dataStreamOptions,
List<Index> failureIndices,
boolean rolloverOnWrite,
@Nullable DataStreamAutoShardingEvent autoShardingEvent
Expand All @@ -144,7 +144,7 @@ public DataStream(
allowCustomRouting,
indexMode,
lifecycle,
failureStoreEnabled,
dataStreamOptions,
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
);
Expand All @@ -162,7 +162,7 @@ public DataStream(
boolean allowCustomRouting,
IndexMode indexMode,
DataStreamLifecycle lifecycle,
boolean failureStoreEnabled,
DataStreamOptions dataStreamOptions,
DataStreamIndices backingIndices,
DataStreamIndices failureIndices
) {
Expand All @@ -177,7 +177,7 @@ public DataStream(
this.allowCustomRouting = allowCustomRouting;
this.indexMode = indexMode;
this.lifecycle = lifecycle;
this.failureStoreEnabled = failureStoreEnabled;
this.dataStreamOptions = dataStreamOptions == null ? DataStreamOptions.EMPTY : dataStreamOptions;
assert backingIndices.indices.isEmpty() == false;
assert replicated == false || (backingIndices.rolloverOnWrite == false && failureIndices.rolloverOnWrite == false)
: "replicated data streams cannot be marked for lazy rollover";
Expand All @@ -198,9 +198,11 @@ public static DataStream read(StreamInput in) throws IOException {
var lifecycle = in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)
? in.readOptionalWriteable(DataStreamLifecycle::new)
: null;
var failureStoreEnabled = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)
? in.readBoolean()
: false;
// This boolean flag has been moved in data stream options
var failureStoreEnabled = in.getTransportVersion()
.between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS)
? in.readBoolean()
: false;
var failureIndices = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)
? readIndices(in)
: List.<Index>of();
Expand All @@ -213,6 +215,14 @@ public static DataStream read(StreamInput in) throws IOException {
failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
}
DataStreamOptions dataStreamOptions;
if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) {
dataStreamOptions = in.readOptionalWriteable(DataStreamOptions::read);
} else {
// We cannot distinguish if failure store was explicitly disabled or not. Given that failure store
// is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
dataStreamOptions = failureStoreEnabled ? DataStreamOptions.FAILURE_STORE_ENABLED : null;
}
return new DataStream(
name,
generation,
Expand All @@ -224,7 +234,7 @@ public static DataStream read(StreamInput in) throws IOException {
allowCustomRouting,
indexMode,
lifecycle,
failureStoreEnabled,
dataStreamOptions,
backingIndicesBuilder.build(),
failureIndicesBuilder.build()
);
Expand Down Expand Up @@ -274,6 +284,10 @@ public boolean isFailureStoreIndex(String indexName) {
return failureIndices.containsIndex(indexName);
}

public DataStreamOptions getDataStreamOptions() {
return dataStreamOptions;
}

public boolean rolloverOnWrite() {
return backingIndices.rolloverOnWrite;
}
Expand Down Expand Up @@ -406,13 +420,12 @@ public boolean isAllowCustomRouting() {
}

/**
* Determines if this data stream should persist ingest pipeline and mapping failures from bulk requests to a locally
* configured failure store.
*
* @return Whether this data stream should store ingestion failures.
* Determines if this data stream has its failure store enabled or not. Currently, the failure store
* is enabled only when a user has explicitly requested it.
* @return true, if the user has explicitly enabled the failure store.
*/
public boolean isFailureStoreEnabled() {
return failureStoreEnabled;
return dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().isExplicitlyEnabled();
}

@Nullable
Expand Down Expand Up @@ -1063,8 +1076,11 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
out.writeOptionalWriteable(lifecycle);
}
if (out.getTransportVersion()
.between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS)) {
out.writeBoolean(isFailureStoreEnabled());
}
if (out.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) {
out.writeBoolean(failureStoreEnabled);
out.writeCollection(failureIndices.indices);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
Expand All @@ -1077,6 +1093,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(failureIndices.rolloverOnWrite);
out.writeOptionalWriteable(failureIndices.autoShardingEvent);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) {
out.writeOptionalWriteable(dataStreamOptions.isEmpty() ? null : dataStreamOptions);
}
}

public static final ParseField NAME_FIELD = new ParseField("name");
Expand All @@ -1096,6 +1115,7 @@ public void writeTo(StreamOutput out) throws IOException {
public static final ParseField AUTO_SHARDING_FIELD = new ParseField("auto_sharding");
public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write");
public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");
public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream", args -> {
Expand All @@ -1110,6 +1130,16 @@ public void writeTo(StreamOutput out) throws IOException {
(DataStreamAutoShardingEvent) args[15]
)
: new DataStreamIndices(FAILURE_STORE_PREFIX, List.of(), false, null);
// We cannot distinguish if failure store was explicitly disabled or not. Given that failure store
// is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
DataStreamOptions dataStreamOptions = DataStreamOptions.EMPTY;
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
if (args[16] != null) {
dataStreamOptions = (DataStreamOptions) args[16];
} else if (failureStoreEnabled) {
dataStreamOptions = DataStreamOptions.FAILURE_STORE_ENABLED;
}
}
return new DataStream(
(String) args[0],
(Long) args[2],
Expand All @@ -1121,7 +1151,7 @@ public void writeTo(StreamOutput out) throws IOException {
args[7] != null && (boolean) args[7],
args[8] != null ? IndexMode.fromString((String) args[8]) : null,
(DataStreamLifecycle) args[9],
failureStoreEnabled,
dataStreamOptions,
new DataStreamIndices(
BACKING_INDEX_PREFIX,
(List<Index>) args[1],
Expand Down Expand Up @@ -1171,6 +1201,11 @@ public void writeTo(StreamOutput out) throws IOException {
(p, c) -> DataStreamAutoShardingEvent.fromXContent(p),
FAILURE_AUTO_SHARDING_FIELD
);
PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataStreamOptions.fromXContent(p),
DATA_STREAM_OPTIONS_FIELD
);
}
}

Expand Down Expand Up @@ -1208,7 +1243,6 @@ public XContentBuilder toXContent(
builder.field(SYSTEM_FIELD.getPreferredName(), system);
builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
builder.field(FAILURE_STORE_FIELD.getPreferredName(), failureStoreEnabled);
if (failureIndices.indices.isEmpty() == false) {
builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices.indices);
}
Expand All @@ -1218,6 +1252,10 @@ public XContentBuilder toXContent(
failureIndices.autoShardingEvent.toXContent(builder, params);
builder.endObject();
}
if (dataStreamOptions.isEmpty() == false) {
builder.field(DATA_STREAM_OPTIONS_FIELD.getPreferredName());
dataStreamOptions.toXContent(builder, params);
}
}
if (indexMode != null) {
builder.field(INDEX_MODE.getPreferredName(), indexMode);
Expand Down Expand Up @@ -1250,7 +1288,7 @@ public boolean equals(Object o) {
&& allowCustomRouting == that.allowCustomRouting
&& indexMode == that.indexMode
&& Objects.equals(lifecycle, that.lifecycle)
&& failureStoreEnabled == that.failureStoreEnabled
&& Objects.equals(dataStreamOptions, that.dataStreamOptions)
&& Objects.equals(backingIndices, that.backingIndices)
&& Objects.equals(failureIndices, that.failureIndices);
}
Expand All @@ -1267,7 +1305,7 @@ public int hashCode() {
allowCustomRouting,
indexMode,
lifecycle,
failureStoreEnabled,
dataStreamOptions,
backingIndices,
failureIndices
);
Expand Down Expand Up @@ -1580,7 +1618,7 @@ public static class Builder {
private IndexMode indexMode = null;
@Nullable
private DataStreamLifecycle lifecycle = null;
private boolean failureStoreEnabled = false;
private DataStreamOptions dataStreamOptions = DataStreamOptions.EMPTY;
private DataStreamIndices backingIndices;
private DataStreamIndices failureIndices = DataStreamIndices.failureIndicesBuilder(List.of()).build();

Expand All @@ -1605,7 +1643,7 @@ private Builder(DataStream dataStream) {
allowCustomRouting = dataStream.allowCustomRouting;
indexMode = dataStream.indexMode;
lifecycle = dataStream.lifecycle;
failureStoreEnabled = dataStream.failureStoreEnabled;
dataStreamOptions = dataStream.dataStreamOptions;
backingIndices = dataStream.backingIndices;
failureIndices = dataStream.failureIndices;
}
Expand Down Expand Up @@ -1660,8 +1698,8 @@ public Builder setLifecycle(DataStreamLifecycle lifecycle) {
return this;
}

public Builder setFailureStoreEnabled(boolean failureStoreEnabled) {
this.failureStoreEnabled = failureStoreEnabled;
public Builder setDataStreamOptions(DataStreamOptions dataStreamOptions) {
this.dataStreamOptions = dataStreamOptions;
return this;
}

Expand Down Expand Up @@ -1697,7 +1735,7 @@ public DataStream build() {
allowCustomRouting,
indexMode,
lifecycle,
failureStoreEnabled,
dataStreamOptions,
backingIndices,
failureIndices
);
Expand Down
Loading