Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/104585.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 104585
summary: Ingest correctly handle upsert operations and drop processors together
area: Ingest Node
type: bug
issues:
- 36746
Original file line number Diff line number Diff line change
@@ -1,94 +1,185 @@
---
teardown:
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404
- do:
indices.delete:
index: "test"
ignore_unavailable: true
- do:
ingest.delete_pipeline:
id: "my_pipeline"
ignore: 404

---
"Test Drop Processor":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description" : "pipeline with drop",
"processors" : [
{
"drop" : {
"if": "ctx.foo == 'bar'"
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description" : "pipeline with drop",
"processors" : [
{
"drop" : {
"if": "ctx.foo == 'bar'"
}
}
}
]
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body: {
foo: "bar"
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "1"
pipeline: "my_pipeline"
body: {
foo: "bar"
}

- do:
index:
index: test
id: "2"
pipeline: "my_pipeline"
body: {
foo: "blub"
}

- do:
catch: missing
get:
index: test
id: "1"
- match: { found: false }

- do:
get:
index: test
id: "2"
- match: { _source.foo: "blub" }

- do:
index:
index: test
id: "2"
pipeline: "my_pipeline"
body: {
foo: "blub"
}

- do:
catch: missing
get:
index: test
id: "1"
- match: { found: false }

- do:
get:
index: test
id: "2"
- match: { _source.foo: "blub" }

---
"Test Drop Processor On Failure":
- do:
ingest.put_pipeline:
id: "my_pipeline_with_failure"
body: >
{
"description" : "pipeline with on failure drop",
"processors": [
- do:
ingest.put_pipeline:
id: "my_pipeline_with_failure"
body: >
{
"description" : "pipeline with on failure drop",
"processors": [
{
"fail": {
"message": "failed",
"on_failure": [
{
"drop": {}
}
]
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "3"
pipeline: "my_pipeline_with_failure"
body: {
foo: "bar"
}

- do:
catch: missing
get:
index: test
id: "3"

---
"Test Drop Processor with Upsert (_bulk)":
- skip:
version: ' - 7.17.17'
reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 7.17.18'
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"fail": {
"message": "failed",
"on_failure": [
{
"drop": {}
}
]
"drop": {
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: "3"
pipeline: "my_pipeline_with_failure"
body: {
foo: "bar"
}

- do:
catch: missing
get:
index: test
id: "3"
}
- match: { acknowledged: true }

- do:
bulk:
refresh: true
pipeline: "my_pipeline"
body:
- update:
_index: test
_id: 4
- '{"upsert":{"some":"fields"},"script":"ctx"}'
- match: { errors: false }
- match: { items.0.update._index: test }
- match: { items.0.update._id: "4" }
- match: { items.0.update._version: -3 }
- match: { items.0.update.result: noop }
- match: { items.0.update.status: 200 }

- do:
catch: missing
get:
index: test
id: "4"

---
"Test Drop Processor with Upsert (_update)":
- skip:
version: ' - 7.17.17'
reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 7.17.18'
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"drop": {
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
body:
settings:
index:
default_pipeline: "my_pipeline"

- do:
update:
index: test
id: "5"
body:
script:
source: "ctx._source.foo = 'bar'"
upsert:
foo: "bar"

- match: { _index: test }
- match: { _id: "5" }
- match: { result: noop }

- do:
catch: missing
get:
index: test
id: "5"
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
teardown:
- do:
indices.delete:
index: "test"
ignore_unavailable: true
- do:
ingest.delete_pipeline:
id: "my_pipeline"
Expand All @@ -10,7 +14,7 @@ teardown:
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
body: >
{
"description": "_description",
"processors": [
Expand All @@ -36,7 +40,7 @@ teardown:
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
body: >
{
"description": "_description",
"processors": [
Expand Down Expand Up @@ -69,3 +73,87 @@ teardown:
index: test
id: "1"
- match: { _source.error_message: "fail_processor_ran" }

---
"Test Fail Processor with Upsert (bulk)":
- skip:
version: ' - 7.17.17'
reason: 'https://github.com/elastic/elasticsearch/issues/36746 fixed in 7.17.18'
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"fail": {
"message": "error-message"
}
}
]
}
- match: { acknowledged: true }

- do:
bulk:
refresh: true
pipeline: "my_pipeline"
body:
- update:
_index: test
_id: 3
- '{"upsert":{"some":"fields"},"script":"ctx"}'
- match: { errors: true }
- match: { items.0.update._index: test }
- match: { items.0.update._id: "3" }
- match: { items.0.update.status: 500 }
- match: { items.0.update.error.type: fail_processor_exception }
- match: { items.0.update.error.reason: /error-message/ }

- do:
catch: missing
get:
index: test
id: "3"

---
"Test Fail Processor with Upsert (_update)":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"processors": [
{
"fail": {
"message": "error-message"
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
body:
settings:
index:
default_pipeline: "my_pipeline"

- do:
update:
index: test
id: "4"
body:
script:
source: "ctx._source.foo = 'bar'"
upsert:
foo: "bar"
catch: /error-message/

- do:
catch: missing
get:
index: test
id: "4"
Loading