diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index ae29704c9e..6ad7303415 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1062,6 +1062,15 @@ mod tests { histogram_multithreaded_aggregation_helper(Temporality::Delta); histogram_multithreaded_aggregation_helper(Temporality::Cumulative); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn histogram_f64_multithreaded() { + // Run this test with stdout enabled to see output. + // cargo test histogram_f64_multithreaded --features=testing -- --nocapture + + histogram_f64_multithreaded_aggregation_helper(Temporality::Delta); + histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative); + } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn synchronous_instruments_cumulative_with_gap_in_measurements() { // Run this test with stdout enabled to see output. @@ -1561,6 +1570,143 @@ mod tests { } } + fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) { + // Arrange + let mut test_context = TestContext::new(temporality); + let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").init()); + + for i in 0..10 { + thread::scope(|s| { + s.spawn(|| { + histogram.record(1.5, &[]); + histogram.record(4.6, &[]); + + histogram.record(5.0, &[KeyValue::new("key1", "value1")]); + histogram.record(7.3, &[KeyValue::new("key1", "value1")]); + histogram.record(18.1, &[KeyValue::new("key1", "value1")]); + + // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time. + if i % 2 == 0 { + test_context.flush_metrics(); + thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing + } + + histogram.record(35.1, &[KeyValue::new("key1", "value1")]); + histogram.record(35.1, &[KeyValue::new("key1", "value1")]); + }); + }); + } + + test_context.flush_metrics(); + + // Assert + // We invoke `test_context.flush_metrics()` six times. + let histograms = test_context.get_from_multiple_aggregations::>( + "test_histogram", + None, + 6, + ); + + let ( + mut sum_zero_attributes, + mut count_zero_attributes, + mut min_zero_attributes, + mut max_zero_attributes, + ) = (0.0, 0, f64::MAX, f64::MIN); + let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) = + (0.0, 0, f64::MAX, f64::MIN); + + let mut bucket_counts_zero_attributes = vec![0; 16]; // There are 16 buckets for the default configuration + let mut bucket_counts_key1_value1 = vec![0; 16]; + + histograms.iter().for_each(|histogram| { + assert_eq!(histogram.data_points.len(), 2); // Expecting 1 time-series. + assert_eq!(histogram.temporality, temporality); + + let data_point_zero_attributes = + find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap(); + let data_point_key1_value1 = + find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1") + .unwrap(); + + if temporality == Temporality::Delta { + sum_zero_attributes += data_point_zero_attributes.sum; + sum_key1_value1 += data_point_key1_value1.sum; + + count_zero_attributes += data_point_zero_attributes.count; + count_key1_value1 += data_point_key1_value1.count; + + min_zero_attributes = + min_zero_attributes.min(data_point_zero_attributes.min.unwrap()); + min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap()); + + max_zero_attributes = + max_zero_attributes.max(data_point_zero_attributes.max.unwrap()); + max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap()); + + assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); + assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); + + for (i, _) in data_point_zero_attributes.bucket_counts.iter().enumerate() { + bucket_counts_zero_attributes[i] += data_point_zero_attributes.bucket_counts[i]; + } + + for (i, _) in data_point_key1_value1.bucket_counts.iter().enumerate() { + bucket_counts_key1_value1[i] += data_point_key1_value1.bucket_counts[i]; + } + } else { + sum_zero_attributes = data_point_zero_attributes.sum; + sum_key1_value1 = data_point_key1_value1.sum; + + count_zero_attributes = data_point_zero_attributes.count; + count_key1_value1 = data_point_key1_value1.count; + + min_zero_attributes = data_point_zero_attributes.min.unwrap(); + min_key1_value1 = data_point_key1_value1.min.unwrap(); + + max_zero_attributes = data_point_zero_attributes.max.unwrap(); + max_key1_value1 = data_point_key1_value1.max.unwrap(); + + assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16); + assert_eq!(data_point_key1_value1.bucket_counts.len(), 16); + + bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts); + bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts); + }; + }); + + // Default buckets: + // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0], + // (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞). + + assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads record two measurements. + assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); // Each of the 10 update threads record measurements summing up to 6.1 (1.5 + 4.6) + assert_eq!(min_zero_attributes, 1.5); + assert_eq!(max_zero_attributes, 4.6); + + for (i, count) in bucket_counts_zero_attributes.iter().enumerate() { + match i { + 1 => assert_eq!(*count, 20), // For each of the 10 update threads, both the recorded values 1.5 and 4.6 fall under the bucket (0, 5.0]. + _ => assert_eq!(*count, 0), + } + } + + assert_eq!(count_key1_value1, 50); // Each of the 10 update threads record 5 measurements. + assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); // Each of the 10 update threads record measurements summing up to 100.4 (5.0 + 7.3 + 18.1 + 35.1 + 35.1). + assert_eq!(min_key1_value1, 5.0); + assert_eq!(max_key1_value1, 35.1); + + for (i, count) in bucket_counts_key1_value1.iter().enumerate() { + match i { + 1 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 5.0 falls under the bucket (0, 5.0]. + 2 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 7.3 falls under the bucket (5.0, 10.0]. + 3 => assert_eq!(*count, 10), // For each of the 10 update threads, the recorded value 18.1 falls under the bucket (10.0, 25.0]. + 4 => assert_eq!(*count, 20), // For each of the 10 update threads, the recorded value 35.1 (recorded twice) falls under the bucket (25.0, 50.0]. + _ => assert_eq!(*count, 0), + } + } + } + fn histogram_aggregation_helper(temporality: Temporality) { // Arrange let mut test_context = TestContext::new(temporality);