Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl LogExporter {
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export<'a>(
&mut self,
batch: Vec<std::borrow::Cow<'a, LogData>>,
batch: Vec<std::borrow::Cow<'a, LogData<'a>>>,
) -> opentelemetry::logs::LogResult<()> {
self.client.export(batch).await
}
Expand Down
34 changes: 34 additions & 0 deletions opentelemetry-proto/src/transform/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,40 @@ pub mod tonic {
}
}

impl
From<(
Cow<'_, opentelemetry_sdk::InstrumentationLibrary>,
Option<Cow<'static, str>>,
)> for InstrumentationScope
{
fn from(
data: (
Cow<'_, opentelemetry_sdk::InstrumentationLibrary>,
Option<Cow<'static, str>>,
),
) -> Self {
let (library, target) = data;
if let Some(t) = target {
InstrumentationScope {
name: t.to_string(),
version: String::new(),
attributes: vec![],
..Default::default()
}
} else {
InstrumentationScope {
name: library.name.clone().into_owned(),
version: library
.version
.as_ref()
.map(ToString::to_string)
.unwrap_or_default(),
attributes: Attributes::from(library.attributes.clone()).0,
..Default::default()
}
}
}
}
/// Wrapper type for Vec<`KeyValue`>
#[derive(Default, Debug)]
pub struct Attributes(pub ::std::vec::Vec<crate::proto::tonic::common::v1::KeyValue>);
Expand Down
53 changes: 34 additions & 19 deletions opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub mod tonic {

impl
From<(
opentelemetry_sdk::export::logs::LogData,
opentelemetry_sdk::export::logs::LogData<'_>,
&ResourceAttributesWithSchema,
)> for ResourceLogs
{
Expand All @@ -164,30 +164,35 @@ pub mod tonic {
.clone()
.map(Into::into)
.unwrap_or_default(),
scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()),
log_records: vec![log_data.record.into()],
scope: Some(
(
log_data.instrumentation.into_owned(),
log_data.record.target.clone(),
)
.into(),
),
log_records: vec![log_data.record.into_owned().into()],
}],
}
}
}

