Skip to content

Commit 4ef2edc

Browse files
committed
allow compactor to use bucket index when sync metas
1 parent 3b492ef commit 4ef2edc

File tree

3 files changed

+73
-36
lines changed

3 files changed

+73
-36
lines changed

pkg/compactor/compactor.go

Lines changed: 64 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"github.com/cortexproject/cortex/pkg/storegateway"
78
"hash/fnv"
89
"math/rand"
910
"os"
@@ -209,6 +210,8 @@ type Config struct {
209210
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`
210211

211212
AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
213+
214+
BucketIndexMetadataFetcherEnabled bool `yaml:"bucket_index_metadata_fetcher_enabled"`
212215
}
213216

214217
// RegisterFlags registers the Compactor flags.
@@ -320,6 +323,9 @@ type Compactor struct {
320323
ringSubservices *services.Manager
321324
ringSubservicesWatcher *services.FailureWatcher
322325

326+
//sharding strategy
327+
shardingStrategy storegateway.ShardingStrategy
328+
323329
// Metrics.
324330
compactionRunsStarted prometheus.Counter
325331
compactionRunsInterrupted prometheus.Counter
@@ -474,6 +480,30 @@ func newCompactor(
474480
if len(compactorCfg.DisabledTenants) > 0 {
475481
level.Info(c.logger).Log("msg", "compactor using disabled users", "disabled", strings.Join(compactorCfg.DisabledTenants, ", "))
476482
}
483+
var err error
484+
if c.compactorCfg.ShardingEnabled {
485+
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
486+
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
487+
if err != nil {
488+
return nil, errors.Wrap(err, "unable to initialize compactor ring lifecycler")
489+
}
490+
491+
c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
492+
if err != nil {
493+
return nil, errors.Wrap(err, "unable to initialize compactor ring")
494+
}
495+
// Instance the right strategy.
496+
switch c.compactorCfg.ShardingStrategy {
497+
case util.ShardingStrategyDefault:
498+
c.shardingStrategy = storegateway.NewDefaultShardingStrategy(c.ring, c.ringLifecycler.Addr, logger)
499+
case util.ShardingStrategyShuffle:
500+
c.shardingStrategy = storegateway.NewShuffleShardingStrategy(c.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, c.compactorCfg.ShardingRing.ZoneStableShuffleSharding)
501+
default:
502+
return nil, errInvalidShardingStrategy
503+
}
504+
} else {
505+
c.shardingStrategy = storegateway.NewNoShardingStrategy()
506+
}
477507

478508
c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
479509

@@ -516,17 +546,6 @@ func (c *Compactor) starting(ctx context.Context) error {
516546

517547
// Initialize the compactors ring if sharding is enabled.
518548
if c.compactorCfg.ShardingEnabled {
519-
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
520-
c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, true, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
521-
if err != nil {
522-
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
523-
}
524-
525-
c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
526-
if err != nil {
527-
return errors.Wrap(err, "unable to initialize compactor ring")
528-
}
529-
530549
c.ringSubservices, err = services.NewManager(c.ringLifecycler, c.ring)
531550
if err == nil {
532551
c.ringSubservicesWatcher = services.NewFailureWatcher()
@@ -570,7 +589,6 @@ func (c *Compactor) starting(ctx context.Context) error {
570589
}
571590
}
572591
}
573-
574592
// Ensure an initial cleanup occurred before starting the compactor.
575593
if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
576594
c.ringSubservices.StopAsync()
@@ -789,28 +807,41 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
789807
// Filters out blocks with no compaction maker; blocks can be marked as no compaction for reasons like
790808
// out of order chunks or index file too big.
791809
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency)
792-
793-
fetcher, err := block.NewMetaFetcher(
794-
ulogger,
795-
c.compactorCfg.MetaSyncConcurrency,
796-
bucket,
797-
c.metaSyncDirForUser(userID),
798-
reg,
799-
// List of filters to apply (order matters).
800-
[]block.MetadataFilter{
801-
// Remove the ingester ID because we don't shard blocks anymore, while still
802-
// honoring the shard ID if sharding was done in the past.
803-
NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
804-
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
805-
ignoreDeletionMarkFilter,
806-
deduplicateBlocksFilter,
807-
noCompactMarkerFilter,
808-
},
809-
)
810-
if err != nil {
811-
return err
810+
var fetcher block.MetadataFetcher
811+
var err error
812+
filters := []block.MetadataFilter{
813+
// Remove the ingester ID because we don't shard blocks anymore, while still
814+
// honoring the shard ID if sharding was done in the past.
815+
NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
816+
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
817+
ignoreDeletionMarkFilter,
818+
deduplicateBlocksFilter,
819+
noCompactMarkerFilter,
820+
}
821+
if c.compactorCfg.BucketIndexMetadataFetcherEnabled {
822+
fetcher = storegateway.NewBucketIndexMetadataFetcher(
823+
userID,
824+
bucket,
825+
c.shardingStrategy,
826+
c.limits,
827+
ulogger,
828+
reg,
829+
filters,
830+
)
831+
} else {
832+
fetcher, err = block.NewMetaFetcher(
833+
ulogger,
834+
c.compactorCfg.MetaSyncConcurrency,
835+
bucket,
836+
c.metaSyncDirForUser(userID),
837+
reg,
838+
// List of filters to apply (order matters).
839+
filters,
840+
)
841+
if err != nil {
842+
return err
843+
}
812844
}
813-
814845
syncer, err := compact.NewMetaSyncer(
815846
ulogger,
816847
reg,

pkg/compactor/compactor_ring.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ type RingConfig struct {
3939

4040
WaitActiveInstanceTimeout time.Duration `yaml:"wait_active_instance_timeout"`
4141

42-
ObservePeriod time.Duration `yaml:"-"`
42+
ObservePeriod time.Duration `yaml:"-"`
43+
ZoneStableShuffleSharding bool `yaml:"zone_stable_shuffle_sharding" doc:"hidden"`
4344
}
4445

4546
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -70,6 +71,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
7071

7172
// Timeout durations
7273
f.DurationVar(&cfg.WaitActiveInstanceTimeout, "compactor.ring.wait-active-instance-timeout", 10*time.Minute, "Timeout for waiting on compactor to become ACTIVE in the ring.")
74+
f.BoolVar(&cfg.ZoneStableShuffleSharding, "compactor.ring.zone-stable-shuffle-sharding", false, "If true, use zone stable shuffle sharding algorithm. Otherwise, use the default shuffle sharding algorithm.")
7375
}
7476

7577
// ToLifecyclerConfig returns a LifecyclerConfig based on the compactor

vendor/github.com/thanos-io/thanos/pkg/compact/compact.go

Lines changed: 6 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)