Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid redundant shutdown in LoggerProvider::drop when already shut down #2195

Merged
merged 15 commits into from
Oct 12, 2024
1 change: 1 addition & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Bump MSRV to 1.70 [#2179](https://github.com/open-telemetry/opentelemetry-rust/pull/2179)
- Implement `LogRecord::set_trace_context` for `LogRecord`. Respect any trace context set on a `LogRecord` when emitting through a `Logger`.
- Improved `LoggerProvider` shutdown handling to prevent redundant shutdown calls when `drop` is invoked. [#2195](https://github.com/open-telemetry/opentelemetry-rust/pull/2195)

## v0.26.0
Released 2024-Sep-30
Expand Down
177 changes: 156 additions & 21 deletions opentelemetry-sdk/src/logs/log_emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,26 @@
inner: Arc::new(LoggerProviderInner {
processors: Vec::new(),
resource: Resource::empty(),
is_shutdown: AtomicBool::new(true),

Check warning on line 27 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L27

Added line #L27 was not covered by tests
}),
is_shutdown: Arc::new(AtomicBool::new(true)),
});

#[derive(Debug, Clone)]
/// Creator for `Logger` instances.
/// Handles the creation and coordination of [`Logger`]s.
///
/// All `Logger`s created by a `LoggerProvider` will share the same
/// [`Resource`] and have their created log records processed by the
/// configured log processors. This is a clonable handle to the `LoggerProvider`
/// itself, and cloning it will create a new reference, not a new instance of a
/// `LoggerProvider`. Dropping the last reference will trigger the shutdown of
/// the provider, ensuring that all remaining logs are flushed and no further
/// logs are processed. Shutdown can also be triggered manually by calling
/// the [`shutdown`](LoggerProvider::shutdown) method.
///
/// [`Logger`]: opentelemetry::logs::Logger
/// [`Resource`]: crate::Resource
pub struct LoggerProvider {
inner: Arc<LoggerProviderInner>,
is_shutdown: Arc<AtomicBool>,
}

/// Default logger name if empty string is provided.
Expand Down Expand Up @@ -73,7 +84,7 @@

fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
// If the provider is shutdown, new logger will refer a no-op logger provider.
if self.is_shutdown.load(Ordering::Relaxed) {
if self.inner.is_shutdown.load(Ordering::Relaxed) {
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
}
Logger::new(library, self.clone())
Expand Down Expand Up @@ -105,27 +116,21 @@
/// Shuts down this `LoggerProvider`
pub fn shutdown(&self) -> LogResult<()> {
if self
.inner
.is_shutdown
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
// propagate the shutdown signal to processors
// it's up to the processor to properly block new logs after shutdown
let mut errs = vec![];
for processor in &self.inner.processors {
if let Err(err) = processor.shutdown() {
otel_warn!(
name: "logger_provider_shutdown_error",
error = format!("{:?}", err)
);
errs.push(err);
}
}

let errs = self.inner.shutdown();
if errs.is_empty() {
Ok(())
} else {
Err(LogError::Other(format!("{errs:?}").into()))
otel_warn!(

Check warning on line 129 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L129

Added line #L129 was not covered by tests
name: "logger_provider_shutdown_error",
error = format!("{:?}", errs)

Check warning on line 131 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L131

Added line #L131 was not covered by tests
);
Err(LogError::Other(format!("{:?}", errs).into()))

Check warning on line 133 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L133

Added line #L133 was not covered by tests
}
} else {
otel_warn!(
Expand All @@ -140,13 +145,28 @@
/// State shared (through an `Arc`) by every clone of a `LoggerProvider`.
struct LoggerProviderInner {
/// Log processors owned by the provider; `shutdown` is invoked on each of
/// them when the provider shuts down.
processors: Vec<Box<dyn LogProcessor>>,
/// Resource stored alongside the processors for this provider.
resource: Resource,
/// `true` once the provider has been shut down. `Drop` reads this flag so
/// the processors are not shut down a second time.
is_shutdown: AtomicBool,
}

impl LoggerProviderInner {
    /// Invokes `shutdown` on every processor, in registration order, and
    /// returns the errors that were reported.
    ///
    /// An empty vector means every processor shut down cleanly. This method
    /// does not read or update `is_shutdown`; callers decide whether a
    /// shutdown is still needed.
    pub(crate) fn shutdown(&self) -> Vec<LogError> {
        self.processors
            .iter()
            // Keep only the failures; successful shutdowns contribute nothing.
            .filter_map(|processor| processor.shutdown().err())
            .collect()
    }
}

impl Drop for LoggerProviderInner {
fn drop(&mut self) {
for processor in &mut self.processors {
if let Err(err) = processor.shutdown() {
global::handle_error(err);
if !self.is_shutdown.load(Ordering::Relaxed) {
let errs = self.shutdown();
if !errs.is_empty() {
global::handle_error(LogError::Other(format!("{:?}", errs).into()));

Check warning on line 169 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L169

Added line #L169 was not covered by tests
}
}
}
Expand Down Expand Up @@ -202,8 +222,8 @@
inner: Arc::new(LoggerProviderInner {
processors: self.processors,
resource,
is_shutdown: AtomicBool::new(false),
}),
is_shutdown: Arc::new(AtomicBool::new(false)),
};

// invoke set_resource on all the processors
Expand Down Expand Up @@ -612,6 +632,89 @@
assert!(!*flush_called.lock().unwrap());
}

#[test]
fn drop_test_with_multiple_providers() {
    // Verifies that processors are shut down only when the LAST reference to the
    // shared `LoggerProviderInner` goes away, not when individual providers drop.
    let shutdown_called = Arc::new(Mutex::new(false));
    let flush_called = Arc::new(Mutex::new(false));
    {
        // Create a shared LoggerProviderInner and use it across multiple providers
        let shared_inner = Arc::new(LoggerProviderInner {
            processors: vec![Box::new(LazyLogProcessor::new(
                shutdown_called.clone(),
                flush_called.clone(),
            ))],
            resource: Resource::empty(),
            is_shutdown: AtomicBool::new(false),
        });

        {
            let logger_provider1 = LoggerProvider {
                inner: shared_inner.clone(),
            };
            let logger_provider2 = LoggerProvider {
                inner: shared_inner.clone(),
            };

            let logger1 = logger_provider1.logger("test-logger1");
            let logger2 = logger_provider2.logger("test-logger2");

            // Each logger emits a record it created itself.
            logger1.emit(logger1.create_log_record());
            logger2.emit(logger2.create_log_record());

            // LoggerProviderInner should not be dropped yet, since both providers and `shared_inner`
            // are still holding a reference.
        }
        // At this point, both `logger_provider1` and `logger_provider2` are dropped,
        // but `shared_inner` still holds a reference, so `LoggerProviderInner` is NOT dropped yet.
        assert!(!*shutdown_called.lock().unwrap());
    }
    // Verify shutdown was called during the drop of the shared LoggerProviderInner
    assert!(*shutdown_called.lock().unwrap());
    // Verify flush was not called during drop
    assert!(!*flush_called.lock().unwrap());
}

#[test]
fn drop_after_shutdown_test_with_multiple_providers() {
    // Verifies that an explicit shutdown is not repeated, neither by a second
    // `shutdown()` call nor by the eventual drop of the shared inner state.
    let shutdown_called = Arc::new(Mutex::new(0)); // Count the number of times shutdown is called
    let flush_called = Arc::new(Mutex::new(false));

    // Create a shared LoggerProviderInner and use it across multiple providers
    let shared_inner = Arc::new(LoggerProviderInner {
        processors: vec![Box::new(CountingShutdownProcessor::new(
            shutdown_called.clone(),
            flush_called.clone(),
        ))],
        resource: Resource::empty(),
        is_shutdown: AtomicBool::new(false),
    });

    // Create a scope to test behavior when providers are dropped
    {
        let logger_provider1 = LoggerProvider {
            inner: shared_inner.clone(),
        };
        let logger_provider2 = LoggerProvider {
            inner: shared_inner.clone(),
        };

        // Explicitly shut down the logger provider
        let shutdown_result = logger_provider1.shutdown();
        assert!(shutdown_result.is_ok());

        // Verify that shutdown was called exactly once
        assert_eq!(*shutdown_called.lock().unwrap(), 1);

        // LoggerProvider2 should observe the shutdown state but not trigger another shutdown
        let shutdown_result2 = logger_provider2.shutdown();
        assert!(shutdown_result2.is_err());

        // Both logger providers will be dropped at the end of this scope
    }

    // The providers are gone, but `shared_inner` is still the last reference.
    // Release it explicitly so `LoggerProviderInner::drop` actually runs before
    // the final assertions; otherwise the drop path would never be exercised.
    drop(shared_inner);

    // Verify that shutdown was only called once, even after drop
    assert_eq!(*shutdown_called.lock().unwrap(), 1);
    // Neither shutdown nor drop should have requested a flush
    assert!(!*flush_called.lock().unwrap());
}

#[derive(Debug)]
pub(crate) struct LazyLogProcessor {
shutdown_called: Arc<Mutex<bool>>,
Expand Down Expand Up @@ -645,4 +748,36 @@
Ok(())
}
}

/// Test-only processor that records how often it is shut down and whether a
/// flush was ever requested.
#[derive(Debug)]
struct CountingShutdownProcessor {
    /// Shared counter of shutdown calls.
    shutdown_count: Arc<Mutex<i32>>,
    /// Shared flag for flush requests.
    flush_called: Arc<Mutex<bool>>,
}

impl CountingShutdownProcessor {
    /// Builds a processor that reports through the given shared counter and flag,
    /// letting the caller keep its own handles for later inspection.
    fn new(shutdown_count: Arc<Mutex<i32>>, flush_called: Arc<Mutex<bool>>) -> Self {
        Self {
            shutdown_count,
            flush_called,
        }
    }
}

impl LogProcessor for CountingShutdownProcessor {
/// Intentionally a no-op: both arguments are ignored, since this processor
/// only tracks shutdown and flush calls.
fn emit(&self, _data: &mut LogRecord, _library: &InstrumentationLibrary) {
// nothing to do
}

Check warning on line 770 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L768-L770

Added lines #L768 - L770 were not covered by tests

/// Sets the shared flush flag to `true` and reports success.
fn force_flush(&self) -> LogResult<()> {
    let mut flushed = self.flush_called.lock().unwrap();
    *flushed = true;
    Ok(())
}

Check warning on line 775 in opentelemetry-sdk/src/logs/log_emitter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_emitter.rs#L772-L775

Added lines #L772 - L775 were not covered by tests

/// Adds one to the shared shutdown counter and reports success.
fn shutdown(&self) -> LogResult<()> {
    *self.shutdown_count.lock().unwrap() += 1;
    Ok(())
}
}
}
Loading