pub fn group_logs_by_resource_and_scope(
logs: Vec<opentelemetry_sdk::export::logs::LogData>,
logs: Vec<opentelemetry_sdk::export::logs::LogData<'_>>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// Group logs by target or instrumentation name
let scope_map = logs.iter().fold(
HashMap::new(),
|mut scope_map: HashMap<
Cow<'static, str>,
Vec<&opentelemetry_sdk::export::logs::LogData>,
Vec<&opentelemetry_sdk::export::logs::LogData<'_>>,
>,
log| {
let key = log
.record
.target
.clone()
.unwrap_or_else(|| log.instrumentation.name.clone());
let key =
log.record.target.clone().unwrap_or_else(|| {
Cow::Owned(log.instrumentation.name.clone().into_owned())
});
scope_map.entry(key).or_default().push(log);
scope_map
},
Expand All @@ -197,13 +202,20 @@ pub mod tonic {
.into_iter()
.map(|(key, log_data)| ScopeLogs {
scope: Some(InstrumentationScope::from((
&log_data.first().unwrap().instrumentation,
Some(key),
Cow::Owned(
log_data
.first()
.unwrap()
.instrumentation
.clone()
.into_owned(),
),
Some(key.into_owned().into()),
))),
schema_url: resource.schema_url.clone().unwrap_or_default(),
log_records: log_data
.into_iter()
.map(|log_data| log_data.record.clone().into())
.map(|log_data| log_data.record.clone().into_owned().into())
.collect(),
})
.collect();
Expand All @@ -225,18 +237,21 @@ mod tests {
use opentelemetry::logs::LogRecord as _;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::{logs::LogRecord, Resource};
use std::borrow::Cow;
use std::time::SystemTime;

fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData {
fn create_test_log_data<'a>(instrumentation_name: &str, _message: &str) -> LogData<'a> {
let mut logrecord = LogRecord::default();
logrecord.set_timestamp(SystemTime::now());
logrecord.set_observed_timestamp(SystemTime::now());
LogData {
instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder(
instrumentation_name.to_string(),
)
.build(),
record: logrecord,
instrumentation: Cow::Owned(
opentelemetry_sdk::InstrumentationLibrary::builder(
instrumentation_name.to_string(),
)
.build(),
),
record: Cow::Owned(logrecord),
}
}

Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::fmt::Debug;
#[async_trait]
pub trait LogExporter: Send + Sync + Debug {
/// Exports a batch of [`LogData`].
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()>;
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData<'a>>>) -> LogResult<()>;
/// Shuts down the exporter.
fn shutdown(&mut self) {}
#[cfg(feature = "logs_level_enabled")]
Expand All @@ -29,11 +29,11 @@ pub trait LogExporter: Send + Sync + Debug {

/// `LogData` represents a single log event without resource context.
#[derive(Clone, Debug)]
pub struct LogData {
/// Log record
pub record: LogRecord,
pub struct LogData<'a> {
/// Log record, which can be borrowed or owned.
pub record: Cow<'a, LogRecord>,
/// Instrumentation details for the emitter who produced this `LogEvent`.
pub instrumentation: InstrumentationLibrary,
pub instrumentation: Cow<'a, InstrumentationLibrary>,
}

/// Describes the result of an export.
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ impl opentelemetry::logs::Logger for Logger {
}

let mut data = LogData {
record: log_record,
instrumentation: self.instrumentation_library().clone(),
record: Cow::Borrowed(&log_record),
instrumentation: Cow::Borrowed(self.instrumentation_library()),
};

for p in processors {
Expand Down Expand Up @@ -328,7 +328,7 @@ mod tests {
}

impl LogProcessor for ShutdownTestLogProcessor {
fn emit(&self, _data: &mut LogData) {
fn emit(&self, _data: &mut LogData<'_>) {
self.is_shutdown
.lock()
.map(|is_shutdown| {
Expand Down Expand Up @@ -563,7 +563,7 @@ mod tests {
}

impl LogProcessor for LazyLogProcessor {
fn emit(&self, _data: &mut LogData) {
fn emit(&self, _data: &mut LogData<'_>) {
// nothing to do.
}

Expand Down
48 changes: 33 additions & 15 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub trait LogProcessor: Send + Sync + Debug {
///
/// # Parameters
/// - `data`: A mutable reference to `LogData` representing the log record.
fn emit(&self, data: &mut LogData);
fn emit(&self, data: &mut LogData<'_>);
/// Force the logs lying in the cache to be exported.
fn force_flush(&self) -> LogResult<()>;
/// Shuts down the processor.
Expand Down Expand Up @@ -90,7 +90,7 @@ impl SimpleLogProcessor {
}

impl LogProcessor for SimpleLogProcessor {
fn emit(&self, data: &mut LogData) {
fn emit(&self, data: &mut LogData<'_>) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
return;
Expand Down Expand Up @@ -152,10 +152,14 @@ impl<R: RuntimeChannel> Debug for BatchLogProcessor<R> {
}

impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
fn emit(&self, data: &mut LogData) {
fn emit(&self, data: &mut LogData<'_>) {
let owned_data = LogData {
record: Cow::Owned(data.record.clone().into_owned()),
instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()),
};
let result = self
.message_sender
.try_send(BatchMessage::ExportLog(data.clone()));
.try_send(BatchMessage::ExportLog(owned_data));

if let Err(err) = result {
global::handle_error(LogError::Other(err.into()));
Expand Down Expand Up @@ -307,7 +311,7 @@ async fn export_with_timeout<'a, R, E>(
time_out: Duration,
exporter: &mut E,
runtime: &R,
batch: Vec<Cow<'a, LogData>>,
batch: Vec<Cow<'a, LogData<'a>>>,
) -> ExportResult
where
R: RuntimeChannel,
Expand Down Expand Up @@ -497,7 +501,7 @@ where
#[derive(Debug)]
enum BatchMessage {
/// Export logs, usually called when the log is emitted.
ExportLog(LogData),
ExportLog(LogData<'static>),
/// Flush the current buffer to the backend, it can be triggered by
/// pre configured interval or a call to `force_push` function.
Flush(Option<oneshot::Sender<ExportResult>>),
Expand Down Expand Up @@ -545,7 +549,7 @@ mod tests {

#[async_trait]
impl LogExporter for MockLogExporter {
async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
async fn export<'a>(&mut self, _batch: Vec<Cow<'a, LogData<'a>>>) -> LogResult<()> {
Ok(())
}

Expand Down Expand Up @@ -814,21 +818,29 @@ mod tests {

#[derive(Debug)]
struct FirstProcessor {
pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
pub(crate) logs: Arc<Mutex<Vec<LogData<'static>>>>,
}

impl LogProcessor for FirstProcessor {
fn emit(&self, data: &mut LogData) {
fn emit(&self, data: &mut LogData<'_>) {
// Ensure the record is owned before modifying
let record = data.record.to_mut();
// Add attribute
data.record.add_attribute(
record.add_attribute(
Key::from_static_str("processed_by"),
AnyValue::String("FirstProcessor".into()),
);

// Update body
data.record.body = Some(AnyValue::String("Updated by FirstProcessor".into()));
record.body = Some(AnyValue::String("Updated by FirstProcessor".into()));

// Convert the modified LogData to an owned version
let owned_data = LogData {
record: Cow::Owned(record.clone()), // Since record is already owned, no need to clone deeply
instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()),
};

self.logs.lock().unwrap().push(data.clone()); // Clone as the LogProcessor is storing the data.
self.logs.lock().unwrap().push(owned_data); // Clone as the LogProcessor is storing the data.
}

#[cfg(feature = "logs_level_enabled")]
Expand All @@ -847,11 +859,11 @@ mod tests {

#[derive(Debug)]
struct SecondProcessor {
pub(crate) logs: Arc<Mutex<Vec<LogData>>>,
pub(crate) logs: Arc<Mutex<Vec<LogData<'static>>>>,
}

impl LogProcessor for SecondProcessor {
fn emit(&self, data: &mut LogData) {
fn emit(&self, data: &mut LogData<'_>) {
assert!(data.record.attributes_contains(
&Key::from_static_str("processed_by"),
&AnyValue::String("FirstProcessor".into())
Expand All @@ -860,7 +872,13 @@ mod tests {
data.record.body.clone().unwrap()
== AnyValue::String("Updated by FirstProcessor".into())
);
self.logs.lock().unwrap().push(data.clone());
// Convert the modified LogData to an owned version before storing it
let record = data.record.to_mut();
let owned_data = LogData {
record: Cow::Owned(record.clone()), // Convert the record to owned
instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()),
};
self.logs.lock().unwrap().push(owned_data);
}

#[cfg(feature = "logs_level_enabled")]
Expand Down
19 changes: 16 additions & 3 deletions opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,20 @@ use std::sync::{Arc, Mutex};
///
#[derive(Clone, Debug)]
pub struct InMemoryLogsExporter {
logs: Arc<Mutex<Vec<LogData>>>,
logs: Arc<Mutex<Vec<OwnedLogData>>>,
resource: Arc<Mutex<Resource>>,
should_reset_on_shutdown: bool,
}

/// `OwnedLogData` represents a single log event without resource context.
#[derive(Debug, Clone)]
pub struct OwnedLogData {
/// Log record, which can be borrowed or owned.
pub record: LogRecord,
/// Instrumentation details for the emitter who produced this `LogEvent`.
pub instrumentation: InstrumentationLibrary,
}

impl Default for InMemoryLogsExporter {
fn default() -> Self {
InMemoryLogsExporterBuilder::new().build()
Expand Down Expand Up @@ -175,10 +184,14 @@ impl InMemoryLogsExporter {

#[async_trait]
impl LogExporter for InMemoryLogsExporter {
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> LogResult<()> {
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData<'a>>>) -> LogResult<()> {
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
for log in batch.into_iter() {
logs_guard.push(log.into_owned());
let owned_log = OwnedLogData {
record: log.record.clone().into_owned(),
instrumentation: log.instrumentation.clone().into_owned(),
};
logs_guard.push(owned_log);
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl fmt::Debug for LogExporter {
#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
/// Export spans to stdout
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData>>) -> ExportResult {
async fn export<'a>(&mut self, batch: Vec<Cow<'a, LogData<'a>>>) -> ExportResult {
if let Some(writer) = &mut self.writer {
// TODO - Avoid cloning logdata if it is borrowed.
let log_data = crate::logs::transform::LogData::from((
Expand Down
Loading