From b7c14d6d9131ea89cee4cc958be8010266fdae42 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 13 Mar 2024 10:51:00 -0700 Subject: [PATCH 1/7] Add test to validate log shutdown --- opentelemetry-sdk/src/logs/mod.rs | 48 +++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index e4c476d15c..431e35117d 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -17,6 +17,7 @@ mod tests { use crate::testing::logs::InMemoryLogsExporter; use opentelemetry::logs::{LogRecord, Logger, LoggerProvider as _, Severity}; use opentelemetry::{logs::AnyValue, Key}; + use opentelemetry::global::{set_logger_provider, logger, shutdown_logger_provider}; #[test] fn logging_sdk_test() { @@ -57,4 +58,51 @@ mod tests { .expect("Attributes are expected"); assert_eq!(attributes.len(), 2); } + + #[test] + fn logging_sdk_shutdown_test() { + // Arrange + let exporter: InMemoryLogsExporter = InMemoryLogsExporter::default(); + let logger_provider = LoggerProvider::builder() + .with_log_processor(SimpleLogProcessor::new(Box::new(exporter.clone()))) + .build(); + set_logger_provider(logger_provider); + + // Act + let logger = logger("test-logger"); + let mut log_record: LogRecord = LogRecord::default(); + log_record.severity_number = Some(Severity::Error); + log_record.severity_text = Some("Error".into()); + let attributes = vec![ + (Key::new("key1"), "value1".into()), + (Key::new("key2"), "value2".into()), + ]; + log_record.attributes = Some(attributes); + logger.emit(log_record); + + // Intentionally *not* calling shutdown/flush + // on the provider, but instead relying on + // shutdown_logger_provider which causes + // the global provider to be dropped, and + // the sdk logger provider's drop implementation + // will cause shutdown to be called on processors/exporters. + shutdown_logger_provider(); + + // Assert + let exported_logs = exporter + .get_emitted_logs() + .expect("Logs are expected to be exported."); + assert_eq!(exported_logs.len(), 1); + let log = exported_logs + .first() + .expect("Atleast one log is expected to be present."); + assert_eq!(log.instrumentation.name, "test-logger"); + assert_eq!(log.record.severity_number, Some(Severity::Error)); + let attributes: Vec<(Key, AnyValue)> = log + .record + .attributes + .clone() + .expect("Attributes are expected"); + assert_eq!(attributes.len(), 2); + } } From 525461e205c4c527607b47f29a10545e74f93cad Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 13 Mar 2024 10:51:52 -0700 Subject: [PATCH 2/7] fmt --- opentelemetry-sdk/src/logs/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 431e35117d..530b1e1381 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -15,9 +15,9 @@ pub use log_processor::{ mod tests { use super::*; use crate::testing::logs::InMemoryLogsExporter; + use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider}; use opentelemetry::logs::{LogRecord, Logger, LoggerProvider as _, Severity}; use opentelemetry::{logs::AnyValue, Key}; - use opentelemetry::global::{set_logger_provider, logger, shutdown_logger_provider}; #[test] fn logging_sdk_test() { @@ -81,7 +81,7 @@ mod tests { logger.emit(log_record); // Intentionally *not* calling shutdown/flush - // on the provider, but instead relying on + // on the provider, but instead relying on // shutdown_logger_provider which causes // the global provider to be dropped, and // the sdk logger provider's drop implementation From 0d650f4e0e23565a08d5039ac3e2ac9afc5936f6 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 13 Mar 2024 21:40:34 -0700 Subject: [PATCH 3/7] Add shutdown test for log --- opentelemetry-sdk/src/logs/log_emitter.rs | 95 +++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 983d50827b..9afc338e5b 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -236,3 +236,98 @@ impl opentelemetry::logs::Logger for Logger { enabled } } + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider}; + use opentelemetry::logs::Logger; + use std::sync::{Arc, Mutex}; + + #[test] + fn logging_sdk_shutdown_test() { + // Arrange + let shutdown_called = Arc::new(Mutex::new(false)); + let flush_called = Arc::new(Mutex::new(false)); + let logger_provider = LoggerProvider::builder() + .with_log_processor(LazyLogProcessor::new( + shutdown_called.clone(), + flush_called.clone(), + )) + .build(); + set_logger_provider(logger_provider); + + // Act + let logger1 = logger("test-logger1"); + let logger2 = logger("test-logger2"); + logger1.emit(LogRecord::default()); + logger2.emit(LogRecord::default()); + + // Intentionally *not* calling shutdown/flush + // on the provider, but instead relying on + // shutdown_logger_provider which causes + // the global provider to be dropped, and + // the sdk logger provider's drop implementation + // will cause shutdown to be called on processors/exporters. + shutdown_logger_provider(); + + // Assert + // shutting down logger provider is not enough, + // as loggers hold the provider's inner provider. + assert!(!*shutdown_called.lock().unwrap()); + + // flush is never called by the sdk. + assert!(!*flush_called.lock().unwrap()); + + // Drop one of the logger. Not enough! + drop(logger1); + assert!(!*shutdown_called.lock().unwrap()); + + // drop logger2, which is the only remaining logger, + // and this will drop the innerprovider, triggering shutdown + drop(logger2); + assert!(*shutdown_called.lock().unwrap()); + + // flush is never called by the sdk. + assert!(!*flush_called.lock().unwrap()); + } + + #[derive(Debug)] + pub(crate) struct LazyLogProcessor { + shutdown_called: Arc>, + flush_called: Arc>, + } + + impl LazyLogProcessor { + pub(crate) fn new( + shutdown_called: Arc>, + flush_called: Arc>, + ) -> Self { + LazyLogProcessor { + shutdown_called: shutdown_called, + flush_called: flush_called, + } + } + } + + impl LogProcessor for LazyLogProcessor { + fn emit(&self, _data: LogData) { + // nothing to do. + } + + fn force_flush(&self) -> LogResult<()> { + *self.flush_called.lock().unwrap() = true; + Ok(()) + } + + fn shutdown(&mut self) -> LogResult<()> { + *self.shutdown_called.lock().unwrap() = true; + Ok(()) + } + + #[cfg(feature = "logs_level_enabled")] + fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool { + true + } + } +} From 11d809a1e4453313c2216096384b685a3c39ebbe Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 13 Mar 2024 21:44:47 -0700 Subject: [PATCH 4/7] move test to log emitter --- opentelemetry-sdk/src/logs/log_emitter.rs | 29 ++++++++++++----------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index 9afc338e5b..d28fb7c104 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -245,7 +245,9 @@ mod tests { use std::sync::{Arc, Mutex}; #[test] - fn logging_sdk_shutdown_test() { + fn shutdown_test() { + // cargo test shutdown_test --features=logs + // Arrange let shutdown_called = Arc::new(Mutex::new(false)); let flush_called = Arc::new(Mutex::new(false)); @@ -263,28 +265,27 @@ mod tests { logger1.emit(LogRecord::default()); logger2.emit(LogRecord::default()); - // Intentionally *not* calling shutdown/flush - // on the provider, but instead relying on - // shutdown_logger_provider which causes - // the global provider to be dropped, and - // the sdk logger provider's drop implementation - // will cause shutdown to be called on processors/exporters. + // Intentionally *not* calling shutdown/flush on the provider, but + // instead relying on shutdown_logger_provider which causes the global + // provider to be dropped, leading to the sdk logger provider's drop to + // be called, which is expected to call shutdown on processors. shutdown_logger_provider(); // Assert - // shutting down logger provider is not enough, - // as loggers hold the provider's inner provider. + + // shutdown_logger_provider is necessary but not sufficient, as loggers + // hold on to the the provider (via inner provider clones). assert!(!*shutdown_called.lock().unwrap()); // flush is never called by the sdk. assert!(!*flush_called.lock().unwrap()); - // Drop one of the logger. Not enough! + // Drop one of the logger. Still not enough! drop(logger1); assert!(!*shutdown_called.lock().unwrap()); - // drop logger2, which is the only remaining logger, - // and this will drop the innerprovider, triggering shutdown + // drop logger2, which is the only remaining logger, and this will + // finally drop the inner provider, triggering shutdown. drop(logger2); assert!(*shutdown_called.lock().unwrap()); @@ -304,8 +305,8 @@ mod tests { flush_called: Arc>, ) -> Self { LazyLogProcessor { - shutdown_called: shutdown_called, - flush_called: flush_called, + shutdown_called, + flush_called, } } } From 487fade02d23368bbdee1c88123b816ae8211bfd Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 13 Mar 2024 21:46:08 -0700 Subject: [PATCH 5/7] revert mod --- opentelemetry-sdk/src/logs/mod.rs | 48 ------------------------------- 1 file changed, 48 deletions(-) diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 530b1e1381..e4c476d15c 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -15,7 +15,6 @@ pub use log_processor::{ mod tests { use super::*; use crate::testing::logs::InMemoryLogsExporter; - use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider}; use opentelemetry::logs::{LogRecord, Logger, LoggerProvider as _, Severity}; use opentelemetry::{logs::AnyValue, Key}; @@ -58,51 +57,4 @@ mod tests { .expect("Attributes are expected"); assert_eq!(attributes.len(), 2); } - - #[test] - fn logging_sdk_shutdown_test() { - // Arrange - let exporter: InMemoryLogsExporter = InMemoryLogsExporter::default(); - let logger_provider = LoggerProvider::builder() - .with_log_processor(SimpleLogProcessor::new(Box::new(exporter.clone()))) - .build(); - set_logger_provider(logger_provider); - - // Act - let logger = logger("test-logger"); - let mut log_record: LogRecord = LogRecord::default(); - log_record.severity_number = Some(Severity::Error); - log_record.severity_text = Some("Error".into()); - let attributes = vec![ - (Key::new("key1"), "value1".into()), - (Key::new("key2"), "value2".into()), - ]; - log_record.attributes = Some(attributes); - logger.emit(log_record); - - // Intentionally *not* calling shutdown/flush - // on the provider, but instead relying on - // shutdown_logger_provider which causes - // the global provider to be dropped, and - // the sdk logger provider's drop implementation - // will cause shutdown to be called on processors/exporters. - shutdown_logger_provider(); - - // Assert - let exported_logs = exporter - .get_emitted_logs() - .expect("Logs are expected to be exported."); - assert_eq!(exported_logs.len(), 1); - let log = exported_logs - .first() - .expect("Atleast one log is expected to be present."); - assert_eq!(log.instrumentation.name, "test-logger"); - assert_eq!(log.record.severity_number, Some(Severity::Error)); - let attributes: Vec<(Key, AnyValue)> = log - .record - .attributes - .clone() - .expect("Attributes are expected"); - assert_eq!(attributes.len(), 2); - } } From 04a88ab063e900a56d0a15fd35d1ed1a3ef41edf Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 13 Mar 2024 23:33:17 -0700 Subject: [PATCH 6/7] logger from another thread --- opentelemetry-sdk/src/logs/log_emitter.rs | 41 ++++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index d28fb7c104..dbe4ee6471 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -13,7 +13,7 @@ use opentelemetry::{ #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; -use std::{borrow::Cow, sync::Arc}; +use std::{borrow::Cow, sync::Arc, thread}; #[derive(Debug, Clone)] /// Creator for `Logger` instances. @@ -243,6 +243,7 @@ mod tests { use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider}; use opentelemetry::logs::Logger; use std::sync::{Arc, Mutex}; + use std::thread; #[test] fn shutdown_test() { @@ -251,6 +252,8 @@ mod tests { // Arrange let shutdown_called = Arc::new(Mutex::new(false)); let flush_called = Arc::new(Mutex::new(false)); + let signal_to_end = Arc::new(Mutex::new(false)); + let signal_to_thread_started = Arc::new(Mutex::new(false)); let logger_provider = LoggerProvider::builder() .with_log_processor(LazyLogProcessor::new( shutdown_called.clone(), @@ -265,6 +268,29 @@ mod tests { logger1.emit(LogRecord::default()); logger2.emit(LogRecord::default()); + let signal_to_end_clone = signal_to_end.clone(); + let signal_to_thread_started_clone = signal_to_thread_started.clone(); + + let handle = thread::spawn(move || { + let logger3 = logger("test-logger3"); + loop { + // signal the main thread that this thread has started. + *signal_to_thread_started_clone.lock().unwrap() = true; + logger3.emit(LogRecord::default()); + if *signal_to_end_clone.lock().unwrap() { + break; + } + } + }); + + // wait for the spawned thread to start before calling shutdown This is + // very important - if shutdown is called before the spawned thread + // obtains its logger, then the logger will be no-op one, and the test + // will pass, but it will not be testing the intended scenario. + while !*signal_to_thread_started.lock().unwrap() { + thread::sleep(std::time::Duration::from_millis(10)); + } + // Intentionally *not* calling shutdown/flush on the provider, but // instead relying on shutdown_logger_provider which causes the global // provider to be dropped, leading to the sdk logger provider's drop to @@ -280,13 +306,20 @@ mod tests { // flush is never called by the sdk. assert!(!*flush_called.lock().unwrap()); - // Drop one of the logger. Still not enough! + // Drop one of the logger. Not enough! drop(logger1); assert!(!*shutdown_called.lock().unwrap()); - // drop logger2, which is the only remaining logger, and this will - // finally drop the inner provider, triggering shutdown. + // drop logger2, which is the only remaining logger in this thread. + // Still not enough! drop(logger2); + assert!(!*shutdown_called.lock().unwrap()); + + // now signal the spawned thread to end, which causes it to drop its + // logger. Since that is the last logger, the provider (inner provider) + // is finally dropped, triggering shutdown + *signal_to_end.lock().unwrap() = true; + let _h = handle.join().unwrap(); assert!(*shutdown_called.lock().unwrap()); // flush is never called by the sdk. From cb4cf8d0b47b8987c1a066cace0ede9de35bcdfd Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 13 Mar 2024 23:33:39 -0700 Subject: [PATCH 7/7] clip --- opentelemetry-sdk/src/logs/log_emitter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index dbe4ee6471..475cf72f77 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -13,7 +13,7 @@ use opentelemetry::{ #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; -use std::{borrow::Cow, sync::Arc, thread}; +use std::{borrow::Cow, sync::Arc}; #[derive(Debug, Clone)] /// Creator for `Logger` instances. @@ -319,7 +319,7 @@ mod tests { // logger. Since that is the last logger, the provider (inner provider) // is finally dropped, triggering shutdown *signal_to_end.lock().unwrap() = true; - let _h = handle.join().unwrap(); + handle.join().unwrap(); assert!(*shutdown_called.lock().unwrap()); // flush is never called by the sdk.