@@ -47,7 +47,7 @@ impl<T: Number<T>> ValueMap<T> {
47
47
. take ( BUCKET_COUNT )
48
48
. collect :: < Vec < _ > > ( )
49
49
. try_into ( )
50
- . unwrap ( ) ;
50
+ . unwrap ( ) ; // this will never fail as Vec length is fixed
51
51
52
52
ValueMap {
53
53
buckets : Arc :: new ( buckets) ,
@@ -71,12 +71,16 @@ impl<T: Number<T>> ValueMap<T> {
71
71
// Calculate the total length of data points across all buckets.
72
72
fn total_data_points_count ( & self ) -> usize {
73
73
self . buckets
74
- . iter ( )
75
- . map ( |bucket_mutex| {
76
- let locked_bucket = bucket_mutex. lock ( ) . unwrap ( ) ;
77
- locked_bucket. as_ref ( ) . map_or ( 0 , |bucket| bucket. len ( ) )
78
- } )
79
- . sum :: < usize > ( )
74
+ . iter ( )
75
+ . map ( |bucket_mutex| {
76
+ bucket_mutex. lock ( )
77
+ . map ( |locked_bucket| locked_bucket. as_ref ( ) . map_or ( 0 , |bucket| bucket. len ( ) ) )
78
+ . unwrap_or_else ( |_| {
79
+ global:: handle_error ( MetricsError :: Other ( "Failed to acquire lock on a bucket. Using default `0` as total data points." . into ( ) ) ) ;
80
+ 0
81
+ } )
82
+ } )
83
+ . sum :: < usize > ( )
80
84
}
81
85
}
82
86
@@ -87,12 +91,19 @@ impl<T: Number<T>> ValueMap<T> {
87
91
self . has_no_value_attribute_value
88
92
. store ( true , Ordering :: Release ) ;
89
93
} else {
90
- let bucket_index = Self :: hash_to_bucket ( & attrs) as usize ; // Ensure index is usize for array indexing
94
+ let bucket_index = Self :: hash_to_bucket ( & attrs) as usize ;
91
95
let bucket_mutex = & self . buckets [ bucket_index] ;
92
- let mut bucket_guard = bucket_mutex. lock ( ) . unwrap ( ) ;
96
+
97
+ let mut bucket_guard = match bucket_mutex. lock ( ) {
98
+ Ok ( guard) => guard,
99
+ Err ( e) => {
100
+ eprintln ! ( "Failed to acquire lock due to: {:?}" , e) ;
101
+ return ; // TBD - retry ?
102
+ }
103
+ } ;
93
104
94
105
if bucket_guard. is_none ( ) {
95
- * bucket_guard = Some ( HashMap :: new ( ) ) ; // Initialize the bucket if it's None
106
+ * bucket_guard = Some ( HashMap :: new ( ) ) ;
96
107
}
97
108
98
109
if let Some ( ref mut values) = * bucket_guard {
@@ -106,7 +117,6 @@ impl<T: Number<T>> ValueMap<T> {
106
117
if is_under_cardinality_limit ( size) {
107
118
vacant_entry. insert ( measurement) ;
108
119
} else {
109
- // TBD - Update total_count ??
110
120
values
111
121
. entry ( STREAM_OVERFLOW_ATTRIBUTE_SET . clone ( ) )
112
122
. and_modify ( |val| * val += measurement)
@@ -188,17 +198,31 @@ impl<T: Number<T>> Sum<T> {
188
198
}
189
199
190
200
for bucket_mutex in self . value_map . buckets . iter ( ) {
191
- if let Some ( ref mut locked_bucket) = * bucket_mutex. lock ( ) . unwrap ( ) {
192
- for ( attrs, value) in locked_bucket. drain ( ) {
193
- s_data. data_points . push ( DataPoint {
194
- attributes : attrs,
195
- start_time : Some ( * self . start . lock ( ) . unwrap ( ) ) ,
196
- time : Some ( t) ,
197
- value,
198
- exemplars : vec ! [ ] ,
199
- } ) ;
201
+ match bucket_mutex. lock ( ) {
202
+ Ok ( mut locked_bucket) => {
203
+ if let Some ( ref mut bucket) = * locked_bucket {
204
+ for ( attrs, value) in bucket. drain ( ) {
205
+ // Correctly handle lock acquisition on self.start
206
+ let start_time = self . start . lock ( ) . map_or_else (
207
+ |_| SystemTime :: now ( ) , // In case of an error, use SystemTime::now()
208
+ |guard| * guard, // In case of success, dereference the guard to get the SystemTime
209
+ ) ;
210
+
211
+ s_data. data_points . push ( DataPoint {
212
+ attributes : attrs,
213
+ start_time : Some ( start_time) ,
214
+ time : Some ( t) ,
215
+ value,
216
+ exemplars : vec ! [ ] ,
217
+ } ) ;
218
+ }
219
+ }
220
+ }
221
+ Err ( e) => {
222
+ global:: handle_error ( MetricsError :: Other (
223
+ format ! ( "Failed to acquire lock on bucket due to: {:?}" , e) . into ( ) ,
224
+ ) ) ;
200
225
}
201
- // The bucket is automatically cleared by the .drain() method
202
226
}
203
227
}
204
228
@@ -261,16 +285,29 @@ impl<T: Number<T>> Sum<T> {
261
285
// sets that become "stale" need to be forgotten so this will not
262
286
// overload the system.
263
287
for bucket_mutex in self . value_map . buckets . iter ( ) {
264
- if let Some ( ref locked_bucket) = * bucket_mutex. lock ( ) . unwrap ( ) {
265
- for ( attrs, value) in locked_bucket. iter ( ) {
266
- s_data. data_points . push ( DataPoint {
267
- attributes : attrs. clone ( ) ,
268
- start_time : Some ( * self . start . lock ( ) . unwrap ( ) ) , // Consider last reset time
269
- time : Some ( t) ,
270
- value : * value,
271
- exemplars : vec ! [ ] ,
272
- } ) ;
288
+ // Handle potential lock failure gracefully
289
+ if let Ok ( locked_bucket) = bucket_mutex. lock ( ) {
290
+ if let Some ( locked_bucket) = & * locked_bucket {
291
+ for ( attrs, value) in locked_bucket. iter ( ) {
292
+ // Handle potential lock failure on self.start and use current time as fallback
293
+ let start_time = self . start . lock ( ) . map_or_else (
294
+ |_| SystemTime :: now ( ) , // Use SystemTime::now() as fallback on error
295
+ |guard| * guard, // Dereference the guard to get the SystemTime on success
296
+ ) ;
297
+
298
+ s_data. data_points . push ( DataPoint {
299
+ attributes : attrs. clone ( ) ,
300
+ start_time : Some ( start_time) ,
301
+ time : Some ( t) ,
302
+ value : * value,
303
+ exemplars : vec ! [ ] ,
304
+ } ) ;
305
+ }
273
306
}
307
+ } else {
308
+ global:: handle_error ( MetricsError :: Other (
309
+ "Failed to acquire lock on a bucket" . into ( ) ,
310
+ ) ) ;
274
311
}
275
312
}
276
313
@@ -352,20 +389,32 @@ impl<T: Number<T>> PrecomputedSum<T> {
352
389
}
353
390
354
391
for bucket_mutex in self . value_map . buckets . iter ( ) {
355
- if let Some ( ref mut locked_bucket) = * bucket_mutex. lock ( ) . unwrap ( ) {
356
- let default = T :: default ( ) ;
357
- for ( attrs, value) in locked_bucket. drain ( ) {
358
- let delta = value - * reported. get ( & attrs) . unwrap_or ( & default) ;
359
- if delta != default {
360
- new_reported. insert ( attrs. clone ( ) , value) ;
392
+ match bucket_mutex. lock ( ) {
393
+ Ok ( mut locked_bucket) => {
394
+ if let Some ( locked_bucket) = & mut * locked_bucket {
395
+ let default = T :: default ( ) ;
396
+ for ( attrs, value) in locked_bucket. drain ( ) {
397
+ let delta = value - * reported. get ( & attrs) . unwrap_or ( & default) ;
398
+ if delta != default {
399
+ new_reported. insert ( attrs. clone ( ) , value) ;
400
+ }
401
+ s_data. data_points . push ( DataPoint {
402
+ attributes : attrs. clone ( ) ,
403
+ start_time : Some ( prev_start) ,
404
+ time : Some ( t) ,
405
+ value : delta,
406
+ exemplars : vec ! [ ] ,
407
+ } ) ;
408
+ }
361
409
}
362
- s_data. data_points . push ( DataPoint {
363
- attributes : attrs. clone ( ) ,
364
- start_time : Some ( prev_start) ,
365
- time : Some ( t) ,
366
- value : delta,
367
- exemplars : vec ! [ ] ,
368
- } ) ;
410
+ }
411
+ Err ( e) => {
412
+ // Log or handle the lock acquisition error if necessary
413
+ global:: handle_error ( MetricsError :: Other (
414
+ format ! ( "Failed to acquire lock on bucket due to: {:?}" , e) . into ( ) ,
415
+ ) ) ;
416
+ // Continue to the next bucket if the lock cannot be acquired
417
+ continue ;
369
418
}
370
419
}
371
420
}
@@ -434,17 +483,31 @@ impl<T: Number<T>> PrecomputedSum<T> {
434
483
435
484
let default = T :: default ( ) ;
436
485
for bucket_mutex in self . value_map . buckets . iter ( ) {
437
- if let Some ( ref locked_bucket) = * bucket_mutex. lock ( ) . unwrap ( ) {
486
+ // Safely attempt to acquire the lock, handling any potential error.
487
+ let locked_bucket = match bucket_mutex. lock ( ) {
488
+ Ok ( bucket) => bucket,
489
+ Err ( e) => {
490
+ // Log the error or handle it as needed.
491
+ global:: handle_error ( MetricsError :: Other (
492
+ format ! ( "Failed to acquire lock on bucket due to: {:?}" , e) . into ( ) ,
493
+ ) ) ;
494
+ continue ; // Skip to the next bucket if the lock cannot be acquired.
495
+ }
496
+ } ;
497
+
498
+ // Proceed only if the bucket is not empty.
499
+ if let Some ( locked_bucket) = & * locked_bucket {
438
500
for ( attrs, value) in locked_bucket. iter ( ) {
439
501
let delta = * value - * reported. get ( attrs) . unwrap_or ( & default) ;
440
502
if delta != default {
441
503
new_reported. insert ( attrs. clone ( ) , * value) ;
442
504
}
505
+
443
506
s_data. data_points . push ( DataPoint {
444
507
attributes : attrs. clone ( ) ,
445
508
start_time : Some ( prev_start) ,
446
509
time : Some ( t) ,
447
- value : * value, // For cumulative, we use the value directly without calculating delta
510
+ value : * value, // For cumulative, directly use the value without calculating the delta.
448
511
exemplars : vec ! [ ] ,
449
512
} ) ;
450
513
}
0 commit comments