Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve multi-threaded test for Counter #1858

Merged
132 changes: 100 additions & 32 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Run all tests in this mod
// cargo test metrics::tests --features=testing
Expand Down Expand Up @@ -1047,45 +1048,64 @@
let mut test_context = TestContext::new(temporality);
let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));

let mut update_threads = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);

update_threads.push(thread::spawn(move || {
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
}));
}

for thread in update_threads {
thread.join().unwrap();
for i in 0..10 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe use 2x number of cores in the machine?

thread::scope(|s| {
s.spawn(|| {
counter.add(1, &[KeyValue::new("key1", "value1")]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add 1-3 diff. timeseries
add 1 with 0 attributes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and a separate test for overflow part.

counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);

// Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
if i % 2 == 0 {
test_context.flush_metrics();
thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
}

counter.add(1, &[KeyValue::new("key1", "value1")]);
counter.add(1, &[KeyValue::new("key1", "value1")]);
});
});
}

test_context.flush_metrics();

// Assert
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
// Expecting 2 time-series.
assert_eq!(sum.data_points.len(), 1);
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(sum.temporality, temporality);
if let Temporality::Cumulative = temporality {
assert_eq!(
sum.temporality,
Temporality::Cumulative,
"Should produce cumulative"
);
// We invoke `test_context.flush_metrics()` six times.
let sums =
test_context.get_from_multiple_aggregations::<data::Sum<u64>>("my_counter", None, 6);

let values = sums
.iter()
.map(|sum| {
assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series.
assert!(sum.is_monotonic, "Counter should produce monotonic.");
assert_eq!(sum.temporality, temporality);

if let Temporality::Cumulative = temporality {
assert_eq!(
sum.temporality,
Temporality::Cumulative,
"Should produce cumulative"

Check warning on line 1088 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L1088

Added line #L1088 was not covered by tests
);
} else {
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}

// find and validate key1=value1 datapoint
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");

data_point.value
})
.collect::<Vec<_>>();

let total_sum: u64 = if temporality == Temporality::Delta {
values.iter().sum()
} else {
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
}
*values.last().unwrap()
};

// find and validate key1=value2 datapoint
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
.expect("datapoint with key1=value1 expected");
assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5.
assert_eq!(total_sum, 50); // Each of the 10 update threads record measurements summing up to 5.
}

fn histogram_aggregation_helper(temporality: Temporality) {
Expand Down Expand Up @@ -1553,5 +1573,53 @@
.downcast_ref::<T>()
.expect("Failed to cast aggregation to expected type")
}

fn get_from_multiple_aggregations<T: data::Aggregation>(
&mut self,
counter_name: &str,
unit_name: Option<&str>,
invocation_count: usize,
) -> Vec<&T> {
self.resource_metrics = self
.exporter
.get_finished_metrics()
.expect("metrics expected to be exported");

assert!(
!self.resource_metrics.is_empty(),
"no metrics were exported"

Check warning on line 1590 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L1590

Added line #L1590 was not covered by tests
);

assert_eq!(self.resource_metrics.len(), invocation_count, "Expected collect to be called {} times", invocation_count);

let result = self
.resource_metrics
.iter()
.map(|resource_metric| {
assert!(
!resource_metric.scope_metrics.is_empty(),
"An export with no scope metrics occurred"

Check warning on line 1601 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L1601

Added line #L1601 was not covered by tests
);

assert!(!resource_metric.scope_metrics[0].metrics.is_empty());

let metric = &resource_metric.scope_metrics[0].metrics[0];
assert_eq!(metric.name, counter_name);

if let Some(expected_unit) = unit_name {
assert_eq!(metric.unit, expected_unit);

Check warning on line 1610 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L1610

Added line #L1610 was not covered by tests
}

let aggregation = metric
.data
.as_any()
.downcast_ref::<T>()
.expect("Failed to cast aggregation to expected type");
aggregation
})
.collect::<Vec<_>>();

result
}
}
}
Loading