Skip to content

Commit bab9b8c

Browse files
committed
Add planner filter
1 parent a8604cd commit bab9b8c

File tree

5 files changed

+718
-125
lines changed

5 files changed

+718
-125
lines changed

pkg/compactor/compactor.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ var (
6262
return nil, nil, err
6363
}
6464

65-
planner, _ := NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), "default")
65+
planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds())
6666
return compactor, planner, nil
6767
}
6868
)
@@ -120,6 +120,9 @@ type Config struct {
120120
// Allow downstream projects to customise the blocks compactor.
121121
BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"`
122122
BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`
123+
124+
// Flag to enable planner filter
125+
PlannerFilterEnabled bool `yaml:"planner_filter_enabled"`
123126
}
124127

125128
// RegisterFlags registers the Compactor flags.
@@ -146,6 +149,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
146149
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
147150
f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.")
148151
f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", true, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.")
152+
f.BoolVar(&cfg.PlannerFilterEnabled, "compactor.planner-filter-enabled", false, "Filter and plan blocks within PlannerFilter instead of through Thanos planner and grouper.")
149153

150154
f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
151155
f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
@@ -606,21 +610,51 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
606610
time.Duration(c.compactorCfg.DeletionDelay.Seconds()/2)*time.Second,
607611
c.compactorCfg.MetaSyncConcurrency)
608612

613+
fetcherFilters := []block.MetadataFilter{
614+
// Remove the ingester ID because we don't shard blocks anymore, while still
615+
// honoring the shard ID if sharding was done in the past.
616+
NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
617+
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
618+
ignoreDeletionMarkFilter,
619+
deduplicateBlocksFilter,
620+
}
621+
622+
// If config is set to use planner filter then generate plans and append it to the fetcherFilters
623+
if c.compactorCfg.PlannerFilterEnabled {
624+
level.Info(c.logger).Log("msg", "Compactor using planner filter")
625+
626+
// Create a new planner filter
627+
f, err := NewPlannerFilter(
628+
ctx,
629+
userID,
630+
ulogger,
631+
bucket,
632+
fetcherFilters,
633+
c.compactorCfg,
634+
c.metaSyncDirForUser(userID),
635+
)
636+
if err != nil {
637+
return err
638+
}
639+
640+
// Generate all parallelizable plans
641+
err = f.fetchBlocksAndGeneratePlans(ctx)
642+
if err != nil {
643+
return err
644+
}
645+
646+
// Add the planner filter to the fetcher's filters
647+
fetcherFilters = append(fetcherFilters, f)
648+
}
649+
609650
fetcher, err := block.NewMetaFetcher(
610651
ulogger,
611652
c.compactorCfg.MetaSyncConcurrency,
612653
bucket,
613654
c.metaSyncDirForUser(userID),
614655
reg,
615656
// List of filters to apply (order matters).
616-
[]block.MetadataFilter{
617-
// Remove the ingester ID because we don't shard blocks anymore, while still
618-
// honoring the shard ID if sharding was done in the past.
619-
NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
620-
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
621-
ignoreDeletionMarkFilter,
622-
deduplicateBlocksFilter,
623-
},
657+
fetcherFilters,
624658
nil,
625659
)
626660
if err != nil {

pkg/compactor/planner.go

Lines changed: 0 additions & 58 deletions
This file was deleted.

0 commit comments

Comments
 (0)