Skip to content

Conversation

astuyve
Copy link
Contributor

@astuyve astuyve commented Aug 12, 2025

  • feat: channel based aggregator
  • aggregator service
  • wip
  • remove weird logging for inserts

What does this PR do?

Adds an aggregator service which receives incoming commands on a channel. Commands can be flush requests, inserts, or shutdowns.

this removes lock contention as concurrently arriving payloads will be queued in the channel while flushing occurs.

Motivation

Additional Notes

Describe how to test/QA your changes

@astuyve astuyve requested review from Copilot and duncanpharvey and removed request for Copilot August 13, 2025 00:33
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a channel-based aggregator service to replace the mutex-based aggregation pattern, aiming to reduce lock contention when multiple payloads arrive concurrently while flushing occurs.

  • Replaces Arc<Mutex<Aggregator>> with AggregatorService and AggregatorHandle using tokio channels
  • Implements command pattern for aggregator operations (insert batch, flush, shutdown)
  • Updates all components (DogStatsD, Flusher, tests) to use the new aggregator service architecture

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
crates/dogstatsd/src/aggregator_service.rs New aggregator service implementation with channel-based command handling
crates/dogstatsd/src/flusher.rs Updated to use AggregatorHandle instead of mutex-wrapped aggregator
crates/dogstatsd/src/dogstatsd.rs Modified to send metrics through channels via AggregatorHandle
crates/dogstatsd/tests/integration_test.rs Updated test setup to use new aggregator service pattern
crates/datadog-serverless-compat/src/main.rs Integrated aggregator service into main application flow
crates/dogstatsd/src/lib.rs Added new aggregator_service module export
LICENSE-3rdparty.csv Updated third-party license entries for new dependencies

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

};

if let Err(_) = response_tx.send(response) {
error!("Failed to send flush response - receiver dropped");
Copy link
Preview

Copilot AI Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using a more descriptive pattern match or binding the error to log it. The current if let Err(_) = ... pattern discards potentially useful error information.

Suggested change
error!("Failed to send flush response - receiver dropped");
if let Err(e) = response_tx.send(response) {
error!("Failed to send flush response - receiver dropped: {}", e);

Copilot uses AI. Check for mistakes.

let mut insert_errors = 0;
for metric in metrics {
// The only possible error here is an overflow
if let Err(_e) = self.aggregator.insert(metric) {
Copy link
Preview

Copilot AI Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error _e is bound but not used. Consider either logging the error or using Err(_) if the error details aren't needed.

Suggested change
if let Err(_e) = self.aggregator.insert(metric) {
if let Err(_) = self.aggregator.insert(metric) {

Copilot uses AI. Check for mistakes.

Comment on lines +180 to +187
let (service, handle) = AggregatorService::new(
SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS),
CONTEXTS,
)
.expect("Failed to create aggregator service");

// 2. Start the aggregator service in the background
tokio::spawn(service.run());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense for the service.run to spawn the task, instead of us doing it? Or would this be a way for us to keep control on it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm learning that the nice part of doing things this way is that the caller of the task can decide which tokio runtime or pool to run the task in. In this case the serverless-compat binary, but in the other case it's in bottlecap. So if we want to move it to run in the blocking threadpool or run in a named tokio runtime, we can do that.

Copy link
Contributor

@duncanista duncanista left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me!

I'd see if it would make sense to split handler, service, and the aggregator in a file each in an aggregator folder, but that's more of a personal choice of mine, up to you, more of a nit

@astuyve astuyve merged commit 4d616c9 into main Aug 21, 2025
51 checks passed
@astuyve astuyve deleted the aj/channel-aggregator branch August 21, 2025 16:41
litianningdatadog added a commit that referenced this pull request Aug 22, 2025
astuyve added a commit that referenced this pull request Aug 26, 2025
lym953 added a commit to DataDog/datadog-lambda-extension that referenced this pull request Sep 19, 2025
## This PR
- Add the skeleton of `StatsConcentrator`, with no implementation
- Add `StatsConcentratorHandle` and `StatsConcentratorService`, which
send and process stats requests (`add()` and `get_stats()`) to/from a
queue, so mutex is not needed, and lock contention can be avoided.
(Thanks @duncanista for the suggestion and @astuyve for the example code
DataDog/serverless-components#32)

## Next steps
- Implement `StatsConcentrator`, which aggregates stats data into
buckets and returns it in batch
- Add more fields to `AggregationKey` and `Stats`
- Move the processing of stats after "obfuscation", as suggested by APM
team. This will involve lots of code changes, so I'll make it a separate
PR.

I'll mainly move code from this draft PR:
#827

## Architecture
<img width="1296" height="674" alt="image"
src="https://github.com/user-attachments/assets/2d4cb925-6cfc-4581-8ed6-6bd87cf0d87a"
/>

Jira: https://datadoghq.atlassian.net/browse/SVLS-7593
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants