Skip to content

Batch series in streaming ingester based on message sizes. #3015

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [ENHANCEMENT] Expose `storage.aws.dynamodb.backoff_config` configuration file field. #3026
* [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990
* [BUGFIX] Querier: query /series from ingesters regardless of the `-querier.query-ingesters-within` setting. #3035
* [BUGFIX] Experimental blocks storage: Ingester is less likely to hit gRPC message size limit when streaming data to queriers. #3015

## 1.3.0 in progress

Expand Down
20 changes: 13 additions & 7 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,8 @@ func createUserStats(db *userTSDB) *client.UserStatsResponse {
}
}

const queryStreamBatchMessageSize = 1 * 1024 * 1024

// v2QueryStream streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
log, ctx := spanlogger.New(stream.Context(), "v2QueryStream")
Expand Down Expand Up @@ -792,7 +794,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
}

timeseries := make([]client.TimeSeries, 0, queryStreamBatchSize)
batchSize := 0
batchSizeBytes := 0
numSamples := 0
numSeries := 0
for ss.Next() {
Expand All @@ -809,21 +811,25 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
ts.Samples = append(ts.Samples, client.Sample{Value: v, TimestampMs: t})
}
numSamples += len(ts.Samples)

timeseries = append(timeseries, ts)
numSeries++
batchSize++
if batchSize >= queryStreamBatchSize {
tsSize := ts.Size()

if (batchSizeBytes > 0 && batchSizeBytes+tsSize > queryStreamBatchMessageSize) || len(timeseries) >= queryStreamBatchSize {
// Adding this series to the batch would make it too big,
// flush the data and add it to new batch instead.
err = client.SendQueryStream(stream, &client.QueryStreamResponse{
Timeseries: timeseries,
})
if err != nil {
return err
}

batchSize = 0
batchSizeBytes = 0
timeseries = timeseries[:0]
}

timeseries = append(timeseries, ts)
batchSizeBytes += tsSize
}

// Ensure no error occurred while iterating the series set.
Expand All @@ -832,7 +838,7 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
}

// Final flush any existing metrics
if batchSize != 0 {
if batchSizeBytes != 0 {
err = client.SendQueryStream(stream, &client.QueryStreamResponse{
Timeseries: timeseries,
})
Expand Down
175 changes: 175 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http/httptest"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -1105,6 +1106,180 @@ func TestIngester_v2QueryStream(t *testing.T) {
require.Equal(t, expectedResponse, lastResp)
}

// TestIngester_v2QueryStreamManySamples verifies that v2QueryStream splits
// large results into multiple gRPC messages based on the encoded message
// size, so no single response exceeds the batch message size limit.
func TestIngester_v2QueryStreamManySamples(t *testing.T) {
	// Create ingester.
	i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
	defer cleanup()

	// Wait until it's ACTIVE.
	test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
		return i.lifecycler.GetState()
	})

	// Push series.
	ctx := user.InjectOrgID(context.Background(), userID)

	const samplesCount = 100000
	samples := make([]client.Sample, 0, samplesCount)

	for i := 0; i < samplesCount; i++ {
		samples = append(samples, client.Sample{
			Value:       float64(i),
			TimestampMs: int64(i),
		})
	}

	// 10k samples encode to around 140 KiB.
	_, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "1"}}, samples[0:10000]))
	require.NoError(t, err)

	// 100k samples encode to around 1.4 MiB.
	_, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "2"}}, samples))
	require.NoError(t, err)

	// 50k samples encode to around 716 KiB.
	_, err = i.v2Push(ctx, writeRequestSingleSeries(labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: "3"}}, samples[0:50000]))
	require.NoError(t, err)

	// Create a GRPC server used to query back the data.
	serv := grpc.NewServer(grpc.StreamInterceptor(middleware.StreamServerUserHeaderInterceptor))
	defer serv.GracefulStop()
	client.RegisterIngesterServer(serv, i)

	listener, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)

	go func() {
		// Don't use require.NoError here: it calls t.FailNow, which must only
		// be called from the goroutine running the test (see testing docs).
		// t.Error is safe from other goroutines. GracefulStop makes Serve
		// return nil, so a clean shutdown doesn't fail the test.
		if err := serv.Serve(listener); err != nil {
			t.Error(err)
		}
	}()

	// Query back the series using GRPC streaming.
	c, err := client.MakeIngesterClient(listener.Addr().String(), defaultClientTestConfig())
	require.NoError(t, err)
	defer c.Close()

	s, err := c.QueryStream(ctx, &client.QueryRequest{
		StartTimestampMs: 0,
		EndTimestampMs:   samplesCount + 1,

		Matchers: []*client.LabelMatcher{{
			Type:  client.EQUAL,
			Name:  model.MetricNameLabel,
			Value: "foo",
		}},
	})
	require.NoError(t, err)

	recvMsgs := 0
	series := 0
	totalSamples := 0

	for {
		resp, err := s.Recv()
		if err == io.EOF {
			break
		}
		require.NoError(t, err)
		require.True(t, len(resp.Timeseries) > 0) // No empty messages.

		recvMsgs++
		series += len(resp.Timeseries)

		for _, ts := range resp.Timeseries {
			totalSamples += len(ts.Samples)
		}
	}

	// As ingester doesn't guarantee sorting of series, we can get 2 (10k + 50k in first, 100k in second)
	// or 3 messages (small series first, 100k second, small series last).
	require.True(t, 2 <= recvMsgs && recvMsgs <= 3)
	require.Equal(t, 3, series)
	require.Equal(t, 10000+50000+samplesCount, totalSamples)
}

