diff --git a/Makefile b/Makefile index fe234248633..76d0a86a991 100644 --- a/Makefile +++ b/Makefile @@ -106,7 +106,7 @@ shell: build-image/$(UPTODATE) bash configs-integration-test: - /bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/..." + /bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/... ./pkg/ruler/..." endif diff --git a/cmd/lite/main.go b/cmd/lite/main.go index 9bdbed2a71e..669ef30655f 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -42,6 +42,7 @@ func main() { chunkStoreConfig chunk.StoreConfig distributorConfig distributor.Config ingesterConfig ingester.Config + configStoreConfig ruler.ConfigStoreConfig rulerConfig ruler.Config schemaConfig chunk.SchemaConfig storageConfig storage.Config @@ -52,7 +53,7 @@ func main() { // Ingester needs to know our gRPC listen port. ingesterConfig.ListenPort = &serverConfig.GRPCListenPort util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, - &ingesterConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel) + &ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel) flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.") flag.Parse() schemaConfig.MaxChunkAge = ingesterConfig.MaxChunkAge @@ -122,7 +123,12 @@ func main() { tableManager.Start() defer tableManager.Stop() - if rulerConfig.ConfigsAPIURL.String() != "" { + if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" { + rulesAPI, err := ruler.NewRulesAPI(configStoreConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing ruler config store", "err", err) + os.Exit(1) + } rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore) if err != nil { level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err) @@ -130,7 +136,7 @@ func main() { } defer rlr.Stop() - rulerServer, err := ruler.NewServer(rulerConfig, rlr) + rulerServer, err := 
ruler.NewServer(rulerConfig, rlr, rulesAPI) if err != nil { level.Error(util.Logger).Log("msg", "error initializing ruler server", "err", err) os.Exit(1) @@ -163,6 +169,18 @@ func main() { }) } + // Only serve the API for setting & getting rules configs if the database + // was provided. Allows for smoother migration. See + // https://github.com/weaveworks/cortex/issues/619 + if configStoreConfig.DBConfig.URI != "" { + a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing public rules API", "err", err) + os.Exit(1) + } + a.RegisterRoutes(server.HTTP) + } + subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter() subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter)) subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler))) diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index 3376a2b83c3..388f32c65f4 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -32,10 +32,11 @@ func main() { chunkStoreConfig chunk.StoreConfig schemaConfig chunk.SchemaConfig storageConfig storage.Config + configStoreConfig ruler.ConfigStoreConfig logLevel util.LogLevel ) util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, - &rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &logLevel) + &rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel) flag.Parse() util.InitLogger(logLevel.AllowedLevel) @@ -75,7 +76,13 @@ func main() { } defer rlr.Stop() - rulerServer, err := ruler.NewServer(rulerConfig, rlr) + rulesAPI, err := ruler.NewRulesAPI(configStoreConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing rules API", "err", err) + os.Exit(1) + } + + rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI) if err != nil { level.Error(util.Logger).Log("msg", "error initializing ruler server: %v", err) os.Exit(1) @@ -89,6 +96,18 @@ func 
main() { } defer server.Shutdown() + // Only serve the API for setting & getting rules configs if the database + // was provided. Allows for smoother migration. See + // https://github.com/weaveworks/cortex/issues/619 + if configStoreConfig.DBConfig.URI != "" { + a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig) + if err != nil { + level.Error(util.Logger).Log("msg", "error initializing public rules API", "err", err) + os.Exit(1) + } + a.RegisterRoutes(server.HTTP) + } + server.HTTP.Handle("/ring", r) server.Run() } diff --git a/pkg/configs/client/configs.go b/pkg/configs/client/configs.go index df30c29ca7a..a26adad7619 100644 --- a/pkg/configs/client/configs.go +++ b/pkg/configs/client/configs.go @@ -85,7 +85,8 @@ func AlertmanagerConfigFromConfig(c configs.Config) (*config.Config, error) { return cfg, nil } -func getConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) { +// GetConfigs gets configurations from the configs server. +func GetConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) { req, err := http.NewRequest("GET", endpoint, nil) if err != nil { return nil, err @@ -120,22 +121,5 @@ func (c *AlertManagerConfigsAPI) GetConfigs(since configs.ID) (*ConfigsResponse, suffix = fmt.Sprintf("?since=%d", since) } endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix) - return getConfigs(endpoint, c.Timeout, since) -} - -// RulesAPI allows retrieving recording and alerting rules. -type RulesAPI struct { - URL *url.URL - Timeout time.Duration -} - -// GetConfigs returns all Cortex configurations from a configs API server -// that have been updated after the given configs.ID was last updated. 
-func (c *RulesAPI) GetConfigs(since configs.ID) (*ConfigsResponse, error) { - suffix := "" - if since != 0 { - suffix = fmt.Sprintf("?since=%d", since) - } - endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) - return getConfigs(endpoint, c.Timeout, since) + return GetConfigs(endpoint, c.Timeout, since) } diff --git a/pkg/configs/configs.go b/pkg/configs/configs.go index a568a8f7304..f9cb0082c59 100644 --- a/pkg/configs/configs.go +++ b/pkg/configs/configs.go @@ -1,5 +1,13 @@ package configs +import ( + "fmt" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/rules" + "github.com/weaveworks/cortex/pkg/util" +) + // An ID is the ID of a single users's Cortex configuration. When a // configuration changes, it gets a new ID. type ID int @@ -7,8 +15,8 @@ type ID int // A Config is a Cortex configuration for a single user. type Config struct { // RulesFiles maps from a rules filename to file contents. - RulesFiles map[string]string `json:"rules_files"` - AlertmanagerConfig string `json:"alertmanager_config"` + RulesFiles RulesConfig `json:"rules_files"` + AlertmanagerConfig string `json:"alertmanager_config"` } // View is what's returned from the Weave Cloud configs service @@ -21,3 +29,70 @@ type View struct { ID ID `json:"id"` Config Config `json:"config"` } + +// GetVersionedRulesConfig specializes the view to just the rules config. +func (v View) GetVersionedRulesConfig() *VersionedRulesConfig { + if v.Config.RulesFiles == nil { + return nil + } + return &VersionedRulesConfig{ + ID: v.ID, + Config: v.Config.RulesFiles, + } +} + +// RulesConfig are the set of rules files for a particular organization. +type RulesConfig map[string]string + +// Equal compares two RulesConfigs for equality. 
+// +// instance Eq RulesConfig +func (c RulesConfig) Equal(o RulesConfig) bool { + if len(o) != len(c) { + return false + } + for k, v1 := range c { + v2, ok := o[k] + if !ok || v1 != v2 { + return false + } + } + return true +} + +// Parse rules from the Cortex configuration. +// +// Strongly inspired by `loadGroups` in Prometheus. +func (c RulesConfig) Parse() ([]rules.Rule, error) { + result := []rules.Rule{} + for fn, content := range c { + stmts, err := promql.ParseStmts(content) + if err != nil { + return nil, fmt.Errorf("error parsing %s: %s", fn, err) + } + + for _, stmt := range stmts { + var rule rules.Rule + + switch r := stmt.(type) { + case *promql.AlertStmt: + rule = rules.NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations, util.Logger) + + case *promql.RecordStmt: + rule = rules.NewRecordingRule(r.Name, r.Expr, r.Labels) + + default: + return nil, fmt.Errorf("ruler.GetRules: unknown statement type") + } + result = append(result, rule) + } + } + return result, nil +} + +// VersionedRulesConfig is a RulesConfig together with a version. +// `data Versioned a = Versioned { id :: ID , config :: a }` +type VersionedRulesConfig struct { + ID ID `json:"id"` + Config RulesConfig `json:"config"` +} diff --git a/pkg/configs/db/db.go b/pkg/configs/db/db.go index 3fa3817172c..a91aac499fd 100644 --- a/pkg/configs/db/db.go +++ b/pkg/configs/db/db.go @@ -22,8 +22,26 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.StringVar(&cfg.MigrationsDir, "database.migrations", "", "Path where the database migration files can be found") } +// RulesDB has ruler-specific DB interfaces. +type RulesDB interface { + // GetRulesConfig gets the user's ruler config + GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) + // SetRulesConfig does a compare-and-swap (CAS) on the user's rules config. + // `oldConfig` must precisely match the current config in order to change the config to `newConfig`. 
+ // Will return `true` if the config was updated, `false` otherwise. + SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) + + // GetAllRulesConfigs gets all of the ruler configs + GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) + // GetRulesConfigs gets all of the configs that have been added or have + // changed since the provided config. + GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) +} + // DB is the interface for the database. type DB interface { + RulesDB + GetConfig(userID string) (configs.View, error) SetConfig(userID string, cfg configs.Config) error @@ -53,3 +71,12 @@ func New(cfg Config) (DB, error) { } return traced{timed{d}}, nil } + +// NewRulesDB creates a new rules config database. +func NewRulesDB(cfg Config) (RulesDB, error) { + db, err := New(cfg) + if err != nil { + return nil, err + } + return db, err +} diff --git a/pkg/configs/db/memory/memory.go b/pkg/configs/db/memory/memory.go index b48232fed3a..b88f2b310c2 100644 --- a/pkg/configs/db/memory/memory.go +++ b/pkg/configs/db/memory/memory.go @@ -6,28 +6,16 @@ import ( "github.com/weaveworks/cortex/pkg/configs" ) -type config struct { - cfg configs.Config - id configs.ID -} - -func (c config) toView() configs.View { - return configs.View{ - ID: c.id, - Config: c.cfg, - } -} - // DB is an in-memory database for testing, and local development type DB struct { - cfgs map[string]config + cfgs map[string]configs.View id uint } // New creates a new in-memory database func New(_, _ string) (*DB, error) { return &DB{ - cfgs: map[string]config{}, + cfgs: map[string]configs.View{}, id: 0, }, nil } @@ -38,31 +26,27 @@ func (d *DB) GetConfig(userID string) (configs.View, error) { if !ok { return configs.View{}, sql.ErrNoRows } - return c.toView(), nil + return c, nil } // SetConfig sets configuration for a user. 
func (d *DB) SetConfig(userID string, cfg configs.Config) error { - d.cfgs[userID] = config{cfg: cfg, id: configs.ID(d.id)} + d.cfgs[userID] = configs.View{Config: cfg, ID: configs.ID(d.id)} d.id++ return nil } // GetAllConfigs gets all of the configs. func (d *DB) GetAllConfigs() (map[string]configs.View, error) { - cfgs := map[string]configs.View{} - for user, c := range d.cfgs { - cfgs[user] = c.toView() - } - return cfgs, nil + return d.cfgs, nil } // GetConfigs gets all of the configs that have changed recently. func (d *DB) GetConfigs(since configs.ID) (map[string]configs.View, error) { cfgs := map[string]configs.View{} for user, c := range d.cfgs { - if c.id > since { - cfgs[user] = c.toView() + if c.ID > since { + cfgs[user] = c } } return cfgs, nil @@ -72,3 +56,59 @@ func (d *DB) GetConfigs(since configs.ID) (map[string]configs.View, error) { func (d *DB) Close() error { return nil } + +// GetRulesConfig gets the rules config for a user. +func (d *DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) { + c, ok := d.cfgs[userID] + if !ok { + return configs.VersionedRulesConfig{}, sql.ErrNoRows + } + cfg := c.GetVersionedRulesConfig() + if cfg == nil { + return configs.VersionedRulesConfig{}, sql.ErrNoRows + } + return *cfg, nil +} + +// SetRulesConfig sets the rules config for a user. +func (d *DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) { + c, ok := d.cfgs[userID] + if !ok { + return true, d.SetConfig(userID, configs.Config{RulesFiles: newConfig}) + } + if !oldConfig.Equal(c.Config.RulesFiles) { + return false, nil + } + return true, d.SetConfig(userID, configs.Config{ + AlertmanagerConfig: c.Config.AlertmanagerConfig, + RulesFiles: newConfig, + }) +} + +// GetAllRulesConfigs gets the rules configs for all users that have them. 
+func (d *DB) GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) {
+	cfgs := map[string]configs.VersionedRulesConfig{}
+	for user, c := range d.cfgs {
+		cfg := c.GetVersionedRulesConfig()
+		if cfg != nil {
+			cfgs[user] = *cfg
+		}
+	}
+	return cfgs, nil
+}
+
+// GetRulesConfigs gets the rules configs that have changed
+// since the given config version.
+func (d *DB) GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
+	cfgs := map[string]configs.VersionedRulesConfig{}
+	for user, c := range d.cfgs {
+		if c.ID <= since {
+			continue
+		}
+		cfg := c.GetVersionedRulesConfig()
+		if cfg != nil {
+			cfgs[user] = *cfg
+		}
+	}
+	return cfgs, nil
+}
diff --git a/pkg/configs/db/postgres/postgres.go b/pkg/configs/db/postgres/postgres.go
index 83b81fe6e36..b353ac7cf9b 100644
--- a/pkg/configs/db/postgres/postgres.go
+++ b/pkg/configs/db/postgres/postgres.go
@@ -134,6 +134,104 @@ func (d DB) GetConfigs(since configs.ID) (map[string]configs.View, error) {
 	})
 }
 
