From 5052ca5b73a12a83be954aa4e6570b3c77784cce Mon Sep 17 00:00:00 2001 From: Yasir Ali Date: Wed, 30 Mar 2022 14:42:44 +0500 Subject: [PATCH 1/4] Initial commit. --- config.yaml | 6 ++ config/config.go | 13 +++++ go.mod | 1 + go.sum | 2 + pkg/optimizely/cache.go | 55 ++++++++++++++----- pkg/optimizely/cache_test.go | 39 ++++++++----- plugins/userprofileservice/registry.go | 8 +++ .../userprofileservice/services/redis_ups.go | 27 +++++++-- .../services/redis_ups_test.go | 11 +++- 9 files changed, 131 insertions(+), 31 deletions(-) diff --git a/config.yaml b/config.yaml index c508a33f..f35bbfef 100644 --- a/config.yaml +++ b/config.yaml @@ -129,6 +129,12 @@ client: flushInterval: 30s ## Template URL for SDK datafile location. The template should specify a "%s" token for SDK key substitution. datafileURLTemplate: "https://cdn.optimizely.com/datafiles/%s.json" + ## Datafile cache to save initial datafile fetch call in a multi-node environment. + datafileCache: + # redis: + # host: "localhost:6379" + # password: "" + # database: 0 ## URL for dispatching events. eventURL: "https://logx.optimizely.com/v1/events" ## Validation Regex on the request SDK Key diff --git a/config/config.go b/config/config.go index ae2a49df..42fa53ba 100644 --- a/config/config.go +++ b/config/config.go @@ -160,11 +160,24 @@ type ClientConfig struct { QueueSize int `json:"queueSize" default:"1000"` FlushInterval time.Duration `json:"flushInterval" default:"30s"` DatafileURLTemplate string `json:"datafileURLTemplate"` + DatafileCache DatafileCache `json:"datafileCache"` EventURL string `json:"eventURL"` SdkKeyRegex string `json:"sdkKeyRegex"` UserProfileService UserProfileServiceConfigs `json:"userProfileService"` } +// DatafileCache holds the configuration options for the Datafile Cache. +type DatafileCache struct { + RedisCache RedisDatafileCache `json:"redis"` +} + +// RedisDatafileCache holds the configuration options for the Redis Datafile Cache. +type RedisDatafileCache struct { + Address string `json:"host"` + Password string `json:"password"` + Database int `json:"database"` +} + // LogConfig holds the log configuration type LogConfig struct { Pretty bool `json:"pretty"` diff --git a/go.mod b/go.mod index 8e31b227..ac8fb18a 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.13 require ( github.com/VividCortex/gohistogram v1.0.0 // indirect + github.com/bsm/redislock v0.7.2 github.com/go-chi/chi v4.1.1+incompatible github.com/go-chi/cors v1.1.1 github.com/go-chi/httplog v0.1.6 diff --git a/go.sum b/go.sum index 6376afdf..c341a5a9 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/bsm/redislock v0.7.2 h1:jggqOio8JyX9FJBKIfjF3fTxAu/v7zC5mAID9LveqG4= +github.com/bsm/redislock v0.7.2/go.mod h1:kS2g0Yvlymc9Dz8V3iVYAtLAaSVruYbAFdYBDrmC5WU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index 01f46cb5..d8156ea3 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -25,6 +25,7 @@ import ( "strings" "sync" + "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" "github.com/optimizely/agent/plugins/userprofileservice" "github.com/optimizely/go-sdk/pkg/client" @@ -58,7 +59,7 @@ func NewCache(ctx context.Context, conf config.ClientConfig, metricsRegistry *Me cache := &OptlyCache{ ctx: ctx, wg: sync.WaitGroup{}, - loader: defaultLoader(conf, metricsRegistry, userProfileServiceMap, cmLoader, event.NewBatchEventProcessor), + loader: defaultLoader(ctx, conf, metricsRegistry, userProfileServiceMap, cmLoader, event.NewBatchEventProcessor), optlyMap: cmap.New(), userProfileServiceMap: userProfileServiceMap, } @@ -147,6 +148,7 @@ func regexValidator(sdkKeyRegex string) func(string) bool { } func defaultLoader( + ctx context.Context, conf config.ClientConfig, metricsRegistry *MetricsRegistry, userProfileServiceMap cmap.ConcurrentMap, @@ -186,25 +188,48 @@ func defaultLoader( log.Info().Msg(message) } + // Options for PollingProjectConfigManager + options := []sdkconfig.OptionFunc{} + // Check if datafile is already present in redis cache + var redisClient *redis.Client + var isDatafileCached bool + + if conf.DatafileCache.RedisCache.Address != "" { + redisCacheConfig := conf.DatafileCache.RedisCache + redisClient = redis.NewClient(&redis.Options{ + Addr: redisCacheConfig.Address, + Password: redisCacheConfig.Password, + DB: redisCacheConfig.Database, + }) + if datafile, err := redisClient.Get(ctx, sdkKey).Result(); err != nil && datafile != "" { + // Set datafile in config manager so it uses the cached datafile for initialization + options = append(options, sdkconfig.WithInitialDatafile([]byte(datafile))) + isDatafileCached = true + } + } + + options = append(options, + sdkconfig.WithPollingInterval(conf.PollingInterval), + sdkconfig.WithDatafileURLTemplate(conf.DatafileURLTemplate), + ) + if datafileAccessToken != "" { - configManager = pcFactory( - sdkKey, - sdkconfig.WithPollingInterval(conf.PollingInterval), - sdkconfig.WithDatafileURLTemplate(conf.DatafileURLTemplate), + options = append(options, sdkconfig.WithDatafileAccessToken(datafileAccessToken), ) - } else { - configManager = pcFactory( - sdkKey, - sdkconfig.WithPollingInterval(conf.PollingInterval), - sdkconfig.WithDatafileURLTemplate(conf.DatafileURLTemplate), - ) } + configManager = pcFactory(sdkKey, options...) + if _, err := configManager.GetConfig(); err != nil { return &OptlyClient{}, err } + if redisClient != nil && !isDatafileCached { + // Need to use redis lock here + redisClient.Set(ctx, sdkKey, configManager.GetOptimizelyConfig().GetDatafile(), 0) + } + q := event.NewInMemoryQueue(conf.QueueSize) ep := bpFactory( event.WithSDKKey(sdkKey), @@ -226,7 +251,7 @@ func defaultLoader( } var clientUserProfileService decision.UserProfileService - if clientUserProfileService = getUserProfileService(sdkKey, userProfileServiceMap, conf); clientUserProfileService != nil { + if clientUserProfileService = getUserProfileService(ctx, sdkKey, userProfileServiceMap, conf); clientUserProfileService != nil { clientOptions = append(clientOptions, client.WithUserProfileService(clientUserProfileService)) } @@ -238,7 +263,7 @@ func defaultLoader( } // Returns the registered userProfileService against the sdkKey -func getUserProfileService(sdkKey string, userProfileServiceMap cmap.ConcurrentMap, conf config.ClientConfig) decision.UserProfileService { +func getUserProfileService(ctx context.Context, sdkKey string, userProfileServiceMap cmap.ConcurrentMap, conf config.ClientConfig) decision.UserProfileService { intializeUPSWithName := func(upsName string) decision.UserProfileService { if clientConfigUPSMap, ok := conf.UserProfileService["services"].(map[string]interface{}); ok { @@ -256,6 +281,10 @@ func getUserProfileService(sdkKey string, userProfileServiceMap cmap.ConcurrentM success = false } if success { + // Pass context to ups if required, necessary for redis + if ctxUps, ok := upsInstance.(userprofileservice.ContextUserProfileService); ok { + ctxUps.AddContext(ctx) + } log.Info().Msgf(`UserProfileService of type: "%s" created for sdkKey: "%s"`, upsName, sdkKey) return upsInstance } diff --git a/pkg/optimizely/cache_test.go b/pkg/optimizely/cache_test.go index 398d644c..99757915 100644 --- a/pkg/optimizely/cache_test.go +++ b/pkg/optimizely/cache_test.go @@ -136,7 +136,7 @@ func (suite *CacheTestSuite) TestGetUserProfileServiceJSONErrorCases() { // json unmarshal error case suite.cache.SetUserProfileService("one", "in-memory") - userProfileService := getUserProfileService("one", suite.cache.userProfileServiceMap, conf) + userProfileService := getUserProfileService(suite.cache.ctx, "one", suite.cache.userProfileServiceMap, conf) suite.Nil(userProfileService) // json marshal error case @@ -145,7 +145,7 @@ func (suite *CacheTestSuite) TestGetUserProfileServiceJSONErrorCases() { "capacity": make(chan int), }}, } - userProfileService = getUserProfileService("one", suite.cache.userProfileServiceMap, conf) + userProfileService = getUserProfileService(suite.cache.ctx, "one", suite.cache.userProfileServiceMap, conf) suite.Nil(userProfileService) } @@ -154,7 +154,7 @@ func (suite *CacheTestSuite) TestNoUserProfileServicesProvidedInConfig() { UserProfileService: map[string]interface{}{}, } suite.cache.SetUserProfileService("one", "in-memory") - userProfileService := getUserProfileService("one", suite.cache.userProfileServiceMap, conf) + userProfileService := getUserProfileService(suite.cache.ctx, "one", suite.cache.userProfileServiceMap, conf) suite.Nil(userProfileService) } @@ -168,7 +168,7 @@ func (suite *CacheTestSuite) TestUPSForSDKKeyNotProvidedInConfig() { }, } suite.cache.SetUserProfileService("one", "dummy") - userProfileService := getUserProfileService("one", suite.cache.userProfileServiceMap, conf) + userProfileService := getUserProfileService(suite.cache.ctx, "one", suite.cache.userProfileServiceMap, conf) suite.Nil(userProfileService) } @@ -182,7 +182,7 @@ func (suite *CacheTestSuite) TestNoCreatorAddedforUPS() { }, } suite.cache.SetUserProfileService("one", "dummy") - userProfileService := getUserProfileService("one", suite.cache.userProfileServiceMap, conf) + userProfileService := getUserProfileService(suite.cache.ctx, "one", suite.cache.userProfileServiceMap, conf) suite.Nil(userProfileService) } @@ -201,7 +201,7 @@ func (suite *CacheTestSuite) TestNilCreatorAddedforUPS() { }, } suite.cache.SetUserProfileService("one", "dummy") - userProfileService := getUserProfileService("one", suite.cache.userProfileServiceMap, conf) + userProfileService := getUserProfileService(suite.cache.ctx, "one", suite.cache.userProfileServiceMap, conf) suite.Nil(userProfileService) } @@ -245,11 +245,16 @@ type DefaultLoaderTestSuite struct { registry *MetricsRegistry bp *event.BatchEventProcessor upsMap cmap.ConcurrentMap + ctx context.Context + cancel func() bpFactory func(options ...event.BPOptionConfig) *event.BatchEventProcessor pcFactory func(sdkKey string, options ...sdkconfig.OptionFunc) SyncedConfigManager } func (s *DefaultLoaderTestSuite) SetupTest() { + ctx, cancel := context.WithCancel(context.Background()) + s.ctx = ctx + s.cancel = cancel // Need the registry to be created only once since it panics if we create gauges with the same name again and again doOnce.Do(func() { s.registry = &MetricsRegistry{metrics.NewRegistry()} @@ -266,6 +271,10 @@ func (s *DefaultLoaderTestSuite) SetupTest() { } } +func (s *DefaultLoaderTestSuite) TearDownTest() { + s.cancel() +} + func (s *DefaultLoaderTestSuite) TestDefaultLoader() { conf := config.ClientConfig{ FlushInterval: 321 * time.Second, @@ -281,7 +290,7 @@ func (s *DefaultLoaderTestSuite) TestDefaultLoader() { }, } - loader := defaultLoader(conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) + loader := defaultLoader(s.ctx, conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) client, err := loader("sdkkey") s.NoError(err) @@ -318,7 +327,7 @@ func (s *DefaultLoaderTestSuite) TestUPSHeaderOverridesDefaultKey() { tmpUPSMap := cmap.New() tmpUPSMap.Set("sdkkey", "in-memory") - loader := defaultLoader(conf, s.registry, tmpUPSMap, s.pcFactory, s.bpFactory) + loader := defaultLoader(s.ctx, conf, s.registry, tmpUPSMap, s.pcFactory, s.bpFactory) client, err := loader("sdkkey") s.NoError(err) @@ -358,7 +367,7 @@ func (s *DefaultLoaderTestSuite) TestFirstSaveConfiguresClientForRedisUPS() { }, }}, } - loader := defaultLoader(conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) + loader := defaultLoader(s.ctx, conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) client, err := loader("sdkkey") s.NoError(err) s.NotNil(client.UserProfileService) @@ -377,6 +386,10 @@ func (s *DefaultLoaderTestSuite) TestFirstSaveConfiguresClientForRedisUPS() { s.Equal("10", testRedisUPS.Password) s.Equal(1, testRedisUPS.Database) + // Check if the original context was passed to redis + s.NotNil(testRedisUPS.Ctx) + s.Equal(s.ctx, testRedisUPS.Ctx) + // Check if redis client was instantiated with updated config s.NotNil(testRedisUPS.Client) s.Equal(testRedisUPS.Address, testRedisUPS.Client.Options().Addr) @@ -394,7 +407,7 @@ func (s *DefaultLoaderTestSuite) TestHttpClientInitializesByDefaultRestUPS() { "rest": map[string]interface{}{}, }}, } - loader := defaultLoader(conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) + loader := defaultLoader(s.ctx, conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) client, err := loader("sdkkey") s.NoError(err) s.NotNil(client.UserProfileService) @@ -422,7 +435,7 @@ func (s *DefaultLoaderTestSuite) TestLoaderWithValidUserProfileServices() { }, }}, } - loader := defaultLoader(conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) + loader := defaultLoader(s.ctx, conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) client, err := loader("sdkkey") s.NoError(err) @@ -445,7 +458,7 @@ func (s *DefaultLoaderTestSuite) TestLoaderWithEmptyUserProfileServices() { conf := config.ClientConfig{ UserProfileService: map[string]interface{}{}, } - loader := defaultLoader(conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) + loader := defaultLoader(s.ctx, conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) client, err := loader("sdkkey") s.NoError(err) @@ -463,7 +476,7 @@ func (s *DefaultLoaderTestSuite) TestLoaderWithNoDefaultUserProfileServices() { "mock3": map[string]interface{}{}, }}, } - loader := defaultLoader(conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) + loader := defaultLoader(s.ctx, conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) client, err := loader("sdkkey") s.NoError(err) s.Nil(client.UserProfileService) diff --git a/plugins/userprofileservice/registry.go b/plugins/userprofileservice/registry.go index c7158298..3cf73c2e 100644 --- a/plugins/userprofileservice/registry.go +++ b/plugins/userprofileservice/registry.go @@ -18,11 +18,19 @@ package userprofileservice import ( + "context" "fmt" "github.com/optimizely/go-sdk/pkg/decision" ) +// ContextUserProfileService has the basic UserProfileService methods plus the AddContext method to provide context +// to services like Redis +type ContextUserProfileService interface { + decision.UserProfileService + AddContext(context.Context) +} + // Creator type defines a function for creating an instance of a UserProfileService type Creator func() decision.UserProfileService diff --git a/plugins/userprofileservice/services/redis_ups.go b/plugins/userprofileservice/services/redis_ups.go index 4e08b991..766afb2d 100644 --- a/plugins/userprofileservice/services/redis_ups.go +++ b/plugins/userprofileservice/services/redis_ups.go @@ -22,17 +22,18 @@ import ( "encoding/json" "time" + "github.com/bsm/redislock" "github.com/go-redis/redis/v8" "github.com/optimizely/agent/plugins/userprofileservice" "github.com/optimizely/go-sdk/pkg/decision" "github.com/rs/zerolog/log" ) -var ctx = context.Background() - // RedisUserProfileService represents the redis implementation of UserProfileService interface type RedisUserProfileService struct { + Ctx context.Context Client *redis.Client + Locker *redislock.Client Expiration time.Duration Address string `json:"host"` Password string `json:"password"` @@ -56,7 +57,7 @@ func (u *RedisUserProfileService) Lookup(userID string) (profile decision.UserPr } // Check if profile exists - result, getError := u.Client.Get(ctx, userID).Result() + result, getError := u.Client.Get(u.Ctx, userID).Result() if getError != nil { log.Error().Msg(getError.Error()) return profile @@ -93,18 +94,36 @@ func (u *RedisUserProfileService) Save(profile decision.UserProfile) { if finalProfile, err := json.Marshal(experimentBucketMap); err == nil { // Log error message if something went wrong - if setError := u.Client.Set(ctx, profile.ID, finalProfile, u.Expiration).Err(); setError != nil { + // Try to obtain lock. + + // TODO: need to discuss the time duration for the lock + lock, err := u.Locker.Obtain(u.Ctx, profile.ID, 100*time.Millisecond, nil) + if err != nil { + log.Error().Msg(err.Error()) + return + } + // Release lock after setting value + defer lock.Release(u.Ctx) + // Safely set the new value + if setError := u.Client.Set(u.Ctx, profile.ID, finalProfile, u.Expiration).Err(); setError != nil { log.Error().Msg(setError.Error()) } } } +// AddContext is used to set context in RedisUserProfileService +func (u *RedisUserProfileService) AddContext(ctx context.Context) { + u.Ctx = ctx +} + func (u *RedisUserProfileService) initClient() { u.Client = redis.NewClient(&redis.Options{ Addr: u.Address, Password: u.Password, DB: u.Database, }) + // Create a new lock client. + u.Locker = redislock.New(u.Client) } func init() { diff --git a/plugins/userprofileservice/services/redis_ups_test.go b/plugins/userprofileservice/services/redis_ups_test.go index 0d2cc485..3527727d 100644 --- a/plugins/userprofileservice/services/redis_ups_test.go +++ b/plugins/userprofileservice/services/redis_ups_test.go @@ -18,6 +18,7 @@ package services import ( + "context" "testing" "github.com/optimizely/go-sdk/pkg/decision" @@ -26,18 +27,26 @@ import ( type RedisUPSTestSuite struct { suite.Suite - ups RedisUserProfileService + ups RedisUserProfileService + cancel func() } func (r *RedisUPSTestSuite) SetupTest() { // To check if lifo is used by default + ctx, cancel := context.WithCancel(context.Background()) + r.cancel = cancel r.ups = RedisUserProfileService{ Address: "100", Password: "10", Database: 1, + Ctx: ctx, } } +func (r *RedisUPSTestSuite) TearDownTest() { + r.cancel() +} + func (r *RedisUPSTestSuite) TestFirstSaveOrLookupConfiguresClient() { r.Nil(r.ups.Client) From e85255b149d5c627a683a86251e68f8d78641630 Mon Sep 17 00:00:00 2001 From: Yasir Ali Date: Fri, 1 Apr 2022 17:17:58 +0500 Subject: [PATCH 2/4] fixes. --- config/config.go | 13 ++++++++----- go.mod | 1 - go.sum | 2 -- pkg/optimizely/cache.go | 11 ++++++++--- plugins/userprofileservice/services/redis_ups.go | 15 --------------- 5 files changed, 16 insertions(+), 26 deletions(-) diff --git a/config/config.go b/config/config.go index 42fa53ba..9a0ffae0 100644 --- a/config/config.go +++ b/config/config.go @@ -79,6 +79,9 @@ func NewDefaultConfig() *AgentConfig { EventURL: "https://logx.optimizely.com/v1/events", // https://github.com/google/re2/wiki/Syntax SdkKeyRegex: "^\\w+(:\\w+)?$", + DatafileCache: DatafileCache{ + RedisCache: RedisDatafileCache{}, + }, UserProfileService: UserProfileServiceConfigs{ "default": "", "services": map[string]interface{}{}, @@ -160,7 +163,7 @@ type ClientConfig struct { QueueSize int `json:"queueSize" default:"1000"` FlushInterval time.Duration `json:"flushInterval" default:"30s"` DatafileURLTemplate string `json:"datafileURLTemplate"` - DatafileCache DatafileCache `json:"datafileCache"` + DatafileCache DatafileCache `mapstructure:"datafileCache"` EventURL string `json:"eventURL"` SdkKeyRegex string `json:"sdkKeyRegex"` UserProfileService UserProfileServiceConfigs `json:"userProfileService"` @@ -168,14 +171,14 @@ type ClientConfig struct { // DatafileCache holds the configuration options for the Datafile Cache. type DatafileCache struct { - RedisCache RedisDatafileCache `json:"redis"` + RedisCache RedisDatafileCache `mapstructure:"redis"` } // RedisDatafileCache holds the configuration options for the Redis Datafile Cache. type RedisDatafileCache struct { - Address string `json:"host"` - Password string `json:"password"` - Database int `json:"database"` + Address string `mapstructure:"host"` + Password string `mapstructure:"password"` + Database int `mapstructure:"database"` } // LogConfig holds the log configuration diff --git a/go.mod b/go.mod index ac8fb18a..8e31b227 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.13 require ( github.com/VividCortex/gohistogram v1.0.0 // indirect - github.com/bsm/redislock v0.7.2 github.com/go-chi/chi v4.1.1+incompatible github.com/go-chi/cors v1.1.1 github.com/go-chi/httplog v0.1.6 diff --git a/go.sum b/go.sum index c341a5a9..6376afdf 100644 --- a/go.sum +++ b/go.sum @@ -9,8 +9,6 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/bsm/redislock v0.7.2 h1:jggqOio8JyX9FJBKIfjF3fTxAu/v7zC5mAID9LveqG4= -github.com/bsm/redislock v0.7.2/go.mod h1:kS2g0Yvlymc9Dz8V3iVYAtLAaSVruYbAFdYBDrmC5WU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index d8156ea3..011e8c15 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -201,10 +201,13 @@ func defaultLoader( Password: redisCacheConfig.Password, DB: redisCacheConfig.Database, }) - if datafile, err := redisClient.Get(ctx, sdkKey).Result(); err != nil && datafile != "" { + datafile, err := redisClient.Get(ctx, sdkKey).Result() + if err == nil && datafile != "" { // Set datafile in config manager so it uses the cached datafile for initialization options = append(options, sdkconfig.WithInitialDatafile([]byte(datafile))) isDatafileCached = true + } else { + log.Error().Msg(err.Error()) } } @@ -226,8 +229,10 @@ func defaultLoader( } if redisClient != nil && !isDatafileCached { - // Need to use redis lock here - redisClient.Set(ctx, sdkKey, configManager.GetOptimizelyConfig().GetDatafile(), 0) + datafile := configManager.GetOptimizelyConfig().GetDatafile() + if setError := redisClient.Set(ctx, sdkKey, datafile, 0).Err(); setError != nil { + log.Error().Msg(setError.Error()) + } } q := event.NewInMemoryQueue(conf.QueueSize) diff --git a/plugins/userprofileservice/services/redis_ups.go b/plugins/userprofileservice/services/redis_ups.go index 766afb2d..d013e465 100644 --- a/plugins/userprofileservice/services/redis_ups.go +++ b/plugins/userprofileservice/services/redis_ups.go @@ -22,7 +22,6 @@ import ( "encoding/json" "time" - "github.com/bsm/redislock" "github.com/go-redis/redis/v8" "github.com/optimizely/agent/plugins/userprofileservice" "github.com/optimizely/go-sdk/pkg/decision" @@ -33,7 +32,6 @@ import ( type RedisUserProfileService struct { Ctx context.Context Client *redis.Client - Locker *redislock.Client Expiration time.Duration Address string `json:"host"` Password string `json:"password"` @@ -94,17 +92,6 @@ func (u *RedisUserProfileService) Save(profile decision.UserProfile) { if finalProfile, err := json.Marshal(experimentBucketMap); err == nil { // Log error message if something went wrong - // Try to obtain lock. - - // TODO: need to discuss the time duration for the lock - lock, err := u.Locker.Obtain(u.Ctx, profile.ID, 100*time.Millisecond, nil) - if err != nil { - log.Error().Msg(err.Error()) - return - } - // Release lock after setting value - defer lock.Release(u.Ctx) - // Safely set the new value if setError := u.Client.Set(u.Ctx, profile.ID, finalProfile, u.Expiration).Err(); setError != nil { log.Error().Msg(setError.Error()) } @@ -122,8 +109,6 @@ func (u *RedisUserProfileService) initClient() { Password: u.Password, DB: u.Database, }) - // Create a new lock client. - u.Locker = redislock.New(u.Client) } func init() { From 0e643823998426c18c0f41196f95e072c6301e00 Mon Sep 17 00:00:00 2001 From: Yasir Ali Date: Mon, 4 Apr 2022 17:47:27 +0500 Subject: [PATCH 3/4] Implementation --- cmd/optimizely/main.go | 5 ++ cmd/optimizely/main_test.go | 65 ++++++++++------- cmd/optimizely/testdata/default.yaml | 4 ++ config.yaml | 2 +- config/config.go | 39 ++++------- pkg/optimizely/cache.go | 62 +++++++++------- pkg/optimizely/cache_test.go | 46 +++++++++++- pkg/optimizely/client_test.go | 8 ++- .../redis_cache_service.go | 70 +++++++++++++++++++ .../redis_cache_service_test.go | 63 +++++++++++++++++ 10 files changed, 284 insertions(+), 80 deletions(-) create mode 100644 pkg/optimizely/datafilecacheservice/redis_cache_service.go create mode 100644 pkg/optimizely/datafilecacheservice/redis_cache_service_test.go diff --git a/cmd/optimizely/main.go b/cmd/optimizely/main.go index 7d5958e5..c8bd2c2c 100644 --- a/cmd/optimizely/main.go +++ b/cmd/optimizely/main.go @@ -93,6 +93,11 @@ func loadConfig(v *viper.Viper) *config.AgentConfig { conf.Client.UserProfileService = userProfileService } + // Check if JSON string was set using OPTIMIZELY_CLIENT_DATAFILECACHESERVICE environment variable + if datafileCacheService := v.GetStringMap("client.datafilecacheservice"); datafileCacheService != nil { + conf.Client.DatafileCacheService = datafileCacheService + } + return conf } diff --git a/cmd/optimizely/main_test.go b/cmd/optimizely/main_test.go index a1e01842..eae168b1 100644 --- a/cmd/optimizely/main_test.go +++ b/cmd/optimizely/main_test.go @@ -56,7 +56,7 @@ func assertServer(t *testing.T, actual config.ServerConfig, assertPlugins bool) } } -func assertClient(t *testing.T, actual config.ClientConfig, assertUserProfileService bool) { +func assertClient(t *testing.T, actual config.ClientConfig) { assert.Equal(t, 10*time.Second, actual.PollingInterval) assert.Equal(t, 1, actual.BatchSize) assert.Equal(t, 10, actual.QueueSize) @@ -64,29 +64,33 @@ func assertClient(t *testing.T, actual config.ClientConfig, assertUserProfileSer assert.Equal(t, "https://localhost/v1/%s.json", actual.DatafileURLTemplate) assert.Equal(t, "https://logx.localhost.com/v1", actual.EventURL) assert.Equal(t, "custom-regex", actual.SdkKeyRegex) - if assertUserProfileService { - assert.Equal(t, "in-memory", actual.UserProfileService["default"]) - userProfileServices := map[string]interface{}{ - "in-memory": map[string]interface{}{ - // Viper.set is case in-sensitive - "storagestrategy": "fifo", - }, - "redis": map[string]interface{}{ - "host": "localhost:6379", - "password": "", - }, - "rest": map[string]interface{}{ - "host": "http://localhost", - "lookuppath": "/ups/lookup", - "savepath": "/ups/save", - "headers": map[string]interface{}{"content-type": "application/json"}, - }, - "custom": map[string]interface{}{ - "path": "http://test2.com", - }, - } - assert.Equal(t, userProfileServices, actual.UserProfileService["services"]) + assert.Equal(t, "in-memory", actual.UserProfileService["default"]) + userProfileServices := map[string]interface{}{ + "in-memory": map[string]interface{}{ + // Viper.set is case in-sensitive + "storagestrategy": "fifo", + }, + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + }, + "rest": map[string]interface{}{ + "host": "http://localhost", + "lookuppath": "/ups/lookup", + "savepath": "/ups/save", + "headers": map[string]interface{}{"content-type": "application/json"}, + }, + "custom": map[string]interface{}{ + "path": "http://test2.com", + }, + } + assert.Equal(t, userProfileServices, actual.UserProfileService["services"]) + + datafileCacheService := map[string]interface{}{ + "host": "localhost:6379", + "password": "123", } + assert.Equal(t, datafileCacheService, actual.DatafileCacheService["redis"]) } func assertLog(t *testing.T, actual config.LogConfig) { @@ -164,7 +168,7 @@ func TestViperYaml(t *testing.T) { assertRoot(t, actual) assertServer(t, actual.Server, true) - assertClient(t, actual.Client, true) + assertClient(t, actual.Client) assertLog(t, actual.Log) assertAdmin(t, actual.Admin) assertAdminAuth(t, actual.Admin.Auth) @@ -225,6 +229,14 @@ func TestViperProps(t *testing.T) { } v.Set("client.userProfileService", userProfileServices) + datafileCacheService := map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "123", + }, + } + v.Set("client.datafileCacheService", datafileCacheService) + v.Set("log.pretty", true) v.Set("log.includeSdkKey", false) v.Set("log.level", "debug") @@ -277,7 +289,7 @@ func TestViperProps(t *testing.T) { assertRoot(t, actual) assertServer(t, actual.Server, true) - assertClient(t, actual.Client, true) + assertClient(t, actual.Client) assertLog(t, actual.Log) assertAdmin(t, actual.Admin) assertAdminAuth(t, actual.Admin.Auth) @@ -311,6 +323,7 @@ func TestViperEnv(t *testing.T) { _ = os.Setenv("OPTIMIZELY_CLIENT_EVENTURL", "https://logx.localhost.com/v1") _ = os.Setenv("OPTIMIZELY_CLIENT_SDKKEYREGEX", "custom-regex") _ = os.Setenv("OPTIMIZELY_CLIENT_USERPROFILESERVICE", `{"default":"in-memory","services":{"in-memory":{"storagestrategy":"fifo"},"redis":{"host":"localhost:6379","password":""},"rest":{"host":"http://localhost","lookuppath":"/ups/lookup","savepath":"/ups/save","headers":{"content-type":"application/json"}},"custom":{"path":"http://test2.com"}}}`) + _ = os.Setenv("OPTIMIZELY_CLIENT_DATAFILECACHESERVICE", `{"redis":{"host":"localhost:6379","password":"123"}}`) _ = os.Setenv("OPTIMIZELY_LOG_PRETTY", "true") _ = os.Setenv("OPTIMIZELY_LOG_INCLUDESDKKEY", "false") @@ -340,7 +353,7 @@ func TestViperEnv(t *testing.T) { assertRoot(t, actual) assertServer(t, actual.Server, false) - assertClient(t, actual.Client, true) + assertClient(t, actual.Client) assertLog(t, actual.Log) assertAdmin(t, actual.Admin) assertAPI(t, actual.API) diff --git a/cmd/optimizely/testdata/default.yaml b/cmd/optimizely/testdata/default.yaml index e8adcdb7..7dc02627 100644 --- a/cmd/optimizely/testdata/default.yaml +++ b/cmd/optimizely/testdata/default.yaml @@ -32,6 +32,10 @@ client: datafileURLTemplate: "https://localhost/v1/%s.json" eventURL: "https://logx.localhost.com/v1" sdkKeyRegex: "custom-regex" + datafileCacheService: + redis: + host: "localhost:6379" + password: "123" userProfileService: default: "in-memory" services: diff --git a/config.yaml b/config.yaml index f35bbfef..76a0fa32 100644 --- a/config.yaml +++ b/config.yaml @@ -130,7 +130,7 @@ client: ## Template URL for SDK datafile location. The template should specify a "%s" token for SDK key substitution. datafileURLTemplate: "https://cdn.optimizely.com/datafiles/%s.json" ## Datafile cache to save initial datafile fetch call in a multi-node environment. - datafileCache: + datafileCacheService: # redis: # host: "localhost:6379" # password: "" diff --git a/config/config.go b/config/config.go index 9a0ffae0..7286122f 100644 --- a/config/config.go +++ b/config/config.go @@ -78,10 +78,8 @@ func NewDefaultConfig() *AgentConfig { DatafileURLTemplate: "https://cdn.optimizely.com/datafiles/%s.json", EventURL: "https://logx.optimizely.com/v1/events", // https://github.com/google/re2/wiki/Syntax - SdkKeyRegex: "^\\w+(:\\w+)?$", - DatafileCache: DatafileCache{ - RedisCache: RedisDatafileCache{}, - }, + SdkKeyRegex: "^\\w+(:\\w+)?$", + DatafileCacheService: DatafileCache{}, UserProfileService: UserProfileServiceConfigs{ "default": "", "services": map[string]interface{}{}, @@ -156,29 +154,20 @@ func (ac *AgentConfig) LogConfigWarnings() { // UserProfileServiceConfigs defines the generic mapping of userprofileservice plugins type UserProfileServiceConfigs map[string]interface{} -// ClientConfig holds the configuration options for the Optimizely Client. -type ClientConfig struct { - PollingInterval time.Duration `json:"pollingInterval"` - BatchSize int `json:"batchSize" default:"10"` - QueueSize int `json:"queueSize" default:"1000"` - FlushInterval time.Duration `json:"flushInterval" default:"30s"` - DatafileURLTemplate string `json:"datafileURLTemplate"` - DatafileCache DatafileCache `mapstructure:"datafileCache"` - EventURL string `json:"eventURL"` - SdkKeyRegex string `json:"sdkKeyRegex"` - UserProfileService UserProfileServiceConfigs `json:"userProfileService"` -} - // DatafileCache holds the configuration options for the Datafile Cache. -type DatafileCache struct { - RedisCache RedisDatafileCache `mapstructure:"redis"` -} +type DatafileCache map[string]interface{} -// RedisDatafileCache holds the configuration options for the Redis Datafile Cache. -type RedisDatafileCache struct { - Address string `mapstructure:"host"` - Password string `mapstructure:"password"` - Database int `mapstructure:"database"` +// ClientConfig holds the configuration options for the Optimizely Client. +type ClientConfig struct { + PollingInterval time.Duration `json:"pollingInterval"` + BatchSize int `json:"batchSize" default:"10"` + QueueSize int `json:"queueSize" default:"1000"` + FlushInterval time.Duration `json:"flushInterval" default:"30s"` + DatafileURLTemplate string `json:"datafileURLTemplate"` + DatafileCacheService DatafileCache `json:"datafileCacheService"` + EventURL string `json:"eventURL"` + SdkKeyRegex string `json:"sdkKeyRegex"` + UserProfileService UserProfileServiceConfigs `json:"userProfileService"` } // LogConfig holds the log configuration diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index 011e8c15..ae115858 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -25,8 +25,8 @@ import ( "strings" "sync" - "github.com/go-redis/redis/v8" "github.com/optimizely/agent/config" + "github.com/optimizely/agent/pkg/optimizely/datafilecacheservice" "github.com/optimizely/agent/plugins/userprofileservice" "github.com/optimizely/go-sdk/pkg/client" sdkconfig "github.com/optimizely/go-sdk/pkg/config" @@ -47,6 +47,13 @@ type OptlyCache struct { wg sync.WaitGroup } +type datafileCacheServiceType = string + +// Represents types of datafile cache services +const ( + redis datafileCacheServiceType = "redis" +) + // NewCache returns a new implementation of OptlyCache interface backed by a concurrent map. func NewCache(ctx context.Context, conf config.ClientConfig, metricsRegistry *MetricsRegistry) *OptlyCache { @@ -190,25 +197,12 @@ func defaultLoader( // Options for PollingProjectConfigManager options := []sdkconfig.OptionFunc{} - // Check if datafile is already present in redis cache - var redisClient *redis.Client - var isDatafileCached bool - - if conf.DatafileCache.RedisCache.Address != "" { - redisCacheConfig := conf.DatafileCache.RedisCache - redisClient = redis.NewClient(&redis.Options{ - Addr: redisCacheConfig.Address, - Password: redisCacheConfig.Password, - DB: redisCacheConfig.Database, - }) - datafile, err := redisClient.Get(ctx, sdkKey).Result() - if err == nil && datafile != "" { - // Set datafile in config manager so it uses the cached datafile for initialization - options = append(options, sdkconfig.WithInitialDatafile([]byte(datafile))) - isDatafileCached = true - } else { - log.Error().Msg(err.Error()) - } + + // Check if datafile is already present in cache + cachedDatafile, cacheService := getDatafileFromCacheService(ctx, sdkKey, conf) + if cachedDatafile != "" { + // Set datafile in config manager so it uses the cached datafile for initialization + options = append(options, sdkconfig.WithInitialDatafile([]byte(cachedDatafile))) } options = append(options, @@ -228,11 +222,10 @@ func defaultLoader( return &OptlyClient{}, err } - if redisClient != nil && !isDatafileCached { + // Set datafile in datafileCacheService if not present + if cachedDatafile == "" && cacheService != nil { datafile := configManager.GetOptimizelyConfig().GetDatafile() - if setError := redisClient.Set(ctx, sdkKey, datafile, 0).Err(); setError != nil { - log.Error().Msg(setError.Error()) - } + cacheService.SetDatafileInCacheService(ctx, sdkKey, datafile) } q := event.NewInMemoryQueue(conf.QueueSize) @@ -267,6 +260,27 @@ func defaultLoader( } } +func getDatafileFromCacheService(ctx context.Context, sdkKey string, conf config.ClientConfig) (datafile string, cacheService datafilecacheservice.DatafileCacheService) { + // In case of multiple cache services provided, use the first valid service + for k, v := range conf.DatafileCacheService { + bytes, err := json.Marshal(v) + if err != nil { + continue + } + switch k { + case redis: + var redisDatafileCache datafilecacheservice.RedisCacheService + if err = json.Unmarshal(bytes, &redisDatafileCache); err != nil || redisDatafileCache.Address == "" { + continue + } + return redisDatafileCache.GetDatafileFromCacheService(ctx, sdkKey), &redisDatafileCache + default: + // do nothing + } + } + return "", nil +} + // Returns the registered userProfileService against the sdkKey func getUserProfileService(ctx context.Context, sdkKey string, userProfileServiceMap cmap.ConcurrentMap, conf config.ClientConfig) decision.UserProfileService { diff --git a/pkg/optimizely/cache_test.go b/pkg/optimizely/cache_test.go index 99757915..e98a25a0 100644 --- a/pkg/optimizely/cache_test.go +++ b/pkg/optimizely/cache_test.go @@ -1,5 +1,5 @@ /**************************************************************************** - * Copyright 2019,2021 Optimizely, Inc. and contributors * + * Copyright 2019,2021-2022 Optimizely, Inc. and contributors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * @@ -267,7 +267,10 @@ func (s *DefaultLoaderTestSuite) SetupTest() { // Note we're NOT testing that the ConfigManager was configured properly // This would require a bit larger refactor since the optimizelyFactory.Client takes a few liberties s.pcFactory = func(sdkKey string, options ...sdkconfig.OptionFunc) SyncedConfigManager { - return MockConfigManager{} + return MockConfigManager{ + // Needed since GetOptimizelyConfig is called in case no datafile is saved in the datafileCacheService + optlyConfig: sdkconfig.NewOptimizelyConfig(&optimizelytest.TestProjectConfig{Datafile: `{"abs":123,}`}), + } } } @@ -288,6 +291,12 @@ func (s *DefaultLoaderTestSuite) TestDefaultLoader() { "storageStrategy": "fifo", }}, }, + DatafileCacheService: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "123", + }, + }, } loader := defaultLoader(s.ctx, conf, s.registry, s.upsMap, s.pcFactory, s.bpFactory) @@ -482,6 +491,39 @@ func (s *DefaultLoaderTestSuite) TestLoaderWithNoDefaultUserProfileServices() { s.Nil(client.UserProfileService) } +func (s *DefaultLoaderTestSuite) TestGetDatafileFromCacheService() { + // Redis + conf := config.ClientConfig{ + DatafileCacheService: map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + }, + }} + + datafile, service := getDatafileFromCacheService(s.ctx, "123", conf) + s.Equal("", datafile) + s.NotNil(service) + + // Empty redis config + conf.DatafileCacheService["redis"] = map[string]interface{}{} + datafile, service = getDatafileFromCacheService(s.ctx, "123", conf) + s.Equal("", datafile) + s.Nil(service) + + // No configs + conf.DatafileCacheService = make(config.DatafileCache) + datafile, service = getDatafileFromCacheService(s.ctx, "123", conf) + s.Equal("", datafile) + s.Nil(service) + + // invalid configs + conf.DatafileCacheService["invalid"] = map[string]interface{}{} + datafile, service = getDatafileFromCacheService(s.ctx, "123", conf) + s.Equal("", datafile) + s.Nil(service) +} + func (s *DefaultLoaderTestSuite) TestDefaultRegexValidator() { scenarios := []struct { diff --git a/pkg/optimizely/client_test.go b/pkg/optimizely/client_test.go index fa5d9481..447dde32 100644 --- a/pkg/optimizely/client_test.go +++ b/pkg/optimizely/client_test.go @@ -1,5 +1,5 @@ /**************************************************************************** - * Copyright 2019-2020, Optimizely, Inc. and contributors * + * Copyright 2019-2020,2022 Optimizely, Inc. and contributors * * * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * @@ -281,7 +281,8 @@ func (e ErrorConfigManager) SyncConfig() { } type MockConfigManager struct { - config config.ProjectConfig + config config.ProjectConfig + optlyConfig *config.OptimizelyConfig } func (m MockConfigManager) RemoveOnProjectConfigUpdate(int) error { @@ -297,6 +298,9 @@ func (m MockConfigManager) GetConfig() (config.ProjectConfig, error) { } func (m MockConfigManager) GetOptimizelyConfig() *config.OptimizelyConfig { + if m.optlyConfig != nil { + return m.optlyConfig + } panic("implement me") } diff --git a/pkg/optimizely/datafilecacheservice/redis_cache_service.go b/pkg/optimizely/datafilecacheservice/redis_cache_service.go new file mode 100644 index 00000000..2296eb4f --- /dev/null +++ b/pkg/optimizely/datafilecacheservice/redis_cache_service.go @@ -0,0 +1,70 @@ +/**************************************************************************** + * Copyright 2022, Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package datafilecacheservice // +package datafilecacheservice + +import ( + "context" + + "github.com/go-redis/redis/v8" + "github.com/rs/zerolog/log" +) + +// DatafileCacheService represents interface for datafileCacheService +type DatafileCacheService interface { + GetDatafileFromCacheService(ctx context.Context, sdkKey string) string + SetDatafileInCacheService(ctx context.Context, sdkKey, datafile string) +} + +// RedisCacheService represents the redis implementation of DatafileCacheService interface +type RedisCacheService struct { + Client *redis.Client + Address string `json:"host"` + Password string `json:"password"` + Database int `json:"database"` +} + +// GetDatafileFromCacheService returns the saved datafile from the cache service +func (r *RedisCacheService) GetDatafileFromCacheService(ctx context.Context, sdkKey string) string { + if r.Client == nil { + r.initClient() + } + datafile, err := r.Client.Get(ctx, sdkKey).Result() + if err != nil { + log.Error().Msg(err.Error()) + return "" + } + return datafile +} + +// SetDatafileInCacheService saves the datafile in the cache service +func (r *RedisCacheService) SetDatafileInCacheService(ctx context.Context, sdkKey, datafile string) { + if r.Client == nil { + r.initClient() + } + if setError := r.Client.Set(ctx, sdkKey, datafile, 0).Err(); setError != nil { + log.Error().Msg(setError.Error()) + } +} + +func (r *RedisCacheService) initClient() { + r.Client = redis.NewClient(&redis.Options{ + Addr: r.Address, + Password: r.Password, + DB: r.Database, + }) +} diff --git a/pkg/optimizely/datafilecacheservice/redis_cache_service_test.go b/pkg/optimizely/datafilecacheservice/redis_cache_service_test.go new file mode 100644 index 00000000..04d92625 --- /dev/null +++ b/pkg/optimizely/datafilecacheservice/redis_cache_service_test.go @@ -0,0 +1,63 @@ +/**************************************************************************** + * Copyright 2022, Optimizely, Inc. and contributors * + * * + * Licensed under the Apache License, Version 2.0 (the "License"); * + * you may not use this file except in compliance with the License. * + * You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, software * + * distributed under the License is distributed on an "AS IS" BASIS, * + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * + * See the License for the specific language governing permissions and * + * limitations under the License. * + ***************************************************************************/ + +// Package datafilecacheservice // +package datafilecacheservice + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" +) + +type RedisCacheServiceTestSuite struct { + suite.Suite + service RedisCacheService + ctx context.Context + cancel func() +} + +func (r *RedisCacheServiceTestSuite) SetupTest() { + // To check if lifo is used by default + r.ctx, r.cancel = context.WithCancel(context.Background()) + r.service = RedisCacheService{ + Address: "100", + Password: "10", + Database: 1, + } +} + +func (r *RedisCacheServiceTestSuite) TearDownTest() { + r.cancel() +} + +func (r *RedisCacheServiceTestSuite) TestFirstSaveOrLookupConfiguresClient() { + r.Nil(r.service.Client) + + // Should initialize redis client on first SetDatafileInCacheService call + r.service.SetDatafileInCacheService(r.ctx, "123", `{"abs":123,}`) + r.NotNil(r.service.Client) + + r.service.Client = nil + // Should initialize redis client on first GetDatafileFromCacheService call + r.service.GetDatafileFromCacheService(r.ctx, "123") + r.NotNil(r.service.Client) +} + +func TestRedisUPSTestSuite(t *testing.T) { + suite.Run(t, new(RedisCacheServiceTestSuite)) +} From 9f8d742391d21f5c6e00b68ff4144944d224d36e Mon Sep 17 00:00:00 2001 From: Yasir Ali Date: Wed, 6 Apr 2022 16:47:11 +0500 Subject: [PATCH 4/4] suggested changes made. --- cmd/optimizely/main_test.go | 20 ++++++---- cmd/optimizely/testdata/default.yaml | 8 ++-- config.yaml | 10 +++-- config/config.go | 7 +++- config/config_test.go | 4 ++ pkg/optimizely/cache.go | 40 +++++++++++-------- pkg/optimizely/cache_test.go | 40 ++++++++++++++----- .../redis_cache_service.go | 21 +++++----- .../userprofileservice/services/redis_ups.go | 28 ++++--------- .../services/redis_ups_test.go | 11 +++-- 10 files changed, 111 insertions(+), 78 deletions(-) diff --git a/cmd/optimizely/main_test.go b/cmd/optimizely/main_test.go index eae168b1..6c04203c 100644 --- a/cmd/optimizely/main_test.go +++ b/cmd/optimizely/main_test.go @@ -87,10 +87,13 @@ func assertClient(t *testing.T, actual config.ClientConfig) { assert.Equal(t, userProfileServices, actual.UserProfileService["services"]) datafileCacheService := map[string]interface{}{ - "host": "localhost:6379", - "password": "123", + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "123", + }, } - assert.Equal(t, datafileCacheService, actual.DatafileCacheService["redis"]) + assert.Equal(t, datafileCacheService, actual.DatafileCacheService["services"]) + assert.Equal(t, true, actual.DatafileCacheService["enabled"]) } func assertLog(t *testing.T, actual config.LogConfig) { @@ -230,9 +233,12 @@ func TestViperProps(t *testing.T) { v.Set("client.userProfileService", userProfileServices) datafileCacheService := map[string]interface{}{ - "redis": map[string]interface{}{ - "host": "localhost:6379", - "password": "123", + "enabled": true, + "services": map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "123", + }, }, } v.Set("client.datafileCacheService", datafileCacheService) @@ -323,7 +329,7 @@ func TestViperEnv(t *testing.T) { _ = os.Setenv("OPTIMIZELY_CLIENT_EVENTURL", "https://logx.localhost.com/v1") _ = os.Setenv("OPTIMIZELY_CLIENT_SDKKEYREGEX", "custom-regex") _ = os.Setenv("OPTIMIZELY_CLIENT_USERPROFILESERVICE", `{"default":"in-memory","services":{"in-memory":{"storagestrategy":"fifo"},"redis":{"host":"localhost:6379","password":""},"rest":{"host":"http://localhost","lookuppath":"/ups/lookup","savepath":"/ups/save","headers":{"content-type":"application/json"}},"custom":{"path":"http://test2.com"}}}`) - _ = os.Setenv("OPTIMIZELY_CLIENT_DATAFILECACHESERVICE", `{"redis":{"host":"localhost:6379","password":"123"}}`) + _ = os.Setenv("OPTIMIZELY_CLIENT_DATAFILECACHESERVICE", `{"enabled":true,"services":{"redis":{"host":"localhost:6379","password":"123"}}}`) _ = os.Setenv("OPTIMIZELY_LOG_PRETTY", "true") _ = os.Setenv("OPTIMIZELY_LOG_INCLUDESDKKEY", "false") diff --git a/cmd/optimizely/testdata/default.yaml b/cmd/optimizely/testdata/default.yaml index 7dc02627..0ce66080 100644 --- a/cmd/optimizely/testdata/default.yaml +++ b/cmd/optimizely/testdata/default.yaml @@ -33,9 +33,11 @@ client: eventURL: "https://logx.localhost.com/v1" sdkKeyRegex: "custom-regex" datafileCacheService: - redis: - host: "localhost:6379" - password: "123" + enabled: true + services: + redis: + host: "localhost:6379" + password: "123" userProfileService: default: "in-memory" services: diff --git a/config.yaml b/config.yaml index 76a0fa32..cf5efe3d 100644 --- a/config.yaml +++ b/config.yaml @@ -131,10 +131,12 @@ client: datafileURLTemplate: "https://cdn.optimizely.com/datafiles/%s.json" ## Datafile cache to save initial datafile fetch call in a multi-node environment. datafileCacheService: - # redis: - # host: "localhost:6379" - # password: "" - # database: 0 + enabled: false + services: + # redis: + # host: "localhost:6379" + # password: "" + # database: 0 ## URL for dispatching events. eventURL: "https://logx.optimizely.com/v1/events" ## Validation Regex on the request SDK Key diff --git a/config/config.go b/config/config.go index 7286122f..c2155fe7 100644 --- a/config/config.go +++ b/config/config.go @@ -78,8 +78,11 @@ func NewDefaultConfig() *AgentConfig { DatafileURLTemplate: "https://cdn.optimizely.com/datafiles/%s.json", EventURL: "https://logx.optimizely.com/v1/events", // https://github.com/google/re2/wiki/Syntax - SdkKeyRegex: "^\\w+(:\\w+)?$", - DatafileCacheService: DatafileCache{}, + SdkKeyRegex: "^\\w+(:\\w+)?$", + DatafileCacheService: DatafileCache{ + "enabled": false, + "services": map[string]interface{}{}, + }, UserProfileService: UserProfileServiceConfigs{ "default": "", "services": map[string]interface{}{}, diff --git a/config/config_test.go b/config/config_test.go index 4cc52bd2..978ef8a5 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -84,6 +84,10 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, "^\\w+(:\\w+)?$", conf.Client.SdkKeyRegex) assert.Equal(t, "", conf.Client.UserProfileService["default"]) assert.Equal(t, map[string]interface{}{}, conf.Client.UserProfileService["services"]) + assert.Equal(t, DatafileCache{ + "enabled": false, + "services": map[string]interface{}{}, + }, conf.Client.DatafileCacheService) assert.Equal(t, 0, conf.Runtime.BlockProfileRate) assert.Equal(t, 0, conf.Runtime.MutexProfileFraction) diff --git a/pkg/optimizely/cache.go b/pkg/optimizely/cache.go index ae115858..dd84ea31 100644 --- a/pkg/optimizely/cache.go +++ b/pkg/optimizely/cache.go @@ -47,11 +47,12 @@ type OptlyCache struct { wg sync.WaitGroup } -type datafileCacheServiceType = string +type datafileCacheServiceKey = string -// Represents types of datafile cache services +// Represents keys in datafile cache services const ( - redis datafileCacheServiceType = "redis" + enabled datafileCacheServiceKey = "enabled" + redis datafileCacheServiceKey = "redis" ) // NewCache returns a new implementation of OptlyCache interface backed by a concurrent map. @@ -261,21 +262,26 @@ func defaultLoader( } func getDatafileFromCacheService(ctx context.Context, sdkKey string, conf config.ClientConfig) (datafile string, cacheService datafilecacheservice.DatafileCacheService) { - // In case of multiple cache services provided, use the first valid service - for k, v := range conf.DatafileCacheService { - bytes, err := json.Marshal(v) - if err != nil { - continue - } - switch k { - case redis: - var redisDatafileCache datafilecacheservice.RedisCacheService - if err = json.Unmarshal(bytes, &redisDatafileCache); err != nil || redisDatafileCache.Address == "" { - continue + // Check whether datafileCacheService should be enabled + if shouldEnable, ok := conf.DatafileCacheService[enabled].(bool); ok && shouldEnable { + if services, ok := conf.DatafileCacheService["services"].(map[string]interface{}); ok { + // In case of multiple cache services provided, use the first valid service + for k, v := range services { + bytes, err := json.Marshal(v) + if err != nil { + continue + } + switch k { + case redis: + var redisDatafileCache datafilecacheservice.RedisCacheService + if err = json.Unmarshal(bytes, &redisDatafileCache); err != nil || redisDatafileCache.Address == "" { + continue + } + return redisDatafileCache.GetDatafileFromCacheService(ctx, sdkKey), &redisDatafileCache + default: + // do nothing + } } - return redisDatafileCache.GetDatafileFromCacheService(ctx, sdkKey), &redisDatafileCache - default: - // do nothing } } return "", nil diff --git a/pkg/optimizely/cache_test.go b/pkg/optimizely/cache_test.go index e98a25a0..0cb629b5 100644 --- a/pkg/optimizely/cache_test.go +++ b/pkg/optimizely/cache_test.go @@ -292,9 +292,12 @@ func (s *DefaultLoaderTestSuite) TestDefaultLoader() { }}, }, DatafileCacheService: map[string]interface{}{ - "redis": map[string]interface{}{ - "host": "localhost:6379", - "password": "123", + "enabled": true, + "services": map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "123", + }, }, }, } @@ -492,21 +495,38 @@ func (s *DefaultLoaderTestSuite) TestLoaderWithNoDefaultUserProfileServices() { } func (s *DefaultLoaderTestSuite) TestGetDatafileFromCacheService() { - // Redis + cacheServices := map[string]interface{}{ + "redis": map[string]interface{}{ + "host": "localhost:6379", + "password": "", + }, + } + // Redis Enabled conf := config.ClientConfig{ DatafileCacheService: map[string]interface{}{ - "redis": map[string]interface{}{ - "host": "localhost:6379", - "password": "", - }, + "enabled": true, + "services": cacheServices, }} datafile, service := getDatafileFromCacheService(s.ctx, "123", conf) s.Equal("", datafile) s.NotNil(service) + // Redis Disabled + conf.DatafileCacheService["enabled"] = false + datafile, service = getDatafileFromCacheService(s.ctx, "123", conf) + s.Equal("", datafile) + s.Nil(service) + + // No Enabled key + delete(conf.DatafileCacheService, "enabled") + datafile, service = getDatafileFromCacheService(s.ctx, "123", conf) + s.Equal("", datafile) + s.Nil(service) + // Empty redis config - conf.DatafileCacheService["redis"] = map[string]interface{}{} + conf.DatafileCacheService["enabled"] = true + cacheServices["redis"] = map[string]interface{}{} datafile, service = getDatafileFromCacheService(s.ctx, "123", conf) s.Equal("", datafile) s.Nil(service) @@ -518,7 +538,7 @@ func (s *DefaultLoaderTestSuite) TestGetDatafileFromCacheService() { s.Nil(service) // invalid configs - conf.DatafileCacheService["invalid"] = map[string]interface{}{} + cacheServices["invalid"] = map[string]interface{}{} datafile, service = getDatafileFromCacheService(s.ctx, "123", conf) s.Equal("", datafile) s.Nil(service) diff --git a/pkg/optimizely/datafilecacheservice/redis_cache_service.go b/pkg/optimizely/datafilecacheservice/redis_cache_service.go index 2296eb4f..0693ce32 100644 --- a/pkg/optimizely/datafilecacheservice/redis_cache_service.go +++ b/pkg/optimizely/datafilecacheservice/redis_cache_service.go @@ -38,10 +38,19 @@ type RedisCacheService struct { Database int `json:"database"` } +// InitClient initializes redis client with the given configuration +func (r *RedisCacheService) InitClient() { + r.Client = redis.NewClient(&redis.Options{ + Addr: r.Address, + Password: r.Password, + DB: r.Database, + }) +} + // GetDatafileFromCacheService returns the saved datafile from the cache service func (r *RedisCacheService) GetDatafileFromCacheService(ctx context.Context, sdkKey string) string { if r.Client == nil { - r.initClient() + r.InitClient() } datafile, err := r.Client.Get(ctx, sdkKey).Result() if err != nil { @@ -54,17 +63,9 @@ func (r *RedisCacheService) GetDatafileFromCacheService(ctx context.Context, sdk // SetDatafileInCacheService saves the datafile in the cache service func (r *RedisCacheService) SetDatafileInCacheService(ctx context.Context, sdkKey, datafile string) { if r.Client == nil { - r.initClient() + r.InitClient() } if setError := r.Client.Set(ctx, sdkKey, datafile, 0).Err(); setError != nil { log.Error().Msg(setError.Error()) } } - -func (r *RedisCacheService) initClient() { - r.Client = redis.NewClient(&redis.Options{ - Addr: r.Address, - Password: r.Password, - DB: r.Database, - }) -} diff --git a/plugins/userprofileservice/services/redis_ups.go b/plugins/userprofileservice/services/redis_ups.go index d013e465..04c4f48c 100644 --- a/plugins/userprofileservice/services/redis_ups.go +++ b/plugins/userprofileservice/services/redis_ups.go @@ -22,7 +22,7 @@ import ( "encoding/json" "time" - "github.com/go-redis/redis/v8" + "github.com/optimizely/agent/pkg/optimizely/datafilecacheservice" "github.com/optimizely/agent/plugins/userprofileservice" "github.com/optimizely/go-sdk/pkg/decision" "github.com/rs/zerolog/log" @@ -30,12 +30,8 @@ import ( // RedisUserProfileService represents the redis implementation of UserProfileService interface type RedisUserProfileService struct { - Ctx context.Context - Client *redis.Client - Expiration time.Duration - Address string `json:"host"` - Password string `json:"password"` - Database int `json:"database"` + datafilecacheservice.RedisCacheService + Ctx context.Context } // Lookup is used to retrieve past bucketing decisions for users @@ -47,7 +43,7 @@ func (u *RedisUserProfileService) Lookup(userID string) (profile decision.UserPr // This is required in both lookup and save since an old redis instance can also be used if u.Client == nil { - u.initClient() + u.InitClient() } if userID == "" { @@ -78,7 +74,7 @@ func (u *RedisUserProfileService) Save(profile decision.UserProfile) { // This is required in both lookup and save since an old redis instance can also be used if u.Client == nil { - u.initClient() + u.InitClient() } if profile.ID == "" { @@ -92,7 +88,7 @@ func (u *RedisUserProfileService) Save(profile decision.UserProfile) { if finalProfile, err := json.Marshal(experimentBucketMap); err == nil { // Log error message if something went wrong - if setError := u.Client.Set(u.Ctx, profile.ID, finalProfile, u.Expiration).Err(); setError != nil { + if setError := u.Client.Set(u.Ctx, profile.ID, finalProfile, 0*time.Second).Err(); setError != nil { log.Error().Msg(setError.Error()) } } @@ -103,19 +99,9 @@ func (u *RedisUserProfileService) AddContext(ctx context.Context) { u.Ctx = ctx } -func (u *RedisUserProfileService) initClient() { - u.Client = redis.NewClient(&redis.Options{ - Addr: u.Address, - Password: u.Password, - DB: u.Database, - }) -} - func init() { redisUPSCreator := func() decision.UserProfileService { - return &RedisUserProfileService{ - Expiration: 0 * time.Second, - } + return &RedisUserProfileService{} } userprofileservice.Add("redis", redisUPSCreator) } diff --git a/plugins/userprofileservice/services/redis_ups_test.go b/plugins/userprofileservice/services/redis_ups_test.go index 3527727d..7791dbe0 100644 --- a/plugins/userprofileservice/services/redis_ups_test.go +++ b/plugins/userprofileservice/services/redis_ups_test.go @@ -21,6 +21,7 @@ import ( "context" "testing" + "github.com/optimizely/agent/pkg/optimizely/datafilecacheservice" "github.com/optimizely/go-sdk/pkg/decision" "github.com/stretchr/testify/suite" ) @@ -36,10 +37,12 @@ func (r *RedisUPSTestSuite) SetupTest() { ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel r.ups = RedisUserProfileService{ - Address: "100", - Password: "10", - Database: 1, - Ctx: ctx, + RedisCacheService: datafilecacheservice.RedisCacheService{ + Address: "100", + Password: "10", + Database: 1, + }, + Ctx: ctx, } }