3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 10.8.5
- Feat: assert returned item count from _bulk [#997](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/997)

## 10.8.4
- Fixed an issue where a retried request would drop "update" parameters [#800](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/800)

10 changes: 9 additions & 1 deletion lib/logstash/plugin_mixins/elasticsearch/common.rb
@@ -204,8 +204,16 @@ def submit(actions)
return
end

responses = bulk_response["items"]
if responses.size != actions.size # can not map action -> response reliably
Review comment (Contributor): could be more explicit in the comment, saying "to deal with weird behavior on ES 7.10.2, 7.11.1 where a _bulk request to index X documents returned X+Y entries"

# an ES bug (on 7.10.2, 7.11.1) where a _bulk request to index X documents would return Y (> X) items
msg = "Sent #{actions.size} documents but Elasticsearch returned #{responses.size} responses"
@logger.warn(msg, actions: actions, responses: responses)
fail("#{msg} (likely a bug with _bulk endpoint)")
end

actions_to_retry = []
bulk_response["items"].each_with_index do |response,idx|
responses.each_with_index do |response,idx|
action_type, action_props = response.first

status = action_props["status"]
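For readers skimming the diff, here is a minimal standalone sketch of what the new guard does; the `fake_actions` and `fake_response` names are hypothetical illustrations, not part of the plugin:

```ruby
# Hypothetical data shaped like the plugin's inputs: two actions sent,
# but the _bulk response reports three items (the ES 7.10.2/7.11.1 bug).
fake_actions  = [[:index, {}, { "foo" => "bar1" }], [:index, {}, { "foo" => "bar2" }]]
fake_response = { "errors" => true, "items" => [{}, {}, {}] }

responses = fake_response["items"]
if responses.size != fake_actions.size # cannot map action -> response reliably
  msg = "Sent #{fake_actions.size} documents but Elasticsearch returned #{responses.size} responses"
  # in the plugin this is logged and then `fail` raises, which sends the
  # whole batch down the standard retry path
  raise "#{msg} (likely a bug with _bulk endpoint)"
end
```

Raising here, rather than guessing which actions succeeded, keeps the retry semantics simple: the whole batch is resubmitted, which is what the new spec below exercises.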
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '10.8.4'
s.version = '10.8.5'

s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
52 changes: 52 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
@@ -296,6 +296,58 @@
expect(subject.logger).to have_received(:debug).with(/Encountered a retryable error/i, anything)
end
end

context "unexpected bulk response" do
let(:options) do
{ "hosts" => "127.0.0.1:9999", "index" => "%{foo}", "manage_template" => false }
end

let(:events) { [ ::LogStash::Event.new("foo" => "bar1"), ::LogStash::Event.new("foo" => "bar2") ] }

let(:bulk_response) do
# shouldn't really happen but we've seen this happen - here ES returns more items than were sent
{ "took"=>1, "ingest_took"=>9, "errors"=>true,
"items"=>[{"index"=>{"_index"=>"bar1", "_type"=>"_doc", "_id"=>nil, "status"=>500,
"error"=>{"type" => "illegal_state_exception",
"reason" => "pipeline with id [test-ingest] could not be loaded, caused by [ElasticsearchParseException[Error updating pipeline with id [test-ingest]]; nested: ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]; nested: IllegalArgumentException[no enrich index exists for policy with name [test-metadata1]];; ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]; nested: IllegalArgumentException[no enrich index exists for policy with name [test-metadata1]];; java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]]"
}
}
},
# NOTE: this is an artificial success (usually everything fails with a 500) but even if some docs were
# to succeed, due to the unexpected response items we can not clearly identify which actions to retry ...
{"index"=>{"_index"=>"bar2", "_type"=>"_doc", "_id"=>nil, "status"=>201}},
{"index"=>{"_index"=>"bar2", "_type"=>"_doc", "_id"=>nil, "status"=>500,
"error"=>{"type" => "illegal_state_exception",
"reason" => "pipeline with id [test-ingest] could not be loaded, caused by [ElasticsearchParseException[Error updating pipeline with id [test-ingest]]; nested: ElasticsearchException[java.lang.IllegalArgumentException: no enrich index exists for policy with name [test-metadata1]];"
}
}
}]
}
end

before(:each) do
allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO)) do |stream|
expect( stream.string ).to include '"foo":"bar1"'
expect( stream.string ).to include '"foo":"bar2"'
end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely
end

it "should retry submit" do
allow(subject.logger).to receive(:error).with(/Encountered an unexpected error/i, anything)
allow(subject.client).to receive(:bulk).and_call_original # track count

subject.multi_receive(events)

expect(subject.client).to have_received(:bulk).twice
end

it "should log specific error message" do
expect(subject.logger).to receive(:error).with(/Encountered an unexpected error/i,
hash_including(:error_message => 'Sent 2 documents but Elasticsearch returned 3 responses (likely a bug with _bulk endpoint)'))

subject.multi_receive(events)
end
end
end

context "with timeout set" do