+// GetRulesConfig gets the latest rules config for a user.
+func (d DB) GetRulesConfig(userID string) (configs.VersionedRulesConfig, error) {
+	current, err := d.GetConfig(userID)
+	if err != nil {
+		return configs.VersionedRulesConfig{}, err
+	}
+	cfg := current.GetVersionedRulesConfig()
+	if cfg == nil {
+		return configs.VersionedRulesConfig{}, sql.ErrNoRows
+	}
+	return *cfg, nil
+}
+
+// SetRulesConfig compare-and-swaps the rules config for a user: newConfig is
+// stored only if oldConfig matches the current rules config. It reports
+// whether the config was updated.
+func (d DB) SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error) {
+	updated := false
+	err := d.Transaction(func(tx DB) error {
+		// Use tx, not d, so the read and the write share one transaction.
+		current, err := tx.GetConfig(userID)
+		if err != nil && err != sql.ErrNoRows {
+			return err
+		}
+		// The supplied oldConfig must match the current config. If no config
+		// exists, then oldConfig must be nil. Otherwise, it must exactly
+		// equal the existing config.
+		if !((err == sql.ErrNoRows && oldConfig == nil) || oldConfig.Equal(current.Config.RulesFiles)) {
+			return nil
+		}
+		merged := configs.Config{
+			AlertmanagerConfig: current.Config.AlertmanagerConfig,
+			RulesFiles:         newConfig,
+		}
+		updated = true
+		return tx.SetConfig(userID, merged)
+	})
+	return updated, err
+}
+
+// findRulesConfigs helps GetAllRulesConfigs and GetRulesConfigs retrieve the
+// set of all active rules configurations across all our users.
+func (d DB) findRulesConfigs(filter squirrel.Sqlizer) (map[string]configs.VersionedRulesConfig, error) {
+	rows, err := d.Select("id", "owner_id", "config ->> 'rules_files'").
+		Options("DISTINCT ON (owner_id)").
+		From("configs").
+		Where(filter).
+		// `->>` gets a JSON object field as text. When a config row exists
+		// and alertmanager config is provided but ruler config has not yet
+		// been, the 'rules_files' key will have an empty JSON object as its
+		// value. This is (probably) the most efficient way to test for a
+		// non-empty `rules_files` key.
+		//
+		// This whole situation is way too complicated. See
+		// https://github.com/weaveworks/cortex/issues/619 for the whole
+		// story, and our plans to improve it.
+		Where("config ->> 'rules_files' <> '{}'").
+		OrderBy("owner_id, id DESC").
+		Query()
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	cfgs := map[string]configs.VersionedRulesConfig{}
+	for rows.Next() {
+		var cfg configs.VersionedRulesConfig
+		var userID string
+		var cfgBytes []byte
+		err = rows.Scan(&cfg.ID, &userID, &cfgBytes)
+		if err != nil {
+			return nil, err
+		}
+		err = json.Unmarshal(cfgBytes, &cfg.Config)
+		if err != nil {
+			return nil, err
+		}
+		cfgs[userID] = cfg
+	}
+	// rows.Next returns false on error as well as at end of results.
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
+	return cfgs, nil
+}
+
+// GetAllRulesConfigs gets the rules configs for all users.
+func (d DB) GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error) {
+	return d.findRulesConfigs(activeConfig)
+}
+
+// GetRulesConfigs gets all the rules configs that have changed since a given config.
+func (d DB) GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
+	return d.findRulesConfigs(squirrel.And{
+		activeConfig,
+		squirrel.Gt{"id": since},
+	})
+}
+
 // Transaction runs the given function in a postgres transaction. If fn returns
 // an error the txn will be rolled back.
 func (d DB) Transaction(f func(DB) error) error {
diff --git a/pkg/configs/db/timed.go b/pkg/configs/db/timed.go
index 5da7f94ba16..dbc2fd9a83f 100644
--- a/pkg/configs/db/timed.go
+++ b/pkg/configs/db/timed.go
@@ -74,3 +74,35 @@ func (t timed) Close() error {
 		return t.d.Close()
 	})
 }
