@@ -151,6 +151,7 @@ mod tests {
     use std::borrow::Cow;
     use std::sync::{Arc, Mutex};
     use std::thread;
+    use std::time::Duration;

     // Run all tests in this mod
     // cargo test metrics::tests --features=testing
@@ -1047,45 +1048,54 @@ mod tests {
         let mut test_context = TestContext::new(temporality);
         let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

-        let mut update_threads = vec![];
-        for _ in 0..10 {
-            let counter = Arc::clone(&counter);
-
-            update_threads.push(thread::spawn(move || {
-                counter.add(1, &[KeyValue::new("key1", "value1")]);
-                counter.add(1, &[KeyValue::new("key1", "value1")]);
-                counter.add(1, &[KeyValue::new("key1", "value1")]);
-                counter.add(1, &[KeyValue::new("key1", "value1")]);
-                counter.add(1, &[KeyValue::new("key1", "value1")]);
-            }));
-        }
-
-        for thread in update_threads {
-            thread.join().unwrap();
+        for i in 0..10 {
+            thread::scope(|s| {
+                s.spawn(|| {
+                    counter.add(1, &[KeyValue::new("key1", "value1")]);
+                    counter.add(1, &[KeyValue::new("key1", "value1")]);
+                    counter.add(1, &[KeyValue::new("key1", "value1")]);
+
+                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
+                    if i % 2 == 0 {
+                        test_context.flush_metrics();
+                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
+                    }
+
+                    counter.add(1, &[KeyValue::new("key1", "value1")]);
+                    counter.add(1, &[KeyValue::new("key1", "value1")]);
+                });
+            });
         }

         test_context.flush_metrics();

         // Assert
-        let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
-        // Expecting 2 time-series.
-        assert_eq!(sum.data_points.len(), 1);
-        assert!(sum.is_monotonic, "Counter should produce monotonic.");
-        assert_eq!(sum.temporality, temporality);
-        if let Temporality::Cumulative = temporality {
-            assert_eq!(
-                sum.temporality,
-                Temporality::Cumulative,
-                "Should produce cumulative"
-            );
+        // We invoke `test_context.flush_metrics()` six times.
+        let sums =
+            test_context.get_from_multiple_aggregations::<data::Sum<u64>>("my_counter", None, 6);
+
+        let values = sums
+            .iter()
+            .map(|sum| {
+                assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series.
+                assert!(sum.is_monotonic, "Counter should produce monotonic.");
+                assert_eq!(sum.temporality, temporality);
+
+                // find and validate key1=value1 datapoint
+                let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
+                    .expect("datapoint with key1=value1 expected");
+
+                data_point.value
+            })
+            .collect::<Vec<_>>();
+
+        let total_sum: u64 = if temporality == Temporality::Delta {
+            values.iter().sum()
         } else {
-            assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
-        }
+            *values.last().unwrap()
+        };

-        // find and validate key1=value2 datapoint
-        let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
-            .expect("datapoint with key1=value1 expected");
-        assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5.
+        assert_eq!(total_sum, 50); // Each of the 10 update threads record measurements summing up to 5.
     }

     fn histogram_aggregation_helper(temporality: Temporality) {
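The rewritten loop above leans on `std::thread::scope` (stable since Rust 1.63): scoped threads may borrow non-`'static` data such as `counter` and `test_context` from the enclosing stack frame, and every spawned thread is joined before `scope` returns, which is why the per-thread `Arc::clone` and the manual `join` loop could be dropped. A minimal, self-contained sketch of that pattern (not the test's exact structure, and using a plain atomic counter rather than the SDK's instrument):

use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

fn main() {
    // Shared state borrowed by the scoped threads; no Arc or 'static bound required.
    let total = AtomicU64::new(0);

    thread::scope(|s| {
        for _ in 0..10 {
            s.spawn(|| {
                // Each thread records 5 increments, mirroring the 10 x 5 updates in the test.
                for _ in 0..5 {
                    total.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
        // All spawned threads are joined automatically when the scope ends.
    });

    assert_eq!(total.load(Ordering::Relaxed), 50);
}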
@@ -1553,5 +1563,58 @@ mod tests {
                 .downcast_ref::<T>()
                 .expect("Failed to cast aggregation to expected type")
         }
+
+        fn get_from_multiple_aggregations<T: data::Aggregation>(
+            &mut self,
+            counter_name: &str,
+            unit_name: Option<&str>,
+            invocation_count: usize,
+        ) -> Vec<&T> {
+            self.resource_metrics = self
+                .exporter
+                .get_finished_metrics()
+                .expect("metrics expected to be exported");
+
+            assert!(
+                !self.resource_metrics.is_empty(),
+                "no metrics were exported"
+            );
+
+            assert_eq!(
+                self.resource_metrics.len(),
+                invocation_count,
+                "Expected collect to be called {} times",
+                invocation_count
+            );
+
+            let result = self
+                .resource_metrics
+                .iter()
+                .map(|resource_metric| {
+                    assert!(
+                        !resource_metric.scope_metrics.is_empty(),
+                        "An export with no scope metrics occurred"
+                    );
+
+                    assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
+
+                    let metric = &resource_metric.scope_metrics[0].metrics[0];
+                    assert_eq!(metric.name, counter_name);
+
+                    if let Some(expected_unit) = unit_name {
+                        assert_eq!(metric.unit, expected_unit);
+                    }
+
+                    let aggregation = metric
+                        .data
+                        .as_any()
+                        .downcast_ref::<T>()
+                        .expect("Failed to cast aggregation to expected type");
+                    aggregation
+                })
+                .collect::<Vec<_>>();
+
+            result
+        }
     }
 }
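The new helper returns one `Sum` per export, so the final assertion in the test has to account for temporality: with `Temporality::Delta` each export carries only the increments recorded since the previous collect and the per-export values must be summed, while with `Temporality::Cumulative` every export carries the running total and only the last value matters. A small illustration of that reduction, with made-up per-export values for the same 50 total increments:

fn main() {
    // Hypothetical per-export values; the real split depends on thread timing.
    let delta_exports: Vec<u64> = vec![12, 8, 10, 9, 6, 5]; // increments since the previous collect
    let cumulative_exports: Vec<u64> = vec![12, 20, 30, 39, 45, 50]; // running totals

    let delta_total: u64 = delta_exports.iter().sum();
    let cumulative_total: u64 = *cumulative_exports.last().unwrap();

    assert_eq!(delta_total, 50);
    assert_eq!(cumulative_total, 50);
}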