
Commit c53086d

[receiver/kafkareceiver] Make calls to error backoff thread-safe, add logging (#38941)
#### Description

The implementation of https://pkg.go.dev/github.com/cenkalti/backoff/v4#ExponentialBackOff is not thread-safe. When error backoff is enabled in the kafka receiver, reading from a topic with multiple partitions under high load can trigger a data race. This PR protects access to the backoff instance with a mutex to make it thread-safe, and adds log messages so the backoff behavior is visible.

#### Testing

Start Kafka locally:

```
docker run -p 9092:9092 apache/kafka:3.9.0
```

Scale the `otlp_logs` topic to 8 partitions:

```
./kafka-topics --alter --partitions 8 --topic otlp_logs --bootstrap-server localhost:9092
```

Run the collector locally with the following config:

```
# This is a sample collector configuration file. It is also used when running the collector via make run.
receivers:
  kafka:
    encoding: text_utf-8
    error_backoff:
      enabled: true
      initial_interval: 500ms
      max_interval: 10s
      multiplier: 1.5
      randomization_factor: 0
      max_elapsed_time: 1m

processors:
  memory_limiter:
    check_interval: 1s
    limit_mib: 20
    spike_limit_mib: 0
  batch:

exporters:
  debug:

service:
  pipelines:
    logs:
      receivers:
        - kafka
      processors:
        - memory_limiter
        - batch
      exporters:
        - debug
```

Send messages to the Kafka topic:

```
./kafka-producer-perf-test \
  --topic otlp_logs \
  --throughput 100000 \
  --num-records 1000000 \
  --record-size 5024 \
  --producer-props bootstrap.servers=localhost:9092
```

The new log messages make the backoff behavior easier to observe.
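The core of the fix is serializing access to the shared `*backoff.ExponentialBackOff`: every `NextBackOff()` and `Reset()` call goes through one mutex shared by all partition claims. The sketch below illustrates that pattern in isolation; `guardedBackOff` and its method names are hypothetical, not the receiver's actual types (the real change is in `kafka_receiver.go`, shown in the diff below).

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// guardedBackOff is a hypothetical wrapper used only for this sketch: it
// serializes access to a shared *backoff.ExponentialBackOff so concurrent
// consumers cannot race on its internal state.
type guardedBackOff struct {
	mu      sync.Mutex
	backOff *backoff.ExponentialBackOff
}

// next returns the next delay while holding the mutex.
func (g *guardedBackOff) next() time.Duration {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.backOff.NextBackOff()
}

// reset restarts the backoff schedule while holding the mutex.
func (g *guardedBackOff) reset() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.backOff.Reset()
}

func main() {
	g := &guardedBackOff{backOff: backoff.NewExponentialBackOff()}

	// Simulate 8 partitions hitting consumer errors concurrently.
	// Running this with `go run -race` reports no data race, unlike calling
	// NextBackOff()/Reset() on the shared object directly.
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(partition int) {
			defer wg.Done()
			for j := 0; j < 3; j++ {
				fmt.Printf("partition %d: next delay %v\n", partition, g.next())
			}
			g.reset()
		}(i)
	}
	wg.Wait()
}
```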
1 parent fa9afe8 commit c53086d

2 files changed, +93 -6 lines changed
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: kafkareceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: make calls to error backoff thread-safe and add logging
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [38941]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 66 additions & 6 deletions
@@ -460,6 +460,7 @@ type tracesConsumerGroupHandler struct {
 	messageMarking  MessageMarking
 	headerExtractor HeaderExtractor
 	backOff         *backoff.ExponentialBackOff
+	backOffMutex    sync.Mutex
 }
 
 type metricsConsumerGroupHandler struct {
@@ -478,6 +479,7 @@ type metricsConsumerGroupHandler struct {
 	messageMarking  MessageMarking
 	headerExtractor HeaderExtractor
 	backOff         *backoff.ExponentialBackOff
+	backOffMutex    sync.Mutex
 }
 
 type logsConsumerGroupHandler struct {
@@ -496,6 +498,7 @@ type logsConsumerGroupHandler struct {
 	messageMarking  MessageMarking
 	headerExtractor HeaderExtractor
 	backOff         *backoff.ExponentialBackOff
+	backOffMutex    sync.Mutex
 }
 
 var (
@@ -561,8 +564,13 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
 			c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
 			if err != nil {
 				if errorRequiresBackoff(err) && c.backOff != nil {
-					backOffDelay := c.backOff.NextBackOff()
+					backOffDelay := c.getNextBackoff()
 					if backOffDelay != backoff.Stop {
+						c.logger.Info("Backing off due to error from the next consumer.",
+							zap.Error(err),
+							zap.Duration("delay", backOffDelay),
+							zap.String("topic", message.Topic),
+							zap.Int32("partition", claim.Partition()))
 						select {
 						case <-session.Context().Done():
 							return nil
@@ -574,14 +582,16 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
 							return err
 						}
 					}
+					c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached",
+						zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime))
 				}
 				if c.messageMarking.After && c.messageMarking.OnError {
 					session.MarkMessage(message, "")
 				}
 				return err
 			}
 			if c.backOff != nil {
-				c.backOff.Reset()
+				c.resetBackoff()
 			}
 			if c.messageMarking.After {
 				session.MarkMessage(message, "")
@@ -599,6 +609,18 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
 	}
 }
 
+func (c *tracesConsumerGroupHandler) getNextBackoff() time.Duration {
+	c.backOffMutex.Lock()
+	defer c.backOffMutex.Unlock()
+	return c.backOff.NextBackOff()
+}
+
+func (c *tracesConsumerGroupHandler) resetBackoff() {
+	c.backOffMutex.Lock()
+	defer c.backOffMutex.Unlock()
+	c.backOff.Reset()
+}
+
 func (c *metricsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
 	c.readyCloser.Do(func() {
 		close(c.ready)
@@ -656,8 +678,13 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
 			c.obsrecv.EndMetricsOp(ctx, c.unmarshaler.Encoding(), dataPointCount, err)
 			if err != nil {
 				if errorRequiresBackoff(err) && c.backOff != nil {
-					backOffDelay := c.backOff.NextBackOff()
+					backOffDelay := c.getNextBackoff()
 					if backOffDelay != backoff.Stop {
+						c.logger.Info("Backing off due to error from the next consumer.",
+							zap.Error(err),
+							zap.Duration("delay", backOffDelay),
+							zap.String("topic", message.Topic),
+							zap.Int32("partition", claim.Partition()))
 						select {
 						case <-session.Context().Done():
 							return nil
@@ -669,14 +696,16 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
 							return err
 						}
 					}
+					c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached",
+						zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime))
 				}
 				if c.messageMarking.After && c.messageMarking.OnError {
 					session.MarkMessage(message, "")
 				}
 				return err
 			}
 			if c.backOff != nil {
-				c.backOff.Reset()
+				c.resetBackoff()
 			}
 			if c.messageMarking.After {
 				session.MarkMessage(message, "")
@@ -694,6 +723,18 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
 	}
 }
 
+func (c *metricsConsumerGroupHandler) getNextBackoff() time.Duration {
+	c.backOffMutex.Lock()
+	defer c.backOffMutex.Unlock()
+	return c.backOff.NextBackOff()
+}
+
+func (c *metricsConsumerGroupHandler) resetBackoff() {
+	c.backOffMutex.Lock()
+	defer c.backOffMutex.Unlock()
+	c.backOff.Reset()
+}
+
 func (c *logsConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
 	c.readyCloser.Do(func() {
 		close(c.ready)
@@ -750,8 +791,13 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
 			c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logRecordCount, err)
 			if err != nil {
 				if errorRequiresBackoff(err) && c.backOff != nil {
-					backOffDelay := c.backOff.NextBackOff()
+					backOffDelay := c.getNextBackoff()
 					if backOffDelay != backoff.Stop {
+						c.logger.Info("Backing off due to error from the next consumer.",
+							zap.Error(err),
+							zap.Duration("delay", backOffDelay),
+							zap.String("topic", message.Topic),
+							zap.Int32("partition", claim.Partition()))
 						select {
 						case <-session.Context().Done():
 							return nil
@@ -763,14 +809,16 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
 							return err
 						}
 					}
+					c.logger.Info("Stop error backoff because the configured max_elapsed_time is reached",
+						zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime))
 				}
 				if c.messageMarking.After && c.messageMarking.OnError {
 					session.MarkMessage(message, "")
 				}
 				return err
 			}
 			if c.backOff != nil {
-				c.backOff.Reset()
+				c.resetBackoff()
 			}
 			if c.messageMarking.After {
 				session.MarkMessage(message, "")
@@ -788,6 +836,18 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
 	}
 }
 
+func (c *logsConsumerGroupHandler) getNextBackoff() time.Duration {
+	c.backOffMutex.Lock()
+	defer c.backOffMutex.Unlock()
+	return c.backOff.NextBackOff()
+}
+
+func (c *logsConsumerGroupHandler) resetBackoff() {
+	c.backOffMutex.Lock()
+	defer c.backOffMutex.Unlock()
+	c.backOff.Reset()
+}
+
 func newExponentialBackOff(config configretry.BackOffConfig) *backoff.ExponentialBackOff {
 	if !config.Enabled {
 		return nil
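The last hunk ends at the top of `newExponentialBackOff`, which builds the shared `*backoff.ExponentialBackOff` from the receiver's `error_backoff` settings. As a hedged illustration of how such a constructor can wire `configretry.BackOffConfig` into backoff/v4 (field names are the libraries' public ones; this sketch is not the function's actual, truncated body):

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.opentelemetry.io/collector/config/configretry"
)

// newBackOffSketch shows how error_backoff settings could be copied from
// configretry.BackOffConfig onto a backoff.ExponentialBackOff. It is an
// illustration only, not the receiver's actual newExponentialBackOff body.
func newBackOffSketch(cfg configretry.BackOffConfig) *backoff.ExponentialBackOff {
	if !cfg.Enabled {
		return nil
	}
	b := backoff.NewExponentialBackOff()
	b.InitialInterval = cfg.InitialInterval
	b.RandomizationFactor = cfg.RandomizationFactor
	b.Multiplier = cfg.Multiplier
	b.MaxInterval = cfg.MaxInterval
	b.MaxElapsedTime = cfg.MaxElapsedTime
	b.Reset()
	return b
}

func main() {
	// Same values as the error_backoff block in the test config above.
	b := newBackOffSketch(configretry.BackOffConfig{
		Enabled:             true,
		InitialInterval:     500 * time.Millisecond,
		RandomizationFactor: 0,
		Multiplier:          1.5,
		MaxInterval:         10 * time.Second,
		MaxElapsedTime:      time.Minute,
	})
	for i := 0; i < 6; i++ {
		// Delays grow by 1.5x up to MaxInterval; NextBackOff returns
		// backoff.Stop once MaxElapsedTime has passed.
		fmt.Println(b.NextBackOff())
	}
}
```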

0 commit comments