Skip to content

Commit 6bc0201

Browse files
axw and MovieStoreGuy authored
kafkareceiver: connect async, report status (#40705)
#### Description Make consumer group creation asynchronous (i.e. move it to background goroutine), and report component status. This change ensures that the collector can start up if the Kafka cluster is not available or the consumer group creation fails due to some other transient, retryable error. We also now report the component status to indicate if and when the consumer group has been successfully created, and whether the receiver is ready to consume messages. This can be observed via the healthcheckv2 extension, e.g. for health/readiness probes. #### Link to tracking issue Fixes #40516 #### Testing First, I set up the collector with the following config: ```yaml extensions: healthcheckv2: use_v2: true component_health: include_permanent_errors: true include_recoverable_errors: true recovery_duration: 30s http: endpoint: localhost:13133 status: enabled: true path: /health/status config: enabled: true path: /health/config receivers: kafka: brokers: [localhost:9092] exporters: debug: verbosity: detailed service: extensions: [healthcheckv2] pipelines: metrics/kafka: receivers: [kafka] exporters: [debug] ``` Then: 1. Without Kafka running locally, start the collector 2. Running `curl 'http://localhost:13133/health/status?verbose' | jq .`, note that the receiver's status moves to StatusRecoverableError and reports an error like "kafka: client has run out of available brokers to talk to: dial tcp ..." 3. Start Kafka: `docker run --rm -p 9092:9092 --name broker apache/kafka:4.0.0` 4. Continue curling the status, and observe it moves to StatusOK 5. Stop Kafka, observe status moves back to StatusRecoverableError #### Documentation No changes --------- Co-authored-by: Sean Marciniak <[email protected]>
1 parent f51e4d2 commit 6bc0201

File tree

6 files changed

+215
-36
lines changed

6 files changed

+215
-36
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkareceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Create the consumer group asynchronously, and report component status
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40516]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
This change ensures that the collector can start up if the Kafka cluster is not available
20+
or the consumer group creation fails due to some other transient, retryable error.
21+
22+
We also now report the component status to indicate if and when the consumer group has been
23+
successfully created, and whether the receiver is ready to consume messages. This can be
24+
observed via the healthcheckv2 extension.
25+
26+
# If your change doesn't affect end users or the exported elements of any package,
27+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
28+
# Optional: The change log or logs in which this entry should be included.
29+
# e.g. '[user]' or '[user, api]'
30+
# Include 'user' if the change is relevant to end users.
31+
# Include 'api' if there is a change to a library API.
32+
# Default: '[user]'
33+
change_logs: [user]

receiver/kafkareceiver/consumer_sarama.go

Lines changed: 76 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/IBM/sarama"
1414
"github.com/cenkalti/backoff/v4"
1515
"go.opentelemetry.io/collector/component"
16+
"go.opentelemetry.io/collector/component/componentstatus"
1617
"go.opentelemetry.io/collector/consumer/consumererror"
1718
"go.opentelemetry.io/collector/receiver"
1819
"go.opentelemetry.io/collector/receiver/receiverhelper"
@@ -54,8 +55,8 @@ type saramaConsumer struct {
5455
mu sync.Mutex
5556
started bool
5657
shutdown bool
58+
closing chan struct{}
5759
consumeLoopClosed chan struct{}
58-
consumerGroup sarama.ConsumerGroup
5960
}
6061

