Commit ca00ad8

implement error backoff in metrics and logs pipelines
1 parent 78a77fe commit ca00ad8

2 files changed: +145 −65 lines changed

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 39 additions & 15 deletions
@@ -224,20 +224,6 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
 	return nil
 }
 
-func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
-	if !config.Enabled {
-		return nil
-	}
-	backOff := backoff.NewExponentialBackOff()
-	backOff.InitialInterval = config.InitialInterval
-	backOff.RandomizationFactor = config.RandomizationFactor
-	backOff.Multiplier = config.Multiplier
-	backOff.MaxInterval = config.MaxInterval
-	backOff.MaxElapsedTime = config.MaxElapsedTime
-	backOff.Reset()
-	return backOff
-}
-
 func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) {
 	defer c.consumeLoopWG.Done()
 	for {

@@ -333,6 +319,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
 		messageMarking:   c.messageMarking,
 		headerExtractor:  &nopHeaderExtractor{},
 		telemetryBuilder: c.telemetryBuilder,
+		backOff:          newExponentialBackOff(c.config.ErrorBackOff),
 	}
 	if c.headerExtraction {
 		metricsConsumerGroup.headerExtractor = &headerExtractor{

@@ -444,6 +431,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
 		messageMarking:   c.messageMarking,
 		headerExtractor:  &nopHeaderExtractor{},
 		telemetryBuilder: c.telemetryBuilder,
+		backOff:          newExponentialBackOff(c.config.ErrorBackOff),
 	}
 	if c.headerExtraction {
 		logsConsumerGroup.headerExtractor = &headerExtractor{

@@ -519,6 +507,7 @@ type metricsConsumerGroupHandler struct {
 	autocommitEnabled bool
 	messageMarking    MessageMarking
 	headerExtractor   HeaderExtractor
+	backOff           *backoff.ExponentialBackOff
 }
 
 type logsConsumerGroupHandler struct {

@@ -536,6 +525,7 @@ type logsConsumerGroupHandler struct {
 	autocommitEnabled bool
 	messageMarking    MessageMarking
 	headerExtractor   HeaderExtractor
+	backOff           *backoff.ExponentialBackOff
 }
 
 var (

@@ -632,7 +622,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
 }
 
 func errorRequiresBackoff(err error) bool {
-	return err.Error() == errMemoryLimiterDataRefused.Error()
+	return errors.Is(err, errMemoryLimiterDataRefused)
 }
 
 func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {

@@ -694,8 +684,18 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
 				if c.messageMarking.After && c.messageMarking.OnError {
 					session.MarkMessage(message, "")
 				}
+				if errorRequiresBackoff(err) && c.backOff != nil {
+					select {
+					case <-session.Context().Done():
+						return nil
+					case <-time.After(c.backOff.NextBackOff()):
+					}
+				}
 				return err
 			}
+			if c.backOff != nil {
+				c.backOff.Reset()
+			}
 			if c.messageMarking.After {
 				session.MarkMessage(message, "")
 			}

@@ -770,8 +770,18 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
 				if c.messageMarking.After && c.messageMarking.OnError {
 					session.MarkMessage(message, "")
 				}
+				if errorRequiresBackoff(err) && c.backOff != nil {
+					select {
+					case <-session.Context().Done():
+						return nil
+					case <-time.After(c.backOff.NextBackOff()):
+					}
+				}
 				return err
 			}
+			if c.backOff != nil {
+				c.backOff.Reset()
+			}
 			if c.messageMarking.After {
 				session.MarkMessage(message, "")
 			}

@@ -788,6 +798,20 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
 	}
 }
 
+func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
+	if !config.Enabled {
+		return nil
+	}
+	backOff := backoff.NewExponentialBackOff()
+	backOff.InitialInterval = config.InitialInterval
+	backOff.RandomizationFactor = config.RandomizationFactor
+	backOff.Multiplier = config.Multiplier
+	backOff.MaxInterval = config.MaxInterval
+	backOff.MaxElapsedTime = config.MaxElapsedTime
+	backOff.Reset()
+	return backOff
+}
+
 func toSaramaInitialOffset(initialOffset string) (int64, error) {
 	switch initialOffset {
 	case offsetEarliest:
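
The receiver change above follows one pattern in both the metrics and logs handlers: if the next consumer rejects a message with the memory-limiter refusal error, wait out the next exponential-backoff interval (unless the session context is cancelled) before propagating the error, and reset the backoff after any successful delivery. The sketch below illustrates that pattern with the same github.com/cenkalti/backoff/v4 API; processMessage, errRefused, and the interval values are illustrative stand-ins rather than code from this repository, and unlike the real handler the loop continues after an error so the Reset call is also exercised.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// errRefused stands in for the receiver's errMemoryLimiterDataRefused.
var errRefused = errors.New("data refused due to high memory usage")

// processMessage simulates handing a message to the next consumer; the first
// few attempts fail the way a temporarily overloaded memory limiter would.
func processMessage(attempt int) error {
	if attempt < 3 {
		return errRefused
	}
	return nil
}

func main() {
	// Mirror what newExponentialBackOff builds from configretry.BackOffConfig,
	// but with hard-coded example values.
	backOff := backoff.NewExponentialBackOff()
	backOff.InitialInterval = 100 * time.Millisecond
	backOff.RandomizationFactor = 0 // deterministic delays for the example
	backOff.Multiplier = 2
	backOff.MaxInterval = time.Second
	backOff.Reset()

	ctx := context.Background()
	for attempt := 0; ; attempt++ {
		if err := processMessage(attempt); err != nil {
			if errors.Is(err, errRefused) {
				// Wait out the next backoff interval, unless the context
				// (the consumer group session in the real code) is cancelled.
				select {
				case <-ctx.Done():
					return
				case <-time.After(backOff.NextBackOff()):
				}
			}
			fmt.Println("attempt", attempt, "failed:", err)
			continue // the real handler returns err here; we loop to show Reset
		}
		backOff.Reset() // success: the next failure starts from the initial interval again
		fmt.Println("message processed on attempt", attempt)
		return
	}
}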

receiver/kafkareceiver/kafka_receiver_test.go

Lines changed: 106 additions & 50 deletions
@@ -713,34 +713,62 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
 	consumerError := errors.New("failed to consume")
 	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
 	require.NoError(t, err)
-	c := metricsConsumerGroupHandler{
-		unmarshaler:      newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
-		logger:           zap.NewNop(),
-		ready:            make(chan bool),
-		nextConsumer:     consumertest.NewErr(consumerError),
-		obsrecv:          obsrecv,
-		headerExtractor:  &nopHeaderExtractor{},
-		telemetryBuilder: nopTelemetryBuilder(t),
-	}
 
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	groupClaim := &testConsumerGroupClaim{
-		messageChan: make(chan *sarama.ConsumerMessage),
+	tests := []struct {
+		name            string
+		err             error
+		expectedBackoff time.Duration
+	}{
+		{
+			name:            "memory limiter data refused error",
+			err:             errMemoryLimiterDataRefused,
+			expectedBackoff: backoff.DefaultInitialInterval,
+		},
+		{
+			name:            "consumer error that does not require backoff",
+			err:             consumerError,
+			expectedBackoff: 0,
+		},
 	}
-	go func() {
-		e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
-		assert.EqualError(t, e, consumerError.Error())
-		wg.Done()
-	}()
 
-	ld := testdata.GenerateMetrics(1)
-	unmarshaler := &pmetric.ProtoMarshaler{}
-	bts, err := unmarshaler.MarshalMetrics(ld)
-	require.NoError(t, err)
-	groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
-	close(groupClaim.messageChan)
-	wg.Wait()
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			backOff := backoff.NewExponentialBackOff()
+			backOff.RandomizationFactor = 0
+			c := metricsConsumerGroupHandler{
+				unmarshaler:      newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding),
+				logger:           zap.NewNop(),
+				ready:            make(chan bool),
+				nextConsumer:     consumertest.NewErr(tt.err),
+				obsrecv:          obsrecv,
+				headerExtractor:  &nopHeaderExtractor{},
+				telemetryBuilder: nopTelemetryBuilder(t),
+				backOff:          backOff,
+			}
+
+			wg := sync.WaitGroup{}
+			wg.Add(1)
+			groupClaim := &testConsumerGroupClaim{
+				messageChan: make(chan *sarama.ConsumerMessage),
+			}
+			go func() {
+				start := time.Now()
+				e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
+				end := time.Now()
+				assert.EqualError(t, e, tt.err.Error())
+				assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond)
+				wg.Done()
+			}()
+
+			ld := testdata.GenerateMetrics(1)
+			unmarshaler := &pmetric.ProtoMarshaler{}
+			bts, err := unmarshaler.MarshalMetrics(ld)
+			require.NoError(t, err)
+			groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
+			close(groupClaim.messageChan)
+			wg.Wait()
+		})
+	}
 }
 
 func TestMetricsReceiver_encoding_extension(t *testing.T) {

@@ -1072,34 +1100,62 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) {
 	consumerError := errors.New("failed to consume")
 	obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopSettings()})
 	require.NoError(t, err)
-	c := logsConsumerGroupHandler{
-		unmarshaler:      newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
-		logger:           zap.NewNop(),
-		ready:            make(chan bool),
-		nextConsumer:     consumertest.NewErr(consumerError),
-		obsrecv:          obsrecv,
-		headerExtractor:  &nopHeaderExtractor{},
-		telemetryBuilder: nopTelemetryBuilder(t),
-	}
 
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	groupClaim := &testConsumerGroupClaim{
-		messageChan: make(chan *sarama.ConsumerMessage),
+	tests := []struct {
+		name            string
+		err             error
+		expectedBackoff time.Duration
+	}{
+		{
+			name:            "memory limiter data refused error",
+			err:             errMemoryLimiterDataRefused,
+			expectedBackoff: backoff.DefaultInitialInterval,
+		},
+		{
+			name:            "consumer error that does not require backoff",
+			err:             consumerError,
+			expectedBackoff: 0,
+		},
 	}
-	go func() {
-		e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
-		assert.EqualError(t, e, consumerError.Error())
-		wg.Done()
-	}()
 
-	ld := testdata.GenerateLogs(1)
-	unmarshaler := &plog.ProtoMarshaler{}
-	bts, err := unmarshaler.MarshalLogs(ld)
-	require.NoError(t, err)
-	groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
-	close(groupClaim.messageChan)
-	wg.Wait()
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			backOff := backoff.NewExponentialBackOff()
+			backOff.RandomizationFactor = 0
+			c := logsConsumerGroupHandler{
+				unmarshaler:      newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding),
+				logger:           zap.NewNop(),
+				ready:            make(chan bool),
+				nextConsumer:     consumertest.NewErr(tt.err),
+				obsrecv:          obsrecv,
+				headerExtractor:  &nopHeaderExtractor{},
+				telemetryBuilder: nopTelemetryBuilder(t),
+				backOff:          backOff,
+			}
+
+			wg := sync.WaitGroup{}
+			wg.Add(1)
+			groupClaim := &testConsumerGroupClaim{
+				messageChan: make(chan *sarama.ConsumerMessage),
+			}
+			go func() {
+				start := time.Now()
+				e := c.ConsumeClaim(testConsumerGroupSession{ctx: context.Background()}, groupClaim)
+				end := time.Now()
+				assert.EqualError(t, e, tt.err.Error())
+				assert.WithinDuration(t, start.Add(tt.expectedBackoff), end, 100*time.Millisecond)
+				wg.Done()
+			}()
+
+			ld := testdata.GenerateLogs(1)
+			unmarshaler := &plog.ProtoMarshaler{}
+			bts, err := unmarshaler.MarshalLogs(ld)
+			require.NoError(t, err)
+			groupClaim.messageChan <- &sarama.ConsumerMessage{Value: bts}
+			close(groupClaim.messageChan)
+			wg.Wait()
+		})
+	}
 }
 
 // Test unmarshaler for different charsets and encodings.
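
The tests above are reworked into table-driven form so the same ConsumeClaim path runs once with an error that triggers backoff (errMemoryLimiterDataRefused) and once with one that does not, asserting the elapsed time against the expected delay. Zeroing RandomizationFactor makes the first backoff interval deterministic, which is what makes the timing assertion reliable. Below is a minimal standalone sketch of that technique, with a hypothetical package and test name not taken from the repository:

package backoffcheck

import (
	"testing"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/stretchr/testify/assert"
)

// TestBlocksForFirstInterval shows the trick used by the receiver tests:
// with RandomizationFactor set to 0, the first NextBackOff() equals
// backoff.DefaultInitialInterval exactly, so wall-clock timing can be asserted.
func TestBlocksForFirstInterval(t *testing.T) {
	backOff := backoff.NewExponentialBackOff()
	backOff.RandomizationFactor = 0

	start := time.Now()
	<-time.After(backOff.NextBackOff()) // stand-in for the blocking ConsumeClaim call under test
	end := time.Now()

	// Allow the same 100ms of scheduling slack the receiver tests allow.
	assert.WithinDuration(t, start.Add(backoff.DefaultInitialInterval), end, 100*time.Millisecond)
}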
