Add planner filter #4318
Changes from all commits

    @@ -120,6 +120,9 @@ type Config struct {
         // Allow downstream projects to customise the blocks compactor.
         BlocksGrouperFactory   BlocksGrouperFactory   `yaml:"-"`
         BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"`
    +
    +    // Flag to enable planner filter
    +    PlannerFilterEnabled bool `yaml:"planner_filter_enabled"`
     }

    @@ -146,6 +149,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
             "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
         f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.")
         f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", true, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.")
    +    f.BoolVar(&cfg.PlannerFilterEnabled, "compactor.planner-filter-enabled", false, "Filter and plan blocks within PlannerFilter instead of through Thanos planner and grouper.")
         f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
         f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
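
The new option defaults to false, so the existing Thanos planner/grouper path is unchanged unless the flag is set explicitly. Below is a minimal stand-alone sketch of that wiring (not Cortex code; the struct here only mirrors the field and flag added above):

```go
package main

import (
	"flag"
	"fmt"
)

// Stand-in for the compactor Config, reduced to the field added by this PR.
type config struct {
	PlannerFilterEnabled bool `yaml:"planner_filter_enabled"`
}

func main() {
	var cfg config

	fs := flag.NewFlagSet("compactor", flag.ExitOnError)
	fs.BoolVar(&cfg.PlannerFilterEnabled, "compactor.planner-filter-enabled", false,
		"Filter and plan blocks within PlannerFilter instead of through Thanos planner and grouper.")

	// Default: disabled, so the current planner/grouper behaviour is kept.
	_ = fs.Parse([]string{})
	fmt.Println(cfg.PlannerFilterEnabled) // false

	// Opting in, e.g. -compactor.planner-filter-enabled=true on the command line.
	_ = fs.Parse([]string{"-compactor.planner-filter-enabled=true"})
	fmt.Println(cfg.PlannerFilterEnabled) // true
}
```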

    @@ -606,21 +610,51 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
             time.Duration(c.compactorCfg.DeletionDelay.Seconds()/2)*time.Second,
             c.compactorCfg.MetaSyncConcurrency)
     
    +    // List of filters to apply (order matters).
    +    fetcherFilters := []block.MetadataFilter{
    +        // Remove the ingester ID because we don't shard blocks anymore, while still
    +        // honoring the shard ID if sharding was done in the past.
    +        NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
    +        block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
    +        ignoreDeletionMarkFilter,
    +        deduplicateBlocksFilter,
    +    }
    +
    +    // If config is set to use planner filter then generate plans and append it to the fetcherFilters
    +    if c.compactorCfg.PlannerFilterEnabled {
    +        level.Info(c.logger).Log("msg", "Compactor using planner filter")
    +
    +        // Create a new planner filter
    +        f, err := NewPlannerFilter(

Contributor: I don't think this is the right way to build it. It's not the responsibility of the metadata fetcher to run the planning and filter out blocks belonging to other shards. It's not how the compactor was designed. You should build this feature working on the compactor grouper and planner.

    +            ctx,
    +            userID,
    +            ulogger,
    +            bucket,
    +            fetcherFilters,
    +            c.compactorCfg,
    +            c.metaSyncDirForUser(userID),
    +        )
    +        if err != nil {
    +            return err
    +        }
    +
    +        // Generate all parallelizable plans
    +        err = f.fetchBlocksAndGeneratePlans(ctx)
    +        if err != nil {
    +            return err
    +        }
    +
    +        // Add the planner filter to the fetcher's filters
    +        fetcherFilters = append(fetcherFilters, f)
    +    }
    +
         fetcher, err := block.NewMetaFetcher(
             ulogger,
             c.compactorCfg.MetaSyncConcurrency,
             bucket,
             c.metaSyncDirForUser(userID),
             reg,
    -        // List of filters to apply (order matters).
    -        []block.MetadataFilter{
    -            // Remove the ingester ID because we don't shard blocks anymore, while still
    -            // honoring the shard ID if sharding was done in the past.
    -            NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
    -            block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
    -            ignoreDeletionMarkFilter,
    -            deduplicateBlocksFilter,
    -        },
    +        fetcherFilters,
             nil,
         )
         if err != nil {
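
The PlannerFilter implementation itself is outside this hunk; the diff only shows that it is built per tenant, pre-computes plans via fetchBlocksAndGeneratePlans, and is then appended to the metadata fetcher's filter chain. A hypothetical sketch of the shape such a filter could take, written against a simplified local interface rather than the exact Thanos block.MetadataFilter signature (which varies between Thanos versions and operates on *metadata.Meta values plus sync gauges):

```go
package compactorsketch

import "context"

// metaFilter is a simplified stand-in for Thanos' block.MetadataFilter.
// Block IDs are plain strings here; the real interface is keyed by ulid.ULID
// and carries full block metadata plus metrics gauges.
type metaFilter interface {
	Filter(ctx context.Context, metas map[string]struct{}) error
}

// plannerFilter is a hypothetical reconstruction of the PR's PlannerFilter:
// it pre-computes which blocks belong to plans owned by this compactor
// instance and later drops every other block during metadata sync.
type plannerFilter struct {
	planned map[string]struct{} // block IDs appearing in at least one owned plan
}

// fetchBlocksAndGeneratePlans would list the tenant's blocks in the bucket and
// group them into parallelizable compaction plans; the body is elided because
// it is not part of this diff.
func (f *plannerFilter) fetchBlocksAndGeneratePlans(ctx context.Context) error {
	// ... populate f.planned from the generated plans ...
	return nil
}

// Filter keeps only the block metas that appear in a generated plan, so the
// downstream compaction stages never see blocks this instance should not touch.
func (f *plannerFilter) Filter(ctx context.Context, metas map[string]struct{}) error {
	for id := range metas {
		if _, ok := f.planned[id]; !ok {
			delete(metas, id)
		}
	}
	return nil
}

var _ metaFilter = (*plannerFilter)(nil)
```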
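
Regarding the reviewer's comment above about the grouper and planner: the Config hunk already exposes BlocksGrouperFactory and BlocksCompactorFactory hooks, so shard-aware behaviour could live in the grouper/planner instead of a fetcher filter. A hypothetical sketch of that direction, with a simplified planner interface standing in for Thanos' compact.Planner:

```go
package compactorsketch

import "context"

// blockMeta and planner are simplified stand-ins for Thanos'
// metadata.Meta type and compact.Planner interface.
type blockMeta struct {
	ID string
}

type planner interface {
	// Plan returns the next group of blocks to compact together.
	Plan(ctx context.Context, metasByMinTime []*blockMeta) ([]*blockMeta, error)
}

// shardAwarePlanner is a hypothetical planner wrapper: it lets the wrapped
// (default) planner do the actual planning and then only returns plans that
// this compactor instance owns, e.g. based on a hash of the plan looked up
// in the sharding ring.
type shardAwarePlanner struct {
	wrapped  planner
	ownsPlan func(plan []*blockMeta) (bool, error)
}

func (p *shardAwarePlanner) Plan(ctx context.Context, metasByMinTime []*blockMeta) ([]*blockMeta, error) {
	plan, err := p.wrapped.Plan(ctx, metasByMinTime)
	if err != nil || len(plan) == 0 {
		return nil, err
	}

	owned, err := p.ownsPlan(plan)
	if err != nil {
		return nil, err
	}
	if !owned {
		// Another compactor instance is responsible for this plan; skip it here.
		return nil, nil
	}
	return plan, nil
}

var _ planner = (*shardAwarePlanner)(nil)
```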

Review comment: I don't think the config option should be that specific (what this CLI flag describes is an internal implementation detail). The whole purpose of the #4272 proposal is to introduce a different sharding strategy for the compactor. To keep it consistent with other Cortex services, the config option could be `compactor.sharding-strategy` with values `default` (the current one) and `shuffle-sharding` (the new one you're working on).
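
A rough sketch of what that suggested option could look like, following the sharding-strategy naming used by other Cortex components (the field, constants, and validation below are illustrative, not code from this PR):

```go
package compactorsketch

import (
	"flag"
	"fmt"
)

// Possible values, mirroring the naming other Cortex components use for
// their sharding-strategy options.
const (
	ShardingStrategyDefault = "default"
	ShardingStrategyShuffle = "shuffle-sharding"
)

// Config sketch: a generic sharding strategy option instead of a flag that
// names an internal implementation detail such as the planner filter.
type Config struct {
	ShardingStrategy string `yaml:"sharding_strategy"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", ShardingStrategyDefault,
		fmt.Sprintf("The sharding strategy to use. Supported values are: %s, %s.",
			ShardingStrategyDefault, ShardingStrategyShuffle))
}

// Validate rejects unknown strategy values at startup.
func (cfg *Config) Validate() error {
	switch cfg.ShardingStrategy {
	case ShardingStrategyDefault, ShardingStrategyShuffle:
		return nil
	default:
		return fmt.Errorf("unsupported sharding strategy: %q", cfg.ShardingStrategy)
	}
}
```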