From c989ef2731127b8b6d79808b5ec9b9b28a5be6e9 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 6 Dec 2017 15:59:59 +0000 Subject: [PATCH 01/13] Remove redundant `config` struct --- pkg/configs/db/memory/memory.go | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/pkg/configs/db/memory/memory.go b/pkg/configs/db/memory/memory.go index b48232fed3a..5cb361b4913 100644 --- a/pkg/configs/db/memory/memory.go +++ b/pkg/configs/db/memory/memory.go @@ -6,28 +6,16 @@ import ( "github.com/weaveworks/cortex/pkg/configs" ) -type config struct { - cfg configs.Config - id configs.ID -} - -func (c config) toView() configs.View { - return configs.View{ - ID: c.id, - Config: c.cfg, - } -} - // DB is an in-memory database for testing, and local development type DB struct { - cfgs map[string]config + cfgs map[string]configs.View id uint } // New creates a new in-memory database func New(_, _ string) (*DB, error) { return &DB{ - cfgs: map[string]config{}, + cfgs: map[string]configs.View{}, id: 0, }, nil } @@ -38,31 +26,27 @@ func (d *DB) GetConfig(userID string) (configs.View, error) { if !ok { return configs.View{}, sql.ErrNoRows } - return c.toView(), nil + return c, nil } // SetConfig sets configuration for a user. func (d *DB) SetConfig(userID string, cfg configs.Config) error { - d.cfgs[userID] = config{cfg: cfg, id: configs.ID(d.id)} + d.cfgs[userID] = configs.View{Config: cfg, ID: configs.ID(d.id)} d.id++ return nil } // GetAllConfigs gets all of the configs. func (d *DB) GetAllConfigs() (map[string]configs.View, error) { - cfgs := map[string]configs.View{} - for user, c := range d.cfgs { - cfgs[user] = c.toView() - } - return cfgs, nil + return d.cfgs, nil } // GetConfigs gets all of the configs that have changed recently. func (d *DB) GetConfigs(since configs.ID) (map[string]configs.View, error) { cfgs := map[string]configs.View{} for user, c := range d.cfgs { - if c.id > since { - cfgs[user] = c.toView() + if c.ID > since { + cfgs[user] = c } } return cfgs, nil From 029f17ce5b2c17d9398e9c81ea701c648f446960 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 6 Dec 2017 17:21:41 +0000 Subject: [PATCH 02/13] Separate DB interface and implementation for rules configs --- pkg/configs/configs.go | 22 +++++++++- pkg/configs/db/db.go | 16 +++++++ pkg/configs/db/memory/memory.go | 43 +++++++++++++++++++ pkg/configs/db/postgres/postgres.go | 65 +++++++++++++++++++++++++++++ pkg/configs/db/timed.go | 30 +++++++++++++ pkg/configs/db/traced.go | 20 +++++++++ 6 files changed, 194 insertions(+), 2 deletions(-) diff --git a/pkg/configs/configs.go b/pkg/configs/configs.go index a568a8f7304..5e762bf3185 100644 --- a/pkg/configs/configs.go +++ b/pkg/configs/configs.go @@ -7,8 +7,8 @@ type ID int // A Config is a Cortex configuration for a single user. type Config struct { // RulesFiles maps from a rules filename to file contents. - RulesFiles map[string]string `json:"rules_files"` - AlertmanagerConfig string `json:"alertmanager_config"` + RulesFiles RulesConfig `json:"rules_files"` + AlertmanagerConfig string `json:"alertmanager_config"` } // View is what's returned from the Weave Cloud configs service @@ -21,3 +21,21 @@ type View struct { ID ID `json:"id"` Config Config `json:"config"` } + +// GetVersionedRulesConfig specializes the view to just the rules config. +func (v View) GetVersionedRulesConfig() VersionedRulesConfig { + return VersionedRulesConfig{ + ID: v.ID, + Config: v.Config.RulesFiles, + } +} + +// RulesConfig are the set of rules files for a particular organization. +type RulesConfig map[string]string + +// VersionedRulesConfig is a RulesConfig together with a version. +// `data Versioned a = Versioned { id :: ID , config :: a }` +type VersionedRulesConfig struct { + ID ID `json:"id"` + Config RulesConfig `json:"config"` +} diff --git a/pkg/configs/db/db.go b/pkg/configs/db/db.go index 3fa3817172c..dad45e6a1b5 100644 --- a/pkg/configs/db/db.go +++ b/pkg/configs/db/db.go @@ -22,8 +22,24 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.StringVar(&cfg.MigrationsDir, "database.migrations", "", "Path where the database migration files can be found") } +// RulesDB has ruler-specific DB interfaces. +type RulesDB interface { + // GetRulesConfig gets the user's ruler config + GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) + SetRulesConfig(userID string, config configs.RulesConfig) error + // SetRulesConfig sets the user's ruler config + + // GetAllRulesConfigs gets all of the ruler configs + GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) + // GetRulesConfigs gets all of the configs that have been added or have + // changed since the provided config. + GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) +} + // DB is the interface for the database. type DB interface { + RulesDB + GetConfig(userID string) (configs.View, error) SetConfig(userID string, cfg configs.Config) error diff --git a/pkg/configs/db/memory/memory.go b/pkg/configs/db/memory/memory.go index 5cb361b4913..14a3c84a8dc 100644 --- a/pkg/configs/db/memory/memory.go +++ b/pkg/configs/db/memory/memory.go @@ -56,3 +56,46 @@ func (d *DB) GetConfigs(since configs.ID) (map[string]configs.View, error) { func (d *DB) Close() error { return nil } + +// GetRulesConfig gets the rules config for a user. +func (d *DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) { + c, ok := d.cfgs[userID] + if !ok { + return configs.VersionedRulesConfig{}, sql.ErrNoRows + } + return c.GetVersionedRulesConfig(), nil +} + +// SetRulesConfig sets the rules config for a user. +func (d *DB) SetRulesConfig(userID string, config configs.RulesConfig) error { + c, ok := d.cfgs[userID] + if !ok { + return d.SetConfig(userID, configs.Config{RulesFiles: config}) + } + return d.SetConfig(userID, configs.Config{ + AlertmanagerConfig: c.Config.AlertmanagerConfig, + RulesFiles: config, + }) +} + +// GetAllRulesConfigs gets the rules configs for all users that have them. +func (d *DB) GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) { + cfgs := map[string]configs.VersionedRulesConfig{} + for user, c := range d.cfgs { + cfgs[user] = c.GetVersionedRulesConfig() + } + return cfgs, nil +} + +// GetRulesConfigs gets the rules configs that have changed +// since the given config version. +func (d *DB) GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + cfgs := map[string]configs.VersionedRulesConfig{} + for user, c := range d.cfgs { + if c.ID <= since { + continue + } + cfgs[user] = c.GetVersionedRulesConfig() + } + return cfgs, nil +} diff --git a/pkg/configs/db/postgres/postgres.go b/pkg/configs/db/postgres/postgres.go index 83b81fe6e36..dc7eee3fc01 100644 --- a/pkg/configs/db/postgres/postgres.go +++ b/pkg/configs/db/postgres/postgres.go @@ -134,6 +134,71 @@ func (d DB) GetConfigs(since configs.ID) (map[string]configs.View, error) { }) } +// GetRulesConfig gets the latest alertmanager config for a user. +func (d DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) { + current, err := d.GetConfig(userID) + if err != nil { + return configs.VersionedRulesConfig{}, err + } + return current.GetVersionedRulesConfig(), nil +} + +// SetRulesConfig sets the current alertmanager config for a user. +func (d DB) SetRulesConfig(userID string, config configs.RulesConfig) error { + current, err := d.GetConfig(userID) + if err != nil { + return err + } + new := configs.Config{ + AlertmanagerConfig: current.Config.AlertmanagerConfig, + RulesFiles: config, + } + return d.SetConfig(userID, new) +} + +func (d DB) findRulesConfigs(filter squirrel.Sqlizer) (map[string]configs.VersionedRulesConfig, error) { + rows, err := d.Select("id", "owner_id", "config ->> 'rules_files'"). + Options("DISTINCT ON (owner_id)"). + From("configs"). + Where(filter). + Where("config ->> 'rules_files' <> '{}'"). + OrderBy("owner_id, id DESC"). + Query() + if err != nil { + return nil, err + } + defer rows.Close() + cfgs := map[string]configs.VersionedRulesConfig{} + for rows.Next() { + var cfg configs.VersionedRulesConfig + var userID string + var cfgBytes []byte + err = rows.Scan(&cfg.ID, &userID, &cfgBytes) + if err != nil { + return nil, err + } + err = json.Unmarshal(cfgBytes, &cfg.Config) + if err != nil { + return nil, err + } + cfgs[userID] = cfg + } + return cfgs, nil +} + +// GetAllRulesConfigs gets all alertmanager configs for all users. +func (d DB) GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) { + return d.findRulesConfigs(activeConfig) +} + +// GetRulesConfigs gets all the alertmanager configs that have changed since a given config. +func (d DB) GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + return d.findRulesConfigs(squirrel.And{ + activeConfig, + squirrel.Gt{"id": since}, + }) +} + // Transaction runs the given function in a postgres transaction. If fn returns // an error the txn will be rolled back. func (d DB) Transaction(f func(DB) error) error { diff --git a/pkg/configs/db/timed.go b/pkg/configs/db/timed.go index 5da7f94ba16..d9c0f7030bc 100644 --- a/pkg/configs/db/timed.go +++ b/pkg/configs/db/timed.go @@ -74,3 +74,33 @@ func (t timed) Close() error { return t.d.Close() }) } + +func (t timed) GetRulesConfig(userID string) (cfg configs.VersionedRulesConfig, err error) { + t.timeRequest("GetRulesConfig", func(_ context.Context) error { + cfg, err = t.d.GetRulesConfig(userID) + return err + }) + return +} + +func (t timed) SetRulesConfig(userID string, cfg configs.RulesConfig) (err error) { + return t.timeRequest("SetRulesConfig", func(_ context.Context) error { + return t.d.SetRulesConfig(userID, cfg) + }) +} + +func (t timed) GetAllRulesConfigs() (cfgs map[string]configs.VersionedRulesConfig, err error) { + t.timeRequest("GetAllRulesConfigs", func(_ context.Context) error { + cfgs, err = t.d.GetAllRulesConfigs() + return err + }) + return +} + +func (t timed) GetRulesConfigs(since configs.ID) (cfgs map[string]configs.VersionedRulesConfig, err error) { + t.timeRequest("GetRulesConfigs", func(_ context.Context) error { + cfgs, err = t.d.GetRulesConfigs(since) + return err + }) + return +} diff --git a/pkg/configs/db/traced.go b/pkg/configs/db/traced.go index 1f20c15c5b0..049f4a2e5c2 100644 --- a/pkg/configs/db/traced.go +++ b/pkg/configs/db/traced.go @@ -41,3 +41,23 @@ func (t traced) Close() (err error) { defer func() { t.trace("Close", err) }() return t.d.Close() } + +func (t traced) GetRulesConfig(userID string) (cfg configs.VersionedRulesConfig, err error) { + defer func() { t.trace("GetRulesConfig", userID, cfg, err) }() + return t.d.GetRulesConfig(userID) +} + +func (t traced) SetRulesConfig(userID string, cfg configs.RulesConfig) (err error) { + defer func() { t.trace("SetRulesConfig", userID, cfg, err) }() + return t.d.SetRulesConfig(userID, cfg) +} + +func (t traced) GetAllRulesConfigs() (cfgs map[string]configs.VersionedRulesConfig, err error) { + defer func() { t.trace("GetAllRulesConfigs", cfgs, err) }() + return t.d.GetAllRulesConfigs() +} + +func (t traced) GetRulesConfigs(since configs.ID) (cfgs map[string]configs.VersionedRulesConfig, err error) { + defer func() { t.trace("GetConfigs", since, cfgs, err) }() + return t.d.GetRulesConfigs(since) +} From 82b9ee8895637c7dbe50408735e119c7087b545e Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 6 Dec 2017 18:26:41 +0000 Subject: [PATCH 03/13] Use rules DB directly from ruler Will fall back to using configs service if no DB information supplied. --- cmd/lite/main.go | 12 +++- cmd/ruler/main.go | 11 +++- pkg/configs/client/configs.go | 7 ++- pkg/configs/configs.go | 38 ++++++++++++ pkg/configs/db/db.go | 9 +++ pkg/ruler/configs.go | 105 ++++++++++++++++++++++++++++++++++ pkg/ruler/ruler.go | 17 +----- pkg/ruler/scheduler.go | 23 ++++---- 8 files changed, 187 insertions(+), 35 deletions(-) create mode 100644 pkg/ruler/configs.go diff --git a/cmd/lite/main.go b/cmd/lite/main.go index 9bdbed2a71e..a0010499136 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -42,6 +42,7 @@ func main() { chunkStoreConfig chunk.StoreConfig distributorConfig distributor.Config ingesterConfig ingester.Config + configStoreConfig ruler.ConfigStoreConfig rulerConfig ruler.Config schemaConfig chunk.SchemaConfig storageConfig storage.Config @@ -52,7 +53,7 @@ func main() { // Ingester needs to know our gRPC listen port. ingesterConfig.ListenPort = &serverConfig.GRPCListenPort util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, - &ingesterConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel) + &ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel) flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.") flag.Parse() schemaConfig.MaxChunkAge = ingesterConfig.MaxChunkAge @@ -122,7 +123,12 @@ func main() { tableManager.Start() defer tableManager.Stop() - if rulerConfig.ConfigsAPIURL.String() != "" { + if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" { + rulesAPI, err := ruler.NewRulesAPI(configStoreConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing ruler config store", "err", err) + os.Exit(1) + } rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore) if err != nil { level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err) @@ -130,7 +136,7 @@ func main() { } defer rlr.Stop() - rulerServer, err := ruler.NewServer(rulerConfig, rlr) + rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI) if err != nil { level.Error(util.Logger).Log("msg", "error initializing ruler server", "err", err) os.Exit(1) diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index 3376a2b83c3..2e82f4444c0 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -32,10 +32,11 @@ func main() { chunkStoreConfig chunk.StoreConfig schemaConfig chunk.SchemaConfig storageConfig storage.Config + configStoreConfig ruler.ConfigStoreConfig logLevel util.LogLevel ) util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, - &rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &logLevel) + &rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel) flag.Parse() util.InitLogger(logLevel.AllowedLevel) @@ -75,7 +76,13 @@ func main() { } defer rlr.Stop() - rulerServer, err := ruler.NewServer(rulerConfig, rlr) + rulesAPI, err := ruler.NewRulesAPI(configStoreConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing rules API", "err", err) + os.Exit(1) + } + + rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI) if err != nil { level.Error(util.Logger).Log("msg", "error initializing ruler server: %v", err) os.Exit(1) diff --git a/pkg/configs/client/configs.go b/pkg/configs/client/configs.go index df30c29ca7a..629d3134fce 100644 --- a/pkg/configs/client/configs.go +++ b/pkg/configs/client/configs.go @@ -85,7 +85,8 @@ func AlertmanagerConfigFromConfig(c configs.Config) (*config.Config, error) { return cfg, nil } -func getConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) { +// GetConfigs gets configurations from the configs server. +func GetConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) { req, err := http.NewRequest("GET", endpoint, nil) if err != nil { return nil, err @@ -120,7 +121,7 @@ func (c *AlertManagerConfigsAPI) GetConfigs(since configs.ID) (*ConfigsResponse, suffix = fmt.Sprintf("?since=%d", since) } endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix) - return getConfigs(endpoint, c.Timeout, since) + return GetConfigs(endpoint, c.Timeout, since) } // RulesAPI allows retrieving recording and alerting rules. @@ -137,5 +138,5 @@ func (c *RulesAPI) GetConfigs(since configs.ID) (*ConfigsResponse, error) { suffix = fmt.Sprintf("?since=%d", since) } endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) - return getConfigs(endpoint, c.Timeout, since) + return GetConfigs(endpoint, c.Timeout, since) } diff --git a/pkg/configs/configs.go b/pkg/configs/configs.go index 5e762bf3185..abed3067f0d 100644 --- a/pkg/configs/configs.go +++ b/pkg/configs/configs.go @@ -1,5 +1,13 @@ package configs +import ( + "fmt" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/rules" + "github.com/weaveworks/cortex/pkg/util" +) + // An ID is the ID of a single users's Cortex configuration. When a // configuration changes, it gets a new ID. type ID int @@ -33,6 +41,36 @@ func (v View) GetVersionedRulesConfig() VersionedRulesConfig { // RulesConfig are the set of rules files for a particular organization. type RulesConfig map[string]string +// Parse rules from the Cortex configuration. +// +// Strongly inspired by `loadGroups` in Prometheus. +func (c RulesConfig) Parse() ([]rules.Rule, error) { + result := []rules.Rule{} + for fn, content := range c { + stmts, err := promql.ParseStmts(content) + if err != nil { + return nil, fmt.Errorf("error parsing %s: %s", fn, err) + } + + for _, stmt := range stmts { + var rule rules.Rule + + switch r := stmt.(type) { + case *promql.AlertStmt: + rule = rules.NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations, util.Logger) + + case *promql.RecordStmt: + rule = rules.NewRecordingRule(r.Name, r.Expr, r.Labels) + + default: + return nil, fmt.Errorf("ruler.GetRules: unknown statement type") + } + result = append(result, rule) + } + } + return result, nil +} + // VersionedRulesConfig is a RulesConfig together with a version. // `data Versioned a = Versioned { id :: ID , config :: a }` type VersionedRulesConfig struct { diff --git a/pkg/configs/db/db.go b/pkg/configs/db/db.go index dad45e6a1b5..77cc21fa867 100644 --- a/pkg/configs/db/db.go +++ b/pkg/configs/db/db.go @@ -69,3 +69,12 @@ func New(cfg Config) (DB, error) { } return traced{timed{d}}, nil } + +// NewRulesDB creates a new rules config database. +func NewRulesDB(cfg Config) (RulesDB, error) { + db, err := New(cfg) + if err != nil { + return nil, err + } + return db, err +} diff --git a/pkg/ruler/configs.go b/pkg/ruler/configs.go new file mode 100644 index 00000000000..c1caa04aca8 --- /dev/null +++ b/pkg/ruler/configs.go @@ -0,0 +1,105 @@ +package ruler + +import ( + "flag" + "fmt" + "net/url" + "time" + + "github.com/weaveworks/cortex/pkg/configs" + configs_client "github.com/weaveworks/cortex/pkg/configs/client" + "github.com/weaveworks/cortex/pkg/configs/db" + "github.com/weaveworks/cortex/pkg/util" +) + +// ConfigStoreConfig says where we can find the ruler configs. +type ConfigStoreConfig struct { + DBConfig db.Config + + // DEPRECATED + ConfigsAPIURL util.URLValue + + // DEPRECATED. HTTP timeout duration for requests made to the Weave Cloud + // configs service. + ClientTimeout time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *ConfigStoreConfig) RegisterFlags(f *flag.FlagSet) { + cfg.DBConfig.RegisterFlags(f) + f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "DEPRECATED. URL of configs API server.") + f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.") +} + +// RulesAPI is what the ruler needs from a config store to process rules. +type RulesAPI interface { + // GetConfigs returns all Cortex configurations from a configs API server + // that have been updated after the given configs.ID was last updated. + GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) +} + +// NewRulesAPI creates a new RulesAPI. +func NewRulesAPI(cfg ConfigStoreConfig) (RulesAPI, error) { + // All of this falderal is to allow for a smooth transition away from + // using the configs server and toward directly connecting to the database. + // See https://github.com/weaveworks/cortex/issues/619 + if cfg.DBConfig.URI == "" { + return configsClient{ + URL: cfg.ConfigsAPIURL.URL, + Timeout: cfg.ClientTimeout, + }, nil + } + db, err := db.NewRulesDB(cfg.DBConfig) + if err != nil { + return nil, err + } + return dbStore{db: db}, nil +} + +// configsClient allows retrieving recording and alerting rules from the configs server. +type configsClient struct { + URL *url.URL + Timeout time.Duration +} + +// GetConfigs implements RulesAPI. +func (c configsClient) GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + suffix := "" + if since != 0 { + suffix = fmt.Sprintf("?since=%d", since) + } + endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) + response, err := configs_client.GetConfigs(endpoint, c.Timeout, since) + if err != nil { + return nil, err + } + configs := map[string]configs.VersionedRulesConfig{} + for id, view := range response.Configs { + configs[id] = view.GetVersionedRulesConfig() + } + return configs, nil +} + +type dbStore struct { + db db.RulesDB +} + +// GetConfigs implements RulesAPI. +func (d dbStore) GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + if since == 0 { + return d.db.GetAllRulesConfigs() + } + return d.db.GetRulesConfigs(since) +} + +// getLatestConfigID gets the latest configs ID. +// max [latest, max (map getID cfgs)] +func getLatestConfigID(cfgs map[string]configs.VersionedRulesConfig, latest configs.ID) configs.ID { + ret := latest + for _, config := range cfgs { + if config.ID > ret { + ret = config.ID + } + } + return ret +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index ddccbe67ba0..2e54f1855d3 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -22,7 +22,6 @@ import ( "github.com/weaveworks/common/user" "github.com/weaveworks/cortex/pkg/chunk" - configs "github.com/weaveworks/cortex/pkg/configs/client" "github.com/weaveworks/cortex/pkg/distributor" "github.com/weaveworks/cortex/pkg/querier" "github.com/weaveworks/cortex/pkg/util" @@ -54,12 +53,6 @@ func init() { // Config is the configuration for the recording rules server. type Config struct { - ConfigsAPIURL util.URLValue - - // HTTP timeout duration for requests made to the Weave Cloud configs - // service. - ClientTimeout time.Duration - // This is used for template expansion in alerts; must be a valid URL ExternalURL util.URLValue @@ -84,10 +77,8 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil - f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "URL of configs API server.") f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.") f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 15*time.Second, "How frequently to evaluate rules") - f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.") f.IntVar(&cfg.NumWorkers, "ruler.num-workers", 1, "Number of rule evaluator worker routines in this process") f.Var(&cfg.AlertmanagerURL, "ruler.alertmanager-url", "URL of the Alertmanager to send notifications to.") f.BoolVar(&cfg.AlertmanagerDiscovery, "ruler.alertmanager-discovery", false, "Use DNS SRV records to discover alertmanager hosts.") @@ -280,13 +271,9 @@ type Server struct { } // NewServer makes a new rule processing server. -func NewServer(cfg Config, ruler *Ruler) (*Server, error) { - c := configs.RulesAPI{ - URL: cfg.ConfigsAPIURL.URL, - Timeout: cfg.ClientTimeout, - } +func NewServer(cfg Config, ruler *Ruler, rulesAPI RulesAPI) (*Server, error) { // TODO: Separate configuration for polling interval. - s := newScheduler(c, cfg.EvaluationInterval, cfg.EvaluationInterval) + s := newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval) if cfg.NumWorkers <= 0 { return nil, fmt.Errorf("must have at least 1 worker, got %d", cfg.NumWorkers) } diff --git a/pkg/ruler/scheduler.go b/pkg/ruler/scheduler.go index 806d108d70c..729672e3dad 100644 --- a/pkg/ruler/scheduler.go +++ b/pkg/ruler/scheduler.go @@ -14,7 +14,6 @@ import ( "github.com/weaveworks/common/instrument" "github.com/weaveworks/cortex/pkg/configs" - configs_client "github.com/weaveworks/cortex/pkg/configs/client" "github.com/weaveworks/cortex/pkg/util" ) @@ -79,7 +78,7 @@ func (w workItem) String() string { } type scheduler struct { - configsAPI configs_client.RulesAPI + rulesAPI RulesAPI evaluationInterval time.Duration // how often we re-evaluate each rule set q *SchedulingQueue @@ -94,9 +93,9 @@ type scheduler struct { } // newScheduler makes a new scheduler. -func newScheduler(configsAPI configs_client.RulesAPI, evaluationInterval, pollInterval time.Duration) scheduler { +func newScheduler(rulesAPI RulesAPI, evaluationInterval, pollInterval time.Duration) scheduler { return scheduler{ - configsAPI: configsAPI, + rulesAPI: rulesAPI, evaluationInterval: evaluationInterval, pollInterval: pollInterval, q: NewSchedulingQueue(clockwork.NewRealClock()), @@ -137,7 +136,7 @@ func (s *scheduler) Stop() { // Load the full set of configurations from the server, retrying with backoff // until we can get them. -func (s *scheduler) loadAllConfigs() map[string]configs.View { +func (s *scheduler) loadAllConfigs() map[string]configs.VersionedRulesConfig { backoff := util.NewBackoff(context.Background(), backoffConfig) for { cfgs, err := s.poll() @@ -160,14 +159,14 @@ func (s *scheduler) updateConfigs(now time.Time) error { } // poll the configuration server. Not re-entrant. -func (s *scheduler) poll() (map[string]configs.View, error) { +func (s *scheduler) poll() (map[string]configs.VersionedRulesConfig, error) { s.Lock() configID := s.latestConfig s.Unlock() - var cfgs *configs_client.ConfigsResponse + var cfgs map[string]configs.VersionedRulesConfig err := instrument.TimeRequestHistogram(context.Background(), "Configs.GetConfigs", configsRequestDuration, func(_ context.Context) error { var err error - cfgs, err = s.configsAPI.GetConfigs(configID) + cfgs, err = s.rulesAPI.GetConfigs(configID) return err }) if err != nil { @@ -175,16 +174,16 @@ func (s *scheduler) poll() (map[string]configs.View, error) { return nil, err } s.Lock() - s.latestConfig = cfgs.GetLatestConfigID() + s.latestConfig = getLatestConfigID(cfgs, configID) s.Unlock() - return cfgs.Configs, nil + return cfgs, nil } -func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.View) { +func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.VersionedRulesConfig) { // TODO: instrument how many configs we have, both valid & invalid. level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs)) for userID, config := range cfgs { - rules, err := configs_client.RulesFromConfig(config.Config) + rules, err := config.Config.Parse() if err != nil { // XXX: This means that if a user has a working configuration and // they submit a broken one, we'll keep processing the last known From 3dc22b9b093678aae9626b97c01d4cdb16ff66f9 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 20 Dec 2017 15:45:26 +0000 Subject: [PATCH 04/13] Public RulesAPI no longer used. Delete it. Now an internal implementation detail of ruler while we migrate away from configs server for ruler configs. --- pkg/configs/client/configs.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/pkg/configs/client/configs.go b/pkg/configs/client/configs.go index 629d3134fce..a26adad7619 100644 --- a/pkg/configs/client/configs.go +++ b/pkg/configs/client/configs.go @@ -123,20 +123,3 @@ func (c *AlertManagerConfigsAPI) GetConfigs(since configs.ID) (*ConfigsResponse, endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix) return GetConfigs(endpoint, c.Timeout, since) } - -// RulesAPI allows retrieving recording and alerting rules. -type RulesAPI struct { - URL *url.URL - Timeout time.Duration -} - -// GetConfigs returns all Cortex configurations from a configs API server -// that have been updated after the given configs.ID was last updated. -func (c *RulesAPI) GetConfigs(since configs.ID) (*ConfigsResponse, error) { - suffix := "" - if since != 0 { - suffix = fmt.Sprintf("?since=%d", since) - } - endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) - return GetConfigs(endpoint, c.Timeout, since) -} From a8702b30357932831b8b904331aec93e2e427357 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 6 Dec 2017 21:19:04 +0000 Subject: [PATCH 05/13] Provide APIs for getting & setting rules on ruler itself --- cmd/lite/main.go | 12 ++++++ cmd/ruler/main.go | 12 ++++++ pkg/ruler/api.go | 107 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+) create mode 100644 pkg/ruler/api.go diff --git a/cmd/lite/main.go b/cmd/lite/main.go index a0010499136..669ef30655f 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -169,6 +169,18 @@ func main() { }) } + // Only serve the API for setting & getting rules configs if the database + // was provided. Allows for smoother migration. See + // https://github.com/weaveworks/cortex/issues/619 + if configStoreConfig.DBConfig.URI != "" { + a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing public rules API", "err", err) + os.Exit(1) + } + a.RegisterRoutes(server.HTTP) + } + subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter() subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter)) subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler))) diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index 2e82f4444c0..388f32c65f4 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -96,6 +96,18 @@ func main() { } defer server.Shutdown() + // Only serve the API for setting & getting rules configs if the database + // was provided. Allows for smoother migration. See + // https://github.com/weaveworks/cortex/issues/619 + if configStoreConfig.DBConfig.URI != "" { + a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing public rules API", "err", err) + os.Exit(1) + } + a.RegisterRoutes(server.HTTP) + } + server.HTTP.Handle("/ring", r) server.Run() } diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go new file mode 100644 index 00000000000..208d2101a85 --- /dev/null +++ b/pkg/ruler/api.go @@ -0,0 +1,107 @@ +package ruler + +import ( + "database/sql" + "encoding/json" + "fmt" + "net/http" + + "github.com/go-kit/kit/log/level" + "github.com/gorilla/mux" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/configs" + "github.com/weaveworks/cortex/pkg/configs/db" + "github.com/weaveworks/cortex/pkg/util" +) + +// API implements the configs api. +type API struct { + db db.RulesDB + http.Handler +} + +// NewAPIFromConfig makes a new API from our database config. +func NewAPIFromConfig(cfg db.Config) (*API, error) { + db, err := db.NewRulesDB(cfg) + if err != nil { + return nil, err + } + return NewAPI(db), nil +} + +// NewAPI creates a new API. +func NewAPI(db db.RulesDB) *API { + a := &API{db: db} + r := mux.NewRouter() + a.RegisterRoutes(r) + a.Handler = r + return a +} + +// RegisterRoutes registers the configs API HTTP routes with the provided Router. +func (a *API) RegisterRoutes(r *mux.Router) { + for _, route := range []struct { + name, method, path string + handler http.HandlerFunc + }{ + {"get_rules", "GET", "/api/prom/rules", a.getConfig}, + {"set_rules", "POST", "/api/prom/rules", a.setConfig}, + } { + r.Handle(route.path, route.handler).Methods(route.method).Name(route.name) + } +} + +// getConfig returns the request configuration. +func (a *API) getConfig(w http.ResponseWriter, r *http.Request) { + userID, _, err := user.ExtractOrgIDFromHTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + logger := util.WithContext(r.Context(), util.Logger) + + cfg, err := a.db.GetRulesConfig(userID) + if err == sql.ErrNoRows { + http.Error(w, "No configuration", http.StatusNotFound) + return + } else if err != nil { + level.Error(logger).Log("msg", "error getting config", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(cfg); err != nil { + level.Error(logger).Log("msg", "error encoding config", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (a *API) setConfig(w http.ResponseWriter, r *http.Request) { + userID, _, err := user.ExtractOrgIDFromHTTPRequest(r) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + logger := util.WithContext(r.Context(), util.Logger) + + var cfg configs.RulesConfig + if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil { + level.Error(logger).Log("msg", "error decoding json body", "err", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if _, err := cfg.Parse(); err != nil { + level.Error(logger).Log("msg", "invalid rules", "err", err) + http.Error(w, fmt.Sprintf("Invalid rules: %v", err), http.StatusBadRequest) + return + } + if err := a.db.SetRulesConfig(userID, cfg); err != nil { + level.Error(logger).Log("msg", "error storing config", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) +} From a82b28a3b4c068b26ec56c5585081c70220efb12 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 10 Jan 2018 10:43:27 +0000 Subject: [PATCH 06/13] Make new `set` endpoint CAS Comes with tests, and updates to our Makefile to ensure that the tests run against both postgres and in-memory database. --- Makefile | 2 +- pkg/configs/db/db.go | 2 +- pkg/configs/db/memory/memory.go | 12 +- pkg/configs/db/postgres/postgres.go | 32 +++-- pkg/configs/db/timed.go | 8 +- pkg/configs/db/traced.go | 6 +- pkg/ruler/api.go | 21 +++- pkg/ruler/api_test.go | 179 ++++++++++++++++++++++++++++ 8 files changed, 234 insertions(+), 28 deletions(-) create mode 100644 pkg/ruler/api_test.go diff --git a/Makefile b/Makefile index fe234248633..76d0a86a991 100644 --- a/Makefile +++ b/Makefile @@ -106,7 +106,7 @@ shell: build-image/$(UPTODATE) bash configs-integration-test: - /bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/..." + /bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/... ./pkg/ruler/..." endif diff --git a/pkg/configs/db/db.go b/pkg/configs/db/db.go index 77cc21fa867..2232d2977ac 100644 --- a/pkg/configs/db/db.go +++ b/pkg/configs/db/db.go @@ -26,8 +26,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type RulesDB interface { // GetRulesConfig gets the user's ruler config GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) - SetRulesConfig(userID string, config configs.RulesConfig) error // SetRulesConfig sets the user's ruler config + SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) // GetAllRulesConfigs gets all of the ruler configs GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) diff --git a/pkg/configs/db/memory/memory.go b/pkg/configs/db/memory/memory.go index 14a3c84a8dc..9d50591c116 100644 --- a/pkg/configs/db/memory/memory.go +++ b/pkg/configs/db/memory/memory.go @@ -2,6 +2,7 @@ package memory import ( "database/sql" + "reflect" "github.com/weaveworks/cortex/pkg/configs" ) @@ -67,14 +68,17 @@ func (d *DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) } // SetRulesConfig sets the rules config for a user. -func (d *DB) SetRulesConfig(userID string, config configs.RulesConfig) error { +func (d *DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) { c, ok := d.cfgs[userID] if !ok { - return d.SetConfig(userID, configs.Config{RulesFiles: config}) + return true, d.SetConfig(userID, configs.Config{RulesFiles: newConfig}) } - return d.SetConfig(userID, configs.Config{ + if !reflect.DeepEqual(c.Config.RulesFiles, oldConfig) { + return false, nil + } + return true, d.SetConfig(userID, configs.Config{ AlertmanagerConfig: c.Config.AlertmanagerConfig, - RulesFiles: config, + RulesFiles: newConfig, }) } diff --git a/pkg/configs/db/postgres/postgres.go b/pkg/configs/db/postgres/postgres.go index dc7eee3fc01..61901246212 100644 --- a/pkg/configs/db/postgres/postgres.go +++ b/pkg/configs/db/postgres/postgres.go @@ -4,6 +4,7 @@ import ( "database/sql" "encoding/json" "errors" + "reflect" "github.com/Masterminds/squirrel" "github.com/go-kit/kit/log/level" @@ -144,16 +145,27 @@ func (d DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) } // SetRulesConfig sets the current alertmanager config for a user. -func (d DB) SetRulesConfig(userID string, config configs.RulesConfig) error { - current, err := d.GetConfig(userID) - if err != nil { - return err - } - new := configs.Config{ - AlertmanagerConfig: current.Config.AlertmanagerConfig, - RulesFiles: config, - } - return d.SetConfig(userID, new) +func (d DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) { + updated := false + err := d.Transaction(func(tx DB) error { + current, err := d.GetConfig(userID) + if err != nil && err != sql.ErrNoRows { + return err + } + // The supplied oldConfig must match the current config. If no config + // exists, then oldConfig must be nil. Otherwise, it must exactly + // equal the existing config. + if !((err == sql.ErrNoRows && oldConfig == nil) || reflect.DeepEqual(current.Config.RulesFiles, oldConfig)) { + return nil + } + new := configs.Config{ + AlertmanagerConfig: current.Config.AlertmanagerConfig, + RulesFiles: newConfig, + } + updated = true + return d.SetConfig(userID, new) + }) + return updated, err } func (d DB) findRulesConfigs(filter squirrel.Sqlizer) (map[string]configs.VersionedRulesConfig, error) { diff --git a/pkg/configs/db/timed.go b/pkg/configs/db/timed.go index d9c0f7030bc..dbc2fd9a83f 100644 --- a/pkg/configs/db/timed.go +++ b/pkg/configs/db/timed.go @@ -83,10 +83,12 @@ func (t timed) GetRulesConfig(userID string) (cfg configs.VersionedRulesConfig, return } -func (t timed) SetRulesConfig(userID string, cfg configs.RulesConfig) (err error) { - return t.timeRequest("SetRulesConfig", func(_ context.Context) error { - return t.d.SetRulesConfig(userID, cfg) +func (t timed) SetRulesConfig(userID string, oldCfg, newCfg configs.RulesConfig) (updated bool, err error) { + t.timeRequest("SetRulesConfig", func(_ context.Context) error { + updated, err = t.d.SetRulesConfig(userID, oldCfg, newCfg) + return err }) + return } func (t timed) GetAllRulesConfigs() (cfgs map[string]configs.VersionedRulesConfig, err error) { diff --git a/pkg/configs/db/traced.go b/pkg/configs/db/traced.go index 049f4a2e5c2..89f4a58c464 100644 --- a/pkg/configs/db/traced.go +++ b/pkg/configs/db/traced.go @@ -47,9 +47,9 @@ func (t traced) GetRulesConfig(userID string) (cfg configs.VersionedRulesConfig, return t.d.GetRulesConfig(userID) } -func (t traced) SetRulesConfig(userID string, cfg configs.RulesConfig) (err error) { - defer func() { t.trace("SetRulesConfig", userID, cfg, err) }() - return t.d.SetRulesConfig(userID, cfg) +func (t traced) SetRulesConfig(userID string, oldCfg, newCfg configs.RulesConfig) (updated bool, err error) { + defer func() { t.trace("SetRulesConfig", userID, oldCfg, newCfg, updated, err) }() + return t.d.SetRulesConfig(userID, oldCfg, newCfg) } func (t traced) GetAllRulesConfigs() (cfgs map[string]configs.VersionedRulesConfig, err error) { diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go index 208d2101a85..f98de541a20 100644 --- a/pkg/ruler/api.go +++ b/pkg/ruler/api.go @@ -46,7 +46,7 @@ func (a *API) RegisterRoutes(r *mux.Router) { handler http.HandlerFunc }{ {"get_rules", "GET", "/api/prom/rules", a.getConfig}, - {"set_rules", "POST", "/api/prom/rules", a.setConfig}, + {"cas_rules", "POST", "/api/prom/rules", a.casConfig}, } { r.Handle(route.path, route.handler).Methods(route.method).Name(route.name) } @@ -79,7 +79,12 @@ func (a *API) getConfig(w http.ResponseWriter, r *http.Request) { } } -func (a *API) setConfig(w http.ResponseWriter, r *http.Request) { +type configUpdateRequest struct { + OldConfig configs.RulesConfig `json:"old_config"` + NewConfig configs.RulesConfig `json:"new_config"` +} + +func (a *API) casConfig(w http.ResponseWriter, r *http.Request) { userID, _, err := user.ExtractOrgIDFromHTTPRequest(r) if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) @@ -87,21 +92,25 @@ func (a *API) setConfig(w http.ResponseWriter, r *http.Request) { } logger := util.WithContext(r.Context(), util.Logger) - var cfg configs.RulesConfig - if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil { + var updateReq configUpdateRequest + if err := json.NewDecoder(r.Body).Decode(&updateReq); err != nil { level.Error(logger).Log("msg", "error decoding json body", "err", err) http.Error(w, err.Error(), http.StatusBadRequest) return } - if _, err := cfg.Parse(); err != nil { + if _, err := updateReq.NewConfig.Parse(); err != nil { level.Error(logger).Log("msg", "invalid rules", "err", err) http.Error(w, fmt.Sprintf("Invalid rules: %v", err), http.StatusBadRequest) return } - if err := a.db.SetRulesConfig(userID, cfg); err != nil { + updated, err := a.db.SetRulesConfig(userID, updateReq.OldConfig, updateReq.NewConfig) + if err != nil { level.Error(logger).Log("msg", "error storing config", "err", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } + if !updated { + http.Error(w, "Supplied configuration doesn't match current configuration", http.StatusConflict) + } w.WriteHeader(http.StatusNoContent) } diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go new file mode 100644 index 00000000000..44d4151aa02 --- /dev/null +++ b/pkg/ruler/api_test.go @@ -0,0 +1,179 @@ +package ruler + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/configs" + "github.com/weaveworks/cortex/pkg/configs/db" + "github.com/weaveworks/cortex/pkg/configs/db/dbtest" +) + +const ( + endpoint = "/api/prom/rules" +) + +var ( + app *API + database db.DB + counter int +) + +// setup sets up the environment for the tests. +func setup(t *testing.T) { + database = dbtest.Setup(t) + app = NewAPI(database) + counter = 0 +} + +// cleanup cleans up the environment after a test. +func cleanup(t *testing.T) { + dbtest.Cleanup(t, database) +} + +// request makes a request to the configs API. +func request(t *testing.T, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + r, err := http.NewRequest(method, urlStr, body) + require.NoError(t, err) + app.ServeHTTP(w, r) + return w +} + +// requestAsUser makes a request to the configs API as the given user. +func requestAsUser(t *testing.T, userID string, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + r, err := http.NewRequest(method, urlStr, body) + require.NoError(t, err) + r = r.WithContext(user.InjectOrgID(r.Context(), userID)) + user.InjectOrgIDIntoHTTPRequest(r.Context(), r) + app.ServeHTTP(w, r) + return w +} + +// makeString makes a string, guaranteed to be unique within a test. +func makeString(pattern string) string { + counter++ + return fmt.Sprintf(pattern, counter) +} + +// makeUserID makes an arbitrary user ID. Guaranteed to be unique within a test. +func makeUserID() string { + return makeString("user%d") +} + +// makeRulerConfig makes an arbitrary ruler config +func makeRulerConfig() configs.RulesConfig { + return configs.RulesConfig(map[string]string{ + "filename.rules": makeString(` +# Config no. %d. +ALERT ScrapeFailed + IF up != 1 + FOR 10m + LABELS { severity="warning" } + ANNOTATIONS { + summary = "Scrape of {{$labels.job}} (pod: {{$labels.instance}}) failed.", + description = "Prometheus cannot reach the /metrics page on the {{$labels.instance}} pod.", + impact = "We have no monitoring data for {{$labels.job}} - {{$labels.instance}}. At worst, it's completely down. At best, we cannot reliably respond to operational issues.", + dashboardURL = "$${base_url}/admin/prometheus/targets", + } + `), + }) +} + +// parseVersionedRulesConfig parses a configs.VersionedRulesConfig from JSON. +func parseVersionedRulesConfig(t *testing.T, b []byte) configs.VersionedRulesConfig { + var x configs.VersionedRulesConfig + err := json.Unmarshal(b, &x) + require.NoError(t, err, "Could not unmarshal JSON: %v", string(b)) + return x +} + +// post a config +func post(t *testing.T, userID string, oldConfig configs.RulesConfig, newConfig configs.RulesConfig) configs.VersionedRulesConfig { + updateRequest := configUpdateRequest{ + OldConfig: oldConfig, + NewConfig: newConfig, + } + b, err := json.Marshal(updateRequest) + require.NoError(t, err) + reader := bytes.NewReader(b) + w := requestAsUser(t, userID, "POST", endpoint, reader) + require.Equal(t, http.StatusNoContent, w.Code) + return get(t, userID) +} + +// get a config +func get(t *testing.T, userID string) configs.VersionedRulesConfig { + w := requestAsUser(t, userID, "GET", endpoint, nil) + return parseVersionedRulesConfig(t, w.Body.Bytes()) +} + +// configs returns 404 if no config has been created yet. +func Test_GetConfig_NotFound(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + w := requestAsUser(t, userID, "GET", endpoint, nil) + assert.Equal(t, http.StatusNotFound, w.Code) +} + +// configs returns 401 to requests without authentication. +func Test_PostConfig_Anonymous(t *testing.T) { + setup(t) + defer cleanup(t) + + w := request(t, "POST", endpoint, nil) + assert.Equal(t, http.StatusUnauthorized, w.Code) +} + +// Posting to a configuration sets it so that you can get it again. +func Test_PostConfig_CreatesConfig(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + config := makeRulerConfig() + result := post(t, userID, nil, config) + assert.Equal(t, config, result.Config) +} + +// Posting to a configuration sets it so that you can get it again. +func Test_PostConfig_UpdatesConfig(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + config1 := makeRulerConfig() + view1 := post(t, userID, nil, config1) + config2 := makeRulerConfig() + view2 := post(t, userID, config1, config2) + assert.True(t, view2.ID > view1.ID, "%v > %v", view2.ID, view1.ID) + assert.Equal(t, config2, view2.Config) +} + +// Different users can have different configurations. +func Test_PostConfig_MultipleUsers(t *testing.T) { + setup(t) + defer cleanup(t) + + userID1 := makeUserID() + userID2 := makeUserID() + config1 := post(t, userID1, nil, makeRulerConfig()) + config2 := post(t, userID2, nil, makeRulerConfig()) + foundConfig1 := get(t, userID1) + assert.Equal(t, config1, foundConfig1) + foundConfig2 := get(t, userID2) + assert.Equal(t, config2, foundConfig2) + assert.True(t, config2.ID > config1.ID, "%v > %v", config2.ID, config1.ID) +} From a61dc30809a4254b150057b4a50360d9ab48b940 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Thu, 21 Dec 2017 09:48:26 +0000 Subject: [PATCH 07/13] Tests for getting all configs --- pkg/ruler/api_test.go | 68 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 44d4151aa02..2f82f3e2840 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -23,9 +23,10 @@ const ( ) var ( - app *API - database db.DB - counter int + app *API + database db.DB + counter int + privateAPI RulesAPI ) // setup sets up the environment for the tests. @@ -33,6 +34,7 @@ func setup(t *testing.T) { database = dbtest.Setup(t) app = NewAPI(database) counter = 0 + privateAPI = dbStore{db: database} } // cleanup cleans up the environment after a test. @@ -177,3 +179,63 @@ func Test_PostConfig_MultipleUsers(t *testing.T) { assert.Equal(t, config2, foundConfig2) assert.True(t, config2.ID > config1.ID, "%v > %v", config2.ID, config1.ID) } + +// GetAllConfigs returns an empty list of configs if there aren't any. +func Test_GetAllConfigs_Empty(t *testing.T) { + setup(t) + defer cleanup(t) + + configs, err := privateAPI.GetConfigs(0) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, 0, len(configs)) +} + +// GetAllConfigs returns all created configs. +func Test_GetAllConfigs(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + config := makeRulerConfig() + view := post(t, userID, nil, config) + + found, err := privateAPI.GetConfigs(0) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, map[string]configs.VersionedRulesConfig{ + userID: view, + }, found) +} + +// GetAllConfigs returns the *newest* versions of all created configs. +func Test_GetAllConfigs_Newest(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + + config1 := post(t, userID, nil, makeRulerConfig()) + config2 := post(t, userID, config1.Config, makeRulerConfig()) + lastCreated := post(t, userID, config2.Config, makeRulerConfig()) + + found, err := privateAPI.GetConfigs(0) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, map[string]configs.VersionedRulesConfig{ + userID: lastCreated, + }, found) +} + +func Test_GetConfigs_IncludesNewerConfigsAndExcludesOlder(t *testing.T) { + setup(t) + defer cleanup(t) + + post(t, makeUserID(), nil, makeRulerConfig()) + config2 := post(t, makeUserID(), nil, makeRulerConfig()) + userID3 := makeUserID() + config3 := post(t, userID3, nil, makeRulerConfig()) + + found, err := privateAPI.GetConfigs(config2.ID) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, map[string]configs.VersionedRulesConfig{ + userID3: config3, + }, found) +} From 095d09f0c130123b62d1d4a8e816fe6353291310 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Fri, 5 Jan 2018 15:53:17 +0000 Subject: [PATCH 08/13] Tests for invalid configs --- pkg/ruler/api_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 2f82f3e2840..408393cd5a5 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -150,6 +150,32 @@ func Test_PostConfig_CreatesConfig(t *testing.T) { assert.Equal(t, config, result.Config) } +// Posting an invalid config when there's none set returns an error and leaves the config unset. +func Test_PostConfig_InvalidNewConfig(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + invalidConfig := map[string]string{ + "some.rules": "invalid config", + } + updateRequest := configUpdateRequest{ + OldConfig: nil, + NewConfig: invalidConfig, + } + b, err := json.Marshal(updateRequest) + require.NoError(t, err) + reader := bytes.NewReader(b) + { + w := requestAsUser(t, userID, "POST", endpoint, reader) + require.Equal(t, http.StatusBadRequest, w.Code) + } + { + w := requestAsUser(t, userID, "GET", endpoint, nil) + require.Equal(t, http.StatusNotFound, w.Code) + } +} + // Posting to a configuration sets it so that you can get it again. func Test_PostConfig_UpdatesConfig(t *testing.T) { setup(t) @@ -164,6 +190,32 @@ func Test_PostConfig_UpdatesConfig(t *testing.T) { assert.Equal(t, config2, view2.Config) } +// Posting an invalid config when there's one already set returns an error and leaves the config as is. +func Test_PostConfig_InvalidChangedConfig(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + config := makeRulerConfig() + post(t, userID, nil, config) + invalidConfig := map[string]string{ + "some.rules": "invalid config", + } + updateRequest := configUpdateRequest{ + OldConfig: nil, + NewConfig: invalidConfig, + } + b, err := json.Marshal(updateRequest) + require.NoError(t, err) + reader := bytes.NewReader(b) + { + w := requestAsUser(t, userID, "POST", endpoint, reader) + require.Equal(t, http.StatusBadRequest, w.Code) + } + result := get(t, userID) + assert.Equal(t, config, result.Config) +} + // Different users can have different configurations. func Test_PostConfig_MultipleUsers(t *testing.T) { setup(t) From 7f2099e462daf9070968262fd5ab3da74a13707c Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 10 Jan 2018 10:22:43 +0000 Subject: [PATCH 09/13] Explicitly provide handler in ruler API test helpers This allows us to re-use the helpers to communicate to other endpoints served by completely different API objects. --- pkg/ruler/api_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 408393cd5a5..83e57df9a8f 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -43,22 +43,22 @@ func cleanup(t *testing.T) { } // request makes a request to the configs API. -func request(t *testing.T, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { +func request(t *testing.T, handler http.Handler, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { w := httptest.NewRecorder() r, err := http.NewRequest(method, urlStr, body) require.NoError(t, err) - app.ServeHTTP(w, r) + handler.ServeHTTP(w, r) return w } // requestAsUser makes a request to the configs API as the given user. -func requestAsUser(t *testing.T, userID string, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { +func requestAsUser(t *testing.T, handler http.Handler, userID string, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { w := httptest.NewRecorder() r, err := http.NewRequest(method, urlStr, body) require.NoError(t, err) r = r.WithContext(user.InjectOrgID(r.Context(), userID)) user.InjectOrgIDIntoHTTPRequest(r.Context(), r) - app.ServeHTTP(w, r) + handler.ServeHTTP(w, r) return w } @@ -109,14 +109,14 @@ func post(t *testing.T, userID string, oldConfig configs.RulesConfig, newConfig b, err := json.Marshal(updateRequest) require.NoError(t, err) reader := bytes.NewReader(b) - w := requestAsUser(t, userID, "POST", endpoint, reader) + w := requestAsUser(t, app, userID, "POST", endpoint, reader) require.Equal(t, http.StatusNoContent, w.Code) return get(t, userID) } // get a config func get(t *testing.T, userID string) configs.VersionedRulesConfig { - w := requestAsUser(t, userID, "GET", endpoint, nil) + w := requestAsUser(t, app, userID, "GET", endpoint, nil) return parseVersionedRulesConfig(t, w.Body.Bytes()) } @@ -126,7 +126,7 @@ func Test_GetConfig_NotFound(t *testing.T) { defer cleanup(t) userID := makeUserID() - w := requestAsUser(t, userID, "GET", endpoint, nil) + w := requestAsUser(t, app, userID, "GET", endpoint, nil) assert.Equal(t, http.StatusNotFound, w.Code) } @@ -135,7 +135,7 @@ func Test_PostConfig_Anonymous(t *testing.T) { setup(t) defer cleanup(t) - w := request(t, "POST", endpoint, nil) + w := request(t, app, "POST", endpoint, nil) assert.Equal(t, http.StatusUnauthorized, w.Code) } @@ -167,11 +167,11 @@ func Test_PostConfig_InvalidNewConfig(t *testing.T) { require.NoError(t, err) reader := bytes.NewReader(b) { - w := requestAsUser(t, userID, "POST", endpoint, reader) + w := requestAsUser(t, app, userID, "POST", endpoint, reader) require.Equal(t, http.StatusBadRequest, w.Code) } { - w := requestAsUser(t, userID, "GET", endpoint, nil) + w := requestAsUser(t, app, userID, "GET", endpoint, nil) require.Equal(t, http.StatusNotFound, w.Code) } } @@ -209,7 +209,7 @@ func Test_PostConfig_InvalidChangedConfig(t *testing.T) { require.NoError(t, err) reader := bytes.NewReader(b) { - w := requestAsUser(t, userID, "POST", endpoint, reader) + w := requestAsUser(t, app, userID, "POST", endpoint, reader) require.Equal(t, http.StatusBadRequest, w.Code) } result := get(t, userID) From 53265d704e2b08183b55993b2455ff32290944e1 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 10 Jan 2018 10:25:40 +0000 Subject: [PATCH 10/13] Tests for alertmanager interaction Fixes errors that occur when alertmanager config is set but rules files are not. --- pkg/configs/configs.go | 7 ++- pkg/configs/db/memory/memory.go | 16 +++++-- pkg/configs/db/postgres/postgres.go | 6 ++- pkg/ruler/api_test.go | 66 +++++++++++++++++++++++++++++ pkg/ruler/configs.go | 5 ++- 5 files changed, 93 insertions(+), 7 deletions(-) diff --git a/pkg/configs/configs.go b/pkg/configs/configs.go index abed3067f0d..bdedcafea3e 100644 --- a/pkg/configs/configs.go +++ b/pkg/configs/configs.go @@ -31,8 +31,11 @@ type View struct { } // GetVersionedRulesConfig specializes the view to just the rules config. -func (v View) GetVersionedRulesConfig() VersionedRulesConfig { - return VersionedRulesConfig{ +func (v View) GetVersionedRulesConfig() *VersionedRulesConfig { + if v.Config.RulesFiles == nil { + return nil + } + return &VersionedRulesConfig{ ID: v.ID, Config: v.Config.RulesFiles, } diff --git a/pkg/configs/db/memory/memory.go b/pkg/configs/db/memory/memory.go index 9d50591c116..a743d5b8e0f 100644 --- a/pkg/configs/db/memory/memory.go +++ b/pkg/configs/db/memory/memory.go @@ -64,7 +64,11 @@ func (d *DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) if !ok { return configs.VersionedRulesConfig{}, sql.ErrNoRows } - return c.GetVersionedRulesConfig(), nil + cfg := c.GetVersionedRulesConfig() + if cfg == nil { + return configs.VersionedRulesConfig{}, sql.ErrNoRows + } + return *cfg, nil } // SetRulesConfig sets the rules config for a user. @@ -86,7 +90,10 @@ func (d *DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesCon func (d *DB) GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) { cfgs := map[string]configs.VersionedRulesConfig{} for user, c := range d.cfgs { - cfgs[user] = c.GetVersionedRulesConfig() + cfg := c.GetVersionedRulesConfig() + if cfg != nil { + cfgs[user] = *cfg + } } return cfgs, nil } @@ -99,7 +106,10 @@ func (d *DB) GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRule if c.ID <= since { continue } - cfgs[user] = c.GetVersionedRulesConfig() + cfg := c.GetVersionedRulesConfig() + if cfg != nil { + cfgs[user] = *cfg + } } return cfgs, nil } diff --git a/pkg/configs/db/postgres/postgres.go b/pkg/configs/db/postgres/postgres.go index 61901246212..74e7407c6b7 100644 --- a/pkg/configs/db/postgres/postgres.go +++ b/pkg/configs/db/postgres/postgres.go @@ -141,7 +141,11 @@ func (d DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) if err != nil { return configs.VersionedRulesConfig{}, err } - return current.GetVersionedRulesConfig(), nil + cfg := current.GetVersionedRulesConfig() + if cfg == nil { + return configs.VersionedRulesConfig{}, sql.ErrNoRows + } + return *cfg, nil } // SetRulesConfig sets the current alertmanager config for a user. diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 83e57df9a8f..5850dc92fd3 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -14,6 +14,7 @@ import ( "github.com/weaveworks/common/user" "github.com/weaveworks/cortex/pkg/configs" + "github.com/weaveworks/cortex/pkg/configs/api" "github.com/weaveworks/cortex/pkg/configs/db" "github.com/weaveworks/cortex/pkg/configs/db/dbtest" ) @@ -291,3 +292,68 @@ func Test_GetConfigs_IncludesNewerConfigsAndExcludesOlder(t *testing.T) { userID3: config3, }, found) } + +// postAlertmanagerConfig posts an alertmanager config to the alertmanager configs API. +func postAlertmanagerConfig(t *testing.T, userID, configFile string) { + config := configs.Config{ + AlertmanagerConfig: configFile, + RulesFiles: nil, + } + b, err := json.Marshal(config) + require.NoError(t, err) + reader := bytes.NewReader(b) + configsAPI := api.New(database) + w := requestAsUser(t, configsAPI, userID, "POST", "/api/prom/configs/alertmanager", reader) + require.Equal(t, http.StatusNoContent, w.Code) +} + +// getAlertmanagerConfig posts an alertmanager config to the alertmanager configs API. +func getAlertmanagerConfig(t *testing.T, userID string) string { + w := requestAsUser(t, api.New(database), userID, "GET", "/api/prom/configs/alertmanager", nil) + var x configs.View + b := w.Body.Bytes() + err := json.Unmarshal(b, &x) + require.NoError(t, err, "Could not unmarshal JSON: %v", string(b)) + return x.Config.AlertmanagerConfig +} + +// If a user has only got alertmanager config set, then we learn nothing about them via GetConfigs. +func Test_AlertmanagerConfig_NotInAllConfigs(t *testing.T) { + setup(t) + defer cleanup(t) + + config := makeString(` + # Config no. %d. + route: + receiver: noop + + receivers: + - name: noop`) + postAlertmanagerConfig(t, makeUserID(), config) + + found, err := privateAPI.GetConfigs(0) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, map[string]configs.VersionedRulesConfig{}, found) +} + +// Setting a ruler config doesn't change alertmanager config. +func Test_AlertmanagerConfig_RulerConfigDoesntChangeIt(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + alertmanagerConfig := makeString(` + # Config no. %d. + route: + receiver: noop + + receivers: + - name: noop`) + postAlertmanagerConfig(t, userID, alertmanagerConfig) + + rulerConfig := makeRulerConfig() + post(t, userID, nil, rulerConfig) + + newAlertmanagerConfig := getAlertmanagerConfig(t, userID) + assert.Equal(t, alertmanagerConfig, newAlertmanagerConfig) +} diff --git a/pkg/ruler/configs.go b/pkg/ruler/configs.go index c1caa04aca8..10c17e4055b 100644 --- a/pkg/ruler/configs.go +++ b/pkg/ruler/configs.go @@ -75,7 +75,10 @@ func (c configsClient) GetConfigs(since configs.ID) (map[string]configs.Versione } configs := map[string]configs.VersionedRulesConfig{} for id, view := range response.Configs { - configs[id] = view.GetVersionedRulesConfig() + cfg := view.GetVersionedRulesConfig() + if cfg != nil { + configs[id] = *cfg + } } return configs, nil } From 4585474d92191350aec7098a8f64cf5bb4948053 Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 10 Jan 2018 09:56:04 +0000 Subject: [PATCH 11/13] Update documentation to refer to CAS Fixes 6d4f8be --- pkg/configs/db/db.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/configs/db/db.go b/pkg/configs/db/db.go index 2232d2977ac..a91aac499fd 100644 --- a/pkg/configs/db/db.go +++ b/pkg/configs/db/db.go @@ -26,7 +26,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type RulesDB interface { // GetRulesConfig gets the user's ruler config GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) - // SetRulesConfig sets the user's ruler config + // SetRulesConfig does a compare-and-swap (CAS) on the user's rules config. + // `oldConfig` must precisely match the current config in order to change the config to `newConfig`. + // Will return `true` if the config was updated, `false` otherwise. SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) // GetAllRulesConfigs gets all of the ruler configs From a9d2874b3bec3c85b81c7ea787fbd9fe15761fde Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 10 Jan 2018 09:59:53 +0000 Subject: [PATCH 12/13] Specialized implementation of Equal for RulesConfig Fixes 6d4f8be --- pkg/configs/configs.go | 16 ++++++++++++++++ pkg/configs/db/memory/memory.go | 3 +-- pkg/configs/db/postgres/postgres.go | 3 +-- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pkg/configs/configs.go b/pkg/configs/configs.go index bdedcafea3e..f9cb0082c59 100644 --- a/pkg/configs/configs.go +++ b/pkg/configs/configs.go @@ -44,6 +44,22 @@ func (v View) GetVersionedRulesConfig() *VersionedRulesConfig { // RulesConfig are the set of rules files for a particular organization. type RulesConfig map[string]string +// Equal compares two RulesConfigs for equality. +// +// instance Eq RulesConfig +func (c RulesConfig) Equal(o RulesConfig) bool { + if len(o) != len(c) { + return false + } + for k, v1 := range c { + v2, ok := o[k] + if !ok || v1 != v2 { + return false + } + } + return true +} + // Parse rules from the Cortex configuration. // // Strongly inspired by `loadGroups` in Prometheus. diff --git a/pkg/configs/db/memory/memory.go b/pkg/configs/db/memory/memory.go index a743d5b8e0f..b88f2b310c2 100644 --- a/pkg/configs/db/memory/memory.go +++ b/pkg/configs/db/memory/memory.go @@ -2,7 +2,6 @@ package memory import ( "database/sql" - "reflect" "github.com/weaveworks/cortex/pkg/configs" ) @@ -77,7 +76,7 @@ func (d *DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesCon if !ok { return true, d.SetConfig(userID, configs.Config{RulesFiles: newConfig}) } - if !reflect.DeepEqual(c.Config.RulesFiles, oldConfig) { + if !oldConfig.Equal(c.Config.RulesFiles) { return false, nil } return true, d.SetConfig(userID, configs.Config{ diff --git a/pkg/configs/db/postgres/postgres.go b/pkg/configs/db/postgres/postgres.go index 74e7407c6b7..aca0d3ceb07 100644 --- a/pkg/configs/db/postgres/postgres.go +++ b/pkg/configs/db/postgres/postgres.go @@ -4,7 +4,6 @@ import ( "database/sql" "encoding/json" "errors" - "reflect" "github.com/Masterminds/squirrel" "github.com/go-kit/kit/log/level" @@ -159,7 +158,7 @@ func (d DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConf // The supplied oldConfig must match the current config. If no config // exists, then oldConfig must be nil. Otherwise, it must exactly // equal the existing config. - if !((err == sql.ErrNoRows && oldConfig == nil) || reflect.DeepEqual(current.Config.RulesFiles, oldConfig)) { + if !((err == sql.ErrNoRows && oldConfig == nil) || oldConfig.Equal(current.Config.RulesFiles)) { return nil } new := configs.Config{ From fd9fc8dc21aab4a6a13777d380a5845d946417bb Mon Sep 17 00:00:00 2001 From: Jonathan Lange Date: Wed, 10 Jan 2018 10:41:37 +0000 Subject: [PATCH 13/13] Better documentation for `findRulesConfig` Fixes 029f17c --- pkg/configs/db/postgres/postgres.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/configs/db/postgres/postgres.go b/pkg/configs/db/postgres/postgres.go index aca0d3ceb07..b353ac7cf9b 100644 --- a/pkg/configs/db/postgres/postgres.go +++ b/pkg/configs/db/postgres/postgres.go @@ -171,11 +171,22 @@ func (d DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConf return updated, err } +// findRulesConfigs helps GetAllRulesConfigs and GetRulesConfigs retrieve the +// set of all active rules configurations across all our users. func (d DB) findRulesConfigs(filter squirrel.Sqlizer) (map[string]configs.VersionedRulesConfig, error) { rows, err := d.Select("id", "owner_id", "config ->> 'rules_files'"). Options("DISTINCT ON (owner_id)"). From("configs"). Where(filter). + // `->>` gets a JSON object field as text. When a config row exists + // and alertmanager config is provided but ruler config has not yet + // been, the 'rules_files' key will have an empty JSON object as its + // value. This is (probably) the most efficient way to test for a + // non-empty `rules_files` key. + // + // This whole situation is way too complicated. See + // https://github.com/weaveworks/cortex/issues/619 for the whole + // story, and our plans to improve it. Where("config ->> 'rules_files' <> '{}'"). OrderBy("owner_id, id DESC"). Query()