From d7fc754a077ccd4c56020aa15c3ca62fb3479263 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 8 Oct 2024 12:22:06 -0700 Subject: [PATCH 01/11] initial commit --- opentelemetry-sdk/src/logs/log_emitter.rs | 21 ++++--- opentelemetry-sdk/src/logs/log_processor.rs | 61 ++++++-------------- opentelemetry/src/global/internal_logging.rs | 31 +++++++++- 3 files changed, 60 insertions(+), 53 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 0317a33774..0ded7f2dd7 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -1,9 +1,8 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext}; use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; -use opentelemetry::otel_warn; use opentelemetry::{ - global, logs::{LogError, LogResult}, + otel_error, otel_warn, trace::TraceContextExt, Context, InstrumentationLibrary, }; @@ -114,9 +113,9 @@ impl LoggerProvider { let mut errs = vec![]; for processor in &self.inner.processors { if let Err(err) = processor.shutdown() { - otel_warn!( - name: "logger_provider_shutdown_error", - error = format!("{:?}", err) + otel_error!( + name: "LoggerProvider.Shutdown.Error", + error = err ); errs.push(err); } @@ -128,10 +127,13 @@ impl LoggerProvider { Err(LogError::Other(format!("{errs:?}").into())) } } else { + let error = LogError::Other("logger provider already shut down".into()); + otel_warn!( - name: "logger_provider_already_shutdown" + name: "LoggerProvider.Shutdown.AlreadyShutdown", + error = error ); - Err(LogError::Other("logger provider already shut down".into())) + Err(error) } } } @@ -146,7 +148,10 @@ impl Drop for LoggerProviderInner { fn drop(&mut self) { for processor in &mut self.processors { if let Err(err) = processor.shutdown() { - global::handle_error(err); + otel_warn!( + name: "LoggerProvider.Drop.AlreadyShutdown", + error = err + ); } } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index e6578c7f7e..16e191b8d2 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -100,7 +100,8 @@ impl LogProcessor for SimpleLogProcessor { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { otel_warn!( - name: "simple_log_processor_emit_after_shutdown" + name: "SimpleLogProcessor.Export.AfterShutdown", + error = LogError::Other("Attempted to export a log after processor shutdown".into()) ); return; } @@ -115,10 +116,9 @@ impl LogProcessor for SimpleLogProcessor { }); if let Err(err) = result { otel_error!( - name: "simple_log_processor_emit_error", - error = format!("{:?}", err) + name: "SimpleLogProcessor.Export.Error", + error = err ); - global::handle_error(err); } } @@ -133,9 +133,6 @@ impl LogProcessor for SimpleLogProcessor { exporter.shutdown(); Ok(()) } else { - otel_error!( - name: "simple_log_processor_shutdown_error" - ); Err(LogError::Other( "simple logprocessor mutex poison during shutdown".into(), )) @@ -172,8 +169,8 @@ impl LogProcessor for BatchLogProcessor { if let Err(err) = result { otel_error!( - name: "batch_log_processor_emit_error", - error = format!("{:?}", err) + name: "BatchLogProcessor.Export.Error", + error = err ); global::handle_error(LogError::Other(err.into())); } @@ -243,10 +240,9 @@ impl BatchLogProcessor { if let Err(err) = result { otel_error!( - name: "batch_log_processor_export_error", - error = format!("{:?}", err) + name: "BatchLogProcessor.Export.Error", + error = err ); - global::handle_error(err); } } } @@ -261,24 +257,12 @@ impl BatchLogProcessor { .await; if let Some(channel) = res_channel { - if let Err(result) = channel.send(result) { - global::handle_error(LogError::from(format!( - "failed to send flush result: {:?}", - result - ))); + if let Err(send_error) = channel.send(result) { otel_error!( - name: "batch_log_processor_flush_error", - error = format!("{:?}", result), - message = "Failed to send flush result" + name: "BatchLogProcessor.Flush.SendResultError", + error = format!("{:?}", send_error), ); } - } else if let Err(err) = result { - otel_error!( - name: "batch_log_processor_flush_error", - error = format!("{:?}", err), - message = "Flush failed" - ); - global::handle_error(err); } } // Stream has terminated or processor is shutdown, return to finish execution. @@ -294,15 +278,12 @@ impl BatchLogProcessor { exporter.shutdown(); if let Err(result) = ch.send(result) { - otel_error!( - name: "batch_log_processor_shutdown_error", - error = format!("{:?}", result), - message = "Failed to send shutdown result" - ); - global::handle_error(LogError::from(format!( - "failed to send batch processor shutdown result: {:?}", - result - ))); + if let Err(err) = result { + otel_error!( + name: "BatchLogProcessor.Shutdown.SendResultError", + error = err + ); + } } break; @@ -357,13 +338,7 @@ where pin_mut!(timeout); match future::select(export, timeout).await { Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => { - otel_error!( - name: "export_with_timeout_timeout", - timeout_duration = time_out.as_millis() - ); - ExportResult::Err(LogError::ExportTimedOut(time_out)) - } + Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)), } } diff --git a/opentelemetry/src/global/internal_logging.rs b/opentelemetry/src/global/internal_logging.rs index 3a5c24ba69..4ba5170902 100644 --- a/opentelemetry/src/global/internal_logging.rs +++ b/opentelemetry/src/global/internal_logging.rs @@ -54,7 +54,13 @@ macro_rules! otel_warn { (name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, ""); + tracing::warn!(name: $name, + target: env!("CARGO_PKG_NAME"), + $($key = { + $crate::format_value!($value) + }),+, + "" + ) } }; } @@ -108,7 +114,28 @@ macro_rules! otel_error { (name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => { #[cfg(feature = "internal-logs")] { - tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = $value),+, ""); + tracing::error!(name: $name, + target: env!("CARGO_PKG_NAME"), + $($key = { + $crate::format_value!($value) + }),+, + "" + ) } }; } + +/// Helper macro to format a value using Debug if available, falling back to Display +#[macro_export] +macro_rules! format_value { + ($value:expr) => {{ + // Try Debug first + let debug_result = std::fmt::format(format_args!("{:?}", $value)); + if debug_result.starts_with('<') || debug_result.contains("::") { + // Contains module path or starts with generic angle brackets + format!("{}", $value) + } else { + debug_result + } + }}; +} From 76bafefc731217762c5ca157716010e47bb49e62 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 8 Oct 2024 13:32:13 -0700 Subject: [PATCH 02/11] comments --- opentelemetry-sdk/src/logs/log_emitter.rs | 6 ++--- opentelemetry-sdk/src/logs/log_processor.rs | 16 ++++-------- opentelemetry/src/global/internal_logging.rs | 27 ++++++++++++++++++++ 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 0ded7f2dd7..85282d4828 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -115,7 +115,7 @@ impl LoggerProvider { if let Err(err) = processor.shutdown() { otel_error!( name: "LoggerProvider.Shutdown.Error", - error = err + error = format!("{err}") ); errs.push(err); } @@ -128,10 +128,8 @@ impl LoggerProvider { } } else { let error = LogError::Other("logger provider already shut down".into()); - otel_warn!( name: "LoggerProvider.Shutdown.AlreadyShutdown", - error = error ); Err(error) } @@ -149,7 +147,7 @@ impl Drop for LoggerProviderInner { for processor in &mut self.processors { if let Err(err) = processor.shutdown() { otel_warn!( - name: "LoggerProvider.Drop.AlreadyShutdown", + name: "LoggerProvider.Drop.ShutdownError", error = err ); } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 16e191b8d2..2bf3c29061 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -12,7 +12,6 @@ use futures_util::{ #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; use opentelemetry::{ - global, logs::{LogError, LogResult}, otel_error, otel_warn, InstrumentationLibrary, }; @@ -172,7 +171,6 @@ impl LogProcessor for BatchLogProcessor { name: "BatchLogProcessor.Export.Error", error = err ); - global::handle_error(LogError::Other(err.into())); } } @@ -277,18 +275,14 @@ impl BatchLogProcessor { exporter.shutdown(); - if let Err(result) = ch.send(result) { - if let Err(err) = result { - otel_error!( - name: "BatchLogProcessor.Shutdown.SendResultError", - error = err - ); - } + if let Err(send_error) = ch.send(result) { + otel_error!( + name: "BatchLogProcessor.Shutdown.SendResultError", + error = format!("{:?}", send_error), + ); } - break; } - // propagate the resource BatchMessage::SetResource(resource) => { exporter.set_resource(&resource); diff --git a/opentelemetry/src/global/internal_logging.rs b/opentelemetry/src/global/internal_logging.rs index 4ba5170902..57c30e1df0 100644 --- a/opentelemetry/src/global/internal_logging.rs +++ b/opentelemetry/src/global/internal_logging.rs @@ -50,6 +50,13 @@ macro_rules! otel_warn { { tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), ""); } + #[cfg(not(feature = "internal-logs"))] + { + #[allow(unused_variables)] + { + + } + } }; (name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => { #[cfg(feature = "internal-logs")] @@ -62,6 +69,12 @@ macro_rules! otel_warn { "" ) } + #[cfg(not(feature = "internal-logs"))] + { + { + let _ = ($name, $($value),+); + } + } }; } @@ -110,6 +123,13 @@ macro_rules! otel_error { { tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), ""); } + #[cfg(not(feature = "internal-logs"))] + { + #[allow(unused_variables)] + { + + } + } }; (name: $name:expr, $($key:ident = $value:expr),+ $(,)?) => { #[cfg(feature = "internal-logs")] @@ -122,6 +142,13 @@ macro_rules! otel_error { "" ) } + #[cfg(not(feature = "internal-logs"))] + { + { + let _ = ($name, $($value),+); + + } + } }; } From 96e652aceab6be873abd8fbe071faff964977240 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 8 Oct 2024 17:35:45 -0700 Subject: [PATCH 03/11] make SendResultError as warning --- opentelemetry-sdk/src/logs/log_processor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 2bf3c29061..dec02d5cfa 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -256,7 +256,7 @@ impl BatchLogProcessor { if let Some(channel) = res_channel { if let Err(send_error) = channel.send(result) { - otel_error!( + otel_warn!( name: "BatchLogProcessor.Flush.SendResultError", error = format!("{:?}", send_error), ); @@ -276,7 +276,7 @@ impl BatchLogProcessor { exporter.shutdown(); if let Err(send_error) = ch.send(result) { - otel_error!( + otel_warn!( name: "BatchLogProcessor.Shutdown.SendResultError", error = format!("{:?}", send_error), ); From 886a58328dc5b4ad0ae65eee6e0f846072e95a8a Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Tue, 8 Oct 2024 17:40:33 -0700 Subject: [PATCH 04/11] simplify internal logs formatting --- opentelemetry/src/global/internal_logging.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/opentelemetry/src/global/internal_logging.rs b/opentelemetry/src/global/internal_logging.rs index 57c30e1df0..71c3deb786 100644 --- a/opentelemetry/src/global/internal_logging.rs +++ b/opentelemetry/src/global/internal_logging.rs @@ -152,17 +152,10 @@ macro_rules! otel_error { }; } -/// Helper macro to format a value using Debug if available, falling back to Display +/// Helper macro to format a value #[macro_export] macro_rules! format_value { ($value:expr) => {{ - // Try Debug first - let debug_result = std::fmt::format(format_args!("{:?}", $value)); - if debug_result.starts_with('<') || debug_result.contains("::") { - // Contains module path or starts with generic angle brackets - format!("{}", $value) - } else { - debug_result - } + format!("{:?}", $value) }}; } From 0d5bf142290136ee01d07c50edd25d4d96268d3c Mon Sep 17 00:00:00 2001 From: Lalit Date: Tue, 8 Oct 2024 19:02:05 -0700 Subject: [PATCH 05/11] change non-actionable error to debug --- opentelemetry-sdk/src/logs/log_processor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index dec02d5cfa..846d087c64 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -13,7 +13,7 @@ use futures_util::{ use opentelemetry::logs::Severity; use opentelemetry::{ logs::{LogError, LogResult}, - otel_error, otel_warn, InstrumentationLibrary, + otel_debug, otel_error, otel_warn, InstrumentationLibrary, }; use std::sync::atomic::AtomicBool; @@ -256,7 +256,7 @@ impl BatchLogProcessor { if let Some(channel) = res_channel { if let Err(send_error) = channel.send(result) { - otel_warn!( + otel_debug!( name: "BatchLogProcessor.Flush.SendResultError", error = format!("{:?}", send_error), ); From c25923d5c24ca4d56aaf87bf80e071e79474d8af Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 9 Oct 2024 12:55:22 -0700 Subject: [PATCH 06/11] review comments --- opentelemetry-sdk/src/logs/log_emitter.rs | 28 +++++++++++---- opentelemetry-sdk/src/logs/log_processor.rs | 36 ++++++++++++-------- opentelemetry/src/global/internal_logging.rs | 12 ++----- opentelemetry/src/logs/mod.rs | 8 +++++ 4 files changed, 53 insertions(+), 31 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 85282d4828..37f8aa33f5 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -2,7 +2,7 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, Trac use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; use opentelemetry::{ logs::{LogError, LogResult}, - otel_error, otel_warn, + otel_debug, otel_error, otel_warn, trace::TraceContextExt, Context, InstrumentationLibrary, }; @@ -113,10 +113,23 @@ impl LoggerProvider { let mut errs = vec![]; for processor in &self.inner.processors { if let Err(err) = processor.shutdown() { - otel_error!( - name: "LoggerProvider.Shutdown.Error", - error = format!("{err}") - ); + if let Err(err) = processor.shutdown() { + match err { + // Specific handling for mutex poisoning + LogError::MutexPoisoned(_) => { + otel_debug!( + name: "LoggerProvider.Shutdown.MutexPoisoned", + ); + } + _ => { + otel_error!( + name: "LoggerProvider.Shutdown.Error", + error = format!("{err}") + ); + } + } + errs.push(err); + } errs.push(err); } } @@ -124,10 +137,11 @@ impl LoggerProvider { if errs.is_empty() { Ok(()) } else { + // consolidate errors from all the processors - not all may be user errors Err(LogError::Other(format!("{errs:?}").into())) } } else { - let error = LogError::Other("logger provider already shut down".into()); + let error = LogError::AlreadyShutdown("LoggerProvider".to_string()); otel_warn!( name: "LoggerProvider.Shutdown.AlreadyShutdown", ); @@ -148,7 +162,7 @@ impl Drop for LoggerProviderInner { if let Err(err) = processor.shutdown() { otel_warn!( name: "LoggerProvider.Drop.ShutdownError", - error = err + error = format!("{}", err) ); } } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index dec02d5cfa..388e0b2e31 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -13,7 +13,7 @@ use futures_util::{ use opentelemetry::logs::Severity; use opentelemetry::{ logs::{LogError, LogResult}, - otel_error, otel_warn, InstrumentationLibrary, + otel_debug, otel_error, otel_warn, InstrumentationLibrary, }; use std::sync::atomic::AtomicBool; @@ -98,9 +98,9 @@ impl LogProcessor for SimpleLogProcessor { fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { + // this is a warning, as the user is trying to log after the processor has been shutdown otel_warn!( - name: "SimpleLogProcessor.Export.AfterShutdown", - error = LogError::Other("Attempted to export a log after processor shutdown".into()) + name: "SimpleLogProcessor.Emit.ProcessorShutdown", ); return; } @@ -108,16 +108,26 @@ impl LogProcessor for SimpleLogProcessor { let result = self .exporter .lock() - .map_err(|_| LogError::Other("simple logprocessor mutex poison".into())) + .map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into())) .and_then(|mut exporter| { let log_tuple = &[(record as &LogRecord, instrumentation)]; futures_executor::block_on(exporter.export(LogBatch::new(log_tuple))) }); - if let Err(err) = result { - otel_error!( - name: "SimpleLogProcessor.Export.Error", - error = err - ); + // Handle errors with specific static names + match result { + Err(LogError::MutexPoisoned(_)) => { + // logging as debug as this is not a user error + otel_debug!( + name: "SimpleLogProcessor.Emit.MutexPoisoning", + ); + } + Err(err) => { + otel_error!( + name: "SimpleLogProcessor.Emit.ExportError", + error = format!("{}",err) + ); + } + _ => {} } } @@ -132,9 +142,7 @@ impl LogProcessor for SimpleLogProcessor { exporter.shutdown(); Ok(()) } else { - Err(LogError::Other( - "simple logprocessor mutex poison during shutdown".into(), - )) + Err(LogError::MutexPoisoned("SimpleLogProcessor".into())) } } @@ -169,7 +177,7 @@ impl LogProcessor for BatchLogProcessor { if let Err(err) = result { otel_error!( name: "BatchLogProcessor.Export.Error", - error = err + error = format!("{}", err) ); } } @@ -239,7 +247,7 @@ impl BatchLogProcessor { if let Err(err) = result { otel_error!( name: "BatchLogProcessor.Export.Error", - error = err + error = format!("{}", err) ); } } diff --git a/opentelemetry/src/global/internal_logging.rs b/opentelemetry/src/global/internal_logging.rs index 71c3deb786..8059201ddc 100644 --- a/opentelemetry/src/global/internal_logging.rs +++ b/opentelemetry/src/global/internal_logging.rs @@ -64,7 +64,7 @@ macro_rules! otel_warn { tracing::warn!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = { - $crate::format_value!($value) + $value }),+, "" ) @@ -137,7 +137,7 @@ macro_rules! otel_error { tracing::error!(name: $name, target: env!("CARGO_PKG_NAME"), $($key = { - $crate::format_value!($value) + $value }),+, "" ) @@ -151,11 +151,3 @@ macro_rules! otel_error { } }; } - -/// Helper macro to format a value -#[macro_export] -macro_rules! format_value { - ($value:expr) => {{ - format!("{:?}", $value) - }}; -} diff --git a/opentelemetry/src/logs/mod.rs b/opentelemetry/src/logs/mod.rs index f0bbe0d660..1a27edb2e0 100644 --- a/opentelemetry/src/logs/mod.rs +++ b/opentelemetry/src/logs/mod.rs @@ -30,6 +30,14 @@ pub enum LogError { #[error("Exporter timed out after {} seconds", .0.as_secs())] ExportTimedOut(Duration), + /// Processor is already shutdown + #[error("{0} already shutdown")] + AlreadyShutdown(String), + + /// Mutex lock poisoning + #[error("mutex lock poisioning for {0}")] + MutexPoisoned(String), + /// Other errors propagated from log SDK that weren't covered above. #[error(transparent)] Other(#[from] Box), From 0bd341b97eadd785df3216bdd4745f519f11cc44 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 9 Oct 2024 13:05:58 -0700 Subject: [PATCH 07/11] cont.. --- opentelemetry-sdk/src/logs/log_emitter.rs | 31 ++++++++++----------- opentelemetry-sdk/src/logs/log_processor.rs | 2 +- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 37f8aa33f5..c7036435d9 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -113,22 +113,19 @@ impl LoggerProvider { let mut errs = vec![]; for processor in &self.inner.processors { if let Err(err) = processor.shutdown() { - if let Err(err) = processor.shutdown() { - match err { - // Specific handling for mutex poisoning - LogError::MutexPoisoned(_) => { - otel_debug!( - name: "LoggerProvider.Shutdown.MutexPoisoned", - ); - } - _ => { - otel_error!( - name: "LoggerProvider.Shutdown.Error", - error = format!("{err}") - ); - } + match err { + // Specific handling for mutex poisoning + LogError::MutexPoisoned(_) => { + otel_debug!( + name: "LoggerProvider.Shutdown.MutexPoisoned", + ); + } + _ => { + otel_debug!( + name: "LoggerProvider.Shutdown.Error", + error = format!("{err}") + ); } - errs.push(err); } errs.push(err); } @@ -142,7 +139,7 @@ impl LoggerProvider { } } else { let error = LogError::AlreadyShutdown("LoggerProvider".to_string()); - otel_warn!( + otel_debug!( name: "LoggerProvider.Shutdown.AlreadyShutdown", ); Err(error) @@ -160,7 +157,7 @@ impl Drop for LoggerProviderInner { fn drop(&mut self) { for processor in &mut self.processors { if let Err(err) = processor.shutdown() { - otel_warn!( + otel_debug!( name: "LoggerProvider.Drop.ShutdownError", error = format!("{}", err) ); diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 2184a262fa..e0784d4d94 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -284,7 +284,7 @@ impl BatchLogProcessor { exporter.shutdown(); if let Err(send_error) = ch.send(result) { - otel_warn!( + otel_debug!( name: "BatchLogProcessor.Shutdown.SendResultError", error = format!("{:?}", send_error), ); From 4979249f9cc3edbba101d928370a0fd059d8fdca Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 9 Oct 2024 13:06:46 -0700 Subject: [PATCH 08/11] lint --- opentelemetry-sdk/src/logs/log_emitter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index c7036435d9..e7f47eaabd 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -2,7 +2,7 @@ use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, Trac use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource}; use opentelemetry::{ logs::{LogError, LogResult}, - otel_debug, otel_error, otel_warn, + otel_debug, trace::TraceContextExt, Context, InstrumentationLibrary, }; From f1182ff659b88f7ef5af7ea90135920182757681 Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 9 Oct 2024 13:21:20 -0700 Subject: [PATCH 09/11] handle loggerprovider_drop --- opentelemetry-sdk/src/logs/log_emitter.rs | 41 +++++++++++++++++------ 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index e7f47eaabd..4e4624344d 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -23,15 +23,14 @@ static NOOP_LOGGER_PROVIDER: Lazy = Lazy::new(|| LoggerProvider inner: Arc::new(LoggerProviderInner { processors: Vec::new(), resource: Resource::empty(), + is_shutdown: AtomicBool::new(true), }), - is_shutdown: Arc::new(AtomicBool::new(true)), }); #[derive(Debug, Clone)] /// Creator for `Logger` instances. pub struct LoggerProvider { inner: Arc, - is_shutdown: Arc, } /// Default logger name if empty string is provided. @@ -72,7 +71,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider { fn library_logger(&self, library: Arc) -> Self::Logger { // If the provider is shutdown, new logger will refer a no-op logger provider. - if self.is_shutdown.load(Ordering::Relaxed) { + if self.inner.is_shutdown.load(Ordering::Relaxed) { return Logger::new(library, NOOP_LOGGER_PROVIDER.clone()); } Logger::new(library, self.clone()) @@ -104,6 +103,7 @@ impl LoggerProvider { /// Shuts down this `LoggerProvider` pub fn shutdown(&self) -> LogResult<()> { if self + .inner .is_shutdown .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_ok() @@ -151,17 +151,38 @@ impl LoggerProvider { struct LoggerProviderInner { processors: Vec>, resource: Resource, + is_shutdown: AtomicBool, } impl Drop for LoggerProviderInner { fn drop(&mut self) { - for processor in &mut self.processors { - if let Err(err) = processor.shutdown() { - otel_debug!( - name: "LoggerProvider.Drop.ShutdownError", - error = format!("{}", err) - ); + if self + .is_shutdown + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + for processor in &mut self.processors { + if let Err(err) = processor.shutdown() { + match err { + // Specific handling for mutex poisoning + LogError::MutexPoisoned(_) => { + otel_debug!( + name: "LoggerProvider.Drop.ShutdownMutexPoisoned", + ); + } + _ => { + otel_debug!( + name: "LoggerProvider.Drop.ShutdownError", + error = format!("{err}") + ); + } + } + } } + } else { + otel_debug!( + name: "LoggerProvider.Drop.AlreadyShutdown", + ); } } } @@ -216,8 +237,8 @@ impl Builder { inner: Arc::new(LoggerProviderInner { processors: self.processors, resource, + is_shutdown: AtomicBool::new(false), }), - is_shutdown: Arc::new(AtomicBool::new(false)), }; // invoke set_resource on all the processors From 40f7a5ca9fffdeb651ecb56a41247d1e182cf5ed Mon Sep 17 00:00:00 2001 From: Lalit Kumar Bhasin Date: Wed, 9 Oct 2024 13:41:17 -0700 Subject: [PATCH 10/11] add todo to prevert flooding --- opentelemetry-sdk/src/logs/log_processor.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index e0784d4d94..d74047fe13 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -174,6 +174,7 @@ impl LogProcessor for BatchLogProcessor { instrumentation.clone(), ))); + // TODO - Implement throttling to prevent error flooding when the queue is full or closed. if let Err(err) = result { otel_error!( name: "BatchLogProcessor.Export.Error", From 8d883927ff714cad9a669b96718c0e5c96298183 Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 13 Oct 2024 21:35:28 -0700 Subject: [PATCH 11/11] fix merge errors --- opentelemetry-sdk/src/logs/log_emitter.rs | 37 ++++++++++++++--------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 834cdee6c6..71b02dc158 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -125,18 +125,10 @@ impl LoggerProvider { if errs.is_empty() { Ok(()) } else { - otel_warn!( - name: "logger_provider_shutdown_error", - error = format!("{:?}", errs) - ); - Err(LogError::Other(format!("{:?}", errs).into())) + Err(LogError::Other(format!("{errs:?}").into())) } } else { - let error = LogError::AlreadyShutdown("LoggerProvider".to_string()); - otel_debug!( - name: "LoggerProvider.Shutdown.AlreadyShutdown", - ); - Err(error) + Err(LogError::AlreadyShutdown("LoggerProvider".to_string())) } } } @@ -154,6 +146,24 @@ impl LoggerProviderInner { let mut errs = vec![]; for processor in &self.processors { if let Err(err) = processor.shutdown() { + // Log at debug level because: + // - The error is also returned to the user for handling (if applicable) + // - Or the error occurs during `LoggerProviderInner::Drop` as part of telemetry shutdown, + // which is non-actionable by the user + match err { + // specific handling for mutex poisioning + LogError::MutexPoisoned(_) => { + otel_debug!( + name: "LoggerProvider.Drop.ShutdownMutexPoisoned", + ); + } + _ => { + otel_debug!( + name: "LoggerProvider.Drop.ShutdownError", + error = format!("{err}") + ); + } + } errs.push(err); } } @@ -164,13 +174,10 @@ impl LoggerProviderInner { impl Drop for LoggerProviderInner { fn drop(&mut self) { if !self.is_shutdown.load(Ordering::Relaxed) { - let errs = self.shutdown(); - if !errs.is_empty() { - global::handle_error(LogError::Other(format!("{:?}", errs).into())); - } + let _ = self.shutdown(); // errors are handled within shutdown } else { otel_debug!( - name: "LoggerProvider.Drop.AlreadyShutdown", + name: "LoggerProvider.Drop.AlreadyShutdown" ); } }