From 22324f19160a13f2d38aa547d97702bac57c7260 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Mon, 3 Jun 2024 17:29:05 -0700 Subject: [PATCH 1/6] Update test --- opentelemetry-sdk/src/metrics/mod.rs | 128 ++++++++++++++++++++------- 1 file changed, 95 insertions(+), 33 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 1cf4e4c6cc..dd21503b94 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -150,7 +150,8 @@ mod tests { use rand::{rngs, Rng, SeedableRng}; use std::borrow::Cow; use std::sync::{Arc, Mutex}; - use std::thread; + use std::thread::{self, sleep}; + use std::time::Duration; // Run all tests in this mod // cargo test metrics::tests --features=testing @@ -1047,45 +1048,62 @@ mod tests { let mut test_context = TestContext::new(temporality); let counter = Arc::new(test_context.u64_counter("test", "my_counter", None)); - let mut update_threads = vec![]; - for _ in 0..10 { - let counter = Arc::clone(&counter); - - update_threads.push(thread::spawn(move || { - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - counter.add(1, &[KeyValue::new("key1", "value1")]); - })); - } - - for thread in update_threads { - thread.join().unwrap(); + for i in 0..10 { + println!("For loop: {}", i); + thread::scope(|s| { + s.spawn(|| { + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + + // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time. + if i % 2 == 0 { + test_context.flush_metrics(); + sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing + } + + counter.add(1, &[KeyValue::new("key1", "value1")]); + counter.add(1, &[KeyValue::new("key1", "value1")]); + }); + }); } test_context.flush_metrics(); // Assert - let sum = test_context.get_aggregation::>("my_counter", None); - // Expecting 2 time-series. - assert_eq!(sum.data_points.len(), 1); - assert!(sum.is_monotonic, "Counter should produce monotonic."); - assert_eq!(sum.temporality, temporality); - if let Temporality::Cumulative = temporality { - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); - } else { - assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); - } + // We invoke `test_context.flush_metrics()` six times. + let sums = test_context.get_from_multiple_aggregations::>("my_counter", None, 6); - // find and validate key1=value2 datapoint - let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + let values = sums.iter().map(|sum| { + assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series. + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!(sum.temporality, temporality); + + if let Temporality::Cumulative = temporality { + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); + } + + // find and validate key1=value1 datapoint + let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") .expect("datapoint with key1=value1 expected"); - assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5. + + data_point.value + }).collect::>(); + + let total_sum: u64 = if temporality == Temporality::Delta { + values.iter().sum() + } + else { + *values.last().unwrap() + }; + + assert_eq!(total_sum, 50); // Each of the 10 update threads record measurements summing up to 5. } fn histogram_aggregation_helper(temporality: Temporality) { @@ -1553,5 +1571,49 @@ mod tests { .downcast_ref::() .expect("Failed to cast aggregation to expected type") } + + fn get_from_multiple_aggregations( + &mut self, + counter_name: &str, + unit_name: Option<&str>, + invocation_count: usize, + ) -> Vec<&T> { + self.resource_metrics = self + .exporter + .get_finished_metrics() + .expect("metrics expected to be exported"); + + assert!( + !self.resource_metrics.is_empty(), + "no metrics were exported" + ); + + assert!( + self.resource_metrics.len() == invocation_count, + "Expected single resource metrics." + ); + + let result = self.resource_metrics.iter().map(|resource_metric| { + assert!( + !resource_metric.scope_metrics.is_empty(), + "No scope metrics in latest export" + ); + + assert!(!resource_metric.scope_metrics[0].metrics.is_empty()); + + let metric = &resource_metric.scope_metrics[0].metrics[0]; + assert_eq!(metric.name, counter_name); + + if let Some(expected_unit) = unit_name { + assert_eq!(metric.unit, expected_unit); + } + + let aggregation = metric.data.as_any().downcast_ref::().expect("Failed to cast aggregation to expected type"); + aggregation + }).collect::>(); + + result + } + } } From b7d85ffdf37d2ad7186411a6745a2b0423e1d9d3 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Mon, 3 Jun 2024 17:38:54 -0700 Subject: [PATCH 2/6] Update test --- opentelemetry-sdk/src/metrics/mod.rs | 94 +++++++++++++++------------- 1 file changed, 52 insertions(+), 42 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index dd21503b94..09beb6a745 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1055,13 +1055,13 @@ mod tests { counter.add(1, &[KeyValue::new("key1", "value1")]); counter.add(1, &[KeyValue::new("key1", "value1")]); counter.add(1, &[KeyValue::new("key1", "value1")]); - + // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time. if i % 2 == 0 { test_context.flush_metrics(); - sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing + thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing } - + counter.add(1, &[KeyValue::new("key1", "value1")]); counter.add(1, &[KeyValue::new("key1", "value1")]); }); @@ -1072,37 +1072,40 @@ mod tests { // Assert // We invoke `test_context.flush_metrics()` six times. - let sums = test_context.get_from_multiple_aggregations::>("my_counter", None, 6); - - let values = sums.iter().map(|sum| { - assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series. - assert!(sum.is_monotonic, "Counter should produce monotonic."); - assert_eq!(sum.temporality, temporality); + let sums = + test_context.get_from_multiple_aggregations::>("my_counter", None, 6); - if let Temporality::Cumulative = temporality { - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); - } else { - assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); - } + let values = sums + .iter() + .map(|sum| { + assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series. + assert!(sum.is_monotonic, "Counter should produce monotonic."); + assert_eq!(sum.temporality, temporality); + + if let Temporality::Cumulative = temporality { + assert_eq!( + sum.temporality, + Temporality::Cumulative, + "Should produce cumulative" + ); + } else { + assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); + } - // find and validate key1=value1 datapoint - let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") - .expect("datapoint with key1=value1 expected"); + // find and validate key1=value1 datapoint + let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") + .expect("datapoint with key1=value1 expected"); - data_point.value - }).collect::>(); + data_point.value + }) + .collect::>(); let total_sum: u64 = if temporality == Temporality::Delta { values.iter().sum() - } - else { + } else { *values.last().unwrap() }; - + assert_eq!(total_sum, 50); // Each of the 10 update threads record measurements summing up to 5. } @@ -1571,7 +1574,7 @@ mod tests { .downcast_ref::() .expect("Failed to cast aggregation to expected type") } - + fn get_from_multiple_aggregations( &mut self, counter_name: &str, @@ -1593,27 +1596,34 @@ mod tests { "Expected single resource metrics." ); - let result = self.resource_metrics.iter().map(|resource_metric| { - assert!( - !resource_metric.scope_metrics.is_empty(), - "No scope metrics in latest export" - ); + let result = self + .resource_metrics + .iter() + .map(|resource_metric| { + assert!( + !resource_metric.scope_metrics.is_empty(), + "No scope metrics in latest export" + ); - assert!(!resource_metric.scope_metrics[0].metrics.is_empty()); + assert!(!resource_metric.scope_metrics[0].metrics.is_empty()); - let metric = &resource_metric.scope_metrics[0].metrics[0]; - assert_eq!(metric.name, counter_name); + let metric = &resource_metric.scope_metrics[0].metrics[0]; + assert_eq!(metric.name, counter_name); - if let Some(expected_unit) = unit_name { - assert_eq!(metric.unit, expected_unit); - } + if let Some(expected_unit) = unit_name { + assert_eq!(metric.unit, expected_unit); + } - let aggregation = metric.data.as_any().downcast_ref::().expect("Failed to cast aggregation to expected type"); - aggregation - }).collect::>(); + let aggregation = metric + .data + .as_any() + .downcast_ref::() + .expect("Failed to cast aggregation to expected type"); + aggregation + }) + .collect::>(); result } - } } From 0accd81a5733ebcc3dc123b3c543551471b5cf4d Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Mon, 3 Jun 2024 17:42:25 -0700 Subject: [PATCH 3/6] Remove print statement --- opentelemetry-sdk/src/metrics/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 09beb6a745..87cf0643e4 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1049,7 +1049,6 @@ mod tests { let counter = Arc::new(test_context.u64_counter("test", "my_counter", None)); for i in 0..10 { - println!("For loop: {}", i); thread::scope(|s| { s.spawn(|| { counter.add(1, &[KeyValue::new("key1", "value1")]); From 1a45d40983b62b0d8e9c5ab2275f91e8577a2aab Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Mon, 3 Jun 2024 17:58:39 -0700 Subject: [PATCH 4/6] Remove unnecessary imports --- opentelemetry-sdk/src/metrics/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 87cf0643e4..21fe727e8a 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -150,7 +150,7 @@ mod tests { use rand::{rngs, Rng, SeedableRng}; use std::borrow::Cow; use std::sync::{Arc, Mutex}; - use std::thread::{self, sleep}; + use std::thread; use std::time::Duration; // Run all tests in this mod From c9fc8184f30940d9e7fdc2047829d96ff26fafef Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Mon, 3 Jun 2024 19:40:24 -0700 Subject: [PATCH 5/6] Address PR comments --- opentelemetry-sdk/src/metrics/mod.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 21fe727e8a..5a5dfa1970 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1590,10 +1590,7 @@ mod tests { "no metrics were exported" ); - assert!( - self.resource_metrics.len() == invocation_count, - "Expected single resource metrics." - ); + assert_eq!(self.resource_metrics.len(), invocation_count, "Expected collect to be called {} times", invocation_count); let result = self .resource_metrics @@ -1601,7 +1598,7 @@ mod tests { .map(|resource_metric| { assert!( !resource_metric.scope_metrics.is_empty(), - "No scope metrics in latest export" + "An export with no scope metrics occurred" ); assert!(!resource_metric.scope_metrics[0].metrics.is_empty()); From 7ec23e1fbd3eac6169cc6fc35f0e5f9637909026 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai Date: Tue, 4 Jun 2024 10:23:36 -0700 Subject: [PATCH 6/6] Fix lint; address PR comments --- opentelemetry-sdk/src/metrics/mod.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 5a5dfa1970..2c6b709fef 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1081,16 +1081,6 @@ mod tests { assert!(sum.is_monotonic, "Counter should produce monotonic."); assert_eq!(sum.temporality, temporality); - if let Temporality::Cumulative = temporality { - assert_eq!( - sum.temporality, - Temporality::Cumulative, - "Should produce cumulative" - ); - } else { - assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta"); - } - // find and validate key1=value1 datapoint let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1") .expect("datapoint with key1=value1 expected"); @@ -1590,7 +1580,12 @@ mod tests { "no metrics were exported" ); - assert_eq!(self.resource_metrics.len(), invocation_count, "Expected collect to be called {} times", invocation_count); + assert_eq!( + self.resource_metrics.len(), + invocation_count, + "Expected collect to be called {} times", + invocation_count + ); let result = self .resource_metrics