6162
func (c *saramaConsumer) Start(_ context.Context, host component.Host) error {
@@ -77,20 +78,10 @@ func (c *saramaConsumer) Start(_ context.Context, host component.Host) error {
7778
return err
7879
}
7980

80-
consumerGroup, err := kafka.NewSaramaConsumerGroup(
81-
context.Background(),
82-
c.config.ClientConfig,
83-
c.config.ConsumerConfig,
84-
)
85-
if err != nil {
86-
return err
87-
}
88-
c.consumerGroup = consumerGroup
89-
9081
handler := &consumerGroupHandler{
82+
host: host,
9183
id: c.settings.ID,
9284
logger: c.settings.Logger,
93-
ready: make(chan bool),
9485
obsrecv: obsrecv,
9586
autocommitEnabled: c.config.AutoCommit.Enable,
9687
messageMarking: c.config.MessageMarking,
@@ -105,42 +96,89 @@ func (c *saramaConsumer) Start(_ context.Context, host component.Host) error {
10596

10697
c.consumeLoopClosed = make(chan struct{})
10798
c.started = true
108-
go c.consumeLoop(handler)
99+
c.closing = make(chan struct{})
100+
go c.consumeLoop(handler, host)
109101
return nil
110102
}
111103

112-
func (c *saramaConsumer) consumeLoop(handler sarama.ConsumerGroupHandler) {
104+
func (c *saramaConsumer) consumeLoop(handler sarama.ConsumerGroupHandler, host component.Host) {
113105
defer close(c.consumeLoopClosed)
106+
defer componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusStopped))
107+
componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusStarting))
108+
109+
ctx, cancel := context.WithCancel(context.Background())
110+
defer cancel()
111+
go func() {
112+
select {
113+
case <-ctx.Done():
114+
case <-c.closing:
115+
componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusStopping))
116+
cancel()
117+
}
118+
}()
119+
120+
// kafka.NewSaramaConsumerGroup (actually sarama.NewConsumerGroup)
121+
// may perform synchronous operations that can fail due to transient
122+
// errors, so we retry until it succeeds or the context is canceled.
123+
var consumerGroup sarama.ConsumerGroup
124+
err := backoff.Retry(func() (err error) {
125+
consumerGroup, err = kafka.NewSaramaConsumerGroup(ctx, c.config.ClientConfig, c.config.ConsumerConfig)
126+
if err != nil {
127+
if ctx.Err() == nil {
128+
// We only report an error if the context is not canceled.
129+
// If the context is canceled it means the receiver is
130+
// shutting down, which will lead to reporting StatusStopped
131+
// when consumeLoop exits.
132+
c.settings.Logger.Error("Error creating consumer group", zap.Error(err))
133+
componentstatus.ReportStatus(host, componentstatus.NewRecoverableErrorEvent(err))
134+
}
135+
return err
136+
}
137+
return nil
138+
}, backoff.WithContext(
139+
// Use a zero max elapsed time to retry indefinitely until the context is canceled.
140+
backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(0)),
141+
ctx,
142+
))
143+
if err != nil {
144+
return
145+
}
146+
defer func() {
147+
if err := consumerGroup.Close(); err != nil {
148+
c.settings.Logger.Error("Error closing consumer group", zap.Error(err))
149+
}
150+
}()
151+
c.settings.Logger.Debug("Created consumer group")
114152

115-
ctx := context.Background()
116153
for {
117154
// `Consume` should be called inside an infinite loop, when a
118155
// server-side rebalance happens, the consumer session will need to be
119156
// recreated to get the new claims
120-
if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
157+
if err := consumerGroup.Consume(ctx, c.topics, handler); err != nil {
158+
if errors.Is(err, context.Canceled) {
159+
// Shutting down
160+
return
161+
}
121162
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
122-
c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
163+
// Consumer group stopped unexpectedly.
164+
c.settings.Logger.Warn("Consumer stopped", zap.Error(ctx.Err()))
123165
return
124166
}
125167
c.settings.Logger.Error("Error from consumer", zap.Error(err))
168+
componentstatus.ReportStatus(host, componentstatus.NewRecoverableErrorEvent(err))
126169
}
127170
}
128171
}
129172

