Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions LICENSE-3rdparty.csv

Large diffs are not rendered by default.

38 changes: 21 additions & 17 deletions crates/datadog-serverless-compat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

use env_logger::Builder;
use log::{debug, error, info};
use std::{env, str::FromStr, sync::Arc, sync::Mutex};
use std::{env, str::FromStr, sync::Arc};
use tokio::{
sync::Mutex as TokioMutex,
time::{interval, Duration},
Expand All @@ -26,7 +26,7 @@ use datadog_trace_agent::{
use datadog_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType};

use dogstatsd::{
aggregator::Aggregator as MetricsAggregator,
aggregator_service::{AggregatorHandle, AggregatorService},
api_key::ApiKeyFactory,
constants::CONTEXTS,
datadog::{MetricsIntakeUrlPrefix, RetryStrategy, Site},
Expand Down Expand Up @@ -138,9 +138,9 @@ pub async fn main() {
}
});

let mut metrics_flusher = if dd_use_dogstatsd {
let (mut metrics_flusher, _aggregator_handle) = if dd_use_dogstatsd {
debug!("Starting dogstatsd");
let (_, metrics_flusher) = start_dogstatsd(
let (_, metrics_flusher, aggregator_handle) = start_dogstatsd(
dd_dogstatsd_port,
dd_api_key,
dd_site,
Expand All @@ -149,10 +149,10 @@ pub async fn main() {
)
.await;
info!("dogstatsd-udp: starting to listen on port {dd_dogstatsd_port}");
metrics_flusher
(metrics_flusher, Some(aggregator_handle))
} else {
info!("dogstatsd disabled");
None
(None, None)
};

let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL));
Expand All @@ -174,24 +174,28 @@ async fn start_dogstatsd(
dd_site: String,
https_proxy: Option<String>,
dogstatsd_tags: &str,
) -> (CancellationToken, Option<Flusher>) {
) -> (CancellationToken, Option<Flusher>, AggregatorHandle) {
// 1. Create the aggregator service
#[allow(clippy::expect_used)]
let metrics_aggr = Arc::new(Mutex::new(
MetricsAggregator::new(
SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS),
CONTEXTS,
)
.expect("Failed to create metrics aggregator"),
));
let (service, handle) = AggregatorService::new(
SortedTags::parse(dogstatsd_tags).unwrap_or(EMPTY_TAGS),
CONTEXTS,
)
.expect("Failed to create aggregator service");

// 2. Start the aggregator service in the background
tokio::spawn(service.run());
Comment on lines +180 to +187
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense for `service.run` to spawn the task itself, instead of us doing it? Or is spawning it here a way for us to keep control over it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm learning that the nice part of doing things this way is that the caller of the task can decide which tokio runtime or pool to run the task in. In this case the serverless-compat binary, but in the other case it's in bottlecap. So if we want to move it to run in the blocking threadpool or run in a named tokio runtime, we can do that.


let dogstatsd_config = DogStatsDConfig {
host: AGENT_HOST.to_string(),
port,
};
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();

// 3. Use handle in DogStatsD (cheap to clone)
let dogstatsd_client = DogStatsD::new(
&dogstatsd_config,
Arc::clone(&metrics_aggr),
handle.clone(),
dogstatsd_cancel_token.clone(),
)
.await;
Expand All @@ -205,7 +209,7 @@ async fn start_dogstatsd(
#[allow(clippy::expect_used)]
let metrics_flusher = Flusher::new(FlusherConfig {
api_key_factory: Arc::new(ApiKeyFactory::new(&dd_api_key)),
aggregator: Arc::clone(&metrics_aggr),
aggregator_handle: handle.clone(),
metrics_intake_url_prefix: MetricsIntakeUrlPrefix::new(
Some(Site::new(dd_site).expect("Failed to parse site")),
None,
Expand All @@ -223,5 +227,5 @@ async fn start_dogstatsd(
}
};

(dogstatsd_cancel_token, metrics_flusher)
(dogstatsd_cancel_token, metrics_flusher, handle)
}
177 changes: 177 additions & 0 deletions crates/dogstatsd/src/aggregator_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::aggregator::Aggregator;
use crate::datadog::Series;
use crate::metric::{Metric, SortedTags};
use datadog_protos::metrics::SketchPayload;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, warn};

/// Commands that an [`AggregatorHandle`] sends to the [`AggregatorService`]
/// loop over its command channel.
#[derive(Debug)]
pub enum AggregatorCommand {
    /// Insert every metric of the batch into the service's aggregator.
    InsertBatch(Vec<Metric>),
    /// Consume the aggregated series and distributions and send them back as
    /// a [`FlushResponse`] on the enclosed one-shot sender.
    Flush(oneshot::Sender<FlushResponse>),
    /// Make the service leave its command loop.
    Shutdown,
}

/// Payload returned to the caller of a flush: everything the aggregator
/// handed over when the `Flush` command was processed.
#[derive(Debug)]
pub struct FlushResponse {
    /// Result of `Aggregator::consume_metrics` at flush time.
    pub series: Vec<Series>,
    /// Result of `Aggregator::consume_distributions` at flush time.
    pub distributions: Vec<SketchPayload>,
}

/// Cloneable handle used to send [`AggregatorCommand`]s to a running
/// [`AggregatorService`].
///
/// The handle only wraps the sending half of the service's unbounded command
/// channel, so each component that needs to talk to the aggregator can hold
/// its own clone.
///
/// `Debug` is derived so that types embedding a handle (configs, clients) can
/// themselves derive `Debug`, matching the other public types in this module.
#[derive(Clone, Debug)]
pub struct AggregatorHandle {
    /// Sending half of the command channel consumed by `AggregatorService::run`.
    tx: mpsc::UnboundedSender<AggregatorCommand>,
}

impl AggregatorHandle {
    /// Queues a batch of metrics for insertion by the aggregator service.
    ///
    /// This only enqueues the command; the insertion itself happens later in
    /// the service loop.
    ///
    /// # Errors
    ///
    /// Propagates the channel's `SendError` when the command cannot be
    /// delivered.
    pub fn insert_batch(
        &self,
        metrics: Vec<Metric>,
    ) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
        let command = AggregatorCommand::InsertBatch(metrics);
        self.tx.send(command)
    }

    /// Asks the service to flush and waits for the resulting [`FlushResponse`].
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the flush command cannot be sent or
    /// when the reply channel is closed before a response arrives.
    pub async fn flush(&self) -> Result<FlushResponse, String> {
        let (reply_tx, reply_rx) = oneshot::channel();

        if let Err(e) = self.tx.send(AggregatorCommand::Flush(reply_tx)) {
            return Err(format!("Failed to send flush command: {}", e));
        }

        match reply_rx.await {
            Ok(response) => Ok(response),
            Err(e) => Err(format!("Failed to receive flush response: {}", e)),
        }
    }

    /// Requests that the service stop processing commands.
    ///
    /// # Errors
    ///
    /// Propagates the channel's `SendError` when the command cannot be
    /// delivered.
    pub fn shutdown(&self) -> Result<(), mpsc::error::SendError<AggregatorCommand>> {
        let command = AggregatorCommand::Shutdown;
        self.tx.send(command)
    }
}

/// Owns the metrics [`Aggregator`] and applies the commands it receives from
/// [`AggregatorHandle`]s, one at a time, in its `run` loop.
pub struct AggregatorService {
    /// Aggregator that inserts and flushes operate on.
    aggregator: Aggregator,
    /// Receiving half of the command channel; the sending half lives in the handle.
    rx: mpsc::UnboundedReceiver<AggregatorCommand>,
}

impl AggregatorService {
/// Builds the service together with the handle used to drive it.
///
/// `tags` and `max_context` are forwarded unchanged to `Aggregator::new`.
/// The returned service still has to be run by the caller (see `run`);
/// the handle is connected to it through an unbounded command channel.
///
/// # Errors
///
/// Returns the creation error reported by `Aggregator::new`.
pub fn new(
    tags: SortedTags,
    max_context: usize,
) -> Result<(Self, AggregatorHandle), crate::errors::Creation> {
    let aggregator = Aggregator::new(tags, max_context)?;
    let (tx, rx) = mpsc::unbounded_channel();

    Ok((Self { aggregator, rx }, AggregatorHandle { tx }))
}

pub async fn run(mut self) {
debug!("Aggregator service started");

while let Some(command) = self.rx.recv().await {
match command {
AggregatorCommand::InsertBatch(metrics) => {
let mut insert_errors = 0;
for metric in metrics {
// The only possible error here is an overflow
if let Err(_e) = self.aggregator.insert(metric) {
Copy link
Preview

Copilot AI Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error _e is bound but not used. Consider either logging the error or using Err(_) if the error details aren't needed.

Suggested change
if let Err(_e) = self.aggregator.insert(metric) {
if let Err(_) = self.aggregator.insert(metric) {

Copilot uses AI. Check for mistakes.

insert_errors += 1;
}
}
if insert_errors > 0 {
warn!("Total of {} metrics failed to insert", insert_errors);
}
}

AggregatorCommand::Flush(response_tx) => {
let series = self.aggregator.consume_metrics();
let distributions = self.aggregator.consume_distributions();

let response = FlushResponse {
series,
distributions,
};

if let Err(_) = response_tx.send(response) {
error!("Failed to send flush response - receiver dropped");
Copy link
Preview

Copilot AI Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using a more descriptive pattern match or binding the error to log it. The current if let Err(_) = ... pattern discards potentially useful error information.

Suggested change
error!("Failed to send flush response - receiver dropped");
if let Err(e) = response_tx.send(response) {
error!("Failed to send flush response - receiver dropped: {}", e);

Copilot uses AI. Check for mistakes.

}
}

AggregatorCommand::Shutdown => {
debug!("Aggregator service shutting down");
break;
}
}
}

debug!("Aggregator service stopped");
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::metric::{parse, EMPTY_TAGS};

    /// Two counters inserted through the handle come back in a single series
    /// payload on flush, after which the service shuts down cleanly.
    #[tokio::test]
    async fn test_aggregator_service_basic_flow() {
        let (service, handle) =
            AggregatorService::new(EMPTY_TAGS, 1000).expect("Failed to create aggregator service");

        // Run the service in the background while the test drives the handle.
        let runner = tokio::spawn(service.run());

        let batch: Vec<_> = ["test:1|c|#k:v", "foo:2|c|#k:v"]
            .iter()
            .map(|line| parse(line).expect("metric parse failed"))
            .collect();
        handle.insert_batch(batch).expect("Failed to insert metrics");

        let flushed = handle.flush().await.expect("Failed to flush");
        assert_eq!(flushed.series.len(), 1);
        assert_eq!(flushed.series[0].series.len(), 2);

        handle.shutdown().expect("Failed to shutdown");
        runner.await.expect("Service task failed");
    }

    /// Two distributions inserted through the handle come back in a single
    /// sketch payload on flush, with no series produced.
    #[tokio::test]
    async fn test_aggregator_service_distributions() {
        let (service, handle) =
            AggregatorService::new(EMPTY_TAGS, 1000).expect("Failed to create aggregator service");

        // Run the service in the background while the test drives the handle.
        let runner = tokio::spawn(service.run());

        let batch: Vec<_> = ["dist1:100|d|#k:v", "dist2:200|d|#k:v"]
            .iter()
            .map(|line| parse(line).expect("metric parse failed"))
            .collect();
        handle.insert_batch(batch).expect("Failed to insert metrics");

        let flushed = handle.flush().await.expect("Failed to flush");
        assert_eq!(flushed.distributions.len(), 1);
        assert_eq!(flushed.distributions[0].sketches.len(), 2);
        assert_eq!(flushed.series.len(), 0);

        handle.shutdown().expect("Failed to shutdown");
        runner.await.expect("Service task failed");
    }
}
Loading