Skip to content

Commit 125c3fe

Browse files
frailltMindaugas Vinkelis
authored and
Mindaugas Vinkelis
committed
ValueMap interface change
1 parent 3976f3d commit 125c3fe

File tree

6 files changed

+239
-221
lines changed

6 files changed

+239
-221
lines changed

opentelemetry-sdk/src/metrics/internal/histogram.rs

+93-105
Original file line numberDiff line numberDiff line change
@@ -7,79 +7,41 @@ use crate::metrics::data::HistogramDataPoint;
77
use crate::metrics::data::{self, Aggregation, Temporality};
88
use opentelemetry::KeyValue;
99

10-
use super::Number;
11-
use super::{AtomicTracker, AtomicallyUpdate, Operation, ValueMap};
10+
use super::ValueMap;
11+
use super::{Aggregator, Number};
1212

13-
struct HistogramUpdate;
14-
15-
impl Operation for HistogramUpdate {
16-
fn update_tracker<T: Default, AT: AtomicTracker<T>>(tracker: &AT, value: T, index: usize) {
17-
tracker.update_histogram(index, value);
18-
}
19-
}
20-
21-
struct HistogramTracker<T> {
22-
buckets: Mutex<Buckets<T>>,
23-
}
24-
25-
impl<T: Number> AtomicTracker<T> for HistogramTracker<T> {
26-
fn update_histogram(&self, index: usize, value: T) {
27-
let mut buckets = match self.buckets.lock() {
28-
Ok(guard) => guard,
29-
Err(_) => return,
30-
};
31-
32-
buckets.bin(index, value);
33-
buckets.sum(value);
34-
}
35-
}
36-
37-
impl<T: Number> AtomicallyUpdate<T> for HistogramTracker<T> {
38-
type AtomicTracker = HistogramTracker<T>;
39-
40-
fn new_atomic_tracker(buckets_count: Option<usize>) -> Self::AtomicTracker {
41-
let count = buckets_count.unwrap();
42-
HistogramTracker {
43-
buckets: Mutex::new(Buckets::<T>::new(count)),
44-
}
45-
}
13+
struct BucketsConfig {
14+
bounds: Vec<f64>,
15+
record_min_max: bool,
16+
record_sum: bool,
4617
}
4718

48-
#[derive(Default)]
49-
struct Buckets<T> {
19+
#[derive(Default, Debug, Clone)]
20+
struct BucketsData<T> {
5021
counts: Vec<u64>,
5122
count: u64,
5223
total: T,
5324
min: T,
5425
max: T,
5526
}
5627

57-
impl<T: Number> Buckets<T> {
58-
/// returns buckets with `n` bins.
59-
fn new(n: usize) -> Buckets<T> {
60-
Buckets {
61-
counts: vec![0; n],
28+
struct Buckets<T> {
29+
data: Mutex<BucketsData<T>>,
30+
}
31+
32+
impl<T> BucketsData<T>
33+
where
34+
T: Number,
35+
{
36+
fn new(size: usize) -> Self {
37+
Self {
38+
counts: vec![0; size],
6239
min: T::max(),
6340
max: T::min(),
6441
..Default::default()
6542
}
6643
}
6744

68-
fn sum(&mut self, value: T) {
69-
self.total += value;
70-
}
71-
72-
fn bin(&mut self, idx: usize, value: T) {
73-
self.counts[idx] += 1;
74-
self.count += 1;
75-
if value < self.min {
76-
self.min = value;
77-
}
78-
if value > self.max {
79-
self.max = value
80-
}
81-
}
82-
8345
fn reset(&mut self) {
8446
for item in &mut self.counts {
8547
*item = 0;
@@ -91,45 +53,71 @@ impl<T: Number> Buckets<T> {
9153
}
9254
}
9355

56+
impl<T> Aggregator<T> for Buckets<T>
57+
where
58+
T: Number,
59+
{
60+
type Config = BucketsConfig;
61+
62+
fn create(config: &BucketsConfig) -> Self {
63+
let size = config.bounds.len() + 1;
64+
Buckets {
65+
data: Mutex::new(BucketsData::new(size)),
66+
}
67+
}
68+
69+
fn update(&self, config: &BucketsConfig, measurement: T) {
70+
let f_value = measurement.into_float();
71+
// Ignore NaN and infinity.
72+
if f_value.is_infinite() || f_value.is_nan() {
73+
return;
74+
}
75+
// This search will return an index in the range `[0, bounds.len()]`, where
76+
// it will return `bounds.len()` if value is greater than the last element
77+
// of `bounds`. This aligns with the buckets in that the length of buckets
78+
// is `bounds.len()+1`, with the last bucket representing:
79+
// `(bounds[bounds.len()-1], +∞)`.
80+
let idx = config.bounds.partition_point(|&x| x < f_value);
81+
if let Ok(mut data) = self.data.lock() {
82+
data.counts[idx] += 1;
83+
data.count += 1;
84+
if config.record_min_max {
85+
if measurement < data.min {
86+
data.min = measurement;
87+
}
88+
if measurement > data.max {
89+
data.max = measurement
90+
}
91+
}
92+
// it's very cheap to update it, even if it is not configured to record_sum
93+
data.total += measurement;
94+
}
95+
}
96+
}
97+
9498
/// Summarizes a set of measurements as a histogram with explicitly defined
9599
/// buckets.
96100
pub(crate) struct Histogram<T: Number> {
97-
value_map: ValueMap<HistogramTracker<T>, T, HistogramUpdate>,
98-
bounds: Vec<f64>,
99-
record_min_max: bool,
100-
record_sum: bool,
101+
value_map: ValueMap<T, Buckets<T>>,
101102
start: Mutex<SystemTime>,
102103
}
103104

104105
impl<T: Number> Histogram<T> {
105-
pub(crate) fn new(boundaries: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
106-
let buckets_count = boundaries.len() + 1;
107-
let mut histogram = Histogram {
108-
value_map: ValueMap::new_with_buckets_count(buckets_count),
109-
bounds: boundaries,
110-
record_min_max,
111-
record_sum,
106+
pub(crate) fn new(mut bounds: Vec<f64>, record_min_max: bool, record_sum: bool) -> Self {
107+
bounds.retain(|v| !v.is_nan());
108+
bounds.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
109+
Self {
110+
value_map: ValueMap::new(BucketsConfig {
111+
record_min_max,
112+
record_sum,
113+
bounds,
114+
}),
112115
start: Mutex::new(SystemTime::now()),
113-
};
114-
115-
histogram.bounds.retain(|v| !v.is_nan());
116-
histogram
117-
.bounds
118-
.sort_by(|a, b| a.partial_cmp(b).expect("NaNs filtered out"));
119-
120-
histogram
116+
}
121117
}
122118

123119
pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
124-
let f = measurement.into_float();
125-
126-
// This search will return an index in the range `[0, bounds.len()]`, where
127-
// it will return `bounds.len()` if value is greater than the last element
128-
// of `bounds`. This aligns with the buckets in that the length of buckets
129-
// is `bounds.len()+1`, with the last bucket representing:
130-
// `(bounds[bounds.len()-1], +∞)`.
131-
let index = self.bounds.partition_point(|&x| x < f);
132-
self.value_map.measure(measurement, attrs, index);
120+
self.value_map.measure(measurement, attrs);
133121
}
134122

135123
pub(crate) fn delta(
@@ -167,25 +155,25 @@ impl<T: Number> Histogram<T> {
167155
.has_no_attribute_value
168156
.swap(false, Ordering::AcqRel)
169157
{
170-
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.buckets.lock() {
158+
if let Ok(ref mut b) = self.value_map.no_attribute_tracker.data.lock() {
171159
h.data_points.push(HistogramDataPoint {
172160
attributes: vec![],
173161
start_time: start,
174162
time: t,
175163
count: b.count,
176-
bounds: self.bounds.clone(),
164+
bounds: self.value_map.config.bounds.clone(),
177165
bucket_counts: b.counts.clone(),
178-
sum: if self.record_sum {
166+
sum: if self.value_map.config.record_sum {
179167
b.total
180168
} else {
181169
T::default()
182170
},
183-
min: if self.record_min_max {
171+
min: if self.value_map.config.record_min_max {
184172
Some(b.min)
185173
} else {
186174
None
187175
},
188-
max: if self.record_min_max {
176+
max: if self.value_map.config.record_min_max {
189177
Some(b.max)
190178
} else {
191179
None
@@ -205,25 +193,25 @@ impl<T: Number> Histogram<T> {
205193
let mut seen = HashSet::new();
206194
for (attrs, tracker) in trackers.drain() {
207195
if seen.insert(Arc::as_ptr(&tracker)) {
208-
if let Ok(b) = tracker.buckets.lock() {
196+
if let Ok(b) = tracker.data.lock() {
209197
h.data_points.push(HistogramDataPoint {
210198
attributes: attrs.clone(),
211199
start_time: start,
212200
time: t,
213201
count: b.count,
214-
bounds: self.bounds.clone(),
202+
bounds: self.value_map.config.bounds.clone(),
215203
bucket_counts: b.counts.clone(),
216-
sum: if self.record_sum {
204+
sum: if self.value_map.config.record_sum {
217205
b.total
218206
} else {
219207
T::default()
220208
},
221-
min: if self.record_min_max {
209+
min: if self.value_map.config.record_min_max {
222210
Some(b.min)
223211
} else {
224212
None
225213
},
226-
max: if self.record_min_max {
214+
max: if self.value_map.config.record_min_max {
227215
Some(b.max)
228216
} else {
229217
None
@@ -278,25 +266,25 @@ impl<T: Number> Histogram<T> {
278266
.has_no_attribute_value
279267
.load(Ordering::Acquire)
280268
{
281-
if let Ok(b) = &self.value_map.no_attribute_tracker.buckets.lock() {
269+
if let Ok(b) = &self.value_map.no_attribute_tracker.data.lock() {
282270
h.data_points.push(HistogramDataPoint {
283271
attributes: vec![],
284272
start_time: start,
285273
time: t,
286274
count: b.count,
287-
bounds: self.bounds.clone(),
275+
bounds: self.value_map.config.bounds.clone(),
288276
bucket_counts: b.counts.clone(),
289-
sum: if self.record_sum {
277+
sum: if self.value_map.config.record_sum {
290278
b.total
291279
} else {
292280
T::default()
293281
},
294-
min: if self.record_min_max {
282+
min: if self.value_map.config.record_min_max {
295283
Some(b.min)
296284
} else {
297285
None
298286
},
299-
max: if self.record_min_max {
287+
max: if self.value_map.config.record_min_max {
300288
Some(b.max)
301289
} else {
302290
None
@@ -318,25 +306,25 @@ impl<T: Number> Histogram<T> {
318306
let mut seen = HashSet::new();
319307
for (attrs, tracker) in trackers.iter() {
320308
if seen.insert(Arc::as_ptr(tracker)) {
321-
if let Ok(b) = tracker.buckets.lock() {
309+
if let Ok(b) = tracker.data.lock() {
322310
h.data_points.push(HistogramDataPoint {
323311
attributes: attrs.clone(),
324312
start_time: start,
325313
time: t,
326314
count: b.count,
327-
bounds: self.bounds.clone(),
315+
bounds: self.value_map.config.bounds.clone(),
328316
bucket_counts: b.counts.clone(),
329-
sum: if self.record_sum {
317+
sum: if self.value_map.config.record_sum {
330318
b.total
331319
} else {
332320
T::default()
333321
},
334-
min: if self.record_min_max {
322+
min: if self.value_map.config.record_min_max {
335323
Some(b.min)
336324
} else {
337325
None
338326
},
339-
max: if self.record_min_max {
327+
max: if self.value_map.config.record_min_max {
340328
Some(b.max)
341329
} else {
342330
None

0 commit comments

Comments
 (0)