From 4349a0b3305b68fb83a7f93e88fb1e15c48438b0 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 17 Jan 2023 17:02:51 -0600 Subject: [PATCH 1/6] Adding the ability to skip the ingest pipeline --- docs/reference/docs/bulk.asciidoc | 2 + docs/reference/docs/index_.asciidoc | 2 + docs/reference/rest-api/common-parms.asciidoc | 11 +++- .../rest-api-spec/test/ingest/70_bulk.yml | 53 +++++++++++++++ .../action/bulk/BulkRequestParser.java | 8 +++ .../action/index/IndexRequest.java | 29 +++++++++ .../elasticsearch/ingest/IngestService.java | 16 +++-- .../rest/action/document/RestIndexAction.java | 1 + .../action/bulk/BulkRequestParserTests.java | 33 ++++++++++ .../ingest/IngestServiceTests.java | 65 +++++++++++++++++++ .../action/document/RestIndexActionTests.java | 16 +++++ 11 files changed, 230 insertions(+), 6 deletions(-) diff --git a/docs/reference/docs/bulk.asciidoc b/docs/reference/docs/bulk.asciidoc index d900260739903..0bd51ebf63cbb 100644 --- a/docs/reference/docs/bulk.asciidoc +++ b/docs/reference/docs/bulk.asciidoc @@ -246,6 +246,8 @@ on. include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline] +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=skip_pipeline] + include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh] `require_alias`:: diff --git a/docs/reference/docs/index_.asciidoc b/docs/reference/docs/index_.asciidoc index 4577e02024805..2ea1cde22ec1e 100644 --- a/docs/reference/docs/index_.asciidoc +++ b/docs/reference/docs/index_.asciidoc @@ -103,6 +103,8 @@ required. See <>. include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline] +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=skip_pipeline] + include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh] include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=routing] diff --git a/docs/reference/rest-api/common-parms.asciidoc b/docs/reference/rest-api/common-parms.asciidoc index 92141b9d1952e..5f741c1601557 100644 --- a/docs/reference/rest-api/common-parms.asciidoc +++ b/docs/reference/rest-api/common-parms.asciidoc @@ -130,8 +130,8 @@ shards. Statuses are: All shards are assigned. * `yellow`: - All primary shards are assigned, but one or more replica shards are - unassigned. If a node in the cluster fails, some data could be unavailable + All primary shards are assigned, but one or more replica shards are + unassigned. If a node in the cluster fails, some data could be unavailable until that node is repaired. * `red`: @@ -716,6 +716,13 @@ tag::pipeline[] (Optional, string) ID of the pipeline to use to preprocess incoming documents. end::pipeline[] +tag::skip_pipeline[] +`skip_pipeline`:: +(Optional, Boolean) If `true`, the pipeline (whether explicit or default) is not +executed. However the final pipeline (if one exists) is still executed. This paramter +is meant to support integrations that run pipelines external to {es}. +end::skip_pipeline[] + tag::pages-processed[] The number of search or bulk index operations processed. Documents are processed in batches instead of individually. diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index d7738987de18b..43a08f6afdbf1 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -181,3 +181,56 @@ teardown: index: test_index id: 1 - match: { _source: { my_field: "upper" } } + +--- +"Test bulk request with skip_pipeline": + + - do: + bulk: + body: + - index: + _index: test_index + _id: test_id1 + pipeline: pipeline1 + skip_pipeline: true + - f1: v1 + - index: + _index: test_index + pipeline: pipeline2 + _id: test_id2 + - f1: v2 + - gte: { ingest_took: 0 } + + - do: + get: + index: test_index + id: test_id1 + + - is_false: _source.field1 + - is_false: _source.field2 + + - do: + get: + index: test_index + id: test_id2 + + - is_false: _source.field1 + - match: {_source.field2: value2} + + - do: + cluster.state: {} + # Get master node id + - set: { master_node: master } + + - do: + nodes.stats: + metric: [ ingest ] + #we can't assert anything here since we might have more than one node in the cluster + - gte: {nodes.$master.ingest.total.count: 0} + - gte: {nodes.$master.ingest.total.failed: 0} + - gte: {nodes.$master.ingest.total.time_in_millis: 0} + - match: {nodes.$master.ingest.total.current: 0} + - gte: {nodes.$master.ingest.pipelines.pipeline1.count: 0} + - match: {nodes.$master.ingest.pipelines.pipeline1.failed: 0} + - gte: {nodes.$master.ingest.pipelines.pipeline1.time_in_millis: 0} + - match: {nodes.$master.ingest.pipelines.pipeline1.current: 0} diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index d00585c989336..e876bc0c37e60 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -57,6 +57,7 @@ public final class BulkRequestParser { private static final ParseField VERSION_TYPE = new ParseField("version_type"); private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict"); private static final ParseField PIPELINE = new ParseField("pipeline"); + private static final ParseField SKIP_PIPELINE = new ParseField("skip_pipeline"); private static final ParseField SOURCE = new ParseField("_source"); private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no"); private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term"); @@ -206,6 +207,7 @@ public void parse( long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM; int retryOnConflict = 0; String pipeline = defaultPipeline; + boolean skipPipeline = false; boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias; Map dynamicTemplates = Map.of(); @@ -256,6 +258,8 @@ public void parse( retryOnConflict = parser.intValue(); } else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity()); + } else if (SKIP_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) { + skipPipeline = parser.booleanValue(); } else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) { fetchSourceContext = FetchSourceContext.fromXContent(parser); } else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) { @@ -339,6 +343,7 @@ public void parse( .version(version) .versionType(versionType) .setPipeline(pipeline) + .skipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -354,6 +359,7 @@ public void parse( .versionType(versionType) .create("create".equals(opType)) .setPipeline(pipeline) + .skipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -370,6 +376,7 @@ public void parse( .versionType(versionType) .create(true) .setPipeline(pipeline) + .skipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -411,6 +418,7 @@ public void parse( IndexRequest upsertRequest = updateRequest.upsertRequest(); if (upsertRequest != null) { upsertRequest.setPipeline(pipeline); + upsertRequest.skipPipeline(skipPipeline); } updateRequestConsumer.accept(updateRequest); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 70d0cd40b5a35..60ab322e71e8a 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -97,6 +97,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement private XContentType contentType; private String pipeline; + private boolean skipPipeline; private String finalPipeline; private boolean isPipelineResolved; @@ -139,6 +140,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); pipeline = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_8_7_0)) { + skipPipeline = in.readBoolean(); + } else { + skipPipeline = false; + } if (in.getVersion().onOrAfter(Version.V_7_5_0)) { finalPipeline = in.readOptionalString(); } @@ -313,6 +319,26 @@ public String getPipeline() { return this.pipeline; } + /** + * Sets if the pipeline for this request should be skipped. + * + * @param skipPipeline true if the pipeline should be skipped + * @return the request + */ + public IndexRequest skipPipeline(final boolean skipPipeline) { + this.skipPipeline = skipPipeline; + return this; + } + + /** + * Returns whether or not the pipeline for this request should be skipped. + * + * @return true if the pipeline has been resolved + */ + public boolean skipPipeline() { + return this.skipPipeline; + } + /** * Sets the final ingest pipeline to be executed before indexing the document. * @@ -683,6 +709,9 @@ private void writeBody(StreamOutput out) throws IOException { out.writeLong(version); out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); + if (out.getVersion().onOrAfter(Version.V_8_7_0)) { + out.writeBoolean(skipPipeline); + } if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeOptionalString(finalPipeline); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 9ddb20a04b219..d39ec86277020 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -235,7 +235,7 @@ public static boolean resolvePipelines( if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) { // find the default pipeline if one is defined from an existing index setting defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings); - indexRequest.setPipeline(defaultPipeline); + setPipelineOrSkip(indexRequest, defaultPipeline); } if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) { // find the final pipeline if one is defined from an existing index setting @@ -255,7 +255,7 @@ public static boolean resolvePipelines( if (IndexSettings.FINAL_PIPELINE.exists(settings)) { finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings); } - indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME)); + setPipelineOrSkip(indexRequest, Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME)); indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME)); } else { List templates = MetadataIndexTemplateService.findV1Templates( @@ -279,13 +279,13 @@ public static boolean resolvePipelines( break; } } - indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME)); + setPipelineOrSkip(indexRequest, Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME)); indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME)); } } if (requestPipeline != null) { - indexRequest.setPipeline(requestPipeline); + setPipelineOrSkip(indexRequest, requestPipeline); } /* @@ -306,6 +306,14 @@ public static boolean resolvePipelines( || NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false; } + private static void setPipelineOrSkip(IndexRequest indexRequest, String pipelineName) { + if (indexRequest.skipPipeline()) { + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + } else { + indexRequest.setPipeline(pipelineName); + } + } + public ClusterService getClusterService() { return clusterService; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index fa2315e6b43e3..a83e6a8949878 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -122,6 +122,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC indexRequest.id(request.param("id")); indexRequest.routing(request.param("routing")); indexRequest.setPipeline(request.param("pipeline")); + indexRequest.skipPipeline(request.paramAsBoolean("skip_pipeline", false)); indexRequest.source(request.requiredContent(), request.getXContentType()); indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); indexRequest.setRefreshPolicy(request.param("refresh")); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java index 386ac68e87b45..ffaef5dd3ca18 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.is; + public class BulkRequestParserTests extends ESTestCase { public void testIndexRequest() throws IOException { @@ -309,4 +311,35 @@ public void testFailOnInvalidAction() { ); } + public void testSkipPipeline() throws IOException { + BytesArray request = new BytesArray(""" + { "index":{ "_index": "bar", "pipeline": "foo", "skip_pipeline": "true"} } + {} + { "index":{ "_index": "bar", "pipeline": "foo", "routing": "blub" } } + {} + """); + BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current()); + final List indexRequests = new ArrayList<>(); + parser.parse( + request, + null, + null, + null, + null, + null, + true, + XContentType.JSON, + (indexRequest, type) -> indexRequests.add(indexRequest), + req -> fail(), + req -> fail() + ); + assertThat(indexRequests, Matchers.hasSize(2)); + final IndexRequest first = indexRequests.get(0); + final IndexRequest second = indexRequests.get(1); + assertThat(first.skipPipeline(), is(true)); + assertThat(first.getPipeline(), is("foo")); + assertThat(second.skipPipeline(), is(false)); + assertThat(second.getPipeline(), is("foo")); + } + } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 1f1dc3d11c029..4a965b2d49af8 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1852,6 +1852,71 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { } } + public void testResolvePipelinesWithSkipPipeline() { + // no pipeline: + { + Metadata metadata = Metadata.builder().build(); + IndexRequest indexRequest = new IndexRequest("idx").skipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // request pipeline: + { + Metadata metadata = Metadata.builder().build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // default pipeline: + { + IndexMetadata.Builder builder = IndexMetadata.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + Metadata metadata = Metadata.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").skipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // request pipeline with default pipeline: + { + IndexMetadata.Builder builder = IndexMetadata.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + Metadata metadata = Metadata.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(false)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + } + + // request pipeline with final pipeline: + { + IndexMetadata.Builder builder = IndexMetadata.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline")) + .numberOfShards(1) + .numberOfReplicas(0); + Metadata metadata = Metadata.builder().put(builder).build(); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true); + boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + assertThat(result, is(true)); + assertThat(indexRequest.isPipelineResolved(), is(true)); + assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline")); + } + } + public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception { var randomMap = randomMap(10, 50, IngestServiceTests::randomMapEntry); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java index 32616445371f8..b24614dfc0c10 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java @@ -122,4 +122,20 @@ public void testAutoIdWithType() { dispatchRequest(deprecatedRequest); assertCriticalWarnings(RestIndexAction.TYPES_DEPRECATION_MESSAGE); } + + public void testSkipPipeline() { + final String pipelineName = "foo"; + verifyingClient.setExecuteVerifier((actionType, request) -> { + assertThat(request, instanceOf(IndexRequest.class)); + assertThat(((IndexRequest) request).getPipeline(), equalTo(pipelineName)); + assertThat(((IndexRequest) request).skipPipeline(), equalTo(true)); + return new IndexResponse(new ShardId("test", "test", 0), "id", 0, 0, 0, true); + }); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath("/some_index/_doc") + .withParams(Map.of("pipeline", pipelineName, "skip_pipeline", "true")) + .withContent(new BytesArray("{}"), XContentType.JSON) + .build(); + dispatchRequest(request); + } } From e153cc5ea7f8d30a28922cc0a08737b19784a6fb Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 17 Jan 2023 17:04:40 -0600 Subject: [PATCH 2/6] Update docs/changelog/93016.yaml --- docs/changelog/93016.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/93016.yaml diff --git a/docs/changelog/93016.yaml b/docs/changelog/93016.yaml new file mode 100644 index 0000000000000..4185c836d67e9 --- /dev/null +++ b/docs/changelog/93016.yaml @@ -0,0 +1,5 @@ +pr: 93016 +summary: Adding the ability to skip the ingest pipeline +area: Ingest Node +type: enhancement +issues: [] From 000aac5616f41f5e2e717645687903bd9e6966b0 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 17 Jan 2023 17:19:23 -0600 Subject: [PATCH 3/6] removing unnecessary changes --- docs/reference/rest-api/common-parms.asciidoc | 4 ++-- .../rest-api-spec/test/ingest/70_bulk.yml | 18 ------------------ 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/docs/reference/rest-api/common-parms.asciidoc b/docs/reference/rest-api/common-parms.asciidoc index 5f741c1601557..137ec135cff6c 100644 --- a/docs/reference/rest-api/common-parms.asciidoc +++ b/docs/reference/rest-api/common-parms.asciidoc @@ -130,8 +130,8 @@ shards. Statuses are: All shards are assigned. * `yellow`: - All primary shards are assigned, but one or more replica shards are - unassigned. If a node in the cluster fails, some data could be unavailable + All primary shards are assigned, but one or more replica shards are + unassigned. If a node in the cluster fails, some data could be unavailable until that node is repaired. * `red`: diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index 43a08f6afdbf1..c0942635e4931 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -216,21 +216,3 @@ teardown: - is_false: _source.field1 - match: {_source.field2: value2} - - - do: - cluster.state: {} - # Get master node id - - set: { master_node: master } - - - do: - nodes.stats: - metric: [ ingest ] - #we can't assert anything here since we might have more than one node in the cluster - - gte: {nodes.$master.ingest.total.count: 0} - - gte: {nodes.$master.ingest.total.failed: 0} - - gte: {nodes.$master.ingest.total.time_in_millis: 0} - - match: {nodes.$master.ingest.total.current: 0} - - gte: {nodes.$master.ingest.pipelines.pipeline1.count: 0} - - match: {nodes.$master.ingest.pipelines.pipeline1.failed: 0} - - gte: {nodes.$master.ingest.pipelines.pipeline1.time_in_millis: 0} - - match: {nodes.$master.ingest.pipelines.pipeline1.current: 0} From 5837f7033fe38bc2cac09f19bf6ef97a2d476d90 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 18 Jan 2023 08:34:34 -0600 Subject: [PATCH 4/6] changing getter/setter names --- .../elasticsearch/action/bulk/BulkRequestParser.java | 8 ++++---- .../org/elasticsearch/action/index/IndexRequest.java | 4 ++-- .../java/org/elasticsearch/ingest/IngestService.java | 2 +- .../rest/action/document/RestIndexAction.java | 2 +- .../action/bulk/BulkRequestParserTests.java | 4 ++-- .../org/elasticsearch/ingest/IngestServiceTests.java | 10 +++++----- .../rest/action/document/RestIndexActionTests.java | 2 +- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index e876bc0c37e60..d65473d347499 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -343,7 +343,7 @@ public void parse( .version(version) .versionType(versionType) .setPipeline(pipeline) - .skipPipeline(skipPipeline) + .setSkipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -359,7 +359,7 @@ public void parse( .versionType(versionType) .create("create".equals(opType)) .setPipeline(pipeline) - .skipPipeline(skipPipeline) + .setSkipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -376,7 +376,7 @@ public void parse( .versionType(versionType) .create(true) .setPipeline(pipeline) - .skipPipeline(skipPipeline) + .setSkipPipeline(skipPipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) .source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType) @@ -418,7 +418,7 @@ public void parse( IndexRequest upsertRequest = updateRequest.upsertRequest(); if (upsertRequest != null) { upsertRequest.setPipeline(pipeline); - upsertRequest.skipPipeline(skipPipeline); + upsertRequest.setSkipPipeline(skipPipeline); } updateRequestConsumer.accept(updateRequest); diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 60ab322e71e8a..01f0eddea0e33 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -325,7 +325,7 @@ public String getPipeline() { * @param skipPipeline true if the pipeline should be skipped * @return the request */ - public IndexRequest skipPipeline(final boolean skipPipeline) { + public IndexRequest setSkipPipeline(final boolean skipPipeline) { this.skipPipeline = skipPipeline; return this; } @@ -335,7 +335,7 @@ public IndexRequest skipPipeline(final boolean skipPipeline) { * * @return true if the pipeline has been resolved */ - public boolean skipPipeline() { + public boolean getSkipPipeline() { return this.skipPipeline; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index d39ec86277020..678c1efcf32fd 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -307,7 +307,7 @@ public static boolean resolvePipelines( } private static void setPipelineOrSkip(IndexRequest indexRequest, String pipelineName) { - if (indexRequest.skipPipeline()) { + if (indexRequest.getSkipPipeline()) { indexRequest.setPipeline(NOOP_PIPELINE_NAME); } else { indexRequest.setPipeline(pipelineName); diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java index a83e6a8949878..c781e12f13206 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java @@ -122,7 +122,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC indexRequest.id(request.param("id")); indexRequest.routing(request.param("routing")); indexRequest.setPipeline(request.param("pipeline")); - indexRequest.skipPipeline(request.paramAsBoolean("skip_pipeline", false)); + indexRequest.setSkipPipeline(request.paramAsBoolean("skip_pipeline", false)); indexRequest.source(request.requiredContent(), request.getXContentType()); indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); indexRequest.setRefreshPolicy(request.param("refresh")); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java index ffaef5dd3ca18..5483ede8d3bd7 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java @@ -336,9 +336,9 @@ public void testSkipPipeline() throws IOException { assertThat(indexRequests, Matchers.hasSize(2)); final IndexRequest first = indexRequests.get(0); final IndexRequest second = indexRequests.get(1); - assertThat(first.skipPipeline(), is(true)); + assertThat(first.getSkipPipeline(), is(true)); assertThat(first.getPipeline(), is("foo")); - assertThat(second.skipPipeline(), is(false)); + assertThat(second.getSkipPipeline(), is(false)); assertThat(second.getPipeline(), is("foo")); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 4a965b2d49af8..3e2d9f42a3d4c 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -1856,7 +1856,7 @@ public void testResolvePipelinesWithSkipPipeline() { // no pipeline: { Metadata metadata = Metadata.builder().build(); - IndexRequest indexRequest = new IndexRequest("idx").skipPipeline(true); + IndexRequest indexRequest = new IndexRequest("idx").setSkipPipeline(true); boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(false)); assertThat(indexRequest.isPipelineResolved(), is(true)); @@ -1866,7 +1866,7 @@ public void testResolvePipelinesWithSkipPipeline() { // request pipeline: { Metadata metadata = Metadata.builder().build(); - IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true); boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(false)); assertThat(indexRequest.isPipelineResolved(), is(true)); @@ -1880,7 +1880,7 @@ public void testResolvePipelinesWithSkipPipeline() { .numberOfShards(1) .numberOfReplicas(0); Metadata metadata = Metadata.builder().put(builder).build(); - IndexRequest indexRequest = new IndexRequest("idx").skipPipeline(true); + IndexRequest indexRequest = new IndexRequest("idx").setSkipPipeline(true); boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(false)); assertThat(indexRequest.isPipelineResolved(), is(true)); @@ -1894,7 +1894,7 @@ public void testResolvePipelinesWithSkipPipeline() { .numberOfShards(1) .numberOfReplicas(0); Metadata metadata = Metadata.builder().put(builder).build(); - IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true); boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(false)); assertThat(indexRequest.isPipelineResolved(), is(true)); @@ -1908,7 +1908,7 @@ public void testResolvePipelinesWithSkipPipeline() { .numberOfShards(1) .numberOfReplicas(0); Metadata metadata = Metadata.builder().put(builder).build(); - IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").skipPipeline(true); + IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true); boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java index b24614dfc0c10..5496706ff9029 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestIndexActionTests.java @@ -128,7 +128,7 @@ public void testSkipPipeline() { verifyingClient.setExecuteVerifier((actionType, request) -> { assertThat(request, instanceOf(IndexRequest.class)); assertThat(((IndexRequest) request).getPipeline(), equalTo(pipelineName)); - assertThat(((IndexRequest) request).skipPipeline(), equalTo(true)); + assertThat(((IndexRequest) request).getSkipPipeline(), equalTo(true)); return new IndexResponse(new ShardId("test", "test", 0), "id", 0, 0, 0, true); }); RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) From 99e7291881d06bf2e70c637453ac593298fdd39a Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 18 Jan 2023 09:13:41 -0600 Subject: [PATCH 5/6] Fix typo in docs Co-authored-by: Abdon Pijpelink --- docs/reference/rest-api/common-parms.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/rest-api/common-parms.asciidoc b/docs/reference/rest-api/common-parms.asciidoc index 137ec135cff6c..7ece50b122470 100644 --- a/docs/reference/rest-api/common-parms.asciidoc +++ b/docs/reference/rest-api/common-parms.asciidoc @@ -719,7 +719,7 @@ end::pipeline[] tag::skip_pipeline[] `skip_pipeline`:: (Optional, Boolean) If `true`, the pipeline (whether explicit or default) is not -executed. However the final pipeline (if one exists) is still executed. This paramter +executed. However the final pipeline (if one exists) is still executed. This parameter is meant to support integrations that run pipelines external to {es}. end::skip_pipeline[] From 2ca3a17443853a476413ba8f81e8603e1cb51b24 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 18 Jan 2023 09:22:46 -0600 Subject: [PATCH 6/6] adding a comment and default value for skipPipeline --- .../java/org/elasticsearch/action/index/IndexRequest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 01f0eddea0e33..e85a75bd8ba3c 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -97,7 +97,10 @@ public class IndexRequest extends ReplicatedWriteRequest implement private XContentType contentType; private String pipeline; - private boolean skipPipeline; + /** + * If true, the request pipeline and default pipeline are to be skipped (the final pipeline will still be executed if it exists). + */ + private boolean skipPipeline = false; private String finalPipeline; private boolean isPipelineResolved;