diff --git a/CHANGELOG.md b/CHANGELOG.md index b80d9bce..9359c9ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.1.7 + - Fixed issue where on restart, 0 byte files could erroneously be uploaded to s3 [#195](https://github.com/logstash-plugins/logstash-output-s3/issues/195) + ## 4.1.6 - Fixed leak of file handles that prevented temporary files from being cleaned up before pipeline restart [#190](https://github.com/logstash-plugins/logstash-output-s3/issues/190) diff --git a/Gemfile b/Gemfile index 32cc6fbb..f4610bdb 100644 --- a/Gemfile +++ b/Gemfile @@ -9,3 +9,7 @@ if Dir.exist?(logstash_path) && use_logstash_source gem 'logstash-core', :path => "#{logstash_path}/logstash-core" gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api" end + +if RUBY_VERSION == "1.9.3" + gem 'rake', '12.2.1' +end diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 6de2939f..1a9a2726 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -385,8 +385,12 @@ def restore_from_crash .select { |file| ::File.file?(file) } .each do |file| temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path) - @logger.debug("Recovering from crash and uploading", :file => temp_file.path) - @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file), :upload_options => upload_options) + if temp_file.size > 0 + @logger.debug("Recovering from crash and uploading", :file => temp_file.path) + @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file), :upload_options => upload_options) + else + clean_temporary_file(temp_file) + end end end end diff --git a/logstash-output-s3.gemspec b/logstash-output-s3.gemspec index 788fa23b..649097a1 100644 --- a/logstash-output-s3.gemspec +++ b/logstash-output-s3.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-s3' - s.version = '4.1.6' + s.version = '4.1.7' s.licenses = ['Apache-2.0'] s.summary = "Sends Logstash events to the Amazon Simple Storage Service" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/integration/restore_from_crash_spec.rb b/spec/integration/restore_from_crash_spec.rb index bc965cf4..f65bb228 100644 --- a/spec/integration/restore_from_crash_spec.rb +++ b/spec/integration/restore_from_crash_spec.rb @@ -11,29 +11,56 @@ let(:number_of_files) { 5 } let(:dummy_content) { "foobar\n" * 100 } + let(:factory) { LogStash::Outputs::S3::TemporaryFileFactory.new(prefix, tags, "none", temporary_directory)} before do clean_remote_files(prefix) - # Use the S3 factory to create mutliples files with dummy content - factory = LogStash::Outputs::S3::TemporaryFileFactory.new(prefix, tags, "none", temporary_directory) + end - # Creating a factory always create a file - factory.current.write(dummy_content) - factory.current.fsync - (number_of_files - 1).times do - factory.rotate! + context 'with a non-empty tempfile' do + before do + # Creating a factory always create a file factory.current.write(dummy_content) factory.current.fsync + + (number_of_files - 1).times do + factory.rotate! + factory.current.write(dummy_content) + factory.current.fsync + end + end + it "uploads the file to the bucket" do + subject.register + try(20) do + expect(bucket_resource.objects(:prefix => prefix).count).to eq(number_of_files) + expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(0) + expect(bucket_resource.objects(:prefix => prefix).first.acl.grants.collect(&:permission)).to include("READ", "WRITE") + end end end - it "uploads the file to the bucket" do - subject.register - try(20) do - expect(bucket_resource.objects(:prefix => prefix).count).to eq(number_of_files) - expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(0) - expect(bucket_resource.objects(:prefix => prefix).first.acl.grants.collect(&:permission)).to include("READ", "WRITE") + context 'with an empty tempfile' do + before do + factory.current + factory.rotate! + end + + it "should remove the temporary file" do + expect(Dir.glob(::File.join(temporary_directory, "**", "*")).size).to be > 0 + subject.register + puts Dir.glob(::File.join(temporary_directory, "**", "*")) + expect(Dir.glob(::File.join(temporary_directory, "**", "*")).size).to eq(0) + end + + it "should not upload the file to the bucket" do + expect(bucket_resource.objects(:prefix => prefix).count).to eq(0) + expect(Dir.glob(::File.join(temporary_directory, "**", "*")).size).to be > 0 + subject.register + + # Sleep to give enough time for plugin upload to s3 if it attempts to upload empty temporary file to S3 + sleep 5 + expect(bucket_resource.objects(:prefix => prefix).count).to eq(0) end end end