@@ -6,10 +6,26 @@ mod sum;
6
6
7
7
use core:: fmt;
8
8
use std:: ops:: { Add , AddAssign , Sub } ;
9
+ use std:: sync:: atomic:: { AtomicI64 , AtomicU64 , Ordering } ;
10
+ use std:: sync:: Mutex ;
9
11
10
12
pub ( crate ) use aggregate:: { AggregateBuilder , ComputeAggregation , Measure } ;
11
13
pub ( crate ) use exponential_histogram:: { EXPO_MAX_SCALE , EXPO_MIN_SCALE } ;
12
14
15
+ /// Marks a type that can have a value added and retrieved atomically. Required since
16
+ /// different types have different backing atomic mechanisms
17
+ pub ( crate ) trait AtomicTracker < T > : Sync + Send + ' static {
18
+ fn add ( & self , value : T ) ;
19
+ fn get_value ( & self ) -> T ;
20
+ fn get_and_reset_value ( & self ) -> T ;
21
+ }
22
+
23
+ /// Marks a type that can have an atomic tracker generated for it
24
+ pub ( crate ) trait AtomicallyUpdate < T > {
25
+ type AtomicTracker : AtomicTracker < T > ;
26
+ fn new_atomic_tracker ( ) -> Self :: AtomicTracker ;
27
+ }
28
+
13
29
pub ( crate ) trait Number < T > :
14
30
Add < Output = T >
15
31
+ AddAssign
@@ -23,6 +39,7 @@ pub(crate) trait Number<T>:
23
39
+ Send
24
40
+ Sync
25
41
+ ' static
42
+ + AtomicallyUpdate < T >
26
43
{
27
44
fn min ( ) -> Self ;
28
45
fn max ( ) -> Self ;
@@ -71,3 +88,159 @@ impl Number<f64> for f64 {
71
88
self
72
89
}
73
90
}
91
+
92
+ impl AtomicTracker < u64 > for AtomicU64 {
93
+ fn add ( & self , value : u64 ) {
94
+ self . fetch_add ( value, Ordering :: Relaxed ) ;
95
+ }
96
+
97
+ fn get_value ( & self ) -> u64 {
98
+ self . load ( Ordering :: Relaxed )
99
+ }
100
+
101
+ fn get_and_reset_value ( & self ) -> u64 {
102
+ self . swap ( 0 , Ordering :: Relaxed )
103
+ }
104
+ }
105
+
106
+ impl AtomicallyUpdate < u64 > for u64 {
107
+ type AtomicTracker = AtomicU64 ;
108
+
109
+ fn new_atomic_tracker ( ) -> Self :: AtomicTracker {
110
+ AtomicU64 :: new ( 0 )
111
+ }
112
+ }
113
+
114
+ impl AtomicTracker < i64 > for AtomicI64 {
115
+ fn add ( & self , value : i64 ) {
116
+ self . fetch_add ( value, Ordering :: Relaxed ) ;
117
+ }
118
+
119
+ fn get_value ( & self ) -> i64 {
120
+ self . load ( Ordering :: Relaxed )
121
+ }
122
+
123
+ fn get_and_reset_value ( & self ) -> i64 {
124
+ self . swap ( 0 , Ordering :: Relaxed )
125
+ }
126
+ }
127
+
128
+ impl AtomicallyUpdate < i64 > for i64 {
129
+ type AtomicTracker = AtomicI64 ;
130
+
131
+ fn new_atomic_tracker ( ) -> Self :: AtomicTracker {
132
+ AtomicI64 :: new ( 0 )
133
+ }
134
+ }
135
+
136
+ pub ( crate ) struct F64AtomicTracker {
137
+ inner : Mutex < f64 > , // Floating points don't have true atomics, so we need to use mutex for them
138
+ }
139
+
140
+ impl F64AtomicTracker {
141
+ fn new ( ) -> Self {
142
+ F64AtomicTracker {
143
+ inner : Mutex :: new ( 0.0 ) ,
144
+ }
145
+ }
146
+ }
147
+
148
+ impl AtomicTracker < f64 > for F64AtomicTracker {
149
+ fn add ( & self , value : f64 ) {
150
+ let mut guard = self . inner . lock ( ) . expect ( "F64 mutex was poisoned" ) ;
151
+ * guard += value;
152
+ }
153
+
154
+ fn get_value ( & self ) -> f64 {
155
+ let guard = self . inner . lock ( ) . expect ( "F64 mutex was poisoned" ) ;
156
+ * guard
157
+ }
158
+
159
+ fn get_and_reset_value ( & self ) -> f64 {
160
+ let mut guard = self . inner . lock ( ) . expect ( "F64 mutex was poisoned" ) ;
161
+ let value = * guard;
162
+ * guard = 0.0 ;
163
+
164
+ value
165
+ }
166
+ }
167
+
168
+ impl AtomicallyUpdate < f64 > for f64 {
169
+ type AtomicTracker = F64AtomicTracker ;
170
+
171
+ fn new_atomic_tracker ( ) -> Self :: AtomicTracker {
172
+ F64AtomicTracker :: new ( )
173
+ }
174
+ }
175
+
176
+ #[ cfg( test) ]
177
+ mod tests {
178
+ use super :: * ;
179
+
180
+ #[ test]
181
+ fn can_add_and_get_u64_atomic_value ( ) {
182
+ let atomic = u64:: new_atomic_tracker ( ) ;
183
+ atomic. add ( 15 ) ;
184
+ atomic. add ( 10 ) ;
185
+
186
+ let value = atomic. get_value ( ) ;
187
+ assert_eq ! ( value, 25 ) ;
188
+ }
189
+
190
+ #[ test]
191
+ fn can_reset_u64_atomic_value ( ) {
192
+ let atomic = u64:: new_atomic_tracker ( ) ;
193
+ atomic. add ( 15 ) ;
194
+
195
+ let value = atomic. get_and_reset_value ( ) ;
196
+ let value2 = atomic. get_value ( ) ;
197
+
198
+ assert_eq ! ( value, 15 , "Incorrect first value" ) ;
199
+ assert_eq ! ( value2, 0 , "Incorrect second value" ) ;
200
+ }
201
+
202
+ #[ test]
203
+ fn can_add_and_get_i64_atomic_value ( ) {
204
+ let atomic = i64:: new_atomic_tracker ( ) ;
205
+ atomic. add ( 15 ) ;
206
+ atomic. add ( -10 ) ;
207
+
208
+ let value = atomic. get_value ( ) ;
209
+ assert_eq ! ( value, 5 ) ;
210
+ }
211
+
212
+ #[ test]
213
+ fn can_reset_i64_atomic_value ( ) {
214
+ let atomic = i64:: new_atomic_tracker ( ) ;
215
+ atomic. add ( 15 ) ;
216
+
217
+ let value = atomic. get_and_reset_value ( ) ;
218
+ let value2 = atomic. get_value ( ) ;
219
+
220
+ assert_eq ! ( value, 15 , "Incorrect first value" ) ;
221
+ assert_eq ! ( value2, 0 , "Incorrect second value" ) ;
222
+ }
223
+
224
+ #[ test]
225
+ fn can_add_and_get_f64_atomic_value ( ) {
226
+ let atomic = f64:: new_atomic_tracker ( ) ;
227
+ atomic. add ( 15.3 ) ;
228
+ atomic. add ( 10.4 ) ;
229
+
230
+ let value = atomic. get_value ( ) ;
231
+
232
+ assert ! ( f64 :: abs( 25.7 - value) < 0.0001 ) ;
233
+ }
234
+
235
+ #[ test]
236
+ fn can_reset_f64_atomic_value ( ) {
237
+ let atomic = f64:: new_atomic_tracker ( ) ;
238
+ atomic. add ( 15.5 ) ;
239
+
240
+ let value = atomic. get_and_reset_value ( ) ;
241
+ let value2 = atomic. get_value ( ) ;
242
+
243
+ assert ! ( f64 :: abs( 15.5 - value) < 0.0001 , "Incorrect first value" ) ;
244
+ assert ! ( f64 :: abs( 0.0 - value2) < 0.0001 , "Incorrect second value" ) ;
245
+ }
246
+ }
0 commit comments