From 1c903c82fda36bf35fa8a9807c4c7c77f27c3bcb Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Tue, 15 Nov 2022 16:55:25 -0800 Subject: [PATCH 1/5] Make querier_sharding_test less brittle Signed-off-by: Alvin Lin --- integration/querier_sharding_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/querier_sharding_test.go b/integration/querier_sharding_test.go index c4a14e39270..c47a3bb6500 100644 --- a/integration/querier_sharding_test.go +++ b/integration/querier_sharding_test.go @@ -55,7 +55,7 @@ func TestQuerierNoShardingWithQueryScheduler(t *testing.T) { func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { // Going to high starts hitting file descriptor limit, since we run all queriers concurrently. - const numQueries = 100 + const numQueries = 500 s, err := e2e.NewScenario(networkName) require.NoError(t, err) From b689403b6b9a832a29728847a4715e166518ae05 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Tue, 15 Nov 2022 17:43:22 -0800 Subject: [PATCH 2/5] Make querier_sharding_test less brittle Signed-off-by: Alvin Lin --- integration/querier_sharding_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/integration/querier_sharding_test.go b/integration/querier_sharding_test.go index c47a3bb6500..6ae6158c3b3 100644 --- a/integration/querier_sharding_test.go +++ b/integration/querier_sharding_test.go @@ -71,6 +71,7 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), "-querier.max-outstanding-requests-per-tenant": strconv.Itoa(numQueries), // To avoid getting errors. + "-querier.max-concurrent": strconv.Itoa(numQueries), }) minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) From b79bf3c7746f1dad9ef85de5918ccc5e008fd8a7 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Tue, 15 Nov 2022 19:49:43 -0800 Subject: [PATCH 3/5] rate limit query per second to avoid 429 Signed-off-by: Alvin Lin --- integration/querier_sharding_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/integration/querier_sharding_test.go b/integration/querier_sharding_test.go index 6ae6158c3b3..0e57808bf93 100644 --- a/integration/querier_sharding_test.go +++ b/integration/querier_sharding_test.go @@ -18,6 +18,8 @@ import ( e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" + + "golang.org/x/time/rate" ) type querierShardingTestConfig struct { @@ -54,7 +56,6 @@ func TestQuerierNoShardingWithQueryScheduler(t *testing.T) { } func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { - // Going to high starts hitting file descriptor limit, since we run all queriers concurrently. const numQueries = 500 s, err := e2e.NewScenario(networkName) @@ -71,7 +72,6 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), "-querier.max-outstanding-requests-per-tenant": strconv.Itoa(numQueries), // To avoid getting errors. - "-querier.max-concurrent": strconv.Itoa(numQueries), }) minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) @@ -144,8 +144,14 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { wg := sync.WaitGroup{} + // Going to high starts hitting file descriptor limit, since we run all queriers concurrently. + const qps = 100 + limiter := rate.NewLimiter(rate.Limit(qps), qps) + // Run all queries concurrently to get better distribution of requests between queriers. for i := 0; i < numQueries; i++ { + reservation := limiter.Reserve() + time.Sleep(reservation.Delay()) wg.Add(1) go func() { From 6c1569e3df42080ecea4d071d5754dabfd760fd1 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Tue, 15 Nov 2022 20:17:25 -0800 Subject: [PATCH 4/5] rate limit query per second to avoid 429 Signed-off-by: Alvin Lin --- integration/querier_sharding_test.go | 60 +++++++++++++++++----------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/integration/querier_sharding_test.go b/integration/querier_sharding_test.go index 0e57808bf93..c4dc162cca2 100644 --- a/integration/querier_sharding_test.go +++ b/integration/querier_sharding_test.go @@ -18,8 +18,6 @@ import ( e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" - - "golang.org/x/time/rate" ) type querierShardingTestConfig struct { @@ -56,6 +54,8 @@ func TestQuerierNoShardingWithQueryScheduler(t *testing.T) { } func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { + // Going to high starts hitting file descriptor limit, since we run all queriers concurrently. + const batch = 100 const numQueries = 500 s, err := e2e.NewScenario(networkName) @@ -142,32 +142,30 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients")) } - wg := sync.WaitGroup{} + batches := generateBatches(batch, numQueries) - // Going to high starts hitting file descriptor limit, since we run all queriers concurrently. - const qps = 100 - limiter := rate.NewLimiter(rate.Limit(qps), qps) + wg := sync.WaitGroup{} // Run all queries concurrently to get better distribution of requests between queriers. - for i := 0; i < numQueries; i++ { - reservation := limiter.Reserve() - time.Sleep(reservation.Delay()) - wg.Add(1) - - go func() { - defer wg.Done() - c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID) - require.NoError(t, err) - - result, err := c.Query("series_1", now) - require.NoError(t, err) - require.Equal(t, model.ValVector, result.Type()) - assert.Equal(t, expectedVector, result.(model.Vector)) - }() + for _, concurrentQueries := range batches { + for i := 0; i < concurrentQueries; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + + result, err := c.Query("series_1", now) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector, result.(model.Vector)) + }() + } + + wg.Wait() } - wg.Wait() - require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numQueries), "cortex_query_frontend_queries_total")) // Verify that only single querier handled all the queries when sharding is enabled, otherwise queries have been fairly distributed across queriers. @@ -201,3 +199,19 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend) assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler) } + +func generateBatches(batchSize, total int) []int { + remain := total + batches := []int{} + for remain != 0 { + if remain > batchSize { + batches = append(batches, batchSize) + remain -= batchSize + } else { + batches = append(batches, remain) + remain = 0 + } + } + + return batches +} From 0173fdfe9d26c1364d2b516d6e60d15ecfe359ea Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Tue, 15 Nov 2022 20:50:49 -0800 Subject: [PATCH 5/5] Better var name la Signed-off-by: Alvin Lin --- integration/querier_sharding_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/querier_sharding_test.go b/integration/querier_sharding_test.go index c4dc162cca2..da37dd90c23 100644 --- a/integration/querier_sharding_test.go +++ b/integration/querier_sharding_test.go @@ -55,7 +55,7 @@ func TestQuerierNoShardingWithQueryScheduler(t *testing.T) { func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { // Going to high starts hitting file descriptor limit, since we run all queriers concurrently. - const batch = 100 + const batchSize = 100 const numQueries = 500 s, err := e2e.NewScenario(networkName) @@ -142,7 +142,7 @@ func runQuerierShardingTest(t *testing.T, cfg querierShardingTestConfig) { require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(2), "cortex_query_frontend_connected_clients")) } - batches := generateBatches(batch, numQueries) + batches := generateBatches(batchSize, numQueries) wg := sync.WaitGroup{}