Skip to content

Commit bac841c

Browse files
authored
fix: Fix log processing rule on orphan log (#912)
# Problem When a user follows [our doc](https://docs.datadoghq.com/serverless/aws_lambda/logs/?tab=serverlessframework#filter-or-scrub-information-from-logs) to set the env var `DD_LOGS_CONFIG_PROCESSING_RULES` to `[{"type": "exclude_at_match", "name": "exclude_start_and_end_logs", "pattern": "(START|END|REPORT) RequestId"}]`, we exclude not only `START`, `END` and `REPORT` logs, but also some other logs, such as (1) extension logs and (2) error logs from the Lambda handler. As a result, errors can go unobserved. # Bug This is because we first apply the log processing rules to the log we receive from the Telemetry API to decide whether to send it to Datadog, and then reuse that same result for the "orphan logs", which may include extension logs and user error logs. # This PR For each orphan log, evaluate the log processing rules again instead of reusing the existing result. # Testing ## Setup - Lambda runtime: Node 22 - Handler: ``` const region = process.env["NO_REGION"]; if (!region) { throw new Error("NO_REGION environment variable not set"); } export const handler = async (event) => { const response = { statusCode: 200, body: JSON.stringify('Hello from Lambda!'), }; return response; }; ``` - Env var: `DD_LOGS_CONFIG_PROCESSING_RULES`: `[{"type": "exclude_at_match", "name": "exclude_start_and_end_logs", "pattern": "(START|END|REPORT) RequestId"}]` ## Result before: No logs showed up in Datadog's Log Explorer or Live Tail ## Result after: The custom error log and the extension log showed up. <img width="1239" height="552" alt="image" src="https://github.com/user-attachments/assets/e3991979-b72d-44a4-b819-c36c7d33aa7c" /> # Notes Thanks @duncanista @astuyve @litianningdatadog for debugging and discussion. Jira: https://datadoghq.atlassian.net/browse/SLES-2469
1 parent ebf4d41 commit bac841c

File tree

1 file changed

+121
-18
lines changed

1 file changed

+121
-18
lines changed

bottlecap/src/logs/lambda/processor.rs

Lines changed: 121 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -324,29 +324,30 @@ impl LambdaProcessor {
324324
}
325325
}
326326

