diff --git a/CHANGELOG.md b/CHANGELOG.md
index 222209979..77a2ea592 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 10.8.5
+ - Feat: assert returned item count from _bulk [#997](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/997)
+
 ## 10.8.4
  - Fixed an issue where a retried request would drop "update" parameters [#800](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/800)
 
diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb
index 404adaf61..7ba2d8e22 100644
--- a/lib/logstash/plugin_mixins/elasticsearch/common.rb
+++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb
@@ -204,8 +204,16 @@ def submit(actions)
        return
      end
 
+     responses = bulk_response["items"]
+     if responses.size != actions.size # cannot map action -> response reliably
+       # an ES bug (on 7.10.2, 7.11.1) where a _bulk request to index X documents would return Y (> X) items
+       msg = "Sent #{actions.size} documents but Elasticsearch returned #{responses.size} responses"
+       @logger.warn(msg, actions: actions, responses: responses)
+       fail("#{msg} (likely a bug with _bulk endpoint)")
+     end
+
      actions_to_retry = []
-     bulk_response["items"].each_with_index do |response,idx|
+     responses.each_with_index do |response,idx|
        action_type, action_props = response.first
 
        status = action_props["status"]
diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec
index b2a695423..344f0c4a7 100644
--- a/logstash-output-elasticsearch.gemspec
+++ b/logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '10.8.4'
+  s.version = '10.8.5'
   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"
diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb
index 9f3c1ecd4..7b52ed71e 100644
--- a/spec/unit/outputs/elasticsearch_spec.rb
+++ b/spec/unit/outputs/elasticsearch_spec.rb
@@ -296,6 +296,58 @@
         expect(subject.logger).to have_received(:debug).with(/Encountered a retryable error/i, anything)
       end
     end
+
+    context "unexpected bulk response" do
+      let(:options) do
+        { "hosts" => "127.0.0.1:9999", "index" => "%{foo}", "manage_template" => false }
+      end
+
+      let(:events) { [ ::LogStash::Event.new("foo" => "bar1"), ::LogStash::Event.new("foo" => "bar2") ] }
+
+      let(:bulk_response) do
+        # shouldn't really happen but we've seen this happen - here ES returns more items than were sent
+        { "took"=>1, "ingest_took"=>9, "errors"=>true,
+          "items"=>[{"index"=>{"_index"=>"bar1", "_type"=>"_doc", "_id"=>nil, "status"=>500,
+                     "error"=>{"type" => "illegal_state_exception",
+                               "reason" => "pipeline with id [test-ingest] could not be loaded, caused by [ElasticsearchParseException[Error updating pipeline with id [test-ingest]]; nested: ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]; nested: IllegalArgumentException[no enrich index exists for policy with name [test-metadata1]];; ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]; nested: IllegalArgumentException[no enrich index exists for policy with name [test-metadata1]];; java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]"
+                              }
+                    }
+                   },
+                   # NOTE: this is an artificial success (usually everything fails with a 500) but even if some doc were
+                   # to succeed, due to the unexpected response items we cannot clearly identify which actions to retry ...
+                   {"index"=>{"_index"=>"bar2", "_type"=>"_doc", "_id"=>nil, "status"=>201}},
+                   {"index"=>{"_index"=>"bar2", "_type"=>"_doc", "_id"=>nil, "status"=>500,
+                     "error"=>{"type" => "illegal_state_exception",
+                               "reason" => "pipeline with id [test-ingest] could not be loaded, caused by [ElasticsearchParseException[Error updating pipeline with id [test-ingest]]; nested: ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]];"
+                              }
+                    }
+                   }]
+        }
+      end
+
+      before(:each) do
+        allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO)) do |stream|
+          expect( stream.string ).to include '"foo":"bar1"'
+          expect( stream.string ).to include '"foo":"bar2"'
+        end.and_return(bulk_response, {"errors"=>false}) # let the second call succeed so we do not retry indefinitely
+      end
+
+      it "should retry submit" do
+        allow(subject.logger).to receive(:error).with(/Encountered an unexpected error/i, anything)
+        allow(subject.client).to receive(:bulk).and_call_original # track call count
+
+        subject.multi_receive(events)
+
+        expect(subject.client).to have_received(:bulk).twice
+      end
+
+      it "should log specific error message" do
+        expect(subject.logger).to receive(:error).with(/Encountered an unexpected error/i,
+                                                       hash_including(:error_message => 'Sent 2 documents but Elasticsearch returned 3 responses (likely a bug with _bulk endpoint)'))
+
+        subject.multi_receive(events)
+      end
+    end
   end
 
   context "with timeout set" do