Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/93016.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93016
summary: Adding the ability to skip the ingest pipeline
area: Ingest Node
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions docs/reference/docs/bulk.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ on.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=skip_pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh]

`require_alias`::
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/docs/index_.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ required. See <<add-documents-to-a-data-stream>>.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=skip_pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=routing]
Expand Down
7 changes: 7 additions & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,13 @@ tag::pipeline[]
(Optional, string) ID of the pipeline to use to preprocess incoming documents.
end::pipeline[]

tag::skip_pipeline[]
`skip_pipeline`::
(Optional, Boolean) If `true`, the pipeline (whether explicit or default) is not
executed. However the final pipeline (if one exists) is still executed. This parameter
is meant to support integrations that run pipelines external to {es}.
end::skip_pipeline[]

tag::pages-processed[]
The number of search or bulk index operations processed. Documents are
processed in batches instead of individually.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,38 @@ teardown:
index: test_index
id: 1
- match: { _source: { my_field: "upper" } }

---
"Test bulk request with skip_pipeline":

- do:
bulk:
body:
- index:
_index: test_index
_id: test_id1
pipeline: pipeline1
skip_pipeline: true
- f1: v1
- index:
_index: test_index
pipeline: pipeline2
_id: test_id2
- f1: v2
- gte: { ingest_took: 0 }

- do:
get:
index: test_index
id: test_id1

- is_false: _source.field1
- is_false: _source.field2

- do:
get:
index: test_index
id: test_id2

