1111require "fileutils"
1212require "set"
1313require "pathname"
14+ require "aws-sdk"
15+ require "logstash/outputs/s3/patch"
1416
17+ Aws . eager_autoload!
1518
1619# INFORMATION:
1720#
@@ -118,7 +121,7 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
118121 ## This is hack for not destroy the new files after restoring the initial files.
119122 ## If you do not specify "restore => true" when logstash crashes or is restarted, the files are not sent into the bucket,
120123 ## for example if you have single Instance.
121- config :restore , :validate => :boolean , :default => false
124+ config :restore , :validate => :boolean , :default => true
122125
123126 # The S3 canned ACL to use when putting the file. Defaults to "private".
124127 config :canned_acl , :validate => [ "private" , "public_read" , "public_read_write" , "authenticated_read" ] ,
@@ -191,20 +194,21 @@ def register
191194 @file_repository = FileRepository . new ( @tags , @encoding , @temporary_directory )
192195
193196 @rotation = rotation_strategy
194- @uploader = Uploader . new ( bucket_resource , @logger , Concurrent ::ThreadPoolExecutor . new ( {
195- :min_threads => 1 ,
196- :max_threads => @upload_workers_count ,
197- :max_queue => @upload_queue_size ,
198- :fallback_policy => :caller_runs
199- } ) )
197+
198+ executor = Concurrent ::ThreadPoolExecutor . new ( { :min_threads => 1 ,
199+ :max_threads => @upload_workers_count ,
200+ :max_queue => @upload_queue_size ,
201+ :fallback_policy => :caller_runs } )
202+
203+ @uploader = Uploader . new ( bucket_resource , @logger , executor )
200204
201205 # Restoring from crash will use a new threadpool to slowly recover
202206 # New events should have more priority.
203207 restore_from_crash if @restore
204208
205209 # If we need time based rotation we need to do periodic check on the file
206210 # to take care of file that were not updated recently
207- start_periodic_check if @rotation . need_periodic ?
211+ start_periodic_check if @rotation . needs_periodic ?
208212 end
209213
210214 def multi_receive_encoded ( events_and_encoded )
@@ -229,7 +233,7 @@ def multi_receive_encoded(events_and_encoded)
229233 end
230234
231235 def close
232- stop_periodic_check if @rotation . need_periodic ?
236+ stop_periodic_check if @rotation . needs_periodic ?
233237
234238 @logger . debug ( "Uploading current workspace" )
235239
@@ -294,7 +298,8 @@ def upload_options
294298
295299 def rotate_if_needed ( prefixes )
296300 prefixes . each do |prefix |
297- # Each file access is thread safe, until the rotation is done then only
301+ # Each file access is thread safe,
302+ # until the rotation is done then only
298303 # one thread has access to the resource.
299304 @file_repository . get_factory ( prefix ) do |factory |
300305 temp_file = factory . current
@@ -316,7 +321,7 @@ def upload_file(temp_file)
316321 @logger . debug ( "Queue for upload" , :path => temp_file . path )
317322
318323 # if the queue is full the calling thread will be used to upload
319- temp_file . fsync # make sure we flush the fd before uploading it.
324+ temp_file . close # make sure the content is on disk
320325 if temp_file . size > 0
321326 @uploader . upload_async ( temp_file ,
322327 :on_complete => method ( :clean_temporary_file ) ,
@@ -346,14 +351,12 @@ def restore_from_crash
346351 @crash_uploader = Uploader . new ( bucket_resource , @logger , CRASH_RECOVERY_THREADPOOL )
347352
348353 temp_folder_path = Pathname . new ( @temporary_directory )
349- Dir . glob ( ::File . join ( @temporary_directory , "**/*" ) ) do |file |
350- if ::File . file? ( file )
351- key_parts = Pathname . new ( file ) . relative_path_from ( temp_folder_path ) . to_s . split ( ::File ::SEPARATOR )
352- temp_file = TemporaryFile . new ( key_parts . slice ( 1 , key_parts . size ) . join ( "/" ) , ::File . open ( file , "r" ) , key_parts . slice ( 0 , 1 ) )
353-
354- @logger . debug ( "Recovering from crash and uploading" , :file => temp_file . path )
355- @crash_uploader . upload_async ( temp_file , :on_complete => method ( :clean_temporary_file ) )
356- end
354+ Dir . glob ( ::File . join ( @temporary_directory , "**/*" ) )
355+ . select { |file | ::File . file? ( file ) }
356+ . each do |file |
357+ temp_file = TemporaryFile . create_from_existing_file ( file , temp_folder_path )
358+ @logger . debug ( "Recovering from crash and uploading" , :file => temp_file . path )
359+ @crash_uploader . upload_async ( temp_file , :on_complete => method ( :clean_temporary_file ) )
357360 end
358361 end
359362end
0 commit comments