Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lock free updates for floating point metrics - Throughput increased by up to 50% #2016

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use std::marker::PhantomData;
use std::ops::{Add, AddAssign, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};

use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
Expand Down Expand Up @@ -267,39 +267,56 @@
}

/// Tracker for a single `f64` value, held in the `inner` field.
pub(crate) struct F64AtomicTracker {
inner: Mutex<f64>, // Floating points don't have true atomics, so we need to use mutex for them
inner: AtomicU64, // Floating points don't have true atomics, so we need to use their binary representation to perform atomic operations
}

impl F64AtomicTracker {
/// Creates a tracker whose initial value is `0.0`.
fn new() -> Self {
// `to_bits` yields the raw bit pattern of the float, which is what the atomic integer stores.
let zero_as_u64 = 0.0_f64.to_bits();
F64AtomicTracker {
inner: Mutex::new(0.0),
inner: AtomicU64::new(zero_as_u64),
}
}
}

impl AtomicTracker<f64> for F64AtomicTracker {
/// Overwrites the tracked value with `value`.
fn store(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard = value;
// The atomic holds the float's raw bit pattern, so convert before storing.
let value_as_u64 = value.to_bits();
self.inner.store(value_as_u64, Ordering::Relaxed);

Check warning on line 285 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L284-L285

Added lines #L284 - L285 were not covered by tests
}

/// Adds `value` to the tracked value.
fn add(&self, value: f64) {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard += value;
// Snapshot the current bits; the loop below retries until the compare-exchange succeeds.
let mut current_value_as_u64 = self.inner.load(Ordering::Relaxed);

loop {
// Do the arithmetic in the f64 domain, then convert the sum back to bits for the exchange.
let current_value = f64::from_bits(current_value_as_u64);
let new_value = current_value + value;
let new_value_as_u64 = new_value.to_bits();
match self.inner.compare_exchange(
current_value_as_u64,
new_value_as_u64,
Ordering::Relaxed,
Ordering::Relaxed,
) {
// Succeeded in updating the value
Ok(_) => return,

// Some other thread changed the value before this thread could update it.
// Read the latest value again and try to swap it with the recomputed `new_value_as_u64`.
Err(v) => current_value_as_u64 = v,

Check warning on line 306 in opentelemetry-sdk/src/metrics/internal/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/mod.rs#L306

Added line #L306 was not covered by tests
}
}
}

/// Returns the tracked value without modifying it.
fn get_value(&self) -> f64 {
let guard = self.inner.lock().expect("F64 mutex was poisoned");
*guard
let value_as_u64 = self.inner.load(Ordering::Relaxed);
f64::from_bits(value_as_u64)
}

/// Returns the tracked value and resets the tracker to `0.0`.
fn get_and_reset_value(&self) -> f64 {
let mut guard = self.inner.lock().expect("F64 mutex was poisoned");
let value = *guard;
*guard = 0.0;

value
let zero_as_u64 = 0.0_f64.to_bits();
// `swap` writes the bits of 0.0 and hands back the previous bits in a single atomic operation.
let value = self.inner.swap(zero_as_u64, Ordering::Relaxed);
f64::from_bits(value)
}
}

Expand Down
62 changes: 62 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,15 @@ mod tests {
counter_multithreaded_aggregation_helper(Temporality::Cumulative);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn counter_f64_multithreaded() {
    // Run this test with stdout enabled to see output.
    // cargo test counter_f64_multithreaded --features=testing -- --nocapture

    // Exercise the f64 counter helper once per temporality, Delta first and then Cumulative.
    for temporality in [Temporality::Delta, Temporality::Cumulative] {
        counter_f64_multithreaded_aggregation_helper(temporality);
    }
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn synchronous_instruments_cumulative_with_gap_in_measurements() {
// Run this test with stdout enabled to see output.
Expand Down Expand Up @@ -1438,6 +1447,59 @@ mod tests {
assert_eq!(sum_key1_value1, 50); // Each of the 10 update threads record measurements summing up to 5.
}

/// Records `f64` counter measurements from ten short-lived threads and verifies the exported sums.
///
/// Every thread adds 1.23 once with no attributes and five times with `key1=value1`.
/// Threads with an even index additionally force a metrics flush (followed by a short sleep)
/// between their third and fourth `key1=value1` measurement, so six collections are inspected
/// in total: five triggered from update threads plus the final one.
///
/// Each `thread::scope` call spawns exactly one thread and joins it before the next loop
/// iteration starts, so the update threads run one after another rather than in parallel.
fn counter_f64_multithreaded_aggregation_helper(temporality: Temporality) {
    const MEASUREMENT: f64 = 1.23;
    const TOLERANCE: f64 = 0.0001;

    // Arrange
    let mut test_context = TestContext::new(temporality);
    let counter = Arc::new(test_context.meter().f64_counter("test_counter").init());

    for thread_index in 0..10 {
        thread::scope(|scope| {
            scope.spawn(|| {
                // One measurement without attributes.
                counter.add(MEASUREMENT, &[]);

                // Three measurements for `key1=value1` before the optional flush.
                for _ in 0..3 {
                    counter.add(MEASUREMENT, &[KeyValue::new("key1", "value1")]);
                }

                // Every other update thread forces a collection and then sleeps for
                // `thread_index` milliseconds before recording its remaining measurements.
                if thread_index % 2 == 0 {
                    test_context.flush_metrics();
                    thread::sleep(Duration::from_millis(thread_index));
                }

                // Two more measurements for `key1=value1` after the optional flush.
                for _ in 0..2 {
                    counter.add(MEASUREMENT, &[KeyValue::new("key1", "value1")]);
                }
            });
        });
    }

    test_context.flush_metrics();

    // Assert
    // `test_context.flush_metrics()` is invoked six times: five times from the even-indexed
    // update threads and once just above.
    let sums =
        test_context.get_from_multiple_aggregations::<data::Sum<f64>>("test_counter", None, 6);

    // Delta exports are added up across collections; cumulative exports already carry the
    // running total, so the value from the last collection is kept.
    let accumulate_across_collections = temporality == Temporality::Delta;
    let mut sum_zero_attributes = 0.0_f64;
    let mut sum_key1_value1 = 0.0_f64;
    for sum in sums.iter() {
        // Expecting 2 time-series: one without attributes and one for `key1=value1`.
        assert_eq!(sum.data_points.len(), 2);
        assert!(sum.is_monotonic, "Counter should produce monotonic.");
        assert_eq!(sum.temporality, temporality);

        // The checks below rely on the attribute-less series being at index 0 and the
        // `key1=value1` series at index 1.
        let zero_attributes_value = sum.data_points[0].value;
        let key1_value1_value = sum.data_points[1].value;
        if accumulate_across_collections {
            sum_zero_attributes += zero_attributes_value;
            sum_key1_value1 += key1_value1_value;
        } else {
            sum_zero_attributes = zero_attributes_value;
            sum_key1_value1 = key1_value1_value;
        }
    }

    // Each of the 10 update threads records one attribute-less measurement: 10 * 1.23 = 12.3.
    assert!((12.3 - sum_zero_attributes).abs() < TOLERANCE);
    // Each of the 10 update threads records five `key1=value1` measurements: 10 * 5 * 1.23 = 61.5.
    assert!((61.5 - sum_key1_value1).abs() < TOLERANCE);
}

fn histogram_aggregation_helper(temporality: Temporality) {
// Arrange
let mut test_context = TestContext::new(temporality);
Expand Down
Loading