Skip to content

[receiver/kafkareceiver] Make calls to error backoff thread-safe, add logging #38941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 25, 2025

Conversation

yiquanzhou
Copy link
Contributor

Description

The implementation of https://pkg.go.dev/github.com/cenkalti/backoff/v4#ExponentialBackOff is not thread-safe.
When error backoff of kafka receiver is enabled, reading from a topic with multiple partitions under high load could cause data race error.

This PR protect access to backoff with a mutex to make it thread-safe.

Testing

Start a kafka locally

docker run -p 9092:9092 apache/kafka:3.9.0

Scale otlp_logs topic to 8 partitions

./kafka-topics --alter --partitions 8 --topic otlp_logs --bootstrap-server localhost:9092

Run the collector locally with the following config

# This is a sample collector configuration file. It is also used when running the collector via make run.

receivers:
  kafka:
    encoding: text_utf-8
    error_backoff:
      enabled: true
      initial_interval: 500ms
      max_interval: 10s
      multiplier: 1.5
      randomization_factor: 0
      max_elapsed_time: 1m

processors:
  memory_limiter:
    check_interval: 1s
    limit_mib: 20
    spike_limit_mib: 0

  batch:

exporters:
  debug:

service:
    logs:
      receivers:
      - kafka
      processors:
      - memory_limiter
      - batch
      exporters:
      - debug

Send messages to the kafka topic

./kafka-producer-perf-test \
  --topic otlp_logs \
  --throughput 100000 \
  --num-records 1000000 \
  --record-size 5024 \
  --producer-props bootstrap.servers=localhost:9092

From the log messages we can better observe the backoff behavior

Copy link
Contributor

@axw axw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. We should probably add some concurrency tests. I'll take a look into that once #38882 is in.

Copy link
Contributor

@atoulme atoulme left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved by codeowners. CI passes. Merging.

@atoulme atoulme merged commit c53086d into open-telemetry:main Mar 25, 2025
171 checks passed
@github-actions github-actions bot added this to the next release milestone Mar 25, 2025
Fiery-Fenix pushed a commit to Fiery-Fenix/opentelemetry-collector-contrib that referenced this pull request Apr 24, 2025
… logging (open-telemetry#38941)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
The implementation of
https://pkg.go.dev/github.com/cenkalti/backoff/v4#ExponentialBackOff is
not thread-safe.
When error backoff of kafka receiver is enabled, reading from a topic
with multiple partitions under high load could cause data race error.

This PR protect access to backoff with a mutex to make it thread-safe.

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Start a kafka locally
```
docker run -p 9092:9092 apache/kafka:3.9.0
```

Scale `otlp_logs` topic to 8 partitions
```
./kafka-topics --alter --partitions 8 --topic otlp_logs --bootstrap-server localhost:9092
```

Run the collector locally with the following config
```
# This is a sample collector configuration file. It is also used when running the collector via make run.

receivers:
  kafka:
    encoding: text_utf-8
    error_backoff:
      enabled: true
      initial_interval: 500ms
      max_interval: 10s
      multiplier: 1.5
      randomization_factor: 0
      max_elapsed_time: 1m

processors:
  memory_limiter:
    check_interval: 1s
    limit_mib: 20
    spike_limit_mib: 0

  batch:

exporters:
  debug:

service:
    logs:
      receivers:
      - kafka
      processors:
      - memory_limiter
      - batch
      exporters:
      - debug
```

Send messages to the kafka topic
```
./kafka-producer-perf-test \
  --topic otlp_logs \
  --throughput 100000 \
  --num-records 1000000 \
  --record-size 5024 \
  --producer-props bootstrap.servers=localhost:9092
```

From the log messages we can better observe the backoff behavior
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants