11# encoding: utf-8
2+ require "java"
23require "concurrent"
3- require "concurrent/map"
44require "concurrent/timer_task"
55require "logstash/util"
66
7+ java_import "java.util.concurrent.ConcurrentHashMap"
8+
79module LogStash
810 module Outputs
911 class S3
@@ -28,14 +30,18 @@ def with_lock
2830 def stale?
2931 with_lock { |factory | factory . current . size == 0 && ( Time . now - factory . current . ctime > @stale_time ) }
3032 end
33+
34+ def apply ( prefix )
35+ return self
36+ end
3137 end
3238
3339 def initialize ( tags , encoding , temporary_directory ,
3440 stale_time = DEFAULT_STALE_TIME_SECS ,
3541 sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS )
3642 # The path need to contains the prefix so when we start
3743 # logtash after a crash we keep the remote structure
38- @prefixed_factories = Concurrent :: Map . new
44+ @prefixed_factories = ConcurrentHashMap . new
3945
4046 @tags = tags
4147 @encoding = encoding
@@ -48,19 +54,20 @@ def initialize(tags, encoding, temporary_directory,
4854 end
4955
5056 def keys
51- @prefixed_factories . keys
57+ arr = [ ]
58+ @prefixed_factories . keySet . each { |k | arr << k }
59+ arr
5260 end
5361
5462 def each_files
55- @prefixed_factories . each_value do |prefixed_file |
63+ @prefixed_factories . values do |prefixed_file |
5664 prefixed_file . with_lock { |factory | yield factory . current }
5765 end
5866 end
5967
6068 # Return the file factory
6169 def get_factory ( prefix_key )
62- @prefixed_factories . compute_if_absent ( prefix_key ) { PrefixedValue . new ( TemporaryFileFactory . new ( prefix_key , @tags , @encoding , @temporary_directory ) , @stale_time ) }
63- . with_lock { |factory | yield factory }
70+ @prefixed_factories . computeIfAbsent ( prefix_key , PrefixedValue . new ( TemporaryFileFactory . new ( prefix_key , @tags , @encoding , @temporary_directory ) , @stale_time ) ) . with_lock { |factory | yield factory }
6471 end
6572
6673 def get_file ( prefix_key )
@@ -79,8 +86,10 @@ def start_stale_sweeper
7986 @stale_sweeper = Concurrent ::TimerTask . new ( :execution_interval => @sweeper_interval ) do
8087 LogStash ::Util . set_thread_name ( "S3, Stale factory sweeper" )
8188
82- @prefixed_factories . each_pair do |k , v |
83- @prefixed_factories . delete_pair ( k , v ) if v . stale?
89+ @prefixed_factories . entrySet . each do |s |
90+ if s . getValue . stale?
91+ @prefixed_factories . remove ( s . getKey , s . getValue )
92+ end
8493 end
8594 end
8695
0 commit comments