Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions storage/inmemory/mutexmap/rulebasedsegment.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,25 @@ func (r *RuleBasedSegmentsStorageImpl) GetRuleBasedSegmentByName(name string) (*
return nil, fmt.Errorf("rule-based segment %s not found in storage", name)
}

func (m *RuleBasedSegmentsStorageImpl) _get(splitName string) *dtos.RuleBasedSegmentDTO {
item, exists := m.data[splitName]
if !exists {
return nil
}
return &item
}

// FetchMany fetches rule-based segments in the storage and returns an array of rule-based segments dtos
func (m *RuleBasedSegmentsStorageImpl) FetchMany(rbsNames []string) map[string]*dtos.RuleBasedSegmentDTO {
m.mutex.RLock()
defer m.mutex.RUnlock()
rbSegments := make(map[string]*dtos.RuleBasedSegmentDTO)
for _, rbsName := range rbsNames {
rbSegments[rbsName] = m._get(rbsName)
}
return rbSegments
}

func (r *RuleBasedSegmentsStorageImpl) ReplaceAll(toAdd []dtos.RuleBasedSegmentDTO, changeNumber int64) error {
r.mutex.RLock()
toRemove := make([]dtos.RuleBasedSegmentDTO, 0)
Expand Down
7 changes: 7 additions & 0 deletions storage/inmemory/mutexmap/rulebasedsegment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ func TestRuleBasedSegmentsStorage(t *testing.T) {
assert.Contains(t, names, "rule1")
assert.Contains(t, names, "rule2")

// Test FetchMany
rbsFetchMany := storage.FetchMany([]string{"rule1", "rule2", "nonexistent"})
assert.Len(t, rbsFetchMany, 3)
assert.Equal(t, "rule1", rbsFetchMany["rule1"].Name)
assert.Equal(t, "rule2", rbsFetchMany["rule2"].Name)
assert.Nil(t, rbsFetchMany["nonexistent"])

// Test GetSegments
segments := storage.Segments()
assert.True(t, segments.Has("segment1"), "segment1 should be in segments")
Expand Down
1 change: 1 addition & 0 deletions storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ type RuleBasedSegmentStorageProducer interface {
type RuleBasedSegmentStorageConsumer interface {
ChangeNumber() (int64, error)
All() []dtos.RuleBasedSegmentDTO
FetchMany(rbsNames []string) map[string]*dtos.RuleBasedSegmentDTO
RuleBasedSegmentNames() ([]string, error)
Contains(ruleBasedSegmentNames []string) bool
Segments() *set.ThreadUnsafeSet
Expand Down
5 changes: 5 additions & 0 deletions storage/mocks/rulebasedsegment.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ func (m *MockRuleBasedSegmentStorage) LargeSegments() *set.ThreadUnsafeSet {
return args.Get(0).(*set.ThreadUnsafeSet)
}

func (m *MockRuleBasedSegmentStorage) FetchMany(rbsNames []string) map[string]*dtos.RuleBasedSegmentDTO {
args := m.Called(rbsNames)
return args.Get(0).(map[string]*dtos.RuleBasedSegmentDTO)
}

var _ storage.RuleBasedSegmentsStorage = (*MockRuleBasedSegmentStorage)(nil)
48 changes: 48 additions & 0 deletions storage/redis/rulebasedsegment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,54 @@ func TestReplaceAllRuleBased(t *testing.T) {
redisClient.Del(keys...)
}

func TestRBFetchMany(t *testing.T) {
t.Run("FetchMany Error", func(t *testing.T) {
expectedKey := "someprefix.SPLITIO.rbsegment.someRB1"
expectedKey2 := "someprefix.SPLITIO.rbsegment.someRB2"

mockedRedisClient := mocks.MockClient{
MGetCall: func(keys []string) redis.Result {
assert.ElementsMatch(t, []string{expectedKey, expectedKey2}, keys)
return &mocks.MockResultOutput{
MultiInterfaceCall: func() ([]interface{}, error) {
return []interface{}{}, errors.New("Some Error")
},
}
},
}
mockPrefixedClient, _ := redis.NewPrefixedRedisClient(&mockedRedisClient, "someprefix")
rbStorage := NewRuleBasedStorage(mockPrefixedClient, logging.NewLogger(&logging.LoggerOptions{}))
rbs := rbStorage.FetchMany([]string{"someRB1", "someRB2"})
assert.Nil(t, rbs)
})

t.Run("FetchMany Success", func(t *testing.T) {
expectedKey := "someprefix.SPLITIO.rbsegment.someRB1"
expectedKey2 := "someprefix.SPLITIO.rbsegment.someRB2"

mockedRedisClient := mocks.MockClient{
MGetCall: func(keys []string) redis.Result {
assert.ElementsMatch(t, []string{expectedKey, expectedKey2}, keys)
return &mocks.MockResultOutput{
MultiInterfaceCall: func() ([]interface{}, error) {
return []interface{}{
marshalRuleBasedSegment(createSampleRBSegment("someRB1")),
marshalRuleBasedSegment(createSampleRBSegment("someRB2")),
}, nil
},
}
},
}
mockPrefixedClient, _ := redis.NewPrefixedRedisClient(&mockedRedisClient, "someprefix")

rbStorage := NewRuleBasedStorage(mockPrefixedClient, logging.NewLogger(&logging.LoggerOptions{}))
rbs := rbStorage.FetchMany([]string{"someRB1", "someRB2"})
assert.Equal(t, 2, len(rbs))
assert.NotNil(t, rbs["someRB1"])
assert.NotNil(t, rbs["someRB2"])
})
}

func marshalRuleBasedSegment(rbSegment dtos.RuleBasedSegmentDTO) string {
json, _ := json.Marshal(rbSegment)
return string(json)
Expand Down
32 changes: 32 additions & 0 deletions storage/redis/rulebasedsegments.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,36 @@ func (r *RuleBasedSegmentStorage) executePipeline(pipeline redis.Pipeline, toAdd
return failedToAdd, failedToRemove
}

func (r *RuleBasedSegmentStorage) FetchMany(names []string) map[string]*dtos.RuleBasedSegmentDTO {
if len(names) == 0 {
return nil
}

keysToFetch := make([]string, 0, len(names))
for _, name := range names {
keysToFetch = append(keysToFetch, strings.Replace(KeyRuleBasedSegment, "{rbsegment}", name, 1))
}
rawRBS, err := r.client.MGet(keysToFetch)
if err != nil {
r.logger.Error(fmt.Sprintf("Could not fetch rule-based segments from redis: %s", err.Error()))
return nil
}

rbs := make(map[string]*dtos.RuleBasedSegmentDTO)
for idx, rb := range names {
var rbSegment *dtos.RuleBasedSegmentDTO
rawRBSegment, ok := rawRBS[idx].(string)
if ok {
err = json.Unmarshal([]byte(rawRBSegment), &rbSegment)
if err != nil {
r.logger.Error("Could not parse rule-based segment \"%s\" fetched from redis", rb)
return nil
}
}
rbs[rb] = rbSegment
}

return rbs
}

var _ storage.RuleBasedSegmentsStorage = (*RuleBasedSegmentStorage)(nil)
30 changes: 15 additions & 15 deletions storage/redis/splits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,20 +562,20 @@ func TestUpdateRedis(t *testing.T) {
if len(splits) != 3 {
t.Error("Unexpected amount of splits")
}
set1, err := redisClient.SMembers("SPLITIO.flagSet.set1")
set1, _ := redisClient.SMembers("SPLITIO.flagSet.set1")
if len(set1) != 2 {
t.Error("set size should be 2")
}
if !slices.Contains(set1, "split1") || !slices.Contains(set1, "split2") {
t.Error("Split missing in set")
}
tt, err := redisClient.Get("SPLITIO.trafficType.user")
ttCount, _ := strconv.ParseFloat(tt, 10)
tt, _ := redisClient.Get("SPLITIO.trafficType.user")
ttCount, _ := strconv.ParseFloat(tt, 64)
if ttCount != 3 {
t.Error("Split should exist")
}
till, err := redisClient.Get("SPLITIO.splits.till")
tillInt, _ := strconv.ParseFloat(till, 10)
till, _ := redisClient.Get("SPLITIO.splits.till")
tillInt, _ := strconv.ParseFloat(till, 64)
if tillInt != 1 {
t.Error("ChangeNumber should be 1")
}
Expand All @@ -587,29 +587,29 @@ func TestUpdateRedis(t *testing.T) {
if len(splits) != 3 {
t.Error("Unexpected size")
}
set1, err = redisClient.SMembers("SPLITIO.flagSet.set1")
set1, _ = redisClient.SMembers("SPLITIO.flagSet.set1")
if len(set1) != 0 {
t.Error("set size should be 0")
}
set3, err := redisClient.SMembers("SPLITIO.flagSet.set3")
set3, _ := redisClient.SMembers("SPLITIO.flagSet.set3")
if len(set3) != 3 {
t.Error("set size should be 3")
}
if !slices.Contains(set3, "split3") || !slices.Contains(set3, "split4") || !slices.Contains(set3, "split5") {
t.Error("Split missing in set")
}
tt, err = redisClient.Get("SPLITIO.trafficType.user")
ttCount, _ = strconv.ParseFloat(tt, 10)
tt, _ = redisClient.Get("SPLITIO.trafficType.user")
ttCount, _ = strconv.ParseFloat(tt, 64)
if ttCount != 3 {
t.Error("Unexpected trafficType occurrences")
}

split1, err := redisClient.Get("SPLITIO.split.split1")
split1, _ := redisClient.Get("SPLITIO.split.split1")
if split1 != "" {
t.Error("Split should not exist")
}
till, err = redisClient.Get("SPLITIO.splits.till")
tillInt, _ = strconv.ParseFloat(till, 10)
till, _ = redisClient.Get("SPLITIO.splits.till")
tillInt, _ = strconv.ParseFloat(till, 64)
if tillInt != 2 {
t.Error("ChangeNumber should be 2")
}
Expand Down Expand Up @@ -649,15 +649,15 @@ func TestUpdateWithFlagSetFiltersRedis(t *testing.T) {
if len(splits) != 3 {
t.Error("Unexpected amount of splits")
}
set1, err := redisClient.SMembers("SPLITIO.flagSet.set1")
set1, _ := redisClient.SMembers("SPLITIO.flagSet.set1")
if len(set1) != 2 {
t.Error("set size should be 2")
}
set2, err := redisClient.SMembers("SPLITIO.flagSet.set2")
set2, _ := redisClient.SMembers("SPLITIO.flagSet.set2")
if len(set2) != 1 {
t.Error("set size should be 1")
}
set3, err := redisClient.SMembers("SPLITIO.flagSet.set3")
set3, _ := redisClient.SMembers("SPLITIO.flagSet.set3")
if len(set3) != 0 {
t.Error("set size should be 0")
}
Expand Down
2 changes: 1 addition & 1 deletion synchronizer/worker/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *UpdaterImpl) attemptLatestSync() (*UpdateResult, error) {
currentRBSince = splitChanges.RBTill()
s.runtimeTelemetry.RecordSyncLatency(telemetry.SplitSync, time.Since(before))
s.splitStorage.ReplaceAll(splitChanges.FeatureFlags(), currentSince)
s.ruleBasedSegmentStorage.ReplaceAll(splitChanges.RuleBasedSegments(), currentSince)
s.ruleBasedSegmentStorage.ReplaceAll(splitChanges.RuleBasedSegments(), currentRBSince)
segmentReferences := s.getSegmentsFromRuleBasedSegments(splitChanges.RuleBasedSegments())
segmentReferences = appendSegmentNames(segmentReferences, splitChanges.FeatureFlags())
updatedSplitNames = appendSplitNames(updatedSplitNames, splitChanges.FeatureFlags())
Expand Down
Loading