Skip to content

Commit bd47a96

Browse files
frailltMindaugas Vinkelis
authored and
Mindaugas Vinkelis
committed
ValueMap interface change
1 parent d652dc8 commit bd47a96

File tree

6 files changed

+276
-192
lines changed

6 files changed

+276
-192
lines changed

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

+1
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ impl<T: Number> ExpoHistogram<T> {
343343
pub(crate) fn measure(&self, value: T, attrs: &[KeyValue]) {
344344
let f_value = value.into_float();
345345
// Ignore NaN and infinity.
346+
// Only makes sense if T is f64, maybe this could be no-op for other cases?
346347
if f_value.is_infinite() || f_value.is_nan() {
347348
return;
348349
}

opentelemetry-sdk/src/metrics/internal/histogram.rs

+123-79
Original file line numberDiff line numberDiff line change
@@ -7,45 +7,9 @@ use crate::metrics::data::HistogramDataPoint;
77
use crate::metrics::data::{self, Aggregation, Temporality};
88
use opentelemetry::KeyValue;
99

10-
use super::Number;
11-
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
10+
use super::ValueMap;
11+
use super::{Aggregator, Number};
1212

13-
struct HistogramUpdate;
14-
15-
impl Operation for HistogramUpdate {
16-
fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
17-
tracker.update_histogram(index, value);
18-
}
19-
}
20-
21-
struct HistogramTracker<T> {
22-
buckets: Mutex<Buckets<T>>,
23-
}
24-
25-
impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
26-
fn update_histogram(&self, index: usize, value: T) {
27-
let mut buckets = match self.buckets.lock() {
28-
Ok(guard) => guard,
29-
Err(_) => return,
30-
};
31-
32-
buckets.bin(index, value);
33-
buckets.sum(value);
34-
}
35-
}
36-
37-
impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
38-
type AtomicTracker = HistogramTracker<T>;
39-
40-
fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
41-
let count = buckets_count.unwrap();
42-
HistogramTracker {
43-
buckets: Mutex::new(Buckets::<T>::new(count)),
44-
}
45-
}
46-
}
47-
48-
#[derive(Default)]
4913
struct Buckets<T> {
5014
counts: Vec<u64>,
5115
count: u64,
@@ -54,29 +18,17 @@ struct Buckets<T> {
5418
max: T,
5519
}
5620

57-
impl<T: Number> Buckets<T> {
58-
/// returns buckets with `n` bins.
59-
fn new(n: usize) -> Buckets<T> {
60-
Buckets {
61-
counts: vec![0; n],
21+
impl<T> Buckets<T>
22+
where
23+
T: Number,
24+
{
25+
fn new(size: usize) -> Self {
26+
Self {
27+
counts: vec![0; size],
28+
count: 0,
29+
total: T::default(),
6230
min: T::max(),
6331
max: T::min(),
64-
..Default::default()
65-
}
66-
}
67-
68-
fn sum(&mut self, value: T) {
69-
self.total += value;
70-
}
71-
72-
fn bin(&mut self, idx: usize, value: T) {
73-
self.counts[idx] += 1;
74-
self.count += 1;
75-
if value < self.min {
76-
self.min = value;
77-
}
78-
if value > self.max {
79-
self.max = value
8032
}
8133
}
8234

@@ -91,45 +43,72 @@ impl<T: Number> Buckets<T> {
9143
}
9244
}
9345

46+
impl<T> Aggregator<T> for Mutex<Buckets<T>>
47+
where
48+
T: Number,
49+
{
50+
type InitConfig = usize;
51+
/// Value and bucket index
52+
type PreComputedValue = (T, usize);
53+
54+
fn create(size: &usize) -> Self {
55+
Mutex::new(Buckets::new(*size))
56+
}
57+
58+
fn update(&self, (value, idx): (T, usize)) {
59+
if let Ok(mut this) = self.lock() {
60+
this.counts[idx] += 1;
61+
this.count += 1;
62+
if value < this.min {
63+
this.min = value;
64+
}
65+
if value > this.max {
66+
this.max = value
67+
}
68+
this.total += value;
69+
}
70+
}
71+
}
72+
9473
/// Summarizes a set of measurements as a histogram with explicitly defined
9574
/// buckets.
9675
pub(crate) struct Histogram<T: Number> {
97-
value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
76+
value_map: ValueMap<T, Mutex<Buckets<T>>>,
9877
bounds: Vec<f64>,
9978
record_min_max: bool,
10079
record_sum: bool,
10180
start: Mutex<SystemTime>,
10281
}
10382

10483
impl<T: Number> Histogram<T> {
105-
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
106-
let buckets_count = boundaries.len() + 1;
107-
let mut histogram = Histogram {
108-
value_map: ValueMap::new_with_buckets_count(buckets_count),
109-
bounds: boundaries,
84+
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
85+
bounds.retain(|v| !v.is_nan());
86+
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
87+
let buckets_count = bounds.len() + 1;
88+
Self {
89+
value_map: ValueMap::new(buckets_count),
90+
bounds,
11091
record_min_max,
11192
record_sum,
11293
start: Mutex::new(SystemTime::now()),
113-
};
114-
115-
histogram.bounds.retain(|v| !v.is_nan());
116-
histogram
117-
.bounds
118-
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
119-
120-
histogram
94+
}
12195
}
12296

12397
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
12498
let f = measurement.into_float();
125-
99+
// Ignore NaN and infinity.
100+
// Only makes sense if T is f64, maybe this could be no-op for other cases?
101+
if f.is_infinite() || f.is_nan() {
102+
return;
103+
}
126104
// This search will return an index in the range `[0, bounds.len()]`, where
127105
// it will return `bounds.len()` if value is greater than the last element
128106
// of `bounds`. This aligns with the buckets in that the length of buckets
129107
// is `bounds.len()+1`, with the last bucket representing:
130108
// `(bounds[bounds.len()-1], +∞)`.
131109
let index = self.bounds.partition_point(|&x| x < f);
132-
self.value_map.measure(measurement, attrs, index);
110+
111+
self.value_map.measure((measurement, index), attrs);
133112
}
134113

135114
pub(crate) fn delta(
@@ -167,7 +146,7 @@ impl<T: Number> Histogram<T> {
167146
.has_no_attribute_value
168147
.swap(false, Ordering::AcqRel)
169148
{
170-
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
149+
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.lock() {
171150
h.data_points.push(HistogramDataPoint {
172151
attributes: vec![],
173152
start_time: start,
@@ -205,7 +184,7 @@ impl<T: Number> Histogram<T> {
205184
let mut seen = HashSet::new();
206185
for (attrs, tracker) in trackers.drain() {
207186
if seen.insert(Arc::as_ptr(&tracker)) {
208-
if let Ok(b) = tracker.buckets.lock() {
187+
if let Ok(b) = tracker.lock() {
209188
h.data_points.push(HistogramDataPoint {
210189
attributes: attrs.clone(),
211190
start_time: start,
@@ -278,7 +257,7 @@ impl<T: Number> Histogram<T> {
278257
.has_no_attribute_value
279258
.load(Ordering::Acquire)
280259
{
281-
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
260+
if let Ok(b) = &self.value_map.no_attribute_tracker.lock() {
282261
h.data_points.push(HistogramDataPoint {
283262
attributes: vec![],
284263
start_time: start,
@@ -318,7 +297,7 @@ impl<T: Number> Histogram<T> {
318297
let mut seen = HashSet::new();
319298
for (attrs, tracker) in trackers.iter() {
320299
if seen.insert(Arc::as_ptr(tracker)) {
321-
if let Ok(b) = tracker.buckets.lock() {
300+
if let Ok(b) = tracker.lock() {
322301
h.data_points.push(HistogramDataPoint {
323302
attributes: attrs.clone(),
324303
start_time: start,
@@ -350,3 +329,68 @@ impl<T: Number> Histogram<T> {
350329
(h.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
351330
}
352331
}
332+
333+
#[cfg(test)]
334+
mod tests {
335+
336+
use super::*;
337+
338+
#[test]
339+
fn when_f64_is_nan_or_infinity_then_ignore() {
340+
struct Expected {
341+
min: f64,
342+
max: f64,
343+
sum: f64,
344+
count: u64,
345+
}
346+
impl Expected {
347+
fn new(min: f64, max: f64, sum: f64, count: u64) -> Self {
348+
Expected {
349+
min,
350+
max,
351+
sum,
352+
count,
353+
}
354+
}
355+
}
356+
struct TestCase {
357+
values: Vec<f64>,
358+
expected: Expected,
359+
}
360+
361+
let test_cases = vec![
362+
TestCase {
363+
values: vec![2.0, 4.0, 1.0],
364+
expected: Expected::new(1.0, 4.0, 7.0, 3),
365+
},
366+
TestCase {
367+
values: vec![2.0, 4.0, 1.0, f64::INFINITY],
368+
expected: Expected::new(1.0, 4.0, 7.0, 3),
369+
},
370+
TestCase {
371+
values: vec![2.0, 4.0, 1.0, -f64::INFINITY],
372+
expected: Expected::new(1.0, 4.0, 7.0, 3),
373+
},
374+
TestCase {
375+
values: vec![2.0, f64::NAN, 4.0, 1.0],
376+
expected: Expected::new(1.0, 4.0, 7.0, 3),
377+
},
378+
TestCase {
379+
values: vec![4.0, 4.0, 4.0, 2.0, 16.0, 1.0],
380+
expected: Expected::new(1.0, 16.0, 31.0, 6),
381+
},
382+
];
383+
384+
for test in test_cases {
385+
let h = Histogram::new(vec![], true, true);
386+
for v in test.values {
387+
h.measure(v, &[]);
388+
}
389+
let res = h.value_map.no_attribute_tracker.lock().unwrap();
390+
assert_eq!(test.expected.max, res.max);
391+
assert_eq!(test.expected.min, res.min);
392+
assert_eq!(test.expected.sum, res.total);
393+
assert_eq!(test.expected.count, res.count);
394+
}
395+
}
396+
}

opentelemetry-sdk/src/metrics/internal/last_value.rs

+38-8
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,51 @@ use std::{
77
use crate::metrics::data::DataPoint;
88
use opentelemetry::KeyValue;
99

10-
use super::{Assign, AtomicTracker, Number, ValueMap};
10+
use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};
11+
12+
/// this is reused by PrecomputedSum
13+
pub(crate) struct Assign<T>
14+
where
15+
T: AtomicallyUpdate<T>,
16+
{
17+
pub(crate) value: T::AtomicTracker,
18+
}
19+
20+
impl<T> Aggregator<T> for Assign<T>
21+
where
22+
T: Number,
23+
{
24+
type InitConfig = ();
25+
type PreComputedValue = T;
26+
27+
fn create(_init: &()) -> Self {
28+
Self {
29+
value: T::new_atomic_tracker(T::default()),
30+
}
31+
}
32+
33+
fn update(&self, value: T) {
34+
self.value.store(value)
35+
}
36+
}
1137

1238
/// Summarizes a set of measurements as the last one made.
1339
pub(crate) struct LastValue<T: Number> {
14-
value_map: ValueMap<T, T, Assign>,
40+
value_map: ValueMap<T, Assign<T>>,
1541
start: Mutex<SystemTime>,
1642
}
1743

1844
impl<T: Number> LastValue<T> {
1945
pub(crate) fn new() -> Self {
2046
LastValue {
21-
value_map: ValueMap::new(),
47+
value_map: ValueMap::new(()),
2248
start: Mutex::new(SystemTime::now()),
2349
}
2450
}
2551

2652
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
2753
// The argument index is not applicable to LastValue.
28-
self.value_map.measure(measurement, attrs, 0);
54+
self.value_map.measure(measurement, attrs);
2955
}
3056

3157
pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
@@ -49,7 +75,11 @@ impl<T: Number> LastValue<T> {
4975
attributes: vec![],
5076
start_time: Some(prev_start),
5177
time: Some(t),
52-
value: self.value_map.no_attribute_tracker.get_and_reset_value(),
78+
value: self
79+
.value_map
80+
.no_attribute_tracker
81+
.value
82+
.get_and_reset_value(),
5383
exemplars: vec![],
5484
});
5585
}
@@ -66,7 +96,7 @@ impl<T: Number> LastValue<T> {
6696
attributes: attrs.clone(),
6797
start_time: Some(prev_start),
6898
time: Some(t),
69-
value: tracker.get_value(),
99+
value: tracker.value.get_value(),
70100
exemplars: vec![],
71101
});
72102
}
@@ -101,7 +131,7 @@ impl<T: Number> LastValue<T> {
101131
attributes: vec![],
102132
start_time: Some(prev_start),
103133
time: Some(t),
104-
value: self.value_map.no_attribute_tracker.get_value(),
134+
value: self.value_map.no_attribute_tracker.value.get_value(),
105135
exemplars: vec![],
106136
});
107137
}
@@ -118,7 +148,7 @@ impl<T: Number> LastValue<T> {
118148
attributes: attrs.clone(),
119149
start_time: Some(prev_start),
120150
time: Some(t),
121-
value: tracker.get_value(),
151+
value: tracker.value.get_value(),
122152
exemplars: vec![],
123153
});
124154
}

0 commit comments

Comments
 (0)