@@ -4,12 +4,14 @@ module SplitIoClient
44 module SSE
55 module Workers
66 class SplitsWorker
7- def initialize ( synchronizer , config , feature_flags_repository )
7+ def initialize ( synchronizer , config , feature_flags_repository , telemetry_runtime_producer , segment_fetcher )
88 @synchronizer = synchronizer
99 @config = config
1010 @feature_flags_repository = feature_flags_repository
1111 @queue = Queue . new
1212 @running = Concurrent ::AtomicBoolean . new ( false )
13+ @telemetry_runtime_producer = telemetry_runtime_producer
14+ @segment_fetcher = segment_fetcher
1315 end
1416
1517 def start
@@ -29,7 +31,7 @@ def stop
2931 end
3032
3133 @running . make_false
32- SplitIoClient :: Helpers ::ThreadHelper . stop ( :split_update_worker , @config )
34+ Helpers ::ThreadHelper . stop ( :split_update_worker , @config )
3335 end
3436
3537 def add_to_queue ( notification )
@@ -39,38 +41,47 @@ def add_to_queue(notification)
3941
4042 private
4143
42- def return_split_from_json ( notification )
43- JSON . parse (
44- SplitIoClient ::Helpers ::DecryptionHelper . get_encoded_definition (
45- notification . data [ 'c' ] ,
46- notification . data [ 'd' ]
47- ) ,
48- symbolize_names : true
49- )
44+ def perform_thread
45+ @config . threads [ :split_update_worker ] = Thread . new do
46+ @config . logger . debug ( 'starting feature_flags_worker ...' ) if @config . debug_enabled
47+ perform
48+ end
5049 end
5150
52- def check_update ( notification )
53- @feature_flags_repository . get_change_number == notification . data [ 'pcn' ] && !notification . data [ 'd' ] . nil?
51+ def perform
52+ while ( notification = @queue . pop )
53+ @config . logger . debug ( "feature_flags_worker change_number dequeue #{ notification . data [ 'changeNumber' ] } " )
54+ case notification . data [ 'type' ]
55+ when SSE ::EventSource ::EventTypes ::SPLIT_UPDATE
56+ success = update_feature_flag ( notification )
57+ @synchronizer . fetch_splits ( notification . data [ 'changeNumber' ] ) unless success
58+ when SSE ::EventSource ::EventTypes ::SPLIT_KILL
59+ kill_feature_flag ( notification )
60+ end
61+ end
5462 end
5563
5664 def update_feature_flag ( notification )
57- return if @feature_flags_repository . get_change_number . to_i > notification . data [ 'changeNumber' ]
65+ return true if @feature_flags_repository . get_change_number . to_i >= notification . data [ 'changeNumber' ]
66+ return false unless !notification . data [ 'd' ] . nil? && @feature_flags_repository . get_change_number == notification . data [ 'pcn' ]
5867
59- if check_update ( notification )
60- begin
61- new_split = return_split_from_json ( notification )
62- if SplitIoClient ::Engine ::Models ::Split . archived? ( new_split )
63- @feature_flags_repository . remove_split ( new_split )
64- else
65- @feature_flags_repository . add_split ( new_split )
66- end
67- @feature_flags_repository . set_change_number ( notification . data [ 'changeNumber' ] )
68- return
69- rescue StandardError => e
70- @config . logger . debug ( "Failed to update Split: #{ e . inspect } " ) if @config . debug_enabled
71- end
68+ new_split = return_split_from_json ( notification )
69+ if Engine ::Models ::Split . archived? ( new_split )
70+ @feature_flags_repository . remove_split ( new_split )
71+ else
72+ @feature_flags_repository . add_split ( new_split )
73+
74+ fetch_segments_if_not_exists ( new_split )
7275 end
73- @synchronizer . fetch_splits ( notification . data [ 'changeNumber' ] )
76+
77+ @feature_flags_repository . set_change_number ( notification . data [ 'changeNumber' ] )
78+ @telemetry_runtime_producer . record_updates_from_sse ( Telemetry ::Domain ::Constants ::SPLITS )
79+
80+ true
81+ rescue StandardError => e
82+ @config . logger . debug ( "Failed to update Split: #{ e . inspect } " ) if @config . debug_enabled
83+
84+ false
7485 end
7586
7687 def kill_feature_flag ( notification )
@@ -85,23 +96,18 @@ def kill_feature_flag(notification)
8596 @synchronizer . fetch_splits ( notification . data [ 'changeNumber' ] )
8697 end
8798
88- def perform
89- while ( notification = @queue . pop )
90- @config . logger . debug ( "feature_flags_worker change_number dequeue #{ notification . data [ 'changeNumber' ] } " )
91- case notification . data [ 'type' ]
92- when SSE ::EventSource ::EventTypes ::SPLIT_UPDATE
93- update_feature_flag ( notification )
94- when SSE ::EventSource ::EventTypes ::SPLIT_KILL
95- kill_feature_flag ( notification )
96- end
97- end
99+ def return_split_from_json ( notification )
100+ split_json = Helpers ::DecryptionHelper . get_encoded_definition ( notification . data [ 'c' ] , notification . data [ 'd' ] )
101+
102+ JSON . parse ( split_json , symbolize_names : true )
98103 end
99104
100- def perform_thread
101- @config . threads [ :split_update_worker ] = Thread . new do
102- @config . logger . debug ( 'starting feature_flags_worker ...' ) if @config . debug_enabled
103- perform
104- end
105+ def fetch_segments_if_not_exists ( feature_flag )
106+ segment_names = Helpers ::Util . segment_names_by_feature_flag ( feature_flag )
107+ return if segment_names . nil?
108+
109+ @feature_flags_repository . set_segment_names ( segment_names )
110+ @segment_fetcher . fetch_segments_if_not_exists ( segment_names )
105111 end
106112 end
107113 end
0 commit comments