Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global::meter_provider_shutdown #1623

Merged
merged 20 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions examples/metrics-advanced/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use opentelemetry::global;
use opentelemetry::metrics::Unit;
use opentelemetry::Key;
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
Expand All @@ -7,7 +8,7 @@ use opentelemetry_sdk::metrics::{
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;

fn init_meter_provider() -> SdkMeterProvider {
fn init_meter_provider() {
// for example 1
let my_view_rename_and_unit = |i: &Instrument| {
if i.name == "my_histogram" {
Expand Down Expand Up @@ -50,7 +51,7 @@ fn init_meter_provider() -> SdkMeterProvider {
// Ok(serde_json::to_writer_pretty(writer, &data).unwrap()))
.build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
SdkMeterProvider::builder()
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
Expand All @@ -59,13 +60,14 @@ fn init_meter_provider() -> SdkMeterProvider {
.with_view(my_view_rename_and_unit)
.with_view(my_view_drop_attributes)
.with_view(my_view_change_aggregation)
.build()
.build();
global::set_meter_provider(provider);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let meter_provider = init_meter_provider();
let meter = meter_provider.meter("mylibraryname");
init_meter_provider();
let meter = global::meter_provider().meter("mylibraryname");

// Example 1 - Rename metric using View.
// This instrument will be renamed to "my_histogram_renamed",
Expand Down Expand Up @@ -151,6 +153,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
meter_provider.shutdown()?;
global::shutdown_meter_provider();
Ok(())
}
14 changes: 8 additions & 6 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
use opentelemetry::global;
use opentelemetry::metrics::Unit;
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;

fn init_meter_provider() -> SdkMeterProvider {
fn init_meter_provider() {
let exporter = opentelemetry_stdout::MetricsExporterBuilder::default()
// uncomment the below lines to pretty print output.
// .with_encoder(|writer, data|
// Ok(serde_json::to_writer_pretty(writer, &data).unwrap()))
.build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
SdkMeterProvider::builder()
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
"metrics-basic-example",
)]))
.build()
.build();
global::set_meter_provider(provider);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Initialize the MeterProvider with the stdout Exporter.
let meter_provider = init_meter_provider();
init_meter_provider();

// Create a meter from the above MeterProvider.
let meter = meter_provider.meter("mylibraryname");
let meter = global::meter_provider().meter("mylibraryname");

// Create a Counter Instrument.
let counter = meter.u64_counter("my_counter").init();
Expand Down Expand Up @@ -146,6 +148,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
meter_provider.shutdown()?;
global::shutdown_meter_provider();
Ok(())
}
18 changes: 11 additions & 7 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use once_cell::sync::Lazy;
use opentelemetry::{
global, metrics,
global,
metrics::MetricsError,
trace::{TraceContextExt, TraceError, Tracer},
Key, KeyValue,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs as sdklogs;
use opentelemetry_sdk::metrics as sdkmetrics;
use opentelemetry_sdk::resource;
use opentelemetry_sdk::trace as sdktrace;

Expand Down Expand Up @@ -44,19 +44,23 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
.install_batch(opentelemetry_sdk::runtime::Tokio)
}

fn init_metrics() -> metrics::Result<sdkmetrics::SdkMeterProvider> {
fn init_metrics() -> Result<(), MetricsError> {
let export_config = opentelemetry_otlp::ExportConfig {
endpoint: "http://localhost:4318/v1/metrics".to_string(),
..opentelemetry_otlp::ExportConfig::default()
};
opentelemetry_otlp::new_pipeline()
let provider = opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
.http()
.with_export_config(export_config),
)
.build()
.build();
match provider {
Ok(provider) => Ok(()),
Err(err) => Err(err),
}
}

const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons");
Expand All @@ -74,7 +78,7 @@ static COMMON_ATTRIBUTES: Lazy<[KeyValue; 4]> = Lazy::new(|| {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _ = init_tracer()?;
let meter_provider = init_metrics()?;
let _ = init_metrics()?;
let _ = init_logs();

let tracer = global::tracer("ex.com/basic");
Expand Down Expand Up @@ -108,7 +112,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

global::shutdown_tracer_provider();
global::shutdown_logger_provider();
meter_provider.shutdown()?;
global::shutdown_meter_provider();

Ok(())
}
25 changes: 14 additions & 11 deletions opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use log::{info, Level};
use once_cell::sync::Lazy;
use opentelemetry::global;
use opentelemetry::global::{logger_provider, shutdown_logger_provider, shutdown_tracer_provider};
use opentelemetry::logs::LogError;
use opentelemetry::metrics::MetricsError;
use opentelemetry::trace::TraceError;
use opentelemetry::{
metrics,
trace::{TraceContextExt, Tracer},
Key, KeyValue,
};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::{ExportConfig, WithExportConfig};
use opentelemetry_sdk::logs::Config;
use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime, trace as sdktrace, Resource};
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use std::error::Error;

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
Expand All @@ -32,12 +31,12 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
.install_batch(runtime::Tokio)
}

fn init_metrics() -> metrics::Result<SdkMeterProvider> {
fn init_metrics() -> Result<(), MetricsError> {
let export_config = ExportConfig {
endpoint: "http://localhost:4317".to_string(),
..ExportConfig::default()
};
opentelemetry_otlp::new_pipeline()
let provider = opentelemetry_otlp::new_pipeline()
.metrics(runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
Expand All @@ -48,7 +47,11 @@ fn init_metrics() -> metrics::Result<SdkMeterProvider> {
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"basic-otlp-metrics-example",
)]))
.build()
.build();
match provider {
Ok(_provider) => Ok(()),
Err(err) => Err(err),
}
}

fn init_logs() -> Result<opentelemetry_sdk::logs::Logger, LogError> {
Expand Down Expand Up @@ -87,13 +90,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// matches the containing block, reporting traces and metrics during the whole
// execution.
let _ = init_tracer()?;
let meter_provider = init_metrics()?;
let _ = init_metrics()?;

// Initialize logs, which sets the global loggerprovider.
let _ = init_logs();

// Retrieve the global LoggerProvider.
let logger_provider = logger_provider();
let logger_provider = global::logger_provider();

// Create a new OpenTelemetryLogBridge using the above LoggerProvider.
let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
Expand Down Expand Up @@ -137,9 +140,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

info!(target: "my-target", "hello from {}. My price is {}", "apple", 1.99);

shutdown_tracer_provider();
shutdown_logger_provider();
meter_provider.shutdown()?;
global::shutdown_tracer_provider();
global::shutdown_logger_provider();
global::shutdown_meter_provider();

Ok(())
}
4 changes: 3 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
dependency on crossbeam-channel.
[1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)
- [#1422](https://github.com/open-telemetry/opentelemetry-rust/pull/1422)
Fix metrics aggregation bug when using Views to drop attributes.
Fix metrics aggregation bug when using Views to drop attributes.
- [#1623](https://github.com/open-telemetry/opentelemetry-rust/pull/1623) Add Drop implementation for SdkTracerProvider, which shuts down
metricreaders, thereby allowing metrics still in memory to be flushed out.

## v0.22.1

Expand Down
48 changes: 43 additions & 5 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
};

use opentelemetry::{
global,
metrics::{noop::NoopMeterCore, Meter, MeterProvider, MetricsError, Result},
KeyValue,
};
Expand Down Expand Up @@ -113,6 +114,16 @@
}
}

