Skip to content

Commit 384757d

Browse files
authored
ingest: support default pipelines + bulk upserts (#36618)
This commit adds support to enable bulk upserts to use an index's default pipeline. Bulk upsert, doc_as_upsert, and script_as_upsert are all supported. However, bulk script_as_upsert has slightly surprising behavior since the pipeline is executed _before_ the script is evaluated. This means that the pipeline only has access the data found in the upsert field of the script_as_upsert. The non-bulk script_as_upsert (existing behavior) runs the pipeline _after_ the script is executed. This commit does _not_ attempt to consolidate the bulk and non-bulk behavior for script_as_upsert. This commit also adds additional testing for the non-bulk behavior, which remains unchanged with this commit. fixes #36219
1 parent 75bfbe9 commit 384757d

File tree

5 files changed

+196
-20
lines changed

5 files changed

+196
-20
lines changed

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/200_default_pipeline.yml

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ teardown:
2323
]
2424
}
2525
- match: { acknowledged: true }
26-
26+
# default pipeline via index
2727
- do:
2828
indices.create:
2929
index: test
@@ -48,7 +48,7 @@ teardown:
4848
id: 1
4949
- match: { _source.bytes_source_field: "1kb" }
5050
- match: { _source.bytes_target_field: 1024 }
51-
51+
# default pipeline via alias
5252
- do:
5353
index:
5454
index: test_alias
@@ -63,28 +63,117 @@ teardown:
6363
id: 2
6464
- match: { _source.bytes_source_field: "1kb" }
6565
- match: { _source.bytes_target_field: 1024 }
66+
# default pipeline via upsert
67+
- do:
68+
update:
69+
index: test
70+
type: test
71+
id: 3
72+
body:
73+
script:
74+
source: "ctx._source.ran_script = true"
75+
lang: "painless"
76+
upsert: { "bytes_source_field":"1kb" }
77+
- do:
78+
get:
79+
index: test
80+
type: test
81+
id: 3
82+
- match: { _source.bytes_source_field: "1kb" }
83+
- match: { _source.bytes_target_field: 1024 }
84+
# default pipeline via scripted upsert
85+
- do:
86+
update:
87+
index: test
88+
type: test
89+
id: 4
90+
body:
91+
script:
92+
source: "ctx._source.bytes_source_field = '1kb'"
93+
lang: "painless"
94+
upsert : {}
95+
scripted_upsert: true
96+
- do:
97+
get:
98+
index: test
99+
type: test
100+
id: 4
101+
- match: { _source.bytes_source_field: "1kb" }
102+
- match: { _source.bytes_target_field: 1024 }
103+
# default pipeline via doc_as_upsert
104+
- do:
105+
update:
106+
index: test
107+
type: test
108+
id: 5
109+
body:
110+
doc: { "bytes_source_field":"1kb" }
111+
doc_as_upsert: true
112+
- do:
113+
get:
114+
index: test
115+
type: test
116+
id: 5
117+
- match: { _source.bytes_source_field: "1kb" }
118+
- match: { _source.bytes_target_field: 1024 }
119+
# default pipeline via bulk upsert
120+
# note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline
121+
# needs to be in the upsert, not the script
122+
- do:
123+
bulk:
124+
refresh: true
125+
body: |
126+
{"update":{"_id":"6","_index":"test","_type":"test"}}
127+
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
128+
{"update":{"_id":"7","_index":"test","_type":"test"}}
129+
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
130+
{"update":{"_id":"8","_index":"test","_type":"test"}}
131+
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
66132
133+
- do:
134+
mget:
135+
body:
136+
docs:
137+
- { _index: "test", _type: "_doc", _id: "6" }
138+
- { _index: "test", _type: "_doc", _id: "7" }
139+
- { _index: "test", _type: "_doc", _id: "8" }
140+
- match: { docs.0._index: "test" }
141+
- match: { docs.0._id: "6" }
142+
- match: { docs.0._source.bytes_source_field: "1kb" }
143+
- match: { docs.0._source.bytes_target_field: 1024 }
144+
- is_false: docs.0._source.ran_script
145+
- match: { docs.1._index: "test" }
146+
- match: { docs.1._id: "7" }
147+
- match: { docs.1._source.bytes_source_field: "2kb" }
148+
- match: { docs.1._source.bytes_target_field: 2048 }
149+
- match: { docs.2._index: "test" }
150+
- match: { docs.2._id: "8" }
151+
- match: { docs.2._source.bytes_source_field: "3kb" }
152+
- match: { docs.2._source.bytes_target_field: 3072 }
153+
- match: { docs.2._source.ran_script: true }
154+
155+
# explicit no default pipeline
67156
- do:
68157
index:
69158
index: test
70159
type: test
71-
id: 3
160+
id: 9
72161
pipeline: "_none"
73162
body: {bytes_source_field: "1kb"}
74163

