Skip to content

Commit b6a108e

Browse files
lalitbcijothomas
andauthored
Avoid redundant shutdown in LoggerProvider::drop when already shut down (#2195)
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
1 parent 20fd454 commit b6a108e

File tree

2 files changed

+157
-21
lines changed

2 files changed

+157
-21
lines changed

opentelemetry-sdk/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- Bump MSRV to 1.70 [#2179](https://github.com/open-telemetry/opentelemetry-rust/pull/2179)
66
- Implement `LogRecord::set_trace_context` for `LogRecord`. Respect any trace context set on a `LogRecord` when emitting through a `Logger`.
7+
- Improved `LoggerProvider` shutdown handling to prevent redundant shutdown calls when `drop` is invoked. [#2195](https://github.com/open-telemetry/opentelemetry-rust/pull/2195)
78

89
## v0.26.0
910
Released 2024-Sep-30

opentelemetry-sdk/src/logs/log_emitter.rs

+156-21
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,26 @@ static NOOP_LOGGER_PROVIDER: Lazy<LoggerProvider> = Lazy::new(|| LoggerProvider
2424
inner: Arc::new(LoggerProviderInner {
2525
processors: Vec::new(),
2626
resource: Resource::empty(),
27+
is_shutdown: AtomicBool::new(true),
2728
}),
28-
is_shutdown: Arc::new(AtomicBool::new(true)),
2929
});
3030

3131
#[derive(Debug, Clone)]
32-
/// Creator for `Logger` instances.
32+
/// Handles the creation and coordination of [`Logger`]s.
33+
///
34+
/// All `Logger`s created by a `LoggerProvider` will share the same
35+
/// [`Resource`] and have their created log records processed by the
36+
/// configured log processors. This is a clonable handle to the `LoggerProvider`
37+
/// itself, and cloning it will create a new reference, not a new instance of a
38+
/// `LoggerProvider`. Dropping the last reference will trigger the shutdown of
39+
/// the provider, ensuring that all remaining logs are flushed and no further
40+
/// logs are processed. Shutdown can also be triggered manually by calling
41+
/// the [`shutdown`](LoggerProvider::shutdown) method.
42+
///
43+
/// [`Logger`]: opentelemetry::logs::Logger
44+
/// [`Resource`]: crate::Resource
3345
pub struct LoggerProvider {
3446
inner: Arc<LoggerProviderInner>,
35-
is_shutdown: Arc<AtomicBool>,
3647
}
3748

3849
/// Default logger name if empty string is provided.
@@ -73,7 +84,7 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {
7384

7485
fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
7586
// If the provider is shutdown, new logger will refer a no-op logger provider.
76-
if self.is_shutdown.load(Ordering::Relaxed) {
87+
if self.inner.is_shutdown.load(Ordering::Relaxed) {
7788
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
7889
}
7990
Logger::new(library, self.clone())
@@ -105,27 +116,21 @@ impl LoggerProvider {
105116
/// Shuts down this `LoggerProvider`
106117
pub fn shutdown(&self) -> LogResult<()> {
107118
if self
119+
.inner
108120
.is_shutdown
109121
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
110122
.is_ok()
111123
{
112124
// propagate the shutdown signal to processors
113-
// it's up to the processor to properly block new logs after shutdown
114-
let mut errs = vec![];
115-
for processor in &self.inner.processors {
116-
if let Err(err) = processor.shutdown() {
117-
otel_warn!(
118-
name: "logger_provider_shutdown_error",
119-
error = format!("{:?}", err)
120-
);
121-
errs.push(err);
122-
}
123-
}
124-
125+
let errs = self.inner.shutdown();
125126
if errs.is_empty() {
126127
Ok(())
127128
} else {
128-
Err(LogError::Other(format!("{errs:?}").into()))
129+
otel_warn!(
130+
name: "logger_provider_shutdown_error",
131+
error = format!("{:?}", errs)
132+
);
133+
Err(LogError::Other(format!("{:?}", errs).into()))
129134
}
130135
} else {
131136
otel_warn!(
@@ -140,13 +145,28 @@ impl LoggerProvider {
140145
struct LoggerProviderInner {
141146
processors: Vec<Box<dyn LogProcessor>>,
142147
resource: Resource,
148+
is_shutdown: AtomicBool,
149+
}
150+
151+
impl LoggerProviderInner {
152+
/// Shuts down the `LoggerProviderInner` and returns any errors.
153+
pub(crate) fn shutdown(&self) -> Vec<LogError> {
154+
let mut errs = vec![];
155+
for processor in &self.processors {
156+
if let Err(err) = processor.shutdown() {
157+
errs.push(err);
158+
}
159+
}
160+
errs
161+
}
143162
}
144163

145164
impl Drop for LoggerProviderInner {
146165
fn drop(&mut self) {
147-
for processor in &mut self.processors {
148-
if let Err(err) = processor.shutdown() {
149-
global::handle_error(err);
166+
if !self.is_shutdown.load(Ordering::Relaxed) {
167+
let errs = self.shutdown();
168+
if !errs.is_empty() {
169+
global::handle_error(LogError::Other(format!("{:?}", errs).into()));
150170
}
151171
}
152172
}
@@ -202,8 +222,8 @@ impl Builder {
202222
inner: Arc::new(LoggerProviderInner {
203223
processors: self.processors,
204224
resource,
225+
is_shutdown: AtomicBool::new(false),
205226
}),
206-
is_shutdown: Arc::new(AtomicBool::new(false)),
207227
};
208228

209229
// invoke set_resource on all the processors
@@ -612,6 +632,89 @@ mod tests {
612632
assert!(!*flush_called.lock().unwrap());
613633
}
614634

635+
#[test]
636+
fn drop_test_with_multiple_providers() {
637+
let shutdown_called = Arc::new(Mutex::new(false));
638+
let flush_called = Arc::new(Mutex::new(false));
639+
{
640+
// Create a shared LoggerProviderInner and use it across multiple providers
641+
let shared_inner = Arc::new(LoggerProviderInner {
642+
processors: vec![Box::new(LazyLogProcessor::new(
643+
shutdown_called.clone(),
644+
flush_called.clone(),
645+
))],
646+
resource: Resource::empty(),
647+
is_shutdown: AtomicBool::new(false),
648+
});
649+
650+
{
651+
let logger_provider1 = LoggerProvider {
652+
inner: shared_inner.clone(),
653+
};
654+
let logger_provider2 = LoggerProvider {
655+
inner: shared_inner.clone(),
656+
};
657+
658+
let logger1 = logger_provider1.logger("test-logger1");
659+
let logger2 = logger_provider2.logger("test-logger2");
660+
661+
logger1.emit(logger1.create_log_record());
662+
logger2.emit(logger1.create_log_record());
663+
664+
// LoggerProviderInner should not be dropped yet, since both providers and `shared_inner`
665+
// are still holding a reference.
666+
}
667+
// At this point, both `logger_provider1` and `logger_provider2` are dropped,
668+
// but `shared_inner` still holds a reference, so `LoggerProviderInner` is NOT dropped yet.
669+
}
670+
// Verify shutdown was called during the drop of the shared LoggerProviderInner
671+
assert!(*shutdown_called.lock().unwrap());
672+
// Verify flush was not called during drop
673+
assert!(!*flush_called.lock().unwrap());
674+
}
675+
676+
#[test]
677+
fn drop_after_shutdown_test_with_multiple_providers() {
678+
let shutdown_called = Arc::new(Mutex::new(0)); // Count the number of times shutdown is called
679+
let flush_called = Arc::new(Mutex::new(false));
680+
681+
// Create a shared LoggerProviderInner and use it across multiple providers
682+
let shared_inner = Arc::new(LoggerProviderInner {
683+
processors: vec![Box::new(CountingShutdownProcessor::new(
684+
shutdown_called.clone(),
685+
flush_called.clone(),
686+
))],
687+
resource: Resource::empty(),
688+
is_shutdown: AtomicBool::new(false),
689+
});
690+
691+
// Create a scope to test behavior when providers are dropped
692+
{
693+
let logger_provider1 = LoggerProvider {
694+
inner: shared_inner.clone(),
695+
};
696+
let logger_provider2 = LoggerProvider {
697+
inner: shared_inner.clone(),
698+
};
699+
700+
// Explicitly shut down the logger provider
701+
let shutdown_result = logger_provider1.shutdown();
702+
assert!(shutdown_result.is_ok());
703+
704+
// Verify that shutdown was called exactly once
705+
assert_eq!(*shutdown_called.lock().unwrap(), 1);
706+
707+
// LoggerProvider2 should observe the shutdown state but not trigger another shutdown
708+
let shutdown_result2 = logger_provider2.shutdown();
709+
assert!(shutdown_result2.is_err());
710+
711+
// Both logger providers will be dropped at the end of this scope
712+
}
713+
714+
// Verify that shutdown was only called once, even after drop
715+
assert_eq!(*shutdown_called.lock().unwrap(), 1);
716+
}
717+
615718
#[derive(Debug)]
616719
pub(crate) struct LazyLogProcessor {
617720
shutdown_called: Arc<Mutex<bool>>,
@@ -645,4 +748,36 @@ mod tests {
645748
Ok(())
646749
}
647750
}
751+
752+
#[derive(Debug)]
753+
struct CountingShutdownProcessor {
754+
shutdown_count: Arc<Mutex<i32>>,
755+
flush_called: Arc<Mutex<bool>>,
756+
}
757+
758+
impl CountingShutdownProcessor {
759+
fn new(shutdown_count: Arc<Mutex<i32>>, flush_called: Arc<Mutex<bool>>) -> Self {
760+
CountingShutdownProcessor {
761+
shutdown_count,
762+
flush_called,
763+
}
764+
}
765+
}
766+
767+
impl LogProcessor for CountingShutdownProcessor {
768+
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
769+
// nothing to do
770+
}
771+
772+
fn force_flush(&self) -> LogResult<()> {
773+
*self.flush_called.lock().unwrap() = true;
774+
Ok(())
775+
}
776+
777+
fn shutdown(&self) -> LogResult<()> {
778+
let mut count = self.shutdown_count.lock().unwrap();
779+
*count += 1;
780+
Ok(())
781+
}
782+
}
648783
}

0 commit comments

Comments
 (0)