From 814c585f65c6c27927a15f99015964646bc4e679 Mon Sep 17 00:00:00 2001 From: Tianning Li Date: Thu, 28 Aug 2025 14:59:23 -0400 Subject: [PATCH 1/4] feat: No longer launch Go-based agent for compatibility/OTLP/AAP config (#788) https://datadoghq.atlassian.net/browse/SVLS-7398 - As part of coming release, bottlecap agent no longer launches Go-based agent when compatibility/AAP/OTLP features are active - Emit the same metric when detecting any of above configuration - Update corresponding unit tests Manifests: - [Test lambda function](https://us-east-1.console.aws.amazon.com/lambda/home?region=us-east-1#/functions/ltn1-fullinstrument-bn-cold-python310-lambda?code=&subtab=envVars&tab=testing) with [logs](https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logsV2:log-groups/log-group/$252Faws$252Flambda$252Fltn1-fullinstrument-bn-cold-python310-lambda/log-events/2025$252F08$252F21$252F$255B$2524LATEST$255Df3788d359677452dad162488ff15456f$3FfilterPattern$3Dotel) showing compatibility/AAP/OTPL are enabled image - [Logging](https://app.datadoghq.com/logs/livetail?query=functionname%3Altn1-fullinstrument-bn-cold-python310-lambda%20Metric&agg_m=count&agg_m_source=base&agg_t=count&cols=host%2Cservice&fromUser=true&messageDisplay=inline&refresh_mode=paused&storage=driveline&stream_sort=desc&viz=stream&from_ts=1755787655569&to_ts=1755787689060&live=false) image - [Metric](https://app.datadoghq.com/screen/integration/aws_lambda_enhanced_metrics?fromUser=false&fullscreen_end_ts=1755788220000&fullscreen_paused=true&fullscreen_refresh_mode=paused&fullscreen_section=overview&fullscreen_start_ts=1755787200000&fullscreen_widget=2&graph-explorer__tile_def=N4IgbglgXiBcIBcD2AHANhAzgkAaEAxgK7ZIC2A%2BhgHYDWmcA2gLr4BOApgI5EfYOxGoTphRJqmDhQBmSNmQCGOeJgIK0CtnhA8ObCHyagAJkoUVMSImwIc4IMhwT6CDfNQWP7utgE8AjNo%2BvvaYRGSwpggKxkgA5gB0kmxgemh8mAkcAB4IHBIQ4gnSChBoSKlswAAkCgDumBQKBARW1Ai41ZxxhdSd0kTUBAi9AL4ABABGvuPAA0Mj4h6OowkKja2DCAAUAJTaCnFx3UpyoeEgo6wgsvJEGgJCN3Jk9wrevH6BV-iWbMqgTbtOAAJgADPg5MY9BRpkZEL4UHZ4LdXhptBBqNDsnAISAoXp7NDVJdmKMfiBsL50nBgOSgA&refresh_mode=sliding&from_ts=1755783890661&to_ts=1755787490661&live=true) image --- bottlecap/src/config/mod.rs | 120 +++++++++----------- bottlecap/src/metrics/enhanced/constants.rs | 2 + bottlecap/src/metrics/enhanced/lambda.rs | 86 ++++++++++++-- bottlecap/src/otlp/mod.rs | 6 +- 4 files changed, 136 insertions(+), 78 deletions(-) diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 0dc7b734f..976308c9e 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -442,10 +442,13 @@ impl Default for Config { } fn log_fallback_reason(reason: &str) { - println!("{{\"DD_EXTENSION_FALLBACK_REASON\":\"{reason}\"}}"); + error!("Fallback support for {reason} is no longer available."); } -fn fallback(config: &Config) -> Result<(), ConfigError> { +#[must_use = "fallback reasons should be processed to emit appropriate metrics"] +pub fn fallback(config: &Config) -> Vec { + let mut fallback_reasons = Vec::new(); + // Customer explicitly opted out of the Next Gen extension let opted_out = match config.extension_version.as_deref() { Some("compatibility") => true, @@ -454,21 +457,18 @@ fn fallback(config: &Config) -> Result<(), ConfigError> { }; if opted_out { - log_fallback_reason("extension_version"); - return Err(ConfigError::UnsupportedField( - "extension_version".to_string(), - )); + let reason = "extension_version"; + log_fallback_reason(reason); + fallback_reasons.push(reason.to_string()); } // ASM / .NET // todo(duncanista): Remove once the .NET runtime is fixed if config.serverless_appsec_enabled && has_dotnet_binary() { - log_fallback_reason("serverless_appsec_enabled_dotnet"); - return Err(ConfigError::UnsupportedField( - "serverless_appsec_enabled_dotnet".to_string(), - )); + let reason = "serverless_appsec_enabled_dotnet"; + log_fallback_reason(reason); + fallback_reasons.push(reason.to_string()); } - // OTLP let has_otlp_config = config .otlp_config_receiver_protocols_grpc_endpoint @@ -500,25 +500,22 @@ fn fallback(config: &Config) -> Result<(), ConfigError> { || config.otlp_config_logs_enabled; if has_otlp_config { - log_fallback_reason("otel"); - return Err(ConfigError::UnsupportedField("otel".to_string())); + let reason = "otel"; + log_fallback_reason(reason); + fallback_reasons.push(reason.to_string()); } - Ok(()) + fallback_reasons } #[allow(clippy::module_name_repetitions)] -pub fn get_config(config_directory: &Path) -> Result { +#[must_use = "configuration must be used to initialize the application"] +pub fn get_config(config_directory: &Path) -> Config { let path: std::path::PathBuf = config_directory.join("datadog.yaml"); let mut config_builder = ConfigBuilder::default() .add_source(Box::new(YamlConfigSource { path })) .add_source(Box::new(EnvConfigSource)); - - let config = config_builder.build(); - - fallback(&config)?; - - Ok(config) + config_builder.build() } #[inline] @@ -731,11 +728,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_EXTENSION_VERSION", "compatibility"); - let config = get_config(Path::new("")).expect_err("should reject unknown fields"); - assert_eq!( - config, - ConfigError::UnsupportedField("extension_version".to_string()) - ); + let _config = get_config(Path::new("")); Ok(()) }); } @@ -749,8 +742,7 @@ pub mod tests { "localhost:4138", ); - let config = get_config(Path::new("")).expect_err("should reject unknown fields"); - assert_eq!(config, ConfigError::UnsupportedField("otel".to_string())); + let _config = get_config(Path::new("")); Ok(()) }); } @@ -770,8 +762,7 @@ pub mod tests { ", )?; - let config = get_config(Path::new("")).expect_err("should reject unknown fields"); - assert_eq!(config, ConfigError::UnsupportedField("otel".to_string())); + let _config = get_config(Path::new("")); Ok(()) }); } @@ -781,7 +772,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!( config.logs_config_logs_dd_url, "https://http-intake.logs.datadoghq.com".to_string() @@ -799,7 +790,7 @@ pub mod tests { "agent-http-intake-pci.logs.datadoghq.com:443", ); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!( config.logs_config_logs_dd_url, "agent-http-intake-pci.logs.datadoghq.com:443".to_string() @@ -814,7 +805,7 @@ pub mod tests { jail.clear_env(); jail.set_env("DD_APM_DD_URL", "https://trace-pci.agent.datadoghq.com"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!( config.apm_dd_url, "https://trace-pci.agent.datadoghq.com/api/v0.2/traces".to_string() @@ -829,7 +820,7 @@ pub mod tests { jail.clear_env(); jail.set_env("DD_DD_URL", "custom_proxy:3128"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.dd_url, "custom_proxy:3128".to_string()); Ok(()) }); @@ -841,7 +832,7 @@ pub mod tests { jail.clear_env(); jail.set_env("DD_URL", "custom_proxy:3128"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.url, "custom_proxy:3128".to_string()); Ok(()) }); @@ -852,7 +843,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.dd_url, String::new()); Ok(()) }); @@ -862,13 +853,9 @@ pub mod tests { fn test_allowed_but_disabled() { figment::Jail::expect_with(|jail| { jail.clear_env(); - jail.set_env( - "DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT", - "localhost:4138", - ); + jail.set_env("DD_SERVERLESS_APPSEC_ENABLED", "true"); - let config = get_config(Path::new("")).expect_err("should reject unknown fields"); - assert_eq!(config, ConfigError::UnsupportedField("otel".to_string())); + let _config = get_config(Path::new("")); Ok(()) }); } @@ -884,7 +871,7 @@ pub mod tests { ", )?; jail.set_env("DD_SITE", "datad0g.com"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.site, "datad0g.com"); Ok(()) }); @@ -900,7 +887,7 @@ pub mod tests { r" ", )?; - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.site, "datadoghq.com"); Ok(()) }); @@ -911,7 +898,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_SITE", "datadoghq.eu"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.site, "datadoghq.eu"); Ok(()) }); @@ -922,7 +909,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_LOG_LEVEL", "TRACE"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.log_level, LogLevel::Trace); Ok(()) }); @@ -932,7 +919,7 @@ pub mod tests { fn test_parse_default() { figment::Jail::expect_with(|jail| { jail.clear_env(); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!( config, Config { @@ -956,7 +943,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_PROXY_HTTPS", "my-proxy:3128"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.proxy_https, Some("my-proxy:3128".to_string())); Ok(()) }); @@ -972,7 +959,7 @@ pub mod tests { "NO_PROXY", "127.0.0.1,localhost,172.16.0.0/12,us-east-1.amazonaws.com,datadoghq.eu", ); - let config = get_config(Path::new("")).expect("should parse noproxy"); + let config = get_config(Path::new("")); assert_eq!(config.proxy_https, None); Ok(()) }); @@ -990,7 +977,7 @@ pub mod tests { ", )?; - let config = get_config(Path::new("")).expect("should parse weird proxy config"); + let config = get_config(Path::new("")); assert_eq!(config.proxy_https, Some("my-proxy:3128".to_string())); Ok(()) }); @@ -1010,7 +997,7 @@ pub mod tests { ", )?; - let config = get_config(Path::new("")).expect("should parse weird proxy config"); + let config = get_config(Path::new("")); assert_eq!(config.proxy_https, None); // Assertion to ensure config.site runs before proxy // because we chenck that noproxy contains the site @@ -1024,7 +1011,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_SERVERLESS_FLUSH_STRATEGY", "end"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.serverless_flush_strategy, FlushStrategy::End); Ok(()) }); @@ -1035,7 +1022,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_SERVERLESS_FLUSH_STRATEGY", "periodically,100000"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!( config.serverless_flush_strategy, FlushStrategy::Periodically(PeriodicStrategy { interval: 100_000 }) @@ -1049,7 +1036,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_SERVERLESS_FLUSH_STRATEGY", "invalid_strategy"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.serverless_flush_strategy, FlushStrategy::Default); Ok(()) }); @@ -1063,7 +1050,7 @@ pub mod tests { "DD_SERVERLESS_FLUSH_STRATEGY", "periodically,invalid_interval", ); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.serverless_flush_strategy, FlushStrategy::Default); Ok(()) }); @@ -1076,7 +1063,7 @@ pub mod tests { jail.set_env("DD_VERSION", "123"); jail.set_env("DD_ENV", "123456890"); jail.set_env("DD_SERVICE", "123456"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.version.expect("failed to parse DD_VERSION"), "123"); assert_eq!(config.env.expect("failed to parse DD_ENV"), "123456890"); assert_eq!( @@ -1106,7 +1093,7 @@ pub mod tests { pattern: exclude-me-yaml ", )?; - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!( config.logs_config_processing_rules, Some(vec![ProcessingRule { @@ -1135,7 +1122,7 @@ pub mod tests { pattern: exclude ", )?; - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!( config.logs_config_processing_rules, Some(vec![ProcessingRule { @@ -1164,7 +1151,7 @@ pub mod tests { repl: 'REDACTED' ", )?; - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); let rule = parse_rules_from_string( r#"[ {"name": "*", "pattern": "foo", "repl": "REDACTED"} @@ -1195,7 +1182,7 @@ pub mod tests { repl: 'REDACTED-YAML' ", )?; - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); let rule = parse_rules_from_string( r#"[ {"name": "*", "pattern": "foo", "repl": "REDACTED-ENV"} @@ -1222,7 +1209,7 @@ pub mod tests { remove_paths_with_digits: true ", )?; - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert!(config.apm_config_obfuscation_http_remove_query_string,); assert!(config.apm_config_obfuscation_http_remove_paths_with_digits,); Ok(()) @@ -1237,7 +1224,7 @@ pub mod tests { "datadog,tracecontext,b3,b3multi", ); jail.set_env("DD_EXTENSION_VERSION", "next"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); let expected_styles = vec![ TracePropagationStyle::Datadog, @@ -1256,7 +1243,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_TRACE_PROPAGATION_STYLE_EXTRACT", "datadog"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!( config.trace_propagation_style, @@ -1281,8 +1268,7 @@ pub mod tests { "DD_APM_REPLACE_TAGS", r#"[{"name":"resource.name","pattern":"(.*)/(foo[:%].+)","repl":"$1/{foo}"}]"#, ); - let config = get_config(Path::new("")); - assert!(config.is_ok()); + let _config = get_config(Path::new("")); Ok(()) }); } @@ -1295,7 +1281,7 @@ pub mod tests { jail.set_env("DD_ENHANCED_METRICS", "1"); jail.set_env("DD_LOGS_CONFIG_USE_COMPRESSION", "TRUE"); jail.set_env("DD_CAPTURE_LAMBDA_PAYLOAD", "0"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert!(config.serverless_logs_enabled); assert!(config.enhanced_metrics); assert!(config.logs_config_use_compression); @@ -1319,7 +1305,7 @@ pub mod tests { jail.set_env("DD_SITE", "us5.datadoghq.com"); jail.set_env("DD_API_KEY", "env-api-key"); jail.set_env("DD_FLUSH_TIMEOUT", "10"); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert_eq!(config.site, "us5.datadoghq.com"); assert_eq!(config.api_key, "env-api-key"); diff --git a/bottlecap/src/metrics/enhanced/constants.rs b/bottlecap/src/metrics/enhanced/constants.rs index 44666910f..110c0b618 100644 --- a/bottlecap/src/metrics/enhanced/constants.rs +++ b/bottlecap/src/metrics/enhanced/constants.rs @@ -45,6 +45,8 @@ pub const THREADS_USE_METRIC: &str = "aws.lambda.enhanced.threads_use"; pub const SHUTDOWNS_METRIC: &str = "aws.lambda.enhanced.shutdowns"; //pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations"; pub const UNUSED_INIT: &str = "aws.lambda.enhanced.unused_init"; +pub const DATADOG_SERVERLESS_EXTENSION_FAILOVER_CONFIG_ISSUE_METRIC: &str = + "datadog.serverless.extension.failover"; pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS"; // Monitoring interval for tmp, fd, and threads metrics diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 7e3276251..04287c317 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -60,18 +60,29 @@ impl Lambda { .insert(String::from("runtime"), runtime.to_string()); } - fn get_dynamic_value_tags(&self) -> Option { - let vec_tags: Vec = self - .dynamic_value_tags - .iter() - .map(|(k, v)| format!("{k}:{v}")) - .collect(); + fn tags_to_sorted_tags(tags: &HashMap) -> Option { + let vec_tags: Vec = tags.iter().map(|(k, v)| format!("{k}:{v}")).collect(); let string_tags = vec_tags.join(","); SortedTags::parse(&string_tags).ok() } + fn get_dynamic_value_tags(&self) -> Option { + Self::tags_to_sorted_tags(&self.dynamic_value_tags) + } + + fn get_combined_tags(&self, additional_tags: &HashMap) -> Option { + if additional_tags.is_empty() { + return self.get_dynamic_value_tags(); + } + + let mut combined_tags = self.dynamic_value_tags.clone(); + combined_tags.extend(additional_tags.clone()); + + Self::tags_to_sorted_tags(&combined_tags) + } + pub fn increment_invocation_metric(&self, timestamp: i64) { self.increment_metric(constants::INVOCATIONS_METRIC, timestamp); } @@ -94,6 +105,19 @@ impl Lambda { self.increment_metric(constants::OUT_OF_MEMORY_METRIC, timestamp); } + /// Set up a metric tracking configuration load issue with details + pub fn set_config_load_issue_metric(&self, timestamp: i64, reason_msg: &str) { + let dynamic_tags = self.get_combined_tags(&HashMap::from([( + "reason".to_string(), + reason_msg.to_string(), + )])); + self.increment_metric_with_tags( + constants::DATADOG_SERVERLESS_EXTENSION_FAILOVER_CONFIG_ISSUE_METRIC, + timestamp, + dynamic_tags, + ); + } + pub fn set_init_duration_metric( &mut self, init_type: InitType, @@ -122,13 +146,23 @@ impl Lambda { } fn increment_metric(&self, metric_name: &str, timestamp: i64) { + self.increment_metric_with_tags(metric_name, timestamp, self.get_dynamic_value_tags()); + } + + /// Helper function to emit metric with supplied tags + fn increment_metric_with_tags( + &self, + metric_name: &str, + timestamp: i64, + tags: Option, + ) { if !self.config.enhanced_metrics { return; } let metric = Metric::new( metric_name.into(), MetricValue::distribution(1f64), - self.get_dynamic_value_tags(), + tags, Some(timestamp), ); if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { @@ -767,9 +801,19 @@ mod tests { } async fn assert_sketch(handle: &AggregatorHandle, metric_id: &str, value: f64, timestamp: i64) { + assert_sketch_with_tag(handle, metric_id, value, timestamp, None).await; + } + + async fn assert_sketch_with_tag( + handle: &AggregatorHandle, + metric_id: &str, + value: f64, + timestamp: i64, + tags: Option, + ) { let ts = (timestamp / 10) * 10; if let Some(e) = handle - .get_entry_by_id(metric_id.into(), None, ts) + .get_entry_by_id(metric_id.into(), tags, ts) .await .unwrap() { @@ -1371,4 +1415,30 @@ mod tests { .is_none() ); } + + #[tokio::test] + async fn test_set_config_load_issue_metric() { + let (metrics_aggr, my_config) = setup(); + let lambda = Lambda::new(metrics_aggr.clone(), my_config); + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + let test_reason = "test_config_issue"; + + lambda.set_config_load_issue_metric(now, test_reason); + + // Create the expected tags for the metric lookup + let expected_tags = SortedTags::parse(&format!("reason:{test_reason}")).ok(); + assert_sketch_with_tag( + &metrics_aggr, + constants::DATADOG_SERVERLESS_EXTENSION_FAILOVER_CONFIG_ISSUE_METRIC, + 1f64, + now, + expected_tags, + ) + .await; + } } diff --git a/bottlecap/src/otlp/mod.rs b/bottlecap/src/otlp/mod.rs index 292a80180..4e0f4aed7 100644 --- a/bottlecap/src/otlp/mod.rs +++ b/bottlecap/src/otlp/mod.rs @@ -37,7 +37,7 @@ mod tests { ", )?; - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); // Since the default for traces is `true`, we don't need to set it. assert!(should_enable_otlp_agent(&Arc::new(config))); @@ -55,7 +55,7 @@ mod tests { "0.0.0.0:4318", ); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); // Since the default for traces is `true`, we don't need to set it. assert!(should_enable_otlp_agent(&Arc::new(config))); @@ -74,7 +74,7 @@ mod tests { "0.0.0.0:4318", ); - let config = get_config(Path::new("")).expect("should parse config"); + let config = get_config(Path::new("")); assert!(!should_enable_otlp_agent(&Arc::new(config))); From 58f01a0f5d2523ab8b4a05d1f7af09464707b49c Mon Sep 17 00:00:00 2001 From: Tianning Li Date: Fri, 22 Aug 2025 16:34:00 -0400 Subject: [PATCH 2/4] feat: Add hierarchical configurable compression levels for metrics and traces - Add global compression_level config parameter (0-9, default: 6) with fallback hierarchy - Support 2-level compression configuration: global level first, then module-specific - This makes configuration more convenient - set once globally or override per module - Apply compression configuration to metrics flushers and trace processor - Add environment variable DD_COMPRESSION_LEVEL for global setting --- bottlecap/Cargo.toml | 4 +- bottlecap/src/bin/bottlecap/main.rs | 2 + bottlecap/src/config/env.rs | 51 ++++++++++++++++-- bottlecap/src/config/mod.rs | 10 ++++ bottlecap/src/config/yaml.rs | 57 +++++++++++++++++++-- bottlecap/src/traces/trace_processor.rs | 2 +- bottlecap/tests/metrics_integration_test.rs | 1 + 7 files changed, 115 insertions(+), 12 deletions(-) diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index c5ead6a50..e40f62609 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -62,8 +62,8 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "abfec752b0638a9e4096e1465acd4bb2651edfa7", default-features = false } -datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "76727c3289d65953fe9d99bd955429ee4ab60d9f", default-features = false } +datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "76727c3289d65953fe9d99bd955429ee4ab60d9f", default-features = false } libddwaf = { version = "1.26.0", git = "https://github.com/DataDog/libddwaf-rust", rev = "1d57bf0ca49782723e556ba327ee7f378978aaa7", default-features = false, features = ["serde", "dynamic"] } [dev-dependencies] diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 0e87473f0..a87ee7fc2 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -1048,6 +1048,7 @@ fn start_metrics_flushers( https_proxy: config.proxy_https.clone(), timeout: Duration::from_secs(config.flush_timeout), retry_strategy: DsdRetryStrategy::Immediate(3), + compression_level: config.metrics_config_compression_level, }; flushers.push(MetricsFlusher::new(flusher_config)); @@ -1076,6 +1077,7 @@ fn start_metrics_flushers( https_proxy: config.proxy_https.clone(), timeout: Duration::from_secs(config.flush_timeout), retry_strategy: DsdRetryStrategy::Immediate(3), + compression_level: config.metrics_config_compression_level, }; flushers.push(MetricsFlusher::new(additional_flusher_config)); } diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index b6d444fc5..f416461c2 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -108,6 +108,13 @@ pub struct EnvConfig { #[serde(deserialize_with = "deserialize_key_value_pairs")] pub tags: HashMap, + /// @env `DD_COMPRESSION_LEVEL` + /// + /// Global level `compression_level` parameter accepts values from 0 (no compression) + /// to 9 (maximum compression but higher resource usage). This value is effective only if + /// the individual component doesn't specify its own. + pub compression_level: Option, + // Logs /// @env `DD_LOGS_CONFIG_LOGS_DD_URL` /// @@ -229,6 +236,12 @@ pub struct EnvConfig { #[serde(deserialize_with = "deserialize_optional_bool_from_anything")] pub trace_propagation_http_baggage_enabled: Option, + /// @env `DD_METRICS_CONFIG_COMPRESSION_LEVEL` + /// The metrics compresses traces before sending them. The `compression_level` parameter + /// accepts values from 0 (no compression) to 9 (maximum compression but + /// higher resource usage). + pub metrics_config_compression_level: Option, + // OTLP // // - APM / Traces @@ -383,6 +396,8 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { merge_option!(config, env_config, version); merge_hashmap!(config, env_config, tags); + merge_option_to_value!(config, env_config, compression_level); + // Proxy merge_option!(config, env_config, proxy_https); merge_vec!(config, env_config, proxy_no_proxy); @@ -397,6 +412,12 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { merge_string!(config, env_config, logs_config_logs_dd_url); merge_option!(config, env_config, logs_config_processing_rules); merge_option_to_value!(config, env_config, logs_config_use_compression); + merge_option_to_value!( + config, + logs_config_compression_level, + env_config, + compression_level + ); merge_option_to_value!(config, env_config, logs_config_compression_level); merge_vec!(config, env_config, logs_config_additional_endpoints); @@ -414,6 +435,12 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { env_config, apm_config_obfuscation_http_remove_paths_with_digits ); + merge_option_to_value!( + config, + apm_config_compression_level, + env_config, + compression_level + ); merge_option_to_value!(config, env_config, apm_config_compression_level); merge_vec!(config, env_config, apm_features); merge_hashmap!(config, env_config, apm_additional_endpoints); @@ -429,6 +456,15 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { merge_option_to_value!(config, env_config, trace_propagation_extract_first); merge_option_to_value!(config, env_config, trace_propagation_http_baggage_enabled); + // Metrics + merge_option_to_value!( + config, + metrics_config_compression_level, + env_config, + compression_level + ); + merge_option_to_value!(config, env_config, metrics_config_compression_level); + // OTLP merge_option_to_value!(config, env_config, otlp_config_traces_enabled); merge_option_to_value!( @@ -589,6 +625,8 @@ mod tests { jail.set_env("DD_VERSION", "1.0.0"); jail.set_env("DD_TAGS", "team:test-team,project:test-project"); + jail.set_env("DD_COMPRESSION_LEVEL", "4"); + // Logs jail.set_env("DD_LOGS_CONFIG_LOGS_DD_URL", "https://logs.datadoghq.com"); jail.set_env( @@ -596,7 +634,7 @@ mod tests { r#"[{"type":"exclude_at_match","name":"exclude","pattern":"exclude"}]"#, ); jail.set_env("DD_LOGS_CONFIG_USE_COMPRESSION", "false"); - jail.set_env("DD_LOGS_CONFIG_COMPRESSION_LEVEL", "3"); + jail.set_env("DD_LOGS_CONFIG_COMPRESSION_LEVEL", "1"); jail.set_env( "DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS", "[{\"api_key\": \"apikey2\", \"Host\": \"agent-http-intake.logs.datadoghq.com\", \"Port\": 443, \"is_reliable\": true}]", @@ -615,7 +653,7 @@ mod tests { "DD_APM_CONFIG_OBFUSCATION_HTTP_REMOVE_PATHS_WITH_DIGITS", "true", ); - jail.set_env("DD_APM_CONFIG_COMPRESSION_LEVEL", "3"); + jail.set_env("DD_APM_CONFIG_COMPRESSION_LEVEL", "2"); jail.set_env( "DD_APM_FEATURES", "enable_otlp_compute_top_level_by_span_kind,enable_stats_by_span_kind", @@ -639,6 +677,9 @@ mod tests { jail.set_env("DD_TRACE_PROPAGATION_HTTP_BAGGAGE_ENABLED", "true"); jail.set_env("DD_TRACE_AWS_SERVICE_REPRESENTATION_ENABLED", "true"); + // Metrics + jail.set_env("DD_METRICS_CONFIG_COMPRESSION_LEVEL", "3"); + // OTLP jail.set_env("DD_OTLP_CONFIG_TRACES_ENABLED", "false"); jail.set_env("DD_OTLP_CONFIG_TRACES_SPAN_NAME_AS_RESOURCE_NAME", "true"); @@ -721,6 +762,7 @@ mod tests { site: "test-site".to_string(), api_key: "test-api-key".to_string(), log_level: LogLevel::Debug, + compression_level: 4, flush_timeout: 42, proxy_https: Some("https://proxy.example.com".to_string()), proxy_no_proxy: vec!["localhost".to_string(), "127.0.0.1".to_string()], @@ -752,7 +794,7 @@ mod tests { replace_placeholder: None, }]), logs_config_use_compression: false, - logs_config_compression_level: 3, + logs_config_compression_level: 1, logs_config_additional_endpoints: vec![LogsAdditionalEndpoint { api_key: "apikey2".to_string(), host: "agent-http-intake.logs.datadoghq.com".to_string(), @@ -772,7 +814,7 @@ mod tests { ), apm_config_obfuscation_http_remove_query_string: true, apm_config_obfuscation_http_remove_paths_with_digits: true, - apm_config_compression_level: 3, + apm_config_compression_level: 2, apm_features: vec![ "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), @@ -808,6 +850,7 @@ mod tests { trace_propagation_extract_first: true, trace_propagation_http_baggage_enabled: true, trace_aws_service_representation_enabled: true, + metrics_config_compression_level: 3, otlp_config_traces_enabled: false, otlp_config_traces_span_name_as_resource_name: true, otlp_config_traces_span_name_remappings: HashMap::from([( diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 976308c9e..d37dcf3cd 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -245,6 +245,8 @@ pub struct Config { // Timeout for the request to flush data to Datadog endpoint pub flush_timeout: u64, + pub compression_level: i32, + // Proxy pub proxy_https: Option, pub proxy_no_proxy: Vec, @@ -291,6 +293,9 @@ pub struct Config { pub trace_propagation_http_baggage_enabled: bool, pub trace_aws_service_representation_enabled: bool, + // Metrics + pub metrics_config_compression_level: i32, + // OTLP // // - APM / Traces @@ -368,6 +373,8 @@ impl Default for Config { version: None, tags: HashMap::new(), + compression_level: 6, + // Logs logs_config_logs_dd_url: String::default(), logs_config_processing_rules: None, @@ -397,6 +404,9 @@ impl Default for Config { trace_propagation_extract_first: false, trace_propagation_http_baggage_enabled: false, + // Metrics + metrics_config_compression_level: 6, + // OTLP otlp_config_traces_enabled: true, otlp_config_traces_span_name_as_resource_name: false, diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index b8dca19b3..e1e28d4f4 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -38,6 +38,8 @@ pub struct YamlConfig { pub flush_timeout: Option, + pub compression_level: Option, + // Proxy pub proxy: ProxyConfig, // nit: this should probably be in the endpoints section @@ -78,6 +80,9 @@ pub struct YamlConfig { #[serde(deserialize_with = "deserialize_optional_bool_from_anything")] pub trace_propagation_http_baggage_enabled: Option, + // Metrics + pub metrics_config: MetricsConfig, + // OTLP pub otlp_config: Option, @@ -131,6 +136,15 @@ pub struct LogsConfig { pub additional_endpoints: Vec, } +/// Metrics specific config +/// +#[derive(Debug, PartialEq, Deserialize, Clone, Copy, Default)] +#[serde(default)] +#[allow(clippy::module_name_repetitions)] +pub struct MetricsConfig { + pub compression_level: Option, +} + /// APM Config /// @@ -373,6 +387,8 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { merge_option!(config, yaml_config, version); merge_hashmap!(config, yaml_config, tags); + merge_option_to_value!(config, yaml_config, compression_level); + // Proxy merge_option!(config, proxy_https, yaml_config.proxy, https); merge_option_to_value!(config, proxy_no_proxy, yaml_config.proxy, no_proxy); @@ -401,6 +417,12 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { yaml_config.logs_config, use_compression ); + merge_option_to_value!( + config, + logs_config_compression_level, + yaml_config, + compression_level + ); merge_option_to_value!( config, logs_config_compression_level, @@ -414,6 +436,20 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { additional_endpoints ); + merge_option_to_value!( + config, + metrics_config_compression_level, + yaml_config, + compression_level + ); + + merge_option_to_value!( + config, + metrics_config_compression_level, + yaml_config.metrics_config, + compression_level + ); + // APM merge_hashmap!(config, yaml_config, service_mapping); merge_string!(config, apm_dd_url, yaml_config.apm_config, apm_dd_url); @@ -423,6 +459,12 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { yaml_config.apm_config, replace_tags ); + merge_option_to_value!( + config, + apm_config_compression_level, + yaml_config, + compression_level + ); merge_option_to_value!( config, apm_config_compression_level, @@ -667,7 +709,7 @@ site: "test-site" api_key: "test-api-key" log_level: "debug" flush_timeout: 42 - +compression_level: 4 # Proxy proxy: https: "https://proxy.example.com" @@ -699,7 +741,7 @@ logs_config: type: "exclude_at_match" pattern: "test-pattern" use_compression: false - compression_level: 3 + compression_level: 1 additional_endpoints: - api_key: "apikey2" Host: "agent-http-intake.logs.datadoghq.com" @@ -714,7 +756,7 @@ apm_config: http: remove_query_string: true remove_paths_with_digits: true - compression_level: 3 + compression_level: 2 features: - "enable_otlp_compute_top_level_by_span_kind" - "enable_stats_by_span_kind" @@ -734,6 +776,9 @@ trace_propagation_extract_first: true trace_propagation_http_baggage_enabled: true trace_aws_service_representation_enabled: true +metrics_config: + compression_level: 3 + # OTLP otlp_config: receiver: @@ -801,6 +846,7 @@ extension_version: "compatibility" api_key: "test-api-key".to_string(), log_level: LogLevel::Debug, flush_timeout: 42, + compression_level: 4, proxy_https: Some("https://proxy.example.com".to_string()), proxy_no_proxy: vec!["localhost".to_string(), "127.0.0.1".to_string()], http_protocol: Some("http1".to_string()), @@ -831,7 +877,7 @@ extension_version: "compatibility" replace_placeholder: None, }]), logs_config_use_compression: false, - logs_config_compression_level: 3, + logs_config_compression_level: 1, logs_config_additional_endpoints: vec![LogsAdditionalEndpoint { api_key: "apikey2".to_string(), host: "agent-http-intake.logs.datadoghq.com".to_string(), @@ -846,7 +892,7 @@ extension_version: "compatibility" apm_replace_tags: Some(vec![]), apm_config_obfuscation_http_remove_query_string: true, apm_config_obfuscation_http_remove_paths_with_digits: true, - apm_config_compression_level: 3, + apm_config_compression_level: 2, apm_features: vec![ "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), @@ -866,6 +912,7 @@ extension_version: "compatibility" trace_propagation_extract_first: true, trace_propagation_http_baggage_enabled: true, trace_aws_service_representation_enabled: true, + metrics_config_compression_level: 3, otlp_config_traces_enabled: false, otlp_config_traces_span_name_as_resource_name: true, otlp_config_traces_span_name_remappings: HashMap::from([( diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index f8bdbdb96..89e561dd1 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -361,7 +361,7 @@ impl TraceProcessor for ServerlessTraceProcessor { }; let builder = SendDataBuilder::new(body_size, payload, header_tags, &endpoint) - .with_compression(Compression::Zstd(6)) + .with_compression(Compression::Zstd(config.apm_config_compression_level)) .with_retry_strategy(RetryStrategy::new( 1, 100, diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs index 8f9ec8553..60e596a28 100644 --- a/bottlecap/tests/metrics_integration_test.rs +++ b/bottlecap/tests/metrics_integration_test.rs @@ -54,6 +54,7 @@ async fn test_enhanced_metrics() { https_proxy: None, timeout: std::time::Duration::from_secs(5), retry_strategy: dogstatsd::datadog::RetryStrategy::Immediate(1), + compression_level: 6, }; let mut metrics_flusher = MetricsFlusher::new(flusher_config); let lambda_enhanced_metrics = From 6b6af448297930fce7b027bd1ec23fc79d660b5c Mon Sep 17 00:00:00 2001 From: Tianning Li Date: Tue, 2 Sep 2025 11:45:58 -0400 Subject: [PATCH 3/4] Merge branch 'main' into tianning.li/SVLS-7461 --- bottlecap/Cargo.lock | 6 +- bottlecap/src/config/env.rs | 7 ++ bottlecap/src/config/mod.rs | 122 +++++++++++--------- bottlecap/src/metrics/enhanced/constants.rs | 2 - bottlecap/src/metrics/enhanced/lambda.rs | 86 ++------------ bottlecap/src/otlp/mod.rs | 6 +- 6 files changed, 90 insertions(+), 139 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index c04092ea5..75ec2b291 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -763,7 +763,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7#abfec752b0638a9e4096e1465acd4bb2651edfa7" +source = "git+https://github.com/DataDog/serverless-components?rev=b0b0cb9310d8d8f2038c00a46e3267e21dc3e287#b0b0cb9310d8d8f2038c00a46e3267e21dc3e287" dependencies = [ "reqwest", "rustls", @@ -1003,9 +1003,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7#abfec752b0638a9e4096e1465acd4bb2651edfa7" +source = "git+https://github.com/DataDog/serverless-components?rev=b0b0cb9310d8d8f2038c00a46e3267e21dc3e287#b0b0cb9310d8d8f2038c00a46e3267e21dc3e287" dependencies = [ - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7)", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=b0b0cb9310d8d8f2038c00a46e3267e21dc3e287)", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index f416461c2..2975d52b1 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -107,6 +107,12 @@ pub struct EnvConfig { /// @env `DD_TAGS` #[serde(deserialize_with = "deserialize_key_value_pairs")] pub tags: HashMap, + /// @env `DD_COMPRESSION_LEVEL` + /// + /// Global level `compression_level` parameter accepts values from 0 (no compression) + /// to 9 (maximum compression but higher resource usage). This value is effective only if + /// the individual component doesn't specify its own. + pub compression_level: Option, /// @env `DD_COMPRESSION_LEVEL` /// @@ -624,6 +630,7 @@ mod tests { jail.set_env("DD_SERVICE", "test-service"); jail.set_env("DD_VERSION", "1.0.0"); jail.set_env("DD_TAGS", "team:test-team,project:test-project"); + jail.set_env("DD_COMPRESSION_LEVEL", "4"); jail.set_env("DD_COMPRESSION_LEVEL", "4"); diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index d37dcf3cd..59a0e5be5 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -245,6 +245,8 @@ pub struct Config { // Timeout for the request to flush data to Datadog endpoint pub flush_timeout: u64, + // Global config of compression levels. + // It would be overridden by the setup for the individual component pub compression_level: i32, // Proxy @@ -452,13 +454,10 @@ impl Default for Config { } fn log_fallback_reason(reason: &str) { - error!("Fallback support for {reason} is no longer available."); + println!("{{\"DD_EXTENSION_FALLBACK_REASON\":\"{reason}\"}}"); } -#[must_use = "fallback reasons should be processed to emit appropriate metrics"] -pub fn fallback(config: &Config) -> Vec { - let mut fallback_reasons = Vec::new(); - +fn fallback(config: &Config) -> Result<(), ConfigError> { // Customer explicitly opted out of the Next Gen extension let opted_out = match config.extension_version.as_deref() { Some("compatibility") => true, @@ -467,18 +466,21 @@ pub fn fallback(config: &Config) -> Vec { }; if opted_out { - let reason = "extension_version"; - log_fallback_reason(reason); - fallback_reasons.push(reason.to_string()); + log_fallback_reason("extension_version"); + return Err(ConfigError::UnsupportedField( + "extension_version".to_string(), + )); } // ASM / .NET // todo(duncanista): Remove once the .NET runtime is fixed if config.serverless_appsec_enabled && has_dotnet_binary() { - let reason = "serverless_appsec_enabled_dotnet"; - log_fallback_reason(reason); - fallback_reasons.push(reason.to_string()); + log_fallback_reason("serverless_appsec_enabled_dotnet"); + return Err(ConfigError::UnsupportedField( + "serverless_appsec_enabled_dotnet".to_string(), + )); } + // OTLP let has_otlp_config = config .otlp_config_receiver_protocols_grpc_endpoint @@ -510,22 +512,25 @@ pub fn fallback(config: &Config) -> Vec { || config.otlp_config_logs_enabled; if has_otlp_config { - let reason = "otel"; - log_fallback_reason(reason); - fallback_reasons.push(reason.to_string()); + log_fallback_reason("otel"); + return Err(ConfigError::UnsupportedField("otel".to_string())); } - fallback_reasons + Ok(()) } #[allow(clippy::module_name_repetitions)] -#[must_use = "configuration must be used to initialize the application"] -pub fn get_config(config_directory: &Path) -> Config { +pub fn get_config(config_directory: &Path) -> Result { let path: std::path::PathBuf = config_directory.join("datadog.yaml"); let mut config_builder = ConfigBuilder::default() .add_source(Box::new(YamlConfigSource { path })) .add_source(Box::new(EnvConfigSource)); - config_builder.build() + + let config = config_builder.build(); + + fallback(&config)?; + + Ok(config) } #[inline] @@ -738,7 +743,11 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_EXTENSION_VERSION", "compatibility"); - let _config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect_err("should reject unknown fields"); + assert_eq!( + config, + ConfigError::UnsupportedField("extension_version".to_string()) + ); Ok(()) }); } @@ -752,7 +761,8 @@ pub mod tests { "localhost:4138", ); - let _config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect_err("should reject unknown fields"); + assert_eq!(config, ConfigError::UnsupportedField("otel".to_string())); Ok(()) }); } @@ -772,7 +782,8 @@ pub mod tests { ", )?; - let _config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect_err("should reject unknown fields"); + assert_eq!(config, ConfigError::UnsupportedField("otel".to_string())); Ok(()) }); } @@ -782,7 +793,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!( config.logs_config_logs_dd_url, "https://http-intake.logs.datadoghq.com".to_string() @@ -800,7 +811,7 @@ pub mod tests { "agent-http-intake-pci.logs.datadoghq.com:443", ); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!( config.logs_config_logs_dd_url, "agent-http-intake-pci.logs.datadoghq.com:443".to_string() @@ -815,7 +826,7 @@ pub mod tests { jail.clear_env(); jail.set_env("DD_APM_DD_URL", "https://trace-pci.agent.datadoghq.com"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!( config.apm_dd_url, "https://trace-pci.agent.datadoghq.com/api/v0.2/traces".to_string() @@ -830,7 +841,7 @@ pub mod tests { jail.clear_env(); jail.set_env("DD_DD_URL", "custom_proxy:3128"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.dd_url, "custom_proxy:3128".to_string()); Ok(()) }); @@ -842,7 +853,7 @@ pub mod tests { jail.clear_env(); jail.set_env("DD_URL", "custom_proxy:3128"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.url, "custom_proxy:3128".to_string()); Ok(()) }); @@ -853,7 +864,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.dd_url, String::new()); Ok(()) }); @@ -863,9 +874,13 @@ pub mod tests { fn test_allowed_but_disabled() { figment::Jail::expect_with(|jail| { jail.clear_env(); - jail.set_env("DD_SERVERLESS_APPSEC_ENABLED", "true"); + jail.set_env( + "DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT", + "localhost:4138", + ); - let _config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect_err("should reject unknown fields"); + assert_eq!(config, ConfigError::UnsupportedField("otel".to_string())); Ok(()) }); } @@ -881,7 +896,7 @@ pub mod tests { ", )?; jail.set_env("DD_SITE", "datad0g.com"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.site, "datad0g.com"); Ok(()) }); @@ -897,7 +912,7 @@ pub mod tests { r" ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.site, "datadoghq.com"); Ok(()) }); @@ -908,7 +923,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_SITE", "datadoghq.eu"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.site, "datadoghq.eu"); Ok(()) }); @@ -919,7 +934,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_LOG_LEVEL", "TRACE"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.log_level, LogLevel::Trace); Ok(()) }); @@ -929,7 +944,7 @@ pub mod tests { fn test_parse_default() { figment::Jail::expect_with(|jail| { jail.clear_env(); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!( config, Config { @@ -953,7 +968,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_PROXY_HTTPS", "my-proxy:3128"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.proxy_https, Some("my-proxy:3128".to_string())); Ok(()) }); @@ -969,7 +984,7 @@ pub mod tests { "NO_PROXY", "127.0.0.1,localhost,172.16.0.0/12,us-east-1.amazonaws.com,datadoghq.eu", ); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse noproxy"); assert_eq!(config.proxy_https, None); Ok(()) }); @@ -987,7 +1002,7 @@ pub mod tests { ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse weird proxy config"); assert_eq!(config.proxy_https, Some("my-proxy:3128".to_string())); Ok(()) }); @@ -1007,7 +1022,7 @@ pub mod tests { ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse weird proxy config"); assert_eq!(config.proxy_https, None); // Assertion to ensure config.site runs before proxy // because we chenck that noproxy contains the site @@ -1021,7 +1036,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_SERVERLESS_FLUSH_STRATEGY", "end"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.serverless_flush_strategy, FlushStrategy::End); Ok(()) }); @@ -1032,7 +1047,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_SERVERLESS_FLUSH_STRATEGY", "periodically,100000"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!( config.serverless_flush_strategy, FlushStrategy::Periodically(PeriodicStrategy { interval: 100_000 }) @@ -1046,7 +1061,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_SERVERLESS_FLUSH_STRATEGY", "invalid_strategy"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.serverless_flush_strategy, FlushStrategy::Default); Ok(()) }); @@ -1060,7 +1075,7 @@ pub mod tests { "DD_SERVERLESS_FLUSH_STRATEGY", "periodically,invalid_interval", ); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.serverless_flush_strategy, FlushStrategy::Default); Ok(()) }); @@ -1073,7 +1088,7 @@ pub mod tests { jail.set_env("DD_VERSION", "123"); jail.set_env("DD_ENV", "123456890"); jail.set_env("DD_SERVICE", "123456"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.version.expect("failed to parse DD_VERSION"), "123"); assert_eq!(config.env.expect("failed to parse DD_ENV"), "123456890"); assert_eq!( @@ -1103,7 +1118,7 @@ pub mod tests { pattern: exclude-me-yaml ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!( config.logs_config_processing_rules, Some(vec![ProcessingRule { @@ -1132,7 +1147,7 @@ pub mod tests { pattern: exclude ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!( config.logs_config_processing_rules, Some(vec![ProcessingRule { @@ -1161,7 +1176,7 @@ pub mod tests { repl: 'REDACTED' ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); let rule = parse_rules_from_string( r#"[ {"name": "*", "pattern": "foo", "repl": "REDACTED"} @@ -1192,7 +1207,7 @@ pub mod tests { repl: 'REDACTED-YAML' ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); let rule = parse_rules_from_string( r#"[ {"name": "*", "pattern": "foo", "repl": "REDACTED-ENV"} @@ -1219,7 +1234,7 @@ pub mod tests { remove_paths_with_digits: true ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert!(config.apm_config_obfuscation_http_remove_query_string,); assert!(config.apm_config_obfuscation_http_remove_paths_with_digits,); Ok(()) @@ -1234,7 +1249,7 @@ pub mod tests { "datadog,tracecontext,b3,b3multi", ); jail.set_env("DD_EXTENSION_VERSION", "next"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); let expected_styles = vec![ TracePropagationStyle::Datadog, @@ -1253,7 +1268,7 @@ pub mod tests { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_TRACE_PROPAGATION_STYLE_EXTRACT", "datadog"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!( config.trace_propagation_style, @@ -1278,7 +1293,8 @@ pub mod tests { "DD_APM_REPLACE_TAGS", r#"[{"name":"resource.name","pattern":"(.*)/(foo[:%].+)","repl":"$1/{foo}"}]"#, ); - let _config = get_config(Path::new("")); + let config = get_config(Path::new("")); + assert!(config.is_ok()); Ok(()) }); } @@ -1291,7 +1307,7 @@ pub mod tests { jail.set_env("DD_ENHANCED_METRICS", "1"); jail.set_env("DD_LOGS_CONFIG_USE_COMPRESSION", "TRUE"); jail.set_env("DD_CAPTURE_LAMBDA_PAYLOAD", "0"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert!(config.serverless_logs_enabled); assert!(config.enhanced_metrics); assert!(config.logs_config_use_compression); @@ -1315,7 +1331,7 @@ pub mod tests { jail.set_env("DD_SITE", "us5.datadoghq.com"); jail.set_env("DD_API_KEY", "env-api-key"); jail.set_env("DD_FLUSH_TIMEOUT", "10"); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert_eq!(config.site, "us5.datadoghq.com"); assert_eq!(config.api_key, "env-api-key"); diff --git a/bottlecap/src/metrics/enhanced/constants.rs b/bottlecap/src/metrics/enhanced/constants.rs index 110c0b618..44666910f 100644 --- a/bottlecap/src/metrics/enhanced/constants.rs +++ b/bottlecap/src/metrics/enhanced/constants.rs @@ -45,8 +45,6 @@ pub const THREADS_USE_METRIC: &str = "aws.lambda.enhanced.threads_use"; pub const SHUTDOWNS_METRIC: &str = "aws.lambda.enhanced.shutdowns"; //pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations"; pub const UNUSED_INIT: &str = "aws.lambda.enhanced.unused_init"; -pub const DATADOG_SERVERLESS_EXTENSION_FAILOVER_CONFIG_ISSUE_METRIC: &str = - "datadog.serverless.extension.failover"; pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS"; // Monitoring interval for tmp, fd, and threads metrics diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 04287c317..7e3276251 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -60,29 +60,18 @@ impl Lambda { .insert(String::from("runtime"), runtime.to_string()); } - fn tags_to_sorted_tags(tags: &HashMap) -> Option { - let vec_tags: Vec = tags.iter().map(|(k, v)| format!("{k}:{v}")).collect(); + fn get_dynamic_value_tags(&self) -> Option { + let vec_tags: Vec = self + .dynamic_value_tags + .iter() + .map(|(k, v)| format!("{k}:{v}")) + .collect(); let string_tags = vec_tags.join(","); SortedTags::parse(&string_tags).ok() } - fn get_dynamic_value_tags(&self) -> Option { - Self::tags_to_sorted_tags(&self.dynamic_value_tags) - } - - fn get_combined_tags(&self, additional_tags: &HashMap) -> Option { - if additional_tags.is_empty() { - return self.get_dynamic_value_tags(); - } - - let mut combined_tags = self.dynamic_value_tags.clone(); - combined_tags.extend(additional_tags.clone()); - - Self::tags_to_sorted_tags(&combined_tags) - } - pub fn increment_invocation_metric(&self, timestamp: i64) { self.increment_metric(constants::INVOCATIONS_METRIC, timestamp); } @@ -105,19 +94,6 @@ impl Lambda { self.increment_metric(constants::OUT_OF_MEMORY_METRIC, timestamp); } - /// Set up a metric tracking configuration load issue with details - pub fn set_config_load_issue_metric(&self, timestamp: i64, reason_msg: &str) { - let dynamic_tags = self.get_combined_tags(&HashMap::from([( - "reason".to_string(), - reason_msg.to_string(), - )])); - self.increment_metric_with_tags( - constants::DATADOG_SERVERLESS_EXTENSION_FAILOVER_CONFIG_ISSUE_METRIC, - timestamp, - dynamic_tags, - ); - } - pub fn set_init_duration_metric( &mut self, init_type: InitType, @@ -146,23 +122,13 @@ impl Lambda { } fn increment_metric(&self, metric_name: &str, timestamp: i64) { - self.increment_metric_with_tags(metric_name, timestamp, self.get_dynamic_value_tags()); - } - - /// Helper function to emit metric with supplied tags - fn increment_metric_with_tags( - &self, - metric_name: &str, - timestamp: i64, - tags: Option, - ) { if !self.config.enhanced_metrics { return; } let metric = Metric::new( metric_name.into(), MetricValue::distribution(1f64), - tags, + self.get_dynamic_value_tags(), Some(timestamp), ); if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { @@ -801,19 +767,9 @@ mod tests { } async fn assert_sketch(handle: &AggregatorHandle, metric_id: &str, value: f64, timestamp: i64) { - assert_sketch_with_tag(handle, metric_id, value, timestamp, None).await; - } - - async fn assert_sketch_with_tag( - handle: &AggregatorHandle, - metric_id: &str, - value: f64, - timestamp: i64, - tags: Option, - ) { let ts = (timestamp / 10) * 10; if let Some(e) = handle - .get_entry_by_id(metric_id.into(), tags, ts) + .get_entry_by_id(metric_id.into(), None, ts) .await .unwrap() { @@ -1415,30 +1371,4 @@ mod tests { .is_none() ); } - - #[tokio::test] - async fn test_set_config_load_issue_metric() { - let (metrics_aggr, my_config) = setup(); - let lambda = Lambda::new(metrics_aggr.clone(), my_config); - let now: i64 = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); - let test_reason = "test_config_issue"; - - lambda.set_config_load_issue_metric(now, test_reason); - - // Create the expected tags for the metric lookup - let expected_tags = SortedTags::parse(&format!("reason:{test_reason}")).ok(); - assert_sketch_with_tag( - &metrics_aggr, - constants::DATADOG_SERVERLESS_EXTENSION_FAILOVER_CONFIG_ISSUE_METRIC, - 1f64, - now, - expected_tags, - ) - .await; - } } diff --git a/bottlecap/src/otlp/mod.rs b/bottlecap/src/otlp/mod.rs index 4e0f4aed7..292a80180 100644 --- a/bottlecap/src/otlp/mod.rs +++ b/bottlecap/src/otlp/mod.rs @@ -37,7 +37,7 @@ mod tests { ", )?; - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); // Since the default for traces is `true`, we don't need to set it. assert!(should_enable_otlp_agent(&Arc::new(config))); @@ -55,7 +55,7 @@ mod tests { "0.0.0.0:4318", ); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); // Since the default for traces is `true`, we don't need to set it. assert!(should_enable_otlp_agent(&Arc::new(config))); @@ -74,7 +74,7 @@ mod tests { "0.0.0.0:4318", ); - let config = get_config(Path::new("")); + let config = get_config(Path::new("")).expect("should parse config"); assert!(!should_enable_otlp_agent(&Arc::new(config))); From d161fa8ac027d9bfaaa4ba52de0bf58f2a423194 Mon Sep 17 00:00:00 2001 From: Tianning Li Date: Fri, 5 Sep 2025 12:41:35 -0400 Subject: [PATCH 4/4] feat: add configurable compression levels for telemetry data --- bottlecap/Cargo.lock | 6 +-- bottlecap/Cargo.toml | 2 +- bottlecap/src/bin/bottlecap/main.rs | 3 ++ bottlecap/src/config/env.rs | 41 +++++++++++++-- bottlecap/src/config/mod.rs | 12 +++++ bottlecap/src/config/yaml.rs | 56 +++++++++++++++++++-- bottlecap/src/traces/trace_processor.rs | 2 +- bottlecap/tests/metrics_integration_test.rs | 1 + 8 files changed, 109 insertions(+), 14 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index c04092ea5..75ec2b291 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -763,7 +763,7 @@ dependencies = [ [[package]] name = "datadog-fips" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7#abfec752b0638a9e4096e1465acd4bb2651edfa7" +source = "git+https://github.com/DataDog/serverless-components?rev=b0b0cb9310d8d8f2038c00a46e3267e21dc3e287#b0b0cb9310d8d8f2038c00a46e3267e21dc3e287" dependencies = [ "reqwest", "rustls", @@ -1003,9 +1003,9 @@ dependencies = [ [[package]] name = "dogstatsd" version = "0.1.0" -source = "git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7#abfec752b0638a9e4096e1465acd4bb2651edfa7" +source = "git+https://github.com/DataDog/serverless-components?rev=b0b0cb9310d8d8f2038c00a46e3267e21dc3e287#b0b0cb9310d8d8f2038c00a46e3267e21dc3e287" dependencies = [ - "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=abfec752b0638a9e4096e1465acd4bb2651edfa7)", + "datadog-fips 0.1.0 (git+https://github.com/DataDog/serverless-components?rev=b0b0cb9310d8d8f2038c00a46e3267e21dc3e287)", "datadog-protos 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "ddsketch-agent 0.1.0 (git+https://github.com/DataDog/saluki/?rev=c89b58e5784b985819baf11f13f7d35876741222)", "derive_more", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index c5ead6a50..a0cef5ed6 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -62,7 +62,7 @@ datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" , features = ["mini_agent"] } datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } -dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "abfec752b0638a9e4096e1465acd4bb2651edfa7", default-features = false } +dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "b0b0cb9310d8d8f2038c00a46e3267e21dc3e287", default-features = false } datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "fa1d2f4ea2c4c2596144a1f362935e56cf0cb3c7", default-features = false } libddwaf = { version = "1.26.0", git = "https://github.com/DataDog/libddwaf-rust", rev = "1d57bf0ca49782723e556ba327ee7f378978aaa7", default-features = false, features = ["serde", "dynamic"] } diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 0e87473f0..0acaf4312 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -340,6 +340,7 @@ async fn main() -> Result<()> { let start_time = Instant::now(); init_ustr(); let (aws_config, aws_credentials, config) = load_configs(start_time); + println!("Using config: {config:?}"); enable_logging_subsystem(&config); log_fips_status(&aws_config.region); @@ -1048,6 +1049,7 @@ fn start_metrics_flushers( https_proxy: config.proxy_https.clone(), timeout: Duration::from_secs(config.flush_timeout), retry_strategy: DsdRetryStrategy::Immediate(3), + compression_level: config.metrics_config_compression_level, }; flushers.push(MetricsFlusher::new(flusher_config)); @@ -1076,6 +1078,7 @@ fn start_metrics_flushers( https_proxy: config.proxy_https.clone(), timeout: Duration::from_secs(config.flush_timeout), retry_strategy: DsdRetryStrategy::Immediate(3), + compression_level: config.metrics_config_compression_level, }; flushers.push(MetricsFlusher::new(additional_flusher_config)); } diff --git a/bottlecap/src/config/env.rs b/bottlecap/src/config/env.rs index b6d444fc5..212843f1e 100644 --- a/bottlecap/src/config/env.rs +++ b/bottlecap/src/config/env.rs @@ -107,6 +107,12 @@ pub struct EnvConfig { /// @env `DD_TAGS` #[serde(deserialize_with = "deserialize_key_value_pairs")] pub tags: HashMap, + /// @env `DD_COMPRESSION_LEVEL` + /// + /// Global level `compression_level` parameter accepts values from 0 (no compression) + /// to 9 (maximum compression but higher resource usage). This value is effective only if + /// the individual component doesn't specify its own. + pub compression_level: Option, // Logs /// @env `DD_LOGS_CONFIG_LOGS_DD_URL` @@ -229,6 +235,12 @@ pub struct EnvConfig { #[serde(deserialize_with = "deserialize_optional_bool_from_anything")] pub trace_propagation_http_baggage_enabled: Option, + /// @env `DD_METRICS_CONFIG_COMPRESSION_LEVEL` + /// The metrics compresses traces before sending them. The `compression_level` parameter + /// accepts values from 0 (no compression) to 9 (maximum compression but + /// higher resource usage). + pub metrics_config_compression_level: Option, + // OTLP // // - APM / Traces @@ -393,10 +405,18 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { merge_string!(config, env_config, url); merge_hashmap!(config, env_config, additional_endpoints); + merge_option_to_value!(config, env_config, compression_level); + // Logs merge_string!(config, env_config, logs_config_logs_dd_url); merge_option!(config, env_config, logs_config_processing_rules); merge_option_to_value!(config, env_config, logs_config_use_compression); + merge_option_to_value!( + config, + logs_config_compression_level, + env_config, + compression_level + ); merge_option_to_value!(config, env_config, logs_config_compression_level); merge_vec!(config, env_config, logs_config_additional_endpoints); @@ -429,6 +449,15 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) { merge_option_to_value!(config, env_config, trace_propagation_extract_first); merge_option_to_value!(config, env_config, trace_propagation_http_baggage_enabled); + // Metrics + merge_option_to_value!( + config, + metrics_config_compression_level, + env_config, + compression_level + ); + merge_option_to_value!(config, env_config, metrics_config_compression_level); + // OTLP merge_option_to_value!(config, env_config, otlp_config_traces_enabled); merge_option_to_value!( @@ -588,6 +617,7 @@ mod tests { jail.set_env("DD_SERVICE", "test-service"); jail.set_env("DD_VERSION", "1.0.0"); jail.set_env("DD_TAGS", "team:test-team,project:test-project"); + jail.set_env("DD_COMPRESSION_LEVEL", "4"); // Logs jail.set_env("DD_LOGS_CONFIG_LOGS_DD_URL", "https://logs.datadoghq.com"); @@ -596,7 +626,7 @@ mod tests { r#"[{"type":"exclude_at_match","name":"exclude","pattern":"exclude"}]"#, ); jail.set_env("DD_LOGS_CONFIG_USE_COMPRESSION", "false"); - jail.set_env("DD_LOGS_CONFIG_COMPRESSION_LEVEL", "3"); + jail.set_env("DD_LOGS_CONFIG_COMPRESSION_LEVEL", "1"); jail.set_env( "DD_LOGS_CONFIG_ADDITIONAL_ENDPOINTS", "[{\"api_key\": \"apikey2\", \"Host\": \"agent-http-intake.logs.datadoghq.com\", \"Port\": 443, \"is_reliable\": true}]", @@ -615,7 +645,7 @@ mod tests { "DD_APM_CONFIG_OBFUSCATION_HTTP_REMOVE_PATHS_WITH_DIGITS", "true", ); - jail.set_env("DD_APM_CONFIG_COMPRESSION_LEVEL", "3"); + jail.set_env("DD_APM_CONFIG_COMPRESSION_LEVEL", "2"); jail.set_env( "DD_APM_FEATURES", "enable_otlp_compute_top_level_by_span_kind,enable_stats_by_span_kind", @@ -632,6 +662,7 @@ mod tests { "env:^test.*$ debug:^true$", ); + jail.set_env("DD_METRICS_CONFIG_COMPRESSION_LEVEL", "3"); // Trace Propagation jail.set_env("DD_TRACE_PROPAGATION_STYLE", "datadog"); jail.set_env("DD_TRACE_PROPAGATION_STYLE_EXTRACT", "b3"); @@ -721,6 +752,7 @@ mod tests { site: "test-site".to_string(), api_key: "test-api-key".to_string(), log_level: LogLevel::Debug, + compression_level: 4, flush_timeout: 42, proxy_https: Some("https://proxy.example.com".to_string()), proxy_no_proxy: vec!["localhost".to_string(), "127.0.0.1".to_string()], @@ -752,7 +784,7 @@ mod tests { replace_placeholder: None, }]), logs_config_use_compression: false, - logs_config_compression_level: 3, + logs_config_compression_level: 1, logs_config_additional_endpoints: vec![LogsAdditionalEndpoint { api_key: "apikey2".to_string(), host: "agent-http-intake.logs.datadoghq.com".to_string(), @@ -772,7 +804,7 @@ mod tests { ), apm_config_obfuscation_http_remove_query_string: true, apm_config_obfuscation_http_remove_paths_with_digits: true, - apm_config_compression_level: 3, + apm_config_compression_level: 2, apm_features: vec![ "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), @@ -808,6 +840,7 @@ mod tests { trace_propagation_extract_first: true, trace_propagation_http_baggage_enabled: true, trace_aws_service_representation_enabled: true, + metrics_config_compression_level: 3, otlp_config_traces_enabled: false, otlp_config_traces_span_name_as_resource_name: true, otlp_config_traces_span_name_remappings: HashMap::from([( diff --git a/bottlecap/src/config/mod.rs b/bottlecap/src/config/mod.rs index 0dc7b734f..59a0e5be5 100644 --- a/bottlecap/src/config/mod.rs +++ b/bottlecap/src/config/mod.rs @@ -245,6 +245,10 @@ pub struct Config { // Timeout for the request to flush data to Datadog endpoint pub flush_timeout: u64, + // Global config of compression levels. + // It would be overridden by the setup for the individual component + pub compression_level: i32, + // Proxy pub proxy_https: Option, pub proxy_no_proxy: Vec, @@ -291,6 +295,9 @@ pub struct Config { pub trace_propagation_http_baggage_enabled: bool, pub trace_aws_service_representation_enabled: bool, + // Metrics + pub metrics_config_compression_level: i32, + // OTLP // // - APM / Traces @@ -368,6 +375,8 @@ impl Default for Config { version: None, tags: HashMap::new(), + compression_level: 6, + // Logs logs_config_logs_dd_url: String::default(), logs_config_processing_rules: None, @@ -397,6 +406,9 @@ impl Default for Config { trace_propagation_extract_first: false, trace_propagation_http_baggage_enabled: false, + // Metrics + metrics_config_compression_level: 6, + // OTLP otlp_config_traces_enabled: true, otlp_config_traces_span_name_as_resource_name: false, diff --git a/bottlecap/src/config/yaml.rs b/bottlecap/src/config/yaml.rs index b8dca19b3..8017873bb 100644 --- a/bottlecap/src/config/yaml.rs +++ b/bottlecap/src/config/yaml.rs @@ -38,6 +38,8 @@ pub struct YamlConfig { pub flush_timeout: Option, + pub compression_level: Option, + // Proxy pub proxy: ProxyConfig, // nit: this should probably be in the endpoints section @@ -78,6 +80,9 @@ pub struct YamlConfig { #[serde(deserialize_with = "deserialize_optional_bool_from_anything")] pub trace_propagation_http_baggage_enabled: Option, + // Metrics + pub metrics_config: MetricsConfig, + // OTLP pub otlp_config: Option, @@ -131,6 +136,15 @@ pub struct LogsConfig { pub additional_endpoints: Vec, } +/// Metrics specific config +/// +#[derive(Debug, PartialEq, Deserialize, Clone, Copy, Default)] +#[serde(default)] +#[allow(clippy::module_name_repetitions)] +pub struct MetricsConfig { + pub compression_level: Option, +} + /// APM Config /// @@ -373,6 +387,7 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { merge_option!(config, yaml_config, version); merge_hashmap!(config, yaml_config, tags); + merge_option_to_value!(config, yaml_config, compression_level); // Proxy merge_option!(config, proxy_https, yaml_config.proxy, https); merge_option_to_value!(config, proxy_no_proxy, yaml_config.proxy, no_proxy); @@ -401,6 +416,12 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { yaml_config.logs_config, use_compression ); + merge_option_to_value!( + config, + logs_config_compression_level, + yaml_config, + compression_level + ); merge_option_to_value!( config, logs_config_compression_level, @@ -414,6 +435,20 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { additional_endpoints ); + merge_option_to_value!( + config, + metrics_config_compression_level, + yaml_config, + compression_level + ); + + merge_option_to_value!( + config, + metrics_config_compression_level, + yaml_config.metrics_config, + compression_level + ); + // APM merge_hashmap!(config, yaml_config, service_mapping); merge_string!(config, apm_dd_url, yaml_config.apm_config, apm_dd_url); @@ -423,6 +458,12 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) { yaml_config.apm_config, replace_tags ); + merge_option_to_value!( + config, + apm_config_compression_level, + yaml_config, + compression_level + ); merge_option_to_value!( config, apm_config_compression_level, @@ -667,7 +708,7 @@ site: "test-site" api_key: "test-api-key" log_level: "debug" flush_timeout: 42 - +compression_level: 4 # Proxy proxy: https: "https://proxy.example.com" @@ -699,7 +740,7 @@ logs_config: type: "exclude_at_match" pattern: "test-pattern" use_compression: false - compression_level: 3 + compression_level: 1 additional_endpoints: - api_key: "apikey2" Host: "agent-http-intake.logs.datadoghq.com" @@ -714,7 +755,7 @@ apm_config: http: remove_query_string: true remove_paths_with_digits: true - compression_level: 3 + compression_level: 2 features: - "enable_otlp_compute_top_level_by_span_kind" - "enable_stats_by_span_kind" @@ -734,6 +775,9 @@ trace_propagation_extract_first: true trace_propagation_http_baggage_enabled: true trace_aws_service_representation_enabled: true +metrics_config: + compression_level: 3 + # OTLP otlp_config: receiver: @@ -801,6 +845,7 @@ extension_version: "compatibility" api_key: "test-api-key".to_string(), log_level: LogLevel::Debug, flush_timeout: 42, + compression_level: 4, proxy_https: Some("https://proxy.example.com".to_string()), proxy_no_proxy: vec!["localhost".to_string(), "127.0.0.1".to_string()], http_protocol: Some("http1".to_string()), @@ -831,7 +876,7 @@ extension_version: "compatibility" replace_placeholder: None, }]), logs_config_use_compression: false, - logs_config_compression_level: 3, + logs_config_compression_level: 1, logs_config_additional_endpoints: vec![LogsAdditionalEndpoint { api_key: "apikey2".to_string(), host: "agent-http-intake.logs.datadoghq.com".to_string(), @@ -846,7 +891,7 @@ extension_version: "compatibility" apm_replace_tags: Some(vec![]), apm_config_obfuscation_http_remove_query_string: true, apm_config_obfuscation_http_remove_paths_with_digits: true, - apm_config_compression_level: 3, + apm_config_compression_level: 2, apm_features: vec![ "enable_otlp_compute_top_level_by_span_kind".to_string(), "enable_stats_by_span_kind".to_string(), @@ -866,6 +911,7 @@ extension_version: "compatibility" trace_propagation_extract_first: true, trace_propagation_http_baggage_enabled: true, trace_aws_service_representation_enabled: true, + metrics_config_compression_level: 3, otlp_config_traces_enabled: false, otlp_config_traces_span_name_as_resource_name: true, otlp_config_traces_span_name_remappings: HashMap::from([( diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index f8bdbdb96..89e561dd1 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -361,7 +361,7 @@ impl TraceProcessor for ServerlessTraceProcessor { }; let builder = SendDataBuilder::new(body_size, payload, header_tags, &endpoint) - .with_compression(Compression::Zstd(6)) + .with_compression(Compression::Zstd(config.apm_config_compression_level)) .with_retry_strategy(RetryStrategy::new( 1, 100, diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs index 8f9ec8553..60e596a28 100644 --- a/bottlecap/tests/metrics_integration_test.rs +++ b/bottlecap/tests/metrics_integration_test.rs @@ -54,6 +54,7 @@ async fn test_enhanced_metrics() { https_proxy: None, timeout: std::time::Duration::from_secs(5), retry_strategy: dogstatsd::datadog::RetryStrategy::Immediate(1), + compression_level: 6, }; let mut metrics_flusher = MetricsFlusher::new(flusher_config); let lambda_enhanced_metrics =