Skip to content

Commit 8c0404f

Browse files
authored
Fixed no-compact blocks being grouped in shuffle sharding grouper (#5055)
* Fixed no-compact blocks being grouped in shuffle sharding grouper
  Signed-off-by: Alex Le <[email protected]>
* Updated change log
  Signed-off-by: Alex Le <[email protected]>

Signed-off-by: Alex Le <[email protected]>
1 parent 03e023d commit 8c0404f

File tree

4 files changed

+38
-8
lines changed

4 files changed

+38
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Build ARM docker images. #5041
1919
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
2020
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
21+
* [BUGFIX] Fixed blocks marked for no-compaction being grouped by the shuffle sharding grouper. #5055
2122

2223
## 1.14.0 2022-12-02
2324

pkg/compactor/compactor.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ var (
5252
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
5353
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
5454

55-
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
55+
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
5656
return compact.NewDefaultGrouper(
5757
logger,
5858
bkt,
@@ -67,7 +67,7 @@ var (
6767
cfg.BlocksFetchConcurrency)
6868
}
6969

70-
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
70+
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
7171
return NewShuffleShardingGrouper(
7272
ctx,
7373
logger,
@@ -91,7 +91,8 @@ var (
9191
cfg.CompactionConcurrency,
9292
cfg.BlockVisitMarkerTimeout,
9393
blockVisitMarkerReadFailed,
94-
blockVisitMarkerWriteFailed)
94+
blockVisitMarkerWriteFailed,
95+
noCompactionMarkFilter.NoCompactMarkedBlocks)
9596
}
9697

9798
DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -138,6 +139,7 @@ type BlocksGrouperFactory func(
138139
ringLifecycler *ring.Lifecycler,
139140
limit Limits,
140141
userID string,
142+
noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
141143
) compact.Grouper
142144

143145
// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
@@ -814,7 +816,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
814816
compactor, err := compact.NewBucketCompactor(
815817
ulogger,
816818
syncer,
817-
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID),
819+
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter),
818820
c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed),
819821
c.blocksCompactor,
820822
path.Join(c.compactorCfg.DataDir, "compact"),

pkg/compactor/shuffle_sharding_grouper.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ type ShuffleShardingGrouper struct {
5353
blockVisitMarkerTimeout time.Duration
5454
blockVisitMarkerReadFailed prometheus.Counter
5555
blockVisitMarkerWriteFailed prometheus.Counter
56+
57+
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
5658
}
5759

5860
func NewShuffleShardingGrouper(
@@ -79,6 +81,7 @@ func NewShuffleShardingGrouper(
7981
blockVisitMarkerTimeout time.Duration,
8082
blockVisitMarkerReadFailed prometheus.Counter,
8183
blockVisitMarkerWriteFailed prometheus.Counter,
84+
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
8285
) *ShuffleShardingGrouper {
8386
if logger == nil {
8487
logger = log.NewNopLogger()
@@ -129,17 +132,21 @@ func NewShuffleShardingGrouper(
129132
blockVisitMarkerTimeout: blockVisitMarkerTimeout,
130133
blockVisitMarkerReadFailed: blockVisitMarkerReadFailed,
131134
blockVisitMarkerWriteFailed: blockVisitMarkerWriteFailed,
135+
noCompBlocksFunc: noCompBlocksFunc,
132136
}
133137
}
134138

135139
// Groups function modified from https://github.com/cortexproject/cortex/pull/2616
136140
func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) {
141+
noCompactMarked := g.noCompBlocksFunc()
137142
// First of all we have to group blocks using the Thanos default
138143
// grouping (based on downsample resolution + external labels).
139144
mainGroups := map[string][]*metadata.Meta{}
140145
for _, b := range blocks {
141-
key := b.Thanos.GroupKey()
142-
mainGroups[key] = append(mainGroups[key], b)
146+
if _, excluded := noCompactMarked[b.ULID]; !excluded {
147+
key := b.Thanos.GroupKey()
148+
mainGroups[key] = append(mainGroups[key], b)
149+
}
143150
}
144151

145152
// For each group, we have to further split it into set of blocks

pkg/compactor/shuffle_sharding_grouper_test.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
127127
compactorID string
128128
isExpired bool
129129
}
130-
expected [][]ulid.ULID
131-
metrics string
130+
expected [][]ulid.ULID
131+
metrics string
132+
noCompactBlocks map[ulid.ULID]*metadata.NoCompactMark
132133
}{
133134
"test basic grouping": {
134135
concurrency: 3,
@@ -306,6 +307,20 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
306307
cortex_compactor_remaining_planned_compactions 2
307308
`,
308309
},
310+
"test should skip block with no compact marker": {
311+
concurrency: 2,
312+
ranges: []time.Duration{4 * time.Hour},
313+
blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid]},
314+
expected: [][]ulid.ULID{
315+
{block1hto2hExt2Ulid, block0hto1hExt2Ulid},
316+
{block1hto2hExt1Ulid, block0hto1hExt1Ulid},
317+
},
318+
metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted.
319+
# TYPE cortex_compactor_remaining_planned_compactions gauge
320+
cortex_compactor_remaining_planned_compactions 2
321+
`,
322+
noCompactBlocks: map[ulid.ULID]*metadata.NoCompactMark{block2hto3hExt1Ulid: {}},
323+
},
309324
}
310325

311326
for testName, testData := range tests {
@@ -364,6 +379,10 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
364379
bkt.MockUpload(mock.Anything, nil)
365380
bkt.MockGet(mock.Anything, "", nil)
366381

382+
noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark {
383+
return testData.noCompactBlocks
384+
}
385+
367386
ctx, cancel := context.WithCancel(context.Background())
368387
defer cancel()
369388
g := NewShuffleShardingGrouper(
@@ -390,6 +409,7 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
390409
blockVisitMarkerTimeout,
391410
blockVisitMarkerReadFailed,
392411
blockVisitMarkerWriteFailed,
412+
noCompactFilter,
393413
)
394414
actual, err := g.Groups(testData.blocks)
395415
require.NoError(t, err)

0 commit comments

Comments
 (0)