+
+func (t timed) GetRulesConfig(userID string) (cfg configs.VersionedRulesConfig, err error) {
+	t.timeRequest("GetRulesConfig", func(_ context.Context) error {
+		cfg, err = t.d.GetRulesConfig(userID)
+		return err
+	})
+	return
+}
+
+func (t timed) SetRulesConfig(userID string, oldCfg, newCfg configs.RulesConfig) (updated bool, err error) {
+	t.timeRequest("SetRulesConfig", func(_ context.Context) error {
+		updated, err = t.d.SetRulesConfig(userID, oldCfg, newCfg)
+		return err
+	})
+	return
+}
+
+func (t timed) GetAllRulesConfigs() (cfgs map[string]configs.VersionedRulesConfig, err error) {
+	t.timeRequest("GetAllRulesConfigs", func(_ context.Context) error {
+		cfgs, err = t.d.GetAllRulesConfigs()
+		return err
+	})
+	return
+}
+
+func (t timed) GetRulesConfigs(since configs.ID) (cfgs map[string]configs.VersionedRulesConfig, err error) {
+	t.timeRequest("GetRulesConfigs", func(_ context.Context) error {
+		cfgs, err = t.d.GetRulesConfigs(since)
+		return err
+	})
+	return
+}
diff --git a/pkg/configs/db/traced.go b/pkg/configs/db/traced.go
index 1f20c15c5b0..89f4a58c464 100644
--- a/pkg/configs/db/traced.go
+++ b/pkg/configs/db/traced.go
@@ -41,3 +41,23 @@
func (t traced) Close() (err error) {
 	defer func() { t.trace("Close", err) }()
 	return t.d.Close()
 }
