diff --git a/CHANGELOG.md b/CHANGELOG.md
index bb2fca718b8..d24c5a6ea95 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,7 @@
 * [CHANGE] Remove support for alertmanager and ruler legacy store configuration. Before upgrading, you need to convert your configuration to use the `alertmanager-storage` and `ruler-storage` configuration on the version that you're already running, then upgrade.
 * [CHANGE] Disables TSDB isolation. #4825
 * [CHANGE] Drops support Prometheus 1.x rule format on configdb. #4826
+* [CHANGE] Removes the `-ingester.stream-chunks-when-using-blocks` experimental flag and streams chunks by default when `querier.ingester-streaming` is enabled. #4864
 * [ENHANCEMENT] AlertManager: Retrying AlertManager Get Requests (Get Alertmanager status, Get Alertmanager Receivers) on next replica on error #4840
 * [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 #4839
 * [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783
diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index 7c8b4cc63e2..ac69855eba3 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -65,7 +65,7 @@ Currently experimental features are:
 - Querier: tenant federation
 - The thanosconvert tool for converting Thanos block metadata to Cortex
 - HA Tracker: cleanup of old replicas from KV Store.
-- Flags for configuring whether blocks-ingester streams samples or chunks are temporary, and will be removed when feature is tested:
+- Flags for configuring whether blocks-ingester streams samples or chunks are temporary, and will be removed in the next release:
   - `-ingester.stream-chunks-when-using-blocks` CLI flag
   - `-ingester_stream_chunks_when_using_blocks` (boolean) field in runtime config file
 - Instance limits in ingester and distributor
diff --git a/integration/ruler_test.go b/integration/ruler_test.go
index bfdc1570bc9..9e420490faf 100644
--- a/integration/ruler_test.go
+++ b/integration/ruler_test.go
@@ -540,8 +540,6 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
             // Very low limit so that ruler hits it.
             "-querier.max-fetched-chunks-per-query": "5",
-            // We need this to make limit work.
- "-ingester.stream-chunks-when-using-blocks": "true", }, ) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3ef5f891b8a..702634a66f3 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -391,7 +391,6 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels - t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig) t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig) t.tsdbIngesterConfig() diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 150c25727b2..67bfd928127 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -104,28 +104,6 @@ func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-ch } } -func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.QueryStreamType { - if manager == nil { - return nil - } - - return func() ingester.QueryStreamType { - val := manager.GetConfig() - if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { - if cfg.IngesterChunkStreaming == nil { - return ingester.QueryStreamDefault - } - - if *cfg.IngesterChunkStreaming { - return ingester.QueryStreamChunks - } - return ingester.QueryStreamSamples - } - - return ingester.QueryStreamDefault - } -} - func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.InstanceLimits { if manager == nil { return nil diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7ffa8dd3b13..66368fbc4e1 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -97,10 +97,7 @@ type Config struct { ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"` // Use blocks storage. - BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"` - StreamChunksWhenUsingBlocks bool `yaml:"-"` - // Runtime-override for type of streaming query to use (chunks or samples). - StreamTypeFn func() QueryStreamType `yaml:"-"` + BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"` // Injected at runtime and read from the distributor config, required // to accurately apply global limits. @@ -126,7 +123,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", true, "Enable tracking of active series and export them as metrics.") f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.") f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.") - f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is experimental feature and not yet tested. Once ready, it will be made default and this config option removed.") f.Float64Var(&cfg.DefaultLimits.MaxIngestionRate, "ingester.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that ingester will accept. This limit is per-ingester, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 
This limit only works when using blocks engine. 0 = unlimited.") f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.") @@ -223,15 +219,6 @@ func (r tsdbCloseCheckResult) shouldClose() bool { return r == tsdbIdle || r == tsdbTenantMarkedForDeletion } -// QueryStreamType defines type of function to use when doing query-stream operation. -type QueryStreamType int - -const ( - QueryStreamDefault QueryStreamType = iota // Use default configured value. - QueryStreamSamples // Stream individual samples. - QueryStreamChunks // Stream entire chunks. -) - type userTSDB struct { db *tsdb.DB userID string @@ -1622,31 +1609,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ numSamples := 0 numSeries := 0 + numSeries, numSamples, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream) - streamType := QueryStreamSamples - if i.cfg.StreamChunksWhenUsingBlocks { - streamType = QueryStreamChunks - } - - if i.cfg.StreamTypeFn != nil { - runtimeType := i.cfg.StreamTypeFn() - switch runtimeType { - case QueryStreamChunks: - streamType = QueryStreamChunks - case QueryStreamSamples: - streamType = QueryStreamSamples - default: - // no change from config value. - } - } - - if streamType == QueryStreamChunks { - level.Debug(spanlog).Log("msg", "using queryStreamChunks") - numSeries, numSamples, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream) - } else { - level.Debug(spanlog).Log("msg", "using QueryStreamSamples") - numSeries, numSamples, err = i.queryStreamSamples(ctx, db, int64(from), int64(through), matchers, stream) - } if err != nil { return err } @@ -1657,74 +1621,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return nil } -func (i *Ingester) queryStreamSamples(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) { - q, err := db.Querier(ctx, from, through) - if err != nil { - return 0, 0, err - } - defer q.Close() - - // It's not required to return sorted series because series are sorted by the Cortex querier. - ss := q.Select(false, nil, matchers...) - if ss.Err() != nil { - return 0, 0, ss.Err() - } - - timeseries := make([]cortexpb.TimeSeries, 0, queryStreamBatchSize) - batchSizeBytes := 0 - for ss.Next() { - series := ss.At() - - // convert labels to LabelAdapter - ts := cortexpb.TimeSeries{ - Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()), - } - - it := series.Iterator() - for it.Next() { - t, v := it.At() - ts.Samples = append(ts.Samples, cortexpb.Sample{Value: v, TimestampMs: t}) - } - numSamples += len(ts.Samples) - numSeries++ - tsSize := ts.Size() - - if (batchSizeBytes > 0 && batchSizeBytes+tsSize > queryStreamBatchMessageSize) || len(timeseries) >= queryStreamBatchSize { - // Adding this series to the batch would make it too big, - // flush the data and add it to new batch instead. - err = client.SendQueryStream(stream, &client.QueryStreamResponse{ - Timeseries: timeseries, - }) - if err != nil { - return 0, 0, err - } - - batchSizeBytes = 0 - timeseries = timeseries[:0] - } - - timeseries = append(timeseries, ts) - batchSizeBytes += tsSize - } - - // Ensure no error occurred while iterating the series set. 
-    if err := ss.Err(); err != nil {
-        return 0, 0, err
-    }
-
-    // Final flush any existing metrics
-    if batchSizeBytes != 0 {
-        err = client.SendQueryStream(stream, &client.QueryStreamResponse{
-            Timeseries: timeseries,
-        })
-        if err != nil {
-            return 0, 0, err
-        }
-    }
-
-    return numSeries, numSamples, nil
-}
-
 // queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
 func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
     q, err := db.ChunkQuerier(ctx, from, through)
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 9a85532d701..7559fb89ff0 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -1388,7 +1388,7 @@ func Test_Ingester_LabelNames(t *testing.T) {
     ctx := user.InjectOrgID(context.Background(), "test")
     for _, series := range series {
-        req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
+        req, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -1432,7 +1432,7 @@ func Test_Ingester_LabelValues(t *testing.T) {
     ctx := user.InjectOrgID(context.Background(), "test")
     for _, series := range series {
-        req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
+        req, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -1551,7 +1551,7 @@ func Test_Ingester_Query(t *testing.T) {
     ctx := user.InjectOrgID(context.Background(), "test")
     for _, series := range series {
-        req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
+        req, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -1845,7 +1845,7 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
     ctx := user.InjectOrgID(context.Background(), "test")
     for _, series := range fixtures {
-        req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
+        req, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -1984,12 +1984,6 @@ func TestIngester_QueryStream(t *testing.T) {
     // Create ingester.
     cfg := defaultIngesterTestConfig(t)
-    // change stream type in runtime.
-    var streamType QueryStreamType
-    cfg.StreamTypeFn = func() QueryStreamType {
-        return streamType
-    }
-
     i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
     require.NoError(t, err)
     require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -2003,7 +1997,7 @@
     {
     // Push series.
     ctx := user.InjectOrgID(context.Background(), userID)
     lbls := labels.Labels{{Name: labels.MetricName, Value: "foo"}}
-    req, _, expectedResponseSamples, expectedResponseChunks := mockWriteRequest(t, lbls, 123000, 456)
+    req, _, expectedResponseChunks := mockWriteRequest(t, lbls, 123000, 456)
     _, err = i.Push(ctx, req)
     require.NoError(t, err)
@@ -2034,26 +2028,6 @@
         }},
     }
-    samplesTest := func(t *testing.T) {
-        s, err := c.QueryStream(ctx, queryRequest)
-        require.NoError(t, err)
-
-        count := 0
-        var lastResp *client.QueryStreamResponse
-        for {
-            resp, err := s.Recv()
-            if err == io.EOF {
-                break
-            }
-            require.NoError(t, err)
-            require.Zero(t, len(resp.Chunkseries)) // No chunks expected
-            count += len(resp.Timeseries)
-            lastResp = resp
-        }
-        require.Equal(t, 1, count)
-        require.Equal(t, expectedResponseSamples, lastResp)
-    }
-
     chunksTest := func(t *testing.T) {
         s, err := c.QueryStream(ctx, queryRequest)
         require.NoError(t, err)
@@ -2074,114 +2048,12 @@
         require.Equal(t, expectedResponseChunks, lastResp)
     }
-    streamType = QueryStreamDefault
-    t.Run("default", samplesTest)
-
-    streamType = QueryStreamSamples
-    t.Run("samples", samplesTest)
-
-    streamType = QueryStreamChunks
     t.Run("chunks", chunksTest)
 }
-func TestIngester_QueryStreamManySamples(t *testing.T) {
-    // Create ingester.
-    i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), nil)
-    require.NoError(t, err)
-    require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
-    defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
-
-    // Wait until it's ACTIVE.
-    test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
-        return i.lifecycler.GetState()
-    })
-
-    // Push series.
-    ctx := user.InjectOrgID(context.Background(), userID)
-
-    const samplesCount = 100000
-    samples := make([]cortexpb.Sample, 0, samplesCount)
-
-    for i := 0; i < samplesCount; i++ {
-        samples = append(samples, cortexpb.Sample{
-            Value: float64(i),
-            TimestampMs: int64(i),
-        })
-    }
-
-    // 10k samples encode to around 140 KiB,
-    _, err = i.Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "1"}}, samples[0:10000]))
-    require.NoError(t, err)
-
-    // 100k samples encode to around 1.4 MiB,
-    _, err = i.Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "2"}}, samples))
-    require.NoError(t, err)
-
-    // 50k samples encode to around 716 KiB,
-    _, err = i.Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "3"}}, samples[0:50000]))
-    require.NoError(t, err)
-
-    // Create a GRPC server used to query back the data.
-    serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor))
-    defer serv.GracefulStop()
-    client.RegisterIngesterServer(serv, i)
-
-    listener, err := net.Listen("tcp", "localhost:0")
-    require.NoError(t, err)
-
-    go func() {
-        require.NoError(t, serv.Serve(listener))
-    }()
-
-    // Query back the series using GRPC streaming.
-    c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig())
-    require.NoError(t, err)
-    defer c.Close()
-
-    s, err := c.QueryStream(ctx, &client.QueryRequest{
-        StartTimestampMs: 0,
-        EndTimestampMs: samplesCount + 1,
-
-        Matchers: []*client.LabelMatcher{{
-            Type: client.EQUAL,
-            Name: model.MetricNameLabel,
-            Value: "foo",
-        }},
-    })
-    require.NoError(t, err)
-
-    recvMsgs := 0
-    series := 0
-    totalSamples := 0
-
-    for {
-        resp, err := s.Recv()
-        if err == io.EOF {
-            break
-        }
-        require.NoError(t, err)
-        require.True(t, len(resp.Timeseries) > 0) // No empty messages.
-
-        recvMsgs++
-        series += len(resp.Timeseries)
-
-        for _, ts := range resp.Timeseries {
-            totalSamples += len(ts.Samples)
-        }
-    }
-
-    // As ingester doesn't guarantee sorting of series, we can get 2 (10k + 50k in first, 100k in second)
-    // or 3 messages (small series first, 100k second, small series last).
-
-    require.True(t, 2 <= recvMsgs && recvMsgs <= 3)
-    require.Equal(t, 3, series)
-    require.Equal(t, 10000+50000+samplesCount, totalSamples)
-}
-
 func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
     // Create ingester.
     cfg := defaultIngesterTestConfig(t)
