Skip to content

Commit 967dc93

Browse files
authored
Move shutdown checks to MeterProvider (#2433)
1 parent c726c4d commit 967dc93

File tree

2 files changed

+20
-32
lines changed

2 files changed

+20
-32
lines changed

opentelemetry-sdk/src/metrics/meter_provider.rs

+20-11
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub struct SdkMeterProvider {
3838
struct SdkMeterProviderInner {
3939
pipes: Arc<Pipelines>,
4040
meters: Mutex<HashMap<InstrumentationScope, Arc<SdkMeter>>>,
41-
is_shutdown: AtomicBool,
41+
shutdown_invoked: AtomicBool,
4242
}
4343

4444
impl Default for SdkMeterProvider {
@@ -119,20 +119,29 @@ impl SdkMeterProvider {
119119

120120
impl SdkMeterProviderInner {
121121
fn force_flush(&self) -> MetricResult<()> {
122-
self.pipes.force_flush()
122+
if self
123+
.shutdown_invoked
124+
.load(std::sync::atomic::Ordering::Relaxed)
125+
{
126+
Err(MetricError::Other(
127+
"Cannot perform flush as MeterProvider shutdown already invoked.".into(),
128+
))
129+
} else {
130+
self.pipes.force_flush()
131+
}
123132
}
124133

125134
fn shutdown(&self) -> MetricResult<()> {
126135
if self
127-
.is_shutdown
128-
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
129-
.is_ok()
136+
.shutdown_invoked
137+
.swap(true, std::sync::atomic::Ordering::SeqCst)
130138
{
131-
self.pipes.shutdown()
132-
} else {
139+
// If the previous value was true, shutdown was already invoked.
133140
Err(MetricError::Other(
134-
"metrics provider already shut down".into(),
141+
"MeterProvider shutdown already invoked.".into(),
135142
))
143+
} else {
144+
self.pipes.shutdown()
136145
}
137146
}
138147
}
@@ -141,7 +150,7 @@ impl Drop for SdkMeterProviderInner {
141150
fn drop(&mut self) {
142151
// If user has already shutdown the provider manually by calling
143152
// shutdown(), then we don't need to call shutdown again.
144-
if self.is_shutdown.load(Ordering::Relaxed) {
153+
if self.shutdown_invoked.load(Ordering::Relaxed) {
145154
otel_debug!(
146155
name: "MeterProvider.Drop.AlreadyShutdown",
147156
message = "MeterProvider was already shut down; drop will not attempt shutdown again."
@@ -173,7 +182,7 @@ impl MeterProvider for SdkMeterProvider {
173182
}
174183

175184
fn meter_with_scope(&self, scope: InstrumentationScope) -> Meter {
176-
if self.inner.is_shutdown.load(Ordering::Relaxed) {
185+
if self.inner.shutdown_invoked.load(Ordering::Relaxed) {
177186
otel_debug!(
178187
name: "MeterProvider.NoOpMeterReturned",
179188
meter_name = scope.name(),
@@ -270,7 +279,7 @@ impl MeterProviderBuilder {
270279
self.views,
271280
)),
272281
meters: Default::default(),
273-
is_shutdown: AtomicBool::new(false),
282+
shutdown_invoked: AtomicBool::new(false),
274283
}),
275284
};
276285

opentelemetry-sdk/src/metrics/periodic_reader.rs

-21
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::{
22
env, fmt,
33
sync::{
4-
atomic::AtomicBool,
54
mpsc::{self, Receiver, Sender},
65
Arc, Mutex, Weak,
76
},
@@ -158,7 +157,6 @@ impl PeriodicReader {
158157
let reader = PeriodicReader {
159158
inner: Arc::new(PeriodicReaderInner {
160159
message_sender: Arc::new(message_sender),
161-
shutdown_invoked: AtomicBool::new(false),
162160
producer: Mutex::new(None),
163161
exporter: Arc::new(exporter),
164162
}),
@@ -300,7 +298,6 @@ struct PeriodicReaderInner {
300298
exporter: Arc<dyn PushMetricExporter>,
301299
message_sender: Arc<mpsc::Sender<Message>>,
302300
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
303-
shutdown_invoked: AtomicBool,
304301
}
305302

306303
impl PeriodicReaderInner {
@@ -374,15 +371,6 @@ impl PeriodicReaderInner {
374371
}
375372

376373
fn force_flush(&self) -> MetricResult<()> {
377-
if self
378-
.shutdown_invoked
379-
.load(std::sync::atomic::Ordering::Relaxed)
380-
{
381-
return Err(MetricError::Other(
382-
"Cannot perform flush as PeriodicReader shutdown already invoked.".into(),
383-
));
384-
}
385-
386374
// TODO: Better message for this scenario.
387375
// Flush and Shutdown called from 2 threads Flush check shutdown
388376
// flag before shutdown thread sets it. Both threads attempt to send
@@ -414,15 +402,6 @@ impl PeriodicReaderInner {
414402
}
415403

416404
fn shutdown(&self) -> MetricResult<()> {
417-
if self
418-
.shutdown_invoked
419-
.swap(true, std::sync::atomic::Ordering::Relaxed)
420-
{
421-
return Err(MetricError::Other(
422-
"PeriodicReader shutdown already invoked.".into(),
423-
));
424-
}
425-
426405
// TODO: See if this is better to be created upfront.
427406
let (response_tx, response_rx) = mpsc::channel();
428407
self.message_sender

0 commit comments

Comments
 (0)