// writeRequestSingleSeries builds a WriteRequest carrying exactly one
// timeseries with the given labels and samples, sourced from the API.
func writeRequestSingleSeries(lbls labels.Labels, samples []client.Sample) *client.WriteRequest {
	series := client.TimeSeries{
		Labels:  client.FromLabelsToLabelAdapters(lbls),
		Samples: samples,
	}

	return &client.WriteRequest{
		Source:     client.API,
		Timeseries: []client.PreallocTimeseries{{TimeSeries: &series}},
	}
}

// mockQueryStreamServer is a no-op client.Ingester_QueryStreamServer used to
// exercise v2QueryStream without any real network I/O.
type mockQueryStreamServer struct {
	grpc.ServerStream
	ctx context.Context
}

// Send discards the response and always succeeds.
func (m *mockQueryStreamServer) Send(_ *client.QueryStreamResponse) error {
	return nil
}

// Context returns the context the mock was configured with.
func (m *mockQueryStreamServer) Context() context.Context {
	return m.ctx
}

// BenchmarkIngester_v2QueryStream measures the cost of streaming 100 series
// of 1000 samples each through v2QueryStream to a no-op stream server.
func BenchmarkIngester_v2QueryStream(b *testing.B) {
	// Create ingester.
	i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
	require.NoError(b, err)
	require.NoError(b, services.StartAndAwaitRunning(context.Background(), i))
	defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
	defer cleanup()

	// Wait until it's ACTIVE.
	test.Poll(b, 1*time.Second, ring.ACTIVE, func() interface{} {
		return i.lifecycler.GetState()
	})

	ctx := user.InjectOrgID(context.Background(), userID)

	// Build one batch of samples, shared by every pushed series.
	const samplesCount = 1000
	samples := make([]client.Sample, samplesCount)
	for j := range samples {
		samples[j] = client.Sample{
			Value:       float64(j),
			TimestampMs: int64(j),
		}
	}

	// Push seriesCount series, each differing only in the "l" label value.
	const seriesCount = 100
	for s := 0; s < seriesCount; s++ {
		lbls := labels.Labels{{Name: labels.MetricName, Value: "foo"}, {Name: "l", Value: strconv.Itoa(s)}}
		_, err = i.v2Push(ctx, writeRequestSingleSeries(lbls, samples))
		require.NoError(b, err)
	}

	req := &client.QueryRequest{
		StartTimestampMs: 0,
		EndTimestampMs:   samplesCount + 1,

		Matchers: []*client.LabelMatcher{{
			Type:  client.EQUAL,
			Name:  model.MetricNameLabel,
			Value: "foo",
		}},
	}

	mockStream := &mockQueryStreamServer{ctx: ctx}

	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		require.NoError(b, i.v2QueryStream(req, mockStream))
	}
}

func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse, *client.QueryStreamResponse) {
samples := []client.Sample{
{
Expand Down