From 0ee7eae357057c286df02b41f6c6a9c7e3b221d0 Mon Sep 17 00:00:00 2001 From: Nicolas Moutschen Date: Mon, 1 Nov 2021 21:51:53 +0100 Subject: [PATCH 1/3] chore: add custom tracing layer --- Cargo.toml | 3 +- src/{utils.rs => utils/mod.rs} | 7 +-- src/utils/trace.rs | 85 ++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 6 deletions(-) rename src/{utils.rs => utils/mod.rs} (86%) create mode 100644 src/utils/trace.rs diff --git a/Cargo.toml b/Cargo.toml index 84a25db..68886d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ aws-sdk-eventbridge = "0.0.22-alpha" aws-smithy-client = { version = "0.27.0-alpha", features = ["test-util"] } aws-smithy-http = "0.27.0-alpha" aws-types = "0.0.22-alpha" +chrono = "0.4" futures = { version = "0.3", features = ["std"] } lambda_runtime = { version = "0.4", optional = true } lambda_http = { version = "0.4", optional = true } @@ -19,7 +20,7 @@ rayon = { version = "1.5", optional = true } serde = "1" serde_json = "1.0" tracing = "0.1" -tracing-subscriber = { version = "0.2", features = ["fmt", "json"] } +tracing-subscriber = "0.3" tokio = { version = "1", features = ["full"] } [dev-dependencies] diff --git a/src/utils.rs b/src/utils/mod.rs similarity index 86% rename from src/utils.rs rename to src/utils/mod.rs index 6156c90..80483e5 100644 --- a/src/utils.rs +++ b/src/utils/mod.rs @@ -1,11 +1,8 @@ use crate::{event_bus, model, store, Service}; use tracing::{info, instrument}; -/// Setup tracing -pub fn setup_tracing() { - let subscriber = tracing_subscriber::fmt().json().finish(); - tracing::subscriber::set_global_default(subscriber).expect("failed to set tracing subscriber"); -} +mod trace; +pub use trace::setup_tracing; /// Retrieve a service /// diff --git a/src/utils/trace.rs b/src/utils/trace.rs new file mode 100644 index 0000000..56fe67d --- /dev/null +++ b/src/utils/trace.rs @@ -0,0 +1,85 @@ +use chrono::prelude::*; +use std::collections::BTreeMap; +use tracing_subscriber::{prelude::*, Layer}; + +/// Setup tracing +pub fn setup_tracing() { + let layer = LambdaLayer::new(tracing::Level::INFO); + tracing_subscriber::registry().with(layer).init(); +} + +struct LambdaVisitor<'a> { + pub data: &'a mut BTreeMap, +} + +impl<'a> tracing::field::Visit for LambdaVisitor<'a> { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.data + .insert(field.name().to_string(), format!("{:?}", value).into()); + } + + fn record_i64(&mut self, field: &tracing::field::Field, value: i64) { + self.data.insert(field.name().to_string(), value.into()); + } + + fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { + self.data.insert(field.name().to_string(), value.into()); + } + + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.data.insert(field.name().to_string(), value.into()); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.data.insert(field.name().to_string(), value.into()); + } + + fn record_error( + &mut self, + field: &tracing::field::Field, + value: &(dyn std::error::Error + 'static), + ) { + self.data + .insert(field.name().to_string(), format!("{:?}", value).into()); + } +} + +struct LambdaLayer { + level: tracing::Level, +} + +impl LambdaLayer { + pub fn new(level: tracing::Level) -> Self { + Self { level } + } +} + +impl Layer for LambdaLayer +where + S: tracing::Subscriber, +{ + fn on_event( + &self, + event: &tracing::Event<'_>, + _ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + let metadata = event.metadata(); + + if metadata.level() > &self.level { + return; + } + + let mut data = BTreeMap::new(); + let mut visitor = LambdaVisitor { data: &mut data }; + event.record(&mut visitor); + + let output = serde_json::json!({ + "level": metadata.level().to_string(), + "location": format!("{}:{}", metadata.file().unwrap_or("UNKNOWN"), metadata.line().unwrap_or(0)), + "target": metadata.target(), + "message": data, + "timestamp": Utc::now().to_rfc3339(), + }); + println!("{}", serde_json::to_string(&output).unwrap()); + } +} From b03975967e817f653dd1a5d92824be75a8a9f90c Mon Sep 17 00:00:00 2001 From: Nicolas Moutschen Date: Tue, 2 Nov 2021 16:38:30 +0100 Subject: [PATCH 2/3] chore(BROKEN): add lambda Context in span extension --- src/bin/lambda/delete-product.rs | 5 ++ src/utils/trace.rs | 106 +++++++++++++++++++++++++++---- 2 files changed, 97 insertions(+), 14 deletions(-) diff --git a/src/bin/lambda/delete-product.rs b/src/bin/lambda/delete-product.rs index cecfda1..72f97a5 100644 --- a/src/bin/lambda/delete-product.rs +++ b/src/bin/lambda/delete-product.rs @@ -29,6 +29,11 @@ async fn main() -> Result<(), Box // which matches the signature of the lambda function. // See https://github.com/rust-lang/rust/issues/62290 lambda_runtime::run(handler(|event: Request, ctx: Context| { + let ctx_string = serde_json::to_string(&ctx).unwrap(); + let ctx_str = ctx_string.as_str(); + let span = tracing::span!(tracing::Level::TRACE, "lambda_handler", lambda_context = ctx_str); + let _guard = span.enter(); + delete_product(&service, event, ctx) })) .await?; diff --git a/src/utils/trace.rs b/src/utils/trace.rs index 56fe67d..edd20c3 100644 --- a/src/utils/trace.rs +++ b/src/utils/trace.rs @@ -1,6 +1,8 @@ use chrono::prelude::*; +use lambda_runtime::Context; use std::collections::BTreeMap; use tracing_subscriber::{prelude::*, Layer}; +use tracing::{span::Attributes, Id}; /// Setup tracing pub fn setup_tracing() { @@ -44,6 +46,22 @@ impl<'a> tracing::field::Visit for LambdaVisitor<'a> { } } +struct LambdaContextVisitor { + pub context: Option, +} + +impl tracing::field::Visit for LambdaContextVisitor { + fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) { + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "lambda_context" { + let context = serde_json::from_str(value).ok(); + self.context = context; + } + } +} + struct LambdaLayer { level: tracing::Level, } @@ -57,29 +75,89 @@ impl LambdaLayer { impl Layer for LambdaLayer where S: tracing::Subscriber, + S: for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, { - fn on_event( - &self, - event: &tracing::Event<'_>, - _ctx: tracing_subscriber::layer::Context<'_, S>, - ) { - let metadata = event.metadata(); + fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) { + let mut visitor = LambdaContextVisitor { context: None }; + attrs.record(&mut visitor); + if let Some(context) = visitor.context { + println!("INJECTING CONTEXT: {:?}", context); + let span = ctx.span(id).unwrap(); + let mut extensions = span.extensions_mut(); + extensions.insert(context); + } + } + fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { + let metadata = event.metadata(); if metadata.level() > &self.level { return; } + // Find Lambda context + let lambda_ctxs = if let Some(scope) = ctx.event_scope(event) { + scope + .from_root() + .map(|span| { + if let Some(v) = span.extensions().get::() { + println!("FOUND CONTEXT: {:?}", v); + Some(v.clone()) + } else { + None + } + }) + .filter(Option::is_some) + .collect::>() + } else { + println!("EVENT_SCOPE NOT FOUND"); + vec![] + }; + let lambda_ctx = lambda_ctxs.first().unwrap(); + let mut data = BTreeMap::new(); let mut visitor = LambdaVisitor { data: &mut data }; event.record(&mut visitor); - - let output = serde_json::json!({ - "level": metadata.level().to_string(), - "location": format!("{}:{}", metadata.file().unwrap_or("UNKNOWN"), metadata.line().unwrap_or(0)), - "target": metadata.target(), - "message": data, - "timestamp": Utc::now().to_rfc3339(), - }); + + let output = if let Some(lambda_ctx) = lambda_ctx { + // Lambda context found + // + // Adding keys based on the Lambda context + serde_json::json!({ + "level": metadata.level().to_string(), + "location": format!("{}:{}", metadata.file().unwrap_or("UNKNOWN"), metadata.line().unwrap_or(0)), + "target": metadata.target(), + // If data has only one key named 'message', we can just use that as the message. + // This is the default key when using macros such as `info!()` or `debug!()`. + "message": if data.len() == 1 && data.contains_key("message") { + data.remove("message").unwrap().into() + } else { + serde_json::to_value(data).unwrap() + }, + "timestamp": Utc::now().to_rfc3339(), + + // Lambda context keys + "function_name": lambda_ctx.env_config.function_name, + "function_memory_size": lambda_ctx.env_config.memory, + "function_arn": lambda_ctx.invoked_function_arn, + "function_request_id": lambda_ctx.request_id, + "xray_trace_id": lambda_ctx.xray_trace_id, + }) + } else { + // No Lambda context found + serde_json::json!({ + "level": metadata.level().to_string(), + "location": format!("{}:{}", metadata.file().unwrap_or("UNKNOWN"), metadata.line().unwrap_or(0)), + "target": metadata.target(), + // If data has only one key named 'message', we can just use that as the message. + // This is the default key when using macros such as `info!()` or `debug!()`. + "message": if data.len() == 1 && data.contains_key("message") { + data.remove("message").unwrap().into() + } else { + serde_json::to_value(data).unwrap() + }, + "timestamp": Utc::now().to_rfc3339(), + }) + }; println!("{}", serde_json::to_string(&output).unwrap()); } } From 36d003643fe7dfad0c91fa034d959966eeb48f49 Mon Sep 17 00:00:00 2001 From: Nicolas Moutschen Date: Tue, 2 Nov 2021 18:56:46 +0100 Subject: [PATCH 3/3] fix: inject lambda context --- src/bin/lambda/delete-product.rs | 5 -- src/entrypoints/lambda/apigateway.rs | 22 +++++-- src/entrypoints/lambda/dynamodb/mod.rs | 7 ++- src/utils/mod.rs | 2 +- src/utils/trace.rs | 79 ++++++++++++-------------- 5 files changed, 59 insertions(+), 56 deletions(-) diff --git a/src/bin/lambda/delete-product.rs b/src/bin/lambda/delete-product.rs index 72f97a5..cecfda1 100644 --- a/src/bin/lambda/delete-product.rs +++ b/src/bin/lambda/delete-product.rs @@ -29,11 +29,6 @@ async fn main() -> Result<(), Box // which matches the signature of the lambda function. // See https://github.com/rust-lang/rust/issues/62290 lambda_runtime::run(handler(|event: Request, ctx: Context| { - let ctx_string = serde_json::to_string(&ctx).unwrap(); - let ctx_str = ctx_string.as_str(); - let span = tracing::span!(tracing::Level::TRACE, "lambda_handler", lambda_context = ctx_str); - let _guard = span.enter(); - delete_product(&service, event, ctx) })) .await?; diff --git a/src/entrypoints/lambda/apigateway.rs b/src/entrypoints/lambda/apigateway.rs index 45f0860..a922cfb 100644 --- a/src/entrypoints/lambda/apigateway.rs +++ b/src/entrypoints/lambda/apigateway.rs @@ -1,4 +1,4 @@ -use crate::{Product, Service}; +use crate::{Product, Service, utils::inject_lambda_context}; use lambda_http::{ ext::RequestExt, lambda_runtime::Context, Body, IntoResponse, Request, Response, }; @@ -12,8 +12,11 @@ type E = Box; pub async fn delete_product( service: &Service, event: Request, - _: Context, + ctx: Context, ) -> Result { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + // Retrieve product ID from event // // If the event doesn't contain a product ID, we return a 400 Bad Request. @@ -62,8 +65,11 @@ pub async fn delete_product( pub async fn get_product( service: &Service, event: Request, - _: Context, + ctx: Context, ) -> Result { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + // Retrieve product ID from event. // // If the event doesn't contain a product ID, we return a 400 Bad Request. @@ -112,8 +118,11 @@ pub async fn get_product( pub async fn get_products( service: &Service, _event: Request, - _: Context, + ctx: Context, ) -> Result { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + // Retrieve products // TODO: Add pagination let res = service.get_products(None).await; @@ -138,8 +147,11 @@ pub async fn get_products( pub async fn put_product( service: &Service, event: Request, - _: Context, + ctx: Context, ) -> Result { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + // Retrieve product ID from event. // // If the event doesn't contain a product ID, we return a 400 Bad Request. diff --git a/src/entrypoints/lambda/dynamodb/mod.rs b/src/entrypoints/lambda/dynamodb/mod.rs index a57d93d..40e4fa1 100644 --- a/src/entrypoints/lambda/dynamodb/mod.rs +++ b/src/entrypoints/lambda/dynamodb/mod.rs @@ -1,4 +1,4 @@ -use crate::{Event, Service}; +use crate::{Event, Service, utils::inject_lambda_context}; use lambda_runtime::Context; use rayon::prelude::*; use tracing::{info, instrument}; @@ -15,8 +15,11 @@ type E = Box; pub async fn parse_events( service: &Service, event: model::DynamoDBEvent, - _: Context, + ctx: Context, ) -> Result<(), E> { + let span = inject_lambda_context(&ctx); + let _guard = span.enter(); + info!("Transform events"); let events = event .records diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 80483e5..039c3e6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -2,7 +2,7 @@ use crate::{event_bus, model, store, Service}; use tracing::{info, instrument}; mod trace; -pub use trace::setup_tracing; +pub use trace::{inject_lambda_context, setup_tracing}; /// Retrieve a service /// diff --git a/src/utils/trace.rs b/src/utils/trace.rs index edd20c3..48d4a19 100644 --- a/src/utils/trace.rs +++ b/src/utils/trace.rs @@ -10,6 +10,12 @@ pub fn setup_tracing() { tracing_subscriber::registry().with(layer).init(); } +pub fn inject_lambda_context(ctx: &Context) -> tracing::Span { + let ctx_string = serde_json::to_string(ctx).unwrap(); + let ctx_str = ctx_string.as_str(); + tracing::span!(tracing::Level::TRACE, "lambda_handler", lambda_context = ctx_str) +} + struct LambdaVisitor<'a> { pub data: &'a mut BTreeMap, } @@ -81,7 +87,6 @@ where let mut visitor = LambdaContextVisitor { context: None }; attrs.record(&mut visitor); if let Some(context) = visitor.context { - println!("INJECTING CONTEXT: {:?}", context); let span = ctx.span(id).unwrap(); let mut extensions = span.extensions_mut(); extensions.insert(context); @@ -100,64 +105,52 @@ where .from_root() .map(|span| { if let Some(v) = span.extensions().get::() { - println!("FOUND CONTEXT: {:?}", v); Some(v.clone()) } else { None } }) - .filter(Option::is_some) + .filter_map(|c| c) .collect::>() } else { - println!("EVENT_SCOPE NOT FOUND"); - vec![] + Default::default() }; - let lambda_ctx = lambda_ctxs.first().unwrap(); + let lambda_ctx = lambda_ctxs.first(); let mut data = BTreeMap::new(); let mut visitor = LambdaVisitor { data: &mut data }; event.record(&mut visitor); + let output = serde_json::json!({ + "level": metadata.level().to_string(), + "location": format!("{}:{}", metadata.file().unwrap_or("UNKNOWN"), metadata.line().unwrap_or(0)), + "target": metadata.target(), + // If data has only one key named 'message', we can just use that as the message. + // This is the default key when using macros such as `info!()` or `debug!()`. + "message": if data.len() == 1 && data.contains_key("message") { + data.remove("message").unwrap().into() + } else { + serde_json::to_value(data).unwrap() + }, + "timestamp": Utc::now().to_rfc3339(), + }); + let output = if let Some(lambda_ctx) = lambda_ctx { - // Lambda context found - // - // Adding keys based on the Lambda context - serde_json::json!({ - "level": metadata.level().to_string(), - "location": format!("{}:{}", metadata.file().unwrap_or("UNKNOWN"), metadata.line().unwrap_or(0)), - "target": metadata.target(), - // If data has only one key named 'message', we can just use that as the message. - // This is the default key when using macros such as `info!()` or `debug!()`. - "message": if data.len() == 1 && data.contains_key("message") { - data.remove("message").unwrap().into() - } else { - serde_json::to_value(data).unwrap() - }, - "timestamp": Utc::now().to_rfc3339(), - - // Lambda context keys - "function_name": lambda_ctx.env_config.function_name, - "function_memory_size": lambda_ctx.env_config.memory, - "function_arn": lambda_ctx.invoked_function_arn, - "function_request_id": lambda_ctx.request_id, - "xray_trace_id": lambda_ctx.xray_trace_id, - }) + if let serde_json::Value::Object(mut output) = output { + output.insert("function_name".to_string(), lambda_ctx.env_config.function_name.clone().into()); + output.insert("function_memory_size".to_string(), lambda_ctx.env_config.memory.into()); + output.insert("function_arn".to_string(), lambda_ctx.invoked_function_arn.clone().into()); + output.insert("function_request_id".to_string(), lambda_ctx.request_id.clone().into()); + output.insert("xray_trace_id".to_string(), lambda_ctx.xray_trace_id.clone().into()); + + serde_json::Value::Object(output) + } else { + output + } } else { - // No Lambda context found - serde_json::json!({ - "level": metadata.level().to_string(), - "location": format!("{}:{}", metadata.file().unwrap_or("UNKNOWN"), metadata.line().unwrap_or(0)), - "target": metadata.target(), - // If data has only one key named 'message', we can just use that as the message. - // This is the default key when using macros such as `info!()` or `debug!()`. - "message": if data.len() == 1 && data.contains_key("message") { - data.remove("message").unwrap().into() - } else { - serde_json::to_value(data).unwrap() - }, - "timestamp": Utc::now().to_rfc3339(), - }) + output }; + println!("{}", serde_json::to_string(&output).unwrap()); } }