Skip to content

Commit e8a6686

Browse files
authored
Batch series in streaming ingester based on message sizes. (#3015)
* Batch series in streaming ingester based on message sizes. Signed-off-by: Peter Štibraný <[email protected]> * CHANGELOG.md Signed-off-by: Peter Štibraný <[email protected]> * Renamed var. Signed-off-by: Peter Štibraný <[email protected]> * Fix whitespace. Signed-off-by: Peter Štibraný <[email protected]> * Added benchmark for v2QueryStream Signed-off-by: Peter Štibraný <[email protected]> * Don't use grpc server, but call method directly. Signed-off-by: Peter Štibraný <[email protected]> * Push CI Signed-off-by: Peter Štibraný <[email protected]>
1 parent 06aabd6 commit e8a6686

File tree

3 files changed

+189
-7
lines changed

3 files changed

+189
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* [ENHANCEMENT] Expose `storage.aws.dynamodb.backoff_config` configuration file field. #3026
1616
* [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990
1717
* [BUGFIX] Querier: query /series from ingesters regardless the `-querier.query-ingesters-within` setting. #3035
18+
* [BUGFIX] Experimental blocks storage: Ingester is less likely to hit gRPC message size limit when streaming data to queriers. #3015
1819

1920
## 1.3.0 in progress
2021

pkg/ingester/ingester_v2.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,8 @@ func createUserStats(db *userTSDB) *client.UserStatsResponse {
741741
}
742742
}
743743

744+
const queryStreamBatchMessageSize = 1 * 1024 * 1024
745+
744746
// v2QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
745747
func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
746748
log, ctx := spanlogger.New(stream.Context(), "v2QueryStream")
@@ -776,7 +778,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
776778
}
777779

778780
timeseries := make([]client.TimeSeries, 0, queryStreamBatchSize)
779-
batchSize := 0
781+
batchSizeBytes := 0
780782
numSamples := 0
781783
numSeries := 0
782784
for ss.Next() {
@@ -793,21 +795,25 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
793795
ts.Samples = append(ts.Samples, client.Sample{Value: v, TimestampMs: t})
794796
}
795797
numSamples += len(ts.Samples)
796-
797-
timeseries = append(timeseries, ts)
798798
numSeries++
799-
batchSize++
800-
if batchSize >= queryStreamBatchSize {
799+
tsSize := ts.Size()
800+
801+
if (batchSizeBytes > 0 && batchSizeBytes+tsSize > queryStreamBatchMessageSize) || len(timeseries) >= queryStreamBatchSize {
802+
// Adding this series to the batch would make it too big,
803+
// flush the data and add it to a new batch instead.
801804
err = client.SendQueryStream(stream, &client.QueryStreamResponse{
802805
Timeseries: timeseries,
803806
})
804807
if err != nil {
805808
return err
806809
}
807810

808-
batchSize = 0
811+
batchSizeBytes = 0
809812
timeseries = timeseries[:0]
810813
}
814+
815+
timeseries = append(timeseries, ts)
816+
batchSizeBytes += tsSize
811817
}
812818

813819
// Ensure no error occurred while iterating the series set.
@@ -816,7 +822,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
816822
}
817823

