diff --git a/integration/querier_sharding_test.go b/integration/querier_sharding_test.go index c4a14e39270..da37dd90c23 100644 --- a/integration/querier_sharding_test.go +++ b/integration/querier_sharding_test.go @@ -55,7 +55,8 @@ func TestQuerierNoShardingWithQueryScheduler(t *testing.T) { func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { // Going to high starts hitting file descriptor limit, since we run all queriers concurrently. - const numQueries = 100 + const batchSize = 100 + const numQueries = 500 s, err := e2e.NewScenario(networkName) require.NoError(t, err) @@ -141,26 +142,30 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients")) } + batches := generateBatches(batchSize, numQueries) + wg := sync.WaitGroup{} // Run all queries concurrently to get better distribution of requests between queriers. - for i := 0; i < numQueries; i++ { - wg.Add(1) - - go func() { - defer wg.Done() - c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID) - require.NoError(t, err) - - result, err := c.Query("series_1", now) - require.NoError(t, err) - require.Equal(t, model.ValVector, result.Type()) - assert.Equal(t, expectedVector, result.(model.Vector)) - }() + for _, concurrentQueries := range batches { + for i := 0; i < concurrentQueries; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + + result, err := c.Query("series_1", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector, result.(model.Vector)) + }() + } + + wg.Wait() } - wg.Wait() - require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numQueries), "cortex_query_frontend_queries_total")) // Verify that only single querier handled all the queries when sharding is enabled, otherwise queries have been fairly distributed across queriers. @@ -194,3 +199,19 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend) assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler) } + +func generateBatches(batchSize, total int) []int { + remain := total + batches := []int{} + for remain != 0 { + if remain > batchSize { + batches = append(batches, batchSize) + remain -= batchSize + } else { + batches = append(batches, remain) + remain = 0 + } + } + + return batches +}