@@ -145,9 +145,10 @@ mod tests {
145
145
use crate :: metrics:: reader:: TemporalitySelector ;
146
146
use crate :: testing:: metrics:: InMemoryMetricsExporterBuilder ;
147
147
use crate :: { runtime, testing:: metrics:: InMemoryMetricsExporter } ;
148
- use opentelemetry:: metrics:: { Counter , UpDownCounter } ;
148
+ use opentelemetry:: metrics:: { Counter , Meter , UpDownCounter } ;
149
149
use opentelemetry:: { metrics:: MeterProvider as _, KeyValue } ;
150
150
use std:: borrow:: Cow ;
151
+ use std:: sync:: { Arc , Mutex } ;
151
152
152
153
// Run all tests in this mod
153
154
// cargo test metrics::tests --features=metrics,testing
@@ -213,86 +214,94 @@ mod tests {
213
214
}
214
215
215
216
#[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
216
- async fn observable_counter_aggregation ( ) {
217
+ async fn observable_counter_aggregation_cumulative_non_zero_increment ( ) {
217
218
// Run this test with stdout enabled to see output.
218
- // cargo test observable_counter_aggregation --features=metrics,testing -- --nocapture
219
+ // cargo test observable_counter_aggregation_cumulative_non_zero_increment --features=testing -- --nocapture
220
+ observable_counter_aggregation_helper ( Temporality :: Cumulative , 100 , 10 , 4 ) ;
221
+ }
219
222
220
- // Arrange
221
- let exporter = InMemoryMetricsExporter :: default ( ) ;
222
- let reader = PeriodicReader :: builder ( exporter. clone ( ) , runtime:: Tokio ) . build ( ) ;
223
- let meter_provider = SdkMeterProvider :: builder ( ) . with_reader ( reader) . build ( ) ;
223
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
224
+ async fn observable_counter_aggregation_delta_non_zero_increment ( ) {
225
+ // Run this test with stdout enabled to see output.
226
+ // cargo test observable_counter_aggregation_delta_non_zero_increment --features=testing -- --nocapture
227
+ observable_counter_aggregation_helper ( Temporality :: Delta , 100 , 10 , 4 ) ;
228
+ }
224
229
225
- // Act
226
- let meter = meter_provider. meter ( "test" ) ;
227
- let _counter = meter
230
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
231
+ async fn observable_counter_aggregation_cumulative_zero_increment ( ) {
232
+ // Run this test with stdout enabled to see output.
233
+ // cargo test observable_counter_aggregation_cumulative_zero_increment --features=testing -- --nocapture
234
+ observable_counter_aggregation_helper ( Temporality :: Cumulative , 100 , 0 , 4 ) ;
235
+ }
236
+
237
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
238
+ #[ ignore = "Aggregation bug! https://github.com/open-telemetry/opentelemetry-rust/issues/1517" ]
239
+ async fn observable_counter_aggregation_delta_zero_increment ( ) {
240
+ // Run this test with stdout enabled to see output.
241
+ // cargo test observable_counter_aggregation_delta_zero_increment --features=testing -- --nocapture
242
+ observable_counter_aggregation_helper ( Temporality :: Delta , 100 , 0 , 4 ) ;
243
+ }
244
+
245
+ fn observable_counter_aggregation_helper (
246
+ temporality : Temporality ,
247
+ start : u64 ,
248
+ increment : u64 ,
249
+ length : u64 ,
250
+ ) {
251
+ // Arrange
252
+ let mut test_context = TestContext :: new ( temporality) ;
253
+ // The Observable counter reports values[0], values[1],....values[n] on each flush.
254
+ let values: Vec < u64 > = ( 0 ..length) . map ( |i| start + i * increment) . collect ( ) ;
255
+ println ! ( "Testing with observable values: {:?}" , values) ;
256
+ let values = Arc :: new ( values) ;
257
+ let values_clone = values. clone ( ) ;
258
+ let i = Arc :: new ( Mutex :: new ( 0 ) ) ;
259
+ let _observable_counter = test_context
260
+ . meter ( )
228
261
. u64_observable_counter ( "my_observable_counter" )
229
262
. with_unit ( "my_unit" )
230
- . with_callback ( |observer| {
231
- observer. observe ( 100 , & [ KeyValue :: new ( "key1" , "value1" ) ] ) ;
232
- observer. observe ( 200 , & [ KeyValue :: new ( "key1" , "value2" ) ] ) ;
263
+ . with_callback ( move |observer| {
264
+ let mut index = i. lock ( ) . unwrap ( ) ;
265
+ if * index < values. len ( ) {
266
+ observer. observe ( values[ * index] , & [ KeyValue :: new ( "key1" , "value1" ) ] ) ;
267
+ * index += 1 ;
268
+ }
233
269
} )
234
270
. init ( ) ;
235
271
236
- meter_provider. force_flush ( ) . unwrap ( ) ;
237
-
238
- // Assert
239
- let resource_metrics = exporter
240
- . get_finished_metrics ( )
241
- . expect ( "metrics are expected to be exported." ) ;
242
- assert ! ( !resource_metrics. is_empty( ) ) ;
243
- let metric = & resource_metrics[ 0 ] . scope_metrics [ 0 ] . metrics [ 0 ] ;
244
- assert_eq ! ( metric. name, "my_observable_counter" ) ;
245
- assert_eq ! ( metric. unit, "my_unit" ) ;
246
- let sum = metric
247
- . data
248
- . as_any ( )
249
- . downcast_ref :: < data:: Sum < u64 > > ( )
250
- . expect ( "Sum aggregation expected for ObservableCounter instruments by default" ) ;
251
-
252
- // Expecting 2 time-series.
253
- assert_eq ! ( sum. data_points. len( ) , 2 ) ;
254
- assert ! ( sum. is_monotonic, "Counter should produce monotonic." ) ;
255
- assert_eq ! (
256
- sum. temporality,
257
- data:: Temporality :: Cumulative ,
258
- "Should produce cumulative by default."
259
- ) ;
260
-
261
- // find and validate key1=value1 datapoint
262
- let mut data_point1 = None ;
263
- for datapoint in & sum. data_points {
264
- if datapoint
265
- . attributes
266
- . iter ( )
267
- . any ( |kv| kv. key . as_str ( ) == "key1" && kv. value . as_str ( ) == "value1" )
268
- {
269
- data_point1 = Some ( datapoint) ;
272
+ for ( iter, v) in values_clone. iter ( ) . enumerate ( ) {
273
+ test_context. flush_metrics ( ) ;
274
+ let sum = test_context. get_aggregation :: < data:: Sum < u64 > > ( "my_observable_counter" , None ) ;
275
+ assert_eq ! ( sum. data_points. len( ) , 1 ) ;
276
+ assert ! ( sum. is_monotonic, "Counter should produce monotonic." ) ;
277
+ if let Temporality :: Cumulative = temporality {
278
+ assert_eq ! (
279
+ sum. temporality,
280
+ Temporality :: Cumulative ,
281
+ "Should produce cumulative"
282
+ ) ;
283
+ } else {
284
+ assert_eq ! ( sum. temporality, Temporality :: Delta , "Should produce delta" ) ;
270
285
}
271
- }
272
- assert_eq ! (
273
- data_point1
274
- . expect( "datapoint with key1=value1 expected" )
275
- . value,
276
- 100
277
- ) ;
278
286
279
- // find and validate key1=value2 datapoint
280
- let mut data_point1 = None ;
281
- for datapoint in & sum. data_points {
282
- if datapoint
283
- . attributes
284
- . iter ( )
285
- . any ( |kv| kv. key . as_str ( ) == "key1" && kv. value . as_str ( ) == "value2" )
286
- {
287
- data_point1 = Some ( datapoint) ;
287
+ // find and validate key1=value1 datapoint
288
+ let data_point = find_datapoint_with_key_value ( & sum. data_points , "key1" , "value1" )
289
+ . expect ( "datapoint with key1=value1 expected" ) ;
290
+ if let Temporality :: Cumulative = temporality {
291
+ // Cumulative counter should have the value as is.
292
+ assert_eq ! ( data_point. value, * v) ;
293
+ } else {
294
+ // Delta counter should have the increment value.
295
+ // Except for the first value which should be the start value.
296
+ if iter == 0 {
297
+ assert_eq ! ( data_point. value, start) ;
298
+ } else {
299
+ assert_eq ! ( data_point. value, increment) ;
300
+ }
288
301
}
302
+
303
+ test_context. reset_metrics ( ) ;
289
304
}
290
- assert_eq ! (
291
- data_point1
292
- . expect( "datapoint with key1=value2 expected" )
293
- . value,
294
- 200
295
- ) ;
296
305
}
297
306
298
307
#[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
@@ -566,7 +575,6 @@ mod tests {
566
575
}
567
576
568
577
#[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
569
- // #[ignore = "Spatial aggregation is not yet implemented."]
570
578
async fn spatial_aggregation_when_view_drops_attributes_observable_counter ( ) {
571
579
// cargo test spatial_aggregation_when_view_drops_attributes_observable_counter --features=metrics,testing
572
580
@@ -1169,6 +1177,10 @@ mod tests {
1169
1177
updown_counter_builder. init ( )
1170
1178
}
1171
1179
1180
+ fn meter ( & self ) -> Meter {
1181
+ self . meter_provider . meter ( "test" )
1182
+ }
1183
+
1172
1184
fn flush_metrics ( & self ) {
1173
1185
self . meter_provider . force_flush ( ) . unwrap ( ) ;
1174
1186
}
0 commit comments