@@ -2051,100 +2051,6 @@ func TestIngester_QueryStream(t *testing.T) {
20512051 t .Run ("chunks" , chunksTest )
20522052}
20532053
2054- func TestIngester_QueryStreamManySamples (t * testing.T ) {
2055- // Create ingester.
2056- i , err := prepareIngesterWithBlocksStorage (t , defaultIngesterTestConfig (t ), nil )
2057- require .NoError (t , err )
2058- require .NoError (t , services .StartAndAwaitRunning (context .Background (), i ))
2059- defer services .StopAndAwaitTerminated (context .Background (), i ) //nolint:errcheck
2060-
2061- // Wait until it's ACTIVE.
2062- test .Poll (t , 1 * time .Second , ring .ACTIVE , func () interface {} {
2063- return i .lifecycler .GetState ()
2064- })
2065-
2066- // Push series.
2067- ctx := user .InjectOrgID (context .Background (), userID )
2068-
2069- const samplesCount = 100000
2070- samples := make ([]cortexpb.Sample , 0 , samplesCount )
2071-
2072- for i := 0 ; i < samplesCount ; i ++ {
2073- samples = append (samples , cortexpb.Sample {
2074- Value : float64 (i ),
2075- TimestampMs : int64 (i ),
2076- })
2077- }
2078-
2079- // 10k samples encode to around 140 KiB,
2080- _ , err = i .Push (ctx , writeRequestSingleSeries (labels.Labels {{Name : labels .MetricName , Value : "foo" }, {Name : "l" , Value : "1" }}, samples [0 :10000 ]))
2081- require .NoError (t , err )
2082-
2083- // 100k samples encode to around 1.4 MiB,
2084- _ , err = i .Push (ctx , writeRequestSingleSeries (labels.Labels {{Name : labels .MetricName , Value : "foo" }, {Name : "l" , Value : "2" }}, samples ))
2085- require .NoError (t , err )
2086-
2087- // 50k samples encode to around 716 KiB,
2088- _ , err = i .Push (ctx , writeRequestSingleSeries (labels.Labels {{Name : labels .MetricName , Value : "foo" }, {Name : "l" , Value : "3" }}, samples [0 :50000 ]))
2089- require .NoError (t , err )
2090-
2091- // Create a GRPC server used to query back the data.
2092- serv := grpc .NewServer (grpc .StreamInterceptor (middleware .StreamServerUserHeaderInterceptor ))
2093- defer serv .GracefulStop ()
2094- client .RegisterIngesterServer (serv , i )
2095-
2096- listener , err := net .Listen ("tcp" , "localhost:0" )
2097- require .NoError (t , err )
2098-
2099- go func () {
2100- require .NoError (t , serv .Serve (listener ))
2101- }()
2102-
2103- // Query back the series using GRPC streaming.
2104- c , err := client .MakeIngesterClient (listener .Addr ().String (), defaultClientTestConfig ())
2105- require .NoError (t , err )
2106- defer c .Close ()
2107-
2108- s , err := c .QueryStream (ctx , & client.QueryRequest {
2109- StartTimestampMs : 0 ,
2110- EndTimestampMs : samplesCount + 1 ,
2111-
2112- Matchers : []* client.LabelMatcher {{
2113- Type : client .EQUAL ,
2114- Name : model .MetricNameLabel ,
2115- Value : "foo" ,
2116- }},
2117- })
2118- require .NoError (t , err )
2119-
2120- recvMsgs := 0
2121- series := 0
2122- totalSamples := 0
2123-
2124- for {
2125- resp , err := s .Recv ()
2126- if err == io .EOF {
2127- break
2128- }
2129- require .NoError (t , err )
2130- require .True (t , len (resp .Timeseries ) > 0 ) // No empty messages.
2131-
2132- recvMsgs ++
2133- series += len (resp .Timeseries )
2134-
2135- for _ , ts := range resp .Timeseries {
2136- totalSamples += len (ts .Samples )
2137- }
2138- }
2139-
2140- // As ingester doesn't guarantee sorting of series, we can get 2 (10k + 50k in first, 100k in second)
2141- // or 3 messages (small series first, 100k second, small series last).
2142-
2143- require .True (t , 2 <= recvMsgs && recvMsgs <= 3 )
2144- require .Equal (t , 3 , series )
2145- require .Equal (t , 10000 + 50000 + samplesCount , totalSamples )
2146- }
2147-
21482054func TestIngester_QueryStreamManySamplesChunks (t * testing.T ) {
21492055 // Create ingester.
21502056 cfg := defaultIngesterTestConfig (t )
0 commit comments