818824
// Finally, flush any existing metrics.
819-
if batchSize != 0 {
825+
if batchSizeBytes != 0 {
820826
err = client.SendQueryStream(stream, &client.QueryStreamResponse{
821827
Timeseries: timeseries,
822828
})

pkg/ingester/ingester_v2_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http/httptest"
1212
"os"
1313
"path/filepath"
14+
"strconv"
1415
"strings"
1516
"sync"
1617
"testing"
@@ -1210,6 +1211,180 @@ func TestIngester_v2QueryStream(t *testing.T) {
12101211
require.Equal(t, expectedResponse, lastResp)
12111212
}
12121213

1214+
func TestIngester_v2QueryStreamManySamples(t *testing.T) {
1215+
// Create ingester.
1216+
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
1217+
require.NoError(t, err)
1218+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
1219+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
1220+
defer cleanup()
1221+
1222+
// Wait until it's ACTIVE.
1223+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
1224+
return i.lifecycler.GetState()
1225+
})
1226+
1227+
// Push series.
1228+
ctx := user.InjectOrgID(context.Background(), userID)
1229+
1230+
const samplesCount = 100000
1231+
samples := make([]client.Sample, 0, samplesCount)
1232+
1233+
for i := 0; i < samplesCount; i++ {
1234+
samples = append(samples, client.Sample{
1235+
Value: float64(i),
1236+
TimestampMs: int64(i),
1237+
})
1238+
}
1239+
1240+
// 10k samples encode to around 140 KiB,
1241+
_, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "1"}}, samples[0:10000]))
1242+
require.NoError(t, err)
1243+
1244+
// 100k samples encode to around 1.4 MiB.
1245+
_, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "2"}}, samples))
1246+
require.NoError(t, err)
1247+
1248+
// 50k samples encode to around 716 KiB.
1249+
_, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "3"}}, samples[0:50000]))
1250+
require.NoError(t, err)
1251+
1252+
// Create a GRPC server used to query back the data.
1253+
serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor))
1254+
defer serv.GracefulStop()
1255+
client.RegisterIngesterServer(serv, i)
1256+
1257+
listener, err := net.Listen("tcp", "localhost:0")
1258+
require.NoError(t, err)
1259+
1260+
go func() {
1261+
require.NoError(t, serv.Serve(listener))
1262+
}()
1263+
1264+
// Query back the series using GRPC streaming.
1265+
c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig())
1266+
require.NoError(t, err)
1267+
defer c.Close()
1268+
1269+
s, err := c.QueryStream(ctx, &client.QueryRequest{
1270+
StartTimestampMs: 0,
1271+
EndTimestampMs: samplesCount + 1,
1272+
1273+
Matchers: []*client.LabelMatcher{{
1274+
Type: client.EQUAL,
1275+
Name: model.MetricNameLabel,
1276+
Value: "foo",
1277+
}},
1278+
})
1279+
require.NoError(t, err)
1280+
1281+
recvMsgs := 0
1282+
series := 0
1283+
totalSamples := 0
1284+
1285+
for {
1286+
resp, err := s.Recv()
1287+
if err == io.EOF {
1288+
break
1289+
}
1290+
require.NoError(t, err)
1291+
require.True(t, len(resp.Timeseries) > 0) // No empty messages.
1292+
1293+
recvMsgs++
1294+
series += len(resp.Timeseries)
1295+
1296+
for _, ts := range resp.Timeseries {
1297+
totalSamples += len(ts.Samples)
1298+
}
1299+
}
1300+
1301+
// As ingester doesn't guarantee sorting of series, we can get 2 (10k + 50k in first, 100k in second)
1302+
// or 3 messages (small series first, 100k second, small series last).
1303+
1304+
require.True(t, 2 <= recvMsgs && recvMsgs <= 3)
1305+
require.Equal(t, 3, series)
1306+
require.Equal(t, 10000+50000+samplesCount, totalSamples)
1307+
}
1308+
1309+
func writeRequestSingleSeries(lbls labels.Labels, samples []client.Sample) *client.WriteRequest {
1310+
req := &client.WriteRequest{
1311+
Source: client.API,
1312+
}
1313+
1314+
ts := client.TimeSeries{}
1315+
ts.Labels = client.FromLabelsToLabelAdapters(lbls)
1316+
ts.Samples = samples
1317+
req.Timeseries = append(req.Timeseries, client.PreallocTimeseries{TimeSeries: &ts})
1318+
1319+
return req
1320+
}
1321+
1322+
type mockQueryStreamServer struct {
1323+
grpc.ServerStream
1324+
ctx context.Context
1325+
}
1326+
1327+
func (m *mockQueryStreamServer) Send(response *client.QueryStreamResponse) error {
1328+
return nil
1329+
}
1330+
1331+
func (m *mockQueryStreamServer) Context() context.Context {
1332+
return m.ctx
1333+
}
1334+
1335+
func BenchmarkIngester_v2QueryStream(b *testing.B) {
1336+
// Create ingester.
1337+
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
1338+
require.NoError(b, err)
1339+
require.NoError(b, services.StartAndAwaitRunning(context.Background(), i))
1340+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
1341+
defer cleanup()
1342+
1343+
// Wait until it's ACTIVE.
1344+
test.Poll(b, 1*time.Second, ring.ACTIVE, func() interface{} {
1345+
return i.lifecycler.GetState()
1346+
})
1347+
1348+
// Push series.
1349+
ctx := user.InjectOrgID(context.Background(), userID)
1350+
1351+
const samplesCount = 1000
1352+
samples := make([]client.Sample, 0, samplesCount)
1353+
1354+
for i := 0; i < samplesCount; i++ {
1355+
samples = append(samples, client.Sample{
1356+
Value: float64(i),
1357+
TimestampMs: int64(i),
1358+
})
1359+
}
1360+
1361+
const seriesCount = 100
1362+
for s := 0; s < seriesCount; s++ {
1363+
_, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: strconv.Itoa(s)}}, samples))
1364+
require.NoError(b, err)
1365+
}
1366+
1367+
req := &client.QueryRequest{
1368+
StartTimestampMs: 0,
1369+
EndTimestampMs: samplesCount + 1,
1370+
1371+
Matchers: []*client.LabelMatcher{{
1372+
Type: client.EQUAL,
1373+
Name: model.MetricNameLabel,
1374+
Value: "foo",
1375+
}},
1376+
}
1377+
1378+
mockStream := &mockQueryStreamServer{ctx: ctx}
1379+
1380+
b.ResetTimer()
1381+
1382+
for ix := 0; ix < b.N; ix++ {
1383+
err := i.v2QueryStream(req, mockStream)
1384+
require.NoError(b, err)
1385+
}
1386+
}
1387+
12131388
func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse, *client.QueryStreamResponse) {
12141389
samples := []client.Sample{
12151390
{

0 commit comments

Comments
 (0)