
Commit b2ef074

size batch by _decompressed_ payload size

1 parent 6fc923b

4 files changed: +77 -43 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions

@@ -1,5 +1,7 @@
 ## 10.8.6
 - Fixed an issue where a single over-size event being rejected by Elasticsearch would cause the entire batch to be retried indefinitely. The oversize event will still be retried on its own and logging has been improved to include payload sizes in this situation [#972](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/972)
+- Fixed an issue with `http_compression => true` where a well-compressed payload could fit under our outbound 20MB limit but expand beyond Elasticsearch's 100MB limit, causing bulk failures. Bulk grouping is now determined entirely by the decompressed payload size [#823](https://github.com/logstash-plugins/logstash-output-elasticsearch/issues/823)
+- Improved debug-level logging about bulk requests.

 ## 10.8.5
 - Feat: assert returned item count from _bulk [#997](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/997)
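For context on the `http_compression` entry above, here is a minimal standalone Ruby sketch (not part of this commit; the payload contents are made up) showing how a highly repetitive bulk body can compress far below the plugin's outbound 20MB target while its decompressed size exceeds Elasticsearch's 100MB `http.max_content_length` default:

    require 'zlib'

    # Hypothetical worst case: ~120 million bytes of near-identical NDJSON lines.
    line = %q({"index":{}}) + "\n" + %q({"message":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}) + "\n"
    payload = line * (120_000_000 / line.bytesize)

    compressed = Zlib.gzip(payload)

    puts "decompressed: #{payload.bytesize} bytes" # ~120 million bytes, beyond Elasticsearch's 100MB (104,857,600-byte) default
    puts "compressed:   #{compressed.bytesize} bytes" # far below the plugin's 20MB batching threshold

Before this commit the batching check looked only at the compressed bytes buffered so far, so a payload like this would be shipped as one bulk request and rejected; sizing the batch by the decompressed byte count avoids that.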

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 23 additions & 10 deletions

@@ -109,8 +109,8 @@ def bulk(actions)
       body_stream = StringIO.new
       if http_compression
         body_stream.set_encoding "BINARY"
-        stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
+        stream_writer = gzip_writer(body_stream)
       else
         stream_writer = body_stream
       end
       bulk_responses = []
@@ -120,21 +120,39 @@ def bulk(actions)
           action.map {|line| LogStash::Json.dump(line)}.join("\n") :
           LogStash::Json.dump(action)
         as_json << "\n"
-        if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES && body_stream.size > 0
-          logger.debug("Sending partial bulk request for batch with one or more actions remaining.", :action_count => batch_actions.size, :content_length => body_stream.size, :batch_offset => (index + 1 - batch_actions.size))
+        if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
+          stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
+          logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
+                       :action_count => batch_actions.size,
+                       :payload_size => stream_writer.pos,
+                       :content_length => body_stream.size,
+                       :batch_offset => (index + 1 - batch_actions.size))
           bulk_responses << bulk_send(body_stream, batch_actions)
+          body_stream.truncate(0) && body_stream.seek(0)
+          stream_writer = gzip_writer(body_stream) if http_compression
           batch_actions.clear
         end
         stream_writer.write(as_json)
         batch_actions << action
       end
       stream_writer.close if http_compression
-      logger.debug("Sending final bulk request for batch.", :action_count => batch_actions.size, :content_length => body_stream.size, :batch_offset => (actions.size - batch_actions.size))
+      logger.debug("Sending final bulk request for batch.",
+                   :action_count => batch_actions.size,
+                   :payload_size => stream_writer.pos,
+                   :content_length => body_stream.size,
+                   :batch_offset => (actions.size - batch_actions.size))
       bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
       body_stream.close if !http_compression
       join_bulk_responses(bulk_responses)
     end

+    def gzip_writer(io)
+      fail(ArgumentError, "Cannot create gzip writer on IO with unread bytes") unless io.eof?
+      fail(ArgumentError, "Cannot create gzip writer on non-empty IO") unless io.pos == 0
+
+      Zlib::GzipWriter.new(io, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
+    end
+
     def join_bulk_responses(bulk_responses)
       {
         "errors" => bulk_responses.any? {|r| r["errors"] == true},
@@ -160,11 +178,6 @@ def bulk_send(body_stream, batch_actions)
           response.code, url, body_stream.to_s, response.body
         )
       end
-    ensure
-      if !body_stream.closed?
-        body_stream.truncate(0)
-        body_stream.seek(0)
-      end
     end

     def emulate_batch_error_response(actions, http_code, reason)
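The reworked `bulk` method above keys its batching decision on two different sizes: `Zlib::GzipWriter#pos`, which counts the uncompressed bytes written into the gzip stream (logged as :payload_size and compared against TARGET_BULK_BYTES), and the size of the underlying `StringIO`, which holds the compressed bytes that actually go over the wire (logged as :content_length). A minimal sketch outside the plugin, with made-up payload contents, showing the two values diverge:

    require 'zlib'
    require 'stringio'

    body_stream = StringIO.new
    body_stream.set_encoding "BINARY"
    stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)

    # Write a very compressible "bulk payload".
    stream_writer.write((%q({"index":{}}) + "\n" + %q({"message":"hello hello hello"}) + "\n") * 10_000)
    stream_writer.flush # sync buffered data into body_stream before reading sizes, as the commit does

    puts "payload_size (uncompressed): #{stream_writer.pos}"
    puts "content_length (compressed): #{body_stream.size}"

    stream_writer.close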

spec/unit/outputs/elasticsearch/http_client_spec.rb

Lines changed: 51 additions & 32 deletions

@@ -204,50 +204,69 @@
   end

   describe "#bulk" do
-    subject { described_class.new(base_options) }
+    subject(:http_client) { described_class.new(base_options) }

     require "json"
     let(:message) { "hey" }
     let(:actions) { [
       ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}],
     ]}

-    context "if a message is over TARGET_BULK_BYTES" do
-      let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-      let(:message) { "a" * (target_bulk_bytes + 1) }
+    [true,false].each do |http_compression_enabled|
+      context "with `http_compression => #{http_compression_enabled}`" do

-      it "should be handled properly" do
-        allow(subject).to receive(:join_bulk_responses)
-        expect(subject).to receive(:bulk_send).once do |data|
-          expect(data.size).to be > target_bulk_bytes
+        let(:base_options) { super().merge(:client_settings => {:http_compression => http_compression_enabled}) }
+
+        before(:each) do
+          if http_compression_enabled
+            expect(http_client).to receive(:gzip_writer).at_least(:once).and_call_original
+          else
+            expect(http_client).to_not receive(:gzip_writer)
+          end
         end
-        s = subject.send(:bulk, actions)
-      end
-    end

-    context "with two messages" do
-      let(:message1) { "hey" }
-      let(:message2) { "you" }
-      let(:actions) { [
-        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
-        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
-      ]}
-      it "executes one bulk_send operation" do
-        allow(subject).to receive(:join_bulk_responses)
-        expect(subject).to receive(:bulk_send).once
-        s = subject.send(:bulk, actions)
-      end
+        context "if a message is over TARGET_BULK_BYTES" do
+          let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
+          let(:message) { "a" * (target_bulk_bytes + 1) }
+
+          it "should be handled properly" do
+            allow(subject).to receive(:join_bulk_responses)
+            expect(subject).to receive(:bulk_send).once do |data|
+              if !http_compression_enabled
+                expect(data.size).to be > target_bulk_bytes
+              else
+                expect(Zlib::gunzip(data.string).size).to be > target_bulk_bytes
+              end
+            end
+            s = subject.send(:bulk, actions)
+          end
+        end
+
+        context "with two messages" do
+          let(:message1) { "hey" }
+          let(:message2) { "you" }
+          let(:actions) { [
+            ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
+            ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
+          ]}
+          it "executes one bulk_send operation" do
+            allow(subject).to receive(:join_bulk_responses)
+            expect(subject).to receive(:bulk_send).once
+            s = subject.send(:bulk, actions)
+          end

-      context "if one exceeds TARGET_BULK_BYTES" do
-        let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-        let(:message1) { "a" * (target_bulk_bytes + 1) }
-        it "executes two bulk_send operations" do
-          allow(subject).to receive(:join_bulk_responses)
-          expect(subject).to receive(:bulk_send).twice
-          s = subject.send(:bulk, actions)
+          context "if one exceeds TARGET_BULK_BYTES" do
+            let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
+            let(:message1) { "a" * (target_bulk_bytes + 1) }
+            it "executes two bulk_send operations" do
+              allow(subject).to receive(:join_bulk_responses)
+              expect(subject).to receive(:bulk_send).twice
+              s = subject.send(:bulk, actions)
+            end
+          end
         end
-      end
-    end
+      end
+    end
   end

   describe "sniffing" do

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 1 addition & 1 deletion

@@ -328,7 +328,7 @@
     end

     before(:each) do
-      allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO)) do |stream|
+      allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions|
         expect( stream.string ).to include '"foo":"bar1"'
         expect( stream.string ).to include '"foo":"bar2"'
       end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely
