Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/splitclient-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
require 'splitclient-rb/clients/split_client'
require 'splitclient-rb/managers/split_manager'
require 'splitclient-rb/helpers/thread_helper'
require 'splitclient-rb/helpers/decryption_helper'
require 'splitclient-rb/split_factory'
require 'splitclient-rb/split_factory_builder'
require 'splitclient-rb/split_config'
Expand Down
25 changes: 25 additions & 0 deletions lib/splitclient-rb/helpers/decryption_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module SplitIoClient
NO_COMPRESSION = 0
GZIP_COMPRESSION = 1
ZLIB_COMPRESSION = 2

module Helpers
class DecryptionHelper
def self.get_encoded_definition(compression, data)
case compression
when NO_COMPRESSION
Base64.decode64(data)
when GZIP_COMPRESSION
gz = Zlib::GzipReader.new(StringIO.new(Base64.decode64(data)))
gz.read
when ZLIB_COMPRESSION
Zlib::Inflate.inflate(Base64.decode64(data))
else
raise StandardError, 'Compression flag value is incorrect'
end
end
end
end
end
8 changes: 2 additions & 6 deletions lib/splitclient-rb/sse/notification_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,12 @@ def process(incoming_notification)

def process_split_update(notification)
@config.logger.debug("SPLIT UPDATE notification received: #{notification}") if @config.debug_enabled
@splits_worker.add_to_queue(notification.data['changeNumber'])
@splits_worker.add_to_queue(notification)
end

def process_split_kill(notification)
@config.logger.debug("SPLIT KILL notification received: #{notification}") if @config.debug_enabled
change_number = notification.data['changeNumber']
default_treatment = notification.data['defaultTreatment']
split_name = notification.data['splitName']

@splits_worker.kill_split(change_number, split_name, default_treatment)
@splits_worker.add_to_queue(notification)
end

def process_segment_update(notification)
Expand Down
60 changes: 46 additions & 14 deletions lib/splitclient-rb/sse/workers/splits_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ module SplitIoClient
module SSE
module Workers
class SplitsWorker
def initialize(synchronizer, config, splits_repository)
def initialize(synchronizer, config, feature_flags_repository)
@synchronizer = synchronizer
@config = config
@splits_repository = splits_repository
@feature_flags_repository = feature_flags_repository
@queue = Queue.new
@running = Concurrent::AtomicBoolean.new(false)
end
Expand All @@ -32,25 +32,57 @@ def stop
SplitIoClient::Helpers::ThreadHelper.stop(:split_update_worker, @config)
end

def add_to_queue(change_number)
@config.logger.debug("feature_flags_worker add to queue #{change_number}")
@queue.push(change_number)
def add_to_queue(notification)
@config.logger.debug("feature_flags_worker add to queue #{notification.data['changeNumber']}")
@queue.push(notification)
end

def kill_split(change_number, split_name, default_treatment)
return if @splits_repository.get_change_number.to_i > change_number
private

def update_feature_flag(notification)
return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber']

@config.logger.debug("feature_flags_worker kill #{split_name}, #{change_number}")
@splits_repository.kill(change_number, split_name, default_treatment)
add_to_queue(change_number)
if @feature_flags_repository.get_change_number == notification.data['pcn']
begin
@feature_flags_repository.add_split(
JSON.parse(
SplitIoClient::Helpers::DecryptionHelper.get_encoded_definition(
notification.data['c'],
notification.data['d']
),
symbolize_names: true
)
)
@feature_flags_repository.set_change_number(notification.data['changeNumber'])
return
rescue StandardError => e
@config.logger.debug("Failed to update Split: #{e.inspect}") if @config.debug_enabled
end
end
@synchronizer.fetch_splits(notification.data['changeNumber'])
end

private
def kill_feature_flag(notification)
return if @feature_flags_repository.get_change_number.to_i > notification.data['changeNumber']

@config.logger.debug("feature_flags_worker kill #{notification.data['splitName']}, #{notification.data['changeNumber']}")
@feature_flags_repository.kill(
notification.data['changeNumber'],
notification.data['splitName'],
notification.data['defaultTreatment']
)
@synchronizer.fetch_splits(notification.data['changeNumber'])
end

def perform
while (change_number = @queue.pop)
@config.logger.debug("feature_flags_worker change_number dequeue #{change_number}")
@synchronizer.fetch_splits(change_number)
while (notification = @queue.pop)
@config.logger.debug("feature_flags_worker change_number dequeue #{notification.data['changeNumber']}")
case notification.data['type']
when SSE::EventSource::EventTypes::SPLIT_UPDATE
update_feature_flag(notification)
when SSE::EventSource::EventTypes::SPLIT_KILL
kill_feature_flag(notification)
end
end
end

Expand Down
41 changes: 35 additions & 6 deletions spec/sse/workers/splits_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'spec_helper'
require 'http_server_mock'
require 'byebug'

