Skip to content

Commit ac44d3e

Browse files
committed
--signoff
follow up 4870
1 parent 8dbab42 commit ac44d3e

File tree

7 files changed

+17
-19
lines changed

7 files changed

+17
-19
lines changed

pkg/compactor/block_visit_marker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Dur
3838
return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout)) && b.CompactorID == compactorID
3939
}
4040

41-
func ReadBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
41+
func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
4242
visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile)
43-
visitMarkerFileReader, err := bkt.Get(ctx, visitMarkerFile)
43+
visitMarkerFileReader, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile)
4444
if err != nil {
4545
if bkt.IsObjNotFoundErr(err) {
4646
return nil, errors.Wrapf(ErrorBlockVisitMarkerNotFound, "block visit marker file: %s", visitMarkerFile)

pkg/compactor/compactor.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ var (
5252
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
5353
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
5454

55-
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
55+
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
5656
return compact.NewDefaultGrouper(
5757
logger,
5858
bkt,
@@ -67,7 +67,7 @@ var (
6767
cfg.BlocksFetchConcurrency)
6868
}
6969

70-
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
70+
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
7171
return NewShuffleShardingGrouper(
7272
ctx,
7373
logger,
@@ -100,7 +100,7 @@ var (
100100
return nil, nil, err
101101
}
102102

103-
plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
103+
plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
104104
return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
105105
}
106106

@@ -113,7 +113,7 @@ var (
113113
return nil, nil, err
114114
}
115115

116-
plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner {
116+
plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner {
117117

118118
return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
119119
}
@@ -125,7 +125,7 @@ var (
125125
type BlocksGrouperFactory func(
126126
ctx context.Context,
127127
cfg Config,
128-
bkt objstore.Bucket,
128+
bkt objstore.InstrumentedBucket,
129129
logger log.Logger,
130130
reg prometheus.Registerer,
131131
blocksMarkedForDeletion prometheus.Counter,
@@ -150,7 +150,7 @@ type BlocksCompactorFactory func(
150150

151151
type PlannerFactory func(
152152
ctx context.Context,
153-
bkt objstore.Bucket,
153+
bkt objstore.InstrumentedBucket,
154154
logger log.Logger,
155155
cfg Config,
156156
noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
@@ -734,10 +734,6 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
734734
func (c *Compactor) compactUser(ctx context.Context, userID string) error {
735735
bucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
736736

737-
if ib, ok := bucket.WithExpectedErrs(bucket.IsObjNotFoundErr).(objstore.InstrumentedBucket); ok {
738-
bucket = ib
739-
}
740-
741737
reg := prometheus.NewRegistry()
742738
defer c.syncerMetrics.gatherThanosSyncerMetrics(reg)
743739

pkg/compactor/compactor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1538,7 +1538,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li
15381538

15391539
blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
15401540
return tsdbCompactor,
1541-
func(ctx context.Context, bkt objstore.Bucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
1541+
func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
15421542
tsdbPlanner.noCompactMarkFilters = append(tsdbPlanner.noCompactMarkFilters, noCompactMarkFilter)
15431543
return tsdbPlanner
15441544
},

pkg/compactor/shuffle_sharding_grouper.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
type ShuffleShardingGrouper struct {
2626
ctx context.Context
2727
logger log.Logger
28-
bkt objstore.Bucket
28+
bkt objstore.InstrumentedBucket
2929
acceptMalformedIndex bool
3030
enableVerticalCompaction bool
3131
reg prometheus.Registerer
@@ -58,7 +58,7 @@ type ShuffleShardingGrouper struct {
5858
func NewShuffleShardingGrouper(
5959
ctx context.Context,
6060
logger log.Logger,
61-
bkt objstore.Bucket,
61+
bkt objstore.InstrumentedBucket,
6262
acceptMalformedIndex bool,
6363
enableVerticalCompaction bool,
6464
reg prometheus.Registerer,

pkg/compactor/shuffle_sharding_grouper_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/prometheus/client_golang/prometheus/testutil"
12+
"github.com/thanos-io/objstore"
1213

1314
"github.com/oklog/ulid"
1415
"github.com/prometheus/client_golang/prometheus"
@@ -368,7 +369,7 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
368369
g := NewShuffleShardingGrouper(
369370
ctx,
370371
nil,
371-
bkt,
372+
objstore.WithNoopInstr(bkt),
372373
false, // Do not accept malformed indexes
373374
true, // Enable vertical compaction
374375
registerer,

pkg/compactor/shuffle_sharding_planner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
type ShuffleShardingPlanner struct {
1616
ctx context.Context
17-
bkt objstore.Bucket
17+
bkt objstore.InstrumentedBucket
1818
logger log.Logger
1919
ranges []int64
2020
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
@@ -27,7 +27,7 @@ type ShuffleShardingPlanner struct {
2727

2828
func NewShuffleShardingPlanner(
2929
ctx context.Context,
30-
bkt objstore.Bucket,
30+
bkt objstore.InstrumentedBucket,
3131
logger log.Logger,
3232
ranges []int64,
3333
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,

pkg/compactor/shuffle_sharding_planner_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/stretchr/testify/assert"
1717
"github.com/stretchr/testify/mock"
1818
"github.com/stretchr/testify/require"
19+
"github.com/thanos-io/objstore"
1920
"github.com/thanos-io/thanos/pkg/block/metadata"
2021

2122
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -369,7 +370,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) {
369370
logger := log.NewLogfmtLogger(logs)
370371
p := NewShuffleShardingPlanner(
371372
context.Background(),
372-
bkt,
373+
objstore.WithNoopInstr(bkt),
373374
logger,
374375
testData.ranges,
375376
func() map[ulid.ULID]*metadata.NoCompactMark {

0 commit comments

Comments
 (0)