diff --git a/README b/README new file mode 100644 index 00000000..c4e741f2 --- /dev/null +++ b/README @@ -0,0 +1,15 @@ +[Missing the other part of the readme] + +## Running the tests + +``` +bundle install +bundle exec rspec +``` + +If you want to run the integration tests against a real bucket you need to pass +your AWS credentials to the test runner or declare them in your environment. + +``` +AWS_REGION=us-east-1 AWS_ACCESS_KEY_ID=123 AWS_SECRET_ACCESS_KEY=secret AWS_LOGSTASH_TEST_BUCKET=mytest bundle exec rspec spec/integration/s3_spec.rb --tag integration +``` diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index 9e84831a..1332b7f6 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -1,11 +1,14 @@ # encoding: utf-8 require "logstash/outputs/base" require "logstash/namespace" +require "logstash/plugin_mixins/aws_config" +require "stud/temporary" require "socket" # for Socket.gethostname +require "thread" +require "tmpdir" +require "fileutils" + -# TODO integrate aws_config in the future -#require "logstash/plugin_mixins/aws_config" -# # INFORMATION: # # This plugin was created for store the logstash's events into Amazon Simple Storage Service (Amazon S3). @@ -34,7 +37,6 @@ # ## If you specify size_file and time_file then it will create file for each tag (if specified), when time_file or ## their size > size_file, it will be triggered then they will be pushed on s3's bucket and will delete from local disk. -# ## If you don't specify size_file, but time_file then it will create only one file for each tag (if specified). ## When time_file it will be triggered then the files will be pushed on s3's bucket and delete from local disk. # @@ -44,22 +46,8 @@ ## If you don't specific size_file and time_file you have a curios mode. It will create only one file for each tag (if specified). ## Then the file will be rest on temporary directory and don't will be pushed on bucket until we will restart logstash. # -# INFORMATION ABOUT CLASS: -# -# I tried to comment the class at best i could do. -# I think there are much thing to improve, but if you want some points to develop here a list: -# -# TODO Integrate aws_config in the future -# TODO Find a method to push them all files when logtstash close the session. -# TODO Integrate @field on the path file -# TODO Permanent connection or on demand? For now on demand, but isn't a good implementation. -# Use a while or a thread to try the connection before break a time_out and signal an error. -# TODO If you have bugs report or helpful advice contact me, but remember that this code is much mine as much as yours, -# try to work on it if you want :) -# -# -# USAGE: # +# #### Usage: # This is an example of logstash config: # [source,ruby] # output { @@ -73,285 +61,359 @@ # format => "plain" (optional) # canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" ) # } -# } -# -# We analize this: -# -# access_key_id => "crazy_key" -# Amazon will give you the key for use their service if you buy it or try it. (not very much open source anyway) -# -# secret_access_key => "monkey_access_key" -# Amazon will give you the secret_access_key for use their service if you buy it or try it . (not very much open source anyway). -# -# endpoint_region => "eu-west-1" -# When you make a contract with Amazon, you should know where the services you use.
-# -# bucket => "boss_please_open_your_bucket" -# Be careful you have the permission to write on bucket and know the name. -# -# size_file => 2048 -# Means the size, in KB, of files who can store on temporary directory before you will be pushed on bucket. -# Is useful if you have a little server with poor space on disk and you don't want blow up the server with unnecessary temporary log files. -# -# time_file => 5 -# Means, in minutes, the time before the files will be pushed on bucket. Is useful if you want to push the files every specific time. -# -# format => "plain" -# Means the format of events you want to store in the files -# -# canned_acl => "private" -# The S3 canned ACL to use when putting the file. Defaults to "private". -# -# LET'S ROCK AND ROLL ON THE CODE! # class LogStash::Outputs::S3 < LogStash::Outputs::Base - #TODO integrate aws_config in the future - # include LogStash::PluginMixins::AwsConfig - - config_name "s3" - milestone 1 + include LogStash::PluginMixins::AwsConfig + + TEMPFILE_EXTENSION = "txt" + S3_INVALID_CHARACTERS = /[\^`><]/ + + config_name "s3" + milestone 1 + default :codec, 'line' + + # S3 bucket + config :bucket, :validate => :string + + # AWS endpoint_region + config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2", + "eu-west-1", "ap-southeast-1", "ap-southeast-2", + "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :deprecated => 'Deprecated, use region instead.' + + # Set the size of file in bytes, this means that files on bucket when have dimension > file_size, they are stored in two or more file. + # If you have tags then it will generate a specific size file for every tags + ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket. + config :size_file, :validate => :number, :default => 0 + + # Set the time, in minutes, to close the current sub_time_section of bucket. + # If you define file_size you have a number of files in consideration of the section and the current tag. + # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, + # for now the only thing this plugin can do is to put the file when logstash restart. + config :time_file, :validate => :number, :default => 0 + + ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". + ## This is hack for not destroy the new files after restoring the initial files. + ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket, + ## for example if you have single Instance. + config :restore, :validate => :boolean, :default => false + + # The S3 canned ACL to use when putting the file. Defaults to "private". + config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"], + :default => "private" + + # Set the directory where logstash will store the tmp files before sending it to S3 + # default to the current OS temporary directory in linux /tmp/logstash + config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash") + + # Specify a prefix to the uploaded filename, this can simulate directories on S3 + config :prefix, :validate => :string, :default => '' + + # Specify how many workers to use to upload the files to S3 + config :upload_workers_count, :validate => :number, :default => 1 + + # Exposed attributes for testing purpose. 
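+ # (@tempfile is writable so the specs can swap in their own file handle; page_counter and s3 are read-only.)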
+ attr_accessor :tempfile + attr_reader :page_counter + attr_reader :s3 + + def aws_s3_config + @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @region) + @s3 = AWS::S3.new(aws_options_hash) + end - # Aws access_key. - config :access_key_id, :validate => :string + def aws_service_endpoint(region) + # Make the deprecated endpoint_region work + # TODO: (ph) Remove this after deprecation. + + if @endpoint_region + region_to_use = @endpoint_region + else + region_to_use = @region + end - # Aws secret_access_key - config :secret_access_key, :validate => :string + return { + :s3_endpoint => region_to_use == 'us-east-1' ? 's3.amazonaws.com' : "s3-#{region_to_use}.amazonaws.com" + } + end - # S3 bucket - config :bucket, :validate => :string + public + def write_on_bucket(file) + # find and use the bucket + bucket = @s3.buckets[@bucket] - # Aws endpoint_region - config :endpoint_region, :validate => ["us-east-1", "us-west-1", "us-west-2", - "eu-west-1", "ap-southeast-1", "ap-southeast-2", - "ap-northeast-1", "sa-east-1", "us-gov-west-1"], :default => "us-east-1" + remote_filename = "#{@prefix}#{File.basename(file)}" - # Set the size of file in KB, this means that files on bucket when have dimension > file_size, they are stored in two or more file. - # If you have tags then it will generate a specific size file for every tags - ##NOTE: define size of file is the better thing, because generate a local temporary file on disk and then put it in bucket. - config :size_file, :validate => :number, :default => 0 + @logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket) - # Set the time, in minutes, to close the current sub_time_section of bucket. - # If you define file_size you have a number of files in consideration of the section and the current tag. - # 0 stay all time on listerner, beware if you specific 0 and size_file 0, because you will not put the file on bucket, - # for now the only thing this plugin can do is to put the file when logstash restart. - config :time_file, :validate => :number, :default => 0 + begin + # prepare for write the file + object = bucket.objects[remote_filename] + object.write(:file => file, :acl => @canned_acl) + rescue AWS::Errors::Base => error + @logger.error("S3: AWS error", :error => error) + raise LogStash::Error, "AWS Configuration Error, #{error}" + end - # The event format you want to store in files. Defaults to plain text. - config :format, :validate => [ "json", "plain", "nil" ], :default => "plain" + @logger.debug("S3: has written remote file in bucket with canned ACL", :remote_filename => remote_filename, :bucket => @bucket, :canned_acl => @canned_acl) + end - ## IMPORTANT: if you use multiple instance of s3, you should specify on one of them the "restore=> true" and on the others "restore => false". - ## This is hack for not destroy the new files after restoring the initial files. - ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket, - ## for example if you have single Instance. - config :restore, :validate => :boolean, :default => false + # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. 
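+ # The replacement file is named by get_temporary_filename(@page_counter); the previous handle is closed and the new one swapped in under @file_rotation_lock.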
+ public + def create_temporary_file + filename = File.join(@temporary_directory, get_temporary_filename(@page_counter)) - # Aws canned ACL - config :canned_acl, :validate => ["private", "public_read", "public_read_write", "authenticated_read"], - :default => "private" + @logger.debug("S3: Creating a new temporary file", :filename => filename) - # Method to set up the aws configuration and establish connection - def aws_s3_config + @file_rotation_lock.synchronize do + unless @tempfile.nil? + @tempfile.close + end - @endpoint_region == 'us-east-1' ? @endpoint_region = 's3.amazonaws.com' : @endpoint_region = 's3-'+@endpoint_region+'.amazonaws.com' + @tempfile = File.open(filename, "a") + end + end - @logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region) + public + def register + require "aws-sdk" + # required if using ruby version < 2.0 + # http://ruby.awsblog.com/post/Tx16QY1CI5GVBFT/Threading-with-the-AWS-SDK-for-Ruby + AWS.eager_autoload!(AWS::S3) - AWS.config( - :access_key_id => @access_key_id, - :secret_access_key => @secret_access_key, - :s3_endpoint => @endpoint_region - ) - @s3 = AWS::S3.new + workers_not_supported - end + @s3 = aws_s3_config + @upload_queue = Queue.new + @file_rotation_lock = Mutex.new - # This method is used to manage sleep and awaken thread. - def time_alert(interval) + if @prefix && @prefix =~ S3_INVALID_CHARACTERS + @logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS) + raise LogStash::ConfigurationError, "S3: prefix contains invalid characters" + end - Thread.new do - loop do - start_time = Time.now - yield - elapsed = Time.now - start_time - sleep([interval - elapsed, 0].max) + if !Dir.exist?(@temporary_directory) + FileUtils.mkdir_p(@temporary_directory) end - end - end + test_s3_write - # this method is used for write files on bucket. It accept the file and the name of file. - def write_on_bucket (file_data, file_basename) + restore_from_crashes if @restore == true + reset_page_counter + create_temporary_file + configure_periodic_rotation if time_file != 0 + configure_upload_workers - # if you lose connection with s3, bad control implementation. - if ( @s3 == nil) - aws_s3_config + @codec.on_event do |event, encoded_event| + handle_event(encoded_event) + end end - # find and use the bucket - bucket = @s3.buckets[@bucket] - - @logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!" 
- # prepare for write the file - object = bucket.objects[file_basename] - object.write(:file => file_data, :acl => @canned_acl) + # Use the same method that Amazon use to check + # permission on the user bucket by creating a small file + public + def test_s3_write + @logger.debug("S3: Creating a test file on S3") - @logger.debug "S3: has written "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\"" + test_filename = File.join(@temporary_directory, + "logstash-programmatic-access-test-object-#{Time.now.to_i}") - end - - # this method is used for create new path for name the file - def getFinalPath + File.open(test_filename, 'a') do |file| + file.write('test') + end - @pass_time = Time.now - return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M") + begin + write_on_bucket(test_filename) + delete_on_bucket(test_filename) + ensure + File.delete(test_filename) + end + end + + public + def restore_from_crashes + @logger.debug("S3: is attempting to verify previous crashes...") + + Dir[File.join(@temporary_directory, "*.#{TEMPFILE_EXTENSION}")].each do |file| + name_file = File.basename(file) + @logger.warn("S3: have found temporary file the upload process crashed, uploading file to S3.", :filename => name_file) + move_file_to_bucket_async(file) + end + end - end + public + def move_file_to_bucket(file) + if !File.zero?(file) + write_on_bucket(file) + @logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket) + end - # This method is used for restore the previous crash of logstash or to prepare the files to send in bucket. - # Take two parameter: flag and name. Flag indicate if you want to restore or not, name is the name of file - def upFile(flag, name) + begin + File.delete(file) + rescue Errno::ENOENT + # Something else deleted the file, logging but not raising the issue + @logger.warn("S3: Cannot delete the temporary file since it doesn't exist on disk", :filename => File.basename(file)) + rescue Errno::EACCES + @logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename, :temporary_directory => @temporary_directory) + end + end - Dir[@temp_directory+name].each do |file| - name_file = File.basename(file) + public + def periodic_interval + @time_file * 60 + end - if (flag == true) - @logger.warn "S3: have found temporary file: "+name_file+", something has crashed before... Prepare for upload in bucket!" - end + public + def get_temporary_filename(page_counter = 0) + current_time = Time.now + filename = "ls.s3.#{Socket.gethostname}.#{current_time.strftime("%Y-%m-%dT%H.%M")}" - if (!File.zero?(file)) - write_on_bucket(file, name_file) + if @tags.size > 0 + return "#{filename}.tag_#{@tags.join('.')}.part#{page_counter}.#{TEMPFILE_EXTENSION}" + else + return "#{filename}.part#{page_counter}.#{TEMPFILE_EXTENSION}" + end + end - if (flag == true) - @logger.debug "S3: file: "+name_file+" restored on bucket "+@bucket - else - @logger.debug "S3: file: "+name_file+" was put on bucket "+@bucket - end - end + public + def receive(event) + return unless output?(event) + @codec.encode(event) + end - File.delete (file) + public + def rotate_events_log? + @tempfile.size > @size_file + end - end - end + public + def write_events_to_multiple_files? + @size_file > 0 + end - # This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file. 
- def newFile (flag) + public + def write_to_tempfile(event) + begin + @logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile)) + + @file_rotation_lock.synchronize do + @tempfile.syswrite(event) + end + rescue Errno::ENOSPC + @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory) + teardown + end + end - if (flag == true) - @current_final_path = getFinalPath - @sizeCounter = 0 - end + public + def teardown + shutdown_upload_workers + @periodic_rotation_thread.stop! if @periodic_rotation_thread - if (@tags.size != 0) - @tempFile = File.new(@current_final_path+".tag_"+@tag_path+"part"+@sizeCounter.to_s+".txt", "w") - else - @tempFile = File.new(@current_final_path+".part"+@sizeCounter.to_s+".txt", "w") - end + @tempfile.close + finished + end - end + private + def shutdown_upload_workers + @logger.debug("S3: Gracefully shutdown the upload workers") + @upload_queue << LogStash::ShutdownEvent + end - public - def register - require "aws-sdk" - @temp_directory = "/opt/logstash/S3_temp/" + private + def handle_event(encoded_event) + if write_events_to_multiple_files? + if rotate_events_log? + @logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile)) + + move_file_to_bucket_async(@tempfile.path) + next_page + create_temporary_file + else + @logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file) + end + end + + write_to_tempfile(encoded_event) + end - if (@tags.size != 0) - @tag_path = "" - for i in (0..@tags.size-1) - @tag_path += @tags[i].to_s+"." - end - end + private + def configure_periodic_rotation + @periodic_rotation_thread = Stud::Task.new do + LogStash::Util::set_thread_name("<S3 periodic uploader") + + Stud.interval(periodic_interval, :sleep_then_run => true) do + @logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path) - if (@restore == true ) - @logger.debug "S3: is attempting to verify previous crashes..." + move_file_to_bucket_async(@tempfile.path) + next_page + create_temporary_file + end + end + end - upFile(true, "*.txt") - end + private + def configure_upload_workers + @logger.debug("S3: Configure upload workers") - newFile(true) + @upload_workers = @upload_workers_count.times.map do |worker_id| + Stud::Task.new do + LogStash::Util::set_thread_name("<S3 upload worker #{worker_id}") + + while true do + @logger.debug("S3: upload worker is waiting for a new file to upload.", :worker_id => worker_id) - end - - public - def receive(event) - return unless output?(event) - - # Prepare format of Events - if (@format == "plain") - message = self.class.format_message(event) - elsif (@format == "json") - message = event.to_json - else - message = event.to_s + upload_worker + end + end + end end - if(time_file !=0) - @logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s + private + def upload_worker + file = @upload_queue.deq + + case file + when LogStash::ShutdownEvent + @logger.debug("S3: upload worker is shutting down gracefully") + @upload_queue.enq(LogStash::ShutdownEvent) + else + @logger.debug("S3: upload worker is uploading a new file", :filename => File.basename(file)) + move_file_to_bucket(file) + end end - # if specific the size - if(size_file !=0) - - if (@tempFile.size < @size_file ) - - @logger.debug "S3: File have size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s - @logger.debug "S3: put event into: "+File.basename(@tempFile) - - # Put the event in the file, now!
- File.open(@tempFile, 'a') do |file| - file.puts message - file.write "\n" - end + private + def next_page + @page_counter += 1 + end - else + private + def reset_page_counter + @page_counter = 0 + end - @logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's bucket it and create new file" - upFile(false, File.basename(@tempFile)) - @sizeCounter += 1 - newFile(false) + private + def delete_on_bucket(filename) + bucket = @s3.buckets[@bucket] - end + remote_filename = "#{@prefix}#{File.basename(filename)}" - # else we put all in one file - else + @logger.debug("S3: delete file from bucket", :remote_filename => remote_filename, :bucket => @bucket) - @logger.debug "S3: put event into "+File.basename(@tempFile) - File.open(@tempFile, 'a') do |file| - file.puts message - file.write "\n" + begin + # prepare for write the file + object = bucket.objects[remote_filename] + object.delete + rescue AWS::Errors::Base => e + @logger.error("S3: AWS error", :error => e) + raise LogStash::ConfigurationError, "AWS Configuration Error" end end - end - - def self.format_message(event) - message = "Date: #{event[LogStash::Event::TIMESTAMP]}\n" - message << "Source: #{event["source"]}\n" - message << "Tags: #{event["tags"].join(', ')}\n" - message << "Fields: #{event.to_hash.inspect}\n" - message << "Message: #{event["message"]}" - end - + private + def move_file_to_bucket_async(file) + @logger.debug("S3: Sending the file to the upload queue.", :filename => File.basename(file)) + @upload_queue.enq(file) + end end - -# Enjoy it, by Bistic:) diff --git a/logstash-output-s3.gemspec b/logstash-output-s3.gemspec index a9ea8c90..7ca2403c 100644 --- a/logstash-output-s3.gemspec +++ b/logstash-output-s3.gemspec @@ -23,6 +23,10 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'logstash', '>= 1.4.0', '< 2.0.0' s.add_runtime_dependency 'logstash-mixin-aws' s.add_runtime_dependency 'aws-sdk' + s.add_runtime_dependency 'stud', '~> 0.0.18' s.add_development_dependency 'logstash-devutils' + s.add_development_dependency 'logstash-input-generator' + s.add_development_dependency 'logstash-input-stdin' + s.add_development_dependency 'logstash-codec-line' end diff --git a/spec/integration/s3_spec.rb b/spec/integration/s3_spec.rb new file mode 100644 index 00000000..022c74a3 --- /dev/null +++ b/spec/integration/s3_spec.rb @@ -0,0 +1,96 @@ +require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/s3" +require 'socket' +require "aws-sdk" +require "fileutils" +require "stud/temporary" +require_relative "../supports/helpers" + +describe LogStash::Outputs::S3, :integration => true, :s3 => true do + before do + Thread.abort_on_exception = true + end + + let!(:minimal_settings) { { "access_key_id" => ENV['AWS_ACCESS_KEY_ID'], + "secret_access_key" => ENV['AWS_SECRET_ACCESS_KEY'], + "bucket" => ENV['AWS_LOGSTASH_TEST_BUCKET'], + "region" => ENV["AWS_REGION"] || "us-east-1", + "temporary_directory" => Stud::Temporary.pathname('temporary_directory') }} + + let!(:s3_object) do + s3output = LogStash::Outputs::S3.new(minimal_settings) + s3output.register + s3output.s3 + end + + after(:all) do + delete_matching_keys_on_bucket('studtmp') + delete_matching_keys_on_bucket('my-prefix') + end + + describe "#register" do + it "write a file on the bucket to check permissions" do + s3 = LogStash::Outputs::S3.new(minimal_settings) + expect(s3.register).not_to raise_error + end + end + + describe "#write_on_bucket" do + after(:all) do + File.unlink(fake_data.path) + end + + let!(:fake_data) { Stud::Temporary.file 
} + + it "should prefix the file on the bucket if a prefix is specified" do + prefix = "my-prefix" + + config = minimal_settings.merge({ + "prefix" => prefix, + }) + + s3 = LogStash::Outputs::S3.new(config) + s3.register + s3.write_on_bucket(fake_data) + + expect(key_exists_on_bucket?("#{prefix}#{File.basename(fake_data.path)}")).to eq(true) + end + + it 'should use the same local filename if no prefix is specified' do + s3 = LogStash::Outputs::S3.new(minimal_settings) + s3.register + s3.write_on_bucket(fake_data) + + expect(key_exists_on_bucket?(File.basename(fake_data.path))).to eq(true) + end + end + + describe "#move_file_to_bucket" do + let!(:s3) { LogStash::Outputs::S3.new(minimal_settings) } + + before do + s3.register + end + + it "should upload the file if the size > 0" do + tmp = Stud::Temporary.file + allow(File).to receive(:zero?).and_return(false) + s3.move_file_to_bucket(tmp) + expect(key_exists_on_bucket?(File.basename(tmp.path))).to eq(true) + end + end + + describe "#restore_from_crashes" do + it "read the temp directory and upload the matching file to s3" do + Stud::Temporary.pathname do |temp_path| + tempfile = File.open(File.join(temp_path, 'A'), 'w+') { |f| f.write('test')} + + s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => temp_path })) + s3.restore_from_crashes + + expect(File.exist?(tempfile.path)).to eq(false) + expect(key_exists_on_bucket?(File.basename(tempfile.path))).to eq(true) + end + end + end +end diff --git a/spec/outputs/s3_spec.rb b/spec/outputs/s3_spec.rb index 863444ec..1753b9ed 100644 --- a/spec/outputs/s3_spec.rb +++ b/spec/outputs/s3_spec.rb @@ -1,6 +1,324 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" -require 'logstash/outputs/s3' +require "logstash/outputs/s3" +require "logstash/codecs/line" +require "logstash/pipeline" +require "aws-sdk" +require "fileutils" +require_relative "../supports/helpers" describe LogStash::Outputs::S3 do + before do + # We stub all the calls from S3, for more information see: + # http://ruby.awsblog.com/post/Tx2SU6TYJWQQLC3/Stubbing-AWS-Responses + AWS.stub! 
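+ # Thread.abort_on_exception lets failures raised inside the rotation/upload threads surface in the specs instead of being silently swallowed.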
+ + Thread.abort_on_exception = true + end + + let(:minimal_settings) { { "access_key_id" => "1234", + "secret_access_key" => "secret", + "bucket" => "my-bucket" } } + + describe "configuration" do + let!(:config) { { "endpoint_region" => "sa-east-1" } } + + it "should support the deprecated endpoint_region as a configuration option" do + s3 = LogStash::Outputs::S3.new(config) + expect(s3.aws_options_hash[:s3_endpoint]).to eq("s3-sa-east-1.amazonaws.com") + end + + it "should fallback to region if endpoint_region isnt defined" do + s3 = LogStash::Outputs::S3.new(config.merge({ "region" => 'sa-east-1' })) + expect(s3.aws_options_hash).to include(:s3_endpoint => "s3-sa-east-1.amazonaws.com") + end + end + + describe "#register" do + it "should create the tmp directory if it doesn't exist" do + temporary_directory = Stud::Temporary.pathname("temporary_directory") + + config = { + "access_key_id" => "1234", + "secret_access_key" => "secret", + "bucket" => "logstash", + "size_file" => 10, + "temporary_directory" => temporary_directory + } + + s3 = LogStash::Outputs::S3.new(config) + allow(s3).to receive(:test_s3_write) + s3.register + + expect(Dir.exist?(temporary_directory)).to eq(true) + FileUtils.rm_r(temporary_directory) + end + + it "should raise a ConfigurationError if the prefix contains one or more '\^`><' characters" do + config = { + "prefix" => "`no\><^" + } + + s3 = LogStash::Outputs::S3.new(config) + + expect { + s3.register + }.to raise_error(LogStash::ConfigurationError) + end + end + + describe "#generate_temporary_filename" do + before do + Socket.stub(:gethostname) { "logstash.local" } + Time.stub(:now) { Time.new('2015-10-09-09:00') } + end + + it "should add tags to the filename if present" do + config = minimal_settings.merge({ "tags" => ["elasticsearch", "logstash", "kibana"], "temporary_directory" => "/tmp/logstash"}) + s3 = LogStash::Outputs::S3.new(config) + expect(s3.get_temporary_filename).to eq("ls.s3.logstash.local.2015-01-01T00.00.tag_elasticsearch.logstash.kibana.part0.txt") + end + + it "should not add the tags to the filename" do + config = minimal_settings.merge({ "tags" => [], "temporary_directory" => "/tmp/logstash" }) + s3 = LogStash::Outputs::S3.new(config) + expect(s3.get_temporary_filename(3)).to eq("ls.s3.logstash.local.2015-01-01T00.00.part3.txt") + end + + it "normalized the temp directory to include the trailing slash if missing" do + s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash" })) + expect(s3.get_temporary_filename).to eq("ls.s3.logstash.local.2015-01-01T00.00.part0.txt") + end + end + + describe "#write_on_bucket" do + after(:all) do + File.unlink(fake_data.path) + end + + let!(:fake_data) { Stud::Temporary.file } + + let(:fake_bucket) do + s3 = double('S3Object') + s3.stub(:write) + s3 + end + + it "should prefix the file on the bucket if a prefix is specified" do + prefix = "my-prefix" + + config = minimal_settings.merge({ + "prefix" => prefix, + "bucket" => "my-bucket" + }) + + expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with("#{prefix}#{File.basename(fake_data)}") { fake_bucket } + + s3 = LogStash::Outputs::S3.new(config) + allow(s3).to receive(:test_s3_write) + s3.register + s3.write_on_bucket(fake_data) + end + + it 'should use the same local filename if no prefix is specified' do + config = minimal_settings.merge({ + "bucket" => "my-bucket" + }) + + expect_any_instance_of(AWS::S3::ObjectCollection).to receive(:[]).with(File.basename(fake_data)) { fake_bucket } + + s3 = 
LogStash::Outputs::S3.new(minimal_settings) + allow(s3).to receive(:test_s3_write) + s3.register + s3.write_on_bucket(fake_data) + end + end + + describe "#write_events_to_multiple_files?" do + it 'returns true if the size_file is != 0 ' do + s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 200 })) + expect(s3.write_events_to_multiple_files?).to eq(true) + end + + it 'returns false if size_file is zero or not set' do + s3 = LogStash::Outputs::S3.new(minimal_settings) + expect(s3.write_events_to_multiple_files?).to eq(false) + end + end + + describe "#write_to_tempfile" do + it "should append the event to a file" do + Stud::Temporary.file("logstash", "a+") do |tmp| + s3 = LogStash::Outputs::S3.new(minimal_settings) + allow(s3).to receive(:test_s3_write) + s3.register + s3.tempfile = tmp + s3.write_to_tempfile("test-write") + tmp.rewind + expect(tmp.read).to eq("test-write") + end + end + end + + describe "#rotate_events_log" do + let(:s3) { LogStash::Outputs::S3.new(minimal_settings.merge({ "size_file" => 1024 })) } + + it "returns true if the tempfile is over the file_size limit" do + Stud::Temporary.file do |tmp| + tmp.stub(:size) { 2024001 } + + s3.tempfile = tmp + expect(s3.rotate_events_log?).to be(true) + end + end + + it "returns false if the tempfile is under the file_size limit" do + Stud::Temporary.file do |tmp| + tmp.stub(:size) { 100 } + + s3.tempfile = tmp + expect(s3.rotate_events_log?).to eq(false) + end + end + end + + describe "#move_file_to_bucket" do + let!(:s3) { LogStash::Outputs::S3.new(minimal_settings) } + + before do + # Assume the AWS test credentials pass. + allow(s3).to receive(:test_s3_write) + s3.register + end + + it "should always delete the source file" do + tmp = Stud::Temporary.file + + allow(File).to receive(:zero?).and_return(true) + expect(File).to receive(:delete).with(tmp) + + s3.move_file_to_bucket(tmp) + end + + it 'should not upload the file if the size of the file is zero' do + temp_file = Stud::Temporary.file + allow(temp_file).to receive(:zero?).and_return(true) + + expect(s3).not_to receive(:write_on_bucket) + s3.move_file_to_bucket(temp_file) + end + + it "should upload the file if the size > 0" do + tmp = Stud::Temporary.file + + allow(File).to receive(:zero?).and_return(false) + expect(s3).to receive(:write_on_bucket) + + s3.move_file_to_bucket(tmp) + end + end + + describe "#restore_from_crashes" do + it "read the temp directory and upload the matching file to s3" do + s3 = LogStash::Outputs::S3.new(minimal_settings.merge({ "temporary_directory" => "/tmp/logstash/" })) + + expect(Dir).to receive(:[]).with("/tmp/logstash/*.txt").and_return(["/tmp/logstash/01.txt"]) + expect(s3).to receive(:move_file_to_bucket_async).with("/tmp/logstash/01.txt") + + + s3.restore_from_crashes + end + end + + describe "#receive" do + it "should send the event through the codecs" do + data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}, "@timestamp" => "2014-05-30T02:52:17.929Z"} + event = LogStash::Event.new(data) + + expect_any_instance_of(LogStash::Codecs::Line).to receive(:encode).with(event) + + s3 = LogStash::Outputs::S3.new(minimal_settings) + allow(s3).to receive(:test_s3_write) + s3.register + + s3.receive(event) + end + end + + describe "when rotating the temporary file" do + before { allow(File).to receive(:delete) } + + it "doesn't skip events if using the size_file option" do + Stud::Temporary.directory do |temporary_directory| + size_file = rand(200..20000) + event_count = rand(300..15000) + + config = %Q[ + input { + 
generator { + count => #{event_count} + } + } + output { + s3 { + access_key_id => "1234" + secret_access_key => "secret" + size_file => #{size_file} + codec => line + temporary_directory => '#{temporary_directory}' + bucket => 'testing' + } + } + ] + + pipeline = LogStash::Pipeline.new(config) + + pipeline_thread = Thread.new { pipeline.run } + sleep 0.1 while !pipeline.ready? + pipeline_thread.join + + events_written_count = events_in_files(Dir[File.join(temporary_directory, 'ls.*.txt')]) + expect(events_written_count).to eq(event_count) + end + end + + it "doesn't skip events if using the time_file option", :tag => :slow do + Stud::Temporary.directory do |temporary_directory| + time_file = rand(5..10) + number_of_rotation = rand(4..10) + + config = { + "time_file" => time_file, + "codec" => "line", + "temporary_directory" => temporary_directory, + "bucket" => "testing" + } + + s3 = LogStash::Outputs::S3.new(minimal_settings.merge(config)) + # Make the test run in seconds intead of minutes.. + allow(s3).to receive(:periodic_interval).and_return(time_file) + s3.register + + # Force to have a few files rotation + stop_time = Time.now + (number_of_rotation * time_file) + event_count = 0 + + event = LogStash::Event.new("message" => "Hello World") + + until Time.now > stop_time do + s3.receive(event) + event_count += 1 + end + s3.teardown + + generated_files = Dir[File.join(temporary_directory, 'ls.*.txt')] + + events_written_count = events_in_files(generated_files) + + # Skew times can affect the number of rotation.. + expect(generated_files.count).to be_within(number_of_rotation).of(number_of_rotation + 1) + expect(events_written_count).to eq(event_count) + end + end + end end diff --git a/spec/supports/helpers.rb b/spec/supports/helpers.rb new file mode 100644 index 00000000..e34f1ff3 --- /dev/null +++ b/spec/supports/helpers.rb @@ -0,0 +1,14 @@ +def delete_matching_keys_on_bucket(prefix) + s3_object.buckets[minimal_settings["bucket"]].objects.with_prefix(prefix).each do |obj| + obj.delete + end +end + +def key_exists_on_bucket?(key) + s3_object.buckets[minimal_settings["bucket"]].objects[key].exists? +end + +def events_in_files(files) + files.collect { |file| File.foreach(file).count }.inject(&:+) +end +
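For reference, a minimal pipeline configuration exercising the options introduced by this refactor might look like the sketch below; the bucket name, prefix, sizes, and worker count are placeholders, and the credential/region settings come from the logstash-mixin-aws options.

```
output {
  s3 {
    access_key_id => "crazy_key"            # placeholders; credentials can also come from the AWS mixin's other options
    secret_access_key => "monkey_access_key"
    region => "eu-west-1"                   # replaces the deprecated endpoint_region
    bucket => "my-bucket"
    prefix => "logs/myapp/"                 # simulates a directory inside the bucket
    size_file => 2048                       # rotate the temporary file once it grows past 2048 bytes
    time_file => 5                          # also rotate every 5 minutes
    temporary_directory => "/tmp/logstash"
    upload_workers_count => 2
    restore => true                         # re-upload files left behind by a previous crash
    canned_acl => "private"
  }
}
```

With these settings the plugin buffers events under /tmp/logstash in files named like ls.s3.<hostname>.<timestamp>.part0.txt (with a tag_ segment when tags are set) and rotates to a new part whenever either the size or the time threshold is reached.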