diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
index 6c04cda6a66a9..6ad1bac8d6e32 100644
--- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
+++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
@@ -54,6 +54,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
String defaultRouting = request.param("routing");
String defaultPipeline = request.param("pipeline");
Boolean defaultRequireAlias = request.paramAsBoolean("require_alias", null);
+ Boolean defaultRequireDataStream = request.paramAsBoolean("require_data_stream", null);
Boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", null);
String waitForActiveShards = request.param("wait_for_active_shards");
@@ -69,6 +70,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
null,
defaultPipeline,
defaultRequireAlias,
+ defaultRequireDataStream,
defaultListExecutedPipelines,
true,
request.getXContentType(),
diff --git a/client/rest/src/main/java/org/elasticsearch/client/Request.java b/client/rest/src/main/java/org/elasticsearch/client/Request.java
index 6423bee1cb44e..8195e3fdc8a79 100644
--- a/client/rest/src/main/java/org/elasticsearch/client/Request.java
+++ b/client/rest/src/main/java/org/elasticsearch/client/Request.java
@@ -67,7 +67,7 @@ public String getEndpoint() {
/**
* Add a query string parameter.
* @param name the name of the url parameter. Must not be null.
- * @param value the value of the url url parameter. If {@code null} then
+ * @param value the value of the url parameter. If {@code null} then
* the parameter is sent as {@code name} rather than {@code name=value}
* @throws IllegalArgumentException if a parameter with that name has
* already been set
diff --git a/docs/changelog/101872.yaml b/docs/changelog/101872.yaml
new file mode 100644
index 0000000000000..1c63c2d8b009a
--- /dev/null
+++ b/docs/changelog/101872.yaml
@@ -0,0 +1,6 @@
+pr: 101872
+summary: "Add `require_data_stream` parameter to indexing requests to enforce indexing operations target a data stream"
+area: Data streams
+type: feature
+issues:
+ - 97032
diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
index 5ea366784c66a..4ffdc128f8ec0 100644
--- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
+++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
@@ -7,6 +7,7 @@
*/
package org.elasticsearch.datastreams;
+import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
@@ -23,6 +24,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.containsString;
+@SuppressWarnings("resource")
public class AutoCreateDataStreamIT extends DisabledSecurityDataStreamTestCase {
/**
@@ -31,7 +33,7 @@ public class AutoCreateDataStreamIT extends DisabledSecurityDataStreamTestCase {
*/
public void testCanAutoCreateDataStreamWhenAutoCreateIndexDisabled() throws IOException {
configureAutoCreateIndex(false);
- createTemplateWithAllowAutoCreate(null);
+ createTemplate(null, true);
assertOK(this.indexDocument());
}
@@ -40,7 +42,7 @@ public void testCanAutoCreateDataStreamWhenAutoCreateIndexDisabled() throws IOEx
* and that template has allow_auto_create set to true.
*/
public void testCanAutoCreateDataStreamWhenExplicitlyAllowedByTemplate() throws IOException {
- createTemplateWithAllowAutoCreate(true);
+ createTemplate(true, true);
// Attempt to add a document to a non-existing index. Auto-creating the index should succeed because the index name
// matches the template pattern
@@ -52,15 +54,60 @@ public void testCanAutoCreateDataStreamWhenExplicitlyAllowedByTemplate() throws
* allow_auto_create explicitly to false.
*/
public void testCannotAutoCreateDataStreamWhenDisallowedByTemplate() throws IOException {
- createTemplateWithAllowAutoCreate(false);
+ createTemplate(false, true);
- // Attempt to add a document to a non-existing index. Auto-creating the index should succeed because the index name
- // matches the template pattern
+ // Auto-creating the index should fail when the template disallows that
final ResponseException responseException = expectThrows(ResponseException.class, this::indexDocument);
assertThat(
Streams.copyToString(new InputStreamReader(responseException.getResponse().getEntity().getContent(), UTF_8)),
- containsString("no such index [composable template [recipe*] forbids index auto creation]")
+ containsString("no such index [recipe_kr] and composable template [recipe*] forbids index auto creation")
+ );
+ }
+
+ /**
+ * Check that if require_data_stream is set to true, automatically creating an index is allowed only
+ * if its name matches an index template AND it contains a data-stream template
+ */
+ public void testCannotAutoCreateDataStreamWhenNoDataStreamTemplateMatch() throws IOException {
+ createTemplate(true, true);
+
+ final Request request = prepareIndexRequest("ingredients_kr");
+ request.addParameter(DocWriteRequest.REQUIRE_DATA_STREAM, Boolean.TRUE.toString());
+
+ // Attempt to add a document to a non-existing index. Auto-creating the index should fail because the index name doesn't
+ // match the template pattern and the request requires a data stream template
+ final ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
+
+ assertThat(
+ Streams.copyToString(new InputStreamReader(responseException.getResponse().getEntity().getContent(), UTF_8)),
+ containsString(
+ "no such index [ingredients_kr] and the index creation request requires a data stream, "
+ + "but no matching index template with data stream template was found for it"
+ )
+ );
+ }
+
+ /**
+ * Check that if require_data_stream is set to true, automatically creating an index is allowed only
+ * if its name matches an index template AND it contains a data-stream template
+ */
+ public void testCannotAutoCreateDataStreamWhenMatchingTemplateIsNotDataStream() throws IOException {
+ createTemplate(true, false);
+
+ final Request request = prepareIndexRequest("recipe_kr");
+ request.addParameter(DocWriteRequest.REQUIRE_DATA_STREAM, Boolean.TRUE.toString());
+
+ // Attempt to add a document to a non-existing index. Auto-creating the index should fail because the index name doesn't
+ // match the template pattern and the request requires a data stream template
+ final ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
+
+ assertThat(
+ Streams.copyToString(new InputStreamReader(responseException.getResponse().getEntity().getContent(), UTF_8)),
+ containsString(
+ "no such index [recipe_kr] and the index creation request requires a data stream, "
+ + "but no matching index template with data stream template was found for it"
+ )
);
}
@@ -78,7 +125,7 @@ private void configureAutoCreateIndex(boolean value) throws IOException {
assertOK(settingsResponse);
}
- private void createTemplateWithAllowAutoCreate(Boolean allowAutoCreate) throws IOException {
+ private void createTemplate(Boolean allowAutoCreate, boolean addDataStreamTemplate) throws IOException {
XContentBuilder b = JsonXContent.contentBuilder();
b.startObject();
{
@@ -86,8 +133,10 @@ private void createTemplateWithAllowAutoCreate(Boolean allowAutoCreate) throws I
if (allowAutoCreate != null) {
b.field("allow_auto_create", allowAutoCreate);
}
- b.startObject("data_stream");
- b.endObject();
+ if (addDataStreamTemplate) {
+ b.startObject("data_stream");
+ b.endObject();
+ }
}
b.endObject();
@@ -98,8 +147,13 @@ private void createTemplateWithAllowAutoCreate(Boolean allowAutoCreate) throws I
}
private Response indexDocument() throws IOException {
- final Request indexDocumentRequest = new Request("POST", "recipe_kr/_doc");
- indexDocumentRequest.setJsonEntity("{ \"@timestamp\": \"" + Instant.now() + "\", \"name\": \"Kimchi\" }");
+ final Request indexDocumentRequest = prepareIndexRequest("recipe_kr");
return client().performRequest(indexDocumentRequest);
}
+
+ private Request prepareIndexRequest(String indexName) {
+ final Request indexDocumentRequest = new Request("POST", indexName + "/_doc");
+ indexDocumentRequest.setJsonEntity("{ \"@timestamp\": \"" + Instant.now() + "\", \"name\": \"Kimchi\" }");
+ return indexDocumentRequest;
+ }
}
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
new file mode 100644
index 0000000000000..c77e114616c78
--- /dev/null
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
@@ -0,0 +1,138 @@
+---
+"Testing require_data_stream in index creation":
+ - skip:
+ version: " - 8.12.99"
+ reason: "require_data_stream was introduced in 8.13.0"
+ features: allowed_warnings
+
+ - do:
+ allowed_warnings:
+ - "index template [ds-template] has index patterns [ds-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [ds-template] will take precedence during new index creation"
+ indices.put_index_template:
+ name: ds-template
+ body:
+ index_patterns: ds-*
+ template:
+ settings:
+ number_of_shards: 1
+ number_of_replicas: 0
+ mappings:
+ properties:
+ field:
+ type: keyword
+ data_stream: {}
+ allow_auto_create: true
+
+ - do:
+ index:
+ index: ds-test
+ require_data_stream: true
+ body:
+ '@timestamp': '2022-12-12'
+ foo: bar
+
+ - do:
+ catch: /no matching index template with data stream template was found for it/
+ index:
+ index: index-test
+ require_data_stream: true
+ body:
+ '@timestamp': '2022-12-12'
+ foo: bar
+
+ - do:
+ index:
+ index: other-index
+ require_data_stream: false
+ body:
+ '@timestamp': '2022-12-12'
+ foo: bar
+
+ - do:
+ catch: /is not a data stream/
+ index:
+ index: other-index
+ require_data_stream: true
+ body:
+ '@timestamp': '2022-12-12'
+ foo: bar
+
+---
+"Testing require_data_stream in bulk requests":
+ - skip:
+ version: " - 8.12.99"
+ reason: "require_data_stream was introduced in 8.13.0"
+ features: allowed_warnings
+
+ - do:
+ allowed_warnings:
+ - "index template [ds-template] has index patterns [ds-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [ds-template] will take precedence during new index creation"
+ indices.put_index_template:
+ name: ds-template
+ body:
+ index_patterns: ds-*
+ template:
+ settings:
+ number_of_shards: 1
+ number_of_replicas: 0
+ mappings:
+ properties:
+ field:
+ type: keyword
+ data_stream: {}
+ allow_auto_create: true
+
+ - do:
+ bulk:
+ refresh: true
+ require_data_stream: true
+ body:
+ - index:
+ _index: new_index_not_created
+ - f: 1
+ - index:
+ _index: new_index_created
+ require_data_stream: false
+ - f: 2
+ - index:
+ _index: ds-other
+ op_type: create
+ - "@timestamp": "2024-01-01"
+ - match: { errors: true }
+ - match: { items.0.index.status: 404 }
+ - match: { items.0.index.error.type: index_not_found_exception }
+ - match: { items.0.index.error.reason: "no such index [new_index_not_created] and the index creation request requires a data stream, but no matching index template with data stream template was found for it" }
+ - match: { items.1.index.result: created }
+ - match: { items.2.create.result: created }
+
+ - do:
+ allowed_warnings:
+ - "index template [other-template] has index patterns [ds-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [other-template] will take precedence during new index creation"
+ indices.put_index_template:
+ name: other-template
+ body:
+ index_patterns: other-*
+ template:
+ settings:
+ number_of_shards: 1
+ number_of_replicas: 0
+ mappings:
+ properties:
+ field:
+ type: keyword
+ allow_auto_create: true
+
+ - do:
+ bulk:
+ refresh: true
+ require_data_stream: false
+ body:
+ - index:
+ _index: other-myindex
+ require_data_stream: true
+ op_type: create
+ - "@timestamp": "2024-01-01"
+ - match: { errors: true }
+ - match: { items.0.create.status: 404 }
+ - match: { items.0.create.error.type: index_not_found_exception }
+ - match: { items.0.create.error.reason: "no such index [other-myindex] and the index creation request requires a data stream, but no matching index template with data stream template was found for it" }
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
index 79751e4b0f61b..9ced5d3e8c454 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
@@ -80,6 +80,10 @@
"type": "boolean",
"description": "Sets require_alias for all incoming documents. Defaults to unset (false)"
},
+ "require_data_stream": {
+ "type": "boolean",
+ "description": "When true, requires the destination to be a data stream (existing or to-be-created). Default is false"
+ },
"list_executed_pipelines": {
"type": "boolean",
"description": "Sets list_executed_pipelines for all incoming documents. Defaults to unset (false)"
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
index bd94d653014a0..102ca4e012e85 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
@@ -101,6 +101,10 @@
"require_alias": {
"type": "boolean",
"description": "When true, requires destination to be an alias. Default is false"
+ },
+ "require_data_stream": {
+ "type": "boolean",
+ "description": "When true, requires the destination to be a data stream (existing or to-be-created). Default is false"
}
},
"body":{
diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java
index a730587f32c20..823ade37fe87c 100644
--- a/server/src/main/java/org/elasticsearch/TransportVersions.java
+++ b/server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -188,6 +188,7 @@ static TransportVersion def(int id) {
public static final TransportVersion PEERFINDER_REPORTS_PEERS_MASTERS = def(8_575_00_0);
public static final TransportVersion ESQL_MULTI_CLUSTERS_ENRICH = def(8_576_00_0);
public static final TransportVersion NESTED_KNN_MORE_INNER_HITS = def(8_577_00_0);
+ public static final TransportVersion REQUIRE_DATA_STREAM_ADDED = def(8_578_00_0);
/*
* STOP! READ THIS FIRST! No, really,
diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
index 2a9449b35c7b5..7f3578ce9f16f 100644
--- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
@@ -41,6 +41,9 @@ public interface DocWriteRequest extends IndicesRequest, Accountable {
// Flag set for disallowing index auto creation for an individual write request.
String REQUIRE_ALIAS = "require_alias";
+ // Flag set for disallowing index auto creation if no matching data-stream index template is available.
+ String REQUIRE_DATA_STREAM = "require_data_stream";
+
// Flag indicating that the list of executed pipelines should be returned in the request
String LIST_EXECUTED_PIPELINES = "list_executed_pipelines";
@@ -149,6 +152,12 @@ public interface DocWriteRequest extends IndicesRequest, Accountable {
*/
boolean isRequireAlias();
+ /**
+ * Should this request override specifically require the destination to be a data stream?
+ * @return boolean flag, when true specifically requires a data stream
+ */
+ boolean isRequireDataStream();
+
/**
* Finalize the request before executing or routing it.
*/
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
index 87334afa3ed8a..f7d58e186d2fd 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
@@ -244,7 +244,8 @@ ClusterState execute(
// This expression only evaluates to true when the argument is non-null and false
if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) {
throw new IndexNotFoundException(
- "composable template " + template.indexPatterns() + " forbids index auto creation"
+ "composable template " + template.indexPatterns() + " forbids index auto creation",
+ request.index()
);
}
@@ -272,6 +273,13 @@ ClusterState execute(
successfulRequests.put(request, indexNames);
return clusterState;
} else {
+ if (request.isRequireDataStream()) {
+ throw new IndexNotFoundException(
+ "the index creation request requires a data stream, "
+ + "but no matching index template with data stream template was found for it",
+ request.index()
+ );
+ }
final var indexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index());
if (isSystemIndex) {
if (indexName.equals(request.index()) == false) {
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
index 136f261dc3ef3..2ec6db339b6ef 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
@@ -62,6 +62,8 @@ public class CreateIndexRequest extends AcknowledgedRequest
private String index;
+ private boolean requireDataStream;
+
private Settings settings = Settings.EMPTY;
private String mappings = "{}";
@@ -102,6 +104,11 @@ public CreateIndexRequest(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_7_12_0)) {
origin = in.readString();
}
+ if (in.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ requireDataStream = in.readBoolean();
+ } else {
+ requireDataStream = false;
+ }
}
public CreateIndexRequest() {}
@@ -446,6 +453,18 @@ public CreateIndexRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
+ public boolean isRequireDataStream() {
+ return requireDataStream;
+ }
+
+ /**
+ * Set whether this CreateIndexRequest requires a data stream. The data stream may be pre-existing or to-be-created.
+ */
+ public CreateIndexRequest requireDataStream(boolean requireDataStream) {
+ this.requireDataStream = requireDataStream;
+ return this;
+ }
+
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -468,6 +487,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_7_12_0)) {
out.writeString(origin);
}
+ if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ out.writeOptionalBoolean(this.requireDataStream);
+ }
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
index 7052d4b1356ac..307cafbb9b8e1 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
@@ -246,4 +246,12 @@ public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitFor
public CreateIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
+
+ /**
+ * Set whether this request requires a data stream. The data stream may be pre-existing or to-be-created.
+ */
+ public CreateIndexRequestBuilder setRequireDataStream(final boolean requireDataStream) {
+ request.requireDataStream(requireDataStream);
+ return this;
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
index 6d99f794e972a..7a2bd0e68608a 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
@@ -438,7 +438,19 @@ public BulkProcessor add(
lock.lock();
try {
ensureOpen();
- bulkRequest.add(data, defaultIndex, null, null, defaultPipeline, null, null, true, xContentType, RestApiVersion.current());
+ bulkRequest.add(
+ data,
+ defaultIndex,
+ null,
+ null,
+ defaultPipeline,
+ null,
+ null,
+ null,
+ true,
+ xContentType,
+ RestApiVersion.current()
+ );
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
index cbe4252a0b6a1..6998ca4150ad5 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
@@ -75,6 +75,7 @@ public class BulkRequest extends ActionRequest
private String globalRouting;
private String globalIndex;
private Boolean globalRequireAlias;
+ private Boolean globalRequireDatsStream;
private long sizeInBytes = 0;
@@ -232,7 +233,7 @@ public BulkRequest add(byte[] data, int from, int length, @Nullable String defau
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XContentType xContentType) throws IOException {
- return add(data, defaultIndex, null, null, null, null, null, true, xContentType, RestApiVersion.current());
+ return add(data, defaultIndex, null, null, null, null, null, null, true, xContentType, RestApiVersion.current());
}
/**
@@ -240,7 +241,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XCont
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex, XContentType xContentType)
throws IOException {
- return add(data, defaultIndex, null, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());
+ return add(data, defaultIndex, null, null, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());
}
@@ -251,6 +252,7 @@ public BulkRequest add(
@Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline,
@Nullable Boolean defaultRequireAlias,
+ @Nullable Boolean defaultRequireDataStream,
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
@@ -259,6 +261,7 @@ public BulkRequest add(
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
+ Boolean requireDataStream = valueOrDefault(defaultRequireDataStream, globalRequireDatsStream);
new BulkRequestParser(true, restApiVersion).parse(
data,
defaultIndex,
@@ -266,6 +269,7 @@ public BulkRequest add(
defaultFetchSourceContext,
pipeline,
requireAlias,
+ requireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
@@ -374,6 +378,10 @@ public Boolean requireAlias() {
return globalRequireAlias;
}
+ public Boolean requireDataStream() {
+ return globalRequireDatsStream;
+ }
+
/**
* Note for internal callers (NOT high level rest client),
* the global parameter setting is ignored when used with:
@@ -391,6 +399,11 @@ public BulkRequest requireAlias(Boolean globalRequireAlias) {
return this;
}
+ public BulkRequest requireDataStream(Boolean globalRequireDatsStream) {
+ this.globalRequireDatsStream = globalRequireDatsStream;
+ return this;
+ }
+
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
index f1280587a0c55..5dccd1b55f554 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
@@ -61,6 +61,7 @@ public final class BulkRequestParser {
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);
+ private static final ParseField REQUIRE_DATA_STREAM = new ParseField(DocWriteRequest.REQUIRE_DATA_STREAM);
private static final ParseField LIST_EXECUTED_PIPELINES = new ParseField(DocWriteRequest.LIST_EXECUTED_PIPELINES);
private static final ParseField DYNAMIC_TEMPLATES = new ParseField("dynamic_templates");
@@ -127,6 +128,7 @@ public void parse(
@Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline,
@Nullable Boolean defaultRequireAlias,
+ @Nullable Boolean defaultRequireDataStream,
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
@@ -209,6 +211,7 @@ public void parse(
int retryOnConflict = 0;
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
+ boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream;
boolean listExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;
Map dynamicTemplates = Map.of();
@@ -263,6 +266,8 @@ public void parse(
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
requireAlias = parser.booleanValue();
+ } else if (REQUIRE_DATA_STREAM.match(currentFieldName, parser.getDeprecationHandler())) {
+ requireDataStream = parser.booleanValue();
} else if (LIST_EXECUTED_PIPELINES.match(currentFieldName, parser.getDeprecationHandler())) {
listExecutedPipelines = parser.booleanValue();
} else {
@@ -349,6 +354,7 @@ public void parse(
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
+ .setRequireDataStream(requireDataStream)
.setListExecutedPipelines(listExecutedPipelines),
type
);
@@ -365,6 +371,7 @@ public void parse(
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
+ .setRequireDataStream(requireDataStream)
.setListExecutedPipelines(listExecutedPipelines),
type
);
@@ -382,6 +389,7 @@ public void parse(
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
+ .setRequireDataStream(requireDataStream)
.setListExecutedPipelines(listExecutedPipelines),
type
);
@@ -391,6 +399,12 @@ public void parse(
"Update requests do not support versioning. " + "Please use `if_seq_no` and `if_primary_term` instead"
);
}
+ if (requireDataStream) {
+ throw new IllegalArgumentException(
+ "Update requests do not support the `require_data_stream` flag, "
+ + "as data streams do not support update operations"
+ );
+ }
// TODO: support dynamic_templates in update requests
if (dynamicTemplates.isEmpty() == false) {
throw new IllegalArgumentException(
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index ea0399d0b87fe..3f1b1d8f6d182 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -73,7 +73,6 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -346,7 +345,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
// Attempt to create all the indices that we're going to need during the bulk before we start.
// Step 1: collect all the indices in the request
- final Map indices = bulkRequest.requests.stream()
+ final Map indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exist), unless an external versioning is used
.filter(
@@ -354,20 +353,23 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE
)
- .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));
+ .collect(
+ Collectors.toMap(
+ DocWriteRequest::index,
+ request -> new ReducedRequestInfo(request.isRequireAlias(), request.isRequireDataStream()),
+ ReducedRequestInfo::merge
+ )
+ );
// Step 2: filter the list of indices to find those that don't currently exist.
final Map indicesThatCannotBeCreated = new HashMap<>();
- Set autoCreateIndices = new HashSet<>();
- ClusterState state = clusterService.state();
- for (Map.Entry indexAndFlag : indices.entrySet()) {
- final String index = indexAndFlag.getKey();
- boolean shouldAutoCreate = indexNameExpressionResolver.hasIndexAbstraction(index, state) == false;
+ final ClusterState state = clusterService.state();
+ Map indicesToAutoCreate = indices.entrySet()
+ .stream()
+ .filter(entry -> indexNameExpressionResolver.hasIndexAbstraction(entry.getKey(), state) == false)
// We should only auto create if we are not requiring it to be an alias
- if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
- autoCreateIndices.add(index);
- }
- }
+ .filter(entry -> entry.getValue().isRequireAlias == false)
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().isRequireDataStream));
// Step 3: Collect all the data streams that need to be rolled over before writing
Set dataStreamsToBeRolledOver = indices.keySet().stream().filter(target -> {
@@ -381,7 +383,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
bulkRequest,
executorName,
listener,
- autoCreateIndices,
+ indicesToAutoCreate,
dataStreamsToBeRolledOver,
indicesThatCannotBeCreated,
startTime
@@ -397,14 +399,14 @@ protected void createMissingIndicesAndIndexData(
BulkRequest bulkRequest,
String executorName,
ActionListener listener,
- Set autoCreateIndices,
+ Map indicesToAutoCreate,
Set dataStreamsToBeRolledOver,
Map indicesThatCannotBeCreated,
long startTime
) {
final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size());
// Optimizing when there are no prerequisite actions
- if (autoCreateIndices.isEmpty() && dataStreamsToBeRolledOver.isEmpty()) {
+ if (indicesToAutoCreate.isEmpty() && dataStreamsToBeRolledOver.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
return;
}
@@ -415,8 +417,9 @@ protected void doRun() {
}
});
try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) {
- for (String index : autoCreateIndices) {
- createIndex(index, bulkRequest.timeout(), ActionListener.releaseAfter(new ActionListener<>() {
+ for (Map.Entry indexEntry : indicesToAutoCreate.entrySet()) {
+ final String index = indexEntry.getKey();
+ createIndex(index, indexEntry.getValue(), bulkRequest.timeout(), ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse createIndexResponse) {}
@@ -564,9 +567,10 @@ private static boolean isSystemIndex(SortedMap indices
}
}
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
+ createIndexRequest.requireDataStream(requireDataStream);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
@@ -597,6 +601,15 @@ protected long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
}
+ private record ReducedRequestInfo(boolean isRequireAlias, boolean isRequireDataStream) {
+ private ReducedRequestInfo merge(ReducedRequestInfo other) {
+ return new ReducedRequestInfo(
+ this.isRequireAlias || other.isRequireAlias,
+ this.isRequireDataStream || other.isRequireDataStream
+ );
+ }
+ }
+
/**
* retries on retryable cluster blocks, resolves item requests,
* constructs shard bulk requests and delegates execution to shard bulk action
@@ -655,6 +668,9 @@ protected void doRun() {
if (addFailureIfIndexCannotBeCreated(docWriteRequest, i)) {
continue;
}
+ if (addFailureIfRequiresDataStreamAndNoParentDataStream(docWriteRequest, i, metadata)) {
+ continue;
+ }
IndexAbstraction ia = null;
boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE;
try {
@@ -829,6 +845,22 @@ private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest> re
return false;
}
+ private boolean addFailureIfRequiresDataStreamAndNoParentDataStream(DocWriteRequest> request, int idx, final Metadata metadata) {
+ if (request.isRequireDataStream() && (metadata.indexIsADataStream(request.index()) == false)) {
+ Exception exception = new ResourceNotFoundException(
+ "["
+ + DocWriteRequest.REQUIRE_DATA_STREAM
+ + "] request flag is [true] and ["
+ + request.index()
+ + "] is not a data stream",
+ request.index()
+ );
+ addFailure(request, idx, exception);
+ return true;
+ }
+ return false;
+ }
+
private boolean addFailureIfIndexIsClosed(DocWriteRequest> request, Index concreteIndex, int idx, final Metadata metadata) {
IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex);
if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
index a44c8091aaa2e..e77d4ab9e0b85 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
@@ -69,7 +69,7 @@ protected void createMissingIndicesAndIndexData(
BulkRequest bulkRequest,
String executorName,
ActionListener listener,
- Set autoCreateIndices,
+ Map indicesToAutoCreate,
Set dataStreamsToRollover,
Map indicesThatCannotBeCreated,
long startTime
diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
index 70f0a9a12e02e..e4b8edea58114 100644
--- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
@@ -231,6 +231,11 @@ public boolean isRequireAlias() {
return false;
}
+ @Override
+ public boolean isRequireDataStream() {
+ return false;
+ }
+
@Override
public void process(IndexRouting indexRouting) {
// Nothing to do
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
index 285346adcd13f..eda28eb4e139e 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -107,6 +107,9 @@ public class IndexRequest extends ReplicatedWriteRequest implement
private boolean isPipelineResolved;
private boolean requireAlias;
+
+ private boolean requireDataStream;
+
/**
* This indicates whether the response to this request ought to list the ingest pipelines that were executed on the document
*/
@@ -188,6 +191,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
: new ArrayList<>(possiblyImmutableExecutedPipelines);
}
}
+ if (in.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ requireDataStream = in.readBoolean();
+ } else {
+ requireDataStream = false;
+ }
}
public IndexRequest() {
@@ -739,6 +747,9 @@ private void writeBody(StreamOutput out) throws IOException {
out.writeOptionalCollection(executedPipelines, StreamOutput::writeString);
}
}
+ if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ out.writeBoolean(requireDataStream);
+ }
}
@Override
@@ -794,6 +805,19 @@ public boolean isRequireAlias() {
return requireAlias;
}
+ @Override
+ public boolean isRequireDataStream() {
+ return requireDataStream;
+ }
+
+ /**
+ * Set whether this IndexRequest requires a data stream. The data stream may be pre-existing or to-be-created.
+ */
+ public IndexRequest setRequireDataStream(boolean requireDataStream) {
+ this.requireDataStream = requireDataStream;
+ return this;
+ }
+
@Override
public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
return ia.getWriteIndex(this, metadata);
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java
index 5e156070d0154..b8faf39514cbe 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java
@@ -221,4 +221,12 @@ public IndexRequestBuilder setRequireAlias(boolean requireAlias) {
request.setRequireAlias(requireAlias);
return this;
}
+
+ /**
+ * Sets the require_data_stream flag
+ */
+ public IndexRequestBuilder setRequireDataStream(boolean requireDataStream) {
+ request.setRequireDataStream(requireDataStream);
+ return this;
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
index 800eca618c5bc..73cb6ebe2e39a 100644
--- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
@@ -833,6 +833,12 @@ public boolean isRequireAlias() {
return requireAlias;
}
+ @Override
+ public boolean isRequireDataStream() {
+ // Always false because data streams cannot accept update operations
+ return false;
+ }
+
@Override
public void process(IndexRouting indexRouting) {
// Nothing to do
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
index c7bd513ff84d4..88bed844558f2 100644
--- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
@@ -348,4 +348,11 @@ public UpdateRequestBuilder setScriptedUpsert(boolean scriptedUpsert) {
return this;
}
+ /**
+ * Sets the require_alias flag
+ */
+ public UpdateRequestBuilder setRequireAlias(boolean requireAlias) {
+ request.setRequireAlias(requireAlias);
+ return this;
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
index 64b234c8f5d2b..2a5aa3c603c9f 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
@@ -897,6 +897,15 @@ public Map findDataStreams(String... concreteIndices) {
return builder.build();
}
+ /**
+ * Checks whether the provided index is a data stream.
+ */
+ public boolean indexIsADataStream(String indexName) {
+ final SortedMap lookup = getIndicesLookup();
+ IndexAbstraction abstraction = lookup.get(indexName);
+ return abstraction != null && abstraction.getType() == IndexAbstraction.Type.DATA_STREAM;
+ }
+
@SuppressWarnings("unchecked")
private static MappingMetadata filterFields(MappingMetadata mappingMetadata, Predicate fieldPredicate) {
if (mappingMetadata == null) {
diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
index 83a7728b82a4a..0bb97b1f51ff5 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
@@ -80,6 +80,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, false);
+ boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
@@ -89,6 +90,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
+ defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
index fed7d8606ba01..cdda3ea38129f 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
@@ -133,6 +133,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
indexRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, indexRequest.isRequireAlias()));
+ indexRequest.setRequireDataStream(request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, indexRequest.isRequireDataStream()));
String sOpType = request.param("op_type");
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java
index 2c9b84f78636a..d13c39f112878 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java
@@ -84,6 +84,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
defaultFetchSourceContext,
defaultPipeline,
null,
+ null,
true,
true,
request.getXContentType(),
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
index a60b42de12d6e..f228326955f38 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
@@ -29,7 +29,7 @@ public void testIndexRequest() throws IOException {
""");
BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
final AtomicBoolean parsed = new AtomicBoolean();
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
@@ -38,7 +38,7 @@ public void testIndexRequest() throws IOException {
}, req -> fail(), req -> fail());
assertTrue(parsed.get());
- parser.parse(request, "foo", null, null, null, true, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.isRequireAlias());
}, req -> fail(), req -> fail());
@@ -46,7 +46,7 @@ public void testIndexRequest() throws IOException {
{ "index":{ "_id": "bar", "require_alias": true } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.isRequireAlias());
}, req -> fail(), req -> fail());
@@ -54,7 +54,7 @@ public void testIndexRequest() throws IOException {
{ "index":{ "_id": "bar", "require_alias": false } }
{}
""");
- parser.parse(request, "foo", null, null, null, true, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(indexRequest.isRequireAlias());
}, req -> fail(), req -> fail());
}
@@ -73,6 +73,7 @@ public void testDeleteRequest() throws IOException {
null,
null,
null,
+ null,
false,
XContentType.JSON,
(req, type) -> fail(),
@@ -94,7 +95,7 @@ public void testUpdateRequest() throws IOException {
""");
BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
final AtomicBoolean parsed = new AtomicBoolean();
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertFalse(parsed.get());
assertEquals("foo", updateRequest.index());
assertEquals("bar", updateRequest.id());
@@ -103,7 +104,7 @@ public void testUpdateRequest() throws IOException {
}, req -> fail());
assertTrue(parsed.get());
- parser.parse(request, "foo", null, null, null, true, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
+ parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertTrue(updateRequest.isRequireAlias());
}, req -> fail());
@@ -111,7 +112,7 @@ public void testUpdateRequest() throws IOException {
{ "update":{ "_id": "bar", "require_alias": true } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertTrue(updateRequest.isRequireAlias());
}, req -> fail());
@@ -119,7 +120,7 @@ public void testUpdateRequest() throws IOException {
{ "update":{ "_id": "bar", "require_alias": false } }
{}
""");
- parser.parse(request, "foo", null, null, null, true, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
+ parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertFalse(updateRequest.isRequireAlias());
}, req -> fail());
}
@@ -139,6 +140,7 @@ public void testBarfOnLackOfTrailingNewline() {
null,
null,
null,
+ null,
false,
XContentType.JSON,
(req, type) -> fail(),
@@ -166,6 +168,7 @@ public void testFailOnExplicitIndex() {
null,
null,
null,
+ null,
false,
XContentType.JSON,
(req, type) -> fail(),
@@ -183,7 +186,7 @@ public void testTypesStillParsedForBulkMonitoring() throws IOException {
""");
BulkRequestParser parser = new BulkRequestParser(false, RestApiVersion.current());
final AtomicBoolean parsed = new AtomicBoolean();
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
@@ -210,6 +213,7 @@ public void testParseDeduplicatesParameterStrings() throws IOException {
null,
null,
null,
+ null,
true,
XContentType.JSON,
(indexRequest, type) -> indexRequests.add(indexRequest),
@@ -241,6 +245,7 @@ public void testFailOnInvalidAction() {
null,
null,
null,
+ null,
false,
XContentType.JSON,
(req, type) -> fail(),
@@ -260,11 +265,11 @@ public void testListExecutedPipelines() throws IOException {
{}
""");
BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
- parser.parse(request, "foo", null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
@@ -272,7 +277,7 @@ public void testListExecutedPipelines() throws IOException {
{ "index":{ "_id": "bar", "op_type": "create" } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
@@ -280,7 +285,7 @@ public void testListExecutedPipelines() throws IOException {
{ "create":{ "_id": "bar" } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
@@ -288,7 +293,7 @@ public void testListExecutedPipelines() throws IOException {
{ "index":{ "_id": "bar", "list_executed_pipelines": "true" } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, false, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, false, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
@@ -296,7 +301,7 @@ public void testListExecutedPipelines() throws IOException {
{ "index":{ "_id": "bar", "list_executed_pipelines": "false" } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
}
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
index 75833052dd4c8..70be3207486ec 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
@@ -139,7 +139,7 @@ void executeBulk(
}
@Override
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
try {
simulateAutoCreate.accept(index);
// If we try to create an index just immediately assume it worked
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
index f30bceada65d9..188adf396435f 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
@@ -154,7 +154,7 @@ void executeBulk(
}
@Override
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
indexCreated = true;
listener.onResponse(null);
}
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
index a2e164f6a242c..c3a1747902893 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
@@ -92,7 +92,7 @@ class TestTransportBulkAction extends TransportBulkAction {
}
@Override
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
indexCreated = true;
if (beforeIndexCreation != null) {
beforeIndexCreation.run();
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
index 49dff864e7374..0a3adaf54a8ea 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
@@ -52,7 +52,9 @@
public class TransportSimulateBulkActionTests extends ESTestCase {
- /** Services needed by bulk action */
+ /**
+ * Services needed by bulk action
+ */
private TransportService transportService;
private ClusterService clusterService;
private TestThreadPool threadPool;
@@ -80,7 +82,7 @@ class TestTransportSimulateBulkAction extends TransportSimulateBulkAction {
}
@Override
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
indexCreated = true;
if (beforeIndexCreation != null) {
beforeIndexCreation.run();
@@ -189,7 +191,7 @@ public void onFailure(Exception e) {
fail(e, "Unexpected error");
}
};
- Set autoCreateIndices = Set.of(); // unused
+ Map indicesToAutoCreate = Map.of(); // unused
Set dataStreamsToRollover = Set.of(); // unused
Map indicesThatCannotBeCreated = Map.of(); // unused
long startTime = 0;
@@ -198,7 +200,7 @@ public void onFailure(Exception e) {
bulkRequest,
randomAlphaOfLength(10),
listener,
- autoCreateIndices,
+ indicesToAutoCreate,
dataStreamsToRollover,
indicesThatCannotBeCreated,
startTime
diff --git a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
index 38ffc6c46c3f3..e505f04e0ce06 100644
--- a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
@@ -46,12 +46,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.apache.lucene.tests.util.LuceneTestCase.expectThrows;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
@@ -463,6 +461,7 @@ public void testSerialization() throws IOException {
assertThat(copy.ifSeqNo(), equalTo(indexRequest.ifSeqNo()));
assertThat(copy.getFinalPipeline(), equalTo(indexRequest.getFinalPipeline()));
assertThat(copy.ifPrimaryTerm(), equalTo(indexRequest.ifPrimaryTerm()));
+ assertThat(copy.isRequireDataStream(), equalTo(indexRequest.isRequireDataStream()));
}
private IndexRequest createTestInstance() {
@@ -470,6 +469,7 @@ private IndexRequest createTestInstance() {
indexRequest.setPipeline(randomAlphaOfLength(15));
indexRequest.setRequestId(randomLong());
indexRequest.setRequireAlias(randomBoolean());
+ indexRequest.setRequireDataStream(randomBoolean());
indexRequest.setIfSeqNo(randomNonNegativeLong());
indexRequest.setFinalPipeline(randomAlphaOfLength(20));
indexRequest.setIfPrimaryTerm(randomNonNegativeLong());
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java
index 2b252607b05ad..638e57207fbeb 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java
@@ -91,6 +91,7 @@ public MonitoringBulkRequest add(
null,
null,
null,
+ null,
true,
xContentType,
(indexRequest, type) -> {
diff --git a/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java b/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java
index badc04800e40f..3b602a53a76a4 100644
--- a/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java
+++ b/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java
@@ -352,6 +352,7 @@ private boolean handleBulk(ApiCallSection bulk) {
defaultPipeline,
null,
null,
+ null,
true,
XContentType.JSON,
(index, type) -> indexRequests.add(index),