Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve multi-threaded test for Counter #1858

Merged
127 changes: 95 additions & 32 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ mod tests {
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Run all tests in this mod
// cargo test metrics::tests --features=testing
Expand Down Expand Up @@ -1047,45 +1048,54 @@ mod tests {
let mut test_context = TestContext::new(temporality);
let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

let mut update_threads = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);

update_threads.push(thread::spawn(move || {
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
}));
}

for thread in update_threads {
thread.join().unwrap();
for i in 0..10 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe use 2x number of cores in the machine?

thread::scope(|s| {
s.spawn(|| {
counter.add(1, &[KeyValue::new("key1", "value1")]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add 1-3 diff. timeseries
add 1 with 0 attributes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and a separate test for overflow part.

counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);

// Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
if i % 2 == 0 {
test_context.flush_metrics();
thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
}

counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
});
});
}

test_context.flush_metrics();

// Assert
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 1);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(sum.temporality, temporality);
if let Temporality::Cumulative = temporality {
assert_eq!(
sum.temporality,
Temporality::Cumulative,
"Should produce cumulative"
);
// We invoke `test_context.flush_metrics()` six times.
let sums =
test_context.get_from_multiple_aggregations::<data::Sum<u64>>("my_counter", None, 6);

let values = sums
.iter()
.map(|sum| {
assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series.
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(sum.temporality, temporality);

// find and validate key1=value1 datapoint
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");

data_point.value
})
.collect::<Vec<_>>();

let total_sum: u64 = if temporality == Temporality::Delta {
values.iter().sum()
} else {
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}
*values.last().unwrap()
};

// find and validate key1=value2 datapoint
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5.
assert_eq!(total_sum, 50); // Each of the 10 update threads record measurements summing up to 5.
}

fn histogram_aggregation_helper(temporality: Temporality) {
Expand Down Expand Up @@ -1553,5 +1563,58 @@ mod tests {
.downcast_ref::<T>()
.expect("Failed to cast aggregation to expected type")
}

fn get_from_multiple_aggregations<T: data::Aggregation>(
&mut self,
counter_name: &str,
unit_name: Option<&str>,
invocation_count: usize,
) -> Vec<&T> {
self.resource_metrics = self
.exporter
.get_finished_metrics()
.expect("metrics expected to be exported");

assert!(
!self.resource_metrics.is_empty(),
"no metrics were exported"
);

assert_eq!(
self.resource_metrics.len(),
invocation_count,
"Expected collect to be called {} times",
invocation_count
);

let result = self
.resource_metrics
.iter()
.map(|resource_metric| {
assert!(
!resource_metric.scope_metrics.is_empty(),
"An export with no scope metrics occurred"
);

assert!(!resource_metric.scope_metrics[0].metrics.is_empty());

let metric = &resource_metric.scope_metrics[0].metrics[0];
assert_eq!(metric.name, counter_name);

if let Some(expected_unit) = unit_name {
assert_eq!(metric.unit, expected_unit);
}

let aggregation = metric
.data
.as_any()
.downcast_ref::<T>()
.expect("Failed to cast aggregation to expected type");
aggregation
})
.collect::<Vec<_>>();

result
}
}
}
Loading