diff --git a/CHANGELOG.md b/CHANGELOG.md index d31b9d8746e..a33045728d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010 * [ENHANCEMENT] Query Frontend/Query Scheduler: Increase upper bound to 60s for queue duration histogram metric. #5029 * [ENHANCEMENT] Query Frontend: Log Vertical sharding information when `query_stats_enabled` is enabled. #5037 +* [ENHANCEMENT] Ingester: The metadata APIs should honour `querier.query-ingesters-within` when `querier.query-store-for-labels-enabled` is true. #5027 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972 * [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index af2edea1cbf..1e84886d514 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -131,8 +131,9 @@ querier: # CLI flag: -querier.query-ingesters-within [query_ingesters_within: | default = 0s] - # Query long-term store for series, label values and label names APIs. Works - # only with blocks engine. + # Deprecated (Querying long-term store for labels will be always enabled in + # the future.): Query long-term store for series, label values and label names + # APIs. # CLI flag: -querier.query-store-for-labels-enabled [query_store_for_labels_enabled: | default = false] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 926edb10424..705c84ad305 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -810,8 +810,8 @@ The `querier_config` configures the Cortex querier. 
# CLI flag: -querier.query-ingesters-within [query_ingesters_within: | default = 0s] -# Query long-term store for series, label values and label names APIs. Works -# only with blocks engine. +# Deprecated (Querying long-term store for labels will be always enabled in the +# future.): Query long-term store for series, label values and label names APIs. # CLI flag: -querier.query-store-for-labels-enabled [query_store_for_labels_enabled: | default = false] diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index d05ed1083dc..c8560f7e3b7 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -393,6 +393,8 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig) + t.Cfg.Ingester.QueryStoreForLabels = t.Cfg.Querier.QueryStoreForLabels + t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin t.tsdbIngesterConfig() t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, prometheus.DefaultRegisterer, util_log.Logger) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9f776bd93bc..b8f8ac23456 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -106,6 +106,10 @@ type Config struct { DistributorShardingStrategy string `yaml:"-"` DistributorShardByAllLabels bool `yaml:"-"` + // Injected at runtime and read from querier config. 
+ QueryStoreForLabels bool `yaml:"-"` + QueryIngestersWithin time.Duration `yaml:"-"` + DefaultLimits InstanceLimits `yaml:"instance_limits"` InstanceLimitsFn func() *InstanceLimits `yaml:"-"` @@ -1303,7 +1307,7 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque return &client.LabelValuesResponse{}, nil } - mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db) + mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin) if err != nil { return nil, err } @@ -1364,7 +1368,7 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest return &client.LabelNamesResponse{}, nil } - mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db) + mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin) if err != nil { return nil, err } @@ -1432,7 +1436,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr return nil, err } - mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db) + mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin) if err != nil { return nil, err } @@ -2564,7 +2568,13 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) { } // metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester. 
-func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB) (mint, maxt int64, err error) { +func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryStoreForLabels bool, queryIngestersWithin time.Duration) (mint, maxt int64, err error) { + if queryIngestersWithin > 0 && queryStoreForLabels { + // If the feature for querying metadata from store-gateway is enabled, + // return the requested range as-is (a bare return would yield mint=0, maxt=0). + return queryStart, queryEnd, nil + } + // Ingesters are run with limited retention and we don't support querying the store-gateway for labels yet. // This means if someone loads a dashboard that is outside the range of the ingester, and we only return the // data for the timerange requested (which will be empty), the dashboards will break. To fix this we should diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 748bd194c0f..8ae139d9a98 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -1733,10 +1733,12 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) { } tests := map[string]struct { - from int64 - to int64 - matchers []*client.LabelMatchers - expected []*cortexpb.Metric + from int64 + to int64 + matchers []*client.LabelMatchers + expected []*cortexpb.Metric + queryStoreForLabels bool + queryIngestersWithin time.Duration }{ "should return an empty response if no metric match": { from: math.MinInt64, @@ -1794,6 +1796,18 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) { {Labels: cortexpb.FromLabelsToLabelAdapters(fixtures[1].lbls)}, }, }, + "should filter metrics by time range if queryStoreForLabels and queryIngestersWithin is enabled": { + from: 100, + to: 1000, + matchers: []*client.LabelMatchers{{ + Matchers: []*client.LabelMatcher{ + {Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"}, + }, + }}, + expected: []*cortexpb.Metric{}, + queryStoreForLabels: true, + queryIngestersWithin: time.Hour, + }, "should not return duplicated metrics on overlapping 
matchers": { from: math.MinInt64, to: math.MaxInt64, @@ -1860,7 +1874,8 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) { EndTimestampMs: testData.to, MatchersSet: testData.matchers, } - + i.cfg.QueryStoreForLabels = testData.queryStoreForLabels + i.cfg.QueryIngestersWithin = testData.queryIngestersWithin res, err := i.MetricsForLabelMatchers(ctx, req) require.NoError(t, err) assert.ElementsMatch(t, testData.expected, res.Metric) diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 1d89dd064e3..ae3468cc65c 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -37,13 +37,14 @@ type Distributor interface { MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) } -func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter { +func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, queryStoreForLabels bool) QueryableWithFilter { return distributorQueryable{ distributor: distributor, streaming: streaming, streamingMetdata: streamingMetdata, iteratorFn: iteratorFn, queryIngestersWithin: queryIngestersWithin, + queryStoreForLabels: queryStoreForLabels, } } @@ -53,6 +54,7 @@ type distributorQueryable struct { streamingMetdata bool iteratorFn chunkIteratorFunc queryIngestersWithin time.Duration + queryStoreForLabels bool } func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { @@ -65,6 +67,7 @@ func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (st streamingMetadata: d.streamingMetdata, chunkIterFn: d.iteratorFn, queryIngestersWithin: d.queryIngestersWithin, + queryStoreForLabels: d.queryStoreForLabels, }, nil } @@ -81,6 +84,7 @@ type distributorQuerier struct { 
streamingMetadata bool chunkIterFn chunkIteratorFunc queryIngestersWithin time.Duration + queryStoreForLabels bool } // Select implements storage.Querier interface. @@ -95,35 +99,18 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma } // If the querier receives a 'series' query, it means only metadata is needed. - // For this specific case we shouldn't apply the queryIngestersWithin - // time range manipulation, otherwise we'll end up returning no series at all for + // For the specific case where queryStoreForLabels is disabled + // we shouldn't apply the queryIngestersWithin time range manipulation. + // Otherwise we'll end up returning no series at all for // older time ranges (while in Cortex we do ignore the start/end and always return // series in ingesters). - // Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series". - // See: https://github.com/prometheus/prometheus/pull/8050 - if sp != nil && sp.Func == "series" { - var ( - ms []metric.Metric - err error - ) - - if q.streamingMetadata { - ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(minT), model.Time(maxT), matchers...) - } else { - ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...) - } - - if err != nil { - return storage.ErrSeriesSet(err) - } - return series.MetricsToSeriesSet(sortSeries, ms) - } + shouldNotQueryStoreForMetadata := (sp != nil && sp.Func == "series" && !q.queryStoreForLabels) // If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until // now - queryIngestersWithin, because older time ranges are covered by the storage. This // optimization is particularly important for the blocks storage where the blocks retention in the // ingesters could be way higher than queryIngestersWithin. 
- if q.queryIngestersWithin > 0 { + if q.queryIngestersWithin > 0 && !shouldNotQueryStoreForMetadata { now := time.Now() origMinT := minT minT = math.Max64(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin))) @@ -138,6 +125,26 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma } } + // In the recent versions of Prometheus, we pass in the hint but with Func set to "series". + // See: https://github.com/prometheus/prometheus/pull/8050 + if sp != nil && sp.Func == "series" { + var ( + ms []metric.Metric + err error + ) + + if q.streamingMetadata { + ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(minT), model.Time(maxT), matchers...) + } else { + ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...) + } + + if err != nil { + return storage.ErrSeriesSet(err) + } + return series.MetricsToSeriesSet(sortSeries, ms) + } + if q.streaming { return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers) } diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 1481d719825..410de35421c 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -46,7 +46,7 @@ func TestDistributorQuerier(t *testing.T) { }, nil) - queryable := newDistributorQueryable(d, false, false, nil, 0) + queryable := newDistributorQueryable(d, false, false, nil, 0, false) querier, err := queryable.Querier(context.Background(), mint, maxt) require.NoError(t, err) @@ -70,6 +70,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) tests := map[string]struct { querySeries bool + queryStoreForLabels bool queryIngestersWithin time.Duration queryMinT int64 queryMaxT int64 @@ -112,6 +113,15 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) expectedMinT: util.TimeToMillis(now.Add(-100 * time.Minute)), expectedMaxT: util.TimeToMillis(now.Add(-90 * 
time.Minute)), }, + "should manipulate query time range if queryIngestersWithin is enabled and queryStoreForLabels is enabled": { + querySeries: true, + queryStoreForLabels: true, + queryIngestersWithin: time.Hour, + queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)), + queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + expectedMinT: util.TimeToMillis(now.Add(-60 * time.Minute)), + expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)), + }, } for _, streamingEnabled := range []bool{false, true} { @@ -124,7 +134,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil) ctx := user.InjectOrgID(context.Background(), "test") - queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin) + queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin, testData.queryStoreForLabels) querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT) require.NoError(t, err) @@ -161,7 +171,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) func TestDistributorQueryableFilter(t *testing.T) { d := &MockDistributor{} - dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour) + dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour, true) now := time.Now() @@ -209,7 +219,7 @@ func TestIngesterStreaming(t *testing.T) { nil) ctx := user.InjectOrgID(context.Background(), "0") - queryable := newDistributorQueryable(d, true, true, mergeChunks, 0) + queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true) querier, err := queryable.Querier(ctx, mint, maxt) require.NoError(t, err) @@ -285,7 +295,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) { nil) ctx := user.InjectOrgID(context.Background(), 
"0") - queryable := newDistributorQueryable(d, true, true, mergeChunks, 0) + queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true) querier, err := queryable.Querier(ctx, mint, maxt) require.NoError(t, err) @@ -336,7 +346,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) { d.On("MetricsForLabelMatchersStream", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers). Return(metrics, nil) - queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0) + queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0, true) querier, err := queryable.Querier(context.Background(), mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 705bec628db..3e4ceb76e3e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -96,7 +96,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.IngesterMetadataStreaming, "querier.ingester-metadata-streaming", false, "Use streaming RPCs for metadata APIs from ingester.") f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.") f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") - f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Query long-term store for series, label values and label names APIs. 
Works only with blocks engine.") + f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Deprecated (Querying long-term store for labels will be always enabled in the future.): Query long-term store for series, label values and label names APIs.") f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.") f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.") @@ -146,7 +146,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc { func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader purger.TombstonesLoader, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, v1.QueryEngine) { iteratorFunc := getChunksIteratorFunction(cfg) - distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin) + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels) ns := make([]QueryableWithFilter, len(stores)) for ix, s := range stores { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index b8cfe922570..24f57950547 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -213,8 +213,8 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) { distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil) distributor.On("Query", mock.Anything, mock.Anything, 
mock.Anything, mock.Anything).Return(unorderedResponseMatrix, nil) - distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin) - distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin) + distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels) + distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels) tCases := []struct { name string