Merged

Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -40,6 +40,7 @@
* [CHANGE] Remove support for alertmanager and ruler legacy store configuration. Before upgrading, you need to convert your configuration to use the `alertmanager-storage` and `ruler-storage` configuration on the version that you're already running, then upgrade.
* [CHANGE] Disables TSDB isolation. #4825
* [CHANGE] Drops support Prometheus 1.x rule format on configdb. #4826
* [CHANGE] Removes the `-ingester.stream-chunks-when-using-blocks` experimental flag and streams chunks by default when `querier.ingester-streaming` is enabled. #4864
* [ENHANCEMENT] AlertManager: Retrying AlertManager Get Requests (Get Alertmanager status, Get Alertmanager Receivers) on next replica on error #4840
* [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 #4839
* [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783
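For context on the new entry above: before this change the ingester chose between streaming samples and streaming chunks based on a CLI flag plus a runtime override; afterwards it always streams chunks on the blocks path, gated only by `querier.ingester-streaming`. A minimal, self-contained Go sketch of the removed decision, using simplified stand-in names modelled on the code deleted from pkg/ingester/ingester.go further down in this diff (not Cortex's exported API):

```go
package main

import "fmt"

// Simplified stand-ins for the removed QueryStreamType machinery.
type queryStreamType int

const (
	queryStreamDefault queryStreamType = iota // use whatever the flag says
	queryStreamSamples                        // stream individual samples
	queryStreamChunks                         // stream entire chunks
)

type ingesterConfig struct {
	streamChunksWhenUsingBlocks bool                   // the removed CLI flag
	streamTypeFn                func() queryStreamType // the removed runtime override
}

// resolveStreamType reproduces the old precedence: the runtime override wins unless it
// returns "default", in which case the CLI flag decides.
func resolveStreamType(cfg ingesterConfig) queryStreamType {
	streamType := queryStreamSamples
	if cfg.streamChunksWhenUsingBlocks {
		streamType = queryStreamChunks
	}
	if cfg.streamTypeFn != nil {
		if rt := cfg.streamTypeFn(); rt != queryStreamDefault {
			streamType = rt
		}
	}
	return streamType
}

func main() {
	// After this PR the whole resolution step is gone: the ingester always streams
	// chunks, and only -querier.ingester-streaming controls whether streaming happens.
	fmt.Println(resolveStreamType(ingesterConfig{streamChunksWhenUsingBlocks: true}) == queryStreamChunks) // true
}
```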
2 changes: 1 addition & 1 deletion docs/configuration/v1-guarantees.md
@@ -65,7 +65,7 @@ Currently experimental features are:
- Querier: tenant federation
- The thanosconvert tool for converting Thanos block metadata to Cortex
- HA Tracker: cleanup of old replicas from KV Store.
- Flags for configuring whether blocks-ingester streams samples or chunks are temporary, and will be removed when feature is tested:
- Flags for configuring whether blocks-ingester streams samples or chunks are temporary, and will be removed in the next release:
- `-ingester.stream-chunks-when-using-blocks` CLI flag
- `-ingester_stream_chunks_when_using_blocks` (boolean) field in runtime config file
- Instance limits in ingester and distributor
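For readers who have not used the runtime config file named above: the boolean field listed there mapped onto a pointer field of the overrides struct, as the ingesterChunkStreaming function deleted from pkg/cortex/runtime_config.go below suggests. A hedged sketch, where the yaml tag is inferred from the documented field name rather than taken from this diff:

```go
package cortex

// runtimeConfigValues sketches the (now removed) override knob: the field name
// IngesterChunkStreaming appears in the deleted ingesterChunkStreaming function further
// down in this diff, while the yaml tag is an assumption based on the field name
// documented above.
type runtimeConfigValues struct {
	IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`
	// other runtime overrides omitted
}
```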
2 changes: 0 additions & 2 deletions integration/ruler_test.go
@@ -540,8 +540,6 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {

// Very low limit so that ruler hits it.
"-querier.max-fetched-chunks-per-query": "5",
// We need this to make limit work.
"-ingester.stream-chunks-when-using-blocks": "true",
},
)

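The two deleted lines also show why the flag existed: the removed comment says the chunk-streaming override was needed "to make limit work", i.e. `-querier.max-fetched-chunks-per-query` only bites when the ingester streams chunks. With chunk streaming now the default, a sketch of the flag overrides the test still needs (package and variable names invented for illustration):

```go
package ruler_sketch

// rulerLimitFlags sketches the per-test flag overrides that remain after this change;
// the chunk-streaming override is no longer needed because ingesters now stream chunks
// whenever querier streaming is enabled.
var rulerLimitFlags = map[string]string{
	// Very low limit so that the ruler hits it.
	"-querier.max-fetched-chunks-per-query": "5",
}
```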
1 change: 0 additions & 1 deletion pkg/cortex/modules.go
@@ -391,7 +391,6 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig)
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
t.tsdbIngesterConfig()

22 changes: 0 additions & 22 deletions pkg/cortex/runtime_config.go
@@ -104,28 +104,6 @@ func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-ch
}
}

func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.QueryStreamType {
if manager == nil {
return nil
}

return func() ingester.QueryStreamType {
val := manager.GetConfig()
if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
if cfg.IngesterChunkStreaming == nil {
return ingester.QueryStreamDefault
}

if *cfg.IngesterChunkStreaming {
return ingester.QueryStreamChunks
}
return ingester.QueryStreamSamples
}

return ingester.QueryStreamDefault
}
}

func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.InstanceLimits {
if manager == nil {
return nil
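The deleted ingesterChunkStreaming followed the same accessor pattern as the ingesterInstanceLimits function that survives, whose first lines are visible above: close over the runtime config manager, re-read the current values on every call, and fall back to a zero value when nothing is set. A sketch of that surviving accessor, with its body assumed for illustration since the diff cuts it off:

```go
// Sketch of the surviving accessor; only its first lines are visible in the hunk above,
// so the body here is an assumption that mirrors the shape of the deleted
// ingesterChunkStreaming: the closure re-reads the runtime config on every call so
// operators can change instance limits without restarting the ingester.
func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.InstanceLimits {
	if manager == nil {
		return nil
	}

	return func() *ingester.InstanceLimits {
		val := manager.GetConfig()
		if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
			return cfg.IngesterLimits // field name assumed for illustration
		}
		return nil
	}
}
```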
108 changes: 2 additions & 106 deletions pkg/ingester/ingester.go
@@ -97,10 +97,7 @@ type Config struct {
ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"`

// Use blocks storage.
BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"`
StreamChunksWhenUsingBlocks bool `yaml:"-"`
// Runtime-override for type of streaming query to use (chunks or samples).
StreamTypeFn func() QueryStreamType `yaml:"-"`
BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"`

// Injected at runtime and read from the distributor config, required
// to accurately apply global limits.
@@ -126,7 +123,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", true, "Enable tracking of active series and export them as metrics.")
f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is experimental feature and not yet tested. Once ready, it will be made default and this config option removed.")

f.Float64Var(&cfg.DefaultLimits.MaxIngestionRate, "ingester.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that ingester will accept. This limit is per-ingester, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
@@ -223,15 +219,6 @@ func (r tsdbCloseCheckResult) shouldClose() bool {
return r == tsdbIdle || r == tsdbTenantMarkedForDeletion
}

// QueryStreamType defines type of function to use when doing query-stream operation.
type QueryStreamType int

const (
QueryStreamDefault QueryStreamType = iota // Use default configured value.
QueryStreamSamples // Stream individual samples.
QueryStreamChunks // Stream entire chunks.
)

type userTSDB struct {
db *tsdb.DB
userID string
@@ -1622,31 +1609,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_

numSamples := 0
numSeries := 0
numSeries, numSamples, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream)

streamType := QueryStreamSamples
if i.cfg.StreamChunksWhenUsingBlocks {
streamType = QueryStreamChunks
}

if i.cfg.StreamTypeFn != nil {
runtimeType := i.cfg.StreamTypeFn()
switch runtimeType {
case QueryStreamChunks:
streamType = QueryStreamChunks
case QueryStreamSamples:
streamType = QueryStreamSamples
default:
// no change from config value.
}
}

if streamType == QueryStreamChunks {
level.Debug(spanlog).Log("msg", "using queryStreamChunks")
numSeries, numSamples, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream)
} else {
level.Debug(spanlog).Log("msg", "using QueryStreamSamples")
numSeries, numSamples, err = i.queryStreamSamples(ctx, db, int64(from), int64(through), matchers, stream)
}
if err != nil {
return err
}
@@ -1657,74 +1621,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}

func (i *Ingester) queryStreamSamples(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.Querier(ctx, from, through)
if err != nil {
return 0, 0, err
}
defer q.Close()

// It's not required to return sorted series because series are sorted by the Cortex querier.
ss := q.Select(false, nil, matchers...)
if ss.Err() != nil {
return 0, 0, ss.Err()
}

timeseries := make([]cortexpb.TimeSeries, 0, queryStreamBatchSize)
batchSizeBytes := 0
for ss.Next() {
series := ss.At()

// convert labels to LabelAdapter
ts := cortexpb.TimeSeries{
Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()),
}

it := series.Iterator()
for it.Next() {
t, v := it.At()
ts.Samples = append(ts.Samples, cortexpb.Sample{Value: v, TimestampMs: t})
}
numSamples += len(ts.Samples)
numSeries++
tsSize := ts.Size()

if (batchSizeBytes > 0 && batchSizeBytes+tsSize > queryStreamBatchMessageSize) || len(timeseries) >= queryStreamBatchSize {
// Adding this series to the batch would make it too big,
// flush the data and add it to new batch instead.
err = client.SendQueryStream(stream, &client.QueryStreamResponse{
Timeseries: timeseries,
})
if err != nil {
return 0, 0, err
}

batchSizeBytes = 0
timeseries = timeseries[:0]
}

timeseries = append(timeseries, ts)
batchSizeBytes += tsSize
}

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return 0, 0, err
}

// Final flush any existing metrics
if batchSizeBytes != 0 {
err = client.SendQueryStream(stream, &client.QueryStreamResponse{
Timeseries: timeseries,
})
if err != nil {
return 0, 0, err
}
}

return numSeries, numSamples, nil
}

// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
q, err := db.ChunkQuerier(ctx, from, through)
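One detail worth keeping in mind from the deleted queryStreamSamples is its batching strategy, which the surviving chunk path presumably mirrors: accumulate series until adding the next one would exceed the byte budget or the series-count budget, flush, then flush whatever remains at the end. A generic, self-contained Go sketch of that pattern, with names and budgets invented for illustration (the real code uses queryStreamBatchSize and queryStreamBatchMessageSize):

```go
package main

import "fmt"

// Illustrative budgets, standing in for queryStreamBatchSize and
// queryStreamBatchMessageSize.
const (
	maxBatchItems = 3
	maxBatchBytes = 10
)

// flushBatched sends items in batches, flushing whenever adding the next item would
// exceed the byte budget or the item-count budget, then flushing the remainder.
func flushBatched(sizes []int, send func(batch []int) error) error {
	batch := make([]int, 0, maxBatchItems)
	batchBytes := 0
	for _, size := range sizes {
		if (batchBytes > 0 && batchBytes+size > maxBatchBytes) || len(batch) >= maxBatchItems {
			// Adding this item would make the batch too big: flush and start a new one.
			if err := send(batch); err != nil {
				return err
			}
			batch = batch[:0]
			batchBytes = 0
		}
		batch = append(batch, size)
		batchBytes += size
	}
	// Final flush of anything still buffered.
	if batchBytes != 0 {
		return send(batch)
	}
	return nil
}

func main() {
	_ = flushBatched([]int{4, 4, 4, 9, 1}, func(batch []int) error {
		fmt.Println(batch) // prints [4 4], then [4], then [9 1]
		return nil
	})
}
```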