@@ -161,6 +161,189 @@ mod tests {
161
161
) ;
162
162
}
163
163
164
+ // "multi_thread" tokio flavor must be used else flush won't
165
+ // be able to make progress!
166
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
167
+ async fn counter_aggregation_overflow ( ) {
168
+ // Run this test with stdout enabled to see output.
169
+ // cargo test counter --features=metrics,testing -- --nocapture
170
+
171
+ // Arrange
172
+ let exporter = InMemoryMetricsExporter :: default ( ) ;
173
+ // PeriodicReader with large interval to avoid auto-flush
174
+ let reader = PeriodicReader :: builder ( exporter. clone ( ) , runtime:: Tokio )
175
+ . with_interval ( std:: time:: Duration :: from_secs ( 100000 ) )
176
+ . build ( ) ;
177
+ let meter_provider = SdkMeterProvider :: builder ( ) . with_reader ( reader) . build ( ) ;
178
+
179
+ // Act
180
+ let meter = meter_provider. meter ( "test" ) ;
181
+ let counter = meter
182
+ . u64_counter ( "my_counter" )
183
+ . with_unit ( Unit :: new ( "my_unit" ) )
184
+ . init ( ) ;
185
+
186
+ // sleep for random ~5 milis to avoid recording during first collect cycle
187
+ // (TBD: need to fix PeriodicReader to NOT collect data immediately after start)
188
+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 5 ) ) ;
189
+ let unique_measurements = 1999 ;
190
+ let overflow_measurements = 4 ;
191
+ // Generate measurements to enforce overflow
192
+ for i in 0 ..unique_measurements + overflow_measurements {
193
+ let attribute_value = format ! ( "value{}" , i) ; // Creates a unique attribute value for each measurement
194
+ counter. add ( 1 , & [ KeyValue :: new ( "key1" , attribute_value) ] ) ;
195
+ }
196
+ meter_provider. force_flush ( ) . unwrap ( ) ;
197
+
198
+ // Assert
199
+ let resource_metrics = exporter
200
+ . get_finished_metrics ( )
201
+ . expect ( "metrics are expected to be exported." ) ;
202
+ // Every collect cycle produces a new ResourceMetrics (even if no data is collected).
203
+ // TBD = This needs to be fixed, and then below assert should validate for one entry
204
+ assert ! ( resource_metrics. len( ) == 2 ) ;
205
+ let metric = & resource_metrics[ 1 ] . scope_metrics [ 0 ] . metrics [ 0 ] ; // second ResourceMetrics
206
+ assert_eq ! ( metric. name, "my_counter" ) ;
207
+ assert_eq ! ( metric. unit. as_str( ) , "my_unit" ) ;
208
+ let sum = metric
209
+ . data
210
+ . as_any ( )
211
+ . downcast_ref :: < data:: Sum < u64 > > ( )
212
+ . expect ( "Sum aggregation expected for Counter instruments by default" ) ;
213
+
214
+ // Expecting 2000 unique time-series.
215
+ assert_eq ! ( sum. data_points. len( ) , unique_measurements + 1 ) ; // all overflow measurements are merged into one
216
+ assert ! ( sum. is_monotonic, "Counter should produce monotonic." ) ;
217
+ assert_eq ! (
218
+ sum. temporality,
219
+ data:: Temporality :: Cumulative ,
220
+ "Should produce cumulative by default."
221
+ ) ;
222
+ // ensure that overflow attribute is persent
223
+ for data_point in & sum. data_points {
224
+ let mut overflow_attribute_present = false ;
225
+ for attribute in data_point. attributes . iter ( ) {
226
+ if attribute. 0 == & opentelemetry:: Key :: from ( "otel.metric.overflow" ) {
227
+ overflow_attribute_present = true ;
228
+ break ;
229
+ }
230
+ }
231
+ if overflow_attribute_present {
232
+ assert_eq ! ( data_point. value, overflow_measurements as u64 ) ;
233
+ } else {
234
+ assert_eq ! ( data_point. value, 1 ) ;
235
+ }
236
+ }
237
+ }
238
+
239
+ // "multi_thread" tokio flavor must be used else flush won't
240
+ // be able to make progress!
241
+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
242
+ async fn counter_aggregation_concurrent_overflow ( ) {
243
+ // Run this test with stdout enabled to see output.
244
+ // cargo test counter --features=metrics,testing -- --nocapture
245
+
246
+ // Arrange
247
+ let exporter = InMemoryMetricsExporter :: default ( ) ;
248
+ // PeriodicReader with large interval to avoid auto-flush
249
+ let reader = PeriodicReader :: builder ( exporter. clone ( ) , runtime:: Tokio )
250
+ . with_interval ( std:: time:: Duration :: from_secs ( 100000 ) )
251
+ . build ( ) ;
252
+ let meter_provider = SdkMeterProvider :: builder ( ) . with_reader ( reader) . build ( ) ;
253
+
254
+ // Act
255
+ let meter = meter_provider. meter ( "test" ) ;
256
+ let counter = meter
257
+ . u64_counter ( "my_counter" )
258
+ . with_unit ( Unit :: new ( "my_unit" ) )
259
+ . init ( ) ;
260
+
261
+ // sleep for random ~5 milis to avoid recording during first collect cycle
262
+ // (TBD: need to fix PeriodicReader to NOT collect data immediately after start)
263
+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 5 ) ) ;
264
+
265
+ let unique_measurements = 1999 ;
266
+ let overflow_measurements = 4 ;
267
+ let total_measurements = unique_measurements + overflow_measurements;
268
+
269
+ let counter = std:: sync:: Arc :: new ( std:: sync:: Mutex :: new ( counter) ) ; // Shared counter among threads
270
+
271
+ let num_threads = 4 ;
272
+ let measurements_per_thread = total_measurements / num_threads;
273
+ let remainder = total_measurements % num_threads; // Remainder to be added to the last thread
274
+
275
+ let mut handles = vec ! [ ] ;
276
+
277
+ for thread_id in 0 ..num_threads {
278
+ let counter_clone = std:: sync:: Arc :: clone ( & counter) ;
279
+ let start_index = thread_id * measurements_per_thread;
280
+ let end_index = if thread_id == num_threads - 1 {
281
+ start_index + measurements_per_thread + remainder // Add remainder to the last thread
282
+ } else {
283
+ start_index + measurements_per_thread
284
+ } ;
285
+
286
+ let handle = std:: thread:: spawn ( move || {
287
+ for i in start_index..end_index {
288
+ let attribute_value = format ! ( "value{}" , i) ;
289
+ let kv = vec ! [ KeyValue :: new( "key1" , attribute_value) ] ;
290
+
291
+ let counter = counter_clone. lock ( ) . unwrap ( ) ;
292
+ counter. add ( 1 , & kv) ;
293
+ }
294
+ } ) ;
295
+
296
+ handles. push ( handle) ;
297
+ }
298
+
299
+ for handle in handles {
300
+ handle. join ( ) . unwrap ( ) ;
301
+ }
302
+
303
+ meter_provider. force_flush ( ) . unwrap ( ) ;
304
+
305
+ // Assert
306
+ let resource_metrics = exporter
307
+ . get_finished_metrics ( )
308
+ . expect ( "metrics are expected to be exported." ) ;
309
+ // Every collect cycle produces a new ResourceMetrics (even if no data is collected).
310
+ // TBD = This needs to be fixed, and then below assert should validate for one entry
311
+ assert ! ( resource_metrics. len( ) == 2 ) ;
312
+ let metric = & resource_metrics[ 1 ] . scope_metrics [ 0 ] . metrics [ 0 ] ; // second ResourceMetrics
313
+ assert_eq ! ( metric. name, "my_counter" ) ;
314
+ assert_eq ! ( metric. unit. as_str( ) , "my_unit" ) ;
315
+ let sum = metric
316
+ . data
317
+ . as_any ( )
318
+ . downcast_ref :: < data:: Sum < u64 > > ( )
319
+ . expect ( "Sum aggregation expected for Counter instruments by default" ) ;
320
+
321
+ // Expecting 2000 unique time-series.
322
+ assert_eq ! ( sum. data_points. len( ) , unique_measurements + 1 ) ; // all overflow measurements are merged into one
323
+ assert ! ( sum. is_monotonic, "Counter should produce monotonic." ) ;
324
+ assert_eq ! (
325
+ sum. temporality,
326
+ data:: Temporality :: Cumulative ,
327
+ "Should produce cumulative by default."
328
+ ) ;
329
+
330
+ // ensure that overflow attribute is persent
331
+ for data_point in & sum. data_points {
332
+ let mut overflow_attribute_present = false ;
333
+ for attribute in data_point. attributes . iter ( ) {
334
+ if attribute. 0 == & opentelemetry:: Key :: from ( "otel.metric.overflow" ) {
335
+ overflow_attribute_present = true ;
336
+ break ;
337
+ }
338
+ }
339
+ if overflow_attribute_present {
340
+ assert_eq ! ( data_point. value, overflow_measurements as u64 ) ;
341
+ } else {
342
+ assert_eq ! ( data_point. value, 1 ) ;
343
+ }
344
+ }
345
+ }
346
+
164
347
// "multi_thread" tokio flavor must be used else flush won't
165
348
// be able to make progress!
166
349
#[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
0 commit comments