@@ -4,16 +4,19 @@ use crate::{
4
4
runtime:: RuntimeChannel ,
5
5
} ;
6
6
use opentelemetry:: {
7
- global:: { self } ,
8
- logs:: LogResult ,
7
+ global,
8
+ logs:: { LogError , LogResult } ,
9
9
trace:: TraceContextExt ,
10
10
Context , InstrumentationLibrary ,
11
11
} ;
12
12
13
13
#[ cfg( feature = "logs_level_enabled" ) ]
14
14
use opentelemetry:: logs:: Severity ;
15
15
16
- use std:: { borrow:: Cow , sync:: Arc } ;
16
+ use std:: {
17
+ borrow:: Cow ,
18
+ sync:: { atomic:: Ordering , Arc } ,
19
+ } ;
17
20
use std:: { sync:: atomic:: AtomicBool , time:: SystemTime } ;
18
21
19
22
use once_cell:: sync:: Lazy ;
@@ -105,17 +108,29 @@ impl LoggerProvider {
105
108
}
106
109
107
110
/// Shuts down this `LoggerProvider`
108
- pub fn shutdown ( & self ) -> Vec < LogResult < ( ) > > {
109
- // mark itself as already shutdown
110
- self . is_shutdown
111
- . store ( true , std:: sync:: atomic:: Ordering :: Relaxed ) ;
112
- // propagate the shutdown signal to processors
113
- // it's up to the processor to properly block new logs after shutdown
114
- self . inner
115
- . processors
116
- . iter ( )
117
- . map ( |processor| processor. shutdown ( ) )
118
- . collect ( )
111
+ pub fn shutdown ( & self ) -> LogResult < ( ) > {
112
+ if self
113
+ . is_shutdown
114
+ . compare_exchange ( false , true , Ordering :: SeqCst , Ordering :: SeqCst )
115
+ . is_ok ( )
116
+ {
117
+ // propagate the shutdown signal to processors
118
+ // it's up to the processor to properly block new logs after shutdown
119
+ let mut errs = vec ! [ ] ;
120
+ for processor in & self . inner . processors {
121
+ if let Err ( err) = processor. shutdown ( ) {
122
+ errs. push ( err) ;
123
+ }
124
+ }
125
+
126
+ if errs. is_empty ( ) {
127
+ Ok ( ( ) )
128
+ } else {
129
+ Err ( LogError :: Other ( format ! ( "{errs:?}" ) . into ( ) ) )
130
+ }
131
+ } else {
132
+ Err ( LogError :: Other ( "logger provider already shut down" . into ( ) ) )
133
+ }
119
134
}
120
135
}
121
136
@@ -485,6 +500,25 @@ mod tests {
485
500
assert_eq ! ( counter. load( std:: sync:: atomic:: Ordering :: SeqCst ) , 3 ) ;
486
501
}
487
502
503
+ #[ test]
504
+ fn shutdown_idempotent_test ( ) {
505
+ let counter = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
506
+ let logger_provider = LoggerProvider :: builder ( )
507
+ . with_log_processor ( ShutdownTestLogProcessor :: new ( counter. clone ( ) ) )
508
+ . build ( ) ;
509
+
510
+ let shutdown_res = logger_provider. shutdown ( ) ;
511
+ assert ! ( shutdown_res. is_ok( ) ) ;
512
+
513
+ // Subsequent shutdowns should return an error.
514
+ let shutdown_res = logger_provider. shutdown ( ) ;
515
+ assert ! ( shutdown_res. is_err( ) ) ;
516
+
517
+ // Subsequent shutdowns should return an error.
518
+ let shutdown_res = logger_provider. shutdown ( ) ;
519
+ assert ! ( shutdown_res. is_err( ) ) ;
520
+ }
521
+
488
522
#[ test]
489
523
fn global_shutdown_test ( ) {
490
524
// cargo test shutdown_test --features=logs
@@ -508,7 +542,7 @@ mod tests {
508
542
509
543
// explicitly calling shutdown on logger_provider. This will
510
544
// indeed do the shutdown, even if there are loggers still alive.
511
- logger_provider. shutdown ( ) ;
545
+ let _ = logger_provider. shutdown ( ) ;
512
546
513
547
// Assert
514
548
0 commit comments