diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs
index 84d0735053..99a2797a1a 100644
--- a/opentelemetry-sdk/src/metrics/internal/mod.rs
+++ b/opentelemetry-sdk/src/metrics/internal/mod.rs
@@ -9,7 +9,7 @@ use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::ops::{Add, AddAssign, Sub};
 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::{Arc, RwLock};
 
 use aggregate::is_under_cardinality_limit;
 pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -267,39 +267,56 @@ impl AtomicallyUpdate<i64> for i64 {
 }
 
 pub(crate) struct F64AtomicTracker {
-    inner: Mutex<f64>, // Floating points don't have true atomics, so we need to use mutex for them
+    inner: AtomicU64, // Floating points don't have true atomics, so we use their binary representation to perform atomic operations
 }
 
 impl F64AtomicTracker {
     fn new() -> Self {
+        let zero_as_u64 = 0.0_f64.to_bits();
         F64AtomicTracker {
-            inner: Mutex::new(0.0),
+            inner: AtomicU64::new(zero_as_u64),
         }
     }
 }
 
 impl AtomicTracker<f64> for F64AtomicTracker {
     fn store(&self, value: f64) {
-        let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
-        *guard = value;
+        let value_as_u64 = value.to_bits();
+        self.inner.store(value_as_u64, Ordering::Relaxed);
     }
 
     fn add(&self, value: f64) {
-        let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
-        *guard += value;
+        let mut current_value_as_u64 = self.inner.load(Ordering::Relaxed);
+
+        loop {
+            let current_value = f64::from_bits(current_value_as_u64);
+            let new_value = current_value + value;
+            let new_value_as_u64 = new_value.to_bits();
+            match self.inner.compare_exchange(
+                current_value_as_u64,
+                new_value_as_u64,
+                Ordering::Relaxed,
+                Ordering::Relaxed,
+            ) {
+                // Succeeded in updating the value.
+                Ok(_) => return,
+
+                // Some other thread changed the value before this thread could update it.
+                // Read the latest value again and retry the swap with a recomputed `new_value_as_u64`.
+                Err(v) => current_value_as_u64 = v,
+            }
+        }
     }
 
     fn get_value(&self) -> f64 {
-        let guard = self.inner.lock().expect("F64 mutex was poisoned");
-        *guard
+        let value_as_u64 = self.inner.load(Ordering::Relaxed);
+        f64::from_bits(value_as_u64)
    }
 
     fn get_and_reset_value(&self) -> f64 {
-        let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
-        let value = *guard;
-        *guard = 0.0;
-
-        value
+        let zero_as_u64 = 0.0_f64.to_bits();
+        let value = self.inner.swap(zero_as_u64, Ordering::Relaxed);
+        f64::from_bits(value)
     }
 }
 
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index df29d6647d..7bd5b3fef8 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -1251,6 +1251,15 @@ mod tests {
         counter_multithreaded_aggregation_helper(Temporality::Cumulative);
     }
 
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn counter_f64_multithreaded() {
+        // Run this test with stdout enabled to see output.
+        // cargo test counter_f64_multithreaded --features=testing -- --nocapture
+
+        counter_f64_multithreaded_aggregation_helper(Temporality::Delta);
+        counter_f64_multithreaded_aggregation_helper(Temporality::Cumulative);
+    }
+
     #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
     async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
         // Run this test with stdout enabled to see output.
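The `add` path in internal/mod.rs above is the classic lock-free pattern for floating-point accumulation: reinterpret the f64 as its u64 bit pattern and retry a compare-exchange until no other thread has raced in between. The standard library's `AtomicU64::fetch_update` packages the same retry loop. Below is a minimal, self-contained sketch of the technique under that assumption; `LockFreeF64` is a hypothetical name used only for illustration, not an SDK type.

use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

// Hypothetical illustration of the bit-cast CAS technique, not SDK code.
struct LockFreeF64(AtomicU64);

impl LockFreeF64 {
    fn new(v: f64) -> Self {
        Self(AtomicU64::new(v.to_bits()))
    }

    // Same effect as the hand-rolled `compare_exchange` loop in the diff:
    // `fetch_update` re-runs the closure until the CAS succeeds.
    fn add(&self, delta: f64) {
        let _ = self.0.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
            Some((f64::from_bits(cur) + delta).to_bits())
        });
    }

    fn get(&self) -> f64 {
        f64::from_bits(self.0.load(Ordering::Relaxed))
    }
}

fn main() {
    let total = LockFreeF64::new(0.0);
    thread::scope(|s| {
        for _ in 0..8 {
            s.spawn(|| {
                for _ in 0..1_000 {
                    total.add(1.23); // no increment is lost under contention
                }
            });
        }
    });
    assert!((total.get() - 8_000.0 * 1.23).abs() < 1e-6);
}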
@@ -1438,6 +1447,59 @@ mod tests {
         assert_eq!(sum_key1_value1, 50); // Each of the 10 update threads record measurements summing up to 5.
     }
 
+    fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) {
+        // Arrange
+        let mut test_context = TestContext::new(temporality);
+        let counter = Arc::new(test_context.meter().f64_counter("test_counter").init());
+
+        for i in 0..10 {
+            thread::scope(|s| {
+                s.spawn(|| {
+                    counter.add(1.23, &[]);
+
+                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
+                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
+                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
+
+                    // Test concurrent collection by forcing half of the update threads to `force_flush` metrics and sleep for some time.
+                    if i % 2 == 0 {
+                        test_context.flush_metrics();
+                        thread::sleep(Duration::from_millis(i)); // Make each thread sleep for a different duration to vary the interleaving
+                    }
+
+                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
+                    counter.add(1.23, &[KeyValue::new("key1", "value1")]);
+                });
+            });
+        }
+
+        test_context.flush_metrics();
+
+        // Assert
+        // We invoke `test_context.flush_metrics()` six times.
+        let sums =
+            test_context.get_from_multiple_aggregations::<data::Sum<f64>>("test_counter", None, 6);
+
+        let mut sum_zero_attributes = 0.0;
+        let mut sum_key1_value1 = 0.0;
+        sums.iter().for_each(|sum| {
+            assert_eq!(sum.data_points.len(), 2); // Expecting 2 time-series.
+            assert!(sum.is_monotonic, "Counter should produce monotonic sums.");
+            assert_eq!(sum.temporality, temporality);
+
+            if temporality == Temporality::Delta {
+                sum_zero_attributes += sum.data_points[0].value;
+                sum_key1_value1 += sum.data_points[1].value;
+            } else {
+                sum_zero_attributes = sum.data_points[0].value;
+                sum_key1_value1 = sum.data_points[1].value;
+            };
+        });
+
+        assert!(f64::abs(12.3 - sum_zero_attributes) < 0.0001); // Each of the 10 update threads records one zero-attribute measurement: 10 * 1.23 = 12.3
+        assert!(f64::abs(61.5 - sum_key1_value1) < 0.0001); // Each of the 10 update threads records 5 measurements for key1-value1: 10 * 5 * 1.23 = 61.5
+    }
+
     fn histogram_aggregation_helper(temporality: Temporality) {
         // Arrange
         let mut test_context = TestContext::new(temporality);
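One design note on `get_and_reset_value` in the tracker above: reading the old total and installing zero must happen in a single atomic step, which is exactly what `swap` provides. The sketch below contrasts that with a separate load/store pair, which would silently drop any concurrent `add` landing between the two operations; both function names here are illustrative, not SDK API.

use std::sync::atomic::{AtomicU64, Ordering};

// Illustrative only (not SDK code): a load followed by a separate store
// opens a window in which a concurrent add can land and then be wiped out.
fn get_and_reset_racy(inner: &AtomicU64) -> f64 {
    let value = f64::from_bits(inner.load(Ordering::Relaxed));
    // <-- an update arriving here is overwritten by the store below
    inner.store(0.0_f64.to_bits(), Ordering::Relaxed);
    value
}

// What the diff does: `swap` returns the old bits and installs zero
// in one atomic step, so no concurrent update can fall into a gap.
fn get_and_reset_atomic(inner: &AtomicU64) -> f64 {
    f64::from_bits(inner.swap(0.0_f64.to_bits(), Ordering::Relaxed))
}

fn main() {
    let inner = AtomicU64::new(1.23_f64.to_bits());
    assert_eq!(get_and_reset_atomic(&inner), 1.23);
    assert_eq!(f64::from_bits(inner.load(Ordering::Relaxed)), 0.0);
    let _ = get_and_reset_racy(&inner); // same result single-threaded; differs under contention
}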