Skip to content

Commit d1426ab

Browse files
authored
Merge pull request #205 from splitio/fix/uniqui-keys-size
Fix Unique keys size
2 parents 85cc48c + 62906f3 commit d1426ab

25 files changed

+313
-110
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535

3636
- name: SonarQube Scan (Push)
3737
if: ${{ github.event_name == 'push' }}
38-
uses: SonarSource/sonarcloud-github-action@v4.0.0
38+
uses: SonarSource/sonarcloud-github-action@v5.0.0
3939
env:
4040
SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }}
4141
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -46,7 +46,7 @@ jobs:
4646
4747
- name: SonarQube Scan (Pull Request)
4848
if: ${{ github.event_name == 'pull_request' }}
49-
uses: SonarSource/sonarcloud-github-action@v4.0.0
49+
uses: SonarSource/sonarcloud-github-action@v5.0.0
5050
env:
5151
SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }}
5252
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

conf/conf.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ type AdvancedConfig struct {
7777
EventsQueueSize int
7878
ImpressionsQueueSize int
7979
ImpressionsBulkSize int64
80+
UniqueKeysQueueSize int64
81+
UniqueKeysBulkSize int64
8082
StreamingEnabled bool
8183
AuthServiceURL string
8284
StreamingServiceURL string

dtos/telemetry.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ type Stats struct {
143143

144144
// Key struct
145145
type Key struct {
146-
Feature string `json:"f,omitempty"`
147-
Keys []string `json:"ks,omitempty"`
146+
Feature string `json:"f,omitempty"`
147+
Keys []interface{} `json:"ks,omitempty"`
148148
}
149149

150150
// Uniques struct

provisional/impmanager_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
"github.com/splitio/go-split-commons/v6/provisional/strategy"
99
"github.com/splitio/go-split-commons/v6/storage/filter"
1010
"github.com/splitio/go-split-commons/v6/storage/inmemory"
11+
"github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue"
1112
"github.com/splitio/go-split-commons/v6/telemetry"
13+
"github.com/splitio/go-toolkit/v5/logging"
1214
)
1315

1416
func TestImpManagerInMemoryDebugListenerDisabled(t *testing.T) {
@@ -122,7 +124,8 @@ func TestImpManagerInMemoryOptimized(t *testing.T) {
122124
func TestImpManagerInMemoryNone(t *testing.T) {
123125
counter := strategy.NewImpressionsCounter()
124126
filter := filter.NewBloomFilter(3000, 0.01)
125-
uniqueTracker := strategy.NewUniqueKeysTracker(filter)
127+
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil))
128+
uniqueTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage)
126129
none := strategy.NewNoneImpl(counter, uniqueTracker, true)
127130
impManager := NewImpressionManager(none)
128131

@@ -190,7 +193,8 @@ func TestProcess(t *testing.T) {
190193
observer, _ := strategy.NewImpressionObserver(5000)
191194
debug := strategy.NewDebugImpl(observer, true)
192195
filter := filter.NewBloomFilter(3000, 0.01)
193-
uniqueTracker := strategy.NewUniqueKeysTracker(filter)
196+
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil))
197+
uniqueTracker := strategy.NewUniqueKeysTracker(filter, uniqueKeysStorage)
194198
counter := strategy.NewImpressionsCounter()
195199
none := strategy.NewNoneImpl(counter, uniqueTracker, false)
196200

provisional/strategy/none_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ import (
66

77
"github.com/splitio/go-split-commons/v6/dtos"
88
"github.com/splitio/go-split-commons/v6/storage/filter"
9+
"github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue"
910
"github.com/splitio/go-split-commons/v6/util"
11+
"github.com/splitio/go-toolkit/v5/logging"
1012
)
1113

1214
func TestNoneMode(t *testing.T) {
1315
now := time.Now().UTC().UnixNano()
1416
filter := filter.NewBloomFilter(1000, 0.01)
15-
tracker := NewUniqueKeysTracker(filter)
17+
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil))
18+
tracker := NewUniqueKeysTracker(filter, uniqueKeysStorage)
1619
counter := NewImpressionsCounter()
1720
none := NewNoneImpl(counter, tracker, true)
1821

@@ -52,7 +55,8 @@ func TestNoneMode(t *testing.T) {
5255
func TestApplySingleNone(t *testing.T) {
5356
now := time.Now().UTC().UnixNano()
5457
filter := filter.NewBloomFilter(1000, 0.01)
55-
tracker := NewUniqueKeysTracker(filter)
58+
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil))
59+
tracker := NewUniqueKeysTracker(filter, uniqueKeysStorage)
5660
counter := NewImpressionsCounter()
5761
none := NewNoneImpl(counter, tracker, true)
5862