- is_false: _source.field1
- match: {_source.field2: value2}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class BulkRequestParser {
private static final ParseField VERSION_TYPE = new ParseField("version_type");
private static final ParseField RETRY_ON_CONFLICT = new ParseField("retry_on_conflict");
private static final ParseField PIPELINE = new ParseField("pipeline");
private static final ParseField SKIP_PIPELINE = new ParseField("skip_pipeline");
private static final ParseField SOURCE = new ParseField("_source");
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
Expand Down Expand Up @@ -206,6 +207,7 @@ public void parse(
long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
int retryOnConflict = 0;
String pipeline = defaultPipeline;
boolean skipPipeline = false;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
Map<String, String> dynamicTemplates = Map.of();

Expand Down Expand Up @@ -256,6 +258,8 @@ public void parse(
retryOnConflict = parser.intValue();
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
} else if (SKIP_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
skipPipeline = parser.booleanValue();
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
Expand Down Expand Up @@ -339,6 +343,7 @@ public void parse(
.version(version)
.versionType(versionType)
.setPipeline(pipeline)
.setSkipPipeline(skipPipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
Expand All @@ -354,6 +359,7 @@ public void parse(
.versionType(versionType)
.create("create".equals(opType))
.setPipeline(pipeline)
.setSkipPipeline(skipPipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
Expand All @@ -370,6 +376,7 @@ public void parse(
.versionType(versionType)
.create(true)
.setPipeline(pipeline)
.setSkipPipeline(skipPipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
Expand Down Expand Up @@ -411,6 +418,7 @@ public void parse(
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.setPipeline(pipeline);
upsertRequest.setSkipPipeline(skipPipeline);
}

updateRequestConsumer.accept(updateRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private XContentType contentType;

private String pipeline;
/**
* If true, the request pipeline and default pipeline are to be skipped (the final pipeline will still be executed if it exists).
*/
private boolean skipPipeline = false;
private String finalPipeline;

private boolean isPipelineResolved;
Expand Down Expand Up @@ -139,6 +143,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
pipeline = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_8_7_0)) {
skipPipeline = in.readBoolean();
} else {
skipPipeline = false;
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
finalPipeline = in.readOptionalString();
}
Expand Down Expand Up @@ -313,6 +322,26 @@ public String getPipeline() {
return this.pipeline;
}

/**
* Sets if the pipeline for this request should be skipped.
*
* @param skipPipeline true if the pipeline should be skipped
* @return the request
*/
public IndexRequest setSkipPipeline(final boolean skipPipeline) {
this.skipPipeline = skipPipeline;
return this;
}

/**
* Returns whether or not the pipeline for this request should be skipped.
*
* @return true if the pipeline has been resolved
*/
public boolean getSkipPipeline() {
return this.skipPipeline;
}

/**
* Sets the final ingest pipeline to be executed before indexing the document.
*
Expand Down Expand Up @@ -683,6 +712,9 @@ private void writeBody(StreamOutput out) throws IOException {
out.writeLong(version);
out.writeByte(versionType.getValue());
out.writeOptionalString(pipeline);
if (out.getVersion().onOrAfter(Version.V_8_7_0)) {
out.writeBoolean(skipPipeline);
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeOptionalString(finalPipeline);
}
Expand Down
16 changes: 12 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public static boolean resolvePipelines(
if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) {
// find the default pipeline if one is defined from an existing index setting
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
indexRequest.setPipeline(defaultPipeline);
setPipelineOrSkip(indexRequest, defaultPipeline);
}
if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) {
// find the final pipeline if one is defined from an existing index setting
Expand All @@ -255,7 +255,7 @@ public static boolean resolvePipelines(
if (IndexSettings.FINAL_PIPELINE.exists(settings)) {
finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings);
}
indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
setPipelineOrSkip(indexRequest, Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME));
} else {
List<IndexTemplateMetadata> templates = MetadataIndexTemplateService.findV1Templates(
Expand All @@ -279,13 +279,13 @@ public static boolean resolvePipelines(
break;
}
}
indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
setPipelineOrSkip(indexRequest, Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME));
indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME));
}
}

if (requestPipeline != null) {
indexRequest.setPipeline(requestPipeline);
setPipelineOrSkip(indexRequest, requestPipeline);
}

/*
Expand All @@ -306,6 +306,14 @@ public static boolean resolvePipelines(
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
}

private static void setPipelineOrSkip(IndexRequest indexRequest, String pipelineName) {
if (indexRequest.getSkipPipeline()) {
indexRequest.setPipeline(NOOP_PIPELINE_NAME);
} else {
indexRequest.setPipeline(pipelineName);
}
}

public ClusterService getClusterService() {
return clusterService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
indexRequest.id(request.param("id"));
indexRequest.routing(request.param("routing"));
indexRequest.setPipeline(request.param("pipeline"));
indexRequest.setSkipPipeline(request.paramAsBoolean("skip_pipeline", false));
indexRequest.source(request.requiredContent(), request.getXContentType());
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
indexRequest.setRefreshPolicy(request.param("refresh"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.is;

public class BulkRequestParserTests extends ESTestCase {

public void testIndexRequest() throws IOException {
Expand Down Expand Up @@ -309,4 +311,35 @@ public void testFailOnInvalidAction() {
);
}

public void testSkipPipeline() throws IOException {
BytesArray request = new BytesArray("""
{ "index":{ "_index": "bar", "pipeline": "foo", "skip_pipeline": "true"} }
{}
{ "index":{ "_index": "bar", "pipeline": "foo", "routing": "blub" } }
{}
""");
BulkRequestParser parser = new BulkRequestParser(randomBoolean(), RestApiVersion.current());
final List<IndexRequest> indexRequests = new ArrayList<>();
parser.parse(
request,
null,
null,
null,
null,
null,
true,
XContentType.JSON,
(indexRequest, type) -> indexRequests.add(indexRequest),
req -> fail(),
req -> fail()
);
assertThat(indexRequests, Matchers.hasSize(2));
final IndexRequest first = indexRequests.get(0);
final IndexRequest second = indexRequests.get(1);
assertThat(first.getSkipPipeline(), is(true));
assertThat(first.getPipeline(), is("foo"));
assertThat(second.getSkipPipeline(), is(false));
assertThat(second.getPipeline(), is("foo"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,71 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() {
}
}

public void testResolvePipelinesWithSkipPipeline() {
// no pipeline:
{
Metadata metadata = Metadata.builder().build();
IndexRequest indexRequest = new IndexRequest("idx").setSkipPipeline(true);
boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
assertThat(result, is(false));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
}

// request pipeline:
{
Metadata metadata = Metadata.builder().build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true);
boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
assertThat(result, is(false));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
}

// default pipeline:
{
IndexMetadata.Builder builder = IndexMetadata.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setSkipPipeline(true);
boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
assertThat(result, is(false));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
}

// request pipeline with default pipeline:
{
IndexMetadata.Builder builder = IndexMetadata.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true);
boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
assertThat(result, is(false));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
}

// request pipeline with final pipeline:
{
IndexMetadata.Builder builder = IndexMetadata.builder("idx")
.settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline"))
.numberOfShards(1)
.numberOfReplicas(0);
Metadata metadata = Metadata.builder().put(builder).build();
IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline").setSkipPipeline(true);
boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata);
assertThat(result, is(true));
assertThat(indexRequest.isPipelineResolved(), is(true));
assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME));
assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline"));
}
}

public void testUpdatingRandomPipelineWithoutChangesIsNoOp() throws Exception {
var randomMap = randomMap(10, 50, IngestServiceTests::randomMapEntry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,20 @@ public void testAutoIdWithType() {
dispatchRequest(deprecatedRequest);
assertCriticalWarnings(RestIndexAction.TYPES_DEPRECATION_MESSAGE);
}

public void testSkipPipeline() {
final String pipelineName = "foo";
verifyingClient.setExecuteVerifier((actionType, request) -> {
assertThat(request, instanceOf(IndexRequest.class));
assertThat(((IndexRequest) request).getPipeline(), equalTo(pipelineName));
assertThat(((IndexRequest) request).getSkipPipeline(), equalTo(true));
return new IndexResponse(new ShardId("test", "test", 0), "id", 0, 0, 0, true);
});
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
.withPath("/some_index/_doc")
.withParams(Map.of("pipeline", pipelineName, "skip_pipeline", "true"))
.withContent(new BytesArray("{}"), XContentType.JSON)
.build();
dispatchRequest(request);
}
}