
Commit cd88307
refactored with thanos changes
Signed-off-by: Alex Le <[email protected]>
1 parent dd117d7 commit cd88307
13 files changed: +591 -128 lines
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
package compactor

import (
	"context"

	"github.com/prometheus/prometheus/storage"
)

type backgrounChunkSeriesSet struct {
	nextSet chan storage.ChunkSeries
	actual  storage.ChunkSeries
	cs      storage.ChunkSeriesSet
}

func (b *backgrounChunkSeriesSet) Next() bool {
	select {
	case s, ok := <-b.nextSet:
		b.actual = s
		return ok
	}
}

func (b *backgrounChunkSeriesSet) At() storage.ChunkSeries {
	return b.actual
}

func (b *backgrounChunkSeriesSet) Err() error {
	return b.cs.Err()
}

func (b *backgrounChunkSeriesSet) Warnings() storage.Warnings {
	return b.cs.Warnings()
}

func (b *backgrounChunkSeriesSet) run(ctx context.Context) {
	for {
		if !b.cs.Next() {
			close(b.nextSet)
			return
		}

		select {
		case b.nextSet <- b.cs.At():
		case <-ctx.Done():
			return
		}
	}
}

func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet {
	r := &backgrounChunkSeriesSet{
		cs:      cs,
		nextSet: make(chan storage.ChunkSeries, 1000),
	}

	go func() {
		r.run(ctx)
	}()

	return r
}
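For context, a minimal consumer sketch (not part of this commit; consumeInBackground and sourceSet are hypothetical): NewBackgroundChunkSeriesSet moves iteration of the wrapped storage.ChunkSeriesSet into a goroutine and buffers up to 1000 series in a channel, so the caller's Next/At loop overlaps with the producer's work and a cancelled context stops the background iteration.

// consumeInBackground is a hypothetical helper showing the intended call pattern.
func consumeInBackground(ctx context.Context, sourceSet storage.ChunkSeriesSet) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stops the background goroutine if we return early

	bg := NewBackgroundChunkSeriesSet(ctx, sourceSet)
	for bg.Next() {
		series := bg.At() // already prefetched by the background goroutine
		_ = series        // ... merge or write its chunks here ...
	}
	return bg.Err() // surfaces the wrapped set's iteration error
}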

pkg/compactor/block_visit_marker_test.go

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ func TestMarkBlockVisitedHeartBeat(t *testing.T) {
 			}
 			time.Sleep(2 * time.Second)
 			for _, meta := range blocks {
-				res, err := ReadBlockVisitMarker(ctx, objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), partitionID, dummyCounter)
+				res, err := ReadBlockVisitMarker(context.Background(), objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), partitionID, dummyCounter)
 				require.NoError(t, err)
 				require.Equal(t, tcase.expectedStatus, res.Status)
 			}

pkg/compactor/compactor.go

Lines changed: 36 additions & 54 deletions
@@ -97,7 +97,8 @@ var (
 			blockVisitMarkerWriteFailed,
 			partitionedGroupInfoReadFailed,
 			partitionedGroupInfoWriteFailed,
-			noCompactionMarkFilter.NoCompactMarkedBlocks)
+			noCompactionMarkFilter.NoCompactMarkedBlocks,
+		)
 	}
 
 	DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -126,12 +127,12 @@ var (
 		return compactor, plannerFactory, nil
 	}
 
-	DefaultCompactionCompleteCheckerFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger, _ prometheus.Counter, _ prometheus.Counter) compact.ExtraCompactionCompleteChecker {
-		return compact.DefaultCompactionCompleteChecker{}
+	DefaultBlockDeletableCheckerFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger, _ prometheus.Counter, _ prometheus.Counter) compact.BlockDeletableChecker {
+		return compact.DefaultBlockDeletableChecker{}
 	}
 
-	PartitionCompactionCompleteCheckerFactory = func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, blockVisitMarkerReadFailed prometheus.Counter, partitionedGroupInfoWriteFailed prometheus.Counter) compact.ExtraCompactionCompleteChecker {
-		return NewPartitionCompactionCompleteChecker(ctx, bkt, logger, blockVisitMarkerReadFailed, partitionedGroupInfoWriteFailed)
+	PartitionCompactionBlockDeletableCheckerFactory = func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, blockVisitMarkerReadFailed prometheus.Counter, partitionedGroupInfoWriteFailed prometheus.Counter) compact.BlockDeletableChecker {
		return NewPartitionCompactionBlockDeletableChecker(ctx, bkt, logger, blockVisitMarkerReadFailed, partitionedGroupInfoWriteFailed)
 	}
 )
 
@@ -176,13 +177,13 @@ type PlannerFactory func(
 	blockVisitMarkerWriteFailed prometheus.Counter,
 ) compact.Planner
 
-type CompactionCompleteCheckerFactory func(
+type BlockDeletableCheckerFactory func(
 	ctx context.Context,
 	bkt objstore.InstrumentedBucket,
 	logger log.Logger,
 	blockVisitMarkerReadFailed prometheus.Counter,
 	partitionedGroupInfoReadFailed prometheus.Counter,
-) compact.ExtraCompactionCompleteChecker
+) compact.BlockDeletableChecker
 
 // Limits defines limits used by the Compactor.
 type Limits interface {
@@ -334,7 +335,7 @@ type Compactor struct {
 
 	blocksPlannerFactory PlannerFactory
 
-	compactionCompleteCheckerFactory CompactionCompleteCheckerFactory
+	blockDeletableCheckerFactory BlockDeletableCheckerFactory
 
 	// Client used to run operations on the bucket storing blocks.
 	bucketClient objstore.Bucket
@@ -347,7 +348,6 @@ type Compactor struct {
 
 	// Metrics.
 	compactionRunsStarted     prometheus.Counter
-	compactionRunsInterrupted prometheus.Counter
 	compactionRunsCompleted   prometheus.Counter
 	compactionRunsFailed      prometheus.Counter
 	compactionRunsLastSuccess prometheus.Gauge
@@ -393,14 +393,14 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
 		}
 	}
 
-	var compactionCompleteCheckerFactory CompactionCompleteCheckerFactory
+	var blockDeletableCheckerFactory BlockDeletableCheckerFactory
 	if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
-		compactionCompleteCheckerFactory = PartitionCompactionCompleteCheckerFactory
+		blockDeletableCheckerFactory = PartitionCompactionBlockDeletableCheckerFactory
 	} else {
-		compactionCompleteCheckerFactory = DefaultCompactionCompleteCheckerFactory
+		blockDeletableCheckerFactory = DefaultBlockDeletableCheckerFactory
 	}
 
-	cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, compactionCompleteCheckerFactory, limits)
+	cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, blockDeletableCheckerFactory, limits)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
 	}
@@ -417,7 +417,7 @@ func newCompactor(
 	bucketClientFactory func(ctx context.Context) (objstore.Bucket, error),
 	blocksGrouperFactory BlocksGrouperFactory,
 	blocksCompactorFactory BlocksCompactorFactory,
-	compactionCompleteCheckerFactory CompactionCompleteCheckerFactory,
+	blockDeletableCheckerFactory BlockDeletableCheckerFactory,
 	limits Limits,
 ) (*Compactor, error) {
 	var remainingPlannedCompactions prometheus.Gauge
@@ -428,27 +428,23 @@ func newCompactor(
 		})
 	}
 	c := &Compactor{
-		compactorCfg:                     compactorCfg,
-		storageCfg:                       storageCfg,
-		cfgProvider:                      cfgProvider,
-		parentLogger:                     logger,
-		logger:                           log.With(logger, "component", "compactor"),
-		registerer:                       registerer,
-		syncerMetrics:                    newSyncerMetrics(registerer),
-		bucketClientFactory:              bucketClientFactory,
-		blocksGrouperFactory:             blocksGrouperFactory,
-		blocksCompactorFactory:           blocksCompactorFactory,
-		compactionCompleteCheckerFactory: compactionCompleteCheckerFactory,
-		allowedTenants:                   util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
+		compactorCfg:                 compactorCfg,
+		storageCfg:                   storageCfg,
+		cfgProvider:                  cfgProvider,
+		parentLogger:                 logger,
+		logger:                       log.With(logger, "component", "compactor"),
+		registerer:                   registerer,
+		syncerMetrics:                newSyncerMetrics(registerer),
+		bucketClientFactory:          bucketClientFactory,
+		blocksGrouperFactory:         blocksGrouperFactory,
+		blocksCompactorFactory:       blocksCompactorFactory,
+		blockDeletableCheckerFactory: blockDeletableCheckerFactory,
+		allowedTenants:               util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
 
 		compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_compactor_runs_started_total",
 			Help: "Total number of compaction runs started.",
 		}),
-		compactionRunsInterrupted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
-			Name: "cortex_compactor_runs_interrupted_total",
-			Help: "Total number of compaction runs interrupted.",
-		}),
 		compactionRunsCompleted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_compactor_runs_completed_total",
 			Help: "Total number of compaction runs successfully completed.",
@@ -656,23 +652,16 @@ func (c *Compactor) running(ctx context.Context) error {
 }
 
 func (c *Compactor) compactUsers(ctx context.Context) {
-	failed := false
-	interrupted := false
+	succeeded := false
+	compactionErrorCount := 0
 
 	c.compactionRunsStarted.Inc()
 
 	defer func() {
-		// interruptions and successful runs are considered
-		// mutually exclusive but we consider a run failed if any
-		// tenant runs failed even if later runs are interrupted
-		if !interrupted && !failed {
+		if succeeded && compactionErrorCount == 0 {
 			c.compactionRunsCompleted.Inc()
 			c.compactionRunsLastSuccess.SetToCurrentTime()
-		}
-		if interrupted {
-			c.compactionRunsInterrupted.Inc()
-		}
-		if failed {
+		} else {
 			c.compactionRunsFailed.Inc()
 		}
 
@@ -686,7 +675,6 @@ func (c *Compactor) compactUsers(ctx context.Context) {
 	level.Info(c.logger).Log("msg", "discovering users from bucket")
 	users, err := c.discoverUsersWithRetries(ctx)
 	if err != nil {
-		failed = true
 		level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
 		return
 	}
@@ -706,8 +694,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
 	for _, userID := range users {
 		// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
 		if ctx.Err() != nil {
-			interrupted = true
-			level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "user", userID)
+			level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", err)
 			return
 		}
 
@@ -737,15 +724,8 @@ func (c *Compactor) compactUsers(ctx context.Context) {
 		level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID)
 
 		if err = c.compactUserWithRetries(ctx, userID); err != nil {
-			// TODO: patch thanos error types to support errors.Is(err, context.Canceled) here
-			if ctx.Err() != nil && ctx.Err() == context.Canceled {
-				interrupted = true
-				level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "user", userID)
-				return
-			}
-
 			c.compactionRunFailedTenants.Inc()
-			failed = true
+			compactionErrorCount++
 			level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err)
 			continue
 		}
@@ -780,6 +760,8 @@ func (c *Compactor) compactUsers(ctx context.Context) {
 			}
 		}
 	}
+
+	succeeded = true
 }
 
 func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) error {
@@ -868,8 +850,8 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 		c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.partitionedGroupInfoReadFailed, c.partitionedGroupInfoWriteFailed, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter),
 		c.blocksPlannerFactory(ctx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed),
 		c.blocksCompactor,
-		c.compactionCompleteCheckerFactory(ctx, bucket, ulogger, c.blockVisitMarkerReadFailed, c.partitionedGroupInfoReadFailed),
-		compact.NoopCompactionLifecycleCallback{},
+		c.blockDeletableCheckerFactory(ctx, bucket, ulogger, c.blockVisitMarkerReadFailed, c.partitionedGroupInfoReadFailed),
+		ShardedCompactionLifecycleCallback{},
 		path.Join(c.compactorCfg.DataDir, "compact"),
 		bucket,
 		c.compactorCfg.CompactionConcurrency,
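For orientation on the rename above: the factories now return Thanos' compact.BlockDeletableChecker rather than the previous ExtraCompactionCompleteChecker. Based on the methods exercised in this commit, the upstream interface is presumably shaped roughly as below; treat this as an inferred sketch and check github.com/thanos-io/thanos/pkg/compact for the authoritative definition.

// Inferred shape of the Thanos interface consumed above (assumption, not a copy
// of the upstream source, declared inside the compact package): a checker decides,
// per source block of a compaction group, whether that block may be deleted once
// the compaction output has been uploaded.
type BlockDeletableChecker interface {
	CanDelete(group *Group, blockID ulid.ULID) bool
}

// The default checker wired in by DefaultBlockDeletableCheckerFactory would simply
// always allow deletion; the shuffle-sharding path swaps in the partition-aware
// checker from partition_compaction_complete_checker.go below.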

pkg/compactor/compactor_test.go

Lines changed: 6 additions & 6 deletions
@@ -1614,14 +1614,14 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li
 		}
 	}
 
-	var compactionCompleteCheckerFactory CompactionCompleteCheckerFactory
+	var blockDeletableCheckerFactory BlockDeletableCheckerFactory
 	if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
-		compactionCompleteCheckerFactory = PartitionCompactionCompleteCheckerFactory
+		blockDeletableCheckerFactory = PartitionCompactionBlockDeletableCheckerFactory
 	} else {
-		compactionCompleteCheckerFactory = DefaultCompactionCompleteCheckerFactory
+		blockDeletableCheckerFactory = DefaultBlockDeletableCheckerFactory
 	}
 
-	c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, compactionCompleteCheckerFactory, overrides)
+	c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, blockDeletableCheckerFactory, overrides)
 	require.NoError(t, err)
 
 	return c, tsdbCompactor, tsdbPlanner, logs, registry
@@ -1646,8 +1646,8 @@ func (m *tsdbCompactorMock) Compact(dest string, dirs []string, open []*tsdb.Blo
 	return args.Get(0).(ulid.ULID), args.Error(1)
 }
 
-func (m *tsdbCompactorMock) CompactWithAdditionalPostings(dest string, dirs []string, open []*tsdb.Block, additionalPostingsProvider tsdb.AdditionalPostingsFunc) (ulid.ULID, error) {
-	args := m.Called(dest, dirs, open, additionalPostingsProvider)
+func (m *tsdbCompactorMock) CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) (ulid.ULID, error) {
+	args := m.Called(dest, dirs, open, blockPopulator)
 	return args.Get(0).(ulid.ULID), args.Error(1)
 }
 
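A note on the mock change: tests that previously primed CompactWithAdditionalPostings would now register an expectation for CompactWithBlockPopulator instead. A minimal sketch of that wiring, assuming the testify-style mock setup already used in this test file:

// Hypothetical test wiring (not shown in this diff): the compactor mock is primed
// so any BlockPopulator argument is accepted and a zero ULID is returned.
tsdbCompactor.On("CompactWithBlockPopulator", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
	Return(ulid.ULID{}, nil)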

pkg/compactor/partition_compaction_complete_checker.go

Lines changed: 11 additions & 8 deletions
@@ -11,22 +11,22 @@ import (
 	"github.com/thanos-io/thanos/pkg/compact"
 )
 
-type PartitionCompactionCompleteChecker struct {
+type PartitionCompactionBlockDeletableChecker struct {
 	ctx                            context.Context
 	bkt                            objstore.InstrumentedBucket
 	logger                         log.Logger
 	blockVisitMarkerReadFailed     prometheus.Counter
 	partitionedGroupInfoReadFailed prometheus.Counter
 }
 
-func NewPartitionCompactionCompleteChecker(
+func NewPartitionCompactionBlockDeletableChecker(
 	ctx context.Context,
 	bkt objstore.InstrumentedBucket,
 	logger log.Logger,
 	blockVisitMarkerReadFailed prometheus.Counter,
 	partitionedGroupInfoReadFailed prometheus.Counter,
-) *PartitionCompactionCompleteChecker {
-	return &PartitionCompactionCompleteChecker{
+) *PartitionCompactionBlockDeletableChecker {
+	return &PartitionCompactionBlockDeletableChecker{
 		ctx:                            ctx,
 		bkt:                            bkt,
 		logger:                         logger,
@@ -35,9 +35,12 @@ func NewPartitionCompactionCompleteChecker(
 	}
 }
 
-func (p *PartitionCompactionCompleteChecker) IsComplete(group *compact.Group, blockID ulid.ULID) bool {
-	partitionedGroupID := group.PartitionedGroupID()
-	currentPartitionID := group.PartitionID()
+func (p *PartitionCompactionBlockDeletableChecker) CanDelete(group *compact.Group, blockID ulid.ULID) bool {
+	if group.PartitionedInfo() == nil {
+		return true
+	}
+	partitionedGroupID := group.PartitionedInfo().PartitionedGroupID
+	currentPartitionID := group.PartitionedInfo().PartitionID
 	partitionedGroupInfo, err := ReadPartitionedGroupInfo(p.ctx, p.bkt, p.logger, partitionedGroupID, p.partitionedGroupInfoReadFailed)
 	if err != nil {
 		level.Warn(p.logger).Log("msg", "unable to read partitioned group info", "partitioned_group_id", partitionedGroupID, "block_id", blockID, "err", err)
@@ -46,7 +49,7 @@ func (p *PartitionCompactionCompleteChecker) IsComplete(group *compact.Group, bl
 	return p.IsPartitionedBlockComplete(partitionedGroupInfo, currentPartitionID, blockID)
 }
 
-func (p *PartitionCompactionCompleteChecker) IsPartitionedBlockComplete(partitionedGroupInfo *PartitionedGroupInfo, currentPartitionID int, blockID ulid.ULID) bool {
+func (p *PartitionCompactionBlockDeletableChecker) IsPartitionedBlockComplete(partitionedGroupInfo *PartitionedGroupInfo, currentPartitionID int, blockID ulid.ULID) bool {
 	partitionedGroupID := partitionedGroupInfo.PartitionedGroupID
 	for _, partitionID := range partitionedGroupInfo.getPartitionIDsByBlock(blockID) {
 		// Skip current partition ID since current one is completed
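To make the renamed contract concrete, a caller-side sketch (assumed from the factory wiring in compactor.go above, not code in this commit): before a compacted source block is deleted, the partition-aware checker is consulted so a block shared by several partitions is only removed once every partition that references it has completed.

// Hypothetical garbage-collection step; group is a *compact.Group and
// sourceBlockIDs is an assumed []ulid.ULID of the group's source blocks.
checker := NewPartitionCompactionBlockDeletableChecker(ctx, bkt, logger, visitMarkerReadFailed, groupInfoReadFailed)
for _, blockID := range sourceBlockIDs {
	if checker.CanDelete(group, blockID) {
		// Safe to mark blockID for deletion: every partition containing it is done.
	}
}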

0 commit comments
