Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions integration/getting_started_single_process_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func TestGettingStartedSingleProcessConfigWithChunksStorage(t *testing.T) {
labelNames, err := c.LabelNames()
require.NoError(t, err)
require.Equal(t, []string{"__name__", "foo"}, labelNames)

// Check that a range query does not return an error to sanity check the queryrange tripperware.
_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
require.NoError(t, err)
}

func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) {
Expand Down Expand Up @@ -103,4 +107,8 @@ func TestGettingStartedSingleProcessConfigWithBlocksStorage(t *testing.T) {
labelNames, err := c.LabelNames()
require.NoError(t, err)
require.Equal(t, []string{"__name__", "foo"}, labelNames)

// Check that a range query does not return an error to sanity check the queryrange tripperware.
_, err = c.QueryRange("series_1", now.Add(-15*time.Minute), now, 15*time.Second)
require.NoError(t, err)
}
1 change: 1 addition & 0 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryF
// requests to /metrics and /ready.
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
Expand Down
143 changes: 10 additions & 133 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,16 @@ package api

import (
"context"
"errors"
"flag"
"net/http"
"regexp"
"strings"
"time"

"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"

"github.com/felixge/fgprof"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/storage"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"

Expand Down Expand Up @@ -104,23 +93,17 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
// RegisterRoute registers a single route enforcing HTTP methods. A single
// route is expected to be specific about which HTTP methods are supported.
func (a *API) RegisterRoute(path string, handler http.Handler, auth bool, method string, methods ...string) {
a.registerRouteWithRouter(a.server.HTTP, path, handler, auth, method, methods...)
}

// RegisterRoute registers a single route to a router, enforcing HTTP methods. A single
// route is expected to be specific about which HTTP methods are supported.
func (a *API) registerRouteWithRouter(router *mux.Router, path string, handler http.Handler, auth bool, method string, methods ...string) {
methods = append([]string{method}, methods...)

level.Debug(a.logger).Log("msg", "api: registering route", "methods", strings.Join(methods, ","), "path", path, "auth", auth)
if auth {
handler = a.authMiddleware.Wrap(handler)
}
if len(methods) == 0 {
router.Path(path).Handler(handler)
a.server.HTTP.Path(path).Handler(handler)
return
}
router.Path(path).Methods(methods...).Handler(handler)
a.server.HTTP.Path(path).Methods(methods...).Handler(handler)
}

func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth bool, methods ...string) {
Expand All @@ -135,20 +118,6 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth
a.server.HTTP.PathPrefix(prefix).Methods(methods...).Handler(handler)
}

// Latest Prometheus requires r.RemoteAddr to be set to addr:port, otherwise it reject the request.
// Requests to Querier sometimes doesn't have that (if they are fetched from Query-Frontend).
// Prometheus uses this when logging queries to QueryLogger, but Cortex doesn't call engine.SetQueryLogger to set one.
//
// Can be removed when (if) https://github.com/prometheus/prometheus/pull/6840 is merged.
func fakeRemoteAddr(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.RemoteAddr == "" {
r.RemoteAddr = "127.0.0.1:8888"
}
handler.ServeHTTP(w, r)
})
}

