Skip to content
Merged
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ shell: build-image/$(UPTODATE)
bash

configs-integration-test:
/bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/..."
/bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/... ./pkg/ruler/..."

endif

Expand Down
24 changes: 21 additions & 3 deletions cmd/lite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func main() {
chunkStoreConfig chunk.StoreConfig
distributorConfig distributor.Config
ingesterConfig ingester.Config
configStoreConfig ruler.ConfigStoreConfig
rulerConfig ruler.Config
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
Expand All @@ -52,7 +53,7 @@ func main() {
// Ingester needs to know our gRPC listen port.
ingesterConfig.ListenPort = &serverConfig.GRPCListenPort
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig,
&ingesterConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
&ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.")
flag.Parse()
schemaConfig.MaxChunkAge = ingesterConfig.MaxChunkAge
Expand Down Expand Up @@ -122,15 +123,20 @@ func main() {
tableManager.Start()
defer tableManager.Stop()

if rulerConfig.ConfigsAPIURL.String() != "" {
if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler config store", "err", err)
os.Exit(1)
}
rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
os.Exit(1)
}
defer rlr.Stop()

rulerServer, err := ruler.NewServer(rulerConfig, rlr)
rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler server", "err", err)
os.Exit(1)
Expand Down Expand Up @@ -163,6 +169,18 @@ func main() {
})
}

// Only serve the API for setting & getting rules configs if the database
// was provided. Allows for smoother migration. See
// https://github.com/weaveworks/cortex/issues/619
if configStoreConfig.DBConfig.URI != "" {
a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing public rules API", "err", err)
os.Exit(1)
}
a.RegisterRoutes(server.HTTP)
}

subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter))
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler)))
Expand Down
23 changes: 21 additions & 2 deletions cmd/ruler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ func main() {
chunkStoreConfig chunk.StoreConfig
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
configStoreConfig ruler.ConfigStoreConfig
logLevel util.LogLevel
)
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &logLevel)
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel)
flag.Parse()

util.InitLogger(logLevel.AllowedLevel)
Expand Down Expand Up @@ -75,7 +76,13 @@ func main() {
}
defer rlr.Stop()

rulerServer, err := ruler.NewServer(rulerConfig, rlr)
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing rules API", "err", err)
os.Exit(1)
}

rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing ruler server: %v", err)
os.Exit(1)
Expand All @@ -89,6 +96,18 @@ func main() {
}
defer server.Shutdown()

// Only serve the API for setting & getting rules configs if the database
// was provided. Allows for smoother migration. See
// https://github.com/weaveworks/cortex/issues/619
if configStoreConfig.DBConfig.URI != "" {
a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig)
if err != nil {
level.Error(util.Logger).Log("msg", "error initializing public rules API", "err", err)
os.Exit(1)
}
a.RegisterRoutes(server.HTTP)
}

server.HTTP.Handle("/ring", r)
server.Run()
}
22 changes: 3 additions & 19 deletions pkg/configs/client/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func AlertmanagerConfigFromConfig(c configs.Config) (*config.Config, error) {
return cfg, nil
}

func getConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
// GetConfigs gets configurations from the configs server.
func GetConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -120,22 +121,5 @@ func (c *AlertManagerConfigsAPI) GetConfigs(since configs.ID) (*ConfigsResponse,
suffix = fmt.Sprintf("?since=%d", since)
}
endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix)
return getConfigs(endpoint, c.Timeout, since)
}

// RulesAPI allows retrieving recording and alerting rules.
type RulesAPI struct {
URL *url.URL
Timeout time.Duration
}

// GetConfigs returns all Cortex configurations from a configs API server
// that have been updated after the given configs.ID was last updated.
func (c *RulesAPI) GetConfigs(since configs.ID) (*ConfigsResponse, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
}
endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix)
return getConfigs(endpoint, c.Timeout, since)
return GetConfigs(endpoint, c.Timeout, since)
}
79 changes: 77 additions & 2 deletions pkg/configs/configs.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package configs

