@@ -7,45 +7,9 @@ use crate::metrics::data::HistogramDataPoint;
7
7
use crate :: metrics:: data:: { self , Aggregation , Temporality } ;
8
8
use opentelemetry:: KeyValue ;
9
9
10
- use super :: Number ;
11
- use super :: { AtomicTracker , AtomicallyUpdate , Operation , ValueMap } ;
10
+ use super :: ValueMap ;
11
+ use super :: { Aggregator , Number } ;
12
12
13
- struct HistogramUpdate ;
14
-
15
- impl Operation for HistogramUpdate {
16
- fn update_tracker < T : Default , AT : AtomicTracker < T > > ( tracker : & AT , value : T , index : usize ) {
17
- tracker. update_histogram ( index, value) ;
18
- }
19
- }
20
-
21
- struct HistogramTracker < T > {
22
- buckets : Mutex < Buckets < T > > ,
23
- }
24
-
25
- impl < T : Number > AtomicTracker < T > for HistogramTracker < T > {
26
- fn update_histogram ( & self , index : usize , value : T ) {
27
- let mut buckets = match self . buckets . lock ( ) {
28
- Ok ( guard) => guard,
29
- Err ( _) => return ,
30
- } ;
31
-
32
- buckets. bin ( index, value) ;
33
- buckets. sum ( value) ;
34
- }
35
- }
36
-
37
- impl < T : Number > AtomicallyUpdate < T > for HistogramTracker < T > {
38
- type AtomicTracker = HistogramTracker < T > ;
39
-
40
- fn new_atomic_tracker ( buckets_count : Option < usize > ) -> Self :: AtomicTracker {
41
- let count = buckets_count. unwrap ( ) ;
42
- HistogramTracker {
43
- buckets : Mutex :: new ( Buckets :: < T > :: new ( count) ) ,
44
- }
45
- }
46
- }
47
-
48
- #[ derive( Default ) ]
49
13
struct Buckets < T > {
50
14
counts : Vec < u64 > ,
51
15
count : u64 ,
@@ -54,29 +18,17 @@ struct Buckets<T> {
54
18
max : T ,
55
19
}
56
20
57
- impl < T : Number > Buckets < T > {
58
- /// returns buckets with `n` bins.
59
- fn new ( n : usize ) -> Buckets < T > {
60
- Buckets {
61
- counts : vec ! [ 0 ; n] ,
21
+ impl < T > Buckets < T >
22
+ where
23
+ T : Number ,
24
+ {
25
+ fn new ( size : usize ) -> Self {
26
+ Self {
27
+ counts : vec ! [ 0 ; size] ,
28
+ count : 0 ,
29
+ total : T :: default ( ) ,
62
30
min : T :: max ( ) ,
63
31
max : T :: min ( ) ,
64
- ..Default :: default ( )
65
- }
66
- }
67
-
68
- fn sum ( & mut self , value : T ) {
69
- self . total += value;
70
- }
71
-
72
- fn bin ( & mut self , idx : usize , value : T ) {
73
- self . counts [ idx] += 1 ;
74
- self . count += 1 ;
75
- if value < self . min {
76
- self . min = value;
77
- }
78
- if value > self . max {
79
- self . max = value
80
32
}
81
33
}
82
34
@@ -91,45 +43,72 @@ impl<T: Number> Buckets<T> {
91
43
}
92
44
}
93
45
46
+ impl < T > Aggregator < T > for Mutex < Buckets < T > >
47
+ where
48
+ T : Number ,
49
+ {
50
+ type InitConfig = usize ;
51
+ /// Value and bucket index
52
+ type PreComputedValue = ( T , usize ) ;
53
+
54
+ fn create ( size : & usize ) -> Self {
55
+ Mutex :: new ( Buckets :: new ( * size) )
56
+ }
57
+
58
+ fn update ( & self , ( value, idx) : ( T , usize ) ) {
59
+ if let Ok ( mut this) = self . lock ( ) {
60
+ this. counts [ idx] += 1 ;
61
+ this. count += 1 ;
62
+ if value < this. min {
63
+ this. min = value;
64
+ }
65
+ if value > this. max {
66
+ this. max = value
67
+ }
68
+ this. total += value;
69
+ }
70
+ }
71
+ }
72
+
94
73
/// Summarizes a set of measurements as a histogram with explicitly defined
95
74
/// buckets.
96
75
pub ( crate ) struct Histogram < T : Number > {
97
- value_map : ValueMap < HistogramTracker < T > , T , HistogramUpdate > ,
76
+ value_map : ValueMap < T , Mutex < Buckets < T > > > ,
98
77
bounds : Vec < f64 > ,
99
78
record_min_max : bool ,
100
79
record_sum : bool ,
101
80
start : Mutex < SystemTime > ,
102
81
}
103
82
104
83
impl < T : Number > Histogram < T > {
105
- pub ( crate ) fn new ( boundaries : Vec < f64 > , record_min_max : bool , record_sum : bool ) -> Self {
106
- let buckets_count = boundaries. len ( ) + 1 ;
107
- let mut histogram = Histogram {
108
- value_map : ValueMap :: new_with_buckets_count ( buckets_count) ,
109
- bounds : boundaries,
84
+ pub ( crate ) fn new ( mut bounds : Vec < f64 > , record_min_max : bool , record_sum : bool ) -> Self {
85
+ bounds. retain ( |v| !v. is_nan ( ) ) ;
86
+ bounds. sort_by ( |a, b| a. partial_cmp ( b) . expect ( "NaNs filtered out" ) ) ;
87
+ let buckets_count = bounds. len ( ) + 1 ;
88
+ Self {
89
+ value_map : ValueMap :: new ( buckets_count) ,
90
+ bounds,
110
91
record_min_max,
111
92
record_sum,
112
93
start : Mutex :: new ( SystemTime :: now ( ) ) ,
113
- } ;
114
-
115
- histogram. bounds . retain ( |v| !v. is_nan ( ) ) ;
116
- histogram
117
- . bounds
118
- . sort_by ( |a, b| a. partial_cmp ( b) . expect ( "NaNs filtered out" ) ) ;
119
-
120
- histogram
94
+ }
121
95
}
122
96
123
97
pub ( crate ) fn measure ( & self , measurement : T , attrs : & [ KeyValue ] ) {
124
98
let f = measurement. into_float ( ) ;
125
-
99
+ // Ignore NaN and infinity.
100
+ // Only makes sense if T is f64, maybe this could be no-op for other cases?
101
+ if f. is_infinite ( ) || f. is_nan ( ) {
102
+ return ;
103
+ }
126
104
// This search will return an index in the range `[0, bounds.len()]`, where
127
105
// it will return `bounds.len()` if value is greater than the last element
128
106
// of `bounds`. This aligns with the buckets in that the length of buckets
129
107
// is `bounds.len()+1`, with the last bucket representing:
130
108
// `(bounds[bounds.len()-1], +∞)`.
131
109
let index = self . bounds . partition_point ( |& x| x < f) ;
132
- self . value_map . measure ( measurement, attrs, index) ;
110
+
111
+ self . value_map . measure ( ( measurement, index) , attrs) ;
133
112
}
134
113
135
114
pub ( crate ) fn delta (
@@ -167,7 +146,7 @@ impl<T: Number> Histogram<T> {
167
146
. has_no_attribute_value
168
147
. swap ( false , Ordering :: AcqRel )
169
148
{
170
- if let Ok ( ref mut b) = self . value_map . no_attribute_tracker . buckets . lock ( ) {
149
+ if let Ok ( ref mut b) = self . value_map . no_attribute_tracker . lock ( ) {
171
150
h. data_points . push ( HistogramDataPoint {
172
151
attributes : vec ! [ ] ,
173
152
start_time : start,
@@ -205,7 +184,7 @@ impl<T: Number> Histogram<T> {
205
184
let mut seen = HashSet :: new ( ) ;
206
185
for ( attrs, tracker) in trackers. drain ( ) {
207
186
if seen. insert ( Arc :: as_ptr ( & tracker) ) {
208
- if let Ok ( b) = tracker. buckets . lock ( ) {
187
+ if let Ok ( b) = tracker. lock ( ) {
209
188
h. data_points . push ( HistogramDataPoint {
210
189
attributes : attrs. clone ( ) ,
211
190
start_time : start,
@@ -278,7 +257,7 @@ impl<T: Number> Histogram<T> {
278
257
. has_no_attribute_value
279
258
. load ( Ordering :: Acquire )
280
259
{
281
- if let Ok ( b) = & self . value_map . no_attribute_tracker . buckets . lock ( ) {
260
+ if let Ok ( b) = & self . value_map . no_attribute_tracker . lock ( ) {
282
261
h. data_points . push ( HistogramDataPoint {
283
262
attributes : vec ! [ ] ,
284
263
start_time : start,
@@ -318,7 +297,7 @@ impl<T: Number> Histogram<T> {
318
297
let mut seen = HashSet :: new ( ) ;
319
298
for ( attrs, tracker) in trackers. iter ( ) {
320
299
if seen. insert ( Arc :: as_ptr ( tracker) ) {
321
- if let Ok ( b) = tracker. buckets . lock ( ) {
300
+ if let Ok ( b) = tracker. lock ( ) {
322
301
h. data_points . push ( HistogramDataPoint {
323
302
attributes : attrs. clone ( ) ,
324
303
start_time : start,
@@ -350,3 +329,68 @@ impl<T: Number> Histogram<T> {
350
329
( h. data_points . len ( ) , new_agg. map ( |a| Box :: new ( a) as Box < _ > ) )
351
330
}
352
331
}
332
+
333
+ #[ cfg( test) ]
334
+ mod tests {
335
+
336
+ use super :: * ;
337
+
338
+ #[ test]
339
+ fn when_f64_is_nan_or_infinity_then_ignore ( ) {
340
+ struct Expected {
341
+ min : f64 ,
342
+ max : f64 ,
343
+ sum : f64 ,
344
+ count : u64 ,
345
+ }
346
+ impl Expected {
347
+ fn new ( min : f64 , max : f64 , sum : f64 , count : u64 ) -> Self {
348
+ Expected {
349
+ min,
350
+ max,
351
+ sum,
352
+ count,
353
+ }
354
+ }
355
+ }
356
+ struct TestCase {
357
+ values : Vec < f64 > ,
358
+ expected : Expected ,
359
+ }
360
+
361
+ let test_cases = vec ! [
362
+ TestCase {
363
+ values: vec![ 2.0 , 4.0 , 1.0 ] ,
364
+ expected: Expected :: new( 1.0 , 4.0 , 7.0 , 3 ) ,
365
+ } ,
366
+ TestCase {
367
+ values: vec![ 2.0 , 4.0 , 1.0 , f64 :: INFINITY ] ,
368
+ expected: Expected :: new( 1.0 , 4.0 , 7.0 , 3 ) ,
369
+ } ,
370
+ TestCase {
371
+ values: vec![ 2.0 , 4.0 , 1.0 , -f64 :: INFINITY ] ,
372
+ expected: Expected :: new( 1.0 , 4.0 , 7.0 , 3 ) ,
373
+ } ,
374
+ TestCase {
375
+ values: vec![ 2.0 , f64 :: NAN , 4.0 , 1.0 ] ,
376
+ expected: Expected :: new( 1.0 , 4.0 , 7.0 , 3 ) ,
377
+ } ,
378
+ TestCase {
379
+ values: vec![ 4.0 , 4.0 , 4.0 , 2.0 , 16.0 , 1.0 ] ,
380
+ expected: Expected :: new( 1.0 , 16.0 , 31.0 , 6 ) ,
381
+ } ,
382
+ ] ;
383
+
384
+ for test in test_cases {
385
+ let h = Histogram :: new ( vec ! [ ] , true , true ) ;
386
+ for v in test. values {
387
+ h. measure ( v, & [ ] ) ;
388
+ }
389
+ let res = h. value_map . no_attribute_tracker . lock ( ) . unwrap ( ) ;
390
+ assert_eq ! ( test. expected. max, res. max) ;
391
+ assert_eq ! ( test. expected. min, res. min) ;
392
+ assert_eq ! ( test. expected. sum, res. total) ;
393
+ assert_eq ! ( test. expected. count, res. count) ;
394
+ }
395
+ }
396
+ }
0 commit comments