Description
Elasticsearch Version
7.x, 8.x
Problem Description
If a "foreach" processor follows an "enrich" processor to handle its results, but the "enrich" processor does not match anything, a memory leak is possible.
Steps to Reproduce
Create the enrich policy and index:
PUT mysource/_doc/1
{
"tomatch" : "a",
"other" : ["foo", "bar"]
}
PUT /_enrich/policy/myenrich
{
"match" : {
"indices": [
"mysource"
],
"match_field": "tomatch",
"enrich_fields": [
"other"
]
}
}
POST /_enrich/policy/myenrich/_execute
Create the pipeline:
PUT _ingest/pipeline/test
{
"processors": [
{
"enrich": {
"policy_name": "myenrich",
"field": "tomatch",
"target_field": "matched"
}
},
{
"foreach": {
"field": "matched",
"processor": {
"append": {
"field": "matched2",
"value": "{{_ingest._value}}"
}
}
}
}
]
}
Execute the pipeline:
PUT test/_doc/2?pipeline=test
{
"tomatch" : "x"
}
Check the node stats:
GET _nodes/stats/ingest?filter_path=**.pipelines.test
Here you will notice that "count" is 0 for the "foreach" processor, but "current" remains incremented. Repeat the pipeline execution and the "current" value continues to increase.
For example:
"foreach": {
"type": "foreach",
"stats": {
"count": 0,
"time_in_millis": 0,
"current": 4,
"failed": 0
}
}
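The same check can be scripted. Below is a minimal Python sketch, assuming the node stats response has already been fetched and parsed into a dict. The function name `find_stuck_processors` is hypothetical, and the response layout (`nodes.<id>.ingest.pipelines.<name>.processors`, each entry a single-key object like the snippet above) is an assumption based on that snippet.

```python
def find_stuck_processors(node_stats, pipeline):
    """Return (node_id, processor_name, current) for processors that report
    in-flight documents ("current" > 0) but no completed ones ("count" == 0).

    Hypothetical helper: the response layout is assumed, not taken from the issue.
    """
    stuck = []
    for node_id, node in node_stats.get("nodes", {}).items():
        pipelines = node.get("ingest", {}).get("pipelines", {})
        processors = pipelines.get(pipeline, {}).get("processors", [])
        for entry in processors:
            # Each entry is assumed to be {"<processor name>": {"type": ..., "stats": {...}}}
            for name, body in entry.items():
                stats = body.get("stats", {})
                if stats.get("count", 0) == 0 and stats.get("current", 0) > 0:
                    stuck.append((node_id, name, stats["current"]))
    return stuck
```

This is only a heuristic: a healthy processor can briefly show a non-zero "current" while a document is in flight, so the signal is meaningful when the value persists or grows across repeated calls.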
Review the tasks:
GET _tasks?human
Here you will see that the write task never finished: "action": "indices:data/write/index"
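To pick the lingering write tasks out of a large task list, the response can be filtered by action. This is a minimal Python sketch, assuming the `_tasks` response is parsed into a dict shaped as `nodes.<node id>.tasks.<task id>` with `action` and `running_time_in_nanos` fields. The helper name `find_index_write_tasks` is hypothetical.

```python
def find_index_write_tasks(tasks_response, action="indices:data/write/index"):
    """Return (task_id, running_time_in_nanos) for every task whose action
    matches exactly. Hypothetical helper; the response layout is assumed."""
    found = []
    for node in tasks_response.get("nodes", {}).values():
        for task_id, task in node.get("tasks", {}).items():
            if task.get("action") == action:
                # running_time_in_nanos may be absent; None is returned in that case
                found.append((task_id, task.get("running_time_in_nanos")))
    return found
```

A task that shows up on every call with an ever-growing running time matches the behaviour described here.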
Check the index:
GET test/_search
No data is written.
In the above example, change "tomatch" to "a" in the document being indexed so that the match happens, and all is well. This bug only appears when the match does not happen (more specifically, when the {{_ingest._value}} of the foreach processor is null).
Workaround
Use the "if" condition of the foreach processor to ensure that {{_ingest._value}} is never null by checking the value of the "field". (You cannot directly access _ingest from the "if" condition; see #60470.)
Example above with workaround:
PUT _ingest/pipeline/test
{
"processors": [
{
"enrich": {
"policy_name": "myenrich",
"field": "tomatch",
"target_field": "matched"
}
},
{
"foreach": {
"if": "ctx.matched != null",
"field": "matched",
"processor": {
"append": {
"field": "matched2",
"value": "{{_ingest._value}}"
}
}
}
}
]
}