130173
func (c *saramaConsumer) Shutdown(ctx context.Context) error {
131174
c.mu.Lock()
132175
defer c.mu.Unlock()
133-
if c.shutdown {
176+
if !c.started || c.shutdown {
134177
return nil
135178
}
136179
c.shutdown = true
137-
if !c.started {
138-
return nil
139-
}
180+
close(c.closing)
140181

141-
if err := c.consumerGroup.Close(); err != nil {
142-
return err
143-
}
144182
select {
145183
case <-ctx.Done():
146184
return ctx.Err()
@@ -150,10 +188,9 @@ func (c *saramaConsumer) Shutdown(ctx context.Context) error {
150188
}
151189

152190
type consumerGroupHandler struct {
191+
host component.Host
153192
id component.ID
154193
consumeMessage consumeMessageFunc
155-
ready chan bool
156-
readyCloser sync.Once
157194
logger *zap.Logger
158195

159196
obsrecv *receiverhelper.ObsReport
@@ -166,22 +203,29 @@ type consumerGroupHandler struct {
166203
}
167204

168205
func (c *consumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
169-
c.readyCloser.Do(func() { close(c.ready) })
206+
c.logger.Debug("Consumer group session established")
207+
componentstatus.ReportStatus(c.host, componentstatus.NewEvent(componentstatus.StatusOK))
170208
c.telemetryBuilder.KafkaReceiverPartitionStart.Add(
171209
session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())),
172210
)
173211
return nil
174212
}
175213

176214
func (c *consumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
215+
c.logger.Debug("Consumer group session stopped")
177216
c.telemetryBuilder.KafkaReceiverPartitionClose.Add(
178217
session.Context(), 1, metric.WithAttributes(attribute.String(attrInstanceName, c.id.Name())),
179218
)
180219
return nil
181220
}
182221

183222
func (c *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
184-
c.logger.Info("Starting consumer group", zap.Int32("partition", claim.Partition()))
223+
c.logger.Debug(
224+
"Consuming Kafka topic-partition",
225+
zap.String("topic", claim.Topic()),
226+
zap.Int32("partition", claim.Partition()),
227+
zap.Int64("initial_offset", claim.InitialOffset()),
228+
)
185229
if !c.autocommitEnabled {
186230
defer session.Commit()
187231
}
@@ -242,8 +286,10 @@ func (c *consumerGroupHandler) handleMessage(
242286
return err
243287
}
244288
}
245-
c.logger.Warn("Stop error backoff because the configured max_elapsed_time is reached",
246-
zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime))
289+
c.logger.Warn(
290+
"Stop error backoff because the configured max_elapsed_time is reached",
291+
zap.Duration("max_elapsed_time", c.backOff.MaxElapsedTime),
292+
)
247293
}
248294
if c.messageMarking.After && !c.messageMarking.OnError {
249295
// Only return an error if messages are marked after successful processing.

receiver/kafkareceiver/factory_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ func TestCreateTraces(t *testing.T) {
3030
cfg.ProtocolVersion = "2.0.0"
3131
r, err := createTracesReceiver(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
3232
require.NoError(t, err)
33-
// no available broker
34-
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
33+
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
34+
assert.NoError(t, r.Shutdown(context.Background()))
3535
}
3636

3737
func TestWithTracesUnmarshalers(t *testing.T) {
@@ -64,8 +64,8 @@ func TestCreateMetrics(t *testing.T) {
6464
cfg.ProtocolVersion = "2.0.0"
6565
r, err := createMetricsReceiver(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
6666
require.NoError(t, err)
67-
// no available broker
68-
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
67+
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
68+
assert.NoError(t, r.Shutdown(context.Background()))
6969
}
7070

7171
func TestWithMetricsUnmarshalers(t *testing.T) {
@@ -98,8 +98,8 @@ func TestCreateLogs(t *testing.T) {
9898
cfg.ProtocolVersion = "2.0.0"
9999
r, err := createLogsReceiver(context.Background(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
100100
require.NoError(t, err)
101-
// no available broker
102-
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
101+
require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost()))
102+
assert.NoError(t, r.Shutdown(context.Background()))
103103
}
104104

105105
func TestWithLogsUnmarshalers(t *testing.T) {

receiver/kafkareceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327
2323
go.opentelemetry.io/collector/client v1.34.1-0.20250610090210-188191247685
2424
go.opentelemetry.io/collector/component v1.34.1-0.20250610090210-188191247685
25+
go.opentelemetry.io/collector/component/componentstatus v0.128.1-0.20250610090210-188191247685
2526
go.opentelemetry.io/collector/component/componenttest v0.128.1-0.20250610090210-188191247685
2627
go.opentelemetry.io/collector/config/configretry v1.34.1-0.20250610090210-188191247685
2728
go.opentelemetry.io/collector/config/configtls v1.34.1-0.20250610090210-188191247685

receiver/kafkareceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)