From c64b6811a1d7b2294735c8b5b1f9d86177b3edb2 Mon Sep 17 00:00:00 2001
From: Eyal Koren <41850454+eyalkoren@users.noreply.github.com>
Date: Tue, 7 Nov 2023 19:53:14 +0700
Subject: [PATCH 01/22] Adding require_data_stream feature
---
.../noop/action/bulk/RestNoopBulkAction.java | 2 +
.../org/elasticsearch/client/Request.java | 2 +-
.../datastreams/AutoCreateDataStreamIT.java | 74 +++++++++++++++---
.../data_stream/190_require_data_stream.yml | 37 +++++++++
.../resources/rest-api-spec/api/bulk.json | 4 +
.../resources/rest-api-spec/api/index.json | 4 +
.../rest-api-spec/api/indices.create.json | 4 +
.../resources/rest-api-spec/api/update.json | 4 +
.../org/elasticsearch/TransportVersions.java | 1 +
.../elasticsearch/action/DocWriteRequest.java | 9 +++
.../indices/create/AutoCreateAction.java | 9 ++-
.../indices/create/CreateIndexRequest.java | 24 ++++++
.../create/CreateIndexRequestBuilder.java | 10 +++
.../action/bulk/BulkProcessor.java | 14 +++-
.../action/bulk/BulkRequest.java | 17 ++++-
.../action/bulk/BulkRequestParser.java | 9 +++
.../action/bulk/TransportBulkAction.java | 75 ++++++++++++++-----
.../action/delete/DeleteRequest.java | 5 ++
.../action/index/IndexRequest.java | 26 +++++++
.../action/index/IndexRequestBuilder.java | 8 ++
.../action/update/TransportUpdateAction.java | 15 +++-
.../action/update/UpdateRequest.java | 24 ++++++
.../action/update/UpdateRequestBuilder.java | 15 ++++
.../cluster/metadata/Metadata.java | 9 +++
.../index/reindex/ReindexRequest.java | 2 +
.../rest/action/document/RestBulkAction.java | 2 +
.../rest/action/document/RestIndexAction.java | 1 +
.../action/bulk/BulkRequestParserTests.java | 35 +++++----
...ActionIndicesThatCannotBeCreatedTests.java | 2 +-
.../bulk/TransportBulkActionIngestTests.java | 2 +-
.../action/bulk/TransportBulkActionTests.java | 2 +-
.../action/MonitoringBulkRequest.java | 1 +
.../test/CoreTestTranslater.java | 1 +
33 files changed, 396 insertions(+), 53 deletions(-)
create mode 100644 modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
index 6c04cda6a66a9..6ad1bac8d6e32 100644
--- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
+++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java
@@ -54,6 +54,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
String defaultRouting = request.param("routing");
String defaultPipeline = request.param("pipeline");
Boolean defaultRequireAlias = request.paramAsBoolean("require_alias", null);
+ Boolean defaultRequireDataStream = request.paramAsBoolean("require_data_stream", null);
Boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", null);
String waitForActiveShards = request.param("wait_for_active_shards");
@@ -69,6 +70,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
null,
defaultPipeline,
defaultRequireAlias,
+ defaultRequireDataStream,
defaultListExecutedPipelines,
true,
request.getXContentType(),
diff --git a/client/rest/src/main/java/org/elasticsearch/client/Request.java b/client/rest/src/main/java/org/elasticsearch/client/Request.java
index 6423bee1cb44e..8195e3fdc8a79 100644
--- a/client/rest/src/main/java/org/elasticsearch/client/Request.java
+++ b/client/rest/src/main/java/org/elasticsearch/client/Request.java
@@ -67,7 +67,7 @@ public String getEndpoint() {
/**
* Add a query string parameter.
* @param name the name of the url parameter. Must not be null.
- * @param value the value of the url url parameter. If {@code null} then
+ * @param value the value of the url parameter. If {@code null} then
* the parameter is sent as {@code name} rather than {@code name=value}
* @throws IllegalArgumentException if a parameter with that name has
* already been set
diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
index 5ea366784c66a..8b3ad5ba1be26 100644
--- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
+++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
@@ -7,6 +7,7 @@
*/
package org.elasticsearch.datastreams;
+import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
@@ -23,6 +24,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.containsString;
+@SuppressWarnings("resource")
public class AutoCreateDataStreamIT extends DisabledSecurityDataStreamTestCase {
/**
@@ -31,7 +33,7 @@ public class AutoCreateDataStreamIT extends DisabledSecurityDataStreamTestCase {
*/
public void testCanAutoCreateDataStreamWhenAutoCreateIndexDisabled() throws IOException {
configureAutoCreateIndex(false);
- createTemplateWithAllowAutoCreate(null);
+ createTemplate(null, true);
assertOK(this.indexDocument());
}
@@ -40,7 +42,7 @@ public void testCanAutoCreateDataStreamWhenAutoCreateIndexDisabled() throws IOEx
* and that template has allow_auto_create set to true.
*/
public void testCanAutoCreateDataStreamWhenExplicitlyAllowedByTemplate() throws IOException {
- createTemplateWithAllowAutoCreate(true);
+ createTemplate(true, true);
// Attempt to add a document to a non-existing index. Auto-creating the index should succeed because the index name
// matches the template pattern
@@ -52,15 +54,58 @@ public void testCanAutoCreateDataStreamWhenExplicitlyAllowedByTemplate() throws
* allow_auto_create explicitly to false.
*/
public void testCannotAutoCreateDataStreamWhenDisallowedByTemplate() throws IOException {
- createTemplateWithAllowAutoCreate(false);
+ createTemplate(false, true);
- // Attempt to add a document to a non-existing index. Auto-creating the index should succeed because the index name
- // matches the template pattern
+ // Auto-creating the index should fail when the template disallows that
final ResponseException responseException = expectThrows(ResponseException.class, this::indexDocument);
assertThat(
Streams.copyToString(new InputStreamReader(responseException.getResponse().getEntity().getContent(), UTF_8)),
- containsString("no such index [composable template [recipe*] forbids index auto creation]")
+ containsString("no such index [recipe_kr] and composable template [recipe*] forbids index auto creation")
+ );
+ }
+
+ /**
+ * Check that if require_data_stream is set to true, automatically creating an index is allowed only
+ * if its name matches an index template AND it contains a data-stream template
+ */
+ public void testCannotAutoCreateDataStreamWhenNoDataStreamTemplateMatch() throws IOException {
+ createTemplate(true, true);
+
+ final Request request = prepareIndexRequest("ingredients_kr");
+ request.addParameter(DocWriteRequest.REQUIRE_DATA_STREAM, Boolean.TRUE.toString());
+
+ // Attempt to add a document to a non-existing index. Auto-creating the index should fail because the index name doesn't
+ // match the template pattern and the request requires a data stream template
+ final ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
+
+ assertThat(
+ Streams.copyToString(new InputStreamReader(responseException.getResponse().getEntity().getContent(), UTF_8)),
+ containsString(
+ "no such index [ingredients_kr] and the index creation request requires a data stream, but no matching index template with data stream template was found for it"
+ )
+ );
+ }
+
+ /**
+ * Check that if require_data_stream is set to true, automatically creating an index is allowed only
+ * if its name matches an index template AND it contains a data-stream template
+ */
+ public void testCannotAutoCreateDataStreamWhenMatchingTemplateIsNotDataStream() throws IOException {
+ createTemplate(true, false);
+
+ final Request request = prepareIndexRequest("recipe_kr");
+ request.addParameter(DocWriteRequest.REQUIRE_DATA_STREAM, Boolean.TRUE.toString());
+
+ // Attempt to add a document to a non-existing index. Auto-creating the index should fail because the index name doesn't
+ // match the template pattern and the request requires a data stream template
+ final ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
+
+ assertThat(
+ Streams.copyToString(new InputStreamReader(responseException.getResponse().getEntity().getContent(), UTF_8)),
+ containsString(
+ "no such index [recipe_kr] and the index creation request requires a data stream, but no matching index template with data stream template was found for it"
+ )
);
}
@@ -78,7 +123,7 @@ private void configureAutoCreateIndex(boolean value) throws IOException {
assertOK(settingsResponse);
}
- private void createTemplateWithAllowAutoCreate(Boolean allowAutoCreate) throws IOException {
+ private void createTemplate(Boolean allowAutoCreate, boolean addDataStreamTemplate) throws IOException {
XContentBuilder b = JsonXContent.contentBuilder();
b.startObject();
{
@@ -86,8 +131,10 @@ private void createTemplateWithAllowAutoCreate(Boolean allowAutoCreate) throws I
if (allowAutoCreate != null) {
b.field("allow_auto_create", allowAutoCreate);
}
- b.startObject("data_stream");
- b.endObject();
+ if (addDataStreamTemplate) {
+ b.startObject("data_stream");
+ b.endObject();
+ }
}
b.endObject();
@@ -98,8 +145,13 @@ private void createTemplateWithAllowAutoCreate(Boolean allowAutoCreate) throws I
}
private Response indexDocument() throws IOException {
- final Request indexDocumentRequest = new Request("POST", "recipe_kr/_doc");
- indexDocumentRequest.setJsonEntity("{ \"@timestamp\": \"" + Instant.now() + "\", \"name\": \"Kimchi\" }");
+ final Request indexDocumentRequest = prepareIndexRequest("recipe_kr");
return client().performRequest(indexDocumentRequest);
}
+
+ private Request prepareIndexRequest(String indexName) {
+ final Request indexDocumentRequest = new Request("POST", indexName + "/_doc");
+ indexDocumentRequest.setJsonEntity("{ \"@timestamp\": \"" + Instant.now() + "\", \"name\": \"Kimchi\" }");
+ return indexDocumentRequest;
+ }
}
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
new file mode 100644
index 0000000000000..a0ab0f9872244
--- /dev/null
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
@@ -0,0 +1,37 @@
+---
+"Testing require_data_stream in index creation":
+ - skip:
+ version: " - 8.11.99"
+ reason: "require_data_stream was introduced in 8.12.0"
+
+ - do:
+ indices.put_index_template:
+ name: ds-template
+ body:
+ index_patterns: ds-*
+ template:
+ settings:
+ number_of_shards: 1
+ number_of_replicas: 0
+ mappings:
+ properties:
+ field:
+ type: keyword
+ data_stream: {}
+ allow_auto_create: true
+
+# - do:
+# index:
+# index: ds-test
+# require_data_stream: true
+# body:
+# '@timestamp': '2022-12-12'
+# foo: bar
+#
+# - do:
+# index:
+# index: index-test
+# require_data_stream: true
+# body:
+# '@timestamp': '2022-12-12'
+# foo: bar
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
index 79751e4b0f61b..f2f857ed82ea8 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
@@ -80,6 +80,10 @@
"type": "boolean",
"description": "Sets require_alias for all incoming documents. Defaults to unset (false)"
},
+ "require_data_stream": {
+ "type": "boolean",
+ "description": "When true, requires destination to have a parent data stream. Default is false"
+ },
"list_executed_pipelines": {
"type": "boolean",
"description": "Sets list_executed_pipelines for all incoming documents. Defaults to unset (false)"
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
index bd94d653014a0..8ebdb8658a370 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
@@ -101,6 +101,10 @@
"require_alias": {
"type": "boolean",
"description": "When true, requires destination to be an alias. Default is false"
+ },
+ "require_data_stream": {
+ "type": "boolean",
+ "description": "When true, requires destination to have a parent data stream. Default is false"
}
},
"body":{
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json
index 3a3f279775fa8..64cea3caad06b 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json
@@ -38,6 +38,10 @@
"master_timeout":{
"type":"time",
"description":"Specify timeout for connection to master"
+ },
+ "require_data_stream": {
+ "type": "boolean",
+ "description": "When true, requires a matching index template with data stream template. Default is false"
}
},
"body":{
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json
index e588777e990ec..6975880cab3d9 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json
@@ -83,6 +83,10 @@
"require_alias": {
"type": "boolean",
"description": "When true, requires destination is an alias. Default is false"
+ },
+ "require_data_stream": {
+ "type": "boolean",
+ "description": "When true, requires destination to have a parent data stream. Default is false"
}
},
"body":{
diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java
index 0c7145730e447..b5ac381d8ae10 100644
--- a/server/src/main/java/org/elasticsearch/TransportVersions.java
+++ b/server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -161,6 +161,7 @@ static TransportVersion def(int id) {
public static final TransportVersion UNDESIRED_SHARD_ALLOCATIONS_COUNT_ADDED = def(8_530_00_0);
public static final TransportVersion ML_INFERENCE_TASK_SETTINGS_OPTIONAL_ADDED = def(8_531_00_0);
public static final TransportVersion DEPRECATED_COMPONENT_TEMPLATES_ADDED = def(8_532_00_0);
+ public static final TransportVersion REQUIRE_DATA_STREAM_ADDED = def(8_533_00_0);
/*
* STOP! READ THIS FIRST! No, really,
diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
index dab46aed5b4bc..8224f7c2963cb 100644
--- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java
@@ -39,6 +39,9 @@ public interface DocWriteRequest extends IndicesRequest, Accountable {
// Flag set for disallowing index auto creation for an individual write request.
String REQUIRE_ALIAS = "require_alias";
+ // Flag set for disallowing index auto creation if no matching data-stream index template is available.
+ String REQUIRE_DATA_STREAM = "require_data_stream";
+
// Flag indicating that the list of executed pipelines should be returned in the request
String LIST_EXECUTED_PIPELINES = "list_executed_pipelines";
@@ -147,6 +150,12 @@ public interface DocWriteRequest extends IndicesRequest, Accountable {
*/
boolean isRequireAlias();
+ /**
+ * Should this request override specifically require the destination to be a data stream?
+ * @return boolean flag, when true specifically requires a data stream
+ */
+ boolean isRequireDataStream();
+
/**
* Finalize the request before executing or routing it.
*/
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
index 1cec71d2abe53..a99cb22884c74 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
@@ -236,7 +236,8 @@ ClusterState execute(
// This expression only evaluates to true when the argument is non-null and false
if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) {
throw new IndexNotFoundException(
- "composable template " + template.indexPatterns() + " forbids index auto creation"
+ "composable template " + template.indexPatterns() + " forbids index auto creation",
+ request.index()
);
}
@@ -260,6 +261,12 @@ ClusterState execute(
successfulRequests.put(request, indexName);
return clusterState;
} else {
+ if (request.isRequireDataStream()) {
+ throw new IndexNotFoundException(
+ "the index creation request requires a data stream, but no matching index template with data stream template was found for it",
+ request.index()
+ );
+ }
final var indexName = IndexNameExpressionResolver.resolveDateMathExpression(request.index());
if (isSystemIndex) {
if (indexName.equals(request.index()) == false) {
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
index 136f261dc3ef3..80d483add77eb 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
@@ -62,6 +62,8 @@ public class CreateIndexRequest extends AcknowledgedRequest
private String index;
+ private boolean requireDataStream;
+
private Settings settings = Settings.EMPTY;
private String mappings = "{}";
@@ -102,6 +104,11 @@ public CreateIndexRequest(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_7_12_0)) {
origin = in.readString();
}
+ if (in.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ requireDataStream = in.readBoolean();
+ } else {
+ requireDataStream = false;
+ }
}
public CreateIndexRequest() {}
@@ -446,6 +453,20 @@ public CreateIndexRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
+ public boolean isRequireDataStream() {
+ return requireDataStream;
+ }
+
+ /**
+ * todo
+ * @param requireDataStream
+ * @return
+ */
+ public CreateIndexRequest requireDataStream(boolean requireDataStream) {
+ this.requireDataStream = requireDataStream;
+ return this;
+ }
+
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -468,6 +489,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_7_12_0)) {
out.writeString(origin);
}
+ if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ out.writeOptionalBoolean(this.requireDataStream);
+ }
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
index c07734aee557c..46b3137beebb1 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
@@ -246,4 +246,14 @@ public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitFor
public CreateIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
+
+ /**
+ * todo
+ * @param requireDataStream
+ * @return
+ */
+ public CreateIndexRequestBuilder setRequireDataStream(final boolean requireDataStream) {
+ request.requireDataStream(requireDataStream);
+ return this;
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
index 6d99f794e972a..7a2bd0e68608a 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
@@ -438,7 +438,19 @@ public BulkProcessor add(
lock.lock();
try {
ensureOpen();
- bulkRequest.add(data, defaultIndex, null, null, defaultPipeline, null, null, true, xContentType, RestApiVersion.current());
+ bulkRequest.add(
+ data,
+ defaultIndex,
+ null,
+ null,
+ defaultPipeline,
+ null,
+ null,
+ null,
+ true,
+ xContentType,
+ RestApiVersion.current()
+ );
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
index cbe4252a0b6a1..6998ca4150ad5 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
@@ -75,6 +75,7 @@ public class BulkRequest extends ActionRequest
private String globalRouting;
private String globalIndex;
private Boolean globalRequireAlias;
+ private Boolean globalRequireDatsStream;
private long sizeInBytes = 0;
@@ -232,7 +233,7 @@ public BulkRequest add(byte[] data, int from, int length, @Nullable String defau
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XContentType xContentType) throws IOException {
- return add(data, defaultIndex, null, null, null, null, null, true, xContentType, RestApiVersion.current());
+ return add(data, defaultIndex, null, null, null, null, null, null, true, xContentType, RestApiVersion.current());
}
/**
@@ -240,7 +241,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XCont
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex, XContentType xContentType)
throws IOException {
- return add(data, defaultIndex, null, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());
+ return add(data, defaultIndex, null, null, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());
}
@@ -251,6 +252,7 @@ public BulkRequest add(
@Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline,
@Nullable Boolean defaultRequireAlias,
+ @Nullable Boolean defaultRequireDataStream,
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
@@ -259,6 +261,7 @@ public BulkRequest add(
String routing = valueOrDefault(defaultRouting, globalRouting);
String pipeline = valueOrDefault(defaultPipeline, globalPipeline);
Boolean requireAlias = valueOrDefault(defaultRequireAlias, globalRequireAlias);
+ Boolean requireDataStream = valueOrDefault(defaultRequireDataStream, globalRequireDatsStream);
new BulkRequestParser(true, restApiVersion).parse(
data,
defaultIndex,
@@ -266,6 +269,7 @@ public BulkRequest add(
defaultFetchSourceContext,
pipeline,
requireAlias,
+ requireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
@@ -374,6 +378,10 @@ public Boolean requireAlias() {
return globalRequireAlias;
}
+ public Boolean requireDataStream() {
+ return globalRequireDatsStream;
+ }
+
/**
* Note for internal callers (NOT high level rest client),
* the global parameter setting is ignored when used with:
@@ -391,6 +399,11 @@ public BulkRequest requireAlias(Boolean globalRequireAlias) {
return this;
}
+ public BulkRequest requireDataStream(Boolean globalRequireDatsStream) {
+ this.globalRequireDatsStream = globalRequireDatsStream;
+ return this;
+ }
+
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
index 3b6e69d16bae3..79d8865e5ba34 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
@@ -61,6 +61,7 @@ public final class BulkRequestParser {
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);
+ private static final ParseField REQUIRE_DATA_STREAM = new ParseField(DocWriteRequest.REQUIRE_DATA_STREAM);
private static final ParseField LIST_EXECUTED_PIPELINES = new ParseField(DocWriteRequest.LIST_EXECUTED_PIPELINES);
private static final ParseField DYNAMIC_TEMPLATES = new ParseField("dynamic_templates");
@@ -127,6 +128,7 @@ public void parse(
@Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline,
@Nullable Boolean defaultRequireAlias,
+ @Nullable Boolean defaultRequireDataStream,
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
@@ -209,6 +211,7 @@ public void parse(
int retryOnConflict = 0;
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
+ boolean requireDataStream = defaultRequireDataStream != null && defaultRequireDataStream;
boolean listExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;
Map dynamicTemplates = Map.of();
@@ -263,6 +266,8 @@ public void parse(
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
requireAlias = parser.booleanValue();
+ } else if (REQUIRE_DATA_STREAM.match(currentFieldName, parser.getDeprecationHandler())) {
+ requireDataStream = parser.booleanValue();
} else if (LIST_EXECUTED_PIPELINES.match(currentFieldName, parser.getDeprecationHandler())) {
listExecutedPipelines = parser.booleanValue();
} else {
@@ -349,6 +354,7 @@ public void parse(
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
+ .setRequireDataStream(requireDataStream)
.setListExecutedPipelines(listExecutedPipelines),
type
);
@@ -365,6 +371,7 @@ public void parse(
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
+ .setRequireDataStream(requireDataStream)
.setListExecutedPipelines(listExecutedPipelines),
type
);
@@ -382,6 +389,7 @@ public void parse(
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias)
+ .setRequireDataStream(requireDataStream)
.setListExecutedPipelines(listExecutedPipelines),
type
);
@@ -404,6 +412,7 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.setRequireAlias(requireAlias)
+ .setRequireDataStream(requireDataStream)
.routing(routing);
try (
XContentParser sliceParser = createParser(
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 13d10be86bd68..df0ba4db6b3e3 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -68,12 +68,10 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -312,7 +310,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
// Attempt to create all the indices that we're going to need during the bulk before we start.
// Step 1: collect all the indices in the request
- final Map indices = bulkRequest.requests.stream()
+ final Map indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
// exists), unless an external versioning is used
.filter(
@@ -320,28 +318,32 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE
)
- .collect(Collectors.toMap(DocWriteRequest::index, DocWriteRequest::isRequireAlias, (v1, v2) -> v1 || v2));
+ .collect(
+ Collectors.toMap(
+ DocWriteRequest::index,
+ request -> new ReducedRequestInfo(request.isRequireAlias(), request.isRequireDataStream()),
+ ReducedRequestInfo::merge
+ )
+ );
// Step 2: filter the list of indices to find those that don't currently exist.
final Map indicesThatCannotBeCreated = new HashMap<>();
- Set autoCreateIndices = new HashSet<>();
- ClusterState state = clusterService.state();
- for (Map.Entry indexAndFlag : indices.entrySet()) {
- final String index = indexAndFlag.getKey();
- boolean shouldAutoCreate = indexNameExpressionResolver.hasIndexAbstraction(index, state) == false;
+ final ClusterState state = clusterService.state();
+ Map indicesToAutoCreate = indices.entrySet()
+ .stream()
+ .filter(entry -> indexNameExpressionResolver.hasIndexAbstraction(entry.getKey(), state) == false)
// We should only auto create if we are not requiring it to be an alias
- if (shouldAutoCreate && (indexAndFlag.getValue() == false)) {
- autoCreateIndices.add(index);
- }
- }
+ .filter(entry -> entry.getValue().isRequireAlias == false)
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().isRequireDataStream));
// Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
- if (autoCreateIndices.isEmpty()) {
+ if (indicesToAutoCreate.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
} else {
- final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
- for (String index : autoCreateIndices) {
- createIndex(index, bulkRequest.timeout(), new ActionListener<>() {
+ final AtomicInteger counter = new AtomicInteger(indicesToAutoCreate.size());
+ for (Map.Entry indexEntry : indicesToAutoCreate.entrySet()) {
+ final String index = indexEntry.getKey();
+ createIndex(index, indexEntry.getValue(), bulkRequest.timeout(), new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
@@ -468,9 +470,10 @@ private static boolean isSystemIndex(SortedMap indices
}
}
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
+ createIndexRequest.requireDataStream(requireDataStream);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
@@ -495,6 +498,23 @@ private long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
}
+ private static final class ReducedRequestInfo {
+ // todo: can stream mapping be parallel on multi threads? If so, this needs to be volatile
+ private boolean isRequireAlias;
+ private boolean isRequireDataStream;
+
+ private ReducedRequestInfo(boolean isRequireAlias, boolean isRequireDataStream) {
+ this.isRequireAlias = isRequireAlias;
+ this.isRequireDataStream = isRequireDataStream;
+ }
+
+ private ReducedRequestInfo merge(ReducedRequestInfo other) {
+ this.isRequireAlias |= other.isRequireAlias;
+ this.isRequireDataStream |= other.isRequireDataStream;
+ return this;
+ }
+ }
+
/**
* retries on retryable cluster blocks, resolves item requests,
* constructs shard bulk requests and delegates execution to shard bulk action
@@ -553,6 +573,9 @@ protected void doRun() {
if (addFailureIfIndexCannotBeCreated(docWriteRequest, i)) {
continue;
}
+ if (addFailureIfRequiresDataStreamAndNoParentDataStream(docWriteRequest, i, metadata)) {
+ continue;
+ }
IndexAbstraction ia = null;
boolean includeDataStreams = docWriteRequest.opType() == DocWriteRequest.OpType.CREATE;
try {
@@ -727,6 +750,22 @@ private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest> re
return false;
}
+ private boolean addFailureIfRequiresDataStreamAndNoParentDataStream(DocWriteRequest> request, int idx, final Metadata metadata) {
+ if (request.isRequireDataStream() && (metadata.hasParentDataStream(request.index()) == false)) {
+ Exception exception = new IndexNotFoundException(
+ "["
+ + DocWriteRequest.REQUIRE_DATA_STREAM
+ + "] request flag is [true] and ["
+ + request.index()
+ + "] has no parent data stream",
+ request.index()
+ );
+ addFailure(request, idx, exception);
+ return true;
+ }
+ return false;
+ }
+
private boolean addFailureIfIndexIsClosed(DocWriteRequest> request, Index concreteIndex, int idx, final Metadata metadata) {
IndexMetadata indexMetadata = metadata.getIndexSafe(concreteIndex);
if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
diff --git a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
index 70f0a9a12e02e..e4b8edea58114 100644
--- a/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/delete/DeleteRequest.java
@@ -231,6 +231,11 @@ public boolean isRequireAlias() {
return false;
}
+ @Override
+ public boolean isRequireDataStream() {
+ return false;
+ }
+
@Override
public void process(IndexRouting indexRouting) {
// Nothing to do
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
index 2f202dd21ad7c..c9e1feb947d0c 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -106,6 +106,9 @@ public class IndexRequest extends ReplicatedWriteRequest implement
private boolean isPipelineResolved;
private boolean requireAlias;
+
+ private boolean requireDataStream;
+
/**
* This indicates whether the response to this request ought to list the ingest pipelines that were executed on the document
*/
@@ -189,6 +192,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
: new ArrayList<>(possiblyImmutableExecutedPipelines);
}
}
+ if (in.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ requireDataStream = in.readBoolean();
+ } else {
+ requireDataStream = false;
+ }
}
public IndexRequest() {
@@ -755,6 +763,9 @@ private void writeBody(StreamOutput out) throws IOException {
out.writeOptionalCollection(executedPipelines, StreamOutput::writeString);
}
}
+ if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ out.writeOptionalBoolean(requireDataStream);
+ }
}
@Override
@@ -810,6 +821,21 @@ public boolean isRequireAlias() {
return requireAlias;
}
+ @Override
+ public boolean isRequireDataStream() {
+ return requireDataStream;
+ }
+
+ /**
+ * todo
+ * @param requireDataStream
+ * @return
+ */
+ public IndexRequest setRequireDataStream(boolean requireDataStream) {
+ this.requireDataStream = requireDataStream;
+ return this;
+ }
+
@Override
public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
return ia.getWriteIndex(this, metadata);
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java
index 360b470eb1ab4..ade85d6679c86 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java
@@ -221,4 +221,12 @@ public IndexRequestBuilder setRequireAlias(boolean requireAlias) {
request.setRequireAlias(requireAlias);
return this;
}
+
+ /**
+ * Sets the require_data_stream flag
+ */
+ public IndexRequestBuilder setRequireDataStream(boolean requireDataStream) {
+ request.setRequireDataStream(requireDataStream);
+ return this;
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
index dd85276bdf81a..2f21434aa048c 100644
--- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
+++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
@@ -121,12 +121,25 @@ protected void doExecute(Task task, final UpdateRequest request, final ActionLis
request.index()
);
}
+ if (request.isRequireDataStream() && (clusterService.state().getMetadata().hasParentDataStream(request.index()) == false)) {
+ throw new IndexNotFoundException(
+ "["
+ + DocWriteRequest.REQUIRE_DATA_STREAM
+ + "] request flag is [true] and ["
+ + request.index()
+ + "] has no parent data stream",
+ request.index()
+ );
+ }
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
client.admin()
.indices()
.create(
- new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()),
+ new CreateIndexRequest().index(request.index())
+ .cause("auto(update api)")
+ .masterNodeTimeout(request.timeout())
+ .requireDataStream(request.isRequireDataStream()),
new ActionListener() {
@Override
public void onResponse(CreateIndexResponse result) {
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
index 600790b2fd841..c7c8cdc8f2207 100644
--- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
@@ -123,6 +123,7 @@ public class UpdateRequest extends InstanceShardOperationRequest
private boolean docAsUpsert = false;
private boolean detectNoop = true;
private boolean requireAlias = false;
+ private boolean requireDataStream = false;
@Nullable
private IndexRequest doc;
@@ -164,6 +165,11 @@ public UpdateRequest(@Nullable ShardId shardId, StreamInput in) throws IOExcepti
} else {
requireAlias = false;
}
+ if (in.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ requireDataStream = in.readBoolean();
+ } else {
+ requireDataStream = false;
+ }
}
public UpdateRequest(String index, String id) {
@@ -831,6 +837,21 @@ public boolean isRequireAlias() {
return requireAlias;
}
+ @Override
+ public boolean isRequireDataStream() {
+ return requireDataStream;
+ }
+
+ /**
+ * todo
+ * @param requireDataStream
+ * @return
+ */
+ public UpdateRequest setRequireDataStream(boolean requireDataStream) {
+ this.requireDataStream = requireDataStream;
+ return this;
+ }
+
@Override
public void process(IndexRouting indexRouting) {
// Nothing to do
@@ -908,6 +929,9 @@ private void doWrite(StreamOutput out, boolean thin) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_7_10_0)) {
out.writeBoolean(requireAlias);
}
+ if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
+ out.writeOptionalBoolean(requireDataStream);
+ }
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
index 482ebd3489916..02bab99916bbe 100644
--- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
@@ -348,4 +348,19 @@ public UpdateRequestBuilder setScriptedUpsert(boolean scriptedUpsert) {
return this;
}
+ /**
+ * Sets the require_alias flag
+ */
+ public UpdateRequestBuilder setRequireAlias(boolean requireAlias) {
+ request.setRequireAlias(requireAlias);
+ return this;
+ }
+
+ /**
+ * Sets the require_data_stream flag
+ */
+ public UpdateRequestBuilder setRequireDataStream(boolean requireDataStream) {
+ request.setRequireDataStream(requireDataStream);
+ return this;
+ }
}
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
index 64b234c8f5d2b..0c8739119baa4 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
@@ -897,6 +897,15 @@ public Map findDataStreams(String... concreteIndices) {
return builder.build();
}
+ /**
+ * Checks whether the provided index has a parent data streams.
+ */
+ public boolean hasParentDataStream(String indexName) {
+ final SortedMap lookup = getIndicesLookup();
+ IndexAbstraction index = lookup.get(indexName);
+ return index != null && index.getParentDataStream() != null;
+ }
+
@SuppressWarnings("unchecked")
private static MappingMetadata filterFields(MappingMetadata mappingMetadata, Predicate fieldPredicate) {
if (mappingMetadata == null) {
diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java
index 02776e3b24531..b84d2e700dbe9 100644
--- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java
+++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java
@@ -50,6 +50,8 @@
* returning all the subrequests that it will make it tries to return a representative set of subrequests. This is best-effort for a bunch
* of reasons, not least of which that scripts are allowed to change the destination request in drastic ways, including changing the index
* to which documents are written.
+ *
+ * todo add support for require_data_stream
*/
public class ReindexRequest extends AbstractBulkIndexByScrollRequest implements CompositeIndicesRequest, ToXContentObject {
/**
diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
index 603e2ebadbd53..c29c5ca61eaa5 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
@@ -80,6 +80,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
+ Boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, null);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
@@ -89,6 +90,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
+ defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
index e5c70fa4fe188..6284d63131999 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
@@ -136,6 +136,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));
indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));
indexRequest.setRequireAlias(request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, indexRequest.isRequireAlias()));
+ indexRequest.setRequireDataStream(request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, indexRequest.isRequireDataStream()));
String sOpType = request.param("op_type");
String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
index a60b42de12d6e..f228326955f38 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java
@@ -29,7 +29,7 @@ public void testIndexRequest() throws IOException {
""");
BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
final AtomicBoolean parsed = new AtomicBoolean();
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
@@ -38,7 +38,7 @@ public void testIndexRequest() throws IOException {
}, req -> fail(), req -> fail());
assertTrue(parsed.get());
- parser.parse(request, "foo", null, null, null, true, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.isRequireAlias());
}, req -> fail(), req -> fail());
@@ -46,7 +46,7 @@ public void testIndexRequest() throws IOException {
{ "index":{ "_id": "bar", "require_alias": true } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.isRequireAlias());
}, req -> fail(), req -> fail());
@@ -54,7 +54,7 @@ public void testIndexRequest() throws IOException {
{ "index":{ "_id": "bar", "require_alias": false } }
{}
""");
- parser.parse(request, "foo", null, null, null, true, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(indexRequest.isRequireAlias());
}, req -> fail(), req -> fail());
}
@@ -73,6 +73,7 @@ public void testDeleteRequest() throws IOException {
null,
null,
null,
+ null,
false,
XContentType.JSON,
(req, type) -> fail(),
@@ -94,7 +95,7 @@ public void testUpdateRequest() throws IOException {
""");
BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
final AtomicBoolean parsed = new AtomicBoolean();
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertFalse(parsed.get());
assertEquals("foo", updateRequest.index());
assertEquals("bar", updateRequest.id());
@@ -103,7 +104,7 @@ public void testUpdateRequest() throws IOException {
}, req -> fail());
assertTrue(parsed.get());
- parser.parse(request, "foo", null, null, null, true, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
+ parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertTrue(updateRequest.isRequireAlias());
}, req -> fail());
@@ -111,7 +112,7 @@ public void testUpdateRequest() throws IOException {
{ "update":{ "_id": "bar", "require_alias": true } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertTrue(updateRequest.isRequireAlias());
}, req -> fail());
@@ -119,7 +120,7 @@ public void testUpdateRequest() throws IOException {
{ "update":{ "_id": "bar", "require_alias": false } }
{}
""");
- parser.parse(request, "foo", null, null, null, true, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
+ parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> {
assertFalse(updateRequest.isRequireAlias());
}, req -> fail());
}
@@ -139,6 +140,7 @@ public void testBarfOnLackOfTrailingNewline() {
null,
null,
null,
+ null,
false,
XContentType.JSON,
(req, type) -> fail(),
@@ -166,6 +168,7 @@ public void testFailOnExplicitIndex() {
null,
null,
null,
+ null,
false,
XContentType.JSON,
(req, type) -> fail(),
@@ -183,7 +186,7 @@ public void testTypesStillParsedForBulkMonitoring() throws IOException {
""");
BulkRequestParser parser = new BulkRequestParser(false, RestApiVersion.current());
final AtomicBoolean parsed = new AtomicBoolean();
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(parsed.get());
assertEquals("foo", indexRequest.index());
assertEquals("bar", indexRequest.id());
@@ -210,6 +213,7 @@ public void testParseDeduplicatesParameterStrings() throws IOException {
null,
null,
null,
+ null,
true,
XContentType.JSON,
(indexRequest, type) -> indexRequests.add(indexRequest),
@@ -241,6 +245,7 @@ public void testFailOnInvalidAction() {
null,
null,
null,
+ null,
false,
XContentType.JSON,
(req, type) -> fail(),
@@ -260,11 +265,11 @@ public void testListExecutedPipelines() throws IOException {
{}
""");
BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
- parser.parse(request, "foo", null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
- parser.parse(request, "foo", null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
@@ -272,7 +277,7 @@ public void testListExecutedPipelines() throws IOException {
{ "index":{ "_id": "bar", "op_type": "create" } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
@@ -280,7 +285,7 @@ public void testListExecutedPipelines() throws IOException {
{ "create":{ "_id": "bar" } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
@@ -288,7 +293,7 @@ public void testListExecutedPipelines() throws IOException {
{ "index":{ "_id": "bar", "list_executed_pipelines": "true" } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, false, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, false, false, XContentType.JSON, (indexRequest, type) -> {
assertTrue(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
@@ -296,7 +301,7 @@ public void testListExecutedPipelines() throws IOException {
{ "index":{ "_id": "bar", "list_executed_pipelines": "false" } }
{}
""");
- parser.parse(request, "foo", null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
+ parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> {
assertFalse(indexRequest.getListExecutedPipelines());
}, req -> fail(), req -> fail());
}
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
index e097b83fb9d35..a7f959df9b771 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
@@ -146,7 +146,7 @@ void executeBulk(
}
@Override
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
try {
simulateAutoCreate.accept(index);
// If we try to create an index just immediately assume it worked
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
index 0168eb0488a5b..7763ec65d7963 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java
@@ -156,7 +156,7 @@ void executeBulk(
}
@Override
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
indexCreated = true;
listener.onResponse(null);
}
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
index e2c71f3b20084..bd0877bee2194 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java
@@ -92,7 +92,7 @@ class TestTransportBulkAction extends TransportBulkAction {
}
@Override
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
indexCreated = true;
if (beforeIndexCreation != null) {
beforeIndexCreation.run();
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java
index 2b252607b05ad..638e57207fbeb 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java
@@ -91,6 +91,7 @@ public MonitoringBulkRequest add(
null,
null,
null,
+ null,
true,
xContentType,
(indexRequest, type) -> {
diff --git a/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java b/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java
index 110a1fd24d0d3..36a03adb9a59c 100644
--- a/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java
+++ b/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java
@@ -352,6 +352,7 @@ private boolean handleBulk(ApiCallSection bulk) {
defaultPipeline,
null,
null,
+ null,
true,
XContentType.JSON,
(index, type) -> indexRequests.add(index),
From 966eccda57f5a11459be32bdbd46b79f6c8ead8e Mon Sep 17 00:00:00 2001
From: eyalkoren <41850454+eyalkoren@users.noreply.github.com>
Date: Tue, 7 Nov 2023 20:04:54 +0700
Subject: [PATCH 02/22] Update docs/changelog/101872.yaml
---
docs/changelog/101872.yaml | 6 ++++++
1 file changed, 6 insertions(+)
create mode 100644 docs/changelog/101872.yaml
diff --git a/docs/changelog/101872.yaml b/docs/changelog/101872.yaml
new file mode 100644
index 0000000000000..737b54ab752e6
--- /dev/null
+++ b/docs/changelog/101872.yaml
@@ -0,0 +1,6 @@
+pr: 101872
+summary: "[WIP] Adding `require_data_stream` feature"
+area: Data streams
+type: feature
+issues:
+ - 97032
From 3411ad2e2457c38ab9ea6ac6e63d3a78096ca965 Mon Sep 17 00:00:00 2001
From: eyalkoren <41850454+eyalkoren@users.noreply.github.com>
Date: Mon, 20 Nov 2023 08:42:53 +0200
Subject: [PATCH 03/22] Complete merge
---
.../action/bulk/TransportBulkAction.java | 13 +++++++------
.../action/bulk/TransportSimulateBulkAction.java | 3 +--
.../action/ingest/RestSimulateIngestAction.java | 1 +
.../bulk/TransportSimulateBulkActionTests.java | 11 ++++++-----
4 files changed, 15 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 489171298d409..e15e4dc86c99b 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -372,7 +372,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
bulkRequest,
executorName,
listener,
- autoCreateIndices,
+ indicesToAutoCreate,
indicesThatCannotBeCreated,
startTime
);
@@ -386,17 +386,18 @@ protected void createMissingIndicesAndIndexData(
BulkRequest bulkRequest,
String executorName,
ActionListener listener,
- Set autoCreateIndices,
+ Map indicesToAutoCreate,
Map indicesThatCannotBeCreated,
long startTime
) {
final AtomicArray responses = new AtomicArray<>(bulkRequest.requests.size());
- if (autoCreateIndices.isEmpty()) {
+ if (indicesToAutoCreate.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, executorName, responses, indicesThatCannotBeCreated);
} else {
- final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
- for (String index : autoCreateIndices) {
- createIndex(index, bulkRequest.timeout(), new ActionListener<>() {
+ final AtomicInteger counter = new AtomicInteger(indicesToAutoCreate.size());
+ for (Map.Entry indexEntry : indicesToAutoCreate.entrySet()) {
+ final String index = indexEntry.getKey();
+ createIndex(index, indexEntry.getValue(), bulkRequest.timeout(), new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
index 7e2fef88c7680..69bdc34b14956 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java
@@ -28,7 +28,6 @@
import org.elasticsearch.transport.TransportService;
import java.util.Map;
-import java.util.Set;
public class TransportSimulateBulkAction extends TransportBulkAction {
@Inject
@@ -69,7 +68,7 @@ protected void createMissingIndicesAndIndexData(
BulkRequest bulkRequest,
String executorName,
ActionListener listener,
- Set autoCreateIndices,
+ Map indicesToAutoCreate,
Map indicesThatCannotBeCreated,
long startTime
) {
diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java
index e0d9dd95206cf..5c68270a32d51 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java
@@ -84,6 +84,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
defaultFetchSourceContext,
defaultPipeline,
null,
+ null,
true,
true,
request.getXContentType(),
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
index 647eafb5f3cdd..53c1cae61e00f 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java
@@ -40,7 +40,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -52,7 +51,9 @@
public class TransportSimulateBulkActionTests extends ESTestCase {
- /** Services needed by bulk action */
+ /**
+ * Services needed by bulk action
+ */
private TransportService transportService;
private ClusterService clusterService;
private TestThreadPool threadPool;
@@ -80,7 +81,7 @@ class TestTransportSimulateBulkAction extends TransportSimulateBulkAction {
}
@Override
- void createIndex(String index, TimeValue timeout, ActionListener listener) {
+ void createIndex(String index, boolean requireDataStream, TimeValue timeout, ActionListener listener) {
indexCreated = true;
if (beforeIndexCreation != null) {
beforeIndexCreation.run();
@@ -189,7 +190,7 @@ public void onFailure(Exception e) {
fail(e, "Unexpected error");
}
};
- Set autoCreateIndices = Set.of(); // unused
+ Map indicesToAutoCreate = Map.of(); // unused
Map indicesThatCannotBeCreated = Map.of(); // unused
long startTime = 0;
bulkAction.createMissingIndicesAndIndexData(
@@ -197,7 +198,7 @@ public void onFailure(Exception e) {
bulkRequest,
randomAlphaOfLength(10),
listener,
- autoCreateIndices,
+ indicesToAutoCreate,
indicesThatCannotBeCreated,
startTime
);
From 85c199dd81ad57539c2e4718f47f78f244dfd575 Mon Sep 17 00:00:00 2001
From: eyalkoren <41850454+eyalkoren@users.noreply.github.com>
Date: Mon, 20 Nov 2023 09:22:34 +0200
Subject: [PATCH 04/22] Checkstyle fixes
---
.../elasticsearch/datastreams/AutoCreateDataStreamIT.java | 6 ++++--
.../action/admin/indices/create/AutoCreateAction.java | 3 ++-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
index 8b3ad5ba1be26..4ffdc128f8ec0 100644
--- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
+++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/AutoCreateDataStreamIT.java
@@ -82,7 +82,8 @@ public void testCannotAutoCreateDataStreamWhenNoDataStreamTemplateMatch() throws
assertThat(
Streams.copyToString(new InputStreamReader(responseException.getResponse().getEntity().getContent(), UTF_8)),
containsString(
- "no such index [ingredients_kr] and the index creation request requires a data stream, but no matching index template with data stream template was found for it"
+ "no such index [ingredients_kr] and the index creation request requires a data stream, "
+ + "but no matching index template with data stream template was found for it"
)
);
}
@@ -104,7 +105,8 @@ public void testCannotAutoCreateDataStreamWhenMatchingTemplateIsNotDataStream()
assertThat(
Streams.copyToString(new InputStreamReader(responseException.getResponse().getEntity().getContent(), UTF_8)),
containsString(
- "no such index [recipe_kr] and the index creation request requires a data stream, but no matching index template with data stream template was found for it"
+ "no such index [recipe_kr] and the index creation request requires a data stream, "
+ + "but no matching index template with data stream template was found for it"
)
);
}
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
index b6b80784d9473..f7d58e186d2fd 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/AutoCreateAction.java
@@ -275,7 +275,8 @@ ClusterState execute(
} else {
if (request.isRequireDataStream()) {
throw new IndexNotFoundException(
- "the index creation request requires a data stream, but no matching index template with data stream template was found for it",
+ "the index creation request requires a data stream, "
+ + "but no matching index template with data stream template was found for it",
request.index()
);
}
From 9697285a7ce545c3f18737f77ab07c5b3ff16d1d Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Mon, 11 Dec 2023 15:56:19 -0700
Subject: [PATCH 05/22] Make ReducedRequestInfo a record
---
.../action/bulk/TransportBulkAction.java | 18 +++++-------------
1 file changed, 5 insertions(+), 13 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index e15e4dc86c99b..2c10cfda2f951 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -560,20 +560,12 @@ protected long buildTookInMillis(long startTimeNanos) {
return TimeUnit.NANOSECONDS.toMillis(relativeTime() - startTimeNanos);
}
- private static final class ReducedRequestInfo {
- // todo: can stream mapping be parallel on multi threads? If so, this needs to be volatile
- private boolean isRequireAlias;
- private boolean isRequireDataStream;
-
- private ReducedRequestInfo(boolean isRequireAlias, boolean isRequireDataStream) {
- this.isRequireAlias = isRequireAlias;
- this.isRequireDataStream = isRequireDataStream;
- }
-
+ private record ReducedRequestInfo(boolean isRequireAlias, boolean isRequireDataStream) {
private ReducedRequestInfo merge(ReducedRequestInfo other) {
- this.isRequireAlias |= other.isRequireAlias;
- this.isRequireDataStream |= other.isRequireDataStream;
- return this;
+ return new ReducedRequestInfo(
+ this.isRequireAlias || other.isRequireAlias,
+ this.isRequireDataStream || other.isRequireDataStream
+ );
}
}
From 3cc9369ed240c74c6e0adea587e661999feadc42 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Mon, 11 Dec 2023 15:57:54 -0700
Subject: [PATCH 06/22] Clarify log message
---
.../java/org/elasticsearch/action/bulk/TransportBulkAction.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 2c10cfda2f951..ec308050eebfa 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -811,7 +811,7 @@ private boolean addFailureIfRequiresDataStreamAndNoParentDataStream(DocWriteRequ
+ DocWriteRequest.REQUIRE_DATA_STREAM
+ "] request flag is [true] and ["
+ request.index()
- + "] has no parent data stream",
+ + "] is not a data stream",
request.index()
);
addFailure(request, idx, exception);
From a785294c77c371abfa8c3e96f3f0f08e1a7a247f Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Mon, 11 Dec 2023 16:04:56 -0700
Subject: [PATCH 07/22] Fix and test IndexRequest serialization
---
.../java/org/elasticsearch/action/index/IndexRequest.java | 2 +-
.../org/elasticsearch/action/index/IndexRequestTests.java | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
index 7c22c6361e4b9..06668d1e3f262 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -743,7 +743,7 @@ private void writeBody(StreamOutput out) throws IOException {
}
}
if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
- out.writeOptionalBoolean(requireDataStream);
+ out.writeBoolean(requireDataStream);
}
}
diff --git a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
index 38ffc6c46c3f3..e505f04e0ce06 100644
--- a/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
@@ -46,12 +46,10 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static org.apache.lucene.tests.util.LuceneTestCase.expectThrows;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
@@ -463,6 +461,7 @@ public void testSerialization() throws IOException {
assertThat(copy.ifSeqNo(), equalTo(indexRequest.ifSeqNo()));
assertThat(copy.getFinalPipeline(), equalTo(indexRequest.getFinalPipeline()));
assertThat(copy.ifPrimaryTerm(), equalTo(indexRequest.ifPrimaryTerm()));
+ assertThat(copy.isRequireDataStream(), equalTo(indexRequest.isRequireDataStream()));
}
private IndexRequest createTestInstance() {
@@ -470,6 +469,7 @@ private IndexRequest createTestInstance() {
indexRequest.setPipeline(randomAlphaOfLength(15));
indexRequest.setRequestId(randomLong());
indexRequest.setRequireAlias(randomBoolean());
+ indexRequest.setRequireDataStream(randomBoolean());
indexRequest.setIfSeqNo(randomNonNegativeLong());
indexRequest.setFinalPipeline(randomAlphaOfLength(20));
indexRequest.setIfPrimaryTerm(randomNonNegativeLong());
From 5a24d3162dc1b19fd1208a3ecfde8d90cad3bc93 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Mon, 11 Dec 2023 16:06:12 -0700
Subject: [PATCH 08/22] Fix UpdateRequest serialization
---
.../java/org/elasticsearch/action/update/UpdateRequest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
index c7c8cdc8f2207..8eb1da0c8ba3a 100644
--- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
@@ -930,7 +930,7 @@ private void doWrite(StreamOutput out, boolean thin) throws IOException {
out.writeBoolean(requireAlias);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
- out.writeOptionalBoolean(requireDataStream);
+ out.writeBoolean(requireDataStream);
}
}
From 0746f0df775dd72867afdf19de4b456a16c1412f Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Wed, 3 Jan 2024 10:03:24 -0700
Subject: [PATCH 09/22] Update description of require_data_stream flag
---
rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
index f2f857ed82ea8..9ced5d3e8c454 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
@@ -82,7 +82,7 @@
},
"require_data_stream": {
"type": "boolean",
- "description": "When true, requires destination to have a parent data stream. Default is false"
+ "description": "When true, requires the destination to be a data stream (existing or to-be-created). Default is false"
},
"list_executed_pipelines": {
"type": "boolean",
From 3c9b665b9405e28878c91fd454e2f5669c8b0594 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Wed, 3 Jan 2024 10:04:32 -0700
Subject: [PATCH 10/22] Fill in javadoc for setRequireDataStream
---
.../java/org/elasticsearch/action/index/IndexRequest.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
index 97ee86bc4dfce..f63cfe8e08e81 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -822,9 +822,7 @@ public boolean isRequireDataStream() {
}
/**
- * todo
- * @param requireDataStream
- * @return
+ * Set whether this IndexRequest requires a data stream. The data stream may be pre-existing or to-be-created.
*/
public IndexRequest setRequireDataStream(boolean requireDataStream) {
this.requireDataStream = requireDataStream;
From 602fec5cd065e1bf09427e4116f197542c15b5e1 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Wed, 3 Jan 2024 16:29:55 -0700
Subject: [PATCH 11/22] Remove unnecessary todo
---
.../java/org/elasticsearch/index/reindex/ReindexRequest.java | 2 --
1 file changed, 2 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java
index b84d2e700dbe9..02776e3b24531 100644
--- a/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java
+++ b/server/src/main/java/org/elasticsearch/index/reindex/ReindexRequest.java
@@ -50,8 +50,6 @@
* returning all the subrequests that it will make it tries to return a representative set of subrequests. This is best-effort for a bunch
* of reasons, not least of which that scripts are allowed to change the destination request in drastic ways, including changing the index
* to which documents are written.
- *
- * todo add support for require_data_stream
*/
public class ReindexRequest extends AbstractBulkIndexByScrollRequest implements CompositeIndicesRequest, ToXContentObject {
/**
From e81f1a3b3235716c1e72d4d54e3aeee24ed33933 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Wed, 3 Jan 2024 16:30:06 -0700
Subject: [PATCH 12/22] Fill in javadoc for CreateIndexRequest
---
.../action/admin/indices/create/CreateIndexRequest.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
index 80d483add77eb..2ec6db339b6ef 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java
@@ -458,9 +458,7 @@ public boolean isRequireDataStream() {
}
/**
- * todo
- * @param requireDataStream
- * @return
+ * Set whether this CreateIndexRequest requires a data stream. The data stream may be pre-existing or to-be-created.
*/
public CreateIndexRequest requireDataStream(boolean requireDataStream) {
this.requireDataStream = requireDataStream;
From a11653473403222626618d1cf997213adf6987c2 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Wed, 3 Jan 2024 16:30:45 -0700
Subject: [PATCH 13/22] Fill in more javadoc
---
.../admin/indices/create/CreateIndexRequestBuilder.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
index 8de7592b2ba1a..307cafbb9b8e1 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java
@@ -248,9 +248,7 @@ public CreateIndexRequestBuilder setWaitForActiveShards(final int waitForActiveS
}
/**
- * todo
- * @param requireDataStream
- * @return
+ * Set whether this request requires a data stream. The data stream may be pre-existing or to-be-created.
*/
public CreateIndexRequestBuilder setRequireDataStream(final boolean requireDataStream) {
request.requireDataStream(requireDataStream);
From bb1e64c196620cf3954a9a7f34321c96cd83175c Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Wed, 3 Jan 2024 16:31:31 -0700
Subject: [PATCH 14/22] Update skip version in yaml test
---
.../test/data_stream/190_require_data_stream.yml | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
index a0ab0f9872244..2269be213f095 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
@@ -1,8 +1,8 @@
---
"Testing require_data_stream in index creation":
- skip:
- version: " - 8.11.99"
- reason: "require_data_stream was introduced in 8.12.0"
+ version: " - 8.12.99"
+ reason: "require_data_stream was introduced in 8.13.0"
- do:
indices.put_index_template:
From 75e453e0c82e3d18f8749714a1cbf32f9aa5c47f Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Wed, 3 Jan 2024 16:34:51 -0700
Subject: [PATCH 15/22] Typo
---
.../java/org/elasticsearch/action/bulk/TransportBulkAction.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 0f0d1ff583a94..0c4455971fa1a 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -343,7 +343,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
// Step 1: collect all the indices in the request
final Map indices = bulkRequest.requests.stream()
// delete requests should not attempt to create the index (if the index does not
- // exists), unless an external versioning is used
+ // exist), unless an external versioning is used
.filter(
request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
From e2f0275f825348fda1bf471704a6627f98601c3a Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Fri, 5 Jan 2024 15:32:57 -0700
Subject: [PATCH 16/22] Drastically change the way this is implemented
---
.../data_stream/190_require_data_stream.yml | 51 +++++++++++++------
.../action/bulk/TransportBulkAction.java | 4 +-
.../action/update/TransportUpdateAction.java | 5 +-
.../cluster/metadata/Metadata.java | 8 +--
.../rest/action/document/RestBulkAction.java | 2 +-
5 files changed, 46 insertions(+), 24 deletions(-)
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
index 2269be213f095..6a291752f96c7 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
@@ -3,8 +3,11 @@
- skip:
version: " - 8.12.99"
reason: "require_data_stream was introduced in 8.13.0"
+ features: allowed_warnings
- do:
+ allowed_warnings:
+ - "index template [ds-template] has index patterns [ds-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [ds-template] will take precedence during new index creation"
indices.put_index_template:
name: ds-template
body:
@@ -20,18 +23,36 @@
data_stream: {}
allow_auto_create: true
-# - do:
-# index:
-# index: ds-test
-# require_data_stream: true
-# body:
-# '@timestamp': '2022-12-12'
-# foo: bar
-#
-# - do:
-# index:
-# index: index-test
-# require_data_stream: true
-# body:
-# '@timestamp': '2022-12-12'
-# foo: bar
+ - do:
+ index:
+ index: ds-test
+ require_data_stream: true
+ body:
+ '@timestamp': '2022-12-12'
+ foo: bar
+
+ - do:
+ catch: /no matching index template with data stream template was found for it/
+ index:
+ index: index-test
+ require_data_stream: true
+ body:
+ '@timestamp': '2022-12-12'
+ foo: bar
+
+ - do:
+ index:
+ index: other-index
+ require_data_stream: false
+ body:
+ '@timestamp': '2022-12-12'
+ foo: bar
+
+ - do:
+ catch: /is not a data stream/
+ index:
+ index: other-index
+ require_data_stream: true
+ body:
+ '@timestamp': '2022-12-12'
+ foo: bar
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index 0c4455971fa1a..f2b6f3c279fdb 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -795,8 +795,8 @@ private boolean addFailureIfRequiresAliasAndAliasIsMissing(DocWriteRequest> re
}
private boolean addFailureIfRequiresDataStreamAndNoParentDataStream(DocWriteRequest> request, int idx, final Metadata metadata) {
- if (request.isRequireDataStream() && (metadata.hasParentDataStream(request.index()) == false)) {
- Exception exception = new IndexNotFoundException(
+ if (request.isRequireDataStream() && (metadata.indexIsADataStream(request.index()) == false)) {
+ Exception exception = new ResourceNotFoundException(
"["
+ DocWriteRequest.REQUIRE_DATA_STREAM
+ "] request flag is [true] and ["
diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
index 84c0615023b94..a221a064b4397 100644
--- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
+++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
@@ -9,6 +9,7 @@
package org.elasticsearch.action.update;
import org.elasticsearch.ResourceAlreadyExistsException;
+import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
@@ -116,8 +117,8 @@ protected void doExecute(Task task, final UpdateRequest request, final ActionLis
request.index()
);
}
- if (request.isRequireDataStream() && (clusterService.state().getMetadata().hasParentDataStream(request.index()) == false)) {
- throw new IndexNotFoundException(
+ if (request.isRequireDataStream() && (clusterService.state().getMetadata().indexIsADataStream(request.index()) == false)) {
+ throw new ResourceNotFoundException(
"["
+ DocWriteRequest.REQUIRE_DATA_STREAM
+ "] request flag is [true] and ["
diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
index 0c8739119baa4..2a5aa3c603c9f 100644
--- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
+++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
@@ -898,12 +898,12 @@ public Map findDataStreams(String... concreteIndices) {
}
/**
- * Checks whether the provided index has a parent data streams.
+ * Checks whether the provided index is a data stream.
*/
- public boolean hasParentDataStream(String indexName) {
+ public boolean indexIsADataStream(String indexName) {
final SortedMap lookup = getIndicesLookup();
- IndexAbstraction index = lookup.get(indexName);
- return index != null && index.getParentDataStream() != null;
+ IndexAbstraction abstraction = lookup.get(indexName);
+ return abstraction != null && abstraction.getType() == IndexAbstraction.Type.DATA_STREAM;
}
@SuppressWarnings("unchecked")
diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
index c29c5ca61eaa5..fc92bc7d0b1de 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
@@ -80,7 +80,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
}
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
- Boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, null);
+ boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
From 8c99dcf566766d5a02d3afee302fbdb6c1170d50 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Mon, 8 Jan 2024 16:58:03 -0700
Subject: [PATCH 17/22] Update REST docstrings for the flag
---
rest-api-spec/src/main/resources/rest-api-spec/api/index.json | 2 +-
.../src/main/resources/rest-api-spec/api/update.json | 4 ----
2 files changed, 1 insertion(+), 5 deletions(-)
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
index 8ebdb8658a370..102ca4e012e85 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json
@@ -104,7 +104,7 @@
},
"require_data_stream": {
"type": "boolean",
- "description": "When true, requires destination to have a parent data stream. Default is false"
+ "description": "When true, requires the destination to be a data stream (existing or to-be-created). Default is false"
}
},
"body":{
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json
index 6975880cab3d9..e588777e990ec 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json
@@ -83,10 +83,6 @@
"require_alias": {
"type": "boolean",
"description": "When true, requires destination is an alias. Default is false"
- },
- "require_data_stream": {
- "type": "boolean",
- "description": "When true, requires destination to have a parent data stream. Default is false"
}
},
"body":{
From 646ea739cddc8f2344b5bff3d8de230365a909fa Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Mon, 8 Jan 2024 17:02:05 -0700
Subject: [PATCH 18/22] Remove support for `require_data_stream` on the update
API
---
.../action/bulk/BulkRequestParser.java | 7 +++++-
.../action/update/TransportUpdateAction.java | 14 +-----------
.../action/update/UpdateRequest.java | 22 ++-----------------
.../action/update/UpdateRequestBuilder.java | 8 -------
4 files changed, 9 insertions(+), 42 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
index e38929403502f..5dccd1b55f554 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java
@@ -399,6 +399,12 @@ public void parse(
"Update requests do not support versioning. " + "Please use `if_seq_no` and `if_primary_term` instead"
);
}
+ if (requireDataStream) {
+ throw new IllegalArgumentException(
+ "Update requests do not support the `require_data_stream` flag, "
+ + "as data streams do not support update operations"
+ );
+ }
// TODO: support dynamic_templates in update requests
if (dynamicTemplates.isEmpty() == false) {
throw new IllegalArgumentException(
@@ -412,7 +418,6 @@ public void parse(
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.setRequireAlias(requireAlias)
- .setRequireDataStream(requireDataStream)
.routing(routing);
try (
XContentParser sliceParser = createParser(
diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
index a221a064b4397..d97368fa4eeca 100644
--- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
+++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
@@ -9,7 +9,6 @@
package org.elasticsearch.action.update;
import org.elasticsearch.ResourceAlreadyExistsException;
-import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
@@ -117,16 +116,6 @@ protected void doExecute(Task task, final UpdateRequest request, final ActionLis
request.index()
);
}
- if (request.isRequireDataStream() && (clusterService.state().getMetadata().indexIsADataStream(request.index()) == false)) {
- throw new ResourceNotFoundException(
- "["
- + DocWriteRequest.REQUIRE_DATA_STREAM
- + "] request flag is [true] and ["
- + request.index()
- + "] has no parent data stream",
- request.index()
- );
- }
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
client.admin()
@@ -134,8 +123,7 @@ protected void doExecute(Task task, final UpdateRequest request, final ActionLis
.create(
new CreateIndexRequest().index(request.index())
.cause("auto(update api)")
- .masterNodeTimeout(request.timeout())
- .requireDataStream(request.isRequireDataStream()),
+ .masterNodeTimeout(request.timeout()),
new ActionListener() {
@Override
public void onResponse(CreateIndexResponse result) {
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
index 8eb1da0c8ba3a..afe5320a20914 100644
--- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java
@@ -123,7 +123,6 @@ public class UpdateRequest extends InstanceShardOperationRequest
private boolean docAsUpsert = false;
private boolean detectNoop = true;
private boolean requireAlias = false;
- private boolean requireDataStream = false;
@Nullable
private IndexRequest doc;
@@ -165,11 +164,6 @@ public UpdateRequest(@Nullable ShardId shardId, StreamInput in) throws IOExcepti
} else {
requireAlias = false;
}
- if (in.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
- requireDataStream = in.readBoolean();
- } else {
- requireDataStream = false;
- }
}
public UpdateRequest(String index, String id) {
@@ -839,17 +833,8 @@ public boolean isRequireAlias() {
@Override
public boolean isRequireDataStream() {
- return requireDataStream;
- }
-
- /**
- * todo
- * @param requireDataStream
- * @return
- */
- public UpdateRequest setRequireDataStream(boolean requireDataStream) {
- this.requireDataStream = requireDataStream;
- return this;
+ // Always false because data streams cannot accept update operations
+ return false;
}
@Override
@@ -929,9 +914,6 @@ private void doWrite(StreamOutput out, boolean thin) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_7_10_0)) {
out.writeBoolean(requireAlias);
}
- if (out.getTransportVersion().onOrAfter(TransportVersions.REQUIRE_DATA_STREAM_ADDED)) {
- out.writeBoolean(requireDataStream);
- }
}
@Override
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
index 4132a453c4cf5..88bed844558f2 100644
--- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
+++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java
@@ -355,12 +355,4 @@ public UpdateRequestBuilder setRequireAlias(boolean requireAlias) {
request.setRequireAlias(requireAlias);
return this;
}
-
- /**
- * Sets the require_data_stream flag
- */
- public UpdateRequestBuilder setRequireDataStream(boolean requireDataStream) {
- request.setRequireDataStream(requireDataStream);
- return this;
- }
}
From efc92865095fd3dfc25b0ec9d8d29ec61a20053b Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Mon, 8 Jan 2024 17:03:29 -0700
Subject: [PATCH 19/22] =?UTF-8?q?Spotless=20=E0=B2=A0=5F=E0=B2=A0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../elasticsearch/action/update/TransportUpdateAction.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
index d97368fa4eeca..c9deec8c504a1 100644
--- a/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
+++ b/server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java
@@ -121,9 +121,7 @@ protected void doExecute(Task task, final UpdateRequest request, final ActionLis
client.admin()
.indices()
.create(
- new CreateIndexRequest().index(request.index())
- .cause("auto(update api)")
- .masterNodeTimeout(request.timeout()),
+ new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()),
new ActionListener() {
@Override
public void onResponse(CreateIndexResponse result) {
From 5a5700b192c53ffb1e2ba345fa1fd310eeb3817a Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Tue, 9 Jan 2024 13:47:48 -0700
Subject: [PATCH 20/22] Add additional tests for bulk requests
---
.../data_stream/190_require_data_stream.yml | 80 +++++++++++++++++++
1 file changed, 80 insertions(+)
diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
index 6a291752f96c7..c77e114616c78 100644
--- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
+++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/190_require_data_stream.yml
@@ -56,3 +56,83 @@
body:
'@timestamp': '2022-12-12'
foo: bar
+
+---
+"Testing require_data_stream in bulk requests":
+ - skip:
+ version: " - 8.12.99"
+ reason: "require_data_stream was introduced in 8.13.0"
+ features: allowed_warnings
+
+ - do:
+ allowed_warnings:
+ - "index template [ds-template] has index patterns [ds-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [ds-template] will take precedence during new index creation"
+ indices.put_index_template:
+ name: ds-template
+ body:
+ index_patterns: ds-*
+ template:
+ settings:
+ number_of_shards: 1
+ number_of_replicas: 0
+ mappings:
+ properties:
+ field:
+ type: keyword
+ data_stream: {}
+ allow_auto_create: true
+
+ - do:
+ bulk:
+ refresh: true
+ require_data_stream: true
+ body:
+ - index:
+ _index: new_index_not_created
+ - f: 1
+ - index:
+ _index: new_index_created
+ require_data_stream: false
+ - f: 2
+ - index:
+ _index: ds-other
+ op_type: create
+ - "@timestamp": "2024-01-01"
+ - match: { errors: true }
+ - match: { items.0.index.status: 404 }
+ - match: { items.0.index.error.type: index_not_found_exception }
+ - match: { items.0.index.error.reason: "no such index [new_index_not_created] and the index creation request requires a data stream, but no matching index template with data stream template was found for it" }
+ - match: { items.1.index.result: created }
+ - match: { items.2.create.result: created }
+
+ - do:
+ allowed_warnings:
+ - "index template [other-template] has index patterns [ds-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [other-template] will take precedence during new index creation"
+ indices.put_index_template:
+ name: other-template
+ body:
+ index_patterns: other-*
+ template:
+ settings:
+ number_of_shards: 1
+ number_of_replicas: 0
+ mappings:
+ properties:
+ field:
+ type: keyword
+ allow_auto_create: true
+
+ - do:
+ bulk:
+ refresh: true
+ require_data_stream: false
+ body:
+ - index:
+ _index: other-myindex
+ require_data_stream: true
+ op_type: create
+ - "@timestamp": "2024-01-01"
+ - match: { errors: true }
+ - match: { items.0.create.status: 404 }
+ - match: { items.0.create.error.type: index_not_found_exception }
+ - match: { items.0.create.error.reason: "no such index [other-myindex] and the index creation request requires a data stream, but no matching index template with data stream template was found for it" }
From a0593dc7b42a284ebd74c662d4e6d98f7d457ad5 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Tue, 9 Jan 2024 13:50:52 -0700
Subject: [PATCH 21/22] Update changelog not to have a "WIP" in it
---
docs/changelog/101872.yaml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/changelog/101872.yaml b/docs/changelog/101872.yaml
index 737b54ab752e6..1c63c2d8b009a 100644
--- a/docs/changelog/101872.yaml
+++ b/docs/changelog/101872.yaml
@@ -1,5 +1,5 @@
pr: 101872
-summary: "[WIP] Adding `require_data_stream` feature"
+summary: "Add `require_data_stream` parameter to indexing requests to enforce indexing operations target a data stream"
area: Data streams
type: feature
issues:
From 62698b9e1d46ea87e53bf6705dacdb157fe497d4 Mon Sep 17 00:00:00 2001
From: Lee Hinman
Date: Tue, 9 Jan 2024 16:10:08 -0700
Subject: [PATCH 22/22] Remove docs from create index API (the flag is not
implemented there)
---
.../src/main/resources/rest-api-spec/api/indices.create.json | 4 ----
1 file changed, 4 deletions(-)
diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json
index 64cea3caad06b..3a3f279775fa8 100644
--- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json
+++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json
@@ -38,10 +38,6 @@
"master_timeout":{
"type":"time",
"description":"Specify timeout for connection to master"
- },
- "require_data_stream": {
- "type": "boolean",
- "description": "When true, requires a matching index template with data stream template. Default is false"
}
},
"body":{