4 changes: 2 additions & 2 deletions pkg/compactor/block_visit_marker.go
@@ -38,9 +38,9 @@ func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Dur
 	return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout)) && b.CompactorID == compactorID
 }
 
-func ReadBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
+func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
 	visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile)
-	visitMarkerFileReader, err := bkt.Get(ctx, visitMarkerFile)
+	visitMarkerFileReader, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile)
 	if err != nil {
 		if bkt.IsObjNotFoundErr(err) {
 			return nil, errors.Wrapf(ErrorBlockVisitMarkerNotFound, "block visit marker file: %s", visitMarkerFile)
16 changes: 6 additions & 10 deletions pkg/compactor/compactor.go
@@ -52,7 +52,7 @@ var (
 	errInvalidShardingStrategy = errors.New("invalid sharding strategy")
 	errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
 
-	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
+	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
 		return compact.NewDefaultGrouper(
 			logger,
 			bkt,
@@ -67,7 +67,7 @@ var (
 			cfg.BlocksFetchConcurrency)
 	}
 
-	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
+	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
 		return NewShuffleShardingGrouper(
 			ctx,
 			logger,
@@ -100,7 +100,7 @@ var (
 			return nil, nil, err
 		}
 
-		plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
+		plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
 			return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter)
 		}
 
@@ -113,7 +113,7 @@ var (
 			return nil, nil, err
 		}
 
-		plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner {
+		plannerFactory := func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner {
 
 			return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed)
 		}
@@ -125,7 +125,7 @@ var (
 type BlocksGrouperFactory func(
 	ctx context.Context,
 	cfg Config,
-	bkt objstore.Bucket,
+	bkt objstore.InstrumentedBucket,
 	logger log.Logger,
 	reg prometheus.Registerer,
 	blocksMarkedForDeletion prometheus.Counter,
@@ -150,7 +150,7 @@ type BlocksCompactorFactory func(
 
 type PlannerFactory func(
 	ctx context.Context,
-	bkt objstore.Bucket,
+	bkt objstore.InstrumentedBucket,
 	logger log.Logger,
 	cfg Config,
 	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
@@ -734,10 +734,6 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
 func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 	bucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
 
-	if ib, ok := bucket.WithExpectedErrs(bucket.IsObjNotFoundErr).(objstore.InstrumentedBucket); ok {
-		bucket = ib
-	}
-
 	reg := prometheus.NewRegistry()
 	defer c.syncerMetrics.gatherThanosSyncerMetrics(reg)
 
2 changes: 1 addition & 1 deletion pkg/compactor/compactor_test.go
@@ -1538,7 +1538,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li
 
 	blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
 		return tsdbCompactor,
-			func(ctx context.Context, bkt objstore.Bucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
+			func(ctx context.Context, bkt objstore.InstrumentedBucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner {
 				tsdbPlanner.noCompactMarkFilters = append(tsdbPlanner.noCompactMarkFilters, noCompactMarkFilter)
 				return tsdbPlanner
 			},
4 changes: 2 additions & 2 deletions pkg/compactor/shuffle_sharding_grouper.go
@@ -25,7 +25,7 @@ import (
 type ShuffleShardingGrouper struct {
 	ctx context.Context
 	logger log.Logger
-	bkt objstore.Bucket
+	bkt objstore.InstrumentedBucket
 	acceptMalformedIndex bool
 	enableVerticalCompaction bool
 	reg prometheus.Registerer
@@ -58,7 +58,7 @@ type ShuffleShardingGrouper struct {
 func NewShuffleShardingGrouper(
 	ctx context.Context,
 	logger log.Logger,
-	bkt objstore.Bucket,
+	bkt objstore.InstrumentedBucket,
 	acceptMalformedIndex bool,
 	enableVerticalCompaction bool,
 	reg prometheus.Registerer,
3 changes: 2 additions & 1 deletion pkg/compactor/shuffle_sharding_grouper_test.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/thanos-io/objstore"
 
 	"github.com/oklog/ulid"
 	"github.com/prometheus/client_golang/prometheus"
@@ -368,7 +369,7 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
 			g := NewShuffleShardingGrouper(
 				ctx,
 				nil,
-				bkt,
+				objstore.WithNoopInstr(bkt),
 				false, // Do not accept malformed indexes
 				true, // Enable vertical compaction
 				registerer,
4 changes: 2 additions & 2 deletions pkg/compactor/shuffle_sharding_planner.go
@@ -14,7 +14,7 @@ import (
 
 type ShuffleShardingPlanner struct {
 	ctx context.Context
-	bkt objstore.Bucket
+	bkt objstore.InstrumentedBucket
 	logger log.Logger
 	ranges []int64
 	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
@@ -27,7 +27,7 @@ type ShuffleShardingPlanner struct {
 
 func NewShuffleShardingPlanner(
 	ctx context.Context,
-	bkt objstore.Bucket,
+	bkt objstore.InstrumentedBucket,
 	logger log.Logger,
 	ranges []int64,
 	noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
3 changes: 2 additions & 1 deletion pkg/compactor/shuffle_sharding_planner_test.go
@@ -16,6 +16,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 
 	"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -369,7 +370,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) {
 			logger := log.NewLogfmtLogger(logs)
 			p := NewShuffleShardingPlanner(
 				context.Background(),
-				bkt,
+				objstore.WithNoopInstr(bkt),
 				logger,
 				testData.ranges,
 				func() map[ulid.ULID]*metadata.NoCompactMark {