Skip to content

Commit d2fafa4

Browse files
utpillacijothomas
andauthored
Fix aggregation for Gauge (#2021)
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
1 parent df91db3 commit d2fafa4

File tree

4 files changed

+184
-12
lines changed

4 files changed

+184
-12
lines changed

opentelemetry-sdk/CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
ObservableCounter,UpDownCounter including
2121
[#1517](https://github.com/open-telemetry/opentelemetry-rust/issues/1517).
2222
[#2004](https://github.com/open-telemetry/opentelemetry-rust/pull/2004)
23+
- Fixed a bug related to cumulative aggregation of `Gauge` measurements.
24+
[#1975](https://github.com/open-telemetry/opentelemetry-rust/issues/1975).
25+
[#2021](https://github.com/open-telemetry/opentelemetry-rust/pull/2021)
2326

2427
## v0.24.1
2528

opentelemetry-sdk/src/metrics/internal/aggregate.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,10 @@ impl<T: Number<T>> AggregateBuilder<T> {
106106
}
107107

108108
/// Builds a last-value aggregate function input and output.
109-
///
110-
/// [Builder::temporality] is ignored and delta is always used.
111109
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
112-
// Delta temporality is the only temporality that makes semantic sense for
113-
// a last-value aggregate.
114110
let lv_filter = Arc::new(LastValue::new());
115111
let lv_agg = Arc::clone(&lv_filter);
112+
let t = self.temporality;
116113

117114
(
118115
self.filter(move |n, a: &[KeyValue]| lv_filter.measure(n, a)),
@@ -127,7 +124,12 @@ impl<T: Number<T>> AggregateBuilder<T> {
127124
};
128125
let g = g.unwrap_or_else(|| new_agg.as_mut().expect("present if g is none"));
129126

130-
lv_agg.compute_aggregation(&mut g.data_points);
127+
match t {
128+
Some(Temporality::Delta) => {
129+
lv_agg.compute_aggregation_delta(&mut g.data_points)
130+
}
131+
_ => lv_agg.compute_aggregation_cumulative(&mut g.data_points),
132+
}
131133

132134
(g.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
133135
},

opentelemetry-sdk/src/metrics/internal/last_value.rs

+45-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ impl<T: Number<T>> LastValue<T> {
2525
self.value_map.measure(measurement, attrs);
2626
}
2727

28-
pub(crate) fn compute_aggregation(&self, dest: &mut Vec<DataPoint<T>>) {
28+
pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
2929
let t = SystemTime::now();
3030
dest.clear();
3131

@@ -68,4 +68,48 @@ impl<T: Number<T>> LastValue<T> {
6868
}
6969
}
7070
}
71+
72+
pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<DataPoint<T>>) {
73+
let t = SystemTime::now();
74+
dest.clear();
75+
76+
// Max number of data points need to account for the special casing
77+
// of the no attribute value + overflow attribute.
78+
let n = self.value_map.count.load(Ordering::SeqCst) + 2;
79+
if n > dest.capacity() {
80+
dest.reserve_exact(n - dest.capacity());
81+
}
82+
83+
if self
84+
.value_map
85+
.has_no_attribute_value
86+
.load(Ordering::Acquire)
87+
{
88+
dest.push(DataPoint {
89+
attributes: vec![],
90+
start_time: None,
91+
time: Some(t),
92+
value: self.value_map.no_attribute_tracker.get_value(),
93+
exemplars: vec![],
94+
});
95+
}
96+
97+
let trackers = match self.value_map.trackers.write() {
98+
Ok(v) => v,
99+
_ => return,
100+
};
101+
102+
let mut seen = HashSet::new();
103+
for (attrs, tracker) in trackers.iter() {
104+
if seen.insert(Arc::as_ptr(tracker)) {
105+
dest.push(DataPoint {
106+
attributes: attrs.clone(),
107+
start_time: None,
108+
time: Some(t),
109+
value: tracker.get_value(),
110+
exemplars: vec![],
111+
});
112+
}
113+
}
114+
}
71115
}

opentelemetry-sdk/src/metrics/mod.rs

+129-6
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ mod tests {
159159
use opentelemetry::{metrics::MeterProvider as _, KeyValue};
160160
use rand::{rngs, Rng, SeedableRng};
161161
use std::borrow::Cow;
162+
use std::sync::atomic::{AtomicBool, Ordering};
162163
use std::sync::{Arc, Mutex};
163164
use std::thread;
164165
use std::time::Duration;
@@ -1278,12 +1279,7 @@ mod tests {
12781279
synchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
12791280
synchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
12801281
synchronous_instruments_cumulative_with_gap_in_measurements_helper("histogram");
1281-
1282-
/* Synchronous Gauge has an aggregation bug. Uncomment the code below to run the test for gauge
1283-
once this issue is fixed: https://github.com/open-telemetry/opentelemetry-rust/issues/1975
1284-
*/
1285-
1286-
// synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
1282+
synchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
12871283
}
12881284

12891285
fn synchronous_instruments_cumulative_with_gap_in_measurements_helper(
@@ -1404,6 +1400,133 @@ mod tests {
14041400
}
14051401
}
14061402

1403+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1404+
async fn asynchronous_instruments_cumulative_with_gap_in_measurements() {
1405+
// Run this test with stdout enabled to see output.
1406+
// cargo test asynchronous_instruments_cumulative_with_gap_in_measurements --features=testing -- --nocapture
1407+
1408+
asynchronous_instruments_cumulative_with_gap_in_measurements_helper("counter");
1409+
asynchronous_instruments_cumulative_with_gap_in_measurements_helper("updown_counter");
1410+
asynchronous_instruments_cumulative_with_gap_in_measurements_helper("gauge");
1411+
}
1412+
1413+
fn asynchronous_instruments_cumulative_with_gap_in_measurements_helper(
1414+
instrument_name: &'static str,
1415+
) {
1416+
let mut test_context = TestContext::new(Temporality::Cumulative);
1417+
let attributes = Arc::new([KeyValue::new("key1", "value1")]);
1418+
1419+
// Create instrument and emit measurements
1420+
match instrument_name {
1421+
"counter" => {
1422+
let has_run = AtomicBool::new(false);
1423+
let _observable_counter = test_context
1424+
.meter()
1425+
.u64_observable_counter("test_counter")
1426+
.with_callback(move |observer| {
1427+
if !has_run.load(Ordering::SeqCst) {
1428+
observer.observe(5, &[]);
1429+
observer.observe(10, &*attributes.clone());
1430+
has_run.store(true, Ordering::SeqCst);
1431+
}
1432+
})
1433+
.init();
1434+
}
1435+
"updown_counter" => {
1436+
let has_run = AtomicBool::new(false);
1437+
let _observable_up_down_counter = test_context
1438+
.meter()
1439+
.i64_observable_up_down_counter("test_updowncounter")
1440+
.with_callback(move |observer| {
1441+
if !has_run.load(Ordering::SeqCst) {
1442+
observer.observe(15, &[]);
1443+
observer.observe(20, &*attributes.clone());
1444+
has_run.store(true, Ordering::SeqCst);
1445+
}
1446+
})
1447+
.init();
1448+
}
1449+
"gauge" => {
1450+
let has_run = AtomicBool::new(false);
1451+
let _observable_gauge = test_context
1452+
.meter()
1453+
.u64_observable_gauge("test_gauge")
1454+
.with_callback(move |observer| {
1455+
if !has_run.load(Ordering::SeqCst) {
1456+
observer.observe(25, &[]);
1457+
observer.observe(30, &*attributes.clone());
1458+
has_run.store(true, Ordering::SeqCst);
1459+
}
1460+
})
1461+
.init();
1462+
}
1463+
_ => panic!("Incorrect instrument kind provided"),
1464+
};
1465+
1466+
test_context.flush_metrics();
1467+
1468+
// Test the first export
1469+
assert_correct_export(&mut test_context, instrument_name);
1470+
1471+
// Reset and export again without making any measurements
1472+
test_context.reset_metrics();
1473+
1474+
test_context.flush_metrics();
1475+
1476+
// Test that latest export has the same data as the previous one
1477+
assert_correct_export(&mut test_context, instrument_name);
1478+
1479+
fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
1480+
match instrument_name {
1481+
"counter" => {
1482+
let counter_data =
1483+
test_context.get_aggregation::<data::Sum<u64>>("test_counter", None);
1484+
assert_eq!(counter_data.data_points.len(), 2);
1485+
assert!(counter_data.is_monotonic);
1486+
let zero_attribute_datapoint =
1487+
find_datapoint_with_no_attributes(&counter_data.data_points)
1488+
.expect("datapoint with no attributes expected");
1489+
assert_eq!(zero_attribute_datapoint.value, 5);
1490+
let data_point1 =
1491+
find_datapoint_with_key_value(&counter_data.data_points, "key1", "value1")
1492+
.expect("datapoint with key1=value1 expected");
1493+
assert_eq!(data_point1.value, 10);
1494+
}
1495+
"updown_counter" => {
1496+
let updown_counter_data =
1497+
test_context.get_aggregation::<data::Sum<i64>>("test_updowncounter", None);
1498+
assert_eq!(updown_counter_data.data_points.len(), 2);
1499+
assert!(!updown_counter_data.is_monotonic);
1500+
let zero_attribute_datapoint =
1501+
find_datapoint_with_no_attributes(&updown_counter_data.data_points)
1502+
.expect("datapoint with no attributes expected");
1503+
assert_eq!(zero_attribute_datapoint.value, 15);
1504+
let data_point1 = find_datapoint_with_key_value(
1505+
&updown_counter_data.data_points,
1506+
"key1",
1507+
"value1",
1508+
)
1509+
.expect("datapoint with key1=value1 expected");
1510+
assert_eq!(data_point1.value, 20);
1511+
}
1512+
"gauge" => {
1513+
let gauge_data =
1514+
test_context.get_aggregation::<data::Gauge<u64>>("test_gauge", None);
1515+
assert_eq!(gauge_data.data_points.len(), 2);
1516+
let zero_attribute_datapoint =
1517+
find_datapoint_with_no_attributes(&gauge_data.data_points)
1518+
.expect("datapoint with no attributes expected");
1519+
assert_eq!(zero_attribute_datapoint.value, 25);
1520+
let data_point1 =
1521+
find_datapoint_with_key_value(&gauge_data.data_points, "key1", "value1")
1522+
.expect("datapoint with key1=value1 expected");
1523+
assert_eq!(data_point1.value, 30);
1524+
}
1525+
_ => panic!("Incorrect instrument kind provided"),
1526+
}
1527+
}
1528+
}
1529+
14071530
fn counter_multithreaded_aggregation_helper(temporality: Temporality) {
14081531
// Arrange
14091532
let mut test_context = TestContext::new(temporality);

0 commit comments

Comments
 (0)