Skip to content

Commit 628e5d6

Browse files
committed
Revert "Reuse write request from distributor to Ingesters (cortexproject#5193)"
This reverts commit 64b6c2b.
1 parent ec03722 commit 628e5d6

File tree

8 files changed

+25
-223
lines changed

8 files changed

+25
-223
lines changed

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,25 @@
9191
* [CHANGE] Distributor/Ingester: Log warn level on push requests when they have status code 4xx. Do not log if status is 429. #5103
9292
* [CHANGE] Tracing: Use the default OTEL trace sampler when `-tracing.otel.exporter-type` is set to `awsxray`. #5141
9393
* [CHANGE] Ingester partial error log line to debug level. #5192
94+
<<<<<<< HEAD
9495
* [CHANGE] Change HTTP status code from 503/422 to 499 if a request is canceled. #5220
9596
* [CHANGE] Store gateways summary metrics have been converted to histograms `cortex_bucket_store_series_blocks_queried`, `cortex_bucket_store_series_data_fetched`, `cortex_bucket_store_series_data_size_touched_bytes`, `cortex_bucket_store_series_data_size_fetched_bytes`, `cortex_bucket_store_series_data_touched`, `cortex_bucket_store_series_result_series` #5239
97+
=======
98+
* [ENHANCEMENT] Update Go version to 1.19.3. #4988
99+
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
100+
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
101+
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
102+
* [ENHANCEMENT] Let blocks_cleaner delete blocks concurrently(default 16 goroutines). #5028
103+
* [ENHANCEMENT] Query Frontend/Query Scheduler: Increase upper bound to 60s for queue duration histogram metric. #5029
104+
* [ENHANCEMENT] Query Frontend: Log Vertical sharding information when `query_stats_enabled` is enabled. #5037
105+
* [ENHANCEMENT] Ingester: The metadata APIs should honour `querier.query-ingesters-within` when `querier.query-store-for-labels-enabled` is true. #5027
106+
* [ENHANCEMENT] Query Frontend: Skip instant query roundtripper if sharding is not applicable. #5062
107+
* [ENHANCEMENT] Push reduce one hash operation of Labels. #4945 #5114
108+
* [ENHANCEMENT] Alertmanager: Added `-alertmanager.enabled-tenants` and `-alertmanager.disabled-tenants` to explicitly enable or disable alertmanager for specific tenants. #5116
109+
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.17`. #5132
110+
* [ENHANCEMENT] Add retry logic to S3 bucket client. #5135
111+
* [ENHANCEMENT] Update Go version to 1.20.1. #5159
112+
>>>>>>> parent of 64b6c2b73 (Reuse write request from distributor to Ingesters (#5193))
96113
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
97114
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
98115
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000

pkg/cortexpb/slicesPool.go

Lines changed: 0 additions & 61 deletions
This file was deleted.

pkg/cortexpb/slicesPool_test.go

Lines changed: 0 additions & 30 deletions
This file was deleted.

pkg/cortexpb/timeseries.go

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,6 @@ var (
3737
}
3838
},
3939
}
40-
41-
writeRequestPool = sync.Pool{
42-
New: func() interface{} {
43-
return &PreallocWriteRequest{
44-
WriteRequest: WriteRequest{},
45-
}
46-
},
47-
}
48-
bytePool = newSlicePool(20)
4940
)
5041

5142
// PreallocConfig configures how structures will be preallocated to optimise
@@ -62,7 +53,6 @@ func (PreallocConfig) RegisterFlags(f *flag.FlagSet) {
6253
// PreallocWriteRequest is a WriteRequest which preallocs slices on Unmarshal.
6354
type PreallocWriteRequest struct {
6455
WriteRequest
65-
data *[]byte
6656
}
6757

6858
// Unmarshal implements proto.Message.
@@ -82,32 +72,6 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
8272
return p.TimeSeries.Unmarshal(dAtA)
8373
}
8474

85-
func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
86-
size := p.Size()
87-
p.data = bytePool.getSlice(size)
88-
dAtA = *p.data
89-
n, err := p.MarshalToSizedBuffer(dAtA[:size])
90-
if err != nil {
91-
return nil, err
92-
}
93-
return dAtA[:n], nil
94-
}
95-
96-
func ReuseWriteRequest(req *PreallocWriteRequest) {
97-
if req.data != nil {
98-
bytePool.reuseSlice(req.data)
99-
req.data = nil
100-
}
101-
req.Source = 0
102-
req.Metadata = nil
103-
req.Timeseries = nil
104-
writeRequestPool.Put(req)
105-
}
106-
107-
func PreallocWriteRequestFromPool() *PreallocWriteRequest {
108-
return writeRequestPool.Get().(*PreallocWriteRequest)
109-
}
110-
11175
// LabelAdapter is a labels.Label that can be marshalled to/from protos.
11276
type LabelAdapter labels.Label
11377

pkg/cortexpb/timeseries_test.go

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package cortexpb
22

33
import (
4-
"fmt"
54
"testing"
65

7-
"github.com/gogo/protobuf/proto"
86
"github.com/stretchr/testify/assert"
97
"github.com/stretchr/testify/require"
108
)
@@ -66,67 +64,3 @@ func TestTimeseriesFromPool(t *testing.T) {
6664
assert.Len(t, reused.Samples, 0)
6765
})
6866
}
69-
70-
func BenchmarkMarshallWriteRequest(b *testing.B) {
71-
ts := PreallocTimeseriesSliceFromPool()
72-
73-
for i := 0; i < 100; i++ {
74-
ts = append(ts, PreallocTimeseries{TimeSeries: TimeseriesFromPool()})
75-
ts[i].Labels = []LabelAdapter{
76-
{Name: "foo", Value: "bar"},
77-
{Name: "very long label name", Value: "very long label value"},
78-
{Name: "very long label name 2", Value: "very long label value 2"},
79-
{Name: "very long label name 3", Value: "very long label value 3"},
80-
{Name: "int", Value: fmt.Sprint(i)},
81-
}
82-
ts[i].Samples = []Sample{{Value: 1, TimestampMs: 2}}
83-
}
84-
85-
tests := []struct {
86-
name string
87-
writeRequestFactory func() proto.Marshaler
88-
clean func(in interface{})
89-
}{
90-
{
91-
name: "no-pool",
92-
writeRequestFactory: func() proto.Marshaler {
93-
return &WriteRequest{Timeseries: ts}
94-
},
95-
clean: func(in interface{}) {},
96-
},
97-
{
98-
name: "byte pool",
99-
writeRequestFactory: func() proto.Marshaler {
100-
w := &PreallocWriteRequest{}
101-
w.Timeseries = ts
102-
return w
103-
},
104-
clean: func(in interface{}) {
105-
ReuseWriteRequest(in.(*PreallocWriteRequest))
106-
},
107-
},
108-
{
109-
name: "byte and write pool",
110-
writeRequestFactory: func() proto.Marshaler {
111-
w := PreallocWriteRequestFromPool()
112-
w.Timeseries = ts
113-
return w
114-
},
115-
clean: func(in interface{}) {
116-
ReuseWriteRequest(in.(*PreallocWriteRequest))
117-
},
118-
},
119-
}
120-
121-
for _, tc := range tests {
122-
b.Run(tc.name, func(b *testing.B) {
123-
for i := 0; i < b.N; i++ {
124-
w := tc.writeRequestFactory()
125-
_, err := w.Marshal()
126-
require.NoError(b, err)
127-
tc.clean(w)
128-
}
129-
b.ReportAllocs()
130-
})
131-
}
132-
}

pkg/distributor/distributor.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -869,20 +869,14 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
869869
if err != nil {
870870
return err
871871
}
872-
c := h.(ingester_client.HealthAndIngesterClient)
872+
c := h.(ingester_client.IngesterClient)
873873

874-
req := cortexpb.PreallocWriteRequestFromPool()
875-
req.Timeseries = timeseries
876-
req.Metadata = metadata
877-
req.Source = source
878-
879-
_, err = c.PushPreAlloc(ctx, req)
880-
881-
// We should not reuse the req in case of errors:
882-
// See: https://github.com/grpc/grpc-go/issues/6355
883-
if err == nil {
884-
cortexpb.ReuseWriteRequest(req)
874+
req := cortexpb.WriteRequest{
875+
Timeseries: timeseries,
876+
Metadata: metadata,
877+
Source: source,
885878
}
879+
_, err = c.Push(ctx, &req)
886880

887881
if len(metadata) > 0 {
888882
d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()

pkg/distributor/distributor_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2689,10 +2689,6 @@ func (i *mockIngester) Close() error {
26892689
return nil
26902690
}
26912691

2692-
func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
2693-
return i.Push(ctx, &in.WriteRequest, opts...)
2694-
}
2695-
26962692
func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
26972693
i.Lock()
26982694
defer i.Unlock()

pkg/ingester/client/client.go

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
package client
22

33
import (
4-
"context"
54
"flag"
65

7-
"github.com/cortexproject/cortex/pkg/cortexpb"
8-
"github.com/cortexproject/cortex/pkg/util/grpcclient"
9-
106
"github.com/go-kit/log"
117
"github.com/prometheus/client_golang/prometheus"
128
"github.com/prometheus/client_golang/prometheus/promauto"
139
"google.golang.org/grpc"
1410
"google.golang.org/grpc/health/grpc_health_v1"
11+
12+
"github.com/cortexproject/cortex/pkg/util/grpcclient"
1513
)
1614

1715
var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
@@ -26,7 +24,6 @@ type HealthAndIngesterClient interface {
2624
IngesterClient
2725
grpc_health_v1.HealthClient
2826
Close() error
29-
PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error)
3027
}
3128

3229
type closableHealthAndIngesterClient struct {
@@ -35,15 +32,6 @@ type closableHealthAndIngesterClient struct {
3532
conn *grpc.ClientConn
3633
}
3734

38-
func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
39-
out := new(cortexpb.WriteResponse)
40-
err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...)
41-
if err != nil {
42-
return nil, err
43-
}
44-
return out, nil
45-
}
46-
4735
// MakeIngesterClient makes a new IngesterClient
4836
func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) {
4937
dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration))

0 commit comments

Comments
 (0)