Skip to content

Commit 026c09a

Browse files
ac1214alvinlin123
authored andcommitted
Add metrics for remaining planned compactions
Signed-off-by: Albert <[email protected]>
1 parent b52735e commit 026c09a

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

pkg/compactor/compactor.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ var (
5151
supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
5252
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
5353

54-
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
54+
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge) compact.Grouper {
5555
return compact.NewDefaultGrouper(
5656
logger,
5757
bkt,
@@ -64,7 +64,7 @@ var (
6464
metadata.NoneFunc)
6565
}
6666

67-
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
67+
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge) compact.Grouper {
6868
return NewShuffleShardingGrouper(
6969
logger,
7070
bkt,
@@ -74,6 +74,7 @@ var (
7474
blocksMarkedForDeletion,
7575
prometheus.NewCounter(prometheus.CounterOpts{}),
7676
garbageCollectedBlocks,
77+
remainingPlannedCompactions,
7778
metadata.NoneFunc,
7879
cfg)
7980
}
@@ -108,6 +109,7 @@ type BlocksGrouperFactory func(
108109
reg prometheus.Registerer,
109110
blocksMarkedForDeletion prometheus.Counter,
110111
garbageCollectedBlocks prometheus.Counter,
112+
remainingPlannedCompactions prometheus.Gauge,
111113
) compact.Grouper
112114

113115
// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
@@ -256,6 +258,7 @@ type Compactor struct {
256258
compactionRunInterval prometheus.Gauge
257259
blocksMarkedForDeletion prometheus.Counter
258260
garbageCollectedBlocks prometheus.Counter
261+
remainingPlannedCompactions prometheus.Gauge
259262

260263
// TSDB syncer metrics
261264
syncerMetrics *syncerMetrics
@@ -303,6 +306,13 @@ func newCompactor(
303306
blocksGrouperFactory BlocksGrouperFactory,
304307
blocksCompactorFactory BlocksCompactorFactory,
305308
) (*Compactor, error) {
309+
var remainingPlannedCompactions prometheus.Gauge
310+
if compactorCfg.ShardingStrategy == "shuffle-sharding" {
311+
remainingPlannedCompactions = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
312+
Name: "cortex_compactor_remaining_planned_compactions",
313+
Help: "Total number of plans that remain to be compacted.",
314+
})
315+
}
306316
c := &Compactor{
307317
compactorCfg: compactorCfg,
308318
storageCfg: storageCfg,
@@ -361,6 +371,7 @@ func newCompactor(
361371
Name: "cortex_compactor_garbage_collected_blocks_total",
362372
Help: "Total number of blocks marked for deletion by compactor.",
363373
}),
374+
remainingPlannedCompactions: remainingPlannedCompactions,
364375
}
365376

366377
if len(compactorCfg.EnabledTenants) > 0 {
@@ -696,7 +707,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
696707
compactor, err := compact.NewBucketCompactor(
697708
ulogger,
698709
syncer,
699-
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks),
710+
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.remainingPlannedCompactions),
700711
c.blocksPlanner,
701712
c.blocksCompactor,
702713
path.Join(c.compactorCfg.DataDir, "compact"),

pkg/compactor/shuffle_sharding_grouper.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func NewShuffleShardingGrouper(
4646
blocksMarkedForDeletion prometheus.Counter,
4747
blocksMarkedForNoCompact prometheus.Counter,
4848
garbageCollectedBlocks prometheus.Counter,
49+
remainingPlannedCompactions prometheus.Gauge,
4950
hashFunc metadata.HashFunc,
5051
compactorCfg Config,
5152
) *ShuffleShardingGrouper {
@@ -103,6 +104,9 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re
103104
var outGroups []*compact.Group
104105

105106
i := 0
107+
// Metrics for the remaining planned compactions
108+
g.remainingPlannedCompactions.Set(0)
109+
106110
for _, mainBlocks := range mainGroups {
107111
for _, group := range groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds()) {
108112
// Nothing to do if we don't have at least 2 blocks.
@@ -113,6 +117,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re
113117
// TODO: Use the group's hash to determine whether a compactor should be responsible for compacting that group
114118
groupHash := hashGroup(group.blocks[0].Thanos.Labels["__org_id__"], group.rangeStart, group.rangeEnd)
115119

120+
g.remainingPlannedCompactions.Inc()
116121
groupKey := fmt.Sprintf("%v%d", groupHash, i)
117122
i++
118123

0 commit comments

Comments
 (0)