
Commit 58d378b

Introduced time-based concurrent compaction
Signed-off-by: Marco Pracucci <[email protected]>
1 parent b40c117 commit 58d378b

File tree

11 files changed, +1547 -318 lines changed


pkg/compactor/blocks_sharding_filter.go

Lines changed: 0 additions & 66 deletions
This file was deleted.

pkg/compactor/blocks_sharding_filter_test.go

Lines changed: 0 additions & 152 deletions
This file was deleted.

pkg/compactor/compactor.go

Lines changed: 32 additions & 25 deletions
@@ -15,6 +15,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/tsdb"
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/compact"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
@@ -28,20 +29,19 @@ import (
 
 // Config holds the Compactor config.
 type Config struct {
-	BlockRanges          cortex_tsdb.DurationList `yaml:"block_ranges"`
-	BlockSyncConcurrency int                      `yaml:"block_sync_concurrency"`
-	MetaSyncConcurrency  int                      `yaml:"meta_sync_concurrency"`
-	ConsistencyDelay     time.Duration            `yaml:"consistency_delay"`
-	DataDir              string                   `yaml:"data_dir"`
-	CompactionInterval   time.Duration            `yaml:"compaction_interval"`
-	CompactionRetries    int                      `yaml:"compaction_retries"`
-	DeletionDelay        time.Duration            `yaml:"deletion_delay"`
+	BlockRanges           cortex_tsdb.DurationList `yaml:"block_ranges"`
+	BlockSyncConcurrency  int                      `yaml:"block_sync_concurrency"`
+	MetaSyncConcurrency   int                      `yaml:"meta_sync_concurrency"`
+	ConsistencyDelay      time.Duration            `yaml:"consistency_delay"`
+	DataDir               string                   `yaml:"data_dir"`
+	CompactionInterval    time.Duration            `yaml:"compaction_interval"`
+	CompactionRetries     int                      `yaml:"compaction_retries"`
+	CompactionConcurrency int                      `yaml:"compaction_concurrency"`
+	DeletionDelay         time.Duration            `yaml:"deletion_delay"`
 
 	// Compactors sharding.
-	ShardingEnabled            bool       `yaml:"sharding_enabled"`
-	ShardingRing               RingConfig `yaml:"sharding_ring"`
-	PerTenantNumShards         uint       `yaml:"per_tenant_num_shards"`
-	PerTenantShardsConcurrency int        `yaml:"per_tenant_shards_concurrency"`
+	ShardingEnabled bool       `yaml:"sharding_enabled"`
+	ShardingRing    RingConfig `yaml:"sharding_ring"`
 
 	// No need to add options to customize the retry backoff,
 	// given the defaults should be fine, but allow to override
@@ -59,15 +59,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.retryMaxBackoff = time.Minute
 
 	f.Var(&cfg.BlockRanges, "compactor.block-ranges", "List of compaction time ranges.")
-	f.DurationVar(&cfg.ConsistencyDelay, "compactor.consistency-delay", 30*time.Minute, fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.PartialUploadThresholdAge))
+	f.DurationVar(&cfg.ConsistencyDelay, "compactor.consistency-delay", 0, fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.PartialUploadThresholdAge))
 	f.IntVar(&cfg.BlockSyncConcurrency, "compactor.block-sync-concurrency", 20, "Number of Go routines to use when syncing block index and chunks files from the long term storage.")
 	f.IntVar(&cfg.MetaSyncConcurrency, "compactor.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
+	f.IntVar(&cfg.CompactionConcurrency, "compactor.compaction-concurrency", 1, "Max number of concurrent compactions running.")
 	f.StringVar(&cfg.DataDir, "compactor.data-dir", "./data", "Data directory in which to cache blocks and process compactions")
 	f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
 	f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
 	f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
-	f.UintVar(&cfg.PerTenantNumShards, "compactor.per-tenant-num-shards", 1, "Number of shards a single tenant blocks should be grouped into (0 or 1 means per-tenant blocks sharding is disabled).")
-	f.IntVar(&cfg.PerTenantShardsConcurrency, "compactor.per-tenant-shards-concurrency", 1, "Number of concurrent shards compacted for a single tenant.")
 	f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
 		"If not 0, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+
 		"If delete-delay is 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures, "+
@@ -82,7 +81,7 @@ type Compactor struct {
 	storageCfg cortex_tsdb.Config
 	logger     log.Logger
 
-	// function that creates bucket client and TSDB compactor using the context.
+	// Function that creates bucket client and TSDB compactor using the context.
 	// Useful for injecting mock objects from tests.
 	createBucketClientAndTsdbCompactor func(ctx context.Context) (objstore.Bucket, tsdb.Compactor, error)
 
@@ -271,10 +270,12 @@ func (c *Compactor) compactUsersWithRetries(ctx context.Context) {
 	c.compactionRunsStarted.Inc()
 
 	for retries.Ongoing() {
-		if success := c.compactUsers(ctx); success {
+		if err := c.compactUsers(ctx); err == nil {
 			c.compactionRunsCompleted.Inc()
 			c.compactionRunsLastSuccess.SetToCurrentTime()
 			return
+		} else if errors.Is(err, context.Canceled) {
+			return
 		}
 
 		retries.Wait()
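
With compactUsers returning an error instead of a bool, the retry loop above can tell a canceled context (compactor shutting down) apart from a genuine failure and stop retrying immediately. A minimal, self-contained sketch of that pattern follows; the function names, backoff, and log messages are illustrative, not the actual Cortex ones.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithRetries retries compact until it succeeds, the retry budget is
// exhausted, or the context is canceled (in which case it gives up at once).
func runWithRetries(ctx context.Context, attempts int, compact func(context.Context) error) {
	for i := 0; i < attempts; i++ {
		err := compact(ctx)
		if err == nil {
			fmt.Println("compaction run completed")
			return
		}
		if errors.Is(err, context.Canceled) {
			// Shutdown was requested: stop without burning the remaining retries.
			return
		}
		fmt.Println("compaction run failed, will retry:", err)
		time.Sleep(time.Second) // stand-in for the configurable backoff
	}
	fmt.Println("compaction run failed after all retries")
}

func main() {
	runWithRetries(context.Background(), 3, func(context.Context) error { return nil })
}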
@@ -283,20 +284,22 @@ func (c *Compactor) compactUsersWithRetries(ctx context.Context) {
 	c.compactionRunsFailed.Inc()
 }
 
-func (c *Compactor) compactUsers(ctx context.Context) bool {
+func (c *Compactor) compactUsers(ctx context.Context) error {
 	level.Info(c.logger).Log("msg", "discovering users from bucket")
 	users, err := c.discoverUsers(ctx)
 	if err != nil {
 		level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
-		return false
+		return errors.Wrap(err, "failed to discover users from bucket")
 	}
 	level.Info(c.logger).Log("msg", "discovered users from bucket", "users", len(users))
 
+	errs := tsdb_errors.MultiError{}
+
 	for _, userID := range users {
 		// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
 		if ctx.Err() != nil {
 			level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", err)
-			return false
+			return ctx.Err()
 		}
 
 		// If sharding is enabled, ensure the user ID belongs to our shard.
@@ -314,13 +317,14 @@ func (c *Compactor) compactUsers(ctx context.Context) bool {
 
 		if err = c.compactUser(ctx, userID); err != nil {
 			level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err)
+			errs.Add(errors.Wrapf(err, "failed to compact user blocks (user: %s)", userID))
 			continue
 		}
 
 		level.Info(c.logger).Log("msg", "successfully compacted user blocks", "user", userID)
 	}
 
-	return true
+	return errs.Err()
 }
 
 func (c *Compactor) compactUser(ctx context.Context, userID string) error {
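
The two hunks above switch compactUsers to an accumulate-and-continue strategy: a failure on one tenant is recorded and the loop moves on, and the combined error is returned at the end (nil when every tenant succeeded). The sketch below mirrors that flow with a small local multi-error type standing in for prometheus' tsdb_errors.MultiError; all names and the error format are illustrative.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// multiErr is a stand-in for tsdb_errors.MultiError: collect errors, report nil if empty.
type multiErr []error

func (m *multiErr) Add(err error) {
	if err != nil {
		*m = append(*m, err)
	}
}

func (m multiErr) Err() error {
	if len(m) == 0 {
		return nil
	}
	msgs := make([]string, 0, len(m))
	for _, err := range m {
		msgs = append(msgs, err.Error())
	}
	return errors.New(strings.Join(msgs, "; "))
}

// compactUsers compacts every tenant, collecting per-tenant failures instead of
// aborting the whole run on the first one.
func compactUsers(users []string, compactUser func(string) error) error {
	errs := multiErr{}
	for _, userID := range users {
		if err := compactUser(userID); err != nil {
			errs.Add(fmt.Errorf("failed to compact user blocks (user: %s): %w", userID, err))
			continue
		}
	}
	return errs.Err()
}

func main() {
	err := compactUsers([]string{"user-1", "user-2"}, func(u string) error {
		if u == "user-2" {
			return errors.New("boom")
		}
		return nil
	})
	fmt.Println(err)
}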
@@ -348,9 +352,11 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 		// the directory used by the Thanos Syncer, whatever is the user ID.
 		path.Join(c.compactorCfg.DataDir, "meta-"+userID),
 		reg,
+		// List of filters to apply (order matters).
 		[]block.MetadataFilter{
-			// List of filters to apply (order matters).
-			NewBlocksShardingFilter(uint32(c.compactorCfg.PerTenantNumShards)),
+			// Remove the ingester ID because we don't shard blocks anymore, while still
+			// honoring the shard ID if sharding was already done.
+			NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}),
 			block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
 			ignoreDeletionMarkFilter,
 			deduplicateBlocksFilter,
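
NewLabelRemoverFilter replaces the deleted per-tenant blocks sharding filter: rather than assigning blocks to shards, it strips the ingester ID external label so blocks uploaded by different ingesters can be grouped and compacted together. The following is a simplified sketch of that idea working on plain label maps, not the real Thanos metadata filter interface; the label names and values are made up for illustration.

package main

import "fmt"

// removeExternalLabels deletes the given label names from each block's external
// labels, mimicking what a label-remover metadata filter does before grouping.
func removeExternalLabels(blocks []map[string]string, names ...string) {
	for _, lbls := range blocks {
		for _, name := range names {
			delete(lbls, name)
		}
	}
}

func main() {
	blocks := []map[string]string{
		{"ingester_id": "ingester-0", "tenant": "user-1"},
		{"ingester_id": "ingester-1", "tenant": "user-1"},
	}
	// Once the ingester ID is dropped the two blocks carry identical external
	// labels, so a grouper will place them in the same compaction group.
	removeExternalLabels(blocks, "ingester_id")
	fmt.Println(blocks)
}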
@@ -380,10 +386,11 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 	compactor, err := compact.NewBucketCompactor(
 		ulogger,
 		syncer,
-		c.tsdbCompactor,
+		NewTimeShardingGrouper(bucket, c.compactorCfg.BlockRanges.ToMilliseconds(), ulogger),
+		NewTimeShardingCompactor(c.tsdbCompactor, c.compactorCfg.BlockRanges.ToMilliseconds()),
 		path.Join(c.compactorCfg.DataDir, "compact"),
 		bucket,
-		c.compactorCfg.PerTenantShardsConcurrency,
+		c.compactorCfg.CompactionConcurrency,
 	)
 	if err != nil {
 		return errors.Wrap(err, "failed to create bucket compactor")
