Add multithreaded test for Histogram for f64 values #2038

Merged
146 changes: 146 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
@@ -1062,6 +1062,15 @@ mod tests {
histogram_multithreaded_aggregation_helper(Temporality::Delta);
histogram_multithreaded_aggregation_helper(Temporality::Cumulative);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn histogram_f64_multithreaded() {
// Run this test with stdout enabled to see output.
// cargo test histogram_f64_multithreaded --features=testing -- --nocapture

histogram_f64_multithreaded_aggregation_helper(Temporality::Delta);
histogram_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
// Run this test with stdout enabled to see output.
@@ -1561,6 +1570,143 @@ mod tests {
}
}

fn histogram_f64_multithreaded_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
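// A single f64 histogram instance is shared by every update thread spawned below.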
let histogram = Arc::new(test_context.meter().f64_histogram("test_histogram").init());

for i in 0..10 {
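// Note: `thread::scope` joins its spawned thread before returning, so each iteration runs a fresh update thread to completion before the next begins.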
thread::scope(|s| {
s.spawn(|| {
histogram.record(1.5, &[]);
histogram.record(4.6, &[]);

histogram.record(5.0, &[KeyValue::new("key1", "value1")]);
histogram.record(7.3, &[KeyValue::new("key1", "value1")]);
histogram.record(18.1, &[KeyValue::new("key1", "value1")]);

// Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
if i % 2 == 0 {
test_context.flush_metrics();
thread::sleep(Duration::from_millis(i)); // Vary the sleep duration per iteration so collection interleaves with recordings in different ways.
}

histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
histogram.record(35.1, &[KeyValue::new("key1", "value1")]);
});
});
}

test_context.flush_metrics();

// Assert
// We invoke `test_context.flush_metrics()` six times: once from each even-numbered loop iteration (i = 0, 2, 4, 6, 8) and once above, after the loop.
let histograms = test_context.get_from_multiple_aggregations::<data::Histogram<f64>>(
"test_histogram",
None,
6,
);

let (
mut sum_zero_attributes,
mut count_zero_attributes,
mut min_zero_attributes,
mut max_zero_attributes,
) = (0.0, 0, f64::MAX, f64::MIN);
let (mut sum_key1_value1, mut count_key1_value1, mut min_key1_value1, mut max_key1_value1) =
(0.0, 0, f64::MAX, f64::MIN);

let mut bucket_counts_zero_attributes = vec![0; 16]; // The default configuration has 16 buckets (15 boundaries).
let mut bucket_counts_key1_value1 = vec![0; 16];

histograms.iter().for_each(|histogram| {
assert_eq!(histogram.data_points.len(), 2); // Expecting two time series: one without attributes and one for key1=value1.
assert_eq!(histogram.temporality, temporality);

let data_point_zero_attributes =
find_histogram_datapoint_with_no_attributes(&histogram.data_points).unwrap();
let data_point_key1_value1 =
find_histogram_datapoint_with_key_value(&histogram.data_points, "key1", "value1")
.unwrap();

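// With Delta temporality, each collection reports only what was recorded
// since the previous flush, so totals must be accumulated across all six
// collections. With Cumulative temporality, each collection reports the
// running totals, so the values from the last collection win (the else
// branch below simply overwrites).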
if temporality == Temporality::Delta {
sum_zero_attributes += data_point_zero_attributes.sum;
sum_key1_value1 += data_point_key1_value1.sum;

count_zero_attributes += data_point_zero_attributes.count;
count_key1_value1 += data_point_key1_value1.count;

min_zero_attributes =
min_zero_attributes.min(data_point_zero_attributes.min.unwrap());
min_key1_value1 = min_key1_value1.min(data_point_key1_value1.min.unwrap());

max_zero_attributes =
max_zero_attributes.max(data_point_zero_attributes.max.unwrap());
max_key1_value1 = max_key1_value1.max(data_point_key1_value1.max.unwrap());

assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

for (i, count) in data_point_zero_attributes.bucket_counts.iter().enumerate() {
bucket_counts_zero_attributes[i] += count;
}

for (i, count) in data_point_key1_value1.bucket_counts.iter().enumerate() {
bucket_counts_key1_value1[i] += count;
}
} else {
sum_zero_attributes = data_point_zero_attributes.sum;
sum_key1_value1 = data_point_key1_value1.sum;

count_zero_attributes = data_point_zero_attributes.count;
count_key1_value1 = data_point_key1_value1.count;

min_zero_attributes = data_point_zero_attributes.min.unwrap();
min_key1_value1 = data_point_key1_value1.min.unwrap();

max_zero_attributes = data_point_zero_attributes.max.unwrap();
max_key1_value1 = data_point_key1_value1.max.unwrap();

assert_eq!(data_point_zero_attributes.bucket_counts.len(), 16);
assert_eq!(data_point_key1_value1.bucket_counts.len(), 16);

bucket_counts_zero_attributes.clone_from(&data_point_zero_attributes.bucket_counts);
bucket_counts_key1_value1.clone_from(&data_point_key1_value1.bucket_counts);
}
});

// Default buckets:
// (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
// (500.0, 750.0], (750.0, 1000.0], (1000.0, 2500.0], (2500.0, 5000.0], (5000.0, 7500.0], (7500.0, 10000.0], (10000.0, +∞).

assert_eq!(count_zero_attributes, 20); // Each of the 10 update threads records two zero-attribute measurements.
assert!(f64::abs(61.0 - sum_zero_attributes) < 0.0001); // Each thread's zero-attribute measurements sum to 6.1 (1.5 + 4.6); 10 × 6.1 = 61.0.
assert_eq!(min_zero_attributes, 1.5);
assert_eq!(max_zero_attributes, 4.6);

for (i, count) in bucket_counts_zero_attributes.iter().enumerate() {
match i {
1 => assert_eq!(*count, 20), // Both recorded values, 1.5 and 4.6, fall into the bucket (0, 5.0] for each of the 10 update threads.
_ => assert_eq!(*count, 0),
}
}

assert_eq!(count_key1_value1, 50); // Each of the 10 update threads records five measurements with key1=value1.
assert!(f64::abs(1006.0 - sum_key1_value1) < 0.0001); // Each thread's key1=value1 measurements sum to 100.6 (5.0 + 7.3 + 18.1 + 35.1 + 35.1); 10 × 100.6 = 1006.0.
assert_eq!(min_key1_value1, 5.0);
assert_eq!(max_key1_value1, 35.1);

for (i, count) in bucket_counts_key1_value1.iter().enumerate() {
match i {
1 => assert_eq!(*count, 10), // The recorded value 5.0 falls into the bucket (0, 5.0] for each of the 10 update threads.
2 => assert_eq!(*count, 10), // The recorded value 7.3 falls into the bucket (5.0, 10.0] for each of the 10 update threads.
3 => assert_eq!(*count, 10), // The recorded value 18.1 falls into the bucket (10.0, 25.0] for each of the 10 update threads.
4 => assert_eq!(*count, 20), // The value 35.1, recorded twice per thread, falls into the bucket (25.0, 50.0].
_ => assert_eq!(*count, 0),
}
}
}
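Side note for reviewers: the bucket assertions above follow directly from how values map onto the default boundaries. Below is a minimal standalone sketch of that mapping, assuming the lower-exclusive/upper-inclusive convention listed in the comment above; `bucket_index` is a hypothetical helper for illustration only, not part of the SDK or this PR.

// Hypothetical helper (illustration only): a value lands in the bucket whose
// index equals the number of boundaries it exceeds, so index 0 is (-inf, 0],
// index 1 is (0, 5.0], and so on up to index 15 for (10000.0, +inf).
fn bucket_index(value: f64, boundaries: &[f64]) -> usize {
    boundaries.iter().filter(|&&b| value > b).count()
}

fn main() {
    // Mirrors the default boundaries referenced in the test above.
    let boundaries = [
        0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0,
        2500.0, 5000.0, 7500.0, 10000.0,
    ];
    assert_eq!(bucket_index(1.5, &boundaries), 1); // (0, 5.0]
    assert_eq!(bucket_index(4.6, &boundaries), 1); // (0, 5.0]
    assert_eq!(bucket_index(7.3, &boundaries), 2); // (5.0, 10.0]
    assert_eq!(bucket_index(18.1, &boundaries), 3); // (10.0, 25.0]
    assert_eq!(bucket_index(35.1, &boundaries), 4); // (25.0, 50.0]
}

This matches the per-bucket counts asserted in the two loops above.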

fn histogram_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);