diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 3f21697fb0..6c936403c1 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -100,7 +100,7 @@ impl LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { async fn export<'a>( &mut self, - batch: Vec>, + batch: Vec>>, ) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs index ff42479288..6e3bdfd8b5 100644 --- a/opentelemetry-proto/src/transform/common.rs +++ b/opentelemetry-proto/src/transform/common.rs @@ -108,6 +108,40 @@ pub mod tonic { } } + impl + From<( + Cow<'_, opentelemetry_sdk::InstrumentationLibrary>, + Option>, + )> for InstrumentationScope + { + fn from( + data: ( + Cow<'_, opentelemetry_sdk::InstrumentationLibrary>, + Option>, + ), + ) -> Self { + let (library, target) = data; + if let Some(t) = target { + InstrumentationScope { + name: t.to_string(), + version: String::new(), + attributes: vec![], + ..Default::default() + } + } else { + InstrumentationScope { + name: library.name.clone().into_owned(), + version: library + .version + .as_ref() + .map(ToString::to_string) + .unwrap_or_default(), + attributes: Attributes::from(library.attributes.clone()).0, + ..Default::default() + } + } + } + } /// Wrapper type for Vec<`KeyValue`> #[derive(Default, Debug)] pub struct Attributes(pub ::std::vec::Vec); diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index dfd845c5d8..d959194364 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -139,7 +139,7 @@ pub mod tonic { impl From<( - opentelemetry_sdk::export::logs::LogData, + opentelemetry_sdk::export::logs::LogData<'_>, &ResourceAttributesWithSchema, )> for ResourceLogs { @@ -164,15 +164,21 @@ pub mod tonic { .clone() .map(Into::into) .unwrap_or_default(), - scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()), - log_records: vec![log_data.record.into()], + scope: Some( + ( + log_data.instrumentation.into_owned(), + log_data.record.target.clone(), + ) + .into(), + ), + log_records: vec![log_data.record.into_owned().into()], }], } } } pub fn group_logs_by_resource_and_scope( - logs: Vec, + logs: Vec>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -180,14 +186,13 @@ pub mod tonic { HashMap::new(), |mut scope_map: HashMap< Cow<'static, str>, - Vec<&opentelemetry_sdk::export::logs::LogData>, + Vec<&opentelemetry_sdk::export::logs::LogData<'_>>, >, log| { - let key = log - .record - .target - .clone() - .unwrap_or_else(|| log.instrumentation.name.clone()); + let key = + log.record.target.clone().unwrap_or_else(|| { + Cow::Owned(log.instrumentation.name.clone().into_owned()) + }); scope_map.entry(key).or_default().push(log); scope_map }, @@ -197,13 +202,20 @@ pub mod tonic { .into_iter() .map(|(key, log_data)| ScopeLogs { scope: Some(InstrumentationScope::from(( - &log_data.first().unwrap().instrumentation, - Some(key), + Cow::Owned( + log_data + .first() + .unwrap() + .instrumentation + .clone() + .into_owned(), + ), + Some(key.into_owned().into()), ))), schema_url: resource.schema_url.clone().unwrap_or_default(), log_records: log_data .into_iter() - .map(|log_data| log_data.record.clone().into()) + .map(|log_data| log_data.record.clone().into_owned().into()) .collect(), }) .collect(); @@ -225,18 +237,21 @@ mod tests { use opentelemetry::logs::LogRecord as _; use opentelemetry_sdk::export::logs::LogData; use opentelemetry_sdk::{logs::LogRecord, Resource}; + use std::borrow::Cow; use std::time::SystemTime; - fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData { + fn create_test_log_data<'a>(instrumentation_name: &str, _message: &str) -> LogData<'a> { let mut logrecord = LogRecord::default(); logrecord.set_timestamp(SystemTime::now()); logrecord.set_observed_timestamp(SystemTime::now()); LogData { - instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder( - instrumentation_name.to_string(), - ) - .build(), - record: logrecord, + instrumentation: Cow::Owned( + opentelemetry_sdk::InstrumentationLibrary::builder( + instrumentation_name.to_string(), + ) + .build(), + ), + record: Cow::Owned(logrecord), } } diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 9588339462..5d725df910 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -15,7 +15,7 @@ use std::fmt::Debug; #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogData`]. - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()>; + async fn export<'a>(&mut self, batch: Vec>>) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] @@ -29,11 +29,11 @@ pub trait LogExporter: Send + Sync + Debug { /// `LogData` represents a single log event without resource context. #[derive(Clone, Debug)] -pub struct LogData { - /// Log record - pub record: LogRecord, +pub struct LogData<'a> { + /// Log record, which can be borrowed or owned. + pub record: Cow<'a, LogRecord>, /// Instrumentation details for the emitter who produced this `LogEvent`. - pub instrumentation: InstrumentationLibrary, + pub instrumentation: Cow<'a, InstrumentationLibrary>, } /// Describes the result of an export. diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 905dde43e3..74f4890f6b 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -266,8 +266,8 @@ impl opentelemetry::logs::Logger for Logger { } let mut data = LogData { - record: log_record, - instrumentation: self.instrumentation_library().clone(), + record: Cow::Borrowed(&log_record), + instrumentation: Cow::Borrowed(self.instrumentation_library()), }; for p in processors { @@ -328,7 +328,7 @@ mod tests { } impl LogProcessor for ShutdownTestLogProcessor { - fn emit(&self, _data: &mut LogData) { + fn emit(&self, _data: &mut LogData<'_>) { self.is_shutdown .lock() .map(|is_shutdown| { @@ -563,7 +563,7 @@ mod tests { } impl LogProcessor for LazyLogProcessor { - fn emit(&self, _data: &mut LogData) { + fn emit(&self, _data: &mut LogData<'_>) { // nothing to do. } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index fb5ee42647..711a37b99d 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -55,7 +55,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// /// # Parameters /// - `data`: A mutable reference to `LogData` representing the log record. - fn emit(&self, data: &mut LogData); + fn emit(&self, data: &mut LogData<'_>); /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. @@ -90,7 +90,7 @@ impl SimpleLogProcessor { } impl LogProcessor for SimpleLogProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { return; @@ -152,10 +152,14 @@ impl Debug for BatchLogProcessor { } impl LogProcessor for BatchLogProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { + let owned_data = LogData { + record: Cow::Owned(data.record.clone().into_owned()), + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; let result = self .message_sender - .try_send(BatchMessage::ExportLog(data.clone())); + .try_send(BatchMessage::ExportLog(owned_data)); if let Err(err) = result { global::handle_error(LogError::Other(err.into())); @@ -307,7 +311,7 @@ async fn export_with_timeout<'a, R, E>( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec>, + batch: Vec>>, ) -> ExportResult where R: RuntimeChannel, @@ -497,7 +501,7 @@ where #[derive(Debug)] enum BatchMessage { /// Export logs, usually called when the log is emitted. - ExportLog(LogData), + ExportLog(LogData<'static>), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function. Flush(Option>), @@ -545,7 +549,7 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export<'a>(&mut self, _batch: Vec>) -> LogResult<()> { + async fn export<'a>(&mut self, _batch: Vec>>) -> LogResult<()> { Ok(()) } @@ -814,21 +818,29 @@ mod tests { #[derive(Debug)] struct FirstProcessor { - pub(crate) logs: Arc>>, + pub(crate) logs: Arc>>>, } impl LogProcessor for FirstProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { + // Ensure the record is owned before modifying + let record = data.record.to_mut(); // Add attribute - data.record.add_attribute( + record.add_attribute( Key::from_static_str("processed_by"), AnyValue::String("FirstProcessor".into()), ); // Update body - data.record.body = Some(AnyValue::String("Updated by FirstProcessor".into())); + record.body = Some(AnyValue::String("Updated by FirstProcessor".into())); + + // Convert the modified LogData to an owned version + let owned_data = LogData { + record: Cow::Owned(record.clone()), // Since record is already owned, no need to clone deeply + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; - self.logs.lock().unwrap().push(data.clone()); // Clone as the LogProcessor is storing the data. + self.logs.lock().unwrap().push(owned_data); // Clone as the LogProcessor is storing the data. } #[cfg(feature = "logs_level_enabled")] @@ -847,11 +859,11 @@ mod tests { #[derive(Debug)] struct SecondProcessor { - pub(crate) logs: Arc>>, + pub(crate) logs: Arc>>>, } impl LogProcessor for SecondProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { assert!(data.record.attributes_contains( &Key::from_static_str("processed_by"), &AnyValue::String("FirstProcessor".into()) @@ -860,7 +872,13 @@ mod tests { data.record.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) ); - self.logs.lock().unwrap().push(data.clone()); + // Convert the modified LogData to an owned version before storing it + let record = data.record.to_mut(); + let owned_data = LogData { + record: Cow::Owned(record.clone()), // Convert the record to owned + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; + self.logs.lock().unwrap().push(owned_data); } #[cfg(feature = "logs_level_enabled")] diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 8068fafaec..e73a6e27c1 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -39,11 +39,20 @@ use std::sync::{Arc, Mutex}; /// #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { - logs: Arc>>, + logs: Arc>>, resource: Arc>, should_reset_on_shutdown: bool, } +/// `OwnedLogData` represents a single log event without resource context. +#[derive(Debug, Clone)] +pub struct OwnedLogData { + /// Log record, which can be borrowed or owned. + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: InstrumentationLibrary, +} + impl Default for InMemoryLogsExporter { fn default() -> Self { InMemoryLogsExporterBuilder::new().build() @@ -175,10 +184,14 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + async fn export<'a>(&mut self, batch: Vec>>) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for log in batch.into_iter() { - logs_guard.push(log.into_owned()); + let owned_log = OwnedLogData { + record: log.record.clone().into_owned(), + instrumentation: log.instrumentation.clone().into_owned(), + }; + logs_guard.push(owned_log); } Ok(()) } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index dacefa3d8b..6befdf0fa7 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -45,7 +45,7 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export<'a>(&mut self, batch: Vec>) -> ExportResult { + async fn export<'a>(&mut self, batch: Vec>>) -> ExportResult { if let Some(writer) = &mut self.writer { // TODO - Avoid cloning logdata if it is borrowed. let log_data = crate::logs::transform::LogData::from(( diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 2f3199bd25..4a55a8a1ee 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -16,7 +16,7 @@ pub struct LogData { impl From<( - Vec, + Vec>, &opentelemetry_sdk::Resource, )> for LogData { @@ -31,7 +31,7 @@ impl for sdk_log in sdk_logs { let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); let schema_url = sdk_log.instrumentation.schema_url.clone(); - let scope: Scope = sdk_log.instrumentation.clone().into(); + let scope: Scope = sdk_log.instrumentation.clone().into_owned().into(); let resource: Resource = sdk_resource.into(); let rl = resource_logs @@ -104,7 +104,7 @@ struct LogRecord { trace_id: Option, } -impl From for LogRecord { +impl From> for LogRecord { fn from(value: opentelemetry_sdk::export::logs::LogData) -> Self { LogRecord { attributes: { @@ -142,6 +142,7 @@ impl From for LogRecord { flags: value .record .trace_context + .as_ref() .map(|c| c.trace_flags.map(|f| f.to_u8())) .unwrap_or_default(), time_unix_nano: value.record.timestamp, @@ -154,8 +155,8 @@ impl From for LogRecord { .map(|u| u as u32) .unwrap_or_default(), dropped_attributes_count: 0, - severity_text: value.record.severity_text, - body: value.record.body.map(|a| a.into()), + severity_text: value.record.severity_text.clone(), + body: value.record.body.clone().map(|a| a.into()), } } }