// RegisterAlertmanager registers endpoints associated with the alertmanager. It will only
// serve endpoints using the legacy http-prefix if it is not run as a single binary.
func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, target, apiEnabled bool) {
Expand Down Expand Up @@ -302,114 +271,22 @@ func (a *API) RegisterCompactor(c *compactor.Compactor) {
a.RegisterRoute("/compactor/ring", http.HandlerFunc(c.RingHandler), false, "GET", "POST")
}

// RegisterQuerier registers the Prometheus routes supported by the
// Cortex querier service. Currently this can not be registered simultaneously
// with the QueryFrontend.
func (a *API) RegisterQuerier(
// RegisterQueryable registers the the default routes associated with the querier
// module.
func (a *API) RegisterQueryable(
queryable storage.SampleAndChunkQueryable,
engine *promql.Engine,
distributor *distributor.Distributor,
registerRoutesExternally bool,
tombstonesLoader *purger.TombstonesLoader,
querierRequestDuration *prometheus.HistogramVec,
receivedMessageSize *prometheus.HistogramVec,
sentMessageSize *prometheus.HistogramVec,
inflightRequests *prometheus.GaugeVec,
) http.Handler {
api := v1.NewAPI(
engine,
errorTranslateQueryable{queryable}, // Translate errors to errors expected by API.
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} },
func() config.Config { return config.Config{} },
map[string]string{}, // TODO: include configuration flags
v1.GlobalURLOptions{},
func(f http.HandlerFunc) http.HandlerFunc { return f },
nil, // Only needed for admin APIs.
"", // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty.
false, // Disable admin APIs.
a.logger,
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
0, 0, 0, // Remote read samples and concurrency limit.
regexp.MustCompile(".*"),
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
&v1.PrometheusVersion{},
// This is used for the stats API which we should not support. Or find other ways to.
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
)

) {
// these routes are always registered to the default server
a.RegisterRoute("/api/v1/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true, "GET")
a.RegisterRoute("/api/v1/chunks", querier.ChunksHandler(queryable), true, "GET")

a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/user_stats", http.HandlerFunc(distributor.UserStatsHandler), true, "GET")
a.RegisterRoute(a.cfg.LegacyHTTPPrefix+"/chunks", querier.ChunksHandler(queryable), true, "GET")

// these routes are either registered the default server OR to an internal mux. The internal mux is
// for use in a single binary mode when both the query frontend and the querier would attempt to claim these routes
// TODO: Add support to expose querier paths with a configurable prefix in single binary mode.
router := mux.NewRouter()
if registerRoutesExternally {
router = a.server.HTTP
}

// Use a separate metric for the querier in order to differentiate requests from the query-frontend when
// running Cortex as a single binary.
inst := middleware.Instrument{
RouteMatcher: router,
Duration: querierRequestDuration,
RequestBodySize: receivedMessageSize,
ResponseBodySize: sentMessageSize,
InflightRequests: inflightRequests,
}

promRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.PrometheusHTTPPrefix + "/api/v1")
api.Register(promRouter)
cacheGenHeaderMiddleware := getHTTPCacheGenNumberHeaderSetterMiddleware(tombstonesLoader)
promHandler := fakeRemoteAddr(inst.Wrap(cacheGenHeaderMiddleware.Wrap(promRouter)))

a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "POST")
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/query", promHandler, true, "GET", "POST")
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/query_range", promHandler, true, "GET", "POST")
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/labels", promHandler, true, "GET", "POST")
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/label/{name}/values", promHandler, true, "GET")
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/series", promHandler, true, "GET", "POST", "DELETE")
Comment on lines -371 to -376
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who register these now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These routes are registered with the two routers when we call api.Register on them. We end up having to register two routers because the upstream prometheus route library and registration function requires us to register separately for each path prefix.

Previously we explicitly registered every route in the Prometheus API to query handler. However, now we only explicitly register the metadata and read routes because they aren't handled by the upstream API struct. For the rest we register the prefix and since the promHandler and legacyPromHandler are themselves routers, the requests should be forwarded correctly.

Also I should note that the only way to hit these routes is by a request being routed to the internalQuerierHandler which can only happen if it hit's the routes registered in RegisterQueryAPI, so no new routes should have been exposed and no existing routes should have been removed. This PR just simplified the router that already was only acting as a middleman to the upstream Prometheus routers we already registered with the API struct.

//TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
a.registerRouteWithRouter(router, a.cfg.PrometheusHTTPPrefix+"/api/v1/metadata", querier.MetadataHandler(distributor), true, "GET")

legacyPromRouter := route.New().WithPrefix(a.cfg.ServerPrefix + a.cfg.LegacyHTTPPrefix + "/api/v1")
api.Register(legacyPromRouter)
legacyPromHandler := fakeRemoteAddr(inst.Wrap(cacheGenHeaderMiddleware.Wrap(legacyPromRouter)))

a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/read", querier.RemoteReadHandler(queryable), true, "POST")
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/query", legacyPromHandler, true, "GET", "POST")
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/query_range", legacyPromHandler, true, "GET", "POST")
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/labels", legacyPromHandler, true, "GET", "POST")
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/label/{name}/values", legacyPromHandler, true, "GET")
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/series", legacyPromHandler, true, "GET", "POST", "DELETE")
//TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
a.registerRouteWithRouter(router, a.cfg.LegacyHTTPPrefix+"/api/v1/metadata", querier.MetadataHandler(distributor), true, "GET")

// if we have externally registered routes then we need to return the server handler
// so that we continue to use all standard middleware
if registerRoutesExternally {
return a.server.HTTPServer.Handler
}

// Since we have a new router and the request will not go trough the default server
// HTTP middleware stack, we need to add a middleware to extract the trace context
// from the HTTP headers and inject it into the Go context.
return nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string {
return "internalQuerier"
}))
}

