Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions integration/querier_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func TestQuerierNoShardingWithQueryScheduler(t *testing.T) {

func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
// Going to high starts hitting file descriptor limit, since we run all queriers concurrently.
const numQueries = 100
const batchSize = 100
const numQueries = 500

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -141,26 +142,30 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients"))
}

batches := generateBatches(batchSize, numQueries)

wg := sync.WaitGroup{}

// Run all queries concurrently to get better distribution of requests between queriers.
for i := 0; i < numQueries; i++ {
wg.Add(1)

go func() {
defer wg.Done()
c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

result, err := c.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}()
for _, concurrentQueries := range batches {
for i := 0; i < concurrentQueries; i++ {
wg.Add(1)

go func() {
defer wg.Done()
c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

result, err := c.Query("series_1", now)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))
}()
}

wg.Wait()
}

wg.Wait()

require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numQueries), "cortex_query_frontend_queries_total"))

// Verify that only single querier handled all the queries when sharding is enabled, otherwise queries have been fairly distributed across queriers.
Expand Down Expand Up @@ -194,3 +199,19 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) {
assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)
assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler)
}

func generateBatches(batchSize, total int) []int {
remain := total
batches := []int{}
for remain != 0 {
if remain > batchSize {
batches = append(batches, batchSize)
remain -= batchSize
} else {
batches = append(batches, remain)
remain = 0
}
}

return batches
}