Lines changed: 6 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,25 @@
11
package strategy
22

33
import (
4-
"sync"
5-
6-
"github.com/splitio/go-split-commons/v6/dtos"
74
"github.com/splitio/go-split-commons/v6/storage"
8-
"github.com/splitio/go-toolkit/v5/datastructures/set"
95
)
106

117
// UniqueKeysTracker interface
128
type UniqueKeysTracker interface {
139
Track(featureName string, key string) bool
14-
PopAll() dtos.Uniques
1510
}
1611

1712
// UniqueKeysTrackerImpl description
1813
type UniqueKeysTrackerImpl struct {
19-
filter storage.Filter
20-
cache map[string]*set.ThreadUnsafeSet
21-
mutex *sync.RWMutex
14+
filter storage.Filter
15+
storage storage.UniqueKeysStorageProducer
2216
}
2317

2418
// NewUniqueKeysTracker create new implementation
25-
func NewUniqueKeysTracker(f storage.Filter) UniqueKeysTracker {
19+
func NewUniqueKeysTracker(f storage.Filter, storage storage.UniqueKeysStorageProducer) UniqueKeysTracker {
2620
return &UniqueKeysTrackerImpl{
27-
filter: f,
28-
cache: make(map[string]*set.ThreadUnsafeSet),
29-
mutex: &sync.RWMutex{},
21+
filter: f,
22+
storage: storage,
3023
}
3124
}
3225

@@ -37,49 +30,8 @@ func (t *UniqueKeysTrackerImpl) Track(featureName string, key string) bool {
3730
return false
3831
}
3932

40-
t.mutex.Lock()
41-
defer t.mutex.Unlock()
42-
4333
t.filter.Add(fKey)
44-
_, ok := t.cache[featureName]
45-
if !ok {
46-
t.cache[featureName] = set.NewSet()
47-
}
48-
49-
t.cache[featureName].Add(key)
34+
t.storage.Push(featureName, key)
5035

5136
return true
5237
}
53-
54-
// PopAll returns all the elements stored in the cache and resets the cache
55-
func (t *UniqueKeysTrackerImpl) PopAll() dtos.Uniques {
56-
t.mutex.Lock()
57-
defer t.mutex.Unlock()
58-
toReturn := t.cache
59-
t.cache = make(map[string]*set.ThreadUnsafeSet)
60-
61-
return getUniqueKeysDto(toReturn)
62-
}
63-
64-
func getUniqueKeysDto(uniques map[string]*set.ThreadUnsafeSet) dtos.Uniques {
65-
uniqueKeys := dtos.Uniques{
66-
Keys: make([]dtos.Key, 0, len(uniques)),
67-
}
68-
69-
for name, keys := range uniques {
70-
list := keys.List()
71-
keysDto := make([]string, 0, len(list))
72-
73-
for _, value := range list {
74-
keysDto = append(keysDto, value.(string))
75-
}
76-
keyDto := dtos.Key{
77-
Feature: name,
78-
Keys: keysDto,
79-
}
80-
81-
uniqueKeys.Keys = append(uniqueKeys.Keys, keyDto)
82-
}
83-
84-
return uniqueKeys
85-
}

provisional/strategy/uniquekeystracker_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ import (
55
"testing"
66

77
"github.com/splitio/go-split-commons/v6/storage/filter"
8+
"github.com/splitio/go-split-commons/v6/storage/inmemory/mutexqueue"
9+
"github.com/splitio/go-toolkit/v5/logging"
810
)
911

