From 1a3cd09c04cc61f95bc52f6823f988ad99b983b9 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 31 May 2023 08:25:20 -0700 Subject: [PATCH 1/9] added iff compression and decoding message --- .../sse/notification_processor.rb | 2 +- .../sse/workers/splits_worker.rb | 26 +++++++++++++++++++ spec/sse/workers/splits_worker_spec.rb | 20 ++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) diff --git a/lib/splitclient-rb/sse/notification_processor.rb b/lib/splitclient-rb/sse/notification_processor.rb index 6bde38a3..51c8616a 100644 --- a/lib/splitclient-rb/sse/notification_processor.rb +++ b/lib/splitclient-rb/sse/notification_processor.rb @@ -26,7 +26,7 @@ def process(incoming_notification) def process_split_update(notification) @config.logger.debug("SPLIT UPDATE notification received: #{notification}") if @config.debug_enabled - @splits_worker.add_to_queue(notification.data['changeNumber']) + @splits_worker.split_update(notification) end def process_split_kill(notification) diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index 48bccfa2..27b9c71f 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -32,6 +32,20 @@ def stop SplitIoClient::Helpers::ThreadHelper.stop(:split_update_worker, @config) end + def split_update(notification) + if @splits_repository.get_change_number() == notification.data['pcn'] + begin + @new_split = JSON.parse(get_encoded_definition(notification), symbolize_names: true) + @splits_repository.add_split(@new_split) + @splits_repository.set_change_number(notification.data['changeNumber']) + return + rescue Exception => e + @config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled + end + end + add_to_queue(notification.data['changeNumber']) + end + def add_to_queue(change_number) @config.logger.debug("feature_flags_worker add to queue #{change_number}") @queue.push(change_number) @@ -45,6 +59,18 @@ def kill_split(change_number, split_name, default_treatment) add_to_queue(change_number) end + def get_encoded_definition(notification) + case notification.data[:c] + when 0 + return Base64.decode64(notification.data[:d]) + when 1 + gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(notification.data[:d]))) + return gz.read + when 2 + return Zlib::Inflate.inflate(Base64.decode64(notification.data[:d])) + end + end + private def perform diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 0bfc7845..8d5fb88f 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -2,6 +2,7 @@ require 'spec_helper' require 'http_server_mock' +require 'byebug' describe SplitIoClient::SSE::Workers::SplitsWorker do subject { SplitIoClient::SSE::Workers::SplitsWorker } @@ -16,6 +17,9 @@ let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, telemetry_runtime_producer) } + let(:event_split_update_no_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 0,"d":"eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ=="}, 'test') } + let(:event_split_update_gzip_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 1,"d":"H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA="}, 'test') } + let(:event_split_update_zlib_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 2,"d":"eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw=="}, 'test') } let(:synchronizer) do segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) @@ -129,6 +133,22 @@ end end + context 'update split notification' do + it 'decode and decompress split update data' do + worker = subject.new(synchronizer, config, splits_repository) + worker.start + split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_no_compression)) + expect(split_definition['name'] == 'bilal_split') + + split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_gzip_compression)) + expect(split_definition['name'] == 'bilal_split') + + split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_zlib_compression)) + expect(split_definition['name'] == 'bilal_split') + + end + end + private def mock_split_changes(splits_json) From ba9989e9556113ba4fce9bfe94c26fceb82e7611 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 31 May 2023 08:30:50 -0700 Subject: [PATCH 2/9] moved decode function to private --- lib/splitclient-rb/sse/workers/splits_worker.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index 27b9c71f..1e603ca3 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -59,6 +59,8 @@ def kill_split(change_number, split_name, default_treatment) add_to_queue(change_number) end + private + def get_encoded_definition(notification) case notification.data[:c] when 0 @@ -71,8 +73,6 @@ def get_encoded_definition(notification) end end - private - def perform while (change_number = @queue.pop) @config.logger.debug("feature_flags_worker change_number dequeue #{change_number}") From 1cca90e6f822ee32d8fc5405a0f0e70fb5a74c80 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 31 May 2023 13:03:19 -0700 Subject: [PATCH 3/9] polish and refactor --- lib/splitclient-rb.rb | 1 + .../helpers/decryption_helper.rb | 19 ++++++++ .../sse/notification_processor.rb | 8 +--- .../sse/workers/splits_worker.rb | 45 +++++++++---------- spec/sse/workers/splits_worker_spec.rb | 41 ++++++++++------- 5 files changed, 67 insertions(+), 47 deletions(-) create mode 100644 lib/splitclient-rb/helpers/decryption_helper.rb diff --git a/lib/splitclient-rb.rb b/lib/splitclient-rb.rb index 57ea263e..a69aa23f 100644 --- a/lib/splitclient-rb.rb +++ b/lib/splitclient-rb.rb @@ -41,6 +41,7 @@ require 'splitclient-rb/clients/split_client' require 'splitclient-rb/managers/split_manager' require 'splitclient-rb/helpers/thread_helper' +require 'splitclient-rb/helpers/decryption_helper' require 'splitclient-rb/split_factory' require 'splitclient-rb/split_factory_builder' require 'splitclient-rb/split_config' diff --git a/lib/splitclient-rb/helpers/decryption_helper.rb b/lib/splitclient-rb/helpers/decryption_helper.rb new file mode 100644 index 00000000..c7c2af18 --- /dev/null +++ b/lib/splitclient-rb/helpers/decryption_helper.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module SplitIoClient + module Helpers + class DecryptionHelper + def self.get_encoded_definition(compression, data) + case compression + when 0 + return Base64.decode64(data) + when 1 + gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(data))) + return gz.read + when 2 + return Zlib::Inflate.inflate(Base64.decode64(data)) + end + end + end + end +end diff --git a/lib/splitclient-rb/sse/notification_processor.rb b/lib/splitclient-rb/sse/notification_processor.rb index 51c8616a..0c137f59 100644 --- a/lib/splitclient-rb/sse/notification_processor.rb +++ b/lib/splitclient-rb/sse/notification_processor.rb @@ -26,16 +26,12 @@ def process(incoming_notification) def process_split_update(notification) @config.logger.debug("SPLIT UPDATE notification received: #{notification}") if @config.debug_enabled - @splits_worker.split_update(notification) + @splits_worker.add_to_queue(notification) end def process_split_kill(notification) @config.logger.debug("SPLIT KILL notification received: #{notification}") if @config.debug_enabled - change_number = notification.data['changeNumber'] - default_treatment = notification.data['defaultTreatment'] - split_name = notification.data['splitName'] - - @splits_worker.kill_split(change_number, split_name, default_treatment) + @splits_worker.add_to_queue(notification) end def process_segment_update(notification) diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index 1e603ca3..e781f460 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true +require 'byebug' module SplitIoClient module SSE @@ -33,9 +34,10 @@ def stop end def split_update(notification) +# byebug if @splits_repository.get_change_number() == notification.data['pcn'] begin - @new_split = JSON.parse(get_encoded_definition(notification), symbolize_names: true) + @new_split = JSON.parse(SplitIoClient::Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d']), symbolize_names: true) @splits_repository.add_split(@new_split) @splits_repository.set_change_number(notification.data['changeNumber']) return @@ -43,40 +45,33 @@ def split_update(notification) @config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled end end - add_to_queue(notification.data['changeNumber']) + @synchronizer.fetch_splits(notification.data['changeNumber']) end - def add_to_queue(change_number) - @config.logger.debug("feature_flags_worker add to queue #{change_number}") - @queue.push(change_number) + def add_to_queue(notification) + @config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}") + @queue.push(notification) end - def kill_split(change_number, split_name, default_treatment) - return if @splits_repository.get_change_number.to_i > change_number + def kill_split(notification) + return if @splits_repository.get_change_number.to_i > notification.data['changeNumber'] - @config.logger.debug("feature_flags_worker kill #{split_name}, #{change_number}") - @splits_repository.kill(change_number, split_name, default_treatment) - add_to_queue(change_number) + @config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}") + @splits_repository.kill(notification.data['changeNumber'], notification.data['splitName'], notification.data['defaultTreatment']) + @synchronizer.fetch_splits(notification.data['changeNumber']) end private - def get_encoded_definition(notification) - case notification.data[:c] - when 0 - return Base64.decode64(notification.data[:d]) - when 1 - gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(notification.data[:d]))) - return gz.read - when 2 - return Zlib::Inflate.inflate(Base64.decode64(notification.data[:d])) - end - end - def perform - while (change_number = @queue.pop) - @config.logger.debug("feature_flags_worker change_number dequeue #{change_number}") - @synchronizer.fetch_splits(change_number) + while (notification = @queue.pop) + @config.logger.debug("feature_flags_worker change_number dequeue #{notification.data['changeNumber']}") + case notification.data['type'] + when SSE::EventSource::EventTypes::SPLIT_UPDATE + split_update(notification) + when SSE::EventSource::EventTypes::SPLIT_KILL + kill_split(notification) + end end end diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 8d5fb88f..82a79fb2 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -17,9 +17,9 @@ let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config) } let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) } let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, telemetry_runtime_producer) } - let(:event_split_update_no_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 0,"d":"eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ=="}, 'test') } - let(:event_split_update_gzip_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 1,"d":"H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA="}, 'test') } - let(:event_split_update_zlib_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, {"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 2,"d":"eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw=="}, 'test') } + let(:event_split_update_no_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 0,"d":"eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ=="}'), 'test') } + let(:event_split_update_gzip_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 1,"d":"H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA="}'), 'test') } + let(:event_split_update_zlib_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 2,"d":"eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw=="}'), 'test') } let(:synchronizer) do segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config) telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer) @@ -49,7 +49,7 @@ worker = subject.new(synchronizer, config, splits_repository) worker.start - worker.add_to_queue(1_506_703_262_919) + worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262919}'), 'test')) sleep 1 @@ -64,7 +64,7 @@ worker = subject.new(synchronizer, config, splits_repository) worker.start - worker.add_to_queue(1_506_703_262_918) + worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262918}'), 'test')) sleep 1 expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.once @@ -75,7 +75,7 @@ worker = subject.new(synchronizer, config, splits_repository) worker.start - worker.add_to_queue(1_506_703_262_916) + worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262916}'), 'test')) sleep 1 expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0) @@ -83,7 +83,7 @@ it 'without start, must not fetch' do worker = subject.new(synchronizer, config, splits_repository) - worker.add_to_queue(1_506_703_262_918) + worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262918}'), 'test')) expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0) end @@ -106,7 +106,7 @@ worker = subject.new(synchronizer, config, splits_repository) worker.start - worker.kill_split(1_506_703_262_918, 'FACUNDO_TEST', 'on') + worker.kill_split(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262918}'), 'test')) sleep(1) @@ -121,7 +121,7 @@ worker = subject.new(synchronizer, config, splits_repository) worker.start - worker.kill_split(1_506_703_262_916, 'FACUNDO_TEST', 'on') + worker.kill_split(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262916}'), 'test')) sleep(1) @@ -133,18 +133,27 @@ end end - context 'update split notification' do + context 'instant ff update split notification' do it 'decode and decompress split update data' do worker = subject.new(synchronizer, config, splits_repository) worker.start - split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_no_compression)) - expect(split_definition['name'] == 'bilal_split') + splits_repository.set_change_number(1234) + worker.add_to_queue(event_split_update_no_compression) + sleep 1 + split = splits_repository.get_split('bilal_split') + expect(split[:name] == 'bilal_split') - split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_gzip_compression)) - expect(split_definition['name'] == 'bilal_split') + splits_repository.set_change_number(1234) + worker.add_to_queue(event_split_update_gzip_compression) + sleep 1 + split = splits_repository.get_split('bilal_split') + expect(split[:name] == 'bilal_split') - split_definition = JSON.parse(worker.send(:get_encoded_definition, event_split_update_zlib_compression)) - expect(split_definition['name'] == 'bilal_split') + splits_repository.set_change_number(1234) + worker.add_to_queue(event_split_update_zlib_compression) + sleep 1 + split = splits_repository.get_split('bilal_split') + expect(split[:name] == 'bilal_split') end end From a75db9c1224eddc4d9a0a5b2fab9a9b016dc0168 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Wed, 31 May 2023 13:05:48 -0700 Subject: [PATCH 4/9] polish --- lib/splitclient-rb/sse/workers/splits_worker.rb | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index e781f460..35137d52 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -1,5 +1,4 @@ # frozen_string_literal: true -require 'byebug' module SplitIoClient module SSE @@ -34,7 +33,6 @@ def stop end def split_update(notification) -# byebug if @splits_repository.get_change_number() == notification.data['pcn'] begin @new_split = JSON.parse(SplitIoClient::Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d']), symbolize_names: true) From 5a8491780fa34bc2b52f7363d77da64cc9f36dd6 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Thu, 1 Jun 2023 09:01:48 -0700 Subject: [PATCH 5/9] used constants for compression level --- lib/splitclient-rb/helpers/decryption_helper.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/splitclient-rb/helpers/decryption_helper.rb b/lib/splitclient-rb/helpers/decryption_helper.rb index c7c2af18..b8e3dc22 100644 --- a/lib/splitclient-rb/helpers/decryption_helper.rb +++ b/lib/splitclient-rb/helpers/decryption_helper.rb @@ -1,16 +1,20 @@ # frozen_string_literal: true module SplitIoClient + NO_COMPRESSION = 0 + GZIP_COMPRESSION = 1 + ZLIB_COMPRESSION = 2 + module Helpers class DecryptionHelper def self.get_encoded_definition(compression, data) case compression - when 0 + when NO_COMPRESSION return Base64.decode64(data) - when 1 + when GZIP_COMPRESSION gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(data))) return gz.read - when 2 + when ZLIB_COMPRESSION return Zlib::Inflate.inflate(Base64.decode64(data)) end end From 8e9d1b52ceab3d11ca4cde7a5629f0ef83a86833 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Mon, 5 Jun 2023 08:17:45 -0700 Subject: [PATCH 6/9] polish and fixes --- .../sse/workers/splits_worker.rb | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index 35137d52..1a258765 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -33,13 +33,22 @@ def stop end def split_update(notification) - if @splits_repository.get_change_number() == notification.data['pcn'] + return if @splits_repository.get_change_number.to_i > notification.data['changeNumber'] + + if @splits_repository.get_change_number == notification.data['pcn'] begin - @new_split = JSON.parse(SplitIoClient::Helpers::DecryptionHelper.get_encoded_definition(notification.data['c'], notification.data['d']), symbolize_names: true) - @splits_repository.add_split(@new_split) + @splits_repository.add_split( + JSON.parse( + SplitIoClient::Helpers::DecryptionHelper.get_encoded_definition( + notification.data['c'], + notification.data['d'] + ), + symbolize_names: true + ) + ) @splits_repository.set_change_number(notification.data['changeNumber']) return - rescue Exception => e + rescue StandardError => e @config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled end end @@ -55,7 +64,11 @@ def kill_split(notification) return if @splits_repository.get_change_number.to_i > notification.data['changeNumber'] @config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}") - @splits_repository.kill(notification.data['changeNumber'], notification.data['splitName'], notification.data['defaultTreatment']) + @splits_repository.kill( + notification.data['changeNumber'], + notification.data['splitName'], + notification.data['defaultTreatment'] + ) @synchronizer.fetch_splits(notification.data['changeNumber']) end From 6164afd832c9e255bfb9e6181cd1f843b8c60fa0 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Mon, 5 Jun 2023 08:19:20 -0700 Subject: [PATCH 7/9] added byebug --- splitclient-rb.gemspec | 1 + 1 file changed, 1 insertion(+) diff --git a/splitclient-rb.gemspec b/splitclient-rb.gemspec index fb6a32c8..659a967a 100644 --- a/splitclient-rb.gemspec +++ b/splitclient-rb.gemspec @@ -48,6 +48,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'timecop', '~> 0.9' spec.add_development_dependency 'webmock', '~> 3.14' spec.add_development_dependency 'webrick', '~> 1.7' + spec.add_development_dependency 'byebug', '~> 11.1' spec.add_runtime_dependency 'bitarray', '~> 1.3' spec.add_runtime_dependency 'concurrent-ruby', '~> 1.0' From d2a6b31a80122214607c96d7be54520428eec16e Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Mon, 5 Jun 2023 08:48:38 -0700 Subject: [PATCH 8/9] renaming split to feature flag --- .../sse/workers/splits_worker.rb | 38 +++++++++---------- spec/sse/workers/splits_worker_spec.rb | 4 +- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/lib/splitclient-rb/sse/workers/splits_worker.rb b/lib/splitclient-rb/sse/workers/splits_worker.rb index 1a258765..87e8d968 100644 --- a/lib/splitclient-rb/sse/workers/splits_worker.rb +++ b/lib/splitclient-rb/sse/workers/splits_worker.rb @@ -4,10 +4,10 @@ module SplitIoClient module SSE module Workers class SplitsWorker - def initialize(synchronizer, config, splits_repository) + def initialize(synchronizer, config, feature_flags_repository) @synchronizer = synchronizer @config = config - @splits_repository = splits_repository + @feature_flags_repository = feature_flags_repository @queue = Queue.new @running = Concurrent::AtomicBoolean.new(false) end @@ -32,12 +32,19 @@ def stop SplitIoClient::Helpers::ThreadHelper.stop(:split_update_worker, @config) end - def split_update(notification) - return if @splits_repository.get_change_number.to_i > notification.data['changeNumber'] + def add_to_queue(notification) + @config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}") + @queue.push(notification) + end + + private + + def update_feature_flag(notification) + return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber'] - if @splits_repository.get_change_number == notification.data['pcn'] + if @feature_flags_repository.get_change_number == notification.data['pcn'] begin - @splits_repository.add_split( + @feature_flags_repository.add_split( JSON.parse( SplitIoClient::Helpers::DecryptionHelper.get_encoded_definition( notification.data['c'], @@ -46,7 +53,7 @@ def split_update(notification) symbolize_names: true ) ) - @splits_repository.set_change_number(notification.data['changeNumber']) + @feature_flags_repository.set_change_number(notification.data['changeNumber']) return rescue StandardError => e @config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled @@ -55,16 +62,11 @@ def split_update(notification) @synchronizer.fetch_splits(notification.data['changeNumber']) end - def add_to_queue(notification) - @config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}") - @queue.push(notification) - end - - def kill_split(notification) - return if @splits_repository.get_change_number.to_i > notification.data['changeNumber'] + def kill_feature_flag(notification) + return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber'] @config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}") - @splits_repository.kill( + @feature_flags_repository.kill( notification.data['changeNumber'], notification.data['splitName'], notification.data['defaultTreatment'] @@ -72,16 +74,14 @@ def kill_split(notification) @synchronizer.fetch_splits(notification.data['changeNumber']) end - private - def perform while (notification = @queue.pop) @config.logger.debug("feature_flags_worker change_number dequeue #{notification.data['changeNumber']}") case notification.data['type'] when SSE::EventSource::EventTypes::SPLIT_UPDATE - split_update(notification) + update_feature_flag(notification) when SSE::EventSource::EventTypes::SPLIT_KILL - kill_split(notification) + kill_feature_flag(notification) end end end diff --git a/spec/sse/workers/splits_worker_spec.rb b/spec/sse/workers/splits_worker_spec.rb index 82a79fb2..5f67a120 100644 --- a/spec/sse/workers/splits_worker_spec.rb +++ b/spec/sse/workers/splits_worker_spec.rb @@ -106,7 +106,7 @@ worker = subject.new(synchronizer, config, splits_repository) worker.start - worker.kill_split(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262918}'), 'test')) + worker.send :kill_feature_flag, SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262918}'), 'test') sleep(1) @@ -121,7 +121,7 @@ worker = subject.new(synchronizer, config, splits_repository) worker.start - worker.kill_split(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262916}'), 'test')) + worker.send :kill_feature_flag, SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262916}'), 'test') sleep(1) From 3eecb05c0db10433edb1180defbd50915caea1b3 Mon Sep 17 00:00:00 2001 From: Bilal Al-Shahwany Date: Mon, 5 Jun 2023 11:24:05 -0700 Subject: [PATCH 9/9] polishing --- lib/splitclient-rb/helpers/decryption_helper.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/splitclient-rb/helpers/decryption_helper.rb b/lib/splitclient-rb/helpers/decryption_helper.rb index b8e3dc22..5dc63128 100644 --- a/lib/splitclient-rb/helpers/decryption_helper.rb +++ b/lib/splitclient-rb/helpers/decryption_helper.rb @@ -10,12 +10,14 @@ class DecryptionHelper def self.get_encoded_definition(compression, data) case compression when NO_COMPRESSION - return Base64.decode64(data) + Base64.decode64(data) when GZIP_COMPRESSION gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(data))) - return gz.read + gz.read when ZLIB_COMPRESSION - return Zlib::Inflate.inflate(Base64.decode64(data)) + Zlib::Inflate.inflate(Base64.decode64(data)) + else + raise StandardError, 'Compression flag value is incorrect' end end end