From d91d107c52f287513117f8c65b82fcbb5a9b19b9 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Thu, 8 Feb 2024 00:21:32 -0800 Subject: [PATCH 1/3] Add tests to confirm observable counter aggregation bug --- .cspell.json | 2 + opentelemetry-sdk/benches/metric_counter.rs | 14 +- opentelemetry-sdk/src/metrics/mod.rs | 215 +++++++++++++++++++- 3 files changed, 229 insertions(+), 2 deletions(-) diff --git a/.cspell.json b/.cspell.json index be20dea3d4..9b9b3294de 100644 --- a/.cspell.json +++ b/.cspell.json @@ -31,6 +31,7 @@ "Bhasin", "Cijo", "codecov", + "datapoint", "deque", "Dirkjan", "hasher", @@ -41,6 +42,7 @@ "Kumar", "Lalit", "msrv", + "nocapture", "Ochtman", "opentelemetry", "OTLP", diff --git a/opentelemetry-sdk/benches/metric_counter.rs b/opentelemetry-sdk/benches/metric_counter.rs index 4bb4c84e6a..da4493c6a4 100644 --- a/opentelemetry-sdk/benches/metric_counter.rs +++ b/opentelemetry-sdk/benches/metric_counter.rs @@ -4,6 +4,7 @@ use opentelemetry::{ KeyValue, }; use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider}; +use pprof::criterion::{Output, PProfProfiler}; use rand::{rngs::SmallRng, Rng, SeedableRng}; // Run this benchmark with: @@ -69,6 +70,17 @@ fn counter_add(c: &mut Criterion) { }); } -criterion_group!(benches, criterion_benchmark); +#[cfg(not(target_os = "windows"))] +criterion_group! { + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = criterion_benchmark +} +#[cfg(target_os = "windows")] +criterion_group! 
{ + name = benches; + config = Criterion::default(); + targets = criterion_benchmark +} criterion_main!(benches); diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 021ee3d469..d800025e89 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -60,8 +60,13 @@ pub use view::*; #[cfg(all(test, feature = "testing"))] mod tests { + use std::sync::{Arc, Mutex}; + use self::{data::Temporality, reader::TemporalitySelector}; use super::*; - use crate::{runtime, testing::metrics::InMemoryMetricsExporter}; + use crate::{ + runtime, + testing::metrics::{InMemoryMetricsExporter, InMemoryMetricsExporterBuilder}, + }; use opentelemetry::{ metrics::{MeterProvider as _, Unit}, KeyValue, @@ -428,4 +433,212 @@ mod tests { let data_point = &sum.data_points[0]; assert_eq!(data_point.value, 30); } + + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn observable_counter_delta() { + // cargo test observable_counter_delta --features=metrics,testing -- --nocapture + // Tests Observable counter in delta aggregation. + // ObservableCounter provides the current (i.e Cumulative) value of the counter at the time of observation, + // and the SDK is expected to remember the previous value, so that it can do cumulative to + // delta conversion. + + #[derive(Clone, Default, Debug)] + struct DeltaTemporalitySelector { + pub(crate) _private: (), + } + + impl DeltaTemporalitySelector { + /// Create a new delta temporality selector. 
+ fn new() -> Self { + Self::default() + } + } + + impl TemporalitySelector for DeltaTemporalitySelector { + fn temporality(&self, _kind: InstrumentKind) -> Temporality { + Temporality::Delta + } + } + + // Arrange + let exporter = InMemoryMetricsExporterBuilder::default() + .with_temporality_selector(DeltaTemporalitySelector::new()) + .build(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + let meter = meter_provider.meter("test"); + let observable_counter = meter.u64_observable_counter("my_observable_counter").init(); + + // The Observable counter reports 100,200,300 and so on. + let i = Arc::new(Mutex::new(0)); + meter + .register_callback(&[observable_counter.as_any()], move |observer| { + let mut num = i.lock().unwrap(); + *num += 1; + + println!("Observable Counter is reporting: {}", *num * 100); + + observer.observe_u64( + &observable_counter, + *num * 100, + [ + KeyValue::new("statusCode", "200"), + KeyValue::new("verb", "get"), + ] + .as_ref(), + ); + }) + .expect("Expected to register callback"); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter",); + + let sum = metric + .data + .as_any() + .downcast_ref::<data::Sum<u64>>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + assert_eq!( + sum.temporality, + data::Temporality::Delta, + "Should produce Delta as configured." + ); + + assert_eq!(sum.data_points.len(), 1); + + // find and validate the single datapoint + let data_point = &sum.data_points[0]; + assert_eq!(data_point.value, 100); + + // Flush again, to trigger next collection. 
+ meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter",); + + let sum = metric + .data + .as_any() + .downcast_ref::<data::Sum<u64>>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + assert_eq!(sum.data_points.len(), 1); + + // find and validate the single datapoint + let data_point = &sum.data_points[0]; + // The second observation should be 100 as well, as temporality is delta + assert_eq!(data_point.value, 100); + } + + // "multi_thread" tokio flavor must be used else flush won't + // be able to make progress! + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + #[ignore = "Bug in the aggregation."] + + async fn observable_counter_cumulative() { + // cargo test observable_counter_cumulative --features=metrics,testing -- --nocapture + // Tests Observable counter in cumulative aggregation. + // ObservableCounter provides the current (i.e Cumulative) value of the counter at the time of observation, + // and the SDK is expected to aggregate the value as-is. + + // Arrange + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build(); + let meter_provider = SdkMeterProvider::builder().with_reader(reader).build(); + + // Act + let meter = meter_provider.meter("test"); + let observable_counter = meter.u64_observable_counter("my_observable_counter").init(); + + // The Observable counter reports 100,200,300 and so on. 
+ let i = Arc::new(Mutex::new(0)); + meter + .register_callback(&[observable_counter.as_any()], move |observer| { + let mut num = i.lock().unwrap(); + *num += 1; + + println!("Observable Counter is reporting: {}", *num * 100); + + observer.observe_u64( + &observable_counter, + *num * 100, + [ + KeyValue::new("statusCode", "200"), + KeyValue::new("verb", "get"), + ] + .as_ref(), + ); + }) + .expect("Expected to register callback"); + + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter",); + + let sum = metric + .data + .as_any() + .downcast_ref::<data::Sum<u64>>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + assert_eq!( + sum.temporality, + data::Temporality::Cumulative, + "Should produce Cumulative by default." + ); + + assert_eq!(sum.data_points.len(), 1); + + // find and validate the single datapoint + let data_point = &sum.data_points[0]; + // 100 is the first observation. + assert_eq!(data_point.value, 100); + + // Flush again, to trigger next collection. 
+ meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert!(!resource_metrics.is_empty()); + let metric = &resource_metrics[0].scope_metrics[0].metrics[0]; + assert_eq!(metric.name, "my_observable_counter",); + + let sum = metric + .data + .as_any() + .downcast_ref::<data::Sum<u64>>() + .expect("Sum aggregation expected for ObservableCounter instruments by default"); + + assert_eq!(sum.data_points.len(), 1); + + // find and validate the single datapoint + let data_point = &sum.data_points[0]; + // The second observation should be 200 + assert_eq!(data_point.value, 200); + } } From fa0ee96819921924e80e7ff9dd8f9926b7c69684 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Thu, 8 Feb 2024 00:24:33 -0800 Subject: [PATCH 2/3] fmt --- opentelemetry-sdk/src/metrics/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index d800025e89..e11aed50b7 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -60,7 +60,6 @@ pub use view::*; #[cfg(all(test, feature = "testing"))] mod tests { - use std::sync::{Arc, Mutex}; use self::{data::Temporality, reader::TemporalitySelector}; use super::*; use crate::{ @@ -71,6 +70,7 @@ mod tests { metrics::{MeterProvider as _, Unit}, KeyValue, }; + use std::sync::{Arc, Mutex}; // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! 
From 9bcdae8f99bc861865eda8bc3b3577bf4f21664e Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Thu, 8 Feb 2024 00:26:33 -0800 Subject: [PATCH 3/3] link to issue --- opentelemetry-sdk/src/metrics/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index e11aed50b7..9957817bf4 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -550,7 +550,7 @@ mod tests { // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - #[ignore = "Bug in the aggregation."] + #[ignore = "Bug in the aggregation. See https://github.com/open-telemetry/opentelemetry-rust/issues/1517"] async fn observable_counter_cumulative() { // cargo test observable_counter_cumulative --features=metrics,testing -- --nocapture