From 999146b0df9d639a9ff9746d1b7f625b86454e70 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 18 Mar 2025 07:40:12 -0700 Subject: [PATCH 1/2] SimpleProcessor for Logs simplified --- .../src/logs/in_memory_exporter.rs | 3 + .../src/logs/simple_log_processor.rs | 96 +++++++------------ 2 files changed, 39 insertions(+), 60 deletions(-) diff --git a/opentelemetry-sdk/src/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/logs/in_memory_exporter.rs index eb83033bdb..f19c88e8ff 100644 --- a/opentelemetry-sdk/src/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/logs/in_memory_exporter.rs @@ -198,6 +198,9 @@ impl InMemoryLogExporter { impl LogExporter for InMemoryLogExporter { async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult { + if self.is_shutdown_called() { + return Err(OTelSdkError::AlreadyShutdown); + } let mut logs_guard = self.logs.lock().map_err(|e| { OTelSdkError::InternalFailure(format!("Failed to lock logs for export: {}", e)) })?; diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index 4f12f553d6..4970182970 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -16,18 +16,17 @@ //! +-----+---------------+ +-----------------------+ +-------------------+ //! ``` -use crate::error::{OTelSdkError, OTelSdkResult}; +use crate::error::OTelSdkResult; use crate::logs::log_processor::LogProcessor; use crate::{ logs::{LogBatch, LogExporter, SdkLogRecord}, Resource, }; -use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope}; +use opentelemetry::{otel_warn, InstrumentationScope}; use std::fmt::Debug; -use std::sync::atomic::AtomicBool; -use std::sync::Mutex; +use std::sync::atomic::{AtomicBool, Ordering}; /// A [`LogProcessor`] designed for testing and debugging purpose, that immediately /// exports log records as they are emitted. Log records are exported synchronously @@ -60,54 +59,47 @@ use std::sync::Mutex; /// #[derive(Debug)] pub struct SimpleLogProcessor { - exporter: Mutex, - is_shutdown: AtomicBool, + exporter: T, + is_exporting: AtomicBool, } impl SimpleLogProcessor { /// Creates a new instance of `SimpleLogProcessor`. pub fn new(exporter: T) -> Self { SimpleLogProcessor { - exporter: Mutex::new(exporter), - is_shutdown: AtomicBool::new(false), + exporter, + is_exporting: AtomicBool::new(false), } } } impl LogProcessor for SimpleLogProcessor { fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) { - // noop after shutdown - if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { - // this is a warning, as the user is trying to log after the processor has been shutdown - otel_warn!( - name: "SimpleLogProcessor.Emit.ProcessorShutdown", - ); - return; + // export() does not require mutable self and can be called in parallel + // with other export() calls. However, OTel Spec requires that + // existing export() must be completed before the next export() call. + while !self + .is_exporting + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + // Another thread is currently exporting, yield to let other work proceed + std::thread::yield_now(); } - let result = self - .exporter - .lock() - .map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into())) - .and_then(|exporter| { - let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; - futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) - }); - // Handle errors with specific static names - match result { - Err(OTelSdkError::InternalFailure(_)) => { - // logging as debug as this is not a user error - otel_debug!( - name: "SimpleLogProcessor.Emit.MutexPoisoning", - ); - } - Err(err) => { - otel_error!( - name: "SimpleLogProcessor.Emit.ExportError", - error = format!("{}",err) - ); - } - _ => {} + // We now have exclusive access to export + let result = { + let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; + futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))) + }; + + // Release the lock + self.is_exporting.store(false, Ordering::Release); + if let Err(err) = result { + otel_warn!( + name: "SimpleLogProcessor.Emit.ExportError", + error = format!("{}",err) + ); } } @@ -116,21 +108,11 @@ impl LogProcessor for SimpleLogProcessor { } fn shutdown(&self) -> OTelSdkResult { - self.is_shutdown - .store(true, std::sync::atomic::Ordering::Relaxed); - if let Ok(exporter) = self.exporter.lock() { - exporter.shutdown() - } else { - Err(OTelSdkError::InternalFailure( - "SimpleLogProcessor mutex poison at shutdown".into(), - )) - } + self.exporter.shutdown() } fn set_resource(&mut self, resource: &Resource) { - if let Ok(mut exporter) = self.exporter.lock() { - exporter.set_resource(resource); - } + self.exporter.set_resource(resource); } #[cfg(feature = "spec_unstable_logs_enabled")] @@ -140,11 +122,7 @@ impl LogProcessor for SimpleLogProcessor { target: &str, name: Option<&str>, ) -> bool { - if let Ok(exporter) = self.exporter.lock() { - exporter.event_enabled(level, target, name) - } else { - true - } + self.exporter.event_enabled(level, target, name) } } @@ -231,13 +209,11 @@ mod tests { processor.shutdown().unwrap(); - let is_shutdown = processor - .is_shutdown - .load(std::sync::atomic::Ordering::Relaxed); - assert!(is_shutdown); - processor.emit(&mut record, &instrumentation); + // Emit was called after shutdown. While SimpleLogProcessor + // does not care, the exporter in this case does, + // and it ignores the export() calls after shutdown. assert_eq!(1, exporter.get_emitted_logs().unwrap().len()); assert!(exporter.is_shutdown_called()); } From 0ec25365ae7a6d207360bcdb3f34db205bbeae35 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 19 Mar 2025 08:42:59 -0700 Subject: [PATCH 2/2] no spin just block --- .../src/logs/simple_log_processor.rs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/src/logs/simple_log_processor.rs b/opentelemetry-sdk/src/logs/simple_log_processor.rs index a0a4139c6e..a395a091cb 100644 --- a/opentelemetry-sdk/src/logs/simple_log_processor.rs +++ b/opentelemetry-sdk/src/logs/simple_log_processor.rs @@ -26,7 +26,7 @@ use crate::{ use opentelemetry::{otel_warn, InstrumentationScope}; use std::fmt::Debug; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Mutex; /// A [`LogProcessor`] designed for testing and debugging purpose, that immediately /// exports log records as they are emitted. Log records are exported synchronously @@ -60,7 +60,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; #[derive(Debug)] pub struct SimpleLogProcessor { exporter: T, - is_exporting: AtomicBool, + export_mutex: Mutex<()>, } impl SimpleLogProcessor { @@ -68,7 +68,7 @@ impl SimpleLogProcessor { pub fn new(exporter: T) -> Self { SimpleLogProcessor { exporter, - is_exporting: AtomicBool::new(false), + export_mutex: Mutex::new(()), } } } @@ -78,23 +78,13 @@ impl LogProcessor for SimpleLogProcessor { // export() does not require mutable self and can be called in parallel // with other export() calls. However, OTel Spec requires that // existing export() must be completed before the next export() call. - while !self - .is_exporting - .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) - .is_ok() - { - // Another thread is currently exporting, yield to let other work proceed - std::thread::yield_now(); - } + let _guard = self.export_mutex.lock().unwrap(); // We now have exclusive access to export let result = { let log_tuple = &[(record as &SdkLogRecord, instrumentation)]; futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple))) }; - - // Release the lock - self.is_exporting.store(false, Ordering::Release); if let Err(err) = result { otel_warn!( name: "SimpleLogProcessor.Emit.ExportError",