From 776bbce66ba556601bc0844cae619adf02f61ef6 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 18 Aug 2021 16:46:48 -0700 Subject: [PATCH 01/16] Add metrics for remaining planned compactions Signed-off-by: Albert --- pkg/compactor/compactor.go | 17 ++++++++++++++--- pkg/compactor/shuffle_sharding_grouper.go | 7 +++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 345d0abb003..1968312f024 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -51,7 +51,7 @@ var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} errInvalidShardingStrategy = errors.New("invalid sharding strategy") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -64,7 +64,7 @@ var ( metadata.NoneFunc) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge) compact.Grouper { return NewShuffleShardingGrouper( logger, bkt, @@ -74,6 +74,7 @@ var ( blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks, + remainingPlannedCompactions, metadata.NoneFunc, cfg) } @@ -115,6 +116,7 @@ type BlocksGrouperFactory func( blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + remainingPlannedCompactions prometheus.Gauge, ) compact.Grouper // BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks. 
@@ -273,6 +275,7 @@ type Compactor struct { blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompaction prometheus.Counter garbageCollectedBlocks prometheus.Counter + remainingPlannedCompactions prometheus.Gauge // TSDB syncer metrics syncerMetrics *syncerMetrics @@ -320,6 +323,13 @@ func newCompactor( blocksGrouperFactory BlocksGrouperFactory, blocksCompactorFactory BlocksCompactorFactory, ) (*Compactor, error) { + var remainingPlannedCompactions prometheus.Gauge + if compactorCfg.ShardingStrategy == "shuffle-sharding" { + remainingPlannedCompactions = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_compactor_remaining_planned_compactions", + Help: "Total number of plans that remain to be compacted.", + }) + } c := &Compactor{ compactorCfg: compactorCfg, storageCfg: storageCfg, @@ -382,6 +392,7 @@ func newCompactor( Name: "cortex_compactor_garbage_collected_blocks_total", Help: "Total number of blocks marked for deletion by compactor.", }), + remainingPlannedCompactions: remainingPlannedCompactions, } if len(compactorCfg.EnabledTenants) > 0 { @@ -722,7 +733,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks), + c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions), c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter), c.blocksCompactor, path.Join(c.compactorCfg.DataDir, "compact"), diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 464bc83ff0a..191600bc270 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -28,6 +28,7 @@ type ShuffleShardingGrouper struct { blocksMarkedForDeletion prometheus.Counter blocksMarkedForNoCompact prometheus.Counter garbageCollectedBlocks prometheus.Counter + remainingPlannedCompactions prometheus.Gauge hashFunc metadata.HashFunc compactions *prometheus.CounterVec compactionRunsStarted *prometheus.CounterVec @@ -46,6 +47,7 @@ func NewShuffleShardingGrouper( blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + remainingPlannedCompactions prometheus.Gauge, hashFunc metadata.HashFunc, compactorCfg Config, ) *ShuffleShardingGrouper { @@ -62,6 +64,7 @@ func NewShuffleShardingGrouper( blocksMarkedForDeletion: blocksMarkedForDeletion, blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, + remainingPlannedCompactions: remainingPlannedCompactions, hashFunc: hashFunc, // Metrics are copied from Thanos DefaultGrouper constructor compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ @@ -103,6 +106,9 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re var outGroups []*compact.Group i := 0 + // Metrics for the remaining planned compactions + g.remainingPlannedCompactions.Set(0) + for _, mainBlocks := range mainGroups { for _, group := range groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds()) { // Nothing to do if we don't have at least 2 blocks. 
@@ -113,6 +119,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re // TODO: Use the group's hash to determine whether a compactor should be responsible for compacting that group groupHash := hashGroup(group.blocks[0].Thanos.Labels["__org_id__"], group.rangeStart, group.rangeEnd) + g.remainingPlannedCompactions.Inc() groupKey := fmt.Sprintf("%v%d", groupHash, i) i++ From e8881fb959068063087d4b8e1425e0df3412bd44 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 18 Aug 2021 18:33:15 -0700 Subject: [PATCH 02/16] fix unit tests Signed-off-by: Albert --- pkg/compactor/shuffle_sharding_grouper_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index e54a849be59..dfa13639700 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -6,6 +6,7 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -142,14 +143,20 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { BlockRanges: testData.ranges, } + registerer := prometheus.NewRegistry() + remainingPlannedCompactions := promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_compactor_remaining_planned_compactions", + Help: "Total number of plans that remain to be compacted.", + }) + g := NewShuffleShardingGrouper(nil, nil, false, // Do not accept malformed indexes true, // Enable vertical compaction - prometheus.NewRegistry(), - nil, + registerer, nil, nil, + remainingPlannedCompactions, metadata.NoneFunc, *compactorCfg) actual, err := g.Groups(testData.blocks) From 0da3e9b7e73c967494266d886e78a26be080f3a8 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 18 Aug 2021 18:59:21 -0700 Subject: [PATCH 03/16] Add shuffle sharding for compactor Signed-off-by: Albert --- docs/configuration/config-file-reference.md | 6 + docs/guides/shuffle-sharding.md | 15 + pkg/compactor/compactor.go | 46 ++- pkg/compactor/compactor_test.go | 280 ++++++++++++++++-- pkg/compactor/shuffle_sharding_grouper.go | 87 +++++- .../shuffle_sharding_grouper_test.go | 108 ++++++- pkg/cortex/cortex.go | 2 +- pkg/cortex/modules.go | 2 +- pkg/util/validation/limits.go | 7 + 9 files changed, 487 insertions(+), 66 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e6f3c5ea7a2..b7fd033d4eb 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4259,6 +4259,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -compactor.blocks-retention-period [compactor_blocks_retention_period: <duration> | default = 0s] +# The default tenant's shard size when the shuffle-sharding strategy is used by +# the compactor. When this setting is specified in the per-tenant overrides, a +# value of 0 disables shuffle sharding for the tenant. +# CLI flag: -compactor.tenant-shard-size +[compactor_tenant_shard_size: <int> | default = 0] + # S3 server-side encryption type. Required to enable server-side encryption # overrides for a specific tenant. If not set, the default S3 client settings # are used. 
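The new limit reaches the compactor through a small interface. Below is a minimal, illustrative sketch of how the per-tenant value is resolved: the `CompactorLimits` interface name matches the one added later in this patch, while the map-backed implementation and the sample tenants are stand-ins for Cortex's `validation.Overrides`, not code from the change.

```go
package main

import "fmt"

// CompactorLimits mirrors the interface the patch introduces: the grouper only
// needs the per-tenant shard size.
type CompactorLimits interface {
	CompactorTenantShardSize(userID string) int
}

// staticLimits is a toy stand-in for validation.Overrides, used only to show
// the default-plus-override semantics of compactor_tenant_shard_size.
type staticLimits struct {
	defaultSize int
	perTenant   map[string]int
}

func (l staticLimits) CompactorTenantShardSize(userID string) int {
	if v, ok := l.perTenant[userID]; ok {
		return v
	}
	return l.defaultSize
}

func main() {
	var limits CompactorLimits = staticLimits{
		defaultSize: 3,                           // -compactor.tenant-shard-size
		perTenant:   map[string]int{"user-2": 0}, // per-tenant override: 0 disables shuffle sharding
	}

	for _, user := range []string{"user-1", "user-2"} {
		if size := limits.CompactorTenantShardSize(user); size > 0 {
			fmt.Printf("%s: compaction groups spread across %d compactors\n", user, size)
		} else {
			fmt.Printf("%s: shuffle sharding disabled, all compactors considered\n", user)
		}
	}
}
```

The shuffle-sharding grouper passes this value straight to the ring's `ShuffleShard` call, which is why a shard size of 0 effectively leaves the tenant on the full compactor ring.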
diff --git a/docs/guides/shuffle-sharding.md b/docs/guides/shuffle-sharding.md index 966b861854c..e8486f2f565 100644 --- a/docs/guides/shuffle-sharding.md +++ b/docs/guides/shuffle-sharding.md @@ -54,6 +54,7 @@ Cortex currently supports shuffle sharding in the following services: - [Query-frontend / Query-scheduler](#query-frontend-and-query-scheduler-shuffle-sharding) - [Store-gateway](#store-gateway-shuffle-sharding) - [Ruler](#ruler-shuffle-sharding) +- [Compactor](#compactor-shuffle-sharding) Shuffle sharding is **disabled by default** and needs to be explicitly enabled in the configuration. @@ -154,6 +155,20 @@ Cortex ruler can run in three modes: Note that when using sharding strategy, each rule group is evaluated by single ruler only, there is no replication. +### Compactor shuffle sharding + +Cortex compactor can run in three modes: + +1. **No sharding at all.** This is the most basic mode of the compactor. It is activated by using `-compactor.sharding-enabled=false` (default). In this mode every compactor will run every compaction. +2. **Default sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=default` (default). In this mode compactors register themselves into the ring. Each compactor will then select and compact only those users that it "owns". +3. **Shuffle sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=shuffle-sharding`. Similarly to default sharding, compactors use the ring to distribute the workload, but the compaction groups for each tenant can only be handled by a limited number of compactors (`-compactor.tenant-shard-size`, can also be set per tenant as `compactor_tenant_shard_size` in overrides). + +The Cortex compactor by default shards by tenant ID when sharding is enabled. + +With shuffle sharding selected as the sharding strategy, a subset of the compactors, determined by the tenant's shard size, will be used to handle each tenant. + +The idea behind using the shuffle sharding strategy for the compactor is to further enable horizontal scalability and build tolerance for compactions that may take longer than the compaction interval. + ## FAQ ### Does shuffle sharding add additional overhead to the KV store? 
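To make the ownership model described above concrete, here is a minimal, self-contained sketch of how a compaction group is reduced to a hash key that the ring lookup then resolves to a single owner. Only the `"%v%v%v"` key construction mirrors the `hashGroup` helper added by this change; the FNV-1a 32-bit hash and the sample values are assumptions for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashGroup reduces a (tenant, time range) triple to a uint32 key. The key is
// what the shuffle-sharding grouper looks up on the tenant's subring to decide
// which single compactor owns the group. The FNV-1a hash below is an
// illustrative assumption; the patch only shows the key format.
func hashGroup(userID string, rangeStart, rangeEnd int64) uint32 {
	groupString := fmt.Sprintf("%v%v%v", userID, rangeStart, rangeEnd)
	h := fnv.New32a()
	_, _ = h.Write([]byte(groupString))
	return h.Sum32()
}

func main() {
	// Every compactor computes the same key for the same group, so the ring
	// lookup (subRing.Get(groupHash, RingOp, ...)) resolves to one owner and
	// each group is compacted exactly once.
	fmt.Println(hashGroup("user-1", 1574776800000, 1574784000000))
}
```

A compactor that is not part of the tenant's subring at all skips that tenant early (see `checkSubringForCompactor` in the grouper diff below), so only shard members ever compute and claim group hashes.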
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 1968312f024..5916dbea4d3 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -34,6 +34,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/validation" ) const ( @@ -50,8 +51,10 @@ var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errShardingRequired = errors.New("sharding must be enabled to use shuffle-sharding sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion , blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ CompactorLimits, _ string) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -64,7 +67,7 @@ var ( metadata.NoneFunc) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits CompactorLimits, userID string) compact.Grouper { return NewShuffleShardingGrouper( logger, bkt, @@ -76,7 +79,11 @@ var ( garbageCollectedBlocks, remainingPlannedCompactions, metadata.NoneFunc, - cfg) + cfg, + ring, + ringLifecycle.Addr, + limits, + userID) } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { @@ -117,6 +124,10 @@ type BlocksGrouperFactory func( blocksMarkedForNoCompact prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, + ring *ring.Ring, + ringLifecycler *ring.Lifecycler, + limit CompactorLimits, + userID string, ) compact.Grouper // BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks. @@ -133,6 +144,11 @@ type PlannerFactory func( noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ) compact.Planner +// CompactorLimits defines limits used by the Compactor. +type CompactorLimits interface { + CompactorTenantShardSize(userID string) int +} + // Config holds the Compactor config. 
type Config struct { BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` @@ -202,7 +218,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.") } -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(limits validation.Limits) error { // Each block range period should be divisible by the previous one. for i := 1; i < len(cfg.BlockRanges); i++ { if cfg.BlockRanges[i]%cfg.BlockRanges[i-1] != 0 { @@ -215,6 +231,14 @@ func (cfg *Config) Validate() error { return errInvalidShardingStrategy } + if cfg.ShardingStrategy == util.ShardingStrategyShuffle { + if !cfg.ShardingEnabled { + return errShardingRequired + } else if limits.CompactorTenantShardSize <= 0 { + return errInvalidTenantShardSize + } + } + return nil } @@ -235,6 +259,7 @@ type Compactor struct { parentLogger log.Logger registerer prometheus.Registerer allowedTenants *util.AllowedTenants + limits CompactorLimits // Functions that creates bucket client, grouper, planner and compactor using the context. // Useful for injecting mock objects from tests. @@ -282,7 +307,7 @@ type Compactor struct { } // NewCompactor makes a new Compactor. -func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) { +func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits CompactorLimits) (*Compactor, error) { bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer) } @@ -305,7 +330,7 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi } } - cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory) + cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits) if err != nil { return nil, errors.Wrap(err, "failed to create Cortex blocks compactor") } @@ -322,6 +347,7 @@ func newCompactor( bucketClientFactory func(ctx context.Context) (objstore.Bucket, error), blocksGrouperFactory BlocksGrouperFactory, blocksCompactorFactory BlocksCompactorFactory, + limits CompactorLimits, ) (*Compactor, error) { var remainingPlannedCompactions prometheus.Gauge if compactorCfg.ShardingStrategy == "shuffle-sharding" { @@ -393,6 +419,7 @@ func newCompactor( Help: "Total number of blocks marked for deletion by compactor.", }), remainingPlannedCompactions: remainingPlannedCompactions, + limits: limits, } if len(compactorCfg.EnabledTenants) > 0 { @@ -733,7 +760,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions), + c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, 
c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.ring, c.ringLifecycler, c.limits, userID), c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter), c.blocksCompactor, path.Join(c.compactorCfg.DataDir, "compact"), @@ -791,8 +818,9 @@ func (c *Compactor) ownUser(userID string) (bool, error) { return false, nil } - // Always owned if sharding is disabled. - if !c.compactorCfg.ShardingEnabled { + // Always owned if sharding is disabled or if using shuffle-sharding as shard ownership + // is determined by the shuffle sharding grouper. + if !c.compactorCfg.ShardingEnabled || c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { return true, nil } diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 0ef3d23daa1..8ab7cc8a220 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -39,6 +39,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_storage_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -90,34 +91,67 @@ func TestConfig_ShouldSupportCliFlags(t *testing.T) { func TestConfig_Validate(t *testing.T) { tests := map[string]struct { - setup func(cfg *Config) - expected string + setup func(cfg *Config) + initLimits func(*validation.Limits) + expected string }{ "should pass with the default config": { - setup: func(cfg *Config) {}, - expected: "", + setup: func(cfg *Config) {}, + initLimits: func(_ *validation.Limits) {}, + expected: "", }, "should pass with only 1 block range period": { setup: func(cfg *Config) { cfg.BlockRanges = cortex_tsdb.DurationList{time.Hour} }, - expected: "", + initLimits: func(_ *validation.Limits) {}, + expected: "", }, "should fail with non divisible block range periods": { setup: func(cfg *Config) { cfg.BlockRanges = cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour, 30 * time.Hour} }, - expected: errors.Errorf(errInvalidBlockRanges, 30*time.Hour, 24*time.Hour).Error(), + + initLimits: func(_ *validation.Limits) {}, + expected: errors.Errorf(errInvalidBlockRanges, 30*time.Hour, 24*time.Hour).Error(), + }, + "should pass with valid shuffle sharding config": { + setup: func(cfg *Config) { + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.ShardingEnabled = true + }, + initLimits: func(limits *validation.Limits) { + limits.CompactorTenantShardSize = 1 + }, + expected: "", + }, + "should fail with shuffle sharding strategy selected without sharding enabled": { + setup: func(cfg *Config) { + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.ShardingEnabled = false + }, + initLimits: func(_ *validation.Limits) {}, + expected: errShardingRequired.Error(), + }, + "should fail with bad compactor tenant shard size": { + setup: func(cfg *Config) { + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.ShardingEnabled = true + }, + initLimits: func(_ *validation.Limits) {}, + expected: errInvalidTenantShardSize.Error(), }, } for testName, testData := range tests { t.Run(testName, func(t *testing.T) { cfg := &Config{} - flagext.DefaultValues(cfg) + limits := validation.Limits{} + flagext.DefaultValues(cfg, &limits) testData.setup(cfg) + testData.initLimits(&limits) - if actualErr := 
cfg.Validate(); testData.expected != "" { + if actualErr := cfg.Validate(limits); testData.expected != "" { assert.EqualError(t, actualErr, testData.expected) } else { assert.NoError(t, actualErr) @@ -133,7 +167,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) cfg := prepareConfig() - c, _, _, logs, registry := prepare(t, cfg, bucketClient) + c, _, _, logs, registry := prepare(t, cfg, bucketClient, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) // Wait until a run has completed. @@ -283,7 +317,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket")) - c, _, _, logs, registry := prepare(t, prepareConfig(), bucketClient) + c, _, _, logs, registry := prepare(t, prepareConfig(), bucketClient, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) // Wait until all retry attempts have completed. @@ -445,7 +479,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) - c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient) + c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil) tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan")) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -506,7 +540,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) - c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient) + c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil) // Mock the planner as if there's no compaction to do, // in order to simplify tests (all in all, we just want to @@ -642,7 +676,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) - c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient) + c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -759,7 +793,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) - c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient) + c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil) tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) @@ -808,7 +842,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil) bucketClient.MockDelete("user-1/bucket-index.json.gz", nil) - c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient) + c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil) // Mock the planner as if there's no compaction to do, // in order to simplify tests (all in all, we 
just want to @@ -907,7 +941,7 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) { cfg := prepareConfig() cfg.SkipBlocksWithOutOfOrderChunksEnabled = true - c, tsdbCompac, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient) + c, tsdbCompac, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil) tsdbCompac.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil) @@ -985,7 +1019,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni cfg.ShardingRing.InstanceAddr = "1.2.3.4" cfg.ShardingRing.KVStore.Mock = ringStore - c, _, tsdbPlanner, logs, _ := prepare(t, cfg, bucketClient) + c, _, tsdbPlanner, logs, _ := prepare(t, cfg, bucketClient, nil) // Mock the planner as if there's no compaction to do, // in order to simplify tests (all in all, we just want to @@ -1073,7 +1107,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM cfg.ShardingRing.WaitStabilityMaxDuration = 10 * time.Second cfg.ShardingRing.KVStore.Mock = kvstore - c, _, tsdbPlanner, l, _ := prepare(t, cfg, bucketClient) + c, _, tsdbPlanner, l, _ := prepare(t, cfg, bucketClient, nil) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck compactors = append(compactors, c) @@ -1106,6 +1140,166 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM } } +func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWithShuffleShardingAndMultipleInstancesRunning(t *testing.T) { + t.Parallel() + + numUsers := 3 + + // Setup user IDs + userIDs := make([]string, 0, numUsers) + for i := 1; i <= numUsers; i++ { + userIDs = append(userIDs, fmt.Sprintf("user-%d", i)) + } + + startTime := int64(1574776800000) + // Define blocks mapping block IDs to start and end times + blocks := map[string]map[string]int64{ + "01DTVP434PA9VFXSW2JKB3392D": { + "startTime": startTime, + "endTime": startTime + time.Hour.Milliseconds()*2, + }, + "01DTVP434PA9VFXSW2JKB3392E": { + "startTime": startTime, + "endTime": startTime + time.Hour.Milliseconds()*2, + }, + "01DTVP434PA9VFXSW2JKB3392F": { + "startTime": startTime + time.Hour.Milliseconds()*2, + "endTime": startTime + time.Hour.Milliseconds()*4, + }, + "01DTVP434PA9VFXSW2JKB3392G": { + "startTime": startTime + time.Hour.Milliseconds()*2, + "endTime": startTime + time.Hour.Milliseconds()*4, + }, + // Add another new block as the final block so that the previous groups will be planned for compaction + "01DTVP434PA9VFXSW2JKB3392H": { + "startTime": startTime + time.Hour.Milliseconds()*4, + "endTime": startTime + time.Hour.Milliseconds()*6, + }, + } + + // Mock the bucket to contain all users, each one with five blocks, 2 sets of overlapping blocks and 1 separate block. 
+ bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", userIDs, nil) + + // Keys with a value greater than 1 will be groups that should be compacted + groupHashes := make(map[uint32]int) + for _, userID := range userIDs { + blockDirectory := []string{} + + for blockID, blockTimes := range blocks { + bucketClient.MockGet(userID+"/"+blockID+"/meta.json", mockBlockMetaJSONWithTime(blockID, userID, blockTimes["startTime"], blockTimes["endTime"]), nil) + bucketClient.MockGet(userID+"/"+blockID+"/deletion-mark.json", "", nil) + blockDirectory = append(blockDirectory, userID+"/"+blockID) + + // Get all of the unique group hashes so that they can be used to ensure all groups were compacted + groupHash := hashGroup(userID, blockTimes["startTime"], blockTimes["endTime"]) + groupHashes[groupHash]++ + } + + bucketClient.MockIter(userID+"/", blockDirectory, nil) + bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) + bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) + } + + // Create a shared KV Store + kvstore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Create four compactors + var compactors []*Compactor + var logs []*concurrency.SyncBuffer + + for i := 1; i <= 4; i++ { + cfg := prepareConfig() + cfg.ShardingEnabled = true + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i) + cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i) + cfg.ShardingRing.WaitStabilityMinDuration = 3 * time.Second + cfg.ShardingRing.WaitStabilityMaxDuration = 10 * time.Second + cfg.ShardingRing.KVStore.Mock = kvstore + + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.CompactorTenantShardSize = 3 + + c, _, tsdbPlanner, l, _ := prepare(t, cfg, bucketClient, limits) + defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck + + compactors = append(compactors, c) + logs = append(logs, l) + + // Mock the planner as if there's no compaction to do, + // in order to simplify tests (all in all, we just want to + // test our logic and not TSDB compactor which we expect to + // be already tested). + tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + } + + // Start all compactors + for _, c := range compactors { + require.NoError(t, c.StartAsync(context.Background())) + } + // Wait for all the compactors to get into the Running state without errors. 
+ // Cannot use StartAndAwaitRunning as this would cause the compactions to start before + // all the compactors are initialized + for _, c := range compactors { + require.NoError(t, c.AwaitRunning(context.Background())) + } + + // Wait until a run has been completed on each compactor + for _, c := range compactors { + cortex_testutil.Poll(t, 60*time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsCompleted) + }) + } + + // Ensure that each group was only compacted by exactly one compactor + for groupHash, blockCount := range groupHashes { + + l, found, err := checkLogsForCompaction(compactors, logs, groupHash) + require.NoError(t, err) + + // If the blockCount < 2 then the group shouldn't have been compacted, therefore not found in the logs + if blockCount < 2 { + assert.False(t, found) + } else { + assert.Contains(t, l.String(), fmt.Sprintf(`msg="found compactable group for user" group_hash=%d`, groupHash)) + + } + } +} + +// checkLogsForCompaction checks the logs to see if a compaction has happened on the groupHash, +// if there has been a compaction it will return the logs of the compactor that handled the group +// and will return true. Otherwise this function will return a nil value for the logs and false +// as the group was not compacted +func checkLogsForCompaction(compactors []*Compactor, logs []*concurrency.SyncBuffer, groupHash uint32) (*concurrency.SyncBuffer, bool, error) { + var log *concurrency.SyncBuffer + + // Make sure that the group_hash is only owned by a single compactor + for _, l := range logs { + owned := strings.Contains(l.String(), fmt.Sprintf(`msg="found compactable group for user" group_hash=%d`, groupHash)) + + // Ensure the group is not owned by multiple compactors + if owned && log != nil { + return nil, false, fmt.Errorf("group with group_hash=%d owned by multiple compactors", groupHash) + } + if owned { + log = l + } + } + + // Return an false if we've not been able to find it + if log == nil { + return nil, false, nil + } + + return log, true, nil +} + func createTSDBBlock(t *testing.T, bkt objstore.Bucket, userID string, minT, maxT int64, externalLabels map[string]string) ulid.ULID { // Create a temporary dir for TSDB. 
tempDir, err := ioutil.TempDir(os.TempDir(), "tsdb") @@ -1283,7 +1477,7 @@ func prepareConfig() Config { return compactorCfg } -func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { +func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { storageCfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&storageCfg) @@ -1304,9 +1498,12 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (* logger := log.NewLogfmtLogger(logs) registry := prometheus.NewRegistry() - var limits validation.Limits - flagext.DefaultValues(&limits) - overrides, err := validation.NewOverrides(limits, nil) + if limits == nil { + limits = &validation.Limits{} + flagext.DefaultValues(limits) + } + + overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { @@ -1322,7 +1519,14 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (* nil } - c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, DefaultBlocksGrouperFactory, blocksCompactorFactory) + var blocksGrouperFactory BlocksGrouperFactory + if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + blocksGrouperFactory = ShuffleShardingGrouperFactory + } else { + blocksGrouperFactory = DefaultBlocksGrouperFactory + } + + c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, overrides) require.NoError(t, err) return c, tsdbCompactor, tsdbPlanner, logs, registry @@ -1422,6 +1626,32 @@ func mockDeletionMarkJSON(id string, deletionTime time.Time) string { return string(content) } +func mockBlockMetaJSONWithTime(id string, orgID string, minTime int64, maxTime int64) string { + meta := metadata.Meta{ + Thanos: metadata.Thanos{ + Labels: map[string]string{"__org_id__": orgID}, + }, + } + + meta.BlockMeta = tsdb.BlockMeta{ + Version: 1, + ULID: ulid.MustParse(id), + MinTime: minTime, + MaxTime: maxTime, + Compaction: tsdb.BlockMetaCompaction{ + Level: 1, + Sources: []ulid.ULID{ulid.MustParse(id)}, + }, + } + + content, err := json.Marshal(meta) + if err != nil { + panic("failed to marshal mocked block meta") + } + + return string(content) +} + func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { numUsers := 10 @@ -1457,7 +1687,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { cfg.ShardingRing.KVStore.Mock = kvstore // Each compactor will get its own temp dir for storing local files. - c, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem) + c, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) }) @@ -1526,7 +1756,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { // Set ObservePeriod to longer than the timeout period to mock a timeout while waiting on ring to become ACTIVE cfg.ShardingRing.ObservePeriod = time.Second * 10 - c, _, _, logs, _ := prepare(t, cfg, bucketClient) + c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil) // Try to start the compactor with a bad consul kv-store. 
The err := services.StartAndAwaitRunning(context.Background(), c) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 191600bc270..a53028323c7 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/cortexproject/cortex/pkg/ring" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" @@ -29,13 +30,18 @@ type ShuffleShardingGrouper struct { blocksMarkedForNoCompact prometheus.Counter garbageCollectedBlocks prometheus.Counter remainingPlannedCompactions prometheus.Gauge - hashFunc metadata.HashFunc - compactions *prometheus.CounterVec - compactionRunsStarted *prometheus.CounterVec - compactionRunsCompleted *prometheus.CounterVec - compactionFailures *prometheus.CounterVec - verticalCompactions *prometheus.CounterVec - compactorCfg Config + hashFunc metadata.HashFunc + compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec + compactorCfg Config + limits CompactorLimits + userID string + + ring ring.ReadRing + ringLifecyclerAddr string } func NewShuffleShardingGrouper( @@ -50,6 +56,10 @@ func NewShuffleShardingGrouper( remainingPlannedCompactions prometheus.Gauge, hashFunc metadata.HashFunc, compactorCfg Config, + ring ring.ReadRing, + ringLifecyclerAddr string, + limits CompactorLimits, + userID string, ) *ShuffleShardingGrouper { if logger == nil { logger = log.NewNopLogger() @@ -87,7 +97,11 @@ func NewShuffleShardingGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - compactorCfg: compactorCfg, + compactorCfg: compactorCfg, + ring: ring, + ringLifecyclerAddr: ringLifecyclerAddr, + limits: limits, + userID: userID, } } @@ -105,7 +119,18 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re // which we can parallelly compact. var outGroups []*compact.Group - i := 0 + // Check if this compactor is on the subring. 
+ // If the compactor is not on the subring when using the userID as a identifier + // no plans generated below will be owned by the compactor so we can just return an empty array + // as there will be no planned groups + onSubring, err := g.checkSubringForCompactor() + if err != nil { + return nil, errors.Wrap(err, "unable to check sub-ring for compactor ownership") + } + if !onSubring { + level.Info(g.logger).Log("msg", "compactor is not on the current sub-ring skipping user", "user", g.userID) + return outGroups, nil + } // Metrics for the remaining planned compactions g.remainingPlannedCompactions.Set(0) @@ -116,14 +141,20 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re continue } - // TODO: Use the group's hash to determine whether a compactor should be responsible for compacting that group - groupHash := hashGroup(group.blocks[0].Thanos.Labels["__org_id__"], group.rangeStart, group.rangeEnd) + groupHash := hashGroup(g.userID, group.rangeStart, group.rangeEnd) + + if owned, err := g.ownGroup(groupHash); err != nil { + level.Warn(g.logger).Log("msg", "unable to check if user is owned by this shard", "group hash", groupHash, "err", err, "group", group.String()) + continue + } else if !owned { + level.Info(g.logger).Log("msg", "skipping group because it is not owned by this shard", "group_hash", groupHash) + continue + } g.remainingPlannedCompactions.Inc() - groupKey := fmt.Sprintf("%v%d", groupHash, i) - i++ + groupKey := fmt.Sprintf("%v%s", groupHash, compact.DefaultGroupKey(group.blocks[0].Thanos)) - level.Debug(g.logger).Log("msg", "found compactable group for user", "user", group.blocks[0].Thanos.Labels["__org_id__"], "plan", group.String()) + level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "group", group.String()) // All the blocks within the same group have the same downsample // resolution and external labels. @@ -184,6 +215,34 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re return outGroups, nil } +// Check whether this compactor instance owns the group. +func (g *ShuffleShardingGrouper) ownGroup(groupHash uint32) (bool, error) { + subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID)) + + rs, err := subRing.Get(groupHash, RingOp, nil, nil, nil) + if err != nil { + return false, err + } + + if len(rs.Instances) != 1 { + return false, fmt.Errorf("unexpected number of compactors in the shard (expected 1, got %d)", len(rs.Instances)) + } + + return rs.Instances[0].Addr == g.ringLifecyclerAddr, nil +} + +// Check whether this compactor exists on the subring based on user ID +func (g *ShuffleShardingGrouper) checkSubringForCompactor() (bool, error) { + subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID)) + + rs, err := subRing.GetAllHealthy(RingOp) + if err != nil { + return false, err + } + + return rs.Includes(g.ringLifecyclerAddr), nil +} + // Get the hash of a group based on the UserID, and the starting and ending time of the group's range. 
func hashGroup(userID string, rangeStart int64, rangeEnd int64) uint32 { groupString := fmt.Sprintf("%v%v%v", userID, rangeStart, rangeEnd) diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index dfa13639700..af3a135db8f 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -4,11 +4,14 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/validation" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -34,63 +37,63 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { map[ulid.ULID]*metadata.Meta{ block1ulid: { BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block2ulid: { BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block3ulid: { BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block4ulid: { BlockMeta: tsdb.BlockMeta{ULID: block4ulid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block5ulid: { BlockMeta: tsdb.BlockMeta{ULID: block5ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block6ulid: { BlockMeta: tsdb.BlockMeta{ULID: block6ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block7ulid: { BlockMeta: tsdb.BlockMeta{ULID: block7ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block8ulid: { BlockMeta: tsdb.BlockMeta{ULID: block8ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block9ulid: { BlockMeta: tsdb.BlockMeta{ULID: block9ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "3"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "3"}}, }, block10ulid: { 
BlockMeta: tsdb.BlockMeta{ULID: block10ulid, MinTime: 4 * time.Hour.Milliseconds(), MaxTime: 6 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block11ulid: { BlockMeta: tsdb.BlockMeta{ULID: block11ulid, MinTime: 6 * time.Hour.Milliseconds(), MaxTime: 8 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block12ulid: { BlockMeta: tsdb.BlockMeta{ULID: block12ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block13ulid: { BlockMeta: tsdb.BlockMeta{ULID: block13ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 20 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block14ulid: { BlockMeta: tsdb.BlockMeta{ULID: block14ulid, MinTime: 21 * time.Hour.Milliseconds(), MaxTime: 40 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block15ulid: { BlockMeta: tsdb.BlockMeta{ULID: block15ulid, MinTime: 21 * time.Hour.Milliseconds(), MaxTime: 40 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, } @@ -143,6 +146,23 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { BlockRanges: testData.ranges, } + limits := &validation.Limits{} + overrides, err := validation.NewOverrides(*limits, nil) + require.NoError(t, err) + + // Setup mocking of the ring so that the grouper will own all the shards + rs := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Addr: "test-addr"}, + }, + } + subring := &RingMock{} + subring.On("GetAllHealthy", mock.Anything).Return(rs, nil) + subring.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(rs, nil) + + ring := &RingMock{} + ring.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil) + registerer := prometheus.NewRegistry() remainingPlannedCompactions := promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Name: "cortex_compactor_remaining_planned_compactions", @@ -158,7 +178,11 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { nil, remainingPlannedCompactions, metadata.NoneFunc, - *compactorCfg) + *compactorCfg, + ring, + "test-addr", + overrides, + "") actual, err := g.Groups(testData.blocks) require.NoError(t, err) require.Len(t, actual, len(testData.expected)) @@ -502,3 +526,55 @@ func TestBlocksGroup_overlaps(t *testing.T) { assert.Equal(t, tc.expected, tc.second.overlaps(tc.first)) } } + +type RingMock struct { + mock.Mock +} + +func (r *RingMock) Collect(ch chan<- prometheus.Metric) {} + +func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {} + +func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDesc, bufHosts, bufZones []string) (ring.ReplicationSet, error) { + args := r.Called(key, op, bufDescs, bufHosts, bufZones) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (r *RingMock) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) { + 
args := r.Called(op) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (r *RingMock) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) { + args := r.Called(op) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (r *RingMock) ReplicationFactor() int { + return 0 +} + +func (r *RingMock) InstancesCount() int { + return 0 +} + +func (r *RingMock) ShuffleShard(identifier string, size int) ring.ReadRing { + args := r.Called(identifier, size) + return args.Get(0).(ring.ReadRing) +} + +func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) { + args := r.Called(instanceID) + return args.Get(0).(ring.InstanceState), args.Error(1) +} + +func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ring.ReadRing { + args := r.Called(identifier, size, lookbackPeriod, now) + return args.Get(0).(ring.ReadRing) +} + +func (r *RingMock) HasInstance(instanceID string) bool { + return true +} + +func (r *RingMock) CleanupShuffleShardCache(identifier string) {} diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 355bde61287..f429b247b00 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -233,7 +233,7 @@ func (c *Config) Validate(log log.Logger) error { if err := c.StoreGateway.Validate(c.LimitsConfig); err != nil { return errors.Wrap(err, "invalid store-gateway config") } - if err := c.Compactor.Validate(); err != nil { + if err := c.Compactor.Validate(c.LimitsConfig); err != nil { return errors.Wrap(err, "invalid compactor config") } if err := c.AlertmanagerStorage.Validate(); err != nil { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 2568029087d..8d6d65005e0 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -731,7 +731,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { func (t *Cortex) initCompactor() (serv services.Service, err error) { t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort - t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides) if err != nil { return } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 9f2474c47db..df0baead297 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -93,6 +93,7 @@ type Limits struct { // Compactor. CompactorBlocksRetentionPeriod model.Duration `yaml:"compactor_blocks_retention_period" json:"compactor_blocks_retention_period"` + CompactorTenantShardSize int `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"` // This config doesn't have a CLI flag registered here because they're registered in // their own original config struct. @@ -168,6 +169,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.") f.Var(&l.CompactorBlocksRetentionPeriod, "compactor.blocks-retention-period", "Delete blocks containing samples older than the specified retention period. 0 to disable.") + f.IntVar(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. 
When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") // Store-gateway. f.IntVar(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") @@ -496,6 +498,11 @@ func (o *Overrides) CompactorBlocksRetentionPeriod(userID string) time.Duration return time.Duration(o.getOverridesForUser(userID).CompactorBlocksRetentionPeriod) } +// CompactorTenantShardSize returns the shard size (number of compactors) used by this tenant when using the shuffle-sharding strategy. +func (o *Overrides) CompactorTenantShardSize(userID string) int { + return o.getOverridesForUser(userID).CompactorTenantShardSize +} + // MetricRelabelConfigs returns the metric relabel configs for a given user. func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config { return o.getOverridesForUser(userID).MetricRelabelConfigs From acb1866bc2ee189d1d2b2ff342c2c5dce04953b9 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 18 Aug 2021 19:02:38 -0700 Subject: [PATCH 04/16] update changelog Signed-off-by: Albert --- CHANGELOG.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f01a3c35bf..609e19c395b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683 * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. * [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction +* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4624 ## 1.12.0 in progress * [CHANGE] Compactor block deletion mark migration, needed when upgrading from v1.7, is now disabled by default. #4597 * [CHANGE] The `status_code` label on gRPC client metrics has changed from '200' and '500' to '2xx', '5xx', '4xx', 'cancel' or 'error'. 4601 * [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601 -* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4624 * [ENHANCEMENT] Update Go version to 1.17.8. #4602 #4604 #4658 * [ENHANCEMENT] Keep track of discarded samples due to bad relabel configuration in `cortex_discarded_samples_total`. #4503 +* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4621 +* [FEATURE] Add shuffle sharding for the compactor #4433 +* [ENHANCEMENT] Update Go version to 1.17.5. #4602 #4604 +* [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503 +>>>>>>> c2b1835ac (update changelog) * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571 * [ENHANCEMENT] Query federation: improve performance in MergeQueryable by memoizing labels. 
#4502 * [ENHANCEMENT] Added new ring related config `-ingester.readiness-check-ring-health` when enabled the readiness probe will succeed only after all instances are ACTIVE and healthy in the ring, this is enabled by default. #4539 From 2063e96dd41583ab4be0226ba141f9235c120314 Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 18 Aug 2021 19:58:14 -0700 Subject: [PATCH 05/16] fix linting Signed-off-by: Albert --- pkg/compactor/compactor.go | 2 +- pkg/compactor/shuffle_sharding_grouper.go | 2 ++ pkg/compactor/shuffle_sharding_grouper_test.go | 5 +++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 5916dbea4d3..1f23023526b 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -145,7 +145,7 @@ type PlannerFactory func( ) compact.Planner // CompactorLimits defines limits used by the Compactor. -type CompactorLimits interface { +type CompactorLimits interface { //nolint CompactorTenantShardSize(userID string) int } diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index a53028323c7..5dd6d12b96f 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -18,6 +18,8 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/ring" ) type ShuffleShardingGrouper struct { diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index af3a135db8f..8c67cf8d46a 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -4,8 +4,6 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util/validation" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -14,6 +12,9 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/validation" ) func TestShuffleShardingGrouper_Groups(t *testing.T) { From 1c8d4d8d7c9b0a6c2e845b5c679735cba2ecd2cb Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 19:20:48 -0800 Subject: [PATCH 06/16] Fix build errors Signed-off-by: Alvin Lin --- pkg/compactor/shuffle_sharding_grouper_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index 8c67cf8d46a..7fc69cf66aa 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -177,6 +177,7 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { registerer, nil, nil, + nil, remainingPlannedCompactions, metadata.NoneFunc, *compactorCfg, From 23e79a0cf165849fb0648a59760a6f91098c5f8d Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 19:50:14 -0800 Subject: [PATCH 07/16] Fix up change log Signed-off-by: Alvin Lin --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 609e19c395b..e53ac2bc4c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ * [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. 
#4683 * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. * [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction -* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4624 +* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4621 ## 1.12.0 in progress @@ -18,7 +18,8 @@ * [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601 * [ENHANCEMENT] Update Go version to 1.17.8. #4602 #4604 #4658 * [ENHANCEMENT] Keep track of discarded samples due to bad relabel configuration in `cortex_discarded_samples_total`. #4503 -* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4621 +* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4624 +>>>>>>> bf3d29f2b (Fix up change log) * [FEATURE] Add shuffle sharding for the compactor #4433 * [ENHANCEMENT] Update Go version to 1.17.5. #4602 #4604 * [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503 From eafb14e5c82faa90487ff22f62b6ed62f81a4164 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 20:21:22 -0800 Subject: [PATCH 08/16] Fix linting error Signed-off-by: Alvin Lin --- pkg/compactor/shuffle_sharding_grouper.go | 34 +++++++++++------------ 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 5dd6d12b96f..6f073008acf 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -23,14 +23,14 @@ import ( ) type ShuffleShardingGrouper struct { - logger log.Logger - bkt objstore.Bucket - acceptMalformedIndex bool - enableVerticalCompaction bool - reg prometheus.Registerer - blocksMarkedForDeletion prometheus.Counter - blocksMarkedForNoCompact prometheus.Counter - garbageCollectedBlocks prometheus.Counter + logger log.Logger + bkt objstore.Bucket + acceptMalformedIndex bool + enableVerticalCompaction bool + reg prometheus.Registerer + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + garbageCollectedBlocks prometheus.Counter remainingPlannedCompactions prometheus.Gauge hashFunc metadata.HashFunc compactions *prometheus.CounterVec @@ -68,16 +68,16 @@ func NewShuffleShardingGrouper( } return &ShuffleShardingGrouper{ - logger: logger, - bkt: bkt, - acceptMalformedIndex: acceptMalformedIndex, - enableVerticalCompaction: enableVerticalCompaction, - reg: reg, - blocksMarkedForDeletion: blocksMarkedForDeletion, - blocksMarkedForNoCompact: blocksMarkedForNoCompact, - garbageCollectedBlocks: garbageCollectedBlocks, + logger: logger, + bkt: bkt, + acceptMalformedIndex: acceptMalformedIndex, + enableVerticalCompaction: enableVerticalCompaction, + reg: reg, + blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + garbageCollectedBlocks: garbageCollectedBlocks, remainingPlannedCompactions: remainingPlannedCompactions, - hashFunc: hashFunc, + hashFunc: hashFunc, // Metrics are copied from Thanos DefaultGrouper constructor compactions: 
promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_compact_group_compactions_total", From fb15202d2442bb0f69f5cf00d09375379915e3da Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 22:01:53 -0800 Subject: [PATCH 09/16] Remove use of nolint Signed-off-by: Alvin Lin --- pkg/compactor/compactor.go | 16 ++++++++-------- pkg/compactor/shuffle_sharding_grouper.go | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 1f23023526b..a27b445b8c8 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -54,7 +54,7 @@ var ( errShardingRequired = errors.New("sharding must be enabled to use shuffle-sharding sharding strategy") errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion , blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ CompactorLimits, _ string) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion , blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -67,7 +67,7 @@ var ( metadata.NoneFunc) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits CompactorLimits, userID string) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { return NewShuffleShardingGrouper( logger, bkt, @@ -126,7 +126,7 @@ type BlocksGrouperFactory func( remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycler *ring.Lifecycler, - limit CompactorLimits, + limit Limits, userID string, ) compact.Grouper @@ -144,8 +144,8 @@ type PlannerFactory func( noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ) compact.Planner -// CompactorLimits defines limits used by the Compactor. -type CompactorLimits interface { //nolint +// Limits defines limits used by the Compactor. +type Limits interface { CompactorTenantShardSize(userID string) int } @@ -259,7 +259,7 @@ type Compactor struct { parentLogger log.Logger registerer prometheus.Registerer allowedTenants *util.AllowedTenants - limits CompactorLimits + limits Limits // Functions that creates bucket client, grouper, planner and compactor using the context. // Useful for injecting mock objects from tests. @@ -307,7 +307,7 @@ type Compactor struct { } // NewCompactor makes a new Compactor. 
-func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits CompactorLimits) (*Compactor, error) { +func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits Limits) (*Compactor, error) { bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer) } @@ -347,7 +347,7 @@ func newCompactor( bucketClientFactory func(ctx context.Context) (objstore.Bucket, error), blocksGrouperFactory BlocksGrouperFactory, blocksCompactorFactory BlocksCompactorFactory, - limits CompactorLimits, + limits Limits, ) (*Compactor, error) { var remainingPlannedCompactions prometheus.Gauge if compactorCfg.ShardingStrategy == "shuffle-sharding" { diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 6f073008acf..797c81bafd1 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -39,7 +39,7 @@ type ShuffleShardingGrouper struct { compactionFailures *prometheus.CounterVec verticalCompactions *prometheus.CounterVec compactorCfg Config - limits CompactorLimits + limits Limits userID string ring ring.ReadRing @@ -60,7 +60,7 @@ func NewShuffleShardingGrouper( compactorCfg Config, ring ring.ReadRing, ringLifecyclerAddr string, - limits CompactorLimits, + limits Limits, userID string, ) *ShuffleShardingGrouper { if logger == nil { From ccbf001d8693b878ac999bc67d075c05149cd4e0 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Mon, 17 Jan 2022 15:35:45 -0800 Subject: [PATCH 10/16] Compactor.ownUser now determines whether the user is owned by a compactor via ring, instead of returning true if shuffle-sharding is enabled Signed-off-by: Roy Chiang --- pkg/compactor/compactor.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index a27b445b8c8..bd1fc5fa083 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -818,12 +818,23 @@ func (c *Compactor) ownUser(userID string) (bool, error) { return false, nil } - // Always owned if sharding is disabled or if using shuffle-sharding as shard ownership - // is determined by the shuffle sharding grouper. - if !c.compactorCfg.ShardingEnabled || c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + // Always owned if sharding is disabled + if !c.compactorCfg.ShardingEnabled { return true, nil } + // If using shuffle-sharding, ownership is determined by the ring + if c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + subRing := c.ring.ShuffleShard(userID, c.limits.CompactorTenantShardSize(userID)) + + rs, err := subRing.GetAllHealthy(RingOp) + if err != nil { + return false, err + } + + return rs.Includes(c.ringLifecycler.Addr), nil + } + // Hash the user ID. 
hasher := fnv.New32a() _, _ = hasher.Write([]byte(userID)) From 3cb44ee61a28302dbfbef96bc96c0d7d16d34945 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Mon, 17 Jan 2022 15:52:19 -0800 Subject: [PATCH 11/16] fix bug where multiple compactors are trying to cleanup the same tenant at once, which results in dangling bucket index Signed-off-by: Roy Chiang --- pkg/compactor/compactor.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index bd1fc5fa083..6dc83e5f741 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -457,7 +457,7 @@ func (c *Compactor) starting(ctx context.Context) error { c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient) // Create the users scanner. - c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUser, c.parentLogger) + c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.cleanUser, c.parentLogger) // Create the blocks cleaner (service). c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{ @@ -814,6 +814,14 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) { } func (c *Compactor) ownUser(userID string) (bool, error) { + return c.ownUserHelper(userID, false) +} + +func (c *Compactor) cleanUser(userID string) (bool, error) { + return c.ownUserHelper(userID, true) +} + +func (c *Compactor) ownUserHelper(userID string, isCleanUp bool) (bool, error) { if !c.allowedTenants.IsAllowed(userID) { return false, nil } @@ -823,8 +831,8 @@ func (c *Compactor) ownUser(userID string) (bool, error) { return true, nil } - // If using shuffle-sharding, ownership is determined by the ring - if c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + // If we aren't cleaning up user blocks, and we are using shuffle-sharding, ownership is determined by the ring + if !isCleanUp && c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { subRing := c.ring.ShuffleShard(userID, c.limits.CompactorTenantShardSize(userID)) rs, err := subRing.GetAllHealthy(RingOp) From dd9681bbb02f94b3f538c96782a6e3acb8929347 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Mon, 17 Jan 2022 15:55:23 -0800 Subject: [PATCH 12/16] set all remaining compation in one go, instead of slowly incrementing it as plans get generated Signed-off-by: Roy Chiang --- pkg/compactor/shuffle_sharding_grouper.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 797c81bafd1..914bca17942 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -134,7 +134,8 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re return outGroups, nil } // Metrics for the remaining planned compactions - g.remainingPlannedCompactions.Set(0) + var remainingCompactions = 0. 
+ defer g.remainingPlannedCompactions.Set(remainingCompactions) for _, mainBlocks := range mainGroups { for _, group := range groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds()) { @@ -153,7 +154,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re continue } - g.remainingPlannedCompactions.Inc() + remainingCompactions++ groupKey := fmt.Sprintf("%v%s", groupHash, compact.DefaultGroupKey(group.blocks[0].Thanos)) level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "group", group.String()) From 3ea1e363327f2d758ea4179674a4a8043ef9a5d6 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Mon, 17 Jan 2022 17:49:36 -0800 Subject: [PATCH 13/16] rename ownUser function for better readability Signed-off-by: Roy Chiang --- pkg/compactor/compactor.go | 17 +++++++++-------- pkg/compactor/compactor_test.go | 2 +- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 6dc83e5f741..36ac23bf828 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -457,7 +457,7 @@ func (c *Compactor) starting(ctx context.Context) error { c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient) // Create the users scanner. - c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.cleanUser, c.parentLogger) + c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger) // Create the blocks cleaner (service). c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{ @@ -611,7 +611,7 @@ func (c *Compactor) compactUsers(ctx context.Context) { } // Ensure the user ID belongs to our shard. - if owned, err := c.ownUser(userID); err != nil { + if owned, err := c.ownUserForCompaction(userID); err != nil { c.compactionRunSkippedTenants.Inc() level.Warn(c.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err) continue @@ -813,15 +813,15 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) { return users, err } -func (c *Compactor) ownUser(userID string) (bool, error) { - return c.ownUserHelper(userID, false) +func (c *Compactor) ownUserForCompaction(userID string) (bool, error) { + return c.ownUser(userID, false) } -func (c *Compactor) cleanUser(userID string) (bool, error) { - return c.ownUserHelper(userID, true) +func (c *Compactor) ownUserForCleanUp(userID string) (bool, error) { + return c.ownUser(userID, true) } -func (c *Compactor) ownUserHelper(userID string, isCleanUp bool) (bool, error) { +func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) { if !c.allowedTenants.IsAllowed(userID) { return false, nil } @@ -831,7 +831,8 @@ func (c *Compactor) ownUserHelper(userID string, isCleanUp bool) (bool, error) { return true, nil } - // If we aren't cleaning up user blocks, and we are using shuffle-sharding, ownership is determined by the ring + // If we aren't cleaning up user blocks, and we are using shuffle-sharding, ownership is determined by a subring + // Cleanup should only be owned by a single compactor, as there could be race conditions during block deletion if !isCleanUp && c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { subRing := c.ring.ShuffleShard(userID, c.limits.CompactorTenantShardSize(userID)) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 8ab7cc8a220..916dcedee72 100644 --- a/pkg/compactor/compactor_test.go +++ 
b/pkg/compactor/compactor_test.go @@ -1394,7 +1394,7 @@ func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuff var log *concurrency.SyncBuffer for i, c := range compactors { - owned, err := c.ownUser(userID) + owned, err := c.ownUserForCompaction(userID) if err != nil { return nil, nil, err } From 1a7f65a5f5408ac40a3a660d0188e6284ff9a993 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Tue, 18 Jan 2022 15:20:18 -0800 Subject: [PATCH 14/16] address comments Signed-off-by: Roy Chiang --- docs/guides/shuffle-sharding.md | 6 ++---- pkg/compactor/compactor.go | 11 ++++------- pkg/compactor/compactor_test.go | 8 -------- pkg/compactor/shuffle_sharding_grouper.go | 2 +- 4 files changed, 7 insertions(+), 20 deletions(-) diff --git a/docs/guides/shuffle-sharding.md b/docs/guides/shuffle-sharding.md index e8486f2f565..27b9c78c985 100644 --- a/docs/guides/shuffle-sharding.md +++ b/docs/guides/shuffle-sharding.md @@ -160,10 +160,8 @@ Note that when using sharding strategy, each rule group is evaluated by single r Cortex compactor can run in three modes: 1. **No sharding at all.** This is the most basic mode of the compactor. It is activated by using `-compactor.sharding-enabled=false` (default). In this mode every compactor will run every compaction. -2. **Default sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=default` (default). In this mode compactors register themselves into the ring. Each compactor will then select and evaluate only those users that it "owns". -3. **Shuffle sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=shuffle-sharding`. Similarly to default sharding, compactors use the ring to distribute workload, but compactions groups for each tenant can only be evaluated on limited number of compactors (`-compactor.tenant-shard-size`, can also be set per tenant as `compactor_tenant_shard_size` in overrides). - -The Cortex compactor by default shards by tenant ID when sharding is enabled. +2. **Default sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=default` (default). In this mode compactors register themselves into the ring, and each tenant belongs to exactly one compactor. +3. **Shuffle sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=shuffle-sharding`. As with default sharding, compactors register themselves into the ring, but compactions for each tenant can be carried out on multiple compactors (`-compactor.tenant-shard-size`, can also be set per tenant as `compactor_tenant_shard_size` in overrides). The subset of compactors used for a given tenant is determined by the tenant's shard size.
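To make the ownership model above concrete, here is a minimal sketch of the ring-based check introduced in this series. It assumes the compactor package's `Limits` interface and `RingOp` operation and the Cortex `ring` package; the `ownsTenant` helper name is illustrative and is not code from the patches.

```go
package compactor

import (
	"github.com/cortexproject/cortex/pkg/ring"
)

// ownsTenant is a simplified, illustrative version of the shuffle-sharding
// ownership check: build the tenant's sub-ring from its shard size, then own
// the tenant only if this compactor's ring address is among the healthy
// instances of that sub-ring.
func ownsTenant(r *ring.Ring, ringLifecyclerAddr string, limits Limits, userID string) (bool, error) {
	// Shard size comes from -compactor.tenant-shard-size, overridable per
	// tenant via compactor_tenant_shard_size.
	subRing := r.ShuffleShard(userID, limits.CompactorTenantShardSize(userID))

	rs, err := subRing.GetAllHealthy(RingOp)
	if err != nil {
		return false, err
	}

	return rs.Includes(ringLifecyclerAddr), nil
}
```

Note that block cleanup deliberately does not go through this sub-ring path: a tenant's cleanup remains owned by a single compactor on the full ring, avoiding races between compactors during block deletion.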
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 36ac23bf828..e5a02641316 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -51,7 +51,6 @@ var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} errInvalidShardingStrategy = errors.New("invalid sharding strategy") - errShardingRequired = errors.New("sharding must be enabled to use shuffle-sharding sharding strategy") errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion , blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { @@ -231,10 +230,8 @@ func (cfg *Config) Validate(limits validation.Limits) error { return errInvalidShardingStrategy } - if cfg.ShardingStrategy == util.ShardingStrategyShuffle { - if !cfg.ShardingEnabled { - return errShardingRequired - } else if limits.CompactorTenantShardSize <= 0 { + if cfg.ShardingEnabled && cfg.ShardingStrategy == util.ShardingStrategyShuffle { + if limits.CompactorTenantShardSize <= 0 { return errInvalidTenantShardSize } } @@ -350,10 +347,10 @@ func newCompactor( limits Limits, ) (*Compactor, error) { var remainingPlannedCompactions prometheus.Gauge - if compactorCfg.ShardingStrategy == "shuffle-sharding" { + if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { remainingPlannedCompactions = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ Name: "cortex_compactor_remaining_planned_compactions", - Help: "Total number of plans that remain to be compacted.", + Help: "Total number of plans that remain to be compacted. 
Only available with shuffle-sharding strategy", }) } c := &Compactor{ diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 916dcedee72..f0009eea292 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -125,14 +125,6 @@ func TestConfig_Validate(t *testing.T) { }, expected: "", }, - "should fail with shuffle sharding strategy selected without sharding enabled": { - setup: func(cfg *Config) { - cfg.ShardingStrategy = util.ShardingStrategyShuffle - cfg.ShardingEnabled = false - }, - initLimits: func(_ *validation.Limits) {}, - expected: errShardingRequired.Error(), - }, "should fail with bad compactor tenant shard size": { setup: func(cfg *Config) { cfg.ShardingStrategy = util.ShardingStrategyShuffle diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 914bca17942..702949cd4ae 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -130,7 +130,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re return nil, errors.Wrap(err, "unable to check sub-ring for compactor ownership") } if !onSubring { - level.Info(g.logger).Log("msg", "compactor is not on the current sub-ring skipping user", "user", g.userID) + level.Debug(g.logger).Log("msg", "compactor is not on the current sub-ring skipping user", "user", g.userID) return outGroups, nil } // Metrics for the remaining planned compactions From 3cd209b015403cb09a6170080d243d8c3574b4f1 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Wed, 6 Apr 2022 15:48:35 -0700 Subject: [PATCH 15/16] fixed rebase issues Signed-off-by: Roy Chiang --- CHANGELOG.md | 8 +------- pkg/compactor/compactor.go | 4 ++-- pkg/compactor/shuffle_sharding_grouper.go | 1 - 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e53ac2bc4c7..5fe0f86b7b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ * [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683 * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. * [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction -* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4621 +* [FEATURE] Add shuffle sharding for the compactor #4433 ## 1.12.0 in progress @@ -18,12 +18,6 @@ * [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601 * [ENHANCEMENT] Update Go version to 1.17.8. #4602 #4604 #4658 * [ENHANCEMENT] Keep track of discarded samples due to bad relabel configuration in `cortex_discarded_samples_total`. #4503 -* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4624 ->>>>>>> bf3d29f2b (Fix up change log) -* [FEATURE] Add shuffle sharding for the compactor #4433 -* [ENHANCEMENT] Update Go version to 1.17.5. #4602 #4604 -* [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503 ->>>>>>> c2b1835ac (update changelog) * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. 
#4571 * [ENHANCEMENT] Query federation: improve performance in MergeQueryable by memoizing labels. #4502 * [ENHANCEMENT] Added new ring related config `-ingester.readiness-check-ring-health` when enabled the readiness probe will succeed only after all instances are ACTIVE and healthy in the ring, this is enabled by default. #4539 diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index e5a02641316..352dcee7216 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -53,7 +53,7 @@ var ( errInvalidShardingStrategy = errors.New("invalid sharding strategy") errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion , blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -66,7 +66,7 @@ var ( metadata.NoneFunc) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { return NewShuffleShardingGrouper( logger, bkt, diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 702949cd4ae..a631b728149 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/cortexproject/cortex/pkg/ring" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" From 18a8e709d7c7a04d0ad12d8db61c76151e184593 Mon Sep 17 00:00:00 2001 From: Roy Chiang Date: Wed, 6 Apr 2022 18:28:24 -0700 Subject: [PATCH 16/16] fix tests Signed-off-by: Roy Chiang --- pkg/compactor/compactor_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index f0009eea292..f419b9d248f 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1181,6 +1181,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit for blockID, blockTimes := range blocks { bucketClient.MockGet(userID+"/"+blockID+"/meta.json", mockBlockMetaJSONWithTime(blockID, userID, blockTimes["startTime"], blockTimes["endTime"]), nil) bucketClient.MockGet(userID+"/"+blockID+"/deletion-mark.json", "", nil) + bucketClient.MockGet(userID+"/"+blockID+"/no-compact-mark.json", 
"", nil) blockDirectory = append(blockDirectory, userID+"/"+blockID) // Get all of the unique group hashes so that they can be used to ensure all groups were compacted