diff --git a/CHANGELOG.md b/CHANGELOG.md index f079199f..a275c031 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,6 @@ -## 3.1.1 - - Relax constraint on logstash-core-plugin-api to >= 1.60 <= 2.99 - -## 3.1.0 - - breaking,config: Remove deprecated config `endpoint_region`. Please use `region` instead. - +## 3.1.2 + - Fix improper shutdown of output worker threads + - improve exception handling ## 3.0.1 - Republish all the gems under jruby. diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 21eaeafe..85910ccc 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -135,7 +135,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base # Exposed attributes for testing purpose. attr_accessor :tempfile - attr_reader :page_counter + attr_reader :page_counter, :upload_workers attr_reader :s3 def aws_s3_config @@ -370,7 +370,7 @@ def close private def shutdown_upload_workers @logger.debug("S3: Gracefully shutdown the upload workers") - @upload_queue << LogStash::ShutdownEvent + @upload_queue << LogStash::SHUTDOWN end private @@ -421,10 +421,11 @@ def configure_upload_workers Stud::Task.new do LogStash::Util::set_thread_name(" worker_id) - upload_worker + continue = upload_worker end end end @@ -432,15 +433,26 @@ def configure_upload_workers private def upload_worker - file = @upload_queue.deq + file = nil + begin + file = @upload_queue.deq - case file - when LogStash::ShutdownEvent + if file == LogStash::SHUTDOWN @logger.debug("S3: upload worker is shutting down gracefuly") - @upload_queue.enq(LogStash::ShutdownEvent) + @upload_queue.enq(LogStash::SHUTDOWN) + false else @logger.debug("S3: upload working is uploading a new file", :filename => File.basename(file)) move_file_to_bucket(file) + true + end + rescue Exception => ex + @logger.error("failed to upload, will re-enqueue #{file} for upload", + :ex => ex, :backtrace => ex.backtrace) + unless file.nil? # Rare case if the first line of the begin doesn't execute + @upload_queue.enq(file) + end + true end end diff --git a/logstash-output-s3.gemspec b/logstash-output-s3.gemspec index df635ee3..e0370979 100644 --- a/logstash-output-s3.gemspec +++ b/logstash-output-s3.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-s3' - s.version = '3.1.1' + s.version = '3.1.2' s.licenses = ['Apache License (2.0)'] s.summary = "This plugin was created for store the logstash's events into Amazon Simple Storage Service (Amazon S3)" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/outputs/s3_spec.rb b/spec/outputs/s3_spec.rb index 0b5d0e74..77803174 100644 --- a/spec/outputs/s3_spec.rb +++ b/spec/outputs/s3_spec.rb @@ -302,6 +302,33 @@ end end + describe "closing" do + let(:options) do + { + "access_key_id" => 1234, + "secret_access_key" => "secret", + "bucket" => "mahbucket" + } + end + subject do + ::LogStash::Outputs::S3.new(options) + end + + before do + subject.register + end + + it "should be clean" do + subject.do_close + end + + it "should remove all worker threads" do + subject.do_close + sleep 1 + expect(subject.upload_workers.map(&:thread).any?(&:alive?)).to be false + end + end + it "doesn't skip events if using the time_file option", :tag => :slow do Stud::Temporary.directory do |temporary_directory| time_file = rand(1..2)