327-
pub async fn process(&mut self, event: TelemetryEvent, aggregator_handle: &AggregatorHandle) {
328-
if let Ok(mut log) = self.make_log(event).await {
329-
let should_send_log = self.logs_enabled
330-
&& LambdaProcessor::apply_rules(&self.rules, &mut log.message.message);
331-
if should_send_log {
332-
if let Ok(serialized_log) = serde_json::to_string(&log) {
333-
// explicitly drop log so we don't accidentally re-use it and push
334-
// duplicate logs to the aggregator
335-
drop(log);
336-
self.ready_logs.push(serialized_log);
337-
}
327+
/// Processes a log, applies filtering rules, serializes it, and queues it for aggregation
328+
fn process_and_queue_log(&mut self, mut log: IntakeLog) {
329+
let should_send_log = self.logs_enabled
330+
&& LambdaProcessor::apply_rules(&self.rules, &mut log.message.message);
331+
if should_send_log {
332+
if let Ok(serialized_log) = serde_json::to_string(&log) {
333+
// explicitly drop log so we don't accidentally re-use it and push
334+
// duplicate logs to the aggregator
335+
drop(log);
336+
self.ready_logs.push(serialized_log);
338337
}
338+
}
339+
}
340+
341+
pub async fn process(&mut self, event: TelemetryEvent, aggregator_handle: &AggregatorHandle) {
342+
if let Ok(log) = self.make_log(event).await {
343+
self.process_and_queue_log(log);
339344

340345
// Process orphan logs, since we have a `request_id` now
341-
for mut orphan_log in self.orphan_logs.drain(..) {
346+
let orphan_logs = std::mem::take(&mut self.orphan_logs);
347+
for mut orphan_log in orphan_logs {
342348
orphan_log.message.lambda.request_id =
343349
Some(self.invocation_context.request_id.clone());
344-
if should_send_log {
345-
if let Ok(serialized_log) = serde_json::to_string(&orphan_log) {
346-
drop(orphan_log);
347-
self.ready_logs.push(serialized_log);
348-
}
349-
}
350+
self.process_and_queue_log(orphan_log);
350351
}
351352
}
352353

@@ -1085,4 +1086,106 @@ mod tests {
10851086
};
10861087
assert_eq!(intake_log, function_log);
10871088
}
1089+
1090+
#[tokio::test]
1091+
async fn test_process_orphan_logs_with_exclude_at_match_rule() {
1092+
use crate::config::processing_rule;
1093+
1094+
// This test verifies that the orphan logs are not excluded when the START/END/REPORT
1095+
// logs matched an exclude_at_match rule.
1096+
let processing_rules = vec![processing_rule::ProcessingRule {
1097+
kind: processing_rule::Kind::ExcludeAtMatch,
1098+
name: "exclude_start_and_end_logs".to_string(),
1099+
pattern: "(START|END|REPORT) RequestId".to_string(),
1100+
replace_placeholder: None,
1101+
}];
1102+
1103+
let config = Arc::new(config::Config {
1104+
service: Some("test-service".to_string()),
1105+
tags: HashMap::from([("test".to_string(), "tags".to_string())]),
1106+
logs_config_processing_rules: Some(processing_rules),
1107+
..config::Config::default()
1108+
});
1109+
1110+
let tags_provider = Arc::new(provider::Provider::new(
1111+
Arc::clone(&config),
1112+
LAMBDA_RUNTIME_SLUG.to_string(),
1113+
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
1114+
));
1115+
1116+
let (tx, _rx) = tokio::sync::mpsc::channel(2);
1117+
let (aggregator_service, aggregator_handle) = AggregatorService::default();
1118+
let service_handle = tokio::spawn(async move {
1119+
aggregator_service.run().await;
1120+
});
1121+
1122+
let mut processor =
1123+
LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone());
1124+
1125+
// First, send an extension log (orphan) that doesn't have a request_id
1126+
let extension_event = TelemetryEvent {
1127+
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
1128+
record: TelemetryRecord::Extension(Value::String(
1129+
"[Extension] Important extension log that should not be excluded".to_string(),
1130+
)),
1131+
};
1132+
processor
1133+
.process(extension_event.clone(), &aggregator_handle)
1134+
.await;
1135+
1136+
// Verify the extension log is queued as an orphan
1137+
assert_eq!(processor.orphan_logs.len(), 1);
1138+
1139+
// Send a user error log (orphan) that also doesn't have a request_id
1140+
let error_event = TelemetryEvent {
1141+
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 48).unwrap(),
1142+
record: TelemetryRecord::Function(Value::String(
1143+
"Error: NO_REGION environment variable not set".to_string(),
1144+
)),
1145+
};
1146+
processor
1147+
.process(error_event.clone(), &aggregator_handle)
1148+
.await;
1149+
1150+
// Now we should have 2 orphan logs
1151+
assert_eq!(processor.orphan_logs.len(), 2);
1152+
1153+
// Now send a PlatformStart event that should be excluded by the rule
1154+
let start_event = TelemetryEvent {
1155+
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 49).unwrap(),
1156+
record: TelemetryRecord::PlatformStart {
1157+
request_id: "test-request-id".to_string(),
1158+
version: Some("test".to_string()),
1159+
},
1160+
};
1161+
processor
1162+
.process(start_event.clone(), &aggregator_handle)
1163+
.await;
1164+
1165+
// The START log should be excluded, but the orphan logs should be sent
1166+
// because they don't match the exclude pattern
1167+
let batches = aggregator_handle.get_batches().await.unwrap();
1168+
assert_eq!(batches.len(), 1);
1169+
1170+
let batch_str = String::from_utf8(batches[0].clone()).unwrap();
1171+
1172+
// Verify the START log is NOT in the batch (it should be excluded)
1173+
assert!(!batch_str.contains("START RequestId"));
1174+
1175+
// Verify the extension log IS in the batch
1176+
assert!(batch_str.contains("Important extension log that should not be excluded"));
1177+
1178+
// Verify the error log IS in the batch
1179+
assert!(batch_str.contains("NO_REGION environment variable not set"));
1180+
1181+
// Verify both logs have the correct request_id assigned
1182+
assert!(batch_str.contains("test-request-id"));
1183+
1184+
aggregator_handle
1185+
.shutdown()
1186+
.expect("Failed to shutdown aggregator service");
1187+
service_handle
1188+
.await
1189+
.expect("Aggregator service task failed");
1190+
}
10881191
}

0 commit comments

Comments
 (0)