
Commit e3dc7b0

feat(parquetconverter): add support for additional sort columns during Parquet file generation (#7003)
* feat(parquetconverter): add support for additional sort columns during Parquet file generation
* docs: update CHANGELOG.md to include support for sort columns during Parquet file generation
* fix: Remove duplicate flag registration for parquet-converter.sort-columns
* fix: excluded sort columns from base converter options
* doc: updated config-file-reference.md with parquet_converter_sort_columns configuration
* doc: cleaned up whitespace noise

Signed-off-by: Angith <[email protected]>
1 parent f114c29 · commit e3dc7b0

File tree

6 files changed: +56 −7 lines


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -24,6 +24,7 @@
 * [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
 * [FEATURE] Query Frontend: Add support /api/v1/format_query API for formatting queries. #6893
 * [FEATURE] Query Frontend: Add support for /api/v1/parse_query API (experimental) to parse a PromQL expression and return it as a JSON-formatted AST (abstract syntax tree). #6978
+* [ENHANCEMENT] Parquet Storage: Add support for additional sort columns during Parquet file generation #7003
 * [ENHANCEMENT] Modernizes the entire codebase by using go modernize tool. #7005
 * [ENHANCEMENT] Overrides Exporter: Expose all fields that can be converted to float64. Also, the label value `max_local_series_per_metric` got renamed to `max_series_per_metric`, and `max_local_series_per_user` got renamed to `max_series_per_user`. #6979
 * [ENHANCEMENT] Ingester: Add `cortex_ingester_tsdb_wal_replay_unknown_refs_total` and `cortex_ingester_tsdb_wbl_replay_unknown_refs_total` metrics to track unknown series references during wal/wbl replaying. #6945

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
@@ -4252,6 +4252,11 @@ query_rejection:
 # CLI flag: -parquet-converter.tenant-shard-size
 [parquet_converter_tenant_shard_size: <float> | default = 0]
 
+# Additional label names for specific tenants to sort by after metric name, in
+# order of precedence. These are applied during Parquet file generation.
+# CLI flag: -parquet-converter.sort-columns
+[parquet_converter_sort_columns: <list of string> | default = []]
+
 # S3 server-side encryption type. Required to enable server-side encryption
 # overrides for a specific tenant. If not set, the default S3 client settings
 # are used.

docs/guides/parquet-mode.md

Lines changed: 5 additions & 0 deletions
@@ -106,6 +106,9 @@ limits:
 
 # Shard size for shuffle sharding (0 = disabled)
 parquet_converter_tenant_shard_size: 0.8
+
+# Defines sort columns applied during Parquet file generation for specific tenants
+parquet_converter_sort_columns: ["label1", "label2"]
 ```
 
 You can also configure per-tenant settings using runtime configuration:
@@ -115,6 +118,7 @@ overrides:
   tenant-1:
     parquet_converter_enabled: true
     parquet_converter_tenant_shard_size: 2
+    parquet_converter_sort_columns: ["cluster", "namespace"]
   tenant-2:
     parquet_converter_enabled: false
 ```
@@ -280,6 +284,7 @@ cortex_parquet_queryable_cache_misses_total
 1. **Row Group Size**: Adjust `max_rows_per_row_group` based on your query patterns
 2. **Cache Size**: Tune `parquet_queryable_shard_cache_size` based on available memory
 3. **Concurrency**: Adjust `meta_sync_concurrency` based on object storage performance
+4. **Sort Columns**: Configure `parquet_converter_sort_columns` based on your most common query filters to improve query performance
 
 ### Fallback Configuration
 
pkg/parquetconverter/converter.go

Lines changed: 5 additions & 1 deletion
@@ -139,7 +139,6 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
         metrics: newMetrics(registerer),
         bkt:     bkt,
         baseConverterOptions: []convert.ConvertOption{
-            convert.WithSortBy(labels.MetricName),
             convert.WithColDuration(time.Hour * 8),
             convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
         },
@@ -430,6 +429,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
 
     converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String()))
 
+    sortColumns := []string{labels.MetricName}
+    userConfiguredSortColumns := c.limits.ParquetConverterSortColumns(userID)
+    sortColumns = append(sortColumns, userConfiguredSortColumns...)
+    converterOpts = append(converterOpts, convert.WithSortBy(sortColumns...))
+
    if c.cfg.FileBufferEnabled {
         converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
     }
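
The net effect of this hunk is that the metric name is always the primary sort column, with any tenant-configured labels appended after it in order of precedence. A minimal standalone sketch of that ordering logic (`buildSortColumns` is a hypothetical helper, not code from the commit; `__name__` is the value of Prometheus' `labels.MetricName`):

```go
package main

import "fmt"

// buildSortColumns mirrors the logic added to convertUser above: the metric
// name always sorts first, and tenant-configured labels follow in the order
// they were configured.
func buildSortColumns(userConfigured []string) []string {
	sortColumns := []string{"__name__"} // labels.MetricName
	return append(sortColumns, userConfigured...)
}

func main() {
	// With the per-tenant override parquet_converter_sort_columns: ["cluster", "namespace"]
	fmt.Println(buildSortColumns([]string{"cluster", "namespace"}))
	// Output: [__name__ cluster namespace]
}
```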

pkg/parquetconverter/converter_test.go

Lines changed: 31 additions & 3 deletions
@@ -59,7 +59,19 @@ func TestConverter(t *testing.T) {
     flagext.DefaultValues(limits)
     limits.ParquetConverterEnabled = true
 
-    c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)
+    userSpecificSortColumns := []string{"cluster", "namespace"}
+
+    // Create a mock tenant limits implementation
+    tenantLimits := &mockTenantLimits{
+        limits: map[string]*validation.Limits{
+            user: {
+                ParquetConverterSortColumns: userSpecificSortColumns,
+                ParquetConverterEnabled:     true,
+            },
+        },
+    }
+
+    c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, tenantLimits)
 
     ctx := context.Background()
 
@@ -157,7 +169,7 @@ func prepareConfig() Config {
     return cfg
 }
 
-func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Converter, log.Logger, prometheus.Gatherer) {
+func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits, tenantLimits validation.TenantLimits) (*Converter, log.Logger, prometheus.Gatherer) {
     storageCfg := cortex_tsdb.BlocksStorageConfig{}
     blockRanges := cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
     flagext.DefaultValues(&storageCfg)
@@ -176,7 +188,7 @@ func prepare(t *testing.T, cfg Config, bucketClient objstore.InstrumentedBucket,
         flagext.DefaultValues(limits)
     }
 
-    overrides := validation.NewOverrides(*limits, nil)
+    overrides := validation.NewOverrides(*limits, tenantLimits)
 
     scanner, err := users.NewScanner(cortex_tsdb.UsersScannerConfig{
         Strategy: cortex_tsdb.UserScanStrategyList,
@@ -384,3 +396,19 @@ func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDe
         },
     }, nil
 }
+
+// mockTenantLimits implements the validation.TenantLimits interface for testing
+type mockTenantLimits struct {
+    limits map[string]*validation.Limits
+}
+
+func (m *mockTenantLimits) ByUserID(userID string) *validation.Limits {
+    if limits, ok := m.limits[userID]; ok {
+        return limits
+    }
+    return nil
+}
+
+func (m *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
+    return m.limits
+}
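
Wired together, the mock lets the test exercise per-tenant resolution end to end: `NewOverrides` consults the `TenantLimits` implementation first and falls back to the defaults. A rough sketch of how the pieces above interact (assuming the same `validation` and `flagext` imports as the test; not verbatim test code):

```go
// Build defaults, then overlay tenant-specific limits via the mock.
limits := validation.Limits{}
flagext.DefaultValues(&limits)

tenantLimits := &mockTenantLimits{
	limits: map[string]*validation.Limits{
		"user-1": {ParquetConverterSortColumns: []string{"cluster", "namespace"}},
	},
}

overrides := validation.NewOverrides(limits, tenantLimits)

// Tenants with an entry in the mock get their configured columns...
_ = overrides.ParquetConverterSortColumns("user-1") // ["cluster", "namespace"]
// ...while unknown tenants fall back to the defaults (empty slice).
_ = overrides.ParquetConverterSortColumns("user-2") // []
```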

pkg/util/validation/limits.go

Lines changed: 9 additions & 3 deletions
@@ -220,9 +220,9 @@ type Limits struct {
     CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`
 
     // Parquet converter
-    ParquetConverterEnabled         bool    `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"`
-    ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"`
-
+    ParquetConverterEnabled         bool     `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"`
+    ParquetConverterTenantShardSize float64  `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"`
+    ParquetConverterSortColumns     []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"`
     // This config doesn't have a CLI flag registered here because they're registered in
     // their own original config struct.
     S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."`
@@ -325,6 +325,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 
     f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.")
     f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.")
+    f.Var((*flagext.StringSlice)(&l.ParquetConverterSortColumns), "parquet-converter.sort-columns", "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.")
 
     // Parquet Queryable enforced limits.
     f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.")
@@ -903,6 +904,11 @@ func (o *Overrides) ParquetConverterEnabled(userID string) bool {
     return o.GetOverridesForUser(userID).ParquetConverterEnabled
 }
 
+// ParquetConverterSortColumns returns the additional sort columns for parquet files.
+func (o *Overrides) ParquetConverterSortColumns(userID string) []string {
+    return o.GetOverridesForUser(userID).ParquetConverterSortColumns
+}
+
 // ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage.
 func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int {
     return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount
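
Because the flag is registered through `flagext.StringSlice`, repeating `-parquet-converter.sort-columns` on the command line should accumulate values, one per occurrence. A sketch of that behavior (assumes Cortex's `flagext.StringSlice.Set` appends each value; this snippet is illustrative, not code from the commit, and needs the standard `flag`/`fmt` imports plus the Cortex `flagext` package):

```go
fs := flag.NewFlagSet("cortex", flag.ContinueOnError)
var cols flagext.StringSlice
fs.Var(&cols, "parquet-converter.sort-columns", "Additional sort columns.")

// Assuming Set appends, each flag occurrence adds one column.
_ = fs.Parse([]string{
	"-parquet-converter.sort-columns=cluster",
	"-parquet-converter.sort-columns=namespace",
})
fmt.Println(cols) // [cluster namespace]
```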
