Skip to content

Commit f1cdaca

Browse files
utpillaTommyCppcijothomas
authored
Improve multi-threaded test for Counter (#1858)
Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com> Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
1 parent f488006 commit f1cdaca

File tree

1 file changed

+95
-32
lines changed
  • opentelemetry-sdk/src/metrics

1 file changed

+95
-32
lines changed

opentelemetry-sdk/src/metrics/mod.rs

+95-32
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ mod tests {
151151
use std::borrow::Cow;
152152
use std::sync::{Arc, Mutex};
153153
use std::thread;
154+
use std::time::Duration;
154155

155156
// Run all tests in this mod
156157
// cargo test metrics::tests --features=testing
@@ -1047,45 +1048,54 @@ mod tests {
10471048
let mut test_context = TestContext::new(temporality);
10481049
let counter = Arc::new(test_context.u64_counter("test", "my_counter", None));
10491050

1050-
let mut update_threads = vec![];
1051-
for _ in 0..10 {
1052-
let counter = Arc::clone(&counter);
1053-
1054-
update_threads.push(thread::spawn(move || {
1055-
counter.add(1, &[KeyValue::new("key1", "value1")]);
1056-
counter.add(1, &[KeyValue::new("key1", "value1")]);
1057-
counter.add(1, &[KeyValue::new("key1", "value1")]);
1058-
counter.add(1, &[KeyValue::new("key1", "value1")]);
1059-
counter.add(1, &[KeyValue::new("key1", "value1")]);
1060-
}));
1061-
}
1062-
1063-
for thread in update_threads {
1064-
thread.join().unwrap();
1051+
for i in 0..10 {
1052+
thread::scope(|s| {
1053+
s.spawn(|| {
1054+
counter.add(1, &[KeyValue::new("key1", "value1")]);
1055+
counter.add(1, &[KeyValue::new("key1", "value1")]);
1056+
counter.add(1, &[KeyValue::new("key1", "value1")]);
1057+
1058+
// Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
1059+
if i % 2 == 0 {
1060+
test_context.flush_metrics();
1061+
thread::sleep(Duration::from_millis(i)); // Make each thread sleep for some time duration for better testing
1062+
}
1063+
1064+
counter.add(1, &[KeyValue::new("key1", "value1")]);
1065+
counter.add(1, &[KeyValue::new("key1", "value1")]);
1066+
});
1067+
});
10651068
}
10661069

10671070
test_context.flush_metrics();
10681071

10691072
// Assert
1070-
let sum = test_context.get_aggregation::<data::Sum<u64>>("my_counter", None);
1071-
// Expecting 2 time-series.
1072-
assert_eq!(sum.data_points.len(), 1);
1073-
assert!(sum.is_monotonic, "Counter should produce monotonic.");
1074-
assert_eq!(sum.temporality, temporality);
1075-
if let Temporality::Cumulative = temporality {
1076-
assert_eq!(
1077-
sum.temporality,
1078-
Temporality::Cumulative,
1079-
"Should produce cumulative"
1080-
);
1073+
// We invoke `test_context.flush_metrics()` six times.
1074+
let sums =
1075+
test_context.get_from_multiple_aggregations::<data::Sum<u64>>("my_counter", None, 6);
1076+
1077+
let values = sums
1078+
.iter()
1079+
.map(|sum| {
1080+
assert_eq!(sum.data_points.len(), 1); // Expecting 1 time-series.
1081+
assert!(sum.is_monotonic, "Counter should produce monotonic.");
1082+
assert_eq!(sum.temporality, temporality);
1083+
1084+
// find and validate key1=value1 datapoint
1085+
let data_point = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1086+
.expect("datapoint with key1=value1 expected");
1087+
1088+
data_point.value
1089+
})
1090+
.collect::<Vec<_>>();
1091+
1092+
let total_sum: u64 = if temporality == Temporality::Delta {
1093+
values.iter().sum()
10811094
} else {
1082-
assert_eq!(sum.temporality, Temporality::Delta, "Should produce delta");
1083-
}
1095+
*values.last().unwrap()
1096+
};
10841097

1085-
// find and validate key1=value2 datapoint
1086-
let data_point1 = find_datapoint_with_key_value(&sum.data_points, "key1", "value1")
1087-
.expect("datapoint with key1=value1 expected");
1088-
assert_eq!(data_point1.value, 50); // Each of the 10 update threads record measurements summing up to 5.
1098+
assert_eq!(total_sum, 50); // Each of the 10 update threads record measurements summing up to 5.
10891099
}
10901100

10911101
fn histogram_aggregation_helper(temporality: Temporality) {
@@ -1553,5 +1563,58 @@ mod tests {
15531563
.downcast_ref::<T>()
15541564
.expect("Failed to cast aggregation to expected type")
15551565
}
1566+
1567+
fn get_from_multiple_aggregations<T: data::Aggregation>(
1568+
&mut self,
1569+
counter_name: &str,
1570+
unit_name: Option<&str>,
1571+
invocation_count: usize,
1572+
) -> Vec<&T> {
1573+
self.resource_metrics = self
1574+
.exporter
1575+
.get_finished_metrics()
1576+
.expect("metrics expected to be exported");
1577+
1578+
assert!(
1579+
!self.resource_metrics.is_empty(),
1580+
"no metrics were exported"
1581+
);
1582+
1583+
assert_eq!(
1584+
self.resource_metrics.len(),
1585+
invocation_count,
1586+
"Expected collect to be called {} times",
1587+
invocation_count
1588+
);
1589+
1590+
let result = self
1591+
.resource_metrics
1592+
.iter()
1593+
.map(|resource_metric| {
1594+
assert!(
1595+
!resource_metric.scope_metrics.is_empty(),
1596+
"An export with no scope metrics occurred"
1597+
);
1598+
1599+
assert!(!resource_metric.scope_metrics[0].metrics.is_empty());
1600+
1601+
let metric = &resource_metric.scope_metrics[0].metrics[0];
1602+
assert_eq!(metric.name, counter_name);
1603+
1604+
if let Some(expected_unit) = unit_name {
1605+
assert_eq!(metric.unit, expected_unit);
1606+
}
1607+
1608+
let aggregation = metric
1609+
.data
1610+
.as_any()
1611+
.downcast_ref::<T>()
1612+
.expect("Failed to cast aggregation to expected type");
1613+
aggregation
1614+
})
1615+
.collect::<Vec<_>>();
1616+
1617+
result
1618+
}
15561619
}
15571620
}

0 commit comments

Comments
 (0)