describe SplitIoClient::SSE::Workers::SplitsWorker do
subject { SplitIoClient::SSE::Workers::SplitsWorker }
Expand All @@ -16,6 +17,9 @@
let(:splits_repository) { SplitIoClient::Cache::Repositories::SplitsRepository.new(config) }
let(:telemetry_runtime_producer) { SplitIoClient::Telemetry::RuntimeProducer.new(config) }
let(:split_fetcher) { SplitIoClient::Cache::Fetchers::SplitFetcher.new(splits_repository, api_key, config, telemetry_runtime_producer) }
let(:event_split_update_no_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 0,"d":"eyJ0cmFmZmljVHlwZU5hbWUiOiJ1c2VyIiwiaWQiOiIzM2VhZmE1MC0xYTY1LTExZWQtOTBkZi1mYTMwZDk2OTA0NDUiLCJuYW1lIjoiYmlsYWxfc3BsaXQiLCJ0cmFmZmljQWxsb2NhdGlvbiI6MTAwLCJ0cmFmZmljQWxsb2NhdGlvblNlZWQiOi0xMzY0MTE5MjgyLCJzZWVkIjotNjA1OTM4ODQzLCJzdGF0dXMiOiJBQ1RJVkUiLCJraWxsZWQiOmZhbHNlLCJkZWZhdWx0VHJlYXRtZW50Ijoib2ZmIiwiY2hhbmdlTnVtYmVyIjoxNjg0MzQwOTA4NDc1LCJhbGdvIjoyLCJjb25maWd1cmF0aW9ucyI6e30sImNvbmRpdGlvbnMiOlt7ImNvbmRpdGlvblR5cGUiOiJST0xMT1VUIiwibWF0Y2hlckdyb3VwIjp7ImNvbWJpbmVyIjoiQU5EIiwibWF0Y2hlcnMiOlt7ImtleVNlbGVjdG9yIjp7InRyYWZmaWNUeXBlIjoidXNlciJ9LCJtYXRjaGVyVHlwZSI6IklOX1NFR01FTlQiLCJuZWdhdGUiOmZhbHNlLCJ1c2VyRGVmaW5lZFNlZ21lbnRNYXRjaGVyRGF0YSI6eyJzZWdtZW50TmFtZSI6ImJpbGFsX3NlZ21lbnQifX1dfSwicGFydGl0aW9ucyI6W3sidHJlYXRtZW50Ijoib24iLCJzaXplIjowfSx7InRyZWF0bWVudCI6Im9mZiIsInNpemUiOjEwMH1dLCJsYWJlbCI6ImluIHNlZ21lbnQgYmlsYWxfc2VnbWVudCJ9LHsiY29uZGl0aW9uVHlwZSI6IlJPTExPVVQiLCJtYXRjaGVyR3JvdXAiOnsiY29tYmluZXIiOiJBTkQiLCJtYXRjaGVycyI6W3sia2V5U2VsZWN0b3IiOnsidHJhZmZpY1R5cGUiOiJ1c2VyIn0sIm1hdGNoZXJUeXBlIjoiQUxMX0tFWVMiLCJuZWdhdGUiOmZhbHNlfV19LCJwYXJ0aXRpb25zIjpbeyJ0cmVhdG1lbnQiOiJvbiIsInNpemUiOjB9LHsidHJlYXRtZW50Ijoib2ZmIiwic2l6ZSI6MTAwfV0sImxhYmVsIjoiZGVmYXVsdCBydWxlIn1dfQ=="}'), 'test') }
let(:event_split_update_gzip_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 1,"d":"H4sIAAkVZWQC/8WST0+DQBDFv0qzZ0ig/BF6a2xjGismUk2MaZopzOKmy9Isy0EbvrtDwbY2Xo233Tdv5se85cCMBs5FtvrYYwIlsglratTMYiKns+chcAgc24UwsF0Xczt2cm5z8Jw8DmPH9wPyqr5zKyTITb2XwpA4TJ5KWWVgRKXYxHWcX/QUkVi264W+68bjaGyxupdCJ4i9KPI9UgyYpibI9Ha1eJnT/J2QsnNxkDVaLEcOjTQrjWBKVIasFefky95BFZg05Zb2mrhh5I9vgsiL44BAIIuKTeiQVYqLotHHLyLOoT1quRjub4fztQuLxj89LpePzytClGCyd9R3umr21ErOcitUh2PTZHY29HN2+JGixMxUujNfvMB3+u2pY1AXySad3z3Mk46msACDp8W7jhly4uUpFt3qD33vDAx0gLpXkx+P1GusbdcE24M2F4uaywwVEWvxSa1Oa13Vjvn2RXradm0xCVuUVBJqNCBGV0DrX4OcLpeb+/lreh3jH8Uw/JQj3UhkxPgCCurdEnADAAA="}'), 'test') }
let(:event_split_update_zlib_compression) { SplitIoClient::SSE::EventSource::StreamData.new("data", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":5564531221,"pcn":1234,"c": 2,"d":"eJzEUtFq20AQ/JUwz2c4WZZr3ZupTQh1FKjcQinGrKU95cjpZE6nh9To34ssJ3FNX0sfd3Zm53b2TgietDbF9vXIGdUMha5lDwFTQiGOmTQlchLRPJlEEZeTVJZ6oimWZTpP5WyWQMCNyoOxZPft0ZoA8TZ5aW1TUDCNg4qk/AueM5dQkyiez6IonS6mAu0IzWWSxovFLBZoA4WuhcLy8/bh+xoCL8bagaXJtixQsqbOhq1nCjW7AIVGawgUz+Qqzrr6wB4qmi9m00/JIk7TZCpAtmqgpgJF47SpOn9+UQt16s9YaS71z9NHOYQFha9Pm83Tty0EagrFM/t733RHqIFZH4wb7LDMVh+Ecc4Lv+ZsuQiNH8hXF3hLv39XXNCHbJ+v7x/X2eDmuKLA74sPihVr47jMuRpWfxy1Kwo0GLQjmv1xpBFD3+96gSP5cLVouM7QQaA1vxhK9uKmd853bEZS9jsBSwe2UDDu7mJxd2Mo/muQy81m/2X9I7+N8R/FcPmUd76zjH7X/w4AAP//90glTw=="}'), 'test') }
let(:synchronizer) do
segments_repository = SplitIoClient::Cache::Repositories::SegmentsRepository.new(config)
telemetry_api = SplitIoClient::Api::TelemetryApi.new(config, api_key, telemetry_runtime_producer)
Expand Down Expand Up @@ -45,7 +49,7 @@

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.add_to_queue(1_506_703_262_919)
worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262919}'), 'test'))

sleep 1

Expand All @@ -60,7 +64,7 @@

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.add_to_queue(1_506_703_262_918)
worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262918}'), 'test'))
sleep 1

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.once
Expand All @@ -71,15 +75,15 @@

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.add_to_queue(1_506_703_262_916)
worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262916}'), 'test'))
sleep 1

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0)
end

