diff --git a/docs/changelog/93016.yaml b/docs/changelog/93016.yaml new file mode 100644 index 0000000000000..4185c836d67e9 --- /dev/null +++ b/docs/changelog/93016.yaml @@ -0,0 +1,5 @@ +pr: 93016 +summary: Adding the ability to skip the ingest pipeline +area: Ingest Node +type: enhancement +issues: [] diff --git a/docs/reference/docs/bulk.asciidoc b/docs/reference/docs/bulk.asciidoc index d900260739903..0bd51ebf63cbb 100644 --- a/docs/reference/docs/bulk.asciidoc +++ b/docs/reference/docs/bulk.asciidoc @@ -246,6 +246,8 @@ on. include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline] +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=skip_pipeline] + include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh] `require_alias`:: diff --git a/docs/reference/docs/index_.asciidoc b/docs/reference/docs/index_.asciidoc index 4577e02024805..2ea1cde22ec1e 100644 --- a/docs/reference/docs/index_.asciidoc +++ b/docs/reference/docs/index_.asciidoc @@ -103,6 +103,8 @@ required. See <>. include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline] +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=skip_pipeline] + include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh] include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=routing] diff --git a/docs/reference/rest-api/common-parms.asciidoc b/docs/reference/rest-api/common-parms.asciidoc index 92141b9d1952e..7ece50b122470 100644 --- a/docs/reference/rest-api/common-parms.asciidoc +++ b/docs/reference/rest-api/common-parms.asciidoc @@ -716,6 +716,13 @@ tag::pipeline[] (Optional, string) ID of the pipeline to use to preprocess incoming documents. end::pipeline[] +tag::skip_pipeline[] +`skip_pipeline`:: +(Optional, Boolean) If `true`, the pipeline (whether explicit or default) is not +executed. However, the final pipeline (if one exists) is still executed. This parameter +is meant to support integrations that run pipelines external to {es}. 
+end::skip_pipeline[] + tag::pages-processed[] The number of search or bulk index operations processed. Documents are processed in batches instead of individually. diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index d7738987de18b..c0942635e4931 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -181,3 +181,38 @@ teardown: index: test_index id: 1 - match: { _source: { my_field: "upper" } } + +--- +"Test bulk request with skip_pipeline": + + - do: + bulk: + body: + - index: + _index: test_index + _id: test_id1 + pipeline: pipeline1 + skip_pipeline: true + - f1: v1 + - index: + _index: test_index + pipeline: pipeline2 + _id: test_id2 + - f1: v2 + - gte: { ingest_took: 0 } + + - do: + get: + index: test_index + id: test_id1 + + - is_false: _source.field1 + - is_false: _source.field2 + + - do: + get: + index: test_index + id: test_id2 + + - is_false: _source.field1 + - match: {_source.field2: value2} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index d00585c989336..d65473d347499 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -57,6 +57,7 @@ public final class BulkRequestParser { private static final ParseField VERSION_TYPE = new ParseField("version_type"); private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict"); private static final ParseField PIPELINE = new ParseField("pipeline"); + private static final ParseField SKIP_PIPELINE = new ParseField("skip_pipeline"); private static final ParseField SOURCE = new 
ParseField("_source"); private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no"); private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); @@ -206,6 +207,7 @@ public void parse( long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; int retryOnConflict = 0; String pipeline = defaultPipeline; + boolean skipPipeline = false; boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias; Map dynamicTemplates = Map.of(); @@ -256,6 +258,8 @@ public void parse( retryOnConflict = parser.intValue(); } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); + } else if (SKIP_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { + skipPipeline = parser.booleanValue(); } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) { fetchSourceContext = FetchSourceContext.fromXContent(parser); } else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) { @@ -339,6 +343,7 @@ public void parse( .version(version) .versionType(versionType) .setPipeline(pipeline) + .setSkipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -354,6 +359,7 @@ public void parse( .versionType(versionType) .create("create".equals(opType)) .setPipeline(pipeline) + .setSkipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -370,6 +376,7 @@ public void parse( .versionType(versionType) .create(true) .setPipeline(pipeline) + .setSkipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -411,6 +418,7 @@ public void parse( IndexRequest upsertRequest = 
updateRequest.upsertRequest(); if (upsertRequest != null) { upsertRequest.setPipeline(pipeline); + upsertRequest.setSkipPipeline(skipPipeline); } updateRequestConsumer.accept(updateRequest); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 70d0cd40b5a35..e85a75bd8ba3c 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -97,6 +97,10 @@ public class IndexRequest extends ReplicatedWriteRequest implement private XContentType contentType; private String pipeline; + /** + * If true, the request pipeline and default pipeline are to be skipped (the final pipeline will still be executed if it exists). + */ + private boolean skipPipeline = false; private String finalPipeline; private boolean isPipelineResolved; @@ -139,6 +143,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); pipeline = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_8_7_0)) { + skipPipeline = in.readBoolean(); + } else { + skipPipeline = false; + } if (in.getVersion().onOrAfter(Version.V_7_5_0)) { finalPipeline = in.readOptionalString(); } @@ -313,6 +322,26 @@ public String getPipeline() { return this.pipeline; } + /** + * Sets if the pipeline for this request should be skipped. + * + * @param skipPipeline true if the pipeline should be skipped + * @return the request + */ + public IndexRequest setSkipPipeline(final boolean skipPipeline) { + this.skipPipeline = skipPipeline; + return this; + } + + /** + * Returns whether or not the pipeline for this request should be skipped. 
+ * + * @return true if the pipeline should be skipped + */ + public boolean getSkipPipeline() { + return this.skipPipeline; + } + /** * Sets the final ingest pipeline to be executed before indexing the document. * @@ -683,6 +712,9 @@ private void writeBody(StreamOutput out) throws IOException { out.writeLong(version); out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); + if (out.getVersion().onOrAfter(Version.V_8_7_0)) { + out.writeBoolean(skipPipeline); + } if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeOptionalString(finalPipeline); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9ddb20a04b219..678c1efcf32fd 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -235,7 +235,7 @@ public static boolean resolvePipelines( if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) { // find the default pipeline if one is defined from an existing index setting defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings); - indexRequest.setPipeline(defaultPipeline); + setPipelineOrSkip(indexRequest, defaultPipeline); } if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) { // find the final pipeline if one is defined from an existing index setting @@ -255,7 +255,7 @@ public static boolean resolvePipelines( if (IndexSettings.FINAL_PIPELINE.exists(settings)) { finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings); } - indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME)); + setPipelineOrSkip(indexRequest, Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME)); indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME)); } else { List templates = MetadataIndexTemplateService.findV1Templates( @@ -279,13 +279,13 @@ public static boolean 
resolvePipelines( break; } } - indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME)); + setPipelineOrSkip(indexRequest, Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME)); indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME)); } } if (requestPipeline != null) { - indexRequest.setPipeline(requestPipeline); + setPipelineOrSkip(indexRequest, requestPipeline); } /* @@ -306,6 +306,14 @@ public static boolean resolvePipelines( || NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false; } + private static void setPipelineOrSkip(IndexRequest indexRequest, String pipelineName) { + if (indexRequest.getSkipPipeline()) { + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + } else { + indexRequest.setPipeline(pipelineName); + } + } + public ClusterService getClusterService() { return clusterService; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index fa2315e6b43e3..c781e12f13206 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -122,6 +122,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC indexRequest.id(request.param("id")); indexRequest.routing(request.param("routing")); indexRequest.setPipeline(request.param("pipeline")); + indexRequest.setSkipPipeline(request.paramAsBoolean("skip_pipeline", false)); indexRequest.source(request.requiredContent(), request.getXContentType()); indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); indexRequest.setRefreshPolicy(request.param("refresh")); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java 
index 386ac68e87b45..5483ede8d3bd7 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.is; + public class BulkRequestParserTests extends ESTestCase { public void testIndexRequest() throws IOException { @@ -309,4 +311,35 @@ public void testFailOnInvalidAction() { ); } + public void testSkipPipeline() throws IOException { + BytesArray request = new BytesArray(""" + { "index":{ "_index": "bar", "pipeline": "foo", "skip_pipeline": "true"} } + {} + { "index":{ "_index": "bar", "pipeline": "foo", "routing": "blub" } } + {} + """); + BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current()); + final List indexRequests = new ArrayList<>(); + parser.parse( + request, + null, + null, + null, + null, + null, + true, + XContentType.JSON, + (indexRequest, type) -> indexRequests.add(indexRequest), + req -> fail(), + req -> fail() + ); + assertThat(indexRequests, Matchers.hasSize(2)); + final IndexRequest first = indexRequests.get(0); + final IndexRequest second = indexRequests.get(1); + assertThat(first.getSkipPipeline(), is(true)); + assertThat(first.getPipeline(), is("foo")); + assertThat(second.getSkipPipeline(), is(false)); + assertThat(second.getPipeline(), is("foo")); + } + } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 1f1dc3d11c029..3e2d9f42a3d4c 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1852,6 +1852,71 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { } } + public void testResolvePipelinesWithSkipPipeline() { + // no pipeline: + { + 
Metadata metadata = Metadata.builder().build(); + IndexRequest indexRequest = new IndexRequest("idx").setSkipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // request pipeline: + { + Metadata metadata = Metadata.builder().build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // default pipeline: + { + IndexMetadata.Builder builder = IndexMetadata.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + Metadata metadata = Metadata.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setSkipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // request pipeline with default pipeline: + { + IndexMetadata.Builder builder = IndexMetadata.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + Metadata metadata = Metadata.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true); + boolean result = 
IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // request pipeline with final pipeline: + { + IndexMetadata.Builder builder = IndexMetadata.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + Metadata metadata = Metadata.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline")); + } + } + public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception { var randomMap = randomMap(10, 50, IngestServiceTests::randomMapEntry); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java index 32616445371f8..5496706ff9029 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java @@ -122,4 +122,20 @@ public void testAutoIdWithType() { dispatchRequest(deprecatedRequest); assertCriticalWarnings(RestIndexAction.TYPES_DEPRECATION_MESSAGE); } + + public void testSkipPipeline() { + final String pipelineName = "foo"; + verifyingClient.setExecuteVerifier((actionType, request) -> { + assertThat(request, instanceOf(IndexRequest.class)); + assertThat(((IndexRequest) request).getPipeline(), 
equalTo(pipelineName)); + assertThat(((IndexRequest) request).getSkipPipeline(), equalTo(true)); + return new IndexResponse(new ShardId("test", "test", 0), "id", 0, 0, 0, true); + }); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath("/some_index/_doc") + .withParams(Map.of("pipeline", pipelineName, "skip_pipeline", "true")) + .withContent(new BytesArray("{}"), XContentType.JSON) + .build(); + dispatchRequest(request); + } }