diff --git a/examples/metrics-advanced/src/main.rs b/examples/metrics-advanced/src/main.rs index e5d03a9213..6c001c19e3 100644 --- a/examples/metrics-advanced/src/main.rs +++ b/examples/metrics-advanced/src/main.rs @@ -1,3 +1,4 @@ +use opentelemetry::global; use opentelemetry::metrics::Unit; use opentelemetry::Key; use opentelemetry::{metrics::MeterProvider as _, KeyValue}; @@ -7,7 +8,7 @@ use opentelemetry_sdk::metrics::{ use opentelemetry_sdk::{runtime, Resource}; use std::error::Error; -fn init_meter_provider() -> SdkMeterProvider { +fn init_meter_provider() { // for example 1 let my_view_rename_and_unit = |i: &Instrument| { if i.name == "my_histogram" { @@ -50,7 +51,7 @@ fn init_meter_provider() -> SdkMeterProvider { // Ok(serde_json::to_writer_pretty(writer, &data).unwrap())) .build(); let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); - SdkMeterProvider::builder() + let provider = SdkMeterProvider::builder() .with_reader(reader) .with_resource(Resource::new(vec![KeyValue::new( "service.name", @@ -59,13 +60,14 @@ fn init_meter_provider() -> SdkMeterProvider { .with_view(my_view_rename_and_unit) .with_view(my_view_drop_attributes) .with_view(my_view_change_aggregation) - .build() + .build(); + global::set_meter_provider(provider); } #[tokio::main] async fn main() -> Result<(), Box> { - let meter_provider = init_meter_provider(); - let meter = meter_provider.meter("mylibraryname"); + init_meter_provider(); + let meter = global::meter("mylibraryname"); // Example 1 - Rename metric using View. // This instrument will be renamed to "my_histogram_renamed", @@ -151,6 +153,6 @@ async fn main() -> Result<(), Box> { // Metrics are exported by default every 30 seconds when using stdout exporter, // however shutting down the MeterProvider here instantly flushes // the metrics, instead of waiting for the 30 sec interval. - meter_provider.shutdown()?; + global::shutdown_meter_provider(); Ok(()) } diff --git a/examples/metrics-basic/src/main.rs b/examples/metrics-basic/src/main.rs index f7828da796..51cfbb20bb 100644 --- a/examples/metrics-basic/src/main.rs +++ b/examples/metrics-basic/src/main.rs @@ -1,32 +1,34 @@ +use opentelemetry::global; use opentelemetry::metrics::Unit; use opentelemetry::{metrics::MeterProvider as _, KeyValue}; use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; use opentelemetry_sdk::{runtime, Resource}; use std::error::Error; -fn init_meter_provider() -> SdkMeterProvider { +fn init_meter_provider() { let exporter = opentelemetry_stdout::MetricsExporterBuilder::default() // uncomment the below lines to pretty print output. // .with_encoder(|writer, data| // Ok(serde_json::to_writer_pretty(writer, &data).unwrap())) .build(); let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); - SdkMeterProvider::builder() + let provider = SdkMeterProvider::builder() .with_reader(reader) .with_resource(Resource::new(vec![KeyValue::new( "service.name", "metrics-basic-example", )])) - .build() + .build(); + global::set_meter_provider(provider); } #[tokio::main] async fn main() -> Result<(), Box> { // Initialize the MeterProvider with the stdout Exporter. - let meter_provider = init_meter_provider(); + init_meter_provider(); // Create a meter from the above MeterProvider. - let meter = meter_provider.meter("mylibraryname"); + let meter = global::meter("mylibraryname"); // Create a Counter Instrument. let counter = meter.u64_counter("my_counter").init(); @@ -146,6 +148,6 @@ async fn main() -> Result<(), Box> { // Metrics are exported by default every 30 seconds when using stdout exporter, // however shutting down the MeterProvider here instantly flushes // the metrics, instead of waiting for the 30 sec interval. - meter_provider.shutdown()?; + global::shutdown_meter_provider(); Ok(()) } diff --git a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs index 0f320dcb8d..f06ba1d9de 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp-http/src/main.rs @@ -1,13 +1,13 @@ use once_cell::sync::Lazy; use opentelemetry::{ - global, metrics, + global, + metrics::MetricsError, trace::{TraceContextExt, TraceError, Tracer}, Key, KeyValue, }; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::logs as sdklogs; -use opentelemetry_sdk::metrics as sdkmetrics; use opentelemetry_sdk::resource; use opentelemetry_sdk::trace as sdktrace; @@ -44,19 +44,20 @@ fn init_tracer() -> Result { .install_batch(opentelemetry_sdk::runtime::Tokio) } -fn init_metrics() -> metrics::Result { +fn init_metrics() -> Result<(), MetricsError> { let export_config = opentelemetry_otlp::ExportConfig { endpoint: "http://localhost:4318/v1/metrics".to_string(), ..opentelemetry_otlp::ExportConfig::default() }; - opentelemetry_otlp::new_pipeline() + let provider = opentelemetry_otlp::new_pipeline() .metrics(opentelemetry_sdk::runtime::Tokio) .with_exporter( opentelemetry_otlp::new_exporter() .http() .with_export_config(export_config), ) - .build() + .build(); + provider.map(|_| ()) } const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons"); @@ -74,7 +75,7 @@ static COMMON_ATTRIBUTES: Lazy<[KeyValue; 4]> = Lazy::new(|| { #[tokio::main] async fn main() -> Result<(), Box> { let _ = init_tracer()?; - let meter_provider = init_metrics()?; + let _ = init_metrics()?; let _ = init_logs(); let tracer = global::tracer("ex.com/basic"); @@ -108,7 +109,7 @@ async fn main() -> Result<(), Box> { global::shutdown_tracer_provider(); global::shutdown_logger_provider(); - meter_provider.shutdown()?; + global::shutdown_meter_provider(); Ok(()) } diff --git a/opentelemetry-otlp/examples/basic-otlp/src/main.rs b/opentelemetry-otlp/examples/basic-otlp/src/main.rs index 1bf5f7e421..4f4320cab9 100644 --- a/opentelemetry-otlp/examples/basic-otlp/src/main.rs +++ b/opentelemetry-otlp/examples/basic-otlp/src/main.rs @@ -1,18 +1,17 @@ use log::{info, Level}; use once_cell::sync::Lazy; use opentelemetry::global; -use opentelemetry::global::{logger_provider, shutdown_logger_provider, shutdown_tracer_provider}; use opentelemetry::logs::LogError; +use opentelemetry::metrics::MetricsError; use opentelemetry::trace::TraceError; use opentelemetry::{ - metrics, trace::{TraceContextExt, Tracer}, Key, KeyValue, }; use opentelemetry_appender_log::OpenTelemetryLogBridge; use opentelemetry_otlp::{ExportConfig, WithExportConfig}; use opentelemetry_sdk::logs::Config; -use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime, trace as sdktrace, Resource}; +use opentelemetry_sdk::{runtime, trace as sdktrace, Resource}; use std::error::Error; fn init_tracer() -> Result { @@ -32,12 +31,12 @@ fn init_tracer() -> Result { .install_batch(runtime::Tokio) } -fn init_metrics() -> metrics::Result { +fn init_metrics() -> Result<(), MetricsError> { let export_config = ExportConfig { endpoint: "http://localhost:4317".to_string(), ..ExportConfig::default() }; - opentelemetry_otlp::new_pipeline() + let provider = opentelemetry_otlp::new_pipeline() .metrics(runtime::Tokio) .with_exporter( opentelemetry_otlp::new_exporter() @@ -48,7 +47,11 @@ fn init_metrics() -> metrics::Result { opentelemetry_semantic_conventions::resource::SERVICE_NAME, "basic-otlp-metrics-example", )])) - .build() + .build(); + match provider { + Ok(_provider) => Ok(()), + Err(err) => Err(err), + } } fn init_logs() -> Result { @@ -87,13 +90,13 @@ async fn main() -> Result<(), Box> { // matches the containing block, reporting traces and metrics during the whole // execution. let _ = init_tracer()?; - let meter_provider = init_metrics()?; + let _ = init_metrics()?; // Initialize logs, which sets the global loggerprovider. let _ = init_logs(); // Retrieve the global LoggerProvider. - let logger_provider = logger_provider(); + let logger_provider = global::logger_provider(); // Create a new OpenTelemetryLogBridge using the above LoggerProvider. let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider); @@ -137,9 +140,9 @@ async fn main() -> Result<(), Box> { info!(target: "my-target", "hello from {}. My price is {}", "apple", 1.99); - shutdown_tracer_provider(); - shutdown_logger_provider(); - meter_provider.shutdown()?; + global::shutdown_tracer_provider(); + global::shutdown_logger_provider(); + global::shutdown_meter_provider(); Ok(()) } diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index abb71fc186..e249982872 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -6,7 +6,9 @@ dependency on crossbeam-channel. [1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files) - [#1422](https://github.com/open-telemetry/opentelemetry-rust/pull/1422) - Fix metrics aggregation bug when using Views to drop attributes. + Fix metrics aggregation bug when using Views to drop attributes. +- [#1623](https://github.com/open-telemetry/opentelemetry-rust/pull/1623) Add Drop implementation for SdkMeterProvider, which shuts down +metricreaders, thereby allowing metrics still in memory to be flushed out. ## v0.22.1 diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index 13a14c1739..d02b3fc2bc 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -9,6 +9,7 @@ use std::{ }; use opentelemetry::{ + global, metrics::{noop::NoopMeterCore, Meter, MeterProvider, MetricsError, Result}, KeyValue, }; @@ -113,6 +114,13 @@ impl SdkMeterProvider { } } +impl Drop for SdkMeterProvider { + fn drop(&mut self) { + if let Err(err) = self.shutdown() { + global::handle_error(err); + } + } +} impl MeterProvider for SdkMeterProvider { fn versioned_meter( &self, @@ -211,6 +219,7 @@ impl fmt::Debug for MeterProviderBuilder { mod tests { use crate::testing::metrics::metric_reader::TestMetricReader; use crate::Resource; + use opentelemetry::global; use opentelemetry::Key; use opentelemetry::KeyValue; use std::env; @@ -228,14 +237,14 @@ mod tests { expect.map(|s| s.to_string()) ); }; - let reader = TestMetricReader {}; + let reader = TestMetricReader::new(); let default_meter_provider = super::SdkMeterProvider::builder() .with_reader(reader) .build(); assert_service_name(default_meter_provider, Some("unknown_service")); // If user provided a resource, use that. - let reader2 = TestMetricReader {}; + let reader2 = TestMetricReader::new(); let custom_meter_provider = super::SdkMeterProvider::builder() .with_reader(reader2) .with_resource(Resource::new(vec![KeyValue::new( @@ -250,7 +259,7 @@ mod tests { Some("key1=value1, k2, k3=value2"), || { // If `OTEL_RESOURCE_ATTRIBUTES` is set, read them automatically - let reader3 = TestMetricReader {}; + let reader3 = TestMetricReader::new(); let env_resource_provider = super::SdkMeterProvider::builder() .with_reader(reader3) .build(); @@ -273,7 +282,7 @@ mod tests { "OTEL_RESOURCE_ATTRIBUTES", Some("my-custom-key=env-val,k2=value2"), || { - let reader4 = TestMetricReader {}; + let reader4 = TestMetricReader::new(); let user_provided_resource_config_provider = super::SdkMeterProvider::builder() .with_reader(reader4) .with_resource(Resource::default().merge(&mut Resource::new(vec![ @@ -295,7 +304,7 @@ mod tests { ); // If user provided a resource, it takes priority during collision. - let reader5 = TestMetricReader {}; + let reader5 = TestMetricReader::new(); let no_service_name = super::SdkMeterProvider::builder() .with_reader(reader5) .with_resource(Resource::empty()) @@ -303,4 +312,29 @@ mod tests { assert_service_name(no_service_name, None); } + + #[test] + fn test_meter_provider_shutdown() { + let reader = TestMetricReader::new(); + let provider = super::SdkMeterProvider::builder() + .with_reader(reader.clone()) + .build(); + global::set_meter_provider(provider.clone()); + assert!(!provider + .is_shutdown + .load(std::sync::atomic::Ordering::Relaxed)); + assert!(!reader.is_shutdown()); + // create a meter and an instrument + let meter = global::meter("test"); + let counter = meter.u64_counter("test_counter").init(); + // no need to drop a meter for meter_provider shutdown + global::shutdown_meter_provider(); + assert!(provider + .is_shutdown + .load(std::sync::atomic::Ordering::Relaxed)); + assert!(reader.is_shutdown()); + // TODO Fix: the instrument is still available, and can be used. + // While the reader is shutdown, and no collect is happening + counter.add(1, &[]); + } } diff --git a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs index 1a63e2b06b..6bea3fdd02 100644 --- a/opentelemetry-sdk/src/testing/metrics/metric_reader.rs +++ b/opentelemetry-sdk/src/testing/metrics/metric_reader.rs @@ -1,4 +1,4 @@ -use std::sync::Weak; +use std::sync::{Arc, Mutex, Weak}; use crate::metrics::{ aggregation::Aggregation, @@ -9,8 +9,30 @@ use crate::metrics::{ }; use opentelemetry::metrics::Result; -#[derive(Debug)] -pub struct TestMetricReader {} +#[derive(Debug, Clone)] +pub struct TestMetricReader { + is_shutdown: Arc>, +} + +impl TestMetricReader { + // Constructor to initialize the TestMetricReader + pub fn new() -> Self { + TestMetricReader { + is_shutdown: Arc::new(Mutex::new(false)), + } + } + + // Method to check if the reader is shutdown + pub fn is_shutdown(&self) -> bool { + *self.is_shutdown.lock().unwrap() + } +} + +impl Default for TestMetricReader { + fn default() -> Self { + Self::new() + } +} impl MetricReader for TestMetricReader { fn register_pipeline(&self, _pipeline: Weak) {} @@ -24,7 +46,12 @@ impl MetricReader for TestMetricReader { } fn shutdown(&self) -> Result<()> { - self.force_flush() + let result = self.force_flush(); + { + let mut is_shutdown = self.is_shutdown.lock().unwrap(); + *is_shutdown = true; + } + result } } diff --git a/opentelemetry-stdout/examples/basic.rs b/opentelemetry-stdout/examples/basic.rs index 41cfa7a30f..b59a477b4b 100644 --- a/opentelemetry-stdout/examples/basic.rs +++ b/opentelemetry-stdout/examples/basic.rs @@ -2,7 +2,7 @@ #[cfg(all(feature = "metrics", feature = "trace"))] use opentelemetry::{ - metrics::MeterProvider as _, + global, trace::{Span, Tracer, TracerProvider as _}, KeyValue, }; @@ -22,17 +22,18 @@ fn init_trace() -> TracerProvider { } #[cfg(all(feature = "metrics", feature = "trace"))] -fn init_metrics() -> SdkMeterProvider { +fn init_metrics() { let exporter = opentelemetry_stdout::MetricsExporter::default(); let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); - SdkMeterProvider::builder().with_reader(reader).build() + let provider = SdkMeterProvider::builder().with_reader(reader).build(); + global::set_meter_provider(provider); } #[tokio::main] #[cfg(all(feature = "metrics", feature = "trace"))] async fn main() -> Result<(), Box> { let tracer_provider = init_trace(); - let meter_provider = init_metrics(); + init_metrics(); let tracer = tracer_provider.tracer("stdout-test"); let mut span = tracer.start("test_span"); @@ -43,11 +44,11 @@ async fn main() -> Result<(), Box> { ); span.end(); - let meter = meter_provider.meter("stdout-test"); + let meter = global::meter("stdout-test"); let c = meter.u64_counter("test_events").init(); c.add(1, &[KeyValue::new("test_key", "test_value")]); - meter_provider.shutdown()?; + global::shutdown_meter_provider(); Ok(()) } diff --git a/opentelemetry/CHANGELOG.md b/opentelemetry/CHANGELOG.md index 9613638a57..235201a7eb 100644 --- a/opentelemetry/CHANGELOG.md +++ b/opentelemetry/CHANGELOG.md @@ -2,7 +2,12 @@ ## vNext +### Added + +- [#1623](https://github.com/open-telemetry/opentelemetry-rust/pull/1623) Add global::meter_provider_shutdown + ### Removed + - Remove `urlencoding` crate dependency. [#1613](https://github.com/open-telemetry/opentelemetry-rust/pull/1613) ## v0.22.0 diff --git a/opentelemetry/src/global/metrics.rs b/opentelemetry/src/global/metrics.rs index c40477e5ac..24d293f154 100644 --- a/opentelemetry/src/global/metrics.rs +++ b/opentelemetry/src/global/metrics.rs @@ -1,4 +1,4 @@ -use crate::metrics::{self, Meter, MeterProvider}; +use crate::metrics::{self, noop::NoopMeterProvider, Meter, MeterProvider}; use crate::KeyValue; use core::fmt; use once_cell::sync::Lazy; @@ -116,6 +116,11 @@ pub fn meter(name: impl Into>) -> Meter { meter_provider().meter(name.into()) } +/// Shut down the current global [`MeterProvider`]. +pub fn shutdown_meter_provider() { + set_meter_provider(NoopMeterProvider::new()); +} + /// Creates a [`Meter`] with the name, version and schema url. /// /// - name SHOULD uniquely identify the instrumentation scope, such as the instrumentation library (e.g. io.opentelemetry.contrib.mongodb), package, module or class name.