10-
func Test(t *testing.T) {
12+
func TestUniqueKeysTracker(t *testing.T) {
1113
bf := filter.NewBloomFilter(10000, 0.01)
12-
13-
tracker := NewUniqueKeysTracker(bf)
14+
uniqueKeysStorage := mutexqueue.NewMQUniqueKeysStorage(100, make(chan string), logging.NewLogger(nil))
15+
tracker := NewUniqueKeysTracker(bf, uniqueKeysStorage)
1416

1517
for i := 0; i < 10; i++ {
1618
if !tracker.Track("feature-1", "key-"+fmt.Sprint(i)) {

service/api/http_recorders_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,11 @@ func TestJsonUniqueKeys(t *testing.T) {
237237
Keys: []dtos.Key{
238238
{
239239
Feature: "feature-1",
240-
Keys: []string{"key-1", "key-2"},
240+
Keys: []interface{}{"key-1", "key-2"},
241241
},
242242
{
243243
Feature: "feature-2",
244-
Keys: []string{"key-10", "key-20"},
244+
Keys: []interface{}{"key-10", "key-20"},
245245
},
246246
},
247247
}
@@ -317,11 +317,11 @@ func TestPostUniqueKeys(t *testing.T) {
317317
keys := []dtos.Key{
318318
{
319319
Feature: "feature-1",
320-
Keys: []string{"key-1", "key-2"},
320+
Keys: []interface{}{"key-1", "key-2"},
321321
},
322322
{
323323
Feature: "feature-2",
324-
Keys: []string{"key-3", "key-4"},
324+
Keys: []interface{}{"key-3", "key-4"},
325325
},
326326
}
327327
err := telemetryRecorder.RecordUniqueKeys(dtos.Uniques{
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package mutexqueue
2+
3+
import (
4+
"container/list"
5+
"sync"
6+
7+
"github.com/splitio/go-split-commons/v6/dtos"
8+
"github.com/splitio/go-split-commons/v6/storage"
9+
"github.com/splitio/go-toolkit/v5/datastructures/set"
10+
"github.com/splitio/go-toolkit/v5/logging"
11+
)
12+
13+
type UniqueKeyWrapper struct {
14+
featureName string
15+
key string
16+
}
17+
18+
type MQUniqueKeysStorage struct {
19+
queue *list.List
20+
maxSize int64
21+
mutexQueue *sync.Mutex
22+
fullChan chan string //only write channel
23+
logger logging.LoggerInterface
24+
}
25+
26+
func NewMQUniqueKeysStorage(maxSize int64, isFull chan string, logger logging.LoggerInterface) *MQUniqueKeysStorage {
27+
return &MQUniqueKeysStorage{
28+
queue: list.New(),
29+
maxSize: maxSize,
30+
mutexQueue: &sync.Mutex{},
31+
fullChan: isFull,
32+
logger: logger,
33+
}
34+
}
35+
36+
func (s *MQUniqueKeysStorage) Push(featureName string, key string) {
37+
s.mutexQueue.Lock()
38+
defer s.mutexQueue.Unlock()
39+
40+
s.queue.PushBack(UniqueKeyWrapper{featureName: featureName, key: key})
41+
if s.queue.Len() == int(s.maxSize) {
42+
s.sendSignalIsFull()
43+
}
44+
}
45+
46+
func (s *MQUniqueKeysStorage) PopN(n int64) dtos.Uniques {
47+
s.mutexQueue.Lock()
48+
defer s.mutexQueue.Unlock()
49+
50+
var totalItems int
51+
if int64(s.queue.Len()) >= n {
52+
totalItems = int(n)
53+
} else {
54+
totalItems = s.queue.Len()
55+
}
56+
57+
uniques := make(map[string]*set.ThreadUnsafeSet)
58+
for i := 0; i < totalItems; i++ {
59+
item, ok := s.queue.Remove(s.queue.Front()).(UniqueKeyWrapper)
60+
if !ok {
61+
continue
62+
}
63+
64+
_, exists := uniques[item.featureName]
65+
if !exists {
66+
uniques[item.featureName] = set.NewSet()
67+
}
68+
69+
uniques[item.featureName].Add(item.key)
70+
}
71+
72+
return getUniqueKeysDto(uniques)
73+
}
74+
75+
func (s *MQUniqueKeysStorage) sendSignalIsFull() {
76+
// Nom blocking select
77+
select {
78+
case s.fullChan <- "UNIQUE_KEYS_FULL":
79+
// Send "queue is full" signal
80+
break
81+
default:
82+
s.logger.Debug("Some error occurred on sending signal for unique keys")
83+
}
84+
}
85+
86+
func getUniqueKeysDto(uniques map[string]*set.ThreadUnsafeSet) dtos.Uniques {
87+
keysToReturn := make([]dtos.Key, 0)
88+
89+
for name, keys := range uniques {
90+
keyDto := dtos.Key{
91+
Feature: name,
92+
Keys: keys.List(),
93+
}
94+
95+
keysToReturn = append(keysToReturn, keyDto)
96+
}
97+
98+
return dtos.Uniques{
99+
Keys: keysToReturn,
100+
}
101+
}
102+
103+
var _ storage.UniqueKeysStorageConsumer = (*MQUniqueKeysStorage)(nil)
104+
var _ storage.UniqueKeysStorageProducer = (*MQUniqueKeysStorage)(nil)

0 commit comments

Comments
 (0)