Skip to content

Commit ac959eb

Browse files
authored
Merge pull request #580 from splitio/development
Development
2 parents 47c78b9 + 5d8f3cc commit ac959eb

File tree

8 files changed

+141
-68
lines changed

8 files changed

+141
-68
lines changed

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
CHANGES
22

3+
8.8.0 (Sep 26, 2025)
4+
- Added a maximum payload size when posting unique keys telemetry in batches
5+
36
8.7.0 (Aug 1, 2025)
47
- Added a new optional argument to the client `getTreatment` methods to allow passing additional evaluation options, such as a map of properties to append to the generated impressions sent to Split backend. Read more in our docs.
58

lib/splitclient-rb/engine/impressions/unique_keys_tracker.rb

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ def initialize(config,
1414
@filter_adapter = filter_adapter
1515
@sender_adapter = sender_adapter
1616
@cache = cache
17-
@cache_max_size = config.unique_keys_cache_max_size
1817
@max_bulk_size = config.unique_keys_bulk_size
1918
@semaphore = Mutex.new
19+
@keys_size = 0
2020
end
2121

2222
def call
@@ -30,8 +30,9 @@ def track(feature_name, key)
3030
@filter_adapter.add(feature_name, key)
3131

3232
add_or_update(feature_name, key)
33+
@keys_size += 1
3334

34-
send_bulk_data if @cache.size >= @cache_max_size
35+
send_bulk_data if @keys_size >= @max_bulk_size
3536

3637
true
3738
rescue StandardError => e
@@ -70,27 +71,73 @@ def add_or_update(feature_name, key)
7071
end
7172
end
7273

74+
def clear_cache
75+
uniques = @cache.clone
76+
keys_size = @keys_size
77+
@cache.clear
78+
@keys_size = 0
79+
80+
[uniques, keys_size]
81+
end
82+
7383
def send_bulk_data
7484
@semaphore.synchronize do
7585
return if @cache.empty?
7686

77-
uniques = @cache.clone
78-
@cache.clear
79-
80-
if uniques.size <= @max_bulk_size
87+
uniques, keys_size = clear_cache
88+
if keys_size <= @max_bulk_size
8189
@sender_adapter.record_uniques_key(uniques)
8290
return
83-
end
84-
85-
bulks = SplitIoClient::Utilities.split_bulk_to_send(uniques, uniques.size / @max_bulk_size)
8691

87-
bulks.each do |b|
88-
@sender_adapter.record_uniques_key(b)
8992
end
93+
bulks = flatten_bulks(uniques)
94+
bulks_to_post = group_bulks_by_max_size(bulks)
95+
@sender_adapter.record_uniques_key(bulks_to_post)
9096
end
9197
rescue StandardError => e
9298
@config.log_found_exception(__method__.to_s, e)
9399
end
100+
101+
def group_bulks_by_max_size(bulks)
102+
current_size = 0
103+
bulks_to_post = Concurrent::Hash.new
104+
bulks.each do |bulk|
105+
key, value = bulk.first
106+
if (value.size + current_size) > @max_bulk_size
107+
@sender_adapter.record_uniques_key(bulks_to_post)
108+
bulks_to_post = Concurrent::Hash.new
109+
current_size = 0
110+
end
111+
bulks_to_post[key] = value
112+
current_size += value.size
113+
end
114+
115+
bulks_to_post
116+
end
117+
118+
def flatten_bulks(uniques)
119+
bulks = []
120+
uniques.each_key do |unique_key|
121+
bulks += check_keys_and_split_to_bulks(uniques[unique_key], unique_key)
122+
end
123+
124+
bulks
125+
end
126+
127+
def check_keys_and_split_to_bulks(value, key)
128+
unique_updated = []
129+
if value.size > @max_bulk_size
130+
sub_bulks = SplitIoClient::Utilities.split_bulk_to_send(value, @max_bulk_size)
131+
sub_bulks.each do |sub_bulk|
132+
unique_updated << { key => sub_bulk.to_set }
133+
end
134+
return unique_updated
135+
136+
end
137+
unique_updated << { key => value }
138+
139+
unique_updated
140+
end
94141
end
95142
end
96143
end

lib/splitclient-rb/split_config.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ def initialize(opts = {})
112112
@telemetry_service_url = opts[:telemetry_service_url] || SplitConfig.default_telemetry_service_url
113113

114114
@unique_keys_refresh_rate = SplitConfig.default_unique_keys_refresh_rate(@cache_adapter)
115-
@unique_keys_cache_max_size = SplitConfig.default_unique_keys_cache_max_size
115+
# @unique_keys_cache_max_size = SplitConfig.default_unique_keys_cache_max_size
116116
@unique_keys_bulk_size = SplitConfig.default_unique_keys_bulk_size(@cache_adapter)
117117

118118
@counter_refresh_rate = SplitConfig.default_counter_refresh_rate(@cache_adapter)
@@ -292,7 +292,7 @@ def initialize(opts = {})
292292
attr_accessor :on_demand_fetch_max_retries
293293

294294
attr_accessor :unique_keys_refresh_rate
295-
attr_accessor :unique_keys_cache_max_size
295+
#attr_accessor :unique_keys_cache_max_size
296296
attr_accessor :unique_keys_bulk_size
297297

298298
attr_accessor :counter_refresh_rate
@@ -498,9 +498,9 @@ def self.default_unique_keys_refresh_rate(adapter)
498498
900
499499
end
500500

501-
def self.default_unique_keys_cache_max_size
502-
30000
503-
end
501+
# def self.default_unique_keys_cache_max_size
502+
# 30000
503+
# end
504504

505505
def self.default_unique_keys_bulk_size(adapter)
506506
return 2000 if adapter == :redis

lib/splitclient-rb/utilitites.rb

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,12 @@ def randomize_interval(interval)
3838
interval * random_factor
3939
end
4040

41-
def split_bulk_to_send(hash, divisions)
42-
count = 0
43-
44-
hash.each_with_object([]) do |key_value, final|
45-
final[count % divisions] ||= {}
46-
final[count % divisions][key_value[0]] = key_value[1]
47-
count += 1
48-
end
49-
rescue StandardError
50-
[]
41+
def split_bulk_to_send(items, divisions)
42+
to_return = []
43+
items.to_a.each_slice(divisions) {|bulk|
44+
to_return.push(bulk.to_set)
45+
}
46+
to_return
5147
end
5248
end
5349
end

lib/splitclient-rb/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module SplitIoClient
2-
VERSION = '8.7.0'
2+
VERSION = '8.8.0'
33
end

spec/engine/impressions/memory_unique_keys_tracker_spec.rb

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@
2121
it 'track - full cache and send bulk' do
2222
post_url = 'https://telemetry.split.io/api/v1/keys/ss'
2323
body_expect = {
24-
keys: [{ f: 'feature-test-0', ks: ['key_test-1', 'key_test-2'] }, { f: 'feature-test-1', ks: ['key_test-1'] }]
24+
keys: [{ f: 'feature-test-0', ks: ['key_test-1', 'key_test-2'] }]
25+
}.to_json
26+
27+
body_expect2 = {
28+
keys: [{ f: 'feature-test-1', ks: ['key_test-1', 'key_test-2'] }]
2529
}.to_json
2630

2731
stub_request(:post, post_url).with(body: body_expect).to_return(status: 200, body: '')
32+
stub_request(:post, post_url).with(body: body_expect2).to_return(status: 200, body: '')
2833

2934
cache = Concurrent::Hash.new
30-
config.unique_keys_cache_max_size = 2
3135
config.unique_keys_bulk_size = 2
3236
tracker = subject.new(config, filter_adapter, sender_adapter, cache)
3337

@@ -36,36 +40,61 @@
3640
expect(tracker.track("feature-test-#{i}", 'key_test-2')).to eq(true)
3741
end
3842

43+
expect(a_request(:post, post_url).with(body: body_expect2)).to have_been_made
3944
expect(a_request(:post, post_url).with(body: body_expect)).to have_been_made
4045

4146
cache.clear
4247
end
4348

44-
it 'track - full cache and send 2 bulks' do
49+
it 'track - full cache and send 4 bulks' do
4550
post_url = 'https://telemetry.split.io/api/v1/keys/ss'
4651
body_expect1 = {
47-
keys: [{ f: 'feature-test-0', ks: ['key-1', 'key-2'] }, { f: 'feature-test-2', ks: ['key-1', 'key-2'] }]
52+
keys: [{ f: 'feature-test-0', ks: ['key-1', 'key-2'] }]
4853
}.to_json
4954

5055
body_expect2 = {
51-
keys: [{ f: 'feature-test-1', ks: ['key-1', 'key-2'] }, { f: 'feature-test-3', ks: ['key-1'] }]
56+
keys: [{ f: 'feature-test-0', ks: ['key-3'] }, { f: 'feature-test-1', ks: ['key-1'] }]
57+
}.to_json
58+
59+
body_expect3 = {
60+
keys: [{ f: 'feature-test-1', ks: ['key-2', 'key-3'] }]
61+
}.to_json
62+
63+
body_expect4 = {
64+
keys: [{ f: 'feature-test-2', ks: ['key-1', 'key-2'] }]
65+
}.to_json
66+
67+
body_expect5 = {
68+
keys: [{ f: 'feature-test-2', ks: ['key-3'] }, { f: 'feature-test-3', ks: ['key-1'] }]
69+
}.to_json
70+
71+
body_expect6 = {
72+
keys: [{ f: 'feature-test-3', ks: ['key-2', 'key-3'] }]
5273
}.to_json
5374

5475
stub_request(:post, post_url).with(body: body_expect1).to_return(status: 200, body: '')
5576
stub_request(:post, post_url).with(body: body_expect2).to_return(status: 200, body: '')
77+
stub_request(:post, post_url).with(body: body_expect3).to_return(status: 200, body: '')
78+
stub_request(:post, post_url).with(body: body_expect4).to_return(status: 200, body: '')
79+
stub_request(:post, post_url).with(body: body_expect5).to_return(status: 200, body: '')
80+
stub_request(:post, post_url).with(body: body_expect6).to_return(status: 200, body: '')
5681

5782
cache = Concurrent::Hash.new
58-
config.unique_keys_cache_max_size = 4
5983
config.unique_keys_bulk_size = 2
6084
tracker = subject.new(config, filter_adapter, sender_adapter, cache)
6185

6286
4.times do |i|
6387
expect(tracker.track("feature-test-#{i}", 'key-1')).to eq(true)
6488
expect(tracker.track("feature-test-#{i}", 'key-2')).to eq(true)
89+
expect(tracker.track("feature-test-#{i}", 'key-3')).to eq(true)
6590
end
6691

6792
expect(a_request(:post, post_url).with(body: body_expect1)).to have_been_made
6893
expect(a_request(:post, post_url).with(body: body_expect2)).to have_been_made
94+
expect(a_request(:post, post_url).with(body: body_expect3)).to have_been_made
95+
expect(a_request(:post, post_url).with(body: body_expect4)).to have_been_made
96+
expect(a_request(:post, post_url).with(body: body_expect5)).to have_been_made
97+
expect(a_request(:post, post_url).with(body: body_expect6)).to have_been_made
6998

7099
cache.clear
71100
end
@@ -74,9 +103,8 @@
74103
context 'with sender_adapter_test' do
75104
let(:sender_adapter_test) { MemoryUniqueKeysSenderTest.new }
76105

77-
it 'track - should add elemets to cache' do
106+
it 'track - should trigger send when bulk size reached and add elements to cache' do
78107
cache = Concurrent::Hash.new
79-
config.unique_keys_cache_max_size = 5
80108
config.unique_keys_bulk_size = 5
81109
tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)
82110