it 'without start, must not fetch' do
worker = subject.new(synchronizer, config, splits_repository)
worker.add_to_queue(1_506_703_262_918)
worker.add_to_queue(SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_UPDATE", 123, JSON.parse('{"type":"SPLIT_UPDATE","changeNumber":1506703262918}'), 'test'))

expect(a_request(:get, 'https://sdk.split.io/api/splitChanges?since=1506703262916')).to have_been_made.times(0)
end
Expand All @@ -102,7 +106,7 @@

worker = subject.new(synchronizer, config, splits_repository)
worker.start
worker.kill_split(1_506_703_262_918, 'FACUNDO_TEST', 'on')
worker.send :kill_feature_flag, SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262918}'), 'test')

sleep(1)

Expand All @@ -117,7 +121,7 @@
worker = subject.new(synchronizer, config, splits_repository)

worker.start
worker.kill_split(1_506_703_262_916, 'FACUNDO_TEST', 'on')
worker.send :kill_feature_flag, SplitIoClient::SSE::EventSource::StreamData.new("SPLIT_KILL", 123, JSON.parse('{"splitName":"FACUNDO_TEST", "defaultTreatment":"on", "type":"SPLIT_KILL","changeNumber":1506703262916}'), 'test')

sleep(1)

Expand All @@ -129,6 +133,31 @@
end
end

context 'instant ff update split notification' do
it 'decode and decompress split update data' do
worker = subject.new(synchronizer, config, splits_repository)
worker.start
splits_repository.set_change_number(1234)
worker.add_to_queue(event_split_update_no_compression)
sleep 1
split = splits_repository.get_split('bilal_split')
expect(split[:name] == 'bilal_split')

splits_repository.set_change_number(1234)
worker.add_to_queue(event_split_update_gzip_compression)
sleep 1
split = splits_repository.get_split('bilal_split')
expect(split[:name] == 'bilal_split')

splits_repository.set_change_number(1234)
worker.add_to_queue(event_split_update_zlib_compression)
sleep 1
split = splits_repository.get_split('bilal_split')
expect(split[:name] == 'bilal_split')

end
end

private

def mock_split_changes(splits_json)
Expand Down
1 change: 1 addition & 0 deletions splitclient-rb.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'timecop', '~> 0.9'
spec.add_development_dependency 'webmock', '~> 3.14'
spec.add_development_dependency 'webrick', '~> 1.7'
spec.add_development_dependency 'byebug', '~> 11.1'

spec.add_runtime_dependency 'bitarray', '~> 1.3'
spec.add_runtime_dependency 'concurrent-ruby', '~> 1.0'
Expand Down