Skip to content

Commit 12e0a03

Browse files
Fix observable counter cumulative aggregation
1 parent ccd5f08 commit 12e0a03

File tree

5 files changed

+227
-25
lines changed

5 files changed

+227
-25
lines changed

opentelemetry-sdk/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
- `shutdown` methods in `LoggerProvider` and `LogProcessor` now takes a immutable reference
2727
- After `shutdown`, `LoggerProvider` will return noop `Logger`
2828
- After `shutdown`, `LogProcessor` will not process any new logs
29+
- [#1644](https://github.com/open-telemetry/opentelemetry-rust/pull/1644) Fix cumulative aggregation for observable counters.
2930

3031
## v0.22.1
3132

opentelemetry-sdk/src/metrics/internal/sum.rs

+37-23
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::marker::PhantomData;
12
use std::sync::atomic::{AtomicBool, Ordering};
23
use std::{
34
collections::{hash_map::Entry, HashMap},
@@ -14,30 +15,56 @@ use super::{
1415
AtomicTracker, Number,
1516
};
1617

18+
/// Abstracts the update operation for a measurement.
19+
trait UpdateOperation<T> {
20+
fn update(a: &mut T, b: T);
21+
}
22+
23+
struct AddAssign;
24+
25+
impl<T> UpdateOperation<T> for AddAssign
26+
where
27+
T: std::ops::AddAssign<T>,
28+
{
29+
fn update(a: &mut T, b: T) {
30+
*a += b
31+
}
32+
}
33+
34+
struct Assign;
35+
36+
impl<T> UpdateOperation<T> for Assign {
37+
fn update(a: &mut T, b: T) {
38+
*a = b
39+
}
40+
}
41+
1742
/// The storage for sums.
18-
struct ValueMap<T: Number<T>> {
43+
struct ValueMap<T: Number<T>, O> {
1944
values: Mutex<HashMap<AttributeSet, T>>,
2045
has_no_value_attribute_value: AtomicBool,
2146
no_attribute_value: T::AtomicTracker,
47+
_operation: PhantomData<O>,
2248
}
2349

24-
impl<T: Number<T>> Default for ValueMap<T> {
50+
impl<T: Number<T>, O: UpdateOperation<T>> Default for ValueMap<T, O> {
2551
fn default() -> Self {
2652
ValueMap::new()
2753
}
2854
}
2955

30-
impl<T: Number<T>> ValueMap<T> {
56+
impl<T: Number<T>, O: UpdateOperation<T>> ValueMap<T, O> {
3157
fn new() -> Self {
3258
ValueMap {
3359
values: Mutex::new(HashMap::new()),
3460
has_no_value_attribute_value: AtomicBool::new(false),
3561
no_attribute_value: T::new_atomic_tracker(),
62+
_operation: PhantomData,
3663
}
3764
}
3865
}
3966

40-
impl<T: Number<T>> ValueMap<T> {
67+
impl<T: Number<T>, O: UpdateOperation<T>> ValueMap<T, O> {
4168
fn measure(&self, measurement: T, attrs: AttributeSet) {
4269
if attrs.is_empty() {
4370
self.no_attribute_value.add(measurement);
@@ -47,16 +74,16 @@ impl<T: Number<T>> ValueMap<T> {
4774
let size = values.len();
4875
match values.entry(attrs) {
4976
Entry::Occupied(mut occupied_entry) => {
50-
let sum = occupied_entry.get_mut();
51-
*sum += measurement;
77+
let value = occupied_entry.get_mut();
78+
O::update(value, measurement);
5279
}
5380
Entry::Vacant(vacant_entry) => {
5481
if is_under_cardinality_limit(size) {
5582
vacant_entry.insert(measurement);
5683
} else {
5784
values
5885
.entry(STREAM_OVERFLOW_ATTRIBUTE_SET.clone())
59-
.and_modify(|val| *val += measurement)
86+
.and_modify(|val| O::update(val, measurement))
6087
.or_insert(measurement);
6188
global::handle_error(MetricsError::Other("Warning: Maximum data points for metric stream exceeded. Entry added to overflow.".into()));
6289
}
@@ -68,7 +95,7 @@ impl<T: Number<T>> ValueMap<T> {
6895

6996
/// Summarizes a set of measurements made as their arithmetic sum.
7097
pub(crate) struct Sum<T: Number<T>> {
71-
value_map: ValueMap<T>,
98+
value_map: ValueMap<T, AddAssign>,
7299
monotonic: bool,
73100
start: Mutex<SystemTime>,
74101
}
@@ -232,7 +259,7 @@ impl<T: Number<T>> Sum<T> {
232259

233260
/// Summarizes a set of pre-computed sums as their arithmetic sum.
234261
pub(crate) struct PrecomputedSum<T: Number<T>> {
235-
value_map: ValueMap<T>,
262+
value_map: ValueMap<T, Assign>,
236263
monotonic: bool,
237264
start: Mutex<SystemTime>,
238265
reported: Mutex<HashMap<AttributeSet, T>>,
@@ -367,11 +394,6 @@ impl<T: Number<T>> PrecomputedSum<T> {
367394
.data_points
368395
.reserve_exact(n - s_data.data_points.capacity());
369396
}
370-
let mut new_reported = HashMap::with_capacity(n);
371-
let mut reported = match self.reported.lock() {
372-
Ok(r) => r,
373-
Err(_) => return (0, None),
374-
};
375397

376398
if self
377399
.value_map
@@ -387,24 +409,16 @@ impl<T: Number<T>> PrecomputedSum<T> {
387409
});
388410
}
389411

390-
let default = T::default();
391412
for (attrs, value) in values.iter() {
392-
let delta = *value - *reported.get(attrs).unwrap_or(&default);
393-
if delta != default {
394-
new_reported.insert(attrs.clone(), *value);
395-
}
396413
s_data.data_points.push(DataPoint {
397414
attributes: attrs.clone(),
398415
start_time: Some(prev_start),
399416
time: Some(t),
400-
value: delta,
417+
value: *value,
401418
exemplars: vec![],
402419
});
403420
}
404421

405-
*reported = new_reported;
406-
drop(reported); // drop before values guard is dropped
407-
408422
(
409423
s_data.data_points.len(),
410424
new_agg.map(|a| Box::new(a) as Box<_>),

opentelemetry-sdk/src/metrics/mod.rs

+187
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ mod tests {
7272
KeyValue,
7373
};
7474
use std::borrow::Cow;
75+
use std::sync::{Arc, Mutex};
7576

7677
// "multi_thread" tokio flavor must be used else flush won't
7778
// be able to make progress!
@@ -980,4 +981,190 @@ mod tests {
980981
.expect("Failed to cast aggregation to expected type")
981982
}
982983
}
984+
985+
/// Observable counter in delta aggregation.
986+
///
987+
/// ObservableCounter provides the current (i.e cumulative) value of the counter at the time of observation,
988+
/// and the SDK is expected to remember the previous value, so that it can do cumulative to
989+
/// delta conversion.
990+
#[tokio::test(flavor = "multi_thread")]
991+
async fn observable_counter_delta() {
992+
// cargo test observable_counter_delta --features=metrics,testing -- --nocapture
993+
994+
// Arrange
995+
let test_context = TestContext::new(Some(Temporality::Delta));
996+
let meter_provider = test_context.meter_provider;
997+
let exporter = test_context.exporter;
998+
let meter = meter_provider.meter("test");
999+
let observable_counter = meter.u64_observable_counter("my_observable_counter").init();
1000+
1001+
// Act
1002+
// The Observable counter reports 100, 200, 300 and so on.
1003+
let i = Arc::new(Mutex::new(0));
1004+
meter
1005+
.register_callback(&[observable_counter.as_any()], move |observer| {
1006+
let mut num = i.lock().unwrap();
1007+
*num += 1;
1008+
1009+
println!("Observable Counter is reporting: {}", *num * 100);
1010+
1011+
observer.observe_u64(
1012+
&observable_counter,
1013+
*num * 100,
1014+
[
1015+
KeyValue::new("statusCode", "200"),
1016+
KeyValue::new("verb", "get"),
1017+
]
1018+
.as_ref(),
1019+
);
1020+
})
1021+
.expect("Expected to register callback");
1022+
1023+
meter_provider.force_flush().unwrap();
1024+
1025+
// Assert
1026+
let resource_metrics = exporter
1027+
.get_finished_metrics()
1028+
.expect("metrics are expected to be exported.");
1029+
assert!(!resource_metrics.is_empty());
1030+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1031+
assert_eq!(metric.name, "my_observable_counter");
1032+
1033+
let sum = metric
1034+
.data
1035+
.as_any()
1036+
.downcast_ref::<data::Sum<u64>>()
1037+
.expect("Sum aggregation expected for ObservableCounter instruments by default");
1038+
1039+
assert_eq!(
1040+
sum.temporality,
1041+
data::Temporality::Delta,
1042+
"Should produce Delta as configured."
1043+
);
1044+
1045+
assert_eq!(sum.data_points.len(), 1);
1046+
1047+
// find and validate the single datapoint
1048+
let data_point = &sum.data_points[0];
1049+
assert_eq!(data_point.value, 100);
1050+
1051+
// Flush again, to trigger next collection.
1052+
exporter.reset();
1053+
meter_provider.force_flush().unwrap();
1054+
1055+
// Assert
1056+
let resource_metrics = exporter
1057+
.get_finished_metrics()
1058+
.expect("metrics are expected to be exported.");
1059+
assert!(!resource_metrics.is_empty());
1060+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1061+
assert_eq!(metric.name, "my_observable_counter");
1062+
1063+
let sum = metric
1064+
.data
1065+
.as_any()
1066+
.downcast_ref::<data::Sum<u64>>()
1067+
.expect("Sum aggregation expected for ObservableCounter instruments by default");
1068+
1069+
assert_eq!(sum.data_points.len(), 1);
1070+
1071+
// find and validate the single datapoint
1072+
let data_point = &sum.data_points[0];
1073+
1074+
// The second observation should be 100 as well, as temporality is delta
1075+
assert_eq!(data_point.value, 100);
1076+
}
1077+
1078+
/// Tests Observable counter in cumulative aggregation.
1079+
///
1080+
/// ObservableCounter provides the current (i.e Cumulative) value of the counter at the time of observation,
1081+
/// and the SDK is expected to aggregate the value as-is.
1082+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1083+
async fn observable_counter_cumulative() {
1084+
// cargo test observable_counter_cumulative --features=metrics,testing -- --nocapture
1085+
1086+
// Arrange
1087+
let test_context = TestContext::new(Some(Temporality::Cumulative));
1088+
let meter_provider = test_context.meter_provider;
1089+
let exporter = test_context.exporter;
1090+
let meter = meter_provider.meter("test");
1091+
let observable_counter = meter.u64_observable_counter("my_observable_counter").init();
1092+
1093+
// Act
1094+
// The Observable counter reports 100, 200, 300 and so on.
1095+
let i = Arc::new(Mutex::new(0));
1096+
meter
1097+
.register_callback(&[observable_counter.as_any()], move |observer| {
1098+
let mut num = i.lock().unwrap();
1099+
*num += 1;
1100+
1101+
println!("Observable Counter is reporting: {}", *num * 100);
1102+
1103+
observer.observe_u64(
1104+
&observable_counter,
1105+
*num * 100,
1106+
[
1107+
KeyValue::new("statusCode", "200"),
1108+
KeyValue::new("verb", "get"),
1109+
]
1110+
.as_ref(),
1111+
);
1112+
})
1113+
.expect("Expected to register callback");
1114+
1115+
meter_provider.force_flush().unwrap();
1116+
1117+
// Assert
1118+
let resource_metrics = exporter
1119+
.get_finished_metrics()
1120+
.expect("metrics are expected to be exported.");
1121+
assert!(!resource_metrics.is_empty());
1122+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1123+
assert_eq!(metric.name, "my_observable_counter");
1124+
1125+
let sum = metric
1126+
.data
1127+
.as_any()
1128+
.downcast_ref::<data::Sum<u64>>()
1129+
.expect("Sum aggregation expected for ObservableCounter instruments by default");
1130+
1131+
assert_eq!(
1132+
sum.temporality,
1133+
data::Temporality::Cumulative,
1134+
"Should produce Cumulative by default."
1135+
);
1136+
1137+
assert_eq!(sum.data_points.len(), 1);
1138+
1139+
// find and validate the single datapoint
1140+
let data_point = &sum.data_points[0];
1141+
// 100 is the first observation.
1142+
assert_eq!(data_point.value, 100);
1143+
1144+
// Flush again, to trigger next collection.
1145+
exporter.reset();
1146+
meter_provider.force_flush().unwrap();
1147+
1148+
// Assert
1149+
let resource_metrics = exporter
1150+
.get_finished_metrics()
1151+
.expect("metrics are expected to be exported.");
1152+
1153+
assert!(!resource_metrics.is_empty());
1154+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1155+
assert_eq!(metric.name, "my_observable_counter");
1156+
1157+
let sum = metric
1158+
.data
1159+
.as_any()
1160+
.downcast_ref::<data::Sum<u64>>()
1161+
.expect("Sum aggregation expected for ObservableCounter instruments by default");
1162+
1163+
assert_eq!(sum.data_points.len(), 1);
1164+
1165+
// find and validate the single datapoint
1166+
let data_point = &sum.data_points[0];
1167+
// The second observation should be 200
1168+
assert_eq!(data_point.value, 200);
1169+
}
9831170
}

opentelemetry/src/metrics/instruments/counter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl<T> fmt::Debug for ObservableCounter<T> {
8282
}
8383

8484
impl<T> ObservableCounter<T> {
85-
/// Records an increment to the counter.
85+
/// Records the absolute value of the counter.
8686
///
8787
/// It is only valid to call this within a callback. If called outside of the
8888
/// registered callback it should have no effect on the instrument, and an

opentelemetry/src/metrics/instruments/up_down_counter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl<T> ObservableUpDownCounter<T> {
8888
ObservableUpDownCounter(inner)
8989
}
9090

91-
/// Records the increment or decrement to the counter.
91+
/// Records the absolute value of the counter.
9292
///
9393
/// It is only valid to call this within a callback. If called outside of the
9494
/// registered callback it should have no effect on the instrument, and an

0 commit comments

Comments
 (0)