Skip to content

Commit f7b75da

Browse files
committed
Fix close behavior to actually shutdown workers
Fixes #90
1 parent c594f18 commit f7b75da

File tree

4 files changed

+40
-17
lines changed

4 files changed

+40
-17
lines changed

CHANGELOG.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
## 3.1.1
2-
- Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99
3-
4-
## 3.1.0
5-
- breaking,config: Remove deprecated config `endpoint_region`. Please use `region` instead.
6-
1+
## 3.1.2
2+
- Fix improper shutdown of output worker threads
3+
- improve exception handling
74
## 3.0.1
85
- Republish all the gems under jruby.
96

lib/logstash/outputs/s3.rb

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
135135

136136
# Exposed attributes for testing purpose.
137137
attr_accessor :tempfile
138-
attr_reader :page_counter
138+
attr_reader :page_counter, :upload_workers
139139
attr_reader :s3
140140

141141
def aws_s3_config
@@ -421,15 +421,11 @@ def configure_upload_workers
421421
Stud::Task.new do
422422
LogStash::Util::set_thread_name("<S3 upload worker #{worker_id}")
423423

424-
while true do
424+
continue = true
425+
while continue do
425426
@logger.debug("S3: upload worker is waiting for a new file to upload.", :worker_id => worker_id)
426427

427-
begin
428-
upload_worker
429-
rescue Exception => ex
430-
@logger.error('upload_worker unhandled exception', :ex => ex, :backtrace => ex.backtrace)
431-
raise LogStash::Error, 'S3: uploader thread exited unexpectedly'
432-
end
428+
continue = upload_worker
433429
end
434430
end
435431
end
@@ -441,19 +437,22 @@ def upload_worker
441437
begin
442438
file = @upload_queue.deq
443439

444-
if file.is_a? LogStash::ShutdownEvent
440+
if file == LogStash::SHUTDOWN
445441
@logger.debug("S3: upload worker is shutting down gracefuly")
446442
@upload_queue.enq(LogStash::SHUTDOWN)
443+
false
447444
else
448445
@logger.debug("S3: upload working is uploading a new file", :filename => File.basename(file))
449446
move_file_to_bucket(file)
447+
true
450448
end
451449
rescue Exception => ex
452450
@logger.error("failed to upload, will re-enqueue #{file} for upload",
453451
:ex => ex, :backtrace => ex.backtrace)
454-
unless file.nil?
452+
unless file.nil? # Rare case if the first line of the begin doesn't execute
455453
@upload_queue.enq(file)
456454
end
455+
true
457456
end
458457
end
459458

logstash-output-s3.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-output-s3'
4-
s.version = '3.1.1'
4+
s.version = '3.1.2'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "This plugin was created for store the logstash's events into Amazon Simple Storage Service (Amazon S3)"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/outputs/s3_spec.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,33 @@
302302
end
303303
end
304304

305+
describe "closing" do
306+
let(:options) do
307+
{
308+
"access_key_id" => 1234,
309+
"secret_access_key" => "secret",
310+
"bucket" => "mahbucket"
311+
}
312+
end
313+
subject do
314+
::LogStash::Outputs::S3.new(options)
315+
end
316+
317+
before do
318+
subject.register
319+
end
320+
321+
it "should be clean" do
322+
subject.do_close
323+
end
324+
325+
it "should remove all worker threads" do
326+
subject.do_close
327+
sleep 1
328+
expect(subject.upload_workers.map(&:thread).any?(&:alive?)).to be false
329+
end
330+
end
331+
305332
it "doesn't skip events if using the time_file option", :tag => :slow do
306333
Stud::Temporary.directory do |temporary_directory|
307334
time_file = rand(1..2)

0 commit comments

Comments
 (0)