From 4daaf614cac760ca4b08c66cc77325fd19b1e9aa Mon Sep 17 00:00:00 2001 From: kares Date: Tue, 16 Feb 2021 10:37:45 +0100 Subject: [PATCH 1/4] Feat: assert returned item count from _bulk we've seen some weird behavior on ES 7.10.2 where a _bulk request to index 69 documents returned 135 entries this leads to an ugly `NoMethodError: undefined method ``[]' for nil:NilClass` resolves #989 --- .../plugin_mixins/elasticsearch/common.rb | 9 +++- spec/unit/outputs/elasticsearch_spec.rb | 52 +++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 404adaf61..b48bfe0f0 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -204,8 +204,15 @@ def submit(actions) return end + responses = bulk_response["items"] + if responses.size != actions.size # can not map action -> response reliably + msg = "Sent #{actions.size} documents but Elasticsearch returned #{responses.size} responses" + @logger.debug? 
&& @logger.debug(msg, actions: actions, responses: responses) + fail("#{msg} (likely a bug with _bulk endpoint)") + end + actions_to_retry = [] - bulk_response["items"].each_with_index do |response,idx| + responses.each_with_index do |response,idx| action_type, action_props = response.first status = action_props["status"] diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 9f3c1ecd4..7b52ed71e 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -296,6 +296,58 @@ expect(subject.logger).to have_received(:debug).with(/Encountered a retryable error/i, anything) end end + + context "unexpected bulk response" do + let(:options) do + { "hosts" => "127.0.0.1:9999", "index" => "%{foo}", "manage_template" => false } + end + + let(:events) { [ ::LogStash::Event.new("foo" => "bar1"), ::LogStash::Event.new("foo" => "bar2") ] } + + let(:bulk_response) do + # shouldn't really happen but we've seen this happen - here ES returns more items than were sent + { "took"=>1, "ingest_took"=>9, "errors"=>true, + "items"=>[{"index"=>{"_index"=>"bar1", "_type"=>"_doc", "_id"=>nil, "status"=>500, + "error"=>{"type" => "illegal_state_exception", + "reason" => "pipeline with id [test-ingest] could not be loaded, caused by [ElasticsearchParseException[Error updating pipeline with id [test-ingest]]; nested: ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]; nested: IllegalArgumentException[no enrich index exists for policy with name [test-metadata1]];; ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]; nested: IllegalArgumentException[no enrich index exists for policy with name [test-metadata1]];; java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]" + } + } + }, + # NOTE: this is an artificial success (usually 
everything fails with a 500) but even if some doc were + # to succeed due to the unexpected response items we can not clearly identify which actions to retry ... + {"index"=>{"_index"=>"bar2", "_type"=>"_doc", "_id"=>nil, "status"=>201}}, + {"index"=>{"_index"=>"bar2", "_type"=>"_doc", "_id"=>nil, "status"=>500, + "error"=>{"type" => "illegal_state_exception", + "reason" => "pipeline with id [test-ingest] could not be loaded, caused by [ElasticsearchParseException[Error updating pipeline with id [test-ingest]]; nested: ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]];" + } + } + }] + } + end + + before(:each) do + allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO)) do |stream| + expect( stream.string ).to include '"foo":"bar1"' + expect( stream.string ).to include '"foo":"bar2"' + end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely + end + + it "should retry submit" do + allow(subject.logger).to receive(:error).with(/Encountered an unexpected error/i, anything) + allow(subject.client).to receive(:bulk).and_call_original # track count + + subject.multi_receive(events) + + expect(subject.client).to have_received(:bulk).twice + end + + it "should log specific error message" do + expect(subject.logger).to receive(:error).with(/Encountered an unexpected error/i, + hash_including(:error_message => 'Sent 2 documents but Elasticsearch returned 3 responses (likely a bug with _bulk endpoint)')) + + subject.multi_receive(events) + end + end end context "with timeout set" do From f8d289a90cb91f439cf9a994d58adf5ceee78878 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 4 Mar 2021 10:14:55 +0100 Subject: [PATCH 2/4] switch to warn level instead of debug --- lib/logstash/plugin_mixins/elasticsearch/common.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb 
b/lib/logstash/plugin_mixins/elasticsearch/common.rb index b48bfe0f0..c11ccc117 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -207,7 +207,7 @@ def submit(actions) responses = bulk_response["items"] if responses.size != actions.size # can not map action -> response reliably msg = "Sent #{actions.size} documents but Elasticsearch returned #{responses.size} responses" - @logger.debug? && @logger.debug(msg, actions: actions, responses: responses) + @logger.warn(msg, actions: actions, responses: responses) fail("#{msg} (likely a bug with _bulk endpoint)") end From 87133727d9e31f4a84d00f5a25c5b3963181fb21 Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 4 Mar 2021 10:15:36 +0100 Subject: [PATCH 3/4] bump + changelog --- CHANGELOG.md | 3 +++ logstash-output-elasticsearch.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 222209979..77a2ea592 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 10.8.5 + - Feat: assert returned item count from _bulk [#997](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/997) + ## 10.8.4 - Fixed an issue where a retried request would drop "update" parameters [#800](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/800) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index b2a695423..344f0c4a7 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '10.8.4' + s.version = '10.8.5' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" From 53cf6f0f2fe75784b7c523370061baeb33dea6ee Mon Sep 17 00:00:00 2001 From: kares Date: Thu, 4 Mar 2021 12:39:11 +0100 Subject: [PATCH 4/4] Chore: a comment for future generations --- 
lib/logstash/plugin_mixins/elasticsearch/common.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index c11ccc117..7ba2d8e22 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -206,6 +206,7 @@ def submit(actions) responses = bulk_response["items"] if responses.size != actions.size # can not map action -> response reliably + # an ES bug (on 7.10.2, 7.11.1) where a _bulk request to index X documents would return Y (> X) items msg = "Sent #{actions.size} documents but Elasticsearch returned #{responses.size} responses" @logger.warn(msg, actions: actions, responses: responses) fail("#{msg} (likely a bug with _bulk endpoint)")