Skip to content

Commit e378bc8

Browse files
authored
Fix PeriodicReader shutdown to invoke shutdown on exporter (#2477)
1 parent 36f9caf commit e378bc8

File tree

2 files changed

+53
-8
lines changed

2 files changed

+53
-8
lines changed

opentelemetry-sdk/src/metrics/periodic_reader.rs

+53-3
Original file line numberDiff line numberDiff line change
@@ -154,11 +154,12 @@ impl PeriodicReader {
154154
{
155155
let (message_sender, message_receiver): (Sender<Message>, Receiver<Message>) =
156156
mpsc::channel();
157+
let exporter_arc = Arc::new(exporter);
157158
let reader = PeriodicReader {
158159
inner: Arc::new(PeriodicReaderInner {
159160
message_sender: Arc::new(message_sender),
160161
producer: Mutex::new(None),
161-
exporter: Arc::new(exporter),
162+
exporter: exporter_arc.clone(),
162163
}),
163164
};
164165
let cloned_reader = reader.clone();
@@ -213,7 +214,13 @@ impl PeriodicReader {
213214
Ok(Message::Shutdown(response_sender)) => {
214215
// Perform final export and break out of loop and exit the thread
215216
otel_debug!(name: "PeriodReaderThreadExportingDueToShutdown");
216-
if let Err(_e) = cloned_reader.collect_and_export(timeout) {
217+
let export_result = cloned_reader.collect_and_export(timeout);
218+
let shutdown_result = exporter_arc.shutdown();
219+
otel_debug!(
220+
name: "PeriodReaderInvokedExporterShutdown",
221+
shutdown_result = format!("{:?}", shutdown_result)
222+
);
223+
if export_result.is_err() || shutdown_result.is_err() {
217224
response_sender.send(false).unwrap();
218225
} else {
219226
response_sender.send(true).unwrap();
@@ -474,7 +481,7 @@ mod tests {
474481
use opentelemetry::metrics::MeterProvider;
475482
use std::{
476483
sync::{
477-
atomic::{AtomicUsize, Ordering},
484+
atomic::{AtomicBool, AtomicUsize, Ordering},
478485
mpsc, Arc,
479486
},
480487
time::Duration,
@@ -525,6 +532,31 @@ mod tests {
525532
}
526533
}
527534

535+
#[derive(Debug, Clone, Default)]
536+
struct MockMetricExporter {
537+
is_shutdown: Arc<AtomicBool>,
538+
}
539+
540+
#[async_trait]
541+
impl PushMetricExporter for MockMetricExporter {
542+
async fn export(&self, _metrics: &mut ResourceMetrics) -> MetricResult<()> {
543+
Ok(())
544+
}
545+
546+
async fn force_flush(&self) -> MetricResult<()> {
547+
Ok(())
548+
}
549+
550+
fn shutdown(&self) -> MetricResult<()> {
551+
self.is_shutdown.store(true, Ordering::Relaxed);
552+
Ok(())
553+
}
554+
555+
fn temporality(&self) -> Temporality {
556+
Temporality::Cumulative
557+
}
558+
}
559+
528560
#[test]
529561
fn collection_triggered_by_interval_multiple() {
530562
// Arrange
@@ -687,6 +719,24 @@ mod tests {
687719
assert!(exporter.get_count() >= 2);
688720
}
689721

722+
#[test]
723+
fn shutdown_passed_to_exporter() {
724+
// Arrange
725+
let exporter = MockMetricExporter::default();
726+
let reader = PeriodicReader::builder(exporter.clone()).build();
727+
728+
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
729+
let meter = meter_provider.meter("test");
730+
let counter = meter.u64_counter("sync_counter").build();
731+
counter.add(1, &[]);
732+
733+
// shutdown the provider, which should call shutdown on periodic reader
734+
// which in turn should call shutdown on exporter.
735+
let result = meter_provider.shutdown();
736+
assert!(result.is_ok());
737+
assert!(exporter.is_shutdown.load(Ordering::Relaxed));
738+
}
739+
690740
#[test]
691741
fn collection() {
692742
collection_triggered_by_interval_helper();

opentelemetry-sdk/src/testing/metrics/in_memory_exporter.rs

-5
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,6 @@ impl PushMetricExporter for InMemoryMetricExporter {
278278
}
279279

280280
fn shutdown(&self) -> MetricResult<()> {
281-
self.metrics
282-
.lock()
283-
.map(|mut metrics_guard| metrics_guard.clear())
284-
.map_err(MetricError::from)?;
285-
286281
Ok(())
287282
}
288283

0 commit comments

Comments
 (0)