import (
"fmt"

"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/weaveworks/cortex/pkg/util"
)

// An ID is the ID of a single users's Cortex configuration. When a
// configuration changes, it gets a new ID.
type ID int

// A Config is a Cortex configuration for a single user.
type Config struct {
// RulesFiles maps from a rules filename to file contents.
RulesFiles map[string]string `json:"rules_files"`
AlertmanagerConfig string `json:"alertmanager_config"`
RulesFiles RulesConfig `json:"rules_files"`
AlertmanagerConfig string `json:"alertmanager_config"`
}

// View is what's returned from the Weave Cloud configs service
Expand All @@ -21,3 +29,70 @@ type View struct {
ID ID `json:"id"`
Config Config `json:"config"`
}

// GetVersionedRulesConfig specializes the view to just the rules config.
func (v View) GetVersionedRulesConfig() *VersionedRulesConfig {
if v.Config.RulesFiles == nil {
return nil
}
return &VersionedRulesConfig{
ID: v.ID,
Config: v.Config.RulesFiles,
}
}

// RulesConfig are the set of rules files for a particular organization.
type RulesConfig map[string]string

// Equal compares two RulesConfigs for equality.
//
// instance Eq RulesConfig
func (c RulesConfig) Equal(o RulesConfig) bool {
if len(o) != len(c) {
return false
}
for k, v1 := range c {
v2, ok := o[k]
if !ok || v1 != v2 {
return false
}
}
return true
}

// Parse rules from the Cortex configuration.
//
// Strongly inspired by `loadGroups` in Prometheus.
func (c RulesConfig) Parse() ([]rules.Rule, error) {
result := []rules.Rule{}
for fn, content := range c {
stmts, err := promql.ParseStmts(content)
if err != nil {
return nil, fmt.Errorf("error parsing %s: %s", fn, err)
}

for _, stmt := range stmts {
var rule rules.Rule

switch r := stmt.(type) {
case *promql.AlertStmt:
rule = rules.NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations, util.Logger)

case *promql.RecordStmt:
rule = rules.NewRecordingRule(r.Name, r.Expr, r.Labels)

default:
return nil, fmt.Errorf("ruler.GetRules: unknown statement type")
}
result = append(result, rule)
}
}
return result, nil
}

// VersionedRulesConfig is a RulesConfig together with a version.
// `data Versioned a = Versioned { id :: ID , config :: a }`
type VersionedRulesConfig struct {
ID ID `json:"id"`
Config RulesConfig `json:"config"`
}
27 changes: 27 additions & 0 deletions pkg/configs/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,26 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.StringVar(&cfg.MigrationsDir, "database.migrations", "", "Path where the database migration files can be found")
}

// RulesDB has ruler-specific DB interfaces.
type RulesDB interface {
// GetRulesConfig gets the user's ruler config
GetRulesConfig(userID string) (configs.VersionedRulesConfig, error)
// SetRulesConfig does a compare-and-swap (CAS) on the user's rules config.
// `oldConfig` must precisely match the current config in order to change the config to `newConfig`.
// Will return `true` if the config was updated, `false` otherwise.
SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error)

// GetAllRulesConfigs gets all of the ruler configs
GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error)
// GetRulesConfigs gets all of the configs that have been added or have
// changed since the provided config.
GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error)
}

// DB is the interface for the database.
type DB interface {
RulesDB

GetConfig(userID string) (configs.View, error)
SetConfig(userID string, cfg configs.Config) error

Expand Down Expand Up @@ -53,3 +71,12 @@ func New(cfg Config) (DB, error) {
}
return traced{timed{d}}, nil
}

// NewRulesDB creates a new rules config database.
func NewRulesDB(cfg Config) (RulesDB, error) {
db, err := New(cfg)
if err != nil {
return nil, err
}
return db, err
}
Loading