diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 983d50827b..475cf72f77 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -236,3 +236,132 @@ impl opentelemetry::logs::Logger for Logger { enabled } } + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider}; + use opentelemetry::logs::Logger; + use std::sync::{Arc, Mutex}; + use std::thread; + + #[test] + fn shutdown_test() { + // cargo test shutdown_test --features=logs + + // Arrange + let shutdown_called = Arc::new(Mutex::new(false)); + let flush_called = Arc::new(Mutex::new(false)); + let signal_to_end = Arc::new(Mutex::new(false)); + let signal_to_thread_started = Arc::new(Mutex::new(false)); + let logger_provider = LoggerProvider::builder() + .with_log_processor(LazyLogProcessor::new( + shutdown_called.clone(), + flush_called.clone(), + )) + .build(); + set_logger_provider(logger_provider); + + // Act + let logger1 = logger("test-logger1"); + let logger2 = logger("test-logger2"); + logger1.emit(LogRecord::default()); + logger2.emit(LogRecord::default()); + + let signal_to_end_clone = signal_to_end.clone(); + let signal_to_thread_started_clone = signal_to_thread_started.clone(); + + let handle = thread::spawn(move || { + let logger3 = logger("test-logger3"); + loop { + // signal the main thread that this thread has started. + *signal_to_thread_started_clone.lock().unwrap() = true; + logger3.emit(LogRecord::default()); + if *signal_to_end_clone.lock().unwrap() { + break; + } + } + }); + + // wait for the spawned thread to start before calling shutdown This is + // very important - if shutdown is called before the spawned thread + // obtains its logger, then the logger will be no-op one, and the test + // will pass, but it will not be testing the intended scenario. + while !*signal_to_thread_started.lock().unwrap() { + thread::sleep(std::time::Duration::from_millis(10)); + } + + // Intentionally *not* calling shutdown/flush on the provider, but + // instead relying on shutdown_logger_provider which causes the global + // provider to be dropped, leading to the sdk logger provider's drop to + // be called, which is expected to call shutdown on processors. + shutdown_logger_provider(); + + // Assert + + // shutdown_logger_provider is necessary but not sufficient, as loggers + // hold on to the the provider (via inner provider clones). + assert!(!*shutdown_called.lock().unwrap()); + + // flush is never called by the sdk. + assert!(!*flush_called.lock().unwrap()); + + // Drop one of the logger. Not enough! + drop(logger1); + assert!(!*shutdown_called.lock().unwrap()); + + // drop logger2, which is the only remaining logger in this thread. + // Still not enough! + drop(logger2); + assert!(!*shutdown_called.lock().unwrap()); + + // now signal the spawned thread to end, which causes it to drop its + // logger. Since that is the last logger, the provider (inner provider) + // is finally dropped, triggering shutdown + *signal_to_end.lock().unwrap() = true; + handle.join().unwrap(); + assert!(*shutdown_called.lock().unwrap()); + + // flush is never called by the sdk. + assert!(!*flush_called.lock().unwrap()); + } + + #[derive(Debug)] + pub(crate) struct LazyLogProcessor { + shutdown_called: Arc>, + flush_called: Arc>, + } + + impl LazyLogProcessor { + pub(crate) fn new( + shutdown_called: Arc>, + flush_called: Arc>, + ) -> Self { + LazyLogProcessor { + shutdown_called, + flush_called, + } + } + } + + impl LogProcessor for LazyLogProcessor { + fn emit(&self, _data: LogData) { + // nothing to do. + } + + fn force_flush(&self) -> LogResult<()> { + *self.flush_called.lock().unwrap() = true; + Ok(()) + } + + fn shutdown(&mut self) -> LogResult<()> { + *self.shutdown_called.lock().unwrap() = true; + Ok(()) + } + + #[cfg(feature = "logs_level_enabled")] + fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { + true + } + } +}