diff --git a/examples/metrics-basic/src/main.rs b/examples/metrics-basic/src/main.rs index 113b4a332e..cfeb7543e8 100644 --- a/examples/metrics-basic/src/main.rs +++ b/examples/metrics-basic/src/main.rs @@ -1,4 +1,5 @@ use opentelemetry::{global, KeyValue}; +use opentelemetry_sdk::error::ShutdownError; use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; use opentelemetry_sdk::Resource; use std::error::Error; @@ -23,7 +24,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { } #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { // Initialize the MeterProvider with the stdout Exporter. let meter_provider = init_meter_provider(); @@ -137,9 +138,41 @@ async fn main() -> Result<(), Box> { }) .build(); - // Metrics are exported by default every 30 seconds when using stdout exporter, - // however shutting down the MeterProvider here instantly flushes - // the metrics, instead of waiting for the 30 sec interval. + // Metrics are exported by default every 30 seconds when using stdout + // exporter, however shutting down the MeterProvider here instantly flushes + // the metrics, instead of waiting for the 30 sec interval. Shutdown returns + // a result, which is bubbled up to the caller The commented code below + // demonstrates handling the shutdown result, instead of bubbling up the + // error. meter_provider.shutdown()?; + + // let shutdown_result = meter_provider.shutdown(); + + // Handle the shutdown result. + // match shutdown_result { + // Ok(_) => println!("MeterProvider shutdown successfully"), + // Err(e) => { + // match e { + // opentelemetry_sdk::error::ShutdownError::InternalFailure(message) => { + // // This indicates some internal failure during shutdown. The + // // error message is intended for logging purposes only and + // // should not be used to make programmatic decisions. + // println!("MeterProvider shutdown failed: {}", message) + // } + // opentelemetry_sdk::error::ShutdownError::AlreadyShutdown => { + // // This indicates some user code tried to shutdown + // // elsewhere. user need to review their code to ensure + // // shutdown is called only once. + // println!("MeterProvider already shutdown") + // } + // opentelemetry_sdk::error::ShutdownError::Timeout(e) => { + // // This indicates the shutdown timed out, and a good hint to + // // user to increase the timeout. (Shutdown method does not + // // allow custom timeout today, but that is temporary) + // println!("MeterProvider shutdown timed out after {:?}", e) + // } + // } + // } + // } Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/http/metrics.rs b/opentelemetry-otlp/src/exporter/http/metrics.rs index aac771a54a..9ac0ba5455 100644 --- a/opentelemetry-otlp/src/exporter/http/metrics.rs +++ b/opentelemetry-otlp/src/exporter/http/metrics.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use http::{header::CONTENT_TYPE, Method}; use opentelemetry::otel_debug; +use opentelemetry_sdk::error::{ShutdownError, ShutdownResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; use opentelemetry_sdk::metrics::{MetricError, MetricResult}; @@ -43,8 +44,11 @@ impl MetricsClient for OtlpHttpClient { Ok(()) } - fn shutdown(&self) -> MetricResult<()> { - let _ = self.client.lock()?.take(); + fn shutdown(&self) -> ShutdownResult { + self.client + .lock() + .map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))? + .take(); Ok(()) } diff --git a/opentelemetry-otlp/src/exporter/tonic/metrics.rs b/opentelemetry-otlp/src/exporter/tonic/metrics.rs index 8a938885a3..a413839a65 100644 --- a/opentelemetry-otlp/src/exporter/tonic/metrics.rs +++ b/opentelemetry-otlp/src/exporter/tonic/metrics.rs @@ -6,6 +6,7 @@ use opentelemetry::otel_debug; use opentelemetry_proto::tonic::collector::metrics::v1::{ metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest, }; +use opentelemetry_sdk::error::{ShutdownError, ShutdownResult}; use opentelemetry_sdk::metrics::data::ResourceMetrics; use opentelemetry_sdk::metrics::{MetricError, MetricResult}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; @@ -89,8 +90,11 @@ impl MetricsClient for TonicMetricsClient { Ok(()) } - fn shutdown(&self) -> MetricResult<()> { - let _ = self.inner.lock()?.take(); + fn shutdown(&self) -> ShutdownResult { + self.inner + .lock() + .map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))? + .take(); Ok(()) } diff --git a/opentelemetry-otlp/src/metric.rs b/opentelemetry-otlp/src/metric.rs index d4a8c359e4..08642a0dc5 100644 --- a/opentelemetry-otlp/src/metric.rs +++ b/opentelemetry-otlp/src/metric.rs @@ -16,6 +16,7 @@ use crate::NoExporterBuilderSet; use async_trait::async_trait; use core::fmt; +use opentelemetry_sdk::error::ShutdownResult; use opentelemetry_sdk::metrics::MetricResult; use opentelemetry_sdk::metrics::{ @@ -123,7 +124,7 @@ impl HasHttpConfig for MetricExporterBuilder { #[async_trait] pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static { async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>; - fn shutdown(&self) -> MetricResult<()>; + fn shutdown(&self) -> ShutdownResult; } /// Export metrics in OTEL format. @@ -149,7 +150,7 @@ impl PushMetricExporter for MetricExporter { Ok(()) } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { self.client.shutdown() } diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index 112b3f780f..2eda5bc790 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -7,6 +7,7 @@ use opentelemetry::{ Key, KeyValue, }; use opentelemetry_sdk::{ + error::ShutdownResult, metrics::{ data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument, InstrumentKind, ManualReader, MetricResult, Pipeline, SdkMeterProvider, Stream, @@ -31,7 +32,7 @@ impl MetricReader for SharedReader { self.0.force_flush() } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { self.0.shutdown() } diff --git a/opentelemetry-sdk/src/error.rs b/opentelemetry-sdk/src/error.rs index 6a108f0cc9..2ba0df75a5 100644 --- a/opentelemetry-sdk/src/error.rs +++ b/opentelemetry-sdk/src/error.rs @@ -1,7 +1,45 @@ //! Wrapper for error from trace, logs and metrics part of open telemetry. +use std::{result::Result, time::Duration}; + +use thiserror::Error; + /// Trait for errors returned by exporters pub trait ExportError: std::error::Error + Send + Sync + 'static { /// The name of exporter that returned this error fn exporter_name(&self) -> &'static str; } + +#[derive(Error, Debug)] +/// Errors that can occur during shutdown. +pub enum ShutdownError { + /// Shutdown has already been invoked. + /// + /// While shutdown is idempotent and calling it multiple times has no + /// impact, this error suggests that another part of the application is + /// invoking `shutdown` earlier than intended. Users should review their + /// code to identify unintended or duplicate shutdown calls and ensure it is + /// only triggered once at the correct place. + #[error("Shutdown already invoked")] + AlreadyShutdown, + + /// Shutdown timed out before completing. + /// + /// This does not necessarily indicate a failure—shutdown may still be + /// complete. If this occurs frequently, consider increasing the timeout + /// duration to allow more time for completion. + #[error("Shutdown timed out after {0:?}")] + Timeout(Duration), + + /// Shutdown failed due to an internal error. + /// + /// The error message is intended for logging purposes only and should not + /// be used to make programmatic decisions. It is implementation-specific + /// and subject to change without notice. Consumers of this error should not + /// rely on its content beyond logging. + #[error("Shutdown failed: {0}")] + InternalFailure(String), +} + +/// A specialized `Result` type for Shutdown operations. +pub type ShutdownResult = Result<(), ShutdownError>; diff --git a/opentelemetry-sdk/src/metrics/exporter.rs b/opentelemetry-sdk/src/metrics/exporter.rs index 33c1fcb6be..868930883e 100644 --- a/opentelemetry-sdk/src/metrics/exporter.rs +++ b/opentelemetry-sdk/src/metrics/exporter.rs @@ -1,6 +1,7 @@ //! Interfaces for exporting metrics use async_trait::async_trait; +use crate::error::ShutdownResult; use crate::metrics::MetricResult; use crate::metrics::data::ResourceMetrics; @@ -27,7 +28,7 @@ pub trait PushMetricExporter: Send + Sync + 'static { /// /// After Shutdown is called, calls to Export will perform no operation and /// instead will return an error indicating the shutdown state. - fn shutdown(&self) -> MetricResult<()>; + fn shutdown(&self) -> ShutdownResult; /// Access the [Temporality] of the MetricExporter. fn temporality(&self) -> Temporality; diff --git a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs index 2c5d988cf3..a87532b460 100644 --- a/opentelemetry-sdk/src/metrics/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/metrics/in_memory_exporter.rs @@ -1,3 +1,4 @@ +use crate::error::ShutdownResult; use crate::metrics::data::{self, Gauge, Sum}; use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics}; use crate::metrics::exporter::PushMetricExporter; @@ -277,7 +278,7 @@ impl PushMetricExporter for InMemoryMetricExporter { Ok(()) // In this implementation, flush does nothing } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { Ok(()) } diff --git a/opentelemetry-sdk/src/metrics/manual_reader.rs b/opentelemetry-sdk/src/metrics/manual_reader.rs index 652bf19a35..206e8c6025 100644 --- a/opentelemetry-sdk/src/metrics/manual_reader.rs +++ b/opentelemetry-sdk/src/metrics/manual_reader.rs @@ -5,7 +5,10 @@ use std::{ use opentelemetry::otel_debug; -use crate::metrics::{MetricError, MetricResult, Temporality}; +use crate::{ + error::{ShutdownError, ShutdownResult}, + metrics::{MetricError, MetricResult, Temporality}, +}; use super::{ data::ResourceMetrics, @@ -107,8 +110,10 @@ impl MetricReader for ManualReader { } /// Closes any connections and frees any resources used by the reader. - fn shutdown(&self) -> MetricResult<()> { - let mut inner = self.inner.lock()?; + fn shutdown(&self) -> ShutdownResult { + let mut inner = self.inner.lock().map_err(|e| { + ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)) + })?; // Any future call to collect will now return an error. inner.sdk_producer = None; diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index 881082714f..c3aa23f52c 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -12,8 +12,11 @@ use opentelemetry::{ otel_debug, otel_error, otel_info, InstrumentationScope, }; -use crate::metrics::{MetricError, MetricResult}; use crate::Resource; +use crate::{ + error::ShutdownResult, + metrics::{MetricError, MetricResult}, +}; use super::{ meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View, @@ -108,7 +111,7 @@ impl SdkMeterProvider { /// /// There is no guaranteed that all telemetry be flushed or all resources have /// been released on error. - pub fn shutdown(&self) -> MetricResult<()> { + pub fn shutdown(&self) -> ShutdownResult { otel_info!( name: "MeterProvider.Shutdown", message = "User initiated shutdown of MeterProvider." @@ -131,15 +134,13 @@ impl SdkMeterProviderInner { } } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { if self .shutdown_invoked .swap(true, std::sync::atomic::Ordering::SeqCst) { // If the previous value was true, shutdown was already invoked. - Err(MetricError::Other( - "MeterProvider shutdown already invoked.".into(), - )) + Err(crate::error::ShutdownError::AlreadyShutdown) } else { self.pipes.shutdown() } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 2f4bfccee4..4d8a0a202b 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -11,6 +11,7 @@ use std::{ use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn}; use crate::{ + error::{ShutdownError, ShutdownResult}, metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, Resource, }; @@ -402,12 +403,12 @@ impl PeriodicReaderInner { } } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { // TODO: See if this is better to be created upfront. let (response_tx, response_rx) = mpsc::channel(); self.message_sender .send(Message::Shutdown(response_tx)) - .map_err(|e| MetricError::Other(e.to_string()))?; + .map_err(|e| ShutdownError::InternalFailure(e.to_string()))?; // TODO: Make this timeout configurable. match response_rx.recv_timeout(Duration::from_secs(5)) { @@ -415,14 +416,14 @@ impl PeriodicReaderInner { if response { Ok(()) } else { - Err(MetricError::Other("Failed to shutdown".into())) + Err(ShutdownError::InternalFailure("Failed to shutdown".into())) } } - Err(mpsc::RecvTimeoutError::Timeout) => Err(MetricError::Other( - "Failed to shutdown due to Timeout".into(), - )), + Err(mpsc::RecvTimeoutError::Timeout) => { + Err(ShutdownError::Timeout(Duration::from_secs(5))) + } Err(mpsc::RecvTimeoutError::Disconnected) => { - Err(MetricError::Other("Failed to shutdown".into())) + Err(ShutdownError::InternalFailure("Failed to shutdown".into())) } } } @@ -451,7 +452,7 @@ impl MetricReader for PeriodicReader { // completion, and avoid blocking the thread. The default shutdown on drop // can still use blocking call. If user already explicitly called shutdown, // drop won't call shutdown again. - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { self.inner.shutdown() } @@ -471,10 +472,10 @@ impl MetricReader for PeriodicReader { mod tests { use super::PeriodicReader; use crate::{ - metrics::InMemoryMetricExporter, + error::ShutdownResult, metrics::{ - data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, MetricError, - MetricResult, SdkMeterProvider, Temporality, + data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, + InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality, }, Resource, }; @@ -524,7 +525,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { Ok(()) } @@ -548,7 +549,7 @@ mod tests { Ok(()) } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { self.is_shutdown.store(true, Ordering::Relaxed); Ok(()) } diff --git a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs index dd7e8f9d72..8ea0897e18 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader_with_async_runtime.rs @@ -15,6 +15,7 @@ use opentelemetry::{otel_debug, otel_error}; use crate::runtime::Runtime; use crate::{ + error::{ShutdownError, ShutdownResult}, metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult}, Resource, }; @@ -216,7 +217,7 @@ struct PeriodicReaderInner { enum Message { Export, Flush(oneshot::Sender>), - Shutdown(oneshot::Sender>), + Shutdown(oneshot::Sender), } enum ProducerOrWorker { @@ -295,7 +296,9 @@ impl PeriodicReaderWorker { ); let res = self.collect_and_export().await; let _ = self.reader.exporter.shutdown(); - if let Err(send_error) = ch.send(res) { + if let Err(send_error) = + ch.send(res.map_err(|e| ShutdownError::InternalFailure(e.to_string()))) + { otel_debug!( name: "PeriodicReader.Shutdown.SendResultError", message = "Failed to send shutdown result", @@ -375,24 +378,30 @@ impl MetricReader for PeriodicReader { .and_then(|res| res) } - fn shutdown(&self) -> MetricResult<()> { - let mut inner = self.inner.lock()?; + fn shutdown(&self) -> ShutdownResult { + let mut inner = self + .inner + .lock() + .map_err(|e| ShutdownError::InternalFailure(e.to_string()))?; if inner.is_shutdown { - return Err(MetricError::Other("reader is already shut down".into())); + return Err(ShutdownError::AlreadyShutdown); } let (sender, receiver) = oneshot::channel(); inner .message_sender .try_send(Message::Shutdown(sender)) - .map_err(|e| MetricError::Other(e.to_string()))?; + .map_err(|e| ShutdownError::InternalFailure(e.to_string()))?; drop(inner); // don't hold lock when blocking on future let shutdown_result = futures_executor::block_on(receiver) - .map_err(|err| MetricError::Other(err.to_string()))?; + .map_err(|err| ShutdownError::InternalFailure(err.to_string()))?; // Acquire the lock again to set the shutdown flag - let mut inner = self.inner.lock()?; + let mut inner = self + .inner + .lock() + .map_err(|e| ShutdownError::InternalFailure(e.to_string()))?; inner.is_shutdown = true; shutdown_result diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 8212d872ad..0aae397c5f 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -8,13 +8,12 @@ use std::{ use opentelemetry::{otel_debug, InstrumentationScope, KeyValue}; use crate::{ + error::ShutdownResult, metrics::{ aggregation, data::{Metric, ResourceMetrics, ScopeMetrics}, instrument::{Instrument, InstrumentId, InstrumentKind, Stream}, - internal, - internal::AggregateBuilder, - internal::Number, + internal::{self, AggregateBuilder, Number}, reader::{MetricReader, SdkProducer}, view::View, MetricError, MetricResult, @@ -96,7 +95,7 @@ impl Pipeline { } /// Shut down pipeline - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { self.reader.shutdown() } } @@ -651,7 +650,7 @@ impl Pipelines { } /// Shut down all pipelines - pub(crate) fn shutdown(&self) -> MetricResult<()> { + pub(crate) fn shutdown(&self) -> ShutdownResult { let mut errs = vec![]; for pipeline in &self.0 { if let Err(err) = pipeline.shutdown() { @@ -662,7 +661,9 @@ impl Pipelines { if errs.is_empty() { Ok(()) } else { - Err(MetricError::Other(format!("{errs:?}"))) + Err(crate::error::ShutdownError::InternalFailure(format!( + "{errs:?}" + ))) } } } diff --git a/opentelemetry-sdk/src/metrics/reader.rs b/opentelemetry-sdk/src/metrics/reader.rs index 8016a0dab4..59549d6104 100644 --- a/opentelemetry-sdk/src/metrics/reader.rs +++ b/opentelemetry-sdk/src/metrics/reader.rs @@ -1,7 +1,7 @@ //! Interfaces for reading and producing metrics use std::{fmt, sync::Weak}; -use crate::metrics::MetricResult; +use crate::{error::ShutdownResult, metrics::MetricResult}; use super::{data::ResourceMetrics, pipeline::Pipeline, InstrumentKind, Temporality}; @@ -46,7 +46,7 @@ pub trait MetricReader: fmt::Debug + Send + Sync + 'static { /// /// After `shutdown` is called, calls to `collect` will perform no operation and /// instead will return an error indicating the shutdown state. - fn shutdown(&self) -> MetricResult<()>; + fn shutdown(&self) -> ShutdownResult; /// The output temporality, a function of instrument kind. /// This SHOULD be obtained from the exporter. diff --git a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs index c535fb1c93..dca6eabf58 100644 --- a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs +++ b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs @@ -1,5 +1,6 @@ use std::sync::{Arc, Mutex, Weak}; +use crate::error::{ShutdownError, ShutdownResult}; use crate::metrics::{ data::ResourceMetrics, pipeline::Pipeline, reader::MetricReader, InstrumentKind, }; @@ -41,13 +42,13 @@ impl MetricReader for TestMetricReader { Ok(()) } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { let result = self.force_flush(); { let mut is_shutdown = self.is_shutdown.lock().unwrap(); *is_shutdown = true; } - result + result.map_err(|e| ShutdownError::InternalFailure(e.to_string())) } fn temporality(&self, _kind: InstrumentKind) -> Temporality { diff --git a/opentelemetry-stdout/src/metrics/exporter.rs b/opentelemetry-stdout/src/metrics/exporter.rs index 0981f939c4..6117916b64 100644 --- a/opentelemetry-stdout/src/metrics/exporter.rs +++ b/opentelemetry-stdout/src/metrics/exporter.rs @@ -1,14 +1,17 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use core::{f64, fmt}; -use opentelemetry_sdk::metrics::{ - data::{ - ExponentialHistogram, Gauge, GaugeDataPoint, Histogram, HistogramDataPoint, - ResourceMetrics, ScopeMetrics, Sum, SumDataPoint, +use opentelemetry_sdk::metrics::{MetricError, MetricResult, Temporality}; +use opentelemetry_sdk::{ + error::ShutdownResult, + metrics::{ + data::{ + ExponentialHistogram, Gauge, GaugeDataPoint, Histogram, HistogramDataPoint, + ResourceMetrics, ScopeMetrics, Sum, SumDataPoint, + }, + exporter::PushMetricExporter, }, - exporter::PushMetricExporter, }; -use opentelemetry_sdk::metrics::{MetricError, MetricResult, Temporality}; use std::fmt::Debug; use std::sync::atomic; @@ -62,7 +65,7 @@ impl PushMetricExporter for MetricExporter { Ok(()) } - fn shutdown(&self) -> MetricResult<()> { + fn shutdown(&self) -> ShutdownResult { self.is_shutdown.store(true, atomic::Ordering::SeqCst); Ok(()) }