-    cfg.StreamChunksWhenUsingBlocks = true
     i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
     require.NoError(t, err)
@@ -2307,17 +2179,12 @@ func (m *mockQueryStreamServer) Context() context.Context {
     return m.ctx
 }
-func BenchmarkIngester_QueryStream_Samples(b *testing.B) {
-    benchmarkQueryStream(b, false)
-}
-
 func BenchmarkIngester_QueryStream_Chunks(b *testing.B) {
-    benchmarkQueryStream(b, true)
+    benchmarkQueryStream(b)
 }
-func benchmarkQueryStream(b *testing.B, streamChunks bool) {
+func benchmarkQueryStream(b *testing.B) {
     cfg := defaultIngesterTestConfig(b)
-    cfg.StreamChunksWhenUsingBlocks = streamChunks
     // Create ingester.
     i, err := prepareIngesterWithBlocksStorage(b, cfg, nil)
@@ -2370,7 +2237,7 @@
     }
 }
-func mockWriteRequest(t *testing.T, lbls labels.Labels, value float64, timestampMs int64) (*cortexpb.WriteRequest, *client.QueryResponse, *client.QueryStreamResponse, *client.QueryStreamResponse) {
+func mockWriteRequest(t *testing.T, lbls labels.Labels, value float64, timestampMs int64) (*cortexpb.WriteRequest, *client.QueryResponse, *client.QueryStreamResponse) {
     samples := []cortexpb.Sample{
         {
             TimestampMs: timestampMs,
@@ -2390,15 +2257,6 @@ func mockWriteRequest(t *testing.T, lbls labels.Labels, value float64, timestamp
         },
     }
-    expectedQueryStreamResSamples := &client.QueryStreamResponse{
-        Timeseries: []cortexpb.TimeSeries{
-            {
-                Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
-                Samples: samples,
-            },
-        },
-    }
-
     chunk := chunkenc.NewXORChunk()
     app, err := chunk.Appender()
     require.NoError(t, err)
@@ -2421,7 +2279,7 @@ func mockWriteRequest(t *testing.T, lbls labels.Labels, value float64, timestamp
         },
     }
-    return req, expectedQueryRes, expectedQueryStreamResSamples, expectedQueryStreamResChunks
+    return req, expectedQueryRes, expectedQueryStreamResChunks
 }
 func prepareIngesterWithBlocksStorage(t testing.TB, ingesterCfg Config, registerer prometheus.Registerer) (*Ingester, error) {
@@ -2935,7 +2793,7 @@ func TestIngester_invalidSamplesDontChangeLastUpdateTime(t *testing.T) {
     sampleTimestamp := int64(model.Now())
     {
-        req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, sampleTimestamp)
+        req, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, sampleTimestamp)
         _, err = i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -2950,7 +2808,7 @@ func TestIngester_invalidSamplesDontChangeLastUpdateTime(t *testing.T) {
     // Push another sample to the same metric and timestamp, with different value. We expect to get error.
     {
-        req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 1, sampleTimestamp)
+        req, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 1, sampleTimestamp)
         _, err = i.Push(ctx, req)
         require.Error(t, err)
     }
@@ -3267,7 +3125,7 @@ func Test_Ingester_UserStats(t *testing.T) {
     ctx := user.InjectOrgID(context.Background(), "test")
     for _, series := range series {
-        req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
+        req, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -3312,7 +3170,7 @@ func Test_Ingester_AllUserStats(t *testing.T) {
     })
     for _, series := range series {
         ctx := user.InjectOrgID(context.Background(), series.user)
-        req, _, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
+        req, _, _ := mockWriteRequest(t, series.lbls, series.value, series.timestamp)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -3562,7 +3420,7 @@ func verifyCompactedHead(t *testing.T, i *Ingester, expected bool) {
 func pushSingleSampleWithMetadata(t *testing.T, i *Ingester) {
     ctx := user.InjectOrgID(context.Background(), userID)
-    req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now()))
+    req, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now()))
     req.Metadata = append(req.Metadata, &cortexpb.MetricMetadata{MetricFamilyName: "test", Help: "a help for metric", Unit: "", Type: cortexpb.COUNTER})
     _, err := i.Push(ctx, req)
     require.NoError(t, err)
@@ -3570,7 +3428,7 @@ func pushSingleSampleWithMetadata(t *testing.T, i *Ingester) {
 func pushSingleSampleAtTime(t *testing.T, i *Ingester, ts int64) {
     ctx := user.InjectOrgID(context.Background(), userID)
-    req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, ts)
+    req, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, ts)
     _, err := i.Push(ctx, req)
     require.NoError(t, err)
 }
@@ -3707,7 +3565,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
     // Push some data to create 3 blocks.
     ctx := user.InjectOrgID(context.Background(), userID)
     for j := int64(0); j < 5; j++ {
-        req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
+        req, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -3734,7 +3592,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
     // Add more samples that could trigger another compaction and hence reload of blocks.
     for j := int64(5); j < 6; j++ {
-        req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
+        req, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -3762,7 +3620,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
     // Add more samples that could trigger another compaction and hence reload of blocks.
     for j := int64(6); j < 7; j++ {
-        req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
+        req, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, j*chunkRangeMilliSec)
         _, err := i.Push(ctx, req)
         require.NoError(t, err)
     }
@@ -3808,7 +3666,7 @@ func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) {
     require.True(t, db.casState(active, forceCompacting))

     // Ingestion should fail with a 503.
-    req, _, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now()))
+    req, _, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, util.TimeToMillis(time.Now()))
     ctx := user.InjectOrgID(context.Background(), userID)
     _, err = i.Push(ctx, req)
     require.Equal(t, httpgrpc.Errorf(http.StatusServiceUnavailable, wrapWithUser(errors.New("forced compaction in progress"), userID).Error()), err)