Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions integration/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,3 +1375,78 @@ func TestQuerierEngineConfigs(t *testing.T) {
}

}

func TestQuerierDistributedExecution(t *testing.T) {
// e2e test setup
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// initialize the flags
flags := mergeFlags(
BlocksStorageFlags(),
map[string]string{
"-blocks-storage.tsdb.block-ranges-period": (5 * time.Second).String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.tsdb.retention-period": ((5 * time.Second * 2) - 1).String(),
"-querier.thanos-engine": "true",
// enable distributed execution (logical plan execution)
"-querier.distributed-exec-enabled": "true",
},
)

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
consul := e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(consul, minio))

// start services
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "")
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(queryScheduler, distributor, ingester, storeGateway))
flags = mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
})

queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", mergeFlags(flags, map[string]string{
"-frontend.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.Start(queryFrontend))

querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.scheduler-address": queryScheduler.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

// wait until the distributor and querier has updated the ring.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*512), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

series1Timestamp := time.Now()
series2Timestamp := series1Timestamp.Add(time.Minute * 1)
series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"})
series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"})

res, err := c.Push(series1)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c.Push(series2)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// main tests
// - make sure queries are still executable with distributed execution enabled
var val model.Value
val, err = c.Query("series_1", series1Timestamp)
require.NoError(t, err)
require.Equal(t, expectedVector1, val.(model.Vector))

val, err = c.Query("series_2", series2Timestamp)
require.NoError(t, err)
require.Equal(t, expectedVector2, val.(model.Vector))
}
4 changes: 2 additions & 2 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/api/queryapi"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/codec"
"github.com/cortexproject/cortex/pkg/querier/stats"
Expand Down Expand Up @@ -164,7 +164,7 @@ func NewQuerierHandler(
querierCfg querier.Config,
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
engine promql.QueryEngine,
engine engine.QueryEngine,
metadataQuerier querier.MetadataQuerier,
reg prometheus.Registerer,
logger log.Logger,
Expand Down
52 changes: 44 additions & 8 deletions pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/httputil"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/engine"
Expand All @@ -26,7 +27,7 @@ import (

type QueryAPI struct {
queryable storage.SampleAndChunkQueryable
queryEngine promql.QueryEngine
queryEngine engine.QueryEngine
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
Expand All @@ -35,7 +36,7 @@ type QueryAPI struct {
}

func NewQueryAPI(
qe promql.QueryEngine,
qe engine.QueryEngine,
q storage.SampleAndChunkQueryable,
statsRenderer v1.StatsRenderer,
logger log.Logger,
Expand Down Expand Up @@ -101,10 +102,29 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")

var qry promql.Query
startTime := convertMsToTime(start)
endTime := convertMsToTime(end)
stepDuration := convertMsToDuration(step)

byteLP := []byte(r.PostFormValue("plan"))
if len(byteLP) != 0 {
logicalPlan, err := logicalplan.Unmarshal(byteLP)
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
}
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query"))
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil}
}
} else { // if there is logical plan field is empty, fall back
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration)
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
}
}

// From now on, we must only return with a finalizer in the result (to
// be called by the caller) or call qry.Close ourselves (which is
// required in the case of a panic).
Expand Down Expand Up @@ -157,9 +177,25 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")

var qry promql.Query
tsTime := convertMsToTime(ts)

byteLP := []byte(r.PostFormValue("plan"))
if len(byteLP) != 0 {
logicalPlan, err := logicalplan.Unmarshal(byteLP)
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
}
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("failed to create instant query from logical plan: %v", err)}, nil, nil}
}
} else { // if there is logical plan field is empty, fall back
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime)
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
}
}

// From now on, we must only return with a finalizer in the result (to
Expand Down
Loading
Loading