// registerQueryAPI registers the Prometheus routes supported by the
// Cortex querier service. Currently this can not be registered simultaneously
// with the Querier.
func (a *API) registerQueryAPI(handler http.Handler) {
// RegisterQueryAPI registers the Prometheus API routes with the provided handler.
func (a *API) RegisterQueryAPI(handler http.Handler) {
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/read", handler, true, "POST")
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/query", handler, true, "GET", "POST")
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/query_range", handler, true, "GET", "POST")
Expand All @@ -433,7 +310,7 @@ func (a *API) registerQueryAPI(handler http.Handler) {
// with the Querier.
func (a *API) RegisterQueryFrontend(f *frontend.Frontend) {
frontend.RegisterFrontendServer(a.server.GRPC, f)
a.registerQueryAPI(f.Handler())
a.RegisterQueryAPI(f.Handler())
}

// RegisterServiceMapHandler registers the Cortex structs service handler
Expand Down
123 changes: 123 additions & 0 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,34 @@
package api

import (
"context"
"html/template"
"net/http"
"path"
"regexp"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/middleware"
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/chunk/purger"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/util"
)

Expand Down Expand Up @@ -109,3 +129,106 @@ func configHandler(cfg interface{}) http.HandlerFunc {
}
}
}

// NewQuerierHandler returns a HTTP handler that can be used by the querier service to
// either register with the frontend worker query processor or with the external HTTP
// server to fulfill the Prometheus query API.
func NewQuerierHandler(
cfg Config,
queryable storage.SampleAndChunkQueryable,
engine *promql.Engine,
distributor *distributor.Distributor,
tombstonesLoader *purger.TombstonesLoader,
reg prometheus.Registerer,
logger log.Logger,
) http.Handler {
// Prometheus histograms for requests to the querier.
querierRequestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "querier_request_duration_seconds",
Help: "Time (in seconds) spent serving HTTP requests to the querier.",
Buckets: instrument.DefBuckets,
}, []string{"method", "route", "status_code", "ws"})

receivedMessageSize := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "querier_request_message_bytes",
Help: "Size (in bytes) of messages received in the request to the querier.",
Buckets: middleware.BodySizeBuckets,
}, []string{"method", "route"})

sentMessageSize := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "querier_response_message_bytes",
Help: "Size (in bytes) of messages sent in response by the querier.",
Buckets: middleware.BodySizeBuckets,
}, []string{"method", "route"})

inflightRequests := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "querier_inflight_requests",
Help: "Current number of inflight requests to the querier.",
}, []string{"method", "route"})

api := v1.NewAPI(
engine,
errorTranslateQueryable{queryable}, // Translate errors to errors expected by API.
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
func(context.Context) v1.AlertmanagerRetriever { return &querier.DummyAlertmanagerRetriever{} },
func() config.Config { return config.Config{} },
map[string]string{}, // TODO: include configuration flags
v1.GlobalURLOptions{},
func(f http.HandlerFunc) http.HandlerFunc { return f },
nil, // Only needed for admin APIs.
"", // This is for snapshots, which is disabled when admin APIs are disabled. Hence empty.
false, // Disable admin APIs.
logger,
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
0, 0, 0, // Remote read samples and concurrency limit.
regexp.MustCompile(".*"),
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
&v1.PrometheusVersion{},
// This is used for the stats API which we should not support. Or find other ways to.
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
)

router := mux.NewRouter()

// Use a separate metric for the querier in order to differentiate requests from the query-frontend when
// running Cortex as a single binary.
inst := middleware.Instrument{
RouteMatcher: router,
Duration: querierRequestDuration,
RequestBodySize: receivedMessageSize,
ResponseBodySize: sentMessageSize,
InflightRequests: inflightRequests,
}
cacheGenHeaderMiddleware := getHTTPCacheGenNumberHeaderSetterMiddleware(tombstonesLoader)
middlewares := middleware.Merge(inst, cacheGenHeaderMiddleware)
router.Use(middlewares.Wrap)

promRouter := route.New().WithPrefix(cfg.ServerPrefix + cfg.PrometheusHTTPPrefix + "/api/v1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I missing anything, or promRouter and legacyPromRouter don't have middlewares attached?

Copy link
Contributor Author

@jtlisi jtlisi Oct 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose for these routers is that they are compatible with the upstream Prometheus API struct route registration function. But we end up needing two of them because we register the routes on two separate path prefixes. Since we consolidate them both under another router that we instantiate in this function, it's easier to register the middleware with that router instead of individually with both of these routers.

We could wrap these two routers individually with the middleware as we did previously. However, that seems a bit redundant.

api.Register(promRouter)

legacyPromRouter := route.New().WithPrefix(cfg.ServerPrefix + cfg.LegacyHTTPPrefix + "/api/v1")
api.Register(legacyPromRouter)

//TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(cfg.PrometheusHTTPPrefix + "/api/v1/metadata").Handler(querier.MetadataHandler(distributor))
router.Path(cfg.PrometheusHTTPPrefix + "/api/v1/read").Handler(querier.RemoteReadHandler(queryable))
// A prefix is fine because external routes will be registered explicitly
router.PathPrefix(cfg.PrometheusHTTPPrefix + "/api/v1/").Handler(promRouter)

//TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(cfg.LegacyHTTPPrefix + "/api/v1/metadata").Handler(querier.MetadataHandler(distributor))
router.Path(cfg.LegacyHTTPPrefix + "/api/v1/read").Handler(querier.RemoteReadHandler(queryable))
// A prefix is fine because external routes will be registered explicitly
router.PathPrefix(cfg.LegacyHTTPPrefix + "/api/v1/").Handler(legacyPromRouter)

// Add a middleware to extract the trace context and add a header.
return nethttp.MiddlewareFunc(opentracing.GlobalTracer(), router.ServeHTTP, nethttp.OperationNameFunc(func(r *http.Request) string {
return "internalQuerier"
}))
}
Loading