Skip to content

Commit 0ba4cbd

Browse files
authored
feat(logs): make logger shutdown &self (#1643)
1 parent 8a9a569 commit 0ba4cbd

File tree

8 files changed

+216
-29
lines changed

8 files changed

+216
-29
lines changed

opentelemetry-appender-tracing/benches/logs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl LogProcessor for NoopProcessor {
6262
Ok(())
6363
}
6464

65-
fn shutdown(&mut self) -> LogResult<()> {
65+
fn shutdown(&self) -> LogResult<()> {
6666
Ok(())
6767
}
6868

opentelemetry-sdk/CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313
`ProcessResourceDetector` resource detectors, use the
1414
[`opentelemetry-resource-detector`](https://crates.io/crates/opentelemetry-resource-detectors) instead.
1515
- Baggage propagation error will be reported to global error handler [#1640](https://github.com/open-telemetry/opentelemetry-rust/pull/1640)
16+
- Improves `shutdown` behavior of `LoggerProvider` and `LogProcessor` [#1643](https://github.com/open-telemetry/opentelemetry-rust/pull/1643).
17+
- `shutdown` can be called by any clone of the `LoggerProvider` without the need of waiting on all `Logger` drops. Thus, `try_shutdown` has been removed.
18+
- `shutdown` methods in `LoggerProvider` and `LogProcessor` now takes a immutable reference
19+
- After `shutdown`, `LoggerProvider` will return noop `Logger`
20+
- After `shutdown`, `LogProcessor` will not process any new logs
1621

1722
## v0.22.1
1823

opentelemetry-sdk/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ url = { workspace = true, optional = true }
2929
tokio = { workspace = true, features = ["rt", "time"], optional = true }
3030
tokio-stream = { workspace = true, optional = true }
3131
http = { workspace = true, optional = true }
32+
lazy_static = "1.4.0"
3233

3334
[package.metadata.docs.rs]
3435
all-features = true

opentelemetry-sdk/src/logs/log_emitter.rs

+109-19
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,25 @@ use opentelemetry::{
1313
#[cfg(feature = "logs_level_enabled")]
1414
use opentelemetry::logs::Severity;
1515

16+
use std::sync::atomic::AtomicBool;
1617
use std::{borrow::Cow, sync::Arc};
1718

19+
use once_cell::sync::Lazy;
20+
21+
// a no nop logger provider used as placeholder when the provider is shutdown
22+
static NOOP_LOGGER_PROVIDER: Lazy<LoggerProvider> = Lazy::new(|| LoggerProvider {
23+
inner: Arc::new(LoggerProviderInner {
24+
processors: Vec::new(),
25+
config: Config::default(),
26+
}),
27+
is_shutdown: Arc::new(AtomicBool::new(true)),
28+
});
29+
1830
#[derive(Debug, Clone)]
1931
/// Creator for `Logger` instances.
2032
pub struct LoggerProvider {
2133
inner: Arc<LoggerProviderInner>,
34+
is_shutdown: Arc<AtomicBool>,
2235
}
2336

2437
/// Default logger name if empty string is provided.
@@ -59,6 +72,10 @@ impl opentelemetry::logs::LoggerProvider for LoggerProvider {
5972
}
6073

6174
fn library_logger(&self, library: Arc<InstrumentationLibrary>) -> Self::Logger {
75+
// If the provider is shutdown, new logger will refer a no-op logger provider.
76+
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
77+
return Logger::new(library, NOOP_LOGGER_PROVIDER.clone());
78+
}
6279
Logger::new(library, self.clone())
6380
}
6481
}
@@ -87,22 +104,18 @@ impl LoggerProvider {
87104
.collect()
88105
}
89106

90-
/// Shuts down this `LoggerProvider`, panicking on failure.
91-
pub fn shutdown(&mut self) -> Vec<LogResult<()>> {
92-
self.try_shutdown()
93-
.expect("cannot shutdown LoggerProvider when child Loggers are still active")
94-
}
95-
96-
/// Attempts to shutdown this `LoggerProvider`, succeeding only when
97-
/// all cloned `LoggerProvider` values have been dropped.
98-
pub fn try_shutdown(&mut self) -> Option<Vec<LogResult<()>>> {
99-
Arc::get_mut(&mut self.inner).map(|inner| {
100-
inner
101-
.processors
102-
.iter_mut()
103-
.map(|processor| processor.shutdown())
104-
.collect()
105-
})
107+
/// Shuts down this `LoggerProvider`
108+
pub fn shutdown(&self) -> Vec<LogResult<()>> {
109+
// mark itself as already shutdown
110+
self.is_shutdown
111+
.store(true, std::sync::atomic::Ordering::Relaxed);
112+
// propagate the shutdown signal to processors
113+
// it's up to the processor to properly block new logs after shutdown
114+
self.inner
115+
.processors
116+
.iter()
117+
.map(|processor| processor.shutdown())
118+
.collect()
106119
}
107120
}
108121

@@ -168,6 +181,7 @@ impl Builder {
168181
processors: self.processors,
169182
config: self.config,
170183
}),
184+
is_shutdown: Arc::new(AtomicBool::new(false)),
171185
}
172186
}
173187
}
@@ -253,11 +267,63 @@ mod tests {
253267

254268
use super::*;
255269
use opentelemetry::global::{logger, set_logger_provider, shutdown_logger_provider};
256-
use opentelemetry::logs::Logger;
270+
use opentelemetry::logs::{Logger, LoggerProvider as _};
257271
use opentelemetry::{Key, KeyValue, Value};
258-
use std::sync::Mutex;
272+
use std::fmt::{Debug, Formatter};
273+
use std::sync::atomic::AtomicU64;
274+
use std::sync::{Arc, Mutex};
259275
use std::thread;
260276

277+
struct ShutdownTestLogProcessor {
278+
is_shutdown: Arc<Mutex<bool>>,
279+
counter: Arc<AtomicU64>,
280+
}
281+
282+
impl Debug for ShutdownTestLogProcessor {
283+
fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
284+
todo!()
285+
}
286+
}
287+
288+
impl ShutdownTestLogProcessor {
289+
pub(crate) fn new(counter: Arc<AtomicU64>) -> Self {
290+
ShutdownTestLogProcessor {
291+
is_shutdown: Arc::new(Mutex::new(false)),
292+
counter,
293+
}
294+
}
295+
}
296+
297+
impl LogProcessor for ShutdownTestLogProcessor {
298+
fn emit(&self, _data: LogData) {
299+
self.is_shutdown
300+
.lock()
301+
.map(|is_shutdown| {
302+
if !*is_shutdown {
303+
self.counter
304+
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
305+
}
306+
})
307+
.expect("lock poisoned");
308+
}
309+
310+
fn force_flush(&self) -> LogResult<()> {
311+
Ok(())
312+
}
313+
314+
fn shutdown(&self) -> LogResult<()> {
315+
self.is_shutdown
316+
.lock()
317+
.map(|mut is_shutdown| *is_shutdown = true)
318+
.expect("lock poisoned");
319+
Ok(())
320+
}
321+
322+
#[cfg(feature = "logs_level_enabled")]
323+
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
324+
true
325+
}
326+
}
261327
#[test]
262328
fn test_logger_provider_default_resource() {
263329
let assert_resource = |provider: &super::LoggerProvider,
@@ -386,6 +452,30 @@ mod tests {
386452

387453
#[test]
388454
fn shutdown_test() {
455+
let counter = Arc::new(AtomicU64::new(0));
456+
let logger_provider = LoggerProvider::builder()
457+
.with_log_processor(ShutdownTestLogProcessor::new(counter.clone()))
458+
.build();
459+
460+
let logger1 = logger_provider.logger("test-logger1");
461+
let logger2 = logger_provider.logger("test-logger2");
462+
logger1.emit(LogRecord::default());
463+
logger2.emit(LogRecord::default());
464+
465+
let logger3 = logger_provider.logger("test-logger3");
466+
let handle = thread::spawn(move || {
467+
logger3.emit(LogRecord::default());
468+
});
469+
handle.join().expect("thread panicked");
470+
471+
let _ = logger_provider.shutdown();
472+
logger1.emit(LogRecord::default());
473+
474+
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3);
475+
}
476+
477+
#[test]
478+
fn global_shutdown_test() {
389479
// cargo test shutdown_test --features=logs
390480

391481
// Arrange
@@ -493,7 +583,7 @@ mod tests {
493583
Ok(())
494584
}
495585

496-
fn shutdown(&mut self) -> LogResult<()> {
586+
fn shutdown(&self) -> LogResult<()> {
497587
*self.shutdown_called.lock().unwrap() = true;
498588
Ok(())
499589
}

opentelemetry-sdk/src/logs/log_processor.rs

+75-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use opentelemetry::{
1313
global,
1414
logs::{LogError, LogResult},
1515
};
16+
use std::sync::atomic::AtomicBool;
1617
use std::{cmp::min, env, sync::Mutex};
1718
use std::{
1819
fmt::{self, Debug, Formatter},
@@ -46,7 +47,9 @@ pub trait LogProcessor: Send + Sync + Debug {
4647
/// Force the logs lying in the cache to be exported.
4748
fn force_flush(&self) -> LogResult<()>;
4849
/// Shuts down the processor.
49-
fn shutdown(&mut self) -> LogResult<()>;
50+
/// After shutdown returns the log processor should stop processing any logs.
51+
/// It's up to the implementation on when to drop the LogProcessor.
52+
fn shutdown(&self) -> LogResult<()>;
5053
#[cfg(feature = "logs_level_enabled")]
5154
/// Check if logging is enabled
5255
fn event_enabled(&self, level: Severity, target: &str, name: &str) -> bool;
@@ -59,18 +62,25 @@ pub trait LogProcessor: Send + Sync + Debug {
5962
#[derive(Debug)]
6063
pub struct SimpleLogProcessor {
6164
exporter: Mutex<Box<dyn LogExporter>>,
65+
is_shutdown: AtomicBool,
6266
}
6367

6468
impl SimpleLogProcessor {
6569
pub(crate) fn new(exporter: Box<dyn LogExporter>) -> Self {
6670
SimpleLogProcessor {
6771
exporter: Mutex::new(exporter),
72+
is_shutdown: AtomicBool::new(false),
6873
}
6974
}
7075
}
7176

7277
impl LogProcessor for SimpleLogProcessor {
7378
fn emit(&self, data: LogData) {
79+
// noop after shutdown
80+
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
81+
return;
82+
}
83+
7484
let result = self
7585
.exporter
7686
.lock()
@@ -85,7 +95,9 @@ impl LogProcessor for SimpleLogProcessor {
8595
Ok(())
8696
}
8797

88-
fn shutdown(&mut self) -> LogResult<()> {
98+
fn shutdown(&self) -> LogResult<()> {
99+
self.is_shutdown
100+
.store(true, std::sync::atomic::Ordering::Relaxed);
89101
if let Ok(mut exporter) = self.exporter.lock() {
90102
exporter.shutdown();
91103
Ok(())
@@ -141,7 +153,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
141153
.and_then(std::convert::identity)
142154
}
143155

144-
fn shutdown(&mut self) -> LogResult<()> {
156+
fn shutdown(&self) -> LogResult<()> {
145157
let (res_sender, res_receiver) = oneshot::channel();
146158
self.message_sender
147159
.try_send(BatchMessage::Shutdown(res_sender))
@@ -458,6 +470,9 @@ mod tests {
458470
BatchLogProcessor, OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
459471
OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_SCHEDULE_DELAY,
460472
};
473+
use crate::export::logs::LogData;
474+
use crate::logs::{LogProcessor, SimpleLogProcessor};
475+
use crate::testing::logs::InMemoryLogsExporterBuilder;
461476
use crate::{
462477
logs::{
463478
log_processor::{
@@ -620,4 +635,61 @@ mod tests {
620635
assert_eq!(actual.max_export_timeout, Duration::from_millis(3));
621636
assert_eq!(actual.max_queue_size, 4);
622637
}
638+
639+
#[tokio::test(flavor = "multi_thread")]
640+
async fn test_batch_shutdown() {
641+
// assert we will receive an error
642+
// setup
643+
let exporter = InMemoryLogsExporterBuilder::default()
644+
.keep_records_on_shutdown()
645+
.build();
646+
let processor = BatchLogProcessor::new(
647+
Box::new(exporter.clone()),
648+
BatchConfig::default(),
649+
runtime::Tokio,
650+
);
651+
processor.emit(LogData {
652+
record: Default::default(),
653+
resource: Default::default(),
654+
instrumentation: Default::default(),
655+
});
656+
processor.force_flush().unwrap();
657+
processor.shutdown().unwrap();
658+
// todo: expect to see errors here. How should we assert this?
659+
processor.emit(LogData {
660+
record: Default::default(),
661+
resource: Default::default(),
662+
instrumentation: Default::default(),
663+
});
664+
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
665+
}
666+
667+
#[test]
668+
fn test_simple_shutdown() {
669+
let exporter = InMemoryLogsExporterBuilder::default()
670+
.keep_records_on_shutdown()
671+
.build();
672+
let processor = SimpleLogProcessor::new(Box::new(exporter.clone()));
673+
674+
processor.emit(LogData {
675+
record: Default::default(),
676+
resource: Default::default(),
677+
instrumentation: Default::default(),
678+
});
679+
680+
processor.shutdown().unwrap();
681+
682+
let is_shutdown = processor
683+
.is_shutdown
684+
.load(std::sync::atomic::Ordering::Relaxed);
685+
assert!(is_shutdown);
686+
687+
processor.emit(LogData {
688+
record: Default::default(),
689+
resource: Default::default(),
690+
instrumentation: Default::default(),
691+
});
692+
693+
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
694+
}
623695
}

0 commit comments

Comments
 (0)