Skip to content

Commit 706a067

Browse files
frailltlalitbcijothomas
authored
ValueMap interface change (#2117)
Co-authored-by: Lalit Kumar Bhasin <labhas@microsoft.com> Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
1 parent e1860c7 commit 706a067

File tree

6 files changed

+244
-136
lines changed

6 files changed

+244
-136
lines changed

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

+1
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ impl<T: Number> ExpoHistogram<T> {
352352
pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
353353
let f_value = value.into_float();
354354
// Ignore NaN and infinity.
355+
// Only makes sense if T is f64, maybe this could be no-op for other cases?
355356
if f_value.is_infinite() || f_value.is_nan() {
356357
return;
357358
}

opentelemetry-sdk/src/metrics/internal/histogram.rs

+91-23
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,22 @@ use crate::metrics::data::HistogramDataPoint;
77
use crate::metrics::data::{self, Aggregation, Temporality};
88
use opentelemetry::KeyValue;
99

10-
use super::Number;
11-
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
12-
13-
struct HistogramUpdate;
14-
15-
impl Operation for HistogramUpdate {
16-
fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
17-
tracker.update_histogram(index, value);
18-
}
19-
}
10+
use super::ValueMap;
11+
use super::{Aggregator, Number};
2012

2113
struct HistogramTracker<T> {
2214
buckets: Mutex<Buckets<T>>,
2315
}
2416

25-
impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
26-
fn update_histogram(&self, index: usize, value: T) {
17+
impl<T> Aggregator<T> for HistogramTracker<T>
18+
where
19+
T: Number,
20+
{
21+
type InitConfig = usize;
22+
/// Value and bucket index
23+
type PreComputedValue = (T, usize);
24+
25+
fn update(&self, (value, index): (T, usize)) {
2726
let mut buckets = match self.buckets.lock() {
2827
Ok(guard) => guard,
2928
Err(_) => return,
@@ -32,15 +31,10 @@ impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
3231
buckets.bin(index, value);
3332
buckets.sum(value);
3433
}
35-
}
36-
37-
impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
38-
type AtomicTracker = HistogramTracker<T>;
3934

40-
fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
41-
let count = buckets_count.unwrap();
35+
fn create(count: &usize) -> Self {
4236
HistogramTracker {
43-
buckets: Mutex::new(Buckets::<T>::new(count)),
37+
buckets: Mutex::new(Buckets::<T>::new(*count)),
4438
}
4539
}
4640
}
@@ -94,7 +88,7 @@ impl<T: Number> Buckets<T> {
9488
/// Summarizes a set of measurements as a histogram with explicitly defined
9589
/// buckets.
9690
pub(crate) struct Histogram<T: Number> {
97-
value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
91+
value_map: ValueMap<T, HistogramTracker<T>>,
9892
bounds: Vec<f64>,
9993
record_min_max: bool,
10094
record_sum: bool,
@@ -103,9 +97,11 @@ pub(crate) struct Histogram<T: Number> {
10397

10498
impl<T: Number> Histogram<T> {
10599
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
100+
// TODO fix the bug, by first removing NaN and only then getting buckets_count
101+
// once we know the reason for performance degradation
106102
let buckets_count = boundaries.len() + 1;
107103
let mut histogram = Histogram {
108-
value_map: ValueMap::new_with_buckets_count(buckets_count),
104+
value_map: ValueMap::new(buckets_count),
109105
bounds: boundaries,
110106
record_min_max,
111107
record_sum,
@@ -122,14 +118,20 @@ impl<T: Number> Histogram<T> {
122118

123119
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
124120
let f = measurement.into_float();
125-
121+
// Ignore NaN and infinity.
122+
// Only makes sense if T is f64, maybe this could be no-op for other cases?
123+
// TODO: uncomment once we know the reason for performance degradation
124+
// if f.is_infinite() || f.is_nan() {
125+
// return;
126+
// }
126127
// This search will return an index in the range `[0, bounds.len()]`, where
127128
// it will return `bounds.len()` if value is greater than the last element
128129
// of `bounds`. This aligns with the buckets in that the length of buckets
129130
// is `bounds.len()+1`, with the last bucket representing:
130131
// `(bounds[bounds.len()-1], +∞)`.
131132
let index = self.bounds.partition_point(|&x| x < f);
132-
self.value_map.measure(measurement, attrs, index);
133+
134+
self.value_map.measure((measurement, index), attrs);
133135
}
134136

135137
pub(crate) fn delta(
@@ -350,3 +352,69 @@ impl<T: Number> Histogram<T> {
350352
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
351353
}
352354
}
355+
356+
// TODO: uncomment once we know the reason for performance degradation
357+
// #[cfg(test)]
358+
// mod tests {
359+
360+
// use super::*;
361+
362+
// #[test]
363+
// fn when_f64_is_nan_or_infinity_then_ignore() {
364+
// struct Expected {
365+
// min: f64,
366+
// max: f64,
367+
// sum: f64,
368+
// count: u64,
369+
// }
370+
// impl Expected {
371+
// fn new(min: f64, max: f64, sum: f64, count: u64) -> Self {
372+
// Expected {
373+
// min,
374+
// max,
375+
// sum,
376+
// count,
377+
// }
378+
// }
379+
// }
380+
// struct TestCase {
381+
// values: Vec<f64>,
382+
// expected: Expected,
383+
// }
384+
385+
// let test_cases = vec![
386+
// TestCase {
387+
// values: vec![2.0, 4.0, 1.0],
388+
// expected: Expected::new(1.0, 4.0, 7.0, 3),
389+
// },
390+
// TestCase {
391+
// values: vec![2.0, 4.0, 1.0, f64::INFINITY],
392+
// expected: Expected::new(1.0, 4.0, 7.0, 3),
393+
// },
394+
// TestCase {
395+
// values: vec![2.0, 4.0, 1.0, -f64::INFINITY],
396+
// expected: Expected::new(1.0, 4.0, 7.0, 3),
397+
// },
398+
// TestCase {
399+
// values: vec![2.0, f64::NAN, 4.0, 1.0],
400+
// expected: Expected::new(1.0, 4.0, 7.0, 3),
401+
// },
402+
// TestCase {
403+
// values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0],
404+
// expected: Expected::new(1.0, 16.0, 31.0, 6),
405+
// },
406+
// ];
407+
408+
// for test in test_cases {
409+
// let h = Histogram::new(vec![], true, true);
410+
// for v in test.values {
411+
// h.measure(v, &[]);
412+
// }
413+
// let res = h.value_map.no_attribute_tracker.buckets.lock().unwrap();
414+
// assert_eq!(test.expected.max, res.max);
415+
// assert_eq!(test.expected.min, res.min);
416+
// assert_eq!(test.expected.sum, res.total);
417+
// assert_eq!(test.expected.count, res.count);
418+
// }
419+
// }
420+
// }

opentelemetry-sdk/src/metrics/internal/last_value.rs

+38-8
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,51 @@ use std::{
77
use crate::metrics::data::DataPoint;
88
use opentelemetry::KeyValue;
99

10-
use super::{Assign, AtomicTracker, Number, ValueMap};
10+
use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};
11+
12+
/// this is reused by PrecomputedSum
13+
pub(crate) struct Assign<T>
14+
where
15+
T: AtomicallyUpdate<T>,
16+
{
17+
pub(crate) value: T::AtomicTracker,
18+
}
19+
20+
impl<T> Aggregator<T> for Assign<T>
21+
where
22+
T: Number,
23+
{
24+
type InitConfig = ();
25+
type PreComputedValue = T;
26+
27+
fn create(_init: &()) -> Self {
28+
Self {
29+
value: T::new_atomic_tracker(T::default()),
30+
}
31+
}
32+
33+
fn update(&self, value: T) {
34+
self.value.store(value)
35+
}
36+
}
1137

1238
/// Summarizes a set of measurements as the last one made.
1339
pub(crate) struct LastValue<T: Number> {
14-
value_map: ValueMap<T, T, Assign>,
40+
value_map: ValueMap<T, Assign<T>>,
1541
start: Mutex<SystemTime>,
1642
}
1743

1844
impl<T: Number> LastValue<T> {
1945
pub(crate) fn new() -> Self {
2046
LastValue {
21-
value_map: ValueMap::new(),
47+
value_map: ValueMap::new(()),
2248
start: Mutex::new(SystemTime::now()),
2349
}
2450
}
2551

2652
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
2753
// The argument index is not applicable to LastValue.
28-
self.value_map.measure(measurement, attrs, 0);
54+
self.value_map.measure(measurement, attrs);
2955
}
3056

3157
pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
@@ -49,7 +75,11 @@ impl<T: Number> LastValue<T> {
4975
attributes: vec![],
5076
start_time: Some(prev_start),
5177
time: Some(t),
52-
value: self.value_map.no_attribute_tracker.get_and_reset_value(),
78+
value: self
79+
.value_map
80+
.no_attribute_tracker
81+
.value
82+
.get_and_reset_value(),
5383
exemplars: vec![],
5484
});
5585
}
@@ -66,7 +96,7 @@ impl<T: Number> LastValue<T> {
6696
attributes: attrs.clone(),
6797
start_time: Some(prev_start),
6898
time: Some(t),
69-
value: tracker.get_value(),
99+
value: tracker.value.get_value(),
70100
exemplars: vec![],
71101
});
72102
}
@@ -101,7 +131,7 @@ impl<T: Number> LastValue<T> {
101131
attributes: vec![],
102132
start_time: Some(prev_start),
103133
time: Some(t),
104-
value: self.value_map.no_attribute_tracker.get_value(),
134+
value: self.value_map.no_attribute_tracker.value.get_value(),
105135
exemplars: vec![],
106136
});
107137
}
@@ -118,7 +148,7 @@ impl<T: Number> LastValue<T> {
118148
attributes: attrs.clone(),
119149
start_time: Some(prev_start),
120150
time: Some(t),
121-
value: tracker.get_value(),
151+
value: tracker.value.get_value(),
122152
exemplars: vec![],
123153
});
124154
}

0 commit comments

Comments
 (0)