Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests to confirm observable counter aggregation bug #1516

Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"Bhasin",
"Cijo",
"codecov",
"datapoint",
"deque",
"Dirkjan",
"hasher",
Expand All @@ -41,6 +42,7 @@
"Kumar",
"Lalit",
"msrv",
"nocapture",
"Ochtman",
"opentelemetry",
"OTLP",
Expand Down
14 changes: 13 additions & 1 deletion opentelemetry-sdk/benches/metric_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use opentelemetry::{
KeyValue,
};
use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider};
use pprof::criterion::{Output, PProfProfiler};
use rand::{rngs::SmallRng, Rng, SeedableRng};

// Run this benchmark with:
Expand Down Expand Up @@ -69,6 +70,17 @@ fn counter_add(c: &mut Criterion) {
});
}

criterion_group!(benches, criterion_benchmark);
#[cfg(not(target_os = "windows"))]
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = criterion_benchmark
}
#[cfg(target_os = "windows")]
criterion_group! {
name = benches;
config = Criterion::default();
targets = criterion_benchmark
}

criterion_main!(benches);
215 changes: 214 additions & 1 deletion opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,17 @@

#[cfg(all(test, feature = "testing"))]
mod tests {
use self::{data::Temporality, reader::TemporalitySelector};
use super::*;
use crate::{runtime, testing::metrics::InMemoryMetricsExporter};
use crate::{
runtime,
testing::metrics::{InMemoryMetricsExporter, InMemoryMetricsExporterBuilder},
};
use opentelemetry::{
metrics::{MeterProvider as _, Unit},
KeyValue,
};
use std::sync::{Arc, Mutex};

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
Expand Down Expand Up @@ -428,4 +433,212 @@
let data_point = &sum.data_points[0];
assert_eq!(data_point.value, 30);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn observable_counter_delta() {
// cargo test observable_counter_delta --features=metrics,testing -- --nocapture
// Tests Observable counter in delta aggregation.
// ObservableCounter provides the current (i.e Cumulative) value of the counter at the time of observation,
// and the SDK is expected to remember the previous value, so that it can do cumulative to
// delta conversion.

#[derive(Clone, Default, Debug)]
struct DeltaTemporalitySelector {
pub(crate) _private: (),
}

impl DeltaTemporalitySelector {
/// Create a new delta temporality selector.
fn new() -> Self {
Self::default()
}
}

impl TemporalitySelector for DeltaTemporalitySelector {
fn temporality(&self, _kind: InstrumentKind) -> Temporality {
Temporality::Delta
}
}

// Arrange
let exporter = InMemoryMetricsExporterBuilder::default()
.with_temporality_selector(DeltaTemporalitySelector::new())
.build();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
let meter = meter_provider.meter("test");
let observable_counter = meter.u64_observable_counter("my_observable_counter").init();

// The Observable counter reports 100,200,300 and so on.
let i = Arc::new(Mutex::new(0));
meter
.register_callback(&[observable_counter.as_any()], move |observer| {
let mut num = i.lock().unwrap();
*num += 1;

println!("Observable Counter is reporting: {}", *num * 100);

observer.observe_u64(
&observable_counter,
*num * 100,
[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "get"),
]
.as_ref(),
);
})
.expect("Expected to register callback");

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter",);

let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

assert_eq!(
sum.temporality,
data::Temporality::Delta,
"Should produce Delta as configured."
);

assert_eq!(sum.data_points.len(), 1);

// find and validate the single datapoint
let data_point = &sum.data_points[0];
assert_eq!(data_point.value, 100);

// Flush again, to trigger next collection.
meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter",);

let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

assert_eq!(sum.data_points.len(), 1);

// find and validate the single datapoint
let data_point = &sum.data_points[0];
// The second observation should be 100 as well, as temporality is delta
assert_eq!(data_point.value, 100);
}

// "multi_thread" tokio flavor must be used else flush won't
// be able to make progress!
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

Check warning on line 552 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L552

Added line #L552 was not covered by tests
#[ignore = "Bug in the aggregation. See https://github.com/open-telemetry/opentelemetry-rust/issues/1517"]

async fn observable_counter_cumulative() {
// cargo test observable_counter_cumulative --features=metrics,testing -- --nocapture
// Tests Observable counter in cumulative aggregation.
// ObservableCounter provides the current (i.e Cumulative) value of the counter at the time of observation,
// and the SDK is expected to aggregate the value as-is.

// Arrange
let exporter = InMemoryMetricsExporter::default();
let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();

// Act
let meter = meter_provider.meter("test");
let observable_counter = meter.u64_observable_counter("my_observable_counter").init();

// The Observable counter reports 100,200,300 and so on.
let i = Arc::new(Mutex::new(0));
meter
.register_callback(&[observable_counter.as_any()], move |observer| {
let mut num = i.lock().unwrap();
*num += 1;

println!("Observable Counter is reporting: {}", *num * 100);

observer.observe_u64(
&observable_counter,
*num * 100,
[
KeyValue::new("statusCode", "200"),
KeyValue::new("verb", "get"),
]
.as_ref(),
);
})
.expect("Expected to register callback");

meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter",);

let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

assert_eq!(
sum.temporality,
data::Temporality::Cumulative,
"Should produce Cumulative by default."
);

assert_eq!(sum.data_points.len(), 1);

// find and validate the single datapoint
let data_point = &sum.data_points[0];
// 100 is the first observation.
assert_eq!(data_point.value, 100);

// Flush again, to trigger next collection.
meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(metric.name, "my_observable_counter",);

let sum = metric
.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.expect("Sum aggregation expected for ObservableCounter instruments by default");

assert_eq!(sum.data_points.len(), 1);

// find and validate the single datapoint
let data_point = &sum.data_points[0];
// The second observation should be 200
assert_eq!(data_point.value, 200);
}

Check warning on line 643 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L555-L643

Added lines #L555 - L643 were not covered by tests
}
Loading