Add support for wait_for_result, remove disabled_queue #12742

Merged: 1 commit on Mar 26, 2025
26 changes: 26 additions & 0 deletions .chloggen/add_wait_for_response.yaml
@@ -0,0 +1,26 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for wait_for_result, remove disabled_queue

# One or more tracking issues or pull requests related to the change
issues: [12742]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This has a side effect for users of the experimental BatchConfig with the queue disabled, since now it uses only NumCPU() consumers.

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
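
For orientation, here is a minimal sketch of the rules this PR introduces around the new option. The field names mirror the `QueueConfig` struct and `Validate` logic shown later in this diff, but the struct below is a simplified stand-in (strings instead of `request.SizerType`, a boolean instead of `StorageID`), not the exporterhelper API:

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified mirror of the QueueConfig fields touched by this PR (illustration only;
// the real struct lives in exporter/exporterhelper/internal/queue_sender.go).
type queueConfig struct {
	Enabled       bool
	WaitForResult bool   // block the caller until the request is processed
	Sizer         string // "requests", "items", or "bytes"
	QueueSize     int64
	NumConsumers  int
	HasStorage    bool // stands in for StorageID != nil
}

func (c queueConfig) validate() error {
	if !c.Enabled {
		return nil
	}
	if c.HasStorage && c.WaitForResult {
		return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
	}
	if c.HasStorage && c.Sizer != "requests" {
		return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
	}
	return nil
}

func main() {
	// In-memory queue with wait_for_result: accepted.
	fmt.Println(queueConfig{Enabled: true, WaitForResult: true, Sizer: "requests", QueueSize: 1000, NumConsumers: 10}.validate())
	// Persistent queue with wait_for_result: rejected, matching Validate below.
	fmt.Println(queueConfig{Enabled: true, WaitForResult: true, HasStorage: true, Sizer: "requests"}.validate())
}
```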
64 changes: 48 additions & 16 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -7,6 +7,8 @@
"context"
"errors"
"fmt"
"math"
"runtime"
"time"

"go.uber.org/zap"
@@ -47,6 +49,10 @@
// Enabled indicates whether to not enqueue batches before exporting.
Enabled bool `mapstructure:"enabled"`

// WaitForResult determines if incoming requests are blocked until the request is processed or not.
// Currently, this option is not available when persistent queue is configured using the storage configuration.
WaitForResult bool `mapstructure:"wait_for_result"`

// Sizer determines the type of size measurement used by this component.
// It accepts "requests", "items", or "bytes".
Sizer request.SizerType `mapstructure:"sizer"`
@@ -99,11 +105,14 @@
return errors.New("`queue_size` must be positive")
}

if qCfg.StorageID != nil && qCfg.WaitForResult {
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
}

// Only support request sizer for persistent queue at this moment.
if qCfg.StorageID != nil && qCfg.Sizer != request.SizerTypeRequests {
return errors.New("persistent queue only supports `requests` sizer")
return errors.New("persistent queue configured with `storage` only supports `requests` sizer")

}

return nil
}

@@ -133,20 +142,43 @@
}

func newQueueBatchConfig(qCfg QueueConfig, bCfg BatcherConfig) queuebatch.Config {
qbCfg := queuebatch.Config{
Enabled: true,
WaitForResult: !qCfg.Enabled,
Sizer: qCfg.Sizer,
QueueSize: qCfg.QueueSize,
NumConsumers: qCfg.NumConsumers,
BlockOnOverflow: qCfg.BlockOnOverflow,
StorageID: qCfg.StorageID,
}
if bCfg.Enabled {
qbCfg.Batch = &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
var qbCfg queuebatch.Config
// User configured queueing, copy all config.
if qCfg.Enabled {
qbCfg = queuebatch.Config{
Enabled: true,
WaitForResult: qCfg.WaitForResult,
Sizer: qCfg.Sizer,
QueueSize: qCfg.QueueSize,
NumConsumers: qCfg.NumConsumers,
BlockOnOverflow: qCfg.BlockOnOverflow,
StorageID: qCfg.StorageID,
// TODO: Copy batching configuration as well when available.
}
// TODO: Remove this when WithBatcher is removed.
if bCfg.Enabled {
qbCfg.Batch = &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
}
}

} else {
// This can happen only if the deprecated way to configure batching is used with a "disabled" queue.
// TODO: Remove this when WithBatcher is removed.
qbCfg = queuebatch.Config{
Enabled: true,
WaitForResult: true,
Sizer: request.SizerTypeRequests,
QueueSize: math.MaxInt,
NumConsumers: runtime.NumCPU(),
BlockOnOverflow: true,
StorageID: nil,
Batch: &queuebatch.BatchConfig{
FlushTimeout: bCfg.FlushTimeout,
MinSize: bCfg.MinSize,
MaxSize: bCfg.MaxSize,
},
}
}
return qbCfg
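
Since the dedicated disabled_queue implementation is removed later in this diff, the deprecated path (queue disabled, WithBatcher enabled) is now expressed as an ordinary queuebatch configuration. The standalone sketch below spells out the settings produced by the else branch above; it is an illustration with a stand-in struct, not the exporterhelper API:

```go
package main

import (
	"fmt"
	"math"
	"runtime"
)

// Stand-in for the queuebatch.Config fields set by the fallback branch above.
type fallbackConfig struct {
	Enabled         bool
	WaitForResult   bool
	QueueSize       int
	NumConsumers    int
	BlockOnOverflow bool
}

func main() {
	cfg := fallbackConfig{
		Enabled:         true,
		WaitForResult:   true,             // callers block until their request is exported
		QueueSize:       math.MaxInt,      // effectively unbounded; back-pressure comes from blocked callers
		NumConsumers:    runtime.NumCPU(), // the behavior change called out in the changelog
		BlockOnOverflow: true,
	}
	fmt.Printf("%+v\n", cfg)
}
```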
28 changes: 26 additions & 2 deletions exporter/exporterhelper/internal/queuebatch/async_queue_test.go
@@ -34,7 +34,31 @@ func TestAsyncMemoryQueue(t *testing.T) {
func TestAsyncMemoryQueueBlocking(t *testing.T) {
consumed := &atomic.Int64{}
ac := newAsyncQueue(
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, blocking: true}),
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, blockOnOverflow: true}),
4, func(_ context.Context, _ int64, done Done) {
consumed.Add(1)
done.OnDone(nil)
})
require.NoError(t, ac.Start(context.Background(), componenttest.NewNopHost()))
wg := &sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100_000; j++ {
assert.NoError(t, ac.Offer(context.Background(), 10))
}
}()
}
wg.Wait()
require.NoError(t, ac.Shutdown(context.Background()))
assert.EqualValues(t, 1_000_000, consumed.Load())
}

func TestAsyncMemoryWaitForResultQueueBlocking(t *testing.T) {
consumed := &atomic.Int64{}
ac := newAsyncQueue(
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 100, waitForResult: true, blockOnOverflow: true}),
4, func(_ context.Context, _ int64, done Done) {
consumed.Add(1)
done.OnDone(nil)
@@ -58,7 +82,7 @@ func TestAsyncMemoryQueueBlocking(t *testing.T) {
func TestAsyncMemoryQueueBlockingCancelled(t *testing.T) {
stop := make(chan struct{})
ac := newAsyncQueue(
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 10, blocking: true}),
newMemoryQueue[int64](memoryQueueSettings[int64]{sizer: sizerInt64{}, capacity: 10, blockOnOverflow: true}),
1, func(_ context.Context, _ int64, done Done) {
<-stop
done.OnDone(nil)
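
The new test exercises the queue with waitForResult enabled. The memoryQueue internals are not part of this diff, but the handoff it relies on is the usual block-until-acknowledged pattern: Offer does not return until the consumer reports the outcome via OnDone. A self-contained toy version of that pattern (not the real implementation) looks like this:

```go
package main

import (
	"errors"
	"fmt"
)

// done lets the consumer report the result back to the blocked producer.
type done struct{ errCh chan error }

func (d done) OnDone(err error) { d.errCh <- err }

type item struct {
	value int64
	done  done
}

func main() {
	ch := make(chan item)

	// Consumer: processes items and acknowledges each one.
	go func() {
		for it := range ch {
			if it.value < 0 {
				it.done.OnDone(errors.New("negative value"))
				continue
			}
			it.done.OnDone(nil)
		}
	}()

	// offer blocks until the consumer calls OnDone, i.e. "wait for result".
	offer := func(v int64) error {
		d := done{errCh: make(chan error, 1)}
		ch <- item{value: v, done: d}
		return <-d.errCh
	}

	fmt.Println(offer(10)) // <nil>
	fmt.Println(offer(-1)) // negative value
}
```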
8 changes: 7 additions & 1 deletion exporter/exporterhelper/internal/queuebatch/config.go
@@ -37,7 +37,7 @@ type Config struct {
StorageID *component.ID `mapstructure:"storage"`

// NumConsumers is the maximum number of concurrent consumers from the queue.
// This applies across all different optional configurations from above (e.g. wait_for_result, blocking, persistent, etc.).
// This applies across all different optional configurations from above (e.g. wait_for_result, blockOnOverflow, persistent, etc.).
// TODO: This will also control the maximum number of shards, when supported:
// https://github.com/open-telemetry/opentelemetry-collector/issues/12473.
NumConsumers int `mapstructure:"num_consumers"`
@@ -64,6 +64,12 @@ func (cfg *Config) Validate() error {
if cfg.StorageID != nil && cfg.WaitForResult {
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
}

// Only support request sizer for persistent queue at this moment.
if cfg.StorageID != nil && cfg.Sizer != request.SizerTypeRequests {
return errors.New("persistent queue configured with `storage` only supports `requests` sizer")
}

return nil
}

76 changes: 76 additions & 0 deletions exporter/exporterhelper/internal/queuebatch/config_test.go
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queuebatch

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

func TestConfig_Validate(t *testing.T) {
cfg := newTestConfig()
require.NoError(t, cfg.Validate())

cfg.NumConsumers = 0
require.EqualError(t, cfg.Validate(), "`num_consumers` must be positive")

cfg = newTestConfig()
cfg.QueueSize = 0
require.EqualError(t, cfg.Validate(), "`queue_size` must be positive")

cfg = newTestConfig()
cfg.QueueSize = 0
require.EqualError(t, cfg.Validate(), "`queue_size` must be positive")

storageID := component.MustNewID("test")
cfg = newTestConfig()
cfg.WaitForResult = true
cfg.StorageID = &storageID
require.EqualError(t, cfg.Validate(), "`wait_for_result` is not supported with a persistent queue configured with `storage`")

cfg = newTestConfig()
cfg.Sizer = request.SizerTypeBytes
cfg.StorageID = &storageID
require.EqualError(t, cfg.Validate(), "persistent queue configured with `storage` only supports `requests` sizer")

// Confirm Validate doesn't return error with invalid config when feature is disabled
cfg.Enabled = false
assert.NoError(t, cfg.Validate())
}

func TestBatchConfig_Validate(t *testing.T) {
cfg := newTestBatchConfig()
require.NoError(t, cfg.Validate())

cfg = newTestBatchConfig()
cfg.FlushTimeout = 0
require.EqualError(t, cfg.Validate(), "`flush_timeout` must be positive")

cfg = newTestBatchConfig()
cfg.MinSize = -1
require.EqualError(t, cfg.Validate(), "`min_size` must be non-negative")

cfg = newTestBatchConfig()
cfg.MaxSize = -1
require.EqualError(t, cfg.Validate(), "`max_size` must be non-negative")

cfg = newTestBatchConfig()
cfg.MinSize = 2048
cfg.MaxSize = 1024
require.EqualError(t, cfg.Validate(), "`max_size` must be greater or equal to `min_size`")
}

func newTestBatchConfig() BatchConfig {
return BatchConfig{
FlushTimeout: 200 * time.Millisecond,
MinSize: 2048,
MaxSize: 0,
}
}
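
The assertions above rely on a newTestConfig helper that is defined in another file of the queuebatch package and does not appear in this diff. A plausible reconstruction, assuming it simply returns a valid enabled in-memory configuration consistent with the errors checked above (the actual values may differ), might be:

```go
// Hypothetical sketch of the newTestConfig helper used above; the real helper
// lives elsewhere in the queuebatch package and may use different defaults.
func newTestConfig() Config {
	return Config{
		Enabled:         true,
		WaitForResult:   false,
		Sizer:           request.SizerTypeRequests,
		QueueSize:       1000,
		NumConsumers:    10,
		BlockOnOverflow: true,
	}
}
```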
@@ -37,9 +37,9 @@ func TestDisabledBatcher(t *testing.T) {
ba := newDisabledBatcher(sink.Export)

mq := newMemoryQueue[request.Request](memoryQueueSettings[request.Request]{
sizer: request.RequestsSizer[request.Request]{},
capacity: 1000,
blocking: true,
sizer: request.RequestsSizer[request.Request]{},
capacity: 1000,
blockOnOverflow: true,
})
q := newAsyncQueue(mq, tt.maxWorkers, ba.Consume)

82 changes: 0 additions & 82 deletions exporter/exporterhelper/internal/queuebatch/disabled_queue.go

This file was deleted.
