Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add global::meter_provider_shutdown #1623

Merged
merged 20 commits into from
Mar 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions examples/metrics-advanced/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use opentelemetry::global;
use opentelemetry::metrics::Unit;
use opentelemetry::Key;
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
Expand All @@ -7,7 +8,7 @@ use opentelemetry_sdk::metrics::{
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;

fn init_meter_provider() -> SdkMeterProvider {
fn init_meter_provider() {
// for example 1
let my_view_rename_and_unit = |i: &Instrument| {
if i.name == "my_histogram" {
Expand Down Expand Up @@ -50,7 +51,7 @@ fn init_meter_provider() -> SdkMeterProvider {
// Ok(serde_json::to_writer_pretty(writer, &data).unwrap()))
.build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
SdkMeterProvider::builder()
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
Expand All @@ -59,13 +60,14 @@ fn init_meter_provider() -> SdkMeterProvider {
.with_view(my_view_rename_and_unit)
.with_view(my_view_drop_attributes)
.with_view(my_view_change_aggregation)
.build()
.build();
global::set_meter_provider(provider);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let meter_provider = init_meter_provider();
let meter = meter_provider.meter("mylibraryname");
init_meter_provider();
let meter = global::meter("mylibraryname");

// Example 1 - Rename metric using View.
// This instrument will be renamed to "my_histogram_renamed",
Expand Down Expand Up @@ -151,6 +153,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
meter_provider.shutdown()?;
global::shutdown_meter_provider();
Ok(())
}
14 changes: 8 additions & 6 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
use opentelemetry::global;
use opentelemetry::metrics::Unit;
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::{runtime, Resource};
use std::error::Error;

fn init_meter_provider() -> SdkMeterProvider {
fn init_meter_provider() {
let exporter = opentelemetry_stdout::MetricsExporterBuilder::default()
// uncomment the below lines to pretty print output.
// .with_encoder(|writer, data|
// Ok(serde_json::to_writer_pretty(writer, &data).unwrap()))
.build();
let reader = PeriodicReader::builder(exporter, runtime::Tokio).build();
SdkMeterProvider::builder()
let provider = SdkMeterProvider::builder()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
"metrics-basic-example",
)]))
.build()
.build();
global::set_meter_provider(provider);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Initialize the MeterProvider with the stdout Exporter.
let meter_provider = init_meter_provider();
init_meter_provider();

// Create a meter from the above MeterProvider.
let meter = meter_provider.meter("mylibraryname");
let meter = global::meter("mylibraryname");

// Create a Counter Instrument.
let counter = meter.u64_counter("my_counter").init();
Expand Down Expand Up @@ -146,6 +148,6 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
meter_provider.shutdown()?;
global::shutdown_meter_provider();
Ok(())
}
15 changes: 8 additions & 7 deletions opentelemetry-otlp/examples/basic-otlp-http/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use once_cell::sync::Lazy;
use opentelemetry::{
global, metrics,
global,
metrics::MetricsError,
trace::{TraceContextExt, TraceError, Tracer},
Key, KeyValue,
};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs as sdklogs;
use opentelemetry_sdk::metrics as sdkmetrics;
use opentelemetry_sdk::resource;
use opentelemetry_sdk::trace as sdktrace;

Expand Down Expand Up @@ -44,19 +44,20 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
.install_batch(opentelemetry_sdk::runtime::Tokio)
}

fn init_metrics() -> metrics::Result<sdkmetrics::SdkMeterProvider> {
fn init_metrics() -> Result<(), MetricsError> {
let export_config = opentelemetry_otlp::ExportConfig {
endpoint: "http://localhost:4318/v1/metrics".to_string(),
..opentelemetry_otlp::ExportConfig::default()
};
opentelemetry_otlp::new_pipeline()
let provider = opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
.http()
.with_export_config(export_config),
)
.build()
.build();
provider.map(|_| ())
}

const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons");
Expand All @@ -74,7 +75,7 @@ static COMMON_ATTRIBUTES: Lazy<[KeyValue; 4]> = Lazy::new(|| {
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _ = init_tracer()?;
let meter_provider = init_metrics()?;
let _ = init_metrics()?;
let _ = init_logs();

let tracer = global::tracer("ex.com/basic");
Expand Down Expand Up @@ -108,7 +109,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

global::shutdown_tracer_provider();
global::shutdown_logger_provider();
meter_provider.shutdown()?;
global::shutdown_meter_provider();

Ok(())
}
25 changes: 14 additions & 11 deletions opentelemetry-otlp/examples/basic-otlp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use log::{info, Level};
use once_cell::sync::Lazy;
use opentelemetry::global;
use opentelemetry::global::{logger_provider, shutdown_logger_provider, shutdown_tracer_provider};
use opentelemetry::logs::LogError;
use opentelemetry::metrics::MetricsError;
use opentelemetry::trace::TraceError;
use opentelemetry::{
metrics,
trace::{TraceContextExt, Tracer},
Key, KeyValue,
};
use opentelemetry_appender_log::OpenTelemetryLogBridge;
use opentelemetry_otlp::{ExportConfig, WithExportConfig};
use opentelemetry_sdk::logs::Config;
use opentelemetry_sdk::{metrics::SdkMeterProvider, runtime, trace as sdktrace, Resource};
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
use std::error::Error;

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
Expand All @@ -32,12 +31,12 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
.install_batch(runtime::Tokio)
}

fn init_metrics() -> metrics::Result<SdkMeterProvider> {
fn init_metrics() -> Result<(), MetricsError> {
let export_config = ExportConfig {
endpoint: "http://localhost:4317".to_string(),
..ExportConfig::default()
};
opentelemetry_otlp::new_pipeline()
let provider = opentelemetry_otlp::new_pipeline()
.metrics(runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
Expand All @@ -48,7 +47,11 @@ fn init_metrics() -> metrics::Result<SdkMeterProvider> {
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
"basic-otlp-metrics-example",
)]))
.build()
.build();
match provider {
Ok(_provider) => Ok(()),
Err(err) => Err(err),
}
}

fn init_logs() -> Result<opentelemetry_sdk::logs::Logger, LogError> {
Expand Down Expand Up @@ -87,13 +90,13 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// matches the containing block, reporting traces and metrics during the whole
// execution.
let _ = init_tracer()?;
let meter_provider = init_metrics()?;
let _ = init_metrics()?;

// Initialize logs, which sets the global loggerprovider.
let _ = init_logs();

// Retrieve the global LoggerProvider.
let logger_provider = logger_provider();
let logger_provider = global::logger_provider();

// Create a new OpenTelemetryLogBridge using the above LoggerProvider.
let otel_log_appender = OpenTelemetryLogBridge::new(&logger_provider);
Expand Down Expand Up @@ -137,9 +140,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {

info!(target: "my-target", "hello from {}. My price is {}", "apple", 1.99);

shutdown_tracer_provider();
shutdown_logger_provider();
meter_provider.shutdown()?;
global::shutdown_tracer_provider();
global::shutdown_logger_provider();
global::shutdown_meter_provider();

Ok(())
}
4 changes: 3 additions & 1 deletion opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
dependency on crossbeam-channel.
[1612](https://github.com/open-telemetry/opentelemetry-rust/pull/1612/files)
- [#1422](https://github.com/open-telemetry/opentelemetry-rust/pull/1422)
Fix metrics aggregation bug when using Views to drop attributes.
Fix metrics aggregation bug when using Views to drop attributes.
- [#1623](https://github.com/open-telemetry/opentelemetry-rust/pull/1623) Add Drop implementation for SdkMeterProvider, which shuts down
metricreaders, thereby allowing metrics still in memory to be flushed out.

## v0.22.1

Expand Down
44 changes: 39 additions & 5 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
};

use opentelemetry::{
global,
metrics::{noop::NoopMeterCore, Meter, MeterProvider, MetricsError, Result},
KeyValue,
};
Expand Down Expand Up @@ -113,6 +114,13 @@ impl SdkMeterProvider {
}
}

impl Drop for SdkMeterProvider {
fn drop(&mut self) {
if let Err(err) = self.shutdown() {
global::handle_error(err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might cause false alarms.. it is normal for users to call shutdown of their own, and then drop will call it again, causing the global error handler to print a message. It is unactionable for users, and they haven't anything wrong either.... Might need to revisit.

Copy link

@bwalk-at-ibm bwalk-at-ibm Apr 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's even worse because this is bugged because dropping just one reference to any metrics provider will shutdown and terminate the underlying data (pipelines), invalidating all copies, especially the registered global metrics provider.

}
}
}
impl MeterProvider for SdkMeterProvider {
fn versioned_meter(
&self,
Expand Down Expand Up @@ -211,6 +219,7 @@ impl fmt::Debug for MeterProviderBuilder {
mod tests {
use crate::testing::metrics::metric_reader::TestMetricReader;
use crate::Resource;
use opentelemetry::global;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use std::env;
Expand All @@ -228,14 +237,14 @@ mod tests {
expect.map(|s| s.to_string())
);
};
let reader = TestMetricReader {};
let reader = TestMetricReader::new();
let default_meter_provider = super::SdkMeterProvider::builder()
.with_reader(reader)
.build();
assert_service_name(default_meter_provider, Some("unknown_service"));

// If user provided a resource, use that.
let reader2 = TestMetricReader {};
let reader2 = TestMetricReader::new();
let custom_meter_provider = super::SdkMeterProvider::builder()
.with_reader(reader2)
.with_resource(Resource::new(vec![KeyValue::new(
Expand All @@ -250,7 +259,7 @@ mod tests {
Some("key1=value1, k2, k3=value2"),
|| {
// If `OTEL_RESOURCE_ATTRIBUTES` is set, read them automatically
let reader3 = TestMetricReader {};
let reader3 = TestMetricReader::new();
let env_resource_provider = super::SdkMeterProvider::builder()
.with_reader(reader3)
.build();
Expand All @@ -273,7 +282,7 @@ mod tests {
"OTEL_RESOURCE_ATTRIBUTES",
Some("my-custom-key=env-val,k2=value2"),
|| {
let reader4 = TestMetricReader {};
let reader4 = TestMetricReader::new();
let user_provided_resource_config_provider = super::SdkMeterProvider::builder()
.with_reader(reader4)
.with_resource(Resource::default().merge(&mut Resource::new(vec![
Expand All @@ -295,12 +304,37 @@ mod tests {
);

// If user provided a resource, it takes priority during collision.
let reader5 = TestMetricReader {};
let reader5 = TestMetricReader::new();
let no_service_name = super::SdkMeterProvider::builder()
.with_reader(reader5)
.with_resource(Resource::empty())
.build();

assert_service_name(no_service_name, None);
}

#[test]
fn test_meter_provider_shutdown() {
let reader = TestMetricReader::new();
let provider = super::SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
global::set_meter_provider(provider.clone());
assert!(!provider
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed));
assert!(!reader.is_shutdown());
// create a meter and an instrument
let meter = global::meter("test");
let counter = meter.u64_counter("test_counter").init();
// no need to drop a meter for meter_provider shutdown
global::shutdown_meter_provider();
assert!(provider
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed));
assert!(reader.is_shutdown());
// TODO Fix: the instrument is still available, and can be used.
// While the reader is shutdown, and no collect is happening
counter.add(1, &[]);
}
}
Loading
Loading