@@ -150,6 +150,7 @@ mod tests {
150
150
use rand:: { rngs, Rng , SeedableRng } ;
151
151
use std:: borrow:: Cow ;
152
152
use std:: sync:: { Arc , Mutex } ;
153
+ use std:: thread;
153
154
154
155
// Run all tests in this mod
155
156
// cargo test metrics::tests --features=metrics,testing
@@ -1029,6 +1030,61 @@ mod tests {
1029
1030
assert ! ( resource_metrics. is_empty( ) , "No metrics should be exported as no new measurements were recorded since last collect." ) ;
1030
1031
}
1031
1032
1033
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
1034
+ async fn counter_multithreaded ( ) {
1035
+ // Run this test with stdout enabled to see output.
1036
+ // cargo test counter_multithreaded --features=metrics,testing -- --nocapture
1037
+
1038
+ counter_multithreaded_aggregation_helper ( Temporality :: Delta ) ;
1039
+ counter_multithreaded_aggregation_helper ( Temporality :: Cumulative ) ;
1040
+ }
1041
+
1042
+ fn counter_multithreaded_aggregation_helper ( temporality : Temporality ) {
1043
+ // Arrange
1044
+ let mut test_context = TestContext :: new ( temporality) ;
1045
+ let counter = Arc :: new ( test_context. u64_counter ( "test" , "my_counter" , None ) ) ;
1046
+
1047
+ let mut update_threads = vec ! [ ] ;
1048
+ for _ in 0 ..10 {
1049
+ let counter = Arc :: clone ( & counter) ;
1050
+
1051
+ update_threads. push ( thread:: spawn ( move || {
1052
+ counter. add ( 1 , & [ KeyValue :: new ( "key1" , "value1" ) ] ) ;
1053
+ counter. add ( 1 , & [ KeyValue :: new ( "key1" , "value1" ) ] ) ;
1054
+ counter. add ( 1 , & [ KeyValue :: new ( "key1" , "value1" ) ] ) ;
1055
+ counter. add ( 1 , & [ KeyValue :: new ( "key1" , "value1" ) ] ) ;
1056
+ counter. add ( 1 , & [ KeyValue :: new ( "key1" , "value1" ) ] ) ;
1057
+ } ) ) ;
1058
+ }
1059
+
1060
+ for thread in update_threads {
1061
+ thread. join ( ) . unwrap ( ) ;
1062
+ }
1063
+
1064
+ test_context. flush_metrics ( ) ;
1065
+
1066
+ // Assert
1067
+ let sum = test_context. get_aggregation :: < data:: Sum < u64 > > ( "my_counter" , None ) ;
1068
+ // Expecting 2 time-series.
1069
+ assert_eq ! ( sum. data_points. len( ) , 1 ) ;
1070
+ assert ! ( sum. is_monotonic, "Counter should produce monotonic." ) ;
1071
+ assert_eq ! ( sum. temporality, temporality) ;
1072
+ if let Temporality :: Cumulative = temporality {
1073
+ assert_eq ! (
1074
+ sum. temporality,
1075
+ Temporality :: Cumulative ,
1076
+ "Should produce cumulative"
1077
+ ) ;
1078
+ } else {
1079
+ assert_eq ! ( sum. temporality, Temporality :: Delta , "Should produce delta" ) ;
1080
+ }
1081
+
1082
+ // find and validate key1=value2 datapoint
1083
+ let data_point1 = find_datapoint_with_key_value ( & sum. data_points , "key1" , "value1" )
1084
+ . expect ( "datapoint with key1=value1 expected" ) ;
1085
+ assert_eq ! ( data_point1. value, 50 ) ; // Each of the 10 update threads record measurements summing up to 5.
1086
+ }
1087
+
1032
1088
fn histogram_aggregation_helper ( temporality : Temporality ) {
1033
1089
// Arrange
1034
1090
let mut test_context = TestContext :: new ( temporality) ;
0 commit comments