impl Drop for SdkMeterProvider {
fn drop(&mut self) {
if self.is_shutdown.load(Ordering::Relaxed) {
return;
}
if let Err(err) = self.shutdown() {
global::handle_error(err);

Check warning on line 123 in opentelemetry-sdk/src/metrics/meter_provider.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/meter_provider.rs#L123

Added line #L123 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might cause false alarms.. it is normal for users to call shutdown of their own, and then drop will call it again, causing the global error handler to print a message. It is unactionable for users, and they haven't anything wrong either.... Might need to revisit.

Copy link

@bwalk-at-ibm bwalk-at-ibm Apr 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's even worse because this is bugged because dropping just one reference to any metrics provider will shutdown and terminate the underlying data (pipelines), invalidating all copies, especially the registered global metrics provider.

}
}
}
impl MeterProvider for SdkMeterProvider {
fn versioned_meter(
&self,
Expand Down Expand Up @@ -211,6 +222,7 @@
mod tests {
use crate::testing::metrics::metric_reader::TestMetricReader;
use crate::Resource;
use opentelemetry::global;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use std::env;
Expand All @@ -228,14 +240,14 @@
expect.map(|s| s.to_string())
);
};
let reader = TestMetricReader {};
let reader = TestMetricReader::new();
let default_meter_provider = super::SdkMeterProvider::builder()
.with_reader(reader)
.build();
assert_service_name(default_meter_provider, Some("unknown_service"));

// If user provided a resource, use that.
let reader2 = TestMetricReader {};
let reader2 = TestMetricReader::new();
let custom_meter_provider = super::SdkMeterProvider::builder()
.with_reader(reader2)
.with_resource(Resource::new(vec![KeyValue::new(
Expand All @@ -250,7 +262,7 @@
Some("key1=value1, k2, k3=value2"),
|| {
// If `OTEL_RESOURCE_ATTRIBUTES` is set, read them automatically
let reader3 = TestMetricReader {};
let reader3 = TestMetricReader::new();
let env_resource_provider = super::SdkMeterProvider::builder()
.with_reader(reader3)
.build();
Expand All @@ -273,7 +285,7 @@
"OTEL_RESOURCE_ATTRIBUTES",
Some("my-custom-key=env-val,k2=value2"),
|| {
let reader4 = TestMetricReader {};
let reader4 = TestMetricReader::new();
let user_provided_resource_config_provider = super::SdkMeterProvider::builder()
.with_reader(reader4)
.with_resource(Resource::default().merge(&mut Resource::new(vec![
Expand All @@ -295,12 +307,38 @@
);

// If user provided a resource, it takes priority during collision.
let reader5 = TestMetricReader {};
let reader5 = TestMetricReader::new();
let no_service_name = super::SdkMeterProvider::builder()
.with_reader(reader5)
.with_resource(Resource::empty())
.build();

assert_service_name(no_service_name, None);
}

#[test]
fn test_meter_provider_shutdown() {
let reader = TestMetricReader::new();
let provider = super::SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
global::set_meter_provider(provider.clone());
assert!(!provider
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed));
assert!(!reader.is_shutdown());
// create a meter and an instrument
let meter = global::meter("test");
let counter = meter.u64_counter("test_counter").init();
// no need to drop a meter for meter_provider shutdown
global::shutdown_meter_provider();
assert!(provider
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed));
assert!(reader.is_shutdown());
// TODO - assert that the instrument is no longer usable
// As of now, even after shutdown, the instrument can still be used.
// Even though reader is shutdown, and no collect will happen.
assert!(counter.add(1, &[]) == ());

Check failure on line 342 in opentelemetry-sdk/src/metrics/meter_provider.rs

View workflow job for this annotation

GitHub Actions / lint

==-comparison of unit values detected. This will always be true
}
}
Loading
Loading