@@ -85,24 +113,26 @@
85113
expect(tracker.track('feature_name_test', 'key_test-1')).to eq(true)
86114
expect(tracker.track('feature_name_test', 'key_test-2')).to eq(true)
87115
expect(tracker.track('other_test', 'key_test-2')).to eq(true)
88-
expect(tracker.track('other_test', 'key_test-35')).to eq(true)
89-
90116
expect(cache.size).to eq(2)
117+
expect(tracker.instance_variable_get(:@keys_size)).to eq(4)
118+
91119
expect(cache['feature_name_test'].include?('key_test')).to eq(true)
92120
expect(cache['feature_name_test'].include?('key_test-1')).to eq(true)
93121
expect(cache['feature_name_test'].include?('key_test-2')).to eq(true)
94122
expect(cache['feature_name_test'].include?('key_test-35')).to eq(false)
95123

96124
expect(cache['other_test'].include?('key_test-2')).to eq(true)
97-
expect(cache['other_test'].include?('key_test-35')).to eq(true)
98125
expect(cache['other_test'].include?('key_test-1')).to eq(false)
99126

127+
expect(tracker.track('other_test', 'key_test-35')).to eq(true)
128+
expect(cache.size).to eq(0)
129+
expect(tracker.instance_variable_get(:@keys_size)).to eq(0)
130+
100131
cache.clear
101132
end
102133

