From cc8dc0ddca810d0be5189f9196b5124eeda40523 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 14 Nov 2022 22:49:58 -0800 Subject: [PATCH] update thanos to bring sharding support for label manipulation functions Signed-off-by: Ben Ye --- go.mod | 2 +- go.sum | 4 +- .../tripperware/test_shard_by_query_utils.go | 57 ++++++++++++- .../thanos/pkg/cacheutil/redis_client.go | 3 +- .../thanos/pkg/querysharding/analyzer.go | 80 +++++++++++++------ vendor/modules.txt | 2 +- 6 files changed, 115 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index 468dba1bf3d..cf07fe20559 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( github.com/spf13/afero v1.6.0 github.com/stretchr/testify v1.8.0 github.com/thanos-io/objstore v0.0.0-20221006135717-79dcec7fe604 - github.com/thanos-io/thanos v0.29.1-0.20221111094505-b2badad930d6 + github.com/thanos-io/thanos v0.29.1-0.20221115064008-fe45cfc66b7d github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20220706100410-67d27ed40fae go.etcd.io/etcd/api/v3 v3.5.4 diff --git a/go.sum b/go.sum index 700ca3adf0e..5e233aeff6f 100644 --- a/go.sum +++ b/go.sum @@ -927,8 +927,8 @@ github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1 github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= github.com/thanos-io/objstore v0.0.0-20221006135717-79dcec7fe604 h1:9dceDSKKsLWNHjrMpyzK1t7eVcAZv9Dp3FX+uokUS2Y= github.com/thanos-io/objstore v0.0.0-20221006135717-79dcec7fe604/go.mod h1:Vx5dZs9ElxEhNLnum/OgB0pNTqNdI2zdXL82BeJr3T4= -github.com/thanos-io/thanos v0.29.1-0.20221111094505-b2badad930d6 h1:8q0LB3XOhscrqEYU0g5+ekB1ZHERk8tUl1l/y9DbeXA= -github.com/thanos-io/thanos v0.29.1-0.20221111094505-b2badad930d6/go.mod h1:odqdxSO+o/UaVgNpdkYYaQUW/JpT7LByXyZmxoe6uoc= +github.com/thanos-io/thanos v0.29.1-0.20221115064008-fe45cfc66b7d h1:cfGwZH4LBrkFbQHea7HUlNDOQhILGBRPxDWjy4FhAME= +github.com/thanos-io/thanos 
v0.29.1-0.20221115064008-fe45cfc66b7d/go.mod h1:odqdxSO+o/UaVgNpdkYYaQUW/JpT7LByXyZmxoe6uoc= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index 0564cf22b81..f673fc8e4e6 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -67,10 +67,6 @@ func TestQueryShardQuery(t *testing.T, instantQueryCodec Codec, shardedPrometheu name: "binary aggregation with different grouping labels", expression: `sum by (pod) (http_requests_total{code="400"}) / sum by (cluster) (http_requests_total)`, }, - { - name: "binary expression with vector matching and label_replace", - expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, - }, { name: "multiple binary expressions", expression: `(http_requests_total{code="400"} + http_requests_total{code="500"}) / http_requests_total`, @@ -86,6 +82,14 @@ http_requests_total`, name: "problematic query", expression: `sum(a by(lanel)`, }, + { + name: "aggregate by expression with label_replace, sharding label is dynamic", + expression: `sum by (dst_label) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + }, + { + name: "aggregate by expression with label_join, sharding label is dynamic", + expression: `sum by (dst_label) (label_join(metric, "dst_label", ",", "src_label"))`, + }, } shardableByLabels := []queries{ @@ -147,6 +151,36 @@ sum by (container) ( expression: "histogram_quantile(0.95, sum(rate(metric[1m])) by (le, cluster))", shardingLabels: []string{"cluster"}, }, + { + name: 
"aggregate by expression with label_replace, sharding label is not dynamic", + expression: `sum by (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + shardingLabels: []string{"pod"}, + }, + { + name: "aggregate by expression with label_join, sharding label is not dynamic", + expression: `sum by (pod) (label_join(metric, "dst_label", ",", "src_label"))`, + shardingLabels: []string{"pod"}, + }, + { + name: "label_join and aggregation on multiple labels. Can be sharded by the static one", + expression: `sum by (pod, dst_label) (label_join(metric, "dst_label", ",", "src_label"))`, + shardingLabels: []string{"pod"}, + }, + { + name: "binary expression with vector matching and label_replace", + expression: `http_requests_total{code="400"} / on (pod) label_replace(metric, "dst_label", "$1", "src_label", "re")`, + shardingLabels: []string{"pod"}, + }, + { + name: "nested label joins", + expression: `label_join(sum by (pod) (label_join(metric, "dst_label", ",", "src_label")), "dst_label1", ",", "dst_label")`, + shardingLabels: []string{"pod"}, + }, + { + name: "complex query with label_replace, binary expr and aggregations on dynamic label", + expression: `sum(sum_over_time(container_memory_working_set_bytes{container_name!="POD",container_name!="",namespace="kube-system"}[1d:5m])) by (instance, cluster) / avg(label_replace(sum(sum_over_time(kube_node_status_capacity_memory_bytes[1d:5m])) by (node, cluster), "instance", "$1", "node", "(.*)")) by (instance, cluster)`, + shardingLabels: []string{"cluster"}, + }, } // Shardable by labels instant queries with matrix response @@ -197,6 +231,21 @@ sum by (container) ( http_requests_total`, shardingLabels: []string{"cluster", "pod", model.MetricNameLabel}, }, + { + name: "aggregate without expression with label_replace, sharding label is not dynamic", + expression: `sum without (dst_label) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + shardingLabels: []string{"dst_label"}, + }, + { + name: 
"aggregate without expression with label_join, sharding label is not dynamic", + expression: `sum without (dst_label) (label_join(metric, "dst_label", ",", "src_label"))`, + shardingLabels: []string{"dst_label"}, + }, + { + name: "aggregate without expression with label_replace", + expression: `sum without (pod) (label_replace(metric, "dst_label", "$1", "src_label", "re"))`, + shardingLabels: []string{"pod", "dst_label"}, + }, } type testCase struct { diff --git a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go index a12c8d5e839..cb5a7ee2aa7 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/redis_client.go @@ -239,8 +239,9 @@ func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl keys = append(keys, k) } err := doWithBatch(ctx, len(data), c.config.SetMultiBatchSize, c.setMultiGate, func(startIndex, endIndex int) error { + currentKeys := keys[startIndex:endIndex] _, err := c.Pipelined(ctx, func(p redis.Pipeliner) error { - for _, key := range keys { + for _, key := range currentKeys { p.SetEX(ctx, key, data[key], ttl) } return nil diff --git a/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go b/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go index 1009af25751..dc8b16ca72f 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go +++ b/vendor/github.com/thanos-io/thanos/pkg/querysharding/analyzer.go @@ -1,23 +1,33 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package querysharding import ( - "fmt" - lru "github.com/hashicorp/golang-lru" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" ) -// QueryAnalyzer is an analyzer which determines -// whether a PromQL Query is shardable and using which labels. - type Analyzer interface { Analyze(string) (QueryAnalysis, error) } +// QueryAnalyzer is an analyzer which determines +// whether a PromQL Query is shardable and using which labels. type QueryAnalyzer struct{} type CachedQueryAnalyzer struct { @@ -25,11 +35,6 @@ type CachedQueryAnalyzer struct { cache *lru.Cache } -var nonShardableFuncs = []string{ - "label_join", - "label_replace", -} - // NewQueryAnalyzer creates a new QueryAnalyzer. func NewQueryAnalyzer() *CachedQueryAnalyzer { // Ignore the error check since it throws error @@ -66,8 +71,8 @@ func (a *CachedQueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { // Analyze analyzes a query and returns a QueryAnalysis. // Analyze uses the following algorithm: -// - if a query has subqueries, such as label_join or label_replace, -// or has functions which cannot be sharded, then treat the query as non shardable. +// - if a query has functions which cannot be sharded such as +// label_join or label_replace, then treat the query as non shardable. // - Walk the query and find the least common labelset // used in grouping expressions. If non-empty, treat the query // as shardable by those labels. 
@@ -80,14 +85,18 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { return nonShardableQuery(), err } - isShardable := true - var analysis QueryAnalysis + var ( + analysis QueryAnalysis + dynamicLabels []string + ) parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error { switch n := node.(type) { case *parser.Call: - if n.Func != nil && contains(n.Func.Name, nonShardableFuncs) { - isShardable = false - return fmt.Errorf("expressions with %s are not shardable", n.Func.Name) + if n.Func != nil { + if n.Func.Name == "label_join" || n.Func.Name == "label_replace" { + dstLabel := stringFromArg(n.Args[1]) + dynamicLabels = append(dynamicLabels, dstLabel) + } } case *parser.BinaryExpr: if n.VectorMatching != nil { @@ -108,19 +117,42 @@ func (a *QueryAnalyzer) Analyze(query string) (QueryAnalysis, error) { return nil }) - if !isShardable { - return nonShardableQuery(), nil + // If currently it is shard by, it is still shardable if there is + // any label left after removing the dynamic labels. + // If currently it is shard without, it is still shardable if we + // shard without the union of the labels. + // TODO(yeya24): we can still make dynamic labels shardable if we push + // down the label_replace and label_join computation to the store level. + if len(dynamicLabels) > 0 { + analysis = analysis.scopeToLabels(dynamicLabels, false) } return analysis, nil } -func contains(needle string, haystack []string) bool { - for _, item := range haystack { - if needle == item { - return true +// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/functions.go#L1416. +func stringFromArg(e parser.Expr) string { + tmp := unwrapStepInvariantExpr(e) // Unwrap StepInvariant + unwrapParenExpr(&tmp) // Optionally unwrap ParenExpr + return tmp.(*parser.StringLiteral).Val +} + +// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/engine.go#L2642. 
+// unwrapParenExpr does the AST equivalent of removing parentheses around an expression. +func unwrapParenExpr(e *parser.Expr) { + for { + if p, ok := (*e).(*parser.ParenExpr); ok { + *e = p.Expr + } else { + break } } +} - return false +// Copied from https://github.com/prometheus/prometheus/blob/v2.40.1/promql/engine.go#L2652. +func unwrapStepInvariantExpr(e parser.Expr) parser.Expr { + if p, ok := e.(*parser.StepInvariantExpr); ok { + return p.Expr + } + return e } diff --git a/vendor/modules.txt b/vendor/modules.txt index 3d1baf6e572..7aa15dda3f6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -775,7 +775,7 @@ github.com/thanos-io/objstore/providers/gcs github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing -# github.com/thanos-io/thanos v0.29.1-0.20221111094505-b2badad930d6 +# github.com/thanos-io/thanos v0.29.1-0.20221115064008-fe45cfc66b7d ## explicit; go 1.18 github.com/thanos-io/thanos/pkg/block github.com/thanos-io/thanos/pkg/block/indexheader