
Conversation

lym953 (Contributor) commented Sep 10, 2025

What

This PR enables accurate counting of the `trace.aws.lambda.hits` metric (hopefully).

How

Adds a "stats concentrator", a simplified version of the stats concentrator in datadog-agent.
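
To make the idea concrete, here is a minimal, hypothetical sketch of timestamp-bucketed hit counting. The 10-second bucket size, the `Bucket` struct, the flush cutoff, and the hits-only aggregation are assumptions for illustration, not the PR's actual code:

```rust
use std::collections::BTreeMap;

// Assumed bucket width; the real concentrator's bucket size may differ.
const BUCKET_SIZE_NS: u64 = 10_000_000_000;

#[derive(Default)]
struct Bucket {
    hits: u64,
}

#[derive(Default)]
struct StatsConcentrator {
    buckets: BTreeMap<u64, Bucket>,
}

impl StatsConcentrator {
    // Count one invocation in the bucket containing the span's start time.
    fn add_hit(&mut self, span_start_ns: u64) {
        let bucket_start = span_start_ns - (span_start_ns % BUCKET_SIZE_NS);
        self.buckets.entry(bucket_start).or_default().hits += 1;
    }

    // Drain buckets old enough to flush (or everything on force flush),
    // returning (bucket start, hits) pairs for the stats payload.
    fn flush(&mut self, now_ns: u64, force: bool) -> Vec<(u64, u64)> {
        let mut flushed = Vec::new();
        self.buckets.retain(|&start, bucket| {
            if force || now_ns >= start + 2 * BUCKET_SIZE_NS {
                flushed.push((start, bucket.hits));
                false // remove the flushed bucket
            } else {
                true // keep accumulating in recent buckets
            }
        });
        flushed
    }
}
```

Usage would presumably amount to calling `add_hit(span_start)` for each invocation and `flush(now, false)` on the flush timer; the real implementation carries a richer aggregation key and serializes flushed buckets into the agent's stats payload, but counting each invocation in the bucket of its span start time is what fixes the misattributed timestamps described below.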

Architecture

[Architecture diagram]

Testing

Tested with a modified "busycaller" self-monitoring stack with Python 3.11.

After

The "hits" graph in Datadog overall aligns with the graph on AWS. The counts are:

  • Datadog: [44, 171, 171, 169, 172, 121]
  • AWS: [44, 171, 171, 170, 172, 121]

There's an undercount of 1 invocation; to be investigated later.

[Screenshots: Datadog hits graph and AWS invocations graph]

Before

The "hits" graph in Datadog was very different from the graph on AWS. The timestamp for many invocations were wrong.

[Screenshot: Datadog hits graph before the change]

Next steps:

  1. Support other AWS Lambda trace metrics such as trace.aws.lambda.error
  2. Maybe support trace stats for custom traces
  3. Extract the code into a component for reuse

@@ -39,7 +39,7 @@ const SERVICE_KEY: &str = "service";
 // ComputeStatsKey is the tag key indicating whether trace stats should be computed
 const COMPUTE_STATS_KEY: &str = "_dd.compute_stats";
 // ComputeStatsValue is the tag value indicating trace stats should be computed
-const COMPUTE_STATS_VALUE: &str = "1";
+const COMPUTE_STATS_VALUE: &str = "0";
Contributor:

What is the reason for this change?

Contributor Author:

"1" means stats are computed on the backend when it receives traces, instead of on our extension side. That is the current approach and it doesn't work well. This PR changes trace stats to be computed on our extension side.

.expect("Failed to get current timestamp")
.as_nanos()
.try_into()
.expect("Failed to convert timestamp to u64");
Contributor:

Can we do the following to avoid a potential panic?

      let current_timestamp: u64 = match SystemTime::now()
          .duration_since(UNIX_EPOCH)
          .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
          .and_then(|d| {
              d.as_nanos()
                  .try_into()
                  .map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, "Timestamp overflow"))
          })
      {
          Ok(ts) => ts,
          Err(e) => {
              error!("Failed to get current timestamp: {}, skipping stats flush", e);
              return Vec::new();
          }
      };

Contributor Author:

Will change, though I don't think it matters a lot because a u64 of nanoseconds can represent 300+ years of time.
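
For reference, a quick back-of-the-envelope check of that claim (a standalone snippet, not part of the PR):

```rust
// u64::MAX is about 1.8e19, and a (non-leap) year is about 3.15e16 nanoseconds,
// so a u64 nanosecond timestamp from the Unix epoch overflows only after ~584 years.
const NANOS_PER_YEAR: u64 = 365 * 24 * 60 * 60 * 1_000_000_000;

fn main() {
    println!("u64 nanoseconds cover about {} years", u64::MAX / NANOS_PER_YEAR);
}
```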


for timestamp in to_remove {
    self.buckets.remove(&timestamp);
}
Contributor:

How about using retain() to save one round of scanning?

      self.buckets.retain(|&timestamp, bucket| {
          if force_flush || Self::should_flush_bucket(current_timestamp, timestamp) {
              stats.push(self.construct_stats_payload(timestamp, bucket));
              false // remove this bucket
          } else {
              true // keep this bucket
          }
      });

Contributor Author:

Good point! Will change.

Comment on lines 1122 to 1126
let stats_concentrator = Arc::new(TokioMutex::new(StatsConcentrator::new(
    Arc::clone(config),
    Arc::clone(tags_provider),
)));
let stats_aggregator = Arc::new(TokioMutex::new(StatsAggregator::new_with_concentrator(
Contributor:

Is there a way to avoid the locks entirely and just follow the same pattern as dogstatsd event handling?

Contributor Author:

Are you referring to this? DataDog/serverless-components#32

Contributor:

Correct

Contributor Author:

Sure, will implement

}

#[allow(clippy::module_name_repetitions)]
pub struct StatsAgent {
Contributor:

Should this own the channel it creates and then return the tx through a public method? That way it wouldn't need to be created in the main binary.

Contributor Author:

Good point. Will do.

hostname: String::new(),
env: self.config.env.clone().unwrap_or_default(),
version: self.config.version.clone().unwrap_or_default(),
lang: "rust".to_string(),
Contributor:

Should this be the Lambda runtime lang?

Contributor Author:

@raphaelgavache Do you know what the `lang` field means? I couldn't understand it from the protobuf definition: https://github.com/DataDog/datadog-agent/blob/main/pkg/proto/datadog/trace/stats.proto#L32
Reaching out to you as the author of DataDog/datadog-agent#7875, which adds this field.

lym953 added a commit that referenced this pull request Sep 19, 2025
## This PR
- Add the skeleton of `StatsConcentrator`, with no implementation
- Add `StatsConcentratorHandle` and `StatsConcentratorService`, which
send and process stats requests (`add()` and `get_stats()`) through a
queue, so no mutex is needed and lock contention is avoided; a rough
sketch of this pattern is shown below the list.
(Thanks @duncanista for the suggestion and @astuyve for the example code
DataDog/serverless-components#32)
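
A hypothetical sketch of that handle/service pattern follows; the `StatsRequest` enum, the `StatsPayload` placeholder, the channel buffer size, and the `new_stats_channel` helper are illustrative assumptions, not the crate's actual types:

```rust
use tokio::sync::{mpsc, oneshot};

// Illustrative request type: either record a data point or ask for the current stats.
enum StatsRequest {
    Add { timestamp_ns: u64 },
    GetStats { reply: oneshot::Sender<Vec<StatsPayload>> },
}

// Placeholder for whatever a flushed bucket serializes into.
struct StatsPayload;

#[derive(Clone)]
struct StatsConcentratorHandle {
    tx: mpsc::Sender<StatsRequest>,
}

impl StatsConcentratorHandle {
    async fn add(&self, timestamp_ns: u64) {
        let _ = self.tx.send(StatsRequest::Add { timestamp_ns }).await;
    }

    async fn get_stats(&self) -> Vec<StatsPayload> {
        let (reply, rx) = oneshot::channel();
        let _ = self.tx.send(StatsRequest::GetStats { reply }).await;
        rx.await.unwrap_or_default()
    }
}

struct StatsConcentratorService {
    rx: mpsc::Receiver<StatsRequest>,
    // The real service would also own the concentrator state, so requests
    // are handled one at a time and no Mutex is needed.
}

impl StatsConcentratorService {
    async fn run(mut self) {
        while let Some(req) = self.rx.recv().await {
            match req {
                StatsRequest::Add { timestamp_ns } => {
                    // aggregate the data point into the owned buckets here
                    let _ = timestamp_ns;
                }
                StatsRequest::GetStats { reply } => {
                    // flush the eligible buckets and reply with them
                    let _ = reply.send(Vec::new());
                }
            }
        }
    }
}

// Creating the channel in one place and handing out only the handle keeps
// the channel out of the main binary, as suggested in the review above.
fn new_stats_channel(buffer: usize) -> (StatsConcentratorHandle, StatsConcentratorService) {
    let (tx, rx) = mpsc::channel(buffer);
    (StatsConcentratorHandle { tx }, StatsConcentratorService { rx })
}
```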

## Next steps
- Implement `StatsConcentrator`, which aggregates stats data into
buckets and returns them in batches
- Add more fields to `AggregationKey` and `Stats`
- Move the processing of stats to after "obfuscation", as suggested by the
APM team. This will involve lots of code changes, so I'll make it a separate
PR.

I'll mainly move code from this draft PR:
#827

## Architecture
![Architecture diagram](https://github.com/user-attachments/assets/2d4cb925-6cfc-4581-8ed6-6bd87cf0d87a)

Jira: https://datadoghq.atlassian.net/browse/SVLS-7593