103134
it 'track - full cache and send bulk' do
104135
cache = Concurrent::Hash.new
105-
config.unique_keys_cache_max_size = 10
106136
config.unique_keys_bulk_size = 5
107137
tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)
108138

@@ -116,5 +146,26 @@
116146

117147
cache.clear
118148
end
149+
150+
it 'track - split chunks if above limit' do
151+
cache = Concurrent::Hash.new
152+
config.unique_keys_bulk_size = 1000
153+
tracker = subject.new(config, filter_adapter, sender_adapter_test, cache)
154+
155+
10.times { |i| expect(tracker.track("feature-test-#{i}", 'key_test')).to eq(true) }
156+
5.times { |i| expect(tracker.track("feature-test-1", "key_test-#{i}")).to eq(true) }
157+
158+
tracker.instance_variable_set(:@max_bulk_size, 5)
159+
tracker.send(:send_bulk_data)
160+
161+
result = sender_adapter_test.bulks
162+
expect(result[0].size).to eq(1)
163+
expect(result[1].size).to eq(1)
164+
expect(result[1]["feature-test-1"].size).to eq(5)
165+
expect(result[2].size).to eq(5)
166+
expect(result[3].size).to eq(4)
167+
168+
cache.clear
169+
end
119170
end
120171
end

spec/engine/impressions/redis_unique_keys_tracker_spec.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
key = "#{config.redis_namespace}.uniquekeys"
2727

2828
cache = Concurrent::Hash.new
29-
config.unique_keys_cache_max_size = 20
3029
config.unique_keys_bulk_size = 2
3130
tracker = subject.new(config, filter_adapter, sender_adapter, cache)
3231

spec/splitclient/utilities_spec.rb

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,14 @@
1919
end
2020

2121
it 'split bulk of data - split equally' do
22-
hash = {}
22+
items = Set['feature', 'feature-1', 'feature-2', 'feature-3', 'feature-4', 'feature-5', 'feature-6']
2323

24-
i = 1
25-
while i <= 6
26-
hash["mauro-#{i}"] = Set.new(['feature', 'feature-1'])
27-
i += 1
28-
end
29-
30-
result = SplitIoClient::Utilities.split_bulk_to_send(hash, 3)
24+
result = SplitIoClient::Utilities.split_bulk_to_send(items, 3)
3125

26+
puts result
3227
expect(result.size).to eq 3
33-
expect(result[0].size).to eq 2
34-
expect(result[1].size).to eq 2
35-
expect(result[2].size).to eq 2
36-
end
37-
38-
it 'split bulk of data - split in 4 bulks' do
39-
hash = {}
40-
41-
i = 1
42-
while i <= 6
43-
hash["mauro-#{i}"] = 'feature-test'
44-
i += 1
45-
end
46-
47-
result = SplitIoClient::Utilities.split_bulk_to_send(hash, 4)
48-
49-
expect(result.size).to eq 4
50-
expect(result[0].size).to eq 2
51-
expect(result[1].size).to eq 2
28+
expect(result[0].size).to eq 3
29+
expect(result[1].size).to eq 3
5230
expect(result[2].size).to eq 1
53-
expect(result[3].size).to eq 1
5431
end
5532
end

0 commit comments

Comments
 (0)