@@ -370,7 +370,7 @@ def close
370370 private
371371 def shutdown_upload_workers
372372 @logger . debug ( "S3: Gracefully shutdown the upload workers" )
373- @upload_queue << LogStash ::ShutdownEvent
373+ @upload_queue << LogStash ::SHUTDOWN
374374 end
375375
376376 private
@@ -424,23 +424,36 @@ def configure_upload_workers
424424 while true do
425425 @logger . debug ( "S3: upload worker is waiting for a new file to upload." , :worker_id => worker_id )
426426
427- upload_worker
427+ begin
428+ upload_worker
429+ rescue Exception => ex
430+ @logger . error ( 'upload_worker unhandled exception' , :ex => ex , :backtrace => ex . backtrace )
431+ raise LogStash ::Error , 'S3: uploader thread exited unexpectedly'
432+ end
428433 end
429434 end
430435 end
431436 end
432437
433438 private
434439 def upload_worker
435- file = @upload_queue . deq
440+ file = nil
441+ begin
442+ file = @upload_queue . deq
436443
437- case file
438- when LogStash ::ShutdownEvent
444+ if file . is_a? LogStash ::ShutdownEvent
439445 @logger . debug ( "S3: upload worker is shutting down gracefuly" )
440- @upload_queue . enq ( LogStash ::ShutdownEvent )
446+ @upload_queue . enq ( LogStash ::SHUTDOWN )
441447 else
442448 @logger . debug ( "S3: upload working is uploading a new file" , :filename => File . basename ( file ) )
443449 move_file_to_bucket ( file )
450+ end
451+ rescue Exception => ex
452+ @logger . error ( "failed to upload, will re-enqueue #{ file } for upload" ,
453+ :ex => ex , :backtrace => ex . backtrace )
454+ unless file . nil?
455+ @upload_queue . enq ( file )
456+ end
444457 end
445458 end
446459
0 commit comments