75164
- do:
76165
get:
77166
index: test
78167
type: test
79-
id: 3
168+
id: 9
80169
- match: { _source.bytes_source_field: "1kb" }
81170
- is_false: _source.bytes_target_field
82-
171+
# bad request
83172
- do:
84173
catch: bad_request
85174
index:
86175
index: test
87176
type: test
88-
id: 4
177+
id: 10
89178
pipeline: ""
90179
body: {bytes_source_field: "1kb"}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,24 @@ public TransportBulkAction(ThreadPool threadPool, TransportService transportServ
127127
clusterService.addStateApplier(this.ingestForwarder);
128128
}
129129

130+
/**
131+
* Retrieves the {@link IndexRequest} from the provided {@link DocWriteRequest} for index or upsert actions. Upserts are
132+
* modeled as {@link IndexRequest} inside the {@link UpdateRequest}. Ignores {@link org.elasticsearch.action.delete.DeleteRequest}'s
133+
*
134+
* @param docWriteRequest The request to find the {@link IndexRequest}
135+
* @return the found {@link IndexRequest} or {@code null} if one can not be found.
136+
*/
137+
public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteRequest) {
138+
IndexRequest indexRequest = null;
139+
if (docWriteRequest instanceof IndexRequest) {
140+
indexRequest = (IndexRequest) docWriteRequest;
141+
} else if (docWriteRequest instanceof UpdateRequest) {
142+
UpdateRequest updateRequest = (UpdateRequest) docWriteRequest;
143+
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
144+
}
145+
return indexRequest;
146+
}
147+
130148
@Override
131149
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
132150
final long startTime = relativeTime();
@@ -207,12 +225,12 @@ private void executeIngestAndBulk(Task task, final BulkRequest bulkRequest, fina
207225
final MetaData metaData = clusterService.state().getMetaData();
208226
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
209227
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
210-
if (actionRequest instanceof IndexRequest) {
211-
IndexRequest indexRequest = (IndexRequest) actionRequest;
228+
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
229+
if(indexRequest != null){
212230
String pipeline = indexRequest.getPipeline();
213231
if (pipeline == null) {
214-
IndexMetaData indexMetaData = indicesMetaData.get(indexRequest.index());
215-
if (indexMetaData == null) {
232+
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
233+
if (indexMetaData == null && indexRequest.index() != null) {
216234
//check the alias
217235
AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
218236
if (indexOrAlias != null && indexOrAlias.isAlias()) {
@@ -626,7 +644,7 @@ ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis,
626644
}
627645

628646
void markCurrentItemAsDropped() {
629-
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
647+
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
630648
failedSlots.set(currentSlot);
631649
itemResponses.add(
632650
new BulkItemResponse(currentSlot, indexRequest.opType(),
@@ -639,7 +657,7 @@ void markCurrentItemAsDropped() {
639657
}
640658

641659
void markCurrentItemAsFailed(Exception e) {
642-
IndexRequest indexRequest = (IndexRequest) bulkRequest.requests().get(currentSlot);
660+
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(currentSlot));
643661
// We hit a error during preprocessing a request, so we:
644662
// 1) Remember the request item slot from the bulk, so that we're done processing all requests we know what failed
645663
// 2) Add a bulk item failure for this request

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.elasticsearch.ResourceNotFoundException;
2525
import org.elasticsearch.action.ActionListener;
2626
import org.elasticsearch.action.DocWriteRequest;
27+
import org.elasticsearch.action.bulk.TransportBulkAction;
2728
import org.elasticsearch.action.index.IndexRequest;
2829
import org.elasticsearch.action.ingest.DeletePipelineRequest;
2930
import org.elasticsearch.action.ingest.PutPipelineRequest;
3031
import org.elasticsearch.action.support.master.AcknowledgedResponse;
31-
import org.elasticsearch.action.update.UpdateRequest;
3232
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
3333
import org.elasticsearch.cluster.ClusterChangedEvent;
3434
import org.elasticsearch.cluster.ClusterState;
@@ -388,13 +388,7 @@ public void onFailure(Exception e) {
388388
@Override
389389
protected void doRun() {
390390
for (DocWriteRequest<?> actionRequest : actionRequests) {
391-
IndexRequest indexRequest = null;
392-
if (actionRequest instanceof IndexRequest) {
393-
indexRequest = (IndexRequest) actionRequest;
394-
} else if (actionRequest instanceof UpdateRequest) {
395-
UpdateRequest updateRequest = (UpdateRequest) actionRequest;
396-
indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest();
397-
}
391+
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
398392
if (indexRequest == null) {
399393
continue;
400394
}

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.action.index.IndexResponse;
2929
import org.elasticsearch.action.support.ActionFilters;
3030
import org.elasticsearch.action.support.AutoCreateIndex;
31+
import org.elasticsearch.action.update.UpdateRequest;
3132
import org.elasticsearch.cluster.ClusterChangedEvent;
3233
import org.elasticsearch.cluster.ClusterState;
3334
import org.elasticsearch.cluster.ClusterStateApplier;
@@ -408,6 +409,57 @@ public void testUseDefaultPipelineWithAlias() throws Exception {
408409
validateDefaultPipeline(new IndexRequest(WITH_DEFAULT_PIPELINE_ALIAS, "type", "id"));
409410
}
410411

412+
public void testUseDefaultPipelineWithBulkUpsert() throws Exception {
413+
Exception exception = new Exception("fake exception");
414+
BulkRequest bulkRequest = new BulkRequest();
415+
IndexRequest indexRequest1 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id1").source(Collections.emptyMap());
416+
IndexRequest indexRequest2 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id2").source(Collections.emptyMap());
417+
IndexRequest indexRequest3 = new IndexRequest(WITH_DEFAULT_PIPELINE, "type", "id3").source(Collections.emptyMap());
418+
UpdateRequest upsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id1").upsert(indexRequest1).script(mockScript("1"));
419+
UpdateRequest docAsUpsertRequest = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").doc(indexRequest2).docAsUpsert(true);
420+
// this test only covers the mechanics that scripted bulk upserts will execute a default pipeline. However, in practice scripted
421+
// bulk upserts with a default pipeline are a bit surprising since the script executes AFTER the pipeline.
422+
UpdateRequest scriptedUpsert = new UpdateRequest(WITH_DEFAULT_PIPELINE, "type", "id2").upsert(indexRequest3).script(mockScript("1"))
423+
.scriptedUpsert(true);
424+
bulkRequest.add(upsertRequest).add(docAsUpsertRequest).add(scriptedUpsert);
425+
426+
AtomicBoolean responseCalled = new AtomicBoolean(false);
427+
AtomicBoolean failureCalled = new AtomicBoolean(false);
428+
assertNull(indexRequest1.getPipeline());
429+
assertNull(indexRequest2.getPipeline());
430+
assertNull(indexRequest3.getPipeline());
431+
action.execute(null, bulkRequest, ActionListener.wrap(
432+
response -> {
433+
BulkItemResponse itemResponse = response.iterator().next();
434+
assertThat(itemResponse.getFailure().getMessage(), containsString("fake exception"));
435+
responseCalled.set(true);
436+
},
437+
e -> {
438+
assertThat(e, sameInstance(exception));
439+
failureCalled.set(true);
440+
}));
441+
442+
// check failure works, and passes through to the listener
443+
assertFalse(action.isExecuted); // haven't executed yet
444+
assertFalse(responseCalled.get());
445+
assertFalse(failureCalled.get());
446+
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
447+
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
448+
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
449+
assertEquals(indexRequest3.getPipeline(), "default_pipeline");
450+
completionHandler.getValue().accept(exception);
451+
assertTrue(failureCalled.get());
452+
453+
// now check success of the transport bulk action
454+
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
455+
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
456+
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
457+
completionHandler.getValue().accept(null);
458+
assertTrue(action.isExecuted);
459+
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
460+
verifyZeroInteractions(transportService);
461+
}
462+
411463
public void testCreateIndexBeforeRunPipeline() throws Exception {
412464
Exception exception = new Exception("fake exception");
413465
IndexRequest indexRequest = new IndexRequest("missing_index", "type", "id");
@@ -445,6 +497,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
445497
indexRequest.source(Collections.emptyMap());
446498
AtomicBoolean responseCalled = new AtomicBoolean(false);
447499
AtomicBoolean failureCalled = new AtomicBoolean(false);
500+
assertNull(indexRequest.getPipeline());
448501
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
449502
response -> {
450503
responseCalled.set(true);
@@ -459,6 +512,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
459512
assertFalse(responseCalled.get());
460513
assertFalse(failureCalled.get());
461514
verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture(), any());
515+
assertEquals(indexRequest.getPipeline(), "default_pipeline");
462516
completionHandler.getValue().accept(exception);
463517
assertTrue(failureCalled.get());
464518

server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2424
import org.elasticsearch.action.bulk.TransportBulkActionTookTests.Resolver;
2525
import org.elasticsearch.action.delete.DeleteRequest;
26+
import org.elasticsearch.action.index.IndexRequest;
2627
import org.elasticsearch.action.support.ActionFilters;
2728
import org.elasticsearch.action.support.AutoCreateIndex;
29+
import org.elasticsearch.action.update.UpdateRequest;
2830
import org.elasticsearch.cluster.service.ClusterService;
2931
import org.elasticsearch.common.settings.Settings;
3032
import org.elasticsearch.common.unit.TimeValue;
@@ -132,4 +134,23 @@ public void testDeleteNonExistingDocExternalGteVersionCreatesIndex() throws Exce
132134
throw new AssertionError(exception);
133135
}));
134136
}
137+
138+
public void testGetIndexWriteRequest() throws Exception {
139+
IndexRequest indexRequest = new IndexRequest("index", "type", "id1").source(Collections.emptyMap());
140+
UpdateRequest upsertRequest = new UpdateRequest("index", "type", "id1").upsert(indexRequest).script(mockScript("1"));
141+
UpdateRequest docAsUpsertRequest = new UpdateRequest("index", "type", "id2").doc(indexRequest).docAsUpsert(true);
142+
UpdateRequest scriptedUpsert = new UpdateRequest("index", "type", "id2").upsert(indexRequest).script(mockScript("1"))
143+
.scriptedUpsert(true);
144+
145+
assertEquals(TransportBulkAction.getIndexWriteRequest(indexRequest), indexRequest);
146+
assertEquals(TransportBulkAction.getIndexWriteRequest(upsertRequest), indexRequest);
147+
assertEquals(TransportBulkAction.getIndexWriteRequest(docAsUpsertRequest), indexRequest);
148+
assertEquals(TransportBulkAction.getIndexWriteRequest(scriptedUpsert), indexRequest);
149+
150+
DeleteRequest deleteRequest = new DeleteRequest("index", "id");
151+
assertNull(TransportBulkAction.getIndexWriteRequest(deleteRequest));
152+
153+
UpdateRequest badUpsertRequest = new UpdateRequest("index", "type", "id1");
154+
assertNull(TransportBulkAction.getIndexWriteRequest(badUpsertRequest));
155+
}
135156
}

0 commit comments

Comments
 (0)