diff --git a/pkg/compactor/block_visit_marker.go b/pkg/compactor/block_visit_marker.go index 3337f20b6e4..0e4ec811726 100644 --- a/pkg/compactor/block_visit_marker.go +++ b/pkg/compactor/block_visit_marker.go @@ -38,9 +38,9 @@ func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Dur return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout)) && b.CompactorID == compactorID } -func ReadBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) { +func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) { visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile) - visitMarkerFileReader, err := bkt.Get(ctx, visitMarkerFile) + visitMarkerFileReader, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile) if err != nil { if bkt.IsObjNotFoundErr(err) { return nil, errors.Wrapf(ErrorBlockVisitMarkerNotFound, "block visit marker file: %s", visitMarkerFile) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 78e1f40ea2d..c36ee8224d7 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -52,7 +52,7 @@ var ( errInvalidShardingStrategy = errors.New("invalid sharding strategy") errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -67,7 +67,7 @@ var ( cfg.BlocksFetchConcurrency) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { return NewShuffleShardingGrouper( ctx, logger, @@ -100,7 +100,7 @@ var ( return nil, nil, err } - plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner { + plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner { return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter) } @@ -113,7 +113,7 @@ var ( return nil, nil, err } - plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner { + plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner { return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) } @@ -125,7 +125,7 @@ var ( type BlocksGrouperFactory func( ctx context.Context, cfg Config, - bkt objstore.Bucket, + bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, @@ -150,7 +150,7 @@ type BlocksCompactorFactory func( type PlannerFactory func( ctx context.Context, - bkt objstore.Bucket, + bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, @@ -734,10 +734,6 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e func (c *Compactor) compactUser(ctx context.Context, userID string) error { bucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider) - if ib, ok := bucket.WithExpectedErrs(bucket.IsObjNotFoundErr).(objstore.InstrumentedBucket); ok { - bucket = ib - } - reg := prometheus.NewRegistry() defer c.syncerMetrics.gatherThanosSyncerMetrics(reg) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 7ffc1560340..f5dbb6efc69 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1538,7 +1538,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { return tsdbCompactor, - func(ctx context.Context, bkt objstore.Bucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner { + func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner { tsdbPlanner.noCompactMarkFilters = append(tsdbPlanner.noCompactMarkFilters, noCompactMarkFilter) return tsdbPlanner }, diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 27d32be50f2..1be80caf807 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -25,7 +25,7 @@ import ( type ShuffleShardingGrouper struct { ctx context.Context logger log.Logger - bkt objstore.Bucket + bkt objstore.InstrumentedBucket acceptMalformedIndex bool enableVerticalCompaction bool reg prometheus.Registerer @@ -58,7 +58,7 @@ type ShuffleShardingGrouper struct { func NewShuffleShardingGrouper( ctx context.Context, logger log.Logger, - bkt objstore.Bucket, + bkt objstore.InstrumentedBucket, acceptMalformedIndex bool, enableVerticalCompaction bool, reg prometheus.Registerer, diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index d618fbd9559..b44738a0210 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/thanos-io/objstore" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" @@ -368,7 +369,7 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { g := NewShuffleShardingGrouper( ctx, nil, - bkt, + objstore.WithNoopInstr(bkt), false, // Do not accept malformed indexes true, // Enable vertical compaction registerer, diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go index d878f625d22..9cf35ba5a04 100644 --- a/pkg/compactor/shuffle_sharding_planner.go +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -14,7 +14,7 @@ import ( type ShuffleShardingPlanner struct { ctx context.Context - bkt objstore.Bucket + bkt objstore.InstrumentedBucket logger log.Logger ranges []int64 noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark @@ -27,7 +27,7 @@ type ShuffleShardingPlanner struct { func NewShuffleShardingPlanner( ctx context.Context, - bkt objstore.Bucket, + bkt objstore.InstrumentedBucket, logger log.Logger, ranges []int64, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, diff --git a/pkg/compactor/shuffle_sharding_planner_test.go b/pkg/compactor/shuffle_sharding_planner_test.go index b4d0a3f69e7..26b84f4238d 100644 --- a/pkg/compactor/shuffle_sharding_planner_test.go +++ b/pkg/compactor/shuffle_sharding_planner_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -369,7 +370,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { logger := log.NewLogfmtLogger(logs) p := NewShuffleShardingPlanner( context.Background(), - bkt, + objstore.WithNoopInstr(bkt), logger, testData.ranges, func() map[ulid.ULID]*metadata.NoCompactMark {