+
+func (t traced) GetRulesConfig(userID string) (cfg configs.VersionedRulesConfig, err error) {
+	defer func() { t.trace("GetRulesConfig", userID, cfg, err) }()
+	return t.d.GetRulesConfig(userID)
+}
+
+func (t traced) SetRulesConfig(userID string, oldCfg, newCfg configs.RulesConfig) (updated bool, err error) {
+	defer func() { t.trace("SetRulesConfig", userID, oldCfg, newCfg, updated, err) }()
+	return t.d.SetRulesConfig(userID, oldCfg, newCfg)
+}
+
+func (t traced) GetAllRulesConfigs() (cfgs map[string]configs.VersionedRulesConfig, err error) {
+	defer func() { t.trace("GetAllRulesConfigs", cfgs, err) }()
+	return t.d.GetAllRulesConfigs()
+}
+
+func (t traced) GetRulesConfigs(since configs.ID) (cfgs map[string]configs.VersionedRulesConfig, err error) {
+	defer func() { t.trace("GetRulesConfigs", since, cfgs, err) }()
+	return t.d.GetRulesConfigs(since)
+}
diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go
new file mode 100644
index 00000000000..f98de541a20
--- /dev/null
+++ b/pkg/ruler/api.go
@@ -0,0 +1,122 @@
+package ruler
+
+import (
+	"database/sql"
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"github.com/go-kit/kit/log/level"
+	"github.com/gorilla/mux"
+
+	"github.com/weaveworks/common/user"
+	"github.com/weaveworks/cortex/pkg/configs"
+	"github.com/weaveworks/cortex/pkg/configs/db"
+	"github.com/weaveworks/cortex/pkg/util"
+)
+
+// API implements the rules config HTTP API.
+type API struct {
+	db db.RulesDB
+	http.Handler
+}
+
+// NewAPIFromConfig makes a new API from our database config.
+func NewAPIFromConfig(cfg db.Config) (*API, error) {
+	db, err := db.NewRulesDB(cfg)
+	if err != nil {
+		return nil, err
+	}
+	return NewAPI(db), nil
+}
+
+// NewAPI creates a new API.
+func NewAPI(db db.RulesDB) *API {
+	a := &API{db: db}
+	r := mux.NewRouter()
+	a.RegisterRoutes(r)
+	a.Handler = r
+	return a
+}
+
+// RegisterRoutes registers the configs API HTTP routes with the provided Router.
+func (a *API) RegisterRoutes(r *mux.Router) {
+	for _, route := range []struct {
+		name, method, path string
+		handler            http.HandlerFunc
+	}{
+		{"get_rules", "GET", "/api/prom/rules", a.getConfig},
+		{"cas_rules", "POST", "/api/prom/rules", a.casConfig},
+	} {
+		r.Handle(route.path, route.handler).Methods(route.method).Name(route.name)
+	}
+}
+
+// getConfig returns the requested rules configuration.
+func (a *API) getConfig(w http.ResponseWriter, r *http.Request) {
+	userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusUnauthorized)
+		return
+	}
+	logger := util.WithContext(r.Context(), util.Logger)
+
+	cfg, err := a.db.GetRulesConfig(userID)
+	if err == sql.ErrNoRows {
+		http.Error(w, "No configuration", http.StatusNotFound)
+		return
+	} else if err != nil {
+		level.Error(logger).Log("msg", "error getting config", "err", err)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	if err := json.NewEncoder(w).Encode(cfg); err != nil {
+		level.Error(logger).Log("msg", "error encoding config", "err", err)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+}
+
+// configUpdateRequest is the JSON body accepted by casConfig.
+type configUpdateRequest struct {
+	OldConfig configs.RulesConfig `json:"old_config"`
+	NewConfig configs.RulesConfig `json:"new_config"`
+}
+
+// casConfig replaces the caller's rules config with new_config, but only if
+// old_config matches what is currently stored (compare-and-swap). It responds
+// 204 on success, 409 when old_config does not match, and 400 when the body
+// cannot be decoded or the new rules do not parse.
+func (a *API) casConfig(w http.ResponseWriter, r *http.Request) {
+	userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusUnauthorized)
+		return
+	}
+	logger := util.WithContext(r.Context(), util.Logger)
+
+	var updateReq configUpdateRequest
+	if err := json.NewDecoder(r.Body).Decode(&updateReq); err != nil {
+		level.Error(logger).Log("msg", "error decoding json body", "err", err)
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+	if _, err := updateReq.NewConfig.Parse(); err != nil {
+		level.Error(logger).Log("msg", "invalid rules", "err", err)
+		http.Error(w, fmt.Sprintf("Invalid rules: %v", err), http.StatusBadRequest)
+		return
+	}
+	updated, err := a.db.SetRulesConfig(userID, updateReq.OldConfig, updateReq.NewConfig)
+	if err != nil {
+		level.Error(logger).Log("msg", "error storing config", "err", err)
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	if !updated {
+		http.Error(w, "Supplied configuration doesn't match current configuration", http.StatusConflict)
+		return
+	}
+	w.WriteHeader(http.StatusNoContent)
+}
diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go
new file mode 100644
index 00000000000..5850dc92fd3
--- /dev/null
+++ b/pkg/ruler/api_test.go
@@ -0,0 +1,359 @@
+package ruler
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/weaveworks/common/user"
+	"github.com/weaveworks/cortex/pkg/configs"
+	"github.com/weaveworks/cortex/pkg/configs/api"
+	"github.com/weaveworks/cortex/pkg/configs/db"
+	"github.com/weaveworks/cortex/pkg/configs/db/dbtest"
+)
+
+const (
+	endpoint = "/api/prom/rules"
+)
+
+var (
+	app        *API
+	database   db.DB
+	counter    int
+	privateAPI RulesAPI
+)
+
+// setup sets up the environment for the tests.
+func setup(t *testing.T) {
+	database = dbtest.Setup(t)
+	app = NewAPI(database)
+	counter = 0
+	privateAPI = dbStore{db: database}
+}
+
+// cleanup cleans up the environment after a test.
+func cleanup(t *testing.T) {
+	dbtest.Cleanup(t, database)
+}
+
+// request makes a request to the configs API.
+func request(t *testing.T, handler http.Handler, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + r, err := http.NewRequest(method, urlStr, body) + require.NoError(t, err) + handler.ServeHTTP(w, r) + return w +} + +// requestAsUser makes a request to the configs API as the given user. +func requestAsUser(t *testing.T, handler http.Handler, userID string, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { + w := httptest.NewRecorder() + r, err := http.NewRequest(method, urlStr, body) + require.NoError(t, err) + r = r.WithContext(user.InjectOrgID(r.Context(), userID)) + user.InjectOrgIDIntoHTTPRequest(r.Context(), r) + handler.ServeHTTP(w, r) + return w +} + +// makeString makes a string, guaranteed to be unique within a test. +func makeString(pattern string) string { + counter++ + return fmt.Sprintf(pattern, counter) +} + +// makeUserID makes an arbitrary user ID. Guaranteed to be unique within a test. +func makeUserID() string { + return makeString("user%d") +} + +// makeRulerConfig makes an arbitrary ruler config +func makeRulerConfig() configs.RulesConfig { + return configs.RulesConfig(map[string]string{ + "filename.rules": makeString(` +# Config no. %d. +ALERT ScrapeFailed + IF up != 1 + FOR 10m + LABELS { severity="warning" } + ANNOTATIONS { + summary = "Scrape of {{$labels.job}} (pod: {{$labels.instance}}) failed.", + description = "Prometheus cannot reach the /metrics page on the {{$labels.instance}} pod.", + impact = "We have no monitoring data for {{$labels.job}} - {{$labels.instance}}. At worst, it's completely down. At best, we cannot reliably respond to operational issues.", + dashboardURL = "$${base_url}/admin/prometheus/targets", + } + `), + }) +} + +// parseVersionedRulesConfig parses a configs.VersionedRulesConfig from JSON. 
+func parseVersionedRulesConfig(t *testing.T, b []byte) configs.VersionedRulesConfig { + var x configs.VersionedRulesConfig + err := json.Unmarshal(b, &x) + require.NoError(t, err, "Could not unmarshal JSON: %v", string(b)) + return x +} + +// post a config +func post(t *testing.T, userID string, oldConfig configs.RulesConfig, newConfig configs.RulesConfig) configs.VersionedRulesConfig { + updateRequest := configUpdateRequest{ + OldConfig: oldConfig, + NewConfig: newConfig, + } + b, err := json.Marshal(updateRequest) + require.NoError(t, err) + reader := bytes.NewReader(b) + w := requestAsUser(t, app, userID, "POST", endpoint, reader) + require.Equal(t, http.StatusNoContent, w.Code) + return get(t, userID) +} + +// get a config +func get(t *testing.T, userID string) configs.VersionedRulesConfig { + w := requestAsUser(t, app, userID, "GET", endpoint, nil) + return parseVersionedRulesConfig(t, w.Body.Bytes()) +} + +// configs returns 404 if no config has been created yet. +func Test_GetConfig_NotFound(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + w := requestAsUser(t, app, userID, "GET", endpoint, nil) + assert.Equal(t, http.StatusNotFound, w.Code) +} + +// configs returns 401 to requests without authentication. +func Test_PostConfig_Anonymous(t *testing.T) { + setup(t) + defer cleanup(t) + + w := request(t, app, "POST", endpoint, nil) + assert.Equal(t, http.StatusUnauthorized, w.Code) +} + +// Posting to a configuration sets it so that you can get it again. +func Test_PostConfig_CreatesConfig(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + config := makeRulerConfig() + result := post(t, userID, nil, config) + assert.Equal(t, config, result.Config) +} + +// Posting an invalid config when there's none set returns an error and leaves the config unset. 
+func Test_PostConfig_InvalidNewConfig(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + invalidConfig := map[string]string{ + "some.rules": "invalid config", + } + updateRequest := configUpdateRequest{ + OldConfig: nil, + NewConfig: invalidConfig, + } + b, err := json.Marshal(updateRequest) + require.NoError(t, err) + reader := bytes.NewReader(b) + { + w := requestAsUser(t, app, userID, "POST", endpoint, reader) + require.Equal(t, http.StatusBadRequest, w.Code) + } + { + w := requestAsUser(t, app, userID, "GET", endpoint, nil) + require.Equal(t, http.StatusNotFound, w.Code) + } +} + +// Posting to a configuration sets it so that you can get it again. +func Test_PostConfig_UpdatesConfig(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + config1 := makeRulerConfig() + view1 := post(t, userID, nil, config1) + config2 := makeRulerConfig() + view2 := post(t, userID, config1, config2) + assert.True(t, view2.ID > view1.ID, "%v > %v", view2.ID, view1.ID) + assert.Equal(t, config2, view2.Config) +} + +// Posting an invalid config when there's one already set returns an error and leaves the config as is. +func Test_PostConfig_InvalidChangedConfig(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + config := makeRulerConfig() + post(t, userID, nil, config) + invalidConfig := map[string]string{ + "some.rules": "invalid config", + } + updateRequest := configUpdateRequest{ + OldConfig: nil, + NewConfig: invalidConfig, + } + b, err := json.Marshal(updateRequest) + require.NoError(t, err) + reader := bytes.NewReader(b) + { + w := requestAsUser(t, app, userID, "POST", endpoint, reader) + require.Equal(t, http.StatusBadRequest, w.Code) + } + result := get(t, userID) + assert.Equal(t, config, result.Config) +} + +// Different users can have different configurations. 
+func Test_PostConfig_MultipleUsers(t *testing.T) { + setup(t) + defer cleanup(t) + + userID1 := makeUserID() + userID2 := makeUserID() + config1 := post(t, userID1, nil, makeRulerConfig()) + config2 := post(t, userID2, nil, makeRulerConfig()) + foundConfig1 := get(t, userID1) + assert.Equal(t, config1, foundConfig1) + foundConfig2 := get(t, userID2) + assert.Equal(t, config2, foundConfig2) + assert.True(t, config2.ID > config1.ID, "%v > %v", config2.ID, config1.ID) +} + +// GetAllConfigs returns an empty list of configs if there aren't any. +func Test_GetAllConfigs_Empty(t *testing.T) { + setup(t) + defer cleanup(t) + + configs, err := privateAPI.GetConfigs(0) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, 0, len(configs)) +} + +// GetAllConfigs returns all created configs. +func Test_GetAllConfigs(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + config := makeRulerConfig() + view := post(t, userID, nil, config) + + found, err := privateAPI.GetConfigs(0) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, map[string]configs.VersionedRulesConfig{ + userID: view, + }, found) +} + +// GetAllConfigs returns the *newest* versions of all created configs. 
+func Test_GetAllConfigs_Newest(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + + config1 := post(t, userID, nil, makeRulerConfig()) + config2 := post(t, userID, config1.Config, makeRulerConfig()) + lastCreated := post(t, userID, config2.Config, makeRulerConfig()) + + found, err := privateAPI.GetConfigs(0) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, map[string]configs.VersionedRulesConfig{ + userID: lastCreated, + }, found) +} + +func Test_GetConfigs_IncludesNewerConfigsAndExcludesOlder(t *testing.T) { + setup(t) + defer cleanup(t) + + post(t, makeUserID(), nil, makeRulerConfig()) + config2 := post(t, makeUserID(), nil, makeRulerConfig()) + userID3 := makeUserID() + config3 := post(t, userID3, nil, makeRulerConfig()) + + found, err := privateAPI.GetConfigs(config2.ID) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, map[string]configs.VersionedRulesConfig{ + userID3: config3, + }, found) +} + +// postAlertmanagerConfig posts an alertmanager config to the alertmanager configs API. +func postAlertmanagerConfig(t *testing.T, userID, configFile string) { + config := configs.Config{ + AlertmanagerConfig: configFile, + RulesFiles: nil, + } + b, err := json.Marshal(config) + require.NoError(t, err) + reader := bytes.NewReader(b) + configsAPI := api.New(database) + w := requestAsUser(t, configsAPI, userID, "POST", "/api/prom/configs/alertmanager", reader) + require.Equal(t, http.StatusNoContent, w.Code) +} + +// getAlertmanagerConfig gets a user's alertmanager config from the alertmanager configs API. 
+func getAlertmanagerConfig(t *testing.T, userID string) string { + w := requestAsUser(t, api.New(database), userID, "GET", "/api/prom/configs/alertmanager", nil) + var x configs.View + b := w.Body.Bytes() + err := json.Unmarshal(b, &x) + require.NoError(t, err, "Could not unmarshal JSON: %v", string(b)) + return x.Config.AlertmanagerConfig +} + +// If a user has only got alertmanager config set, then we learn nothing about them via GetConfigs. +func Test_AlertmanagerConfig_NotInAllConfigs(t *testing.T) { + setup(t) + defer cleanup(t) + + config := makeString(` + # Config no. %d. + route: + receiver: noop + + receivers: + - name: noop`) + postAlertmanagerConfig(t, makeUserID(), config) + + found, err := privateAPI.GetConfigs(0) + assert.NoError(t, err, "error getting configs") + assert.Equal(t, map[string]configs.VersionedRulesConfig{}, found) +} + +// Setting a ruler config doesn't change alertmanager config. +func Test_AlertmanagerConfig_RulerConfigDoesntChangeIt(t *testing.T) { + setup(t) + defer cleanup(t) + + userID := makeUserID() + alertmanagerConfig := makeString(` + # Config no. %d. + route: + receiver: noop + + receivers: + - name: noop`) + postAlertmanagerConfig(t, userID, alertmanagerConfig) + + rulerConfig := makeRulerConfig() + post(t, userID, nil, rulerConfig) + + newAlertmanagerConfig := getAlertmanagerConfig(t, userID) + assert.Equal(t, alertmanagerConfig, newAlertmanagerConfig) +} diff --git a/pkg/ruler/configs.go b/pkg/ruler/configs.go new file mode 100644 index 00000000000..10c17e4055b --- /dev/null +++ b/pkg/ruler/configs.go @@ -0,0 +1,108 @@ +package ruler + +import ( + "flag" + "fmt" + "net/url" + "time" + + "github.com/weaveworks/cortex/pkg/configs" + configs_client "github.com/weaveworks/cortex/pkg/configs/client" + "github.com/weaveworks/cortex/pkg/configs/db" + "github.com/weaveworks/cortex/pkg/util" +) + +// ConfigStoreConfig says where we can find the ruler configs. 
+type ConfigStoreConfig struct { + DBConfig db.Config + + // Deprecated: set DBConfig to read rules directly from the database instead. + ConfigsAPIURL util.URLValue + + // Deprecated: only used together with ConfigsAPIURL. HTTP timeout duration + // for requests made to the Weave Cloud configs service. + ClientTimeout time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *ConfigStoreConfig) RegisterFlags(f *flag.FlagSet) { + cfg.DBConfig.RegisterFlags(f) + f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "DEPRECATED. URL of configs API server.") + f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.") +} + +// RulesAPI is what the ruler needs from a config store to process rules. +type RulesAPI interface { + // GetConfigs returns all Cortex rules configurations from the config store + // that have been updated after the given configs.ID was last updated. + GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) +} + +// NewRulesAPI creates a new RulesAPI. +func NewRulesAPI(cfg ConfigStoreConfig) (RulesAPI, error) { + // All of this falderal is to allow for a smooth transition away from + // using the configs server and toward directly connecting to the database. + // See https://github.com/weaveworks/cortex/issues/619 + if cfg.DBConfig.URI == "" { + return configsClient{ + URL: cfg.ConfigsAPIURL.URL, + Timeout: cfg.ClientTimeout, + }, nil + } + db, err := db.NewRulesDB(cfg.DBConfig) + if err != nil { + return nil, err + } + return dbStore{db: db}, nil +} + +// configsClient allows retrieving recording and alerting rules from the configs server. +type configsClient struct { + URL *url.URL + Timeout time.Duration +} + +// GetConfigs implements RulesAPI. 
+func (c configsClient) GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + suffix := "" + if since != 0 { + suffix = fmt.Sprintf("?since=%d", since) + } + endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) + response, err := configs_client.GetConfigs(endpoint, c.Timeout, since) + if err != nil { + return nil, err + } + configs := map[string]configs.VersionedRulesConfig{} + for id, view := range response.Configs { + cfg := view.GetVersionedRulesConfig() + if cfg != nil { + configs[id] = *cfg + } + } + return configs, nil +} + +type dbStore struct { + db db.RulesDB +} + +// GetConfigs implements RulesAPI. +func (d dbStore) GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + if since == 0 { + return d.db.GetAllRulesConfigs() + } + return d.db.GetRulesConfigs(since) +} + +// getLatestConfigID gets the latest configs ID. +// max [latest, max (map getID cfgs)] +func getLatestConfigID(cfgs map[string]configs.VersionedRulesConfig, latest configs.ID) configs.ID { + ret := latest + for _, config := range cfgs { + if config.ID > ret { + ret = config.ID + } + } + return ret +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index ddccbe67ba0..2e54f1855d3 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -22,7 +22,6 @@ import ( "github.com/weaveworks/common/user" "github.com/weaveworks/cortex/pkg/chunk" - configs "github.com/weaveworks/cortex/pkg/configs/client" "github.com/weaveworks/cortex/pkg/distributor" "github.com/weaveworks/cortex/pkg/querier" "github.com/weaveworks/cortex/pkg/util" @@ -54,12 +53,6 @@ func init() { // Config is the configuration for the recording rules server. type Config struct { - ConfigsAPIURL util.URLValue - - // HTTP timeout duration for requests made to the Weave Cloud configs - // service. 
- ClientTimeout time.Duration - // This is used for template expansion in alerts; must be a valid URL ExternalURL util.URLValue @@ -84,10 +77,8 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil - f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "URL of configs API server.") f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.") f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 15*time.Second, "How frequently to evaluate rules") - f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.") f.IntVar(&cfg.NumWorkers, "ruler.num-workers", 1, "Number of rule evaluator worker routines in this process") f.Var(&cfg.AlertmanagerURL, "ruler.alertmanager-url", "URL of the Alertmanager to send notifications to.") f.BoolVar(&cfg.AlertmanagerDiscovery, "ruler.alertmanager-discovery", false, "Use DNS SRV records to discover alertmanager hosts.") @@ -280,13 +271,9 @@ type Server struct { } // NewServer makes a new rule processing server. -func NewServer(cfg Config, ruler *Ruler) (*Server, error) { - c := configs.RulesAPI{ - URL: cfg.ConfigsAPIURL.URL, - Timeout: cfg.ClientTimeout, - } +func NewServer(cfg Config, ruler *Ruler, rulesAPI RulesAPI) (*Server, error) { // TODO: Separate configuration for polling interval. 
- s := newScheduler(c, cfg.EvaluationInterval, cfg.EvaluationInterval) + s := newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval) if cfg.NumWorkers <= 0 { return nil, fmt.Errorf("must have at least 1 worker, got %d", cfg.NumWorkers) } diff --git a/pkg/ruler/scheduler.go b/pkg/ruler/scheduler.go index 806d108d70c..729672e3dad 100644 --- a/pkg/ruler/scheduler.go +++ b/pkg/ruler/scheduler.go @@ -14,7 +14,6 @@ import ( "github.com/weaveworks/common/instrument" "github.com/weaveworks/cortex/pkg/configs" - configs_client "github.com/weaveworks/cortex/pkg/configs/client" "github.com/weaveworks/cortex/pkg/util" ) @@ -79,7 +78,7 @@ func (w workItem) String() string { } type scheduler struct { - configsAPI configs_client.RulesAPI + rulesAPI RulesAPI evaluationInterval time.Duration // how often we re-evaluate each rule set q *SchedulingQueue @@ -94,9 +93,9 @@ type scheduler struct { } // newScheduler makes a new scheduler. -func newScheduler(configsAPI configs_client.RulesAPI, evaluationInterval, pollInterval time.Duration) scheduler { +func newScheduler(rulesAPI RulesAPI, evaluationInterval, pollInterval time.Duration) scheduler { return scheduler{ - configsAPI: configsAPI, + rulesAPI: rulesAPI, evaluationInterval: evaluationInterval, pollInterval: pollInterval, q: NewSchedulingQueue(clockwork.NewRealClock()), @@ -137,7 +136,7 @@ func (s *scheduler) Stop() { // Load the full set of configurations from the server, retrying with backoff // until we can get them. -func (s *scheduler) loadAllConfigs() map[string]configs.View { +func (s *scheduler) loadAllConfigs() map[string]configs.VersionedRulesConfig { backoff := util.NewBackoff(context.Background(), backoffConfig) for { cfgs, err := s.poll() @@ -160,14 +159,14 @@ func (s *scheduler) updateConfigs(now time.Time) error { } // poll the configuration server. Not re-entrant. 
-func (s *scheduler) poll() (map[string]configs.View, error) { +func (s *scheduler) poll() (map[string]configs.VersionedRulesConfig, error) { s.Lock() configID := s.latestConfig s.Unlock() - var cfgs *configs_client.ConfigsResponse + var cfgs map[string]configs.VersionedRulesConfig err := instrument.TimeRequestHistogram(context.Background(), "Configs.GetConfigs", configsRequestDuration, func(_ context.Context) error { var err error - cfgs, err = s.configsAPI.GetConfigs(configID) + cfgs, err = s.rulesAPI.GetConfigs(configID) return err }) if err != nil { @@ -175,16 +174,16 @@ func (s *scheduler) poll() (map[string]configs.View, error) { return nil, err } s.Lock() - s.latestConfig = cfgs.GetLatestConfigID() + s.latestConfig = getLatestConfigID(cfgs, configID) s.Unlock() - return cfgs.Configs, nil + return cfgs, nil } -func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.View) { +func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.VersionedRulesConfig) { // TODO: instrument how many configs we have, both valid & invalid. level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs)) for userID, config := range cfgs { - rules, err := configs_client.RulesFromConfig(config.Config) + rules, err := config.Config.Parse() if err != nil { // XXX: This means that if a user has a working configuration and // they submit a broken one, we'll keep processing the last known